目前容器日志有两种输出形式:
不论你的业务容器日志如何输出,都是可以使用统一的日志收集器收集。常见的日志收集方式:
docker run --log-driver=fluentd --log-opt fluentd-address=myhost.local:24224
同样如果是日志文件,将文件暴露出来直接使用 fluentd 收集。# /container/container.go:63
type CommonContainer struct {
StreamConfig *stream.Config
...
}
# /container/stream/streams.go:26
type Config struct {
sync.WaitGroup
stdout *broadcaster.Unbuffered
stderr *broadcaster.Unbuffered
stdin io.ReadCloser
stdinPipe io.WriteCloser
}
moby源码来看,每一个container实例都有几个属性stdout,stderr,stdin,以及管道stdinPipe(当容器使用-i参数启动时标准输入将被运行,daemon将能够使用此管道向容器内写入标准输入). 那么针对如上的实例该如何实现日志收集转发?
# /container/container.go:312
func (container *Container) StartLogger(cfg containertypes.LogConfig) (logger.Logger, error) {
c, err := logger.GetLogDriver(cfg.Type)
if err != nil {
return nil, fmt.Errorf("Failed to get logging factory: %v", err)
}
ctx := logger.Context{
Config: cfg.Config,
ContainerID: container.ID,
ContainerName: container.Name,
ContainerEntrypoint: container.Path,
ContainerArgs: container.Args,
ContainerImageID: container.ImageID.String(),
ContainerImageName: container.Config.Image,
ContainerCreated: container.Created,
ContainerEnv: container.Config.Env,
ContainerLabels: container.Config.Labels,
DaemonName: "docker",
}
// Set logging file for "json-logger"
if cfg.Type == jsonfilelog.Name {
ctx.LogPath, err = container.GetRootResourcePath(fmt.Sprintf("%s-json.log", container.ID))
if err != nil {
return nil, err
}
}
return c(ctx)
}
#/container/container.go:978
func (container *Container) startLogging() error {
if container.HostConfig.LogConfig.Type == "none" {
return nil // do not start logging routines
}
l, err := container.StartLogger(container.HostConfig.LogConfig)
if err != nil {
return fmt.Errorf("Failed to initialize logging driver: %v", err)
}
copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l)
container.LogCopier = copier
copier.Run()
container.LogDriver = l
// set LogPath field only for json-file logdriver
if jl, ok := l.(*jsonfilelog.JSONFileLogger); ok {
container.LogPath = jl.LogPath()
}
return nil
}
第一个方法是为container查找log-driver。首先根据容器配置的log-driver类别调用:logger.GetLogDriver(cfg.Type)
返回一个方法类型:
/daemon/logger/factory.go:9
type Creator func(Context) (Logger, error)
实质就是从工厂类注册的logdriver插件去查找,具体源码下文分析。获取到c方法后构建调用参数具体就是容器的一些信息。然后使用调用c方法返回driver。driver是个接口类型,我们看看有哪些方法:
# /daemon/logger/logger.go:61
type Logger interface {
Log(*Message) error
Name() string
Close() error
}
很简单的三个方法,也很容易理解,Log()发送日志消息到driver,Close()进行关闭操作(根据不同实现)。 也就是说我们自己实现一个logdriver,只需要实现如上三个方法,然后注册到logger工厂类中即可。下面我们来看/daemon/logger/factory.go 第二个方法就是处理日志了,获取到日志driver,在创建一个Copier,顾名思义就是复制日志,分别从stdout 和stderr复制到logger driver。下面看看具体关键实现:
#/daemon/logger/copir.go:41
func (c *Copier) copySrc(name string, src io.Reader) {
defer c.copyJobs.Done()
reader := bufio.NewReader(src)
for {
select {
case <-c.closed:
return
default:
line, err := reader.ReadBytes('\n')
line = bytes.TrimSuffix(line, []byte{'\n'})
// ReadBytes can return full or partial output even when it failed.
// e.g. it can return a full entry and EOF.
if err == nil || len(line) > 0 {
if logErr := c.dst.Log(&Message{Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil {
logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr)
}
}
if err != nil {
if err != io.EOF {
logrus.Errorf("Error scanning log stream: %s", err)
}
return
}
}
}
}
每读取一行数据,构建一个消息,调用logdriver的log方法发送到driver处理。
位于/daemon/logger/factory.go的源码实现即时日志driver的注册器,其中几个重要的方法(上文已经提到一个):
# /daemon/logger/factory.go:21
func (lf *logdriverFactory) register(name string, c Creator) error {
if lf.driverRegistered(name) {
return fmt.Errorf("logger: log driver named '%s' is already registered", name)
}
lf.m.Lock()
lf.registry[name] = c
lf.m.Unlock()
return nil
}
# /daemon/logger/factory.go:39
func (lf *logdriverFactory) registerLogOptValidator(name string, l LogOptValidator) error {
lf.m.Lock()
defer lf.m.Unlock()
if _, ok := lf.optValidator[name]; ok {
return fmt.Errorf("logger: log validator named '%s' is already registered", name)
}
lf.optValidator[name] = l
return nil
}
看起来很简单,就是将一个Creator方法类型添加到一个map结构中,将LogOptValidator添加到另一个map这里注意加锁的操作。
#/daemon/logger/factory.go:13
type LogOptValidator func(cfg map[string]string) error
这个主要是验证driver的参数 ,dockerd和docker启动参数中有:—log-opt
使用自己实现的 zeroMQ-driver 直接将容器日志通过 0MQ 发到日志统一处理中心。在处理中心统一完成下一步处理。如果平台用户需要将日志向外输出或者直接对接平台内日志分析应用,我们的处理是在应用 pod 中启动日志收集插件容器(封装扩展的 fluentd ),根据用户的需要配置日志出口,实现应用级日志收集。容器日志首先是由 docker-daemon 收集到,再根据容器 log-driver 配置进行相应操作,也就是说如果你的宿主机网络与容器网络不通(k8s 集群),日志从宿主机到 pod 中的收集容器只有两种方式:走外层网络,文件挂载。 我们采用文件挂载方式。 以zmq-driver为例讲讲我们怎么实现自己的driver。直接接收容器的日志。
//定义一个struct,这里包含一个zmq套接字
type ZmqLogger struct {
writer *zmq.Socket
containerId string
tenantId string
serviceId string
felock sync.Mutex
}
//定义init方法调用logger注册器的方法注册当前driver
//和参数验证方法。
func init() {
if err := logger.RegisterLogDriver(name, New); err != nil {
logrus.Fatal(err)
}
if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
logrus.Fatal(err)
}
}
//实现一个上文提到的Creator方法注册logdriver.
//这里新建一个zmq套接字构建一个实例
func New(ctx logger.Context) (logger.Logger, error) {
zmqaddress := ctx.Config[zmqAddress]
puber, err := zmq.NewSocket(zmq.PUB)
if err != nil {
return nil, err
}
var (
env = make(map[string]string)
tenantId string
serviceId string
)
for _, pair := range ctx.ContainerEnv {
p := strings.SplitN(pair, "=", 2)
//logrus.Errorf("ContainerEnv pair: %s", pair)
if len(p) == 2 {
key := p[0]
value := p[1]
env[key] = value
}
}
tenantId = env["TENANT_ID"]
serviceId = env["SERVICE_ID"]
if tenantId == "" {
tenantId = "default"
}
if serviceId == "" {
serviceId = "default"
}
puber.Connect(zmqaddress)
return &ZmqLogger{
writer: puber,
containerId: ctx.ID(),
tenantId: tenantId,
serviceId: serviceId,
felock: sync.Mutex{},
}, nil
}
//实现Log方法,这里使用zmq socket发送日志消息
//这里必须注意,zmq socket是线程不安全的,我们知道
//本方法可能被两个线程(复制stdout和肤质stderr)调用//必须使用锁保证线程安全。否则会发生错误。
func (s *ZmqLogger) Log(msg *logger.Message) error {
s.felock.Lock()
defer s.felock.Unlock()
s.writer.Send(s.tenantId, zmq.SNDMORE)
s.writer.Send(s.serviceId, zmq.SNDMORE)
if msg.Source == "stderr" {
s.writer.Send(s.containerId+": "+string(msg.Line), zmq.DONTWAIT)
} else {
s.writer.Send(s.containerId+": "+string(msg.Line), zmq.DONTWAIT)
}
return nil
}
//实现Close方法,这里用来关闭zmq socket。
//同样注意线程安全,调用此方法的是容器关闭协程。
func (s *ZmqLogger) Close() error {
s.felock.Lock()
defer s.felock.Unlock()
if s.writer != nil {
return s.writer.Close()
}
return nil
}
func (s *ZmqLogger) Name() string {
return name
}
//验证参数的方法,我们使用参数传入zmq pub的地址。
func ValidateLogOpt(cfg map[string]string) error {
for key := range cfg {
switch key {
case zmqAddress:
default:
return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
}
}
if cfg[zmqAddress] == "" {
return fmt.Errorf("must specify a value for log opt '%s'", zmqAddress)
}
return nil
}
管理节点的日志根据功能不同分散到多个目录中,下面列出了日志目录及分类:
/data/service_logs 或 /logs 云帮后台服务日志目录
/data/docker_logs 计算节点运行的容器日志(标准输出,汇总日志)
/grdata/logs 应用的构建日志
一些主要日志目录介绍:
//var/log/upstart # 安装日志目录
├── dc-agent.logs #安装日志,如果安装出现问题可以通过该目录下的日志进行排查.主要就是`dc-agent`文件。
/data/docker_logs # 应用容器日志
├── stdout.log #计算节点上运行的容器日志(标准输出)都汇总到这个目录中。每天一个日志文件,当天的日志命名为 stdout.log
/grdata/logs # 应用构建日志
├── #每个应用的构建日志都存放在这个目录中,可以执行定期清理工作。
/logs(/data/service_logs)
├── labor # 平台worker日志目录
│ ├── build_work # 构建worker日志
│ │ ├── event.log
│ │ ├── main.log
│ ├── docker_logger # docker记录日志的worker日志
│ │ ├── main.log
│ ├── lb_worker # 负载均衡worker日志
│ │ ├── main.log
│ ├── mq_work # 消息队列worker日志
│ │ ├── app_slug.log
│ │ ├── code_check.log
│ │ ├── loader.log
│ │ ├── main.log
│ │ ├── regionlog.log
│ │ ├── service_event.log
│ │ ├── set_service_running.log
│ ├── pods_clean # k8s pod清理worker日志
│ │ ├── main.log
│ ├── service_container_monitor # 容器服务监控worker日志
│ │ ├── main.log
│ ├── client_error.log
│ ├── error.log # 需排查的error日志文件
│ ├── plugin_error.log
│ ├── request.log
│ ├── untopic.log
└── region_api # 区域中心api日志目录
├── deploy # 部署日志
│ ├── clean.log
│ ├── lb.log
├── monitor # 监控日志
│ ├── hook.log
├── service # 服务相关日志
│ └── create.log
├── client_error.log
├── error.log
├── request.log
└── untopic.log
crontab -e
# 添加计划任务
6 0 * * * /usr/bin/find /data/service_logs/ -name "*.log.*" -mtime +3 -delete
7 0 * * * /usr/bin/find /grdata/logs -name "*.log" -mtime +3 -delete
8 0 * * * /usr/bin/find /data/docker_logs -name "*.log" -mtime +3 -delete
最新版默认已经集成此工具,通过此工具可以快速定位应用。
COMMANDS: │USER TTY FROM LOGIN@ IDLE WHAT
exec 进入容器方法。grctl exec POD_NAME COMMAND │ysicing console - 日18 3days -
get 获取应用运行详细信息。grctl get PATH │ysicing s003 - 三11 - tmux -l
log 获取服务的日志。grctl log SERVICE_ID │➜ shell
node 获取计算节点信息 │➜ shell w
tenant 获取租户应用(包括未运行)信息。 grctl tenant TENANT_NAME │17:08 up 3 days, 22:16, 2 users, load averages: 2.25 2.15 2.27
help, h Shows a list of commands or help for one command │USER TTY FROM LOGIN@ IDLE WHAT
│ysicing console - 日18 3days -
GLOBAL OPTIONS: │ysicing s003 - 三11 - tmux -l
--config FILE, -c FILE Load configuration from FILE (default: "/etc/goodrain/grctl.json")│➜ shell w
--region.url value, -u value Region api url. to form region get other config (default: "http://│17:08 up 3 days, 22:16, 2 users, load averages: 2.25 2.15 2.27
region.goodrain.me:8888") │USER TTY FROM LOGIN@ IDLE WHAT
--help, -h show help │ysicing console - 日18 3days -
--version, -v print the version
MONGO_HOST=127.0.0.1
MONGO_PORT=端口
查看效果:
本地可以使用Robomongo远程连接Mongodb.
应用日志对接ELK