mirror of
https://github.com/nagisa77/OpenIsle.git
synced 2026-05-08 03:37:28 +08:00
feat: add mcp
This commit is contained in:
6
mcp/src/openisle_mcp/__init__.py
Normal file
6
mcp/src/openisle_mcp/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
||||
"""OpenIsle MCP server package."""
|
||||
|
||||
from .config import Settings, get_settings
|
||||
|
||||
__all__ = ["Settings", "get_settings"]
|
||||
|
||||
52
mcp/src/openisle_mcp/config.py
Normal file
52
mcp/src/openisle_mcp/config.py
Normal file
@@ -0,0 +1,52 @@
|
||||
"""Application configuration helpers for the OpenIsle MCP server."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from functools import lru_cache
|
||||
from typing import Literal
|
||||
|
||||
from pydantic import Field
|
||||
from pydantic.networks import AnyHttpUrl
|
||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||
|
||||
|
||||
class Settings(BaseSettings):
|
||||
"""Configuration for the MCP server."""
|
||||
|
||||
backend_base_url: AnyHttpUrl = Field(
|
||||
"http://springboot:8080",
|
||||
description="Base URL for the OpenIsle backend service.",
|
||||
)
|
||||
host: str = Field(
|
||||
"0.0.0.0",
|
||||
description="Host interface to bind when running with HTTP transports.",
|
||||
)
|
||||
port: int = Field(
|
||||
8085,
|
||||
ge=1,
|
||||
le=65535,
|
||||
description="TCP port for HTTP transports.",
|
||||
)
|
||||
transport: Literal["stdio", "sse", "streamable-http"] = Field(
|
||||
"streamable-http",
|
||||
description="MCP transport to use when running the server.",
|
||||
)
|
||||
request_timeout: float = Field(
|
||||
10.0,
|
||||
gt=0,
|
||||
description="Timeout (seconds) for backend search requests.",
|
||||
)
|
||||
|
||||
model_config = SettingsConfigDict(
|
||||
env_prefix="OPENISLE_MCP_",
|
||||
env_file=".env",
|
||||
env_file_encoding="utf-8",
|
||||
case_sensitive=False,
|
||||
)
|
||||
|
||||
|
||||
@lru_cache(maxsize=1)
|
||||
def get_settings() -> Settings:
|
||||
"""Return cached application settings."""
|
||||
|
||||
return Settings()
|
||||
55
mcp/src/openisle_mcp/schemas.py
Normal file
55
mcp/src/openisle_mcp/schemas.py
Normal file
@@ -0,0 +1,55 @@
|
||||
"""Pydantic models describing tool inputs and outputs."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel, Field, ConfigDict
|
||||
|
||||
|
||||
class SearchResultItem(BaseModel):
|
||||
"""A single search result entry."""
|
||||
|
||||
type: str = Field(description="Entity type for the result (post, user, tag, etc.).")
|
||||
id: Optional[int] = Field(default=None, description="Identifier of the matched entity.")
|
||||
text: Optional[str] = Field(default=None, description="Primary text associated with the result.")
|
||||
sub_text: Optional[str] = Field(
|
||||
default=None,
|
||||
alias="subText",
|
||||
description="Secondary text, e.g. a username or excerpt.",
|
||||
)
|
||||
extra: Optional[str] = Field(default=None, description="Additional contextual information.")
|
||||
post_id: Optional[int] = Field(
|
||||
default=None,
|
||||
alias="postId",
|
||||
description="Associated post identifier when relevant.",
|
||||
)
|
||||
highlighted_text: Optional[str] = Field(
|
||||
default=None,
|
||||
alias="highlightedText",
|
||||
description="Highlighted snippet of the primary text if available.",
|
||||
)
|
||||
highlighted_sub_text: Optional[str] = Field(
|
||||
default=None,
|
||||
alias="highlightedSubText",
|
||||
description="Highlighted snippet of the secondary text if available.",
|
||||
)
|
||||
highlighted_extra: Optional[str] = Field(
|
||||
default=None,
|
||||
alias="highlightedExtra",
|
||||
description="Highlighted snippet of extra information if available.",
|
||||
)
|
||||
|
||||
model_config = ConfigDict(populate_by_name=True)
|
||||
|
||||
|
||||
class SearchResponse(BaseModel):
|
||||
"""Structured response returned by the search tool."""
|
||||
|
||||
keyword: str = Field(description="The keyword that was searched.")
|
||||
total: int = Field(description="Total number of matches returned by the backend.")
|
||||
results: list[SearchResultItem] = Field(
|
||||
default_factory=list,
|
||||
description="Ordered collection of search results.",
|
||||
)
|
||||
|
||||
51
mcp/src/openisle_mcp/search_client.py
Normal file
51
mcp/src/openisle_mcp/search_client.py
Normal file
@@ -0,0 +1,51 @@
|
||||
"""HTTP client helpers for talking to the OpenIsle backend search endpoints."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
|
||||
|
||||
class SearchClient:
|
||||
"""Client for calling the OpenIsle search API."""
|
||||
|
||||
def __init__(self, base_url: str, *, timeout: float = 10.0) -> None:
|
||||
self._base_url = base_url.rstrip("/")
|
||||
self._timeout = timeout
|
||||
self._client: httpx.AsyncClient | None = None
|
||||
|
||||
def _get_client(self) -> httpx.AsyncClient:
|
||||
if self._client is None:
|
||||
self._client = httpx.AsyncClient(base_url=self._base_url, timeout=self._timeout)
|
||||
return self._client
|
||||
|
||||
async def global_search(self, keyword: str) -> list[dict[str, Any]]:
|
||||
"""Call the global search endpoint and return the parsed JSON payload."""
|
||||
|
||||
client = self._get_client()
|
||||
response = await client.get(
|
||||
"/api/search/global",
|
||||
params={"keyword": keyword},
|
||||
headers={"Accept": "application/json"},
|
||||
)
|
||||
response.raise_for_status()
|
||||
payload = response.json()
|
||||
if not isinstance(payload, list):
|
||||
formatted = json.dumps(payload, ensure_ascii=False)[:200]
|
||||
raise ValueError(f"Unexpected response format from search endpoint: {formatted}")
|
||||
return [self._validate_entry(entry) for entry in payload]
|
||||
|
||||
async def aclose(self) -> None:
|
||||
"""Dispose of the underlying HTTP client."""
|
||||
|
||||
if self._client is not None:
|
||||
await self._client.aclose()
|
||||
self._client = None
|
||||
|
||||
@staticmethod
|
||||
def _validate_entry(entry: Any) -> dict[str, Any]:
|
||||
if not isinstance(entry, dict):
|
||||
raise ValueError(f"Search entry must be an object, got: {type(entry)!r}")
|
||||
return entry
|
||||
98
mcp/src/openisle_mcp/server.py
Normal file
98
mcp/src/openisle_mcp/server.py
Normal file
@@ -0,0 +1,98 @@
|
||||
"""Entry point for running the OpenIsle MCP server."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Annotated
|
||||
|
||||
import httpx
|
||||
from mcp.server.fastmcp import Context, FastMCP
|
||||
from pydantic import ValidationError
|
||||
from pydantic import Field as PydanticField
|
||||
|
||||
from .config import get_settings
|
||||
from .schemas import SearchResponse, SearchResultItem
|
||||
from .search_client import SearchClient
|
||||
|
||||
settings = get_settings()
|
||||
search_client = SearchClient(
|
||||
str(settings.backend_base_url), timeout=settings.request_timeout
|
||||
)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(_: FastMCP):
|
||||
"""Lifecycle hook that disposes shared resources when the server stops."""
|
||||
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
await search_client.aclose()
|
||||
|
||||
|
||||
app = FastMCP(
|
||||
name="openisle-mcp",
|
||||
instructions=(
|
||||
"Use this server to search OpenIsle posts, users, tags, categories, and comments "
|
||||
"via the global search endpoint."
|
||||
),
|
||||
host=settings.host,
|
||||
port=settings.port,
|
||||
lifespan=lifespan,
|
||||
)
|
||||
|
||||
|
||||
@app.tool(
|
||||
name="search",
|
||||
description="Perform a global search across OpenIsle resources.",
|
||||
structured_output=True,
|
||||
)
|
||||
async def search(
|
||||
keyword: Annotated[str, PydanticField(description="Keyword to search for.")],
|
||||
ctx: Context | None = None,
|
||||
) -> SearchResponse:
|
||||
"""Call the OpenIsle global search endpoint and return structured results."""
|
||||
|
||||
sanitized = keyword.strip()
|
||||
if not sanitized:
|
||||
raise ValueError("Keyword must not be empty.")
|
||||
|
||||
try:
|
||||
raw_results = await search_client.global_search(sanitized)
|
||||
except httpx.HTTPStatusError as exc: # pragma: no cover - network errors
|
||||
message = (
|
||||
"OpenIsle backend returned HTTP "
|
||||
f"{exc.response.status_code} while searching for '{sanitized}'."
|
||||
)
|
||||
if ctx is not None:
|
||||
await ctx.error(message)
|
||||
raise ValueError(message) from exc
|
||||
except httpx.RequestError as exc: # pragma: no cover - network errors
|
||||
message = f"Unable to reach OpenIsle backend search service: {exc}."
|
||||
if ctx is not None:
|
||||
await ctx.error(message)
|
||||
raise ValueError(message) from exc
|
||||
|
||||
try:
|
||||
results = [SearchResultItem.model_validate(entry) for entry in raw_results]
|
||||
except ValidationError as exc:
|
||||
message = "Received malformed data from the OpenIsle backend search endpoint."
|
||||
if ctx is not None:
|
||||
await ctx.error(message)
|
||||
raise ValueError(message) from exc
|
||||
|
||||
if ctx is not None:
|
||||
await ctx.info(f"Search keyword '{sanitized}' returned {len(results)} results.")
|
||||
|
||||
return SearchResponse(keyword=sanitized, total=len(results), results=results)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""Run the MCP server using the configured transport."""
|
||||
|
||||
app.run(transport=settings.transport)
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover - manual execution
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user