前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Golang实现协程池

Golang实现协程池

作者头像
地球流浪猫
发布2023-10-14 19:24:15
2100
发布2023-10-14 19:24:15
举报
文章被收录于专栏:流浪猫的golang流浪猫的golang

go实现协程池,协程轻量但并不是越多越好。虽然golang底层实现了对协程的复用,协程(Goroutine)的创建和调度由底层的运行时系统(runtime)负责,它会自动管理和复用协程,但是一瞬间并发过高仍然会导致内存资源消耗过大。使用协程池可用对资源进行有效控制。在内存资源够用的情况,或者其他不用限制同时任务数的情况,请用原生go 协程,不必使用协程池

协程池的数量和CPU核数的关系 小于或者等于CPU核数: 适用于计算密集型的任务中,如果协程的执行时间较长且没有IO操作,可以将协程池的数量设置为小于CPU核数的值。这样做可以避免过多的协程竞争CPU资源,减少上下文切换的开销,如图像处理、数据分析等。 大于CPU核数: 如果任务需要进行大量的IO操作,可以考虑将协程池的数量设置为大于CPU核数的值。 这样做可以充分利用CPU等待IO操作的时间,如网络请求、数据库查询等。

代码语言:javascript
复制
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

type Job struct {
	ID int
}

type Worker struct {
	ID        int
	JobQueue  <-chan Job
	QuitChan  <-chan bool
	WaitGroup *sync.WaitGroup
	f         func(job Job)
}

type Pool struct {
	WorkerSize int
	JobQueue   chan Job
	QuitChan   chan bool
	WaitGroup  *sync.WaitGroup
	workerFunc func(job Job)
}

func NewWorker(id int, wg *sync.WaitGroup, jobQueue chan Job, quitChan <-chan bool) *Worker {
	return &Worker{
		ID:        id,
		JobQueue:  jobQueue,
		QuitChan:  quitChan,
		WaitGroup: wg,
	}
}

func (worker *Worker) StartWork() {
	go func() {
		for {
			select {
			case job := <-worker.JobQueue:
				worker.f(job)
				worker.WaitGroup.Done()
			case <-worker.QuitChan:
				fmt.Printf("Worker %d: quitting\n", worker.ID)
				return
			}
		}
	}()
}

func NewPool(workerSize int, f func(job Job)) *Pool {
	return &Pool{
		WorkerSize: workerSize,
		JobQueue:   make(chan Job),
		WaitGroup:  &sync.WaitGroup{},
		workerFunc: f,
	}
}

func (p *Pool) AddJob(job Job) {
	p.WaitGroup.Add(1)
	p.JobQueue <- job
}

func (p *Pool) Start() {
	for i := 0; i < p.WorkerSize; i++ {
		worker := NewWorker(i, p.WaitGroup, p.JobQueue, p.QuitChan)
		worker.f = p.workerFunc
		worker.StartWork()
	}
}

func (p *Pool) Stop() {
	for i := 0; i < p.WorkerSize; i++ {
		p.QuitChan <- true
	}
}
func (p *Pool) Close() {
	close(p.JobQueue)
	close(p.QuitChan)
}
func WorKerFunc(job Job) {
	fmt.Println("Job,id:", job.ID)
	time.Sleep(1 * time.Second)
	for i := 0; i < 1000000; i++ {
		c := 100 * 1000
		_ = c
	}
}
func main() {
	fmt.Printf("CPU 内核数量:%d,协程数量:%d", runtime.NumCPU(), runtime.NumCPU()*2)
	start := time.Now().UnixMilli()
	pool := NewPool(runtime.NumCPU()*2, WorKerFunc)
	pool.Start()
	for i := 0; i < 100; i++ {
		job := Job{
			ID: i,
		}
		pool.AddJob(job)
	}
	pool.WaitGroup.Wait()
	end := time.Now().UnixMilli()
	fmt.Println(end - start)
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-06-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档