
kafka stream error-log alerting example


KafkaAppender

log4j-core-2.7-sources.jar!/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java

public void append(final LogEvent event) {
        if (event.getLoggerName().startsWith("org.apache.kafka")) {
            LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
        } else {
            try {
                final Layout<? extends Serializable> layout = getLayout();
                byte[] data;
                if (layout != null) {
                    if (layout instanceof SerializedLayout) {
                        final byte[] header = layout.getHeader();
                        final byte[] body = layout.toByteArray(event);
                        data = new byte[header.length + body.length];
                        System.arraycopy(header, 0, data, 0, header.length);
                        System.arraycopy(body, 0, data, header.length, body.length);
                    } else {
                        data = layout.toByteArray(event);
                    }
                } else {
                    data = StringEncoder.toBytes(event.getMessage().getFormattedMessage(), StandardCharsets.UTF_8);
                }
                manager.send(data);
            } catch (final Exception e) {
                LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e);
                throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e);
            }
        }
    }
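On the application side nothing Kafka-specific is needed: any ERROR event that the log4j2 configuration routes to this appender goes through append() above. As a purely illustrative sketch (the DemoController name just mirrors the logger names visible in the sample output further down, it is not code from the article), the producing side can be as plain as:

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class DemoController {

    private static final Logger LOGGER = LogManager.getLogger(DemoController.class);

    public void handle() {
        try {
            throw new IllegalStateException("boom");
        } catch (Exception e) {
            // If log4j2 routes ERROR events from this logger to the KafkaAppender
            // with a JsonLayout, this call becomes one JSON record on the topic.
            LOGGER.error("request failed", e);
        }
    }
}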

With JsonLayout, the data = layout.toByteArray(event); branch is the one taken, and toByteArray is implemented in log4j-core-2.7-sources.jar!/org/apache/logging/log4j/core/layout/AbstractStringLayout.java

    /**
     * Formats the Log Event as a byte array.
     *
     * @param event The Log Event.
     * @return The formatted event as a byte array.
     */
    @Override
    public byte[] toByteArray(final LogEvent event) {
        return getBytes(toSerializable(event));
    }
    protected byte[] getBytes(final String s) {
        if (useCustomEncoding) { // rely on branch prediction to eliminate this check if false
            return StringEncoder.encodeSingleByteChars(s);
        }
        try { // LOG4J2-935: String.getBytes(String) gives better performance
            return s.getBytes(charsetName);
        } catch (final UnsupportedEncodingException e) {
            return s.getBytes(charset);
        }
    }

toSerializable, in turn, is implemented in log4j-core-2.7-sources.jar!/org/apache/logging/log4j/core/layout/AbstractJacksonLayout.java

    @Override
    public String toSerializable(final LogEvent event) {
        final StringBuilderWriter writer = new StringBuilderWriter();
        try {
            toSerializable(event, writer);
            return writer.toString();
        } catch (final IOException e) {
            // Should this be an ISE or IAE?
            LOGGER.error(e);
            return Strings.EMPTY;
        }
    }
    public void toSerializable(final LogEvent event, final Writer writer)
            throws JsonGenerationException, JsonMappingException, IOException {
        objectWriter.writeValue(writer, convertMutableToLog4jEvent(event));
        writer.write(eol);
        markEvent();
    }
    private static LogEvent convertMutableToLog4jEvent(final LogEvent event) {
        // TODO Jackson-based layouts have certain filters set up for Log4jLogEvent.
        // TODO Need to set up the same filters for MutableLogEvent but don't know how...
        // This is a workaround.
        return event instanceof MutableLogEvent
                ? ((MutableLogEvent) event).createMemento()
                : event;
    }
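So each record on the error-log topic is one JSON document describing the event, using JsonLayout's standard field names such as timeMillis, thread, level, loggerName and message. The aggregation in the next section parses that value back into a Log4jLogEventJson bean; that class is not shown in the article, but a minimal guess whose properties line up with those field names could look like this:

// Hypothetical minimal bean for the JSON emitted by JsonLayout; only the
// fields the aggregation actually touches (plus a few common ones) are kept.
public class Log4jLogEventJson {

    private long timeMillis;
    private String thread;
    private String level;
    private String loggerName;
    private String message;

    public long getTimeMillis() { return timeMillis; }
    public void setTimeMillis(long timeMillis) { this.timeMillis = timeMillis; }

    public String getThread() { return thread; }
    public void setThread(String thread) { this.thread = thread; }

    public String getLevel() { return level; }
    public void setLevel(String level) { this.level = level; }

    public String getLoggerName() { return loggerName; }
    public void setLoggerName(String loggerName) { this.loggerName = loggerName; }

    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }
}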

Streaming aggregation

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> source = builder.stream("error-log");
        KStream<String, Log4jLogEventJson> beanStream = source.map(new KeyValueMapper<String, String, KeyValue<String, Log4jLogEventJson>>() {
            @Override
            public KeyValue<String, Log4jLogEventJson> apply(String key, String value) {
                Log4jLogEventJson bean = JSON.parseObject(value,Log4jLogEventJson.class);
                return new KeyValue<>(bean.getLoggerName(), bean);
            }
        });

        GenericJsonSerde<ErrorLogStats> statsGenericJsonSerde = new GenericJsonSerde<>(ErrorLogStats.class);
        GenericJsonSerde<Log4jLogEventJson> log4jLogEventJsonGenericJsonSerde = new GenericJsonSerde<>(Log4jLogEventJson.class);

        beanStream.groupByKey(Serdes.String(),log4jLogEventJsonGenericJsonSerde)
                .aggregate(ErrorLogStats::new,
                        (key, value, stats) -> {
                            System.out.println("key:"+key.getClass());
                            System.out.println("value:"+value.getClass());
                            return stats.add(value);
                        },
                        TimeWindows.of(10000).advanceBy(10000),
                        statsGenericJsonSerde,
                        "aggregate")
                .foreach((k,v) -> {
                    LOGGER.info("key:{},start:{},end:{},value:{}",k.key(),k.window().start(),k.window().end(),v.errors.size());
                    // trigger the alert here
                });
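Besides the bean, the topology uses two more classes that the article does not list: the ErrorLogStats accumulator whose errors list the foreach reads, and a GenericJsonSerde that Kafka Streams needs when it writes the bean and the aggregate to its internal repartition and changelog topics. The names come from the code above; the bodies below are only a plausible reconstruction, backed by fastjson since the map step already uses JSON.parseObject:

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import com.alibaba.fastjson.JSON;

// Window accumulator used by aggregate(); foreach later reads v.errors.size().
class ErrorLogStats {

    public List<Log4jLogEventJson> errors = new ArrayList<>();

    public ErrorLogStats add(Log4jLogEventJson event) {
        errors.add(event);
        return this;
    }
}

// Generic JSON Serde so Kafka Streams can (de)serialize the grouped values
// and the aggregate for its internal topics.
class GenericJsonSerde<T> implements Serde<T> {

    private final Class<T> clazz;

    GenericJsonSerde(Class<T> clazz) {
        this.clazz = clazz;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public void close() {
    }

    @Override
    public Serializer<T> serializer() {
        return new Serializer<T>() {
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) {
            }

            @Override
            public byte[] serialize(String topic, T data) {
                return data == null ? null : JSON.toJSONString(data).getBytes(StandardCharsets.UTF_8);
            }

            @Override
            public void close() {
            }
        };
    }

    @Override
    public Deserializer<T> deserializer() {
        return new Deserializer<T>() {
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) {
            }

            @Override
            public T deserialize(String topic, byte[] data) {
                return data == null ? null : JSON.parseObject(new String(data, StandardCharsets.UTF_8), clazz);
            }

            @Override
            public void close() {
            }
        };
    }
}

To actually run the topology, the builder still has to be wrapped in a KafkaStreams instance configured with an application.id, bootstrap.servers and String default serdes for the source topic, and started with streams.start(); that wiring is omitted here.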

Sample output

2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:767)  - stream-thread [StreamThread-1] Committing all tasks because the commit interval 10000ms has elapsed
2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:805)  - stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:805)  - stream-thread [StreamThread-1] Committing task StreamTask 0_1
2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:805)  - stream-thread [StreamThread-1] Committing task StreamTask 1_0
2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:805)  - stream-thread [StreamThread-1] Committing task StreamTask 0_2
2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:805)  - stream-thread [StreamThread-1] Committing task StreamTask 1_1
2017-10-15 17:44:19 [StreamThread-1] (StreamThread.java:805)  - stream-thread [StreamThread-1] Committing task StreamTask 1_2
key:class java.lang.String
value:class com.example.demo.Log4jLogEventJson
key:class java.lang.String
value:class com.example.demo.Log4jLogEventJson
key:class java.lang.String
value:class com.example.demo.Log4jLogEventJson
key:class java.lang.String
value:class com.example.demo.Log4jLogEventJson
2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:767)  - stream-thread [StreamThread-1] Committing all tasks because the commit interval 10000ms has elapsed
2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:805)  - stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:805)  - stream-thread [StreamThread-1] Committing task StreamTask 0_1
2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:805)  - stream-thread [StreamThread-1] Committing task StreamTask 1_0
2017-10-15 17:44:29 [StreamThread-1] (ErrorLogStreamTest.java:90)  - key:com.example.demo.controller.DemoController,start:1508060660000,end:1508060670000,value:3
2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:805)  - stream-thread [StreamThread-1] Committing task StreamTask 0_2
2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:805)  - stream-thread [StreamThread-1] Committing task StreamTask 1_1
2017-10-15 17:44:29 [StreamThread-1] (StreamThread.java:805)  - stream-thread [StreamThread-1] Committing task StreamTask 1_2
2017-10-15 17:44:29 [StreamThread-1] (ErrorLogStreamTest.java:90)  - key:com.example.demo.controller.ErrorController,start:1508060660000,end:1508060670000,value:1

Summary

Now that Kafka has Streams, it feels like quite a few pieces of the stack can be dropped: instead of having to learn Storm or Spark, streaming operations can run directly on Kafka. How Kafka Streams distributes the work across instances is something to dig into later.

doc

  • log4j2 output to kafka