Pitfalls of writing Logback logs to Kafka

While learning Storm real-time streaming over the past couple of days, I needed to write Logback logs into Kafka. I hit quite a few pitfalls along the way, so this post records the problems I ran into and how I solved them, in the hope that it saves others the same trouble.

Pitfall 1: the wrong Kafka dependency and imports

Since this was my first time using Kafka, past experience suggested I should pull in the client dependency, so I added:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.0</version>
</dependency>
```

I then imported `org.apache.kafka.clients.producer.Producer` for the producer. The result: calls to `producer.send` blocked forever without reporting any error, and `properties.put("serializer.class","kafka.serializer.StringEncoder");` kept failing with a class-not-found error.
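For reference, `kafka.serializer.StringEncoder` belongs to the old Scala producer API and is not in kafka-clients at all; the new (kafka-clients) producer uses entirely different configuration keys. A minimal sketch of the equivalent new-producer configuration, with `localhost:9092` as a placeholder broker address:

```java
import java.util.Properties;

public class NewProducerConfigSketch {
    // Configuration for the kafka-clients (new) producer API, which you
    // would pass to new KafkaProducer<String, String>(props).
    public static Properties newProducerProps() {
        Properties props = new Properties();
        // bootstrap.servers replaces the old metadata.broker.list
        props.put("bootstrap.servers", "localhost:9092");
        // Serializers come from org.apache.kafka.common.serialization,
        // not kafka.serializer.StringEncoder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // acks replaces request.required.acks
        props.put("acks", "1");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(newProducerProps().getProperty("bootstrap.servers"));
    }
}
```

In this post, however, I went the other way and switched to the old Scala producer API, as described below.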

After much back and forth, I finally discovered the dependency itself was the problem; what I actually needed was:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>${kafka.version}</version>
</dependency>
```

And since Kafka is written in Scala, the Scala library dependency is needed as well:

```xml
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>
```

With that in place, the correct import is `import kafka.javaapi.producer.Producer;`.

Pitfall 2: after fixing Pitfall 1 I assumed everything would work, but the class that writes the logs kept failing at startup with:

```
Exception in thread "main" java.lang.ExceptionInInitializerError
    at org.apache.log4j.LogManager.getLogger(LogManager.java:44)
    at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:66)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:270)
    at org.apache.commons.logging.impl.SLF4JLogFactory.getInstance(SLF4JLogFactory.java:155)
    at org.apache.commons.logging.impl.SLF4JLogFactory.getInstance(SLF4JLogFactory.java:132)
    at org.apache.commons.logging.LogFactory.getLog(LogFactory.java:657)
    at org.apache.hadoop.conf.Configuration.<clinit>(Configuration.java:173)
Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
    at org.apache.log4j.Log4jLoggerFactory.<clinit>(Log4jLoggerFactory.java:49)
    ... 8 more
```

A bit of searching turned up the cause: ==log4j-over-slf4j and slf4j-log4j12 are two jars tied to the Java logging system; when both appear on the classpath at the same time, each one delegates logging calls to the other in an endless loop, which can trigger a stack overflow.==

The fix is simple: exclude slf4j-log4j12 when declaring the Kafka dependency:

```xml
<exclusions>
    <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
</exclusions>
```

With that, the problem was solved. (If you are not sure which dependency drags in slf4j-log4j12, `mvn dependency:tree` will show you the full dependency graph.)

Finally, here is the complete code for writing Logback logs to Kafka.

logback.xml: the Logback configuration file

```xml
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
    <!-- configure the custom appender that writes log events to Kafka -->
    <appender name="KAFKA" class="com.gwf.log.KafkaAppender">
        <topic>mytopic</topic>
        <zookeeperHost>localhost:2181</zookeeperHost>
        <brokerList>localhost:9092</brokerList>
        <formatter class="com.gwf.log.formatter.JsonFormatter">
            <expectJson>false</expectJson>
        </formatter>
    </appender>

    <!-- send debug-level (and above) logs through the KAFKA appender -->
    <root level="debug">
        <appender-ref ref="KAFKA"/>
    </root>
</configuration>
```

JsonFormatter: converts a logging event into a JSON string

```java
package com.gwf.log.formatter;

import ch.qos.logback.classic.spi.ILoggingEvent;

public class JsonFormatter implements Formatter {
    private static final String QUOTE = "\"";
    private static final String COLON = ":";
    private static final String COMMA = ",";

    // When true, the log message is assumed to already be JSON and is
    // embedded as-is; otherwise it is wrapped in quotes as a plain string.
    private boolean expectJson = false;

    @Override
    public String format(ILoggingEvent event) {
        StringBuilder sb = new StringBuilder();
        sb.append("{");

        fieldName("level", sb);
        quote(event.getLevel().levelStr, sb);
        sb.append(COMMA);

        fieldName("logger", sb);
        quote(event.getLoggerName(), sb);
        sb.append(COMMA);

        fieldName("timestamp", sb);
        sb.append(event.getTimeStamp());
        sb.append(COMMA);

        fieldName("message", sb);
        if (this.expectJson) {
            sb.append(event.getFormattedMessage());
        } else {
            // Note: the message is not JSON-escaped here, so a message
            // containing quotes or backslashes produces invalid JSON.
            quote(event.getFormattedMessage(), sb);
        }

        sb.append("}");
        return sb.toString();
    }

    // Emit "name": for a JSON field
    private static void fieldName(String name, StringBuilder sb) {
        quote(name, sb);
        sb.append(COLON);
    }

    // Wrap a value in double quotes
    private static void quote(String value, StringBuilder sb) {
        sb.append(QUOTE);
        sb.append(value);
        sb.append(QUOTE);
    }

    public boolean isExpectJson() {
        return expectJson;
    }

    public void setExpectJson(boolean expectJson) {
        this.expectJson = expectJson;
    }
}
```
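One caveat with the formatter above: it writes the message between quotes without escaping it, so a message containing `"`, `\`, or a newline produces invalid JSON. A minimal escape helper (my addition, not part of the original code) could look like this:

```java
public class JsonEscapeSketch {
    // Escape the characters that would break a JSON string literal.
    static String escapeJson(String value) {
        StringBuilder sb = new StringBuilder(value.length());
        for (char c : value.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:   sb.append(c);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(escapeJson("say \"hi\""));
    }
}
```

Calling a helper like this on the value inside the quoting method would keep the emitted JSON well-formed regardless of the message contents.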

KafkaAppender: the class that ships Logback logs to Kafka. Extend `AppenderBase<ILoggingEvent>` and override the `append` method to customize how log events are sent.

```java
package com.gwf.log;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import com.gwf.log.formatter.Formatter;
import com.gwf.log.formatter.MessageFormatter;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import lombok.Data;

import java.util.Properties;

@Data
public class KafkaAppender extends AppenderBase<ILoggingEvent> {

    private String topic;
    private String zookeeperHost; // set from logback.xml; not used by the producer itself
    private String brokerList;
    private Producer<String, String> producer;
    private Formatter formatter;

    @Override
    public void start() {
        // Fall back to a plain message formatter if none is configured
        if (null == this.formatter) {
            this.formatter = new MessageFormatter();
        }
        super.start();
        Properties properties = new Properties();
        properties.put("metadata.broker.list", brokerList);
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(properties);
        this.producer = new Producer<String, String>(config);
    }

    @Override
    public void stop() {
        super.stop();
        this.producer.close();
    }

    @Override
    protected void append(ILoggingEvent iLoggingEvent) {
        // Convert the log event to JSON and send it to the Kafka topic
        String payload = this.formatter.format(iLoggingEvent);
        producer.send(new KeyedMessage<String, String>(topic, payload));
    }
}
```
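One design note on the appender above: the old Scala producer sends synchronously by default, so a slow or unreachable broker blocks the logging thread inside `append`. If you stay on this API, switching the producer to async mode is one mitigation; a sketch of the extra properties (the specific values here are illustrative assumptions, not tuned recommendations):

```java
import java.util.Properties;

public class AsyncProducerPropsSketch {
    // Properties for the old Scala producer API with async delivery enabled.
    public static Properties asyncProps(String brokerList) {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", brokerList);
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.required.acks", "1");
        // Buffer sends in a background thread instead of blocking the caller
        properties.put("producer.type", "async");
        properties.put("queue.buffering.max.ms", "5000"); // flush at least every 5s
        properties.put("batch.num.messages", "200");      // or after 200 buffered messages
        return properties;
    }

    public static void main(String[] args) {
        System.out.println(asyncProps("localhost:9092").getProperty("producer.type"));
    }
}
```

The trade-off is that buffered messages can be lost if the process dies before a flush, which is usually acceptable for logs.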

RogueApplication: a program that simulates bursts of log writes

```java
package com.gwf.log;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RogueApplication {
    private static final Logger LOG = LoggerFactory.getLogger(RogueApplication.class);

    public static void main(String[] args) throws InterruptedException {
        int slowCount = 6;
        int fastCount = 15;
        // slow state
        for (int i = 0; i < slowCount; i++) {
            LOG.warn("This is a warning (slow state)");
            Thread.sleep(5000);
        }

        // enter rapid state
        for (int i = 0; i < fastCount; i++) {
            LOG.warn("This is a warning (rapid state)");
            Thread.sleep(1000);
        }

        // return to slow state
        for (int i = 0; i < slowCount; i++) {
            LOG.warn("This is a warning (slow state)");
            Thread.sleep(5000);
        }
    }
}
```
