Files
LiteOps/backend/apps/utils/permissions.py
2025-06-12 16:48:37 +08:00

128 lines
4.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import logging
from ..models import User, UserRole
logger = logging.getLogger('apps')
def get_user_permissions(user_id):
"""
获取用户的权限信息
Args:
user_id: 用户ID
Returns:
dict: 用户权限信息,包含菜单权限、功能权限和数据权限
"""
try:
# 获取用户信息
try:
user = User.objects.get(user_id=user_id)
except User.DoesNotExist:
logger.error(f'获取用户权限时用户不存在: {user_id}')
return {
'menu': [],
'function': {},
'data': {
'project_scope': 'all',
'project_ids': [],
'environment_scope': 'all',
'environment_types': []
}
}
# 获取用户的所有角色
user_roles = UserRole.objects.filter(user=user).select_related('role')
# 合并所有角色的权限
menu_permissions = set()
function_permissions = {}
data_permissions = {
'project_scope': 'all',
'project_ids': [],
'environment_scope': 'all',
'environment_types': []
}
has_custom_project_scope = False
has_custom_environment_scope = False
for user_role in user_roles:
role = user_role.role
permissions = role.permissions
# 如果permissions是字符串解析为JSON
if isinstance(permissions, str):
try:
permissions = json.loads(permissions)
except:
permissions = {}
# 合并菜单权限
if permissions.get('menu'):
menu_permissions.update(permissions['menu'])
# 合并功能权限
if permissions.get('function'):
for module, actions in permissions['function'].items():
if module not in function_permissions:
function_permissions[module] = []
function_permissions[module].extend(actions)
# 确保不重复
function_permissions[module] = list(set(function_permissions[module]))
# 合并数据权限
if permissions.get('data'):
data_perms = permissions['data']
# 项目权限
if data_perms.get('project_scope') == 'custom':
has_custom_project_scope = True
if data_permissions['project_scope'] == 'all':
data_permissions['project_scope'] = 'custom'
data_permissions['project_ids'] = data_perms.get('project_ids', [])
else:
# 合并项目ID列表
data_permissions['project_ids'].extend(data_perms.get('project_ids', []))
# 确保不重复
data_permissions['project_ids'] = list(set(data_permissions['project_ids']))
# 环境权限
if data_perms.get('environment_scope') == 'custom':
has_custom_environment_scope = True
if data_permissions['environment_scope'] == 'all':
data_permissions['environment_scope'] = 'custom'
data_permissions['environment_types'] = data_perms.get('environment_types', [])
else:
# 合并环境类型列表
data_permissions['environment_types'].extend(data_perms.get('environment_types', []))
# 确保不重复
data_permissions['environment_types'] = list(set(data_permissions['environment_types']))
# 如果没有任何角色有自定义项目/环境范围,保持为'all'
if not has_custom_project_scope:
data_permissions['project_scope'] = 'all'
data_permissions['project_ids'] = []
if not has_custom_environment_scope:
data_permissions['environment_scope'] = 'all'
data_permissions['environment_types'] = []
return {
'menu': list(menu_permissions),
'function': function_permissions,
'data': data_permissions
}
except Exception as e:
logger.error(f'获取用户权限失败: {str(e)}', exc_info=True)
# 返回默认的空权限
return {
'menu': [],
'function': {},
'data': {
'project_scope': 'all',
'project_ids': [],
'environment_scope': 'all',
'environment_types': []
}
}