job.py
from datetime import datetime, timedelta
import hashlib
import logging
from typing import Dict, List, Tuple

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from superstream import Stream

from app.config.app_config import getAppConfig
from app.model.mysql_model import XWebCrawler
from app.schemas.spider_schema import ApschedulerJob, SpiderParams, TaskInfo, TypeEnum

logger = logging.getLogger(__name__)

# Single scheduler instance shared by this module; the job below re-reads the
# crawler configuration every `interval` minutes (taken from the app config)
scheduler = AsyncIOScheduler()


@scheduler.scheduled_job("interval", minutes=getAppConfig().getSpider.get("interval"))
async def get_spider_config():
    # Imported inside the function, presumably to avoid a circular import
    # with the service module
    from app.service.spider_job_service import (
        add_guduo_job,
        delete_job,
        get_job_info,
    )
logger.info("开始获取爬虫配置")
web_crawlers: List[XWebCrawler] = await XWebCrawler.all()
if not web_crawlers:
logger.info("未获取到爬虫任务")
return
status_group: Dict[int, List[XWebCrawler]] = Stream(web_crawlers).group_by(
lambda x: x.status
)
active_jobs = status_group.get(1, [])
if not active_jobs:
logger.info("未获取到启用的爬虫任务")
return
    # Jobs already registered with the scheduler, keyed by task id
    jobs: List[ApschedulerJob] = get_job_info()
    regist_jobs: Dict[int, List[ApschedulerJob]] = Stream(jobs).group_by(
        lambda x: x.taskId
    )
    # Unregister jobs whose crawler has been disabled
    for job in status_group.get(0, []):
        if job.id in regist_jobs:
            delete_job(job.id)
            logger.info(f"Deleted job, task id: {job.id}")
    # Register new jobs and refresh jobs whose configuration changed
    for job in active_jobs:
        # Crawl window: yesterday through today, formatted as yyyy-MM-dd
        dates = get_dates()
        types = [TypeEnum.EPISODES, TypeEnum.MOVIE, TypeEnum.ANIME, TypeEnum.VARIETY]
        # Fingerprint the job configuration so later changes can be detected
        md5 = calculate_md5(job.id, job.target_url, job.param_info, job.cron)
        param = SpiderParams(
            startDate=dates[1],
            endDate=dates[0],
            target_type=types,
            url=job.target_url,
        )
        task_info = TaskInfo(
            taskStart=True,
            executeStrategy=job.cron,
            taskId=job.id,
            url=job.target_url,
            taskMd5=md5,
        )
        if job.id not in regist_jobs:
            add_guduo_job(param, task_info)
            logger.info(f"Added job, task id: {job.id}")
        else:
            # Re-register only when the stored fingerprint differs
            exists_jobs: List[ApschedulerJob] = get_job_info(job.id)
            if exists_jobs[0].taskMd5 != md5:
                delete_job(job.id)
                add_guduo_job(param, task_info)
                logger.info(f"Updated job, task id: {job.id}")
    logger.info("Spider configuration refresh finished")


def calculate_md5(*fields) -> str:
    """
    Compute the MD5 digest of several fields concatenated in order.
    :param fields: values to hash; each is stringified with str()
    :return: hex digest string
    """
    md5 = hashlib.md5()
    for field in fields:
        md5.update(str(field).encode("utf-8"))
    return md5.hexdigest()
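

# Note: the fields above are hashed as one unseparated concatenation, so
# calculate_md5("ab", "c") == calculate_md5("a", "bc"). A minimal sketch of a
# collision-safer variant (illustrative only; not used by the job above):
def calculate_md5_delimited(*fields) -> str:
    """Hash fields joined by a unit-separator byte to preserve boundaries."""
    joined = "\x1f".join(str(field) for field in fields)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()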


def get_dates() -> Tuple[str, str]:
    """
    Return today's and yesterday's dates formatted as yyyy-MM-dd.
    :return: (today, yesterday)
    """
    today = datetime.today()
    yesterday = today - timedelta(days=1)
    today_str = today.strftime("%Y-%m-%d")
    yesterday_str = yesterday.strftime("%Y-%m-%d")
    return today_str, yesterday_str
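

# get_dates() uses the machine's local clock. If the crawl window must be
# pinned to a specific timezone, a hypothetical zoneinfo-based variant could
# look like this (zoneinfo is stdlib since Python 3.9; "Asia/Shanghai" is an
# assumed default, not taken from this file):
def get_dates_in_zone(tz_name: str = "Asia/Shanghai") -> Tuple[str, str]:
    """Sketch: today's and yesterday's dates in the given IANA timezone."""
    from zoneinfo import ZoneInfo

    now = datetime.now(ZoneInfo(tz_name))
    yesterday = now - timedelta(days=1)
    return now.strftime("%Y-%m-%d"), yesterday.strftime("%Y-%m-%d")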


if __name__ == "__main__":
    # Scratch checks of dict behavior (unrelated to the scheduler above)
    new_dict = {}
    print(new_dict.get("a"))  # None: .get() on a missing key returns None
    print(hash((1, "b", True)))  # tuples of hashable items are hashable
    new_dict.update({"a": 1})
    print("a" in new_dict)  # True