基于akka的多线程应用程序日志收集服务

Akka is a toolkit and runtime for building highly concurrent,distributed, and resilient message-driven applications on the JVM. Akka是JVM之上高并发的分布式,可伸缩的消息驱动应用框架。下面我们将通过Akka框架实现多线程的日志收集功能。我们把收集到的日志数据实时存放到HDFS中,以供后续分析挖掘使用。

通过SSH方式,远程登录到客户端 使用SSH在远程客户端执行SHELL命令,如最常见的tail -f实时读取日志文件增量

package com.changtu.serviceimport java.io.import akka.actor.Actorimport com.changtu.core.Hostimport com.changtu.util.Loggingimport com.changtu.util.hdfs.HDFSUtilsimport com.changtu.util.host.import org.apache.hadoop.fs.Pathimport org.joda.time.DateTimeimport scala.util./** * Created by lubinsu on 8/22/2016. * 日志收集程序,通过指定的命令收集各个客户端的日志,通过akka实现并发操作 */class CollectLogService extends Actor with Logging { override def receive: Receive = { case Host(host, port, cmd) => getLogs(host, port, cmd) match { case 0 => logger.info("success.") case _ => logger.error("error.") } case _ => logger.warn("unknown operation.") } /** * 根据shell命令收集指定主机上的日志 * * @param host 需要收集的主机 * @param port ssh端口号 * @param cmd 执行命令 * @return 返回执行后的状态码 */ private def getLogs(host: String, port: String, cmd: String): Int = { // 密码解密 val password = AES.decrypt(Configuration("passwd").getProperty(host.concat("-hadoop")), "secretKey.changtu.com") match { case Success(encrypted) => encrypted.asInstanceOf[String] case Failure(e) => logger.error(e.getMessage) "" } val ssh = (cmd: String) => SSH(host, "hadoop", port.toInt, cmd, "", password, loadToHdfs) ssh(cmd) } /** * 收集到的日志处理方式 * @param msg 传入一行行记录 */ private def loadToHdfs(msg: String, host: String): Unit = { //logger.info(msg) val currentTime = DateTime.now.toString("yyyyMMdd") val path = "/user/hadoop/bigdata/logs/rest.".concat(host).concat("-").concat(currentTime).concat(".log") HDFSUtils.createDirectory(path, deleteF = false) val fsout = HDFSUtils.getHdfs.append(new Path(path)) val br = new BufferedWriter(new OutputStreamWriter(fsout)) br.write(msg) br.newLine() br.close() fsout.close() } /** * 收集到的日志处理方式 * @param msg 传入一行行记录 */ private def loadToKafka(msg: String, host: String): Unit = { //logger.info(msg) val currentTime = DateTime.now.toString("yyyyMMdd") val path = "/user/hadoop/bigdata/logs/rest.".concat(host).concat("-").concat(currentTime).concat(".log") HDFSUtils.createDirectory(path, deleteF = false) val fsout = HDFSUtils.getHdfs.append(new Path(path)) val br = new BufferedWriter(new OutputStreamWriter(fsout)) br.write(msg) br.newLine() br.close() fsout.close() }}

定义Host class

package com.changtu.util/*** Created by lubinsu on 8/16/2016.* 配置默认参数*/package object host {/*** Specifies the default `charset` used to encode and decode strings.*/private[host] final val DefaultCharset = "UTF-8"}

命令派发

package com.changtu.serviceimport akka.actor.import com.changtu.core.Hostimport com.changtu.util.Loggingimport com.changtu.util.host.import scala.util./*** Created by lubinsu on 8/23/2016.* 命令派发*/class CollectMonitor extends Actor with Logging {override def receive: Receive = {case Host(host, port, cmd) =>getLogFiles(host, port, cmd)val collector = context.actorOf(Props[CollectLogService], "collector-".concat(host))context.children.foreach( p => {println(p.path.name)})collector ! Host(host, port, cmd)case _ => logger.warn("unknown operation.")}private def getLogFiles(host: String, port: String, cmd: String): Int = {// 密码解密val password = AES.decrypt(Configuration("passwd").getProperty(host.concat("-hadoop")), "secretKey.changtu.com") match {case Success(encrypted) =>encrypted.asInstanceOf[String]case Failure(e) =>logger.error(e.getMessage)""}val ssh = (cmd: String) => SSH(host, "hadoop", port.toInt, cmd, "", password)ssh("find /appl/logs -type f")}}

主Actor

package com.changtu.apiimport akka.actor.import com.changtu.core.Hostimport com.changtu.service.CollectMonitor/*** Created by lubinsu on 8/23/2016.* 主Actor*/object CollectLogs extends App {if (args.length ")System.exit(1)}val Array(hosts, cmd) = argsval system = ActorSystem("CollectSystem")val monitor = system.actorOf(Props[CollectMonitor], name = "CollectMonitor-".concat(hosts))hosts.split(",").foreach( p => {// default Actor constructormonitor ! Host(p.split(":")(0), p.split(":")(1), cmd)})}

打包执行

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180503G0I66W00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券