前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊golang的zap的Sink

聊聊golang的zap的Sink

作者头像
code4it
发布2020-12-24 11:00:32
5560
发布2020-12-24 11:00:32
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下golang的zap的Sink

Sink

zap@v1.16.0/sink.go

代码语言:javascript
复制
type Sink interface {
    zapcore.WriteSyncer
    io.Closer
}

type WriteSyncer interface {
    io.Writer
    Sync() error
}

type Writer interface {
    Write(p []byte) (n int, err error)
}

type Closer interface {
    Close() error
}

Sink接口内嵌了zapcore.WriteSyncer(Write、Sync)、io.Closer(Close)接口

RegisterSink

zap@v1.16.0/sink.go

代码语言:javascript
复制
const schemeFile = "file"

var (
    _sinkMutex     sync.RWMutex
    _sinkFactories map[string]func(*url.URL) (Sink, error) // keyed by scheme
)

func init() {
    resetSinkRegistry()
}

func resetSinkRegistry() {
    _sinkMutex.Lock()
    defer _sinkMutex.Unlock()

    _sinkFactories = map[string]func(*url.URL) (Sink, error){
        schemeFile: newFileSink,
    }
}

func RegisterSink(scheme string, factory func(*url.URL) (Sink, error)) error {
    _sinkMutex.Lock()
    defer _sinkMutex.Unlock()

    if scheme == "" {
        return errors.New("can't register a sink factory for empty string")
    }
    normalized, err := normalizeScheme(scheme)
    if err != nil {
        return fmt.Errorf("%q is not a valid scheme: %v", scheme, err)
    }
    if _, ok := _sinkFactories[normalized]; ok {
        return fmt.Errorf("sink factory already registered for scheme %q", normalized)
    }
    _sinkFactories[normalized] = factory
    return nil
}

RegisterSink方法会往_sinkFactories注册指定scheme的sink factory,该factory接收url.URL返回Sink;resetSinkRegistry方法默认注册了scheme为file的newFileSink

newFileSink

zap@v1.16.0/sink.go

代码语言:javascript
复制
func newFileSink(u *url.URL) (Sink, error) {
    if u.User != nil {
        return nil, fmt.Errorf("user and password not allowed with file URLs: got %v", u)
    }
    if u.Fragment != "" {
        return nil, fmt.Errorf("fragments not allowed with file URLs: got %v", u)
    }
    if u.RawQuery != "" {
        return nil, fmt.Errorf("query parameters not allowed with file URLs: got %v", u)
    }
    // Error messages are better if we check hostname and port separately.
    if u.Port() != "" {
        return nil, fmt.Errorf("ports not allowed with file URLs: got %v", u)
    }
    if hn := u.Hostname(); hn != "" && hn != "localhost" {
        return nil, fmt.Errorf("file URLs must leave host empty or use localhost: got %v", u)
    }
    switch u.Path {
    case "stdout":
        return nopCloserSink{os.Stdout}, nil
    case "stderr":
        return nopCloserSink{os.Stderr}, nil
    }
    return os.OpenFile(u.Path, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
}

newFileSink使用os.OpenFile创建*os.File,由于*os.File拥有Write、Sync、Close方法,因而它实现了Sink接口

newSink

zap@v1.16.0/sink.go

代码语言:javascript
复制
func newSink(rawURL string) (Sink, error) {
    u, err := url.Parse(rawURL)
    if err != nil {
        return nil, fmt.Errorf("can't parse %q as a URL: %v", rawURL, err)
    }
    if u.Scheme == "" {
        u.Scheme = schemeFile
    }

    _sinkMutex.RLock()
    factory, ok := _sinkFactories[u.Scheme]
    _sinkMutex.RUnlock()
    if !ok {
        return nil, &errSinkNotFound{u.Scheme}
    }
    return factory(u)
}

newSink方法会根据rawURL解析对应的scheme,如果scheme为空则默认为file,然后从_sinkFactories找到对应的factory,创建sink返回

open

zap@v1.16.0/writer.go

代码语言:javascript
复制
func Open(paths ...string) (zapcore.WriteSyncer, func(), error) {
    writers, close, err := open(paths)
    if err != nil {
        return nil, nil, err
    }

    writer := CombineWriteSyncers(writers...)
    return writer, close, nil
}

func open(paths []string) ([]zapcore.WriteSyncer, func(), error) {
    writers := make([]zapcore.WriteSyncer, 0, len(paths))
    closers := make([]io.Closer, 0, len(paths))
    close := func() {
        for _, c := range closers {
            c.Close()
        }
    }

    var openErr error
    for _, path := range paths {
        sink, err := newSink(path)
        if err != nil {
            openErr = multierr.Append(openErr, fmt.Errorf("couldn't open sink %q: %v", path, err))
            continue
        }
        writers = append(writers, sink)
        closers = append(closers, sink)
    }
    if openErr != nil {
        close()
        return writers, nil, openErr
    }

    return writers, close, nil
}

zap.Open方法会使用newSink来创建sink作为zapcore.WriteSyncer

实例

代码语言:javascript
复制
func registerSinkDemo() {
    zap.RegisterSink("mq", mq.NewMqSink)
    writer, close, err := zap.Open("mq://192.168.99.100:9876/log")
    if err != nil {
        panic(err)
    }
    defer close()
    logger := zap.New(zapcore.NewCore(zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), writer, zap.DebugLevel)).Sugar()
    logger.Info("hello")
}

type MqWriteSyncer struct {
    topic    string
    producer rocketmq.Producer
    ctx      context.Context
}

func (m *MqWriteSyncer) Close() error {
    return m.producer.Shutdown()
}

func (m *MqWriteSyncer) Write(p []byte) (n int, err error) {
    msg := &primitive.Message{
        Topic: m.topic,
        Body:  p,
    }
    err = m.producer.SendOneWay(m.ctx, msg)
    return len(p), err
}

func (m *MqWriteSyncer) Sync() error {
    return nil
}

func NewMqSink(url *url.URL) (zap.Sink, error) {
    broker := fmt.Sprintf("%s:%s", url.Hostname(), url.Port())
    topic := url.Path[1:len(url.Path)]
    p, _ := rocketmq.NewProducer(
        producer.WithNameServer([]string{broker}),
        producer.WithRetry(2),
    )
    err := p.Start()
    if err != nil {
        fmt.Printf("start producer error: %s", err.Error())
        return nil, err
    }

    return &MqWriteSyncer{producer: p, ctx: context.Background(), topic: topic}, nil
}

这里通过zap.RegisterSink来注册一个mq的sink factory,然后通过zap.Open来创建MqWriteSyncer;MqWriteSyncer实现了zapcore.WriteSyncer的Write、Sync方法,同时也实现了Sink的Close方法

小结

Sink接口内嵌了zapcore.WriteSyncer(Write、Sync)、io.Closer(Close)接口;zap.RegisterSink用于注册指定scheme的sink factory,而zap.Open则会解析url来找到对应的sink factory创建对应的sink,即writer。

doc

  • zap
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-12-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Sink
  • RegisterSink
  • newFileSink
  • newSink
  • open
  • 实例
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档