# guduo_spider.py
import asyncio
import time
from datetime import datetime
from typing import List, Optional
from playwright.async_api import Page, async_playwright
from superstream import Stream
from app.schemas.spider_schema import GuoDuoSpiderResult, SpiderParams, TypeEnum
import logging
from tqdm.asyncio import tqdm_asyncio
from tenacity import (
    after_log,
    before_sleep_log,
    retry,
    stop_after_attempt,
    wait_exponential,
    wait_fixed,
)

logger = logging.getLogger(__name__)


@retry(
    stop=stop_after_attempt(3),
    before_sleep=before_sleep_log(logger, logging.DEBUG),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    after=after_log(logger, logging.DEBUG),
)
async def startBrowser(spiderParam: SpiderParams) -> List[GuoDuoSpiderResult]:
    """Launch a headless Firefox and scrape every date in the requested range.

    One browser tab is opened per date and all dates are crawled
    concurrently via hand_one_data. The whole run is retried up to 3
    times with exponential backoff on failure.

    Args:
        spiderParam: crawl configuration (date range, target types, URL).

    Returns:
        A flat list of all scraped results across every date.
    """
    time_range = spiderParam.get_time_range()
    url = spiderParam.url
    scrawl_types = spiderParam.target_type
    async with async_playwright() as p:
        browser = await p.firefox.launch(headless=True)
        try:
            # One tab per target date so every date can be crawled concurrently.
            pages = await tqdm_asyncio.gather(
                *(browser.new_page() for _ in range(len(time_range)))
            )
            # Pair each date with its own page and run the scrapes in parallel.
            results = await tqdm_asyncio.gather(
                *(
                    hand_one_data(target_date, page, scrawl_types, url)
                    for target_date, page in zip(time_range, pages)
                )
            )
        finally:
            # Release the browser process explicitly even if a gather raises;
            # otherwise it lingers until the playwright context exits.
            await browser.close()
        # Drop None/empty per-date results and flatten into one list.
        return [item for day_items in results if day_items for item in day_items]


async def hand_one_data(
    targetDate: str, page: Page, scrawl_types: List[TypeEnum], url: str
) -> List[GuoDuoSpiderResult]:
    """Scrape one day's data: pick targetDate in the page's date picker,
    then collect results for every requested crawl type.

    Args:
        targetDate: ISO date string "YYYY-MM-DD".
        page: the Playwright page dedicated to this date.
        scrawl_types: the content types to scrape on this date.
        url: the site entry URL.

    Returns:
        All results for this date; an empty list when the date is not
        selectable in the picker.
    """
    # Split the ISO date string into its year/month/day components.
    year, month, day = targetDate.split("-")
    # The pika picker uses an unpadded day number ("02" -> "2").
    # BUG FIX: the old `day.replace("0", "")` stripped *every* zero,
    # turning "10"/"20"/"30" into "1"/"2"/"3" and clicking the wrong day.
    day = str(int(day))
    # pika months are zero-based.
    month = int(month) - 1
    await page.goto(url)
    # Open the date picker.
    await page.click('//*[@id="currentDateTxt"]')
    data_css_selector = (
        f'#day span.pika-button.pika-day[type="button"]'
        f'[data-pika-year="{year}"]'
        f'[data-pika-month="{month}"]'
        f'[data-pika-day="{day}"]'
    )
    doc = page.locator(data_css_selector)
    # If the target date is not shown in the picker there is nothing to scrape.
    if not await doc.is_visible():
        return []
    # Select the target date.
    await doc.click()
    # Map each crawl type to its precise CSS selectors, grouped by type.
    css_selectors = (
        Stream(scrawl_types)
        .map(lambda x: (x, x.get_precise_positioning(x)))
        .group_by(lambda x: x[0])
    )
    results: List[GuoDuoSpiderResult] = []
    for key, value in css_selectors.items():
        logger.info(f"开始爬取 {targetDate} 类型是{value[0][0]} 的数据")
        # Click through the selectors that navigate to this type's view.
        for css_selector in value[0][1]:
            await page.click(css_selector)
        result = await get_data(page, key.value, targetDate)
        if result:
            results.extend(result)
    return results


async def get_data(
    targetPage: Page, scrawl_type: int, target_time: str
) -> Optional[List[GuoDuoSpiderResult]]:
    """Parse the ranking table (".rank-box") currently shown on the page.

    Args:
        targetPage: the page already navigated to the desired type view.
        scrawl_type: numeric TypeEnum value of the content being scraped.
        target_time: the date string to stamp onto every result.

    Returns:
        None when no ranking table is present on the page, otherwise one
        GuoDuoSpiderResult per data row (possibly an empty list).
    """
    target_data_css_selector = ".rank-box"
    table = targetPage.locator(target_data_css_selector)
    # locator() never returns None — the old `table is None` test was dead
    # code. Check for an actual match in the DOM instead.
    if await table.count() == 0:
        logger.warning(f"当前页面获取table为空:{scrawl_type}")
        return None
    # All <tr> rows under the table; an empty table simply yields no rows.
    trs = table.locator("tr")
    row_texts = await trs.all_text_contents()
    results: List[GuoDuoSpiderResult] = []
    for text in row_texts:
        if not text.strip():
            continue
        # Split once per row instead of once per filter/map stage.
        parts = text.split(" ")
        # Data rows have more than 6 space-separated fields; skip the rest.
        if len(parts) <= 6:
            continue
        results.append(
            GuoDuoSpiderResult(
                time=target_time,
                targetType=TypeEnum(scrawl_type),
                createTime=datetime.now(),
                platform="all",
                # Last field is the score, 6th field the source name
                # (assumed from the original parsing — verify against the site).
                score=float(parts[-1]),
                sourceName=parts[5],
            )
        )
    return results


if __name__ == "__main__":
    # Record wall-clock start time so total runtime can be reported.
    start_time = time.time()
    param = SpiderParams(
        startDate="2024-01-02",
        endDate="2024-01-15",
        target_type=[
            TypeEnum.ANIME,
            TypeEnum.EPISODES,
            TypeEnum.MOVIE,
            TypeEnum.VARIETY,
        ],
        url="http://guduodata.com/",
    )
    res = asyncio.run(startBrowser(param))
    print(f"代码重试情况:{startBrowser.statistics}")
    # BUG FIX: time.time() already measures seconds — the old "/1000"
    # under-reported the elapsed time by a factor of 1000.
    print(f"程序执行耗时时间:{time.time() - start_time}长度为:{len(res)}")