From b61ec54a72db035d48c6895b899fb4c54a660a8f Mon Sep 17 00:00:00 2001 From: yt210 <1973705434@qq.com> Date: Fri, 7 Nov 2025 17:42:28 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96mongodb=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E8=8E=B7=E5=8F=96=E9=80=BB=E8=BE=91=EF=BC=8C=E7=A7=BB=E5=8A=A8?= =?UTF-8?q?=E5=AD=98=E5=82=A8=E5=9F=BA=E7=B1=BB=E4=BD=8D=E7=BD=AE=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- {store => database}/mongodb_store_base.py | 111 +++++++--------------- store/bilibili/_store_impl.py | 2 +- store/douyin/_store_impl.py | 2 +- store/kuaishou/_store_impl.py | 2 +- store/tieba/_store_impl.py | 2 +- store/weibo/_store_impl.py | 2 +- store/xhs/_store_impl.py | 2 +- store/zhihu/_store_impl.py | 2 +- 8 files changed, 40 insertions(+), 85 deletions(-) rename {store => database}/mongodb_store_base.py (50%) diff --git a/store/mongodb_store_base.py b/database/mongodb_store_base.py similarity index 50% rename from store/mongodb_store_base.py rename to database/mongodb_store_base.py index a3c0654..8c1a556 100644 --- a/store/mongodb_store_base.py +++ b/database/mongodb_store_base.py @@ -1,8 +1,5 @@ # -*- coding: utf-8 -*- -""" -MongoDB存储基类 -提供MongoDB连接管理和通用存储方法 -""" +"""MongoDB存储基类:提供连接管理和通用存储方法""" import asyncio from typing import Dict, List, Optional from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase, AsyncIOMotorCollection @@ -11,7 +8,7 @@ from tools import utils class MongoDBConnection: - """MongoDB连接管理单例类""" + """MongoDB连接管理(单例模式)""" _instance = None _client: Optional[AsyncIOMotorClient] = None _db: Optional[AsyncIOMotorDatabase] = None @@ -23,7 +20,7 @@ class MongoDBConnection: return cls._instance async def get_client(self) -> AsyncIOMotorClient: - """获取MongoDB客户端""" + """获取客户端""" if self._client is None: async with self._lock: if self._client is None: @@ -31,7 +28,7 @@ class MongoDBConnection: return self._client async def get_db(self) -> AsyncIOMotorDatabase: - """获取MongoDB数据库""" + """获取数据库""" if self._db is None: async with self._lock: if self._db is None: @@ -39,135 +36,93 @@ class MongoDBConnection: return self._db async def _connect(self): - """建立MongoDB连接""" + """建立连接""" try: mongo_config = db_config.mongodb_config - host = mongo_config.get("host", "localhost") - port = mongo_config.get("port", 27017) - user = mongo_config.get("user", "") - password = mongo_config.get("password", "") - db_name = mongo_config.get("db_name", "media_crawler") + host = mongo_config["host"] + port = mongo_config["port"] + user = mongo_config["user"] + password = mongo_config["password"] + db_name = mongo_config["db_name"] - # 构建连接URL + # 构建连接URL(有认证/无认证) if user and password: connection_url = f"mongodb://{user}:{password}@{host}:{port}/" else: connection_url = f"mongodb://{host}:{port}/" self._client = AsyncIOMotorClient(connection_url, serverSelectionTimeoutMS=5000) - # 测试连接 - await self._client.server_info() + await self._client.server_info() # 测试连接 self._db = self._client[db_name] - utils.logger.info(f"[MongoDBConnection] Successfully connected to MongoDB at {host}:{port}, database: {db_name}") + utils.logger.info(f"[MongoDBConnection] Connected to {host}:{port}/{db_name}") except Exception as e: - utils.logger.error(f"[MongoDBConnection] Failed to connect to MongoDB: {e}") + utils.logger.error(f"[MongoDBConnection] Connection failed: {e}") raise async def close(self): - """关闭MongoDB连接""" + """关闭连接""" if self._client is not None: self._client.close() self._client = None self._db = None - utils.logger.info("[MongoDBConnection] MongoDB connection closed") + utils.logger.info("[MongoDBConnection] Connection closed") class MongoDBStoreBase: - """MongoDB存储基类""" + """MongoDB存储基类:提供通用的CRUD操作""" def __init__(self, collection_prefix: str): - """ - 初始化MongoDB存储基类 + """初始化存储基类 Args: - collection_prefix: 集合名称前缀(如:xhs, douyin, bilibili等) + collection_prefix: 平台前缀(xhs/douyin/bilibili等) """ self.collection_prefix = collection_prefix self._connection = MongoDBConnection() async def get_collection(self, collection_suffix: str) -> AsyncIOMotorCollection: - """ - 获取MongoDB集合 - Args: - collection_suffix: 集合名称后缀(如:contents, comments, creators) - Returns: - MongoDB集合对象 - """ + """获取集合:{prefix}_{suffix}""" db = await self._connection.get_db() collection_name = f"{self.collection_prefix}_{collection_suffix}" return db[collection_name] async def save_or_update(self, collection_suffix: str, query: Dict, data: Dict) -> bool: - """ - 保存或更新数据(upsert操作) - Args: - collection_suffix: 集合名称后缀 - query: 查询条件 - data: 要保存的数据 - Returns: - 是否成功 - """ + """保存或更新数据(upsert)""" try: collection = await self.get_collection(collection_suffix) - result = await collection.update_one( - query, - {"$set": data}, - upsert=True - ) + await collection.update_one(query, {"$set": data}, upsert=True) return True except Exception as e: - utils.logger.error(f"[MongoDBStoreBase.save_or_update] Failed to save data to {self.collection_prefix}_{collection_suffix}: {e}") + utils.logger.error(f"[MongoDBStoreBase] Save failed ({self.collection_prefix}_{collection_suffix}): {e}") return False async def find_one(self, collection_suffix: str, query: Dict) -> Optional[Dict]: - """ - 查询单条数据 - Args: - collection_suffix: 集合名称后缀 - query: 查询条件 - Returns: - 查询结果 - """ + """查询单条数据""" try: collection = await self.get_collection(collection_suffix) - result = await collection.find_one(query) - return result + return await collection.find_one(query) except Exception as e: - utils.logger.error(f"[MongoDBStoreBase.find_one] Failed to query from {self.collection_prefix}_{collection_suffix}: {e}") + utils.logger.error(f"[MongoDBStoreBase] Find one failed ({self.collection_prefix}_{collection_suffix}): {e}") return None async def find_many(self, collection_suffix: str, query: Dict, limit: int = 0) -> List[Dict]: - """ - 查询多条数据 - Args: - collection_suffix: 集合名称后缀 - query: 查询条件 - limit: 限制返回数量,0表示不限制 - Returns: - 查询结果列表 - """ + """查询多条数据(limit=0表示不限制)""" try: collection = await self.get_collection(collection_suffix) cursor = collection.find(query) if limit > 0: cursor = cursor.limit(limit) - results = await cursor.to_list(length=None) - return results + return await cursor.to_list(length=None) except Exception as e: - utils.logger.error(f"[MongoDBStoreBase.find_many] Failed to query from {self.collection_prefix}_{collection_suffix}: {e}") + utils.logger.error(f"[MongoDBStoreBase] Find many failed ({self.collection_prefix}_{collection_suffix}): {e}") return [] async def create_index(self, collection_suffix: str, keys: List[tuple], unique: bool = False): - """ - 创建索引 - Args: - collection_suffix: 集合名称后缀 - keys: 索引键列表,例如:[("note_id", 1)] - unique: 是否创建唯一索引 - """ + """创建索引:keys=[("field", 1)]""" try: collection = await self.get_collection(collection_suffix) await collection.create_index(keys, unique=unique) - utils.logger.info(f"[MongoDBStoreBase.create_index] Created index on {self.collection_prefix}_{collection_suffix}") + utils.logger.info(f"[MongoDBStoreBase] Index created on {self.collection_prefix}_{collection_suffix}") except Exception as e: - utils.logger.error(f"[MongoDBStoreBase.create_index] Failed to create index: {e}") + utils.logger.error(f"[MongoDBStoreBase] Create index failed: {e}") + diff --git a/store/bilibili/_store_impl.py b/store/bilibili/_store_impl.py index d7f2ede..9e79cf5 100644 --- a/store/bilibili/_store_impl.py +++ b/store/bilibili/_store_impl.py @@ -31,7 +31,7 @@ from database.models import BilibiliVideoComment, BilibiliVideo, BilibiliUpInfo, from tools.async_file_writer import AsyncFileWriter from tools import utils, words from var import crawler_type_var -from store.mongodb_store_base import MongoDBStoreBase +from database.mongodb_store_base import MongoDBStoreBase class BiliCsvStoreImplement(AbstractStore): diff --git a/store/douyin/_store_impl.py b/store/douyin/_store_impl.py index 1b2eb8c..4063b47 100644 --- a/store/douyin/_store_impl.py +++ b/store/douyin/_store_impl.py @@ -28,7 +28,7 @@ from database.models import DouyinAweme, DouyinAwemeComment, DyCreator from tools import utils, words from tools.async_file_writer import AsyncFileWriter from var import crawler_type_var -from store.mongodb_store_base import MongoDBStoreBase +from database.mongodb_store_base import MongoDBStoreBase class DouyinCsvStoreImplement(AbstractStore): diff --git a/store/kuaishou/_store_impl.py b/store/kuaishou/_store_impl.py index 8cc6202..d77792d 100644 --- a/store/kuaishou/_store_impl.py +++ b/store/kuaishou/_store_impl.py @@ -30,7 +30,7 @@ from database.db_session import get_session from database.models import KuaishouVideo, KuaishouVideoComment from tools import utils, words from var import crawler_type_var -from store.mongodb_store_base import MongoDBStoreBase +from database.mongodb_store_base import MongoDBStoreBase def calculate_number_of_files(file_store_path: str) -> int: diff --git a/store/tieba/_store_impl.py b/store/tieba/_store_impl.py index 2b99750..aa5d8b1 100644 --- a/store/tieba/_store_impl.py +++ b/store/tieba/_store_impl.py @@ -31,7 +31,7 @@ from tools import utils, words from database.db_session import get_session from var import crawler_type_var from tools.async_file_writer import AsyncFileWriter -from store.mongodb_store_base import MongoDBStoreBase +from database.mongodb_store_base import MongoDBStoreBase def calculate_number_of_files(file_store_path: str) -> int: diff --git a/store/weibo/_store_impl.py b/store/weibo/_store_impl.py index c864cf8..729f9d1 100644 --- a/store/weibo/_store_impl.py +++ b/store/weibo/_store_impl.py @@ -31,7 +31,7 @@ from tools import utils, words from tools.async_file_writer import AsyncFileWriter from database.db_session import get_session from var import crawler_type_var -from store.mongodb_store_base import MongoDBStoreBase +from database.mongodb_store_base import MongoDBStoreBase def calculate_number_of_files(file_store_path: str) -> int: diff --git a/store/xhs/_store_impl.py b/store/xhs/_store_impl.py index 023d2c2..decaf8f 100644 --- a/store/xhs/_store_impl.py +++ b/store/xhs/_store_impl.py @@ -18,7 +18,7 @@ from database.models import XhsNote, XhsNoteComment, XhsCreator from tools.async_file_writer import AsyncFileWriter from tools.time_util import get_current_timestamp from var import crawler_type_var -from store.mongodb_store_base import MongoDBStoreBase +from database.mongodb_store_base import MongoDBStoreBase from tools import utils class XhsCsvStoreImplement(AbstractStore): diff --git a/store/zhihu/_store_impl.py b/store/zhihu/_store_impl.py index c374632..b52873d 100644 --- a/store/zhihu/_store_impl.py +++ b/store/zhihu/_store_impl.py @@ -31,7 +31,7 @@ from database.models import ZhihuContent, ZhihuComment, ZhihuCreator from tools import utils, words from var import crawler_type_var from tools.async_file_writer import AsyncFileWriter -from store.mongodb_store_base import MongoDBStoreBase +from database.mongodb_store_base import MongoDBStoreBase def calculate_number_of_files(file_store_path: str) -> int: """计算数据保存文件的前部分排序数字,支持每次运行代码不写到同一个文件中