Synapse AnalyticsをAzure Functionsからクエリする方法

Azure Synapse Analyticsを使って、Azure Functionsからクエリを呼び出す方法をまとめます。
手順
Data Factoryによるデータの変換とコピー
まず分析対象のデータを用意します。今回はストレージに貯めたログを分析対象とします。 分析データを分析しやすい形に加工したうえで Azure Data Lake Storage Gen2 に置きます。
ここではData Factoryというサービスを使います。 Data Factoryを 使ってデータのコピーを行います。コピー時にデータの形式を選べるので Parquet 形式にします。
以下は Bicep のサンプルになります。 このサンプルでは、パスの設定でワイルドカードや式を使っていますが、そのあたりは要件に合わせて適宜変更してください。Data Factory Studio 上で UI から作成した方がわかりやすいと思います。Studio で作成したパイプライン等を Bicep ファイルとしてエクスポートもできます。
1param factoryName string2param tags object3@secure()4param subscriptionId string5param sourceResourceGroupName string6param inputStorageAccountName string7param outputStorageAccountName string8 9param triggerStartTime string = utcNow('yyyy-MM-ddTHH:mm:ssZ')10 11resource factory 'Microsoft.DataFactory/factories@2018-06-01' = {12 identity: {13 type: 'SystemAssigned'14 }15 location: resourceGroup().location16 name: factoryName17 properties: {18 publicNetworkAccess: 'Enabled'19 }20 tags: tags21}22 23resource pipelines 'Microsoft.DataFactory/factories/pipelines@2018-06-01' = {24 parent: factory25 name: 'clicklog-pipeline'26 dependsOn: [27 inputDataSet28 outputDataSet29 ]30 properties: {31 activities: [32 {33 dependsOn: [34 {35 activity: 'LookupBlob'36 dependencyConditions: [37 'Succeeded'38 ]39 }40 ]41 policy: {42 timeout: '0.12:00:00'43 retry: 044 retryIntervalInSeconds: 3045 secureOutput: false46 secureInput: false47 }48 name: 'CopyAccessLog'49 userProperties: [50 {51 name: 'Source'52 value: 'container-name/<your path>/'53 }54 {55 name: 'Destination'56 value: 'destination-container-name//'57 }58 ]59 type: 'Copy'60 inputs: [61 {62 referenceName: 'SourceDataset'63 type: 'DatasetReference'64 parameters: {}65 }66 ]67 outputs: [68 {69 referenceName: 'DestinationDataset'70 type: 'DatasetReference'71 parameters: {}72 }73 ]74 typeProperties: {75 source: {76 type: 'JsonSource'77 storeSettings: {78 type: 'AzureBlobStorageReadSettings'79 recursive: true80 wildcardFolderPath: {81 value: 'your-path/@{concat(\'y=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'yyyy\'))}/@{concat(\'m=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'MM\'))}/@{concat(\'d=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'dd\'))}'82 type: 'Expression'83 }84 wildcardFileName: '*'85 enablePartitionDiscovery: false86 }87 formatSettings: {88 type: 'JsonReadSettings'89 }90 }91 sink: {92 type: 'ParquetSink'93 storeSettings: {94 type: 'AzureBlobStorageWriteSettings'95 copyBehavior: 'FlattenHierarchy'96 }97 formatSettings: {98 type: 'ParquetWriteSettings'99 }100 }101 enableStaging: false102 validateDataConsistency: false103 translator: {104 type: 'TabularTranslator'105 mappings: [ // Define mappings here106 {107 source: {108 path: '$[\'time\']'109 }110 sink: {111 name: 'time'112 type: 'DateTime'113 }114 }115 // ...116 ]117 }118 }119 }120 ]121 policy: {122 elapsedTimeMetric: {}123 }124 }125}126 127resource inputBlob 'Microsoft.DataFactory/factories/linkedservices@2018-06-01' = {128 parent: factory129 name: 'AzureBlobStorageInput'130 properties: {131 annotations: []132 type: 'AzureBlobStorage'133 typeProperties: {134 serviceEndpoint: 'https://${inputStorageAccountName}.blob.${environment().suffixes.storage}/'135 accountKind: 'StorageV2'136 }137 // For remaining properties, see LinkedService objects138 }139}140 141resource outputBlob 'Microsoft.DataFactory/factories/linkedservices@2018-06-01' = {142 parent: factory143 name: 'AzureBlobStorageOutput'144 properties: {145 annotations: []146 type: 'AzureBlobStorage'147 typeProperties: {148 serviceEndpoint: 'https://${outputStorageAccountName}.blob.${environment().suffixes.storage}/'149 accountKind: 'StorageV2'150 }151 // For remaining properties, see LinkedService objects152 }153}154 155resource outputDataSet 'Microsoft.DataFactory/factories/datasets@2018-06-01' = {156 parent: factory157 name: 'DestinationDataset'158 dependsOn: [159 outputBlob160 ]161 properties: {162 annotations: []163 description: 'string'164 folder: {165 name: 'string'166 }167 linkedServiceName: {168 referenceName: 'AzureBlobStorageOutput'169 type: 'LinkedServiceReference'170 }171 typeProperties: {172 location: {173 type: 'AzureBlobStorageLocation'174 folderPath: {175 value: '@{concat(\'y=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'yyyy\'))}/@{concat(\'m=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'MM\'))}/@{concat(\'d=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'dd\'))}'176 type: 'Expression'177 }178 container: 'container-name'179 }180 compressionCodec: 'snappy'181 }182 type: 'Parquet'183 // For remaining properties, see Dataset objects184 }185}186 187resource inputDataSet 'Microsoft.DataFactory/factories/datasets@2018-06-01' = {188 parent: factory189 name: 'SourceDataset'190 dependsOn: [191 inputBlob192 ]193 properties: {194 annotations: []195 description: 'string'196 folder: {197 name: 'string'198 }199 linkedServiceName: {200 referenceName: 'AzureBlobStorageInput'201 type: 'LinkedServiceReference'202 }203 typeProperties: {204 location: {205 type: 'AzureBlobStorageLocation'206 folderPath: 'your-path'207 container: 'container-name'208 }209 }210 type: 'Json'211 schema: {212 type: 'object'213 properties: {214 time: {215 type: 'string'216 }217 ...218 }219 }220 // For remaining properties, see Dataset objects221 }222}223 224resource DailyTrigger 'Microsoft.DataFactory/factories/triggers@2018-06-01' = {225 parent: factory226 name: 'DailyTrigger'227 dependsOn: [228 pipelines229 ]230 properties: {231 annotations: []232 // runtimeState: 'Started'233 pipelines: [234 {235 pipelineReference: {236 referenceName: 'your-pipeline-name'237 type: 'PipelineReference'238 }239 parameters: {}240 }241 ]242 type: 'ScheduleTrigger'243 typeProperties: {244 recurrence: {245 frequency: 'Day'246 interval: 1247 startTime: triggerStartTime248 timeZone: 'UTC'249 schedule: {250 minutes: [251 0252 ]253 hours: [254 0255 ]256 }257 }258 }259 // For remaining properties, see Trigger objects260 }261}262 263output managedIdentityPrincipalId string = factory.identity.principalIdここで出てきているリソースは以下のとおりです。
| リソース | 説明 |
|---|---|
| Microsoft.DataFactory/factories | Data Factoryの本体 |
| Microsoft.DataFactory/factories/pipelines | パイプラインの定義。パイプラインでデータのコピー等を行います。 |
| Microsoft.DataFactory/factories/linkedservices | この linked service に入力と出力のストレージを紐付ける |
| Microsoft.DataFactory/factories/datasets | 入力と出力のデータセット |
| Microsoft.DataFactory/factories/triggers | トリガーの 設定。自動トリガーしないなら不要。 |
Synapseの作成
次に、Synapse Analyticsをデプロイします。 既定で組み込みのサーバーレス SQL プールがあるため、そこにデータベースを作成します。
データベースの作成
Built-inのサーバーレスSQLプールに対して、データベースを作成します。
1CREATE DATABASE [myDB];2GO3 4USE [myDB];5GO外部データソースの追加
外部データソースを作成し、データレイクと接続しておきます。 このとき WorkspaceIdentity を作成し、この外部データソースへのアクセスを WorkspaceIdentity の権限で行うようにしておきます。
1CREATE MASTER KEY2ENCRYPTION BY PASSWORD = 'StrongPassword_ChangeThis!';3 4CREATE DATABASE SCOPED CREDENTIAL WorkspaceIdentity5WITH IDENTITY = 'Managed Identity';6GO7 8CREATE EXTERNAL DATA SOURCE MyDataLake9WITH (10 LOCATION = 'https://<your storage name>.blob.core.windows.net/<container name>',11 CREDENTIAL = WorkspaceIdentity12);13GO外部テーブルの作成
外部テーブルを作成し、先ほど作った外部データソースをDATA_SOURCEに指定します。 LOCATIONにはblobのパスを指定します。ワイルドカードを利用可能です。
1-- スキーマの作成2CREATE SCHEMA ext AUTHORIZATION dbo;3GO4 5CREATE SCHEMA func AUTHORIZATION dbo;6GO7 8-- ファイルフォーマットの作成9CREATE EXTERNAL FILE FORMAT [ParquetFormat]10WITH (FORMAT_TYPE = PARQUET);11GO12 13-- 外部テーブルの作成14CREATE EXTERNAL TABLE ext.myTable15(16 -- プロパティ定義17 time datetime2 NULL,18 propertyA nvarchar(512) NULL,19 propertyB nvarchar(512) NULL,20 ...21)22WITH23(24 LOCATION = 'y=*/**',25 DATA_SOURCE = [MyDataLake],26 FILE_FORMAT = [ParquetFormat]27);28GOViewの作成
ビューを作成し、Functions からアクセスするプロパティなどを絞ります。
1CREATE VIEW func.viewOfMyTable2AS3SELECT4 [time],5 [propertyA],6 [propertyB],7 ...8FROM ext.myTable9WHERE <何らかの条件があればここに書く>;10GOService Principalへの権限付与
これは CI/CD でパイプラインからデータベースへの権限追加をするための設定です。 そういうことをする予定が無ければスキップで構いません。
パイプラインで使っている SPN の名前を使って、外部ユーザーを作成し、権限を付与します。
1CREATE USER [pipeline-sp] FROM EXTERNAL PROVIDER;2GO3 4ALTER ROLE db_accessadmin ADD MEMBER [pipeline-sp];5GO6 7ALTER ROLE db_owner ADD MEMBER [pipeline-sp];8GOSynapse WorkspaceのManaged IdentityにEntra IDのディレクトリ閲覧者権限を与える
Functions に動的に権限を与える場合、Function 名を指定すると、Workspace 側でその Function 名に該当する Managed Identity の objectId を自動で参照し、権限を付与してくれます。 これを行うためにはWorkspace自身がディレクトリ閲覧者権限を持っている必要があるようです。
Entra ID > ロールと管理者 > ディレクトリ閲覧者 > 割り当てを追加で Workspace の Identity に権限を追加してください。
Workspace の Managed Identity の objectId は Synapse Studio の管理メニューのCredentialsで確認できます。
パイプライン上でのFunctionsへの権限付与
新しい Function をデプロイするときに、その Function に対してデータベースへのアクセス権を付与します。 Azure Pipelines 上で実行するために、私は以下の手順を行いました。
- SQL のスクリプトを用意
- スクリプト内の Function 名をパイプライン実行時に動的に書き換え
SqlAzureDacpacDeployment@1を使って SQL を実行
順に説明していきます。
SQL を用意
以下のような SQL を用意しておきます。FunctionAppMiNameの部分が後で対象の Function 名に変わります。
1USE [myDB];2 3IF NOT EXISTS (SELECT 1 FROM sys.database_principals WHERE name = N'FunctionAppMiName')4BEGIN5 PRINT 'Creating user [FunctionAppMiName] FROM EXTERNAL PROVIDER...';6 CREATE USER [FunctionAppMiName] FROM EXTERNAL PROVIDER;7END8ELSE9BEGIN10 PRINT 'User [FunctionAppMiName] already exists.';11END;12 13IF NOT EXISTS (14 SELECT 115 FROM sys.database_role_members rm16 JOIN sys.database_principals r ON rm.role_principal_id = r.principal_id17 JOIN sys.database_principals m ON rm.member_principal_id = m.principal_id18 WHERE r.name = N'db_datareader'19 AND m.name = N'FunctionAppMiName'20)21BEGIN22 PRINT 'Adding [FunctionAppMiName] to db_datareader...';23 ALTER ROLE db_datareader ADD MEMBER [FunctionAppMiName];24 GRANT REFERENCES ON DATABASE SCOPED CREDENTIAL::WorkspaceIdentity TO [FunctionAppMiName];25 GRANT SELECT ON OBJECT::func.viewOfMyTable TO [FunctionAppMiName];26 DENY SELECT ON SCHEMA::ext TO [FunctionAppMiName];27END28ELSE29BEGIN30 PRINT 'Already in db_datareader.';31END;権限について軽く説明しておきます。
db_datareaderはデータを参照するのに必要な権限です。
WorkspaceIdentityへの参照権限は、外部データソースを作ったときに用意した WorkspaceIdentity への参照権限がないとデータソースにアクセスできなかったので追加しました。
あとは、ビューへのアクセス権を追加し、外部テーブルへの直接のクエリを禁止したいので、スキーマへのアクセスを DENY しています。
Function 名の挿入
私は以下のようにPowerShell@2のタスクを使って SQL ファイルを書き換えましたが、ファイル内の Function 名を置き換えることができれば、任意の方法で構いません。
1- task: PowerShell@22 displayName: 'Generate grant-function-mi.sql from template'3 inputs:4 targetType: 'inline'5 pwsh: true6 script: |7 $templatePath = "$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql.template"8 $outputPath = "$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql"9 10 if (-not (Test-Path $templatePath)) {11 throw "Template not found: $templatePath"12 }13 14 # テンプレート読み込み15 $content = Get-Content $templatePath -Raw16 17 # 置換(${DB_NAME}, ${MI_NAME} をパイプライン変数で埋める)18 $content = $content -replace 'FunctionAppMiName', '$(deploy_resources.functionName)'19 20 # 出力先ディレクトリが無ければ作成21 $dir = Split-Path $outputPath22 if (-not (Test-Path $dir)) {23 New-Item -Path $dir -ItemType Directory -Force | Out-Null24 }25 26 # SQL を出力27 $content | Set-Content $outputPath -Encoding UTF828 29 Write-Host "Generated SQL file:"30 Get-Content $outputPathSQL 実行
先ほど書き換えた SQL を実行します。 以下のようなタスクで実行します。
1- task: SqlAzureDacpacDeployment@12 displayName: 'Grant db_datareader to Function MI (Synapse serverless)'3 inputs:4 # Azure への接続(サービス接続名)5 azureSubscription: '${{parameters.azureServiceConnection}}'6 7 # Synapse serverless SQL endpoint8 ServerName: '${{parameters.synapseWorkspaceName}}.sql.azuresynapse.net'9 DatabaseName: 'myDB'10 11 # 認証(サービス接続のSPNで行う場合)12 SqlUserName: '' # 空でOK13 SqlPassword: '' # 空でOK14 AuthenticationType: 'servicePrincipal'15 16 # 実行モード: DACPAC ではなく 「SQL スクリプト」17 deployType: 'SqlTask'18 SqlFile: '$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql' # ↑ 上で作った .sql ファイルへのパス19 20 # オプション(必要に応じて)21 IpDetectionMethod: 'AutoDetect'私の場合は、SqlAzureDacpacDeployment@1を使いましたが、この方法はパラメータとして Function 名を渡すことができません。したがって、事前に SQL を書き換えて Function 名を追加した上でタスクを呼び出す手順となっています。
ポイントは SQL ファイルに書いたように Function に対して権限を与えることなので、必ずしもこの手順である必要はありません。
Functions からのクエリ
Functions からはビューに対して、以下のようなクエリを送信します。
.NETではMicrosoft.Data.SqlClientのパッケージを使っています。
1var queryString = @"2SELECT TOP (@maxItemCount)3 [time],4 [requestUri]5 -- ...6FROM func.viewOfMyTable7WHERE8 [time] >= @startDate9 AND [time] <= @endDate10ORDER BY11 [time] DESC;12";まとめ
Functions からクエリするためには、Function に対して権限を付与する必要があります。 また、権限追加を CI/CD パイプラインから行おうとすると SPN に対する権限設定も必要となり、このあたりが少しややこしいですね。 Managed Identity の権限の話とデータベースのアクセス権の話があるので、混同しないように気をつけましょう。
また、今回は外部データソースやビューを作成しました。Functions から直接OPENROWSETしてデータにアクセスす る方法もあるとは思いますが、Function により大きな権限を与える必要があることなどから選択していません。