#!/usr/bin/env python3 """ Agent Session Monitor - Web Server 提供浏览器访问的观测界面 """ import argparse import json import sys from pathlib import Path from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.parse import urlparse, parse_qs from collections import defaultdict from datetime import datetime, timedelta import re # 添加父目录到path以导入cli模块 sys.path.insert(0, str(Path(__file__).parent.parent)) try: from scripts.cli import SessionAnalyzer, TOKEN_PRICING except ImportError: # 如果导入失败,定义简单版本 TOKEN_PRICING = { "Qwen3-rerank": {"input": 0.0003, "output": 0.0012}, "DeepSeek-R1": {"input": 0.004, "output": 0.012, "reasoning": 0.002}, } class SessionMonitorHandler(BaseHTTPRequestHandler): """HTTP请求处理器""" def __init__(self, *args, data_dir=None, **kwargs): self.data_dir = Path(data_dir) if data_dir else Path("./sessions") super().__init__(*args, **kwargs) def do_GET(self): """处理GET请求""" parsed_path = urlparse(self.path) path = parsed_path.path query = parse_qs(parsed_path.query) if path == '/' or path == '/index.html': self.serve_index() elif path == '/session': session_id = query.get('id', [None])[0] if session_id: self.serve_session_detail(session_id) else: self.send_error(400, "Missing session id") elif path == '/api/sessions': self.serve_api_sessions() elif path == '/api/session': session_id = query.get('id', [None])[0] if session_id: self.serve_api_session(session_id) else: self.send_error(400, "Missing session id") elif path == '/api/stats': self.serve_api_stats() else: self.send_error(404, "Not Found") def serve_index(self): """首页 - 总览""" html = self.generate_index_html() self.send_html(html) def serve_session_detail(self, session_id: str): """Session详情页""" html = self.generate_session_html(session_id) self.send_html(html) def serve_api_sessions(self): """API: 获取所有session列表""" sessions = self.load_all_sessions() # 简化数据 data = [] for session in sessions: data.append({ 'session_id': session['session_id'], 'model': session.get('model', 'unknown'), 'messages_count': session.get('messages_count', 0), 'total_tokens': session['total_input_tokens'] + session['total_output_tokens'], 'updated_at': session.get('updated_at', ''), 'cost': self.calculate_cost(session) }) # 按更新时间降序排序 data.sort(key=lambda x: x['updated_at'], reverse=True) self.send_json(data) def serve_api_session(self, session_id: str): """API: 获取指定session的详细数据""" session = self.load_session(session_id) if session: session['cost'] = self.calculate_cost(session) self.send_json(session) else: self.send_error(404, "Session not found") def serve_api_stats(self): """API: 获取统计数据""" sessions = self.load_all_sessions() # 按模型统计 by_model = defaultdict(lambda: { 'count': 0, 'input_tokens': 0, 'output_tokens': 0, 'cost': 0.0 }) # 按日期统计 by_date = defaultdict(lambda: { 'count': 0, 'input_tokens': 0, 'output_tokens': 0, 'cost': 0.0, 'models': set() }) total_cost = 0.0 for session in sessions: model = session.get('model', 'unknown') cost = self.calculate_cost(session) total_cost += cost # 按模型 by_model[model]['count'] += 1 by_model[model]['input_tokens'] += session['total_input_tokens'] by_model[model]['output_tokens'] += session['total_output_tokens'] by_model[model]['cost'] += cost # 按日期 created_at = session.get('created_at', '') date_key = created_at[:10] if len(created_at) >= 10 else 'unknown' by_date[date_key]['count'] += 1 by_date[date_key]['input_tokens'] += session['total_input_tokens'] by_date[date_key]['output_tokens'] += session['total_output_tokens'] by_date[date_key]['cost'] += cost by_date[date_key]['models'].add(model) # 转换sets为lists for date in by_date: by_date[date]['models'] = list(by_date[date]['models']) stats = { 'total_sessions': len(sessions), 'total_cost': total_cost, 'by_model': dict(by_model), 'by_date': dict(sorted(by_date.items(), reverse=True)) } self.send_json(stats) def load_session(self, session_id: str): """加载指定session""" session_file = self.data_dir / f"{session_id}.json" if session_file.exists(): with open(session_file, 'r', encoding='utf-8') as f: return json.load(f) return None def load_all_sessions(self): """加载所有session""" sessions = [] for session_file in self.data_dir.glob("*.json"): try: with open(session_file, 'r', encoding='utf-8') as f: sessions.append(json.load(f)) except Exception as e: print(f"Warning: Failed to load {session_file}: {e}", file=sys.stderr) return sessions def calculate_cost(self, session: dict) -> float: """计算session成本""" model = session.get('model', 'unknown') pricing = TOKEN_PRICING.get(model, TOKEN_PRICING.get("GPT-4", {"input": 0.003, "output": 0.006})) input_tokens = session['total_input_tokens'] output_tokens = session['total_output_tokens'] reasoning_tokens = session.get('total_reasoning_tokens', 0) cached_tokens = session.get('total_cached_tokens', 0) # 区分regular input和cached input regular_input_tokens = input_tokens - cached_tokens input_cost = regular_input_tokens * pricing.get('input', 0) / 1000000 output_cost = output_tokens * pricing.get('output', 0) / 1000000 reasoning_cost = 0 if 'reasoning' in pricing and reasoning_tokens > 0: reasoning_cost = reasoning_tokens * pricing['reasoning'] / 1000000 cached_cost = 0 if 'cached' in pricing and cached_tokens > 0: cached_cost = cached_tokens * pricing['cached'] / 1000000 return input_cost + output_cost + reasoning_cost + cached_cost def send_html(self, html: str): """发送HTML响应""" self.send_response(200) self.send_header('Content-type', 'text/html; charset=utf-8') self.end_headers() self.wfile.write(html.encode('utf-8')) def send_json(self, data): """发送JSON响应""" self.send_response(200) self.send_header('Content-type', 'application/json; charset=utf-8') self.send_header('Access-Control-Allow-Origin', '*') self.end_headers() self.wfile.write(json.dumps(data, ensure_ascii=False, indent=2).encode('utf-8')) def generate_index_html(self) -> str: """生成首页HTML""" return '''
实时观测Clawdbot对话过程和Token开销
{session_id}
暂无对话记录
'}