Cómo consultar Synapse Analytics desde Azure Functions

⏱️5 min
Compartir:

Este artículo resume cómo ejecutar consultas desde Azure Functions utilizando Azure Synapse Analytics.

Pasos

Transformación y copia de datos con Data Factory

Primero, prepare los datos que desea analizar. En este ejemplo, analizamos registros almacenados en Storage. Después de transformar los datos a un formato más adecuado para el análisis, colóquelos en Azure Data Lake Storage Gen2.

Aquí usamos un servicio llamado Data Factory. Con Data Factory se realiza la copia de datos. Como durante la copia puede elegir el formato, lo convertiremos a Parquet.

A continuación se muestra un ejemplo de Bicep. En este ejemplo se usan comodines y expresiones en la configuración de rutas, pero ajuste esas partes según sus requisitos. Normalmente es más fácil crearlo desde la UI en Data Factory Studio. También puede exportar los pipelines creados en Studio como archivos Bicep.

bicep
1param factoryName string
2param tags object
3@secure()
4param subscriptionId string
5param sourceResourceGroupName string
6param inputStorageAccountName string
7param outputStorageAccountName string
8
9param triggerStartTime string = utcNow('yyyy-MM-ddTHH:mm:ssZ')
10
11resource factory 'Microsoft.DataFactory/factories@2018-06-01' = {
12 identity: {
13 type: 'SystemAssigned'
14 }
15 location: resourceGroup().location
16 name: factoryName
17 properties: {
18 publicNetworkAccess: 'Enabled'
19 }
20 tags: tags
21}
22
23resource pipelines 'Microsoft.DataFactory/factories/pipelines@2018-06-01' = {
24 parent: factory
25 name: 'clicklog-pipeline'
26 dependsOn: [
27 inputDataSet
28 outputDataSet
29 ]
30 properties: {
31 activities: [
32 {
33 dependsOn: [
34 {
35 activity: 'LookupBlob'
36 dependencyConditions: [
37 'Succeeded'
38 ]
39 }
40 ]
41 policy: {
42 timeout: '0.12:00:00'
43 retry: 0
44 retryIntervalInSeconds: 30
45 secureOutput: false
46 secureInput: false
47 }
48 name: 'CopyAccessLog'
49 userProperties: [
50 {
51 name: 'Source'
52 value: 'container-name/<your path>/'
53 }
54 {
55 name: 'Destination'
56 value: 'destination-container-name//'
57 }
58 ]
59 type: 'Copy'
60 inputs: [
61 {
62 referenceName: 'SourceDataset'
63 type: 'DatasetReference'
64 parameters: {}
65 }
66 ]
67 outputs: [
68 {
69 referenceName: 'DestinationDataset'
70 type: 'DatasetReference'
71 parameters: {}
72 }
73 ]
74 typeProperties: {
75 source: {
76 type: 'JsonSource'
77 storeSettings: {
78 type: 'AzureBlobStorageReadSettings'
79 recursive: true
80 wildcardFolderPath: {
81 value: 'your-path/@{concat(\'y=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'yyyy\'))}/@{concat(\'m=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'MM\'))}/@{concat(\'d=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'dd\'))}'
82 type: 'Expression'
83 }
84 wildcardFileName: '*'
85 enablePartitionDiscovery: false
86 }
87 formatSettings: {
88 type: 'JsonReadSettings'
89 }
90 }
91 sink: {
92 type: 'ParquetSink'
93 storeSettings: {
94 type: 'AzureBlobStorageWriteSettings'
95 copyBehavior: 'FlattenHierarchy'
96 }
97 formatSettings: {
98 type: 'ParquetWriteSettings'
99 }
100 }
101 enableStaging: false
102 validateDataConsistency: false
103 translator: {
104 type: 'TabularTranslator'
105 mappings: [ // Define mappings here
106 {
107 source: {
108 path: '$[\'time\']'
109 }
110 sink: {
111 name: 'time'
112 type: 'DateTime'
113 }
114 }
115 // ...
116 ]
117 }
118 }
119 }
120 ]
121 policy: {
122 elapsedTimeMetric: {}
123 }
124 }
125}
126
127resource inputBlob 'Microsoft.DataFactory/factories/linkedservices@2018-06-01' = {
128 parent: factory
129 name: 'AzureBlobStorageInput'
130 properties: {
131 annotations: []
132 type: 'AzureBlobStorage'
133 typeProperties: {
134 serviceEndpoint: 'https://${inputStorageAccountName}.blob.${environment().suffixes.storage}/'
135 accountKind: 'StorageV2'
136 }
137 // For remaining properties, see LinkedService objects
138 }
139}
140
141resource outputBlob 'Microsoft.DataFactory/factories/linkedservices@2018-06-01' = {
142 parent: factory
143 name: 'AzureBlobStorageOutput'
144 properties: {
145 annotations: []
146 type: 'AzureBlobStorage'
147 typeProperties: {
148 serviceEndpoint: 'https://${outputStorageAccountName}.blob.${environment().suffixes.storage}/'
149 accountKind: 'StorageV2'
150 }
151 // For remaining properties, see LinkedService objects
152 }
153}
154
155resource outputDataSet 'Microsoft.DataFactory/factories/datasets@2018-06-01' = {
156 parent: factory
157 name: 'DestinationDataset'
158 dependsOn: [
159 outputBlob
160 ]
161 properties: {
162 annotations: []
163 description: 'string'
164 folder: {
165 name: 'string'
166 }
167 linkedServiceName: {
168 referenceName: 'AzureBlobStorageOutput'
169 type: 'LinkedServiceReference'
170 }
171 typeProperties: {
172 location: {
173 type: 'AzureBlobStorageLocation'
174 folderPath: {
175 value: '@{concat(\'y=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'yyyy\'))}/@{concat(\'m=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'MM\'))}/@{concat(\'d=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'dd\'))}'
176 type: 'Expression'
177 }
178 container: 'container-name'
179 }
180 compressionCodec: 'snappy'
181 }
182 type: 'Parquet'
183 // For remaining properties, see Dataset objects
184 }
185}
186
187resource inputDataSet 'Microsoft.DataFactory/factories/datasets@2018-06-01' = {
188 parent: factory
189 name: 'SourceDataset'
190 dependsOn: [
191 inputBlob
192 ]
193 properties: {
194 annotations: []
195 description: 'string'
196 folder: {
197 name: 'string'
198 }
199 linkedServiceName: {
200 referenceName: 'AzureBlobStorageInput'
201 type: 'LinkedServiceReference'
202 }
203 typeProperties: {
204 location: {
205 type: 'AzureBlobStorageLocation'
206 folderPath: 'your-path'
207 container: 'container-name'
208 }
209 }
210 type: 'Json'
211 schema: {
212 type: 'object'
213 properties: {
214 time: {
215 type: 'string'
216 }
217 ...
218 }
219 }
220 // For remaining properties, see Dataset objects
221 }
222}
223
224resource DailyTrigger 'Microsoft.DataFactory/factories/triggers@2018-06-01' = {
225 parent: factory
226 name: 'DailyTrigger'
227 dependsOn: [
228 pipelines
229 ]
230 properties: {
231 annotations: []
232 // runtimeState: 'Started'
233 pipelines: [
234 {
235 pipelineReference: {
236 referenceName: 'your-pipeline-name'
237 type: 'PipelineReference'
238 }
239 parameters: {}
240 }
241 ]
242 type: 'ScheduleTrigger'
243 typeProperties: {
244 recurrence: {
245 frequency: 'Day'
246 interval: 1
247 startTime: triggerStartTime
248 timeZone: 'UTC'
249 schedule: {
250 minutes: [
251 0
252 ]
253 hours: [
254 0
255 ]
256 }
257 }
258 }
259 // For remaining properties, see Trigger objects
260 }
261}
262
263output managedIdentityPrincipalId string = factory.identity.principalId

Los recursos que aparecen aquí son los siguientes.

RecursoDescripción
Microsoft.DataFactory/factoriesLa instancia de Data Factory
Microsoft.DataFactory/factories/pipelinesDefinición del pipeline. El pipeline realiza copias de datos, etc.
Microsoft.DataFactory/factories/linkedservicesLinked services que vinculan el almacenamiento de entrada y salida
Microsoft.DataFactory/factories/datasetsDatasets de entrada y salida
Microsoft.DataFactory/factories/triggersConfiguración del trigger. No es necesario si no desea ejecución automática.

Crear Synapse

A continuación, implemente Synapse Analytics. Como existe un pool SQL serverless integrado por defecto, crearemos una base de datos allí.

Crear una base de datos

Cree una base de datos en el pool SQL serverless integrado.

sql
1CREATE DATABASE [myDB];
2GO
3
4USE [myDB];
5GO

Agregar una fuente de datos externa

Cree una fuente de datos externa y conéctela al data lake. En este momento, cree WorkspaceIdentity y configure el acceso a esta fuente de datos externa utilizando los permisos de WorkspaceIdentity.

sql
1CREATE MASTER KEY
2ENCRYPTION BY PASSWORD = 'StrongPassword_ChangeThis!';
3
4CREATE DATABASE SCOPED CREDENTIAL WorkspaceIdentity
5WITH IDENTITY = 'Managed Identity';
6GO
7
8CREATE EXTERNAL DATA SOURCE MyDataLake
9WITH (
10 LOCATION = 'https://<your storage name>.blob.core.windows.net/<container name>',
11 CREDENTIAL = WorkspaceIdentity
12);
13GO

Crear una tabla externa

Cree una tabla externa y establezca DATA_SOURCE en la fuente de datos externa creada anteriormente. En LOCATION especifique la ruta del blob. Se pueden usar comodines.

sql
1-- Crear esquema
2CREATE SCHEMA ext AUTHORIZATION dbo;
3GO
4
5CREATE SCHEMA func AUTHORIZATION dbo;
6GO
7
8-- Crear formato de archivo
9CREATE EXTERNAL FILE FORMAT [ParquetFormat]
10WITH (FORMAT_TYPE = PARQUET);
11GO
12
13-- Crear tabla externa
14CREATE EXTERNAL TABLE ext.myTable
15(
16 -- Definición de columnas
17 time datetime2 NULL,
18 propertyA nvarchar(512) NULL,
19 propertyB nvarchar(512) NULL,
20 ...
21)
22WITH
23(
24 LOCATION = 'y=*/**',
25 DATA_SOURCE = [MyDataLake],
26 FILE_FORMAT = [ParquetFormat]
27);
28GO

Crear una vista

Cree una vista y reduzca las propiedades a las que accederán las Functions.

sql
1CREATE VIEW func.viewOfMyTable
2AS
3SELECT
4 [time],
5 [propertyA],
6 [propertyB],
7 ...
8FROM ext.myTable
9WHERE <escriba aquí una condición si es necesario>;
10GO

Conceder permisos al service principal

Esta es una configuración para agregar permisos a la base de datos desde un pipeline en CI/CD. Si no planea hacerlo, puede omitir esta sección.

Usando el nombre del SPN utilizado en el pipeline, cree un usuario externo y conceda permisos.

sql
1CREATE USER [pipeline-sp] FROM EXTERNAL PROVIDER;
2GO
3
4ALTER ROLE db_accessadmin ADD MEMBER [pipeline-sp];
5GO
6
7ALTER ROLE db_owner ADD MEMBER [pipeline-sp];
8GO

Conceder el rol “Directory Readers” (Entra ID) a la identidad administrada del workspace de Synapse

Si desea conceder permisos dinámicamente a Functions, puede especificar el nombre de la Function y el workspace buscará automáticamente el objectId de la identidad administrada correspondiente y concederá permisos. Para hacerlo, parece que el propio workspace necesita tener el rol “Directory Readers”.

Agregue el rol a la identidad del workspace desde Entra ID > Roles y administradores > Directory Readers > Agregar asignaciones. Puede confirmar el objectId de la identidad administrada del workspace en Synapse Studio, en el menú de administración Credentials.

Conceder permisos a Functions desde el pipeline

Cuando implemente una nueva Function, concédale acceso a la base de datos. Para ejecutarlo en Azure Pipelines, utilicé los siguientes pasos.

  1. Preparar un script SQL
  2. Reemplazar dinámicamente el nombre de la Function dentro del script durante la ejecución del pipeline
  3. Ejecutar el SQL usando SqlAzureDacpacDeployment@1

A continuación lo explico paso a paso.

Preparar el SQL

Prepare un SQL como el siguiente. La parte FunctionAppMiName se reemplazará más adelante por el nombre de la Function objetivo.

sql
1USE [myDB];
2
3IF NOT EXISTS (SELECT 1 FROM sys.database_principals WHERE name = N'FunctionAppMiName')
4BEGIN
5 PRINT 'Creating user [FunctionAppMiName] FROM EXTERNAL PROVIDER...';
6 CREATE USER [FunctionAppMiName] FROM EXTERNAL PROVIDER;
7END
8ELSE
9BEGIN
10 PRINT 'User [FunctionAppMiName] already exists.';
11END;
12
13IF NOT EXISTS (
14 SELECT 1
15 FROM sys.database_role_members rm
16 JOIN sys.database_principals r ON rm.role_principal_id = r.principal_id
17 JOIN sys.database_principals m ON rm.member_principal_id = m.principal_id
18 WHERE r.name = N'db_datareader'
19 AND m.name = N'FunctionAppMiName'
20)
21BEGIN
22 PRINT 'Adding [FunctionAppMiName] to db_datareader...';
23 ALTER ROLE db_datareader ADD MEMBER [FunctionAppMiName];
24 GRANT REFERENCES ON DATABASE SCOPED CREDENTIAL::WorkspaceIdentity TO [FunctionAppMiName];
25 GRANT SELECT ON OBJECT::func.viewOfMyTable TO [FunctionAppMiName];
26 DENY SELECT ON SCHEMA::ext TO [FunctionAppMiName];
27END
28ELSE
29BEGIN
30 PRINT 'Already in db_datareader.';
31END;

Breve explicación de los permisos:

db_datareader es necesario para leer datos. También agregué el permiso de referencia sobre WorkspaceIdentity, porque sin él no podía accederse a la fuente de datos externa. Además, concedo acceso a la vista y quiero prohibir consultas directas a las tablas externas, por lo que hago DENY sobre el esquema.

Insertar el nombre de la Function

Reescribí el archivo SQL usando una tarea PowerShell@2 como se muestra a continuación, pero cualquier método es válido mientras pueda reemplazar el nombre de la Function en el archivo.

bash
1- task: PowerShell@2
2 displayName: 'Generate grant-function-mi.sql from template'
3 inputs:
4 targetType: 'inline'
5 pwsh: true
6 script: |
7 $templatePath = "$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql.template"
8 $outputPath = "$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql"
9
10 if (-not (Test-Path $templatePath)) {
11 throw "Template not found: $templatePath"
12 }
13
14 # Leer plantilla
15 $content = Get-Content $templatePath -Raw
16
17 # Reemplazar (rellenar usando variables del pipeline)
18 $content = $content -replace 'FunctionAppMiName', '$(deploy_resources.functionName)'
19
20 # Crear el directorio de salida si no existe
21 $dir = Split-Path $outputPath
22 if (-not (Test-Path $dir)) {
23 New-Item -Path $dir -ItemType Directory -Force | Out-Null
24 }
25
26 # Escribir SQL
27 $content | Set-Content $outputPath -Encoding UTF8
28
29 Write-Host "Generated SQL file:"
30 Get-Content $outputPath

Ejecutar el SQL

Ejecute el SQL reescrito. Por ejemplo, con una tarea como la siguiente.

bash
1- task: SqlAzureDacpacDeployment@1
2 displayName: 'Grant db_datareader to Function MI (Synapse serverless)'
3 inputs:
4 # Conexión a Azure (nombre de la conexión de servicio)
5 azureSubscription: '${{parameters.azureServiceConnection}}'
6
7 # Synapse serverless SQL endpoint
8 ServerName: '${{parameters.synapseWorkspaceName}}.sql.azuresynapse.net'
9 DatabaseName: 'myDB'
10
11 # Autenticación (usando el SPN de la conexión de servicio)
12 SqlUserName: '' # Puede quedar vacío
13 SqlPassword: '' # Puede quedar vacío
14 AuthenticationType: 'servicePrincipal'
15
16 # Modo de ejecución: script SQL (no DACPAC)
17 deployType: 'SqlTask'
18 SqlFile: '$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql' # Ruta al archivo .sql generado arriba
19
20 # Opciones (según sea necesario)
21 IpDetectionMethod: 'AutoDetect'

En mi caso utilicé SqlAzureDacpacDeployment@1, pero este método no permite pasar el nombre de la Function como parámetro. Por ello, el flujo consiste en reescribir el SQL previamente para insertar el nombre de la Function y luego ejecutar la tarea.

Lo importante es conceder permisos a la Function como se muestra en el archivo SQL, por lo que no necesariamente tiene que seguir exactamente este procedimiento.

Consulta desde Functions

Desde Functions, envíe una consulta a la vista como la siguiente. En .NET uso el paquete Microsoft.Data.SqlClient.

csharp
1var queryString = @"
2SELECT TOP (@maxItemCount)
3 [time],
4 [requestUri]
5 -- ...
6FROM func.viewOfMyTable
7WHERE
8 [time] >= @startDate
9 AND [time] <= @endDate
10ORDER BY
11 [time] DESC;
12";

Resumen

Para consultar desde Functions, debe conceder permisos a la Function. Además, si desea agregar permisos desde un pipeline de CI/CD, también necesita configurar permisos para el SPN, lo que puede ser un poco complejo. Como hay dos capas (permisos de identidad administrada y permisos de acceso a la base de datos), tenga cuidado de no confundirlas.

En este artículo creamos una fuente de datos externa y una vista. También sería posible acceder a los datos directamente desde Functions usando OPENROWSET, pero no elegí ese enfoque porque requeriría conceder permisos más amplios a la Function.

Compartir:

Artículos relacionados