Files
LiteOps/backend/apps/utils/security.py
2025-06-12 16:48:37 +08:00

214 lines
7.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import re
import hashlib
import logging
from datetime import datetime, timedelta
from django.utils import timezone
from ..models import SecurityConfig, LoginAttempt, User
logger = logging.getLogger('apps')
class SecurityValidator:
"""安全验证工具类"""
@staticmethod
def get_security_config():
"""获取安全配置"""
try:
config, created = SecurityConfig.objects.get_or_create(
id=1,
defaults={
'min_password_length': 8,
'password_complexity': ['lowercase', 'number'],
'session_timeout': 120,
'max_login_attempts': 5,
'lockout_duration': 30,
'enable_2fa': False
}
)
return config
except Exception as e:
logger.error(f'获取安全配置失败: {str(e)}')
# 返回默认配置
return type('SecurityConfig', (), {
'min_password_length': 8,
'password_complexity': ['lowercase', 'number'],
'session_timeout': 120,
'max_login_attempts': 5,
'lockout_duration': 30,
'enable_2fa': False
})()
@staticmethod
def validate_password(password):
"""验证密码是否符合安全策略"""
config = SecurityValidator.get_security_config()
# 检查密码长度
if len(password) < config.min_password_length:
return False, f'密码长度不能少于{config.min_password_length}'
# 检查密码复杂度
complexity_checks = {
'uppercase': (r'[A-Z]', '大写字母'),
'lowercase': (r'[a-z]', '小写字母'),
'number': (r'[0-9]', '数字'),
'special': (r'[!@#$%^&*(),.?":{}|<>]', '特殊字符')
}
missing_requirements = []
for requirement in config.password_complexity:
if requirement in complexity_checks:
pattern, description = complexity_checks[requirement]
if not re.search(pattern, password):
missing_requirements.append(description)
if missing_requirements:
return False, f'密码必须包含: {", ".join(missing_requirements)}'
return True, '密码验证通过'
@staticmethod
def check_account_lockout(user, ip_address):
"""检查账户是否被锁定"""
try:
# 首先检查用户表中的status字段
if user.status == 0:
return False, '账户已被锁定,请联系管理员解锁'
config = SecurityValidator.get_security_config()
# 获取登录尝试记录
try:
attempt = LoginAttempt.objects.get(user=user, ip_address=ip_address)
except LoginAttempt.DoesNotExist:
return True, '账户未被锁定'
# 如果账户被锁定,检查是否已过期
if attempt.locked_until and attempt.locked_until > timezone.now():
remaining_time = attempt.locked_until - timezone.now()
minutes = int(remaining_time.total_seconds() / 60)
return False, f'账户因登录失败次数过多被临时锁定,请在{minutes}分钟后重试'
# 如果临时锁定已过期,重置失败次数并解锁
if attempt.locked_until and attempt.locked_until <= timezone.now():
attempt.failed_attempts = 0
attempt.locked_until = None
attempt.save()
# 如果用户被系统锁定status=0检查是否需要自动解锁
if user.status == 0:
# 暂时保持锁定状态,需要管理员手动解锁
return False, '账户已被锁定,请联系管理员解锁'
return True, '账户未被锁定'
except Exception as e:
logger.error(f'检查账户锁定状态失败: {str(e)}')
return True, '锁定检查跳过'
@staticmethod
def record_failed_login(user, ip_address):
"""记录登录失败"""
try:
config = SecurityValidator.get_security_config()
# 获取或创建登录尝试记录
attempt, created = LoginAttempt.objects.get_or_create(
user=user,
ip_address=ip_address,
defaults={'failed_attempts': 0}
)
# 增加失败次数
attempt.failed_attempts += 1
attempt.last_attempt_time = timezone.now()
# 如果达到最大失败次数,锁定账户
if attempt.failed_attempts >= config.max_login_attempts:
attempt.locked_until = timezone.now() + timedelta(minutes=config.lockout_duration)
# 同时将用户状态设置为锁定
user.status = 0
user.save()
logger.warning(f'用户[{user.username}]账户因多次登录失败被锁定IP: {ip_address}')
attempt.save()
return attempt.failed_attempts, config.max_login_attempts
except Exception as e:
logger.error(f'记录登录失败失败: {str(e)}')
return 0, 5
@staticmethod
def record_successful_login(user, ip_address):
"""记录登录成功,重置失败次数"""
try:
# 清除登录失败记录
LoginAttempt.objects.filter(
user=user,
ip_address=ip_address
).delete()
except Exception as e:
logger.error(f'重置登录失败记录失败: {str(e)}')
@staticmethod
def unlock_user_account(user):
"""解锁用户账户(管理员操作)"""
try:
# 清除所有登录尝试记录
LoginAttempt.objects.filter(user=user).delete()
# 解锁用户状态
if user.status == 0:
user.status = 1
user.save()
logger.info(f'管理员解锁了用户[{user.username}]的账户')
return True, '账户解锁成功'
return True, '账户状态正常'
except Exception as e:
logger.error(f'解锁用户账户失败: {str(e)}')
return False, '解锁失败'
@staticmethod
def lock_user_account(user):
"""锁定用户账户(管理员操作)"""
try:
# 锁定用户状态
if user.status == 1:
user.status = 0
user.save()
logger.info(f'管理员锁定了用户[{user.username}]的账户')
return True, '账户锁定成功'
return True, '账户已处于锁定状态'
except Exception as e:
logger.error(f'锁定用户账户失败: {str(e)}')
return False, '锁定失败'
@staticmethod
def is_session_expired(login_time):
"""检查会话是否过期"""
try:
config = SecurityValidator.get_security_config()
if not login_time:
return True
# 计算会话过期时间
session_expire_time = login_time + timedelta(minutes=config.session_timeout)
return timezone.now() > session_expire_time
except Exception as e:
logger.error(f'检查会话过期失败: {str(e)}')
return False
def validate_password_strength(password):
"""独立的密码强度验证函数,供其他模块使用"""
return SecurityValidator.validate_password(password)