前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Docker下kafka学习,三部曲之三:java开发

Docker下kafka学习,三部曲之三:java开发

作者头像
程序员欣宸
发布2018-01-04 16:02:23
1K0
发布2018-01-04 16:02:23
举报
文章被收录于专栏:实战docker实战docker

在前两章《Docker下kafka学习,三部曲之一:极速体验kafka》《Docker下kafka学习,三部曲之二:本地环境搭建》中,我们通过命令行体验了kafka的消息发布订阅服务,本章我们实战开发两个java应用,一个发布消息,一个订阅消息。

源码地址:git@github.com:zq2599/blog_demos.git 里面有多个工程,本次实战用到的工程如下图红框所示:

这里写图片描述
这里写图片描述

kafka_producer是消息生产的web工程; kafka_consumer是消息消费的web工程;

梳理容器之间的关系

开始编码之前,我们先把整个环境的容器关系梳理一下,如下图:

这里写图片描述
这里写图片描述

以上就是本次实战所有的容器,它们的功能和关系列举如下:

容器

镜像

功能

对其他容器的依赖

zk_server

zookeeper:3.3.6

一致性协调server

不依赖其他容器

kafka_server

bolingcavalry/kafka:0.0.1

消息发布订阅服务提供方

zk_server

tomcat_producer

bolingcavalry/online_deploy_tomcat:0.0.1

部署了发布消息的应用

zk_server,kafka_server

tomcat_consumer

bolingcavalry/online_deploy_tomcat:0.0.1

部署了订阅消息的应用

zk_server

关于上面列表中的提到两个镜像,在此说明一下:

  1. bolingcavalry/kafka是部署了kafka服务的镜像,详情可以参考文章《Docker下kafka学习,三部曲之二:本地环境搭建》
  2. bolingcavalry/online_deploy_tomcat是部署了tomcat的镜像,并且这个tomcat支持maven插件直接通过url部署war包,本次实战的两个工程就是用这种方式快速部署的,详情可以参考文章《实战docker,编写Dockerfile定制tomcat镜像,实现web应用在线部署》

启动容器

所有的容器关系已经梳理清楚,开始启动容器吧,创建docker-compose.yml文件,内容如下:

代码语言:javascript
复制
version: '2'
services:
  zk_server: 
    image: daocloud.io/library/zookeeper:3.3.6
    restart: always
  kafka_server: 
    image: bolingcavalry/kafka:0.0.1
    links: 
      - zk_server:zkhost
    command: /bin/sh -c '/usr/local/work/start_server.sh'
    restart: always
  tomcat_producer: 
    image: bolingcavalry/online_deploy_tomcat:0.0.1
    links: 
      - zk_server:zkhost
      - kafka_server:kafkahost
    ports: 
      - "8081:8080"
    environment:
      TOMCAT_SERVER_ID: tomcat_producer
    restart: always
  tomcat_consumer: 
    image: bolingcavalry/online_deploy_tomcat:0.0.1
    links: 
      - zk_server:zkhost
    ports: 
      - "8082:8080"
    environment:
      TOMCAT_SERVER_ID: tomcat_consumer
    restart: always

通过此yml可以看到,四个容器都是按照上面的表格设计的,另外tomcat_producer的8080端口映射到当前电脑的8081端口,tomcat_consumer的8080端口映射到当前电脑的8082端口,这样我们浏览器上通过8081和8082端口就能访问到两个tomcat的服务了;

打开终端,在docker-compose.yml文件所在目录执行:

代码语言:javascript
复制
docker-compose up -d

如下图,四个容器都启动了:

这里写图片描述
这里写图片描述

现在浏览器输入localhost:8081和localhost:8082都能看到熟悉的tomcat首页,如下图,证明两个tomcat也顺利启动了:

这里写图片描述
这里写图片描述

到目前为止,环境已经准备完毕,可以写代码了,请打开从git@github.com:zq2599/blog_demos.git下载的源码,先进入kafka_producer目录,看消息是如何发布的:

消息发布应用

kafka_producer目录里面是个标准的maven工程,推荐用intellij idea打开;

在pom中除了常用的spring相关依赖,还要添加kafka的依赖:

代码语言:javascript
复制
<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.9.2</artifactId>
      <version>0.8.1</version>
    </dependency>

在web.xml中添加spring相关的配置,用来处理http请求:

代码语言:javascript
复制
<servlet>
    <servlet-name>SpringMVC</servlet-name>
    <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    <init-param>
      <param-name>contextConfigLocation</param-name>
      <param-value>classpath:spring-mvc.xml</param-value>
    </init-param>
    <load-on-startup>1</load-on-startup>
    <async-supported>true</async-supported>
  </servlet>
  <servlet-mapping>
    <servlet-name>SpringMVC</servlet-name>
    <url-pattern>/</url-pattern>
  </servlet-mapping>

在spring-mvc.xml文件中加入spring和spring-mvc所需的配置 ,例如自动扫描包路径,等视图模式:

代码语言:javascript
复制
<!-- 自动扫描该包,使SpringMVC认为包下用了@controller注解的类是控制器 -->
    <context:component-scan base-package="com.bolingcavalry.controller" />

    <!-- 添加注解驱动 -->
    <mvc:annotation-driven enable-matrix-variables="true" />
    <!-- 允许对静态资源文件的访问 -->

    <mvc:default-servlet-handler />

    <!-- 定义跳转的文件的前后缀 ,视图模式配置 -->
    <bean
        class="org.springframework.web.servlet.view.InternalResourceViewResolver">
        <!-- 这里的配置我的理解是自动给后面action的方法return的字符串加上前缀和后缀,变成一个 可用的url地址 -->
        <property name="prefix" value="/WEB-INF/jsp/" />
        <property name="suffix" value=".jsp" />
    </bean>

    <!-- 配置文件上传,如果没有使用文件上传可以不用配置,当然如果不配,那么配置文件中也不必引入上传组件包 -->
    <bean id="multipartResolver"
        class="org.springframework.web.multipart.commons.CommonsMultipartResolver">
        <!-- 默认编码 -->
        <property name="defaultEncoding" value="utf-8" />
        <!-- 文件大小最大值 -->
        <property name="maxUploadSize" value="10485760000" />
        <!-- 内存中的最大值 -->
        <property name="maxInMemorySize" value="40960" />
    </bean>

KafkaProducer是个单例,处于最底层位置,负责核心工具类Producer的实例化,并对外提供发送消息的方法send(String topic, String message):

代码语言:javascript
复制
/**
     * 禁止被外部实例化
     */
    private KafkaProducer(){
        super();

        try {
            Properties props = new Properties();
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("zk.connect", Constants.ZK_HOST + ":2181");
            props.put("metadata.broker.list", Constants.BROKER_HOST + ":9092");
            producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static KafkaProducer getInstance(){
        if(null==instance) {
            synchronized (KafkaProducer.class){
                if(null==instance){
                    instance = new KafkaProducer();
                }
            }
        }

        return instance;
    }

    /**
     * 发送一条消息
     * @param topic
     * @param message
     */
    public void send(String topic, String message){
        //producer的内部实现中,已经考虑了线程安全,所以此处不用加锁了
        producer.send(new KeyedMessage<Integer, String>(topic, message));
    }

KafkaService是所有底层服务对外的接口,目前只有一个方法produce,用于发送执行主题的消息:

代码语言:javascript
复制
    /**
     * 产生一条消息
     * @param message
     */
    void produce(String topic, String message);

KafkaServiceImpl实现了KafkaService接口,produce方法里面是对KafkaProducer提供的方法的调用:

代码语言:javascript
复制
    @Override
    public void produce(String topic, String message) {
        KafkaProducer.getInstance().send(topic, message);
    }

最上层是KafkaController,发送消息的web请求达到时,postsend方法会被调用:

代码语言:javascript
复制
@RequestMapping("/postsend")
public String postsend(HttpServletRequest request, Model model) {

        String message = request.getParameter("message");

        System.out.println("send -> " + message);

        String topic = Constants.TOPIC;

        kafkaService.produce(topic, message);

        model.addAttribute("message", message);
        addCommon(topic, model);
        return "send_finish";
    }

Controller和Service的关系整理如下图:

这里写图片描述
这里写图片描述

再看看jsp,一个最简单的form,提交到/postsend:

代码语言:javascript
复制
<div>
    <form method="post" action="${pageContext.request.contextPath}/postsend">
        <p>
            消息内容&nbsp;&nbsp;:&nbsp;<input type="text" name="message" maxlength="999">
        </p>

        <input type="Submit" value="提交">
        &nbsp;
        <input type="Reset" value="重置">
    </form>
</div>

为了通过mvn命令直接把war包部署到tomcat上去,需要修改pom中的tomcat7-maven-plugin插件的配置参数,如下图红框所示,端口要改成8081:

这里写图片描述
这里写图片描述

以上就是发送消息的代码,在pom.xml目录下执行mvn clean package -U -Dmaven.test.skip=true tomcat7:redeploy开始编译,打包,部署,执行成功后,在浏览器输入http://localhost:8081/kafkaproducer/send就打开了输入消息的页面:

这里写图片描述
这里写图片描述

接下来我们看消息订阅应用

消息订阅应用

基础的web.xml,spring等配置和上面的消息发布应用一致,就不再赘述了,直接看关键代码,先看封装了核心处理代码的KafkaConsumer.java:

代码语言:javascript
复制
public static KafkaConsumer getInstance(){
        if(null==instance) {
            synchronized (KafkaConsumer.class){
                if(null==instance){
                    instance = new KafkaConsumer();
                }
            }
        }

        return instance;
    }

    /**
     * 启动一个consumer
     * @param topic
     */
    public void startConsume(String topic){
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));


        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        final ConsumerIterator<byte[], byte[]> it = stream.iterator();

        Runnable executor = new Runnable() {
            @Override
            public void run() {
                while (it.hasNext()) {
                    System.out.println("************** receive:" + new String(it.next().message()));
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };

        new Thread(executor).start();
    }

KafkaConsumer是个单例,getInstance方法用于返回唯一实例,startConsume方法用于订阅指定的topic,由于只是个demo,对于收到消息后对消息的消费也放在了这里,只是简单的打印消息内容;

KafkaService是消息订阅服务对外暴露的唯一接口,里面只有一个方法声明:订阅指定topic的消息,具体的实现在KafkaServiceImpl:

代码语言:javascript
复制
static Set<String> runningTopicSet = new HashSet<String>();

    @Override
    public void startConsume(String topic) {
        if(runningTopicSet.contains(topic)){
            System.out.println("topic [" + topic + "]'s consumer is running");
            return;
        }

        //如果该topic对应的consumer没有启动,就立即启动
        synchronized (runningTopicSet){
            if(!runningTopicSet.contains(topic)){
                System.out.println("start topic [" + topic + "]");
                runningTopicSet.add(topic);
                KafkaConsumer.getInstance().startConsume(topic);
            }
        }

    }

可以看到startConsume方法先检查指定的主题是否已经订阅过了,如果没有订阅过才会调用KafkaConsumer提供的startConsume方法进行订阅;

在docker-compose.yml中,为tomcat_consumer容器分配的映射端口是8082,所以kafkaconsumer工程中,pom.xml里的插件配置的tomcat端口参数也要改成8082,如下图:

这里写图片描述
这里写图片描述

订阅消息的代码就这些了,在pom.xml目录下执行mvn clean package -U -Dmaven.test.skip=true tomcat7:redeploy开始编译,打包,部署,执行成功后,在浏览器输入http://localhost:8082/kafkaconsumer/start就打开了订阅消息的页面:

这里写图片描述
这里写图片描述

代码分析完毕,容器也都起来了,可以实战一下啦。

发送消息测试

http://localhost:8081/kafkaproducer/send打开的网页中输入“123456“,点击“提交“按钮就会发送一条topic等于topic001的消息,如下图:

这里写图片描述
这里写图片描述

消息订阅测试:

http://localhost:8082/kafkaconsumer/start的页面中,Topic输入框中输入”topic001”,点击“提交“按钮,就会订阅topic等于”topic001”的消息,如下图:

这里写图片描述
这里写图片描述

接下来我们要通过终端来查看订阅消息的活动状态,先通过docker ps确定应用kafkaconsumer所在容器的name,如下图:

这里写图片描述
这里写图片描述

确定了容器名,就能通过命令docker logs -f 001_tomcat_consumer_1在终端上看到容器的实时输出,如下图:

这里写图片描述
这里写图片描述

从上图可以看到,在应用kafkaproducer上发送的”123456”消息已经被kafkaconsumer收到并打印出来了,此时再去kafkaproducer的页面上发送一条消息,可以在这里看到消息会立即被kafkaconsumer收到并打印出来。

以上就是kafka入门的整个实例了,源码地址:git@github.com:zq2599/blog_demos.git,欢迎使用。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017-05-11 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 梳理容器之间的关系
  • 启动容器
  • 消息发布应用
  • 消息订阅应用
  • 发送消息测试
  • 消息订阅测试:
相关产品与服务
容器镜像服务
容器镜像服务(Tencent Container Registry,TCR)为您提供安全独享、高性能的容器镜像托管分发服务。您可同时在全球多个地域创建独享实例,以实现容器镜像的就近拉取,降低拉取时间,节约带宽成本。TCR 提供细颗粒度的权限管理及访问控制,保障您的数据安全。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档