前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Go 进阶训练营 – 并行编程一:Goroutine

Go 进阶训练营 – 并行编程一:Goroutine

作者头像
Yuyy
发布2022-12-05 15:36:31
4200
发布2022-12-05 15:36:31
举报

Processes and Threads

进程

操作系统会为该应用程序创建一个进程。作为一个应用程序,它像一个为所有资源而运行的容器。这些资源包括内存地址空间、文件句柄(文件描述符)、设备和线程。

不同的应用程序使用的内存空间不同,在操作系统里,就是用进程来做的资源管理、隔离。

线程

线程是操作系统调度的一种执行路径,用于在处理器执行我们在函数中编写的代码。一个进程从一个线程开始,即主线程,当该线程终止时,进程终止。这是因为主线程是应用程序的原点。

main方法不是以主线程运行

go

主线程是一个物理线程,而main函数只是作为goroutine运行的,但是main退出,其他goroutine也会退出。

Java

main方法并不是主线程运行,也就是main方法退出,JVM进程不一定退出,main里开启的子线程会继续运行。java虚拟机(相当于进程)退出的时机是:虚拟机中所有存活的线程都是守护线程。

Goroutines and Parallelism

操作系统调度线程在可用处理器上运行,Go运行时调度 goroutines 在绑定到单个操作系统线程的逻辑处理器中运行(P)。即使使用这个单一的逻辑处理器和操作系统线程,也可以调度数十万 goroutine 以惊人的效率和性能并发运行。

Go 的并发更高效

goroutine增加,只会导致少量内存增加,不会增加操作系统线程,而且goroutine的上下文切换就交给了go runtime,也就是内核态改为用户态。

Java前不久也推出类似goroutine的虚拟线程。

Concurrency is not Parallelism.

如果将运行时配置为使用多个逻辑处理器,则调度程序将在这些逻辑处理器之间分配 goroutine,这将导致 goroutine 在不同的操作系统线程上运行。但是,要获得真正的并行性,您需要在具有多个物理处理器的计算机上运行程序。否则,goroutines 将针对单个物理处理器并发运行,即使 Go 运行时使用多个逻辑处理器。

Keep yourself busy or do the work yourself

package main

import (
    "fmt"
    "log"
    "net/http"
)

func main() {
    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        fmt.Fprintln(w, "Hello, GopherCon SG")
    })
    go func() {
        if err := http.ListenAndServe(":8080", nil); err != nil {
            log.Fatal(err)
        }
    }()

    for {
    }
}

main协程异步调用阻塞函数http.ListenAndServe()后,自己啥也不干,空转CPU

要不异步调用后做其他事情,要不就自己来做

import (
  "fmt"
  "log"
  "net/http"
)

func main() {
  http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
      fmt.Fprintln(w, "Hello, GopherCon SG")
  })
  if err := http.ListenAndServe(":8080", nil); err != nil {
      log.Fatal(err)
  }
}
  • log.Fatal 内部调用 os.Exit() 会导致defer不执行,建议用panic

http.ListenAndServe() 怎么阻塞的?

    for {
        rw, err := l.Accept()
        if err != nil {
            select {
            case <-srv.getDoneChan():
                return ErrServerClosed
            default:
            }
            if ne, ok := err.(net.Error); ok && ne.Temporary() {
                if tempDelay == 0 {
                    tempDelay = 5 * time.Millisecond
                } else {
                    tempDelay *= 2
                }
                if max := 1 * time.Second; tempDelay > max {
                    tempDelay = max
                }
                srv.logf("http: Accept error: %v; retrying in %v", err, tempDelay)
                time.Sleep(tempDelay)
                continue
            }
            return err
        }
        connCtx := ctx
        if cc := srv.ConnContext; cc != nil {
            connCtx = cc(connCtx, rw)
            if connCtx == nil {
                panic("ConnContext returned nil")
            }
        }
        tempDelay = 0
        c := srv.newConn(rw)
        c.setState(c.rwc, StateNew, runHooks) // before Serve can return
        go c.serve(connCtx)
    }
  • 使用for循环不断获取请求
  • 通过新启协程委派出去,这里用协程池复用协程应该能提升性能

Leave concurrency to the caller

函数启动 goroutine,则必须向调用方提供显式停止该goroutine 的方法。例如http的ListenAndServe()Shutdown()一样。

通常,将异步执行函数的决定权交给该函数的调用方通常更容易。

读取目录信息

func ListDirectory(dir string)([]string,error)
  • 存在的问题
    • 必须读完所有数据才结束,对大型目录而言,即使已经读取到想要的数据,也得等读取完。
    • 需要将所以目录信息加载到内存,占用内存空间大。
func ListDirectory(dir string) chan string
  • 内部启动goroutine读取数据填充到chan,读取完毕关闭chan
  • 没有保存所有目录信息到内存,占用内存空间小。
  • 存在的问题
    • 调用者无法区分读取期间出现error,和空目录导致的error。
    • chan可能没有包含完整的数据,因为读取时可能发生错误。
    • 即使得到想要的数据,也无法终止读取操作。
func WalkDir(root string, fn fs.WalkDirFunc) error
type WalkDirFunc func(path string, d DirEntry, err error) error
  • 这是标准库里的实现方式
  • WalkDirFunc能感知到读取文件和目录产生的error。
  • WalkDirFunc可以返回SkipDir,让WalkDir跳过当前目录的读取。
  • WalkDirFunc返回的其他error,将导致WalkDir终止,并返回此error。

Never start a goroutine without knowning when it will stop

一个应用需要同时启动两个端口监听,一个是业务的,另一个是获取应用状态信息的。

版本一

func main() {
    mux := http.NewServeMux()
    mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
        fmt.Fprintln(resp, "Hello, QCon!")
    })
    go http.ListenAndServe("127.0.0.1:8001", http.DefaultServeMux) // debug
    http.ListenAndServe("0.0.0.0:8080", mux)                       // app traffic
}

版本二

func serveApp() {
    mux := http.NewServeMux()
    mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
        fmt.Fprintln(resp, "Hello, QCon!")
    })
    http.ListenAndServe("0.0.0.0:8080", mux)
}

func serveDebug() {
    http.ListenAndServe("127.0.0.1:8001", http.DefaultServeMux)
}

func main() {
    go serveDebug()
    serveApp()
}
  • 解耦、拆分、封装
  • 同样把并发交给调用者决定
  • 应用无法感知serveDebug的goroutine退出。我们要确保应用的必要goroutine退出时,停止应用程序。
  • serveApp退出会导致应用退出,进而由进程管理者来决定是否重启。就像函数的并发交给调用者一样,应用的重启也应该交给应用外的程序,例如K8S,Linux进程管理工具supervisor。

版本三

func serveApp() {
    mux := http.NewServeMux()
    mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
        fmt.Fprintln(resp, "Hello, QCon!")
    })
    if err := http.ListenAndServe("0.0.0.0:8080", mux); err != nil {
        log.Fatal(err)
    }
}

func serveDebug() {
    if err := http.ListenAndServe("127.0.0.1:8001", http.DefaultServeMux); err != nil {
        log.Fatal(err)
    }
}

func main() {
    go serveDebug()
    go serveApp()
    select {}
}
  • ListenAndServer返回nil(编码时注意有没有这种可能),不会终止应用
  • log.Fatal 调用 os.Exit,无条件退出进程。不会触发defer,导致无法通知其他goroutine停止、无法关闭资源等。

版本四

func serveApp() error {
    mux := http.NewServeMux()
    mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
        fmt.Fprintln(resp, "Hello, QCon!")
    })
    return http.ListenAndServe("0.0.0.0:8080", mux)
}

func serveDebug() error {
    return http.ListenAndServe("127.0.0.1:8001", http.DefaultServeMux)
}

func main() {
    done := make(chan error, 2)
    go func() {
        done <- serveDebug()
    }()
    go func() {
        done <- serveApp()
    }()

    for i := 0; i < cap(done); i++ {
        if err := <-done; err != nil {
            fmt.Println("error: %v", err)
        }
    }
}
  • 获取到goroutine退出的error
  • 某个goroutine退出,并没有通知其他goroutine退出

版本五

func serve(addr string, handler http.Handler, stop <-chan struct{}) error {
    s := http.Server{
        Addr:    addr,
        Handler: handler,
    }

    go func() {
        <-stop // wait for stop signal
        s.Shutdown(context.Background())
    }()

    return s.ListenAndServe()
}

func serveApp(stop <-chan struct{}) error {
    mux := http.NewServeMux()
    mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
        fmt.Fprintln(resp, "Hello, QCon!")
    })
    return serve("0.0.0.0:8080", mux, stop)
}

func serveDebug(stop <-chan struct{}) error {
    return serve("127.0.0.1:8001", http.DefaultServeMux, stop)
}

func main() {
    done := make(chan error, 2)
    stop := make(chan struct{})
    go func() {
        done <- serveDebug(stop)
    }()
    go func() {
        done <- serveApp(stop)
    }()

    var stopped bool
    for i := 0; i < cap(done); i++ {
        if err := <-done; err != nil {
            fmt.Println("error: %v", err)
        }
        if !stopped {
            stopped = true
            close(stop)
        }
    }
}
  • 记录goroutine退出时的error
  • 某个goroutine退出,通知其他goroutine退出,进而停止应用
  • go-workgroup

Go 平滑重启 endless

复制父进程占有的文件描述符

netListener.File会通过系统调用 dup 复制一份 file descriptor 文件描述符。返回的新文件描述符和参数 oldfd 指向同一个文件,共享所有的索性、读写指针、各项权限或标志位等。但是不共享关闭标志位,也就是说 oldfd 已经关闭了,也不影响写入新的数据到 newfd 中。

Fork 子进程

在Go语言中 exec 包为我们很好的封装好了 Fork 调用,并且使用 ExtraFiles 可以很好的继承父进程已打开的文件。

平滑重启流程

img
img
  1. 监听 SIGHUP 信号;
  2. 收到信号时 fork 子进程(使用相同的启动命令),将服务监听的 socket 文件描述符传递给子进程;
  3. 子进程监听父进程的 socket,这个时候父进程和子进程都可以接收请求;
  4. 子进程启动成功之后发送 SIGTERM 信号给父进程,父进程停止接收新的连接(调用http的shutdown),等待旧连接处理完成(或超时);
  5. 父进程退出,升级完成;

goroutine 泄露

func leak(){
    ch make(chan int)

    go func()
        val :=<-ch
        fmt.Println("We received a value:"val)
    }()
}

保证异步工作完成

func (t *Tracker)Event(data string){
    time.Sleep(time.Millisecond)
    log.Println(data)
}
  • 使用服务端埋点来跟踪记录一些事件。
// Handle represents an example handler for the web service.
func (a *App)Handle(w http.ResponseWriter,r *http.Request){
    //Do some actual work.

    //Respond to the client.
    w.WriteHeader(http.StatusCreated)

    //Fire and Hope.
    //BUG:We are not managing this goroutine.
    go a.track.Event("this event")
}
  • 无法保证创建的 goroutine 生命周期管理,会导致在服务关闭时候,有一些事件丢失。
func (t *Tracker)Event(data string){
    //Increment counter so Shutdown knows to wait for this event.
    t.wg.Add(1)

    //Track event in a goroutine so caller is not blocked.
    go func(){
        /Decrement counter to tell Shutdown this goroutine finished.
        defer t.wg.Done()
        time.Sleep(time.Millisecond)//Simulate network write latency.
        log.Println(data)
    }()

//Shutdown waits for all tracked events to finish processing.
func (t *Tracker)Shutdown(){
    t.wg.Wait()
}
func main(){
    //Start a server.
    var a App

    //Shut the server down.

    //Wait for all event goroutines to finish.
    a.track.Shutdown()
}
  • 等待所有记录事件的goroutine完成
func main(){
    //Start a server.
    var a App

    //Shut the server down.

    //Wait up to 5 seconds for all event goroutines to finish.
    const timeout 5 time.Second
    ctx,cancel context.WithTimeout(context.Background(),timeout)
    defer cancel()

    a.track.Shutdown(ctx)
}
//Shutdown waits for all tracked events to finish processing
//or for the provided context to be canceled.
func (t *Tracker)Shutdown(ctx context.Context)error
    //Create a channel to signal when the waitgroup is finished
    ch make(chan struct{})

    //Create a goroutine to wait for all other goroutines to
    //be done then close the channel to unblock the select.
    go func(){
        t.wg.Wait()
        close(ch)
    }()

    //Block this function from returning.Wait for either the
    //waitgroup to finish or the context to expire.
    select{
    case <-ch:
        return nil
    case <-ctx.Done():
        return errors.New("timeout")
    }
}
  • 增加超时控制

参考

  1. Go语言的贡献者与布道师 Dave Cheney对Go并发的建议
  2. endless 如何实现不停机重启 Go 程序?

Post Views: 6

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-12-04 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Processes and Threads
    • 进程
      • 线程
        • main方法不是以主线程运行
          • go
          • Java
      • Goroutines and Parallelism
        • Go 的并发更高效
          • Concurrency is not Parallelism.
          • Keep yourself busy or do the work yourself
            • http.ListenAndServe() 怎么阻塞的?
            • Leave concurrency to the caller
              • 读取目录信息
              • Never start a goroutine without knowning when it will stop
                • 版本一
                  • 版本二
                    • 版本三
                      • 版本四
                        • 版本五
                        • Go 平滑重启 endless
                          • 复制父进程占有的文件描述符
                            • Fork 子进程
                              • 平滑重启流程
                              • goroutine 泄露
                              • 保证异步工作完成
                              • 参考
                              相关产品与服务
                              容器服务
                              腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                              领券
                              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档