feat: add MCP search server

This commit is contained in:
Tim
2025-10-25 21:58:11 +08:00
parent a24bd81942
commit bcd6a3249d
11 changed files with 637 additions and 0 deletions

View File

@@ -0,0 +1,5 @@
"""OpenIsle MCP server package."""
# Re-export the CLI entrypoint so it can be imported directly from the package.
from .server import main
# ``main`` is the only name this package declares as public.
__all__ = ["main"]

View File

@@ -0,0 +1,218 @@
"""HTTP client wrappers for interacting with the OpenIsle backend."""
from __future__ import annotations
import html
import re
from typing import Any, Iterable
import httpx
from .models import NormalizedSearchResult, SearchResponse, SearchScope
from .settings import Settings
# Matches any tag-like span: "<", one or more characters other than ">", then ">".
_TAG_RE = re.compile(r"<[^>]+>")
# Matches a run of one or more whitespace characters (used to collapse them).
_WHITESPACE_RE = re.compile(r"\s+")
class SearchClient:
    """High level client around the OpenIsle search API.

    Owns a single ``httpx.AsyncClient`` configured from ``Settings`` and turns
    the raw JSON objects returned by the search endpoints into
    ``NormalizedSearchResult`` instances. Call :meth:`aclose` when finished so
    the underlying HTTP client is released.
    """

    # Backend path that serves each supported search scope.
    _ENDPOINTS: dict[SearchScope, str] = {
        SearchScope.GLOBAL: "/api/search/global",
        SearchScope.POSTS: "/api/search/posts",
        SearchScope.POSTS_CONTENT: "/api/search/posts/content",
        SearchScope.POSTS_TITLE: "/api/search/posts/title",
        SearchScope.USERS: "/api/search/users",
    }

    def __init__(self, settings: Settings) -> None:
        """Create the client and its underlying ``httpx.AsyncClient``.

        Args:
            settings: Provides the base URL, request timeout, default result
                limit and maximum snippet length.
        """
        self._base_url = settings.sanitized_base_url()
        self._timeout = settings.request_timeout
        self._default_limit = settings.default_limit
        self._snippet_length = settings.snippet_length
        self._client = httpx.AsyncClient(
            base_url=self._base_url,
            timeout=self._timeout,
            headers={"Accept": "application/json"},
        )

    async def aclose(self) -> None:
        """Close the underlying HTTP client."""
        await self._client.aclose()

    def endpoint_path(self, scope: SearchScope) -> str:
        """Return the backend path (without host) that serves *scope*."""
        return self._ENDPOINTS[scope]

    def endpoint_url(self, scope: SearchScope) -> str:
        """Return the absolute URL (base URL + path) that serves *scope*."""
        return f"{self._base_url}{self.endpoint_path(scope)}"

    async def search(
        self,
        keyword: str,
        scope: SearchScope,
        *,
        limit: int | None = None,
    ) -> SearchResponse:
        """Execute a search request and normalise the results.

        Args:
            keyword: Search term; surrounding whitespace is stripped. A blank
                keyword short-circuits to an empty response without issuing a
                request.
            scope: Which search endpoint to query.
            limit: Maximum number of hits to return. ``None`` falls back to the
                configured default; values ``<= 0`` mean "no limit".

        Returns:
            A ``SearchResponse`` holding both the normalised and the raw hits.

        Raises:
            httpx.HTTPError: If the request fails or the backend answers with
                an error status.
            ValueError: If the response body is not a JSON array.
        """
        keyword = keyword.strip()
        effective_limit = self._resolve_limit(limit)
        if not keyword:
            return SearchResponse(
                keyword=keyword,
                scope=scope,
                endpoint=self.endpoint_url(scope),
                limit=effective_limit,
                total_results=0,
                returned_results=0,
                normalized=[],
                raw=[],
            )
        # Only the keyword is sent to the backend; the limit is applied
        # locally by slicing the returned list below.
        response = await self._client.get(
            self.endpoint_path(scope),
            params={"keyword": keyword},
        )
        response.raise_for_status()
        payload = response.json()
        if not isinstance(payload, list):  # pragma: no cover - defensive programming
            raise ValueError("Search API did not return a JSON array")
        total_results = len(payload)
        items = payload if effective_limit is None else payload[:effective_limit]
        normalized = [self._normalise_item(scope, item) for item in items]
        return SearchResponse(
            keyword=keyword,
            scope=scope,
            endpoint=self.endpoint_url(scope),
            limit=effective_limit,
            total_results=total_results,
            returned_results=len(items),
            normalized=normalized,
            raw=items,
        )

    def _resolve_limit(self, requested: int | None) -> int | None:
        """Pick the effective limit; ``None`` means "return everything".

        The explicitly requested value wins over the configured default, and
        any non-positive value is interpreted as "no limit".
        """
        value = requested if requested is not None else self._default_limit
        if value is None:
            return None
        if value <= 0:
            return None
        return value

    def _normalise_item(
        self,
        scope: SearchScope,
        item: Any,
    ) -> NormalizedSearchResult:
        """Normalise raw API objects into a consistent structure.

        Non-dict items are wrapped untouched under ``metadata["raw"]``; dict
        items are dispatched to the scope-specific normaliser.
        """
        if not isinstance(item, dict):  # pragma: no cover - defensive programming
            return NormalizedSearchResult(type=scope.value, metadata={"raw": item})
        if scope == SearchScope.GLOBAL:
            return self._normalise_global(item)
        if scope in {SearchScope.POSTS, SearchScope.POSTS_CONTENT, SearchScope.POSTS_TITLE}:
            return self._normalise_post(item)
        if scope == SearchScope.USERS:
            return self._normalise_user(item)
        # Fallback for a scope without a dedicated normaliser: keep the item as metadata.
        return NormalizedSearchResult(type=scope.value, metadata=item)

    def _normalise_global(self, item: dict[str, Any]) -> NormalizedSearchResult:
        """Normalise a hit from the global search endpoint.

        Highlighted variants of the title, subtitle and snippet are preferred
        over their plain counterparts when the backend supplies them.
        """
        highlights = {
            "title": item.get("highlightedText"),
            "subtitle": item.get("highlightedSubText"),
            "snippet": item.get("highlightedExtra"),
        }
        snippet_source = highlights.get("snippet") or item.get("extra")
        metadata = {
            "postId": item.get("postId"),
            "highlights": {k: v for k, v in highlights.items() if v},
        }
        return NormalizedSearchResult(
            type=str(item.get("type", "result")),
            id=_safe_int(item.get("id")),
            title=highlights.get("title") or _safe_str(item.get("text")),
            subtitle=highlights.get("subtitle") or _safe_str(item.get("subText")),
            snippet=self._snippet(snippet_source),
            # Drop empty entries so the metadata stays compact.
            metadata={k: v for k, v in metadata.items() if v not in (None, {}, [])},
        )

    def _normalise_post(self, item: dict[str, Any]) -> NormalizedSearchResult:
        """Normalise a hit from one of the post search endpoints."""
        author = _safe_dict(item.get("author"))
        category = _safe_dict(item.get("category"))
        tags = [tag.get("name") for tag in _safe_iter(item.get("tags")) if isinstance(tag, dict)]
        metadata = {
            "author": author.get("username"),
            "category": category.get("name"),
            "tags": tags,
            "views": item.get("views"),
            "commentCount": item.get("commentCount"),
            "status": item.get("status"),
            "apiUrl": f"{self._base_url}/api/posts/{item.get('id')}" if item.get("id") else None,
        }
        return NormalizedSearchResult(
            type="post",
            id=_safe_int(item.get("id")),
            title=_safe_str(item.get("title")),
            subtitle=_safe_str(category.get("name")),
            snippet=self._snippet(item.get("content")),
            # Drop empty entries so the metadata stays compact.
            metadata={k: v for k, v in metadata.items() if v not in (None, [], {})},
        )

    def _normalise_user(self, item: dict[str, Any]) -> NormalizedSearchResult:
        """Normalise a hit from the user search endpoint."""
        metadata = {
            "followers": item.get("followers"),
            "following": item.get("following"),
            "totalViews": item.get("totalViews"),
            "role": item.get("role"),
            "subscribed": item.get("subscribed"),
            "apiUrl": f"{self._base_url}/api/users/{item.get('id')}" if item.get("id") else None,
        }
        return NormalizedSearchResult(
            type="user",
            id=_safe_int(item.get("id")),
            title=_safe_str(item.get("username")),
            subtitle=_safe_str(item.get("email") or item.get("role")),
            snippet=self._snippet(item.get("introduction")),
            # Drop empty entries so the metadata stays compact.
            metadata={k: v for k, v in metadata.items() if v not in (None, [], {})},
        )

    def _snippet(self, value: Any) -> str | None:
        """Reduce *value* to plain text of at most ``snippet_length`` characters.

        HTML entities are unescaped, tag-like spans are replaced by spaces and
        whitespace runs are collapsed. Text longer than the limit is cut and
        terminated with a single ellipsis character, so the result never
        exceeds the limit. Returns ``None`` when nothing printable remains.
        """
        text = _safe_str(value)
        if not text:
            return None
        text = html.unescape(text)
        text = _TAG_RE.sub(" ", text)
        text = _WHITESPACE_RE.sub(" ", text).strip()
        if not text:
            return None
        if len(text) <= self._snippet_length:
            return text
        # One character of the budget is reserved for the ellipsis that marks
        # the truncation; written as an escape so it survives re-encoding.
        return text[: self._snippet_length - 1].rstrip() + "\u2026"
def _safe_int(value: Any) -> int | None:
try:
return int(value)
except (TypeError, ValueError): # pragma: no cover - defensive
return None
def _safe_str(value: Any) -> str | None:
if value is None:
return None
text = str(value).strip()
return text or None
def _safe_dict(value: Any) -> dict[str, Any]:
return value if isinstance(value, dict) else {}
def _safe_iter(value: Any) -> Iterable[Any]:
if isinstance(value, list | tuple | set):
return value
return []

View File

@@ -0,0 +1,71 @@
"""Shared models for the OpenIsle MCP server."""
from __future__ import annotations
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field
class SearchScope(str, Enum):
    """Supported search endpoints."""

    # Members also subclass ``str``, so each one compares equal to its value.
    GLOBAL = "global"
    POSTS = "posts"
    POSTS_CONTENT = "posts_content"
    POSTS_TITLE = "posts_title"
    USERS = "users"

    def __str__(self) -> str:  # pragma: no cover - convenience for logging
        """Render the member as its bare value instead of ``SearchScope.NAME``."""
        return str(self.value)
class NormalizedSearchResult(BaseModel):
    """Compact structure returned by the MCP search tool."""

    # Keys that do not correspond to a declared field are ignored on input.
    model_config = {"extra": "ignore"}

    # ``type`` is the only required field; everything else is optional.
    type: str = Field(description="Entity type, e.g. user, post, comment.")
    id: int | None = Field(None, description="Primary identifier of the entity.")
    title: str | None = Field(None, description="Display title for the result.")
    subtitle: str | None = Field(None, description="Secondary line of context.")
    snippet: str | None = Field(None, description="Short summary of the result.")
    # A fresh dict is created per instance via the factory.
    metadata: dict[str, Any] = Field(
        description="Additional attributes extracted from the API response.",
        default_factory=dict,
    )
class SearchResponse(BaseModel):
    """Payload returned to MCP clients."""

    # Keys that do not correspond to a declared field are ignored on input.
    model_config = {"extra": "ignore"}

    # Echo of the request: what was searched, where, and against which URL.
    keyword: str
    scope: SearchScope
    endpoint: str

    limit: int | None = Field(
        None,
        description="Result limit applied to the request. None means unlimited.",
    )

    # Counts before and after the limit is applied.
    total_results: int = Field(
        0,
        description="Total number of items returned by the OpenIsle API before limiting.",
    )
    returned_results: int = Field(
        0,
        description="Number of items returned to the MCP client after limiting.",
    )

    # The hits themselves, in normalised and untouched form.
    normalized: list[NormalizedSearchResult] = Field(
        description="Normalised representation of each search hit.",
        default_factory=list,
    )
    raw: list[Any] = Field(
        description="Raw response objects from the OpenIsle REST API.",
        default_factory=list,
    )

View File

@@ -0,0 +1,95 @@
"""Entrypoint for the OpenIsle MCP server."""
from __future__ import annotations
import os
from contextlib import asynccontextmanager
from typing import Any
import httpx
from fastmcp import Context, FastMCP
from .client import SearchClient
from .models import SearchResponse, SearchScope
from .settings import Settings
__all__ = ["main"]
def _create_lifespan(settings: Settings):
    """Build a lifespan context manager for the server, bound to *settings*.

    On entry the returned hook creates a ``SearchClient``, stores it on the app
    as ``_search_client`` and yields it as lifespan state under ``"client"``.
    On exit it closes the client and removes the attribute again.
    """

    @asynccontextmanager
    async def lifespan(app: FastMCP):
        search_client = SearchClient(settings)
        # Stored on the app so tool handlers can look the client up later.
        app._search_client = search_client
        try:
            yield {"client": search_client}
        finally:
            await search_client.aclose()
            if hasattr(app, "_search_client"):
                del app._search_client

    return lifespan
# Settings are read from the environment once, at import time.
_settings = Settings.from_env()

# The server instance; tools below register themselves on it via ``@mcp.tool``.
mcp = FastMCP(
    name="OpenIsle Search",
    version="0.1.0",
    instructions=(
        "Provides access to OpenIsle search endpoints "
        "for retrieving users, posts, comments, tags, and categories."
    ),
    lifespan=_create_lifespan(_settings),
)
@mcp.tool("search")
async def search(
    keyword: str,
    scope: SearchScope = SearchScope.GLOBAL,
    limit: int | None = None,
    ctx: Context | None = None,
) -> dict[str, Any]:
    """Perform a search against the OpenIsle backend."""
    # The client is created by the lifespan hook and looked up per call.
    search_client = _resolve_client(ctx)
    try:
        result: SearchResponse = await search_client.search(
            keyword=keyword,
            scope=scope,
            limit=limit,
        )
    except httpx.HTTPError as exc:
        # Surface transport/HTTP failures as a RuntimeError with a readable
        # message, keeping the original exception as the cause.
        raise RuntimeError(f"OpenIsle search request failed: {exc}".rstrip()) from exc
    dumped = result.model_dump()
    # Append which scope and URL served the request.
    return {
        **dumped,
        "transport": {
            "scope": scope.value,
            "endpoint": search_client.endpoint_url(scope),
        },
    }
def _resolve_client(ctx: Context | None) -> SearchClient:
    """Return the ``SearchClient`` attached to the running server.

    The app is taken from *ctx* when one is supplied, otherwise the
    module-level ``mcp`` instance is used.

    Raises:
        RuntimeError: If no client is attached to the app.
    """
    if ctx is None:
        app = mcp
    else:
        app = ctx.fastmcp
    found = getattr(app, "_search_client", None)
    if found is not None:
        return found
    raise RuntimeError("Search client is not initialised; lifespan hook not executed")
def main() -> None:
    """CLI entrypoint.

    Reads the run configuration from the environment and starts the server:

    * ``OPENISLE_MCP_TRANSPORT`` - transport name (default ``stdio``).
    * ``OPENISLE_MCP_SHOW_BANNER`` - ``1``/``true``/``yes`` enables the banner
      (default ``true``).
    * ``OPENISLE_MCP_HOST`` / ``OPENISLE_MCP_PORT`` - bind address, used only
      for the ``http``, ``sse`` and ``streamable-http`` transports
      (defaults ``127.0.0.1`` and ``8974``).

    Blank transport, host and port values fall back to their defaults, the
    same way blank values are skipped when the API settings are loaded.

    Raises:
        ValueError: If ``OPENISLE_MCP_PORT`` is set but is not an integer.
    """
    transport = (os.getenv("OPENISLE_MCP_TRANSPORT") or "").strip().lower() or "stdio"
    # An explicitly empty banner flag still means "off", as before; only
    # surrounding whitespace is newly tolerated.
    banner_flag = os.getenv("OPENISLE_MCP_SHOW_BANNER", "true").strip().lower()
    show_banner = banner_flag in {"1", "true", "yes"}
    run_kwargs: dict[str, Any] = {"show_banner": show_banner}
    if transport in {"http", "sse", "streamable-http"}:
        host = (os.getenv("OPENISLE_MCP_HOST") or "").strip() or "127.0.0.1"
        raw_port = (os.getenv("OPENISLE_MCP_PORT") or "").strip() or "8974"
        try:
            port = int(raw_port)
        except ValueError as exc:
            # Name the offending variable; a bare int() error would not.
            raise ValueError(
                f"OPENISLE_MCP_PORT must be an integer, got {raw_port!r}"
            ) from exc
        run_kwargs.update({"host": host, "port": port})
    mcp.run(transport=transport, **run_kwargs)


if __name__ == "__main__":  # pragma: no cover - manual execution guard
    main()

View File

@@ -0,0 +1,102 @@
"""Environment configuration for the MCP server."""
from __future__ import annotations
import os
from typing import Any
from pydantic import BaseModel, Field, ValidationError, field_validator
class Settings(BaseModel):
    """Runtime configuration sourced from environment variables."""

    api_base_url: str = Field(
        default="http://springboot:8080",
        description="Base URL of the OpenIsle backend REST API.",
    )
    request_timeout: float = Field(
        default=10.0,
        description="Timeout in seconds for outgoing HTTP requests.",
        ge=0.1,
    )
    # No lower bound is declared here; SearchClient._resolve_limit treats
    # values <= 0 as "no limit".
    default_limit: int = Field(
        default=20,
        description="Default maximum number of results returned by the search tool.",
    )
    snippet_length: int = Field(
        default=160,
        description="Maximum length for the normalised snippet field.",
        ge=40,
    )

    model_config = {
        "extra": "ignore",
        "validate_assignment": True,
    }

    @field_validator("api_base_url", mode="before")
    @classmethod
    def _strip_trailing_slash(cls, value: Any) -> Any:
        """Trim surrounding whitespace and trailing slashes from string URLs."""
        if isinstance(value, str):
            return value.strip().rstrip("/")
        return value

    @field_validator("default_limit", mode="before")
    @classmethod
    def _parse_default_limit(cls, value: Any) -> Any:
        """Parse a non-blank string into ``int``; pass other values through."""
        if isinstance(value, str) and value.strip():
            try:
                return int(value)
            except ValueError as exc:  # pragma: no cover - defensive
                raise ValueError("default_limit must be an integer") from exc
        return value

    @field_validator("snippet_length", mode="before")
    @classmethod
    def _parse_snippet_length(cls, value: Any) -> Any:
        """Parse a non-blank string into ``int``; pass other values through."""
        if isinstance(value, str) and value.strip():
            try:
                return int(value)
            except ValueError as exc:  # pragma: no cover - defensive
                raise ValueError("snippet_length must be an integer") from exc
        return value

    @field_validator("request_timeout", mode="before")
    @classmethod
    def _parse_timeout(cls, value: Any) -> Any:
        """Parse a non-blank string into ``float``; pass other values through."""
        if isinstance(value, str) and value.strip():
            try:
                return float(value)
            except ValueError as exc:  # pragma: no cover - defensive
                raise ValueError("request_timeout must be a number") from exc
        return value

    @classmethod
    def from_env(cls) -> "Settings":
        """Build a settings object using environment variables.

        Reads ``OPENISLE_API_BASE_URL``, ``OPENISLE_API_TIMEOUT``,
        ``OPENISLE_MCP_DEFAULT_LIMIT`` and ``OPENISLE_MCP_SNIPPET_LENGTH``.
        Variables that are unset, empty or whitespace-only are skipped so the
        field default applies.

        Raises:
            ValueError: If any supplied value fails validation; the underlying
                ``ValidationError`` is attached as the cause.
        """
        data: dict[str, Any] = {}
        mapping = {
            "api_base_url": "OPENISLE_API_BASE_URL",
            "request_timeout": "OPENISLE_API_TIMEOUT",
            "default_limit": "OPENISLE_MCP_DEFAULT_LIMIT",
            "snippet_length": "OPENISLE_MCP_SNIPPET_LENGTH",
        }
        for field, env_key in mapping.items():
            value = os.getenv(env_key)
            # A whitespace-only value counts as "not provided": the parsing
            # validators above skip blank strings, so forwarding one would
            # only trade the default for a validation failure or an empty
            # base URL.
            if value is not None and value.strip():
                data[field] = value
        try:
            return cls.model_validate(data)
        except ValidationError as exc:  # pragma: no cover - validation errors surface early
            raise ValueError(
                "Invalid MCP settings derived from environment variables"
            ) from exc

    def sanitized_base_url(self) -> str:
        """Return the API base URL without trailing slashes."""
        return self.api_base_url.rstrip("/")