feat(skill): add agent-session-monitor skill for LLM observability (#3426)

This commit is contained in:
澄潭
2026-02-01 12:23:15 +08:00
committed by GitHub
parent 0c0ec53a50
commit f288ddf444
12 changed files with 3010 additions and 0 deletions

View File

@@ -0,0 +1,138 @@
# Agent Session Monitor - Quick Start
实时Agent对话观测程序用于监控Higress访问日志追踪多轮对话的token开销和模型使用情况。
## 快速开始
### 1. 运行Demo
```bash
cd example
bash demo.sh
```
这将:
- 解析示例日志文件
- 列出所有session
- 显示session详细信息包括完整的messages、question、answer、reasoning、tool_calls
- 按模型和日期统计token开销
- 导出FinOps报表
### 2. 启动Web界面推荐
```bash
# 先解析日志生成session数据
python3 main.py --log-path /var/log/higress/access.log --output-dir ./sessions
# 启动Web服务器
python3 scripts/webserver.py --data-dir ./sessions --port 8888
# 浏览器访问
open http://localhost:8888
```
Web界面功能
- 📊 总览所有session按模型分组统计
- 🔍 点击session ID下钻查看完整对话
- 💬 查看每轮的messages、question、answer、reasoning、tool_calls
- 💰 实时计算token开销和成本
- 🔄 每30秒自动刷新
### 3. 在Clawdbot对话中使用
当用户询问当前会话token消耗时生成观测链接
```
你的当前会话ID: agent:main:discord:channel:1465367993012981988
查看详情http://localhost:8888/session?id=agent:main:discord:channel:1465367993012981988
点击可以看到:
✅ 完整对话历史每轮messages
✅ Token消耗明细
✅ 工具调用记录
✅ 成本统计
```
### 4. 使用CLI查询可选
```bash
# 查看session详细信息
python3 scripts/cli.py show <session-id>
# 列出所有session
python3 scripts/cli.py list
# 按模型统计
python3 scripts/cli.py stats-model
# 导出报表
python3 scripts/cli.py export finops-report.json
```
## 核心功能
**完整对话追踪**记录每轮对话的完整messages、question、answer、reasoning、tool_calls
**Token开销统计**区分input/output/reasoning/cached token实时计算成本
**Session聚合**按session_id关联多轮对话
**Web可视化界面**:浏览器访问,总览+下钻查看session详情
**实时URL生成**Clawdbot可根据当前会话ID生成观测链接
**FinOps报表**导出JSON/CSV格式的成本分析报告
## 日志格式要求
Higress访问日志需要包含ai_log字段JSON格式示例
```json
{
"__file_offset__": "1000",
"timestamp": "2026-02-01T09:30:15Z",
"ai_log": "{\"session_id\":\"sess_abc\",\"messages\":[...],\"question\":\"...\",\"answer\":\"...\",\"input_token\":250,\"output_token\":160,\"model\":\"Qwen3-rerank\"}"
}
```
ai_log字段支持的属性
- `session_id`: 会话标识(必需)
- `messages`: 完整对话历史
- `question`: 当前轮次问题
- `answer`: AI回答
- `reasoning`: 思考过程DeepSeek等模型
- `tool_calls`: 工具调用列表
- `input_token`: 输入token数
- `output_token`: 输出token数
- `model`: 模型名称
- `response_type`: 响应类型
## 输出目录结构
```
sessions/
├── agent:main:discord:1465367993012981988.json
└── agent:test:discord:9999999999999999999.json
```
每个session文件包含
- 基本信息(创建时间、更新时间、模型)
- Token统计总输入、总输出、总reasoning、总cached
- 对话轮次列表每轮的完整messages、question、answer、reasoning、tool_calls
## 常见问题
**Q: 如何在Higress中配置session_id header**
A: 在ai-statistics插件中配置`session_id_header`或使用默认headerx-openclaw-session-key、x-clawdbot-session-key等。详见PR #3420
**Q: 支持哪些模型的pricing**
A: 目前支持Qwen、DeepSeek、GPT-4、Claude等主流模型。可以在main.py的TOKEN_PRICING字典中添加新模型。
**Q: 如何实时监控日志文件变化?**
A: 直接运行main.py即可程序使用定时轮询机制每秒自动检查一次无需安装额外依赖。
**Q: CLI查询速度慢**
A: 大量session时可以使用`--limit`限制结果数量,或按条件过滤(如`--sort-by cost`只查看成本最高的session
## 下一步
- 集成到Higress FinOps Dashboard
- 支持更多模型的pricing
- 添加趋势预测和异常检测
- 支持多数据源聚合分析

View File

@@ -0,0 +1,71 @@
# Agent Session Monitor
Real-time agent conversation monitoring for Clawdbot, designed to monitor Higress access logs and track token usage across multi-turn conversations.
## Features
- 🔍 **Complete Conversation Tracking**: Records messages, question, answer, reasoning, tool_calls for each turn
- 💰 **Token Usage Statistics**: Distinguishes input/output/reasoning/cached tokens, calculates costs in real-time
- 🌐 **Web Visualization**: Browser-based UI with overview and drill-down into session details
- 🔗 **Real-time URL Generation**: Clawdbot can generate observation links based on current session ID
- 🔄 **Log Rotation Support**: Automatically handles rotated log files (access.log, access.log.1, etc.)
- 📊 **FinOps Reporting**: Export usage data in JSON/CSV formats
## Quick Start
### 1. Run Demo
```bash
cd example
bash demo.sh
```
### 2. Start Web UI
```bash
# Parse logs
python3 main.py --log-path /var/log/higress/access.log --output-dir ./sessions
# Start web server
python3 scripts/webserver.py --data-dir ./sessions --port 8888
# Access in browser
open http://localhost:8888
```
### 3. Use in Clawdbot
When users ask "How many tokens did this conversation use?", you can respond with:
```
Your current session statistics:
- Session ID: agent:main:discord:channel:1465367993012981988
- View details: http://localhost:8888/session?id=agent:main:discord:channel:1465367993012981988
Click to see:
✅ Complete conversation history
✅ Token usage breakdown per turn
✅ Tool call records
✅ Cost statistics
```
## Files
- `main.py`: Background monitor, parses Higress access logs
- `scripts/webserver.py`: Web server, provides browser-based UI
- `scripts/cli.py`: Command-line tools for queries and exports
- `example/`: Demo examples and test data
## Dependencies
- Python 3.8+
- No external dependencies (uses only standard library)
## Documentation
- `SKILL.md`: Main skill documentation
- `QUICKSTART.md`: Quick start guide
## License
MIT

View File

@@ -0,0 +1,384 @@
---
name: agent-session-monitor
description: Real-time agent conversation monitoring - monitors Higress access logs, aggregates conversations by session, tracks token usage. Supports web interface for viewing complete conversation history and costs. Use when users ask about current session token consumption, conversation history, or cost statistics.
---
## Overview
Real-time monitoring of Higress access logs, extracting ai_log JSON, grouping multi-turn conversations by session_id, and calculating token costs with visualization.
### Core Features
- **Real-time Log Monitoring**: Monitors Higress access log files, parses new ai_log entries in real-time
- **Log Rotation Support**: Full logrotate support, automatically tracks access.log.1~5 etc.
- **Incremental Parsing**: Inode-based tracking, processes only new content, no duplicates
- **Session Grouping**: Associates multi-turn conversations by session_id (each turn is a separate request)
- **Complete Conversation Tracking**: Records messages, question, answer, reasoning, tool_calls for each turn
- **Token Usage Tracking**: Distinguishes input/output/reasoning/cached tokens
- **Web Visualization**: Browser-based UI with overview and session drill-down
- **Real-time URL Generation**: Clawdbot can generate observation links based on current session ID
- **Background Processing**: Independent process, continuously parses access logs
- **State Persistence**: Maintains parsing progress and session data across runs
## Usage
### 1. Background Monitoring (Continuous)
```bash
# Parse Higress access logs (with log rotation support)
python3 main.py --log-path /var/log/proxy/access.log --output-dir ./sessions
# Filter by session key
python3 main.py --log-path /var/log/proxy/access.log --session-key <session-id>
# Scheduled task (incremental parsing every minute)
* * * * * python3 /path/to/main.py --log-path /var/log/proxy/access.log --output-dir /var/lib/sessions
```
**Log Rotation Notes**:
- Automatically scans `access.log`, `access.log.1`, `access.log.2`, etc.
- Uses inode tracking to identify files even after renaming
- State persistence prevents duplicate parsing
- Session data accumulates correctly across multiple runs
See: [LOG_ROTATION.md](LOG_ROTATION.md)
### 2. Start Web UI (Recommended)
```bash
# Start web server
python3 scripts/webserver.py --data-dir ./sessions --port 8888
# Access in browser
open http://localhost:8888
```
Web UI features:
- 📊 Overview: View all session statistics and group by model
- 🔍 Session Details: Click session ID to drill down into complete conversation history
- 💬 Conversation Log: Display messages, question, answer, reasoning, tool_calls for each turn
- 💰 Cost Statistics: Real-time token usage and cost calculation
- 🔄 Auto Refresh: Updates every 30 seconds
### 3. Use in Clawdbot Conversations
When users ask about current session token consumption or conversation history:
1. Get current session_id (from runtime or context)
2. Generate web UI URL and return to user
Example response:
```
Your current session statistics:
- Session ID: agent:main:discord:channel:1465367993012981988
- View details: http://localhost:8888/session?id=agent:main:discord:channel:1465367993012981988
Click the link to see:
✅ Complete conversation history
✅ Token usage breakdown per turn
✅ Tool call records
✅ Cost statistics
```
### 4. CLI Queries (Optional)
```bash
# View specific session details
python3 scripts/cli.py show <session-id>
# List all sessions
python3 scripts/cli.py list --sort-by cost --limit 10
# Statistics by model
python3 scripts/cli.py stats-model
# Statistics by date (last 7 days)
python3 scripts/cli.py stats-date --days 7
# Export reports
python3 scripts/cli.py export finops-report.json
```
## Configuration
### main.py (Background Monitor)
| Parameter | Description | Required | Default |
|-----------|-------------|----------|---------|
| `--log-path` | Higress access log file path | Yes | /var/log/higress/access.log |
| `--output-dir` | Session data storage directory | No | ./sessions |
| `--session-key` | Monitor only specified session key | No | Monitor all sessions |
| `--state-file` | State file path (records read offsets) | No | <output-dir>/.state.json |
| `--refresh-interval` | Log refresh interval (seconds) | No | 1 |
### webserver.py (Web UI)
| Parameter | Description | Required | Default |
|-----------|-------------|----------|---------|
| `--data-dir` | Session data directory | No | ./sessions |
| `--port` | HTTP server port | No | 8888 |
| `--host` | HTTP server address | No | 0.0.0.0 |
## Output Examples
### 1. Real-time Monitor
```
🔍 Session Monitor - Active
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
📊 Active Sessions: 3
┌──────────────────────────┬─────────┬──────────┬───────────┐
│ Session ID │ Msgs │ Input │ Output │
├──────────────────────────┼─────────┼──────────┼───────────┤
│ sess_abc123 │ 5 │ 1,250 │ 800 │
│ sess_xyz789 │ 3 │ 890 │ 650 │
│ sess_def456 │ 8 │ 2,100 │ 1,200 │
└──────────────────────────┴─────────┴──────────┴───────────┘
📈 Token Statistics
Total Input: 4240 tokens
Total Output: 2650 tokens
Total Cached: 0 tokens
Total Cost: $0.00127
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
```
### 2. CLI Session Details
```bash
$ python3 scripts/cli.py show agent:main:discord:channel:1465367993012981988
======================================================================
📊 Session Detail: agent:main:discord:channel:1465367993012981988
======================================================================
🕐 Created: 2026-02-01T09:30:00+08:00
🕑 Updated: 2026-02-01T10:35:12+08:00
🤖 Model: Qwen3-rerank
💬 Messages: 5
📈 Token Statistics:
Input: 1,250 tokens
Output: 800 tokens
Reasoning: 150 tokens
Total: 2,200 tokens
💰 Estimated Cost: $0.00126000 USD
📝 Conversation Rounds (5):
──────────────────────────────────────────────────────────────────────
Round 1 @ 2026-02-01T09:30:15+08:00
Tokens: 250 in → 160 out
🔧 Tool calls: Yes
Messages (2):
[user] Check Beijing weather
❓ Question: Check Beijing weather
✅ Answer: Checking Beijing weather for you...
🧠 Reasoning: User wants to know Beijing weather, I need to call weather API.
🛠️ Tool Calls:
- get_weather({"location":"Beijing"})
```
### 3. Statistics by Model
```bash
$ python3 scripts/cli.py stats-model
================================================================================
📊 Statistics by Model
================================================================================
Model Sessions Input Output Cost (USD)
────────────────────────────────────────────────────────────────────────────
Qwen3-rerank 12 15,230 9,840 $ 0.016800
DeepSeek-R1 5 8,450 6,200 $ 0.010600
Qwen-Max 3 4,200 3,100 $ 0.008300
GPT-4 2 2,100 1,800 $ 0.017100
────────────────────────────────────────────────────────────────────────────
TOTAL 22 29,980 20,940 $ 0.052800
================================================================================
```
### 4. Statistics by Date
```bash
$ python3 scripts/cli.py stats-date --days 7
================================================================================
📊 Statistics by Date (Last 7 days)
================================================================================
Date Sessions Input Output Cost (USD) Models
────────────────────────────────────────────────────────────────────────────
2026-01-26 3 2,100 1,450 $ 0.0042 Qwen3-rerank
2026-01-27 5 4,850 3,200 $ 0.0096 Qwen3-rerank, GPT-4
2026-01-28 4 3,600 2,800 $ 0.0078 DeepSeek-R1, Qwen
────────────────────────────────────────────────────────────────────────────
TOTAL 22 29,980 20,940 $ 0.0528
================================================================================
```
### 5. Web UI (Recommended)
Access `http://localhost:8888` to see:
**Home Page:**
- 📊 Total sessions, token consumption, cost cards
- 📋 Recent sessions list (clickable for details)
- 📈 Statistics by model table
**Session Detail Page:**
- 💬 Complete conversation log (messages, question, answer, reasoning, tool_calls per turn)
- 🔧 Tool call history
- 💰 Token usage breakdown and costs
**Features:**
- 🔄 Auto-refresh every 30 seconds
- 📱 Responsive design, mobile-friendly
- 🎨 Clean UI, easy to read
## Session Data Structure
Each session is stored as an independent JSON file with complete conversation history and token statistics:
```json
{
"session_id": "agent:main:discord:channel:1465367993012981988",
"created_at": "2026-02-01T10:30:00Z",
"updated_at": "2026-02-01T10:35:12Z",
"messages_count": 5,
"total_input_tokens": 1250,
"total_output_tokens": 800,
"total_reasoning_tokens": 150,
"total_cached_tokens": 0,
"model": "Qwen3-rerank",
"rounds": [
{
"round": 1,
"timestamp": "2026-02-01T10:30:15Z",
"input_tokens": 250,
"output_tokens": 160,
"reasoning_tokens": 0,
"cached_tokens": 0,
"model": "Qwen3-rerank",
"has_tool_calls": true,
"response_type": "normal",
"messages": [
{
"role": "system",
"content": "You are a helpful assistant..."
},
{
"role": "user",
"content": "Check Beijing weather"
}
],
"question": "Check Beijing weather",
"answer": "Checking Beijing weather for you...",
"reasoning": "User wants to know Beijing weather, need to call weather API.",
"tool_calls": [
{
"index": 0,
"id": "call_abc123",
"type": "function",
"function": {
"name": "get_weather",
"arguments": "{\"location\":\"Beijing\"}"
}
}
],
"input_token_details": {"cached_tokens": 0},
"output_token_details": {}
}
]
}
```
### Field Descriptions
**Session Level:**
- `session_id`: Unique session identifier (from ai_log's session_id field)
- `created_at`: Session creation time
- `updated_at`: Last update time
- `messages_count`: Number of conversation turns
- `total_input_tokens`: Cumulative input tokens
- `total_output_tokens`: Cumulative output tokens
- `total_reasoning_tokens`: Cumulative reasoning tokens (DeepSeek, o1, etc.)
- `total_cached_tokens`: Cumulative cached tokens (prompt caching)
- `model`: Current model in use
**Round Level (rounds):**
- `round`: Turn number
- `timestamp`: Current turn timestamp
- `input_tokens`: Input tokens for this turn
- `output_tokens`: Output tokens for this turn
- `reasoning_tokens`: Reasoning tokens (o1, etc.)
- `cached_tokens`: Cached tokens (prompt caching)
- `model`: Model used for this turn
- `has_tool_calls`: Whether includes tool calls
- `response_type`: Response type (normal/error, etc.)
- `messages`: Complete conversation history (OpenAI messages format)
- `question`: User's question for this turn (last user message)
- `answer`: AI's answer for this turn
- `reasoning`: AI's thinking process (if model supports)
- `tool_calls`: Tool call list (if any)
- `input_token_details`: Complete input token details (JSON)
- `output_token_details`: Complete output token details (JSON)
## Log Format Requirements
Higress access logs must include ai_log field (JSON format). Example:
```json
{
"__file_offset__": "1000",
"timestamp": "2026-02-01T09:30:15Z",
"ai_log": "{\"session_id\":\"sess_abc\",\"messages\":[...],\"question\":\"...\",\"answer\":\"...\",\"input_token\":250,\"output_token\":160,\"model\":\"Qwen3-rerank\"}"
}
```
Supported ai_log attributes:
- `session_id`: Session identifier (required)
- `messages`: Complete conversation history
- `question`: Question for current turn
- `answer`: AI answer
- `reasoning`: Thinking process (DeepSeek, o1, etc.)
- `reasoning_tokens`: Reasoning token count (from PR #3424)
- `cached_tokens`: Cached token count (from PR #3424)
- `tool_calls`: Tool call list
- `input_token`: Input token count
- `output_token`: Output token count
- `input_token_details`: Complete input token details (JSON)
- `output_token_details`: Complete output token details (JSON)
- `model`: Model name
- `response_type`: Response type
## Implementation
### Technology Stack
- **Log Parsing**: Direct JSON parsing, no regex needed
- **File Monitoring**: Polling-based (no watchdog dependency)
- **Session Management**: In-memory + disk hybrid storage
- **Token Calculation**: Model-specific pricing for GPT-4, Qwen, Claude, o1, etc.
### Privacy and Security
- ✅ Does not record conversation content in logs, only token statistics
- ✅ Session data stored locally, not uploaded to external services
- ✅ Supports log file path allowlist
- ✅ Session key access control
### Performance Optimization
- Incremental log parsing, avoids full scans
- In-memory session data with periodic persistence
- Optimized log file reading (offset tracking)
- Inode-based file identification (handles rotation efficiently)

View File

@@ -0,0 +1,101 @@
#!/usr/bin/env python3
"""
演示如何在Clawdbot中生成Session观测URL
"""
from urllib.parse import quote
def generate_session_url(session_id: str, base_url: str = "http://localhost:8888") -> dict:
"""
生成session观测URL
Args:
session_id: 当前会话的session ID
base_url: Web服务器基础URL
Returns:
包含各种URL的字典
"""
# URL编码session_id处理特殊字符
encoded_id = quote(session_id, safe='')
return {
"session_detail": f"{base_url}/session?id={encoded_id}",
"api_session": f"{base_url}/api/session?id={encoded_id}",
"index": f"{base_url}/",
"api_sessions": f"{base_url}/api/sessions",
"api_stats": f"{base_url}/api/stats",
}
def format_response_message(session_id: str, base_url: str = "http://localhost:8888") -> str:
"""
生成给用户的回复消息
Args:
session_id: 当前会话的session ID
base_url: Web服务器基础URL
Returns:
格式化的回复消息
"""
urls = generate_session_url(session_id, base_url)
return f"""你的当前会话信息:
📊 **Session ID**: `{session_id}`
🔗 **查看详情**: {urls['session_detail']}
点击链接可以看到:
✅ 完整对话历史每轮messages
✅ Token消耗明细input/output/reasoning
✅ 工具调用记录
✅ 实时成本统计
**更多链接:**
- 📋 所有会话: {urls['index']}
- 📥 API数据: {urls['api_session']}
- 📊 总体统计: {urls['api_stats']}
"""
# 示例使用
if __name__ == '__main__':
# 模拟clawdbot的session ID
demo_session_id = "agent:main:discord:channel:1465367993012981988"
print("=" * 70)
print("🤖 Clawdbot Session Monitor Demo")
print("=" * 70)
print()
# 生成URL
urls = generate_session_url(demo_session_id)
print("生成的URL")
print(f" Session详情: {urls['session_detail']}")
print(f" API数据: {urls['api_session']}")
print(f" 总览页面: {urls['index']}")
print()
# 生成回复消息
message = format_response_message(demo_session_id)
print("回复消息模板:")
print("-" * 70)
print(message)
print("-" * 70)
print()
print("✅ 在Clawdbot中你可以直接返回上面的消息给用户")
print()
# 测试特殊字符的session ID
special_session_id = "agent:test:session/with?special&chars"
special_urls = generate_session_url(special_session_id)
print("特殊字符处理示例:")
print(f" 原始ID: {special_session_id}")
print(f" URL: {special_urls['session_detail']}")
print()

View File

@@ -0,0 +1,101 @@
#!/bin/bash
# Agent Session Monitor - 演示脚本
set -e
SKILL_DIR="$(dirname "$(dirname "$(realpath "$0")")")"
EXAMPLE_DIR="$SKILL_DIR/example"
LOG_FILE="$EXAMPLE_DIR/test_access.log"
OUTPUT_DIR="$EXAMPLE_DIR/sessions"
echo "========================================"
echo "Agent Session Monitor - Demo"
echo "========================================"
echo ""
# 清理旧数据
if [ -d "$OUTPUT_DIR" ]; then
echo "🧹 Cleaning up old session data..."
rm -rf "$OUTPUT_DIR"
fi
echo "📂 Log file: $LOG_FILE"
echo "📁 Output dir: $OUTPUT_DIR"
echo ""
# 步骤1解析日志文件单次模式
echo "========================================"
echo "步骤1解析日志文件"
echo "========================================"
python3 "$SKILL_DIR/main.py" \
--log-path "$LOG_FILE" \
--output-dir "$OUTPUT_DIR"
echo ""
echo "✅ 日志解析完成Session数据已保存到: $OUTPUT_DIR"
echo ""
# 步骤2列出所有session
echo "========================================"
echo "步骤2列出所有session"
echo "========================================"
python3 "$SKILL_DIR/scripts/cli.py" list \
--data-dir "$OUTPUT_DIR" \
--limit 10
# 步骤3查看第一个session的详细信息
echo "========================================"
echo "步骤3查看session详细信息"
echo "========================================"
FIRST_SESSION=$(ls -1 "$OUTPUT_DIR"/*.json | head -1 | xargs -I {} basename {} .json)
python3 "$SKILL_DIR/scripts/cli.py" show "$FIRST_SESSION" \
--data-dir "$OUTPUT_DIR"
# 步骤4按模型统计
echo "========================================"
echo "步骤4按模型统计token开销"
echo "========================================"
python3 "$SKILL_DIR/scripts/cli.py" stats-model \
--data-dir "$OUTPUT_DIR"
# 步骤5按日期统计
echo "========================================"
echo "步骤5按日期统计token开销"
echo "========================================"
python3 "$SKILL_DIR/scripts/cli.py" stats-date \
--data-dir "$OUTPUT_DIR" \
--days 7
# 步骤6导出FinOps报表
echo "========================================"
echo "步骤6导出FinOps报表"
echo "========================================"
python3 "$SKILL_DIR/scripts/cli.py" export "$EXAMPLE_DIR/finops-report.json" \
--data-dir "$OUTPUT_DIR" \
--format json
echo ""
echo "✅ 报表已导出到: $EXAMPLE_DIR/finops-report.json"
echo ""
# 显示报表内容
if [ -f "$EXAMPLE_DIR/finops-report.json" ]; then
echo "📊 FinOps报表内容"
echo "========================================"
cat "$EXAMPLE_DIR/finops-report.json" | python3 -m json.tool | head -50
echo "..."
fi
echo ""
echo "========================================"
echo "✅ Demo完成"
echo "========================================"
echo ""
echo "💡 提示:"
echo " - Session数据保存在: $OUTPUT_DIR/"
echo " - FinOps报表: $EXAMPLE_DIR/finops-report.json"
echo " - 使用 'python3 scripts/cli.py --help' 查看更多命令"
echo ""
echo "🌐 启动Web界面查看"
echo " python3 $SKILL_DIR/scripts/webserver.py --data-dir $OUTPUT_DIR --port 8888"
echo " 然后访问: http://localhost:8888"

View File

@@ -0,0 +1,76 @@
#!/bin/bash
# Agent Session Monitor - Demo for PR #3424 token details
set -e
SKILL_DIR="$(dirname "$(dirname "$(realpath "$0")")")"
EXAMPLE_DIR="$SKILL_DIR/example"
LOG_FILE="$EXAMPLE_DIR/test_access_v2.log"
OUTPUT_DIR="$EXAMPLE_DIR/sessions_v2"
echo "========================================"
echo "Agent Session Monitor - Token Details Demo"
echo "========================================"
echo ""
# 清理旧数据
if [ -d "$OUTPUT_DIR" ]; then
echo "🧹 Cleaning up old session data..."
rm -rf "$OUTPUT_DIR"
fi
echo "📂 Log file: $LOG_FILE"
echo "📁 Output dir: $OUTPUT_DIR"
echo ""
# 步骤1解析日志文件
echo "========================================"
echo "步骤1解析日志文件包含token details"
echo "========================================"
python3 "$SKILL_DIR/main.py" \
--log-path "$LOG_FILE" \
--output-dir "$OUTPUT_DIR"
echo ""
echo "✅ 日志解析完成Session数据已保存到: $OUTPUT_DIR"
echo ""
# 步骤2查看使用prompt caching的sessiongpt-4o
echo "========================================"
echo "步骤2查看GPT-4o session包含cached tokens"
echo "========================================"
python3 "$SKILL_DIR/scripts/cli.py" show "agent:main:discord:1465367993012981988" \
--data-dir "$OUTPUT_DIR"
# 步骤3查看使用reasoning的sessiono1
echo "========================================"
echo "步骤3查看o1 session包含reasoning tokens"
echo "========================================"
python3 "$SKILL_DIR/scripts/cli.py" show "agent:main:discord:9999999999999999999" \
--data-dir "$OUTPUT_DIR"
# 步骤4按模型统计
echo "========================================"
echo "步骤4按模型统计包含新token类型"
echo "========================================"
python3 "$SKILL_DIR/scripts/cli.py" stats-model \
--data-dir "$OUTPUT_DIR"
echo ""
echo "========================================"
echo "✅ Demo完成"
echo "========================================"
echo ""
echo "💡 新功能说明:"
echo " ✅ cached_tokens - 缓存命中的token数prompt caching"
echo " ✅ reasoning_tokens - 推理token数o1等模型"
echo " ✅ input_token_details - 完整输入token详情JSON"
echo " ✅ output_token_details - 完整输出token详情JSON"
echo ""
echo "💰 成本计算已优化:"
echo " - cached tokens通常比regular input便宜50-90%折扣)"
echo " - reasoning tokens单独计费o1系列"
echo ""
echo "🌐 启动Web界面查看"
echo " python3 $SKILL_DIR/scripts/webserver.py --data-dir $OUTPUT_DIR --port 8889"
echo " 然后访问: http://localhost:8889"

View File

@@ -0,0 +1,4 @@
{"__file_offset__":"1000","timestamp":"2026-02-01T09:30:15Z","ai_log":"{\"session_id\":\"agent:main:discord:1465367993012981988\",\"api\":\"Qwen3-rerank@higress\",\"api_type\":\"LLM\",\"chat_round\":1,\"consumer\":\"clawdbot\",\"input_token\":250,\"output_token\":160,\"model\":\"Qwen3-rerank\",\"response_type\":\"normal\",\"total_token\":410,\"messages\":[{\"role\":\"system\",\"content\":\"You are a helpful assistant.\"},{\"role\":\"user\",\"content\":\"查询北京天气\"}],\"question\":\"查询北京天气\",\"answer\":\"正在为您查询北京天气...\",\"reasoning\":\"用户想知道北京的天气,我需要调用天气查询工具。\",\"tool_calls\":[{\"index\":0,\"id\":\"call_abc123\",\"type\":\"function\",\"function\":{\"name\":\"get_weather\",\"arguments\":\"{\\\"location\\\":\\\"Beijing\\\"}\"}}]}"}
{"__file_offset__":"2000","timestamp":"2026-02-01T09:32:00Z","ai_log":"{\"session_id\":\"agent:main:discord:1465367993012981988\",\"api\":\"Qwen3-rerank@higress\",\"api_type\":\"LLM\",\"chat_round\":2,\"consumer\":\"clawdbot\",\"input_token\":320,\"output_token\":180,\"model\":\"Qwen3-rerank\",\"response_type\":\"normal\",\"total_token\":500,\"messages\":[{\"role\":\"tool\",\"content\":\"{\\\"temperature\\\": 15, \\\"weather\\\": \\\"晴\\\"}\"}],\"question\":\"\",\"answer\":\"北京今天天气晴朗温度15°C。\",\"reasoning\":\"\",\"tool_calls\":[]}"}
{"__file_offset__":"3000","timestamp":"2026-02-01T09:35:12Z","ai_log":"{\"session_id\":\"agent:main:discord:1465367993012981988\",\"api\":\"Qwen3-rerank@higress\",\"api_type\":\"LLM\",\"chat_round\":3,\"consumer\":\"clawdbot\",\"input_token\":380,\"output_token\":220,\"model\":\"Qwen3-rerank\",\"response_type\":\"normal\",\"total_token\":600,\"messages\":[{\"role\":\"user\",\"content\":\"谢谢!\"},{\"role\":\"assistant\",\"content\":\"不客气!如果还有其他问题,随时问我。\"}],\"question\":\"谢谢!\",\"answer\":\"不客气!如果还有其他问题,随时问我。\",\"reasoning\":\"\",\"tool_calls\":[]}"}
{"__file_offset__":"4000","timestamp":"2026-02-01T10:00:00Z","ai_log":"{\"session_id\":\"agent:test:discord:9999999999999999999\",\"api\":\"DeepSeek-R1@higress\",\"api_type\":\"LLM\",\"chat_round\":1,\"consumer\":\"clawdbot\",\"input_token\":50,\"output_token\":30,\"model\":\"DeepSeek-R1\",\"response_type\":\"normal\",\"total_token\":80,\"messages\":[{\"role\":\"user\",\"content\":\"计算2+2\"}],\"question\":\"计算2+2\",\"answer\":\"4\",\"reasoning\":\"这是一个简单的加法运算2加2等于4。\",\"tool_calls\":[]}"}

View File

@@ -0,0 +1,4 @@
{"__file_offset__":"1000","timestamp":"2026-02-01T10:00:00Z","ai_log":"{\"session_id\":\"agent:main:discord:1465367993012981988\",\"api\":\"gpt-4o\",\"api_type\":\"LLM\",\"chat_round\":1,\"consumer\":\"clawdbot\",\"input_token\":150,\"output_token\":100,\"reasoning_tokens\":0,\"cached_tokens\":120,\"input_token_details\":\"{\\\"cached_tokens\\\":120}\",\"output_token_details\":\"{}\",\"model\":\"gpt-4o\",\"response_type\":\"normal\",\"total_token\":250,\"messages\":[{\"role\":\"system\",\"content\":\"You are a helpful assistant.\"},{\"role\":\"user\",\"content\":\"你好\"}],\"question\":\"你好\",\"answer\":\"你好!有什么我可以帮助你的吗?\",\"reasoning\":\"\",\"tool_calls\":[]}"}
{"__file_offset__":"2000","timestamp":"2026-02-01T10:01:00Z","ai_log":"{\"session_id\":\"agent:main:discord:1465367993012981988\",\"api\":\"gpt-4o\",\"api_type\":\"LLM\",\"chat_round\":2,\"consumer\":\"clawdbot\",\"input_token\":200,\"output_token\":150,\"reasoning_tokens\":0,\"cached_tokens\":80,\"input_token_details\":\"{\\\"cached_tokens\\\":80}\",\"output_token_details\":\"{}\",\"model\":\"gpt-4o\",\"response_type\":\"normal\",\"total_token\":350,\"messages\":[{\"role\":\"user\",\"content\":\"介绍一下你的能力\"}],\"question\":\"介绍一下你的能力\",\"answer\":\"我可以帮助你回答问题、写作、编程等...\",\"reasoning\":\"\",\"tool_calls\":[]}"}
{"__file_offset__":"3000","timestamp":"2026-02-01T10:02:00Z","ai_log":"{\"session_id\":\"agent:main:discord:9999999999999999999\",\"api\":\"o1\",\"api_type\":\"LLM\",\"chat_round\":1,\"consumer\":\"clawdbot\",\"input_token\":100,\"output_token\":80,\"reasoning_tokens\":500,\"cached_tokens\":0,\"input_token_details\":\"{}\",\"output_token_details\":\"{\\\"reasoning_tokens\\\":500}\",\"model\":\"o1\",\"response_type\":\"normal\",\"total_token\":580,\"messages\":[{\"role\":\"user\",\"content\":\"解释量子纠缠\"}],\"question\":\"解释量子纠缠\",\"answer\":\"量子纠缠是量子力学中的一种现象...\",\"reasoning\":\"这是一个复杂的物理概念,我需要仔细思考如何用简单的方式解释...\",\"tool_calls\":[]}"}
{"__file_offset__":"4000","timestamp":"2026-02-01T10:03:00Z","ai_log":"{\"session_id\":\"agent:main:discord:1465367993012981988\",\"api\":\"gpt-4o\",\"api_type\":\"LLM\",\"chat_round\":3,\"consumer\":\"clawdbot\",\"input_token\":300,\"output_token\":200,\"reasoning_tokens\":0,\"cached_tokens\":200,\"input_token_details\":\"{\\\"cached_tokens\\\":200}\",\"output_token_details\":\"{}\",\"model\":\"gpt-4o\",\"response_type\":\"normal\",\"total_token\":500,\"messages\":[{\"role\":\"user\",\"content\":\"写一个Python函数计算斐波那契数列\"}],\"question\":\"写一个Python函数计算斐波那契数列\",\"answer\":\"```python\\ndef fibonacci(n):\\n if n <= 1:\\n return n\\n return fibonacci(n-1) + fibonacci(n-2)\\n```\",\"reasoning\":\"\",\"tool_calls\":[]}"}

View File

@@ -0,0 +1,137 @@
#!/bin/bash
# 测试日志轮转功能
set -e
SKILL_DIR="$(dirname "$(dirname "$(realpath "$0")")")"
EXAMPLE_DIR="$SKILL_DIR/example"
TEST_DIR="$EXAMPLE_DIR/rotation_test"
LOG_FILE="$TEST_DIR/access.log"
OUTPUT_DIR="$TEST_DIR/sessions"
echo "========================================"
echo "Log Rotation Test"
echo "========================================"
echo ""
# 清理旧测试数据
rm -rf "$TEST_DIR"
mkdir -p "$TEST_DIR"
echo "📁 Test directory: $TEST_DIR"
echo ""
# 模拟日志轮转场景
echo "========================================"
echo "步骤1创建初始日志文件"
echo "========================================"
# 创建第一批日志10条
for i in {1..10}; do
echo "{\"timestamp\":\"2026-02-01T10:0${i}:00Z\",\"ai_log\":\"{\\\"session_id\\\":\\\"session_001\\\",\\\"model\\\":\\\"gpt-4o\\\",\\\"input_token\\\":$((100+i)),\\\"output_token\\\":$((50+i)),\\\"cached_tokens\\\":$((30+i))}\"}" >> "$LOG_FILE"
done
echo "✅ Created $LOG_FILE with 10 lines"
echo ""
# 首次解析
echo "========================================"
echo "步骤2首次解析应该处理10条记录"
echo "========================================"
python3 "$SKILL_DIR/main.py" \
--log-path "$LOG_FILE" \
--output-dir "$OUTPUT_DIR" \
echo ""
# 检查session数据
echo "Session数据"
cat "$OUTPUT_DIR/session_001.json" | python3 -c "import sys, json; d=json.load(sys.stdin); print(f\" Messages: {d['messages_count']}, Total Input: {d['total_input_tokens']}\")"
echo ""
# 模拟日志轮转
echo "========================================"
echo "步骤3模拟日志轮转"
echo "========================================"
mv "$LOG_FILE" "$LOG_FILE.1"
echo "✅ Rotated: access.log -> access.log.1"
echo ""
# 创建新的日志文件5条新记录
for i in {11..15}; do
echo "{\"timestamp\":\"2026-02-01T10:${i}:00Z\",\"ai_log\":\"{\\\"session_id\\\":\\\"session_001\\\",\\\"model\\\":\\\"gpt-4o\\\",\\\"input_token\\\":$((100+i)),\\\"output_token\\\":$((50+i)),\\\"cached_tokens\\\":$((30+i))}\"}" >> "$LOG_FILE"
done
echo "✅ Created new $LOG_FILE with 5 lines"
echo ""
# 再次解析应该只处理新的5条
echo "========================================"
echo "步骤4再次解析应该只处理新的5条"
echo "========================================"
python3 "$SKILL_DIR/main.py" \
--log-path "$LOG_FILE" \
--output-dir "$OUTPUT_DIR" \
echo ""
# 检查session数据
echo "Session数据"
cat "$OUTPUT_DIR/session_001.json" | python3 -c "import sys, json; d=json.load(sys.stdin); print(f\" Messages: {d['messages_count']}, Total Input: {d['total_input_tokens']} (应该是15条记录)\")"
echo ""
# 再次轮转
echo "========================================"
echo "步骤5再次轮转"
echo "========================================"
mv "$LOG_FILE.1" "$LOG_FILE.2"
mv "$LOG_FILE" "$LOG_FILE.1"
echo "✅ Rotated: access.log -> access.log.1"
echo "✅ Rotated: access.log.1 -> access.log.2"
echo ""
# 创建新的日志文件3条新记录
for i in {16..18}; do
echo "{\"timestamp\":\"2026-02-01T10:${i}:00Z\",\"ai_log\":\"{\\\"session_id\\\":\\\"session_001\\\",\\\"model\\\":\\\"gpt-4o\\\",\\\"input_token\\\":$((100+i)),\\\"output_token\\\":$((50+i)),\\\"cached_tokens\\\":$((30+i))}\"}" >> "$LOG_FILE"
done
echo "✅ Created new $LOG_FILE with 3 lines"
echo ""
# 再次解析应该只处理新的3条
echo "========================================"
echo "步骤6再次解析应该只处理新的3条"
echo "========================================"
python3 "$SKILL_DIR/main.py" \
--log-path "$LOG_FILE" \
--output-dir "$OUTPUT_DIR" \
echo ""
# 检查session数据
echo "Session数据"
cat "$OUTPUT_DIR/session_001.json" | python3 -c "import sys, json; d=json.load(sys.stdin); print(f\" Messages: {d['messages_count']}, Total Input: {d['total_input_tokens']} (应该是18条记录)\")"
echo ""
# 检查状态文件
echo "========================================"
echo "步骤7查看状态文件"
echo "========================================"
echo "状态文件内容:"
cat "$OUTPUT_DIR/.state.json" | python3 -m json.tool | head -20
echo ""
echo "========================================"
echo "✅ 测试完成!"
echo "========================================"
echo ""
echo "💡 验证要点:"
echo " 1. 首次解析处理了10条记录"
echo " 2. 轮转后只处理新增的5条记录总计15条"
echo " 3. 再次轮转后只处理新增的3条记录总计18条"
echo " 4. 状态文件记录了每个文件的inode和offset"
echo ""
echo "📂 测试数据保存在: $TEST_DIR/"

View File

@@ -0,0 +1,639 @@
#!/usr/bin/env python3
"""
Agent Session Monitor - 实时Agent对话观测程序
监控Higress访问日志按session聚合对话追踪token开销
"""
import argparse
import json
import re
import os
import sys
import time
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
# 使用定时轮询机制不依赖watchdog
# ============================================================================
# 配置
# ============================================================================
# Token定价单位美元/1M tokens
TOKEN_PRICING = {
"Qwen": {
"input": 0.0002, # $0.2/1M
"output": 0.0006,
"cached": 0.0001, # cached tokens通常是input的50%
},
"Qwen3-rerank": {
"input": 0.0003,
"output": 0.0012,
"cached": 0.00015,
},
"Qwen-Max": {
"input": 0.0005,
"output": 0.002,
"cached": 0.00025,
},
"GPT-4": {
"input": 0.003,
"output": 0.006,
"cached": 0.0015,
},
"GPT-4o": {
"input": 0.0025,
"output": 0.01,
"cached": 0.00125, # GPT-4o prompt caching: 50% discount
},
"GPT-4-32k": {
"input": 0.01,
"output": 0.03,
"cached": 0.005,
},
"o1": {
"input": 0.015,
"output": 0.06,
"cached": 0.0075,
"reasoning": 0.06, # o1 reasoning tokens same as output
},
"o1-mini": {
"input": 0.003,
"output": 0.012,
"cached": 0.0015,
"reasoning": 0.012,
},
"Claude": {
"input": 0.015,
"output": 0.075,
"cached": 0.0015, # Claude prompt caching: 90% discount
},
"DeepSeek-R1": {
"input": 0.004,
"output": 0.012,
"reasoning": 0.002,
"cached": 0.002,
}
}
DEFAULT_LOG_PATH = "/var/log/higress/access.log"
DEFAULT_OUTPUT_DIR = "./sessions"
# ============================================================================
# Session管理器
# ============================================================================
class SessionManager:
"""管理多个会话的token统计"""
def __init__(self, output_dir: str, load_existing: bool = True):
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.sessions: Dict[str, dict] = {}
# 加载已有的session数据
if load_existing:
self._load_existing_sessions()
def _load_existing_sessions(self):
"""加载已有的session数据"""
loaded_count = 0
for session_file in self.output_dir.glob("*.json"):
try:
with open(session_file, 'r', encoding='utf-8') as f:
session = json.load(f)
session_id = session.get('session_id')
if session_id:
self.sessions[session_id] = session
loaded_count += 1
except Exception as e:
print(f"Warning: Failed to load session {session_file}: {e}", file=sys.stderr)
if loaded_count > 0:
print(f"📦 Loaded {loaded_count} existing session(s)")
def update_session(self, session_id: str, ai_log: dict) -> dict:
"""更新或创建session"""
if session_id not in self.sessions:
self.sessions[session_id] = {
"session_id": session_id,
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
"messages_count": 0,
"total_input_tokens": 0,
"total_output_tokens": 0,
"total_reasoning_tokens": 0,
"total_cached_tokens": 0,
"rounds": [],
"model": ai_log.get("model", "unknown")
}
session = self.sessions[session_id]
# 更新统计
model = ai_log.get("model", "unknown")
session["model"] = model
session["updated_at"] = datetime.now().isoformat()
# Token统计
session["total_input_tokens"] += ai_log.get("input_token", 0)
session["total_output_tokens"] += ai_log.get("output_token", 0)
# 检查reasoning tokens优先使用ai_log中的reasoning_tokens字段
reasoning_tokens = ai_log.get("reasoning_tokens", 0)
if reasoning_tokens == 0 and "reasoning" in ai_log and ai_log["reasoning"]:
# 如果没有reasoning_tokens字段估算reasoning的token数大致按字符数/4
reasoning_text = ai_log["reasoning"]
reasoning_tokens = len(reasoning_text) // 4
session["total_reasoning_tokens"] += reasoning_tokens
# 检查cached tokensprompt caching
cached_tokens = ai_log.get("cached_tokens", 0)
session["total_cached_tokens"] += cached_tokens
# 检查是否有tool_calls工具调用
has_tool_calls = "tool_calls" in ai_log and ai_log["tool_calls"]
# 更新消息数
session["messages_count"] += 1
# 解析token details如果有
input_token_details = {}
output_token_details = {}
if "input_token_details" in ai_log:
try:
# input_token_details可能是字符串或字典
details = ai_log["input_token_details"]
if isinstance(details, str):
import json
input_token_details = json.loads(details)
else:
input_token_details = details
except (json.JSONDecodeError, TypeError):
pass
if "output_token_details" in ai_log:
try:
# output_token_details可能是字符串或字典
details = ai_log["output_token_details"]
if isinstance(details, str):
import json
output_token_details = json.loads(details)
else:
output_token_details = details
except (json.JSONDecodeError, TypeError):
pass
# 添加轮次记录包含完整的llm请求和响应信息
round_data = {
"round": session["messages_count"],
"timestamp": datetime.now().isoformat(),
"input_tokens": ai_log.get("input_token", 0),
"output_tokens": ai_log.get("output_token", 0),
"reasoning_tokens": reasoning_tokens,
"cached_tokens": cached_tokens,
"model": model,
"has_tool_calls": has_tool_calls,
"response_type": ai_log.get("response_type", "normal"),
# 完整的对话信息
"messages": ai_log.get("messages", []),
"question": ai_log.get("question", ""),
"answer": ai_log.get("answer", ""),
"reasoning": ai_log.get("reasoning", ""),
"tool_calls": ai_log.get("tool_calls", []),
# Token详情
"input_token_details": input_token_details,
"output_token_details": output_token_details,
}
session["rounds"].append(round_data)
# 保存到文件
self._save_session(session)
return session
def _save_session(self, session: dict):
"""保存session数据到文件"""
session_file = self.output_dir / f"{session['session_id']}.json"
with open(session_file, 'w', encoding='utf-8') as f:
json.dump(session, f, ensure_ascii=False, indent=2)
def get_all_sessions(self) -> List[dict]:
"""获取所有session"""
return list(self.sessions.values())
def get_session(self, session_id: str) -> Optional[dict]:
"""获取指定session"""
return self.sessions.get(session_id)
def get_summary(self) -> dict:
"""获取总体统计"""
total_input = sum(s["total_input_tokens"] for s in self.sessions.values())
total_output = sum(s["total_output_tokens"] for s in self.sessions.values())
total_reasoning = sum(s.get("total_reasoning_tokens", 0) for s in self.sessions.values())
total_cached = sum(s.get("total_cached_tokens", 0) for s in self.sessions.values())
# 计算成本
total_cost = 0
for session in self.sessions.values():
model = session.get("model", "unknown")
input_tokens = session["total_input_tokens"]
output_tokens = session["total_output_tokens"]
reasoning_tokens = session.get("total_reasoning_tokens", 0)
cached_tokens = session.get("total_cached_tokens", 0)
pricing = TOKEN_PRICING.get(model, TOKEN_PRICING.get("GPT-4", {}))
# 基础成本计算
# 注意cached_tokens已经包含在input_tokens中需要分开计算
regular_input_tokens = input_tokens - cached_tokens
input_cost = regular_input_tokens * pricing.get("input", 0) / 1000000
output_cost = output_tokens * pricing.get("output", 0) / 1000000
# reasoning成本
reasoning_cost = 0
if "reasoning" in pricing and reasoning_tokens > 0:
reasoning_cost = reasoning_tokens * pricing["reasoning"] / 1000000
# cached成本通常比input便宜
cached_cost = 0
if "cached" in pricing and cached_tokens > 0:
cached_cost = cached_tokens * pricing["cached"] / 1000000
total_cost += input_cost + output_cost + reasoning_cost + cached_cost
return {
"total_sessions": len(self.sessions),
"total_input_tokens": total_input,
"total_output_tokens": total_output,
"total_reasoning_tokens": total_reasoning,
"total_cached_tokens": total_cached,
"total_tokens": total_input + total_output + total_reasoning + total_cached,
"total_cost_usd": round(total_cost, 4),
"active_session_ids": list(self.sessions.keys())
}
# ============================================================================
# 日志解析器
# ============================================================================
class LogParser:
"""解析Higress访问日志提取ai_log支持日志轮转"""
def __init__(self, state_file: str = None):
self.state_file = Path(state_file) if state_file else None
self.file_offsets = {} # {文件路径: 已读取的字节偏移}
self._load_state()
def _load_state(self):
"""加载上次的读取状态"""
if self.state_file and self.state_file.exists():
try:
with open(self.state_file, 'r') as f:
self.file_offsets = json.load(f)
except Exception as e:
print(f"Warning: Failed to load state file: {e}", file=sys.stderr)
def _save_state(self):
"""保存当前的读取状态"""
if self.state_file:
try:
self.state_file.parent.mkdir(parents=True, exist_ok=True)
with open(self.state_file, 'w') as f:
json.dump(self.file_offsets, f, indent=2)
except Exception as e:
print(f"Warning: Failed to save state file: {e}", file=sys.stderr)
def parse_log_line(self, line: str) -> Optional[dict]:
"""解析单行日志提取ai_log JSON"""
try:
# 直接解析整个日志行为JSON
log_obj = json.loads(line.strip())
# 获取ai_log字段这是一个JSON字符串
if 'ai_log' in log_obj:
ai_log_str = log_obj['ai_log']
# 解析内层JSON
ai_log = json.loads(ai_log_str)
return ai_log
except (json.JSONDecodeError, ValueError, KeyError):
# 静默忽略非JSON行或缺少ai_log字段的行
pass
return None
def parse_rotated_logs(self, log_pattern: str, session_manager) -> None:
"""解析日志文件及其轮转文件
Args:
log_pattern: 日志文件路径,如 /var/log/proxy/access.log
session_manager: Session管理器
"""
base_path = Path(log_pattern)
# 自动扫描所有轮转的日志文件(从旧到新)
log_files = []
# 自动扫描轮转文件(最多扫描到 .100,超过这个数量的日志应该很少见)
for i in range(100, 0, -1):
rotated_path = Path(f"{log_pattern}.{i}")
if rotated_path.exists():
log_files.append(str(rotated_path))
# 添加当前日志文件
if base_path.exists():
log_files.append(str(base_path))
if not log_files:
print(f"❌ No log files found for pattern: {log_pattern}")
return
print(f"📂 Found {len(log_files)} log file(s):")
for f in log_files:
print(f" - {f}")
print()
# 按顺序解析每个文件(从旧到新)
for log_file in log_files:
self._parse_file_incremental(log_file, session_manager)
# 保存状态
self._save_state()
def _parse_file_incremental(self, file_path: str, session_manager) -> None:
"""增量解析单个日志文件"""
try:
file_stat = os.stat(file_path)
file_size = file_stat.st_size
file_inode = file_stat.st_ino
# 使用inode作为主键
inode_key = str(file_inode)
last_offset = self.file_offsets.get(inode_key, 0)
# 如果文件变小了说明是新文件被truncate或新创建从头开始读
if file_size < last_offset:
print(f" 📝 File truncated or recreated, reading from start: {file_path}")
last_offset = 0
# 如果offset相同说明没有新内容
if file_size == last_offset:
print(f" ⏭️ No new content in: {file_path} (inode:{inode_key})")
return
print(f" 📖 Reading {file_path} from offset {last_offset} to {file_size} (inode:{inode_key})")
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
f.seek(last_offset)
lines_processed = 0
for line in f:
ai_log = self.parse_log_line(line)
if ai_log:
session_id = ai_log.get("session_id", "default")
session_manager.update_session(session_id, ai_log)
lines_processed += 1
# 每处理1000行打印一次进度
if lines_processed % 1000 == 0:
print(f" Processed {lines_processed} lines, {len(session_manager.sessions)} sessions")
# 更新offset使用inode作为key
current_offset = f.tell()
self.file_offsets[inode_key] = current_offset
print(f" ✅ Processed {lines_processed} new lines from {file_path}")
except FileNotFoundError:
print(f" ❌ File not found: {file_path}")
except Exception as e:
print(f" ❌ Error parsing {file_path}: {e}")
# ============================================================================
# 实时显示器
# ============================================================================
class RealtimeMonitor:
"""实时监控显示和交互(定时轮询模式)"""
def __init__(self, session_manager: SessionManager, log_parser=None, log_path: str = None, refresh_interval: int = 1):
self.session_manager = session_manager
self.log_parser = log_parser
self.log_path = log_path
self.refresh_interval = refresh_interval
self.running = True
self.last_poll_time = 0
def start(self):
"""启动实时监控(定时轮询日志文件)"""
print(f"\n{'=' * 50}")
print(f"🔍 Agent Session Monitor - Real-time View")
print(f"{'=' * 50}")
print()
print("Press Ctrl+C to stop...")
print()
try:
while self.running:
# 定时轮询日志文件(检查新增内容和轮转)
current_time = time.time()
if self.log_parser and self.log_path and (current_time - self.last_poll_time >= self.refresh_interval):
self.log_parser.parse_rotated_logs(self.log_path, self.session_manager)
self.last_poll_time = current_time
# 显示状态
self._display_status()
time.sleep(self.refresh_interval)
except KeyboardInterrupt:
print("\n\n👋 Stopping monitor...")
self.running = False
self._display_summary()
def _display_status(self):
"""显示当前状态"""
summary = self.session_manager.get_summary()
# 清屏
os.system('clear' if os.name == 'posix' else 'cls')
print(f"{'=' * 50}")
print(f"🔍 Session Monitor - Active")
print(f"{'=' * 50}")
print()
print(f"📊 Active Sessions: {summary['total_sessions']}")
print()
# 显示活跃session的token统计
if summary['active_session_ids']:
print("┌──────────────────────────┬─────────┬──────────┬───────────┐")
print("│ Session ID │ Msgs │ Input │ Output │")
print("├──────────────────────────┼─────────┼──────────┼───────────┤")
for session_id in summary['active_session_ids'][:10]: # 最多显示10个
session = self.session_manager.get_session(session_id)
if session:
sid = session_id[:24] if len(session_id) > 24 else session_id
print(f"{sid:<24}{session['messages_count']:>7}{session['total_input_tokens']:>8,}{session['total_output_tokens']:>9,}")
print("└──────────────────────────┴─────────┴──────────┴───────────┘")
print()
print(f"📈 Token Statistics")
print(f" Total Input: {summary['total_input_tokens']:,} tokens")
print(f" Total Output: {summary['total_output_tokens']:,} tokens")
if summary['total_reasoning_tokens'] > 0:
print(f" Total Reasoning: {summary['total_reasoning_tokens']:,} tokens")
print(f" Total Cached: {summary['total_cached_tokens']:,} tokens")
print(f" Total Cost: ${summary['total_cost_usd']:.4f}")
def _display_summary(self):
"""显示最终汇总"""
summary = self.session_manager.get_summary()
print()
print(f"{'=' * 50}")
print(f"📊 Session Monitor - Summary")
print(f"{'=' * 50}")
print()
print(f"📈 Final Statistics")
print(f" Total Sessions: {summary['total_sessions']}")
print(f" Total Input: {summary['total_input_tokens']:,} tokens")
print(f" Total Output: {summary['total_output_tokens']:,} tokens")
if summary['total_reasoning_tokens'] > 0:
print(f" Total Reasoning: {summary['total_reasoning_tokens']:,} tokens")
print(f" Total Cached: {summary['total_cached_tokens']:,} tokens")
print(f" Total Tokens: {summary['total_tokens']:,} tokens")
print(f" Total Cost: ${summary['total_cost_usd']:.4f}")
print(f"{'=' * 50}")
print()
# ============================================================================
# 主程序
# ============================================================================
def main():
parser = argparse.ArgumentParser(
description="Agent Session Monitor - 实时监控多轮Agent对话的token开销",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例:
# 监控默认日志
%(prog)s
# 监控指定日志文件
%(prog)s --log-path /var/log/higress/access.log
# 设置预算为500K tokens
%(prog)s --budget 500000
# 监控特定session
%(prog)s --session-key agent:main:discord:channel:1465367993012981988
""",
allow_abbrev=False
)
parser.add_argument(
'--log-path',
default=DEFAULT_LOG_PATH,
help=f'Higress访问日志文件路径默认: {DEFAULT_LOG_PATH}'
)
parser.add_argument(
'--output-dir',
default=DEFAULT_OUTPUT_DIR,
help=f'Session数据存储目录默认: {DEFAULT_OUTPUT_DIR}'
)
parser.add_argument(
'--session-key',
help='只监控包含指定session key的日志'
)
parser.add_argument(
'--refresh-interval',
type=int,
default=1,
help=f'实时监控刷新间隔(秒,默认: 1'
)
parser.add_argument(
'--state-file',
help='状态文件路径用于记录已读取的offset默认: <output-dir>/.state.json'
)
args = parser.parse_args()
# 初始化组件
session_manager = SessionManager(output_dir=args.output_dir)
# 状态文件路径
state_file = args.state_file or str(Path(args.output_dir) / '.state.json')
log_parser = LogParser(state_file=state_file)
print(f"{'=' * 60}")
print(f"🔍 Agent Session Monitor")
print(f"{'=' * 60}")
print()
print(f"📂 Log path: {args.log_path}")
print(f"📁 Output dir: {args.output_dir}")
if args.session_key:
print(f"🔑 Session key filter: {args.session_key}")
print(f"{'=' * 60}")
print()
# 模式选择:实时监控或单次解析
if len(sys.argv) == 1:
# 默认模式:实时监控(定时轮询)
print("📺 Mode: Real-time monitoring (polling mode with log rotation support)")
print(f" Refresh interval: {args.refresh_interval} second(s)")
print()
# 首次解析现有日志文件(包括轮转的文件)
log_parser.parse_rotated_logs(args.log_path, session_manager)
# 启动实时监控(定时轮询模式)
monitor = RealtimeMonitor(
session_manager,
log_parser=log_parser,
log_path=args.log_path,
refresh_interval=args.refresh_interval
)
monitor.start()
else:
# 单次解析模式
print("📊 Mode: One-time log parsing (with log rotation support)")
print()
log_parser.parse_rotated_logs(args.log_path, session_manager)
# 显示汇总
summary = session_manager.get_summary()
print(f"\n{'=' * 50}")
print(f"📊 Session Summary")
print(f"{'=' * 50}")
print()
print(f"📈 Final Statistics")
print(f" Total Sessions: {summary['total_sessions']}")
print(f" Total Input: {summary['total_input_tokens']:,} tokens")
print(f" Total Output: {summary['total_output_tokens']:,} tokens")
if summary['total_reasoning_tokens'] > 0:
print(f" Total Reasoning: {summary['total_reasoning_tokens']:,} tokens")
print(f" Total Cached: {summary['total_cached_tokens']:,} tokens")
print(f" Total Tokens: {summary['total_tokens']:,} tokens")
print(f" Total Cost: ${summary['total_cost_usd']:.4f}")
print(f"{'=' * 50}")
print()
print(f"💾 Session data saved to: {args.output_dir}/")
print(f" Run with --output-dir to specify custom directory")
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,600 @@
#!/usr/bin/env python3
"""
Agent Session Monitor CLI - 查询和分析agent对话数据
支持:
1. 实时查询指定session的完整llm请求和响应
2. 按模型统计token开销
3. 按日期统计token开销
4. 生成FinOps报表
"""
import argparse
import json
import sys
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional
import re
# Token定价单位美元/1M tokens
TOKEN_PRICING = {
"Qwen": {
"input": 0.0002, # $0.2/1M
"output": 0.0006,
"cached": 0.0001, # cached tokens通常是input的50%
},
"Qwen3-rerank": {
"input": 0.0003,
"output": 0.0012,
"cached": 0.00015,
},
"Qwen-Max": {
"input": 0.0005,
"output": 0.002,
"cached": 0.00025,
},
"GPT-4": {
"input": 0.003,
"output": 0.006,
"cached": 0.0015,
},
"GPT-4o": {
"input": 0.0025,
"output": 0.01,
"cached": 0.00125, # GPT-4o prompt caching: 50% discount
},
"GPT-4-32k": {
"input": 0.01,
"output": 0.03,
"cached": 0.005,
},
"o1": {
"input": 0.015,
"output": 0.06,
"cached": 0.0075,
"reasoning": 0.06, # o1 reasoning tokens same as output
},
"o1-mini": {
"input": 0.003,
"output": 0.012,
"cached": 0.0015,
"reasoning": 0.012,
},
"Claude": {
"input": 0.015,
"output": 0.075,
"cached": 0.0015, # Claude prompt caching: 90% discount
},
"DeepSeek-R1": {
"input": 0.004,
"output": 0.012,
"reasoning": 0.002,
"cached": 0.002,
}
}
class SessionAnalyzer:
"""Session数据分析器"""
def __init__(self, data_dir: str):
self.data_dir = Path(data_dir)
if not self.data_dir.exists():
raise FileNotFoundError(f"Session data directory not found: {data_dir}")
def load_session(self, session_id: str) -> Optional[dict]:
"""加载指定session的完整数据"""
session_file = self.data_dir / f"{session_id}.json"
if not session_file.exists():
return None
with open(session_file, 'r', encoding='utf-8') as f:
return json.load(f)
def load_all_sessions(self) -> List[dict]:
"""加载所有session数据"""
sessions = []
for session_file in self.data_dir.glob("*.json"):
try:
with open(session_file, 'r', encoding='utf-8') as f:
session = json.load(f)
sessions.append(session)
except Exception as e:
print(f"Warning: Failed to load {session_file}: {e}", file=sys.stderr)
return sessions
def display_session_detail(self, session_id: str, show_messages: bool = True):
"""显示session的详细信息"""
session = self.load_session(session_id)
if not session:
print(f"❌ Session not found: {session_id}")
return
print(f"\n{'='*70}")
print(f"📊 Session Detail: {session_id}")
print(f"{'='*70}\n")
# 基本信息
print(f"🕐 Created: {session['created_at']}")
print(f"🕑 Updated: {session['updated_at']}")
print(f"🤖 Model: {session['model']}")
print(f"💬 Messages: {session['messages_count']}")
print()
# Token统计
print(f"📈 Token Statistics:")
total_input = session['total_input_tokens']
total_output = session['total_output_tokens']
total_reasoning = session.get('total_reasoning_tokens', 0)
total_cached = session.get('total_cached_tokens', 0)
# 区分regular input和cached input
regular_input = total_input - total_cached
if total_cached > 0:
print(f" Input: {regular_input:>10,} tokens (regular)")
print(f" Cached: {total_cached:>10,} tokens (from cache)")
print(f" Total Input:{total_input:>10,} tokens")
else:
print(f" Input: {total_input:>10,} tokens")
print(f" Output: {total_output:>10,} tokens")
if total_reasoning > 0:
print(f" Reasoning: {total_reasoning:>10,} tokens")
# 总计不重复计算cached
total_tokens = total_input + total_output + total_reasoning
print(f" ────────────────────────")
print(f" Total: {total_tokens:>10,} tokens")
print()
# 成本计算
cost = self._calculate_cost(session)
print(f"💰 Estimated Cost: ${cost:.8f} USD")
print()
# 对话轮次
if show_messages and 'rounds' in session:
print(f"📝 Conversation Rounds ({len(session['rounds'])}):")
print(f"{''*70}")
for i, round_data in enumerate(session['rounds'], 1):
timestamp = round_data.get('timestamp', 'N/A')
input_tokens = round_data.get('input_tokens', 0)
output_tokens = round_data.get('output_tokens', 0)
has_tool_calls = round_data.get('has_tool_calls', False)
response_type = round_data.get('response_type', 'normal')
print(f"\n Round {i} @ {timestamp}")
print(f" Tokens: {input_tokens:,} in → {output_tokens:,} out")
if has_tool_calls:
print(f" 🔧 Tool calls: Yes")
if response_type != 'normal':
print(f" Type: {response_type}")
# 显示完整的messages如果有
if 'messages' in round_data:
messages = round_data['messages']
print(f" Messages ({len(messages)}):")
for msg in messages[-3:]: # 只显示最后3条
role = msg.get('role', 'unknown')
content = msg.get('content', '')
content_preview = content[:100] + '...' if len(content) > 100 else content
print(f" [{role}] {content_preview}")
# 显示question/answer/reasoning如果有
if 'question' in round_data:
q = round_data['question']
q_preview = q[:150] + '...' if len(q) > 150 else q
print(f" ❓ Question: {q_preview}")
if 'answer' in round_data:
a = round_data['answer']
a_preview = a[:150] + '...' if len(a) > 150 else a
print(f" ✅ Answer: {a_preview}")
if 'reasoning' in round_data and round_data['reasoning']:
r = round_data['reasoning']
r_preview = r[:150] + '...' if len(r) > 150 else r
print(f" 🧠 Reasoning: {r_preview}")
if 'tool_calls' in round_data and round_data['tool_calls']:
print(f" 🛠️ Tool Calls:")
for tool_call in round_data['tool_calls']:
func_name = tool_call.get('function', {}).get('name', 'unknown')
args = tool_call.get('function', {}).get('arguments', '')
print(f" - {func_name}({args[:80]}...)")
# 显示token details如果有
if round_data.get('input_token_details'):
print(f" 📊 Input Token Details: {round_data['input_token_details']}")
if round_data.get('output_token_details'):
print(f" 📊 Output Token Details: {round_data['output_token_details']}")
print(f"\n{''*70}")
print(f"\n{'='*70}\n")
def _calculate_cost(self, session: dict) -> float:
"""计算session的成本"""
model = session.get('model', 'unknown')
pricing = TOKEN_PRICING.get(model, TOKEN_PRICING.get("GPT-4", {}))
input_tokens = session['total_input_tokens']
output_tokens = session['total_output_tokens']
reasoning_tokens = session.get('total_reasoning_tokens', 0)
cached_tokens = session.get('total_cached_tokens', 0)
# 区分regular input和cached input
regular_input_tokens = input_tokens - cached_tokens
input_cost = regular_input_tokens * pricing.get('input', 0) / 1000000
output_cost = output_tokens * pricing.get('output', 0) / 1000000
reasoning_cost = 0
if 'reasoning' in pricing and reasoning_tokens > 0:
reasoning_cost = reasoning_tokens * pricing['reasoning'] / 1000000
cached_cost = 0
if 'cached' in pricing and cached_tokens > 0:
cached_cost = cached_tokens * pricing['cached'] / 1000000
return input_cost + output_cost + reasoning_cost + cached_cost
def stats_by_model(self) -> Dict[str, dict]:
"""按模型统计token开销"""
sessions = self.load_all_sessions()
stats = defaultdict(lambda: {
'session_count': 0,
'total_input': 0,
'total_output': 0,
'total_reasoning': 0,
'total_cost': 0.0
})
for session in sessions:
model = session.get('model', 'unknown')
stats[model]['session_count'] += 1
stats[model]['total_input'] += session['total_input_tokens']
stats[model]['total_output'] += session['total_output_tokens']
stats[model]['total_reasoning'] += session.get('total_reasoning_tokens', 0)
stats[model]['total_cost'] += self._calculate_cost(session)
return dict(stats)
def stats_by_date(self, days: int = 30) -> Dict[str, dict]:
"""按日期统计token开销最近N天"""
sessions = self.load_all_sessions()
stats = defaultdict(lambda: {
'session_count': 0,
'total_input': 0,
'total_output': 0,
'total_reasoning': 0,
'total_cost': 0.0,
'models': set()
})
cutoff_date = datetime.now() - timedelta(days=days)
for session in sessions:
created_at = datetime.fromisoformat(session['created_at'])
if created_at < cutoff_date:
continue
date_key = created_at.strftime('%Y-%m-%d')
stats[date_key]['session_count'] += 1
stats[date_key]['total_input'] += session['total_input_tokens']
stats[date_key]['total_output'] += session['total_output_tokens']
stats[date_key]['total_reasoning'] += session.get('total_reasoning_tokens', 0)
stats[date_key]['total_cost'] += self._calculate_cost(session)
stats[date_key]['models'].add(session.get('model', 'unknown'))
# 转换sets为lists以便JSON序列化
for date_key in stats:
stats[date_key]['models'] = list(stats[date_key]['models'])
return dict(stats)
def display_model_stats(self):
"""显示按模型的统计"""
stats = self.stats_by_model()
print(f"\n{'='*80}")
print(f"📊 Statistics by Model")
print(f"{'='*80}\n")
print(f"{'Model':<20} {'Sessions':<10} {'Input':<15} {'Output':<15} {'Cost (USD)':<12}")
print(f"{''*80}")
# 按成本降序排列
sorted_models = sorted(stats.items(), key=lambda x: x[1]['total_cost'], reverse=True)
for model, data in sorted_models:
print(f"{model:<20} "
f"{data['session_count']:<10} "
f"{data['total_input']:>12,} "
f"{data['total_output']:>12,} "
f"${data['total_cost']:>10.6f}")
# 总计
total_sessions = sum(d['session_count'] for d in stats.values())
total_input = sum(d['total_input'] for d in stats.values())
total_output = sum(d['total_output'] for d in stats.values())
total_cost = sum(d['total_cost'] for d in stats.values())
print(f"{''*80}")
print(f"{'TOTAL':<20} "
f"{total_sessions:<10} "
f"{total_input:>12,} "
f"{total_output:>12,} "
f"${total_cost:>10.6f}")
print(f"\n{'='*80}\n")
def display_date_stats(self, days: int = 30):
"""显示按日期的统计"""
stats = self.stats_by_date(days)
print(f"\n{'='*80}")
print(f"📊 Statistics by Date (Last {days} days)")
print(f"{'='*80}\n")
print(f"{'Date':<12} {'Sessions':<10} {'Input':<15} {'Output':<15} {'Cost (USD)':<12} {'Models':<20}")
print(f"{''*80}")
# 按日期升序排列
sorted_dates = sorted(stats.items())
for date, data in sorted_dates:
models_str = ', '.join(data['models'][:3]) # 最多显示3个模型
if len(data['models']) > 3:
models_str += f" +{len(data['models'])-3}"
print(f"{date:<12} "
f"{data['session_count']:<10} "
f"{data['total_input']:>12,} "
f"{data['total_output']:>12,} "
f"${data['total_cost']:>10.4f} "
f"{models_str}")
# 总计
total_sessions = sum(d['session_count'] for d in stats.values())
total_input = sum(d['total_input'] for d in stats.values())
total_output = sum(d['total_output'] for d in stats.values())
total_cost = sum(d['total_cost'] for d in stats.values())
print(f"{''*80}")
print(f"{'TOTAL':<12} "
f"{total_sessions:<10} "
f"{total_input:>12,} "
f"{total_output:>12,} "
f"${total_cost:>10.4f}")
print(f"\n{'='*80}\n")
def list_sessions(self, limit: int = 20, sort_by: str = 'updated'):
"""列出所有session"""
sessions = self.load_all_sessions()
# 排序
if sort_by == 'updated':
sessions.sort(key=lambda s: s.get('updated_at', ''), reverse=True)
elif sort_by == 'cost':
sessions.sort(key=lambda s: self._calculate_cost(s), reverse=True)
elif sort_by == 'tokens':
sessions.sort(key=lambda s: s['total_input_tokens'] + s['total_output_tokens'], reverse=True)
print(f"\n{'='*100}")
print(f"📋 Sessions (sorted by {sort_by}, showing {min(limit, len(sessions))} of {len(sessions)})")
print(f"{'='*100}\n")
print(f"{'Session ID':<30} {'Updated':<20} {'Model':<15} {'Msgs':<6} {'Tokens':<12} {'Cost':<10}")
print(f"{''*100}")
for session in sessions[:limit]:
session_id = session['session_id'][:28] + '..' if len(session['session_id']) > 30 else session['session_id']
updated = session.get('updated_at', 'N/A')[:19]
model = session.get('model', 'unknown')[:13]
msg_count = session.get('messages_count', 0)
total_tokens = session['total_input_tokens'] + session['total_output_tokens']
cost = self._calculate_cost(session)
print(f"{session_id:<30} {updated:<20} {model:<15} {msg_count:<6} {total_tokens:>10,} ${cost:>8.4f}")
print(f"\n{'='*100}\n")
def export_finops_report(self, output_file: str, format: str = 'json'):
"""导出FinOps报表"""
model_stats = self.stats_by_model()
date_stats = self.stats_by_date(30)
report = {
'generated_at': datetime.now().isoformat(),
'summary': {
'total_sessions': sum(d['session_count'] for d in model_stats.values()),
'total_input_tokens': sum(d['total_input'] for d in model_stats.values()),
'total_output_tokens': sum(d['total_output'] for d in model_stats.values()),
'total_cost_usd': sum(d['total_cost'] for d in model_stats.values()),
},
'by_model': model_stats,
'by_date': date_stats,
}
output_path = Path(output_file)
if format == 'json':
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(report, f, ensure_ascii=False, indent=2)
print(f"✅ FinOps report exported to: {output_path}")
elif format == 'csv':
import csv
# 按模型导出CSV
model_csv = output_path.with_suffix('.model.csv')
with open(model_csv, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(['Model', 'Sessions', 'Input Tokens', 'Output Tokens', 'Cost (USD)'])
for model, data in model_stats.items():
writer.writerow([
model,
data['session_count'],
data['total_input'],
data['total_output'],
f"{data['total_cost']:.6f}"
])
# 按日期导出CSV
date_csv = output_path.with_suffix('.date.csv')
with open(date_csv, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(['Date', 'Sessions', 'Input Tokens', 'Output Tokens', 'Cost (USD)', 'Models'])
for date, data in sorted(date_stats.items()):
writer.writerow([
date,
data['session_count'],
data['total_input'],
data['total_output'],
f"{data['total_cost']:.6f}",
', '.join(data['models'])
])
print(f"✅ FinOps report exported to:")
print(f" Model stats: {model_csv}")
print(f" Date stats: {date_csv}")
def main():
parser = argparse.ArgumentParser(
description="Agent Session Monitor CLI - 查询和分析agent对话数据",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Commands:
show <session-id> 显示session的详细信息
list 列出所有session
stats-model 按模型统计token开销
stats-date 按日期统计token开销默认30天
export 导出FinOps报表
Examples:
# 查看特定session的详细对话
%(prog)s show agent:main:discord:channel:1465367993012981988
# 列出最近20个session按更新时间
%(prog)s list
# 列出token开销最高的10个session
%(prog)s list --sort-by cost --limit 10
# 按模型统计token开销
%(prog)s stats-model
# 按日期统计token开销最近7天
%(prog)s stats-date --days 7
# 导出FinOps报表JSON格式
%(prog)s export finops-report.json
# 导出FinOps报表CSV格式
%(prog)s export finops-report --format csv
"""
)
parser.add_argument(
'command',
choices=['show', 'list', 'stats-model', 'stats-date', 'export'],
help='命令'
)
parser.add_argument(
'args',
nargs='*',
help='命令参数例如session-id或输出文件名'
)
parser.add_argument(
'--data-dir',
default='./sessions',
help='Session数据目录默认: ./sessions'
)
parser.add_argument(
'--limit',
type=int,
default=20,
help='list命令的结果限制默认: 20'
)
parser.add_argument(
'--sort-by',
choices=['updated', 'cost', 'tokens'],
default='updated',
help='list命令的排序方式默认: updated'
)
parser.add_argument(
'--days',
type=int,
default=30,
help='stats-date命令的天数默认: 30'
)
parser.add_argument(
'--format',
choices=['json', 'csv'],
default='json',
help='export命令的输出格式默认: json'
)
parser.add_argument(
'--no-messages',
action='store_true',
help='show命令不显示对话内容'
)
args = parser.parse_args()
try:
analyzer = SessionAnalyzer(args.data_dir)
if args.command == 'show':
if not args.args:
parser.error("show命令需要session-id参数")
session_id = args.args[0]
analyzer.display_session_detail(session_id, show_messages=not args.no_messages)
elif args.command == 'list':
analyzer.list_sessions(limit=args.limit, sort_by=args.sort_by)
elif args.command == 'stats-model':
analyzer.display_model_stats()
elif args.command == 'stats-date':
analyzer.display_date_stats(days=args.days)
elif args.command == 'export':
if not args.args:
parser.error("export命令需要输出文件名参数")
output_file = args.args[0]
analyzer.export_finops_report(output_file, format=args.format)
except FileNotFoundError as e:
print(f"❌ Error: {e}", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"❌ Unexpected error: {e}", file=sys.stderr)
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,755 @@
#!/usr/bin/env python3
"""
Agent Session Monitor - Web Server
提供浏览器访问的观测界面
"""
import argparse
import json
import sys
from pathlib import Path
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
from collections import defaultdict
from datetime import datetime, timedelta
import re
# 添加父目录到path以导入cli模块
sys.path.insert(0, str(Path(__file__).parent.parent))
try:
from scripts.cli import SessionAnalyzer, TOKEN_PRICING
except ImportError:
# 如果导入失败,定义简单版本
TOKEN_PRICING = {
"Qwen3-rerank": {"input": 0.0003, "output": 0.0012},
"DeepSeek-R1": {"input": 0.004, "output": 0.012, "reasoning": 0.002},
}
class SessionMonitorHandler(BaseHTTPRequestHandler):
"""HTTP请求处理器"""
def __init__(self, *args, data_dir=None, **kwargs):
self.data_dir = Path(data_dir) if data_dir else Path("./sessions")
super().__init__(*args, **kwargs)
def do_GET(self):
"""处理GET请求"""
parsed_path = urlparse(self.path)
path = parsed_path.path
query = parse_qs(parsed_path.query)
if path == '/' or path == '/index.html':
self.serve_index()
elif path == '/session':
session_id = query.get('id', [None])[0]
if session_id:
self.serve_session_detail(session_id)
else:
self.send_error(400, "Missing session id")
elif path == '/api/sessions':
self.serve_api_sessions()
elif path == '/api/session':
session_id = query.get('id', [None])[0]
if session_id:
self.serve_api_session(session_id)
else:
self.send_error(400, "Missing session id")
elif path == '/api/stats':
self.serve_api_stats()
else:
self.send_error(404, "Not Found")
def serve_index(self):
"""首页 - 总览"""
html = self.generate_index_html()
self.send_html(html)
def serve_session_detail(self, session_id: str):
"""Session详情页"""
html = self.generate_session_html(session_id)
self.send_html(html)
def serve_api_sessions(self):
"""API: 获取所有session列表"""
sessions = self.load_all_sessions()
# 简化数据
data = []
for session in sessions:
data.append({
'session_id': session['session_id'],
'model': session.get('model', 'unknown'),
'messages_count': session.get('messages_count', 0),
'total_tokens': session['total_input_tokens'] + session['total_output_tokens'],
'updated_at': session.get('updated_at', ''),
'cost': self.calculate_cost(session)
})
# 按更新时间降序排序
data.sort(key=lambda x: x['updated_at'], reverse=True)
self.send_json(data)
def serve_api_session(self, session_id: str):
"""API: 获取指定session的详细数据"""
session = self.load_session(session_id)
if session:
session['cost'] = self.calculate_cost(session)
self.send_json(session)
else:
self.send_error(404, "Session not found")
def serve_api_stats(self):
"""API: 获取统计数据"""
sessions = self.load_all_sessions()
# 按模型统计
by_model = defaultdict(lambda: {
'count': 0,
'input_tokens': 0,
'output_tokens': 0,
'cost': 0.0
})
# 按日期统计
by_date = defaultdict(lambda: {
'count': 0,
'input_tokens': 0,
'output_tokens': 0,
'cost': 0.0,
'models': set()
})
total_cost = 0.0
for session in sessions:
model = session.get('model', 'unknown')
cost = self.calculate_cost(session)
total_cost += cost
# 按模型
by_model[model]['count'] += 1
by_model[model]['input_tokens'] += session['total_input_tokens']
by_model[model]['output_tokens'] += session['total_output_tokens']
by_model[model]['cost'] += cost
# 按日期
created_at = session.get('created_at', '')
date_key = created_at[:10] if len(created_at) >= 10 else 'unknown'
by_date[date_key]['count'] += 1
by_date[date_key]['input_tokens'] += session['total_input_tokens']
by_date[date_key]['output_tokens'] += session['total_output_tokens']
by_date[date_key]['cost'] += cost
by_date[date_key]['models'].add(model)
# 转换sets为lists
for date in by_date:
by_date[date]['models'] = list(by_date[date]['models'])
stats = {
'total_sessions': len(sessions),
'total_cost': total_cost,
'by_model': dict(by_model),
'by_date': dict(sorted(by_date.items(), reverse=True))
}
self.send_json(stats)
def load_session(self, session_id: str):
"""加载指定session"""
session_file = self.data_dir / f"{session_id}.json"
if session_file.exists():
with open(session_file, 'r', encoding='utf-8') as f:
return json.load(f)
return None
def load_all_sessions(self):
"""加载所有session"""
sessions = []
for session_file in self.data_dir.glob("*.json"):
try:
with open(session_file, 'r', encoding='utf-8') as f:
sessions.append(json.load(f))
except Exception as e:
print(f"Warning: Failed to load {session_file}: {e}", file=sys.stderr)
return sessions
def calculate_cost(self, session: dict) -> float:
"""计算session成本"""
model = session.get('model', 'unknown')
pricing = TOKEN_PRICING.get(model, TOKEN_PRICING.get("GPT-4", {"input": 0.003, "output": 0.006}))
input_tokens = session['total_input_tokens']
output_tokens = session['total_output_tokens']
reasoning_tokens = session.get('total_reasoning_tokens', 0)
cached_tokens = session.get('total_cached_tokens', 0)
# 区分regular input和cached input
regular_input_tokens = input_tokens - cached_tokens
input_cost = regular_input_tokens * pricing.get('input', 0) / 1000000
output_cost = output_tokens * pricing.get('output', 0) / 1000000
reasoning_cost = 0
if 'reasoning' in pricing and reasoning_tokens > 0:
reasoning_cost = reasoning_tokens * pricing['reasoning'] / 1000000
cached_cost = 0
if 'cached' in pricing and cached_tokens > 0:
cached_cost = cached_tokens * pricing['cached'] / 1000000
return input_cost + output_cost + reasoning_cost + cached_cost
def send_html(self, html: str):
"""发送HTML响应"""
self.send_response(200)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.end_headers()
self.wfile.write(html.encode('utf-8'))
def send_json(self, data):
"""发送JSON响应"""
self.send_response(200)
self.send_header('Content-type', 'application/json; charset=utf-8')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(json.dumps(data, ensure_ascii=False, indent=2).encode('utf-8'))
def generate_index_html(self) -> str:
"""生成首页HTML"""
return '''<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Agent Session Monitor</title>
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body {
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif;
background: #f5f5f5;
padding: 20px;
}
.container { max-width: 1400px; margin: 0 auto; }
header {
background: white;
padding: 30px;
border-radius: 8px;
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
margin-bottom: 20px;
}
h1 { color: #333; margin-bottom: 10px; }
.subtitle { color: #666; font-size: 14px; }
.stats-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(250px, 1fr));
gap: 20px;
margin-bottom: 20px;
}
.stat-card {
background: white;
padding: 20px;
border-radius: 8px;
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
}
.stat-label { color: #666; font-size: 14px; margin-bottom: 8px; }
.stat-value { color: #333; font-size: 32px; font-weight: bold; }
.stat-unit { color: #999; font-size: 16px; margin-left: 4px; }
.section {
background: white;
padding: 30px;
border-radius: 8px;
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
margin-bottom: 20px;
}
h2 { color: #333; margin-bottom: 20px; font-size: 20px; }
table { width: 100%; border-collapse: collapse; }
thead { background: #f8f9fa; }
th, td { padding: 12px; text-align: left; border-bottom: 1px solid #e9ecef; }
th { font-weight: 600; color: #666; font-size: 14px; }
td { color: #333; }
tbody tr:hover { background: #f8f9fa; }
.session-link {
color: #007bff;
text-decoration: none;
font-family: monospace;
font-size: 13px;
}
.session-link:hover { text-decoration: underline; }
.badge {
display: inline-block;
padding: 4px 8px;
border-radius: 4px;
font-size: 12px;
font-weight: 500;
}
.badge-qwen { background: #e3f2fd; color: #1976d2; }
.badge-deepseek { background: #f3e5f5; color: #7b1fa2; }
.badge-gpt { background: #e8f5e9; color: #388e3c; }
.badge-claude { background: #fff3e0; color: #f57c00; }
.loading { text-align: center; padding: 40px; color: #666; }
.error { color: #d32f2f; padding: 20px; }
.refresh-btn {
background: #007bff;
color: white;
border: none;
padding: 10px 20px;
border-radius: 4px;
cursor: pointer;
font-size: 14px;
}
.refresh-btn:hover { background: #0056b3; }
</style>
</head>
<body>
<div class="container">
<header>
<h1>🔍 Agent Session Monitor</h1>
<p class="subtitle">实时观测Clawdbot对话过程和Token开销</p>
</header>
<div class="stats-grid" id="stats-grid">
<div class="stat-card">
<div class="stat-label">总会话数</div>
<div class="stat-value">-</div>
</div>
<div class="stat-card">
<div class="stat-label">总Token消耗</div>
<div class="stat-value">-</div>
</div>
<div class="stat-card">
<div class="stat-label">总成本</div>
<div class="stat-value">-</div>
</div>
</div>
<div class="section">
<h2>📊 最近会话</h2>
<button class="refresh-btn" onclick="loadSessions()">🔄 刷新</button>
<div id="sessions-table">
<div class="loading">加载中...</div>
</div>
</div>
<div class="section">
<h2>📈 按模型统计</h2>
<div id="model-stats">
<div class="loading">加载中...</div>
</div>
</div>
</div>
<script>
function loadSessions() {
fetch('/api/sessions')
.then(r => r.json())
.then(sessions => {
const html = `
<table>
<thead>
<tr>
<th>Session ID</th>
<th>模型</th>
<th>消息数</th>
<th>总Token</th>
<th>成本</th>
<th>更新时间</th>
</tr>
</thead>
<tbody>
${sessions.slice(0, 50).map(s => `
<tr>
<td><a href="/session?id=${encodeURIComponent(s.session_id)}" class="session-link">${s.session_id}</a></td>
<td>${getModelBadge(s.model)}</td>
<td>${s.messages_count}</td>
<td>${s.total_tokens.toLocaleString()}</td>
<td>$${s.cost.toFixed(6)}</td>
<td>${new Date(s.updated_at).toLocaleString()}</td>
</tr>
`).join('')}
</tbody>
</table>
`;
document.getElementById('sessions-table').innerHTML = html;
})
.catch(err => {
document.getElementById('sessions-table').innerHTML = `<div class="error">加载失败: ${err.message}</div>`;
});
}
function loadStats() {
fetch('/api/stats')
.then(r => r.json())
.then(stats => {
// 更新顶部统计卡片
const cards = document.querySelectorAll('.stat-card');
cards[0].querySelector('.stat-value').textContent = stats.total_sessions;
const totalTokens = Object.values(stats.by_model).reduce((sum, m) => sum + m.input_tokens + m.output_tokens, 0);
cards[1].querySelector('.stat-value').innerHTML = totalTokens.toLocaleString() + '<span class="stat-unit">tokens</span>';
cards[2].querySelector('.stat-value').innerHTML = '$' + stats.total_cost.toFixed(4);
// 模型统计表格
const modelHtml = `
<table>
<thead>
<tr>
<th>模型</th>
<th>会话数</th>
<th>输入Token</th>
<th>输出Token</th>
<th>成本</th>
</tr>
</thead>
<tbody>
${Object.entries(stats.by_model).map(([model, data]) => `
<tr>
<td>${getModelBadge(model)}</td>
<td>${data.count}</td>
<td>${data.input_tokens.toLocaleString()}</td>
<td>${data.output_tokens.toLocaleString()}</td>
<td>$${data.cost.toFixed(6)}</td>
</tr>
`).join('')}
</tbody>
</table>
`;
document.getElementById('model-stats').innerHTML = modelHtml;
})
.catch(err => {
console.error('Failed to load stats:', err);
});
}
function getModelBadge(model) {
let cls = 'badge';
if (model.includes('Qwen')) cls += ' badge-qwen';
else if (model.includes('DeepSeek')) cls += ' badge-deepseek';
else if (model.includes('GPT')) cls += ' badge-gpt';
else if (model.includes('Claude')) cls += ' badge-claude';
return `<span class="${cls}">${model}</span>`;
}
// 初始加载
loadSessions();
loadStats();
// 每30秒自动刷新
setInterval(() => {
loadSessions();
loadStats();
}, 30000);
</script>
</body>
</html>'''
def generate_session_html(self, session_id: str) -> str:
"""生成Session详情页HTML"""
session = self.load_session(session_id)
if not session:
return f'<html><body><h1>Session not found: {session_id}</h1></body></html>'
cost = self.calculate_cost(session)
# 生成对话轮次HTML
rounds_html = []
for r in session.get('rounds', []):
messages_html = ''
if r.get('messages'):
messages_html = '<div class="messages">'
for msg in r['messages'][-5:]: # 最多显示5条
role = msg.get('role', 'unknown')
content = msg.get('content', '')
messages_html += f'<div class="message message-{role}"><strong>[{role}]</strong> {self.escape_html(content)}</div>'
messages_html += '</div>'
tool_calls_html = ''
if r.get('tool_calls'):
tool_calls_html = '<div class="tool-calls"><strong>🛠️ Tool Calls:</strong><ul>'
for tc in r['tool_calls']:
func_name = tc.get('function', {}).get('name', 'unknown')
tool_calls_html += f'<li>{func_name}()</li>'
tool_calls_html += '</ul></div>'
# Token详情显示
token_details_html = ''
if r.get('input_token_details') or r.get('output_token_details'):
token_details_html = '<div class="token-details"><strong>📊 Token Details:</strong><ul>'
if r.get('input_token_details'):
token_details_html += f'<li>Input: {r["input_token_details"]}</li>'
if r.get('output_token_details'):
token_details_html += f'<li>Output: {r["output_token_details"]}</li>'
token_details_html += '</ul></div>'
# Token类型标签
token_badges = ''
if r.get('cached_tokens', 0) > 0:
token_badges += f' <span class="token-badge token-badge-cached">📦 {r["cached_tokens"]:,} cached</span>'
if r.get('reasoning_tokens', 0) > 0:
token_badges += f' <span class="token-badge token-badge-reasoning">🧠 {r["reasoning_tokens"]:,} reasoning</span>'
rounds_html.append(f'''
<div class="round">
<div class="round-header">
<span class="round-number">Round {r['round']}</span>
<span class="round-time">{r['timestamp']}</span>
<span class="round-tokens">{r['input_tokens']:,} in → {r['output_tokens']:,} out{token_badges}</span>
</div>
{messages_html}
{f'<div class="question"><strong>❓ Question:</strong> {self.escape_html(r.get("question", ""))}</div>' if r.get('question') else ''}
{f'<div class="answer"><strong>✅ Answer:</strong> {self.escape_html(r.get("answer", ""))}</div>' if r.get('answer') else ''}
{f'<div class="reasoning"><strong>🧠 Reasoning:</strong> {self.escape_html(r.get("reasoning", ""))}</div>' if r.get('reasoning') else ''}
{tool_calls_html}
{token_details_html}
</div>
''')
return f'''<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{session_id} - Session Monitor</title>
<style>
* {{ margin: 0; padding: 0; box-sizing: border-box; }}
body {{
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif;
background: #f5f5f5;
padding: 20px;
}}
.container {{ max-width: 1200px; margin: 0 auto; }}
header {{
background: white;
padding: 30px;
border-radius: 8px;
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
margin-bottom: 20px;
}}
h1 {{ color: #333; margin-bottom: 10px; font-size: 24px; }}
.back-link {{ color: #007bff; text-decoration: none; margin-bottom: 10px; display: inline-block; }}
.back-link:hover {{ text-decoration: underline; }}
.info-grid {{
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 15px;
margin-top: 20px;
}}
.info-item {{ padding: 10px 0; }}
.info-label {{ color: #666; font-size: 14px; }}
.info-value {{ color: #333; font-size: 18px; font-weight: 600; margin-top: 4px; }}
.section {{
background: white;
padding: 30px;
border-radius: 8px;
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
margin-bottom: 20px;
}}
h2 {{ color: #333; margin-bottom: 20px; font-size: 20px; }}
.round {{
border-left: 3px solid #007bff;
padding: 20px;
margin-bottom: 20px;
background: #f8f9fa;
border-radius: 4px;
}}
.round-header {{
display: flex;
justify-content: space-between;
margin-bottom: 15px;
font-size: 14px;
}}
.round-number {{ font-weight: 600; color: #007bff; }}
.round-time {{ color: #666; }}
.round-tokens {{ color: #333; }}
.messages {{ margin: 15px 0; }}
.message {{
padding: 10px;
margin: 5px 0;
border-radius: 4px;
font-size: 14px;
line-height: 1.6;
}}
.message-system {{ background: #fff3cd; }}
.message-user {{ background: #d1ecf1; }}
.message-assistant {{ background: #d4edda; }}
.message-tool {{ background: #e2e3e5; }}
.question, .answer, .reasoning, .tool-calls {{
margin: 10px 0;
padding: 10px;
background: white;
border-radius: 4px;
font-size: 14px;
line-height: 1.6;
}}
.question {{ border-left: 3px solid #ffc107; }}
.answer {{ border-left: 3px solid #28a745; }}
.reasoning {{ border-left: 3px solid #17a2b8; }}
.tool-calls {{ border-left: 3px solid #6c757d; }}
.tool-calls ul {{ margin-left: 20px; margin-top: 5px; }}
.token-details {{
margin: 10px 0;
padding: 10px;
background: white;
border-radius: 4px;
font-size: 13px;
border-left: 3px solid #17a2b8;
}}
.token-details ul {{ margin-left: 20px; margin-top: 5px; color: #666; }}
.token-badge {{
display: inline-block;
padding: 2px 6px;
border-radius: 3px;
font-size: 11px;
margin-left: 5px;
}}
.token-badge-cached {{
background: #d4edda;
color: #155724;
}}
.token-badge-reasoning {{
background: #cce5ff;
color: #004085;
}}
.badge {{
display: inline-block;
padding: 4px 8px;
border-radius: 4px;
font-size: 12px;
font-weight: 500;
background: #e3f2fd;
color: #1976d2;
}}
</style>
</head>
<body>
<div class="container">
<header>
<a href="/" class="back-link">← 返回列表</a>
<h1>📊 Session Detail</h1>
<p style="color: #666; font-family: monospace; font-size: 14px; margin-top: 10px;">{session_id}</p>
<div class="info-grid">
<div class="info-item">
<div class="info-label">模型</div>
<div class="info-value"><span class="badge">{session.get('model', 'unknown')}</span></div>
</div>
<div class="info-item">
<div class="info-label">消息数</div>
<div class="info-value">{session.get('messages_count', 0)}</div>
</div>
<div class="info-item">
<div class="info-label">总Token</div>
<div class="info-value">{session['total_input_tokens'] + session['total_output_tokens']:,}</div>
</div>
<div class="info-item">
<div class="info-label">成本</div>
<div class="info-value">${cost:.6f}</div>
</div>
</div>
</header>
<div class="section">
<h2>💬 对话记录 ({len(session.get('rounds', []))} 轮)</h2>
{"".join(rounds_html) if rounds_html else '<p style="color: #666;">暂无对话记录</p>'}
</div>
</div>
</body>
</html>'''
def escape_html(self, text: str) -> str:
"""转义HTML特殊字符"""
return (text.replace('&', '&amp;')
.replace('<', '&lt;')
.replace('>', '&gt;')
.replace('"', '&quot;')
.replace("'", '&#39;'))
def log_message(self, format, *args):
"""重写日志方法,简化输出"""
pass # 不打印每个请求
def create_handler(data_dir):
"""创建带数据目录的处理器"""
def handler(*args, **kwargs):
return SessionMonitorHandler(*args, data_dir=data_dir, **kwargs)
return handler
def main():
parser = argparse.ArgumentParser(
description="Agent Session Monitor - Web Server",
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument(
'--data-dir',
default='./sessions',
help='Session数据目录默认: ./sessions'
)
parser.add_argument(
'--port',
type=int,
default=8888,
help='HTTP服务器端口默认: 8888'
)
parser.add_argument(
'--host',
default='0.0.0.0',
help='HTTP服务器地址默认: 0.0.0.0'
)
args = parser.parse_args()
# 检查数据目录是否存在
data_dir = Path(args.data_dir)
if not data_dir.exists():
print(f"❌ Error: Data directory not found: {data_dir}")
print(f" Please run main.py first to generate session data.")
sys.exit(1)
# 创建HTTP服务器
handler_class = create_handler(args.data_dir)
server = HTTPServer((args.host, args.port), handler_class)
print(f"{'=' * 60}")
print(f"🌐 Agent Session Monitor - Web Server")
print(f"{'=' * 60}")
print()
print(f"📂 Data directory: {args.data_dir}")
print(f"🌍 Server address: http://{args.host}:{args.port}")
print()
print(f"✅ Server started. Press Ctrl+C to stop.")
print(f"{'=' * 60}")
print()
try:
server.serve_forever()
except KeyboardInterrupt:
print("\n\n👋 Shutting down server...")
server.shutdown()
if __name__ == '__main__':
main()