如何从 Azure Functions 查询 Synapse Analytics

⏱️约4分钟
分享:

本文整理了如何使用 Azure Synapse Analytics,从 Azure Functions 发起查询。

步骤

使用 Data Factory 转换并复制数据

首先准备要分析的数据。这里以存储账户中累积的日志为分析对象。 将分析数据加工成更适合分析的形态后,放到 Azure Data Lake Storage Gen2。

这里使用名为 Data Factory 的服务。 用 Data Factory 执行数据复制。复制时可以选择数据格式,因此这里将其转换为 Parquet 格式。

下面是一个 Bicep 示例。 该示例在路径设置中使用了通配符和表达式,但请根据需求自行调整。通常在 Data Factory Studio 里通过 UI 创建会更容易理解。你也可以将 Studio 中创建的管道等导出为 Bicep 文件。

bicep
1param factoryName string
2param tags object
3@secure()
4param subscriptionId string
5param sourceResourceGroupName string
6param inputStorageAccountName string
7param outputStorageAccountName string
8
9param triggerStartTime string = utcNow('yyyy-MM-ddTHH:mm:ssZ')
10
11resource factory 'Microsoft.DataFactory/factories@2018-06-01' = {
12 identity: {
13 type: 'SystemAssigned'
14 }
15 location: resourceGroup().location
16 name: factoryName
17 properties: {
18 publicNetworkAccess: 'Enabled'
19 }
20 tags: tags
21}
22
23resource pipelines 'Microsoft.DataFactory/factories/pipelines@2018-06-01' = {
24 parent: factory
25 name: 'clicklog-pipeline'
26 dependsOn: [
27 inputDataSet
28 outputDataSet
29 ]
30 properties: {
31 activities: [
32 {
33 dependsOn: [
34 {
35 activity: 'LookupBlob'
36 dependencyConditions: [
37 'Succeeded'
38 ]
39 }
40 ]
41 policy: {
42 timeout: '0.12:00:00'
43 retry: 0
44 retryIntervalInSeconds: 30
45 secureOutput: false
46 secureInput: false
47 }
48 name: 'CopyAccessLog'
49 userProperties: [
50 {
51 name: 'Source'
52 value: 'container-name/<your path>/'
53 }
54 {
55 name: 'Destination'
56 value: 'destination-container-name//'
57 }
58 ]
59 type: 'Copy'
60 inputs: [
61 {
62 referenceName: 'SourceDataset'
63 type: 'DatasetReference'
64 parameters: {}
65 }
66 ]
67 outputs: [
68 {
69 referenceName: 'DestinationDataset'
70 type: 'DatasetReference'
71 parameters: {}
72 }
73 ]
74 typeProperties: {
75 source: {
76 type: 'JsonSource'
77 storeSettings: {
78 type: 'AzureBlobStorageReadSettings'
79 recursive: true
80 wildcardFolderPath: {
81 value: 'your-path/@{concat(\'y=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'yyyy\'))}/@{concat(\'m=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'MM\'))}/@{concat(\'d=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'dd\'))}'
82 type: 'Expression'
83 }
84 wildcardFileName: '*'
85 enablePartitionDiscovery: false
86 }
87 formatSettings: {
88 type: 'JsonReadSettings'
89 }
90 }
91 sink: {
92 type: 'ParquetSink'
93 storeSettings: {
94 type: 'AzureBlobStorageWriteSettings'
95 copyBehavior: 'FlattenHierarchy'
96 }
97 formatSettings: {
98 type: 'ParquetWriteSettings'
99 }
100 }
101 enableStaging: false
102 validateDataConsistency: false
103 translator: {
104 type: 'TabularTranslator'
105 mappings: [ // Define mappings here
106 {
107 source: {
108 path: '$[\'time\']'
109 }
110 sink: {
111 name: 'time'
112 type: 'DateTime'
113 }
114 }
115 // ...
116 ]
117 }
118 }
119 }
120 ]
121 policy: {
122 elapsedTimeMetric: {}
123 }
124 }
125}
126
127resource inputBlob 'Microsoft.DataFactory/factories/linkedservices@2018-06-01' = {
128 parent: factory
129 name: 'AzureBlobStorageInput'
130 properties: {
131 annotations: []
132 type: 'AzureBlobStorage'
133 typeProperties: {
134 serviceEndpoint: 'https://${inputStorageAccountName}.blob.${environment().suffixes.storage}/'
135 accountKind: 'StorageV2'
136 }
137 // For remaining properties, see LinkedService objects
138 }
139}
140
141resource outputBlob 'Microsoft.DataFactory/factories/linkedservices@2018-06-01' = {
142 parent: factory
143 name: 'AzureBlobStorageOutput'
144 properties: {
145 annotations: []
146 type: 'AzureBlobStorage'
147 typeProperties: {
148 serviceEndpoint: 'https://${outputStorageAccountName}.blob.${environment().suffixes.storage}/'
149 accountKind: 'StorageV2'
150 }
151 // For remaining properties, see LinkedService objects
152 }
153}
154
155resource outputDataSet 'Microsoft.DataFactory/factories/datasets@2018-06-01' = {
156 parent: factory
157 name: 'DestinationDataset'
158 dependsOn: [
159 outputBlob
160 ]
161 properties: {
162 annotations: []
163 description: 'string'
164 folder: {
165 name: 'string'
166 }
167 linkedServiceName: {
168 referenceName: 'AzureBlobStorageOutput'
169 type: 'LinkedServiceReference'
170 }
171 typeProperties: {
172 location: {
173 type: 'AzureBlobStorageLocation'
174 folderPath: {
175 value: '@{concat(\'y=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'yyyy\'))}/@{concat(\'m=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'MM\'))}/@{concat(\'d=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'dd\'))}'
176 type: 'Expression'
177 }
178 container: 'container-name'
179 }
180 compressionCodec: 'snappy'
181 }
182 type: 'Parquet'
183 // For remaining properties, see Dataset objects
184 }
185}
186
187resource inputDataSet 'Microsoft.DataFactory/factories/datasets@2018-06-01' = {
188 parent: factory
189 name: 'SourceDataset'
190 dependsOn: [
191 inputBlob
192 ]
193 properties: {
194 annotations: []
195 description: 'string'
196 folder: {
197 name: 'string'
198 }
199 linkedServiceName: {
200 referenceName: 'AzureBlobStorageInput'
201 type: 'LinkedServiceReference'
202 }
203 typeProperties: {
204 location: {
205 type: 'AzureBlobStorageLocation'
206 folderPath: 'your-path'
207 container: 'container-name'
208 }
209 }
210 type: 'Json'
211 schema: {
212 type: 'object'
213 properties: {
214 time: {
215 type: 'string'
216 }
217 ...
218 }
219 }
220 // For remaining properties, see Dataset objects
221 }
222}
223
224resource DailyTrigger 'Microsoft.DataFactory/factories/triggers@2018-06-01' = {
225 parent: factory
226 name: 'DailyTrigger'
227 dependsOn: [
228 pipelines
229 ]
230 properties: {
231 annotations: []
232 // runtimeState: 'Started'
233 pipelines: [
234 {
235 pipelineReference: {
236 referenceName: 'your-pipeline-name'
237 type: 'PipelineReference'
238 }
239 parameters: {}
240 }
241 ]
242 type: 'ScheduleTrigger'
243 typeProperties: {
244 recurrence: {
245 frequency: 'Day'
246 interval: 1
247 startTime: triggerStartTime
248 timeZone: 'UTC'
249 schedule: {
250 minutes: [
251 0
252 ]
253 hours: [
254 0
255 ]
256 }
257 }
258 }
259 // For remaining properties, see Trigger objects
260 }
261}
262
263output managedIdentityPrincipalId string = factory.identity.principalId

这里出现的资源如下。

资源说明
Microsoft.DataFactory/factoriesData Factory 本体
Microsoft.DataFactory/factories/pipelines管道定义。用于执行数据复制等。
Microsoft.DataFactory/factories/linkedservices将输入与输出存储关联到该 linked service
Microsoft.DataFactory/factories/datasets输入与输出的数据集
Microsoft.DataFactory/factories/triggers触发器设置。不需要自动触发时可省略。

创建 Synapse

接下来部署 Synapse Analytics。 由于默认提供内置的无服务器 SQL 池,因此我们在其中创建数据库。

创建数据库

在内置的无服务器 SQL 池中创建数据库。

sql
1CREATE DATABASE [myDB];
2GO
3
4USE [myDB];
5GO

添加外部数据源

创建外部数据源,并连接到数据湖。 此时创建 WorkspaceIdentity,并让该外部数据源的访问使用 WorkspaceIdentity 的权限。

sql
1CREATE MASTER KEY
2ENCRYPTION BY PASSWORD = 'StrongPassword_ChangeThis!';
3
4CREATE DATABASE SCOPED CREDENTIAL WorkspaceIdentity
5WITH IDENTITY = 'Managed Identity';
6GO
7
8CREATE EXTERNAL DATA SOURCE MyDataLake
9WITH (
10 LOCATION = 'https://<your storage name>.blob.core.windows.net/<container name>',
11 CREDENTIAL = WorkspaceIdentity
12);
13GO

创建外部表

创建外部表,并将 DATA_SOURCE 指定为之前创建的外部数据源。 LOCATION 里指定 blob 的路径,可以使用通配符。

sql
1-- 创建架构
2CREATE SCHEMA ext AUTHORIZATION dbo;
3GO
4
5CREATE SCHEMA func AUTHORIZATION dbo;
6GO
7
8-- 创建文件格式
9CREATE EXTERNAL FILE FORMAT [ParquetFormat]
10WITH (FORMAT_TYPE = PARQUET);
11GO
12
13-- 创建外部表
14CREATE EXTERNAL TABLE ext.myTable
15(
16 -- 列定义
17 time datetime2 NULL,
18 propertyA nvarchar(512) NULL,
19 propertyB nvarchar(512) NULL,
20 ...
21)
22WITH
23(
24 LOCATION = 'y=*/**',
25 DATA_SOURCE = [MyDataLake],
26 FILE_FORMAT = [ParquetFormat]
27);
28GO

创建 View

创建视图,并筛选出 Functions 需要访问的字段等。

sql
1CREATE VIEW func.viewOfMyTable
2AS
3SELECT
4 [time],
5 [propertyA],
6 [propertyB],
7 ...
8FROM ext.myTable
9WHERE <如有需要,在此填写条件>;
10GO

为 Service Principal 授权

这是为了在 CI/CD 中从流水线向数据库添加权限的设置。 如果没有这样的计划,可以跳过。

使用流水线中使用的 SPN 名称创建外部用户,并授予权限。

sql
1CREATE USER [pipeline-sp] FROM EXTERNAL PROVIDER;
2GO
3
4ALTER ROLE db_accessadmin ADD MEMBER [pipeline-sp];
5GO
6
7ALTER ROLE db_owner ADD MEMBER [pipeline-sp];
8GO

给 Synapse Workspace 的托管标识授予 Entra ID 的“目录读取者”权限

如果要动态授予 Functions 权限,只要指定 Function 名称,Workspace 侧会自动查找与该 Function 名称对应的托管标识 objectId,并完成授权。 要实现这一点,Workspace 自身似乎需要拥有“目录读取者”权限。

请在 Entra ID > 角色与管理员 > 目录读取者 > 添加分配 中,将该角色分配给 Workspace 的 Identity。 Workspace 的托管标识 objectId 可在 Synapse Studio 的管理菜单 Credentials 中确认。

在流水线上为 Functions 授权

当你部署新的 Function 时,需要为该 Function 授予数据库访问权限。 为了在 Azure Pipelines 上执行,我使用了以下步骤。

  1. 准备 SQL 脚本
  2. 在流水线运行时动态替换脚本中的 Function 名称
  3. 使用 SqlAzureDacpacDeployment@1 执行 SQL

下面按顺序说明。

准备 SQL

准备如下 SQL。FunctionAppMiName 的部分稍后会替换为目标 Function 名称。

sql
1USE [myDB];
2
3IF NOT EXISTS (SELECT 1 FROM sys.database_principals WHERE name = N'FunctionAppMiName')
4BEGIN
5 PRINT 'Creating user [FunctionAppMiName] FROM EXTERNAL PROVIDER...';
6 CREATE USER [FunctionAppMiName] FROM EXTERNAL PROVIDER;
7END
8ELSE
9BEGIN
10 PRINT 'User [FunctionAppMiName] already exists.';
11END;
12
13IF NOT EXISTS (
14 SELECT 1
15 FROM sys.database_role_members rm
16 JOIN sys.database_principals r ON rm.role_principal_id = r.principal_id
17 JOIN sys.database_principals m ON rm.member_principal_id = m.principal_id
18 WHERE r.name = N'db_datareader'
19 AND m.name = N'FunctionAppMiName'
20)
21BEGIN
22 PRINT 'Adding [FunctionAppMiName] to db_datareader...';
23 ALTER ROLE db_datareader ADD MEMBER [FunctionAppMiName];
24 GRANT REFERENCES ON DATABASE SCOPED CREDENTIAL::WorkspaceIdentity TO [FunctionAppMiName];
25 GRANT SELECT ON OBJECT::func.viewOfMyTable TO [FunctionAppMiName];
26 DENY SELECT ON SCHEMA::ext TO [FunctionAppMiName];
27END
28ELSE
29BEGIN
30 PRINT 'Already in db_datareader.';
31END;

简单说明一下权限:

db_datareader 是读取数据所需的权限。 另外我授予了对 WorkspaceIdentity 的引用权限,因为在创建外部数据源时,如果没有对 WorkspaceIdentity 的引用权限就无法访问数据源。 最后,为了只允许通过视图访问并禁止直接查询外部表,我授予视图访问权限并对 schema 进行 DENY

注入 Function 名称

我使用 PowerShell@2 任务按如下方式改写 SQL 文件;只要能替换文件中的 Function 名称,任何方式都可以。

bash
1- task: PowerShell@2
2 displayName: 'Generate grant-function-mi.sql from template'
3 inputs:
4 targetType: 'inline'
5 pwsh: true
6 script: |
7 $templatePath = "$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql.template"
8 $outputPath = "$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql"
9
10 if (-not (Test-Path $templatePath)) {
11 throw "Template not found: $templatePath"
12 }
13
14 # 读取模板
15 $content = Get-Content $templatePath -Raw
16
17 # 替换(用流水线变量填充)
18 $content = $content -replace 'FunctionAppMiName', '$(deploy_resources.functionName)'
19
20 # 如果输出目录不存在则创建
21 $dir = Split-Path $outputPath
22 if (-not (Test-Path $dir)) {
23 New-Item -Path $dir -ItemType Directory -Force | Out-Null
24 }
25
26 # 输出 SQL
27 $content | Set-Content $outputPath -Encoding UTF8
28
29 Write-Host "Generated SQL file:"
30 Get-Content $outputPath

执行 SQL

执行刚刚改写后的 SQL。 可以用如下任务来执行。

bash
1- task: SqlAzureDacpacDeployment@1
2 displayName: 'Grant db_datareader to Function MI (Synapse serverless)'
3 inputs:
4 # Azure 连接(服务连接名称)
5 azureSubscription: '${{parameters.azureServiceConnection}}'
6
7 # Synapse serverless SQL endpoint
8 ServerName: '${{parameters.synapseWorkspaceName}}.sql.azuresynapse.net'
9 DatabaseName: 'myDB'
10
11 # 身份验证(使用服务连接的 SPN)
12 SqlUserName: '' # 可留空
13 SqlPassword: '' # 可留空
14 AuthenticationType: 'servicePrincipal'
15
16 # 执行模式:SQL 脚本(非 DACPAC)
17 deployType: 'SqlTask'
18 SqlFile: '$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql' # 上面生成的 .sql 文件路径
19
20 # 选项(按需)
21 IpDetectionMethod: 'AutoDetect'

我这里使用了 SqlAzureDacpacDeployment@1,但该方法无法把 Function 名称作为参数传入。因此需要在调用任务之前先改写 SQL,把 Function 名称写入。

关键点是像 SQL 文件中那样为 Function 授权,所以不一定必须按完全相同的步骤来做。

从 Functions 发起查询

从 Functions 对视图发送如下查询。 在 .NET 中使用 Microsoft.Data.SqlClient 包。

csharp
1var queryString = @"
2SELECT TOP (@maxItemCount)
3 [time],
4 [requestUri]
5 -- ...
6FROM func.viewOfMyTable
7WHERE
8 [time] >= @startDate
9 AND [time] <= @endDate
10ORDER BY
11 [time] DESC;
12";

总结

要从 Functions 发起查询,需要为 Function 授予权限。 另外,如果要在 CI/CD 流水线中进行授权,还需要为 SPN 做权限设置,这部分会稍微复杂一些。 由于同时涉及托管标识权限与数据库访问权限两类概念,注意不要混淆。

本文中我们创建了外部数据源和视图。虽然也可以让 Functions 直接通过 OPENROWSET 访问数据,但那通常需要给 Function 更大的权限,因此这里没有采用。

分享:

相关文章

将Azure资源描述为ARM Template
Guides

将Azure资源描述为ARM Template

ARM Template是定义Azure资源的json文件。本文介绍如何高效创建ARM Template来部署新资源。

mark241
将后端连接到Azure API管理
Guides

将后端连接到Azure API管理

了解如何使用ARM模板向API管理添加API。使用operations、policy和backend资源连接到后端。

mark241