# guduo_spider.py
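"""Async Playwright crawler for daily ranking data on guduodata.com.

Opens one browser page per date in the requested range, drives the page's
Pikaday date picker to select each date, clicks through each target type's
tab, and parses the ranking table into GuoDuoSpiderResult rows.
"""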
import asyncio
import logging
import time
from datetime import datetime
from typing import List, Optional

from playwright.async_api import Page, async_playwright
from superstream import Stream
from tqdm.asyncio import tqdm

from app.schemas.spider_schema import GuoDuoSpiderResult, SpiderParams, TypeEnum

logger = logging.getLogger(__name__)
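
# The imported schema module is not shown here. From how it is used below, it
# is assumed to look roughly like this (a sketch, not the real
# app.schemas.spider_schema):
#
#     class TypeEnum(Enum):
#         ANIME = ...
#         EPISODES = ...
#         MOVIE = ...
#         def get_precise_positioning(self, t) -> List[str]: ...  # tab CSS selectors
#
#     class SpiderParams(BaseModel):
#         startDate: str
#         endDate: str
#         target_type: List[TypeEnum]
#         url: str
#         def get_time_range(self) -> List[str]: ...  # "YYYY-MM-DD" strings
#
#     class GuoDuoSpiderResult(BaseModel):
#         time: str
#         targetType: TypeEnum
#         createTime: datetime
#         platform: str
#         score: float
#         sourceName: str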

async def startBrowser(spiderParam: SpiderParams) -> List[GuoDuoSpiderResult]:
    time_range = spiderParam.get_time_range()
    url = spiderParam.url
    scrawl_types = spiderParam.target_type
    async with async_playwright() as p:
        browser = await p.firefox.launch(headless=True)
        # Open one browser page per date in the range; as_completed yields
        # futures, so each one must be awaited
        page_tasks = (browser.new_page() for _ in range(len(time_range)))
        pages = [await task for task in tqdm.as_completed(page_tasks)]
        # Pair each date with a page and crawl them all concurrently
        hand_result_tasks = (
            hand_one_data(time_range[i], pages[i], scrawl_types, url)
            for i in range(len(time_range))
        )
        results = [await task for task in tqdm.as_completed(hand_result_tasks)]
        await browser.close()
        # Drop None/empty per-date results and flatten into one list
        return (
            Stream(results)
            .filter(lambda x: x is not None)
            .filter(lambda x: len(x) > 0)
            .flat_map(lambda x: Stream(x))
            .to_list()
        )
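
# Note on ordering: tqdm.as_completed wraps asyncio.as_completed, so pages and
# results arrive in completion order rather than time_range order. That is
# harmless here because fresh pages are interchangeable and every result row
# carries its own date. If strict ordering (or no progress bar) were wanted,
# asyncio.gather(*tasks) would be the simpler alternative.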

async def hand_one_data(
    targetDate: str, page: Page, scrawl_types: List[TypeEnum], url: str
) -> List[GuoDuoSpiderResult]:
    # Split the "YYYY-MM-DD" date string into year, month and day
    year, month, day = targetDate.split("-")
    # Strip only the leading zero ("07" -> "7"); replace("0", "") would also
    # mangle "10", "20" and "30"
    day = day.lstrip("0")
    # Pikaday's data-pika-month attribute is zero-based
    month = int(month) - 1
    await page.goto(url)
    # Open the date picker (the click also waits for the overlay to clear)
    await page.click('//*[@id="currentDateTxt"]')
    data_css_selector = (
        f'#day span.pika-button.pika-day[type="button"]'
        f'[data-pika-year="{year}"]'
        f'[data-pika-month="{month}"]'
        f'[data-pika-day="{day}"]'
    )
    doc = page.locator(data_css_selector)
    # If the target date is not selectable in the picker, return an empty list
    if not await doc.is_visible():
        return []
    # Click the target date
    await doc.click()
    # Final step: map each target type to its tab selectors, grouped by type
    css_selectors = (
        Stream(scrawl_types)
        .map(lambda x: (x, x.get_precise_positioning(x)))
        .group_by(lambda x: x[0])
    )
    results = []
    for key, value in css_selectors.items():
        logger.info(f"Crawling {targetDate} data for type {value[0][0]}")
        for css_selector in value[0][1]:
            await page.click(css_selector)
            result = await get_data(page, key.value, targetDate)
            if result:
                results.extend(result)
    return results
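
# For example, a (hypothetical) targetDate of "2024-03-07" yields year="2024",
# month=2, day="7", and therefore the selector
#   #day span.pika-button.pika-day[type="button"]
#       [data-pika-year="2024"][data-pika-month="2"][data-pika-day="7"]
# matching Pikaday's zero-based month convention.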

async def get_data(
    targetPage: Page, scrawl_type: int, target_time: str
) -> Optional[List[GuoDuoSpiderResult]]:
    target_data_css_selector = ".rank-box"
    table = targetPage.locator(target_data_css_selector)
    # locator() never returns None, so test for an empty match instead
    if await table.count() == 0:
        logger.info(f"No ranking table found on the current page: {scrawl_type}")
        return None
    # Collect every tr row under the table element
    trs = table.locator("tr")
    if await trs.count() == 0:
        logger.info(f"No rows found on the current page: {scrawl_type}")
        return None
    # Read each row's text and build one result per well-formed row
    at = await trs.all_text_contents()
    result = (
        Stream(at)
        .filter(lambda x: x.strip() != "")
        .filter(lambda x: len(x.split(" ")) > 6)
        .map(
            lambda x: GuoDuoSpiderResult(
                time=target_time,
                targetType=TypeEnum(scrawl_type),
                createTime=datetime.now(),
                platform="all",
                score=float(x.split(" ")[-1]),
                sourceName=x.split(" ")[5],
            )
        )
        .to_list()
    )
    return result
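
# The parsing above assumes each row's text collapses to one space-separated
# string with at least 7 fields, the title at index 5 and the score last. A
# hypothetical row "1 up 2 x y TitleX 8.5" would map to sourceName="TitleX"
# and score=8.5; the site's real column layout may differ and should be
# verified against the live page.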

if __name__ == "__main__":
    # Record the start time to report total runtime at the end
    start_time = time.time()
    # Sample parameters for a quick manual run
    param = SpiderParams(
        startDate="1991-01-02",
        endDate="1991-01-05",
        target_type=[TypeEnum.ANIME, TypeEnum.EPISODES, TypeEnum.MOVIE],
        url="http://guduodata.com/",
    )
    res = asyncio.run(startBrowser(param))
    print(f"Elapsed: {time.time() - start_time:.2f}s, {len(res)} results")
    for it in res:
        print(it)