diff --git a/.DS_Store b/.DS_Store deleted file mode 100644 index f44c03c5..00000000 Binary files a/.DS_Store and /dev/null differ diff --git a/.github/workflows/adf-cicd.yml b/.github/workflows/adf-cicd.yml index a931a659..10852bb1 100644 --- a/.github/workflows/adf-cicd.yml +++ b/.github/workflows/adf-cicd.yml @@ -21,7 +21,7 @@ jobs: - name: Setup Node.js environment uses: actions/setup-node@v3.4.1 with: - node-version: 18.x + node-version: 22.x - name: install ADF Utilities package run: npm install diff --git a/.gitignore b/.gitignore index 4b9c4fbd..a614dfe6 100644 --- a/.gitignore +++ b/.gitignore @@ -260,6 +260,12 @@ paket-files/ # PyTest coverage report .coverage +# Ignore Python cache files +**/*/__pycache__/* +**/*/__pycache__/ +**/*.py[cod] +**/*.pyc + # Function App Local Publish files publishFunctions funcapp.zip diff --git a/infrastructure/deployment/check_params_from_file.ps1 b/infrastructure/deployment/check_params_from_file.ps1 new file mode 100644 index 00000000..5a94500c --- /dev/null +++ b/infrastructure/deployment/check_params_from_file.ps1 @@ -0,0 +1,23 @@ +# Assigning the parameters for the environment +param( + [Parameter(Mandatory=$false)] + [string] $parametersFile +) + +# Read all lines, ignore comments and 'using' statements +$lines = Get-Content $parametersFile | Where-Object { + $_ -notmatch '^\s*//' -and $_ -notmatch '^\s*using' +} + +# Extract param name and value +$params = foreach ($line in $lines) { + if ($line -match 'param\s+(\w+)\s*=\s*(.+)') { + [PSCustomObject]@{ + Name = $matches[1] + Value = $matches[2].Trim() + } + } +} + +# Render as table +$params | Format-Table -AutoSize \ No newline at end of file diff --git a/infrastructure/deployment/deploy_azure_functions.ps1 b/infrastructure/deployment/deploy_azure_functions.ps1 index 44654912..de7484a7 100644 --- a/infrastructure/deployment/deploy_azure_functions.ps1 +++ b/infrastructure/deployment/deploy_azure_functions.ps1 @@ -7,7 +7,10 @@ param( [string] $resourceGroupName, [Parameter(Mandatory=$true)] - [string] $functionAppName + [string] $functionAppName, + + [Parameter(Mandatory=$true)] + [string] $keyVaultName ) # This command cleans the build output of the specified project using the Release configuration. diff --git a/infrastructure/deployment/deploy_data_factory_components.ps1 b/infrastructure/deployment/deploy_data_factory_components.ps1 index 7b06e204..5d02b5ba 100644 --- a/infrastructure/deployment/deploy_data_factory_components.ps1 +++ b/infrastructure/deployment/deploy_data_factory_components.ps1 @@ -3,6 +3,9 @@ param( [Parameter(Mandatory=$true)] [string] $tenantId, + [Parameter(Mandatory=$true)] + [string] $subscriptionId, + [Parameter(Mandatory=$true)] [string] $location, @@ -12,8 +15,11 @@ param( [Parameter(Mandatory=$true)] [string] $dataFactoryName ) + # Modules # Install-Module -Name "Az" +# az login --tenant $tenantId + # Import-Module -Name "Az" # Install-Module -Name "Az.DataFactory" @@ -25,13 +31,16 @@ Import-Module -Name azure.datafactory.tools # Get Deployment Objects and Params files $scriptPath = (Join-Path -Path (Get-Location) -ChildPath "src/azure.datafactory") + $scriptPath = (Get-Location).Path -replace 'infrastructure\\deployment','' + $scriptPath += "\src\azure.datafactory" + $options = New-AdfPublishOption $options.CreateNewInstance = $false # New ADF workspace deployment not required. 
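+# New-AdfPublishOption and Publish-AdfV2FromJson are provided by the azure.datafactory.tools
+# module imported above. The Excludes patterns below stop triggers and factory-level settings
+# from being republished, so only pipelines, datasets and linked services are deployed.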
$options.Excludes.Add("trigger.*","") $options.Excludes.Add("factory.*","") - -Publish-AdfV2FromJson -RootFolder "$scriptPath" -ResourceGroupName "$resourceGroupName" -DataFactoryName "$dataFactoryName" -Location "$location" -Option $options -Stage "install" +Set-AzContext -Subscription $subscriptionId +Publish-AdfV2FromJson -RootFolder "$scriptPath" -ResourceGroupName "$resourceGroupName" -DataFactoryName "$dataFactoryName" -Location "$location" -Option $options -Stage "install" \ No newline at end of file diff --git a/infrastructure/deployment/deploy_wrapper.ps1 b/infrastructure/deployment/deploy_wrapper.ps1 index c0c91ed3..6768a759 100644 --- a/infrastructure/deployment/deploy_wrapper.ps1 +++ b/infrastructure/deployment/deploy_wrapper.ps1 @@ -19,6 +19,22 @@ param( # Login to the Azure Tenant # az login --tenant $tenantId +$currentLocation = Split-Path -Path $MyInvocation.MyCommand.Path -Parent +$checkParamsScript = $currentLocation + '\check_params_from_file.ps1' +& $checkParamsScript ` -parametersFile $parametersFile + +$acceptInput = Read-Host "Are the parameters shown correct? Enter Y to proceed; any other input cancels the deployment." + +if ( $acceptInput.ToUpper() -eq "Y") +{ + Write-Host "Proceeding with deployment" +} +else { + Write-Host "Cancelling deployment" + exit +} + # DEMO: Start a timer $processTimerStart = [System.Diagnostics.Stopwatch]::StartNew() @@ -28,7 +44,6 @@ $bicepDeployment = az deployment sub create ` --location $location ` --template-file $templateFile ` --parameters $parametersFile ` - # --what-if | ConvertFrom-Json # Save Outputs of reusable details from BiCep for other scripts @@ -48,8 +63,7 @@ $sqlDatabaseName = $bicepDeployment.properties.outputs.sqlDatabaseName.value $currentLocation = Split-Path -Path $MyInvocation.MyCommand.Path -Parent # Get Subscription Id from Name -$subscriptionDetails = az account subscription list | ConvertFrom-Json -$subscriptionIdValue = $subscriptionDetails.subscriptionId +$subscriptionIdValue = az account list --query "[?name=='${subscriptionId}'].id" --output tsv # Grant User Key Vault Secret Administrator RBAC to save Function App Key to KV $userDetails = az ad signed-in-user show | ConvertFrom-Json @@ -67,7 +81,8 @@ $deployAzureFunctionsScript = $currentLocation + '\deploy_azure_functions.ps1' & $deployAzureFunctionsScript ` -currentLocation $currentLocation ` -resourceGroupName $resourceGroupName ` - -functionAppName $functionAppName + -functionAppName $functionAppName ` + -keyVaultName $keyVaultName # Set environment variables for Data Factory LS deployments: @@ -82,6 +97,7 @@ $Env:KEYVAULT = $keyVaultName $deployDataFactoryComponentsScript = $currentLocation + '\deploy_data_factory_components.ps1' & $deployDataFactoryComponentsScript ` -tenantId $tenantId ` + -subscriptionId $subscriptionId ` -location $location ` -resourceGroupName $resourceGroupName ` -dataFactoryName $dataFactoryName diff --git a/src/azure.databricks/python/notebooks/__pycache__/__init__.cpython-311.pyc b/src/azure.databricks/python/notebooks/__pycache__/__init__.cpython-311.pyc deleted file mode 100644 index cd9d7cc6..00000000 Binary files a/src/azure.databricks/python/notebooks/__pycache__/__init__.cpython-311.pyc and /dev/null differ diff --git a/src/azure.databricks/python/notebooks/ingest/__pycache__/__init__.cpython-311.pyc b/src/azure.databricks/python/notebooks/ingest/__pycache__/__init__.cpython-311.pyc deleted file mode 100644 index 0b79fd0c..00000000 Binary files 
a/src/azure.databricks/python/notebooks/ingest/__pycache__/__init__.cpython-311.pyc and /dev/null differ diff --git a/src/azure.databricks/python/notebooks/ingest/utils/__pycache__/ConfigurePayloadVariables.cpython-311.pyc b/src/azure.databricks/python/notebooks/ingest/utils/__pycache__/ConfigurePayloadVariables.cpython-311.pyc deleted file mode 100644 index 8a023566..00000000 Binary files a/src/azure.databricks/python/notebooks/ingest/utils/__pycache__/ConfigurePayloadVariables.cpython-311.pyc and /dev/null differ diff --git a/src/azure.databricks/python/notebooks/ingest/utils/__pycache__/CreateMergeQuery.cpython-311.pyc b/src/azure.databricks/python/notebooks/ingest/utils/__pycache__/CreateMergeQuery.cpython-311.pyc deleted file mode 100644 index 1ac265d8..00000000 Binary files a/src/azure.databricks/python/notebooks/ingest/utils/__pycache__/CreateMergeQuery.cpython-311.pyc and /dev/null differ diff --git a/src/azure.databricks/python/notebooks/ingest/utils/__pycache__/__init__.cpython-311.pyc b/src/azure.databricks/python/notebooks/ingest/utils/__pycache__/__init__.cpython-311.pyc deleted file mode 100644 index f02e3303..00000000 Binary files a/src/azure.databricks/python/notebooks/ingest/utils/__pycache__/__init__.cpython-311.pyc and /dev/null differ diff --git a/src/azure.databricks/python/notebooks/utils/__pycache__/CheckPayloadFunctions.cpython-311.pyc b/src/azure.databricks/python/notebooks/utils/__pycache__/CheckPayloadFunctions.cpython-311.pyc deleted file mode 100644 index 37209f53..00000000 Binary files a/src/azure.databricks/python/notebooks/utils/__pycache__/CheckPayloadFunctions.cpython-311.pyc and /dev/null differ diff --git a/src/azure.databricks/python/notebooks/utils/__pycache__/__init__.cpython-311.pyc b/src/azure.databricks/python/notebooks/utils/__pycache__/__init__.cpython-311.pyc deleted file mode 100644 index 554857b6..00000000 Binary files a/src/azure.databricks/python/notebooks/utils/__pycache__/__init__.cpython-311.pyc and /dev/null differ diff --git a/src/azure.databricks/python/tests/__pycache__/__init__.cpython-311.pyc b/src/azure.databricks/python/tests/__pycache__/__init__.cpython-311.pyc deleted file mode 100644 index 3e2910ac..00000000 Binary files a/src/azure.databricks/python/tests/__pycache__/__init__.cpython-311.pyc and /dev/null differ diff --git a/src/azure.databricks/python/tests/__pycache__/test_CreateDeltaObjects.cpython-311-pytest-7.2.2.pyc b/src/azure.databricks/python/tests/__pycache__/test_CreateDeltaObjects.cpython-311-pytest-7.2.2.pyc deleted file mode 100644 index c31a389b..00000000 Binary files a/src/azure.databricks/python/tests/__pycache__/test_CreateDeltaObjects.cpython-311-pytest-7.2.2.pyc and /dev/null differ diff --git a/src/azure.databricks/python/tests/__pycache__/test_CreateMergeQuery.cpython-311-pytest-7.2.2.pyc b/src/azure.databricks/python/tests/__pycache__/test_CreateMergeQuery.cpython-311-pytest-7.2.2.pyc deleted file mode 100644 index cd43294a..00000000 Binary files a/src/azure.databricks/python/tests/__pycache__/test_CreateMergeQuery.cpython-311-pytest-7.2.2.pyc and /dev/null differ diff --git a/src/azure.databricks/python/tests/__pycache__/test_HelperFunctions.cpython-311-pytest-7.2.2.pyc b/src/azure.databricks/python/tests/__pycache__/test_HelperFunctions.cpython-311-pytest-7.2.2.pyc deleted file mode 100644 index 0e2d3175..00000000 Binary files a/src/azure.databricks/python/tests/__pycache__/test_HelperFunctions.cpython-311-pytest-7.2.2.pyc and /dev/null differ diff --git 
a/src/azure.databricks/python/tests/__pycache__/test_IngestCheckPayloadFunctions.cpython-311-pytest-7.2.2.pyc b/src/azure.databricks/python/tests/__pycache__/test_IngestCheckPayloadFunctions.cpython-311-pytest-7.2.2.pyc deleted file mode 100644 index 7aed256c..00000000 Binary files a/src/azure.databricks/python/tests/__pycache__/test_IngestCheckPayloadFunctions.cpython-311-pytest-7.2.2.pyc and /dev/null differ diff --git a/src/azure.databricks/python/tests/__pycache__/test_IngestConfigurePayloadVariables.cpython-311-pytest-7.2.2.pyc b/src/azure.databricks/python/tests/__pycache__/test_IngestConfigurePayloadVariables.cpython-311-pytest-7.2.2.pyc deleted file mode 100644 index 6de058c8..00000000 Binary files a/src/azure.databricks/python/tests/__pycache__/test_IngestConfigurePayloadVariables.cpython-311-pytest-7.2.2.pyc and /dev/null differ diff --git a/src/azure.databricks/python/tests/__pycache__/test_WriteToDelta.cpython-311-pytest-7.2.2.pyc b/src/azure.databricks/python/tests/__pycache__/test_WriteToDelta.cpython-311-pytest-7.2.2.pyc deleted file mode 100644 index cd787c56..00000000 Binary files a/src/azure.databricks/python/tests/__pycache__/test_WriteToDelta.cpython-311-pytest-7.2.2.pyc and /dev/null differ diff --git a/src/azure.datafactory/dataset/Ingest_DS_Amazon_S3_Parquet.json b/src/azure.datafactory/dataset/Ingest_DS_Amazon_S3_Parquet.json new file mode 100644 index 00000000..a00a632c --- /dev/null +++ b/src/azure.datafactory/dataset/Ingest_DS_Amazon_S3_Parquet.json @@ -0,0 +1,67 @@ +{ + "name": "Ingest_DS_Amazon_S3_Parquet", + "properties": { + "linkedServiceName": { + "referenceName": "Ingest_LS_AmazonS3_AKAuth", + "type": "LinkedServiceReference", + "parameters": { + "ServiceURL": { + "value": "@dataset().DSServiceURL", + "type": "Expression" + }, + "SecretKeyName": { + "value": "@dataset().DSSecretKeyName", + "type": "Expression" + }, + "AccessKeyId": { + "value": "@dataset().DSAccessKeyId", + "type": "Expression" + } + } + }, + "parameters": { + "DSAccessKeyId": { + "type": "string" + }, + "DSSecretKeyName": { + "type": "string" + }, + "DSServiceURL": { + "type": "string" + }, + "DSBucket": { + "type": "string" + }, + "DSDirectory": { + "type": "string" + }, + "DSFileName": { + "type": "string" + } + }, + "folder": { + "name": "Ingest" + }, + "annotations": [], + "type": "Parquet", + "typeProperties": { + "location": { + "type": "AmazonS3Location", + "bucketName": { + "value": "@dataset().DSBucket", + "type": "Expression" + }, + "folderPath": { + "value": "@dataset().DSDirectory", + "type": "Expression" + }, + "fileName": { + "value": "@dataset().DSFileName", + "type": "Expression" + } + }, + "compressionCodec": "snappy" + }, + "schema": [] + } +} \ No newline at end of file diff --git a/src/azure.datafactory/dataset/Ingest_DS_Oracle_SIDAuth.json b/src/azure.datafactory/dataset/Ingest_DS_Oracle_SIDAuth.json deleted file mode 100644 index 5e26a640..00000000 --- a/src/azure.datafactory/dataset/Ingest_DS_Oracle_SIDAuth.json +++ /dev/null @@ -1,54 +0,0 @@ -{ - "name": "Ingest_DS_Oracle_SIDAuth", - "properties": { - "linkedServiceName": { - "referenceName": "Ingest_LS_Oracle_SIDAuth", - "type": "LinkedServiceReference", - "parameters": { - "LSHostName": { - "value": "@dataset().DSHostname", - "type": "Expression" - }, - "LSUsername": { - "value": "@dataset().DSUsername", - "type": "Expression" - }, - "LSOracleSid": { - "value": "@dataset().DSOracleSid", - "type": "Expression" - }, - "LSPortNumber": { - "value": "@dataset().DSPortNumber", - "type": "Expression" - }, - 
"LSPassword": { - "value": "@dataset().DSPassword", - "type": "Expression" - } - } - }, - "parameters": { - "DSHostname": { - "type": "string" - }, - "DSUsername": { - "type": "string" - }, - "DSOracleSid": { - "type": "string" - }, - "DSPortNumber": { - "type": "string" - }, - "DSPassword": { - "type": "string" - } - }, - "folder": { - "name": "Ingest" - }, - "annotations": [], - "type": "OracleTable", - "schema": [] - } -} \ No newline at end of file diff --git a/src/azure.datafactory/linkedService/Common_LS_cumulusdatabase.json b/src/azure.datafactory/linkedService/Common_LS_cumulusdatabase.json index 5e904e8f..0d955af4 100644 --- a/src/azure.datafactory/linkedService/Common_LS_cumulusdatabase.json +++ b/src/azure.datafactory/linkedService/Common_LS_cumulusdatabase.json @@ -9,7 +9,7 @@ ], "type": "AzureSqlDatabase", "typeProperties": { - "connectionString": "Integrated Security=False;Encrypt=True;Connection Timeout=30;Data Source=cfcdemodevsqldbuks01.database.windows.net;Initial Catalog=metadata-db" + "connectionString": "Integrated Security=False;Encrypt=True;Connection Timeout=30;Data Source=cfcumulusdevsqluks25.database.windows.net;Initial Catalog=cfcumulusdevsqldbuks25" } } } \ No newline at end of file diff --git a/src/azure.datafactory/linkedService/Ingest_LS_AmazonS3_AKAuth.json b/src/azure.datafactory/linkedService/Ingest_LS_AmazonS3_AKAuth.json new file mode 100644 index 00000000..1b0d36fd --- /dev/null +++ b/src/azure.datafactory/linkedService/Ingest_LS_AmazonS3_AKAuth.json @@ -0,0 +1,34 @@ +{ + "name": "Ingest_LS_AmazonS3_AKAuth", + "properties": { + "parameters": { + "ServiceURL": { + "type": "string" + }, + "SecretKeyName": { + "type": "string" + }, + "AccessKeyId": { + "type": "string" + } + }, + "annotations": [], + "type": "AmazonS3", + "typeProperties": { + "serviceUrl": "@{linkedService().ServiceURL}", + "accessKeyId": "@{linkedService().AccessKeyId}", + "secretAccessKey": { + "type": "AzureKeyVaultSecret", + "store": { + "referenceName": "Common_LS_cumuluskeys", + "type": "LinkedServiceReference" + }, + "secretName": { + "value": "@linkedService().SecretKeyName", + "type": "Expression" + } + }, + "authenticationType": "AccessKey" + } + } +} \ No newline at end of file diff --git a/src/azure.datafactory/linkedService/Ingest_LS_Oracle_SIDAuth.json b/src/azure.datafactory/linkedService/Ingest_LS_Oracle_SIDAuth.json deleted file mode 100644 index 163e7f89..00000000 --- a/src/azure.datafactory/linkedService/Ingest_LS_Oracle_SIDAuth.json +++ /dev/null @@ -1,39 +0,0 @@ -{ - "name": "Ingest_LS_Oracle_SIDAuth", - "properties": { - "parameters": { - "LSHostname": { - "type": "string" - }, - "LSUsername": { - "type": "string" - }, - "LSOracleSid": { - "type": "string" - }, - "LSPortNumber": { - "type": "string" - }, - "LSPassword": { - "type": "string" - } - }, - "annotations": [], - "type": "Oracle", - "typeProperties": { - "connectionString": "host=@{linkedService().LSHostname};port=@{linkedService().LSPortNumber};sid=@{linkedService().LSOracleSid};user id=@{linkedService().LSUsername}", - "password": { - "type": "AzureKeyVaultSecret", - "store": { - "referenceName": "Common_LS_cumuluskeys", - "type": "LinkedServiceReference" - }, - "secretName": { - "value": "@linkedService().LSPassword", - "type": "Expression" - } - } - } - }, - "type": "Microsoft.DataFactory/factories/linkedservices" -} \ No newline at end of file diff --git a/src/azure.datafactory/pipeline/Ingest_PL_AmazonS3.json b/src/azure.datafactory/pipeline/Ingest_PL_AmazonS3.json new file mode 100644 index 
00000000..8305b7a8 --- /dev/null +++ b/src/azure.datafactory/pipeline/Ingest_PL_AmazonS3.json @@ -0,0 +1,408 @@ +{ + "name": "Ingest_PL_AmazonS3", + "properties": { + "activities": [ + { + "name": "Get Ingest Payload", + "type": "Lookup", + "dependsOn": [], + "policy": { + "timeout": "0.12:00:00", + "retry": 0, + "retryIntervalInSeconds": 30, + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "source": { + "type": "AzureSqlSource", + "sqlReaderStoredProcedureName": "[ingest].[GetDatasetPayload]", + "storedProcedureParameters": { + "DatasetId": { + "type": "Int16", + "value": { + "value": "@pipeline().parameters.DatasetId", + "type": "Expression" + } + } + }, + "queryTimeout": "02:00:00", + "partitionOption": "None" + }, + "dataset": { + "referenceName": "GetSetMetadata", + "type": "DatasetReference" + }, + "firstRowOnly": true + } + }, + { + "name": "Set Run DateTime", + "type": "SetVariable", + "dependsOn": [], + "policy": { + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "variableName": "LocalRunDateTime", + "value": { + "value": "@if(equals(pipeline().parameters.RunDateTime,' '),string(utcnow()),pipeline().parameters.RunDateTime)", + "type": "Expression" + } + } + }, + { + "name": "Set Target Path", + "type": "SetVariable", + "dependsOn": [ + { + "activity": "Set Run DateTime", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "policy": { + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "variableName": "TargetPath", + "value": { + "value": "@formatDateTime(variables('LocalRunDateTime'), '\\ye\\ar=yyyy/\\mon\\t\\h=MM/\\d\\a\\y=dd/\\hour=HH')", + "type": "Expression" + } + } + }, + { + "name": "Set LoadType", + "description": "Set the Data Load type:\nIncremental Load = 1\nFull Load = 0", + "type": "SetVariable", + "dependsOn": [ + { + "activity": "Get Ingest Payload", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "policy": { + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "variableName": "LoadType", + "value": { + "value": "@activity('Get Ingest Payload').output.firstRow.LoadAction", + "type": "Expression" + } + } + }, + { + "name": "Set Directory Path", + "type": "SetVariable", + "dependsOn": [ + { + "activity": "Set Target Path", + "dependencyConditions": [ + "Succeeded" + ] + }, + { + "activity": "Set LoadType", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "policy": { + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "variableName": "DirectoryName", + "value": { + "value": "@concat(\n activity('Get Ingest Payload').output.firstRow.ConnectionDisplayName,\n '\\',\n activity('Get Ingest Payload').output.firstRow.DatasetDisplayName,\n '\\',\n 'version=',\n activity('Get Ingest Payload').output.firstRow.VersionNumber,\n '\\',\n variables('LoadType'),\n '\\',\n variables('TargetPath')\n )", + "type": "Expression" + } + } + }, + { + "name": "Amazon S3 Type", + "type": "Switch", + "dependsOn": [ + { + "activity": "Set Directory Path", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "userProperties": [], + "typeProperties": { + "on": { + "value": "@replace(activity('Get Ingest Payload').output.firstRow.LinkedServiceName,'Ingest_LS_','')", + "type": "Expression" + }, + "cases": [ + { + "value": "AmazonS3_AKAuth", + "activities": [ + { + "name": "Fetch Amazon S3 Auth Username", + "type": "WebActivity", 
+ "dependsOn": [], + "policy": { + "timeout": "0.12:00:00", + "retry": 0, + "retryIntervalInSeconds": 30, + "secureOutput": true, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "method": "GET", + "url": { + "value": "@concat(activity('Get Ingest Payload').output.firstRow.Username,'?api-version=7.0')", + "type": "Expression" + }, + "authentication": { + "type": "MSI", + "resource": "https://vault.azure.net" + } + } + }, + { + "name": "AmazonS3 AKAuth Copy", + "type": "Copy", + "dependsOn": [ + { + "activity": "Fetch Amazon S3 Auth Username", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "policy": { + "timeout": "0.12:00:00", + "retry": 0, + "retryIntervalInSeconds": 30, + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "source": { + "type": "ParquetSource", + "additionalColumns": [ + { + "name": "PipelineRunId", + "value": { + "value": "@pipeline().RunId", + "type": "Expression" + } + }, + { + "name": "PipelineExecutionDateTime", + "value": { + "value": "@utcNow()", + "type": "Expression" + } + } + ], + "storeSettings": { + "type": "AmazonS3ReadSettings", + "recursive": true, + "enablePartitionDiscovery": false + }, + "formatSettings": { + "type": "ParquetReadSettings" + } + }, + "sink": { + "type": "ParquetSink", + "storeSettings": { + "type": "AzureBlobFSWriteSettings", + "copyBehavior": "PreserveHierarchy" + }, + "formatSettings": { + "type": "ParquetWriteSettings" + } + }, + "enableStaging": false, + "translator": { + "type": "TabularTranslator", + "typeConversion": true, + "typeConversionSettings": { + "allowDataTruncation": true, + "treatBooleanAsNumber": false + } + } + }, + "inputs": [ + { + "referenceName": "Ingest_DS_Amazon_S3_Parquet", + "type": "DatasetReference", + "parameters": { + "DSAccessKeyId": { + "value": "@activity('Fetch Amazon S3 Auth Username').output.value", + "type": "Expression" + }, + "DSSecretKeyName": { + "value": "@activity('Get Ingest Payload').output.firstRow.KeyVaultSecret", + "type": "Expression" + }, + "DSServiceURL": { + "value": "@activity('Get Ingest Payload').output.firstRow.ConnectionLocation", + "type": "Expression" + }, + "DSBucket": { + "value": "@activity('Get Ingest Payload').output.firstRow.SourceLocation", + "type": "Expression" + }, + "DSDirectory": { + "value": "@activity('Get Ingest Payload').output.firstRow.SourcePath", + "type": "Expression" + }, + "DSFileName": { + "value": "@activity('Get Ingest Payload').output.firstRow.SourceName", + "type": "Expression" + } + } + } + ], + "outputs": [ + { + "referenceName": "Ingest_DS_DataLake_Parquet", + "type": "DatasetReference", + "parameters": { + "DSStorageName": { + "value": "@activity('Get Ingest Payload').output.firstRow.TargetStorageName", + "type": "Expression" + }, + "DSContainerName": { + "value": "@activity('Get Ingest Payload').output.firstRow.TargetStorageContainer", + "type": "Expression" + }, + "DSDirectoryName": { + "value": "@variables('DirectoryName')", + "type": "Expression" + }, + "DSFileName": { + "value": "@activity('Get Ingest Payload').output.firstRow.DatasetDisplayName", + "type": "Expression" + } + } + } + ] + } + ] + } + ], + "defaultActivities": [ + { + "name": "Supported Linked Service Type", + "type": "Fail", + "dependsOn": [], + "userProperties": [], + "typeProperties": { + "message": { + "value": "@concat('The Linked Service type and authentication combination is not currently supported.')", + "type": "Expression" + }, + "errorCode": "16" + } + } + ] + } + }, + { + "name": 
"Update Metadata Load Status", + "type": "SqlServerStoredProcedure", + "dependsOn": [ + { + "activity": "Amazon S3 Type", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "policy": { + "timeout": "0.12:00:00", + "retry": 0, + "retryIntervalInSeconds": 30, + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "storedProcedureName": "[ingest].[SetIngestLoadStatus]", + "storedProcedureParameters": { + "DatasetId": { + "value": { + "value": "@pipeline().parameters.DatasetId", + "type": "Expression" + }, + "type": "Int32" + }, + "IngestStage": { + "value": "Raw", + "type": "String" + }, + "LoadType": { + "value": { + "value": "@activity('Get Ingest Payload').output.firstRow.LoadType", + "type": "Expression" + }, + "type": "String" + }, + "FileLoadDateTime": { + "value": { + "value": "@variables('LocalRunDateTime')", + "type": "Expression" + }, + "type": "DateTime" + } + } + }, + "linkedServiceName": { + "referenceName": "Common_LS_cumulusdatabase", + "type": "LinkedServiceReference" + } + } + ], + "parameters": { + "DatasetId": { + "type": "int" + }, + "RunDateTime": { + "type": "string", + "defaultValue": " " + } + }, + "variables": { + "LocalRunDateTime": { + "type": "String" + }, + "TargetPath": { + "type": "String" + }, + "DirectoryName": { + "type": "String" + }, + "LoadType": { + "type": "String" + } + }, + "folder": { + "name": "Cumulus.Ingest" + }, + "annotations": [] + } +} \ No newline at end of file diff --git a/src/azure.datafactory/pipeline/Ingest_PL_Oracle.json b/src/azure.datafactory/pipeline/Ingest_PL_Oracle.json index 1a986638..dbc6545c 100644 --- a/src/azure.datafactory/pipeline/Ingest_PL_Oracle.json +++ b/src/azure.datafactory/pipeline/Ingest_PL_Oracle.json @@ -20,153 +20,6 @@ "type": "Expression" }, "cases": [ - { - "value": "Oracle_SIDAuth", - "activities": [ - { - "name": "SQLDB SQLAuth Copy", - "type": "Copy", - "dependsOn": [ - { - "activity": "Fetch SQL Auth Username", - "dependencyConditions": [ - "Succeeded" - ] - } - ], - "policy": { - "timeout": "0.12:00:00", - "retry": 0, - "retryIntervalInSeconds": 30, - "secureOutput": true, - "secureInput": false - }, - "userProperties": [], - "typeProperties": { - "source": { - "type": "OracleSource", - "additionalColumns": [ - { - "name": "PipelineRunId", - "value": { - "value": "@pipeline().RunId", - "type": "Expression" - } - }, - { - "name": "PipelineExecutionDateTime", - "value": { - "value": "@utcnow()", - "type": "Expression" - } - } - ], - "oracleReaderQuery": { - "value": "@activity('Get Ingest Payload').output.firstRow.SourceQuery", - "type": "Expression" - }, - "partitionOption": "None", - "convertDecimalToInteger": false, - "queryTimeout": "02:00:00" - }, - "sink": { - "type": "ParquetSink", - "storeSettings": { - "type": "AzureBlobFSWriteSettings" - }, - "formatSettings": { - "type": "ParquetWriteSettings" - } - }, - "enableStaging": false, - "translator": { - "type": "TabularTranslator", - "typeConversion": true, - "typeConversionSettings": { - "allowDataTruncation": true, - "treatBooleanAsNumber": false - } - } - }, - "inputs": [ - { - "referenceName": "Ingest_DS_Oracle_SIDAuth", - "type": "DatasetReference", - "parameters": { - "DSHostname": { - "value": "@activity('Get Ingest Payload').output.firstRow.ConnectionLocation", - "type": "Expression" - }, - "DSUsername": { - "value": "@activity('Fetch SQL Auth Username').output.value", - "type": "Expression" - }, - "DSOracleSid": { - "value": "@activity('Get Ingest Payload').output.firstRow.SourceLocation", - 
"type": "Expression" - }, - "DSPortNumber": { - "value": "@activity('Get Ingest Payload').output.firstRow.ConnectionPort", - "type": "Expression" - }, - "DSPassword": { - "value": "@last(array(split(activity('Get Ingest Payload').output.firstRow.KeyVaultSecret, '/secrets/')))", - "type": "Expression" - } - } - } - ], - "outputs": [ - { - "referenceName": "Ingest_DS_DataLake_Parquet", - "type": "DatasetReference", - "parameters": { - "DSStorageName": { - "value": "@activity('Get Ingest Payload').output.firstRow.TargetStorageName", - "type": "Expression" - }, - "DSContainerName": { - "value": "@activity('Get Ingest Payload').output.firstRow.TargetStorageContainer", - "type": "Expression" - }, - "DSDirectoryName": { - "value": "@variables('DirectoryName')", - "type": "Expression" - }, - "DSFileName": { - "value": "@activity('Get Ingest Payload').output.firstRow.DatasetDisplayName", - "type": "Expression" - } - } - } - ] - }, - { - "name": "Fetch SQL Auth Username", - "type": "WebActivity", - "dependsOn": [], - "policy": { - "timeout": "0.12:00:00", - "retry": 0, - "retryIntervalInSeconds": 30, - "secureOutput": true, - "secureInput": false - }, - "userProperties": [], - "typeProperties": { - "method": "GET", - "url": { - "value": "@concat(activity('Get Ingest Payload').output.firstRow.Username,'?api-version=7.0')", - "type": "Expression" - }, - "authentication": { - "type": "MSI", - "resource": "https://vault.azure.net" - } - } - } - ] - }, { "value": "Oracle_V2_SIDAuth", "activities": [ diff --git a/src/azure.datafactory/pipeline/Intentional Error.json b/src/azure.datafactory/pipeline/Intentional Error.json new file mode 100644 index 00000000..2cf956b5 --- /dev/null +++ b/src/azure.datafactory/pipeline/Intentional Error.json @@ -0,0 +1,88 @@ +{ + "name": "Intentional Error", + "properties": { + "description": "Used just so the procfwk has something to call during development.", + "activities": [ + { + "name": "Wait1", + "description": "Framework development worker simulator.", + "type": "Wait", + "dependsOn": [], + "userProperties": [], + "typeProperties": { + "waitTimeInSeconds": { + "value": "@pipeline().parameters.WaitTime", + "type": "Expression" + } + } + }, + { + "name": "Raise Errors or Not", + "description": "Framework development worker simulator.", + "type": "IfCondition", + "dependsOn": [ + { + "activity": "Wait1", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "userProperties": [], + "typeProperties": { + "expression": { + "value": "@equals(pipeline().parameters.RaiseErrors,'true')", + "type": "Expression" + }, + "ifTrueActivities": [ + { + "name": "Call Fail Procedure", + "type": "SqlServerStoredProcedure", + "dependsOn": [], + "policy": { + "timeout": "0.00:10:00", + "retry": 0, + "retryIntervalInSeconds": 30, + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "storedProcedureName": "[dbo].[FailProcedure]", + "storedProcedureParameters": { + "RaiseError": { + "value": { + "value": "@pipeline().parameters.RaiseErrors", + "type": "Expression" + }, + "type": "String" + } + } + }, + "linkedServiceName": { + "referenceName": "Common_LS_cumulusdatabase", + "type": "LinkedServiceReference" + } + } + ] + } + } + ], + "parameters": { + "RaiseErrors": { + "type": "string", + "defaultValue": "false" + }, + "WaitTime": { + "type": "int", + "defaultValue": 5 + } + }, + "folder": { + "name": "ControlDemo" + }, + "annotations": [ + "_ProcFwkWorker" + ] + } +} \ No newline at end of file diff --git 
a/src/azure.datafactory/pipeline/Mermaid_Visualiser.json b/src/azure.datafactory/pipeline/Mermaid_Visualiser.json new file mode 100644 index 00000000..1d028f7f --- /dev/null +++ b/src/azure.datafactory/pipeline/Mermaid_Visualiser.json @@ -0,0 +1,268 @@ +{ + "name": "Mermaid_Visualiser", + "properties": { + "activities": [ + { + "name": "GetMarkdown", + "type": "Lookup", + "dependsOn": [], + "policy": { + "timeout": "0.12:00:00", + "retry": 0, + "retryIntervalInSeconds": 30, + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "source": { + "type": "AzureSqlSource", + "sqlReaderStoredProcedureName": "[control].[GetOrchestrationLineage]", + "storedProcedureParameters": { + "1_FilterFailedAndBlocked": { + "type": "Boolean", + "value": { + "value": "@pipeline().parameters.FilterFailedAndBlocked", + "type": "Expression" + } + }, + "BatchName": { + "type": "String", + "value": { + "value": "@pipeline().parameters.BatchName", + "type": "Expression" + } + }, + "UseStatusColours": { + "type": "Boolean", + "value": { + "value": "@pipeline().parameters.UseStatusColours", + "type": "Expression" + } + } + }, + "queryTimeout": "02:00:00", + "partitionOption": "None" + }, + "dataset": { + "referenceName": "GetSetMetadata", + "type": "DatasetReference" + } + } + }, + { + "name": "GetVisualiser", + "type": "Lookup", + "dependsOn": [], + "policy": { + "timeout": "0.12:00:00", + "retry": 0, + "retryIntervalInSeconds": 30, + "secureOutput": true, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "source": { + "type": "AzureSqlSource", + "sqlReaderQuery": "EXEC control.GetVisualiser;", + "queryTimeout": "02:00:00", + "partitionOption": "None" + }, + "dataset": { + "referenceName": "GetSetMetadata", + "type": "DatasetReference" + } + } + }, + { + "name": "SwitchMarkdownTarget", + "type": "Switch", + "dependsOn": [ + { + "activity": "GetMarkdown", + "dependencyConditions": [ + "Succeeded" + ] + }, + { + "activity": "GetVisualiser", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "userProperties": [], + "typeProperties": { + "on": { + "value": "@activity('GetVisualiser').output.firstRow.MarkdownTargetName", + "type": "Expression" + }, + "cases": [ + { + "value": "AzureDevOps", + "activities": [ + { + "name": "GetADOToken", + "type": "WebActivity", + "dependsOn": [], + "policy": { + "timeout": "0.12:00:00", + "retry": 0, + "retryIntervalInSeconds": 30, + "secureOutput": true, + "secureInput": true + }, + "userProperties": [], + "typeProperties": { + "method": "GET", + "url": { + "value": "@concat(activity('GetVisualiser').output.firstRow.PATSecretName,'?api-version=7.0')", + "type": "Expression" + }, + "authentication": { + "type": "MSI", + "resource": "https://vault.azure.net" + } + } + }, + { + "name": "GetADOBaseURL", + "type": "WebActivity", + "dependsOn": [], + "policy": { + "timeout": "0.12:00:00", + "retry": 0, + "retryIntervalInSeconds": 30, + "secureOutput": true, + "secureInput": true + }, + "userProperties": [], + "typeProperties": { + "method": "GET", + "url": { + "value": "@concat(activity('GetVisualiser').output.firstRow.URLSecretName,'?api-version=7.0')", + "type": "Expression" + }, + "authentication": { + "type": "MSI", + "resource": "https://vault.azure.net" + } + } + }, + { + "name": "TempDeleteWiki", + "type": "WebActivity", + "dependsOn": [ + { + "activity": "GetADOToken", + "dependencyConditions": [ + "Succeeded" + ] + }, + { + "activity": "GetADOBaseURL", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + 
"policy": { + "timeout": "0.12:00:00", + "retry": 0, + "retryIntervalInSeconds": 30, + "secureOutput": true, + "secureInput": true + }, + "userProperties": [], + "typeProperties": { + "method": "DELETE", + "headers": { + "Authorization": { + "value": "@concat('Bearer ',activity('GetADOToken').output.value)", + "type": "Expression" + } + }, + "url": { + "value": "@concat(activity('GetADOBaseURL').output.value,'/pages?path=/','Mermaid','&api-version=7.1')", + "type": "Expression" + } + } + }, + { + "name": "PopulateWiki", + "type": "WebActivity", + "dependsOn": [ + { + "activity": "TempDeleteWiki", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "policy": { + "timeout": "0.12:00:00", + "retry": 0, + "retryIntervalInSeconds": 30, + "secureOutput": true, + "secureInput": true + }, + "userProperties": [], + "typeProperties": { + "method": "PUT", + "headers": { + "Authorization": { + "value": "@concat('Bearer ',activity('GetADOToken').output.value)", + "type": "Expression" + } + }, + "url": { + "value": "@concat(activity('GetADOBaseURL').output.value,'/pages/','Mermaid','?api-version=7.1')", + "type": "Expression" + }, + "body": { + "value": "@concat('{ \n \"content\": \"',activity('GetMarkdown').output.firstRow.MarkdownOutput,\n '\"}')", + "type": "Expression" + } + } + } + ] + } + ], + "defaultActivities": [ + { + "name": "InvalidMarkdownTarget", + "type": "Fail", + "dependsOn": [], + "userProperties": [], + "typeProperties": { + "message": "Invalid Markdown Target Selected", + "errorCode": "16" + } + } + ] + } + } + ], + "parameters": { + "BatchName": { + "type": "String", + "defaultValue": "Daily" + }, + "UseStatusColours": { + "type": "Bool", + "defaultValue": true + }, + "FilterFailedAndBlocked": { + "type": "Bool", + "defaultValue": false + } + }, + "folder": { + "name": "Cumulus.Utils" + }, + "annotations": [], + "lastPublishTime": "2025-11-07T12:58:30Z" + }, + "type": "Microsoft.DataFactory/factories/pipelines" +} \ No newline at end of file diff --git a/src/azure.datafactory/pipeline/Transform_PL_Unmanaged.json b/src/azure.datafactory/pipeline/Transform_PL_Unmanaged.json index c13778ad..0557502d 100644 --- a/src/azure.datafactory/pipeline/Transform_PL_Unmanaged.json +++ b/src/azure.datafactory/pipeline/Transform_PL_Unmanaged.json @@ -19,10 +19,10 @@ "type": "AzureSqlSource", "sqlReaderStoredProcedureName": "[transform].[GetUnmanagedNotebookPayload]", "storedProcedureParameters": { - "DatasetId": { + "NotebookId": { "type": "Int32", "value": { - "value": "@pipeline().parameters.DatasetId", + "value": "@pipeline().parameters.NotebookId", "type": "Expression" } } @@ -239,7 +239,7 @@ } ], "parameters": { - "DatasetId": { + "NotebookId": { "type": "string" } }, diff --git a/src/azure.datafactory/pipeline/Wait 1.json b/src/azure.datafactory/pipeline/Wait 1.json new file mode 100644 index 00000000..4d293bad --- /dev/null +++ b/src/azure.datafactory/pipeline/Wait 1.json @@ -0,0 +1,33 @@ +{ + "name": "Wait 1", + "properties": { + "description": "Used just so the procfwk has something to call during development.", + "activities": [ + { + "name": "Wait1", + "description": "Framework development worker simulator.", + "type": "Wait", + "dependsOn": [], + "userProperties": [], + "typeProperties": { + "waitTimeInSeconds": { + "value": "@pipeline().parameters.WaitTime", + "type": "Expression" + } + } + } + ], + "parameters": { + "WaitTime": { + "type": "int", + "defaultValue": 5 + } + }, + "folder": { + "name": "ControlDemo" + }, + "annotations": [ + "_ProcFwkWorker" + ] + } +} \ No newline 
at end of file diff --git a/src/azure.datafactory/pipeline/Wait 10.json b/src/azure.datafactory/pipeline/Wait 10.json new file mode 100644 index 00000000..18431cdc --- /dev/null +++ b/src/azure.datafactory/pipeline/Wait 10.json @@ -0,0 +1,33 @@ +{ + "name": "Wait 10", + "properties": { + "description": "Used just so the procfwk has something to call during development.", + "activities": [ + { + "name": "Wait10", + "description": "Framework development worker simulator.", + "type": "Wait", + "dependsOn": [], + "userProperties": [], + "typeProperties": { + "waitTimeInSeconds": { + "value": "@pipeline().parameters.WaitTime", + "type": "Expression" + } + } + } + ], + "parameters": { + "WaitTime": { + "type": "int", + "defaultValue": 5 + } + }, + "folder": { + "name": "ControlDemo" + }, + "annotations": [ + "_ProcFwkWorker" + ] + } +} \ No newline at end of file diff --git a/src/azure.datafactory/pipeline/Wait 2.json b/src/azure.datafactory/pipeline/Wait 2.json new file mode 100644 index 00000000..b150ed8c --- /dev/null +++ b/src/azure.datafactory/pipeline/Wait 2.json @@ -0,0 +1,33 @@ +{ + "name": "Wait 2", + "properties": { + "description": "Used just so the procfwk has something to call during development.", + "activities": [ + { + "name": "Wait2", + "description": "Framework development worker simulator.", + "type": "Wait", + "dependsOn": [], + "userProperties": [], + "typeProperties": { + "waitTimeInSeconds": { + "value": "@pipeline().parameters.WaitTime", + "type": "Expression" + } + } + } + ], + "parameters": { + "WaitTime": { + "type": "int", + "defaultValue": 5 + } + }, + "folder": { + "name": "ControlDemo" + }, + "annotations": [ + "_ProcFwkWorker" + ] + } +} \ No newline at end of file diff --git a/src/azure.datafactory/pipeline/Wait 3.json b/src/azure.datafactory/pipeline/Wait 3.json new file mode 100644 index 00000000..83ba0cd4 --- /dev/null +++ b/src/azure.datafactory/pipeline/Wait 3.json @@ -0,0 +1,33 @@ +{ + "name": "Wait 3", + "properties": { + "description": "Used just so the procfwk has something to call during development.", + "activities": [ + { + "name": "Wait3", + "description": "Framework development worker simulator.", + "type": "Wait", + "dependsOn": [], + "userProperties": [], + "typeProperties": { + "waitTimeInSeconds": { + "value": "@pipeline().parameters.WaitTime", + "type": "Expression" + } + } + } + ], + "parameters": { + "WaitTime": { + "type": "int", + "defaultValue": 5 + } + }, + "folder": { + "name": "ControlDemo" + }, + "annotations": [ + "_ProcFwkWorker" + ] + } +} \ No newline at end of file diff --git a/src/azure.datafactory/pipeline/Wait 4.json b/src/azure.datafactory/pipeline/Wait 4.json new file mode 100644 index 00000000..520cb4a0 --- /dev/null +++ b/src/azure.datafactory/pipeline/Wait 4.json @@ -0,0 +1,33 @@ +{ + "name": "Wait 4", + "properties": { + "description": "Used just so the procfwk has something to call during development.", + "activities": [ + { + "name": "Wait4", + "description": "Framework development worker simulator.", + "type": "Wait", + "dependsOn": [], + "userProperties": [], + "typeProperties": { + "waitTimeInSeconds": { + "value": "@pipeline().parameters.WaitTime", + "type": "Expression" + } + } + } + ], + "parameters": { + "WaitTime": { + "type": "int", + "defaultValue": 5 + } + }, + "folder": { + "name": "ControlDemo" + }, + "annotations": [ + "_ProcFwkWorker" + ] + } +} \ No newline at end of file diff --git a/src/azure.datafactory/pipeline/Wait 5.json b/src/azure.datafactory/pipeline/Wait 5.json new file mode 100644 index 
00000000..6d9f7726 --- /dev/null +++ b/src/azure.datafactory/pipeline/Wait 5.json @@ -0,0 +1,33 @@ +{ + "name": "Wait 5", + "properties": { + "description": "Used just so the procfwk has something to call during development.", + "activities": [ + { + "name": "Wait5", + "description": "Framework development worker simulator.", + "type": "Wait", + "dependsOn": [], + "userProperties": [], + "typeProperties": { + "waitTimeInSeconds": { + "value": "@pipeline().parameters.WaitTime", + "type": "Expression" + } + } + } + ], + "parameters": { + "WaitTime": { + "type": "int", + "defaultValue": 5 + } + }, + "folder": { + "name": "ControlDemo" + }, + "annotations": [ + "_ProcFwkWorker" + ] + } +} \ No newline at end of file diff --git a/src/azure.datafactory/pipeline/Wait 6.json b/src/azure.datafactory/pipeline/Wait 6.json new file mode 100644 index 00000000..c08991fa --- /dev/null +++ b/src/azure.datafactory/pipeline/Wait 6.json @@ -0,0 +1,33 @@ +{ + "name": "Wait 6", + "properties": { + "description": "Used just so the procfwk has something to call during development.", + "activities": [ + { + "name": "Wait6", + "description": "Framework development worker simulator.", + "type": "Wait", + "dependsOn": [], + "userProperties": [], + "typeProperties": { + "waitTimeInSeconds": { + "value": "@pipeline().parameters.WaitTime", + "type": "Expression" + } + } + } + ], + "parameters": { + "WaitTime": { + "type": "int", + "defaultValue": 5 + } + }, + "folder": { + "name": "ControlDemo" + }, + "annotations": [ + "_ProcFwkWorker" + ] + } +} \ No newline at end of file diff --git a/src/azure.datafactory/pipeline/Wait 7.json b/src/azure.datafactory/pipeline/Wait 7.json new file mode 100644 index 00000000..5b465855 --- /dev/null +++ b/src/azure.datafactory/pipeline/Wait 7.json @@ -0,0 +1,33 @@ +{ + "name": "Wait 7", + "properties": { + "description": "Used just so the procfwk has something to call during development.", + "activities": [ + { + "name": "Wait7", + "description": "Framework development worker simulator.", + "type": "Wait", + "dependsOn": [], + "userProperties": [], + "typeProperties": { + "waitTimeInSeconds": { + "value": "@pipeline().parameters.WaitTime", + "type": "Expression" + } + } + } + ], + "parameters": { + "WaitTime": { + "type": "int", + "defaultValue": 5 + } + }, + "folder": { + "name": "ControlDemo" + }, + "annotations": [ + "_ProcFwkWorker" + ] + } +} \ No newline at end of file diff --git a/src/azure.datafactory/pipeline/Wait 8.json b/src/azure.datafactory/pipeline/Wait 8.json new file mode 100644 index 00000000..b72b4489 --- /dev/null +++ b/src/azure.datafactory/pipeline/Wait 8.json @@ -0,0 +1,33 @@ +{ + "name": "Wait 8", + "properties": { + "description": "Used just so the procfwk has something to call during development.", + "activities": [ + { + "name": "Wait8", + "description": "Framework development worker simulator.", + "type": "Wait", + "dependsOn": [], + "userProperties": [], + "typeProperties": { + "waitTimeInSeconds": { + "value": "@pipeline().parameters.WaitTime", + "type": "Expression" + } + } + } + ], + "parameters": { + "WaitTime": { + "type": "int", + "defaultValue": 5 + } + }, + "folder": { + "name": "ControlDemo" + }, + "annotations": [ + "_ProcFwkWorker" + ] + } +} \ No newline at end of file diff --git a/src/azure.datafactory/pipeline/Wait 9.json b/src/azure.datafactory/pipeline/Wait 9.json new file mode 100644 index 00000000..9188385c --- /dev/null +++ b/src/azure.datafactory/pipeline/Wait 9.json @@ -0,0 +1,33 @@ +{ + "name": "Wait 9", + "properties": { + 
"description": "Used just so the procfwk has something to call during development.", + "activities": [ + { + "name": "Wait9", + "description": "Framework development worker simulator.", + "type": "Wait", + "dependsOn": [], + "userProperties": [], + "typeProperties": { + "waitTimeInSeconds": { + "value": "@pipeline().parameters.WaitTime", + "type": "Expression" + } + } + } + ], + "parameters": { + "WaitTime": { + "type": "int", + "defaultValue": 5 + } + }, + "folder": { + "name": "ControlDemo" + }, + "annotations": [ + "_ProcFwkWorker" + ] + } +} \ No newline at end of file diff --git a/src/azure.functionapp/azure.functionapp.csproj b/src/azure.functionapp/azure.functionapp.csproj index 2675596d..6500c28b 100644 --- a/src/azure.functionapp/azure.functionapp.csproj +++ b/src/azure.functionapp/azure.functionapp.csproj @@ -9,22 +9,22 @@ - - - - + + + + - - - - + + + + - + - - - - + + + + diff --git a/src/azure.functionapp/returns/FabricJobInstance.cs b/src/azure.functionapp/returns/FabricJobInstance.cs new file mode 100644 index 00000000..b65e5289 --- /dev/null +++ b/src/azure.functionapp/returns/FabricJobInstance.cs @@ -0,0 +1,77 @@ +using Newtonsoft.Json; + +namespace cloudformations.cumulus.returns +{ + //See: + //https://learn.microsoft.com/en-us/rest/api/fabric/core/job-scheduler/get-item-job-instance?tabs=HTTP#itemjobstatus + + public class FabricJobInstance + { + [JsonProperty("id")] + public Guid Id { get; set; } + + [JsonProperty("itemId")] + public Guid ItemId { get; set; } + + [JsonProperty("jobType")] + public string JobType { get; set; } + + [JsonProperty("invokeType")] + public string InvokeType { get; set; } + + [JsonProperty("status")] + public string JobStatus { get; set; } + + [JsonProperty("failureReason")] + public ErrorResponse? FailureReason { get; set; } + + [JsonProperty("rootActivityId")] + public Guid RootActivityId { get; set; } + + [JsonProperty("startTimeUtc")] + public DateTime? StartTimeUtc { get; set; } + + [JsonProperty("endTimeUtc")] + public DateTime? EndTimeUtc { get; set; } + } + + public class ErrorResponse + { + [JsonProperty("errorCode")] + public string? ErrorCode { get; set; } // A short error code (e.g., "BadRequest", "NotFound") + + [JsonProperty("message")] + public string? Message { get; set; } // A human-readable error message + + [JsonProperty("requestId")] + public Guid RequestId { get; set; } + + [JsonProperty("moreDetails")] + public ErrorResponseDetails? MoreDetails { get; set; } // Optional detailed description + + [JsonProperty("relatedResource")] + public ErrorRelatedResource? RelatedResource { get; set; } + } + + public class ErrorResponseDetails + { + [JsonProperty("errorCode")] + public string? ErrorCode { get; set; } + + [JsonProperty("message")] + public string? Message { get; set; } + + [JsonProperty("relatedResource")] + public ErrorRelatedResource? RelatedResource { get; set; } + } + + public class ErrorRelatedResource + { + [JsonProperty("resourceId")] + public string? ResourceId { get; set; } + + [JsonProperty("resourceType")] + public string? 
ResourceType { get; set; } + } + +} diff --git a/src/azure.functionapp/returns/PipelineRunStatus.cs b/src/azure.functionapp/returns/PipelineRunStatus.cs index 3e70af64..ba7031e8 100644 --- a/src/azure.functionapp/returns/PipelineRunStatus.cs +++ b/src/azure.functionapp/returns/PipelineRunStatus.cs @@ -23,6 +23,8 @@ private static string ConvertPipelineStatus(string actualStatus) { string simpleStatus = actualStatus switch { + "Not Started" => Running, + "Not started" => Running, //microsoft inconsistency "Queued" => Running, "InProgress" => Running, "Canceling" => Running, //microsoft typo diff --git a/src/azure.functionapp/services/MicrosoftFabricService.cs b/src/azure.functionapp/services/MicrosoftFabricService.cs index 08dcc0b1..f4b3ec2b 100644 --- a/src/azure.functionapp/services/MicrosoftFabricService.cs +++ b/src/azure.functionapp/services/MicrosoftFabricService.cs @@ -1,15 +1,13 @@ -using Azure; -using Azure.Identity; -using Azure.ResourceManager; +using Azure.Analytics.Synapse.Artifacts.Models; using cloudformations.cumulus.helpers; using cloudformations.cumulus.returns; +using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; -using Microsoft.Identity.Client; using Newtonsoft.Json; -using Newtonsoft.Json.Linq; -using System.Net; using System.Net.Http.Headers; -using Windows.Media.Protection.PlayReady; +using System.Text; +using System.Threading; + namespace cloudformations.cumulus.services { @@ -19,149 +17,386 @@ public class MicrosoftFabricService : PipelineService private FabricClient fabricClient; private HttpClient pbiApiClient; private Task fabApiClient; + private readonly HttpClient generalClient; private string workspaceId; private string pipelineId; + private string bearerToken; public MicrosoftFabricService(PipelineRequest request, ILogger logger) { _logger = logger; - _logger.LogInformation("Creating FAB connectivity client."); + _logger.LogInformation("Creating FAB connectivity clients."); fabricClient = new FabricClient(true); pbiApiClient = fabricClient.CreatePowerBIAPIClient(); fabApiClient = fabricClient.CreateFabricAPIClient(); + workspaceId = String.Empty; pipelineId = String.Empty; - } + //Get bearer for all requests + bearerToken = fabricClient.GetBearerToken(); + _logger.LogDebug(bearerToken); + + // create a reusable client for generic Fabric REST calls (authorization header set once) + generalClient = new HttpClient(); + generalClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", bearerToken); + + if (!String.IsNullOrEmpty(request.OrchestratorName)) //Defensive null handling; inputs already validated in PipelineRequest.cs + { + // Resolve workspace Id (synchronous caller - use GetAwaiter().GetResult to preserve constructor signature) + workspaceId = ResolveWorkspaceIdAsync(request.OrchestratorName, CancellationToken.None).GetAwaiter().GetResult(); + + _logger.LogInformation("Resolved workspace Id: " + workspaceId); + } + } + public override PipelineRunStatus PipelineCancel(PipelineRunRequest request) { - throw new NotImplementedException(); + if (!String.IsNullOrEmpty(request.PipelineName)) //Defensive null handling; inputs already validated in PipelineRequest.cs + { + // Resolve pipeline id using the common async helper (kept synchronous for API compatibility) + pipelineId = ResolvePipelineIdAsync(workspaceId, request.PipelineName, CancellationToken.None) + .GetAwaiter().GetResult(); + + _logger.LogInformation("Resolved pipeline Id: " + pipelineId + " for pipeline Name: " + 
request.PipelineName); + } + + string jobUrl = String.Empty; + HttpResponseMessage? postResponse = null; //Declared here for use across the cancel call and follow-up status check. + FabricJobInstance? pipelineRunResponse; //Declared here so the post-cancel status check can populate it. + + // Build the job instance cancel endpoint URL + jobUrl = $"https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{pipelineId}/jobs/instances/{request.RunId}/cancel"; + + var postTask = pbiApiClient.PostAsync(jobUrl, null); + postTask.Wait(); + postResponse = postTask.Result; + + Thread.Sleep(3000); //give the job instance handler a chance to execute the cancel request before checking + + //Check status after cancel request + if (postResponse.IsSuccessStatusCode) + { + _logger.LogInformation("Data pipeline run instance response status: " + postResponse.StatusCode); + + // Use the reusable generalClient helper to GET and deserialize the job instance + jobUrl = $"https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{pipelineId}/jobs/instances/{request.RunId}"; + + pipelineRunResponse = SendGetAndDeserializeAsync(jobUrl, CancellationToken.None) + .GetAwaiter().GetResult(); + } + else + { + var error = postResponse.Content.ReadAsStringAsync().Result; + _logger.LogError("Pipeline run cancellation failed: " + error); + throw new Exception($"Pipeline run cancellation failed: {postResponse.StatusCode}"); + } + + return new PipelineRunStatus + { + PipelineName = request.PipelineName, + ActualStatus = pipelineRunResponse?.JobStatus ?? "Unknown", //null-safe in case deserialisation returned nothing + RunId = request.RunId + }; } public override PipelineRunStatus PipelineExecute(PipelineRequest request) { - Console.WriteLine("Getting Fabric API bearer token."); + // Keep method synchronous by using Task.GetAwaiter().GetResult() around async helpers + try + { + if (!String.IsNullOrEmpty(request.PipelineName)) //Defensive null handling; inputs already validated in PipelineRequest.cs + { + pipelineId = ResolvePipelineIdAsync(workspaceId, request.PipelineName, CancellationToken.None).GetAwaiter().GetResult(); + _logger.LogInformation("Resolved pipeline Id: " + pipelineId + " for pipeline Name: " + request.PipelineName); + } - string bearerToken = fabricClient.GetBearerToken(); + StringContent content; + string pipelineRunId = String.Empty; + string postUrl = $"https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{pipelineId}/jobs/instances?jobType=Pipeline"; - _logger.LogDebug(bearerToken); + _logger.LogDebug(postUrl); - Task<HttpResponseMessage> wsResponse; - Task<HttpResponseMessage> pipeResponse; - Task<HttpResponseMessage> pipeRunResponse; + Task<HttpResponseMessage>? postTask = null; //Declared outside the IF below so both parameter branches can assign it. + HttpResponseMessage? postResponse = null; //Declared outside the IF below so both parameter branches can assign it. 
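+ // Fabric's run-on-demand job endpoint (POST .../items/{itemId}/jobs/instances?jobType=Pipeline)
+ // responds 202 Accepted with an empty body; the new job instance id is only exposed through the
+ // Location response header, which is why the success path below parses the header rather than the content.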
- //use Fabric API to resolve guid values from reference names before making instance call - using (var client = new HttpClient()) - { - client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", bearerToken); + if (request.PipelineParameters == null) + { + _logger.LogInformation("Creating data pipeline run instance request without parameters."); + + postTask = pbiApiClient.PostAsync(postUrl, null); + postTask.Wait(); + postResponse = postTask.Result; + } + else + { + _logger.LogInformation("Creating data pipeline run instance request with parameters."); - //resolve workspace id - _logger.LogInformation("Getting workspace Id."); + //Build dictionary to house pipeline parameters + Dictionary<string, string> pipeParameters; + pipeParameters = []; - HttpRequestMessage wsRequest = new HttpRequestMessage(HttpMethod.Get, $"https://api.fabric.microsoft.com/v1/workspaces"); - wsResponse = client.SendAsync(wsRequest); - wsResponse.Wait(); + foreach (var key in request.PipelineParameters.Keys) //pipeline parameters can be null and not used + { + if (String.IsNullOrEmpty(request.PipelineParameters[key])) continue; + + _logger.LogInformation($"Adding parameter key: {key} value: {request.PipelineParameters[key]} to pipeline call."); + + pipeParameters.Add(key, request.PipelineParameters[key]); + } - _logger.LogInformation("Getting workspace response status: " + wsResponse.Result.StatusCode); + //Convert dictionary of pipeline parameters for API content call - if (wsResponse.IsCompleted) + var json = JsonConvert.SerializeObject(pipeParameters); + string wrapperBody = $" {{\r\n \"executionData\": {{\r\n \"pipelineName\": \"{request.PipelineName}\",\r\n \"parameters\": {{\r\n {json.Replace("{","").Replace("}","")} }}\r\n }}\r\n}} "; //bit of a hack, tech debt, make a class for it + content = new StringContent(wrapperBody, Encoding.UTF8, "application/json"); + + _logger.LogDebug(wrapperBody.ToString()); + + postTask = pbiApiClient.PostAsync(postUrl, content); + postTask.Wait(); + postResponse = postTask.Result; + } + + //Parse response + if (postResponse.IsSuccessStatusCode) { - FabricWorkspaces? workspaceResponse = JsonConvert.DeserializeObject<FabricWorkspaces>(wsResponse.Result.Content.ReadAsStringAsync().Result); + _logger.LogInformation("Data pipeline run instance response status: " + postResponse.StatusCode); - if(workspaceResponse is null) + var location = postResponse.Headers.Location?.OriginalString; + if (!string.IsNullOrEmpty(location)) { - throw new Exception("FabricWorkspaces workspaceResponse is null. 
Check content response values from client request."); + var parts = location.Split('/').ToList(); + pipelineRunId = parts.Last(); } - else - { - workspaceId = workspaceResponse.Value - .AsQueryable() - .Where(workspace => workspace.DisplayName.Equals(request.OrchestratorName, StringComparison.OrdinalIgnoreCase)) - .Select(workspace => workspace.Id) - .First(); + } + else + { + var error = postResponse.Content.ReadAsStringAsync().Result; + _logger.LogError("Pipeline run creation failed: " + error); + throw new Exception($"Pipeline run creation failed: {postResponse.StatusCode}"); + } - _logger.LogInformation("Resolved workspace Id: " + workspaceId); - } + return new PipelineRunStatus() + { + PipelineName = request.PipelineName, + RunId = pipelineRunId, + ActualStatus = "Not Started" + }; + } + catch (Exception ex) + { + _logger.LogError(ex, "PipelineExecute failed."); + throw; + } + } + + public override PipelineErrorDetail PipelineGetErrorDetails(PipelineRunRequest request) + { + try + { + if (!String.IsNullOrEmpty(request.PipelineName)) //Null ref handling, although already validated Nulls in PipelineRequest.cs input + { + // Resolve pipeline id using shared async helper (kept synchronous for API compatibility) + pipelineId = ResolvePipelineIdAsync(workspaceId, request.PipelineName, CancellationToken.None) + .GetAwaiter().GetResult(); + _logger.LogInformation("Resolved pipeline Id: " + pipelineId + " for pipeline Name: " + request.PipelineName); } - //resolve pipeline id - _logger.LogInformation("Getting pipeline Id."); + // Get job instance via reusable _generalClient helper + var jobUrl = $"https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{pipelineId}/jobs/instances/{request.RunId}"; + var pipelineRunResponse = SendGetAndDeserializeAsync<FabricJobInstance>(jobUrl, CancellationToken.None) + .GetAwaiter().GetResult(); - HttpRequestMessage pipeRequest = new HttpRequestMessage(HttpMethod.Get, $"https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items?type=DataPipeline"); - pipeResponse = client.SendAsync(pipeRequest); - pipeResponse.Wait(); + if (pipelineRunResponse is null) + throw new Exception("FabricJobInstance response was null."); - _logger.LogInformation("Getting workspace response status: " + pipeResponse.Result.StatusCode); + var output = new PipelineErrorDetail() + { + PipelineName = request.PipelineName, + ActualStatus = pipelineRunResponse.JobStatus, + RunId = request.RunId, + ResponseCount = 1 + }; - if (pipeResponse.IsCompleted) - { - FabricDataPipelines? pipelineResponse = JsonConvert.DeserializeObject<FabricDataPipelines>(pipeResponse.Result.Content.ReadAsStringAsync().Result); + output.Errors.Add(new FailedActivity() + { + ActivityRunId = pipelineRunResponse.RootActivityId.ToString(), + ActivityName = "Unknown", + ActivityType = pipelineRunResponse.JobType ?? "Unknown", + ErrorCode = pipelineRunResponse.FailureReason?.ErrorCode, + ErrorType = pipelineRunResponse.JobType, + ErrorMessage = pipelineRunResponse.FailureReason?.Message + }); + + return output; + } + catch (Exception ex) + { + _logger.LogError(ex, "PipelineGetErrorDetails failed."); + throw; + } + } - if (pipelineResponse is null) - { - throw new Exception("FabricDataPipelines pipelineResponse is null. 
Check content response values from client request."); - } - else - { - pipelineId = pipelineResponse.Value - .AsQueryable() - .Where(workspace => workspace.DisplayName.Equals(request.PipelineName, StringComparison.OrdinalIgnoreCase)) - .Select(workspace => workspace.Id) - .First(); + public override PipelineRunStatus PipelineGetStatus(PipelineRunRequest request) + { + try + { + if (!String.IsNullOrEmpty(request.PipelineName)) //Null ref handling, although already validated Nulls in PipelineRequest.cs input + { + // Resolve pipeline id using the common async helper (kept synchronous for API compatibility) + pipelineId = ResolvePipelineIdAsync(workspaceId, request.PipelineName, CancellationToken.None) + .GetAwaiter().GetResult(); - _logger.LogInformation("Resolved pipeline Id: " + pipelineId); - } + _logger.LogInformation("Resolved pipeline Id: " + pipelineId + " for pipeline Name: " + request.PipelineName); } - } - _logger.LogInformation("Data pipeline run instance request."); + // Use the reusable _generalClient helper to GET and deserialize the job instance + var jobUrl = $"https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{pipelineId}/jobs/instances/{request.RunId}"; - pipeRunResponse = pbiApiClient.PostAsync($"https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{pipelineId}/jobs/instances?jobType=Pipeline",null); - pipeRunResponse.Wait(); + var pipelineRunResponse = SendGetAndDeserializeAsync<FabricJobInstance>(jobUrl, CancellationToken.None) + .GetAwaiter().GetResult(); - List<string> parts; - String pipelineRunId = String.Empty; + if (pipelineRunResponse is null) + { + throw new Exception("FabricJobInstance response was null."); + } - if (pipeRunResponse.IsCompleted) + return new PipelineRunStatus + { + PipelineName = request.PipelineName, + ActualStatus = pipelineRunResponse.JobStatus, + RunId = request.RunId + }; + } + catch (Exception ex) { - _logger.LogInformation("Data pipeline run instance response status: " + pipeRunResponse.Result.StatusCode); - _logger.LogDebug(pipeRunResponse.Result.ToString()); - - string pipelineRunLocation = pipeRunResponse.Result.Headers.Location.OriginalString; - parts = pipelineRunLocation.Split('/').ToList(); - pipelineRunId = parts.Last(); + _logger.LogError(ex, "PipelineGetStatus failed."); + throw; } + } + + public override PipelineDescription PipelineValidate(PipelineRequest request) + { + _logger.LogInformation("Checking pipeline Id."); - _logger.LogInformation("Data pipeline run instance creating return details."); + try + { + if (!String.IsNullOrEmpty(request.PipelineName)) //Null ref handling, although already validated Nulls in PipelineRequest.cs input + { + // Resolve pipeline id using shared async helper (kept synchronous for API compatibility) + pipelineId = ResolvePipelineIdAsync(workspaceId, request.PipelineName, CancellationToken.None) + .GetAwaiter().GetResult(); - return new PipelineRunStatus() + _logger.LogInformation("Resolved pipeline Id: " + pipelineId); + } + + return new PipelineDescription() + { + PipelineExists = "True", + PipelineName = request.PipelineName, + PipelineId = pipelineId, + PipelineType = "Unknown", + ActivityCount = 0 //technical debt + }; + } + catch (InvalidOperationException) { - PipelineName = request.PipelineName, - RunId = pipelineRunId, - ActualStatus = "Not Started" - }; + _logger.LogInformation("Validated FAB pipeline does not exist."); + return new PipelineDescription() + { + PipelineExists = "False", + PipelineName = request.PipelineName, + PipelineId = "Unknown", + PipelineType = "Unknown", + 
ActivityCount = 0 //technical debt + }; + } + catch (Exception ex) + { + _logger.LogInformation(ex.Message); + _logger.LogInformation(ex.GetType().ToString()); + throw new InvalidRequestException("Failed to validate pipeline.", ex); + } } - public override PipelineErrorDetail PipelineGetErrorDetails(PipelineRunRequest request) + private async Task<T?> SendGetAndDeserializeAsync<T>(string url, CancellationToken ct = default) { - throw new NotImplementedException(); + var resp = await generalClient.GetAsync(url, ct).ConfigureAwait(false); + resp.EnsureSuccessStatusCode(); + var s = await resp.Content.ReadAsStringAsync(ct).ConfigureAwait(false); + return JsonConvert.DeserializeObject<T>(s); } - public override PipelineRunStatus PipelineGetStatus(PipelineRunRequest request) + private async Task<string> ResolveWorkspaceIdAsync(string workspaceDisplayName, CancellationToken ct = default) { - throw new NotImplementedException(); + if (string.IsNullOrWhiteSpace(workspaceDisplayName)) + throw new ArgumentException("workspaceDisplayName is required", nameof(workspaceDisplayName)); + + var url = "https://api.fabric.microsoft.com/v1/workspaces"; + var workspaceResponse = await SendGetAndDeserializeAsync<FabricWorkspaces>(url, ct).ConfigureAwait(false); + + if (workspaceResponse is null || workspaceResponse.Value == null) + throw new Exception("Fabric Workspaces response was null."); + + var id = workspaceResponse.Value + .AsQueryable() + .Where(w => w.DisplayName.Equals(workspaceDisplayName, StringComparison.OrdinalIgnoreCase)) + .Select(w => w.Id) + .FirstOrDefault(); + + if (string.IsNullOrEmpty(id)) + throw new InvalidOperationException($"Workspace '{workspaceDisplayName}' not found."); + + return id; } - public override PipelineDescription PipelineValidate(PipelineRequest request) + private async Task<string> ResolvePipelineIdAsync(string workspaceId, string pipelineName, CancellationToken ct = default) { - throw new NotImplementedException(); + if (string.IsNullOrWhiteSpace(workspaceId)) throw new ArgumentException("workspaceId is required", nameof(workspaceId)); + if (string.IsNullOrWhiteSpace(pipelineName)) throw new ArgumentException("pipelineName is required", nameof(pipelineName)); + + var url = $"https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items?type=DataPipeline"; + var pipelineResponse = await SendGetAndDeserializeAsync<FabricDataPipelines>(url, ct).ConfigureAwait(false); + + if (pipelineResponse is null || pipelineResponse.Value == null) + throw new Exception("FabricDataPipelines response was null."); + + var id = pipelineResponse.Value + .AsQueryable() + .Where(p => p.DisplayName.Equals(pipelineName, StringComparison.OrdinalIgnoreCase)) + .Select(p => p.Id) + .FirstOrDefault(); + + if (string.IsNullOrEmpty(id)) + throw new InvalidOperationException($"Pipeline '{pipelineName}' not found in workspace '{workspaceId}'."); + + return id; } + public override void Dispose() { - GC.SuppressFinalize(this); + try + { + generalClient?.Dispose(); + pbiApiClient?.Dispose(); + + // if fabApiClient was created as Task<HttpClient>, keep this defensive check + if (fabApiClient != null && fabApiClient.IsCompletedSuccessfully) + { + fabApiClient.Result?.Dispose(); + } + + // if FabricClient implements IDisposable, dispose it: + (fabricClient as IDisposable)?.Dispose(); + } + finally + { + GC.SuppressFinalize(this); + } } } } diff --git a/src/metadata.common/Scripts/DefaultConnectionTypes.sql b/src/metadata.common/Scripts/DefaultConnectionTypes.sql index 1437732c..724a2fa5 100644 --- a/src/metadata.common/Scripts/DefaultConnectionTypes.sql +++ b/src/metadata.common/Scripts/DefaultConnectionTypes.sql @@ -83,3 +83,8 @@ 
@ConnectionTypeDisplayName = N'Jira', @SourceLanguageType = 'SQL', @Enabled = 1; + + EXEC [common].[AddConnectionType] + @ConnectionTypeDisplayName = N'Amazon S3', + @SourceLanguageType = 'NA', + @Enabled = 1; diff --git a/src/metadata.common/common/Stored Procedures/AddGenericPayloadPipelineDependencies.sql b/src/metadata.common/common/Stored Procedures/AddGenericPayloadPipelineDependencies.sql index 9757ba9c..7cec05d0 100644 --- a/src/metadata.common/common/Stored Procedures/AddGenericPayloadPipelineDependencies.sql +++ b/src/metadata.common/common/Stored Procedures/AddGenericPayloadPipelineDependencies.sql @@ -179,26 +179,26 @@ DECLARE @Dependencies TABLE ( DECLARE @DependenciesStagingTable TABLE ( PipelineId INT, StageId INT, - ParameterValue INT + ParameterValue VARCHAR(250) ) DECLARE @PipelineIdResult INT; DECLARE @DependantPipelineIdResult INT; INSERT INTO @DependenciesStagingTable (PipelineId, StageId, ParameterValue) SELECT - p.PipelineId, p.StageId, CAST(pp.ParameterValue AS INT) AS ParameterValue --,* + p.PipelineId, p.StageId, pp.ParameterValue FROM control.Pipelines AS p INNER JOIN control.PipelineParameters AS pp ON p.PipelineId = pp.PipelineId SELECT @PipelineIdResult = PipelineId FROM @DependenciesStagingTable -WHERE ParameterValue = @DatasetId +WHERE ParameterValue = CAST(@DatasetId AS VARCHAR(250)) AND StageId = @StageId SELECT @DependantPipelineIdResult = PipelineId FROM @DependenciesStagingTable -WHERE ParameterValue = @DependantDatasetId +WHERE ParameterValue = CAST(@DependantDatasetId AS VARCHAR(250)) AND StageId = @DependantStageId diff --git a/src/metadata.control/Scripts/MetadataAsCodeControl.sql b/src/metadata.control/Scripts/MetadataAsCodeControl.sql index b58176a5..7748deeb 100644 --- a/src/metadata.control/Scripts/MetadataAsCodeControl.sql +++ b/src/metadata.control/Scripts/MetadataAsCodeControl.sql @@ -8,6 +8,7 @@ EXEC [control].[AddStages] 'Fact', 'Transform cleansed data and apply business l EXEC [control].[AddStages] 'Speed', 'Regular loading of frequently used data.', 0 EXEC [control].[AddStages] 'ControlRaw', 'Demo of wait pipelines representing Raw load .', 0 EXEC [control].[AddStages] 'ControlCleansed', 'Demo of wait pipelines representing cleansed load.', 0 +EXEC [control].[AddStages] 'ControlCurated', 'Demo of wait pipelines representing curated load.', 0 EXEC [control].[AddStages] 'ControlSpeed', 'Demo of wait pipelines loading of frequently used data.', 0 EXEC [control].[AddBatches] 'ControlDemoHourly', 'Ad-hoc Demo Batch for Control Wait pipeline executions.', 1 @@ -23,6 +24,7 @@ EXEC [control].[AddBatchStageLink] 'Daily', 'Dimension' EXEC [control].[AddBatchStageLink] 'Daily', 'Fact' EXEC [control].[AddBatchStageLink] 'ControlDemoDaily', 'ControlRaw' EXEC [control].[AddBatchStageLink] 'ControlDemoDaily', 'ControlCleansed' +EXEC [control].[AddBatchStageLink] 'ControlDemoDaily', 'ControlCurated' -- Add tenants to the metadata control table. EXEC [control].[AddTenants] '$(TenantID)', 'Default', 'Example value for $(Environment) environment.' 
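-- [Editor's note] Illustrative sketch of the AddGenericPayloadPipelineDependencies change
-- above: ParameterValue is now staged as VARCHAR(250), so dataset-id lookups cast the INT
-- input to VARCHAR instead of casting every stored parameter value to INT (which raised
-- conversion errors for non-numeric parameters). Values are hypothetical; the table
-- variable only exists inside that procedure:
--
--     DECLARE @DatasetId INT = 42;
--     SELECT PipelineId
--     FROM @DependenciesStagingTable
--     WHERE ParameterValue = CAST(@DatasetId AS VARCHAR(250))
--     AND StageId = @StageId;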
diff --git a/src/metadata.control/Security/db_cumulususer.sql b/src/metadata.control/Security/db_cumulususer.sql index d1ca9bbf..86d6f9bb 100644 --- a/src/metadata.control/Security/db_cumulususer.sql +++ b/src/metadata.control/Security/db_cumulususer.sql @@ -14,6 +14,14 @@ GRANT ON SCHEMA::[control] TO [db_cumulususer] GO +GRANT + EXECUTE, + SELECT, + CONTROL, + ALTER +ON SCHEMA::[dbo] TO [db_cumulususer] +GO + /* ALTER ROLE [db_cumulususer] ADD MEMBER [cumulusfactorydev]; diff --git a/src/metadata.control/control/Stored Procedures/GetLineageProperties.sql b/src/metadata.control/control/Stored Procedures/GetLineageProperties.sql new file mode 100644 index 00000000..b725cc97 --- /dev/null +++ b/src/metadata.control/control/Stored Procedures/GetLineageProperties.sql @@ -0,0 +1,41 @@ +CREATE PROCEDURE control.GetLineageProperties +AS +BEGIN + DECLARE @MarkdownTargetType VARCHAR(250); + DECLARE @PATSecretName VARCHAR(250); + DECLARE @URLSecretName VARCHAR(250); + DECLARE @OverrideCustomMarkdownTargetType BIT; + + SELECT + @MarkdownTargetType = PropertyValue + FROM control.LineageProperties + WHERE PropertyName = 'MarkdownTargetType'; + + SELECT + @PATSecretName = PropertyValue + FROM control.LineageProperties + WHERE PropertyName = 'PATSecretName'; + + SELECT + @URLSecretName = PropertyValue + FROM control.LineageProperties + WHERE PropertyName = 'URLSecretName'; + + SELECT + @OverrideCustomMarkdownTargetType = PropertyValue + FROM control.LineageProperties + WHERE PropertyName = 'OverrideCustomMarkdownTargetType'; + + + IF @MarkdownTargetType NOT IN ('AzureDevOps', 'Local', 'GitHub') + AND @OverrideCustomMarkdownTargetType = 0 + BEGIN + RAISERROR('Invalid Markdown target selected. Please check that your target site is supported, otherwise set the "OverrideCustomMarkdownTargetType" property to 1.',16,1) + RETURN 0; + END + + SELECT + @MarkdownTargetType AS MarkdownTargetType, + @PATSecretName AS PatSecretName, + @URLSecretName AS URLSecretName +END; \ No newline at end of file diff --git a/src/metadata.control/control/Stored Procedures/GetOrchestrationLineage.sql b/src/metadata.control/control/Stored Procedures/GetOrchestrationLineage.sql new file mode 100644 index 00000000..d28431af --- /dev/null +++ b/src/metadata.control/control/Stored Procedures/GetOrchestrationLineage.sql @@ -0,0 +1,721 @@ +CREATE PROCEDURE [control].[GetOrchestrationLineage] ( + -- Core + @BatchName VARCHAR(255), + + -- Customisation + @UseStatusColours BIT = 0, + @StageLineageLevel VARCHAR(25) = 'Simple', + @UseExecutionHistory BIT = 1, + + -- Filters + @1_FilterFailedAndBlocked BIT = 0, + + @2_FilterDataset BIT = 0, + @2_DatasetDetails VARCHAR(MAX) = '', + + @3_FilterDataSource BIT = 0, + @3_FilterDataSourceByType BIT = 0, + @3_DataSourceDetails VARCHAR(MAX) = '', + + -- Visualisation + @SuccessColour VARCHAR(6) = '40B0A6', + @FailedColour VARCHAR(6) = 'E66100', + @BlockedColour VARCHAR(6) = 'DCDB88', + @RunningColour VARCHAR(6) = 'AEAEBD', + @DefaultColour VARCHAR(6) = 'ECECFF' + +) AS + /* EXPERIMENTAL: + 1. Filter chain of only failed and blocked records + 2. Filter chain for specific Data Source + 3. 
Filter chain for specific Datasets + */ + + SET NOCOUNT ON; + + -- Parameter Validation + DECLARE @BatchExists BIT; + + SELECT + @BatchExists = CASE WHEN COUNT(*) = 0 THEN 0 ELSE 1 END + FROM [control].[Batches] + WHERE BatchName = @BatchName; + + IF @BatchExists = 0 + BEGIN + RAISERROR('Batch name specified does not exist within metadata.',16,1); + RETURN 0; + END + + IF (@3_FilterDataSource = 1 OR @3_FilterDataSourceByType = 1) AND @2_FilterDataset = 1 + BEGIN + RAISERROR('Filtering Data Source and Specific Datasets is not an allowed combination currently.',16,1); + RETURN 0; + END + + IF (@3_FilterDataSource = 1 AND @3_FilterDataSourceByType = 1) + BEGIN + RAISERROR('Filtering Data Source and Data Sources by Type is not an allowed combination currently.',16,1); + RETURN 0; + END + /* + User requirements: I want to filter my results on a specific dataset. + This includes any pipelines which interact directly with the dataset. + This also includes any pipelines which are a dependency of pipelines interacting with the dataset. + This also includes any pipelines which are a pre-requisite of pipelines interacting with the dataset. + */ + + DECLARE @PipelineIds TABLE ( + PipelineId INT + ) + + IF @3_FilterDataSource = 1 + BEGIN + WITH filteredConnections AS( + SELECT c.ConnectionId + FROM common.Connections c + INNER JOIN OPENJSON(@3_DataSourceDetails) + WITH ( + name VARCHAR(100) '$.name' + ) j + ON c.ConnectionDisplayName = j.name + ) + , filteredConnectionsJoin AS ( + SELECT p.pipelineid + FROM [control].[Pipelines] p + LEFT JOIN [control].[PipelineParameters] pp + ON p.PipelineId = pp.PipelineId + INNER JOIN [control].[Orchestrators] o + ON p.[OrchestratorId] = o.[OrchestratorId] + INNER JOIN [control].[Stages] s + ON p.[StageId] = s.[StageId] + INNER JOIN [control].[BatchStageLink] bs + ON s.[StageId] = bs.[StageId] + INNER JOIN [control].[Batches] b + ON bs.[BatchId] = b.[BatchId] + LEFT JOIN [ingest].[Datasets] id + ON p.PipelineName like 'Ingest_PL_%' + AND pp.ParameterValue = CAST(id.DatasetId AS CHAR(4)) + LEFT JOIN [transform].[Datasets] td + ON p.PipelineName like 'Transform_PL_%' + AND pp.ParameterValue = CAST(td.DatasetId AS CHAR(4)) + + WHERE ( + id.ConnectionFK IN (SELECT ConnectionId FROM filteredConnections) + ) + ) + + INSERT INTO @PipelineIds (PipelineId) + SELECT pd.pipelineid + FROM filteredConnectionsJoin cte + INNER JOIN [control].PipelineDependencies pd + ON cte.pipelineId = pd.DependantPipelineId + OR cte.pipelineId = pd.PipelineId + UNION + SELECT pd.DependantPipelineId + FROM filteredConnectionsJoin cte + INNER JOIN [control].PipelineDependencies pd + ON cte.pipelineId = pd.DependantPipelineId + OR cte.pipelineId = pd.PipelineId + + END + IF @3_FilterDataSourceByType = 1 + BEGIN + WITH filteredConnections AS( + SELECT c.ConnectionId + FROM common.Connections c + INNER JOIN common.ConnectionTypes ct + ON c.ConnectionTypeFK = ct.ConnectionTypeId + INNER JOIN OPENJSON(@3_DataSourceDetails) + WITH ( + name VARCHAR(100) '$.name' + ) j + ON ct.ConnectionTypeDisplayName = j.name + ) + , filteredConnectionsJoin AS ( + SELECT p.pipelineid + FROM [control].[Pipelines] p + LEFT JOIN [control].[PipelineParameters] pp + ON p.PipelineId = pp.PipelineId + INNER JOIN [control].[Orchestrators] o + ON p.[OrchestratorId] = o.[OrchestratorId] + INNER JOIN [control].[Stages] s + ON p.[StageId] = s.[StageId] + INNER JOIN [control].[BatchStageLink] bs + ON s.[StageId] = bs.[StageId] + INNER JOIN [control].[Batches] b + ON bs.[BatchId] = b.[BatchId] + LEFT JOIN [ingest].[Datasets] id + ON 
p.PipelineName like 'Ingest_PL_%' + AND pp.ParameterValue = CAST(id.DatasetId AS CHAR(4)) + LEFT JOIN [transform].[Datasets] td + ON p.PipelineName like 'Transform_PL_%' + AND pp.ParameterValue = CAST(td.DatasetId AS CHAR(4)) + + WHERE ( + id.ConnectionFK IN (SELECT ConnectionId FROM filteredConnections) + ) + ) + + INSERT INTO @PipelineIds (PipelineId) + SELECT pd.pipelineid + FROM filteredConnectionsJoin cte + INNER JOIN [control].PipelineDependencies pd + ON cte.pipelineId = pd.DependantPipelineId + OR cte.pipelineId = pd.PipelineId + UNION + SELECT pd.DependantPipelineId + FROM filteredConnectionsJoin cte + INNER JOIN [control].PipelineDependencies pd + ON cte.pipelineId = pd.DependantPipelineId + OR cte.pipelineId = pd.PipelineId + + END + + IF @2_FilterDataset = 1 + BEGIN + + WITH filteredDatasets AS( + SELECT * + FROM OPENJSON(@2_DatasetDetails) + WITH (name VARCHAR(100) '$.name' ) + ) + + , filteredDatasetJoin AS ( + SELECT p.pipelineid + FROM [control].[Pipelines] p + LEFT JOIN [control].[PipelineParameters] pp + ON p.PipelineId = pp.PipelineId + INNER JOIN [control].[Orchestrators] o + ON p.[OrchestratorId] = o.[OrchestratorId] + INNER JOIN [control].[Stages] s + ON p.[StageId] = s.[StageId] + INNER JOIN [control].[BatchStageLink] bs + ON s.[StageId] = bs.[StageId] + INNER JOIN [control].[Batches] b + ON bs.[BatchId] = b.[BatchId] + LEFT JOIN [ingest].[Datasets] id + ON p.PipelineName like 'Ingest_PL_%' + AND pp.ParameterValue = CAST(id.DatasetId AS CHAR(4)) + LEFT JOIN [transform].[Datasets] td + ON p.PipelineName like 'Transform_PL_%' + AND pp.ParameterValue = CAST(td.DatasetId AS CHAR(4)) + + WHERE ( + id.DatasetDisplayName IN (SELECT name FROM filteredDatasets) + OR + td.DatasetName IN (SELECT name FROM filteredDatasets) + ) + ) + + INSERT INTO @PipelineIds (PipelineId) + SELECT pd.pipelineid + FROM filteredDatasetJoin cte + INNER JOIN [control].PipelineDependencies pd + ON cte.pipelineId = pd.DependantPipelineId + OR cte.pipelineId = pd.PipelineId + UNION + SELECT pd.DependantPipelineId + FROM filteredDatasetJoin cte + INNER JOIN [control].PipelineDependencies pd + ON cte.pipelineId = pd.DependantPipelineId + OR cte.pipelineId = pd.PipelineId + END + ELSE + BEGIN + INSERT INTO @PipelineIds (PipelineId) + SELECT PipelineId + FROM [control].[Pipelines] + END + + DECLARE @UsingBatchExecutions BIT; + SELECT @UsingBatchExecutions = CAST(PropertyValue AS INT) + FROM control.Properties + WHERE PropertyName = 'UseExecutionBatches'; + + + + DECLARE @PageContent NVARCHAR(MAX) = ''; + DECLARE @BaseData TABLE + ( + [OrchestratorId] INT NOT NULL, + [OrchestratorName] NVARCHAR(200) NOT NULL, + [StageId] INT NOT NULL, + [StageName] VARCHAR(225) NOT NULL, + [PipelineId] INT NOT NULL, + [PipelineName] NVARCHAR(200) NOT NULL, + [AdditionalPipelineInfo] NVARCHAR(500) NULL + ) + + -- Get LatestExecution counts + DECLARE @CountCurrentExecution INT; + DECLARE @CountLatestExecution INT; + + IF @UseExecutionHistory = 1 + BEGIN + SELECT @CountCurrentExecution = COALESCE(COUNT(*),0) + FROM control.CurrentExecution + WHERE (@UsingBatchExecutions = 1 + AND LocalExecutionId = ( + SELECT TOP 1 ExecutionId FROM control.BatchExecution WHERE BatchName = @BatchName + )) + OR @UsingBatchExecutions = 0; + + SELECT @CountLatestExecution = COALESCE(COUNT(*),0) + FROM control.ExecutionLog + WHERE (@UsingBatchExecutions = 1 + AND LocalExecutionId = ( + SELECT TOP 1 ExecutionId FROM control.BatchExecution WHERE BatchName = @BatchName + )) + OR @UsingBatchExecutions = 0; + END + ELSE + BEGIN + SET 
@CountCurrentExecution = 0 + SET @CountLatestExecution = 0 + END + + --get reusable metadata + INSERT INTO @BaseData + SELECT + o.[OrchestratorId], + o.[OrchestratorName], + s.[StageId], + s.[StageName], + p.[PipelineId], + p.[PipelineName], + CASE + WHEN STRING_AGG(pp.ParameterName,'') IS NOT NULL THEN CONCAT(' - ',STRING_AGG(CONCAT(pp.ParameterName, ': ', REPLACE(REPLACE(pp.ParameterValue,'[','#91;'),']','#93;')),',')) + ELSE '' + END + FROM + [control].[Pipelines] p + LEFT JOIN [control].[PipelineParameters] pp + ON p.PipelineId = pp.PipelineId + INNER JOIN [control].[Orchestrators] o + ON p.[OrchestratorId] = o.[OrchestratorId] + INNER JOIN [control].[Stages] s + ON p.[StageId] = s.[StageId] + INNER JOIN [control].[BatchStageLink] bs + ON s.[StageId] = bs.[StageId] + INNER JOIN [control].[Batches] b + ON bs.[BatchId] = b.[BatchId] + LEFT JOIN [ingest].[Datasets] id + ON p.PipelineName like 'Ingest_PL_%' + AND pp.ParameterValue = CAST(id.DatasetId AS CHAR(4)) + LEFT JOIN [transform].[Datasets] td + ON p.PipelineName like 'Transform_PL_%' + AND pp.ParameterValue = CAST(td.DatasetId AS CHAR(4)) + LEFT JOIN [control].[CurrentExecution] AS ce -- IF OR + ON p.PipelineId = ce.PipelineId + AND @CountCurrentExecution > 0 + LEFT JOIN [control].[ExecutionLog] AS el -- IF OR + ON p.PipelineId = el.PipelineId + AND @CountLatestExecution > 0 + AND @CountCurrentExecution = 0 + AND el.LocalExecutionId = ( + SELECT TOP 1 LocalExecutionId FROM control.ExecutionLog ORDER BY LogId DESC) + AND LogId IN (SELECT MAX(LogId) FROM control.ExecutionLog GROUP BY PipelineId) + WHERE + p.[Enabled] = 1 + AND b.[BatchName] = @BatchName + AND id.DatasetId IS NULL + AND td.DatasetId IS NULL + AND ( + -- Filter for blocked and failed against current executions table + (@1_FilterFailedAndBlocked = 1 AND @CountCurrentExecution > 0 AND ce.PipelineStatus IN ('Failed' , 'Blocked')) OR + -- Filter for blocked and failed against latest executions table + (@1_FilterFailedAndBlocked = 1 AND @CountLatestExecution > 0 AND @CountCurrentExecution = 0 AND el.PipelineStatus IN ('Failed' , 'Blocked')) OR + -- No filter on pipeline status + (@1_FilterFailedAndBlocked = 0)) + AND p.PipelineId IN ( + SELECT PipelineId + FROM @PipelineIds + ) + GROUP BY o.[OrchestratorId], + o.[OrchestratorName], + s.[StageId], + s.[StageName], + p.[PipelineId], + p.[PipelineName] + + UNION + + SELECT + o.[OrchestratorId], + o.[OrchestratorName], + s.[StageId], + s.[StageName], + p.[PipelineId], + p.[PipelineName], + CASE + WHEN id.DatasetId IS NOT NULL THEN CONCAT(' - ', id.DatasetDisplayName) + WHEN td.DatasetId IS NOT NULL THEN CONCAT(' - ', td.DatasetName) + ELSE '' + END + FROM + [control].[Pipelines] p + LEFT JOIN [control].[PipelineParameters] pp + ON p.PipelineId = pp.PipelineId + INNER JOIN [control].[Orchestrators] o + ON p.[OrchestratorId] = o.[OrchestratorId] + INNER JOIN [control].[Stages] s + ON p.[StageId] = s.[StageId] + INNER JOIN [control].[BatchStageLink] bs + ON s.[StageId] = bs.[StageId] + INNER JOIN [control].[Batches] b + ON bs.[BatchId] = b.[BatchId] + LEFT JOIN [ingest].[Datasets] id + ON p.PipelineName like 'Ingest_PL_%' + AND pp.ParameterValue = CAST(id.DatasetId AS CHAR(4)) + LEFT JOIN [transform].[Datasets] td + ON p.PipelineName like 'Transform_PL_%' + AND pp.ParameterValue = CAST(td.DatasetId AS CHAR(4)) + LEFT JOIN [control].[CurrentExecution] AS ce -- IF OR + ON p.PipelineId = ce.PipelineId + AND @CountCurrentExecution > 0 + LEFT JOIN [control].[ExecutionLog] AS el -- IF OR + ON p.PipelineId = el.PipelineId + AND 
@CountLatestExecution > 0 + AND @CountCurrentExecution = 0 + AND el.LocalExecutionId = ( + SELECT TOP 1 LocalExecutionId FROM control.ExecutionLog ORDER BY LogId DESC) + AND LogId IN (SELECT MAX(LogId) FROM control.ExecutionLog GROUP BY PipelineId) + WHERE + p.[Enabled] = 1 + AND b.[BatchName] = @BatchName + AND (id.[DatasetId] IS NOT NULL OR td.[DatasetId] IS NOT NULL) + AND ( + -- Filter for blocked and failed against current executions table + (@1_FilterFailedAndBlocked = 1 AND @CountCurrentExecution > 0 AND ce.PipelineStatus IN ('Failed' , 'Blocked')) OR + -- Filter for blocked and failed against latest executions table + (@1_FilterFailedAndBlocked = 1 AND @CountLatestExecution > 0 AND @CountCurrentExecution = 0 AND el.PipelineStatus IN ('Failed' , 'Blocked')) OR + -- No filter on pipeline status + (@1_FilterFailedAndBlocked = 0)) + AND p.PipelineId IN ( + SELECT PipelineId + FROM @PipelineIds + ); + + + --add orchestrator(s) sub graphs + ;WITH orchestrators AS + ( + SELECT DISTINCT + [OrchestratorId], + [OrchestratorName], + 'subgraph ' + [OrchestratorName] + '\n' + + 'style ' + [OrchestratorName] + ' fill:#F5F5F5,stroke:#F5F5F5' + '\n' + + '##o' + CAST([OrchestratorId] * 10000 AS VARCHAR) + '##' + '\n' + 'end' + '\n' + AS OrchestratorSubGraphs + FROM + @BaseData + ) + + SELECT + @PageContent += OrchestratorSubGraphs + FROM + orchestrators; + + + + --add stage sub graphs + ;WITH stages AS + ( + SELECT DISTINCT + [OrchestratorId], + [StageName], + [StageId] + FROM + @BaseData + ), + stageSubs AS + ( + SELECT + [OrchestratorId], + STRING_AGG('subgraph ' + [StageName] + '\n' + + 'style ' + [StageName] + ' fill:#E0E0E0,stroke:#E0E0E0' + '\n' + + '##s' + CAST([StageId] AS VARCHAR) + '##' + '\n' + 'end', '\n' + ) AS 'StageSubGraphs' + FROM + stages + GROUP BY + [OrchestratorId] + ) + SELECT + @PageContent = REPLACE(@PageContent,'##o' + CAST([OrchestratorId] * 10000 AS VARCHAR) + '##',[StageSubGraphs]) + FROM + stageSubs; + + --add pipelines within stage + + DECLARE @LatestExecutions TABLE ( + [LocalExecutionId] [uniqueidentifier] NULL, + [StageId] [int] NOT NULL, + [PipelineId] [int] NOT NULL, + [PipelineName] [nvarchar](200) NULL, + [PipelineStatus] [nvarchar](200) NULL, + [HexColour] [nvarchar](6) NOT NULL, + [PipelinePrecedence] INT NULL + ) + + + + IF @CountCurrentExecution > 0 + BEGIN + INSERT INTO @LatestExecutions ( + [LocalExecutionId], + [StageId], + [PipelineId], + [PipelineName], + [PipelineStatus], + [HexColour], + [PipelinePrecedence]) + SELECT + [LocalExecutionId], + [StageId], + [PipelineId], + [PipelineName], + [PipelineStatus], + CASE + WHEN @UseStatusColours = 1 AND PipelineStatus = 'Success' THEN @SuccessColour + WHEN @UseStatusColours = 1 AND PipelineStatus = 'Blocked' THEN @BlockedColour + WHEN @UseStatusColours = 1 AND PipelineStatus = 'Failed' THEN @FailedColour + WHEN @UseStatusColours = 1 AND PipelineStatus = 'Running' THEN @RunningColour + ELSE @DefaultColour + END AS HexColour, + CASE + WHEN PipelineStatus = 'Success' THEN 1 + WHEN PipelineStatus = 'Blocked' THEN 2 + WHEN PipelineStatus = 'Failed' THEN 3 + WHEN PipelineStatus = 'Running' THEN 4 + WHEN PipelineStatus = 'Pending' THEN 5 + ELSE 999 + END AS [PipelinePrecedence] + FROM control.CurrentExecution + END + + ELSE IF @CountCurrentExecution = 0 AND @CountLatestExecution > 0 + BEGIN + INSERT INTO @LatestExecutions ( + [LocalExecutionId], + [StageId], + [PipelineId], + [PipelineName], + [PipelineStatus], + [HexColour], + [PipelinePrecedence]) + SELECT + [LocalExecutionId], + [StageId], + [PipelineId], + 
[PipelineName], + [PipelineStatus], + CASE + WHEN @UseStatusColours = 1 AND PipelineStatus = 'Success' THEN @SuccessColour + WHEN @UseStatusColours = 1 AND PipelineStatus = 'Blocked' THEN @BlockedColour + WHEN @UseStatusColours = 1 AND PipelineStatus = 'Failed' THEN @FailedColour + WHEN @UseStatusColours = 1 AND PipelineStatus = 'Running' THEN @RunningColour + ELSE @DefaultColour + END AS HexColour, + CASE + WHEN PipelineStatus = 'Success' THEN 1 + WHEN PipelineStatus = 'Blocked' THEN 2 + WHEN PipelineStatus = 'Failed' THEN 3 + WHEN PipelineStatus = 'Running' THEN 4 + WHEN PipelineStatus = 'Pending' THEN 5 + ELSE 999 + END AS [PipelinePrecedence] + FROM control.ExecutionLog + WHERE LocalExecutionId = ( + SELECT TOP 1 LocalExecutionId FROM control.ExecutionLog ORDER BY LogId DESC) + AND LogId IN (SELECT MAX(LogId) FROM control.ExecutionLog GROUP BY PipelineId) + + END + + ELSE IF @CountCurrentExecution = 0 AND @CountLatestExecution = 0 + BEGIN + INSERT INTO @LatestExecutions ( + [PipelineId], + [StageId], + [HexColour]) + SELECT + [PipelineId], + [StageId], + @DefaultColour AS HexColour + FROM @BaseData + END + + ;WITH pipelines AS + ( + SELECT + BE.StageId, + STRING_AGG( + CAST( + 'p' + CAST(BE.PipelineId * 10 AS VARCHAR(MAX)) + + '(' + BE.PipelineName + BE.AdditionalPipelineInfo + ')' + '\n' + + 'style p' + CAST(BE.PipelineId * 10 AS VARCHAR(MAX)) + + ' fill:#' + LE.HexColour + ',stroke:#' + LE.HexColour + AS VARCHAR(MAX) + ), + '\n' + ) AS PipelinesInStage + FROM @BaseData BE + INNER JOIN @LatestExecutions LE + ON BE.PipelineId = LE.PipelineId + GROUP BY BE.StageId + ) + + SELECT + @PageContent = REPLACE(@PageContent,'##s' + CAST([StageId] AS VARCHAR) + '##',[PipelinesInStage]) + FROM + pipelines + + --add stage nodes + ;WITH stageNodeExecutions AS ( + SELECT + BE.StageId, + MAX(LE.PipelinePrecedence) AS StageStatus + FROM + @BaseData BE + LEFT JOIN + @LatestExecutions LE + ON BE.PipelineId = LE.PipelineId + GROUP BY BE.StageId + ), + StageNodeStatuses AS ( + SELECT + StageId + ,CASE + WHEN @UseStatusColours = 1 AND StageStatus = 1 THEN @SuccessColour + WHEN @UseStatusColours = 1 AND StageStatus = 2 THEN @BlockedColour + WHEN @UseStatusColours = 1 AND StageStatus = 3 THEN @FailedColour + WHEN @UseStatusColours = 1 AND StageStatus = 4 THEN @RunningColour + ELSE @DefaultColour + END AS HexColour + FROM stageNodeExecutions + ) + + ,stageNodes AS + ( + SELECT DISTINCT + BE.[StageId], + 's' + CAST(BE.[StageId] * 100 AS VARCHAR) + '[' + BE.[StageName] + ']' + '\n' + + 'style s' + CAST(BE.[StageId] * 100 AS VARCHAR) + ' fill:#' + SNS.HexColour + ',stroke:#' + SNS.HexColour + '\n' AS StageNode + FROM + @BaseData BE + LEFT JOIN + StageNodeStatuses SNS + ON BE.StageId = SNS.StageId + + ) + SELECT + @PageContent = @PageContent + [StageNode] + FROM + stageNodes + ORDER BY + [StageId]; + + --add stage to pipeline relationships + IF @StageLineageLevel = 'Detail' + BEGIN + SELECT @PageContent = @PageContent + 's' + CAST([StageId] * 100 AS VARCHAR) + + ' --> ' + 'p' + CAST([PipelineId] * 10 AS VARCHAR) + '\n' + FROM @BaseData; + END + ELSE IF @StageLineageLevel = 'Simple' + BEGIN + WITH StageLineageCTE AS ( + SELECT DISTINCT + 's' + CAST([StageId] * 100 AS VARCHAR) + + ' --> ' + [StageName] + '\n' AS StageLineage + FROM @BaseData) + SELECT @PageContent = @PageContent + StageLineage + FROM StageLineageCTE + END + + --add stage to stage relationships + ;WITH maxStage AS + ( + SELECT + MAX([StageId]) -1 AS maxStageId + FROM + @BaseData + ), + nextStage AS ( + SELECT DISTINCT + a.[StageId], + CASE + WHEN 
MIN(b.[StageId]) IS NULL THEN a.[StageId] + 1 + ELSE MIN(b.[StageId]) + END AS [NextStageId] + FROM @BaseData a + LEFT JOIN @BaseData b + ON b.[StageId] > a.[StageId] + + GROUP BY a.[StageId] + ), + stageToStage AS + ( + SELECT DISTINCT + 's' + CAST(b.[StageId] * 100 AS VARCHAR) + + ' ==> ' + 's' + CAST(n.[NextStageId] * 100 AS VARCHAR) + '\n' AS Content + FROM + @BaseData b + INNER JOIN nextStage n + ON b.StageId = n.StageId + CROSS JOIN maxStage + WHERE + b.[StageId] <= maxStage.[maxStageId] + ) + + SELECT + @PageContent = @PageContent + [Content] + FROM + stageToStage + + --add pipeline to pipeline relationships + ;WITH pipelineRelationships AS ( + SELECT DISTINCT 'p' + CAST(pd.[PipelineId] * 10 AS VARCHAR) + + ' -.-> ' + 'p' + CAST(pd.[DependantPipelineId] * 10 AS VARCHAR) + '\n' AS RelationshipTxt + FROM + [control].[PipelineDependencies] pd + INNER JOIN @BaseData b1 + ON pd.[PipelineId] = b1.[PipelineId] + INNER JOIN @BaseData b2 + ON pd.[DependantPipelineId] = b2.[PipelineId] + ) + + SELECT + @PageContent = @PageContent + RelationshipTxt + FROM + pipelineRelationships; + --add batch subgraph + SELECT + @PageContent = 'subgraph ' + [BatchName] + '\n' + + 'style ' + @BatchName + ' fill:#DEEBF7,stroke:#DEEBF7' + '\n' + @PageContent + FROM + [control].[Batches] + WHERE + [BatchName] = @BatchName; + + SET @PageContent = @PageContent + 'end'; + + --add mermaid header + DECLARE @PageHeader VARCHAR(1000) = '::: mermaid' + '\n' + 'graph' + IF @UseStatusColours = 1 + BEGIN + SET @PageHeader = @PageHeader + '\n' + 'subgraph Legend [Pipeline Status]' + '\n' + 'style Legend fill:#FFFFFF,stroke:#FFFFFF' + '\n' + + 'success(Success)' + '\n' + 'style success fill:#' + @SuccessColour +',stroke:#' + @SuccessColour + '\n' + + 'failure(Failure)' + '\n' + 'style failure fill:#' + @FailedColour + ',stroke:#' + @FailedColour + '\n' + + 'blocked(Blocked)' + '\n' + 'style blocked fill:#' + @BlockedColour + ',stroke:#' + @BlockedColour + '\n' + + 'running(Running)' + '\n' + 'style running fill:#' + @RunningColour + ',stroke:#' + @RunningColour + '\n' + + 'pending(Pending)' + '\n' + 'style pending fill:#' + @DefaultColour + ',stroke:#' + @DefaultColour + '\n' + + 'end' + END + SELECT + @PageContent = @PageHeader + '\n' + @PageContent + '\n' + ':::'; + + --return output + PRINT CAST(@PageContent AS NVARCHAR(MAX)); + SELECT CAST(@PageContent AS NVARCHAR(MAX)) AS MarkdownOutput; \ No newline at end of file diff --git a/src/metadata.control/control/Tables/LineageProperties.sql b/src/metadata.control/control/Tables/LineageProperties.sql new file mode 100644 index 00000000..929d190d --- /dev/null +++ b/src/metadata.control/control/Tables/LineageProperties.sql @@ -0,0 +1,11 @@ +CREATE TABLE [control].[LineageProperties] + ( + [PropertyId] [int] IDENTITY (1, 1) NOT NULL, + [PropertyName] NVARCHAR(50) NOT NULL, + [PropertyValue] NVARCHAR(250) NOT NULL, + [Description] VARCHAR(MAX) NULL, + [ValidFrom] [datetime] CONSTRAINT [DF_LineageProperties_ValidFrom] DEFAULT (GETDATE()) NOT NULL, + [ValidTo] [datetime] NULL, + [Enabled] BIT NOT NULL, + CONSTRAINT [PK_LineageProperties] PRIMARY KEY CLUSTERED ([PropertyId] ASC, [PropertyName] ASC) + ); \ No newline at end of file diff --git a/src/metadata.control/dbo/Stored Procedures/FailProcedure.sql b/src/metadata.control/dbo/Stored Procedures/FailProcedure.sql new file mode 100644 index 00000000..de9e7a91 --- /dev/null +++ b/src/metadata.control/dbo/Stored Procedures/FailProcedure.sql @@ -0,0 +1,12 @@ +CREATE PROCEDURE [dbo].[FailProcedure] + ( + @RaiseError VARCHAR(50) + ) +AS +BEGIN + 
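-- [Editor's note] Usage sketch for the new procedure (hypothetical calls, not part of this
-- diff): the IF below raises a severity 16 error only when the flag is the literal string
-- 'true', so a demo pipeline can toggle failure per run:
--     EXEC [dbo].[FailProcedure] @RaiseError = 'true';  -- raises the intentional error
--     EXEC [dbo].[FailProcedure] @RaiseError = 'false'; -- completes silently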
IF(@RaiseError = 'true') + BEGIN + RAISERROR('The Stored Procedure intentionally failed.',16,1); + RETURN 0; + END +END; \ No newline at end of file diff --git a/src/metadata.control/metadata.control.sqlproj b/src/metadata.control/metadata.control.sqlproj index 44b80608..fd1ef5a4 100644 --- a/src/metadata.control/metadata.control.sqlproj +++ b/src/metadata.control/metadata.control.sqlproj @@ -68,6 +68,7 @@ + @@ -132,8 +133,12 @@ + + + + diff --git a/src/metadata.data/Scripts/ControlWaitPipelineMetadataAsCode.sql b/src/metadata.data/Scripts/ControlWaitPipelineMetadataAsCode.sql new file mode 100644 index 00000000..a7c6cebc --- /dev/null +++ b/src/metadata.data/Scripts/ControlWaitPipelineMetadataAsCode.sql @@ -0,0 +1,100 @@ +-- Metadata as Code for Control Demo Wait Pipelines +MERGE INTO control.pipelines AS target +USING ( + SELECT + o.OrchestratorId, + s.StageId, + p.PipelineName, + p.Enabled + FROM ( + VALUES + ('$(ADFName)', 'ControlRaw', 'Wait 1', 1), + ('$(ADFName)', 'ControlRaw', 'Wait 2', 1), + ('$(ADFName)', 'ControlRaw', 'Wait 3', 1), + ('$(ADFName)', 'ControlRaw', 'Intentional Error', 1), + ('$(ADFName)', 'ControlCleansed', 'Wait 4', 1), + ('$(ADFName)', 'ControlCleansed', 'Wait 5', 1), + ('$(ADFName)', 'ControlCleansed', 'Wait 6', 1), + ('$(ADFName)', 'ControlCleansed', 'Wait 7', 1), + ('$(ADFName)', 'ControlCurated', 'Wait 8', 1), + ('$(ADFName)', 'ControlCurated', 'Wait 9', 1), + ('$(ADFName)', 'ControlCurated', 'Wait 10', 1) + ) AS p(OrchestratorName, StageName, PipelineName, Enabled) + INNER JOIN control.orchestrators o ON o.OrchestratorName = p.OrchestratorName + INNER JOIN control.stages s ON s.StageName = p.StageName +) AS source (OrchestratorId, StageId, PipelineName, Enabled) +ON ( + target.OrchestratorId = source.OrchestratorId AND + target.StageId = source.StageId AND + target.PipelineName = source.PipelineName +) +WHEN MATCHED THEN + UPDATE SET Enabled = source.Enabled +WHEN NOT MATCHED THEN + INSERT (OrchestratorId, StageId, PipelineName, Enabled) + VALUES (source.OrchestratorId, source.StageId, source.PipelineName, source.Enabled); + + +-- Metadata as Code for Control Demo Wait Pipeline Parameters +MERGE INTO control.pipelineparameters AS target +USING ( + SELECT + p.PipelineId, + params.ParameterName, + params.ParameterValue + FROM ( + VALUES + ('Wait 1', 'WaitTime', '2'), + ('Wait 2', 'WaitTime', '7'), + ('Wait 3', 'WaitTime', '1'), + ('Intentional Error', 'RaiseErrors', 'true'), + ('Wait 4', 'WaitTime', '1'), + ('Wait 5', 'WaitTime', '1'), + ('Wait 6', 'WaitTime', '4'), + ('Wait 7', 'WaitTime', '7'), + ('Wait 8', 'WaitTime', '1'), + ('Wait 9', 'WaitTime', '1'), + ('Wait 10', 'WaitTime', '1') + ) AS params(PipelineName, ParameterName, ParameterValue) + INNER JOIN control.pipelines p + ON p.PipelineName = params.PipelineName +) AS source (PipelineId, ParameterName, ParameterValue) +ON ( + target.PipelineId = source.PipelineId AND + target.ParameterName = source.ParameterName +) +WHEN MATCHED THEN + UPDATE SET ParameterValue = source.ParameterValue +WHEN NOT MATCHED THEN + INSERT (PipelineId, ParameterName, ParameterValue) + VALUES (source.PipelineId, source.ParameterName, source.ParameterValue); + +-- Metadata as Code for Control Demo Wait Pipeline Dependencies +MERGE INTO control.PipelineDependencies AS target +USING ( + SELECT + p.PipelineId, + d.PipelineId AS DependantPipelineId + FROM ( + VALUES + ('Wait 1', 'Wait 4'), + ('Wait 2', 'Wait 5'), + ('Wait 3', 'Wait 6'), + ('Intentional Error', 'Wait 7'), + ('Wait 4', 'Wait 8'), + ('Wait 5', 'Wait 8'), + ('Wait 6', 
'Wait 9'), + ('Wait 7', 'Wait 10') + ) AS deps(PipelineName, DependantPipelineName) + INNER JOIN control.pipelines p + ON p.PipelineName = deps.PipelineName + INNER JOIN control.pipelines d + ON d.PipelineName = deps.DependantPipelineName +) AS source (PipelineId, DependantPipelineId) +ON ( + target.PipelineId = source.PipelineId AND + target.DependantPipelineId = source.DependantPipelineId +) +WHEN NOT MATCHED THEN + INSERT (PipelineId, DependantPipelineId) + VALUES (source.PipelineId, source.DependantPipelineId); \ No newline at end of file diff --git a/src/metadata.data/Scripts/IngestDatasetsMetadataAsCode.sql b/src/metadata.data/Scripts/IngestDatasetsMetadataAsCode.sql index e0fd63bb..fe0c1c1b 100644 --- a/src/metadata.data/Scripts/IngestDatasetsMetadataAsCode.sql +++ b/src/metadata.data/Scripts/IngestDatasetsMetadataAsCode.sql @@ -1,9 +1,9 @@ -- Metadata As Code - Ingest - add sample Datasets --Datasets: -EXEC [ingest].[AddDatasets] 'AdventureWorksDemo', 'Ingest_LS_SQLDB_MIAuth', 'CumulusDemoDataSource01', 'CF.Cumulus.Ingest.Compute', 'SalesOrderHeader', 'SalesLT', 'SalesOrderHeader', 'parquet', 1, '2025-01-01 00:00:00.0000000', NULL, 'I', 0, 0, 'WHERE ModifiedDate > GETDATE() - 7', 'SalesLT', 'SalesOrderHeader', 1; -EXEC [ingest].[AddDatasets] 'AdventureWorksDemo', 'Ingest_LS_SQLDB_MIAuth', 'CumulusDemoDataSource01', 'CF.Cumulus.Ingest.Compute', 'SalesOrderDetail', 'SalesLT', 'SalesOrderDetail', 'parquet', 1, '2025-01-01 00:00:00.0000000', NULL, 'I', 0, 0, 'WHERE ModifiedDate > GETDATE() - 7', 'SalesLT', 'SalesOrderDetail', 1; -EXEC [ingest].[AddDatasets] 'AdventureWorksDemo', 'Ingest_LS_SQLDB_MIAuth', 'CumulusDemoDataSource01', 'CF.Cumulus.Ingest.Compute', 'Product', 'SalesLT', 'Product', 'parquet', 1, '2025-01-01 00:00:00.0000000', NULL, 'I', 0, 0, 'WHERE ModifiedDate > GETDATE() - 7', 'SalesLT', 'Product', 1; +EXEC [ingest].[AddDatasets] 'AdventureWorksDemo', 'Ingest_LS_SQLDB_MIAuth', '$(DemoResourceName)', 'CF.Cumulus.Ingest.Compute', 'SalesOrderHeader', 'SalesLT', 'SalesOrderHeader', 'parquet', 1, '2025-01-01 00:00:00.0000000', NULL, 'I', 0, 0, 'WHERE ModifiedDate > GETDATE() - 7', 'SalesLT', 'SalesOrderHeader', 1; +EXEC [ingest].[AddDatasets] 'AdventureWorksDemo', 'Ingest_LS_SQLDB_MIAuth', '$(DemoResourceName)', 'CF.Cumulus.Ingest.Compute', 'SalesOrderDetail', 'SalesLT', 'SalesOrderDetail', 'parquet', 1, '2025-01-01 00:00:00.0000000', NULL, 'I', 0, 0, 'WHERE ModifiedDate > GETDATE() - 7', 'SalesLT', 'SalesOrderDetail', 1; +EXEC [ingest].[AddDatasets] 'AdventureWorksDemo', 'Ingest_LS_SQLDB_MIAuth', '$(DemoResourceName)', 'CF.Cumulus.Ingest.Compute', 'Product', 'SalesLT', 'Product', 'parquet', 1, '2025-01-01 00:00:00.0000000', NULL, 'I', 0, 0, 'WHERE ModifiedDate > GETDATE() - 7', 'SalesLT', 'Product', 1; --Attributes for SalesOrderHeader EXEC [ingest].[AddAttributes] 'AdventureWorksDemo', 'SalesOrderHeader', 1, 'SalesOrderID', 'int', 'INTEGER', '', '', 1, 0, 1 diff --git a/src/metadata.data/Scripts/Script.PostDeployment.sql b/src/metadata.data/Scripts/Script.PostDeployment.sql index 31d59031..1f56ec3d 100644 --- a/src/metadata.data/Scripts/Script.PostDeployment.sql +++ b/src/metadata.data/Scripts/Script.PostDeployment.sql @@ -13,3 +13,4 @@ Post-Deployment Script Template :r .\IngestDatasetsMetadataAsCode.sql :r .\TransformDatasetsMetadataAsCode.sql :r .\ControlPipelineMetadataAsCode.sql +:r .\ControlWaitPipelineMetadataAsCode.sql diff --git a/src/metadata.data/metadata.data.sqlproj b/src/metadata.data/metadata.data.sqlproj index 10ee9055..4280dd1b 100644 --- 
a/src/metadata.data/metadata.data.sqlproj +++ b/src/metadata.data/metadata.data.sqlproj @@ -61,7 +61,9 @@ + + diff --git a/src/metadata.ingest/ingest/Stored Procedures/GetDatasetPayload.sql b/src/metadata.ingest/ingest/Stored Procedures/GetDatasetPayload.sql index 9db8a35c..a52c730f 100644 --- a/src/metadata.ingest/ingest/Stored Procedures/GetDatasetPayload.sql +++ b/src/metadata.ingest/ingest/Stored Procedures/GetDatasetPayload.sql @@ -357,6 +357,12 @@ BEGIN RETURN 0; END + ELSE IF (@LoadType = 'I') AND (@ConnectionType = 'Amazon S3') + BEGIN + RAISERROR('The Amazon S3 Connection type does not support incremental loading. Please change the load type in ingest.Datasets.',16,1) + RETURN 0; + END + ELSE IF (@LoadType = 'I') AND (@SourceLanguageType <> 'XML') AND (@ConnectionType <> 'REST API') BEGIN SELECT diff --git a/src/metadata.transform/transform/Stored Procedures/GetUnmanagedNotebookPayload.sql b/src/metadata.transform/transform/Stored Procedures/GetUnmanagedNotebookPayload.sql index bd2b12e3..ac586a8d 100644 --- a/src/metadata.transform/transform/Stored Procedures/GetUnmanagedNotebookPayload.sql +++ b/src/metadata.transform/transform/Stored Procedures/GetUnmanagedNotebookPayload.sql @@ -1,6 +1,6 @@ CREATE PROCEDURE [transform].[GetUnmanagedNotebookPayload] ( - @DatasetId INT + @NotebookId INT ) AS BEGIN @@ -11,9 +11,9 @@ BEGIN SELECT @ResultRowCount = COUNT(*) FROM - [transform].[Datasets] AS ds - INNER JOIN [transform].[Notebooks] AS n + INNER JOIN + [transform].[Datasets] AS ds ON n.NotebookId = ds.BusinessLogicNotebookFK INNER JOIN @@ -44,7 +44,7 @@ BEGIN [common].Connections AS cn5 ON cn5.ConnectionDisplayName = 'PrimaryKeyVault' WHERE - ds.DatasetId = @DatasetId + n.NotebookId = @NotebookId AND nt.NotebookTypeName = 'Unmanaged' AND @@ -58,13 +58,13 @@ BEGIN IF @ResultRowCount = 0 BEGIN - RAISERROR('No results returned for the provided Dataset Id. Confirm Dataset is enabled, and related Connections and Notebooks Parameters are enabled.',16,1) + RAISERROR('No results returned for the provided Notebook Id. Confirm Notebook is enabled, and related Connections and Parameters are enabled.',16,1) RETURN 0; END IF @ResultRowCount > 1 BEGIN - RAISERROR('Multiple results returned for the provided Dataset Id. Confirm that only a single active dataset is being referenced.',16,1) + RAISERROR('Multiple results returned for the provided Notebook Id. Confirm that only a single active Notebook is being referenced.',16,1) RETURN 0; END @@ -94,9 +94,9 @@ BEGIN ds.SchemaName, n.NotebookPath AS 'NotebookFullPath' FROM - [transform].[Datasets] AS ds - INNER JOIN [transform].[Notebooks] AS n + INNER JOIN + [transform].[Datasets] AS ds ON n.NotebookId = ds.BusinessLogicNotebookFK INNER JOIN @@ -128,7 +128,7 @@ BEGIN ON cn5.ConnectionDisplayName = 'PrimaryKeyVault' WHERE - ds.DatasetId = @DatasetId + n.NotebookId = @NotebookId AND nt.NotebookTypeName = 'Unmanaged' AND @@ -140,4 +140,4 @@ BEGIN AND ccn.Enabled = 1 -END +END \ No newline at end of file
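-- [Editor's note] The reworked GetUnmanagedNotebookPayload above is now keyed on the
-- Notebook rather than the Dataset. A hedged usage sketch (the id value is hypothetical):
--
--     EXEC [transform].[GetUnmanagedNotebookPayload] @NotebookId = 1;
--
-- Callers that previously passed a Dataset Id should resolve the notebook first, e.g. via
-- [transform].[Datasets].[BusinessLogicNotebookFK].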