Docker下的Kafka学习之三:集群环境下的java开发

在上一章《Docker下的Kafka学习之二:搭建集群环境》中我们学会了搭建kafka集群环境,今天我们来实战集群环境下的用java发送和消费kafka的消息;

环境规划

本次实战我们要搭建的环境略有一些复杂,整体环境如下图:

如上图所示,从浏览器发起一个请求会经历以下历程: 1. 请求到nginx后,由nginx转发到tomcat,nginx后面接了两个tomcat,容器名分别是producer1和producer2,部署的都是kafkaclusterproducerdemo这个应用的war包; 2. producer1和producer2收到消息后,向broker投递消息; 3. consumer1、consumer2、consumer3是三个tomcat,上面都部署了kafkaclusterconsumerdemo这个应用,连接了kafka的partition,收到消息后消费这些消息,这三个consumer属于同一个group,共同消息主题”test002”的消息;

整个环境的ip和功能说明列表如下:

容器name

镜像

IP

功能

端口映射

端口功能

broker1

bolingcavalry/ssh-kafka292081-zk346:0.0.1

172.18.0.2

一号broker

19011:22

ssh

broker2

bolingcavalry/ssh-kafka292081-zk346:0.0.1

172.18.0.3

二号broker

19012:22

ssh

broker3

bolingcavalry/ssh-kafka292081-zk346:0.0.1

172.18.0.4

三号broker

19013:22

ssh

producer1

bolingcavalry/online_deploy_tomcat:0.0.1

172.18.0.5

一号消息制造者

19014:8080

页面访问和在线web部署

producer2

bolingcavalry/online_deploy_tomcat:0.0.1

172.18.0.6

二号消息制造者

19015:8080

页面访问和在线web部署

nginx

daocloud.io/library/nginx:latest

172.18.0.7

入口

19016:80

页面访问

consumer1

bolingcavalry/online_deploy_tomcat:0.0.1

172.18.0.8

一号消息消费者

19017:8080

页面访问和在线web部署

consumer2

bolingcavalry/online_deploy_tomcat:0.0.1

172.18.0.9

二号消息消费者

19018:8080

页面访问和在线web部署

consumer3

bolingcavalry/online_deploy_tomcat:0.0.1

172.18.0.10

三号消息消费者

19019:8080

页面访问和在线web部署

docker-compose.yml配置

这么多容器,还是用docker-compose来配置和管理比较方便,配置好的docker-compose.yml文件如下所示:

version: '2'
services:
  broker1: 
    image: bolingcavalry/ssh-kafka292081-zk346:0.0.1
    container_name: broker1
    ports:
      - "19011:22"
    restart: always
  broker2: 
    image: bolingcavalry/ssh-kafka292081-zk346:0.0.1
    container_name: broker2
    depends_on:
      - broker1
    ports:
      - "19012:22"
    restart: always  
  broker3: 
    image: bolingcavalry/ssh-kafka292081-zk346:0.0.1
    container_name: broker3
    depends_on:
      - broker2
    ports:
      - "19013:22"
    restart: always
  producer1: 
    image: bolingcavalry/online_deploy_tomcat:0.0.1
    container_name: producer1
    depends_on:
      - broker3
    links: 
      - broker1:hostb1
      - broker2:hostb2
      - broker3:hostb3    
    ports:
      - "19014:8080"
    environment:
      TOMCAT_SERVER_ID: producer1  
    restart: always
  producer2: 
    image: bolingcavalry/online_deploy_tomcat:0.0.1
    container_name: producer2
    depends_on:
      - producer1
    links: 
      - broker1:hostb1
      - broker2:hostb2
      - broker3:hostb3   
    ports:
      - "19015:8080"
    environment:
      TOMCAT_SERVER_ID: producer2    
    restart: always
  nginx: 
    image: daocloud.io/library/nginx:latest
    container_name: nginx
    depends_on:
      - producer2
    links: 
      - producer1:t01
      - producer2:t02
    ports:
      - "19016:80"
    restart: always
  consumer1: 
    image: bolingcavalry/online_deploy_tomcat:0.0.1
    container_name: consumer1
    depends_on:
      - nginx
    links: 
      - broker1:hostb1
      - broker2:hostb2
      - broker3:hostb3
    ports:
      - "19017:8080"
    environment:
      TOMCAT_SERVER_ID: consumer1  
    restart: always
  consumer2: 
    image: bolingcavalry/online_deploy_tomcat:0.0.1
    container_name: consumer2
    depends_on:
      - consumer1
    ports:
      - "19018:8080"
    links: 
      - broker1:hostb1
      - broker2:hostb2
      - broker3:hostb3
    environment:
      TOMCAT_SERVER_ID: consumer2   
    restart: always
  consumer3: 
    image: bolingcavalry/online_deploy_tomcat:0.0.1
    container_name: consumer3
    depends_on:
      - consumer2
    ports:
      - "19019:8080"
    links: 
      - broker1:hostb1
      - broker2:hostb2
      - broker3:hostb3
    environment:
      TOMCAT_SERVER_ID: consumer3 
    restart: always    

如上所示,broker1、broker2、broker3是用来搭建kafka集群环境的,作为生产消息的tomcat有两个,消费消息的tomcat有三个,再加上一个nginx,就是全部的容器了;

启动所有容器

在docker-compose.yml所在目录下执行以下命令即可启动所有容器:

docker-compose up -d

启动的过程下图所示:

搭建kafka集群环境

详细的搭建步骤,请看《Docker下的Kafka学习之二:搭建集群环境》,所有操作都在broker1、broker2、broker3这三个容器上进行;

创建topic

在容器broker1上执行以下命令,就会在kafka上创建一个主题,有6个patition:

/usr/local/work/kafka_2.9.2-0.8.1/bin/kafka-topics.sh --create --zookeeper broker1:2181,broker2:2181,broker3:2181 --replication-factor 1 --partitions 6 --topic test002

查看broker1的/tmp/kafka-logs/目录,就能看到partition目录,如下图:

如上图,broker1负责的是partition2和partition5; 去broker2、borker3检查后,得到每个broker和partition的关系如下:

容器

partition

broker1

partition2、partition5

broker2

partition0、partition3

broker3

partition1、partition4

配置nginx

遗憾的是nginx容器里面没有安装vim工具,需要先执行apt-get update,再执行apt-get install -y vim,将vim装好,再去修改/etc/nginx/nginx.conf文件,修改后的内容如下:

user  nginx;
worker_processes  1;

error_log  /var/log/nginx/error.log warn;
pid        /var/run/nginx.pid;


events {
    worker_connections  1024;
}


http {
    include       /etc/nginx/mime.types;
    default_type  application/octet-stream;

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';

    access_log  /var/log/nginx/access.log  main;

    sendfile        on;
    #tcp_nopush     on;

    keepalive_timeout  65;

    #gzip  on;

    #include /etc/nginx/conf.d/*.conf;
upstream tomcat_client {
         server t01:8080 weight=1;
         server t02:8080 weight=1;
    }

    server {
        server_name "";
        listen 80 default_server;
        listen [::]:80 default_server ipv6only=on;

        location / {
            proxy_pass http://tomcat_client;
            proxy_redirect default;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
        }
    }
}

和原来的内容相比,主要的修改点就是注释掉“include /etc/nginx/conf.d/*.conf”,再增加upstream和server的配置; 修改完成后,执行/usr/sbin/nginx -s reload命令让nginx加载修改后的配置; 在当前电脑的浏览器输入“http://localhost:19016/examples/servlets/servlet/HelloWorldExample”试试,可以看到请求已经被nginx转发到tomcat上去了,如下图:

关于tomcat的在线部署

本次实战要开发两个java的web应用,然后将构建的war在线部署到tomcat上去,关于在线部署的详情请参照《实战docker,编写Dockerfile定制tomcat镜像,实现web应用在线部署》,本次开发的两个java应用的pom.xml中已经配置好了在线部署的插件和参数,读者们只需要将本地maven配置好部署所需的用户名和密码即可;

源码下载

本次开发的两个java工程的源码都可以在github下载,地址是:git@github.com:zq2599/blog_demos.git,这里面有多个工程,本次用到的两个工程如下图红框所示:

  • kafkaclusterproducerdemo是生产kafka消息的工程;
  • kafkaclusterconsumerdemo是消费kafka消息的工程;

接下来开始编码了;

消息生产的工程kafkaclusterproducerdemo

  1. 用maven创建一个web工程kafkaclusterproducerdemo;
  2. pom.xml中,除了spring,日志相关的依赖,还要加入下面这些:
<!--fastjson-->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.39</version>
    </dependency>

    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.5</version>
    </dependency>

    <!--kafka-->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.9.2</artifactId>
      <version>0.8.1</version>
    </dependency>

如上所示,除了kafka的依赖,fastjson和commons-lang3也会用到; 3. 自定义partition规则,创建一个实现了Partitioner接口的类BusinessPartition,可以通过key的值来决定将消息投递到那个partition,这里的做法是直接用key的值来代表partition,源码如下:

public class BusinessPartition implements Partitioner {

    /**
     * 构造函数的函数体没有东西,但是不能没有构造函数
     * @param props
     */
    public BusinessPartition(VerifiableProperties props) {
        super();
    }

    public int partition(Object o, int i) {
        int partitionValue = 0;

        if(o instanceof String && StringUtils.isNoneBlank((String)o)){
            partitionValue = Integer.valueOf((String)o);
        }

        return partitionValue;
    }
}

注意:带VerifiableProperties的构造方法一定要写! 4. 消息服务初始化: 初始化工作是放在一个Bean的init方法中进行的,如下:

@PostConstruct
    public void init(){
        try {
            Properties props = new Properties();
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("zk.connect", "hostb1:2181,hostb1:2181,hostb1:2181");
            props.put("metadata.broker.list", "hostb1:9092,hostb1:9092,hostb1:9092");
            props.put("partitioner.class","com.bolingcavalry.service.BusinessPartition");
            producer = new kafka.javaapi.producer.Producer<String, String>(new ProducerConfig(props));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

hostb1、hostb2、hostb3都是docker-compose.yml配置的link参数; 值得注意的是”partitioner.class”这个参数的值,是我们刚刚创建的BusinessPartition这个类,这样kafka就知道用哪个自定义类来处理partition的逻辑了; 5. 发送消息: 发送消息的方法有两个,第二个接受外部传来的key,用来确定当前消息发往哪个partition:

public void sendSimpleMsg(String topic, String message) {
        //producer的内部实现中,已经考虑了线程安全,所以此处不用加锁了
        producer.send(new KeyedMessage<String, String>(topic, message));
    }

    public void sendKeyMsg(String topic, String key, String message) {
        //producer的内部实现中,已经考虑了线程安全,所以此处不用加锁了
        producer.send(new KeyedMessage<String, String>(topic, key, message));
    }
  1. 接受浏览器请求的MessageProduceController: MessageProduceController的关键代码如下:
@RequestMapping("/keymessage")
    @ResponseBody
    public String keymessage(HttpServletRequest request, Model model) {
        String topic = get(request, "topic");
        String content = get(request, "content");
        String keyStr = get(request, "key");

        SimpleMessage simpleMessage = new SimpleMessage();
        simpleMessage.setContent(content);
        simpleMessage.setFrom(TOMCAT_ID);

        String message = JSON.toJSONString(simpleMessage);

        logger.info("start simple, topic [{}], key [{}], message [{}]", topic, keyStr, message);
        messageService.sendKeyMsg(topic, keyStr, message);
        logger.info("end simple, topic [{}], key [{}], message [{}]", topic, keyStr, message);

        return String.format("success [%s], topic [%s], key [%s], content [%s]", tag(), topic, keyStr, content);
    }

如上所示,收到web请求时,会构造一个SimpleMessage对象,里面有消息内容和当前tomcat的标识(因为有两个tomcat,通过此属性可以区分是哪个发的消息),再将这个对象转成字符串,然后请求kafka发送该消息,key也是从请求参数中取得的; 7. 部署到tomcat上去: 按照我们之前的规划,kafkaclusterproducerdemo要部署到producer1和producer2这两个容器上,producer1和producer2的8080端口分别映射到了当前电脑的19014和19015端口,所以我们在pom.xml中,tomcat7-maven-plugin插件的url参数中端口改为19014和19015分别部署一次,就能将war在线部署到两个tomcat上去了,如下图:

消息消费的工程kafkaclusterconsumerdemo

  1. 用maven创建一个web工程kafkaclusterconsumerdemo;
  2. pom.xml的依赖和kafkaclusterproducerdemo工程大体上是一样的,只是zookeeper的依赖这里要格外注意,不能用kafka的间接依赖,而是要自己控制,并指定版本:
<dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.4.6</version>
    </dependency>

    <!--kafka-->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.9.2</artifactId>
      <version>0.8.1</version>
      <exclusions>
        <exclusion>
          <artifactId>org.apache.zookeeper</artifactId>
          <groupId>zookeeper</groupId>
        </exclusion>
      </exclusions>
    </dependency>

如上所示,zookeeper的版本是3.4.6,而kafka中对zookeeper的间接依赖已经被排除了; 3. 对kafka消息的消费,已经封装在一个Bean中,初始化逻辑如下:

private static final String GROUP_ID = "testgroup001";
    private static final String ZK = "hostb1:2181,hostb2:2181,hostb3:2181";
    private static final String TOPIC = "test002";
    private static final int THREAD_NUM = 2;


    @PostConstruct
    public void init(){
        logger.info("start init kafka consumer service");
        // 1. 创建Kafka连接器
        consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(ZK, GROUP_ID));

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, THREAD_NUM);

        // 2. 指定数据的解码器
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        // 3. 获取连接数据的迭代器对象集合
        /**
         * Key: Topic主题
         * Value: 对应Topic的数据流读取器,大小是topicCountMap中指定的topic大小
         */
        Map<String, List<KafkaStream<String, String>>> consumerMap = this.consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

        // 4. 从返回结果中获取对应topic的数据流处理器
        List<KafkaStream<String, String>> streams = consumerMap.get(TOPIC);

        logger.info("streams size {}", streams.size());

        // 5. 创建线程池
        this.executorPool = new ThreadPoolExecutor(THREAD_NUM, THREAD_NUM,
                0,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                new CustomThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

        // 6. 构建数据输出对象
        int threadNumber = 0;
        for (final KafkaStream<String, String> stream : streams) {
            this.executorPool.submit(new Processer(stream, threadNumber));
            threadNumber++;
        }

        logger.info("end init kafka consumer service");
    }

如上所示,我们用的是High level的方式,很多细节的东西就不用关心了,例如commit,offset等; 由于本次实战的消息打算分成6个partition,一共有三个消费tomcat,所以每个tomcat上启动两个消费者线程来处理消息,这样就保证了每个partition都有一个单独的线程来处理; 4. 消息处理的业务类Processer实现了Runnable,关键代码如下:

public void run() {
        // 1. 获取数据迭代器
        ConsumerIterator<String, String> iter = this.stream.iterator();

        logger.info("server [{}] start run", TOMCAT_ID);

        // 2. 迭代输出数据
        while (iter.hasNext()) {
            // 2.1 获取数据值
            MessageAndMetadata value = iter.next();

            // 2.2 输出
            logger.info("server [{}], threadNumber [{}], offset [{}], key [{}], message[{}]",
                    TOMCAT_ID,
                    threadNumber,
                    value.offset(),
                    value.key(),
                    value.message());
        }
        // 3. 表示当前线程执行完成
        logger.info("Shutdown Thread:" + this.threadNumber);
    }

如上,每当从kafka中取得了消息,就通过日志打印出来; 7. 部署到tomcat上去: 按照我们之前的规划,kafkaclusterconsumerdemo要部署到consumer1、consumer2、consumer3这三个容器上,它们的8080端口分别映射到了当前电脑的19017、19018、19019端口上,所以我们在pom.xml中,tomcat7-maven-plugin插件的url参数中端口改为这三个端口分别部署一次,就能将war在线部署到三个tomcat上去了;

打印实时日志

推荐同时打开五个控制台,分别登上producer1、producer2、consumer1、consumer2、consumer3这五个容器,查看日志时分别做如下操作: 1. 对producer1和producer2,执行:

tail -f /usr/local/tomcat/logs/kafkaclusterproducerdemo/output.2017-10-29.log

output.xxxxxx.log要用当天的日期; 2. 对consumer1、consumer2、consumer3,执行:

tail -f /usr/local/tomcat/logs/kafkaclusterconsumerdemo/output.2017-10-29.log

这样就能将日志实时打印出来了;

检查kafkaclusterconsumerdemo是否启动成功

实战中经常出现consumer1、consumer2、consumer3等容器在部署了war包后启动失败的情况,请浏览器输入http://localhost:19017/kafkaclusterconsumerdemo来检查consumer1是否启动成功,启动成功后的效果如下:

如果启动失败页面就404错误了,检查日志发现是连接zookeeper失败,如下图:

这时候建议多部署几次,就能连接成功了,启动成功的日志如下图所示:

实战消息发送和接收

用浏览器分别访问以下六个地址: 1. http://localhost:19016/kafkaclusterproducerdemo/keymessage?topic=test002&content=message001&key=1 2. http://localhost:19016/kafkaclusterproducerdemo/keymessage?topic=test002&content=message002&key=2 3. http://localhost:19016/kafkaclusterproducerdemo/keymessage?topic=test002&content=message003&key=3 4. http://localhost:19016/kafkaclusterproducerdemo/keymessage?topic=test002&content=message004&key=4 5. http://localhost:19016/kafkaclusterproducerdemo/keymessage?topic=test002&content=message005&key=5 6. http://localhost:19016/kafkaclusterproducerdemo/keymessage?topic=test002&content=message006&key=6

上面六个地址代表发送了六个消息,例如第一个消息的key是1,消息内容是message001,在consumer3的日志中我们看到了这个消息,如下图所示:

从from字段我们还能发现这个消息是从producer1发出的;

从每个consumer日志中的key可以将consumer1、consumer2、consumer3和partition的关系梳理如下表:

容器

partition

consumer1

partition4、partition5

consumer2

partition0、partition1

consumer3

partition2、partition3

至此,kafka集群环境下的java开发实战就全部结束了,和之前的入门实战相比稍微复杂了一些,但也更接近实际生产环境的操作了,希望能对读者您的学习和开发有所帮助;

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏七夜安全博客

后门编程(1)之双管道主动连接型

974
来自专栏跟着阿笨一起玩NET

Log4Net日志记录两种方式

     log4net库是Apache log4j框架在Microsoft .NET平台的实现,是一个帮助程序员将日志信息输出到各种目标(控制台、文件、数据库...

632
来自专栏张善友的专栏

[腾讯社区开放平台].NET SDK基于New BSD协议开源

“QQ登录”使用户能使用QQ账号一键登录接入网站,大大降低了注册、登录的门槛。借助庞大的QQ用户群,给第三方网站带来更多新用户。已登录用户还可以将在第三方网站发...

1929
来自专栏编程

当你在 Linux 上启动一个进程时会发生什么?

英文:Julia Evans,编译:Linux中国 / jessie-pang linux.cn/article-9256-1.html 本文是关于 fork ...

1787
来自专栏技术/开源

开源API集成测试工具 Hitchhiker v0.2更新 - 压力测试

Hitchhiker 是一款开源的 Restful Api 集成测试工具,支持Schedule, 数据对比,压力测试,可以轻松部署到本地,和你的team成员一起...

17810
来自专栏疯狂的小程序

轻松理解小程序 session的实现

小程序版 websocket 聊天室。 从服务器到小程序客户端配置基础教程。

4069
来自专栏崔庆才的专栏

分布式爬虫原理之Scrapy分布式实现

2726
来自专栏Python小屋

Python扩展库psutil用法精要

0、安装与导入psutil pip install psutil import psutil 1、查看CPU信息 >>> psutil.cpu_count() ...

2804
来自专栏开发与安全

linux系统编程之进程(五):终端、作业控制与守护进程

一、终端的概念 在UNIX系统中,用户通过终端登录系统后得到一个Shell进程,这个终端成为Shell进程的控制终端(Controlling Terminal)...

1959
来自专栏企鹅号快讯

基于 Swoole 的微信扫码登录

随着微信的普及,扫码登录方式越来越被现在的应用所使用。它因为不用去记住密码,只要有微信号即可方便快捷登录。微信的开放平台原生就有支持扫码登录的功能,不过大部分人...

2379

扫码关注云+社区