Compare commits

...

4 Commits

Author SHA1 Message Date
Tim
e585100625 Remove token parameters from MCP tools 2025-10-28 01:32:02 +08:00
Tim
e94471b53e Merge pull request #1098 from nagisa77/codex/store-accesstoken-as-jwt-token
Cache MCP session JWT tokens
2025-10-28 01:20:52 +08:00
Tim
997dacdbe6 Cache MCP session JWT tokens 2025-10-28 01:20:32 +08:00
Tim
c01349a436 Merge pull request #1097 from nagisa77/codex/improve-python-mcp-logs
Improve MCP logging and add unread notification tool
2025-10-28 01:01:47 +08:00

View File

@@ -4,12 +4,13 @@ from __future__ import annotations
import logging
from contextlib import asynccontextmanager
from typing import Annotated
from typing import Annotated, Any
import httpx
from mcp.server.fastmcp import Context, FastMCP
from pydantic import ValidationError
from pydantic import Field as PydanticField
from weakref import WeakKeyDictionary
from .config import get_settings
from .schemas import (
@@ -50,6 +51,69 @@ search_client = SearchClient(
)
class SessionTokenManager:
"""Cache JWT access tokens on a per-session basis."""
def __init__(self) -> None:
self._tokens: WeakKeyDictionary[Any, str] = WeakKeyDictionary()
def resolve(
self, ctx: Context | None, token: str | None = None
) -> str | None:
"""Resolve and optionally persist the token for the current session."""
session = self._get_session(ctx)
if isinstance(token, str):
stripped = token.strip()
if stripped:
if session is not None:
self._tokens[session] = stripped
logger.debug(
"Stored JWT token for session %s.",
self._describe_session(session),
)
return stripped
if session is not None and session in self._tokens:
logger.debug(
"Clearing stored JWT token for session %s due to empty input.",
self._describe_session(session),
)
del self._tokens[session]
return None
if session is not None:
cached = self._tokens.get(session)
if cached:
logger.debug(
"Reusing cached JWT token for session %s.",
self._describe_session(session),
)
return cached
return None
@staticmethod
def _get_session(ctx: Context | None) -> Any | None:
if ctx is None:
return None
try:
return ctx.session
except Exception: # pragma: no cover - defensive guard
return None
@staticmethod
def _describe_session(session: Any) -> str:
identifier = getattr(session, "mcp_session_id", None)
if isinstance(identifier, str) and identifier:
return identifier
return hex(id(session))
session_token_manager = SessionTokenManager()
@asynccontextmanager
async def lifespan(_: FastMCP):
"""Lifecycle hook that disposes shared resources when the server stops."""
@@ -65,8 +129,8 @@ async def lifespan(_: FastMCP):
app = FastMCP(
name="openisle-mcp",
instructions=(
"Use this server to search OpenIsle content, reply to posts and comments with an "
"authentication token, retrieve details for a specific post, list posts created "
"Use this server to search OpenIsle content, reply to posts and comments with "
"session-managed authentication, retrieve details for a specific post, list posts created "
"within a recent time window, and review unread notification messages."
),
host=settings.host,
@@ -128,7 +192,7 @@ async def search(
@app.tool(
name="reply_to_post",
description="Create a comment on a post using an authentication token.",
description="Create a comment on a post using session authentication.",
structured_output=True,
)
async def reply_to_post(
@@ -147,15 +211,6 @@ async def reply_to_post(
description="Optional captcha solution if the backend requires it.",
),
] = None,
token: Annotated[
str | None,
PydanticField(
default=None,
description=(
"Optional JWT bearer token. When omitted the configured access token is used."
),
),
] = None,
ctx: Context | None = None,
) -> CommentCreateResult:
"""Create a comment on a post and return the backend payload."""
@@ -164,10 +219,10 @@ async def reply_to_post(
if not sanitized_content:
raise ValueError("Reply content must not be empty.")
sanitized_token = token.strip() if isinstance(token, str) else None
sanitized_captcha = captcha.strip() if isinstance(captcha, str) else None
resolved_token = session_token_manager.resolve(ctx)
try:
logger.info(
"Creating reply for post_id=%s (captcha=%s)",
@@ -176,7 +231,7 @@ async def reply_to_post(
)
raw_comment = await search_client.reply_to_post(
post_id,
sanitized_token,
resolved_token,
sanitized_content,
sanitized_captcha,
)
@@ -235,7 +290,7 @@ async def reply_to_post(
@app.tool(
name="reply_to_comment",
description="Reply to an existing comment using an authentication token.",
description="Reply to an existing comment using session authentication.",
structured_output=True,
)
async def reply_to_comment(
@@ -254,15 +309,6 @@ async def reply_to_comment(
description="Optional captcha solution if the backend requires it.",
),
] = None,
token: Annotated[
str | None,
PydanticField(
default=None,
description=(
"Optional JWT bearer token. When omitted the configured access token is used."
),
),
] = None,
ctx: Context | None = None,
) -> CommentReplyResult:
"""Create a reply for a comment and return the backend payload."""
@@ -271,10 +317,10 @@ async def reply_to_comment(
if not sanitized_content:
raise ValueError("Reply content must not be empty.")
sanitized_token = token.strip() if isinstance(token, str) else None
sanitized_captcha = captcha.strip() if isinstance(captcha, str) else None
resolved_token = session_token_manager.resolve(ctx)
try:
logger.info(
"Creating reply for comment_id=%s (captcha=%s)",
@@ -283,7 +329,7 @@ async def reply_to_comment(
)
raw_comment = await search_client.reply_to_comment(
comment_id,
sanitized_token,
resolved_token,
sanitized_content,
sanitized_captcha,
)
@@ -400,24 +446,15 @@ async def get_post(
int,
PydanticField(ge=1, description="Identifier of the post to retrieve."),
],
token: Annotated[
str | None,
PydanticField(
default=None,
description="Optional JWT bearer token to view the post as an authenticated user.",
),
] = None,
ctx: Context | None = None,
) -> PostDetail:
"""Fetch post details from the backend and validate the response."""
sanitized_token = token.strip() if isinstance(token, str) else None
if sanitized_token == "":
sanitized_token = None
resolved_token = session_token_manager.resolve(ctx)
try:
logger.info("Fetching post details for post_id=%s", post_id)
raw_post = await search_client.get_post(post_id, sanitized_token)
raw_post = await search_client.get_post(post_id, resolved_token)
except httpx.HTTPStatusError as exc: # pragma: no cover - network errors
status_code = exc.response.status_code
if status_code == 404:
@@ -482,20 +519,11 @@ async def list_unread_messages(
description="Number of unread notifications to include per page.",
),
] = 30,
token: Annotated[
str | None,
PydanticField(
default=None,
description=(
"Optional JWT bearer token. When omitted the configured access token is used."
),
),
] = None,
ctx: Context | None = None,
) -> UnreadNotificationsResponse:
"""Retrieve unread notifications and return structured data."""
sanitized_token = token.strip() if isinstance(token, str) else None
resolved_token = session_token_manager.resolve(ctx)
try:
logger.info(
@@ -506,7 +534,7 @@ async def list_unread_messages(
raw_notifications = await search_client.list_unread_notifications(
page=page,
size=size,
token=sanitized_token,
token=resolved_token,
)
except httpx.HTTPStatusError as exc: # pragma: no cover - network errors
message = (