Comment interroger Synapse Analytics depuis Azure Functions

⏱️5 min
Partager :

Cet article résume comment exécuter des requêtes depuis Azure Functions en utilisant Azure Synapse Analytics.

Étapes

Transformation et copie des données avec Data Factory

Commencez par préparer les données à analyser. Dans cet exemple, nous analysons des journaux stockés dans un compte de stockage. Après avoir transformé les données dans un format plus adapté à l’analyse, placez-les dans Azure Data Lake Storage Gen2.

Ici, nous utilisons un service appelé Data Factory. Avec Data Factory, nous copions les données. Comme vous pouvez choisir le format lors de la copie, nous utilisons le format Parquet.

Ci-dessous, un exemple Bicep. Cet exemple utilise des jokers et des expressions dans la configuration des chemins, mais adaptez ces parties selon vos besoins. Il est souvent plus simple de créer le pipeline depuis l’UI dans Data Factory Studio. Les pipelines créés dans Studio peuvent aussi être exportés en fichiers Bicep.

bicep
1param factoryName string
2param tags object
3@secure()
4param subscriptionId string
5param sourceResourceGroupName string
6param inputStorageAccountName string
7param outputStorageAccountName string
8
9param triggerStartTime string = utcNow('yyyy-MM-ddTHH:mm:ssZ')
10
11resource factory 'Microsoft.DataFactory/factories@2018-06-01' = {
12 identity: {
13 type: 'SystemAssigned'
14 }
15 location: resourceGroup().location
16 name: factoryName
17 properties: {
18 publicNetworkAccess: 'Enabled'
19 }
20 tags: tags
21}
22
23resource pipelines 'Microsoft.DataFactory/factories/pipelines@2018-06-01' = {
24 parent: factory
25 name: 'clicklog-pipeline'
26 dependsOn: [
27 inputDataSet
28 outputDataSet
29 ]
30 properties: {
31 activities: [
32 {
33 dependsOn: [
34 {
35 activity: 'LookupBlob'
36 dependencyConditions: [
37 'Succeeded'
38 ]
39 }
40 ]
41 policy: {
42 timeout: '0.12:00:00'
43 retry: 0
44 retryIntervalInSeconds: 30
45 secureOutput: false
46 secureInput: false
47 }
48 name: 'CopyAccessLog'
49 userProperties: [
50 {
51 name: 'Source'
52 value: 'container-name/<your path>/'
53 }
54 {
55 name: 'Destination'
56 value: 'destination-container-name//'
57 }
58 ]
59 type: 'Copy'
60 inputs: [
61 {
62 referenceName: 'SourceDataset'
63 type: 'DatasetReference'
64 parameters: {}
65 }
66 ]
67 outputs: [
68 {
69 referenceName: 'DestinationDataset'
70 type: 'DatasetReference'
71 parameters: {}
72 }
73 ]
74 typeProperties: {
75 source: {
76 type: 'JsonSource'
77 storeSettings: {
78 type: 'AzureBlobStorageReadSettings'
79 recursive: true
80 wildcardFolderPath: {
81 value: 'your-path/@{concat(\'y=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'yyyy\'))}/@{concat(\'m=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'MM\'))}/@{concat(\'d=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'dd\'))}'
82 type: 'Expression'
83 }
84 wildcardFileName: '*'
85 enablePartitionDiscovery: false
86 }
87 formatSettings: {
88 type: 'JsonReadSettings'
89 }
90 }
91 sink: {
92 type: 'ParquetSink'
93 storeSettings: {
94 type: 'AzureBlobStorageWriteSettings'
95 copyBehavior: 'FlattenHierarchy'
96 }
97 formatSettings: {
98 type: 'ParquetWriteSettings'
99 }
100 }
101 enableStaging: false
102 validateDataConsistency: false
103 translator: {
104 type: 'TabularTranslator'
105 mappings: [ // Define mappings here
106 {
107 source: {
108 path: '$[\'time\']'
109 }
110 sink: {
111 name: 'time'
112 type: 'DateTime'
113 }
114 }
115 // ...
116 ]
117 }
118 }
119 }
120 ]
121 policy: {
122 elapsedTimeMetric: {}
123 }
124 }
125}
126
127resource inputBlob 'Microsoft.DataFactory/factories/linkedservices@2018-06-01' = {
128 parent: factory
129 name: 'AzureBlobStorageInput'
130 properties: {
131 annotations: []
132 type: 'AzureBlobStorage'
133 typeProperties: {
134 serviceEndpoint: 'https://${inputStorageAccountName}.blob.${environment().suffixes.storage}/'
135 accountKind: 'StorageV2'
136 }
137 // For remaining properties, see LinkedService objects
138 }
139}
140
141resource outputBlob 'Microsoft.DataFactory/factories/linkedservices@2018-06-01' = {
142 parent: factory
143 name: 'AzureBlobStorageOutput'
144 properties: {
145 annotations: []
146 type: 'AzureBlobStorage'
147 typeProperties: {
148 serviceEndpoint: 'https://${outputStorageAccountName}.blob.${environment().suffixes.storage}/'
149 accountKind: 'StorageV2'
150 }
151 // For remaining properties, see LinkedService objects
152 }
153}
154
155resource outputDataSet 'Microsoft.DataFactory/factories/datasets@2018-06-01' = {
156 parent: factory
157 name: 'DestinationDataset'
158 dependsOn: [
159 outputBlob
160 ]
161 properties: {
162 annotations: []
163 description: 'string'
164 folder: {
165 name: 'string'
166 }
167 linkedServiceName: {
168 referenceName: 'AzureBlobStorageOutput'
169 type: 'LinkedServiceReference'
170 }
171 typeProperties: {
172 location: {
173 type: 'AzureBlobStorageLocation'
174 folderPath: {
175 value: '@{concat(\'y=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'yyyy\'))}/@{concat(\'m=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'MM\'))}/@{concat(\'d=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'dd\'))}'
176 type: 'Expression'
177 }
178 container: 'container-name'
179 }
180 compressionCodec: 'snappy'
181 }
182 type: 'Parquet'
183 // For remaining properties, see Dataset objects
184 }
185}
186
187resource inputDataSet 'Microsoft.DataFactory/factories/datasets@2018-06-01' = {
188 parent: factory
189 name: 'SourceDataset'
190 dependsOn: [
191 inputBlob
192 ]
193 properties: {
194 annotations: []
195 description: 'string'
196 folder: {
197 name: 'string'
198 }
199 linkedServiceName: {
200 referenceName: 'AzureBlobStorageInput'
201 type: 'LinkedServiceReference'
202 }
203 typeProperties: {
204 location: {
205 type: 'AzureBlobStorageLocation'
206 folderPath: 'your-path'
207 container: 'container-name'
208 }
209 }
210 type: 'Json'
211 schema: {
212 type: 'object'
213 properties: {
214 time: {
215 type: 'string'
216 }
217 ...
218 }
219 }
220 // For remaining properties, see Dataset objects
221 }
222}
223
224resource DailyTrigger 'Microsoft.DataFactory/factories/triggers@2018-06-01' = {
225 parent: factory
226 name: 'DailyTrigger'
227 dependsOn: [
228 pipelines
229 ]
230 properties: {
231 annotations: []
232 // runtimeState: 'Started'
233 pipelines: [
234 {
235 pipelineReference: {
236 referenceName: 'your-pipeline-name'
237 type: 'PipelineReference'
238 }
239 parameters: {}
240 }
241 ]
242 type: 'ScheduleTrigger'
243 typeProperties: {
244 recurrence: {
245 frequency: 'Day'
246 interval: 1
247 startTime: triggerStartTime
248 timeZone: 'UTC'
249 schedule: {
250 minutes: [
251 0
252 ]
253 hours: [
254 0
255 ]
256 }
257 }
258 }
259 // For remaining properties, see Trigger objects
260 }
261}
262
263output managedIdentityPrincipalId string = factory.identity.principalId

Les ressources utilisées ici sont les suivantes.

RessourceDescription
Microsoft.DataFactory/factoriesL’instance Data Factory
Microsoft.DataFactory/factories/pipelinesDéfinition du pipeline. Le pipeline exécute des copies de données, etc.
Microsoft.DataFactory/factories/linkedservicesLinked services reliant les stockages d’entrée et de sortie
Microsoft.DataFactory/factories/datasetsDatasets d’entrée et de sortie
Microsoft.DataFactory/factories/triggersConfiguration des triggers. Inutile si vous ne déclenchez pas automatiquement.

Créer Synapse

Ensuite, déployez Synapse Analytics. Comme un pool SQL serverless intégré est disponible par défaut, nous y créons une base de données.

Créer une base de données

Créez une base de données dans le pool SQL serverless intégré.

sql
1CREATE DATABASE [myDB];
2GO
3
4USE [myDB];
5GO

Ajouter une source de données externe

Créez une source de données externe et connectez-la au data lake. À ce moment-là, créez WorkspaceIdentity et configurez l’accès à cette source de données externe via les autorisations de WorkspaceIdentity.

sql
1CREATE MASTER KEY
2ENCRYPTION BY PASSWORD = 'StrongPassword_ChangeThis!';
3
4CREATE DATABASE SCOPED CREDENTIAL WorkspaceIdentity
5WITH IDENTITY = 'Managed Identity';
6GO
7
8CREATE EXTERNAL DATA SOURCE MyDataLake
9WITH (
10 LOCATION = 'https://<your storage name>.blob.core.windows.net/<container name>',
11 CREDENTIAL = WorkspaceIdentity
12);
13GO

Créer une table externe

Créez une table externe et définissez DATA_SOURCE sur la source de données externe créée précédemment. Pour LOCATION, indiquez le chemin blob. Les jokers sont pris en charge.

sql
1-- Créer le schéma
2CREATE SCHEMA ext AUTHORIZATION dbo;
3GO
4
5CREATE SCHEMA func AUTHORIZATION dbo;
6GO
7
8-- Créer le format de fichier
9CREATE EXTERNAL FILE FORMAT [ParquetFormat]
10WITH (FORMAT_TYPE = PARQUET);
11GO
12
13-- Créer la table externe
14CREATE EXTERNAL TABLE ext.myTable
15(
16 -- Définition des colonnes
17 time datetime2 NULL,
18 propertyA nvarchar(512) NULL,
19 propertyB nvarchar(512) NULL,
20 ...
21)
22WITH
23(
24 LOCATION = 'y=*/**',
25 DATA_SOURCE = [MyDataLake],
26 FILE_FORMAT = [ParquetFormat]
27);
28GO

Créer une vue

Créez une vue et limitez les propriétés (colonnes) auxquelles Functions accédera.

sql
1CREATE VIEW func.viewOfMyTable
2AS
3SELECT
4 [time],
5 [propertyA],
6 [propertyB],
7 ...
8FROM ext.myTable
9WHERE <ajoutez une condition ici si nécessaire>;
10GO

Accorder des autorisations au service principal

Il s’agit d’une configuration pour ajouter des autorisations à la base de données depuis un pipeline CI/CD. Si vous ne prévoyez pas de le faire, vous pouvez ignorer cette section.

En utilisant le nom du SPN utilisé dans le pipeline, créez un utilisateur externe et accordez des autorisations.

sql
1CREATE USER [pipeline-sp] FROM EXTERNAL PROVIDER;
2GO
3
4ALTER ROLE db_accessadmin ADD MEMBER [pipeline-sp];
5GO
6
7ALTER ROLE db_owner ADD MEMBER [pipeline-sp];
8GO

Donner le rôle « Directory Readers » (Entra ID) à l’identité managée du workspace Synapse

Si vous souhaitez accorder dynamiquement des autorisations à Functions, vous pouvez spécifier le nom de la Function. Le workspace recherchera automatiquement l’objectId de l’identité managée correspondant à ce nom et accordera les autorisations. Pour cela, le workspace lui-même doit disposer du rôle « Directory Readers ».

Ajoutez le rôle via Entra ID > Rôles et administrateurs > Directory Readers > Ajouter des attributions à l’identité du workspace. Vous pouvez vérifier l’objectId de l’identité managée du workspace dans Synapse Studio, menu d’administration Credentials.

Accorder des autorisations à Functions depuis le pipeline

Lorsque vous déployez une nouvelle Function, accordez-lui l’accès à la base de données. Pour exécuter cela dans Azure Pipelines, j’ai suivi les étapes suivantes.

  1. Préparer un script SQL
  2. Remplacer dynamiquement le nom de la Function dans le script à l’exécution du pipeline
  3. Exécuter le SQL avec SqlAzureDacpacDeployment@1

Je détaille ces étapes ci-dessous.

Préparer le SQL

Préparez un SQL comme suit. La partie FunctionAppMiName sera remplacée plus tard par le nom de la Function cible.

sql
1USE [myDB];
2
3IF NOT EXISTS (SELECT 1 FROM sys.database_principals WHERE name = N'FunctionAppMiName')
4BEGIN
5 PRINT 'Creating user [FunctionAppMiName] FROM EXTERNAL PROVIDER...';
6 CREATE USER [FunctionAppMiName] FROM EXTERNAL PROVIDER;
7END
8ELSE
9BEGIN
10 PRINT 'User [FunctionAppMiName] already exists.';
11END;
12
13IF NOT EXISTS (
14 SELECT 1
15 FROM sys.database_role_members rm
16 JOIN sys.database_principals r ON rm.role_principal_id = r.principal_id
17 JOIN sys.database_principals m ON rm.member_principal_id = m.principal_id
18 WHERE r.name = N'db_datareader'
19 AND m.name = N'FunctionAppMiName'
20)
21BEGIN
22 PRINT 'Adding [FunctionAppMiName] to db_datareader...';
23 ALTER ROLE db_datareader ADD MEMBER [FunctionAppMiName];
24 GRANT REFERENCES ON DATABASE SCOPED CREDENTIAL::WorkspaceIdentity TO [FunctionAppMiName];
25 GRANT SELECT ON OBJECT::func.viewOfMyTable TO [FunctionAppMiName];
26 DENY SELECT ON SCHEMA::ext TO [FunctionAppMiName];
27END
28ELSE
29BEGIN
30 PRINT 'Already in db_datareader.';
31END;

Brève explication des autorisations :

db_datareader est nécessaire pour lire les données. J’ai également ajouté l’autorisation de référence sur WorkspaceIdentity, car l’accès à la source de données externe ne fonctionnait pas sans cela. Enfin, j’accorde l’accès à la vue et je veux interdire les requêtes directes sur les tables externes, donc je fais un DENY sur le schéma.

Insérer le nom de la Function

J’ai réécrit le fichier SQL à l’aide d’une tâche PowerShell@2 comme ci-dessous, mais n’importe quelle méthode convient tant que vous pouvez remplacer le nom de la Function dans le fichier.

bash
1- task: PowerShell@2
2 displayName: 'Generate grant-function-mi.sql from template'
3 inputs:
4 targetType: 'inline'
5 pwsh: true
6 script: |
7 $templatePath = "$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql.template"
8 $outputPath = "$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql"
9
10 if (-not (Test-Path $templatePath)) {
11 throw "Template not found: $templatePath"
12 }
13
14 # Lire le modèle
15 $content = Get-Content $templatePath -Raw
16
17 # Remplacer (renseigner via les variables du pipeline)
18 $content = $content -replace 'FunctionAppMiName', '$(deploy_resources.functionName)'
19
20 # Créer le répertoire de sortie s'il n'existe pas
21 $dir = Split-Path $outputPath
22 if (-not (Test-Path $dir)) {
23 New-Item -Path $dir -ItemType Directory -Force | Out-Null
24 }
25
26 # Écrire le SQL
27 $content | Set-Content $outputPath -Encoding UTF8
28
29 Write-Host "Generated SQL file:"
30 Get-Content $outputPath

Exécuter le SQL

Exécutez le SQL réécrit. Par exemple, avec une tâche comme celle-ci.

bash
1- task: SqlAzureDacpacDeployment@1
2 displayName: 'Grant db_datareader to Function MI (Synapse serverless)'
3 inputs:
4 # Connexion Azure (nom de la connexion de service)
5 azureSubscription: '${{parameters.azureServiceConnection}}'
6
7 # Synapse serverless SQL endpoint
8 ServerName: '${{parameters.synapseWorkspaceName}}.sql.azuresynapse.net'
9 DatabaseName: 'myDB'
10
11 # Authentification (via le SPN de la connexion de service)
12 SqlUserName: '' # Peut rester vide
13 SqlPassword: '' # Peut rester vide
14 AuthenticationType: 'servicePrincipal'
15
16 # Mode d'exécution : script SQL (pas DACPAC)
17 deployType: 'SqlTask'
18 SqlFile: '$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql' # Chemin vers le fichier .sql généré ci-dessus
19
20 # Options (si nécessaire)
21 IpDetectionMethod: 'AutoDetect'

Dans mon cas, j’ai utilisé SqlAzureDacpacDeployment@1, mais cette méthode ne permet pas de passer le nom de la Function en paramètre. Il faut donc réécrire le SQL au préalable pour inclure le nom de la Function, puis exécuter la tâche.

Le point important est d’accorder les autorisations à la Function comme indiqué dans le fichier SQL. Vous n’êtes pas obligé de suivre exactement cette procédure.

Requête depuis Functions

Depuis Functions, envoyez une requête à la vue comme ci-dessous. En .NET, j’utilise le package Microsoft.Data.SqlClient.

csharp
1var queryString = @"
2SELECT TOP (@maxItemCount)
3 [time],
4 [requestUri]
5 -- ...
6FROM func.viewOfMyTable
7WHERE
8 [time] >= @startDate
9 AND [time] <= @endDate
10ORDER BY
11 [time] DESC;
12";

Conclusion

Pour interroger depuis Functions, vous devez accorder des autorisations à la Function. De plus, si vous souhaitez ajouter les autorisations depuis un pipeline CI/CD, vous devrez aussi configurer des autorisations pour le SPN, ce qui peut être un peu délicat. Comme il y a à la fois des autorisations d’identité managée et des autorisations d’accès à la base, faites attention à ne pas les confondre.

Dans cet article, nous avons créé une source de données externe et une vue. Il serait aussi possible d’accéder aux données directement depuis Functions avec OPENROWSET, mais cela demanderait d’accorder des autorisations plus larges à la Function, donc je n’ai pas choisi cette approche.

Partager :

Articles connexes