# job.py
import hashlib
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from superstream import Stream

from app.config.app_config import getAppConfig
from app.model.mysql_model import XWebCrawler
from app.schemas.spider_schema import ApschedulerJob, SpiderParams, TaskInfo, TypeEnum

logger = logging.getLogger(__name__)

# Create a single scheduler instance shared by this module
scheduler = AsyncIOScheduler()
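
# A minimal sketch of how this scheduler might be started and stopped. This is
# an assumption, not part of this module: the FastAPI app, import path, and
# lifespan hook below are hypothetical; only `scheduler` comes from this file.
# AsyncIOScheduler does expose start() and shutdown() for exactly this purpose.
#
#     from contextlib import asynccontextmanager
#     from fastapi import FastAPI
#     from app.job import scheduler
#
#     @asynccontextmanager
#     async def lifespan(app: FastAPI):
#         scheduler.start()      # begin firing the interval job below
#         yield
#         scheduler.shutdown()   # stop cleanly on application exit
#
#     app = FastAPI(lifespan=lifespan)
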
@scheduler.scheduled_job("interval", minutes=getAppConfig().getSpider.get("interval"))
async def get_spider_config():
    # Imported lazily to avoid a circular import with the service module
    from app.service.spider_job_service import (
        add_guduo_job,
        delete_job,
        get_job_info,
    )

    logger.info("Fetching spider configuration")
    web_crawlers: List[XWebCrawler] = await XWebCrawler.all()
    if not web_crawlers:
        logger.info("No crawler tasks found")
        return
    # Group tasks by status: 0 = disabled, 1 = enabled
    status_group: Dict[int, List[XWebCrawler]] = Stream(web_crawlers).group_by(
        lambda x: x.status
    )
    # Snapshot of the jobs currently registered with the scheduler
    jobs: List[ApschedulerJob] = get_job_info()
    registered_jobs: Dict[int, ApschedulerJob] = Stream(jobs).to_dict(
        lambda x: x.taskId, lambda y: y
    )
    # Unregister tasks that have been disabled. This runs before the
    # "no enabled tasks" early return below; otherwise disabled tasks would
    # never be cleaned up once the last enabled task was switched off.
    for job in status_group.get(0, []):
        if job.id in registered_jobs:
            delete_job(job.id)
            logger.info(f"Deleted job, task id: {job.id}")
    active_jobs = status_group.get(1, [])
    if not active_jobs:
        logger.info("No enabled crawler tasks found")
        return
    # Register new tasks and refresh any whose definition has changed
    types = [TypeEnum.EPISODES, TypeEnum.MOVIE, TypeEnum.ANIME, TypeEnum.VARIETY]
    for job in active_jobs:
        # Build the crawl window: yesterday through today, formatted yyyy-MM-dd
        today_str, yesterday_str = get_dates()
        # Fingerprint of the job definition; a changed digest means the job
        # must be re-registered with its new settings
        md5 = calculate_md5(job.id, job.target_url, job.param_info, job.cron)
        param = SpiderParams(
            startDate=yesterday_str,
            endDate=today_str,
            target_type=types,
            url=job.target_url,
        )
        task_info = TaskInfo(
            taskStart=True,
            executeStrategy=job.cron,
            taskId=job.id,
            url=job.target_url,
            taskMd5=md5,
        )
        if job.id not in registered_jobs:
            add_guduo_job(param, task_info)
            logger.info(f"Added job, task id: {job.id}")
        elif registered_jobs[job.id].taskMd5 != md5:
            # Definition changed: drop the stale job and re-add it
            delete_job(job.id)
            add_guduo_job(param, task_info)
            logger.info(f"Updated job, task id: {job.id}")
    logger.info("Spider configuration finished")


def calculate_md5(*fields) -> str:
    """
    Compute the MD5 hex digest over several fields.
    :param fields: fields to hash; each is stringified, so order matters
    :return: MD5 hex digest
    """
    md5 = hashlib.md5()
    for field in fields:
        md5.update(str(field).encode("utf-8"))
    return md5.hexdigest()
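
# Note: repeated hashlib update() calls hash the concatenation of their inputs,
# so the digest above equals the MD5 of the stringified fields joined end to
# end. For example (hypothetical values):
#   calculate_md5(1, "https://example.com", None, "0 0 * * *")
#   == hashlib.md5("1https://example.comNone0 0 * * *".encode("utf-8")).hexdigest()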


def get_dates() -> Tuple[str, str]:
    """
    Return today's and yesterday's dates, formatted as yyyy-MM-dd.
    :return: (today, yesterday)
    """
    today = datetime.today()
    yesterday = today - timedelta(days=1)
    today_str = today.strftime("%Y-%m-%d")
    yesterday_str = yesterday.strftime("%Y-%m-%d")
    return today_str, yesterday_str
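
# Example: on 2024-05-10 this returns ("2024-05-10", "2024-05-09"). Today comes
# first, which is why the caller above unpacks (today_str, yesterday_str) and
# then maps startDate=yesterday_str, endDate=today_str.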


if __name__ == "__main__":
    # Ad-hoc scratch checks, unrelated to the scheduler logic above
    new_dict = {}
    print(new_dict.get("a"))     # None: dict.get returns None for a missing key
    print(hash((1, "b", True)))  # tuples of hashable items are themselves hashable
    new_dict.update({"a": 1})
    print("a" in new_dict)       # True