首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Kafka火花流增加消息大小

Kafka火花流增加消息大小
EN

Stack Overflow用户
提问于 2017-07-27 08:20:20
回答 1查看 2.9K关注 0票数 0

我有一个场景,我正在运行一个星火流作业。这是在接收卡夫卡的数据。我所要做的就是从流中提取记录,并将它们放在本地。我还为它实现了偏移处理。消息的大小可以高达5MB。当我尝试使用0.4MB - 0.6MB文件时,作业运行得很好,但是当我尝试使用1.3MB文件运行时(这比默认的1MB更大),我将面临以下问题。

代码语言:javascript
运行
复制
java.lang.AssertionError: assertion failed: Ran out of messages before reaching ending offset 9 for topic lms_uuid_test partition 0 start 5. This should not happen, and indicates that messages may have been lost
    at scala.Predef$.assert(Predef.scala:179)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:211)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$class.isEmpty(Iterator.scala:256)
    at org.apache.spark.util.NextIterator.isEmpty(NextIterator.scala:21)
    at com.scb.BulkUpload.PortalConsumeOffset$$anonfun$createStreamingContext$1$$anonfun$apply$1.apply(PortalConsumeOffset.scala:94)
    at com.scb.BulkUpload.PortalConsumeOffset$$anonfun$createStreamingContext$1$$anonfun$apply$1.apply(PortalConsumeOffset.scala:93)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1881)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1881)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

我尝试添加以下的卡夫卡消费者属性,希望更大的消息将被处理,但没有运气。

代码语言:javascript
运行
复制
"send.buffer.bytes"->"5000000", "max.partition.fetch.bytes" -> "5000000", "consumer.fetchsizebytes" -> "5000000"

我希望有人能帮我。提前谢谢。

EN

Stack Overflow用户

回答已采纳

发布于 2017-07-27 09:51:24

fetch.message.max.bytes --这将决定消费者可以获取的消息的最大大小。

属性名:fetch.message.max.bytes

每个获取请求中每个主题分区尝试获取的消息字节数。fetch请求大小必须至少与服务器允许的最大消息大小一样大,否则生产者可以发送比使用者所能获取的更大的消息。

例: Kafka生产者发送5 MB --> Kafka Broker sends /Stores 5MB --> Kafka Consumer接收5 MB

如果是,请将值设置为fetch.message.max.bytes=5242880,然后尝试使用它。

票数 1
EN
查看全部 1 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45345117

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档