fix: xhs creator error

This commit is contained in:
程序员阿江(Relakkes)
2026-04-07 12:54:39 +08:00
parent 21b3f90c7d
commit 699a90f830
4 changed files with 122 additions and 179 deletions

View File

@@ -8,7 +8,7 @@ repos:
# Python file header copyright check # Python file header copyright check
- id: check-file-headers - id: check-file-headers
name: Check Python file headers name: Check Python file headers
entry: python tools/file_header_manager.py --check entry: python3 tools/file_header_manager.py --check
language: system language: system
types: [python] types: [python]
pass_filenames: true pass_filenames: true
@@ -17,7 +17,7 @@ repos:
# Auto-fix Python file headers # Auto-fix Python file headers
- id: add-file-headers - id: add-file-headers
name: Add copyright headers to Python files name: Add copyright headers to Python files
entry: python tools/file_header_manager.py entry: python3 tools/file_header_manager.py
language: system language: system
types: [python] types: [python]
pass_filenames: true pass_filenames: true

View File

@@ -47,7 +47,7 @@ SAVE_LOGIN_STATE = True
# Whether to enable CDP mode - use the user's existing Chrome/Edge browser to crawl, providing better anti-detection capabilities # Whether to enable CDP mode - use the user's existing Chrome/Edge browser to crawl, providing better anti-detection capabilities
# Once enabled, the user's Chrome/Edge browser will be automatically detected and started, and controlled through the CDP protocol. # Once enabled, the user's Chrome/Edge browser will be automatically detected and started, and controlled through the CDP protocol.
# This method uses the real browser environment, including the user's extensions, cookies and settings, greatly reducing the risk of detection. # This method uses the real browser environment, including the user's extensions, cookies and settings, greatly reducing the risk of detection.
ENABLE_CDP_MODE = True ENABLE_CDP_MODE = False
# CDP debug port, used to communicate with the browser # CDP debug port, used to communicate with the browser
# If the port is occupied, the system will automatically try the next available port # If the port is occupied, the system will automatically try the next available port

View File

@@ -20,7 +20,7 @@
import asyncio import asyncio
import json import json
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union
from urllib.parse import urlencode from urllib.parse import quote, urlencode
import httpx import httpx
from playwright.async_api import BrowserContext, Page from playwright.async_api import BrowserContext, Page
@@ -39,7 +39,7 @@ from .exception import DataFetchError, IPBlockError, NoteNotFoundError
from .field import SearchNoteType, SearchSortType from .field import SearchNoteType, SearchSortType
from .help import get_search_id from .help import get_search_id
from .extractor import XiaoHongShuExtractor from .extractor import XiaoHongShuExtractor
from .playwright_sign import sign_with_playwright from .playwright_sign import sign_with_xhshow
class XiaoHongShuClient(AbstractApiClient, ProxyRefreshMixin): class XiaoHongShuClient(AbstractApiClient, ProxyRefreshMixin):
@@ -71,19 +71,16 @@ class XiaoHongShuClient(AbstractApiClient, ProxyRefreshMixin):
self.init_proxy_pool(proxy_ip_pool) self.init_proxy_pool(proxy_ip_pool)
async def _pre_headers(self, url: str, params: Optional[Dict] = None, payload: Optional[Dict] = None) -> Dict: async def _pre_headers(self, url: str, params: Optional[Dict] = None, payload: Optional[Dict] = None) -> Dict:
"""Request header parameter signing (using playwright injection method) """请求头参数签名 (使用 xhshow 纯算法)
Args: Args:
url: Request URL url: 请求 URI path
params: GET request parameters params: GET 请求参数
payload: POST request parameters payload: POST 请求参数
Returns: Returns:
Dict: Signed request header parameters Dict: 签名后的请求头参数
""" """
a1_value = self.cookie_dict.get("a1", "")
# Determine request data, method and URI
if params is not None: if params is not None:
data = params data = params
method = "GET" method = "GET"
@@ -93,12 +90,11 @@ class XiaoHongShuClient(AbstractApiClient, ProxyRefreshMixin):
else: else:
raise ValueError("params or payload is required") raise ValueError("params or payload is required")
# Generate signature using playwright injection method # 使用 xhshow 纯算法生成签名
signs = await sign_with_playwright( signs = sign_with_xhshow(
page=self.playwright_page,
uri=url, uri=url,
data=data, data=data,
a1=a1_value, cookie_str=self.headers.get("Cookie", ""),
method=method, method=method,
) )
@@ -152,6 +148,15 @@ class XiaoHongShuClient(AbstractApiClient, ProxyRefreshMixin):
err_msg = data.get("msg", None) or f"{response.text}" err_msg = data.get("msg", None) or f"{response.text}"
raise DataFetchError(err_msg) raise DataFetchError(err_msg)
@staticmethod
def _build_query_string(params: Dict) -> str:
    """Serialize ``params`` into a URL query string that keeps commas literal.

    Each value is converted with ``str()`` (``None`` becomes an empty
    string) and percent-encoded with ``quote(..., safe=',')``, so every
    reserved character except the comma is escaped. Keys are emitted
    verbatim, without encoding. Pairs are joined with ``&`` in the
    iteration order of ``params``.

    Args:
        params: Mapping of query parameter names to values.

    Returns:
        The encoded query string, without a leading ``?``. An empty
        mapping yields an empty string.
    """
    # Encode values first so the join below stays a plain formatting step.
    encoded_pairs = [
        (name, quote("" if raw is None else str(raw), safe=","))
        for name, raw in params.items()
    ]
    return "&".join(f"{name}={encoded}" for name, encoded in encoded_pairs)
async def get(self, uri: str, params: Optional[Dict] = None) -> Dict: async def get(self, uri: str, params: Optional[Dict] = None) -> Dict:
""" """
GET request, signs request headers GET request, signs request headers
@@ -163,10 +168,15 @@ class XiaoHongShuClient(AbstractApiClient, ProxyRefreshMixin):
""" """
headers = await self._pre_headers(uri, params) headers = await self._pre_headers(uri, params)
# Build URL manually to ensure query string encoding matches the sign string
# (httpx's default params encoding differs from browser/XHS frontend behavior)
if params:
full_url = f"{self._host}{uri}?{self._build_query_string(params)}"
else:
full_url = f"{self._host}{uri}" full_url = f"{self._host}{uri}"
return await self.request( return await self.request(
method="GET", url=full_url, headers=headers, params=params method="GET", url=full_url, headers=headers
) )
async def post(self, uri: str, data: dict, **kwargs) -> Dict: async def post(self, uri: str, data: dict, **kwargs) -> Dict:
@@ -568,6 +578,7 @@ class XiaoHongShuClient(AbstractApiClient, ProxyRefreshMixin):
"num": page_size, "num": page_size,
"cursor": cursor, "cursor": cursor,
"user_id": creator, "user_id": creator,
"image_formats": "jpg,webp,avif",
"xsec_token": xsec_token, "xsec_token": xsec_token,
"xsec_source": xsec_source, "xsec_source": xsec_source,
} }

View File

@@ -16,21 +16,61 @@
# 详细许可条款请参阅项目根目录下的LICENSE文件。 # 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# Generate Xiaohongshu signature by calling window.mnsv2 via Playwright injection # Xiaohongshu signature generation using xhshow pure-algorithm library
#
# 致谢:本签名实现依赖 xhshow 开源库, 由 Cloxl 提供
# 仓库地址: https://github.com/Cloxl/xhshow
# 许可协议: MIT License
import hashlib import hashlib
import json import json
import time import time
from typing import Any, Dict, Optional, Union from typing import Any, Dict, Optional, Union
from urllib.parse import urlparse, quote from urllib.parse import quote
from playwright.async_api import Page from .xhs_sign import get_trace_id
from .xhs_sign import b64_encode, encode_utf8, get_trace_id, mrc
def _patch_xhshow_a3_hash():
    """Monkey-patch xhshow's ``CryptoProcessor.build_payload_array`` a3 hash.

    Background (as described by the original author of this patch): xhshow
    computes ``a3_hash`` for every request as
    ``MD5(extract_api_path(content_string))``, where ``extract_api_path``
    strips both the query string after ``"?"`` and the JSON body after
    ``"{"``. The browser reportedly behaves differently per method:

    - POST: a3 uses ``MD5(api_path)``, i.e. the path with the JSON body
      removed, so the upstream behavior is already correct.
    - GET: a3 uses ``MD5(full URL including query parameters)``, so the
      upstream behavior is wrong because the query string is dropped.

    The patch wraps the upstream method. When ``string_param`` contains no
    ``"{"`` (treated as a GET request) it recomputes the hash from the MD5
    of the complete ``string_param`` and overwrites 16 payload entries
    starting at index 128. When ``string_param`` contains ``"{"`` (treated
    as a POST request) the upstream payload is returned untouched.

    Related issue: https://github.com/Cloxl/xhshow/issues/104

    Side effects:
        Replaces ``CryptoProcessor.build_payload_array`` on the class, so
        every ``CryptoProcessor`` instance in the process is affected.
    """
    from xhshow.core.crypto import CryptoProcessor

    upstream_build = CryptoProcessor.build_payload_array

    def _patched_build(self, hex_parameter, a1_value, app_identifier="xhs-pc-web",
                       string_param="", timestamp=None, sign_state=None):
        payload = upstream_build(
            self, hex_parameter, a1_value, app_identifier,
            string_param, timestamp, sign_state,
        )

        # A "{" marks a JSON body (POST); upstream output is kept as-is.
        if "{" in string_param:
            return payload

        # GET request: hash the complete content string, query included.
        md5_bytes = list(hashlib.md5(string_param.encode("utf-8")).digest())

        # NOTE(review): index 4 is used as the XOR seed and 8..15 as the
        # timestamp bytes, mirroring the layout this patch assumes for the
        # upstream payload -- confirm against the pinned xhshow version.
        seed = payload[4]
        timestamp_bytes = list(payload[8:16])
        a3_hash = self._custom_hash_v2(timestamp_bytes + md5_bytes)

        # Overwrite the 16 entries at 128..143 with the seed-masked hash.
        for offset in range(16):
            payload[128 + offset] = a3_hash[offset] ^ seed
        return payload

    CryptoProcessor.build_payload_array = _patched_build
# 启动时应用 monkey-patch
_patch_xhshow_a3_hash()
def _build_sign_string(uri: str, data: Optional[Union[Dict, str]] = None, method: str = "POST") -> str: def _build_sign_string(uri: str, data: Optional[Union[Dict, str]] = None, method: str = "POST") -> str:
"""Build string to be signed """Build content string to be signed
Args: Args:
uri: API path uri: API path
@@ -38,10 +78,9 @@ def _build_sign_string(uri: str, data: Optional[Union[Dict, str]] = None, method
method: Request method (GET or POST) method: Request method (GET or POST)
Returns: Returns:
String to be signed Content string for signing
""" """
if method.upper() == "POST": if method.upper() == "POST":
# POST request uses JSON format
c = uri c = uri
if data is not None: if data is not None:
if isinstance(data, dict): if isinstance(data, dict):
@@ -50,10 +89,8 @@ def _build_sign_string(uri: str, data: Optional[Union[Dict, str]] = None, method
c += data c += data
return c return c
else: else:
# GET request uses query string format
if not data or (isinstance(data, dict) and len(data) == 0): if not data or (isinstance(data, dict) and len(data) == 0):
return uri return uri
if isinstance(data, dict): if isinstance(data, dict):
params = [] params = []
for key in data.keys(): for key in data.keys():
@@ -64,9 +101,8 @@ def _build_sign_string(uri: str, data: Optional[Union[Dict, str]] = None, method
value_str = str(value) value_str = str(value)
else: else:
value_str = "" value_str = ""
# Use URL encoding (safe parameter preserves certain characters from encoding) # URL encoding: preserve commas to match browser behavior
# Note: httpx will encode commas, equals signs, etc., we need to handle the same way value_str = quote(value_str, safe=',')
value_str = quote(value_str, safe='')
params.append(f"{key}={value_str}") params.append(f"{key}={value_str}")
return f"{uri}?{'&'.join(params)}" return f"{uri}?{'&'.join(params)}"
elif isinstance(data, str): elif isinstance(data, str):
@@ -74,171 +110,67 @@ def _build_sign_string(uri: str, data: Optional[Union[Dict, str]] = None, method
return uri return uri
def _md5_hex(s: str) -> str: def sign_with_xhshow(
"""Calculate MD5 hash value"""
return hashlib.md5(s.encode("utf-8")).hexdigest()
def _build_xs_payload(x3_value: str, data_type: str = "object") -> str:
"""Build x-s signature"""
s = {
"x0": "4.2.1",
"x1": "xhs-pc-web",
"x2": "Mac OS",
"x3": x3_value,
"x4": data_type,
}
return "XYS_" + b64_encode(encode_utf8(json.dumps(s, separators=(",", ":"))))
def _build_xs_common(a1: str, b1: str, x_s: str, x_t: str) -> str:
"""Build x-s-common request header"""
payload = {
"s0": 3,
"s1": "",
"x0": "1",
"x1": "4.2.2",
"x2": "Mac OS",
"x3": "xhs-pc-web",
"x4": "4.74.0",
"x5": a1,
"x6": x_t,
"x7": x_s,
"x8": b1,
"x9": mrc(x_t + x_s + b1),
"x10": 154,
"x11": "normal",
}
return b64_encode(encode_utf8(json.dumps(payload, separators=(",", ":"))))
async def get_b1_from_localstorage(page: Page) -> str:
"""Get b1 value from localStorage"""
try:
local_storage = await page.evaluate("() => window.localStorage")
return local_storage.get("b1", "")
except Exception:
return ""
async def call_mnsv2(page: Page, sign_str: str, md5_str: str) -> str:
"""
Call window.mnsv2 function via playwright
Args:
page: playwright Page object
sign_str: String to be signed (uri + JSON.stringify(data))
md5_str: MD5 hash value of sign_str
Returns:
Signature string returned by mnsv2
"""
sign_str_escaped = sign_str.replace("\\", "\\\\").replace("'", "\\'").replace("\n", "\\n")
md5_str_escaped = md5_str.replace("\\", "\\\\").replace("'", "\\'")
try:
result = await page.evaluate(f"window.mnsv2('{sign_str_escaped}', '{md5_str_escaped}')")
return result if result else ""
except Exception:
return ""
async def sign_xs_with_playwright(
page: Page,
uri: str, uri: str,
data: Optional[Union[Dict, str]] = None, data: Optional[Union[Dict, str]] = None,
method: str = "POST", cookie_str: str = "",
) -> str:
"""
Generate x-s signature via playwright injection
Args:
page: playwright Page object (must have Xiaohongshu page open)
uri: API path, e.g., "/api/sns/web/v1/search/notes"
data: Request data (GET params or POST payload)
method: Request method (GET or POST)
Returns:
x-s signature string
"""
sign_str = _build_sign_string(uri, data, method)
md5_str = _md5_hex(sign_str)
x3_value = await call_mnsv2(page, sign_str, md5_str)
data_type = "object" if isinstance(data, (dict, list)) else "string"
return _build_xs_payload(x3_value, data_type)
async def sign_with_playwright(
page: Page,
uri: str,
data: Optional[Union[Dict, str]] = None,
a1: str = "",
method: str = "POST", method: str = "POST",
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Generate complete signature request headers via playwright 使用 xhshow 纯算法生成完整签名请求头
Args: Args:
page: playwright Page object (must have Xiaohongshu page open)
uri: API path uri: API path
data: Request data data: Request data (GET params dict or POST payload dict)
a1: a1 value from cookie cookie_str: Cookie string
method: Request method (GET or POST) method: Request method (GET or POST)
Returns: Returns:
Dictionary containing x-s, x-t, x-s-common, x-b3-traceid Dictionary containing x-s, x-t, x-s-common, x-b3-traceid
""" """
b1 = await get_b1_from_localstorage(page) from xhshow import Xhshow
x_s = await sign_xs_with_playwright(page, uri, data, method) xhshow_client = Xhshow()
x_t = str(int(time.time() * 1000))
return { is_post = method.upper() == "POST"
"x-s": x_s,
"x-t": x_t,
"x-s-common": _build_xs_common(a1, b1, x_s, x_t),
"x-b3-traceid": get_trace_id(),
}
if is_post:
async def pre_headers_with_playwright( headers = xhshow_client.sign_headers_post(
page: Page, uri=uri,
url: str, cookies=cookie_str,
cookie_dict: Dict[str, str], payload=data if isinstance(data, dict) else {},
params: Optional[Dict] = None, )
payload: Optional[Dict] = None,
) -> Dict[str, str]:
"""
Generate request header signature using playwright injection method
Can directly replace _pre_headers method in client.py
Args:
page: playwright Page object
url: Request URL
cookie_dict: Cookie dictionary
params: GET request parameters
payload: POST request parameters
Returns:
Signed request header dictionary
"""
a1_value = cookie_dict.get("a1", "")
uri = urlparse(url).path
# Determine request data and method
if params is not None:
data = params
method = "GET"
elif payload is not None:
data = payload
method = "POST"
else: else:
raise ValueError("params or payload is required") # GET 请求: 构建完整的 content_string 用于签名
content_string = _build_sign_string(uri, data, method)
cookie_dict = xhshow_client._parse_cookies(cookie_str)
a1_value = cookie_dict.get("a1", "")
signs = await sign_with_playwright(page, uri, data, a1_value, method) ts = time.time()
d_value = hashlib.md5(content_string.encode("utf-8")).hexdigest()
payload_array = xhshow_client.crypto_processor.build_payload_array(
d_value, a1_value, "xhs-pc-web", content_string, ts
)
xor_result = xhshow_client.crypto_processor.bit_ops.xor_transform_array(payload_array)
config = xhshow_client.config
x3_b64 = xhshow_client.crypto_processor.b64encoder.encode_x3(
xor_result[:config.PAYLOAD_LENGTH]
)
sig_data = config.SIGNATURE_DATA_TEMPLATE.copy()
sig_data["x3"] = config.X3_PREFIX + x3_b64
x_s = config.XYS_PREFIX + xhshow_client.crypto_processor.b64encoder.encode(
json.dumps(sig_data, separators=(",", ":"), ensure_ascii=False)
)
headers = {
"x-s": x_s,
"x-s-common": xhshow_client.sign_xs_common(cookie_dict),
"x-t": str(xhshow_client.get_x_t(ts)),
"x-b3-traceid": xhshow_client.get_b3_trace_id(),
}
return { return {
"X-S": signs["x-s"], "x-s": headers.get("x-s", ""),
"X-T": signs["x-t"], "x-t": headers.get("x-t", ""),
"x-S-Common": signs["x-s-common"], "x-s-common": headers.get("x-s-common", ""),
"X-B3-Traceid": signs["x-b3-traceid"], "x-b3-traceid": headers.get("x-b3-traceid", get_trace_id()),
} }