前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >GoLang 的并发编程与通信(一) -- goroutine 与通道

GoLang 的并发编程与通信(一) -- goroutine 与通道

作者头像
用户3147702
发布2022-06-27 14:11:11
6180
发布2022-06-27 14:11:11
举报
文章被收录于专栏:小脑斧科技博客

1. 引言

服务端程序每一时刻都在经受着大量并发流量的考验,而如今,CPU 指令运行频率的提升已经面临瓶颈,只能通过核心数的增长来大幅提升其指令的执行能力。 因此,现代程序设计中,并发编程的支持就显得越来越重要。 GoLang 进行并发编程十分轻松,他有两种风格可供选择:

  1. goroutine 和通道
  2. 通过共享内存同步的传统多线程模型

本文,我们就来详细介绍一下 goroutine 与通道机制如何来使用。

2. goroutine

GoLang 中,goroutine 是最为简单的一种并发执行机制,每一个并发执行的活动都被称为 goroutine,每个 goroutine 类似于一个线程,但它与线程只有着非常大的差别,这将在下一篇文章中进行分析和讲述。 当程序启动时,用来执行 main 函数的 goroutine 被称为主 goroutine,此后,只要在调用函数时,前面加上关键词 go,就可以创建一个新的 goroutine:

f() // 调用函数 f(),并等待他返回 go f() // 并发调用函数 f(),不用等待

2.1. 示例

下面的例子展示了主 goroutine 运行的同时,并发执行打点 goroutine,每毫秒更新一次标准输出:

代码语言:javascript
复制
package main

import (
    "fmt"
    "time"
)

func fib(n int) int {
    if n < 2 {
        return n
    }
    return fib(n-1) + fib(n-2)
}

func ticking(delay time.Duration) {
    arrows := [...]string{"-", "\\", "|", "/"}
    for i := 1; true; i++ {
        time.Sleep(delay)
        fmt.Printf("\r%s %dms", arrows[i % 4], i)
    }
}

func main() {
    timestamp := time.Now().UnixNano() / 1e6
    go ticking(100 * time.Millisecond)
    const n = 45
    result := fib(n)
    fmt.Printf("\nFibonacci(%d) = %d\n", n, result)
    fmt.Printf("Use time: %.2fms", float64(time.Now().UnixNano() / 1e6 - timestamp)/100.0)
}

可以看到标准输出伴随着一个字符的直线在旋转的同时,他后面的数字在增加,直到斐波那契数列指定位数的值完成计算并输出:

- 108ms Fibonacci(45) = 1134903170 Use time: 108.24ms

一旦主 goroutine 运行结束,所有 goroutine 都会暴力地直接中止运行,然后程序退出。

3. 通过网络进行 goroutine 间的通信 — 标准库 net 包的使用

和 java 等很多语言中的线程一样,goroutine 也不能被其他 goroutine 中止,但多个 goroutine 之间可以进行通信。 通过网络进行通信是非常常用的并发通信机制,在 golang 中,net 包提供了 TCP、UDP、域套接字 的支持。

3.1. 通过 TCP 实现 goroutine 间通信

TCP 是一种非常常用的网络通信协议,关于 TCP 的详细介绍,可以参看主页君此前的文章: 传输控制协议 — TCP TCP连接的建立和终止

下面的代码展示了使用 net 包提供的方法进行 TCP 通信的示例:

代码语言:javascript
复制
package main

import (
    "bufio"
    "fmt"
    "io"
    "net"
    "os"
    "time"
)

func client(conn *net.TCPConn) {
    defer func() {
        _ = conn.Close()
    }()

    reader := bufio.NewReader(conn)
    b := []byte(time.Now().String() + " " + conn.LocalAddr().String() + " say hello to Server\n")
    _, _ = conn.Write(b)

    msg, err := reader.ReadString('\n')
    if err != nil || err == io.EOF {
        _, _ = fmt.Fprintf(os.Stderr, "client read failed: %v", err)
    }
    fmt.Println("client recieved: " + msg)
}

func server(tcpListener *net.TCPListener) {
    defer tcpListener.Close()
    fmt.Println("Server ready to read ...")
    serverConn, err := tcpListener.AcceptTCP()
    if err != nil {
        _, _ = fmt.Fprintf(os.Stderr, "server accept failed: %v", err)
        return
    }
    defer func() {
        _ = serverConn.Close()
    }()

    reader := bufio.NewReader(serverConn)
    message, err := reader.ReadString('\n')
    if nil != err || err == io.EOF {
        _, _ = fmt.Fprintf(os.Stderr, "server read failed: %v", err)
        return
    }
    fmt.Println("server recieved: " + message)
    b := []byte(serverConn.RemoteAddr().String() + " Server say world\n")
    _, _ = serverConn.Write(b)
}

func main() {
    tcpAddrServer, _ := net.ResolveTCPAddr("tcp", "localhost:8461")
    tcpListener, _ := net.ListenTCP("tcp", tcpAddrServer)
    go server(tcpListener)

    tcpAddrCient, _ := net.ResolveTCPAddr("tcp", "localhost:8461")
    clientConn, err := net.DialTCP("tcp", nil, tcpAddrCient)
    if nil != err {
        fmt.Println("Client connect error ! " + err.Error())
        return
    }

    fmt.Println(clientConn.LocalAddr().String() + " : Client connected!")
    go client(clientConn)
    for {}
}

上面的代码由主 goroutine 分别依次启动了两个 goroutine — server 和 client。 server 等待 client 传来的字符串,打印并回传一句字符串。 client 传递一句字符串给 server 后将 server 回传的字符串打印出来。

执行代码,打印出了:

Server ready to read … 127.0.0.1:5777 : Client connected! server recieved: 2019-11-14 18:11:02.8257556 +0800 CST m=+3.096348701 127.0.0.1:5777 say hello to Server client recieved: 127.0.0.1:5777 Server say world

上述代码显得较为繁琐,在实际的 goroutine 通信中,如果是在 unix 环境下,选择 unix 域套接字进行 goroutine 间通信是更好的选择。 关于使用 net 包进行网络通信,后续会有文章进行详细介绍,敬请期待。

4. 通道

上述通过 net 包实现的网络通信看上去非常复杂,别急,GoLang 提供了更为好用的连接 goroutine 的工具 — 通道。 通道实现了一个 goroutine 发送特定值到另一个 goroutine 的通信机制,它与 unix 环境中的管道非常类似。 此前,我们已经介绍过 unix 环境中的管道的使用,他是 unix 环境下最为常用的进程间通信方式: 管道

4.1. 通道的类型

通道的类型就是 chan xxx,其中 xxx 可以是任何类型名,例如:

chan int chan struct{}

上述这样类型的通道既可以用来发送也可以用来接收数据。 同样,我们也可以声明只用于发送或接收的单向通道:

chan<- int — 只用于发送的通道 <-chan int — 只用于接收的通道

4.2. 通道的创建和关闭

4.2.1. 通道的创建

和 map 一样,通道通过内置函数 make 就可以实现创建:

代码语言:javascript
复制
ch := make(chan int)
ch := make(chan int, 3)

make 的第二个参数是可选的,用来表示创建的缓冲区大小,默认表示无缓冲区。 如果缓冲区已满或没有缓冲区,那么在通道上的发送操作会被阻塞,直到另一个 goroutine 在该通道上接收数据或者发送操作被中止。 同样,当缓冲区为空或没有缓冲区,接收操作也会被阻塞,直到由发送方发送新的数据。

特别的,有时我们并不想通过通道传递任何数据,只是想要通过通道发送一个信号,此时,我们通常使用 chan struct{} 类型的通道,传递一个 struct{}{} 空对象。

4.2.2. 通道的关闭

内置函数同样提供了关闭通道的方法:

代码语言:javascript
复制
close(ch)

在通道关闭后,任何发送操作都会产生宕机,而接收操作会读取通道中所有剩余数据。 如果在通道关闭后,所有数据已经被接收,再次执行接收操作会立即返回对应类型的零值。

在 GoLang 中,如果在使用文件后没有执行 close 操作,将会造成无法回收的内存泄漏,但对于通道来说不会,垃圾回收器会根据通道是否可以被访问来决定是否回收相应的资源,无论通道是否进行过 close 操作。

4.3. 通道的发送和接收

代码语言:javascript
复制
ch <- x    // 发送语句,将变量 x 的值发送到通道 ch
x = <-ch   // 接收语句,接收通道 ch 中的数据并赋值给变量 x
<-ch       // 接受语句,丢弃接收到的通道中的数据

4.4. 示例

4.4.1. 双向通道

有了通道,我们上面通过 TCP 实现通信的例子就可以十分简化了:

代码语言:javascript
复制
package main

import (
    "fmt"
)

func client(ch chan string, sigover chan struct{}) {
    str := "Client say hello"
    ch <- str
    str = <-ch
    fmt.Println("client recieved: " + str)
    sigover <- struct{}{}
}

func server(ch chan string) {
    str := <-ch
    fmt.Println("server recieved: " + str)
    str = "Server say world"
    ch <- str
    close(ch)
}

func main() {
    ch := make(chan string)
    sigover := make(chan struct{})
    go client(ch, sigover)
    go server(ch)
    <-sigover
}

打印出了:

server recieved: Client say hello client recieved: Server say world

这个例子展示了通过 chan struct{} 类型的通道报告 goroutine 运行结束的机制,这是非常常用的一种方法。

4.4.2. 单向通道

下面的例子通过单向通道计算了 1 到 10 的平方:

代码语言:javascript
复制
package main

import "fmt"

func counter(out chan<- int) {
    for x := 1; x <= 10; x++ {
        out <- x
    }
    close(out)
}

func squarer(out chan<- int, in <-chan int) {
    for v := range in {
        out <- v*v
    }
    close(out)
}

func printer(in <-chan int) {
    for v := range in {
        fmt.Println(v)
    }
}

func main() {
    nums := make(chan int)
    squares := make(chan int)

    go counter(nums)
    go squarer(squares, nums)
    printer(squares)
}

打印出了:

1 4 9 16 25 36 49 64 81 100

4.5. 缓冲通道

上面通道的创建操作中,我们已经讲述过具有缓冲的通道的创建和使用。 带有缓冲区的通道可以看作是一个队列,进行先入先出操作。

4.5.1. 获取缓冲通道的缓冲区容量和已缓冲元素数

代码语言:javascript
复制
cap(ch)  // 获取缓冲通道缓冲区容量
len(ch)  // 获取缓冲通道已缓冲元素数

通过上述两个方法,我们可以实现非阻塞的通道读写操作。

5. 通道的多路复用 — select

通常,操作系统中的 IO 操作同时只能对一个 fd 执行读取或写入操作,但对于服务端程序来说,多个客户端与服务端建立连接,任何时刻任何连接都有可能有数据到来,那么如果使用传统的阻塞式 IO,我们的进程一旦阻塞等待某个连接,其他连接都将无法被处理,而入股哦使用非阻塞式 IO,那么一遍遍轮询全部连接将大大降低执行效率。 现代操作系统提供了这样问题的理想解决方案 — IO 复用模型。 IO复用 & UNIX下的五种IO模型

GoLang 中,通道的使用也存在同样的问题,那就是按照上面描述的通道的使用,一个 goroutine 同时只能与另一个 goroutine 通信,那么,如果一个 goroutine 要同时接收多个通道中数据的到来,上面的使用方式就显得力不从心了。 GoLang 中提供了与操作系统中的 IO 复用模型类似的通道多路复用模型 — select。

5.1. 使用方式

select 的使用方式与 switch 语句非常相似:

代码语言:javascript
复制
select {
case value1 <- ch1:
    // do something
case value2 <- ch2:
    // do something
}

5.2. 示例

下面的例子是基于上面计算数字平方的修改:

代码语言:javascript
复制
package main

import (
    "fmt"
    "os"
    "time"
)

func counter(out chan<- int, abort <-chan struct{}) {
    tick := time.Tick(time.Second)
    i := 1
    for {
        select {
        case <-tick:
            out <- i
            i += 1
        case <-abort:
            close(out)
            return
        }
    }
}

func squarer(out chan<- int, in <-chan int) {
    for v := range in {
        out <- v*v
    }
    close(out)
}

func printer(in <-chan int) {
    for v := range in {
        fmt.Println(v)
    }
}

func setabort(abort chan struct{}) {
    _, _ = os.Stdin.Read(make([]byte, 1)) // 等待读取单个字节
    abort <- struct{}{}
}

func main() {
    nums := make(chan int)
    squares := make(chan int)
    abort := make(chan struct{})

    fmt.Println("Calculate the square of each number per second, press any key to abort")
    go setabort(abort)
    go counter(nums, abort)
    go squarer(squares, nums)
    printer(squares)
}

我们将上面十个数的循环改成了无限循环,通过接受一个字符来向 abort 通道发送一个信号,从而实现流程的中止。 counter 函数同时从每秒生成心跳的 tick 通道和随时可能产生中止信号的 abort 通道读取数据,此时,select 多路复用就显得非常有用了。 执行程序并在适当时候输入字符 a 生成中止信号,程序打印出了:

1 4 9 a

5.3. 通过 select 实现非阻塞式通道读写

与 switch 语句一样,select 也可以加入 default 语句,如果所有的 case 条件中的通道均没有数据就绪,那么 select 语句不会阻塞等待,而是会去执行 default 语句,这就实现了通道的非阻塞式读写。 下面的例子展示了通道的非阻塞式读写:

代码语言:javascript
复制
package main

import "fmt"

func main() {
    abort := make(chan struct{})
    over := make(chan struct{})
    go func(over chan struct{}) {
        select {
        case <-abort:
            fmt.Println("Launch aborted!")
            return
        default:
            fmt.Println("No abort signal launched")
        }
        over <- struct{}{}
    }(over)
    <-over
}

执行程序,因为没有 goroutine 向这个匿名 goroutine 传递 abort 信号,所以他打印出了:

No abort signal launched

6. 参考资料

《Go 语言程序设计》。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-11-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 小脑斧科技博客 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 引言
  • 2. goroutine
    • 2.1. 示例
    • 3. 通过网络进行 goroutine 间的通信 — 标准库 net 包的使用
      • 3.1. 通过 TCP 实现 goroutine 间通信
      • 4. 通道
        • 4.1. 通道的类型
          • 4.2. 通道的创建和关闭
            • 4.2.1. 通道的创建
            • 4.2.2. 通道的关闭
          • 4.3. 通道的发送和接收
            • 4.4. 示例
              • 4.4.1. 双向通道
              • 4.4.2. 单向通道
            • 4.5. 缓冲通道
              • 4.5.1. 获取缓冲通道的缓冲区容量和已缓冲元素数
          • 5. 通道的多路复用 — select
            • 5.1. 使用方式
              • 5.2. 示例
                • 5.3. 通过 select 实现非阻塞式通道读写
                • 6. 参考资料
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档