Compare commits


1 commit

| Author | SHA1 | Message | Date |
| --- | --- | --- | --- |
| Tim | 353041f929 | feat(mcp): add post detail retrieval tool | 2025-10-27 20:19:01 +08:00 |

5 changed files with 25 additions and 517 deletions

View File

@@ -31,11 +31,9 @@ By default the server listens on port `8085` and serves MCP over Streamable HTTP
 | Tool | Description |
 | --- | --- |
 | `search` | Perform a global search against the OpenIsle backend. |
-| `reply_to_post` | Create a new comment on a post using a JWT token. |
 | `reply_to_comment` | Reply to an existing comment using a JWT token. |
 | `recent_posts` | Retrieve posts created within the last *N* minutes. |
 
 The tools return structured data mirroring the backend DTOs, including highlighted snippets for
-search results, the full comment payload for post replies and comment replies, and detailed
-metadata for recent posts.
+search results, the full comment payload for replies, and detailed metadata for recent posts.
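For orientation, here is a minimal sketch (not part of this change) of calling the remaining `search` tool over Streamable HTTP with the official MCP Python SDK. The `http://localhost:8085/mcp` endpoint path and the example keyword are assumptions based on the README's default port.

```python
import asyncio

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client


async def demo() -> None:
    # Assumed endpoint: FastMCP's Streamable HTTP path on the README's default port 8085.
    async with streamablehttp_client("http://localhost:8085/mcp") as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool("search", {"keyword": "openisle"})
            # The result mirrors the backend search DTOs described above.
            print(result.content)


asyncio.run(demo())
```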

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
 from functools import lru_cache
 from typing import Literal
 
-from pydantic import Field, SecretStr
+from pydantic import Field
 from pydantic.networks import AnyHttpUrl
 from pydantic_settings import BaseSettings, SettingsConfigDict
 
@@ -36,20 +36,6 @@ class Settings(BaseSettings):
         gt=0,
         description="Timeout (seconds) for backend search requests.",
     )
-    access_token: SecretStr | None = Field(
-        default=None,
-        description=(
-            "Optional JWT bearer token used for authenticated backend calls. "
-            "When set, tools that support authentication will use this token "
-            "automatically unless an explicit token override is provided."
-        ),
-    )
-    log_level: str = Field(
-        "INFO",
-        description=(
-            "Logging level for the MCP server (e.g. DEBUG, INFO, WARNING)."
-        ),
-    )
 
     model_config = SettingsConfigDict(
         env_prefix="OPENISLE_MCP_",

View File

@@ -177,12 +177,6 @@ class CommentReplyResult(BaseModel):
     comment: CommentData = Field(description="Reply comment returned by the backend.")
 
 
-class CommentCreateResult(BaseModel):
-    """Structured response returned when creating a comment on a post."""
-
-    comment: CommentData = Field(description="Comment returned by the backend.")
-
-
 class PostSummary(BaseModel):
     """Summary information for a post."""
 
@@ -277,57 +271,3 @@ class PostDetail(PostSummary):
     )
 
     model_config = ConfigDict(populate_by_name=True, extra="allow")
-
-
-class NotificationData(BaseModel):
-    """Unread notification payload returned by the backend."""
-
-    id: Optional[int] = Field(default=None, description="Notification identifier.")
-    type: Optional[str] = Field(default=None, description="Type of the notification.")
-    post: Optional[PostSummary] = Field(
-        default=None, description="Post associated with the notification if applicable."
-    )
-    comment: Optional[CommentData] = Field(
-        default=None, description="Comment referenced by the notification when available."
-    )
-    parent_comment: Optional[CommentData] = Field(
-        default=None,
-        alias="parentComment",
-        description="Parent comment for nested replies, when present.",
-    )
-    from_user: Optional[AuthorInfo] = Field(
-        default=None,
-        alias="fromUser",
-        description="User who triggered the notification.",
-    )
-    reaction_type: Optional[str] = Field(
-        default=None,
-        alias="reactionType",
-        description="Reaction type for reaction-based notifications.",
-    )
-    content: Optional[str] = Field(
-        default=None, description="Additional content or message for the notification."
-    )
-    approved: Optional[bool] = Field(
-        default=None, description="Approval status for moderation notifications."
-    )
-    read: Optional[bool] = Field(default=None, description="Whether the notification is read.")
-    created_at: Optional[datetime] = Field(
-        default=None,
-        alias="createdAt",
-        description="Timestamp when the notification was created.",
-    )
-
-    model_config = ConfigDict(populate_by_name=True, extra="allow")
-
-
-class UnreadNotificationsResponse(BaseModel):
-    """Structured response for unread notification queries."""
-
-    page: int = Field(description="Requested page index for the unread notifications.")
-    size: int = Field(description="Requested page size for the unread notifications.")
-    total: int = Field(description="Number of unread notifications returned in this page.")
-    notifications: list[NotificationData] = Field(
-        default_factory=list,
-        description="Unread notifications returned by the backend.",
-    )
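For context on what this removal drops, a small sketch of how the `NotificationData` model above parses a raw backend payload. The payload values are made up; the alias handling and `extra="allow"` behaviour follow the field definitions shown in the removed code, and the `openisle_mcp.schemas` module path is an assumption.

```python
from openisle_mcp.schemas import NotificationData  # module path is an assumption

# Hypothetical backend payload using the camelCase aliases declared above.
payload = {
    "id": 42,
    "type": "COMMENT_REPLY",
    "content": "Someone replied to your comment.",
    "read": False,
    "createdAt": "2025-10-27T20:19:01+08:00",  # mapped to created_at via the alias
    "source": "email-digest",                  # unknown keys survive because extra="allow"
}

notification = NotificationData.model_validate(payload)
print(notification.created_at, notification.read)
```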

View File

@@ -3,108 +3,38 @@
 from __future__ import annotations
 
 import json
-import logging
 from typing import Any
 
 import httpx
 
-logger = logging.getLogger(__name__)
-
 
 class SearchClient:
     """Client for calling the OpenIsle HTTP APIs used by the MCP server."""
 
-    def __init__(
-        self,
-        base_url: str,
-        *,
-        timeout: float = 10.0,
-        access_token: str | None = None,
-    ) -> None:
+    def __init__(self, base_url: str, *, timeout: float = 10.0) -> None:
         self._base_url = base_url.rstrip("/")
         self._timeout = timeout
         self._client: httpx.AsyncClient | None = None
-        self._access_token = self._sanitize_token(access_token)
 
     def _get_client(self) -> httpx.AsyncClient:
         if self._client is None:
-            logger.debug(
-                "Creating httpx.AsyncClient for base URL %s with timeout %.2fs",
-                self._base_url,
-                self._timeout,
-            )
-            self._client = httpx.AsyncClient(
-                base_url=self._base_url,
-                timeout=self._timeout,
-            )
+            self._client = httpx.AsyncClient(base_url=self._base_url, timeout=self._timeout)
         return self._client
 
-    @staticmethod
-    def _sanitize_token(token: str | None) -> str | None:
-        if token is None:
-            return None
-        stripped = token.strip()
-        return stripped or None
-
-    def update_access_token(self, token: str | None) -> None:
-        """Update the default access token used for authenticated requests."""
-        self._access_token = self._sanitize_token(token)
-        if self._access_token:
-            logger.debug("Configured default access token for SearchClient requests.")
-        else:
-            logger.debug("Cleared default access token for SearchClient requests.")
-
-    def _resolve_token(self, token: str | None) -> str | None:
-        candidate = self._sanitize_token(token)
-        if candidate is not None:
-            return candidate
-        return self._access_token
-
-    def _require_token(self, token: str | None) -> str:
-        resolved = self._resolve_token(token)
-        if resolved is None:
-            raise ValueError(
-                "Authenticated request requires an access token but none was provided."
-            )
-        return resolved
-
-    def _build_headers(
-        self,
-        *,
-        token: str | None = None,
-        accept: str = "application/json",
-        include_json: bool = False,
-    ) -> dict[str, str]:
-        headers: dict[str, str] = {"Accept": accept}
-        resolved = self._resolve_token(token)
-        if resolved:
-            headers["Authorization"] = f"Bearer {resolved}"
-        if include_json:
-            headers["Content-Type"] = "application/json"
-        return headers
-
     async def global_search(self, keyword: str) -> list[dict[str, Any]]:
         """Call the global search endpoint and return the parsed JSON payload."""
         client = self._get_client()
-        logger.debug("Calling global search with keyword=%s", keyword)
         response = await client.get(
             "/api/search/global",
             params={"keyword": keyword},
-            headers=self._build_headers(),
+            headers={"Accept": "application/json"},
         )
         response.raise_for_status()
         payload = response.json()
         if not isinstance(payload, list):
             formatted = json.dumps(payload, ensure_ascii=False)[:200]
             raise ValueError(f"Unexpected response format from search endpoint: {formatted}")
-        logger.info(
-            "Global search returned %d results for keyword '%s'",
-            len(payload),
-            keyword,
-        )
         return [self._ensure_dict(entry) for entry in payload]
 
     async def reply_to_comment(
@@ -117,74 +47,33 @@ class SearchClient:
         """Reply to an existing comment and return the created reply."""
         client = self._get_client()
-        resolved_token = self._require_token(token)
-        headers = self._build_headers(token=resolved_token, include_json=True)
+        headers = {
+            "Accept": "application/json",
+            "Content-Type": "application/json",
+            "Authorization": f"Bearer {token}",
+        }
         payload: dict[str, Any] = {"content": content}
         if captcha is not None:
             stripped_captcha = captcha.strip()
             if stripped_captcha:
                 payload["captcha"] = stripped_captcha
-        logger.debug(
-            "Posting reply to comment_id=%s (captcha=%s)",
-            comment_id,
-            bool(captcha),
-        )
         response = await client.post(
             f"/api/comments/{comment_id}/replies",
             json=payload,
             headers=headers,
         )
         response.raise_for_status()
-        body = self._ensure_dict(response.json())
-        logger.info("Reply to comment_id=%s succeeded with id=%s", comment_id, body.get("id"))
-        return body
-
-    async def reply_to_post(
-        self,
-        post_id: int,
-        token: str,
-        content: str,
-        captcha: str | None = None,
-    ) -> dict[str, Any]:
-        """Create a comment on a post and return the backend payload."""
-        client = self._get_client()
-        resolved_token = self._require_token(token)
-        headers = self._build_headers(token=resolved_token, include_json=True)
-        payload: dict[str, Any] = {"content": content}
-        if captcha is not None:
-            stripped_captcha = captcha.strip()
-            if stripped_captcha:
-                payload["captcha"] = stripped_captcha
-        logger.debug(
-            "Posting comment to post_id=%s (captcha=%s)",
-            post_id,
-            bool(captcha),
-        )
-        response = await client.post(
-            f"/api/posts/{post_id}/comments",
-            json=payload,
-            headers=headers,
-        )
-        response.raise_for_status()
-        body = self._ensure_dict(response.json())
-        logger.info("Reply to post_id=%s succeeded with id=%s", post_id, body.get("id"))
-        return body
+        return self._ensure_dict(response.json())
 
     async def recent_posts(self, minutes: int) -> list[dict[str, Any]]:
         """Return posts created within the given timeframe."""
         client = self._get_client()
-        logger.debug(
-            "Fetching recent posts within last %s minutes",
-            minutes,
-        )
         response = await client.get(
             "/api/posts/recent",
             params={"minutes": minutes},
-            headers=self._build_headers(),
+            headers={"Accept": "application/json"},
         )
         response.raise_for_status()
         payload = response.json()
@@ -193,65 +82,19 @@ class SearchClient:
             raise ValueError(
                 f"Unexpected response format from recent posts endpoint: {formatted}"
             )
-        logger.info(
-            "Fetched %d recent posts for window=%s minutes",
-            len(payload),
-            minutes,
-        )
         return [self._ensure_dict(entry) for entry in payload]
 
     async def get_post(self, post_id: int, token: str | None = None) -> dict[str, Any]:
         """Retrieve the detailed payload for a single post."""
         client = self._get_client()
-        headers = self._build_headers(token=token)
-        logger.debug("Fetching post details for post_id=%s", post_id)
+        headers = {"Accept": "application/json"}
+        if token:
+            headers["Authorization"] = f"Bearer {token}"
         response = await client.get(f"/api/posts/{post_id}", headers=headers)
         response.raise_for_status()
-        body = self._ensure_dict(response.json())
-        logger.info(
-            "Retrieved post_id=%s successfully with %d top-level comments",
-            post_id,
-            len(body.get("comments", []) if isinstance(body.get("comments"), list) else []),
-        )
-        return body
-
-    async def list_unread_notifications(
-        self,
-        *,
-        page: int = 0,
-        size: int = 30,
-        token: str | None = None,
-    ) -> list[dict[str, Any]]:
-        """Return unread notifications for the authenticated user."""
-        client = self._get_client()
-        resolved_token = self._require_token(token)
-        logger.debug(
-            "Fetching unread notifications with page=%s, size=%s",
-            page,
-            size,
-        )
-        response = await client.get(
-            "/api/notifications/unread",
-            params={"page": page, "size": size},
-            headers=self._build_headers(token=resolved_token),
-        )
-        response.raise_for_status()
-        payload = response.json()
-        if not isinstance(payload, list):
-            formatted = json.dumps(payload, ensure_ascii=False)[:200]
-            raise ValueError(
-                "Unexpected response format from unread notifications endpoint: "
-                f"{formatted}"
-            )
-        logger.info(
-            "Fetched %d unread notifications (page=%s, size=%s)",
-            len(payload),
-            page,
-            size,
-        )
-        return [self._ensure_dict(entry) for entry in payload]
+        return self._ensure_dict(response.json())
 
     async def aclose(self) -> None:
         """Dispose of the underlying HTTP client."""
@@ -259,7 +102,6 @@ class SearchClient:
         if self._client is not None:
             await self._client.aclose()
             self._client = None
-            logger.debug("Closed httpx.AsyncClient for SearchClient.")
 
     @staticmethod
     def _ensure_dict(entry: Any) -> dict[str, Any]:
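To illustrate the slimmed-down client that survives this change, a minimal usage sketch; the base URL, post id, and the `openisle_mcp.search_client` module path are placeholders.

```python
import asyncio

from openisle_mcp.search_client import SearchClient  # module path is an assumption


async def demo() -> None:
    client = SearchClient("https://openisle.example.com", timeout=15.0)
    try:
        results = await client.global_search("mcp")   # GET /api/search/global
        recent = await client.recent_posts(60)        # GET /api/posts/recent
        post = await client.get_post(123)             # GET /api/posts/{id}; token is optional
        print(len(results), len(recent), post.get("id"))
    finally:
        await client.aclose()


asyncio.run(demo())
```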

View File

@@ -2,7 +2,6 @@
 from __future__ import annotations
 
-import logging
 from contextlib import asynccontextmanager
 from typing import Annotated
 
@@ -13,11 +12,8 @@ from pydantic import Field as PydanticField
 from .config import get_settings
 from .schemas import (
-    CommentCreateResult,
     CommentData,
     CommentReplyResult,
-    NotificationData,
-    UnreadNotificationsResponse,
     PostDetail,
     PostSummary,
     RecentPostsResponse,
@@ -27,26 +23,8 @@ from .schemas import (
 from .search_client import SearchClient
 
 settings = get_settings()
-
-if not logging.getLogger().handlers:
-    logging.basicConfig(
-        level=getattr(logging, settings.log_level.upper(), logging.INFO),
-        format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
-    )
-else:
-    logging.getLogger().setLevel(
-        getattr(logging, settings.log_level.upper(), logging.INFO)
-    )
-
-logger = logging.getLogger(__name__)
-
 search_client = SearchClient(
-    str(settings.backend_base_url),
-    timeout=settings.request_timeout,
-    access_token=(
-        settings.access_token.get_secret_value()
-        if settings.access_token is not None
-        else None
-    ),
+    str(settings.backend_base_url), timeout=settings.request_timeout
 )
 
@@ -55,19 +33,17 @@ async def lifespan(_: FastMCP):
     """Lifecycle hook that disposes shared resources when the server stops."""
     try:
-        logger.debug("OpenIsle MCP server lifespan started.")
         yield
     finally:
-        logger.debug("Disposing shared SearchClient instance.")
         await search_client.aclose()
 
 
 app = FastMCP(
     name="openisle-mcp",
     instructions=(
-        "Use this server to search OpenIsle content, reply to posts and comments with an "
-        "authentication token, retrieve details for a specific post, list posts created "
-        "within a recent time window, and review unread notification messages."
+        "Use this server to search OpenIsle content, reply to comments with an authentication "
+        "token, retrieve details for a specific post, and list posts created within a recent time "
+        "window."
    ),
     host=settings.host,
     port=settings.port,
@@ -91,7 +67,6 @@ async def search(
         raise ValueError("Keyword must not be empty.")
 
     try:
-        logger.info("Received search request for keyword='%s'", sanitized)
         raw_results = await search_client.global_search(sanitized)
     except httpx.HTTPStatusError as exc:  # pragma: no cover - network errors
         message = (
@@ -117,122 +92,10 @@ async def search(
     if ctx is not None:
         await ctx.info(f"Search keyword '{sanitized}' returned {len(results)} results.")
-    logger.debug(
-        "Validated %d search results for keyword='%s'",
-        len(results),
-        sanitized,
-    )
     return SearchResponse(keyword=sanitized, total=len(results), results=results)
 
 
-@app.tool(
-    name="reply_to_post",
-    description="Create a comment on a post using an authentication token.",
-    structured_output=True,
-)
-async def reply_to_post(
-    post_id: Annotated[
-        int,
-        PydanticField(ge=1, description="Identifier of the post being replied to."),
-    ],
-    content: Annotated[
-        str,
-        PydanticField(description="Markdown content of the reply."),
-    ],
-    captcha: Annotated[
-        str | None,
-        PydanticField(
-            default=None,
-            description="Optional captcha solution if the backend requires it.",
-        ),
-    ] = None,
-    token: Annotated[
-        str | None,
-        PydanticField(
-            default=None,
-            description=(
-                "Optional JWT bearer token. When omitted the configured access token is used."
-            ),
-        ),
-    ] = None,
-    ctx: Context | None = None,
-) -> CommentCreateResult:
-    """Create a comment on a post and return the backend payload."""
-    sanitized_content = content.strip()
-    if not sanitized_content:
-        raise ValueError("Reply content must not be empty.")
-
-    sanitized_token = token.strip() if isinstance(token, str) else None
-    sanitized_captcha = captcha.strip() if isinstance(captcha, str) else None
-
-    try:
-        logger.info(
-            "Creating reply for post_id=%s (captcha=%s)",
-            post_id,
-            bool(sanitized_captcha),
-        )
-        raw_comment = await search_client.reply_to_post(
-            post_id,
-            sanitized_token,
-            sanitized_content,
-            sanitized_captcha,
-        )
-    except httpx.HTTPStatusError as exc:  # pragma: no cover - network errors
-        status_code = exc.response.status_code
-        if status_code == 401:
-            message = (
-                "Authentication failed while replying to post "
-                f"{post_id}. Please verify the token."
-            )
-        elif status_code == 403:
-            message = (
-                "The provided token is not authorized to reply to post "
-                f"{post_id}."
-            )
-        elif status_code == 404:
-            message = f"Post {post_id} was not found."
-        else:
-            message = (
-                "OpenIsle backend returned HTTP "
-                f"{status_code} while replying to post {post_id}."
-            )
-        if ctx is not None:
-            await ctx.error(message)
-        raise ValueError(message) from exc
-    except httpx.RequestError as exc:  # pragma: no cover - network errors
-        message = (
-            "Unable to reach OpenIsle backend comment service: "
-            f"{exc}."
-        )
-        if ctx is not None:
-            await ctx.error(message)
-        raise ValueError(message) from exc
-
-    try:
-        comment = CommentData.model_validate(raw_comment)
-    except ValidationError as exc:
-        message = "Received malformed data from the post comment endpoint."
-        if ctx is not None:
-            await ctx.error(message)
-        raise ValueError(message) from exc
-
-    if ctx is not None:
-        await ctx.info(
-            "Reply created successfully for post "
-            f"{post_id}."
-        )
-    logger.debug(
-        "Validated reply comment payload for post_id=%s (comment_id=%s)",
-        post_id,
-        comment.id,
-    )
-    return CommentCreateResult(comment=comment)
-
-
 @app.tool(
     name="reply_to_comment",
     description="Reply to an existing comment using an authentication token.",
@@ -243,6 +106,7 @@ async def reply_to_comment(
         int,
         PydanticField(ge=1, description="Identifier of the comment being replied to."),
     ],
+    token: Annotated[str, PydanticField(description="JWT bearer token for the user performing the reply.")],
     content: Annotated[
         str,
         PydanticField(description="Markdown content of the reply."),
@@ -254,15 +118,6 @@ async def reply_to_comment(
             description="Optional captcha solution if the backend requires it.",
         ),
     ] = None,
-    token: Annotated[
-        str | None,
-        PydanticField(
-            default=None,
-            description=(
-                "Optional JWT bearer token. When omitted the configured access token is used."
-            ),
-        ),
-    ] = None,
     ctx: Context | None = None,
 ) -> CommentReplyResult:
     """Create a reply for a comment and return the backend payload."""
@@ -271,16 +126,13 @@ async def reply_to_comment(
     if not sanitized_content:
         raise ValueError("Reply content must not be empty.")
 
-    sanitized_token = token.strip() if isinstance(token, str) else None
+    sanitized_token = token.strip()
+    if not sanitized_token:
+        raise ValueError("Authentication token must not be empty.")
     sanitized_captcha = captcha.strip() if isinstance(captcha, str) else None
 
     try:
-        logger.info(
-            "Creating reply for comment_id=%s (captcha=%s)",
-            comment_id,
-            bool(sanitized_captcha),
-        )
         raw_comment = await search_client.reply_to_comment(
             comment_id,
             sanitized_token,
@@ -329,11 +181,6 @@ async def reply_to_comment(
             "Reply created successfully for comment "
             f"{comment_id}."
         )
-    logger.debug(
-        "Validated reply payload for comment_id=%s (reply_id=%s)",
-        comment_id,
-        comment.id,
-    )
     return CommentReplyResult(comment=comment)
 
@@ -353,7 +200,6 @@ async def recent_posts(
     """Fetch recent posts from the backend and return structured data."""
 
     try:
-        logger.info("Fetching recent posts for last %s minutes", minutes)
         raw_posts = await search_client.recent_posts(minutes)
     except httpx.HTTPStatusError as exc:  # pragma: no cover - network errors
         message = (
@@ -381,11 +227,6 @@ async def recent_posts(
         await ctx.info(
             f"Found {len(posts)} posts created within the last {minutes} minutes."
         )
-    logger.debug(
-        "Validated %d recent posts for window=%s minutes",
-        len(posts),
-        minutes,
-    )
     return RecentPostsResponse(minutes=minutes, total=len(posts), posts=posts)
 
@@ -416,7 +257,6 @@ async def get_post(
         sanitized_token = None
 
     try:
-        logger.info("Fetching post details for post_id=%s", post_id)
         raw_post = await search_client.get_post(post_id, sanitized_token)
     except httpx.HTTPStatusError as exc:  # pragma: no cover - network errors
         status_code = exc.response.status_code
@@ -450,108 +290,10 @@ async def get_post(
     if ctx is not None:
         await ctx.info(f"Retrieved post {post_id} successfully.")
-    logger.debug(
-        "Validated post payload for post_id=%s with %d comments",
-        post_id,
-        len(post.comments),
-    )
     return post
 
 
-@app.tool(
-    name="list_unread_messages",
-    description="List unread notification messages for the authenticated user.",
-    structured_output=True,
-)
-async def list_unread_messages(
-    page: Annotated[
-        int,
-        PydanticField(
-            default=0,
-            ge=0,
-            description="Page number of unread notifications to retrieve.",
-        ),
-    ] = 0,
-    size: Annotated[
-        int,
-        PydanticField(
-            default=30,
-            ge=1,
-            le=100,
-            description="Number of unread notifications to include per page.",
-        ),
-    ] = 30,
-    token: Annotated[
-        str | None,
-        PydanticField(
-            default=None,
-            description=(
-                "Optional JWT bearer token. When omitted the configured access token is used."
-            ),
-        ),
-    ] = None,
-    ctx: Context | None = None,
-) -> UnreadNotificationsResponse:
-    """Retrieve unread notifications and return structured data."""
-    sanitized_token = token.strip() if isinstance(token, str) else None
-
-    try:
-        logger.info(
-            "Fetching unread notifications (page=%s, size=%s)",
-            page,
-            size,
-        )
-        raw_notifications = await search_client.list_unread_notifications(
-            page=page,
-            size=size,
-            token=sanitized_token,
-        )
-    except httpx.HTTPStatusError as exc:  # pragma: no cover - network errors
-        message = (
-            "OpenIsle backend returned HTTP "
-            f"{exc.response.status_code} while fetching unread notifications."
-        )
-        if ctx is not None:
-            await ctx.error(message)
-        raise ValueError(message) from exc
-    except httpx.RequestError as exc:  # pragma: no cover - network errors
-        message = f"Unable to reach OpenIsle backend notification service: {exc}."
-        if ctx is not None:
-            await ctx.error(message)
-        raise ValueError(message) from exc
-
-    try:
-        notifications = [
-            NotificationData.model_validate(entry) for entry in raw_notifications
-        ]
-    except ValidationError as exc:
-        message = "Received malformed data from the unread notifications endpoint."
-        if ctx is not None:
-            await ctx.error(message)
-        raise ValueError(message) from exc
-
-    total = len(notifications)
-    if ctx is not None:
-        await ctx.info(
-            f"Retrieved {total} unread notifications (page {page}, size {size})."
-        )
-    logger.debug(
-        "Validated %d unread notifications for page=%s size=%s",
-        total,
-        page,
-        size,
-    )
-    return UnreadNotificationsResponse(
-        page=page,
-        size=size,
-        total=total,
-        notifications=notifications,
-    )
-
-
 def main() -> None:
     """Run the MCP server using the configured transport."""