spider_router.py 1.57 KB
# router/users.py
import logging
from typing import Dict, Any
from fastapi import APIRouter, Path
from app.schemas.spider_schema import SpiderParams, TaskInfo
from app.service.spider_job_service import add_job, get_job_info, add_guduo_job
from app.spider import guduo_spider

logger = logging.getLogger(__name__)
spiderRouter = APIRouter(prefix="/spider", tags=["spider"])


@spiderRouter.post("/start/", summary="启动一个爬虫在后台运行")
async def read_users(param: SpiderParams):
    logger.info(f"开始执行怕从参数是:{param}")
    res = await guduo_spider.startBrowser(param)
    logger.info("爬取数据完成")
    return res


@spiderRouter.post("/add_guduo_job", summary="添加一个骨朵爬虫定时任务")
async def add_task_to_job(param: Dict[str, Any]):
    jobParam = SpiderParams(**(param.get('taskParam')))
    taskInfo = TaskInfo(**(param.get('taskInfo')))
    add_guduo_job(jobParam, taskInfo)
    return '添加骨朵爬虫任务成功'


@spiderRouter.get("/add_job/{taskId}", summary="添加一个定时任务")
async def add_test_job(taskId: int = Path(..., description="要查询的任务的唯一标识符")):
    async def test(name: str):
        logger.info(f"测试定时任务执行->{name}")

    taskInfo = TaskInfo(taskId=taskId, taskStart=True,
                        executeStrategy='0 0/1 * * * ?',
                        url='www.baidu.com', taskMd5='test')
    add_job(['test'], taskInfo, test)


@spiderRouter.get("/get_job/{taskId}", summary="获取一个定时任务")
async def get_job(taskId: int = Path(...)):
    return get_job_info(taskId)