我试图在两个表上运行左外部联接,并将结果转换为DataStream。
我在使用flink之前所做的所有连接都是内部联接,我一直使用.toRetractStream[MyCaseClass](someQueryConfig)来跟踪连接。但是,由于左联接引入了空值,我从的理解是,我不能再使用case类,因为它们在将表转换为DataStream时不支持空值。
所以,我试着用POJO来完成这个任务。这是我的代码:
class EnrichedTaskUpdateJoin(val enrichedTaskId: String, val enrichedTaskJobId: String, val enr
我有两个来自卡夫卡的事件源,一个来自user主题,另一个来自order主题,这两个主题的事件在我的代码中被创建为Flink动态表。
我有以下简单的连接查询,希望计算每个用户所放置的订单数。
select user_id,count(order_id) from user join order on user.user_id = order.user_id,
我有以下问题:
假设应用程序开始运行时,用户和订单中没有数据。
当Flink连接器从Kafka主题获得新数据时,会立即运行查询任务吗?查询中将包括多少个事件?我一次从卡夫卡控制台写了1000个用户和订单。。
如果查询(连接查询或复杂查询)需
我有两个来自两个Kafka主题的流表,我想加入这些流并对所连接的数据执行聚合功能。需要使用滑动窗口连接流。在连接和窗口数据时,我将得到一个错误Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.。
下面是代码片段
select cep.payload['id'] , ep.payload['id'] ,
使用Flink 1.13.1和以Hive表为源和接收器的pyFlink和用户定义的表聚合函数(UDTAGG),我遇到了一个错误:
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException:
Table sink 'myhive.mydb.flink_tmp_model' doesn't support consuming update changes
which is produced by node PythonGroupAggregate
这是接收器的表。
我正在读
,
它有以下例子:
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time
我有两个问题:
如果o.order_time和s.ship_time是正常的时间列,而不是事件时间属性,那么所有的状态都会保存在Flink中,就像普通的规则内部连接那样?这样,也许大尺寸的状态就会保持在Flink中。
如果o.order_time和s.ship_time是事件时间属性,那
我有一个特定的任务,使用Apache Flink和一些额外的逻辑将两个数据流连接到一个聚合中。
基本上,我有两个数据流:事件流和所谓的meta-events流。我使用Apache Kafka作为消息主干。我试图实现的是根据aggregation/window中给出的信息触发meta-event到评估。基本情况是:
事件数据流( Data of events )开始根据某些键在某个聚合或窗口中不断积累Type A;The记录的记录;(元事件数据流)使用给定的键接收新的meta-event,这也定义了将在数据流事件流中发出的事件总数。,构成步骤3的事件数成为聚合的触发条件。当具有给定键的Type
我正在研究Flink中join的各种实现。在批处理模式下,我遇到了hybrid-hash join和sort-merge join。在这两种情况下,都有一个阻塞洗牌,它是在连接之前完成的,因此连接之前的操作符的输出被实现到一些非临时存储中,如所说的。
我现在正在查看流连接的情况。我看到了一个实现,其中为两个输入创建了两个哈希表。每当输入出现时,都会将其保存在哈希表中,并对其他哈希表进行探测以产生结果。为了限制哈希表的大小,我们在哈希表中放置了一个输入保存的窗口。我的第一个问题是:
Do all stream join cases have this requirement of a windo
我正在开发一个应用程序,它将一些文件上传到s3桶,稍后,从s3桶中读取文件,并将其推送到数据库。
我使用Flink 1.4.2和fs.s3a API从s3桶读取和写入文件。
上传文件到s3桶没有任何问题,但是当我的应用程序的第二阶段--从s3读取这些上传的文件--启动时,我的应用程序会抛出错误
Caused by: java.io.InterruptedIOException: Reopen at position 0 on s3a://myfilepath/a/b/d/4: org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.SdkClient