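So, we have a factory with about 400 datasets and about 200 pipelines, and it is getting unwieldy. I'm focusing on copying from SQL sources to blob sinks. Because we copy to blob, the schema doesn't matter. I'd like one dataset per source, one dataset per blob account, and one pipeline per source/blob-account combination, with its configuration fed in dynamically from a lookup.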
We have successfully built a pipeline that uses dummy datasets as source and sink. If you give it a query, a container name and a folder name, it works:
{
"name": "pipeline1",
"properties": {
"activities": [
{
"name": "DynamicCopy",
"type": "Copy",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": "select 1 a"
},
"sink": {
"type": "BlobSink"
},
"enableStaging": false,
"dataIntegrationUnits": 0
},
"inputs": [
{
"referenceName": "AzureSql",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "AzureBlob",
"type": "DatasetReference",
"parameters": {
"container": "raw-test",
"folder": "test"
}
}
]
}
]
}
}
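When we put a Lookup in front of it and wrap it in a ForEach, it stops working, with this not very helpful error:
{"errorCode":"400","message":"Activity failed because an inner activity failed","failureType":"UserError","target":"ForEach"}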
Nothing inside the ForEach actually references the Lookup output yet. The Lookup calls the stored procedure [dbo].[adfdynamic]:
create proc dbo.adfdynamic as
select 'raw-test' container, 'test_a' folder, 'select 1 a, 2 b' query
UNION ALL
select 'raw-test' container, 'test_b' folder, 'select 3 c, 2 d' query
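For reference, a Lookup that calls this procedure needs a `typeProperties` section; the Lookup activity in the non-working pipeline further down has none. A minimal sketch, assuming the Lookup reuses the existing `AzureSql` dataset: `sqlReaderStoredProcedureName` names the procedure, and `firstRowOnly` must be `false` so that all rows are exposed as the array `output.value` (with the default `true`, only `output.firstRow` is returned, and `@activity('Lookup').output.value` has nothing to iterate over).

```json
{
  "name": "Lookup",
  "type": "Lookup",
  "typeProperties": {
    "source": {
      "type": "SqlSource",
      "sqlReaderStoredProcedureName": "[dbo].[adfdynamic]"
    },
    "dataset": {
      "referenceName": "AzureSql",
      "type": "DatasetReference"
    },
    "firstRowOnly": false
  }
}
```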
So the behavior I want is:
raw-… with content {'c,d','3,2'}
SQL dataset:
{
"name": "AzureSql",
"properties": {
"linkedServiceName": {
"referenceName": "Dest",
"type": "LinkedServiceReference"
},
"type": "AzureSqlTable",
"structure": [
{
"name": "CustomerKey",
"type": "Int32"
},
{
"name": "Name",
"type": "String"
}
],
"typeProperties": {
"tableName": "[dbo].[DimCustomer]"
}
}
}
Blob dataset:
{
"name": "AzureBlob",
"properties": {
"linkedServiceName": {
"referenceName": "AzureStorage1",
"type": "LinkedServiceReference"
},
"parameters": {
"container": {
"type": "String"
},
"folder": {
"type": "String"
}
},
"type": "AzureBlob",
"typeProperties": {
"format": {
"type": "TextFormat",
"columnDelimiter": ",",
"treatEmptyAsNull": false,
"skipLineCount": 0,
"firstRowAsHeader": false
},
"fileName": {
"value": "@{dataset().folder}/out.dsv",
"type": "Expression"
},
"folderPath": {
"value": "@dataset().container",
"type": "Expression"
}
}
},
"type": "Microsoft.DataFactory/factories/datasets"
}
And the non-working dynamic pipeline:
{
"name": "Copy",
"properties": {
"activities": [
{
"name": "ForEach",
"type": "ForEach",
"dependsOn": [
{
"activity": "Lookup",
"dependencyConditions": [
"Succeeded"
]
}
],
"typeProperties": {
"items": {
"value": "@activity('Lookup').output.value",
"type": "Expression"
},
"activities": [
{
"name": "Copy",
"type": "Copy",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": {
"value": "select 1 a, 2 b from dest",
"type": "Expression"
}
},
"sink": {
"type": "BlobSink"
},
"enableStaging": false,
"dataIntegrationUnits": 0
},
"inputs": [
{
"referenceName": "AzureSql",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "AzureBlob",
"type": "DatasetReference",
"parameters": {
"container": {
"value": "raw-test",
"type": "Expression"
},
"folder": {
"value": "folder",
"type": "Expression"
}
}
}
]
}
]
}
},
{
"name": "Lookup",
"type": "Lookup",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
}
}
]
}
}
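A sketch of the inner Copy activity wired to the Lookup rows. Inside a ForEach, `@item()` is the current element of `@activity('Lookup').output.value`, so each row's columns can replace the hardcoded query, container and folder. This assumes the Lookup returns all rows (`firstRowOnly` set to `false`) and that the procedure's third column carries the alias `query`; an unnamed column gives `@item()` no stable property name to reference. The `policy` block and `dataIntegrationUnits` are left out for brevity.

```json
{
  "name": "Copy",
  "type": "Copy",
  "typeProperties": {
    "source": {
      "type": "SqlSource",
      "sqlReaderQuery": {
        "value": "@item().query",
        "type": "Expression"
      }
    },
    "sink": {
      "type": "BlobSink"
    },
    "enableStaging": false
  },
  "inputs": [
    {
      "referenceName": "AzureSql",
      "type": "DatasetReference"
    }
  ],
  "outputs": [
    {
      "referenceName": "AzureBlob",
      "type": "DatasetReference",
      "parameters": {
        "container": {
          "value": "@item().container",
          "type": "Expression"
        },
        "folder": {
          "value": "@item().folder",
          "type": "Expression"
        }
      }
    }
  ]
}
```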
Apologies for the formatting. Is this too much code for one post?
Posted 2018-08-31 11:02:27
Posted 2018-09-27 06:57:21
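This isn't exactly an answer to your question, but one thing I did that made things simpler was to create a single dataset called GenericBlob. It has two parameters, container and path. That might help simplify what you're doing. I used to have 20 blob datasets; now I have one (assuming the blobs are in the same storage account).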
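A minimal sketch of such a dataset. The name `GenericBlob` and the parameters `container` and `path` come from the answer; reusing the question's `AzureStorage1` linked service and a comma-delimited `TextFormat` are assumptions. For an `AzureBlob` dataset, `folderPath` starts with the container name, so the two parameters are simply joined with `/`. Each Copy activity then passes its own values (for example `@item().container` and `@item().folder`) instead of needing a separate dataset per container or folder.

```json
{
  "name": "GenericBlob",
  "properties": {
    "linkedServiceName": {
      "referenceName": "AzureStorage1",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "container": { "type": "String" },
      "path": { "type": "String" }
    },
    "type": "AzureBlob",
    "typeProperties": {
      "format": {
        "type": "TextFormat",
        "columnDelimiter": ","
      },
      "folderPath": {
        "value": "@{dataset().container}/@{dataset().path}",
        "type": "Expression"
      }
    }
  },
  "type": "Microsoft.DataFactory/factories/datasets"
}
```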
https://stackoverflow.com/questions/52097236