我可以使用一些伪代码,或者更好的Python。我正在尝试为Python IRC机器人实现一个速率限制队列,它部分工作,但是如果有人触发的消息少于限制(例如,速率限制是每8秒5条消息,而人只触发4条),并且下一个触发器超过8秒(例如,16秒后),机器人发送消息,但是队列变满,机器人等待8秒,即使由于8秒周期已经过去而不需要它。
发布于 2009-03-20 23:15:27
这里是simplest algorithm,如果您只想在消息到达太快时丢弃它们(而不是对它们进行排队,这是有意义的,因为队列可能会变得任意大):
rate = 5.0; // unit: messages
per = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds
when (message_received):
current = now();
time_passed = current - last_check;
last_check = current;
allowance += time_passed * (rate / per);
if (allowance > rate):
allowance = rate; // throttle
if (allowance < 1.0):
discard_message();
else:
forward_message();
allowance -= 1.0;
在这个解决方案中没有数据结构、计时器等,它工作得很干净:)要看到这一点,“允许”以每秒最多5/8个单位的速度增长,即每8秒最多5个单位。每转发一条消息就会扣除一个单位,因此每8秒发送的消息不能超过5条。
请注意,rate
应该是一个整数,即没有非零的小数部分,否则算法将无法正常工作(实际的速率不会是rate/per
)。例如,因为allowance
永远不会增长到1.0,所以rate=0.5; per=1.0;
无法工作。但是rate=1.0; per=2.0;
运行得很好。
发布于 2009-03-20 19:51:44
在你的入队函数之前使用这个装饰符@RateLimited(ratepersec)。
基本上,这会检查自上次以来是否经过了1/rate秒,如果没有,则等待剩余时间,否则不等待。这实际上将您限制为速率/秒。装饰器可以应用于任何你想要的速率限制的函数。
在您的示例中,如果您希望每8秒最多发送5条消息,请在sendToQueue函数之前使用@RateLimited(0.625)。
import time
def RateLimited(maxPerSecond):
minInterval = 1.0 / float(maxPerSecond)
def decorate(func):
lastTimeCalled = [0.0]
def rateLimitedFunction(*args,**kargs):
elapsed = time.clock() - lastTimeCalled[0]
leftToWait = minInterval - elapsed
if leftToWait>0:
time.sleep(leftToWait)
ret = func(*args,**kargs)
lastTimeCalled[0] = time.clock()
return ret
return rateLimitedFunction
return decorate
@RateLimited(2) # 2 per second at most
def PrintNumber(num):
print num
if __name__ == "__main__":
print "This should print 1,2,3... at about 2 per second."
for i in range(1,100):
PrintNumber(i)
发布于 2009-03-20 19:04:31
令牌存储桶实现起来相当简单。
从一个有5个令牌的桶开始。
每隔5/8秒:如果存储桶的令牌少于5个,则添加一个。
每次要发送消息时:如果存储桶有≥1令牌,则取出一个令牌并发送消息。否则,等待/丢弃消息/随便什么。
(显然,在实际代码中,您将使用整数计数器而不是真正的标记,并且您可以通过存储时间戳来优化每5/8步)
再读一遍问题,如果速率限制每8秒完全重置一次,那么这里是一个修改:
从很久以前(例如,在纪元)的时间戳last_send
开始。另外,从相同的5令牌存储桶开始。
打破每5/8秒的规则。
每次你发送一条消息:首先,检查8秒前是否有last_send
≥。如果是,则填充存储桶(设置为5个令牌)。其次,如果存储桶中有令牌,则发送消息(否则,丢弃/等待/等)。第三,将last_send
设置为now。
这应该适用于该场景。
实际上,我已经使用这样的策略(第一种方法)编写了一个IRC机器人。它是用Perl编写的,而不是Python,但这里有一些代码来演示:
这里的第一部分处理向存储桶中添加令牌。您可以看到基于时间添加令牌的优化(倒数第二行),然后最后一行将存储桶内容钳制到最大值(MESSAGE_BURST)
my $start_time = time;
...
# Bucket handling
my $bucket = $conn->{fujiko_limit_bucket};
my $lasttx = $conn->{fujiko_limit_lasttx};
$bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;
$conn是一种传递的数据结构。这是在一个常规运行的方法中(它会计算下一次有事情要做的时间,然后休眠那么长时间或直到它获得网络流量)。该方法的下一部分处理发送。这相当复杂,因为消息具有与其关联的优先级。
# Queue handling. Start with the ultimate queue.
my $queues = $conn->{fujiko_queues};
foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
# Ultimate is special. We run ultimate no matter what. Even if
# it sends the bucket negative.
--$bucket;
$entry->{code}(@{$entry->{args}});
}
$queues->[PRIORITY_ULTIMATE] = [];
这是第一个队列,无论如何都会运行。即使它会因为洪水而导致我们的连接中断。用于极其重要的事情,例如响应服务器的PING。接下来,其余的队列:
# Continue to the other queues, in order of priority.
QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
my $queue = $queues->[$pri];
while (scalar(@$queue)) {
if ($bucket < 1) {
# continue later.
$need_more_time = 1;
last QRUN;
} else {
--$bucket;
my $entry = shift @$queue;
$entry->{code}(@{$entry->{args}});
}
}
}
最后,存储桶状态被保存回$conn数据结构(实际上在该方法中稍晚一点;它首先计算多长时间内将有更多工作)
# Save status.
$conn->{fujiko_limit_bucket} = $bucket;
$conn->{fujiko_limit_lasttx} = $start_time;
如您所见,实际的存储桶处理代码非常小--大约四行。代码的其余部分是优先级队列处理。机器人具有优先级队列,因此,例如,与其聊天的人不能阻止它执行重要的踢/禁任务。
https://stackoverflow.com/questions/667508
复制相似问题