# guduo_spider.py
import asyncio
import logging
import time
from datetime import datetime
from typing import List, Optional

from playwright.async_api import Page, async_playwright
from superstream import Stream
from tqdm.asyncio import tqdm_asyncio

from app.schemas.spider_schema import GuoDuoSpiderResult, SpiderParams, TypeEnum

logger = logging.getLogger(__name__)
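
# `app.schemas.spider_schema` is project-internal and not shown in this file.
# A minimal sketch of the shapes the code below relies on; every name and field
# here is an assumption inferred from usage, not the real definition:
#
#   class TypeEnum(Enum):                      # hypothetical
#       ANIME = ...; EPISODES = ...; MOVIE = ...
#       def get_precise_positioning(self, t) -> List[str]: ...  # CSS selectors per type
#
#   class SpiderParams(BaseModel):             # hypothetical
#       startDate: str; endDate: str
#       target_type: List[TypeEnum]; url: str
#       def get_time_range(self) -> List[str]: ...  # 'YYYY-MM-DD' dates, inclusive
#
#   class GuoDuoSpiderResult(BaseModel):       # hypothetical
#       time: str; targetType: TypeEnum; createTime: datetime
#       platform: str; score: float; sourceName: str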

async def startBrowser(spiderParam: SpiderParams) -> List[GuoDuoSpiderResult]:
    time_range = spiderParam.get_time_range()
    url = spiderParam.url
    scrawl_types = spiderParam.target_type
    async with async_playwright() as p:
        browser = await p.firefox.launch(headless=True)
        # Open one browser page per date in the range
        pages = await tqdm_asyncio.gather(*(browser.new_page()
                                            for _ in range(len(time_range))))
        # Scrape all dates concurrently, pairing each date with its own page
        results = await tqdm_asyncio.gather(*(hand_one_data(date, page, scrawl_types, url)
                                              for date, page in zip(time_range, pages)))
        await browser.close()
        # Drop None/empty per-date lists and flatten into a single result list
        return (Stream(results)
                .filter(lambda x: x is not None)
                .filter(lambda x: len(x) > 0)
                .flat_map(lambda x: Stream(x))
                .to_list())
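
# The Stream pipeline at the end of startBrowser flattens the per-date result
# lists; without superstream it is equivalent to the plain comprehension:
#   [item for per_date in results if per_date for item in per_date]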

async def hand_one_data(targetDate: str,
                        page: Page,
                        scrawl_types: List[TypeEnum],
                        url: str) -> List[GuoDuoSpiderResult]:
    # Split the 'YYYY-MM-DD' date string into year, month and day
    year, month, day = targetDate.split('-')
    # Drop the leading zero only ('02' -> '2'); the original replace('0', '')
    # also mangled '10', '20' and '30'
    day = str(int(day))
    # The Pikaday date picker uses 0-based months
    month = int(month) - 1
    await page.goto(url)
    # Click the current-date field to open the date picker (the overlay must
    # have cleared by this point)
    await page.click('//*[@id="currentDateTxt"]')
    data_css_selector = (f'#day span.pika-button.pika-day[type="button"]'
                         f'[data-pika-year="{year}"]'
                         f'[data-pika-month="{month}"]'
                         f'[data-pika-day="{day}"]')
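    # For example, targetDate '2024-01-02' resolves to:
    #   #day span.pika-button.pika-day[type="button"]
    #   [data-pika-year="2024"][data-pika-month="0"][data-pika-day="2"]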
    doc = page.locator(data_css_selector)
    # If the requested date is not selectable in the picker, return an empty list
    if not await doc.is_visible():
        return []
    # Click the target date
    await doc.click()
    # Final step: resolve the precise CSS selectors for each requested type.
    # After group_by, css_selectors maps each TypeEnum to [(type, [selector, ...])]
    css_selectors = (Stream(scrawl_types)
                     .map(lambda x: (x, x.get_precise_positioning(x)))
                     .group_by(lambda x: x[0]))
    results = []
    for key, value in css_selectors.items():
        logger.info(f'Scraping data for {targetDate}, type {value[0][0]}')
        for css_selector in value[0][1]:
            await page.click(css_selector)
            result = await get_data(page, key.value, targetDate)
            if result:
                results.extend(result)
    return results
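
# Note: the loop above clicks a type tab and scrapes immediately. If the ranking
# rows load asynchronously, that can race the page update; waiting for the rows
# with Playwright's standard API before calling get_data would be safer, e.g.:
#   await page.wait_for_selector('.rank-box tr')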

async def get_data(targetPage: Page, scrawl_type: int, target_time: str) -> Optional[List[GuoDuoSpiderResult]]:
    target_data_css_selector = '.rank-box'
    table = targetPage.locator(target_data_css_selector)
    # locator() never returns None, so test the match count instead
    if await table.count() == 0:
        logger.warning(f'No .rank-box table found on the page, type: {scrawl_type}')
        return None
    # All <tr> rows under the ranking table
    trs = table.locator('tr')
    # The original checked `trs is None` (never true) and fell through without returning
    if await trs.count() == 0:
        logger.warning(f'No <tr> rows found, type: {scrawl_type}')
        return None
    at = await trs.all_text_contents()
    # Keep non-empty rows with enough fields; the title sits at index 5 and the
    # trailing field is the score
    result = (Stream(at)
              .filter(lambda x: x.strip() != '')
              .filter(lambda x: len(x.split(' ')) > 6)
              .map(lambda x: GuoDuoSpiderResult(time=target_time,
                                                targetType=TypeEnum(scrawl_type),
                                                createTime=datetime.now(),
                                                platform='all',
                                                score=float(x.split(' ')[-1]),
                                                sourceName=x.split(' ')[5]))
              .to_list())
    return result
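
# Parsing assumption: each row's text splits cleanly on single spaces, which
# depends on the page's markup whitespace and breaks for titles that contain
# spaces. A sturdier sketch would read the <td> cells of row i individually
# via Playwright's locator API:
#   cells = await trs.nth(i).locator('td').all_text_contents()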

if __name__ == '__main__':
    # Record the start time to measure total runtime
    start_time = time.time()
    param = SpiderParams(startDate='2024-01-02',
                         endDate='2024-01-15',
                         target_type=[TypeEnum.ANIME, TypeEnum.EPISODES, TypeEnum.MOVIE],
                         url='http://guduodata.com/')
    res = asyncio.run(startBrowser(param))
    # time.time() is already in seconds; the original divided by 1000
    print(f'Elapsed: {time.time() - start_time:.2f}s, result count: {len(res)}')
    for it in res:
        print(it)
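
# First-time setup (the pip package names are assumptions; installing browser
# binaries via the Playwright CLI is the standard setup step):
#   pip install playwright tqdm superstream
#   playwright install firefox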