假设我有两种不同类型的数据流,一种提供天气数据,另一种提供车辆数据,我想使用Flink对数据进行复杂的事件处理。
Flink 1.3.x中的哪种方法是正确的方法?我看到了不同的方法,如Union、Connect、Window Join。基本上,我只想尝试这样一个简单的CEP:
IF weather is wet AND vehicle speed > 60
WITHIN the last 10 seconds
THEN raise alert
谢谢!
我正在尝试将Oracle数据库表中的记录合并到本地SQL表中。
我有一个包的变量,它是一个对象,名为OWell。
我有一个数据流任务,它以SQL语句的形式获取OWell数据(select order by order by well_id,select well_id,order from order by Well_ID),然后还有一个转换任务,将well_id从长度为15的DT_STR转换为DT_WSTR;将well_name从长度为15的DT_STR转换为长度为50的DT_WSTR。然后将其存储在记录集OWell中。
进行转换的原因是我要向其中添加记录的表有一个标识字段: SSIS将we
在我看来,Processing Time Temporal Join用于流和外部数据库,并且总是基于联接条件在外部数据库中的最新值join。此外,当Processing Time Temporal Join时使用the external table is not feasible to materialize the table as a dynamic table within Flink.
类似地,Lookup Join用于流和外部数据库,并且始终基于联接条件在外部数据库中look up值。
Lookup Join会在Flink中实现外部数据库表吗?他们之间有什么区别?
我试图将数据插入数据库,但首先使用查找检查每一行是否存在,类似于这里建议的方法:
SELECT DISTINCT VALUES // OleDb Source
|
LOOKUP // If exists
| // No Match Output
OLE DB DESTINATION // Insert new records
我正在使用RetainSameConnection=True来启用工作流中的事务。使用大约10,000行的默认缓冲区,当行被传递到OLE DB目标时
UDF的整个scala项目都在这里: Flink_SQL_Client_UDF/Scala_fixed/ 我注册udf的操作如下所示: ①mvn scala:compile package
②cp table_api-1.0-SNAPSHOT.jar $FLINK_HOME/lib
③add the following sentence into $FLINK_HOME/conf/flink-conf.yaml
flink.execution.jars: $FLINK_HOME/lib/table_api-1.0-SNAPSHOT.jar
④create temporary function