作为流分析作业的输出设置的持久实体函数不会触发或接收任何数据。我可以在流分析作业中看到输入数据,它的输出可以正确地存储数据(对于流分析,作业存储,输出测试,我也设置了)。如果我用postman测试CalculatePositioni函数,该函数和实体将接收数据并正确地更新状态。
我还得到了一个函数应用程序的绑定错误,我看不到缺少了什么:
流分析职务输出与我的持续时间函数设置:

在输出创建时,我选择了键名默认值。我注意到,在函数应用程序中,我没有看到需要输入散列键的任何地方或local.settings.json。

我的功能应用程序。
namespace BladeFunctionApps
{
public static class TransformAlgorithm
{
public static async Task Process_Algorithm([EntityTrigger] IDurableEntityContext context, ILogger log)
{
//Set or Get intermediate calculation state values
Algorithm_SaveStateVariables intermediate_calc_results;
if (context.HasState)
{
intermediate_calc_results = context.GetState<Algorithm_SaveStateVariables>();
log.LogInformation("Success getting state values: CumItems:" + intermediate_calc_results.AverageMagnetometerX +
" Avg:"+ intermediate_calc_results.AverageMagnetometerX);
}
else
{
log.LogError("Failed to get state values.");
intermediate_calc_results = new Algorithm_SaveStateVariables();
}
//Get values from incoming Json object and perform calculations
JObject deviceBatchMessage = (JObject)JsonConvert.DeserializeObject(context.GetInput<string>());
Random rnd = new Random();
int incomingDataLines = rnd.Next(1, 10);
intermediate_calc_results.DataItemsNumber=incomingDataLines++;
bool parsebool=float.TryParse(deviceBatchMessage["body"]["magnetX"].ToString(), out float tempMagnX);
if(parsebool)
intermediate_calc_results.AverageMagnetometerX+= tempMagnX;
else
{
log.LogError("Could not parse to float: " + deviceBatchMessage["body"]["magnetX"].ToString());
intermediate_calc_results.AverageMagnetometerX += 0;
}
context.SetState(intermediate_calc_results);
}
[FunctionName("CalculatePosition")]
public static async Task<ActionResult> CalculateTransform(
[HttpTrigger(AuthorizationLevel.Function, "get", Route = null)] HttpRequest req,
[DurableClient] IDurableEntityClient client, ILogger log)
{
//read data
// Extract the body from the request
log.LogInformation("I'm at the top of CalculatePosition");
string jsonContent = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(jsonContent))
{
log.LogInformation("CalculatePosition: Received Data null or empty.");
return new StatusCodeResult(204); // 204, ASA connectivity check
}
else
log.LogInformation("CalculatePosition: Received Data:" + jsonContent);
//send data to algo for processing
var entityId = new EntityId(nameof(Process_Algorithm), "ProcessTransformAlgorithm");
await client.SignalEntityAsync(entityId,"FullOperation",jsonContent);
return (ActionResult)new OkObjectResult($"CalculatePosition: CalculatePosition Success");
}
}
}

缺少什么绑定?


在作业中使用查询的输入/输出设置应该可以工作,我认为是这样的,因为它与我用于输出到blob中并与blob一起工作的设置是相同的,而不是用于Azure蒸汽分析。



发布于 2022-11-18 17:20:49
您需要在Azure流分析作业上运行一个查询,以使数据流进入Azure函数。在运行查询之前,当将Azure函数绑定为输出到Azure流分析作业时,必须设置输出别名。
在这里您可以提供别名

一旦定义了属性,就可以创建一个查询将数据推送到输出别名和Azure函数中。下面是一个显示相同内容的示例查询。

查询根据流分析作业中的数据或字段而变化。有关更详细的信息,请参阅本教程用函数作为输出更新作业。下面是另一个示例,它将指导您进行绑定和配置过程。
下面是我创建的一个HTTPTrigger函数,用于测试作业和Azure函数之间的数据流。持久函数的行为与使用HTTPTrigger的行为相同。
public static class StreamAnalyticsFunction
{
[FunctionName("StreamAnalyticsFunction")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req,
ILogger log)
{
log.LogInformation("C# HTTP trigger function processed a request.");
string name = req.Query["name"];
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
dynamic data = JsonConvert.DeserializeObject(requestBody);
name = name ?? data?.name;
string responseMessage = string.IsNullOrEmpty(name)
? "This HTTP triggered function executed successfully. Pass a name in the query string or in the request body for a personalized response."
: $"Hello, {name}. This HTTP triggered function executed successfully.";
return new OkObjectResult(responseMessage);
}
}这里是我的功能集成和数据流。



https://stackoverflow.com/questions/74403626
复制相似问题