专栏首页码匠的流水账聊聊golang的tunny

聊聊golang的tunny

本文主要研究一下tunny

Worker

type Worker interface {
    // Process will synchronously perform a job and return the result.
    Process(interface{}) interface{}

    // BlockUntilReady is called before each job is processed and must block the
    // calling goroutine until the Worker is ready to process the next job.
    BlockUntilReady()

    // Interrupt is called when a job is cancelled. The worker is responsible
    // for unblocking the Process implementation.
    Interrupt()

    // Terminate is called when a Worker is removed from the processing pool
    // and is responsible for cleaning up any held resources.
    Terminate()
}

Worker接口定义了Process、BlockUntilReady、Interrupt、Terminate方法

closureWorker

type closureWorker struct {
    processor func(interface{}) interface{}
}

func (w *closureWorker) Process(payload interface{}) interface{} {
    return w.processor(payload)
}

func (w *closureWorker) BlockUntilReady() {}
func (w *closureWorker) Interrupt()       {}
func (w *closureWorker) Terminate()       {}

closureWorker定义了processor属性,它实现了Worker接口的Process、BlockUntilReady、Interrupt、Terminate方法,其中Process方法委托给processor

callbackWorker

type callbackWorker struct{}

func (w *callbackWorker) Process(payload interface{}) interface{} {
    f, ok := payload.(func())
    if !ok {
        return ErrJobNotFunc
    }
    f()
    return nil
}

func (w *callbackWorker) BlockUntilReady() {}
func (w *callbackWorker) Interrupt()       {}
func (w *callbackWorker) Terminate()       {}

callbackWorker定义了processor属性,它实现了Worker接口的Process、BlockUntilReady、Interrupt、Terminate方法,其中Process方法执行的是payload函数

Pool

type Pool struct {
    queuedJobs int64

    ctor    func() Worker
    workers []*workerWrapper
    reqChan chan workRequest

    workerMut sync.Mutex
}

func New(n int, ctor func() Worker) *Pool {
    p := &Pool{
        ctor:    ctor,
        reqChan: make(chan workRequest),
    }
    p.SetSize(n)

    return p
}

func NewFunc(n int, f func(interface{}) interface{}) *Pool {
    return New(n, func() Worker {
        return &closureWorker{
            processor: f,
        }
    })
}

func NewCallback(n int) *Pool {
    return New(n, func() Worker {
        return &callbackWorker{}
    })
}

Pool定义了queuedJobs、ctor、workers、reqChan、workerMut属性;New方法根据n和ctor创建Pool;NewFunc方法根据n和f来创建closureWorker;NewCallback方法创建callbackWorker

Process

func (p *Pool) Process(payload interface{}) interface{} {
    atomic.AddInt64(&p.queuedJobs, 1)

    request, open := <-p.reqChan
    if !open {
        panic(ErrPoolNotRunning)
    }

    request.jobChan <- payload

    payload, open = <-request.retChan
    if !open {
        panic(ErrWorkerClosed)
    }

    atomic.AddInt64(&p.queuedJobs, -1)
    return payload
}

Process方法首先递增queuedJobs,然后从reqChan读取request,然后往jobChan写入payload,之后再等待retChan,最后递减queuedJobs

SetSize

func (p *Pool) SetSize(n int) {
    p.workerMut.Lock()
    defer p.workerMut.Unlock()

    lWorkers := len(p.workers)
    if lWorkers == n {
        return
    }

    // Add extra workers if N > len(workers)
    for i := lWorkers; i < n; i++ {
        p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
    }

    // Asynchronously stop all workers > N
    for i := n; i < lWorkers; i++ {
        p.workers[i].stop()
    }

    // Synchronously wait for all workers > N to stop
    for i := n; i < lWorkers; i++ {
        p.workers[i].join()
    }

    // Remove stopped workers from slice
    p.workers = p.workers[:n]
}

SetSize方法首先通过workerMut加锁,然后根据lWorkers创建newWorkerWrapper,之后执行worker.stop,再执行worker.join(),然后清空workers

Close

func (p *Pool) Close() {
    p.SetSize(0)
    close(p.reqChan)
}

Close方法执行SetSize(0)及close(p.reqChan)

实例

func TestFuncJob(t *testing.T) {
    pool := NewFunc(10, func(in interface{}) interface{} {
        intVal := in.(int)
        return intVal * 2
    })
    defer pool.Close()

    for i := 0; i < 10; i++ {
        ret := pool.Process(10)
        if exp, act := 20, ret.(int); exp != act {
            t.Errorf("Wrong result: %v != %v", act, exp)
        }
    }
}

TestFuncJob通过NewFunc创建pool,

小结

tunny的Worker接口定义了Process、BlockUntilReady、Interrupt、Terminate方法;NewFunc方法创建的是closureWorker,NewCallback方法创建的是callbackWorker。

doc

  • tunny

本文分享自微信公众号 - 码匠的流水账(geek_luandun),作者:码匠乱炖

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2021-04-27

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 聊聊golang的tunny

    tunny的Worker接口定义了Process、BlockUntilReady、Interrupt、Terminate方法;NewFunc方法创建的是clos...

    codecraft
  • 聊聊tunny的workerWrapper

    tunny的workerWrapper包装了worker,定义了interruptChan、reqChan、closeChan、closedChan属性,它提供...

    codecraft
  • 聊聊tunny的workerWrapper

    tunny的workerWrapper包装了worker,定义了interruptChan、reqChan、closeChan、closedChan属性,它提供...

    codecraft
  • 聊聊golang的lumberjack

    Logger定义了Filename、MaxSize(单个文件大小最大值,单位M)、MaxAge(单位天)、MaxBackups、LocalTime、Compre...

    codecraft
  • 聊聊golang的lumberjack

    Logger定义了Filename、MaxSize(单个文件大小最大值,单位M)、MaxAge(单位天)、MaxBackups、LocalTime、Compre...

    codecraft
  • 聊聊golang的defer

    defer可以拆解为return赋值,defer执行,最后代码返回三步;defer的顺序按逆序执行。

    codecraft
  • 聊聊golang的log

    codecraft
  • 聊聊golang的defer

    defer可以拆解为return赋值,defer执行,最后代码返回三步;defer的顺序按逆序执行。

    codecraft
  • 聊聊golang的零值

    codecraft
  • 聊聊golang的包init

    codecraft
  • 聊聊golang的零值

    codecraft
  • 聊聊golang的包init

    codecraft
  • 聊聊golang的zap的CheckedEntry

    CheckedEntry内嵌了Entry,定义了ErrorOutput、dirty、CheckWriteAction、cores属性;entry包使用_cePo...

    codecraft
  • 聊聊golang的zap的ZapKafkaWriter

    WriteSyncer内嵌了io.Writer接口,定义了Sync方法;Sink接口内嵌了zapcore.WriteSyncer及io.Closer接口;Zap...

    codecraft
  • 聊聊golang的zap的hook

    Hooks方法将log的core使用zapcore.RegisterHooks包装了一下;RegisterHooks方法创建hooked,hooks赋值给hoo...

    codecraft
  • 聊聊golang的zap的zapgrpc

    https://github.com/grpc/grpc-go/blob/master/grpclog/logger.go

    codecraft
  • 聊聊golang的zap的NewTee

    zapcore.NewTee方法可以把多个core衔接在一起,对应logger的操作会同时操作这些core。

    codecraft
  • 聊聊golang的zap的global.go

    global.go提供了ReplaceGlobals方法用于注册全局的单例的logger;提供了NewStdLog方法用于返回标准库的log.Logger,然后...

    codecraft
  • 聊聊golang的zap的buffer

    zap@v1.16.0/internal/bufferpool/bufferpool.go

    codecraft

扫码关注云+社区

领取腾讯云代金券