So fragen Sie Synapse Analytics aus Azure Functions ab

⏱️4 Min
Teilen:

Dieser Artikel fasst zusammen, wie Sie mit Azure Synapse Analytics aus Azure Functions heraus Abfragen ausführen.

Schritte

Daten mit Data Factory transformieren und kopieren

Zuerst bereiten wir die zu analysierenden Daten vor. In diesem Beispiel analysieren wir Logs, die in Storage gespeichert sind. Wir bereiten die Daten so auf, dass sie sich gut analysieren lassen, und legen sie in Azure Data Lake Storage Gen2 ab.

Hier verwenden wir den Dienst Data Factory. Mit Data Factory kopieren wir die Daten. Beim Kopieren kann man das Format auswählen, daher verwenden wir das Parquet-Format.

Unten sehen Sie ein Bicep-Beispiel. In diesem Beispiel werden in den Pfad-Einstellungen Wildcards und Ausdrücke verwendet. Passen Sie diese Teile an Ihre Anforderungen an. Oft ist es leichter, das Ganze in Data Factory Studio per UI zu erstellen. Sie können Pipelines, die Sie in Studio erstellen, auch als Bicep-Dateien exportieren.

bicep
1param factoryName string
2param tags object
3@secure()
4param subscriptionId string
5param sourceResourceGroupName string
6param inputStorageAccountName string
7param outputStorageAccountName string
8
9param triggerStartTime string = utcNow('yyyy-MM-ddTHH:mm:ssZ')
10
11resource factory 'Microsoft.DataFactory/factories@2018-06-01' = {
12 identity: {
13 type: 'SystemAssigned'
14 }
15 location: resourceGroup().location
16 name: factoryName
17 properties: {
18 publicNetworkAccess: 'Enabled'
19 }
20 tags: tags
21}
22
23resource pipelines 'Microsoft.DataFactory/factories/pipelines@2018-06-01' = {
24 parent: factory
25 name: 'clicklog-pipeline'
26 dependsOn: [
27 inputDataSet
28 outputDataSet
29 ]
30 properties: {
31 activities: [
32 {
33 dependsOn: [
34 {
35 activity: 'LookupBlob'
36 dependencyConditions: [
37 'Succeeded'
38 ]
39 }
40 ]
41 policy: {
42 timeout: '0.12:00:00'
43 retry: 0
44 retryIntervalInSeconds: 30
45 secureOutput: false
46 secureInput: false
47 }
48 name: 'CopyAccessLog'
49 userProperties: [
50 {
51 name: 'Source'
52 value: 'container-name/<your path>/'
53 }
54 {
55 name: 'Destination'
56 value: 'destination-container-name//'
57 }
58 ]
59 type: 'Copy'
60 inputs: [
61 {
62 referenceName: 'SourceDataset'
63 type: 'DatasetReference'
64 parameters: {}
65 }
66 ]
67 outputs: [
68 {
69 referenceName: 'DestinationDataset'
70 type: 'DatasetReference'
71 parameters: {}
72 }
73 ]
74 typeProperties: {
75 source: {
76 type: 'JsonSource'
77 storeSettings: {
78 type: 'AzureBlobStorageReadSettings'
79 recursive: true
80 wildcardFolderPath: {
81 value: 'your-path/@{concat(\'y=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'yyyy\'))}/@{concat(\'m=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'MM\'))}/@{concat(\'d=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'dd\'))}'
82 type: 'Expression'
83 }
84 wildcardFileName: '*'
85 enablePartitionDiscovery: false
86 }
87 formatSettings: {
88 type: 'JsonReadSettings'
89 }
90 }
91 sink: {
92 type: 'ParquetSink'
93 storeSettings: {
94 type: 'AzureBlobStorageWriteSettings'
95 copyBehavior: 'FlattenHierarchy'
96 }
97 formatSettings: {
98 type: 'ParquetWriteSettings'
99 }
100 }
101 enableStaging: false
102 validateDataConsistency: false
103 translator: {
104 type: 'TabularTranslator'
105 mappings: [ // Define mappings here
106 {
107 source: {
108 path: '$[\'time\']'
109 }
110 sink: {
111 name: 'time'
112 type: 'DateTime'
113 }
114 }
115 // ...
116 ]
117 }
118 }
119 }
120 ]
121 policy: {
122 elapsedTimeMetric: {}
123 }
124 }
125}
126
127resource inputBlob 'Microsoft.DataFactory/factories/linkedservices@2018-06-01' = {
128 parent: factory
129 name: 'AzureBlobStorageInput'
130 properties: {
131 annotations: []
132 type: 'AzureBlobStorage'
133 typeProperties: {
134 serviceEndpoint: 'https://${inputStorageAccountName}.blob.${environment().suffixes.storage}/'
135 accountKind: 'StorageV2'
136 }
137 // For remaining properties, see LinkedService objects
138 }
139}
140
141resource outputBlob 'Microsoft.DataFactory/factories/linkedservices@2018-06-01' = {
142 parent: factory
143 name: 'AzureBlobStorageOutput'
144 properties: {
145 annotations: []
146 type: 'AzureBlobStorage'
147 typeProperties: {
148 serviceEndpoint: 'https://${outputStorageAccountName}.blob.${environment().suffixes.storage}/'
149 accountKind: 'StorageV2'
150 }
151 // For remaining properties, see LinkedService objects
152 }
153}
154
155resource outputDataSet 'Microsoft.DataFactory/factories/datasets@2018-06-01' = {
156 parent: factory
157 name: 'DestinationDataset'
158 dependsOn: [
159 outputBlob
160 ]
161 properties: {
162 annotations: []
163 description: 'string'
164 folder: {
165 name: 'string'
166 }
167 linkedServiceName: {
168 referenceName: 'AzureBlobStorageOutput'
169 type: 'LinkedServiceReference'
170 }
171 typeProperties: {
172 location: {
173 type: 'AzureBlobStorageLocation'
174 folderPath: {
175 value: '@{concat(\'y=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'yyyy\'))}/@{concat(\'m=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'MM\'))}/@{concat(\'d=\', formatDateTime(addHours(pipeline().TriggerTime, -2), \'dd\'))}'
176 type: 'Expression'
177 }
178 container: 'container-name'
179 }
180 compressionCodec: 'snappy'
181 }
182 type: 'Parquet'
183 // For remaining properties, see Dataset objects
184 }
185}
186
187resource inputDataSet 'Microsoft.DataFactory/factories/datasets@2018-06-01' = {
188 parent: factory
189 name: 'SourceDataset'
190 dependsOn: [
191 inputBlob
192 ]
193 properties: {
194 annotations: []
195 description: 'string'
196 folder: {
197 name: 'string'
198 }
199 linkedServiceName: {
200 referenceName: 'AzureBlobStorageInput'
201 type: 'LinkedServiceReference'
202 }
203 typeProperties: {
204 location: {
205 type: 'AzureBlobStorageLocation'
206 folderPath: 'your-path'
207 container: 'container-name'
208 }
209 }
210 type: 'Json'
211 schema: {
212 type: 'object'
213 properties: {
214 time: {
215 type: 'string'
216 }
217 ...
218 }
219 }
220 // For remaining properties, see Dataset objects
221 }
222}
223
224resource DailyTrigger 'Microsoft.DataFactory/factories/triggers@2018-06-01' = {
225 parent: factory
226 name: 'DailyTrigger'
227 dependsOn: [
228 pipelines
229 ]
230 properties: {
231 annotations: []
232 // runtimeState: 'Started'
233 pipelines: [
234 {
235 pipelineReference: {
236 referenceName: 'your-pipeline-name'
237 type: 'PipelineReference'
238 }
239 parameters: {}
240 }
241 ]
242 type: 'ScheduleTrigger'
243 typeProperties: {
244 recurrence: {
245 frequency: 'Day'
246 interval: 1
247 startTime: triggerStartTime
248 timeZone: 'UTC'
249 schedule: {
250 minutes: [
251 0
252 ]
253 hours: [
254 0
255 ]
256 }
257 }
258 }
259 // For remaining properties, see Trigger objects
260 }
261}
262
263output managedIdentityPrincipalId string = factory.identity.principalId

Die hier verwendeten Ressourcen sind wie folgt.

RessourceBeschreibung
Microsoft.DataFactory/factoriesDie Data-Factory-Instanz
Microsoft.DataFactory/factories/pipelinesPipeline-Definition. Die Pipeline führt u. a. Datenkopien aus.
Microsoft.DataFactory/factories/linkedservicesDieses Linked Service verbindet Input- und Output-Storage
Microsoft.DataFactory/factories/datasetsInput- und Output-Datasets
Microsoft.DataFactory/factories/triggersTrigger-Konfiguration. Nicht erforderlich, wenn keine automatische Ausführung nötig ist.

Synapse erstellen

Als Nächstes deployen wir Synapse Analytics. Da standardmäßig ein integrierter serverloser SQL-Pool vorhanden ist, erstellen wir dort eine Datenbank.

Datenbank erstellen

Erstellen Sie eine Datenbank im integrierten serverlosen SQL-Pool.

sql
1CREATE DATABASE [myDB];
2GO
3
4USE [myDB];
5GO

Externe Datenquelle hinzufügen

Erstellen Sie eine externe Datenquelle und verbinden Sie sie mit dem Data Lake. Dabei erstellen Sie WorkspaceIdentity und stellen sicher, dass der Zugriff auf diese externe Datenquelle mit den Berechtigungen von WorkspaceIdentity erfolgt.

sql
1CREATE MASTER KEY
2ENCRYPTION BY PASSWORD = 'StrongPassword_ChangeThis!';
3
4CREATE DATABASE SCOPED CREDENTIAL WorkspaceIdentity
5WITH IDENTITY = 'Managed Identity';
6GO
7
8CREATE EXTERNAL DATA SOURCE MyDataLake
9WITH (
10 LOCATION = 'https://<your storage name>.blob.core.windows.net/<container name>',
11 CREDENTIAL = WorkspaceIdentity
12);
13GO

Externe Tabelle erstellen

Erstellen Sie eine externe Tabelle und setzen Sie DATA_SOURCE auf die zuvor erstellte externe Datenquelle. Für LOCATION geben Sie den Blob-Pfad an. Wildcards können verwendet werden.

sql
1-- Schema erstellen
2CREATE SCHEMA ext AUTHORIZATION dbo;
3GO
4
5CREATE SCHEMA func AUTHORIZATION dbo;
6GO
7
8-- Dateiformat erstellen
9CREATE EXTERNAL FILE FORMAT [ParquetFormat]
10WITH (FORMAT_TYPE = PARQUET);
11GO
12
13-- Externe Tabelle erstellen
14CREATE EXTERNAL TABLE ext.myTable
15(
16 -- Spaltendefinitionen
17 time datetime2 NULL,
18 propertyA nvarchar(512) NULL,
19 propertyB nvarchar(512) NULL,
20 ...
21)
22WITH
23(
24 LOCATION = 'y=*/**',
25 DATA_SOURCE = [MyDataLake],
26 FILE_FORMAT = [ParquetFormat]
27);
28GO

View erstellen

Erstellen Sie eine View und beschränken Sie die Spalten auf die Eigenschaften, auf die Functions zugreifen soll.

sql
1CREATE VIEW func.viewOfMyTable
2AS
3SELECT
4 [time],
5 [propertyA],
6 [propertyB],
7 ...
8FROM ext.myTable
9WHERE <falls nötig Bedingung hier einfügen>;
10GO

Berechtigungen für den Service Principal vergeben

Dies ist eine Konfiguration, um in CI/CD aus der Pipeline heraus Datenbankberechtigungen zu vergeben. Wenn Sie das nicht planen, können Sie diesen Abschnitt überspringen.

Verwenden Sie den Namen des in der Pipeline verwendeten SPN, erstellen Sie einen externen Benutzer und vergeben Sie Berechtigungen.

sql
1CREATE USER [pipeline-sp] FROM EXTERNAL PROVIDER;
2GO
3
4ALTER ROLE db_accessadmin ADD MEMBER [pipeline-sp];
5GO
6
7ALTER ROLE db_owner ADD MEMBER [pipeline-sp];
8GO

Der Managed Identity des Synapse-Workspaces die Entra-ID-Rolle „Directory Readers“ geben

Wenn Sie Berechtigungen dynamisch an Functions vergeben möchten, können Sie den Function-Namen angeben. Der Workspace sucht dann automatisch die Managed-Identity-ObjectId des entsprechenden Function-Namens und vergibt die Berechtigungen. Dafür muss der Workspace selbst die Rolle „Directory Readers“ haben.

Fügen Sie die Rolle unter Entra ID > Rollen und Administratoren > Directory Readers > Zuweisungen hinzufügen der Workspace-Identity hinzu. Die ObjectId der Managed Identity des Workspaces können Sie in Synapse Studio im Admin-Menü unter Credentials prüfen.

Berechtigungen für Functions in der Pipeline vergeben

Beim Deployen einer neuen Function vergeben Sie dieser Function Zugriff auf die Datenbank. Um dies in Azure Pipelines auszuführen, habe ich die folgenden Schritte verwendet.

  1. Ein SQL-Skript vorbereiten
  2. Den Function-Namen im Skript zur Laufzeit der Pipeline dynamisch ersetzen
  3. Das SQL mit SqlAzureDacpacDeployment@1 ausführen

Im Folgenden erkläre ich die Schritte der Reihe nach.

SQL vorbereiten

Bereiten Sie SQL wie folgt vor. Der Teil FunctionAppMiName wird später durch den Ziel-Function-Namen ersetzt.

sql
1USE [myDB];
2
3IF NOT EXISTS (SELECT 1 FROM sys.database_principals WHERE name = N'FunctionAppMiName')
4BEGIN
5 PRINT 'Creating user [FunctionAppMiName] FROM EXTERNAL PROVIDER...';
6 CREATE USER [FunctionAppMiName] FROM EXTERNAL PROVIDER;
7END
8ELSE
9BEGIN
10 PRINT 'User [FunctionAppMiName] already exists.';
11END;
12
13IF NOT EXISTS (
14 SELECT 1
15 FROM sys.database_role_members rm
16 JOIN sys.database_principals r ON rm.role_principal_id = r.principal_id
17 JOIN sys.database_principals m ON rm.member_principal_id = m.principal_id
18 WHERE r.name = N'db_datareader'
19 AND m.name = N'FunctionAppMiName'
20)
21BEGIN
22 PRINT 'Adding [FunctionAppMiName] to db_datareader...';
23 ALTER ROLE db_datareader ADD MEMBER [FunctionAppMiName];
24 GRANT REFERENCES ON DATABASE SCOPED CREDENTIAL::WorkspaceIdentity TO [FunctionAppMiName];
25 GRANT SELECT ON OBJECT::func.viewOfMyTable TO [FunctionAppMiName];
26 DENY SELECT ON SCHEMA::ext TO [FunctionAppMiName];
27END
28ELSE
29BEGIN
30 PRINT 'Already in db_datareader.';
31END;

Kurze Erklärung der Berechtigungen:

db_datareader ist notwendig, um Daten lesen zu können. Die Referenzberechtigung auf WorkspaceIdentity habe ich hinzugefügt, weil der Zugriff auf die externe Datenquelle ohne diese Berechtigung nicht funktioniert hat. Außerdem vergebe ich Zugriff auf die View und verbiete direkte Abfragen der externen Tabelle, indem ich den Zugriff auf das Schema per DENY sperre.

Function-Namen einfügen

Ich habe die SQL-Datei mit einer PowerShell@2-Task wie folgt umgeschrieben. Jede Methode ist ok, solange Sie den Function-Namen im File ersetzen können.

bash
1- task: PowerShell@2
2 displayName: 'Generate grant-function-mi.sql from template'
3 inputs:
4 targetType: 'inline'
5 pwsh: true
6 script: |
7 $templatePath = "$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql.template"
8 $outputPath = "$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql"
9
10 if (-not (Test-Path $templatePath)) {
11 throw "Template not found: $templatePath"
12 }
13
14 # Template einlesen
15 $content = Get-Content $templatePath -Raw
16
17 # Ersetzen (Werte über Pipeline-Variablen einfügen)
18 $content = $content -replace 'FunctionAppMiName', '$(deploy_resources.functionName)'
19
20 # Ausgabeverzeichnis erstellen, falls nicht vorhanden
21 $dir = Split-Path $outputPath
22 if (-not (Test-Path $dir)) {
23 New-Item -Path $dir -ItemType Directory -Force | Out-Null
24 }
25
26 # SQL schreiben
27 $content | Set-Content $outputPath -Encoding UTF8
28
29 Write-Host "Generated SQL file:"
30 Get-Content $outputPath

SQL ausführen

Führen Sie das zuvor umgeschriebene SQL aus. Das geht z. B. mit einer Task wie der folgenden.

bash
1- task: SqlAzureDacpacDeployment@1
2 displayName: 'Grant db_datareader to Function MI (Synapse serverless)'
3 inputs:
4 # Azure-Verbindung (Name der Service Connection)
5 azureSubscription: '${{parameters.azureServiceConnection}}'
6
7 # Synapse serverless SQL endpoint
8 ServerName: '${{parameters.synapseWorkspaceName}}.sql.azuresynapse.net'
9 DatabaseName: 'myDB'
10
11 # Authentifizierung (über das SPN der Service Connection)
12 SqlUserName: '' # Kann leer bleiben
13 SqlPassword: '' # Kann leer bleiben
14 AuthenticationType: 'servicePrincipal'
15
16 # Ausführungsmodus: SQL-Skript (nicht DACPAC)
17 deployType: 'SqlTask'
18 SqlFile: '$(System.DefaultWorkingDirectory)/drop/sql/grant-function-mi.sql' # Pfad zur oben erzeugten .sql-Datei
19
20 # Optionen (bei Bedarf)
21 IpDetectionMethod: 'AutoDetect'

In meinem Fall habe ich SqlAzureDacpacDeployment@1 verwendet, aber diese Methode kann den Function-Namen nicht als Parameter übergeben. Daher wird das SQL vorher umgeschrieben, sodass der Function-Name eingebettet ist, und anschließend wird die Task ausgeführt.

Wichtig ist, dass Sie der Function wie im SQL-File gezeigt die Berechtigungen geben. Das genaue Vorgehen kann abweichen.

Abfrage aus Functions

Von Functions aus senden Sie eine Abfrage an die View wie folgt. In .NET verwende ich das Paket Microsoft.Data.SqlClient.

csharp
1var queryString = @"
2SELECT TOP (@maxItemCount)
3 [time],
4 [requestUri]
5 -- ...
6FROM func.viewOfMyTable
7WHERE
8 [time] >= @startDate
9 AND [time] <= @endDate
10ORDER BY
11 [time] DESC;
12";

Zusammenfassung

Um aus Functions abzufragen, müssen Sie der Function Berechtigungen geben. Wenn Sie Berechtigungen über eine CI/CD-Pipeline hinzufügen möchten, brauchen Sie zusätzlich passende Berechtigungen für den SPN, was etwas komplizierter werden kann. Da es sowohl um Managed-Identity-Berechtigungen als auch um Datenbankzugriffsrechte geht, achten Sie darauf, diese nicht zu verwechseln.

In diesem Artikel haben wir eine externe Datenquelle und eine View erstellt. Es wäre auch möglich, aus Functions direkt per OPENROWSET auf die Daten zuzugreifen, aber das würde typischerweise breitere Berechtigungen für die Function erfordern, daher habe ich diesen Weg nicht gewählt.

Teilen:

Verwandte Artikel