
rq.timeouts.JobTimeoutException: Task exceeded maximum timeout value (16600 seconds)

Stack Overflow user
Asked on 2022-05-26 23:16:24
1 answer · 312 views · 0 followers · 0 votes

I'm trying to run a simple long-running task through a Redis queue, but I get a timeout error every time, even though I increased the timeout value in job = q.enqueue(run_scraper, temp_file, job_timeout=16600). Whatever value I set, it still gives me the timeout error.

Traceback:

01:17:18 Traceback (most recent call last):
  File "/home/zerox/fp-google-search/venv/lib/python3.9/site-packages/rq/worker.py", line 1061, in perform_job
    rv = job.perform()
  File "/home/zerox/fp-google-search/venv/lib/python3.9/site-packages/rq/job.py", line 821, in perform
    self._result = self._execute()
  File "/home/zerox/fp-google-search/venv/lib/python3.9/site-packages/rq/job.py", line 847, in _execute
    coro_result = loop.run_until_complete(result)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 634, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.9/asyncio/base_events.py", line 601, in run_forever
    self._run_once()
  File "/usr/lib/python3.9/asyncio/base_events.py", line 1869, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/lib/python3.9/selectors.py", line 469, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
  File "/home/zerox/fp-google-search/venv/lib/python3.9/site-packages/rq/timeouts.py", line 63, in handle_death_penalty
    raise self._exception('Task exceeded maximum timeout value '
rq.timeouts.JobTimeoutException: Task exceeded maximum timeout value (16600 seconds)

FastAPI code:

import fastapi as _fastapi
from fastapi.responses import HTMLResponse, FileResponse, Response
from starlette.requests import Request
from starlette.templating import Jinja2Templates
import shutil
import os
import json

from rq import Queue
from rq.job import Job

from redis import Redis

from scraper import run_scraper
from utils import clean_file, csv_writer

app = _fastapi.FastAPI()

r = Redis(
    host="localhost",
    port=6379,
    db=0,
)
q = Queue(connection=r)

templates = Jinja2Templates("templates")


@app.get("/")
def index(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})


@app.post("/api/v1/scraped_csv")
async def extract_ads(csv_file: _fastapi.UploadFile = _fastapi.File(...)):
    temp_file = _save_file_to_disk(csv_file, path="temp", save_as="temp")
    job = q.enqueue(run_scraper, temp_file, job_timeout=16600)

    return {"message": "Scraping has been started", "job_id": job.id}


@app.get("/progress/{job_id}")
def progress(job_id):
    job = Job.fetch(job_id, connection=r)
    if job.is_finished:
        csv_path = os.path.abspath(clean_file)
        return FileResponse(path=csv_path, media_type="text/csv", filename=clean_file)
    return {"message": "Scraper is running."}


def _save_file_to_disk(uploaded_file, path=".", save_as="default"):
    extension = os.path.splitext(uploaded_file.filename)[-1]
    temp_file = os.path.join(path, save_as + extension)
    with open(temp_file, "wb") as buffer:
        shutil.copyfileobj(uploaded_file.file, buffer)
    return temp_file

I'm new to integrating Redis queues with scraping, so any guidance on resolving/handling the timeout error would be greatly appreciated.


1 Answer

Stack Overflow user

Answered on 2022-06-10 15:44:41

It turns out that python-rq does not support async functions directly, so I had to wrap the async function in a synchronous one to get it up and running.
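In isolation, the pattern is just a plain function that drives the coroutine to completion itself (a minimal sketch with illustrative names, do_work / do_work_sync are not from the original post):

import asyncio

async def do_work(arg):
    # placeholder for the real coroutine
    await asyncio.sleep(1)
    return arg

def do_work_sync(arg):
    # plain function that an RQ worker can execute; it runs the event loop itself
    return asyncio.run(do_work(arg))

The full scraper module with this change applied: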

import httpx
import asyncio
from bs4 import BeautifulSoup
from decouple import config
from urllib.parse import urlencode
from urllib.parse import urlparse
import os
import logging

from utils import csv_reader, csv_writer

SCRAPERAPI_KEY = config("API_KEY")
NUM_RETRIES = 3

logging.basicConfig(filename="scraper.log",
                    format='%(asctime)s %(message)s',
                    filemode='w')

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)


base_url = "https://www.google.com/search?"

headers = {
    "authority": "www.google.com",
    "sec-ch-dpr": "1",
    "sec-ch-viewport-width": "1366",
    "sec-ch-ua": '" Not A;Brand";v="99", "Chromium";v="98", "Yandex";v="22"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-full-version": '"22.3.3.886"',
    "sec-ch-ua-arch": '"x86"',
    "sec-ch-ua-platform": '"Linux"',
    "sec-ch-ua-platform-version": '"5.4.0"',
    "sec-ch-ua-model": '""',
    "sec-ch-ua-bitness": '"64"',
    "sec-ch-ua-full-version-list": '" Not A;Brand";v="99.0.0.0", "Chromium";v="98.0.4758.886", "Yandex";v="22.3.3.886"',
    "upgrade-insecure-requests": "1",
    "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.141 Safari/537.36",
    "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
    "sec-fetch-site": "same-origin",
    "sec-fetch-mode": "navigate",
    "sec-fetch-user": "?1",
    "sec-fetch-dest": "document",
    "referer": "https://www.google.com/",
    "accept-language": "en,ru;q=0.9",
}

pagination_params = {
    "q": "lift installation",
    "source": "lnms",
    "tbm": "shop",
    "ei": "aW-HYv74MrCOseMP_8OumAY",
    "start": "",
    "sa": "X",
    "ved": "0ahUKEwikobSm8e33AhUjUGwGHQVfDr84PBDy0wMIpw0",
    "biw": "480",
    "bih": "665",
    "dpr": "1",
}

initial_params = {
    "q": "",
    "source": "lnms",
    "tbm": "shop",
    "sa": "X",
    "ved": "0ahUKEwikobSm8e33AhUjUGwGHQVfDr84PBDy0wMIpw0",
    "biw": "480",
    "bih": "665",
    "dpr": "1",
}

results = []


def get_scraperapi_url(url):
    payload = {
        "api_key": SCRAPERAPI_KEY,
        "url": url,
        "country_code": "au",
        "keep_headers": "true",
    }
    proxy_url = "http://api.scraperapi.com/?" + urlencode(payload)
    return proxy_url


async def log_request(request):
    logger.debug(f"Request: {request.method} {request.url}")


async def log_response(response):
    request = response.request
    logger.debug(f"Response: {request.method} {request.url} - Status: {response.status_code}")


async def fetch_pages(keyword, page_no):
    initial_params["q"] = keyword
    if not page_no:
        params = initial_params
        url = base_url + urlencode(params)
    else:
        params = pagination_params

        params["start"] = str(page_no * 10)

        params["q"] = keyword
        url = base_url + urlencode(params)

    async with httpx.AsyncClient(
        event_hooks={"request": [log_request], "response": [log_response]}
    ) as client:
        response = await client.get(
            get_scraperapi_url(url), headers=headers, timeout=None
        )

        return response


async def parse_page(html):

    ad_urls = []
    content = BeautifulSoup(html, "lxml")

    for ad in content.find_all("a", {"class": "sh-np__click-target"}):
        try:
            async with httpx.AsyncClient() as client:
                r = await client.get(
                    "https://www.google.com" + ad["href"], headers=headers, timeout=None
                )
                url = str(r.url)
                ad_urls.append(urlparse(url).netloc)
                logger.debug(urlparse(url).netloc)
        except Exception:
            # skip ads whose final URL could not be resolved
            pass

    for idx in range(len(ad_urls)):

        results.append({"Ad_Url": ad_urls[idx]})

    return results


async def run_scraper(file_path):
    tasks = []
    kw = await csv_reader(file_path)
    for k in kw:
        for page in range(0, 4):
            for _ in range(NUM_RETRIES):
                try:
                    response = await fetch_pages(k, page)
                    if response.status_code in [200, 404]:
                        break
                except httpx.ConnectError:
                    # mark this attempt as failed so the retry loop continues
                    response = None
            if response is not None and response.status_code == 200:
                tasks.append(asyncio.create_task(parse_page(response.content)))

    ad_data = await asyncio.gather(*tasks)

    logger.info('Done!')
    await csv_writer(ad_data[0])
    logger.info('csv created.. Please refresh the page to download the csv.')
    

    return ad_data[0]

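# Synchronous entry point for RQ: runs the async scraper to completion with asyncio.run().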
def get_google_ad_urls(file_path):
    asyncio.run(run_scraper(file_path))

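The answer does not show the FastAPI side, but presumably the endpoint then enqueues the synchronous wrapper instead of the coroutine, along these lines (reusing q, temp_file and job_timeout from the question's code):

from scraper import get_google_ad_urls

job = q.enqueue(get_google_ad_urls, temp_file, job_timeout=16600)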
Source

0 votes
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/72398949
