通过测试 search 模式，修复部分运行时的 bug，并对能够爬取媒体的平台设置了较长的超时时间

This commit is contained in:
未来可欺
2025-07-30 21:19:56 +08:00
parent a7cc18ec7d
commit 93a1c27fff
5 changed files with 202 additions and 237 deletions

View File

@@ -8,7 +8,6 @@
# 详细许可条款请参阅项目根目录下的LICENSE文件。 # 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com # @Author : relakkes@gmail.com
# @Time : 2023/12/2 18:44 # @Time : 2023/12/2 18:44
@@ -32,9 +31,10 @@ from .help import BilibiliSign
class BilibiliClient(AbstractApiClient): class BilibiliClient(AbstractApiClient):
def __init__( def __init__(
self, self,
timeout=10, timeout=60, # 若开启爬取媒体选项b 站的长视频需要更久的超时时间
proxies=None, proxies=None,
*, *,
headers: Dict[str, str], headers: Dict[str, str],
@@ -50,10 +50,7 @@ class BilibiliClient(AbstractApiClient):
async def request(self, method, url, **kwargs) -> Any: async def request(self, method, url, **kwargs) -> Any:
async with httpx.AsyncClient(proxies=self.proxies) as client: async with httpx.AsyncClient(proxies=self.proxies) as client:
response = await client.request( response = await client.request(method, url, timeout=self.timeout, **kwargs)
method, url, timeout=self.timeout,
**kwargs
)
try: try:
data: Dict = response.json() data: Dict = response.json()
except json.JSONDecodeError: except json.JSONDecodeError:
@@ -111,8 +108,7 @@ class BilibiliClient(AbstractApiClient):
async def post(self, uri: str, data: dict) -> Dict: async def post(self, uri: str, data: dict) -> Dict:
data = await self.pre_request_data(data) data = await self.pre_request_data(data)
json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False) json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
return await self.request(method="POST", url=f"{self._host}{uri}", return await self.request(method="POST", url=f"{self._host}{uri}", data=json_str, headers=self.headers)
data=json_str, headers=self.headers)
async def pong(self) -> bool: async def pong(self) -> bool:
"""get a note to check if login state is ok""" """get a note to check if login state is ok"""
@@ -122,12 +118,10 @@ class BilibiliClient(AbstractApiClient):
check_login_uri = "/x/web-interface/nav" check_login_uri = "/x/web-interface/nav"
response = await self.get(check_login_uri) response = await self.get(check_login_uri)
if response.get("isLogin"): if response.get("isLogin"):
utils.logger.info( utils.logger.info("[BilibiliClient.pong] Use cache login state get web interface successfull!")
"[BilibiliClient.pong] Use cache login state get web interface successfull!")
ping_flag = True ping_flag = True
except Exception as e: except Exception as e:
utils.logger.error( utils.logger.error(f"[BilibiliClient.pong] Pong bilibili failed: {e}, and try to login again...")
f"[BilibiliClient.pong] Pong bilibili failed: {e}, and try to login again...")
ping_flag = False ping_flag = False
return ping_flag return ping_flag
@@ -136,10 +130,15 @@ class BilibiliClient(AbstractApiClient):
self.headers["Cookie"] = cookie_str self.headers["Cookie"] = cookie_str
self.cookie_dict = cookie_dict self.cookie_dict = cookie_dict
async def search_video_by_keyword(self, keyword: str, page: int = 1, page_size: int = 20, async def search_video_by_keyword(
self,
keyword: str,
page: int = 1,
page_size: int = 20,
order: SearchOrderType = SearchOrderType.DEFAULT, order: SearchOrderType = SearchOrderType.DEFAULT,
pubtime_begin_s: int = 0, pubtime_end_s: int = 0) -> Dict: pubtime_begin_s: int = 0,
pubtime_end_s: int = 0,
) -> Dict:
""" """
KuaiShou web search api KuaiShou web search api
:param keyword: 搜索关键词 :param keyword: 搜索关键词
@@ -210,10 +209,11 @@ class BilibiliClient(AbstractApiClient):
else: else:
return response.content return response.content
async def get_video_comments(self, async def get_video_comments(
self,
video_id: str, video_id: str,
order_mode: CommentOrderType = CommentOrderType.DEFAULT, order_mode: CommentOrderType = CommentOrderType.DEFAULT,
next: int = 0 next: int = 0,
) -> Dict: ) -> Dict:
"""get video comments """get video comments
:param video_id: 视频 ID :param video_id: 视频 ID
@@ -222,18 +222,17 @@ class BilibiliClient(AbstractApiClient):
:return: :return:
""" """
uri = "/x/v2/reply/wbi/main" uri = "/x/v2/reply/wbi/main"
post_data = { post_data = {"oid": video_id, "mode": order_mode.value, "type": 1, "ps": 20, "next": next}
"oid": video_id,
"mode": order_mode.value,
"type": 1,
"ps": 20,
"next": next
}
return await self.get(uri, post_data) return await self.get(uri, post_data)
async def get_video_all_comments(self, video_id: str, crawl_interval: float = 1.0, is_fetch_sub_comments=False, async def get_video_all_comments(
self,
video_id: str,
crawl_interval: float = 1.0,
is_fetch_sub_comments=False,
callback: Optional[Callable] = None, callback: Optional[Callable] = None,
max_count: int = 10,): max_count: int = 10,
):
""" """
get video all comments include sub comments get video all comments include sub comments
:param video_id: :param video_id:
@@ -256,15 +255,11 @@ class BilibiliClient(AbstractApiClient):
break # Success break # Success
except DataFetchError as e: except DataFetchError as e:
if attempt < max_retries - 1: if attempt < max_retries - 1:
delay = 5 * (2 ** attempt) + random.uniform(0, 1) delay = 5 * (2**attempt) + random.uniform(0, 1)
utils.logger.warning( utils.logger.warning(f"[BilibiliClient.get_video_all_comments] Retrying video_id {video_id} in {delay:.2f}s... (Attempt {attempt + 1}/{max_retries})")
f"[BilibiliClient.get_video_all_comments] Retrying video_id {video_id} in {delay:.2f}s... (Attempt {attempt + 1}/{max_retries})"
)
await asyncio.sleep(delay) await asyncio.sleep(delay)
else: else:
utils.logger.error( utils.logger.error(f"[BilibiliClient.get_video_all_comments] Max retries reached for video_id: {video_id}. Skipping comments. Error: {e}")
f"[BilibiliClient.get_video_all_comments] Max retries reached for video_id: {video_id}. Skipping comments. Error: {e}"
)
is_end = True is_end = True
break break
if not comments_res: if not comments_res:
@@ -292,10 +287,7 @@ class BilibiliClient(AbstractApiClient):
for comment in comment_list: for comment in comment_list:
comment_id = comment['rpid'] comment_id = comment['rpid']
if (comment.get("rcount", 0) > 0): if (comment.get("rcount", 0) > 0):
{ {await self.get_video_all_level_two_comments(video_id, comment_id, CommentOrderType.DEFAULT, 10, crawl_interval, callback)}
await self.get_video_all_level_two_comments(
video_id, comment_id, CommentOrderType.DEFAULT, 10, crawl_interval, callback)
}
if len(result) + len(comment_list) > max_count: if len(result) + len(comment_list) > max_count:
comment_list = comment_list[:max_count - len(result)] comment_list = comment_list[:max_count - len(result)]
if callback: # 如果有回调函数,就执行回调函数 if callback: # 如果有回调函数,就执行回调函数
@@ -306,7 +298,8 @@ class BilibiliClient(AbstractApiClient):
continue continue
return result return result
async def get_video_all_level_two_comments(self, async def get_video_all_level_two_comments(
self,
video_id: str, video_id: str,
level_one_comment_id: int, level_one_comment_id: int,
order_mode: CommentOrderType, order_mode: CommentOrderType,
@@ -327,8 +320,7 @@ class BilibiliClient(AbstractApiClient):
pn = 1 pn = 1
while True: while True:
result = await self.get_video_level_two_comments( result = await self.get_video_level_two_comments(video_id, level_one_comment_id, pn, ps, order_mode)
video_id, level_one_comment_id, pn, ps, order_mode)
comment_list: List[Dict] = result.get("replies", []) comment_list: List[Dict] = result.get("replies", [])
if callback: # 如果有回调函数,就执行回调函数 if callback: # 如果有回调函数,就执行回调函数
await callback(video_id, comment_list) await callback(video_id, comment_list)
@@ -338,7 +330,8 @@ class BilibiliClient(AbstractApiClient):
pn += 1 pn += 1
async def get_video_level_two_comments(self, async def get_video_level_two_comments(
self,
video_id: str, video_id: str,
level_one_comment_id: int, level_one_comment_id: int,
pn: int, pn: int,
@@ -393,7 +386,8 @@ class BilibiliClient(AbstractApiClient):
} }
return await self.get(uri, post_data) return await self.get(uri, post_data)
async def get_creator_fans(self, async def get_creator_fans(
self,
creator_id: int, creator_id: int,
pn: int, pn: int,
ps: int = 24, ps: int = 24,
@@ -411,11 +405,11 @@ class BilibiliClient(AbstractApiClient):
"pn": pn, "pn": pn,
"ps": ps, "ps": ps,
"gaia_source": "main_web", "gaia_source": "main_web",
} }
return await self.get(uri, post_data) return await self.get(uri, post_data)
async def get_creator_followings(self, async def get_creator_followings(
self,
creator_id: int, creator_id: int,
pn: int, pn: int,
ps: int = 24, ps: int = 24,
@@ -452,9 +446,13 @@ class BilibiliClient(AbstractApiClient):
return await self.get(uri, post_data) return await self.get(uri, post_data)
async def get_creator_all_fans(self, creator_info: Dict, crawl_interval: float = 1.0, async def get_creator_all_fans(
self,
creator_info: Dict,
crawl_interval: float = 1.0,
callback: Optional[Callable] = None, callback: Optional[Callable] = None,
max_count: int = 100) -> List: max_count: int = 100,
) -> List:
""" """
get creator all fans get creator all fans
:param creator_info: :param creator_info:
@@ -482,9 +480,13 @@ class BilibiliClient(AbstractApiClient):
result.extend(fans_list) result.extend(fans_list)
return result return result
async def get_creator_all_followings(self, creator_info: Dict, crawl_interval: float = 1.0, async def get_creator_all_followings(
self,
creator_info: Dict,
crawl_interval: float = 1.0,
callback: Optional[Callable] = None, callback: Optional[Callable] = None,
max_count: int = 100) -> List: max_count: int = 100,
) -> List:
""" """
get creator all followings get creator all followings
:param creator_info: :param creator_info:
@@ -512,9 +514,13 @@ class BilibiliClient(AbstractApiClient):
result.extend(followings_list) result.extend(followings_list)
return result return result
async def get_creator_all_dynamics(self, creator_info: Dict, crawl_interval: float = 1.0, async def get_creator_all_dynamics(
self,
creator_info: Dict,
crawl_interval: float = 1.0,
callback: Optional[Callable] = None, callback: Optional[Callable] = None,
max_count: int = 20) -> List: max_count: int = 20,
) -> List:
""" """
get creator all followings get creator all followings
:param creator_info: :param creator_info:

View File

@@ -26,11 +26,11 @@ from .field import *
from .help import * from .help import *
class DOUYINClient(AbstractApiClient): class DouYinClient(AbstractApiClient):
def __init__( def __init__(
self, self,
timeout=30, timeout=30, # 若开启爬取媒体选项,抖音的短视频需要更久的超时时间
proxies=None, proxies=None,
*, *,
headers: Dict, headers: Dict,
@@ -314,7 +314,7 @@ class DOUYINClient(AbstractApiClient):
async def get_aweme_media(self, url: str) -> Union[bytes, None]: async def get_aweme_media(self, url: str) -> Union[bytes, None]:
async with httpx.AsyncClient(proxies=self.proxies) as client: async with httpx.AsyncClient(proxies=self.proxies) as client:
response = await client.request("GET", url, timeout=self.timeout) response = await client.request("GET", url, timeout=self.timeout, follow_redirects=True)
if not response.reason_phrase == "OK": if not response.reason_phrase == "OK":
utils.logger.error(f"[DouYinCrawler.get_aweme_media] request {url} err, res:{response.text}") utils.logger.error(f"[DouYinCrawler.get_aweme_media] request {url} err, res:{response.text}")
return None return None

View File

@@ -30,7 +30,7 @@ from tools import utils
from tools.cdp_browser import CDPBrowserManager from tools.cdp_browser import CDPBrowserManager
from var import crawler_type_var, source_keyword_var from var import crawler_type_var, source_keyword_var
from .client import DOUYINClient from .client import DouYinClient
from .exception import DataFetchError from .exception import DataFetchError
from .field import PublishTimeType from .field import PublishTimeType
from .login import DouYinLogin from .login import DouYinLogin
@@ -38,7 +38,7 @@ from .login import DouYinLogin
class DouYinCrawler(AbstractCrawler): class DouYinCrawler(AbstractCrawler):
context_page: Page context_page: Page
dy_client: DOUYINClient dy_client: DouYinClient
browser_context: BrowserContext browser_context: BrowserContext
cdp_manager: Optional[CDPBrowserManager] cdp_manager: Optional[CDPBrowserManager]
@@ -233,10 +233,10 @@ class DouYinCrawler(AbstractCrawler):
await douyin_store.update_douyin_aweme(aweme_item=aweme_item) await douyin_store.update_douyin_aweme(aweme_item=aweme_item)
await self.get_aweme_media(aweme_item=aweme_item) await self.get_aweme_media(aweme_item=aweme_item)
async def create_douyin_client(self, httpx_proxy: Optional[str]) -> DOUYINClient: async def create_douyin_client(self, httpx_proxy: Optional[str]) -> DouYinClient:
"""Create douyin client""" """Create douyin client"""
cookie_str, cookie_dict = utils.convert_cookies(await self.browser_context.cookies()) # type: ignore cookie_str, cookie_dict = utils.convert_cookies(await self.browser_context.cookies()) # type: ignore
douyin_client = DOUYINClient( douyin_client = DouYinClient(
proxies=httpx_proxy, proxies=httpx_proxy,
headers={ headers={
"User-Agent": await self.context_page.evaluate("() => navigator.userAgent"), "User-Agent": await self.context_page.evaluate("() => navigator.userAgent"),
@@ -385,10 +385,9 @@ class DouYinCrawler(AbstractCrawler):
if not video_download_url: if not video_download_url:
return return
videoNum = 0 videoNum = 0
for url in video_download_url: content = await self.dy_client.get_aweme_media(video_download_url)
content = await self.xhs_client.get_note_media(url)
if content is None: if content is None:
continue return
extension_file_name = f"{videoNum}.mp4" extension_file_name = f"{videoNum}.mp4"
videoNum += 1 videoNum += 1
await douyin_store.update_dy_aweme_image(aweme_id, content, extension_file_name) await douyin_store.update_dy_aweme_video(aweme_id, content, extension_file_name)

View File

@@ -8,7 +8,6 @@
# 详细许可条款请参阅项目根目录下的LICENSE文件。 # 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com # @Author : relakkes@gmail.com
# @Time : 2023/12/23 15:40 # @Time : 2023/12/23 15:40
@@ -33,9 +32,10 @@ from .field import SearchType
class WeiboClient: class WeiboClient:
def __init__( def __init__(
self, self,
timeout=10, timeout=30, # 若开启爬取媒体选项weibo 的图片需要更久的超时时间
proxies=None, proxies=None,
*, *,
headers: Dict[str, str], headers: Dict[str, str],
@@ -53,10 +53,7 @@ class WeiboClient:
async def request(self, method, url, **kwargs) -> Union[Response, Dict]: async def request(self, method, url, **kwargs) -> Union[Response, Dict]:
enable_return_response = kwargs.pop("return_response", False) enable_return_response = kwargs.pop("return_response", False)
async with httpx.AsyncClient(proxies=self.proxies) as client: async with httpx.AsyncClient(proxies=self.proxies) as client:
response = await client.request( response = await client.request(method, url, timeout=self.timeout, **kwargs)
method, url, timeout=self.timeout,
**kwargs
)
if enable_return_response: if enable_return_response:
return response return response
@@ -84,8 +81,7 @@ class WeiboClient:
async def post(self, uri: str, data: dict) -> Dict: async def post(self, uri: str, data: dict) -> Dict:
json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False) json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
return await self.request(method="POST", url=f"{self._host}{uri}", return await self.request(method="POST", url=f"{self._host}{uri}", data=json_str, headers=self.headers)
data=json_str, headers=self.headers)
async def pong(self) -> bool: async def pong(self) -> bool:
"""get a note to check if login state is ok""" """get a note to check if login state is ok"""
@@ -112,7 +108,7 @@ class WeiboClient:
self, self,
keyword: str, keyword: str,
page: int = 1, page: int = 1,
search_type: SearchType = SearchType.DEFAULT search_type: SearchType = SearchType.DEFAULT,
) -> Dict: ) -> Dict:
""" """
search note by keyword search note by keyword
@@ -187,8 +183,11 @@ class WeiboClient:
return result return result
@staticmethod @staticmethod
async def get_comments_all_sub_comments(note_id: str, comment_list: List[Dict], async def get_comments_all_sub_comments(
callback: Optional[Callable] = None) -> List[Dict]: note_id: str,
comment_list: List[Dict],
callback: Optional[Callable] = None,
) -> List[Dict]:
""" """
获取评论的所有子评论 获取评论的所有子评论
Args: Args:
@@ -200,8 +199,7 @@ class WeiboClient:
""" """
if not config.ENABLE_GET_SUB_COMMENTS: if not config.ENABLE_GET_SUB_COMMENTS:
utils.logger.info( utils.logger.info(f"[WeiboClient.get_comments_all_sub_comments] Crawling sub_comment mode is not enabled")
f"[WeiboClient.get_comments_all_sub_comments] Crawling sub_comment mode is not enabled")
return [] return []
res_sub_comments = [] res_sub_comments = []
@@ -220,9 +218,7 @@ class WeiboClient:
""" """
url = f"{self._host}/detail/{note_id}" url = f"{self._host}/detail/{note_id}"
async with httpx.AsyncClient(proxies=self.proxies) as client: async with httpx.AsyncClient(proxies=self.proxies) as client:
response = await client.request( response = await client.request("GET", url, timeout=self.timeout, headers=self.headers)
"GET", url, timeout=self.timeout, headers=self.headers
)
if response.status_code != 200: if response.status_code != 200:
raise DataFetchError(f"get weibo detail err: {response.text}") raise DataFetchError(f"get weibo detail err: {response.text}")
match = re.search(r'var \$render_data = (\[.*?\])\[0\]', response.text, re.DOTALL) match = re.search(r'var \$render_data = (\[.*?\])\[0\]', response.text, re.DOTALL)
@@ -230,9 +226,7 @@ class WeiboClient:
render_data_json = match.group(1) render_data_json = match.group(1)
render_data_dict = json.loads(render_data_json) render_data_dict = json.loads(render_data_json)
note_detail = render_data_dict[0].get("status") note_detail = render_data_dict[0].get("status")
note_item = { note_item = {"mblog": note_detail}
"mblog": note_detail
}
return note_item return note_item
else: else:
utils.logger.info(f"[WeiboClient.get_note_info_by_id] 未找到$render_data的值") utils.logger.info(f"[WeiboClient.get_note_info_by_id] 未找到$render_data的值")
@@ -251,7 +245,8 @@ class WeiboClient:
image_url += sub_url[i] + "/" image_url += sub_url[i] + "/"
# 微博图床对外存在防盗链,所以需要代理访问 # 微博图床对外存在防盗链,所以需要代理访问
# 由于微博图片是通过 i1.wp.com 来访问的,所以需要拼接一下 # 由于微博图片是通过 i1.wp.com 来访问的,所以需要拼接一下
final_uri = (f"{self._image_agent_host}" f"{image_url}") final_uri = (f"{self._image_agent_host}"
f"{image_url}")
async with httpx.AsyncClient(proxies=self.proxies) as client: async with httpx.AsyncClient(proxies=self.proxies) as client:
response = await client.request("GET", final_uri, timeout=self.timeout) response = await client.request("GET", final_uri, timeout=self.timeout)
if not response.reason_phrase == "OK": if not response.reason_phrase == "OK":
@@ -260,8 +255,6 @@ class WeiboClient:
else: else:
return response.content return response.content
async def get_creator_container_info(self, creator_id: str) -> Dict: async def get_creator_container_info(self, creator_id: str) -> Dict:
""" """
获取用户的容器ID, 容器信息代表着真实请求的API路径 获取用户的容器ID, 容器信息代表着真实请求的API路径
@@ -278,10 +271,7 @@ class WeiboClient:
if not m_weibocn_params: if not m_weibocn_params:
raise DataFetchError("get containerid failed") raise DataFetchError("get containerid failed")
m_weibocn_params_dict = parse_qs(unquote(m_weibocn_params)) m_weibocn_params_dict = parse_qs(unquote(m_weibocn_params))
return { return {"fid_container_id": m_weibocn_params_dict.get("fid", [""])[0], "lfid_container_id": m_weibocn_params_dict.get("lfid", [""])[0]}
"fid_container_id": m_weibocn_params_dict.get("fid", [""])[0],
"lfid_container_id": m_weibocn_params_dict.get("lfid", [""])[0]
}
async def get_creator_info_by_id(self, creator_id: str) -> Dict: async def get_creator_info_by_id(self, creator_id: str) -> Dict:
""" """
@@ -316,7 +306,12 @@ class WeiboClient:
user_res.update(container_info) user_res.update(container_info)
return user_res return user_res
async def get_notes_by_creator(self, creator: str, container_id: str, since_id: str = "0", ) -> Dict: async def get_notes_by_creator(
self,
creator: str,
container_id: str,
since_id: str = "0",
) -> Dict:
""" """
获取博主的笔记 获取博主的笔记
Args: Args:
@@ -337,8 +332,13 @@ class WeiboClient:
} }
return await self.get(uri, params) return await self.get(uri, params)
async def get_all_notes_by_creator_id(self, creator_id: str, container_id: str, crawl_interval: float = 1.0, async def get_all_notes_by_creator_id(
callback: Optional[Callable] = None) -> List[Dict]: self,
creator_id: str,
container_id: str,
crawl_interval: float = 1.0,
callback: Optional[Callable] = None,
) -> List[Dict]:
""" """
获取指定用户下的所有发过的帖子,该方法会一直查找一个用户下的所有帖子信息 获取指定用户下的所有发过的帖子,该方法会一直查找一个用户下的所有帖子信息
Args: Args:
@@ -357,18 +357,15 @@ class WeiboClient:
while notes_has_more: while notes_has_more:
notes_res = await self.get_notes_by_creator(creator_id, container_id, since_id) notes_res = await self.get_notes_by_creator(creator_id, container_id, since_id)
if not notes_res: if not notes_res:
utils.logger.error( utils.logger.error(f"[WeiboClient.get_notes_by_creator] The current creator may have been banned by xhs, so they cannot access the data.")
f"[WeiboClient.get_notes_by_creator] The current creator may have been banned by xhs, so they cannot access the data.")
break break
since_id = notes_res.get("cardlistInfo", {}).get("since_id", "0") since_id = notes_res.get("cardlistInfo", {}).get("since_id", "0")
if "cards" not in notes_res: if "cards" not in notes_res:
utils.logger.info( utils.logger.info(f"[WeiboClient.get_all_notes_by_creator] No 'notes' key found in response: {notes_res}")
f"[WeiboClient.get_all_notes_by_creator] No 'notes' key found in response: {notes_res}")
break break
notes = notes_res["cards"] notes = notes_res["cards"]
utils.logger.info( utils.logger.info(f"[WeiboClient.get_all_notes_by_creator] got user_id:{creator_id} notes len : {len(notes)}")
f"[WeiboClient.get_all_notes_by_creator] got user_id:{creator_id} notes len : {len(notes)}")
notes = [note for note in notes if note.get("card_type") == 9] notes = [note for note in notes if note.get("card_type") == 9]
if callback: if callback:
await callback(notes) await callback(notes)
@@ -377,4 +374,3 @@ class WeiboClient:
crawler_total_count += 10 crawler_total_count += 10
notes_has_more = notes_res.get("cardlistInfo", {}).get("total", 0) > crawler_total_count notes_has_more = notes_res.get("cardlistInfo", {}).get("total", 0) > crawler_total_count
return result return result

View File

@@ -8,7 +8,6 @@
# 详细许可条款请参阅项目根目录下的LICENSE文件。 # 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
import asyncio import asyncio
import json import json
import re import re
@@ -30,9 +29,10 @@ from .help import get_search_id, sign
class XiaoHongShuClient(AbstractApiClient): class XiaoHongShuClient(AbstractApiClient):
def __init__( def __init__(
self, self,
timeout=10, timeout=30, # 若开启爬取媒体选项xhs 的长视频需要更久的超时时间
proxies=None, proxies=None,
*, *,
headers: Dict[str, str], headers: Dict[str, str],
@@ -61,9 +61,7 @@ class XiaoHongShuClient(AbstractApiClient):
Returns: Returns:
""" """
encrypt_params = await self.playwright_page.evaluate( encrypt_params = await self.playwright_page.evaluate("([url, data]) => window._webmsxyw(url,data)", [url, data])
"([url, data]) => window._webmsxyw(url,data)", [url, data]
)
local_storage = await self.playwright_page.evaluate("() => window.localStorage") local_storage = await self.playwright_page.evaluate("() => window.localStorage")
signs = sign( signs = sign(
a1=self.cookie_dict.get("a1", ""), a1=self.cookie_dict.get("a1", ""),
@@ -130,9 +128,7 @@ class XiaoHongShuClient(AbstractApiClient):
if isinstance(params, dict): if isinstance(params, dict):
final_uri = f"{uri}?" f"{urlencode(params)}" final_uri = f"{uri}?" f"{urlencode(params)}"
headers = await self._pre_headers(final_uri) headers = await self._pre_headers(final_uri)
return await self.request( return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=headers)
method="GET", url=f"{self._host}{final_uri}", headers=headers
)
async def post(self, uri: str, data: dict, **kwargs) -> Dict: async def post(self, uri: str, data: dict, **kwargs) -> Dict:
""" """
@@ -158,9 +154,7 @@ class XiaoHongShuClient(AbstractApiClient):
async with httpx.AsyncClient(proxies=self.proxies) as client: async with httpx.AsyncClient(proxies=self.proxies) as client:
response = await client.request("GET", url, timeout=self.timeout) response = await client.request("GET", url, timeout=self.timeout)
if not response.reason_phrase == "OK": if not response.reason_phrase == "OK":
utils.logger.error( utils.logger.error(f"[XiaoHongShuClient.get_note_media] request {url} err, res:{response.text}")
f"[XiaoHongShuClient.get_note_media] request {url} err, res:{response.text}"
)
return None return None
else: else:
return response.content return response.content
@@ -179,9 +173,7 @@ class XiaoHongShuClient(AbstractApiClient):
if note_card.get("items"): if note_card.get("items"):
ping_flag = True ping_flag = True
except Exception as e: except Exception as e:
utils.logger.error( utils.logger.error(f"[XiaoHongShuClient.pong] Ping xhs failed: {e}, and try to login again...")
f"[XiaoHongShuClient.pong] Ping xhs failed: {e}, and try to login again..."
)
ping_flag = False ping_flag = False
return ping_flag return ping_flag
@@ -231,7 +223,10 @@ class XiaoHongShuClient(AbstractApiClient):
return await self.post(uri, data) return await self.post(uri, data)
async def get_note_by_id( async def get_note_by_id(
self, note_id: str, xsec_source: str, xsec_token: str self,
note_id: str,
xsec_source: str,
xsec_token: str,
) -> Dict: ) -> Dict:
""" """
获取笔记详情API 获取笔记详情API
@@ -249,7 +244,9 @@ class XiaoHongShuClient(AbstractApiClient):
data = { data = {
"source_note_id": note_id, "source_note_id": note_id,
"image_formats": ["jpg", "webp", "avif"], "image_formats": ["jpg", "webp", "avif"],
"extra": {"need_body_topic": 1}, "extra": {
"need_body_topic": 1
},
"xsec_source": xsec_source, "xsec_source": xsec_source,
"xsec_token": xsec_token, "xsec_token": xsec_token,
} }
@@ -259,13 +256,14 @@ class XiaoHongShuClient(AbstractApiClient):
res_dict: Dict = res["items"][0]["note_card"] res_dict: Dict = res["items"][0]["note_card"]
return res_dict return res_dict
# 爬取频繁了可能会出现有的笔记能有结果有的没有 # 爬取频繁了可能会出现有的笔记能有结果有的没有
utils.logger.error( utils.logger.error(f"[XiaoHongShuClient.get_note_by_id] get note id:{note_id} empty and res:{res}")
f"[XiaoHongShuClient.get_note_by_id] get note id:{note_id} empty and res:{res}"
)
return dict() return dict()
async def get_note_comments( async def get_note_comments(
self, note_id: str, xsec_token: str, cursor: str = "" self,
note_id: str,
xsec_token: str,
cursor: str = "",
) -> Dict: ) -> Dict:
""" """
获取一级评论的API 获取一级评论的API
@@ -342,19 +340,15 @@ class XiaoHongShuClient(AbstractApiClient):
comments_has_more = True comments_has_more = True
comments_cursor = "" comments_cursor = ""
while comments_has_more and len(result) < max_count: while comments_has_more and len(result) < max_count:
comments_res = await self.get_note_comments( comments_res = await self.get_note_comments(note_id=note_id, xsec_token=xsec_token, cursor=comments_cursor)
note_id=note_id, xsec_token=xsec_token, cursor=comments_cursor
)
comments_has_more = comments_res.get("has_more", False) comments_has_more = comments_res.get("has_more", False)
comments_cursor = comments_res.get("cursor", "") comments_cursor = comments_res.get("cursor", "")
if "comments" not in comments_res: if "comments" not in comments_res:
utils.logger.info( utils.logger.info(f"[XiaoHongShuClient.get_note_all_comments] No 'comments' key found in response: {comments_res}")
f"[XiaoHongShuClient.get_note_all_comments] No 'comments' key found in response: {comments_res}"
)
break break
comments = comments_res["comments"] comments = comments_res["comments"]
if len(result) + len(comments) > max_count: if len(result) + len(comments) > max_count:
comments = comments[: max_count - len(result)] comments = comments[:max_count - len(result)]
if callback: if callback:
await callback(note_id, comments) await callback(note_id, comments)
await asyncio.sleep(crawl_interval) await asyncio.sleep(crawl_interval)
@@ -387,9 +381,7 @@ class XiaoHongShuClient(AbstractApiClient):
""" """
if not config.ENABLE_GET_SUB_COMMENTS: if not config.ENABLE_GET_SUB_COMMENTS:
utils.logger.info( utils.logger.info(f"[XiaoHongShuCrawler.get_comments_all_sub_comments] Crawling sub_comment mode is not enabled")
f"[XiaoHongShuCrawler.get_comments_all_sub_comments] Crawling sub_comment mode is not enabled"
)
return [] return []
result = [] result = []
@@ -416,16 +408,12 @@ class XiaoHongShuClient(AbstractApiClient):
) )
if comments_res is None: if comments_res is None:
utils.logger.info( utils.logger.info(f"[XiaoHongShuClient.get_comments_all_sub_comments] No response found for note_id: {note_id}")
f"[XiaoHongShuClient.get_comments_all_sub_comments] No response found for note_id: {note_id}"
)
continue continue
sub_comment_has_more = comments_res.get("has_more", False) sub_comment_has_more = comments_res.get("has_more", False)
sub_comment_cursor = comments_res.get("cursor", "") sub_comment_cursor = comments_res.get("cursor", "")
if "comments" not in comments_res: if "comments" not in comments_res:
utils.logger.info( utils.logger.info(f"[XiaoHongShuClient.get_comments_all_sub_comments] No 'comments' key found in response: {comments_res}")
f"[XiaoHongShuClient.get_comments_all_sub_comments] No 'comments' key found in response: {comments_res}"
)
break break
comments = comments_res["comments"] comments = comments_res["comments"]
if callback: if callback:
@@ -441,12 +429,8 @@ class XiaoHongShuClient(AbstractApiClient):
eg: https://www.xiaohongshu.com/user/profile/59d8cb33de5fb4696bf17217 eg: https://www.xiaohongshu.com/user/profile/59d8cb33de5fb4696bf17217
""" """
uri = f"/user/profile/{user_id}" uri = f"/user/profile/{user_id}"
html_content = await self.request( html_content = await self.request("GET", self._domain + uri, return_response=True, headers=self.headers)
"GET", self._domain + uri, return_response=True, headers=self.headers match = re.search(r"<script>window.__INITIAL_STATE__=(.+)<\/script>", html_content, re.M)
)
match = re.search(
r"<script>window.__INITIAL_STATE__=(.+)<\/script>", html_content, re.M
)
if match is None: if match is None:
return {} return {}
@@ -457,7 +441,10 @@ class XiaoHongShuClient(AbstractApiClient):
return info.get("user").get("userPageData") return info.get("user").get("userPageData")
async def get_notes_by_creator( async def get_notes_by_creator(
self, creator: str, cursor: str, page_size: int = 30 self,
creator: str,
cursor: str,
page_size: int = 30,
) -> Dict: ) -> Dict:
""" """
获取博主的笔记 获取博主的笔记
@@ -500,23 +487,17 @@ class XiaoHongShuClient(AbstractApiClient):
while notes_has_more and len(result) < config.CRAWLER_MAX_NOTES_COUNT: while notes_has_more and len(result) < config.CRAWLER_MAX_NOTES_COUNT:
notes_res = await self.get_notes_by_creator(user_id, notes_cursor) notes_res = await self.get_notes_by_creator(user_id, notes_cursor)
if not notes_res: if not notes_res:
utils.logger.error( utils.logger.error(f"[XiaoHongShuClient.get_notes_by_creator] The current creator may have been banned by xhs, so they cannot access the data.")
f"[XiaoHongShuClient.get_notes_by_creator] The current creator may have been banned by xhs, so they cannot access the data."
)
break break
notes_has_more = notes_res.get("has_more", False) notes_has_more = notes_res.get("has_more", False)
notes_cursor = notes_res.get("cursor", "") notes_cursor = notes_res.get("cursor", "")
if "notes" not in notes_res: if "notes" not in notes_res:
utils.logger.info( utils.logger.info(f"[XiaoHongShuClient.get_all_notes_by_creator] No 'notes' key found in response: {notes_res}")
f"[XiaoHongShuClient.get_all_notes_by_creator] No 'notes' key found in response: {notes_res}"
)
break break
notes = notes_res["notes"] notes = notes_res["notes"]
utils.logger.info( utils.logger.info(f"[XiaoHongShuClient.get_all_notes_by_creator] got user_id:{user_id} notes len : {len(notes)}")
f"[XiaoHongShuClient.get_all_notes_by_creator] got user_id:{user_id} notes len : {len(notes)}"
)
remaining = config.CRAWLER_MAX_NOTES_COUNT - len(result) remaining = config.CRAWLER_MAX_NOTES_COUNT - len(result)
if remaining <= 0: if remaining <= 0:
@@ -529,9 +510,7 @@ class XiaoHongShuClient(AbstractApiClient):
result.extend(notes_to_add) result.extend(notes_to_add)
await asyncio.sleep(crawl_interval) await asyncio.sleep(crawl_interval)
utils.logger.info( utils.logger.info(f"[XiaoHongShuClient.get_all_notes_by_creator] Finished getting notes for user {user_id}, total: {len(result)}")
f"[XiaoHongShuClient.get_all_notes_by_creator] Finished getting notes for user {user_id}, total: {len(result)}"
)
return result return result
async def get_note_short_url(self, note_id: str) -> Dict: async def get_note_short_url(self, note_id: str) -> Dict:
@@ -582,35 +561,20 @@ class XiaoHongShuClient(AbstractApiClient):
elif isinstance(value, dict): elif isinstance(value, dict):
dict_new[new_key] = transform_json_keys(json.dumps(value)) dict_new[new_key] = transform_json_keys(json.dumps(value))
elif isinstance(value, list): elif isinstance(value, list):
dict_new[new_key] = [ dict_new[new_key] = [(transform_json_keys(json.dumps(item)) if (item and isinstance(item, dict)) else item) for item in value]
(
transform_json_keys(json.dumps(item))
if (item and isinstance(item, dict))
else item
)
for item in value
]
else: else:
dict_new[new_key] = value dict_new[new_key] = value
return dict_new return dict_new
url = ( url = ("https://www.xiaohongshu.com/explore/" + note_id + f"?xsec_token={xsec_token}&xsec_source={xsec_source}")
"https://www.xiaohongshu.com/explore/"
+ note_id
+ f"?xsec_token={xsec_token}&xsec_source={xsec_source}"
)
copy_headers = self.headers.copy() copy_headers = self.headers.copy()
if not enable_cookie: if not enable_cookie:
del copy_headers["Cookie"] del copy_headers["Cookie"]
html = await self.request( html = await self.request(method="GET", url=url, return_response=True, headers=copy_headers)
method="GET", url=url, return_response=True, headers=copy_headers
)
def get_note_dict(html): def get_note_dict(html):
state = re.findall(r"window.__INITIAL_STATE__=({.*})</script>", html)[ state = re.findall(r"window.__INITIAL_STATE__=({.*})</script>", html)[0].replace("undefined", '""')
0
].replace("undefined", '""')
if state != "{}": if state != "{}":
note_dict = transform_json_keys(state) note_dict = transform_json_keys(state)