前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Golang实现生产者-消费者的(N:1)模型

Golang实现生产者-消费者的(N:1)模型

作者头像
素履coder
发布2023-07-11 14:43:09
4590
发布2023-07-11 14:43:09
举报
文章被收录于专栏:素履coder素履coder

1. 目标模型#

  • 多个生产者对应一个消费者,即 N:1
  • 消费者处理生产者发送过来的消息时是并行处理的,但是有速率限制,最大为5qps
  • 消费者处理完了某个生产者的消息后,通知对应的生产者
  • 当某个生产者发送的所有消息都收到处理完成的消息后,执行后续逻辑

2. 实现分析#

根据上面的条件,可知:

  • 需要开启goroutine来实现并发处理
  • 使用带缓存的channel控制并发量
  • 使用for-select结构实现挂起等待
  • 使用单独的channl实现通知机制
  • 使用sync.WaitGroup保证goroutine执行完成

3. 代码实现#

3.1 初始变量#
代码语言:javascript
复制
const rateLimit = 5 // 每秒5个

type msgChan struct { //  用于通信的结构体
	Id       int64
	Text     string
	DoneChan chan int64
}

var ch chan msgChan

func InitChan() {
	ch = make(chan msgChan, 10)
}

func SendToChan(msg msgChan) {
	ch <- msg
}

func GetFromChan() chan msgChan {
	return ch
}
3.2 生产者#
代码语言:javascript
复制
func producer(i int64) *msgChan {
	var msg msgChan
	msg.Id = i
	msg.Text = fmt.Sprintf("消息文本%v", i)
	msg.DoneChan = make(chan int64)
	SendToChan(msg) // 发送到channel
	fmt.Println("producer: ", i)
	return &msg
}
3.3 消费者#
代码语言:javascript
复制
func consumer() {
	for {
		select {
		case msg, ok := <-GetFromChan():
			if ok {
				fmt.Printf("consumer %v processing ..., time: %v\n",
					msg.Id, time.Now().Format("2006-01-02 15:04:05"))

                // 因为 rateLimit = 5,所以模拟每个go大概需运行多长时间
                // 也可以在外部采用time.tick的方式
				time.Sleep((1000 / rateLimit) * time.Millisecond)

				msg.DoneChan <- msg.Id // 通知生产者已经消费完了
			}
		}
	}
}
3.4 主函数#
代码语言:javascript
复制
func main() {
	InitChan()
	
	// todo 消费者
	go consumer()
    
	// todo 生产者
	var wg sync.WaitGroup
	wg.Add(20) // 确保每个go都能运行完
	for i := 0; i < 20; i++ {
		msg := producer(int64(i))
		// todo 等待消费者消费完的通知(哪个先消费完就接收哪个)
		go func(m *msgChan, w *sync.WaitGroup) {
			defer w.Done()
			if v, ok := <-m.DoneChan; ok {
				fmt.Println("receive done: ", v)
				close(m.DoneChan) // 通信完后关闭channel
			}
		}(msg, &wg)
	}
	wg.Wait()
	close(ch)	// 结束的时候关闭channel
	fmt.Println("后续操作...")
}
3.5 运行结果#

根据时间间隔可知,实现了一秒钟最多运行5个go协程

代码语言:javascript
复制
macBook-Pro-8 go_learning % go build test12.go
macBook-Pro-8 go_learning % ./test12      
producer:  0
producer:  1
producer:  2
producer:  3
producer:  4
producer:  5
producer:  6
producer:  7
producer:  8
producer:  9
producer:  10
consumer 0 processing ..., time: 2023-06-12 22:45:31
receive done:  0
consumer 1 processing ..., time: 2023-06-12 22:45:31
producer:  11
consumer 2 processing ..., time: 2023-06-12 22:45:31
producer:  12
receive done:  1
consumer 3 processing ..., time: 2023-06-12 22:45:32
producer:  13
receive done:  2
consumer 4 processing ..., time: 2023-06-12 22:45:32
producer:  14
receive done:  3
consumer 5 processing ..., time: 2023-06-12 22:45:32
receive done:  4
producer:  15
consumer 6 processing ..., time: 2023-06-12 22:45:32
receive done:  5
producer:  16
consumer 7 processing ..., time: 2023-06-12 22:45:32
receive done:  6
producer:  17
consumer 8 processing ..., time: 2023-06-12 22:45:33
producer:  18
receive done:  7
consumer 9 processing ..., time: 2023-06-12 22:45:33
receive done:  8
producer:  19
consumer 10 processing ..., time: 2023-06-12 22:45:33
receive done:  9
consumer 11 processing ..., time: 2023-06-12 22:45:33
receive done:  10
consumer 12 processing ..., time: 2023-06-12 22:45:33
receive done:  11
consumer 13 processing ..., time: 2023-06-12 22:45:34
receive done:  12
consumer 14 processing ..., time: 2023-06-12 22:45:34
receive done:  13
consumer 15 processing ..., time: 2023-06-12 22:45:34
receive done:  14
consumer 16 processing ..., time: 2023-06-12 22:45:34
receive done:  15
consumer 17 processing ..., time: 2023-06-12 22:45:34
receive done:  16
consumer 18 processing ..., time: 2023-06-12 22:45:35
receive done:  17
consumer 19 processing ..., time: 2023-06-12 22:45:35
receive done:  18
receive done:  19
后续操作...
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-06-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 目标模型#
  • 2. 实现分析#
  • 3. 代码实现#
    • 3.1 初始变量#
      • 3.2 生产者#
        • 3.3 消费者#
          • 3.4 主函数#
            • 3.5 运行结果#
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档