前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka 连接器使用与开发

Kafka 连接器使用与开发

作者头像
Se7en258
发布2021-05-18 10:58:04
2.3K0
发布2021-05-18 10:58:04
举报
文章被收录于专栏:Se7en的架构笔记

Kafka 连接器介绍

Kafka 连接器通常用来构建数据管道,一般有两种使用场景:

  • 开始和结束的端点:例如,将 Kafka 中的数据导出到 HBase 数据库,或者把 Oracle 数据库中的数据导入 Kafka 中。
  • 数据传输的中间介质:例如,为了把海量的日志数据存储到 Elasticsearch 中,可以先把这些日志数据传输到 Kafka 中,然后再从 Kafka 中将这些数据导入到 Elasticsearch 中进行存储。Kafka 连接器可以作为数据管道各个阶段的缓冲区,将消费者程序和生产者程序有效地进行解耦。

Kafka 连接器分为两种:

  • Source 连接器:负责将数据导入 Kafka。
  • Sink 连接器:负责将数据从 Kafka 系统中导出。

连接器作为 Kafka 的一部分,是随着 Kafka 系统一起发布的,无须独立安装。

Kafka 连接器特性

Kafka 连接器包含以下特性:

  • 1.是一种处理数据的通用框架,Kafka 连接器指定了一种标准,用来约束 Kafka 与其他系统的集成,简化了 Kafka 连接器的开发、部署和管理过程。
  • 2.提供单机模式和分布式模式:Kafka 连接器支持两种模式,既能扩展到支持大型集群,也可以缩小到开发和测试小规模的集群。
  • 3.提供 REST 接口:使用 REST API 来提交请求并管理 Kafka 连接器。
  • 4.自动管理偏移量:Kafka 连接器可以自动管理偏移量。
  • 5.分布式和可扩展:Kafka 连接器建立在现有的组管理协议上,可以通过添加更多的连接器实例来实现水平扩展,实现分布式服务。
  • 6.数据流和批量集成:利用 Kafka 已有的能力,Kafka 连接器是桥接数据流和批处理系统的一种理想的解决方案。

Kafka 连接器核心概念

  • 连接器实例:连接器实例决定了消息数据的流向,即消息从何处复制,以及将复制的消息写入到何处。连接器实例负责 Kafka 与其他系统之间的逻辑处理,连接器实例通常以 JAR 包形式存在,通过实现 Kafka 系统应用接口来完成。
  • 任务数:在分布式模式下,每一个连接器实例可以将一个作业切分成多个任务(Task),然后再将任务分发到各个事件线程(Worker)中去执行。任务不会保存当前的状态信息,通常由特定的 Kafka Topic 来保存,例如,指定具体属性 offset.storage.topicstatus.storage.topic 的值来保存。
  • 事件线程:在 Kafka 中,连接器实例和任务数都是逻辑层面的,需要由具体的线程来执行,事件线程包含两种模式--单机模式和分布式模式。
  • 转换器:转换器能将字节数据转换成 Kafka 连接器的内部格式,也能将 Kafka 连接器内部存储的数据格式转换成字节数据。

使用 Kafka 连接器

单机模式

单机模式配置文件

配置单机模式连接器相关参数 config/connect-standalone.properties:

代码语言:javascript
复制
# Kafka 集群 broker 地址
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092

# 指定键值对 JSON 转换器类
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# 启用键值对转换器
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# 单机模式设置偏移量存储文件
offset.storage.file.filename=/tmp/connect.offsets

# 设置偏移量持久化时间间隔
offset.flush.interval.ms=10000
将数据从文件导入 Kafka Topic 中

编辑 Kafka 连接器 配置文件 config/connect-file-source.properties:

代码语言:javascript
复制
# 设置连接器名字
name=local-file-source
# 指定连接器类
connector.class=FileStreamSource
# 设置最大任务数
tasks.max=1
# 指定读取的文件
file=/tmp/test.txt
# 指定写入 Kafka 的 Topic 
topic=connect_test

创建数据源文件并添加数据:

代码语言:javascript
复制
[root@kafka1 ~]# cat /tmp/test.txt
kafka
hadoop
kafka-connect

启动一个单机模式的连接器将数据导入 Kafka Topic 中:

代码语言:javascript
复制
[root@kafka1 kafka]# connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties 

启动消费者程序查看导入到 connect_test 主题中的数据:

代码语言:javascript
复制
[root@kafka1 ~]# kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic connect_test -from-beginning
{"schema":{"type":"string","optional":false},"payload":"kafka"}
{"schema":{"type":"string","optional":false},"payload":"hadoop"}
{"schema":{"type":"string","optional":false},"payload":"kafka-connect"}
{"schema":

当往文件中追加数据时,消费者可以消费到新的数据:

代码语言:javascript
复制
[root@kafka1 ~]# echo java >> /tmp/test.txt 
[root@kafka1 ~]# echo python  >> /tmp/test.txt 

消费者消费到的新的数据:

代码语言:javascript
复制
{"type":"string","optional":false},"payload":"java"}
{"schema":{"type":"string","optional":false},"payload":"python"}
将 Kafka Topic 中的数据导出到文件

编辑 Kafka 连接器 配置文件 config/connect-file-sink.properties:

代码语言:javascript
复制
# 设置连接器名字
name=local-file-sink
# 指定连接器类
connector.class=FileStreamSink
# 设置最大任务数
tasks.max=1
# 将数据写入的文件
file=/tmp/sink.txt
# 指定导出数据的 Kafka 的 Topic
topics=connect_test

启动一个单机模式的连接器将 Kafka Topic 中的数据导出:

代码语言:javascript
复制
[root@kafka1 kafka]# connect-standalone.sh  config/connect-standalone.properties config/connect-file-sink.properties

查看导出文件的内容:

代码语言:javascript
复制
[root@kafka1 ~]# cat /tmp/sink.txt 
python
kafka
hadoop
kafka-connect
java

分布式模式

在分布式模式下, Kafka 连接器会自动均衡每个事件线程所处理的任务数。允许用户动态地增加或者减少任务,在执行任务、修改配置、提交偏移量时能够得到容错保障。

在分布式模式下,Kafka 连接器会在 Kafka Topic 中存储偏移量,配置和任务状态(单机模式下是保持在本地文件中)。建议手动创建存储偏移量的主题,这样可以按需设置主题的分区数和副本数。

在分布式模式下, Kafka 连接器的配置文件不能使用命令行,需要使用 REST API 来执行创建,修改和销毁 Kafka 连机器的操作。

创建连接器相关主题
代码语言:javascript
复制
# 创建偏移量的的存储主题
kafka-topics.sh --create --bootstrap-server kafka1:9092 --replication-factor 3 --partitions 1 --topic connect-offsets
# 创建配置存储主题
kafka-topics.sh --create --bootstrap-server kafka1:9092 --replication-factor 3 --partitions 6 --topic connect-configs
# 创建任务状态存储主题
kafka-topics.sh --create --bootstrap-server kafka1:9092 --replication-factor 3 --partitions 6 --topic connect-status
分布式模式配置文件
代码语言:javascript
复制
# 设置 Kafka 集群地址
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092

# 设置连接器唯一组名称
group.id=connect-cluster

# 指定键值对 JSON 转换器类
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# 启用键值对转换器
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# 设置偏移量的的存储主题
offset.storage.topic=connect-offsets

# 设置配置存储主题
config.storage.topic=connect-configs

# 设置任务状态存储主题
status.storage.topic=connect-status

# 设置偏移量持久化时间间隔
offset.flush.interval.ms=10000

启动分布式模式连接器:

代码语言:javascript
复制
[root@kafka1 kafka]# connect-distributed.sh config/connect-distributed.properties 

查看连接器版本号信息:

代码语言:javascript
复制
[root@kafka1 ~]# curl http://kafka1:8083
{"version":"2.7.0","commit":"448719dc99a19793","kafka_cluster_id":"wp8iI172SaqLHqNvEh3T-w"}

查看当前已安装的插件:

代码语言:javascript
复制
[root@kafka1 ~]# curl http://kafka1:8083/connector-plugins -s | jq
[
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "2.7.0"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "2.7.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "1"
  }
]
连接器 API 接口

由于 Kafka 连接器旨在作为服务运行,因此还提供了用于管理连接器的 REST API。默认情况下,此服务在端口 8083 上运行。以下是当前支持的 API 接口:

代码语言:javascript
复制
GET /connectors #返回活动连接器的列表
POST /connectors #创建一个新的连接器; 请求主体应该是包含字符串name字段和config带有连接器配置参数的对象字段的JSON对象
GET /connectors/{name} #获取有关特定连接器的信息
GET /connectors/{name}/config #获取特定连接器的配置参数
PUT /connectors/{name}/config #更新特定连接器的配置参数
GET /connectors/{name}/status #获取连接器的当前状态,包括连接器是否正在运行,失败,已暂停等,分配给哪个工作者,失败时的错误信息以及所有任务的状态
GET /connectors/{name}/tasks #获取当前为连接器运行的任务列表
GET /connectors/{name}/tasks/{taskid}/status #获取任务的当前状态,包括如果正在运行,失败,暂停等,分配给哪个工作人员,如果失败,则返回错误信息
PUT /connectors/{name}/pause #暂停连接器及其任务,停止消息处理,直到连接器恢复
PUT /connectors/{name}/resume #恢复暂停的连接器(或者,如果连接器未暂停,则不执行任何操作)
POST /connectors/{name}/restart #重新启动连接器(通常是因为失败)
POST /connectors/{name}/tasks/{taskId}/restart #重启个别任务(通常是因为失败)
DELETE /connectors/{name} #删除连接器,停止所有任务并删除其配置

#Kafka Connect还提供了用于获取有关连接器插件信息的REST API:
GET /connector-plugins #返回安装在Kafka Connect集群中的连接器插件列表。请注意,API仅检查处理请求的worker的连接器,这意味着您可能会看到不一致的结果,尤其是在滚动升级期间,如果添加新的连接器jar
PUT /connector-plugins/{connector-type}/config/validate # 根据配置定义验证提供的配置值。此API执行每个配置验证,在验证期间返回建议值和错误消息。
将数据从文件导入到 Kafka Topic 中

通过 REST API 请求创建一个新的连接器实例,将数据导入到 Kafka Topic 中。这里使用的是 Chrome 浏览器上名为 API Tester 的插件:

请求 URL:http://kafka1:8083/connectors

请求 Body:

代码语言:javascript
复制
{
    "name": "distributed-console-source", #自定义连接器名字
    "config":
    {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "tasks.max": "1",
        "topic": "distributed_connect_test", #创建的topic
        "file": "/tmp/distributed_test.txt"  #读取的文件
    }
}

查看刚刚创建的连接器:

代码语言:javascript
复制
[root@kafka1 ~]# curl http://kafka1:8083/connectors -s | jq
[
  "distributed-console-source"
]

此时开启一个消费者实例可以成功消费到 Kafka Topic 中的数据:

代码语言:javascript
复制
[root@kafka1 ~]# kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic distributed_connect_test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"distributed_kafka"}
{"schema":{"type":"string","optional":false},"payload":"kafka"}
{"schema":{"type":"string","optional":false},"payload":"hadoop"}
将 Kafka Topic 中的数据导出到文件

通过 REST API 请求创建一个新的连接器实例,将数据从 Kafka Topic 中导出到文件中。

请求 URL: http://kafka1:8083/connectors

请求 Body:

代码语言:javascript
复制
{
    "name": "distributed-console-sink",
    "config":
    {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "distributed_connect_test", #Kafka 中存在的 Topic
        "file": "/tmp/distributed_sink.txt" #导出数据到指定文件
    }
}

查看目前的连接器:

代码语言:javascript
复制
[root@kafka1 ~]# curl http://kafka1:8083/connectors -s | jq
[
  "distributed-console-sink",
  "distributed-console-source"
]

查看文件可以看到数据成功从 Kafka Topic 中导出:

代码语言:javascript
复制
[root@kafka1 ~]# cat /tmp/distributed_sink.txt 
distributed_kafka
kafka
hadoop

开发 Kafka 连接器插件

开发一个完整的 Kafka 连接器插件,分为两部分来实现:

  • 编写 Source 连接器。Source 连接器负责将第三方系统的数据导入 Kafka Topic 中。
  • 编写 Sink 连接器。Sink 连接器负责将 Kafka Topic 中的数据导出到第三方系统中。

第三方系统可以是关系型数据库(如 MySQL、Oracle 等)、文件系统(如本地文件,分布式文件系统等)、日志系统等。

本实例使用的是 Maven 工程,需要在 pom.xml 文件中引入 Kafka 依赖包:

代码语言:javascript
复制
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.10.2.1</version>
</dependency>

编写 Source 连接器

编写一个自定义的 Source 连接器,需要实现两个抽象类:

  • SourceConnector 类,用来初始化连接器配置和任务数。
  • SourceTask 类,用来实现标准输入或者文件读取。
编写输入连接器实例
代码语言:javascript
复制
package book_8;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;

/**
 * 输入连接器,用来实现读取配置信息和分配任务等一些初始化工作
 * @author 程治玮
 * @since 2021/3/16 9:51 下午
 */
public class CustomerFileStreamSourceConnector extends SourceConnector {
    // 定义主题配置变量
    public static final String TOPIC_CONFIG = "topic";
    // 定义文件配置变量
    public static final String FILE_CONFIG = "file";

    // 实例化一个配置对象
    private static final ConfigDef CONFIG_DEF = new ConfigDef().define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.").define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to");

    // 声明文件名变量
    private String filename;
    // 声明主题变量
    private String topic;

    /** 获取版本. */
    public String version() {
        return AppInfoParser.getVersion();
    }

    /** 开始初始化. */
    public void start(Map<String, String> props) {
        filename = props.get(FILE_CONFIG);
        topic = props.get(TOPIC_CONFIG);
        if (topic == null || topic.isEmpty())
            throw new ConnectException("FileStreamSourceConnector configuration must include 'topic' setting");
        if (topic.contains(","))
            throw new ConnectException("FileStreamSourceConnector should only have a single topic when used as a source.");
    }

    /** 实例化输入类. */
    public Class<? extends Task> taskClass() {
        return CustomerFileStreamSourceTask.class;
    }

    /** 获取配置信息. */
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        ArrayList<Map<String, String>> configs = new ArrayList<>();
        Map<String, String> config = new HashMap<>();
        if (filename != null)
            config.put(FILE_CONFIG, filename);
        config.put(TOPIC_CONFIG, topic);
        configs.add(config);
        return configs;
    }

    @Override
    public void stop() {
    }

    /** 获取配置对象. */
    public ConfigDef config() {
        return CONFIG_DEF;
    }
}
编写输入连接器任务类
代码语言:javascript
复制
package book_8;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * 输入连接器任务类,用来实现标准输入或者文件读取
 * @author 程治玮
 * @since 2021/3/16 9:47 下午
 */

public class CustomerFileStreamSourceTask extends SourceTask {
    // 声明一个日志类
    private static final Logger LOG = LoggerFactory.getLogger(CustomerFileStreamSourceTask.class);
    // 定义文件字段
    public static final String FILENAME_FIELD = "filename";
    // 定义偏移量字段
    public static final String POSITION_FIELD = "position";
    // 定义值的值的数据格式
    private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;

    // 声明文件名
    private String filename;
    // 声明输入流对象
    private InputStream stream;
    // 声明读取对象
    private BufferedReader reader = null;
    // 定义缓冲区大小
    private char[] buffer = new char[1024];
    // 声明偏移量变量
    private int offset = 0;
    // 声明主题名
    private String topic = null;

    // 声明输入流偏移量
    private Long streamOffset;

    /** 获取版本. */
    public String version() {
        return new CustomerFileStreamSourceConnector().version();
    }

    /** 开始执行任务. */
    public void start(Map<String, String> props) {
        filename = props.get(CustomerFileStreamSourceConnector.FILE_CONFIG);
        if (filename == null || filename.isEmpty()) {
            stream = System.in;
            streamOffset = null;
            reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
        }
        topic = props.get(CustomerFileStreamSourceConnector.TOPIC_CONFIG);
        if (topic == null)
            throw new ConnectException("FileStreamSourceTask config missing topic setting");
    }

    /** 读取记录并返回数据集. */
    public List<SourceRecord> poll() throws InterruptedException {
        if (stream == null) {
            try {
                stream = new FileInputStream(filename);
                Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
                if (offset != null) {
                    Object lastRecordedOffset = offset.get(POSITION_FIELD);
                    if (lastRecordedOffset != null && !(lastRecordedOffset instanceof Long))
                        throw new ConnectException("Offset position is the incorrect type");
                    if (lastRecordedOffset != null) {
                        LOG.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset);
                        long skipLeft = (Long) lastRecordedOffset;
                        while (skipLeft > 0) {
                            try {
                                long skipped = stream.skip(skipLeft);
                                skipLeft -= skipped;
                            } catch (IOException e) {
                                LOG.error("Error while trying to seek to previous offset in file: ", e);
                                throw new ConnectException(e);
                            }
                        }
                        LOG.debug("Skipped to offset {}", lastRecordedOffset);
                    }
                    streamOffset = (lastRecordedOffset != null) ? (Long) lastRecordedOffset : 0L;
                } else {
                    streamOffset = 0L;
                }
                reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
                LOG.debug("Opened {} for reading", logFilename());
            } catch (FileNotFoundException e) {
                LOG.warn("Couldn't find file {} for FileStreamSourceTask, sleeping to wait for it to be created", logFilename());
                synchronized (this) {
                    this.wait(1000);
                }
                return null;
            }
        }

        try {
            final BufferedReader readerCopy;
            synchronized (this) {
                readerCopy = reader;
            }
            if (readerCopy == null)
                return null;

            ArrayList<SourceRecord> records = null;

            int nread = 0;
            while (readerCopy.ready()) {
                nread = readerCopy.read(buffer, offset, buffer.length - offset);
                LOG.trace("Read {} bytes from {}", nread, logFilename());

                if (nread > 0) {
                    offset += nread;
                    if (offset == buffer.length) {
                        char[] newbuf = new char[buffer.length * 2];
                        System.arraycopy(buffer, 0, newbuf, 0, buffer.length);
                        buffer = newbuf;
                    }

                    String line;
                    do {
                        line = extractLine();
                        if (line != null) {
                            LOG.trace("Read a line from {}", logFilename());
                            if (records == null)
                                records = new ArrayList<>();
                            records.add(new SourceRecord(offsetKey(filename), offsetValue(streamOffset), topic, null, null, null, VALUE_SCHEMA, line, System.currentTimeMillis()));
                        }
                    } while (line != null);
                }
            }

            if (nread <= 0)
                synchronized (this) {
                    this.wait(1000);
                }

            return records;
        } catch (IOException e) {
        }
        return null;
    }

    /** 解析一条记录. */
    private String extractLine() {
        int until = -1, newStart = -1;
        for (int i = 0; i < offset; i++) {
            if (buffer[i] == '\n') {
                until = i;
                newStart = i + 1;
                break;
            } else if (buffer[i] == '\r') {
                if (i + 1 >= offset)
                    return null;

                until = i;
                newStart = (buffer[i + 1] == '\n') ? i + 2 : i + 1;
                break;
            }
        }

        if (until != -1) {
            String result = new String(buffer, 0, until);
            System.arraycopy(buffer, newStart, buffer, 0, buffer.length - newStart);
            offset = offset - newStart;
            if (streamOffset != null)
                streamOffset += newStart;
            return result;
        } else {
            return null;
        }
    }

    /** 停止任务. */
    public void stop() {
        LOG.trace("Stopping");
        synchronized (this) {
            try {
                if (stream != null && stream != System.in) {
                    stream.close();
                    LOG.trace("Closed input stream");
                }
            } catch (IOException e) {
                LOG.error("Failed to close FileStreamSourceTask stream: ", e);
            }
            this.notify();
        }
    }

    private Map<String, String> offsetKey(String filename) {
        return Collections.singletonMap(FILENAME_FIELD, filename);
    }

    private Map<String, Long> offsetValue(Long pos) {
        return Collections.singletonMap(POSITION_FIELD, pos);
    }

    /** 判断是标准输入还是读取文件. */
    private String logFilename() {
        return filename == null ? "stdin" : filename;
    }
}

编写 Sink 连接器

在 Kafka 系统中,实现一个自定义的 Sink 连接器,需要实现两个抽象类。

  • SinkTask 类:用来实现标准输出或者文件写入。
  • SinkConnector 类:用来初始化连接器配置和任务数。
编写输出连接器实例
代码语言:javascript
复制
package book_8;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

/**
 * 输出连接器,用来实现读取配置信息和分配任务等一些初始化工作
 * @author 程治玮
 * @since 2021/3/16 9:56 下午
 */
public class CustomerFileStreamSinkConnector extends SinkConnector {

    // 声明文件配置变量
    public static final String FILE_CONFIG = "file";
    // 实例化一个配置对象
    private static final ConfigDef CONFIG_DEF = new ConfigDef().define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Destination filename.");

    // 声明一个文件名变量
    private String filename;

    /** 获取版本信息. */
    public String version() {
        return AppInfoParser.getVersion();
    }

    /** 执行初始化. */
    public void start(Map<String, String> props) {
        filename = props.get(FILE_CONFIG);
    }

    /** 实例化输出类.*/
    public Class<? extends Task> taskClass() {
        return CustomerFileStreamSinkTask.class;
    }

    /** 获取配置信息. */
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        ArrayList<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> config = new HashMap<>();
            if (filename != null)
                config.put(FILE_CONFIG, filename);
            configs.add(config);
        }
        return configs;
    }

    public void stop() {
    }

    /** 获取配置对象. */
    public ConfigDef config() {
        return CONFIG_DEF;
    }

}
编写输出连接器任务类
代码语言:javascript
复制
package book_8;

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 输出连接器任务类,用来实现标准输出或者文件写入
 * @author 程治玮
 * @since 2021/3/16 9:54 下午
 */
public class CustomerFileStreamSinkTask extends SinkTask {
    // 声明一个日志对象
    private static final Logger LOG = LoggerFactory.getLogger(CustomerFileStreamSinkTask.class);

    // 声明一个文件名变量
    private String filename;
    // 声明一个输出流对象
    private PrintStream outputStream;

    /** 构造函数. */
    public CustomerFileStreamSinkTask() {
    }

    /** 重载构造函数. */
    public CustomerFileStreamSinkTask(PrintStream outputStream) {
        filename = null;
        this.outputStream = outputStream;
    }

    /** 获取版本号. */
    public String version() {
        return new CustomerFileStreamSinkConnector().version();
    }

    /** 开始执行任务. */
    public void start(Map<String, String> props) {
        filename = props.get(CustomerFileStreamSinkConnector.FILE_CONFIG);
        if (filename == null) {
            outputStream = System.out;
        } else {
            try {
                outputStream = new PrintStream(new FileOutputStream(filename, true), false, StandardCharsets.UTF_8.name());
            } catch (FileNotFoundException | UnsupportedEncodingException e) {
                throw new ConnectException("Couldn't find or create file for FileStreamSinkTask", e);
            }
        }
    }

    /** 发送记录给Sink并输出. */
    public void put(Collection<SinkRecord> sinkRecords) {
        for (SinkRecord record : sinkRecords) {
            LOG.trace("Writing line to {}: {}", logFilename(), record.value());
            outputStream.println(record.value());
        }
    }

    /** 持久化数据. */
    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
        LOG.trace("Flushing output stream for {}", logFilename());
        outputStream.flush();
    }

    /** 停止任务. */
    public void stop() {
        if (outputStream != null && outputStream != System.out)
            outputStream.close();
    }

    /** 判断是标准输出还是文件写入. */
    private String logFilename() {
        return filename == null ? "stdout" : filename;
    }
}

打包与部署

将编写好的连接器代码打成 JAR 包,放在每台 Kafka 的 libs目录下,然后重启 Kafka 集群 和 分布式模式连接器。

启动完成后,可以通过下面命令查看已安装的连接器插件,可以看到两个自定义开发的连接器插件已经部署成功:

代码语言:javascript
复制
[root@kafka1 ~]# curl http://kafka1:8083/connector-plugins -s | jq
[
  # 自定义的 Sink 连接器插件
  {
    "class": "book_8.CustomerFileStreamSinkConnector",
    "type": "sink",
    "version": "2.7.0"
  },
 # 自定义的 Source 连接器插件
  {
    "class": "book_8.CustomerFileStreamSourceConnector",
    "type": "source",
    "version": "2.7.0"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "2.7.0"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "2.7.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "1"
  }
]

将数据从文件导入 Kafka Topic 中

请求 URL:http://kafka1:8083/connectors

请求 Body:

代码语言:javascript
复制
{
    "name": "customer-distributed-console-source",
    "config":
    {
        "connector.class": "book_8.CustomerFileStreamSourceConnector",
        "tasks.max": "1",
        "topic": "customer_distributed_connect_test",
        "file": "/tmp/customer_distributed_test.txt"
    }
}

查看现在已经创建的连接器:

代码语言:javascript
复制
[root@kafka1 ~]# curl http://kafka1:8083/connectors -s | jq
[
  "customer-distributed-console-source",
  "distributed-console-sink",
  "distributed-console-source"
]

往文件中添加两条数据:

代码语言:javascript
复制
echo kubernetes >> /tmp/customer_distributed_test.txt
echo netty >> /tmp/customer_distributed_test.txt

通过消费者可以消费到刚刚添加的两条数据:

代码语言:javascript
复制
[root@kafka1 ~]# kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic customer_distributed_connect_test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"kubernetes"}
{"schema":{"type":"string","optional":false},"payload":"netty"}

将 Kafka Topic 中的数据导出到文件

请求 URL:http://kafka1:8083/connectors

请求 Body:

代码语言:javascript
复制
{
    "name": "customer-distributed-console-sink",
    "config":
    {
        "connector.class": "book_8.CustomerFileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "customer_distributed_connect_test",
        "file": "/tmp/customer_distributed_sink.txt"
    }
}

查看现在已经创建的连接器:

代码语言:javascript
复制
[root@kafka1 ~]# curl http://kafka1:8083/connectors -s | jq
[
  "customer-distributed-console-source",
  "distributed-console-sink",
  "distributed-console-source",
  "customer-distributed-console-sink"
]

查看文件,可以看到成功从 Kafka Topic 中将数据导出到文件:

代码语言:javascript
复制
[root@kafka1 ~]# cat /tmp/customer_distributed_sink.txt 
kubernetes
netty
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-03-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Se7en的架构笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Kafka 连接器介绍
  • Kafka 连接器特性
  • Kafka 连接器核心概念
  • 使用 Kafka 连接器
    • 单机模式
      • 单机模式配置文件
      • 将数据从文件导入 Kafka Topic 中
      • 将 Kafka Topic 中的数据导出到文件
    • 分布式模式
      • 创建连接器相关主题
      • 分布式模式配置文件
      • 连接器 API 接口
      • 将数据从文件导入到 Kafka Topic 中
      • 将 Kafka Topic 中的数据导出到文件
  • 开发 Kafka 连接器插件
    • 编写 Source 连接器
      • 编写输入连接器实例
      • 编写输入连接器任务类
    • 编写 Sink 连接器
      • 编写输出连接器实例
      • 编写输出连接器任务类
    • 打包与部署
      • 将数据从文件导入 Kafka Topic 中
        • 将 Kafka Topic 中的数据导出到文件
        相关产品与服务
        数据库
        云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档