Как выполнять запросы к Synapse Analytics из Azure Functions

⏱️4 мин
Поделиться:

В этой статье собраны шаги по выполнению запросов из Azure Functions с использованием Azure Synapse Analytics.

Шаги

Преобразование и копирование данных с помощью Data Factory

Сначала подготовим данные для анализа. В этом примере мы анализируем логи, накопленные в хранилище. После преобразования данных в удобный для аналитики формат разместим их в Azure Data Lake Storage Gen2.

Здесь мы используем сервис Data Factory. С помощью Data Factory выполняем копирование данных. Так как при копировании можно выбрать формат, используем формат Parquet.

Ниже приведён пример Bicep. В этом примере в настройках пути используются подстановочные символы и выражения — при необходимости адаптируйте их под свои требования. Часто проще создать пайплайн через UI в Data Factory Studio. Пайплайны, созданные в Studio, также можно экспортировать в Bicep.

bicep
1param factoryName string
2param tags object
3@secure()
4param subscriptionId string
5param sourceResourceGroupName string
6param inputStorageAccountName string
7param outputStorageAccountName string
8
9param triggerStartTime string = utcNow('yyyy-MM-ddTHH:mm:ssZ')
10
11resource factory 'Microsoft.DataFactory/factories@2018-06-01' = {
12 identity: {
13 type: 'SystemAssigned'
14 }
15 location: resourceGroup().location
16 name: factoryName
17 properties: {
18 publicNetworkAccess: 'Enabled'
19 }
20 tags: tags
21}
22
23resource pipelines 'Microsoft.DataFactory/factories/pipelines@2018-06-01' = {
24 parent: factory
25 name: 'clicklog-pipeline'
26 dependsOn: [
27 inputDataSet
28 outputDataSet
29 ]
30 properties: {
31 activities: [
32 {
33 dependsOn: [
34 {
35 activity: 'LookupBlob'
36 dependencyConditions: [
37 'Succeeded'
38 ]
39 }
40 ]
41 policy: {
42 timeout: '0.12:00:00'
43 retry: 0
44 retryIntervalInSeconds: 30
45 secureOutput: false
46 secureInput: false
47 }
48 name: 'CopyAccessLog'
49 userProperties: [
50 {
51 name: 'Source'
52 value: 'container-name/<your path>/'
53 }
54 {
55 name: 'Destination'
56 value: 'destination-container-name//'
57 }
58 ]
59 type: 'Copy'
60 inputs: [
61 {
62 referenceName: 'SourceDataset'
63 type: 'DatasetReference'
64 parameters: {}
65 }
66 ]
67 outputs: [
68 {
69 referenceName: 'DestinationDataset'
70 type: 'DatasetReference'
71 parameters: {}
72 }
73 ]
74 typeProperties: {
75 source: {
76 type: 'JsonSource'
77 storeSettings: {
78 type: 'AzureBlobStorageReadSettings'
79 recursive: true
80 wildcardFolderPath: {
81 value: 'your-path/@{concat(\'y=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'yyyy\'))}/@{concat(\'m=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'MM\'))}/@{concat(\'d=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'dd\'))}'
82 type: 'Expression'
83 }
84 wildcardFileName: '*'
85 enablePartitionDiscovery: false
86 }
87 formatSettings: {
88 type: 'JsonReadSettings'
89 }
90 }
91 sink: {
92 type: 'ParquetSink'
93 storeSettings: {
94 type: 'AzureBlobStorageWriteSettings'
95 copyBehavior: 'FlattenHierarchy'
96 }
97 formatSettings: {
98 type: 'ParquetWriteSettings'
99 }
100 }
101 enableStaging: false
102 validateDataConsistency: false
103 translator: {
104 type: 'TabularTranslator'
105 mappings: [ // Define mappings here
106 {
107 source: {
108 path: '$[\'time\']'
109 }
110 sink: {
111 name: 'time'
112 type: 'DateTime'
113 }
114 }
115 // ...
116 ]
117 }
118 }
119 }
120 ]
121 policy: {
122 elapsedTimeMetric: {}
123 }
124 }
125}
126
127resource inputBlob 'Microsoft.DataFactory/factories/linkedservices@2018-06-01' = {
128 parent: factory
129 name: 'AzureBlobStorageInput'
130 properties: {
131 annotations: []
132 type: 'AzureBlobStorage'
133 typeProperties: {
134 serviceEndpoint: 'https://${inputStorageAccountName}.blob.${environment().suffixes.storage}/'
135 accountKind: 'StorageV2'
136 }
137 // For remaining properties, see LinkedService objects
138 }
139}
140
141resource outputBlob 'Microsoft.DataFactory/factories/linkedservices@2018-06-01' = {
142 parent: factory
143 name: 'AzureBlobStorageOutput'
144 properties: {
145 annotations: []
146 type: 'AzureBlobStorage'
147 typeProperties: {
148 serviceEndpoint: 'https://${outputStorageAccountName}.blob.${environment().suffixes.storage}/'
149 accountKind: 'StorageV2'
150 }
151 // For remaining properties, see LinkedService objects
152 }
153}
154
155resource outputDataSet 'Microsoft.DataFactory/factories/datasets@2018-06-01' = {
156 parent: factory
157 name: 'DestinationDataset'
158 dependsOn: [
159 outputBlob
160 ]
161 properties: {
162 annotations: []
163 description: 'string'
164 folder: {
165 name: 'string'
166 }
167 linkedServiceName: {
168 referenceName: 'AzureBlobStorageOutput'
169 type: 'LinkedServiceReference'
170 }
171 typeProperties: {
172 location: {
173 type: 'AzureBlobStorageLocation'
174 folderPath: {
175 value: '@{concat(\'y=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'yyyy\'))}/@{concat(\'m=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'MM\'))}/@{concat(\'d=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'dd\'))}'
176 type: 'Expression'
177 }
178 container: 'container-name'
179 }
180 compressionCodec: 'snappy'
181 }
182 type: 'Parquet'
183 // For remaining properties, see Dataset objects
184 }
185}
186
187resource inputDataSet 'Microsoft.DataFactory/factories/datasets@2018-06-01' = {
188 parent: factory
189 name: 'SourceDataset'
190 dependsOn: [
191 inputBlob
192 ]
193 properties: {
194 annotations: []
195 description: 'string'
196 folder: {
197 name: 'string'
198 }
199 linkedServiceName: {
200 referenceName: 'AzureBlobStorageInput'
201 type: 'LinkedServiceReference'
202 }
203 typeProperties: {
204 location: {
205 type: 'AzureBlobStorageLocation'
206 folderPath: 'your-path'
207 container: 'container-name'
208 }
209 }
210 type: 'Json'
211 schema: {
212 type: 'object'
213 properties: {
214 time: {
215 type: 'string'
216 }
217 ...
218 }
219 }
220 // For remaining properties, see Dataset objects
221 }
222}
223
224resource DailyTrigger 'Microsoft.DataFactory/factories/triggers@2018-06-01' = {
225 parent: factory
226 name: 'DailyTrigger'
227 dependsOn: [
228 pipelines
229 ]
230 properties: {
231 annotations: []
232 // runtimeState: 'Started'
233 pipelines: [
234 {
235 pipelineReference: {
236 referenceName: 'your-pipeline-name'
237 type: 'PipelineReference'
238 }
239 parameters: {}
240 }
241 ]
242 type: 'ScheduleTrigger'
243 typeProperties: {
244 recurrence: {
245 frequency: 'Day'
246 interval: 1
247 startTime: triggerStartTime
248 timeZone: 'UTC'
249 schedule: {
250 minutes: [
251 0
252 ]
253 hours: [
254 0
255 ]
256 }
257 }
258 }
259 // For remaining properties, see Trigger objects
260 }
261}
262
263output managedIdentityPrincipalId string = factory.identity.principalId

Используемые здесь ресурсы:

РесурсОписание
Microsoft.DataFactory/factoriesЭкземпляр Data Factory
Microsoft.DataFactory/factories/pipelinesОпределение пайплайна. Пайплайн выполняет копирование данных и т. п.
Microsoft.DataFactory/factories/linkedservicesLinked services, связывающие входное и выходное хранилище
Microsoft.DataFactory/factories/datasetsВходные и выходные датасеты
Microsoft.DataFactory/factories/triggersНастройка триггера. Не нужна, если не требуется автозапуск.

Создание Synapse

Далее развернём Synapse Analytics. Так как по умолчанию доступен встроенный serverless SQL pool, создадим базу данных там.

Создание базы данных

Создайте базу данных во встроенном serverless SQL pool.

sql
1CREATE DATABASE [myDB];
2GO
3
4USE [myDB];
5GO

Добавление внешнего источника данных

Создайте внешний источник данных и подключите его к data lake. При этом создайте WorkspaceIdentity и настройте доступ к этому внешнему источнику данных через права WorkspaceIdentity.

sql
1CREATE MASTER KEY
2ENCRYPTION BY PASSWORD = 'StrongPassword_ChangeThis!';
3
4CREATE DATABASE SCOPED CREDENTIAL WorkspaceIdentity
5WITH IDENTITY = 'Managed Identity';
6GO
7
8CREATE EXTERNAL DATA SOURCE MyDataLake
9WITH (
10 LOCATION = 'https://<your storage name>.blob.core.windows.net/<container name>',
11 CREDENTIAL = WorkspaceIdentity
12);
13GO

Создание внешней таблицы

Создайте внешнюю таблицу и укажите DATA_SOURCE на внешний источник данных, созданный ранее. В LOCATION укажите путь в blob. Поддерживаются wildcard.

sql
1-- Создание схемы
2CREATE SCHEMA ext AUTHORIZATION dbo;
3GO
4
5CREATE SCHEMA func AUTHORIZATION dbo;
6GO
7
8-- Создание формата файла
9CREATE EXTERNAL FILE FORMAT [ParquetFormat]
10WITH (FORMAT_TYPE = PARQUET);
11GO
12
13-- Создание внешней таблицы
14CREATE EXTERNAL TABLE ext.myTable
15(
16 -- Определение столбцов
17 time datetime2 NULL,
18 propertyA nvarchar(512) NULL,
19 propertyB nvarchar(512) NULL,
20 ...
21)
22WITH
23(
24 LOCATION = 'y=*/**',
25 DATA_SOURCE = [MyDataLake],
26 FILE_FORMAT = [ParquetFormat]
27);
28GO

Создание представления (View)

Создайте view и ограничьте поля, к которым будет обращаться Functions.

sql
1CREATE VIEW func.viewOfMyTable
2AS
3SELECT
4 [time],
5 [propertyA],
6 [propertyB],
7 ...
8FROM ext.myTable
9WHERE <укажите условие здесь при необходимости>;
10GO

Выдача прав service principal

Это настройка для добавления прав к базе данных из CI/CD-пайплайна. Если вы не планируете это делать, раздел можно пропустить.

Используя имя SPN, которое используется в пайплайне, создайте внешнего пользователя и выдайте права.

sql
1CREATE USER [pipeline-sp] FROM EXTERNAL PROVIDER;
2GO
3
4ALTER ROLE db_accessadmin ADD MEMBER [pipeline-sp];
5GO
6
7ALTER ROLE db_owner ADD MEMBER [pipeline-sp];
8GO

Назначить Managed Identity рабочего пространства Synapse роль Directory Readers в Entra ID

Если вы хотите выдавать права Functions динамически, можно указать имя Function — workspace автоматически найдёт objectId managed identity, соответствующей этому имени Function, и выдаст права. Для этого, похоже, самому workspace нужна роль Directory Readers.

Назначьте роль identity рабочего пространства через Entra ID > Роли и администраторы > Directory Readers > Добавить назначения. ObjectId managed identity рабочего пространства можно посмотреть в Synapse Studio в админ-меню Credentials.

Выдача прав Functions из пайплайна

При деплое новой Function выдайте ей доступ к базе данных. Чтобы выполнить это в Azure Pipelines, я использовал следующие шаги.

  1. Подготовить SQL-скрипт
  2. Динамически заменить имя Function в скрипте во время выполнения пайплайна
  3. Выполнить SQL с помощью SqlAzureDacpacDeployment@1

Далее — по порядку.

Подготовить SQL

Подготовьте SQL, как показано ниже. Часть FunctionAppMiName позже будет заменена на целевое имя Function.

sql
1USE [myDB];
2
3IF NOT EXISTS (SELECT 1 FROM sys.database_principals WHERE name = N'FunctionAppMiName')
4BEGIN
5 PRINT 'Creating user [FunctionAppMiName] FROM EXTERNAL PROVIDER...';
6 CREATE USER [FunctionAppMiName] FROM EXTERNAL PROVIDER;
7END
8ELSE
9BEGIN
10 PRINT 'User [FunctionAppMiName] already exists.';
11END;
12
13IF NOT EXISTS (
14 SELECT 1
15 FROM sys.database_role_members rm
16 JOIN sys.database_principals r ON rm.role_principal_id = r.principal_id
17 JOIN sys.database_principals m ON rm.member_principal_id = m.principal_id
18 WHERE r.name = N'db_datareader'
19 AND m.name = N'FunctionAppMiName'
20)
21BEGIN
22 PRINT 'Adding [FunctionAppMiName] to db_datareader...';
23 ALTER ROLE db_datareader ADD MEMBER [FunctionAppMiName];
24 GRANT REFERENCES ON DATABASE SCOPED CREDENTIAL::WorkspaceIdentity TO [FunctionAppMiName];
25 GRANT SELECT ON OBJECT::func.viewOfMyTable TO [FunctionAppMiName];
26 DENY SELECT ON SCHEMA::ext TO [FunctionAppMiName];
27END
28ELSE
29BEGIN
30 PRINT 'Already in db_datareader.';
31END;

Коротко о правах:

db_datareader нужен для чтения данных. Также я добавил право ссылаться на WorkspaceIdentity, потому что без этого не удавалось обращаться к внешнему источнику данных. Далее я выдаю доступ к view и хочу запретить прямые запросы к внешним таблицам, поэтому делаю DENY на schema.

Подставить имя Function

Я переписал SQL-файл с помощью задачи PowerShell@2, как показано ниже, но подойдёт любой способ, который позволяет заменить имя Function в файле.

bash
1- task: PowerShell@2
2 displayName: 'Generate grant-function-mi.sql from template'
3 inputs:
4 targetType: 'inline'
5 pwsh: true
6 script: |
7 $templatePath = "$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql.template"
8 $outputPath = "$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql"
9
10 if (-not (Test-Path $templatePath)) {
11 throw "Template not found: $templatePath"
12 }
13
14 # Прочитать шаблон
15 $content = Get-Content $templatePath -Raw
16
17 # Заменить (подставить значения из переменных пайплайна)
18 $content = $content -replace 'FunctionAppMiName', '$(deploy_resources.functionName)'
19
20 # Создать выходной каталог, если его нет
21 $dir = Split-Path $outputPath
22 if (-not (Test-Path $dir)) {
23 New-Item -Path $dir -ItemType Directory -Force | Out-Null
24 }
25
26 # Записать SQL
27 $content | Set-Content $outputPath -Encoding UTF8
28
29 Write-Host "Generated SQL file:"
30 Get-Content $outputPath

Выполнить SQL

Выполните переписанный SQL. Например, задачей как ниже.

bash
1- task: SqlAzureDacpacDeployment@1
2 displayName: 'Grant db_datareader to Function MI (Synapse serverless)'
3 inputs:
4 # Подключение к Azure (имя service connection)
5 azureSubscription: '${{parameters.azureServiceConnection}}'
6
7 # Synapse serverless SQL endpoint
8 ServerName: '${{parameters.synapseWorkspaceName}}.sql.azuresynapse.net'
9 DatabaseName: 'myDB'
10
11 # Аутентификация (через SPN service connection)
12 SqlUserName: '' # Можно оставить пустым
13 SqlPassword: '' # Можно оставить пустым
14 AuthenticationType: 'servicePrincipal'
15
16 # Режим выполнения: SQL-скрипт (не DACPAC)
17 deployType: 'SqlTask'
18 SqlFile: '$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql' # Путь к .sql файлу, созданному выше
19
20 # Опции (при необходимости)
21 IpDetectionMethod: 'AutoDetect'

В моём случае я использовал SqlAzureDacpacDeployment@1, но этот способ не позволяет передать имя Function как параметр. Поэтому сначала нужно переписать SQL, чтобы встроить имя Function, и затем запускать задачу.

Ключевой момент — выдать права Function, как показано в SQL-файле. Конкретная реализация может отличаться.

Запрос из Functions

Из Functions отправляйте запрос к view примерно так. В .NET используется пакет Microsoft.Data.SqlClient.

csharp
1var queryString = @"
2SELECT TOP (@maxItemCount)
3 [time],
4 [requestUri]
5 -- ...
6FROM func.viewOfMyTable
7WHERE
8 [time] >= @startDate
9 AND [time] <= @endDate
10ORDER BY
11 [time] DESC;
12";

Итоги

Чтобы выполнять запросы из Functions, необходимо выдать права самой Function. Также при выдаче прав из CI/CD-пайплайна требуется настройка прав для SPN — это может быть немного запутанно. Так как есть два аспекта (права managed identity и права доступа к базе данных), важно их не смешивать.

В этой статье мы создали внешний источник данных и view. Можно было бы обращаться к данным из Functions напрямую через OPENROWSET, но я не выбрал этот путь, потому что тогда обычно требуется выдавать Function более широкие права.

Поделиться:

Связанные статьи