Files
LiteOps/backend/apps/views/user.py
2025-06-12 16:48:37 +08:00

334 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import uuid
import hashlib
import logging
from django.http import JsonResponse
from django.views import View
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.db import transaction
from django.db.models import Q
from ..models import User, Role, UserRole
from ..utils.auth import jwt_auth_required
from ..utils.security import SecurityValidator
logger = logging.getLogger('apps')
def generate_id():
"""生成唯一ID"""
return hashlib.sha256(str(uuid.uuid4()).encode()).hexdigest()[:32]
@method_decorator(csrf_exempt, name='dispatch')
class UserView(View):
@method_decorator(jwt_auth_required)
def get(self, request):
"""获取用户列表"""
try:
user_id = request.GET.get('user_id')
username = request.GET.get('username')
email = request.GET.get('email')
status = request.GET.get('status')
# 构建查询条件
query = {}
if user_id:
query['user_id'] = user_id
if username:
query['username__icontains'] = username # 使用 icontains 进行不区分大小写的模糊查询
if email:
query['email__icontains'] = email
if status:
query['status'] = status
# 使用查询条件过滤用户
users = User.objects.filter(**query)
user_list = []
for user in users:
# 获取用户角色
user_roles = UserRole.objects.filter(user=user).select_related('role')
roles = [{"role_id": ur.role.role_id, "name": ur.role.name} for ur in user_roles]
user_list.append({
'user_id': user.user_id,
'username': user.username,
'name': user.name,
'email': user.email,
'status': user.status,
'roles': roles,
'login_time': user.login_time.strftime('%Y-%m-%d %H:%M:%S') if user.login_time else None,
'create_time': user.create_time.strftime('%Y-%m-%d %H:%M:%S'),
})
return JsonResponse({
'code': 200,
'message': '获取用户列表成功',
'data': user_list
})
except Exception as e:
logger.error(f'获取用户列表失败: {str(e)}', exc_info=True)
return JsonResponse({
'code': 500,
'message': f'服务器错误: {str(e)}'
})
@method_decorator(jwt_auth_required)
def post(self, request):
"""创建用户"""
try:
with transaction.atomic():
data = json.loads(request.body)
username = data.get('username')
name = data.get('name')
password = data.get('password')
email = data.get('email')
role_ids = data.get('role_ids', [])
if not all([username, name, password, email]):
return JsonResponse({
'code': 400,
'message': '用户名、姓名、密码和邮箱不能为空'
})
# 检查用户名是否已存在
if User.objects.filter(username=username).exists():
return JsonResponse({
'code': 400,
'message': '用户名已存在'
})
# 检查邮箱是否已存在
if User.objects.filter(email=email).exists():
return JsonResponse({
'code': 400,
'message': '邮箱已存在'
})
# 验证密码强度
is_valid, message = SecurityValidator.validate_password(password)
if not is_valid:
return JsonResponse({
'code': 400,
'message': message
})
# 密码加密
password_hash = hashlib.sha256(password.encode()).hexdigest()
# 创建用户
user = User.objects.create(
user_id=generate_id(),
username=username,
name=name,
password=password_hash,
email=email,
status=1 # 默认启用
)
# 分配角色
for role_id in role_ids:
try:
role = Role.objects.get(role_id=role_id)
UserRole.objects.create(user=user, role=role)
except Role.DoesNotExist:
logger.warning(f'角色不存在: {role_id}')
return JsonResponse({
'code': 200,
'message': '创建用户成功',
'data': {
'user_id': user.user_id,
'username': user.username
}
})
except Exception as e:
logger.error(f'创建用户失败: {str(e)}', exc_info=True)
return JsonResponse({
'code': 500,
'message': f'服务器错误: {str(e)}'
})
@method_decorator(jwt_auth_required)
def put(self, request):
"""编辑用户"""
try:
with transaction.atomic():
data = json.loads(request.body)
user_id = data.get('user_id')
name = data.get('name')
email = data.get('email')
status = data.get('status')
password = data.get('password')
role_ids = data.get('role_ids')
if not user_id:
return JsonResponse({
'code': 400,
'message': '用户ID不能为空'
})
try:
user = User.objects.get(user_id=user_id)
except User.DoesNotExist:
return JsonResponse({
'code': 404,
'message': '用户不存在'
})
# 更新用户信息
if name:
user.name = name
if email and email != user.email:
# 检查邮箱是否已存在
if User.objects.filter(email=email).exclude(user_id=user_id).exists():
return JsonResponse({
'code': 400,
'message': '邮箱已存在'
})
user.email = email
if status is not None:
# 如果是解锁操作从0改为1使用安全验证器的解锁方法
if user.status == 0 and status == 1:
success, message = SecurityValidator.unlock_user_account(user)
if not success:
return JsonResponse({
'code': 400,
'message': message
})
# 如果是锁定操作从1改为0使用安全验证器的锁定方法
elif user.status == 1 and status == 0:
success, message = SecurityValidator.lock_user_account(user)
if not success:
return JsonResponse({
'code': 400,
'message': message
})
else:
user.status = status
if password:
# 验证密码强度
is_valid, message = SecurityValidator.validate_password(password)
if not is_valid:
return JsonResponse({
'code': 400,
'message': message
})
# 密码加密
password_hash = hashlib.sha256(password.encode()).hexdigest()
user.password = password_hash
user.save()
# 更新角色
if role_ids is not None:
# 删除旧角色关联
UserRole.objects.filter(user=user).delete()
# 添加新角色关联
for role_id in role_ids:
try:
role = Role.objects.get(role_id=role_id)
UserRole.objects.create(user=user, role=role)
except Role.DoesNotExist:
logger.warning(f'角色不存在: {role_id}')
return JsonResponse({
'code': 200,
'message': '更新用户成功'
})
except Exception as e:
logger.error(f'更新用户失败: {str(e)}', exc_info=True)
return JsonResponse({
'code': 500,
'message': f'服务器错误: {str(e)}'
})
@method_decorator(jwt_auth_required)
def delete(self, request):
"""删除用户"""
try:
with transaction.atomic():
data = json.loads(request.body)
user_id = data.get('user_id')
if not user_id:
return JsonResponse({
'code': 400,
'message': '用户ID不能为空'
})
try:
user = User.objects.get(user_id=user_id)
# 删除关联的角色
UserRole.objects.filter(user=user).delete()
# 删除用户
user.delete()
return JsonResponse({
'code': 200,
'message': '删除用户成功'
})
except User.DoesNotExist:
return JsonResponse({
'code': 404,
'message': '用户不存在'
})
except Exception as e:
logger.error(f'删除用户失败: {str(e)}', exc_info=True)
return JsonResponse({
'code': 500,
'message': f'服务器错误: {str(e)}'
})
@method_decorator(csrf_exempt, name='dispatch')
class UserProfileView(View):
@method_decorator(jwt_auth_required)
def get(self, request):
"""获取当前登录用户的个人信息"""
try:
user_id = request.user_id
try:
user = User.objects.get(user_id=user_id)
except User.DoesNotExist:
return JsonResponse({
'code': 404,
'message': '用户不存在'
})
# 获取用户角色
user_roles = UserRole.objects.filter(user=user).select_related('role')
roles = [{
"role_id": ur.role.role_id,
"name": ur.role.name,
"description": ur.role.description
} for ur in user_roles]
# 构建用户信息
user_info = {
'user_id': user.user_id,
'username': user.username,
'name': user.name,
'email': user.email,
'status': user.status,
'roles': roles,
'login_time': user.login_time.strftime('%Y-%m-%d %H:%M:%S') if user.login_time else None,
'create_time': user.create_time.strftime('%Y-%m-%d %H:%M:%S') if user.create_time else None,
'update_time': user.update_time.strftime('%Y-%m-%d %H:%M:%S') if user.update_time else None,
}
return JsonResponse({
'code': 200,
'message': '获取用户信息成功',
'data': user_info
})
except Exception as e:
logger.error(f'获取用户信息失败: {str(e)}', exc_info=True)
return JsonResponse({
'code': 500,
'message': f'服务器错误: {str(e)}'
})