feat(database): add PostgreSQL support and fix Windows subprocess encoding

This commit is contained in:
Doiiars
2026-01-09 00:41:59 +08:00
parent 57b688fea4
commit 70a6ca55bb
23 changed files with 221 additions and 27 deletions

View File

@@ -35,6 +35,7 @@ class BiliStoreFactory:
STORES = {
"csv": BiliCsvStoreImplement,
"db": BiliDbStoreImplement,
"postgres": BiliDbStoreImplement,
"json": BiliJsonStoreImplement,
"sqlite": BiliSqliteStoreImplement,
"mongodb": BiliMongoStoreImplement,

View File

@@ -128,16 +128,23 @@ class BiliDbStoreImplement(AbstractStore):
Args:
content_item: content item dict
"""
video_id = content_item.get("video_id")
video_id = int(content_item.get("video_id"))
content_item["video_id"] = video_id
content_item["user_id"] = int(content_item.get("user_id", 0) or 0)
content_item["liked_count"] = int(content_item.get("liked_count", 0) or 0)
content_item["create_time"] = int(content_item.get("create_time", 0) or 0)
async with get_session() as session:
result = await session.execute(select(BilibiliVideo).where(BilibiliVideo.video_id == video_id))
video_detail = result.scalar_one_or_none()
if not video_detail:
content_item["add_ts"] = utils.get_current_timestamp()
content_item["last_modify_ts"] = utils.get_current_timestamp()
new_content = BilibiliVideo(**content_item)
session.add(new_content)
else:
content_item["last_modify_ts"] = utils.get_current_timestamp()
for key, value in content_item.items():
setattr(video_detail, key, value)
await session.commit()
@@ -148,16 +155,25 @@ class BiliDbStoreImplement(AbstractStore):
Args:
comment_item: comment item dict
"""
comment_id = comment_item.get("comment_id")
comment_id = int(comment_item.get("comment_id"))
comment_item["comment_id"] = comment_id
comment_item["video_id"] = int(comment_item.get("video_id", 0) or 0)
comment_item["create_time"] = int(comment_item.get("create_time", 0) or 0)
comment_item["like_count"] = str(comment_item.get("like_count", "0"))
comment_item["sub_comment_count"] = str(comment_item.get("sub_comment_count", "0"))
comment_item["parent_comment_id"] = str(comment_item.get("parent_comment_id", "0"))
async with get_session() as session:
result = await session.execute(select(BilibiliVideoComment).where(BilibiliVideoComment.comment_id == comment_id))
comment_detail = result.scalar_one_or_none()
if not comment_detail:
comment_item["add_ts"] = utils.get_current_timestamp()
comment_item["last_modify_ts"] = utils.get_current_timestamp()
new_comment = BilibiliVideoComment(**comment_item)
session.add(new_comment)
else:
comment_item["last_modify_ts"] = utils.get_current_timestamp()
for key, value in comment_item.items():
setattr(comment_detail, key, value)
await session.commit()
@@ -168,16 +184,24 @@ class BiliDbStoreImplement(AbstractStore):
Args:
creator: creator item dict
"""
creator_id = creator.get("user_id")
creator_id = int(creator.get("user_id"))
creator["user_id"] = creator_id
creator["total_fans"] = int(creator.get("total_fans", 0) or 0)
creator["total_liked"] = int(creator.get("total_liked", 0) or 0)
creator["user_rank"] = int(creator.get("user_rank", 0) or 0)
creator["is_official"] = int(creator.get("is_official", 0) or 0)
async with get_session() as session:
result = await session.execute(select(BilibiliUpInfo).where(BilibiliUpInfo.user_id == creator_id))
creator_detail = result.scalar_one_or_none()
if not creator_detail:
creator["add_ts"] = utils.get_current_timestamp()
creator["last_modify_ts"] = utils.get_current_timestamp()
new_creator = BilibiliUpInfo(**creator)
session.add(new_creator)
else:
creator["last_modify_ts"] = utils.get_current_timestamp()
for key, value in creator.items():
setattr(creator_detail, key, value)
await session.commit()
@@ -188,8 +212,11 @@ class BiliDbStoreImplement(AbstractStore):
Args:
contact_item: contact item dict
"""
up_id = contact_item.get("up_id")
fan_id = contact_item.get("fan_id")
up_id = int(contact_item.get("up_id"))
fan_id = int(contact_item.get("fan_id"))
contact_item["up_id"] = up_id
contact_item["fan_id"] = fan_id
async with get_session() as session:
result = await session.execute(
select(BilibiliContactInfo).where(BilibiliContactInfo.up_id == up_id, BilibiliContactInfo.fan_id == fan_id)
@@ -198,9 +225,11 @@ class BiliDbStoreImplement(AbstractStore):
if not contact_detail:
contact_item["add_ts"] = utils.get_current_timestamp()
contact_item["last_modify_ts"] = utils.get_current_timestamp()
new_contact = BilibiliContactInfo(**contact_item)
session.add(new_contact)
else:
contact_item["last_modify_ts"] = utils.get_current_timestamp()
for key, value in contact_item.items():
setattr(contact_detail, key, value)
await session.commit()
@@ -211,16 +240,20 @@ class BiliDbStoreImplement(AbstractStore):
Args:
dynamic_item: dynamic item dict
"""
dynamic_id = dynamic_item.get("dynamic_id")
dynamic_id = int(dynamic_item.get("dynamic_id"))
dynamic_item["dynamic_id"] = dynamic_id
async with get_session() as session:
result = await session.execute(select(BilibiliUpDynamic).where(BilibiliUpDynamic.dynamic_id == dynamic_id))
dynamic_detail = result.scalar_one_or_none()
if not dynamic_detail:
dynamic_item["add_ts"] = utils.get_current_timestamp()
dynamic_item["last_modify_ts"] = utils.get_current_timestamp()
new_dynamic = BilibiliUpDynamic(**dynamic_item)
session.add(new_dynamic)
else:
dynamic_item["last_modify_ts"] = utils.get_current_timestamp()
for key, value in dynamic_item.items():
setattr(dynamic_detail, key, value)
await session.commit()

View File

@@ -34,6 +34,7 @@ class DouyinStoreFactory:
STORES = {
"csv": DouyinCsvStoreImplement,
"db": DouyinDbStoreImplement,
"postgres": DouyinDbStoreImplement,
"json": DouyinJsonStoreImplement,
"sqlite": DouyinSqliteStoreImplement,
"mongodb": DouyinMongoStoreImplement,

View File

@@ -97,7 +97,7 @@ class DouyinDbStoreImplement(AbstractStore):
Args:
content_item: content item dict
"""
aweme_id = content_item.get("aweme_id")
aweme_id = int(content_item.get("aweme_id"))
async with get_session() as session:
result = await session.execute(select(DouyinAweme).where(DouyinAweme.aweme_id == aweme_id))
aweme_detail = result.scalar_one_or_none()
@@ -118,7 +118,7 @@ class DouyinDbStoreImplement(AbstractStore):
Args:
comment_item: comment item dict
"""
comment_id = comment_item.get("comment_id")
comment_id = int(comment_item.get("comment_id"))
async with get_session() as session:
result = await session.execute(select(DouyinAwemeComment).where(DouyinAwemeComment.comment_id == comment_id))
comment_detail = result.scalar_one_or_none()

View File

@@ -34,6 +34,7 @@ class KuaishouStoreFactory:
STORES = {
"csv": KuaishouCsvStoreImplement,
"db": KuaishouDbStoreImplement,
"postgres": KuaishouDbStoreImplement,
"json": KuaishouJsonStoreImplement,
"sqlite": KuaishouSqliteStoreImplement,
"mongodb": KuaishouMongoStoreImplement,

View File

@@ -109,7 +109,8 @@ class KuaishouDbStoreImplement(AbstractStore):
session.add(new_content)
else:
for key, value in content_item.items():
setattr(video_detail, key, value)
if hasattr(video_detail, key):
setattr(video_detail, key, value)
await session.commit()
async def store_comment(self, comment_item: Dict):
@@ -130,7 +131,8 @@ class KuaishouDbStoreImplement(AbstractStore):
session.add(new_comment)
else:
for key, value in comment_item.items():
setattr(comment_detail, key, value)
if hasattr(comment_detail, key):
setattr(comment_detail, key, value)
await session.commit()

View File

@@ -31,6 +31,7 @@ class TieBaStoreFactory:
STORES = {
"csv": TieBaCsvStoreImplement,
"db": TieBaDbStoreImplement,
"postgres": TieBaDbStoreImplement,
"json": TieBaJsonStoreImplement,
"sqlite": TieBaSqliteStoreImplement,
"mongodb": TieBaMongoStoreImplement,

View File

@@ -35,6 +35,7 @@ class WeibostoreFactory:
STORES = {
"csv": WeiboCsvStoreImplement,
"db": WeiboDbStoreImplement,
"postgres": WeiboDbStoreImplement,
"json": WeiboJsonStoreImplement,
"sqlite": WeiboSqliteStoreImplement,
"mongodb": WeiboMongoStoreImplement,

View File

@@ -108,7 +108,8 @@ class WeiboDbStoreImplement(AbstractStore):
Returns:
"""
note_id = content_item.get("note_id")
note_id = int(content_item.get("note_id"))
content_item["note_id"] = note_id
async with get_session() as session:
stmt = select(WeiboNote).where(WeiboNote.note_id == note_id)
res = await session.execute(stmt)
@@ -134,7 +135,14 @@ class WeiboDbStoreImplement(AbstractStore):
Returns:
"""
comment_id = comment_item.get("comment_id")
comment_id = int(comment_item.get("comment_id"))
comment_item["comment_id"] = comment_id
comment_item["note_id"] = int(comment_item.get("note_id", 0) or 0)
comment_item["create_time"] = int(comment_item.get("create_time", 0) or 0)
comment_item["comment_like_count"] = str(comment_item.get("comment_like_count", "0"))
comment_item["sub_comment_count"] = str(comment_item.get("sub_comment_count", "0"))
comment_item["parent_comment_id"] = str(comment_item.get("parent_comment_id", "0"))
async with get_session() as session:
stmt = select(WeiboNoteComment).where(WeiboNoteComment.comment_id == comment_id)
res = await session.execute(stmt)
@@ -160,7 +168,8 @@ class WeiboDbStoreImplement(AbstractStore):
Returns:
"""
user_id = creator.get("user_id")
user_id = int(creator.get("user_id"))
creator["user_id"] = user_id
async with get_session() as session:
stmt = select(WeiboCreator).where(WeiboCreator.user_id == user_id)
res = await session.execute(stmt)

View File

@@ -34,6 +34,7 @@ class XhsStoreFactory:
STORES = {
"csv": XhsCsvStoreImplement,
"db": XhsDbStoreImplement,
"postgres": XhsDbStoreImplement,
"json": XhsJsonStoreImplement,
"sqlite": XhsSqliteStoreImplement,
"mongodb": XhsMongoStoreImplement,

View File

@@ -189,9 +189,9 @@ class XhsDbStoreImplement(AbstractStore):
create_time=comment_item.get("create_time"),
note_id=comment_item.get("note_id"),
content=comment_item.get("content"),
sub_comment_count=comment_item.get("sub_comment_count"),
sub_comment_count=int(comment_item.get("sub_comment_count", 0) or 0),
pictures=json.dumps(comment_item.get("pictures")),
parent_comment_id=comment_item.get("parent_comment_id"),
parent_comment_id=str(comment_item.get("parent_comment_id", "")),
like_count=str(comment_item.get("like_count"))
)
session.add(comment)
@@ -202,7 +202,7 @@ class XhsDbStoreImplement(AbstractStore):
update_data = {
"last_modify_ts": last_modify_ts,
"like_count": str(comment_item.get("like_count")),
"sub_comment_count": comment_item.get("sub_comment_count"),
"sub_comment_count": int(comment_item.get("sub_comment_count", 0) or 0),
}
stmt = update(XhsNoteComment).where(XhsNoteComment.comment_id == comment_id).values(**update_data)
await session.execute(stmt)

View File

@@ -38,6 +38,7 @@ class ZhihuStoreFactory:
STORES = {
"csv": ZhihuCsvStoreImplement,
"db": ZhihuDbStoreImplement,
"postgres": ZhihuDbStoreImplement,
"json": ZhihuJsonStoreImplement,
"sqlite": ZhihuSqliteStoreImplement,
"mongodb": ZhihuMongoStoreImplement,

View File

@@ -110,7 +110,8 @@ class ZhihuDbStoreImplement(AbstractStore):
existing_content = result.scalars().first()
if existing_content:
for key, value in content_item.items():
setattr(existing_content, key, value)
if hasattr(existing_content, key):
setattr(existing_content, key, value)
else:
new_content = ZhihuContent(**content_item)
session.add(new_content)
@@ -129,7 +130,8 @@ class ZhihuDbStoreImplement(AbstractStore):
existing_comment = result.scalars().first()
if existing_comment:
for key, value in comment_item.items():
setattr(existing_comment, key, value)
if hasattr(existing_comment, key):
setattr(existing_comment, key, value)
else:
new_comment = ZhihuComment(**comment_item)
session.add(new_comment)
@@ -148,7 +150,8 @@ class ZhihuDbStoreImplement(AbstractStore):
existing_creator = result.scalars().first()
if existing_creator:
for key, value in creator.items():
setattr(existing_creator, key, value)
if hasattr(existing_creator, key):
setattr(existing_creator, key, value)
else:
new_creator = ZhihuCreator(**creator)
session.add(new_creator)