diff --git a/config/base_config.py b/config/base_config.py
index 2f94975..102e567 100644
--- a/config/base_config.py
+++ b/config/base_config.py
@@ -193,4 +193,17 @@ END_DAY = '2024-01-01'
 # 是否开启按每一天进行爬取的选项,仅支持 bilibili 关键字搜索
 # 若为 False,则忽略 START_DAY 与 END_DAY 设置的值
 # 若为 True,则按照 START_DAY 至 END_DAY 按照每一天进行筛选,这样能够突破 1000 条视频的限制,最大程度爬取该关键词下的所有视频
-ALL_DAY = False
\ No newline at end of file
+ALL_DAY = False
+
+# NOTE: the options below only apply to the bilibili "creator" crawler type
+# True: crawl each creator's videos; False: crawl each creator's dynamics and fans/followings lists
+CREATOR_MODE = True
+
+# First page to request when crawling a creator's fans and followings lists
+START_CONTACTS_PAGE = 1
+
+# Max number of fans, and of followings, to crawl per creator
+CRAWLER_MAX_CONTACTS_COUNT_SINGLENOTES = 100
+
+# Max number of dynamics to crawl per creator
+CRAWLER_MAX_DYNAMICS_COUNT_SINGLENOTES = 50
diff --git a/media_platform/bilibili/client.py b/media_platform/bilibili/client.py
index 37c087b..32af357 100644
--- a/media_platform/bilibili/client.py
+++ b/media_platform/bilibili/client.py
@@ -21,6 +21,7 @@ from urllib.parse import urlencode
 import httpx
 from playwright.async_api import BrowserContext, Page
 
+import config
 from base.base_crawler import AbstractApiClient
 from tools import utils
 
@@ -337,3 +338,161 @@ class BilibiliClient(AbstractApiClient):
             "order": order_mode,
         }
         return await self.get(uri, post_data)
+
+    async def get_creator_info(self, creator_id: int) -> Dict:
+        """
+        Get a creator's profile info.
+        :param creator_id: creator ID
+        :return: result of self.get for the account-info endpoint
+        """
+        uri = "/x/space/wbi/acc/info"
+        post_data = {
+            "mid": creator_id,
+        }
+        return await self.get(uri, post_data)
+
+    async def get_creator_fans(self,
+                               creator_id: int,
+                               pn: int,
+                               ps: int = 24,
+                               ) -> Dict:
+        """
+        Get one page of a creator's fans.
+        :param creator_id: creator ID
+        :param pn: page number
+        :param ps: page size
+        :return: result of self.get for the fans endpoint
+        """
+        uri = "/x/relation/fans"
+        post_data = {
+            "vmid": creator_id,
+            "pn": pn,
+            "ps": ps,
+            "gaia_source": "main_web",
+        }
+        return await self.get(uri, post_data)
+
+    async def get_creator_followings(self,
+                                     creator_id: int,
+                                     pn: int,
+                                     ps: int = 24,
+                                     ) -> Dict:
+        """
+        Get one page of the accounts a creator follows.
+        :param creator_id: creator ID
+        :param pn: page number
+        :param ps: page size
+        :return: result of self.get for the followings endpoint
+        """
+        uri = "/x/relation/followings"
+        post_data = {
+            "vmid": creator_id,
+            "pn": pn,
+            "ps": ps,
+            "gaia_source": "main_web",
+        }
+        return await self.get(uri, post_data)
+
+    async def get_creator_dynamics(self, creator_id: int, offset: str = ""):
+        """
+        Get one page of a creator's dynamics.
+        :param creator_id: creator ID
+        :param offset: cursor taken from the previous page; "" requests the first page
+        :return: result of self.get for the space feed endpoint
+        """
+        uri = "/x/polymer/web-dynamic/v1/feed/space"
+        post_data = {
+            "offset": offset,
+            "host_mid": creator_id,
+            "platform": "web",
+        }
+        return await self.get(uri, post_data)
+
+    async def _get_creator_all_contacts(self, fetch_page: Callable, creator_info: Dict,
+                                        crawl_interval: float = 1.0,
+                                        callback: Optional[Callable] = None,
+                                        max_count: int = 100) -> List:
+        """
+        Page through a creator relation list (fans or followings).
+        :param fetch_page: get_creator_fans or get_creator_followings
+        :param creator_info: creator dict; only "id" is read here
+        :param crawl_interval: seconds to sleep after each non-empty page
+        :param callback: optional coroutine, awaited as callback(creator_info, items)
+        :param max_count: max number of items to collect
+        :return: collected items
+        """
+        creator_id = creator_info["id"]
+        result = []
+        pn = config.START_CONTACTS_PAGE
+        while len(result) < max_count:
+            page_res: Dict = await fetch_page(creator_id, pn=pn)
+            # "list" may be missing or null; both mean there is nothing left
+            items: List[Dict] = (page_res or {}).get("list") or []
+            if not items:
+                break
+            pn += 1
+            # trim the last page so at most max_count items are collected
+            items = items[:max_count - len(result)]
+            if callback:
+                await callback(creator_info, items)
+            result.extend(items)
+            await asyncio.sleep(crawl_interval)
+        return result
+
+    async def get_creator_all_fans(self, creator_info: Dict, crawl_interval: float = 1.0,
+                                   callback: Optional[Callable] = None,
+                                   max_count: int = 100) -> List:
+        """
+        Get up to max_count fans of a creator, starting at config.START_CONTACTS_PAGE.
+        :param creator_info: creator dict with an "id" key
+        :param crawl_interval: seconds to sleep after each non-empty page
+        :param callback: optional coroutine, awaited with each page of fans
+        :param max_count: max number of fans to crawl for one creator
+        :return: list of fans
+        """
+        return await self._get_creator_all_contacts(
+            self.get_creator_fans, creator_info, crawl_interval, callback, max_count)
+
+    async def get_creator_all_followings(self, creator_info: Dict, crawl_interval: float = 1.0,
+                                         callback: Optional[Callable] = None,
+                                         max_count: int = 100) -> List:
+        """
+        Get up to max_count followings of a creator, starting at config.START_CONTACTS_PAGE.
+        :param creator_info: creator dict with an "id" key
+        :param crawl_interval: seconds to sleep after each non-empty page
+        :param callback: optional coroutine, awaited with each page of followings
+        :param max_count: max number of followings to crawl for one creator
+        :return: list of followings
+        """
+        return await self._get_creator_all_contacts(
+            self.get_creator_followings, creator_info, crawl_interval, callback, max_count)
+
+    async def get_creator_all_dynamics(self, creator_info: Dict, crawl_interval: float = 1.0,
+                                       callback: Optional[Callable] = None,
+                                       max_count: int = 20) -> List:
+        """
+        Get up to max_count dynamics of a creator.
+        :param creator_info: creator dict with an "id" key
+        :param crawl_interval: seconds to sleep after each non-empty page
+        :param callback: optional coroutine, awaited with each page of dynamics
+        :param max_count: max number of dynamics to crawl for one creator
+        :return: list of dynamics
+        """
+        creator_id = creator_info["id"]
+        result = []
+        offset = ""
+        has_more = True
+        while has_more and len(result) < max_count:
+            dynamics_res = await self.get_creator_dynamics(creator_id, offset) or {}
+            dynamics_list: List[Dict] = dynamics_res.get("items") or []
+            if not dynamics_list:
+                # an empty page cannot advance the crawl; stop instead of looping
+                break
+            has_more = bool(dynamics_res.get("has_more"))
+            offset = dynamics_res.get("offset", "")
+            dynamics_list = dynamics_list[:max_count - len(result)]
+            if callback:
+                await callback(creator_info, dynamics_list)
+            result.extend(dynamics_list)
+            await asyncio.sleep(crawl_interval)
+        return result
diff --git a/media_platform/bilibili/core.py b/media_platform/bilibili/core.py
index 5c7949a..1836ba2 100644
--- a/media_platform/bilibili/core.py
+++ b/media_platform/bilibili/core.py
@@ -89,8 +89,11 @@ class BilibiliCrawler(AbstractCrawler):
                 # Get the information and comments of the specified post
                 await self.get_specified_videos(config.BILI_SPECIFIED_ID_LIST)
             elif config.CRAWLER_TYPE == "creator":
-                for creator_id in config.BILI_CREATOR_ID_LIST:
-                    await self.get_creator_videos(int(creator_id))
+                if config.CREATOR_MODE:
+                    for creator_id in config.BILI_CREATOR_ID_LIST:
+                        await self.get_creator_videos(int(creator_id))
+                else:
+                    await self.get_all_creator_details(config.BILI_CREATOR_ID_LIST)
             else:
                 pass
             utils.logger.info(
@@ -466,3 +469,125 @@ class BilibiliCrawler(AbstractCrawler):
             extension_file_name = f"video.mp4"
             await bilibili_store.store_video(aid, content, extension_file_name)
 
+    async def get_all_creator_details(self, creator_id_list: List[int]):
+        """
+        Crawl fans, followings and dynamics of every creator, one task per creator.
+        :param creator_id_list: IDs of the creators to crawl
+        """
+        utils.logger.info(
+            f"[BilibiliCrawler.get_all_creator_details] Crawling the details of creator")
+        utils.logger.info(
+            f"[BilibiliCrawler.get_all_creator_details] creator ids:{creator_id_list}")
+
+        semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
+        task_list: List[Task] = []
+        try:
+            for creator_id in creator_id_list:
+                task = asyncio.create_task(self.get_creator_details(
+                    creator_id, semaphore), name=creator_id)
+                task_list.append(task)
+        except Exception as e:
+            utils.logger.warning(
+                f"[BilibiliCrawler.get_all_creator_details] error in the task list. The creator will not be included. {e}")
+
+        await asyncio.gather(*task_list)
+
+    async def get_creator_details(self, creator_id: int, semaphore: asyncio.Semaphore):
+        """
+        Fetch a creator's profile, then crawl their fans, followings and dynamics.
+        :param creator_id: creator ID
+        :param semaphore: concurrency limiter shared by all creator tasks
+        :return:
+        """
+        async with semaphore:
+            creator_unhandled_info: Dict = await self.bili_client.get_creator_info(creator_id)
+            creator_info: Dict = {
+                "id": creator_id,
+                "name": creator_unhandled_info.get("name"),
+                "sign": creator_unhandled_info.get("sign"),
+                "avatar": creator_unhandled_info.get("face"),
+            }
+        # These must run outside the `async with` above: each helper acquires the
+        # same semaphore itself, and nesting the acquisitions deadlocks as soon as
+        # every permit is held by a task that is waiting here.
+        await self.get_fans(creator_info, semaphore)
+        await self.get_followings(creator_info, semaphore)
+        await self.get_dynamics(creator_info, semaphore)
+
+    async def get_fans(self, creator_info: Dict, semaphore: asyncio.Semaphore):
+        """
+        Crawl and store the fans of one creator; errors are logged, not raised.
+        :param creator_info: creator dict with "id", "name", "sign" and "avatar"
+        :param semaphore: concurrency limiter shared by all creator tasks
+        :return:
+        """
+        creator_id = creator_info["id"]
+        async with semaphore:
+            try:
+                utils.logger.info(
+                    f"[BilibiliCrawler.get_fans] begin get creator_id: {creator_id} fans ...")
+                await self.bili_client.get_creator_all_fans(
+                    creator_info=creator_info,
+                    crawl_interval=random.random(),
+                    callback=bilibili_store.batch_update_bilibili_creator_fans,
+                    max_count=config.CRAWLER_MAX_CONTACTS_COUNT_SINGLENOTES,
+                )
+
+            except DataFetchError as ex:
+                utils.logger.error(
+                    f"[BilibiliCrawler.get_fans] get creator_id: {creator_id} fans error: {ex}")
+            except Exception as e:
+                utils.logger.error(
+                    f"[BilibiliCrawler.get_fans] may be been blocked, err:{e}")
+
+    async def get_followings(self, creator_info: Dict, semaphore: asyncio.Semaphore):
+        """
+        Crawl and store the followings of one creator; errors are logged, not raised.
+        :param creator_info: creator dict with "id", "name", "sign" and "avatar"
+        :param semaphore: concurrency limiter shared by all creator tasks
+        :return:
+        """
+        creator_id = creator_info["id"]
+        async with semaphore:
+            try:
+                utils.logger.info(
+                    f"[BilibiliCrawler.get_followings] begin get creator_id: {creator_id} followings ...")
+                await self.bili_client.get_creator_all_followings(
+                    creator_info=creator_info,
+                    crawl_interval=random.random(),
+                    callback=bilibili_store.batch_update_bilibili_creator_followings,
+                    max_count=config.CRAWLER_MAX_CONTACTS_COUNT_SINGLENOTES,
+                )
+
+            except DataFetchError as ex:
+                utils.logger.error(
+                    f"[BilibiliCrawler.get_followings] get creator_id: {creator_id} followings error: {ex}")
+            except Exception as e:
+                utils.logger.error(
+                    f"[BilibiliCrawler.get_followings] may be been blocked, err:{e}")
+
+    async def get_dynamics(self, creator_info: Dict, semaphore: asyncio.Semaphore):
+        """
+        Crawl and store the dynamics of one creator; errors are logged, not raised.
+        :param creator_info: creator dict with "id", "name", "sign" and "avatar"
+        :param semaphore: concurrency limiter shared by all creator tasks
+        :return:
+        """
+        creator_id = creator_info["id"]
+        async with semaphore:
+            try:
+                utils.logger.info(
+                    f"[BilibiliCrawler.get_dynamics] begin get creator_id: {creator_id} dynamics ...")
+                await self.bili_client.get_creator_all_dynamics(
+                    creator_info=creator_info,
+                    crawl_interval=random.random(),
+                    callback=bilibili_store.batch_update_bilibili_creator_dynamics,
+                    max_count=config.CRAWLER_MAX_DYNAMICS_COUNT_SINGLENOTES,
+                )
+
+            except DataFetchError as ex:
+                utils.logger.error(
+                    f"[BilibiliCrawler.get_dynamics] get creator_id: {creator_id} dynamics error: {ex}")
+            except Exception as e:
+                utils.logger.error(
+                    f"[BilibiliCrawler.get_dynamics] may be been blocked, err:{e}")
diff --git a/schema/tables.sql b/schema/tables.sql
index 7e9a9b3..69d00cc 100644
--- a/schema/tables.sql
+++ b/schema/tables.sql
@@ -76,6 +76,50 @@ CREATE TABLE `bilibili_up_info`
     KEY `idx_bilibili_vi_user_123456` (`user_id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='B 站UP主信息';
 
+-- ----------------------------
+-- Table structure for bilibili_contact_info
+-- ----------------------------
+DROP TABLE IF EXISTS `bilibili_contact_info`;
+CREATE TABLE `bilibili_contact_info`
+(
+    `id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID',
+    `up_id` varchar(64) DEFAULT NULL COMMENT 'up主ID',
+    `fan_id` varchar(64) DEFAULT NULL COMMENT '粉丝ID',
+    `up_name` varchar(64) DEFAULT NULL COMMENT 'up主昵称',
+    `fan_name` varchar(64) DEFAULT NULL COMMENT '粉丝昵称',
+    `up_sign` longtext DEFAULT NULL COMMENT 'up主签名',
+    `fan_sign` longtext DEFAULT NULL COMMENT '粉丝签名',
+    `up_avatar` varchar(255) DEFAULT NULL COMMENT 'up主头像地址',
+    `fan_avatar` varchar(255) DEFAULT NULL COMMENT '粉丝头像地址',
+    `add_ts` bigint NOT NULL COMMENT '记录添加时间戳',
+    `last_modify_ts` bigint NOT NULL COMMENT '记录最后修改时间戳',
+    PRIMARY KEY (`id`),
+    KEY `idx_bilibili_contact_info_up_id` (`up_id`),
+    KEY `idx_bilibili_contact_info_fan_id` (`fan_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='B 站联系人信息';
+
+-- ----------------------------
+-- Table structure for bilibili_up_dynamic
+-- ----------------------------
+DROP TABLE IF EXISTS `bilibili_up_dynamic`;
+CREATE TABLE `bilibili_up_dynamic`
+(
+    `id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID',
+    `dynamic_id` varchar(64) DEFAULT NULL COMMENT '动态ID',
+    `user_id` varchar(64) DEFAULT NULL COMMENT '用户ID',
+    `user_name` varchar(64) DEFAULT NULL COMMENT '用户名',
+    `text` longtext DEFAULT NULL COMMENT '动态文本',
+    `type` varchar(64) DEFAULT NULL COMMENT '动态类型',
+    `pub_ts` bigint DEFAULT NULL COMMENT '动态发布时间',
+    `total_comments` bigint DEFAULT NULL COMMENT '评论数',
+    `total_forwards` bigint DEFAULT NULL COMMENT '转发数',
+    `total_liked` bigint DEFAULT NULL COMMENT '点赞数',
+    `add_ts` bigint NOT NULL COMMENT '记录添加时间戳',
+    `last_modify_ts` bigint NOT NULL COMMENT '记录最后修改时间戳',
+    PRIMARY KEY (`id`),
+    KEY `idx_bilibili_up_dynamic_dynamic_id` (`dynamic_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='B 站up主动态信息';
+
 -- ----------------------------
 -- Table structure for douyin_aweme
 -- ----------------------------
@@ -463,7 +507,7 @@ CREATE TABLE `tieba_creator`
     PRIMARY KEY (`id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='贴吧创作者';
 
-
+DROP TABLE IF EXISTS `zhihu_content`;
 CREATE TABLE `zhihu_content` (
     `id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID',
     `content_id` varchar(64) NOT NULL COMMENT '内容ID',
@@ -491,7 +535,7 @@ CREATE TABLE `zhihu_content` (
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='知乎内容(回答、文章、视频)';
 
 
-
+DROP TABLE IF EXISTS `zhihu_comment`;
 CREATE TABLE `zhihu_comment` (
     `id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID',
     `comment_id` varchar(64) NOT NULL COMMENT '评论ID',
@@ -516,7 +560,7 @@ CREATE TABLE `zhihu_comment` (
     KEY `idx_zhihu_comment_publish_time` (`publish_time`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='知乎评论';
 
-
+DROP TABLE IF EXISTS `zhihu_creator`;
 CREATE TABLE `zhihu_creator` (
     `id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID',
     `user_id` varchar(64) NOT NULL COMMENT '用户ID',
diff --git a/store/bilibili/__init__.py b/store/bilibili/__init__.py
index dcffa88..3183358 100644
--- a/store/bilibili/__init__.py
+++ b/store/bilibili/__init__.py
@@ -132,3 +132,94 @@ async def store_video(aid, video_content, extension_file_name):
     """
     await BilibiliVideo().store_video(
         {"aid": aid, "video_content": video_content, "extension_file_name": extension_file_name})
+
+
+async def batch_update_bilibili_creator_fans(creator_info: Dict, fans_list: List[Dict]):
+    """Store one contact record (creator as up, item as fan) per entry of fans_list."""
+    if not fans_list:
+        return
+    for fan_item in fans_list:
+        fan_info: Dict = {
+            "id": fan_item.get("mid"),
+            "name": fan_item.get("uname"),
+            "sign": fan_item.get("sign"),
+            "avatar": fan_item.get("face"),
+        }
+        await update_bilibili_creator_contact(creator_info=creator_info, fan_info=fan_info)
+
+
+async def batch_update_bilibili_creator_followings(creator_info: Dict, followings_list: List[Dict]):
+    """Store one contact record (item as up, creator as fan) per entry of followings_list."""
+    if not followings_list:
+        return
+    for following_item in followings_list:
+        following_info: Dict = {
+            "id": following_item.get("mid"),
+            "name": following_item.get("uname"),
+            "sign": following_item.get("sign"),
+            "avatar": following_item.get("face"),
+        }
+        # roles are swapped on purpose: the crawled creator is the fan of the followed account
+        await update_bilibili_creator_contact(creator_info=following_info, fan_info=creator_info)
+
+
+async def batch_update_bilibili_creator_dynamics(creator_info: Dict, dynamics_list: List[Dict]):
+    """Flatten each raw dynamic item of dynamics_list and store it for the creator."""
+    if not dynamics_list:
+        return
+    for dynamic_item in dynamics_list:
+        dynamic_id: str = dynamic_item["id_str"]
+        dynamic_text: str = ""
+        if dynamic_item["modules"]["module_dynamic"].get("desc"):
+            dynamic_text = dynamic_item["modules"]["module_dynamic"]["desc"]["text"]
+        dynamic_type: str = dynamic_item["type"].split("_")[-1]
+        dynamic_pub_ts: str = dynamic_item["modules"]["module_author"]["pub_ts"]
+        dynamic_stat: Dict = dynamic_item["modules"]["module_stat"]
+        dynamic_comment: int = dynamic_stat["comment"]["count"]
+        dynamic_forward: int = dynamic_stat["forward"]["count"]
+        dynamic_like: int = dynamic_stat["like"]["count"]
+        dynamic_info: Dict = {
+            "dynamic_id": dynamic_id,
+            "text": dynamic_text,
+            "type": dynamic_type,
+            "pub_ts": dynamic_pub_ts,
+            "total_comments": dynamic_comment,
+            "total_forwards": dynamic_forward,
+            "total_liked": dynamic_like,
+        }
+        await update_bilibili_creator_dynamic(creator_info=creator_info, dynamic_info=dynamic_info)
+
+
+async def update_bilibili_creator_contact(creator_info: Dict, fan_info: Dict):
+    """Build one up/fan contact record and pass it to the configured store."""
+    save_contact_item = {
+        "up_id": creator_info["id"],
+        "fan_id": fan_info["id"],
+        "up_name": creator_info["name"],
+        "fan_name": fan_info["name"],
+        "up_sign": creator_info["sign"],
+        "fan_sign": fan_info["sign"],
+        "up_avatar": creator_info["avatar"],
+        "fan_avatar": fan_info["avatar"],
+        "last_modify_ts": utils.get_current_timestamp(),
+    }
+
+    await BiliStoreFactory.create_store().store_contact(contact_item=save_contact_item)
+
+
+async def update_bilibili_creator_dynamic(creator_info: Dict, dynamic_info: Dict):
+    """Build one dynamic record for the creator and pass it to the configured store."""
+    save_dynamic_item = {
+        "dynamic_id": dynamic_info["dynamic_id"],
+        "user_id": creator_info["id"],
+        "user_name": creator_info["name"],
+        "text": dynamic_info["text"],
+        "type": dynamic_info["type"],
+        "pub_ts": dynamic_info["pub_ts"],
+        "total_comments": dynamic_info["total_comments"],
+        "total_forwards": dynamic_info["total_forwards"],
+        "total_liked": dynamic_info["total_liked"],
+        "last_modify_ts": utils.get_current_timestamp(),
+    }
+
+    await BiliStoreFactory.create_store().store_dynamic(dynamic_item=save_dynamic_item)
diff --git a/store/bilibili/bilibili_store_impl.py b/store/bilibili/bilibili_store_impl.py
index 677026d..0fa1504 100644
--- a/store/bilibili/bilibili_store_impl.py
+++ b/store/bilibili/bilibili_store_impl.py
@@ -107,6 +107,30 @@ class BiliCsvStoreImplement(AbstractStore):
         """
         await self.save_data_to_csv(save_item=creator, store_type="creators")
 
+    async def store_contact(self, contact_item: Dict):
+        """
+        Bilibili contact CSV storage implementation
+        Args:
+            contact_item: creator's contact item dict
+
+        Returns:
+
+        """
+
+        await self.save_data_to_csv(save_item=contact_item, store_type="contacts")
+
+    async def store_dynamic(self, dynamic_item: Dict):
+        """
+        Bilibili dynamic CSV storage implementation
+        Args:
+            dynamic_item: creator's dynamic item dict
+
+        Returns:
+
+        """
+
+        await self.save_data_to_csv(save_item=dynamic_item, store_type="dynamics")
+
 
 class BiliDbStoreImplement(AbstractStore):
     async def store_content(self, content_item: Dict):
@@ -172,6 +196,52 @@ class BiliDbStoreImplement(AbstractStore):
         else:
             await update_creator_by_creator_id(creator_id,creator_item=creator)
 
+    async def store_contact(self, contact_item: Dict):
+        """
+        Bilibili contact DB storage implementation (insert, or update the existing up/fan row)
+        Args:
+            contact_item: contact item dict
+
+        Returns:
+
+        """
+
+        from .bilibili_store_sql import (add_new_contact,
+                                         query_contact_by_up_and_fan,
+                                         update_contact_by_id, )
+
+        up_id = contact_item.get("up_id")
+        fan_id = contact_item.get("fan_id")
+        contact_detail: Dict = await query_contact_by_up_and_fan(up_id=up_id, fan_id=fan_id)
+        if not contact_detail:
+            contact_item["add_ts"] = utils.get_current_timestamp()
+            await add_new_contact(contact_item)
+        else:
+            key_id = contact_detail.get("id")
+            await update_contact_by_id(id=key_id, contact_item=contact_item)
+
+    async def store_dynamic(self, dynamic_item):
+        """
+        Bilibili dynamic DB storage implementation (insert, or update the existing dynamic_id row)
+        Args:
+            dynamic_item: dynamic item dict
+
+        Returns:
+
+        """
+
+        from .bilibili_store_sql import (add_new_dynamic,
+                                         query_dynamic_by_dynamic_id,
+                                         update_dynamic_by_dynamic_id)
+
+        dynamic_id = dynamic_item.get("dynamic_id")
+        dynamic_detail = await query_dynamic_by_dynamic_id(dynamic_id=dynamic_id)
+        if not dynamic_detail:
+            dynamic_item["add_ts"] = utils.get_current_timestamp()
+            await add_new_dynamic(dynamic_item)
+        else:
+            await update_dynamic_by_dynamic_id(dynamic_id, dynamic_item=dynamic_item)
+
 
 class BiliJsonStoreImplement(AbstractStore):
     json_store_path: str = "data/bilibili/json"
@@ -258,3 +328,27 @@ class BiliJsonStoreImplement(AbstractStore):
 
         """
         await self.save_data_to_json(creator, "creators")
+
+    async def store_contact(self, contact_item: Dict):
+        """
+        creator contact JSON storage implementation
+        Args:
+            contact_item: creator's contact item dict
+
+        Returns:
+
+        """
+
+        await self.save_data_to_json(save_item=contact_item, store_type="contacts")
+
+    async def store_dynamic(self, dynamic_item: Dict):
+        """
+        creator dynamic JSON storage implementation
+        Args:
+            dynamic_item: creator's dynamic item dict
+
+        Returns:
+
+        """
+
+        await self.save_data_to_json(save_item=dynamic_item, store_type="dynamics")
diff --git a/store/bilibili/bilibili_store_sql.py b/store/bilibili/bilibili_store_sql.py
index 5e6356f..02b146c 100644
--- a/store/bilibili/bilibili_store_sql.py
+++ b/store/bilibili/bilibili_store_sql.py
@@ -158,3 +158,100 @@ async def update_creator_by_creator_id(creator_id: str, creator_item: Dict) -> i
     async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
     effect_row: int = await async_db_conn.update_table("bilibili_up_info", creator_item, "user_id", creator_id)
     return effect_row
+
+
+async def query_contact_by_up_and_fan(up_id: str, fan_id: str) -> Dict:
+    """
+    Query one contact (up/fan relation) record
+    Args:
+        up_id: value matched against the up_id column
+        fan_id: value matched against the fan_id column
+
+    Returns: the first matching row, or an empty dict when there is none
+
+    """
+    async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
+    # NOTE(review): the ids are interpolated into the SQL text; bind them as
+    # query parameters if AsyncMysqlDB.query supports that - TODO confirm
+    sql: str = f"select * from bilibili_contact_info where up_id = '{up_id}' and fan_id = '{fan_id}'"
+    rows: List[Dict] = await async_db_conn.query(sql)
+    if len(rows) > 0:
+        return rows[0]
+    return dict()
+
+
+async def add_new_contact(contact_item: Dict) -> int:
+    """
+    Insert a new contact (up/fan relation) record
+    Args:
+        contact_item: row to insert into bilibili_contact_info
+
+    Returns: the value returned by AsyncMysqlDB.item_to_table
+
+    """
+    async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
+    last_row_id: int = await async_db_conn.item_to_table("bilibili_contact_info", contact_item)
+    return last_row_id
+
+
+async def update_contact_by_id(id: str, contact_item: Dict) -> int:
+    """
+    Update a contact (up/fan relation) record
+    Args:
+        id: value of the id column of the row to update
+        contact_item: new column values
+
+    Returns: the value returned by AsyncMysqlDB.update_table
+
+    """
+    async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
+    effect_row: int = await async_db_conn.update_table("bilibili_contact_info", contact_item, "id", id)
+    return effect_row
+
+
+async def query_dynamic_by_dynamic_id(dynamic_id: str) -> Dict:
+    """
+    Query one dynamic record
+    Args:
+        dynamic_id: value matched against the dynamic_id column
+
+    Returns: the first matching row, or an empty dict when there is none
+
+    """
+    async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
+    # NOTE(review): dynamic_id is interpolated into the SQL text; bind it as a
+    # query parameter if AsyncMysqlDB.query supports that - TODO confirm
+    sql: str = f"select * from bilibili_up_dynamic where dynamic_id = '{dynamic_id}'"
+    rows: List[Dict] = await async_db_conn.query(sql)
+    if len(rows) > 0:
+        return rows[0]
+    return dict()
+
+
+async def add_new_dynamic(dynamic_item: Dict) -> int:
+    """
+    Insert a new dynamic record
+    Args:
+        dynamic_item: row to insert into bilibili_up_dynamic
+
+    Returns: the value returned by AsyncMysqlDB.item_to_table
+
+    """
+    async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
+    last_row_id: int = await async_db_conn.item_to_table("bilibili_up_dynamic", dynamic_item)
+    return last_row_id
+
+
+async def update_dynamic_by_dynamic_id(dynamic_id: str, dynamic_item: Dict) -> int:
+    """
+    Update a dynamic record
+    Args:
+        dynamic_id: value of the dynamic_id column of the row to update
+        dynamic_item: new column values
+
+    Returns: the value returned by AsyncMysqlDB.update_table
+
+    """
+    async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
+    effect_row: int = await async_db_conn.update_table("bilibili_up_dynamic", dynamic_item, "dynamic_id", dynamic_id)
+    return effect_row