# guduo_spider.py
import asyncio
import time
from datetime import datetime
from typing import List, Optional
from playwright.async_api import Page, async_playwright
from superstream import Stream
from app.schemas.spider_schema import GuoDuoSpiderResult, SpiderParams, TypeEnum
import logging
from tqdm.asyncio import tqdm_asyncio

logger = logging.getLogger(__name__)


async def startBrowser(spiderParam: SpiderParams) -> List[GuoDuoSpiderResult]:
    """Launch a headless Firefox and crawl every date in the param's range.

    One browser page is opened per target date so all dates are scraped
    concurrently; the flattened, non-empty per-date results are returned.

    :param spiderParam: crawl configuration (date range, target URL, types)
    :return: flat list of scraped results across all dates
    """
    time_range = spiderParam.get_time_range()
    url = spiderParam.url
    scrawl_types = spiderParam.target_type
    async with async_playwright() as p:
        browser = await p.firefox.launch(headless=True)
        try:
            # One page per target date so the dates can be crawled in parallel.
            pages = await tqdm_asyncio.gather(
                *(browser.new_page() for _ in range(len(time_range))))
            # Pair each date with its dedicated page and crawl them together.
            results = await tqdm_asyncio.gather(
                *(hand_one_data(target_date, page, scrawl_types, url)
                  for target_date, page in zip(time_range, pages)))
            # Drop None/empty per-date lists and flatten the rest.
            return (Stream(results)
                    .filter(lambda x: x is not None)
                    .filter(lambda x: len(x) > 0)
                    .flat_map(lambda x: Stream(x))
                    .to_list())
        finally:
            # Fix: the browser (and its pages) was never closed, leaking the
            # Firefox process on every call and on any crawl failure.
            await browser.close()


async def hand_one_data(targetDate: str,
                        page: Page,
                        scrawl_types: List[TypeEnum],
                        url: str) -> List[GuoDuoSpiderResult]:
    """Crawl all requested ranking types for a single date on one page.

    :param targetDate: date string in 'YYYY-MM-DD' form
    :param page: dedicated Playwright page for this date
    :param scrawl_types: ranking categories to collect
    :param url: site entry URL
    :return: results for every type on this date; [] if the date is not
             selectable in the site's datepicker
    """
    # Split the date into year / month / day components.
    year, month, day = targetDate.split('-')
    # Bug fix: the original `day.replace('0', '')` removed EVERY zero, turning
    # '10'->'1', '20'->'2', '30'->'3' and clicking the wrong calendar cell.
    # Only the leading zero must be stripped to match data-pika-day.
    day = str(int(day))
    # The Pika datepicker uses zero-based months in data-pika-month.
    month = int(month) - 1
    await page.goto(url)
    # Open the datepicker (also waits for the overlay to be clickable).
    await page.click('//*[@id="currentDateTxt"]')
    data_css_selector = (f'#day span.pika-button.pika-day[type="button"]'
                         f'[data-pika-year="{year}"]'
                         f'[data-pika-month="{month}"]'
                         f'[data-pika-day="{day}"]')
    doc = page.locator(data_css_selector)
    # If the target day is not present/visible in the picker, skip this date.
    if not await doc.is_visible():
        return []
    # Select the target date.
    await doc.click()
    # Map each type to its sequence of navigation selectors, grouped by type.
    css_selectors = (Stream(scrawl_types)
                     .map(lambda x: (x, x.get_precise_positioning(x)))
                     .group_by(lambda x: x[0]))
    results = []
    for key, value in css_selectors.items():
        logger.info(f'开始爬取 {targetDate} 类型是{value[0][0]} 的数据')
        # Click through the selector chain to reach this type's ranking view.
        for css_selector in value[0][1]:
            await page.click(css_selector)
        result = await get_data(page, key.value, targetDate)
        if result:
            Stream(result).for_each(lambda x: results.append(x))
    return results


async def get_data(targetPage: Page, scrawl_type: int, target_time: str) -> Optional[List[GuoDuoSpiderResult]]:
    """Scrape the ranking rows currently shown on the page.

    :param targetPage: page already navigated to the desired ranking view
    :param scrawl_type: numeric TypeEnum value of the ranking being scraped
    :param target_time: date string the rows belong to
    :return: parsed results, or None when the ranking table is absent
    """
    target_data_css_selector = '.rank-box'
    table = targetPage.locator(target_data_css_selector)
    # Bug fix: Locator objects are never None, so the old `is None` checks
    # were dead code. Check for an empty match instead, and report through
    # the module logger (consistent with the rest of the file) not print().
    if await table.count() == 0:
        logger.warning('当前页面获取table为空:%s', scrawl_type)
        return None
    trs = table.locator('tr')
    if await trs.count() == 0:
        # An empty row set still parses to [] below; just record it.
        logger.warning('当前页面获取tr为空:%s', scrawl_type)
    # Each row's text is whitespace-joined; rows with fewer than 7 fields
    # are headers/noise. Field 5 is the title, the last field is the score.
    at = await trs.all_text_contents()
    result = (Stream(at)
              .filter(lambda x: x.strip() != '')
              .filter(lambda x: len(x.split(' ')) > 6)
              .map(lambda x: GuoDuoSpiderResult(time=target_time,
                                                targetType=TypeEnum(scrawl_type),
                                                createTime=datetime.now(),
                                                platform='all',
                                                score=float(x.split(' ')[-1]),
                                                sourceName=x.split(' ')[5]))
              .to_list())
    return result


if __name__ == '__main__':
    # Wall-clock start of the run (seconds since epoch).
    start_time = time.time()
    param = SpiderParams(startDate='2024-01-02',
                         endDate='2024-01-15',
                         target_type=[TypeEnum.ANIME, TypeEnum.EPISODES, TypeEnum.MOVIE],
                         url='http://guduodata.com/')
    res = asyncio.run(startBrowser(param))
    # Bug fix: time.time() differences are already in seconds — the old
    # `/1000` under-reported the elapsed time by three orders of magnitude.
    print(f'程序执行耗时时间:{time.time() - start_time}长度为:{len(res)}')
    for it in res:
        print(it)