前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Go 模式

Go 模式

原创
作者头像
王磊-字节跳动
修改2021-08-21 21:34:47
1.2K0
修改2021-08-21 21:34:47
举报
文章被收录于专栏:01ZOO01ZOO

最近有个项目 go-patterns 挺火的,目前已经 16.9k star 了(虽然是项目还是不太完善/健全的状态)。本文也参考go-patterns 列举几个个人认为比较重要的 go patterns, 这里的 pattern 并不是设计模式, 更多是是广义的在 golang 中的一些有效的设计和开发模式.

观察者模式

这是一种很常见的模式,但是在 golang 中,这种模式能够提供更多有用/高级的选项。比如 我们可以定义三种消费者:第一种,生产者生产的消息会阻塞,等所有消费者都消费完,第二种,生产者不等消费者,生产完消息就返回,消费者异步消费;第三种,消费者并行消费,生产者等所有消费者都消费完再返回。

代码语言:txt
复制
模式1: [生产] --> [消费1] --> [消费2] --> [生产返回]
模式1: [生产] --> [生产返回]
           --> [消费1] 
           --> [消费2] 

模式1: [生产] -->[消费1] --> [生产返回]
            --> [消费2] 

核心代码如下:

代码语言:txt
复制
type EventType string

type Event struct {
	EventType EventType
	Data      interface{}
}

type Subscriber interface {
	// 处理 Event
	HandleEvent(ctx context.Context, event *Event) error
}

type EventBus interface {
	Subscribe(topic EventType, s Subscriber) 	// Subscribe 注册会阻塞 Publish 方法, Subscriber 会依次执行
	SubscribeAsync(topic EventType, s Subscriber)	// SubscribeAsync 注册不会阻塞 Publish 方法, Subscriber 会并发执行
	SubscribeAsyncWait(topic EventType, s Subscriber)	// SubscribeAsyncWait 注册会阻塞 Publish 方法, Subscriber 会并发执行, Publish 会在所有 Subscriber 执行完成之后返回
	Unsubscribe(topic EventType, s Subscriber) // Unsubscribe 取消注册
	Publish(ctx context.Context, e *Event) error	// 发送消息
}

type executeMode string
var (
	executeModeSync      executeMode = "sync"
	executeModeAsync     executeMode = "async"
	executeModeAsyncWait executeMode = "async_wait"
)

// EventBusImp
type EventBusImp struct {
	syncHandlers      map[EventType][]*eventHandler
	asyncHandlers     map[EventType][]*eventHandler
	asyncWaitHandlers map[EventType][]*eventHandler
	lock              sync.Mutex     // a lock for the map
	wait              sync.WaitGroup // shared wait for async
}

type eventHandler struct {
	name string
	s    Subscriber
}

// New returns new EventBus with empty handlers.
func New() *EventBusImp {
	b := &EventBusImp{
		make(map[EventType][]*eventHandler),
		make(map[EventType][]*eventHandler),
		make(map[EventType][]*eventHandler),
		sync.Mutex{},
		sync.WaitGroup{},
		&defaultExecutor{},
	}
	return b
}

// Subscribe 注册会阻塞 Publish 方法, Subscriber 会依次执行
func (bus *EventBusImp) Subscribe(eventType EventType, s Subscriber) {
	bus.subscribe(eventType, executeModeSync, s)
}

// SubscribeAsync 注册不会阻塞 Publish 方法, Subscriber 会并发执行
func (bus *EventBusImp) SubscribeAsync(eventType EventType, s Subscriber) {
	bus.subscribe(eventType, executeModeAsync, s)
}

// SubscribeAsyncWait 注册会阻塞 Publish 方法, Subscriber 会并发执行, Publish 会在所有 Subscriber 执行完成之后返回
func (bus *EventBusImp) SubscribeAsyncWait(eventType EventType, s Subscriber) {
	bus.subscribe(eventType, executeModeAsyncWait, s)
}

func (bus *EventBusImp) subscribe(eventType EventType, mode executeMode, s Subscriber) {
	bus.lock.Lock()
	defer bus.lock.Unlock()
	h := &eventHandler{name: subscriberName(s), s: s}
	switch mode {
	case executeModeAsync:
		bus.asyncHandlers[eventType] = append(bus.asyncHandlers[eventType], h)
	case executeModeAsyncWait:
		bus.asyncWaitHandlers[eventType] = append(bus.asyncWaitHandlers[eventType], h)
	default:
		bus.syncHandlers[eventType] = append(bus.syncHandlers[eventType], h)
	}
}

func (bus *EventBusImp) Publish(ctx context.Context, e *Event) error {
	var errList []error
	handlers := bus.syncHandlers
	if l, ok := handlers[e.EventType]; ok {
		for index := range l {
			errList = append(errList, bus.invoke(l[index], ctx, e, executeModeSync))
		}
	}

	handlers = bus.asyncHandlers
	if l, ok := handlers[e.EventType]; ok {
		for index := range l {
			innerIndex := index
			bus.wait.Add(1)
			err := go func() error {
				defer bus.wait.Done()
				return bus.invoke(l[innerIndex], context.Background(), e, executeModeAsync)
			}()
			errList = append(errList, err)
		}
	}

	handlers = bus.asyncWaitHandlers
	if l, ok := handlers[e.EventType]; ok && len(l) > 0 {
		waitG := sync.WaitGroup{}
		var err1List = make([]error, len(l))
		for index := range l {
			innerIndex := index
			waitG.Add(1)
			err := go func() error {
					defer waitG.Done()
					err1 := bus.invoke(l[innerIndex], ctx, e, executeModeAsyncWait)
					err1List[innerIndex] = err1
					return err1
				}()
			err1List[index] = err
		}
		waitG.Wait()
		errList = append(errList, err1List...)
	}

	return errors.NewAggregate(errList)
}

基于生产者消费者模式的并发

这种模式是 golang 中的常见并发模式,完成这样的目标:有一组任务,给定指定的并发度,把这批任务完成并返回。

下面是这个模式的核心实现,注意使用的时候,甚至不需要加锁。

代码语言:txt
复制
type DoWorkPieceFunc func(piece int)
func Parallelize(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) {
	var stop <-chan struct{}
	if ctx != nil {
		stop = ctx.Done()
	}

	toProcess := make(chan int, pieces)
	for i := 0; i < pieces; i++ {
		toProcess <- i
	}
	close(toProcess)

	if pieces < workers {
		workers = pieces
	}

	wg := sync.WaitGroup{}
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer utilruntime.HandleCrash()
			defer wg.Done()
			for piece := range toProcess {
				select {
				case <-stop:
					return
				default:
					doWorkPiece(piece)
				}
			}
		}()
	}
	wg.Wait()
}


# 使用无须加锁
var tasks []task
var results = make(result, len(tasks)
Parallelize(context.Background(), 5, len(tasks), func(piece int){
    results[piece] = do_some_work_with_task(tasks[piece])
})

构建模式

比如我们有一个复杂的可配置的 client,他有很多可选的参数,那我们在设计 client 的 新建方法的时候通常有有两种方式,一种是 Options 模式,如下:

代码语言:txt
复制
type Options struct {
	A         int
	B         int
	Flags       int
}

type Option func(*Options)

func OptionA(a int) Option {
	return func(args *Options) {
		args.A = a
	}
}

func OptionB(b int) Option {
	return func(args *Options) {
		args.B = b
	}
}

type Client struct{
    o *Options
}

func NewClient(options...Option) *Client{
    c := &Client{}
    for _, option := range options{
        option(c.o)
    }
    return c
}

# 使用
client := NewClient(OptionA(1), OptionB(2)

另一种模式则是 Builder 模式,使用链式的方法构建 client,这里不必有 build 方法

代码语言:txt
复制
func NewClient() *Client{
    return &Client{}
}

func (c *Client)WithA(a int) *Client{
    c.o.A = a
    return c
}

func (c *Client)WithB(b int) *Client{
    c.o.B = b
    return c
}


# 使用
client = NewClient().WithA(1).WithB(2)

可选参数

golang 中没有严格的可选参数,但是可以通过可变参数变相的实现可选参数,比如下面一个例子,我们把 A, B 作为可选参数,并且 A, B 严格按照先后顺序,那么我们就可以这样做:

代码语言:txt
复制
func NewClient(options...int) *Client{
    c := &Client{}
    if len(options) >= 1{
        c.A = options[0]
    }
    if len(options) >= 2{
        c.B = options[1]
    }
}


# 使用
client := NewClient()
client := NewClient(1)
client := NewClient(1, 2)

context/ big-context 模式

context 是 golang 种非常常用的一种模式,详细可以参考我这篇文章:golang context实战; 这里我们讲另一种 context 的更复杂的使用方式:bigcontext,这种模式的方式为:把与 context 所有生命周期相关的东西都放在这个 context 里面,成为一个 big context【他本身因为包含了 context.Context, 所以他也是 context Interface 的一个实现】,这种模式尤其在实现网络服务的时候会很方便,一些常用得信息都可以在 context 中获取,并且大部分时候并不需要使用锁。比如下面的一个实现:

代码语言:txt
复制
// bContext wraps context and provides additional functionalities
type bContext struct {
	context.Context
	tracer tracer.Tracer       // tracer: optional, use global tracer if not set
	wait   *sync.WaitGroup     // wait shared between child and parent
	pool   GoPool              // pool: optional
	cache  map[CacheType]Cache // cache should be thread safe
	err    error               // go routine error
	db     *gorm.DB            // gorm db
}

type CacheType = string

var CacheNotFoundErr = errors.New("cache not found")

type defaultPool struct {
}

func (p *defaultPool) Submit(task func()) error {
	go func() {
		defer runtime.HandleCrash()
		task()
	}()
	return nil
}

// NewBContext init a bContext with given ctx
func NewBContext(ctx context.Context) BContext {
	return &bContext{
		Context: ctx,
		wait:    &sync.WaitGroup{},
		tracer:  tracer.GlobalTracer(),
		cache:   map[string]Cache{},
		pool:    &defaultPool{},
	}
}

// NewContextWithBContext will inherit old bContext's pool/tracer/db/wait..
func NewContextWithBContext(ctx context.Context, c *bContext) BContext {
	return &bContext{
		Context: ctx,
		pool:    c.pool,
		tracer:  c.tracer,
		wait:    c.wait,
		cache:   c.cache,
		db:      c.db,
	}
}

func (ctx *bContext) WithCache(cacheType CacheType, cache Cache) BContext {
	ctx.cache[cacheType] = cache
	return ctx
}

// refer: https://gorm.io/zh_CN/docs/context.html#Chi-%E4%B8%AD%E9%97%B4%E4%BB%B6%E7%A4%BA%E4%BE%8B
func (ctx *bContext) WithDb(db *gorm.DB) BContext {
	ctx.db = db
	return ctx
}

func (ctx *bContext) WithTracer(t tracer.Tracer) BContext {
	ctx.tracer = t
	return ctx
}

func (ctx *bContext) WithGoPool(p GoPool) BContext {
	ctx.pool = p
	return ctx
}

func (ctx *bContext) WithValue(k, v interface{}) BContext {
	return NewContextWithBContext(context.WithValue(ctx.Context, k, v), ctx)
}

func (ctx *bContext) Go(fn func(ctx BContext) error) {
	ctx.wait.Add(1)
	err := ctx.pool.Submit(func() {
		defer ctx.wait.Done()
		if err1 := fn(ctx); err1 != nil {
		}
	})
	if err != nil {
		ctx.wait.Done()
	}
}

func (ctx *bContext) Wait() {
	ctx.wait.Wait()
}

func (ctx *bContext) Transaction(fc func(ctx BContext, tx *gorm.DB) error) (err error) {
	return ctx.TransactionWithDb(ctx.db, fc)
}

func (ctx *bContext) Branch(branch string) BContext {
	_, newCtx := ctx.tracer.StartServerSpan(ctx.Context, branch)
	return NewContextWithBContext(newCtx, ctx)
}

func (ctx *bContext) BranchWrapped(branch string, fn func(BContext) error) error {
	child := ctx.Branch(branch)
	defer child.Span().Finish()

	if err := fn(child); err != nil {
		child.Span().LogFields(log.String("message", err.Error()))
		return err
	}
	return nil
}

// Get span from current context
func (ctx *bContext) Span() tracer.Span {
	span := tracer.GetSpanFromContext(ctx)
	if span == nil {
		span, _ = tracer.StartServerSpan(ctx, "default")
	}
	return span
}

// SetWithCache inserts or updates the specified key-value pair.
func (ctx *bContext) SetWithCache(cacheType CacheType, key, value interface{}) error {
	if cache, ok := ctx.cache[cacheType]; !ok {
		return CacheNotFoundErr
	} else {
		return cache.Set(key, value)
	}
}

// GetWithCache returns the value for the specified key if it is present in the cache.
func (ctx *bContext) GetWithCache(cacheType CacheType, key interface{}) (interface{}, error) {
	if cache, ok := ctx.cache[cacheType]; !ok {
		return nil, CacheNotFoundErr
	} else {
		return cache.Get(key)
	}
}

依赖倒置/依赖注入模式

golang 里面没有方便的依赖注入,这里可以参考本人做的一个实现, 这个实现不同于 wire 之类的实现,特点是不用生成代码,完全运行时处理,同时实现极简,但是能满足大部分得需求。 具体的用法如下:

代码语言:txt
复制
类型容器,支持依赖倒置、自动注入, 支持以下几种用法

// 1. 注册 type + new 函数, provide 返回新建实例
type ExampleA struct {
    d map[string]string
}

func (a *ExampleA) SayHello() {
    fmt.Printf("say hello from: %+v\n", a)
}

type SayHelloInterface interface {
    SayHello()
}

func TestContainerExample1(t *testing.T) {
    c := newThreadSafeContainer()
    
    c.RegisterType(&ExampleA{}, func() (interface{}, error) {
        t.Logf("ExampleA's new function called")
        return &ExampleA{d: map[string]string{"1": "2"}}, nil
    })
    ret, err := c.Provide(&ExampleA{})
    assert.Nil(t, err)
    assert.Equal(t, map[string]string{"1": "2"}, ret.(*ExampleA).d)
}

//  2.注册单例 type + new 函数, provide 返回单例
func TestContainerExample2(t *testing.T) {
    c := newThreadSafeContainer()
    
    c.RegisterType(&ExampleA{}, func() (interface{}, error) {
        t.Logf("ExampleA's new function called")
        return &ExampleA{d: map[string]string{"1": "2"}}, nil
    })
    ret, err := c.Provide(&ExampleA{})
    assert.Nil(t, err)
    assert.Equal(t, map[string]string{"1": "2"}, ret.(*ExampleA).d)
}


// 3. 注册单例 object, provide 返回单例
func TestContainerExample3(t *testing.T) {
    c := newThreadSafeContainer()
    exampleA := &ExampleA{d: map[string]string{"1": rand.String(10)}}
    c.RegisterObjectSingleton(exampleA)
    ret, err := c.Provide(&ExampleA{})
    assert.Nil(t, err)
    ret2, err := c.Provide(&ExampleA{})
    assert.Nil(t, err)
    assert.Equal(t, ret, ret2)
}


// 4. 注册单例 type + new 函数, provide 的时候用 interface 获取单例
func TestContainerExample4(t *testing.T) {
    c := newThreadSafeContainer()
    
    c.RegisterType(&ExampleA{}, func() (interface{}, error) {
        t.Logf("ExampleA's new function called")
        return &ExampleA{d: map[string]string{"1": "2"}}, nil
    })
    ret, err := c.Provide((*SayHelloInterface)(nil))
    assert.Nil(t, err)
    assert.Equal(t, map[string]string{"1": "2"}, ret.(*ExampleA).d)
    
    ret2, err := c.Provide(reflect.TypeOf((*SayHelloInterface)(nil)).Elem())
    assert.Nil(t, err)
    assert.Equal(t, ret, ret2)
}

//  5. 注册单例 type + new 函数, provide 的时候用 interface 获取单例, 同时使用自动注入字段
type ExampleB struct {
    d         map[string]string
    exampleA1 *ExampleA         `autowired:"true"`
    exampleA2 SayHelloInterface `autowired:"true"`
}

func (a *ExampleB) Foo() {
    fmt.Printf("foo from: %+v\n", a)
}

type FooInterface interface {
    Foo()
}

func TestContainerExample5(t *testing.T) {
    c := newThreadSafeContainer()
    
    c.RegisterTypeSingleton(&ExampleA{}, func() (interface{}, error) {
        t.Logf("ExampleA's new function called")
        return &ExampleA{d: map[string]string{"1": rand.String(5)}}, nil
    })
    
    c.RegisterTypeSingleton(&ExampleB{}, func() (interface{}, error) {
        t.Logf("ExampleB's new function called")
        return &ExampleB{d: map[string]string{"3": "4"}}, nil
    })
    
    ret, err := c.Provide((*FooInterface)(nil))
    assert.Nil(t, err)
    assert.Equal(t, map[string]string{"3": "4"}, ret.(*ExampleB).d)
    assert.Equal(t, ret.(*ExampleB).exampleA1, ret.(*ExampleB).exampleA2)
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 观察者模式
  • 基于生产者消费者模式的并发
  • 构建模式
  • 可选参数
  • context/ big-context 模式
  • 依赖倒置/依赖注入模式
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档