Writing logs into Kafka: a refactoring journey (from the "How far can we go" series)

Author: java架构师
Published 2019-01-28 10:29:45
From the column: Java架构师进阶

Approach

There are many log collection solutions out there, covering all kinds of log filtering, cleansing, analysis and statistics, and they all look impressive. This article only covers one piece: getting logs into kafka.

Flow: app -> kafka -> logstash -> es -> kibana

The business application writes its logs directly into kafka, logstash consumes them, and the data lands in es.
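For reference, the logstash leg of that pipeline can look roughly like this (a sketch, not from the article; the es host and index name are placeholders, while the servers address and topic match the properties used later):

```conf
input {
  kafka {
    bootstrap_servers => "10.0.11.55:9092"
    topics => ["prod-java"]
    codec => "json"
  }
}
output {
  elasticsearch {
    hosts => ["http://es-host:9200"]
    index => "app-log-%{+YYYY.MM.dd}"
  }
}
```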


In parallel, the application still writes log files on its own server.

As shown in the figure:

Details

Initial implementation

First, the initial implementation. Setting up elk is out of scope here (just pay special attention to version compatibility between components); the focus is the code-level journey.

To write log data into kafka, depending on the official kafka client should be enough. A look around github turns up an existing project: link

It is not much code; read it through once and modify it as needed.

The following code is based on the spring boot framework.

Core appender code:

public class KafkaAppender<E> extends KafkaAppenderConfig<E> {

    /**
     * Kafka clients use this prefix for their slf4j logging.
     * This appender defers appends of any Kafka logs since it could cause
     * harmful infinite recursion/self-feeding effects.
     */
    private static final String KAFKA_LOGGER_PREFIX = "org.apache.kafka.clients";

    public static final Logger logger = LoggerFactory.getLogger(KafkaAppender.class);

    private LazyProducer lazyProducer = null;
    private final AppenderAttachableImpl<E> aai = new AppenderAttachableImpl<>();
    private final ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<>();
    private final FailedDeliveryCallback<E> failedDeliveryCallback = new FailedDeliveryCallback<E>() {
        @Override
        public void onFailedDelivery(E evt, Throwable throwable) {
            aai.appendLoopOnAppenders(evt);
        }
    };

    public KafkaAppender() {
        // setting these as config values sidesteps an unnecessary warning (minor bug in KafkaProducer)
        addProducerConfigValue(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        addProducerConfigValue(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    }

    @Override
    public void doAppend(E e) {
        ensureDeferredAppends();
        if (e instanceof ILoggingEvent && ((ILoggingEvent) e).getLoggerName().startsWith(KAFKA_LOGGER_PREFIX)) {
            deferAppend(e);
        } else {
            super.doAppend(e);
        }
    }

    @Override
    public void start() {
        // only error-free appenders should be activated
        if (!checkPrerequisites()) return;
        lazyProducer = new LazyProducer();
        super.start();
    }

    @Override
    public void stop() {
        super.stop();
        if (lazyProducer != null && lazyProducer.isInitialized()) {
            try {
                lazyProducer.get().close();
            } catch (KafkaException e) {
                this.addWarn("Failed to shut down kafka producer: " + e.getMessage(), e);
            }
            lazyProducer = null;
        }
    }

    @Override
    public void addAppender(Appender<E> newAppender) {
        aai.addAppender(newAppender);
    }

    @Override
    public Iterator<Appender<E>> iteratorForAppenders() {
        return aai.iteratorForAppenders();
    }

    @Override
    public Appender<E> getAppender(String name) {
        return aai.getAppender(name);
    }

    @Override
    public boolean isAttached(Appender<E> appender) {
        return aai.isAttached(appender);
    }

    @Override
    public void detachAndStopAllAppenders() {
        aai.detachAndStopAllAppenders();
    }

    @Override
    public boolean detachAppender(Appender<E> appender) {
        return aai.detachAppender(appender);
    }

    @Override
    public boolean detachAppender(String name) {
        return aai.detachAppender(name);
    }

    @Override
    protected void append(E e) {
        // encoding logic
        final byte[] payload = encoder.doEncode(e);
        final byte[] key = keyingStrategy.createKey(e);
        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, key, payload);
        Producer<byte[], byte[]> producer = lazyProducer.get();
        if (producer == null) {
            logger.error("kafka producer is null");
            return;
        }
        // the core send method
        deliveryStrategy.send(producer, record, e, failedDeliveryCallback);
    }

    protected Producer<byte[], byte[]> createProducer() {
        return new KafkaProducer<>(new HashMap<>(producerConfig));
    }

    private void deferAppend(E event) {
        queue.add(event);
    }

    // drains queued events to super
    private void ensureDeferredAppends() {
        E event;
        while ((event = queue.poll()) != null) {
            super.doAppend(event);
        }
    }

    /**
     * Lazy initializer for the producer, patterned after commons-lang.
     *
     * @see LazyInitializer
     */
    private class LazyProducer {
        private volatile Producer<byte[], byte[]> producer;
        private boolean initialized;

        public Producer<byte[], byte[]> get() {
            Producer<byte[], byte[]> result = this.producer;
            if (result == null) {
                synchronized (this) {
                    if (!initialized) {
                        result = this.producer;
                        if (result == null) {
                            // Note: initialize() may fail (e.g. an illegal servers string) and return
                            // a null producer, so the initialized flag -- not the producer reference --
                            // guards against repeating a failing initialization over and over.
                            this.producer = result = this.initialize();
                            initialized = true;
                        }
                    }
                }
            }
            return result;
        }

        protected Producer<byte[], byte[]> initialize() {
            Producer<byte[], byte[]> producer = null;
            try {
                producer = createProducer();
            } catch (Exception e) {
                addError("error creating producer", e);
            }
            return producer;
        }

        public boolean isInitialized() {
            return producer != null;
        }
    }
}

The code above marks the producer as initialized the moment creation is attempted, ensuring that even in failure scenarios initialization runs only once.

In practice, if the servers config is a non-IP string, initialize() returns null. If the decision to call initialize() were based only on whether the producer is null, the appender would fall into an endless loop of failing initializations and could even prevent the application from starting.
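That guard can be reduced to a small self-contained sketch (the class and member names here are illustrative, not part of the appender):

```java
import java.util.function.Supplier;

// Minimal sketch of the "initialized flag" guard: the flag, not the product
// reference, decides whether initialization runs, so a failed (null-returning
// or throwing) initialization is attempted exactly once and never retried.
public class LazyOnce<T> {
    private volatile T value;
    private boolean initialized;
    private final Supplier<T> factory; // may return null or throw on failure

    public LazyOnce(Supplier<T> factory) {
        this.factory = factory;
    }

    public T get() {
        T result = this.value;
        if (result == null) {
            synchronized (this) {
                if (!initialized) {
                    result = this.value;
                    if (result == null) {
                        try {
                            this.value = result = factory.get();
                        } catch (Exception e) {
                            // initialization failed; value stays null
                        }
                        initialized = true; // set even on failure: no endless retries
                    }
                }
            }
        }
        return result;
    }
}
```

With a factory that always fails, get() invokes it exactly once and returns null on every later call; this is the property the appender relies on so that a bad bootstrap.servers value cannot wedge startup in a retry loop.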

Configure logback-spring.xml:

<appender name="kafkaAppender" class="...KafkaAppender">
    <topic>${LOG_KAFKA_TOPIC}</topic>
    <producerConfig>bootstrap.servers=${LOG_KAFKA_SERVERS}</producerConfig>
</appender>

bootstrap.properties configuration:


application.log.kafka.bootstrap.servers=10.0.11.55:9092
application.log.kafka.topic=prod-java

The JSON written into kafka is customized by extending the encoder.doEncode(e) used above:

public class FormatKafkaMessageEncoder<E> extends KafkaMessageEncoderBase<E> {

    protected static final int BUILDER_CAPACITY = 2048;
    protected static final int LENGTH_OPTION = 2048;
    public static final String CAUSED_BY = "Caused by: ";
    public static final String SUPPRESSED = "Suppressed: ";
    public static final char TAB = '\t';

    public byte[] encode(ILoggingEvent event) {
        Map<String, Object> formatMap = new HashMap<>();
        formatMap.put("timestamp", event.getTimeStamp() != 0 ? String.valueOf(new Date(event.getTimeStamp())) : "");
        formatMap.put("span", event.getMDCPropertyMap() != null ? event.getMDCPropertyMap().get("X-B3-SpanId") : "");
        formatMap.put("trace", event.getMDCPropertyMap() != null ? event.getMDCPropertyMap().get("X-B3-TraceId") : "");
        formatMap.put("class", event.getLoggerName());
        formatMap.put("level", event.getLevel() != null ? event.getLevel().toString() : "");
        formatMap.put("message", event.getMessage());
        formatMap.put("stacktrace", event.getThrowableProxy() != null ? convertStackTrace(event.getThrowableProxy()) : "");
        formatMap.put("thread", event.getThreadName());
        formatMap.put("ip", IpUtil.getLocalIP());
        formatMap.put("application", event.getLoggerContextVO() != null && event.getLoggerContextVO().getPropertyMap() != null
                ? event.getLoggerContextVO().getPropertyMap().get("springAppName") : "");
        String formatJson = JSONObject.toJSONString(formatMap);
        return formatJson.getBytes();
    }

    @Override
    public byte[] doEncode(E event) {
        return encode((ILoggingEvent) event);
    }

    public String convertStackTrace(IThrowableProxy tp) {
        StringBuilder sb = new StringBuilder(BUILDER_CAPACITY);
        recursiveAppend(sb, tp, null);
        return sb.toString();
    }

    private void recursiveAppend(StringBuilder sb, IThrowableProxy tp, String prefix) {
        if (tp == null) {
            return;
        }
        if (prefix != null) {
            sb.append(prefix);
        }
        sb.append(tp.getClassName()).append(": ").append(tp.getMessage());
        sb.append(CoreConstants.LINE_SEPARATOR);
        StackTraceElementProxy[] stepArray = tp.getStackTraceElementProxyArray();
        boolean unrestrictedPrinting = LENGTH_OPTION > stepArray.length;
        int maxIndex = unrestrictedPrinting ? stepArray.length : LENGTH_OPTION;
        for (int i = 0; i < maxIndex; i++) {
            sb.append(TAB);
            StackTraceElementProxy element = stepArray[i];
            sb.append(element);
            sb.append(CoreConstants.LINE_SEPARATOR);
        }
        IThrowableProxy[] suppressed = tp.getSuppressed();
        if (suppressed != null) {
            for (IThrowableProxy current : suppressed) {
                recursiveAppend(sb, current, SUPPRESSED);
            }
        }
        recursiveAppend(sb, tp.getCause(), CAUSED_BY);
    }
}

The recursiveAppend method mimics ch.qos.logback.classic.spi.ThrowableProxyUtil and is used to print the full exception stack.
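The same recursion can be sketched against plain java.lang.Throwable, which logback's IThrowableProxy mirrors: walk the throwable itself, then its suppressed exceptions, then its cause chain. The names below are illustrative, not part of the encoder:

```java
// Sketch of the recursive stack formatting used by convertStackTrace above,
// expressed over java.lang.Throwable instead of logback's proxy types.
public class StackFormatter {
    public static final String CAUSED_BY = "Caused by: ";
    public static final String SUPPRESSED = "Suppressed: ";

    public static String format(Throwable t) {
        StringBuilder sb = new StringBuilder(2048);
        recursiveAppend(sb, t, null);
        return sb.toString();
    }

    private static void recursiveAppend(StringBuilder sb, Throwable t, String prefix) {
        if (t == null) return;
        if (prefix != null) sb.append(prefix);
        sb.append(t.getClass().getName()).append(": ").append(t.getMessage()).append('\n');
        for (StackTraceElement element : t.getStackTrace()) {
            sb.append('\t').append(element).append('\n');
        }
        // suppressed exceptions first, then the cause chain, as ThrowableProxyUtil does
        for (Throwable s : t.getSuppressed()) {
            recursiveAppend(sb, s, SUPPRESSED);
        }
        recursiveAppend(sb, t.getCause(), CAUSED_BY);
    }
}
```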

There is also the problem of obtaining the local IP: InetAddress.getLocalHost().getHostAddress() alone does not solve it, since it can return 127.0.0.1.

The full code:

public class IpUtil {

    public static final String DEFAULT_IP = "127.0.0.1";
    public static String cacheLocalIp = null;
    private static Logger logger = LoggerFactory.getLogger(IpUtil.class);

    /**
     * Take the address of a network interface as the intranet IPv4 address,
     * to avoid returning 127.0.0.1.
     */
    private static String getLocalIpByNetworkCard() {
        String ip = null;
        try {
            for (Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces(); e.hasMoreElements(); ) {
                NetworkInterface item = e.nextElement();
                for (InterfaceAddress address : item.getInterfaceAddresses()) {
                    if (item.isLoopback() || !item.isUp()) {
                        continue;
                    }
                    if (address.getAddress() instanceof Inet4Address) {
                        Inet4Address inet4Address = (Inet4Address) address.getAddress();
                        ip = inet4Address.getHostAddress();
                    }
                }
            }
        } catch (Exception e) {
            logger.error("getLocalIpByNetworkCard error", e);
            try {
                ip = InetAddress.getLocalHost().getHostAddress();
            } catch (Exception e1) {
                logger.error("InetAddress.getLocalHost().getHostAddress() error", e1);
                ip = DEFAULT_IP;
            }
        }
        return ip == null ? DEFAULT_IP : ip;
    }

    public synchronized static String getLocalIP() {
        if (cacheLocalIp == null) {
            cacheLocalIp = getLocalIpByNetworkCard();
        }
        return cacheLocalIp;
    }
}

A local file appender is also configured in logback-spring.xml:

<appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
        <!-- roll over daily -->
        <fileNamePattern>${LOG_FOLDER}/${springAppName}.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
        <!-- roll the current file once it reaches this size -->
        <maxFileSize>300MB</maxFileSize>
        <!-- number of history files to keep -->
        <maxHistory>5</maxHistory>
    </rollingPolicy>
    <!-- pattern: %d date, %thread thread name, %-5level level left-padded to 5 chars, %msg message, %n newline -->
    <encoder>
        <pattern>${CONSOLE_LOG_PATTERN}</pattern>
    </encoder>
</appender>

Note that SizeAndTimeBasedRollingPolicy is used here rather than TimeBasedRollingPolicy plus SizeBasedTriggeringPolicy. With the latter combination the size trigger takes precedence, and new log files are not automatically created by date.

At this point the code for writing logs into kafka is complete, fully functional, and works correctly.

Failure scenarios

Consider: what happens to this logging feature if kafka cannot accept messages while the application is starting or running, for example because kafka is down?

Every log write would then try to connect to kafka and fail, which inevitably affects the health of the application.

This suggests circuit breaking: if kafka goes down, a circuit breaker can limit the impact on the application.

So a circuit breaker was implemented here.

State transition diagram:

The circuit breaker:

/**
 * Circuit breaker.
 * 1. failureCount and consecutiveSuccessCount drive the state transitions; both are
 *    AtomicIntegers so the counts stay accurate under concurrency.
 * 2. successCount is not an AtomicInteger; its accuracy is not guaranteed.
 * 3. Illegal failureThreshold, consecutiveSuccessThreshold and timeout values
 *    fall back to defaults.
 */
public class CircuitBreaker {

    private static final Logger logger = LoggerFactory.getLogger(CircuitBreaker.class);

    private String name;
    /** circuit breaker state */
    private CircuitBreakerState state;
    /** failure count threshold */
    private int failureThreshold;
    /** open-state time window */
    private long timeout;
    /** failure count */
    private AtomicInteger failureCount;
    /** success count (not accurate under concurrency) */
    private int successCount;
    /** consecutive successes within the half-open window */
    private AtomicInteger consecutiveSuccessCount;
    /** threshold of consecutive successes within the half-open window */
    private int consecutiveSuccessThreshold;

    public CircuitBreaker(String name, int failureThreshold, int consecutiveSuccessThreshold, long timeout) {
        if (failureThreshold <= 0) {
            failureThreshold = 1;
        }
        if (consecutiveSuccessThreshold <= 0) {
            consecutiveSuccessThreshold = 1;
        }
        if (timeout <= 0) {
            timeout = 10000;
        }
        this.name = name;
        this.failureThreshold = failureThreshold;
        this.consecutiveSuccessThreshold = consecutiveSuccessThreshold;
        this.timeout = timeout;
        this.failureCount = new AtomicInteger(0);
        this.consecutiveSuccessCount = new AtomicInteger(0);
        state = new CloseCircuitBreakerState(this);
    }

    public void increaseFailureCount() {
        failureCount.addAndGet(1);
    }

    public void increaseSuccessCount() {
        successCount++;
    }

    public void increaseConsecutiveSuccessCount() {
        consecutiveSuccessCount.addAndGet(1);
    }

    public boolean increaseFailureCountAndThresholdReached() {
        return failureCount.addAndGet(1) >= failureThreshold;
    }

    public boolean increaseConsecutiveSuccessCountAndThresholdReached() {
        return consecutiveSuccessCount.addAndGet(1) >= consecutiveSuccessThreshold;
    }

    public boolean isNotOpen() {
        return !isOpen();
    }

    /** open: calls to the protected method are blocked */
    public boolean isOpen() {
        return state instanceof OpenCircuitBreakerState;
    }

    /** closed: the protected method executes normally */
    public boolean isClose() {
        return state instanceof CloseCircuitBreakerState;
    }

    /** half-open: trial calls to the protected method are allowed */
    public boolean isHalfClose() {
        return state instanceof HalfOpenCircuitBreakerState;
    }

    public void transformToCloseState() {
        state = new CloseCircuitBreakerState(this);
    }

    public void transformToHalfOpenState() {
        state = new HalfOpenCircuitBreakerState(this);
    }

    public void transformToOpenState() {
        state = new OpenCircuitBreakerState(this);
    }

    /** reset the failure count */
    public void resetFailureCount() {
        failureCount.set(0);
    }

    /** reset the consecutive success count */
    public void resetConsecutiveSuccessCount() {
        consecutiveSuccessCount.set(0);
    }

    public long getTimeout() {
        return timeout;
    }

    /** whether the failure threshold has been reached */
    protected boolean failureThresholdReached() {
        return failureCount.get() >= failureThreshold;
    }

    /** whether the consecutive success threshold has been reached */
    protected boolean consecutiveSuccessThresholdReached() {
        return consecutiveSuccessCount.get() >= consecutiveSuccessThreshold;
    }

    /** called after the protected method fails */
    public void actFailed() {
        state.actFailed();
    }

    /** called after the protected method succeeds */
    public void actSuccess() {
        state.actSuccess();
    }

    public static interface Executor {
        /** the task to execute */
        void execute();
    }

    public void execute(Executor executor) {
        if (!isOpen()) {
            try {
                executor.execute();
                this.actSuccess();
            } catch (Exception e) {
                this.actFailed();
                logger.error("CircuitBreaker executor error", e);
            }
        } else {
            logger.error("CircuitBreaker named {} is open", this.name);
        }
    }

    public String show() {
        Map<String, Object> map = new HashMap<>();
        map.put("name", name);
        map.put("state", isClose());
        map.put("failureThreshold", failureThreshold);
        map.put("failureCount", failureCount);
        map.put("consecutiveSuccessThreshold", consecutiveSuccessThreshold);
        map.put("consecutiveSuccessCount", consecutiveSuccessCount);
        map.put("successCount", successCount);
        map.put("timeout", timeout);
        map.put("state class", state.getClass());
        return JSONObject.toJSONString(map);
    }
}

The state machine:

public interface CircuitBreakerState {
    /** called after the protected method fails */
    void actFailed();
    /** called after the protected method succeeds */
    void actSuccess();
}

public abstract class AbstractCircuitBreakerState implements CircuitBreakerState {

    protected CircuitBreaker circuitBreaker;

    public AbstractCircuitBreakerState(CircuitBreaker circuitBreaker) {
        this.circuitBreaker = circuitBreaker;
    }

    @Override
    public void actFailed() {
        circuitBreaker.increaseFailureCount();
    }

    @Override
    public void actSuccess() {
        circuitBreaker.increaseSuccessCount();
    }
}

public class CloseCircuitBreakerState extends AbstractCircuitBreakerState {

    public CloseCircuitBreakerState(CircuitBreaker circuitBreaker) {
        super(circuitBreaker);
        circuitBreaker.resetFailureCount();
        circuitBreaker.resetConsecutiveSuccessCount();
    }

    @Override
    public void actFailed() {
        // transition to the open state
        if (circuitBreaker.increaseFailureCountAndThresholdReached()) {
            circuitBreaker.transformToOpenState();
        }
    }
}

public class HalfOpenCircuitBreakerState extends AbstractCircuitBreakerState {

    public HalfOpenCircuitBreakerState(CircuitBreaker circuitBreaker) {
        super(circuitBreaker);
        circuitBreaker.resetConsecutiveSuccessCount();
    }

    @Override
    public void actFailed() {
        super.actFailed();
        circuitBreaker.transformToOpenState();
    }

    @Override
    public void actSuccess() {
        super.actSuccess();
        // close the breaker once the consecutive success threshold is reached
        if (circuitBreaker.increaseConsecutiveSuccessCountAndThresholdReached()) {
            circuitBreaker.transformToCloseState();
        }
    }
}

public class OpenCircuitBreakerState extends AbstractCircuitBreakerState {

    public OpenCircuitBreakerState(CircuitBreaker circuitBreaker) {
        super(circuitBreaker);
        final Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                circuitBreaker.transformToHalfOpenState();
                timer.cancel();
            }
        }, circuitBreaker.getTimeout());
    }
}

/**
 * Circuit breaker factory: keeps the application's CircuitBreakers in one place.
 * Note: once created, a breaker lives as long as the application and is never removed.
 */
public class CircuitBreakerFactory {

    private static ConcurrentHashMap<String, CircuitBreaker> circuitBreakerMap = new ConcurrentHashMap<>();

    public static CircuitBreaker getCircuitBreaker(String name) {
        return circuitBreakerMap.get(name);
    }

    /**
     * @param name unique name
     * @param failureThreshold failure count threshold
     * @param consecutiveSuccessThreshold success count threshold within the window
     * @param timeout window length
     * 1. closed: failures >= failureThreshold moves the breaker to open
     * 2. open: after each timeout interval it moves to half-open
     * 3. half-open: consecutiveSuccessThreshold consecutive successes move it to closed;
     *    any failure moves it back to open
     */
    public static CircuitBreaker buildCircuitBreaker(String name, int failureThreshold,
            int consecutiveSuccessThreshold, long timeout) {
        CircuitBreaker circuitBreaker = new CircuitBreaker(name, failureThreshold, consecutiveSuccessThreshold, timeout);
        circuitBreakerMap.put(name, circuitBreaker);
        return circuitBreaker;
    }
}
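Stripped of logging and the Timer, the transition rules above can be exercised with a minimal, deterministic sketch. All names here are illustrative, and tryHalfOpen() stands in for the timer firing after the open-state timeout:

```java
// Deterministic sketch of the closed -> open -> half-open -> closed flow.
// The real implementation moves open -> half-open via a Timer; here that
// transition is an explicit method so the flow can be stepped through.
public class TinyBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private State state = State.CLOSED;
    private int failures = 0;
    private int consecutiveSuccesses = 0;
    private final int failureThreshold;
    private final int consecutiveSuccessThreshold;

    public TinyBreaker(int failureThreshold, int consecutiveSuccessThreshold) {
        this.failureThreshold = failureThreshold;
        this.consecutiveSuccessThreshold = consecutiveSuccessThreshold;
    }

    public boolean isOpen() { return state == State.OPEN; }

    public void actFailed() {
        if (state == State.CLOSED && ++failures >= failureThreshold) {
            state = State.OPEN;
        } else if (state == State.HALF_OPEN) {
            state = State.OPEN; // any trial failure re-opens the breaker
        }
    }

    public void actSuccess() {
        if (state == State.HALF_OPEN && ++consecutiveSuccesses >= consecutiveSuccessThreshold) {
            state = State.CLOSED;
            failures = 0; // start counting failures from zero again
        }
    }

    // stands in for the Timer firing after the open-state timeout
    public void tryHalfOpen() {
        if (state == State.OPEN) {
            state = State.HALF_OPEN;
            consecutiveSuccesses = 0;
        }
    }
}
```

With thresholds (3, 2), three failures open the breaker, the timeout moves it to half-open, and two consecutive successes close it again, matching the factory javadoc above.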

Using the circuit breaker when sending kafka messages:

/**
 * Logging is not a core business service. To keep an unstable kafka from affecting
 * the application, a circuit breaker is used here: open after 3 failures, half-open
 * every 20 seconds, closed again after 2 consecutive successes.
 */
CircuitBreaker circuitBreaker = CircuitBreakerFactory.buildCircuitBreaker("KafkaAppender-c", 3, 2, 20000);

public boolean send(Producer<byte[], byte[]> producer, ProducerRecord<byte[], byte[]> record,
        final E event, final FailedDeliveryCallback<E> failedDeliveryCallback) {
    if (circuitBreaker.isNotOpen()) {
        try {
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    circuitBreaker.actFailed();
                    failedDeliveryCallback.onFailedDelivery(event, exception);
                    logger.error("kafka producer send log error", exception);
                } else {
                    circuitBreaker.actSuccess();
                }
            });
            return true;
        } catch (KafkaException e) {
            circuitBreaker.actFailed();
            failedDeliveryCallback.onFailedDelivery(event, e);
            logger.error("kafka send log error", e);
            return false;
        }
    } else {
        logger.error("kafka log circuitBreaker open");
        return false;
    }
}

Summary

1. When setting up elk, pay special attention to version compatibility between components; the kafka client version must match the kafka version.

2. The design tolerates failed kafka deliveries, with the local log file as the more reliable record; the circuit breaker covers the worst case, and the same mechanism can be reused for other third-party calls.

Originally published 2019.01.17 on the author's personal site/blog.