
1、前言
在 Python 开发中,处理大量 HTTP 请求时,传统的Requests库因串行执行效率低下,难以满足高并发场景需求。而GRequests库通过将Requests与Gevent的协程能力结合,实现了 HTTP 请求的异步并发,大幅提升了请求处理速度。
2、简介
GRequests 是一个结合了 Requests 和 Gevent 的 Python 库,旨在简化异步 HTTP请求的实现。
核心功能:
使用pip安装:
pip install grequestsGithub地址:
https://github.com/spyoungtech/grequests
3、快速上手
1、基本用法:
创建多个请求并批量发送,获取响应列表。
import grequests
urls = ['http://httpbin.org', 'http://www.heroku.com']
rs = (grequests.get(u) for u in urls)
responses = grequests.map(rs) # 并发发送请求2、异常处理:
自定义处理器捕获请求失败。
def exception_handler(request, exception):
print(f"Request to {request.url} failed: {exception}")
reqs = [grequests.get('http://fakedomain/', timeout=0.1)]
grequests.map(reqs, exception_handler=exception_handler)3、生成器式处理:
使用 imap() 逐一生成响应(无序)。
for resp in grequests.imap((grequests.get(u) for u in urls), size=5):
print(resp.status_code)4、应用场景:
(1)批量数据采集与爬虫
import grequests
urls = [f"https://example.com/page{i}" for i in range(1, 50)]
reqs = (grequests.get(url) for url in urls)
responses = grequests.map(reqs, size=10) # 控制并发数为10
for resp in responses:
if resp:
print(f"{resp.url}: {resp.status_code}")(2)API批量调用与集成
import grequests
ips = ["8.8.8.8", "1.1.1.1", "208.67.222.222"]
reqs = [grequests.get(f"http://ip-api.com/json/{ip}") for ip in ips]
results = grequests.map(reqs)
for res in results:
if res:
data = res.json()
print(f"{data['query']}: {data['country']}")(3)监控与健康检查
import grequests
services = [
"https://api.serviceA.com/health",
"https://api.serviceB.com/ping",
"https://api.serviceC.com/status"
]
def handle_exception(req, exc):
returnf"Failed to check {req.url}: {str(exc)}"
reqs = (grequests.get(url, timeout=2) for url in services)
statuses = grequests.map(reqs, exception_handler=handle_exception)
for status in statuses:
print(status if isinstance(status, str) elsef"{status.url} is OK")(4)并发文件下载
import grequests
import os
image_urls = [
"https://example.com/img1.jpg",
"https://example.com/img2.png",
"https://example.com/img3.gif"
]
def save_image(resp):
if resp:
filename = os.path.basename(resp.url)
with open(filename, "wb") as f:
f.write(resp.content)
print(f"Saved {filename}")
reqs = [grequests.get(url, callback=save_image) for url in image_urls]
grequests.map(reqs)(5)负载测试与压力测试
import grequests
import time
target_url = "https://api.target.com/endpoint"
concurrency = 50 # 并发数
total_requests = 1000 # 总请求数
start = time.time()
reqs = (grequests.post(target_url, json={"data": i}) for i in range(total_requests))
grequests.map(reqs, size=concurrency)
print(f"Completed {total_requests} requests in {time.time() - start:.2f}s")本文分享自 AllTests软件测试 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!