前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Golang 基础:原生并发 goroutine channel 和 select 常见使用场景

Golang 基础:原生并发 goroutine channel 和 select 常见使用场景

作者头像
张拭心 shixinzhang
发布2022-05-10 08:15:20
8370
发布2022-05-10 08:15:20
举报

文章目录

  • goroutine
    • goroutine 调度原理
  • channel
    • channel 的不同类型
      • 作为参数的单向类型
      • 关闭 channel
      • len(channel)
      • nil channel
    • 无缓冲 channel 的常见用途 🔥
      • 多 goroutine 通信:信号
      • 多 goroutine 同步:通过阻塞,替代锁
    • 带缓冲 channel 的常见用途 🔥
      • 消息队列
      • 计数信号量
  • select
    • channel 与 select 结合的常见用途 🔥
      • 利用 default 分支避免阻塞
      • 实现超时
      • 心跳机制
  • 总结

Go 语言之父 Rob Pike 的经典名言:“不要通过共享内存来通信,应该通过通信来共享内存(Don’t communicate by sharing memory, share memory by communicating)

C/C++ 线程的复杂性:

线程退出时要考虑新创建的线程是否要与主线程分离(detach) 还是需要主线程等待子线程终止(join)并获取其终止状态? 又或者是否需要在新线程中设置取消点(cancel point)来保证被主线程取消(cancel)的时候能顺利退出

goroutine 是由 Go 运行时(runtime)负责调度的、轻量的用户级线程。

优势:

  1. 占用内存小,goroutine 初始栈只有 2k,比 Linux 线程小多了
  2. 用户态调度,不需要内核介入,代价更小
  3. 一退出就会被回收
  4. 提供 channel 通信

无论是 Go 自身运行时代码还是用户层 Go 代码,都无一例外地运行在 goroutine 中。

goroutine

调用函数、方法(具名、匿名、闭包都可以)时,前面加上 go 关键字,就会创建一个 goroutine。

goroutine 调度原理

Goroutine 调度器的任务:将 Goroutine 按照一定算法放到不同的操作系统线程中去执行。

演进:

  • G-M 模型(废弃):将 G(Goroutine) 调度到 M(Machine) 上运行
  • G-P-M 模型(使用中):增加中间层 P(Processor),提供队列管理多个 G,然后在合适的时候绑定 M。先后使用协作式、抢占式调度。
  • NUMA 调度模型(尚未实现)
【G-P-M调度图】
【G-P-M调度图】

图片来自:https://time.geekbang.org/column/article/476643

  • G:存储 Goroutine 的执行信息,包括:栈、状态
  • P:逻辑处理器,有一个待调度的 G 队列
  • M:真正的计算资源,Go 代码运行的真实载体(用户态线程),要执行 G 需要绑定 P,绑定后会从 P 的本地队列和全局队列获取 G 然后执行
//src/runtime/runtime2.go
type g struct {
    stack      stack   // offset known to runtime/cgo
    sched      gobuf
    goid       int64
    gopc       uintptr // pc of go statement that created this goroutine
    startpc    uintptr // pc of goroutine function
    ... ...
}

type p struct {
    lock mutex

    id          int32
    status      uint32 // one of pidle/prunning/...
  
    mcache      *mcache
    racectx     uintptr

    // Queue of runnable goroutines. Accessed without lock.
    runqhead uint32
    runqtail uint32
    runq     [256]guintptr

    runnext guintptr

    // Available G's (status == Gdead)
    gfree    *g
    gfreecnt int32

    ... ...
}



type m struct {
    g0            *g     // goroutine with scheduling stack
    mstartfn      func()
    curg          *g     // current running goroutine
    ... ...
}

从 Go 1.2 以后,Go 调度器使用 G-P-M 模型,调度目标:公平地将 G 调度到 P 上运行。

调度策略:

  1. 常规执行,G 运行超出时间片后抢占调度
  2. G 阻塞在 channel 或者 I/O 上时,会被放置到等待队列,M 会尝试运行 P 的下一个可运行 G;当 G 可运行时,会被唤醒并修改状态,然后放到某个 P 的队列中,等待被绑定 M、执行
  3. G 阻塞在 syscall 上时,执行 G 的 M 也会受影响,会解绑 P、进入挂起状态;syscall 返回后,G 会尝试获取可用的 P,没获取到的话,修改状态,等待被运行

如果一个 G 任务运行 10ms,sysmon 就会认为它的运行时间太久而发出抢占式调度的请求。 一旦 G 的抢占标志位被设为 true,那么等到这个 G 下一次调用函数或方法时,运行时就可以将 G 抢占并移出运行状态,放入队列中,等待下一次被调度。

// $GOROOT/src/runtime/proc.go


// forcePreemptNS is the time slice given to a G before it is
// preempted.
const forcePreemptNS = 10 * 1000 * 1000 // 10ms

channel

和线程一样,一个应用内部启动的所有 goroutine 共享进程空间的资源,如果多个 goroutine 访问同一块内存数据,将会存在竞争。

Go 提供了 channel 作为 goroutine 之间的通信方式,goroutine 可以从 channel 读取数据,处理后再把数据写到 channel 中。

channel 是和 切片、map 类似的复合类型,使用时需要指定具体的类型:

c := make(chan int) //c 是一个 int 类型的 channel

和函数一样,channel 也是“第一公民” 身份,可以做变量、参数、返回值等。

func spawn(f func() error) <-chan error {
    c := make(chan error)

    go func() {
        c <- f()
    }()

    return c
}

func main() {
    c := spawn(func() error {
        time.Sleep(2 * time.Second)
        return errors.New("timeout")
    })
    fmt.Println(<-c)
}

main goroutine 与子 goroutine 之间建立了一个元素类型为 error 的 channel,子 goroutine 退出时,会将它执行的函数的错误返回值写入这个 channel,main goroutine 可以通过读取 channel 的值来获取子 goroutine 的退出状态。

channel 的不同类型

通过 make 可以创建 2 种类型的 channel:

  1. 无缓冲:读写是同步进行,没有对接人的话会一直阻塞着
  2. 有缓冲:有数据时读不会阻塞;未满时写数据不会阻塞

下面是无 buffer channel 的测试例子:

func testNoBufferChannel() {
    var c chan int = make(chan int) //无缓冲,同步进行,没有对接人,就会阻塞住
    //var c chan int = make(chan int, 5)    //有缓冲,容量为 5

    //大多数时候,读写要在不同 goroutine,尤其是无缓冲 channel
    go func() {
        fmt.Println("goroutine run")
        b := <-c //读取 channel
        fmt.Println("read from channel: ", b)
    }()

    fmt.Println("main goroutine before write")
    c <- 1  //没有 buffer,写入 channel 时会阻塞,直到有读取
    fmt.Println("main goroutine finish")
}

运行结果:

main goroutine before write
goroutine run
read from channel:  1
main goroutine finish

和预期一致,主 goroutine 在写入无 buffer 的 channel 时会阻塞,直到 子 goroutine 读取。

下面是有 buffer channel 的测试例子:

func testBufferChannel() {
    c := make(chan int, 1)  //有缓冲,容量为 5

    //大多数时候,读写要在不同 goroutine,尤其是无缓冲 channel
    go func() {
        fmt.Println("child_goroutine run")
        b := <-c //读取 channel,有数据时不会阻塞
        fmt.Println("child_goroutine read from channel: ", b)
    }()

    fmt.Println("main goroutine before write first")
    c <- 1  //有 buffer,写入 channel 时不会阻塞,除非满了
    fmt.Println("main_goroutine first write finish, len:", len(c))

    fmt.Println("main_goroutine write second:")
    c <-2
    fmt.Println("main_goroutine finish, len:", len(c))

    time.Sleep( 3 * time.Second)    //不加这个子 goroutine 没执行就退出了
}

运行结果:

main goroutine before write first
main_goroutine first write finish, len: 1
main_goroutine write second:
child_goroutine run
child_goroutine read from channel:  1
main_goroutine finish, len: 1

可以看到

  1. 第一次写完立刻就返回;第二次写时,因为这个 goroutine 已经满了,所以阻塞在写上
  2. 子 goroutine 读取了一次,主 goroutine 才从写上返回

作为参数的单向类型

  1. 只发送, chan<-
  2. 只接收, <-chan
func testSingleDirectionChannel() {

    f := func(a chan<- int, b <- chan int) {    //a 是只能写入,b 是只能读取
        x := <- a   //编译报错:Invalid operation: <- a (receive from send-only type chan<- int)
        b <- 2      //编译报错:nvalid operation: b <- 2 (send to receive-only type <-chan int)
    }
}

通常只发送 channel 类型和只接收 channel 类型,会被用作函数的参数类型或返回值,用于限制对 channel 内的操作,或者是明确可对 channel 进行的操作的类型

普通channel,可以传入函数作为只发送或只接收类型

关闭 channel

close(channel) 后,不同语句的结果:

func testCloseChannel() {
    a := make(chan int)
    close(a)    //先关闭,然后看下几种读取关闭 channel 的结果
    b := <- a
    fmt.Println("关闭后直接读取:", b)  //0
    c, ok := <-a
    fmt.Println("关闭后通过逗号 ok 读取:", c, ok)    //0 false

    for v := range a{   //关闭的话直接跳过
        fmt.Println("关闭后通过 for-range 读取", v)
    }
}

通过“comma, ok” 我们可以知道 channel 是否被关闭。

一般由发送端负责关闭 channel,原因:

  1. 向一个关闭的 channel 中发送数据,会 panic (⚠️注意了!!!)
  2. 发送端没有办法判断 channel 是否已经关闭。

len(channel)

当 ch 为无缓冲 channel 时,len(ch) 总是返回 0;当 ch 为带缓冲 channel 时,len(ch) 返回当前 channel ch 中尚未被读取的元素个数。

如果只是想知道 channel 中是否有数据、不想阻塞,可以使用 len(channel) 先做检查:

【len(channel) 的图】
【len(channel) 的图】

nil channel

默认读取一个关闭的 channel,会返回零值。但是读取一个 nil channel,操作将阻塞。

所以在有些场景下,可能需要手动修改 channel 为 nil,以实现阻塞的效果,比如在 select 语句中。

无缓冲 channel 的常见用途 🔥

Go 语言倡导:

Do not communicate by sharing memory; instead, share memory by communicating. 不要通过共享内存来通信,而是通过通信来共享内存

多 goroutine 通信:信号

基于无 buffer channel,可以实现一对一和一对多的信号传递。

1.一对一

type signal struct{}

//接收一个函数,在子 routine 里执行,然后返回一个 channel,用于主 routine 等待
func spawn(f func()) <-chan signal {
    c := make(chan signal)
    go func() {
        fmt.Println("exec f in child_routine");
        f();
        fmt.Println("f exec finished, write to channel")
        c<- signal{}
    }()
    return c
}

//测试使用无 buffer channel 实现信号
func testUseNonBufferChannelImplSignal() {
    //模拟主 routine 等待子 routine

    worker := func() {
        fmt.Println("do some work")
        time.Sleep(3 * time.Second)
    }

    fmt.Println("start a worker...")
    c := spawn(worker)

    fmt.Println("spawn finished, read channel...")
    <-c //读取,阻塞等待

    fmt.Println("worker finished")
}

上面的代码中,主 routine 创建了一个函数,然后在子 routine 中执行,主 routine 阻塞在一个 channel 上,等待子 routine 完成后继续执行。

执行结果:

start a worker...
spawn finished, read channel...
exec f in child_routine
do some work
f exec finished, write to channel
worker finished

可以看到,这样的确实现了类似“信号”的机制:在一个 routine 中通知另一个 routine。 如果 channel 的类型复杂些,就可以传递任意数据了!

struct{} 大小是0,不占内存

2.一对多

关闭一个无 buffer channel 会让所有阻塞在这个 channel 上的 read 操作返回,基于此我们可以实现 1 对 n 的“广播”机制。

var waitGroup sync.WaitGroup

func spawnGroup(f func(ind int), count int, groupSignal chan struct{}) <-chan signal {
	c := make(chan signal)	//用于让主 routine 阻塞的 channel
	waitGroup.Add(count)	//等待总数

	//创建 n 个 goroutine
	for i := 0; i < count; i++ {
		go func(index int) {
			<- groupSignal	//读取阻塞,等待通知执行

			//fmt.Println("exec f in child_routine, index: ", i);
			//⚠️注意上面注释的代码,这里不能直接访问 for 循环的 i,因为这个是复用的,会导致访问的值不是目标值

			fmt.Println("exec f in child_routine, index: ", index);
			f(index);
			fmt.Println(index , " exec finished, write to channel")

			waitGroup.Done()
		}(i + 1)
	}

	//创建通知主 routine 结束的 routine,不能阻塞当前函数
	go func() {
		//需要同步等待所有子 routine 执行完
		waitGroup.Wait()
		c <- signal{}	//写入数据
	}()
	return c
}


func testUseNonBufferChannelImplGroupSignal() {
	worker := func(i int) {
		fmt.Println("do some work, index ", i)
		time.Sleep(3 * time.Second)
	}

	groupSignal := make(chan struct{})
	c := spawnGroup(worker, 5, groupSignal)

	fmt.Println("main routine: close channel")
	close(groupSignal)	//通知刚创建的所有 routine


	fmt.Println("main routine: read channel...")
	<- c	//阻塞在这里

	fmt.Println("main routine: all worker finished")
}

上面的代码做了这些事:

  1. 创建 channelA,传递给多个 goroutine
  2. 子 routine 中读取等待这个 channelA
  3. 主 routine 关闭 channel,然后阻塞在 channelB 上,此时所有子 routine 开始执行
  4. 所有子 routine 执行完后,通过 channelB 唤醒主 routine

运行结果:

main routine: close channel
main routine: read channel
exec f in child_routine, index:  2
do some work, index  2
exec f in child_routine, index:  1
do some work, index  1
exec f in child_routine, index:  3
do some work, index  3
exec f in child_routine, index:  4
do some work, index  4
exec f in child_routine, index:  5
do some work, index  5
4  exec finished, write to channel
5  exec finished, write to channel
3  exec finished, write to channel
1  exec finished, write to channel
2  exec finished, write to channel
main routine: all worker finished

一句话总结: 用 2 个 channel 实现了 【主 routine 通知所有子 routine 开始】 和【子 routine 通知主 routine 任务结束】。

多 goroutine 同步:通过阻塞,替代锁

type NewCounter struct {
	c chan int
	i int
}

func CreateNewCounter() *NewCounter {
	counter := &NewCounter{
		c: make(chan int),
		i: 0,
	}

	go func() {
		for {
			counter.i ++
			counter.c <- counter.i		//每次加一,阻塞在这里
		}
	}()

	return counter
}

func (c *NewCounter)Increase() int {
	return <- c.c		//读取到的值,是上一次加一
}

//多协程并发增加计数,通过 channel 写入阻塞,读取时加一
func testCounterWithChannel() {
	fmt.Println("\ntestCounterWithChannel ->>>")

	group := sync.WaitGroup{}
	counter := CreateNewCounter()

	for i:=0 ; i<10 ; i++ {
		group.Add(1)

		go func(i int) {
			count := counter.Increase()
			fmt.Printf("Goroutine-%d, count %d \n", i, count)
		}(i)
	}

	group.Wait()

}

上面的代码中,我们创建了一个单独的协程,在其中循环增加计数,但每次加一后,就会尝试写入 channel(无 buffer 的),在没有人读取时,会阻塞在这个方法上。

然后在 10 个协程里并发读取 channel,从而实现每次读取递增。

带缓冲 channel 的常见用途 🔥

消息队列

channel 的特性符合对消息队列的要求:

  1. 跨 goroutine 访问安全
  2. FIFO
  3. 可设置容量
  4. 异步收发

Go 支持 channel 的初衷是将它作为 Goroutine 间的通信手段,它并不是专门用于消息队列场景的。 如果你的项目需要专业消息队列的功能特性,比如支持优先级、支持权重、支持离线持久化等,那么 channel 就不合适了,可以使用第三方的专业的消息队列实现。

计数信号量

由于带 buffer channel 的特性(容量满时写入会阻塞),可以用它的容量表示同时最大并发数量。

下面是一个例子:

var active = make(chan struct{}, 3)	//"信号量",最多 3 个
var jobs = make(chan int, 10)

//使用带缓存的 channel,容量就是信号量的大小
func testSemaphoreWithBufferChannel() {

	//先写入数据,用作表示任务
	go func() {
		for i:= 0; i < 9; i++ {
			jobs <- i + 1
		}
		close(jobs)
	}()

	var wg sync.WaitGroup

	for j := range jobs {
		wg.Add(1)

		//执行任务
		go func(i int) {
			//通知开始执行,当容量用完时,阻塞
			active <- struct{}{}

			//fmt.Println("exec job ", i)
			log.Printf("exec job: %d, length of active: %d \n", i, len(active))
			time.Sleep(2 * time.Second)

			//执行完,通知结束
			<- active
			wg.Done()

		}(j)
	}

	wg.Wait()
}

上面的代码中,我们用 channel jobs 表示要执行的任务(这里为 8 个),然后用 channel active 表示信号量(最多三个)。

然后在 8 个 goroutine 里执行任务,每个任务耗时 2s。在每次执行任务前,先写入 channel 表示获取信号量;执行完后读取,表示释放信号量。

由于信号量最多三个,所以同一时刻最多能有 3 个任务得以执行。

运行结果如下,符合预期:

2022/04/20 19:14:26 exec job: 1, length of active: 1 
2022/04/20 19:14:26 exec job: 9, length of active: 2 
2022/04/20 19:14:26 exec job: 5, length of active: 3 
2022/04/20 19:14:28 exec job: 6, length of active: 3 
2022/04/20 19:14:28 exec job: 7, length of active: 3 
2022/04/20 19:14:28 exec job: 8, length of active: 3 
2022/04/20 19:14:30 exec job: 3, length of active: 3 
2022/04/20 19:14:30 exec job: 2, length of active: 3 
2022/04/20 19:14:30 exec job: 4, length of active: 3 

select

当需要在一个 goroutine 同时读/写多个 channel 时,可以使用 select:

类似 Linux 的 I/O 多路复用思路,我们可以叫它:goroutine 多路复用。

func testSelect() {
    channelA := make(chan int)
    channelB := make(chan int)

    go func() {
        var readA bool
        var readB bool

        for {
            select {
            case x := <- channelA:
                fmt.Println("child_routine: read from channelA:", x)
                readA = true
            case y := <- channelB:
                fmt.Println("child_routine:  read from channelB:", y)
                readB = true
            //default:
            //  //其他 case 阻塞,就执行 default
            //  fmt.Println("default")
            }

            if readA && readB {

                fmt.Println("child_goroutine finish")
                return;
            } else {
                fmt.Println("child_goroutine still loop, ", readA, readB)
            }
        }
    }()

    fmt.Println("main goroutine")

    time.Sleep(2 * time.Second)

    fmt.Println("main goroutine, write to channelA")
    channelA <- 111
    fmt.Println("main goroutine, write to channelA finish")

    time.Sleep(1 * time.Second)

    fmt.Println("main goroutine, write to channelB")
    channelB <- 111
    fmt.Println("main goroutine, write to channelB finish")

    time.Sleep( 5 * time.Second)
    fmt.Println("main goroutinefinish")
}

输出:

main goroutine
main goroutine, write to channelA
main goroutine, write to channelA finish
child_routine: read from channelA: 111
child_goroutine still loop,  true false
main goroutine, write to channelB
main goroutine, write to channelB finish
child_routine:  read from channelB: 111
child_goroutine finish
main goroutinefinish

可以看到:

  1. 使用 select 在一个 goroutine 里读取了 2 个 channel
  2. 这 2 个 case 里的 channel 都不可读时,select 阻塞,只会执行 default,不会执行 select 代码块以外的
  3. 主 goroutine 写入数据后,select 的其中一个 case 返回,然后继续执行 select 后面的逻辑
  4. 下一轮循环后 2 个 case 都不可读,继续阻塞
  5. 然后主 goroutine 写入后,另外一个 case 也返回,循环结束

channel 与 select 结合的常见用途 🔥

利用 default 分支避免阻塞

select 的 default 分支语义:当所有 case 语句里读/写 channel 阻塞时,会执行 default!

无论 channel 是否有 buffer。

有些时候,我们可能不希望阻塞在写入 channel 上,那可以利用 select default 的特性,这样封装一个函数,当写入阻塞时,返回一个 false,让外界可以处理阻塞的情况:

func tryWriteChannel(c chan<- int, value int) bool {
	select {
	case c <- value
		return true
	default:	//其他没就绪时,会执行
		return false
	}
}

这样使用:

			//active <- 1		//之前直接写 channel,如果满了,就会阻塞
			writed := tryWriteChannel(active, 1)	//改成这样,可以在阻塞时,处理相关逻辑
			if !writed {
				log.Println("failed to write channel")
				return
			}

实现超时

假如我们想在一个 channel 的读/写操作上加一个超时逻辑,可以通过这样实现: 在 select 代码块中,加一个 case,这个 case 会在超时后执行,这样会结束其他 case。

比如这样:

func tryGetSemaphore(c chan<- struct{}) bool {
	select {
	case c <- struct {}{}:
		return true
	case <- time.After(1 * time.Second):		//在写 channel 的基础上,额外加一个情况,超时情况
		log.Println("timeout!!!")
		//1s 后返回,可以在这里做超时处理
		return true
	}
}

及时调用 timer 的 Stop 方法回收 Timer 资源。

心跳机制

循环执行一个额外的 case,这个 case 会定时返回。

func worker() {
  heartbeat := time.NewTicker(30 * time.Second)
  defer heartbeat.Stop()
  for {
    select {
    case <-c:
      // ... do some stuff
    case <- heartbeat.C:
      //... do heartbeat stuff
    }
  }
}

time.NewTicker 会创建一个定时执行的心跳,可以把这个 ticker channel 读取的操作放到一个 case 里,这样 select 代码块就会定时执行一次。

ticker 也要及时 Stop。

总结

本文介绍了 Golang 中通过 goroutine channel 和 select 实现并发操作的一些典型场景。

可以看到,通过 goroutine 实现并发是如此的简单;通过 channel 无 buffer 和有 buffer,实现一些 goroutine 同步机制也比较方便;结合 select,实现 goroutine 的统一管理。

在学习一门语言时,既要结合已有的语言知识,也要吸收新语言的设计思想。

需要记住的是,Go 提倡通过 CSP 模型(communicating sequential processes 通信顺序进程)进行通信,而不是传统语言的共享内存方式。

CSP:两个独立的并发实体通过共享 channel(管道)进行通信的并发模型。

我们在遇到多 goroutine 通信、同步的情况,可以尽量多使本文的内容进行处理。

不过对于某些情况,也可以使用 go 提供的 sync 包下的内容,进行局部同步。下篇文章我们就来看看这些内容。

对于局部情况,比如涉及性能敏感的区域或需要保护的结构体数据时,我们可以使用更为高效的低级同步原语(如 mutex),保证 goroutine 对数据的同步访问。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-04-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • goroutine
    • goroutine 调度原理
    • channel
      • channel 的不同类型
        • 作为参数的单向类型
        • 关闭 channel
        • len(channel)
        • nil channel
      • 无缓冲 channel 的常见用途 🔥
        • 多 goroutine 通信:信号
        • 多 goroutine 同步:通过阻塞,替代锁
      • 带缓冲 channel 的常见用途 🔥
        • 消息队列
        • 计数信号量
    • select
      • channel 与 select 结合的常见用途 🔥
        • 利用 default 分支避免阻塞
        • 实现超时
        • 心跳机制
    • 总结
    相关产品与服务
    消息队列 CMQ 版
    消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档