前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊eventhorizon的EventBus

聊聊eventhorizon的EventBus

原创
作者头像
code4it
修改2021-04-02 09:55:04
5000
修改2021-04-02 09:55:04
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下eventhorizon的EventBus

EventBus

eventhorizon/eventbus.go

代码语言:javascript
复制
type EventBus interface {
    EventHandler

    // AddHandler adds a handler for an event. Returns an error if either the
    // matcher or handler is nil, the handler is already added or there was some
    // other problem adding the handler (for networked handlers for example).
    AddHandler(context.Context, EventMatcher, EventHandler) error

    // Errors returns an error channel where async handling errors are sent.
    Errors() <-chan EventBusError

    // Wait wait for all handlers to be cancelled by their context.
    Wait()
}

type EventHandler interface {
    // HandlerType is the type of the handler.
    HandlerType() EventHandlerType

    // HandleEvent handles an event.
    HandleEvent(context.Context, Event) error
}

type EventMatcher interface {
    // Match returns true if the matcher matches an event.
    Match(Event) bool
}

EventBus接口内嵌了EventHandler接口,定义了AddHandler、Errors、Wait方法

EventBus

eventhorizon/eventbus/local/eventbus.go

代码语言:javascript
复制
type EventBus struct {
    group        *Group
    registered   map[eh.EventHandlerType]struct{}
    registeredMu sync.RWMutex
    errCh        chan eh.EventBusError
    wg           sync.WaitGroup
    codec        eh.EventCodec
}

// HandleEvent implements the HandleEvent method of the eventhorizon.EventHandler interface.
func (b *EventBus) HandleEvent(ctx context.Context, event eh.Event) error {
    data, err := b.codec.MarshalEvent(ctx, event)
    if err != nil {
        return fmt.Errorf("could not marshal event: %w", err)
    }

    return b.group.publish(ctx, data)
}

EventBus定义了group、registered、registeredMu、errCh、wg、codec属性;HandleEvent方法先序列化event,然后通过group.publish发布event

Group

eventhorizon/eventbus/local/eventbus.go

代码语言:javascript
复制
type Group struct {
    bus   map[string]chan []byte
    busMu sync.RWMutex
}

// NewGroup creates a Group.
func NewGroup() *Group {
    return &Group{
        bus: map[string]chan []byte{},
    }
}

func (g *Group) publish(ctx context.Context, b []byte) error {
    g.busMu.RLock()
    defer g.busMu.RUnlock()

    for _, ch := range g.bus {
        // Marshal and unmarshal the context to both simulate only sending data
        // that would be sent over a network bus and also break any relationship
        // with the old context.
        select {
        case ch <- b:
        default:
            log.Printf("eventhorizon: publish queue full in local event bus")
        }
    }

    return nil
}

// Handles all events coming in on the channel.
func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHandler, ch <-chan []byte) {
    defer b.wg.Done()

    for {
        select {
        case data := <-ch:
            // Artificial delay to simulate network.
            time.Sleep(10 * time.Millisecond)

            event, ctx, err := b.codec.UnmarshalEvent(ctx, data)
            if err != nil {
                err = fmt.Errorf("could not unmarshal event: %w", err)
                select {
                case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx}:
                default:
                    log.Printf("eventhorizon: missed error in local event bus: %s", err)
                }
                return
            }

            // Ignore non-matching events.
            if !m.Match(event) {
                continue
            }

            // Handle the event if it did match.
            if err := h.HandleEvent(ctx, event); err != nil {
                err = fmt.Errorf("could not handle event (%s): %s", h.HandlerType(), err.Error())
                select {
                case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}:
                default:
                    log.Printf("eventhorizon: missed error in local event bus: %s", err)
                }
            }
        case <-ctx.Done():
            return
        }
    }
}

Group的publish方法遍历bus的channel,通过select写入event;handle方法循环select读取event,然后通过m.Match(event)判断是符合,是的话执行h.HandleEvent

小结

eventhorizon的EventBus接口内嵌了EventHandler接口,定义了AddHandler、Errors、Wait方法。

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • EventBus
  • EventBus
  • Group
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档