Como consultar o Synapse Analytics a partir do Azure Functions

⏱️5 min
Compartilhar:

Este artigo resume como executar consultas a partir do Azure Functions usando o Azure Synapse Analytics.

Passos

Transformação e cópia de dados com o Data Factory

Primeiro, prepare os dados a serem analisados. Neste exemplo, analisaremos logs armazenados no Storage. Depois de transformar os dados para um formato mais adequado para análise, coloque-os no Azure Data Lake Storage Gen2.

Aqui usamos um serviço chamado Data Factory. Com o Data Factory, fazemos a cópia dos dados. Como é possível escolher o formato durante a cópia, vamos usar o formato Parquet.

A seguir há um exemplo em Bicep. Este exemplo usa curingas e expressões na configuração do caminho; ajuste esses pontos conforme seus requisitos. Muitas vezes é mais fácil criar pelo UI no Data Factory Studio. Também é possível exportar pipelines criados no Studio como arquivos Bicep.

bicep
1param factoryName string
2param tags object
3@secure()
4param subscriptionId string
5param sourceResourceGroupName string
6param inputStorageAccountName string
7param outputStorageAccountName string
8
9param triggerStartTime string = utcNow('yyyy-MM-ddTHH:mm:ssZ')
10
11resource factory 'Microsoft.DataFactory/factories@2018-06-01' = {
12 identity: {
13 type: 'SystemAssigned'
14 }
15 location: resourceGroup().location
16 name: factoryName
17 properties: {
18 publicNetworkAccess: 'Enabled'
19 }
20 tags: tags
21}
22
23resource pipelines 'Microsoft.DataFactory/factories/pipelines@2018-06-01' = {
24 parent: factory
25 name: 'clicklog-pipeline'
26 dependsOn: [
27 inputDataSet
28 outputDataSet
29 ]
30 properties: {
31 activities: [
32 {
33 dependsOn: [
34 {
35 activity: 'LookupBlob'
36 dependencyConditions: [
37 'Succeeded'
38 ]
39 }
40 ]
41 policy: {
42 timeout: '0.12:00:00'
43 retry: 0
44 retryIntervalInSeconds: 30
45 secureOutput: false
46 secureInput: false
47 }
48 name: 'CopyAccessLog'
49 userProperties: [
50 {
51 name: 'Source'
52 value: 'container-name/<your path>/'
53 }
54 {
55 name: 'Destination'
56 value: 'destination-container-name//'
57 }
58 ]
59 type: 'Copy'
60 inputs: [
61 {
62 referenceName: 'SourceDataset'
63 type: 'DatasetReference'
64 parameters: {}
65 }
66 ]
67 outputs: [
68 {
69 referenceName: 'DestinationDataset'
70 type: 'DatasetReference'
71 parameters: {}
72 }
73 ]
74 typeProperties: {
75 source: {
76 type: 'JsonSource'
77 storeSettings: {
78 type: 'AzureBlobStorageReadSettings'
79 recursive: true
80 wildcardFolderPath: {
81 value: 'your-path/@{concat(\'y=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'yyyy\'))}/@{concat(\'m=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'MM\'))}/@{concat(\'d=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'dd\'))}'
82 type: 'Expression'
83 }
84 wildcardFileName: '*'
85 enablePartitionDiscovery: false
86 }
87 formatSettings: {
88 type: 'JsonReadSettings'
89 }
90 }
91 sink: {
92 type: 'ParquetSink'
93 storeSettings: {
94 type: 'AzureBlobStorageWriteSettings'
95 copyBehavior: 'FlattenHierarchy'
96 }
97 formatSettings: {
98 type: 'ParquetWriteSettings'
99 }
100 }
101 enableStaging: false
102 validateDataConsistency: false
103 translator: {
104 type: 'TabularTranslator'
105 mappings: [ // Define mappings here
106 {
107 source: {
108 path: '$[\'time\']'
109 }
110 sink: {
111 name: 'time'
112 type: 'DateTime'
113 }
114 }
115 // ...
116 ]
117 }
118 }
119 }
120 ]
121 policy: {
122 elapsedTimeMetric: {}
123 }
124 }
125}
126
127resource inputBlob 'Microsoft.DataFactory/factories/linkedservices@2018-06-01' = {
128 parent: factory
129 name: 'AzureBlobStorageInput'
130 properties: {
131 annotations: []
132 type: 'AzureBlobStorage'
133 typeProperties: {
134 serviceEndpoint: 'https://${inputStorageAccountName}.blob.${environment().suffixes.storage}/'
135 accountKind: 'StorageV2'
136 }
137 // For remaining properties, see LinkedService objects
138 }
139}
140
141resource outputBlob 'Microsoft.DataFactory/factories/linkedservices@2018-06-01' = {
142 parent: factory
143 name: 'AzureBlobStorageOutput'
144 properties: {
145 annotations: []
146 type: 'AzureBlobStorage'
147 typeProperties: {
148 serviceEndpoint: 'https://${outputStorageAccountName}.blob.${environment().suffixes.storage}/'
149 accountKind: 'StorageV2'
150 }
151 // For remaining properties, see LinkedService objects
152 }
153}
154
155resource outputDataSet 'Microsoft.DataFactory/factories/datasets@2018-06-01' = {
156 parent: factory
157 name: 'DestinationDataset'
158 dependsOn: [
159 outputBlob
160 ]
161 properties: {
162 annotations: []
163 description: 'string'
164 folder: {
165 name: 'string'
166 }
167 linkedServiceName: {
168 referenceName: 'AzureBlobStorageOutput'
169 type: 'LinkedServiceReference'
170 }
171 typeProperties: {
172 location: {
173 type: 'AzureBlobStorageLocation'
174 folderPath: {
175 value: '@{concat(\'y=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'yyyy\'))}/@{concat(\'m=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'MM\'))}/@{concat(\'d=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'dd\'))}'
176 type: 'Expression'
177 }
178 container: 'container-name'
179 }
180 compressionCodec: 'snappy'
181 }
182 type: 'Parquet'
183 // For remaining properties, see Dataset objects
184 }
185}
186
187resource inputDataSet 'Microsoft.DataFactory/factories/datasets@2018-06-01' = {
188 parent: factory
189 name: 'SourceDataset'
190 dependsOn: [
191 inputBlob
192 ]
193 properties: {
194 annotations: []
195 description: 'string'
196 folder: {
197 name: 'string'
198 }
199 linkedServiceName: {
200 referenceName: 'AzureBlobStorageInput'
201 type: 'LinkedServiceReference'
202 }
203 typeProperties: {
204 location: {
205 type: 'AzureBlobStorageLocation'
206 folderPath: 'your-path'
207 container: 'container-name'
208 }
209 }
210 type: 'Json'
211 schema: {
212 type: 'object'
213 properties: {
214 time: {
215 type: 'string'
216 }
217 ...
218 }
219 }
220 // For remaining properties, see Dataset objects
221 }
222}
223
224resource DailyTrigger 'Microsoft.DataFactory/factories/triggers@2018-06-01' = {
225 parent: factory
226 name: 'DailyTrigger'
227 dependsOn: [
228 pipelines
229 ]
230 properties: {
231 annotations: []
232 // runtimeState: 'Started'
233 pipelines: [
234 {
235 pipelineReference: {
236 referenceName: 'your-pipeline-name'
237 type: 'PipelineReference'
238 }
239 parameters: {}
240 }
241 ]
242 type: 'ScheduleTrigger'
243 typeProperties: {
244 recurrence: {
245 frequency: 'Day'
246 interval: 1
247 startTime: triggerStartTime
248 timeZone: 'UTC'
249 schedule: {
250 minutes: [
251 0
252 ]
253 hours: [
254 0
255 ]
256 }
257 }
258 }
259 // For remaining properties, see Trigger objects
260 }
261}
262
263output managedIdentityPrincipalId string = factory.identity.principalId

Os recursos que aparecem aqui são os seguintes.

RecursoDescrição
Microsoft.DataFactory/factoriesA instância do Data Factory
Microsoft.DataFactory/factories/pipelinesDefinição do pipeline. O pipeline executa cópia de dados etc.
Microsoft.DataFactory/factories/linkedservicesLinked services que ligam o storage de entrada e saída
Microsoft.DataFactory/factories/datasetsDatasets de entrada e saída
Microsoft.DataFactory/factories/triggersConfiguração do gatilho. Não é necessário se não houver execução automática.

Criar o Synapse

Em seguida, implante o Synapse Analytics. Como existe um pool SQL serverless integrado por padrão, criaremos o banco de dados nele.

Criar um banco de dados

Crie um banco de dados no pool SQL serverless integrado.

sql
1CREATE DATABASE [myDB];
2GO
3
4USE [myDB];
5GO

Adicionar uma fonte de dados externa

Crie uma fonte de dados externa e conecte ao data lake. Nesse momento, crie WorkspaceIdentity e configure o acesso a essa fonte externa para usar as permissões do WorkspaceIdentity.

sql
1CREATE MASTER KEY
2ENCRYPTION BY PASSWORD = 'StrongPassword_ChangeThis!';
3
4CREATE DATABASE SCOPED CREDENTIAL WorkspaceIdentity
5WITH IDENTITY = 'Managed Identity';
6GO
7
8CREATE EXTERNAL DATA SOURCE MyDataLake
9WITH (
10 LOCATION = 'https://<your storage name>.blob.core.windows.net/<container name>',
11 CREDENTIAL = WorkspaceIdentity
12);
13GO

Criar uma tabela externa

Crie uma tabela externa e defina DATA_SOURCE para a fonte de dados externa criada anteriormente. Em LOCATION, especifique o caminho do blob. É possível usar curingas.

sql
1-- Criar esquema
2CREATE SCHEMA ext AUTHORIZATION dbo;
3GO
4
5CREATE SCHEMA func AUTHORIZATION dbo;
6GO
7
8-- Criar formato de arquivo
9CREATE EXTERNAL FILE FORMAT [ParquetFormat]
10WITH (FORMAT_TYPE = PARQUET);
11GO
12
13-- Criar tabela externa
14CREATE EXTERNAL TABLE ext.myTable
15(
16 -- Definições de colunas
17 time datetime2 NULL,
18 propertyA nvarchar(512) NULL,
19 propertyB nvarchar(512) NULL,
20 ...
21)
22WITH
23(
24 LOCATION = 'y=*/**',
25 DATA_SOURCE = [MyDataLake],
26 FILE_FORMAT = [ParquetFormat]
27);
28GO

Criar uma View

Crie uma view e restrinja as colunas que serão acessadas pelo Functions.

sql
1CREATE VIEW func.viewOfMyTable
2AS
3SELECT
4 [time],
5 [propertyA],
6 [propertyB],
7 ...
8FROM ext.myTable
9WHERE <adicione uma condição aqui se necessário>;
10GO

Conceder permissões ao Service Principal

Esta é uma configuração para adicionar permissões ao banco de dados a partir de um pipeline no CI/CD. Se você não pretende fazer isso, pode pular esta seção.

Usando o nome do SPN usado no pipeline, crie um usuário externo e conceda permissões.

sql
1CREATE USER [pipeline-sp] FROM EXTERNAL PROVIDER;
2GO
3
4ALTER ROLE db_accessadmin ADD MEMBER [pipeline-sp];
5GO
6
7ALTER ROLE db_owner ADD MEMBER [pipeline-sp];
8GO

Conceder ao Managed Identity do workspace do Synapse a função Directory Readers no Entra ID

Se você quiser conceder permissões dinamicamente ao Functions, ao especificar o nome da Function o workspace consultará automaticamente o objectId do Managed Identity correspondente e concederá as permissões. Para isso, parece que o próprio workspace precisa ter o papel Directory Readers.

Adicione o papel à identity do workspace em Entra ID > Funções e administradores > Directory Readers > Adicionar atribuições. O objectId do Managed Identity do workspace pode ser verificado no Synapse Studio, no menu de administração Credentials.

Conceder permissões ao Functions no pipeline

Ao implantar uma nova Function, conceda a ela acesso ao banco de dados. Para executar isso no Azure Pipelines, eu usei os passos abaixo.

  1. Preparar um script SQL
  2. Substituir dinamicamente o nome da Function no script durante a execução do pipeline
  3. Executar o SQL usando SqlAzureDacpacDeployment@1

A seguir, explico em ordem.

Preparar o SQL

Prepare um SQL como o seguinte. A parte FunctionAppMiName será substituída depois pelo nome da Function de destino.

sql
1USE [myDB];
2
3IF NOT EXISTS (SELECT 1 FROM sys.database_principals WHERE name = N'FunctionAppMiName')
4BEGIN
5 PRINT 'Creating user [FunctionAppMiName] FROM EXTERNAL PROVIDER...';
6 CREATE USER [FunctionAppMiName] FROM EXTERNAL PROVIDER;
7END
8ELSE
9BEGIN
10 PRINT 'User [FunctionAppMiName] already exists.';
11END;
12
13IF NOT EXISTS (
14 SELECT 1
15 FROM sys.database_role_members rm
16 JOIN sys.database_principals r ON rm.role_principal_id = r.principal_id
17 JOIN sys.database_principals m ON rm.member_principal_id = m.principal_id
18 WHERE r.name = N'db_datareader'
19 AND m.name = N'FunctionAppMiName'
20)
21BEGIN
22 PRINT 'Adding [FunctionAppMiName] to db_datareader...';
23 ALTER ROLE db_datareader ADD MEMBER [FunctionAppMiName];
24 GRANT REFERENCES ON DATABASE SCOPED CREDENTIAL::WorkspaceIdentity TO [FunctionAppMiName];
25 GRANT SELECT ON OBJECT::func.viewOfMyTable TO [FunctionAppMiName];
26 DENY SELECT ON SCHEMA::ext TO [FunctionAppMiName];
27END
28ELSE
29BEGIN
30 PRINT 'Already in db_datareader.';
31END;

Uma breve explicação das permissões:

db_datareader é necessário para ler os dados. Também concedi permissão de referência ao WorkspaceIdentity, pois sem isso não consegui acessar a fonte de dados externa. Além disso, concedo acesso à view e quero proibir consultas diretas às tabelas externas, então aplico DENY no schema.

Inserir o nome da Function

Eu reescrevi o arquivo SQL usando uma tarefa PowerShell@2, como abaixo, mas qualquer método serve desde que você consiga substituir o nome da Function dentro do arquivo.

bash
1- task: PowerShell@2
2 displayName: 'Generate grant-function-mi.sql from template'
3 inputs:
4 targetType: 'inline'
5 pwsh: true
6 script: |
7 $templatePath = "$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql.template"
8 $outputPath = "$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql"
9
10 if (-not (Test-Path $templatePath)) {
11 throw "Template not found: $templatePath"
12 }
13
14 # Ler o template
15 $content = Get-Content $templatePath -Raw
16
17 # Substituir (inserir valores usando variáveis do pipeline)
18 $content = $content -replace 'FunctionAppMiName', '$(deploy_resources.functionName)'
19
20 # Criar diretório de saída se não existir
21 $dir = Split-Path $outputPath
22 if (-not (Test-Path $dir)) {
23 New-Item -Path $dir -ItemType Directory -Force | Out-Null
24 }
25
26 # Escrever SQL
27 $content | Set-Content $outputPath -Encoding UTF8
28
29 Write-Host "Generated SQL file:"
30 Get-Content $outputPath

Executar o SQL

Execute o SQL reescrito. Por exemplo, com uma tarefa como a seguinte.

bash
1- task: SqlAzureDacpacDeployment@1
2 displayName: 'Grant db_datareader to Function MI (Synapse serverless)'
3 inputs:
4 # Conexão com o Azure (nome da service connection)
5 azureSubscription: '${{parameters.azureServiceConnection}}'
6
7 # Synapse serverless SQL endpoint
8 ServerName: '${{parameters.synapseWorkspaceName}}.sql.azuresynapse.net'
9 DatabaseName: 'myDB'
10
11 # Autenticação (usando o SPN da service connection)
12 SqlUserName: '' # Pode ficar vazio
13 SqlPassword: '' # Pode ficar vazio
14 AuthenticationType: 'servicePrincipal'
15
16 # Modo de execução: script SQL (não DACPAC)
17 deployType: 'SqlTask'
18 SqlFile: '$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql' # Caminho para o arquivo .sql gerado acima
19
20 # Opções (conforme necessário)
21 IpDetectionMethod: 'AutoDetect'

No meu caso, usei SqlAzureDacpacDeployment@1, mas esse método não permite passar o nome da Function como parâmetro. Portanto, o fluxo é reescrever o SQL antes para embutir o nome da Function e então executar a tarefa.

O ponto principal é conceder permissões à Function como mostrado no arquivo SQL, então você não precisa seguir exatamente este procedimento.

Consulta a partir do Functions

No Functions, envie uma consulta para a view como a seguir. No .NET, uso o pacote Microsoft.Data.SqlClient.

csharp
1var queryString = @"
2SELECT TOP (@maxItemCount)
3 [time],
4 [requestUri]
5 -- ...
6FROM func.viewOfMyTable
7WHERE
8 [time] >= @startDate
9 AND [time] <= @endDate
10ORDER BY
11 [time] DESC;
12";

Resumo

Para consultar a partir do Functions, é necessário conceder permissões à Function. Além disso, ao tentar adicionar permissões via um pipeline de CI/CD, também é necessário configurar permissões para o SPN, o que pode ser um pouco confuso. Como há dois tópicos (permissões de Managed Identity e permissões de acesso ao banco de dados), tome cuidado para não misturá-los.

Neste artigo criamos uma fonte de dados externa e uma view. Também seria possível acessar os dados diretamente do Functions com OPENROWSET, mas não escolhi essa abordagem porque ela exigiria conceder permissões mais amplas à Function.

Compartilhar:

Artigos relacionados