From 44e3d370ffaf45b815add882dae1ecd7c225256a Mon Sep 17 00:00:00 2001 From: Bowenwin Date: Thu, 22 May 2025 20:31:48 +0800 Subject: [PATCH] fix_words --- config/base_config.py | 13 +++-- config/db_config.py | 15 +++-- media_platform/bilibili/client.py | 64 +++++++++++++++++---- media_platform/bilibili/core.py | 73 +++++++++++++++++------- schema/tables.sql | 50 +++++++++++++++- store/bilibili/__init__.py | 54 ++++++++++++++++-- store/bilibili/bilibili_store_impl.py | 82 +++++++++++++++++++++++---- store/bilibili/bilibili_store_sql.py | 46 +++++++++++++++ 8 files changed, 338 insertions(+), 59 deletions(-) diff --git a/config/base_config.py b/config/base_config.py index 75154cd..34539c9 100644 --- a/config/base_config.py +++ b/config/base_config.py @@ -46,7 +46,7 @@ HEADLESS = False SAVE_LOGIN_STATE = True # 数据保存类型选项配置,支持三种类型:csv、db、json, 最好保存到DB,有排重的功能。 -SAVE_DATA_OPTION = "json" # csv or db or json +SAVE_DATA_OPTION = "csv" # csv or db or json # 用户浏览器缓存的浏览器文件配置 USER_DATA_DIR = "%s_user_data_dir" # %s will be replaced by platform name @@ -73,7 +73,10 @@ ENABLE_GET_COMMENTS = True CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES = 10 # 爬取作者粉丝和关注列表数量控制(单作者) -CRAWLER_MAX_FANS_COUNT_SINGLENOTES = 100 +CRAWLER_MAX_CONTACTS_COUNT_SINGLENOTES = 100 + +# 爬取作者动态粉丝和关注列表数量控制(单作者) +CRAWLER_MAX_DYNAMICS_COUNT_SINGLENOTES = 50 # 是否开启爬二级评论模式, 默认不开启爬二级评论 # 老版本项目使用了 db, 则需参考 schema/tables.sql line 287 增加表字段 @@ -152,9 +155,9 @@ DY_CREATOR_ID_LIST = [ BILI_CREATOR_ID_LIST = [ # "20813884", "520819684", - "472747194", - "519872016", - "372201438", + # "472747194", + # "519872016", + # "372201438", # ........................ ] diff --git a/config/db_config.py b/config/db_config.py index 51d3fd0..2b9c2d5 100644 --- a/config/db_config.py +++ b/config/db_config.py @@ -12,11 +12,16 @@ import os # mysql config -RELATION_DB_PWD = os.getenv("RELATION_DB_PWD", "123456") -RELATION_DB_USER = os.getenv("RELATION_DB_USER", "root") -RELATION_DB_HOST = os.getenv("RELATION_DB_HOST", "localhost") -RELATION_DB_PORT = os.getenv("RELATION_DB_PORT", 3306) -RELATION_DB_NAME = os.getenv("RELATION_DB_NAME", "media_crawler") +# RELATION_DB_PWD = os.getenv("RELATION_DB_PWD", "123456") +# RELATION_DB_USER = os.getenv("RELATION_DB_USER", "root") +# RELATION_DB_HOST = os.getenv("RELATION_DB_HOST", "localhost") +# RELATION_DB_PORT = os.getenv("RELATION_DB_PORT", 3306) +# RELATION_DB_NAME = os.getenv("RELATION_DB_NAME", "media_crawler") +RELATION_DB_HOST = "47.94.233.47" # 替换为你的数据库域名/公网IP +RELATION_DB_PORT = 3306 # 替换为你的数据库端口(通常3306) +RELATION_DB_USER = "remote_user" # 替换为你的数据库用户名 +RELATION_DB_PWD = "314159" # 替换为你的数据库密码 +RELATION_DB_NAME = "Test" # 替换为你的数据库名称 # redis config diff --git a/media_platform/bilibili/client.py b/media_platform/bilibili/client.py index 7ec6b35..d03d105 100644 --- a/media_platform/bilibili/client.py +++ b/media_platform/bilibili/client.py @@ -341,7 +341,8 @@ class BilibiliClient(AbstractApiClient): return await self.get(uri, post_data) async def get_creator_info(self, creator_id: int) -> Dict: - """get creator info + """ + get creator info :param creator_id: 作者 ID """ uri = "/x/space/wbi/acc/info" @@ -355,7 +356,8 @@ class BilibiliClient(AbstractApiClient): pn: int, ps: int = 24, ) -> Dict: - """get video comments + """ + get creator fans :param creator_id: 创作者 ID :param pn: 开始页数 :param ps: 每页数量 @@ -376,7 +378,8 @@ class BilibiliClient(AbstractApiClient): pn: int, ps: int = 24, ) -> Dict: - """get video comments + """ + get creator followings :param creator_id: 创作者 ID :param pn: 开始页数 :param ps: 每页数量 @@ -391,11 +394,27 @@ class BilibiliClient(AbstractApiClient): } return await self.get(uri, post_data) + async def get_creator_dynamics(self, creator_id: int, offset: str = ""): + """ + get creator comments + :param creator_id: 创作者 ID + :param offset: 发送请求所需参数 + :return: + """ + uri = "/x/polymer/web-dynamic/v1/feed/space" + post_data = { + "offset": offset, + "host_mid": creator_id, + "platform": "web", + } + + return await self.get(uri, post_data) + async def get_creator_all_fans(self, creator_info: Dict, crawl_interval: float = 1.0, callback: Optional[Callable] = None, max_count: int = 100) -> List: """ - get video all comments include sub comments + get creator all fans :param creator_info: :param crawl_interval: :param callback: @@ -419,16 +438,13 @@ class BilibiliClient(AbstractApiClient): if not fans_list: break result.extend(fans_list) - utils.logger.info( - f"[BilibiliCrawler.get_fans] begin get creator_id: {creator_id} fans successfully") - return result async def get_creator_all_followings(self, creator_info: Dict, crawl_interval: float = 1.0, callback: Optional[Callable] = None, max_count: int = 100) -> List: """ - get video all comments include sub comments + get creator all followings :param creator_info: :param crawl_interval: :param callback: @@ -452,7 +468,33 @@ class BilibiliClient(AbstractApiClient): if not followings_list: break result.extend(followings_list) - utils.logger.info( - f"[BilibiliCrawler.get_followings] begin get creator_id: {creator_id} followings successfully") - + return result + + async def get_creator_all_dynamics(self, creator_info: Dict, crawl_interval: float = 1.0, + callback: Optional[Callable] = None, + max_count: int = 20) -> List: + """ + get creator all followings + :param creator_info: + :param crawl_interval: + :param callback: + :param max_count: 一个up主爬取的最大动态数量 + + :return: up主关注者列表 + """ + creator_id = creator_info["id"] + result = [] + offset = "" + has_more = True + while has_more and len(result) < max_count: + dynamics_res = await self.get_creator_dynamics(creator_id, offset) + dynamics_list: List[Dict] = dynamics_res["items"] + has_more = dynamics_res["has_more"] + offset = dynamics_res["offset"] + if len(result) + len(dynamics_list) > max_count: + dynamics_list = dynamics_list[:max_count - len(result)] + if callback: + await callback(creator_info, dynamics_list) + await asyncio.sleep(crawl_interval) + result.extend(dynamics_list) return result diff --git a/media_platform/bilibili/core.py b/media_platform/bilibili/core.py index eb6c014..f47519b 100644 --- a/media_platform/bilibili/core.py +++ b/media_platform/bilibili/core.py @@ -119,14 +119,16 @@ class BilibiliCrawler(AbstractCrawler): start_day: datetime = datetime.strptime(start, '%Y-%m-%d') end_day: datetime = datetime.strptime(end, '%Y-%m-%d') if start_day > end_day: - raise ValueError('Wrong time range, please check your start and end argument, to ensure that the start cannot exceed end') + raise ValueError( + 'Wrong time range, please check your start and end argument, to ensure that the start cannot exceed end') elif start_day == end_day: # 搜索同一天的内容 - end_day = start_day + timedelta(days=1) - timedelta(seconds=1) # 则将 end_day 设置为 start_day + 1 day - 1 second + end_day = start_day + timedelta(days=1) - timedelta( + seconds=1) # 则将 end_day 设置为 start_day + 1 day - 1 second else: # 搜索 start 至 end end_day = end_day + timedelta(days=1) - timedelta(seconds=1) # 则将 end_day 设置为 end_day + 1 day - 1 second # 将其重新转换为时间戳 return str(int(start_day.timestamp())), str(int(end_day.timestamp())) - + async def search(self): """ search bilibili video with keywords @@ -164,9 +166,11 @@ class BilibiliCrawler(AbstractCrawler): semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) task_list = [] try: - task_list = [self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) for video_item in video_list] + task_list = [self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) + for video_item in video_list] except Exception as e: - utils.logger.warning(f"[BilibiliCrawler.search] error in the task list. The video for this page will not be included. {e}") + utils.logger.warning( + f"[BilibiliCrawler.search] error in the task list. The video for this page will not be included. {e}") video_items = await asyncio.gather(*task_list) for video_item in video_items: if video_item: @@ -180,21 +184,23 @@ class BilibiliCrawler(AbstractCrawler): else: for day in pd.date_range(start=config.START_DAY, end=config.END_DAY, freq='D'): # 按照每一天进行爬取的时间戳参数 - pubtime_begin_s, pubtime_end_s = await self.get_pubtime_datetime(start=day.strftime('%Y-%m-%d'), end=day.strftime('%Y-%m-%d')) + pubtime_begin_s, pubtime_end_s = await self.get_pubtime_datetime(start=day.strftime('%Y-%m-%d'), + end=day.strftime('%Y-%m-%d')) page = 1 - #!该段 while 语句在发生异常时(通常情况下为当天数据为空时)会自动跳转到下一天,以实现最大程度爬取该关键词下当天的所有视频 - #!除了仅保留现在原有的 try, except Exception 语句外,不要再添加其他的异常处理!!!否则将使该段代码失效,使其仅能爬取当天一天数据而无法跳转到下一天 - #!除非将该段代码的逻辑进行重构以实现相同的功能,否则不要进行修改!!! + # !该段 while 语句在发生异常时(通常情况下为当天数据为空时)会自动跳转到下一天,以实现最大程度爬取该关键词下当天的所有视频 + # !除了仅保留现在原有的 try, except Exception 语句外,不要再添加其他的异常处理!!!否则将使该段代码失效,使其仅能爬取当天一天数据而无法跳转到下一天 + # !除非将该段代码的逻辑进行重构以实现相同的功能,否则不要进行修改!!! while (page - start_page + 1) * bili_limit_count <= config.CRAWLER_MAX_NOTES_COUNT: - #! Catch any error if response return nothing, go to next day + # ! Catch any error if response return nothing, go to next day try: - #! Don't skip any page, to make sure gather all video in one day + # ! Don't skip any page, to make sure gather all video in one day # if page < start_page: # utils.logger.info(f"[BilibiliCrawler.search] Skip page: {page}") # page += 1 # continue - utils.logger.info(f"[BilibiliCrawler.search] search bilibili keyword: {keyword}, date: {day.ctime()}, page: {page}") + utils.logger.info( + f"[BilibiliCrawler.search] search bilibili keyword: {keyword}, date: {day.ctime()}, page: {page}") video_id_list: List[str] = [] videos_res = await self.bili_client.search_video_by_keyword( keyword=keyword, @@ -207,7 +213,9 @@ class BilibiliCrawler(AbstractCrawler): video_list: List[Dict] = videos_res.get("result") semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) - task_list = [self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) for video_item in video_list] + task_list = [ + self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) for + video_item in video_list] video_items = await asyncio.gather(*task_list) for video_item in video_items: if video_item: @@ -467,7 +475,6 @@ class BilibiliCrawler(AbstractCrawler): extension_file_name = f"video.mp4" await bilibili_store.store_video(aid, content, extension_file_name) - async def get_all_creator_details(self, creator_id_list: List[int]): """ creator_id_list: get details for creator from creator_id_list @@ -485,7 +492,8 @@ class BilibiliCrawler(AbstractCrawler): creator_id, semaphore), name=creator_id) task_list.append(task) except Exception as e: - utils.logger.warning(f"[BilibiliCrawler.get_all_creator_details] error in the task list. The creator will not be included. {e}") + utils.logger.warning( + f"[BilibiliCrawler.get_all_creator_details] error in the task list. The creator will not be included. {e}") await asyncio.gather(*task_list) @@ -504,8 +512,9 @@ class BilibiliCrawler(AbstractCrawler): "sign": creator_unhandled_info.get("sign"), "avatar": creator_unhandled_info.get("face"), } - await self.get_fans(creator_info, semaphore) - await self.get_followings(creator_info, semaphore) + # await self.get_fans(creator_info, semaphore) + # await self.get_followings(creator_info, semaphore) + await self.get_dynamics(creator_info, semaphore) async def get_fans(self, creator_info: Dict, semaphore: asyncio.Semaphore): """ @@ -523,7 +532,7 @@ class BilibiliCrawler(AbstractCrawler): creator_info=creator_info, crawl_interval=random.random(), callback=bilibili_store.batch_update_bilibili_creator_fans, - max_count=config.CRAWLER_MAX_FANS_COUNT_SINGLENOTES, + max_count=config.CRAWLER_MAX_CONTACTS_COUNT_SINGLENOTES, ) except DataFetchError as ex: @@ -549,7 +558,7 @@ class BilibiliCrawler(AbstractCrawler): creator_info=creator_info, crawl_interval=random.random(), callback=bilibili_store.batch_update_bilibili_creator_followings, - max_count=config.CRAWLER_MAX_FANS_COUNT_SINGLENOTES, + max_count=config.CRAWLER_MAX_CONTACTS_COUNT_SINGLENOTES, ) except DataFetchError as ex: @@ -558,3 +567,29 @@ class BilibiliCrawler(AbstractCrawler): except Exception as e: utils.logger.error( f"[BilibiliCrawler.get_followings] may be been blocked, err:{e}") + + async def get_dynamics(self, creator_info: Dict, semaphore: asyncio.Semaphore): + """ + get dynamics for creator id + :param creator_info: + :param semaphore: + :return: + """ + creator_id = creator_info["id"] + async with semaphore: + try: + utils.logger.info( + f"[BilibiliCrawler.get_dynamics] begin get creator_id: {creator_id} dynamics ...") + await self.bili_client.get_creator_all_dynamics( + creator_info=creator_info, + crawl_interval=random.random(), + callback=bilibili_store.batch_update_bilibili_creator_dynamics, + max_count=config.CRAWLER_MAX_DYNAMICS_COUNT_SINGLENOTES, + ) + + except DataFetchError as ex: + utils.logger.error( + f"[BilibiliCrawler.get_dynamics] get creator_id: {creator_id} dynamics error: {ex}") + except Exception as e: + utils.logger.error( + f"[BilibiliCrawler.get_dynamics] may be been blocked, err:{e}") diff --git a/schema/tables.sql b/schema/tables.sql index 7e9a9b3..69d00cc 100644 --- a/schema/tables.sql +++ b/schema/tables.sql @@ -76,6 +76,50 @@ CREATE TABLE `bilibili_up_info` KEY `idx_bilibili_vi_user_123456` (`user_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='B 站UP主信息'; +-- ---------------------------- +-- Table structure for bilibili_contact_info +-- ---------------------------- +DROP TABLE IF EXISTS `bilibili_contact_info`; +CREATE TABLE `bilibili_contact_info` +( + `id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID', + `up_id` varchar(64) DEFAULT NULL COMMENT 'up主ID', + `fan_id` varchar(64) DEFAULT NULL COMMENT '粉丝ID', + `up_name` varchar(64) DEFAULT NULL COMMENT 'up主昵称', + `fan_name` varchar(64) DEFAULT NULL COMMENT '粉丝昵称', + `up_sign` longtext DEFAULT NULL COMMENT 'up主签名', + `fan_sign` longtext DEFAULT NULL COMMENT '粉丝签名', + `up_avatar` varchar(255) DEFAULT NULL COMMENT 'up主头像地址', + `fan_avatar` varchar(255) DEFAULT NULL COMMENT '粉丝头像地址', + `add_ts` bigint NOT NULL COMMENT '记录添加时间戳', + `last_modify_ts` bigint NOT NULL COMMENT '记录最后修改时间戳', + PRIMARY KEY (`id`), + KEY `idx_bilibili_contact_info_up_id` (`up_id`), + KEY `idx_bilibili_contact_info_fan_id` (`fan_id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='B 站联系人信息'; + +-- ---------------------------- +-- Table structure for bilibili_up_dynamic +-- ---------------------------- +DROP TABLE IF EXISTS `bilibili_up_dynamic`; +CREATE TABLE `bilibili_up_dynamic` +( + `id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID', + `dynamic_id` varchar(64) DEFAULT NULL COMMENT '动态ID', + `user_id` varchar(64) DEFAULT NULL COMMENT '用户ID', + `user_name` varchar(64) DEFAULT NULL COMMENT '用户名', + `text` longtext DEFAULT NULL COMMENT '动态文本', + `type` varchar(64) DEFAULT NULL COMMENT '动态类型', + `pub_ts` bigint DEFAULT NULL COMMENT '动态发布时间', + `total_comments` bigint DEFAULT NULL COMMENT '评论数', + `total_forwards` bigint DEFAULT NULL COMMENT '转发数', + `total_liked` bigint DEFAULT NULL COMMENT '点赞数', + `add_ts` bigint NOT NULL COMMENT '记录添加时间戳', + `last_modify_ts` bigint NOT NULL COMMENT '记录最后修改时间戳', + PRIMARY KEY (`id`), + KEY `idx_bilibili_up_dynamic_dynamic_id` (`dynamic_id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='B 站up主动态信息'; + -- ---------------------------- -- Table structure for douyin_aweme -- ---------------------------- @@ -463,7 +507,7 @@ CREATE TABLE `tieba_creator` PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='贴吧创作者'; - +DROP TABLE IF EXISTS `zhihu_content`; CREATE TABLE `zhihu_content` ( `id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID', `content_id` varchar(64) NOT NULL COMMENT '内容ID', @@ -491,7 +535,7 @@ CREATE TABLE `zhihu_content` ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='知乎内容(回答、文章、视频)'; - +DROP TABLE IF EXISTS `zhihu_comment`; CREATE TABLE `zhihu_comment` ( `id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID', `comment_id` varchar(64) NOT NULL COMMENT '评论ID', @@ -516,7 +560,7 @@ CREATE TABLE `zhihu_comment` ( KEY `idx_zhihu_comment_publish_time` (`publish_time`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='知乎评论'; - +DROP TABLE IF EXISTS `zhihu_creator`; CREATE TABLE `zhihu_creator` ( `id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID', `user_id` varchar(64) NOT NULL COMMENT '用户ID', diff --git a/store/bilibili/__init__.py b/store/bilibili/__init__.py index c417dcc..bd208a8 100644 --- a/store/bilibili/__init__.py +++ b/store/bilibili/__init__.py @@ -144,7 +144,7 @@ async def batch_update_bilibili_creator_fans(creator_info: Dict, fans_list: List "sign": fan_item.get("sign"), "avatar": fan_item.get("face"), } - await update_bilibili_creator_fans(creator_info=creator_info, fan_info=fan_info) + await update_bilibili_creator_contact(creator_info=creator_info, fan_info=fan_info) async def batch_update_bilibili_creator_followings(creator_info: Dict, followings_list: List[Dict]): @@ -157,10 +157,36 @@ async def batch_update_bilibili_creator_followings(creator_info: Dict, following "sign": following_item.get("sign"), "avatar": following_item.get("face"), } - await update_bilibili_creator_fans(creator_info=following_info, fan_info=creator_info) + await update_bilibili_creator_contact(creator_info=following_info, fan_info=creator_info) -async def update_bilibili_creator_fans(creator_info: Dict, fan_info: Dict): +async def batch_update_bilibili_creator_dynamics(creator_info: Dict, dynamics_list: List[Dict]): + if not dynamics_list: + return + for dynamic_item in dynamics_list: + dynamic_id: str = dynamic_item["id_str"] + dynamic_text: str = "" + if dynamic_item["modules"]["module_dynamic"].get("desc"): + dynamic_text = dynamic_item["modules"]["module_dynamic"]["desc"]["text"] + dynamic_type: str = dynamic_item["type"].split("_")[-1] + dynamic_pub_ts: str = dynamic_item["modules"]["module_author"]["pub_ts"] + dynamic_stat: Dict = dynamic_item["modules"]["module_stat"] + dynamic_comment: int = dynamic_stat["comment"]["count"] + dynamic_forward: int = dynamic_stat["forward"]["count"] + dynamic_like: int = dynamic_stat["like"]["count"] + dynamic_info: Dict = { + "dynamic_id": dynamic_id, + "text": dynamic_text, + "type": dynamic_type, + "pub_ts": dynamic_pub_ts, + "comment": dynamic_comment, + "forward": dynamic_forward, + "like": dynamic_like, + } + await update_bilibili_creator_dynamic(creator_info=creator_info, dynamic_info=dynamic_info) + + +async def update_bilibili_creator_contact(creator_info: Dict, fan_info: Dict): save_contact_item = { "up_id": creator_info["id"], "fan_id": fan_info["id"], @@ -169,7 +195,25 @@ async def update_bilibili_creator_fans(creator_info: Dict, fan_info: Dict): "up_sign": creator_info["sign"], "fan_sign": fan_info["sign"], "up_avatar": creator_info["avatar"], - "fan_avatar": fan_info["avatar"] + "fan_avatar": fan_info["avatar"], + "last_modify_ts": utils.get_current_timestamp(), } - await BiliStoreFactory.create_store().store_creator_contact(contact_item=save_contact_item) + await BiliStoreFactory.create_store().store_contact(contact_item=save_contact_item) + + +async def update_bilibili_creator_dynamic(creator_info: Dict, dynamic_info: Dict): + save_dynamic_item = { + "dynamic_id": dynamic_info["dynamic_id"], + "user_id": creator_info["id"], + "user_name": creator_info["name"], + "text": dynamic_info["text"], + "type": dynamic_info["type"], + "pub_ts": dynamic_info["pub_ts"], + "comment": dynamic_info["comment"], + "forward": dynamic_info["forward"], + "like": dynamic_info["like"], + "last_modify_ts": utils.get_current_timestamp(), + } + + await BiliStoreFactory.create_store().store_dynamic(dynamic_item=save_dynamic_item) diff --git a/store/bilibili/bilibili_store_impl.py b/store/bilibili/bilibili_store_impl.py index 20d09f1..c888ff5 100644 --- a/store/bilibili/bilibili_store_impl.py +++ b/store/bilibili/bilibili_store_impl.py @@ -38,13 +38,15 @@ def calculate_number_of_files(file_store_path: str) -> int: if not os.path.exists(file_store_path): return 1 try: - return max([int(file_name.split("_")[0])for file_name in os.listdir(file_store_path)])+1 + return max([int(file_name.split("_")[0]) for file_name in os.listdir(file_store_path)]) + 1 except ValueError: return 1 + class BiliCsvStoreImplement(AbstractStore): csv_store_path: str = "data/bilibili" - file_count:int=calculate_number_of_files(csv_store_path) + file_count: int = calculate_number_of_files(csv_store_path) + def make_save_file_name(self, store_type: str) -> str: """ make save file name by store type @@ -107,9 +109,9 @@ class BiliCsvStoreImplement(AbstractStore): """ await self.save_data_to_csv(save_item=creator, store_type="creators") - async def store_creator_contact(self, contact_item: Dict): + async def store_contact(self, contact_item: Dict): """ - Bilibili comment CSV storage implementation + Bilibili contact CSV storage implementation Args: contact_item: creator's contact item dict @@ -117,7 +119,19 @@ class BiliCsvStoreImplement(AbstractStore): """ - await self.save_data_to_csv(save_item=contact_item, store_type="fans") + await self.save_data_to_csv(save_item=contact_item, store_type="contacts") + + async def store_dynamic(self, dynamic_item: Dict): + """ + Bilibili dynamic CSV storage implementation + Args: + dynamic_item: creator's dynamic item dict + + Returns: + + """ + + await self.save_data_to_csv(save_item=dynamic_item, store_type="dynamics") class BiliDbStoreImplement(AbstractStore): @@ -184,16 +198,50 @@ class BiliDbStoreImplement(AbstractStore): else: await update_creator_by_creator_id(creator_id,creator_item=creator) + async def store_contact(self, contact_item: Dict): + """ + Bilibili contact DB storage implementation + Args: + contact_item: contact item dict + + Returns: + + """ + + from .bilibili_store_sql import (add_new_contact, + query_contact_by_up_and_fan, + update_contact_by_id, ) + + up_id = contact_item.get("up_id") + fan_id = contact_item.get("fan_id") + contact_detail: Dict = await query_contact_by_up_and_fan(up_id=up_id, fan_id=fan_id) + if not contact_detail: + contact_item["add_ts"] = utils.get_current_timestamp() + await add_new_contact(contact_item) + else: + key_id = contact_detail.get("id") + await update_contact_by_id(id=key_id, contact_item=contact_item) + + async def store_dynamic(self, dynamic_item): + """ + Bilibili dynamic DB storage implementation + Args: + dynamic_item: dynamic item dict + + Returns: + + """ + + class BiliJsonStoreImplement(AbstractStore): json_store_path: str = "data/bilibili/json" words_store_path: str = "data/bilibili/words" lock = asyncio.Lock() - file_count:int=calculate_number_of_files(json_store_path) + file_count: int = calculate_number_of_files(json_store_path) WordCloud = words.AsyncWordCloudGenerator() - - def make_save_file_name(self, store_type: str) -> (str,str): + def make_save_file_name(self, store_type: str) -> (str, str): """ make save file name by store type Args: @@ -220,7 +268,7 @@ class BiliJsonStoreImplement(AbstractStore): """ pathlib.Path(self.json_store_path).mkdir(parents=True, exist_ok=True) pathlib.Path(self.words_store_path).mkdir(parents=True, exist_ok=True) - save_file_name,words_file_name_prefix = self.make_save_file_name(store_type=store_type) + save_file_name, words_file_name_prefix = self.make_save_file_name(store_type=store_type) save_data = [] async with self.lock: @@ -271,7 +319,7 @@ class BiliJsonStoreImplement(AbstractStore): """ await self.save_data_to_json(creator, "creators") - async def store_creator_contact(self, contact_item: Dict): + async def store_contact(self, contact_item: Dict): """ creator contact JSON storage implementation Args: @@ -281,4 +329,16 @@ class BiliJsonStoreImplement(AbstractStore): """ - await self.save_data_to_json(save_item=contact_item, store_type="fans") + await self.save_data_to_json(save_item=contact_item, store_type="contacts") + + async def store_dynamic(self, dynamic_item: Dict): + """ + creator dynamic JSON storage implementation + Args: + dynamic_item: creator's contact item dict + + Returns: + + """ + + await self.save_data_to_json(save_item=dynamic_item, store_type="dynamics") diff --git a/store/bilibili/bilibili_store_sql.py b/store/bilibili/bilibili_store_sql.py index 5e6356f..513b679 100644 --- a/store/bilibili/bilibili_store_sql.py +++ b/store/bilibili/bilibili_store_sql.py @@ -158,3 +158,49 @@ async def update_creator_by_creator_id(creator_id: str, creator_item: Dict) -> i effect_row: int = await async_db_conn.update_table("bilibili_up_info", creator_item, "user_id", creator_id) return effect_row +async def query_contact_by_up_and_fan(up_id: str, fan_id: str) -> Dict: + """ + 查询一条关联关系 + Args: + up_id: + fan_id: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + sql: str = f"select * from bilibili_contact_info where up_id = '{up_id}' and fan_id = '{fan_id}'" + rows: List[Dict] = await async_db_conn.query(sql) + if len(rows) > 0: + return rows[0] + return dict() + + +async def add_new_contact(contact_item: Dict) -> int: + """ + 新增关联关系 + Args: + contact_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + last_row_id: int = await async_db_conn.item_to_table("bilibili_contact_info", contact_item) + return last_row_id + + +async def update_contact_by_id(id: str, contact_item: Dict) -> int: + """ + 更新关联关系 + Args: + id: + contact_item: + + Returns: + + """ + async_db_conn: AsyncMysqlDB = media_crawler_db_var.get() + effect_row: int = await async_db_conn.update_table("bilibili_contact_info", contact_item, "id", id) + return effect_row +