refactor: tieba 改为浏览器获取数据

This commit is contained in:
程序员阿江(Relakkes)
2025-10-19 17:09:55 +08:00
parent 26a261bc09
commit ed6e0bfb5f
3 changed files with 606 additions and 128 deletions

View File

@@ -11,10 +11,10 @@
import asyncio import asyncio
import json import json
from typing import Any, Callable, Dict, List, Optional, Union from typing import Any, Callable, Dict, List, Optional, Union
from urllib.parse import urlencode from urllib.parse import urlencode, quote
import httpx import requests
from playwright.async_api import BrowserContext from playwright.async_api import BrowserContext, Page
from tenacity import RetryError, retry, stop_after_attempt, wait_fixed from tenacity import RetryError, retry, stop_after_attempt, wait_fixed
import config import config
@@ -34,34 +34,76 @@ class BaiduTieBaClient(AbstractApiClient):
timeout=10, timeout=10,
ip_pool=None, ip_pool=None,
default_ip_proxy=None, default_ip_proxy=None,
headers: Dict[str, str] = None,
playwright_page: Optional[Page] = None,
): ):
self.ip_pool: Optional[ProxyIpPool] = ip_pool self.ip_pool: Optional[ProxyIpPool] = ip_pool
self.timeout = timeout self.timeout = timeout
self.headers = { # 使用传入的headers(包含真实浏览器UA)或默认headers
self.headers = headers or {
"User-Agent": utils.get_user_agent(), "User-Agent": utils.get_user_agent(),
"Cookies": "", "Cookie": "",
} }
self._host = "https://tieba.baidu.com" self._host = "https://tieba.baidu.com"
self._page_extractor = TieBaExtractor() self._page_extractor = TieBaExtractor()
self.default_ip_proxy = default_ip_proxy self.default_ip_proxy = default_ip_proxy
self.playwright_page = playwright_page # Playwright页面对象
def _sync_request(self, method, url, proxy=None, **kwargs):
    """
    Send a blocking HTTP request with ``requests``.

    Args:
        method: HTTP method name, e.g. "GET" or "POST".
        url: URL to request.
        proxy: Optional proxy URL; when given it is used for both the
            "http" and "https" schemes.
        **kwargs: Extra keyword arguments forwarded to ``requests.request``.
            A ``headers`` mapping is merged on top of ``self.headers`` and
            a ``timeout`` value overrides ``self.timeout``.

    Returns:
        The response object returned by ``requests.request``.
    """
    # ``headers`` and ``timeout`` are passed explicitly below, so forwarding
    # caller-supplied values through **kwargs as well would raise
    # "TypeError: got multiple values for keyword argument". Pull them out
    # and fold them into the instance defaults instead.
    extra_headers = kwargs.pop("headers", None)
    if extra_headers:
        request_headers = {**self.headers, **extra_headers}
    else:
        request_headers = self.headers
    request_timeout = kwargs.pop("timeout", self.timeout)

    # requests takes one proxy entry per URL scheme.
    proxies = {"http": proxy, "https": proxy} if proxy else None

    return requests.request(
        method=method,
        url=url,
        headers=request_headers,
        proxies=proxies,
        timeout=request_timeout,
        **kwargs,
    )
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1)) @retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
async def request(self, method, url, return_ori_content=False, proxy=None, **kwargs) -> Union[str, Any]: async def request(self, method, url, return_ori_content=False, proxy=None, **kwargs) -> Union[str, Any]:
""" """
封装httpx的公共请求方法,对请求响应做一些处理 封装requests的公共请求方法,对请求响应做一些处理
Args: Args:
method: 请求方法 method: 请求方法
url: 请求的URL url: 请求的URL
return_ori_content: 是否返回原始内容 return_ori_content: 是否返回原始内容
proxies: 代理IP proxy: 代理IP
**kwargs: 其他请求参数,例如请求头、请求体等 **kwargs: 其他请求参数,例如请求头、请求体等
Returns: Returns:
""" """
actual_proxy = proxy if proxy else self.default_ip_proxy actual_proxy = proxy if proxy else self.default_ip_proxy
async with httpx.AsyncClient(proxy=actual_proxy) as client:
response = await client.request(method, url, timeout=self.timeout, headers=self.headers, **kwargs) # 在线程池中执行同步的requests请求
response = await asyncio.to_thread(
self._sync_request,
method,
url,
actual_proxy,
**kwargs
)
if response.status_code != 200: if response.status_code != 200:
utils.logger.error(f"Request failed, method: {method}, url: {url}, status code: {response.status_code}") utils.logger.error(f"Request failed, method: {method}, url: {url}, status code: {response.status_code}")
@@ -69,7 +111,7 @@ class BaiduTieBaClient(AbstractApiClient):
raise Exception(f"Request failed, method: {method}, url: {url}, status code: {response.status_code}") raise Exception(f"Request failed, method: {method}, url: {url}, status code: {response.status_code}")
if response.text == "" or response.text == "blocked": if response.text == "" or response.text == "blocked":
utils.logger.error(f"request params incrr, response.text: {response.text}") utils.logger.error(f"request params incorrect, response.text: {response.text}")
raise Exception("account blocked") raise Exception("account blocked")
if return_ori_content: if return_ori_content:
@@ -119,26 +161,41 @@ class BaiduTieBaClient(AbstractApiClient):
json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False) json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
return await self.request(method="POST", url=f"{self._host}{uri}", data=json_str, **kwargs) return await self.request(method="POST", url=f"{self._host}{uri}", data=json_str, **kwargs)
async def pong(self) -> bool: async def pong(self, browser_context: BrowserContext = None) -> bool:
""" """
用于检查登录态是否失效了 用于检查登录态是否失效了
Returns: 使用Cookie检测而非API调用,避免被检测
Args:
browser_context: 浏览器上下文对象
Returns:
bool: True表示已登录,False表示未登录
""" """
utils.logger.info("[BaiduTieBaClient.pong] Begin to pong tieba...") utils.logger.info("[BaiduTieBaClient.pong] Begin to check tieba login state by cookies...")
if not browser_context:
utils.logger.warning("[BaiduTieBaClient.pong] browser_context is None, assume not logged in")
return False
try: try:
uri = "/mo/q/sync" # 从浏览器获取cookies并检查关键登录cookie
res: Dict = await self.get(uri) _, cookie_dict = utils.convert_cookies(await browser_context.cookies())
utils.logger.info(f"[BaiduTieBaClient.pong] res: {res}")
if res and res.get("no") == 0: # 百度贴吧的登录标识: STOKEN 或 PTOKEN
ping_flag = True stoken = cookie_dict.get("STOKEN")
ptoken = cookie_dict.get("PTOKEN")
bduss = cookie_dict.get("BDUSS") # 百度通用登录cookie
if stoken or ptoken or bduss:
utils.logger.info(f"[BaiduTieBaClient.pong] Login state verified by cookies (STOKEN: {bool(stoken)}, PTOKEN: {bool(ptoken)}, BDUSS: {bool(bduss)})")
return True
else: else:
utils.logger.info(f"[BaiduTieBaClient.pong] user not login, will try to login again...") utils.logger.info("[BaiduTieBaClient.pong] No valid login cookies found, need to login")
ping_flag = False return False
except Exception as e: except Exception as e:
utils.logger.error(f"[BaiduTieBaClient.pong] Ping tieba failed: {e}, and try to login again...") utils.logger.error(f"[BaiduTieBaClient.pong] Check login state failed: {e}, assume not logged in")
ping_flag = False return False
return ping_flag
async def update_cookies(self, browser_context: BrowserContext): async def update_cookies(self, browser_context: BrowserContext):
""" """
@@ -149,7 +206,9 @@ class BaiduTieBaClient(AbstractApiClient):
Returns: Returns:
""" """
pass cookie_str, cookie_dict = utils.convert_cookies(await browser_context.cookies())
self.headers["Cookie"] = cookie_str
utils.logger.info("[BaiduTieBaClient.update_cookies] Cookie has been updated")
async def get_notes_by_keyword( async def get_notes_by_keyword(
self, self,
@@ -160,7 +219,7 @@ class BaiduTieBaClient(AbstractApiClient):
note_type: SearchNoteType = SearchNoteType.FIXED_THREAD, note_type: SearchNoteType = SearchNoteType.FIXED_THREAD,
) -> List[TiebaNote]: ) -> List[TiebaNote]:
""" """
根据关键词搜索贴吧帖子 根据关键词搜索贴吧帖子 (使用Playwright访问页面,避免API检测)
Args: Args:
keyword: 关键词 keyword: 关键词
page: 分页第几页 page: 分页第几页
@@ -170,30 +229,81 @@ class BaiduTieBaClient(AbstractApiClient):
Returns: Returns:
""" """
uri = "/f/search/res" if not self.playwright_page:
utils.logger.error("[BaiduTieBaClient.get_notes_by_keyword] playwright_page is None, cannot use browser mode")
raise Exception("playwright_page is required for browser-based search")
# 构造搜索URL
# 示例: https://tieba.baidu.com/f/search/res?ie=utf-8&qw=编程
search_url = f"{self._host}/f/search/res"
params = { params = {
"isnew": 1, "ie": "utf-8",
"qw": keyword, "qw": keyword,
"rn": page_size, "rn": page_size,
"pn": page, "pn": page,
"sm": sort.value, "sm": sort.value,
"only_thread": note_type.value, "only_thread": note_type.value,
} }
page_content = await self.get(uri, params=params, return_ori_content=True)
return self._page_extractor.extract_search_note_list(page_content) # 拼接完整URL
full_url = f"{search_url}?{urlencode(params)}"
utils.logger.info(f"[BaiduTieBaClient.get_notes_by_keyword] 访问搜索页面: {full_url}")
try:
# 使用Playwright访问搜索页面
await self.playwright_page.goto(full_url, wait_until="domcontentloaded")
# 等待页面加载,使用配置文件中的延时设置
await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC)
# 获取页面HTML内容
page_content = await self.playwright_page.content()
utils.logger.info(f"[BaiduTieBaClient.get_notes_by_keyword] 成功获取搜索页面HTML,长度: {len(page_content)}")
# 提取搜索结果
notes = self._page_extractor.extract_search_note_list(page_content)
utils.logger.info(f"[BaiduTieBaClient.get_notes_by_keyword] 提取到 {len(notes)} 条帖子")
return notes
except Exception as e:
utils.logger.error(f"[BaiduTieBaClient.get_notes_by_keyword] 搜索失败: {e}")
raise
async def get_note_by_id(self, note_id: str) -> TiebaNote: async def get_note_by_id(self, note_id: str) -> TiebaNote:
""" """
根据帖子ID获取帖子详情 根据帖子ID获取帖子详情 (使用Playwright访问页面,避免API检测)
Args: Args:
note_id: note_id: 帖子ID
Returns: Returns:
TiebaNote: 帖子详情对象
""" """
uri = f"/p/{note_id}" if not self.playwright_page:
page_content = await self.get(uri, return_ori_content=True) utils.logger.error("[BaiduTieBaClient.get_note_by_id] playwright_page is None, cannot use browser mode")
return self._page_extractor.extract_note_detail(page_content) raise Exception("playwright_page is required for browser-based note detail fetching")
# 构造帖子详情URL
note_url = f"{self._host}/p/{note_id}"
utils.logger.info(f"[BaiduTieBaClient.get_note_by_id] 访问帖子详情页面: {note_url}")
try:
# 使用Playwright访问帖子详情页面
await self.playwright_page.goto(note_url, wait_until="domcontentloaded")
# 等待页面加载,使用配置文件中的延时设置
await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC)
# 获取页面HTML内容
page_content = await self.playwright_page.content()
utils.logger.info(f"[BaiduTieBaClient.get_note_by_id] 成功获取帖子详情HTML,长度: {len(page_content)}")
# 提取帖子详情
note_detail = self._page_extractor.extract_note_detail(page_content)
return note_detail
except Exception as e:
utils.logger.error(f"[BaiduTieBaClient.get_note_by_id] 获取帖子详情失败: {e}")
raise
async def get_note_all_comments( async def get_note_all_comments(
self, self,
@@ -203,35 +313,68 @@ class BaiduTieBaClient(AbstractApiClient):
max_count: int = 10, max_count: int = 10,
) -> List[TiebaComment]: ) -> List[TiebaComment]:
""" """
获取指定帖子下的所有一级评论,该方法会一直查找一个帖子下的所有评论信息 获取指定帖子下的所有一级评论 (使用Playwright访问页面,避免API检测)
Args: Args:
note_detail: 帖子详情对象 note_detail: 帖子详情对象
crawl_interval: 爬取一次笔记的延迟单位(秒) crawl_interval: 爬取一次笔记的延迟单位(秒)
callback: 一次笔记爬取结束后 callback: 一次笔记爬取结束后的回调函数
max_count: 一次帖子爬取的最大评论数量 max_count: 一次帖子爬取的最大评论数量
Returns: Returns:
List[TiebaComment]: 评论列表
""" """
uri = f"/p/{note_detail.note_id}" if not self.playwright_page:
utils.logger.error("[BaiduTieBaClient.get_note_all_comments] playwright_page is None, cannot use browser mode")
raise Exception("playwright_page is required for browser-based comment fetching")
result: List[TiebaComment] = [] result: List[TiebaComment] = []
current_page = 1 current_page = 1
while note_detail.total_replay_page >= current_page and len(result) < max_count: while note_detail.total_replay_page >= current_page and len(result) < max_count:
params = { # 构造评论页URL
"pn": current_page, comment_url = f"{self._host}/p/{note_detail.note_id}?pn={current_page}"
} utils.logger.info(f"[BaiduTieBaClient.get_note_all_comments] 访问评论页面: {comment_url}")
page_content = await self.get(uri, params=params, return_ori_content=True)
comments = self._page_extractor.extract_tieba_note_parment_comments(page_content, note_id=note_detail.note_id) try:
# 使用Playwright访问评论页面
await self.playwright_page.goto(comment_url, wait_until="domcontentloaded")
# 等待页面加载,使用配置文件中的延时设置
await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC)
# 获取页面HTML内容
page_content = await self.playwright_page.content()
# 提取评论
comments = self._page_extractor.extract_tieba_note_parment_comments(
page_content, note_id=note_detail.note_id
)
if not comments: if not comments:
utils.logger.info(f"[BaiduTieBaClient.get_note_all_comments] 第{current_page}页没有评论,停止爬取")
break break
# 限制评论数量
if len(result) + len(comments) > max_count: if len(result) + len(comments) > max_count:
comments = comments[:max_count - len(result)] comments = comments[:max_count - len(result)]
if callback: if callback:
await callback(note_detail.note_id, comments) await callback(note_detail.note_id, comments)
result.extend(comments) result.extend(comments)
# 获取所有子评论 # 获取所有子评论
await self.get_comments_all_sub_comments(comments, crawl_interval=crawl_interval, callback=callback) await self.get_comments_all_sub_comments(
comments, crawl_interval=crawl_interval, callback=callback
)
await asyncio.sleep(crawl_interval) await asyncio.sleep(crawl_interval)
current_page += 1 current_page += 1
except Exception as e:
utils.logger.error(f"[BaiduTieBaClient.get_note_all_comments] 获取第{current_page}页评论失败: {e}")
break
utils.logger.info(f"[BaiduTieBaClient.get_note_all_comments] 共获取 {len(result)} 条一级评论")
return result return result
async def get_comments_all_sub_comments( async def get_comments_all_sub_comments(
@@ -241,93 +384,194 @@ class BaiduTieBaClient(AbstractApiClient):
callback: Optional[Callable] = None, callback: Optional[Callable] = None,
) -> List[TiebaComment]: ) -> List[TiebaComment]:
""" """
获取指定评论下的所有子评论 获取指定评论下的所有子评论 (使用Playwright访问页面,避免API检测)
Args: Args:
comments: 评论列表 comments: 评论列表
crawl_interval: 爬取一次笔记的延迟单位(秒) crawl_interval: 爬取一次笔记的延迟单位(秒)
callback: 一次笔记爬取结束后 callback: 一次笔记爬取结束后的回调函数
Returns: Returns:
List[TiebaComment]: 子评论列表
""" """
uri = "/p/comment"
if not config.ENABLE_GET_SUB_COMMENTS: if not config.ENABLE_GET_SUB_COMMENTS:
return [] return []
# # 贴吧获取所有子评论需要登录态 if not self.playwright_page:
# if self.headers.get("Cookies") == "" or not self.pong(): utils.logger.error("[BaiduTieBaClient.get_comments_all_sub_comments] playwright_page is None, cannot use browser mode")
# raise Exception(f"[BaiduTieBaClient.pong] Cookies is empty, please login first...") raise Exception("playwright_page is required for browser-based sub-comment fetching")
all_sub_comments: List[TiebaComment] = [] all_sub_comments: List[TiebaComment] = []
for parment_comment in comments: for parment_comment in comments:
if parment_comment.sub_comment_count == 0: if parment_comment.sub_comment_count == 0:
continue continue
current_page = 1 current_page = 1
max_sub_page_num = parment_comment.sub_comment_count // 10 + 1 max_sub_page_num = parment_comment.sub_comment_count // 10 + 1
while max_sub_page_num >= current_page: while max_sub_page_num >= current_page:
params = { # 构造子评论URL
"tid": parment_comment.note_id, # 帖子ID sub_comment_url = (
"pid": parment_comment.comment_id, # 父级评论ID f"{self._host}/p/comment?"
"fid": parment_comment.tieba_id, # 贴吧ID f"tid={parment_comment.note_id}&"
"pn": current_page # 页码 f"pid={parment_comment.comment_id}&"
} f"fid={parment_comment.tieba_id}&"
page_content = await self.get(uri, params=params, return_ori_content=True) f"pn={current_page}"
sub_comments = self._page_extractor.extract_tieba_note_sub_comments(page_content, parent_comment=parment_comment) )
utils.logger.info(f"[BaiduTieBaClient.get_comments_all_sub_comments] 访问子评论页面: {sub_comment_url}")
try:
# 使用Playwright访问子评论页面
await self.playwright_page.goto(sub_comment_url, wait_until="domcontentloaded")
# 等待页面加载,使用配置文件中的延时设置
await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC)
# 获取页面HTML内容
page_content = await self.playwright_page.content()
# 提取子评论
sub_comments = self._page_extractor.extract_tieba_note_sub_comments(
page_content, parent_comment=parment_comment
)
if not sub_comments: if not sub_comments:
utils.logger.info(
f"[BaiduTieBaClient.get_comments_all_sub_comments] "
f"评论{parment_comment.comment_id}{current_page}页没有子评论,停止爬取"
)
break break
if callback: if callback:
await callback(parment_comment.note_id, sub_comments) await callback(parment_comment.note_id, sub_comments)
all_sub_comments.extend(sub_comments) all_sub_comments.extend(sub_comments)
await asyncio.sleep(crawl_interval) await asyncio.sleep(crawl_interval)
current_page += 1 current_page += 1
except Exception as e:
utils.logger.error(
f"[BaiduTieBaClient.get_comments_all_sub_comments] "
f"获取评论{parment_comment.comment_id}{current_page}页子评论失败: {e}"
)
break
utils.logger.info(f"[BaiduTieBaClient.get_comments_all_sub_comments] 共获取 {len(all_sub_comments)} 条子评论")
return all_sub_comments return all_sub_comments
async def get_notes_by_tieba_name(self, tieba_name: str, page_num: int) -> List[TiebaNote]: async def get_notes_by_tieba_name(self, tieba_name: str, page_num: int) -> List[TiebaNote]:
""" """
根据贴吧名称获取帖子列表 根据贴吧名称获取帖子列表 (使用Playwright访问页面,避免API检测)
Args: Args:
tieba_name: 贴吧名称 tieba_name: 贴吧名称
page_num: 分页数量 page_num: 分页页码
Returns: Returns:
List[TiebaNote]: 帖子列表
""" """
uri = f"/f?kw={tieba_name}&pn={page_num}" if not self.playwright_page:
page_content = await self.get(uri, return_ori_content=True) utils.logger.error("[BaiduTieBaClient.get_notes_by_tieba_name] playwright_page is None, cannot use browser mode")
return self._page_extractor.extract_tieba_note_list(page_content) raise Exception("playwright_page is required for browser-based tieba note fetching")
# 构造贴吧帖子列表URL
tieba_url = f"{self._host}/f?kw={quote(tieba_name)}&pn={page_num}"
utils.logger.info(f"[BaiduTieBaClient.get_notes_by_tieba_name] 访问贴吧页面: {tieba_url}")
try:
# 使用Playwright访问贴吧页面
await self.playwright_page.goto(tieba_url, wait_until="domcontentloaded")
# 等待页面加载,使用配置文件中的延时设置
await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC)
# 获取页面HTML内容
page_content = await self.playwright_page.content()
utils.logger.info(f"[BaiduTieBaClient.get_notes_by_tieba_name] 成功获取贴吧页面HTML,长度: {len(page_content)}")
# 提取帖子列表
notes = self._page_extractor.extract_tieba_note_list(page_content)
utils.logger.info(f"[BaiduTieBaClient.get_notes_by_tieba_name] 提取到 {len(notes)} 条帖子")
return notes
except Exception as e:
utils.logger.error(f"[BaiduTieBaClient.get_notes_by_tieba_name] 获取贴吧帖子列表失败: {e}")
raise
async def get_creator_info_by_url(self, creator_url: str) -> str: async def get_creator_info_by_url(self, creator_url: str) -> str:
""" """
根据创作者ID获取创作者信息 根据创作者URL获取创作者信息 (使用Playwright访问页面,避免API检测)
Args: Args:
creator_url: 创作者主页URL creator_url: 创作者主页URL
Returns: Returns:
str: 页面HTML内容
""" """
page_content = await self.request(method="GET", url=creator_url, return_ori_content=True) if not self.playwright_page:
utils.logger.error("[BaiduTieBaClient.get_creator_info_by_url] playwright_page is None, cannot use browser mode")
raise Exception("playwright_page is required for browser-based creator info fetching")
utils.logger.info(f"[BaiduTieBaClient.get_creator_info_by_url] 访问创作者主页: {creator_url}")
try:
# 使用Playwright访问创作者主页
await self.playwright_page.goto(creator_url, wait_until="domcontentloaded")
# 等待页面加载,使用配置文件中的延时设置
await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC)
# 获取页面HTML内容
page_content = await self.playwright_page.content()
utils.logger.info(f"[BaiduTieBaClient.get_creator_info_by_url] 成功获取创作者主页HTML,长度: {len(page_content)}")
return page_content return page_content
except Exception as e:
utils.logger.error(f"[BaiduTieBaClient.get_creator_info_by_url] 获取创作者主页失败: {e}")
raise
async def get_notes_by_creator(self, user_name: str, page_number: int) -> Dict: async def get_notes_by_creator(self, user_name: str, page_number: int) -> Dict:
""" """
根据创作者获取创作者的所有帖子 根据创作者获取创作者的帖子 (使用Playwright访问页面,避免API检测)
Args: Args:
user_name: user_name: 创作者用户名
page_number: page_number: 页码
Returns: Returns:
Dict: 包含帖子数据的字典
""" """
uri = f"/home/get/getthread" if not self.playwright_page:
params = { utils.logger.error("[BaiduTieBaClient.get_notes_by_creator] playwright_page is None, cannot use browser mode")
"un": user_name, raise Exception("playwright_page is required for browser-based creator notes fetching")
"pn": page_number,
"id": "utf-8", # 构造创作者帖子列表URL
"_": utils.get_current_timestamp(), creator_url = f"{self._host}/home/get/getthread?un={quote(user_name)}&pn={page_number}&id=utf-8&_={utils.get_current_timestamp()}"
} utils.logger.info(f"[BaiduTieBaClient.get_notes_by_creator] 访问创作者帖子列表: {creator_url}")
return await self.get(uri, params=params)
try:
# 使用Playwright访问创作者帖子列表页面
await self.playwright_page.goto(creator_url, wait_until="domcontentloaded")
# 等待页面加载,使用配置文件中的延时设置
await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC)
# 获取页面内容(这个接口返回JSON)
page_content = await self.playwright_page.content()
# 提取JSON数据(页面会包含<pre>标签或直接是JSON)
try:
# 尝试从页面中提取JSON
json_text = await self.playwright_page.evaluate("() => document.body.innerText")
result = json.loads(json_text)
utils.logger.info(f"[BaiduTieBaClient.get_notes_by_creator] 成功获取创作者帖子数据")
return result
except json.JSONDecodeError as e:
utils.logger.error(f"[BaiduTieBaClient.get_notes_by_creator] JSON解析失败: {e}")
utils.logger.error(f"[BaiduTieBaClient.get_notes_by_creator] 页面内容: {page_content[:500]}")
raise Exception(f"Failed to parse JSON from creator notes page: {e}")
except Exception as e:
utils.logger.error(f"[BaiduTieBaClient.get_notes_by_creator] 获取创作者帖子列表失败: {e}")
raise
async def get_all_notes_by_creator_user_name( async def get_all_notes_by_creator_user_name(
self, self,

View File

@@ -11,7 +11,6 @@
import asyncio import asyncio
import os import os
# import random # Removed as we now use fixed config.CRAWLER_MAX_SLEEP_SEC intervals
from asyncio import Task from asyncio import Task
from typing import Dict, List, Optional, Tuple from typing import Dict, List, Optional, Tuple
@@ -26,7 +25,7 @@ from playwright.async_api import (
import config import config
from base.base_crawler import AbstractCrawler from base.base_crawler import AbstractCrawler
from model.m_baidu_tieba import TiebaCreator, TiebaNote from model.m_baidu_tieba import TiebaCreator, TiebaNote
from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool from proxy.proxy_ip_pool import IpInfoModel, ProxyIpPool, create_ip_pool
from store import tieba as tieba_store from store import tieba as tieba_store
from tools import utils from tools import utils
from tools.cdp_browser import CDPBrowserManager from tools.cdp_browser import CDPBrowserManager
@@ -56,7 +55,7 @@ class TieBaCrawler(AbstractCrawler):
Returns: Returns:
""" """
ip_proxy_pool, httpx_proxy_format = None, None playwright_proxy_format, httpx_proxy_format = None, None
if config.ENABLE_IP_PROXY: if config.ENABLE_IP_PROXY:
utils.logger.info( utils.logger.info(
"[BaiduTieBaCrawler.start] Begin create ip proxy pool ..." "[BaiduTieBaCrawler.start] Begin create ip proxy pool ..."
@@ -65,16 +64,58 @@ class TieBaCrawler(AbstractCrawler):
config.IP_PROXY_POOL_COUNT, enable_validate_ip=True config.IP_PROXY_POOL_COUNT, enable_validate_ip=True
) )
ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy() ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy()
_, httpx_proxy_format = utils.format_proxy_info(ip_proxy_info) playwright_proxy_format, httpx_proxy_format = utils.format_proxy_info(ip_proxy_info)
utils.logger.info( utils.logger.info(
f"[BaiduTieBaCrawler.start] Init default ip proxy, value: {httpx_proxy_format}" f"[BaiduTieBaCrawler.start] Init default ip proxy, value: {httpx_proxy_format}"
) )
# Create a client to interact with the baidutieba website. async with async_playwright() as playwright:
self.tieba_client = BaiduTieBaClient( # 根据配置选择启动模式
ip_pool=ip_proxy_pool, if config.ENABLE_CDP_MODE:
default_ip_proxy=httpx_proxy_format, utils.logger.info("[BaiduTieBaCrawler] 使用CDP模式启动浏览器")
self.browser_context = await self.launch_browser_with_cdp(
playwright,
playwright_proxy_format,
self.user_agent,
headless=config.CDP_HEADLESS,
) )
else:
utils.logger.info("[BaiduTieBaCrawler] 使用标准模式启动浏览器")
# Launch a browser context.
chromium = playwright.chromium
self.browser_context = await self.launch_browser(
chromium,
playwright_proxy_format,
self.user_agent,
headless=config.HEADLESS,
)
# 注入反检测脚本 - 针对百度的特殊检测
await self._inject_anti_detection_scripts()
self.context_page = await self.browser_context.new_page()
# 先访问百度首页,再点击贴吧链接,避免触发安全验证
await self._navigate_to_tieba_via_baidu()
# Create a client to interact with the baidutieba website.
self.tieba_client = await self.create_tieba_client(
httpx_proxy_format,
ip_proxy_pool if config.ENABLE_IP_PROXY else None
)
# Check login status and perform login if necessary
if not await self.tieba_client.pong(browser_context=self.browser_context):
login_obj = BaiduTieBaLogin(
login_type=config.LOGIN_TYPE,
login_phone="", # your phone number
browser_context=self.browser_context,
context_page=self.context_page,
cookie_str=config.COOKIES,
)
await login_obj.begin()
await self.tieba_client.update_cookies(browser_context=self.browser_context)
crawler_type_var.set(config.CRAWLER_TYPE) crawler_type_var.set(config.CRAWLER_TYPE)
if config.CRAWLER_TYPE == "search": if config.CRAWLER_TYPE == "search":
# Search for notes and retrieve their comment information. # Search for notes and retrieve their comment information.
@@ -347,6 +388,198 @@ class TieBaCrawler(AbstractCrawler):
f"[WeiboCrawler.get_creators_and_notes] get creator info error, creator_url:{creator_url}" f"[WeiboCrawler.get_creators_and_notes] get creator info error, creator_url:{creator_url}"
) )
async def _navigate_to_tieba_via_baidu(self):
    """
    Enter Tieba through the Baidu home page instead of opening it directly.

    Flow:
        1. Open https://www.baidu.com/ in ``self.context_page``.
        2. Sleep ``config.CRAWLER_MAX_SLEEP_SEC`` seconds.
        3. Look for the Tieba link, trying several selectors in turn.
        4. Click it. When the link has target="_blank" the newly opened
           tab replaces ``self.context_page`` and the old tab is closed.
        5. Sleep again and log the resulting URL.

    When no link is found, or when any step raises, the page is sent
    straight to ``self.index_url``. The stated intent of this detour is to
    avoid triggering Baidu's security verification.
    """
    log = utils.logger
    log.info("[TieBaCrawler] 模拟真实用户访问路径...")

    try:
        # Step 1: open the Baidu home page.
        log.info("[TieBaCrawler] Step 1: 访问百度首页 https://www.baidu.com/")
        await self.context_page.goto("https://www.baidu.com/", wait_until="domcontentloaded")

        # Step 2: pause for the configured delay.
        log.info(f"[TieBaCrawler] Step 2: 等待 {config.CRAWLER_MAX_SLEEP_SEC}秒 模拟用户浏览...")
        await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC)

        # Step 3: find the Tieba link; the first selector that yields an
        # element wins, and a selector that fails is simply skipped.
        log.info("[TieBaCrawler] Step 3: 查找并点击'贴吧'链接...")
        candidate_selectors = (
            'a[href="http://tieba.baidu.com/"]',
            'a[href="https://tieba.baidu.com/"]',
            'a.mnav:has-text("贴吧")',
            'text=贴吧',
        )
        link = None
        for selector in candidate_selectors:
            try:
                link = await self.context_page.wait_for_selector(selector, timeout=5000)
                if link:
                    log.info(f"[TieBaCrawler] 找到贴吧链接 (selector: {selector})")
                    break
            except Exception:
                continue

        # No link on the page: go to the Tieba index directly and stop.
        if not link:
            log.warning("[TieBaCrawler] 未找到贴吧链接,直接访问贴吧首页")
            await self.context_page.goto(self.index_url, wait_until="domcontentloaded")
            return

        # Step 4: click the link, handling same-tab and new-tab navigation.
        log.info("[TieBaCrawler] Step 4: 点击贴吧链接...")
        target_attr = await link.get_attribute("target")
        log.info(f"[TieBaCrawler] 链接target属性: {target_attr}")

        if target_attr != "_blank":
            # Same-tab navigation: wait for it to finish around the click.
            log.info("[TieBaCrawler] 链接在当前标签页跳转...")
            async with self.context_page.expect_navigation(wait_until="domcontentloaded"):
                await link.click()
        else:
            # New-tab navigation: capture the page opened by the click.
            log.info("[TieBaCrawler] 链接会在新标签页打开,等待新页面...")
            async with self.browser_context.expect_page() as popup_info:
                await link.click()

            tieba_page = await popup_info.value
            await tieba_page.wait_for_load_state("domcontentloaded")

            # Drop the Baidu tab and continue on the Tieba tab.
            await self.context_page.close()
            self.context_page = tieba_page
            log.info("[TieBaCrawler] ✅ 已切换到新标签页 (贴吧页面)")

        # Step 5: let the page settle for the configured delay.
        log.info(f"[TieBaCrawler] Step 5: 页面加载完成,等待 {config.CRAWLER_MAX_SLEEP_SEC}秒...")
        await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC)

        current_url = self.context_page.url
        log.info(f"[TieBaCrawler] ✅ 成功通过百度首页进入贴吧! 当前URL: {current_url}")

    except Exception as e:
        # Any failure along the way falls back to a direct visit.
        log.error(f"[TieBaCrawler] 通过百度首页访问贴吧失败: {e}")
        log.info("[TieBaCrawler] 回退:直接访问贴吧首页")
        await self.context_page.goto(self.index_url, wait_until="domcontentloaded")
async def _inject_anti_detection_scripts(self):
    """
    Register an anti-detection init script on ``self.browser_context``.

    The script is handed to ``browser_context.add_init_script`` and
    overrides a handful of browser properties that automation checks
    commonly inspect: ``navigator.webdriver``, ``navigator.chrome``,
    ``navigator.permissions.query``, ``navigator.plugins`` and
    ``navigator.languages``. It also deletes the ChromeDriver ``cdc_``
    window properties.
    """
    utils.logger.info("[TieBaCrawler] Injecting anti-detection scripts...")

    # Lightweight script that only touches the key detection points.
    # It runs inside an IIFE so its locals never collide with page globals.
    anti_detection_js = """
    (() => {
        // Hide navigator.webdriver
        Object.defineProperty(navigator, 'webdriver', {
            get: () => undefined,
            configurable: true
        });

        // Provide window.navigator.chrome when it is missing
        if (!window.navigator.chrome) {
            window.navigator.chrome = {
                runtime: {},
                loadTimes: function() {},
                csi: function() {},
                app: {}
            };
        }

        // Patch the Permissions API. The saved query() must stay bound to
        // the Permissions object, otherwise calling it throws
        // "Illegal invocation" for every non-notification query.
        const permissions = window.navigator.permissions;
        if (permissions && typeof permissions.query === 'function') {
            const originalQuery = permissions.query.bind(permissions);
            permissions.query = (parameters) => (
                parameters && parameters.name === 'notifications' ?
                    Promise.resolve({ state: Notification.permission }) :
                    originalQuery(parameters)
            );
        }

        // Report a non-empty plugins list
        Object.defineProperty(navigator, 'plugins', {
            get: () => [1, 2, 3, 4, 5],
            configurable: true
        });

        // Report languages
        Object.defineProperty(navigator, 'languages', {
            get: () => ['zh-CN', 'zh', 'en'],
            configurable: true
        });

        // Remove ChromeDriver leftovers (window.cdc_*)
        delete window.cdc_adoQpoasnfa76pfcZLmcfl_Array;
        delete window.cdc_adoQpoasnfa76pfcZLmcfl_Promise;
        delete window.cdc_adoQpoasnfa76pfcZLmcfl_Symbol;

        console.log('[Anti-Detection] Scripts injected successfully');
    })();
    """

    await self.browser_context.add_init_script(anti_detection_js)
    utils.logger.info("[TieBaCrawler] Anti-detection scripts injected")
async def create_tieba_client(
    self, httpx_proxy: Optional[str], ip_pool: Optional[ProxyIpPool] = None
) -> BaiduTieBaClient:
    """
    Create a Tieba client that carries the live browser's User-Agent and cookies.

    Args:
        httpx_proxy: Proxy URL used as the client's default proxy, or None.
        ip_pool: Optional proxy IP pool handed to the client.

    Returns:
        A BaiduTieBaClient configured with browser-like request headers,
        the browser context's current cookies, and ``self.context_page``
        as its Playwright page.
    """
    utils.logger.info("[TieBaCrawler.create_tieba_client] Begin create tieba API client...")

    # Read the User-Agent from the running browser so HTTP requests and
    # browser traffic present the same identity.
    user_agent = await self.context_page.evaluate("() => navigator.userAgent")
    utils.logger.info(f"[TieBaCrawler.create_tieba_client] Extracted User-Agent from browser: {user_agent}")

    # Only the header-ready cookie string is needed here.
    cookie_str, _ = utils.convert_cookies(await self.browser_context.cookies())

    browser_like_headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "Accept-Language": "zh-CN,zh;q=0.9",
        # "br" is intentionally not advertised: the client sends these
        # headers through requests, which decodes Brotli bodies only when
        # an optional brotli package is installed. Without it a
        # Brotli-encoded response would come back as undecoded bytes.
        "Accept-Encoding": "gzip, deflate",
        "Connection": "keep-alive",
        "User-Agent": user_agent,  # real browser UA
        "Cookie": cookie_str,
        "Host": "tieba.baidu.com",
        "Referer": "https://tieba.baidu.com/",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "same-origin",
        "Sec-Fetch-User": "?1",
        "Upgrade-Insecure-Requests": "1",
        # NOTE(review): the client-hint values below are hard-coded and may
        # disagree with the User-Agent extracted above (browser version or
        # operating system) — confirm whether they should be derived too.
        "sec-ch-ua": '"Google Chrome";v="141", "Not?A_Brand";v="8", "Chromium";v="141"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"macOS"',
    }

    return BaiduTieBaClient(
        timeout=10,
        ip_pool=ip_pool,
        default_ip_proxy=httpx_proxy,
        headers=browser_like_headers,
        playwright_page=self.context_page,  # page used for browser-based fetching
    )
async def launch_browser( async def launch_browser(
self, self,
chromium: BrowserType, chromium: BrowserType,

View File

@@ -127,23 +127,24 @@ class BrowserLauncher:
"--disable-hang-monitor", "--disable-hang-monitor",
"--disable-prompt-on-repost", "--disable-prompt-on-repost",
"--disable-sync", "--disable-sync",
"--disable-web-security", # 可能有助于某些网站的访问
"--disable-features=VizDisplayCompositor",
"--disable-dev-shm-usage", # 避免共享内存问题 "--disable-dev-shm-usage", # 避免共享内存问题
"--no-sandbox", # 在CDP模式下关闭沙箱 "--no-sandbox", # 在CDP模式下关闭沙箱
# 🔥 关键反检测参数
"--disable-blink-features=AutomationControlled", # 禁用自动化控制标记
"--exclude-switches=enable-automation", # 排除自动化开关
"--disable-infobars", # 禁用信息栏
] ]
# 无头模式 # 无头模式
if headless: if headless:
args.extend([ args.extend([
"--headless", "--headless=new", # 使用新的headless模式
"--disable-gpu", "--disable-gpu",
]) ])
else: else:
# 非无头模式下也保持一些稳定性参数 # 非无头模式的额外参数
args.extend([ args.extend([
"--disable-blink-features=AutomationControlled", "--start-maximized", # 最大化窗口,更像真实用户
"--disable-infobars",
]) ])
# 用户数据目录 # 用户数据目录