Commit 411051ea 411051eabdafe26ca50096c1b5a77230554cbcbd by wenxin
2 parents 39153b19 2c400f99
......@@ -2,7 +2,7 @@
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Spider" jdkType="Python SDK" />
<orderEntry type="jdk" jdkName="spider" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PackageRequirementsSettings">
......
......@@ -3,4 +3,5 @@
<component name="Black">
<option name="sdkName" value="Spider" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="spider" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
......
import logging
import os
import sys
# 把当前文件所在文件夹的父文件夹路径加入到PYTHONPATH 解决ModuleNotFoundError: No module named 'app'
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import uvicorn
......@@ -12,9 +13,9 @@ from app.config.log_config import getLogConfig
app = FastAPI(lifespan=lifespan)
app.include_router(spider_router.spiderRouter)
if __name__ == "__main__":
    # Script entry point: load the application settings and serve the FastAPI
    # app with uvicorn on all interfaces, using the configured port and the
    # project's logging configuration.
    appConf = getAppConfig()
    uvicorn.run(
        "main:app", host="0.0.0.0", port=appConf.server.port, log_config=getLogConfig()
    )
    # NOTE(review): uvicorn.run() blocks until the server stops, so this
    # message is emitted on shutdown rather than on startup. If a real
    # "started" log is wanted it probably belongs in the lifespan handler.
    logging.info(f"启动成功->{appConf.server}")
......
......@@ -13,6 +13,6 @@ class SpiderModel(Model):
sourceName = fields.CharField(max_length=255, description="媒体资源名字", source_field='source_name')
score = fields.FloatField(description="热度得分")
createTime = fields.DatetimeField(default=datetime.now, description="创建时间", source_field='create_time')
updateTime = fields.DatetimeField(default=datetime.now, description="更新时间", source_field='update_time')
class Meta:
table = "spider_data"
......
......@@ -10,24 +10,32 @@ class TypeEnum(IntEnum):
EPISODES = 1 # 剧集
MOVIE = 2 # 电影
ANIME = 3 # 动漫
VARIETY = 4 # 综艺
def get_precise_positioning(self, other) -> List[str]:
    """Return the CSS selectors associated with the content type ``other``.

    ``other`` is compared against the ANIME, MOVIE, EPISODES and VARIETY
    members reachable through ``self``; the first match decides the result.

    Returns:
        The ordered list of selector strings for the matching type.
        When ``other`` matches none of the four members the method returns
        ``None`` (unchanged legacy behaviour, despite the annotation), so
        callers must be prepared for that.
    """
    # presumably these selectors are clicked in order by the spider to reach
    # the ranking for the given type -- confirm against the caller.
    if self.ANIME == other:
        return [".cateIcon_5", "li.anime-platform:nth-child(1)"]
    if self.MOVIE == other:
        return [".cateIcon_3", "ul.type-box:nth-child(1) > li:nth-child(1)"]
    if self.EPISODES == other:
        return [
            ".cateIcon_1",
            "div.category:nth-child(1) > ul:nth-child(1) > li:nth-child(1)",
            "ul.type-box:nth-child(1) > li:nth-child(1)",
        ]
    if self.VARIETY == other:
        # Variety has no category icon selector of its own in this table.
        return [
            "div.category:nth-child(1) > ul:nth-child(1) > li:nth-child(1)",
            "ul.type-box:nth-child(1) > li:nth-child(1)",
        ]
    # Unknown type: keep the historical fall-through result explicit.
    return None
class GuoDuoSpiderResult(BaseModel):
# time: date the data refers to, as a string in yyyy-MM-dd format
time: str = Field(..., description="数据时间 yyyy-MM-dd 格式的字符", pattern='^\d{4}-\d{2}-\d{2}$')
# type '1剧集 2电影 3 动漫',
time: str = Field(
..., description="数据时间 yyyy-MM-dd 格式的字符", pattern="^\d{4}-\d{2}-\d{2}$"
)
# type '1剧集 2电影 3 动漫 4综艺',
targetType: TypeEnum = Field(..., description="数据类型")
# platform 平台名字
platform: str
......@@ -43,8 +51,12 @@ class GuoDuoSpiderResult(BaseModel):
class SpiderParams(BaseModel):
startDate: str = Field(..., description="数据时间 yyyy-MM-dd 格式的字符", pattern='^\d{4}-\d{2}-\d{2}$')
endDate: str = Field(..., description="数据时间 yyyy-MM-dd 格式的字符", pattern='^\d{4}-\d{2}-\d{2}$')
startDate: str = Field(
..., description="数据时间 yyyy-MM-dd 格式的字符", pattern="^\d{4}-\d{2}-\d{2}$"
)
endDate: str = Field(
..., description="数据时间 yyyy-MM-dd 格式的字符", pattern="^\d{4}-\d{2}-\d{2}$"
)
# type: 1 = episodes, 2 = movie, 3 = anime, 4 = variety
target_type: List[TypeEnum]
# 爬取的地址
......@@ -57,12 +69,12 @@ class SpiderParams(BaseModel):
start_date = self.startDate
end_date = self.endDate
time_range = []
start_date_obj = datetime.strptime(start_date, '%Y-%m-%d')
end_date_obj = datetime.strptime(end_date, '%Y-%m-%d')
start_date_obj = datetime.strptime(start_date, "%Y-%m-%d")
end_date_obj = datetime.strptime(end_date, "%Y-%m-%d")
delta = end_date_obj - start_date_obj
for i in range(delta.days + 1):
day = start_date_obj + timedelta(days=i)
time_range.append(day.strftime('%Y-%m-%d'))
time_range.append(day.strftime("%Y-%m-%d"))
return time_range
......
......@@ -3,9 +3,13 @@ import logging
from typing import List, Any
from app.model.mysql_model import SpiderModel
from superstream import Stream
from app.job.job import scheduler
from app.schemas.spider_schema import ApschedulerJob, TaskInfo, SpiderParams, GuoDuoSpiderResult
from app.schemas.spider_schema import (
ApschedulerJob,
TaskInfo,
SpiderParams,
GuoDuoSpiderResult,
)
from app.spider.guduo_spider import startBrowser
logger = logging.getLogger(__name__)
......@@ -16,21 +20,29 @@ def add_job(params: List[Any], taskInfo: TaskInfo, task: Any):
# Split the task's execute strategy on single spaces and map fields 1..5 to
# the scheduler's cron arguments; a "?" in any field is turned into None.
# NOTE(review): field 0 of the expression is never read -- presumably it is
# the seconds field of a Quartz-style expression; confirm it may be ignored.
cron = taskInfo.executeStrategy
cron_parameterization = cron.split(" ")
minute = cron_parameterization[1]
minute = (None if minute == "?" else minute)
minute = None if minute == "?" else minute
hour = cron_parameterization[2]
hour = (None if hour == "?" else hour)
hour = None if hour == "?" else hour
day = cron_parameterization[3]
day = (None if day == "?" else day)
day = None if day == "?" else day
month = cron_parameterization[4]
month = (None if month == "?" else month)
month = None if month == "?" else month
# NOTE(review): field 5 is handed to the scheduler as `week`. In APScheduler's
# cron trigger `week` is the ISO week of the year, while the weekday field is
# `day_of_week`; if the expression is Quartz-style this field is normally the
# day of week -- confirm which one is intended before relying on it.
weekend = cron_parameterization[5]
weekend = (None if weekend == "?" else weekend)
job = scheduler.add_job(task, "cron",
minute=minute, hour=hour, day=day, month=month, week=weekend,
args=params)
weekend = None if weekend == "?" else weekend
job = scheduler.add_job(
task,
"cron",
minute=minute,
hour=hour,
day=day,
month=month,
week=weekend,
args=params,
)
# Remember the scheduled job together with the task id it belongs to.
jobs.append(ApschedulerJob(jobId=job, taskId=taskInfo.taskId))
logger.info(
f"添加任务成功,任务id:{taskInfo.taskId},任务执行策略:{taskInfo.executeStrategy}")
f"添加任务成功,任务id:{taskInfo.taskId},任务执行策略:{taskInfo.executeStrategy}"
)
def add_guduo_job(taskParam: SpiderParams, taskInfo: TaskInfo):
......@@ -52,18 +64,17 @@ def delete_job(taskId: int):
def get_job_info(taskId: int):
    """Describe the registered job that belongs to ``taskId``.

    Looks through the module-level ``jobs`` collection for the first entry
    whose ``taskId`` equals the argument and embeds whatever ``find_first()``
    yields in the returned text.

    Args:
        taskId: Identifier of the task whose job should be described.

    Returns:
        A string of the form ``"job 信息->{job}"``.
    """
    job = Stream(jobs).filter(lambda x: x.taskId == taskId).find_first()
    return f"job 信息->{job}"
async def scrawl_and_save(taskParam: SpiderParams):
    """Run the spider for ``taskParam`` and persist every result it returns.

    Awaits ``startBrowser`` to collect the results, logs the retry statistics
    exposed on ``startBrowser``, then awaits ``save_or_update`` for all
    results concurrently via ``asyncio.gather`` and logs how many were handled.

    Args:
        taskParam: Parameters forwarded unchanged to ``startBrowser``.
    """
    # Run the spider and collect its results.
    results = await startBrowser(taskParam)
    logger.info(f"爬虫重试情况:{startBrowser.statistics}")
    # One save_or_update coroutine per result, all awaited together.
    asyncTasks = (save_or_update(item) for item in results)
    await asyncio.gather(*asyncTasks)
    logger.info(f"爬虫任务执行完成,爬取到数据{len(results)}条 保存到数据库完成")
async def save_or_update(result: GuoDuoSpiderResult):
......@@ -75,10 +86,7 @@ async def save_or_update(result: GuoDuoSpiderResult):
createTime = result.createTime
# 使用SpiderModel 查询 time targetType platform sourceName 等于以上的值是否存在如果存在就更新不存在就插入
obj = await SpiderModel.get_or_none(
time=time,
targetType=targetType,
platform=platform,
sourceName=sourceName
time=time, targetType=targetType, platform=platform, sourceName=sourceName
)
if obj:
for key, value in result.__dict__.items():
......@@ -90,6 +98,6 @@ async def save_or_update(result: GuoDuoSpiderResult):
platform=platform,
sourceName=sourceName,
score=score,
createTime=createTime
createTime=createTime,
)
await obj.save()
......
......@@ -6,11 +6,25 @@ from playwright.async_api import Page, async_playwright
from superstream import Stream
from app.schemas.spider_schema import GuoDuoSpiderResult, SpiderParams, TypeEnum
import logging
from tqdm.asyncio import tqdm
from tqdm.asyncio import tqdm_asyncio
from tenacity import (
after_log,
before_sleep_log,
retry,
stop_after_attempt,
wait_exponential,
wait_fixed,
)
logger = logging.getLogger(__name__)
@retry(
stop=stop_after_attempt(3),
before_sleep=before_sleep_log(logger, logging.DEBUG),
wait=wait_exponential(multiplier=1, min=1, max=10),
after=after_log(logger, logging.DEBUG),
)
async def startBrowser(spiderParam: SpiderParams) -> List[GuoDuoSpiderResult]:
time_range = spiderParam.get_time_range()
url = spiderParam.url
......@@ -18,16 +32,16 @@ async def startBrowser(spiderParam: SpiderParams) -> List[GuoDuoSpiderResult]:
async with async_playwright() as p:
browser = await p.firefox.launch(headless=True)
# 创建指定个数的浏览器页面
page_tasks = (browser.new_page() for _ in range(len(time_range)))
pages_generator = tqdm.as_completed(page_tasks)
pages = list(pages_generator)
pages = await tqdm_asyncio.gather(
*(browser.new_page() for _ in range(len(time_range)))
)
# 同时循环time_range与pages 去调用hand_one_data异步方法
hand_result_tasks = (
hand_one_data(time_range[i], pages[i], scrawl_types, url)
for i in range(len(time_range))
results = await tqdm_asyncio.gather(
*(
hand_one_data(time_range[i], pages[i], scrawl_types, url)
for i in range(len(time_range))
)
)
results__generator = tqdm.as_completed(hand_result_tasks)
results = list(results__generator)
return (
Stream(results)
.filter(lambda x: x is not None)
......@@ -38,7 +52,7 @@ async def startBrowser(spiderParam: SpiderParams) -> List[GuoDuoSpiderResult]:
async def hand_one_data(
targetDate: str, page: Page, scrawl_types: List[TypeEnum], url: str
targetDate: str, page: Page, scrawl_types: List[TypeEnum], url: str
) -> List[GuoDuoSpiderResult]:
# 把date这个日期对象解析为年、月、日
year, month, day = targetDate.split("-")
......@@ -77,17 +91,17 @@ async def hand_one_data(
async def get_data(
targetPage: Page, scrawl_type: int, target_time: str
targetPage: Page, scrawl_type: int, target_time: str
) -> Optional[List[GuoDuoSpiderResult]]:
target_data_css_selector = ".rank-box"
table = targetPage.locator(target_data_css_selector)
if table is None:
logger.info(f"当前页面获取table为空:{scrawl_type}")
print(f"当前页面获取table为空:{scrawl_type}")
return None
# 获取当前这个doc_obj元素下面所有的tr标签
trs = table.locator("tr")
if trs is None:
logger.info(f"当前页面获取tr为空:{scrawl_type}")
print(f"当前页面获取tr为空:{scrawl_type}")
# 循环遍历trs下面的每个tr标签下面的td标签
at = await trs.all_text_contents()
result = (
......@@ -113,12 +127,16 @@ if __name__ == "__main__":
# Manual smoke run: crawl a fixed date range for every content type and
# report the retry statistics, the elapsed time and the number of results.
# Record when the run starts so the elapsed time can be reported at the end.
start_time = time.time()
param = SpiderParams(
    startDate="2024-01-02",
    endDate="2024-01-15",
    target_type=[
        TypeEnum.ANIME,
        TypeEnum.EPISODES,
        TypeEnum.MOVIE,
        TypeEnum.VARIETY,
    ],
    url="http://guduodata.com/",
)
res = asyncio.run(startBrowser(param))
print(f"代码重试情况:{startBrowser.statistics}")
# time.time() differences are already in seconds, so no unit conversion is
# applied to the elapsed value.
print(f"程序执行耗时时间:{time.time() - start_time} 长度为:{len(res)}")
......
......@@ -4,24 +4,33 @@ anyio==4.7.0
APScheduler==3.11.0
asyncmy==0.2.10
click==8.1.7
colorama==0.4.6
distlib==0.3.8
exceptiongroup==1.2.2
fastapi==0.115.6
greenlet==3.1.1
h11==0.14.0
idna==3.10
iso8601==2.1.0
pandoc==2.3
playwright==1.49.1
plumbum==1.8.3
ply==3.11
pydantic==2.10.4
pydantic_core==2.27.2
pyee==12.0.0
pypika-tortoise==0.3.2
pytz==2024.2
pywin32==306
PyYAML==6.0.2
sniffio==1.3.1
starlette==0.41.3
SuperStream==0.2.6
tenacity==9.0.0
tortoise-orm==0.22.2
tqdm==4.67.1
typing_extensions==4.12.2
tzdata==2024.2
tzlocal==5.2
uvicorn==0.34.0
virtualenv==20.25.0
......