From 6eef02d08c157070f3160994ad648c71d491df42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A8=8B=E5=BA=8F=E5=91=98=E9=98=BF=E6=B1=9F=28Relakkes?= =?UTF-8?q?=29?= Date: Tue, 25 Nov 2025 12:39:10 +0800 Subject: [PATCH 1/2] feat: ip proxy expired check --- media_platform/bilibili/client.py | 14 +++++- media_platform/bilibili/core.py | 6 ++- media_platform/douyin/client.py | 14 +++++- media_platform/douyin/core.py | 6 ++- media_platform/kuaishou/client.py | 14 +++++- media_platform/kuaishou/core.py | 6 ++- media_platform/tieba/client.py | 21 +++++++++ media_platform/weibo/client.py | 14 +++++- media_platform/weibo/core.py | 6 ++- media_platform/xhs/client.py | 16 ++++++- media_platform/xhs/core.py | 6 ++- media_platform/zhihu/client.py | 14 +++++- media_platform/zhihu/core.py | 6 ++- proxy/proxy_ip_pool.py | 30 ++++++++++++ proxy/proxy_mixin.py | 77 +++++++++++++++++++++++++++++++ proxy/types.py | 16 ++++++- 16 files changed, 241 insertions(+), 25 deletions(-) create mode 100644 proxy/proxy_mixin.py diff --git a/media_platform/bilibili/client.py b/media_platform/bilibili/client.py index c938dfe..641e281 100644 --- a/media_platform/bilibili/client.py +++ b/media_platform/bilibili/client.py @@ -24,7 +24,7 @@ import asyncio import json import random -from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union from urllib.parse import urlencode import httpx @@ -32,14 +32,18 @@ from playwright.async_api import BrowserContext, Page import config from base.base_crawler import AbstractApiClient +from proxy.proxy_mixin import ProxyRefreshMixin from tools import utils +if TYPE_CHECKING: + from proxy.proxy_ip_pool import ProxyIpPool + from .exception import DataFetchError from .field import CommentOrderType, SearchOrderType from .help import BilibiliSign -class BilibiliClient(AbstractApiClient): +class BilibiliClient(AbstractApiClient, ProxyRefreshMixin): def __init__( self, @@ -49,6 
+53,7 @@ class BilibiliClient(AbstractApiClient): headers: Dict[str, str], playwright_page: Page, cookie_dict: Dict[str, str], + proxy_ip_pool: Optional["ProxyIpPool"] = None, ): self.proxy = proxy self.timeout = timeout @@ -56,8 +61,13 @@ class BilibiliClient(AbstractApiClient): self._host = "https://api.bilibili.com" self.playwright_page = playwright_page self.cookie_dict = cookie_dict + # 初始化代理池(来自 ProxyRefreshMixin) + self.init_proxy_pool(proxy_ip_pool) async def request(self, method, url, **kwargs) -> Any: + # 每次请求前检测代理是否过期 + await self._refresh_proxy_if_expired() + async with httpx.AsyncClient(proxy=self.proxy) as client: response = await client.request(method, url, timeout=self.timeout, **kwargs) try: diff --git a/media_platform/bilibili/core.py b/media_platform/bilibili/core.py index 01ab91a..17e63f4 100644 --- a/media_platform/bilibili/core.py +++ b/media_platform/bilibili/core.py @@ -64,12 +64,13 @@ class BilibiliCrawler(AbstractCrawler): self.index_url = "https://www.bilibili.com" self.user_agent = utils.get_user_agent() self.cdp_manager = None + self.ip_proxy_pool = None # 代理IP池,用于代理自动刷新 async def start(self): playwright_proxy_format, httpx_proxy_format = None, None if config.ENABLE_IP_PROXY: - ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True) - ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy() + self.ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True) + ip_proxy_info: IpInfoModel = await self.ip_proxy_pool.get_proxy() playwright_proxy_format, httpx_proxy_format = utils.format_proxy_info(ip_proxy_info) async with async_playwright() as playwright: @@ -473,6 +474,7 @@ class BilibiliCrawler(AbstractCrawler): }, playwright_page=self.context_page, cookie_dict=cookie_dict, + proxy_ip_pool=self.ip_proxy_pool, # 传递代理池用于自动刷新 ) return bilibili_client_obj diff --git a/media_platform/douyin/client.py b/media_platform/douyin/client.py index e7b6765..b080f83 100644 --- 
a/media_platform/douyin/client.py +++ b/media_platform/douyin/client.py @@ -21,21 +21,25 @@ import asyncio import copy import json import urllib.parse -from typing import Any, Callable, Dict, Union, Optional +from typing import TYPE_CHECKING, Any, Callable, Dict, Union, Optional import httpx from playwright.async_api import BrowserContext from base.base_crawler import AbstractApiClient +from proxy.proxy_mixin import ProxyRefreshMixin from tools import utils from var import request_keyword_var +if TYPE_CHECKING: + from proxy.proxy_ip_pool import ProxyIpPool + from .exception import * from .field import * from .help import * -class DouYinClient(AbstractApiClient): +class DouYinClient(AbstractApiClient, ProxyRefreshMixin): def __init__( self, @@ -45,6 +49,7 @@ class DouYinClient(AbstractApiClient): headers: Dict, playwright_page: Optional[Page], cookie_dict: Dict, + proxy_ip_pool: Optional["ProxyIpPool"] = None, ): self.proxy = proxy self.timeout = timeout @@ -52,6 +57,8 @@ class DouYinClient(AbstractApiClient): self._host = "https://www.douyin.com" self.playwright_page = playwright_page self.cookie_dict = cookie_dict + # 初始化代理池(来自 ProxyRefreshMixin) + self.init_proxy_pool(proxy_ip_pool) async def __process_req_params( self, @@ -106,6 +113,9 @@ class DouYinClient(AbstractApiClient): params["a_bogus"] = a_bogus async def request(self, method, url, **kwargs): + # 每次请求前检测代理是否过期 + await self._refresh_proxy_if_expired() + async with httpx.AsyncClient(proxy=self.proxy) as client: response = await client.request(method, url, timeout=self.timeout, **kwargs) try: diff --git a/media_platform/douyin/core.py b/media_platform/douyin/core.py index 4c856bb..3a0ec5d 100644 --- a/media_platform/douyin/core.py +++ b/media_platform/douyin/core.py @@ -55,12 +55,13 @@ class DouYinCrawler(AbstractCrawler): def __init__(self) -> None: self.index_url = "https://www.douyin.com" self.cdp_manager = None + self.ip_proxy_pool = None # 代理IP池,用于代理自动刷新 async def start(self) -> None: 
playwright_proxy_format, httpx_proxy_format = None, None if config.ENABLE_IP_PROXY: - ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True) - ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy() + self.ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True) + ip_proxy_info: IpInfoModel = await self.ip_proxy_pool.get_proxy() playwright_proxy_format, httpx_proxy_format = utils.format_proxy_info(ip_proxy_info) async with async_playwright() as playwright: @@ -305,6 +306,7 @@ class DouYinCrawler(AbstractCrawler): }, playwright_page=self.context_page, cookie_dict=cookie_dict, + proxy_ip_pool=self.ip_proxy_pool, # 传递代理池用于自动刷新 ) return douyin_client diff --git a/media_platform/kuaishou/client.py b/media_platform/kuaishou/client.py index 28dddc9..e402380 100644 --- a/media_platform/kuaishou/client.py +++ b/media_platform/kuaishou/client.py @@ -21,7 +21,7 @@ # -*- coding: utf-8 -*- import asyncio import json -from typing import Any, Callable, Dict, List, Optional +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional from urllib.parse import urlencode import httpx @@ -29,13 +29,17 @@ from playwright.async_api import BrowserContext, Page import config from base.base_crawler import AbstractApiClient +from proxy.proxy_mixin import ProxyRefreshMixin from tools import utils +if TYPE_CHECKING: + from proxy.proxy_ip_pool import ProxyIpPool + from .exception import DataFetchError from .graphql import KuaiShouGraphQL -class KuaiShouClient(AbstractApiClient): +class KuaiShouClient(AbstractApiClient, ProxyRefreshMixin): def __init__( self, timeout=10, @@ -44,6 +48,7 @@ class KuaiShouClient(AbstractApiClient): headers: Dict[str, str], playwright_page: Page, cookie_dict: Dict[str, str], + proxy_ip_pool: Optional["ProxyIpPool"] = None, ): self.proxy = proxy self.timeout = timeout @@ -52,8 +57,13 @@ class KuaiShouClient(AbstractApiClient): self.playwright_page = playwright_page self.cookie_dict = 
cookie_dict self.graphql = KuaiShouGraphQL() + # 初始化代理池(来自 ProxyRefreshMixin) + self.init_proxy_pool(proxy_ip_pool) async def request(self, method, url, **kwargs) -> Any: + # 每次请求前检测代理是否过期 + await self._refresh_proxy_if_expired() + async with httpx.AsyncClient(proxy=self.proxy) as client: response = await client.request(method, url, timeout=self.timeout, **kwargs) data: Dict = response.json() diff --git a/media_platform/kuaishou/core.py b/media_platform/kuaishou/core.py index 4fe3d5c..b0655f4 100644 --- a/media_platform/kuaishou/core.py +++ b/media_platform/kuaishou/core.py @@ -58,14 +58,15 @@ class KuaishouCrawler(AbstractCrawler): self.index_url = "https://www.kuaishou.com" self.user_agent = utils.get_user_agent() self.cdp_manager = None + self.ip_proxy_pool = None # 代理IP池,用于代理自动刷新 async def start(self): playwright_proxy_format, httpx_proxy_format = None, None if config.ENABLE_IP_PROXY: - ip_proxy_pool = await create_ip_pool( + self.ip_proxy_pool = await create_ip_pool( config.IP_PROXY_POOL_COUNT, enable_validate_ip=True ) - ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy() + ip_proxy_info: IpInfoModel = await self.ip_proxy_pool.get_proxy() playwright_proxy_format, httpx_proxy_format = utils.format_proxy_info( ip_proxy_info ) @@ -317,6 +318,7 @@ class KuaishouCrawler(AbstractCrawler): }, playwright_page=self.context_page, cookie_dict=cookie_dict, + proxy_ip_pool=self.ip_proxy_pool, # 传递代理池用于自动刷新 ) return ks_client_obj diff --git a/media_platform/tieba/client.py b/media_platform/tieba/client.py index 6e211de..11206c4 100644 --- a/media_platform/tieba/client.py +++ b/media_platform/tieba/client.py @@ -89,6 +89,24 @@ class BaiduTieBaClient(AbstractApiClient): ) return response + async def _refresh_proxy_if_expired(self) -> None: + """ + 检测代理是否过期,如果过期则自动刷新 + """ + if self.ip_pool is None: + return + + if self.ip_pool.is_current_proxy_expired(): + utils.logger.info( + "[BaiduTieBaClient._refresh_proxy_if_expired] Proxy expired, refreshing..." 
+ ) + new_proxy = await self.ip_pool.get_or_refresh_proxy() + # 更新代理URL + _, self.default_ip_proxy = utils.format_proxy_info(new_proxy) + utils.logger.info( + f"[BaiduTieBaClient._refresh_proxy_if_expired] New proxy: {new_proxy.ip}:{new_proxy.port}" + ) + @retry(stop=stop_after_attempt(3), wait=wait_fixed(1)) async def request(self, method, url, return_ori_content=False, proxy=None, **kwargs) -> Union[str, Any]: """ @@ -103,6 +121,9 @@ class BaiduTieBaClient(AbstractApiClient): Returns: """ + # 每次请求前检测代理是否过期 + await self._refresh_proxy_if_expired() + actual_proxy = proxy if proxy else self.default_ip_proxy # 在线程池中执行同步的requests请求 diff --git a/media_platform/weibo/client.py b/media_platform/weibo/client.py index a07f01f..6c83ea8 100644 --- a/media_platform/weibo/client.py +++ b/media_platform/weibo/client.py @@ -26,7 +26,7 @@ import asyncio import copy import json import re -from typing import Callable, Dict, List, Optional, Union +from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Union from urllib.parse import parse_qs, unquote, urlencode import httpx @@ -35,13 +35,17 @@ from playwright.async_api import BrowserContext, Page from tenacity import retry, stop_after_attempt, wait_fixed import config +from proxy.proxy_mixin import ProxyRefreshMixin from tools import utils +if TYPE_CHECKING: + from proxy.proxy_ip_pool import ProxyIpPool + from .exception import DataFetchError from .field import SearchType -class WeiboClient: +class WeiboClient(ProxyRefreshMixin): def __init__( self, @@ -51,6 +55,7 @@ class WeiboClient: headers: Dict[str, str], playwright_page: Page, cookie_dict: Dict[str, str], + proxy_ip_pool: Optional["ProxyIpPool"] = None, ): self.proxy = proxy self.timeout = timeout @@ -59,9 +64,14 @@ class WeiboClient: self.playwright_page = playwright_page self.cookie_dict = cookie_dict self._image_agent_host = "https://i1.wp.com/" + # 初始化代理池(来自 ProxyRefreshMixin) + self.init_proxy_pool(proxy_ip_pool) @retry(stop=stop_after_attempt(5), 
wait=wait_fixed(3)) async def request(self, method, url, **kwargs) -> Union[Response, Dict]: + # 每次请求前检测代理是否过期 + await self._refresh_proxy_if_expired() + enable_return_response = kwargs.pop("return_response", False) async with httpx.AsyncClient(proxy=self.proxy) as client: response = await client.request(method, url, timeout=self.timeout, **kwargs) diff --git a/media_platform/weibo/core.py b/media_platform/weibo/core.py index 68fa6fa..9bea3f5 100644 --- a/media_platform/weibo/core.py +++ b/media_platform/weibo/core.py @@ -63,12 +63,13 @@ class WeiboCrawler(AbstractCrawler): self.user_agent = utils.get_user_agent() self.mobile_user_agent = utils.get_mobile_user_agent() self.cdp_manager = None + self.ip_proxy_pool = None # 代理IP池,用于代理自动刷新 async def start(self): playwright_proxy_format, httpx_proxy_format = None, None if config.ENABLE_IP_PROXY: - ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True) - ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy() + self.ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True) + ip_proxy_info: IpInfoModel = await self.ip_proxy_pool.get_proxy() playwright_proxy_format, httpx_proxy_format = utils.format_proxy_info(ip_proxy_info) async with async_playwright() as playwright: @@ -334,6 +335,7 @@ class WeiboCrawler(AbstractCrawler): }, playwright_page=self.context_page, cookie_dict=cookie_dict, + proxy_ip_pool=self.ip_proxy_pool, # 传递代理池用于自动刷新 ) return weibo_client_obj diff --git a/media_platform/xhs/client.py b/media_platform/xhs/client.py index dcb0cda..0a8f074 100644 --- a/media_platform/xhs/client.py +++ b/media_platform/xhs/client.py @@ -20,7 +20,7 @@ import asyncio import json import time -from typing import Any, Callable, Dict, List, Optional, Union +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union from urllib.parse import urlencode, urlparse, parse_qs @@ -31,8 +31,11 @@ from xhshow import Xhshow import config from 
base.base_crawler import AbstractApiClient +from proxy.proxy_mixin import ProxyRefreshMixin from tools import utils +if TYPE_CHECKING: + from proxy.proxy_ip_pool import ProxyIpPool from .exception import DataFetchError, IPBlockError from .field import SearchNoteType, SearchSortType @@ -40,7 +43,7 @@ from .help import get_search_id, sign from .extractor import XiaoHongShuExtractor -class XiaoHongShuClient(AbstractApiClient): +class XiaoHongShuClient(AbstractApiClient, ProxyRefreshMixin): def __init__( self, @@ -50,6 +53,7 @@ class XiaoHongShuClient(AbstractApiClient): headers: Dict[str, str], playwright_page: Page, cookie_dict: Dict[str, str], + proxy_ip_pool: Optional["ProxyIpPool"] = None, ): self.proxy = proxy self.timeout = timeout @@ -65,6 +69,8 @@ class XiaoHongShuClient(AbstractApiClient): self._extractor = XiaoHongShuExtractor() # 初始化 xhshow 客户端用于签名生成 self._xhshow_client = Xhshow() + # 初始化代理池(来自 ProxyRefreshMixin) + self.init_proxy_pool(proxy_ip_pool) async def _pre_headers(self, url: str, params: Optional[Dict] = None, payload: Optional[Dict] = None) -> Dict: """请求头参数签名 @@ -132,6 +138,9 @@ class XiaoHongShuClient(AbstractApiClient): Returns: """ + # 每次请求前检测代理是否过期 + await self._refresh_proxy_if_expired() + # return response.text return_response = kwargs.pop("return_response", False) async with httpx.AsyncClient(proxy=self.proxy) as client: @@ -201,6 +210,9 @@ class XiaoHongShuClient(AbstractApiClient): ) async def get_note_media(self, url: str) -> Union[bytes, None]: + # 请求前检测代理是否过期 + await self._refresh_proxy_if_expired() + async with httpx.AsyncClient(proxy=self.proxy) as client: try: response = await client.request("GET", url, timeout=self.timeout) diff --git a/media_platform/xhs/core.py b/media_platform/xhs/core.py index cbf49e0..06f6115 100644 --- a/media_platform/xhs/core.py +++ b/media_platform/xhs/core.py @@ -60,12 +60,13 @@ class XiaoHongShuCrawler(AbstractCrawler): # self.user_agent = utils.get_user_agent() self.user_agent = "Mozilla/5.0 
(Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36" self.cdp_manager = None + self.ip_proxy_pool = None # 代理IP池,用于代理自动刷新 async def start(self) -> None: playwright_proxy_format, httpx_proxy_format = None, None if config.ENABLE_IP_PROXY: - ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True) - ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy() + self.ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True) + ip_proxy_info: IpInfoModel = await self.ip_proxy_pool.get_proxy() playwright_proxy_format, httpx_proxy_format = utils.format_proxy_info(ip_proxy_info) async with async_playwright() as playwright: @@ -380,6 +381,7 @@ class XiaoHongShuCrawler(AbstractCrawler): }, playwright_page=self.context_page, cookie_dict=cookie_dict, + proxy_ip_pool=self.ip_proxy_pool, # 传递代理池用于自动刷新 ) return xhs_client_obj diff --git a/media_platform/zhihu/client.py b/media_platform/zhihu/client.py index 4f13527..1904c44 100644 --- a/media_platform/zhihu/client.py +++ b/media_platform/zhihu/client.py @@ -20,7 +20,7 @@ # -*- coding: utf-8 -*- import asyncio import json -from typing import Any, Callable, Dict, List, Optional, Union +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union from urllib.parse import urlencode import httpx @@ -32,14 +32,18 @@ import config from base.base_crawler import AbstractApiClient from constant import zhihu as zhihu_constant from model.m_zhihu import ZhihuComment, ZhihuContent, ZhihuCreator +from proxy.proxy_mixin import ProxyRefreshMixin from tools import utils +if TYPE_CHECKING: + from proxy.proxy_ip_pool import ProxyIpPool + from .exception import DataFetchError, ForbiddenError from .field import SearchSort, SearchTime, SearchType from .help import ZhihuExtractor, sign -class ZhiHuClient(AbstractApiClient): +class ZhiHuClient(AbstractApiClient, ProxyRefreshMixin): def __init__( self, @@ -49,12 +53,15 @@ 
class ZhiHuClient(AbstractApiClient): headers: Dict[str, str], playwright_page: Page, cookie_dict: Dict[str, str], + proxy_ip_pool: Optional["ProxyIpPool"] = None, ): self.proxy = proxy self.timeout = timeout self.default_headers = headers self.cookie_dict = cookie_dict self._extractor = ZhihuExtractor() + # 初始化代理池(来自 ProxyRefreshMixin) + self.init_proxy_pool(proxy_ip_pool) async def _pre_headers(self, url: str) -> Dict: """ @@ -85,6 +92,9 @@ class ZhiHuClient(AbstractApiClient): Returns: """ + # 每次请求前检测代理是否过期 + await self._refresh_proxy_if_expired() + # return response.text return_response = kwargs.pop('return_response', False) diff --git a/media_platform/zhihu/core.py b/media_platform/zhihu/core.py index 0608eac..1a5f39e 100644 --- a/media_platform/zhihu/core.py +++ b/media_platform/zhihu/core.py @@ -61,6 +61,7 @@ class ZhihuCrawler(AbstractCrawler): self.user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36" self._extractor = ZhihuExtractor() self.cdp_manager = None + self.ip_proxy_pool = None # 代理IP池,用于代理自动刷新 async def start(self) -> None: """ @@ -70,10 +71,10 @@ class ZhihuCrawler(AbstractCrawler): """ playwright_proxy_format, httpx_proxy_format = None, None if config.ENABLE_IP_PROXY: - ip_proxy_pool = await create_ip_pool( + self.ip_proxy_pool = await create_ip_pool( config.IP_PROXY_POOL_COUNT, enable_validate_ip=True ) - ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy() + ip_proxy_info: IpInfoModel = await self.ip_proxy_pool.get_proxy() playwright_proxy_format, httpx_proxy_format = utils.format_proxy_info( ip_proxy_info ) @@ -411,6 +412,7 @@ class ZhihuCrawler(AbstractCrawler): }, playwright_page=self.context_page, cookie_dict=cookie_dict, + proxy_ip_pool=self.ip_proxy_pool, # 传递代理池用于自动刷新 ) return zhihu_client_obj diff --git a/proxy/proxy_ip_pool.py b/proxy/proxy_ip_pool.py index c60390e..c8f93d7 100644 --- a/proxy/proxy_ip_pool.py +++ b/proxy/proxy_ip_pool.py @@ -55,6 
+55,7 @@ class ProxyIpPool: self.enable_validate_ip = enable_validate_ip self.proxy_list: List[IpInfoModel] = [] self.ip_provider: ProxyProvider = ip_provider + self.current_proxy: IpInfoModel | None = None # 当前正在使用的代理 async def load_proxies(self) -> None: """ @@ -108,8 +109,37 @@ class ProxyIpPool: raise Exception( "[ProxyIpPool.get_proxy] current ip invalid and again get it" ) + self.current_proxy = proxy # 保存当前使用的代理 return proxy + def is_current_proxy_expired(self, buffer_seconds: int = 30) -> bool: + """ + 检测当前代理是否已过期 + Args: + buffer_seconds: 缓冲时间(秒),提前多少秒认为已过期 + Returns: + bool: True表示已过期或没有当前代理,False表示仍然有效 + """ + if self.current_proxy is None: + return True + return self.current_proxy.is_expired(buffer_seconds) + + async def get_or_refresh_proxy(self, buffer_seconds: int = 30) -> IpInfoModel: + """ + 获取当前代理,如果已过期则自动刷新 + 每次发起请求前调用此方法来确保代理有效 + Args: + buffer_seconds: 缓冲时间(秒),提前多少秒认为已过期 + Returns: + IpInfoModel: 有效的代理IP信息 + """ + if self.is_current_proxy_expired(buffer_seconds): + utils.logger.info( + f"[ProxyIpPool.get_or_refresh_proxy] Current proxy expired or not set, getting new proxy..." + ) + return await self.get_proxy() + return self.current_proxy + async def _reload_proxies(self): """ # 重新加载代理池 diff --git a/proxy/proxy_mixin.py b/proxy/proxy_mixin.py new file mode 100644 index 0000000..f0ce299 --- /dev/null +++ b/proxy/proxy_mixin.py @@ -0,0 +1,77 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2025 relakkes@gmail.com +# +# This file is part of MediaCrawler project. +# Repository: https://github.com/NanmiCoder/MediaCrawler +# GitHub: https://github.com/NanmiCoder +# Licensed under NON-COMMERCIAL LEARNING LICENSE 1.1 +# + +# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则: +# 1. 不得用于任何商业用途。 +# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。 +# 3. 不得进行大规模爬取或对平台造成运营干扰。 +# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。 +# 5. 
不得用于任何非法或不当的用途。 +# +# 详细许可条款请参阅项目根目录下的LICENSE文件。 +# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 + + +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Time : 2025/11/25 +# @Desc : 代理自动刷新 Mixin 类,供各平台 client 使用 + +from typing import TYPE_CHECKING, Optional + +from tools import utils + +if TYPE_CHECKING: + from proxy.proxy_ip_pool import ProxyIpPool + + +class ProxyRefreshMixin: + """ + 代理自动刷新 Mixin 类 + + 使用方法: + 1. 让 client 类继承此 Mixin + 2. 在 client 的 __init__ 中调用 init_proxy_pool(proxy_ip_pool) + 3. 在每次 request 方法调用前调用 await _refresh_proxy_if_expired() + + 要求: + - client 类必须有 self.proxy 属性来存储当前代理URL + """ + + _proxy_ip_pool: Optional["ProxyIpPool"] = None + + def init_proxy_pool(self, proxy_ip_pool: Optional["ProxyIpPool"]) -> None: + """ + 初始化代理池引用 + Args: + proxy_ip_pool: 代理IP池实例 + """ + self._proxy_ip_pool = proxy_ip_pool + + async def _refresh_proxy_if_expired(self) -> None: + """ + 检测代理是否过期,如果过期则自动刷新 + 每次发起请求前调用此方法来确保代理有效 + """ + if self._proxy_ip_pool is None: + return + + if self._proxy_ip_pool.is_current_proxy_expired(): + utils.logger.info( + f"[{self.__class__.__name__}._refresh_proxy_if_expired] Proxy expired, refreshing..." 
+ ) + new_proxy = await self._proxy_ip_pool.get_or_refresh_proxy() + # 更新 httpx 代理URL + if new_proxy.user and new_proxy.password: + self.proxy = f"http://{new_proxy.user}:{new_proxy.password}@{new_proxy.ip}:{new_proxy.port}" + else: + self.proxy = f"http://{new_proxy.ip}:{new_proxy.port}" + utils.logger.info( + f"[{self.__class__.__name__}._refresh_proxy_if_expired] New proxy: {new_proxy.ip}:{new_proxy.port}" + ) diff --git a/proxy/types.py b/proxy/types.py index 1c6b457..190650d 100644 --- a/proxy/types.py +++ b/proxy/types.py @@ -22,6 +22,7 @@ # @Author : relakkes@gmail.com # @Time : 2024/4/5 10:18 # @Desc : 基础类型 +import time from enum import Enum from typing import Optional @@ -41,4 +42,17 @@ class IpInfoModel(BaseModel): user: str = Field(title="IP代理认证的用户名") protocol: str = Field(default="https://", title="代理IP的协议") password: str = Field(title="IP代理认证用户的密码") - expired_time_ts: Optional[int] = Field(title="IP 过期时间") + expired_time_ts: Optional[int] = Field(default=None, title="IP 过期时间") + + def is_expired(self, buffer_seconds: int = 30) -> bool: + """ + 检测代理IP是否已过期 + Args: + buffer_seconds: 缓冲时间(秒),提前多少秒认为已过期,避免临界时间请求失败 + Returns: + bool: True表示已过期或即将过期,False表示仍然有效 + """ + if self.expired_time_ts is None: + return False + current_ts = int(time.time()) + return current_ts >= (self.expired_time_ts - buffer_seconds) From f1e71246549926abb35def7b49a4f8a41f65bcdc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A8=8B=E5=BA=8F=E5=91=98=E9=98=BF=E6=B1=9F=28Relakkes?= =?UTF-8?q?=29?= Date: Wed, 26 Nov 2025 16:01:54 +0800 Subject: [PATCH 2/2] fix: proxy extract error --- proxy/base_proxy.py | 2 +- proxy/providers/kuaidl_proxy.py | 32 +++- proxy/providers/wandou_http_proxy.py | 13 +- test/test_proxy_ip_pool.py | 237 ++++++++++++++++++++++++++- 4 files changed, 268 insertions(+), 16 deletions(-) diff --git a/proxy/base_proxy.py b/proxy/base_proxy.py index 8943f2f..f6b7278 100644 --- a/proxy/base_proxy.py +++ b/proxy/base_proxy.py @@ -53,7 +53,7 @@ class ProxyProvider(ABC): 
class IpCache: def __init__(self): - self.cache_client: AbstractCache = CacheFactory.create_cache(cache_type=config.CACHE_TYPE_MEMORY) + self.cache_client: AbstractCache = CacheFactory.create_cache(cache_type=config.CACHE_TYPE_REDIS) def set_ip(self, ip_key: str, ip_value_info: str, ex: int): """ diff --git a/proxy/providers/kuaidl_proxy.py b/proxy/providers/kuaidl_proxy.py index 5d21de5..9d43be8 100644 --- a/proxy/providers/kuaidl_proxy.py +++ b/proxy/providers/kuaidl_proxy.py @@ -33,11 +33,14 @@ from proxy import IpCache, IpInfoModel, ProxyProvider from proxy.types import ProviderNameEnum from tools import utils +# 快代理的IP代理过期时间向前推移5秒,避免临界时间使用失败 +DELTA_EXPIRED_SECOND = 5 + class KuaidailiProxyModel(BaseModel): ip: str = Field("ip") port: int = Field("端口") - expire_ts: int = Field("过期时间") + expire_ts: int = Field("过期时间,单位秒,多少秒后过期") def parse_kuaidaili_proxy(proxy_info: str) -> KuaidailiProxyModel: @@ -114,7 +117,7 @@ class KuaiDaiLiProxy(ProxyProvider): response = await client.get(self.api_base + uri, params=self.params) if response.status_code != 200: - utils.logger.error(f"[KuaiDaiLiProxy.get_proxies] statuc code not 200 and response.txt:{response.text}") + utils.logger.error(f"[KuaiDaiLiProxy.get_proxies] status code not 200 and response.text:{response.text}, status code: {response.status_code}") raise Exception("get ip error from proxy provider and status code not 200 ...") ip_response: Dict = response.json() @@ -125,16 +128,19 @@ class KuaiDaiLiProxy(ProxyProvider): proxy_list: List[str] = ip_response.get("data", {}).get("proxy_list") for proxy in proxy_list: proxy_model = parse_kuaidaili_proxy(proxy) + # expire_ts是相对时间(秒数),需要转换为绝对时间戳 + # 提前DELTA_EXPIRED_SECOND秒认为过期,避免临界时间使用失败 ip_info_model = IpInfoModel( ip=proxy_model.ip, port=proxy_model.port, user=self.kdl_user_name, password=self.kdl_user_pwd, - expired_time_ts=proxy_model.expire_ts, + expired_time_ts=proxy_model.expire_ts + utils.get_unix_timestamp() - DELTA_EXPIRED_SECOND, ) ip_key = 
f"{self.proxy_brand_name}_{ip_info_model.ip}_{ip_info_model.port}" - self.ip_cache.set_ip(ip_key, ip_info_model.model_dump_json(), ex=ip_info_model.expired_time_ts) + # 缓存过期时间使用相对时间(秒数),也需要减去缓冲时间 + self.ip_cache.set_ip(ip_key, ip_info_model.model_dump_json(), ex=proxy_model.expire_ts - DELTA_EXPIRED_SECOND) ip_infos.append(ip_info_model) return ip_cache_list + ip_infos @@ -143,12 +149,22 @@ class KuaiDaiLiProxy(ProxyProvider): def new_kuai_daili_proxy() -> KuaiDaiLiProxy: """ 构造快代理HTTP实例 + 支持两种环境变量命名格式: + 1. 大写格式:KDL_SECERT_ID, KDL_SIGNATURE, KDL_USER_NAME, KDL_USER_PWD + 2. 小写格式:kdl_secret_id, kdl_signature, kdl_user_name, kdl_user_pwd + 优先使用大写格式,如果不存在则使用小写格式 Returns: """ + # 支持大小写两种环境变量格式,优先使用大写 + kdl_secret_id = os.getenv("KDL_SECERT_ID") or os.getenv("kdl_secret_id", "你的快代理secert_id") + kdl_signature = os.getenv("KDL_SIGNATURE") or os.getenv("kdl_signature", "你的快代理签名") + kdl_user_name = os.getenv("KDL_USER_NAME") or os.getenv("kdl_user_name", "你的快代理用户名") + kdl_user_pwd = os.getenv("KDL_USER_PWD") or os.getenv("kdl_user_pwd", "你的快代理密码") + return KuaiDaiLiProxy( - kdl_secret_id=os.getenv("kdl_secret_id", "你的快代理secert_id"), - kdl_signature=os.getenv("kdl_signature", "你的快代理签名"), - kdl_user_name=os.getenv("kdl_user_name", "你的快代理用户名"), - kdl_user_pwd=os.getenv("kdl_user_pwd", "你的快代理密码"), + kdl_secret_id=kdl_secret_id, + kdl_signature=kdl_signature, + kdl_user_name=kdl_user_name, + kdl_user_pwd=kdl_user_pwd, ) diff --git a/proxy/providers/wandou_http_proxy.py b/proxy/providers/wandou_http_proxy.py index 71c6d88..4b05b0b 100644 --- a/proxy/providers/wandou_http_proxy.py +++ b/proxy/providers/wandou_http_proxy.py @@ -109,11 +109,14 @@ class WanDouHttpProxy(ProxyProvider): def new_wandou_http_proxy() -> WanDouHttpProxy: """ 构造豌豆HTTP实例 + 支持两种环境变量命名格式: + 1. 大写格式:WANDOU_APP_KEY + 2. 
小写格式:wandou_app_key + 优先使用大写格式,如果不存在则使用小写格式 Returns: """ - return WanDouHttpProxy( - app_key=os.getenv( - "wandou_app_key", "你的豌豆HTTP app_key" - ), # 通过环境变量的方式获取豌豆HTTP app_key - ) + # 支持大小写两种环境变量格式,优先使用大写 + app_key = os.getenv("WANDOU_APP_KEY") or os.getenv("wandou_app_key", "你的豌豆HTTP app_key") + + return WanDouHttpProxy(app_key=app_key) diff --git a/test/test_proxy_ip_pool.py b/test/test_proxy_ip_pool.py index 6f6be7a..58c4046 100644 --- a/test/test_proxy_ip_pool.py +++ b/test/test_proxy_ip_pool.py @@ -22,9 +22,11 @@ # @Author : relakkes@gmail.com # @Time : 2023/12/2 14:42 # @Desc : +import time from unittest import IsolatedAsyncioTestCase +from unittest.mock import AsyncMock, MagicMock -from proxy.proxy_ip_pool import create_ip_pool +from proxy.proxy_ip_pool import create_ip_pool, ProxyIpPool from proxy.types import IpInfoModel @@ -32,7 +34,238 @@ class TestIpPool(IsolatedAsyncioTestCase): async def test_ip_pool(self): pool = await create_ip_pool(ip_pool_count=1, enable_validate_ip=True) print("\n") - for i in range(3): + for _ in range(3): ip_proxy_info: IpInfoModel = await pool.get_proxy() print(ip_proxy_info) self.assertIsNotNone(ip_proxy_info.ip, msg="验证 ip 是否获取成功") + + async def test_ip_expiration(self): + """测试IP代理过期检测功能""" + print("\n=== 开始测试IP代理过期检测 ===") + + # 1. 创建IP池并获取一个代理 + pool = await create_ip_pool(ip_pool_count=2, enable_validate_ip=True) + ip_proxy_info: IpInfoModel = await pool.get_proxy() + print(f"获取到的代理: {ip_proxy_info.ip}:{ip_proxy_info.port}") + + # 2. 测试未过期的情况 + if ip_proxy_info.expired_time_ts: + print(f"代理过期时间戳: {ip_proxy_info.expired_time_ts}") + print(f"当前时间戳: {int(time.time())}") + print(f"剩余有效时间: {ip_proxy_info.expired_time_ts - int(time.time())} 秒") + + is_expired = ip_proxy_info.is_expired(buffer_seconds=30) + print(f"代理是否过期(缓冲30秒): {is_expired}") + self.assertFalse(is_expired, msg="新获取的IP应该未过期") + else: + print("当前代理未设置过期时间,跳过过期检测") + + # 3. 
测试即将过期的情况(设置为5分钟后过期) + current_ts = int(time.time()) + five_minutes_later = current_ts + 300 # 5分钟 = 300秒 + ip_proxy_info.expired_time_ts = five_minutes_later + print(f"\n设置代理过期时间为5分钟后: {five_minutes_later}") + + # 不应该过期(缓冲30秒) + is_expired_30s = ip_proxy_info.is_expired(buffer_seconds=30) + print(f"代理是否过期(缓冲30秒): {is_expired_30s}") + self.assertFalse(is_expired_30s, msg="5分钟后过期的IP,缓冲30秒不应该过期") + + # 4. 测试已过期的情况(设置为已经过期) + expired_ts = current_ts - 60 # 1分钟前已过期 + ip_proxy_info.expired_time_ts = expired_ts + print(f"\n设置代理过期时间为1分钟前: {expired_ts}") + + is_expired = ip_proxy_info.is_expired(buffer_seconds=30) + print(f"代理是否过期(缓冲30秒): {is_expired}") + self.assertTrue(is_expired, msg="已过期的IP应该被检测为过期") + + # 5. 测试临界过期情况(29秒后过期,缓冲30秒应该认为已过期) + almost_expired_ts = current_ts + 29 + ip_proxy_info.expired_time_ts = almost_expired_ts + print(f"\n设置代理过期时间为29秒后: {almost_expired_ts}") + + is_expired_critical = ip_proxy_info.is_expired(buffer_seconds=30) + print(f"代理是否过期(缓冲30秒): {is_expired_critical}") + self.assertTrue(is_expired_critical, msg="29秒后过期的IP,缓冲30秒应该被认为已过期") + + print("\n=== IP代理过期检测测试完成 ===") + + async def test_proxy_pool_auto_refresh(self): + """测试代理池自动刷新过期代理的功能""" + print("\n=== 开始测试代理池自动刷新功能 ===") + + # 1. 创建IP池 + pool = await create_ip_pool(ip_pool_count=2, enable_validate_ip=True) + + # 2. 获取一个代理 + first_proxy = await pool.get_proxy() + print(f"首次获取代理: {first_proxy.ip}:{first_proxy.port}") + + # 验证当前代理未过期 + is_expired = pool.is_current_proxy_expired(buffer_seconds=30) + print(f"当前代理是否过期: {is_expired}") + + if first_proxy.expired_time_ts: + print(f"当前代理过期时间戳: {first_proxy.expired_time_ts}") + + # 3. 手动设置当前代理为已过期 + current_ts = int(time.time()) + pool.current_proxy.expired_time_ts = current_ts - 60 + print(f"\n手动设置代理为已过期(1分钟前)") + + # 4. 检测是否过期 + is_expired_after = pool.is_current_proxy_expired(buffer_seconds=30) + print(f"设置后代理是否过期: {is_expired_after}") + self.assertTrue(is_expired_after, msg="手动设置过期后应该被检测为过期") + + # 5. 
使用 get_or_refresh_proxy 自动刷新 + print("\n调用 get_or_refresh_proxy 自动刷新过期代理...") + refreshed_proxy = await pool.get_or_refresh_proxy(buffer_seconds=30) + print(f"刷新后的代理: {refreshed_proxy.ip}:{refreshed_proxy.port}") + + # 6. 验证新代理未过期 + is_new_expired = pool.is_current_proxy_expired(buffer_seconds=30) + print(f"新代理是否过期: {is_new_expired}") + self.assertFalse(is_new_expired, msg="刷新后的新代理应该未过期") + + print("\n=== 代理池自动刷新测试完成 ===") + else: + print("当前代理未设置过期时间,跳过自动刷新测试") + + async def test_ip_expiration_standalone(self): + """独立测试IP过期检测功能(不依赖真实代理提供商)""" + print("\n=== 开始独立测试IP代理过期检测功能 ===") + + current_ts = int(time.time()) + + # 1. 测试未设置过期时间的IP(永不过期) + ip_no_expire = IpInfoModel( + ip="192.168.1.1", + port=8080, + user="test_user", + password="test_pwd", + expired_time_ts=None + ) + print(f"\n测试1: IP未设置过期时间") + is_expired = ip_no_expire.is_expired(buffer_seconds=30) + print(f" 代理: {ip_no_expire.ip}:{ip_no_expire.port}") + print(f" 过期时间: {ip_no_expire.expired_time_ts}") + print(f" 是否过期: {is_expired}") + self.assertFalse(is_expired, msg="未设置过期时间的IP应该永不过期") + + # 2. 测试5分钟后过期的IP(应该未过期) + five_minutes_later = current_ts + 300 + ip_valid = IpInfoModel( + ip="192.168.1.2", + port=8080, + user="test_user", + password="test_pwd", + expired_time_ts=five_minutes_later + ) + print(f"\n测试2: IP将在5分钟后过期") + is_expired = ip_valid.is_expired(buffer_seconds=30) + print(f" 代理: {ip_valid.ip}:{ip_valid.port}") + print(f" 当前时间戳: {current_ts}") + print(f" 过期时间戳: {ip_valid.expired_time_ts}") + print(f" 剩余时间: {ip_valid.expired_time_ts - current_ts} 秒") + print(f" 是否过期(缓冲30秒): {is_expired}") + self.assertFalse(is_expired, msg="5分钟后过期的IP,缓冲30秒不应该过期") + + # 3. 
测试已过期的IP + already_expired = current_ts - 60 + ip_expired = IpInfoModel( + ip="192.168.1.3", + port=8080, + user="test_user", + password="test_pwd", + expired_time_ts=already_expired + ) + print(f"\n测试3: IP已经过期(1分钟前)") + is_expired = ip_expired.is_expired(buffer_seconds=30) + print(f" 代理: {ip_expired.ip}:{ip_expired.port}") + print(f" 当前时间戳: {current_ts}") + print(f" 过期时间戳: {ip_expired.expired_time_ts}") + print(f" 已过期: {current_ts - ip_expired.expired_time_ts} 秒") + print(f" 是否过期(缓冲30秒): {is_expired}") + self.assertTrue(is_expired, msg="已过期的IP应该被检测为过期") + + # 4. 测试临界过期(29秒后过期,缓冲30秒应该认为已过期) + almost_expired = current_ts + 29 + ip_critical = IpInfoModel( + ip="192.168.1.4", + port=8080, + user="test_user", + password="test_pwd", + expired_time_ts=almost_expired + ) + print(f"\n测试4: IP即将过期(29秒后)") + is_expired = ip_critical.is_expired(buffer_seconds=30) + print(f" 代理: {ip_critical.ip}:{ip_critical.port}") + print(f" 当前时间戳: {current_ts}") + print(f" 过期时间戳: {ip_critical.expired_time_ts}") + print(f" 剩余时间: {ip_critical.expired_time_ts - current_ts} 秒") + print(f" 是否过期(缓冲30秒): {is_expired}") + self.assertTrue(is_expired, msg="29秒后过期的IP,缓冲30秒应该被认为已过期") + + # 5. 测试31秒后过期(缓冲30秒应该未过期) + just_safe = current_ts + 31 + ip_just_safe = IpInfoModel( + ip="192.168.1.5", + port=8080, + user="test_user", + password="test_pwd", + expired_time_ts=just_safe + ) + print(f"\n测试5: IP在安全范围内(31秒后过期)") + is_expired = ip_just_safe.is_expired(buffer_seconds=30) + print(f" 代理: {ip_just_safe.ip}:{ip_just_safe.port}") + print(f" 当前时间戳: {current_ts}") + print(f" 过期时间戳: {ip_just_safe.expired_time_ts}") + print(f" 剩余时间: {ip_just_safe.expired_time_ts - current_ts} 秒") + print(f" 是否过期(缓冲30秒): {is_expired}") + self.assertFalse(is_expired, msg="31秒后过期的IP,缓冲30秒应该未过期") + + # 6. 
测试ProxyIpPool的过期检测 + print(f"\n测试6: ProxyIpPool的过期检测功能") + mock_provider = MagicMock() + mock_provider.get_proxy = AsyncMock(return_value=[]) + + pool = ProxyIpPool( + ip_pool_count=1, + enable_validate_ip=False, + ip_provider=mock_provider + ) + + # 6.1 测试无当前代理时 + is_expired = pool.is_current_proxy_expired(buffer_seconds=30) + print(f" 无当前代理时是否过期: {is_expired}") + self.assertTrue(is_expired, msg="无当前代理时应该返回True") + + # 6.2 设置一个有效的代理 + valid_proxy = IpInfoModel( + ip="192.168.1.6", + port=8080, + user="test_user", + password="test_pwd", + expired_time_ts=current_ts + 300 # 5分钟后过期 + ) + pool.current_proxy = valid_proxy + is_expired = pool.is_current_proxy_expired(buffer_seconds=30) + print(f" 设置有效代理后是否过期: {is_expired}") + self.assertFalse(is_expired, msg="有效的代理应该返回False") + + # 6.3 设置一个已过期的代理 + expired_proxy = IpInfoModel( + ip="192.168.1.7", + port=8080, + user="test_user", + password="test_pwd", + expired_time_ts=current_ts - 60 # 1分钟前已过期 + ) + pool.current_proxy = expired_proxy + is_expired = pool.is_current_proxy_expired(buffer_seconds=30) + print(f" 设置已过期代理后是否过期: {is_expired}") + self.assertTrue(is_expired, msg="已过期的代理应该返回True") + + print("\n=== 独立IP代理过期检测测试完成 ===\n")