专栏首页实时计算替代Flume——Kafka Connect简介

替代Flume——Kafka Connect简介

我们知道过去对于Kafka的定义是分布式,分区化的,带备份机制的日志提交服务。也就是一个分布式的消息队列,这也是他最常见的用法。但是Kafka不止于此,打开最新的官网。

我们看到Kafka最新的定义是:Apache Kafka® is a distributed streaming platform

分布式流处理平台。

这里也清晰的描述了Kafka的特点:Kafka用于构建实时数据管道和流式应用程序。它具有水平可扩展性、容错性、速度极快,并在数千家公司投入生产。

所以现在的Kafka已经不仅是一个分布式的消息队列,更是一个流处理平台。这源于它于0.9.0.0和0.10.0.0引入的两个全新的组件Kafka Connect与Kafka Streaming。

Kafka Connect简介

我们知道消息队列必须存在上下游的系统,对消息进行搬入搬出。比如经典的日志分析系统,通过flume读取日志写入kafka,下游由storm进行实时的数据处理。

Kafka Connect的作用就是替代Flume,让数据传输这部分工作可以由Kafka Connect来完成。Kafka Connect是一个用于在Apache Kafka和其他系统之间可靠且可靠地传输数据的工具。它可以快速地将大量数据集合移入和移出Kafka。

Kafka Connect的导入作业可以将数据库或从应用程序服务器收集的数据传入到Kafka,导出作业可以将Kafka中的数据传递到查询系统,也可以传输到批处理系统以进行离线分析。

Kafka Connect功能包括:

  • 一个通用的Kafka连接的框架 - Kafka Connect规范化了其他数据系统与Kafka的集成,简化了连接器开发,部署和管理
  • 分布式和独立模式 - 支持大型分布式的管理服务,也支持小型生产环境的部署
  • REST界面 - 通过易用的REST API提交和管理Kafka Connect
  • 自动偏移管理 - 只需从连接器获取一些信息,Kafka Connect就可以自动管理偏移量提交过程,因此连接器开发人员无需担心连接器开发中偏移量提交这部分的开发
  • 默认情况下是分布式和可扩展的 - Kafka Connect构建在现有的组管理协议之上。可以添加扩展集群
  • 流媒体/批处理集成 - 利用Kafka现有的功能,Kafka Connect是桥接流媒体和批处理数据系统的理想解决方案

运行Kafka Connect

Kafka Connect目前支持两种运行模式:独立和集群。

独立模式

在独立模式下,只有一个进程,这种更容易设置和使用。但是没有容错功能。

启动:
> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
独立模式配置

第一个参数config/connect-standalone.properties是一些基本的配置:

这几个在独立和集群模式下都需要设置:

#bootstrap.servers   kafka集群列表
bootstrap.servers=localhost:9092
#key.converter       key的序列化转换器  比如json的  key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter     value的序列化转换器
value.converter=org.apache.kafka.connect.json.JsonConverter

#独立模式特有的配置:
#offset.storage.file.filename       用于存储偏移量的文件
offset.storage.file.filename =/home/kafka/connect.offsets
独立模式连接器配置(配置文件)

后面的参数connector1.properties [connector2.properties ...] 可以多个,是连接器配置内容

这里我们配置一个从文件读取数据并存入kafka的配置:

connect-file-sink.properties

  • name - 连接器的唯一名称。尝试再次使用相同名称注册将失败。
  • connector.class - 连接器的Java类 此连接器的类的全名或别名。这里我们选择FileStreamSink
  • tasks.max - 应为此连接器创建的最大任务数。如果连接器无法达到此级别的并行性,则可能会创建更少的任务。
  • key.converter - (可选)覆盖worker设置的默认密钥转换器。
  • value.converter - (可选)覆盖worker设置的默认值转换器。 下面两个必须设置一个:
    • topics - 以逗号分隔的主题列表,用作此连接器的输入
    • topics.regex - 用作此连接器输入的主题的Java正则表达式
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

可以在连接器中配置转换器

需要指定参数:

  • transforms - 转换的别名列表,指定将应用转换的顺序。
  • transforms.$alias.type - 转换的完全限定类名。
  • transforms.$alias.$transformationSpecificConfig 转换的配置属性

例如,我们把刚才的文件转换器的内容添加字段

首先设置connect-standalone.properties

key.converter.schemas.enable = false
value.converter.schemas.enable = false

设置connect-file-source.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
transforms=MakeMap, InsertSource
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=line
transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource.static.field=data_source
transforms.InsertSource.static.value=test-file-source

没有转换前的结果:

"foo"
"bar"
"hello world"

转换后:

{"line":"foo","data_source":"test-file-source"}
{"line":"bar","data_source":"test-file-source"}
{"line":"hello world","data_source":"test-file-source"}

常用转换类型:

  • InsertField - 使用静态数据或记录元数据添加字段
  • ReplaceField - 过滤或重命名字段
  • MaskField - 用类型的有效空值替换字段(0,空字符串等)
  • ValueToKey Value转换为Key
  • HoistField - 将整个事件作为单个字段包装在Struct或Map中
  • ExtractField - 从Struct和Map中提取特定字段,并在结果中仅包含此字段
  • SetSchemaMetadata - 修改架构名称或版本
  • TimestampRouter - 根据原始主题和时间戳修改记录主题
  • RegexRouter - 根据原始主题,替换字符串和正则表达式修改记录主题

集群模式

集群模式下,可以扩展,容错。

启动:
> bin/connect-distributed.sh config/connect-distributed.properties

在集群模式下,Kafka Connect在Kafka主题中存储偏移量,配置和任务状态。

集群模式配置

connect-distributed.properties

#也需要基本的配置
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

#还有一些配置要注意
#group.id(默认connect-cluster) - Connect的组id 请注意,这不得与使用者的组id 冲突
group.id=connect-cluster

#用于存储偏移的主题; 此主题应具有许多分区
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1

#用于存储连接器和任务配置的主题  只能一个分区
config.storage.topic=connect-configs
config.storage.replication.factor=1

#用于存储状态的主题; 此主题可以有多个分区
status.storage.topic=connect-status
status.storage.replication.factor=1

在集群模式下,配置并不会在命令行传进去,而是需要REST API来创建,修改和销毁连接器。

集群模式连接器配置(REST API)

可以配置REST API服务器,支持http与https

listeners=http://localhost:8080,https://localhost:8443

默认情况下,如果未listeners指定,则REST服务器使用HTTP协议在端口8083上运行。

以下是当前支持的REST API:

  • GET /connectors - 返回活动连接器列表
  • POST /connectors - 创建一个新的连接器; 请求主体应该是包含字符串name字段的JSON对象和包含config连接器配置参数的对象字段
  • GET /connectors/{name} - 获取有关特定连接器的信息
  • GET /connectors/{name}/config - 获取特定连接器的配置参数
  • PUT /connectors/{name}/config - 更新特定连接器的配置参数
  • GET /connectors/{name}/status - 获取连接器的当前状态,包括它是否正在运行,失败,暂停等,分配给哪个工作人员,错误信息(如果失败)以及所有任务的状态
  • GET /connectors/{name}/tasks - 获取当前为连接器运行的任务列表
  • GET /connectors/{name}/tasks/{taskid}/status - 获取任务的当前状态,包括它是否正在运行,失败,暂停等,分配给哪个工作人员,以及错误信息是否失败
  • PUT /connectors/{name}/pause - 暂停连接器及其任务,这将停止消息处理,直到恢复连接器
  • PUT /connectors/{name}/resume - 恢复暂停的连接器(如果连接器未暂停,则不执行任何操作)
  • POST /connectors/{name}/restart - 重新启动连接器(通常是因为它已经失败)
  • POST /connectors/{name}/tasks/{taskId}/restart - 重启个别任务(通常因为失败)
  • DELETE /connectors/{name} - 删除连接器,暂停所有任务并删除其配置

连接器开发指南

kakfa允许开发人员自己去开发一个连接器。

核心概念

要在Kafka和其他系统之间复制数据,用户需要创建一个Connector

Connector有两种形式:

SourceConnectors从另一个系统导入数据,例如,JDBCSourceConnector将关系数据库导入Kafka

SinkConnectors导出数据,例如,HDFSSinkConnector将Kafka主题的内容导出到HDFS文件

和对应的Task:

SourceTaskSinkTask

Task形成输入输出流,开发Task要注意偏移量的问题。

每个流应该是一系列键值记录。还需要定期提交已处理的数据的偏移量,以便在发生故障时,处理可以从上次提交的偏移量恢复。Connector还需要是动态的,实现还负责监视外部系统是否存在任何更改。

开发一个简单的连接器

开发连接器只需要实现两个接口,即ConnectorTask

这里我们简单开发一个FileStreamConnector。

此连接器是为在独立模式下使用,SourceConnector/ SourceTask读取文件的每一行,SinkConnector/ SinkTask每个记录写入一个文件。

连接器示例:

继承SourceConnector,添加字段(要读取的文件名和要将数据发送到的主题)

public class FileStreamSourceConnector extends SourceConnector {
    private String filename;
    private String topic;

定义实际读取数据的类

@Override
public Class<? extends Task> taskClass() {
    return FileStreamSourceTask.class;
}

FileStreamSourceTask下面定义该类。接下来,我们添加一些标准的生命周期方法,start()stop()

@Override
public void start(Map<String, String> props) {
    // The complete version includes error handling as well.
    filename = props.get(FILE_CONFIG);
    topic = props.get(TOPIC_CONFIG);
}
 
@Override
public void stop() {
    // Nothing to do since no background monitoring is required.
}

最后,实施的真正核心在于taskConfigs()

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    ArrayList<Map<String, String>> configs = new ArrayList<>();
    // Only one input stream makes sense.
    Map<String, String> config = new HashMap<>();
    if (filename != null)
        config.put(FILE_CONFIG, filename);
    config.put(TOPIC_CONFIG, topic);
    configs.add(config);
    return configs;
}

任务示例:

源任务

实现SourceTask 创建FileStreamSourceTask继承SourceTask

public class FileStreamSourceTask extends SourceTask {
    String filename;
    InputStream stream;
    String topic;
 
    @Override
    public void start(Map<String, String> props) {
        filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
        stream = openOrThrowError(filename);
        topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
    }
 
    @Override
    public synchronized void stop() {
        stream.close();
    }

接下来,我们实现任务的主要功能,即poll()从输入系统获取事件并返回以下内容的方法List

@Override
public List<SourceRecord> poll() throws InterruptedException {
    try {
        ArrayList<SourceRecord> records = new ArrayList<>();
        while (streamValid(stream) && records.isEmpty()) {
            LineAndOffset line = readToNextLine(stream);
            if (line != null) {
                Map<String, Object> sourcePartition = Collections.singletonMap("filename", filename);
                Map<String, Object> sourceOffset = Collections.singletonMap("position", streamOffset);
                records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));
            } else {
                Thread.sleep(1);
            }
        }
        return records;
    } catch (IOException e) {
        // Underlying stream was killed, probably as a result of calling stop. Allow to return
        // null, and driving thread will handle any shutdown if necessary.
    }
    return null;
}
接收任务

不像SourceConnectorSinkConnectorSourceTaskSinkTask有非常不同的接口,因为SourceTask采用的是拉接口,并SinkTask使用推接口。两者共享公共生命周期方法,但SinkTask完全不同:

public abstract class SinkTask implements Task {
    public void initialize(SinkTaskContext context) {
        this.context = context;
    }
 
    public abstract void put(Collection<SinkRecord> records);
 
    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    }

这是一个简单的例子,它们有简单的结构化数据 - 每一行只是一个字符串。几乎所有实用的连接器都需要具有更复杂数据格式的模式。要创建更复杂的数据,您需要使用Kafka Connect dataAPI。

Schema schema = SchemaBuilder.struct().name(NAME)
    .field("name", Schema.STRING_SCHEMA)
    .field("age", Schema.INT_SCHEMA)
    .field("admin", new SchemaBuilder.boolean().defaultValue(false).build())
    .build();
 
Struct struct = new Struct(schema)
    .put("name", "Barbara Liskov")
    .put("age", 75);

更多Kafka相关技术文章:

什么是Kafka? Kafka监控工具汇总 Kafka快速入门 Kafka核心之Consumer Kafka核心之Producer

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Kafka 0.8 Producer (0.9以前版本适用)

    Kafka旧版本producer由scala编写,0.9以后已经废除,但是很多公司还在使用0.9以前的版本,所以总结如下: 要注意包Producer是 kaf...

    实时计算
  • Spark Streaming——Spark第一代实时计算引擎

    虽然SparkStreaming已经停止更新,Spark的重点也放到了 Structured Streaming ,但由于Spark版本过低或者其他技术选型问题...

    实时计算
  • Kafka监控必备——Kafka-Eagle 2.0.2正式发布

    对于经常使用Kafka的同学,拥有一个炫酷又实用的监控系统是非常有必要的。可以实时的监控数据流的情况,了解实时数据流的变化。

    实时计算
  • 替代Flume——Kafka Connect简介

    我们知道过去对于Kafka的定义是分布式,分区化的,带备份机制的日志提交服务。也就是一个分布式的消息队列,这也是他最常见的用法。但是Kafka不止于此,打开最新...

    用户6070864
  • 图文详解:Kafka到底有哪些秘密让我对它情有独钟呢?

    最近很少发文,一是开始总结了一下自己做了两个多月的公号了,都收获了什么,学到了什么。

    浅羽的IT小屋
  • 麦当劳玩转虚拟现实,VR版“开心农场”约吗?

    镁客网
  • 学界 | 一文概览卷积神经网络中的类别不均衡问题

    机器之心
  • scrapy分布式浅谈+京东示例

    Github:https://github.com/xbhog/scrapyRedis

    xbhog
  • 【NIPS 主旨演讲】Yann LeCun:用预测学习替代无监督学习(75PPT)

    【新智元导读】日前,Facebook AI 实验室负责人、纽约大学教授 Yann LeCun 受邀在今年的 NIPS 大会发表主旨演讲,畅谈深度学习最近技术进展...

    新智元
  • Maven 核心原理解析(1)

    Maven 是每一位Java工程师每天都会接触的工具, 但据我所知其实很多人对Maven理解的并不深, 只把它当做一个依赖管理工具(下载依赖、打包), Mave...

    Java帮帮

扫码关注云+社区

领取腾讯云代金券