JavaWeb项目架构之Kafka分布式日志队列

架构、分布式、日志队列,标题自己都看着唬人,其实就是一个日志收集的功能,只不过中间加了一个Kafka做消息队列罢了。

kafka介绍

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

特性

Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性:

  • 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
  • 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
  • 支持通过Kafka服务器和消费机集群来分区消息。
  • 支持Hadoop并行数据加载。

主要功能

  • 发布和订阅消息流,这个功能类似于消息队列,这也是kafka归类为消息队列框架的原因
  • 以容错的方式记录消息流,kafka以文件的方式来存储消息流
  • 可以再消息发布的时候进行处理

使用场景

  • 在系统或应用程序之间构建可靠的用于传输实时数据的管道,消息队列功能
  • 构建实时的流数据处理程序来变换或处理数据流,数据处理功能

消息传输流程

760273-20171108181426763-1692750478.png

相关术语介绍

  • **Broker**

Kafka集群包含一个或多个服务器,这种服务器被称为broker

  • **Topic**

每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

  • **Partition**

Partition是物理上的概念,每个Topic包含一个或多个Partition.

  • **Producer**

负责发布消息到Kafka broke

  • **Consumer**

消息消费者,向Kafka broker读取消息的客户端。

  • **Consumer Group**

每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)

Kafka安装

环境

Linux、JDK、Zookeepe

下载二进制程序

wget https://archive.apache.org/dist/kafka/0.10.0.1/kafka\_2.11-0.10.0.1.tgz

安装

tar -zxvf kafka\_2.11-0.10.0.1.tgz

cd kafka\_2.11-0.10.0.1

目录说明

bin 启动,停止等命令

config 配置文件

libs 类库

参数说明

#########################参数解释##############################



broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样



port=9092 #当前kafka对外提供服务的端口默认是9092



host.name=192.168.1.170 #这个参数默认是关闭的



num.network.threads=3 #这个是borker进行网络处理的线程数



num.io.threads=8 #这个是borker进行I/O处理的线程数



log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个



socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能



socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘



socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小



num.partitions=1 #默认的分区数,一个topic默认1个分区数



log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天



message.max.byte=5242880  #消息保存的最大值5M



default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务



replica.fetch.max.bytes=5242880  #取消息的最大直接数



log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件



log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除



log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能



zookeeper.connect=192.168.1.180:12181,192.168.1.181:12181,192.168.1.182:1218 #设置zookeeper的连接端口、如果非集群配置一个地址即可



#########################参数解释##############################

启动kafka

启动kafka之前要启动相应的zookeeper集群、自行安装,这里不做说明。

#进入到kafka的bin目录

./kafka-server-start.sh -daemon ../config/server.properties

Kafka集成

环境

spring-boot、elasticsearch、kafka

pom.xml引入:

<!-- kafka 消息队列 -->

<dependency>

<groupId>org.springframework.kafka</groupId>

    <artifactId>spring-kafka</artifactId>

    <version>1.1.1.RELEASE</version>

</dependency>

生产者

import java.util.HashMap;

import java.util.Map;



import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.common.serialization.StringSerializer;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.kafka.annotation.EnableKafka;

import org.springframework.kafka.core.DefaultKafkaProducerFactory;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.core.ProducerFactory;

/\*\*

 \* 生产者

 \* 创建者 科帮网

 \* 创建时间    2018年2月4日

 \*/

@Configuration

@EnableKafka

public class KafkaProducerConfig {



    @Value("${kafka.producer.servers}")

    private String servers;

    @Value("${kafka.producer.retries}")

    private int retries;

    @Value("${kafka.producer.batch.size}")

    private int batchSize;

    @Value("${kafka.producer.linger}")

    private int linger;

    @Value("${kafka.producer.buffer.memory}")

    private int bufferMemory;





    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.BOOTSTRAP\_SERVERS\_CONFIG, servers);

        props.put(ProducerConfig.RETRIES\_CONFIG, retries);

        props.put(ProducerConfig.BATCH\_SIZE\_CONFIG, batchSize);

        props.put(ProducerConfig.LINGER\_MS\_CONFIG, linger);

        props.put(ProducerConfig.BUFFER\_MEMORY\_CONFIG, bufferMemory);

        props.put(ProducerConfig.KEY\_SERIALIZER\_CLASS\_CONFIG, StringSerializer.class);

        props.put(ProducerConfig.VALUE\_SERIALIZER\_CLASS\_CONFIG, StringSerializer.class);

        return props;

    }



    public ProducerFactory<String, String> producerFactory() {

        return new DefaultKafkaProducerFactory<>(producerConfigs());

    }



    @Bean

    public KafkaTemplate<String, String> kafkaTemplate() {

        return new KafkaTemplate<String, String>(producerFactory());

    }

}

消费者

mport java.util.HashMap;

import java.util.Map;



import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.common.serialization.StringDeserializer;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.kafka.annotation.EnableKafka;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

import org.springframework.kafka.config.KafkaListenerContainerFactory;

import org.springframework.kafka.core.ConsumerFactory;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

/\*\*

 \* 消费者

 \* 创建者 科帮网

 \* 创建时间    2018年2月4日

 \*/

@Configuration

@EnableKafka

public class KafkaConsumerConfig {

    @Value("${kafka.consumer.servers}")

    private String servers;

    @Value("${kafka.consumer.enable.auto.commit}")

    private boolean enableAutoCommit;

    @Value("${kafka.consumer.session.timeout}")

    private String sessionTimeout;

    @Value("${kafka.consumer.auto.commit.interval}")

    private String autoCommitInterval;

    @Value("${kafka.consumer.group.id}")

    private String groupId;

    @Value("${kafka.consumer.auto.offset.reset}")

    private String autoOffsetReset;

    @Value("${kafka.consumer.concurrency}")

    private int concurrency;

    @Bean

    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());

        factory.setConcurrency(concurrency);

        factory.getContainerProperties().setPollTimeout(1500);

        return factory;

    }



    public ConsumerFactory<String, String> consumerFactory() {

        return new DefaultKafkaConsumerFactory<>(consumerConfigs());

    }





    public Map<String, Object> consumerConfigs() {

        Map<String, Object> propsMap = new HashMap<>();

        propsMap.put(ConsumerConfig.BOOTSTRAP\_SERVERS\_CONFIG, servers);

        propsMap.put(ConsumerConfig.ENABLE\_AUTO\_COMMIT\_CONFIG, enableAutoCommit);

        propsMap.put(ConsumerConfig.AUTO\_COMMIT\_INTERVAL\_MS\_CONFIG, autoCommitInterval);

        propsMap.put(ConsumerConfig.SESSION\_TIMEOUT\_MS\_CONFIG, sessionTimeout);

        propsMap.put(ConsumerConfig.KEY\_DESERIALIZER\_CLASS\_CONFIG, StringDeserializer.class);

        propsMap.put(ConsumerConfig.VALUE\_DESERIALIZER\_CLASS\_CONFIG, StringDeserializer.class);

        propsMap.put(ConsumerConfig.GROUP\_ID\_CONFIG, groupId);

        propsMap.put(ConsumerConfig.AUTO\_OFFSET\_RESET\_CONFIG, autoOffsetReset);

        return propsMap;

    }



    @Bean

    public Listener listener() {

        return new Listener();

    }

}

日志监听

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;



import com.itstyle.es.common.utils.JsonMapper;

import com.itstyle.es.log.entity.SysLogs;

import com.itstyle.es.log.repository.ElasticLogRepository;

/\*\*

 \* 扫描监听

 \* 创建者 科帮网

 \* 创建时间    2018年2月4日

 \*/

@Component

public class Listener {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    

    @Autowired

    private  ElasticLogRepository elasticLogRepository;

    

    @KafkaListener(topics = {"itstyle"})

    public void listen(ConsumerRecord<?, ?> record) {

        logger.info("kafka的key: " + record.key());

        logger.info("kafka的value: " + record.value());

        if(record.key().equals("itstyle\_log")){

            try {

                SysLogs log = JsonMapper.fromJsonString(record.value().toString(), SysLogs.class);

                logger.info("kafka保存日志: " + log.getUsername());

                elasticLogRepository.save(log);

            } catch (Exception e) {

                e.printStackTrace();

            }

        }

    }

}

测试日志传输

  /\*\*

    \* kafka 日志队列测试接口

    \*/

   @GetMapping(value="kafkaLog")

   public @ResponseBody String kafkaLog() {

        SysLogs log = new SysLogs();

        log.setUsername("红薯");

        log.setOperation("开源中国社区");

        log.setMethod("com.itstyle.es.log.controller.kafkaLog()");

        log.setIp("192.168.1.80");

        log.setGmtCreate(new Timestamp(new Date().getTime()));

        log.setExceptionDetail("开源中国社区");

        log.setParams("{'name':'码云','type':'开源'}");

        log.setDeviceType((short)1);

        log.setPlatFrom((short)1);

        log.setLogType((short)1);

        log.setDeviceType((short)1);

        log.setId((long)200000);

        log.setUserId((long)1);

        log.setTime((long)1);

        //模拟日志队列实现

        String json = JsonMapper.toJsonString(log);

        kafkaTemplate.send("itstyle", "itstyle\_log",json);

        return "success";

   }

Kafka与Redis

之前简单的介绍过,JavaWeb项目架构之Redis分布式日志队列,有小伙伴们聊到, Redis PUB/SUB没有任何可靠性保障,也不会持久化。当然了,原项目中仅仅是记录日志,并不是十分重要的信息,可以有一定程度上的丢失

Kafka与Redis PUB/SUB之间最大的区别在于Kafka是一个完整的分布式发布订阅消息系统,而Redis PUB/SUB只是一个组件而已。

使用场景

  • Redis PUB/SUB

消息持久性需求不高、吞吐量要求不高、可以忍受数据丢失

  • Kafka

高可用、高吞吐、持久性、多样化的消费处理模型

开源项目源码(参考):https://gitee.com/52itstyle/spring-boot-elasticsearch

作者: 小柒

出处: https://blog.52itstyle.com

分享是快乐的,也见证了个人成长历程,文章大多都是工作经验总结以及平时学习积累,基于自身认知不足之处在所难免,也请大家指正,共同进步。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏后端技术探索

PHP非阻塞模式

让PHP不再阻塞当PHP作为后端处理需要完成一些长时间处理,为了快速响应页面请求,不作结果返回判断的情况下,可以有如下措施:

1301
来自专栏Ryan Miao

使用Apache Server 的ab进行web请求压力测试

参考:http://www.cnblogs.com/spring3mvc/archive/2010/11/23/2414741.html 自己写代码经常是顺着逻...

3497
来自专栏葡萄城控件技术团队

Web API 持续集成:PostMan+Newman+Jenkins(图文讲解)

上篇文章我们已经完成了API测试工具选型,接下来是一系列周期性的开发测试过程:接口开发、检出代码、运行测试、记录结果、发送报告。为了快速发现问题,并减少重复过程...

1472
来自专栏cloudskyme

Apache nutch1.5 & Apache solr3.6

第1章引言 1.1nutch和solr Nutch 是一个开源的、Java 实现的搜索引擎。它提供了我们运行自己的搜索引擎所需的全部工具。 Solr 拥有像 w...

3654
来自专栏北京马哥教育

Linux磁盘及文件系统管理

磁盘(Hard Disk Drive,简称HDD)是一种存储介质,传统的机械硬盘由一个或多个铝制或玻璃制的碟片组成,碟片外覆盖有铁磁性材料。 磁盘的物...

3123
来自专栏SpringBoot 核心技术

第四十三章: 基于SpringBoot & RabbitMQ完成TopicExchange分布式消息消费

36115
来自专栏坚毅的PHP

hbase问题总结

 1 java.io.IOException: java.io.IOException: java.lang.IllegalArgumentException:...

5636
来自专栏小怪聊职场

Java|Spring+SpringMVC+MyBatis框架科普

2665
来自专栏张戈的专栏

Linux系统crontab备份数据库执行不成功?可能是百分号%在作怪!

之前博客分享过一篇《Linux/vps 本地七天循环备份和七牛远程备份脚本》,我自己也一直在用。某天检查备份的时候,突然发现数据库的备份的压缩包是空的! 看了下...

3104
来自专栏云鼎实验室的专栏

跨站的艺术-XSS入门与介绍

什么是XSS? XSS全称跨站脚本(Cross Site Scripting),为不和层叠样式表(Cascading Style Sheets, CSS)的缩写...

4297

扫码关注云+社区

领取腾讯云代金券