#!/usr/bin/env python3
"""
Agent Session Monitor - Web Server

Serves a small browser dashboard over the session JSON files found in a
data directory:

* ``/`` and ``/index.html``   - overview page (filled in by JavaScript)
* ``/session?id=<id>``        - server-rendered detail page for one session
* ``/api/sessions``           - JSON list of session summaries
* ``/api/session?id=<id>``    - JSON for one session, plus its cost
* ``/api/stats``              - JSON aggregates grouped by model and by date
"""

import argparse
import json
import re
import sys
from collections import defaultdict
from datetime import datetime, timedelta
from functools import partial
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from urllib.parse import parse_qs, urlparse

# Put the parent directory on sys.path so that ``scripts.cli`` can be imported.
sys.path.insert(0, str(Path(__file__).parent.parent))

try:
    from scripts.cli import SessionAnalyzer, TOKEN_PRICING
except ImportError:
    # Import failed: fall back to a minimal built-in pricing table.
    TOKEN_PRICING = {
        "Qwen3-rerank": {"input": 0.0003, "output": 0.0012},
        "DeepSeek-R1": {"input": 0.004, "output": 0.012, "reasoning": 0.002},
    }

# Pricing used when neither the session's model nor "GPT-4" is in TOKEN_PRICING.
_FALLBACK_PRICING = {"input": 0.003, "output": 0.006}

# Prices are divided by this number of tokens when computing a cost.
_TOKENS_PER_PRICE_UNIT = 1000000

# Single-pass replacement table for the five HTML-significant characters.
_HTML_ESCAPE_TABLE = str.maketrans({
    '&': '&amp;',
    '<': '&lt;',
    '>': '&gt;',
    '"': '&quot;',
    "'": '&#39;',
})

# Stylesheet shared by the index page and the session detail page.
_PAGE_CSS = """
body { font-family: -apple-system, "Segoe UI", Roboto, sans-serif; margin: 0;
       background: #f5f6fa; color: #2d3436; }
.container { max-width: 1100px; margin: 0 auto; padding: 24px; }
h1 { margin-bottom: 4px; }
.subtitle { color: #636e72; margin-top: 0; }
.stats { display: flex; flex-wrap: wrap; gap: 16px; margin: 24px 0; }
.stat-card { flex: 1 1 180px; background: #fff; border-radius: 8px; padding: 16px;
             box-shadow: 0 1px 3px rgba(0, 0, 0, 0.1); }
.stat-label { color: #636e72; font-size: 13px; }
.stat-value { font-size: 22px; font-weight: 600; margin-top: 6px; word-break: break-all; }
.panel, .round { background: #fff; border-radius: 8px; padding: 16px; margin-bottom: 16px;
                 box-shadow: 0 1px 3px rgba(0, 0, 0, 0.1); }
table { width: 100%; border-collapse: collapse; }
th, td { text-align: left; padding: 8px; border-bottom: 1px solid #eee; }
a { color: #0984e3; text-decoration: none; }
.round-header { display: flex; flex-wrap: wrap; gap: 12px; align-items: baseline;
                margin-bottom: 8px; }
.round-number { font-weight: 600; }
.round-time { color: #636e72; font-size: 13px; }
.badge { background: #dfe6e9; border-radius: 4px; padding: 1px 6px; font-size: 12px; }
.message, .question, .answer, .reasoning, .details { margin: 6px 0; white-space: pre-wrap;
                                                     word-break: break-word; }
pre { background: #f1f2f6; padding: 8px; border-radius: 4px; overflow-x: auto; }
.empty { color: #636e72; }
"""

# Overview page. Plain (non f-) string, so the braces in CSS/JS need no escaping.
# The script builds every DOM node with textContent so that session ids and
# model names coming from the data files cannot inject markup.
_INDEX_HTML = """<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Agent Session Monitor</title>
<style>""" + _PAGE_CSS + """</style>
</head>
<body>
<div class="container">
  <h1>🔍 Agent Session Monitor</h1>
  <p class="subtitle">实时观测Clawdbot对话过程和Token开销</p>
  <div class="stats">
    <div class="stat-card">
      <div class="stat-label">总会话数</div>
      <div class="stat-value" id="total-sessions">-</div>
    </div>
    <div class="stat-card">
      <div class="stat-label">总Token消耗</div>
      <div class="stat-value" id="total-tokens">-</div>
    </div>
    <div class="stat-card">
      <div class="stat-label">总成本</div>
      <div class="stat-value" id="total-cost">-</div>
    </div>
  </div>
  <h2>📊 最近会话</h2>
  <div class="panel" id="sessions">加载中...</div>
  <h2>📈 按模型统计</h2>
  <div class="panel" id="models">加载中...</div>
</div>
<script>
function fmt(n) { return Number(n || 0).toLocaleString(); }
function money(n) { return '$' + Number(n || 0).toFixed(6); }
function setText(id, text) { document.getElementById(id).textContent = text; }
function show(id, node) {
  var el = document.getElementById(id);
  el.textContent = '';
  el.appendChild(node);
}
function buildTable(headers, rows) {
  var table = document.createElement('table');
  var headRow = table.createTHead().insertRow();
  headers.forEach(function (h) {
    var th = document.createElement('th');
    th.textContent = h;
    headRow.appendChild(th);
  });
  var body = table.createTBody();
  rows.forEach(function (cells) {
    var tr = body.insertRow();
    cells.forEach(function (cell) {
      var td = tr.insertCell();
      if (cell instanceof Node) { td.appendChild(cell); } else { td.textContent = cell; }
    });
  });
  return table;
}
function loadStats() {
  fetch('/api/stats').then(function (r) { return r.json(); }).then(function (stats) {
    var models = Object.keys(stats.by_model || {});
    var tokens = 0;
    var rows = models.map(function (name) {
      var m = stats.by_model[name];
      tokens += (m.input_tokens || 0) + (m.output_tokens || 0);
      return [name, fmt(m.count), fmt(m.input_tokens), fmt(m.output_tokens), money(m.cost)];
    });
    setText('total-sessions', fmt(stats.total_sessions));
    setText('total-tokens', fmt(tokens));
    setText('total-cost', money(stats.total_cost));
    if (rows.length === 0) { setText('models', '暂无数据'); return; }
    show('models', buildTable(['模型', '会话数', 'Input Tokens', 'Output Tokens', '成本'], rows));
  }).catch(function (err) { setText('models', '加载失败: ' + err); });
}
function loadSessions() {
  fetch('/api/sessions').then(function (r) { return r.json(); }).then(function (sessions) {
    if (sessions.length === 0) { setText('sessions', '暂无数据'); return; }
    var rows = sessions.map(function (s) {
      var link = document.createElement('a');
      link.href = '/session?id=' + encodeURIComponent(s.session_id);
      link.textContent = s.session_id;
      return [link, s.model, fmt(s.messages_count), fmt(s.total_tokens), money(s.cost),
              s.updated_at];
    });
    show('sessions', buildTable(['Session', '模型', '消息数', 'Token', '成本', '更新时间'], rows));
  }).catch(function (err) { setText('sessions', '加载失败: ' + err); });
}
loadStats();
loadSessions();
</script>
</body>
</html>
"""


def _as_number(value):
    """Return *value* if it is an int or float (but not a bool), otherwise 0."""
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return 0
    return value


def _model_name(session: dict) -> str:
    """Return the session's model as a string, or 'unknown' when absent/empty."""
    return str(session.get('model') or 'unknown')


class SessionMonitorHandler(BaseHTTPRequestHandler):
    """HTTP request handler for the session dashboard and its JSON API."""

    def __init__(self, *args, data_dir=None, **kwargs):
        """Store the data directory, then let the base class handle the request.

        ``data_dir`` must be assigned before ``super().__init__`` because the
        base-class constructor processes the request (and therefore calls
        ``do_GET``) before it returns.
        """
        self.data_dir = Path(data_dir) if data_dir else Path("./sessions")
        super().__init__(*args, **kwargs)

    # ------------------------------------------------------------------ routing

    def do_GET(self):
        """Dispatch a GET request to the matching page or API endpoint."""
        parsed = urlparse(self.path)
        path = parsed.path
        query = parse_qs(parsed.query)

        if path in ('/', '/index.html'):
            self.serve_index()
        elif path in ('/session', '/api/session'):
            session_id = query.get('id', [None])[0]
            if not session_id:
                self.send_error(400, "Missing session id")
            elif path == '/session':
                self.serve_session_detail(session_id)
            else:
                self.serve_api_session(session_id)
        elif path == '/api/sessions':
            self.serve_api_sessions()
        elif path == '/api/stats':
            self.serve_api_stats()
        else:
            self.send_error(404, "Not Found")

    def serve_index(self):
        """Send the overview page."""
        self.send_html(self.generate_index_html())

    def serve_session_detail(self, session_id: str):
        """Send the detail page for *session_id*, or a 404 page if it is unknown."""
        session = self.load_session(session_id)
        if not session:
            self.send_html(self._render_not_found(session_id), status=404)
        else:
            self.send_html(self._render_session(session_id, session))

    def serve_api_sessions(self):
        """API: send a summary of every session, most recently updated first."""
        summaries = [
            {
                'session_id': session.get('session_id', ''),
                'model': session.get('model', 'unknown'),
                'messages_count': session.get('messages_count', 0),
                'total_tokens': self._total_tokens(session),
                'updated_at': session.get('updated_at', ''),
                'cost': self.calculate_cost(session),
            }
            for session in self.load_all_sessions()
        ]
        # str(... or '') keeps the sort from failing on null / non-string timestamps.
        summaries.sort(key=lambda item: str(item['updated_at'] or ''), reverse=True)
        self.send_json(summaries)

    def serve_api_session(self, session_id: str):
        """API: send the full data of one session with a computed ``cost`` field."""
        session = self.load_session(session_id)
        if session:
            session['cost'] = self.calculate_cost(session)
            self.send_json(session)
        else:
            self.send_error(404, "Session not found")

    def serve_api_stats(self):
        """API: send aggregate statistics over all sessions."""
        self.send_json(self._compute_stats(self.load_all_sessions()))

    # --------------------------------------------------------------- statistics

    def _total_tokens(self, session: dict):
        """Return input + output tokens of *session*, treating missing values as 0."""
        return (_as_number(session.get('total_input_tokens'))
                + _as_number(session.get('total_output_tokens')))

    def _compute_stats(self, sessions: list) -> dict:
        """Aggregate *sessions* by model and by creation date.

        Returns a dict with ``total_sessions``, ``total_cost``, ``by_model``
        and ``by_date`` (date keys in descending order; the key is the first
        10 characters of ``created_at``, or ``'unknown'`` when shorter).
        """
        by_model = defaultdict(lambda: {
            'count': 0, 'input_tokens': 0, 'output_tokens': 0, 'cost': 0.0,
        })
        by_date = defaultdict(lambda: {
            'count': 0, 'input_tokens': 0, 'output_tokens': 0, 'cost': 0.0,
            'models': set(),
        })
        total_cost = 0.0

        for session in sessions:
            model = _model_name(session)
            cost = self.calculate_cost(session)
            input_tokens = _as_number(session.get('total_input_tokens'))
            output_tokens = _as_number(session.get('total_output_tokens'))
            total_cost += cost

            created_at = str(session.get('created_at') or '')
            date_key = created_at[:10] if len(created_at) >= 10 else 'unknown'

            for bucket in (by_model[model], by_date[date_key]):
                bucket['count'] += 1
                bucket['input_tokens'] += input_tokens
                bucket['output_tokens'] += output_tokens
                bucket['cost'] += cost
            by_date[date_key]['models'].add(model)

        # Sets are not JSON-serializable; emit sorted lists for stable output.
        for bucket in by_date.values():
            bucket['models'] = sorted(bucket['models'])

        return {
            'total_sessions': len(sessions),
            'total_cost': total_cost,
            'by_model': dict(by_model),
            'by_date': dict(sorted(by_date.items(), reverse=True)),
        }

    # ------------------------------------------------------------------ loading

    def load_session(self, session_id: str):
        """Load ``<data_dir>/<session_id>.json``.

        Returns the decoded JSON object, or ``None`` when the id does not name
        a file directly inside the data directory, the file is missing or
        unreadable, or its content is not a JSON object.
        """
        try:
            root = self.data_dir.resolve()
            candidate = (self.data_dir / f"{session_id}.json").resolve()
            # session_id comes from the query string: refuse anything that
            # resolves outside the data directory (e.g. "../other").
            if candidate.parent != root or not candidate.is_file():
                return None
            with open(candidate, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers JSON decode errors, bad encodings and NUL bytes.
            print(f"Warning: Failed to load session {session_id!r}: {e}", file=sys.stderr)
            return None
        return data if isinstance(data, dict) else None

    def load_all_sessions(self):
        """Load every ``*.json`` file in the data directory.

        Unreadable files and files that do not contain a JSON object are
        reported on stderr / skipped. A session without a ``session_id`` gets
        the file name (without extension) as its id.
        """
        sessions = []
        for session_file in self.data_dir.glob("*.json"):
            try:
                with open(session_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
            except (OSError, ValueError) as e:
                print(f"Warning: Failed to load {session_file}: {e}", file=sys.stderr)
                continue
            if not isinstance(data, dict):
                print(f"Warning: Failed to load {session_file}: not a JSON object",
                      file=sys.stderr)
                continue
            data.setdefault('session_id', session_file.stem)
            sessions.append(data)
        return sessions

    def calculate_cost(self, session: dict) -> float:
        """Compute the cost of *session* from its token totals and TOKEN_PRICING.

        Pricing is looked up by model, then "GPT-4", then a built-in default.
        Cached tokens are subtracted from the input tokens before the input
        price is applied; missing or non-numeric totals count as 0.
        """
        pricing = TOKEN_PRICING.get(
            _model_name(session),
            TOKEN_PRICING.get("GPT-4", _FALLBACK_PRICING),
        )

        input_tokens = _as_number(session.get('total_input_tokens'))
        output_tokens = _as_number(session.get('total_output_tokens'))
        reasoning_tokens = _as_number(session.get('total_reasoning_tokens'))
        cached_tokens = _as_number(session.get('total_cached_tokens'))

        # Never bill a negative amount when cached > input in inconsistent data.
        regular_input_tokens = max(input_tokens - cached_tokens, 0)

        input_cost = regular_input_tokens * pricing.get('input', 0) / _TOKENS_PER_PRICE_UNIT
        output_cost = output_tokens * pricing.get('output', 0) / _TOKENS_PER_PRICE_UNIT

        reasoning_cost = 0
        if 'reasoning' in pricing and reasoning_tokens > 0:
            reasoning_cost = reasoning_tokens * pricing['reasoning'] / _TOKENS_PER_PRICE_UNIT

        # NOTE(review): when the model has no 'cached' price, cached tokens are
        # billed at nothing at all - confirm that this is intended.
        cached_cost = 0
        if 'cached' in pricing and cached_tokens > 0:
            cached_cost = cached_tokens * pricing['cached'] / _TOKENS_PER_PRICE_UNIT

        return input_cost + output_cost + reasoning_cost + cached_cost

    # ---------------------------------------------------------------- responses

    def _send_body(self, status: int, content_type: str, body: bytes, extra_headers=()):
        """Write a complete response with Content-Type and Content-Length."""
        self.send_response(status)
        self.send_header('Content-type', content_type)
        self.send_header('Content-Length', str(len(body)))
        for name, value in extra_headers:
            self.send_header(name, value)
        self.end_headers()
        self.wfile.write(body)

    def send_html(self, html: str, status: int = 200):
        """Send *html* as a UTF-8 HTML response with the given status code."""
        self._send_body(status, 'text/html; charset=utf-8', html.encode('utf-8'))

    def send_json(self, data):
        """Send *data* as a pretty-printed UTF-8 JSON response (CORS: any origin)."""
        body = json.dumps(data, ensure_ascii=False, indent=2).encode('utf-8')
        self._send_body(
            200,
            'application/json; charset=utf-8',
            body,
            extra_headers=(('Access-Control-Allow-Origin', '*'),),
        )

    # ---------------------------------------------------------------- rendering

    def generate_index_html(self) -> str:
        """Return the HTML of the overview page."""
        return _INDEX_HTML

    def generate_session_html(self, session_id: str) -> str:
        """Return the HTML of the detail page for *session_id*.

        When the session cannot be loaded a small "not found" page is returned.
        """
        session = self.load_session(session_id)
        if not session:
            return self._render_not_found(session_id)
        return self._render_session(session_id, session)

    def _render_not_found(self, session_id: str) -> str:
        """Return the "session not found" page with the id safely escaped."""
        safe_id = self.escape_html(session_id)
        return (
            '<!DOCTYPE html>\n<html><head><meta charset="utf-8"></head><body>\n'
            f'<h1>Session not found: {safe_id}</h1>\n'
            '</body></html>\n'
        )

    def _render_json_block(self, title: str, payload) -> str:
        """Return a titled, escaped, pretty-printed JSON dump of *payload*."""
        dumped = json.dumps(payload, ensure_ascii=False, indent=2)
        return (
            f'<div class="details"><strong>{title}</strong>'
            f'<pre>{self.escape_html(dumped)}</pre></div>'
        )

    def _render_labelled(self, css_class: str, label: str, value) -> str:
        """Return a labelled text block, or '' when *value* is empty."""
        if not value:
            return ''
        return (
            f'<div class="{css_class}"><strong>{label}</strong> '
            f'{self.escape_html(value)}</div>'
        )

    def _render_round(self, r: dict) -> str:
        """Return the HTML for one conversation round."""
        # Show at most the last 5 messages of the round.
        messages_html = ''
        messages = r.get('messages')
        if isinstance(messages, list) and messages:
            parts = ['<div class="messages">']
            for msg in messages[-5:]:
                if not isinstance(msg, dict):
                    continue
                role = self.escape_html(msg.get('role', 'unknown'))
                content = self.escape_html(msg.get('content', ''))
                parts.append(f'<div class="message"><strong>[{role}]</strong> {content}</div>')
            parts.append('</div>')
            messages_html = ''.join(parts)

        tool_calls_html = ''
        if r.get('tool_calls'):
            tool_calls_html = self._render_json_block('🛠️ Tool Calls:', r['tool_calls'])

        # Per-round token detail breakdown.
        token_details_html = ''
        details = {
            key: r[key]
            for key in ('input_token_details', 'output_token_details')
            if r.get(key)
        }
        if details:
            token_details_html = self._render_json_block('📊 Token Details:', details)

        # Badges for the special token categories.
        token_badges = ''
        cached = _as_number(r.get('cached_tokens'))
        if cached > 0:
            token_badges += f' <span class="badge">📦 {cached:,} cached</span>'
        reasoning = _as_number(r.get('reasoning_tokens'))
        if reasoning > 0:
            token_badges += f' <span class="badge">🧠 {reasoning:,} reasoning</span>'

        number = self.escape_html(r.get('round', '?'))
        timestamp = self.escape_html(r.get('timestamp', ''))
        in_tokens = f"{_as_number(r.get('input_tokens')):,}"
        out_tokens = f"{_as_number(r.get('output_tokens')):,}"
        question_html = self._render_labelled('question', '❓ Question:', r.get('question'))
        answer_html = self._render_labelled('answer', '✅ Answer:', r.get('answer'))
        reasoning_html = self._render_labelled('reasoning', '🧠 Reasoning:', r.get('reasoning'))

        return f"""
<div class="round">
  <div class="round-header">
    <span class="round-number">Round {number}</span>
    <span class="round-time">{timestamp}</span>
    <span class="round-tokens">{in_tokens} in → {out_tokens} out{token_badges}</span>
  </div>
  {messages_html}
  {question_html}
  {answer_html}
  {reasoning_html}
  {tool_calls_html}
  {token_details_html}
</div>
"""

    def _render_session(self, session_id: str, session: dict) -> str:
        """Return the detail page for an already loaded *session*."""
        rounds = session.get('rounds') or []
        rounds_html = ''.join(self._render_round(r) for r in rounds if isinstance(r, dict))
        if not rounds_html:
            rounds_html = '<div class="panel"><p class="empty">暂无对话记录</p></div>'

        safe_id = self.escape_html(session_id)
        model = self.escape_html(session.get('model', 'unknown'))
        messages_count = self.escape_html(session.get('messages_count', 0))
        total_tokens = f"{self._total_tokens(session):,}"
        cost = self.calculate_cost(session)

        return f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>{safe_id} - Session Monitor</title>
<style>{_PAGE_CSS}</style>
</head>
<body>
<div class="container">
  <a href="/" class="back">← 返回列表</a>
  <h1>📊 Session Detail</h1>
  <p class="subtitle">{safe_id}</p>
  <div class="stats">
    <div class="stat-card">
      <div class="stat-label">模型</div>
      <div class="stat-value">{model}</div>
    </div>
    <div class="stat-card">
      <div class="stat-label">消息数</div>
      <div class="stat-value">{messages_count}</div>
    </div>
    <div class="stat-card">
      <div class="stat-label">总Token</div>
      <div class="stat-value">{total_tokens}</div>
    </div>
    <div class="stat-card">
      <div class="stat-label">成本</div>
      <div class="stat-value">${cost:.6f}</div>
    </div>
  </div>
  <h2>💬 对话记录 ({len(rounds)} 轮)</h2>
  {rounds_html}
</div>
</body>
</html>
"""

    def escape_html(self, text: str) -> str:
        """Escape the HTML special characters ``& < > " '`` in *text*.

        ``None`` becomes an empty string; any other non-string value is
        converted with ``str()`` first (message content is not always a
        string in the session files).
        """
        if text is None:
            return ''
        if not isinstance(text, str):
            text = str(text)
        return text.translate(_HTML_ESCAPE_TABLE)

    def log_message(self, format, *args):
        """Override the default access log: do not print a line per request."""
        pass


def create_handler(data_dir):
    """Return a handler factory that binds *data_dir* into SessionMonitorHandler.

    ``HTTPServer`` instantiates its handler with positional arguments only, so
    the extra keyword argument is pre-bound here.
    """
    return partial(SessionMonitorHandler, data_dir=data_dir)


def main():
    """Parse the command line and run the HTTP server until interrupted."""
    parser = argparse.ArgumentParser(
        description="Agent Session Monitor - Web Server",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )

    parser.add_argument(
        '--data-dir',
        default='./sessions',
        help='Session数据目录(默认: ./sessions)'
    )

    parser.add_argument(
        '--port',
        type=int,
        default=8888,
        help='HTTP服务器端口(默认: 8888)'
    )

    # NOTE(review): the default binds to every interface and the API sends
    # "Access-Control-Allow-Origin: *", so session contents are readable by
    # anything that can reach this port. Consider 127.0.0.1 as the default.
    parser.add_argument(
        '--host',
        default='0.0.0.0',
        help='HTTP服务器地址(默认: 0.0.0.0)'
    )

    args = parser.parse_args()

    # The data directory must exist before the server starts.
    data_dir = Path(args.data_dir)
    if not data_dir.exists():
        print(f"❌ Error: Data directory not found: {data_dir}")
        print(f"   Please run main.py first to generate session data.")
        sys.exit(1)

    # Create the HTTP server.
    handler_class = create_handler(args.data_dir)
    server = HTTPServer((args.host, args.port), handler_class)

    print(f"{'=' * 60}")
    print(f"🌐 Agent Session Monitor - Web Server")
    print(f"{'=' * 60}")
    print()
    print(f"📂 Data directory: {args.data_dir}")
    print(f"🌍 Server address: http://{args.host}:{args.port}")
    print()
    print(f"✅ Server started. Press Ctrl+C to stop.")
    print(f"{'=' * 60}")
    print()

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\n\n👋 Shutting down server...")
    finally:
        # serve_forever() has already returned here, so shutdown() (meant to be
        # called from another thread) is not needed; release the socket instead.
        server.server_close()


if __name__ == '__main__':
    main()