Merge pull request #1107 from nagisa77/codex/add-posting-tool-support-for-mcp-service

Add MCP tool for creating posts
This commit is contained in:
Tim
2025-10-28 15:06:09 +08:00
committed by GitHub
4 changed files with 375 additions and 3 deletions

View File

@@ -31,6 +31,7 @@ By default the server listens on port `8085` and serves MCP over Streamable HTTP
| Tool | Description |
| --- | --- |
| `search` | Perform a global search against the OpenIsle backend. |
| `create_post` | Publish a new post using a JWT token. |
| `reply_to_post` | Create a new comment on a post using a JWT token. |
| `reply_to_comment` | Reply to an existing comment using a JWT token. |
| `recent_posts` | Retrieve posts created within the last *N* minutes. |

View File

@@ -192,6 +192,12 @@ class CommentCreateResult(BaseModel):
comment: CommentData = Field(description="Comment returned by the backend.")
class PostCreateResult(BaseModel):
"""Structured response returned when creating a new post."""
post: PostDetail = Field(description="Detailed post payload returned by the backend.")
class PostSummary(BaseModel):
"""Summary information for a post."""

View File

@@ -173,6 +173,33 @@ class SearchClient:
logger.info("Reply to post_id=%s succeeded with id=%s", post_id, body.get("id"))
return body
async def create_post(
self,
payload: dict[str, Any],
*,
token: str | None = None,
) -> dict[str, Any]:
"""Create a new post and return the detailed backend payload."""
client = self._get_client()
resolved_token = self._require_token(token)
headers = self._build_headers(token=resolved_token, include_json=True)
logger.debug(
"Creating post with category_id=%s and %d tag(s)",
payload.get("categoryId"),
len(payload.get("tagIds", []) if isinstance(payload.get("tagIds"), list) else []),
)
response = await client.post(
"/api/posts",
json=payload,
headers=headers,
)
response.raise_for_status()
body = self._ensure_dict(response.json())
logger.info("Post creation succeeded with id=%s", body.get("id"))
return body
async def recent_posts(self, minutes: int) -> list[dict[str, Any]]:
"""Return posts created within the given timeframe."""

View File

@@ -20,6 +20,7 @@ from .schemas import (
NotificationCleanupResult,
UnreadNotificationsResponse,
PostDetail,
PostCreateResult,
PostSummary,
RecentPostsResponse,
SearchResponse,
@@ -66,9 +67,9 @@ async def lifespan(_: FastMCP):
app = FastMCP(
name="openisle-mcp",
instructions=(
"Use this server to search OpenIsle content, reply to posts and comments with an "
"authentication token, retrieve details for a specific post, list posts created "
"within a recent time window, and review unread notification messages."
"Use this server to search OpenIsle content, create new posts, reply to posts and "
"comments with an authentication token, retrieve details for a specific post, list "
"posts created within a recent time window, and review unread notification messages."
),
host=settings.host,
port=settings.port,
@@ -341,6 +342,343 @@ async def reply_to_comment(
return CommentReplyResult(comment=comment)
@app.tool(
name="create_post",
description="Publish a new post using an authentication token.",
structured_output=True,
)
async def create_post(
title: Annotated[
str,
PydanticField(description="Title of the post to be created."),
],
content: Annotated[
str,
PydanticField(description="Markdown content of the post."),
],
category_id: Annotated[
int | None,
PydanticField(
default=None,
ge=1,
description="Optional category identifier for the post.",
),
] = None,
tag_ids: Annotated[
list[int] | None,
PydanticField(
default=None,
min_length=1,
description="Optional list of tag identifiers to assign to the post.",
),
] = None,
post_type: Annotated[
str | None,
PydanticField(
default=None,
description="Optional post type value (e.g. LOTTERY, POLL).",
),
] = None,
visible_scope: Annotated[
str | None,
PydanticField(
default=None,
description="Optional visibility scope for the post.",
),
] = None,
prize_description: Annotated[
str | None,
PydanticField(
default=None,
description="Description of the prize for lottery posts.",
),
] = None,
prize_icon: Annotated[
str | None,
PydanticField(
default=None,
description="Icon URL for the lottery prize.",
),
] = None,
prize_count: Annotated[
int | None,
PydanticField(
default=None,
ge=1,
description="Total number of prizes available for lottery posts.",
),
] = None,
point_cost: Annotated[
int | None,
PydanticField(
default=None,
ge=0,
description="Point cost required to participate in the post, when applicable.",
),
] = None,
start_time: Annotated[
str | None,
PydanticField(
default=None,
description="ISO 8601 start time for lottery or poll posts.",
),
] = None,
end_time: Annotated[
str | None,
PydanticField(
default=None,
description="ISO 8601 end time for lottery or poll posts.",
),
] = None,
options: Annotated[
list[str] | None,
PydanticField(
default=None,
min_length=1,
description="Poll options when creating a poll post.",
),
] = None,
multiple: Annotated[
bool | None,
PydanticField(
default=None,
description="Whether the poll allows selecting multiple options.",
),
] = None,
proposed_name: Annotated[
str | None,
PydanticField(
default=None,
description="Proposed category name for suggestion posts.",
),
] = None,
proposal_description: Annotated[
str | None,
PydanticField(
default=None,
description="Supporting description for the proposed category.",
),
] = None,
captcha: Annotated[
str | None,
PydanticField(
default=None,
description="Captcha solution if the backend requires one to create posts.",
),
] = None,
token: Annotated[
str | None,
PydanticField(
default=None,
description=(
"Optional JWT bearer token. When omitted the configured access token is used."
),
),
] = None,
ctx: Context | None = None,
) -> PostCreateResult:
"""Create a new post in OpenIsle and return the detailed backend payload."""
sanitized_title = title.strip()
if not sanitized_title:
raise ValueError("Post title must not be empty.")
sanitized_content = content.strip()
if not sanitized_content:
raise ValueError("Post content must not be empty.")
sanitized_token = token.strip() if isinstance(token, str) else None
if sanitized_token == "":
sanitized_token = None
sanitized_category_id: int | None = None
if category_id is not None:
if isinstance(category_id, bool):
raise ValueError("Category identifier must be an integer, not a boolean.")
try:
sanitized_category_id = int(category_id)
except (TypeError, ValueError) as exc:
raise ValueError("Category identifier must be an integer.") from exc
if sanitized_category_id <= 0:
raise ValueError("Category identifier must be a positive integer.")
sanitized_tag_ids: list[int] | None = None
if tag_ids is not None:
sanitized_tag_ids = []
for value in tag_ids:
if isinstance(value, bool):
raise ValueError("Tag identifiers must be integers, not booleans.")
try:
converted = int(value)
except (TypeError, ValueError) as exc:
raise ValueError("Tag identifiers must be integers.") from exc
if converted <= 0:
raise ValueError("Tag identifiers must be positive integers.")
sanitized_tag_ids.append(converted)
if not sanitized_tag_ids:
sanitized_tag_ids = None
sanitized_post_type = post_type.strip() if isinstance(post_type, str) else None
if sanitized_post_type == "":
sanitized_post_type = None
sanitized_visible_scope = (
visible_scope.strip() if isinstance(visible_scope, str) else None
)
if sanitized_visible_scope == "":
sanitized_visible_scope = None
sanitized_prize_description = (
prize_description.strip() if isinstance(prize_description, str) else None
)
if sanitized_prize_description == "":
sanitized_prize_description = None
sanitized_prize_icon = prize_icon.strip() if isinstance(prize_icon, str) else None
if sanitized_prize_icon == "":
sanitized_prize_icon = None
sanitized_prize_count: int | None = None
if prize_count is not None:
if isinstance(prize_count, bool):
raise ValueError("Prize count must be an integer, not a boolean.")
try:
sanitized_prize_count = int(prize_count)
except (TypeError, ValueError) as exc:
raise ValueError("Prize count must be an integer.") from exc
if sanitized_prize_count <= 0:
raise ValueError("Prize count must be a positive integer.")
sanitized_point_cost: int | None = None
if point_cost is not None:
if isinstance(point_cost, bool):
raise ValueError("Point cost must be an integer, not a boolean.")
try:
sanitized_point_cost = int(point_cost)
except (TypeError, ValueError) as exc:
raise ValueError("Point cost must be an integer.") from exc
if sanitized_point_cost < 0:
raise ValueError("Point cost cannot be negative.")
sanitized_start_time = start_time.strip() if isinstance(start_time, str) else None
if sanitized_start_time == "":
sanitized_start_time = None
sanitized_end_time = end_time.strip() if isinstance(end_time, str) else None
if sanitized_end_time == "":
sanitized_end_time = None
sanitized_options: list[str] | None = None
if options is not None:
sanitized_options = []
for option in options:
if option is None:
continue
stripped_option = option.strip()
if stripped_option:
sanitized_options.append(stripped_option)
if not sanitized_options:
sanitized_options = None
sanitized_multiple = bool(multiple) if isinstance(multiple, bool) else None
sanitized_proposed_name = (
proposed_name.strip() if isinstance(proposed_name, str) else None
)
if sanitized_proposed_name == "":
sanitized_proposed_name = None
sanitized_proposal_description = (
proposal_description.strip() if isinstance(proposal_description, str) else None
)
if sanitized_proposal_description == "":
sanitized_proposal_description = None
sanitized_captcha = captcha.strip() if isinstance(captcha, str) else None
if sanitized_captcha == "":
sanitized_captcha = None
payload: dict[str, object] = {
"title": sanitized_title,
"content": sanitized_content,
}
if sanitized_category_id is not None:
payload["categoryId"] = sanitized_category_id
if sanitized_tag_ids is not None:
payload["tagIds"] = sanitized_tag_ids
if sanitized_post_type is not None:
payload["type"] = sanitized_post_type
if sanitized_visible_scope is not None:
payload["postVisibleScopeType"] = sanitized_visible_scope
if sanitized_prize_description is not None:
payload["prizeDescription"] = sanitized_prize_description
if sanitized_prize_icon is not None:
payload["prizeIcon"] = sanitized_prize_icon
if sanitized_prize_count is not None:
payload["prizeCount"] = sanitized_prize_count
if sanitized_point_cost is not None:
payload["pointCost"] = sanitized_point_cost
if sanitized_start_time is not None:
payload["startTime"] = sanitized_start_time
if sanitized_end_time is not None:
payload["endTime"] = sanitized_end_time
if sanitized_options is not None:
payload["options"] = sanitized_options
if sanitized_multiple is not None:
payload["multiple"] = sanitized_multiple
if sanitized_proposed_name is not None:
payload["proposedName"] = sanitized_proposed_name
if sanitized_proposal_description is not None:
payload["proposalDescription"] = sanitized_proposal_description
if sanitized_captcha is not None:
payload["captcha"] = sanitized_captcha
try:
logger.info("Creating post with title='%s'", sanitized_title)
raw_post = await search_client.create_post(payload, token=sanitized_token)
except httpx.HTTPStatusError as exc: # pragma: no cover - network errors
status_code = exc.response.status_code
if status_code == 400:
message = (
"Post creation failed due to invalid input or captcha verification errors."
)
elif status_code == 401:
message = "Authentication failed while creating the post. Please verify the token."
elif status_code == 403:
message = "The provided token is not authorized to create posts."
else:
message = (
"OpenIsle backend returned HTTP "
f"{status_code} while creating the post."
)
if ctx is not None:
await ctx.error(message)
raise ValueError(message) from exc
except httpx.RequestError as exc: # pragma: no cover - network errors
message = f"Unable to reach OpenIsle backend post service: {exc}."
if ctx is not None:
await ctx.error(message)
raise ValueError(message) from exc
try:
post = PostDetail.model_validate(raw_post)
except ValidationError as exc:
message = "Received malformed data from the post creation endpoint."
if ctx is not None:
await ctx.error(message)
raise ValueError(message) from exc
if ctx is not None:
await ctx.info(f"Post '{post.title}' created successfully.")
logger.debug(
"Validated created post payload with id=%s and title='%s'",
post.id,
post.title,
)
return PostCreateResult(post=post)
@app.tool(
name="recent_posts",
description="Retrieve posts created in the last N minutes.",