Files
MediaCrawler/proxy/proxy_ip_pool.py
程序员阿江(Relakkes) 8e93438fe5 Keep PR 900 overrides bounded and opt-in
The PR adds API limit overrides and static proxy support, but the review found that the default proxy provider changed to an invalid static placeholder and the new API fields accepted unbounded values. This keeps the existing proxy default intact, makes static proxy explicit via config or CLI, validates API limit ranges, and adds focused regression coverage for both paths.

Constraint: PR branch must remain contributor-branch compatible and avoid adding dependencies

Rejected: Keep static as the default provider | breaks existing --enable_ip_proxy defaults with an invalid placeholder URL

Rejected: Accept arbitrary integer limits | lets API callers request negative or excessive crawl sizes

Confidence: high

Scope-risk: narrow

Directive: Do not change proxy provider defaults when adding new providers; new providers should be opt-in and covered by provider-specific tests

Tested: uv run pytest tests/test_api_limits.py tests/test_static_proxy_provider.py

Tested: uv run pytest tests

Tested: uv run pytest test/test_utils.py

Tested: uv run python -m compileall api cmd_arg config proxy tests

Tested: git diff --cached --check

Not-tested: Live crawler run against external platforms or real proxy vendor endpoints
2026-05-29 21:27:52 +08:00

217 lines
7.4 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (c) 2025 relakkes@gmail.com
#
# This file is part of MediaCrawler project.
# Repository: https://github.com/NanmiCoder/MediaCrawler/blob/main/proxy/proxy_ip_pool.py
# GitHub: https://github.com/NanmiCoder
# Licensed under NON-COMMERCIAL LEARNING LICENSE 1.1
#
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/2 13:45
# @Desc : IP proxy pool implementation
import random
import time
from typing import Dict, List
from urllib.parse import unquote, urlparse
import httpx
from tenacity import retry, stop_after_attempt, wait_fixed
from tools.httpx_util import make_async_client
import config
from proxy.providers import (
new_kuai_daili_proxy,
new_wandou_http_proxy,
)
from tools import utils
from .base_proxy import ProxyProvider
from .types import IpInfoModel, ProviderNameEnum
class ProxyIpPool:
def __init__(
self, ip_pool_count: int, enable_validate_ip: bool, ip_provider: ProxyProvider
) -> None:
"""
Args:
ip_pool_count:
enable_validate_ip:
ip_provider:
"""
self.valid_ip_url = "https://echo.apifox.cn/" # URL to validate if IP is valid
self.ip_pool_count = ip_pool_count
self.enable_validate_ip = enable_validate_ip
self.proxy_list: List[IpInfoModel] = []
self.ip_provider: ProxyProvider = ip_provider
self.current_proxy: IpInfoModel | None = None # Currently used proxy
async def load_proxies(self) -> None:
"""
Load IP proxies
Returns:
"""
self.proxy_list = await self.ip_provider.get_proxy(self.ip_pool_count)
async def _is_valid_proxy(self, proxy: IpInfoModel) -> bool:
"""
Validate if proxy IP is valid
:param proxy:
:return:
"""
utils.logger.info(
f"[ProxyIpPool._is_valid_proxy] testing {proxy.ip} is it valid "
)
try:
# httpx 0.28.1 requires passing proxy URL string directly, not a dictionary
if proxy.user and proxy.password:
proxy_url = f"http://{proxy.user}:{proxy.password}@{proxy.ip}:{proxy.port}"
else:
proxy_url = f"http://{proxy.ip}:{proxy.port}"
async with make_async_client(proxy=proxy_url) as client:
response = await client.get(self.valid_ip_url)
if response.status_code == 200:
return True
else:
return False
except Exception as e:
utils.logger.info(
f"[ProxyIpPool._is_valid_proxy] testing {proxy.ip} err: {e}"
)
raise e
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
async def get_proxy(self) -> IpInfoModel:
"""
Randomly extract a proxy IP from the proxy pool
:return:
"""
if len(self.proxy_list) == 0:
await self._reload_proxies()
proxy = random.choice(self.proxy_list)
self.proxy_list.remove(proxy) # Remove an IP once extracted
if self.enable_validate_ip:
if not await self._is_valid_proxy(proxy):
raise Exception(
"[ProxyIpPool.get_proxy] current ip invalid and again get it"
)
self.current_proxy = proxy # Save currently used proxy
return proxy
def is_current_proxy_expired(self, buffer_seconds: int = 30) -> bool:
"""
Check if current proxy has expired
Args:
buffer_seconds: Buffer time (seconds), how many seconds ahead to consider expired
Returns:
bool: True means expired or no current proxy, False means still valid
"""
if self.current_proxy is None:
return True
return self.current_proxy.is_expired(buffer_seconds)
async def get_or_refresh_proxy(self, buffer_seconds: int = 30) -> IpInfoModel:
"""
Get current proxy, automatically refresh if expired
Call this method before each request to ensure proxy is valid
Args:
buffer_seconds: Buffer time (seconds), how many seconds ahead to consider expired
Returns:
IpInfoModel: Valid proxy IP information
"""
if self.is_current_proxy_expired(buffer_seconds):
utils.logger.info(
f"[ProxyIpPool.get_or_refresh_proxy] Current proxy expired or not set, getting new proxy..."
)
return await self.get_proxy()
return self.current_proxy
async def _reload_proxies(self):
"""
Reload proxy pool
:return:
"""
self.proxy_list = []
await self.load_proxies()
class StaticProxyProvider(ProxyProvider):
async def get_proxy(self, num: int) -> List[IpInfoModel]:
proxy_url = getattr(config, "STATIC_PROXY_URL", "")
if not proxy_url:
utils.logger.warning("[StaticProxyProvider] STATIC_PROXY_URL is not configured!")
return []
try:
parsed = urlparse(proxy_url)
scheme = parsed.scheme or "http"
if scheme not in {"http", "https"}:
utils.logger.error(f"[StaticProxyProvider] Unsupported proxy scheme: {scheme}")
return []
ip = parsed.hostname or ""
port = parsed.port or (443 if scheme == "https" else 80)
if not ip:
utils.logger.error("[StaticProxyProvider] STATIC_PROXY_URL host is empty!")
return []
return [
IpInfoModel(
ip=ip,
port=port,
user=unquote(parsed.username or ""),
password=unquote(parsed.password or ""),
protocol=f"{scheme}://",
# Static proxy doesn't expire.
expired_time_ts=int(time.time()) + 99999999,
)
]
except Exception as e:
utils.logger.error(f"[StaticProxyProvider] Parse static proxy url error: {e}")
return []
IpProxyProvider: Dict[str, ProxyProvider] = {
ProviderNameEnum.KUAI_DAILI_PROVIDER.value: new_kuai_daili_proxy(),
ProviderNameEnum.WANDOU_HTTP_PROVIDER.value: new_wandou_http_proxy(),
ProviderNameEnum.STATIC_PROVIDER.value: StaticProxyProvider(),
}
async def create_ip_pool(ip_pool_count: int, enable_validate_ip: bool) -> ProxyIpPool:
"""
Create IP proxy pool
:param ip_pool_count: Number of IPs in the pool
:param enable_validate_ip: Whether to enable IP proxy validation
:return:
"""
is_static = config.IP_PROXY_PROVIDER_NAME == ProviderNameEnum.STATIC_PROVIDER.value
pool = ProxyIpPool(
ip_pool_count=ip_pool_count,
enable_validate_ip=False if is_static else enable_validate_ip,
ip_provider=IpProxyProvider.get(config.IP_PROXY_PROVIDER_NAME),
)
await pool.load_proxies()
return pool
if __name__ == "__main__":
pass