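"""http_spider.py: asynchronous spider for guduodata.com score data across a date range and several content types."""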
import asyncio
import datetime
import time
from typing import List, Dict, Tuple
import aiohttp
from tenacity import retry, stop_after_attempt, before_sleep_log, wait_exponential, after_log
import logging
from app.schemas.config_schema import UrlTemplateInfo
from app.schemas.spider_schema import SpiderParams, TypeEnum, GuoDuoSpiderResult, ResponseModel
from app.config.app_config import getAppConfig
from superstream import Stream

logger = logging.getLogger(__name__)


def build_request_url(spiderParam: SpiderParams) -> Dict[TypeEnum, List[Tuple[str, str]]]:
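    """Build, per target type, the list of (request URL, date string) pairs covering the requested time range."""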
    base_url = spiderParam.url
    targets = spiderParam.target_type
    templates: List[UrlTemplateInfo] = getAppConfig().urlTemplate
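    # Index the configured URL templates by their type value for O(1) lookup per target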
    templates_group: Dict[int, str] = Stream(templates).to_map(lambda x: x.type, lambda x: x.template)
    # Every date in the requested time range
    time_range = spiderParam.get_time_range()
    target_urls: Dict[TypeEnum, List[Tuple[str, str]]] = {}
    for target in targets:
        template = templates_group[target.value]
        # Pair each fully formatted URL with the date it targets
        url_date_pairs = [(base_url + template.format(date=item), item) for item in time_range]
        target_urls[target] = url_date_pairs
    return target_urls


# Retry transient failures up to 3 times, backing off exponentially between 1 and 10 seconds
@retry(
    stop=stop_after_attempt(3),
    before_sleep=before_sleep_log(logger, logging.DEBUG),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    after=after_log(logger, logging.DEBUG),
)
async def fetch(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        # Raise on 4xx/5xx so tenacity retries the request instead of parsing an error body
        response.raise_for_status()
        return await response.json()


async def fetch_and_parse(urls: List[Tuple[str, str]]) -> Dict[str, ResponseModel]:
    """Fetch every URL concurrently and key each parsed ResponseModel by its date string."""
    async with aiohttp.ClientSession() as session:
        # asyncio.gather preserves input order, so results line up with urls
        requests = [fetch(session, url) for url, _ in urls]
        results = await asyncio.gather(*requests)
        return {date: ResponseModel(**body) for (_, date), body in zip(urls, results)}


def parse_response_to_spider_result(guduo_responses: Dict[str, ResponseModel], dataType: TypeEnum) -> List[GuoDuoSpiderResult]:
    """Flatten the per-date responses into one GuoDuoSpiderResult per scored title."""
    results: List[GuoDuoSpiderResult] = []
    # One timestamp for the whole batch, taken once instead of per response
    createTime = datetime.datetime.now()
    for time_request, response in guduo_responses.items():
        for item in response.data:
            results.append(GuoDuoSpiderResult(time=time_request, targetType=dataType,
                                              platform='ALL', sourceName=item.name,
                                              score=item.gdiFloat, createTime=createTime))
    return results


async def batch_fetch_and_parse(urls: List[Tuple[str, str]], enum: TypeEnum) -> List[GuoDuoSpiderResult]:
    result = await fetch_and_parse(urls)
    return parse_response_to_spider_result(result, enum)


async def get_score_data(spiderParam: SpiderParams) -> List[GuoDuoSpiderResult]:
    """
    获取评分数据
    :param spiderParam: 爬取参数
    :return:
    """
    url_infos = build_request_url(spiderParam)
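    # One concurrent task per target type; each task fetches and parses every date for that type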
    tasks = [batch_fetch_and_parse(urls, key) for key, urls in url_infos.items()]
    results = await asyncio.gather(*tasks)
    # Flatten the per-type result lists into a single list
    spider_results = [item for sublist in results for item in sublist]
    return spider_results


if __name__ == '__main__':
    # Record when the run starts
    start_time = time.time()
    param = SpiderParams(
        startDate="2024-12-22",
        endDate="2024-12-23",
        target_type=[
            TypeEnum.ANIME,
            TypeEnum.EPISODES,
            TypeEnum.MOVIE,
            TypeEnum.VARIETY,
        ],
        url="http://guduodata.com",
    )
    res = asyncio.run(get_score_data(param))
    print(f"程序执行耗时时间:{(time.time() - start_time) / 1000}长度为:{len(res)}")
    print(res)