专栏首页码匠的流水账聊聊tunny的workerWrapper

聊聊tunny的workerWrapper

本文主要研究一下tunny的workerWrapper

workerWrapper

type workerWrapper struct {
    worker        Worker
    interruptChan chan struct{}

    // reqChan is NOT owned by this type, it is used to send requests for work.
    reqChan chan<- workRequest

    // closeChan can be closed in order to cleanly shutdown this worker.
    closeChan chan struct{}

    // closedChan is closed by the run() goroutine when it exits.
    closedChan chan struct{}
}

func newWorkerWrapper(
    reqChan chan<- workRequest,
    worker Worker,
) *workerWrapper {
    w := workerWrapper{
        worker:        worker,
        interruptChan: make(chan struct{}),
        reqChan:       reqChan,
        closeChan:     make(chan struct{}),
        closedChan:    make(chan struct{}),
    }

    go w.run()

    return &w
}

workerWrapper包装了worker,定义了interruptChan、reqChan、closeChan、closedChan属性

interrupt

func (w *workerWrapper) interrupt() {
    close(w.interruptChan)
    w.worker.Interrupt()
}

interrupt方法关闭w.interruptChan,执行w.worker.Interrupt()

run

func (w *workerWrapper) run() {
    jobChan, retChan := make(chan interface{}), make(chan interface{})
    defer func() {
        w.worker.Terminate()
        close(retChan)
        close(w.closedChan)
    }()

    for {
        // NOTE: Blocking here will prevent the worker from closing down.
        w.worker.BlockUntilReady()
        select {
        case w.reqChan <- workRequest{
            jobChan:       jobChan,
            retChan:       retChan,
            interruptFunc: w.interrupt,
        }:
            select {
            case payload := <-jobChan:
                result := w.worker.Process(payload)
                select {
                case retChan <- result:
                case <-w.interruptChan:
                    w.interruptChan = make(chan struct{})
                }
            case _, _ = <-w.interruptChan:
                w.interruptChan = make(chan struct{})
            }
        case <-w.closeChan:
            return
        }
    }
}

run首先创建jobChan、retChan,然后for循环执行select读取reqChan,之后读取jobChan的payload,进行处理,然后写入到retChan

stop

func (w *workerWrapper) stop() {
    close(w.closeChan)
}

stop方法关闭w.closeChan

join

func (w *workerWrapper) join() {
    <-w.closedChan
}

join方法则等待w.closedChan

小结

tunny的workerWrapper包装了worker,定义了interruptChan、reqChan、closeChan、closedChan属性,它提供了interrupt、run、stop、join方法。

doc

  • tunny

本文分享自微信公众号 - 码匠的流水账(geek_luandun),作者:码匠乱炖

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2021-04-28

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 聊聊tunny的workerWrapper

    tunny的workerWrapper包装了worker,定义了interruptChan、reqChan、closeChan、closedChan属性,它提供...

    codecraft
  • 聊聊golang的tunny

    tunny的Worker接口定义了Process、BlockUntilReady、Interrupt、Terminate方法;NewFunc方法创建的是clos...

    codecraft
  • 聊聊golang的tunny

    tunny的Worker接口定义了Process、BlockUntilReady、Interrupt、Terminate方法;NewFunc方法创建的是clos...

    codecraft
  • 聊聊flink的TimeCharacteristic

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/Time...

    codecraft
  • 聊聊flink的SourceFunction

    flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/func...

    codecraft
  • 聊聊flink的CsvReader

    flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.jav...

    codecraft
  • 聊聊flink的BoltWrapper

    flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/BoltWrapper....

    codecraft
  • 聊聊flink的EventTime

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/func...

    codecraft
  • 聊聊flink的ParallelIteratorInputFormat

    本文主要研究一下flink的ParallelIteratorInputFormat

    codecraft
  • 聊聊flink的RichParallelSourceFunction

    flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/func...

    codecraft
  • 聊聊flink的InputFormatSourceFunction

    flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/envi...

    codecraft
  • 聊聊flink的MemoryStateBackend

    flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/StateBacken...

    codecraft
  • 聊聊flink的OperatorStateBackend

    flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/OperatorSta...

    codecraft
  • 聊聊flink的MemCheckpointStreamFactory

    flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointS...

    codecraft
  • 聊聊storm的OpaquePartitionedTridentSpoutExecutor

    本文主要研究一下storm的OpaquePartitionedTridentSpoutExecutor

    codecraft
  • 聊聊storm的ICommitterTridentSpout

    storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/ICommitterTridentSp...

    codecraft
  • 聊聊flink的FsStateBackend

    flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/StateBacken...

    codecraft
  • 聊聊flink的FsCheckpointStorage

    flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointS...

    codecraft
  • 聊聊flink的MemoryBackendCheckpointStorage

    本文主要研究一下flink的MemoryBackendCheckpointStorage

    codecraft

扫码关注云+社区

领取腾讯云代金券