前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >用好工作池 WaitGroup

用好工作池 WaitGroup

作者头像
酷走天涯
发布2019-05-26 16:20:18
6220
发布2019-05-26 16:20:18
举报

本节学习

  • WaitGroup的用法
  • 如何实现工作池
WaitGroup的用法

WaitGroup 用于实现工作池,因此要理解工作池,我们首先需要学习 WaitGroup。

WaitGroup 用于等待一批 Go 协程执行结束。程序控制会一直阻塞,直到这些协程全部执行完毕。假设我们有 3 个并发执行的 Go 协程(由 Go 主协程生成)。Go 主协程需要等待这 3 个协程执行结束后,才会终止。这就可以用 WaitGroup 来实现

代码语言:javascript
复制
package main

import (
    "fmt"
    "time"
    "sync"
)

func login(wg *sync.WaitGroup){
    time.Sleep(time.Second)
    fmt.Println("登录完成")
    wg.Done()
}
func getUserInfo(wg *sync.WaitGroup){
    time.Sleep(time.Second)
    fmt.Println("获取用户信息")
    wg.Done()  //4
}

func main() {
   var wg sync.WaitGroup // 1
   wg.Add(1) //2
   go login(&wg)
   wg.Add(1)
   go getUserInfo(&wg) //3

   wg.Wait()
   fmt.Println("执行完毕")
}

上面写了两次wg.Add(1),当然你也可以写一次wg.Add(2)

image.png

下面是waitGroup 的使用说明 1.WaitGroup 是一个等待协程完成的结构体 2.主协程通过调用Add 方法设置等待协程的数量 3.每个子协程完成的时候,需要调用Done 方法,那么等待的协程数量会自动减一 4.wait方法主要告诉协程,开启等待模式,知道所有的协程完成工作

注意事项 go login(&wg) 我们一定要传递指针类型的变量,因为sync.WaitGroup 是结构体,是值类型,在传递的过程中会赋值,如果不用指针,创建的时候,就不是原来的结构体了

工作池

工作池就是一组等待任务分配的协程。一旦完成了所分配的任务,这些线程可继续等待任务的分配。

我们会使用缓冲信道来实现工作池。我们工作池的任务是计算所输入数字的每一位的和。例如,如果输入 234,结果会是 9(即 2 + 3 + 4)。向工作池输入的是一列伪随机数。

我们工作池的核心功能如下:

创建一个 Go 协程池,监听一个等待作业分配的输入型缓冲信道。 将作业添加到该输入型缓冲信道中。 作业完成后,再将结果写入一个输出型缓冲信道。 从输出型缓冲信道读取并打印结果。

代码语言:javascript
复制
package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Job struct {
    id       int
    randomno int
}
type Result struct {
    job         Job
    sumofdigits int
}

var jobs = make(chan Job, 10)
var results = make(chan Result, 10)


// 1.创建工作任务
func allocate(noOfJobs int) {
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    // 关闭工作信道
    close(jobs)
}

// 2.计算数的和
func digits(number int) int {
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}

// 执行一项工作 一项工作启用 一个协程 工作完毕后,等待组减一 多个协程同时调用 这个方法 会对 同一个信道 jobs 产生竞争,谁先拿到值,谁先执行
func 3.worker(wg *sync.WaitGroup) {
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}

// 4.创建执行数量的工作组
func createWorkerPool(noOfWorkers int) {
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg) 
    }
    wg.Wait()
    // 当所有任务执行完毕后,关闭通道
    close(results)
}



// 5.对结果进行输出
func result(done chan bool) {
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}

func main() {
    startTime := time.Now()

    noOfJobs := 100

    go allocate(noOfJobs)

    done := make(chan bool)
    // 完成一项任务执行一次输出
    go result(done)
    
    // 创建工作池开始做任务
    noOfWorkers := 50

    createWorkerPool(noOfWorkers)
    
    // 等待所有任务完成
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

案例二

我们要下载100 张图片,模拟每张图片下载需要500ms,请使用工作池实现这个下载图片的任务

download.go

代码语言:javascript
复制
package util

import (
    "sync"
    "fmt"
)

var results = make(chan string)
var tasks = make(chan string)

type Download struct {
    urls []string
    results []string
}

func (d *Download)CreateTasks(urls []string){
    d.urls = urls
    for _,v := range d.urls{
        tasks <- v
    }
    close(tasks)
}

func (d *Download)DownloadBy(url string){
    d.results = append(d.results,"下载完成了")

}

func (d *Download)startDownload(ws * sync.WaitGroup){
    for task :=  range tasks{
        d.DownloadBy(task)
    }
    ws.Done()
}

func (d *Download)CreateWorkerPool(num int, finish chan bool){
    var ws sync.WaitGroup
    for i := 0;i <num;i++{
        ws.Add(1)
        go d.startDownload(&ws)
    }
    ws.Wait()
    close(results)
    finish <- true
}



func (d *Download) Download(urls []string,finish func([]string)){
    result := make(chan bool)
    go d.CreateWorkerPool(10,result)
    go d.CreateTasks(urls)
    <- result
    fmt.Println("最后执行的")
    if(finish != nil){
        finish(d.results)
    }
}

main.go 文件

代码语言:javascript
复制
package main

import (
        "fmt"
    "strconv"
     "./util"

)



func main() {
    download := util.Download{}
    var urls = []string{}
    for i:=0;i < 100;i++{
        urls = append(urls, strconv.Itoa(i))
    }
    download.Download(urls, func(result []string) {
       fmt.Println(result)
    }) 
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.12.27 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • WaitGroup的用法
  • 工作池
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档