feat: db数据存储重构完成

This commit is contained in:
Relakkes
2024-04-06 22:11:10 +08:00
parent de4a437dd7
commit 0c8484c334
14 changed files with 813 additions and 147 deletions

View File

@@ -82,20 +82,17 @@ class BiliDbStoreImplement(AbstractStore):
Returns:
"""
from .bilibili_store_db_types import BilibiliVideo
from .bilibili_store_sql import (add_new_content,
query_content_by_content_id,
update_content_by_content_id)
video_id = content_item.get("video_id")
if not await BilibiliVideo.filter(video_id=video_id).exists():
video_detail: Dict = await query_content_by_content_id(content_id=video_id)
if not video_detail:
content_item["add_ts"] = utils.get_current_timestamp()
bilibili_video_pydantic = pydantic_model_creator(BilibiliVideo, name='BilibiliVideoCreate', exclude=('id',))
bilibili_data = bilibili_video_pydantic(**content_item)
bilibili_video_pydantic.model_validate(bilibili_data)
await BilibiliVideo.create(**bilibili_data.model_dump())
await add_new_content(content_item)
else:
bilibili_video_pydantic = pydantic_model_creator(BilibiliVideo, name='BilibiliVideoUpdate',
exclude=('id', 'add_ts'))
bilibili_data = bilibili_video_pydantic(**content_item)
bilibili_video_pydantic.model_validate(bilibili_data)
await BilibiliVideo.filter(video_id=video_id).update(**bilibili_data.model_dump())
await update_content_by_content_id(video_id, content_item=content_item)
async def store_comment(self, comment_item: Dict):
"""
@@ -106,21 +103,17 @@ class BiliDbStoreImplement(AbstractStore):
Returns:
"""
from .bilibili_store_db_types import BilibiliComment
from .bilibili_store_sql import (add_new_comment,
query_comment_by_comment_id,
update_comment_by_comment_id)
comment_id = comment_item.get("comment_id")
if not await BilibiliComment.filter(comment_id=comment_id).exists():
comment_detail: Dict = await query_comment_by_comment_id(comment_id=comment_id)
if not comment_detail:
comment_item["add_ts"] = utils.get_current_timestamp()
comment_pydantic = pydantic_model_creator(BilibiliComment, name='BilibiliVideoCommentCreate',
exclude=('id',))
comment_data = comment_pydantic(**comment_item)
comment_pydantic.model_validate(comment_data)
await BilibiliComment.create(**comment_data.model_dump())
await add_new_comment(comment_item)
else:
comment_pydantic = pydantic_model_creator(BilibiliComment, name='BilibiliVideoCommentUpdate',
exclude=('id', 'add_ts'))
comment_data = comment_pydantic(**comment_item)
comment_pydantic.model_validate(comment_data)
await BilibiliComment.filter(comment_id=comment_id).update(**comment_data.model_dump())
await update_comment_by_comment_id(comment_id, comment_item=comment_item)
class BiliJsonStoreImplement(AbstractStore):

View File

@@ -0,0 +1,102 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/4/6 15:30
# @Desc : sql接口集合
from typing import Dict, List
from db import AsyncMysqlDB
from var import media_crawler_db_var
async def query_content_by_content_id(content_id: str) -> Dict:
"""
查询一条内容记录xhs的帖子 抖音的视频 微博 快手视频 ...
Args:
content_id:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
sql: str = f"select * from bilibili_video where video_id = '{content_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
return rows[0]
return dict()
async def add_new_content(content_item: Dict) -> int:
"""
新增一条内容记录xhs的帖子 抖音的视频 微博 快手视频 ...
Args:
content_item:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("bilibili_video", content_item)
return last_row_id
async def update_content_by_content_id(content_id: str, content_item: Dict) -> int:
"""
更新一条记录xhs的帖子 抖音的视频 微博 快手视频 ...
Args:
content_id:
content_item:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("bilibili_video", content_item, "video_id", content_id)
return effect_row
async def query_comment_by_comment_id(comment_id: str) -> Dict:
"""
查询一条评论内容
Args:
comment_id:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
sql: str = f"select * from bilibili_video_comment where comment_id = '{comment_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
return rows[0]
return dict()
async def add_new_comment(comment_item: Dict) -> int:
"""
新增一条评论记录
Args:
comment_item:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("bilibili_video_comment", comment_item)
return last_row_id
async def update_comment_by_comment_id(comment_id: str, comment_item: Dict) -> int:
"""
更新增一条评论记录
Args:
comment_id:
comment_item:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("bilibili_video_comment", comment_item, "comment_id", comment_id)
return effect_row

View File

@@ -82,20 +82,19 @@ class DouyinDbStoreImplement(AbstractStore):
Returns:
"""
from .douyin_store_db_types import DouyinAweme
from .douyin_store_sql import (add_new_content,
query_content_by_content_id,
update_content_by_content_id)
aweme_id = content_item.get("aweme_id")
if not await DouyinAweme.filter(aweme_id=aweme_id).exists():
aweme_detail: Dict = await query_content_by_content_id(content_id=aweme_id)
if not aweme_detail:
content_item["add_ts"] = utils.get_current_timestamp()
douyin_aweme_pydantic = pydantic_model_creator(DouyinAweme, name='DouyinAwemeCreate', exclude=('id',))
douyin_data = douyin_aweme_pydantic(**content_item)
douyin_aweme_pydantic.model_validate(douyin_data)
await DouyinAweme.create(**douyin_data.dict())
if aweme_detail.get("title"):
await add_new_content(content_item)
else:
douyin_aweme_pydantic = pydantic_model_creator(DouyinAweme, name='DouyinAwemeUpdate',
exclude=('id', 'add_ts'))
douyin_data = douyin_aweme_pydantic(**content_item)
douyin_aweme_pydantic.model_validate(douyin_data)
await DouyinAweme.filter(aweme_id=aweme_id).update(**douyin_data.model_dump())
await update_content_by_content_id(aweme_id, content_item=content_item)
async def store_comment(self, comment_item: Dict):
"""
@@ -106,21 +105,16 @@ class DouyinDbStoreImplement(AbstractStore):
Returns:
"""
from .douyin_store_db_types import DouyinAwemeComment
from .douyin_store_sql import (add_new_comment,
query_comment_by_comment_id,
update_comment_by_comment_id)
comment_id = comment_item.get("comment_id")
if not await DouyinAwemeComment.filter(comment_id=comment_id).exists():
comment_detail: Dict = await query_comment_by_comment_id(comment_id=comment_id)
if not comment_detail:
comment_item["add_ts"] = utils.get_current_timestamp()
comment_pydantic = pydantic_model_creator(DouyinAwemeComment, name='DouyinAwemeCommentCreate',
exclude=('id',))
comment_data = comment_pydantic(**comment_item)
comment_pydantic.model_validate(comment_data)
await DouyinAwemeComment.create(**comment_data.model_dump())
await add_new_comment(comment_item)
else:
comment_pydantic = pydantic_model_creator(DouyinAwemeComment, name='DouyinAwemeCommentUpdate',
exclude=('id', 'add_ts'))
comment_data = comment_pydantic(**comment_item)
comment_pydantic.model_validate(comment_data)
await DouyinAwemeComment.filter(comment_id=comment_id).update(**comment_data.model_dump())
await update_comment_by_comment_id(comment_id, comment_item=comment_item)
class DouyinJsonStoreImplement(AbstractStore):

View File

@@ -0,0 +1,102 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/4/6 15:30
# @Desc : sql接口集合
from typing import Dict, List
from db import AsyncMysqlDB
from var import media_crawler_db_var
async def query_content_by_content_id(content_id: str) -> Dict:
"""
查询一条内容记录xhs的帖子 抖音的视频 微博 快手视频 ...
Args:
content_id:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
sql: str = f"select * from douyin_aweme where aweme_id = '{content_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
return rows[0]
return dict()
async def add_new_content(content_item: Dict) -> int:
"""
新增一条内容记录xhs的帖子 抖音的视频 微博 快手视频 ...
Args:
content_item:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("douyin_aweme", content_item)
return last_row_id
async def update_content_by_content_id(content_id: str, content_item: Dict) -> int:
"""
更新一条记录xhs的帖子 抖音的视频 微博 快手视频 ...
Args:
content_id:
content_item:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("douyin_aweme", content_item, "aweme_id", content_id)
return effect_row
async def query_comment_by_comment_id(comment_id: str) -> Dict:
"""
查询一条评论内容
Args:
comment_id:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
sql: str = f"select * from douyin_aweme_comment where comment_id = '{comment_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
return rows[0]
return dict()
async def add_new_comment(comment_item: Dict) -> int:
"""
新增一条评论记录
Args:
comment_item:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("douyin_aweme_comment", comment_item)
return last_row_id
async def update_comment_by_comment_id(comment_id: str, comment_item: Dict) -> int:
"""
更新增一条评论记录
Args:
comment_id:
comment_item:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("douyin_aweme_comment", comment_item, "comment_id", comment_id)
return effect_row

View File

@@ -82,20 +82,18 @@ class KuaishouDbStoreImplement(AbstractStore):
Returns:
"""
from .kuaishou_store_db_types import KuaishouVideo
from .kuaishou_store_sql import (add_new_content,
query_content_by_content_id,
update_content_by_content_id)
video_id = content_item.get("video_id")
if not await KuaishouVideo.filter(video_id=video_id).exists():
video_detail: Dict = await query_content_by_content_id(content_id=video_id)
if not video_detail:
content_item["add_ts"] = utils.get_current_timestamp()
kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='kuaishouVideoCreate', exclude=('id',))
kuaishou_data = kuaishou_video_pydantic(**content_item)
kuaishou_video_pydantic.model_validate(kuaishou_data)
await KuaishouVideo.create(**kuaishou_data.model_dump())
await add_new_content(content_item)
else:
kuaishou_video_pydantic = pydantic_model_creator(KuaishouVideo, name='kuaishouVideoUpdate',
exclude=('id', 'add_ts'))
kuaishou_data = kuaishou_video_pydantic(**content_item)
kuaishou_video_pydantic.model_validate(kuaishou_data)
await KuaishouVideo.filter(video_id=video_id).update(**kuaishou_data.model_dump())
await update_content_by_content_id(video_id, content_item=content_item)
async def store_comment(self, comment_item: Dict):
"""
@@ -106,21 +104,17 @@ class KuaishouDbStoreImplement(AbstractStore):
Returns:
"""
from .kuaishou_store_db_types import KuaishouVideoComment
from .kuaishou_store_sql import (add_new_comment,
query_comment_by_comment_id,
update_comment_by_comment_id)
comment_id = comment_item.get("comment_id")
if not await KuaishouVideoComment.filter(comment_id=comment_id).exists():
comment_detail: Dict = await query_comment_by_comment_id(comment_id=comment_id)
if not comment_detail:
comment_item["add_ts"] = utils.get_current_timestamp()
comment_pydantic = pydantic_model_creator(KuaishouVideoComment, name='KuaishouVideoCommentCreate',
exclude=('id',))
comment_data = comment_pydantic(**comment_item)
comment_pydantic.model_validate(comment_data)
await KuaishouVideoComment.create(**comment_data.model_dump())
await add_new_comment(comment_item)
else:
comment_pydantic = pydantic_model_creator(KuaishouVideoComment, name='KuaishouVideoCommentUpdate',
exclude=('id', 'add_ts'))
comment_data = comment_pydantic(**comment_item)
comment_pydantic.model_validate(comment_data)
await KuaishouVideoComment.filter(comment_id=comment_id).update(**comment_data.model_dump())
await update_comment_by_comment_id(comment_id, comment_item=comment_item)
class KuaishouJsonStoreImplement(AbstractStore):

View File

@@ -0,0 +1,102 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/4/6 15:30
# @Desc : sql接口集合
from typing import Dict, List
from db import AsyncMysqlDB
from var import media_crawler_db_var
async def query_content_by_content_id(content_id: str) -> Dict:
"""
查询一条内容记录xhs的帖子 抖音的视频 微博 快手视频 ...
Args:
content_id:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
sql: str = f"select * from kuaishou_video where video_id = '{content_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
return rows[0]
return dict()
async def add_new_content(content_item: Dict) -> int:
"""
新增一条内容记录xhs的帖子 抖音的视频 微博 快手视频 ...
Args:
content_item:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("kuaishou_video", content_item)
return last_row_id
async def update_content_by_content_id(content_id: str, content_item: Dict) -> int:
"""
更新一条记录xhs的帖子 抖音的视频 微博 快手视频 ...
Args:
content_id:
content_item:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("kuaishou_video", content_item, "video_id", content_id)
return effect_row
async def query_comment_by_comment_id(comment_id: str) -> Dict:
"""
查询一条评论内容
Args:
comment_id:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
sql: str = f"select * from kuaishou_video_comment where comment_id = '{comment_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
return rows[0]
return dict()
async def add_new_comment(comment_item: Dict) -> int:
"""
新增一条评论记录
Args:
comment_item:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("kuaishou_video_comment", comment_item)
return last_row_id
async def update_comment_by_comment_id(comment_id: str, comment_item: Dict) -> int:
"""
更新增一条评论记录
Args:
comment_id:
comment_item:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("kuaishou_video_comment", comment_item, "comment_id", comment_id)
return effect_row

View File

@@ -82,20 +82,17 @@ class WeiboDbStoreImplement(AbstractStore):
Returns:
"""
from .weibo_store_db_types import WeiboNote
from .weibo_store_sql import (add_new_content,
query_content_by_content_id,
update_content_by_content_id)
note_id = content_item.get("note_id")
if not await WeiboNote.filter(note_id=note_id).exists():
note_detail: Dict = await query_content_by_content_id(content_id=note_id)
if not note_detail:
content_item["add_ts"] = utils.get_current_timestamp()
weibo_note_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteCreate', exclude=('id',))
weibo_data = weibo_note_pydantic(**content_item)
weibo_note_pydantic.model_validate(weibo_data)
await WeiboNote.create(**weibo_data.model_dump())
await add_new_content(content_item)
else:
weibo_note_pydantic = pydantic_model_creator(WeiboNote, name='WeiboNoteUpdate',
exclude=('id', 'add_ts'))
weibo_data = weibo_note_pydantic(**content_item)
weibo_note_pydantic.model_validate(weibo_data)
await WeiboNote.filter(note_id=note_id).update(**weibo_data.model_dump())
await update_content_by_content_id(note_id, content_item=content_item)
async def store_comment(self, comment_item: Dict):
"""
@@ -106,21 +103,16 @@ class WeiboDbStoreImplement(AbstractStore):
Returns:
"""
from .weibo_store_db_types import WeiboComment
from .weibo_store_sql import (add_new_comment,
query_comment_by_comment_id,
update_comment_by_comment_id)
comment_id = comment_item.get("comment_id")
if not await WeiboComment.filter(comment_id=comment_id).exists():
comment_detail: Dict = await query_comment_by_comment_id(comment_id=comment_id)
if not comment_detail:
comment_item["add_ts"] = utils.get_current_timestamp()
comment_pydantic = pydantic_model_creator(WeiboComment, name='WeiboNoteCommentCreate',
exclude=('id',))
comment_data = comment_pydantic(**comment_item)
comment_pydantic.model_validate(comment_data)
await WeiboComment.create(**comment_data.model_dump())
await add_new_comment(comment_item)
else:
comment_pydantic = pydantic_model_creator(WeiboComment, name='WeiboNoteCommentUpdate',
exclude=('id', 'add_ts'))
comment_data = comment_pydantic(**comment_item)
comment_pydantic.model_validate(comment_data)
await WeiboComment.filter(comment_id=comment_id).update(**comment_data.model_dump())
await update_comment_by_comment_id(comment_id, comment_item=comment_item)
class WeiboJsonStoreImplement(AbstractStore):
@@ -161,7 +153,6 @@ class WeiboJsonStoreImplement(AbstractStore):
async with aiofiles.open(save_file_name, 'w', encoding='utf-8') as file:
await file.write(json.dumps(save_data, ensure_ascii=False))
async def store_content(self, content_item: Dict):
"""
content JSON storage implementation

View File

@@ -0,0 +1,102 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/4/6 15:30
# @Desc : sql接口集合
from typing import Dict, List
from db import AsyncMysqlDB
from var import media_crawler_db_var
async def query_content_by_content_id(content_id: str) -> Dict:
"""
查询一条内容记录xhs的帖子 抖音的视频 微博 快手视频 ...
Args:
content_id:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
sql: str = f"select * from weibo_note where note_id = '{content_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
return rows[0]
return dict()
async def add_new_content(content_item: Dict) -> int:
"""
新增一条内容记录xhs的帖子 抖音的视频 微博 快手视频 ...
Args:
content_item:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("weibo_note", content_item)
return last_row_id
async def update_content_by_content_id(content_id: str, content_item: Dict) -> int:
"""
更新一条记录xhs的帖子 抖音的视频 微博 快手视频 ...
Args:
content_id:
content_item:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("weibo_note", content_item, "note_id", content_id)
return effect_row
async def query_comment_by_comment_id(comment_id: str) -> Dict:
"""
查询一条评论内容
Args:
comment_id:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
sql: str = f"select * from weibo_note_comment where comment_id = '{comment_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
return rows[0]
return dict()
async def add_new_comment(comment_item: Dict) -> int:
"""
新增一条评论记录
Args:
comment_item:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("weibo_note_comment", comment_item)
return last_row_id
async def update_comment_by_comment_id(comment_id: str, comment_item: Dict) -> int:
"""
更新增一条评论记录
Args:
comment_id:
comment_item:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("weibo_note_comment", comment_item, "comment_id", comment_id)
return effect_row

View File

@@ -118,6 +118,7 @@ async def save_creator(user_id: str, creator: Dict):
'fans': fans,
'interaction': interaction,
'tag_list': json.dumps({tag.get('tagType'): tag.get('name') for tag in creator.get('tags')}, ensure_ascii=False),
"last_modify_ts": utils.get_current_timestamp(),
}
utils.logger.info(f"[store.xhs.save_creator] creator:{local_db_item}")
await XhsStoreFactory.create_store().store_creator(local_db_item)

View File

@@ -13,8 +13,9 @@ import aiofiles
from tortoise.contrib.pydantic import pydantic_model_creator
from base.base_crawler import AbstractStore
from db import AsyncMysqlDB
from tools import utils
from var import crawler_type_var
from var import crawler_type_var, media_crawler_db_var
class XhsCsvStoreImplement(AbstractStore):
@@ -94,19 +95,16 @@ class XhsDbStoreImplement(AbstractStore):
Returns:
"""
from .xhs_store_db_types import XHSNote
from .xhs_store_sql import (add_new_content,
query_content_by_content_id,
update_content_by_content_id)
note_id = content_item.get("note_id")
if not await XHSNote.filter(note_id=note_id).first():
note_detail: Dict = await query_content_by_content_id(content_id=note_id)
if not note_detail:
content_item["add_ts"] = utils.get_current_timestamp()
note_pydantic = pydantic_model_creator(XHSNote, name="XHSPydanticCreate", exclude=('id',))
note_data = note_pydantic(**content_item)
note_pydantic.model_validate(note_data)
await XHSNote.create(**note_data.model_dump())
await add_new_content(content_item)
else:
note_pydantic = pydantic_model_creator(XHSNote, name="XHSPydanticUpdate", exclude=('id', 'add_ts'))
note_data = note_pydantic(**content_item)
note_pydantic.model_validate(note_data)
await XHSNote.filter(note_id=note_id).update(**note_data.model_dump())
await update_content_by_content_id(note_id, content_item=content_item)
async def store_comment(self, comment_item: Dict):
"""
@@ -117,20 +115,16 @@ class XhsDbStoreImplement(AbstractStore):
Returns:
"""
from .xhs_store_db_types import XHSNoteComment
from .xhs_store_sql import (add_new_comment,
query_comment_by_comment_id,
update_comment_by_comment_id)
comment_id = comment_item.get("comment_id")
if not await XHSNoteComment.filter(comment_id=comment_id).first():
comment_detail: Dict = await query_comment_by_comment_id(comment_id=comment_id)
if not comment_detail:
comment_item["add_ts"] = utils.get_current_timestamp()
comment_pydantic = pydantic_model_creator(XHSNoteComment, name="CommentPydanticCreate", exclude=('id',))
comment_data = comment_pydantic(**comment_item)
comment_pydantic.model_validate(comment_data)
await XHSNoteComment.create(**comment_data.model_dump())
await add_new_comment(comment_item)
else:
comment_pydantic = pydantic_model_creator(XHSNoteComment, name="CommentPydanticUpdate",
exclude=('id', 'add_ts',))
comment_data = comment_pydantic(**comment_item)
comment_pydantic.model_validate(comment_data)
await XHSNoteComment.filter(comment_id=comment_id).update(**comment_data.model_dump())
await update_comment_by_comment_id(comment_id, comment_item=comment_item)
async def store_creator(self, creator: Dict):
"""
@@ -141,21 +135,15 @@ class XhsDbStoreImplement(AbstractStore):
Returns:
"""
from .xhs_store_db_types import XhsCreator
from .xhs_store_sql import (add_new_creator, query_creator_by_user_id,
update_creator_by_user_id)
user_id = creator.get("user_id")
if not await XhsCreator.filter(user_id=user_id).first():
user_detail: Dict = await query_creator_by_user_id(user_id)
if not user_detail:
creator["add_ts"] = utils.get_current_timestamp()
creator["last_modify_ts"] = creator["add_ts"]
creator_pydantic = pydantic_model_creator(XhsCreator, name="CreatorPydanticCreate", exclude=('id',))
creator_data = creator_pydantic(**creator)
creator_pydantic.model_validate(creator_data)
await XhsCreator.create(**creator_data.model_dump())
await add_new_creator(creator)
else:
creator["last_modify_ts"] = utils.get_current_timestamp()
creator_pydantic = pydantic_model_creator(XhsCreator, name="CreatorPydanticUpdate", exclude=('id', 'add_ts',))
creator_data = creator_pydantic(**creator)
creator_pydantic.model_validate(creator_data)
await XhsCreator.filter(user_id=user_id).update(**creator_data.model_dump())
await update_creator_by_user_id(user_id, creator)
class XhsJsonStoreImplement(AbstractStore):
@@ -217,7 +205,7 @@ class XhsJsonStoreImplement(AbstractStore):
"""
await self.save_data_to_json(comment_item, "comments")
async def store_creator(self, creator: Dict):
"""
Xiaohongshu content JSON storage implementation
@@ -227,4 +215,4 @@ class XhsJsonStoreImplement(AbstractStore):
Returns:
"""
await self.save_data_to_json(creator, "creator")
await self.save_data_to_json(creator, "creator")

148
store/xhs/xhs_store_sql.py Normal file
View File

@@ -0,0 +1,148 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/4/6 15:30
# @Desc : sql接口集合
from typing import Dict, List
from db import AsyncMysqlDB
from var import media_crawler_db_var
async def query_content_by_content_id(content_id: str) -> Dict:
"""
查询一条内容记录xhs的帖子 抖音的视频 微博 快手视频 ...
Args:
content_id:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
sql: str = f"select * from xhs_note where note_id = '{content_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
return rows[0]
return dict()
async def add_new_content(content_item: Dict) -> int:
"""
新增一条内容记录xhs的帖子 抖音的视频 微博 快手视频 ...
Args:
content_item:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("xhs_note", content_item)
return last_row_id
async def update_content_by_content_id(content_id: str, content_item: Dict) -> int:
"""
更新一条记录xhs的帖子 抖音的视频 微博 快手视频 ...
Args:
content_id:
content_item:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("xhs_note", content_item, "note_id", content_id)
return effect_row
async def query_comment_by_comment_id(comment_id: str) -> Dict:
"""
查询一条评论内容
Args:
comment_id:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
sql: str = f"select * from xhs_note_comment where comment_id = '{comment_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
return rows[0]
return dict()
async def add_new_comment(comment_item: Dict) -> int:
"""
新增一条评论记录
Args:
comment_item:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("xhs_note_comment", comment_item)
return last_row_id
async def update_comment_by_comment_id(comment_id: str, comment_item: Dict) -> int:
"""
更新增一条评论记录
Args:
comment_id:
comment_item:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("xhs_note_comment", comment_item, "comment_id", comment_id)
return effect_row
async def query_creator_by_user_id(user_id: str) -> Dict:
"""
查询一条创作者记录
Args:
user_id:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
sql: str = f"select * from xhs_creator where user_id = '{user_id}'"
rows: List[Dict] = await async_db_conn.query(sql)
if len(rows) > 0:
return rows[0]
return dict()
async def add_new_creator(creator_item: Dict) -> int:
"""
新增一条创作者信息
Args:
creator_item:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
last_row_id: int = await async_db_conn.item_to_table("xhs_creator", creator_item)
return last_row_id
async def update_creator_by_user_id(user_id: str, creator_item: Dict) -> int:
"""
更新一条创作者信息
Args:
user_id:
creator_item:
Returns:
"""
async_db_conn: AsyncMysqlDB = media_crawler_db_var.get()
effect_row: int = await async_db_conn.update_table("xhs_creator", creator_item, "user_id", user_id)
return effect_row