From 4cf48f9157d87bd1a55d9da0ec37a58512f4b50b Mon Sep 17 00:00:00 2001 From: Tim <135014430+nagisa77@users.noreply.github.com> Date: Tue, 28 Oct 2025 01:01:25 +0800 Subject: [PATCH] Enhance MCP logging and add unread message tool --- mcp/src/openisle_mcp/config.py | 16 ++- mcp/src/openisle_mcp/schemas.py | 54 ++++++++ mcp/src/openisle_mcp/search_client.py | 171 ++++++++++++++++++++--- mcp/src/openisle_mcp/server.py | 191 ++++++++++++++++++++++++-- 4 files changed, 396 insertions(+), 36 deletions(-) diff --git a/mcp/src/openisle_mcp/config.py b/mcp/src/openisle_mcp/config.py index 60499fe70..6fb0fd528 100644 --- a/mcp/src/openisle_mcp/config.py +++ b/mcp/src/openisle_mcp/config.py @@ -5,7 +5,7 @@ from __future__ import annotations from functools import lru_cache from typing import Literal -from pydantic import Field +from pydantic import Field, SecretStr from pydantic.networks import AnyHttpUrl from pydantic_settings import BaseSettings, SettingsConfigDict @@ -36,6 +36,20 @@ class Settings(BaseSettings): gt=0, description="Timeout (seconds) for backend search requests.", ) + access_token: SecretStr | None = Field( + default=None, + description=( + "Optional JWT bearer token used for authenticated backend calls. " + "When set, tools that support authentication will use this token " + "automatically unless an explicit token override is provided." + ), + ) + log_level: str = Field( + "INFO", + description=( + "Logging level for the MCP server (e.g. DEBUG, INFO, WARNING)." + ), + ) model_config = SettingsConfigDict( env_prefix="OPENISLE_MCP_", diff --git a/mcp/src/openisle_mcp/schemas.py b/mcp/src/openisle_mcp/schemas.py index 720c2b9db..f466726e9 100644 --- a/mcp/src/openisle_mcp/schemas.py +++ b/mcp/src/openisle_mcp/schemas.py @@ -277,3 +277,57 @@ class PostDetail(PostSummary): ) model_config = ConfigDict(populate_by_name=True, extra="allow") + + +class NotificationData(BaseModel): + """Unread notification payload returned by the backend.""" + + id: Optional[int] = Field(default=None, description="Notification identifier.") + type: Optional[str] = Field(default=None, description="Type of the notification.") + post: Optional[PostSummary] = Field( + default=None, description="Post associated with the notification if applicable." + ) + comment: Optional[CommentData] = Field( + default=None, description="Comment referenced by the notification when available." + ) + parent_comment: Optional[CommentData] = Field( + default=None, + alias="parentComment", + description="Parent comment for nested replies, when present.", + ) + from_user: Optional[AuthorInfo] = Field( + default=None, + alias="fromUser", + description="User who triggered the notification.", + ) + reaction_type: Optional[str] = Field( + default=None, + alias="reactionType", + description="Reaction type for reaction-based notifications.", + ) + content: Optional[str] = Field( + default=None, description="Additional content or message for the notification." + ) + approved: Optional[bool] = Field( + default=None, description="Approval status for moderation notifications." + ) + read: Optional[bool] = Field(default=None, description="Whether the notification is read.") + created_at: Optional[datetime] = Field( + default=None, + alias="createdAt", + description="Timestamp when the notification was created.", + ) + + model_config = ConfigDict(populate_by_name=True, extra="allow") + + +class UnreadNotificationsResponse(BaseModel): + """Structured response for unread notification queries.""" + + page: int = Field(description="Requested page index for the unread notifications.") + size: int = Field(description="Requested page size for the unread notifications.") + total: int = Field(description="Number of unread notifications returned in this page.") + notifications: list[NotificationData] = Field( + default_factory=list, + description="Unread notifications returned by the backend.", + ) diff --git a/mcp/src/openisle_mcp/search_client.py b/mcp/src/openisle_mcp/search_client.py index 3e061211c..fa8a07931 100644 --- a/mcp/src/openisle_mcp/search_client.py +++ b/mcp/src/openisle_mcp/search_client.py @@ -3,38 +3,108 @@ from __future__ import annotations import json +import logging from typing import Any import httpx +logger = logging.getLogger(__name__) + + class SearchClient: """Client for calling the OpenIsle HTTP APIs used by the MCP server.""" - def __init__(self, base_url: str, *, timeout: float = 10.0) -> None: + def __init__( + self, + base_url: str, + *, + timeout: float = 10.0, + access_token: str | None = None, + ) -> None: self._base_url = base_url.rstrip("/") self._timeout = timeout self._client: httpx.AsyncClient | None = None + self._access_token = self._sanitize_token(access_token) def _get_client(self) -> httpx.AsyncClient: if self._client is None: - self._client = httpx.AsyncClient(base_url=self._base_url, timeout=self._timeout) + logger.debug( + "Creating httpx.AsyncClient for base URL %s with timeout %.2fs", + self._base_url, + self._timeout, + ) + self._client = httpx.AsyncClient( + base_url=self._base_url, + timeout=self._timeout, + ) return self._client + @staticmethod + def _sanitize_token(token: str | None) -> str | None: + if token is None: + return None + stripped = token.strip() + return stripped or None + + def update_access_token(self, token: str | None) -> None: + """Update the default access token used for authenticated requests.""" + + self._access_token = self._sanitize_token(token) + if self._access_token: + logger.debug("Configured default access token for SearchClient requests.") + else: + logger.debug("Cleared default access token for SearchClient requests.") + + def _resolve_token(self, token: str | None) -> str | None: + candidate = self._sanitize_token(token) + if candidate is not None: + return candidate + return self._access_token + + def _require_token(self, token: str | None) -> str: + resolved = self._resolve_token(token) + if resolved is None: + raise ValueError( + "Authenticated request requires an access token but none was provided." + ) + return resolved + + def _build_headers( + self, + *, + token: str | None = None, + accept: str = "application/json", + include_json: bool = False, + ) -> dict[str, str]: + headers: dict[str, str] = {"Accept": accept} + resolved = self._resolve_token(token) + if resolved: + headers["Authorization"] = f"Bearer {resolved}" + if include_json: + headers["Content-Type"] = "application/json" + return headers + async def global_search(self, keyword: str) -> list[dict[str, Any]]: """Call the global search endpoint and return the parsed JSON payload.""" client = self._get_client() + logger.debug("Calling global search with keyword=%s", keyword) response = await client.get( "/api/search/global", params={"keyword": keyword}, - headers={"Accept": "application/json"}, + headers=self._build_headers(), ) response.raise_for_status() payload = response.json() if not isinstance(payload, list): formatted = json.dumps(payload, ensure_ascii=False)[:200] raise ValueError(f"Unexpected response format from search endpoint: {formatted}") + logger.info( + "Global search returned %d results for keyword '%s'", + len(payload), + keyword, + ) return [self._ensure_dict(entry) for entry in payload] async def reply_to_comment( @@ -47,24 +117,28 @@ class SearchClient: """Reply to an existing comment and return the created reply.""" client = self._get_client() - headers = { - "Accept": "application/json", - "Content-Type": "application/json", - "Authorization": f"Bearer {token}", - } + resolved_token = self._require_token(token) + headers = self._build_headers(token=resolved_token, include_json=True) payload: dict[str, Any] = {"content": content} if captcha is not None: stripped_captcha = captcha.strip() if stripped_captcha: payload["captcha"] = stripped_captcha + logger.debug( + "Posting reply to comment_id=%s (captcha=%s)", + comment_id, + bool(captcha), + ) response = await client.post( f"/api/comments/{comment_id}/replies", json=payload, headers=headers, ) response.raise_for_status() - return self._ensure_dict(response.json()) + body = self._ensure_dict(response.json()) + logger.info("Reply to comment_id=%s succeeded with id=%s", comment_id, body.get("id")) + return body async def reply_to_post( self, @@ -76,33 +150,41 @@ class SearchClient: """Create a comment on a post and return the backend payload.""" client = self._get_client() - headers = { - "Accept": "application/json", - "Content-Type": "application/json", - "Authorization": f"Bearer {token}", - } + resolved_token = self._require_token(token) + headers = self._build_headers(token=resolved_token, include_json=True) payload: dict[str, Any] = {"content": content} if captcha is not None: stripped_captcha = captcha.strip() if stripped_captcha: payload["captcha"] = stripped_captcha + logger.debug( + "Posting comment to post_id=%s (captcha=%s)", + post_id, + bool(captcha), + ) response = await client.post( f"/api/posts/{post_id}/comments", json=payload, headers=headers, ) response.raise_for_status() - return self._ensure_dict(response.json()) + body = self._ensure_dict(response.json()) + logger.info("Reply to post_id=%s succeeded with id=%s", post_id, body.get("id")) + return body async def recent_posts(self, minutes: int) -> list[dict[str, Any]]: """Return posts created within the given timeframe.""" client = self._get_client() + logger.debug( + "Fetching recent posts within last %s minutes", + minutes, + ) response = await client.get( "/api/posts/recent", params={"minutes": minutes}, - headers={"Accept": "application/json"}, + headers=self._build_headers(), ) response.raise_for_status() payload = response.json() @@ -111,19 +193,65 @@ class SearchClient: raise ValueError( f"Unexpected response format from recent posts endpoint: {formatted}" ) + logger.info( + "Fetched %d recent posts for window=%s minutes", + len(payload), + minutes, + ) return [self._ensure_dict(entry) for entry in payload] async def get_post(self, post_id: int, token: str | None = None) -> dict[str, Any]: """Retrieve the detailed payload for a single post.""" client = self._get_client() - headers = {"Accept": "application/json"} - if token: - headers["Authorization"] = f"Bearer {token}" - + headers = self._build_headers(token=token) + logger.debug("Fetching post details for post_id=%s", post_id) response = await client.get(f"/api/posts/{post_id}", headers=headers) response.raise_for_status() - return self._ensure_dict(response.json()) + body = self._ensure_dict(response.json()) + logger.info( + "Retrieved post_id=%s successfully with %d top-level comments", + post_id, + len(body.get("comments", []) if isinstance(body.get("comments"), list) else []), + ) + return body + + async def list_unread_notifications( + self, + *, + page: int = 0, + size: int = 30, + token: str | None = None, + ) -> list[dict[str, Any]]: + """Return unread notifications for the authenticated user.""" + + client = self._get_client() + resolved_token = self._require_token(token) + logger.debug( + "Fetching unread notifications with page=%s, size=%s", + page, + size, + ) + response = await client.get( + "/api/notifications/unread", + params={"page": page, "size": size}, + headers=self._build_headers(token=resolved_token), + ) + response.raise_for_status() + payload = response.json() + if not isinstance(payload, list): + formatted = json.dumps(payload, ensure_ascii=False)[:200] + raise ValueError( + "Unexpected response format from unread notifications endpoint: " + f"{formatted}" + ) + logger.info( + "Fetched %d unread notifications (page=%s, size=%s)", + len(payload), + page, + size, + ) + return [self._ensure_dict(entry) for entry in payload] async def aclose(self) -> None: """Dispose of the underlying HTTP client.""" @@ -131,6 +259,7 @@ class SearchClient: if self._client is not None: await self._client.aclose() self._client = None + logger.debug("Closed httpx.AsyncClient for SearchClient.") @staticmethod def _ensure_dict(entry: Any) -> dict[str, Any]: diff --git a/mcp/src/openisle_mcp/server.py b/mcp/src/openisle_mcp/server.py index 9f5a6af31..9e4670701 100644 --- a/mcp/src/openisle_mcp/server.py +++ b/mcp/src/openisle_mcp/server.py @@ -2,6 +2,7 @@ from __future__ import annotations +import logging from contextlib import asynccontextmanager from typing import Annotated @@ -15,6 +16,8 @@ from .schemas import ( CommentCreateResult, CommentData, CommentReplyResult, + NotificationData, + UnreadNotificationsResponse, PostDetail, PostSummary, RecentPostsResponse, @@ -24,8 +27,26 @@ from .schemas import ( from .search_client import SearchClient settings = get_settings() +if not logging.getLogger().handlers: + logging.basicConfig( + level=getattr(logging, settings.log_level.upper(), logging.INFO), + format="%(asctime)s | %(levelname)s | %(name)s | %(message)s", + ) +else: + logging.getLogger().setLevel( + getattr(logging, settings.log_level.upper(), logging.INFO) + ) + +logger = logging.getLogger(__name__) + search_client = SearchClient( - str(settings.backend_base_url), timeout=settings.request_timeout + str(settings.backend_base_url), + timeout=settings.request_timeout, + access_token=( + settings.access_token.get_secret_value() + if settings.access_token is not None + else None + ), ) @@ -34,8 +55,10 @@ async def lifespan(_: FastMCP): """Lifecycle hook that disposes shared resources when the server stops.""" try: + logger.debug("OpenIsle MCP server lifespan started.") yield finally: + logger.debug("Disposing shared SearchClient instance.") await search_client.aclose() @@ -43,8 +66,8 @@ app = FastMCP( name="openisle-mcp", instructions=( "Use this server to search OpenIsle content, reply to posts and comments with an " - "authentication token, retrieve details for a specific post, and list posts created " - "within a recent time window." + "authentication token, retrieve details for a specific post, list posts created " + "within a recent time window, and review unread notification messages." ), host=settings.host, port=settings.port, @@ -68,6 +91,7 @@ async def search( raise ValueError("Keyword must not be empty.") try: + logger.info("Received search request for keyword='%s'", sanitized) raw_results = await search_client.global_search(sanitized) except httpx.HTTPStatusError as exc: # pragma: no cover - network errors message = ( @@ -93,6 +117,11 @@ async def search( if ctx is not None: await ctx.info(f"Search keyword '{sanitized}' returned {len(results)} results.") + logger.debug( + "Validated %d search results for keyword='%s'", + len(results), + sanitized, + ) return SearchResponse(keyword=sanitized, total=len(results), results=results) @@ -107,10 +136,6 @@ async def reply_to_post( int, PydanticField(ge=1, description="Identifier of the post being replied to."), ], - token: Annotated[ - str, - PydanticField(description="JWT bearer token for the user performing the reply."), - ], content: Annotated[ str, PydanticField(description="Markdown content of the reply."), @@ -122,6 +147,15 @@ async def reply_to_post( description="Optional captcha solution if the backend requires it.", ), ] = None, + token: Annotated[ + str | None, + PydanticField( + default=None, + description=( + "Optional JWT bearer token. When omitted the configured access token is used." + ), + ), + ] = None, ctx: Context | None = None, ) -> CommentCreateResult: """Create a comment on a post and return the backend payload.""" @@ -130,13 +164,16 @@ async def reply_to_post( if not sanitized_content: raise ValueError("Reply content must not be empty.") - sanitized_token = token.strip() - if not sanitized_token: - raise ValueError("Authentication token must not be empty.") + sanitized_token = token.strip() if isinstance(token, str) else None sanitized_captcha = captcha.strip() if isinstance(captcha, str) else None try: + logger.info( + "Creating reply for post_id=%s (captcha=%s)", + post_id, + bool(sanitized_captcha), + ) raw_comment = await search_client.reply_to_post( post_id, sanitized_token, @@ -187,6 +224,11 @@ async def reply_to_post( "Reply created successfully for post " f"{post_id}." ) + logger.debug( + "Validated reply comment payload for post_id=%s (comment_id=%s)", + post_id, + comment.id, + ) return CommentCreateResult(comment=comment) @@ -201,7 +243,6 @@ async def reply_to_comment( int, PydanticField(ge=1, description="Identifier of the comment being replied to."), ], - token: Annotated[str, PydanticField(description="JWT bearer token for the user performing the reply.")], content: Annotated[ str, PydanticField(description="Markdown content of the reply."), @@ -213,6 +254,15 @@ async def reply_to_comment( description="Optional captcha solution if the backend requires it.", ), ] = None, + token: Annotated[ + str | None, + PydanticField( + default=None, + description=( + "Optional JWT bearer token. When omitted the configured access token is used." + ), + ), + ] = None, ctx: Context | None = None, ) -> CommentReplyResult: """Create a reply for a comment and return the backend payload.""" @@ -221,13 +271,16 @@ async def reply_to_comment( if not sanitized_content: raise ValueError("Reply content must not be empty.") - sanitized_token = token.strip() - if not sanitized_token: - raise ValueError("Authentication token must not be empty.") + sanitized_token = token.strip() if isinstance(token, str) else None sanitized_captcha = captcha.strip() if isinstance(captcha, str) else None try: + logger.info( + "Creating reply for comment_id=%s (captcha=%s)", + comment_id, + bool(sanitized_captcha), + ) raw_comment = await search_client.reply_to_comment( comment_id, sanitized_token, @@ -276,6 +329,11 @@ async def reply_to_comment( "Reply created successfully for comment " f"{comment_id}." ) + logger.debug( + "Validated reply payload for comment_id=%s (reply_id=%s)", + comment_id, + comment.id, + ) return CommentReplyResult(comment=comment) @@ -295,6 +353,7 @@ async def recent_posts( """Fetch recent posts from the backend and return structured data.""" try: + logger.info("Fetching recent posts for last %s minutes", minutes) raw_posts = await search_client.recent_posts(minutes) except httpx.HTTPStatusError as exc: # pragma: no cover - network errors message = ( @@ -322,6 +381,11 @@ async def recent_posts( await ctx.info( f"Found {len(posts)} posts created within the last {minutes} minutes." ) + logger.debug( + "Validated %d recent posts for window=%s minutes", + len(posts), + minutes, + ) return RecentPostsResponse(minutes=minutes, total=len(posts), posts=posts) @@ -352,6 +416,7 @@ async def get_post( sanitized_token = None try: + logger.info("Fetching post details for post_id=%s", post_id) raw_post = await search_client.get_post(post_id, sanitized_token) except httpx.HTTPStatusError as exc: # pragma: no cover - network errors status_code = exc.response.status_code @@ -385,10 +450,108 @@ async def get_post( if ctx is not None: await ctx.info(f"Retrieved post {post_id} successfully.") + logger.debug( + "Validated post payload for post_id=%s with %d comments", + post_id, + len(post.comments), + ) return post +@app.tool( + name="list_unread_messages", + description="List unread notification messages for the authenticated user.", + structured_output=True, +) +async def list_unread_messages( + page: Annotated[ + int, + PydanticField( + default=0, + ge=0, + description="Page number of unread notifications to retrieve.", + ), + ] = 0, + size: Annotated[ + int, + PydanticField( + default=30, + ge=1, + le=100, + description="Number of unread notifications to include per page.", + ), + ] = 30, + token: Annotated[ + str | None, + PydanticField( + default=None, + description=( + "Optional JWT bearer token. When omitted the configured access token is used." + ), + ), + ] = None, + ctx: Context | None = None, +) -> UnreadNotificationsResponse: + """Retrieve unread notifications and return structured data.""" + + sanitized_token = token.strip() if isinstance(token, str) else None + + try: + logger.info( + "Fetching unread notifications (page=%s, size=%s)", + page, + size, + ) + raw_notifications = await search_client.list_unread_notifications( + page=page, + size=size, + token=sanitized_token, + ) + except httpx.HTTPStatusError as exc: # pragma: no cover - network errors + message = ( + "OpenIsle backend returned HTTP " + f"{exc.response.status_code} while fetching unread notifications." + ) + if ctx is not None: + await ctx.error(message) + raise ValueError(message) from exc + except httpx.RequestError as exc: # pragma: no cover - network errors + message = f"Unable to reach OpenIsle backend notification service: {exc}." + if ctx is not None: + await ctx.error(message) + raise ValueError(message) from exc + + try: + notifications = [ + NotificationData.model_validate(entry) for entry in raw_notifications + ] + except ValidationError as exc: + message = "Received malformed data from the unread notifications endpoint." + if ctx is not None: + await ctx.error(message) + raise ValueError(message) from exc + + total = len(notifications) + if ctx is not None: + await ctx.info( + f"Retrieved {total} unread notifications (page {page}, size {size})." + ) + logger.debug( + "Validated %d unread notifications for page=%s size=%s", + total, + page, + size, + ) + + return UnreadNotificationsResponse( + page=page, + size=size, + total=total, + notifications=notifications, + ) + + def main() -> None: """Run the MCP server using the configured transport."""