如果我不能消费消息并想在5秒后重试,我需要添加睡眠语句。为此,我需要设置任何配置属性吗?
rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, Object>>>() {
@Override
public void call(Iterator<ConsumerRecord<String, Object>> record)
throws Exception {
while (record.hasNext()) {
ConsumerRecord<String, Object> consumerRecord = record.next();
boolean flag=false;
while(flag){
flag= processmessage(record.value())
if(!flag)
Thread.sleep(1000)
}
}
}
});
当前,我无法运行我的作业
发布于 2019-06-21 09:01:05
你可以在spark流媒体应用中使用睡眠。
但是等等,
Spark streaming作业运行微批处理,我们定义流间隔时间,通常以几秒为单位(可以是1s,2s,...等)。如果您在spark streaming代码中使用了睡眠,则将需要额外的时间来完成每个微批的运行。如果数据来得非常频繁,这可能会影响性能。
这完全取决于应用程序的需求,休眠是否会造成任何性能问题或延迟,或者如果数据在长时间间隔后到来,休眠可能不会产生影响。
希望这能有所帮助。
https://stackoverflow.com/questions/56661502
复制相似问题