refactor: config update

This commit is contained in:
程序员阿江(Relakkes)
2025-07-18 23:26:52 +08:00
parent 122978b35c
commit 13b00f7a36
17 changed files with 964 additions and 485 deletions

View File

@@ -39,7 +39,7 @@ SAVE_LOGIN_STATE = True
# 是否启用CDP模式 - 使用用户现有的Chrome/Edge浏览器进行爬取提供更好的反检测能力 # 是否启用CDP模式 - 使用用户现有的Chrome/Edge浏览器进行爬取提供更好的反检测能力
# 启用后将自动检测并启动用户的Chrome/Edge浏览器通过CDP协议进行控制 # 启用后将自动检测并启动用户的Chrome/Edge浏览器通过CDP协议进行控制
# 这种方式使用真实的浏览器环境包括用户的扩展、Cookie和设置大大降低被检测的风险 # 这种方式使用真实的浏览器环境包括用户的扩展、Cookie和设置大大降低被检测的风险
ENABLE_CDP_MODE = False ENABLE_CDP_MODE = True
# CDP调试端口用于与浏览器通信 # CDP调试端口用于与浏览器通信
# 如果端口被占用,系统会自动尝试下一个可用端口 # 如果端口被占用,系统会自动尝试下一个可用端口
@@ -105,3 +105,14 @@ STOP_WORDS_FILE = "./docs/hit_stopwords.txt"
# 中文字体文件路径 # 中文字体文件路径
FONT_PATH = "./docs/STZHONGS.TTF" FONT_PATH = "./docs/STZHONGS.TTF"
# 爬取间隔时间
CRAWLER_MAX_SLEEP_SEC = 2
from .bilibili_config import *
from .xhs_config import *
from .dy_config import *
from .ks_config import *
from .weibo_config import *
from .tieba_config import *
from .zhihu_config import *

View File

@@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则: # 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。 # 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。 # 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
@@ -7,28 +8,40 @@
# #
# 详细许可条款请参阅项目根目录下的LICENSE文件。 # 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# bilili 平台配置
from config import *
# 每天爬取视频/帖子的数量控制 # 每天爬取视频/帖子的数量控制
MAX_NOTES_PER_DAY = 1 MAX_NOTES_PER_DAY = 1
# Bilibili 平台配置 # 指定B站视频ID列表
BILI_SPECIFIED_ID_LIST = [ BILI_SPECIFIED_ID_LIST = [
"BV1d54y1g7db", "BV1d54y1g7db",
"BV1Sz4y1U77N", "BV1Sz4y1U77N",
"BV14Q4y1n7jz", "BV14Q4y1n7jz",
# ........................ # ........................
] ]
START_DAY = "2024-01-01"
END_DAY = "2024-01-01" # 指定B站用户ID列表
BILI_SEARCH_MODE = "normal"
CREATOR_MODE = True
START_CONTACTS_PAGE = 1
CRAWLER_MAX_CONTACTS_COUNT_SINGLENOTES = 100
CRAWLER_MAX_DYNAMICS_COUNT_SINGLENOTES = 50
BILI_CREATOR_ID_LIST = [ BILI_CREATOR_ID_LIST = [
"20813884", "20813884",
# ........................ # ........................
] ]
# 指定时间范围
START_DAY = "2024-01-01"
END_DAY = "2024-01-01"
# 搜索模式
BILI_SEARCH_MODE = "normal"
# 是否爬取用户信息
CREATOR_MODE = True
# 开始爬取用户信息页码
START_CONTACTS_PAGE = 1
# 单个视频/帖子最大爬取评论数
CRAWLER_MAX_CONTACTS_COUNT_SINGLENOTES = 100
# 单个视频/帖子最大爬取动态数
CRAWLER_MAX_DYNAMICS_COUNT_SINGLENOTES = 50

View File

@@ -8,15 +8,17 @@
# 详细许可条款请参阅项目根目录下的LICENSE文件。 # 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
from config import *
# 抖音平台配置 # 抖音平台配置
PUBLISH_TIME_TYPE = 0 PUBLISH_TIME_TYPE = 0
# 指定DY视频ID列表
DY_SPECIFIED_ID_LIST = [ DY_SPECIFIED_ID_LIST = [
"7280854932641664319", "7280854932641664319",
"7202432992642387233", "7202432992642387233",
# ........................ # ........................
] ]
# 指定DY用户ID列表
DY_CREATOR_ID_LIST = [ DY_CREATOR_ID_LIST = [
"MS4wLjABAAAATJPY7LAlaa5X-c8uNdWkvz0jUGgpw4eeXIwu_8BhvqE", "MS4wLjABAAAATJPY7LAlaa5X-c8uNdWkvz0jUGgpw4eeXIwu_8BhvqE",
# ........................ # ........................

View File

@@ -8,10 +8,12 @@
# 详细许可条款请参阅项目根目录下的LICENSE文件。 # 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
from config import *
# 快手平台配置 # 快手平台配置
# 指定快手视频ID列表
KS_SPECIFIED_ID_LIST = ["3xf8enb8dbj6uig", "3x6zz972bchmvqe"] KS_SPECIFIED_ID_LIST = ["3xf8enb8dbj6uig", "3x6zz972bchmvqe"]
# 指定快手用户ID列表
KS_CREATOR_ID_LIST = [ KS_CREATOR_ID_LIST = [
"3x4sm73aye7jq7i", "3x4sm73aye7jq7i",
# ........................ # ........................

View File

@@ -8,13 +8,17 @@
# 详细许可条款请参阅项目根目录下的LICENSE文件。 # 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
from config import *
# 贴吧平台配置 # 贴吧平台配置
# 指定贴吧ID列表
TIEBA_SPECIFIED_ID_LIST = [] TIEBA_SPECIFIED_ID_LIST = []
# 指定贴吧名称列表
TIEBA_NAME_LIST = [ TIEBA_NAME_LIST = [
# "盗墓笔记" # "盗墓笔记"
] ]
# 指定贴吧用户URL列表
TIEBA_CREATOR_URL_LIST = [ TIEBA_CREATOR_URL_LIST = [
"https://tieba.baidu.com/home/main/?id=tb.1.7f139e2e.6CyEwxu3VJruH_-QqpCi6g&fr=frs", "https://tieba.baidu.com/home/main/?id=tb.1.7f139e2e.6CyEwxu3VJruH_-QqpCi6g&fr=frs",
# ........................ # ........................

View File

@@ -8,14 +8,19 @@
# 详细许可条款请参阅项目根目录下的LICENSE文件。 # 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
from config import *
# 微博平台配置 # 微博平台配置
# 搜索类型具体的枚举值在media_platform/weibo/field.py中
WEIBO_SEARCH_TYPE = "popular" WEIBO_SEARCH_TYPE = "popular"
# 指定微博ID列表
WEIBO_SPECIFIED_ID_LIST = [ WEIBO_SPECIFIED_ID_LIST = [
"4982041758140155", "4982041758140155",
# ........................ # ........................
] ]
# 指定微博用户ID列表
WEIBO_CREATOR_ID_LIST = [ WEIBO_CREATOR_ID_LIST = [
"5533390220", "5533390220",
# ........................ # ........................

View File

@@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则: # 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。 # 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。 # 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
@@ -8,16 +9,22 @@
# 详细许可条款请参阅项目根目录下的LICENSE文件。 # 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
from config import *
# 小红书平台配置 # 小红书平台配置
# 排序方式具体的枚举值在media_platform/xhs/field.py中
SORT_TYPE = "popularity_descending" SORT_TYPE = "popularity_descending"
# 用户代理xhs自定义User-Agent
UA = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0" UA = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0"
CRAWLER_MAX_SLEEP_SEC = 2
# 指定笔记URL列表, 必须要携带xsec_token参数
XHS_SPECIFIED_NOTE_URL_LIST = [ XHS_SPECIFIED_NOTE_URL_LIST = [
"https://www.xiaohongshu.com/explore/66fad51c000000001b0224b8?xsec_token=AB3rO-QopW5sgrJ41GwN01WCXh6yWPxjSoFI9D5JIMgKw=&xsec_source=pc_search" "https://www.xiaohongshu.com/explore/66fad51c000000001b0224b8?xsec_token=AB3rO-QopW5sgrJ41GwN01WCXh6yWPxjSoFI9D5JIMgKw=&xsec_source=pc_search"
# ........................ # ........................
] ]
# 指定用户ID列表
XHS_CREATOR_ID_LIST = [ XHS_CREATOR_ID_LIST = [
"63e36c9a000000002703502b", "63e36c9a000000002703502b",
# ........................ # ........................

View File

@@ -8,13 +8,16 @@
# 详细许可条款请参阅项目根目录下的LICENSE文件。 # 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
from config import *
# 知乎平台配置 # 知乎平台配置
# 指定知乎用户URL列表
ZHIHU_CREATOR_URL_LIST = [ ZHIHU_CREATOR_URL_LIST = [
"https://www.zhihu.com/people/yd1234567", "https://www.zhihu.com/people/yd1234567",
# ........................ # ........................
] ]
# 指定知乎ID列表
ZHIHU_SPECIFIED_ID_LIST = [ ZHIHU_SPECIFIED_ID_LIST = [
"https://www.zhihu.com/question/826896610/answer/4885821440", # 回答 "https://www.zhihu.com/question/826896610/answer/4885821440", # 回答
"https://zhuanlan.zhihu.com/p/673461588", # 文章 "https://zhuanlan.zhihu.com/p/673461588", # 文章

31
main.py
View File

@@ -34,20 +34,26 @@ class CrawlerFactory:
"bili": BilibiliCrawler, "bili": BilibiliCrawler,
"wb": WeiboCrawler, "wb": WeiboCrawler,
"tieba": TieBaCrawler, "tieba": TieBaCrawler,
"zhihu": ZhihuCrawler "zhihu": ZhihuCrawler,
} }
@staticmethod @staticmethod
def create_crawler(platform: str) -> AbstractCrawler: def create_crawler(platform: str) -> AbstractCrawler:
crawler_class = CrawlerFactory.CRAWLERS.get(platform) crawler_class = CrawlerFactory.CRAWLERS.get(platform)
if not crawler_class: if not crawler_class:
raise ValueError("Invalid Media Platform Currently only supported xhs or dy or ks or bili ...") raise ValueError(
"Invalid Media Platform Currently only supported xhs or dy or ks or bili ..."
)
return crawler_class() return crawler_class()
crawler: Optional[AbstractCrawler] = None
async def main(): async def main():
# Init crawler # Init crawler
crawler: Optional[AbstractCrawler] = None global crawler
try:
# parse cmd # parse cmd
await cmd_arg.parse_cmd() await cmd_arg.parse_cmd()
@@ -58,17 +64,16 @@ async def main():
crawler = CrawlerFactory.create_crawler(platform=config.PLATFORM) crawler = CrawlerFactory.create_crawler(platform=config.PLATFORM)
await crawler.start() await crawler.start()
finally:
def cleanup():
if crawler: if crawler:
await crawler.close() asyncio.run(crawler.close())
if config.SAVE_DATA_OPTION in ["db", "sqlite"]: if config.SAVE_DATA_OPTION in ["db", "sqlite"]:
await db.close() asyncio.run(db.close())
if __name__ == '__main__':
if __name__ == "__main__":
try: try:
# asyncio.run(main())
asyncio.get_event_loop().run_until_complete(main()) asyncio.get_event_loop().run_until_complete(main())
except KeyboardInterrupt: finally:
print("\n[main] Caught keyboard interrupt, exiting.") cleanup()
sys.exit()

View File

@@ -22,10 +22,16 @@ from typing import Dict, List, Optional, Tuple, Union
from datetime import datetime, timedelta from datetime import datetime, timedelta
import pandas as pd import pandas as pd
from playwright.async_api import (BrowserContext, BrowserType, Page, Playwright, async_playwright) from playwright.async_api import (
BrowserContext,
BrowserType,
Page,
Playwright,
async_playwright,
)
from playwright._impl._errors import TargetClosedError from playwright._impl._errors import TargetClosedError
from . import config import config
from base.base_crawler import AbstractCrawler from base.base_crawler import AbstractCrawler
from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
from store import bilibili as bilibili_store from store import bilibili as bilibili_store
@@ -53,28 +59,30 @@ class BilibiliCrawler(AbstractCrawler):
async def start(self): async def start(self):
playwright_proxy_format, httpx_proxy_format = None, None playwright_proxy_format, httpx_proxy_format = None, None
if config.ENABLE_IP_PROXY: if config.ENABLE_IP_PROXY:
ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True) ip_proxy_pool = await create_ip_pool(
config.IP_PROXY_POOL_COUNT, enable_validate_ip=True
)
ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy() ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy()
playwright_proxy_format, httpx_proxy_format = self.format_proxy_info( playwright_proxy_format, httpx_proxy_format = self.format_proxy_info(
ip_proxy_info) ip_proxy_info
)
async with async_playwright() as playwright: async with async_playwright() as playwright:
# 根据配置选择启动模式 # 根据配置选择启动模式
if config.ENABLE_CDP_MODE: if config.ENABLE_CDP_MODE:
utils.logger.info("[BilibiliCrawler] 使用CDP模式启动浏览器") utils.logger.info("[BilibiliCrawler] 使用CDP模式启动浏览器")
self.browser_context = await self.launch_browser_with_cdp( self.browser_context = await self.launch_browser_with_cdp(
playwright, playwright_proxy_format, self.user_agent, playwright,
headless=config.CDP_HEADLESS playwright_proxy_format,
self.user_agent,
headless=config.CDP_HEADLESS,
) )
else: else:
utils.logger.info("[BilibiliCrawler] 使用标准模式启动浏览器") utils.logger.info("[BilibiliCrawler] 使用标准模式启动浏览器")
# Launch a browser context. # Launch a browser context.
chromium = playwright.chromium chromium = playwright.chromium
self.browser_context = await self.launch_browser( self.browser_context = await self.launch_browser(
chromium, chromium, None, self.user_agent, headless=config.HEADLESS
None,
self.user_agent,
headless=config.HEADLESS
) )
# stealth.min.js is a js script to prevent the website from detecting the crawler. # stealth.min.js is a js script to prevent the website from detecting the crawler.
await self.browser_context.add_init_script(path="libs/stealth.min.js") await self.browser_context.add_init_script(path="libs/stealth.min.js")
@@ -89,10 +97,12 @@ class BilibiliCrawler(AbstractCrawler):
login_phone="", # your phone number login_phone="", # your phone number
browser_context=self.browser_context, browser_context=self.browser_context,
context_page=self.context_page, context_page=self.context_page,
cookie_str=config.COOKIES cookie_str=config.COOKIES,
) )
await login_obj.begin() await login_obj.begin()
await self.bili_client.update_cookies(browser_context=self.browser_context) await self.bili_client.update_cookies(
browser_context=self.browser_context
)
crawler_type_var.set(config.CRAWLER_TYPE) crawler_type_var.set(config.CRAWLER_TYPE)
if config.CRAWLER_TYPE == "search": if config.CRAWLER_TYPE == "search":
@@ -108,8 +118,7 @@ class BilibiliCrawler(AbstractCrawler):
await self.get_all_creator_details(config.BILI_CREATOR_ID_LIST) await self.get_all_creator_details(config.BILI_CREATOR_ID_LIST)
else: else:
pass pass
utils.logger.info( utils.logger.info("[BilibiliCrawler.start] Bilibili Crawler finished ...")
"[BilibiliCrawler.start] Bilibili Crawler finished ...")
async def search(self): async def search(self):
""" """
@@ -126,7 +135,9 @@ class BilibiliCrawler(AbstractCrawler):
utils.logger.warning(f"Unknown BILI_SEARCH_MODE: {config.BILI_SEARCH_MODE}") utils.logger.warning(f"Unknown BILI_SEARCH_MODE: {config.BILI_SEARCH_MODE}")
@staticmethod @staticmethod
async def get_pubtime_datetime(start: str = config.START_DAY, end: str = config.END_DAY) -> Tuple[str, str]: async def get_pubtime_datetime(
start: str = config.START_DAY, end: str = config.END_DAY
) -> Tuple[str, str]:
""" """
获取 bilibili 作品发布日期起始时间戳 pubtime_begin_s 与发布日期结束时间戳 pubtime_end_s 获取 bilibili 作品发布日期起始时间戳 pubtime_begin_s 与发布日期结束时间戳 pubtime_end_s
--- ---
@@ -144,14 +155,20 @@ class BilibiliCrawler(AbstractCrawler):
转换为可读的 datetime 对象pubtime_begin_s = datetime.datetime(2024, 1, 5, 0, 0)pubtime_end_s = datetime.datetime(2024, 1, 6, 23, 59, 59) 转换为可读的 datetime 对象pubtime_begin_s = datetime.datetime(2024, 1, 5, 0, 0)pubtime_end_s = datetime.datetime(2024, 1, 6, 23, 59, 59)
""" """
# 转换 start 与 end 为 datetime 对象 # 转换 start 与 end 为 datetime 对象
start_day: datetime = datetime.strptime(start, '%Y-%m-%d') start_day: datetime = datetime.strptime(start, "%Y-%m-%d")
end_day: datetime = datetime.strptime(end, '%Y-%m-%d') end_day: datetime = datetime.strptime(end, "%Y-%m-%d")
if start_day > end_day: if start_day > end_day:
raise ValueError('Wrong time range, please check your start and end argument, to ensure that the start cannot exceed end') raise ValueError(
"Wrong time range, please check your start and end argument, to ensure that the start cannot exceed end"
)
elif start_day == end_day: # 搜索同一天的内容 elif start_day == end_day: # 搜索同一天的内容
end_day = start_day + timedelta(days=1) - timedelta(seconds=1) # 则将 end_day 设置为 start_day + 1 day - 1 second end_day = (
start_day + timedelta(days=1) - timedelta(seconds=1)
) # 则将 end_day 设置为 start_day + 1 day - 1 second
else: # 搜索 start 至 end else: # 搜索 start 至 end
end_day = end_day + timedelta(days=1) - timedelta(seconds=1) # 则将 end_day 设置为 end_day + 1 day - 1 second end_day = (
end_day + timedelta(days=1) - timedelta(seconds=1)
) # 则将 end_day 设置为 end_day + 1 day - 1 second
# 将其重新转换为时间戳 # 将其重新转换为时间戳
return str(int(start_day.timestamp())), str(int(end_day.timestamp())) return str(int(start_day.timestamp())), str(int(end_day.timestamp()))
@@ -160,22 +177,32 @@ class BilibiliCrawler(AbstractCrawler):
search bilibili video with keywords in normal mode search bilibili video with keywords in normal mode
:return: :return:
""" """
utils.logger.info("[BilibiliCrawler.search_by_keywords] Begin search bilibli keywords") utils.logger.info(
"[BilibiliCrawler.search_by_keywords] Begin search bilibli keywords"
)
bili_limit_count = 20 # bilibili limit page fixed value bili_limit_count = 20 # bilibili limit page fixed value
if config.CRAWLER_MAX_NOTES_COUNT < bili_limit_count: if config.CRAWLER_MAX_NOTES_COUNT < bili_limit_count:
config.CRAWLER_MAX_NOTES_COUNT = bili_limit_count config.CRAWLER_MAX_NOTES_COUNT = bili_limit_count
start_page = config.START_PAGE # start page number start_page = config.START_PAGE # start page number
for keyword in config.KEYWORDS.split(","): for keyword in config.KEYWORDS.split(","):
source_keyword_var.set(keyword) source_keyword_var.set(keyword)
utils.logger.info(f"[BilibiliCrawler.search_by_keywords] Current search keyword: {keyword}") utils.logger.info(
f"[BilibiliCrawler.search_by_keywords] Current search keyword: {keyword}"
)
page = 1 page = 1
while (page - start_page + 1) * bili_limit_count <= config.CRAWLER_MAX_NOTES_COUNT: while (
page - start_page + 1
) * bili_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
if page < start_page: if page < start_page:
utils.logger.info(f"[BilibiliCrawler.search_by_keywords] Skip page: {page}") utils.logger.info(
f"[BilibiliCrawler.search_by_keywords] Skip page: {page}"
)
page += 1 page += 1
continue continue
utils.logger.info(f"[BilibiliCrawler.search_by_keywords] search bilibili keyword: {keyword}, page: {page}") utils.logger.info(
f"[BilibiliCrawler.search_by_keywords] search bilibili keyword: {keyword}, page: {page}"
)
video_id_list: List[str] = [] video_id_list: List[str] = []
videos_res = await self.bili_client.search_video_by_keyword( videos_res = await self.bili_client.search_video_by_keyword(
keyword=keyword, keyword=keyword,
@@ -183,20 +210,29 @@ class BilibiliCrawler(AbstractCrawler):
page_size=bili_limit_count, page_size=bili_limit_count,
order=SearchOrderType.DEFAULT, order=SearchOrderType.DEFAULT,
pubtime_begin_s=0, # 作品发布日期起始时间戳 pubtime_begin_s=0, # 作品发布日期起始时间戳
pubtime_end_s=0 # 作品发布日期结束日期时间戳 pubtime_end_s=0, # 作品发布日期结束日期时间戳
) )
video_list: List[Dict] = videos_res.get("result") video_list: List[Dict] = videos_res.get("result")
if not video_list: if not video_list:
utils.logger.info(f"[BilibiliCrawler.search_by_keywords] No more videos for '{keyword}', moving to next keyword.") utils.logger.info(
f"[BilibiliCrawler.search_by_keywords] No more videos for '{keyword}', moving to next keyword."
)
break break
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [] task_list = []
try: try:
task_list = [self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) for video_item in video_list] task_list = [
self.get_video_info_task(
aid=video_item.get("aid"), bvid="", semaphore=semaphore
)
for video_item in video_list
]
except Exception as e: except Exception as e:
utils.logger.warning(f"[BilibiliCrawler.search_by_keywords] error in the task list. The video for this page will not be included. {e}") utils.logger.warning(
f"[BilibiliCrawler.search_by_keywords] error in the task list. The video for this page will not be included. {e}"
)
video_items = await asyncio.gather(*task_list) video_items = await asyncio.gather(*task_list)
for video_item in video_items: for video_item in video_items:
if video_item: if video_item:
@@ -212,40 +248,74 @@ class BilibiliCrawler(AbstractCrawler):
Search bilibili video with keywords in a given time range. Search bilibili video with keywords in a given time range.
:param daily_limit: if True, strictly limit the number of notes per day and total. :param daily_limit: if True, strictly limit the number of notes per day and total.
""" """
utils.logger.info(f"[BilibiliCrawler.search_by_keywords_in_time_range] Begin search with daily_limit={daily_limit}") utils.logger.info(
f"[BilibiliCrawler.search_by_keywords_in_time_range] Begin search with daily_limit={daily_limit}"
)
bili_limit_count = 20 bili_limit_count = 20
start_page = config.START_PAGE start_page = config.START_PAGE
for keyword in config.KEYWORDS.split(","): for keyword in config.KEYWORDS.split(","):
source_keyword_var.set(keyword) source_keyword_var.set(keyword)
utils.logger.info(f"[BilibiliCrawler.search_by_keywords_in_time_range] Current search keyword: {keyword}") utils.logger.info(
f"[BilibiliCrawler.search_by_keywords_in_time_range] Current search keyword: {keyword}"
)
total_notes_crawled_for_keyword = 0 total_notes_crawled_for_keyword = 0
for day in pd.date_range(start=config.START_DAY, end=config.END_DAY, freq='D'): for day in pd.date_range(
if daily_limit and total_notes_crawled_for_keyword >= config.CRAWLER_MAX_NOTES_COUNT: start=config.START_DAY, end=config.END_DAY, freq="D"
utils.logger.info(f"[BilibiliCrawler.search] Reached CRAWLER_MAX_NOTES_COUNT limit for keyword '{keyword}', skipping remaining days.") ):
if (
daily_limit
and total_notes_crawled_for_keyword
>= config.CRAWLER_MAX_NOTES_COUNT
):
utils.logger.info(
f"[BilibiliCrawler.search] Reached CRAWLER_MAX_NOTES_COUNT limit for keyword '{keyword}', skipping remaining days."
)
break break
if not daily_limit and total_notes_crawled_for_keyword >= config.CRAWLER_MAX_NOTES_COUNT: if (
utils.logger.info(f"[BilibiliCrawler.search] Reached CRAWLER_MAX_NOTES_COUNT limit for keyword '{keyword}', skipping remaining days.") not daily_limit
and total_notes_crawled_for_keyword
>= config.CRAWLER_MAX_NOTES_COUNT
):
utils.logger.info(
f"[BilibiliCrawler.search] Reached CRAWLER_MAX_NOTES_COUNT limit for keyword '{keyword}', skipping remaining days."
)
break break
pubtime_begin_s, pubtime_end_s = await self.get_pubtime_datetime(start=day.strftime('%Y-%m-%d'), end=day.strftime('%Y-%m-%d')) pubtime_begin_s, pubtime_end_s = await self.get_pubtime_datetime(
start=day.strftime("%Y-%m-%d"), end=day.strftime("%Y-%m-%d")
)
page = 1 page = 1
notes_count_this_day = 0 notes_count_this_day = 0
while True: while True:
if notes_count_this_day >= config.MAX_NOTES_PER_DAY: if notes_count_this_day >= config.MAX_NOTES_PER_DAY:
utils.logger.info(f"[BilibiliCrawler.search] Reached MAX_NOTES_PER_DAY limit for {day.ctime()}.") utils.logger.info(
f"[BilibiliCrawler.search] Reached MAX_NOTES_PER_DAY limit for {day.ctime()}."
)
break break
if daily_limit and total_notes_crawled_for_keyword >= config.CRAWLER_MAX_NOTES_COUNT: if (
utils.logger.info(f"[BilibiliCrawler.search] Reached CRAWLER_MAX_NOTES_COUNT limit for keyword '{keyword}'.") daily_limit
and total_notes_crawled_for_keyword
>= config.CRAWLER_MAX_NOTES_COUNT
):
utils.logger.info(
f"[BilibiliCrawler.search] Reached CRAWLER_MAX_NOTES_COUNT limit for keyword '{keyword}'."
)
break break
if not daily_limit and total_notes_crawled_for_keyword >= config.CRAWLER_MAX_NOTES_COUNT: if (
not daily_limit
and total_notes_crawled_for_keyword
>= config.CRAWLER_MAX_NOTES_COUNT
):
break break
try: try:
utils.logger.info(f"[BilibiliCrawler.search] search bilibili keyword: {keyword}, date: {day.ctime()}, page: {page}") utils.logger.info(
f"[BilibiliCrawler.search] search bilibili keyword: {keyword}, date: {day.ctime()}, page: {page}"
)
video_id_list: List[str] = [] video_id_list: List[str] = []
videos_res = await self.bili_client.search_video_by_keyword( videos_res = await self.bili_client.search_video_by_keyword(
keyword=keyword, keyword=keyword,
@@ -253,23 +323,38 @@ class BilibiliCrawler(AbstractCrawler):
page_size=bili_limit_count, page_size=bili_limit_count,
order=SearchOrderType.DEFAULT, order=SearchOrderType.DEFAULT,
pubtime_begin_s=pubtime_begin_s, pubtime_begin_s=pubtime_begin_s,
pubtime_end_s=pubtime_end_s pubtime_end_s=pubtime_end_s,
) )
video_list: List[Dict] = videos_res.get("result") video_list: List[Dict] = videos_res.get("result")
if not video_list: if not video_list:
utils.logger.info(f"[BilibiliCrawler.search] No more videos for '{keyword}' on {day.ctime()}, moving to next day.") utils.logger.info(
f"[BilibiliCrawler.search] No more videos for '{keyword}' on {day.ctime()}, moving to next day."
)
break break
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) for video_item in video_list] task_list = [
self.get_video_info_task(
aid=video_item.get("aid"), bvid="", semaphore=semaphore
)
for video_item in video_list
]
video_items = await asyncio.gather(*task_list) video_items = await asyncio.gather(*task_list)
for video_item in video_items: for video_item in video_items:
if video_item: if video_item:
if daily_limit and total_notes_crawled_for_keyword >= config.CRAWLER_MAX_NOTES_COUNT: if (
daily_limit
and total_notes_crawled_for_keyword
>= config.CRAWLER_MAX_NOTES_COUNT
):
break break
if not daily_limit and total_notes_crawled_for_keyword >= config.CRAWLER_MAX_NOTES_COUNT: if (
not daily_limit
and total_notes_crawled_for_keyword
>= config.CRAWLER_MAX_NOTES_COUNT
):
break break
if notes_count_this_day >= config.MAX_NOTES_PER_DAY: if notes_count_this_day >= config.MAX_NOTES_PER_DAY:
break break
@@ -284,7 +369,9 @@ class BilibiliCrawler(AbstractCrawler):
await self.batch_get_video_comments(video_id_list) await self.batch_get_video_comments(video_id_list)
except Exception as e: except Exception as e:
utils.logger.error(f"[BilibiliCrawler.search] Error searching on {day.ctime()}: {e}") utils.logger.error(
f"[BilibiliCrawler.search] Error searching on {day.ctime()}: {e}"
)
break break
async def batch_get_video_comments(self, video_id_list: List[str]): async def batch_get_video_comments(self, video_id_list: List[str]):
@@ -295,16 +382,19 @@ class BilibiliCrawler(AbstractCrawler):
""" """
if not config.ENABLE_GET_COMMENTS: if not config.ENABLE_GET_COMMENTS:
utils.logger.info( utils.logger.info(
f"[BilibiliCrawler.batch_get_note_comments] Crawling comment mode is not enabled") f"[BilibiliCrawler.batch_get_note_comments] Crawling comment mode is not enabled"
)
return return
utils.logger.info( utils.logger.info(
f"[BilibiliCrawler.batch_get_video_comments] video ids:{video_id_list}") f"[BilibiliCrawler.batch_get_video_comments] video ids:{video_id_list}"
)
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list: List[Task] = [] task_list: List[Task] = []
for video_id in video_id_list: for video_id in video_id_list:
task = asyncio.create_task(self.get_comments( task = asyncio.create_task(
video_id, semaphore), name=video_id) self.get_comments(video_id, semaphore), name=video_id
)
task_list.append(task) task_list.append(task)
await asyncio.gather(*task_list) await asyncio.gather(*task_list)
@@ -318,7 +408,8 @@ class BilibiliCrawler(AbstractCrawler):
async with semaphore: async with semaphore:
try: try:
utils.logger.info( utils.logger.info(
f"[BilibiliCrawler.get_comments] begin get video_id: {video_id} comments ...") f"[BilibiliCrawler.get_comments] begin get video_id: {video_id} comments ..."
)
await asyncio.sleep(random.uniform(0.5, 1.5)) await asyncio.sleep(random.uniform(0.5, 1.5))
await self.bili_client.get_video_all_comments( await self.bili_client.get_video_all_comments(
video_id=video_id, video_id=video_id,
@@ -330,10 +421,12 @@ class BilibiliCrawler(AbstractCrawler):
except DataFetchError as ex: except DataFetchError as ex:
utils.logger.error( utils.logger.error(
f"[BilibiliCrawler.get_comments] get video_id: {video_id} comment error: {ex}") f"[BilibiliCrawler.get_comments] get video_id: {video_id} comment error: {ex}"
)
except Exception as e: except Exception as e:
utils.logger.error( utils.logger.error(
f"[BilibiliCrawler.get_comments] may be been blocked, err:{e}") f"[BilibiliCrawler.get_comments] may be been blocked, err:{e}"
)
# Propagate the exception to be caught by the main loop # Propagate the exception to be caught by the main loop
raise raise
@@ -360,8 +453,8 @@ class BilibiliCrawler(AbstractCrawler):
""" """
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [ task_list = [
self.get_video_info_task(aid=0, bvid=video_id, semaphore=semaphore) for video_id in self.get_video_info_task(aid=0, bvid=video_id, semaphore=semaphore)
bvids_list for video_id in bvids_list
] ]
video_details = await asyncio.gather(*task_list) video_details = await asyncio.gather(*task_list)
video_aids_list = [] video_aids_list = []
@@ -376,7 +469,9 @@ class BilibiliCrawler(AbstractCrawler):
await self.get_bilibili_video(video_detail, semaphore) await self.get_bilibili_video(video_detail, semaphore)
await self.batch_get_video_comments(video_aids_list) await self.batch_get_video_comments(video_aids_list)
async def get_video_info_task(self, aid: int, bvid: str, semaphore: asyncio.Semaphore) -> Optional[Dict]: async def get_video_info_task(
self, aid: int, bvid: str, semaphore: asyncio.Semaphore
) -> Optional[Dict]:
""" """
Get video detail task Get video detail task
:param aid: :param aid:
@@ -390,14 +485,18 @@ class BilibiliCrawler(AbstractCrawler):
return result return result
except DataFetchError as ex: except DataFetchError as ex:
utils.logger.error( utils.logger.error(
f"[BilibiliCrawler.get_video_info_task] Get video detail error: {ex}") f"[BilibiliCrawler.get_video_info_task] Get video detail error: {ex}"
)
return None return None
except KeyError as ex: except KeyError as ex:
utils.logger.error( utils.logger.error(
f"[BilibiliCrawler.get_video_info_task] have not fund note detail video_id:{bvid}, err: {ex}") f"[BilibiliCrawler.get_video_info_task] have not fund note detail video_id:{bvid}, err: {ex}"
)
return None return None
async def get_video_play_url_task(self, aid: int, cid: int, semaphore: asyncio.Semaphore) -> Union[Dict, None]: async def get_video_play_url_task(
self, aid: int, cid: int, semaphore: asyncio.Semaphore
) -> Union[Dict, None]:
""" """
Get video play url Get video play url
:param aid: :param aid:
@@ -411,22 +510,29 @@ class BilibiliCrawler(AbstractCrawler):
return result return result
except DataFetchError as ex: except DataFetchError as ex:
utils.logger.error( utils.logger.error(
f"[BilibiliCrawler.get_video_play_url_task] Get video play url error: {ex}") f"[BilibiliCrawler.get_video_play_url_task] Get video play url error: {ex}"
)
return None return None
except KeyError as ex: except KeyError as ex:
utils.logger.error( utils.logger.error(
f"[BilibiliCrawler.get_video_play_url_task] have not fund play url from :{aid}|{cid}, err: {ex}") f"[BilibiliCrawler.get_video_play_url_task] have not fund play url from :{aid}|{cid}, err: {ex}"
)
return None return None
async def create_bilibili_client(self, httpx_proxy: Optional[str]) -> BilibiliClient: async def create_bilibili_client(
self, httpx_proxy: Optional[str]
) -> BilibiliClient:
""" """
create bilibili client create bilibili client
:param httpx_proxy: httpx proxy :param httpx_proxy: httpx proxy
:return: bilibili client :return: bilibili client
""" """
utils.logger.info( utils.logger.info(
"[BilibiliCrawler.create_bilibili_client] Begin create bilibili API client ...") "[BilibiliCrawler.create_bilibili_client] Begin create bilibili API client ..."
cookie_str, cookie_dict = utils.convert_cookies(await self.browser_context.cookies()) )
cookie_str, cookie_dict = utils.convert_cookies(
await self.browser_context.cookies()
)
bilibili_client_obj = BilibiliClient( bilibili_client_obj = BilibiliClient(
proxies=httpx_proxy, proxies=httpx_proxy,
headers={ headers={
@@ -434,7 +540,7 @@ class BilibiliCrawler(AbstractCrawler):
"Cookie": cookie_str, "Cookie": cookie_str,
"Origin": "https://www.bilibili.com", "Origin": "https://www.bilibili.com",
"Referer": "https://www.bilibili.com", "Referer": "https://www.bilibili.com",
"Content-Type": "application/json;charset=UTF-8" "Content-Type": "application/json;charset=UTF-8",
}, },
playwright_page=self.context_page, playwright_page=self.context_page,
cookie_dict=cookie_dict, cookie_dict=cookie_dict,
@@ -442,7 +548,9 @@ class BilibiliCrawler(AbstractCrawler):
return bilibili_client_obj return bilibili_client_obj
@staticmethod @staticmethod
def format_proxy_info(ip_proxy_info: IpInfoModel) -> Tuple[Optional[Dict], Optional[Dict]]: def format_proxy_info(
ip_proxy_info: IpInfoModel,
) -> Tuple[Optional[Dict], Optional[Dict]]:
""" """
format proxy info for playwright and httpx format proxy info for playwright and httpx
:param ip_proxy_info: ip proxy info :param ip_proxy_info: ip proxy info
@@ -463,7 +571,7 @@ class BilibiliCrawler(AbstractCrawler):
chromium: BrowserType, chromium: BrowserType,
playwright_proxy: Optional[Dict], playwright_proxy: Optional[Dict],
user_agent: Optional[str], user_agent: Optional[str],
headless: bool = True headless: bool = True,
) -> BrowserContext: ) -> BrowserContext:
""" """
launch browser and create browser context launch browser and create browser context
@@ -474,32 +582,38 @@ class BilibiliCrawler(AbstractCrawler):
:return: browser context :return: browser context
""" """
utils.logger.info( utils.logger.info(
"[BilibiliCrawler.launch_browser] Begin create browser context ...") "[BilibiliCrawler.launch_browser] Begin create browser context ..."
)
if config.SAVE_LOGIN_STATE: if config.SAVE_LOGIN_STATE:
# feat issue #14 # feat issue #14
# we will save login state to avoid login every time # we will save login state to avoid login every time
user_data_dir = os.path.join(os.getcwd(), "browser_data", user_data_dir = os.path.join(
config.USER_DATA_DIR % config.PLATFORM) # type: ignore os.getcwd(), "browser_data", config.USER_DATA_DIR % config.PLATFORM
) # type: ignore
browser_context = await chromium.launch_persistent_context( browser_context = await chromium.launch_persistent_context(
user_data_dir=user_data_dir, user_data_dir=user_data_dir,
accept_downloads=True, accept_downloads=True,
headless=headless, headless=headless,
proxy=playwright_proxy, # type: ignore proxy=playwright_proxy, # type: ignore
viewport={"width": 1920, "height": 1080}, viewport={"width": 1920, "height": 1080},
user_agent=user_agent user_agent=user_agent,
) )
return browser_context return browser_context
else: else:
# type: ignore # type: ignore
browser = await chromium.launch(headless=headless, proxy=playwright_proxy) browser = await chromium.launch(headless=headless, proxy=playwright_proxy)
browser_context = await browser.new_context( browser_context = await browser.new_context(
viewport={"width": 1920, "height": 1080}, viewport={"width": 1920, "height": 1080}, user_agent=user_agent
user_agent=user_agent
) )
return browser_context return browser_context
async def launch_browser_with_cdp(self, playwright: Playwright, playwright_proxy: Optional[Dict], async def launch_browser_with_cdp(
user_agent: Optional[str], headless: bool = True) -> BrowserContext: self,
playwright: Playwright,
playwright_proxy: Optional[Dict],
user_agent: Optional[str],
headless: bool = True,
) -> BrowserContext:
""" """
使用CDP模式启动浏览器 使用CDP模式启动浏览器
""" """
@@ -509,7 +623,7 @@ class BilibiliCrawler(AbstractCrawler):
playwright=playwright, playwright=playwright,
playwright_proxy=playwright_proxy, playwright_proxy=playwright_proxy,
user_agent=user_agent, user_agent=user_agent,
headless=headless headless=headless,
) )
# 显示浏览器信息 # 显示浏览器信息
@@ -519,10 +633,14 @@ class BilibiliCrawler(AbstractCrawler):
return browser_context return browser_context
except Exception as e: except Exception as e:
utils.logger.error(f"[BilibiliCrawler] CDP模式启动失败回退到标准模式: {e}") utils.logger.error(
f"[BilibiliCrawler] CDP模式启动失败回退到标准模式: {e}"
)
# 回退到标准模式 # 回退到标准模式
chromium = playwright.chromium chromium = playwright.chromium
return await self.launch_browser(chromium, playwright_proxy, user_agent, headless) return await self.launch_browser(
chromium, playwright_proxy, user_agent, headless
)
async def close(self): async def close(self):
"""Close browser context""" """Close browser context"""
@@ -535,9 +653,13 @@ class BilibiliCrawler(AbstractCrawler):
await self.browser_context.close() await self.browser_context.close()
utils.logger.info("[BilibiliCrawler.close] Browser context closed ...") utils.logger.info("[BilibiliCrawler.close] Browser context closed ...")
except TargetClosedError: except TargetClosedError:
utils.logger.warning("[BilibiliCrawler.close] Browser context was already closed.") utils.logger.warning(
"[BilibiliCrawler.close] Browser context was already closed."
)
except Exception as e: except Exception as e:
utils.logger.error(f"[BilibiliCrawler.close] An error occurred during close: {e}") utils.logger.error(
f"[BilibiliCrawler.close] An error occurred during close: {e}"
)
async def get_bilibili_video(self, video_item: Dict, semaphore: asyncio.Semaphore): async def get_bilibili_video(self, video_item: Dict, semaphore: asyncio.Semaphore):
""" """
@@ -547,14 +669,18 @@ class BilibiliCrawler(AbstractCrawler):
:return: :return:
""" """
if not config.ENABLE_GET_IMAGES: if not config.ENABLE_GET_IMAGES:
utils.logger.info(f"[BilibiliCrawler.get_bilibili_video] Crawling image mode is not enabled") utils.logger.info(
f"[BilibiliCrawler.get_bilibili_video] Crawling image mode is not enabled"
)
return return
video_item_view: Dict = video_item.get("View") video_item_view: Dict = video_item.get("View")
aid = video_item_view.get("aid") aid = video_item_view.get("aid")
cid = video_item_view.get("cid") cid = video_item_view.get("cid")
result = await self.get_video_play_url_task(aid, cid, semaphore) result = await self.get_video_play_url_task(aid, cid, semaphore)
if result is None: if result is None:
utils.logger.info("[BilibiliCrawler.get_bilibili_video] get video play url failed") utils.logger.info(
"[BilibiliCrawler.get_bilibili_video] get video play url failed"
)
return return
durl_list = result.get("durl") durl_list = result.get("durl")
max_size = -1 max_size = -1
@@ -565,7 +691,9 @@ class BilibiliCrawler(AbstractCrawler):
max_size = size max_size = size
video_url = durl.get("url") video_url = durl.get("url")
if video_url == "": if video_url == "":
utils.logger.info("[BilibiliCrawler.get_bilibili_video] get video url failed") utils.logger.info(
"[BilibiliCrawler.get_bilibili_video] get video url failed"
)
return return
content = await self.bili_client.get_video_media(video_url) content = await self.bili_client.get_video_media(video_url)
@@ -579,20 +707,24 @@ class BilibiliCrawler(AbstractCrawler):
creator_id_list: get details for creator from creator_id_list creator_id_list: get details for creator from creator_id_list
""" """
utils.logger.info( utils.logger.info(
f"[BilibiliCrawler.get_creator_details] Crawling the detalis of creator") f"[BilibiliCrawler.get_creator_details] Crawling the detalis of creator"
)
utils.logger.info( utils.logger.info(
f"[BilibiliCrawler.get_creator_details] creator ids:{creator_id_list}") f"[BilibiliCrawler.get_creator_details] creator ids:{creator_id_list}"
)
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list: List[Task] = [] task_list: List[Task] = []
try: try:
for creator_id in creator_id_list: for creator_id in creator_id_list:
task = asyncio.create_task(self.get_creator_details( task = asyncio.create_task(
creator_id, semaphore), name=creator_id) self.get_creator_details(creator_id, semaphore), name=creator_id
)
task_list.append(task) task_list.append(task)
except Exception as e: except Exception as e:
utils.logger.warning( utils.logger.warning(
f"[BilibiliCrawler.get_all_creator_details] error in the task list. The creator will not be included. {e}") f"[BilibiliCrawler.get_all_creator_details] error in the task list. The creator will not be included. {e}"
)
await asyncio.gather(*task_list) await asyncio.gather(*task_list)
@@ -604,7 +736,9 @@ class BilibiliCrawler(AbstractCrawler):
:return: :return:
""" """
async with semaphore: async with semaphore:
creator_unhandled_info: Dict = await self.bili_client.get_creator_info(creator_id) creator_unhandled_info: Dict = await self.bili_client.get_creator_info(
creator_id
)
creator_info: Dict = { creator_info: Dict = {
"id": creator_id, "id": creator_id,
"name": creator_unhandled_info.get("name"), "name": creator_unhandled_info.get("name"),
@@ -626,7 +760,8 @@ class BilibiliCrawler(AbstractCrawler):
async with semaphore: async with semaphore:
try: try:
utils.logger.info( utils.logger.info(
f"[BilibiliCrawler.get_fans] begin get creator_id: {creator_id} fans ...") f"[BilibiliCrawler.get_fans] begin get creator_id: {creator_id} fans ..."
)
await self.bili_client.get_creator_all_fans( await self.bili_client.get_creator_all_fans(
creator_info=creator_info, creator_info=creator_info,
crawl_interval=random.random(), crawl_interval=random.random(),
@@ -636,10 +771,12 @@ class BilibiliCrawler(AbstractCrawler):
except DataFetchError as ex: except DataFetchError as ex:
utils.logger.error( utils.logger.error(
f"[BilibiliCrawler.get_fans] get creator_id: {creator_id} fans error: {ex}") f"[BilibiliCrawler.get_fans] get creator_id: {creator_id} fans error: {ex}"
)
except Exception as e: except Exception as e:
utils.logger.error( utils.logger.error(
f"[BilibiliCrawler.get_fans] may be been blocked, err:{e}") f"[BilibiliCrawler.get_fans] may be been blocked, err:{e}"
)
async def get_followings(self, creator_info: Dict, semaphore: asyncio.Semaphore): async def get_followings(self, creator_info: Dict, semaphore: asyncio.Semaphore):
""" """
@@ -652,7 +789,8 @@ class BilibiliCrawler(AbstractCrawler):
async with semaphore: async with semaphore:
try: try:
utils.logger.info( utils.logger.info(
f"[BilibiliCrawler.get_followings] begin get creator_id: {creator_id} followings ...") f"[BilibiliCrawler.get_followings] begin get creator_id: {creator_id} followings ..."
)
await self.bili_client.get_creator_all_followings( await self.bili_client.get_creator_all_followings(
creator_info=creator_info, creator_info=creator_info,
crawl_interval=random.random(), crawl_interval=random.random(),
@@ -662,10 +800,12 @@ class BilibiliCrawler(AbstractCrawler):
except DataFetchError as ex: except DataFetchError as ex:
utils.logger.error( utils.logger.error(
f"[BilibiliCrawler.get_followings] get creator_id: {creator_id} followings error: {ex}") f"[BilibiliCrawler.get_followings] get creator_id: {creator_id} followings error: {ex}"
)
except Exception as e: except Exception as e:
utils.logger.error( utils.logger.error(
f"[BilibiliCrawler.get_followings] may be been blocked, err:{e}") f"[BilibiliCrawler.get_followings] may be been blocked, err:{e}"
)
async def get_dynamics(self, creator_info: Dict, semaphore: asyncio.Semaphore): async def get_dynamics(self, creator_info: Dict, semaphore: asyncio.Semaphore):
""" """
@@ -678,7 +818,8 @@ class BilibiliCrawler(AbstractCrawler):
async with semaphore: async with semaphore:
try: try:
utils.logger.info( utils.logger.info(
f"[BilibiliCrawler.get_dynamics] begin get creator_id: {creator_id} dynamics ...") f"[BilibiliCrawler.get_dynamics] begin get creator_id: {creator_id} dynamics ..."
)
await self.bili_client.get_creator_all_dynamics( await self.bili_client.get_creator_all_dynamics(
creator_info=creator_info, creator_info=creator_info,
crawl_interval=random.random(), crawl_interval=random.random(),
@@ -688,7 +829,9 @@ class BilibiliCrawler(AbstractCrawler):
except DataFetchError as ex: except DataFetchError as ex:
utils.logger.error( utils.logger.error(
f"[BilibiliCrawler.get_dynamics] get creator_id: {creator_id} dynamics error: {ex}") f"[BilibiliCrawler.get_dynamics] get creator_id: {creator_id} dynamics error: {ex}"
)
except Exception as e: except Exception as e:
utils.logger.error( utils.logger.error(
f"[BilibiliCrawler.get_dynamics] may be been blocked, err:{e}") f"[BilibiliCrawler.get_dynamics] may be been blocked, err:{e}"
)

View File

@@ -15,10 +15,15 @@ import random
from asyncio import Task from asyncio import Task
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple
from playwright.async_api import (BrowserContext, BrowserType, Page, Playwright, from playwright.async_api import (
async_playwright) BrowserContext,
BrowserType,
Page,
Playwright,
async_playwright,
)
from . import config import config
from base.base_crawler import AbstractCrawler from base.base_crawler import AbstractCrawler
from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
from store import douyin as douyin_store from store import douyin as douyin_store
@@ -45,17 +50,23 @@ class DouYinCrawler(AbstractCrawler):
async def start(self) -> None: async def start(self) -> None:
playwright_proxy_format, httpx_proxy_format = None, None playwright_proxy_format, httpx_proxy_format = None, None
if config.ENABLE_IP_PROXY: if config.ENABLE_IP_PROXY:
ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True) ip_proxy_pool = await create_ip_pool(
config.IP_PROXY_POOL_COUNT, enable_validate_ip=True
)
ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy() ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy()
playwright_proxy_format, httpx_proxy_format = self.format_proxy_info(ip_proxy_info) playwright_proxy_format, httpx_proxy_format = self.format_proxy_info(
ip_proxy_info
)
async with async_playwright() as playwright: async with async_playwright() as playwright:
# 根据配置选择启动模式 # 根据配置选择启动模式
if config.ENABLE_CDP_MODE: if config.ENABLE_CDP_MODE:
utils.logger.info("[DouYinCrawler] 使用CDP模式启动浏览器") utils.logger.info("[DouYinCrawler] 使用CDP模式启动浏览器")
self.browser_context = await self.launch_browser_with_cdp( self.browser_context = await self.launch_browser_with_cdp(
playwright, playwright_proxy_format, None, playwright,
headless=config.CDP_HEADLESS playwright_proxy_format,
None,
headless=config.CDP_HEADLESS,
) )
else: else:
utils.logger.info("[DouYinCrawler] 使用标准模式启动浏览器") utils.logger.info("[DouYinCrawler] 使用标准模式启动浏览器")
@@ -65,7 +76,7 @@ class DouYinCrawler(AbstractCrawler):
chromium, chromium,
playwright_proxy_format, playwright_proxy_format,
user_agent=None, user_agent=None,
headless=config.HEADLESS headless=config.HEADLESS,
) )
# stealth.min.js is a js script to prevent the website from detecting the crawler. # stealth.min.js is a js script to prevent the website from detecting the crawler.
await self.browser_context.add_init_script(path="libs/stealth.min.js") await self.browser_context.add_init_script(path="libs/stealth.min.js")
@@ -79,10 +90,12 @@ class DouYinCrawler(AbstractCrawler):
login_phone="", # you phone number login_phone="", # you phone number
browser_context=self.browser_context, browser_context=self.browser_context,
context_page=self.context_page, context_page=self.context_page,
cookie_str=config.COOKIES cookie_str=config.COOKIES,
) )
await login_obj.begin() await login_obj.begin()
await self.dy_client.update_cookies(browser_context=self.browser_context) await self.dy_client.update_cookies(
browser_context=self.browser_context
)
crawler_type_var.set(config.CRAWLER_TYPE) crawler_type_var.set(config.CRAWLER_TYPE)
if config.CRAWLER_TYPE == "search": if config.CRAWLER_TYPE == "search":
# Search for notes and retrieve their comment information. # Search for notes and retrieve their comment information.
@@ -108,47 +121,62 @@ class DouYinCrawler(AbstractCrawler):
aweme_list: List[str] = [] aweme_list: List[str] = []
page = 0 page = 0
dy_search_id = "" dy_search_id = ""
while (page - start_page + 1) * dy_limit_count <= config.CRAWLER_MAX_NOTES_COUNT: while (
page - start_page + 1
) * dy_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
if page < start_page: if page < start_page:
utils.logger.info(f"[DouYinCrawler.search] Skip {page}") utils.logger.info(f"[DouYinCrawler.search] Skip {page}")
page += 1 page += 1
continue continue
try: try:
utils.logger.info(f"[DouYinCrawler.search] search douyin keyword: {keyword}, page: {page}") utils.logger.info(
posts_res = await self.dy_client.search_info_by_keyword(keyword=keyword, f"[DouYinCrawler.search] search douyin keyword: {keyword}, page: {page}"
)
posts_res = await self.dy_client.search_info_by_keyword(
keyword=keyword,
offset=page * dy_limit_count - dy_limit_count, offset=page * dy_limit_count - dy_limit_count,
publish_time=PublishTimeType(config.PUBLISH_TIME_TYPE), publish_time=PublishTimeType(config.PUBLISH_TIME_TYPE),
search_id=dy_search_id search_id=dy_search_id,
) )
if posts_res.get("data") is None or posts_res.get("data") == []: if posts_res.get("data") is None or posts_res.get("data") == []:
utils.logger.info(f"[DouYinCrawler.search] search douyin keyword: {keyword}, page: {page} is empty,{posts_res.get('data')}`") utils.logger.info(
f"[DouYinCrawler.search] search douyin keyword: {keyword}, page: {page} is empty,{posts_res.get('data')}`"
)
break break
except DataFetchError: except DataFetchError:
utils.logger.error(f"[DouYinCrawler.search] search douyin keyword: {keyword} failed") utils.logger.error(
f"[DouYinCrawler.search] search douyin keyword: {keyword} failed"
)
break break
page += 1 page += 1
if "data" not in posts_res: if "data" not in posts_res:
utils.logger.error( utils.logger.error(
f"[DouYinCrawler.search] search douyin keyword: {keyword} failed账号也许被风控了。") f"[DouYinCrawler.search] search douyin keyword: {keyword} failed账号也许被风控了。"
)
break break
dy_search_id = posts_res.get("extra", {}).get("logid", "") dy_search_id = posts_res.get("extra", {}).get("logid", "")
for post_item in posts_res.get("data"): for post_item in posts_res.get("data"):
try: try:
aweme_info: Dict = post_item.get("aweme_info") or \ aweme_info: Dict = (
post_item.get("aweme_mix_info", {}).get("mix_items")[0] post_item.get("aweme_info")
or post_item.get("aweme_mix_info", {}).get("mix_items")[0]
)
except TypeError: except TypeError:
continue continue
aweme_list.append(aweme_info.get("aweme_id", "")) aweme_list.append(aweme_info.get("aweme_id", ""))
await douyin_store.update_douyin_aweme(aweme_item=aweme_info) await douyin_store.update_douyin_aweme(aweme_item=aweme_info)
utils.logger.info(f"[DouYinCrawler.search] keyword:{keyword}, aweme_list:{aweme_list}") utils.logger.info(
f"[DouYinCrawler.search] keyword:{keyword}, aweme_list:{aweme_list}"
)
await self.batch_get_note_comments(aweme_list) await self.batch_get_note_comments(aweme_list)
async def get_specified_awemes(self): async def get_specified_awemes(self):
"""Get the information and comments of the specified post""" """Get the information and comments of the specified post"""
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [ task_list = [
self.get_aweme_detail(aweme_id=aweme_id, semaphore=semaphore) for aweme_id in config.DY_SPECIFIED_ID_LIST self.get_aweme_detail(aweme_id=aweme_id, semaphore=semaphore)
for aweme_id in config.DY_SPECIFIED_ID_LIST
] ]
aweme_details = await asyncio.gather(*task_list) aweme_details = await asyncio.gather(*task_list)
for aweme_detail in aweme_details: for aweme_detail in aweme_details:
@@ -156,17 +184,22 @@ class DouYinCrawler(AbstractCrawler):
await douyin_store.update_douyin_aweme(aweme_detail) await douyin_store.update_douyin_aweme(aweme_detail)
await self.batch_get_note_comments(config.DY_SPECIFIED_ID_LIST) await self.batch_get_note_comments(config.DY_SPECIFIED_ID_LIST)
async def get_aweme_detail(self, aweme_id: str, semaphore: asyncio.Semaphore) -> Any: async def get_aweme_detail(
self, aweme_id: str, semaphore: asyncio.Semaphore
) -> Any:
"""Get note detail""" """Get note detail"""
async with semaphore: async with semaphore:
try: try:
return await self.dy_client.get_video_by_id(aweme_id) return await self.dy_client.get_video_by_id(aweme_id)
except DataFetchError as ex: except DataFetchError as ex:
utils.logger.error(f"[DouYinCrawler.get_aweme_detail] Get aweme detail error: {ex}") utils.logger.error(
f"[DouYinCrawler.get_aweme_detail] Get aweme detail error: {ex}"
)
return None return None
except KeyError as ex: except KeyError as ex:
utils.logger.error( utils.logger.error(
f"[DouYinCrawler.get_aweme_detail] have not fund note detail aweme_id:{aweme_id}, err: {ex}") f"[DouYinCrawler.get_aweme_detail] have not fund note detail aweme_id:{aweme_id}, err: {ex}"
)
return None return None
async def batch_get_note_comments(self, aweme_list: List[str]) -> None: async def batch_get_note_comments(self, aweme_list: List[str]) -> None:
@@ -174,14 +207,17 @@ class DouYinCrawler(AbstractCrawler):
Batch get note comments Batch get note comments
""" """
if not config.ENABLE_GET_COMMENTS: if not config.ENABLE_GET_COMMENTS:
utils.logger.info(f"[DouYinCrawler.batch_get_note_comments] Crawling comment mode is not enabled") utils.logger.info(
f"[DouYinCrawler.batch_get_note_comments] Crawling comment mode is not enabled"
)
return return
task_list: List[Task] = [] task_list: List[Task] = []
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
for aweme_id in aweme_list: for aweme_id in aweme_list:
task = asyncio.create_task( task = asyncio.create_task(
self.get_comments(aweme_id, semaphore), name=aweme_id) self.get_comments(aweme_id, semaphore), name=aweme_id
)
task_list.append(task) task_list.append(task)
if len(task_list) > 0: if len(task_list) > 0:
await asyncio.wait(task_list) await asyncio.wait(task_list)
@@ -195,18 +231,23 @@ class DouYinCrawler(AbstractCrawler):
crawl_interval=random.random(), crawl_interval=random.random(),
is_fetch_sub_comments=config.ENABLE_GET_SUB_COMMENTS, is_fetch_sub_comments=config.ENABLE_GET_SUB_COMMENTS,
callback=douyin_store.batch_update_dy_aweme_comments, callback=douyin_store.batch_update_dy_aweme_comments,
max_count=config.CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES max_count=config.CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES,
) )
utils.logger.info( utils.logger.info(
f"[DouYinCrawler.get_comments] aweme_id: {aweme_id} comments have all been obtained and filtered ...") f"[DouYinCrawler.get_comments] aweme_id: {aweme_id} comments have all been obtained and filtered ..."
)
except DataFetchError as e: except DataFetchError as e:
utils.logger.error(f"[DouYinCrawler.get_comments] aweme_id: {aweme_id} get comments failed, error: {e}") utils.logger.error(
f"[DouYinCrawler.get_comments] aweme_id: {aweme_id} get comments failed, error: {e}"
)
async def get_creators_and_videos(self) -> None: async def get_creators_and_videos(self) -> None:
""" """
Get the information and videos of the specified creator Get the information and videos of the specified creator
""" """
utils.logger.info("[DouYinCrawler.get_creators_and_videos] Begin get douyin creators") utils.logger.info(
"[DouYinCrawler.get_creators_and_videos] Begin get douyin creators"
)
for user_id in config.DY_CREATOR_ID_LIST: for user_id in config.DY_CREATOR_ID_LIST:
creator_info: Dict = await self.dy_client.get_user_info(user_id) creator_info: Dict = await self.dy_client.get_user_info(user_id)
if creator_info: if creator_info:
@@ -214,8 +255,7 @@ class DouYinCrawler(AbstractCrawler):
# Get all video information of the creator # Get all video information of the creator
all_video_list = await self.dy_client.get_all_user_aweme_posts( all_video_list = await self.dy_client.get_all_user_aweme_posts(
sec_user_id=user_id, sec_user_id=user_id, callback=self.fetch_creator_video_detail
callback=self.fetch_creator_video_detail
) )
video_ids = [video_item.get("aweme_id") for video_item in all_video_list] video_ids = [video_item.get("aweme_id") for video_item in all_video_list]
@@ -227,7 +267,8 @@ class DouYinCrawler(AbstractCrawler):
""" """
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [ task_list = [
self.get_aweme_detail(post_item.get("aweme_id"), semaphore) for post_item in video_list self.get_aweme_detail(post_item.get("aweme_id"), semaphore)
for post_item in video_list
] ]
note_details = await asyncio.gather(*task_list) note_details = await asyncio.gather(*task_list)
@@ -236,7 +277,9 @@ class DouYinCrawler(AbstractCrawler):
await douyin_store.update_douyin_aweme(aweme_item) await douyin_store.update_douyin_aweme(aweme_item)
@staticmethod @staticmethod
def format_proxy_info(ip_proxy_info: IpInfoModel) -> Tuple[Optional[Dict], Optional[Dict]]: def format_proxy_info(
ip_proxy_info: IpInfoModel,
) -> Tuple[Optional[Dict], Optional[Dict]]:
"""format proxy info for playwright and httpx""" """format proxy info for playwright and httpx"""
playwright_proxy = { playwright_proxy = {
"server": f"{ip_proxy_info.protocol}{ip_proxy_info.ip}:{ip_proxy_info.port}", "server": f"{ip_proxy_info.protocol}{ip_proxy_info.ip}:{ip_proxy_info.port}",
@@ -254,12 +297,14 @@ class DouYinCrawler(AbstractCrawler):
douyin_client = DOUYINClient( douyin_client = DOUYINClient(
proxies=httpx_proxy, proxies=httpx_proxy,
headers={ headers={
"User-Agent": await self.context_page.evaluate("() => navigator.userAgent"), "User-Agent": await self.context_page.evaluate(
"() => navigator.userAgent"
),
"Cookie": cookie_str, "Cookie": cookie_str,
"Host": "www.douyin.com", "Host": "www.douyin.com",
"Origin": "https://www.douyin.com/", "Origin": "https://www.douyin.com/",
"Referer": "https://www.douyin.com/", "Referer": "https://www.douyin.com/",
"Content-Type": "application/json;charset=UTF-8" "Content-Type": "application/json;charset=UTF-8",
}, },
playwright_page=self.context_page, playwright_page=self.context_page,
cookie_dict=cookie_dict, cookie_dict=cookie_dict,
@@ -271,31 +316,36 @@ class DouYinCrawler(AbstractCrawler):
chromium: BrowserType, chromium: BrowserType,
playwright_proxy: Optional[Dict], playwright_proxy: Optional[Dict],
user_agent: Optional[str], user_agent: Optional[str],
headless: bool = True headless: bool = True,
) -> BrowserContext: ) -> BrowserContext:
"""Launch browser and create browser context""" """Launch browser and create browser context"""
if config.SAVE_LOGIN_STATE: if config.SAVE_LOGIN_STATE:
user_data_dir = os.path.join(os.getcwd(), "browser_data", user_data_dir = os.path.join(
config.USER_DATA_DIR % config.PLATFORM) # type: ignore os.getcwd(), "browser_data", config.USER_DATA_DIR % config.PLATFORM
) # type: ignore
browser_context = await chromium.launch_persistent_context( browser_context = await chromium.launch_persistent_context(
user_data_dir=user_data_dir, user_data_dir=user_data_dir,
accept_downloads=True, accept_downloads=True,
headless=headless, headless=headless,
proxy=playwright_proxy, # type: ignore proxy=playwright_proxy, # type: ignore
viewport={"width": 1920, "height": 1080}, viewport={"width": 1920, "height": 1080},
user_agent=user_agent user_agent=user_agent,
) # type: ignore ) # type: ignore
return browser_context return browser_context
else: else:
browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore
browser_context = await browser.new_context( browser_context = await browser.new_context(
viewport={"width": 1920, "height": 1080}, viewport={"width": 1920, "height": 1080}, user_agent=user_agent
user_agent=user_agent
) )
return browser_context return browser_context
async def launch_browser_with_cdp(self, playwright: Playwright, playwright_proxy: Optional[Dict], async def launch_browser_with_cdp(
user_agent: Optional[str], headless: bool = True) -> BrowserContext: self,
playwright: Playwright,
playwright_proxy: Optional[Dict],
user_agent: Optional[str],
headless: bool = True,
) -> BrowserContext:
""" """
使用CDP模式启动浏览器 使用CDP模式启动浏览器
""" """
@@ -305,7 +355,7 @@ class DouYinCrawler(AbstractCrawler):
playwright=playwright, playwright=playwright,
playwright_proxy=playwright_proxy, playwright_proxy=playwright_proxy,
user_agent=user_agent, user_agent=user_agent,
headless=headless headless=headless,
) )
# 添加反检测脚本 # 添加反检测脚本
@@ -321,7 +371,9 @@ class DouYinCrawler(AbstractCrawler):
utils.logger.error(f"[DouYinCrawler] CDP模式启动失败回退到标准模式: {e}") utils.logger.error(f"[DouYinCrawler] CDP模式启动失败回退到标准模式: {e}")
# 回退到标准模式 # 回退到标准模式
chromium = playwright.chromium chromium = playwright.chromium
return await self.launch_browser(chromium, playwright_proxy, user_agent, headless) return await self.launch_browser(
chromium, playwright_proxy, user_agent, headless
)
async def close(self) -> None: async def close(self) -> None:
"""Close browser context""" """Close browser context"""

View File

@@ -16,9 +16,15 @@ import time
from asyncio import Task from asyncio import Task
from typing import Dict, List, Optional, Tuple from typing import Dict, List, Optional, Tuple
from playwright.async_api import BrowserContext, BrowserType, Page, Playwright, async_playwright from playwright.async_api import (
BrowserContext,
BrowserType,
Page,
Playwright,
async_playwright,
)
from . import config import config
from base.base_crawler import AbstractCrawler from base.base_crawler import AbstractCrawler
from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
from store import kuaishou as kuaishou_store from store import kuaishou as kuaishou_store
@@ -58,8 +64,10 @@ class KuaishouCrawler(AbstractCrawler):
if config.ENABLE_CDP_MODE: if config.ENABLE_CDP_MODE:
utils.logger.info("[KuaishouCrawler] 使用CDP模式启动浏览器") utils.logger.info("[KuaishouCrawler] 使用CDP模式启动浏览器")
self.browser_context = await self.launch_browser_with_cdp( self.browser_context = await self.launch_browser_with_cdp(
playwright, playwright_proxy_format, self.user_agent, playwright,
headless=config.CDP_HEADLESS playwright_proxy_format,
self.user_agent,
headless=config.CDP_HEADLESS,
) )
else: else:
utils.logger.info("[KuaishouCrawler] 使用标准模式启动浏览器") utils.logger.info("[KuaishouCrawler] 使用标准模式启动浏览器")
@@ -319,8 +327,13 @@ class KuaishouCrawler(AbstractCrawler):
) )
return browser_context return browser_context
async def launch_browser_with_cdp(self, playwright: Playwright, playwright_proxy: Optional[Dict], async def launch_browser_with_cdp(
user_agent: Optional[str], headless: bool = True) -> BrowserContext: self,
playwright: Playwright,
playwright_proxy: Optional[Dict],
user_agent: Optional[str],
headless: bool = True,
) -> BrowserContext:
""" """
使用CDP模式启动浏览器 使用CDP模式启动浏览器
""" """
@@ -330,7 +343,7 @@ class KuaishouCrawler(AbstractCrawler):
playwright=playwright, playwright=playwright,
playwright_proxy=playwright_proxy, playwright_proxy=playwright_proxy,
user_agent=user_agent, user_agent=user_agent,
headless=headless headless=headless,
) )
# 显示浏览器信息 # 显示浏览器信息
@@ -340,10 +353,14 @@ class KuaishouCrawler(AbstractCrawler):
return browser_context return browser_context
except Exception as e: except Exception as e:
utils.logger.error(f"[KuaishouCrawler] CDP模式启动失败回退到标准模式: {e}") utils.logger.error(
f"[KuaishouCrawler] CDP模式启动失败回退到标准模式: {e}"
)
# 回退到标准模式 # 回退到标准模式
chromium = playwright.chromium chromium = playwright.chromium
return await self.launch_browser(chromium, playwright_proxy, user_agent, headless) return await self.launch_browser(
chromium, playwright_proxy, user_agent, headless
)
async def get_creators_and_videos(self) -> None: async def get_creators_and_videos(self) -> None:
"""Get creator's videos and retrieve their comment information.""" """Get creator's videos and retrieve their comment information."""

View File

@@ -15,10 +15,15 @@ import random
from asyncio import Task from asyncio import Task
from typing import Dict, List, Optional, Tuple from typing import Dict, List, Optional, Tuple
from playwright.async_api import (BrowserContext, BrowserType, Page, Playwright, from playwright.async_api import (
async_playwright) BrowserContext,
BrowserType,
Page,
Playwright,
async_playwright,
)
from . import config import config
from base.base_crawler import AbstractCrawler from base.base_crawler import AbstractCrawler
from model.m_baidu_tieba import TiebaCreator, TiebaNote from model.m_baidu_tieba import TiebaCreator, TiebaNote
from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
@@ -54,11 +59,17 @@ class TieBaCrawler(AbstractCrawler):
""" """
ip_proxy_pool, httpx_proxy_format = None, None ip_proxy_pool, httpx_proxy_format = None, None
if config.ENABLE_IP_PROXY: if config.ENABLE_IP_PROXY:
utils.logger.info("[BaiduTieBaCrawler.start] Begin create ip proxy pool ...") utils.logger.info(
ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True) "[BaiduTieBaCrawler.start] Begin create ip proxy pool ..."
)
ip_proxy_pool = await create_ip_pool(
config.IP_PROXY_POOL_COUNT, enable_validate_ip=True
)
ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy() ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy()
_, httpx_proxy_format = format_proxy_info(ip_proxy_info) _, httpx_proxy_format = format_proxy_info(ip_proxy_info)
utils.logger.info(f"[BaiduTieBaCrawler.start] Init default ip proxy, value: {httpx_proxy_format}") utils.logger.info(
f"[BaiduTieBaCrawler.start] Init default ip proxy, value: {httpx_proxy_format}"
)
# Create a client to interact with the baidutieba website. # Create a client to interact with the baidutieba website.
self.tieba_client = BaiduTieBaClient( self.tieba_client = BaiduTieBaClient(
@@ -87,38 +98,55 @@ class TieBaCrawler(AbstractCrawler):
Returns: Returns:
""" """
utils.logger.info("[BaiduTieBaCrawler.search] Begin search baidu tieba keywords") utils.logger.info(
"[BaiduTieBaCrawler.search] Begin search baidu tieba keywords"
)
tieba_limit_count = 10 # tieba limit page fixed value tieba_limit_count = 10 # tieba limit page fixed value
if config.CRAWLER_MAX_NOTES_COUNT < tieba_limit_count: if config.CRAWLER_MAX_NOTES_COUNT < tieba_limit_count:
config.CRAWLER_MAX_NOTES_COUNT = tieba_limit_count config.CRAWLER_MAX_NOTES_COUNT = tieba_limit_count
start_page = config.START_PAGE start_page = config.START_PAGE
for keyword in config.KEYWORDS.split(","): for keyword in config.KEYWORDS.split(","):
source_keyword_var.set(keyword) source_keyword_var.set(keyword)
utils.logger.info(f"[BaiduTieBaCrawler.search] Current search keyword: {keyword}") utils.logger.info(
f"[BaiduTieBaCrawler.search] Current search keyword: {keyword}"
)
page = 1 page = 1
while (page - start_page + 1) * tieba_limit_count <= config.CRAWLER_MAX_NOTES_COUNT: while (
page - start_page + 1
) * tieba_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
if page < start_page: if page < start_page:
utils.logger.info(f"[BaiduTieBaCrawler.search] Skip page {page}") utils.logger.info(f"[BaiduTieBaCrawler.search] Skip page {page}")
page += 1 page += 1
continue continue
try: try:
utils.logger.info(f"[BaiduTieBaCrawler.search] search tieba keyword: {keyword}, page: {page}") utils.logger.info(
notes_list: List[TiebaNote] = await self.tieba_client.get_notes_by_keyword( f"[BaiduTieBaCrawler.search] search tieba keyword: {keyword}, page: {page}"
)
notes_list: List[TiebaNote] = (
await self.tieba_client.get_notes_by_keyword(
keyword=keyword, keyword=keyword,
page=page, page=page,
page_size=tieba_limit_count, page_size=tieba_limit_count,
sort=SearchSortType.TIME_DESC, sort=SearchSortType.TIME_DESC,
note_type=SearchNoteType.FIXED_THREAD note_type=SearchNoteType.FIXED_THREAD,
)
) )
if not notes_list: if not notes_list:
utils.logger.info(f"[BaiduTieBaCrawler.search] Search note list is empty") utils.logger.info(
f"[BaiduTieBaCrawler.search] Search note list is empty"
)
break break
utils.logger.info(f"[BaiduTieBaCrawler.search] Note list len: {len(notes_list)}") utils.logger.info(
await self.get_specified_notes(note_id_list=[note_detail.note_id for note_detail in notes_list]) f"[BaiduTieBaCrawler.search] Note list len: {len(notes_list)}"
)
await self.get_specified_notes(
note_id_list=[note_detail.note_id for note_detail in notes_list]
)
page += 1 page += 1
except Exception as ex: except Exception as ex:
utils.logger.error( utils.logger.error(
f"[BaiduTieBaCrawler.search] Search keywords error, current page: {page}, current keyword: {keyword}, err: {ex}") f"[BaiduTieBaCrawler.search] Search keywords error, current page: {page}, current keyword: {keyword}, err: {ex}"
)
break break
async def get_specified_tieba_notes(self): async def get_specified_tieba_notes(self):
@@ -132,24 +160,30 @@ class TieBaCrawler(AbstractCrawler):
config.CRAWLER_MAX_NOTES_COUNT = tieba_limit_count config.CRAWLER_MAX_NOTES_COUNT = tieba_limit_count
for tieba_name in config.TIEBA_NAME_LIST: for tieba_name in config.TIEBA_NAME_LIST:
utils.logger.info( utils.logger.info(
f"[BaiduTieBaCrawler.get_specified_tieba_notes] Begin get tieba name: {tieba_name}") f"[BaiduTieBaCrawler.get_specified_tieba_notes] Begin get tieba name: {tieba_name}"
)
page_number = 0 page_number = 0
while page_number <= config.CRAWLER_MAX_NOTES_COUNT: while page_number <= config.CRAWLER_MAX_NOTES_COUNT:
note_list: List[TiebaNote] = await self.tieba_client.get_notes_by_tieba_name( note_list: List[TiebaNote] = (
tieba_name=tieba_name, await self.tieba_client.get_notes_by_tieba_name(
page_num=page_number tieba_name=tieba_name, page_num=page_number
)
) )
if not note_list: if not note_list:
utils.logger.info( utils.logger.info(
f"[BaiduTieBaCrawler.get_specified_tieba_notes] Get note list is empty") f"[BaiduTieBaCrawler.get_specified_tieba_notes] Get note list is empty"
)
break break
utils.logger.info( utils.logger.info(
f"[BaiduTieBaCrawler.get_specified_tieba_notes] tieba name: {tieba_name} note list len: {len(note_list)}") f"[BaiduTieBaCrawler.get_specified_tieba_notes] tieba name: {tieba_name} note list len: {len(note_list)}"
)
await self.get_specified_notes([note.note_id for note in note_list]) await self.get_specified_notes([note.note_id for note in note_list])
page_number += tieba_limit_count page_number += tieba_limit_count
async def get_specified_notes(self, note_id_list: List[str] = config.TIEBA_SPECIFIED_ID_LIST): async def get_specified_notes(
self, note_id_list: List[str] = config.TIEBA_SPECIFIED_ID_LIST
):
""" """
Get the information and comments of the specified post Get the information and comments of the specified post
Args: Args:
@@ -160,7 +194,8 @@ class TieBaCrawler(AbstractCrawler):
""" """
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [ task_list = [
self.get_note_detail_async_task(note_id=note_id, semaphore=semaphore) for note_id in note_id_list self.get_note_detail_async_task(note_id=note_id, semaphore=semaphore)
for note_id in note_id_list
] ]
note_details = await asyncio.gather(*task_list) note_details = await asyncio.gather(*task_list)
note_details_model: List[TiebaNote] = [] note_details_model: List[TiebaNote] = []
@@ -170,7 +205,9 @@ class TieBaCrawler(AbstractCrawler):
await tieba_store.update_tieba_note(note_detail) await tieba_store.update_tieba_note(note_detail)
await self.batch_get_note_comments(note_details_model) await self.batch_get_note_comments(note_details_model)
async def get_note_detail_async_task(self, note_id: str, semaphore: asyncio.Semaphore) -> Optional[TiebaNote]: async def get_note_detail_async_task(
self, note_id: str, semaphore: asyncio.Semaphore
) -> Optional[TiebaNote]:
""" """
Get note detail Get note detail
Args: Args:
@@ -182,19 +219,25 @@ class TieBaCrawler(AbstractCrawler):
""" """
async with semaphore: async with semaphore:
try: try:
utils.logger.info(f"[BaiduTieBaCrawler.get_note_detail] Begin get note detail, note_id: {note_id}") utils.logger.info(
f"[BaiduTieBaCrawler.get_note_detail] Begin get note detail, note_id: {note_id}"
)
note_detail: TiebaNote = await self.tieba_client.get_note_by_id(note_id) note_detail: TiebaNote = await self.tieba_client.get_note_by_id(note_id)
if not note_detail: if not note_detail:
utils.logger.error( utils.logger.error(
f"[BaiduTieBaCrawler.get_note_detail] Get note detail error, note_id: {note_id}") f"[BaiduTieBaCrawler.get_note_detail] Get note detail error, note_id: {note_id}"
)
return None return None
return note_detail return note_detail
except Exception as ex: except Exception as ex:
utils.logger.error(f"[BaiduTieBaCrawler.get_note_detail] Get note detail error: {ex}") utils.logger.error(
f"[BaiduTieBaCrawler.get_note_detail] Get note detail error: {ex}"
)
return None return None
except KeyError as ex: except KeyError as ex:
utils.logger.error( utils.logger.error(
f"[BaiduTieBaCrawler.get_note_detail] have not fund note detail note_id:{note_id}, err: {ex}") f"[BaiduTieBaCrawler.get_note_detail] have not fund note detail note_id:{note_id}, err: {ex}"
)
return None return None
async def batch_get_note_comments(self, note_detail_list: List[TiebaNote]): async def batch_get_note_comments(self, note_detail_list: List[TiebaNote]):
@@ -212,11 +255,16 @@ class TieBaCrawler(AbstractCrawler):
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list: List[Task] = [] task_list: List[Task] = []
for note_detail in note_detail_list: for note_detail in note_detail_list:
task = asyncio.create_task(self.get_comments_async_task(note_detail, semaphore), name=note_detail.note_id) task = asyncio.create_task(
self.get_comments_async_task(note_detail, semaphore),
name=note_detail.note_id,
)
task_list.append(task) task_list.append(task)
await asyncio.gather(*task_list) await asyncio.gather(*task_list)
async def get_comments_async_task(self, note_detail: TiebaNote, semaphore: asyncio.Semaphore): async def get_comments_async_task(
self, note_detail: TiebaNote, semaphore: asyncio.Semaphore
):
""" """
Get comments async task Get comments async task
Args: Args:
@@ -227,12 +275,14 @@ class TieBaCrawler(AbstractCrawler):
""" """
async with semaphore: async with semaphore:
utils.logger.info(f"[BaiduTieBaCrawler.get_comments] Begin get note id comments {note_detail.note_id}") utils.logger.info(
f"[BaiduTieBaCrawler.get_comments] Begin get note id comments {note_detail.note_id}"
)
await self.tieba_client.get_note_all_comments( await self.tieba_client.get_note_all_comments(
note_detail=note_detail, note_detail=note_detail,
crawl_interval=random.random(), crawl_interval=random.random(),
callback=tieba_store.batch_update_tieba_note_comments, callback=tieba_store.batch_update_tieba_note_comments,
max_count=config.CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES max_count=config.CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES,
) )
async def get_creators_and_notes(self) -> None: async def get_creators_and_notes(self) -> None:
@@ -241,38 +291,49 @@ class TieBaCrawler(AbstractCrawler):
Returns: Returns:
""" """
utils.logger.info("[WeiboCrawler.get_creators_and_notes] Begin get weibo creators") utils.logger.info(
"[WeiboCrawler.get_creators_and_notes] Begin get weibo creators"
)
for creator_url in config.TIEBA_CREATOR_URL_LIST: for creator_url in config.TIEBA_CREATOR_URL_LIST:
creator_page_html_content = await self.tieba_client.get_creator_info_by_url(creator_url=creator_url) creator_page_html_content = await self.tieba_client.get_creator_info_by_url(
creator_info: TiebaCreator = self._page_extractor.extract_creator_info(creator_page_html_content) creator_url=creator_url
)
creator_info: TiebaCreator = self._page_extractor.extract_creator_info(
creator_page_html_content
)
if creator_info: if creator_info:
utils.logger.info(f"[WeiboCrawler.get_creators_and_notes] creator info: {creator_info}") utils.logger.info(
f"[WeiboCrawler.get_creators_and_notes] creator info: {creator_info}"
)
if not creator_info: if not creator_info:
raise Exception("Get creator info error") raise Exception("Get creator info error")
await tieba_store.save_creator(user_info=creator_info) await tieba_store.save_creator(user_info=creator_info)
# Get all note information of the creator # Get all note information of the creator
all_notes_list = await self.tieba_client.get_all_notes_by_creator_user_name( all_notes_list = (
await self.tieba_client.get_all_notes_by_creator_user_name(
user_name=creator_info.user_name, user_name=creator_info.user_name,
crawl_interval=0, crawl_interval=0,
callback=tieba_store.batch_update_tieba_notes, callback=tieba_store.batch_update_tieba_notes,
max_note_count=config.CRAWLER_MAX_NOTES_COUNT, max_note_count=config.CRAWLER_MAX_NOTES_COUNT,
creator_page_html_content=creator_page_html_content, creator_page_html_content=creator_page_html_content,
) )
)
await self.batch_get_note_comments(all_notes_list) await self.batch_get_note_comments(all_notes_list)
else: else:
utils.logger.error( utils.logger.error(
f"[WeiboCrawler.get_creators_and_notes] get creator info error, creator_url:{creator_url}") f"[WeiboCrawler.get_creators_and_notes] get creator info error, creator_url:{creator_url}"
)
async def launch_browser( async def launch_browser(
self, self,
chromium: BrowserType, chromium: BrowserType,
playwright_proxy: Optional[Dict], playwright_proxy: Optional[Dict],
user_agent: Optional[str], user_agent: Optional[str],
headless: bool = True headless: bool = True,
) -> BrowserContext: ) -> BrowserContext:
""" """
Launch browser and create browser Launch browser and create browser
@@ -285,31 +346,38 @@ class TieBaCrawler(AbstractCrawler):
Returns: Returns:
""" """
utils.logger.info("[BaiduTieBaCrawler.launch_browser] Begin create browser context ...") utils.logger.info(
"[BaiduTieBaCrawler.launch_browser] Begin create browser context ..."
)
if config.SAVE_LOGIN_STATE: if config.SAVE_LOGIN_STATE:
# feat issue #14 # feat issue #14
# we will save login state to avoid login every time # we will save login state to avoid login every time
user_data_dir = os.path.join(os.getcwd(), "browser_data", user_data_dir = os.path.join(
config.USER_DATA_DIR % config.PLATFORM) # type: ignore os.getcwd(), "browser_data", config.USER_DATA_DIR % config.PLATFORM
) # type: ignore
browser_context = await chromium.launch_persistent_context( browser_context = await chromium.launch_persistent_context(
user_data_dir=user_data_dir, user_data_dir=user_data_dir,
accept_downloads=True, accept_downloads=True,
headless=headless, headless=headless,
proxy=playwright_proxy, # type: ignore proxy=playwright_proxy, # type: ignore
viewport={"width": 1920, "height": 1080}, viewport={"width": 1920, "height": 1080},
user_agent=user_agent user_agent=user_agent,
) )
return browser_context return browser_context
else: else:
browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore
browser_context = await browser.new_context( browser_context = await browser.new_context(
viewport={"width": 1920, "height": 1080}, viewport={"width": 1920, "height": 1080}, user_agent=user_agent
user_agent=user_agent
) )
return browser_context return browser_context
async def launch_browser_with_cdp(self, playwright: Playwright, playwright_proxy: Optional[Dict], async def launch_browser_with_cdp(
user_agent: Optional[str], headless: bool = True) -> BrowserContext: self,
playwright: Playwright,
playwright_proxy: Optional[Dict],
user_agent: Optional[str],
headless: bool = True,
) -> BrowserContext:
""" """
使用CDP模式启动浏览器 使用CDP模式启动浏览器
""" """
@@ -319,7 +387,7 @@ class TieBaCrawler(AbstractCrawler):
playwright=playwright, playwright=playwright,
playwright_proxy=playwright_proxy, playwright_proxy=playwright_proxy,
user_agent=user_agent, user_agent=user_agent,
headless=headless headless=headless,
) )
# 显示浏览器信息 # 显示浏览器信息
@@ -332,7 +400,9 @@ class TieBaCrawler(AbstractCrawler):
utils.logger.error(f"[TieBaCrawler] CDP模式启动失败回退到标准模式: {e}") utils.logger.error(f"[TieBaCrawler] CDP模式启动失败回退到标准模式: {e}")
# 回退到标准模式 # 回退到标准模式
chromium = playwright.chromium chromium = playwright.chromium
return await self.launch_browser(chromium, playwright_proxy, user_agent, headless) return await self.launch_browser(
chromium, playwright_proxy, user_agent, headless
)
async def close(self): async def close(self):
""" """

View File

@@ -21,10 +21,15 @@ import random
from asyncio import Task from asyncio import Task
from typing import Dict, List, Optional, Tuple from typing import Dict, List, Optional, Tuple
from playwright.async_api import (BrowserContext, BrowserType, Page, Playwright, from playwright.async_api import (
async_playwright) BrowserContext,
BrowserType,
Page,
Playwright,
async_playwright,
)
from . import config import config
from base.base_crawler import AbstractCrawler from base.base_crawler import AbstractCrawler
from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
from store import weibo as weibo_store from store import weibo as weibo_store
@@ -55,27 +60,30 @@ class WeiboCrawler(AbstractCrawler):
async def start(self): async def start(self):
playwright_proxy_format, httpx_proxy_format = None, None playwright_proxy_format, httpx_proxy_format = None, None
if config.ENABLE_IP_PROXY: if config.ENABLE_IP_PROXY:
ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True) ip_proxy_pool = await create_ip_pool(
config.IP_PROXY_POOL_COUNT, enable_validate_ip=True
)
ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy() ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy()
playwright_proxy_format, httpx_proxy_format = self.format_proxy_info(ip_proxy_info) playwright_proxy_format, httpx_proxy_format = self.format_proxy_info(
ip_proxy_info
)
async with async_playwright() as playwright: async with async_playwright() as playwright:
# 根据配置选择启动模式 # 根据配置选择启动模式
if config.ENABLE_CDP_MODE: if config.ENABLE_CDP_MODE:
utils.logger.info("[WeiboCrawler] 使用CDP模式启动浏览器") utils.logger.info("[WeiboCrawler] 使用CDP模式启动浏览器")
self.browser_context = await self.launch_browser_with_cdp( self.browser_context = await self.launch_browser_with_cdp(
playwright, playwright_proxy_format, self.mobile_user_agent, playwright,
headless=config.CDP_HEADLESS playwright_proxy_format,
self.mobile_user_agent,
headless=config.CDP_HEADLESS,
) )
else: else:
utils.logger.info("[WeiboCrawler] 使用标准模式启动浏览器") utils.logger.info("[WeiboCrawler] 使用标准模式启动浏览器")
# Launch a browser context. # Launch a browser context.
chromium = playwright.chromium chromium = playwright.chromium
self.browser_context = await self.launch_browser( self.browser_context = await self.launch_browser(
chromium, chromium, None, self.mobile_user_agent, headless=config.HEADLESS
None,
self.mobile_user_agent,
headless=config.HEADLESS
) )
# stealth.min.js is a js script to prevent the website from detecting the crawler. # stealth.min.js is a js script to prevent the website from detecting the crawler.
await self.browser_context.add_init_script(path="libs/stealth.min.js") await self.browser_context.add_init_script(path="libs/stealth.min.js")
@@ -90,15 +98,19 @@ class WeiboCrawler(AbstractCrawler):
login_phone="", # your phone number login_phone="", # your phone number
browser_context=self.browser_context, browser_context=self.browser_context,
context_page=self.context_page, context_page=self.context_page,
cookie_str=config.COOKIES cookie_str=config.COOKIES,
) )
await login_obj.begin() await login_obj.begin()
# 登录成功后重定向到手机端的网站再更新手机端登录成功的cookie # 登录成功后重定向到手机端的网站再更新手机端登录成功的cookie
utils.logger.info("[WeiboCrawler.start] redirect weibo mobile homepage and update cookies on mobile platform") utils.logger.info(
"[WeiboCrawler.start] redirect weibo mobile homepage and update cookies on mobile platform"
)
await self.context_page.goto(self.mobile_index_url) await self.context_page.goto(self.mobile_index_url)
await asyncio.sleep(2) await asyncio.sleep(2)
await self.wb_client.update_cookies(browser_context=self.browser_context) await self.wb_client.update_cookies(
browser_context=self.browser_context
)
crawler_type_var.set(config.CRAWLER_TYPE) crawler_type_var.set(config.CRAWLER_TYPE)
if config.CRAWLER_TYPE == "search": if config.CRAWLER_TYPE == "search":
@@ -135,23 +147,29 @@ class WeiboCrawler(AbstractCrawler):
elif config.WEIBO_SEARCH_TYPE == "video": elif config.WEIBO_SEARCH_TYPE == "video":
search_type = SearchType.VIDEO search_type = SearchType.VIDEO
else: else:
utils.logger.error(f"[WeiboCrawler.search] Invalid WEIBO_SEARCH_TYPE: {config.WEIBO_SEARCH_TYPE}") utils.logger.error(
f"[WeiboCrawler.search] Invalid WEIBO_SEARCH_TYPE: {config.WEIBO_SEARCH_TYPE}"
)
return return
for keyword in config.KEYWORDS.split(","): for keyword in config.KEYWORDS.split(","):
source_keyword_var.set(keyword) source_keyword_var.set(keyword)
utils.logger.info(f"[WeiboCrawler.search] Current search keyword: {keyword}") utils.logger.info(
f"[WeiboCrawler.search] Current search keyword: {keyword}"
)
page = 1 page = 1
while (page - start_page + 1) * weibo_limit_count <= config.CRAWLER_MAX_NOTES_COUNT: while (
page - start_page + 1
) * weibo_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
if page < start_page: if page < start_page:
utils.logger.info(f"[WeiboCrawler.search] Skip page: {page}") utils.logger.info(f"[WeiboCrawler.search] Skip page: {page}")
page += 1 page += 1
continue continue
utils.logger.info(f"[WeiboCrawler.search] search weibo keyword: {keyword}, page: {page}") utils.logger.info(
f"[WeiboCrawler.search] search weibo keyword: {keyword}, page: {page}"
)
search_res = await self.wb_client.get_note_by_keyword( search_res = await self.wb_client.get_note_by_keyword(
keyword=keyword, keyword=keyword, page=page, search_type=search_type
page=page,
search_type=search_type
) )
note_id_list: List[str] = [] note_id_list: List[str] = []
note_list = filter_search_result_card(search_res.get("cards")) note_list = filter_search_result_card(search_res.get("cards"))
@@ -173,8 +191,8 @@ class WeiboCrawler(AbstractCrawler):
""" """
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [ task_list = [
self.get_note_info_task(note_id=note_id, semaphore=semaphore) for note_id in self.get_note_info_task(note_id=note_id, semaphore=semaphore)
config.WEIBO_SPECIFIED_ID_LIST for note_id in config.WEIBO_SPECIFIED_ID_LIST
] ]
video_details = await asyncio.gather(*task_list) video_details = await asyncio.gather(*task_list)
for note_item in video_details: for note_item in video_details:
@@ -182,7 +200,9 @@ class WeiboCrawler(AbstractCrawler):
await weibo_store.update_weibo_note(note_item) await weibo_store.update_weibo_note(note_item)
await self.batch_get_notes_comments(config.WEIBO_SPECIFIED_ID_LIST) await self.batch_get_notes_comments(config.WEIBO_SPECIFIED_ID_LIST)
async def get_note_info_task(self, note_id: str, semaphore: asyncio.Semaphore) -> Optional[Dict]: async def get_note_info_task(
self, note_id: str, semaphore: asyncio.Semaphore
) -> Optional[Dict]:
""" """
Get note detail task Get note detail task
:param note_id: :param note_id:
@@ -194,11 +214,14 @@ class WeiboCrawler(AbstractCrawler):
result = await self.wb_client.get_note_info_by_id(note_id) result = await self.wb_client.get_note_info_by_id(note_id)
return result return result
except DataFetchError as ex: except DataFetchError as ex:
utils.logger.error(f"[WeiboCrawler.get_note_info_task] Get note detail error: {ex}") utils.logger.error(
f"[WeiboCrawler.get_note_info_task] Get note detail error: {ex}"
)
return None return None
except KeyError as ex: except KeyError as ex:
utils.logger.error( utils.logger.error(
f"[WeiboCrawler.get_note_info_task] have not fund note detail note_id:{note_id}, err: {ex}") f"[WeiboCrawler.get_note_info_task] have not fund note detail note_id:{note_id}, err: {ex}"
)
return None return None
async def batch_get_notes_comments(self, note_id_list: List[str]): async def batch_get_notes_comments(self, note_id_list: List[str]):
@@ -208,14 +231,20 @@ class WeiboCrawler(AbstractCrawler):
:return: :return:
""" """
if not config.ENABLE_GET_COMMENTS: if not config.ENABLE_GET_COMMENTS:
utils.logger.info(f"[WeiboCrawler.batch_get_note_comments] Crawling comment mode is not enabled") utils.logger.info(
f"[WeiboCrawler.batch_get_note_comments] Crawling comment mode is not enabled"
)
return return
utils.logger.info(f"[WeiboCrawler.batch_get_notes_comments] note ids:{note_id_list}") utils.logger.info(
f"[WeiboCrawler.batch_get_notes_comments] note ids:{note_id_list}"
)
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list: List[Task] = [] task_list: List[Task] = []
for note_id in note_id_list: for note_id in note_id_list:
task = asyncio.create_task(self.get_note_comments(note_id, semaphore), name=note_id) task = asyncio.create_task(
self.get_note_comments(note_id, semaphore), name=note_id
)
task_list.append(task) task_list.append(task)
await asyncio.gather(*task_list) await asyncio.gather(*task_list)
@@ -228,17 +257,25 @@ class WeiboCrawler(AbstractCrawler):
""" """
async with semaphore: async with semaphore:
try: try:
utils.logger.info(f"[WeiboCrawler.get_note_comments] begin get note_id: {note_id} comments ...") utils.logger.info(
f"[WeiboCrawler.get_note_comments] begin get note_id: {note_id} comments ..."
)
await self.wb_client.get_note_all_comments( await self.wb_client.get_note_all_comments(
note_id=note_id, note_id=note_id,
crawl_interval=random.randint(1,3), # 微博对API的限流比较严重所以延时提高一些 crawl_interval=random.randint(
1, 3
), # 微博对API的限流比较严重所以延时提高一些
callback=weibo_store.batch_update_weibo_note_comments, callback=weibo_store.batch_update_weibo_note_comments,
max_count=config.CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES max_count=config.CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES,
) )
except DataFetchError as ex: except DataFetchError as ex:
utils.logger.error(f"[WeiboCrawler.get_note_comments] get note_id: {note_id} comment error: {ex}") utils.logger.error(
f"[WeiboCrawler.get_note_comments] get note_id: {note_id} comment error: {ex}"
)
except Exception as e: except Exception as e:
utils.logger.error(f"[WeiboCrawler.get_note_comments] may be been blocked, err:{e}") utils.logger.error(
f"[WeiboCrawler.get_note_comments] may be been blocked, err:{e}"
)
async def get_note_images(self, mblog: Dict): async def get_note_images(self, mblog: Dict):
""" """
@@ -247,7 +284,9 @@ class WeiboCrawler(AbstractCrawler):
:return: :return:
""" """
if not config.ENABLE_GET_IMAGES: if not config.ENABLE_GET_IMAGES:
utils.logger.info(f"[WeiboCrawler.get_note_images] Crawling image mode is not enabled") utils.logger.info(
f"[WeiboCrawler.get_note_images] Crawling image mode is not enabled"
)
return return
pics: Dict = mblog.get("pics") pics: Dict = mblog.get("pics")
@@ -260,8 +299,9 @@ class WeiboCrawler(AbstractCrawler):
content = await self.wb_client.get_note_image(url) content = await self.wb_client.get_note_image(url)
if content != None: if content != None:
extension_file_name = url.split(".")[-1] extension_file_name = url.split(".")[-1]
await weibo_store.update_weibo_note_image(pic["pid"], content, extension_file_name) await weibo_store.update_weibo_note_image(
pic["pid"], content, extension_file_name
)
async def get_creators_and_notes(self) -> None: async def get_creators_and_notes(self) -> None:
""" """
@@ -269,12 +309,18 @@ class WeiboCrawler(AbstractCrawler):
Returns: Returns:
""" """
utils.logger.info("[WeiboCrawler.get_creators_and_notes] Begin get weibo creators") utils.logger.info(
"[WeiboCrawler.get_creators_and_notes] Begin get weibo creators"
)
for user_id in config.WEIBO_CREATOR_ID_LIST: for user_id in config.WEIBO_CREATOR_ID_LIST:
createor_info_res: Dict = await self.wb_client.get_creator_info_by_id(creator_id=user_id) createor_info_res: Dict = await self.wb_client.get_creator_info_by_id(
creator_id=user_id
)
if createor_info_res: if createor_info_res:
createor_info: Dict = createor_info_res.get("userInfo", {}) createor_info: Dict = createor_info_res.get("userInfo", {})
utils.logger.info(f"[WeiboCrawler.get_creators_and_notes] creator info: {createor_info}") utils.logger.info(
f"[WeiboCrawler.get_creators_and_notes] creator info: {createor_info}"
)
if not createor_info: if not createor_info:
raise DataFetchError("Get creator info error") raise DataFetchError("Get creator info error")
await weibo_store.save_creator(user_id, user_info=createor_info) await weibo_store.save_creator(user_id, user_info=createor_info)
@@ -284,23 +330,29 @@ class WeiboCrawler(AbstractCrawler):
creator_id=user_id, creator_id=user_id,
container_id=createor_info_res.get("lfid_container_id"), container_id=createor_info_res.get("lfid_container_id"),
crawl_interval=0, crawl_interval=0,
callback=weibo_store.batch_update_weibo_notes callback=weibo_store.batch_update_weibo_notes,
) )
note_ids = [note_item.get("mblog", {}).get("id") for note_item in all_notes_list if note_ids = [
note_item.get("mblog", {}).get("id")] note_item.get("mblog", {}).get("id")
for note_item in all_notes_list
if note_item.get("mblog", {}).get("id")
]
await self.batch_get_notes_comments(note_ids) await self.batch_get_notes_comments(note_ids)
else: else:
utils.logger.error( utils.logger.error(
f"[WeiboCrawler.get_creators_and_notes] get creator info error, creator_id:{user_id}") f"[WeiboCrawler.get_creators_and_notes] get creator info error, creator_id:{user_id}"
)
async def create_weibo_client(self, httpx_proxy: Optional[str]) -> WeiboClient: async def create_weibo_client(self, httpx_proxy: Optional[str]) -> WeiboClient:
"""Create xhs client""" """Create xhs client"""
utils.logger.info("[WeiboCrawler.create_weibo_client] Begin create weibo API client ...") utils.logger.info(
cookie_str, cookie_dict = utils.convert_cookies(await self.browser_context.cookies()) "[WeiboCrawler.create_weibo_client] Begin create weibo API client ..."
)
cookie_str, cookie_dict = utils.convert_cookies(
await self.browser_context.cookies()
)
weibo_client_obj = WeiboClient( weibo_client_obj = WeiboClient(
proxies=httpx_proxy, proxies=httpx_proxy,
headers={ headers={
@@ -308,7 +360,7 @@ class WeiboCrawler(AbstractCrawler):
"Cookie": cookie_str, "Cookie": cookie_str,
"Origin": "https://m.weibo.cn", "Origin": "https://m.weibo.cn",
"Referer": "https://m.weibo.cn", "Referer": "https://m.weibo.cn",
"Content-Type": "application/json;charset=UTF-8" "Content-Type": "application/json;charset=UTF-8",
}, },
playwright_page=self.context_page, playwright_page=self.context_page,
cookie_dict=cookie_dict, cookie_dict=cookie_dict,
@@ -316,7 +368,9 @@ class WeiboCrawler(AbstractCrawler):
return weibo_client_obj return weibo_client_obj
@staticmethod @staticmethod
def format_proxy_info(ip_proxy_info: IpInfoModel) -> Tuple[Optional[Dict], Optional[Dict]]: def format_proxy_info(
ip_proxy_info: IpInfoModel,
) -> Tuple[Optional[Dict], Optional[Dict]]:
"""format proxy info for playwright and httpx""" """format proxy info for playwright and httpx"""
playwright_proxy = { playwright_proxy = {
"server": f"{ip_proxy_info.protocol}{ip_proxy_info.ip}:{ip_proxy_info.port}", "server": f"{ip_proxy_info.protocol}{ip_proxy_info.ip}:{ip_proxy_info.port}",
@@ -333,32 +387,39 @@ class WeiboCrawler(AbstractCrawler):
chromium: BrowserType, chromium: BrowserType,
playwright_proxy: Optional[Dict], playwright_proxy: Optional[Dict],
user_agent: Optional[str], user_agent: Optional[str],
headless: bool = True headless: bool = True,
) -> BrowserContext: ) -> BrowserContext:
"""Launch browser and create browser context""" """Launch browser and create browser context"""
utils.logger.info("[WeiboCrawler.launch_browser] Begin create browser context ...") utils.logger.info(
"[WeiboCrawler.launch_browser] Begin create browser context ..."
)
if config.SAVE_LOGIN_STATE: if config.SAVE_LOGIN_STATE:
user_data_dir = os.path.join(os.getcwd(), "browser_data", user_data_dir = os.path.join(
config.USER_DATA_DIR % config.PLATFORM) # type: ignore os.getcwd(), "browser_data", config.USER_DATA_DIR % config.PLATFORM
) # type: ignore
browser_context = await chromium.launch_persistent_context( browser_context = await chromium.launch_persistent_context(
user_data_dir=user_data_dir, user_data_dir=user_data_dir,
accept_downloads=True, accept_downloads=True,
headless=headless, headless=headless,
proxy=playwright_proxy, # type: ignore proxy=playwright_proxy, # type: ignore
viewport={"width": 1920, "height": 1080}, viewport={"width": 1920, "height": 1080},
user_agent=user_agent user_agent=user_agent,
) )
return browser_context return browser_context
else: else:
browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore
browser_context = await browser.new_context( browser_context = await browser.new_context(
viewport={"width": 1920, "height": 1080}, viewport={"width": 1920, "height": 1080}, user_agent=user_agent
user_agent=user_agent
) )
return browser_context return browser_context
async def launch_browser_with_cdp(self, playwright: Playwright, playwright_proxy: Optional[Dict], async def launch_browser_with_cdp(
user_agent: Optional[str], headless: bool = True) -> BrowserContext: self,
playwright: Playwright,
playwright_proxy: Optional[Dict],
user_agent: Optional[str],
headless: bool = True,
) -> BrowserContext:
""" """
使用CDP模式启动浏览器 使用CDP模式启动浏览器
""" """
@@ -368,7 +429,7 @@ class WeiboCrawler(AbstractCrawler):
playwright=playwright, playwright=playwright,
playwright_proxy=playwright_proxy, playwright_proxy=playwright_proxy,
user_agent=user_agent, user_agent=user_agent,
headless=headless headless=headless,
) )
# 显示浏览器信息 # 显示浏览器信息
@@ -381,7 +442,9 @@ class WeiboCrawler(AbstractCrawler):
utils.logger.error(f"[WeiboCrawler] CDP模式启动失败回退到标准模式: {e}") utils.logger.error(f"[WeiboCrawler] CDP模式启动失败回退到标准模式: {e}")
# 回退到标准模式 # 回退到标准模式
chromium = playwright.chromium chromium = playwright.chromium
return await self.launch_browser(chromium, playwright_proxy, user_agent, headless) return await self.launch_browser(
chromium, playwright_proxy, user_agent, headless
)
async def close(self): async def close(self):
"""Close browser context""" """Close browser context"""

View File

@@ -16,10 +16,16 @@ import time
from asyncio import Task from asyncio import Task
from typing import Dict, List, Optional, Tuple from typing import Dict, List, Optional, Tuple
from playwright.async_api import BrowserContext, BrowserType, Page, Playwright, async_playwright from playwright.async_api import (
BrowserContext,
BrowserType,
Page,
Playwright,
async_playwright,
)
from tenacity import RetryError from tenacity import RetryError
from . import config import config
from base.base_crawler import AbstractCrawler from base.base_crawler import AbstractCrawler
from config import CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES from config import CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES
from model.m_xiaohongshu import NoteUrlInfo from model.m_xiaohongshu import NoteUrlInfo
@@ -45,7 +51,11 @@ class XiaoHongShuCrawler(AbstractCrawler):
def __init__(self) -> None: def __init__(self) -> None:
self.index_url = "https://www.xiaohongshu.com" self.index_url = "https://www.xiaohongshu.com"
# self.user_agent = utils.get_user_agent() # self.user_agent = utils.get_user_agent()
self.user_agent = config.UA if config.UA else "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36" self.user_agent = (
config.UA
if config.UA
else "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
)
self.cdp_manager = None self.cdp_manager = None
async def start(self) -> None: async def start(self) -> None:
@@ -64,15 +74,20 @@ class XiaoHongShuCrawler(AbstractCrawler):
if config.ENABLE_CDP_MODE: if config.ENABLE_CDP_MODE:
utils.logger.info("[XiaoHongShuCrawler] 使用CDP模式启动浏览器") utils.logger.info("[XiaoHongShuCrawler] 使用CDP模式启动浏览器")
self.browser_context = await self.launch_browser_with_cdp( self.browser_context = await self.launch_browser_with_cdp(
playwright, playwright_proxy_format, self.user_agent, playwright,
headless=config.CDP_HEADLESS playwright_proxy_format,
self.user_agent,
headless=config.CDP_HEADLESS,
) )
else: else:
utils.logger.info("[XiaoHongShuCrawler] 使用标准模式启动浏览器") utils.logger.info("[XiaoHongShuCrawler] 使用标准模式启动浏览器")
# Launch a browser context. # Launch a browser context.
chromium = playwright.chromium chromium = playwright.chromium
self.browser_context = await self.launch_browser( self.browser_context = await self.launch_browser(
chromium, playwright_proxy_format, self.user_agent, headless=config.HEADLESS chromium,
playwright_proxy_format,
self.user_agent,
headless=config.HEADLESS,
) )
# stealth.min.js is a js script to prevent the website from detecting the crawler. # stealth.min.js is a js script to prevent the website from detecting the crawler.
await self.browser_context.add_init_script(path="libs/stealth.min.js") await self.browser_context.add_init_script(path="libs/stealth.min.js")
@@ -304,7 +319,9 @@ class XiaoHongShuCrawler(AbstractCrawler):
else: else:
crawl_interval = random.uniform(1, config.CRAWLER_MAX_SLEEP_SEC) crawl_interval = random.uniform(1, config.CRAWLER_MAX_SLEEP_SEC)
try: try:
utils.logger.info(f"[get_note_detail_async_task] Begin get note detail, note_id: {note_id}") utils.logger.info(
f"[get_note_detail_async_task] Begin get note detail, note_id: {note_id}"
)
# 尝试直接获取网页版笔记详情携带cookie # 尝试直接获取网页版笔记详情携带cookie
note_detail_from_html: Optional[Dict] = ( note_detail_from_html: Optional[Dict] = (
await self.xhs_client.get_note_by_id_from_html( await self.xhs_client.get_note_by_id_from_html(
@@ -462,8 +479,13 @@ class XiaoHongShuCrawler(AbstractCrawler):
) )
return browser_context return browser_context
async def launch_browser_with_cdp(self, playwright: Playwright, playwright_proxy: Optional[Dict], async def launch_browser_with_cdp(
user_agent: Optional[str], headless: bool = True) -> BrowserContext: self,
playwright: Playwright,
playwright_proxy: Optional[Dict],
user_agent: Optional[str],
headless: bool = True,
) -> BrowserContext:
""" """
使用CDP模式启动浏览器 使用CDP模式启动浏览器
""" """
@@ -473,7 +495,7 @@ class XiaoHongShuCrawler(AbstractCrawler):
playwright=playwright, playwright=playwright,
playwright_proxy=playwright_proxy, playwright_proxy=playwright_proxy,
user_agent=user_agent, user_agent=user_agent,
headless=headless headless=headless,
) )
# 显示浏览器信息 # 显示浏览器信息
@@ -483,10 +505,14 @@ class XiaoHongShuCrawler(AbstractCrawler):
return browser_context return browser_context
except Exception as e: except Exception as e:
utils.logger.error(f"[XiaoHongShuCrawler] CDP模式启动失败回退到标准模式: {e}") utils.logger.error(
f"[XiaoHongShuCrawler] CDP模式启动失败回退到标准模式: {e}"
)
# 回退到标准模式 # 回退到标准模式
chromium = playwright.chromium chromium = playwright.chromium
return await self.launch_browser(chromium, playwright_proxy, user_agent, headless) return await self.launch_browser(
chromium, playwright_proxy, user_agent, headless
)
async def close(self): async def close(self):
"""Close browser context""" """Close browser context"""

View File

@@ -16,10 +16,15 @@ import random
from asyncio import Task from asyncio import Task
from typing import Dict, List, Optional, Tuple, cast from typing import Dict, List, Optional, Tuple, cast
from playwright.async_api import (BrowserContext, BrowserType, Page, Playwright, from playwright.async_api import (
async_playwright) BrowserContext,
BrowserType,
Page,
Playwright,
async_playwright,
)
from . import config import config
from constant import zhihu as constant from constant import zhihu as constant
from base.base_crawler import AbstractCrawler from base.base_crawler import AbstractCrawler
from model.m_zhihu import ZhihuContent, ZhihuCreator from model.m_zhihu import ZhihuContent, ZhihuCreator
@@ -56,27 +61,30 @@ class ZhihuCrawler(AbstractCrawler):
""" """
playwright_proxy_format, httpx_proxy_format = None, None playwright_proxy_format, httpx_proxy_format = None, None
if config.ENABLE_IP_PROXY: if config.ENABLE_IP_PROXY:
ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True) ip_proxy_pool = await create_ip_pool(
config.IP_PROXY_POOL_COUNT, enable_validate_ip=True
)
ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy() ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy()
playwright_proxy_format, httpx_proxy_format = self.format_proxy_info(ip_proxy_info) playwright_proxy_format, httpx_proxy_format = self.format_proxy_info(
ip_proxy_info
)
async with async_playwright() as playwright: async with async_playwright() as playwright:
# 根据配置选择启动模式 # 根据配置选择启动模式
if config.ENABLE_CDP_MODE: if config.ENABLE_CDP_MODE:
utils.logger.info("[ZhihuCrawler] 使用CDP模式启动浏览器") utils.logger.info("[ZhihuCrawler] 使用CDP模式启动浏览器")
self.browser_context = await self.launch_browser_with_cdp( self.browser_context = await self.launch_browser_with_cdp(
playwright, playwright_proxy_format, self.user_agent, playwright,
headless=config.CDP_HEADLESS playwright_proxy_format,
self.user_agent,
headless=config.CDP_HEADLESS,
) )
else: else:
utils.logger.info("[ZhihuCrawler] 使用标准模式启动浏览器") utils.logger.info("[ZhihuCrawler] 使用标准模式启动浏览器")
# Launch a browser context. # Launch a browser context.
chromium = playwright.chromium chromium = playwright.chromium
self.browser_context = await self.launch_browser( self.browser_context = await self.launch_browser(
chromium, chromium, None, self.user_agent, headless=config.HEADLESS
None,
self.user_agent,
headless=config.HEADLESS
) )
# stealth.min.js is a js script to prevent the website from detecting the crawler. # stealth.min.js is a js script to prevent the website from detecting the crawler.
await self.browser_context.add_init_script(path="libs/stealth.min.js") await self.browser_context.add_init_script(path="libs/stealth.min.js")
@@ -92,14 +100,20 @@ class ZhihuCrawler(AbstractCrawler):
login_phone="", # input your phone number login_phone="", # input your phone number
browser_context=self.browser_context, browser_context=self.browser_context,
context_page=self.context_page, context_page=self.context_page,
cookie_str=config.COOKIES cookie_str=config.COOKIES,
) )
await login_obj.begin() await login_obj.begin()
await self.zhihu_client.update_cookies(browser_context=self.browser_context) await self.zhihu_client.update_cookies(
browser_context=self.browser_context
)
# 知乎的搜索接口需要打开搜索页面之后cookies才能访问API单独的首页不行 # 知乎的搜索接口需要打开搜索页面之后cookies才能访问API单独的首页不行
utils.logger.info("[ZhihuCrawler.start] Zhihu跳转到搜索页面获取搜索页面的Cookies该过程需要5秒左右") utils.logger.info(
await self.context_page.goto(f"{self.index_url}/search?q=python&search_source=Guess&utm_content=search_hot&type=content") "[ZhihuCrawler.start] Zhihu跳转到搜索页面获取搜索页面的Cookies该过程需要5秒左右"
)
await self.context_page.goto(
f"{self.index_url}/search?q=python&search_source=Guess&utm_content=search_hot&type=content"
)
await asyncio.sleep(5) await asyncio.sleep(5)
await self.zhihu_client.update_cookies(browser_context=self.browser_context) await self.zhihu_client.update_cookies(browser_context=self.browser_context)
@@ -127,21 +141,31 @@ class ZhihuCrawler(AbstractCrawler):
start_page = config.START_PAGE start_page = config.START_PAGE
for keyword in config.KEYWORDS.split(","): for keyword in config.KEYWORDS.split(","):
source_keyword_var.set(keyword) source_keyword_var.set(keyword)
utils.logger.info(f"[ZhihuCrawler.search] Current search keyword: {keyword}") utils.logger.info(
f"[ZhihuCrawler.search] Current search keyword: {keyword}"
)
page = 1 page = 1
while (page - start_page + 1) * zhihu_limit_count <= config.CRAWLER_MAX_NOTES_COUNT: while (
page - start_page + 1
) * zhihu_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
if page < start_page: if page < start_page:
utils.logger.info(f"[ZhihuCrawler.search] Skip page {page}") utils.logger.info(f"[ZhihuCrawler.search] Skip page {page}")
page += 1 page += 1
continue continue
try: try:
utils.logger.info(f"[ZhihuCrawler.search] search zhihu keyword: {keyword}, page: {page}") utils.logger.info(
content_list: List[ZhihuContent] = await self.zhihu_client.get_note_by_keyword( f"[ZhihuCrawler.search] search zhihu keyword: {keyword}, page: {page}"
)
content_list: List[ZhihuContent] = (
await self.zhihu_client.get_note_by_keyword(
keyword=keyword, keyword=keyword,
page=page, page=page,
) )
utils.logger.info(f"[ZhihuCrawler.search] Search contents :{content_list}") )
utils.logger.info(
f"[ZhihuCrawler.search] Search contents :{content_list}"
)
if not content_list: if not content_list:
utils.logger.info("No more content!") utils.logger.info("No more content!")
break break
@@ -165,17 +189,23 @@ class ZhihuCrawler(AbstractCrawler):
""" """
if not config.ENABLE_GET_COMMENTS: if not config.ENABLE_GET_COMMENTS:
utils.logger.info(f"[ZhihuCrawler.batch_get_content_comments] Crawling comment mode is not enabled") utils.logger.info(
f"[ZhihuCrawler.batch_get_content_comments] Crawling comment mode is not enabled"
)
return return
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list: List[Task] = [] task_list: List[Task] = []
for content_item in content_list: for content_item in content_list:
task = asyncio.create_task(self.get_comments(content_item, semaphore), name=content_item.content_id) task = asyncio.create_task(
self.get_comments(content_item, semaphore), name=content_item.content_id
)
task_list.append(task) task_list.append(task)
await asyncio.gather(*task_list) await asyncio.gather(*task_list)
async def get_comments(self, content_item: ZhihuContent, semaphore: asyncio.Semaphore): async def get_comments(
self, content_item: ZhihuContent, semaphore: asyncio.Semaphore
):
""" """
Get note comments with keyword filtering and quantity limitation Get note comments with keyword filtering and quantity limitation
Args: Args:
@@ -186,11 +216,13 @@ class ZhihuCrawler(AbstractCrawler):
""" """
async with semaphore: async with semaphore:
utils.logger.info(f"[ZhihuCrawler.get_comments] Begin get note id comments {content_item.content_id}") utils.logger.info(
f"[ZhihuCrawler.get_comments] Begin get note id comments {content_item.content_id}"
)
await self.zhihu_client.get_note_all_comments( await self.zhihu_client.get_note_all_comments(
content=content_item, content=content_item,
crawl_interval=random.random(), crawl_interval=random.random(),
callback=zhihu_store.batch_update_zhihu_note_comments callback=zhihu_store.batch_update_zhihu_note_comments,
) )
async def get_creators_and_notes(self) -> None: async def get_creators_and_notes(self) -> None:
@@ -199,17 +231,27 @@ class ZhihuCrawler(AbstractCrawler):
Returns: Returns:
""" """
utils.logger.info("[ZhihuCrawler.get_creators_and_notes] Begin get xiaohongshu creators") utils.logger.info(
"[ZhihuCrawler.get_creators_and_notes] Begin get xiaohongshu creators"
)
for user_link in config.ZHIHU_CREATOR_URL_LIST: for user_link in config.ZHIHU_CREATOR_URL_LIST:
utils.logger.info(f"[ZhihuCrawler.get_creators_and_notes] Begin get creator {user_link}") utils.logger.info(
f"[ZhihuCrawler.get_creators_and_notes] Begin get creator {user_link}"
)
user_url_token = user_link.split("/")[-1] user_url_token = user_link.split("/")[-1]
# get creator detail info from web html content # get creator detail info from web html content
createor_info: ZhihuCreator = await self.zhihu_client.get_creator_info(url_token=user_url_token) createor_info: ZhihuCreator = await self.zhihu_client.get_creator_info(
url_token=user_url_token
)
if not createor_info: if not createor_info:
utils.logger.info(f"[ZhihuCrawler.get_creators_and_notes] Creator {user_url_token} not found") utils.logger.info(
f"[ZhihuCrawler.get_creators_and_notes] Creator {user_url_token} not found"
)
continue continue
utils.logger.info(f"[ZhihuCrawler.get_creators_and_notes] Creator info: {createor_info}") utils.logger.info(
f"[ZhihuCrawler.get_creators_and_notes] Creator info: {createor_info}"
)
await zhihu_store.save_creator(creator=createor_info) await zhihu_store.save_creator(creator=createor_info)
# 默认只提取回答信息,如果需要文章和视频,把下面的注释打开即可 # 默认只提取回答信息,如果需要文章和视频,把下面的注释打开即可
@@ -218,10 +260,9 @@ class ZhihuCrawler(AbstractCrawler):
all_content_list = await self.zhihu_client.get_all_anwser_by_creator( all_content_list = await self.zhihu_client.get_all_anwser_by_creator(
creator=createor_info, creator=createor_info,
crawl_interval=random.random(), crawl_interval=random.random(),
callback=zhihu_store.batch_update_zhihu_contents callback=zhihu_store.batch_update_zhihu_contents,
) )
# Get all articles of the creator's contents # Get all articles of the creator's contents
# all_content_list = await self.zhihu_client.get_all_articles_by_creator( # all_content_list = await self.zhihu_client.get_all_articles_by_creator(
# creator=createor_info, # creator=createor_info,
@@ -311,7 +352,9 @@ class ZhihuCrawler(AbstractCrawler):
await self.batch_get_content_comments(need_get_comment_notes) await self.batch_get_content_comments(need_get_comment_notes)
@staticmethod @staticmethod
def format_proxy_info(ip_proxy_info: IpInfoModel) -> Tuple[Optional[Dict], Optional[Dict]]: def format_proxy_info(
ip_proxy_info: IpInfoModel,
) -> Tuple[Optional[Dict], Optional[Dict]]:
"""format proxy info for playwright and httpx""" """format proxy info for playwright and httpx"""
playwright_proxy = { playwright_proxy = {
"server": f"{ip_proxy_info.protocol}{ip_proxy_info.ip}:{ip_proxy_info.port}", "server": f"{ip_proxy_info.protocol}{ip_proxy_info.ip}:{ip_proxy_info.port}",
@@ -325,21 +368,25 @@ class ZhihuCrawler(AbstractCrawler):
async def create_zhihu_client(self, httpx_proxy: Optional[str]) -> ZhiHuClient: async def create_zhihu_client(self, httpx_proxy: Optional[str]) -> ZhiHuClient:
"""Create zhihu client""" """Create zhihu client"""
utils.logger.info("[ZhihuCrawler.create_zhihu_client] Begin create zhihu API client ...") utils.logger.info(
cookie_str, cookie_dict = utils.convert_cookies(await self.browser_context.cookies()) "[ZhihuCrawler.create_zhihu_client] Begin create zhihu API client ..."
)
cookie_str, cookie_dict = utils.convert_cookies(
await self.browser_context.cookies()
)
zhihu_client_obj = ZhiHuClient( zhihu_client_obj = ZhiHuClient(
proxies=httpx_proxy, proxies=httpx_proxy,
headers={ headers={
'accept': '*/*', "accept": "*/*",
'accept-language': 'zh-CN,zh;q=0.9', "accept-language": "zh-CN,zh;q=0.9",
'cookie': cookie_str, "cookie": cookie_str,
'priority': 'u=1, i', "priority": "u=1, i",
'referer': 'https://www.zhihu.com/search?q=python&time_interval=a_year&type=content', "referer": "https://www.zhihu.com/search?q=python&time_interval=a_year&type=content",
'user-agent': self.user_agent, "user-agent": self.user_agent,
'x-api-version': '3.0.91', "x-api-version": "3.0.91",
'x-app-za': 'OS=Web', "x-app-za": "OS=Web",
'x-requested-with': 'fetch', "x-requested-with": "fetch",
'x-zse-93': '101_3_3.0', "x-zse-93": "101_3_3.0",
}, },
playwright_page=self.context_page, playwright_page=self.context_page,
cookie_dict=cookie_dict, cookie_dict=cookie_dict,
@@ -351,34 +398,41 @@ class ZhihuCrawler(AbstractCrawler):
chromium: BrowserType, chromium: BrowserType,
playwright_proxy: Optional[Dict], playwright_proxy: Optional[Dict],
user_agent: Optional[str], user_agent: Optional[str],
headless: bool = True headless: bool = True,
) -> BrowserContext: ) -> BrowserContext:
"""Launch browser and create browser context""" """Launch browser and create browser context"""
utils.logger.info("[ZhihuCrawler.launch_browser] Begin create browser context ...") utils.logger.info(
"[ZhihuCrawler.launch_browser] Begin create browser context ..."
)
if config.SAVE_LOGIN_STATE: if config.SAVE_LOGIN_STATE:
# feat issue #14 # feat issue #14
# we will save login state to avoid login every time # we will save login state to avoid login every time
user_data_dir = os.path.join(os.getcwd(), "browser_data", user_data_dir = os.path.join(
config.USER_DATA_DIR % config.PLATFORM) # type: ignore os.getcwd(), "browser_data", config.USER_DATA_DIR % config.PLATFORM
) # type: ignore
browser_context = await chromium.launch_persistent_context( browser_context = await chromium.launch_persistent_context(
user_data_dir=user_data_dir, user_data_dir=user_data_dir,
accept_downloads=True, accept_downloads=True,
headless=headless, headless=headless,
proxy=playwright_proxy, # type: ignore proxy=playwright_proxy, # type: ignore
viewport={"width": 1920, "height": 1080}, viewport={"width": 1920, "height": 1080},
user_agent=user_agent user_agent=user_agent,
) )
return browser_context return browser_context
else: else:
browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore
browser_context = await browser.new_context( browser_context = await browser.new_context(
viewport={"width": 1920, "height": 1080}, viewport={"width": 1920, "height": 1080}, user_agent=user_agent
user_agent=user_agent
) )
return browser_context return browser_context
async def launch_browser_with_cdp(self, playwright: Playwright, playwright_proxy: Optional[Dict], async def launch_browser_with_cdp(
user_agent: Optional[str], headless: bool = True) -> BrowserContext: self,
playwright: Playwright,
playwright_proxy: Optional[Dict],
user_agent: Optional[str],
headless: bool = True,
) -> BrowserContext:
""" """
使用CDP模式启动浏览器 使用CDP模式启动浏览器
""" """
@@ -388,7 +442,7 @@ class ZhihuCrawler(AbstractCrawler):
playwright=playwright, playwright=playwright,
playwright_proxy=playwright_proxy, playwright_proxy=playwright_proxy,
user_agent=user_agent, user_agent=user_agent,
headless=headless headless=headless,
) )
# 显示浏览器信息 # 显示浏览器信息
@@ -401,7 +455,9 @@ class ZhihuCrawler(AbstractCrawler):
utils.logger.error(f"[ZhihuCrawler] CDP模式启动失败回退到标准模式: {e}") utils.logger.error(f"[ZhihuCrawler] CDP模式启动失败回退到标准模式: {e}")
# 回退到标准模式 # 回退到标准模式
chromium = playwright.chromium chromium = playwright.chromium
return await self.launch_browser(chromium, playwright_proxy, user_agent, headless) return await self.launch_browser(
chromium, playwright_proxy, user_agent, headless
)
async def close(self): async def close(self):
"""Close browser context""" """Close browser context"""

View File

@@ -291,16 +291,16 @@ class CDPBrowserManager:
""" """
try: try:
# 关闭浏览器上下文 # 关闭浏览器上下文
if self.browser_context: # if self.browser_context:
await self.browser_context.close() # await self.browser_context.close()
self.browser_context = None # self.browser_context = None
utils.logger.info("[CDPBrowserManager] 浏览器上下文已关闭") # utils.logger.info("[CDPBrowserManager] 浏览器上下文已关闭")
# 断开浏览器连接 # # 断开浏览器连接
if self.browser: # if self.browser:
await self.browser.close() # await self.browser.close()
self.browser = None # self.browser = None
utils.logger.info("[CDPBrowserManager] 浏览器连接已断开") # utils.logger.info("[CDPBrowserManager] 浏览器连接已断开")
# 关闭浏览器进程(如果配置为自动关闭) # 关闭浏览器进程(如果配置为自动关闭)
if config.AUTO_CLOSE_BROWSER: if config.AUTO_CLOSE_BROWSER: