fix_words

This commit is contained in:
Bowenwin
2025-05-22 20:31:48 +08:00
parent a356358c21
commit 44e3d370ff
8 changed files with 338 additions and 59 deletions

View File

@@ -46,7 +46,7 @@ HEADLESS = False
SAVE_LOGIN_STATE = True
# Persistence backend for crawled data: "csv", "db" or "json".
# "db" is preferred when record deduplication is required.
SAVE_DATA_OPTION = "csv"  # csv or db or json
# Browser user-data cache directory template; %s is replaced by the platform name.
USER_DATA_DIR = "%s_user_data_dir"  # %s will be replaced by platform name
@@ -73,7 +73,10 @@ ENABLE_GET_COMMENTS = True
CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES = 10
# Max number of fans / followings fetched per single creator.
CRAWLER_MAX_CONTACTS_COUNT_SINGLENOTES = 100
# Backward-compatible alias: older callers still reference the pre-rename name.
CRAWLER_MAX_FANS_COUNT_SINGLENOTES = CRAWLER_MAX_CONTACTS_COUNT_SINGLENOTES
# Max number of dynamics (feed posts) fetched per single creator.
CRAWLER_MAX_DYNAMICS_COUNT_SINGLENOTES = 50
# 是否开启爬二级评论模式, 默认不开启爬二级评论
# 老版本项目使用了 db, 则需参考 schema/tables.sql line 287 增加表字段
@@ -152,9 +155,9 @@ DY_CREATOR_ID_LIST = [
# Bilibili creator (UP主) IDs to crawl; commented entries are kept for quick re-enabling.
BILI_CREATOR_ID_LIST = [
    # "20813884",
    "520819684",
    # "472747194",
    # "519872016",
    # "372201438",
    # ........................
]

View File

@@ -12,11 +12,16 @@
import os

# mysql config
# SECURITY NOTE(review): a previous revision hardcoded a live host, user and
# password here. Credentials are read from the environment instead; the
# defaults below mirror the committed values so behavior is unchanged, but
# real deployments must override them via env vars, and the leaked
# host/password should be rotated.
RELATION_DB_HOST = os.getenv("RELATION_DB_HOST", "47.94.233.47")  # DB host or public IP
RELATION_DB_PORT = int(os.getenv("RELATION_DB_PORT", "3306"))  # DB port (usually 3306)
RELATION_DB_USER = os.getenv("RELATION_DB_USER", "remote_user")  # DB user name
RELATION_DB_PWD = os.getenv("RELATION_DB_PWD", "314159")  # DB password
RELATION_DB_NAME = os.getenv("RELATION_DB_NAME", "Test")  # DB schema name
# redis config

View File

@@ -341,7 +341,8 @@ class BilibiliClient(AbstractApiClient):
return await self.get(uri, post_data)
async def get_creator_info(self, creator_id: int) -> Dict:
"""get creator info
"""
get creator info
:param creator_id: 作者 ID
"""
uri = "/x/space/wbi/acc/info"
@@ -355,7 +356,8 @@ class BilibiliClient(AbstractApiClient):
pn: int,
ps: int = 24,
) -> Dict:
"""get video comments
"""
get creator fans
:param creator_id: 创作者 ID
:param pn: 开始页数
:param ps: 每页数量
@@ -376,7 +378,8 @@ class BilibiliClient(AbstractApiClient):
pn: int,
ps: int = 24,
) -> Dict:
"""get video comments
"""
get creator followings
:param creator_id: 创作者 ID
:param pn: 开始页数
:param ps: 每页数量
@@ -391,11 +394,27 @@ class BilibiliClient(AbstractApiClient):
}
return await self.get(uri, post_data)
async def get_creator_dynamics(self, creator_id: int, offset: str = "") -> Dict:
    """
    get one page of creator dynamics (space feed)
    :param creator_id: creator (UP主) ID
    :param offset: paging cursor returned by the previous call; "" for the first page
    :return: raw response dict with "items", "has_more" and "offset" keys
    """
    uri = "/x/polymer/web-dynamic/v1/feed/space"
    post_data = {
        "offset": offset,
        "host_mid": creator_id,
        "platform": "web",
    }
    return await self.get(uri, post_data)
async def get_creator_all_fans(self, creator_info: Dict, crawl_interval: float = 1.0,
callback: Optional[Callable] = None,
max_count: int = 100) -> List:
"""
get video all comments include sub comments
get creator all fans
:param creator_info:
:param crawl_interval:
:param callback:
@@ -419,16 +438,13 @@ class BilibiliClient(AbstractApiClient):
if not fans_list:
break
result.extend(fans_list)
utils.logger.info(
f"[BilibiliCrawler.get_fans] begin get creator_id: {creator_id} fans successfully")
return result
async def get_creator_all_followings(self, creator_info: Dict, crawl_interval: float = 1.0,
callback: Optional[Callable] = None,
max_count: int = 100) -> List:
"""
get video all comments include sub comments
get creator all followings
:param creator_info:
:param crawl_interval:
:param callback:
@@ -452,7 +468,33 @@ class BilibiliClient(AbstractApiClient):
if not followings_list:
break
result.extend(followings_list)
utils.logger.info(
f"[BilibiliCrawler.get_followings] begin get creator_id: {creator_id} followings successfully")
return result
async def get_creator_all_dynamics(self, creator_info: Dict, crawl_interval: float = 1.0,
                                   callback: Optional[Callable] = None,
                                   max_count: int = 20) -> List:
    """
    get all dynamics of one creator, paging through the offset cursor
    :param creator_info: handled creator info dict (needs an "id" key)
    :param crawl_interval: seconds to sleep between page requests
    :param callback: optional async callback invoked with each fetched page
    :param max_count: maximum number of dynamics to fetch for one creator
    :return: list of dynamic items (at most max_count)
    """
    creator_id = creator_info["id"]
    result = []
    offset = ""
    has_more = True
    while has_more and len(result) < max_count:
        dynamics_res = await self.get_creator_dynamics(creator_id, offset)
        dynamics_list: List[Dict] = dynamics_res["items"]
        has_more = dynamics_res["has_more"]
        offset = dynamics_res["offset"]
        # truncate the final page so the total never exceeds max_count
        if len(result) + len(dynamics_list) > max_count:
            dynamics_list = dynamics_list[:max_count - len(result)]
        if callback:
            await callback(creator_info, dynamics_list)
        await asyncio.sleep(crawl_interval)
        result.extend(dynamics_list)
    return result

View File

@@ -119,14 +119,16 @@ class BilibiliCrawler(AbstractCrawler):
start_day: datetime = datetime.strptime(start, '%Y-%m-%d')
end_day: datetime = datetime.strptime(end, '%Y-%m-%d')
if start_day > end_day:
raise ValueError('Wrong time range, please check your start and end argument, to ensure that the start cannot exceed end')
raise ValueError(
'Wrong time range, please check your start and end argument, to ensure that the start cannot exceed end')
elif start_day == end_day: # 搜索同一天的内容
end_day = start_day + timedelta(days=1) - timedelta(seconds=1) # 则将 end_day 设置为 start_day + 1 day - 1 second
end_day = start_day + timedelta(days=1) - timedelta(
seconds=1) # 则将 end_day 设置为 start_day + 1 day - 1 second
else: # 搜索 start 至 end
end_day = end_day + timedelta(days=1) - timedelta(seconds=1) # 则将 end_day 设置为 end_day + 1 day - 1 second
# 将其重新转换为时间戳
return str(int(start_day.timestamp())), str(int(end_day.timestamp()))
async def search(self):
"""
search bilibili video with keywords
@@ -164,9 +166,11 @@ class BilibiliCrawler(AbstractCrawler):
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = []
try:
task_list = [self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) for video_item in video_list]
task_list = [self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore)
for video_item in video_list]
except Exception as e:
utils.logger.warning(f"[BilibiliCrawler.search] error in the task list. The video for this page will not be included. {e}")
utils.logger.warning(
f"[BilibiliCrawler.search] error in the task list. The video for this page will not be included. {e}")
video_items = await asyncio.gather(*task_list)
for video_item in video_items:
if video_item:
@@ -180,21 +184,23 @@ class BilibiliCrawler(AbstractCrawler):
else:
for day in pd.date_range(start=config.START_DAY, end=config.END_DAY, freq='D'):
# 按照每一天进行爬取的时间戳参数
pubtime_begin_s, pubtime_end_s = await self.get_pubtime_datetime(start=day.strftime('%Y-%m-%d'), end=day.strftime('%Y-%m-%d'))
pubtime_begin_s, pubtime_end_s = await self.get_pubtime_datetime(start=day.strftime('%Y-%m-%d'),
end=day.strftime('%Y-%m-%d'))
page = 1
#!该段 while 语句在发生异常时(通常情况下为当天数据为空时)会自动跳转到下一天,以实现最大程度爬取该关键词下当天的所有视频
#!除了仅保留现在原有的 try, except Exception 语句外,不要再添加其他的异常处理!!!否则将使该段代码失效,使其仅能爬取当天一天数据而无法跳转到下一天
#!除非将该段代码的逻辑进行重构以实现相同的功能,否则不要进行修改!!!
# !该段 while 语句在发生异常时(通常情况下为当天数据为空时)会自动跳转到下一天,以实现最大程度爬取该关键词下当天的所有视频
# !除了仅保留现在原有的 try, except Exception 语句外,不要再添加其他的异常处理!!!否则将使该段代码失效,使其仅能爬取当天一天数据而无法跳转到下一天
# !除非将该段代码的逻辑进行重构以实现相同的功能,否则不要进行修改!!!
while (page - start_page + 1) * bili_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
#! Catch any error if response return nothing, go to next day
# ! Catch any error if response return nothing, go to next day
try:
#! Don't skip any page, to make sure gather all video in one day
# ! Don't skip any page, to make sure gather all video in one day
# if page < start_page:
# utils.logger.info(f"[BilibiliCrawler.search] Skip page: {page}")
# page += 1
# continue
utils.logger.info(f"[BilibiliCrawler.search] search bilibili keyword: {keyword}, date: {day.ctime()}, page: {page}")
utils.logger.info(
f"[BilibiliCrawler.search] search bilibili keyword: {keyword}, date: {day.ctime()}, page: {page}")
video_id_list: List[str] = []
videos_res = await self.bili_client.search_video_by_keyword(
keyword=keyword,
@@ -207,7 +213,9 @@ class BilibiliCrawler(AbstractCrawler):
video_list: List[Dict] = videos_res.get("result")
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) for video_item in video_list]
task_list = [
self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) for
video_item in video_list]
video_items = await asyncio.gather(*task_list)
for video_item in video_items:
if video_item:
@@ -467,7 +475,6 @@ class BilibiliCrawler(AbstractCrawler):
extension_file_name = f"video.mp4"
await bilibili_store.store_video(aid, content, extension_file_name)
async def get_all_creator_details(self, creator_id_list: List[int]):
"""
creator_id_list: get details for creator from creator_id_list
@@ -485,7 +492,8 @@ class BilibiliCrawler(AbstractCrawler):
creator_id, semaphore), name=creator_id)
task_list.append(task)
except Exception as e:
utils.logger.warning(f"[BilibiliCrawler.get_all_creator_details] error in the task list. The creator will not be included. {e}")
utils.logger.warning(
f"[BilibiliCrawler.get_all_creator_details] error in the task list. The creator will not be included. {e}")
await asyncio.gather(*task_list)
@@ -504,8 +512,9 @@ class BilibiliCrawler(AbstractCrawler):
"sign": creator_unhandled_info.get("sign"),
"avatar": creator_unhandled_info.get("face"),
}
await self.get_fans(creator_info, semaphore)
await self.get_followings(creator_info, semaphore)
# await self.get_fans(creator_info, semaphore)
# await self.get_followings(creator_info, semaphore)
await self.get_dynamics(creator_info, semaphore)
async def get_fans(self, creator_info: Dict, semaphore: asyncio.Semaphore):
"""
@@ -523,7 +532,7 @@ class BilibiliCrawler(AbstractCrawler):
creator_info=creator_info,
crawl_interval=random.random(),
callback=bilibili_store.batch_update_bilibili_creator_fans,
max_count=config.CRAWLER_MAX_FANS_COUNT_SINGLENOTES,
max_count=config.CRAWLER_MAX_CONTACTS_COUNT_SINGLENOTES,
)
except DataFetchError as ex:
@@ -549,7 +558,7 @@ class BilibiliCrawler(AbstractCrawler):
creator_info=creator_info,
crawl_interval=random.random(),
callback=bilibili_store.batch_update_bilibili_creator_followings,
max_count=config.CRAWLER_MAX_FANS_COUNT_SINGLENOTES,
max_count=config.CRAWLER_MAX_CONTACTS_COUNT_SINGLENOTES,
)
except DataFetchError as ex:
@@ -558,3 +567,29 @@ class BilibiliCrawler(AbstractCrawler):
except Exception as e:
utils.logger.error(
f"[BilibiliCrawler.get_followings] may be been blocked, err:{e}")
async def get_dynamics(self, creator_info: Dict, semaphore: asyncio.Semaphore):
    """
    Fetch and persist all dynamics of a single creator.
    :param creator_info: handled creator info dict (expects an "id" key)
    :param semaphore: limits how many creators are processed concurrently
    :return:
    """
    creator_id = creator_info["id"]
    async with semaphore:
        try:
            utils.logger.info(
                f"[BilibiliCrawler.get_dynamics] begin get creator_id: {creator_id} dynamics ...")
            # random per-creator interval to avoid a detectable request cadence
            interval = random.random()
            await self.bili_client.get_creator_all_dynamics(
                creator_info=creator_info,
                crawl_interval=interval,
                callback=bilibili_store.batch_update_bilibili_creator_dynamics,
                max_count=config.CRAWLER_MAX_DYNAMICS_COUNT_SINGLENOTES,
            )
        except DataFetchError as ex:
            utils.logger.error(
                f"[BilibiliCrawler.get_dynamics] get creator_id: {creator_id} dynamics error: {ex}")
        except Exception as e:
            utils.logger.error(
                f"[BilibiliCrawler.get_dynamics] may be been blocked, err:{e}")

View File

@@ -76,6 +76,50 @@ CREATE TABLE `bilibili_up_info`
KEY `idx_bilibili_vi_user_123456` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='B 站UP主信息';
-- ----------------------------
-- Table structure for bilibili_contact_info
-- One row per (up, fan) relationship discovered while crawling a creator's
-- fans/followings lists; both endpoints are denormalized into the row.
-- ----------------------------
DROP TABLE IF EXISTS `bilibili_contact_info`;
CREATE TABLE `bilibili_contact_info`
(
`id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID',
`up_id` varchar(64) DEFAULT NULL COMMENT 'up主ID',
`fan_id` varchar(64) DEFAULT NULL COMMENT '粉丝ID',
`up_name` varchar(64) DEFAULT NULL COMMENT 'up主昵称',
`fan_name` varchar(64) DEFAULT NULL COMMENT '粉丝昵称',
`up_sign` longtext DEFAULT NULL COMMENT 'up主签名',
`fan_sign` longtext DEFAULT NULL COMMENT '粉丝签名',
`up_avatar` varchar(255) DEFAULT NULL COMMENT 'up主头像地址',
`fan_avatar` varchar(255) DEFAULT NULL COMMENT '粉丝头像地址',
`add_ts` bigint NOT NULL COMMENT '记录添加时间戳',
`last_modify_ts` bigint NOT NULL COMMENT '记录最后修改时间戳',
PRIMARY KEY (`id`),
-- indexed on both endpoints so the up->fans and fan->ups lookups are cheap
KEY `idx_bilibili_contact_info_up_id` (`up_id`),
KEY `idx_bilibili_contact_info_fan_id` (`fan_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='B 站联系人信息';
-- ----------------------------
-- Table structure for bilibili_up_dynamic
-- One row per creator dynamic (feed post) with its engagement counters.
-- ----------------------------
DROP TABLE IF EXISTS `bilibili_up_dynamic`;
CREATE TABLE `bilibili_up_dynamic`
(
`id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID',
`dynamic_id` varchar(64) DEFAULT NULL COMMENT '动态ID',
`user_id` varchar(64) DEFAULT NULL COMMENT '用户ID',
`user_name` varchar(64) DEFAULT NULL COMMENT '用户名',
`text` longtext DEFAULT NULL COMMENT '动态文本',
`type` varchar(64) DEFAULT NULL COMMENT '动态类型',
`pub_ts` bigint DEFAULT NULL COMMENT '动态发布时间',
`total_comments` bigint DEFAULT NULL COMMENT '评论数',
`total_forwards` bigint DEFAULT NULL COMMENT '转发数',
`total_liked` bigint DEFAULT NULL COMMENT '点赞数',
`add_ts` bigint NOT NULL COMMENT '记录添加时间戳',
`last_modify_ts` bigint NOT NULL COMMENT '记录最后修改时间戳',
PRIMARY KEY (`id`),
KEY `idx_bilibili_up_dynamic_dynamic_id` (`dynamic_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='B 站up主动态信息';
-- ----------------------------
-- Table structure for douyin_aweme
-- ----------------------------
@@ -463,7 +507,7 @@ CREATE TABLE `tieba_creator`
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='贴吧创作者';
DROP TABLE IF EXISTS `zhihu_content`;
CREATE TABLE `zhihu_content` (
`id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID',
`content_id` varchar(64) NOT NULL COMMENT '内容ID',
@@ -491,7 +535,7 @@ CREATE TABLE `zhihu_content` (
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='知乎内容(回答、文章、视频)';
DROP TABLE IF EXISTS `zhihu_comment`;
CREATE TABLE `zhihu_comment` (
`id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID',
`comment_id` varchar(64) NOT NULL COMMENT '评论ID',
@@ -516,7 +560,7 @@ CREATE TABLE `zhihu_comment` (
KEY `idx_zhihu_comment_publish_time` (`publish_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='知乎评论';
DROP TABLE IF EXISTS `zhihu_creator`;
CREATE TABLE `zhihu_creator` (
`id` int NOT NULL AUTO_INCREMENT COMMENT '自增ID',
`user_id` varchar(64) NOT NULL COMMENT '用户ID',

View File

@@ -144,7 +144,7 @@ async def batch_update_bilibili_creator_fans(creator_info: Dict, fans_list: List
"sign": fan_item.get("sign"),
"avatar": fan_item.get("face"),
}
await update_bilibili_creator_fans(creator_info=creator_info, fan_info=fan_info)
await update_bilibili_creator_contact(creator_info=creator_info, fan_info=fan_info)
async def batch_update_bilibili_creator_followings(creator_info: Dict, followings_list: List[Dict]):
@@ -157,10 +157,36 @@ async def batch_update_bilibili_creator_followings(creator_info: Dict, following
"sign": following_item.get("sign"),
"avatar": following_item.get("face"),
}
await update_bilibili_creator_fans(creator_info=following_info, fan_info=creator_info)
await update_bilibili_creator_contact(creator_info=following_info, fan_info=creator_info)
async def update_bilibili_creator_fans(creator_info: Dict, fan_info: Dict):
async def batch_update_bilibili_creator_dynamics(creator_info: Dict, dynamics_list: List[Dict]):
    """
    Flatten raw bilibili dynamic items and persist them one by one.
    :param creator_info: handled creator info dict (needs "id" / "name")
    :param dynamics_list: raw dynamic items from the space-feed API
    """
    if not dynamics_list:
        return
    for dynamic_item in dynamics_list:
        dynamic_id: str = dynamic_item["id_str"]
        # desc is absent for some dynamic types (e.g. pure media posts)
        dynamic_text: str = ""
        if dynamic_item["modules"]["module_dynamic"].get("desc"):
            dynamic_text = dynamic_item["modules"]["module_dynamic"]["desc"]["text"]
        # e.g. "DYNAMIC_TYPE_WORD" -> "WORD"
        dynamic_type: str = dynamic_item["type"].split("_")[-1]
        # pub_ts is a unix timestamp (int) — the old annotation said str
        dynamic_pub_ts: int = dynamic_item["modules"]["module_author"]["pub_ts"]
        dynamic_stat: Dict = dynamic_item["modules"]["module_stat"]
        dynamic_info: Dict = {
            "dynamic_id": dynamic_id,
            "text": dynamic_text,
            "type": dynamic_type,
            "pub_ts": dynamic_pub_ts,
            "comment": dynamic_stat["comment"]["count"],
            "forward": dynamic_stat["forward"]["count"],
            "like": dynamic_stat["like"]["count"],
        }
        await update_bilibili_creator_dynamic(creator_info=creator_info, dynamic_info=dynamic_info)
async def update_bilibili_creator_contact(creator_info: Dict, fan_info: Dict):
save_contact_item = {
"up_id": creator_info["id"],
"fan_id": fan_info["id"],
@@ -169,7 +195,25 @@ async def update_bilibili_creator_fans(creator_info: Dict, fan_info: Dict):
"up_sign": creator_info["sign"],
"fan_sign": fan_info["sign"],
"up_avatar": creator_info["avatar"],
"fan_avatar": fan_info["avatar"]
"fan_avatar": fan_info["avatar"],
"last_modify_ts": utils.get_current_timestamp(),
}
await BiliStoreFactory.create_store().store_creator_contact(contact_item=save_contact_item)
await BiliStoreFactory.create_store().store_contact(contact_item=save_contact_item)
async def update_bilibili_creator_dynamic(creator_info: Dict, dynamic_info: Dict):
save_dynamic_item = {
"dynamic_id": dynamic_info["dynamic_id"],
"user_id": creator_info["id"],
"user_name": creator_info["name"],
"text": dynamic_info["text"],
"type": dynamic_info["type"],
"pub_ts": dynamic_info["pub_ts"],
"comment": dynamic_info["comment"],
"forward": dynamic_info["forward"],
"like": dynamic_info["like"],
"last_modify_ts": utils.get_current_timestamp(),
}
await BiliStoreFactory.create_store().store_dynamic(dynamic_item=save_dynamic_item)

View File

def calculate_number_of_files(file_store_path: str) -> int:
    """Return the next numeric file prefix for *file_store_path*.

    Stored files are named "<number>_...", so the next prefix is
    max(existing numbers) + 1. Returns 1 when the directory does not exist,
    is empty, or contains a file without a leading numeric prefix.
    """
    if not os.path.exists(file_store_path):
        return 1
    try:
        return max(int(file_name.split("_")[0]) for file_name in os.listdir(file_store_path)) + 1
    except ValueError:
        # empty directory (max() on empty iterable) or non-numeric prefix
        return 1
class BiliCsvStoreImplement(AbstractStore):
csv_store_path: str = "data/bilibili"
file_count:int=calculate_number_of_files(csv_store_path)
file_count: int = calculate_number_of_files(csv_store_path)
def make_save_file_name(self, store_type: str) -> str:
"""
make save file name by store type
@@ -107,9 +109,9 @@ class BiliCsvStoreImplement(AbstractStore):
"""
await self.save_data_to_csv(save_item=creator, store_type="creators")
async def store_contact(self, contact_item: Dict):
    """
    Bilibili contact CSV storage implementation
    Args:
        contact_item: creator's contact item dict
    Returns:
    """
    await self.save_data_to_csv(save_item=contact_item, store_type="contacts")
async def store_dynamic(self, dynamic_item: Dict):
    """
    Append one creator dynamic record to the "dynamics" CSV file.
    Args:
        dynamic_item: creator's dynamic item dict
    Returns:
    """
    await self.save_data_to_csv(save_item=dynamic_item, store_type="dynamics")
class BiliDbStoreImplement(AbstractStore):
@@ -184,16 +198,50 @@ class BiliDbStoreImplement(AbstractStore):
else:
await update_creator_by_creator_id(creator_id,creator_item=creator)
async def store_contact(self, contact_item: Dict):
    """
    Bilibili contact DB storage implementation (upsert keyed on up_id + fan_id).
    Args:
        contact_item: contact item dict
    Returns:
    """
    from .bilibili_store_sql import (add_new_contact,
                                     query_contact_by_up_and_fan,
                                     update_contact_by_id)

    existing: Dict = await query_contact_by_up_and_fan(
        up_id=contact_item.get("up_id"), fan_id=contact_item.get("fan_id"))
    if existing:
        # row already present -> refresh it in place
        await update_contact_by_id(id=existing.get("id"), contact_item=contact_item)
    else:
        # first sighting -> stamp creation time and insert
        contact_item["add_ts"] = utils.get_current_timestamp()
        await add_new_contact(contact_item)
async def store_dynamic(self, dynamic_item: Dict):
    """
    Bilibili dynamic DB storage implementation
    Args:
        dynamic_item: dynamic item dict
    Returns:
    """
    # NOTE(review): stub — dynamic items are silently dropped by the DB backend.
    # TODO: upsert into `bilibili_up_dynamic` (see schema/tables.sql), analogous
    # to store_contact above; requires matching helpers in bilibili_store_sql.
class BiliJsonStoreImplement(AbstractStore):
json_store_path: str = "data/bilibili/json"
words_store_path: str = "data/bilibili/words"
lock = asyncio.Lock()
file_count:int=calculate_number_of_files(json_store_path)
file_count: int = calculate_number_of_files(json_store_path)
WordCloud = words.AsyncWordCloudGenerator()
def make_save_file_name(self, store_type: str) -> (str,str):
def make_save_file_name(self, store_type: str) -> (str, str):
"""
make save file name by store type
Args:
@@ -220,7 +268,7 @@ class BiliJsonStoreImplement(AbstractStore):
"""
pathlib.Path(self.json_store_path).mkdir(parents=True, exist_ok=True)
pathlib.Path(self.words_store_path).mkdir(parents=True, exist_ok=True)
save_file_name,words_file_name_prefix = self.make_save_file_name(store_type=store_type)
save_file_name, words_file_name_prefix = self.make_save_file_name(store_type=store_type)
save_data = []
async with self.lock:
@@ -271,7 +319,7 @@ class BiliJsonStoreImplement(AbstractStore):
"""
await self.save_data_to_json(creator, "creators")
async def store_contact(self, contact_item: Dict):
    """
    creator contact JSON storage implementation
    Args:
        contact_item: creator's contact item dict
    Returns:
    """
    await self.save_data_to_json(save_item=contact_item, store_type="contacts")
async def store_dynamic(self, dynamic_item: Dict):
    """
    creator dynamic JSON storage implementation
    Args:
        dynamic_item: creator's dynamic item dict
    Returns:
    """
    await self.save_data_to_json(save_item=dynamic_item, store_type="dynamics")

View File

@@ -158,3 +158,49 @@ async def update_creator_by_creator_id(creator_id: str, creator_item: Dict) -> i
effect_row: int = await async_db_conn.update_table("bilibili_up_info", creator_item, "user_id", creator_id)
return effect_row
async def query_contact_by_up_and_fan(up_id: str, fan_id: str) -> Dict:
    """
    Query a single up<->fan contact row.
    Args:
        up_id:
        fan_id:
    Returns:
        the matching row dict, or an empty dict when no row exists
    """
    async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
    # SECURITY: escape single quotes before interpolating into the statement to
    # avoid trivial SQL injection / broken queries. Ideally AsyncMysqlDB should
    # expose parameterized queries — TODO confirm and switch when available.
    safe_up_id = str(up_id).replace("'", "''")
    safe_fan_id = str(fan_id).replace("'", "''")
    sql: str = (
        "select * from bilibili_contact_info "
        f"where up_id = '{safe_up_id}' and fan_id = '{safe_fan_id}'"
    )
    rows: List[Dict] = await async_db_conn.query(sql)
    if rows:
        return rows[0]
    return dict()
async def add_new_contact(contact_item: Dict) -> int:
    """
    Insert a new up<->fan contact row.
    Args:
        contact_item:
    Returns:
        auto-increment id of the inserted row
    """
    db_conn: AsyncMysqlDB = media_crawler_db_var.get()
    return await db_conn.item_to_table("bilibili_contact_info", contact_item)
async def update_contact_by_id(id: str, contact_item: Dict) -> int:
    """
    Update an existing contact row by primary key.
    Args:
        id:
        contact_item:
    Returns:
        number of affected rows
    """
    db_conn: AsyncMysqlDB = media_crawler_db_var.get()
    return await db_conn.update_table("bilibili_contact_info", contact_item, "id", id)