1. Preface
I originally meant to write up Flink deployment options (standalone, YARN, Kubernetes), but that document lives on my company's intranet and cannot be taken outside. So I will skip deployment for now; if you have deployment questions, feel free to comment or message me. Let's go straight to Flink-Connector-Kafka.
2. Overview
Flink provides a Kafka connector for reading from and writing to Kafka topics. The Flink Kafka consumer integrates with Flink's checkpointing mechanism to provide exactly-once semantics for Flink's processing (note: this alone is not an end-to-end guarantee). To achieve this, Flink does not rely on Kafka's consumer-group offset tracking; instead it tracks and checkpoints the offsets internally.
Flink's Kafka consumer is called FlinkKafkaConsumer08 (or FlinkKafkaConsumer09 for Kafka 0.9.0.x versions, etc., or just FlinkKafkaConsumer for Kafka >= 1.0.0). It can consume one or more topics. The constructor takes at least 3-4 arguments; see the code below (note: zookeeper.connect is only required for Kafka 0.8).
Note: of course, any configuration option supported by the Kafka consumer can also be set here.
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");

DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));
3. Deserialization
The consumer reads binary byte messages from Kafka and needs a deserialization schema to turn them into Flink Java/Scala objects. This is the deserialization interface:
package org.apache.flink.api.common.serialization;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

import java.io.IOException;
import java.io.Serializable;

/**
 * The deserialization schema describes how to turn the byte messages delivered by certain
 * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
 * processed by Flink.
 *
 * <p>In addition, the DeserializationSchema describes the produced type ({@link #getProducedType()}),
 * which lets Flink create internal serializers and structures to handle the type.
 *
 * <p>Note: In most cases, one should start from {@link AbstractDeserializationSchema}, which
 * takes care of producing the return type information automatically.
 *
 * <p>A DeserializationSchema must be {@link Serializable} because its instances are often part of
 * an operator or transformation function.
 *
 * @param <T> The type created by the deserialization schema.
 */
@Public
public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    /**
     * Deserializes the byte message.
     *
     * @param message The message, as a byte array.
     *
     * @return The deserialized message as an object (null if the message cannot be deserialized).
     */
    T deserialize(byte[] message) throws IOException;

    /**
     * Method to decide whether the element signals the end of the stream. If
     * true is returned the element won't be emitted.
     *
     * @param nextElement The element to test for the end-of-stream signal.
     * @return True, if the element signals end of stream, false otherwise.
     */
    boolean isEndOfStream(T nextElement);
}
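As the Javadoc note above suggests, most custom schemas can simply extend AbstractDeserializationSchema, which derives the produced type information automatically so that only deserialize() has to be implemented. A minimal sketch, assuming a made-up "word,count" record format; the WordCountEvent POJO and the parsing logic are purely illustrative:

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Minimal sketch: only deserialize() needs to be implemented; isEndOfStream() and the
// produced type information are handled by AbstractDeserializationSchema.
public class WordCountEventSchema extends AbstractDeserializationSchema<WordCountEventSchema.WordCountEvent> {

    // Hypothetical POJO, used purely for illustration.
    public static class WordCountEvent {
        public String word;
        public long count;
    }

    @Override
    public WordCountEvent deserialize(byte[] message) throws IOException {
        // Assumes records are plain "word,count" strings; the format is illustrative only.
        String[] parts = new String(message, StandardCharsets.UTF_8).split(",");
        WordCountEvent event = new WordCountEvent();
        event.word = parts[0];
        event.count = Long.parseLong(parts[1]);
        return event;
    }
}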
4. A side note on Flink serialization and deserialization
flink-core ships with a very simple string serialization/deserialization schema (UTF-8 by default):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.serialization;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * Very simple serialization schema for strings.
 *
 * <p>By default, the serializer uses "UTF-8" for string/byte conversion.
 */
@PublicEvolving
public class SimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String> {

    private static final long serialVersionUID = 1L;

    /** The charset to use to convert between strings and bytes.
     * The field is transient because we serialize a different delegate object instead. */
    private transient Charset charset;

    /**
     * Creates a new SimpleStringSchema that uses "UTF-8" as the encoding.
     */
    public SimpleStringSchema() {
        this(StandardCharsets.UTF_8);
    }

    /**
     * Creates a new SimpleStringSchema that uses the given charset to convert between strings and bytes.
     *
     * @param charset The charset to use to convert between strings and bytes.
     */
    public SimpleStringSchema(Charset charset) {
        this.charset = checkNotNull(charset);
    }

    /**
     * Gets the charset used by this schema for serialization.
     * @return The charset used by this schema for serialization.
     */
    public Charset getCharset() {
        return charset;
    }

    // ------------------------------------------------------------------------
    //  Kafka Serialization
    // ------------------------------------------------------------------------

    @Override
    public String deserialize(byte[] message) {
        return new String(message, charset);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public byte[] serialize(String element) {
        return element.getBytes(charset);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }

    // ------------------------------------------------------------------------
    //  Java Serialization
    // ------------------------------------------------------------------------

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(charset.name());
    }

    private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        String charsetName = in.readUTF();
        this.charset = Charset.forName(charsetName);
    }
}
The official demos also include a KafkaEventSchema class, a custom serialization/deserialization schema for the KafkaEvent POJO:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.examples.kafka;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;

/**
 * The serialization schema for the {@link KafkaEvent} type. This class defines how to transform a
 * Kafka record's bytes to a {@link KafkaEvent}, and vice-versa.
 */
public class KafkaEventSchema implements DeserializationSchema<KafkaEvent>, SerializationSchema<KafkaEvent> {

    private static final long serialVersionUID = 6154188370181669758L;

    @Override
    public byte[] serialize(KafkaEvent event) {
        return event.toString().getBytes();
    }

    @Override
    public KafkaEvent deserialize(byte[] message) throws IOException {
        return KafkaEvent.fromString(new String(message));
    }

    @Override
    public boolean isEndOfStream(KafkaEvent nextElement) {
        return false;
    }

    @Override
    public TypeInformation<KafkaEvent> getProducedType() {
        return TypeInformation.of(KafkaEvent.class);
    }
}
In short: if the source data has no fixed format, SimpleStringSchema is all you need; if it does, implement a POJO for that format together with a matching schema like the one above.
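For completeness, here is a sketch of wiring such a custom schema into the consumer. The topic name "kafka-events" and the property values are placeholders, and FlinkKafkaConsumer is the universal connector class for Kafka >= 1.0.0 mentioned in section 2:

// Placeholders: adjust bootstrap servers, group id and topic name to your setup.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "demo");

// The custom schema turns each Kafka record into a KafkaEvent object.
DataStream<KafkaEvent> events = env.addSource(
        new FlinkKafkaConsumer<>("kafka-events", new KafkaEventSchema(), props));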
5. Kafka Consumers Start Position Configuration
This needs little explanation; the demo speaks for itself.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
myConsumer.setStartFromEarliest();     // start from the earliest record possible
myConsumer.setStartFromLatest();       // start from the latest record
myConsumer.setStartFromTimestamp(...); // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets(); // the default behaviour

DataStream<String> stream = env.addSource(myConsumer);
6. Kafka Consumers and Fault Tolerance
With Flink checkpointing enabled, the Kafka offsets are checkpointed consistently together with the state of the other operations. If the job fails, Flink restores the most recent checkpoint and, after restart, resumes consuming Kafka from the offsets stored in that checkpoint.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
7. Kafka Consumers Topic and Partition Discovery
By default, partition discovery is disabled. To enable it, set a non-negative value (in milliseconds) for the discovery interval in the provided properties config, as sketched below.
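A minimal sketch of what that looks like; the 30-second interval is an arbitrary illustrative value, and the key string corresponds to the constant FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS:

// A non-negative interval (in ms) turns on dynamic partition discovery;
// 30000 ms is only an illustrative value.
properties.setProperty(
        FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, // "flink.partition-discovery.interval-millis"
        "30000");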
8. Kafka Consumers Offset Committing Behaviour Configuration
The Flink Kafka Consumer allows configuring how offsets are committed back to Kafka (or to ZooKeeper in 0.8). Offsets committed this way are not used for fault-tolerance guarantees; they only expose the consumer's progress for monitoring. The committing behaviour depends on checkpointing. With checkpointing disabled, offset committing relies on the Kafka client's periodic auto commit, controlled by enable.auto.commit (or auto.commit.enable for Kafka 0.8) and auto.commit.interval.ms in the properties. With checkpointing enabled, offsets are committed on completed checkpoints; setCommitOffsetsOnCheckpoints(boolean) controls this and is enabled by default, and the client's auto commit is then forcibly disabled, as the following source shows:
// make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
// this overwrites whatever setting the user configured in the properties
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
}
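For reference, a sketch of changing this behaviour on the consumer instance (reusing the myConsumer from section 5); the call is only needed when you want to turn checkpoint-based committing off, since it is on by default:

// With checkpointing enabled, offsets are committed on completed checkpoints by default;
// passing false stops Flink from committing any offsets back to Kafka/ZooKeeper.
myConsumer.setCommitOffsetsOnCheckpoints(false);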
9. How Flink subtasks are mapped to Kafka partitions (an important point)
This point matters a great deal, and the official documentation does not spell it out. First, the assignment algorithm:
/**
 * @param partition the Kafka partition
 * @param numParallelSubtasks total number of parallel subtasks
 *
 * @return index of the target subtask that the Kafka partition should be assigned to.
 */
public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
    int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

    // here, the assumption is that the id of Kafka partitions are always ascending
    // starting from 0, and therefore can be used directly as the offset clockwise from the start index
    return (startIndex + partition.getPartition()) % numParallelSubtasks;
}
The algorithm itself is simple: a start index is derived from the topic name's hash, and partitions are then assigned round-robin from that index. For example, with a topic of three partitions, a Flink Kafka consumer parallelism of 3, and a topic whose start index works out to 1:
partition 0 maps to consumer subtask 1;
partition 1 maps to consumer subtask 2;
partition 2 maps to consumer subtask 0.
The sketch after this list reproduces the same computation.
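Here is a small standalone sketch of that computation (not part of the Flink API); the topic name "test-topic" and parallelism 3 are illustrative, and the concrete mapping for your topic depends on its name's hash:

// Standalone sketch of the subtask assignment rule shown above.
public class AssignmentDemo {

    static int assign(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partition) % numParallelSubtasks;
    }

    public static void main(String[] args) {
        String topic = "test-topic"; // illustrative topic name
        int parallelism = 3;         // illustrative parallelism
        for (int p = 0; p < parallelism; p++) {
            System.out.println("partition " + p + " -> subtask " + assign(topic, p, parallelism));
        }
    }
}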
Since both partitions and subtasks are numbered from 0, in my view the algorithm could just as well be simplified to:
return partition.getPartition() % numParallelSubtasks;
The best practice is to make the Flink Kafka consumer parallelism equal to the number of Kafka partitions: that keeps consumption efficient without wasting resources.
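A minimal sketch of that practice; the partition count of 3 is illustrative and should in reality match the topic's actual partition count:

// Match the source parallelism to the topic's partition count (3 is illustrative).
DataStream<String> stream = env
        .addSource(myConsumer)
        .setParallelism(3);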
10. Summary
That covers the basic usage of the Flink-connector-kafka consumer. Questions and feedback are welcome.