这是一个虚构的玩具例子,试图在我的问题的更困难的部分得到帮助。假设我有来自卡夫卡流的销售数据:
...
Period: 5, SalesPersonId: 78, Sale: TRUE, Timestamp: ...,
Period: 5, SalesPersonId: 43, Sale: FALSE, Timestamp: ...,
Period: 5, SalesPersonId: 33, Sale: TRUE, Timestamp: ...,
...
每一行代表特定销售人员的销售机会(在特定期间)。
以下是周期的工作原理:周期大约持续2-3周。但是,这段时间并不在我的控制之下,当他们到达溪流时,他们已经被分配了。在期间之间的过渡中,我可能会在一到两天内收到最后一段时间的数据(例如,日本的销售地点可能还停留在旧时期)。有人在梁聊天建议我可以使用会话窗口的情况下,如果我只是包括在我的关键期间,并使间隙持续2天,大约。看上去很管用。
我很清楚如何做这样的事情,比如:每个期间的销售机会总数、每个销售人员的平均销售比率等等。例如,调用由以下查询产生的PCollection:A:
SELECT
period,
salesPersonId,
COUNT(*) as totalSalesOpportunities,
COUNT(*) FILTER(WHERE sale) as totalSales,
ROUND(COUNT(*) FILTER (WHERE SALE)/COUNT(*),2) as salesRate
FROM stream
GROUP BY period, salesPersonId
我的要求比这更复杂。假设我们公司有一个假设,在一段时间内有更多的销售机会的销售人员会有一个更好的销售率。也许总销售机会是动机的一种表现,或者额外的机会给了更多的尝试去销售那个时期出售的任何产品。所以,公司想要这个统计数字:
这一时期(到目前为止)销售机会在90%或以上的销售人员的总销售率是多少?10%或更低百分比?I.,
(TOTAL SALES MADE BY PEOPLE WITH 90%+ SALES OPPORTUNITIES)/(TOTAL SALES OPPORTUNITIES BY PEOPLE WITH 90%+ SALES OPPORTUNITIES)
当然,在一个时期的早期,第90个百分位数可能只有3个机会。但是,随着时间的推移,这种分配会分散开来,可能会有40个机会。那么,如果这个统计数据是每小时更新的,那就没关系了。
据我所知,我需要做以下工作,称之为B
Rekey A, apply ApproximateQuantiles, feed it back to filter A, reaggregate A.
但是,我认为这不可能是渐进的。那么,我如何表达“继续递增地做A,但B作为一个批处理操作每小时”呢?
或者,是否有更好的方法来处理这种情况?
发布于 2019-04-26 22:33:40
如果我正确理解您的问题,您需要对同一数据进行两种类型的聚合。
这里要注意的一点是,不能使A依赖于B,而B依赖于A,因为这会在管道图中创建一个循环。
您可以从包含原始输入流的PC1开始。
PC2: PC1 -> Do A‘(与A相同) -> Do B
PC3: PC1 -> Do A,以PC2作为侧输入。
您可以阅读更多关于侧输入这里的信息。
https://stackoverflow.com/questions/55816542
复制相似问题