Files
程序员阿江(Relakkes) d614ccf247 docs: translate comments and metadata to English
Update Chinese comments, variable descriptions, and metadata across
multiple configuration and core files to English. This improves
codebase accessibility for international developers. Additionally,
removed the sponsorship section from README files.
2026-02-12 05:30:11 +08:00

685 lines
24 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (c) 2025 relakkes@gmail.com
#
# This file is part of MediaCrawler project.
# Repository: https://github.com/NanmiCoder/MediaCrawler/blob/main/media_platform/xhs/client.py
# GitHub: https://github.com/NanmiCoder
# Licensed under NON-COMMERCIAL LEARNING LICENSE 1.1
#
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
import asyncio
import json
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union
from urllib.parse import urlencode
import httpx
from playwright.async_api import BrowserContext, Page
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_not_exception_type
import config
from base.base_crawler import AbstractApiClient
from proxy.proxy_mixin import ProxyRefreshMixin
from tools import utils
if TYPE_CHECKING:
from proxy.proxy_ip_pool import ProxyIpPool
from .exception import DataFetchError, IPBlockError, NoteNotFoundError
from .field import SearchNoteType, SearchSortType
from .help import get_search_id
from .extractor import XiaoHongShuExtractor
from .playwright_sign import sign_with_playwright
class XiaoHongShuClient(AbstractApiClient, ProxyRefreshMixin):
def __init__(
    self,
    timeout=60,  # Long Xiaohongshu videos need a generous timeout when media crawling is enabled
    proxy=None,
    *,
    headers: Dict[str, str],
    playwright_page: Page,
    cookie_dict: Dict[str, str],
    proxy_ip_pool: Optional["ProxyIpPool"] = None,
):
    """Create an API client bound to a signed-in playwright page.

    Args:
        timeout: Per-request timeout in seconds.
        proxy: Proxy URL handed to httpx, if any.
        headers: Base request headers (including the login Cookie).
        playwright_page: Page used to compute request signatures.
        cookie_dict: Parsed cookies; "a1" is read during signing.
        proxy_ip_pool: Optional pool consumed by ProxyRefreshMixin.
    """
    # API endpoints
    self._host = "https://edith.xiaohongshu.com"
    self._domain = "https://www.xiaohongshu.com"
    # Error codes / messages the API is known to return
    self.IP_ERROR_CODE = 300012
    self.IP_ERROR_STR = "Network connection error, please check network settings or restart"
    self.NOTE_NOT_FOUND_CODE = -510000
    self.NOTE_ABNORMAL_CODE = -510001
    self.NOTE_ABNORMAL_STR = "Note status abnormal, please check later"
    # Request plumbing
    self.proxy = proxy
    self.timeout = timeout
    self.headers = headers
    self.playwright_page = playwright_page
    self.cookie_dict = cookie_dict
    self._extractor = XiaoHongShuExtractor()
    # Initialize proxy pool (behavior provided by ProxyRefreshMixin)
    self.init_proxy_pool(proxy_ip_pool)
async def _pre_headers(self, url: str, params: Optional[Dict] = None, payload: Optional[Dict] = None) -> Dict:
    """Sign a request and merge the signature headers into self.headers.

    Signing is delegated to JS injected into the playwright page.

    Args:
        url: Request URI being signed.
        params: GET query parameters (mutually exclusive with payload).
        payload: POST body (mutually exclusive with params).

    Returns:
        Dict: The client headers, now carrying X-S / X-T / x-S-Common /
        X-B3-Traceid signature fields.

    Raises:
        ValueError: If neither params nor payload was supplied.
    """
    # Pick the request data and HTTP method from whichever argument was given
    if params is not None:
        data, method = params, "GET"
    elif payload is not None:
        data, method = payload, "POST"
    else:
        raise ValueError("params or payload is required")

    # The "a1" cookie feeds the signature algorithm
    signature = await sign_with_playwright(
        page=self.playwright_page,
        uri=url,
        data=data,
        a1=self.cookie_dict.get("a1", ""),
        method=method,
    )
    self.headers.update(
        {
            "X-S": signature["x-s"],
            "X-T": signature["x-t"],
            "x-S-Common": signature["x-s-common"],
            "X-B3-Traceid": signature["x-b3-traceid"],
        }
    )
    return self.headers
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1), retry=retry_if_not_exception_type(NoteNotFoundError))
async def request(self, method, url, **kwargs) -> Union[str, Any]:
    """
    Wrapper around httpx.request that unwraps the API response envelope.

    Retried up to 3 times (1s apart) for everything except NoteNotFoundError.

    Args:
        method: HTTP method.
        url: Absolute request URL.
        **kwargs: Extra httpx options; the special key ``return_response=True``
            makes this return the raw response text instead of the parsed payload.

    Returns:
        The ``data`` field of a successful JSON envelope, or raw text when
        ``return_response`` is set.

    Raises:
        IPBlockError: API code 300012 (IP blocked).
        NoteNotFoundError: API codes -510000 / -510001 (missing/abnormal note).
        DataFetchError: Any other unsuccessful envelope.
        Exception: A CAPTCHA challenge (HTTP 461/471) was returned.
    """
    # Check if proxy is expired before each request
    await self._refresh_proxy_if_expired()
    return_response = kwargs.pop("return_response", False)
    async with httpx.AsyncClient(proxy=self.proxy) as client:
        response = await client.request(method, url, timeout=self.timeout, **kwargs)
        if response.status_code in (471, 461):
            # CAPTCHA challenge. Use .get() so a missing header cannot mask
            # the real failure behind a KeyError.
            verify_type = response.headers.get("Verifytype")
            verify_uuid = response.headers.get("Verifyuuid")
            msg = f"CAPTCHA appeared, request failed, Verifytype: {verify_type}, Verifyuuid: {verify_uuid}, Response: {response}"
            utils.logger.error(msg)
            raise Exception(msg)
        if return_response:
            return response.text
        data: Dict = response.json()
        # .get() throughout: a malformed envelope should surface as
        # DataFetchError with the response text, not as a bare KeyError.
        if data.get("success"):
            return data.get("data", data.get("success", {}))
        elif data.get("code") == self.IP_ERROR_CODE:
            raise IPBlockError(self.IP_ERROR_STR)
        elif data.get("code") in (self.NOTE_NOT_FOUND_CODE, self.NOTE_ABNORMAL_CODE):
            raise NoteNotFoundError(f"Note not found or abnormal, code: {data['code']}")
        else:
            err_msg = data.get("msg", None) or f"{response.text}"
            raise DataFetchError(err_msg)
async def get(self, uri: str, params: Optional[Dict] = None) -> Dict:
    """
    Issue a signed GET request against the API host.

    Args:
        uri: Request route (path under the API host).
        params: Query-string parameters.

    Returns:
        Parsed response payload from ``request``.
    """
    signed_headers = await self._pre_headers(uri, params)
    return await self.request(
        method="GET",
        url=f"{self._host}{uri}",
        headers=signed_headers,
        params=params,
    )
async def post(self, uri: str, data: dict, **kwargs) -> Dict:
    """
    Issue a signed POST request against the API host.

    The body is serialized with compact separators and ensure_ascii=False
    before sending, matching what was handed to the signer.

    Args:
        uri: Request route (path under the API host).
        data: JSON-serializable request body.
        **kwargs: Forwarded to ``request`` (e.g. return_response=True).

    Returns:
        Parsed response payload from ``request``.
    """
    headers = await self._pre_headers(uri, payload=data)
    json_str = json.dumps(data, separators=(",", ":"), ensure_ascii=False)
    # content= is the supported way to send a pre-serialized body; passing a
    # str via data= is deprecated in httpx and emits a DeprecationWarning.
    return await self.request(
        method="POST",
        url=f"{self._host}{uri}",
        content=json_str,
        headers=headers,
        **kwargs,
    )
async def get_note_media(self, url: str) -> Union[bytes, None]:
    """
    Download a media resource (image/video) belonging to a note.

    Args:
        url: Absolute media URL.

    Returns:
        Raw content bytes on success, or None when the download failed.
    """
    # Check if proxy is expired before request
    await self._refresh_proxy_if_expired()
    async with httpx.AsyncClient(proxy=self.proxy) as client:
        try:
            response = await client.request("GET", url, timeout=self.timeout)
            response.raise_for_status()  # non-2xx -> httpx.HTTPStatusError below
            # Compare the numeric status code rather than the optional
            # reason phrase, which servers are free to omit or vary.
            if response.status_code != 200:
                utils.logger.error(
                    f"[XiaoHongShuClient.get_note_media] request {url} err, res:{response.text}"
                )
                return None
            return response.content
        except httpx.HTTPError as exc:
            # Connection errors, protocol errors and non-2xx statuses land here.
            # Log tag fixed: it previously said "get_aweme_media" (copy-paste
            # from the douyin client).
            utils.logger.error(
                f"[XiaoHongShuClient.get_note_media] {exc.__class__.__name__} for {exc.request.url} - {exc}"
            )
            return None
async def query_self(self) -> Optional[Dict]:
    """
    Fetch the logged-in user's own profile, used to probe the login session.

    Returns:
        Optional[Dict]: Parsed JSON body on HTTP 200, otherwise None.
    """
    uri = "/api/sns/web/v1/user/selfinfo"
    signed_headers = await self._pre_headers(uri, params={})
    async with httpx.AsyncClient(proxy=self.proxy) as client:
        response = await client.get(f"{self._host}{uri}", headers=signed_headers)
    if response.status_code != 200:
        return None
    return response.json()
async def pong(self) -> bool:
    """
    Check whether the stored login session is still valid.

    Returns:
        bool: True when the self-info endpoint reports success.
    """
    utils.logger.info("[XiaoHongShuClient.pong] Begin to check login state...")
    logged_in = False
    try:
        info = await self.query_self()
        # A valid session reports success at data.result.success
        if info and info.get("data", {}).get("result", {}).get("success"):
            logged_in = True
    except Exception as e:
        utils.logger.error(
            f"[XiaoHongShuClient.pong] Check login state failed: {e}, and try to login again..."
        )
        logged_in = False
    utils.logger.info(f"[XiaoHongShuClient.pong] Login state result: {logged_in}")
    return logged_in
async def update_cookies(self, browser_context: BrowserContext):
    """
    Refresh the client's Cookie header and cookie dict from the browser.

    Typically called right after a successful login.

    Args:
        browser_context: Playwright browser context holding fresh cookies.
    """
    raw_cookies = await browser_context.cookies()
    cookie_str, cookie_dict = utils.convert_cookies(raw_cookies)
    self.headers["Cookie"] = cookie_str
    self.cookie_dict = cookie_dict
async def get_note_by_keyword(
    self,
    keyword: str,
    search_id: Optional[str] = None,
    page: int = 1,
    page_size: int = 20,
    sort: SearchSortType = SearchSortType.GENERAL,
    note_type: SearchNoteType = SearchNoteType.ALL,
) -> Dict:
    """
    Search notes by keyword.

    Args:
        keyword: Search keyword.
        search_id: Search session ID. Generated fresh per call when omitted.
        page: Page number.
        page_size: Page data length.
        sort: Search result sorting specification.
        note_type: Type of note to search.

    Returns:
        Search result payload.
    """
    uri = "/api/sns/web/v1/search/notes"
    data = {
        "keyword": keyword,
        "page": page,
        "page_size": page_size,
        # Default argument expressions run once at definition time, so the
        # old `search_id=get_search_id()` default reused one ID for every
        # search. Generate it per call instead.
        "search_id": search_id if search_id is not None else get_search_id(),
        "sort": sort.value,
        "note_type": note_type.value,
    }
    return await self.post(uri, data)
async def get_note_by_id(
    self,
    note_id: str,
    xsec_source: str,
    xsec_token: str,
) -> Dict:
    """
    Fetch a note's detail card via the feed API.

    Args:
        note_id: Note ID.
        xsec_source: Channel source; falls back to "pc_search" when empty.
        xsec_token: Token returned in the search result list.

    Returns:
        The note_card dict, or an empty dict when the API returned nothing.
    """
    uri = "/api/sns/web/v1/feed"
    payload = {
        "source_note_id": note_id,
        "image_formats": ["jpg", "webp", "avif"],
        "extra": {"need_body_topic": 1},
        "xsec_source": xsec_source if xsec_source != "" else "pc_search",
        "xsec_token": xsec_token,
    }
    res = await self.post(uri, payload)
    if res and res.get("items"):
        return res["items"][0]["note_card"]
    # When crawling frequently, some notes may have results while others don't
    utils.logger.error(
        f"[XiaoHongShuClient.get_note_by_id] get note id:{note_id} empty and res:{res}"
    )
    return dict()
async def get_note_comments(
    self,
    note_id: str,
    xsec_token: str,
    cursor: str = "",
) -> Dict:
    """
    Fetch one page of first-level comments for a note.

    Args:
        note_id: Note ID.
        xsec_token: Verification token.
        cursor: Pagination cursor; empty string for the first page.

    Returns:
        Comment page payload (has_more / cursor / comments).
    """
    query = {
        "note_id": note_id,
        "cursor": cursor,
        "top_comment_id": "",
        "image_formats": "jpg,webp,avif",
        "xsec_token": xsec_token,
    }
    return await self.get("/api/sns/web/v2/comment/page", query)
async def get_note_sub_comments(
    self,
    note_id: str,
    root_comment_id: str,
    xsec_token: str,
    num: int = 10,
    cursor: str = "",
):
    """
    Fetch one page of sub-comments under a given root comment.

    Args:
        note_id: Note ID the comments belong to.
        root_comment_id: Root (first-level) comment ID.
        xsec_token: Verification token.
        num: Page size.
        cursor: Pagination cursor; empty string for the first page.

    Returns:
        Sub-comment page payload (has_more / cursor / comments).
    """
    query = {
        "note_id": note_id,
        "root_comment_id": root_comment_id,
        "num": str(num),
        "cursor": cursor,
        "image_formats": "jpg,webp,avif",
        "top_comment_id": "",
        "xsec_token": xsec_token,
    }
    return await self.get("/api/sns/web/v2/comment/sub/page", query)
async def get_note_all_comments(
    self,
    note_id: str,
    xsec_token: str,
    crawl_interval: float = 1.0,
    callback: Optional[Callable] = None,
    max_count: int = 10,
) -> List[Dict]:
    """
    Collect first-level comments (plus their sub-comments) for a note.

    Pages through the comment API until it is exhausted or max_count
    first-level comments have been gathered.

    Args:
        note_id: Note ID.
        xsec_token: Verification token.
        crawl_interval: Delay between pages, in seconds.
        callback: Awaited with (note_id, comments) after each page.
        max_count: Cap on first-level comments collected.

    Returns:
        List[Dict]: First-level comments, interleaved with the sub-comments
        fetched for each page.
    """
    collected: List[Dict] = []
    cursor = ""
    has_more = True
    while has_more and len(collected) < max_count:
        page = await self.get_note_comments(
            note_id=note_id, xsec_token=xsec_token, cursor=cursor
        )
        has_more = page.get("has_more", False)
        cursor = page.get("cursor", "")
        if "comments" not in page:
            utils.logger.info(
                f"[XiaoHongShuClient.get_note_all_comments] No 'comments' key found in response: {page}"
            )
            break
        batch = page["comments"]
        # Trim the batch so we never exceed max_count first-level comments
        room = max_count - len(collected)
        if len(batch) > room:
            batch = batch[:room]
        if callback:
            await callback(note_id, batch)
        await asyncio.sleep(crawl_interval)
        collected.extend(batch)
        collected.extend(
            await self.get_comments_all_sub_comments(
                comments=batch,
                xsec_token=xsec_token,
                crawl_interval=crawl_interval,
                callback=callback,
            )
        )
    return collected
async def get_comments_all_sub_comments(
    self,
    comments: List[Dict],
    xsec_token: str,
    crawl_interval: float = 1.0,
    callback: Optional[Callable] = None,
) -> List[Dict]:
    """
    Get all second-level comments under the given first-level comments.

    For every first-level comment that reports ``sub_comment_has_more``, pages
    through the sub-comment API until that thread is exhausted. Disabled
    entirely unless config.ENABLE_GET_SUB_COMMENTS is set.

    Args:
        comments: First-level comment dicts, as returned by the comment API.
        xsec_token: Verification token.
        crawl_interval: Delay between sub-comment pages, in seconds.
        callback: Awaited with (note_id, sub_comments) for each batch found.

    Returns:
        List[Dict]: Sub-comments fetched via pagination. Sub-comments already
        embedded in the input are only delivered via ``callback``, not returned.
    """
    if not config.ENABLE_GET_SUB_COMMENTS:
        utils.logger.info(
            f"[XiaoHongShuCrawler.get_comments_all_sub_comments] Crawling sub_comment mode is not enabled"
        )
        return []
    result = []
    for comment in comments:
        try:
            note_id = comment.get("note_id")
            # Sub-comments already embedded in the first-level payload are
            # reported to the callback but not paginated here
            sub_comments = comment.get("sub_comments")
            if sub_comments and callback:
                await callback(note_id, sub_comments)
            sub_comment_has_more = comment.get("sub_comment_has_more")
            if not sub_comment_has_more:
                continue
            root_comment_id = comment.get("id")
            sub_comment_cursor = comment.get("sub_comment_cursor")
            while sub_comment_has_more:
                try:
                    comments_res = await self.get_note_sub_comments(
                        note_id=note_id,
                        root_comment_id=root_comment_id,
                        xsec_token=xsec_token,
                        num=10,
                        cursor=sub_comment_cursor,
                    )
                    if comments_res is None:
                        utils.logger.info(
                            f"[XiaoHongShuClient.get_comments_all_sub_comments] No response found for note_id: {note_id}"
                        )
                        break
                    sub_comment_has_more = comments_res.get("has_more", False)
                    sub_comment_cursor = comments_res.get("cursor", "")
                    if "comments" not in comments_res:
                        utils.logger.info(
                            f"[XiaoHongShuClient.get_comments_all_sub_comments] No 'comments' key found in response: {comments_res}"
                        )
                        break
                    # NOTE(review): this rebinds the *parameter* name `comments`.
                    # It is harmless — the outer for-loop already holds its own
                    # iterator over the original list — but a distinct name
                    # would be clearer.
                    comments = comments_res["comments"]
                    if callback:
                        await callback(note_id, comments)
                    await asyncio.sleep(crawl_interval)
                    result.extend(comments)
                except DataFetchError as e:
                    utils.logger.warning(
                        f"[XiaoHongShuClient.get_comments_all_sub_comments] Failed to get sub-comments for note_id: {note_id}, root_comment_id: {root_comment_id}, error: {e}. Skipping this comment's sub-comments."
                    )
                    break  # Break out of the sub-comment acquisition loop of the current comment and continue processing the next comment
                except Exception as e:
                    utils.logger.error(
                        f"[XiaoHongShuClient.get_comments_all_sub_comments] Unexpected error when getting sub-comments for note_id: {note_id}, root_comment_id: {root_comment_id}, error: {e}"
                    )
                    break
        except Exception as e:
            utils.logger.error(
                f"[XiaoHongShuClient.get_comments_all_sub_comments] Error processing comment: {comment.get('id', 'unknown')}, error: {e}. Continuing with next comment."
            )
            continue  # Continue to next comment
    return result
async def get_creator_info(
    self, user_id: str, xsec_token: str = "", xsec_source: str = ""
) -> Dict:
    """
    Scrape a creator's profile brief from their homepage HTML.

    The PC homepage embeds a window.__INITIAL_STATE__ blob which the
    extractor parses.

    Args:
        user_id: User ID.
        xsec_token: Verification token (optional; appended to the URL when
            both xsec values are present).
        xsec_source: Channel source (optional; appended alongside xsec_token).

    Returns:
        Dict: Creator information.
    """
    uri = f"/user/profile/{user_id}"
    # Only attach the xsec query string when both pieces were supplied
    if xsec_token and xsec_source:
        uri += f"?xsec_token={xsec_token}&xsec_source={xsec_source}"
    html_content = await self.request(
        "GET", self._domain + uri, return_response=True, headers=self.headers
    )
    return self._extractor.extract_creator_info_from_html(html_content)
async def get_notes_by_creator(
    self,
    creator: str,
    cursor: str,
    page_size: int = 30,
    xsec_token: str = "",
    xsec_source: str = "pc_feed",
) -> Dict:
    """
    Fetch one page of a creator's published notes.

    Args:
        creator: Creator (user) ID.
        cursor: Last note ID from the previous page; empty for the first page.
        page_size: Page data length.
        xsec_token: Verification token.
        xsec_source: Channel source.

    Returns:
        Page payload (notes / has_more / cursor).
    """
    query = {
        "num": page_size,
        "cursor": cursor,
        "user_id": creator,
        "xsec_token": xsec_token,
        "xsec_source": xsec_source,
    }
    return await self.get("/api/sns/web/v1/user_posted", query)
async def get_all_notes_by_creator(
    self,
    user_id: str,
    crawl_interval: float = 1.0,
    callback: Optional[Callable] = None,
    xsec_token: str = "",
    xsec_source: str = "pc_feed",
) -> List[Dict]:
    """
    Collect every note published by a user, up to the configured cap.

    Pages through the user_posted API until it is exhausted or
    config.CRAWLER_MAX_NOTES_COUNT notes have been gathered.

    Args:
        user_id: Creator (user) ID.
        crawl_interval: Delay between pages, in seconds.
        callback: Awaited with each page's notes after collection.
        xsec_token: Verification token.
        xsec_source: Channel source.

    Returns:
        List[Dict]: All collected notes.
    """
    collected: List[Dict] = []
    cursor = ""
    has_more = True
    while has_more and len(collected) < config.CRAWLER_MAX_NOTES_COUNT:
        page = await self.get_notes_by_creator(
            user_id, cursor, xsec_token=xsec_token, xsec_source=xsec_source
        )
        if not page:
            utils.logger.error(
                f"[XiaoHongShuClient.get_notes_by_creator] The current creator may have been banned by xhs, so they cannot access the data."
            )
            break
        has_more = page.get("has_more", False)
        cursor = page.get("cursor", "")
        if "notes" not in page:
            utils.logger.info(
                f"[XiaoHongShuClient.get_all_notes_by_creator] No 'notes' key found in response: {page}"
            )
            break
        notes = page["notes"]
        utils.logger.info(
            f"[XiaoHongShuClient.get_all_notes_by_creator] got user_id:{user_id} notes len : {len(notes)}"
        )
        # Never exceed the configured global cap
        room = config.CRAWLER_MAX_NOTES_COUNT - len(collected)
        if room <= 0:
            break
        batch = notes[:room]
        if callback:
            await callback(batch)
        collected.extend(batch)
        await asyncio.sleep(crawl_interval)
    utils.logger.info(
        f"[XiaoHongShuClient.get_all_notes_by_creator] Finished getting notes for user {user_id}, total: {len(collected)}"
    )
    return collected
async def get_note_short_url(self, note_id: str) -> str:
    """
    Get a note's short URL.

    Args:
        note_id: Note ID.

    Returns:
        str: Raw response text — ``return_response=True`` makes ``request``
        return ``response.text`` instead of the parsed JSON envelope, so the
        former ``-> Dict`` annotation was wrong.
    """
    uri = f"/api/sns/web/short_url"
    data = {"original_url": f"{self._domain}/discovery/item/{note_id}"}
    return await self.post(uri, data=data, return_response=True)
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
async def get_note_by_id_from_html(
    self,
    note_id: str,
    xsec_source: str,
    xsec_token: str,
    enable_cookie: bool = False,
) -> Optional[Dict]:
    """
    Get note details by parsing the note detail page HTML.

    This endpoint is flaky, so the whole call is retried up to 3 times.
    copy from https://github.com/ReaJason/xhs/blob/eb1c5a0213f6fbb592f0a2897ee552847c69ea2d/xhs/core.py#L217-L259
    thanks for ReaJason

    Args:
        note_id: Note ID.
        xsec_source: Channel source.
        xsec_token: Verification token.
        enable_cookie: When False the Cookie header is stripped so the page
            is fetched anonymously.

    Returns:
        Optional[Dict]: Parsed note detail, or whatever the extractor yields
        when the page cannot be parsed.
    """
    url = (
        "https://www.xiaohongshu.com/explore/"
        + note_id
        + f"?xsec_token={xsec_token}&xsec_source={xsec_source}"
    )
    copy_headers = self.headers.copy()
    if not enable_cookie:
        # pop() instead of del: don't crash with KeyError when the headers
        # carry no Cookie (e.g. cookies were never set for this session)
        copy_headers.pop("Cookie", None)
    html = await self.request(
        method="GET", url=url, return_response=True, headers=copy_headers
    )
    return self._extractor.extract_note_detail_from_html(note_id, html)