MCP的stdio和SSE通信方式使用案例

一、内容概述

        这是一个基于 OpenAI API 的多服务器工具调用系统,主要功能包括:

1. **多服务器工具集成**
- 支持同时连接多个 MCP (Model-Controller-Provider) 服务器
- 目前集成了三个服务器:
  - 天气服务 (weather)
  - Python 执行服务 (PythonServer)
  - SQL 查询服务 (SQLServer)

2. **两种运行模式**
- SSE (Server-Sent Events) 模式:通过 <mcfile name="client_sse.py" 
- 标准 I/O 模式:通过 <mcfile name="client_stdio.py" 

3. **基于 Function Calling 的工具调用**
- 使用 OpenAI 的最新 Function Calling 功能
- 系统会自动:
  1. 收集各服务器提供的工具信息
  2. 将工具信息转换为 OpenAI Function Calling 格式
  3. 根据用户输入智能选择合适的工具
  4. 执行工具调用并返回结果

4. **主要依赖**
- OpenAI API 用于 LLM 调用
- httpx 用于 HTTP 客户端
- pandas 用于数据处理
- pymysql 用于数据库操作

        一个典型的 AI Agent 系统,通过 LLM 的能力来智能调用不同服务器提供的各种工具,实现更复杂的任务处理能力。作为一个模板样例便于以后学习。

        主要文件如下所示:

二、依赖库

        即requirements.txt

annotated-types==0.7.0
anyio==4.9.0
certifi==2025.1.31
cffi==1.17.1
charset-normalizer==3.4.1
click==8.1.8
cryptography==44.0.2
distro==1.9.0
exceptiongroup==1.2.2
h11==0.14.0
httpcore==1.0.8
httpx==0.28.1
httpx-sse==0.4.0
idna==3.10
jiter==0.9.0
mcp==1.6.0
numpy==2.2.4
openai==1.74.0
pandas==2.2.3
pycparser==2.22
pydantic==2.11.3
pydantic-core==2.33.1
pydantic-settings==2.8.1
pymysql==1.1.1
python-dateutil==2.9.0.post0
python-dotenv==1.1.0
pytz==2025.2
requests==2.32.3
six==1.17.0
sniffio==1.3.1
sse-starlette==2.2.1
starlette==0.46.2
tqdm==4.67.1
typing-extensions==4.13.2
typing-inspection==0.4.0
tzdata==2025.2
urllib3==2.4.0
uvicorn==0.34.1

三、主要代码

.env用于存放LLM的API密钥

BASE_URL=http://192.168.0.10:11434/v1/
MODEL=qwq:32b-fp16  # qwq:32b-fp16  qwen2.5-coder:14b
OPENAI_API_KEY=ollama

# BASE_URL=https://api.siliconflow.cn/v1/
# MODEL=deepseek-ai/DeepSeek-R1-Distill-Qwen-7B
# OPENAI_API_KEY=sk-usbsjapwulwdmlpjseihldovpiyccpkadasylecgtriwsuwj

client_sse.py

import asyncio
import os
import json
from typing import Optional, Dict
from contextlib import AsyncExitStack

from openai import OpenAI
from dotenv import load_dotenv

from mcp import ClientSession, StdioServerParameters
from mcp.client.sse import sse_client

load_dotenv()

class MultiServerMCPClient:
    def __init__(self):
        """管理多个 MCP 服务器的客户端"""
        self.exit_stack = AsyncExitStack()
        self.openai_api_key = os.getenv("OPENAI_API_KEY")  
        self.base_url = os.getenv("BASE_URL")  
        self.model = os.getenv("MODEL")  
        if not self.openai_api_key:
            raise ValueError("❌ 未找到 OPENAI_API_KEY,请在 .env 文件中配置")

        # 初始化 OpenAI Client
        self.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)
        
        # 存储 (server_name -> MCP ClientSession) 映射
        self.sessions: Dict[str, ClientSession] = {}
        # 存储工具信息
        self.tools_by_session: Dict[str, list] = {}  # 每个 session 的 tools 列表
        self.all_tools = []  # 合并所有工具的列表

    async def connect_to_servers(self, servers: dict):
        """
        同时启动多个服务器并获取工具
        servers: 形如 {"weather": "weather_server.py", "rag": "rag_server.py"}
        """
        for server_name, script_path in servers.items():
            session = await self._start_one_server(script_path)
            self.sessions[server_name] = session
            
            # 列出此服务器的工具
            resp = await session.list_tools()
            self.tools_by_session[server_name] = resp.tools  # 保存到 self.tools_by_session

            for tool in resp.tools:
                # OpenAI Function Calling 格式修正
                function_name = f"{server_name}_{tool.name}"
                # print(tool.name)
                self.all_tools.append({
                    "type": "function",
                    "function": {
                        "name": function_name,
                        "description": tool.description,
                        "input_schema": tool.inputSchema
                    }
                })
         
        
        # 转化function calling格式
        self.all_tools = await self.transform_json(self.all_tools)
        # print(self.all_tools)

        print("\n✅ 已连接到下列服务器:")
        for name in servers:
            print(f"  - {name}: {servers[name]}")
        print("\n汇总的工具:")
        
        for t in self.all_tools:
            print(f"  - {t['function']['name']}")

    async def transform_json(self, json2_data):
        """
        将类似 json2 的格式转换为类似 json1 的格式,多余字段会被直接删除。
        
        :param json2_data: 一个可被解释为列表的 Python 对象(或已解析的 JSON 数据)
        :return: 转换后的新列表
        """
        result = []
        
        for item in json2_data:
            # 确保有 "type" 和 "function" 两个关键字段
            if not isinstance(item, dict) or "type" not in item or "function" not in item:
                continue
        
            old_func = item["function"]
        
            # 确保 function 下有我们需要的关键子字段
            if not isinstance(old_func, dict) or "name" not in old_func or "description" not in old_func:
                continue
        
            # 处理新 function 字段
            new_func = {
                "name": old_func["name"],
                "description": old_func["description"],
                "parameters": {}
            }
        
            # 读取 input_schema 并转成 parameters
            if "input_schema" in old_func and isinstance(old_func["input_schema"], dict):
                old_schema = old_func["input_schema"]
                
                # 新的 parameters 保留 type, properties, required 这三个字段
                new_func["parameters"]["type"] = old_schema.get("type", "object")
                new_func["parameters"]["properties"] = old_schema.get("properties", {})
                new_func["parameters"]["required"] = old_schema.get("required", [])
            
            new_item = {
                "type": item["type"],
                "function": new_func
            }
        
            result.append(new_item)
    
        return result            

    async def _start_one_server(self, server_url: str) -> ClientSession:
        """启动单个 MCP 服务器子进程,并返回 ClientSession"""
        # 创建 SSE 客户端连接上下文管理器
        sse_transport = await self.exit_stack.enter_async_context(sse_client(url=server_url))
        # 异步初始化 SSE 连接,获取数据流对象
        read_stream, write_stream = sse_transport

        session = await self.exit_stack.enter_async_context(ClientSession(read_stream, write_stream))
        await session.initialize()
        return session


    async def chat_base(self, messages: list) -> list:
    
        # messages = [{"role": "user", "content": query}]
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            tools=self.all_tools
        )
        if response.choices[0].finish_reason == "tool_calls":

            max_number = 10  # 防止无限循环调用工具
            while max_number:
                messages = await self.create_function_response_messages(messages, response)
                response = self.client.chat.completions.create(
                    model=self.model,
                    messages=messages,
                    tools=self.all_tools
                )
                if response.choices[0].finish_reason != "tool_calls":
                    break
                
                max_number = max_number - 1
                    
        # return response.choices[0].message.content
        return response
        
    async def create_function_response_messages(self, messages, response):
        function_call_messages = response.choices[0].message.tool_calls
        messages.append(response.choices[0].message.model_dump())
        
        for function_call_message in function_call_messages:
            tool_name = function_call_message.function.name
            tool_args = json.loads(function_call_message.function.arguments)
        
            # 运行外部函数
            function_response = await self._call_mcp_tool(tool_name, tool_args)

            # 拼接消息队列
            messages.append(
                {
                    "role": "tool",
                    "content": function_response,
                    "tool_call_id": function_call_message.id,
                }
            )
        return messages  

    async def process_query(self, user_query: str) -> str:
        """
        OpenAI 最新 Function Calling 逻辑:
         1. 发送用户消息 + tools 信息
         2. 若模型 `finish_reason == "tool_calls"`,则解析 toolCalls 并执行相应 MCP 工具
         3. 把调用结果返回给 OpenAI,让模型生成最终回答
        """
        messages = [{"role": "user", "content": user_query}]

        # 第一次请求
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            tools=self.all_tools
        )
        content = response.choices[0]
        print(content)
        print(self.all_tools)

        # 如果模型调用了 MCP 工具
        if content.finish_reason == "tool_calls":
            # 解析 tool_calls
            tool_call = content.message.tool_calls[0]
            tool_name = tool_call.function.name  # 形如 "weather_query_weather"
            tool_args = json.loads(tool_call.function.arguments)

            print(f"\n[ 调用工具: {tool_name}, 参数: {tool_args} ]\n")

            # 执行MCP工具
            result = await self._call_mcp_tool(tool_name, tool_args)

            # 把工具调用历史写进 messages
            messages.append(content.message.model_dump())
            messages.append({
                "role": "tool",
                "content": result,
                "tool_call_id": tool_call.id,
            })
            # 第二次请求,让模型整合工具结果,生成最终回答
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages
            )
            return response.choices[0].message.content

        # 如果模型没调用工具,直接返回回答
        return content.message.content

    async def _call_mcp_tool(self, tool_full_name: str, tool_args: dict) -> str:
        """
        根据 "serverName_toolName" 调用相应的服务器工具
        """
        parts = tool_full_name.split("_", 1)  # 拆分 "weather_query_weather" -> ["weather", "query_weather"]
        if len(parts) != 2:
            return f"无效的工具名称: {tool_full_name}"

        server_name, tool_name = parts
        session = self.sessions.get(server_name)
        if not session:
            return f"找不到服务器: {server_name}"
        
        # 执行 MCP 工具
        resp = await session.call_tool(tool_name, tool_args)
        print(resp)
        return resp.content if resp.content else "工具执行无输出"

    async def chat_loop(self):
        print("\n🤖 多服务器 MCP + 最新 Function Calling 客户端已启动!输入 'quit' 退出。")
        messages = []

        while True:
            query = input("\n你: ").strip()
            if query.lower() == "quit":
                break
            try:
                messages.append({"role": "user", "content": query})
                messages = messages[-20: ]
                # print(messages)
                response = await self.chat_base(messages)
                messages.append(response.choices[0].message.model_dump())
                result = response.choices[0].message.content
                
                print(f"\nAI: {result}")
            except Exception as e:
                print(f"\n⚠️  调用过程出错: {e}")

    async def cleanup(self):
        # 关闭所有资源
        await self.exit_stack.aclose()

async def main():
    # 服务器配置 以SSE 方式运行 MCP 服务器
    servers = {
        "weather": "http://192.168.0.10:8001/sse",
        "PythonServer": "http://192.168.0.10:8002/sse",
        "SQLServer": "http://192.168.0.10:8003/sse"
    }

    client = MultiServerMCPClient()
    try:
        await client.connect_to_servers(servers)
        await client.chat_loop()
    finally:
        await client.cleanup()

if __name__ == "__main__":
    asyncio.run(main())

client_stdio.py

import asyncio
import os
import json
from typing import Optional, Dict
from contextlib import AsyncExitStack

from openai import OpenAI
from dotenv import load_dotenv

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

load_dotenv()

class MultiServerMCPClient:
    def __init__(self):
        """管理多个 MCP 服务器的客户端"""
        self.exit_stack = AsyncExitStack()
        self.openai_api_key = os.getenv("OPENAI_API_KEY")  
        self.base_url = os.getenv("BASE_URL")  
        self.model = os.getenv("MODEL")  
        if not self.openai_api_key:
            raise ValueError("❌ 未找到 OPENAI_API_KEY,请在 .env 文件中配置")

        # 初始化 OpenAI Client
        self.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)
        
        # 存储 (server_name -> MCP ClientSession) 映射
        self.sessions: Dict[str, ClientSession] = {}
        # 存储工具信息
        self.tools_by_session: Dict[str, list] = {}  # 每个 session 的 tools 列表
        self.all_tools = []  # 合并所有工具的列表

    async def connect_to_servers(self, servers: dict):
        """
        同时启动多个服务器并获取工具
        servers: 形如 {"weather": "weather_server.py", "rag": "rag_server.py"}
        """
        for server_name, script_path in servers.items():
            session = await self._start_one_server(script_path)
            self.sessions[server_name] = session
            
            # 列出此服务器的工具
            resp = await session.list_tools()
            self.tools_by_session[server_name] = resp.tools  # 保存到 self.tools_by_session

            for tool in resp.tools:
                # OpenAI Function Calling 格式修正
                function_name = f"{server_name}_{tool.name}"
                # print(tool.name)
                self.all_tools.append({
                    "type": "function",
                    "function": {
                        "name": function_name,
                        "description": tool.description,
                        "input_schema": tool.inputSchema
                    }
                })
         
        
        # 转化function calling格式
        self.all_tools = await self.transform_json(self.all_tools)
        # print(self.all_tools)

        print("\n✅ 已连接到下列服务器:")
        for name in servers:
            print(f"  - {name}: {servers[name]}")
        print("\n汇总的工具:")
        
        for t in self.all_tools:
            print(f"  - {t['function']['name']}")

    async def transform_json(self, json2_data):
        """
        将类似 json2 的格式转换为类似 json1 的格式,多余字段会被直接删除。
        
        :param json2_data: 一个可被解释为列表的 Python 对象(或已解析的 JSON 数据)
        :return: 转换后的新列表
        """
        result = []
        
        for item in json2_data:
            # 确保有 "type" 和 "function" 两个关键字段
            if not isinstance(item, dict) or "type" not in item or "function" not in item:
                continue
        
            old_func = item["function"]
        
            # 确保 function 下有我们需要的关键子字段
            if not isinstance(old_func, dict) or "name" not in old_func or "description" not in old_func:
                continue
        
            # 处理新 function 字段
            new_func = {
                "name": old_func["name"],
                "description": old_func["description"],
                "parameters": {}
            }
        
            # 读取 input_schema 并转成 parameters
            if "input_schema" in old_func and isinstance(old_func["input_schema"], dict):
                old_schema = old_func["input_schema"]
                
                # 新的 parameters 保留 type, properties, required 这三个字段
                new_func["parameters"]["type"] = old_schema.get("type", "object")
                new_func["parameters"]["properties"] = old_schema.get("properties", {})
                new_func["parameters"]["required"] = old_schema.get("required", [])
            
            new_item = {
                "type": item["type"],
                "function": new_func
            }
        
            result.append(new_item)
    
        return result            

    async def _start_one_server(self, script_path: str) -> ClientSession:
        """启动单个 MCP 服务器子进程,并返回 ClientSession"""
        is_python = script_path.endswith(".py")
        is_js = script_path.endswith(".js")
        if not (is_python or is_js):
            raise ValueError("服务器脚本必须是 .py 或 .js 文件")

        command = "python" if is_python else "node"
        server_params = StdioServerParameters(
            command=command,
            args=[script_path],
            env=None
        )
        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        read_stream, write_stream = stdio_transport
        session = await self.exit_stack.enter_async_context(ClientSession(read_stream, write_stream))
        await session.initialize()
        return session


    async def chat_base(self, messages: list) -> list:
    
        # messages = [{"role": "user", "content": query}]
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            tools=self.all_tools
        )
        if response.choices[0].finish_reason == "tool_calls":

            max_number = 10  # 防止无限循环调用工具
            while max_number:
                messages = await self.create_function_response_messages(messages, response)
                response = self.client.chat.completions.create(
                    model=self.model,
                    messages=messages,
                    tools=self.all_tools
                )
                if response.choices[0].finish_reason != "tool_calls":
                    break
                
                max_number = max_number - 1
                    
        # return response.choices[0].message.content
        return response
        
    async def create_function_response_messages(self, messages, response):
        function_call_messages = response.choices[0].message.tool_calls
        messages.append(response.choices[0].message.model_dump())
        
        for function_call_message in function_call_messages:
            tool_name = function_call_message.function.name
            tool_args = json.loads(function_call_message.function.arguments)
        
            # 运行外部函数
            function_response = await self._call_mcp_tool(tool_name, tool_args)

            # 拼接消息队列
            messages.append(
                {
                    "role": "tool",
                    "content": function_response,
                    "tool_call_id": function_call_message.id,
                }
            )
        return messages  

    async def process_query(self, user_query: str) -> str:
        """
        OpenAI 最新 Function Calling 逻辑:
         1. 发送用户消息 + tools 信息
         2. 若模型 `finish_reason == "tool_calls"`,则解析 toolCalls 并执行相应 MCP 工具
         3. 把调用结果返回给 OpenAI,让模型生成最终回答
        """
        messages = [{"role": "user", "content": user_query}]

        # 第一次请求
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            tools=self.all_tools
        )
        content = response.choices[0]
        print(content)
        print(self.all_tools)

        # 如果模型调用了 MCP 工具
        if content.finish_reason == "tool_calls":
            # 解析 tool_calls
            tool_call = content.message.tool_calls[0]
            tool_name = tool_call.function.name  # 形如 "weather_query_weather"
            tool_args = json.loads(tool_call.function.arguments)

            print(f"\n[ 调用工具: {tool_name}, 参数: {tool_args} ]\n")

            # 执行MCP工具
            result = await self._call_mcp_tool(tool_name, tool_args)

            # 把工具调用历史写进 messages
            messages.append(content.message.model_dump())
            messages.append({
                "role": "tool",
                "content": result,
                "tool_call_id": tool_call.id,
            })
            # 第二次请求,让模型整合工具结果,生成最终回答
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages
            )
            return response.choices[0].message.content

        # 如果模型没调用工具,直接返回回答
        return content.message.content

    async def _call_mcp_tool(self, tool_full_name: str, tool_args: dict) -> str:
        """
        根据 "serverName_toolName" 调用相应的服务器工具
        """
        parts = tool_full_name.split("_", 1)  # 拆分 "weather_query_weather" -> ["weather", "query_weather"]
        if len(parts) != 2:
            return f"无效的工具名称: {tool_full_name}"

        server_name, tool_name = parts
        session = self.sessions.get(server_name)
        if not session:
            return f"找不到服务器: {server_name}"
        
        # 执行 MCP 工具
        resp = await session.call_tool(tool_name, tool_args)
        print(resp)
        return resp.content if resp.content else "工具执行无输出"

    async def chat_loop(self):
        print("\n🤖 多服务器 MCP + 最新 Function Calling 客户端已启动!输入 'quit' 退出。")
        messages = []

        while True:
            query = input("\n你: ").strip()
            if query.lower() == "quit":
                break
            try:
                messages.append({"role": "user", "content": query})
                messages = messages[-20: ]
                # print(messages)
                response = await self.chat_base(messages)
                messages.append(response.choices[0].message.model_dump())
                result = response.choices[0].message.content
                
                print(f"\nAI: {result}")
            except Exception as e:
                print(f"\n⚠️  调用过程出错: {e}")

    async def cleanup(self):
        # 关闭所有资源
        await self.exit_stack.aclose()

async def main():
    # 服务器脚本 以标准 I/O 方式运行 MCP 服务器
    servers = {
        "weather": "weather_server.py",
        "SQLServer":"sql_server.py",
        "PythonServer":"python_server.py"
    }

    client = MultiServerMCPClient()
    try:
        await client.connect_to_servers(servers)
        await client.chat_loop()
    finally:
        await client.cleanup()


if __name__ == "__main__":
    asyncio.run(main())

python_server.py

import json
from typing import Any
import csv
import numpy as np
import pandas as pd
import random
from mcp.server.fastmcp import FastMCP

# 初始化 MCP 服务器
mcp = FastMCP("PythonServer", port=8002)
USER_AGENT = "Pythonserver-app/1.0"

@mcp.tool()
async def python_inter(py_code):
    """
    运行用户提供的 Python 代码,并返回执行结果。
    
    :param py_code: 字符串形式的 Python 代码
    :return: 代码运行的最终结果
    """
    g = globals()
    
    try:
        # 若是表达式,直接运行并返回
        result = eval(py_code, g)
        return json.dumps(str(result), ensure_ascii=False)
    
    except Exception:
        global_vars_before = set(g.keys())
        try:
            exec(py_code, g)
        except Exception as e:
            return json.dumps(f"代码执行时报错: {e}", ensure_ascii=False)

        global_vars_after = set(g.keys())
        new_vars = global_vars_after - global_vars_before

        if new_vars:
            # 只返回可序列化的变量值
            safe_result = {}
            for var in new_vars:
                try:
                    json.dumps(g[var])  # 尝试序列化,确保可以转换为 JSON
                    safe_result[var] = g[var]
                except (TypeError, OverflowError):
                    safe_result[var] = str(g[var])  # 如果不能序列化,则转换为字符串

            return json.dumps(safe_result, ensure_ascii=False)
        
        else:
            return json.dumps("已经顺利执行代码", ensure_ascii=False)

if __name__ == "__main__":
    # 以标准 I/O 方式运行 MCP 服务器
    # mcp.run(transport='stdio')

    # 使用 SSE 方式运行 MCP 服务器
    mcp.run(transport='sse')

sql_server.py

import json
import httpx
from typing import Any
import pymysql
import csv
from mcp.server.fastmcp import FastMCP

# 初始化 MCP 服务器
mcp = FastMCP("SQLServer", port=8003)
USER_AGENT = "SQLserver-app/1.0"

g_host_ip = "192.168.0.10"
# g_host_ip = "localhost"
g_user = "root"
g_passwd = "123000000"
g_db = "school"

@mcp.tool()
async def sql_inter(sql_query):
    """
    查询本地MySQL数据库,通过运行一段SQL代码来进行数据库查询。\
    :param sql_query: 字符串形式的SQL查询语句,用于执行对MySQL中school数据库中各张表进行查询,并获得各表中的各类相关信息
    :return:sql_query在MySQL中的运行结果。
    """
    
    connection = pymysql.connect(
            host=g_host_ip,  # 数据库地址
            user=g_user,  # 数据库用户名
            passwd=g_passwd,  # 数据库密码
            db=g_db,  # 数据库名
            charset='utf8'  # 字符集选择utf8
        )
    
    try:
        with connection.cursor() as cursor:
            # SQL查询语句
            sql = sql_query
            cursor.execute(sql)

            # 获取查询结果
            results = cursor.fetchall()

    finally:
        connection.close()
    
    
    return json.dumps(results)

@mcp.tool()
async def export_table_to_csv(table_name, output_file):
    """
    将 MySQL 数据库中的某个表导出为 CSV 文件。
    
    :param table_name: 需要导出的表名
    :param output_file: 输出的 CSV 文件路径
    """
    # 连接 MySQL 数据库
    connection = pymysql.connect(
            host=g_host_ip,  # 数据库地址
            user=g_user,  # 数据库用户名
            passwd=g_passwd,  # 数据库密码
            db=g_db,  # 数据库名
            charset='utf8'  # 字符集选择utf8
        )

    try:
        with connection.cursor() as cursor:
            # 查询数据表的所有数据
            query = f"SELECT * FROM {table_name};"
            cursor.execute(query)

            # 获取所有列名
            column_names = [desc[0] for desc in cursor.description]

            # 获取查询结果
            rows = cursor.fetchall()

            # 将数据写入 CSV 文件
            with open(output_file, mode='w', newline='', encoding='utf-8') as file:
                writer = csv.writer(file)
                
                # 写入表头
                writer.writerow(column_names)
                
                # 写入数据
                writer.writerows(rows)

            print(f"数据表 {table_name} 已成功导出至 {output_file}")

    except Exception as e:
        print(f"导出失败: {e}")

    finally:
        connection.close()

if __name__ == "__main__":
    # 以标准 I/O 方式运行 MCP 服务器
    # mcp.run(transport='stdio')

    # 使用 SSE 方式运行 MCP 服务器
    mcp.run(transport='sse')

weather_server.py

import json
import httpx
from typing import Any, Union
from mcp.server.fastmcp import FastMCP
from datetime import datetime

import requests

# 初始化 MCP 服务器
mcp = FastMCP("WeatherServer", port=8001)


# OpenWeather API 配置
OPENWEATHER_API_BASE = "https://key.wenwen-ai.com"
API_KEY = "sk-NYsoG3VBKDiTuvdtC969F95aFc4f45379aD3854a93602327" # 请替换为你自己的 OpenWeather API Key
USER_AGENT = "v1"

# async def fetch_weather(city: str) -> dict[str, Any] | None:
#     """
#     从 OpenWeather API 获取天气信息。
#     :param city: 城市名称(需使用英文,如 Beijing)
#     :return: 天气数据字典;若出错返回包含 error 信息的字典
#     """
#     params = {
#         "q": city,
#         "appid": API_KEY,
#         "units": "metric",
#         "lang": "zh_cn"
#     }
#     headers = {"User-Agent": USER_AGENT}
#     async with httpx.AsyncClient() as client:
#         try:
#             response = await client.get(OPENWEATHER_API_BASE, params=params,headers=headers, timeout=60.0)
#             response.raise_for_status()
#             return response.json() # 返回字典类型
#         except httpx.HTTPStatusError as e:
#             return {"error": f"HTTP 错误: {e.response.status_code}"}
#         except Exception as e:
#             return {"error": f"请求失败: {str(e)}"}


# def format_weather(data: dict[str, Any] | str) -> str:
#     def format_weather(data: Union[dict[str, Any], str]) -> str:
#     """
#     将天气数据格式化为易读文本。
#     :param data: 天气数据(可以是字典或 JSON 字符串)
#     :return: 格式化后的天气信息字符串
#     """
#     # 如果传入的是字符串,则先转换为字典
#     if isinstance(data, str):
#         try:
#             data = json.loads(data)
#         except Exception as e:
#             return f"无法解析天气数据: {e}"

#     # 如果数据中包含错误信息,直接返回错误提示
#     if "error" in data:
#         return f"⚠️ {data['error']}"

#     # 提取数据时做容错处理
#     city = data.get("place", "未知")
#     temp = data.get("temperature", "N/A")
#     humidity = data.get("humidity", "N/A")
#     wind_speed = data.get("windSpeed", "N/A")
#     description = data.get("weather1", "未知")
#     current_time = datetime.now()
#     format_time = current_time.strftime("%Y-%m-%d %H:%M:%S")

#     return (
#         f"更新时间: {format_time}\n"
#         f"🌍 {city}\n"
#         f"🌡 温度: {temp}°C\n"
#         f"💧 湿度: {humidity}%\n"
#         f"🌬 风速: {wind_speed} m/s\n"
#         f"🌤 天气: {description}\n"
#     )

# @mcp.tool()
# async def query_weather(city: str) -> str:
#     """
#     输入指定城市的英文名称,返回今日天气查询结果。
#     :param city: 城市名称(需使用英文)
#     :return: 格式化后的天气信息
#     """
#     data = await fetch_weather(city)
#     return format_weather(data)



# 国内天气server
g_id = "88888888"
g_key = "88888888"
g_base_url = "https://cn.apihz.cn/api/tianqi/tqyb.php"

async def get_weather(sheng:str, place: str) -> dict:
    """
    获取指定地区的天气信息,比如place是南昌,则推出sheng是江西,最后获取南昌的天气信息并返回

    Args:
        sheng: 省级名称(如:江西等)
        place: 地区名称(如:南昌等)

    Returns:
        dict: 天气信息字典,包含温度、天气状况等信息
    """
    # 构建请求参数
    params = {
        "id": g_id,
        "key": g_key,
        "place": place,
        "sheng": sheng
    }

    try:
        # 发送GET请求
        response = requests.get(g_base_url, params=params)
        response.raise_for_status()  # 检查请求是否成功

        # 解析JSON响应
        weather_data = response.json()

        return weather_data

    except requests.exceptions.RequestException as e:
        print(f"请求失败: {e}")
        return None
    except json.JSONDecodeError as e:
        print(f"JSON解析失败: {e}")
        return None

    
def format_weather(data: dict[str, Any] | str) -> str:
    """
    将天气数据格式化为易读文本。
    :param data: 天气数据(可以是字典或 JSON 字符串)
    :return: 格式化后的天气信息字符串
    """
    # 如果传入的是字符串,则先转换为字典
    if isinstance(data, str):
        try:
            data = json.loads(data)
        except Exception as e:
            return f"无法解析天气数据: {e}"

    # 如果数据中包含错误信息,直接返回错误提示
    if "error" in data:
        return f"⚠️ {data['error']}"

    # 提取数据时做容错处理
    city = data.get("place", "未知")
    temp = data.get("temperature", "N/A")
    humidity = data.get("humidity", "N/A")
    wind_speed = data.get("windSpeed", "N/A")
    description = data.get("weather1", "未知")
    current_time = datetime.now()
    format_time = current_time.strftime("%Y-%m-%d %H:%M:%S")

    return (
        f"更新时间: {format_time}\n"
        f"🌍 {city}\n"
        f"🌡 温度: {temp}°C\n"
        f"💧 湿度: {humidity}%\n"
        f"🌬 风速: {wind_speed} m/s\n"
        f"🌤 天气: {description}\n"
    )


@mcp.tool()
async def query_weather(place:str, sheng:str) -> dict:
    """
    获取指定地区的天气信息,比如place是南昌,则推出sheng是江西,最后获取南昌的天气信息并返回;
    如果输入的原本就是省级单位,则place为首府,比如sheng为江苏,则place为南京;
    如果输入的是直辖市,则sheng与place相同,比如place为北京,则sheng为北京。
    Args:
        sheng: 省级名称(如:江西等)
        place: 地区名称(如:南昌等)
    :return: 格式化后的天气信息
    """
    data = await get_weather(sheng, place)
    return format_weather(data)


if __name__ == "__main__":
    # 以标准 I/O 方式运行 MCP 服务器
    # mcp.run(transport='stdio')
    # print(format_weather(get_weather("江西", "南昌")))

    # 使用 SSE 方式运行 MCP 服务器
    mcp.run(transport='sse')

四、启动

        需要说明的是,SSE和stdio代码和启动都不一样,注意*_server.py文件里if __name__ == "__main__"下的注释,需要对应上。

4.1 SSE通信模型启动程序

        每个*_server.py需要单独一个终端执行如下命令:

uv run sql_server.py
uv run pyhton_server.py
uv run weather_server.py

        客户端启动

uv run client_sse.py

4.2 stdio通信模型启动程序

uv run client_stdio.py

五、效果图

        SSE运行效果图:

stdio运行效果图:

### 如何配置MCP服务器及其最佳实践 配置MCP(Multi-Client Protocol)服务器涉及多个方面,包括理解其基本架构、设置环境以及优化性能。以下是关于如何配置MCP服务器的一些关键点: #### MCP服务器的基础概念 MCP协议允许客户端与服务器之间通过特定的消息传递机制进行通信。它分为客户端服务器端两部分,在实际应用中可以先专注于服务器端的搭建[^2]。 #### 配置MCP服务器的关键步骤 1. **选择合适的传输方式** - MCP支持多种数据传输方法,比如`stdio`或`SSE (Server-Sent Events)`。如果计划使用`stdio`作为主要传输手段,则需确保客户端能够解析标准输入/输出流中的消息;而采用`SSE`则需要提供有效的URL地址供客户端访问[^3]。 2. **设定连接参数** - 对于基于`stdio`的服务来说,主要是指定启动程序所需的命令行选项。 - 如果是网络服务形式(`SSE`),那么应该明确指出用于接收请求的具体HTTP路径或者WebSocket链接。 3. **集成到现有系统** - 开发者可以选择直接构建自己的MCP实现方案,也可以利用已有的开源项目快速上手。例如某些平台上已经存在可立即部署使用的预建MCP服务器实例。 4. **测试与调试** - 完成初步配置之后,务必进行全面的功能验证以确认所有预期行为均正常运作。可以通过模拟不同类型的用户交互场景来进行压力测试并调整资源分配策略[^1]。 5. **安全性考量** - 考虑到网络安全的重要性,在开放外部接口之前应实施必要的身份认证措施防止未授权访问。同时也要注意保护敏感信息不被泄露给未经授权的一方。 6. **文档记录维护** - 清晰详尽的技术文档有助于后续团队成员的理解支持工作。定期更新指南说明任何改动之处以便长期管理更加便捷高效。 ```bash # 示例:简单的MCP Server初始化脚本(伪代码) #!/bin/bash # 设置基础变量 SERVER_NAME="My_MCP_Server" PORT=8080 echo "Starting $SERVER_NAME on port $PORT..." # 启动服务逻辑... ./mcp_server --port=$PORT & PID=$! trap 'kill $PID' EXIT wait $PID ``` 以上内容概括了从零开始建立一个功能性完备的MCP server所需要遵循的主要指导原则技术要点。
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值