Docker下kafka学习,三部曲之三:java开发

在前两章《Docker下kafka学习,三部曲之一:极速体验kafka》《Docker下kafka学习,三部曲之二:本地环境搭建》中,我们通过命令行体验了kafka的消息发布订阅服务,本章我们实战开发两个java应用,一个发布消息,一个订阅消息。

源码地址:git@github.com:zq2599/blog_demos.git 里面有多个工程,本次实战用到的工程如下图红框所示:

kafka_producer是消息生产的web工程; kafka_consumer是消息消费的web工程;

梳理容器之间的关系

开始编码之前,我们先把整个环境的容器关系梳理一下,如下图:

以上就是本次实战所有的容器,它们的功能和关系列举如下:

容器

镜像

功能

对其他容器的依赖

zk_server

zookeeper:3.3.6

一致性协调server

不依赖其他容器

kafka_server

bolingcavalry/kafka:0.0.1

消息发布订阅服务提供方

zk_server

tomcat_producer

bolingcavalry/online_deploy_tomcat:0.0.1

部署了发布消息的应用

zk_server,kafka_server

tomcat_consumer

bolingcavalry/online_deploy_tomcat:0.0.1

部署了订阅消息的应用

zk_server

关于上面列表中的提到两个镜像,在此说明一下:

  1. bolingcavalry/kafka是部署了kafka服务的镜像,详情可以参考文章《Docker下kafka学习,三部曲之二:本地环境搭建》
  2. bolingcavalry/online_deploy_tomcat是部署了tomcat的镜像,并且这个tomcat支持maven插件直接通过url部署war包,本次实战的两个工程就是用这种方式快速部署的,详情可以参考文章《实战docker,编写Dockerfile定制tomcat镜像,实现web应用在线部署》

启动容器

所有的容器关系已经梳理清楚,开始启动容器吧,创建docker-compose.yml文件,内容如下:

version: '2'
services:
  zk_server: 
    image: daocloud.io/library/zookeeper:3.3.6
    restart: always
  kafka_server: 
    image: bolingcavalry/kafka:0.0.1
    links: 
      - zk_server:zkhost
    command: /bin/sh -c '/usr/local/work/start_server.sh'
    restart: always
  tomcat_producer: 
    image: bolingcavalry/online_deploy_tomcat:0.0.1
    links: 
      - zk_server:zkhost
      - kafka_server:kafkahost
    ports: 
      - "8081:8080"
    environment:
      TOMCAT_SERVER_ID: tomcat_producer
    restart: always
  tomcat_consumer: 
    image: bolingcavalry/online_deploy_tomcat:0.0.1
    links: 
      - zk_server:zkhost
    ports: 
      - "8082:8080"
    environment:
      TOMCAT_SERVER_ID: tomcat_consumer
    restart: always

通过此yml可以看到,四个容器都是按照上面的表格设计的,另外tomcat_producer的8080端口映射到当前电脑的8081端口,tomcat_consumer的8080端口映射到当前电脑的8082端口,这样我们浏览器上通过8081和8082端口就能访问到两个tomcat的服务了;

打开终端,在docker-compose.yml文件所在目录执行:

docker-compose up -d

如下图,四个容器都启动了:

现在浏览器输入localhost:8081和localhost:8082都能看到熟悉的tomcat首页,如下图,证明两个tomcat也顺利启动了:

到目前为止,环境已经准备完毕,可以写代码了,请打开从git@github.com:zq2599/blog_demos.git下载的源码,先进入kafka_producer目录,看消息是如何发布的:

消息发布应用

kafka_producer目录里面是个标准的maven工程,推荐用intellij idea打开;

在pom中除了常用的spring相关依赖,还要添加kafka的依赖:

<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.9.2</artifactId>
      <version>0.8.1</version>
    </dependency>

在web.xml中添加spring相关的配置,用来处理http请求:

<servlet>
    <servlet-name>SpringMVC</servlet-name>
    <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    <init-param>
      <param-name>contextConfigLocation</param-name>
      <param-value>classpath:spring-mvc.xml</param-value>
    </init-param>
    <load-on-startup>1</load-on-startup>
    <async-supported>true</async-supported>
  </servlet>
  <servlet-mapping>
    <servlet-name>SpringMVC</servlet-name>
    <url-pattern>/</url-pattern>
  </servlet-mapping>

在spring-mvc.xml文件中加入spring和spring-mvc所需的配置 ,例如自动扫描包路径,等视图模式:

<!-- 自动扫描该包,使SpringMVC认为包下用了@controller注解的类是控制器 -->
    <context:component-scan base-package="com.bolingcavalry.controller" />

    <!-- 添加注解驱动 -->
    <mvc:annotation-driven enable-matrix-variables="true" />
    <!-- 允许对静态资源文件的访问 -->

    <mvc:default-servlet-handler />

    <!-- 定义跳转的文件的前后缀 ,视图模式配置 -->
    <bean
        class="org.springframework.web.servlet.view.InternalResourceViewResolver">
        <!-- 这里的配置我的理解是自动给后面action的方法return的字符串加上前缀和后缀,变成一个 可用的url地址 -->
        <property name="prefix" value="/WEB-INF/jsp/" />
        <property name="suffix" value=".jsp" />
    </bean>

    <!-- 配置文件上传,如果没有使用文件上传可以不用配置,当然如果不配,那么配置文件中也不必引入上传组件包 -->
    <bean id="multipartResolver"
        class="org.springframework.web.multipart.commons.CommonsMultipartResolver">
        <!-- 默认编码 -->
        <property name="defaultEncoding" value="utf-8" />
        <!-- 文件大小最大值 -->
        <property name="maxUploadSize" value="10485760000" />
        <!-- 内存中的最大值 -->
        <property name="maxInMemorySize" value="40960" />
    </bean>

KafkaProducer是个单例,处于最底层位置,负责核心工具类Producer的实例化,并对外提供发送消息的方法send(String topic, String message):

/**
     * 禁止被外部实例化
     */
    private KafkaProducer(){
        super();

        try {
            Properties props = new Properties();
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("zk.connect", Constants.ZK_HOST + ":2181");
            props.put("metadata.broker.list", Constants.BROKER_HOST + ":9092");
            producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static KafkaProducer getInstance(){
        if(null==instance) {
            synchronized (KafkaProducer.class){
                if(null==instance){
                    instance = new KafkaProducer();
                }
            }
        }

        return instance;
    }

    /**
     * 发送一条消息
     * @param topic
     * @param message
     */
    public void send(String topic, String message){
        //producer的内部实现中,已经考虑了线程安全,所以此处不用加锁了
        producer.send(new KeyedMessage<Integer, String>(topic, message));
    }

KafkaService是所有底层服务对外的接口,目前只有一个方法produce,用于发送执行主题的消息:

    /**
     * 产生一条消息
     * @param message
     */
    void produce(String topic, String message);

KafkaServiceImpl实现了KafkaService接口,produce方法里面是对KafkaProducer提供的方法的调用:

    @Override
    public void produce(String topic, String message) {
        KafkaProducer.getInstance().send(topic, message);
    }

最上层是KafkaController,发送消息的web请求达到时,postsend方法会被调用:

@RequestMapping("/postsend")
public String postsend(HttpServletRequest request, Model model) {

        String message = request.getParameter("message");

        System.out.println("send -> " + message);

        String topic = Constants.TOPIC;

        kafkaService.produce(topic, message);

        model.addAttribute("message", message);
        addCommon(topic, model);
        return "send_finish";
    }

Controller和Service的关系整理如下图:

再看看jsp,一个最简单的form,提交到/postsend:

<div>
    <form method="post" action="${pageContext.request.contextPath}/postsend">
        <p>
            消息内容&nbsp;&nbsp;:&nbsp;<input type="text" name="message" maxlength="999">
        </p>

        <input type="Submit" value="提交">
        &nbsp;
        <input type="Reset" value="重置">
    </form>
</div>

为了通过mvn命令直接把war包部署到tomcat上去,需要修改pom中的tomcat7-maven-plugin插件的配置参数,如下图红框所示,端口要改成8081:

以上就是发送消息的代码,在pom.xml目录下执行mvn clean package -U -Dmaven.test.skip=true tomcat7:redeploy开始编译,打包,部署,执行成功后,在浏览器输入http://localhost:8081/kafkaproducer/send就打开了输入消息的页面:

接下来我们看消息订阅应用

消息订阅应用

基础的web.xml,spring等配置和上面的消息发布应用一致,就不再赘述了,直接看关键代码,先看封装了核心处理代码的KafkaConsumer.java:

public static KafkaConsumer getInstance(){
        if(null==instance) {
            synchronized (KafkaConsumer.class){
                if(null==instance){
                    instance = new KafkaConsumer();
                }
            }
        }

        return instance;
    }

    /**
     * 启动一个consumer
     * @param topic
     */
    public void startConsume(String topic){
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));


        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        final ConsumerIterator<byte[], byte[]> it = stream.iterator();

        Runnable executor = new Runnable() {
            @Override
            public void run() {
                while (it.hasNext()) {
                    System.out.println("************** receive:" + new String(it.next().message()));
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };

        new Thread(executor).start();
    }

KafkaConsumer是个单例,getInstance方法用于返回唯一实例,startConsume方法用于订阅指定的topic,由于只是个demo,对于收到消息后对消息的消费也放在了这里,只是简单的打印消息内容;

KafkaService是消息订阅服务对外暴露的唯一接口,里面只有一个方法声明:订阅指定topic的消息,具体的实现在KafkaServiceImpl:

static Set<String> runningTopicSet = new HashSet<String>();

    @Override
    public void startConsume(String topic) {
        if(runningTopicSet.contains(topic)){
            System.out.println("topic [" + topic + "]'s consumer is running");
            return;
        }

        //如果该topic对应的consumer没有启动,就立即启动
        synchronized (runningTopicSet){
            if(!runningTopicSet.contains(topic)){
                System.out.println("start topic [" + topic + "]");
                runningTopicSet.add(topic);
                KafkaConsumer.getInstance().startConsume(topic);
            }
        }

    }

可以看到startConsume方法先检查指定的主题是否已经订阅过了,如果没有订阅过才会调用KafkaConsumer提供的startConsume方法进行订阅;

在docker-compose.yml中,为tomcat_consumer容器分配的映射端口是8082,所以kafkaconsumer工程中,pom.xml里的插件配置的tomcat端口参数也要改成8082,如下图:

订阅消息的代码就这些了,在pom.xml目录下执行mvn clean package -U -Dmaven.test.skip=true tomcat7:redeploy开始编译,打包,部署,执行成功后,在浏览器输入http://localhost:8082/kafkaconsumer/start就打开了订阅消息的页面:

代码分析完毕,容器也都起来了,可以实战一下啦。

发送消息测试

http://localhost:8081/kafkaproducer/send打开的网页中输入“123456“,点击“提交“按钮就会发送一条topic等于topic001的消息,如下图:

消息订阅测试:

http://localhost:8082/kafkaconsumer/start的页面中,Topic输入框中输入”topic001”,点击“提交“按钮,就会订阅topic等于”topic001”的消息,如下图:

接下来我们要通过终端来查看订阅消息的活动状态,先通过docker ps确定应用kafkaconsumer所在容器的name,如下图:

确定了容器名,就能通过命令docker logs -f 001_tomcat_consumer_1在终端上看到容器的实时输出,如下图:

从上图可以看到,在应用kafkaproducer上发送的”123456”消息已经被kafkaconsumer收到并打印出来了,此时再去kafkaproducer的页面上发送一条消息,可以在这里看到消息会立即被kafkaconsumer收到并打印出来。

以上就是kafka入门的整个实例了,源码地址:git@github.com:zq2599/blog_demos.git,欢迎使用。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Spark学习技巧

Spark源码之Standalone模式下master持久化引擎讲解

Spark源码之Standalone模式下master持久化引擎讲解 Standalone 模式下Master为了保证故障恢复,会持久化一些重要的数据,来避免m...

17210
来自专栏Java帮帮-微信公众号-技术文章全总结

RabbitMQ详解解答【面试+工作】

如果安装rabbitMQ首先安装基于erlang语言支持的OTP软件,然后在下载rabbitMQ软件进行安装(安装过程都是下一步,在此不在说了)

551
来自专栏Core Net

ASP.NET Core 2.0 : 三. 项目结构

3725
来自专栏比原链

Derek解读Bytom源码-protobuf生成比原核心代码

Gitee地址:https://gitee.com/BytomBlockchain/bytom

571
来自专栏Linyb极客之路

使用lazyInit缩短Spring Boot启动时间

Spring Boot可以进行有助于相关针对项目的设置,包括最常见的默认设置和随时可用的配置,这无疑是很棒的,因为它节省了宝贵的时间 然而,对于框架的新手来说,...

967
来自专栏张善友的专栏

为什么 web 开发人员需要迁移到. NET Core, 并使用 ASP.NET Core MVC 构建 web 和 webservice/API

如果你是一个初学者开始学习 ASP.NET 或 ASP.NET MVC, 你可能并不知道什么是. net Framework和. net ore。不用担心!我建...

1544
来自专栏Spark学习技巧

深入了解HBase架构

1152
来自专栏腾讯移动品质中心TMQ的专栏

应用宝基于Robotium自动化测试(下)

基于Robotium自动化测试(上)》一文中小编介绍了框架选择、测试环境搭建、用例编写、跨应用处理等等内容,本文将承接上文,继续介绍测试报告生成、持续集成等等相...

1736
来自专栏逆向技术

调试器编写第一讲,调试器基本框架

                  调试器编写第一讲,调试器基本框架 今天开始调试器第一讲,调试器的基本框架,我们用过很多调试器,比如 WinDbg,Olly...

1885
来自专栏张善友的专栏

为什么 web 开发人员需要迁移到. NET Core, 并使用 ASP.NET Core MVC 构建 web 和 webservice/API

2089

扫码关注云+社区