前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >加米谷:Kafka Connect如何运行管理

加米谷:Kafka Connect如何运行管理

原创
作者头像
加米谷大数据
发布2018-03-30 16:24:37
1.7K0
发布2018-03-30 16:24:37
举报
文章被收录于专栏:加米谷大数据加米谷大数据

上节讲述了Kafka OffsetMonitor:监控消费者和延迟的队列,本节更详细的介绍如何配置,运行和管理Kafka Connect,有兴趣的请关注我们的公众号。

运行Kafka Connect

Kafka Connect目前支持两种执行模式:

独立(单进程)和分布式

在独立模式下,所有的工作都在一个单进程中进行的。这样易于配置,在一些情况下,只有一个在工作是好的(例如,收集日志文件),但它不会从kafka Connection的功能受益,如容错。通过下面的命令开始一个单进程的例子:

bin/connect-standalone.sh config/connect-standalone.propertiesconnector1.properties [connector2.properties ...] 第一个参数是worker(工人)的配置,这包括 Kafka连接的参数设置,序列化格式,以及频繁地提交offset(偏移量)。本节提供的例子用的是默认的配置conf/server.properties。其余的参数是connector(连接器)配置文件。你可以配置你需要的,但是所有的执行都在同一个进程(在不同的线程)。分布式的模式会自动平衡。允许你动态的扩展(或缩减),并在执行任务期间和配置、偏移量提交中提供容错保障,非常类似于独立模式: bin/connect-distributed.sh config/connect-distributed.properties

在不同的类中,配置参数定义了Kafka Connect如何处理,哪里存储配置,如何分配work,哪里存储offset和任务状态。 在分布式模式中,Kafka Connect在topic中存储offset,配置和任务状态。建议手动创建offset的topic,可以自己来定义需要的分区数和副本数。如果启动Kafka Connect时还没有创建topic,那么topic将自动创建(使用默认的分区和副本),这可能不是最合适的(因为kafka可不知道业务需要,只能根据默认参数创建)。特别是以下配置参数尤为关键,

启动集群之前设置: group.id (默认connect-cluster) - Connect cluster group使用唯一的名称;注意这不能和consumer group ID(消费者组)冲突。 config.storage.topic (默认connect-configs) - topic用于存储connector和任务配置;注意,这应该是一个单个的partition,多副本的topic。你需要手动创建这个topic,以确保是单个partition(自动创建的可能会有多个partition)。 offset.storage.topic (默认 connect-offsets) - topic用于存储offsets;这个topic应该配置多个 partition和副本。 status.storage.topic (默认 connect-status) - topic 用于存储状态;这个topic 可以有多个 partitions和副本 注意,在分布式模式中,connector(连接器)配置不能使用命令行。要使用下面介绍的REST API来创建,修改和销毁connector。

配置连接器(connector)

Connector的配置是简单的key-value映射。对于独立模式,这些都是在属性文件中定义,并通过在命令行上的Connect处理。在分布式模式,JSON负载connector的创建(或修改)请求。大多数配置都是依赖的connector,有几个常见的选项: name - 连接器唯一的名称,不能重复。 connector.calss - 连接器的Java类。 tasks.max - 连接器创建任务的最大数。 connector.class配置支持多种格式:全名或连接器类的别名。比如连接器是org.apache.kafka.connect.file.FileStreamSinkConnector,你可以指定全名,也可以使 用FileStreamSink或FileStreamSinkConnector。Sink connector也有一个额外的选项来控制它们的输入: topics - 作为连接器的输入的topic列表。 对于其他的选项,你可以查看连接器的文档。

REST API 由于Kafka Connect的目的是作为一个服务运行,提供了一个用于管理connector的REST API。默认情况下,此服务的端 口是8083。以下是当前支持的终端入口: GET /connectors - 返回活跃的connector列表 POST /connectors - 创建一个新的connector;请求的主体是一个包含字符串name字段和对象config字段 (connector的配置参数)的JSON对象。 GET /connectors/{name} - 获取指定connector的信息 GET /connectors/{name}/config - 获取指定connector的配置参数 PUT /connectors/{name}/config - 更新指定connector的配置参数 GET /connectors/{name}/status - 获取connector的当前状态,包括它是否正在运行,失败,暂停 等。 GET /connectors/{name}/tasks - 获取当前正在运行的connector的任务列表。 GET /connectors/{name}/tasks/{taskid}/status - 获取任务的当前状态,包括是否是运行中的, 失败的,暂停的等, PUT /connectors/{name}/pause - 暂停连接器和它的任务,停止消息处理,直到connector恢复。 PUT /connectors/{name}/resume - 恢复暂停的connector(如果connector没有暂停,则什么都不做) POST /connectors/{name}/restart - 重启connector(connector已故障) POST /connectors/{name}/tasks/{taskId}/restart - 重启单个任务 (通常这个任务已失败) DELETE /connectors/{name} - 删除connector, 停止所有的任务并删除其配置

Kafka Connector还提供了获取有关connector plugins信息的REST API: GET /connector-plugins- 返回已在Kafka Connect集群安装的connector plugin列表。请注意,API仅验证处理请求的worker的connector。这以为着你可能看不不一致的结果,特别是在滚动升级的时候(添加新的connector jar) PUT /connector-plugins/{connector-type}/config/validate - 对提供的配置值进行验证,执行对每个配置验证,返回验证的建议值和错误信息。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档