diff --git a/media_platform/bilibili/client.py b/media_platform/bilibili/client.py index c938dfe..641e281 100644 --- a/media_platform/bilibili/client.py +++ b/media_platform/bilibili/client.py @@ -24,7 +24,7 @@ import asyncio import json import random -from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union from urllib.parse import urlencode import httpx @@ -32,14 +32,18 @@ from playwright.async_api import BrowserContext, Page import config from base.base_crawler import AbstractApiClient +from proxy.proxy_mixin import ProxyRefreshMixin from tools import utils +if TYPE_CHECKING: + from proxy.proxy_ip_pool import ProxyIpPool + from .exception import DataFetchError from .field import CommentOrderType, SearchOrderType from .help import BilibiliSign -class BilibiliClient(AbstractApiClient): +class BilibiliClient(AbstractApiClient, ProxyRefreshMixin): def __init__( self, @@ -49,6 +53,7 @@ class BilibiliClient(AbstractApiClient): headers: Dict[str, str], playwright_page: Page, cookie_dict: Dict[str, str], + proxy_ip_pool: Optional["ProxyIpPool"] = None, ): self.proxy = proxy self.timeout = timeout @@ -56,8 +61,13 @@ class BilibiliClient(AbstractApiClient): self._host = "https://api.bilibili.com" self.playwright_page = playwright_page self.cookie_dict = cookie_dict + # 初始化代理池(来自 ProxyRefreshMixin) + self.init_proxy_pool(proxy_ip_pool) async def request(self, method, url, **kwargs) -> Any: + # 每次请求前检测代理是否过期 + await self._refresh_proxy_if_expired() + async with httpx.AsyncClient(proxy=self.proxy) as client: response = await client.request(method, url, timeout=self.timeout, **kwargs) try: diff --git a/media_platform/bilibili/core.py b/media_platform/bilibili/core.py index 01ab91a..17e63f4 100644 --- a/media_platform/bilibili/core.py +++ b/media_platform/bilibili/core.py @@ -64,12 +64,13 @@ class BilibiliCrawler(AbstractCrawler): self.index_url = "https://www.bilibili.com" 
self.user_agent = utils.get_user_agent() self.cdp_manager = None + self.ip_proxy_pool = None # 代理IP池,用于代理自动刷新 async def start(self): playwright_proxy_format, httpx_proxy_format = None, None if config.ENABLE_IP_PROXY: - ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True) - ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy() + self.ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True) + ip_proxy_info: IpInfoModel = await self.ip_proxy_pool.get_proxy() playwright_proxy_format, httpx_proxy_format = utils.format_proxy_info(ip_proxy_info) async with async_playwright() as playwright: @@ -473,6 +474,7 @@ class BilibiliCrawler(AbstractCrawler): }, playwright_page=self.context_page, cookie_dict=cookie_dict, + proxy_ip_pool=self.ip_proxy_pool, # 传递代理池用于自动刷新 ) return bilibili_client_obj diff --git a/media_platform/douyin/client.py b/media_platform/douyin/client.py index e7b6765..b080f83 100644 --- a/media_platform/douyin/client.py +++ b/media_platform/douyin/client.py @@ -21,21 +21,25 @@ import asyncio import copy import json import urllib.parse -from typing import Any, Callable, Dict, Union, Optional +from typing import TYPE_CHECKING, Any, Callable, Dict, Union, Optional import httpx from playwright.async_api import BrowserContext from base.base_crawler import AbstractApiClient +from proxy.proxy_mixin import ProxyRefreshMixin from tools import utils from var import request_keyword_var +if TYPE_CHECKING: + from proxy.proxy_ip_pool import ProxyIpPool + from .exception import * from .field import * from .help import * -class DouYinClient(AbstractApiClient): +class DouYinClient(AbstractApiClient, ProxyRefreshMixin): def __init__( self, @@ -45,6 +49,7 @@ class DouYinClient(AbstractApiClient): headers: Dict, playwright_page: Optional[Page], cookie_dict: Dict, + proxy_ip_pool: Optional["ProxyIpPool"] = None, ): self.proxy = proxy self.timeout = timeout @@ -52,6 +57,8 @@ class 
DouYinClient(AbstractApiClient): self._host = "https://www.douyin.com" self.playwright_page = playwright_page self.cookie_dict = cookie_dict + # 初始化代理池(来自 ProxyRefreshMixin) + self.init_proxy_pool(proxy_ip_pool) async def __process_req_params( self, @@ -106,6 +113,9 @@ class DouYinClient(AbstractApiClient): params["a_bogus"] = a_bogus async def request(self, method, url, **kwargs): + # 每次请求前检测代理是否过期 + await self._refresh_proxy_if_expired() + async with httpx.AsyncClient(proxy=self.proxy) as client: response = await client.request(method, url, timeout=self.timeout, **kwargs) try: diff --git a/media_platform/douyin/core.py b/media_platform/douyin/core.py index 4c856bb..3a0ec5d 100644 --- a/media_platform/douyin/core.py +++ b/media_platform/douyin/core.py @@ -55,12 +55,13 @@ class DouYinCrawler(AbstractCrawler): def __init__(self) -> None: self.index_url = "https://www.douyin.com" self.cdp_manager = None + self.ip_proxy_pool = None # 代理IP池,用于代理自动刷新 async def start(self) -> None: playwright_proxy_format, httpx_proxy_format = None, None if config.ENABLE_IP_PROXY: - ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True) - ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy() + self.ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True) + ip_proxy_info: IpInfoModel = await self.ip_proxy_pool.get_proxy() playwright_proxy_format, httpx_proxy_format = utils.format_proxy_info(ip_proxy_info) async with async_playwright() as playwright: @@ -305,6 +306,7 @@ class DouYinCrawler(AbstractCrawler): }, playwright_page=self.context_page, cookie_dict=cookie_dict, + proxy_ip_pool=self.ip_proxy_pool, # 传递代理池用于自动刷新 ) return douyin_client diff --git a/media_platform/kuaishou/client.py b/media_platform/kuaishou/client.py index 28dddc9..e402380 100644 --- a/media_platform/kuaishou/client.py +++ b/media_platform/kuaishou/client.py @@ -21,7 +21,7 @@ # -*- coding: utf-8 -*- import asyncio import json -from typing 
import Any, Callable, Dict, List, Optional +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional from urllib.parse import urlencode import httpx @@ -29,13 +29,17 @@ from playwright.async_api import BrowserContext, Page import config from base.base_crawler import AbstractApiClient +from proxy.proxy_mixin import ProxyRefreshMixin from tools import utils +if TYPE_CHECKING: + from proxy.proxy_ip_pool import ProxyIpPool + from .exception import DataFetchError from .graphql import KuaiShouGraphQL -class KuaiShouClient(AbstractApiClient): +class KuaiShouClient(AbstractApiClient, ProxyRefreshMixin): def __init__( self, timeout=10, @@ -44,6 +48,7 @@ class KuaiShouClient(AbstractApiClient): headers: Dict[str, str], playwright_page: Page, cookie_dict: Dict[str, str], + proxy_ip_pool: Optional["ProxyIpPool"] = None, ): self.proxy = proxy self.timeout = timeout @@ -52,8 +57,13 @@ class KuaiShouClient(AbstractApiClient): self.playwright_page = playwright_page self.cookie_dict = cookie_dict self.graphql = KuaiShouGraphQL() + # 初始化代理池(来自 ProxyRefreshMixin) + self.init_proxy_pool(proxy_ip_pool) async def request(self, method, url, **kwargs) -> Any: + # 每次请求前检测代理是否过期 + await self._refresh_proxy_if_expired() + async with httpx.AsyncClient(proxy=self.proxy) as client: response = await client.request(method, url, timeout=self.timeout, **kwargs) data: Dict = response.json() diff --git a/media_platform/kuaishou/core.py b/media_platform/kuaishou/core.py index 4fe3d5c..b0655f4 100644 --- a/media_platform/kuaishou/core.py +++ b/media_platform/kuaishou/core.py @@ -58,14 +58,15 @@ class KuaishouCrawler(AbstractCrawler): self.index_url = "https://www.kuaishou.com" self.user_agent = utils.get_user_agent() self.cdp_manager = None + self.ip_proxy_pool = None # 代理IP池,用于代理自动刷新 async def start(self): playwright_proxy_format, httpx_proxy_format = None, None if config.ENABLE_IP_PROXY: - ip_proxy_pool = await create_ip_pool( + self.ip_proxy_pool = await create_ip_pool( 
async def _refresh_proxy_if_expired(self) -> None:
    """Swap in a fresh proxy when the pool reports the current one as expired.

    Returns immediately when this client has no proxy pool (``self.ip_pool``
    is ``None``) or when the pool says its current proxy is still usable.
    Otherwise a proxy is requested from the pool and the client's default
    proxy (``self.default_ip_proxy``) is replaced with it.
    """
    pool = self.ip_pool
    if pool is None or not pool.is_current_proxy_expired():
        return

    utils.logger.info(
        "[BaiduTieBaClient._refresh_proxy_if_expired] Proxy expired, refreshing..."
    )
    fresh_proxy = await pool.get_or_refresh_proxy()

    # format_proxy_info yields a pair; only the second element is stored as
    # this client's default proxy.
    _unused, proxy_url = utils.format_proxy_info(fresh_proxy)
    self.default_ip_proxy = proxy_url

    utils.logger.info(
        f"[BaiduTieBaClient._refresh_proxy_if_expired] New proxy: {fresh_proxy.ip}:{fresh_proxy.port}"
    )
Optional, Union from urllib.parse import parse_qs, unquote, urlencode import httpx @@ -35,13 +35,17 @@ from playwright.async_api import BrowserContext, Page from tenacity import retry, stop_after_attempt, wait_fixed import config +from proxy.proxy_mixin import ProxyRefreshMixin from tools import utils +if TYPE_CHECKING: + from proxy.proxy_ip_pool import ProxyIpPool + from .exception import DataFetchError from .field import SearchType -class WeiboClient: +class WeiboClient(ProxyRefreshMixin): def __init__( self, @@ -51,6 +55,7 @@ class WeiboClient: headers: Dict[str, str], playwright_page: Page, cookie_dict: Dict[str, str], + proxy_ip_pool: Optional["ProxyIpPool"] = None, ): self.proxy = proxy self.timeout = timeout @@ -59,9 +64,14 @@ class WeiboClient: self.playwright_page = playwright_page self.cookie_dict = cookie_dict self._image_agent_host = "https://i1.wp.com/" + # 初始化代理池(来自 ProxyRefreshMixin) + self.init_proxy_pool(proxy_ip_pool) @retry(stop=stop_after_attempt(5), wait=wait_fixed(3)) async def request(self, method, url, **kwargs) -> Union[Response, Dict]: + # 每次请求前检测代理是否过期 + await self._refresh_proxy_if_expired() + enable_return_response = kwargs.pop("return_response", False) async with httpx.AsyncClient(proxy=self.proxy) as client: response = await client.request(method, url, timeout=self.timeout, **kwargs) diff --git a/media_platform/weibo/core.py b/media_platform/weibo/core.py index 68fa6fa..9bea3f5 100644 --- a/media_platform/weibo/core.py +++ b/media_platform/weibo/core.py @@ -63,12 +63,13 @@ class WeiboCrawler(AbstractCrawler): self.user_agent = utils.get_user_agent() self.mobile_user_agent = utils.get_mobile_user_agent() self.cdp_manager = None + self.ip_proxy_pool = None # 代理IP池,用于代理自动刷新 async def start(self): playwright_proxy_format, httpx_proxy_format = None, None if config.ENABLE_IP_PROXY: - ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True) - ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy() + 
self.ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True) + ip_proxy_info: IpInfoModel = await self.ip_proxy_pool.get_proxy() playwright_proxy_format, httpx_proxy_format = utils.format_proxy_info(ip_proxy_info) async with async_playwright() as playwright: @@ -334,6 +335,7 @@ class WeiboCrawler(AbstractCrawler): }, playwright_page=self.context_page, cookie_dict=cookie_dict, + proxy_ip_pool=self.ip_proxy_pool, # 传递代理池用于自动刷新 ) return weibo_client_obj diff --git a/media_platform/xhs/client.py b/media_platform/xhs/client.py index dcb0cda..0a8f074 100644 --- a/media_platform/xhs/client.py +++ b/media_platform/xhs/client.py @@ -20,7 +20,7 @@ import asyncio import json import time -from typing import Any, Callable, Dict, List, Optional, Union +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union from urllib.parse import urlencode, urlparse, parse_qs @@ -31,8 +31,11 @@ from xhshow import Xhshow import config from base.base_crawler import AbstractApiClient +from proxy.proxy_mixin import ProxyRefreshMixin from tools import utils +if TYPE_CHECKING: + from proxy.proxy_ip_pool import ProxyIpPool from .exception import DataFetchError, IPBlockError from .field import SearchNoteType, SearchSortType @@ -40,7 +43,7 @@ from .help import get_search_id, sign from .extractor import XiaoHongShuExtractor -class XiaoHongShuClient(AbstractApiClient): +class XiaoHongShuClient(AbstractApiClient, ProxyRefreshMixin): def __init__( self, @@ -50,6 +53,7 @@ class XiaoHongShuClient(AbstractApiClient): headers: Dict[str, str], playwright_page: Page, cookie_dict: Dict[str, str], + proxy_ip_pool: Optional["ProxyIpPool"] = None, ): self.proxy = proxy self.timeout = timeout @@ -65,6 +69,8 @@ class XiaoHongShuClient(AbstractApiClient): self._extractor = XiaoHongShuExtractor() # 初始化 xhshow 客户端用于签名生成 self._xhshow_client = Xhshow() + # 初始化代理池(来自 ProxyRefreshMixin) + self.init_proxy_pool(proxy_ip_pool) async def _pre_headers(self, url: str, params: 
Optional[Dict] = None, payload: Optional[Dict] = None) -> Dict: """请求头参数签名 @@ -132,6 +138,9 @@ class XiaoHongShuClient(AbstractApiClient): Returns: """ + # 每次请求前检测代理是否过期 + await self._refresh_proxy_if_expired() + # return response.text return_response = kwargs.pop("return_response", False) async with httpx.AsyncClient(proxy=self.proxy) as client: @@ -201,6 +210,9 @@ class XiaoHongShuClient(AbstractApiClient): ) async def get_note_media(self, url: str) -> Union[bytes, None]: + # 请求前检测代理是否过期 + await self._refresh_proxy_if_expired() + async with httpx.AsyncClient(proxy=self.proxy) as client: try: response = await client.request("GET", url, timeout=self.timeout) diff --git a/media_platform/xhs/core.py b/media_platform/xhs/core.py index cbf49e0..06f6115 100644 --- a/media_platform/xhs/core.py +++ b/media_platform/xhs/core.py @@ -60,12 +60,13 @@ class XiaoHongShuCrawler(AbstractCrawler): # self.user_agent = utils.get_user_agent() self.user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36" self.cdp_manager = None + self.ip_proxy_pool = None # 代理IP池,用于代理自动刷新 async def start(self) -> None: playwright_proxy_format, httpx_proxy_format = None, None if config.ENABLE_IP_PROXY: - ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True) - ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy() + self.ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True) + ip_proxy_info: IpInfoModel = await self.ip_proxy_pool.get_proxy() playwright_proxy_format, httpx_proxy_format = utils.format_proxy_info(ip_proxy_info) async with async_playwright() as playwright: @@ -380,6 +381,7 @@ class XiaoHongShuCrawler(AbstractCrawler): }, playwright_page=self.context_page, cookie_dict=cookie_dict, + proxy_ip_pool=self.ip_proxy_pool, # 传递代理池用于自动刷新 ) return xhs_client_obj diff --git a/media_platform/zhihu/client.py b/media_platform/zhihu/client.py index 
4f13527..1904c44 100644 --- a/media_platform/zhihu/client.py +++ b/media_platform/zhihu/client.py @@ -20,7 +20,7 @@ # -*- coding: utf-8 -*- import asyncio import json -from typing import Any, Callable, Dict, List, Optional, Union +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union from urllib.parse import urlencode import httpx @@ -32,14 +32,18 @@ import config from base.base_crawler import AbstractApiClient from constant import zhihu as zhihu_constant from model.m_zhihu import ZhihuComment, ZhihuContent, ZhihuCreator +from proxy.proxy_mixin import ProxyRefreshMixin from tools import utils +if TYPE_CHECKING: + from proxy.proxy_ip_pool import ProxyIpPool + from .exception import DataFetchError, ForbiddenError from .field import SearchSort, SearchTime, SearchType from .help import ZhihuExtractor, sign -class ZhiHuClient(AbstractApiClient): +class ZhiHuClient(AbstractApiClient, ProxyRefreshMixin): def __init__( self, @@ -49,12 +53,15 @@ class ZhiHuClient(AbstractApiClient): headers: Dict[str, str], playwright_page: Page, cookie_dict: Dict[str, str], + proxy_ip_pool: Optional["ProxyIpPool"] = None, ): self.proxy = proxy self.timeout = timeout self.default_headers = headers self.cookie_dict = cookie_dict self._extractor = ZhihuExtractor() + # 初始化代理池(来自 ProxyRefreshMixin) + self.init_proxy_pool(proxy_ip_pool) async def _pre_headers(self, url: str) -> Dict: """ @@ -85,6 +92,9 @@ class ZhiHuClient(AbstractApiClient): Returns: """ + # 每次请求前检测代理是否过期 + await self._refresh_proxy_if_expired() + # return response.text return_response = kwargs.pop('return_response', False) diff --git a/media_platform/zhihu/core.py b/media_platform/zhihu/core.py index 0608eac..1a5f39e 100644 --- a/media_platform/zhihu/core.py +++ b/media_platform/zhihu/core.py @@ -61,6 +61,7 @@ class ZhihuCrawler(AbstractCrawler): self.user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36" self._extractor = 
def is_current_proxy_expired(self, buffer_seconds: int = 30) -> bool:
    """Report whether the proxy currently handed out needs replacing.

    Args:
        buffer_seconds: Safety margin in seconds, forwarded unchanged to the
            current proxy's ``is_expired`` check.

    Returns:
        ``True`` when no proxy has been recorded as current yet, otherwise
        whatever the current proxy's ``is_expired(buffer_seconds)`` reports.
    """
    active_proxy = self.current_proxy
    # A pool that never handed out a proxy is treated as "expired" so that
    # callers fetch one.
    return True if active_proxy is None else active_proxy.is_expired(buffer_seconds)
class ProxyRefreshMixin:
    """Mixin that keeps an API client's httpx proxy URL fresh.

    Usage:
        1. Add this mixin to the client's base classes.
        2. Call ``init_proxy_pool(proxy_ip_pool)`` from the client's
           ``__init__``.
        3. ``await self._refresh_proxy_if_expired()`` before each outgoing
           request.

    Requirement:
        The host class keeps the proxy URL it passes to httpx in
        ``self.proxy``; this mixin overwrites that attribute whenever the
        proxy is rotated.
    """

    # Class-level default: a client that never calls init_proxy_pool() still
    # has the attribute, so the refresh check is a harmless no-op.
    _proxy_ip_pool: Optional["ProxyIpPool"] = None

    def init_proxy_pool(self, proxy_ip_pool: Optional["ProxyIpPool"]) -> None:
        """Remember the proxy pool used for later refreshes.

        Args:
            proxy_ip_pool: Pool instance to pull replacement proxies from, or
                ``None`` to disable automatic refreshing for this client.
        """
        self._proxy_ip_pool = proxy_ip_pool

    async def _refresh_proxy_if_expired(self) -> None:
        """Replace ``self.proxy`` when the pool's current proxy has expired.

        Does nothing when no pool was configured or when the pool reports
        that its current proxy is still valid.
        """
        pool = self._proxy_ip_pool
        if pool is None or not pool.is_current_proxy_expired():
            return

        utils.logger.info(
            f"[{self.__class__.__name__}._refresh_proxy_if_expired] Proxy expired, refreshing..."
        )
        new_proxy = await pool.get_or_refresh_proxy()
        # Build the httpx proxy URL with the same helper the crawlers use for
        # the initial proxy, so a refreshed URL cannot drift from that format.
        # The helper returns a (playwright, httpx) pair; only the httpx part
        # is needed here.
        _, self.proxy = utils.format_proxy_info(new_proxy)
        utils.logger.info(
            f"[{self.__class__.__name__}._refresh_proxy_if_expired] New proxy: {new_proxy.ip}:{new_proxy.port}"
        )