I am trying to run a simple long-running scraping task through a Redis queue (RQ), but I get a timeout error every time, even after increasing the timeout value with job = q.enqueue(run_scraper, temp_file, job_timeout=16600).
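For reference, a minimal sketch of the two places an RQ timeout can be raised: per job at enqueue time (what my code already does) and queue-wide via Queue's default_timeout (an alternative knob, not something my code below uses; the file path is illustrative):

# Sketch: RQ timeout knobs, assuming a local Redis.
from redis import Redis
from rq import Queue

from scraper import run_scraper

r = Redis(host="localhost", port=6379, db=0)
q = Queue(connection=r, default_timeout=16600)  # applies to jobs enqueued without an explicit job_timeout

# Per-job timeout, which overrides the queue default:
job = q.enqueue(run_scraper, "temp/temp.csv", job_timeout=16600)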
Traceback:
01:17:18 Traceback (most recent call last):
  File "/home/zerox/fp-google-search/venv/lib/python3.9/site-packages/rq/worker.py", line 1061, in perform_job
    rv = job.perform()
  File "/home/zerox/fp-google-search/venv/lib/python3.9/site-packages/rq/job.py", line 821, in perform
    self._result = self._execute()
  File "/home/zerox/fp-google-search/venv/lib/python3.9/site-packages/rq/job.py", line 847, in _execute
    coro_result = loop.run_until_complete(result)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 634, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.9/asyncio/base_events.py", line 601, in run_forever
    self._run_once()
  File "/usr/lib/python3.9/asyncio/base_events.py", line 1869, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/lib/python3.9/selectors.py", line 469, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
  File "/home/zerox/fp-google-search/venv/lib/python3.9/site-packages/rq/timeouts.py", line 63, in handle_death_penalty
    raise self._exception('Task exceeded maximum timeout value '
rq.timeouts.JobTimeoutException: Task exceeded maximum timeout value (16600 seconds)
FastAPI code:
import fastapi as _fastapi
from fastapi.responses import HTMLResponse, FileResponse, Response
from starlette.requests import Request
from starlette.templating import Jinja2Templates
import shutil
import os
import json

from rq import Queue
from rq.job import Job
from redis import Redis

from scraper import run_scraper
from utils import clean_file, csv_writer

app = _fastapi.FastAPI()

r = Redis(
    host="localhost",
    port=6379,
    db=0,
)
q = Queue(connection=r)

templates = Jinja2Templates("templates")


@app.get("/")
def index(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})


@app.post("/api/v1/scraped_csv")
async def extract_ads(csv_file: _fastapi.UploadFile = _fastapi.File(...)):
    temp_file = _save_file_to_disk(csv_file, path="temp", save_as="temp")
    job = q.enqueue(run_scraper, temp_file, job_timeout=16600)
    return {"message": "Scraping has been started", "job_id": job.id}


@app.get("/progress/{job_id}")
def progress(job_id):
    job = Job.fetch(job_id, connection=r)
    if job.is_finished:
        csv_path = os.path.abspath(clean_file)
        return FileResponse(path=csv_path, media_type="text/csv", filename=clean_file)
    return {"message": "Scraper is running."}


def _save_file_to_disk(uploaded_file, path=".", save_as="default"):
    extension = os.path.splitext(uploaded_file.filename)[-1]
    temp_file = os.path.join(path, save_as + extension)
    with open(temp_file, "wb") as buffer:
        shutil.copyfileobj(uploaded_file.file, buffer)
    return temp_file
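For completeness, the enqueued jobs are executed by an RQ worker running in a separate process (the traceback above comes from it). A minimal sketch of starting one against the same local Redis; my actual setup may instead use the rq worker command line:

# worker.py - minimal sketch, assuming the same local Redis as the FastAPI app
from redis import Redis
from rq import Queue, Worker

redis_conn = Redis(host="localhost", port=6379, db=0)

if __name__ == "__main__":
    # Blocks and processes jobs from the default queue until interrupted.
    Worker([Queue(connection=redis_conn)], connection=redis_conn).work()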
I am new to integrating Redis queues with scraping, so any guidance on resolving/handling this timeout error would be greatly appreciated.
Answer (posted 2022-06-10 15:44:41):
It turns out that python-rq does not directly support async functions, so I had to wrap the async scraper in a synchronous function to get it up and running. The wrapper, get_google_ad_urls, is at the end of the scraper module below, and the updated enqueue call is sketched after it.
import httpx
import asyncio
from bs4 import BeautifulSoup
from decouple import config
from urllib.parse import urlencode
from urllib.parse import urlparse
import os
import logging

from utils import csv_reader, csv_writer

SCRAPERAPI_KEY = config("API_KEY")
NUM_RETRIES = 3

logging.basicConfig(filename="scraper.log",
                    format='%(asctime)s %(message)s',
                    filemode='w')
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

base_url = "https://www.google.com/search?"

headers = {
    "authority": "www.google.com",
    "sec-ch-dpr": "1",
    "sec-ch-viewport-width": "1366",
    "sec-ch-ua": '" Not A;Brand";v="99", "Chromium";v="98", "Yandex";v="22"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-full-version": '"22.3.3.886"',
    "sec-ch-ua-arch": '"x86"',
    "sec-ch-ua-platform": '"Linux"',
    "sec-ch-ua-platform-version": '"5.4.0"',
    "sec-ch-ua-model": '""',
    "sec-ch-ua-bitness": '"64"',
    "sec-ch-ua-full-version-list": '" Not A;Brand";v="99.0.0.0", "Chromium";v="98.0.4758.886", "Yandex";v="22.3.3.886"',
    "upgrade-insecure-requests": "1",
    "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.141 Safari/537.36",
    "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
    "sec-fetch-site": "same-origin",
    "sec-fetch-mode": "navigate",
    "sec-fetch-user": "?1",
    "sec-fetch-dest": "document",
    "referer": "https://www.google.com/",
    "accept-language": "en,ru;q=0.9",
}

pagination_params = {
    "q": "lift installation",
    "source": "lnms",
    "tbm": "shop",
    "ei": "aW-HYv74MrCOseMP_8OumAY",
    "start": "",
    "sa": "X",
    "ved": "0ahUKEwikobSm8e33AhUjUGwGHQVfDr84PBDy0wMIpw0",
    "biw": "480",
    "bih": "665",
    "dpr": "1",
}

initial_params = {
    "q": "",
    "source": "lnms",
    "tbm": "shop",
    "sa": "X",
    "ved": "0ahUKEwikobSm8e33AhUjUGwGHQVfDr84PBDy0wMIpw0",
    "biw": "480",
    "bih": "665",
    "dpr": "1",
}

results = []


# Route the request through the ScraperAPI proxy.
def get_scraperapi_url(url):
    payload = {
        "api_key": SCRAPERAPI_KEY,
        "url": url,
        "country_code": "au",
        "keep_headers": "true",
    }
    proxy_url = "http://api.scraperapi.com/?" + urlencode(payload)
    return proxy_url


# httpx event hooks for request/response logging.
async def log_request(request):
    logger.debug(f"Request: {request.method} {request.url}")


async def log_response(response):
    request = response.request
    logger.debug(f"Response: {request.method} {request.url} - Status: {response.status_code}")


# Fetch one Google Shopping results page for a keyword (page_no 0 uses the initial params).
async def fetch_pages(keyword, page_no):
    initial_params["q"] = keyword
    if not page_no:
        params = initial_params
        url = base_url + urlencode(params)
    else:
        params = pagination_params
        params["start"] = str(page_no * 10)
        params["q"] = keyword
        url = base_url + urlencode(params)
    async with httpx.AsyncClient(
        event_hooks={"request": [log_request], "response": [log_response]}
    ) as client:
        response = await client.get(
            get_scraperapi_url(url), headers=headers, timeout=None
        )
        return response


# Follow each ad link and record the advertiser's domain.
async def parse_page(html):
    ad_urls = []
    content = BeautifulSoup(html, "lxml")
    for ad in content.find_all("a", {"class": "sh-np__click-target"}):
        try:
            async with httpx.AsyncClient() as client:
                r = await client.get(
                    "https://www.google.com" + ad["href"], headers=headers, timeout=None
                )
                url = str(r.url)
                ad_urls.append(urlparse(url).netloc)
                logger.debug(urlparse(url).netloc)
        except:
            pass
    for idx in range(len(ad_urls)):
        results.append({"Ad_Url": ad_urls[idx]})
    return results


# Fetch up to 4 result pages per keyword (with retries), parse them concurrently, and write the CSV.
async def run_scraper(file_path):
    tasks = []
    kw = await csv_reader(file_path)
    for k in kw:
        for page in range(0, 4):
            for _ in range(NUM_RETRIES):
                try:
                    response = await fetch_pages(k, page)
                    if response.status_code in [200, 404]:
                        break
                except httpx.ConnectError:
                    # Use None so the status check below cannot fail after repeated connection errors.
                    response = None
            if response is not None and response.status_code == 200:
                tasks.append(asyncio.create_task(parse_page(response.content)))
    ad_data = await asyncio.gather(*tasks)
    logger.info('Done!')
    await csv_writer(ad_data[0])
    logger.info('csv created.. Please refresh the page to download the csv.')
    return ad_data[0]


# Synchronous entry point for RQ: runs the async scraper to completion in its own event loop.
def get_google_ad_urls(file_path):
    asyncio.run(run_scraper(file_path))
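With that wrapper in place, the FastAPI endpoint enqueues the synchronous function instead of the coroutine. A minimal sketch of the changed enqueue call, using the names from the code above:

# In the FastAPI app: enqueue the sync wrapper instead of the async run_scraper.
from scraper import get_google_ad_urls

job = q.enqueue(get_google_ad_urls, temp_file, job_timeout=16600)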
https://stackoverflow.com/questions/72398949