Kafka除了生产者和消费者的核心组件外,它的另外一个核心组件就是连接器,简单的可以把连接器理解为是Kafka系统与其他系统之间实现数据传输的通道。通过Kafka的连接器,可以把大量的数据移入到Kafka的系统,也可以把数据从Kafka的系统移出。具体如下显示:
依据如上,这样Kafka的连接器就完成了输入和输出的数据传输的管道。也就很好的理解了我们从第三方获取到海量的实时流的数据,通过生产者和消费者的模式写入到Kafka的系统,再经过连接器把数据最终存储到目标的可存储的数据库,比如Hbase等。基于如上,Kafka的连接器使用场景具体可以总结为:
1、Kafka作为一个连接的管道,把目标的数据写入到Kafka的系统,再通过Kafka的连接器把数据移出到目标的数据库
2、Kafka作为数据传输的中间介质,仅仅是数据传输的一个管道或者说是数据传输的中间介质而已。如日志文件的信息传输到Kafka的系统后,然后再从Kafka的系统把这些数据移出到ElasticSearch中进行存储并展示。通过Kafka的连接器,可以有效的把Kafka系统的生产者模式和消费者模式进行的整合,完成它的解耦。
启动Kafka系统的连接器可以通过两种方式来进行启动,一种方式是单机模式,另外一种的方式是分布式模式,这里主要是以单机模式来启动Kafka的连接器。在kafka/config的目录下配置连接器的信息,它的配置文件名称为:connect-file-source.properties,配置的内容为:
#设置连接器名称
name=local-file-source
#指定连接器类
connector.class=FileStreamSource
#设置最大任务数
tasks.max=1
#指定读取的文件
file=/tmp/source.txt
#指定主题名
topic=login
我们在该配置文件中制定了读取的文件/tmp/source.txt,下面我们在这个目录下在这个文件里面添加内容,具体内容如下:
下面通过单机的模式来启动连接器的程序,启动命令为:
./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-source.properties
启动成功后输出的信息,如下所示:
#!/usr/bin/env python
#!coding:utf-8
[2021-06-07 20:43:12,715] INFO WorkerSourceTask{id=local-file-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:495)
[2021-06-07 20:43:12,727] INFO WorkerSourceTask{id=local-file-source-0} Finished commitOffsets successfully in 12 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:574)
Kafka系统的连接器进程是以后台服务的形式在执行,它的默认端口是8083,我们可以通过REST API的方式来获取到相关的信息,比如获取到活跃连接器的实例列表,它的接口信息为:GET /connectors,具体如下:
下来再来演示下指定连接器的实例信息,返回它的详细信息,具体PostMan输出的信息如下:
下来我们再来看获取连接器的配置信息,也就是config的信息,具体PostMan的请求截图如下:
连接器提供了很多的REST API的接口,这里就不一一的演示。
下面来实现把数据导入到Kafka的login主题中,被导入的数据为
执行命令:
./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-source.properties
执行成功后,在Kafka的消费者程序里面就可以看到数据被导入到login的主题中,如下所示:
下面实现在单机的模式下,把Kafka主题中的数据导出到本地的具体文件中,在config的配置文件connect-file-sink.properties中指定被导出的数据写入到本地的具体文件中,具体文件内容如下:
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/target.txttopics=login
它的执行顺序具体为:
执行命令先把目标数据消费到Kafka的主题中,命令为:
./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-source.properties
然后在/tmp下创建文件target.txt。下来执行如下命令把Kafka的数据导出到本地,命令为:
./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-sink.properties
控制台打印的log信息为:
[2021-06-08 15:37:11,766] INFO WorkerSinkTask{id=local-file-sink-0} Committing offsets asynchronously using sequence number 1: {data-1=OffsetAndMetadata{offset=5, leaderEpoch=null, metadata=''}, data-0=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}, data-3=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}, data-2=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}, data-5=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}, data-4=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:346)
最后在/tmp下查看target.txt文件的内容,发现和最初被消费的目标文件内容是一致的。
根据如上,通过连接器把目标数据消费到Kafka系统的主题中,最后再通过连接器导出到本地的目标存储数据的地方(可能是数据库,也可能是文本)。这样就实现了最初说的连接数据管道的目的之一。