Translated and adapted from the original DZone article.
Tencent Cloud Stream Compute Oceanus is a powerful tool for real-time big data analytics and is compatible with Apache Flink applications. New users can purchase a Stream Compute Oceanus (Flink) cluster for 1 CNY; readers are welcome to try it out.
Two of the most popular and fastest-growing stream processing frameworks are Flink (since 2015) and Kafka's Streams API (since 2016, in Kafka v0.10). Both are Apache open-source projects, and both quickly displaced Spark Streaming, the traditional leader in this space.

In this article, I will share the key differences between these two stream processing approaches through code examples. The few articles on this topic, such as [1], [2], and [3], cover the high-level differences but offer little in the way of code examples.

In this post, I will solve a simple problem and try to provide code in both frameworks and compare them. Before starting to write code, here is the summary I made when I first started learning KStream.

Here are the steps in this example:
static String TOPIC_IN = "Topic-IN";

// Build a Kafka Streams topology: read from Topic-IN, group by key,
// concatenate values within 5-second tumbling windows, and print to stdout.
final StreamsBuilder builder = new StreamsBuilder();
builder
    .stream(TOPIC_IN, Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
    .reduce((value1, value2) -> value1 + value2)
    .toStream()
    .print(Printed.toSysOut());

Topology topology = builder.build();
System.out.println(topology.describe());
final KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
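The `props` object used above is not shown in the snippet. A minimal sketch of the configuration it would carry is given below; the broker address and application id are placeholder assumptions, and plain string keys are used in place of the `StreamsConfig` constants so the sketch stays dependency-free:

```java
import java.util.Properties;

public class StreamsConfigSketch {
    // Build the minimal configuration a KafkaStreams instance needs.
    // "localhost:9092" and "kstream-demo" are hypothetical values.
    static Properties buildProps() {
        Properties props = new Properties();
        props.put("application.id", "kstream-demo");      // also the consumer group id
        props.put("bootstrap.servers", "localhost:9092"); // Kafka broker address
        props.put("default.key.serde",
                  "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.value.serde",
                  "org.apache.kafka.common.serialization.Serdes$StringSerde");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps().getProperty("application.id"));
    }
}
```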
static String TOPIC_IN = "Topic-IN";

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Consume from Topic-IN, starting from the latest offset.
FlinkKafkaConsumer<KafkaRecord> kafkaConsumer = new FlinkKafkaConsumer<>(TOPIC_IN, new MySchema(), props);
kafkaConsumer.setStartFromLatest();

DataStream<KafkaRecord> stream = env.addSource(kafkaConsumer);
stream
    // No keyBy needed: timeWindowAll() windows all records together.
    .timeWindowAll(Time.seconds(5))
    .reduce(new ReduceFunction<KafkaRecord>()
    {
        @Override
        public KafkaRecord reduce(KafkaRecord record1, KafkaRecord record2) throws Exception
        {
            // Create a fresh record per call rather than reusing one shared
            // mutable instance, which is unsafe inside a reduce function.
            KafkaRecord result = new KafkaRecord();
            result.key = record1.key;
            result.value = record1.value + record2.value;
            return result;
        }
    })
    .print();

System.out.println(env.getExecutionPlan());
env.execute();
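To make the semantics of the two snippets concrete: within each 5-second window, the reduce step simply concatenates values. The same fold can be sketched in plain Java, with no Kafka or Flink dependency; the sample values are made up:

```java
import java.util.List;

public class WindowReduceSketch {
    // Mimic the reduce((v1, v2) -> v1 + v2) step over one window's records.
    static String reduceWindow(List<String> values) {
        String acc = null;
        for (String v : values) {
            acc = (acc == null) ? v : acc + v;  // same logic as the reduce lambda
        }
        return acc;
    }

    public static void main(String[] args) {
        // Three records that arrived inside the same 5-second window.
        System.out.println(reduceWindow(List.of("a", "b", "c")));  // prints abc
    }
}
```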
Here are my observations:

In Kafka Streams, window() cannot be used without groupByKey(); Flink, by contrast, provides timeWindowAll(), a method that can process all records in a stream without a key.

Flink requires a custom implementation of KafkaDeserializationSchema<T> to read both the key and the value. If you are not interested in the key, you can use new SimpleStringSchema() as the second argument of the FlinkKafkaConsumer<> constructor. My implementation of MySchema is available on GitHub.

In Kafka Streams, the result can only be printed to the console after calling toStream(), whereas Flink can print results directly.

Here are the steps in this example:
static String TOPIC_IN = "Topic-IN";
static String TOPIC_OUT = "Topic-OUT";

// Read from Topic-IN, concatenate values per key in 5-second windows with
// a 500 ms grace period for late records, and write results to Topic-OUT.
final StreamsBuilder builder = new StreamsBuilder();
builder
    .stream(TOPIC_IN, Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(5)).grace(Duration.ofMillis(500)))
    .reduce((value1, value2) -> value1 + value2)
    .toStream()
    .to(TOPIC_OUT);

Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
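Both frameworks assign each record to an aligned 5-second tumbling window based on its timestamp. The assignment rule, which is the same for Kafka Streams tumbling windows and Flink tumbling windows with no offset, can be sketched in plain Java:

```java
public class TumblingWindowSketch {
    static final long WINDOW_SIZE_MS = 5_000;

    // Compute the start of the aligned tumbling window a timestamp falls
    // into: windows cover [0, 5000), [5000, 10000), and so on.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    public static void main(String[] args) {
        System.out.println(windowStart(12_345));  // prints 10000
        System.out.println(windowStart(15_000));  // prints 15000
    }
}
```

Records whose timestamps map to the same window start are the ones handed to the reduce function together.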
static String TOPIC_IN = "Topic-IN";
static String TOPIC_OUT = "Topic-OUT";

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

FlinkKafkaConsumer<KafkaRecord> kafkaConsumer = new FlinkKafkaConsumer<>(TOPIC_IN, new MySchema(), props);
// Use the record's own timestamp as the event time; timestamps are
// assumed to be ascending, so no out-of-order watermarking is needed.
kafkaConsumer.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<KafkaRecord>()
{
    @Override
    public long extractAscendingTimestamp(KafkaRecord record)
    {
        return record.timestamp;
    }
});

// Define a Kafka producer using the Flink API.
KafkaSerializationSchema<String> serializationSchema = (value, timestamp) -> new ProducerRecord<byte[], byte[]>(TOPIC_OUT, value.getBytes());
FlinkKafkaProducer<String> kafkaProducer =
    new FlinkKafkaProducer<String>(TOPIC_OUT,
        serializationSchema,
        prodProps,
        Semantic.EXACTLY_ONCE);

DataStream<KafkaRecord> stream = env.addSource(kafkaConsumer);
stream
    .keyBy(record -> record.key)
    .timeWindow(Time.seconds(5))
    .allowedLateness(Time.milliseconds(500))
    // Reduce over KafkaRecord (not String) to match the stream's type.
    .reduce(new ReduceFunction<KafkaRecord>()
    {
        @Override
        public KafkaRecord reduce(KafkaRecord record1, KafkaRecord record2) throws Exception
        {
            KafkaRecord result = new KafkaRecord();
            result.key = record1.key;
            result.value = record1.value + record2.value;
            return result;
        }
    })
    // Extract the String value before handing off to the String-typed producer.
    .map(record -> record.value)
    .addSink(kafkaProducer);

env.execute();
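The MySchema deserializer and the KafkaRecord POJO it produces are not listed in the article (the text points to GitHub for the real implementation). A plausible sketch of the POJO and of the byte[]-to-String conversion that a KafkaDeserializationSchema<KafkaRecord> would perform is shown below; the Flink interface wiring is omitted, and the class and field names are inferred from how the examples use them:

```java
import java.nio.charset.StandardCharsets;

public class KafkaRecordSketch {
    // Hypothetical POJO matching the fields the examples use:
    // key, value, and timestamp.
    static class KafkaRecord {
        String key;
        String value;
        long timestamp;
    }

    // Core of what a MySchema.deserialize() would do: turn the raw
    // key/value bytes of a consumed record into a KafkaRecord.
    static KafkaRecord deserialize(byte[] key, byte[] value, long timestamp) {
        KafkaRecord record = new KafkaRecord();
        record.key = key == null ? null : new String(key, StandardCharsets.UTF_8);
        record.value = value == null ? null : new String(value, StandardCharsets.UTF_8);
        record.timestamp = timestamp;
        return record;
    }

    public static void main(String[] args) {
        KafkaRecord r = deserialize("k1".getBytes(StandardCharsets.UTF_8),
                                    "v1".getBytes(StandardCharsets.UTF_8), 42L);
        System.out.println(r.key + "=" + r.value);  // prints k1=v1
    }
}
```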
This article is a translation of a foreign-language original. In case of any infringement, please contact cloudcommunity@tencent.com for removal.