腾讯云
开发者社区
文档
建议反馈
控制台
首页
学习
活动
专区
工具
TVP
最新优惠活动
文章/答案/技术大牛
搜索
搜索
关闭
发布
登录/注册
精选内容/技术社群/优惠产品,
尽在小程序
立即前往
文章
问答
(1512)
视频
沙龙
1
回答
如
何在
flink
中
读取
前
N
条
kafka
消息
?
java
、
apache-kafka
、
apache-flink
我正在使用
flink
构建一个管道,它的来源是
kafka
。为了测试,我只想
读取
来自
kafka
的
前
N
条
消息
,然后需要停止流。 我该怎么做呢?我使用的是FlinkKafkaConsumer08。
浏览 15
提问于2019-02-15
得票数 0
回答已采纳
1
回答
手动重置
kafka
偏移量时,
Flink
偏移量进入不一致状态
apache-kafka
、
flink-streaming
我们有一个
flink
流应用程序,从卡夫卡
读取
消息
。由于某些原因,我们不得不从
kafka
重置命令将
kafka
偏移重置为最新,因为有大量堆积。我们希望
flink
应用程序跳过所有这些
消息
,并从重置后出现的新
消息
开始。问题是因为
flink
在内部管理它的偏移量,它不知道这个重置,它现在只从后向
读取
msg (重置
前
的偏移点),现在也不能提交偏移量。因此,每次重启
flink
应用程序时,它都会再次
浏览 62
提问于2021-06-19
得票数 4
回答已采纳
1
回答
如
何在
Flink
中
跳过损坏的
消息
?
apache-flink
如
何在
Flink
中
跳过损坏的
消息
我有DAG: KafkaSrcConsumer > FlatMap > Window > SinkFunction 现在,如果我在操作符"KafkaSrcConsumer“
中
从
Kafka
获得corruptedMessage,我想抛出/跳过该
消息
,并且不想将损坏的
消息
转发给下一个操作符"FlatMap”。我们如
何在
Apache
Flink
浏览 27
提问于2019-04-05
得票数 0
回答已采纳
1
回答
流在Hadoop
中
的应用
hadoop
、
spark-streaming
、
apache-flink
、
flink-streaming
、
flink-cep
在这个例子
中
,我确实看到他们创建了一个单独的应用程序(类似于流应用程序),该应用程序生成和消费数据,并在数据上应用模式匹配。他们现在还没有在.Till(
如
卡夫卡)之间放置一个流层,单应用程序就足以满足这一目的,这使得它非常优化。现在,我知道,如果我使用
Kafka
,那么我需要两个应用程序;一个用于将数据摄取到
Kafka
主题中,另一个用于消费
Kafka
主题的数据。我有几个问题我没有得到回答: 什么时候?
浏览 2
提问于2016-06-13
得票数 0
0
回答
Apache
Flink
1.3.2与
Kafka
1.1.0的连接问题
apache-flink
、
flink-streaming
、
flink-cep
我使用的是Apache
Flink
1.3.2集群。我们正在使用
Kafka
消息
,自从将代理升级到1.1.0 (从0.10.2)以来,我们经常在日志中注意到这个错误:由于这个原因,我们有时会在处理过程
中
遇到
浏览 32
提问于2018-07-21
得票数 0
回答已采纳
1
回答
Flink
读到
Kafka
,在某些情况下,消费速度急剧下降。
java
、
apache-kafka
、
apache-flink
我们有一个
Flink
作业(
Flink
版本: 1.9),它通过键连接两个
kafka
源,对于每个键,启动一个5分钟的定时器,
消息
被缓存在
Flink
状态,当定时器结束时,将具有相同键的
消息
合并到一个胖
消息
中
(通常每个键有1~5
条
消息
)并将其发送给
kafka
。 source1 (160个分区,每分钟20~3000万
条
浏览 1
提问于2021-06-04
得票数 0
回答已采纳
1
回答
使用Apache流处理缓冲转换
消息
(例如,1000计数)
apache-flink
、
stream-processing
在订阅了来自源(ex:
Kafka
、AWS Kinesis数据流)的
消息
,然后使用
Flink
运算符对流数据应用转换、聚合等之后,我希望缓冲最终
消息
(计数为ex:1000),并在单个请求中将每个批发送到外部如
何在
Apache
中
实现缓冲机制(将每1000
条
记录作为一个批处理创建)?
Flink
pipileine:流源->转换/减少使用操作符->缓冲区1000
条
消息
-> post到REST 感谢
浏览 4
提问于2019-11-07
得票数 1
回答已采纳
2
回答
从
Kafka
读到
Flink
Scala Shell
scala
、
apache-kafka
、
apache-flink
、
scala-shell
我正在尝试连接到本地机器上的
Kafka
(2.1),并在随
Flink
(1.7.2)附带的shell
中
阅读。我正在做的事情是::require
flink
-connector-
kafka
-base_2.11-1.7.1.jar import org.apache.
flink
.streaming.connectors.
kafka
.FlinkK
浏览 0
提问于2019-03-11
得票数 1
回答已采纳
1
回答
无法使用
Flink
和Gelly实现高CPU利用率
benchmarking
、
cpu-usage
、
apache-flink
、
gelly
我已经用
Flink
streaming做了一段时间的实验,使用了像Yahoo streaming benchmark:这样的基准测试,它应该给系统带来压力,但我从来没有达到令人满意的CPU利用率--事实上最近,我开始使用Gelly,
Flink
的Graph API,使用一些提供的示例算法(例如Pagerank),批处理数据集从数万到数亿个顶点。
浏览 15
提问于2018-01-18
得票数 4
1
回答
从
kafka
到redis的
flink
管道
java
、
redis
、
apache-kafka
、
apache-flink
我使用
flink
从
kafka
中
读取
并写入到redis
中
。 为了测试,我只想阅读来自
kafka
的
前
10
条
消息
。public boolean isEndOfStream(String nextElement) {
浏览 21
提问于2019-02-13
得票数 2
回答已采纳
3
回答
Scala和apache
flink
新手,为什么我的map函数可以正确运行REPL,但
Flink
失败
scala
、
kafka-consumer-api
、
apache-flink
、
flink-streaming
我正在尝试(未成功)在Apache
Flink
中
运行一个简单的hello world类型的程序。代码从Apache
Kafka
获取一
条
消息
,并添加一个“。并将新字符串打印到stdout。代码正确地从
Kafka
获取
消息
,但是map函数添加了".“失败。我在REPL提示符下尝试了这个函数,scala代码在那里可以正常工作。: String = hello scala> val output = input.flatMap(value => value
浏览 1
提问于2016-11-22
得票数 0
2
回答
有趣的
Flink
问题--如果任务管理器失败了,那么如何恢复
Flink
中
的状态以保证只处理一次?
java
、
apache-kafka
、
apache-flink
、
flink-streaming
嗨,我是
Flink
的新手,并试图用下面的场景找出一些最佳实践:我们的目标是过滤掉任何存在的重复记录,这样我们在输出
Kafka
主题中就没有重复的
消息
了。问题是当我在处理文件的过程
中
取消任务管理器时,为了模拟失败,一旦文件重新启动,
浏览 3
提问于2022-09-19
得票数 0
6
回答
Kafka
控制台消费者:如何只获取一个主题的最后
N
条
消息
,而不是从头开始获取所有
消息
?
apache-kafka
/bin/
kafka
-avro-console-consumer --zookeeper 10.0.0.225:2181/
kafka
--topic myTopic --property schema.registry.url我宁愿只得到最后的
N
个。我如何使用
kafka
控制台消费者做到这一点?
浏览 2
提问于2016-08-17
得票数 29
3
回答
关于
Flink
反序列化的两个问题
java
、
json
、
deserialization
、
apache-flink
、
json-ld
我是
Flink
和集群计算的新手。我花了一整天的时间试图正确地解析
Flink
,一
条
来自
Kafka
的愚蠢的流,没有结果:这有点令人沮丧.在
kafka
中
,我用一个字符串键标识了一个JSON-LD
消息
流。我只想在
Flink
中
检索它们,然后用不同的键分离
消息
。我试过所有的去序列化器,但都没有用。(DataInputDeseriali
浏览 8
提问于2017-06-12
得票数 1
1
回答
Spark Structured Streaming:以批量查询的方式
读取
kafka
主题中的
前
N
条
消息
apache-kafka
、
spark-structured-streaming
我有一个要求,我想要采样一个
kafka
主题(用于检查其数据质量等),然后触发到它上的流作业。采样的参数之一可以是
消息
的数量。我引用了"http://spark.apache.org/docs/latest/structured-streaming-
kafka
-integration.html#creating-a-
kafka
-source-for-batch-queries不可能从其中
读取
前
N
条</em
浏览 32
提问于2020-07-27
得票数 1
1
回答
使用Apache
Flink
进行数据流
apache-flink
、
flink-streaming
我正在构建一个要求低于要求的应用程序,我刚刚开始使用
flink
。摄入卡夫卡的数据,比如50个分区(输入速率- 100,000毫希/秒) 我可以使用Redi
浏览 2
提问于2016-06-10
得票数 3
1
回答
无法在
Flink
新
Kafka
消费者api的检查点上向
Kafka
提交消费抵消(1.14)
apache-flink
、
flink-streaming
在应用程序非常新的开始时,必须从检查点上卡夫卡主题的最新偏移量
中
读取
,它必须在重启后(当应用程序手动/系统错误终止时)将所消耗的偏移量提交给
Kafka
,它必须从上次提交的偏移量中选择,并且必须使用消费者延迟由于上次使用的
消息
未提交,因此将
读取
两次(重复)。遵循的步骤 设置了
Kafk
浏览 10
提问于2021-12-16
得票数 2
1
回答
不触发
flink
cep sql事件
apache-flink
、
flink-sql
、
flink-cep
我在
Flink
SQL中使用CEP模式,它正在按预期工作,连接到
Kafka
broker。但是当我连接到基于集群的云
kafka
设置时,
Flink
CEP并没有触发。然后,我以json格式发送
消息
,
如
{"agent_id":"agent_221","room_id":"room1","create_time":1635206828877,"call_type":&quo
浏览 1
提问于2021-10-25
得票数 0
回答已采纳
1
回答
可以使用
Kafka
传输文件吗?
apache-kafka
、
kafka-consumer-api
、
kafka-producer-api
我每天都有数千个文件生成,我想使用
Kafka
进行流式传输。当我尝试
读取
文件时,每一行都被视为一
条
单独的
消息
。我想知道如
何在
Kafka
主题中将每个文件的内容作为一
条
消息
,并与消费者一起将
Kafka
主题中的每个
消息
写在一个单独的文件
中
。
浏览 0
提问于2016-08-24
得票数 11
2
回答
Flink
SQL凭空创建了一些东西
apache-flink
、
flink-streaming
、
flink-sql
、
data-generation
早些时候,我问
Flink
是否可以,答案是肯定的。现在,我将更多地研究
Flink
SQL的功能。在SQL
中
,这种类型的挑战有时很容易(例如,SELECT 1在常规引擎(
如
MySQL)
中
工作),但有时也是不可能的,例如。为了简单起见:假设我希望每秒至少生成一
条
消息
,并且不介意里面是什么。 一个简单的select语句不会起到这个作用,因为当没有选择时,您不会得到任何输出。除了窗口之外,我在
Flink
SQL中看不到有时间概念的任何东
浏览 0
提问于2021-03-27
得票数 1
点击加载更多
扫码
添加站长 进交流群
领取专属
10元无门槛券
手把手带您无忧上云
相关
资讯
当Flink遇到Kafka-FlinkKafkaConsumer使用详解
不惧流量持续上涨,BIGO 借助 Flink 与 Pulsar 打造实时消息系统
Flink 支持哪些数据源和数据接收器?
日处理数据量超10亿:友信金服基于Flink构建实时用户画像系统的实践
流计算框架 Flink与Storm 的性能对比
热门
标签
更多标签
云服务器
即时通信 IM
ICP备案
对象存储
实时音视频
活动推荐
运营活动
广告
关闭
领券