首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Aeron 是如何实现的?—— Ipc Subscription

接上文

Aeron 是什么?https://xie.infoq.cn/article/27422063a2cadcc054187135e

Aeron 中这么多空闲策略选哪个?https://xie.infoq.cn/article/41d0885f46594e90cbdba4b2b

Aeron 是如何实现的?—— Conductor https://xie.infoq.cn/article/b4953b06323cd26e3a1397874

Aeron 是如何实现的?—— Ipc Publication https://xie.infoq.cn/article/9c3c085daef8eb12d533d2f66

0. 简介

最近我们用 Aeron 实现了 Mesh agent 与 sdk 之间的共享内存通信,但是在使用过程中越来越感觉到 Aeron 框架太重了,其中很大部分功能完全用不到,有些想要自定义的逻辑很难在现有框架中实现。所以我们计划深入到 Aeron 源码中,看看它是如何实现的,最终尝试实现一个轻量的 Mesh 共享内存通信类库。上文分析了 Ipc Publication 的逻辑,本文继续分析 Ipc Subscription 的逻辑。

1. Driver Conductor - onAddIpcSubscription

在读取数据之前,需要先向 Driver Conductor 发送 ADD_SUBSCRIPTION 命令,在 Driver 中构建发送接收的关系。至于 Conductor 交互的逻辑不再赘述,直接看 Driver Conductor 处理 ADD_SUBSCRIPTION 命令的逻辑。处理逻辑的入口在 io.aeron.driver.ClientCommandAdapter,这里只关心 Ipc 的情况,找到 DriverConductor 中的处理逻辑:

构建完 IpcSubscriptionLink 之后,就返回 ON_SUBSCRIPTION_READY。然后再匹配当前的 ipcPublications,如果有 match 的,那么将订阅关系添加到对应的 IpcPublication 中:

1.1 UnsafeBufferPosition

在发送接收关系中,最核心的数据结构是 Position,记录消费的位置,本质上就是 cnc.dat 中的一个 Counter。

type id 是 SUBSCRIBER_POSITION_TYPE_ID(4),label 是 "sub-pos: ${registrationId} ${sessionId} ${streamId} ${channel} @${joinPosition}"。这个信息可以通过 io.aeron.samples.AeronStat 工具查看。

1.2 关系映射

在 SubscriptionLink 中,positionBySubscribableMap 维护着订阅的多个 Publication 的消费位置。此处之所以是多个的原因是,同一个 stream 可以有多个 session。

在 IpcPublication 中,subscriberPositions 维护着多个 subscriber 的消费位置。(tether 的逻辑暂时忽略,异常情况处理下一篇再分析)

1.3 ON_AVAILABLE_IMAGE

Driver 这边的关系信息维护好之后,向 Client 广播可用 ON_AVAILABLE_IMAGE 信息。

其中包含 Publication 生成的 logbuffer 信息,Subscription 也是直接读该共享内存。

2. Client Conductor - addSubscription

Client Conductor 的处理逻辑也很清晰,主要的处理逻辑就是响应 ON_SUBSCRIPTION_READY 和 ON_AVAILABLE_IMAGE 这两个消息。

收到 ON_SUBSCRIPTION_READY 之后,Client Conductor 构建相应的 io.aeron.Subscription 封装类,然后添加到 resourceByRegIdMap 这个映射关系中。此时调用 poll 拉取消息是不行的,因为还没有 image,也就是对应 Publication 的信息。这个信息是通过 ON_AVAILABLE_IMAGE 消息通知的,处理的方法是 onAvailableImage:

构建的 io.aeron.Image 包装类主要包含 logbuffer 和 position counter 这两个信息。接着调 Subscription 的 addImage 方法,将其添加到 images 数组中。

3. Subscription.poll & controlledPoll

此时就可以调用 poll 方法拉取消息了。

最终调用的是 image 的 poll 方法:

根据上次读的位置(首次是 joinPosition)计算对应的 term 读取。

逻辑很清楚,读数据,最后更新位置。有个细节,此处没有检查数据覆盖的情况,这说明在 Driver 中严格控制了 Publication 的最大位置。读取数据还有个 controlledPoll 方法,主干逻辑与 poll 一样,区别主要在于更新位置信息的逻辑。

4. Driver 对 publisherLimit 的维护

Driver 维护 position 的逻辑在 DriverConductor 的 trackStreamPositions 方法中。对于 ipc 场景调用的是 IpcPublication 的 updatePublisherLimit 方法:

核心逻辑就是更新 publisherLimit。publisherLimit 和 consumerPosition 的起始位置都是 Publication 的初始写入位置。这说明对于一个没有 subscriber 的 Publication 是不能写入的,当有 ADD_SUBSCRIPTION 后,就进入第一个 if 逻辑:

  1. 遍历所有 subscriberPositions,找出最大位置 maxSubscriberPosition 和最小位置 minSubscriberPosition
  2. consumerPosition 设定为最大读取位置
  3. proposedLimit 也就是提议给 Publication 的写入限制,初始值为最小读取位置加上 termWindowLength(termWindowLength 默认为 termLength 的一半,且自定义不能大于该值)
  4. 通过 tripLimit 和 tripGain 限制更新的步长,默认为 termWindowLength 的 1/8
  5. cleanBufferTo 方法清理之前的共享内存

从这里的逻辑看,ipc 场景用两个 termBuffer 就可以,三个岂不是浪费内存?再精简一下,一个 RingBuffer 看起来也可以。

  • 发表于:
  • 本文为 InfoQ 中文站特供稿件
  • 首发地址https://www.infoq.cn/article/9bcc41516b0b4dc53a628302a
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券