Commit 78f7d02cef4999f3a981e1636e3627252679bcdd by wenxin

init

0 parents
Showing 52 changed files with 732 additions and 0 deletions
# Files ignored by default
/shelf/
/workspace.xml
# Editor-based HTTP client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Spider" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PackageRequirementsSettings">
<option name="modifyBaseFiles" value="true" />
</component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="DataSourceManagerImpl" format="xml" multifile-model="true">
<data-source source="LOCAL" name="docker" uuid="4b6d465a-8a9f-4d96-bc3c-4d40505aae14">
<driver-ref>mysql.8</driver-ref>
<synchronize>true</synchronize>
<jdbc-driver>com.mysql.cj.jdbc.Driver</jdbc-driver>
<jdbc-url>jdbc:mysql://localhost:3306</jdbc-url>
<working-dir>$ProjectFileDir$</working-dir>
</data-source>
</component>
</project>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="HttpUrlsUsage" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredUrls">
<list>
<option value="http://0.0.0.0" />
<option value="http://127.0.0.1" />
<option value="http://activemq.apache.org/schema/" />
<option value="http://cxf.apache.org/schemas/" />
<option value="http://guduodata.com" />
<option value="http://java.sun.com/" />
<option value="http://javafx.com/fxml" />
<option value="http://javafx.com/javafx/" />
<option value="http://json-schema.org/draft" />
<option value="http://localhost" />
<option value="http://maven.apache.org/POM/" />
<option value="http://maven.apache.org/xsd/" />
<option value="http://primefaces.org/ui" />
<option value="http://schema.cloudfoundry.org/spring/" />
<option value="http://schemas.xmlsoap.org/" />
<option value="http://tiles.apache.org/" />
<option value="http://www.ibm.com/webservices/xsd" />
<option value="http://www.jboss.com/xml/ns/" />
<option value="http://www.jboss.org/j2ee/schema/" />
<option value="http://www.springframework.org/schema/" />
<option value="http://www.springframework.org/security/tags" />
<option value="http://www.springframework.org/tags" />
<option value="http://www.thymeleaf.org" />
<option value="http://www.w3.org/" />
<option value="http://xmlns.jcp.org/" />
</list>
</option>
</inspection_tool>
<inspection_tool class="PyArgumentListInspection" enabled="true" level="INFORMATION" enabled_by_default="true" />
<inspection_tool class="PyMethodMayBeStaticInspection" enabled="false" level="WEAK WARNING" enabled_by_default="false" />
<inspection_tool class="PyPep8Inspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredErrors">
<list>
<option value="E402" />
<option value="W605" />
<option value="E501" />
</list>
</option>
</inspection_tool>
<inspection_tool class="PyPep8NamingInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredErrors">
<list>
<option value="N803" />
<option value="N802" />
<option value="N801" />
<option value="N806" />
<option value="N813" />
</list>
</option>
</inspection_tool>
<inspection_tool class="PyUnresolvedReferencesInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredIdentifiers">
<list>
<option value="tensorflow" />
<option value="str.get" />
<option value="admin.Admin._Base__readUsers" />
<option value="admin.Admin._Base__changeActive" />
<option value="admin.Admin._Base__changeRole" />
<option value="admin.Admin._Base__writeGifts" />
<option value="admin.Admin._Base__reduceGift" />
<option value="user.User._Base__readUsers" />
<option value="user.User._Base__readGifts" />
<option value="urllib.request" />
</list>
</option>
</inspection_tool>
<inspection_tool class="PydanticInspection" enabled="false" level="WARNING" enabled_by_default="false" />
</profile>
</component>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Spider" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/FastAPIProject1.iml" filepath="$PROJECT_DIR$/.idea/FastAPIProject1.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings" defaultProject="true" />
</project>
\ No newline at end of file
No preview for this file type
server:
  port: 7654
tortoise:
  connections:
    default:
      engine: tortoise.backends.mysql
      credentials:
        database: fast_api
        host: 127.0.0.1
        password: root
        port: 3306
        user: root
        minsize: 10
        maxsize: 200
        connect_timeout: 30
        echo: true
  apps:
    models_read:
      models:
        - app.model.mysql_model
      default_connection: default
  log_queries: true  # enable SQL query logging
File mode changed
import os
import yaml
from app.schemas.config_schema import AppConfig
import logging

logger = logging.getLogger(__name__)


def getAppConfig():
    logger.info('Loading AppConfig')
    # Absolute path of the current file
    current_file_path = os.path.abspath(__file__)
    # Parent directory of the current file
    parent_directory = os.path.dirname(current_file_path)
    # Build the path to application.yaml
    application_yaml_path = os.path.join(parent_directory, '..', 'application.yaml')
    with open(application_yaml_path, 'r') as f:
        app_config = yaml.safe_load(f)
    return AppConfig(**app_config)
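
# Usage sketch: getAppConfig() returns a validated AppConfig, so nested values
# read as attributes (the port value assumes application.yaml above):
#
#   from app.config.app_config import getAppConfig
#   conf = getAppConfig()
#   conf.server.port  # -> 7654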
import functools
import logging.config
import os
from typing import Any

import yaml


@functools.lru_cache()
def getLogConfig() -> dict[str, Any]:
    current_file_path = os.path.abspath(__file__)
    # Parent directory of the current file
    parent_directory = os.path.dirname(current_file_path)
    # Build the path to log-config.yaml
    log_config_yaml_path = os.path.join(parent_directory, '..', 'log-config.yaml')
    # Load the logging configuration
    with open(log_config_yaml_path, 'r') as f:
        log_config = yaml.safe_load(f)
    return log_config


# Configure the root logger at import time so other modules can use it
logging.config.dictConfig(getLogConfig())
import os
import yaml
from app.schemas.config_schema import TortoiseConfig
import logging

logger = logging.getLogger(__name__)


def getTortoiseConfig():
    logger.info('Loading TortoiseConfig')
    # Absolute path of the current file
    current_file_path = os.path.abspath(__file__)
    # Parent directory of the current file
    parent_directory = os.path.dirname(current_file_path)
    # Build the path to application.yaml
    application_yaml_path = os.path.join(parent_directory, '..', 'application.yaml')
    with open(application_yaml_path, 'r') as f:
        app_config = yaml.safe_load(f)
    return TortoiseConfig(**(app_config.get('tortoise')))
File mode changed
No preview for this file type
from contextlib import asynccontextmanager
from tortoise import Tortoise
from app.job.job import scheduler
from fastapi import FastAPI
from app.config.tortoise_config import getTortoiseConfig
import logging

logger = logging.getLogger(__name__)


async def init():
    # Initialize the database connections
    logger.info("Initializing the database")
    await Tortoise.init(config=getTortoiseConfig().model_dump())
    # Create the database tables
    logger.info("Generating database schemas")
    await Tortoise.generate_schemas()


async def close():
    await Tortoise.close_connections()


# Define an async context manager `lifespan` with the asynccontextmanager
# decorator: code before `yield` runs at startup, code after it at shutdown
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Start APScheduler
    scheduler.start()
    logger.info("APScheduler started")
    await init()
    logger.info("Database initialization complete")
    yield
    # On shutdown, stop the scheduler and release database connections
    scheduler.shutdown()
    logger.info("APScheduler shut down")
    await close()
    logger.info("Database connections closed")
File mode changed
No preview for this file type
No preview for this file type
from datetime import datetime

from apscheduler.schedulers.asyncio import AsyncIOScheduler

# Shared scheduler instance; AsyncIOScheduler runs jobs on the asyncio event
# loop and is started from the FastAPI lifespan in app/event/fastapi_event.py
scheduler = AsyncIOScheduler()


# Scheduled task that runs every minute
@scheduler.scheduled_job('interval', minutes=1)
async def cron_job():
    # Task body; for example, print the current time
    print(f"The current time is {datetime.now()}")
version: 1
disable_existing_loggers: false
formatters:
  standard:
    format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
handlers:
  console:
    class: logging.StreamHandler
    level: INFO
    formatter: standard
    stream: ext://sys.stderr
  file:
    class: logging.handlers.TimedRotatingFileHandler
    level: INFO
    formatter: standard
    filename: 'logs/app.log'  # log file path and name
    when: midnight  # rotate at midnight
    interval: 1  # rotate once per day
    backupCount: 7  # keep the last 7 days of log files
    encoding: utf8  # file encoding
loggers:
  '':
    handlers: ['console', 'file']  # two handlers: console and file
    level: INFO
    propagate: true
This diff could not be displayed because it is too large.
import logging

import uvicorn
from fastapi import FastAPI

from app.router import spider_router
from app.config.app_config import getAppConfig
from app.event.fastapi_event import lifespan
from app.config.log_config import getLogConfig

app = FastAPI(lifespan=lifespan)
app.include_router(spider_router.spiderRouter)

if __name__ == '__main__':
    appConf = getAppConfig()
    logging.info(f"Starting server -> {appConf.server}")
    # uvicorn.run blocks here until the server shuts down, so any log line
    # placed after it would only fire on exit
    uvicorn.run("main:app", host="0.0.0.0",
                port=appConf.server.port,
                log_config=getLogConfig())
File mode changed
No preview for this file type
from datetime import datetime

from tortoise import Model, fields

from app.schemas.spider_schema import TypeEnum


class SpiderModel(Model):
    id = fields.IntField(pk=True)
    # yyyy-MM-dd format is validated at the pydantic layer (see
    # GuoDuoSpiderResult in spider_schema.py); CharField has no regex kwarg
    time = fields.CharField(max_length=10, description="data date as a yyyy-MM-dd string")
    # source_field mappings give snake_case MySQL columns while keeping
    # camelCase Python attributes
    targetType = fields.IntEnumField(TypeEnum, description="data type", source_field='target_type')
    platform = fields.CharField(max_length=255, description="platform name")
    sourceName = fields.CharField(max_length=255, description="media resource name", source_field='source_name')
    score = fields.FloatField(description="popularity score")
    createTime = fields.DatetimeField(default=datetime.now, description="creation time", source_field='create_time')

    class Meta:
        table = "spider_data"
File mode changed
# app/router/spider_router.py
import logging
from typing import Dict, Any

from fastapi import APIRouter, Path

from app.schemas.spider_schema import SpiderParams, TaskInfo
from app.service.spider_job_service import add_job, get_job_info, add_guduo_job
from app.spider import guduo_spider

logger = logging.getLogger(__name__)

spiderRouter = APIRouter(prefix="/spider", tags=["spider"])


@spiderRouter.post("/start/", summary="Run the spider and return the crawled results")
async def start_spider(param: SpiderParams):
    logger.info(f"Starting crawl, params: {param}")
    res = await guduo_spider.startBrowser(param)
    logger.info("Crawl finished")
    return res


@spiderRouter.post("/add_guduo_job", summary="Add a scheduled Guduo spider task")
async def add_task_to_job(param: Dict[str, Any]):
    jobParam = SpiderParams(**(param.get('taskParam')))
    taskInfo = TaskInfo(**(param.get('taskInfo')))
    add_guduo_job(jobParam, taskInfo)
    return 'Guduo spider task added'


@spiderRouter.get("/add_job/{taskId}", summary="Add a scheduled task")
async def add_test_job(taskId: int = Path(..., description="Unique identifier of the task")):
    async def test(name: str):
        logger.info(f"Test scheduled task executed -> {name}")

    taskInfo = TaskInfo(taskId=taskId, taskStart=True,
                        executeStrategy='0 0/1 * * * ?',
                        url='www.baidu.com')
    add_job(['test'], taskInfo, test)


@spiderRouter.get("/get_job/{taskId}", summary="Get a scheduled task")
async def get_job(taskId: int = Path(...)):
    return get_job_info(taskId)
File mode changed
import ssl
from typing import Dict, List, Optional, Any

from pydantic import BaseModel, Field


def get_ctx():
    ctx = ssl.create_default_context()
    # Certificate validation is disabled here. Don't do this in production;
    # see the official Python ``ssl`` module documentation.
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    return ctx
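
# A verifying alternative for non-local environments (not wired into this
# project's config; the optional cafile argument is illustrative):
def get_verified_ctx(cafile: Optional[str] = None):
    # Uses the system trust store when cafile is None
    return ssl.create_default_context(cafile=cafile)
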
class Credentials(BaseModel):
    database: str
    host: str
    password: str
    port: int
    user: str
    ssl: Any = Field(default_factory=get_ctx)
    minsize: Optional[int] = None
    maxsize: Optional[int] = None
    connect_timeout: Optional[int] = None
    echo: Optional[bool] = None


class Connection(BaseModel):
    engine: str
    credentials: Credentials


class App(BaseModel):
    models: List[str]
    default_connection: str


class TortoiseConfig(BaseModel):
    connections: Dict[str, Connection]
    apps: Dict[str, App]
    log_queries: bool


class Port(BaseModel):
    port: int


class AppConfig(BaseModel):
    server: Port
from datetime import timedelta, datetime
from enum import IntEnum
from typing import List

from apscheduler.job import Job
from pydantic import BaseModel, Field


class TypeEnum(IntEnum):
    EPISODES = 1  # series
    MOVIE = 2  # movie
    ANIME = 3  # anime

    def get_precise_positioning(self, other) -> List[str]:
        # CSS selectors used to navigate to the category page for each type
        if self.ANIME == other:
            return ['.cateIcon_5',
                    'li.anime-platform:nth-child(1)']
        if self.MOVIE == other:
            return ['.cateIcon_3',
                    'ul.type-box:nth-child(1) > li:nth-child(1)']
        if self.EPISODES == other:
            return ['.cateIcon_1',
                    'div.category:nth-child(1) > ul:nth-child(1) > li:nth-child(1)',
                    'ul.type-box:nth-child(1) > li:nth-child(1)']


class GuoDuoSpiderResult(BaseModel):
    # data date as a yyyy-MM-dd string
    time: str = Field(..., description="data date as a yyyy-MM-dd string", pattern=r'^\d{4}-\d{2}-\d{2}$')
    # type: 1 series, 2 movie, 3 anime
    targetType: TypeEnum = Field(..., description="data type")
    # platform name
    platform: str
    # media resource name
    sourceName: str
    # popularity score
    score: float
    # creation time
    createTime: datetime

    class Config:
        from_attributes = False


class SpiderParams(BaseModel):
    startDate: str = Field(..., description="data date as a yyyy-MM-dd string", pattern=r'^\d{4}-\d{2}-\d{2}$')
    endDate: str = Field(..., description="data date as a yyyy-MM-dd string", pattern=r'^\d{4}-\d{2}-\d{2}$')
    # type: 1 series, 2 movie, 3 anime
    target_type: List[TypeEnum]
    # URL to crawl
    url: str

    class Config:
        from_attributes = False

    def get_time_range(self) -> List[str]:
        # Expand [startDate, endDate] into a list of yyyy-MM-dd strings
        start_date_obj = datetime.strptime(self.startDate, '%Y-%m-%d')
        end_date_obj = datetime.strptime(self.endDate, '%Y-%m-%d')
        delta = end_date_obj - start_date_obj
        time_range = []
        for i in range(delta.days + 1):
            day = start_date_obj + timedelta(days=i)
            time_range.append(day.strftime('%Y-%m-%d'))
        return time_range
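
# A quick worked example of get_time_range (illustrative values; the range is
# inclusive on both ends):
#
#   SpiderParams(startDate='2024-01-02', endDate='2024-01-04',
#                target_type=[TypeEnum.MOVIE],
#                url='http://guduodata.com/').get_time_range()
#   -> ['2024-01-02', '2024-01-03', '2024-01-04']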
class ApschedulerJob(BaseModel):
    jobId: Job
    taskId: int

    class Config:
        arbitrary_types_allowed = True


class TaskInfo(BaseModel):
    taskStart: bool
    executeStrategy: str
    taskId: int
    url: str
File mode changed
import asyncio
import logging
from typing import List, Any

from app.model.mysql_model import SpiderModel
from superstream import Stream

from app.job.job import scheduler
from app.schemas.spider_schema import ApschedulerJob, TaskInfo, SpiderParams, GuoDuoSpiderResult
from app.spider.guduo_spider import startBrowser

logger = logging.getLogger(__name__)

jobs: List[ApschedulerJob] = []


def add_job(params: List[Any], taskInfo: TaskInfo, task: Any):
    # executeStrategy is a Quartz-style cron string:
    # second minute hour day month day-of-week [year].
    # APScheduler's cron trigger has no seconds-first field, so index 0 is
    # skipped and '?' is mapped to None
    cron = taskInfo.executeStrategy
    cron_fields = cron.split(" ")
    minute = cron_fields[1]
    minute = (None if minute == "?" else minute)
    hour = cron_fields[2]
    hour = (None if hour == "?" else hour)
    day = cron_fields[3]
    day = (None if day == "?" else day)
    month = cron_fields[4]
    month = (None if month == "?" else month)
    day_of_week = cron_fields[5]
    day_of_week = (None if day_of_week == "?" else day_of_week)
    # Note: APScheduler's `week` argument means week-of-year; the Quartz
    # sixth field is the weekday, which maps to `day_of_week`
    job = scheduler.add_job(task, "cron",
                            minute=minute, hour=hour, day=day, month=month,
                            day_of_week=day_of_week,
                            args=params)
    jobs.append(ApschedulerJob(jobId=job, taskId=taskInfo.taskId))
    logger.info(
        f"Job added; task id: {taskInfo.taskId}, execute strategy: {taskInfo.executeStrategy}")
def add_guduo_job(taskParam: SpiderParams, taskInfo: TaskInfo):
    """
    Add a Guduo spider task to the scheduler.
    :param taskParam: Guduo spider parameters
    :param taskInfo: task information
    :return:
    """
    add_job([taskParam], taskInfo, scrawl_and_save)


def delete_job(taskId: int):
    # Iterate over a copy, since we mutate the list while looping
    for job in jobs[:]:
        if job.taskId == taskId:
            job.jobId.remove()
            jobs.remove(job)
            logger.info(f"Job deleted; task id: {taskId}")


def get_job_info(taskId: int):
    job = (Stream(jobs)
           .filter(lambda x: x.taskId == taskId)
           .find_first())
    return f'job info -> {job}'


async def scrawl_and_save(taskParam: SpiderParams):
    # Run the spider and collect the results
    results = await startBrowser(taskParam)
    asyncTasks = (save_or_update(item) for item in results)
    await asyncio.gather(*asyncTasks)
    logger.info(f'Spider task finished; {len(results)} rows crawled and saved to the database')


async def save_or_update(result: GuoDuoSpiderResult):
    time = result.time
    targetType = result.targetType
    platform = result.platform
    sourceName = result.sourceName
    score = result.score
    createTime = result.createTime
    # Look up an existing row by (time, targetType, platform, sourceName);
    # update it if found, otherwise insert a new one
    obj = await SpiderModel.get_or_none(
        time=time,
        targetType=targetType,
        platform=platform,
        sourceName=sourceName
    )
    if obj:
        for key, value in result.__dict__.items():
            setattr(obj, key, value)
        await obj.save()
    else:
        await SpiderModel.create(
            time=time,
            targetType=targetType,
            platform=platform,
            sourceName=sourceName,
            score=score,
            createTime=createTime
        )
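
# Tortoise also ships Model.update_or_create, which collapses the
# get/update/insert sequence above into one call; a sketch of the equivalent
# (behavior assumed per the tortoise-orm docs, not exercised in this commit):
#
#   await SpiderModel.update_or_create(
#       defaults={'score': result.score, 'createTime': result.createTime},
#       time=result.time, targetType=result.targetType,
#       platform=result.platform, sourceName=result.sourceName)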
File mode changed
import asyncio
import time
from datetime import datetime
from typing import List, Optional

from playwright.async_api import Page, async_playwright
from superstream import Stream

from app.schemas.spider_schema import GuoDuoSpiderResult, SpiderParams, TypeEnum
import logging

logger = logging.getLogger(__name__)


async def startBrowser(spiderParam: SpiderParams) -> List[GuoDuoSpiderResult]:
    time_range = spiderParam.get_time_range()
    url = spiderParam.url
    scrawl_types = spiderParam.target_type
    async with async_playwright() as p:
        # headless=False opens a visible browser window
        browser = await p.firefox.launch(headless=False)
        # Open one browser page per date in the range
        pages = await asyncio.gather(*(browser.new_page()
                                       for _ in range(len(time_range))))
        # Crawl all dates concurrently, pairing each date with its page
        results = await asyncio.gather(*(hand_one_data(time_range[i], pages[i],
                                                       scrawl_types, url)
                                         for i in range(len(time_range))))
        return (Stream(results)
                .filter(lambda x: x is not None)
                .filter(lambda x: len(x) > 0)
                .flat_map(lambda x: Stream(x))
                .to_list())
async def hand_one_data(targetDate: str,
                        page: Page,
                        scrawl_types: List[TypeEnum],
                        url: str) -> List[GuoDuoSpiderResult]:
    # Split the yyyy-MM-dd date into year, month and day
    year, month, day = targetDate.split('-')
    # The pika date picker uses a zero-based month and an unpadded day;
    # str(int(day)) strips the leading zero safely, whereas the previous
    # day.replace('0', '') also mangled days like 10, 20 and 30
    day = str(int(day))
    month = int(month) - 1
    await page.goto(url)
    # Open the date picker
    await page.click('//*[@id="currentDateTxt"]')
    data_css_selector = (f'#day span.pika-button.pika-day[type="button"]'
                         f'[data-pika-year="{year}"]'
                         f'[data-pika-month="{month}"]'
                         f'[data-pika-day="{day}"]')
    doc = page.locator(data_css_selector)
    # Click the target date
    await doc.click()
    # Final positioning step: navigate to each category via its CSS selectors
    css_selectors = (Stream(scrawl_types)
                     .map(lambda x: (x, x.get_precise_positioning(x)))
                     .group_by(lambda x: x[0]))
    results = []
    for key, value in css_selectors.items():
        logger.info(f'Crawling {targetDate}, type {value[0][0]}')
        for css_selector in value[0][1]:
            await page.click(css_selector)
        result = await get_data(page, key.value, targetDate)
        if result:
            Stream(result).for_each(lambda x: results.append(x))
    return results
async def get_data(targetPage: Page, scrawl_type: int, target_time: str) -> Optional[List[GuoDuoSpiderResult]]:
    target_data_css_selector = '.rank-box'
    table = targetPage.locator(target_data_css_selector)
    # locator() never returns None, so check the match count instead
    if await table.count() == 0:
        logger.warning(f'No ranking table found on the page for type {scrawl_type}')
        return None
    # All tr rows under the ranking table
    trs = table.locator('tr')
    if await trs.count() == 0:
        logger.warning(f'No rows found on the page for type {scrawl_type}')
        return None
    # Each row's text is whitespace-joined; pick out the name and score columns
    at = await trs.all_text_contents()
    result = (Stream(at)
              .filter(lambda x: x.strip() != '')
              .filter(lambda x: len(x.split(' ')) > 6)
              .map(lambda x: GuoDuoSpiderResult(time=target_time,
                                                targetType=TypeEnum(scrawl_type),
                                                createTime=datetime.now(),
                                                platform='all',
                                                score=float(x.split(' ')[-1]),
                                                sourceName=x.split(' ')[5]))
              .to_list())
    return result
if __name__ == '__main__':
    # Record the start time to measure total crawl duration
    start_time = time.time()
    param = SpiderParams(startDate='2024-01-02',
                         endDate='2024-01-05',
                         target_type=[TypeEnum.ANIME, TypeEnum.EPISODES, TypeEnum.MOVIE],
                         url='http://guduodata.com/')
    res = asyncio.run(startBrowser(param))
    print(f'Elapsed: {time.time() - start_time}s, {len(res)} rows')
    for it in res:
        print(it)
aiosqlite==0.20.0
annotated-types==0.7.0
anyio==4.7.0
APScheduler==3.11.0
asyncmy==0.2.10
click==8.1.7
exceptiongroup==1.2.2
fastapi==0.115.6
greenlet==3.1.1
h11==0.14.0
idna==3.10
iso8601==2.1.0
playwright==1.49.1
pydantic==2.10.4
pydantic_core==2.27.2
pyee==12.0.0
pypika-tortoise==0.3.2
pytz==2024.2
PyYAML==6.0.2
sniffio==1.3.1
starlette==0.41.3
SuperStream==0.2.6
tortoise-orm==0.22.2
typing_extensions==4.12.2
tzlocal==5.2
uvicorn==0.34.0