mirror of
https://github.com/NanmiCoder/MediaCrawler.git
synced 2026-06-08 19:07:33 +08:00
feat: ip proxy expired check
This commit is contained in:
@@ -22,6 +22,7 @@
|
||||
# @Author : relakkes@gmail.com
|
||||
# @Time : 2024/4/5 10:18
|
||||
# @Desc : 基础类型
|
||||
import time
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
@@ -41,4 +42,17 @@ class IpInfoModel(BaseModel):
|
||||
user: str = Field(title="IP代理认证的用户名")
|
||||
protocol: str = Field(default="https://", title="代理IP的协议")
|
||||
password: str = Field(title="IP代理认证用户的密码")
|
||||
expired_time_ts: Optional[int] = Field(title="IP 过期时间")
|
||||
expired_time_ts: Optional[int] = Field(default=None, title="IP 过期时间")
|
||||
|
||||
def is_expired(self, buffer_seconds: int = 30) -> bool:
|
||||
"""
|
||||
检测代理IP是否已过期
|
||||
Args:
|
||||
buffer_seconds: 缓冲时间(秒),提前多少秒认为已过期,避免临界时间请求失败
|
||||
Returns:
|
||||
bool: True表示已过期或即将过期,False表示仍然有效
|
||||
"""
|
||||
if self.expired_time_ts is None:
|
||||
return False
|
||||
current_ts = int(time.time())
|
||||
return current_ts >= (self.expired_time_ts - buffer_seconds)
|
||||
|
||||
Reference in New Issue
Block a user