Commit 8afa2bc6 (full hash: 8afa2bc6e3feca80a55c24035b2bfb257d2310ff) by wenxin

代码优化

1 parent 7acfaa5f
......@@ -97,10 +97,10 @@ def get_dates() -> Tuple[str, str]:
:return: Tuple[str, str]
"""
today = datetime.today()
yesterday = today - timedelta(days=1)
today_str = today.strftime("%Y-%m-%d")
yesterday_str = yesterday.strftime("%Y-%m-%d")
return today_str, yesterday_str
start = today - timedelta(days=4)
end = today.strftime("%Y-%m-%d")
start_str = start.strftime("%Y-%m-%d")
return end, start_str
if __name__ == "__main__":
......
......@@ -9,6 +9,7 @@ from app.schemas.config_schema import UrlTemplateInfo
from app.schemas.spider_schema import SpiderParams, TypeEnum, GuoDuoSpiderResult, ResponseModel
from app.config.app_config import getAppConfig
from superstream import Stream
from tqdm.asyncio import tqdm_asyncio
logger = logging.getLogger(__name__)
......@@ -28,28 +29,25 @@ def build_request_url(spiderParam: SpiderParams) -> Dict[TypeEnum, List[Tuple[st
return target_urls
@retry(
    stop=stop_after_attempt(3),
    before_sleep=before_sleep_log(logger, logging.DEBUG),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    after=after_log(logger, logging.DEBUG),
)
async def fetch(session, url):
    """
    GET a single URL and decode the JSON body.

    :param session: shared aiohttp.ClientSession used for the request
    :param url: absolute URL to request
    :return: decoded JSON payload on HTTP 200
    :raises aiohttp.ClientResponseError: on a non-200 status, so the
        @retry decorator can re-attempt the request. (The previous code
        logged and implicitly returned None, which both disabled the
        retry — no exception was ever raised — and made the caller crash
        later on ResponseModel(**None).)
    """
    async with session.get(url) as response:
        if response.status != 200:
            logger.error(f"请求失败,状态码为:{response.status}")
            # Raise so tenacity's @retry actually retries instead of
            # silently propagating None to the caller.
            response.raise_for_status()
        return await response.json()
async def fetch_and_parse(urls: List[Tuple[str, str]]) -> Dict[str, ResponseModel]:
    """
    Concurrently fetch every (url, key) pair and parse the JSON bodies.

    :param urls: list of (request_url, result_key) tuples
    :return: mapping of result_key -> ResponseModel built from the JSON body
    """
    async with aiohttp.ClientSession() as session:
        requests = [fetch(session, request_url) for request_url, _ in urls]
        # Await the coroutine list exactly once. The merged source awaited it
        # twice (asyncio.gather followed by tqdm_asyncio.gather), and the
        # second await fails with "cannot reuse already awaited coroutine".
        # tqdm_asyncio.gather is kept: it behaves like asyncio.gather but
        # shows a progress bar.
        results = await tqdm_asyncio.gather(*requests)
        return {key: ResponseModel(**body) for (_, key), body in zip(urls, results)}
def parse_response_to_spider_result(guduo_responses: Dict[str, ResponseModel], dataType: TypeEnum) -> List[GuoDuoSpiderResult]:
def convert_results(guduo_responses: Dict[str, ResponseModel], dataType: TypeEnum) -> List[GuoDuoSpiderResult]:
results: List[GuoDuoSpiderResult] = []
for k, v in guduo_responses.items():
time_request = k
......@@ -62,24 +60,33 @@ def parse_response_to_spider_result(guduo_responses: Dict[str, ResponseModel], d
results.append(GuoDuoSpiderResult(time=time_request, targetType=targetType,
platform=platform, sourceName=sourceName,
score=score, createTime=createTime))
logger.info(f"处理类型是{dataType}的数据,时间是{time_request},获取结果长度为{len(v.data)}")
return results
async def batch_fetch_and_parse(urls: List[Tuple[str, str]], enum: TypeEnum):
    """
    Fetch one group of URLs and convert the responses to spider results.

    :param urls: (request_url, result_key) tuples for one data type
    :param enum: the TypeEnum this batch of URLs belongs to
    :return: List[GuoDuoSpiderResult] parsed from the fetched responses
    """
    result = await fetch_and_parse(urls)
    # Keep only the renamed helper. The merged source also contained a stale
    # "return parse_response_to_spider_result(result, enum)" line first,
    # which referenced the pre-rename function name (no longer defined) and
    # would raise NameError at runtime.
    return convert_results(result, enum)
@retry(
    stop=stop_after_attempt(3),
    before_sleep=before_sleep_log(logger, logging.DEBUG),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    after=after_log(logger, logging.DEBUG),
)
async def get_score_data(spiderParam: SpiderParams) -> List[GuoDuoSpiderResult]:
    """
    Crawl score data for every requested data type concurrently.

    :param spiderParam: crawl parameters (types, time range, target URL)
    :return: flat list of GuoDuoSpiderResult across all data types
    """
    logger.info(f"开始爬取数据,爬取参数为:{spiderParam}")
    url_infos = build_request_url(spiderParam)
    tasks = [batch_fetch_and_parse(urls, key) for key, urls in url_infos.items()]
    # Await the task list exactly once. The merged source awaited it twice
    # (asyncio.gather then tqdm_asyncio.gather), and the second await fails
    # with "cannot reuse already awaited coroutine". tqdm_asyncio.gather is
    # kept for its progress bar.
    results = await tqdm_asyncio.gather(*tasks)
    # Flatten the per-type result lists into one list.
    spider_datas = [item for sublist in results for item in sublist]
    logger.info(f"爬取数据结束,共爬取{len(spider_datas)}条数据")
    return spider_datas
......@@ -97,6 +104,7 @@ if __name__ == '__main__':
],
url="http://guduodata.com",
)
print(param.get_time_range())
res = asyncio.run(get_score_data(param))
print(f"程序执行耗时时间:{(time.time() - start_time) / 1000}长度为:{len(res)}")
print(res)
......