前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >通过Django管理周期性任务-feapder爬虫

通过Django管理周期性任务-feapder爬虫

作者头像
IT不难
发布2023-11-17 13:43:07
2460
发布2023-11-17 13:43:07
举报
文章被收录于专栏:IT不难技术家园

前言

一开始感兴趣的信息比较少,直接用crontab启动就满足要求了。后台爬虫越来越多,有的爬虫早就失效了,也没发现。用了 feapder 作者的管理系统 feaplat 。系统功能很全面,但是随着功能的完善,价格也越来越贵。个人实在承担不起,只能花时间自己搞一个简易版的了。

功能实现

模型设计

代码语言:javascript
复制
# Create your models here.
class SpiderInfo(models.Model):
    '''
    爬虫项目信息类
    '''
    id = models.AutoField(primary_key=True)
    sname = models.CharField(max_length=64, verbose_name='项目名称')
    filepath   = models.CharField(max_length=64, verbose_name='项目路径')
    workpath   = models.CharField(max_length=64, verbose_name='工作路径')
    image    = models.CharField(max_length=64, verbose_name='镜像', default='feapder23')
    shm = models.CharField(max_length=32, default='64M', verbose_name='虚拟内存')
    addtime = models.DateTimeField(auto_now_add=True, verbose_name='添加时间')
    snote = models.CharField(max_length=255, verbose_name='说明')

    # admin显示订单的id
    def __str__(self):
        return self.sname

    class Meta:
        db_table = 'spider_info'
        verbose_name = '爬虫项目'
        verbose_name_plural = '爬虫项目'

# Create your models here.
class SpiderTask(models.Model):
    '''
    爬虫任务管理类
    '''
    id = models.AutoField(primary_key=True)
    status_choices = ((k, v) for k,v in STATUS_CHOICE.items())
    sname = models.CharField(max_length=64, unique=True,verbose_name='任务名')
    snote =  models.CharField(max_length=128, blank=True,verbose_name='说明')
    command = models.CharField(max_length=128, verbose_name='启动命令')
    status = models.SmallIntegerField(default='0', choices=status_choices, verbose_name='调度状态')
    crond = models.CharField(max_length=32, blank=True, verbose_name='计划任务')
    addtime = models.DateTimeField(auto_now_add=True, verbose_name='添加时间')
    runtime = models.DateTimeField(auto_now=True, blank=True, verbose_name='最后运行时间')
    total = models.IntegerField(default=0, verbose_name='总数')
    repeat = models.IntegerField(default=0, verbose_name='重复')
    valid = models.IntegerField(default=0, verbose_name='入库')
    logpath = models.CharField(max_length=128, blank=True, verbose_name='日志')
    sinfo = models.ForeignKey(SpiderInfo, on_delete=models.CASCADE, default='')

    # admin显示订单的id
    def __str__(self):
        return self.sname

    class Meta:
        db_table = 'spider_task'
        verbose_name = '爬虫任务'
        verbose_name_plural = '爬虫任务'

后台管理

代码语言:javascript
复制
# Register your models here.
class SpiderInfofAdmin(admin.ModelAdmin):
    #后台展示字段
    list_display = ['id', 'sname', 'filepath', 'workpath', 'image', 'addtime', 'snote']

    #搜索字段
    search_fields = ['sname']

class SpiderTaskAdmin(admin.ModelAdmin):
    #后台展示字段
    list_display = ['id', 'sname', 'snote', 'addtime', 'runtime_show', 'total', 'repeat', 'valid', 'status_colored', 'operate']

    #过滤字段
    list_filter =  ["status"]

    #搜索字段
    search_fields = ['sname']

    #只读字段
    readonly_fields =  ['id', 'addtime', 'runtime', 'total', 'repeat', 'valid', 'status']

    #自定义动作
    actions = ['schedule_switch']

异步任务

  1. 项目主settings.py添加内容
代码语言:javascript
复制
INSTALLED_APPS = [
    # 略
    'django_celery_beat',
    #略
]

# Celery配置
# BROKER和BACKEND配置,这里用了本地的redis,其中1和2表示分别用redis的第一个和第二个db
CELERY_BROKER_URL = 'redis://172.17.0.10:6379/1'
CELERY_RESULT_BACKEND = 'redis://172.17.0.10:6379/2'

# CELERY 时间
CELERY_TIMEZONE = TIME_ZONE
DJANGO_CELERY_BEAT_TZ_AWARE = False

#指定任务接收的内容序列化类型
CELERY_ACCEPT_CONTENT = ['application/json']

#任务和任务结果序列化方式
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

#超过时间
CELERY_TASK_RESULT_EXPIRES = 12 * 30

#是否压缩
CELERY_MESSAGE_COMPRESSION = 'zlib'

#并发数默
CELERYD_CONCURRENCY = 2

#celery worker 每次去redis取任务的数量认已CPU数量定
CELERYD_PREFETCH_MULTIPLIER = 2

#每个worker最多执行3个任务就摧毁,避免内存泄漏
CELERYD_MAX_TASKS_PER_CHILD = 3

#可以防止死锁
CELERYD_FORCE_EXECV = True

#celery 关闭UTC时区
CELERY_ENABLE_UTC = False

#celery 并发数设置,最多可以有20个任务同时运行
CELERYD_CONCURRENCY = 20
CELERYD_MAX_TASKS_PER_CHILD = 4

#celery开启数据库调度器,数据库修改后即时生效
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

#解决告警
DEFAULT_AUTO_FIELD = 'django.db.models.AutoField'

2.同目录下新增celery.py

代码语言:javascript
复制
import os
from celery import Celery,platforms
from django.conf import settings

# 设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE','taskmon.settings')

# 实例化
app = Celery('taskmon')

# namespace='CELERY'作用是允许你在Django配置文件中对Celery进行配置
# 但所有Celery配置项必须以CELERY开头,防止冲突
app.config_from_object('django.conf:settings', namespace='CELERY')


# 自动从Django的已注册app中发现任务
app.autodiscover_tasks()

#允许root 用户运行celery
platforms.C_FORCE_ROOT = True

# 一个测试任务
@app.task(bind=True)
def debug_task(self):
  print('Request: {0!r}'.format(self.request))

3.修改__init__.py,加载celery配置

代码语言:javascript
复制
#注册celery
from .celery import app as celery_app
__all__ = ('celery_app',)

4.项目目录下tasks.py

代码语言:javascript
复制
#操作docker
from celery import shared_task
from .utils import process_start

@shared_task
def sync_start_process(sname):
    """
    异步执行任务
    """
    process_start(sname)

5.celery启动

代码语言:javascript
复制
#任务调度
celery multi start worker  -A taskmon  -l info  --logfile=/logs/celery_worker.log
celery -A taskmon beat  -l info  --logfile=/logs/celery_beat.log

添加周期任务

代码语言:javascript
复制
def add_celery_task(sid):
    """
    新增计划任务
    sid : 爬虫任务ID
    """
    stask = SpiderTask.objects.get(id=sid)
    cname = str(stask.id) + '-' + '周期任务'

    c_arry = stask.crond.strip().split()
    print(c_arry)

    #添加计划任务
    with transaction.atomic():
        save_id = transaction.savepoint()
        try:
            _c, created = CrontabSchedule.objects.get_or_create(
                minute=str(c_arry[0]),
                hour=str(c_arry[1]),
                day_of_week=str(c_arry[2]),
                day_of_month=str(c_arry[3]),
                month_of_year=str(c_arry[4])
            )

            _p = PeriodicTask.objects.create(
                name= cname,
                task='spider.tasks.sync_start_process',
                args='["{}"]'.format(stask.sname),
                enabled=True,
                crontab=_c
            )

            print('{}计划任务添加成功'.format(cname))
            return True
        except Exception as e:
            transaction.savepoint_rollback(save_id)
            print('{}添加计划任务失败,错误原因:'.format(cname) + str(e))
            return False

删除周期任务

代码语言:javascript
复制
def remove_celery_task(sid):
    """
    删除计划任务
    sid : 爬虫任务ID
    """
    cname = str(sid) + '-' + '周期任务'
    #添加计划任务
    with transaction.atomic():
        save_id = transaction.savepoint()
        try:
            _p = PeriodicTask.objects.get(name=cname)
            if _p:
                _p.delete()
                print('{}删除计划任务成功'.format(cname))
            return True
        except Exception as e:
            transaction.savepoint_rollback(save_id)
            print('{}删除计划任务失败,错误原因:'.format(cname) + str(e))
            return False

任务启动函数

代码语言:javascript
复制
def process_start(sname):
    """
    执行任务并处理返回结果
    sname: 任务名
    cinfo: 启动容器所需的信息
    """
    con_name = 'spider_{}_1'.format(sname)
    containers = get_containers({"name":con_name})

    if containers:
        print('有相同任务运行中...|{}|{}'.format(con_name, datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")))
        return False

    #查询库
    spider_task = SpiderTask.objects.get(sname=sname)

    #构建docker启动信息
    cinfo = {
            "name": con_name,
            "command": spider_task.command,
            #宿主机目录
            "volumes": ['/opt/project/taskmon/myapp/spider/{}:{}'.format(spider_task.sinfo.filepath, spider_task.sinfo.workpath),],
            "shm_size": spider_task.sinfo.shm,
            "image": spider_task.sinfo.image,
            "working_dir": spider_task.sinfo.workpath,
            "remove": False,
        }

    #启动容器
    result = run_container(cinfo)

    if result:
        #日志文件
        log_path='/logs/{}_{}.log'.format(con_name, datetime.datetime.now().strftime("%H-%M-%S-%Y-%m-%d"))

        #保存日志
        with open(log_path, 'wb') as fw:
            fw.write(result)

        #采集结果
        d_nums = process_result(result)

        #更新
        spider_task.total = d_nums[0]
        spider_task.repeat = d_nums[1]
        spider_task.valid = d_nums[2]
        spider_task.logpath = log_path
        spider_task.save()

    print('任务执行...|{}|{}'.format(con_name, datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")))

容器启动函数

代码语言:javascript
复制
def run_container(ddict):
    """
    运行容器
    """
    #print(ddict)
    container = client.containers.run(
        image=ddict['image'],
        name=ddict['name'],
        shm_size=ddict['shm_size'],
        volumes=ddict['volumes'],
        working_dir=ddict['working_dir'],
        remove=ddict['remove'],
        detach=True,
        command=ddict['command']
    )
    container.wait()
    result = container.logs()
    container.remove()
    return result

结果处理函数

代码语言:javascript
复制
def process_result(result):
    """
    处理返回结果
    """
    a = 0
    b = 0
    c = 0
    lines = str(result, encoding = "utf-8").split('\n')
    for line in lines:
        if '待入库数据' in line:
            tmp_s = line.split('|')[3]
            nums = tmp_s.split(' ')
            a += int(nums[2])
            b += int(nums[5])
            c += int(nums[7])
    return (a, b, c)

相关文章

Django后台展示(一) Django后台展示(二)

FAQ

自动添加周期任务后,启动报错

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023年02月21日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 功能实现
    • 模型设计
      • 后台管理
        • 异步任务
          • 添加周期任务
            • 删除周期任务
              • 任务启动函数
                • 容器启动函数
                  • 结果处理函数
                  • 相关文章
                  • FAQ
                    • 自动添加周期任务后,启动报错
                    相关产品与服务
                    容器服务
                    腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档