Flink-Connector-Kafka (Part 1)

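1. Preface

I originally wanted to write up the Flink deployment options (standalone, YARN, k8s), but those notes live on my company's intranet and I cannot get them out. So I'll skip deployment for now; if you have deployment questions, leave a comment or message me privately. Let's go straight to Flink-Connector-Kafka.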

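2. Overview

Flink provides a Kafka connector for reading from and writing to Kafka topics. The Flink Kafka consumer integrates with Flink's checkpointing mechanism to provide exactly-once semantics for processing inside Flink (note: this is not an end-to-end guarantee). To achieve this, Flink does not rely on Kafka's consumer group offset tracking; instead it tracks the offsets itself and stores them in its checkpoints.

Flink's Kafka consumer is called FlinkKafkaConsumer08 (or 09 for Kafka 0.9.0.x versions, etc., or just FlinkKafkaConsumer for Kafka >= 1.0.0 versions). It can consume one or more topics. The constructor needs at least 3 to 4 parameters; the code below passes the topic, a deserialization schema, and a Properties object (note: zookeeper.connect is only required for Kafka 0.8).

Note: any configuration option supported by the Kafka consumer can also be set here.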

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");

DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));

3. Deserialization

The consumer reads messages from Kafka as raw byte arrays, which must be deserialized into Flink's Java/Scala objects. The deserialization interface looks like this:

package org.apache.flink.api.common.serialization;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

import java.io.IOException;
import java.io.Serializable;

/**
 * The deserialization schema describes how to turn the byte messages delivered by certain
 * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
 * processed by Flink.
 *
 * <p>In addition, the DeserializationSchema describes the produced type ({@link #getProducedType()}),
 * which lets Flink create internal serializers and structures to handle the type.
 *
 * <p><b>Note:</b> In most cases, one should start from {@link AbstractDeserializationSchema}, which
 * takes care of producing the return type information automatically.
 *
 * <p>A DeserializationSchema must be {@link Serializable} because its instances are often part of
 * an operator or transformation function.
 *
 * @param <T> The type created by the deserialization schema.
 */
@Public
public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    /**
     * Deserializes the byte message.
     *
     * @param message The message, as a byte array.
     *
     * @return The deserialized message as an object (null if the message cannot be deserialized).
     */
    T deserialize(byte[] message) throws IOException;

    /**
     * Method to decide whether the element signals the end of the stream. If
     * true is returned the element won't be emitted.
     *
     * @param nextElement The element to test for the end-of-stream signal.
     * @return True, if the element signals end of stream, false otherwise.
     */
    boolean isEndOfStream(T nextElement);
}

4. A side note on Flink serialization and deserialization

flink-core ships a very simple string serialization/deserialization class (UTF-8 by default):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.common.serialization;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * Very simple serialization schema for strings.
 *
 * <p>By default, the serializer uses "UTF-8" for string/byte conversion.
 */
@PublicEvolving
public class SimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String> {

    private static final long serialVersionUID = 1L;

    /** The charset to use to convert between strings and bytes.
     * The field is transient because we serialize a different delegate object instead */
    private transient Charset charset;

    /**
     * Creates a new SimpleStringSchema that uses "UTF-8" as the encoding.
     */
    public SimpleStringSchema() {
        this(StandardCharsets.UTF_8);
    }

    /**
     * Creates a new SimpleStringSchema that uses the given charset to convert between strings and bytes.
     *
     * @param charset The charset to use to convert between strings and bytes.
     */
    public SimpleStringSchema(Charset charset) {
        this.charset = checkNotNull(charset);
    }

    /**
     * Gets the charset used by this schema for serialization.
     * @return The charset used by this schema for serialization.
     */
    public Charset getCharset() {
        return charset;
    }

    // ------------------------------------------------------------------------
    //  Kafka Serialization
    // ------------------------------------------------------------------------

    @Override
    public String deserialize(byte[] message) {
        return new String(message, charset);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public byte[] serialize(String element) {
        return element.getBytes(charset);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }

    // ------------------------------------------------------------------------
    //  Java Serialization
    // ------------------------------------------------------------------------

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(charset.name());
    }

    private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        String charsetName = in.readUTF();
        this.charset = Charset.forName(charsetName);
    }
}
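The "Java Serialization" part of that class is easy to skim past, so here is a minimal sketch of the same pattern in plain Java with no Flink dependency. java.nio.charset.Charset does not implement Serializable, which is why the field is transient and its name is written by hand. The class name CharsetHolderSketch and the roundTrip helper are hypothetical and exist only for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for a schema object that carries a non-serializable Charset. */
class CharsetHolderSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // Charset is not Serializable, so default serialization must skip this field.
    private transient Charset charset;

    CharsetHolderSketch(Charset charset) {
        this.charset = charset;
    }

    Charset getCharset() {
        return charset;
    }

    byte[] serialize(String element) {
        return element.getBytes(charset);
    }

    String deserialize(byte[] message) {
        return new String(message, charset);
    }

    // Writes the regular fields first, then the charset name as a plain string.
    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(charset.name());
    }

    // Reads the regular fields, then rebuilds the Charset from its name.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        this.charset = Charset.forName(in.readUTF());
    }

    /** Serializes the holder to bytes and reads it back, roughly what shipping it to a worker involves. */
    static CharsetHolderSketch roundTrip(CharsetHolderSketch original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return (CharsetHolderSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        CharsetHolderSketch copy = roundTrip(new CharsetHolderSketch(StandardCharsets.UTF_16LE));
        System.out.println(copy.getCharset());
        System.out.println(copy.deserialize(copy.serialize("flink")));
    }
}
```

Without the two private hooks, the copy would come back with a null charset and fail on first use.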

The official examples also include a KafkaEventSchema class that implements serialization/deserialization for a custom KafkaEvent POJO:

package org.apache.flink.streaming.examples.kafka;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;

/**
 * The serialization schema for the {@link KafkaEvent} type. This class defines how to transform a
 * Kafka record's bytes to a {@link KafkaEvent}, and vice-versa.
 */
public class KafkaEventSchema implements DeserializationSchema<KafkaEvent>, SerializationSchema<KafkaEvent> {

    private static final long serialVersionUID = 6154188370181669758L;

    @Override
    public byte[] serialize(KafkaEvent event) {
        return event.toString().getBytes();
    }

    @Override
    public KafkaEvent deserialize(byte[] message) throws IOException {
        return KafkaEvent.fromString(new String(message));
    }

    @Override
    public boolean isEndOfStream(KafkaEvent nextElement) {
        return false;
    }

    @Override
    public TypeInformation<KafkaEvent> getProducedType() {
        return TypeInformation.of(KafkaEvent.class);
    }
}

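Summary: if the source data has no fixed format, SimpleStringSchema is enough. Otherwise, you can implement a POJO that matches the fixed format, together with a schema for it.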
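To make the POJO route more concrete, here is a minimal sketch of the string-based round trip such a schema typically performs. It is plain Java with no Flink dependency, so the two static methods only mirror the role of serialize and deserialize in a schema class. WordEventSketch, its three fields, and the comma-separated layout are assumptions made for this illustration. Unlike a bare getBytes() call, the sketch pins the charset to UTF-8 instead of relying on the platform default, and it returns null for a message it cannot parse.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical event POJO; the fields and the "word,frequency,timestamp" layout are illustrative. */
class WordEventSketch {
    final String word;
    final int frequency;
    final long timestamp;

    WordEventSketch(String word, int frequency, long timestamp) {
        this.word = word;
        this.frequency = frequency;
        this.timestamp = timestamp;
    }

    /** Encodes the event as "word,frequency,timestamp" using UTF-8. */
    static byte[] serialize(WordEventSketch event) {
        String line = event.word + "," + event.frequency + "," + event.timestamp;
        return line.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes the bytes; returns null when the message does not match the expected layout. */
    static WordEventSketch deserialize(byte[] message) {
        String[] parts = new String(message, StandardCharsets.UTF_8).split(",");
        if (parts.length != 3) {
            return null;
        }
        try {
            return new WordEventSketch(
                    parts[0], Integer.parseInt(parts[1].trim()), Long.parseLong(parts[2].trim()));
        } catch (NumberFormatException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        WordEventSketch copy = deserialize(serialize(new WordEventSketch("flink", 3, 1544659200000L)));
        System.out.println(copy.word + " " + copy.frequency + " " + copy.timestamp);
        System.out.println(deserialize("not-an-event".getBytes(StandardCharsets.UTF_8)));
    }
}
```

A word that itself contains a comma would break this layout, which is one reason real pipelines often prefer a structured format such as JSON or Avro.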

5. Kafka Consumers Start Position Configuration

Not much needs to be said here; the demo makes it clear:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
myConsumer.setStartFromEarliest();     // start from the earliest record possible
myConsumer.setStartFromLatest();       // start from the latest record
myConsumer.setStartFromTimestamp(...); // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets(); // the default behaviour

DataStream<String> stream = env.addSource(myConsumer);

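6. Kafka Consumers and Fault Tolerance

With Flink checkpointing enabled, each checkpoint stores the Kafka offsets together with the state of the other operators in a consistent manner. If the job fails, Flink restores the program from the latest checkpoint and, once restarted, resumes consuming from Kafka at the offsets stored in that checkpoint.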

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 ms
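Why storing offsets and operator state in the same checkpoint matters can be shown with a toy model. The sketch below is not Flink code: it is a single-threaded simulation in plain Java, and every name in it (CheckpointReplaySketch, Snapshot, run) is hypothetical. It sums the records of one in-memory "partition", snapshots the next offset together with the running sum, simulates one failure, and restores both values from the same snapshot.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of offset-plus-state checkpointing; not Flink code. */
class CheckpointReplaySketch {

    /** Snapshot holding the next offset to read together with the operator state. */
    static final class Snapshot {
        final int nextOffset;
        final long sum;

        Snapshot(int nextOffset, long sum) {
            this.nextOffset = nextOffset;
            this.sum = sum;
        }
    }

    /**
     * Sums all records, snapshotting after every checkpointEvery records and simulating a
     * single failure right before the record at failAtOffset is processed for the first time.
     *
     * @return {final sum, number of record reads including replayed ones}
     */
    static long[] run(List<Integer> log, int checkpointEvery, int failAtOffset) {
        Snapshot last = new Snapshot(0, 0L);
        int offset = 0;
        long sum = 0L;
        long reads = 0L;
        boolean failed = false;
        while (offset < log.size()) {
            if (!failed && offset == failAtOffset) {
                // Failure: roll offset and state back to the same snapshot.
                failed = true;
                offset = last.nextOffset;
                sum = last.sum;
                continue;
            }
            sum += log.get(offset);
            offset++;
            reads++;
            if (offset % checkpointEvery == 0) {
                last = new Snapshot(offset, sum);
            }
        }
        return new long[] {sum, reads};
    }

    public static void main(String[] args) {
        long[] result = run(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3, 5);
        System.out.println("sum=" + result[0] + ", reads=" + result[1]);
    }
}
```

In this run two records are read twice after the failure, yet each one is counted in the final sum only once, because the sum was rolled back along with the offset. That is the sense in which the guarantee covers state inside the job rather than side effects already sent to external systems.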

7. Kafka Consumers Topic and Partition Discovery

Partition discovery is disabled by default. To enable it, set the discovery interval (in milliseconds) to a non-negative value in the properties config.
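As a configuration sketch: the text above does not spell out the property key, so the key used below, flink.partition-discovery.interval-millis, is my assumption based on the Flink connector of that era and should be checked against the version in use. The class and method names are hypothetical; the resulting Properties object is the one that would be passed to the consumer constructor.

```java
import java.util.Properties;

/** Builds consumer properties with partition discovery switched on; key name is an assumption. */
class PartitionDiscoveryConfigSketch {

    // Assumed key for the discovery interval; verify it against your connector version.
    static final String DISCOVERY_INTERVAL_KEY = "flink.partition-discovery.interval-millis";

    static Properties consumerProperties(long discoveryIntervalMillis) {
        if (discoveryIntervalMillis < 0) {
            throw new IllegalArgumentException("discovery interval must be non-negative");
        }
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");
        // A non-negative interval enables periodic partition discovery.
        properties.setProperty(DISCOVERY_INTERVAL_KEY, Long.toString(discoveryIntervalMillis));
        return properties;
    }

    public static void main(String[] args) {
        System.out.println(consumerProperties(10_000L));
    }
}
```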

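8. Kafka Consumers Offset Committing Behaviour Configuration

The Flink Kafka consumer lets you configure how offsets are committed back to Kafka (or to ZooKeeper in 0.8). Offsets committed this way are not used for fault tolerance; they only expose the consumer's progress. The commit behaviour depends on whether checkpointing is enabled:

Checkpointing disabled: offset commits rely on the Kafka client's automatic periodic commit, controlled by enable.auto.commit (or auto.commit.enable for Kafka 0.8) / auto.commit.interval.ms.

Checkpointing enabled: the offsets stored in a checkpoint are committed when that checkpoint completes. This is controlled by setCommitOffsetsOnCheckpoints(boolean), which is on by default. The consumer's own auto commit is forcibly disabled, as the following source shows: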

// make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
// this overwrites whatever setting the user configured in the properties
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
}
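The rules in this section boil down to a small decision table. The sketch below re-implements that table in plain Java as I understand it; it is not the connector's own code. The enum values ON_CHECKPOINTS and DISABLED appear in the snippet above, while KAFKA_PERIODIC and the method names resolve and forcesAutoCommitOff are assumptions for illustration.

```java
/** Plain-Java model of how the offset commit mode could be derived; not the connector's code. */
class OffsetCommitModeSketch {

    enum Mode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    /**
     * @param enableAutoCommit     value of the Kafka client's auto-commit setting
     * @param commitOnCheckpoints  value passed to setCommitOffsetsOnCheckpoints (default true)
     * @param checkpointingEnabled whether Flink checkpointing is turned on for the job
     */
    static Mode resolve(boolean enableAutoCommit, boolean commitOnCheckpoints, boolean checkpointingEnabled) {
        if (checkpointingEnabled) {
            // With checkpointing on, the auto-commit setting is ignored.
            return commitOnCheckpoints ? Mode.ON_CHECKPOINTS : Mode.DISABLED;
        }
        // Without checkpointing, only the client's periodic auto commit can commit offsets.
        return enableAutoCommit ? Mode.KAFKA_PERIODIC : Mode.DISABLED;
    }

    /** True for the modes in which the client's auto commit is forced to "false". */
    static boolean forcesAutoCommitOff(Mode mode) {
        return mode == Mode.ON_CHECKPOINTS || mode == Mode.DISABLED;
    }

    public static void main(String[] args) {
        boolean[] flags = {true, false};
        for (boolean checkpointing : flags) {
            for (boolean onCheckpoints : flags) {
                for (boolean autoCommit : flags) {
                    System.out.println("checkpointing=" + checkpointing
                            + " commitOnCheckpoints=" + onCheckpoints
                            + " autoCommit=" + autoCommit
                            + " -> " + resolve(autoCommit, onCheckpoints, checkpointing));
                }
            }
        }
    }
}
```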

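9. How Flink subtasks map to Kafka partitions (an important point)

This point is especially important, and the official site does not explain it. First, look at the implementation: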

/**
 * @param partition the Kafka partition
 * @param numParallelSubtasks total number of parallel subtasks
 *
 * @return index of the target subtask that the Kafka partition should be assigned to.
 */
public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
    int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

    // here, the assumption is that the id of Kafka partitions are always ascending
    // starting from 0, and therefore can be used directly as the offset clockwise from the start index
    return (startIndex + partition.getPartition()) % numParallelSubtasks;
}

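The algorithm is quite simple. For example, suppose a topic has three partitions, the Flink Kafka consumer runs with a parallelism of 3, and the topic name hashes to a start index of 1. Then:

partition 0 is assigned to consumer subtask 1;

partition 1 is assigned to consumer subtask 2;

partition 2 is assigned to consumer subtask 0;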
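The mapping in this example can be reproduced with a few lines of plain Java. The sketch below copies the arithmetic of assign but takes the topic name and partition id directly, so it needs no Flink classes. The topic name "a" is a hypothetical choice: its hash code is 97, and (97 * 31) % 3 = 1, which gives the start index used in the example.

```java
/** Stand-alone copy of the assignment arithmetic; topic name "a" is a hypothetical example. */
class SubtaskAssignmentSketch {

    static int assign(String topic, int partition, int numParallelSubtasks) {
        // Start index derived from the topic name; the mask keeps it non-negative.
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        // Partitions are laid out round-robin, starting at that index.
        return (startIndex + partition) % numParallelSubtasks;
    }

    public static void main(String[] args) {
        for (int partition = 0; partition < 3; partition++) {
            System.out.println("partition " + partition + " -> subtask " + assign("a", partition, 3));
        }
        // prints:
        // partition 0 -> subtask 1
        // partition 1 -> subtask 2
        // partition 2 -> subtask 0
    }
}
```

A topic with a different name will generally get a different start index, so the concrete subtask numbers shift while the round-robin order stays the same.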

Since both partitions and subtasks are numbered from 0, I personally think the algorithm could simply be changed to:

return partition.getPartition() % numParallelSubtasks;
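One case where the two variants behave differently is a job that reads several topics at once. The sketch below is my own comparison, not something stated in the Flink documentation quoted here: it counts how many partitions each subtask receives when three single-partition topics are consumed with a parallelism of 3. The topic names "a", "b", and "c" and all method names are hypothetical.

```java
import java.util.Arrays;

/** Compares the topic-based start index with a partition-only mapping; all names are illustrative. */
class AssignmentComparisonSketch {

    static int withStartIndex(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partition) % numParallelSubtasks;
    }

    static int partitionOnly(int partition, int numParallelSubtasks) {
        return partition % numParallelSubtasks;
    }

    /** Counts partitions per subtask when every topic has exactly one partition (id 0). */
    static int[] load(String[] topics, int numParallelSubtasks, boolean useStartIndex) {
        int[] counts = new int[numParallelSubtasks];
        for (String topic : topics) {
            int subtask = useStartIndex
                    ? withStartIndex(topic, 0, numParallelSubtasks)
                    : partitionOnly(0, numParallelSubtasks);
            counts[subtask]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] topics = {"a", "b", "c"};
        System.out.println("with start index: " + Arrays.toString(load(topics, 3, true)));
        System.out.println("partition only:   " + Arrays.toString(load(topics, 3, false)));
        // prints:
        // with start index: [1, 1, 1]
        // partition only:   [3, 0, 0]
    }
}
```

With the partition-only formula every topic's partition 0 lands on subtask 0, so the topic-dependent start index may be there to spread such partitions out. For a single topic the two formulas differ only by a rotation.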

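The best practice is to set the Flink Kafka consumer's parallelism equal to the number of Kafka partitions, which keeps consumption efficient without wasting resources.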

10. Summary

That covers the basic usage of the Flink-connector-kafka consumer. Questions and discussion are welcome.

  • Original link: https://kuaibao.qq.com/s/20181213G1EX3800?refer=cp_1026