i18n: translate all Chinese comments, docstrings, and logger messages to English

Comprehensive translation of Chinese text to English across the entire codebase:

- api/: FastAPI server documentation and logger messages
- cache/: Cache abstraction layer comments and docstrings
- database/: Database models and MongoDB store documentation
- media_platform/: All platform crawlers (Bilibili, Douyin, Kuaishou, Tieba, Weibo, Xiaohongshu, Zhihu)
- model/: Data model documentation
- proxy/: Proxy pool and provider documentation
- store/: Data storage layer comments
- tools/: Utility functions and browser automation
- test/: Test file documentation

Preserved: Chinese disclaimer header (lines 10-18) for legal compliance

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
程序员阿江(Relakkes)
2025-12-26 23:27:19 +08:00
parent 1544d13dd5
commit 157ddfb21b
93 changed files with 1971 additions and 1955 deletions

View File

@@ -18,32 +18,32 @@
# @Author : persist-1<persist1@126.com>
# @Time : 2025/9/8 00:02
# @Desc : 用于将orm映射模型database/models.py与两种数据库实际结构进行对比并进行更新操作连接数据库->结构比对->差异报告->交互式同步)
# @Tips : 该脚本需要安装依赖'pymysql==1.1.0'
# @Desc : Used to compare ORM mapping model (database/models.py) with actual database structure and perform update operations (connect database -> structure comparison -> difference report -> interactive synchronization)
# @Tips : This script requires dependency 'pymysql==1.1.0'
import os
import sys
from sqlalchemy import create_engine, inspect as sqlalchemy_inspect
from sqlalchemy.schema import MetaData
# 将项目根目录添加到 sys.path
# Add project root directory to sys.path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config.db_config import mysql_db_config, sqlite_db_config
from database.models import Base
def get_mysql_engine():
"""创建并返回一个MySQL数据库引擎"""
"""Create and return a MySQL database engine"""
conn_str = f"mysql+pymysql://{mysql_db_config['user']}:{mysql_db_config['password']}@{mysql_db_config['host']}:{mysql_db_config['port']}/{mysql_db_config['db_name']}"
return create_engine(conn_str)
def get_sqlite_engine():
"""创建并返回一个SQLite数据库引擎"""
"""Create and return a SQLite database engine"""
conn_str = f"sqlite:///{sqlite_db_config['db_path']}"
return create_engine(conn_str)
def get_db_schema(engine):
"""获取数据库的当前表结构"""
"""Get current table structure of the database"""
inspector = sqlalchemy_inspect(engine)
schema = {}
for table_name in inspector.get_table_names():
@@ -54,7 +54,7 @@ def get_db_schema(engine):
return schema
def get_orm_schema():
"""获取ORM模型的表结构"""
"""Get table structure of ORM model"""
schema = {}
for table_name, table in Base.metadata.tables.items():
columns = {}
@@ -64,7 +64,7 @@ def get_orm_schema():
return schema
def compare_schemas(db_schema, orm_schema):
"""比较数据库结构和ORM模型结构返回差异"""
"""Compare database structure with ORM model structure and return differences"""
db_tables = set(db_schema.keys())
orm_tables = set(orm_schema.keys())
@@ -99,42 +99,42 @@ def compare_schemas(db_schema, orm_schema):
}
def print_diff(db_name, diff):
"""打印差异报告"""
print(f"--- {db_name} 数据库结构差异报告 ---")
"""Print difference report"""
print(f"--- {db_name} Database Structure Difference Report ---")
if not any(diff.values()):
print("数据库结构与ORM模型一致无需同步。")
print("Database structure matches ORM model, no synchronization needed.")
return
if diff.get("added_tables"):
print("\n[+] 新增的表:")
print("\n[+] Added tables:")
for table in diff["added_tables"]:
print(f" - {table}")
if diff.get("deleted_tables"):
print("\n[-] 删除的表:")
print("\n[-] Deleted tables:")
for table in diff["deleted_tables"]:
print(f" - {table}")
if diff.get("changed_tables"):
print("\n[*] 变动的表:")
print("\n[*] Changed tables:")
for table, changes in diff["changed_tables"].items():
print(f" - {table}:")
if changes.get("added"):
print(" [+] 新增字段:", ", ".join(changes["added"]))
print(" [+] Added fields:", ", ".join(changes["added"]))
if changes.get("deleted"):
print(" [-] 删除字段:", ", ".join(changes["deleted"]))
print(" [-] Deleted fields:", ", ".join(changes["deleted"]))
if changes.get("modified"):
print(" [*] 修改字段:")
print(" [*] Modified fields:")
for col, types in changes["modified"].items():
print(f" - {col}: {types[0]} -> {types[1]}")
print("--- 报告结束 ---")
print("--- End of Report ---")
def sync_database(engine, diff):
"""将ORM模型同步到数据库"""
"""Synchronize ORM model to database"""
metadata = Base.metadata
# Alembic的上下文配置
# Alembic context configuration
from alembic.migration import MigrationContext
from alembic.operations import Operations
@@ -142,109 +142,109 @@ def sync_database(engine, diff):
ctx = MigrationContext.configure(conn)
op = Operations(ctx)
# 处理删除的表
# Handle deleted tables
for table_name in diff['deleted_tables']:
op.drop_table(table_name)
print(f"已删除表: {table_name}")
print(f"Deleted table: {table_name}")
# 处理新增的表
# Handle added tables
for table_name in diff['added_tables']:
table = metadata.tables.get(table_name)
if table is not None:
table.create(engine)
print(f"已创建表: {table_name}")
print(f"Created table: {table_name}")
# 处理字段变更
# Handle field changes
for table_name, changes in diff['changed_tables'].items():
# 删除字段
# Delete fields
for col_name in changes['deleted']:
op.drop_column(table_name, col_name)
print(f"在表 {table_name} 中已删除字段: {col_name}")
# 新增字段
print(f"Deleted field in table {table_name}: {col_name}")
# Add fields
for col_name in changes['added']:
table = metadata.tables.get(table_name)
column = table.columns.get(col_name)
if column is not None:
op.add_column(table_name, column)
print(f"在表 {table_name} 中已新增字段: {col_name}")
print(f"Added field in table {table_name}: {col_name}")
# 修改字段
# Modify fields
for col_name, types in changes['modified'].items():
table = metadata.tables.get(table_name)
if table is not None:
column = table.columns.get(col_name)
if column is not None:
op.alter_column(table_name, col_name, type_=column.type)
print(f"在表 {table_name} 中已修改字段: {col_name} (类型变为 {column.type})")
print(f"Modified field in table {table_name}: {col_name} (type changed to {column.type})")
def main():
"""主函数"""
"""Main function"""
orm_schema = get_orm_schema()
# 处理 MySQL
# Handle MySQL
try:
mysql_engine = get_mysql_engine()
mysql_schema = get_db_schema(mysql_engine)
mysql_diff = compare_schemas(mysql_schema, orm_schema)
print_diff("MySQL", mysql_diff)
if any(mysql_diff.values()):
choice = input(">>> 需要人工确认是否要将ORM模型同步到MySQL数据库? (y/N): ")
choice = input(">>> Manual confirmation required: Synchronize ORM model to MySQL database? (y/N): ")
if choice.lower() == 'y':
sync_database(mysql_engine, mysql_diff)
print("MySQL数据库同步完成。")
print("MySQL database synchronization completed.")
except Exception as e:
print(f"处理MySQL时出错: {e}")
print(f"Error processing MySQL: {e}")
# 处理 SQLite
# Handle SQLite
try:
sqlite_engine = get_sqlite_engine()
sqlite_schema = get_db_schema(sqlite_engine)
sqlite_diff = compare_schemas(sqlite_schema, orm_schema)
print_diff("SQLite", sqlite_diff)
if any(sqlite_diff.values()):
choice = input(">>> 需要人工确认是否要将ORM模型同步到SQLite数据库? (y/N): ")
choice = input(">>> Manual confirmation required: Synchronize ORM model to SQLite database? (y/N): ")
if choice.lower() == 'y':
# 注意SQLite不支持ALTER COLUMN来修改字段类型这里简化处理
print("警告SQLite的字段修改支持有限此脚本不会执行修改字段类型的操作。")
# Note: SQLite does not support ALTER COLUMN to modify field types, simplified handling here
print("Warning: SQLite has limited support for field modifications, this script will not execute field type modification operations.")
sync_database(sqlite_engine, sqlite_diff)
print("SQLite数据库同步完成。")
print("SQLite database synchronization completed.")
except Exception as e:
print(f"处理SQLite时出错: {e}")
print(f"Error processing SQLite: {e}")
if __name__ == "__main__":
main()
######################### Feedback example #########################
# [*] 变动的表:
# [*] Changed tables:
# - kuaishou_video:
# [*] 修改字段:
# [*] Modified fields:
# - user_id: TEXT -> VARCHAR(64)
# - xhs_note_comment:
# [*] 修改字段:
# [*] Modified fields:
# - comment_id: BIGINT -> VARCHAR(255)
# - zhihu_content:
# [*] 修改字段:
# [*] Modified fields:
# - created_time: BIGINT -> VARCHAR(32)
# - content_id: BIGINT -> VARCHAR(64)
# - zhihu_creator:
# [*] 修改字段:
# [*] Modified fields:
# - user_id: INTEGER -> VARCHAR(64)
# - tieba_note:
# [*] 修改字段:
# [*] Modified fields:
# - publish_time: BIGINT -> VARCHAR(255)
# - tieba_id: INTEGER -> VARCHAR(255)
# - note_id: BIGINT -> VARCHAR(644)
# --- 报告结束 ---
# >>> 需要人工确认是否要将ORM模型同步到MySQL数据库? (y/N): y
# 在表 kuaishou_video 中已修改字段: user_id (类型变为 VARCHAR(64))
# 在表 xhs_note_comment 中已修改字段: comment_id (类型变为 VARCHAR(255))
# 在表 zhihu_content 中已修改字段: created_time (类型变为 VARCHAR(32))
# 在表 zhihu_content 中已修改字段: content_id (类型变为 VARCHAR(64))
# 在表 zhihu_creator 中已修改字段: user_id (类型变为 VARCHAR(64))
# 在表 tieba_note 中已修改字段: publish_time (类型变为 VARCHAR(255))
# 在表 tieba_note 中已修改字段: tieba_id (类型变为 VARCHAR(255))
# 在表 tieba_note 中已修改字段: note_id (类型变为 VARCHAR(644))
# MySQL数据库同步完成。
# --- End of Report ---
# >>> Manual confirmation required: Synchronize ORM model to MySQL database? (y/N): y
# Modified field in table kuaishou_video: user_id (type changed to VARCHAR(64))
# Modified field in table xhs_note_comment: comment_id (type changed to VARCHAR(255))
# Modified field in table zhihu_content: created_time (type changed to VARCHAR(32))
# Modified field in table zhihu_content: content_id (type changed to VARCHAR(64))
# Modified field in table zhihu_creator: user_id (type changed to VARCHAR(64))
# Modified field in table tieba_note: publish_time (type changed to VARCHAR(255))
# Modified field in table tieba_note: tieba_id (type changed to VARCHAR(255))
# Modified field in table tieba_note: note_id (type changed to VARCHAR(644))
# MySQL database synchronization completed.

View File

@@ -45,9 +45,9 @@ class TestExpiringLocalCache(unittest.TestCase):
self.assertIsNone(self.cache.get('key'))
def test_clear(self):
# 设置两个键值对过期时间为11秒
# Set two key-value pairs with expiration time of 11 seconds
self.cache.set('key', 'value', 11)
# 睡眠12秒让cache类的定时任务执行一次
# Sleep for 12 seconds to let the cache class's scheduled task execute once
time.sleep(12)
self.assertIsNone(self.cache.get('key'))

View File

@@ -38,14 +38,14 @@ class TestMongoDBRealConnection(unittest.TestCase):
conn = MongoDBConnection()
asyncio.run(conn._connect())
cls.mongodb_available = True
print("\n✓ MongoDB连接成功")
print("\n✓ MongoDB connection successful")
except Exception as e:
cls.mongodb_available = False
print(f"\n✗ MongoDB连接失败: {e}")
print(f"\n✗ MongoDB connection failed: {e}")
def setUp(self):
if not self.mongodb_available:
self.skipTest("MongoDB不可用")
self.skipTest("MongoDB not available")
MongoDBConnection._instance = None
MongoDBConnection._client = None
@@ -82,9 +82,9 @@ class TestMongoDBRealConnection(unittest.TestCase):
try:
asyncio.run(cleanup())
print("\n测试数据清理完成")
print("\nTest data cleanup completed")
except Exception as e:
print(f"\n清理测试数据时出错: {e}")
print(f"\nError cleaning up test data: {e}")
def test_real_connection(self):
async def test():
@@ -106,8 +106,8 @@ class TestMongoDBRealConnection(unittest.TestCase):
test_data = {
"note_id": "test_note_001",
"title": "测试笔记",
"content": "这是一条测试内容",
"title": "Test Note",
"content": "This is a test content",
"created_at": datetime.now().isoformat()
}
@@ -125,7 +125,7 @@ class TestMongoDBRealConnection(unittest.TestCase):
self.assertIsNotNone(found)
self.assertEqual(found["note_id"], "test_note_001")
self.assertEqual(found["title"], "测试笔记")
self.assertEqual(found["title"], "Test Note")
asyncio.run(test())
@@ -135,7 +135,7 @@ class TestMongoDBRealConnection(unittest.TestCase):
initial_data = {
"note_id": "test_note_002",
"title": "初始标题",
"title": "Initial Title",
"likes": 10
}
@@ -147,7 +147,7 @@ class TestMongoDBRealConnection(unittest.TestCase):
updated_data = {
"note_id": "test_note_002",
"title": "更新后的标题",
"title": "Updated Title",
"likes": 100
}
@@ -162,7 +162,7 @@ class TestMongoDBRealConnection(unittest.TestCase):
{"note_id": "test_note_002"}
)
self.assertEqual(found["title"], "更新后的标题")
self.assertEqual(found["title"], "Updated Title")
self.assertEqual(found["likes"], 100)
asyncio.run(test())
@@ -176,7 +176,7 @@ class TestMongoDBRealConnection(unittest.TestCase):
data = {
"note_id": f"test_note_{i:03d}",
"user_id": test_user_id,
"title": f"测试笔记{i}",
"title": f"Test Note {i}",
"likes": i * 10
}
await store.save_or_update(
@@ -226,9 +226,9 @@ class TestMongoDBRealConnection(unittest.TestCase):
note_data = {
"note_id": "xhs_test_001",
"user_id": "user_001",
"nickname": "测试用户",
"title": "小红书测试笔记",
"desc": "这是一条测试笔记",
"nickname": "Test User",
"title": "Xiaohongshu Test Note",
"desc": "This is a test note",
"type": "normal",
"liked_count": "100",
"collected_count": "50",
@@ -240,16 +240,16 @@ class TestMongoDBRealConnection(unittest.TestCase):
"comment_id": "comment_001",
"note_id": "xhs_test_001",
"user_id": "user_002",
"nickname": "评论用户",
"content": "这是一条测试评论",
"nickname": "Comment User",
"content": "This is a test comment",
"like_count": "10"
}
await store.store_comment(comment_data)
creator_data = {
"user_id": "user_001",
"nickname": "测试创作者",
"desc": "这是一个测试创作者",
"nickname": "Test Creator",
"desc": "This is a test creator",
"fans": "1000",
"follows": "100"
}
@@ -259,15 +259,15 @@ class TestMongoDBRealConnection(unittest.TestCase):
note = await mongo_store.find_one("contents", {"note_id": "xhs_test_001"})
self.assertIsNotNone(note)
self.assertEqual(note["title"], "小红书测试笔记")
self.assertEqual(note["title"], "Xiaohongshu Test Note")
comment = await mongo_store.find_one("comments", {"comment_id": "comment_001"})
self.assertIsNotNone(comment)
self.assertEqual(comment["content"], "这是一条测试评论")
self.assertEqual(comment["content"], "This is a test comment")
creator = await mongo_store.find_one("creators", {"user_id": "user_001"})
self.assertIsNotNone(creator)
self.assertEqual(creator["nickname"], "测试创作者")
self.assertEqual(creator["nickname"], "Test Creator")
asyncio.run(test())
@@ -278,9 +278,9 @@ class TestMongoDBRealConnection(unittest.TestCase):
video_data = {
"aweme_id": "dy_test_001",
"user_id": "user_001",
"nickname": "测试用户",
"title": "抖音测试视频",
"desc": "这是一条测试视频",
"nickname": "Test User",
"title": "Douyin Test Video",
"desc": "This is a test video",
"liked_count": "1000",
"comment_count": "100"
}
@@ -290,15 +290,15 @@ class TestMongoDBRealConnection(unittest.TestCase):
"comment_id": "dy_comment_001",
"aweme_id": "dy_test_001",
"user_id": "user_002",
"nickname": "评论用户",
"content": "这是一条测试评论"
"nickname": "Comment User",
"content": "This is a test comment"
}
await store.store_comment(comment_data)
creator_data = {
"user_id": "user_001",
"nickname": "测试创作者",
"desc": "这是一个测试创作者"
"nickname": "Test Creator",
"desc": "This is a test creator"
}
await store.store_creator(creator_data)
@@ -306,7 +306,7 @@ class TestMongoDBRealConnection(unittest.TestCase):
video = await mongo_store.find_one("contents", {"aweme_id": "dy_test_001"})
self.assertIsNotNone(video)
self.assertEqual(video["title"], "抖音测试视频")
self.assertEqual(video["title"], "Douyin Test Video")
comment = await mongo_store.find_one("comments", {"comment_id": "dy_comment_001"})
self.assertIsNotNone(comment)
@@ -324,8 +324,8 @@ class TestMongoDBRealConnection(unittest.TestCase):
for i in range(10):
data = {
"note_id": f"concurrent_note_{i:03d}",
"title": f"并发测试笔记{i}",
"content": f"内容{i}"
"title": f"Concurrent Test Note {i}",
"content": f"Content {i}"
}
task = store.save_or_update(
"contents",
@@ -362,9 +362,9 @@ def run_integration_tests():
if __name__ == "__main__":
print("="*70)
print("MongoDB存储集成测试")
print("MongoDB Storage Integration Test")
print("="*70)
print(f"MongoDB配置:")
print(f"MongoDB Configuration:")
print(f" Host: {db_config.MONGODB_HOST}")
print(f" Port: {db_config.MONGODB_PORT}")
print(f" Database: {db_config.MONGODB_DB_NAME}")
@@ -373,12 +373,12 @@ if __name__ == "__main__":
result = run_integration_tests()
print("\n" + "="*70)
print("测试统计:")
print(f"总测试数: {result.testsRun}")
print(f"成功: {result.testsRun - len(result.failures) - len(result.errors)}")
print(f"失败: {len(result.failures)}")
print(f"错误: {len(result.errors)}")
print(f"跳过: {len(result.skipped)}")
print("Test Statistics:")
print(f"Total tests: {result.testsRun}")
print(f"Passed: {result.testsRun - len(result.failures) - len(result.errors)}")
print(f"Failed: {len(result.failures)}")
print(f"Errors: {len(result.errors)}")
print(f"Skipped: {len(result.skipped)}")
print("="*70)
sys.exit(0 if result.wasSuccessful() else 1)

View File

@@ -37,109 +37,109 @@ class TestIpPool(IsolatedAsyncioTestCase):
for _ in range(3):
ip_proxy_info: IpInfoModel = await pool.get_proxy()
print(ip_proxy_info)
self.assertIsNotNone(ip_proxy_info.ip, msg="验证 ip 是否获取成功")
self.assertIsNotNone(ip_proxy_info.ip, msg="Verify if IP is obtained successfully")
async def test_ip_expiration(self):
"""测试IP代理过期检测功能"""
print("\n=== 开始测试IP代理过期检测 ===")
"""Test IP proxy expiration detection functionality"""
print("\n=== Starting IP proxy expiration detection test ===")
# 1. 创建IP池并获取一个代理
# 1. Create IP pool and get a proxy
pool = await create_ip_pool(ip_pool_count=2, enable_validate_ip=True)
ip_proxy_info: IpInfoModel = await pool.get_proxy()
print(f"获取到的代理: {ip_proxy_info.ip}:{ip_proxy_info.port}")
print(f"Obtained proxy: {ip_proxy_info.ip}:{ip_proxy_info.port}")
# 2. 测试未过期的情况
# 2. Test non-expired case
if ip_proxy_info.expired_time_ts:
print(f"代理过期时间戳: {ip_proxy_info.expired_time_ts}")
print(f"当前时间戳: {int(time.time())}")
print(f"剩余有效时间: {ip_proxy_info.expired_time_ts - int(time.time())} ")
print(f"Proxy expiration timestamp: {ip_proxy_info.expired_time_ts}")
print(f"Current timestamp: {int(time.time())}")
print(f"Remaining valid time: {ip_proxy_info.expired_time_ts - int(time.time())} seconds")
is_expired = ip_proxy_info.is_expired(buffer_seconds=30)
print(f"代理是否过期(缓冲30秒): {is_expired}")
self.assertFalse(is_expired, msg="新获取的IP应该未过期")
print(f"Is proxy expired (30s buffer): {is_expired}")
self.assertFalse(is_expired, msg="Newly obtained IP should not be expired")
else:
print("当前代理未设置过期时间,跳过过期检测")
print("Current proxy does not have expiration time set, skipping expiration detection")
# 3. 测试即将过期的情况设置为5分钟后过期
# 3. Test about to expire case (set to expire in 5 minutes)
current_ts = int(time.time())
five_minutes_later = current_ts + 300 # 5分钟 = 300秒
five_minutes_later = current_ts + 300 # 5 minutes = 300 seconds
ip_proxy_info.expired_time_ts = five_minutes_later
print(f"\n设置代理过期时间为5分钟后: {five_minutes_later}")
print(f"\nSet proxy expiration time to 5 minutes later: {five_minutes_later}")
# 不应该过期缓冲30秒
# Should not be expired (30s buffer)
is_expired_30s = ip_proxy_info.is_expired(buffer_seconds=30)
print(f"代理是否过期(缓冲30秒): {is_expired_30s}")
self.assertFalse(is_expired_30s, msg="5分钟后过期的IP缓冲30秒不应该过期")
print(f"Is proxy expired (30s buffer): {is_expired_30s}")
self.assertFalse(is_expired_30s, msg="IP expiring in 5 minutes should not be expired with 30s buffer")
# 4. 测试已过期的情况(设置为已经过期)
expired_ts = current_ts - 60 # 1分钟前已过期
# 4. Test already expired case (set to already expired)
expired_ts = current_ts - 60 # Expired 1 minute ago
ip_proxy_info.expired_time_ts = expired_ts
print(f"\n设置代理过期时间为1分钟前: {expired_ts}")
print(f"\nSet proxy expiration time to 1 minute ago: {expired_ts}")
is_expired = ip_proxy_info.is_expired(buffer_seconds=30)
print(f"代理是否过期(缓冲30秒): {is_expired}")
self.assertTrue(is_expired, msg="已过期的IP应该被检测为过期")
print(f"Is proxy expired (30s buffer): {is_expired}")
self.assertTrue(is_expired, msg="Expired IP should be detected as expired")
# 5. 测试临界过期情况29秒后过期缓冲30秒应该认为已过期
# 5. Test critical expiration case (expires in 29s, should be considered expired with 30s buffer)
almost_expired_ts = current_ts + 29
ip_proxy_info.expired_time_ts = almost_expired_ts
print(f"\n设置代理过期时间为29秒后: {almost_expired_ts}")
print(f"\nSet proxy expiration time to 29 seconds later: {almost_expired_ts}")
is_expired_critical = ip_proxy_info.is_expired(buffer_seconds=30)
print(f"代理是否过期(缓冲30秒): {is_expired_critical}")
self.assertTrue(is_expired_critical, msg="29秒后过期的IP缓冲30秒应该被认为已过期")
print(f"Is proxy expired (30s buffer): {is_expired_critical}")
self.assertTrue(is_expired_critical, msg="IP expiring in 29s should be considered expired with 30s buffer")
print("\n=== IP代理过期检测测试完成 ===")
print("\n=== IP proxy expiration detection test completed ===")
async def test_proxy_pool_auto_refresh(self):
"""测试代理池自动刷新过期代理的功能"""
print("\n=== 开始测试代理池自动刷新功能 ===")
"""Test proxy pool auto-refresh expired proxy functionality"""
print("\n=== Starting proxy pool auto-refresh test ===")
# 1. 创建IP池
# 1. Create IP pool
pool = await create_ip_pool(ip_pool_count=2, enable_validate_ip=True)
# 2. 获取一个代理
# 2. Get a proxy
first_proxy = await pool.get_proxy()
print(f"首次获取代理: {first_proxy.ip}:{first_proxy.port}")
print(f"First proxy obtained: {first_proxy.ip}:{first_proxy.port}")
# 验证当前代理未过期
# Verify current proxy is not expired
is_expired = pool.is_current_proxy_expired(buffer_seconds=30)
print(f"当前代理是否过期: {is_expired}")
print(f"Is current proxy expired: {is_expired}")
if first_proxy.expired_time_ts:
print(f"当前代理过期时间戳: {first_proxy.expired_time_ts}")
print(f"Current proxy expiration timestamp: {first_proxy.expired_time_ts}")
# 3. 手动设置当前代理为已过期
# 3. Manually set current proxy as expired
current_ts = int(time.time())
pool.current_proxy.expired_time_ts = current_ts - 60
print(f"\n手动设置代理为已过期1分钟前")
print(f"\nManually set proxy as expired (1 minute ago)")
# 4. 检测是否过期
# 4. Check if expired
is_expired_after = pool.is_current_proxy_expired(buffer_seconds=30)
print(f"设置后代理是否过期: {is_expired_after}")
self.assertTrue(is_expired_after, msg="手动设置过期后应该被检测为过期")
print(f"Is proxy expired after setting: {is_expired_after}")
self.assertTrue(is_expired_after, msg="Should be detected as expired after manual setting")
# 5. 使用 get_or_refresh_proxy 自动刷新
print("\n调用 get_or_refresh_proxy 自动刷新过期代理...")
# 5. Use get_or_refresh_proxy to auto-refresh
print("\nCalling get_or_refresh_proxy to auto-refresh expired proxy...")
refreshed_proxy = await pool.get_or_refresh_proxy(buffer_seconds=30)
print(f"刷新后的代理: {refreshed_proxy.ip}:{refreshed_proxy.port}")
print(f"Refreshed proxy: {refreshed_proxy.ip}:{refreshed_proxy.port}")
# 6. 验证新代理未过期
# 6. Verify new proxy is not expired
is_new_expired = pool.is_current_proxy_expired(buffer_seconds=30)
print(f"新代理是否过期: {is_new_expired}")
self.assertFalse(is_new_expired, msg="刷新后的新代理应该未过期")
print(f"Is new proxy expired: {is_new_expired}")
self.assertFalse(is_new_expired, msg="Refreshed new proxy should not be expired")
print("\n=== 代理池自动刷新测试完成 ===")
print("\n=== Proxy pool auto-refresh test completed ===")
else:
print("当前代理未设置过期时间,跳过自动刷新测试")
print("Current proxy does not have expiration time set, skipping auto-refresh test")
async def test_ip_expiration_standalone(self):
"""独立测试IP过期检测功能不依赖真实代理提供商"""
print("\n=== 开始独立测试IP代理过期检测功能 ===")
"""Standalone test for IP expiration detection (does not depend on real proxy provider)"""
print("\n=== Starting standalone IP proxy expiration detection test ===")
current_ts = int(time.time())
# 1. 测试未设置过期时间的IP永不过期
# 1. Test IP without expiration time set (never expires)
ip_no_expire = IpInfoModel(
ip="192.168.1.1",
port=8080,
@@ -147,14 +147,14 @@ class TestIpPool(IsolatedAsyncioTestCase):
password="test_pwd",
expired_time_ts=None
)
print(f"\n测试1: IP未设置过期时间")
print(f"\nTest 1: IP without expiration time set")
is_expired = ip_no_expire.is_expired(buffer_seconds=30)
print(f" 代理: {ip_no_expire.ip}:{ip_no_expire.port}")
print(f" 过期时间: {ip_no_expire.expired_time_ts}")
print(f" 是否过期: {is_expired}")
self.assertFalse(is_expired, msg="未设置过期时间的IP应该永不过期")
print(f" Proxy: {ip_no_expire.ip}:{ip_no_expire.port}")
print(f" Expiration time: {ip_no_expire.expired_time_ts}")
print(f" Is expired: {is_expired}")
self.assertFalse(is_expired, msg="IP without expiration time should never expire")
# 2. 测试5分钟后过期的IP应该未过期
# 2. Test IP expiring in 5 minutes (should not be expired)
five_minutes_later = current_ts + 300
ip_valid = IpInfoModel(
ip="192.168.1.2",
@@ -163,16 +163,16 @@ class TestIpPool(IsolatedAsyncioTestCase):
password="test_pwd",
expired_time_ts=five_minutes_later
)
print(f"\n测试2: IP将在5分钟后过期")
print(f"\nTest 2: IP will expire in 5 minutes")
is_expired = ip_valid.is_expired(buffer_seconds=30)
print(f" 代理: {ip_valid.ip}:{ip_valid.port}")
print(f" 当前时间戳: {current_ts}")
print(f" 过期时间戳: {ip_valid.expired_time_ts}")
print(f" 剩余时间: {ip_valid.expired_time_ts - current_ts} ")
print(f" 是否过期(缓冲30秒): {is_expired}")
self.assertFalse(is_expired, msg="5分钟后过期的IP缓冲30秒不应该过期")
print(f" Proxy: {ip_valid.ip}:{ip_valid.port}")
print(f" Current timestamp: {current_ts}")
print(f" Expiration timestamp: {ip_valid.expired_time_ts}")
print(f" Remaining time: {ip_valid.expired_time_ts - current_ts} seconds")
print(f" Is expired (30s buffer): {is_expired}")
self.assertFalse(is_expired, msg="IP expiring in 5 minutes should not be expired with 30s buffer")
# 3. 测试已过期的IP
# 3. Test already expired IP
already_expired = current_ts - 60
ip_expired = IpInfoModel(
ip="192.168.1.3",
@@ -181,16 +181,16 @@ class TestIpPool(IsolatedAsyncioTestCase):
password="test_pwd",
expired_time_ts=already_expired
)
print(f"\n测试3: IP已经过期1分钟前")
print(f"\nTest 3: IP already expired (1 minute ago)")
is_expired = ip_expired.is_expired(buffer_seconds=30)
print(f" 代理: {ip_expired.ip}:{ip_expired.port}")
print(f" 当前时间戳: {current_ts}")
print(f" 过期时间戳: {ip_expired.expired_time_ts}")
print(f" 已过期: {current_ts - ip_expired.expired_time_ts} ")
print(f" 是否过期(缓冲30秒): {is_expired}")
self.assertTrue(is_expired, msg="已过期的IP应该被检测为过期")
print(f" Proxy: {ip_expired.ip}:{ip_expired.port}")
print(f" Current timestamp: {current_ts}")
print(f" Expiration timestamp: {ip_expired.expired_time_ts}")
print(f" Expired for: {current_ts - ip_expired.expired_time_ts} seconds")
print(f" Is expired (30s buffer): {is_expired}")
self.assertTrue(is_expired, msg="Expired IP should be detected as expired")
# 4. 测试临界过期29秒后过期缓冲30秒应该认为已过期
# 4. Test critical expiration (expires in 29s, should be considered expired with 30s buffer)
almost_expired = current_ts + 29
ip_critical = IpInfoModel(
ip="192.168.1.4",
@@ -199,16 +199,16 @@ class TestIpPool(IsolatedAsyncioTestCase):
password="test_pwd",
expired_time_ts=almost_expired
)
print(f"\n测试4: IP即将过期29秒后")
print(f"\nTest 4: IP about to expire (in 29 seconds)")
is_expired = ip_critical.is_expired(buffer_seconds=30)
print(f" 代理: {ip_critical.ip}:{ip_critical.port}")
print(f" 当前时间戳: {current_ts}")
print(f" 过期时间戳: {ip_critical.expired_time_ts}")
print(f" 剩余时间: {ip_critical.expired_time_ts - current_ts} ")
print(f" 是否过期(缓冲30秒): {is_expired}")
self.assertTrue(is_expired, msg="29秒后过期的IP缓冲30秒应该被认为已过期")
print(f" Proxy: {ip_critical.ip}:{ip_critical.port}")
print(f" Current timestamp: {current_ts}")
print(f" Expiration timestamp: {ip_critical.expired_time_ts}")
print(f" Remaining time: {ip_critical.expired_time_ts - current_ts} seconds")
print(f" Is expired (30s buffer): {is_expired}")
self.assertTrue(is_expired, msg="IP expiring in 29s should be considered expired with 30s buffer")
# 5. 测试31秒后过期缓冲30秒应该未过期
# 5. Test expires in 31s (should not be expired with 30s buffer)
just_safe = current_ts + 31
ip_just_safe = IpInfoModel(
ip="192.168.1.5",
@@ -217,17 +217,17 @@ class TestIpPool(IsolatedAsyncioTestCase):
password="test_pwd",
expired_time_ts=just_safe
)
print(f"\n测试5: IP在安全范围内31秒后过期")
print(f"\nTest 5: IP within safe range (expires in 31 seconds)")
is_expired = ip_just_safe.is_expired(buffer_seconds=30)
print(f" 代理: {ip_just_safe.ip}:{ip_just_safe.port}")
print(f" 当前时间戳: {current_ts}")
print(f" 过期时间戳: {ip_just_safe.expired_time_ts}")
print(f" 剩余时间: {ip_just_safe.expired_time_ts - current_ts} ")
print(f" 是否过期(缓冲30秒): {is_expired}")
self.assertFalse(is_expired, msg="31秒后过期的IP缓冲30秒应该未过期")
print(f" Proxy: {ip_just_safe.ip}:{ip_just_safe.port}")
print(f" Current timestamp: {current_ts}")
print(f" Expiration timestamp: {ip_just_safe.expired_time_ts}")
print(f" Remaining time: {ip_just_safe.expired_time_ts - current_ts} seconds")
print(f" Is expired (30s buffer): {is_expired}")
self.assertFalse(is_expired, msg="IP expiring in 31s should not be expired with 30s buffer")
# 6. 测试ProxyIpPool的过期检测
print(f"\n测试6: ProxyIpPool的过期检测功能")
# 6. Test ProxyIpPool expiration detection
print(f"\nTest 6: ProxyIpPool expiration detection functionality")
mock_provider = MagicMock()
mock_provider.get_proxy = AsyncMock(return_value=[])
@@ -237,35 +237,35 @@ class TestIpPool(IsolatedAsyncioTestCase):
ip_provider=mock_provider
)
# 6.1 测试无当前代理时
# 6.1 Test when there is no current proxy
is_expired = pool.is_current_proxy_expired(buffer_seconds=30)
print(f" 无当前代理时是否过期: {is_expired}")
self.assertTrue(is_expired, msg="无当前代理时应该返回True")
print(f" Is expired when no current proxy: {is_expired}")
self.assertTrue(is_expired, msg="Should return True when there is no current proxy")
# 6.2 设置一个有效的代理
# 6.2 Set a valid proxy
valid_proxy = IpInfoModel(
ip="192.168.1.6",
port=8080,
user="test_user",
password="test_pwd",
expired_time_ts=current_ts + 300 # 5分钟后过期
expired_time_ts=current_ts + 300 # Expires in 5 minutes
)
pool.current_proxy = valid_proxy
is_expired = pool.is_current_proxy_expired(buffer_seconds=30)
print(f" 设置有效代理后是否过期: {is_expired}")
self.assertFalse(is_expired, msg="有效的代理应该返回False")
print(f" Is expired after setting valid proxy: {is_expired}")
self.assertFalse(is_expired, msg="Valid proxy should return False")
# 6.3 设置一个已过期的代理
# 6.3 Set an expired proxy
expired_proxy = IpInfoModel(
ip="192.168.1.7",
port=8080,
user="test_user",
password="test_pwd",
expired_time_ts=current_ts - 60 # 1分钟前已过期
expired_time_ts=current_ts - 60 # Expired 1 minute ago
)
pool.current_proxy = expired_proxy
is_expired = pool.is_current_proxy_expired(buffer_seconds=30)
print(f" 设置已过期代理后是否过期: {is_expired}")
self.assertTrue(is_expired, msg="已过期的代理应该返回True")
print(f" Is expired after setting expired proxy: {is_expired}")
self.assertTrue(is_expired, msg="Expired proxy should return True")
print("\n=== 独立IP代理过期检测测试完成 ===\n")
print("\n=== Standalone IP proxy expiration detection test completed ===\n")

View File

@@ -52,7 +52,7 @@ class TestRedisCache(unittest.TestCase):
self.assertIn('key2', keys)
def tearDown(self):
# self.redis_cache._redis_client.flushdb() # 清空redis数据库
# self.redis_cache._redis_client.flushdb() # Clear redis database
pass