mirror of
https://github.com/opsre/LiteOps.git
synced 2026-02-19 05:21:21 +08:00
194 lines
7.3 KiB
Python
194 lines
7.3 KiB
Python
import threading
|
||
import queue
|
||
import time
|
||
import logging
|
||
from typing import Dict, Set, Optional
|
||
from dataclasses import dataclass
|
||
|
||
logger = logging.getLogger('apps')
|
||
|
||
@dataclass
|
||
class LogMessage:
|
||
"""日志消息数据类"""
|
||
task_id: str
|
||
build_number: int
|
||
message: str
|
||
stage: Optional[str] = None
|
||
timestamp: float = None
|
||
|
||
def __post_init__(self):
|
||
if self.timestamp is None:
|
||
self.timestamp = time.time()
|
||
|
||
class LogStreamManager:
|
||
|
||
_instance = None
|
||
_lock = threading.Lock()
|
||
|
||
def __new__(cls):
|
||
if cls._instance is None:
|
||
with cls._lock:
|
||
if cls._instance is None:
|
||
cls._instance = super().__new__(cls)
|
||
return cls._instance
|
||
|
||
def __init__(self):
|
||
if hasattr(self, '_initialized'):
|
||
return
|
||
|
||
self._initialized = True
|
||
# 存储每个构建的日志队列 {(task_id, build_number): queue.Queue}
|
||
self._log_queues: Dict[tuple, queue.Queue] = {}
|
||
# 存储每个构建的SSE客户端集合 {(task_id, build_number): set}
|
||
self._sse_clients: Dict[tuple, Set] = {}
|
||
# 线程锁
|
||
self._queues_lock = threading.Lock()
|
||
self._clients_lock = threading.Lock()
|
||
|
||
logger.info("LogStreamManager initialized")
|
||
|
||
def get_build_key(self, task_id: str, build_number: int) -> tuple:
|
||
"""获取构建的唯一键"""
|
||
return (task_id, build_number)
|
||
|
||
def create_build_stream(self, task_id: str, build_number: int):
|
||
"""为新构建创建日志流"""
|
||
build_key = self.get_build_key(task_id, build_number)
|
||
|
||
with self._queues_lock:
|
||
if build_key not in self._log_queues:
|
||
self._log_queues[build_key] = queue.Queue(maxsize=20000) # 最大缓存20000条日志
|
||
logger.info(f"Created log stream for build {task_id}#{build_number}")
|
||
|
||
with self._clients_lock:
|
||
if build_key not in self._sse_clients:
|
||
self._sse_clients[build_key] = set()
|
||
|
||
def add_sse_client(self, task_id: str, build_number: int, client_id: str):
|
||
"""添加SSE客户端"""
|
||
build_key = self.get_build_key(task_id, build_number)
|
||
|
||
with self._clients_lock:
|
||
if build_key not in self._sse_clients:
|
||
self._sse_clients[build_key] = set()
|
||
self._sse_clients[build_key].add(client_id)
|
||
logger.info(f"Added SSE client {client_id} for build {task_id}#{build_number}")
|
||
|
||
def remove_sse_client(self, task_id: str, build_number: int, client_id: str):
|
||
"""移除SSE客户端"""
|
||
build_key = self.get_build_key(task_id, build_number)
|
||
|
||
with self._clients_lock:
|
||
if build_key in self._sse_clients:
|
||
self._sse_clients[build_key].discard(client_id)
|
||
logger.info(f"Removed SSE client {client_id} for build {task_id}#{build_number}")
|
||
|
||
if not self._sse_clients[build_key]:
|
||
del self._sse_clients[build_key]
|
||
self._cleanup_build_stream(build_key)
|
||
|
||
def _cleanup_build_stream(self, build_key: tuple):
|
||
"""清理构建流资源"""
|
||
with self._queues_lock:
|
||
if build_key in self._log_queues:
|
||
del self._log_queues[build_key]
|
||
logger.info(f"Cleaned up log stream for build {build_key[0]}#{build_key[1]}")
|
||
|
||
def push_log(self, task_id: str, build_number: int, message: str, stage: Optional[str] = None):
|
||
"""推送日志消息到流"""
|
||
build_key = self.get_build_key(task_id, build_number)
|
||
|
||
# 创建日志消息
|
||
log_msg = LogMessage(
|
||
task_id=task_id,
|
||
build_number=build_number,
|
||
message=message,
|
||
stage=stage
|
||
)
|
||
|
||
# 推送到队列
|
||
with self._queues_lock:
|
||
if build_key not in self._log_queues:
|
||
# 如果队列不存在,先创建
|
||
self._log_queues[build_key] = queue.Queue(maxsize=20000)
|
||
logger.debug(f"Created log queue for build {task_id}#{build_number} during push")
|
||
|
||
try:
|
||
# 非阻塞推送,如果队列满了就丢弃最老的消息
|
||
if self._log_queues[build_key].full():
|
||
try:
|
||
self._log_queues[build_key].get_nowait() # 移除最老的消息
|
||
except queue.Empty:
|
||
pass
|
||
|
||
self._log_queues[build_key].put_nowait(log_msg)
|
||
except queue.Full:
|
||
logger.warning(f"Log queue full for build {task_id}#{build_number}, dropping message")
|
||
|
||
def get_log_stream(self, task_id: str, build_number: int, client_id: str):
|
||
"""获取日志流生成器"""
|
||
build_key = self.get_build_key(task_id, build_number)
|
||
|
||
# 确保流存在
|
||
self.create_build_stream(task_id, build_number)
|
||
self.add_sse_client(task_id, build_number, client_id)
|
||
|
||
try:
|
||
while True:
|
||
try:
|
||
# 检查客户端是否还在连接
|
||
with self._clients_lock:
|
||
if build_key not in self._sse_clients or client_id not in self._sse_clients[build_key]:
|
||
logger.info(f"Client {client_id} disconnected from build {task_id}#{build_number}")
|
||
break
|
||
|
||
# 获取日志队列
|
||
with self._queues_lock:
|
||
log_queue = self._log_queues.get(build_key)
|
||
|
||
if log_queue is None:
|
||
break
|
||
|
||
try:
|
||
# 等待新日志消息,超时时间为1秒
|
||
log_msg = log_queue.get(timeout=1.0)
|
||
yield log_msg
|
||
except queue.Empty:
|
||
# 超时,发送心跳
|
||
yield None # None表示心跳
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error in log stream for {task_id}#{build_number}: {str(e)}")
|
||
break
|
||
finally:
|
||
# 清理客户端
|
||
self.remove_sse_client(task_id, build_number, client_id)
|
||
|
||
def complete_build(self, task_id: str, build_number: int, status: str):
|
||
"""标记构建完成"""
|
||
build_key = self.get_build_key(task_id, build_number)
|
||
|
||
# 推送完成消息
|
||
completion_msg = LogMessage(
|
||
task_id=task_id,
|
||
build_number=build_number,
|
||
message=f"BUILD_COMPLETE:{status}",
|
||
stage="SYSTEM"
|
||
)
|
||
|
||
with self._queues_lock:
|
||
if build_key in self._log_queues:
|
||
try:
|
||
self._log_queues[build_key].put_nowait(completion_msg)
|
||
except queue.Full:
|
||
pass
|
||
|
||
def has_active_clients(self, task_id: str, build_number: int) -> bool:
|
||
"""检查是否有活跃的SSE客户端"""
|
||
build_key = self.get_build_key(task_id, build_number)
|
||
|
||
with self._clients_lock:
|
||
return build_key in self._sse_clients and len(self._sse_clients[build_key]) > 0
|
||
|
||
# 全局单例实例
|
||
log_stream_manager = LogStreamManager() |