import hashlib
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from superstream import Stream

from app.config.app_config import getAppConfig
from app.model.mysql_model import XWebCrawler
from app.schemas.spider_schema import ApschedulerJob, SpiderParams, TaskInfo, TypeEnum

logger = logging.getLogger(__name__)


# Create a single scheduler instance shared by the application
scheduler = AsyncIOScheduler()
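# The scheduler is only instantiated here; it must be started by the hosting
# application. A minimal sketch, assuming a FastAPI-style lifespan hook (the
# `lifespan` wiring below is illustrative, not part of this module):
#
#     from contextlib import asynccontextmanager
#
#     @asynccontextmanager
#     async def lifespan(app):
#         scheduler.start()
#         yield
#         scheduler.shutdown()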


@scheduler.scheduled_job("interval", minutes=getAppConfig().getSpider.get("interval"))
async def get_spider_config():
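    """Sync APScheduler jobs with the crawler configs stored in MySQL.

    Runs every ``interval`` minutes, taken from the ``getSpider`` section of
    the app config (see the decorator above).
    """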
    from app.service.spider_job_service import (
        add_guduo_job,
        delete_job,
        get_job_info,
    )

    logger.info("开始获取爬虫配置")
    web_crawlers: List[XWebCrawler] = await XWebCrawler.all()
    if not web_crawlers:
        logger.info("未获取到爬虫任务")
        return
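    # Group crawlers by status; below, status 1 is treated as enabled and 0 as disabled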
    status_group: Dict[int, List[XWebCrawler]] = Stream(web_crawlers).group_by(
        lambda x: x.status
    )
    active_jobs = status_group.get(1, [])
    if not active_jobs:
        logger.info("未获取到启用的爬虫任务")
        return
    # Fetch the jobs already registered with the scheduler
    jobs: List[ApschedulerJob] = get_job_info()
    # Index registered jobs by task id, then remove jobs whose crawler was disabled
    registered_jobs: Dict[int, ApschedulerJob] = Stream(jobs).to_dict(
        lambda x: x.taskId, lambda y: y
    )
    for job in status_group.get(0, []):
        if job.id in registered_jobs:
            delete_job(job.id)
            logger.info(f"Job deleted, task id: {job.id}")
    # Register or refresh the enabled tasks
    for job in active_jobs:
        # Build the spider parameters: yesterday through today, in yyyy-MM-dd format
        today_str, yesterday_str = get_dates()
        types = [TypeEnum.EPISODES, TypeEnum.MOVIE, TypeEnum.ANIME, TypeEnum.VARIETY]
        # Fingerprint the job definition so configuration changes can be detected
        md5 = calculate_md5(job.id, job.target_url, job.param_info, job.cron)
        param = SpiderParams(
            startDate=yesterday_str,
            endDate=today_str,
            target_type=types,
            url=job.target_url,
        )
        task_info = TaskInfo(
            taskStart=True,
            executeStrategy=job.cron,
            taskId=job.id,
            url=job.target_url,
            taskMd5=md5,
        )
        if job.id not in registered_jobs:
            add_guduo_job(param, task_info)
            logger.info(f"Job added, task id: {job.id}")
        elif registered_jobs[job.id].taskMd5 != md5:
            # The definition changed: re-register the job with the new parameters
            delete_job(job.id)
            add_guduo_job(param, task_info)
            logger.info(f"Job updated, task id: {job.id}")
    logger.info("Spider configuration complete")


def calculate_md5(*fields) -> str:
    """
    Compute the MD5 hex digest of the given fields.

    Fields are stringified and concatenated without a separator before hashing,
    so the digest is only suitable as a change-detection fingerprint.
    :param fields: values to hash
    :return: MD5 hex digest
    """
    md5 = hashlib.md5()
    for field in fields:
        md5.update(str(field).encode("utf-8"))
    return md5.hexdigest()
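# Example with hypothetical values: calculate_md5(1, "https://example.com", None, "0 2 * * *")
# equals hashlib.md5("1https://example.comNone0 2 * * *".encode("utf-8")).hexdigest()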


def get_dates() -> Tuple[str, str]:
    """
    Get today's and yesterday's dates in yyyy-MM-dd format.
    :return: (today_str, yesterday_str)
    """
    today = datetime.today()
    yesterday = today - timedelta(days=1)
    today_str = today.strftime("%Y-%m-%d")
    yesterday_str = yesterday.strftime("%Y-%m-%d")
    return today_str, yesterday_str
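# Example (assuming today is 2024-05-02): get_dates() -> ("2024-05-02", "2024-05-01")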


if __name__ == "__main__":
    new_dict = {}
    print(new_dict.get("a"))
    print(hash((1, "b", True)))
    new_dict.update({"a": 1})
    print("a" in new_dict)