# spider_router.py
# router/users.py
import logging
from typing import Dict, Any
from fastapi import APIRouter, Path
from app.schemas.spider_schema import SpiderParams, TaskInfo
from app.service.spider_job_service import add_job, get_job_info, add_guduo_job
from app.spider import guduo_spider
logger = logging.getLogger(__name__)
spiderRouter = APIRouter(prefix="/spider", tags=["spider"])
@spiderRouter.post("/start/", summary="启动一个爬虫在后台运行")
async def read_users(param: SpiderParams):
    """Run the Guduo spider once with the given parameters.

    Args:
        param: Spider configuration parsed from the request body.

    Returns:
        Whatever ``guduo_spider.startBrowser`` produces for this crawl run.
    """
    # Fixed typo in the original log message ("怕从" -> "爬虫") and switched
    # to lazy %-style args so formatting is skipped when INFO is disabled.
    logger.info("开始执行爬虫,参数是:%s", param)
    res = await guduo_spider.startBrowser(param)
    logger.info("爬取数据完成")
    return res
@spiderRouter.post("/add_guduo_job", summary="添加一个骨朵爬虫定时任务")
async def add_task_to_job(param: Dict[str, Any]):
    """Register a scheduled Guduo spider job.

    The request body must contain two keys:
      * ``taskParam`` -- dict used to construct :class:`SpiderParams`
      * ``taskInfo``  -- dict used to construct :class:`TaskInfo`

    Returns:
        A success message on completion.

    Raises:
        KeyError: if either required key is absent from the body.
    """
    # Direct subscripting fails fast with a clear KeyError; the previous
    # ``param.get(...)`` returned None on a missing key, which surfaced as an
    # opaque ``TypeError`` from ``SpiderParams(**None)``.
    job_param = SpiderParams(**param['taskParam'])
    task_info = TaskInfo(**param['taskInfo'])
    add_guduo_job(job_param, task_info)
    return '添加骨朵爬虫任务成功'
@spiderRouter.get("/add_job/{taskId}", summary="添加一个定时任务")
async def add_test_job(taskId: int = Path(..., description="要查询的任务的唯一标识符")):
    """Register a demo scheduled job that logs a message on every trigger.

    Args:
        taskId: Unique identifier for the new scheduled task.

    Returns:
        A confirmation message. (The original returned None, which rendered
        as ``null`` — returning a message keeps this endpoint consistent with
        the other mutation endpoints in this router.)
    """
    async def _demo_job(name: str):
        # Callback executed by the scheduler each time the job fires.
        logger.info("测试定时任务执行->%s", name)

    task_info = TaskInfo(
        taskId=taskId,
        taskStart=True,
        # Quartz-style cron: fire once every minute — TODO confirm the
        # scheduler backend actually parses this 6-field format.
        executeStrategy='0 0/1 * * * ?',
        url='www.baidu.com',
    )
    add_job(['test'], task_info, _demo_job)
    return '添加定时任务成功'
@spiderRouter.get("/get_job/{taskId}", summary="获取一个定时任务")
async def get_job(taskId: int = Path(...)):
    """Look up and return the stored info for the scheduled job *taskId*."""
    job_info = get_job_info(taskId)
    return job_info