1. Overview
This is a multi-server tool-calling system built on the OpenAI API. Its main features:
1. **Multi-server tool integration**
   - Connects to multiple MCP (Model Context Protocol) servers at the same time
   - Three servers are currently integrated:
     - Weather service (weather)
     - Python execution service (PythonServer)
     - SQL query service (SQLServer)
2. **Two run modes**
   - SSE (Server-Sent Events) mode: implemented in client_sse.py
   - Standard I/O (stdio) mode: implemented in client_stdio.py
3. **Tool calls via Function Calling**
   - Uses the OpenAI Function Calling (tools) interface
   - The system automatically:
     1. Collects the tool metadata exposed by each server
     2. Converts that metadata into the OpenAI Function Calling format (see the sketch after this list)
     3. Picks a suitable tool based on the user's input
     4. Executes the tool call and returns the result
4. **Main dependencies**
   - OpenAI API for LLM calls
   - httpx as the HTTP client
   - pandas for data processing
   - pymysql for database access
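As a rough sketch (not output from a real run), a single entry in all_tools after transform_json would look like the Python dict below. The "weather_query_weather" name and the place/sheng parameters come from the weather server shown later; the description text and property details are assumptions and will be whatever the MCP server actually reports.
# Hypothetical example of one converted tool entry, named "<server name>_<tool name>"
{
    "type": "function",
    "function": {
        "name": "weather_query_weather",
        "description": "获取指定地区的天气信息 ...",  # taken from the MCP tool's description
        "parameters": {                               # converted from the MCP inputSchema
            "type": "object",
            "properties": {
                "place": {"type": "string"},
                "sheng": {"type": "string"}
            },
            "required": ["place", "sheng"]
        }
    }
}
Entries in this shape are passed directly as the tools argument of chat.completions.create, which is why transform_json strips everything except type, properties, and required.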
In short, this is a typical AI agent system: an LLM decides which of the tools exposed by the different servers to call, so that more complex tasks can be handled. It is kept here as a template for later study.
The main files are listed in the sections below:
2. Dependencies
Contents of requirements.txt:
annotated-types==0.7.0
anyio==4.9.0
certifi==2025.1.31
cffi==1.17.1
charset-normalizer==3.4.1
click==8.1.8
cryptography==44.0.2
distro==1.9.0
exceptiongroup==1.2.2
h11==0.14.0
httpcore==1.0.8
httpx==0.28.1
httpx-sse==0.4.0
idna==3.10
jiter==0.9.0
mcp==1.6.0
numpy==2.2.4
openai==1.74.0
pandas==2.2.3
pycparser==2.22
pydantic==2.11.3
pydantic-core==2.33.1
pydantic-settings==2.8.1
pymysql==1.1.1
python-dateutil==2.9.0.post0
python-dotenv==1.1.0
pytz==2025.2
requests==2.32.3
six==1.17.0
sniffio==1.3.1
sse-starlette==2.2.1
starlette==0.46.2
tqdm==4.67.1
typing-extensions==4.13.2
typing-inspection==0.4.0
tzdata==2025.2
urllib3==2.4.0
uvicorn==0.34.1
3. Main Code
.env stores the LLM endpoint, model name, and API key:
BASE_URL=http://192.168.0.10:11434/v1/
MODEL=qwq:32b-fp16 # qwq:32b-fp16 qwen2.5-coder:14b
OPENAI_API_KEY=ollama
# BASE_URL=https://api.siliconflow.cn/v1/
# MODEL=deepseek-ai/DeepSeek-R1-Distill-Qwen-7B
# OPENAI_API_KEY=sk-usbsjapwulwdmlpjseihldovpiyccpkadasylecgtriwsuwj
client_sse.py
import asyncio
import os
import json
from typing import Optional, Dict
from contextlib import AsyncExitStack
from openai import OpenAI
from dotenv import load_dotenv
from mcp import ClientSession, StdioServerParameters
from mcp.client.sse import sse_client
load_dotenv()
class MultiServerMCPClient:
def __init__(self):
"""管理多个 MCP 服务器的客户端"""
self.exit_stack = AsyncExitStack()
self.openai_api_key = os.getenv("OPENAI_API_KEY")
self.base_url = os.getenv("BASE_URL")
self.model = os.getenv("MODEL")
if not self.openai_api_key:
raise ValueError("❌ 未找到 OPENAI_API_KEY,请在 .env 文件中配置")
# 初始化 OpenAI Client
self.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)
# 存储 (server_name -> MCP ClientSession) 映射
self.sessions: Dict[str, ClientSession] = {}
# 存储工具信息
self.tools_by_session: Dict[str, list] = {} # 每个 session 的 tools 列表
self.all_tools = [] # 合并所有工具的列表
async def connect_to_servers(self, servers: dict):
"""
同时启动多个服务器并获取工具
servers: 形如 {"weather": "weather_server.py", "rag": "rag_server.py"}
"""
for server_name, script_path in servers.items():
session = await self._start_one_server(script_path)
self.sessions[server_name] = session
# 列出此服务器的工具
resp = await session.list_tools()
self.tools_by_session[server_name] = resp.tools # 保存到 self.tools_by_session
for tool in resp.tools:
# OpenAI Function Calling 格式修正
function_name = f"{server_name}_{tool.name}"
# print(tool.name)
self.all_tools.append({
"type": "function",
"function": {
"name": function_name,
"description": tool.description,
"input_schema": tool.inputSchema
}
})
# 转化function calling格式
self.all_tools = await self.transform_json(self.all_tools)
# print(self.all_tools)
print("\n✅ 已连接到下列服务器:")
for name in servers:
print(f" - {name}: {servers[name]}")
print("\n汇总的工具:")
for t in self.all_tools:
print(f" - {t['function']['name']}")
async def transform_json(self, json2_data):
"""
将类似 json2 的格式转换为类似 json1 的格式,多余字段会被直接删除。
:param json2_data: 一个可被解释为列表的 Python 对象(或已解析的 JSON 数据)
:return: 转换后的新列表
"""
result = []
for item in json2_data:
# 确保有 "type" 和 "function" 两个关键字段
if not isinstance(item, dict) or "type" not in item or "function" not in item:
continue
old_func = item["function"]
# 确保 function 下有我们需要的关键子字段
if not isinstance(old_func, dict) or "name" not in old_func or "description" not in old_func:
continue
# 处理新 function 字段
new_func = {
"name": old_func["name"],
"description": old_func["description"],
"parameters": {}
}
# 读取 input_schema 并转成 parameters
if "input_schema" in old_func and isinstance(old_func["input_schema"], dict):
old_schema = old_func["input_schema"]
# 新的 parameters 保留 type, properties, required 这三个字段
new_func["parameters"]["type"] = old_schema.get("type", "object")
new_func["parameters"]["properties"] = old_schema.get("properties", {})
new_func["parameters"]["required"] = old_schema.get("required", [])
new_item = {
"type": item["type"],
"function": new_func
}
result.append(new_item)
return result
async def _start_one_server(self, server_url: str) -> ClientSession:
"""启动单个 MCP 服务器子进程,并返回 ClientSession"""
# 创建 SSE 客户端连接上下文管理器
sse_transport = await self.exit_stack.enter_async_context(sse_client(url=server_url))
# 异步初始化 SSE 连接,获取数据流对象
read_stream, write_stream = sse_transport
session = await self.exit_stack.enter_async_context(ClientSession(read_stream, write_stream))
await session.initialize()
return session
    async def chat_base(self, messages: list):
# messages = [{"role": "user", "content": query}]
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
tools=self.all_tools
)
if response.choices[0].finish_reason == "tool_calls":
max_number = 10 # 防止无限循环调用工具
while max_number:
messages = await self.create_function_response_messages(messages, response)
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
tools=self.all_tools
)
if response.choices[0].finish_reason != "tool_calls":
break
max_number = max_number - 1
# return response.choices[0].message.content
return response
async def create_function_response_messages(self, messages, response):
function_call_messages = response.choices[0].message.tool_calls
messages.append(response.choices[0].message.model_dump())
for function_call_message in function_call_messages:
tool_name = function_call_message.function.name
tool_args = json.loads(function_call_message.function.arguments)
# 运行外部函数
function_response = await self._call_mcp_tool(tool_name, tool_args)
# 拼接消息队列
messages.append(
{
"role": "tool",
"content": function_response,
"tool_call_id": function_call_message.id,
}
)
return messages
async def process_query(self, user_query: str) -> str:
"""
OpenAI 最新 Function Calling 逻辑:
1. 发送用户消息 + tools 信息
2. 若模型 `finish_reason == "tool_calls"`,则解析 toolCalls 并执行相应 MCP 工具
3. 把调用结果返回给 OpenAI,让模型生成最终回答
"""
messages = [{"role": "user", "content": user_query}]
# 第一次请求
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
tools=self.all_tools
)
content = response.choices[0]
print(content)
print(self.all_tools)
# 如果模型调用了 MCP 工具
if content.finish_reason == "tool_calls":
# 解析 tool_calls
tool_call = content.message.tool_calls[0]
tool_name = tool_call.function.name # 形如 "weather_query_weather"
tool_args = json.loads(tool_call.function.arguments)
print(f"\n[ 调用工具: {tool_name}, 参数: {tool_args} ]\n")
# 执行MCP工具
result = await self._call_mcp_tool(tool_name, tool_args)
# 把工具调用历史写进 messages
messages.append(content.message.model_dump())
messages.append({
"role": "tool",
"content": result,
"tool_call_id": tool_call.id,
})
# 第二次请求,让模型整合工具结果,生成最终回答
response = self.client.chat.completions.create(
model=self.model,
messages=messages
)
return response.choices[0].message.content
# 如果模型没调用工具,直接返回回答
return content.message.content
async def _call_mcp_tool(self, tool_full_name: str, tool_args: dict) -> str:
"""
根据 "serverName_toolName" 调用相应的服务器工具
"""
parts = tool_full_name.split("_", 1) # 拆分 "weather_query_weather" -> ["weather", "query_weather"]
if len(parts) != 2:
return f"无效的工具名称: {tool_full_name}"
server_name, tool_name = parts
session = self.sessions.get(server_name)
if not session:
return f"找不到服务器: {server_name}"
# 执行 MCP 工具
resp = await session.call_tool(tool_name, tool_args)
print(resp)
        # resp.content 是 MCP 内容对象列表,这里提取其中的文本并拼接成字符串返回
        texts = [c.text for c in resp.content if hasattr(c, "text")]
        return "\n".join(texts) if texts else "工具执行无输出"
async def chat_loop(self):
print("\n🤖 多服务器 MCP + 最新 Function Calling 客户端已启动!输入 'quit' 退出。")
messages = []
while True:
query = input("\n你: ").strip()
if query.lower() == "quit":
break
try:
messages.append({"role": "user", "content": query})
messages = messages[-20: ]
# print(messages)
response = await self.chat_base(messages)
messages.append(response.choices[0].message.model_dump())
result = response.choices[0].message.content
print(f"\nAI: {result}")
except Exception as e:
print(f"\n⚠️ 调用过程出错: {e}")
async def cleanup(self):
# 关闭所有资源
await self.exit_stack.aclose()
async def main():
# 服务器配置 以SSE 方式运行 MCP 服务器
servers = {
"weather": "http://192.168.0.10:8001/sse",
"PythonServer": "http://192.168.0.10:8002/sse",
"SQLServer": "http://192.168.0.10:8003/sse"
}
client = MultiServerMCPClient()
try:
await client.connect_to_servers(servers)
await client.chat_loop()
finally:
await client.cleanup()
if __name__ == "__main__":
asyncio.run(main())
client_stdio.py
import asyncio
import os
import json
from typing import Optional, Dict
from contextlib import AsyncExitStack
from openai import OpenAI
from dotenv import load_dotenv
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
load_dotenv()
class MultiServerMCPClient:
def __init__(self):
"""管理多个 MCP 服务器的客户端"""
self.exit_stack = AsyncExitStack()
self.openai_api_key = os.getenv("OPENAI_API_KEY")
self.base_url = os.getenv("BASE_URL")
self.model = os.getenv("MODEL")
if not self.openai_api_key:
raise ValueError("❌ 未找到 OPENAI_API_KEY,请在 .env 文件中配置")
# 初始化 OpenAI Client
self.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)
# 存储 (server_name -> MCP ClientSession) 映射
self.sessions: Dict[str, ClientSession] = {}
# 存储工具信息
self.tools_by_session: Dict[str, list] = {} # 每个 session 的 tools 列表
self.all_tools = [] # 合并所有工具的列表
async def connect_to_servers(self, servers: dict):
"""
同时启动多个服务器并获取工具
servers: 形如 {"weather": "weather_server.py", "rag": "rag_server.py"}
"""
for server_name, script_path in servers.items():
session = await self._start_one_server(script_path)
self.sessions[server_name] = session
# 列出此服务器的工具
resp = await session.list_tools()
self.tools_by_session[server_name] = resp.tools # 保存到 self.tools_by_session
for tool in resp.tools:
# OpenAI Function Calling 格式修正
function_name = f"{server_name}_{tool.name}"
# print(tool.name)
self.all_tools.append({
"type": "function",
"function": {
"name": function_name,
"description": tool.description,
"input_schema": tool.inputSchema
}
})
# 转化function calling格式
self.all_tools = await self.transform_json(self.all_tools)
# print(self.all_tools)
print("\n✅ 已连接到下列服务器:")
for name in servers:
print(f" - {name}: {servers[name]}")
print("\n汇总的工具:")
for t in self.all_tools:
print(f" - {t['function']['name']}")
async def transform_json(self, json2_data):
"""
将类似 json2 的格式转换为类似 json1 的格式,多余字段会被直接删除。
:param json2_data: 一个可被解释为列表的 Python 对象(或已解析的 JSON 数据)
:return: 转换后的新列表
"""
result = []
for item in json2_data:
# 确保有 "type" 和 "function" 两个关键字段
if not isinstance(item, dict) or "type" not in item or "function" not in item:
continue
old_func = item["function"]
# 确保 function 下有我们需要的关键子字段
if not isinstance(old_func, dict) or "name" not in old_func or "description" not in old_func:
continue
# 处理新 function 字段
new_func = {
"name": old_func["name"],
"description": old_func["description"],
"parameters": {}
}
# 读取 input_schema 并转成 parameters
if "input_schema" in old_func and isinstance(old_func["input_schema"], dict):
old_schema = old_func["input_schema"]
# 新的 parameters 保留 type, properties, required 这三个字段
new_func["parameters"]["type"] = old_schema.get("type", "object")
new_func["parameters"]["properties"] = old_schema.get("properties", {})
new_func["parameters"]["required"] = old_schema.get("required", [])
new_item = {
"type": item["type"],
"function": new_func
}
result.append(new_item)
return result
async def _start_one_server(self, script_path: str) -> ClientSession:
"""启动单个 MCP 服务器子进程,并返回 ClientSession"""
is_python = script_path.endswith(".py")
is_js = script_path.endswith(".js")
if not (is_python or is_js):
raise ValueError("服务器脚本必须是 .py 或 .js 文件")
command = "python" if is_python else "node"
server_params = StdioServerParameters(
command=command,
args=[script_path],
env=None
)
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
read_stream, write_stream = stdio_transport
session = await self.exit_stack.enter_async_context(ClientSession(read_stream, write_stream))
await session.initialize()
return session
    async def chat_base(self, messages: list):
# messages = [{"role": "user", "content": query}]
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
tools=self.all_tools
)
if response.choices[0].finish_reason == "tool_calls":
max_number = 10 # 防止无限循环调用工具
while max_number:
messages = await self.create_function_response_messages(messages, response)
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
tools=self.all_tools
)
if response.choices[0].finish_reason != "tool_calls":
break
max_number = max_number - 1
# return response.choices[0].message.content
return response
async def create_function_response_messages(self, messages, response):
function_call_messages = response.choices[0].message.tool_calls
messages.append(response.choices[0].message.model_dump())
for function_call_message in function_call_messages:
tool_name = function_call_message.function.name
tool_args = json.loads(function_call_message.function.arguments)
# 运行外部函数
function_response = await self._call_mcp_tool(tool_name, tool_args)
# 拼接消息队列
messages.append(
{
"role": "tool",
"content": function_response,
"tool_call_id": function_call_message.id,
}
)
return messages
async def process_query(self, user_query: str) -> str:
"""
OpenAI 最新 Function Calling 逻辑:
1. 发送用户消息 + tools 信息
2. 若模型 `finish_reason == "tool_calls"`,则解析 toolCalls 并执行相应 MCP 工具
3. 把调用结果返回给 OpenAI,让模型生成最终回答
"""
messages = [{"role": "user", "content": user_query}]
# 第一次请求
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
tools=self.all_tools
)
content = response.choices[0]
print(content)
print(self.all_tools)
# 如果模型调用了 MCP 工具
if content.finish_reason == "tool_calls":
# 解析 tool_calls
tool_call = content.message.tool_calls[0]
tool_name = tool_call.function.name # 形如 "weather_query_weather"
tool_args = json.loads(tool_call.function.arguments)
print(f"\n[ 调用工具: {tool_name}, 参数: {tool_args} ]\n")
# 执行MCP工具
result = await self._call_mcp_tool(tool_name, tool_args)
# 把工具调用历史写进 messages
messages.append(content.message.model_dump())
messages.append({
"role": "tool",
"content": result,
"tool_call_id": tool_call.id,
})
# 第二次请求,让模型整合工具结果,生成最终回答
response = self.client.chat.completions.create(
model=self.model,
messages=messages
)
return response.choices[0].message.content
# 如果模型没调用工具,直接返回回答
return content.message.content
async def _call_mcp_tool(self, tool_full_name: str, tool_args: dict) -> str:
"""
根据 "serverName_toolName" 调用相应的服务器工具
"""
parts = tool_full_name.split("_", 1) # 拆分 "weather_query_weather" -> ["weather", "query_weather"]
if len(parts) != 2:
return f"无效的工具名称: {tool_full_name}"
server_name, tool_name = parts
session = self.sessions.get(server_name)
if not session:
return f"找不到服务器: {server_name}"
# 执行 MCP 工具
resp = await session.call_tool(tool_name, tool_args)
print(resp)
        # resp.content 是 MCP 内容对象列表,这里提取其中的文本并拼接成字符串返回
        texts = [c.text for c in resp.content if hasattr(c, "text")]
        return "\n".join(texts) if texts else "工具执行无输出"
async def chat_loop(self):
print("\n🤖 多服务器 MCP + 最新 Function Calling 客户端已启动!输入 'quit' 退出。")
messages = []
while True:
query = input("\n你: ").strip()
if query.lower() == "quit":
break
try:
messages.append({"role": "user", "content": query})
messages = messages[-20: ]
# print(messages)
response = await self.chat_base(messages)
messages.append(response.choices[0].message.model_dump())
result = response.choices[0].message.content
print(f"\nAI: {result}")
except Exception as e:
print(f"\n⚠️ 调用过程出错: {e}")
async def cleanup(self):
# 关闭所有资源
await self.exit_stack.aclose()
async def main():
# 服务器脚本 以标准 I/O 方式运行 MCP 服务器
servers = {
"weather": "weather_server.py",
"SQLServer":"sql_server.py",
"PythonServer":"python_server.py"
}
client = MultiServerMCPClient()
try:
await client.connect_to_servers(servers)
await client.chat_loop()
finally:
await client.cleanup()
if __name__ == "__main__":
asyncio.run(main())
python_server.py
import json
from typing import Any
import csv
import numpy as np
import pandas as pd
import random
from mcp.server.fastmcp import FastMCP
# 初始化 MCP 服务器
mcp = FastMCP("PythonServer", port=8002)
USER_AGENT = "Pythonserver-app/1.0"
@mcp.tool()
async def python_inter(py_code):
"""
运行用户提供的 Python 代码,并返回执行结果。
:param py_code: 字符串形式的 Python 代码
:return: 代码运行的最终结果
"""
g = globals()
try:
# 若是表达式,直接运行并返回
result = eval(py_code, g)
return json.dumps(str(result), ensure_ascii=False)
except Exception:
global_vars_before = set(g.keys())
try:
exec(py_code, g)
except Exception as e:
return json.dumps(f"代码执行时报错: {e}", ensure_ascii=False)
global_vars_after = set(g.keys())
new_vars = global_vars_after - global_vars_before
if new_vars:
# 只返回可序列化的变量值
safe_result = {}
for var in new_vars:
try:
json.dumps(g[var]) # 尝试序列化,确保可以转换为 JSON
safe_result[var] = g[var]
except (TypeError, OverflowError):
safe_result[var] = str(g[var]) # 如果不能序列化,则转换为字符串
return json.dumps(safe_result, ensure_ascii=False)
else:
return json.dumps("已经顺利执行代码", ensure_ascii=False)
if __name__ == "__main__":
# 以标准 I/O 方式运行 MCP 服务器
# mcp.run(transport='stdio')
# 使用 SSE 方式运行 MCP 服务器
mcp.run(transport='sse')
sql_server.py
import json
import httpx
from typing import Any
import pymysql
import csv
from mcp.server.fastmcp import FastMCP
# 初始化 MCP 服务器
mcp = FastMCP("SQLServer", port=8003)
USER_AGENT = "SQLserver-app/1.0"
g_host_ip = "192.168.0.10"
# g_host_ip = "localhost"
g_user = "root"
g_passwd = "123000000"
g_db = "school"
@mcp.tool()
async def sql_inter(sql_query):
"""
查询本地MySQL数据库,通过运行一段SQL代码来进行数据库查询。\
:param sql_query: 字符串形式的SQL查询语句,用于执行对MySQL中school数据库中各张表进行查询,并获得各表中的各类相关信息
:return:sql_query在MySQL中的运行结果。
"""
connection = pymysql.connect(
host=g_host_ip, # 数据库地址
user=g_user, # 数据库用户名
passwd=g_passwd, # 数据库密码
db=g_db, # 数据库名
charset='utf8' # 字符集选择utf8
)
try:
with connection.cursor() as cursor:
# SQL查询语句
sql = sql_query
cursor.execute(sql)
# 获取查询结果
results = cursor.fetchall()
finally:
connection.close()
    # default=str 处理日期、Decimal 等无法直接 JSON 序列化的字段
    return json.dumps(results, ensure_ascii=False, default=str)
@mcp.tool()
async def export_table_to_csv(table_name, output_file):
"""
将 MySQL 数据库中的某个表导出为 CSV 文件。
:param table_name: 需要导出的表名
:param output_file: 输出的 CSV 文件路径
"""
# 连接 MySQL 数据库
connection = pymysql.connect(
host=g_host_ip, # 数据库地址
user=g_user, # 数据库用户名
passwd=g_passwd, # 数据库密码
db=g_db, # 数据库名
charset='utf8' # 字符集选择utf8
)
try:
with connection.cursor() as cursor:
# 查询数据表的所有数据
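            # NOTE: table_name is interpolated directly into the SQL string, so it must be a trusted identifier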
query = f"SELECT * FROM {table_name};"
cursor.execute(query)
# 获取所有列名
column_names = [desc[0] for desc in cursor.description]
# 获取查询结果
rows = cursor.fetchall()
# 将数据写入 CSV 文件
with open(output_file, mode='w', newline='', encoding='utf-8') as file:
writer = csv.writer(file)
# 写入表头
writer.writerow(column_names)
# 写入数据
writer.writerows(rows)
print(f"数据表 {table_name} 已成功导出至 {output_file}")
except Exception as e:
print(f"导出失败: {e}")
finally:
connection.close()
if __name__ == "__main__":
# 以标准 I/O 方式运行 MCP 服务器
# mcp.run(transport='stdio')
# 使用 SSE 方式运行 MCP 服务器
mcp.run(transport='sse')
weather_server.py
import json
import httpx
from typing import Any, Union
from mcp.server.fastmcp import FastMCP
from datetime import datetime
import requests
# 初始化 MCP 服务器
mcp = FastMCP("WeatherServer", port=8001)
# OpenWeather API 配置
OPENWEATHER_API_BASE = "https://key.wenwen-ai.com"
API_KEY = "sk-NYsoG3VBKDiTuvdtC969F95aFc4f45379aD3854a93602327" # 请替换为你自己的 OpenWeather API Key
USER_AGENT = "v1"
# async def fetch_weather(city: str) -> dict[str, Any] | None:
# """
# 从 OpenWeather API 获取天气信息。
# :param city: 城市名称(需使用英文,如 Beijing)
# :return: 天气数据字典;若出错返回包含 error 信息的字典
# """
# params = {
# "q": city,
# "appid": API_KEY,
# "units": "metric",
# "lang": "zh_cn"
# }
# headers = {"User-Agent": USER_AGENT}
# async with httpx.AsyncClient() as client:
# try:
# response = await client.get(OPENWEATHER_API_BASE, params=params,headers=headers, timeout=60.0)
# response.raise_for_status()
# return response.json() # 返回字典类型
# except httpx.HTTPStatusError as e:
# return {"error": f"HTTP 错误: {e.response.status_code}"}
# except Exception as e:
# return {"error": f"请求失败: {str(e)}"}
# def format_weather(data: dict[str, Any] | str) -> str:
# def format_weather(data: Union[dict[str, Any], str]) -> str:
# """
# 将天气数据格式化为易读文本。
# :param data: 天气数据(可以是字典或 JSON 字符串)
# :return: 格式化后的天气信息字符串
# """
# # 如果传入的是字符串,则先转换为字典
# if isinstance(data, str):
# try:
# data = json.loads(data)
# except Exception as e:
# return f"无法解析天气数据: {e}"
# # 如果数据中包含错误信息,直接返回错误提示
# if "error" in data:
# return f"⚠️ {data['error']}"
# # 提取数据时做容错处理
# city = data.get("place", "未知")
# temp = data.get("temperature", "N/A")
# humidity = data.get("humidity", "N/A")
# wind_speed = data.get("windSpeed", "N/A")
# description = data.get("weather1", "未知")
# current_time = datetime.now()
# format_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
# return (
# f"更新时间: {format_time}\n"
# f"🌍 {city}\n"
# f"🌡 温度: {temp}°C\n"
# f"💧 湿度: {humidity}%\n"
# f"🌬 风速: {wind_speed} m/s\n"
# f"🌤 天气: {description}\n"
# )
# @mcp.tool()
# async def query_weather(city: str) -> str:
# """
# 输入指定城市的英文名称,返回今日天气查询结果。
# :param city: 城市名称(需使用英文)
# :return: 格式化后的天气信息
# """
# data = await fetch_weather(city)
# return format_weather(data)
# 国内天气server
g_id = "88888888"
g_key = "88888888"
g_base_url = "https://cn.apihz.cn/api/tianqi/tqyb.php"
async def get_weather(sheng:str, place: str) -> dict:
"""
获取指定地区的天气信息,比如place是南昌,则推出sheng是江西,最后获取南昌的天气信息并返回
Args:
sheng: 省级名称(如:江西等)
place: 地区名称(如:南昌等)
Returns:
dict: 天气信息字典,包含温度、天气状况等信息
"""
# 构建请求参数
params = {
"id": g_id,
"key": g_key,
"place": place,
"sheng": sheng
}
try:
# 发送GET请求
response = requests.get(g_base_url, params=params)
response.raise_for_status() # 检查请求是否成功
# 解析JSON响应
weather_data = response.json()
return weather_data
    except requests.exceptions.RequestException as e:
        print(f"请求失败: {e}")
        # 返回带 error 字段的字典,便于 format_weather 统一处理,避免返回 None 导致后续报错
        return {"error": f"请求失败: {e}"}
    except json.JSONDecodeError as e:
        print(f"JSON解析失败: {e}")
        return {"error": f"JSON解析失败: {e}"}
def format_weather(data: dict[str, Any] | str) -> str:
"""
将天气数据格式化为易读文本。
:param data: 天气数据(可以是字典或 JSON 字符串)
:return: 格式化后的天气信息字符串
"""
# 如果传入的是字符串,则先转换为字典
if isinstance(data, str):
try:
data = json.loads(data)
except Exception as e:
return f"无法解析天气数据: {e}"
# 如果数据中包含错误信息,直接返回错误提示
if "error" in data:
return f"⚠️ {data['error']}"
# 提取数据时做容错处理
city = data.get("place", "未知")
temp = data.get("temperature", "N/A")
humidity = data.get("humidity", "N/A")
wind_speed = data.get("windSpeed", "N/A")
description = data.get("weather1", "未知")
current_time = datetime.now()
format_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
return (
f"更新时间: {format_time}\n"
f"🌍 {city}\n"
f"🌡 温度: {temp}°C\n"
f"💧 湿度: {humidity}%\n"
f"🌬 风速: {wind_speed} m/s\n"
f"🌤 天气: {description}\n"
)
@mcp.tool()
async def query_weather(place: str, sheng: str) -> str:
"""
获取指定地区的天气信息,比如place是南昌,则推出sheng是江西,最后获取南昌的天气信息并返回;
如果输入的原本就是省级单位,则place为首府,比如sheng为江苏,则place为南京;
如果输入的是直辖市,则sheng与place相同,比如place为北京,则sheng为北京。
Args:
sheng: 省级名称(如:江西等)
place: 地区名称(如:南昌等)
:return: 格式化后的天气信息
"""
data = await get_weather(sheng, place)
return format_weather(data)
if __name__ == "__main__":
# 以标准 I/O 方式运行 MCP 服务器
# mcp.run(transport='stdio')
# print(format_weather(get_weather("江西", "南昌")))
# 使用 SSE 方式运行 MCP 服务器
mcp.run(transport='sse')
4. Startup
Note that the SSE and stdio variants differ in both code and startup procedure: the mcp.run(...) transport selected under if __name__ == "__main__" in each *_server.py must match the client you intend to run, as shown below.
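For reference, this is the block at the bottom of every *_server.py shown above; uncomment the line that matches the client you will use:
if __name__ == "__main__":
    # For client_stdio.py: run the MCP server over standard I/O
    # mcp.run(transport='stdio')
    # For client_sse.py: run the MCP server over SSE
    mcp.run(transport='sse')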
4.1 Starting in SSE mode
Run each *_server.py in its own terminal:
uv run sql_server.py
uv run python_server.py
uv run weather_server.py
Then start the client:
uv run client_sse.py
4.2 Starting in stdio mode
In stdio mode the client spawns each server script as a subprocess itself, so only the client needs to be started:
uv run client_stdio.py
5. Screenshots
SSE mode:
stdio mode: