# http_spider.py
import datetime
import logging
from typing import Dict, List, Tuple

import aiohttp
from superstream import Stream
from tenacity import after_log, before_sleep_log, retry, stop_after_attempt, wait_exponential
from tqdm.asyncio import tqdm_asyncio

from app.config.app_config import getAppConfig
from app.schemas.config_schema import UrlTemplateInfo
from app.schemas.spider_schema import GuoDuoSpiderResult, ResponseModel, SpiderParams, TypeEnum

logger = logging.getLogger(__name__)
def build_request_url(spiderParam: SpiderParams) -> Dict[TypeEnum, List[Tuple[str, str]]]:
    """Build the (url, date) pairs to crawl for every requested target type."""
    base_url = spiderParam.url
    targets = spiderParam.target_type
    templates: List[UrlTemplateInfo] = getAppConfig().urlTemplate
    # Index the configured URL templates by their numeric type id.
    templates_group: Dict[int, str] = Stream(templates).to_map(lambda x: x.type, lambda x: x.template)
    # Each timestamp in the requested time range.
    time_range = spiderParam.get_time_range()
    target_urls: Dict[TypeEnum, List[Tuple[str, str]]] = {}
    for target in targets:
        template = templates_group[target.value]
        url_type = [(base_url + template.format(date=item), item) for item in time_range]
        target_urls[target] = url_type
    return target_urls
async def fetch(session: aiohttp.ClientSession, url: str):
    async with session.get(url) as response:
        if response.status != 200:
            logger.error(f"Request failed, status code: {response.status}, url: {url}")
            # Raise instead of returning None: a None result would crash
            # ResponseModel(**v) downstream, and raising lets the retry
            # decorator on get_score_data re-run the whole batch.
            response.raise_for_status()
        return await response.json()
async def fetch_and_parse(urls: List[Tuple[str, str]]) -> Dict[str, ResponseModel]:
    async with aiohttp.ClientSession() as session:
        requests = [fetch(session, url) for url, _ in urls]
        results = await tqdm_asyncio.gather(*requests)
    return_data = {}
    for (_, date), v in zip(urls, results):
        return_data[date] = ResponseModel(**v)
    return return_data
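
# Note (a sketch, not part of the code above): every date in the range is
# fetched concurrently, so a long time range opens many simultaneous
# connections. aiohttp can bound this with a connector limit; the value 10
# below is an arbitrary assumption:
#
#     connector = aiohttp.TCPConnector(limit=10)
#     async with aiohttp.ClientSession(connector=connector) as session:
#         ...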
def convert_results(guduo_responses: Dict[str, ResponseModel], dataType: TypeEnum) -> List[GuoDuoSpiderResult]:
    results: List[GuoDuoSpiderResult] = []
    for time_request, v in guduo_responses.items():
        platform = 'ALL'
        createTime = datetime.datetime.now()
        for item in v.data:
            results.append(GuoDuoSpiderResult(time=time_request, targetType=dataType,
                                              platform=platform, sourceName=item.name,
                                              score=item.gdiFloat, createTime=createTime))
        logger.info(f"Processed {dataType} data for {time_request}, got {len(v.data)} items")
    return results
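
# Illustrative only: for date "2024-01-01", a response item such as
# {"name": "ShowA", "gdiFloat": 1.23} (hypothetical values) becomes:
#   GuoDuoSpiderResult(time="2024-01-01", targetType=dataType, platform="ALL",
#                      sourceName="ShowA", score=1.23, createTime=<now>)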
async def batch_fetch_and_parse(urls: List[Tuple[str, str]], enum: TypeEnum) -> List[GuoDuoSpiderResult]:
    result = await fetch_and_parse(urls)
    return convert_results(result, enum)
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    before_sleep=before_sleep_log(logger, logging.DEBUG),
    after=after_log(logger, logging.DEBUG),
)
async def get_score_data(spiderParam: SpiderParams) -> List[GuoDuoSpiderResult]:
    """
    Fetch score data.
    :param spiderParam: crawl parameters
    :return: flattened list of results across all target types
    """
    logger.info(f"Starting crawl, parameters: {spiderParam}")
    url_infos = build_request_url(spiderParam)
    tasks = [batch_fetch_and_parse(urls, key) for key, urls in url_infos.items()]
    results = await tqdm_asyncio.gather(*tasks)
    spider_datas = [item for sublist in results for item in sublist]
    logger.info(f"Crawl finished, fetched {len(spider_datas)} records in total")
    return spider_datas
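
# Minimal usage sketch. The SpiderParams constructor fields other than `url`
# and `target_type` are not visible in this module, so the time-range kwargs
# below are hypothetical placeholders:
#
#     import asyncio
#
#     params = SpiderParams(url="https://example.com",
#                           target_type=[TypeEnum.TV],  # member name assumed
#                           start_time="2024-01-01",    # field name assumed
#                           end_time="2024-01-07")      # field name assumed
#     asyncio.run(get_score_data(params))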