
Pitfalls of writing logback logs to Kafka

Author: Meet相识
Published 2018-09-12 16:21:10

While learning Storm real-time streaming over the past couple of days, I needed to write logback logs into Kafka. I hit quite a few pitfalls along the way, so here is a record of the problems I ran into and how I solved them, in the hope it saves others the same trouble.

Pitfall 1: the wrong Kafka dependency and the wrong import

Since this was my first time using Kafka, I went by past experience and assumed the client dependency was the right one, so I added:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0.0</version>
        </dependency>

I then imported org.apache.kafka.clients.producer.Producer. The result: calls to producer.send blocked forever without reporting any error, and properties.put("serializer.class", "kafka.serializer.StringEncoder") kept failing because that class could not be found.
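As an aside, the new kafka-clients producer is configured with entirely different property names (bootstrap.servers plus key/value serializer classes rather than serializer.class), which is why that property could not be resolved. A minimal sketch of the new-client setup for comparison, assuming a broker running locally on localhost:9092:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// minimal new-client (kafka-clients 0.9) producer setup
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");  // assumption: a local broker
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("mytopic", "hello"));  // "mytopic" matches the logback config below
producer.close();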

After a fair amount of back and forth, it turned out the dependency itself was wrong. What I should have added was:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${kafka.version}</version>
        </dependency>

Also, because Kafka itself is written in Scala, the Scala runtime library has to be added as well:

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

And the correct import is kafka.javaapi.producer.Producer.

Pitfall 2: after sorting out pitfall 1 I assumed everything would work, but the class that writes the logs still failed on startup with this error:

Exception in thread "main" java.lang.ExceptionInInitializerError
    at org.apache.log4j.LogManager.getLogger(LogManager.java:44)
    at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:66)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:270)
    at org.apache.commons.logging.impl.SLF4JLogFactory.getInstance(SLF4JLogFactory.java:155)
    at org.apache.commons.logging.impl.SLF4JLogFactory.getInstance(SLF4JLogFactory.java:132)
    at org.apache.commons.logging.LogFactory.getLog(LogFactory.java:657)
    at org.apache.hadoop.conf.Configuration.<clinit>(Configuration.java:173)

Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
    at org.apache.log4j.Log4jLoggerFactory.<clinit>(Log4jLoggerFactory.java:49)
    ... 8 more

A quick search turned up the cause: log4j-over-slf4j and slf4j-log4j12 are two jars tied to the Java logging system, and when both appear on the classpath at the same time, each delegates logging calls to the other in an endless loop, which can end in a StackOverflowError.

The fix is simple: exclude slf4j-log4j12 when declaring the Kafka dependency:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${kafka.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

With that, the problem was solved.
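To confirm that the conflicting binding is really gone, you can print the project's dependency tree and check that slf4j-log4j12 no longer shows up (a standard Maven command):

mvn dependency:tree -Dincludes=org.slf4j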

To finish, here is the complete code for writing logback logs to Kafka.

logback.xml: the logback configuration file
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
    <!-- configure the custom Kafka appender -->
    <appender name="KAFKA" class="com.gwf.log.KafkaAppender">
        <topic>mytopic</topic>
        <zookeeperHost>localhost:2181</zookeeperHost>
        <brokerList>localhost:9092</brokerList>
        <formatter class="com.gwf.log.formatter.JsonFormatter">
            <expectJson>false</expectJson>
        </formatter>
    </appender>

    <!-- write debug-level (and above) logs through the KAFKA appender -->
    <root level="debug">
        <appender-ref ref="KAFKA"/>
    </root>
</configuration>
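One note on the configuration above: logback binds the <topic>, <brokerList>, <zookeeperHost> and <formatter> elements to the same-named setters on KafkaAppender (generated here by Lombok's @Data), so the element names must match the field names exactly.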
JsonFormatter: formats a logging event as a single-line JSON string
package com.gwf.log.formatter;

import ch.qos.logback.classic.spi.ILoggingEvent;

public class JsonFormatter implements Formatter {
    private static final String QUOTE = "\"";
    private static final String COLON = ":";
    private static final String COMMA = ",";

    private boolean expectJson = false;

    @Override
    public String format(ILoggingEvent event) {
        StringBuilder sb = new StringBuilder();
        sb.append("{");

        fieldName("level",sb);
        quote(event.getLevel().levelStr,sb);
        sb.append(COMMA);

        fieldName("logger",sb);
        quote(event.getLoggerName(),sb);
        sb.append(COMMA);

        fieldName("timestamp",sb);
        sb.append(event.getTimeStamp());
        sb.append(COMMA);

        fieldName("message",sb);
        if(this.expectJson){
            sb.append(event.getFormattedMessage());
        }else {
            quote(event.getFormattedMessage(),sb);
        }

        sb.append("}");
        return sb.toString();
    }

    private static void fieldName(String name,StringBuilder sb){
        quote(name,sb);
        sb.append(COLON);
    }

    // note: values are appended as-is, with no JSON escaping; a message
    // containing quotes or backslashes would produce invalid JSON
    private static void quote(String value,StringBuilder sb){
        sb.append(QUOTE);
        sb.append(value);
        sb.append(QUOTE);
    }

    public boolean isExpectJson(){
        return expectJson;
    }

    public void setExpectJson(boolean expectJson){
        this.expectJson = expectJson;
    }
}
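With expectJson left at false, a WARN event from the RogueApplication below is rendered roughly like this (the timestamp will of course vary):

{"level":"WARN","logger":"com.gwf.log.RogueApplication","timestamp":1515130000000,"message":"This is a warning (slow state)"}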
KafkaAppender: the class that ships logback logs to Kafka. It extends AppenderBase<ILoggingEvent> and overrides the append method to customize how each log event is sent.
package com.gwf.log;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import com.gwf.log.formatter.Formatter;
import com.gwf.log.formatter.MessageFormatter;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import lombok.Data;


import java.util.Properties;


@Data
public class KafkaAppender extends AppenderBase<ILoggingEvent> {

    private String topic;
    private String zookeeperHost; // set from logback.xml, but not used by the producer itself
    private String brokerList;
    private Producer<String,String> producer;
    private Formatter formatter;

    @Override
    public void start() {
        if(null == this.formatter){
            this.formatter = new MessageFormatter();
        }
        super.start();
        Properties properties = new Properties();
        // old Scala-producer config: broker list, String encoder, ack once the leader has the write
        properties.put("metadata.broker.list",brokerList);
        properties.put("serializer.class","kafka.serializer.StringEncoder");
        properties.put("request.required.acks","1");
        ProducerConfig config = new ProducerConfig(properties);
        this.producer = new Producer<String, String>(config);
    }

    @Override
    public void stop() {
        super.stop();
        this.producer.close();
    }

    @Override
    protected void append(ILoggingEvent iLoggingEvent) {
        // format the log event (as JSON here) and send it to the configured topic
        String payload = this.formatter.format(iLoggingEvent);
        producer.send(new KeyedMessage<String, String>(topic,payload));
    }
}
RogueApplication: a small program that simulates an application writing logs at varying rates
package com.gwf.log;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RogueApplication {
    private static final Logger LOG = LoggerFactory.getLogger(RogueApplication.class);

    public static void main(String[] args) throws InterruptedException {
        int slowCount = 6;
        int fastCount = 15;
        //slow state
        for (int i=0;i<slowCount;i++){
            LOG.warn("This is a warning (slow state)");
            Thread.sleep(5000);
        }

        //enter rapid state
        for(int i=0;i<fastCount;i++){
            LOG.warn("This is a warning (rapid state)");
            Thread.sleep(1000);
        }

        //return to slow state
        for(int i=0;i<slowCount;i++){
            LOG.warn("This is a warning (slow state)");
            Thread.sleep(5000);
        }
    }
}
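To verify the whole pipeline end to end, you can run RogueApplication and tail the topic with the console consumer that ships with Kafka (the zookeeper-based form below matches the 0.9 setup used here; adjust hosts as needed):

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic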
Originally published 2018-01-05 on the author's personal blog.
