Files
LiteOps/backend/apps/utils/log_stream.py
2025-06-12 16:48:37 +08:00

194 lines
7.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import threading
import queue
import time
import logging
from typing import Dict, Set, Optional
from dataclasses import dataclass
# Module-level logger; every message emitted by this module goes to the 'apps' logger.
logger = logging.getLogger('apps')
@dataclass
class LogMessage:
    """A single log line belonging to one build of one task.

    ``timestamp`` defaults to the creation time (``time.time()``) when it is
    omitted or passed as ``None``; an explicit value, including ``0.0``, is
    kept as given.
    """

    # Identifier of the task the build belongs to.
    task_id: str
    # Build number within that task.
    build_number: int
    # The log text itself.
    message: str
    # Optional stage label attached to the message.
    stage: Optional[str] = None
    # Seconds since the epoch; filled in by __post_init__ when left as None.
    timestamp: Optional[float] = None

    def __post_init__(self):
        """Stamp the message with the current time if none was supplied."""
        if self.timestamp is None:
            self.timestamp = time.time()
class LogStreamManager:
    """Singleton that buffers build log messages for SSE consumers.

    Each build, identified by ``(task_id, build_number)``, owns one bounded
    ``queue.Queue`` of log messages and one set of connected SSE client ids.
    Producers call :meth:`push_log` / :meth:`complete_build`; each SSE
    connection iterates :meth:`get_log_stream`.

    Locking: ``_queues_lock`` guards ``_log_queues`` and ``_clients_lock``
    guards ``_sse_clients``. When both are needed they are taken in the
    order clients -> queues (see :meth:`remove_sse_client`).

    NOTE(review): all clients of a build read from the same queue, so when
    several clients stream one build each message is delivered to only one
    of them -- confirm this is intended.
    NOTE(review): within this class a queue is only removed when the last
    client of its build disconnects; queues created by ``push_log`` for a
    build that never gets a client are not cleaned up here.
    """

    # Maximum number of messages buffered per build; once reached, the
    # oldest message is evicted to make room for the newest.
    MAX_QUEUE_SIZE = 20000

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            with cls._lock:
                # Re-check under the lock so that concurrent first callers
                # end up creating only one instance.
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Initialise state once; later constructions are no-ops."""
        # __init__ runs on every LogStreamManager() call, so skip it when
        # the shared instance has already been set up.
        if hasattr(self, '_initialized'):
            return
        self._initialized = True

        # Log queue per build: {(task_id, build_number): queue.Queue}
        self._log_queues: Dict[tuple, queue.Queue] = {}
        # Connected SSE client ids per build: {(task_id, build_number): set}
        self._sse_clients: Dict[tuple, Set] = {}

        # Locks guarding the two dictionaries above.
        self._queues_lock = threading.Lock()
        self._clients_lock = threading.Lock()

        logger.info("LogStreamManager initialized")

    def _new_queue(self) -> queue.Queue:
        """Create an empty log queue bounded by ``MAX_QUEUE_SIZE``."""
        return queue.Queue(maxsize=self.MAX_QUEUE_SIZE)

    @staticmethod
    def _put_dropping_oldest(log_queue: queue.Queue, item) -> bool:
        """Enqueue ``item`` without blocking, evicting the oldest if full.

        Returns ``True`` when the item was enqueued and ``False`` when the
        queue was still full after the eviction attempt.
        """
        if log_queue.full():
            try:
                log_queue.get_nowait()  # discard the oldest message
            except queue.Empty:
                # A consumer emptied the queue in the meantime; there is
                # room now, so nothing needs to be evicted.
                pass
        try:
            log_queue.put_nowait(item)
        except queue.Full:
            return False
        return True

    def get_build_key(self, task_id: str, build_number: int) -> tuple:
        """Return the ``(task_id, build_number)`` key identifying a build."""
        return (task_id, build_number)

    def create_build_stream(self, task_id: str, build_number: int):
        """Ensure a log queue and a client set exist for the build."""
        build_key = self.get_build_key(task_id, build_number)

        with self._queues_lock:
            if build_key not in self._log_queues:
                self._log_queues[build_key] = self._new_queue()
                logger.info(f"Created log stream for build {task_id}#{build_number}")

        with self._clients_lock:
            if build_key not in self._sse_clients:
                self._sse_clients[build_key] = set()

    def add_sse_client(self, task_id: str, build_number: int, client_id: str):
        """Register ``client_id`` as an SSE client of the build."""
        build_key = self.get_build_key(task_id, build_number)

        with self._clients_lock:
            self._sse_clients.setdefault(build_key, set()).add(client_id)
            logger.info(f"Added SSE client {client_id} for build {task_id}#{build_number}")

    def remove_sse_client(self, task_id: str, build_number: int, client_id: str):
        """Unregister ``client_id``; drop the build's stream with its last client."""
        build_key = self.get_build_key(task_id, build_number)

        with self._clients_lock:
            clients = self._sse_clients.get(build_key)
            if clients is None:
                return

            clients.discard(client_id)
            logger.info(f"Removed SSE client {client_id} for build {task_id}#{build_number}")

            # No clients left: release the client set and the log queue.
            # The queue cleanup takes _queues_lock while _clients_lock is
            # still held (lock order clients -> queues).
            if not clients:
                del self._sse_clients[build_key]
                self._cleanup_build_stream(build_key)

    def _cleanup_build_stream(self, build_key: tuple):
        """Delete the log queue of ``build_key`` if one exists."""
        with self._queues_lock:
            if build_key in self._log_queues:
                del self._log_queues[build_key]
                logger.info(f"Cleaned up log stream for build {build_key[0]}#{build_key[1]}")

    def push_log(self, task_id: str, build_number: int, message: str, stage: Optional[str] = None):
        """Append a log message to the build's queue.

        The queue is created on demand. The call never blocks: when the
        queue is full the oldest buffered message is discarded first.
        """
        build_key = self.get_build_key(task_id, build_number)

        log_msg = LogMessage(
            task_id=task_id,
            build_number=build_number,
            message=message,
            stage=stage,
        )

        with self._queues_lock:
            log_queue = self._log_queues.get(build_key)
            if log_queue is None:
                # No stream yet for this build: create the queue lazily.
                log_queue = self._log_queues[build_key] = self._new_queue()
                logger.debug(f"Created log queue for build {task_id}#{build_number} during push")

            if not self._put_dropping_oldest(log_queue, log_msg):
                logger.warning(f"Log queue full for build {task_id}#{build_number}, dropping message")

    def get_log_stream(self, task_id: str, build_number: int, client_id: str):
        """Generator yielding the build's log messages for one SSE client.

        Yields each ``LogMessage`` taken from the build's queue, or ``None``
        as a heartbeat when no message arrives within one second. The loop
        ends when the client is no longer registered, when the build's queue
        has been removed, or when an exception occurs. The client is always
        unregistered when the generator finishes or is closed.
        """
        build_key = self.get_build_key(task_id, build_number)

        # Make sure the stream exists, then register this client.
        self.create_build_stream(task_id, build_number)
        self.add_sse_client(task_id, build_number, client_id)

        try:
            while True:
                try:
                    # Stop once the client has been unregistered elsewhere.
                    with self._clients_lock:
                        still_connected = client_id in self._sse_clients.get(build_key, ())
                        if not still_connected:
                            logger.info(f"Client {client_id} disconnected from build {task_id}#{build_number}")
                            break

                    # Look the queue up under the lock, but wait on it
                    # outside so producers are not blocked during the wait.
                    with self._queues_lock:
                        log_queue = self._log_queues.get(build_key)

                    if log_queue is None:
                        break

                    try:
                        # Wait up to one second for the next message.
                        item = log_queue.get(timeout=1.0)
                    except queue.Empty:
                        item = None  # timed out: None signals a heartbeat

                    yield item

                except Exception as e:
                    logger.error(f"Error in log stream for {task_id}#{build_number}: {str(e)}")
                    break
        finally:
            # Always unregister the client, including on generator close.
            self.remove_sse_client(task_id, build_number, client_id)

    def complete_build(self, task_id: str, build_number: int, status: str):
        """Enqueue a ``BUILD_COMPLETE:<status>`` marker for the build.

        Nothing happens when the build has no queue. When the queue is full
        the oldest buffered message is evicted so that the completion marker
        is not lost.
        """
        build_key = self.get_build_key(task_id, build_number)

        completion_msg = LogMessage(
            task_id=task_id,
            build_number=build_number,
            message=f"BUILD_COMPLETE:{status}",
            stage="SYSTEM",
        )

        with self._queues_lock:
            log_queue = self._log_queues.get(build_key)
            if log_queue is None:
                return
            if not self._put_dropping_oldest(log_queue, completion_msg):
                logger.warning(
                    f"Log queue full for build {task_id}#{build_number}, dropping completion message"
                )

    def has_active_clients(self, task_id: str, build_number: int) -> bool:
        """Return ``True`` if at least one SSE client is registered for the build."""
        build_key = self.get_build_key(task_id, build_number)

        with self._clients_lock:
            return bool(self._sse_clients.get(build_key))
# Global singleton instance
log_stream_manager = LogStreamManager()