spider_job_service.py
import asyncio
import logging
from datetime import datetime
from typing import Any, List, Optional

from app.job.job import scheduler
from app.model.mysql_model import SpiderModel
from app.schemas.safe_contrainer import SafeDict
from app.schemas.spider_schema import (
    ApschedulerJob,
    TaskInfo,
    SpiderParams,
    GuoDuoSpiderResult,
)
from app.spider.http_spider import get_score_data

logger = logging.getLogger(__name__)

# Registry of scheduled jobs, keyed by task id.
jobs: SafeDict[int, ApschedulerJob] = SafeDict()

def add_job(params: List[Any], taskInfo: TaskInfo, task: Any):
    # taskInfo.executeStrategy is a Quartz-style cron expression:
    # "second minute hour day month weekday", where "?" means "not set".
    cron = taskInfo.executeStrategy
    fields = cron.split(" ")
    minute = fields[1]
    minute = None if minute == "?" else minute
    hour = fields[2]
    hour = None if hour == "?" else hour
    day = fields[3]
    day = None if day == "?" else day
    month = fields[4]
    month = None if month == "?" else month
    weekday = fields[5]
    weekday = None if weekday == "?" else weekday
    job = scheduler.add_job(
        task,
        "cron",
        minute=minute,
        hour=hour,
        day=day,
        month=month,
        # The sixth cron field is the day of week, so it maps to APScheduler's
        # day_of_week parameter (week would be the ISO week number).
        day_of_week=weekday,
        args=params,
    )
    jobs.put(
        taskInfo.taskId,
        ApschedulerJob(jobId=job, taskId=taskInfo.taskId, taskMd5=taskInfo.taskMd5),
    )
    logger.info(
        f"Job added, task id: {taskInfo.taskId}, execute strategy: {taskInfo.executeStrategy}"
    )
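
# Example of the cron mapping above (illustrative only): an executeStrategy of
# "0 30 2 * * ?" yields minute="30", hour="2", day="*", month="*", day_of_week=None,
# i.e. the scheduled task fires every day at 02:30. The leading seconds field is
# not used by this parser.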

def add_guduo_job(taskParam: SpiderParams, taskInfo: TaskInfo):
    """
    Add a Guduo spider task to the scheduler.
    :param taskParam: Guduo spider parameters
    :param taskInfo: task information
    :return:
    """
    add_job([taskParam], taskInfo, scrawl_and_save)

def delete_job(taskId: int):
    job = jobs.get(taskId)
    if job:
        job.jobId.remove()  # also cancel the scheduled APScheduler job handle
    jobs.remove(taskId)
    logger.info(f"Job deleted, task id: {taskId}")

def get_job_info(taskId: Optional[int] = None) -> List[ApschedulerJob]:
    if taskId is None:
        return list(jobs.values())
    return [jobs.get(taskId)] if jobs.get(taskId) else []

async def scrawl_and_save(taskParam: SpiderParams):
    try:
        # Run the spider and collect the results.
        logger.info("Start fetching site data")
        results = await get_score_data(taskParam)
    except Exception as e:
        logger.error(f"Spider task failed: {e}")
        return
    logger.info("Start saving data")
    # Persist every result concurrently.
    asyncTasks = (save_or_update(item) for item in results)
    await asyncio.gather(*asyncTasks)
    logger.info(f"Spider task finished: {len(results)} records crawled and saved to the database")

async def save_or_update(result: GuoDuoSpiderResult):
    time = result.time
    targetType = result.targetType
    platform = result.platform
    sourceName = result.sourceName
    score = result.score
    createTime = result.createTime
    # Look up an existing SpiderModel row by (time, targetType, platform, sourceName);
    # update it if found, otherwise insert a new row.
    obj = await SpiderModel.get_or_none(
        time=time, targetType=targetType, platform=platform, sourceName=sourceName
    )
    if obj:
        obj.updateTime = datetime.now()
        for key, value in result.__dict__.items():
            setattr(obj, key, value)
        await obj.save()
    else:
        # create() already persists the new row, so no extra save() is needed.
        await SpiderModel.create(
            time=time,
            targetType=targetType,
            platform=platform,
            sourceName=sourceName,
            score=score,
            createTime=createTime,
            updateTime=datetime.now(),
        )
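
# Illustrative usage sketch (the SpiderParams/TaskInfo constructor arguments shown
# here are assumptions beyond the fields actually referenced above, not the real
# schemas):
#
#     task_info = TaskInfo(taskId=1, taskMd5="abc", executeStrategy="0 30 2 * * ?")
#     add_guduo_job(SpiderParams(...), task_info)  # schedule the Guduo spider
#     get_job_info(1)                              # -> [ApschedulerJob(...)]
#     delete_job(1)                                # unschedule and drop the task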