Compare commits

...

23 Commits

Author SHA1 Message Date
Tim
87677f5968 Merge pull request #1103 from nagisa77/codex/add-git-action-to-run-reply_bots.ts
Add scheduled workflow to run reply bots
2025-10-28 13:56:31 +08:00
Tim
fd93a2dc61 Add scheduled reply bot workflow 2025-10-28 13:56:18 +08:00
Tim
80f862a226 Merge pull request #1102 from nagisa77/codex/add-read-cleanup-interface-for-mcp
Add MCP support for clearing read notifications
2025-10-28 13:50:33 +08:00
Tim
26bb85f4d4 Add MCP support for clearing read notifications 2025-10-28 13:50:16 +08:00
tim
398b4b482f fix: refine prompt 2025-10-28 13:00:42 +08:00
tim
2cfb302981 fix: add bot 2025-10-28 12:37:17 +08:00
Tim
e75bd76b71 Merge pull request #1101 from nagisa77/codex/fix-unread-notifications-data-format
Normalize null list payloads in notification schemas
2025-10-28 10:27:54 +08:00
Tim
99c3ac1837 Handle null list fields in notification schemas 2025-10-28 10:27:40 +08:00
tim
749ab560ff Revert "Cache MCP session JWT tokens"
This reverts commit 997dacdbe6.
2025-10-28 01:55:46 +08:00
tim
541ad4d149 Revert "Remove token parameters from MCP tools"
This reverts commit e585100625.
2025-10-28 01:55:41 +08:00
tim
03eb027ea4 Revert "Add MCP tool for setting session token"
This reverts commit 9dadaad5ba.
2025-10-28 01:55:36 +08:00
Tim
4194b2be91 Merge pull request #1100 from nagisa77/codex/add-initialization-tool-for-jwt-token
Add MCP tool for initializing session JWT tokens
2025-10-28 01:47:28 +08:00
Tim
9dadaad5ba Add MCP tool for setting session token 2025-10-28 01:47:16 +08:00
Tim
d4b3400c5f Merge pull request #1099 from nagisa77/codex/remove-token-parameters-from-mcp-api
Remove explicit token parameters from MCP tools
2025-10-28 01:32:18 +08:00
Tim
e585100625 Remove token parameters from MCP tools 2025-10-28 01:32:02 +08:00
Tim
e94471b53e Merge pull request #1098 from nagisa77/codex/store-accesstoken-as-jwt-token
Cache MCP session JWT tokens
2025-10-28 01:20:52 +08:00
Tim
997dacdbe6 Cache MCP session JWT tokens 2025-10-28 01:20:32 +08:00
Tim
c01349a436 Merge pull request #1097 from nagisa77/codex/improve-python-mcp-logs
Improve MCP logging and add unread notification tool
2025-10-28 01:01:47 +08:00
Tim
4cf48f9157 Enhance MCP logging and add unread message tool 2025-10-28 01:01:25 +08:00
Tim
796afbe612 Merge pull request #1096 from nagisa77/codex/add-reply-support-for-posts
Add MCP support for replying to posts
2025-10-27 20:24:59 +08:00
Tim
dca14390ca Add MCP tool for replying to posts 2025-10-27 20:24:47 +08:00
Tim
39875acd35 Merge pull request #1095 from nagisa77/codex/add-post-query-interface-in-mcp-iq4fxi
Add MCP tool for retrieving post details
2025-10-27 20:19:29 +08:00
Tim
62edc75735 feat(mcp): add post detail retrieval tool 2025-10-27 20:19:17 +08:00
7 changed files with 921 additions and 20 deletions

.github/workflows/reply-bots.yml (new file, 28 lines)

@@ -0,0 +1,28 @@
name: Reply Bots
on:
schedule:
- cron: "*/30 * * * *"
workflow_dispatch:
jobs:
run-reply-bot:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Setup Node.js
uses: actions/setup-node@v4
with:
node-version: "20"
cache: "npm"
- name: Install dependencies
run: npm install --no-save @openai/agents ts-node typescript
- name: Run reply bot
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
OPENISLE_TOKEN: ${{ secrets.OPENISLE_TOKEN }}
run: npx ts-node --esm bots/reply_bots.ts
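The `*/30 * * * *` schedule fires on the hour and half hour; note that GitHub Actions evaluates schedules in UTC. A quick way to sanity-check a cron expression, assuming the third-party croniter package:

```python
# Sanity check of the workflow's "*/30 * * * *" schedule; assumes the
# third-party croniter package is available (pip install croniter).
from datetime import datetime
from croniter import croniter

schedule = croniter("*/30 * * * *", datetime(2025, 10, 28, 13, 56))
for _ in range(3):
    print(schedule.get_next(datetime))  # 14:00, 14:30, 15:00 on 2025-10-28
```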

bots/reply_bots.ts (new file, 131 lines)

@@ -0,0 +1,131 @@
// reply_bots.ts
import { Agent, Runner, hostedMcpTool, withTrace } from "@openai/agents";
import { pathToFileURL } from "node:url";
console.log("✅ Reply bot starting...");
const allowedMcpTools = [
"search",
"reply_to_post",
"reply_to_comment",
"recent_posts",
"get_post",
"list_unread_messages",
"mark_notifications_read",
];
console.log("🛠️ Configured Hosted MCP tools:", allowedMcpTools.join(", "));
// ---- MCP tools (Hosted MCP) ----
// Key point: requireApproval is set to "never" so runs don't stall waiting for manual approval.
const mcp = hostedMcpTool({
serverLabel: "openisle_mcp",
serverUrl: "https://www.open-isle.com/mcp",
allowedTools: allowedMcpTools,
requireApproval: "never",
});
type WorkflowInput = { input_as_text: string };
// Read your site auth token from the environment (optional)
const OPENISLE_TOKEN = process.env.OPENISLE_TOKEN ?? "";
console.log(
OPENISLE_TOKEN
? "🔑 OPENISLE_TOKEN detected in environment."
: "🔓 OPENISLE_TOKEN not set; agent will request it if required."
);
// ---- Define the Agent ----
const openisleBot = new Agent({
name: "OpenIsle Bot",
instructions: [
"You are a helpful and cute assistant for https://www.open-isle.com. Please use plenty of kawaii kaomoji (颜表情), such as (๑˃ᴗ˂)ﻭ, (•̀ω•́)✧, (。•ᴗ-)_♡, (⁎⁍̴̛ᴗ⁍̴̛⁎), etc., in your replies to create a friendly, adorable vibe.",
"Finish tasks end-to-end before replying. If multiple MCP tools are needed, call them sequentially until the task is truly done.",
"When presenting the result, reply in Chinese with a concise, cute summary filled with kaomoji and include any important URLs or IDs.",
OPENISLE_TOKEN
? `If tools require auth, use this token exactly where the tool schema expects it: ${OPENISLE_TOKEN}`
: "If a tool requires auth, ask me to provide OPENISLE_TOKEN via env.",
"After finishing replies, call mark_notifications_read with all processed notification IDs to keep the inbox clean.",
].join("\n"),
tools: [mcp],
model: "gpt-4o",
modelSettings: {
temperature: 0.7,
topP: 1,
maxTokens: 2048,
toolChoice: "auto",
store: true,
},
});
// ---- Entry point: run until finalOutput is produced, then print it and exit ----
export const runWorkflow = async (workflow: WorkflowInput) => {
// Strongly recommended: set OPENAI_API_KEY in the surrounding shell
if (!process.env.OPENAI_API_KEY) {
throw new Error("Missing OPENAI_API_KEY");
}
const runner = new Runner({
workflowName: "OpenIsle Bot",
traceMetadata: {
__trace_source__: "agent-builder",
workflow_id: "wf_69003cbd47e08190928745d3c806c0b50d1a01cfae052be8",
},
// To disable trace reporting entirely, add tracingDisabled: true
});
return await withTrace("OpenIsle Bot run", async () => {
const preview = workflow.input_as_text.trim();
console.log(
"📝 Received workflow input (preview):",
preview.length > 200 ? `${preview.slice(0, 200)}…` : preview
);
// Runner.run loops automatically (LLM → tools → …) until finalOutput is produced
console.log("🚦 Starting agent run with maxTurns=16...");
const result = await runner.run(openisleBot, workflow.input_as_text, {
maxTurns: 16, // allow complex tasks to call MCP tools over multiple turns
// stream: true // enable to watch events as they happen, then consume the stream
});
console.log("📬 Agent run completed. Result keys:", Object.keys(result));
if (!result.finalOutput) {
// No final output usually means manual approval was enabled, a tool failed, or maxTurns was hit
throw new Error("Agent result is undefined (no final output).");
}
const openisleBotResult = { output_text: String(result.finalOutput) };
console.log(
"🤖 Agent result (length=%d):\n%s",
openisleBotResult.output_text.length,
openisleBotResult.output_text
);
return openisleBotResult;
});
};
// ---- CLI run (example) ----
// ESM-safe entry check: `require` is undefined when run via `ts-node --esm`
if (import.meta.url === pathToFileURL(process.argv[1]).href) {
(async () => {
try {
const query = `
[AUTO] Process all unread mentions and comments without asking for confirmation:
1) Call list_unread_messages;
2) Handle each mention/comment in turn: fetch context with get_post when needed and draft a concise reply in Chinese; use reply_to_comment when a commentId exists, otherwise reply_to_post;
3) Skip follow and system events;
4) Stay idempotent: if the latest reply in a thread is already your own, skip it;
5) Call mark_notifications_read with the notification IDs handled in this run;
6) Process at most the 10 newest items, and finish with only a brief summary including URLs or IDs.
`;
console.log("🔍 Running workflow...");
await runWorkflow({ input_as_text: query });
process.exit(0);
} catch (err: any) {
console.error("❌ Agent failed:", err?.stack || err);
process.exit(1);
}
})();
}

README.md

@@ -31,9 +31,11 @@ By default the server listens on port `8085` and serves MCP over Streamable HTTP
| Tool | Description |
| --- | --- |
| `search` | Perform a global search against the OpenIsle backend. |
| `reply_to_post` | Create a new comment on a post using a JWT token. |
| `reply_to_comment` | Reply to an existing comment using a JWT token. |
| `recent_posts` | Retrieve posts created within the last *N* minutes. |
The tools return structured data mirroring the backend DTOs, including highlighted snippets for
search results, the full comment payload for replies, and detailed metadata for recent posts.
search results, the full comment payload for post replies and comment replies, and detailed
metadata for recent posts.
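These tools wrap plain REST calls to the OpenIsle backend. As a rough sketch of what `recent_posts` does under the hood, based on the search-client diff below (the base URL here is a placeholder, not the real backend address):

```python
# Rough sketch of the backend call behind `recent_posts`; the endpoint and
# params mirror the search-client diff, the base URL is a placeholder.
import asyncio
import httpx

async def recent_posts(minutes: int) -> list[dict]:
    async with httpx.AsyncClient(base_url="https://backend.example.com") as client:
        response = await client.get(
            "/api/posts/recent",
            params={"minutes": minutes},
            headers={"Accept": "application/json"},
        )
        response.raise_for_status()
        return response.json()

posts = asyncio.run(recent_posts(30))  # posts created in the last 30 minutes
```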

config.py

@@ -5,7 +5,7 @@ from __future__ import annotations
from functools import lru_cache
from typing import Literal
from pydantic import Field
from pydantic import Field, SecretStr
from pydantic.networks import AnyHttpUrl
from pydantic_settings import BaseSettings, SettingsConfigDict
@@ -36,6 +36,20 @@ class Settings(BaseSettings):
gt=0,
description="Timeout (seconds) for backend search requests.",
)
access_token: SecretStr | None = Field(
default=None,
description=(
"Optional JWT bearer token used for authenticated backend calls. "
"When set, tools that support authentication will use this token "
"automatically unless an explicit token override is provided."
),
)
log_level: str = Field(
"INFO",
description=(
"Logging level for the MCP server (e.g. DEBUG, INFO, WARNING)."
),
)
model_config = SettingsConfigDict(
env_prefix="OPENISLE_MCP_",
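Because the new `access_token` field is a `SecretStr` read under the `OPENISLE_MCP_` prefix, the token can come from the environment and stays masked when printed or logged. A self-contained sketch of just that behavior (field names mirror the diff; the rest is illustrative):

```python
# Self-contained sketch of the SecretStr-backed access_token field added
# above; env prefix and field names mirror the diff, the token value is fake.
import os
from pydantic import Field, SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    access_token: SecretStr | None = Field(default=None)
    log_level: str = Field("INFO")
    model_config = SettingsConfigDict(env_prefix="OPENISLE_MCP_")

os.environ["OPENISLE_MCP_ACCESS_TOKEN"] = "fake-jwt-for-demo"
settings = Settings()
print(settings.access_token)                     # ********** (masked)
print(settings.access_token.get_secret_value())  # fake-jwt-for-demo
```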

schemas.py

@@ -5,7 +5,7 @@ from __future__ import annotations
from datetime import datetime
from typing import Any, Optional
from pydantic import BaseModel, Field, ConfigDict
from pydantic import BaseModel, Field, ConfigDict, field_validator
class SearchResultItem(BaseModel):
@@ -170,6 +170,15 @@ class CommentData(BaseModel):
model_config = ConfigDict(populate_by_name=True, extra="allow")
@field_validator("replies", "reactions", mode="before")
@classmethod
def _ensure_comment_lists(cls, value: Any) -> list[Any]:
"""Convert ``None`` payloads to empty lists for comment collections."""
if value is None:
return []
return value
class CommentReplyResult(BaseModel):
"""Structured response returned when replying to a comment."""
@@ -177,6 +186,12 @@ class CommentReplyResult(BaseModel):
comment: CommentData = Field(description="Reply comment returned by the backend.")
class CommentCreateResult(BaseModel):
"""Structured response returned when creating a comment on a post."""
comment: CommentData = Field(description="Comment returned by the backend.")
class PostSummary(BaseModel):
"""Summary information for a post."""
@@ -247,6 +262,15 @@ class PostSummary(BaseModel):
model_config = ConfigDict(populate_by_name=True, extra="allow")
@field_validator("tags", "reactions", "participants", mode="before")
@classmethod
def _ensure_post_lists(cls, value: Any) -> list[Any]:
"""Normalize ``None`` values returned by the backend to empty lists."""
if value is None:
return []
return value
class RecentPostsResponse(BaseModel):
"""Structured response for the recent posts tool."""
@@ -260,3 +284,89 @@ class RecentPostsResponse(BaseModel):
CommentData.model_rebuild()
class PostDetail(PostSummary):
"""Detailed information for a single post, including comments."""
comments: list[CommentData] = Field(
default_factory=list,
description="Comments that belong to the post.",
)
model_config = ConfigDict(populate_by_name=True, extra="allow")
@field_validator("comments", mode="before")
@classmethod
def _ensure_comments_list(cls, value: Any) -> list[Any]:
"""Treat ``None`` comments payloads as empty lists."""
if value is None:
return []
return value
class NotificationData(BaseModel):
"""Unread notification payload returned by the backend."""
id: Optional[int] = Field(default=None, description="Notification identifier.")
type: Optional[str] = Field(default=None, description="Type of the notification.")
post: Optional[PostSummary] = Field(
default=None, description="Post associated with the notification if applicable."
)
comment: Optional[CommentData] = Field(
default=None, description="Comment referenced by the notification when available."
)
parent_comment: Optional[CommentData] = Field(
default=None,
alias="parentComment",
description="Parent comment for nested replies, when present.",
)
from_user: Optional[AuthorInfo] = Field(
default=None,
alias="fromUser",
description="User who triggered the notification.",
)
reaction_type: Optional[str] = Field(
default=None,
alias="reactionType",
description="Reaction type for reaction-based notifications.",
)
content: Optional[str] = Field(
default=None, description="Additional content or message for the notification."
)
approved: Optional[bool] = Field(
default=None, description="Approval status for moderation notifications."
)
read: Optional[bool] = Field(default=None, description="Whether the notification is read.")
created_at: Optional[datetime] = Field(
default=None,
alias="createdAt",
description="Timestamp when the notification was created.",
)
model_config = ConfigDict(populate_by_name=True, extra="allow")
class UnreadNotificationsResponse(BaseModel):
"""Structured response for unread notification queries."""
page: int = Field(description="Requested page index for the unread notifications.")
size: int = Field(description="Requested page size for the unread notifications.")
total: int = Field(description="Number of unread notifications returned in this page.")
notifications: list[NotificationData] = Field(
default_factory=list,
description="Unread notifications returned by the backend.",
)
class NotificationCleanupResult(BaseModel):
"""Structured response returned after marking notifications as read."""
processed_ids: list[int] = Field(
default_factory=list,
description="Identifiers that were marked as read in the backend.",
)
total_marked: int = Field(
description="Total number of notifications successfully marked as read.",
)
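The repeated `mode="before"` validators exist because the backend serializes empty collections as `null`; normalizing to `[]` keeps the list-typed fields from failing validation. A reduced, self-contained sketch of the pattern (the model is illustrative, the validator body matches the diff):

```python
# Reduced sketch of the None-to-empty-list normalization used throughout
# these schemas; the model is illustrative, the validator mirrors the diff.
from typing import Any
from pydantic import BaseModel, Field, field_validator

class PostSummary(BaseModel):
    tags: list[str] = Field(default_factory=list)

    @field_validator("tags", mode="before")
    @classmethod
    def _ensure_lists(cls, value: Any) -> list[Any]:
        return [] if value is None else value

print(PostSummary.model_validate({"tags": None}).tags)  # [] instead of a ValidationError
```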

search_client.py

@@ -3,38 +3,108 @@
from __future__ import annotations
import json
import logging
from typing import Any
import httpx
logger = logging.getLogger(__name__)
class SearchClient:
"""Client for calling the OpenIsle HTTP APIs used by the MCP server."""
def __init__(self, base_url: str, *, timeout: float = 10.0) -> None:
def __init__(
self,
base_url: str,
*,
timeout: float = 10.0,
access_token: str | None = None,
) -> None:
self._base_url = base_url.rstrip("/")
self._timeout = timeout
self._client: httpx.AsyncClient | None = None
self._access_token = self._sanitize_token(access_token)
def _get_client(self) -> httpx.AsyncClient:
if self._client is None:
self._client = httpx.AsyncClient(base_url=self._base_url, timeout=self._timeout)
logger.debug(
"Creating httpx.AsyncClient for base URL %s with timeout %.2fs",
self._base_url,
self._timeout,
)
self._client = httpx.AsyncClient(
base_url=self._base_url,
timeout=self._timeout,
)
return self._client
@staticmethod
def _sanitize_token(token: str | None) -> str | None:
if token is None:
return None
stripped = token.strip()
return stripped or None
def update_access_token(self, token: str | None) -> None:
"""Update the default access token used for authenticated requests."""
self._access_token = self._sanitize_token(token)
if self._access_token:
logger.debug("Configured default access token for SearchClient requests.")
else:
logger.debug("Cleared default access token for SearchClient requests.")
def _resolve_token(self, token: str | None) -> str | None:
candidate = self._sanitize_token(token)
if candidate is not None:
return candidate
return self._access_token
def _require_token(self, token: str | None) -> str:
resolved = self._resolve_token(token)
if resolved is None:
raise ValueError(
"Authenticated request requires an access token but none was provided."
)
return resolved
def _build_headers(
self,
*,
token: str | None = None,
accept: str = "application/json",
include_json: bool = False,
) -> dict[str, str]:
headers: dict[str, str] = {"Accept": accept}
resolved = self._resolve_token(token)
if resolved:
headers["Authorization"] = f"Bearer {resolved}"
if include_json:
headers["Content-Type"] = "application/json"
return headers
async def global_search(self, keyword: str) -> list[dict[str, Any]]:
"""Call the global search endpoint and return the parsed JSON payload."""
client = self._get_client()
logger.debug("Calling global search with keyword=%s", keyword)
response = await client.get(
"/api/search/global",
params={"keyword": keyword},
headers={"Accept": "application/json"},
headers=self._build_headers(),
)
response.raise_for_status()
payload = response.json()
if not isinstance(payload, list):
formatted = json.dumps(payload, ensure_ascii=False)[:200]
raise ValueError(f"Unexpected response format from search endpoint: {formatted}")
logger.info(
"Global search returned %d results for keyword '%s'",
len(payload),
keyword,
)
return [self._ensure_dict(entry) for entry in payload]
async def reply_to_comment(
@@ -47,33 +117,74 @@ class SearchClient:
"""Reply to an existing comment and return the created reply."""
client = self._get_client()
headers = {
"Accept": "application/json",
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
}
resolved_token = self._require_token(token)
headers = self._build_headers(token=resolved_token, include_json=True)
payload: dict[str, Any] = {"content": content}
if captcha is not None:
stripped_captcha = captcha.strip()
if stripped_captcha:
payload["captcha"] = stripped_captcha
logger.debug(
"Posting reply to comment_id=%s (captcha=%s)",
comment_id,
bool(captcha),
)
response = await client.post(
f"/api/comments/{comment_id}/replies",
json=payload,
headers=headers,
)
response.raise_for_status()
return self._ensure_dict(response.json())
body = self._ensure_dict(response.json())
logger.info("Reply to comment_id=%s succeeded with id=%s", comment_id, body.get("id"))
return body
async def reply_to_post(
self,
post_id: int,
token: str | None,
content: str,
captcha: str | None = None,
) -> dict[str, Any]:
"""Create a comment on a post and return the backend payload."""
client = self._get_client()
resolved_token = self._require_token(token)
headers = self._build_headers(token=resolved_token, include_json=True)
payload: dict[str, Any] = {"content": content}
if captcha is not None:
stripped_captcha = captcha.strip()
if stripped_captcha:
payload["captcha"] = stripped_captcha
logger.debug(
"Posting comment to post_id=%s (captcha=%s)",
post_id,
bool(captcha),
)
response = await client.post(
f"/api/posts/{post_id}/comments",
json=payload,
headers=headers,
)
response.raise_for_status()
body = self._ensure_dict(response.json())
logger.info("Reply to post_id=%s succeeded with id=%s", post_id, body.get("id"))
return body
async def recent_posts(self, minutes: int) -> list[dict[str, Any]]:
"""Return posts created within the given timeframe."""
client = self._get_client()
logger.debug(
"Fetching recent posts within last %s minutes",
minutes,
)
response = await client.get(
"/api/posts/recent",
params={"minutes": minutes},
headers={"Accept": "application/json"},
headers=self._build_headers(),
)
response.raise_for_status()
payload = response.json()
@@ -82,14 +193,120 @@ class SearchClient:
raise ValueError(
f"Unexpected response format from recent posts endpoint: {formatted}"
)
logger.info(
"Fetched %d recent posts for window=%s minutes",
len(payload),
minutes,
)
return [self._ensure_dict(entry) for entry in payload]
async def get_post(self, post_id: int, token: str | None = None) -> dict[str, Any]:
"""Retrieve the detailed payload for a single post."""
client = self._get_client()
headers = self._build_headers(token=token)
logger.debug("Fetching post details for post_id=%s", post_id)
response = await client.get(f"/api/posts/{post_id}", headers=headers)
response.raise_for_status()
body = self._ensure_dict(response.json())
logger.info(
"Retrieved post_id=%s successfully with %d top-level comments",
post_id,
len(body.get("comments", []) if isinstance(body.get("comments"), list) else []),
)
return body
async def list_unread_notifications(
self,
*,
page: int = 0,
size: int = 30,
token: str | None = None,
) -> list[dict[str, Any]]:
"""Return unread notifications for the authenticated user."""
client = self._get_client()
resolved_token = self._require_token(token)
logger.debug(
"Fetching unread notifications with page=%s, size=%s",
page,
size,
)
response = await client.get(
"/api/notifications/unread",
params={"page": page, "size": size},
headers=self._build_headers(token=resolved_token),
)
response.raise_for_status()
payload = response.json()
if not isinstance(payload, list):
formatted = json.dumps(payload, ensure_ascii=False)[:200]
raise ValueError(
"Unexpected response format from unread notifications endpoint: "
f"{formatted}"
)
logger.info(
"Fetched %d unread notifications (page=%s, size=%s)",
len(payload),
page,
size,
)
return [self._ensure_dict(entry) for entry in payload]
async def mark_notifications_read(
self,
ids: list[int],
*,
token: str | None = None,
) -> None:
"""Mark the provided notifications as read for the authenticated user."""
if not ids:
raise ValueError(
"At least one notification identifier must be provided to mark as read."
)
sanitized_ids: list[int] = []
for value in ids:
if isinstance(value, bool):
raise ValueError("Notification identifiers must be integers, not booleans.")
try:
converted = int(value)
except (TypeError, ValueError) as exc: # pragma: no cover - defensive
raise ValueError(
"Notification identifiers must be integers."
) from exc
if converted <= 0:
raise ValueError(
"Notification identifiers must be positive integers."
)
sanitized_ids.append(converted)
client = self._get_client()
resolved_token = self._require_token(token)
logger.debug(
"Marking %d notifications as read: ids=%s",
len(sanitized_ids),
sanitized_ids,
)
response = await client.post(
"/api/notifications/read",
json={"ids": sanitized_ids},
headers=self._build_headers(token=resolved_token, include_json=True),
)
response.raise_for_status()
logger.info(
"Successfully marked %d notifications as read.",
len(sanitized_ids),
)
async def aclose(self) -> None:
"""Dispose of the underlying HTTP client."""
if self._client is not None:
await self._client.aclose()
self._client = None
logger.debug("Closed httpx.AsyncClient for SearchClient.")
@staticmethod
def _ensure_dict(entry: Any) -> dict[str, Any]:
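The token plumbing above reduces to one precedence rule: an explicit per-call token wins, otherwise the configured default applies, and authenticated endpoints fail fast when neither exists. A standalone sketch of that rule (the class is illustrative; the behavior mirrors `_resolve_token` and `_require_token`):

```python
# Standalone sketch of the token precedence implemented above; the class
# is illustrative, the resolve/require behavior mirrors the diff.
class TokenResolver:
    def __init__(self, default: str | None) -> None:
        self._default = (default or "").strip() or None

    def resolve(self, override: str | None) -> str | None:
        candidate = (override or "").strip() or None
        return candidate if candidate is not None else self._default

    def require(self, override: str | None) -> str:
        resolved = self.resolve(override)
        if resolved is None:
            raise ValueError("Authenticated request requires an access token.")
        return resolved

resolver = TokenResolver("  configured-jwt  ")
assert resolver.require(None) == "configured-jwt"    # falls back to the default
assert resolver.require(" per-call ") == "per-call"  # explicit override wins
```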

server.py

@@ -2,6 +2,7 @@
from __future__ import annotations
import logging
from contextlib import asynccontextmanager
from typing import Annotated
@@ -12,8 +13,13 @@ from pydantic import Field as PydanticField
from .config import get_settings
from .schemas import (
CommentCreateResult,
CommentData,
CommentReplyResult,
NotificationData,
NotificationCleanupResult,
UnreadNotificationsResponse,
PostDetail,
PostSummary,
RecentPostsResponse,
SearchResponse,
@@ -22,8 +28,26 @@ from .schemas import (
from .search_client import SearchClient
settings = get_settings()
if not logging.getLogger().handlers:
logging.basicConfig(
level=getattr(logging, settings.log_level.upper(), logging.INFO),
format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
)
else:
logging.getLogger().setLevel(
getattr(logging, settings.log_level.upper(), logging.INFO)
)
logger = logging.getLogger(__name__)
search_client = SearchClient(
str(settings.backend_base_url), timeout=settings.request_timeout
str(settings.backend_base_url),
timeout=settings.request_timeout,
access_token=(
settings.access_token.get_secret_value()
if settings.access_token is not None
else None
),
)
@@ -32,16 +56,19 @@ async def lifespan(_: FastMCP):
"""Lifecycle hook that disposes shared resources when the server stops."""
try:
logger.debug("OpenIsle MCP server lifespan started.")
yield
finally:
logger.debug("Disposing shared SearchClient instance.")
await search_client.aclose()
app = FastMCP(
name="openisle-mcp",
instructions=(
"Use this server to search OpenIsle content, reply to comments with an authentication "
"token, and list posts created within a recent time window."
"Use this server to search OpenIsle content, reply to posts and comments with an "
"authentication token, retrieve details for a specific post, list posts created "
"within a recent time window, and review unread notification messages."
),
host=settings.host,
port=settings.port,
@@ -65,6 +92,7 @@ async def search(
raise ValueError("Keyword must not be empty.")
try:
logger.info("Received search request for keyword='%s'", sanitized)
raw_results = await search_client.global_search(sanitized)
except httpx.HTTPStatusError as exc: # pragma: no cover - network errors
message = (
@@ -90,10 +118,124 @@ async def search(
if ctx is not None:
await ctx.info(f"Search keyword '{sanitized}' returned {len(results)} results.")
logger.debug(
"Validated %d search results for keyword='%s'",
len(results),
sanitized,
)
return SearchResponse(keyword=sanitized, total=len(results), results=results)
@app.tool(
name="reply_to_post",
description="Create a comment on a post using an authentication token.",
structured_output=True,
)
async def reply_to_post(
post_id: Annotated[
int,
PydanticField(ge=1, description="Identifier of the post being replied to."),
],
content: Annotated[
str,
PydanticField(description="Markdown content of the reply."),
],
captcha: Annotated[
str | None,
PydanticField(
default=None,
description="Optional captcha solution if the backend requires it.",
),
] = None,
token: Annotated[
str | None,
PydanticField(
default=None,
description=(
"Optional JWT bearer token. When omitted the configured access token is used."
),
),
] = None,
ctx: Context | None = None,
) -> CommentCreateResult:
"""Create a comment on a post and return the backend payload."""
sanitized_content = content.strip()
if not sanitized_content:
raise ValueError("Reply content must not be empty.")
sanitized_token = token.strip() if isinstance(token, str) else None
if sanitized_token == "":
sanitized_token = None
sanitized_captcha = captcha.strip() if isinstance(captcha, str) else None
try:
logger.info(
"Creating reply for post_id=%s (captcha=%s)",
post_id,
bool(sanitized_captcha),
)
raw_comment = await search_client.reply_to_post(
post_id,
sanitized_token,
sanitized_content,
sanitized_captcha,
)
except httpx.HTTPStatusError as exc: # pragma: no cover - network errors
status_code = exc.response.status_code
if status_code == 401:
message = (
"Authentication failed while replying to post "
f"{post_id}. Please verify the token."
)
elif status_code == 403:
message = (
"The provided token is not authorized to reply to post "
f"{post_id}."
)
elif status_code == 404:
message = f"Post {post_id} was not found."
else:
message = (
"OpenIsle backend returned HTTP "
f"{status_code} while replying to post {post_id}."
)
if ctx is not None:
await ctx.error(message)
raise ValueError(message) from exc
except httpx.RequestError as exc: # pragma: no cover - network errors
message = (
"Unable to reach OpenIsle backend comment service: "
f"{exc}."
)
if ctx is not None:
await ctx.error(message)
raise ValueError(message) from exc
try:
comment = CommentData.model_validate(raw_comment)
except ValidationError as exc:
message = "Received malformed data from the post comment endpoint."
if ctx is not None:
await ctx.error(message)
raise ValueError(message) from exc
if ctx is not None:
await ctx.info(
"Reply created successfully for post "
f"{post_id}."
)
logger.debug(
"Validated reply comment payload for post_id=%s (comment_id=%s)",
post_id,
comment.id,
)
return CommentCreateResult(comment=comment)
@app.tool(
name="reply_to_comment",
description="Reply to an existing comment using an authentication token.",
@@ -104,7 +246,6 @@ async def reply_to_comment(
int,
PydanticField(ge=1, description="Identifier of the comment being replied to."),
],
token: Annotated[str, PydanticField(description="JWT bearer token for the user performing the reply.")],
content: Annotated[
str,
PydanticField(description="Markdown content of the reply."),
@@ -116,6 +257,15 @@ async def reply_to_comment(
description="Optional captcha solution if the backend requires it.",
),
] = None,
token: Annotated[
str | None,
PydanticField(
default=None,
description=(
"Optional JWT bearer token. When omitted the configured access token is used."
),
),
] = None,
ctx: Context | None = None,
) -> CommentReplyResult:
"""Create a reply for a comment and return the backend payload."""
@@ -124,13 +274,16 @@ async def reply_to_comment(
if not sanitized_content:
raise ValueError("Reply content must not be empty.")
sanitized_token = token.strip()
if not sanitized_token:
raise ValueError("Authentication token must not be empty.")
sanitized_token = token.strip() if isinstance(token, str) else None
sanitized_captcha = captcha.strip() if isinstance(captcha, str) else None
try:
logger.info(
"Creating reply for comment_id=%s (captcha=%s)",
comment_id,
bool(sanitized_captcha),
)
raw_comment = await search_client.reply_to_comment(
comment_id,
sanitized_token,
@@ -179,6 +332,11 @@ async def reply_to_comment(
"Reply created successfully for comment "
f"{comment_id}."
)
logger.debug(
"Validated reply payload for comment_id=%s (reply_id=%s)",
comment_id,
comment.id,
)
return CommentReplyResult(comment=comment)
@@ -198,6 +356,7 @@ async def recent_posts(
"""Fetch recent posts from the backend and return structured data."""
try:
logger.info("Fetching recent posts for last %s minutes", minutes)
raw_posts = await search_client.recent_posts(minutes)
except httpx.HTTPStatusError as exc: # pragma: no cover - network errors
message = (
@@ -225,10 +384,250 @@ async def recent_posts(
await ctx.info(
f"Found {len(posts)} posts created within the last {minutes} minutes."
)
logger.debug(
"Validated %d recent posts for window=%s minutes",
len(posts),
minutes,
)
return RecentPostsResponse(minutes=minutes, total=len(posts), posts=posts)
@app.tool(
name="get_post",
description="Retrieve detailed information for a single post.",
structured_output=True,
)
async def get_post(
post_id: Annotated[
int,
PydanticField(ge=1, description="Identifier of the post to retrieve."),
],
token: Annotated[
str | None,
PydanticField(
default=None,
description="Optional JWT bearer token to view the post as an authenticated user.",
),
] = None,
ctx: Context | None = None,
) -> PostDetail:
"""Fetch post details from the backend and validate the response."""
sanitized_token = token.strip() if isinstance(token, str) else None
if sanitized_token == "":
sanitized_token = None
try:
logger.info("Fetching post details for post_id=%s", post_id)
raw_post = await search_client.get_post(post_id, sanitized_token)
except httpx.HTTPStatusError as exc: # pragma: no cover - network errors
status_code = exc.response.status_code
if status_code == 404:
message = f"Post {post_id} was not found."
elif status_code == 401:
message = "Authentication failed while retrieving the post."
elif status_code == 403:
message = "The provided token is not authorized to view this post."
else:
message = (
"OpenIsle backend returned HTTP "
f"{status_code} while retrieving post {post_id}."
)
if ctx is not None:
await ctx.error(message)
raise ValueError(message) from exc
except httpx.RequestError as exc: # pragma: no cover - network errors
message = f"Unable to reach OpenIsle backend post service: {exc}."
if ctx is not None:
await ctx.error(message)
raise ValueError(message) from exc
try:
post = PostDetail.model_validate(raw_post)
except ValidationError as exc:
message = "Received malformed data from the post detail endpoint."
if ctx is not None:
await ctx.error(message)
raise ValueError(message) from exc
if ctx is not None:
await ctx.info(f"Retrieved post {post_id} successfully.")
logger.debug(
"Validated post payload for post_id=%s with %d comments",
post_id,
len(post.comments),
)
return post
@app.tool(
name="list_unread_messages",
description="List unread notification messages for the authenticated user.",
structured_output=True,
)
async def list_unread_messages(
page: Annotated[
int,
PydanticField(
default=0,
ge=0,
description="Page number of unread notifications to retrieve.",
),
] = 0,
size: Annotated[
int,
PydanticField(
default=30,
ge=1,
le=100,
description="Number of unread notifications to include per page.",
),
] = 30,
token: Annotated[
str | None,
PydanticField(
default=None,
description=(
"Optional JWT bearer token. When omitted the configured access token is used."
),
),
] = None,
ctx: Context | None = None,
) -> UnreadNotificationsResponse:
"""Retrieve unread notifications and return structured data."""
sanitized_token = token.strip() if isinstance(token, str) else None
try:
logger.info(
"Fetching unread notifications (page=%s, size=%s)",
page,
size,
)
raw_notifications = await search_client.list_unread_notifications(
page=page,
size=size,
token=sanitized_token,
)
except httpx.HTTPStatusError as exc: # pragma: no cover - network errors
message = (
"OpenIsle backend returned HTTP "
f"{exc.response.status_code} while fetching unread notifications."
)
if ctx is not None:
await ctx.error(message)
raise ValueError(message) from exc
except httpx.RequestError as exc: # pragma: no cover - network errors
message = f"Unable to reach OpenIsle backend notification service: {exc}."
if ctx is not None:
await ctx.error(message)
raise ValueError(message) from exc
try:
notifications = [
NotificationData.model_validate(entry) for entry in raw_notifications
]
except ValidationError as exc:
message = "Received malformed data from the unread notifications endpoint."
if ctx is not None:
await ctx.error(message)
raise ValueError(message) from exc
total = len(notifications)
if ctx is not None:
await ctx.info(
f"Retrieved {total} unread notifications (page {page}, size {size})."
)
logger.debug(
"Validated %d unread notifications for page=%s size=%s",
total,
page,
size,
)
return UnreadNotificationsResponse(
page=page,
size=size,
total=total,
notifications=notifications,
)
@app.tool(
name="mark_notifications_read",
description="Mark specific notification messages as read to remove them from the unread list.",
structured_output=True,
)
async def mark_notifications_read(
ids: Annotated[
list[int],
PydanticField(
min_length=1,
description="Notification identifiers that should be marked as read.",
),
],
token: Annotated[
str | None,
PydanticField(
default=None,
description=(
"Optional JWT bearer token. When omitted the configured access token is used."
),
),
] = None,
ctx: Context | None = None,
) -> NotificationCleanupResult:
"""Mark the supplied notifications as read and report the processed identifiers."""
sanitized_token = token.strip() if isinstance(token, str) else None
if sanitized_token == "":
sanitized_token = None
try:
logger.info(
"Marking %d notifications as read", # pragma: no branch - logging
len(ids),
)
await search_client.mark_notifications_read(ids, token=sanitized_token)
except httpx.HTTPStatusError as exc: # pragma: no cover - network errors
message = (
"OpenIsle backend returned HTTP "
f"{exc.response.status_code} while marking notifications as read."
)
if ctx is not None:
await ctx.error(message)
raise ValueError(message) from exc
except httpx.RequestError as exc: # pragma: no cover - network errors
message = f"Unable to reach OpenIsle backend notification service: {exc}."
if ctx is not None:
await ctx.error(message)
raise ValueError(message) from exc
processed_ids: list[int] = []
for value in ids:
if isinstance(value, bool):
raise ValueError("Notification identifiers must be integers, not booleans.")
converted = int(value)
if converted <= 0:
raise ValueError("Notification identifiers must be positive integers.")
processed_ids.append(converted)
if ctx is not None:
await ctx.info(
f"Marked {len(processed_ids)} notifications as read.",
)
logger.debug(
"Successfully marked notifications as read: ids=%s",
processed_ids,
)
return NotificationCleanupResult(
processed_ids=processed_ids,
total_marked=len(processed_ids),
)
def main() -> None:
"""Run the MCP server using the configured transport."""