一起用golang之Go程序的套路

作者:山楂大卷 链接:https://www.jianshu.com/p/215510810c59 來源:简书

系统性地介绍golang基础的资料实在太多了,这里不再一一赘述。本文的思路是从另一个角度来由浅入深地探究下Go程序的套路。毕竟纸上得来终觉浅,所以,能动手就不要动口。有时候几天不写代码,突然间有一天投入进来做个东西,才恍然发觉,也只有敲代码的时候,才能找回迷失的自己,那可以忘掉一切的不开心。

Hello world

1package main
2
3import (
4    "fmt"
5)
6
7func main() {
8    fmt.Println("hello world")
9}

go程序结构从整体上来说就是这样的,第一行看起来这一定就是包头声明了,程序以包为单位,一个文件夹是一个包,一个包下可能有多个文件,但是包名都是同一个。相对C/C++程序的include来说,这里是import,后面跟的就是别的包名,一个包里定义的变量或类型,本包内都可见,若首字母大写,则可以被导出。如果引入了程序里不使用的包,编译会报错,报错,错。声明不使用的变量也一样,对,会报错。这里行尾没有分号,左大括号必须那样放,缩进也不用你操心等等,编码风格中的很多问题在这里都不再是问题,是的,go fmt帮你都搞定了,所以你看绝大部分go程序风格都好接近的。写一段时间代码后,你会发现,这种风格确实简单,干净利落。

本文重点

通过一些概念的学习和介绍,设计并实现个线程池,相信很多地方都可能用到这种模型或各种变形。

变量

变量的声明、定义、赋值、指针等不想啰嗦了,去别的地方学吧。

结构体

我们先来定义一个结构体吧

1package package1
2
3type User struct {
4    Name string
5    addr int
6    age  int
7}

你一定注意到了,Name首字母是大写的,在package2包中,import package1后就可以通过user.Name访问Name成员了,Name是被导出的。但addr和age在package2中就不能直接访问了,这俩没有被导出,只能在package1包中被直接访问,也就是私有的。那如何在package2中获取没有被导出的成员呢?我们来看下方法。

方法

1func (u User) GetAge() string {
2    return u.age
3}
4
5func(u *User) SetAge(age int){
6    u.age = age
7}

方法的使用和C++或者Java都很像的。下面代码段中user的类型是*User,你会发现,无论方法的接收者是对象还是指针,方法调用时都只用.,而代表指针的->已经不在了。

1user := &User{
2        Name: name,
3        addr: addr,
4        age:  age,
5}
6user.SetAge(100)
7fmt.Println(user.GetAge())

还有常用的构造对象的方式是这样的

1func NewUser(name string, addr string, age int) *User {
2    return &User{
3        Name: name,
4        addr: addr,
5        age:  age,
6    }
7}
1  user := new(User)
2    user := &User{}//与前者等价
3    user := User{}

组合与嵌套

Go中没有继承,没有了多态,也没有了模板。争论已久的继承与组合问题,在这里也不是问题了,因为已经没得选择了。比如我想实现个线程安全的整型(假设只用++和--),可能这么来做

 1type safepending struct {
 2    pending int
 3    mutex   sync.RWMutex
 4}
 5
 6func (s *safepending) Inc() {
 7    s.mutex.Lock()
 8    s.pending++
 9    s.mutex.Unlock()
10}
11
12func (s *safepending) Dec() {
13    s.mutex.Lock()
14    s.pending--
15    s.mutex.Unlock()
16}
17
18func (s *safepending) Get() int {
19    s.mutex.RLock()
20    n := s.pending
21    s.mutex.RUnlock()
22    return n
23}

也可以用嵌套写法

 1type safepending struct {
 2    pending int
 3    *sync.RWMutex
 4}
 5
 6func (s *safepending) Inc() {
 7    s.Lock()
 8    s.pending++
 9    s.Unlock()
10}
11
12func (s *safepending) Dec() {
13    s.Lock()
14    s.pending--
15    s.Unlock()
16}
17
18func (s *safepending) Get() int {
19    s.RLock()
20    n := s.pending
21    s.RUnlock()
22    return n
23}

这样safepending类型将直接拥有sync.RWMutex类型中的所有属性,好方便的写法。

interface

一个interface类型就是一个方法集,如果其他类型实现了interface类型中所有的接口,那我们就可以说这个类型实现了interface类型。举个例子:空接口interface{}包含的方法集是空,也就可以说任何类型都实现了它,也就是说interface{}可以代表任何类型,类型直接的转换看下边的例子吧。

实现一个小顶堆

首先定义一个worker结构体, worker对象中存放很多待处理的request,pinding代表待处理的request数量,以worker为元素,实现一个小顶堆,每次Pop操作都返回负载最低的一个worker。 golang标准库中提供了heap结构的容器,我们仅需要实现几个方法,就可以实现一个堆类型的数据结构了,使用时只需要调用标准库中提供的Init初始化接口、Pop接口、Push接口,就可以得到我们想要的结果。我们要实现的方法有Len、Less、Swap、Push、Pop,请看下边具体代码。另外值得一提的是,山楂君也是通过标准库中提供的例子学习到的这个知识点。

 1type Request struct {
 2    fn    func() int
 3    data  []byte
 4    op    int
 5    c     chan int
 6}
 7
 8type Worker struct {
 9    req     chan Request
10    pending int
11    index   int
12    done    chan struct{}
13}
14
15type Pool []*Worker
16
17func (p Pool) Len() int {
18    return len(p)
19}
20func (p Pool) Less(i, j int) bool {
21    return p[i].pending < p[j].pending
22}
23
24func (p Pool) Swap(i, j int) {
25    p[i], p[j] = p[j], p[i]
26    p[i].index = i
27    p[j].index = j
28}
29
30func (p *Pool) Push(x interface{}) {
31    n := len(*p)
32    item := x.(*Worker)
33    item.index = n
34    *p = append(*p, item)
35}
36
37func (p *Pool) Pop() interface{} {
38    old := *p
39    n := len(*p)
40    item := old[n-1]
41    //item.index = -1
42    *p = old[:n-1]
43    return item
44}

pool的使用

 1package main
 2
 3import (
 4    "container/heap"
 5    "log"
 6    "math/rand"
 7)
 8
 9var (
10    MaxWorks = 10000
11    MaxQueue = 1000
12)
13
14func main() {
15    pool := new(Pool)
16    for i := 0; i < 4; i++ {
17        work := &Worker{
18            req:     make(chan Request, MaxQueue),
19            pending: rand.Intn(100),
20            index:   i,
21        }
22        log.Println("pengding", work.pending, "i", i)
23        heap.Push(pool, work)
24    }
25
26    heap.Init(pool)
27    log.Println("init heap success")
28    work := &Worker{
29        req:     make(chan Request, MaxQueue),
30        pending: 50,
31        index:   4,
32    }
33    heap.Push(pool, work)
34    log.Println("Push worker: pending", work.pending)
35    for pool.Len() > 0 {
36        worker := heap.Pop(pool).(*Worker)
37        log.Println("Pop worker:index", worker.index, "pending", worker.pending)
38    }
39}

程序的运行结果如下,可以看到每次Pop的结果都返回一个pending值最小的一个work元素。

 12017/03/11 12:46:59 pengding 81 i 0
 22017/03/11 12:46:59 pengding 87 i 1
 32017/03/11 12:46:59 pengding 47 i 2
 42017/03/11 12:46:59 pengding 59 i 3
 52017/03/11 12:46:59 init heap success
 62017/03/11 12:46:59 Push worker: pending 50
 72017/03/11 12:46:59 Pop worker:index 4 pending 47
 82017/03/11 12:46:59 Pop worker:index 3 pending 50
 92017/03/11 12:46:59 Pop worker:index 2 pending 59
102017/03/11 12:46:59 Pop worker:index 1 pending 81
112017/03/11 12:46:59 Pop worker:index 0 pending 87

细心的你肯能会发现,不是work么,怎么没有goroutine去跑任务?是的山楂君这里仅是演示了小顶堆的构建与使用,至于如何用goroutine去跑任务,自己先思考一下吧。 其实加上类似于下边这样的代码就可以了

 1func (w *Worker) Stop() {
 2    w.done <- struct{}{}
 3}
 4
 5func (w *Worker) Run() {
 6    go func() {
 7        for {
 8            select {
 9            case req := <-w.req:
10                req.c <- req.fn()
11            case <-w.done:
12                break
13            }
14        }
15    }()
16}

golang的并发

golang中的并发机制很简单,掌握好goroutine、channel以及某些程序设计套路,就能用的很好。当然,并发程序设计中存在的一切问题与语言无关,只是每种语言中基础设施对此支持的程度不一,Go程序中同样都要小心。

goroutine

官方对goroutine的描述:

They're called goroutines because the existing terms—threads, coroutines, processes, and so on—convey inaccurate connotations. A goroutine has a simple model: it is a function executing concurrently with other goroutines in the same address space. It is lightweight, costing little more than the allocation of stack space. And the stacks start small, so they are cheap, and grow by allocating (and freeing) heap storage as required. Goroutines are multiplexed onto multiple OS threads so if one should block, such as while waiting for I/O, others continue to run. Their design hides many of the complexities of thread creation and management. Prefix a function or method call with the go keyword to run the call in a new goroutine. When the call completes, the goroutine exits, silently. (The effect is similar to the Unix shell's & notation for running a command in the background.)

启动一个goroutine,用法很简单:

1go DoSomething()

channel

看channel的描述:

A channel provides a mechanism for concurrently executing functions to communicate by sending and receiving values of a specified element type. The value of an uninitialized channel is nil.

简而言之,就是提供了goroutine之间的同步与通信机制。

共享内存?OR 通信?

Don't communicate by sharing memory; share memory by communicating

这就是Go程序中很重要的一种程序套路。拿一个具体的小应用场景来说吧:一个Map类型的数据结构,其增删改查操作可能在多个线程中进行,我们会用什么样的方案来实现呢?

  1. 增删改查操作时加锁
  2. 实现一个线程安全的Map类型
  3. 增删改查操作限定在线程T中,其他线程如果想进行增删改操作,统一发消息给线程T,由线程T来进行增删操作(假设其他线程没有Map的查询操作)

对于方案3其实就是对Go程序这种套路的小应用,这种思想当然和语言无关,但是在Go语言中通过“通信”来共享内存的思路非常容易实现,有原生支持的goroutine、channel、select、gc等基础设施,也许你会有"大消息"传递场景下的性能顾虑,但channel是支持引用类型的传递的,且会自动帮你进行垃圾回收,一个大结构体的引用类型实际上可能才占用了十几个字节的空间。这实在是省去了山楂君很多的功夫。看Go程序的具体做法:

 1type job struct {
 2    // something
 3}
 4
 5type jobPair struct {
 6    key   string
 7    value *job
 8}
 9
10type worker struct {
11    jobqueue map[string]*job // key:UserName
12    jobadd   chan *jobPair
13}
14
15// 并不是真正的map insert操作,仅发消息给另外一个线程
16func (w *worker) PushJob(user string, job *job) {
17    pair := &jobPair{
18        key:   user,
19        value: job,
20    }
21    w.jobadd <- pair
22}
23
24// 并不是真正的map delete操作,仅发消息给另外一个线程
25func (w *worker) RemoveJob(user string) {
26    w.jobdel <- user
27}
28
29func (w *worker) Run() {
30    go func() {
31        for {
32            select {
33            case jobpair := <-w.jobadd:
34                w.insertJob(jobpair.key, jobpair.value)
35            case delkey := <-w.jobdel:
36                w.deleteJob(delkey)
37            //case other channel
38            //  for _, job := range w.jobqueue {
39                    // do something use job
40            //      log.Println(job)
41            //  }
42            }
43        }
44    }()
45}
46func (w *worker) insertJob(key string, value *job) error {
47    w.jobqueue[key] = value
48    w.pending.Inc()
49    return nil
50}
51
52func (w *worker) deleteJob(key string) {
53    delete(w.jobqueue, key)
54    w.pending.Dec()
55}

线程池

模型详见下边流程图

线程池模型.png

由具体业务的生产者线程生成一个个不同的job,通过共同的Balance均衡器,将job分配到不同的worker去处理,每个worker占用一个goroutine。在job数量巨多的场景下,这种模型要远远优于一个job占用一个goroutine的模型。并且可以根据不同的业务特点以及硬件配置,配置不同的worker数量以及每个worker可以处理的job数量。

我们可以先定义个job结构体,根据业务不同,里边会包含不同的属性。

 1type job struct {
 2    conn     net.Conn
 3    opcode   int
 4    data     []byte
 5    result   chan ResultType //可能需要返回处理结果给其他channel
 6}
 7type jobPair struct {
 8    key   string
 9    value *job
10}

然后看下worker定义

 1type worker struct {
 2    jobqueue  map[string]*job // key:UserName
 3    broadcast chan DataType
 4    jobadd    chan *jobPair
 5    jobdel    chan string
 6    pending   safepending
 7    index     int
 8    done      chan struct{}
 9}
10
11func NewWorker(idx int, queue_limit int, source_limit int, jobreq_limit int) *worker {
12    return &worker{
13        jobqueue:  make(map[string]*job, queue_limit),
14        broadcast: make(chan DataType, source_limit), //4家交易所
15        jobadd:    make(chan jobPair, jobreq_limit),
16        jobdel:    make(chan string, jobreq_limit),
17        pending:   safepending{0, sync.RWMutex{}},
18        index:     idx,
19        done:      make(chan struct{}),
20    }
21}
22
23func (w *worker) PushJob(user string, job *job) {
24    pair := jobPair{
25        key:   user,
26        value: job,
27    }
28    w.jobadd <- pair
29}
30
31func (w *worker) RemoveJob(user string) {
32    w.jobdel <- user
33}
34
35func (w *worker) Run(wg *sync.WaitGroup) {
36    wg.Add(1)
37    go func() {
38        log.Println("new goroutine, worker index:", w.index)
39        defer wg.Done()
40        ticker := time.NewTicker(time.Second * 60)
41        for {
42            select {
43            case data := <-w.broadcast:
44                for _, job := range w.jobqueue {
45                    log.Println(job, data)
46                }
47            case jobpair := <-w.jobadd:
48                w.insertJob(jobpair.key, jobpair.value)
49            case delkey := <-w.jobdel:
50                w.deleteJob(delkey)
51            case <-ticker.C:
52                w.loadInfo()
53            case <-w.done:
54                log.Println("worker", w.index, "exit")
55                break
56            }
57        }
58    }()
59}
60
61func (w *worker) Stop() {
62    go func() {
63        w.done <- struct{}{}
64    }()
65}
66func (w *worker) insertJob(key string, value *job) error {
67    w.jobqueue[key] = value
68    w.pending.Inc()
69    return nil
70}
71
72func (w *worker) deleteJob(key string) {
73    delete(w.jobqueue, key)
74    w.pending.Dec()
75}

结合上边提到的小顶堆的实现,我们就可以实现一个带负载均衡的线程池了。 一种模式并不能应用于所有的业务场景,山楂君觉得重要的是针对不同的业务场景去设计或优化编程模型的能力,以上有不妥之处,欢迎吐槽或指正,喜欢也可以打赏。

参考文献

  1. https://blog.golang.org/share-memory-by-communicating
  2. https://golang.org/doc/effective_go.html
  3. http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/

版权申明:内容来源网络,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢。

Golang语言社区

ID:Golangweb

www.bytedancing.com

游戏服务器架构丨分布式技术丨大数据丨游戏算法学习

原文发布于微信公众号 - Golang语言社区(Golangweb)

原文发表时间:2018-09-05

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏机器之心

资源 | 简单快捷的数据处理,数据科学需要注意的命令行

16950
来自专栏Golang语言社区

Go语言是彻底的面向组合的并发语言

面向组合编程从AOP的Mixin,然后到Ruby的Traits,直至DCI设计,包括Scala的trait的组合设计,这些都有一个共同特点,组合特性是显式的,也...

34860
来自专栏量化投资与机器学习

【干货】Matlab的内存问题讨论

谢谢大家支持,可以让有兴趣的人关注这个公众号。让知识传播的更加富有活力,谢谢各位读者。 很多人问我为什么每次的头像是奥黛丽赫本,我只能说她是我女神,每天看看女神...

23880
来自专栏有趣的django

PYTHON面试

大部分的面试问题,有最近要找事的老铁吗?  python语法以及其他基础部分 可变与不可变类型;  浅拷贝与深拷贝的实现方式、区别;deepcopy如果你来...

77070
来自专栏海天一树

NOIP 2010普及组初赛C/C++答案详解

1 D 2E + 03 = 2 * 103 = 2000 2E - 03 = 2 * 1 / (2 * 103) = 2 * 0.001 = 0.002

20210
来自专栏从流域到海域

《笨办法学Python》 第1课手记

《笨办法学Python》第1课手记 在powershell中打开Python输入如下代码: print "Hello World!" print "Hello...

24770
来自专栏喔家ArchiSelf

全栈Python 编程必备

Python作为一种编程语言,被称为“胶水语言”,更被拥趸们誉为“最美丽”的编程语言,从云端到客户端,再到物联网终端,无所不在,同时还是人工智能优选的编程语言。

58150
来自专栏数据小魔方

Python可视化笔记之folium交互地图

leftlet给R语言提供了很好用的交互式动态地图接口,其在Python中得API接口包名为folium(不知道包作者为何这样起名字,和leaflet已经扯不上...

64840
来自专栏java架构师

设计模式学习笔记之组合模式模式

我们常常会遇到一类具有“容器”特征的对象,他们既是容器,本身也是对象。比如,公司人员管理中的人,他们是处于不同层级,每个层的人下边,又有下属。也就是数的结构。 ...

30460
来自专栏铭毅天下

干货 | Elasticsearch Nested类型深入详解

本文通过一个例子将Nested类型适合解决的问题、应用场景、使用方法串起来, 文中所有的DSL都在Elasticsearch6.X+验证通过。

41230

扫码关注云+社区

领取腾讯云代金券