Go实现海量日志收集系统(四)

到这一步,我的收集系统就已经完成很大一部分工作,我们重新看一下我们之前画的图:

我们已经完成前面的部分,剩下是要完成后半部分,将kafka中的数据扔到ElasticSearch,并且最终通过kibana展现出来

ElasticSearch

官网地址这里介绍了非常详细的安装方法: https://www.elastic.co/downloads/elasticsearch

但是其实这里是需要配置一些东西的,要不然直接启动是会悲剧的,在网上找了一个地址,如果出现类似的错误直接处理就行,我自己已经验证了: https://blog.csdn.net/liangzhao_jay/article/details/56840941

如下图所示就表示已经安装完成:

 通过go写一个简单的调用ElasticSearch的例子:

package main

import (
    "fmt"
    elastic "gopkg.in/olivere/elastic.v2"
)

type Tweet struct{
    User string
    Message string
}

func main(){
    client,err := elastic.NewClient(elastic.SetSniff(false),elastic.SetURL("http://192.168.0.118:9200/"))
    if err != nil{
        fmt.Println("connect es error",err)
        return
    }
    fmt.Println("conn es succ")
    tweet := Tweet{User:"olivere name",Message:"Take Five"}
    _, err = client.Index().Index("twitter").Type("tweet").Id("1").BodyJson(tweet).Do()
    if err != nil {
        panic(err)
        return
    }
    fmt.Println("insert succ")
}

logtransfer

logtransfer主要负责从 kafka队列中读取日志信息,并且添加到ElasticSearch中

看那一下logtransfer 目录结构如下:

├── conf
│   └── app.conf
├── es.go
├── etcd.go
├── ip.go
├── kafka.go
├── logs
│   └── transfer.log
└── main.go

conf:存放配置文件 es.go:主要是连接ElasticSearch的部分以及用于将消息放到ElasticSearch中 etcd.go:主要用于做动态的配置更改,当我们需要将kafka中的哪些topic日志内容扔到ElasticSearch中 ip.go: 用于获取当前服务器的ip地址 kafka.go: 主要是kafka的处理逻辑,包括连接kafka以及从kafka中读日志内容 main.go:代码的入口函数

整体大代码框架,通过如图展示:

和之前的logagent中的代码有很多启示是可以复用的或者稍作更改,就可以了,其中es之心的,主要是连接ElasticSearch并将日志内容放进去

es.go的代码内容为:

package main

import (
    "gopkg.in/olivere/elastic.v2"
    "github.com/astaxie/beego/logs"
    "sync"
    "encoding/json"
)

var waitGroup sync.WaitGroup

var client *elastic.Client

func initEs(addr string,) (err error){
    client,err = elastic.NewClient(elastic.SetSniff(false),elastic.SetURL(addr))
    if err != nil{
        logs.Error("connect to es error:%v",err)
        return
    }
    logs.Debug("conn to es success")
    return
}

func reloadKafka(topicArray []string) {
    for _, topic := range topicArray{
        kafkaMgr.AddTopic(topic)
    }
}

func reload(){
    //GetLogConf() 从channel中获topic信息,而这部分信息是从etcd放进去的
    for conf := range GetLogConf(){
        var topicArray []string
        err := json.Unmarshal([]byte(conf),&topicArray)
        if err != nil {
            logs.Error("unmarshal failed,err:%v conf:%v",err,conf)
            continue
        }
        reloadKafka(topicArray)
    }
}

func Run(esThreadNum int) (err error) {
    go reload()
    for i:=0;i<esThreadNum;i++{
        waitGroup.Add(1)
        go sendToEs()
    }
    waitGroup.Wait()
    return
}

type EsMessage struct {
    Message string
}

func sendToEs(){
    // 从msgChan中读取日志内容并扔到elasticsearch中
    for msg:= range GetMessage() {
        var esMsg EsMessage
        esMsg.Message = msg.line
        _,err := client.Index().Index(msg.topic).Type(msg.topic).BodyJson(esMsg).Do()
        if err != nil {
            logs.Error("send to es failed,err:%v",err)
            continue
        }
        logs.Debug("send to es success")
    }
    waitGroup.Done()
}

最终我将logagnet以及logtransfer部署到虚拟机上进行测试的效果是:

这样当我再次查日志的时候就可以不用登陆每台服务器去查日志,只需要通过页面根据关键字迅速看到相关日志,当然目前实现的功能还是有点粗糙,etcd的更改程序,是自己写的发送程序,其实更好的解决方法是通过页面,让用户点来点去,来控制自己要收集哪些日志,以及自己要将哪些topic的日志从kafka中放到ElasticSearch (本人是做后端开发,不擅长前端的开发,不过后面可以试着写个页面试试,估计会很丑哈哈)

同时这里关于各个部分的安装并没有做过多的介绍,以及维护,当然我们的目标是是通过这些开源的软件以及包来实现我们想要的功能,后期的维护,肯定需要对各个组件部分都进行深入了解

这里附赠一下那个etcd客户端代码:

package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
    "golang.org/x/net/context"
)

var logconf = `
[
    {
        "topic":"eslservice_log",
        "log_path":"/opt/pbx/log/eslservice.log",
        "service":"eslservice",
        "send_rate":50000
    }
]
`

var test111 = `
[
    {
        "topic":"test_log",
        "log_path":"D:/a.log",
        "service":"test",
        "send_rate":50000
    }
]
`


var transconf = `
[
    "eslservice_log"
]
`

func main() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:[]string{"192.168.90.78:2371"},
        DialTimeout:5*time.Second,
    })
    if err != nil {
        fmt.Println("connect failed,err:",err)
        return
    }
    fmt.Println("connect success")
    defer cli.Close()
    ctx,cancel := context.WithTimeout(context.Background(),time.Second)
    //_,err = cli.Put(ctx,"/logagent/192.168.90.11/log_config",logconf)
    //_,err = cli.Put(ctx,"/logagent/192.168.90.61/log_config",test111)
    _, err = cli.Put(ctx,"/logtransfer/192.168.90.11/log_config",transconf)
    cancel()
    if err != nil {
        fmt.Println("put failed ,err:",err)
        return
    }
    ctx,cancel = context.WithTimeout(context.Background(),time.Second)
    resp,err := cli.Get(ctx,"/logtransfer/",clientv3.WithPrefix())
    cancel()
    if err != nil {
        fmt.Println("get failed,err:",err)
        return
    }
    for _,ev:=range resp.Kvs{
        fmt.Printf("%s:%s\n",ev.Key,ev.Value)
    }
}

到目前为止基本的功能都已经实现了,当然了现在的代码结构还有的糙,后面会进行优化! 整个项目中的代码: logagent代码地址:https://github.com/pythonsite/logagent logtransfer代码地址:https://github.com/pythonsite/logtransfer

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏自动化测试实战

《selenium2 python 自动化测试实战》(13)——上传文件

38960
来自专栏电光石火

推荐几款比较好看HTML admin后台模板

H+是一个完全响应式,基于Bootstrap3.3.6最新版本开发的扁平化主题,她采用了主流的左右两栏式布局,使用了Html5+CSS3等现代技术,她提供了...

6.7K10
来自专栏mukekeheart的iOS之旅

iOS工具——Xcode9无证书真机调试

  入坑iOS开发这么久,一直都是在模拟器上运行,公司的项目也都有公司的开发者账号进行真机调试。但是很多时候在网上download一些demo想在真机上运行看一...

684100
来自专栏王二麻子IT技术交流园地

十一、VueJs 填坑日记之使用Amaze ui调整列表和内容页面

上一篇博文我们整合了Amaze ui,并且调整了一个头部header和底部footer文件,其实做起来也很简单,只要按照步骤来做,完全没有问题。今天我们来重新调...

307100
来自专栏GreenLeaves

Fiddler4抓包工具使用教程一

本文参考自http://blog.csdn.net/ohmygirl/article/details/17846199,纯属读书笔记,加深记忆 1、抓包工具有很...

846100
来自专栏salesforce零基础学习

salesforce零基础学习(七十四)apex:actionRegion以及apex:actionSupport浅谈

我们在开发中,很难会遇见不提交表单的情况。常用的apex:commandButton,apex:commandLink,apex:actionFunction,...

28070
来自专栏自由而无用的灵魂的碎碎念

查看服务时提示“一个或多个ActiveX控件无法显示”的解决方法

一个或多个ActiveX控件无法显示,原因可能是下列其中之一: 1)当前安全设置禁止运行此页面中的ActiveX控件,或 2)您已经阻止了其中一个...

14430
来自专栏向治洪

React native开发中常见的错误

react native环境搭建请移步:react native环境搭建 这里说说react native创建完成之后,运行中出现的常见问题, 问题1: jav...

33860
来自专栏雨尘分享

SDWebImage 引发的 cell不断下拉引起的闪退 卡顿

21820
来自专栏向治洪

React Native项目组织结构介绍

代码组织: 目录结构: . ├── components //组成应用的各个组件 │   ├── Routers.android.js //每个组...

29970

扫码关注云+社区

领取腾讯云代金券