Go实现海量日志收集系统(二)

一篇文章主要是关于整体架构以及用到的软件的一些介绍,这一篇文章是对各个软件的使用介绍,当然这里主要是关于架构中我们agent的实现用到的内容

关于zookeeper+kafka

我们需要先把两者启动,先启动zookeeper,再启动kafka 启动ZooKeeper:./bin/zkServer.sh start 启动kafka:./bin/kafka-server-start.sh ./config/server.properties

操作kafka需要安装一个包:go get github.com/Shopify/sarama 写一个简单的代码,通过go调用往kafka里扔数据:

package main

import (
    "github.com/Shopify/sarama"
    "fmt"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Return.Successes = true
    msg := &sarama.ProducerMessage{}
    msg.Topic = "nginx_log"
    msg.Value = sarama.StringEncoder("this is a good test,my message is zhaofan")
    client,err := sarama.NewSyncProducer([]string{"192.168.0.118:9092"},config)
    if err != nil{
        fmt.Println("producer close err:",err)
        return
    }
    defer client.Close()

    pid,offset,err := client.SendMessage(msg)
    if err != nil{
        fmt.Println("send message failed,",err)
        return
    }
    fmt.Printf("pid:%v offset:%v\n",pid,offset)
}

config.Producer.RequiredAcks = sarama.WaitForAll 这里表示是在给kafka扔数据的时候是否需要确认收到kafka的ack消息

msg.Topic = "nginx_log" 因为kafka是一个分布式系统,假如我们要读的是nginx日志,apache日志,我们可以根据topic做区分,同时也是我们也可以有不同的分区

我们将上述代码执行一下,就会往kafka中扔一条消息,可以通过kakfa中自带的消费者命令查看: ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic nginx_log --from-beginning

我们可以将最后的代码稍微更改一下,更改为循环发送:

for{
    pid,offset,err := client.SendMessage(msg)
    if err != nil{
        fmt.Println("send message failed,",err)
        return
    }
    fmt.Printf("pid:%v offset:%v\n",pid,offset)
    time.Sleep(2*time.Second)
}

这样当我们再次执行的程序的时候,我们可以看到客户端在不停的消费到数据:

这样我们就实现一个kakfa的生产者的简单的demo

接下来我们还需要知道一个工具的使用tailf

tailf

我们的agent需要读日志目录下的日志文件,而日志文件是不停的增加并且切换文件的,所以我们就需要借助于tailf这个包来读文件,当然这里的tailf和linux里的tail -f命令虽然不同,但是效果是差不多的,都是为了获取日志文件新增加的内容。

而我们的客户端非常重要的一个地方就是要读日志文件并且将读到的日志文件推送到kafka

这里需要我们下载一个包:go get github.com/hpcloud/tail

我们通过下面一个例子对这个包进行一个基本的使用,更详细的api说明看:https://godoc.org/github.com/hpcloud/tail

package main

import (
    "github.com/hpcloud/tail"
    "fmt"
    "time"
)

func main() {
    filename := "/Users/zhaofan/go_project/src/go_dev/13/tailf/my.log"
    tails,err := tail.TailFile(filename,tail.Config{
        ReOpen:true,
        Follow:true,
        Location:&tail.SeekInfo{Offset:0,Whence:2},
        MustExist:false,
        Poll:true,
    })

    if err !=nil{
        fmt.Println("tail file err:",err)
        return
    }

    var msg *tail.Line
    var ok bool
    for true{
        msg,ok = <-tails.Lines
        if !ok{
            fmt.Printf("tail file close reopen,filenam:%s\n",tails,filename)
            time.Sleep(100*time.Millisecond)
            continue
        }
        fmt.Println("msg:",msg.Text)
    }
}

最终实现的效果是当你文件里面添加内容后,就可以不断的读取文件中的内容

日志库的使用

这里是通过beego的日志库实现的,beego的日志库是可以单独拿出来用的,还是非常方便的,使用例子如下:

package main

import (
    "github.com/astaxie/beego/logs"
    "encoding/json"
    "fmt"
)

func main() {
    config := make(map[string]interface{})
    config["filename"] = "/Users/zhaofan/go_project/src/go_dev/13/log/logcollect.log"
    config["level"] = logs.LevelTrace
    configStr,err := json.Marshal(config)
    if err != nil{
        fmt.Println("marshal failed,err:",err)
        return
    }
    logs.SetLogger(logs.AdapterFile,string(configStr))
    logs.Debug("this is a debug,my name is %s","stu01")
    logs.Info("this is a info,my name is %s","stu02")
    logs.Trace("this is trace my name is %s","stu03")
    logs.Warn("this is a warn my name is %s","stu04")
}

简单版本logagent的实现

这里主要是先实现核心的功能,后续再做优化和改进,主要实现能够根据配置文件中配置的日志路径去读取日志并将读取的实时推送到kafka消息队列中

关于logagent的主要结构如下:

程序目录结构为:

├── conf
│   └── app.conf
├── config.go
├── kafka.go
├── logs
│   └── logcollect.log
├── main.go
└── server.go

app.conf :配置文件 config.go:用于初始化读取配置文件中的内容,这里的配置文件加载是通过之前自己实现的配置文件热加载包处理的,博客地址:http://www.cnblogs.com/zhaof/p/8593204.html logcollect.log:日志文件 kafka.go:对kafka的操作,包括初始化kafka连接,以及给kafka发送消息 server.go:主要是tail 的相关操作,用于去读日志文件并将内容放到channel中

所以这里我们主要的代码逻辑或者重要的代码逻辑就是kafka.go 以及server.go

kafka.go代码内容为:

// 这里主要是kafak的相关操作,包括了kafka的初始化,以及发送消息的操作
package main

import (
    "github.com/Shopify/sarama"
    "github.com/astaxie/beego/logs"
)

var (
    client sarama.SyncProducer
    kafkaSender *KafkaSender
)

type KafkaSender struct {
    client sarama.SyncProducer
    lineChan chan string
}

// 初始化kafka
func NewKafkaSender(kafkaAddr string)(kafka *KafkaSender,err error){
    kafka = &KafkaSender{
        lineChan:make(chan string,100000),
    }
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Return.Successes = true

    client,err := sarama.NewSyncProducer([]string{kafkaAddr},config)
    if err != nil{
        logs.Error("init kafka client failed,err:%v\n",err)
        return
    }
    kafka.client = client
    for i:=0;i<appConfig.KafkaThreadNum;i++{
        // 根据配置文件循环开启线程去发消息到kafka
        go kafka.sendToKafka()
    }
    return
}

func initKafka()(err error){
    kafkaSender,err = NewKafkaSender(appConfig.kafkaAddr)
    return
}

func (k *KafkaSender) sendToKafka(){
    //从channel中读取日志内容放到kafka消息队列中
    for v := range k.lineChan{
        msg := &sarama.ProducerMessage{}
        msg.Topic = "nginx_log"
        msg.Value = sarama.StringEncoder(v)
        _,_,err := k.client.SendMessage(msg)
        if err != nil{
            logs.Error("send message to kafka failed,err:%v",err)
        }
    }
}

func (k *KafkaSender) addMessage(line string)(err error){
    //我们通过tailf读取的日志文件内容先放到channel里面
    k.lineChan <- line
    return
}

server.go的代码为:

package main

import (
    "github.com/hpcloud/tail"
    "fmt"
    "sync"
    "github.com/astaxie/beego/logs"
    "strings"
)

type TailMgr struct {
    //因为我们的agent可能是读取多个日志文件,这里通过存储为一个map
    tailObjMap map[string]*TailObj
    lock sync.Mutex
}

type TailObj struct {
    //这里是每个读取日志文件的对象
    tail *tail.Tail
    offset int64  //记录当前位置
    filename string
}

var tailMgr *TailMgr
var waitGroup sync.WaitGroup

func NewTailMgr()(*TailMgr){
    tailMgr =  &TailMgr{
        tailObjMap:make(map[string]*TailObj,16),
    }
    return tailMgr
}

func (t *TailMgr) AddLogFile(filename string)(err error){
    t.lock.Lock()
    defer t.lock.Unlock()
    _,ok := t.tailObjMap[filename]
    if ok{
        err = fmt.Errorf("duplicate filename:%s\n",filename)
        return
    }
    tail,err := tail.TailFile(filename,tail.Config{
        ReOpen:true,
        Follow:true,
        Location:&tail.SeekInfo{Offset:0,Whence:2},
        MustExist:false,
        Poll:true,
    })

    tailobj := &TailObj{
        filename:filename,
        offset:0,
        tail:tail,
    }
    t.tailObjMap[filename] = tailobj
    return
}

func (t *TailMgr) Process(){
    //开启线程去读日志文件
    for _, tailObj := range t.tailObjMap{
        waitGroup.Add(1)
        go tailObj.readLog()
    }
}

func (t *TailObj) readLog(){
    //读取每行日志内容
    for line := range t.tail.Lines{
        if line.Err != nil {
            logs.Error("read line failed,err:%v",line.Err)
            continue
        }
        str := strings.TrimSpace(line.Text)
        if len(str)==0 || str[0] == '\n'{
            continue
        }

        kafkaSender.addMessage(line.Text)
    }
    waitGroup.Done()
}


func RunServer(){
    tailMgr = NewTailMgr()
    // 这一部分是要调用tailf读日志文件推送到kafka中
    for _, filename := range appConfig.LogFiles{
        err := tailMgr.AddLogFile(filename)
        if err != nil{
            logs.Error("add log file failed,err:%v",err)
            continue
        }

    }
    tailMgr.Process()
    waitGroup.Wait()
}

可以整体演示一下代码实现的效果,当我们运行程序之后我配置文件配置的目录为: log_files=/app/log/a.log,/Users/zhaofan/a.log 我通过一个简单的代码对对a.log循环追加内容,你可以从kafka的客户端消费力看到内容了:

完成的代码地址:https://github.com/pythonsite/logagent

小结

这次只是实现logagent的核心功能,实现了从日志文件中通过多个线程获取要读的日志内容,这里借助了tailf,并将获取的内容放到channel中,kafka.go会从channel中取出日志内容并放到kafka的消息队列中 这里并没有做很多细致的处理,下一篇文章会在这个代码的基础上进行改进。同时现在的配置文件的方式也不是最佳的,每次改动配置文件都需要重新启动程序,后面将通过etcd的方式。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏跟着阿笨一起玩NET

让visual studio 工具箱重新找回失去的DevExpess第三方组件

     有时候vs工具箱莫名其妙的丢失或者重置了DexExpress第三方组件,此时又免于麻烦重新安装一遍。可以通过如下方法重新恢复vs工具箱丢失的DevEx...

1473

如何使用Midnight Commander,一个可视文件管理器

对于初学者来说,使用命令行工具来管理Linux服务器上的文件的方式可能是令人生畏的,耗时的,有时甚至是会带来风险的。而资深使用者会希望切换到更合适的工具来处理不...

1.3K5
来自专栏张戈的专栏

SecureCRT全局发送相同命令,快速抓取服务器信息的方法

昨天,在新公司接到了第一个任务:统计所有服务器的几个信息。200 多台呢!一个台一台的去执行命令也太苦逼了吧?于是度了下,找到了这个方法,感觉很不错!现在来分享...

3627
来自专栏腾讯NEXT学位

JavaScript全栈开发-工具篇(下)

? 文章目录 ? 四、测试工具 1. 单元测试 单元测试(unit testing),是指对软件中的最小可测试单元进行检查和验证。常见的单元测试工具有: * ...

1312
来自专栏向治洪

React Native热更新方案

随着 React Native 的不断发展完善,越来越多的公司选择使用 React Native 替代 iOS/Android 进行部分业务线的开发,也有不少使...

3.7K7
来自专栏AhDung

【手记】让Fiddler抓取入站请求,或者叫用Fiddler做反向代理

最近在弄公众号开发,除了主动去调公众号接口,还存在公众号后台要反过来调你的情形,攻受转换一线间。对于回调的情况,想要知道对方是怎样来请求的很有必要。此前经常用F...

2123
来自专栏Felix的技术分享

在macOS 10.12 上编译 Android 5.1

2604
来自专栏IMWeb前端团队

Web自动化之Headless Chrome开发工具库

本文作者:IMWeb 钌子_rawbin 原文出处:IMWeb社区 未经同意,禁止转载 命令行运行Headless Chrome Chrome 安装(...

2326
来自专栏极客猴

Django 实战1:搭建属于自己社工查询系统(上)

前面的文章已经把模板、模型、视图、表单等知识点逐一讲解,大家已经熟悉它们具体用法。但如何将其串联起来还一筹莫展。本篇文章分享我之前做过的一个小项目,帮助大家抹开...

1.1K2
来自专栏陈仁松博客

UWP基础教程 - App多语言支持

UWP实现本地化非常简单,所谓本地化表现最为直观的就是UI上文字和布局方式了,针对文字,提供不同的语言资源文件即可,而针对布局方式,比如在 阿拉伯地区 阅读顺序...

3646

扫码关注云+社区

领取腾讯云代金券