Merge branch 'NanmiCoder:main' into main

This commit is contained in:
yangtao210
2025-11-07 17:44:09 +08:00
committed by GitHub
18 changed files with 168 additions and 79 deletions

BIN
.DS_Store vendored Normal file
View File

Binary file not shown.

View File

@@ -17,7 +17,7 @@ SORT_TYPE = "popularity_descending"
# 指定笔记URL列表, 必须要携带xsec_token参数
XHS_SPECIFIED_NOTE_URL_LIST = [
"https://www.xiaohongshu.com/explore/66fad51c000000001b0224b8?xsec_token=AB3rO-QopW5sgrJ41GwN01WCXh6yWPxjSoFI9D5JIMgKw=&xsec_source=pc_search"
"https://www.xiaohongshu.com/explore/68f99f6d0000000007033fcf?xsec_token=ABZEzjuN2fPjKF9EcMsCCxfbt3IBRsFZldGFoCJbdDmXI=&xsec_source=pc_feed"
# ........................
]

View File

@@ -59,7 +59,6 @@ export default defineConfig({
text: 'MediaCrawler源码剖析课',
link: 'https://relakkes.feishu.cn/wiki/JUgBwdhIeiSbAwkFCLkciHdAnhh'
},
{text: '知识星球文章专栏', link: '/知识星球介绍'},
{text: '开发者咨询服务', link: '/开发者咨询'},
]
},

View File

@@ -1,12 +1,12 @@
# 关于作者
> 大家都叫我阿江,网名:程序员阿江-Relakkes目前裸辞正探索自由职业,希望能靠自己的技术能力和努力,实现自己理想的生活方式
>
> 我身边有大量的技术人脉资源,如果大家有一些爬虫咨询或者编程单子可以向我丢过来
> 大家都叫我阿江,网名:程序员阿江-Relakkes目前是一名独立开发者,专注于 AI Agent 和爬虫相关的开发工作All in AI
- [Github万星开源自媒体爬虫仓库MediaCrawler作者](https://github.com/NanmiCoder/MediaCrawler)
- 全栈程序员熟悉Python、Golang、JavaScript工作中主要用Golang。
- 曾经主导并参与过百万级爬虫采集系统架构设计与编码
- 爬虫是一种技术兴趣爱好,参与爬虫有一种对抗的感觉,越难越兴奋。
- 目前专注于 AI Agent 领域,积极探索 AI 技术的应用与创新
- 如果你有 AI Agent 相关的项目需要合作,欢迎联系我,我有很多时间可以投入
## 微信联系方式
![relakkes_weichat.JPG](static/images/relakkes_weichat.jpg)

View File

@@ -15,5 +15,3 @@
## MediaCrawler源码剖析视频课程
[mediacrawler源码课程介绍](https://relakkes.feishu.cn/wiki/JUgBwdhIeiSbAwkFCLkciHdAnhh)
## 知识星球爬虫逆向、编程专栏
[知识星球专栏介绍](知识星球介绍.md)

View File

@@ -1,31 +0,0 @@
# 知识星球专栏
## 基本介绍
文章:
- 1.爬虫JS逆向案例分享
- 2.MediaCrawler技术实现分享。
- 3.沉淀python开发经验和技巧
- ......................
提问:
- 4.在星球内向我提问关于MediaCrawler、爬虫、编程任何问题
## 章节内容
- [逆向案例 - 某16x8平台商品列表接口逆向参数分析](https://articles.zsxq.com/id_x1qmtg8pzld9.html)
- [逆向案例 - Product Hunt月度最佳产品榜单接口加密参数分析](https://articles.zsxq.com/id_au4eich3x2sg.html)
- [逆向案例 - 某zhi乎x-zse-96参数分析过程](https://articles.zsxq.com/id_dui2vil0ag1l.html)
- [逆向案例 - 某x识星球X-Signature加密参数分析过程](https://articles.zsxq.com/id_pp4madwcwcg8.html)
- [【独创】使用Playwright获取某音a_bogus参数流程包含加密参数分析](https://articles.zsxq.com/id_u89al50jk9x0.html)
- [【独创】使用Playwright低成本获取某书X-s参数流程分析当年的回忆录](https://articles.zsxq.com/id_u4lcrvqakuc7.html)
- [ MediaCrawler-基于抽象类设计重构项目缓存](https://articles.zsxq.com/id_4ju73oxewt9j.html)
- [ 手把手带你撸一个自己的IP代理池](https://articles.zsxq.com/id_38fza371ladm.html)
- [一次Mysql数据库中混用collation排序规则带来的bug](https://articles.zsxq.com/id_pibwr1wnst2p.html)
- [错误使用 Python 可变类型带来的隐藏 Bug](https://articles.zsxq.com/id_f7vn89l1d303.html)
- [【MediaCrawler】微博帖子评论爬虫教程](https://articles.zsxq.com/id_vrmuhw0ovj3t.html)
- [Python协程在并发场景下的幂等性问题](https://articles.zsxq.com/id_wocdwsfmfcmp.html)
- ........................................
## 加入星球
![星球qrcode.JPG](static/images/星球qrcode.jpg)

14
main.py
View File

@@ -24,6 +24,8 @@ from media_platform.tieba import TieBaCrawler
from media_platform.weibo import WeiboCrawler
from media_platform.xhs import XiaoHongShuCrawler
from media_platform.zhihu import ZhihuCrawler
from tools.async_file_writer import AsyncFileWriter
from var import crawler_type_var
class CrawlerFactory:
@@ -72,6 +74,18 @@ async def main():
crawler = CrawlerFactory.create_crawler(platform=config.PLATFORM)
await crawler.start()
# Generate wordcloud after crawling is complete
# Only for JSON save mode
if config.SAVE_DATA_OPTION == "json" and config.ENABLE_GET_WORDCLOUD:
try:
file_writer = AsyncFileWriter(
platform=config.PLATFORM,
crawler_type=crawler_type_var.get()
)
await file_writer.generate_wordcloud_from_comments()
except Exception as e:
print(f"Error generating wordcloud: {e}")
def cleanup():
if crawler:

View File

@@ -91,8 +91,10 @@ class DouYinClient(AbstractApiClient):
post_data = {}
if request_method == "POST":
post_data = params
a_bogus = await get_a_bogus(uri, query_string, post_data, headers["User-Agent"], self.playwright_page)
params["a_bogus"] = a_bogus
if "/v1/web/general/search" not in uri:
a_bogus = await get_a_bogus(uri, query_string, post_data, headers["User-Agent"], self.playwright_page)
params["a_bogus"] = a_bogus
async def request(self, method, url, **kwargs):
async with httpx.AsyncClient(proxy=self.proxy) as client:

View File

@@ -288,27 +288,14 @@ class WeiboClient:
"""
uri = "/api/container/getIndex"
container_info = await self.get_creator_container_info(creator_id)
if container_info.get("fid_container_id") == "" or container_info.get("lfid_container_id") == "":
utils.logger.error(f"[WeiboClient.get_creator_info_by_id] get containerid failed")
raise DataFetchError("get containerid failed")
containerid = f"100505{creator_id}"
params = {
"jumpfrom": "weibocom",
"type": "uid",
"value": creator_id,
"containerid": container_info["fid_container_id"],
"containerid":containerid,
}
user_res = await self.get(uri, params)
if user_res.get("tabsInfo"):
tabs: List[Dict] = user_res.get("tabsInfo", {}).get("tabs", [])
for tab in tabs:
if tab.get("tabKey") == "weibo":
container_info["lfid_container_id"] = tab.get("containerid")
break
user_res.update(container_info)
return user_res
async def get_notes_by_creator(

View File

@@ -293,7 +293,7 @@ class WeiboCrawler(AbstractCrawler):
# Get all note information of the creator
all_notes_list = await self.wb_client.get_all_notes_by_creator_id(
creator_id=user_id,
container_id=createor_info_res.get("lfid_container_id"),
container_id=f"107603{user_id}",
crawl_interval=0,
callback=weibo_store.batch_update_weibo_notes,
)

View File

@@ -10,23 +10,24 @@
import asyncio
import json
import re
import time
from typing import Any, Callable, Dict, List, Optional, Union
from urllib.parse import urlencode
import httpx
from playwright.async_api import BrowserContext, Page
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_result
from tenacity import retry, stop_after_attempt, wait_fixed
import config
from base.base_crawler import AbstractApiClient
from tools import utils
from html import unescape
from .exception import DataFetchError, IPBlockError
from .field import SearchNoteType, SearchSortType
from .help import get_search_id, sign
from .extractor import XiaoHongShuExtractor
from .secsign import seccore_signv2_playwright
class XiaoHongShuClient(AbstractApiClient):
@@ -63,15 +64,13 @@ class XiaoHongShuClient(AbstractApiClient):
Returns:
"""
encrypt_params = await self.playwright_page.evaluate(
"([url, data]) => window._webmsxyw(url,data)", [url, data]
)
x_s = await seccore_signv2_playwright(self.playwright_page, url, data)
local_storage = await self.playwright_page.evaluate("() => window.localStorage")
signs = sign(
a1=self.cookie_dict.get("a1", ""),
b1=local_storage.get("b1", ""),
x_s=encrypt_params.get("X-s", ""),
x_t=str(encrypt_params.get("X-t", "")),
x_s=x_s,
x_t=str(int(time.time())),
)
headers = {

View File

@@ -282,16 +282,9 @@ class XiaoHongShuCrawler(AbstractCrawler):
async with semaphore:
try:
utils.logger.info(f"[get_note_detail_async_task] Begin get note detail, note_id: {note_id}")
try:
note_detail = await self.xhs_client.get_note_by_id(note_id, xsec_source, xsec_token)
except RetryError:
pass
note_detail = await self.xhs_client.get_note_by_id_from_html(note_id, xsec_source, xsec_token, enable_cookie=True)
if not note_detail:
note_detail = await self.xhs_client.get_note_by_id_from_html(note_id, xsec_source, xsec_token, enable_cookie=True)
if not note_detail:
raise Exception(f"[get_note_detail_async_task] Failed to get note detail, Id: {note_id}")
raise Exception(f"[get_note_detail_async_task] Failed to get note detail, Id: {note_id}")
note_detail.update({"xsec_token": xsec_token, "xsec_source": xsec_source})

View File

@@ -27,16 +27,17 @@ def sign(a1="", b1="", x_s="", x_t=""):
"s0": 3, # getPlatformCode
"s1": "",
"x0": "1", # localStorage.getItem("b1b1")
"x1": "3.7.8-2", # version
"x1": "4.2.2", # version
"x2": "Mac OS",
"x3": "xhs-pc-web",
"x4": "4.27.2",
"x4": "4.74.0",
"x5": a1, # cookie of a1
"x6": x_t,
"x7": x_s,
"x8": b1, # localStorage.getItem("b1")
"x9": mrc(x_t + x_s + b1),
"x10": 154, # getSigCount
"x11": "normal"
}
encode_str = encodeUtf8(json.dumps(common, separators=(',', ':')))
x_s_common = b64Encode(encode_str)

View File

@@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
import hashlib
import base64
import json
from typing import Any
def _build_c(e: Any, a: Any) -> str:
c = str(e)
if isinstance(a, (dict, list)):
c += json.dumps(a, separators=(",", ":"), ensure_ascii=False)
elif isinstance(a, str):
c += a
# 其它类型不拼
return c
# ---------------------------
# p.Pu = MD5(c) => hex 小写
# ---------------------------
def _md5_hex(s: str) -> str:
return hashlib.md5(s.encode("utf-8")).hexdigest()
# ============================================================
# Playwright 版本(异步):传入 pagePage 对象)
# 内部用 page.evaluate('window.mnsv2(...)')
# ============================================================
async def seccore_signv2_playwright(
page, # Playwright Page
e: Any,
a: Any,
) -> str:
"""
使用 Playwright 的 page.evaluate 调用 window.mnsv2(c, d) 来生成签名。
需确保 page 上下文中已存在 window.mnsv2比如已注入目标站点脚本
用法:
s = await page.evaluate("(c, d) => window.mnsv2(c, d)", c, d)
"""
c = _build_c(e, a)
d = _md5_hex(c)
# 调用浏览器上下文里的 window.mnsv2
s = await page.evaluate("(c, d) => window.mnsv2(c, d)", [c, d])
f = {
"x0": "4.2.6",
"x1": "xhs-pc-web",
"x2": "Mac OS",
"x3": s,
"x4": a,
}
payload = json.dumps(f, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
token = "XYS_" + base64.b64encode(payload).decode("ascii")
print(token)
return token

View File

@@ -38,7 +38,7 @@ class BiliCsvStoreImplement(AbstractStore):
def __init__(self):
self.file_writer = AsyncFileWriter(
crawler_type=crawler_type_var.get(),
platform="bilibili"
platform="bili"
)
async def store_content(self, content_item: Dict):
@@ -221,7 +221,7 @@ class BiliJsonStoreImplement(AbstractStore):
def __init__(self):
self.file_writer = AsyncFileWriter(
crawler_type=crawler_type_var.get(),
platform="bilibili"
platform="bili"
)
async def store_content(self, content_item: Dict):

View File

@@ -22,7 +22,7 @@ from tools import utils
class BilibiliVideo(AbstractStoreVideo):
video_store_path: str = "data/bilibili/videos"
video_store_path: str = "data/bili/videos"
async def store_video(self, video_content_item: Dict):
"""

View File

@@ -5,13 +5,16 @@ import os
import pathlib
from typing import Dict, List
import aiofiles
import config
from tools.utils import utils
from tools.words import AsyncWordCloudGenerator
class AsyncFileWriter:
def __init__(self, platform: str, crawler_type: str):
self.lock = asyncio.Lock()
self.platform = platform
self.crawler_type = crawler_type
self.wordcloud_generator = AsyncWordCloudGenerator() if config.ENABLE_GET_WORDCLOUD else None
def _get_file_path(self, file_type: str, item_type: str) -> str:
base_path = f"data/{self.platform}/{file_type}"
@@ -47,4 +50,58 @@ class AsyncFileWriter:
existing_data.append(item)
async with aiofiles.open(file_path, 'w', encoding='utf-8') as f:
await f.write(json.dumps(existing_data, ensure_ascii=False, indent=4))
await f.write(json.dumps(existing_data, ensure_ascii=False, indent=4))
async def generate_wordcloud_from_comments(self):
"""
Generate wordcloud from comments data
Only works when ENABLE_GET_WORDCLOUD and ENABLE_GET_COMMENTS are True
"""
if not config.ENABLE_GET_WORDCLOUD or not config.ENABLE_GET_COMMENTS:
return
if not self.wordcloud_generator:
return
try:
# Read comments from JSON file
comments_file_path = self._get_file_path('json', 'comments')
if not os.path.exists(comments_file_path) or os.path.getsize(comments_file_path) == 0:
utils.logger.info(f"[AsyncFileWriter.generate_wordcloud_from_comments] No comments file found at {comments_file_path}")
return
async with aiofiles.open(comments_file_path, 'r', encoding='utf-8') as f:
content = await f.read()
if not content:
utils.logger.info(f"[AsyncFileWriter.generate_wordcloud_from_comments] Comments file is empty")
return
comments_data = json.loads(content)
if not isinstance(comments_data, list):
comments_data = [comments_data]
# Filter comments data to only include 'content' field
# Handle different comment data structures across platforms
filtered_data = []
for comment in comments_data:
if isinstance(comment, dict):
# Try different possible content field names
content_text = comment.get('content') or comment.get('comment_text') or comment.get('text') or ''
if content_text:
filtered_data.append({'content': content_text})
if not filtered_data:
utils.logger.info(f"[AsyncFileWriter.generate_wordcloud_from_comments] No valid comment content found")
return
# Generate wordcloud
words_base_path = f"data/{self.platform}/words"
pathlib.Path(words_base_path).mkdir(parents=True, exist_ok=True)
words_file_prefix = f"{words_base_path}/{self.crawler_type}_comments_{utils.get_current_date()}"
utils.logger.info(f"[AsyncFileWriter.generate_wordcloud_from_comments] Generating wordcloud from {len(filtered_data)} comments")
await self.wordcloud_generator.generate_word_frequency_and_cloud(filtered_data, words_file_prefix)
utils.logger.info(f"[AsyncFileWriter.generate_wordcloud_from_comments] Wordcloud generated successfully at {words_file_prefix}")
except Exception as e:
utils.logger.error(f"[AsyncFileWriter.generate_wordcloud_from_comments] Error generating wordcloud: {e}")

View File

@@ -26,6 +26,10 @@ def init_loging_config():
)
_logger = logging.getLogger("MediaCrawler")
_logger.setLevel(level)
# 关闭 httpx 的 INFO 日志
logging.getLogger("httpx").setLevel(logging.WARNING)
return _logger