首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Go语言并发编程

前言

学无止境,无止境学。坚持每天学点编程知识,坚持每天写点小文章,坚持每天进步一点点。大家好,我是张大鹏,喜欢学习和分享,希望能够通过此公众号,将自己学到的东西分享给大家,和大家一起交流,一起成长,一起进步。

乾坤未定,你我皆是黑马。大鹏一日同风起,扶摇直上九万里。宝剑锋从磨砺出,梅花香自苦寒来。不积跬步,无以至千里,不积小流无以成江海。

如果有多余的时间,就坚持学习吧,小时候学习改变命运,长大了学习丰富内涵,老了学习带来宁静。活到老,学到老,我能行!

概述

课程说明

《Go语言零基础入门班》收费说明:

本篇文章是《Go语言零基础入门班》的第4篇文章,前面还有:

《Go语言快速入门》

《Go语言基本数据类型》

《Go语言组合数据类型》

并发编程入门

使用go启动一个协程

package main

import (

"fmt"

"time"

)

func hello() {

fmt.Println("hello")

}

func main() {

go hello()

// Go的主进程不会自动等待协程结束

// 所以我们让主进程休眠,主动等待

time.Sleep(time.Second)

}

协程是匿名函数

使用go配合匿名函数启动一个协程是更为常见的使用方式。

示例代码:

package main

import (

"fmt"

"time"

)

func main() {

go func() {

fmt.Println("hello")

}()

// Go的主进程不会自动等待协程结束

// 所以我们让主进程休眠,主动等待

time.Sleep(time.Second)

}

管道的基本用法

管道的创建和读写

创建管道:

c := make(chan int)

向管道写入数据:

c

从管道读取数据:

x, y :=

基本示例:

package main

import "fmt"

func main() {

// 创建管道

c := make(chan int)

// 向管道写数据

go func(){

c

}()

// 从管道读数据

v :=

fmt.Println(v)

}

求数组和

需求:定义函数,求一个整数类型切片中所有元素的和。

示例代码:

package main

import "fmt"

// 求和函数

func getArrSum(arr []int){

sum := 0

for _,v:= range arr {

sum+=v

}

fmt.Println(sum)

}

func main() {

// 构建数组

arr := []int

// 求和

getArrSum(arr)

}

使用管道求单个数组和

需求:在上个案例的基础上,使用管道存储求和结果。定义一个管道作为函数的参数,在函数中求和结束后将求和结果写入到管道,在主协程中,读取管道中的结果并打印。

示例代码:

package main

import "fmt"

// 求和函数

func getArrSum(arr []int, c chan

sum := 0

for _,v:= range arr {

sum+=v

}

c

}

func main() {

// 构建数组

arr := []int

// 构建管道

c := make(chan int)

// 开启一个协程进行求和

go getArrSum(arr, c)

// 等待计算结束后读取求和结果

sum :=

// 查看结果

fmt.Println(sum)

}

通过本案例我们应该掌握:

普通管道的定义方式:chan 类型

只写管道的定义方式:chan

只读管道的定义方式:

使用管道求多个数组和

需求:在之前案例的基础上,实现同时求多个数组的和。比如,求[1,2], [3,4], [5,6]三个数组的和。

示例代码:

package main

import "fmt"

// 求和函数

func getArrSum(arr []int, c chan

sum := 0

for _,v:= range arr {

sum+=v

}

c

}

func main() {

// 构建数组

arr1 := []int

arr2 := []int

arr3 := []int

// 构建管道

c := make(chan int)

// 开启一个协程进行求和

go getArrSum(arr1, c)

go getArrSum(arr2, c)

go getArrSum(arr3, c)

// 等待计算结束后读取求和结果

// 管道写了三次,就要读取三次

// 这里有一个非常重要的特性,管道是一种类似栈结构,先进后出

// 所以,我们传入的是arr1、arr2、arr3,得到的却是sum3、sum2、sum1

sum3 :=

sum2 :=

sum1 :=

// 查看结果

fmt.Println(sum1, sum2, sum3)

}

案例分析:通过本案例我们应该掌握

管道是一种栈结构,先进后出

往管道里面写了多少个数据,就必须要读多少次,否则可能会造成死锁

将单数组拆分多数组并发求和

需求:在之前案例的基础上,我们将一个数组,拆分为多个数组进行并发求和。比如将[12,3,4,5,6],拆分为[1,2], [3,4], [5,6]进行并发求和。

示例代码:

package main

import "fmt"

// 求和函数

func getArrSum(arr []int, c chan

sum := 0

for _,v:= range arr {

sum+=v

}

c

}

func main() {

// 构建数组

arr := []int

fmt.Println(arr[:2])

fmt.Println(arr[2:4])

fmt.Println(arr[4:])

// 构建管道

c := make(chan int)

// 开启一个协程进行求和

go getArrSum(arr[:2], c)

go getArrSum(arr[2:4], c)

go getArrSum(arr[4:], c)

// 等待计算结束后读取求和结果

// 管道写了三次,就要读取三次

// 这里有一个非常重要的特性,管道是一种类似栈结构,先进后出

// 所以,我们传入的是arr1、arr2、arr3,得到的却是sum3、sum2、sum1

sum3 :=

sum2 :=

sum1 :=

// 查看结果

fmt.Println(sum1, sum2, sum3)

}

案例分析:通过本案例我们应该掌握

Go语言中的数组也叫切片或slice,和Python里面的列表类似,是可以通过切片表达式进行局部访问的

利用切片的特性,我们可以非常轻松的实现将一个大数组拆分为多个小数组,进行并发求和

非并发求数组和

需求:构造一个具有100万个随机10-110范围内的整数的切片,并编写一个求和函数,求切片中所有元素的和,并计算求和需要的时间。

如何生成随机浮点数

package main

import (

"fmt"

"math/rand"

"time"

)

func init() {

rand.Seed(time.Now().UnixNano())

}

func main() {

for i := 0; i < 10; i++ {

fmt.Println(rand.Float64())

}

}

如何生成指定范围随机整数

package main

import (

"fmt"

"math"

"math/rand"

"time"

)

func init() {

rand.Seed(time.Now().UnixNano())

}

func main() {

start := 10

end := 110

for i := 0; i < 10; i++ {

num := rand.Float64()*float64(end) + float64(start)

numInt := int(math.Floor(num))

fmt.Println(numInt)

}

}

如何生成整数切片数组

package main

import (

"fmt"

"math"

"math/rand"

"time"

)

func init() {

rand.Seed(time.Now().UnixNano())

}

func getArr(min, max, length int) (arr []int) {

for i := 0; i < length; i++ {

num := rand.Float64()*float64(max) + float64(min)

numInt := int(math.Floor(num))

arr = append(arr, numInt)

}

return

}

func main() {

start := 10

end := 110

length := 1000000

arr := getArr(start, end, length)

fmt.Println(arr[:3], len(arr))

}

如何求切片数组和

package main

import (

"fmt"

"math"

"math/rand"

"time"

)

func init() {

rand.Seed(time.Now().UnixNano())

}

func getArr(min, max, length int) (arr []int) {

for i := 0; i < length; i++ {

num := rand.Float64()*float64(max) + float64(min)

numInt := int(math.Floor(num))

arr = append(arr, numInt)

}

return

}

func getArrSum(arr []int) (sum int) {

for _, num := range arr {

sum += num

}

return

}

func main() {

start := 10

end := 110

length := 1000000

arr := getArr(start, end, length)

sum := getArrSum(arr)

fmt.Println(sum)

}

如何计算求和时间

package main

import (

"fmt"

"math"

"math/rand"

"time"

)

func init() {

rand.Seed(time.Now().UnixNano())

}

func getArr(min, max, length int) (arr []int) {

for i := 0; i < length; i++ {

num := rand.Float64()*float64(max) + float64(min)

numInt := int(math.Floor(num))

arr = append(arr, numInt)

}

return

}

func getArrSum(arr []int) (sum int) {

for _, num := range arr {

sum += num

}

return

}

func main() {

start := 10

end := 110

length := 1000000

arr := getArr(start, end, length)

startTime := time.Now()

sum := getArrSum(arr)

fmt.Println(sum)

spendTime := time.Since(startTime)

fmt.Println(spendTime)

}

并发求数组和

需求:

定义一个方法,接受一个数组和一个管道,对数组求和并将结果写入管道

定义一个方法,接受一个数组,如果数组元素个数大于3个,则将数组等分为3个子数组,并发求和

生成100万个随机10-110之间整数构成的随机数组,并发求和并计算时间

如何将求和结果写入管道

package main

import (

"fmt"

"math"

"math/rand"

"time"

)

func init() {

rand.Seed(time.Now().UnixNano())

}

func getArr(min, max, length int) (arr []int) {

for i := 0; i < length; i++ {

num := rand.Float64()*float64(max) + float64(min)

numInt := int(math.Floor(num))

arr = append(arr, numInt)

}

return

}

func getArrSum(arr []int) (sum int) {

for _, num := range arr {

sum += num

}

return

}

func writeArrSumChan(arr []int, c chan

c

}

func main() {

start := 10

end := 110

length := 1000000

arr := getArr(start, end, length)

startTime := time.Now()

c := make(chan int)

go writeArrSumChan(arr, c)

sum :=

fmt.Println(sum)

spendTime := time.Since(startTime)

fmt.Println(spendTime)

// 主动等待结束

time.Sleep(time.Second * 3)

}

如何并发求和

package main

import (

"fmt"

"math"

"math/rand"

"time"

)

func init() {

rand.Seed(time.Now().UnixNano())

}

func getArr(min, max, length int) (arr []int) {

for i := 0; i < length; i++ {

num := rand.Float64()*float64(max) + float64(min)

numInt := int(math.Floor(num))

arr = append(arr, numInt)

}

return

}

func getArrSum(arr []int) (sum int) {

for _, num := range arr {

sum += num

}

return

}

func writeArrSumChan(arr []int, c chan

c

}

// 并发求和

func getArrSum2(arr []int) (sum int) {

// 不需要并发求和

length := len(arr)

if length

sum = getArrSum(arr)

}

// 拆分数组

step := length / 3

arr1 := arr[:step]

arr2 := arr[step : step+step]

arr3 := arr[step+step:]

// 并发求和

c := make(chan int)

go writeArrSumChan(arr1, c)

go writeArrSumChan(arr2, c)

go writeArrSumChan(arr3, c)

v1, v2, v3 :=

sum += v1 + v2 + v3

return

}

func main() {

var (

start = 10

end = 110

length = 100000000

arr = getArr(start, end, length)

startTime time.Time

spendTime time.Duration

sum int

)

// 并发求和

startTime = time.Now()

sum = getArrSum2(arr)

fmt.Println(sum)

spendTime = time.Since(startTime)

fmt.Println("并发求和消耗时间:", spendTime)

// 非并发求和

startTime = time.Now()

sum = getArrSum(arr)

fmt.Println(sum)

spendTime = time.Since(startTime)

fmt.Println("非并发求和消耗时间:", spendTime)

// 主动等待结束

time.Sleep(time.Second * 3)

}

输出结果:

6450261436

并发求和消耗时间: 25.405516ms

6450261436

非并发求和消耗时间: 47.803627ms

Goroutine连接池

ants连接池概述

参考文档:https://github.com/panjf2000/ants/blob/master/README_ZH.md

虽然Go的Goroutine非常强大,几乎可以随意创建,但是毕竟资源是有限的。当我们大批量重复使用Goroutine的时候,将会占用非常多的系统资源,这个时候Goroutine连接池就非常有用了。

ants是一个非常流行的Goroutine连接池工具,具备以下功能:

自动调度海量的 goroutines,复用 goroutines

定期清理过期的 goroutines,进一步节省资源

提供了大量有用的接口:任务提交、获取运行中的 goroutine 数量、动态调整 Pool 大小、释放 Pool、重启 Pool

优雅处理 panic,防止程序崩溃

资源复用,极大节省内存使用量;在大规模批量并发任务场景下比原生 goroutine 并发具有更高的性能

非阻塞机制

使用默认连接池

如何使用默认连接池提交任务

// 任务子函数

func demoFunc() {

time.Sleep(10 * time.Millisecond)

fmt.Println("Hello World!")

}

// 任务函数

var wg sync.WaitGroup

syncCalculateSum := func() {

demoFunc()

wg.Done()

}

// 提交任务

ants.Submit(syncCalculateSum)

并发执行1000次无参方法

package main

import (

"fmt"

"sync"

"time"

"github.com/panjf2000/ants/v2"

)

// 核心的方法

func demoFunc() {

time.Sleep(10 * time.Millisecond)

fmt.Println("Hello World!")

}

func main() {

defer ants.Release()

runTimes := 1000

// 使用通用的连接池

var wg sync.WaitGroup

syncCalculateSum := func() {

demoFunc()

wg.Done()

}

// 提交指定次数的任务

for i := 0; i < runTimes; i++ {

wg.Add(1)

// 使用默认的连接池

_ = ants.Submit(syncCalculateSum)

}

wg.Wait()

fmt.Printf("运行中的Goroutine数量: %d\n", ants.Running())

fmt.Printf("任务完成.\n")

}

使用自定义连接池

package main

import (

"fmt"

"sync"

"sync/atomic"

"github.com/panjf2000/ants/v2"

)

// 全局变量

var sum int32

// 核心的方法

func myFunc(i interface{}) {

n := i.(int32)

atomic.AddInt32(&sum, n)

fmt.Printf("run with %d\n", n)

}

func main() {

runTimes := 1000

// 使用通用的连接池

var wg sync.WaitGroup

// 创建连接池

p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {

myFunc(i)

wg.Done()

})

defer p.Release()

// 提交任务

for i := 0; i < runTimes; i++ {

wg.Add(1)

_ = p.Invoke(int32(i))

}

wg.Wait()

fmt.Printf("运行中的Goroutine数量: %d\n", ants.Running())

fmt.Println("任务完成:", sum)

}

使用连接池并发求数组和

package main

import (

"fmt"

"math"

"math/rand"

"sync"

"sync/atomic"

"github.com/panjf2000/ants/v2"

)

// 全局变量

var sum int64

// 核心的方法

func sumArr(i interface{}) {

arr := i.([]int64)

for _, v := range arr {

atomic.AddInt64(&sum, v)

}

}

// 获取随机数组

func getArr(min, max, length int) (arr []int64) {

for i := 0; i < length; i++ {

num := rand.Float64()*float64(max) + float64(min)

numInt := int64(math.Floor(num))

arr = append(arr, numInt)

}

return

}

func main() {

arr := getArr(0, 100, 100000000)

// 使用通用的连接池

var wg sync.WaitGroup

// 创建连接池

p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {

sumArr(i)

wg.Done()

})

defer p.Release()

// 拆分数组

step := len(arr) / 3

arr1 := arr[:step]

arr2 := arr[step : step+step]

arr3 := arr[step+step:]

// 提交任务

wg.Add(1)

_ = p.Invoke(arr1)

wg.Add(1)

_ = p.Invoke(arr2)

wg.Add(1)

_ = p.Invoke(arr3)

wg.Wait()

fmt.Printf("运行中的Goroutine数量: %d\n", ants.Running())

fmt.Println("任务完成:", sum)

}

比较连接池和普通方法求和消耗时间

package main

import (

"fmt"

"math"

"math/rand"

"sync"

"sync/atomic"

"time"

"github.com/panjf2000/ants/v2"

)

// 全局变量

var sum int64

// 核心的方法

func sumArr(i interface{}) {

arr := i.([]int64)

for _, v := range arr {

atomic.AddInt64(&sum, v)

}

}

// 获取随机数组

func getArr(min, max, length int) (arr []int64) {

for i := 0; i < length; i++ {

num := rand.Float64()*float64(max) + float64(min)

numInt := int64(math.Floor(num))

arr = append(arr, numInt)

}

return

}

// 求数组和

func getArrSum(arr []int64) (sum int64) {

for _, num := range arr {

sum += num

}

return

}

func main() {

var (

arr = getArr(0, 100, 100000000)

startTime time.Time

spendTime time.Duration

wg sync.WaitGroup

)

// 创建连接池

p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {

sumArr(i)

wg.Done()

})

defer p.Release()

// 拆分数组

startTime = time.Now()

step := len(arr) / 3

arr1 := arr[:step]

arr2 := arr[step : step+step]

arr3 := arr[step+step:]

// 提交任务

wg.Add(1)

_ = p.Invoke(arr1)

wg.Add(1)

_ = p.Invoke(arr2)

wg.Add(1)

_ = p.Invoke(arr3)

wg.Wait()

spendTime = time.Since(startTime)

fmt.Println("使用连接池求和:", sum, spendTime)

// 普通方法求和

startTime = time.Now()

sum = getArrSum(arr)

spendTime = time.Since(startTime)

fmt.Println("使用普通方法求和:", sum, spendTime)

}

输出结果:

使用连接池求和: 4950340694 1.323886243s

使用普通方法求和: 4950340694 46.091214ms

从结果可以发现,连接池消耗的时间反而比普通方法更多。主要是因为,连接池不仅要拆分数组,每次求和的时候,还需要单独将每个元素累加到全局变量sum上,会有额外的计算步骤。

即就是说,连接池有自己适用的使用场景,并非只要上连接池效率就高于一切,在真实的开发中,要根据实际需求考虑是否需要使用Goroutine连接池。

总结

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20230204A01J6T00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

相关快讯

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券