diff --git a/cmd_arg/arg.py b/cmd_arg/arg.py index cbbcc03..86199db 100644 --- a/cmd_arg/arg.py +++ b/cmd_arg/arg.py @@ -22,6 +22,7 @@ from __future__ import annotations import sys +import re from enum import Enum from types import SimpleNamespace from typing import Iterable, Optional, Sequence, Type, TypeVar @@ -135,6 +136,21 @@ def _inject_init_db_default(args: Sequence[str]) -> list[str]: return normalized +def _normalize_tieba_note_id(value: str) -> str: + """Accept a raw Tieba thread id or a /p/ URL.""" + value = value.strip() + match = re.search(r"/p/(\d+)", value) + return match.group(1) if match else value + + +def _normalize_tieba_creator_url(value: str) -> str: + """Accept a Tieba creator homepage URL or a portrait id.""" + value = value.strip() + if value.startswith("http://") or value.startswith("https://"): + return value + return f"https://tieba.baidu.com/home/main?id={value}" + + async def parse_cmd(argv: Optional[Sequence[str]] = None): """Parse command line arguments using Typer.""" @@ -344,6 +360,10 @@ async def parse_cmd(argv: Optional[Sequence[str]] = None): config.WEIBO_SPECIFIED_ID_LIST = specified_id_list elif platform == PlatformEnum.KUAISHOU: config.KS_SPECIFIED_ID_LIST = specified_id_list + elif platform == PlatformEnum.TIEBA: + config.TIEBA_SPECIFIED_ID_LIST = [ + _normalize_tieba_note_id(item) for item in specified_id_list + ] if creator_id_list: if platform == PlatformEnum.XHS: @@ -356,6 +376,10 @@ async def parse_cmd(argv: Optional[Sequence[str]] = None): config.WEIBO_CREATOR_ID_LIST = creator_id_list elif platform == PlatformEnum.KUAISHOU: config.KS_CREATOR_ID_LIST = creator_id_list + elif platform == PlatformEnum.TIEBA: + config.TIEBA_CREATOR_URL_LIST = [ + _normalize_tieba_creator_url(item) for item in creator_id_list + ] return SimpleNamespace( platform=config.PLATFORM, diff --git a/media_platform/tieba/client.py b/media_platform/tieba/client.py index 5315294..e7bda82 100644 --- a/media_platform/tieba/client.py +++ 
b/media_platform/tieba/client.py @@ -18,9 +18,10 @@ # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 import asyncio +import hashlib import json from typing import Any, Callable, Dict, List, Optional, Union -from urllib.parse import urlencode, quote +from urllib.parse import urlencode, quote, parse_qs, unquote, urlparse import requests from playwright.async_api import BrowserContext, Page @@ -35,6 +36,8 @@ from tools import utils from .field import SearchNoteType, SearchSortType from .help import TieBaExtractor +PC_SIGN_SECRET = "36770b1f34c9bbf2e7d1a99d2b82fa9e" + class BaiduTieBaClient(AbstractApiClient): @@ -58,6 +61,128 @@ class BaiduTieBaClient(AbstractApiClient): self._page_extractor = TieBaExtractor() self.default_ip_proxy = default_ip_proxy self.playwright_page = playwright_page # Playwright page object + self._pc_tbs = "" + + @staticmethod + def _sign_pc_params(params: Dict[str, Any]) -> str: + sign_text = "" + for key in sorted(params): + if key in {"sign", "sig"} or params[key] is None: + continue + sign_text += f"{key}={params[key]}" + sign_text += PC_SIGN_SECRET + return hashlib.md5(sign_text.encode("utf-8")).hexdigest() + + async def _ensure_tieba_origin(self) -> None: + if not self.playwright_page: + raise Exception("playwright_page is required for tieba PC API requests") + if not self.playwright_page.url.startswith(self._host): + await self.playwright_page.goto(self._host, wait_until="domcontentloaded") + + async def _fetch_json_by_browser( + self, + uri: str, + method: str = "GET", + params: Optional[Dict[str, Any]] = None, + data: Optional[Dict[str, Any]] = None, + use_sign: bool = False, + ) -> Dict: + """ + Fetch current Tieba PC JSON APIs from the browser context. + These APIs rely on logged-in browser cookies and Baidu's PC signing + convention, while Python requests can be blocked by local proxy/TLS. 
+ """ + await self._ensure_tieba_origin() + params = {k: v for k, v in (params or {}).items() if v is not None} + data = {k: v for k, v in (data or {}).items() if v is not None} + if use_sign: + sign_source = data if method.upper() == "POST" else params + sign_source.setdefault("subapp_type", "pc") + sign_source.setdefault("_client_type", "20") + sign_source["sign"] = self._sign_pc_params(sign_source) + + url = f"{self._host}{uri}" + if params: + url = f"{url}?{urlencode(params)}" + body = urlencode(data) if data else "" + response = await self.playwright_page.evaluate( + """async ({ url, method, body }) => { + const headers = { "Accept": "application/json, text/plain, */*" }; + const options = { method, credentials: "include", headers }; + if (method === "POST") { + headers["Content-Type"] = "application/x-www-form-urlencoded;charset=UTF-8"; + options.body = body; + } + const resp = await fetch(url, options); + const text = await resp.text(); + return { status: resp.status, text }; + }""", + {"url": url, "method": method.upper(), "body": body}, + ) + if response["status"] != 200: + raise Exception(f"Tieba PC API failed, status={response['status']}, url={url}") + try: + json_data = json.loads(response["text"]) + except json.JSONDecodeError as exc: + raise Exception(f"Tieba PC API returned non-JSON, url={url}, body={response['text'][:500]}") from exc + error_code = json_data.get("error_code", json_data.get("no", 0)) + if str(error_code) not in {"0", "None"}: + raise Exception(f"Tieba PC API error, url={url}, response={json_data}") + return json_data + + async def _get_pc_tbs(self) -> str: + if self._pc_tbs: + return self._pc_tbs + sync_data = await self._fetch_json_by_browser( + "/c/s/pc/sync", + params={"subapp_type": "pc", "_client_type": "20"}, + use_sign=True, + ) + self._pc_tbs = ( + sync_data.get("data", {}) + .get("anti", {}) + .get("tbs", "") + ) + if not self._pc_tbs: + raise Exception(f"Can not get Tieba tbs from pc sync API: {sync_data}") + return 
self._pc_tbs + + async def _get_pc_page_data(self, note_id: str, page: int = 1) -> Dict: + tbs = await self._get_pc_tbs() + return await self._fetch_json_by_browser( + "/c/f/pb/page_pc", + method="POST", + data={ + "pn": page, + "lz": 0, + "r": 2, + "mark_type": 0, + "back": 0, + "fr": "", + "kz": note_id, + "session_request_times": 1, + "tbs": tbs, + "subapp_type": "pc", + "_client_type": "20", + }, + use_sign=True, + ) + + @staticmethod + def _extract_creator_portrait(creator_url: str) -> str: + creator_url = (creator_url or "").strip() + if not creator_url: + return "" + if not creator_url.startswith(("http://", "https://")): + return creator_url.split("?")[0] + parsed = urlparse(creator_url) + query = parse_qs(parsed.query) + portrait = ( + query.get("id", [""])[0] + or query.get("portrait", [""])[0] + or query.get("un", [""])[0] + ) + return unquote(portrait).split("?")[0] def _sync_request(self, method, url, proxy=None, **kwargs): """ @@ -270,35 +395,29 @@ class BaiduTieBaClient(AbstractApiClient): utils.logger.error("[BaiduTieBaClient.get_notes_by_keyword] playwright_page is None, cannot use browser mode") raise Exception("playwright_page is required for browser-based search") - # Construct search URL - # Example: https://tieba.baidu.com/f/search/res?ie=utf-8&qw=keyword - search_url = f"{self._host}/f/search/res" params = { - "ie": "utf-8", - "qw": keyword, - "rn": page_size, + "rn": max(page_size, 20), + "st": sort.value, + "word": keyword, + "needbrand": 1, + "sug_type": 2, "pn": page, - "sm": sort.value, - "only_thread": note_type.value, + "come_from": "search", + "subapp_type": "pc", + "_client_type": "20", } - - # Concatenate full URL - full_url = f"{search_url}?{urlencode(params)}" - utils.logger.info(f"[BaiduTieBaClient.get_notes_by_keyword] Accessing search page: {full_url}") + utils.logger.info( + f"[BaiduTieBaClient.get_notes_by_keyword] Accessing search API: " + f"{self._host}/mo/q/search/multsearch?{urlencode(params)}" + ) try: - # Use Playwright 
to access search page - await self.playwright_page.goto(full_url, wait_until="domcontentloaded") - - # Wait for page loading, using delay setting from config file - await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC) - - # Get page HTML content - page_content = await self.playwright_page.content() - utils.logger.info(f"[BaiduTieBaClient.get_notes_by_keyword] Successfully retrieved search page HTML, length: {len(page_content)}") - - # Extract search results - notes = self._page_extractor.extract_search_note_list(page_content) + api_data = await self._fetch_json_by_browser( + "/mo/q/search/multsearch", + params=params, + use_sign=True, + ) + notes = self._page_extractor.extract_search_note_list_from_api(api_data)[:page_size] utils.logger.info(f"[BaiduTieBaClient.get_notes_by_keyword] Extracted {len(notes)} posts") return notes @@ -319,23 +438,11 @@ class BaiduTieBaClient(AbstractApiClient): utils.logger.error("[BaiduTieBaClient.get_note_by_id] playwright_page is None, cannot use browser mode") raise Exception("playwright_page is required for browser-based note detail fetching") - # Construct post detail URL - note_url = f"{self._host}/p/{note_id}" - utils.logger.info(f"[BaiduTieBaClient.get_note_by_id] Accessing post detail page: {note_url}") + utils.logger.info(f"[BaiduTieBaClient.get_note_by_id] Accessing post detail API, note_id: {note_id}") try: - # Use Playwright to access post detail page - await self.playwright_page.goto(note_url, wait_until="domcontentloaded") - - # Wait for page loading, using delay setting from config file - await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC) - - # Get page HTML content - page_content = await self.playwright_page.content() - utils.logger.info(f"[BaiduTieBaClient.get_note_by_id] Successfully retrieved post detail HTML, length: {len(page_content)}") - - # Extract post details - note_detail = self._page_extractor.extract_note_detail(page_content) + api_data = await self._get_pc_page_data(note_id=note_id, page=1) + note_detail = 
self._page_extractor.extract_note_detail_from_api(api_data) return note_detail except Exception as e: @@ -367,23 +474,15 @@ class BaiduTieBaClient(AbstractApiClient): current_page = 1 while note_detail.total_replay_page >= current_page and len(result) < max_count: - # Construct comment page URL - comment_url = f"{self._host}/p/{note_detail.note_id}?pn={current_page}" - utils.logger.info(f"[BaiduTieBaClient.get_note_all_comments] Accessing comment page: {comment_url}") + utils.logger.info( + f"[BaiduTieBaClient.get_note_all_comments] Accessing comment API, " + f"note_id: {note_detail.note_id}, page: {current_page}" + ) try: - # Use Playwright to access comment page - await self.playwright_page.goto(comment_url, wait_until="domcontentloaded") - - # Wait for page loading, using delay setting from config file - await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC) - - # Get page HTML content - page_content = await self.playwright_page.content() - - # Extract comments - comments = self._page_extractor.extract_tieba_note_parment_comments( - page_content, note_id=note_detail.note_id + api_data = await self._get_pc_page_data(note_id=note_detail.note_id, page=current_page) + comments = self._page_extractor.extract_tieba_note_parent_comments_from_api( + api_data, note_detail=note_detail ) if not comments: @@ -498,7 +597,7 @@ class BaiduTieBaClient(AbstractApiClient): async def get_notes_by_tieba_name(self, tieba_name: str, page_num: int) -> List[TiebaNote]: """ - Get post list by Tieba name (uses Playwright to access page, avoiding API detection) + Get post list by Tieba name from current PC forum JSON API. 
Args: tieba_name: Tieba name page_num: Page number @@ -510,23 +609,33 @@ class BaiduTieBaClient(AbstractApiClient): utils.logger.error("[BaiduTieBaClient.get_notes_by_tieba_name] playwright_page is None, cannot use browser mode") raise Exception("playwright_page is required for browser-based tieba note fetching") - # Construct Tieba post list URL - tieba_url = f"{self._host}/f?kw={quote(tieba_name)}&pn={page_num}" - utils.logger.info(f"[BaiduTieBaClient.get_notes_by_tieba_name] Accessing Tieba page: {tieba_url}") + page_size = 30 + api_page = page_num // page_size + 1 + tbs = await self._get_pc_tbs() + utils.logger.info( + f"[BaiduTieBaClient.get_notes_by_tieba_name] Accessing Tieba FRS API, " + f"tieba_name: {tieba_name}, page: {api_page}" + ) try: - # Use Playwright to access Tieba page - await self.playwright_page.goto(tieba_url, wait_until="domcontentloaded") - - # Wait for page loading, using delay setting from config file - await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC) - - # Get page HTML content - page_content = await self.playwright_page.content() - utils.logger.info(f"[BaiduTieBaClient.get_notes_by_tieba_name] Successfully retrieved Tieba page HTML, length: {len(page_content)}") - - # Extract post list - notes = self._page_extractor.extract_tieba_note_list(page_content) + api_data = await self._fetch_json_by_browser( + "/c/f/frs/page_pc", + method="POST", + data={ + "kw": quote(tieba_name), + "pn": api_page, + "sort_type": -1, + "is_newfrs": 1, + "is_newfeed": 1, + "rn": page_size, + "rn_need": 10, + "tbs": tbs, + "subapp_type": "pc", + "_client_type": "20", + }, + use_sign=True, + ) + notes = self._page_extractor.extract_tieba_note_list_from_frs_api(api_data)[:page_size] utils.logger.info(f"[BaiduTieBaClient.get_notes_by_tieba_name] Extracted {len(notes)} posts") return notes @@ -534,38 +643,72 @@ class BaiduTieBaClient(AbstractApiClient): utils.logger.error(f"[BaiduTieBaClient.get_notes_by_tieba_name] Failed to get Tieba post list: {e}") raise - 
async def get_creator_info_by_url(self, creator_url: str) -> str: + async def get_creator_info_by_url(self, creator_url: str) -> TiebaCreator: """ - Get creator information by creator URL (uses Playwright to access page, avoiding API detection) + Get creator information by creator URL from current PC JSON API. Args: creator_url: Creator homepage URL Returns: - str: Page HTML content + TiebaCreator: Creator information """ if not self.playwright_page: utils.logger.error("[BaiduTieBaClient.get_creator_info_by_url] playwright_page is None, cannot use browser mode") raise Exception("playwright_page is required for browser-based creator info fetching") - utils.logger.info(f"[BaiduTieBaClient.get_creator_info_by_url] Accessing creator homepage: {creator_url}") + portrait = self._extract_creator_portrait(creator_url) + if not portrait: + raise Exception(f"Can not extract Tieba creator portrait from url: {creator_url}") + + utils.logger.info( + f"[BaiduTieBaClient.get_creator_info_by_url] Accessing creator info API, portrait: {portrait}" + ) try: - # Use Playwright to access creator homepage - await self.playwright_page.goto(creator_url, wait_until="domcontentloaded") - - # Wait for page loading, using delay setting from config file - await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC) - - # Get page HTML content - page_content = await self.playwright_page.content() - utils.logger.info(f"[BaiduTieBaClient.get_creator_info_by_url] Successfully retrieved creator homepage HTML, length: {len(page_content)}") - - return page_content + api_data = await self._fetch_json_by_browser( + "/c/u/pc/homeSidebarRight", + params={ + "portrait": portrait, + "un": "", + "subapp_type": "pc", + "_client_type": "20", + }, + use_sign=True, + ) + return self._page_extractor.extract_creator_info_from_api(api_data) except Exception as e: - utils.logger.error(f"[BaiduTieBaClient.get_creator_info_by_url] Failed to get creator homepage: {e}") + 
utils.logger.error(f"[BaiduTieBaClient.get_creator_info_by_url] Failed to get creator info: {e}") raise + async def get_notes_by_creator_portrait( + self, portrait: str, page_number: int, page_size: int = 20 + ) -> Dict: + """ + Get creator's thread feed by creator portrait from current PC JSON API. + """ + if not self.playwright_page: + utils.logger.error("[BaiduTieBaClient.get_notes_by_creator_portrait] playwright_page is None, cannot use browser mode") + raise Exception("playwright_page is required for browser-based creator notes fetching") + + utils.logger.info( + f"[BaiduTieBaClient.get_notes_by_creator_portrait] Accessing creator feed API, " + f"portrait: {portrait}, page: {page_number}" + ) + return await self._fetch_json_by_browser( + "/c/u/feed/myThread", + params={ + "pn": page_number, + "rn": page_size, + "portrait": portrait, + "type": 1, + "un": "", + "subapp_type": "pc", + "_client_type": "20", + }, + use_sign=True, + ) + async def get_notes_by_creator(self, user_name: str, page_number: int) -> Dict: """ Get creator's posts by creator (uses Playwright to access page, avoiding API detection) @@ -648,12 +791,12 @@ class BaiduTieBaClient(AbstractApiClient): while notes_has_more == 1 and (max_note_count == 0 or total_get_count < max_note_count): notes_res = await self.get_notes_by_creator(user_name, page_number) if not notes_res or notes_res.get("no") != 0: - utils.logger.error(f"[WeiboClient.get_notes_by_creator] got user_name:{user_name} notes failed, notes_res: {notes_res}") + utils.logger.error(f"[TieBaClient.get_notes_by_creator] got user_name:{user_name} notes failed, notes_res: {notes_res}") break notes_data = notes_res.get("data") notes_has_more = notes_data.get("has_more") notes = notes_data["thread_list"] - utils.logger.info(f"[WeiboClient.get_all_notes_by_creator] got user_name:{user_name} notes len : {len(notes)}") + utils.logger.info(f"[TieBaClient.get_all_notes_by_creator] got user_name:{user_name} notes len : {len(notes)}") note_detail_task 
= [self.get_note_by_id(note['thread_id']) for note in notes] notes = await asyncio.gather(*note_detail_task) @@ -664,3 +807,59 @@ class BaiduTieBaClient(AbstractApiClient): page_number += 1 total_get_count += page_per_count return result + + async def get_all_notes_by_creator_url( + self, + creator_url: str, + crawl_interval: float = 1.0, + callback: Optional[Callable] = None, + max_note_count: int = 0, + ) -> List[TiebaNote]: + """ + Get all creator posts by current PC creator feed API. + """ + portrait = self._extract_creator_portrait(creator_url) + if not portrait: + raise Exception(f"Can not extract Tieba creator portrait from url: {creator_url}") + + result: List[TiebaNote] = [] + page_number = 1 + page_size = 20 + + while max_note_count == 0 or len(result) < max_note_count: + notes_res = await self.get_notes_by_creator_portrait( + portrait=portrait, + page_number=page_number, + page_size=page_size, + ) + thread_id_list = self._page_extractor.extract_creator_thread_id_list_from_api(notes_res) + if not thread_id_list: + utils.logger.info( + f"[BaiduTieBaClient.get_all_notes_by_creator_url] " + f"Creator portrait:{portrait} page:{page_number} has no threads" + ) + break + + if max_note_count: + thread_id_list = thread_id_list[: max_note_count - len(result)] + + utils.logger.info( + f"[BaiduTieBaClient.get_all_notes_by_creator_url] " + f"got portrait:{portrait} thread ids len: {len(thread_id_list)}" + ) + note_detail_task = [self.get_note_by_id(thread_id) for thread_id in thread_id_list] + notes = await asyncio.gather(*note_detail_task) + notes = [note for note in notes if note] + if callback and notes: + await callback(notes) + result.extend(notes) + + data = notes_res.get("data", {}) + has_more = int(data.get("has_more") or 0) + if not has_more: + break + + await asyncio.sleep(crawl_interval) + page_number += 1 + + return result diff --git a/media_platform/tieba/core.py b/media_platform/tieba/core.py index 8fbdb0a..940ab9f 100644 --- 
a/media_platform/tieba/core.py +++ b/media_platform/tieba/core.py @@ -213,7 +213,7 @@ class TieBaCrawler(AbstractCrawler): Returns: """ - tieba_limit_count = 50 + tieba_limit_count = 30 if config.CRAWLER_MAX_NOTES_COUNT < tieba_limit_count: config.CRAWLER_MAX_NOTES_COUNT = tieba_limit_count for tieba_name in config.TIEBA_NAME_LIST: @@ -245,7 +245,7 @@ class TieBaCrawler(AbstractCrawler): page_number += tieba_limit_count async def get_specified_notes( - self, note_id_list: List[str] = config.TIEBA_SPECIFIED_ID_LIST + self, note_id_list: Optional[List[str]] = None ): """ Get the information and comments of the specified post @@ -255,6 +255,8 @@ class TieBaCrawler(AbstractCrawler): Returns: """ + if note_id_list is None: + note_id_list = config.TIEBA_SPECIFIED_ID_LIST semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) task_list = [ self.get_note_detail_async_task(note_id=note_id, semaphore=semaphore) @@ -365,18 +367,15 @@ class TieBaCrawler(AbstractCrawler): """ utils.logger.info( - "[WeiboCrawler.get_creators_and_notes] Begin get weibo creators" + "[TieBaCrawler.get_creators_and_notes] Begin get tieba creators" ) for creator_url in config.TIEBA_CREATOR_URL_LIST: - creator_page_html_content = await self.tieba_client.get_creator_info_by_url( + creator_info: TiebaCreator = await self.tieba_client.get_creator_info_by_url( creator_url=creator_url ) - creator_info: TiebaCreator = self._page_extractor.extract_creator_info( - creator_page_html_content - ) if creator_info: utils.logger.info( - f"[WeiboCrawler.get_creators_and_notes] creator info: {creator_info}" + f"[TieBaCrawler.get_creators_and_notes] creator info: {creator_info}" ) if not creator_info: raise Exception("Get creator info error") @@ -385,12 +384,11 @@ class TieBaCrawler(AbstractCrawler): # Get all note information of the creator all_notes_list = ( - await self.tieba_client.get_all_notes_by_creator_user_name( - user_name=creator_info.user_name, + await self.tieba_client.get_all_notes_by_creator_url( + 
creator_url=creator_url, crawl_interval=0, callback=tieba_store.batch_update_tieba_notes, max_note_count=config.CRAWLER_MAX_NOTES_COUNT, - creator_page_html_content=creator_page_html_content, ) ) @@ -398,7 +396,7 @@ class TieBaCrawler(AbstractCrawler): else: utils.logger.error( - f"[WeiboCrawler.get_creators_and_notes] get creator info error, creator_url:{creator_url}" + f"[TieBaCrawler.get_creators_and_notes] get creator info error, creator_url:{creator_url}" ) async def _navigate_to_tieba_via_baidu(self): diff --git a/media_platform/tieba/help.py b/media_platform/tieba/help.py index a91a67d..89c1f5b 100644 --- a/media_platform/tieba/help.py +++ b/media_platform/tieba/help.py @@ -22,8 +22,8 @@ import html import json import re -from typing import Dict, List, Tuple -from urllib.parse import parse_qs, unquote +from typing import Any, Dict, List, Tuple +from urllib.parse import parse_qs, quote, unquote, urljoin from parsel import Selector @@ -39,6 +39,306 @@ class TieBaExtractor: def __init__(self): pass + @staticmethod + def _class_contains(class_name: str) -> str: + return f"contains(concat(' ', normalize-space(@class), ' '), ' {class_name} ')" + + @staticmethod + def _normalize_text(text: str) -> str: + return re.sub(r"\s+", " ", text or "").strip() + + @classmethod + def _selector_text(cls, selector: Selector, xpath: str) -> str: + node = selector.xpath(xpath) + if not node: + return "" + return cls._normalize_text(node[0].xpath("string(.)").get(default="")) + + @staticmethod + def _absolute_url(url: str) -> str: + return urljoin(const.TIEBA_URL, (url or "").strip()) + + @staticmethod + def _extract_note_id_from_url(url: str) -> str: + note_id_match = re.search(r"/p/(\d+)", url or "") + return note_id_match.group(1) if note_id_match else "" + + @staticmethod + def _text_to_int(text: str) -> int: + match = re.search(r"\d+", text or "") + return int(match.group(0)) if match else 0 + + @staticmethod + def _ensure_tieba_suffix(tieba_name: str) -> str: + tieba_name = 
(tieba_name or "").strip() + return tieba_name if not tieba_name or tieba_name.endswith("吧") else f"{tieba_name}吧" + + @classmethod + def _tieba_link_from_name(cls, tieba_name: str) -> str: + if not tieba_name: + return const.TIEBA_URL + return f"{const.TIEBA_URL}/f?kw={quote(tieba_name.removesuffix('吧'))}" + + @classmethod + def _extract_api_content_text(cls, content: Any) -> str: + if isinstance(content, str): + return cls._normalize_text(content) + if not isinstance(content, list): + return "" + text_list: List[str] = [] + for item in content: + if not isinstance(item, dict): + continue + text = item.get("text") or item.get("c") or "" + if text: + text_list.append(str(text)) + return cls._normalize_text("".join(text_list)) + + @staticmethod + def _api_user_map(api_data: Dict) -> Dict[str, Dict]: + return {str(user.get("id")): user for user in api_data.get("user_list", []) if user.get("id")} + + @staticmethod + def _api_user_link(user: Dict) -> str: + portrait = (user or {}).get("portrait", "") + if not portrait: + return "" + return f"{const.TIEBA_URL}/home/main?id={quote(str(portrait))}" + + @staticmethod + def _api_user_avatar(user: Dict) -> str: + image_data = ( + (user or {}) + .get("user_show_info", {}) + .get("feed_head", {}) + .get("image_data", {}) + ) + return image_data.get("img_url") or ( + "https://gss0.bdstatic.com/6LZ1dD3d1sgCo2Kml5_Y_D3/sys/portrait/item/" + f"{user.get('portrait', '')}" + if user and user.get("portrait") + else "" + ) + + def extract_search_note_list_from_api(self, api_data: Dict) -> List[TiebaNote]: + """ + Extract Tieba post list from current PC search JSON API. 
+ """ + result: List[TiebaNote] = [] + cards = api_data.get("data", {}).get("card_list", []) + for card in cards: + if card.get("cardInfo") != "thread" and card.get("cardStyle") != "thread": + continue + item = card.get("data") or {} + note_id = str(item.get("tid") or "") + if not note_id: + continue + user = item.get("user") or {} + tieba_name = self._ensure_tieba_suffix(item.get("forum_name") or "") + tieba_note = TiebaNote( + note_id=note_id, + title=self._normalize_text(item.get("title") or ""), + desc=self._normalize_text(item.get("content") or ""), + note_url=f"{const.TIEBA_URL}/p/{note_id}", + publish_time=utils.get_time_str_from_unix_time( + item.get("time") or item.get("create_time") or 0 + ), + user_link="", + user_nickname=user.get("show_nickname") or user.get("user_name") or "", + user_avatar=user.get("portrait") or user.get("portraith") or "", + tieba_name=tieba_name, + tieba_link=self._tieba_link_from_name(tieba_name), + total_replay_num=item.get("post_num") or 0, + ) + result.append(tieba_note) + return result + + def extract_note_detail_from_api(self, api_data: Dict) -> TiebaNote: + """ + Extract Tieba post detail from current PC page_pc JSON API. 
+ """ + thread = api_data.get("thread") or {} + first_floor = api_data.get("first_floor") or {} + forum = api_data.get("forum") or api_data.get("display_forum") or {} + page = api_data.get("page") or {} + user_map = self._api_user_map(api_data) + author = user_map.get(str(first_floor.get("author_id"))) or {} + note_id = str(thread.get("id") or thread.get("tid") or first_floor.get("tid") or "") + tieba_name = self._ensure_tieba_suffix(forum.get("name") or "") + note = TiebaNote( + note_id=note_id, + title=self._clean_title(thread.get("title") or first_floor.get("title") or "", tieba_name), + desc=self._extract_api_content_text( + first_floor.get("content") + or thread.get("origin_thread_info", {}).get("abstract") + or thread.get("origin_thread_info", {}).get("content") + ), + note_url=f"{const.TIEBA_URL}/p/{note_id}", + publish_time=utils.get_time_str_from_unix_time( + first_floor.get("time") or thread.get("create_time") or 0 + ), + user_link=self._api_user_link(author), + user_nickname=author.get("name_show") or author.get("name") or "", + user_avatar=self._api_user_avatar(author), + tieba_name=tieba_name, + tieba_link=self._tieba_link_from_name(tieba_name), + total_replay_num=thread.get("reply_num") or 0, + total_replay_page=page.get("total_page") or 0, + ip_location=author.get("ip_address") or "", + ) + return note + + def extract_tieba_note_parent_comments_from_api( + self, api_data: Dict, note_detail: TiebaNote + ) -> List[TiebaComment]: + """ + Extract first-level comments from current PC page_pc JSON API. 
+ """ + forum = api_data.get("forum") or api_data.get("display_forum") or {} + tieba_id = str(forum.get("id") or "") + tieba_name = note_detail.tieba_name or self._ensure_tieba_suffix(forum.get("name") or "") + tieba_link = note_detail.tieba_link or self._tieba_link_from_name(tieba_name) + user_map = self._api_user_map(api_data) + result: List[TiebaComment] = [] + for item in api_data.get("post_list", []): + comment_id = str(item.get("id") or "") + if not comment_id: + continue + user = user_map.get(str(item.get("author_id"))) or {} + comment = TiebaComment( + comment_id=comment_id, + sub_comment_count=item.get("sub_post_number") or 0, + content=self._extract_api_content_text(item.get("content")), + note_url=note_detail.note_url, + user_link=self._api_user_link(user), + user_nickname=user.get("name_show") or user.get("name") or "", + user_avatar=self._api_user_avatar(user), + tieba_id=tieba_id, + tieba_name=tieba_name, + tieba_link=tieba_link, + ip_location=user.get("ip_address") or "", + publish_time=utils.get_time_str_from_unix_time(item.get("time") or 0), + note_id=note_detail.note_id, + ) + result.append(comment) + return result + + def extract_creator_info_from_api(self, api_data: Dict) -> TiebaCreator: + """ + Extract Tieba creator information from current PC creator JSON API. 
+ """ + user = api_data.get("data", {}).get("user", {}) + if not user: + raise ValueError(f"Creator API response does not contain user info: {api_data}") + gender_value = user.get("sex", user.get("gender", 0)) + gender = "Unknown" + if gender_value == 1: + gender = "Male" + elif gender_value == 2: + gender = "Female" + + return TiebaCreator( + user_id=str(user.get("id", "")), + user_name=str(user.get("name", "")), + nickname=str(user.get("name_show") or user.get("name") or ""), + avatar=self._api_user_avatar(user), + gender=gender, + ip_location=str(user.get("ip_address", "")), + follows=int(user.get("concern_num") or 0), + fans=int(user.get("fans_num") or 0), + registration_duration=str(user.get("tb_age", "")), + ) + + @staticmethod + def extract_creator_thread_id_list_from_api(api_data: Dict) -> List[str]: + """ + Extract creator thread ids from current PC creator feed JSON API. + """ + thread_ids: List[str] = [] + for item in api_data.get("data", {}).get("list", []): + thread_info = item.get("thread_info") or {} + thread_id = thread_info.get("tid") or thread_info.get("id") + if thread_id: + thread_ids.append(str(thread_id)) + return thread_ids + + def extract_tieba_note_list_from_frs_api(self, api_data: Dict) -> List[TiebaNote]: + """ + Extract Tieba thread ids from current PC forum page JSON API. + + The by-forum command immediately fetches full details for every id, so + this list intentionally carries only stable routing fields. 
+ """ + forum = api_data.get("forum", {}) + tieba_name = self._ensure_tieba_suffix(forum.get("name") or "") + tieba_link = self._tieba_link_from_name(tieba_name) + tids = [ + tid.strip() + for tid in str(forum.get("tids") or "").split(",") + if tid.strip() + ] + return [ + TiebaNote( + note_id=tid, + title="", + desc="", + note_url=f"{const.TIEBA_URL}/p/{tid}", + tieba_name=tieba_name, + tieba_link=tieba_link, + ) + for tid in tids + ] + + @staticmethod + def _decode_js_string(value: str) -> str: + if not value or value == "null": + return "" + try: + decoded_value = json.loads(f'"{value}"') + return decoded_value if isinstance(decoded_value, str) else str(decoded_value) + except Exception: + return value + + @classmethod + def _extract_forum_info(cls, selector: Selector, page_content: str) -> Tuple[str, str]: + forum_xpath = f"//a[{cls._class_contains('card_title_fname')}]" + forum_link_selector = selector.xpath(forum_xpath) + tieba_name = cls._selector_text(selector, forum_xpath) + tieba_link = cls._absolute_url(forum_link_selector.xpath("./@href").get(default="")) + + if not tieba_name: + patterns = [ + r"PageData\.forum\s*=\s*\{.*?['\"]name['\"]\s*:\s*\"([^\"\\\\]*(?:\\\\.[^\"\\\\]*)*)\"", + r'"forum_name"\s*:\s*"([^"\\\\]*(?:\\\\.[^"\\\\]*)*)"', + r'"kw"\s*:\s*"([^"\\\\]*(?:\\\\.[^"\\\\]*)*)"', + ] + for pattern in patterns: + match = re.search(pattern, page_content, re.S) + if match: + tieba_name = cls._decode_js_string(match.group(1)) + if tieba_name: + break + + if not tieba_name: + title = selector.xpath("//title/text()").get(default="") + match = re.search(r"(.+?)吧[-_]", title) + if match: + tieba_name = cls._normalize_text(match.group(1)) + + if not tieba_link and tieba_name: + tieba_link = f"{const.TIEBA_URL}/f?kw={quote(tieba_name.removesuffix('吧'))}" + + return tieba_name, tieba_link or const.TIEBA_URL + + @classmethod + def _clean_title(cls, title: str, tieba_name: str = "") -> str: + title = cls._normalize_text(title) + title = 
@staticmethod
def extract_search_note_list(page_content: str) -> List[TiebaNote]:
    """
    Parse a Tieba keyword-search result page into post objects.

    The legacy ``s_post`` layout is tried first. When it yields nothing, the
    ``threadcardclass`` card layout is parsed instead, de-duplicating cards
    by thread id.

    Args:
        page_content: HTML string of the search result page

    Returns:
        List of Tieba post objects
    """
    helper = TieBaExtractor()
    document = Selector(text=page_content)
    notes: List[TiebaNote] = []

    # ---- Legacy layout: one <div class="... s_post ..."> per result ----
    legacy_title_xpath = ".//*[contains(@class, 'p_title')]//a[1]"
    legacy_author_xpath = ".//a[contains(@href, '/home/main')][1]"
    legacy_forum_xpath = f".//a[{helper._class_contains('p_forum')}][1]"
    legacy_desc_xpath = f".//div[{helper._class_contains('p_content')}]"
    legacy_date_xpath = ".//*[contains(@class, 'p_date')][1]"

    for legacy_post in document.xpath(f"//div[{helper._class_contains('s_post')}]"):
        title_anchor = legacy_post.xpath(legacy_title_xpath)
        post_url = helper._absolute_url(title_anchor.xpath("./@href").get(default=""))
        # Prefer the explicit data-tid attribute; otherwise derive the id from the URL.
        post_id = title_anchor.xpath("./@data-tid").get(default="").strip()
        if not post_id:
            post_id = helper._extract_note_id_from_url(post_url)
        author_anchor = legacy_post.xpath(legacy_author_xpath)
        forum_anchor = legacy_post.xpath(legacy_forum_xpath)

        heading = helper._selector_text(legacy_post, legacy_title_xpath)
        summary = helper._selector_text(legacy_post, legacy_desc_xpath)
        author = helper._selector_text(legacy_post, legacy_author_xpath)
        author_url = helper._absolute_url(author_anchor.xpath("./@href").get(default=""))
        forum_name = helper._selector_text(legacy_post, legacy_forum_xpath)
        forum_url = helper._absolute_url(forum_anchor.xpath("./@href").get(default=""))
        posted_at = helper._selector_text(legacy_post, legacy_date_xpath)

        notes.append(
            TiebaNote(
                note_id=post_id,
                title=heading,
                desc=summary,
                note_url=post_url,
                user_nickname=author,
                user_link=author_url,
                tieba_name=forum_name,
                tieba_link=forum_url,
                publish_time=posted_at,
            )
        )
    if notes:
        return notes

    # ---- Card layout ----
    # Tieba search moved to a PC feed/card layout in 2026: the s_post nodes
    # are gone and every result sits in a threadcardclass card whose overlay
    # links point at /p/<thread id>.
    card_xpath = (
        "//*[contains(concat(' ', normalize-space(@class), ' '), ' threadcardclass ') "
        "and .//a[contains(@href, '/p/')]]"
    )
    thread_link_xpath = (
        f".//a[{helper._class_contains('action-link-bg')} and contains(@href, '/p/')][1]"
        "|.//a[contains(@href, '/p/')][1]"
    )

    emitted_ids = set()
    for card in document.xpath(card_xpath):
        thread_anchor = card.xpath(thread_link_xpath)
        post_url = helper._absolute_url(thread_anchor.xpath("./@href").get(default=""))
        post_id = helper._extract_note_id_from_url(post_url)
        if not post_id or post_id in emitted_ids:
            continue
        emitted_ids.add(post_id)

        forum_name = helper._selector_text(
            card, f".//*[{helper._class_contains('forum-name-text')}][1]"
        )
        # Forum link: explicit anchor first, then a URL built from the name,
        # then the site root.
        forum_href = card.xpath(".//a[contains(@href, '/f?')][1]/@href").get(default="")
        if forum_href:
            forum_url = helper._absolute_url(forum_href)
        elif forum_name:
            forum_url = f"{const.TIEBA_URL}/f?kw={quote(forum_name.removesuffix('吧'))}"
        else:
            forum_url = const.TIEBA_URL

        header_text = helper._selector_text(
            card, f".//*[{helper._class_contains('top-title')}][1]"
        )
        published = re.search(r"发布于\s*([^\s]+)", header_text)
        posted_at = published.group(1) if published else ""

        heading = helper._selector_text(
            card, f".//*[{helper._class_contains('title-wrap')}][1]"
        )
        summary = helper._selector_text(
            card, f".//*[{helper._class_contains('abstract-wrap')}][1]"
        )
        if not heading:
            # No title node: fall back to the first 80 characters of the abstract.
            heading = helper._normalize_text(summary[:80])

        author = helper._selector_text(
            card, f".//*[{helper._class_contains('forum-attention')}][1]"
        )
        if not author and posted_at:
            # Use whatever precedes the publish marker in the header text.
            author = helper._normalize_text(header_text.split("发布于", 1)[0])

        reply_text = helper._selector_text(
            card, f".//a[{helper._class_contains('comment-link-zone')}][1]"
        )
        notes.append(
            TiebaNote(
                note_id=post_id,
                title=heading,
                desc=summary,
                note_url=f"{const.TIEBA_URL}/p/{post_id}",
                user_nickname=author,
                user_link="",
                tieba_name=forum_name,
                tieba_link=forum_url,
                publish_time=posted_at,
                total_replay_num=helper._text_to_int(reply_text),
            )
        )
    return notes