Files
MediaCrawler/test/test_mongodb_integration.py
程序员阿江(Relakkes) 157ddfb21b i18n: translate all Chinese comments, docstrings, and logger messages to English
Comprehensive translation of Chinese text to English across the entire codebase:

- api/: FastAPI server documentation and logger messages
- cache/: Cache abstraction layer comments and docstrings
- database/: Database models and MongoDB store documentation
- media_platform/: All platform crawlers (Bilibili, Douyin, Kuaishou, Tieba, Weibo, Xiaohongshu, Zhihu)
- model/: Data model documentation
- proxy/: Proxy pool and provider documentation
- store/: Data storage layer comments
- tools/: Utility functions and browser automation
- test/: Test file documentation

Preserved: Chinese disclaimer header (lines 10-18) for legal compliance

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-26 23:27:19 +08:00

385 lines
12 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (c) 2025 relakkes@gmail.com
#
# This file is part of MediaCrawler project.
# Repository: https://github.com/NanmiCoder/MediaCrawler/blob/main/test/test_mongodb_integration.py
# GitHub: https://github.com/NanmiCoder
# Licensed under NON-COMMERCIAL LEARNING LICENSE 1.1
#
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
import asyncio
import unittest
import sys
import os
from datetime import datetime
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from database.mongodb_store_base import MongoDBConnection, MongoDBStoreBase
from store.xhs._store_impl import XhsMongoStoreImplement
from store.douyin._store_impl import DouyinMongoStoreImplement
from config import db_config
class TestMongoDBRealConnection(unittest.TestCase):
@classmethod
def setUpClass(cls):
try:
conn = MongoDBConnection()
asyncio.run(conn._connect())
cls.mongodb_available = True
print("\n✓ MongoDB connection successful")
except Exception as e:
cls.mongodb_available = False
print(f"\n✗ MongoDB connection failed: {e}")
def setUp(self):
if not self.mongodb_available:
self.skipTest("MongoDB not available")
MongoDBConnection._instance = None
MongoDBConnection._client = None
MongoDBConnection._db = None
def tearDown(self):
if self.mongodb_available:
conn = MongoDBConnection()
asyncio.run(conn.close())
@classmethod
def tearDownClass(cls):
if cls.mongodb_available:
async def cleanup():
conn = MongoDBConnection()
db = await conn.get_db()
test_collections = [
"test_xhs_contents",
"test_xhs_comments",
"test_xhs_creators",
"test_douyin_contents",
"test_douyin_comments",
"test_douyin_creators"
]
for collection_name in test_collections:
try:
await db[collection_name].drop()
except:
pass
await conn.close()
try:
asyncio.run(cleanup())
print("\n✓ Test data cleanup completed")
except Exception as e:
print(f"\n✗ Error cleaning up test data: {e}")
def test_real_connection(self):
async def test():
conn = MongoDBConnection()
client = await conn.get_client()
db = await conn.get_db()
self.assertIsNotNone(client)
self.assertIsNotNone(db)
result = await db.command("ping")
self.assertEqual(result.get("ok"), 1.0)
asyncio.run(test())
def test_real_save_and_query(self):
async def test():
store = MongoDBStoreBase(collection_prefix="test_xhs")
test_data = {
"note_id": "test_note_001",
"title": "Test Note",
"content": "This is a test content",
"created_at": datetime.now().isoformat()
}
result = await store.save_or_update(
"contents",
{"note_id": "test_note_001"},
test_data
)
self.assertTrue(result)
found = await store.find_one(
"contents",
{"note_id": "test_note_001"}
)
self.assertIsNotNone(found)
self.assertEqual(found["note_id"], "test_note_001")
self.assertEqual(found["title"], "Test Note")
asyncio.run(test())
def test_real_update(self):
async def test():
store = MongoDBStoreBase(collection_prefix="test_xhs")
initial_data = {
"note_id": "test_note_002",
"title": "Initial Title",
"likes": 10
}
await store.save_or_update(
"contents",
{"note_id": "test_note_002"},
initial_data
)
updated_data = {
"note_id": "test_note_002",
"title": "Updated Title",
"likes": 100
}
await store.save_or_update(
"contents",
{"note_id": "test_note_002"},
updated_data
)
found = await store.find_one(
"contents",
{"note_id": "test_note_002"}
)
self.assertEqual(found["title"], "Updated Title")
self.assertEqual(found["likes"], 100)
asyncio.run(test())
def test_real_find_many(self):
async def test():
store = MongoDBStoreBase(collection_prefix="test_xhs")
test_user_id = "test_user_123"
for i in range(5):
data = {
"note_id": f"test_note_{i:03d}",
"user_id": test_user_id,
"title": f"Test Note {i}",
"likes": i * 10
}
await store.save_or_update(
"contents",
{"note_id": data["note_id"]},
data
)
results = await store.find_many(
"contents",
{"user_id": test_user_id}
)
self.assertGreaterEqual(len(results), 5)
limited_results = await store.find_many(
"contents",
{"user_id": test_user_id},
limit=3
)
self.assertEqual(len(limited_results), 3)
asyncio.run(test())
def test_real_create_index(self):
async def test():
store = MongoDBStoreBase(collection_prefix="test_xhs")
await store.create_index(
"contents",
[("note_id", 1)],
unique=True
)
collection = await store.get_collection("contents")
indexes = await collection.index_information()
self.assertIn("note_id_1", indexes)
asyncio.run(test())
def test_xhs_store_implementation(self):
async def test():
store = XhsMongoStoreImplement()
note_data = {
"note_id": "xhs_test_001",
"user_id": "user_001",
"nickname": "Test User",
"title": "Xiaohongshu Test Note",
"desc": "This is a test note",
"type": "normal",
"liked_count": "100",
"collected_count": "50",
"comment_count": "20"
}
await store.store_content(note_data)
comment_data = {
"comment_id": "comment_001",
"note_id": "xhs_test_001",
"user_id": "user_002",
"nickname": "Comment User",
"content": "This is a test comment",
"like_count": "10"
}
await store.store_comment(comment_data)
creator_data = {
"user_id": "user_001",
"nickname": "Test Creator",
"desc": "This is a test creator",
"fans": "1000",
"follows": "100"
}
await store.store_creator(creator_data)
mongo_store = store.mongo_store
note = await mongo_store.find_one("contents", {"note_id": "xhs_test_001"})
self.assertIsNotNone(note)
self.assertEqual(note["title"], "Xiaohongshu Test Note")
comment = await mongo_store.find_one("comments", {"comment_id": "comment_001"})
self.assertIsNotNone(comment)
self.assertEqual(comment["content"], "This is a test comment")
creator = await mongo_store.find_one("creators", {"user_id": "user_001"})
self.assertIsNotNone(creator)
self.assertEqual(creator["nickname"], "Test Creator")
asyncio.run(test())
def test_douyin_store_implementation(self):
async def test():
store = DouyinMongoStoreImplement()
video_data = {
"aweme_id": "dy_test_001",
"user_id": "user_001",
"nickname": "Test User",
"title": "Douyin Test Video",
"desc": "This is a test video",
"liked_count": "1000",
"comment_count": "100"
}
await store.store_content(video_data)
comment_data = {
"comment_id": "dy_comment_001",
"aweme_id": "dy_test_001",
"user_id": "user_002",
"nickname": "Comment User",
"content": "This is a test comment"
}
await store.store_comment(comment_data)
creator_data = {
"user_id": "user_001",
"nickname": "Test Creator",
"desc": "This is a test creator"
}
await store.store_creator(creator_data)
mongo_store = store.mongo_store
video = await mongo_store.find_one("contents", {"aweme_id": "dy_test_001"})
self.assertIsNotNone(video)
self.assertEqual(video["title"], "Douyin Test Video")
comment = await mongo_store.find_one("comments", {"comment_id": "dy_comment_001"})
self.assertIsNotNone(comment)
creator = await mongo_store.find_one("creators", {"user_id": "user_001"})
self.assertIsNotNone(creator)
asyncio.run(test())
def test_concurrent_operations(self):
async def test():
store = MongoDBStoreBase(collection_prefix="test_xhs")
tasks = []
for i in range(10):
data = {
"note_id": f"concurrent_note_{i:03d}",
"title": f"Concurrent Test Note {i}",
"content": f"Content {i}"
}
task = store.save_or_update(
"contents",
{"note_id": data["note_id"]},
data
)
tasks.append(task)
results = await asyncio.gather(*tasks)
self.assertTrue(all(results))
for i in range(10):
found = await store.find_one(
"contents",
{"note_id": f"concurrent_note_{i:03d}"}
)
self.assertIsNotNone(found)
asyncio.run(test())
def run_integration_tests():
loader = unittest.TestLoader()
suite = unittest.TestSuite()
suite.addTests(loader.loadTestsFromTestCase(TestMongoDBRealConnection))
runner = unittest.TextTestRunner(verbosity=2)
result = runner.run(suite)
return result
if __name__ == "__main__":
print("="*70)
print("MongoDB Storage Integration Test")
print("="*70)
print(f"MongoDB Configuration:")
print(f" Host: {db_config.MONGODB_HOST}")
print(f" Port: {db_config.MONGODB_PORT}")
print(f" Database: {db_config.MONGODB_DB_NAME}")
print("="*70)
result = run_integration_tests()
print("\n" + "="*70)
print("Test Statistics:")
print(f"Total tests: {result.testsRun}")
print(f"Passed: {result.testsRun - len(result.failures) - len(result.errors)}")
print(f"Failed: {len(result.failures)}")
print(f"Errors: {len(result.errors)}")
print(f"Skipped: {len(result.skipped)}")
print("="*70)
sys.exit(0 if result.wasSuccessful() else 1)