从本节开始,我介绍一下如何将相关数据持久化到数据库,也就是上图中蓝色的部分。
我先运行 6 个传感器和2 个协调器,这里我使用了批处理文件:
运行后,看一下 RabbitMQ 的管理控制台:
注意上面前面几个 Queue,这些 Queue 就是我们让传感器和协调器监听那两个 Fanout Exchange 时创建的,因为这两个 Exchange 不使用路由 Key 来决定接收者,我使用了空字符串“”作为这些 Queue 的名称,而RabbitMQ 就会为它们赋予一个唯一的名字。
因为目前创建的 Queue 都是临时的,如果我重新启动系统,RabbitMQ会创建另一套不同的 Queue 来完成工作,这样的话系统资源就会被慢慢的耗尽,所以这个问题需要解决。
首先修改 tools 包下的 queuetools.go 里面的GetQueue 函数,添加一个 autoDelete 参数:
GetQueue 函数会确保创建一个Queue 从而能接收到消息。刚创建它的时候,我的意图是让它作用于 Direct Exchange 和命名的 Queue。后来我对它进行了扩展使用,也可以应用于匿名的 Queue。
再说一下 autoDelete 参数的作用是:若值为 true,那么如果一个 Queue 它没有被注册任何的使用者,这个 Queue 就会被删除。针对上述问题中的临时 Queue,这就是我想要的效果。但是针对传感器的数据 Queue,我还是希望在系统重启后,这些 Queue 能够保留。
所以,我为该函数添加了一个 autoDelete 参数,在创建 Queue 的时候,可以对 autoDelete 进行设置。
有三个调用该函数的地方需要调整代码,先打开 sensors.go:
调整 queuelistener.go 里面的调用:
这里得到的临时 Queue 是用来监听传感器上线时或响应协调器发现请求时来发布数据 Queue 名称的。
这里函数调用的 autoDelete 参数也设置为 true,从而让它们可以自动被清除掉。
把之前的 Queue 都删掉:
然后再运行 5 个传感器和2 个协调器:
现在又是很多的 Queue。
然后我们再停掉所有的传感器和协调器:
可以看到传感器传送数据的 Queue 被保留了,而其它的临时 Queue 都自动删除掉了,这就是我们想要的效果。
到目前为止,系统中只发布了一种类型的事件(接收到传感器数据时的事件),而且目前还没有任何使用者监听这个事件。接下来我们就要完善事件这部分功能了,但首先必须做出一些优化修改,以便能真正满足需求。
目前 eventaggregator.go 里面包含了所有添加监听者以及向监听者发布事件的方法。
但现在的情况是事件的使用者也知道如何自行发布事件,这点不太好,因为它们不需要这样做。代码修改如下:
现在 EventAggregator 被泛化了,我也可以发布其它类型的事件了。
来到 queuelistener.go,我想在协调器发现数据源之后,发布一个事件:
这个事件的名称叫做 DataSourceDiscovered,事件数据就是 Queue 的名称,由于这个参数的类型是 interface{},所以它可以正常的传递进去。
目前,我们整个系统的设计一共有三层,而数据源和数据的使用者是通过协调器分开的。这样做的好处是,关于如何处理消息的业务逻辑都集中在协调器这一层上面了,而数据源和数据使用者层仅关注它们自身的任务即可。
为了达到这个目的,需要在 coordinator 目录下创建一个 databaseconsumer.go 文件:
这个文件的作用是监听整个系统发出的事件,并决定哪些事件可以转发到数据管理包(我一会要建立的)。
首先看一下 dataconsumer.go 文件的内容:
下面看一看 SubscribeToDataEvent 方法:
让其传入 EventAggregator 作为参数并赋值给 QueueListener 的 ea 字段。