# spider_job_service.py (3.42 KB)
import asyncio
from datetime import datetime
import logging
from typing import List, Any
from app.model.mysql_model import SpiderModel
from superstream import Stream
from app.job.job import scheduler
from app.schemas.spider_schema import (
    ApschedulerJob,
    TaskInfo,
    SpiderParams,
    GuoDuoSpiderResult,
)
from app.spider.guduo_spider import startBrowser

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# In-process registry of scheduled jobs: add_job appends an entry for every job it
# schedules and delete_job removes entries by task id.
jobs: List[ApschedulerJob] = []


def add_job(params: List[Any], taskInfo: TaskInfo, task: Any):
    """
    Register ``task`` with the scheduler using the cron expression in ``taskInfo``.

    ``taskInfo.executeStrategy`` is read as a whitespace-separated expression of
    at least six fields. Fields 1-5 (0-based) are used as minute, hour, day of
    month, month and day of week; field 0 (presumably seconds) is not passed to
    the scheduler. A field equal to ``"?"`` is passed as ``None`` so that no
    explicit value is given for it.

    The scheduled job is also recorded in the module-level ``jobs`` list so it
    can be looked up by task id later.

    :param params: positional arguments handed to ``task`` on every run
    :param taskInfo: task metadata; ``executeStrategy`` and ``taskId`` are read
    :param task: callable to schedule
    :raises IndexError: if the expression has fewer than six fields
    :return: None
    """
    # split() without an argument tolerates repeated spaces, which would otherwise
    # produce empty fields.
    cron_fields = taskInfo.executeStrategy.split()
    minute, hour, day, month, day_of_week = (
        None if cron_fields[position] == "?" else cron_fields[position]
        for position in range(1, 6)
    )
    # The last cron field is a day-of-week constraint. It must go to `day_of_week`;
    # the cron trigger's `week` argument means the ISO week number of the year.
    # NOTE(review): APScheduler numbers weekdays 0-6 starting on Monday, while
    # Quartz-style expressions use 1-7 starting on Sunday -- confirm which
    # convention the stored expressions follow when numeric weekdays are used.
    job = scheduler.add_job(
        task,
        "cron",
        minute=minute,
        hour=hour,
        day=day,
        month=month,
        day_of_week=day_of_week,
        args=params,
    )
    # `jobId` holds the job object returned by the scheduler, not a plain identifier.
    jobs.append(ApschedulerJob(jobId=job, taskId=taskInfo.taskId))
    logger.info(
        f"添加任务成功,任务id:{taskInfo.taskId},任务执行策略:{taskInfo.executeStrategy}"
    )


def add_guduo_job(taskParam: SpiderParams, taskInfo: TaskInfo):
    """
    Add a Guduo spider task to the scheduler.

    :param taskParam: Guduo spider parameters, passed to the job as its only argument
    :param taskInfo: task information forwarded to ``add_job``
    :return: None
    """
    add_job(params=[taskParam], taskInfo=taskInfo, task=scrawl_and_save)


def delete_job(taskId: int):
    """
    Remove every registered job that belongs to ``taskId``.

    Each matching entry is removed from the scheduler (via the stored job
    object's ``remove()``) and then from the module-level ``jobs`` list. The
    success message is logged whether or not any entry matched.

    :param taskId: identifier of the task whose jobs should be removed
    :return: None
    """
    # Iterate over a snapshot: removing from `jobs` while iterating it directly
    # makes the loop skip the element that follows each removed entry.
    for job in list(jobs):
        if job.taskId == taskId:
            job.jobId.remove()
            jobs.remove(job)
    logger.info(f"删除任务成功,任务id:{taskId}")


def get_job_info(taskId: int):
    """
    Build a one-line description of the registered job for ``taskId``.

    :param taskId: task identifier to look up in the module-level ``jobs`` list
    :return: a string made of a fixed prefix followed by whatever
        ``find_first()`` yields for the first matching entry
    """

    def belongs_to_task(entry):
        return entry.taskId == taskId

    # NOTE(review): what find_first() yields when nothing matches depends on the
    # Stream implementation and is not visible here.
    first_match = Stream(jobs).filter(belongs_to_task).find_first()
    return f"job 信息->{first_match}"


async def scrawl_and_save(taskParam: SpiderParams):
    """
    Run the spider for ``taskParam`` and persist every result it returns.

    If the spider raises, the retry statistics exposed on ``startBrowser`` and
    the failure reason are logged and nothing is saved. Otherwise all results
    are stored concurrently through ``save_or_update``.

    :param taskParam: spider parameters passed straight to ``startBrowser``
    :return: None
    """
    try:
        # Run the spider; any failure is logged below instead of propagating.
        results = await startBrowser(taskParam)
    except Exception as exc:
        # presumably `statistics` is attached by a retry decorator on startBrowser
        logger.info(f"爬虫重试情况:{startBrowser.statistics}")
        logger.error(f"爬虫任务执行失败,失败原因:{exc}")
    else:
        pending_saves = [save_or_update(item) for item in results]
        await asyncio.gather(*pending_saves)
        logger.info(f"爬虫任务执行完成,爬取到数据{len(results)}条 保存到数据库完成")


async def save_or_update(result: GuoDuoSpiderResult):
    """
    Insert or update the ``SpiderModel`` row that corresponds to one spider result.

    A row is looked up by ``time``, ``targetType``, ``platform`` and
    ``sourceName``. When one exists, every attribute in ``result.__dict__`` is
    copied onto it, ``updateTime`` is set to the current time and the row is
    saved. Otherwise a new row is created from the result's fields.

    :param result: a single spider result
    :return: None
    """
    existing = await SpiderModel.get_or_none(
        time=result.time,
        targetType=result.targetType,
        platform=result.platform,
        sourceName=result.sourceName,
    )
    now = datetime.now()
    if not existing:
        # create() already persists the new row, so no extra save() is needed.
        await SpiderModel.create(
            time=result.time,
            targetType=result.targetType,
            platform=result.platform,
            sourceName=result.sourceName,
            score=result.score,
            createTime=result.createTime,
            updateTime=now,
        )
        return
    # NOTE(review): this copies every attribute of `result`, which presumably
    # includes createTime -- confirm that overwriting it on update is intended.
    for key, value in result.__dict__.items():
        setattr(existing, key, value)
    # Stamp updateTime after the copy so a same-named attribute on `result`
    # cannot overwrite the fresh timestamp.
    existing.updateTime = now
    await existing.save()