最近项目中需要用到观察者模式来实现一些逻辑,如某些操作的数据变更会影响到同项目中另一模块的数据。
使用Java时可以用 Google Guava 中的 EventBus 来轻松实现,但是在go中暂无相似类库(可能有我不知道)。
由于是较为轻量级的应用,不想引入MQ这些外部实现,于是想到封装一个简单的实现。
文中的代码都放在 https://github.com/kakisong/eventbus-go
由于goroutine天生的优势,所以在消息的转发过程会非常方便。
所以这里简单分为两个步骤:
由于我们想实现的是根据接收到的消息类型来决定消息转发的对应函数,这样使用起来就很方便,只需要确定监听的消息类型即可注册使用。
如:
现在有两种消息类型
EventTypeAEventTypeB
四个函数func1(EventTypeA)func2(EventTypeB)func3(EventTypeB)func4(EventTypeB, AnotherParam)那么我们现在需要实现的几个条件:
func2与func3func2与func3func1与func4上代码
var (
// 注册锁
registerMu sync.RWMutex
// 监听函数存放处
listeners = make(map[string]map[string]reflect.Value)
)
// 监听注册
func Register(handlerFunc interface{}) error {
registerMu.Lock()
defer registerMu.Unlock()
funcType := reflect.TypeOf(handlerFunc)
// 参数数量校验
if err := checkFunc(funcType); err != nil {
return err
}
paramName := generateParameterName(funcType)
funcName := getFunctionName(handlerFunc)
funcMap, ok := listeners[paramName]
if ok {
_, ok := funcMap[funcName]
// 重复注册校验
if ok {
logrus.Errorf("方法 %s 已经被注册", funcName)
return errors.New("该方法已被注册")
}
funcMap[funcName] = reflect.ValueOf(handlerFunc)
listeners[paramName] = funcMap
logrus.Infof("方法 %s 监听注册成功", funcName)
} else {
logrus.Infof("方法 %s 监听注册成功", funcName)
listeners[paramName] = map[string]reflect.Value{funcName: reflect.ValueOf(handlerFunc)}
}
return nil
}
// 检测参数个数是否合法
func checkFunc(t reflect.Type) error {
if t.Kind() != reflect.Func {
return errors.New("只可使用方法进行监听")
}
if t.NumIn() < 1 {
return errors.New("非标准方法,参数不可为空")
}
if t.NumIn() > 1 {
return errors.New("非标准方法,只可拥有一个参数")
}
return nil
}
// 获取参数类型名
func generateParameterName(t reflect.Type) string {
in := t.In(0)
return in.PkgPath() + "." + in.Name()
}
// 获取函数名
func getFunctionName(f interface{}) string {
return runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name()
}简单说明
listeners,其实go的反射效率不高,所以这里存放反射后的对象完成了监听函数的注册之后,接下来就是对发送过来的消息进行处理
var msgChan = make(chan interface{}, 100)// 发送消息
func Send(msg interface{}) {
msgChan <- msg
}// 监听
func listen(msgChan <-chan interface{}) {
for {
select {
case msg := <-msgChan:
call(msg)
default:
}
}
}
// 调用监听方法
func call(param interface{}) {
registerMu.Lock()
defer registerMu.Unlock()
paramName := getParamName(param)
funcMap, ok := listeners[paramName]
if ok {
for _, method := range funcMap {
go method.Call([]reflect.Value{reflect.ValueOf(param)})
}
}
}
// 获取类型名
func getParamName(param interface{}) string {
t := reflect.TypeOf(param)
return t.PkgPath() + "." + t.Name()
}func init() {
go listen(msgChan)
}这里直接放在init()方法中,使用时只要import我们的包就可以启动监听
放上测试代码
func TestSend(t *testing.T) {
Register(PrintFunc)
Send(TestEvent{
Code: 0,
Msg: "测试消息",
})
time.Sleep(1 * time.Second)
}
func PrintFunc(event TestEvent) {
fmt.Printf("PrintFuncRun: code=%v, msg=%s\n", event.Code, event.Msg)
}测试结果

这里我们利用了简洁的goroutine来实现消息的监听与消费,对外暴露的只有两个方法,我们不用去定义topic,不用去处理通道,由消息类型来控制回调的监听函数,在项目中的轻量级使用应该是开箱即用的。
RegisterSend
所以使用起来也是十分简单方便的原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。