前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink(1.12.1)日志配置Logback实现日志切分和kafka发送

Flink(1.12.1)日志配置Logback实现日志切分和kafka发送

作者头像
Eights
发布2021-03-16 15:02:03
1.7K0
发布2021-03-16 15:02:03
举报
文章被收录于专栏:Eights做数据Eights做数据

文档主要内容

产线环境上的Flink应用是长时运行的应用,日志量较大,需要将flink应用的日志发送到外部系统,方便进行日志检索。

最近,在开发环境上遇到了,Flink连接kafka报错刷出大量错误日志,把磁盘打满的问题。Flink从1.11开始已经实现了日志滚动,于是决定将Flink版本升级到最新的1.12.1并配置logback的rollingFileAppender和kafkaAppender实现日志切分和kafka发送。

集群环境

  • CDH-5.16.2
  • Flink-1.12.1
  • flink on yarn per job模式

Flink日志配置Logback实现日志切分和kafka发送

kafka发送部分的实现请参考之前的文章:如何将Flink应用的日志发送到kafka。其中,logback的jar包添加与该文一致。

Flink日志配置官网参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/advanced/logging.html

logback appenders配置参考:http://logback.qos.ch/manual/appenders.html

logback详细配置
  • 其中AppNameLayOut是为了在日志中打上每个Flink应用独立的业务名称
代码语言:javascript
复制
<configuration>
    <property name="LOG_PATTERN"
              value="%d{yyyy-MM-dd HH:mm:ss.SSS} %app [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n"/>

    <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.file}</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
            <fileNamePattern>${log.file}.%i</fileNamePattern>
            <minIndex>1</minIndex>
            <maxIndex>10</maxIndex>
        </rollingPolicy>
        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
            <maxFileSize>10KB</maxFileSize>
        </triggeringPolicy>
        <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
            <layout class="com.changan.carbond.common.logconver.AppNameLayOut">
                <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %app [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n
                </pattern>
            </layout>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <appender name="kafka" class="com.github.danielwegener.logback.kafka.KafkaAppender">

        <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
            <layout class="com.changan.carbond.common.logconver.AppNameLayOut">
                <pattern>${LOG_PATTERN}</pattern>
            </layout>
            <charset>UTF-8</charset>
        </encoder>
        <topic>flink-app-logs</topic>
        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/>
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
        <producerConfig>bootstrap.servers=broker1:9092,broker2:9092,broker3:9092</producerConfig>
        <producerConfig>retries=3</producerConfig>
        <producerConfig>acks=1</producerConfig>
        <producerConfig>batch-size=16384</producerConfig>
        <producerConfig>buffer-memory=33554432</producerConfig>
        <!-- wait up to 1000ms and collect log messages before sending them as a batch -->
        <producerConfig>linger.ms=1000</producerConfig>
        <!-- even if the producer buffer runs full, do not block the application but start to drop messages -->
        <producerConfig>max.block.ms=0</producerConfig>
    </appender>

    <!-- This affects logging for both user code and Flink -->
    <root level="INFO">
        <appender-ref ref="file"/>
        <appender-ref ref="kafka"/>
    </root>

    <!-- Uncomment this if you want to only change Flink's logging -->
    <!--<logger name="org.apache.flink" level="INFO">-->
    <!--<appender-ref ref="file"/>-->
    <!--</logger>-->

    <!-- The following lines keep the log level of common libraries/connectors on
         log level INFO. The root logger does not override this. You have to manually
         change the log levels here. -->
    <logger name="akka" level="INFO">
        <appender-ref ref="file"/>
        <appender-ref ref="kafka"/>
    </logger>
    <logger name="org.apache.kafka" level="INFO">
        <appender-ref ref="file"/>
        <appender-ref ref="kafka"/>
    </logger>
    <logger name="org.apache.hadoop" level="INFO">
        <appender-ref ref="file"/>
        <appender-ref ref="kafka"/>
    </logger>
    <logger name="org.apache.zookeeper" level="INFO">
        <appender-ref ref="file"/>
        <appender-ref ref="kafka"/>
    </logger>

    <!-- Suppress the irrelevant (wrong) warnings from the Netty channel handler -->
    <logger name="org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" level="ERROR">
        <appender-ref ref="file"/>
        <appender-ref ref="kafka"/>
    </logger>
</configuration>
Flink日志切分和日志搜集测试
  • 编写一个简单的Flink应用,在apply方法中打个日志
  • 编译打包,运行flink run将任务提交到集群,采用flink on yarn per job模式
  • 将flink集群日志文件配置为10kb滚动一个文件,测试日志滚动的效果
代码语言:javascript
复制
flink run -m yarn-cluster -yd -ytm 2g -ys 4 -yjm 2g -ynm flink-demo测试 \
-c com.eights.carbond.streaming.sensor.AverageSensorReadings \
./flink-demo-1.0-SNAPSHOT.jar
  • 业务独立名称打在日志中
  • 日志文件滚动正常
  • ES检查是否有Flink日志进入

整个日志搜集正常,Flink1.12日志配置logback日志切分和kafka搜集完成

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-02-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Eights做数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文档主要内容
  • 集群环境
  • Flink日志配置Logback实现日志切分和kafka发送
    • logback详细配置
      • Flink日志切分和日志搜集测试
      相关产品与服务
      大数据
      全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档