feat: weibo支持指定创作者主页

This commit is contained in:
Relakkes
2024-08-24 05:52:11 +08:00
parent 61f023edac
commit ab7d8142af
9 changed files with 368 additions and 16 deletions

View File

@@ -7,10 +7,11 @@ import asyncio
import copy
import json
import re
from typing import Any, Callable, Dict, List, Optional
from urllib.parse import urlencode
from typing import Callable, Dict, List, Optional, Union
from urllib.parse import parse_qs, unquote, urlencode
import httpx
from httpx import Response
from playwright.async_api import BrowserContext, Page
import config
@@ -38,20 +39,26 @@ class WeiboClient:
self.cookie_dict = cookie_dict
self._image_agent_host = "https://i1.wp.com/"
async def request(self, method, url, **kwargs) -> Any:
async def request(self, method, url, **kwargs) -> Union[Response, Dict]:
enable_return_response = kwargs.pop("return_response", False)
async with httpx.AsyncClient(proxies=self.proxies) as client:
response = await client.request(
method, url, timeout=self.timeout,
**kwargs
)
if enable_return_response:
return response
data: Dict = response.json()
if data.get("ok") != 1:
ok_code = data.get("ok")
if ok_code not in [0, 1]:
utils.logger.error(f"[WeiboClient.request] request {method}:{url} err, res:{data}")
raise DataFetchError(data.get("msg", "unkonw error"))
else:
return data.get("data", {})
async def get(self, uri: str, params=None, headers=None) -> Dict:
async def get(self, uri: str, params=None, headers=None, **kwargs) -> Union[Response, Dict]:
final_uri = uri
if isinstance(params, dict):
final_uri = (f"{uri}?"
@@ -59,7 +66,7 @@ class WeiboClient:
if headers is None:
headers = self.headers
return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=headers)
return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=headers, **kwargs)
async def post(self, uri: str, data: dict) -> Dict:
json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
@@ -229,3 +236,123 @@ class WeiboClient:
return None
else:
return response.content
async def get_creator_container_info(self, creator_id: str) -> Dict:
"""
获取用户的容器ID, 容器信息代表着真实请求的API路径
fid_container_id用户的微博详情API的容器ID
lfid_container_id用户的微博列表API的容器ID
Args:
creator_id:
Returns: {
"""
response = await self.get(f"/u/{creator_id}", return_response=True)
m_weibocn_params = response.cookies.get("M_WEIBOCN_PARAMS")
if not m_weibocn_params:
raise DataFetchError("get containerid failed")
m_weibocn_params_dict = parse_qs(unquote(m_weibocn_params))
return {
"fid_container_id": m_weibocn_params_dict.get("fid", [""])[0],
"lfid_container_id": m_weibocn_params_dict.get("lfid", [""])[0]
}
async def get_creator_info_by_id(self, creator_id: str) -> Dict:
"""
根据用户ID获取用户详情
Args:
creator_id:
Returns:
"""
uri = "/api/container/getIndex"
container_info = await self.get_creator_container_info(creator_id)
if container_info.get("fid_container_id") == "" or container_info.get("lfid_container_id") == "":
utils.logger.error(f"[WeiboClient.get_creator_info_by_id] get containerid failed")
raise DataFetchError("get containerid failed")
params = {
"jumpfrom": "weibocom",
"type": "uid",
"value": creator_id,
"containerid": container_info["fid_container_id"],
}
user_res = await self.get(uri, params)
if user_res.get("tabsInfo"):
tabs: List[Dict] = user_res.get("tabsInfo", {}).get("tabs", [])
for tab in tabs:
if tab.get("tabKey") == "weibo":
container_info["lfid_container_id"] = tab.get("containerid")
break
user_res.update(container_info)
return user_res
async def get_notes_by_creator(self, creator: str, container_id: str, since_id: str = "0", ) -> Dict:
"""
获取博主的笔记
Args:
creator: 博主ID
container_id: 容器ID
since_id: 上一页最后一条笔记的ID
Returns:
"""
uri = "/api/container/getIndex"
params = {
"jumpfrom": "weibocom",
"type": "uid",
"value": creator,
"containerid": container_id,
"since_id": since_id,
}
return await self.get(uri, params)
async def get_all_notes_by_creator_id(self, creator_id: str, container_id: str, crawl_interval: float = 1.0,
callback: Optional[Callable] = None) -> List[Dict]:
"""
获取指定用户下的所有发过的帖子,该方法会一直查找一个用户下的所有帖子信息
Args:
creator_id:
container_id:
crawl_interval:
callback:
Returns:
"""
result = []
notes_has_more = True
since_id = ""
crawler_total_count = 0
while notes_has_more:
notes_res = await self.get_notes_by_creator(creator_id, container_id, since_id)
if not notes_res:
utils.logger.error(
f"[WeiboClient.get_notes_by_creator] The current creator may have been banned by xhs, so they cannot access the data.")
break
notes_has_more = notes_res.get("cardlistInfo", {}).get("total", 0) > crawler_total_count
since_id = notes_res.get("cardlistInfo", {}).get("since_id", "0")
notes_has_more += 10
if "cards" not in notes_res:
utils.logger.info(
f"[WeiboClient.get_all_notes_by_creator] No 'notes' key found in response: {notes_res}")
break
notes = notes_res["cards"]
utils.logger.info(
f"[WeiboClient.get_all_notes_by_creator] got user_id:{creator_id} notes len : {len(notes)}")
notes = [note for note in notes if note.get("card_type") == 9]
if callback:
await callback(notes)
await asyncio.sleep(crawl_interval)
result.extend(notes)
return result