mirror of
https://github.com/opsre/LiteOps.git
synced 2026-02-23 00:20:46 +08:00
465 lines
18 KiB
Python
465 lines
18 KiB
Python
import json
|
|
import logging
|
|
from django.http import JsonResponse
|
|
from django.views import View
|
|
from django.utils.decorators import method_decorator
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
from django.views.decorators.http import require_http_methods
|
|
from django.db import transaction
|
|
from django.utils import timezone
|
|
from datetime import datetime, timedelta
|
|
from ..models import SecurityConfig, User, BuildTask, BuildHistory, LoginLog
|
|
from ..utils.auth import jwt_auth_required
|
|
from ..utils.permissions import get_user_permissions
|
|
|
|
logger = logging.getLogger('apps')
|
|
|
|
@method_decorator(csrf_exempt, name='dispatch')
|
|
class SecurityConfigView(View):
|
|
@method_decorator(jwt_auth_required)
|
|
def get(self, request):
|
|
"""获取安全配置"""
|
|
try:
|
|
# 获取或创建安全配置
|
|
security_config, created = SecurityConfig.objects.get_or_create(
|
|
id=1,
|
|
defaults={
|
|
'min_password_length': 8,
|
|
'password_complexity': ['lowercase', 'number'],
|
|
'session_timeout': 120,
|
|
'max_login_attempts': 5,
|
|
'lockout_duration': 30,
|
|
'enable_2fa': False
|
|
}
|
|
)
|
|
|
|
return JsonResponse({
|
|
'code': 200,
|
|
'message': '获取安全配置成功',
|
|
'data': {
|
|
'min_password_length': security_config.min_password_length,
|
|
'password_complexity': security_config.password_complexity,
|
|
'session_timeout': security_config.session_timeout,
|
|
'max_login_attempts': security_config.max_login_attempts,
|
|
'lockout_duration': security_config.lockout_duration,
|
|
'enable_2fa': security_config.enable_2fa,
|
|
'watermark_enabled': security_config.watermark_enabled,
|
|
'watermark_content': security_config.watermark_content,
|
|
'watermark_show_time': security_config.watermark_show_time,
|
|
'watermark_show_username': security_config.watermark_show_username,
|
|
'update_time': security_config.update_time.strftime('%Y-%m-%d %H:%M:%S') if security_config.update_time else None
|
|
}
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f'获取安全配置失败: {str(e)}', exc_info=True)
|
|
return JsonResponse({
|
|
'code': 500,
|
|
'message': f'服务器错误: {str(e)}'
|
|
})
|
|
|
|
@method_decorator(jwt_auth_required)
|
|
def put(self, request):
|
|
"""更新安全配置"""
|
|
try:
|
|
with transaction.atomic():
|
|
data = json.loads(request.body)
|
|
|
|
min_password_length = data.get('min_password_length')
|
|
password_complexity = data.get('password_complexity')
|
|
session_timeout = data.get('session_timeout')
|
|
max_login_attempts = data.get('max_login_attempts')
|
|
lockout_duration = data.get('lockout_duration')
|
|
enable_2fa = data.get('enable_2fa')
|
|
watermark_enabled = data.get('watermark_enabled')
|
|
watermark_content = data.get('watermark_content')
|
|
watermark_show_time = data.get('watermark_show_time')
|
|
watermark_show_username = data.get('watermark_show_username')
|
|
|
|
# 验证输入数据
|
|
if min_password_length is not None:
|
|
if not isinstance(min_password_length, int) or min_password_length < 6 or min_password_length > 20:
|
|
return JsonResponse({
|
|
'code': 400,
|
|
'message': '密码最小长度必须在6-20之间'
|
|
})
|
|
|
|
if password_complexity is not None:
|
|
if not isinstance(password_complexity, list):
|
|
return JsonResponse({
|
|
'code': 400,
|
|
'message': '密码复杂度要求格式错误'
|
|
})
|
|
valid_complexity = ['uppercase', 'lowercase', 'number', 'special']
|
|
for item in password_complexity:
|
|
if item not in valid_complexity:
|
|
return JsonResponse({
|
|
'code': 400,
|
|
'message': f'无效的密码复杂度要求: {item}'
|
|
})
|
|
|
|
if session_timeout is not None:
|
|
if not isinstance(session_timeout, int) or session_timeout < 10 or session_timeout > 1440:
|
|
return JsonResponse({
|
|
'code': 400,
|
|
'message': '会话超时时间必须在10-1440分钟之间'
|
|
})
|
|
|
|
if max_login_attempts is not None:
|
|
if not isinstance(max_login_attempts, int) or max_login_attempts < 3 or max_login_attempts > 10:
|
|
return JsonResponse({
|
|
'code': 400,
|
|
'message': '最大登录尝试次数必须在3-10次之间'
|
|
})
|
|
|
|
if lockout_duration is not None:
|
|
if not isinstance(lockout_duration, int) or lockout_duration < 5 or lockout_duration > 60:
|
|
return JsonResponse({
|
|
'code': 400,
|
|
'message': '账户锁定时间必须在5-60分钟之间'
|
|
})
|
|
|
|
if watermark_content is not None:
|
|
if not isinstance(watermark_content, str) or len(watermark_content.strip()) == 0:
|
|
return JsonResponse({
|
|
'code': 400,
|
|
'message': '水印内容不能为空'
|
|
})
|
|
if len(watermark_content) > 500:
|
|
return JsonResponse({
|
|
'code': 400,
|
|
'message': '水印内容长度不能超过500字符'
|
|
})
|
|
|
|
# 获取或创建安全配置
|
|
security_config, created = SecurityConfig.objects.get_or_create(id=1)
|
|
|
|
# 更新配置
|
|
if min_password_length is not None:
|
|
security_config.min_password_length = min_password_length
|
|
if password_complexity is not None:
|
|
security_config.password_complexity = password_complexity
|
|
if session_timeout is not None:
|
|
security_config.session_timeout = session_timeout
|
|
if max_login_attempts is not None:
|
|
security_config.max_login_attempts = max_login_attempts
|
|
if lockout_duration is not None:
|
|
security_config.lockout_duration = lockout_duration
|
|
if enable_2fa is not None:
|
|
security_config.enable_2fa = enable_2fa
|
|
if watermark_enabled is not None:
|
|
security_config.watermark_enabled = watermark_enabled
|
|
if watermark_content is not None:
|
|
security_config.watermark_content = watermark_content
|
|
if watermark_show_time is not None:
|
|
security_config.watermark_show_time = watermark_show_time
|
|
if watermark_show_username is not None:
|
|
security_config.watermark_show_username = watermark_show_username
|
|
|
|
security_config.save()
|
|
|
|
# 记录操作日志
|
|
user = User.objects.get(user_id=request.user_id)
|
|
logger.info(f'用户[{user.username}]更新了安全配置')
|
|
|
|
return JsonResponse({
|
|
'code': 200,
|
|
'message': '安全配置更新成功'
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f'更新安全配置失败: {str(e)}', exc_info=True)
|
|
return JsonResponse({
|
|
'code': 500,
|
|
'message': f'服务器错误: {str(e)}'
|
|
})
|
|
|
|
|
|
@csrf_exempt
|
|
@jwt_auth_required
|
|
@require_http_methods(["GET"])
|
|
def get_build_tasks_for_cleanup(request):
|
|
"""获取可用于日志清理的构建任务列表"""
|
|
try:
|
|
# 获取用户权限信息
|
|
user_permissions = get_user_permissions(request.user_id)
|
|
data_permissions = user_permissions.get('data', {})
|
|
|
|
# 检查用户是否有系统基本设置权限
|
|
function_permissions = user_permissions.get('function', {})
|
|
system_permissions = function_permissions.get('system_basic', [])
|
|
|
|
if 'view' not in system_permissions:
|
|
logger.warning(f'用户[{request.user_id}]没有系统基本设置查看权限')
|
|
return JsonResponse({
|
|
'code': 403,
|
|
'message': '没有权限查看构建任务'
|
|
}, status=403)
|
|
|
|
# 应用项目权限过滤
|
|
project_scope = data_permissions.get('project_scope', 'all')
|
|
if project_scope == 'custom':
|
|
permitted_project_ids = data_permissions.get('project_ids', [])
|
|
if not permitted_project_ids:
|
|
return JsonResponse({
|
|
'code': 200,
|
|
'message': '获取构建任务列表成功',
|
|
'data': []
|
|
})
|
|
tasks = BuildTask.objects.filter(project__project_id__in=permitted_project_ids).select_related('project')
|
|
else:
|
|
tasks = BuildTask.objects.all().select_related('project')
|
|
|
|
# 格式化返回数据
|
|
task_list = []
|
|
for task in tasks:
|
|
task_list.append({
|
|
'task_id': task.task_id,
|
|
'name': task.name,
|
|
'project_name': task.project.name if task.project else '未知项目',
|
|
'total_builds': task.total_builds
|
|
})
|
|
|
|
return JsonResponse({
|
|
'code': 200,
|
|
'message': '获取构建任务列表成功',
|
|
'data': task_list
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f'获取构建任务列表失败: {str(e)}', exc_info=True)
|
|
return JsonResponse({
|
|
'code': 500,
|
|
'message': f'服务器错误: {str(e)}'
|
|
})
|
|
|
|
|
|
@csrf_exempt
|
|
@jwt_auth_required
|
|
@require_http_methods(["POST"])
|
|
def cleanup_build_logs(request):
|
|
"""清理构建日志"""
|
|
try:
|
|
# 获取用户权限信息
|
|
user_permissions = get_user_permissions(request.user_id)
|
|
|
|
# 检查用户是否有系统基本设置编辑权限
|
|
function_permissions = user_permissions.get('function', {})
|
|
system_permissions = function_permissions.get('system_basic', [])
|
|
|
|
if 'edit' not in system_permissions:
|
|
logger.warning(f'用户[{request.user_id}]没有系统基本设置编辑权限')
|
|
return JsonResponse({
|
|
'code': 403,
|
|
'message': '没有权限执行日志清理操作'
|
|
}, status=403)
|
|
|
|
data = json.loads(request.body)
|
|
task_ids = data.get('task_ids', [])
|
|
days_before = data.get('days_before', 30)
|
|
|
|
if not isinstance(days_before, int) or days_before < 1 or days_before > 365:
|
|
return JsonResponse({
|
|
'code': 400,
|
|
'message': '保留天数必须在1-365天之间'
|
|
})
|
|
|
|
# 计算截止日期
|
|
cutoff_date = timezone.now() - timedelta(days=days_before)
|
|
|
|
# 查询要删除的构建历史记录
|
|
if task_ids:
|
|
# 清理指定任务的日志
|
|
histories_to_delete = BuildHistory.objects.filter(
|
|
task__task_id__in=task_ids,
|
|
create_time__lt=cutoff_date
|
|
)
|
|
else:
|
|
# 清理所有任务的日志
|
|
histories_to_delete = BuildHistory.objects.filter(
|
|
create_time__lt=cutoff_date
|
|
)
|
|
|
|
# 统计信息
|
|
total_count = histories_to_delete.count()
|
|
|
|
if total_count == 0:
|
|
task_desc = f"{len(task_ids)}个指定任务" if task_ids else "所有任务"
|
|
return JsonResponse({
|
|
'code': 200,
|
|
'message': f'没有找到{task_desc}中需要清理的构建日志记录',
|
|
'data': {
|
|
'deleted_count': 0,
|
|
'task_count': len(task_ids) if task_ids else 0,
|
|
'days_before': days_before
|
|
}
|
|
})
|
|
|
|
# 执行删除操作
|
|
with transaction.atomic():
|
|
deleted_count, _ = histories_to_delete.delete()
|
|
|
|
# 记录操作日志
|
|
user = User.objects.get(user_id=request.user_id)
|
|
task_desc = f"{len(task_ids)}个指定任务" if task_ids else "所有任务"
|
|
logger.info(f'用户[{user.username}]清理了{task_desc}{days_before}天前的构建日志,共删除{deleted_count}条记录')
|
|
|
|
return JsonResponse({
|
|
'code': 200,
|
|
'message': f'构建日志清理完成,共删除{deleted_count}条记录',
|
|
'data': {
|
|
'deleted_count': deleted_count,
|
|
'task_count': len(task_ids) if task_ids else 0,
|
|
'days_before': days_before
|
|
}
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f'清理构建日志失败: {str(e)}', exc_info=True)
|
|
return JsonResponse({
|
|
'code': 500,
|
|
'message': f'服务器错误: {str(e)}'
|
|
})
|
|
|
|
|
|
@csrf_exempt
|
|
@jwt_auth_required
|
|
@require_http_methods(["POST"])
|
|
def cleanup_login_logs(request):
|
|
"""清理登录日志"""
|
|
try:
|
|
# 获取用户权限信息
|
|
user_permissions = get_user_permissions(request.user_id)
|
|
|
|
# 检查用户是否有系统基本设置编辑权限
|
|
function_permissions = user_permissions.get('function', {})
|
|
system_permissions = function_permissions.get('system_basic', [])
|
|
|
|
if 'edit' not in system_permissions:
|
|
logger.warning(f'用户[{request.user_id}]没有系统基本设置编辑权限')
|
|
return JsonResponse({
|
|
'code': 403,
|
|
'message': '没有权限执行日志清理操作'
|
|
}, status=403)
|
|
|
|
data = json.loads(request.body)
|
|
days_before = data.get('days_before', 30)
|
|
|
|
# 验证输入参数
|
|
if not isinstance(days_before, int) or days_before < 1 or days_before > 365:
|
|
return JsonResponse({
|
|
'code': 400,
|
|
'message': '保留天数必须在1-365天之间'
|
|
})
|
|
|
|
# 计算截止日期
|
|
cutoff_date = timezone.now() - timedelta(days=days_before)
|
|
|
|
# 查询要删除的登录日志记录
|
|
logs_to_delete = LoginLog.objects.filter(
|
|
login_time__lt=cutoff_date
|
|
)
|
|
|
|
# 统计信息
|
|
total_count = logs_to_delete.count()
|
|
|
|
if total_count == 0:
|
|
return JsonResponse({
|
|
'code': 200,
|
|
'message': '没有找到需要清理的登录日志记录',
|
|
'data': {
|
|
'deleted_count': 0,
|
|
'days_before': days_before
|
|
}
|
|
})
|
|
|
|
# 执行删除操作
|
|
with transaction.atomic():
|
|
deleted_count, _ = logs_to_delete.delete()
|
|
|
|
# 记录操作日志
|
|
user = User.objects.get(user_id=request.user_id)
|
|
logger.info(f'用户[{user.username}]清理了{days_before}天前的登录日志,共删除{deleted_count}条记录')
|
|
|
|
return JsonResponse({
|
|
'code': 200,
|
|
'message': f'登录日志清理完成,共删除{deleted_count}条记录',
|
|
'data': {
|
|
'deleted_count': deleted_count,
|
|
'days_before': days_before
|
|
}
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f'清理登录日志失败: {str(e)}', exc_info=True)
|
|
return JsonResponse({
|
|
'code': 500,
|
|
'message': f'服务器错误: {str(e)}'
|
|
})
|
|
|
|
|
|
@csrf_exempt
|
|
@require_http_methods(["GET"])
|
|
def get_watermark_config(request):
|
|
"""获取水印配置"""
|
|
try:
|
|
# 获取安全配置
|
|
security_config, created = SecurityConfig.objects.get_or_create(
|
|
id=1,
|
|
defaults={
|
|
'min_password_length': 8,
|
|
'password_complexity': ['lowercase', 'number'],
|
|
'session_timeout': 120,
|
|
'max_login_attempts': 5,
|
|
'lockout_duration': 30,
|
|
'enable_2fa': False
|
|
}
|
|
)
|
|
|
|
return JsonResponse({
|
|
'code': 200,
|
|
'message': '获取水印配置成功',
|
|
'data': {
|
|
'watermark_enabled': security_config.watermark_enabled,
|
|
'watermark_content': security_config.watermark_content,
|
|
'watermark_show_time': security_config.watermark_show_time,
|
|
'watermark_show_username': security_config.watermark_show_username
|
|
}
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f'获取水印配置失败: {str(e)}', exc_info=True)
|
|
return JsonResponse({
|
|
'code': 500,
|
|
'message': f'服务器错误: {str(e)}'
|
|
})
|
|
|
|
|
|
@csrf_exempt
|
|
@jwt_auth_required
|
|
@require_http_methods(["GET"])
|
|
def get_current_user_info(request):
|
|
"""获取当前用户信息"""
|
|
try:
|
|
user = User.objects.get(user_id=request.user_id)
|
|
|
|
return JsonResponse({
|
|
'code': 200,
|
|
'message': '获取用户信息成功',
|
|
'data': {
|
|
'username': user.username,
|
|
'name': user.name or user.username
|
|
}
|
|
})
|
|
|
|
except User.DoesNotExist:
|
|
return JsonResponse({
|
|
'code': 404,
|
|
'message': '用户不存在'
|
|
})
|
|
except Exception as e:
|
|
logger.error(f'获取用户信息失败: {str(e)}', exc_info=True)
|
|
return JsonResponse({
|
|
'code': 500,
|
|
'message': f'服务器错误: {str(e)}'
|
|
}) |