专栏首页品茗ITSpringBoot入门建站全系列(二十八)整合Kafka做日志监控

SpringBoot入门建站全系列(二十八)整合Kafka做日志监控

SpringBoot入门建站全系列(二十八)整合Kafka做日志监控

一、概述

Apache Kafka是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端点传递到另一个端点。 Kafka适合离线和在线消息消费。 Kafka消息保留在磁盘上,并在集群内复制以防止数据丢失。 Kafka构建在ZooKeeper同步服务之上。 它与Apache Storm和Spark非常好地集成,用于实时流式数据分析。

所以说,Kafka还是一个MQ,这时候,你肯定会想到ActiveMQ、RabbitMQ、RocketMQ等,在《Web基础配置篇(十): ActiveMQ与RabbitMQ的安装配置及使用》 一篇中,已经大概讲述了他们之间的区别,这里还是要简单说明一下:

  • ActiveMQ是java写的消息队列,ActiveMq几个月才发一次版本,社区已经不活跃了;
  • RabbitMQ是基于erlang开发,国人很少学erlang的,但社区还是蛮活跃的,而且性能极其好,延时很低;
  • RocketMQ是java写的,阿里的,网上都说怕它哪天gg了,中小型公司用起来就麻烦了,但是性能蛮好的;
  • Kafka是基于scala的,主要是面向大数据的,最大的优点,就是吞吐量高。

所以,网上一般的推荐就是,中小型公司可以选择RabbitMQ,因为怕阿里不维护RocketMQ了,就没有能力去维护RocketMQ;大型软件公司可以选择rocketMq,因为有钱,所以有人维护。至于kafka,根据业务场景选择,大数据领域中以及日志采集,肯定是首选kafka了。

前面文章已经有整合过ActiveMQ和RabbitMQ:

《SpringBoot入门建站全系列(十七)整合ActiveMq(JMS类消息队列)》SpringBoot入门建站全系列(十八)整合RabbitMQ(AMQP类消息队列)

本篇整合kafka,使用Spring kafka对kafka进行操作,后续会使用spring-integration-kafka进行整合。

代码可以在SpringBoot组件化构建https://www.pomit.cn/java/spring/springboot.html中的Kafka组件中查看,并下载。

**如果大家正在寻找一个java的学习环境,或者在开发中遇到困难,可以<a

href="https://jq.qq.com/?_wv=1027&k=52sgH1J"

target="_blank">

加入我们的java学习圈,点击即可加入

</a>

,共同学习,节约学习时间,减少很多在学习中遇到的难题。**

二、配置

本文假设你已经引入spring-boot-starter-web。已经是个SpringBoot项目了,如果不会搭建,可以打开这篇文章看一看《SpringBoot入门建站全系列(一)项目建立》

需要提前搭好Kafka,Web基础配置篇(十四): Kafka单机、集群的安装配置及使用 这里有Kafka的安装方法。

2.1 Maven依赖

使用activemq可以使用spring-boot-starter-activemq,方便快捷,一般springboot对大多数开源项目都做了整合,提供了专用的stater。

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

2.2 配置文件

在application.properties 中需要配置kafka的信息,也可以配置自定义的配置,如:

spring.kafka.bootstrap-servers=10.247.63.210:9092,10.247.62.76:9092

# producer
spring.kafka.producer.retries=1
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false

kafka.topics.log=logCenter


# consumer
# group id
spring.kafka.consumer.group-id=log_center_group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

这里面,

  • spring.kafka.bootstrap-servers是kafka的集群地址,必须配置。
  • spring.kafka.producer.retries是生产者重试次数
  • spring.kafka.producer.value-serializer是发送数据转换,这里配置的是转成成json数据。
  • kafka.topics.log,这是我自己定义的一个kafka的topics。
  • spring.kafka.consumer.group-id是kafka的消费group-id,带group-id能避免重复消费
  • spring.kafka.consumer.auto-offset-reset是消费开始时机
  • spring.kafka.consumer.enable-auto-commit是自动提交offset
  • spring.kafka.consumer.value-deserializer是数据解码方式;这个地方如果设置为JsonDeserializer,会提示No type information in headers and no default type provided,需要额外配置,https://stackoverflow.com/questions/55109508/spring-kafka-no-type-information-in-headers-and-no-default-type-provided 这里有人说可以使用StringDeserializer接收,然后使用StringJsonMessageConverter做转换,经试验单独消费使用StringJsonMessageConverter是无效的,所以自己用json工具转一下最简单了。

注意:

  • 只要不更改group.id,每次重新消费kafka,都是从上次消费结束的地方继续开始,不论auto.offset.reset属性设置的是什么
  • 如果Kafka上积累了数据,想从最新的地方开始消费,则可以更改group.id,auto.offset.reset设置为latest。
  • 如果Kafka上积累了数据,想从最开始的地方开始消费,则可以更改group.id,auto.offset.reset设置为earliest。

三、Kafka的使用

3.1 Topics的建立

可以使用脚本来建立,也可以使用代码建立。

Web基础配置篇(十四): Kafka单机、集群的安装配置及使用 这里有使用脚本建立topics的方式。

下面的config会建立新的topics,如果已经存在,这个bean会被忽略。

KafkaTopicsConfig :

package com.cff.springbootwork.kafka;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

//如果不存topics,通过NewTopic新建
@Configuration
public class KafkaTopicsConfig {

	@Bean
	public NewTopic logCenter() {
	    return new NewTopic("logCenter", 2, (short) 2);
	}

	@Bean
	public NewTopic logTest() {
	    return new NewTopic("logCenter_test", 2, (short) 2);
	}
}

3.2 生产者(切面做日志监控)

kafka一般用来做日志分析,我们假设kafka会对应用的请求数据、业务处理数据搜集并提交给日志中心(可以是elk等)。

这里,我们用切面来对Controller层和Service层记录日志:

3.2.1 Controller层

KafkaTestRest:

package com.cff.springbootwork.kafka.web;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.cff.springbootwork.kafka.model.TransDataModel;
import com.cff.springbootwork.kafka.service.BusinessSerivce;

@RestController
@RequestMapping("/kafkaTest")
public class KafkaTestRest {
	@Autowired
	BusinessSerivce businessSerivce;

	@RequestMapping(value = "/test")
	public TransDataModel test(TransDataModel defaultMqModel) {
		return businessSerivce.doTrans(defaultMqModel);
	}

	@RequestMapping(value = "/test2")
	public TransDataModel test2(TransDataModel seccondMqModel) {
		return businessSerivce.doTrans(seccondMqModel);
	}
}
3.2.2 Service层

BusinessSerivce :

package com.cff.springbootwork.kafka.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import com.cff.springbootwork.kafka.model.TransDataModel;

@Service
public class BusinessSerivce {
	private final Logger log = LoggerFactory.getLogger(this.getClass());

	public TransDataModel doTrans(TransDataModel defaultMqModel) {
		log.info("处理消息{}", defaultMqModel);

		defaultMqModel.setType("1111");

		return defaultMqModel;
	}
}
3.2.3 日志切面

KafkaLogAspect:

package com.cff.springbootwork.kafka.log;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import com.cff.springbootwork.kafka.model.KafkaLogModel;

@Aspect
@Component
public class KafkaLogAspect {
	@Autowired
	private KafkaTemplate<String, KafkaLogModel> kafkaTemplate;

	@Value("${kafka.topics.log}")
	private String logTopics;

	private Logger log = LoggerFactory.getLogger(KafkaLogAspect.class);

	@Around("execution(public * com.cff.springbootwork.kafka.service.BusinessSerivce.doTrans(..))")
	public Object doAroundService(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
		KafkaLogModel kafkaLogModel = new KafkaLogModel();
		kafkaLogModel.setLogType("SERVICE");
		Object[] objs = proceedingJoinPoint.getArgs();
		kafkaLogModel.setReqContent(objs);
		Object obj = proceedingJoinPoint.proceed();
		kafkaLogModel.setResContent(obj);

		log.info("开始发送给kafka,数据{}", kafkaLogModel.toString());
		ProducerRecord<String, KafkaLogModel> record = new ProducerRecord<String, KafkaLogModel>(logTopics,
				kafkaLogModel);
		ListenableFuture<SendResult<String, KafkaLogModel>> future = kafkaTemplate.send(record);
		future.addCallback(new ListenableFutureCallback<SendResult<String, KafkaLogModel>>() {
			@Override
			public void onSuccess(SendResult<String, KafkaLogModel> result) {
				int partition = result.getRecordMetadata().partition();
				log.info("kafka存储partition为{}", partition);
			}

			@Override
			public void onFailure(Throwable ex) {

			}
		});

		log.info("开始发送给kafka,数据{}", kafkaLogModel.toString());
		return obj;
	}

	@Around("execution(public * com.cff.springbootwork.kafka.web.KafkaTestRest.*(..))")
	public Object doAroundController(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
		KafkaLogModel kafkaLogModel = new KafkaLogModel();
		kafkaLogModel.setLogType("CONTROLLER");
		Object[] objs = proceedingJoinPoint.getArgs();
		kafkaLogModel.setReqContent(objs);
		Object obj = proceedingJoinPoint.proceed();
		kafkaLogModel.setResContent(obj);

		log.info("开始发送给kafka,数据{}", kafkaLogModel.toString());
		ProducerRecord<String, KafkaLogModel> record = new ProducerRecord<String, KafkaLogModel>(logTopics,
				kafkaLogModel);
		kafkaTemplate.send(record);

		log.info("开始发送给kafka,数据{}", kafkaLogModel.toString());
		return obj;
	}
}

这里,

  • 分别对doAroundController和doAroundService对Controller、Service做切面。
  • 组建日志对象KafkaLogModel,并组建ProducerRecord对象,补全topics信息,也可以指定分区、key等信息。
  • kafkaTemplate发送消息。可以使用ListenableFuture对发送结果进行处理。

3.3 消费者

消费者一般和生产者是不在一起的,这里为了测试,就写在一起了。

消费者只需要使用@KafkaListener注解相应的方法即可。参数是字符串,接收消息。

这个地方如果设置为JsonDeserializer,会提示No type information in headers and no default type provided,需要额外配置,https://stackoverflow.com/questions/55109508/spring-kafka-no-type-information-in-headers-and-no-default-type-provided 这里有人说可以使用StringDeserializer接收,然后使用StringJsonMessageConverter做转换,经试验单独消费使用StringJsonMessageConverter是无效的,所以自己用json工具转一下最简单了。

KafkaLogConsumer :

package com.cff.springbootwork.kafka.consumer;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.cff.springbootwork.kafka.model.KafkaLogModel;
import com.fasterxml.jackson.databind.ObjectMapper;

@Component
public class KafkaLogConsumer {
	private Logger log = LoggerFactory.getLogger(this.getClass());

	@KafkaListener(topics = { "${kafka.topics.log}" })
	public void consumer(String message) {
		ObjectMapper mapper = new ObjectMapper();
		KafkaLogModel kafkaLogModel;
		try {
			kafkaLogModel = mapper.readValue(message, KafkaLogModel.class);
			log.info("收到消息:{}", kafkaLogModel.toString());
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

四、过程中用到的实体

TransDataModel:

package com.cff.springbootwork.kafka.model;

public class TransDataModel {
	public String title;
	public String content;
	public String type;

	public String getTitle() {
		return title;
	}

	public void setTitle(String title) {
		this.title = title;
	}

	public String getContent() {
		return content;
	}

	public void setContent(String content) {
		this.content = content;
	}

	public String getType() {
		return type;
	}

	public void setType(String type) {
		this.type = type;
	}

	@Override
	public String toString() {
		return "TransDataModel [title=" + title + ", content=" + content + ", type=" + type + "]";
	}

}

KafkaLogModel:

package com.cff.springbootwork.kafka.model;

public class KafkaLogModel {
	// 日志类型 controller日志: CONTROLLER service日志: SERVICE
	private String logType;

	private Object reqContent;

	private Object resContent;

	public String getLogType() {
		return logType;
	}

	public void setLogType(String logType) {
		this.logType = logType;
	}

	public Object getReqContent() {
		return reqContent;
	}

	public void setReqContent(Object reqContent) {
		this.reqContent = reqContent;
	}

	public Object getResContent() {
		return resContent;
	}

	public void setResContent(Object resContent) {
		this.resContent = resContent;
	}

	@Override
	public String toString() {
		return "KafkaLogModel [logType=" + logType + ", reqContent=" + reqContent + ", resContent=" + resContent + "]";
	}
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • SpringBoot入门建站全系列(十一)Spring-security进行权限认证

    Spring 是一个非常流行和成功的 Java 应用开发框架。Spring Security 基于 Spring 框架,提供了一套 Web 应用安全性的完整解决...

    品茗IT
  • Web基础配置篇(十四): Kafka单机、集群的安装配置及使用

    Apache Kafka是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端点传递到另一个端点。 Kafka适合离线...

    品茗IT
  • SpringCloud微服务实战系列(十三)分布式锁之Redis实现(redisson)

    分布式锁是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,...

    品茗IT
  • springboot kafka集成(实现producer和consumer)

    本文介绍如何在springboot项目中集成kafka收发message。 1、先解决依赖 springboot相关的依赖我们就不提了,和kafka相关的只依赖...

    用户1225216
  • LeetCode 429 N-ary Tree Level Order Traversal

    该题的要点是保持左右顺序,和记录当前层数,保持顺序的意思是指要将树某一层的数据从左至右放置到数组中,记录层数就更不用说了,每一层对应一个数组,要能区分数据的层级...

    一份执着✘
  • java部分util公用方法

    用户1418372
  • 谷歌跨界音乐圈:AI用上千乐器数据创造出人类没听过的声音,来听听看!

    大数据文摘
  • 第四篇:SpringBoot与任务

    版权声明:本文为博主原创文章,未经博主允许不得转载。 ...

    用户1212940
  • caffe详解之solver

    通过前面的讲解我们了解了不同的层,作为我们设计深度神经网络的积木,梳理了caffe中给定的损失函数(当然我们也可以针对自己的问题设计),并进一步对优化算法进行了...

    AI异构
  • SpringBoot开发案例之整合Kafka实现消息队列

    最近在做一款秒杀的案例,涉及到了同步锁、数据库锁、分布式锁、进程内队列以及分布式消息队列,这里对SpringBoot集成Kafka实现消息队列做一个简单的记录。

    小柒2012

扫码关注云+社区

领取腾讯云代金券