Commit 7acfaa5f6198ac74808ec267b3923afd065d4b78 by wenxin

Fetch data from the API

1 parent 5e498700
......@@ -3,5 +3,5 @@
<component name="Black">
<option name="sdkName" value="Spider" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="spider" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="Spider" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
......
......@@ -5,16 +5,15 @@ tortoise:
default:
engine: tortoise.backends.mysql
credentials:
database: ams_test
host: 172.0.31.57
password: Tjlh@2017
database: spider
host: 192.168.1.91
password: Tjlh@2023
port: 3306
user: root
minsize: 10
maxsize: 200
connect_timeout: 30
echo: True
timezone: Asia/Shanghai
apps:
models_read:
models:
......@@ -24,4 +23,14 @@ tortoise:
getSpider:
# Interval, in minutes, for the spider's scheduled fetch task
interval: 5
interval: 1
urlTemplate:
# 1 TV series  2 movie  3 variety show  4 anime
- type: 1
template: "/m/v3/billboard/list?type=DAILY&category=ALL_ANIME&date={date}&attach=gdi&orderTitle=gdi&platformId=0"
- type: 2
template: "/m/v3/billboard/list?type=DAILY&category=NETWORK_MOVIE&date={date}&attach=gdi&orderTitle=gdi&platformId=0"
- type: 3
template: "/m/v3/billboard/list?type=DAILY&category=NETWORK_VARIETY&date={date}&attach=gdi&orderTitle=gdi&platformId=0"
- type: 4
template: "/m/v3/billboard/list?type=DAILY&category=ALL_ANIME&date={date}&attach=gdi&orderTitle=gdi&platformId=0"
......
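Each urlTemplate entry is a plain format string; the {date} placeholder is filled per day when request URLs are built (see build_request_url in app/spider/http_spider.py further down in this commit). A minimal sketch of that substitution, with an illustrative base URL and date range:

# Sketch: turning one configured template into request URLs for a date range.
# base_url and the dates are illustrative; the template is the type-2 entry above.
template = "/m/v3/billboard/list?type=DAILY&category=NETWORK_MOVIE&date={date}&attach=gdi&orderTitle=gdi&platformId=0"
base_url = "http://guduodata.com"
dates = ["2024-12-22", "2024-12-23"]

urls = [base_url + template.format(date=d) for d in dates]
print(urls[0])  # .../billboard/list?type=DAILY&category=NETWORK_MOVIE&date=2024-12-22&...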
......@@ -8,8 +8,9 @@ import logging
logger = logging.getLogger(__name__)
@functools.lru_cache()
def getAppConfig()->AppConfig:
def getAppConfig() -> AppConfig:
logger.info('Loading AppConfig')
# Absolute path of the current file
current_file_path = os.path.abspath(__file__)
......
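getAppConfig is cached with functools.lru_cache, so the configuration file is parsed only once per process. The rest of the body is truncated in this diff; a minimal sketch of such a loader, assuming the YAML shown above lives in a config.yaml next to the module and is validated by the pydantic AppConfig model:

import functools
import os

import yaml  # assumes PyYAML is installed

from app.schemas.config_schema import AppConfig


@functools.lru_cache()
def load_app_config() -> AppConfig:  # hypothetical stand-in for getAppConfig
    # Resolve config.yaml relative to this file so the working directory does not matter.
    current_file_path = os.path.abspath(__file__)
    config_path = os.path.join(os.path.dirname(current_file_path), "config.yaml")  # assumed file name
    with open(config_path, "r", encoding="utf-8") as f:
        raw = yaml.safe_load(f)
    return AppConfig(**raw)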
from datetime import datetime, timedelta
import hashlib
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Tuple
from app.model.mysql_model import XWebCrawler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from superstream import Stream
from app.config.app_config import getAppConfig
from app.model.mysql_model import XWebCrawler
from app.schemas.spider_schema import ApschedulerJob, SpiderParams, TaskInfo, TypeEnum
logger = logging.getLogger(__name__)
......@@ -19,7 +21,6 @@ scheduler = AsyncIOScheduler()
@scheduler.scheduled_job("interval", minutes=getAppConfig().getSpider.get("interval"))
async def get_spider_config():
from app.service.spider_job_service import (
add_job,
add_guduo_job,
delete_job,
get_job_info,
......@@ -40,9 +41,9 @@ async def get_spider_config():
# Fetch the jobs that are already registered
jobs: List[ApschedulerJob] = get_job_info()
# Handle tasks that have been switched off
regist_jobs: Dict[int, ApschedulerJob] = Stream(jobs).group_by(lambda x: x.taskId)
registered_jobs: Dict[int, ApschedulerJob] = Stream(jobs).to_dict(lambda x: x.taskId, lambda y: y)
for job in status_group.get(0, []):
if job.id in regist_jobs:
if job.id in registered_jobs:
delete_job(job.id)
logger.info(f"删除任务成功,任务id:{job.id}")
# Handle newly added scheduled tasks
......@@ -66,7 +67,7 @@ async def get_spider_config():
url=job.target_url,
taskMd5=md5,
)
if job.id not in regist_jobs:
if job.id not in registered_jobs:
add_guduo_job(param, taskInfo)
logger.info(f"添加任务成功,任务id:{job.id}")
else:
......
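The switch from group_by to to_dict makes registered_jobs a flat taskId -> ApschedulerJob mapping rather than taskId -> list, so the membership checks in get_spider_config read cleanly. For reference, the same mapping as a plain dict comprehension (a sketch; the stand-in records only mimic the taskId attribute):

from typing import Dict, List

# Stand-ins for the ApschedulerJob records returned by get_job_info().
jobs: List[dict] = [{"taskId": 1}, {"taskId": 7}]

# Equivalent of Stream(jobs).to_dict(lambda x: x.taskId, lambda y: y):
registered_jobs: Dict[int, dict] = {job["taskId"]: job for job in jobs}
assert set(registered_jobs) == {1, 7}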
......@@ -15,7 +15,7 @@ handlers:
class: logging.handlers.TimedRotatingFileHandler
level: INFO
formatter: standard
filename: 'logs/app.log' # log file path and name
filename: '/Users/mac/PycharmProjects/pythonProject/FastAPIProject1/app/logs/app.log' # log file path and name
when: midnight # rotate at midnight, i.e. daily
interval: 1 # rotate once per day
backupCount: 7 # keep the last 7 days of log files
......
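The rotating file handler now points at an absolute, machine-specific path. For reference, a hedged sketch of keeping a relative filename in the YAML and resolving it against the application directory before calling logging.config.dictConfig (the handler key and file names here are assumptions, not the project's actual ones):

import logging.config
import os

import yaml  # assumes PyYAML is installed


def setup_logging(config_file: str = "logging.yaml") -> None:  # hypothetical helper
    base_dir = os.path.dirname(os.path.abspath(__file__))
    with open(os.path.join(base_dir, config_file), "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)
    # Re-anchor the handler's relative log path under the application directory.
    handler = config["handlers"]["file"]  # assumed handler name
    handler["filename"] = os.path.join(base_dir, handler["filename"])
    os.makedirs(os.path.dirname(handler["filename"]), exist_ok=True)
    logging.config.dictConfig(config)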
This diff could not be displayed because it is too large.
......@@ -46,6 +46,12 @@ class Port(BaseModel):
port: int
class UrlTemplateInfo(BaseModel):
type: int
template: str
class AppConfig(BaseModel):
server: Port
getSpider: Dict[str, int]
urlTemplate: List[UrlTemplateInfo]
......
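With UrlTemplateInfo and the new urlTemplate field, the YAML configuration maps directly onto AppConfig. A small validation sketch (the values are illustrative; only the field names come from the models above):

from app.schemas.config_schema import AppConfig

raw = {
    "server": {"port": 8000},  # illustrative port
    "getSpider": {"interval": 1},
    "urlTemplate": [
        {"type": 2, "template": "/m/v3/billboard/list?type=DAILY&category=NETWORK_MOVIE&date={date}"},
    ],
}

cfg = AppConfig(**raw)
print(cfg.urlTemplate[0].type, cfg.urlTemplate[0].template)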
import threading
from typing import TypeVar, List, Dict, Generic
K = TypeVar('K')  # define type variables
V = TypeVar('V')
class SafeDict(Generic[K, V]):
def __init__(self):
self.lock = threading.Lock()
self.data: Dict[K, V] = {}
def put(self, key: K, value: V):
with self.lock:
self.data.update({key: value})
def remove(self, key: K):
with self.lock:
self.data.pop(key)
def values(self) -> List[V]:
with self.lock:
return list(self.data.values())
def get(self, key: K) -> V:
with self.lock:
return self.data.get(key)
if __name__ == '__main__':
test: SafeDict[int, str] = SafeDict()
test.put(1, '1')
print(test.get(1))
print(test.get(2))
t2 = {}
print(t2.get(1))
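The __main__ check above only exercises SafeDict from a single thread; its purpose is concurrent access. A small sketch driving it from several writer threads:

import threading

from app.schemas.safe_contrainer import SafeDict

shared: SafeDict[int, str] = SafeDict()


def writer(start: int) -> None:
    # Each thread writes 100 distinct keys; the internal lock serialises the updates.
    for i in range(start, start + 100):
        shared.put(i, f"value-{i}")


threads = [threading.Thread(target=writer, args=(n * 100,)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

assert len(shared.values()) == 400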
from datetime import timedelta, datetime
from enum import IntEnum
from typing import List
from typing import List, Optional
from apscheduler.job import Job
from pydantic import BaseModel, Field
......@@ -17,19 +17,22 @@ class TypeEnum(IntEnum):
def get_precise_positioning(self, other) -> List[str]:
if self.ANIME == other:
return [".cateIcon_5", "li.anime-platform:nth-child(1)"]
return ["html body div#app.wrap div.content.index div.content-box div.category.category-nav ul li.active",
"li.anime-platform:nth-child(1)"]
if self.MOVIE == other:
return [".cateIcon_3", "ul.type-box:nth-child(1) > li:nth-child(1)"]
return ["li.active:nth-child(3)",
"ul.type-box:nth-child(1) > li:nth-child(1)"]
if self.EPISODES == other:
return [
".cateIcon_1",
"div.category:nth-child(3) > ul:nth-child(1) > li:nth-child(1)",
"div.category:nth-child(1) > ul:nth-child(1) > li:nth-child(1)",
"ul.type-box:nth-child(1) > li:nth-child(1)",
]
if self.VARIETY == other:
return [
"div.category:nth-child(3) > ul:nth-child(1) > li:nth-child(2)",
"div.category:nth-child(1) > ul:nth-child(1) > li:nth-child(1)",
"ul.type-box:nth-child(1) > li:nth-child(1)",
"ul.type-box:nth-child(1) > li:nth-child(1)"
]
......@@ -96,3 +99,57 @@ class TaskInfo(BaseModel):
taskId: int
url: str
taskMd5: str
class PlatformImage(BaseModel):
url: str
class DataItem(BaseModel):
showId: int | None
name: str
category: str | None
releaseDate: int | None
days: int | None
offlineDate: int | None
releaseStatus: int | None
episode: int | None
gdi: str | None
gdiFloat: float
attachData: str
rank: int | None
rise: int | None
platformImgs: List[str] | None
topHotCount: Optional[int] = None
topHotCountDay: Optional[int] = None
totalPlayCount: int | None
class ResponseModel(BaseModel):
code: int
msg: str
data: List[DataItem]
extra: Optional[dict] = None
if __name__ == "__main__":
from datetime import datetime
# Timestamps in milliseconds
release_date_timestamp = 1573488000000
offline_date_timestamp = 1576080000000
# Convert the timestamps to datetime objects and format them as 'yyyy-MM-dd'
def timestamp_to_date_string(timestamp_ms):
# Convert from milliseconds to seconds by dividing by 1000, then use fromtimestamp
dt_object = datetime.fromtimestamp(timestamp_ms / 1000)
return dt_object.strftime('%Y-%m-%d')
# Call the helper and print the results
formatted_release_date = timestamp_to_date_string(release_date_timestamp)
formatted_offline_date = timestamp_to_date_string(offline_date_timestamp)
print(f"Release Date: {formatted_release_date}")
print(f"Offline Date: {formatted_offline_date}")
......
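DataItem and ResponseModel mirror the JSON returned by the billboard endpoint. A hedged parsing sketch: the field names come from the models above, but every value below is invented for illustration:

from app.schemas.spider_schema import ResponseModel

payload = {
    "code": 0,
    "msg": "ok",
    "data": [
        {
            "showId": 12345,
            "name": "Some Show",
            "category": "ANIME",
            "releaseDate": 1573488000000,
            "days": 10,
            "offlineDate": 1576080000000,
            "releaseStatus": 1,
            "episode": 12,
            "gdi": "1.23",
            "gdiFloat": 1.23,
            "attachData": "",
            "rank": 1,
            "rise": 0,
            "platformImgs": [],
            "totalPlayCount": 100000,
        }
    ],
}

resp = ResponseModel(**payload)
print(resp.data[0].name, resp.data[0].gdiFloat)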
import asyncio
from datetime import datetime
import logging
from typing import List, Any
from app.model.mysql_model import SpiderModel
from superstream import Stream
from datetime import datetime
from multiprocessing import Manager
from typing import List, Any, Dict
from app.schemas.safe_contrainer import SafeDict
from app.job.job import scheduler
from app.model.mysql_model import SpiderModel
from app.schemas.spider_schema import (
ApschedulerJob,
TaskInfo,
SpiderParams,
GuoDuoSpiderResult,
)
from app.spider.guduo_spider import startBrowser
from app.spider.http_spider import get_score_data
logger = logging.getLogger(__name__)
jobs: List[ApschedulerJob] = []
jobs: SafeDict[int, ApschedulerJob] = SafeDict()
def add_job(params: List[Any], taskInfo: TaskInfo, task: Any):
......@@ -40,7 +41,9 @@ def add_job(params: List[Any], taskInfo: TaskInfo, task: Any):
week=weekend,
args=params,
)
jobs.append(ApschedulerJob(jobId=job, taskId=taskInfo.taskId,taskMd5=taskInfo.md5()))
jobs.put(taskInfo.taskId,
ApschedulerJob(jobId=job, taskId=taskInfo.taskId, taskMd5=taskInfo.taskMd5)
)
logger.info(
f"添加任务成功,任务id:{taskInfo.taskId},任务执行策略:{taskInfo.executeStrategy}"
)
......@@ -57,26 +60,21 @@ def add_guduo_job(taskParam: SpiderParams, taskInfo: TaskInfo):
def delete_job(taskId: int):
for job in jobs:
if job.taskId == taskId:
job.jobId.remove()
jobs.remove(job)
jobs.remove(taskId)
logger.info(f"删除任务成功,任务id:{taskId}")
def get_job_info(taskId: int) -> List[ApschedulerJob]:
def get_job_info(taskId: int = None) -> List[ApschedulerJob]:
if taskId is None:
return jobs
job = Stream(jobs).filter(lambda x: x.taskId == taskId).to_list()
return f"job 信息->{job}"
return jobs.values()
return [jobs.get(taskId)] if jobs.get(taskId) else []
async def scrawl_and_save(taskParam: SpiderParams):
try:
# Run the spider and collect the results; the call below is wrapped in try/except to capture failures
results = await startBrowser(taskParam)
results = await get_score_data(taskParam)
except Exception as e:
logger.info(f"爬虫重试情况:{startBrowser.statistics}")
logger.error(f"爬虫任务执行失败,失败原因:{e}")
return
asyncTasks = (save_or_update(item) for item in results)
......
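With jobs stored in a SafeDict keyed by taskId, lookups and deletes become single dictionary operations, but the jobId field still holds the underlying APScheduler Job handle, as add_job shows. A hedged sketch of a delete that also cancels the scheduled job before dropping the bookkeeping entry (an illustrative variant, not the project's delete_job):

import logging

from app.service.spider_job_service import jobs  # the module-level SafeDict shown above

logger = logging.getLogger(__name__)


def delete_job_sketch(taskId: int) -> None:  # hypothetical variant of delete_job
    entry = jobs.get(taskId)  # returns None when the task is not registered
    if entry is None:
        return
    entry.jobId.remove()  # cancel the APScheduler job itself
    jobs.remove(taskId)   # then drop the bookkeeping entry
    logger.info(f"Job deleted, task id: {taskId}")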
......@@ -18,18 +18,12 @@ from tenacity import (
logger = logging.getLogger(__name__)
@retry(
stop=stop_after_attempt(3),
before_sleep=before_sleep_log(logger, logging.DEBUG),
wait=wait_exponential(multiplier=1, min=1, max=10),
after=after_log(logger, logging.DEBUG),
)
async def startBrowser(spiderParam: SpiderParams) -> List[GuoDuoSpiderResult]:
time_range = spiderParam.get_time_range()
url = spiderParam.url
scrawl_types = spiderParam.target_type
async with async_playwright() as p:
browser = await p.firefox.launch(headless=True)
browser = await p.firefox.launch(headless=False)
# Create one browser page per date in the range
pages = await tqdm_asyncio.gather(
*(browser.new_page() for _ in range(len(time_range)))
......@@ -51,14 +45,14 @@ async def startBrowser(spiderParam: SpiderParams) -> List[GuoDuoSpiderResult]:
async def hand_one_data(
targetDate: str, page: Page, scrawl_types: List[TypeEnum], url: str
targetDate: str, page: Page, scrawl_types: List[TypeEnum], url: str
) -> List[GuoDuoSpiderResult]:
# Split the date string into year, month, and day
year, month, day = targetDate.split("-")
day = day.lstrip("0")  # drop a leading zero (e.g. "05" -> "5") so it matches the picker's day label
month = int(month) - 1
await page.goto(url)
# Wait for the overlay mask to disappear
# Click
await page.click('//*[@id="currentDateTxt"]')
data_css_selector = (
f'#day span.pika-button.pika-day[type="button"]'
......@@ -70,6 +64,10 @@ async def hand_one_data(
# If the target element is not visible, return an empty list
if not await doc.is_visible():
return []
# Check whether the date is clickable
if await doc.get_attribute('class') == 'pika-button pika-day is-disabled':
logger.warning(f"Date {targetDate} is disabled and cannot be clicked")
return []
# Click the target date
await doc.click()
# Final step: refine the element positioning
......@@ -90,7 +88,7 @@ async def hand_one_data(
async def get_data(
targetPage: Page, scrawl_type: int, target_time: str
targetPage: Page, scrawl_type: int, target_time: str
) -> Optional[List[GuoDuoSpiderResult]]:
target_data_css_selector = ".rank-box"
table = targetPage.locator(target_data_css_selector)
......@@ -122,12 +120,22 @@ async def get_data(
return result
@retry(
stop=stop_after_attempt(3),
before_sleep=before_sleep_log(logger, logging.DEBUG),
wait=wait_exponential(multiplier=1, min=1, max=10),
after=after_log(logger, logging.DEBUG),
)
async def retry_job(spiderParam: SpiderParams) -> List[GuoDuoSpiderResult]:
return await startBrowser(spiderParam)
if __name__ == "__main__":
# Record the start time
start_time = time.time()
param = SpiderParams(
startDate="2024-01-02",
endDate="2024-01-15",
startDate="2024-12-22",
endDate="2024-12-23",
target_type=[
TypeEnum.ANIME,
TypeEnum.EPISODES,
......@@ -136,6 +144,6 @@ if __name__ == "__main__":
],
url="http://guduodata.com/",
)
res = asyncio.run(startBrowser(param))
print(f"代码重试情况:{startBrowser.statistics}")
print(f"程序执行耗时时间:{(time.time() - start_time)/1000}长度为:{len(res)}")
res = asyncio.run(retry_job(param))
print(f"代码重试情况:{retry_job.statistics}")
print(f"程序执行耗时时间:{(time.time() - start_time) / 1000}长度为:{len(res)}")
......
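Moving the @retry decorator from startBrowser onto the new retry_job wrapper keeps the browser routine itself plain, while the retry statistics are read from retry_job. A minimal, self-contained sketch of the same tenacity pattern with a dummy task standing in for startBrowser:

import asyncio
import logging

from tenacity import after_log, before_sleep_log, retry, stop_after_attempt, wait_exponential

logger = logging.getLogger(__name__)
attempts = {"count": 0}


async def flaky_task() -> str:  # stand-in for startBrowser
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


@retry(
    stop=stop_after_attempt(3),
    before_sleep=before_sleep_log(logger, logging.DEBUG),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    after=after_log(logger, logging.DEBUG),
)
async def retry_task() -> str:  # mirrors retry_job wrapping startBrowser
    return await flaky_task()


if __name__ == "__main__":
    print(asyncio.run(retry_task()))  # succeeds on the third attempt
    print(retry_task.statistics)      # attempt_number and timing bookkeeping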
import asyncio
import datetime
import time
from typing import List, Dict, Tuple
import aiohttp
from tenacity import retry, stop_after_attempt, before_sleep_log, wait_exponential, after_log
import logging
from app.schemas.config_schema import UrlTemplateInfo
from app.schemas.spider_schema import SpiderParams, TypeEnum, GuoDuoSpiderResult, ResponseModel
from app.config.app_config import getAppConfig
from superstream import Stream
logger = logging.getLogger(__name__)
def build_request_url(spiderParam: SpiderParams) -> Dict[TypeEnum, List[Tuple[str, str]]]:
base_url = spiderParam.url
targets = spiderParam.target_type
templates: List[UrlTemplateInfo] = getAppConfig().urlTemplate
templates_group: Dict[int, str] = Stream(templates).to_map(lambda x: x.type, lambda x: x.template)
# every date in the requested time range
time_range = spiderParam.get_time_range()
target_urls: Dict[TypeEnum, List[Tuple[str, str]]] = {}
for target in targets:
template = templates_group[target.value]
url_type = [(base_url + template.format(date=item), item) for item in time_range]
target_urls[target] = url_type
return target_urls
@retry(
stop=stop_after_attempt(3),
before_sleep=before_sleep_log(logger, logging.DEBUG),
wait=wait_exponential(multiplier=1, min=1, max=10),
after=after_log(logger, logging.DEBUG),
)
async def fetch(session, url):
async with session.get(url) as response:
return await response.json()
async def fetch_and_parse(urls: List[Tuple[str, str]]) -> Dict[str, ResponseModel]:
async with aiohttp.ClientSession() as session:
requests = [fetch(session, url[0]) for url in urls]
results = await asyncio.gather(*requests)
return_data = {}
for k, v in zip(urls, results):
return_data[k[1]] = ResponseModel(**v)
return return_data
def parse_response_to_spider_result(guduo_responses: Dict[str, ResponseModel], dataType: TypeEnum) -> List[GuoDuoSpiderResult]:
results: List[GuoDuoSpiderResult] = []
for k, v in guduo_responses.items():
time_request = k
targetType = dataType
platform = 'ALL'
createTime = datetime.datetime.now()
for item in v.data:
sourceName = item.name
score = item.gdiFloat
results.append(GuoDuoSpiderResult(time=time_request, targetType=targetType,
platform=platform, sourceName=sourceName,
score=score, createTime=createTime))
return results
async def batch_fetch_and_parse(urls: List[Tuple[str, str]], enum: TypeEnum):
result = await fetch_and_parse(urls)
return parse_response_to_spider_result(result, enum)
async def get_score_data(spiderParam: SpiderParams) -> List[GuoDuoSpiderResult]:
"""
Fetch score data from the API.
:param spiderParam: spider parameters
:return: parsed spider results
"""
url_infos = build_request_url(spiderParam)
tasks = [batch_fetch_and_parse(urls, key) for key, urls in url_infos.items()]
results = await asyncio.gather(*tasks)
spider_datas = [item for sublist in results for item in sublist]
return spider_datas
if __name__ == '__main__':
# Record the start time
start_time = time.time()
param = SpiderParams(
startDate="2024-12-22",
endDate="2024-12-23",
target_type=[
TypeEnum.ANIME,
TypeEnum.EPISODES,
TypeEnum.MOVIE,
TypeEnum.VARIETY,
],
url="http://guduodata.com",
)
res = asyncio.run(get_score_data(param))
print(f"程序执行耗时时间:{(time.time() - start_time) / 1000}长度为:{len(res)}")
print(res)
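fetch above retries only when session.get or response.json() raises; a 4xx/5xx response that still returns a JSON body would be handed to ResponseModel as-is. A hedged variant that also fails, and therefore retries, on non-2xx status codes via aiohttp's raise_for_status flag:

import asyncio

import aiohttp
from tenacity import retry, stop_after_attempt, wait_exponential


@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
async def fetch_checked(session: aiohttp.ClientSession, url: str) -> dict:
    # raise_for_status=True turns 4xx/5xx responses into ClientResponseError,
    # so tenacity retries them like any other exception.
    async with session.get(url, raise_for_status=True) as response:
        return await response.json()


async def main() -> None:
    url = "http://guduodata.com/m/v3/billboard/list?type=DAILY&category=NETWORK_MOVIE&date=2024-12-23&attach=gdi&orderTitle=gdi&platformId=0"
    async with aiohttp.ClientSession() as session:
        print(await fetch_checked(session, url))


# asyncio.run(main())  # commented out: this hits the live endpoint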
......@@ -33,3 +33,5 @@ tzdata==2024.2
tzlocal==5.2
uvicorn==0.34.0
virtualenv==20.25.0
aiohttp~=3.11.11
\ No newline at end of file
......