# guduo_spider.py
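"""Scrape daily ranking data from guduodata.com with Playwright.

One headless browser page is opened per target date, the on-page Pikaday
date picker is driven to that date, and the rows of the ".rank-box"
ranking table are parsed into GuoDuoSpiderResult objects.
"""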
import asyncio
import time
from datetime import datetime
from typing import List, Optional
from playwright.async_api import Page, async_playwright
from superstream import Stream
from app.schemas.spider_schema import GuoDuoSpiderResult, SpiderParams, TypeEnum
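# Assumed schema API, inferred from usage below: SpiderParams exposes
# get_time_range() returning "YYYY-MM-DD" strings, plus .url and
# .target_type; TypeEnum members expose get_precise_positioning() and an
# integer .value.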
import logging
from tqdm.asyncio import tqdm

logger = logging.getLogger(__name__)


async def startBrowser(spiderParam: SpiderParams) -> List[GuoDuoSpiderResult]:
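    """Launch headless Firefox and crawl every date in the range concurrently."""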
    time_range = spiderParam.get_time_range()
    url = spiderParam.url
    scrawl_types = spiderParam.target_type
    async with async_playwright() as p:
        browser = await p.firefox.launch(headless=True)
        # Open one browser page per target date. tqdm.as_completed yields
        # awaitables as they finish, so each must be awaited.
        page_tasks = (browser.new_page() for _ in range(len(time_range)))
        pages = [await task for task in tqdm.as_completed(page_tasks)]
        # Pair each date with its page and run hand_one_data concurrently.
        hand_result_tasks = (
            hand_one_data(time_range[i], pages[i], scrawl_types, url)
            for i in range(len(time_range))
        )
        results = [await task for task in tqdm.as_completed(hand_result_tasks)]
        await browser.close()
        # Flatten the per-date result lists, dropping None and empty ones.
        return (
            Stream(results)
            .filter(lambda x: x is not None)
            .filter(lambda x: len(x) > 0)
            .flat_map(lambda x: Stream(x))
            .to_list()
        )


async def hand_one_data(
        targetDate: str, page: Page, scrawl_types: List[TypeEnum], url: str
) -> List[GuoDuoSpiderResult]:
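    """Drive the page's date picker to targetDate and scrape each requested type."""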
    # Split the "YYYY-MM-DD" date string into year, month and day.
    year, month, day = targetDate.split("-")
    # Strip only the leading zero: replace("0", "") would also mangle
    # days 10, 20 and 30.
    day = str(int(day))
    # Pikaday months are 0-indexed.
    month = int(month) - 1
    await page.goto(url)
    # Open the date picker; page.click auto-waits for actionability, so
    # any loading overlay must have cleared before the click lands.
    await page.click('//*[@id="currentDateTxt"]')
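    # Pikaday renders each selectable day as a button tagged with
    # data-pika-year/-month/-day attributes; target the requested day.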
    data_css_selector = (
        f'#day span.pika-button.pika-day[type="button"]'
        f'[data-pika-year="{year}"]'
        f'[data-pika-month="{month}"]'
        f'[data-pika-day="{day}"]'
    )
    doc = page.locator(data_css_selector)
    # If the date cell is not visible (no data for that day), return [].
    if not await doc.is_visible():
        return []
    # Click the target date.
    await doc.click()
    # Final step: map each crawl type to its precise positioning
    # selectors and group them by type.
    css_selectors = (
        Stream(scrawl_types)
        .map(lambda x: (x, x.get_precise_positioning(x)))
        .group_by(lambda x: x[0])
    )
    results = []
    for key, value in css_selectors.items():
        logger.info(f"Crawling {targetDate}, type {value[0][0]}")
        # Click through the selectors that switch the page to this type.
        for css_selector in value[0][1]:
            await page.click(css_selector)
        result = await get_data(page, key.value, targetDate)
        if result:
            results.extend(result)
    return results


async def get_data(
        targetPage: Page, scrawl_type: int, target_time: str
) -> Optional[List[GuoDuoSpiderResult]]:
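    """Parse the ".rank-box" table on targetPage into GuoDuoSpiderResult rows."""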
    target_data_css_selector = ".rank-box"
    table = targetPage.locator(target_data_css_selector)
    # locator() never returns None; check the match count instead.
    if await table.count() == 0:
        logger.info(f"No rank table found on the current page: {scrawl_type}")
        return None
    # Collect every tr row under the rank table.
    trs = table.locator("tr")
    if await trs.count() == 0:
        logger.info(f"No tr rows found on the current page: {scrawl_type}")
    # Read the text content of all rows in one call.
    at = await trs.all_text_contents()
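    # Each row's text is space-separated: blank rows and short header or
    # separator rows are filtered out; index 5 is assumed to hold the
    # title and the last token the score, per the site's current layout.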
    result = (
        Stream(at)
        .filter(lambda x: x.strip() != "")
        .filter(lambda x: len(x.split(" ")) > 6)
        .map(
            lambda x: GuoDuoSpiderResult(
                time=target_time,
                targetType=TypeEnum(scrawl_type),
                createTime=datetime.now(),
                platform="all",
                score=float(x.split(" ")[-1]),
                sourceName=x.split(" ")[5],
            )
        )
        .to_list()
    )
    return result


if __name__ == "__main__":
    # Record the start time to measure total run time.
    start_time = time.time()
    param = SpiderParams(
        startDate="1991-01-02",
        endDate="1991-01-05",
        target_type=[TypeEnum.ANIME, TypeEnum.EPISODES, TypeEnum.MOVIE],
        url="http://guduodata.com/",
    )
    res = asyncio.run(startBrowser(param))
    print(f"程序执行耗时时间:{time.time() - start_time} 长度为:{len(res)}")
    for it in res:
        print(it)