Commit 50c996cab94055b1bb62dd2da5cce62d08a90373 by wenxin

Add fetching of scheduled-task (spider) information from the database

1 parent 74924ed8
@@ -5,9 +5,9 @@ tortoise:
     default:
       engine: tortoise.backends.mysql
       credentials:
-        database: fast_api
-        host: 127.0.0.1
-        password: root
+        database: cms_test
+        host: 172.0.31.57
+        password: Tjlh@2017
         port: 3306
         user: root
         minsize: 10
@@ -20,3 +20,7 @@ tortoise:
         - app.model.mysql_model
       default_connection: default
   log_queries: true  # enable query logging
+
+getSpider:
+  # interval, in minutes, between runs of the spider scheduled task
+  interval: 5
\ No newline at end of file
......
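For reference, the new getSpider block parses into a plain nested dict before it is validated into AppConfig. A minimal, self-contained sketch using PyYAML; this is illustrative only, not the project's actual loader, and the inline YAML simply mirrors the keys added above:

import yaml

# Inline copy of the block added in this commit, so the snippet runs on its own.
CONFIG_TEXT = """
getSpider:
  # interval, in minutes, between runs of the spider scheduled task
  interval: 5
"""

config = yaml.safe_load(CONFIG_TEXT)
interval_minutes = config["getSpider"]["interval"]
print(interval_minutes)  # -> 5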
@@ -1,3 +1,4 @@
+import functools
 import os
 import yaml
 
@@ -7,8 +8,8 @@ import logging
 
 logger = logging.getLogger(__name__)
 
-
-def getAppConfig():
+@functools.lru_cache()
+def getAppConfig() -> AppConfig:
     logger.info('开始加载AppConfig')
     # get the absolute path of the current file
     current_file_path = os.path.abspath(__file__)
......
@@ -1,3 +1,4 @@
+import functools
 import os
 import yaml
 from app.schemas.config_schema import TortoiseConfig
@@ -5,7 +6,7 @@ import logging
 
 logger = logging.getLogger(__name__)
 
-
+@functools.lru_cache()
 def getTortoiseConfig():
     logger.info('开始加载TortoiseConfig')
     # get the absolute path of the current file
......
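The @functools.lru_cache() decorator added to getAppConfig() and getTortoiseConfig() means each YAML file is read and parsed only once; later callers, including the scheduled_job decorator below that runs at import time, get the cached object back. A tiny stand-alone illustration of that behaviour, where load_config is a hypothetical stand-in for those loaders:

import functools


@functools.lru_cache()
def load_config() -> dict:
    # the side effect runs only on the first call
    print("loading config from disk...")
    return {"interval": 5}


first = load_config()   # prints "loading config from disk..."
second = load_config()  # served from the cache, nothing printed
assert first is second  # every caller receives the same cached object
print(load_config.cache_info())  # CacheInfo(hits=1, misses=1, maxsize=128, currsize=1)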
@@ -1,7 +1,14 @@
-from datetime import datetime
-
+from datetime import datetime, timedelta
+import hashlib
+import logging
+from typing import Dict, List, Tuple
+from app.model.mysql_model import XWebCrawler
 from apscheduler.schedulers.asyncio import AsyncIOScheduler
+from superstream import Stream
+from app.config.app_config import getAppConfig
+from app.schemas.spider_schema import ApschedulerJob, SpiderParams, TaskInfo, TypeEnum
 
+logger = logging.getLogger(__name__)
 
 # import asynccontextmanager to create an async context manager
 
@@ -9,11 +16,95 @@ from apscheduler.schedulers.asyncio import AsyncIOScheduler
 scheduler = AsyncIOScheduler()
 
 
-# scheduled job that runs every minute
-@scheduler.scheduled_job('interval', minutes=1)
-async def cron_job():
-    # body of the task, e.g. print the current time
-    print(f"The current time is {datetime.now()}")
+@scheduler.scheduled_job("interval", minutes=getAppConfig().getSpider.get("interval"))
+async def get_spider_config():
+    from app.service.spider_job_service import (
+        add_job,
+        add_guduo_job,
+        delete_job,
+        get_job_info,
+    )
+
+    logger.info("开始获取爬虫配置")
+    web_crawlers: List[XWebCrawler] = await XWebCrawler.all()
+    if not web_crawlers:
+        logger.info("未获取到爬虫任务")
+        return
+    status_group: Dict[int, List[XWebCrawler]] = Stream(web_crawlers).group_by(
+        lambda x: x.status
+    )
+    active_jobs = status_group.get(1, [])
+    if not active_jobs:
+        logger.info("未获取到启用的爬虫任务")
+        return
+    # jobs that are already registered with the scheduler
+    jobs: List[ApschedulerJob] = get_job_info()
+    # remove jobs whose crawler has been switched off
+    regist_jobs: Dict[int, List[ApschedulerJob]] = Stream(jobs).group_by(lambda x: x.taskId)
+    for job in status_group.get(0, []):
+        if job.id in regist_jobs:
+            delete_job(job.id)
+            logger.info(f"删除任务成功,任务id:{job.id}")
+    # register new or changed scheduled jobs
+    for job in status_group.get(1, []):
+        # build the parameters:
+        # today's and yesterday's dates in yyyy-MM-dd format
+        dates = get_dates()
+        types = [TypeEnum.EPISODES, TypeEnum.MOVIE, TypeEnum.ANIME, TypeEnum.VARIETY]
+        md5 = calculate_md5(job.id, job.target_url, job.param_info, job.cron)
+        # assemble the spider parameters
+        param = SpiderParams(
+            startDate=dates[1],
+            endDate=dates[0],
+            target_type=types,
+            url=job.target_url,
+        )
+        taskInfo = TaskInfo(
+            taskStart=True,
+            executeStrategy=job.cron,
+            taskId=job.id,
+            url=job.target_url,
+            taskMd5=md5,
+        )
+        if job.id not in regist_jobs:
+            add_guduo_job(param, taskInfo)
+            logger.info(f"添加任务成功,任务id:{job.id}")
+        else:
+            exists_jobs: List[ApschedulerJob] = get_job_info(job.id)
+            if exists_jobs[0].taskMd5 != md5:
+                delete_job(job.id)
+                add_guduo_job(param, taskInfo)
+                logger.info(f"更新任务成功,任务id:{job.id}")
+    logger.info("爬虫配置完成")
+
+
+def calculate_md5(*fields):
+    """
+    Compute the MD5 digest of several fields.
+    :param fields: the fields to hash
+    :return: the hex MD5 digest
+    """
+    md5 = hashlib.md5()
+    for field in fields:
+        md5.update(str(field).encode("utf-8"))
+    return md5.hexdigest()
+
 
+def get_dates() -> Tuple[str, str]:
+    """
+    Return today's and yesterday's dates in yyyy-MM-dd format.
+    :return: Tuple[str, str]
+    """
+    today = datetime.today()
+    yesterday = today - timedelta(days=1)
+    today_str = today.strftime("%Y-%m-%d")
+    yesterday_str = yesterday.strftime("%Y-%m-%d")
+    return today_str, yesterday_str
 
 
+if __name__ == "__main__":
+    new_dict = {}
+    print(new_dict.get("a"))
+    print(hash((1, "b", True)))
+    new_dict.update({"a": 1})
+    print("a" in new_dict)
......
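The scheduler above re-registers a job only when its MD5 changes: the crawler's id, target_url, param_info and cron are hashed into taskMd5, and a differing digest triggers delete_job followed by add_guduo_job. A stand-alone sketch of that change-detection idea, with calculate_md5 and get_dates restated from the diff so it runs on its own; the URL and cron strings are made-up sample values:

import hashlib
from datetime import datetime, timedelta
from typing import Tuple


def calculate_md5(*fields) -> str:
    # hash the string form of every field, in order
    md5 = hashlib.md5()
    for field in fields:
        md5.update(str(field).encode("utf-8"))
    return md5.hexdigest()


def get_dates() -> Tuple[str, str]:
    # (today, yesterday) in yyyy-MM-dd format
    today = datetime.today()
    yesterday = today - timedelta(days=1)
    return today.strftime("%Y-%m-%d"), yesterday.strftime("%Y-%m-%d")


old_digest = calculate_md5(1, "https://example.com", None, "0 2 * * *")
new_digest = calculate_md5(1, "https://example.com", None, "0 3 * * *")  # cron changed
print(old_digest != new_digest)  # True -> the job would be deleted and re-registered
print(get_dates())               # e.g. ('2024-05-21', '2024-05-20')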
@@ -6,13 +6,54 @@ from app.schemas.spider_schema import TypeEnum
 
 
 class SpiderModel(Model):
+    """
+    Data model for spider (crawler) result rows.
+    """
+
     id = fields.IntField(pk=True)
-    time = fields.CharField(max_length=10, description="数据时间 yyyy-MM-dd 格式的字符", regex=r'^\d{4}-\d{2}-\d{2}$')
-    targetType = fields.IntEnumField(TypeEnum, description="数据类型", source_field='target_type')
+    time = fields.CharField(
+        max_length=10,
+        description="数据时间 yyyy-MM-dd 格式的字符",
+        regex=r"^\d{4}-\d{2}-\d{2}$",
+    )
+    targetType = fields.IntEnumField(
+        TypeEnum, description="数据类型", source_field="target_type"
+    )
     platform = fields.CharField(max_length=255, description="平台名字")
-    sourceName = fields.CharField(max_length=255, description="媒体资源名字", source_field='source_name')
+    sourceName = fields.CharField(
+        max_length=255, description="媒体资源名字", source_field="source_name"
+    )
     score = fields.FloatField(description="热度得分")
-    createTime = fields.DatetimeField(default=datetime.now, description="创建时间", source_field='create_time')
-    updateTime = fields.DatetimeField(default=datetime.now, description="更新时间", source_field='update_time')
+    createTime = fields.DatetimeField(
+        default=datetime.now, description="创建时间", source_field="create_time"
+    )
+    updateTime = fields.DatetimeField(
+        default=datetime.now, description="更新时间", source_field="update_time"
+    )
+
     class Meta:
         table = "spider_data"
+
+
+class XWebCrawler(Model):
+    id = fields.BigIntField(pk=True, description="ID")
+    code = fields.CharField(max_length=64, unique=True, description="标识(crawl_)")
+    name = fields.CharField(max_length=255, description="名称")
+    description = fields.TextField(null=True, description="描述")
+    type = fields.IntField(default=0, description="类型0:普通;(待扩展)")
+    proxy_mode = fields.IntField(
+        default=0, description="代理模式0:不使用代理;(待扩展)"
+    )
+    status = fields.IntField(description="状态 0:关闭;1:启用;")
+    cron = fields.CharField(
+        max_length=32, description="执行周期,标准cron表达式(json)"
+    )
+    param_info = fields.TextField(null=True, description="参数信息")
+    target_url = fields.CharField(max_length=1000, null=True, description="目标地址")
+    sequence = fields.IntField(default=0, description="显示顺序")
+    create_time = fields.DatetimeField(null=True, description="创建时间")
+    update_time = fields.DatetimeField(null=True, description="更新时间")
+
+    class Meta:
+        table = "x_web_crawler"
+        table_description = "爬虫表"
......
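Since only crawlers with status == 1 end up scheduled, the filtering could also be pushed into the query instead of grouping every row in Python. A hedged sketch, not part of this commit, of querying the new XWebCrawler model with Tortoise ORM; it assumes the project's modules are importable and that Tortoise has already been initialised against the database configured above:

from typing import List

from app.model.mysql_model import XWebCrawler


async def load_enabled_crawlers() -> List[XWebCrawler]:
    # status: 0 = disabled, 1 = enabled (see the field description above)
    return await XWebCrawler.filter(status=1).order_by("sequence")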
@@ -48,3 +48,4 @@ class Port(BaseModel):
 
 class AppConfig(BaseModel):
     server: Port
+    getSpider: Dict[str, int]
......
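A stand-alone version of the new field for illustration; the real AppConfig also carries server: Port, and Dict must be imported from typing in config_schema.py for the added annotation to resolve:

from typing import Dict

from pydantic import BaseModel


class AppConfig(BaseModel):
    # mirrors the getSpider mapping added to the YAML config
    getSpider: Dict[str, int]


cfg = AppConfig(getSpider={"interval": 5})
print(cfg.getSpider.get("interval"))  # -> 5, the value read by the scheduled_job decorator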
@@ -81,6 +81,7 @@ class SpiderParams(BaseModel):
 class ApschedulerJob(BaseModel):
     jobId: Job
     taskId: int
+    taskMd5: str
 
     class Config:
         arbitrary_types_allowed = True
@@ -91,3 +92,4 @@ class TaskInfo(BaseModel):
     executeStrategy: str
     taskId: int
     url: str
+    taskMd5: str
......
@@ -40,7 +40,7 @@ def add_job(params: List[Any], taskInfo: TaskInfo, task: Any):
         week=weekend,
         args=params,
     )
-    jobs.append(ApschedulerJob(jobId=job, taskId=taskInfo.taskId))
+    jobs.append(ApschedulerJob(jobId=job, taskId=taskInfo.taskId, taskMd5=taskInfo.taskMd5))
     logger.info(
         f"添加任务成功,任务id:{taskInfo.taskId},任务执行策略:{taskInfo.executeStrategy}"
     )
@@ -64,8 +64,10 @@ def delete_job(taskId: int):
     logger.info(f"删除任务成功,任务id:{taskId}")
 
 
-def get_job_info(taskId: int):
-    job = Stream(jobs).filter(lambda x: x.taskId == taskId).find_first()
-    return f"job 信息->{job}"
+def get_job_info(taskId: int = None) -> List[ApschedulerJob]:
+    if taskId is None:
+        return jobs
+    job = Stream(jobs).filter(lambda x: x.taskId == taskId).to_list()
+    return job
 
 
......
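After this change, get_job_info acts as the lookup into the in-memory jobs registry: no argument returns every registered job, an id returns only the matching entries, and the scheduler then compares taskMd5 against the freshly computed digest. A stand-alone sketch of that behaviour, using a dataclass and a list comprehension in place of the project's ApschedulerJob model and the superstream Stream:

from dataclasses import dataclass
from typing import List, Optional


@dataclass
class RegisteredJob:  # simplified stand-in for ApschedulerJob
    taskId: int
    taskMd5: str


jobs: List[RegisteredJob] = [
    RegisteredJob(taskId=1, taskMd5="aaa"),
    RegisteredJob(taskId=2, taskMd5="bbb"),
]


def get_job_info(taskId: Optional[int] = None) -> List[RegisteredJob]:
    if taskId is None:
        return jobs
    return [job for job in jobs if job.taskId == taskId]


print(len(get_job_info()))         # 2 -> all registered jobs
print(get_job_info(2)[0].taskMd5)  # 'bbb' -> compared against the freshly computed MD5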