首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何控制golang协程的并发数量问题

如何控制golang协程的并发数量问题

作者头像
公众号-利志分享
发布2022-04-25 09:19:45
2K0
发布2022-04-25 09:19:45
举报
文章被收录于专栏:利志分享利志分享

问题

最近搞压测,写了一个压测的工具,就是想通过go来实现一秒发多少个请求,然后我写了一段这样的代码,如下,当然压测的代码我就没有贴了,我贴了主要核心代码,主要是起一个定时器,然后通过但仅此去读定时器的通道,然后for循环起goroutine,goroutine里面是进行并发的逻辑。

package main

import (
  "fmt"
  "time"
)

func main() {
  if err := recover(); err != nil {
    fmt.Println(err)
  }
  tick1 := time.NewTicker(1 * time.Second)
  for {
    select {
    case <-tick1.C:
      for i := 0; i < 100000; i++ {
        go func() {
          if err := recover(); err != nil {
            fmt.Println(err)
          }
          fmt.Println("xxx")
          time.Sleep(10 * time.Second)
        }()
      }
    }
  }
  time.Sleep(2 * time.Hour)
}

上面的代码执行完了之后发现会报错:

panic: too many concurrent operations on a single file or socket (max 1048575)

goroutine 1287935 [running]:
internal/poll.(*fdMutex).rwlock(0xc0000b6060, 0xc13e5a2e00, 0x4a1527)
        /usr/local/go/src/internal/poll/fd_mutex.go:147 +0x146
internal/poll.(*FD).writeLock(...)
        /usr/local/go/src/internal/poll/fd_mutex.go:239
internal/poll.(*FD).Write(0xc0000b6060, 0xc08cd342c8, 0x4, 0x8, 0x0, 0x0, 0x0)
        /usr/local/go/src/internal/poll/fd_unix.go:254 +0x6e
os.(*File).write(...)
        /usr/local/go/src/os/file_posix.go:48
os.(*File).Write(0xc0000b4008, 0xc08cd342c8, 0x4, 0x8, 0x0, 0x0, 0x0)
        /usr/local/go/src/os/file.go:173 +0x77
fmt.Fprintln(0x4e9300, 0xc0000b4008, 0xc13e5a2fb0, 0x1, 0x1, 0x0, 0x0, 0x0)
        /usr/local/go/src/fmt/print.go:265 +0x8b
fmt.Println(...)
        /usr/local/go/src/fmt/print.go:274
main.main.func1.1()
        /root/test_go/t5.go:25 +0x96
created by main.main.func1
        /root/test_go/t5.go:21 +0x5f

关键的报错信息是:

panic: too many concurrent operations on a single file or socket (max 1048575)

对单个 file/socket的并发操作个数超过了系统上限,这个是标准输出造成的。

解决方案

1:不同的应用程序,消耗的资源是不一样的。比较推荐的方式的是:应用程序来主动限制并发的协程数量。

关于上面的问题代码我们进行优化,通过channel来控制并发数。

  • make(chan struct{}, 300) 创建缓冲区大小为 300 的 channel,在没有被接收的情况下,至多发送 300 个消息则被阻塞。
  • 开启协程前,调用 ch <- struct{}{},若缓存区满,则阻塞。
  • 协程任务结束,调用 <-ch 释放缓冲区。
package main

import (
  "fmt"
  "time"
)

func main() {
  if err := recover(); err != nil {
    fmt.Println(err)
  }
  tick1 := time.NewTicker(1 * time.Second)
  //初始化channel个数
  ch := make(chan struct{}, 300)
  for {
    select {
    case <-tick1.C:
      for i := 0; i < 100000; i++ {
        //这里往channel写数据
        ch <- struct{}{}
        go func() {
          if err := recover(); err != nil {
            fmt.Println(err)
          }
          fmt.Println("xxx")
          time.Sleep(10 * time.Second)
          //读取channel数据
          <-ch
        }()
      }
    }
  }
  time.Sleep(2 * time.Hour)
}

执行后,从日志中可以很容易看到,每秒钟只并发执行了 300 个任务,达到了协程并发控制的目的。

2:调整系统资源的上限

可以使用 ulimit -n 999999,将同时打开的文件句柄数量调整为 999999 来解决这个问题

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-07-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 利志分享 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档