Come interrogare Synapse Analytics da Azure Functions

⏱️4 min
Condividi:

Questo articolo riassume come eseguire query da Azure Functions utilizzando Azure Synapse Analytics.

Procedura

Trasformazione e copia dei dati con Data Factory

Per prima cosa, prepariamo i dati da analizzare. In questo esempio analizziamo log salvati nello storage. Dopo aver trasformato i dati in un formato più adatto all’analisi, li posizioniamo in Azure Data Lake Storage Gen2.

Qui utilizziamo un servizio chiamato Data Factory. Con Data Factory eseguiamo la copia dei dati. Poiché durante la copia è possibile scegliere il formato, li convertiamo in formato Parquet.

Di seguito un esempio Bicep. Questo esempio utilizza wildcard ed espressioni nella configurazione dei percorsi, ma modificate queste parti in base ai requisiti. Spesso è più chiaro creare il tutto dall’UI in Data Factory Studio. È anche possibile esportare i pipeline creati in Studio come file Bicep.

bicep
1param factoryName string
2param tags object
3@secure()
4param subscriptionId string
5param sourceResourceGroupName string
6param inputStorageAccountName string
7param outputStorageAccountName string
8
9param triggerStartTime string = utcNow('yyyy-MM-ddTHH:mm:ssZ')
10
11resource factory 'Microsoft.DataFactory/factories@2018-06-01' = {
12 identity: {
13 type: 'SystemAssigned'
14 }
15 location: resourceGroup().location
16 name: factoryName
17 properties: {
18 publicNetworkAccess: 'Enabled'
19 }
20 tags: tags
21}
22
23resource pipelines 'Microsoft.DataFactory/factories/pipelines@2018-06-01' = {
24 parent: factory
25 name: 'clicklog-pipeline'
26 dependsOn: [
27 inputDataSet
28 outputDataSet
29 ]
30 properties: {
31 activities: [
32 {
33 dependsOn: [
34 {
35 activity: 'LookupBlob'
36 dependencyConditions: [
37 'Succeeded'
38 ]
39 }
40 ]
41 policy: {
42 timeout: '0.12:00:00'
43 retry: 0
44 retryIntervalInSeconds: 30
45 secureOutput: false
46 secureInput: false
47 }
48 name: 'CopyAccessLog'
49 userProperties: [
50 {
51 name: 'Source'
52 value: 'container-name/<your path>/'
53 }
54 {
55 name: 'Destination'
56 value: 'destination-container-name//'
57 }
58 ]
59 type: 'Copy'
60 inputs: [
61 {
62 referenceName: 'SourceDataset'
63 type: 'DatasetReference'
64 parameters: {}
65 }
66 ]
67 outputs: [
68 {
69 referenceName: 'DestinationDataset'
70 type: 'DatasetReference'
71 parameters: {}
72 }
73 ]
74 typeProperties: {
75 source: {
76 type: 'JsonSource'
77 storeSettings: {
78 type: 'AzureBlobStorageReadSettings'
79 recursive: true
80 wildcardFolderPath: {
81 value: 'your-path/@{concat(\'y=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'yyyy\'))}/@{concat(\'m=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'MM\'))}/@{concat(\'d=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'dd\'))}'
82 type: 'Expression'
83 }
84 wildcardFileName: '*'
85 enablePartitionDiscovery: false
86 }
87 formatSettings: {
88 type: 'JsonReadSettings'
89 }
90 }
91 sink: {
92 type: 'ParquetSink'
93 storeSettings: {
94 type: 'AzureBlobStorageWriteSettings'
95 copyBehavior: 'FlattenHierarchy'
96 }
97 formatSettings: {
98 type: 'ParquetWriteSettings'
99 }
100 }
101 enableStaging: false
102 validateDataConsistency: false
103 translator: {
104 type: 'TabularTranslator'
105 mappings: [ // Define mappings here
106 {
107 source: {
108 path: '$[\'time\']'
109 }
110 sink: {
111 name: 'time'
112 type: 'DateTime'
113 }
114 }
115 // ...
116 ]
117 }
118 }
119 }
120 ]
121 policy: {
122 elapsedTimeMetric: {}
123 }
124 }
125}
126
127resource inputBlob 'Microsoft.DataFactory/factories/linkedservices@2018-06-01' = {
128 parent: factory
129 name: 'AzureBlobStorageInput'
130 properties: {
131 annotations: []
132 type: 'AzureBlobStorage'
133 typeProperties: {
134 serviceEndpoint: 'https://${inputStorageAccountName}.blob.${environment().suffixes.storage}/'
135 accountKind: 'StorageV2'
136 }
137 // For remaining properties, see LinkedService objects
138 }
139}
140
141resource outputBlob 'Microsoft.DataFactory/factories/linkedservices@2018-06-01' = {
142 parent: factory
143 name: 'AzureBlobStorageOutput'
144 properties: {
145 annotations: []
146 type: 'AzureBlobStorage'
147 typeProperties: {
148 serviceEndpoint: 'https://${outputStorageAccountName}.blob.${environment().suffixes.storage}/'
149 accountKind: 'StorageV2'
150 }
151 // For remaining properties, see LinkedService objects
152 }
153}
154
155resource outputDataSet 'Microsoft.DataFactory/factories/datasets@2018-06-01' = {
156 parent: factory
157 name: 'DestinationDataset'
158 dependsOn: [
159 outputBlob
160 ]
161 properties: {
162 annotations: []
163 description: 'string'
164 folder: {
165 name: 'string'
166 }
167 linkedServiceName: {
168 referenceName: 'AzureBlobStorageOutput'
169 type: 'LinkedServiceReference'
170 }
171 typeProperties: {
172 location: {
173 type: 'AzureBlobStorageLocation'
174 folderPath: {
175 value: '@{concat(\'y=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'yyyy\'))}/@{concat(\'m=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'MM\'))}/@{concat(\'d=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'dd\'))}'
176 type: 'Expression'
177 }
178 container: 'container-name'
179 }
180 compressionCodec: 'snappy'
181 }
182 type: 'Parquet'
183 // For remaining properties, see Dataset objects
184 }
185}
186
187resource inputDataSet 'Microsoft.DataFactory/factories/datasets@2018-06-01' = {
188 parent: factory
189 name: 'SourceDataset'
190 dependsOn: [
191 inputBlob
192 ]
193 properties: {
194 annotations: []
195 description: 'string'
196 folder: {
197 name: 'string'
198 }
199 linkedServiceName: {
200 referenceName: 'AzureBlobStorageInput'
201 type: 'LinkedServiceReference'
202 }
203 typeProperties: {
204 location: {
205 type: 'AzureBlobStorageLocation'
206 folderPath: 'your-path'
207 container: 'container-name'
208 }
209 }
210 type: 'Json'
211 schema: {
212 type: 'object'
213 properties: {
214 time: {
215 type: 'string'
216 }
217 ...
218 }
219 }
220 // For remaining properties, see Dataset objects
221 }
222}
223
224resource DailyTrigger 'Microsoft.DataFactory/factories/triggers@2018-06-01' = {
225 parent: factory
226 name: 'DailyTrigger'
227 dependsOn: [
228 pipelines
229 ]
230 properties: {
231 annotations: []
232 // runtimeState: 'Started'
233 pipelines: [
234 {
235 pipelineReference: {
236 referenceName: 'your-pipeline-name'
237 type: 'PipelineReference'
238 }
239 parameters: {}
240 }
241 ]
242 type: 'ScheduleTrigger'
243 typeProperties: {
244 recurrence: {
245 frequency: 'Day'
246 interval: 1
247 startTime: triggerStartTime
248 timeZone: 'UTC'
249 schedule: {
250 minutes: [
251 0
252 ]
253 hours: [
254 0
255 ]
256 }
257 }
258 }
259 // For remaining properties, see Trigger objects
260 }
261}
262
263output managedIdentityPrincipalId string = factory.identity.principalId

Le risorse che compaiono qui sono le seguenti.

RisorsaDescrizione
Microsoft.DataFactory/factoriesL’istanza di Data Factory
Microsoft.DataFactory/factories/pipelinesDefinizione del pipeline. Il pipeline esegue copie dati, ecc.
Microsoft.DataFactory/factories/linkedservicesLinked service che collegano lo storage di input e output
Microsoft.DataFactory/factories/datasetsDataset di input e output
Microsoft.DataFactory/factories/triggersConfigurazione del trigger. Non necessario se non serve l’esecuzione automatica.

Creare Synapse

Successivamente, distribuiamo Synapse Analytics. Poiché per impostazione predefinita è disponibile un pool SQL serverless integrato, creiamo lì un database.

Creare un database

Creare un database nel pool SQL serverless integrato.

sql
1CREATE DATABASE [myDB];
2GO
3
4USE [myDB];
5GO

Aggiungere una sorgente dati esterna

Creare una sorgente dati esterna e collegarla al data lake. In questa fase, creare WorkspaceIdentity e configurare l’accesso a questa sorgente dati esterna utilizzando i permessi di WorkspaceIdentity.

sql
1CREATE MASTER KEY
2ENCRYPTION BY PASSWORD = 'StrongPassword_ChangeThis!';
3
4CREATE DATABASE SCOPED CREDENTIAL WorkspaceIdentity
5WITH IDENTITY = 'Managed Identity';
6GO
7
8CREATE EXTERNAL DATA SOURCE MyDataLake
9WITH (
10 LOCATION = 'https://<your storage name>.blob.core.windows.net/<container name>',
11 CREDENTIAL = WorkspaceIdentity
12);
13GO

Creare una tabella esterna

Creare una tabella esterna e impostare DATA_SOURCE sulla sorgente dati esterna creata in precedenza. Per LOCATION, specificare il percorso del blob. È possibile utilizzare wildcard.

sql
1-- Creare lo schema
2CREATE SCHEMA ext AUTHORIZATION dbo;
3GO
4
5CREATE SCHEMA func AUTHORIZATION dbo;
6GO
7
8-- Creare il formato file
9CREATE EXTERNAL FILE FORMAT [ParquetFormat]
10WITH (FORMAT_TYPE = PARQUET);
11GO
12
13-- Creare la tabella esterna
14CREATE EXTERNAL TABLE ext.myTable
15(
16 -- Definizione delle colonne
17 time datetime2 NULL,
18 propertyA nvarchar(512) NULL,
19 propertyB nvarchar(512) NULL,
20 ...
21)
22WITH
23(
24 LOCATION = 'y=*/**',
25 DATA_SOURCE = [MyDataLake],
26 FILE_FORMAT = [ParquetFormat]
27);
28GO

Creare una view

Creare una view e limitare le proprietà (colonne) a cui accederanno le Functions.

sql
1CREATE VIEW func.viewOfMyTable
2AS
3SELECT
4 [time],
5 [propertyA],
6 [propertyB],
7 ...
8FROM ext.myTable
9WHERE <scrivi qui una condizione se necessario>;
10GO

Concedere permessi al service principal

Questa è una configurazione per aggiungere permessi al database dal pipeline in CI/CD. Se non prevedete di farlo, potete saltare questa sezione.

Usando il nome dello SPN utilizzato dal pipeline, creare un utente esterno e concedere i permessi.

sql
1CREATE USER [pipeline-sp] FROM EXTERNAL PROVIDER;
2GO
3
4ALTER ROLE db_accessadmin ADD MEMBER [pipeline-sp];
5GO
6
7ALTER ROLE db_owner ADD MEMBER [pipeline-sp];
8GO

Concedere il ruolo “Directory Readers” di Entra ID alla managed identity del workspace Synapse

Se volete concedere permessi dinamicamente a Functions, potete specificare il nome della Function: il workspace cercherà automaticamente l’objectId della managed identity corrispondente e assegnerà i permessi. Per farlo, sembra che il workspace stesso debba avere il ruolo “Directory Readers”.

Aggiungete il ruolo alla identity del workspace da Entra ID > Ruoli e amministratori > Directory Readers > Aggiungi assegnazioni. Potete verificare l’objectId della managed identity del workspace in Synapse Studio, nel menu di amministrazione Credentials.

Concedere permessi a Functions dal pipeline

Quando distribuite una nuova Function, concedetele l’accesso al database. Per eseguirlo su Azure Pipelines, ho seguito questi passaggi.

  1. Preparare uno script SQL
  2. Sostituire dinamicamente il nome della Function nello script durante l’esecuzione del pipeline
  3. Eseguire il SQL usando SqlAzureDacpacDeployment@1

Di seguito spiego i passaggi in ordine.

Preparare il SQL

Preparare un SQL come il seguente. La parte FunctionAppMiName verrà poi sostituita con il nome della Function target.

sql
1USE [myDB];
2
3IF NOT EXISTS (SELECT 1 FROM sys.database_principals WHERE name = N'FunctionAppMiName')
4BEGIN
5 PRINT 'Creating user [FunctionAppMiName] FROM EXTERNAL PROVIDER...';
6 CREATE USER [FunctionAppMiName] FROM EXTERNAL PROVIDER;
7END
8ELSE
9BEGIN
10 PRINT 'User [FunctionAppMiName] already exists.';
11END;
12
13IF NOT EXISTS (
14 SELECT 1
15 FROM sys.database_role_members rm
16 JOIN sys.database_principals r ON rm.role_principal_id = r.principal_id
17 JOIN sys.database_principals m ON rm.member_principal_id = m.principal_id
18 WHERE r.name = N'db_datareader'
19 AND m.name = N'FunctionAppMiName'
20)
21BEGIN
22 PRINT 'Adding [FunctionAppMiName] to db_datareader...';
23 ALTER ROLE db_datareader ADD MEMBER [FunctionAppMiName];
24 GRANT REFERENCES ON DATABASE SCOPED CREDENTIAL::WorkspaceIdentity TO [FunctionAppMiName];
25 GRANT SELECT ON OBJECT::func.viewOfMyTable TO [FunctionAppMiName];
26 DENY SELECT ON SCHEMA::ext TO [FunctionAppMiName];
27END
28ELSE
29BEGIN
30 PRINT 'Already in db_datareader.';
31END;

Breve spiegazione dei permessi:

db_datareader è necessario per leggere i dati. Ho aggiunto anche il permesso di riferimento su WorkspaceIdentity, perché senza non riuscivo ad accedere alla sorgente dati esterna. Infine, concedo l’accesso alla view e, per impedire query dirette sulle tabelle esterne, applico un DENY sullo schema.

Inserire il nome della Function

Ho riscritto il file SQL usando un task PowerShell@2 come segue, ma qualsiasi metodo va bene purché possiate sostituire il nome della Function nel file.

bash
1- task: PowerShell@2
2 displayName: 'Generate grant-function-mi.sql from template'
3 inputs:
4 targetType: 'inline'
5 pwsh: true
6 script: |
7 $templatePath = "$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql.template"
8 $outputPath = "$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql"
9
10 if (-not (Test-Path $templatePath)) {
11 throw "Template not found: $templatePath"
12 }
13
14 # Leggere il template
15 $content = Get-Content $templatePath -Raw
16
17 # Sostituire (inserire valori usando variabili della pipeline)
18 $content = $content -replace 'FunctionAppMiName', '$(deploy_resources.functionName)'
19
20 # Creare la directory di output se non esiste
21 $dir = Split-Path $outputPath
22 if (-not (Test-Path $dir)) {
23 New-Item -Path $dir -ItemType Directory -Force | Out-Null
24 }
25
26 # Scrivere l'SQL
27 $content | Set-Content $outputPath -Encoding UTF8
28
29 Write-Host "Generated SQL file:"
30 Get-Content $outputPath

Eseguire il SQL

Eseguite il SQL riscritto. Ad esempio con un task come il seguente.

bash
1- task: SqlAzureDacpacDeployment@1
2 displayName: 'Grant db_datareader to Function MI (Synapse serverless)'
3 inputs:
4 # Connessione ad Azure (nome della service connection)
5 azureSubscription: '${{parameters.azureServiceConnection}}'
6
7 # Synapse serverless SQL endpoint
8 ServerName: '${{parameters.synapseWorkspaceName}}.sql.azuresynapse.net'
9 DatabaseName: 'myDB'
10
11 # Autenticazione (usando lo SPN della service connection)
12 SqlUserName: '' # Può essere lasciato vuoto
13 SqlPassword: '' # Può essere lasciato vuoto
14 AuthenticationType: 'servicePrincipal'
15
16 # Modalità di esecuzione: script SQL (non DACPAC)
17 deployType: 'SqlTask'
18 SqlFile: '$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql' # Percorso al file .sql generato sopra
19
20 # Opzioni (se necessario)
21 IpDetectionMethod: 'AutoDetect'

Nel mio caso ho usato SqlAzureDacpacDeployment@1, ma questo metodo non consente di passare il nome della Function come parametro. Di conseguenza, il flusso consiste nel riscrivere in anticipo il SQL per includere il nome della Function e poi eseguire il task.

Il punto importante è concedere i permessi alla Function come indicato nel file SQL: non è necessario seguire esattamente questa procedura.

Query da Functions

Da Functions, inviate una query alla view come segue. In .NET utilizzo il pacchetto Microsoft.Data.SqlClient.

csharp
1var queryString = @"
2SELECT TOP (@maxItemCount)
3 [time],
4 [requestUri]
5 -- ...
6FROM func.viewOfMyTable
7WHERE
8 [time] >= @startDate
9 AND [time] <= @endDate
10ORDER BY
11 [time] DESC;
12";

Riepilogo

Per interrogare da Functions, dovete concedere i permessi alla Function. Inoltre, se volete aggiungere i permessi da un pipeline CI/CD, serve anche una configurazione dei permessi per lo SPN, e questo può essere un po’ complicato. Dato che ci sono due livelli (permessi della managed identity e permessi di accesso al database), fate attenzione a non confonderli.

In questo articolo abbiamo creato una sorgente dati esterna e una view. Sarebbe anche possibile accedere ai dati direttamente da Functions usando OPENROWSET, ma non ho scelto questo approccio perché richiederebbe di concedere permessi più ampi alla Function.

Condividi:

Articoli correlati