我有一个场景,我正在运行一个星火流作业。这是在接收卡夫卡的数据。我所要做的就是从流中提取记录,并将它们放在本地。我还为它实现了偏移处理。消息的大小可以高达5MB。当我尝试使用0.4MB - 0.6MB文件时,作业运行得很好,但是当我尝试使用1.3MB文件运行时(这比默认的1MB更大),我将面临以下问题。
java.lang.AssertionError: assertion failed: Ran out of messages before reaching ending offset 9 for topic lms_uuid_test partition 0 start 5. This should not happen, and indicates that messages may have been lost
at scala.Predef$.assert(Predef.scala:179)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:211)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$class.isEmpty(Iterator.scala:256)
at org.apache.spark.util.NextIterator.isEmpty(NextIterator.scala:21)
at com.scb.BulkUpload.PortalConsumeOffset$$anonfun$createStreamingContext$1$$anonfun$apply$1.apply(PortalConsumeOffset.scala:94)
at com.scb.BulkUpload.PortalConsumeOffset$$anonfun$createStreamingContext$1$$anonfun$apply$1.apply(PortalConsumeOffset.scala:93)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1881)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1881)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)我尝试添加以下的卡夫卡消费者属性,希望更大的消息将被处理,但没有运气。
"send.buffer.bytes"->"5000000", "max.partition.fetch.bytes" -> "5000000", "consumer.fetchsizebytes" -> "5000000"我希望有人能帮我。提前谢谢。
发布于 2017-07-27 09:51:24
fetch.message.max.bytes --这将决定消费者可以获取的消息的最大大小。
属性名:fetch.message.max.bytes
每个获取请求中每个主题分区尝试获取的消息字节数。fetch请求大小必须至少与服务器允许的最大消息大小一样大,否则生产者可以发送比使用者所能获取的更大的消息。
例: Kafka生产者发送5 MB --> Kafka Broker sends /Stores 5MB --> Kafka Consumer接收5 MB
如果是,请将值设置为fetch.message.max.bytes=5242880,然后尝试使用它。
https://stackoverflow.com/questions/45345117
复制相似问题