Files
MediaCrawler/store/xhs/__init__.py
买定不离手 1298022410 refactor: 更新各平台store模块初始化以支持SQLite
- 更新 store/bilibili/__init__.py 文件,导入SQLite存储实现类和相关模块
- 更新 store/douyin/__init__.py 文件,集成抖音平台的SQLite数据存储接口
- 更新 store/kuaishou/__init__.py 文件,添加快手平台SQLite存储模块的导入声明
- 更新 store/tieba/__init__.py 文件,引入贴吧平台SQLite数据库操作模块
- 更新 store/weibo/__init__.py 文件,整合微博平台SQLite存储功能模块
- 更新 store/xhs/__init__.py 文件,导入小红书平台SQLite数据存储实现
- 更新 store/zhihu/__init__.py 文件,集成知乎平台SQLite数据库存储模块
2025-07-14 03:51:08 +08:00

229 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/1/14 17:34
# @Desc :
from typing import List
import config
from var import source_keyword_var
from . import xhs_store_impl
from .xhs_store_image import *
from .xhs_store_impl import *
class XhsStoreFactory:
STORES = {
"csv": XhsCsvStoreImplement,
"db": XhsDbStoreImplement,
"json": XhsJsonStoreImplement,
"sqlite": XhsSqliteStoreImplement
}
@staticmethod
def create_store() -> AbstractStore:
store_class = XhsStoreFactory.STORES.get(config.SAVE_DATA_OPTION)
if not store_class:
raise ValueError("[XhsStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite ...")
return store_class()
def get_video_url_arr(note_item: Dict) -> List:
"""
获取视频url数组
Args:
note_item:
Returns:
"""
if note_item.get('type') != 'video':
return []
videoArr = []
originVideoKey = note_item.get('video').get('consumer').get('origin_video_key')
if originVideoKey == '':
originVideoKey = note_item.get('video').get('consumer').get('originVideoKey')
# 降级有水印
if originVideoKey == '':
videos = note_item.get('video').get('media').get('stream').get('h264')
if type(videos).__name__ == 'list':
videoArr = [v.get('master_url') for v in videos]
else:
videoArr = [f"http://sns-video-bd.xhscdn.com/{originVideoKey}"]
return videoArr
async def update_xhs_note(note_item: Dict):
"""
更新小红书笔记
Args:
note_item:
Returns:
"""
note_id = note_item.get("note_id")
user_info = note_item.get("user", {})
interact_info = note_item.get("interact_info", {})
image_list: List[Dict] = note_item.get("image_list", [])
tag_list: List[Dict] = note_item.get("tag_list", [])
for img in image_list:
if img.get('url_default') != '':
img.update({'url': img.get('url_default')})
video_url = ','.join(get_video_url_arr(note_item))
local_db_item = {
"note_id": note_item.get("note_id"), # 帖子id
"type": note_item.get("type"), # 帖子类型
"title": note_item.get("title") or note_item.get("desc", "")[:255], # 帖子标题
"desc": note_item.get("desc", ""), # 帖子描述
"video_url": video_url, # 帖子视频url
"time": note_item.get("time"), # 帖子发布时间
"last_update_time": note_item.get("last_update_time", 0), # 帖子最后更新时间
"user_id": user_info.get("user_id"), # 用户id
"nickname": user_info.get("nickname"), # 用户昵称
"avatar": user_info.get("avatar"), # 用户头像
"liked_count": interact_info.get("liked_count"), # 点赞数
"collected_count": interact_info.get("collected_count"), # 收藏数
"comment_count": interact_info.get("comment_count"), # 评论数
"share_count": interact_info.get("share_count"), # 分享数
"ip_location": note_item.get("ip_location", ""), # ip地址
"image_list": ','.join([img.get('url', '') for img in image_list]), # 图片url
"tag_list": ','.join([tag.get('name', '') for tag in tag_list if tag.get('type') == 'topic']), # 标签
"last_modify_ts": utils.get_current_timestamp(), # 最后更新时间戳MediaCrawler程序生成的主要用途在db存储的时候记录一条记录最新更新时间
"note_url": f"https://www.xiaohongshu.com/explore/{note_id}?xsec_token={note_item.get('xsec_token')}&xsec_source=pc_search", # 帖子url
"source_keyword": source_keyword_var.get(), # 搜索关键词
"xsec_token": note_item.get("xsec_token"), # xsec_token
}
utils.logger.info(f"[store.xhs.update_xhs_note] xhs note: {local_db_item}")
await XhsStoreFactory.create_store().store_content(local_db_item)
async def batch_update_xhs_note_comments(note_id: str, comments: List[Dict]):
"""
批量更新小红书笔记评论
Args:
note_id:
comments:
Returns:
"""
if not comments:
return
for comment_item in comments:
await update_xhs_note_comment(note_id, comment_item)
async def update_xhs_note_comment(note_id: str, comment_item: Dict):
"""
更新小红书笔记评论
Args:
note_id:
comment_item:
Returns:
"""
user_info = comment_item.get("user_info", {})
comment_id = comment_item.get("id")
comment_pictures = [item.get("url_default", "") for item in comment_item.get("pictures", [])]
target_comment = comment_item.get("target_comment", {})
local_db_item = {
"comment_id": comment_id, # 评论id
"create_time": comment_item.get("create_time"), # 评论时间
"ip_location": comment_item.get("ip_location"), # ip地址
"note_id": note_id, # 帖子id
"content": comment_item.get("content"), # 评论内容
"user_id": user_info.get("user_id"), # 用户id
"nickname": user_info.get("nickname"), # 用户昵称
"avatar": user_info.get("image"), # 用户头像
"sub_comment_count": comment_item.get("sub_comment_count", 0), # 子评论数
"pictures": ",".join(comment_pictures), # 评论图片
"parent_comment_id": target_comment.get("id", 0), # 父评论id
"last_modify_ts": utils.get_current_timestamp(), # 最后更新时间戳MediaCrawler程序生成的主要用途在db存储的时候记录一条记录最新更新时间
"like_count": comment_item.get("like_count", 0),
}
utils.logger.info(f"[store.xhs.update_xhs_note_comment] xhs note comment:{local_db_item}")
await XhsStoreFactory.create_store().store_comment(local_db_item)
async def save_creator(user_id: str, creator: Dict):
"""
保存小红书创作者
Args:
user_id:
creator:
Returns:
"""
user_info = creator.get('basicInfo', {})
follows = 0
fans = 0
interaction = 0
for i in creator.get('interactions'):
if i.get('type') == 'follows':
follows = i.get('count')
elif i.get('type') == 'fans':
fans = i.get('count')
elif i.get('type') == 'interaction':
interaction = i.get('count')
def get_gender(gender):
if gender == 1:
return ''
elif gender == 0:
return ''
else:
return None
local_db_item = {
'user_id': user_id, # 用户id
'nickname': user_info.get('nickname'), # 昵称
'gender': get_gender(user_info.get('gender')), # 性别
'avatar': user_info.get('images'), # 头像
'desc': user_info.get('desc'), # 个人描述
'ip_location': user_info.get('ipLocation'), # ip地址
'follows': follows, # 关注数
'fans': fans, # 粉丝数
'interaction': interaction, # 互动数
'tag_list': json.dumps({tag.get('tagType'): tag.get('name') for tag in creator.get('tags')},
ensure_ascii=False), # 标签
"last_modify_ts": utils.get_current_timestamp(), # 最后更新时间戳MediaCrawler程序生成的主要用途在db存储的时候记录一条记录最新更新时间
}
utils.logger.info(f"[store.xhs.save_creator] creator:{local_db_item}")
await XhsStoreFactory.create_store().store_creator(local_db_item)
async def update_xhs_note_image(note_id, pic_content, extension_file_name):
"""
更新小红书笔
Args:
note_id:
pic_content:
extension_file_name:
Returns:
"""
await XiaoHongShuImage().store_image(
{"notice_id": note_id, "pic_content": pic_content, "extension_file_name": extension_file_name})