专栏首页吴亲强的深夜食堂使用 Go 每分钟处理百万请求
原创

使用 Go 每分钟处理百万请求

微信公众号:吴亲强的深夜食堂

介绍

偶然间看到一篇写于15年的文章,说实话,标题确实吸引了我,不过看了几遍之后,确实精彩。 关于这篇文章,我就不直接翻译了。 项目的需求就是 客户端发送请求,服务端接收请求处理数据(原文是把资源上传至 Amazon S3 资源中)。本质上就是这样,

我稍微改动了原文的业务代码,但是并不影响核心模块。在第一版中,每收到一个 Request,开启一个 goroutine 进行处理,快速响应,很常规的操作。

代码如下

初版

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

type Payload struct {
	// 传啥不重要
}

func (p *Payload) UpdateToS3() error {
	//存储逻辑,模拟操作耗时
	time.Sleep(500 * time.Millisecond)
	fmt.Println("上传成功")
	return nil
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
	// 业务过滤
	// 请求body解析......
	var p Payload
	go p.UpdateToS3()
	w.Write([]byte("操作成功"))
}

func main() {
	http.HandleFunc("/payload", payloadHandler)
	log.Fatal(http.ListenAndServe(":8099", nil))
}

这样操作存在什么问题呢?一般情况下,没什么问题。但是如果是高并发的场景下,不对 goroutine数进行控制,你的 CPU 使用率暴涨,内存占用暴涨,直至程序奔溃。

如果此操作落地至数据库,例如 mysql,那么相应的,你数据库的服务器磁盘IO、网络带宽 、CPU负载、内存消耗都会达到非常高的情况,一并奔溃。所以,一旦程序中出现不可控的事物,往往是危险的信号。

中版

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

const MaxQueue = 400

var Queue chan Payload

func init() {
	Queue = make(chan Payload, MaxQueue)
}

type Payload struct {
	// 传啥不重要
}

func (p *Payload) UpdateToS3() error {
	//存储逻辑,模拟操作耗时
	time.Sleep(500 * time.Millisecond)
	fmt.Println("上传成功")
	return nil
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
	// 业务过滤
	// 请求body解析......
	var p Payload
	//go p.UpdateToS3()
	Queue <- p
	w.Write([]byte("操作成功"))
}

// 处理任务
func StartProcessor() {
	for {
		select {
		case payload := <-Queue:
			payload.UpdateToS3()
		}
	}
}

func main() {
	http.HandleFunc("/payload", payloadHandler)
	//单独开一个g接收与处理任务
	go StartProcessor()
	log.Fatal(http.ListenAndServe(":8099", nil))
}

这一版借助带 bufferedchannel 来完成这个功能,这样控制住了无限制的 goroutine,但是依然没有解决问题。

处理请求是一个同步的操作,每次只会处理一个任务,然而高并发下请求进来的速度远远超过了处理的速度。这种情况,一旦 channel 满了之后, 后续的请求将会被阻塞等地啊。然后你会发现,响应的时间会大幅度的开始增加, 甚至不再有任何的响应。

终版

package main

import (
"fmt"
"log"
"net/http"
"time"
)

const (
	MaxWorker = 100 //随便设置值
	MaxQueue  = 200 // 随便设置值
)

// 一个可以发送工作请求的缓冲 channel
var JobQueue chan Job

func init() {
	JobQueue = make(chan Job, MaxQueue)
}

type Payload struct{}

type Job struct {
	PayLoad Payload
}

type Worker struct {
	WorkerPool chan chan Job
	JobChannel chan Job
	quit       chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
	return Worker{
		WorkerPool: workerPool,
		JobChannel: make(chan Job),
		quit:       make(chan bool),
	}
}

// Start 方法开启一个 worker 循环,监听退出 channel,可按需停止这个循环
func (w Worker) Start() {
	go func() {
		for {
			// 将当前的 worker 注册到 worker 队列中
			w.WorkerPool <- w.JobChannel
			select {
			case job := <-w.JobChannel:
				// 	真正业务的地方
				//	模拟操作耗时
				time.Sleep(500 * time.Millisecond)
				fmt.Printf("上传成功:%v\n", job)
			case <-w.quit:
				return
			}
		}
	}()
}

func (w Worker) stop() {
	go func() {
		w.quit <- true
	}()
}

// 初始化操作

type Dispatcher struct {
	// 注册到 dispatcher 的 worker channel 池
	WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
	pool := make(chan chan Job, maxWorkers)
	return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
	// 开始运行 n 个 worker
	for i := 0; i < MaxWorker; i++ {
		worker := NewWorker(d.WorkerPool)
		worker.Start()
	}
	go d.dispatch()
}

func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			go func(job Job) {
				// 尝试获取一个可用的 worker job channel,阻塞直到有可用的 worker
				jobChannel := <-d.WorkerPool
				// 分发任务到 worker job channel 中
				jobChannel <- job
			}(job)
		}
	}
}

// 接收请求,把任务筛入JobQueue。
func payloadHandler(w http.ResponseWriter, r *http.Request) {
	work := Job{PayLoad: Payload{}}
	JobQueue <- work
	_, _ = w.Write([]byte("操作成功"))
}

func main() {
	// 通过调度器创建worker,监听来自 JobQueue的任务
	d := NewDispatcher(MaxWorker)
	d.Run()
	http.HandleFunc("/payload", payloadHandler)
	log.Fatal(http.ListenAndServe(":8099", nil))
}

最终采用的是两级 channel,一级是将用户请求数据放入到 chan Job 中,这个 channel job 相当于待处理的任务队列。

另一级用来存放可以处理任务的 work 缓存队列,类型为 chan chan Job。调度器把待处理的任务放入一个空闲的缓存队列当中,work 会一直处理它的缓存队列。通过这种方式,实现了一个 worker 池。大致画了一个图帮助理解

首先我们在接收到一个请求后,创建 Job 任务,把它放入到任务队列中等待 work 池处理。

func payloadHandler(w http.ResponseWriter, r *http.Request) {
	work := Job{PayLoad: Payload{}}
	JobQueue <- work
	_, _ = w.Write([]byte("操作成功"))
}

调度器初始化 work 池后,在 dispatch 中,一旦我们接收到 JobQueue 的任务,就去尝试获取一个可用的 worker,分发任务给 workerjob channel 中。 注意这个过程不是同步的,而是每接收到一个 job,就开启一个 G 去处理。这样可以保证 JobQueue 不需要进行阻塞,对应的往 JobQueue 理论上也不需要阻塞地写入任务。

func (d *Dispatcher) Run() {
	// 开始运行 n 个 worker
	for i := 0; i < MaxWorker; i++ {
		worker := NewWorker(d.WorkerPool)
		worker.Start()
	}
	go d.dispatch()
}

func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			go func(job Job) {
				// 尝试获取一个可用的 worker job channel,阻塞直到有可用的 worker
				jobChannel := <-d.WorkerPool
				// 分发任务到 worker job channel 中
				jobChannel <- job
			}(job)
		}
	}
}

这里"不可控"的 G 和上面还是又所不同的。仅仅极短时间内处于阻塞读 Chan 状态, 当有空闲的 worker 被唤醒,然后分发任务,整个生命周期远远短于上面的操作。

最后,强烈建议看一下原文,原文地址在:http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 使用ClickHouse对每秒6百万次请求进行HTTP分析

    我们在Cloudflare的一个大规模数据基础架构挑战是为我们的客户提供HTTP流量分析。我们所有客户都可以通过两种方式使用HTTP分析:

    后场技术
  • 用 Python 实现每秒处理 120 万次 HTTP 请求

    用 Python 做到每秒处理上百万次 HTTP 请求,可能吗?也许不能,但直到最近,这已成为现实。

    IT派
  • 用 Python 实现每秒处理 120 万次 HTTP 请求

    学Python最简单的方法是什么?推荐阅读:30万年薪Python开发工程师成长魔法 用 Python 做到每秒处理上百万次 HTTP 请求,可能吗?也许不能,...

    小小科
  • 用Python实现每秒处理120万次HTTP请求

    [link]https://github.com/squeaky-pl/japronto

    双面人
  • go实现的压测工具【单台机器100w连接压测实战】

    本文介绍压测是什么,解释压测的专属名词,教大家如何压测。介绍市面上的常见压测工具(ab、locust、Jmeter、go实现的压测工具、云压测),对比这些压测工...

    link1st
  • Go 语言构建高并发分布式系统实践

    你知道互联网最抢手的技术人才有哪些吗?最新互联网职场生态报告显示,最抢手的十大互联网技术人才排名中Go语言开发人员位居第三,从中不难见得,Go语言的渗透率越来越...

    李海彬
  • Go 语言构建高并发分布式系统实践

    你知道互联网最抢手的技术人才有哪些吗?最新互联网职场生态报告显示,最抢手的十大互联网技术人才排名中Go语言开发人员位居第三,从中不难见得,Go语言的渗透率越来越...

    李海彬
  • Go 语言构建高并发分布式系统实践

    你知道互联网最抢手的技术人才有哪些吗?最新互联网职场生态报告显示,最抢手的十大互联网技术人才排名中Go语言开发人员位居第三,从中不难见得,Go语言的渗透率越来越...

    李海彬
  • Go 自带的 http/server.go 的连接解析 与 如何结合 master-worker 并发模式,提高单机并发能力

    林冠宏-指尖下的幽灵
  • Go 语言构建高并发分布式系统实践

    你知道互联网最抢手的技术人才有哪些吗?最新互联网职场生态报告显示,最抢手的十大互联网技术人才排名中Go语言开发人员位居第三,从中不难见得,Go语言的渗透率越来越...

    李海彬
  • 用 Go 重构 C 语言系统,这个抗住春晚红包的百度转发引擎承接了万亿流量

    11 月 20 日,百度的万亿流量转发引擎 BFE 登上了 GitHub Trending Top 3,今日 Star 已突破 270。事实上,这个曾经抗住 2...

    诸葛青云
  • 百度万亿流量转发引擎BFE VS Nginx

    BFE(Baidu Front End,百度统一前端)是百度的统一七层流量转发平台。BFE平台目前已接入百度大部分流量,每日转发请求接近1万亿,峰值QPS超过1...

    后端技术探索
  • 百万 Go TCP 连接的思考2: 百万连接的吞吐率和延迟

    上一篇epoll方式减少资源占用 介绍了测试环境以及epoll方式实现百万连接的TCP服务器。这篇文章介绍百万连接服务器的几种实现方式,以及它们的吞吐率和延迟。

    李海彬
  • 百度开源BFE被CNCF接纳为Sandbox Project

    2020年6月24日,BFE[1]开源项目被CNCF[2] (Cloud Native Computing Foundation,云原生计算基金会)正式接纳为S...

    公众号: 云原生生态圈
  • Go语言构建千万级在线的高并发消息推送系统实践

    用户1263954
  • 每秒处理1000万用户请求…云上架构如何实现高性能和高可用

    IT大咖说
  • 转型之后——流量洪峰中如何设计弹性微服务架构 | Techo大会精彩回顾第四期

    ? 全文共4142字,阅读需要8分钟 ? 导读 刘冠军《万象伊始——集中式架构为何演进到微服务架构》 秦金卫《转型求通——微服务架构的最佳实践和发展趋势》 曹...

    腾讯云中间件团队
  • Go程序GC优化经验分享

    最近一段时间对《仙侠道》的服务端进行了一系列针对GC的调优,这里跟各位分享一下调优的经验。

    李海彬
  • Go 的垃圾回收机制在实践中有哪些需要注意的地方?

    之前回答问题的时候Go还处在1.1版本,到了1.2和1.3,Go的GC跟踪命令和GC内部实现已经有一些变化,并且根据评论中的反馈,这边一并做补充说明。 Go ...

    李海彬

扫码关注云+社区

领取腾讯云代金券