From a356358c211c267d37ce1565fdd8ed82b4f6efa1 Mon Sep 17 00:00:00 2001 From: Bowenwin Date: Mon, 19 May 2025 19:57:36 +0800 Subject: [PATCH 1/4] get_fans_and_get_followings --- config/base_config.py | 18 +++- media_platform/bilibili/client.py | 125 +++++++++++++++++++++++++- media_platform/bilibili/core.py | 96 +++++++++++++++++++- store/bilibili/__init__.py | 63 ++++++++++--- store/bilibili/bilibili_store_impl.py | 28 +++++- 5 files changed, 308 insertions(+), 22 deletions(-) diff --git a/config/base_config.py b/config/base_config.py index 2f94975..75154cd 100644 --- a/config/base_config.py +++ b/config/base_config.py @@ -10,16 +10,16 @@ # 基础配置 -PLATFORM = "xhs" +PLATFORM = "bili" KEYWORDS = "编程副业,编程兼职" # 关键词搜索配置,以英文逗号分隔 -LOGIN_TYPE = "qrcode" # qrcode or phone or cookie +LOGIN_TYPE = "phone" # qrcode or phone or cookie COOKIES = "" # 具体值参见media_platform.xxx.field下的枚举值,暂时只支持小红书 SORT_TYPE = "popularity_descending" # 具体值参见media_platform.xxx.field下的枚举值,暂时只支持抖音 PUBLISH_TIME_TYPE = 0 CRAWLER_TYPE = ( - "search" # 爬取类型,search(关键词搜索) | detail(帖子详情)| creator(创作者主页数据) + "creator" # 爬取类型,search(关键词搜索) | detail(帖子详情)| creator(创作者主页数据) ) # 自定义User Agent(暂时仅对XHS有效) UA = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0' @@ -54,6 +54,9 @@ USER_DATA_DIR = "%s_user_data_dir" # %s will be replaced by platform name # 爬取开始页数 默认从第一页开始 START_PAGE = 1 +# 爬取粉丝列表开始页数 默认从第一页开始 +START_CONTACTS_PAGE = 1 + # 爬取视频/帖子的数量控制 CRAWLER_MAX_NOTES_COUNT = 200 @@ -69,6 +72,9 @@ ENABLE_GET_COMMENTS = True # 爬取一级评论的数量控制(单视频/帖子) CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES = 10 +# 爬取作者粉丝和关注列表数量控制(单作者) +CRAWLER_MAX_FANS_COUNT_SINGLENOTES = 100 + # 是否开启爬二级评论模式, 默认不开启爬二级评论 # 老版本项目使用了 db, 则需参考 schema/tables.sql line 287 增加表字段 ENABLE_GET_SUB_COMMENTS = False @@ -144,7 +150,11 @@ DY_CREATOR_ID_LIST = [ # 指定bili创作者ID列表(sec_id) BILI_CREATOR_ID_LIST = [ - "20813884", + # "20813884", + "520819684", + "472747194", + "519872016", + "372201438", # ........................ ] diff --git a/media_platform/bilibili/client.py b/media_platform/bilibili/client.py index 37c087b..7ec6b35 100644 --- a/media_platform/bilibili/client.py +++ b/media_platform/bilibili/client.py @@ -21,6 +21,7 @@ from urllib.parse import urlencode import httpx from playwright.async_api import BrowserContext, Page +import config from base.base_crawler import AbstractApiClient from tools import utils @@ -223,7 +224,7 @@ class BilibiliClient(AbstractApiClient): async def get_video_all_comments(self, video_id: str, crawl_interval: float = 1.0, is_fetch_sub_comments=False, callback: Optional[Callable] = None, - max_count: int = 10,): + max_count: int = 10, ): """ get video all comments include sub comments :param video_id: @@ -250,7 +251,7 @@ class BilibiliClient(AbstractApiClient): if (comment.get("rcount", 0) > 0): { await self.get_video_all_level_two_comments( - video_id, comment_id, CommentOrderType.DEFAULT, 10, crawl_interval, callback) + video_id, comment_id, CommentOrderType.DEFAULT, 10, crawl_interval, callback) } if len(result) + len(comment_list) > max_count: comment_list = comment_list[:max_count - len(result)] @@ -320,7 +321,8 @@ class BilibiliClient(AbstractApiClient): result = await self.get(uri, post_data) return result - async def get_creator_videos(self, creator_id: str, pn: int, ps: int = 30, order_mode: SearchOrderType = SearchOrderType.LAST_PUBLISH) -> Dict: + async def get_creator_videos(self, creator_id: str, pn: int, ps: int = 30, + order_mode: SearchOrderType = SearchOrderType.LAST_PUBLISH) -> Dict: """get all videos for a creator :param creator_id: 创作者 ID :param pn: 页数 @@ -337,3 +339,120 @@ class BilibiliClient(AbstractApiClient): "order": order_mode, } return await self.get(uri, post_data) + + async def get_creator_info(self, creator_id: int) -> Dict: + """get creator info + :param creator_id: 作者 ID + """ + uri = "/x/space/wbi/acc/info" + post_data = { + "mid": creator_id, + } + return await self.get(uri, post_data) + + async def get_creator_fans(self, + creator_id: int, + pn: int, + ps: int = 24, + ) -> Dict: + """get video comments + :param creator_id: 创作者 ID + :param pn: 开始页数 + :param ps: 每页数量 + :return: + """ + uri = "/x/relation/fans" + post_data = { + 'vmid': creator_id, + "pn": pn, + "ps": ps, + "gaia_source": "main_web", + + } + return await self.get(uri, post_data) + + async def get_creator_followings(self, + creator_id: int, + pn: int, + ps: int = 24, + ) -> Dict: + """get video comments + :param creator_id: 创作者 ID + :param pn: 开始页数 + :param ps: 每页数量 + :return: + """ + uri = "/x/relation/followings" + post_data = { + "vmid": creator_id, + "pn": pn, + "ps": ps, + "gaia_source": "main_web", + } + return await self.get(uri, post_data) + + async def get_creator_all_fans(self, creator_info: Dict, crawl_interval: float = 1.0, + callback: Optional[Callable] = None, + max_count: int = 100) -> List: + """ + get video all comments include sub comments + :param creator_info: + :param crawl_interval: + :param callback: + :param max_count: 一个up主爬取的最大粉丝数量 + + :return: up主粉丝数列表 + """ + creator_id = creator_info["id"] + result = [] + pn = config.START_CONTACTS_PAGE + while len(result) < max_count: + fans_res: Dict = await self.get_creator_fans(creator_id, pn=pn) + fans_list: List[Dict] = fans_res.get("list", []) + + pn += 1 + if len(result) + len(fans_list) > max_count: + fans_list = fans_list[:max_count - len(result)] + if callback: # 如果有回调函数,就执行回调函数 + await callback(creator_info, fans_list) + await asyncio.sleep(crawl_interval) + if not fans_list: + break + result.extend(fans_list) + utils.logger.info( + f"[BilibiliCrawler.get_fans] begin get creator_id: {creator_id} fans successfully") + + return result + + async def get_creator_all_followings(self, creator_info: Dict, crawl_interval: float = 1.0, + callback: Optional[Callable] = None, + max_count: int = 100) -> List: + """ + get video all comments include sub comments + :param creator_info: + :param crawl_interval: + :param callback: + :param max_count: 一个up主爬取的最大关注者数量 + + :return: up主关注者列表 + """ + creator_id = creator_info["id"] + result = [] + pn = config.START_CONTACTS_PAGE + while len(result) < max_count: + followings_res: Dict = await self.get_creator_followings(creator_id, pn=pn) + followings_list: List[Dict] = followings_res.get("list", []) + + pn += 1 + if len(result) + len(followings_list) > max_count: + followings_list = followings_list[:max_count - len(result)] + if callback: # 如果有回调函数,就执行回调函数 + await callback(creator_info, followings_list) + await asyncio.sleep(crawl_interval) + if not followings_list: + break + result.extend(followings_list) + utils.logger.info( + f"[BilibiliCrawler.get_followings] begin get creator_id: {creator_id} followings successfully") + + return result diff --git a/media_platform/bilibili/core.py b/media_platform/bilibili/core.py index 5c7949a..eb6c014 100644 --- a/media_platform/bilibili/core.py +++ b/media_platform/bilibili/core.py @@ -89,8 +89,9 @@ class BilibiliCrawler(AbstractCrawler): # Get the information and comments of the specified post await self.get_specified_videos(config.BILI_SPECIFIED_ID_LIST) elif config.CRAWLER_TYPE == "creator": - for creator_id in config.BILI_CREATOR_ID_LIST: - await self.get_creator_videos(int(creator_id)) + # for creator_id in config.BILI_CREATOR_ID_LIST: + # await self.get_creator_videos(int(creator_id)) + await self.get_all_creator_details(config.BILI_CREATOR_ID_LIST) else: pass utils.logger.info( @@ -466,3 +467,94 @@ class BilibiliCrawler(AbstractCrawler): extension_file_name = f"video.mp4" await bilibili_store.store_video(aid, content, extension_file_name) + + async def get_all_creator_details(self, creator_id_list: List[int]): + """ + creator_id_list: get details for creator from creator_id_list + """ + utils.logger.info( + f"[BilibiliCrawler.get_creator_details] Crawling the detalis of creator") + utils.logger.info( + f"[BilibiliCrawler.get_creator_details] creator ids:{creator_id_list}") + + semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) + task_list: List[Task] = [] + try: + for creator_id in creator_id_list: + task = asyncio.create_task(self.get_creator_details( + creator_id, semaphore), name=creator_id) + task_list.append(task) + except Exception as e: + utils.logger.warning(f"[BilibiliCrawler.get_all_creator_details] error in the task list. The creator will not be included. {e}") + + await asyncio.gather(*task_list) + + async def get_creator_details(self, creator_id: int, semaphore: asyncio.Semaphore): + """ + get details for creator id + :param creator_id: + :param semaphore: + :return: + """ + async with semaphore: + creator_unhandled_info: Dict = await self.bili_client.get_creator_info(creator_id) + creator_info: Dict = { + "id": creator_id, + "name": creator_unhandled_info.get("name"), + "sign": creator_unhandled_info.get("sign"), + "avatar": creator_unhandled_info.get("face"), + } + await self.get_fans(creator_info, semaphore) + await self.get_followings(creator_info, semaphore) + + async def get_fans(self, creator_info: Dict, semaphore: asyncio.Semaphore): + """ + get fans for creator id + :param creator_info: + :param semaphore: + :return: + """ + creator_id = creator_info["id"] + async with semaphore: + try: + utils.logger.info( + f"[BilibiliCrawler.get_fans] begin get creator_id: {creator_id} fans ...") + await self.bili_client.get_creator_all_fans( + creator_info=creator_info, + crawl_interval=random.random(), + callback=bilibili_store.batch_update_bilibili_creator_fans, + max_count=config.CRAWLER_MAX_FANS_COUNT_SINGLENOTES, + ) + + except DataFetchError as ex: + utils.logger.error( + f"[BilibiliCrawler.get_fans] get creator_id: {creator_id} fans error: {ex}") + except Exception as e: + utils.logger.error( + f"[BilibiliCrawler.get_fans] may be been blocked, err:{e}") + + async def get_followings(self, creator_info: Dict, semaphore: asyncio.Semaphore): + """ + get followings for creator id + :param creator_info: + :param semaphore: + :return: + """ + creator_id = creator_info["id"] + async with semaphore: + try: + utils.logger.info( + f"[BilibiliCrawler.get_followings] begin get creator_id: {creator_id} followings ...") + await self.bili_client.get_creator_all_followings( + creator_info=creator_info, + crawl_interval=random.random(), + callback=bilibili_store.batch_update_bilibili_creator_followings, + max_count=config.CRAWLER_MAX_FANS_COUNT_SINGLENOTES, + ) + + except DataFetchError as ex: + utils.logger.error( + f"[BilibiliCrawler.get_followings] get creator_id: {creator_id} followings error: {ex}") + except Exception as e: + utils.logger.error( + f"[BilibiliCrawler.get_followings] may be been blocked, err:{e}") diff --git a/store/bilibili/__init__.py b/store/bilibili/__init__.py index dcffa88..c417dcc 100644 --- a/store/bilibili/__init__.py +++ b/store/bilibili/__init__.py @@ -71,25 +71,25 @@ async def update_bilibili_video(video_item: Dict): await BiliStoreFactory.create_store().store_content(content_item=save_content_item) -async def update_up_info(video_item: Dict): +async def update_up_info(video_item: Dict): video_item_card_list: Dict = video_item.get("Card") - video_item_card: Dict = video_item_card_list.get("card") + video_item_card: Dict = video_item_card_list.get("card") saver_up_info = { - "user_id": str(video_item_card.get("mid")), - "nickname": video_item_card.get("name"), + "user_id": str(video_item_card.get("mid")), + "nickname": video_item_card.get("name"), "sex": video_item_card.get("sex"), "sign": video_item_card.get("sign"), - "avatar": video_item_card.get("face"), - "last_modify_ts": utils.get_current_timestamp(), - "total_fans": video_item_card.get("fans"), - "total_liked": video_item_card_list.get("like_num"), - "user_rank": video_item_card.get("level_info").get("current_level"), - "is_official": video_item_card.get("official_verify").get("type"), + "avatar": video_item_card.get("face"), + "last_modify_ts": utils.get_current_timestamp(), + "total_fans": video_item_card.get("fans"), + "total_liked": video_item_card_list.get("like_num"), + "user_rank": video_item_card.get("level_info").get("current_level"), + "is_official": video_item_card.get("official_verify").get("type"), } utils.logger.info( f"[store.bilibili.update_up_info] bilibili user_id:{video_item_card.get('mid')}") await BiliStoreFactory.create_store().store_creator(creator=saver_up_info) - + async def batch_update_bilibili_video_comments(video_id: str, comments: List[Dict]): if not comments: @@ -132,3 +132,44 @@ async def store_video(aid, video_content, extension_file_name): """ await BilibiliVideo().store_video( {"aid": aid, "video_content": video_content, "extension_file_name": extension_file_name}) + + +async def batch_update_bilibili_creator_fans(creator_info: Dict, fans_list: List[Dict]): + if not fans_list: + return + for fan_item in fans_list: + fan_info: Dict = { + "id": fan_item.get("mid"), + "name": fan_item.get("uname"), + "sign": fan_item.get("sign"), + "avatar": fan_item.get("face"), + } + await update_bilibili_creator_fans(creator_info=creator_info, fan_info=fan_info) + + +async def batch_update_bilibili_creator_followings(creator_info: Dict, followings_list: List[Dict]): + if not followings_list: + return + for following_item in followings_list: + following_info: Dict = { + "id": following_item.get("mid"), + "name": following_item.get("uname"), + "sign": following_item.get("sign"), + "avatar": following_item.get("face"), + } + await update_bilibili_creator_fans(creator_info=following_info, fan_info=creator_info) + + +async def update_bilibili_creator_fans(creator_info: Dict, fan_info: Dict): + save_contact_item = { + "up_id": creator_info["id"], + "fan_id": fan_info["id"], + "up_name": creator_info["name"], + "fan_name": fan_info["name"], + "up_sign": creator_info["sign"], + "fan_sign": fan_info["sign"], + "up_avatar": creator_info["avatar"], + "fan_avatar": fan_info["avatar"] + } + + await BiliStoreFactory.create_store().store_creator_contact(contact_item=save_contact_item) diff --git a/store/bilibili/bilibili_store_impl.py b/store/bilibili/bilibili_store_impl.py index 17c410a..20d09f1 100644 --- a/store/bilibili/bilibili_store_impl.py +++ b/store/bilibili/bilibili_store_impl.py @@ -107,6 +107,18 @@ class BiliCsvStoreImplement(AbstractStore): """ await self.save_data_to_csv(save_item=creator, store_type="creators") + async def store_creator_contact(self, contact_item: Dict): + """ + Bilibili comment CSV storage implementation + Args: + contact_item: creator's contact item dict + + Returns: + + """ + + await self.save_data_to_csv(save_item=contact_item, store_type="fans") + class BiliDbStoreImplement(AbstractStore): async def store_content(self, content_item: Dict): @@ -239,7 +251,7 @@ class BiliJsonStoreImplement(AbstractStore): async def store_comment(self, comment_item: Dict): """ - comment JSON storage implementatio + comment JSON storage implementation Args: comment_item: @@ -250,7 +262,7 @@ class BiliJsonStoreImplement(AbstractStore): async def store_creator(self, creator: Dict): """ - creator JSON storage implementatio + creator JSON storage implementation Args: creator: @@ -258,3 +270,15 @@ class BiliJsonStoreImplement(AbstractStore): """ await self.save_data_to_json(creator, "creators") + + async def store_creator_contact(self, contact_item: Dict): + """ + creator contact JSON storage implementation + Args: + contact_item: creator's contact item dict + + Returns: + + """ + + await self.save_data_to_json(save_item=contact_item, store_type="fans") From 44e3d370ffaf45b815add882dae1ecd7c225256a Mon Sep 17 00:00:00 2001 From: Bowenwin Date: Thu, 22 May 2025 20:31:48 +0800 Subject: [PATCH 2/4] fix_words --- config/base_config.py | 13 +++-- config/db_config.py | 15 +++-- media_platform/bilibili/client.py | 64 +++++++++++++++++---- media_platform/bilibili/core.py | 73 +++++++++++++++++------- schema/tables.sql | 50 +++++++++++++++- store/bilibili/__init__.py | 54 ++++++++++++++++-- store/bilibili/bilibili_store_impl.py | 82 +++++++++++++++++++++++---- store/bilibili/bilibili_store_sql.py | 46 +++++++++++++++ 8 files changed, 338 insertions(+), 59 deletions(-) diff --git a/config/base_config.py b/config/base_config.py index 75154cd..34539c9 100644 --- a/config/base_config.py +++ b/config/base_config.py @@ -46,7 +46,7 @@ HEADLESS = False SAVE_LOGIN_STATE = True # 数据保存类型选项配置,支持三种类型:csv、db、json, 最好保存到DB,有排重的功能。 -SAVE_DATA_OPTION = "json" # csv or db or json +SAVE_DATA_OPTION = "csv" # csv or db or json # 用户浏览器缓存的浏览器文件配置 USER_DATA_DIR = "%s_user_data_dir" # %s will be replaced by platform name @@ -73,7 +73,10 @@ ENABLE_GET_COMMENTS = True CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES = 10 # 爬取作者粉丝和关注列表数量控制(单作者) -CRAWLER_MAX_FANS_COUNT_SINGLENOTES = 100 +CRAWLER_MAX_CONTACTS_COUNT_SINGLENOTES = 100 + +# 爬取作者动态粉丝和关注列表数量控制(单作者) +CRAWLER_MAX_DYNAMICS_COUNT_SINGLENOTES = 50 # 是否开启爬二级评论模式, 默认不开启爬二级评论 # 老版本项目使用了 db, 则需参考 schema/tables.sql line 287 增加表字段 @@ -152,9 +155,9 @@ DY_CREATOR_ID_LIST = [ BILI_CREATOR_ID_LIST = [ # "20813884", "520819684", - "472747194", - "519872016", - "372201438", + # "472747194", + # "519872016", + # "372201438", # ........................ ] diff --git a/config/db_config.py b/config/db_config.py index 51d3fd0..2b9c2d5 100644 --- a/config/db_config.py +++ b/config/db_config.py @@ -12,11 +12,16 @@ import os # mysql config -RELATION_DB_PWD = os.getenv("RELATION_DB_PWD", "123456") -RELATION_DB_USER = os.getenv("RELATION_DB_USER", "root") -RELATION_DB_HOST = os.getenv("RELATION_DB_HOST", "localhost") -RELATION_DB_PORT = os.getenv("RELATION_DB_PORT", 3306) -RELATION_DB_NAME = os.getenv("RELATION_DB_NAME", "media_crawler") +# RELATION_DB_PWD = os.getenv("RELATION_DB_PWD", "123456") +# RELATION_DB_USER = os.getenv("RELATION_DB_USER", "root") +# RELATION_DB_HOST = os.getenv("RELATION_DB_HOST", "localhost") +# RELATION_DB_PORT = os.getenv("RELATION_DB_PORT", 3306) +# RELATION_DB_NAME = os.getenv("RELATION_DB_NAME", "media_crawler") +RELATION_DB_HOST = "47.94.233.47" # 替换为你的数据库域名/公网IP +RELATION_DB_PORT = 3306 # 替换为你的数据库端口(通常3306) +RELATION_DB_USER = "remote_user" # 替换为你的数据库用户名 +RELATION_DB_PWD = "314159" # 替换为你的数据库密码 +RELATION_DB_NAME = "Test" # 替换为你的数据库名称 # redis config diff --git a/media_platform/bilibili/client.py b/media_platform/bilibili/client.py index 7ec6b35..d03d105 100644 --- a/media_platform/bilibili/client.py +++ b/media_platform/bilibili/client.py @@ -341,7 +341,8 @@ class BilibiliClient(AbstractApiClient): return await self.get(uri, post_data) async def get_creator_info(self, creator_id: int) -> Dict: - """get creator info + """ + get creator info :param creator_id: 作者 ID """ uri = "/x/space/wbi/acc/info" @@ -355,7 +356,8 @@ class BilibiliClient(AbstractApiClient): pn: int, ps: int = 24, ) -> Dict: - """get video comments + """ + get creator fans :param creator_id: 创作者 ID :param pn: 开始页数 :param ps: 每页数量 @@ -376,7 +378,8 @@ class BilibiliClient(AbstractApiClient): pn: int, ps: int = 24, ) -> Dict: - """get video comments + """ + get creator followings :param creator_id: 创作者 ID :param pn: 开始页数 :param ps: 每页数量 @@ -391,11 +394,27 @@ class BilibiliClient(AbstractApiClient): } return await self.get(uri, post_data) + async def get_creator_dynamics(self, creator_id: int, offset: str = ""): + """ + get creator comments + :param creator_id: 创作者 ID + :param offset: 发送请求所需参数 + :return: + """ + uri = "/x/polymer/web-dynamic/v1/feed/space" + post_data = { + "offset": offset, + "host_mid": creator_id, + "platform": "web", + } + + return await self.get(uri, post_data) + async def get_creator_all_fans(self, creator_info: Dict, crawl_interval: float = 1.0, callback: Optional[Callable] = None, max_count: int = 100) -> List: """ - get video all comments include sub comments + get creator all fans :param creator_info: :param crawl_interval: :param callback: @@ -419,16 +438,13 @@ class BilibiliClient(AbstractApiClient): if not fans_list: break result.extend(fans_list) - utils.logger.info( - f"[BilibiliCrawler.get_fans] begin get creator_id: {creator_id} fans successfully") - return result async def get_creator_all_followings(self, creator_info: Dict, crawl_interval: float = 1.0, callback: Optional[Callable] = None, max_count: int = 100) -> List: """ - get video all comments include sub comments + get creator all followings :param creator_info: :param crawl_interval: :param callback: @@ -452,7 +468,33 @@ class BilibiliClient(AbstractApiClient): if not followings_list: break result.extend(followings_list) - utils.logger.info( - f"[BilibiliCrawler.get_followings] begin get creator_id: {creator_id} followings successfully") - + return result + + async def get_creator_all_dynamics(self, creator_info: Dict, crawl_interval: float = 1.0, + callback: Optional[Callable] = None, + max_count: int = 20) -> List: + """ + get creator all followings + :param creator_info: + :param crawl_interval: + :param callback: + :param max_count: 一个up主爬取的最大动态数量 + + :return: up主关注者列表 + """ + creator_id = creator_info["id"] + result = [] + offset = "" + has_more = True + while has_more and len(result) < max_count: + dynamics_res = await self.get_creator_dynamics(creator_id, offset) + dynamics_list: List[Dict] = dynamics_res["items"] + has_more = dynamics_res["has_more"] + offset = dynamics_res["offset"] + if len(result) + len(dynamics_list) > max_count: + dynamics_list = dynamics_list[:max_count - len(result)] + if callback: + await callback(creator_info, dynamics_list) + await asyncio.sleep(crawl_interval) + result.extend(dynamics_list) return result diff --git a/media_platform/bilibili/core.py b/media_platform/bilibili/core.py index eb6c014..f47519b 100644 --- a/media_platform/bilibili/core.py +++ b/media_platform/bilibili/core.py @@ -119,14 +119,16 @@ class BilibiliCrawler(AbstractCrawler): start_day: datetime = datetime.strptime(start, '%Y-%m-%d') end_day: datetime = datetime.strptime(end, '%Y-%m-%d') if start_day > end_day: - raise ValueError('Wrong time range, please check your start and end argument, to ensure that the start cannot exceed end') + raise ValueError( + 'Wrong time range, please check your start and end argument, to ensure that the start cannot exceed end') elif start_day == end_day: # 搜索同一天的内容 - end_day = start_day + timedelta(days=1) - timedelta(seconds=1) # 则将 end_day 设置为 start_day + 1 day - 1 second + end_day = start_day + timedelta(days=1) - timedelta( + seconds=1) # 则将 end_day 设置为 start_day + 1 day - 1 second else: # 搜索 start 至 end end_day = end_day + timedelta(days=1) - timedelta(seconds=1) # 则将 end_day 设置为 end_day + 1 day - 1 second # 将其重新转换为时间戳 return str(int(start_day.timestamp())), str(int(end_day.timestamp())) - + async def search(self): """ search bilibili video with keywords @@ -164,9 +166,11 @@ class BilibiliCrawler(AbstractCrawler): semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) task_list = [] try: - task_list = [self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) for video_item in video_list] + task_list = [self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) + for video_item in video_list] except Exception as e: - utils.logger.warning(f"[BilibiliCrawler.search] error in the task list. The video for this page will not be included. {e}") + utils.logger.warning( + f"[BilibiliCrawler.search] error in the task list. The video for this page will not be included. {e}") video_items = await asyncio.gather(*task_list) for video_item in video_items: if video_item: @@ -180,21 +184,23 @@ class BilibiliCrawler(AbstractCrawler): else: for day in pd.date_range(start=config.START_DAY, end=config.END_DAY, freq='D'): # 按照每一天进行爬取的时间戳参数 - pubtime_begin_s, pubtime_end_s = await self.get_pubtime_datetime(start=day.strftime('%Y-%m-%d'), end=day.strftime('%Y-%m-%d')) + pubtime_begin_s, pubtime_end_s = await self.get_pubtime_datetime(start=day.strftime('%Y-%m-%d'), + end=day.strftime('%Y-%m-%d')) page = 1 - #!该段 while 语句在发生异常时(通常情况下为当天数据为空时)会自动跳转到下一天,以实现最大程度爬取该关键词下当天的所有视频 - #!除了仅保留现在原有的 try, except Exception 语句外,不要再添加其他的异常处理!!!否则将使该段代码失效,使其仅能爬取当天一天数据而无法跳转到下一天 - #!除非将该段代码的逻辑进行重构以实现相同的功能,否则不要进行修改!!! + # !该段 while 语句在发生异常时(通常情况下为当天数据为空时)会自动跳转到下一天,以实现最大程度爬取该关键词下当天的所有视频 + # !除了仅保留现在原有的 try, except Exception 语句外,不要再添加其他的异常处理!!!否则将使该段代码失效,使其仅能爬取当天一天数据而无法跳转到下一天 + # !除非将该段代码的逻辑进行重构以实现相同的功能,否则不要进行修改!!! while (page - start_page + 1) * bili_limit_count <= config.CRAWLER_MAX_NOTES_COUNT: - #! Catch any error if response return nothing, go to next day + # ! Catch any error if response return nothing, go to next day try: - #! Don't skip any page, to make sure gather all video in one day + # ! Don't skip any page, to make sure gather all video in one day # if page < start_page: # utils.logger.info(f"[BilibiliCrawler.search] Skip page: {page}") # page += 1 # continue - utils.logger.info(f"[BilibiliCrawler.search] search bilibili keyword: {keyword}, date: {day.ctime()}, page: {page}") + utils.logger.info( + f"[BilibiliCrawler.search] search bilibili keyword: {keyword}, date: {day.ctime()}, page: {page}") video_id_list: List[str] = [] videos_res = await self.bili_client.search_video_by_keyword( keyword=keyword, @@ -207,7 +213,9 @@ class BilibiliCrawler(AbstractCrawler): video_list: List[Dict] = videos_res.get("result") semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) - task_list = [self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) for video_item in video_list] + task_list = [ + self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) for + video_item in video_list] video_items = await asyncio.gather(*task_list) for video_item in video_items: if video_item: @@ -467,7 +475,6 @@ class BilibiliCrawler(AbstractCrawler): extension_file_name = f"video.mp4" await bilibili_store.store_video(aid, content, extension_file_name) - async def get_all_creator_details(self, creator_id_list: List[int]): """ creator_id_list: get details for creator from creator_id_list @@ -485,7 +492,8 @@ class BilibiliCrawler(AbstractCrawler): creator_id, semaphore), name=creator_id) task_list.append(task) except Exception as e: - utils.logger.warning(f"[BilibiliCrawler.get_all_creator_details] error in the task list. The creator will not be included. {e}") + utils.logger.warning( + f"[BilibiliCrawler.get_all_creator_details] error in the task list. The creator will not be included. {e}") await asyncio.gather(*task_list) @@ -504,8 +512,9 @@ class BilibiliCrawler(AbstractCrawler): "sign": creator_unhandled_info.get("sign"), "avatar": creator_unhandled_info.get("face"), } - await self.get_fans(creator_info, semaphore) - await self.get_followings(creator_info, semaphore) + # await self.get_fans(creator_info, semaphore) + # await self.get_followings(creator_info, semaphore) + await self.get_dynamics(creator_info, semaphore) async def get_fans(self, creator_info: Dict, semaphore: asyncio.Semaphore): """ @@ -523,7 +532,7 @@ class BilibiliCrawler(AbstractCrawler): creator_info=creator_info, crawl_interval=random.random(), callback=bilibili_store.batch_update_bilibili_creator_fans, - max_count=config.CRAWLER_MAX_FANS_COUNT_SINGLENOTES, + max_count=config.CRAWLER_MAX_CONTACTS_COUNT_SINGLENOTES, ) except DataFetchError as ex: @@ -549,7 +558,7 @@ class BilibiliCrawler(AbstractCrawler): creator_info=creator_info, crawl_interval=random.random(), callback=bilibili_store.batch_update_bilibili_creator_followings, - max_count=config.CRAWLER_MAX_FANS_COUNT_SINGLENOTES, + max_count=config.CRAWLER_MAX_CONTACTS_COUNT_SINGLENOTES, ) except DataFetchError as ex: @@ -558,3 +567,29 @@ class BilibiliCrawler(AbstractCrawler): except Exception as e: utils.logger.error( f"[BilibiliCrawler.get_followings] may be been blocked, err:{e}") + + async def get_dynamics(self, creator_info: Dict, semaphore: asyncio.Semaphore): + """ + get dynamics for creator id + :param creator_info: + :param semaphore: + :return: + """ + creator_id = creator_info["id"] + async with semaphore: + try: + utils.logger.info( + f"[BilibiliCrawler.get_dynamics] begin get creator_id: {creator_id} dynamics ...") + await self.bili_client.get_creator_all_dynamics( + creator_info=creator_info, + crawl_interval=random.random(), + callback=bilibili_store.batch_update_bilibili_creator_dynamics, + max_count=config.CRAWLER_MAX_DYNAMICS_COUNT_SINGLENOTES, + ) + + except DataFetchError as ex: + utils.logger.error( + f"[BilibiliCrawler.get_dynamics] get creator_id: {creator_id} dynamics error: {ex}") + except Exception as e: + utils.logger.error( + f"[BilibiliCrawler.get_dynamics] may be been blocked, err:{e}") diff --git a/schema/tables.sql b/schema/tables.sql index 7e9a9b3..69d00cc 100644 --- a/schema/tables.sql +++ b/schema/tables.sql @@ -76,6 +76,50 @@ CREATE TABLE `bilibili_up_info` KEY `idx_bilibili_vi_user_123456` (`user_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='B 站UP主信息'; +-- ---------------------------- +-- Table structure for bilibili_contact_info +-- ---------------------------- +DROP TABLE IF EXISTS `bilibili_contact_info`; +CREATE TABLE `bilibili_contact_info` +( + `id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID', + `up_id` varchar(64) DEFAULT NULL COMMENT 'up主ID', + `fan_id` varchar(64) DEFAULT NULL COMMENT '粉丝ID', + `up_name` varchar(64) DEFAULT NULL COMMENT 'up主昵称', + `fan_name` varchar(64) DEFAULT NULL COMMENT '粉丝昵称', + `up_sign` longtext DEFAULT NULL COMMENT 'up主签名', + `fan_sign` longtext DEFAULT NULL COMMENT '粉丝签名', + `up_avatar` varchar(255) DEFAULT NULL COMMENT 'up主头像地址', + `fan_avatar` varchar(255) DEFAULT NULL COMMENT '粉丝头像地址', + `add_ts` bigint NOT NULL COMMENT '记录添加时间戳', + `last_modify_ts` bigint NOT NULL COMMENT '记录最后修改时间戳', + PRIMARY KEY (`id`), + KEY `idx_bilibili_contact_info_up_id` (`up_id`), + KEY `idx_bilibili_contact_info_fan_id` (`fan_id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='B 站联系人信息'; + +-- ---------------------------- +-- Table structure for bilibili_up_dynamic +-- ---------------------------- +DROP TABLE IF EXISTS `bilibili_up_dynamic`; +CREATE TABLE `bilibili_up_dynamic` +( + `id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID', + `dynamic_id` varchar(64) DEFAULT NULL COMMENT '动态ID', + `user_id` varchar(64) DEFAULT NULL COMMENT '用户ID', + `user_name` varchar(64) DEFAULT NULL COMMENT '用户名', + `text` longtext DEFAULT NULL COMMENT '动态文本', + `type` varchar(64) DEFAULT NULL COMMENT '动态类型', + `pub_ts` bigint DEFAULT NULL COMMENT '动态发布时间', + `total_comments` bigint DEFAULT NULL COMMENT '评论数', + `total_forwards` bigint DEFAULT NULL COMMENT '转发数', + `total_liked` bigint DEFAULT NULL COMMENT '点赞数', + `add_ts` bigint NOT NULL COMMENT '记录添加时间戳', + `last_modify_ts` bigint NOT NULL COMMENT '记录最后修改时间戳', + PRIMARY KEY (`id`), + KEY `idx_bilibili_up_dynamic_dynamic_id` (`dynamic_id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='B 站up主动态信息'; + -- ---------------------------- -- Table structure for douyin_aweme -- ---------------------------- @@ -463,7 +507,7 @@ CREATE TABLE `tieba_creator` PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='贴吧创作者'; - +DROP TABLE IF EXISTS `zhihu_content`; CREATE TABLE `zhihu_content` ( `id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID', `content_id` varchar(64) NOT NULL COMMENT '内容ID', @@ -491,7 +535,7 @@ CREATE TABLE `zhihu_content` ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='知乎内容(回答、文章、视频)'; - +DROP TABLE IF EXISTS `zhihu_comment`; CREATE TABLE `zhihu_comment` ( `id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID', `comment_id` varchar(64) NOT NULL COMMENT '评论ID', @@ -516,7 +560,7 @@ CREATE TABLE `zhihu_comment` ( KEY `idx_zhihu_comment_publish_time` (`publish_time`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='知乎评论'; - +DROP TABLE IF EXISTS `zhihu_creator`; CREATE TABLE `zhihu_creator` ( `id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID', `user_id` varchar(64) NOT NULL COMMENT '用户ID', diff --git a/store/bilibili/__init__.py b/store/bilibili/__init__.py index c417dcc..bd208a8 100644 --- a/store/bilibili/__init__.py +++ b/store/bilibili/__init__.py @@ -144,7 +144,7 @@ async def batch_update_bilibili_creator_fans(creator_info: Dict, fans_list: List "sign": fan_item.get("sign"), "avatar": fan_item.get("face"), } - await update_bilibili_creator_fans(creator_info=creator_info, fan_info=fan_info) + await update_bilibili_creator_contact(creator_info=creator_info, fan_info=fan_info) async def batch_update_bilibili_creator_followings(creator_info: Dict, followings_list: List[Dict]): @@ -157,10 +157,36 @@ async def batch_update_bilibili_creator_followings(creator_info: Dict, following "sign": following_item.get("sign"), "avatar": following_item.get("face"), } - await update_bilibili_creator_fans(creator_info=following_info, fan_info=creator_info) + await update_bilibili_creator_contact(creator_info=following_info, fan_info=creator_info) -async def update_bilibili_creator_fans(creator_info: Dict, fan_info: Dict): +async def batch_update_bilibili_creator_dynamics(creator_info: Dict, dynamics_list: List[Dict]): + if not dynamics_list: + return + for dynamic_item in dynamics_list: + dynamic_id: str = dynamic_item["id_str"] + dynamic_text: str = "" + if dynamic_item["modules"]["module_dynamic"].get("desc"): + dynamic_text = dynamic_item["modules"]["module_dynamic"]["desc"]["text"] + dynamic_type: str = dynamic_item["type"].split("_")[-1] + dynamic_pub_ts: str = dynamic_item["modules"]["module_author"]["pub_ts"] + dynamic_stat: Dict = dynamic_item["modules"]["module_stat"] + dynamic_comment: int = dynamic_stat["comment"]["count"] + dynamic_forward: int = dynamic_stat["forward"]["count"] + dynamic_like: int = dynamic_stat["like"]["count"] + dynamic_info: Dict = { + "dynamic_id": dynamic_id, + "text": dynamic_text, + "type": dynamic_type, + "pub_ts": dynamic_pub_ts, + "comment": dynamic_comment, + "forward": dynamic_forward, + "like": dynamic_like, + } + await update_bilibili_creator_dynamic(creator_info=creator_info, dynamic_info=dynamic_info) + + +async def update_bilibili_creator_contact(creator_info: Dict, fan_info: Dict): save_contact_item = { "up_id": creator_info["id"], "fan_id": fan_info["id"], @@ -169,7 +195,25 @@ async def update_bilibili_creator_fans(creator_info: Dict, fan_info: Dict): "up_sign": creator_info["sign"], "fan_sign": fan_info["sign"], "up_avatar": creator_info["avatar"], - "fan_avatar": fan_info["avatar"] + "fan_avatar": fan_info["avatar"], + "last_modify_ts": utils.get_current_timestamp(), } - await BiliStoreFactory.create_store().store_creator_contact(contact_item=save_contact_item) + await BiliStoreFactory.create_store().store_contact(contact_item=save_contact_item) + + +async def update_bilibili_creator_dynamic(creator_info: Dict, dynamic_info: Dict): + save_dynamic_item = { + "dynamic_id": dynamic_info["dynamic_id"], + "user_id": creator_info["id"], + "user_name": creator_info["name"], + "text": dynamic_info["text"], + "type": dynamic_info["type"], + "pub_ts": dynamic_info["pub_ts"], + "comment": dynamic_info["comment"], + "forward": dynamic_info["forward"], + "like": dynamic_info["like"], + "last_modify_ts": utils.get_current_timestamp(), + } + + await BiliStoreFactory.create_store().store_dynamic(dynamic_item=save_dynamic_item) diff --git a/store/bilibili/bilibili_store_impl.py b/store/bilibili/bilibili_store_impl.py index 20d09f1..c888ff5 100644 --- a/store/bilibili/bilibili_store_impl.py +++ b/store/bilibili/bilibili_store_impl.py @@ -38,13 +38,15 @@ def calculate_number_of_files(file_store_path: str) -> int: if not os.path.exists(file_store_path): return 1 try: - return max([int(file_name.split("_")[0])for file_name in os.listdir(file_store_path)])+1 + return max([int(file_name.split("_")[0]) for file_name in os.listdir(file_store_path)]) + 1 except ValueError: return 1 + class BiliCsvStoreImplement(AbstractStore): csv_store_path: str = "data/bilibili" - file_count:int=calculate_number_of_files(csv_store_path) + file_count: int = calculate_number_of_files(csv_store_path) + def make_save_file_name(self, store_type: str) -> str: """ make save file name by store type @@ -107,9 +109,9 @@ class BiliCsvStoreImplement(AbstractStore): """ await self.save_data_to_csv(save_item=creator, store_type="creators") - async def store_creator_contact(self, contact_item: Dict): + async def store_contact(self, contact_item: Dict): """ - Bilibili comment CSV storage implementation + Bilibili contact CSV storage implementation Args: contact_item: creator's contact item dict @@ -117,7 +119,19 @@ class BiliCsvStoreImplement(AbstractStore): """ - await self.save_data_to_csv(save_item=contact_item, store_type="fans") + await self.save_data_to_csv(save_item=contact_item, store_type="contacts") + + async def store_dynamic(self, dynamic_item: Dict): + """ + Bilibili dynamic CSV storage implementation + Args: + dynamic_item: creator's dynamic item dict + + Returns: + + """ + + await self.save_data_to_csv(save_item=dynamic_item, store_type="dynamics") class BiliDbStoreImplement(AbstractStore): @@ -184,16 +198,50 @@ class BiliDbStoreImplement(AbstractStore): else: await update_creator_by_creator_id(creator_id,creator_item=creator) + async def store_contact(self, contact_item: Dict): + """ + Bilibili contact DB storage implementation + Args: + contact_item: contact item dict + + Returns: + + """ + + from .bilibili_store_sql import (add_new_contact, + query_contact_by_up_and_fan, + update_contact_by_id, ) + + up_id = contact_item.get("up_id") + fan_id = contact_item.get("fan_id") + contact_detail: Dict = await query_contact_by_up_and_fan(up_id=up_id, fan_id=fan_id) + if not contact_detail: + contact_item["add_ts"] = utils.get_current_timestamp() + await add_new_contact(contact_item) + else: + key_id = contact_detail.get("id") + await update_contact_by_id(id=key_id, contact_item=contact_item) + + async def store_dynamic(self, dynamic_item): + """ + Bilibili dynamic DB storage implementation + Args: + dynamic_item: dynamic item dict + + Returns: + + """ + + class BiliJsonStoreImplement(AbstractStore): json_store_path: str = "data/bilibili/json" words_store_path: str = "data/bilibili/words" lock = asyncio.Lock() - file_count:int=calculate_number_of_files(json_store_path) + file_count: int = calculate_number_of_files(json_store_path) WordCloud = words.AsyncWordCloudGenerator() - - def make_save_file_name(self, store_type: str) -> (str,str): + def make_save_file_name(self, store_type: str) -> (str, str): """ make save file name by store type Args: @@ -220,7 +268,7 @@ class BiliJsonStoreImplement(AbstractStore): """ pathlib.Path(self.json_store_path).mkdir(parents=True, exist_ok=True) pathlib.Path(self.words_store_path).mkdir(parents=True, exist_ok=True) - save_file_name,words_file_name_prefix = self.make_save_file_name(store_type=store_type) + save_file_name, words_file_name_prefix = self.make_save_file_name(store_type=store_type) save_data = [] async with self.lock: @@ -271,7 +319,7 @@ class BiliJsonStoreImplement(AbstractStore): """ await self.save_data_to_json(creator, "creators") - async def store_creator_contact(self, contact_item: Dict): + async def store_contact(self, contact_item: Dict): """ creator contact JSON storage implementation Args: @@ -281,4 +329,16 @@ class BiliJsonStoreImplement(AbstractStore): """ - await self.save_data_to_json(save_item=contact_item, store_type="fans") + await self.save_data_to_json(save_item=contact_item, store_type="contacts") + + async def store_dynamic(self, dynamic_item: Dict): + """ + creator dynamic JSON storage implementation + Args: + dynamic_item: creator's contact item dict + + Returns: + + """ + + await self.save_data_to_json(save_item=dynamic_item, store_type="dynamics") diff --git a/store/bilibili/bilibili_store_sql.py b/store/bilibili/bilibili_store_sql.py index 5e6356f..513b679 100644 --- a/store/bilibili/bilibili_store_sql.py +++ b/store/bilibili/bilibili_store_sql.py @@ -158,3 +158,49 @@ async def update_creator_by_creator_id(creator_id: str, creator_item: Dict) -> i effect_row: int = await async_db_conn.update_table("bilibili_up_info", creator_item, "user_id", creator_id) return effect_row +async def query_contact_by_up_and_fan(up_id: str, fan_id: str) -> Dict: + """ + 查询一条关联关系 + Args: + up_id: + fan_id: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + sql: str = f"select * from bilibili_contact_info where up_id = '{up_id}' and fan_id = '{fan_id}'" + rows: List[Dict] = await async_db_conn.query(sql) + if len(rows) > 0: + return rows[0] + return dict() + + +async def add_new_contact(contact_item: Dict) -> int: + """ + 新增关联关系 + Args: + contact_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + last_row_id: int = await async_db_conn.item_to_table("bilibili_contact_info", contact_item) + return last_row_id + + +async def update_contact_by_id(id: str, contact_item: Dict) -> int: + """ + 更新关联关系 + Args: + id: + contact_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + effect_row: int = await async_db_conn.update_table("bilibili_contact_info", contact_item, "id", id) + return effect_row + From 59619fff0acfe0382033db084bedc86a182b76f4 Mon Sep 17 00:00:00 2001 From: Bowenwin Date: Thu, 22 May 2025 22:06:06 +0800 Subject: [PATCH 3/4] finish_all --- config/base_config.py | 16 ++++----- media_platform/bilibili/core.py | 4 +-- store/bilibili/__init__.py | 12 +++---- store/bilibili/bilibili_store_impl.py | 13 +++++++- store/bilibili/bilibili_store_sql.py | 47 ++++++++++++++++++++++++++- 5 files changed, 74 insertions(+), 18 deletions(-) diff --git a/config/base_config.py b/config/base_config.py index 34539c9..d05347a 100644 --- a/config/base_config.py +++ b/config/base_config.py @@ -46,7 +46,7 @@ HEADLESS = False SAVE_LOGIN_STATE = True # 数据保存类型选项配置,支持三种类型:csv、db、json, 最好保存到DB,有排重的功能。 -SAVE_DATA_OPTION = "csv" # csv or db or json +SAVE_DATA_OPTION = "json" # csv or db or json # 用户浏览器缓存的浏览器文件配置 USER_DATA_DIR = "%s_user_data_dir" # %s will be replaced by platform name @@ -72,12 +72,6 @@ ENABLE_GET_COMMENTS = True # 爬取一级评论的数量控制(单视频/帖子) CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES = 10 -# 爬取作者粉丝和关注列表数量控制(单作者) -CRAWLER_MAX_CONTACTS_COUNT_SINGLENOTES = 100 - -# 爬取作者动态粉丝和关注列表数量控制(单作者) -CRAWLER_MAX_DYNAMICS_COUNT_SINGLENOTES = 50 - # 是否开启爬二级评论模式, 默认不开启爬二级评论 # 老版本项目使用了 db, 则需参考 schema/tables.sql line 287 增加表字段 ENABLE_GET_SUB_COMMENTS = False @@ -206,4 +200,10 @@ END_DAY = '2024-01-01' # 是否开启按每一天进行爬取的选项,仅支持 bilibili 关键字搜索 # 若为 False,则忽略 START_DAY 与 END_DAY 设置的值 # 若为 True,则按照 START_DAY 至 END_DAY 按照每一天进行筛选,这样能够突破 1000 条视频的限制,最大程度爬取该关键词下的所有视频 -ALL_DAY = False \ No newline at end of file +ALL_DAY = False + +# 爬取作者粉丝和关注列表数量控制(单作者) +CRAWLER_MAX_CONTACTS_COUNT_SINGLENOTES = 100 + +# 爬取作者动态粉丝和关注列表数量控制(单作者) +CRAWLER_MAX_DYNAMICS_COUNT_SINGLENOTES = 50 \ No newline at end of file diff --git a/media_platform/bilibili/core.py b/media_platform/bilibili/core.py index f47519b..8c66747 100644 --- a/media_platform/bilibili/core.py +++ b/media_platform/bilibili/core.py @@ -512,8 +512,8 @@ class BilibiliCrawler(AbstractCrawler): "sign": creator_unhandled_info.get("sign"), "avatar": creator_unhandled_info.get("face"), } - # await self.get_fans(creator_info, semaphore) - # await self.get_followings(creator_info, semaphore) + await self.get_fans(creator_info, semaphore) + await self.get_followings(creator_info, semaphore) await self.get_dynamics(creator_info, semaphore) async def get_fans(self, creator_info: Dict, semaphore: asyncio.Semaphore): diff --git a/store/bilibili/__init__.py b/store/bilibili/__init__.py index bd208a8..3183358 100644 --- a/store/bilibili/__init__.py +++ b/store/bilibili/__init__.py @@ -179,9 +179,9 @@ async def batch_update_bilibili_creator_dynamics(creator_info: Dict, dynamics_li "text": dynamic_text, "type": dynamic_type, "pub_ts": dynamic_pub_ts, - "comment": dynamic_comment, - "forward": dynamic_forward, - "like": dynamic_like, + "total_comments": dynamic_comment, + "total_forwards": dynamic_forward, + "total_liked": dynamic_like, } await update_bilibili_creator_dynamic(creator_info=creator_info, dynamic_info=dynamic_info) @@ -210,9 +210,9 @@ async def update_bilibili_creator_dynamic(creator_info: Dict, dynamic_info: Dict "text": dynamic_info["text"], "type": dynamic_info["type"], "pub_ts": dynamic_info["pub_ts"], - "comment": dynamic_info["comment"], - "forward": dynamic_info["forward"], - "like": dynamic_info["like"], + "total_comments": dynamic_info["total_comments"], + "total_forwards": dynamic_info["total_forwards"], + "total_liked": dynamic_info["total_liked"], "last_modify_ts": utils.get_current_timestamp(), } diff --git a/store/bilibili/bilibili_store_impl.py b/store/bilibili/bilibili_store_impl.py index c888ff5..00ccd79 100644 --- a/store/bilibili/bilibili_store_impl.py +++ b/store/bilibili/bilibili_store_impl.py @@ -196,7 +196,7 @@ class BiliDbStoreImplement(AbstractStore): creator["add_ts"] = utils.get_current_timestamp() await add_new_creator(creator) else: - await update_creator_by_creator_id(creator_id,creator_item=creator) + await update_creator_by_creator_id(creator_id, creator_item=creator) async def store_contact(self, contact_item: Dict): """ @@ -232,6 +232,17 @@ class BiliDbStoreImplement(AbstractStore): """ + from .bilibili_store_sql import (add_new_dynamic, + query_dynamic_by_dynamic_id, + update_dynamic_by_dynamic_id) + + dynamic_id = dynamic_item.get("dynamic_id") + dynamic_detail = await query_dynamic_by_dynamic_id(dynamic_id=dynamic_id) + if not dynamic_detail: + dynamic_item["add_ts"] = utils.get_current_timestamp() + await add_new_dynamic(dynamic_item) + else: + await update_dynamic_by_dynamic_id(dynamic_id, dynamic_item=dynamic_item) class BiliJsonStoreImplement(AbstractStore): diff --git a/store/bilibili/bilibili_store_sql.py b/store/bilibili/bilibili_store_sql.py index 513b679..6ee4048 100644 --- a/store/bilibili/bilibili_store_sql.py +++ b/store/bilibili/bilibili_store_sql.py @@ -66,7 +66,6 @@ async def update_content_by_content_id(content_id: str, content_item: Dict) -> i return effect_row - async def query_comment_by_comment_id(comment_id: str) -> Dict: """ 查询一条评论内容 @@ -158,6 +157,7 @@ async def update_creator_by_creator_id(creator_id: str, creator_item: Dict) -> i effect_row: int = await async_db_conn.update_table("bilibili_up_info", creator_item, "user_id", creator_id) return effect_row + async def query_contact_by_up_and_fan(up_id: str, fan_id: str) -> Dict: """ 查询一条关联关系 @@ -204,3 +204,48 @@ async def update_contact_by_id(id: str, contact_item: Dict) -> int: effect_row: int = await async_db_conn.update_table("bilibili_contact_info", contact_item, "id", id) return effect_row + +async def query_dynamic_by_dynamic_id(dynamic_id: str) -> Dict: + """ + 查询一条动态信息 + Args: + dynamic_id: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + sql: str = f"select * from bilibili_up_dynamic where dynamic_id = '{dynamic_id}'" + rows: List[Dict] = await async_db_conn.query(sql) + if len(rows) > 0: + return rows[0] + return dict() + + +async def add_new_dynamic(dynamic_item: Dict) -> int: + """ + 新增动态信息 + Args: + dynamic_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + last_row_id: int = await async_db_conn.item_to_table("bilibili_up_dynamic", dynamic_item) + return last_row_id + + +async def update_dynamic_by_dynamic_id(dynamic_id: str, dynamic_item: Dict) -> int: + """ + 更新动态信息 + Args: + dynamic_id: + dynamic_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + effect_row: int = await async_db_conn.update_table("bilibili_up_dynamic", dynamic_item, "dynamic_id", dynamic_id) + return effect_row From 66843f216a36ac47440ad10298b0cbdb84eb5e91 Mon Sep 17 00:00:00 2001 From: Bowenwin Date: Thu, 22 May 2025 22:26:30 +0800 Subject: [PATCH 4/4] finish_all_for_expand_bili --- config/base_config.py | 24 ++++++++-------- config/db_config.py | 15 ++++------ media_platform/bilibili/client.py | 7 ++--- media_platform/bilibili/core.py | 40 ++++++++++++--------------- store/bilibili/bilibili_store_impl.py | 15 +++++----- store/bilibili/bilibili_store_sql.py | 1 + store/douyin/douyin_store_impl.py | 2 +- store/kuaishou/kuaishou_store_impl.py | 2 +- store/tieba/tieba_store_impl.py | 2 +- store/weibo/weibo_store_impl.py | 2 +- store/xhs/xhs_store_impl.py | 2 +- store/zhihu/zhihu_store_impl.py | 2 +- 12 files changed, 51 insertions(+), 63 deletions(-) diff --git a/config/base_config.py b/config/base_config.py index d05347a..102e567 100644 --- a/config/base_config.py +++ b/config/base_config.py @@ -10,16 +10,16 @@ # 基础配置 -PLATFORM = "bili" +PLATFORM = "xhs" KEYWORDS = "编程副业,编程兼职" # 关键词搜索配置,以英文逗号分隔 -LOGIN_TYPE = "phone" # qrcode or phone or cookie +LOGIN_TYPE = "qrcode" # qrcode or phone or cookie COOKIES = "" # 具体值参见media_platform.xxx.field下的枚举值,暂时只支持小红书 SORT_TYPE = "popularity_descending" # 具体值参见media_platform.xxx.field下的枚举值,暂时只支持抖音 PUBLISH_TIME_TYPE = 0 CRAWLER_TYPE = ( - "creator" # 爬取类型,search(关键词搜索) | detail(帖子详情)| creator(创作者主页数据) + "search" # 爬取类型,search(关键词搜索) | detail(帖子详情)| creator(创作者主页数据) ) # 自定义User Agent(暂时仅对XHS有效) UA = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0' @@ -54,9 +54,6 @@ USER_DATA_DIR = "%s_user_data_dir" # %s will be replaced by platform name # 爬取开始页数 默认从第一页开始 START_PAGE = 1 -# 爬取粉丝列表开始页数 默认从第一页开始 -START_CONTACTS_PAGE = 1 - # 爬取视频/帖子的数量控制 CRAWLER_MAX_NOTES_COUNT = 200 @@ -147,11 +144,7 @@ DY_CREATOR_ID_LIST = [ # 指定bili创作者ID列表(sec_id) BILI_CREATOR_ID_LIST = [ - # "20813884", - "520819684", - # "472747194", - # "519872016", - # "372201438", + "20813884", # ........................ ] @@ -202,8 +195,15 @@ END_DAY = '2024-01-01' # 若为 True,则按照 START_DAY 至 END_DAY 按照每一天进行筛选,这样能够突破 1000 条视频的限制,最大程度爬取该关键词下的所有视频 ALL_DAY = False +#!!! 下面仅支持 bilibili creator搜索 +# 爬取评论creator主页还是爬取creator动态和关系列表(True为前者) +CREATOR_MODE = True + +# 爬取creator粉丝列表时起始爬取页数 +START_CONTACTS_PAGE = 1 + # 爬取作者粉丝和关注列表数量控制(单作者) CRAWLER_MAX_CONTACTS_COUNT_SINGLENOTES = 100 -# 爬取作者动态粉丝和关注列表数量控制(单作者) +# 爬取作者动态数量控制(单作者) CRAWLER_MAX_DYNAMICS_COUNT_SINGLENOTES = 50 \ No newline at end of file diff --git a/config/db_config.py b/config/db_config.py index 2b9c2d5..51d3fd0 100644 --- a/config/db_config.py +++ b/config/db_config.py @@ -12,16 +12,11 @@ import os # mysql config -# RELATION_DB_PWD = os.getenv("RELATION_DB_PWD", "123456") -# RELATION_DB_USER = os.getenv("RELATION_DB_USER", "root") -# RELATION_DB_HOST = os.getenv("RELATION_DB_HOST", "localhost") -# RELATION_DB_PORT = os.getenv("RELATION_DB_PORT", 3306) -# RELATION_DB_NAME = os.getenv("RELATION_DB_NAME", "media_crawler") -RELATION_DB_HOST = "47.94.233.47" # 替换为你的数据库域名/公网IP -RELATION_DB_PORT = 3306 # 替换为你的数据库端口(通常3306) -RELATION_DB_USER = "remote_user" # 替换为你的数据库用户名 -RELATION_DB_PWD = "314159" # 替换为你的数据库密码 -RELATION_DB_NAME = "Test" # 替换为你的数据库名称 +RELATION_DB_PWD = os.getenv("RELATION_DB_PWD", "123456") +RELATION_DB_USER = os.getenv("RELATION_DB_USER", "root") +RELATION_DB_HOST = os.getenv("RELATION_DB_HOST", "localhost") +RELATION_DB_PORT = os.getenv("RELATION_DB_PORT", 3306) +RELATION_DB_NAME = os.getenv("RELATION_DB_NAME", "media_crawler") # redis config diff --git a/media_platform/bilibili/client.py b/media_platform/bilibili/client.py index d03d105..32af357 100644 --- a/media_platform/bilibili/client.py +++ b/media_platform/bilibili/client.py @@ -224,7 +224,7 @@ class BilibiliClient(AbstractApiClient): async def get_video_all_comments(self, video_id: str, crawl_interval: float = 1.0, is_fetch_sub_comments=False, callback: Optional[Callable] = None, - max_count: int = 10, ): + max_count: int = 10,): """ get video all comments include sub comments :param video_id: @@ -251,7 +251,7 @@ class BilibiliClient(AbstractApiClient): if (comment.get("rcount", 0) > 0): { await self.get_video_all_level_two_comments( - video_id, comment_id, CommentOrderType.DEFAULT, 10, crawl_interval, callback) + video_id, comment_id, CommentOrderType.DEFAULT, 10, crawl_interval, callback) } if len(result) + len(comment_list) > max_count: comment_list = comment_list[:max_count - len(result)] @@ -321,8 +321,7 @@ class BilibiliClient(AbstractApiClient): result = await self.get(uri, post_data) return result - async def get_creator_videos(self, creator_id: str, pn: int, ps: int = 30, - order_mode: SearchOrderType = SearchOrderType.LAST_PUBLISH) -> Dict: + async def get_creator_videos(self, creator_id: str, pn: int, ps: int = 30, order_mode: SearchOrderType = SearchOrderType.LAST_PUBLISH) -> Dict: """get all videos for a creator :param creator_id: 创作者 ID :param pn: 页数 diff --git a/media_platform/bilibili/core.py b/media_platform/bilibili/core.py index 8c66747..1836ba2 100644 --- a/media_platform/bilibili/core.py +++ b/media_platform/bilibili/core.py @@ -89,9 +89,11 @@ class BilibiliCrawler(AbstractCrawler): # Get the information and comments of the specified post await self.get_specified_videos(config.BILI_SPECIFIED_ID_LIST) elif config.CRAWLER_TYPE == "creator": - # for creator_id in config.BILI_CREATOR_ID_LIST: - # await self.get_creator_videos(int(creator_id)) - await self.get_all_creator_details(config.BILI_CREATOR_ID_LIST) + if config.CREATOR_MODE: + for creator_id in config.BILI_CREATOR_ID_LIST: + await self.get_creator_videos(int(creator_id)) + else: + await self.get_all_creator_details(config.BILI_CREATOR_ID_LIST) else: pass utils.logger.info( @@ -119,11 +121,9 @@ class BilibiliCrawler(AbstractCrawler): start_day: datetime = datetime.strptime(start, '%Y-%m-%d') end_day: datetime = datetime.strptime(end, '%Y-%m-%d') if start_day > end_day: - raise ValueError( - 'Wrong time range, please check your start and end argument, to ensure that the start cannot exceed end') + raise ValueError('Wrong time range, please check your start and end argument, to ensure that the start cannot exceed end') elif start_day == end_day: # 搜索同一天的内容 - end_day = start_day + timedelta(days=1) - timedelta( - seconds=1) # 则将 end_day 设置为 start_day + 1 day - 1 second + end_day = start_day + timedelta(days=1) - timedelta(seconds=1) # 则将 end_day 设置为 start_day + 1 day - 1 second else: # 搜索 start 至 end end_day = end_day + timedelta(days=1) - timedelta(seconds=1) # 则将 end_day 设置为 end_day + 1 day - 1 second # 将其重新转换为时间戳 @@ -166,11 +166,9 @@ class BilibiliCrawler(AbstractCrawler): semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) task_list = [] try: - task_list = [self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) - for video_item in video_list] + task_list = [self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) for video_item in video_list] except Exception as e: - utils.logger.warning( - f"[BilibiliCrawler.search] error in the task list. The video for this page will not be included. {e}") + utils.logger.warning(f"[BilibiliCrawler.search] error in the task list. The video for this page will not be included. {e}") video_items = await asyncio.gather(*task_list) for video_item in video_items: if video_item: @@ -184,23 +182,21 @@ class BilibiliCrawler(AbstractCrawler): else: for day in pd.date_range(start=config.START_DAY, end=config.END_DAY, freq='D'): # 按照每一天进行爬取的时间戳参数 - pubtime_begin_s, pubtime_end_s = await self.get_pubtime_datetime(start=day.strftime('%Y-%m-%d'), - end=day.strftime('%Y-%m-%d')) + pubtime_begin_s, pubtime_end_s = await self.get_pubtime_datetime(start=day.strftime('%Y-%m-%d'), end=day.strftime('%Y-%m-%d')) page = 1 - # !该段 while 语句在发生异常时(通常情况下为当天数据为空时)会自动跳转到下一天,以实现最大程度爬取该关键词下当天的所有视频 - # !除了仅保留现在原有的 try, except Exception 语句外,不要再添加其他的异常处理!!!否则将使该段代码失效,使其仅能爬取当天一天数据而无法跳转到下一天 - # !除非将该段代码的逻辑进行重构以实现相同的功能,否则不要进行修改!!! + #!该段 while 语句在发生异常时(通常情况下为当天数据为空时)会自动跳转到下一天,以实现最大程度爬取该关键词下当天的所有视频 + #!除了仅保留现在原有的 try, except Exception 语句外,不要再添加其他的异常处理!!!否则将使该段代码失效,使其仅能爬取当天一天数据而无法跳转到下一天 + #!除非将该段代码的逻辑进行重构以实现相同的功能,否则不要进行修改!!! while (page - start_page + 1) * bili_limit_count <= config.CRAWLER_MAX_NOTES_COUNT: - # ! Catch any error if response return nothing, go to next day + #! Catch any error if response return nothing, go to next day try: - # ! Don't skip any page, to make sure gather all video in one day + #! Don't skip any page, to make sure gather all video in one day # if page < start_page: # utils.logger.info(f"[BilibiliCrawler.search] Skip page: {page}") # page += 1 # continue - utils.logger.info( - f"[BilibiliCrawler.search] search bilibili keyword: {keyword}, date: {day.ctime()}, page: {page}") + utils.logger.info(f"[BilibiliCrawler.search] search bilibili keyword: {keyword}, date: {day.ctime()}, page: {page}") video_id_list: List[str] = [] videos_res = await self.bili_client.search_video_by_keyword( keyword=keyword, @@ -213,9 +209,7 @@ class BilibiliCrawler(AbstractCrawler): video_list: List[Dict] = videos_res.get("result") semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) - task_list = [ - self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) for - video_item in video_list] + task_list = [self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) for video_item in video_list] video_items = await asyncio.gather(*task_list) for video_item in video_items: if video_item: diff --git a/store/bilibili/bilibili_store_impl.py b/store/bilibili/bilibili_store_impl.py index 00ccd79..0fa1504 100644 --- a/store/bilibili/bilibili_store_impl.py +++ b/store/bilibili/bilibili_store_impl.py @@ -38,15 +38,13 @@ def calculate_number_of_files(file_store_path: str) -> int: if not os.path.exists(file_store_path): return 1 try: - return max([int(file_name.split("_")[0]) for file_name in os.listdir(file_store_path)]) + 1 + return max([int(file_name.split("_")[0])for file_name in os.listdir(file_store_path)])+1 except ValueError: return 1 - class BiliCsvStoreImplement(AbstractStore): csv_store_path: str = "data/bilibili" - file_count: int = calculate_number_of_files(csv_store_path) - + file_count:int=calculate_number_of_files(csv_store_path) def make_save_file_name(self, store_type: str) -> str: """ make save file name by store type @@ -196,7 +194,7 @@ class BiliDbStoreImplement(AbstractStore): creator["add_ts"] = utils.get_current_timestamp() await add_new_creator(creator) else: - await update_creator_by_creator_id(creator_id, creator_item=creator) + await update_creator_by_creator_id(creator_id,creator_item=creator) async def store_contact(self, contact_item: Dict): """ @@ -249,10 +247,11 @@ class BiliJsonStoreImplement(AbstractStore): json_store_path: str = "data/bilibili/json" words_store_path: str = "data/bilibili/words" lock = asyncio.Lock() - file_count: int = calculate_number_of_files(json_store_path) + file_count:int=calculate_number_of_files(json_store_path) WordCloud = words.AsyncWordCloudGenerator() - def make_save_file_name(self, store_type: str) -> (str, str): + + def make_save_file_name(self, store_type: str) -> (str,str): """ make save file name by store type Args: @@ -279,7 +278,7 @@ class BiliJsonStoreImplement(AbstractStore): """ pathlib.Path(self.json_store_path).mkdir(parents=True, exist_ok=True) pathlib.Path(self.words_store_path).mkdir(parents=True, exist_ok=True) - save_file_name, words_file_name_prefix = self.make_save_file_name(store_type=store_type) + save_file_name,words_file_name_prefix = self.make_save_file_name(store_type=store_type) save_data = [] async with self.lock: diff --git a/store/bilibili/bilibili_store_sql.py b/store/bilibili/bilibili_store_sql.py index 6ee4048..02b146c 100644 --- a/store/bilibili/bilibili_store_sql.py +++ b/store/bilibili/bilibili_store_sql.py @@ -66,6 +66,7 @@ async def update_content_by_content_id(content_id: str, content_item: Dict) -> i return effect_row + async def query_comment_by_comment_id(comment_id: str) -> Dict: """ 查询一条评论内容 diff --git a/store/douyin/douyin_store_impl.py b/store/douyin/douyin_store_impl.py index c6308b1..30c993c 100644 --- a/store/douyin/douyin_store_impl.py +++ b/store/douyin/douyin_store_impl.py @@ -238,7 +238,7 @@ class DouyinJsonStoreImplement(AbstractStore): async def store_comment(self, comment_item: Dict): """ - comment JSON storage implementatio + comment JSON storage implementation Args: comment_item: diff --git a/store/kuaishou/kuaishou_store_impl.py b/store/kuaishou/kuaishou_store_impl.py index 0ee9f64..4866796 100644 --- a/store/kuaishou/kuaishou_store_impl.py +++ b/store/kuaishou/kuaishou_store_impl.py @@ -215,7 +215,7 @@ class KuaishouJsonStoreImplement(AbstractStore): async def store_comment(self, comment_item: Dict): """ - comment JSON storage implementatio + comment JSON storage implementation Args: comment_item: diff --git a/store/tieba/tieba_store_impl.py b/store/tieba/tieba_store_impl.py index ade65e5..ff5da80 100644 --- a/store/tieba/tieba_store_impl.py +++ b/store/tieba/tieba_store_impl.py @@ -235,7 +235,7 @@ class TieBaJsonStoreImplement(AbstractStore): async def store_comment(self, comment_item: Dict): """ - comment JSON storage implementatio + comment JSON storage implementation Args: comment_item: diff --git a/store/weibo/weibo_store_impl.py b/store/weibo/weibo_store_impl.py index 0db1e51..7348fec 100644 --- a/store/weibo/weibo_store_impl.py +++ b/store/weibo/weibo_store_impl.py @@ -241,7 +241,7 @@ class WeiboJsonStoreImplement(AbstractStore): async def store_comment(self, comment_item: Dict): """ - comment JSON storage implementatio + comment JSON storage implementation Args: comment_item: diff --git a/store/xhs/xhs_store_impl.py b/store/xhs/xhs_store_impl.py index 54ce528..5ad4979 100644 --- a/store/xhs/xhs_store_impl.py +++ b/store/xhs/xhs_store_impl.py @@ -236,7 +236,7 @@ class XhsJsonStoreImplement(AbstractStore): async def store_comment(self, comment_item: Dict): """ - comment JSON storage implementatio + comment JSON storage implementation Args: comment_item: diff --git a/store/zhihu/zhihu_store_impl.py b/store/zhihu/zhihu_store_impl.py index a5c24a3..34d0a5b 100644 --- a/store/zhihu/zhihu_store_impl.py +++ b/store/zhihu/zhihu_store_impl.py @@ -235,7 +235,7 @@ class ZhihuJsonStoreImplement(AbstractStore): async def store_comment(self, comment_item: Dict): """ - comment JSON storage implementatio + comment JSON storage implementation Args: comment_item: