Synapse AnalyticsをAzure Functionsからクエリする方法

⏱️約5分
シェア:

Azure Synapse Analyticsを使って、Azure Functionsからクエリを呼び出す方法をまとめます。

手順

Data Factoryによるデータの変換とコピー

まず分析対象のデータを用意します。今回はストレージに貯めたログを分析対象とします。 分析データを分析しやすい形に加工したうえで Azure Data Lake Storage Gen2 に置きます。

ここではData Factoryというサービスを使います。 Data Factoryを使ってデータのコピーを行います。コピー時にデータの形式を選べるので Parquet 形式にします。

以下は Bicep のサンプルになります。 このサンプルでは、パスの設定でワイルドカードや式を使っていますが、そのあたりは要件に合わせて適宜変更してください。Data Factory Studio 上で UI から作成した方がわかりやすいと思います。Studio で作成したパイプライン等を Bicep ファイルとしてエクスポートもできます。

bicep
1param factoryName string
2param tags object
3@secure()
4param subscriptionId string
5param sourceResourceGroupName string
6param inputStorageAccountName string
7param outputStorageAccountName string
8
9param triggerStartTime string = utcNow('yyyy-MM-ddTHH:mm:ssZ')
10
11resource factory 'Microsoft.DataFactory/factories@2018-06-01' = {
12 identity: {
13 type: 'SystemAssigned'
14 }
15 location: resourceGroup().location
16 name: factoryName
17 properties: {
18 publicNetworkAccess: 'Enabled'
19 }
20 tags: tags
21}
22
23resource pipelines 'Microsoft.DataFactory/factories/pipelines@2018-06-01' = {
24 parent: factory
25 name: 'clicklog-pipeline'
26 dependsOn: [
27 inputDataSet
28 outputDataSet
29 ]
30 properties: {
31 activities: [
32 {
33 dependsOn: [
34 {
35 activity: 'LookupBlob'
36 dependencyConditions: [
37 'Succeeded'
38 ]
39 }
40 ]
41 policy: {
42 timeout: '0.12:00:00'
43 retry: 0
44 retryIntervalInSeconds: 30
45 secureOutput: false
46 secureInput: false
47 }
48 name: 'CopyAccessLog'
49 userProperties: [
50 {
51 name: 'Source'
52 value: 'container-name/<your path>/'
53 }
54 {
55 name: 'Destination'
56 value: 'destination-container-name//'
57 }
58 ]
59 type: 'Copy'
60 inputs: [
61 {
62 referenceName: 'SourceDataset'
63 type: 'DatasetReference'
64 parameters: {}
65 }
66 ]
67 outputs: [
68 {
69 referenceName: 'DestinationDataset'
70 type: 'DatasetReference'
71 parameters: {}
72 }
73 ]
74 typeProperties: {
75 source: {
76 type: 'JsonSource'
77 storeSettings: {
78 type: 'AzureBlobStorageReadSettings'
79 recursive: true
80 wildcardFolderPath: {
81 value: 'your-path/@{concat(\'y=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'yyyy\'))}/@{concat(\'m=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'MM\'))}/@{concat(\'d=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'dd\'))}'
82 type: 'Expression'
83 }
84 wildcardFileName: '*'
85 enablePartitionDiscovery: false
86 }
87 formatSettings: {
88 type: 'JsonReadSettings'
89 }
90 }
91 sink: {
92 type: 'ParquetSink'
93 storeSettings: {
94 type: 'AzureBlobStorageWriteSettings'
95 copyBehavior: 'FlattenHierarchy'
96 }
97 formatSettings: {
98 type: 'ParquetWriteSettings'
99 }
100 }
101 enableStaging: false
102 validateDataConsistency: false
103 translator: {
104 type: 'TabularTranslator'
105 mappings: [ // Define mappings here
106 {
107 source: {
108 path: '$[\'time\']'
109 }
110 sink: {
111 name: 'time'
112 type: 'DateTime'
113 }
114 }
115 // ...
116 ]
117 }
118 }
119 }
120 ]
121 policy: {
122 elapsedTimeMetric: {}
123 }
124 }
125}
126
127resource inputBlob 'Microsoft.DataFactory/factories/linkedservices@2018-06-01' = {
128 parent: factory
129 name: 'AzureBlobStorageInput'
130 properties: {
131 annotations: []
132 type: 'AzureBlobStorage'
133 typeProperties: {
134 serviceEndpoint: 'https://${inputStorageAccountName}.blob.${environment().suffixes.storage}/'
135 accountKind: 'StorageV2'
136 }
137 // For remaining properties, see LinkedService objects
138 }
139}
140
141resource outputBlob 'Microsoft.DataFactory/factories/linkedservices@2018-06-01' = {
142 parent: factory
143 name: 'AzureBlobStorageOutput'
144 properties: {
145 annotations: []
146 type: 'AzureBlobStorage'
147 typeProperties: {
148 serviceEndpoint: 'https://${outputStorageAccountName}.blob.${environment().suffixes.storage}/'
149 accountKind: 'StorageV2'
150 }
151 // For remaining properties, see LinkedService objects
152 }
153}
154
155resource outputDataSet 'Microsoft.DataFactory/factories/datasets@2018-06-01' = {
156 parent: factory
157 name: 'DestinationDataset'
158 dependsOn: [
159 outputBlob
160 ]
161 properties: {
162 annotations: []
163 description: 'string'
164 folder: {
165 name: 'string'
166 }
167 linkedServiceName: {
168 referenceName: 'AzureBlobStorageOutput'
169 type: 'LinkedServiceReference'
170 }
171 typeProperties: {
172 location: {
173 type: 'AzureBlobStorageLocation'
174 folderPath: {
175 value: '@{concat(\'y=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'yyyy\'))}/@{concat(\'m=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'MM\'))}/@{concat(\'d=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'dd\'))}'
176 type: 'Expression'
177 }
178 container: 'container-name'
179 }
180 compressionCodec: 'snappy'
181 }
182 type: 'Parquet'
183 // For remaining properties, see Dataset objects
184 }
185}
186
187resource inputDataSet 'Microsoft.DataFactory/factories/datasets@2018-06-01' = {
188 parent: factory
189 name: 'SourceDataset'
190 dependsOn: [
191 inputBlob
192 ]
193 properties: {
194 annotations: []
195 description: 'string'
196 folder: {
197 name: 'string'
198 }
199 linkedServiceName: {
200 referenceName: 'AzureBlobStorageInput'
201 type: 'LinkedServiceReference'
202 }
203 typeProperties: {
204 location: {
205 type: 'AzureBlobStorageLocation'
206 folderPath: 'your-path'
207 container: 'container-name'
208 }
209 }
210 type: 'Json'
211 schema: {
212 type: 'object'
213 properties: {
214 time: {
215 type: 'string'
216 }
217 ...
218 }
219 }
220 // For remaining properties, see Dataset objects
221 }
222}
223
224resource DailyTrigger 'Microsoft.DataFactory/factories/triggers@2018-06-01' = {
225 parent: factory
226 name: 'DailyTrigger'
227 dependsOn: [
228 pipelines
229 ]
230 properties: {
231 annotations: []
232 // runtimeState: 'Started'
233 pipelines: [
234 {
235 pipelineReference: {
236 referenceName: 'your-pipeline-name'
237 type: 'PipelineReference'
238 }
239 parameters: {}
240 }
241 ]
242 type: 'ScheduleTrigger'
243 typeProperties: {
244 recurrence: {
245 frequency: 'Day'
246 interval: 1
247 startTime: triggerStartTime
248 timeZone: 'UTC'
249 schedule: {
250 minutes: [
251 0
252 ]
253 hours: [
254 0
255 ]
256 }
257 }
258 }
259 // For remaining properties, see Trigger objects
260 }
261}
262
263output managedIdentityPrincipalId string = factory.identity.principalId

ここで出てきているリソースは以下のとおりです。

リソース説明
Microsoft.DataFactory/factoriesData Factoryの本体
Microsoft.DataFactory/factories/pipelinesパイプラインの定義。パイプラインでデータのコピー等を行います。
Microsoft.DataFactory/factories/linkedservicesこの linked service に入力と出力のストレージを紐付ける
Microsoft.DataFactory/factories/datasets入力と出力のデータセット
Microsoft.DataFactory/factories/triggersトリガーの設定。自動トリガーしないなら不要。

Synapseの作成

次に、Synapse Analyticsをデプロイします。 既定で組み込みのサーバーレス SQL プールがあるため、そこにデータベースを作成します。

データベースの作成

Built-inのサーバーレスSQLプールに対して、データベースを作成します。

sql
1CREATE DATABASE [myDB];
2GO
3
4USE [myDB];
5GO

外部データソースの追加

外部データソースを作成し、データレイクと接続しておきます。 このとき WorkspaceIdentity を作成し、この外部データソースへのアクセスを WorkspaceIdentity の権限で行うようにしておきます。

sql
1CREATE MASTER KEY
2ENCRYPTION BY PASSWORD = 'StrongPassword_ChangeThis!';
3
4CREATE DATABASE SCOPED CREDENTIAL WorkspaceIdentity
5WITH IDENTITY = 'Managed Identity';
6GO
7
8CREATE EXTERNAL DATA SOURCE MyDataLake
9WITH (
10 LOCATION = 'https://<your storage name>.blob.core.windows.net/<container name>',
11 CREDENTIAL = WorkspaceIdentity
12);
13GO

外部テーブルの作成

外部テーブルを作成し、先ほど作った外部データソースをDATA_SOURCEに指定します。 LOCATIONにはblobのパスを指定します。ワイルドカードを利用可能です。

sql
1-- スキーマの作成
2CREATE SCHEMA ext AUTHORIZATION dbo;
3GO
4
5CREATE SCHEMA func AUTHORIZATION dbo;
6GO
7
8-- ファイルフォーマットの作成
9CREATE EXTERNAL FILE FORMAT [ParquetFormat]
10WITH (FORMAT_TYPE = PARQUET);
11GO
12
13-- 外部テーブルの作成
14CREATE EXTERNAL TABLE ext.myTable
15(
16 -- プロパティ定義
17 time datetime2 NULL,
18 propertyA nvarchar(512) NULL,
19 propertyB nvarchar(512) NULL,
20 ...
21)
22WITH
23(
24 LOCATION = 'y=*/**',
25 DATA_SOURCE = [MyDataLake],
26 FILE_FORMAT = [ParquetFormat]
27);
28GO

Viewの作成

ビューを作成し、Functions からアクセスするプロパティなどを絞ります。

sql
1CREATE VIEW func.viewOfMyTable
2AS
3SELECT
4 [time],
5 [propertyA],
6 [propertyB],
7 ...
8FROM ext.myTable
9WHERE <何らかの条件があればここに書く>;
10GO

Service Principalへの権限付与

これは CI/CD でパイプラインからデータベースへの権限追加をするための設定です。 そういうことをする予定が無ければスキップで構いません。

パイプラインで使っている SPN の名前を使って、外部ユーザーを作成し、権限を付与します。

sql
1CREATE USER [pipeline-sp] FROM EXTERNAL PROVIDER;
2GO
3
4ALTER ROLE db_accessadmin ADD MEMBER [pipeline-sp];
5GO
6
7ALTER ROLE db_owner ADD MEMBER [pipeline-sp];
8GO

Synapse WorkspaceのManaged IdentityにEntra IDのディレクトリ閲覧者権限を与える

Functions に動的に権限を与える場合、Function 名を指定すると、Workspace 側でその Function 名に該当する Managed Identity の objectId を自動で参照し、権限を付与してくれます。 これを行うためにはWorkspace自身がディレクトリ閲覧者権限を持っている必要があるようです。

Entra ID > ロールと管理者 > ディレクトリ閲覧者 > 割り当てを追加で Workspace の Identity に権限を追加してください。 Workspace の Managed Identity の objectId は Synapse Studio の管理メニューのCredentialsで確認できます。

パイプライン上でのFunctionsへの権限付与

新しい Function をデプロイするときに、その Function に対してデータベースへのアクセス権を付与します。 Azure Pipelines 上で実行するために、私は以下の手順を行いました。

  1. SQL のスクリプトを用意
  2. スクリプト内の Function 名をパイプライン実行時に動的に書き換え
  3. SqlAzureDacpacDeployment@1を使って SQL を実行

順に説明していきます。

SQL を用意

以下のような SQL を用意しておきます。FunctionAppMiNameの部分が後で対象の Function 名に変わります。

sql
1USE [myDB];
2
3IF NOT EXISTS (SELECT 1 FROM sys.database_principals WHERE name = N'FunctionAppMiName')
4BEGIN
5 PRINT 'Creating user [FunctionAppMiName] FROM EXTERNAL PROVIDER...';
6 CREATE USER [FunctionAppMiName] FROM EXTERNAL PROVIDER;
7END
8ELSE
9BEGIN
10 PRINT 'User [FunctionAppMiName] already exists.';
11END;
12
13IF NOT EXISTS (
14 SELECT 1
15 FROM sys.database_role_members rm
16 JOIN sys.database_principals r ON rm.role_principal_id = r.principal_id
17 JOIN sys.database_principals m ON rm.member_principal_id = m.principal_id
18 WHERE r.name = N'db_datareader'
19 AND m.name = N'FunctionAppMiName'
20)
21BEGIN
22 PRINT 'Adding [FunctionAppMiName] to db_datareader...';
23 ALTER ROLE db_datareader ADD MEMBER [FunctionAppMiName];
24 GRANT REFERENCES ON DATABASE SCOPED CREDENTIAL::WorkspaceIdentity TO [FunctionAppMiName];
25 GRANT SELECT ON OBJECT::func.viewOfMyTable TO [FunctionAppMiName];
26 DENY SELECT ON SCHEMA::ext TO [FunctionAppMiName];
27END
28ELSE
29BEGIN
30 PRINT 'Already in db_datareader.';
31END;

権限について軽く説明しておきます。

db_datareaderはデータを参照するのに必要な権限です。 WorkspaceIdentityへの参照権限は、外部データソースを作ったときに用意した WorkspaceIdentity への参照権限がないとデータソースにアクセスできなかったので追加しました。 あとは、ビューへのアクセス権を追加し、外部テーブルへの直接のクエリを禁止したいので、スキーマへのアクセスを DENY しています。

Function 名の挿入

私は以下のようにPowerShell@2のタスクを使って SQL ファイルを書き換えましたが、ファイル内の Function 名を置き換えることができれば、任意の方法で構いません。

bash
1- task: PowerShell@2
2 displayName: 'Generate grant-function-mi.sql from template'
3 inputs:
4 targetType: 'inline'
5 pwsh: true
6 script: |
7 $templatePath = "$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql.template"
8 $outputPath = "$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql"
9
10 if (-not (Test-Path $templatePath)) {
11 throw "Template not found: $templatePath"
12 }
13
14 # テンプレート読み込み
15 $content = Get-Content $templatePath -Raw
16
17 # 置換(${DB_NAME}, ${MI_NAME} をパイプライン変数で埋める)
18 $content = $content -replace 'FunctionAppMiName', '$(deploy_resources.functionName)'
19
20 # 出力先ディレクトリが無ければ作成
21 $dir = Split-Path $outputPath
22 if (-not (Test-Path $dir)) {
23 New-Item -Path $dir -ItemType Directory -Force | Out-Null
24 }
25
26 # SQL を出力
27 $content | Set-Content $outputPath -Encoding UTF8
28
29 Write-Host "Generated SQL file:"
30 Get-Content $outputPath

SQL 実行

先ほど書き換えた SQL を実行します。 以下のようなタスクで実行します。

bash
1- task: SqlAzureDacpacDeployment@1
2 displayName: 'Grant db_datareader to Function MI (Synapse serverless)'
3 inputs:
4 # Azure への接続(サービス接続名)
5 azureSubscription: '${{parameters.azureServiceConnection}}'
6
7 # Synapse serverless SQL endpoint
8 ServerName: '${{parameters.synapseWorkspaceName}}.sql.azuresynapse.net'
9 DatabaseName: 'myDB'
10
11 # 認証(サービス接続のSPNで行う場合)
12 SqlUserName: '' # 空でOK
13 SqlPassword: '' # 空でOK
14 AuthenticationType: 'servicePrincipal'
15
16 # 実行モード: DACPAC ではなく 「SQL スクリプト」
17 deployType: 'SqlTask'
18 SqlFile: '$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql' # ↑ 上で作った .sql ファイルへのパス
19
20 # オプション(必要に応じて)
21 IpDetectionMethod: 'AutoDetect'

私の場合は、SqlAzureDacpacDeployment@1を使いましたが、この方法はパラメータとして Function 名を渡すことができません。したがって、事前に SQL を書き換えて Function 名を追加した上でタスクを呼び出す手順となっています。

ポイントは SQL ファイルに書いたように Function に対して権限を与えることなので、必ずしもこの手順である必要はありません。

Functions からのクエリ

Functions からはビューに対して、以下のようなクエリを送信します。 .NETではMicrosoft.Data.SqlClientのパッケージを使っています。

csharp
1var queryString = @"
2SELECT TOP (@maxItemCount)
3 [time],
4 [requestUri]
5 -- ...
6FROM func.viewOfMyTable
7WHERE
8 [time] >= @startDate
9 AND [time] <= @endDate
10ORDER BY
11 [time] DESC;
12";

まとめ

Functions からクエリするためには、Function に対して権限を付与する必要があります。 また、権限追加を CI/CD パイプラインから行おうとすると SPN に対する権限設定も必要となり、このあたりが少しややこしいですね。 Managed Identity の権限の話とデータベースのアクセス権の話があるので、混同しないように気をつけましょう。

また、今回は外部データソースやビューを作成しました。Functions から直接OPENROWSETしてデータにアクセスする方法もあるとは思いますが、Function により大きな権限を与える必要があることなどから選択していません。

シェア:

関連記事