"""Health check for an MCP server: call its 'ping' tool and report the outcome.

Configuration is read from the environment at import time:
    MCP_SERVER_URL    endpoint to connect to
                      (default "http://127.0.0.1:8010/mcp")
    MCP_SERVER_TOKEN  bearer token; when empty, no Authorization header is sent

When run as a script, the outcome message is printed and the process exits
with status 1 if the ping did not succeed.
"""
import asyncio
import os
import sys
from typing import Any, Tuple

from mcp.client.streamable_http import streamablehttp_client
from mcp.client.session import ClientSession

SERVER_URL: str = os.environ.get("MCP_SERVER_URL", "http://127.0.0.1:8010/mcp")
TOKEN: str = os.environ.get("MCP_SERVER_TOKEN", "")


async def _extract_ping_result(res: Any) -> Any:
    """Extract a comparable value from a tool-call result of unknown shape.

    The shapes are probed in this order; the first one that matches wins:

    1. ``res.structuredContent`` is a dict: its ``"result"`` entry, or else
       its first value.  An empty dict matches nothing and falls through.
    2. ``res.output`` exists: returned as is (even if it is ``None``).
    3. ``res.content`` is a non-empty list: the first item's ``.text``, or
       ``str(first_item)`` when it has no ``text`` attribute.
    4. ``res["result"]`` if indexing works, otherwise ``str(res)``.

    This is a coroutine only because callers ``await`` it; it performs no I/O.

    Args:
        res: The object returned by the tool call.

    Returns:
        The extracted value; its type depends on which shape matched.
    """
    structured = getattr(res, "structuredContent", None)
    if isinstance(structured, dict):
        if "result" in structured:
            return structured["result"]
        if structured:
            # No 'result' key: fall back to the first value in the dict.
            return next(iter(structured.values()))

    if hasattr(res, "output"):
        return res.output

    content = getattr(res, "content", None)
    if isinstance(content, list) and content:
        first = content[0]
        return first.text if hasattr(first, "text") else str(first)

    # Last resort. The broad except is deliberate: `res` is of unknown type,
    # so any failure while indexing simply means "not subscriptable this way".
    try:
        return res["result"]
    except Exception:
        return str(res)


async def check_ping(url: str, token: str) -> Tuple[bool, str]:
    """Connect to the MCP server at ``url`` and call its 'ping' tool.

    Args:
        url: Endpoint passed to ``streamablehttp_client``.
        token: Bearer token.  When empty, no Authorization header is sent.

    Returns:
        ``(ok, message)``.  ``ok`` is True only when the tool result is not
        flagged as an error and its extracted value equals ``"ok"``.  Any
        ``Exception`` raised while connecting, initializing or calling the
        tool is caught and reported as ``(False, message)``.
    """
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    try:
        async with streamablehttp_client(url=url, headers=headers) as (
            read_stream,
            write_stream,
            _,
        ):
            async with ClientSession(read_stream, write_stream) as session:
                await session.initialize()
                res = await session.call_tool("ping", arguments={})
                output = await _extract_ping_result(res)

                # A result flagged as an error must never count as success,
                # whatever its payload says.
                if getattr(res, "isError", False) is True:
                    return False, f"ping tool reported an error: {output!r}"
                if output == "ok":
                    return True, "ping -> ok"
                return False, f"unexpected ping response: {output!r}"
    except Exception as e:
        # Top-level boundary of the check: turn any failure into a report.
        return False, f"error connecting/calling ping: {e!r}"


def main() -> None:
    """Ping the configured server, print the outcome, exit 1 on failure."""
    ok, msg = asyncio.run(check_ping(SERVER_URL, TOKEN))
    print(msg)
    if not ok:
        sys.exit(1)


if __name__ == "__main__":
    main()