mirror of
https://github.com/NanmiCoder/MediaCrawler.git
synced 2026-02-27 18:50:45 +08:00
@@ -193,4 +193,17 @@ END_DAY = '2024-01-01'
|
||||
# Whether to crawl day by day; only supported for bilibili keyword search.
# If False, the START_DAY and END_DAY settings are ignored.
# If True, results are filtered one day at a time from START_DAY to END_DAY,
# which gets around the 1000-video limit and collects as many videos for the
# keyword as possible.
ALL_DAY = False

# !!! The options below only apply to bilibili creator crawling.
# True: crawl each creator's homepage videos (get_creator_videos).
# False: crawl each creator's dynamics and relation lists (fans / followings).
CREATOR_MODE = True

# Page number at which crawling of a creator's fans / followings lists starts.
START_CONTACTS_PAGE = 1

# Maximum number of fans and of followings crawled per creator.
CRAWLER_MAX_CONTACTS_COUNT_SINGLENOTES = 100

# Maximum number of dynamics crawled per creator.
CRAWLER_MAX_DYNAMICS_COUNT_SINGLENOTES = 50
|
||||
@@ -21,6 +21,7 @@ from urllib.parse import urlencode
|
||||
import httpx
|
||||
from playwright.async_api import BrowserContext, Page
|
||||
|
||||
import config
|
||||
from base.base_crawler import AbstractApiClient
|
||||
from tools import utils
|
||||
|
||||
@@ -337,3 +338,162 @@ class BilibiliClient(AbstractApiClient):
|
||||
"order": order_mode,
|
||||
}
|
||||
return await self.get(uri, post_data)
|
||||
|
||||
async def get_creator_info(self, creator_id: int) -> Dict:
    """
    Request the profile of a creator from ``/x/space/wbi/acc/info``.

    :param creator_id: creator ID, sent as the ``mid`` query parameter
    :return: the value returned by ``self.get`` for this request
    """
    query = {"mid": creator_id}
    return await self.get("/x/space/wbi/acc/info", query)
|
||||
|
||||
async def get_creator_fans(self, creator_id: int, pn: int, ps: int = 24) -> Dict:
    """
    Request one page of a creator's fans list from ``/x/relation/fans``.

    :param creator_id: creator ID, sent as ``vmid``
    :param pn: page number to request
    :param ps: page size (number of entries per page)
    :return: the value returned by ``self.get`` for this request
    """
    query = dict(vmid=creator_id, pn=pn, ps=ps, gaia_source="main_web")
    return await self.get("/x/relation/fans", query)
|
||||
|
||||
async def get_creator_followings(self, creator_id: int, pn: int, ps: int = 24) -> Dict:
    """
    Request one page of the accounts a creator follows from ``/x/relation/followings``.

    :param creator_id: creator ID, sent as ``vmid``
    :param pn: page number to request
    :param ps: page size (number of entries per page)
    :return: the value returned by ``self.get`` for this request
    """
    query = dict(vmid=creator_id, pn=pn, ps=ps, gaia_source="main_web")
    return await self.get("/x/relation/followings", query)
|
||||
|
||||
async def get_creator_dynamics(self, creator_id: int, offset: str = ""):
    """
    Request one page of a creator's dynamics feed.

    :param creator_id: creator ID, sent as ``host_mid``
    :param offset: paging cursor for the request; the empty string requests the first page
    :return: the value returned by ``self.get`` for this request
    """
    query = dict(offset=offset, host_mid=creator_id, platform="web")
    return await self.get("/x/polymer/web-dynamic/v1/feed/space", query)
|
||||
|
||||
async def get_creator_all_fans(self, creator_info: Dict, crawl_interval: float = 1.0,
                               callback: Optional[Callable] = None,
                               max_count: int = 100) -> List:
    """
    Page through a creator's fans list until ``max_count`` fans are collected
    or a page comes back empty.

    :param creator_info: creator record; its ``"id"`` is used for the requests and
        the whole dict is handed to ``callback``
    :param crawl_interval: seconds to sleep after each non-empty page
    :param callback: optional coroutine function awaited as
        ``callback(creator_info, fans_page)`` for every non-empty page
    :param max_count: maximum number of fans to collect for this creator
    :return: list of the collected fan entries
    """
    creator_id = creator_info["id"]
    result: List[Dict] = []
    pn = config.START_CONTACTS_PAGE
    while len(result) < max_count:
        fans_res: Dict = await self.get_creator_fans(creator_id, pn=pn)
        # Tolerate a missing or null "list" so len()/slicing below never see None.
        fans_list: List[Dict] = fans_res.get("list") or []
        if not fans_list:
            # Nothing left to fetch: stop before invoking the callback or sleeping.
            break
        pn += 1
        # Trim the page so the total never exceeds max_count.
        fans_list = fans_list[:max_count - len(result)]
        if callback:
            await callback(creator_info, fans_list)
        result.extend(fans_list)
        await asyncio.sleep(crawl_interval)
    return result
|
||||
|
||||
async def get_creator_all_followings(self, creator_info: Dict, crawl_interval: float = 1.0,
                                     callback: Optional[Callable] = None,
                                     max_count: int = 100) -> List:
    """
    Page through the accounts a creator follows until ``max_count`` entries are
    collected or a page comes back empty.

    :param creator_info: creator record; its ``"id"`` is used for the requests and
        the whole dict is handed to ``callback``
    :param crawl_interval: seconds to sleep after each non-empty page
    :param callback: optional coroutine function awaited as
        ``callback(creator_info, followings_page)`` for every non-empty page
    :param max_count: maximum number of followings to collect for this creator
    :return: list of the collected following entries
    """
    creator_id = creator_info["id"]
    result: List[Dict] = []
    pn = config.START_CONTACTS_PAGE
    while len(result) < max_count:
        followings_res: Dict = await self.get_creator_followings(creator_id, pn=pn)
        # Tolerate a missing or null "list" so len()/slicing below never see None.
        followings_list: List[Dict] = followings_res.get("list") or []
        if not followings_list:
            # Nothing left to fetch: stop before invoking the callback or sleeping.
            break
        pn += 1
        # Trim the page so the total never exceeds max_count.
        followings_list = followings_list[:max_count - len(result)]
        if callback:
            await callback(creator_info, followings_list)
        result.extend(followings_list)
        await asyncio.sleep(crawl_interval)
    return result
|
||||
|
||||
async def get_creator_all_dynamics(self, creator_info: Dict, crawl_interval: float = 1.0,
                                   callback: Optional[Callable] = None,
                                   max_count: int = 20) -> List:
    """
    Page through a creator's dynamics until ``max_count`` items are collected,
    the API reports no more pages, or paging stops making progress.

    :param creator_info: creator record; its ``"id"`` is used for the requests and
        the whole dict is handed to ``callback``
    :param crawl_interval: seconds to sleep after each fetched page
    :param callback: optional coroutine function awaited as
        ``callback(creator_info, dynamics_page)`` for every non-empty page
    :param max_count: maximum number of dynamics to collect for this creator
    :return: list of the collected dynamic items
    """
    creator_id = creator_info["id"]
    result: List[Dict] = []
    offset = ""
    has_more = True
    while has_more and len(result) < max_count:
        dynamics_res = await self.get_creator_dynamics(creator_id, offset)
        # A null "items" is treated as an empty page instead of crashing on len().
        dynamics_list: List[Dict] = dynamics_res.get("items") or []
        has_more = dynamics_res["has_more"]
        next_offset = dynamics_res["offset"]
        if not dynamics_list and next_offset == offset:
            # Empty page and the cursor did not advance: requesting again would
            # return the same page forever, so stop here.
            break
        offset = next_offset
        if dynamics_list:
            # Trim the page so the total never exceeds max_count.
            dynamics_list = dynamics_list[:max_count - len(result)]
            if callback:
                await callback(creator_info, dynamics_list)
            result.extend(dynamics_list)
        await asyncio.sleep(crawl_interval)
    return result
|
||||
|
||||
@@ -89,8 +89,11 @@ class BilibiliCrawler(AbstractCrawler):
|
||||
# Get the information and comments of the specified post
|
||||
await self.get_specified_videos(config.BILI_SPECIFIED_ID_LIST)
|
||||
elif config.CRAWLER_TYPE == "creator":
|
||||
for creator_id in config.BILI_CREATOR_ID_LIST:
|
||||
await self.get_creator_videos(int(creator_id))
|
||||
if config.CREATOR_MODE:
|
||||
for creator_id in config.BILI_CREATOR_ID_LIST:
|
||||
await self.get_creator_videos(int(creator_id))
|
||||
else:
|
||||
await self.get_all_creator_details(config.BILI_CREATOR_ID_LIST)
|
||||
else:
|
||||
pass
|
||||
utils.logger.info(
|
||||
@@ -125,7 +128,7 @@ class BilibiliCrawler(AbstractCrawler):
|
||||
end_day = end_day + timedelta(days=1) - timedelta(seconds=1) # 则将 end_day 设置为 end_day + 1 day - 1 second
|
||||
# 将其重新转换为时间戳
|
||||
return str(int(start_day.timestamp())), str(int(end_day.timestamp()))
|
||||
|
||||
|
||||
async def search(self):
|
||||
"""
|
||||
search bilibili video with keywords
|
||||
@@ -466,3 +469,121 @@ class BilibiliCrawler(AbstractCrawler):
|
||||
extension_file_name = f"video.mp4"
|
||||
await bilibili_store.store_video(aid, content, extension_file_name)
|
||||
|
||||
async def get_all_creator_details(self, creator_id_list: List[int]):
    """
    Crawl the details of every creator in ``creator_id_list`` concurrently.

    One task running ``get_creator_details`` is created per creator; all tasks
    share a semaphore sized by ``config.MAX_CONCURRENCY_NUM`` and are awaited
    together with ``asyncio.gather``.

    :param creator_id_list: IDs of the creators to crawl
    """
    log = utils.logger
    log.info(
        f"[BilibiliCrawler.get_creator_details] Crawling the detalis of creator")
    log.info(
        f"[BilibiliCrawler.get_creator_details] creator ids:{creator_id_list}")

    limiter = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
    pending: List[Task] = []
    try:
        for creator_id in creator_id_list:
            coro = self.get_creator_details(creator_id, limiter)
            pending.append(asyncio.create_task(coro, name=creator_id))
    except Exception as err:
        # Failure while building the task list is only logged; tasks created
        # so far are still awaited below.
        utils.logger.warning(
            f"[BilibiliCrawler.get_all_creator_details] error in the task list. The creator will not be included. {err}")

    await asyncio.gather(*pending)
|
||||
|
||||
async def get_creator_details(self, creator_id: int, semaphore: asyncio.Semaphore):
    """
    Fetch a creator's profile, then crawl that creator's fans, followings and dynamics.

    :param creator_id: ID of the creator to crawl
    :param semaphore: shared concurrency limiter; held here only for the profile
        request and then passed on to the helpers
    :return: None
    """
    async with semaphore:
        creator_unhandled_info: Dict = await self.bili_client.get_creator_info(creator_id)
        creator_info: Dict = {
            "id": creator_id,
            "name": creator_unhandled_info.get("name"),
            "sign": creator_unhandled_info.get("sign"),
            "avatar": creator_unhandled_info.get("face"),
        }
    # The helpers acquire the same semaphore themselves, so it must already be
    # released here: calling them while still holding it would deadlock as soon
    # as every permit is taken (always the case with a single permit).
    await self.get_fans(creator_info, semaphore)
    await self.get_followings(creator_info, semaphore)
    await self.get_dynamics(creator_info, semaphore)
|
||||
|
||||
async def get_fans(self, creator_info: Dict, semaphore: asyncio.Semaphore):
    """
    Crawl one creator's fans while holding ``semaphore``.

    Fans are fetched through ``bili_client.get_creator_all_fans`` and each page is
    handed to ``bilibili_store.batch_update_bilibili_creator_fans``. Exceptions are
    logged here and not re-raised.

    :param creator_info: creator record (its ``"id"`` is used in log messages)
    :param semaphore: shared concurrency limiter
    :return: None
    """
    creator_id = creator_info["id"]
    async with semaphore:
        try:
            utils.logger.info(
                f"[BilibiliCrawler.get_fans] begin get creator_id: {creator_id} fans ...")
            crawl_kwargs = dict(
                creator_info=creator_info,
                crawl_interval=random.random(),  # random delay in [0, 1) seconds between pages
                callback=bilibili_store.batch_update_bilibili_creator_fans,
                max_count=config.CRAWLER_MAX_CONTACTS_COUNT_SINGLENOTES,
            )
            await self.bili_client.get_creator_all_fans(**crawl_kwargs)
        except DataFetchError as fetch_err:
            utils.logger.error(
                f"[BilibiliCrawler.get_fans] get creator_id: {creator_id} fans error: {fetch_err}")
        except Exception as err:
            utils.logger.error(
                f"[BilibiliCrawler.get_fans] may be been blocked, err:{err}")
|
||||
|
||||
async def get_followings(self, creator_info: Dict, semaphore: asyncio.Semaphore):
    """
    Crawl the accounts one creator follows while holding ``semaphore``.

    Entries are fetched through ``bili_client.get_creator_all_followings`` and each
    page is handed to ``bilibili_store.batch_update_bilibili_creator_followings``.
    Exceptions are logged here and not re-raised.

    :param creator_info: creator record (its ``"id"`` is used in log messages)
    :param semaphore: shared concurrency limiter
    :return: None
    """
    creator_id = creator_info["id"]
    async with semaphore:
        try:
            utils.logger.info(
                f"[BilibiliCrawler.get_followings] begin get creator_id: {creator_id} followings ...")
            crawl_kwargs = dict(
                creator_info=creator_info,
                crawl_interval=random.random(),  # random delay in [0, 1) seconds between pages
                callback=bilibili_store.batch_update_bilibili_creator_followings,
                max_count=config.CRAWLER_MAX_CONTACTS_COUNT_SINGLENOTES,
            )
            await self.bili_client.get_creator_all_followings(**crawl_kwargs)
        except DataFetchError as fetch_err:
            utils.logger.error(
                f"[BilibiliCrawler.get_followings] get creator_id: {creator_id} followings error: {fetch_err}")
        except Exception as err:
            utils.logger.error(
                f"[BilibiliCrawler.get_followings] may be been blocked, err:{err}")
|
||||
|
||||
async def get_dynamics(self, creator_info: Dict, semaphore: asyncio.Semaphore):
    """
    Crawl one creator's dynamics while holding ``semaphore``.

    Dynamics are fetched through ``bili_client.get_creator_all_dynamics`` and each
    page is handed to ``bilibili_store.batch_update_bilibili_creator_dynamics``.
    Exceptions are logged here and not re-raised.

    :param creator_info: creator record (its ``"id"`` is used in log messages)
    :param semaphore: shared concurrency limiter
    :return: None
    """
    creator_id = creator_info["id"]
    async with semaphore:
        try:
            utils.logger.info(
                f"[BilibiliCrawler.get_dynamics] begin get creator_id: {creator_id} dynamics ...")
            crawl_kwargs = dict(
                creator_info=creator_info,
                crawl_interval=random.random(),  # random delay in [0, 1) seconds between pages
                callback=bilibili_store.batch_update_bilibili_creator_dynamics,
                max_count=config.CRAWLER_MAX_DYNAMICS_COUNT_SINGLENOTES,
            )
            await self.bili_client.get_creator_all_dynamics(**crawl_kwargs)
        except DataFetchError as fetch_err:
            utils.logger.error(
                f"[BilibiliCrawler.get_dynamics] get creator_id: {creator_id} dynamics error: {fetch_err}")
        except Exception as err:
            utils.logger.error(
                f"[BilibiliCrawler.get_dynamics] may be been blocked, err:{err}")
|
||||
|
||||
@@ -76,6 +76,50 @@ CREATE TABLE `bilibili_up_info`
|
||||
KEY `idx_bilibili_vi_user_123456` (`user_id`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='B 站UP主信息';
|
||||
|
||||
-- ----------------------------
|
||||
-- Table structure for bilibili_contact_info
|
||||
-- ----------------------------
|
||||
-- Recreate the table that stores UP-to-fan contact rows (one UP, one fan per row).
DROP TABLE IF EXISTS `bilibili_contact_info`;
CREATE TABLE `bilibili_contact_info` (
    `id`             int          NOT NULL AUTO_INCREMENT COMMENT '自增ID',
    `up_id`          varchar(64)  DEFAULT NULL COMMENT 'up主ID',
    `fan_id`         varchar(64)  DEFAULT NULL COMMENT '粉丝ID',
    `up_name`        varchar(64)  DEFAULT NULL COMMENT 'up主昵称',
    `fan_name`       varchar(64)  DEFAULT NULL COMMENT '粉丝昵称',
    `up_sign`        longtext     DEFAULT NULL COMMENT 'up主签名',
    `fan_sign`       longtext     DEFAULT NULL COMMENT '粉丝签名',
    `up_avatar`      varchar(255) DEFAULT NULL COMMENT 'up主头像地址',
    `fan_avatar`     varchar(255) DEFAULT NULL COMMENT '粉丝头像地址',
    `add_ts`         bigint       NOT NULL COMMENT '记录添加时间戳',
    `last_modify_ts` bigint       NOT NULL COMMENT '记录最后修改时间戳',
    PRIMARY KEY (`id`),
    KEY `idx_bilibili_contact_info_up_id` (`up_id`),
    KEY `idx_bilibili_contact_info_fan_id` (`fan_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='B 站联系人信息';
|
||||
|
||||
-- ----------------------------
|
||||
-- Table structure for bilibili_up_dynamic
|
||||
-- ----------------------------
|
||||
-- Recreate the table that stores one row per UP dynamic (post) with its counters.
DROP TABLE IF EXISTS `bilibili_up_dynamic`;
CREATE TABLE `bilibili_up_dynamic` (
    `id`             int         NOT NULL AUTO_INCREMENT COMMENT '自增ID',
    `dynamic_id`     varchar(64) DEFAULT NULL COMMENT '动态ID',
    `user_id`        varchar(64) DEFAULT NULL COMMENT '用户ID',
    `user_name`      varchar(64) DEFAULT NULL COMMENT '用户名',
    `text`           longtext    DEFAULT NULL COMMENT '动态文本',
    `type`           varchar(64) DEFAULT NULL COMMENT '动态类型',
    `pub_ts`         bigint      DEFAULT NULL COMMENT '动态发布时间',
    `total_comments` bigint      DEFAULT NULL COMMENT '评论数',
    `total_forwards` bigint      DEFAULT NULL COMMENT '转发数',
    `total_liked`    bigint      DEFAULT NULL COMMENT '点赞数',
    `add_ts`         bigint      NOT NULL COMMENT '记录添加时间戳',
    `last_modify_ts` bigint      NOT NULL COMMENT '记录最后修改时间戳',
    PRIMARY KEY (`id`),
    KEY `idx_bilibili_up_dynamic_dynamic_id` (`dynamic_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='B 站up主动态信息';
|
||||
|
||||
-- ----------------------------
|
||||
-- Table structure for douyin_aweme
|
||||
-- ----------------------------
|
||||
@@ -463,7 +507,7 @@ CREATE TABLE `tieba_creator`
|
||||
PRIMARY KEY (`id`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='贴吧创作者';
|
||||
|
||||
|
||||
DROP TABLE IF EXISTS `zhihu_content`;
|
||||
CREATE TABLE `zhihu_content` (
|
||||
`id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID',
|
||||
`content_id` varchar(64) NOT NULL COMMENT '内容ID',
|
||||
@@ -491,7 +535,7 @@ CREATE TABLE `zhihu_content` (
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='知乎内容(回答、文章、视频)';
|
||||
|
||||
|
||||
|
||||
DROP TABLE IF EXISTS `zhihu_comment`;
|
||||
CREATE TABLE `zhihu_comment` (
|
||||
`id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID',
|
||||
`comment_id` varchar(64) NOT NULL COMMENT '评论ID',
|
||||
@@ -516,7 +560,7 @@ CREATE TABLE `zhihu_comment` (
|
||||
KEY `idx_zhihu_comment_publish_time` (`publish_time`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='知乎评论';
|
||||
|
||||
|
||||
DROP TABLE IF EXISTS `zhihu_creator`;
|
||||
CREATE TABLE `zhihu_creator` (
|
||||
`id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID',
|
||||
`user_id` varchar(64) NOT NULL COMMENT '用户ID',
|
||||
|
||||
@@ -71,25 +71,25 @@ async def update_bilibili_video(video_item: Dict):
|
||||
await BiliStoreFactory.create_store().store_content(content_item=save_content_item)
|
||||
|
||||
|
||||
async def update_up_info(video_item: Dict):
    """
    Store the uploader (UP) profile embedded in a video detail payload.

    The profile is read from ``video_item["Card"]["card"]`` and saved through
    ``BiliStoreFactory.create_store().store_creator``.

    Args:
        video_item: video detail payload containing a ``"Card"`` entry

    Returns:
        None
    """
    video_item_card_list: Dict = video_item.get("Card")
    video_item_card: Dict = video_item_card_list.get("card")
    # These sub-objects may be missing; fall back to an empty dict so the rank /
    # official fields become None instead of raising AttributeError.
    level_info: Dict = video_item_card.get("level_info") or {}
    official_verify: Dict = video_item_card.get("official_verify") or {}
    saver_up_info = {
        "user_id": str(video_item_card.get("mid")),
        "nickname": video_item_card.get("name"),
        "sex": video_item_card.get("sex"),
        "sign": video_item_card.get("sign"),
        "avatar": video_item_card.get("face"),
        "last_modify_ts": utils.get_current_timestamp(),
        "total_fans": video_item_card.get("fans"),
        # like_num lives on the outer "Card" object, not on the inner "card".
        "total_liked": video_item_card_list.get("like_num"),
        "user_rank": level_info.get("current_level"),
        "is_official": official_verify.get("type"),
    }
    utils.logger.info(
        f"[store.bilibili.update_up_info] bilibili user_id:{video_item_card.get('mid')}")
    await BiliStoreFactory.create_store().store_creator(creator=saver_up_info)
|
||||
|
||||
|
||||
|
||||
async def batch_update_bilibili_video_comments(video_id: str, comments: List[Dict]):
|
||||
if not comments:
|
||||
@@ -132,3 +132,88 @@ async def store_video(aid, video_content, extension_file_name):
|
||||
"""
|
||||
await BilibiliVideo().store_video(
|
||||
{"aid": aid, "video_content": video_content, "extension_file_name": extension_file_name})
|
||||
|
||||
|
||||
async def batch_update_bilibili_creator_fans(creator_info: Dict, fans_list: List[Dict]):
    """
    Store every fan in ``fans_list`` as a contact of ``creator_info``.

    Args:
        creator_info: creator record stored as the "up" side of each contact
        fans_list: raw fan entries; ``mid``/``uname``/``sign``/``face`` are read from each

    Returns:
        None; an empty or missing list stores nothing
    """
    for raw_fan in fans_list or []:
        await update_bilibili_creator_contact(
            creator_info=creator_info,
            fan_info={
                "id": raw_fan.get("mid"),
                "name": raw_fan.get("uname"),
                "sign": raw_fan.get("sign"),
                "avatar": raw_fan.get("face"),
            },
        )
|
||||
|
||||
|
||||
async def batch_update_bilibili_creator_followings(creator_info: Dict, followings_list: List[Dict]):
    """
    Store every account in ``followings_list`` as a contact of ``creator_info``.

    The roles are swapped compared with the fans variant: each followed account
    is stored as the "up" side and ``creator_info`` as the "fan" side.

    Args:
        creator_info: creator record stored as the "fan" side of each contact
        followings_list: raw following entries; ``mid``/``uname``/``sign``/``face``
            are read from each

    Returns:
        None; an empty or missing list stores nothing
    """
    for raw_following in followings_list or []:
        followed = {
            "id": raw_following.get("mid"),
            "name": raw_following.get("uname"),
            "sign": raw_following.get("sign"),
            "avatar": raw_following.get("face"),
        }
        await update_bilibili_creator_contact(creator_info=followed, fan_info=creator_info)
|
||||
|
||||
|
||||
async def batch_update_bilibili_creator_dynamics(creator_info: Dict, dynamics_list: List[Dict]):
    """
    Flatten each raw dynamic item and store it for ``creator_info``.

    Args:
        creator_info: creator record the dynamics belong to
        dynamics_list: raw dynamic items; ``id_str``, ``type`` and the ``modules``
            sub-objects are read from each

    Returns:
        None; an empty or missing list stores nothing
    """
    for item in dynamics_list or []:
        dynamic_id: str = item["id_str"]
        modules: Dict = item["modules"]
        desc = modules["module_dynamic"].get("desc")
        # Items without a "desc" are stored with an empty text.
        text: str = desc["text"] if desc else ""
        # Only the part after the last underscore of the raw type is kept.
        kind: str = item["type"].split("_")[-1]
        pub_ts = modules["module_author"]["pub_ts"]
        stat: Dict = modules["module_stat"]
        comments, forwards, likes = (
            stat[counter]["count"] for counter in ("comment", "forward", "like")
        )
        await update_bilibili_creator_dynamic(
            creator_info=creator_info,
            dynamic_info={
                "dynamic_id": dynamic_id,
                "text": text,
                "type": kind,
                "pub_ts": pub_ts,
                "total_comments": comments,
                "total_forwards": forwards,
                "total_liked": likes,
            },
        )
|
||||
|
||||
|
||||
async def update_bilibili_creator_contact(creator_info: Dict, fan_info: Dict):
    """
    Build one contact record from an "up" and a "fan" record and store it.

    Args:
        creator_info: "up" side; must contain ``id``, ``name``, ``sign``, ``avatar``
        fan_info: "fan" side; must contain ``id``, ``name``, ``sign``, ``avatar``

    Returns:
        None
    """
    save_contact_item = dict(
        up_id=creator_info["id"],
        fan_id=fan_info["id"],
        up_name=creator_info["name"],
        fan_name=fan_info["name"],
        up_sign=creator_info["sign"],
        fan_sign=fan_info["sign"],
        up_avatar=creator_info["avatar"],
        fan_avatar=fan_info["avatar"],
        last_modify_ts=utils.get_current_timestamp(),
    )
    store = BiliStoreFactory.create_store()
    await store.store_contact(contact_item=save_contact_item)
|
||||
|
||||
|
||||
async def update_bilibili_creator_dynamic(creator_info: Dict, dynamic_info: Dict):
    """
    Build one dynamic record for a creator and store it.

    Args:
        creator_info: creator record; ``id`` and ``name`` become ``user_id`` / ``user_name``
        dynamic_info: flattened dynamic with ``dynamic_id``, ``text``, ``type``,
            ``pub_ts`` and the three ``total_*`` counters

    Returns:
        None
    """
    save_dynamic_item = dict(
        dynamic_id=dynamic_info["dynamic_id"],
        user_id=creator_info["id"],
        user_name=creator_info["name"],
        text=dynamic_info["text"],
        type=dynamic_info["type"],
        pub_ts=dynamic_info["pub_ts"],
        total_comments=dynamic_info["total_comments"],
        total_forwards=dynamic_info["total_forwards"],
        total_liked=dynamic_info["total_liked"],
        last_modify_ts=utils.get_current_timestamp(),
    )
    store = BiliStoreFactory.create_store()
    await store.store_dynamic(dynamic_item=save_dynamic_item)
|
||||
|
||||
@@ -107,6 +107,30 @@ class BiliCsvStoreImplement(AbstractStore):
|
||||
"""
|
||||
await self.save_data_to_csv(save_item=creator, store_type="creators")
|
||||
|
||||
async def store_contact(self, contact_item: Dict):
    """
    Save one creator contact record through ``save_data_to_csv``.

    Args:
        contact_item: contact record to save

    Returns:
        None
    """
    target = "contacts"
    await self.save_data_to_csv(save_item=contact_item, store_type=target)
|
||||
|
||||
async def store_dynamic(self, dynamic_item: Dict):
    """
    Save one creator dynamic record through ``save_data_to_csv``.

    Args:
        dynamic_item: dynamic record to save

    Returns:
        None
    """
    target = "dynamics"
    await self.save_data_to_csv(save_item=dynamic_item, store_type=target)
|
||||
|
||||
|
||||
class BiliDbStoreImplement(AbstractStore):
|
||||
async def store_content(self, content_item: Dict):
|
||||
@@ -172,6 +196,52 @@ class BiliDbStoreImplement(AbstractStore):
|
||||
else:
|
||||
await update_creator_by_creator_id(creator_id,creator_item=creator)
|
||||
|
||||
async def store_contact(self, contact_item: Dict):
    """
    Insert or update one contact record in the database.

    The record is looked up by its ``up_id`` / ``fan_id`` pair: when no row is
    found a new one is added (with ``add_ts`` set), otherwise the found row is
    updated by its primary key.

    Args:
        contact_item: contact record; ``add_ts`` is written into it on insert

    Returns:
        None
    """
    from .bilibili_store_sql import (add_new_contact,
                                     query_contact_by_up_and_fan,
                                     update_contact_by_id, )

    existing: Dict = await query_contact_by_up_and_fan(
        up_id=contact_item.get("up_id"),
        fan_id=contact_item.get("fan_id"),
    )
    if existing:
        await update_contact_by_id(id=existing.get("id"), contact_item=contact_item)
        return
    contact_item["add_ts"] = utils.get_current_timestamp()
    await add_new_contact(contact_item)
|
||||
|
||||
async def store_dynamic(self, dynamic_item):
    """
    Insert or update one dynamic record in the database.

    The record is looked up by ``dynamic_id``: when no row is found a new one is
    added (with ``add_ts`` set), otherwise the existing row is updated.

    Args:
        dynamic_item: dynamic record; ``add_ts`` is written into it on insert

    Returns:
        None
    """
    from .bilibili_store_sql import (add_new_dynamic,
                                     query_dynamic_by_dynamic_id,
                                     update_dynamic_by_dynamic_id)

    dynamic_id = dynamic_item.get("dynamic_id")
    existing = await query_dynamic_by_dynamic_id(dynamic_id=dynamic_id)
    if existing:
        await update_dynamic_by_dynamic_id(dynamic_id, dynamic_item=dynamic_item)
        return
    dynamic_item["add_ts"] = utils.get_current_timestamp()
    await add_new_dynamic(dynamic_item)
|
||||
|
||||
|
||||
class BiliJsonStoreImplement(AbstractStore):
|
||||
json_store_path: str = "data/bilibili/json"
|
||||
@@ -258,3 +328,27 @@ class BiliJsonStoreImplement(AbstractStore):
|
||||
|
||||
"""
|
||||
await self.save_data_to_json(creator, "creators")
|
||||
|
||||
async def store_contact(self, contact_item: Dict):
    """
    Save one creator contact record through ``save_data_to_json``.

    Args:
        contact_item: contact record to save

    Returns:
        None
    """
    target = "contacts"
    await self.save_data_to_json(save_item=contact_item, store_type=target)
|
||||
|
||||
async def store_dynamic(self, dynamic_item: Dict):
    """
    Save one creator dynamic record through ``save_data_to_json``.

    Args:
        dynamic_item: dynamic record to save

    Returns:
        None
    """
    target = "dynamics"
    await self.save_data_to_json(save_item=dynamic_item, store_type=target)
|
||||
|
||||
@@ -158,3 +158,95 @@ async def update_creator_by_creator_id(creator_id: str, creator_item: Dict) -> i
|
||||
effect_row: int = await async_db_conn.update_table("bilibili_up_info", creator_item, "user_id", creator_id)
|
||||
return effect_row
|
||||
|
||||
|
||||
async def query_contact_by_up_and_fan(up_id: str, fan_id: str) -> Dict:
    """
    Look up one contact row by its UP / fan pair.

    Args:
        up_id: value matched against the ``up_id`` column
        fan_id: value matched against the ``fan_id`` column

    Returns:
        The first matching row, or an empty dict when nothing matches
    """
    db: AsyncMysqlDB = media_crawler_db_var.get()
    # NOTE(review): both ids are interpolated straight into the SQL text; if the
    # DB wrapper supports bound parameters, prefer them here - confirm its API.
    sql: str = f"select * from bilibili_contact_info where up_id = '{up_id}' and fan_id = '{fan_id}'"
    matches: List[Dict] = await db.query(sql)
    return matches[0] if len(matches) > 0 else dict()
|
||||
|
||||
|
||||
async def add_new_contact(contact_item: Dict) -> int:
    """
    Insert a new contact row into ``bilibili_contact_info``.

    Args:
        contact_item: column-name to value mapping for the new row

    Returns:
        The value returned by ``item_to_table`` (annotated as the last row id)
    """
    db: AsyncMysqlDB = media_crawler_db_var.get()
    return await db.item_to_table("bilibili_contact_info", contact_item)
|
||||
|
||||
|
||||
async def update_contact_by_id(id: str, contact_item: Dict) -> int:
    """
    Update the contact row whose ``id`` column equals ``id``.

    Args:
        id: primary-key value of the row to update
        contact_item: column-name to value mapping with the new values

    Returns:
        The value returned by ``update_table`` (annotated as the affected row count)
    """
    db: AsyncMysqlDB = media_crawler_db_var.get()
    return await db.update_table("bilibili_contact_info", contact_item, "id", id)
|
||||
|
||||
|
||||
async def query_dynamic_by_dynamic_id(dynamic_id: str) -> Dict:
    """
    Look up one dynamic row by its dynamic id.

    Args:
        dynamic_id: value matched against the ``dynamic_id`` column

    Returns:
        The first matching row, or an empty dict when nothing matches
    """
    db: AsyncMysqlDB = media_crawler_db_var.get()
    # NOTE(review): the id is interpolated straight into the SQL text; if the
    # DB wrapper supports bound parameters, prefer them here - confirm its API.
    sql: str = f"select * from bilibili_up_dynamic where dynamic_id = '{dynamic_id}'"
    matches: List[Dict] = await db.query(sql)
    return matches[0] if len(matches) > 0 else dict()
|
||||
|
||||
|
||||
async def add_new_dynamic(dynamic_item: Dict) -> int:
    """
    Insert a new dynamic row into ``bilibili_up_dynamic``.

    Args:
        dynamic_item: column-name to value mapping for the new row

    Returns:
        The value returned by ``item_to_table`` (annotated as the last row id)
    """
    db: AsyncMysqlDB = media_crawler_db_var.get()
    return await db.item_to_table("bilibili_up_dynamic", dynamic_item)
|
||||
|
||||
|
||||
async def update_dynamic_by_dynamic_id(dynamic_id: str, dynamic_item: Dict) -> int:
    """
    Update the dynamic row whose ``dynamic_id`` column equals ``dynamic_id``.

    Args:
        dynamic_id: dynamic id of the row to update
        dynamic_item: column-name to value mapping with the new values

    Returns:
        The value returned by ``update_table`` (annotated as the affected row count)
    """
    db: AsyncMysqlDB = media_crawler_db_var.get()
    return await db.update_table("bilibili_up_dynamic", dynamic_item, "dynamic_id", dynamic_id)
|
||||
|
||||
Reference in New Issue
Block a user