如何从 Azure Functions 查询 Synapse Analytics

本文整理了如何使用 Azure Synapse Analytics,从 Azure Functions 发起查询。
步骤
使用 Data Factory 转换并复制数据
首先准备要分析的数据。这里以存储账户中累积的日志为分析对象。 将分析数据加工成更适合分析的形态后,放到 Azure Data Lake Storage Gen2。
这里使用名为 Data Factory 的服务。 用 Data Factory 执行数据复制。复制时可以选择数据格式,因此这里将其转换为 Parquet 格式。
下面是一个 Bicep 示例。 该示例在路径设置中使用了通配符和表达式,但请根据需求自行调整。通常在 Data Factory Studio 里通过 UI 创建会更容易理解。你也可以将 Studio 中创建的管道等导出为 Bicep 文件。
1param factoryName string2param tags object3@secure()4param subscriptionId string5param sourceResourceGroupName string6param inputStorageAccountName string7param outputStorageAccountName string8 9param triggerStartTime string = utcNow('yyyy-MM-ddTHH:mm:ssZ')10 11resource factory 'Microsoft.DataFactory/factories@2018-06-01' = {12 identity: {13 type: 'SystemAssigned'14 }15 location: resourceGroup().location16 name: factoryName17 properties: {18 publicNetworkAccess: 'Enabled'19 }20 tags: tags21}22 23resource pipelines 'Microsoft.DataFactory/factories/pipelines@2018-06-01' = {24 parent: factory25 name: 'clicklog-pipeline'26 dependsOn: [27 inputDataSet28 outputDataSet29 ]30 properties: {31 activities: [32 {33 dependsOn: [34 {35 activity: 'LookupBlob'36 dependencyConditions: [37 'Succeeded'38 ]39 }40 ]41 policy: {42 timeout: '0.12:00:00'43 retry: 044 retryIntervalInSeconds: 3045 secureOutput: false46 secureInput: false47 }48 name: 'CopyAccessLog'49 userProperties: [50 {51 name: 'Source'52 value: 'container-name/<your path>/'53 }54 {55 name: 'Destination'56 value: 'destination-container-name//'57 }58 ]59 type: 'Copy'60 inputs: [61 {62 referenceName: 'SourceDataset'63 type: 'DatasetReference'64 parameters: {}65 }66 ]67 outputs: [68 {69 referenceName: 'DestinationDataset'70 type: 'DatasetReference'71 parameters: {}72 }73 ]74 typeProperties: {75 source: {76 type: 'JsonSource'77 storeSettings: {78 type: 'AzureBlobStorageReadSettings'79 recursive: true80 wildcardFolderPath: {81 value: 'your-path/@{concat(\'y=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'yyyy\'))}/@{concat(\'m=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'MM\'))}/@{concat(\'d=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'dd\'))}'82 type: 'Expression'83 }84 wildcardFileName: '*'85 enablePartitionDiscovery: false86 }87 formatSettings: {88 type: 'JsonReadSettings'89 }90 }91 sink: {92 type: 'ParquetSink'93 storeSettings: {94 type: 'AzureBlobStorageWriteSettings'95 copyBehavior: 'FlattenHierarchy'96 }97 formatSettings: {98 type: 'ParquetWriteSettings'99 }100 }101 enableStaging: false102 validateDataConsistency: false103 translator: {104 type: 'TabularTranslator'105 mappings: [ // Define mappings here106 {107 source: {108 path: '$[\'time\']'109 }110 sink: {111 name: 'time'112 type: 'DateTime'113 }114 }115 // ...116 ]117 }118 }119 }120 ]121 policy: {122 elapsedTimeMetric: {}123 }124 }125}126 127resource inputBlob 'Microsoft.DataFactory/factories/linkedservices@2018-06-01' = {128 parent: factory129 name: 'AzureBlobStorageInput'130 properties: {131 annotations: []132 type: 'AzureBlobStorage'133 typeProperties: {134 serviceEndpoint: 'https://${inputStorageAccountName}.blob.${environment().suffixes.storage}/'135 accountKind: 'StorageV2'136 }137 // For remaining properties, see LinkedService objects138 }139}140 141resource outputBlob 'Microsoft.DataFactory/factories/linkedservices@2018-06-01' = {142 parent: factory143 name: 'AzureBlobStorageOutput'144 properties: {145 annotations: []146 type: 'AzureBlobStorage'147 typeProperties: {148 serviceEndpoint: 'https://${outputStorageAccountName}.blob.${environment().suffixes.storage}/'149 accountKind: 'StorageV2'150 }151 // For remaining properties, see LinkedService objects152 }153}154 155resource outputDataSet 'Microsoft.DataFactory/factories/datasets@2018-06-01' = {156 parent: factory157 name: 'DestinationDataset'158 dependsOn: [159 outputBlob160 ]161 properties: {162 annotations: []163 description: 'string'164 folder: {165 name: 'string'166 }167 linkedServiceName: {168 referenceName: 'AzureBlobStorageOutput'169 type: 'LinkedServiceReference'170 }171 typeProperties: {172 location: {173 type: 'AzureBlobStorageLocation'174 folderPath: {175 value: '@{concat(\'y=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'yyyy\'))}/@{concat(\'m=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'MM\'))}/@{concat(\'d=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'dd\'))}'176 type: 'Expression'177 }178 container: 'container-name'179 }180 compressionCodec: 'snappy'181 }182 type: 'Parquet'183 // For remaining properties, see Dataset objects184 }185}186 187resource inputDataSet 'Microsoft.DataFactory/factories/datasets@2018-06-01' = {188 parent: factory189 name: 'SourceDataset'190 dependsOn: [191 inputBlob192 ]193 properties: {194 annotations: []195 description: 'string'196 folder: {197 name: 'string'198 }199 linkedServiceName: {200 referenceName: 'AzureBlobStorageInput'201 type: 'LinkedServiceReference'202 }203 typeProperties: {204 location: {205 type: 'AzureBlobStorageLocation'206 folderPath: 'your-path'207 container: 'container-name'208 }209 }210 type: 'Json'211 schema: {212 type: 'object'213 properties: {214 time: {215 type: 'string'216 }217 ...218 }219 }220 // For remaining properties, see Dataset objects221 }222}223 224resource DailyTrigger 'Microsoft.DataFactory/factories/triggers@2018-06-01' = {225 parent: factory226 name: 'DailyTrigger'227 dependsOn: [228 pipelines229 ]230 properties: {231 annotations: []232 // runtimeState: 'Started'233 pipelines: [234 {235 pipelineReference: {236 referenceName: 'your-pipeline-name'237 type: 'PipelineReference'238 }239 parameters: {}240 }241 ]242 type: 'ScheduleTrigger'243 typeProperties: {244 recurrence: {245 frequency: 'Day'246 interval: 1247 startTime: triggerStartTime248 timeZone: 'UTC'249 schedule: {250 minutes: [251 0252 ]253 hours: [254 0255 ]256 }257 }258 }259 // For remaining properties, see Trigger objects260 }261}262 263output managedIdentityPrincipalId string = factory.identity.principalId这里出现的资源如下。
| 资源 | 说明 |
|---|---|
| Microsoft.DataFactory/factories | Data Factory 本体 |
| Microsoft.DataFactory/factories/pipelines | 管道定义。用于执行数据复制等。 |
| Microsoft.DataFactory/factories/linkedservices | 将输入与输出存储关联到该 linked service |
| Microsoft.DataFactory/factories/datasets | 输入与输出的数据集 |
| Microsoft.DataFactory/factories/triggers | 触发器设置。不需要自动触发时可省略。 |
创建 Synapse
接下来部署 Synapse Analytics。 由于默认提供内置的无服务器 SQL 池,因此我们在其中创建数据库。
创建数据库
在内置的无服务器 SQL 池中创建数据库。
1CREATE DATABASE [myDB];2GO3 4USE [myDB];5GO添加外部数据源
创建外部数据源,并连接到数据湖。
此时创建 WorkspaceIdentity,并让该外部数据源的访问使用 WorkspaceIdentity 的权限。
1CREATE MASTER KEY2ENCRYPTION BY PASSWORD = 'StrongPassword_ChangeThis!';3 4CREATE DATABASE SCOPED CREDENTIAL WorkspaceIdentity5WITH IDENTITY = 'Managed Identity';6GO7 8CREATE EXTERNAL DATA SOURCE MyDataLake9WITH (10 LOCATION = 'https://<your storage name>.blob.core.windows.net/<container name>',11 CREDENTIAL = WorkspaceIdentity12);13GO创建外部表
创建外部表,并将 DATA_SOURCE 指定为之前创建的外部数据源。
LOCATION 里指定 blob 的路径,可以使用通配符。
1-- 创建架构2CREATE SCHEMA ext AUTHORIZATION dbo;3GO4 5CREATE SCHEMA func AUTHORIZATION dbo;6GO7 8-- 创建文件格式9CREATE EXTERNAL FILE FORMAT [ParquetFormat]10WITH (FORMAT_TYPE = PARQUET);11GO12 13-- 创建外部表14CREATE EXTERNAL TABLE ext.myTable15(16 -- 列定义17 time datetime2 NULL,18 propertyA nvarchar(512) NULL,19 propertyB nvarchar(512) NULL,20 ...21)22WITH23(24 LOCATION = 'y=*/**',25 DATA_SOURCE = [MyDataLake],26 FILE_FORMAT = [ParquetFormat]27);28GO创建 View
创建视图,并筛选出 Functions 需要访问的字段等。
1CREATE VIEW func.viewOfMyTable2AS3SELECT4 [time],5 [propertyA],6 [propertyB],7 ...8FROM ext.myTable9WHERE <如有需要,在此填写条件>;10GO为 Service Principal 授权
这是为了在 CI/CD 中从流水线向数据库添加权限的设置。 如果没有这样的计划,可以跳 过。
使用流水线中使用的 SPN 名称创建外部用户,并授予权限。
1CREATE USER [pipeline-sp] FROM EXTERNAL PROVIDER;2GO3 4ALTER ROLE db_accessadmin ADD MEMBER [pipeline-sp];5GO6 7ALTER ROLE db_owner ADD MEMBER [pipeline-sp];8GO给 Synapse Workspace 的托管 标识授予 Entra ID 的“目录读取者”权限
如果要动态授予 Functions 权限,只要指定 Function 名称,Workspace 侧会自动查找与该 Function 名称对应的托管标识 objectId,并完成授权。 要实现这一点,Workspace 自身似乎需要拥有“目录读取者”权限。
请在 Entra ID > 角色与管理员 > 目录读取者 > 添加分配 中,将该角色分配给 Workspace 的 Identity。
Workspace 的托管标识 objectId 可在 Synapse Studio 的管理菜单 Credentials 中确认。
在流水线上为 Functions 授权
当你部署新的 Function 时,需要为该 Function 授予数据库访问权限。 为了在 Azure Pipelines 上执行,我使用了以下步骤。
- 准备 SQL 脚本
- 在流水线运行时动态替换脚本中的 Function 名称
- 使用
SqlAzureDacpacDeployment@1执行 SQL
下面按顺序说明。
准备 SQL
准备如下 SQL。FunctionAppMiName 的部分稍后会替换为目标 Function 名称。
1USE [myDB];2 3IF NOT EXISTS (SELECT 1 FROM sys.database_principals WHERE name = N'FunctionAppMiName')4BEGIN5 PRINT 'Creating user [FunctionAppMiName] FROM EXTERNAL PROVIDER...';6 CREATE USER [FunctionAppMiName] FROM EXTERNAL PROVIDER;7END8ELSE9BEGIN10 PRINT 'User [FunctionAppMiName] already exists.';11END;12 13IF NOT EXISTS (14 SELECT 115 FROM sys.database_role_members rm16 JOIN sys.database_principals r ON rm.role_principal_id = r.principal_id17 JOIN sys.database_principals m ON rm.member_principal_id = m.principal_id18 WHERE r.name = N'db_datareader'19 AND m.name = N'FunctionAppMiName'20)21BEGIN22 PRINT 'Adding [FunctionAppMiName] to db_datareader...';23 ALTER ROLE db_datareader ADD MEMBER [FunctionAppMiName];24 GRANT REFERENCES ON DATABASE SCOPED CREDENTIAL::WorkspaceIdentity TO [FunctionAppMiName];25 GRANT SELECT ON OBJECT::func.viewOfMyTable TO [FunctionAppMiName];26 DENY SELECT ON SCHEMA::ext TO [FunctionAppMiName];27END28ELSE29BEGIN30 PRINT 'Already in db_datareader.';31END;简单说明一下权限:
db_datareader 是读取数据所需的权限。
另外我授予了对 WorkspaceIdentity 的引用权限,因为在创建外部数据源时,如果没有对 WorkspaceIdentity 的引用权限就无法访问数据源。
最后,为了只允许通过视图访问并禁止直接查询外部表,我授予视图访问权限并对 schema 进行 DENY。
注入 Function 名称
我使用 PowerShell@2 任务按如下方式改写 SQL 文件;只要能替换文件中的 Function 名称,任何方式都可以。
1- task: PowerShell@22 displayName: 'Generate grant-function-mi.sql from template'3 inputs:4 targetType: 'inline'5 pwsh: true6 script: |7 $templatePath = "$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql.template"8 $outputPath = "$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql"9 10 if (-not (Test-Path $templatePath)) {11 throw "Template not found: $templatePath"12 }13 14 # 读取模板15 $content = Get-Content $templatePath -Raw16 17 # 替换(用流水线变量填充)18 $content = $content -replace 'FunctionAppMiName', '$(deploy_resources.functionName)'19 20 # 如果输出目录不存在则创建21 $dir = Split-Path $outputPath22 if (-not (Test-Path $dir)) {23 New-Item -Path $dir -ItemType Directory -Force | Out-Null24 }25 26 # 输出 SQL27 $content | Set-Content $outputPath -Encoding UTF828 29 Write-Host "Generated SQL file:"30 Get-Content $outputPath执行 SQL
执行刚刚改写后的 SQL。 可以用如下任务来执行。
1- task: SqlAzureDacpacDeployment@12 displayName: 'Grant db_datareader to Function MI (Synapse serverless)'3 inputs:4 # Azure 连接(服务连接名称)5 azureSubscription: '${{parameters.azureServiceConnection}}'6 7 # Synapse serverless SQL endpoint8 ServerName: '${{parameters.synapseWorkspaceName}}.sql.azuresynapse.net'9 DatabaseName: 'myDB'10 11 # 身份验证(使用服务连接的 SPN)12 SqlUserName: '' # 可留空13 SqlPassword: '' # 可留空14 AuthenticationType: 'servicePrincipal'15 16 # 执行模式:SQL 脚本(非 DACPAC)17 deployType: 'SqlTask'18 SqlFile: '$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql' # 上面生成的 .sql 文件路径19 20 # 选项(按需)21 IpDetectionMethod: 'AutoDetect'我这里使用了 SqlAzureDacpacDeployment@1,但该方法无法把 Function 名称作为参数传入。因此需要在调用任务之前先改写 SQL,把 Function 名称写入。
关键点是像 SQL 文件中那样为 Function 授权,所以不一定必须按完全相同的步骤来做。
从 Functions 发起查询
从 Functions 对视图发送如下查询。
在 .NET 中使用 Microsoft.Data.SqlClient 包。
1var queryString = @"2SELECT TOP (@maxItemCount)3 [time],4 [requestUri]5 -- ...6FROM func.viewOfMyTable7WHERE8 [time] >= @startDate9 AND [time] <= @endDate10ORDER BY11 [time] DESC;12";总结
要从 Functions 发起查询,需要为 Function 授予权限。 另外,如果要在 CI/CD 流水线中进行授权,还需要为 SPN 做权限设置,这部分会稍微复杂一些。 由于同时涉及托管标识权限与数据库访问权限两类概念,注意不要混淆。
本文中我们创建了外部数据源和视图。虽然也可以让 Functions 直接通过 OPENROWSET 访问数据,但那通常需要给 Function 更大的权限,因此这里没有采用。


