首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Go语言(十九)日志采集项目之logagent开发(一)

Go语言(十九)日志采集项目之logagent开发(一)

作者头像
alexhuiwang
发布2020-09-23 11:38:52
1.4K0
发布2020-09-23 11:38:52
举报
文章被收录于专栏:运维博客运维博客

日志采集项目之logagent开发(一)

项目结构

  • 项目分为如下部分:
    • logagent
      • conf: 配置文件
      • kafka: kafka集成模块
      • tailf: 日志读取模块
      • main.go: 程序入口
    • xlog: 日志打印模块,参考https://blog.51cto.com/13812615/2490744
    • oconfig: 配置文件解析模块,参考:https://blog.51cto.com/13812615/2492150
logCollect/
├── logagent
│   ├── conf
│   │   └── config.ini
│   ├── kafka
│   │   └── kafka.go
│   ├── logs
│   │   └── logagent.log
│   ├── main.go
│   └── tailf
│       └── tail.go
├── oconfig
│   └── config.go
└── xlog
    ├── console.go
    ├── file.go
    ├── level.go
    ├── log.go
    ├── log_base.go
    └── tool.go

logagent代码:

  • config/config.ini
[kafka]
address=192.168.56.11:9092
queue_size=10000
[collect_log_conf]
log_filenames=/Users/wanghui/go/src/oldBoy/day11/my.log

[logs]
#level类型有debug,info,trace,warn,error,fatal
log_level=debug
filename=./logs/logagent.log
#log_type=file,console
log_type=console
module=logagent
  • main.go
package main

import (
    "fmt"
    "oldBoy/logagent/kafka"
    "oldBoy/logagent/tailf"
    "oldBoy/oconfig"
    "oldBoy/xlog"
    "strings"
)

var (
    appConfig AppConfig
)

// 配置文件结构体
type AppConfig struct {
    KafkaConf  KafkaConfig              `ini:"kafka"`
    CollectLogConf CollectLogConfig     `ini:"collect_log_conf"`
    LogConf    LogConfig                `ini:"logs"`
}

type KafkaConfig struct {
    Address string  `ini:"address"`
    QueueSize int    `ini:"queue_size"`
}

type CollectLogConfig struct {
    LogFilenames string `ini:"log_filenames"`
}

type LogConfig struct {
    LogLevel    string    `ini:"log_level"`
    Filename    string    `ini:"filename"`
    LogType     string    `ini:"log_type"`
    Module      string    `ini:"module"`
}

func initConfig(filename string) (err error)  {
    err = oconfig.UnMarshalFile(filename,&appConfig)
    if err != nil {
        return
    }
    //打印配置文件内容
    xlog.LogDebug("config:%#v",appConfig)
    return
}

func initLog() (err error)  {
    var logType int
    var level int
    //转换格式
    if appConfig.LogConf.LogType == "console" {
        logType = xlog.XLogTypeConsole
    }else {
        logType = xlog.XLogTypeFile
    }

    switch appConfig.LogConf.LogLevel {
    case "debug":
        level = xlog.XLogLevelDebug
    case "trace":
        level = xlog.XLogLevelTrace
    case "info":
        level = xlog.XLogLevelInfo
    case "warn":
        level = xlog.XLogLevelWarn
    case "error":
        level = xlog.XLogLevelError
    case "fatal":
        level = xlog.XLogLevelFatal
    default:
        level = xlog.XLogLevelDebug
    }
    //初始化日志库
    err = xlog.Init(logType,level,appConfig.LogConf.Filename,appConfig.LogConf.Module)
    return
}

func run() (err error) {
    //1. 从日志文件读取日志
    for {
        line,err := tailf.ReadLine()
        if err != nil {
            continue
        }
        xlog.LogDebug("line:%s",line.Text)
        //2. 发送日志数据到kafka
        msg := &kafka.Message{
            Line:  line.Text,
            Topic: "nginx_log",
        }
        err = kafka.SendLog(msg)
        if err != nil {
            xlog.LogWarn("send log to kafka faild,err:%v",err)
        }
        xlog.LogDebug("send to kafka succ")
    }
    return
}

func main() {
    //初始化配置
    err := initConfig("./conf/config.ini")
    if err != nil {
        panic(fmt.Sprintf("init config faild,err:%v",err))
    }
    //初始化日志库
    err = initLog()
    if err != nil{
        panic(fmt.Sprintf("init logs faild,err:%v",err))
    }
    xlog.LogDebug("init log success")
    //kafka初始化
    address := strings.Split(appConfig.KafkaConf.Address,",")
    err = kafka.Init(address,appConfig.KafkaConf.QueueSize)
    if err != nil {
        panic(fmt.Sprintf("init kafka faild,err:%v",err))
    }
    xlog.LogDebug("init kafka succ")
    //tail初始化
    err = tailf.Init(appConfig.CollectLogConf.LogFilenames)
    if err != nil {
        panic(fmt.Sprintf("init tail faild,err:%v",err))
    }
    xlog.LogDebug("init tailf succ")

    err = run()
    if err != nil {
        xlog.LogError("run faild,err:%v",err)
        return
    }
    xlog.LogDebug("run finished\n")
}
  • kafka/kafka.go
package kafka

import (
    "fmt"
    "github.com/Shopify/sarama"
    "oldBoy/xlog"     //这个取决于goPath的配置
)

var (
    client sarama.SyncProducer
    msgChan chan *Message
)

type Message struct {
    Line string
    Topic string
}

func Init(address []string,chanSize int) (err error){
    //初始化配置
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Return.Successes = true

    //连接配置
    client,err = sarama.NewSyncProducer(address,config)
    if err != nil {
        xlog.LogError("send message faild,error:%v",err)
        return
    }
    msgChan = make(chan *Message,chanSize)
    go SendKafka()
    return
}

func SendKafka()  {
    //从管道获取数据,并发送出去
    for msg := range msgChan {
        kafkaMsg := &sarama.ProducerMessage{}
        kafkaMsg.Topic = msg.Topic
        kafkaMsg.Value = sarama.StringEncoder(msg.Line)

        //发送数据
        pid,offset,err := client.SendMessage(kafkaMsg)
        if err != nil {
            xlog.LogError("send message faild,err:%v",err)
            continue   //持续发日志
        }
        xlog.LogDebug("pid:%v,offset:%v",pid,offset)
    }
}

func SendLog(msg *Message) (err error) {
    if len(msg.Line) == 0 {
        //过滤空行
        return
    }
    select {
    case msgChan <- msg:
    default:
        err = fmt.Errorf("chan is full")
    }
    return
}
  • tailf/tail.go
package tailf

import (
    "fmt"
    "github.com/hpcloud/tail"
    "oldBoy/xlog"
)

var (
    tailObj *tail.Tail
)

func Init(filename string) (err error) {
    tailObj,err = tail.TailFile(filename,tail.Config{
        ReOpen: true,
        Follow: true,
        Location: &tail.SeekInfo{Offset: 0, Whence: 2,},
        MustExist: true,
        Poll: true,
    })

    if err != nil {
        xlog.LogError("tail file error:%v",err)
        return
    }
    return
}

func ReadLine() (msg *tail.Line,err error) {
    var ok bool
    msg,ok = <- tailObj.Lines
    if !ok {
        err = fmt.Errorf("read line faild")
        return
    }
    return
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-05-04 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 日志采集项目之logagent开发(一)
    • 项目结构
      • logagent代码:
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档