# spider_job_service.py
import asyncio
import logging
from typing import List, Any
from app.model.mysql_model import SpiderModel
from superstream import Stream

from app.job.job import scheduler
from app.schemas.spider_schema import ApschedulerJob, TaskInfo, SpiderParams, GuoDuoSpiderResult
from app.spider.guduo_spider import startBrowser

logger = logging.getLogger(__name__)
jobs: List[ApschedulerJob] = []


def add_job(params: List[Any], taskInfo: TaskInfo, task: Any):
    # executeStrategy is a Quartz-style cron expression:
    # "second minute hour day month weekday". APScheduler has no "?" wildcard,
    # so map "?" to None, which means "any value".
    fields = taskInfo.executeStrategy.split(" ")
    minute, hour, day, month, day_of_week = (
        None if field == "?" else field for field in fields[1:6]
    )
    # Caveat: Quartz numbers weekdays 1-7 starting Sunday while APScheduler
    # uses 0-6 starting Monday, so numeric weekday fields may need remapping.
    job = scheduler.add_job(task, "cron",
                            minute=minute, hour=hour, day=day, month=month,
                            day_of_week=day_of_week,
                            args=params)
    # scheduler.add_job returns a Job handle; keep it so delete_job can
    # unschedule the task later. (The original passed the weekday field as
    # week=, which in APScheduler means the ISO week of the year.)
    jobs.append(ApschedulerJob(jobId=job, taskId=taskInfo.taskId))
    logger.info(
        f"Job added successfully, task id: {taskInfo.taskId}, "
        f"execution strategy: {taskInfo.executeStrategy}")


def add_guduo_job(taskParam: SpiderParams, taskInfo: TaskInfo):
    """
    Add a Guduo spider job to the scheduler.
    :param taskParam: Guduo spider parameters
    :param taskInfo: task information
    :return:
    """
    add_job([taskParam], taskInfo, scrawl_and_save)
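# Hedged usage sketch: the SpiderParams constructor arguments are unknown here
# and left elided; this module only relies on the taskId/executeStrategy fields
# of TaskInfo (see app.schemas.spider_schema for the real schemas):
#
#     task_info = TaskInfo(taskId=1, executeStrategy="0 30 2 * * ?")
#     add_guduo_job(SpiderParams(...), task_info)  # crawl Guduo daily at 02:30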


def delete_job(taskId: int):
    # Iterate over a copy: removing items from the list being iterated
    # would skip the element that follows each match.
    for job in list(jobs):
        if job.taskId == taskId:
            job.jobId.remove()  # unschedule the APScheduler job
            jobs.remove(job)
    logger.info(f"Job deleted successfully, task id: {taskId}")


def get_job_info(taskId: int):
    job = (Stream(jobs)
           .filter(lambda x: x.taskId == taskId)
           .find_first())
    return f'job info -> {job}'


async def scrawl_and_save(taskParam: SpiderParams):
    # Run the spider and collect its results.
    results = await startBrowser(taskParam)
    # Persist all results concurrently.
    asyncTasks = (save_or_update(item) for item in results)
    await asyncio.gather(*asyncTasks)
    logger.info(f'Spider task finished: crawled {len(results)} records and saved them to the database')
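# Note: asyncio.gather above starts one save per result at once. If a crawl can
# return very large result sets, a semaphore can cap in-flight DB writes; a
# sketch (the limit of 10 is an assumption, not a tuned value):
#
#     sem = asyncio.Semaphore(10)
#
#     async def bounded_save(item):
#         async with sem:
#             await save_or_update(item)
#
#     await asyncio.gather(*(bounded_save(item) for item in results))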


async def save_or_update(result: GuoDuoSpiderResult):
    # Treat (time, targetType, platform, sourceName) as the natural key:
    # update the matching row if one exists, otherwise insert a new one.
    obj = await SpiderModel.get_or_none(
        time=result.time,
        targetType=result.targetType,
        platform=result.platform,
        sourceName=result.sourceName
    )
    if obj:
        # Copy every field from the crawl result onto the existing row.
        for key, value in result.__dict__.items():
            setattr(obj, key, value)
        await obj.save()
    else:
        # create() already persists the row, so no extra save() call is needed.
        await SpiderModel.create(
            time=result.time,
            targetType=result.targetType,
            platform=result.platform,
            sourceName=result.sourceName,
            score=result.score,
            createTime=result.createTime
        )
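# Note: get_or_none/create/save match Tortoise ORM's model API. If SpiderModel
# is a Tortoise model, the lookup-then-write above can be collapsed into one
# call; a sketch, assuming Tortoise's update_or_create:
#
#     await SpiderModel.update_or_create(
#         defaults={"score": result.score, "createTime": result.createTime},
#         time=result.time,
#         targetType=result.targetType,
#         platform=result.platform,
#         sourceName=result.sourceName,
#     )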