Add MCP tool for replying to posts

This commit is contained in:
Tim
2025-10-27 20:24:47 +08:00
parent 39875acd35
commit dca14390ca
4 changed files with 136 additions and 4 deletions

View File

@@ -12,6 +12,7 @@ from pydantic import Field as PydanticField
from .config import get_settings
from .schemas import (
CommentCreateResult,
CommentData,
CommentReplyResult,
PostDetail,
@@ -41,9 +42,9 @@ async def lifespan(_: FastMCP):
app = FastMCP(
name="openisle-mcp",
instructions=(
"Use this server to search OpenIsle content, reply to comments with an authentication "
"token, retrieve details for a specific post, and list posts created within a recent time "
"window."
"Use this server to search OpenIsle content, reply to posts and comments with an "
"authentication token, retrieve details for a specific post, and list posts created "
"within a recent time window."
),
host=settings.host,
port=settings.port,
@@ -96,6 +97,100 @@ async def search(
return SearchResponse(keyword=sanitized, total=len(results), results=results)
@app.tool(
name="reply_to_post",
description="Create a comment on a post using an authentication token.",
structured_output=True,
)
async def reply_to_post(
post_id: Annotated[
int,
PydanticField(ge=1, description="Identifier of the post being replied to."),
],
token: Annotated[
str,
PydanticField(description="JWT bearer token for the user performing the reply."),
],
content: Annotated[
str,
PydanticField(description="Markdown content of the reply."),
],
captcha: Annotated[
str | None,
PydanticField(
default=None,
description="Optional captcha solution if the backend requires it.",
),
] = None,
ctx: Context | None = None,
) -> CommentCreateResult:
"""Create a comment on a post and return the backend payload."""
sanitized_content = content.strip()
if not sanitized_content:
raise ValueError("Reply content must not be empty.")
sanitized_token = token.strip()
if not sanitized_token:
raise ValueError("Authentication token must not be empty.")
sanitized_captcha = captcha.strip() if isinstance(captcha, str) else None
try:
raw_comment = await search_client.reply_to_post(
post_id,
sanitized_token,
sanitized_content,
sanitized_captcha,
)
except httpx.HTTPStatusError as exc: # pragma: no cover - network errors
status_code = exc.response.status_code
if status_code == 401:
message = (
"Authentication failed while replying to post "
f"{post_id}. Please verify the token."
)
elif status_code == 403:
message = (
"The provided token is not authorized to reply to post "
f"{post_id}."
)
elif status_code == 404:
message = f"Post {post_id} was not found."
else:
message = (
"OpenIsle backend returned HTTP "
f"{status_code} while replying to post {post_id}."
)
if ctx is not None:
await ctx.error(message)
raise ValueError(message) from exc
except httpx.RequestError as exc: # pragma: no cover - network errors
message = (
"Unable to reach OpenIsle backend comment service: "
f"{exc}."
)
if ctx is not None:
await ctx.error(message)
raise ValueError(message) from exc
try:
comment = CommentData.model_validate(raw_comment)
except ValidationError as exc:
message = "Received malformed data from the post comment endpoint."
if ctx is not None:
await ctx.error(message)
raise ValueError(message) from exc
if ctx is not None:
await ctx.info(
"Reply created successfully for post "
f"{post_id}."
)
return CommentCreateResult(comment=comment)
@app.tool(
name="reply_to_comment",
description="Reply to an existing comment using an authentication token.",