卡夫卡,Flink和Tidb的新版本。假设我有三个源MySql表-- s_a、s_b和s_c,并希望实时收集记录以针对TiDb表t_a和t_b。映射规则是
`s_a` --> `t_a`
`s_b` union `s_c` ---> `t_b` with some transformation (e.g., field remapping).
我采用的解决方案是kafka +带有Tidb接收器的Flink,其中binlog更改被订阅到Kafka主题;Flink使用该主题并将转换结果写入Tidb。对我来说,flink代码部分的问题是:
如
Flink的CEP库有可能在接收到的每个输入中发出数据吗?即使没有匹配的模式?例如,假设有这样的模式
pattern: ba
input stream: a b a a
expect output stream is: F F T F
默认行为如下:
_ _ T _
_ = times that there is not any output.
在我看来,最简单的解决方案是将输入与输出流连接( flink中没有任何左连接,我应该通过coFlatMap来准备它)并映射到输出(用值改变Nones并删除输入),但是我不知道它是否是一个好的解决方案(性能上的)。
我是Flink的新手,我正在使用Flink KeyedBroadCastProcessFunction进行模式评估,一些类似于(https://flink.apache.org/2019/06/26/broadcast-state.html)的东西,我正在使用JAVA开发我的代码,但我不知道如何处理异常,如果在处理数据流时发生任何故障,我搜索了很多次,但没有得到我最终在以下两个链接结束 Flink: what's the best way to handle exceptions inside Flink jobs Apache Flink - exception handling
假设我有两种不同类型的数据流,一种提供天气数据,另一种提供车辆数据,我想使用Flink对数据进行复杂的事件处理。
Flink 1.3.x中的哪种方法是正确的方法?我看到了不同的方法,如Union、Connect、Window Join。基本上,我只想尝试这样一个简单的CEP:
IF weather is wet AND vehicle speed > 60
WITHIN the last 10 seconds
THEN raise alert
谢谢!
flink流具有多个数据流,然后使用org.apache.flink.streaming.api.datastream.DataStream#union方法对这些数据流进行合并。然后,我得到了问题,数据流是混乱的,我不能设置窗口来对数据流中的数据进行排序。 Sorting union of streams to identify user sessions in Apache Flink 我得到了答案,但com.liam.learn.flink.example.union.UnionStreamDemo.SortFunction#onTimer从未被调用过。 环境信息: flink版本1.7
所以我用它安装了一个样例Flink项目。
我正在尝试使用这个模板,它让我可以开始编写一个Flink摄取应用程序,而不必担心依赖关系,但它适得其反。当我尝试与sbt同步时,我发现找不到Flink (不是要说谎,但这部分对我来说甚至是模糊的)。我想知道是否有人知道如何让我的项目找到Flink。使用这项技术非常令人兴奋。
Error while importing sbt project:
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=384M; support was removed in 8.0
[
我正在尝试使用我的felipeogutierrez/explore-flink:1.11.1-scala_2.12镜像可用here到kubernetes集群配置中,就像它说here一样。我用maven编译了我的项目https://github.com/felipegutierrez/explore-flink,并用这个Dockerfile扩展了默认的flink图像flink:1.11.1-scala_2.12 FROM maven:3.6-jdk-8-slim AS builder
# get explore-flink job and compile it
COPY ./java/expl
我正在尝试写一些输出到S3使用电子病历与Flink。我使用的是Scala 2.11.7、Flink 1.3.2和EMR 5.11。但是,我得到了以下错误:
java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(EmrFileSystem.java:93)
at org.apache.fl
使用flink版本1.13.1
我写了一个自定义的度量报告,但在我的flink中似乎不起作用。启动flink时,JobManager显示警告日志,如下所示:
2021-08-25 14:54:06,243 WARN org.apache.flink.runtime.metrics.ReporterSetup [] - The reporter factory (org.apache.flink.metrics.kafka.KafkaReporterFactory) could not be found for reporter kafka. Available f
我目前正在使用Flink 1.7 +gcs-连接器库。我试图让StreamingFileSink写到GCS桶中,并遇到以下异常:
我遇到了一个Jira: --但我不清楚代码是否曾经合并过。
在需要做什么方面有什么帮助是非常感谢的,以使这一工作?
java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer
at org.apache.flink.runtime.fs.hdfs.Ha
我尝试在Flink (版本1.14.0)上使用可查询状态,但不幸的是,我一直收到以下错误: 2021-11-07 11:10:55,795 ERROR org.apache.flink.queryablestate.network.AbstractServerHandler [] - Error while handling request with ID 1
java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
at java
我正在设置一个使用ZooKeeper和3个TaskManagers的JobManager HA集群。我在kubernetes中这样做,并且需要JM通过全主机名(hostname -f而不是hostname)来寻址to,因为它们是在有状态集中。 目前,似乎TMs将他们的地址报告给JM只是作为nostname… 2019-04-12 08:58:32,426 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink-m