Merge pull request #1 from gaoxiaobei/dev

Enhance robustness.
This commit is contained in:
gaoxiaobei
2025-07-17 06:45:55 +08:00
committed by GitHub
4 changed files with 212 additions and 109 deletions

View File

@@ -87,7 +87,7 @@ START_PAGE = 1
CRAWLER_MAX_NOTES_COUNT = 200
# Limit on the number of videos/posts crawled per day
MAX_NOTES_PER_DAY = 20
MAX_NOTES_PER_DAY = 1
# Limit on the number of concurrent crawlers
MAX_CONCURRENCY_NUM = 1
@@ -216,16 +216,17 @@ STOP_WORDS_FILE = "./docs/hit_stopwords.txt"
# Path to the Chinese font file
FONT_PATH = "./docs/STZHONGS.TTF"
# Start date for crawling; only supported for bilibili keyword search, YYYY-MM-DD format. If None, no time range is applied and the keyword search falls back to the default behavior of returning at most 1000 video results.
# Start date for crawling; only supported for bilibili keyword search, YYYY-MM-DD format
START_DAY = "2024-01-01"
# End date for crawling; only supported for bilibili keyword search, YYYY-MM-DD format. If None, no time range is applied and the keyword search falls back to the default behavior of returning at most 1000 video results.
# End date for crawling; only supported for bilibili keyword search, YYYY-MM-DD format
END_DAY = "2024-01-01"
# Whether to crawl day by day; only supported for bilibili keyword search
# If False, the START_DAY and END_DAY settings are ignored
# If True, results are filtered for each day from START_DAY to END_DAY, which bypasses the 1000-video limit and crawls as many of the keyword's videos as possible
ALL_DAY = False
# Bilibili search mode; only takes effect when CRAWLER_TYPE="search"
# "normal": search without a time range; returns at most about 1000 results.
# "all_in_time_range": within the time range set by START_DAY and END_DAY, crawl as much data as possible; the per-day cap follows MAX_NOTES_PER_DAY, but the total may exceed CRAWLER_MAX_NOTES_COUNT.
# "daily_limit_in_time_range": within the given time range, strictly respect the MAX_NOTES_PER_DAY daily cap and the CRAWLER_MAX_NOTES_COUNT total cap.
BILI_SEARCH_MODE = "normal"
#!!! The options below only apply to bilibili creator search
# Whether to crawl comments from the creator's homepage, or the creator's dynamics and relation lists (True for the former)
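As an illustration of how the new mode interacts with the existing settings, a bounded time-range crawl might be configured as follows; the values are examples, not part of this commit:

# Hypothetical example: strictly capped time-range search
CRAWLER_TYPE = "search"
BILI_SEARCH_MODE = "daily_limit_in_time_range"
START_DAY = "2024-01-01"
END_DAY = "2024-01-07"
MAX_NOTES_PER_DAY = 20          # at most 20 videos per day
CRAWLER_MAX_NOTES_COUNT = 100   # at most 100 videos per keyword in total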

View File

@@ -45,6 +45,11 @@ class CrawlerFactory:
return crawler_class()
async def main():
# Init crawler
crawler: Optional[AbstractCrawler] = None
try:
# parse cmd
await cmd_arg.parse_cmd()
# parse cmd
await cmd_arg.parse_cmd()
@@ -59,11 +64,11 @@ async def main():
if config.SAVE_DATA_OPTION in ["db", "sqlite"]:
await db.close()
if __name__ == '__main__':
try:
# asyncio.run(main())
asyncio.get_event_loop().run_until_complete(main())
except KeyboardInterrupt:
print("\n[main] Caught keyboard interrupt, exiting.")
sys.exit()
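Putting the two hunks above together, the intended shape of main() is roughly the following sketch; the db.init_db() call and the CrawlerFactory.create_crawler signature are assumptions based on the surrounding context, not lines from this diff:

async def main():
    crawler: Optional[AbstractCrawler] = None
    try:
        # parse cmd
        await cmd_arg.parse_cmd()
        if config.SAVE_DATA_OPTION in ["db", "sqlite"]:
            await db.init_db()  # assumed initializer
        # assumed factory signature; PLATFORM is an assumed config field
        crawler = CrawlerFactory.create_crawler(platform=config.PLATFORM)
        await crawler.start()
    finally:
        # cleanup runs even if start() raises or the run is interrupted
        if crawler:
            await crawler.close()
        if config.SAVE_DATA_OPTION in ["db", "sqlite"]:
            await db.close()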

View File

@@ -15,6 +15,7 @@
# @Desc : bilibili request client
import asyncio
import json
import random
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from urllib.parse import urlencode
@@ -53,7 +54,11 @@ class BilibiliClient(AbstractApiClient):
method, url, timeout=self.timeout,
**kwargs
)
data: Dict = response.json()
try:
data: Dict = response.json()
except json.JSONDecodeError:
utils.logger.error(f"[BilibiliClient.request] Failed to decode JSON from response. status_code: {response.status_code}, response_text: {response.text}")
raise DataFetchError(f"Failed to decode JSON, content: {response.text}")
if data.get("code") != 0:
raise DataFetchError(data.get("message", "unknown error"))
else:
@@ -78,8 +83,12 @@ class BilibiliClient(AbstractApiClient):
:return:
"""
local_storage = await self.playwright_page.evaluate("() => window.localStorage")
wbi_img_urls = local_storage.get("wbi_img_urls", "") or local_storage.get(
"wbi_img_url") + "-" + local_storage.get("wbi_sub_url")
wbi_img_urls = local_storage.get("wbi_img_urls", "")
if not wbi_img_urls:
img_url_from_storage = local_storage.get("wbi_img_url")
sub_url_from_storage = local_storage.get("wbi_sub_url")
if img_url_from_storage and sub_url_from_storage:
wbi_img_urls = f"{img_url_from_storage}-{sub_url_from_storage}"
if wbi_img_urls and "-" in wbi_img_urls:
img_url, sub_url = wbi_img_urls.split("-")
else:
@@ -235,16 +244,50 @@ class BilibiliClient(AbstractApiClient):
:return:
"""
result = []
is_end = False
next_page = 0
max_retries = 3
while not is_end and len(result) < max_count:
comments_res = await self.get_video_comments(video_id, CommentOrderType.DEFAULT, next_page)
comments_res = None
for attempt in range(max_retries):
try:
comments_res = await self.get_video_comments(video_id, CommentOrderType.DEFAULT, next_page)
break # Success
except DataFetchError as e:
if attempt < max_retries - 1:
delay = 5 * (2 ** attempt) + random.uniform(0, 1)
utils.logger.warning(
f"[BilibiliClient.get_video_all_comments] Retrying video_id {video_id} in {delay:.2f}s... (Attempt {attempt + 1}/{max_retries})"
)
await asyncio.sleep(delay)
else:
utils.logger.error(
f"[BilibiliClient.get_video_all_comments] Max retries reached for video_id: {video_id}. Skipping comments. Error: {e}"
)
is_end = True
break
if not comments_res:
break
cursor_info: Dict = comments_res.get("cursor")
if not cursor_info:
utils.logger.warning(f"[BilibiliClient.get_video_all_comments] Could not find 'cursor' in response for video_id: {video_id}. Skipping.")
break
comment_list: List[Dict] = comments_res.get("replies", [])
is_end = cursor_info.get("is_end")
next_page = cursor_info.get("next")
# Check that both is_end and next are present
if "is_end" not in cursor_info or "next" not in cursor_info:
utils.logger.warning(f"[BilibiliClient.get_video_all_comments] 'is_end' or 'next' not in cursor for video_id: {video_id}. Assuming end of comments.")
is_end = True
else:
is_end = cursor_info.get("is_end")
next_page = cursor_info.get("next")
if not isinstance(is_end, bool):
utils.logger.warning(f"[BilibiliClient.get_video_all_comments] 'is_end' is not a boolean for video_id: {video_id}. Assuming end of comments.")
is_end = True
if is_fetch_sub_comments:
for comment in comment_list:
comment_id = comment['rpid']
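The retry policy introduced above, exponential backoff with jitter around a fallible async call, can be read in isolation as the small helper below; the function name and signature are an illustrative sketch, not part of this commit:

import asyncio
import random

async def fetch_with_backoff(fetch, max_retries: int = 3, base_delay: float = 5.0):
    """Await fetch(), retrying with exponential backoff plus jitter on failure."""
    for attempt in range(max_retries):
        try:
            return await fetch()
        except Exception:
            if attempt == max_retries - 1:
                raise  # give up after the last attempt
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            await asyncio.sleep(delay)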

View File

@@ -23,6 +23,7 @@ from datetime import datetime, timedelta
import pandas as pd
from playwright.async_api import (BrowserContext, BrowserType, Page, Playwright, async_playwright)
from playwright._impl._errors import TargetClosedError
import config
from base.base_crawler import AbstractCrawler
@@ -95,7 +96,6 @@ class BilibiliCrawler(AbstractCrawler):
crawler_type_var.set(config.CRAWLER_TYPE)
if config.CRAWLER_TYPE == "search":
# Search for videos and retrieve their comment information.
await self.search()
elif config.CRAWLER_TYPE == "detail":
# Get the information and comments of the specified post
@@ -111,6 +111,20 @@ class BilibiliCrawler(AbstractCrawler):
utils.logger.info(
"[BilibiliCrawler.start] Bilibili Crawler finished ...")
async def search(self):
"""
search bilibili video
"""
# Search for videos and retrieve their comment information.
if config.BILI_SEARCH_MODE == "normal":
await self.search_by_keywords()
elif config.BILI_SEARCH_MODE == "all_in_time_range":
await self.search_by_keywords_in_time_range(daily_limit=False)
elif config.BILI_SEARCH_MODE == "daily_limit_in_time_range":
await self.search_by_keywords_in_time_range(daily_limit=True)
else:
utils.logger.warning(f"Unknown BILI_SEARCH_MODE: {config.BILI_SEARCH_MODE}")
@staticmethod
async def get_pubtime_datetime(start: str = config.START_DAY, end: str = config.END_DAY) -> Tuple[str, str]:
"""
@@ -141,106 +155,138 @@ class BilibiliCrawler(AbstractCrawler):
# Convert them back to timestamps
return str(int(start_day.timestamp())), str(int(end_day.timestamp()))
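For readers skimming the diff: this helper turns two YYYY-MM-DD strings into a start-of-day and end-of-day timestamp pair. The standalone sketch below, under a hypothetical name, is reconstructed from the return statement above; the rest of the body is assumed:

from datetime import datetime, timedelta

def pubtime_range(start: str, end: str) -> tuple:
    # Assumed sketch: "2024-01-01", "2024-01-01" -> (timestamp at 00:00:00, timestamp at 23:59:59)
    start_day = datetime.strptime(start, "%Y-%m-%d")
    end_day = datetime.strptime(end, "%Y-%m-%d") + timedelta(hours=23, minutes=59, seconds=59)
    return str(int(start_day.timestamp())), str(int(end_day.timestamp()))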
async def search(self):
async def search_by_keywords(self):
"""
search bilibili video with keywords
search bilibili video with keywords in normal mode
:return:
"""
utils.logger.info("[BilibiliCrawler.search] Begin search bilibli keywords")
utils.logger.info("[BilibiliCrawler.search_by_keywords] Begin search bilibli keywords")
bili_limit_count = 20 # bilibili limit page fixed value
if config.CRAWLER_MAX_NOTES_COUNT < bili_limit_count:
config.CRAWLER_MAX_NOTES_COUNT = bili_limit_count
start_page = config.START_PAGE # start page number
for keyword in config.KEYWORDS.split(","):
source_keyword_var.set(keyword)
utils.logger.info(f"[BilibiliCrawler.search] Current search keyword: {keyword}")
# Each keyword returns at most 1000 results
if not config.ALL_DAY:
page = 1
while (page - start_page + 1) * bili_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
if page < start_page:
utils.logger.info(f"[BilibiliCrawler.search] Skip page: {page}")
page += 1
continue
utils.logger.info(f"[BilibiliCrawler.search] search bilibili keyword: {keyword}, page: {page}")
video_id_list: List[str] = []
videos_res = await self.bili_client.search_video_by_keyword(
keyword=keyword,
page=page,
page_size=bili_limit_count,
order=SearchOrderType.DEFAULT,
pubtime_begin_s=0, # start timestamp for the video publish date
pubtime_end_s=0 # end timestamp for the video publish date
)
video_list: List[Dict] = videos_res.get("result")
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = []
try:
task_list = [self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) for video_item in video_list]
except Exception as e:
utils.logger.warning(f"[BilibiliCrawler.search] error in the task list. The video for this page will not be included. {e}")
video_items = await asyncio.gather(*task_list)
for video_item in video_items:
if video_item:
video_id_list.append(video_item.get("View").get("aid"))
await bilibili_store.update_bilibili_video(video_item)
await bilibili_store.update_up_info(video_item)
await self.get_bilibili_video(video_item, semaphore)
utils.logger.info(f"[BilibiliCrawler.search_by_keywords] Current search keyword: {keyword}")
page = 1
while (page - start_page + 1) * bili_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
if page < start_page:
utils.logger.info(f"[BilibiliCrawler.search_by_keywords] Skip page: {page}")
page += 1
await self.batch_get_video_comments(video_id_list)
# Filter day by day from START_DAY to END_DAY; this bypasses the 1000-video limit and crawls as many of the keyword's videos as possible for each day
else:
for day in pd.date_range(start=config.START_DAY, end=config.END_DAY, freq='D'):
# Timestamp parameters for crawling one day at a time
pubtime_begin_s, pubtime_end_s = await self.get_pubtime_datetime(start=day.strftime('%Y-%m-%d'), end=day.strftime('%Y-%m-%d'))
page = 1
notes_count_this_day = 0
#! When an exception occurs in this while loop (usually because there is no data for the day), it automatically moves on to the next day, so that as many of the keyword's videos as possible are crawled for each day
#! Do not add any exception handling beyond the existing try/except Exception statements!!! Doing so breaks this code: it would only crawl a single day and never advance to the next one
#! Do not modify this code unless you refactor its logic to achieve the same behavior!!!
while (page - start_page + 1) * bili_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
if notes_count_this_day >= config.MAX_NOTES_PER_DAY:
utils.logger.info(f"[BilibiliCrawler.search] Reached the maximum number of notes for today {day.ctime()}.")
break
#! Catch any error when the response returns nothing, then go to the next day
try:
#! Don't skip any page, to make sure all videos for the day are gathered
# if page < start_page:
# utils.logger.info(f"[BilibiliCrawler.search] Skip page: {page}")
# page += 1
# continue
continue
utils.logger.info(f"[BilibiliCrawler.search] search bilibili keyword: {keyword}, date: {day.ctime()}, page: {page}")
video_id_list: List[str] = []
videos_res = await self.bili_client.search_video_by_keyword(
keyword=keyword,
page=page,
page_size=bili_limit_count,
order=SearchOrderType.DEFAULT,
pubtime_begin_s=pubtime_begin_s, # start timestamp for the video publish date
pubtime_end_s=pubtime_end_s # end timestamp for the video publish date
)
video_list: List[Dict] = videos_res.get("result")
utils.logger.info(f"[BilibiliCrawler.search_by_keywords] search bilibili keyword: {keyword}, page: {page}")
video_id_list: List[str] = []
videos_res = await self.bili_client.search_video_by_keyword(
keyword=keyword,
page=page,
page_size=bili_limit_count,
order=SearchOrderType.DEFAULT,
pubtime_begin_s=0, # start timestamp for the video publish date
pubtime_end_s=0 # end timestamp for the video publish date
)
video_list: List[Dict] = videos_res.get("result")
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) for video_item in video_list]
video_items = await asyncio.gather(*task_list)
for video_item in video_items:
if video_item:
notes_count_this_day += 1
video_id_list.append(video_item.get("View").get("aid"))
await bilibili_store.update_bilibili_video(video_item)
await bilibili_store.update_up_info(video_item)
await self.get_bilibili_video(video_item, semaphore)
page += 1
await self.batch_get_video_comments(video_id_list)
# go to next day
except Exception as e:
print(e)
if not video_list:
utils.logger.info(f"[BilibiliCrawler.search_by_keywords] No more videos for '{keyword}', moving to next keyword.")
break
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = []
try:
task_list = [self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) for video_item in video_list]
except Exception as e:
utils.logger.warning(f"[BilibiliCrawler.search_by_keywords] error in the task list. The video for this page will not be included. {e}")
video_items = await asyncio.gather(*task_list)
for video_item in video_items:
if video_item:
video_id_list.append(video_item.get("View").get("aid"))
await bilibili_store.update_bilibili_video(video_item)
await bilibili_store.update_up_info(video_item)
await self.get_bilibili_video(video_item, semaphore)
page += 1
await self.batch_get_video_comments(video_id_list)
async def search_by_keywords_in_time_range(self, daily_limit: bool):
"""
Search bilibili video with keywords in a given time range.
:param daily_limit: if True, strictly limit the number of notes per day and total.
"""
utils.logger.info(f"[BilibiliCrawler.search_by_keywords_in_time_range] Begin search with daily_limit={daily_limit}")
bili_limit_count = 20
start_page = config.START_PAGE
for keyword in config.KEYWORDS.split(","):
source_keyword_var.set(keyword)
utils.logger.info(f"[BilibiliCrawler.search_by_keywords_in_time_range] Current search keyword: {keyword}")
total_notes_crawled_for_keyword = 0
for day in pd.date_range(start=config.START_DAY, end=config.END_DAY, freq='D'):
if daily_limit and total_notes_crawled_for_keyword >= config.CRAWLER_MAX_NOTES_COUNT:
utils.logger.info(f"[BilibiliCrawler.search] Reached CRAWLER_MAX_NOTES_COUNT limit for keyword '{keyword}', skipping remaining days.")
break
if not daily_limit and total_notes_crawled_for_keyword >= config.CRAWLER_MAX_NOTES_COUNT:
utils.logger.info(f"[BilibiliCrawler.search] Reached CRAWLER_MAX_NOTES_COUNT limit for keyword '{keyword}', skipping remaining days.")
break
pubtime_begin_s, pubtime_end_s = await self.get_pubtime_datetime(start=day.strftime('%Y-%m-%d'), end=day.strftime('%Y-%m-%d'))
page = 1
notes_count_this_day = 0
while True:
if notes_count_this_day >= config.MAX_NOTES_PER_DAY:
utils.logger.info(f"[BilibiliCrawler.search] Reached MAX_NOTES_PER_DAY limit for {day.ctime()}.")
break
if daily_limit and total_notes_crawled_for_keyword >= config.CRAWLER_MAX_NOTES_COUNT:
utils.logger.info(f"[BilibiliCrawler.search] Reached CRAWLER_MAX_NOTES_COUNT limit for keyword '{keyword}'.")
break
if not daily_limit and total_notes_crawled_for_keyword >= config.CRAWLER_MAX_NOTES_COUNT:
break
try:
utils.logger.info(f"[BilibiliCrawler.search] search bilibili keyword: {keyword}, date: {day.ctime()}, page: {page}")
video_id_list: List[str] = []
videos_res = await self.bili_client.search_video_by_keyword(
keyword=keyword,
page=page,
page_size=bili_limit_count,
order=SearchOrderType.DEFAULT,
pubtime_begin_s=pubtime_begin_s,
pubtime_end_s=pubtime_end_s
)
video_list: List[Dict] = videos_res.get("result")
if not video_list:
utils.logger.info(f"[BilibiliCrawler.search] No more videos for '{keyword}' on {day.ctime()}, moving to next day.")
break
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) for video_item in video_list]
video_items = await asyncio.gather(*task_list)
for video_item in video_items:
if video_item:
if daily_limit and total_notes_crawled_for_keyword >= config.CRAWLER_MAX_NOTES_COUNT:
break
if not daily_limit and total_notes_crawled_for_keyword >= config.CRAWLER_MAX_NOTES_COUNT:
break
if notes_count_this_day >= config.MAX_NOTES_PER_DAY:
break
notes_count_this_day += 1
total_notes_crawled_for_keyword += 1
video_id_list.append(video_item.get("View").get("aid"))
await bilibili_store.update_bilibili_video(video_item)
await bilibili_store.update_up_info(video_item)
await self.get_bilibili_video(video_item, semaphore)
page += 1
await self.batch_get_video_comments(video_id_list)
except Exception as e:
utils.logger.error(f"[BilibiliCrawler.search] Error searching on {day.ctime()}: {e}")
break
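The day-by-day loop above is driven by pandas date_range; a minimal standalone illustration of that iteration follows (the dates are placeholders):

import pandas as pd

for day in pd.date_range(start="2024-01-01", end="2024-01-03", freq="D"):
    # Yields one calendar day per iteration: 2024-01-01, 2024-01-02, 2024-01-03.
    # Each day is then converted to a [00:00:00, 23:59:59] timestamp pair for the search API.
    print(day.strftime("%Y-%m-%d"))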
async def batch_get_video_comments(self, video_id_list: List[str]):
"""
batch get video comments
@@ -273,6 +319,7 @@ class BilibiliCrawler(AbstractCrawler):
try:
utils.logger.info(
f"[BilibiliCrawler.get_comments] begin get video_id: {video_id} comments ...")
await asyncio.sleep(random.uniform(0.5, 1.5))
await self.bili_client.get_video_all_comments(
video_id=video_id,
crawl_interval=random.random(),
@@ -287,6 +334,8 @@ class BilibiliCrawler(AbstractCrawler):
except Exception as e:
utils.logger.error(
f"[BilibiliCrawler.get_comments] may be been blocked, err:{e}")
# Propagate the exception to be caught by the main loop
raise
async def get_creator_videos(self, creator_id: int):
"""
@@ -477,13 +526,18 @@ class BilibiliCrawler(AbstractCrawler):
async def close(self):
"""Close browser context"""
# CDP mode requires special handling
if self.cdp_manager:
await self.cdp_manager.cleanup()
self.cdp_manager = None
else:
await self.browser_context.close()
utils.logger.info("[BilibiliCrawler.close] Browser context closed ...")
try:
# CDP mode requires special handling
if self.cdp_manager:
await self.cdp_manager.cleanup()
self.cdp_manager = None
elif self.browser_context:
await self.browser_context.close()
utils.logger.info("[BilibiliCrawler.close] Browser context closed ...")
except TargetClosedError:
utils.logger.warning("[BilibiliCrawler.close] Browser context was already closed.")
except Exception as e:
utils.logger.error(f"[BilibiliCrawler.close] An error occurred during close: {e}")
async def get_bilibili_video(self, video_item: Dict, semaphore: asyncio.Semaphore):
"""