我正在开发一个带有模块pyTelegramBotAPI的机器人,这是通过网络挂钩安装的Flask+Gunicorn作为网络挂钩的服务器。Gunicorn正在与5个工人合作,以提高速度,我的项目结构看起来像这样:
app.py
bot.py在bot.py中,我有一个处理更新的函数:
def process_the_update(update):
logger.info(update)
update = types.Update.de_json(update)
bot.process_new_updates([update])在bot中,我导入了这个函数,所以,每当有更新时,app.py就会调用这个函数,bot就会处理更新。在我的机器人中,用户可以调用一个命令,该命令将使用外部api来获取一些信息。问题是,这个外部api有每秒3个请求的限制。我需要配置一个机器人与这样的速率限制。首先,我想用Queue来实现,代码如下:
lock_queue = Queue(1)
requests_queue = Queue(3)
def api_request(argument):
if lock_queue.empty():
try:
requests_queue.put_nowait(time.time())
except queue.Full:
lock_queue.put(1)
first_request_time = requests_queue.get()
logger.info('First request time: ' + str(first_request_time))
current_time = time.time()
passed_time = current_time - first_request_time
if passed_time >= 1:
requests_queue.put_nowait(time.time())
lock_queue.get()
else:
logger.info(passed_time)
time.sleep(1 - passed_time)
requests_queue.put_nowait(time.time())
lock_queue.get()
else:
lock_queue.put(1)
first_request_time = vk_requests_queue.get()
logger.info('First request time: ' + str(first_request_time))
current_time = time.time()
passed_time = current_time - first_request_time
if passed_time >= 1:
requests_queue.put_nowait(time.time())
lock_queue.get()
else:
logger.info(passed_time)
time.sleep(1 - passed_time)
requests_queue.put_nowait(time.time())
lock_queue.get()
result = make_api_request(argument) # requests are made too by external module.
return result 逻辑是,正如我所想的那样,因为模块pyTelegramBotAPI使用线程来更快地处理更新,所以所有线程都会检查requests_queue,它的最后时间是3个api_requests,因此将3个请求中的第一个请求的时间与当前时间进行比较(如果超过1秒,则进行检查)。而且,因为我需要确保只有一个线程可以同时执行这种比较,所以我使用了lock_queue。但是,问题是,首先,gunicorn使用5个工作进程,因此总是有可能,来自用户的所有消息都将在不同的进程中处理,并且这些进程将有自己的队列。其次,即使我将工作进程数设置为默认值(1个工作进程),我仍然会得到429个错误,所以我认为,我的代码根本不会像我想要的那样工作。
我想用redis做速率限制,所以每次在每个线程和进程机器人都会检查最后3个请求的时间,但我仍然不确定,这是不是正确的方式,我也不确定,怎么写。
如果有人提出任何想法或代码示例(外部api不提供任何x-rate-limit报头),我都会很高兴
发布于 2017-06-26 09:04:13
写了这个函数,使用redis来计算请求(基于这个https://www.binpress.com/tutorial/introduction-to-rate-limiting-with-redis/155教程)
import redis
r_db = redis.Redis(port=port, db=db)
def limit_request(request_to_make, limit=3, per=1, request_name='test', **kwargs):
over_limit_lua_ = '''
local key_name = KEYS[1]
local limit = tonumber(ARGV[1])
local duration = ARGV[2]
local key = key_name .. '_num_of_requests'
local count = redis.call('INCR', key)
if tonumber(count) > limit then
local time_left = redis.call('PTTL', key)
return time_left
end
redis.call('EXPIRE', key, duration)
return -2
'''
if not hasattr(r_db, 'over_limit_lua'):
r_db.over_limit_lua = r_db.register_script(over_limit_lua_)
request_possibility = int(r_db.over_limit_lua(keys=request_name, args=[limit, per]))
if request_possibility > 0:
time.sleep(request_possibility / 1000.0)
return limit_request(request_to_make, limit, per, request_name, **kwargs)
else:
request_result = request_to_make(**kwargs)
return request_resulthttps://stackoverflow.com/questions/44742416
复制相似问题