我正在寻找消息调度的最佳算法。我所说的消息调度是指当我们有许多不同速率的消费者时,在总线上发送消息的一种方式。
例如:假设我们有到Dn的数据D1。D1每5ms发送一次C1,C2每19ms发送一次,C3每30ms发送一次,Cn每Rn ms发送一次。Dn每10ms发送到C1,C2每31ms发送一次,Cn每50ms发送一次
以最佳性能(CPU、内存、IO)调度此操作的最佳算法是什么?
问候
发布于 2018-06-06 23:38:44
我可以想到相当多的选择,每个选择都有自己的成本和收益。这实际上归结于你的需求是什么--什么才是真正为你定义的“最佳”。我在下面给出了几种可能的代码,希望能帮助你入门。
选项1:每个时间单位执行以下命令(在您的示例中为毫秒)
func callEachMs
time = getCurrentTime()
for each datum
for each customer
if time % datum.customer.rate == 0
sendMsg()
这样做的优点是不需要一致存储的内存--您只需在每个时间单位检查是否应该发送消息。这也可以处理不是以time == 0
发送的消息--只需存储最初以速率为模发送消息的时间,并用if time % datum.customer.rate == data.customer.firstMsgTimeMod
替换条件。
这种方法的一个缺点是它完全依赖于总是以1ms的速率被调用。如果CPU上的另一个进程导致延迟,并且它错过了一个周期,那么您可能完全错过了发送消息(而不是延迟发送消息)。
选项2:维护元组列表,其中每个条目表示在该毫秒内需要完成的任务。让你的列表至少和最长速率除以时间单位一样长(如果你的最长速率是50毫秒,你要用毫秒来计算,那么你的列表必须至少有50长)。启动程序时,请在第一次将消息发送到队列时放置。然后,每次您发送消息时,请在下次发送该列表中的消息时进行更新。
func buildList(&list)
for each datum
for each customer
if list.size < datum.customer.rate
list.resize(datum.customer.rate+1)
list[customer.rate].push_back(tuple(datum.name, customer.name))
func callEachMs(&list)
for each (datum.name, customer.name) in list[0]
sendMsg()
list[customer.rate].push_back((datum.name, customer.name))
list.pop_front()
list.push_back(empty list)
这具有避免许多不必要的模数计算选项1所需的优点。然而,这也带来了内存使用量增加的代价。如果各种消息的速率有很大的差异,这种实现也不会有效率(尽管您可以修改它以更有效地处理具有较长速率的算法)。它仍然需要每毫秒调用一次。
最后,您必须非常仔细地考虑您使用的数据结构,因为这将对其效率产生巨大的影响。因为每次迭代都是从前面弹出,从后面推,并且列表的大小是固定的,所以您可能想要实现一个circular buffer来避免不必要的值移动。对于元组列表,由于它们只会迭代(不需要随机访问),而且需要频繁添加,因此单链表可能是最佳解决方案。
。
显然,有更多的方法可以做到这一点,但希望这些想法可以让你开始。此外,请记住,您在其上运行此程序的系统的性质可能会对哪种方法工作得更好有很大的影响,或者您是否想做完全不同的事情。例如,这两种方法都要求能够以一定的速率可靠地调用它们。我也没有描述并行化实现,如果您的应用程序支持它们,这可能是最好的选择。
发布于 2018-06-07 20:02:12
就像Helium_1s2描述的那样,还有第二种方法,它基于我所说的计划表,这就是我现在使用的方法,但这种解决方案有其局限性。
假设我们有一个数据要发送,两个消费者C1和C2:
就像你看到的,我们必须提取我们的调度表,我们必须识别重复传输周期和空闲最小周期的值。事实上,在最小的时间间隔(例如1ms、1ns、1mn或1h )上循环是无用的,但它并不总是最好的周期,我们可以像下面这样优化这个循环。
例如1 (C1在6,C2在9),我们注意到存在从0到18重复的周期。两个连续发送事件的最小差值等于3。因此:
HCF(6,9) = 3 = IDLE MINIMUM PERIOD
LCM(6,9) = 18 = transmission cycle length
LCM/HCF = 6 = size of our schedule table
明细表是:
发送循环看起来像这样:
while(1) {
sleep(IDLE_MINIMUM_PERIOD); // free CPU for idle min period
i++; // initialized at 0
send(ScheduleTable[i]);
if (i == sizeof(ScheduleTable)) i=0;
}
这种方法的问题是,如果LCM增长,这个数组就会增长,如果我们有不好的组合,比如rate =质数等,就会出现这种情况。
https://stackoverflow.com/questions/50698807
复制相似问题