Confluent 入门


软件准备

  • Confluent安装包 下载地址:https://www.confluent.io/download/ Confluent有两个类型可以下载,企业版(Enterprise)需要付费,可以免费使用30天,我这里使用的是开源版(Open Source)版,版本号是4.1.1

1. Confluent 介绍

(1) Confluent 是什么?

Confluent平台是一个可靠的,高性能的流处理平台,你可以通过这个平台组织和管理各式各样的数据源中的数据。

image.png

(2) Confluent 中有什么?

  • Confluent开源版
    • Confluent Kafka Connectors
      • Kafka Connect JDBC Connector
      • Kafka Connect HDFS Connector
      • Kafka Connect Elasticsearch Connector
      • Kafka Connect S3 Connector
    • Confluent Kafka Clients
      • C/C++ Client Library
      • Python Client Library
      • Go Client Library
      • .Net Client Library
    • Confluent Schema Registry
    • Confluent Kafka REST Proxy
  • Confluent 企业版中增加的功能
    • Automatic Data Balancing
    • Multi-Datacenter Replication
    • Confluent Control Center
    • JMS Client

2. Confluent 开源版安装

(1) 解压安装包,可以看到以下目录:

[root@confluent confluent-4.1.1]# ll
total 24
drwxr-xr-x  3 1000 1000 4096 May 12 08:01 bin
drwxr-xr-x 14 1000 1000 4096 May 12 07:05 etc
drwxr-xr-x  3 1000 1000 4096 May 12 06:47 lib
-rw-r--r--  1 1000 1000  871 May 12 08:02 README
drwxr-xr-x  6 1000 1000 4096 May 12 07:05 share
drwxr-xr-x  2 1000 1000 4096 May 12 08:02 src

(2) 启动confluent

[root@confluent confluent-4.1.1]# bin/confluent start
Using CONFLUENT_CURRENT: /tmp/confluent.I5Y1nzpT
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]

confluent start 会启动 confluent 全部组件,如果想要单独启动,比如单独启动 schema-registry,可以执行以下命令:

schema-registry-start

具体的单独启动各组件的命令,进入 bin 目录下,一看就能明白,不再赘述。

3. 简单使用

(1) 创建 topic

[root@confluent confluent-4.1.1]# bin/kafka-topics \
> --create \
> --zookeeper localhost:2181 \
> --replication-factor 1 \
> --partitions 1 \
> --topic confluent-test-001
Created topic "confluent-test-001".

说明: confluent 中内嵌了 Kafka 和 Zookeeper,你也可以通过指定不同的 zookeeper 在其他的 kafka 集群中创建 topic 或执行其他操作。

(2) 生产数据

[root@confluent confluent-4.1.1]# bin/ksql-datagen \
> quickstart=users \
> format=json \
> topic=confluent-test-001 \
> maxInterval=1000
[2018-06-22 14:53:19,170] INFO AvroDataConfig values: 
    schemas.cache.config = 1
    enhanced.avro.schema.support = false
    connect.meta.data = true
 (io.confluent.connect.avro.AvroDataConfig:179)
User_5 --> ([ 1513083004885 | 'User_5' | 'Region_9' | 'OTHER' ])
User_8 --> ([ 1508770926089 | 'User_8' | 'Region_3' | 'OTHER' ])
User_9 --> ([ 1504006562725 | 'User_9' | 'Region_5' | 'FEMALE' ])
User_8 --> ([ 1490524175099 | 'User_8' | 'Region_2' | 'OTHER' ])
User_8 --> ([ 1489424770134 | 'User_8' | 'Region_8' | 'MALE' ])
User_1 --> ([ 1516449943408 | 'User_1' | 'Region_4' | 'OTHER' ])
......

以上命令是内嵌的一个kafka-producer脚本,生成随机的用户信息,可以通过 quickstart=[CLICKSTREAM_CODES, CLICKSTREAM, CLICKSTREAM_USERS, ORDERS, RATINGS, USERS, USERS_, PAGEVIEWS] 来生成不同的数据,这个脚本会运行很长时间(官网只说了很长时间,到底多长,没说),除非你手动停止

(3) 使用 KSQL 查询生产的数据

在另一个窗口中,进入KSQL命令行(上一个窗口继续发数据不要停)

[root@confluent confluent-4.1.1]# bin/ksql
                  
                  ===========================================
                  =        _  __ _____  ____  _             =
                  =       | |/ // ____|/ __ \| |            =
                  =       | ' /| (___ | |  | | |            =
                  =       |  <  \___ \| |  | | |            =
                  =       | . \ ____) | |__| | |____        =
                  =       |_|\_\_____/ \___\_\______|       =
                  =                                         =
                  =  Streaming SQL Engine for Apache Kafka® =
                  ===========================================

Copyright 2017 Confluent Inc.

CLI v4.1.1, Server v4.1.1 located at http://localhost:8088

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql> 

把生产过来的数据创建为user表:

ksql> CREATE TABLE users (registertime BIGINT, gender VARCHAR, regionid VARCHAR, \
> userid VARCHAR, \interests array<VARCHAR>, contact_info map<VARCHAR, VARCHAR>) \
> WITH (KAFKA_TOPIC='confluent-test-001', VALUE_FORMAT='JSON', KEY = 'userid');

 Message       
---------------
 Table created 
---------------

设置消费偏移量为 "earliest":

ksql> SET 'auto.offset.reset'='earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'

查询:

ksql> select * from users;
1529651156298 | User_7 | 1497590434653 | OTHER | Region_7 | User_7 | null | null
1529651158082 | User_9 | 1508375625042 | OTHER | Region_1 | User_9 | null | null
1529651160496 | User_5 | 1501045879443 | MALE | Region_6 | User_5 | null | null
1529651161870 | User_6 | 1514541057484 | FEMALE | Region_5 | User_6 | null | null
1529651162248 | User_3 | 1498247501220 | MALE | Region_1 | User_3 | null | null
1529651162727 | User_1 | 1495368101769 | FEMALE | Region_3 | User_1 | null | null
1529651164048 | User_4 | 1508110530233 | MALE | Region_6 | User_4 | null | null
.....
# 只要生产数据的程序没有停止,这里会一直打印查询结果

4. 关闭服务

[root@confluent confluent-4.1.1]# bin/confluent stop
Using CONFLUENT_CURRENT: /tmp/confluent.I5Y1nzpT
Stopping ksql-server
ksql-server is [DOWN]
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Keegan小钢

App项目实战之路(五):服务端篇

近一个月因为忙于其他事情,一直没能抽出时间来更新项目进度。现在,只能趁着国庆期间,赶紧抽空更新下进度。这次,我想简单谈谈服务端的一些东西。

2066
来自专栏杨建荣的学习笔记

MySQL中使用undrop来恢复drop的表(上)

MySQL中可以使用编程语言(比如Python)来解析binlog中DML的逆操作来达到闪回的效果,如果数据不多,手工解析也可以。这也是现在大家碰到的很多DML...

3945
来自专栏Netkiller

Linux 系统与数据库安全

目录 1. 帐号安全 1.1. Shell 安全 1.2. .history 文件 2. 临时文件安全 3. 其他安全问题 4. 防火墙配置 5. 数据库安全 ...

3695
来自专栏雨过天晴

原 在已安装Apache和MySQL的Ub

1443
来自专栏散尽浮华

[原创]Gerrit中文乱码问题解决方案分享

应开发同事的要求,部署了Gitlab+Gerrit+Jenkins的持续集成环境. 但是发现了一个问题,Gerrit登陆后有中文乱码出现. 具体情况如下: (1...

3135
来自专栏运维前线

CentOS6 安装代码检测工具sonarsource

CentOS6 安装sonarsource sonarqube下载地址: https://sonarsource.bintray.com/Distributio...

2599
来自专栏杨建荣的学习笔记

配置dg broker的问题分析及修复(r6笔记第84天)

最近从同事那儿接手了一套新环境,备库因为服务器问题已经下架,重新配了一台服务器,所以需要搭一套备库,主库已经配置好了,而且同事已经把在主库把dg broker配...

3687
来自专栏乐沙弥的世界

MySQL [ERROR] Table 'mysql.user' doesn't exist

    一次源码新装的mysql,由于没有复制my-default.cnf到/etc/my.cnf位置,在启动mysql的时候碰到了无法打开mysql.user...

1673
来自专栏张戈的专栏

MySQL在线DDL修改表结构的简单经验分享

摘 要 在线DDL修改生产环境的大表一直是运维、DBA一个很头痛的问题,本文分享一些相关经验,希望对还在头痛的同学能有所帮助,当然更希望路过的大神,如果有更靠...

3467
来自专栏FreeBuf

新手指南:DVWA-1.9全级别教程之SQL Injection

目前,最新的DVWA已经更新到1.9版本(点击原文查看链接),而网上的教程大多停留在旧版本,且没有针对DVWA high级别的教程,因此萌发了一个撰写新手教程的...

4638

扫码关注云+社区

领取腾讯云代金券