探索MCP协议:构建AI助手,实现安全数据访问与智能化任务执行。附带实践代码,助力开发者快速上手。
原文标题:手搓Manus?MCP 原理解析与MCP Client实践
原文作者:阿里云开发者
冷月清谈:
怜星夜思:
2、文章中提到了MCP Client可以使用Python和TypeScript实现,你认为在开发AI Agent时,选择哪种语言更合适?
3、文章中提到可以将MCP Server的结果交给AI进行总结润色,也可以由系统直接接管返回。在什么场景下,你认为交给AI润色是必要的?
原文内容
阿里妹导读
本文讲述了MCP 原理解析和作者的MCP Client实践,希望能实现一个自己的 agent,让 AI 不仅能与人交流,还能协助工作。文末附源码!
MCP(Model Context Protocol)是由Anthropic于2024年底提出并开源的一种协议,旨在为AI系统(如AI编程助手、Agent等)提供安全、标准化的数据访问方式。它采用客户端-服务器架构,使AI工具(如Claude Desktop、IDE插件等)能够通过MCP客户端与MCP服务端交互,访问本地或远程数据源。
官方文档:MCP Quickstart
https://modelcontextprotocol.io/quickstart/server
基础概念
基础概念讲解总结自官方文档
MCP 是客户端-服务端架构,一个 Host 可以连接多个 MCP Server。
-
MCP Hosts(宿主程序):如Claude Desktop、IDE等,通过MCP访问数据。
-
MCP Clients(客户端):与服务器建立1:1连接,处理通信。
-
MCP Servers(服务端):轻量级程序,提供标准化的数据或工具访问能力。
-
Local Data Sources(本地数据源):如文件、数据库等,由MCP服务端安全访问。
-
Remote Services(远程服务):如API、云服务等,MCP服务端可代理访问。
协议层与传输层
-
负责消息封装(framing)、请求/响应关联、高级通信模式管理。
支持两种通信方式:
1.Stdio传输(标准输入/输出)
-
适用于本地进程间通信。
2.HTTP + SSE传输
-
服务端→客户端:Server-Sent Events(SSE)
-
客户端→服务端:HTTP POST
-
适用于远程网络通信。
所有传输均采用JSON-RPC 2.0进行消息交换。
消息类型
MCP 拥有多种类型的消息来处理不同的场景
请求(Request)(期望获得响应)
interface Request {
method: string;
params?: { ... };
}
成功响应(Result)
interface Result {
[key: string]: unknown;
}
错误响应(Error)
interface Error {
code: number;
message: string;
data?: unknown;
}
通知(Notification)(单向,无需响应)
interface Notification {
method: string;
params?: { ... };
}
生命周期
类似于三次握手,MCP客户端与MCP服务端初始化建立连接会进行以下步骤:
初始化(Initialization)
1.客户端发送initialize
请求(含协议版本、能力集)。
2.服务端返回版本及能力信息。
3.客户端发送initialized
通知确认。
4.进入正常通信阶段。
消息交换(Message Exchange)
当初始化完毕,就可以进行通信了,目前支持:
-
请求-响应模式(Request-Response):双向通信。
-
通知模式(Notification):单向消息。
终止(Termination)
有以下几种方式会关闭连接
-
主动关闭(
close()
)。 -
传输层断开。
-
错误触发终止。
实践
基础概念介绍完毕,接下来进行实践,我希望能实现一个自己的 agent,让 AI 不仅能和我交流,还能帮我干活。换一句话就是
myAgent is a general AI agent that turns your thoughts into actions. It excels at various tasks in work and life, getting everything done while you rest. [手动doge][手动doge]
先画一个图:
如图,要实现一个这样的效果,实现一个 myAgent,启动时,MCP Client建立与 MCP 服务端的连接,此时 MCP Server 上的能力或者可调用的工具会注册进来, 让 Client 感知到这个MCP服务能够干啥。
当用户与 Agent 进行交互时,Agent 会让 MCP Client 将用户的输入发送给 AI,让 AI 解析用户意图,一并发送的还有注册在 Client 上的能力集合。
我写了一个搜索助手的 MCP Server ,能力集的数据是这样的,可以看到目前只有一个 function,get_offers,可以看到工具里有它的名字,能力描述,需要的字段(包含字段类型,字段描述)。
available_tools is: [{'type': 'function', 'function': {'name': 'get_offers', 'description': 'Get product offers from API', 'parameters': {'type': 'object', 'properties': {'keywords': {'type': 'string', 'description': 'Keywords to search for products', 'default': ''}, 'pageSize': {'type': 'number', 'description': 'Number of items per page', 'minimum': 1, 'maximum': 100, 'default': 10}}}}}]
AI 会理解用户意图,决定是否用自然语言回答用户,或者选择合适的工具,告诉 client,帮我调用它。
当输入 你好
时,传给 AI 的 message 是这样的,这里系统预设了 AI 的一个身份,用于更好的完成特定领域的任务。
[
{
"role": "user",
"content": "你好"
},
{
"role": "system",
"content": "You're a helpful digital assistant who can answer questions and support users in completing tasks."
}
]
此时 client 接收到 AI 的消息后,会解析数据,当没有工具要调用时
AI返回的是这样的:
ChatCompletionMessage(content='你好!有什么可以帮助你的吗?', refusal=None, role='assistant', annotations=None, audio=None, function_call=None, tool_calls=None)
可以看到,AI 返回是带有角色信息的,然后本次并没有识别到需要调用工具的地方,因此直接返回给用户就好,当然,在工程应用时,可以进行额外的逻辑处理。
让 AI 长出手,AI调用 MCP Server流程揭秘
当输入 帮我找一些手表
时,输入是:
[{'role': 'user', 'content': '帮我找一些手表'}, {'role': 'system', 'content': 'You are a helpful assistant that can answer questions and help with tasks.'}]
第一次AI 交互
AI返回的是
AI response is
ChatCompletionMessage(content='', refusal=None, role='assistant', annotations=None, audio=None, function_call=None,
tool_calls=[ChatCompletionMessageToolCall(id='0195c8c06aaf3ea050e6d8eed17380ec', function=Function(arguments='{"keywords": "手表", "pageSize": 10}', name='get_offers'), type='function')])
可以看到,AI 识别到了用户的意图,是要寻找一些手表,并自动的选择了一个工具进行调用,根据工具使用说明,决定了选择的工具应该输入什么入参。(这里和模型很相关,是一个重要的节点,识别用户意图并决定要调用工具,有时识别的并不准确,或者返回的结构不是标准可解析的,这时就触发不了工具的调用,还会引入一些辣鸡信息,可能的解决方案是 换效果更好的模型,或者用提示词来约束模型返回,或者系统自己增加鲁棒性,提升成功率)
下面举一个效果不好的例子,大家如果知道有其他解决方法欢迎留言。
AI response is ChatCompletionMessage(content='leton\n{{"name": "get_offers", "arguments": {"keywords": "手表", "pageSize": 10}}}\n</tool_call>', refusal=None, role='assistant', annotations=None, audio=None, function_call=None, tool_calls=None)
Client 调用 MCP Server
client 接收到AI 的消息后,发现要调用工具,并且也有了工具所需的参数,就会与通过协议与 MCP Server 进行通信,告诉 MCP Server 该使用 get_offers 能力了,并且期待 MCP Server 将结果返回回来:
result = await self.session.call_tool(tool_name, tool_args)
获取 MCP Server 数据
MCP Server 不负众望,将结果返回了,可以看看返回的格式是什么样的:
meta=None content=[TextContent(type='text', text='some...product...info', annotations=None)] isError=False
MCP Client 拿到数据后,再将数据发送给 AI,输入是这样的:
{
"messages": [
{
"role": "user",
"content": "帮我找一些手表"
},
{
"role": "system",
"content": "You are a helpful assistant that can answer questions and help with tasks."
},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "0195c8c06aaf3ea050e6d8eed17380ec",
"type": "function",
"function": {
"name": "get_offers",
"arguments": "{\"keywords\": \"手表\", \"pageSize\": 10}"
}
}
]
},
{
"role": "tool",
"tool_call_id": "0195c8c06aaf3ea050e6d8eed17380ec",
"content": {
"type": "text",
"text": "some...product...info"
}
}
]
}
第二次 AI 交互
最后,AI 将 MCPServer 的结果,进行总结润色,结构化返回:
🤖AI: [📞调用工具 get_offers 🔑参数是 {'keywords': '手表', 'pageSize': 10}] 根据您的搜索,这里有几款手表供您参考:
1. 款式 ID: ididid2
价格: $0.24
供应商: Guangzhou Huapan Cosmetics Co., Ltd.
评分: **
收评次数: **
供应商年限: **年
推荐指数: ★★2. 款式 ID: ididid
价格: $3.99
供应商: Shenzhen Top Quality Electronics Technology Co., Ltd.
评分: **
收评次数: **
供应商年限: **年
推荐指数: ★★
这两款手表的评价和销售情况都还不错,您可以根据自己的需求选择合适的款式。如果还有其他问题或需要更多信息,请随时告诉我。
这里给了 它 10 个品 ,但是只总结了两个品,可能适合我之前的输入 帮我找一些手表
有关,看来AI 也会偷懒😅。
实际效果

实践后的总结
上面的交互过程,其实可以化简,如果在工程应用上,调用的 MCP Server 是一个预期内的结构化的结果或者触发某个任务时,可以不必进行二次 AI 调用。如上面的例子中,MCP Server 是一个搜索助手,内部发起调用的是搜索的接口,并进行结构化返回。此时在 AI 识别到用户意图并告诉 Client 该调用什么工具时,与 AI 的交互就可以结束了,由系统接管决定应该返回给用户什么,不必再将结果给到 AI 进行润色总结。
给到 AI 进行润色总结的好处是可以根据用户的输入,再结合工具获取的数据,更智能友好的返回给用户信息,这一点可以在工程应用时,进行衡量取舍。
将MCP Server 的结果交给 AI,在需要进行多轮交互场景是有必要的,根据 MCP Server的结果,进行分析及决策,动态调整要使用的工具,可以将一个复杂的任务交给 AI , 它会自己拆解成小任务,然后自动完成。
对于该场景,也进行了一些实践。
例如,让Al agent拆解抽象任务,并自己主动与系统进行多轮交互,完成任务场景。
后续文章我们会详细介绍具体实现方法,讲讲如何让Al在我的电脑上玩起贪吃蛇?
附录
import asyncio
import json
import os
import traceback
from typing import Optional
from contextlib import AsyncExitStack
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import OpenAI
from dotenv import load_dotenv
load_dotenv() # load environment variables from .env
class MCPClient:
def __init__(self):
# Initialize session and client objects
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
self.client = OpenAI(
api_key=os.getenv("OPENAI_API_KEY"),
base_url=os.getenv("OPENAI_BASE_URL")
)
self.model = os.getenv("OPENAI_MODEL")
self.messages = [
{
"role": "system",
"content": "You are a versatile assistant capable of answering questions, completing tasks, and intelligently invoking specialized tools to deliver optimal results."
}
]
self.available_tools = []
@staticmethod
def convert_custom_object(obj):
"""
将自定义对象转换为字典
"""
if hasattr(obj, "__dict__"): # 如果对象有 __dict__ 属性,直接使用
return obj.__dict__
elif isinstance(obj, (list, tuple)): # 如果是列表或元组,递归处理
return [MCPClient.convert_custom_object(item) for item in obj]
elif isinstance(obj, dict): # 如果是字典,递归处理值
return {key: MCPClient.convert_custom_object(value) for key, value in obj.items()}
else: # 其他类型(如字符串、数字等)直接返回
return obj
async def connect_to_server(self, server_script_path: str):
"""Connect to an MCP server
Args:
server_script_path: Path to the server script (.py or .js)
"""
is_python = server_script_path.endswith('.py')
is_js = server_script_path.endswith('.js')
if not (is_python or is_js):
raise ValueError("Server script must be a .py or .js file")
command = "python" if is_python else "node"
server_params = StdioServerParameters(
command=command,
args=[server_script_path],
env=None
)
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
self.stdio, self.write = stdio_transport
self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
await self.session.initialize()
# List available tools
response = await self.session.list_tools()
tools = response.tools
print("\nConnected to server with tools:", [tool.name for tool in tools])
async def process_query(self, query: str) -> str:
"""Process a query with multi-turn tool calling support"""
# Add user query to message history
self.messages.append({
"role": "user",
"content": query
})
# Get available tools if not already set
if not self.available_tools:
response = await self.session.list_tools()
self.available_tools = [{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema
}
} for tool in response.tools]
current_response = self.client.chat.completions.create(
model=self.model,
messages=self.messages,
tools=self.available_tools,
stream=False
)
# Print initial response if exists
if current_response.choices[0].message.content:
print("\n🤖 AI:", current_response.choices[0].message.content)
# 直到下一次交互 AI 没有选择调用工具时退出循环
while current_response.choices[0].message.tool_calls:
# AI 一次交互中可能会调用多个工具
for tool_call in current_response.choices[0].message.tool_calls:
tool_name = tool_call.function.name
try:
tool_args = json.loads(tool_call.function.arguments)
except json.JSONDecodeError:
tool_args = {}
print(f"\n🔧 调用工具 {tool_name}")
print(f"📝 参数: {tool_args}")
# Execute tool call
result = await self.session.call_tool(tool_name, tool_args)
print(f"\n工具结果: {result}")
# Add AI message and tool result to history
self.messages.append(current_response.choices[0].message)
self.messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": json.dumps(result.content)
})
# Get next response
current_response = self.client.chat.completions.create(
model=self.model,
messages=self.messages,
tools=self.available_tools,
stream=False
)
# Add final response to history
self.messages.append(current_response.choices[0].message)
return current_response.choices[0].message.content or ""
async def chat_loop(self):
"""Run an interactive chat loop"""
print("\nMCP Client Started!")
print("Type your queries or 'quit' to exit.")
while True:
try:
query = input("\nCommend: ").strip()
if query.lower() == 'quit':
break
response = await self.process_query(query)
print("\n🤖AI: " + response)
except Exception as e:
print(f"\nError occurs: {e}")
traceback.print_exc()
async def cleanup(self):
"""Clean up resources"""
await self.exit_stack.aclose()
async def main():
if len(sys.argv) < 2:
print("Usage: python client.py <path_to_server_script>")
sys.exit(1)
client = MCPClient()
try:
await client.connect_to_server(sys.argv[1])
await client.chat_loop()
finally:
await client.cleanup()
if __name__ == "__main__":
import sys
asyncio.run(main())
/**
* MCP客户端实现
*
* 提供与MCP服务器的连接、工具调用和聊天交互功能
*
* 主要功能:
* 1. 连接Python或JavaScript实现的MCP服务器
* 2. 获取服务器提供的工具列表
* 3. 通过OpenAI API处理用户查询
* 4. 自动处理工具调用链
* 5. 提供交互式命令行界面
*
* 使用说明:
* 1. 确保设置OPENAI_API_KEY环境变量
* 2. 通过命令行参数指定MCP服务器脚本路径
* 3. 启动后输入查询或'quit'退出
*
* 依赖:
* - @modelcontextprotocol/sdk: MCP协议SDK
* - openai: OpenAI API客户端
* - dotenv: 环境变量加载
*/
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
import OpenAI from "openai";
import type { ChatCompletionMessageParam } from "openai/resources/chat/completions";
import type { Tool } from "@modelcontextprotocol/sdk/types.js";
import * as dotenv from "dotenv";
import * as readline from 'readline';
// 加载环境变量配置
dotenv.config();
/**
* MCP客户端类,封装与MCP服务器的交互逻辑
*/
class MCPClient {
private openai: OpenAI; // OpenAI API客户端实例
private client: Client; // MCP协议客户端实例
private messages: ChatCompletionMessageParam[] = [
{
role: "system",
content: "You are a versatile assistant capable of answering questions, completing tasks, and intelligently invoking specialized tools to deliver optimal results."
},
]; // 聊天消息历史记录,用于维护对话上下文
private availableTools: any[] = []; // 服务器提供的可用工具列表,格式化为OpenAI工具格式
/**
* 构造函数,初始化OpenAI和MCP客户端
*
* @throws {Error} 如果OPENAI_API_KEY环境变量未设置
*
* 初始化过程:
* 1. 检查必要的环境变量
* 2. 创建OpenAI客户端实例
* 3. 创建MCP客户端实例
* 4. 初始化消息历史记录
*/
constructor() {
if (!process.env.OPENAI_API_KEY) {
throw new Error("OPENAI_API_KEY环境变量未设置");
}
this.openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
baseURL: process.env.OPENAI_BASE_URL,
});
this.client = new Client(
{
name: "my-mcp-client",
version: "1.0.0",
},
);
}
/**
* 连接到MCP服务器
*
* @param {string} serverScriptPath - 服务器脚本路径(.py或.js)
* @returns {Promise<void>} 连接成功时解析
* @throws {Error} 如果服务器脚本不是.py或.js文件,或连接失败
*
* 连接过程:
* 1. 检查脚本文件扩展名
* 2. 根据扩展名决定使用python或node执行
* 3. 通过stdio建立连接
* 4. 获取服务器工具列表并转换为OpenAI工具格式
*
* 注意事项:
* - 服务器脚本必须具有可执行权限
* - 连接成功后会自动获取工具列表
*/
async connectToServer(serverScriptPath: string) {
const isPython = serverScriptPath.endsWith('.py');
const isJs = serverScriptPath.endsWith('.js');
if (!isPython && !isJs) {
throw new Error("Server script must be a .py or .js file");
}
const command = isPython ? "python" : "node";
const transport = new StdioClientTransport({
command,
args: [serverScriptPath],
});
await this.client.connect(transport);
// 获取并转换可用工具列表
const tools = (await this.client.listTools()).tools as unknown as Tool[];
this.availableTools = tools.map(tool => ({
type: "function" as const,
function: {
name: tool.name as string,
description: tool.description as string,
parameters: {
type: "object",
properties: tool.inputSchema.properties as Record<string, unknown>,
required: tool.inputSchema.required as string[],
},
}
}));
console.log("\n已连接到服务器,可用工具:", tools.map(tool => tool.name));
}
/**
* 处理工具调用链
*
* @param {OpenAI.Chat.Completions.ChatCompletion} response - 初始OpenAI响应,包含工具调用
* @param {ChatCompletionMessageParam[]} messages - 当前消息历史记录
* @returns {Promise<OpenAI.Chat.Completions.ChatCompletion>} 最终OpenAI响应
*
* 处理流程:
* 1. 检查响应中是否包含工具调用
* 2. 循环处理所有工具调用
* 3. 解析每个工具调用的参数
* 4. 执行工具调用
* 5. 将工具结果添加到消息历史
* 6. 获取下一个OpenAI响应
*
* 错误处理:
* - 参数解析失败时使用空对象继续执行
* - 工具调用失败会抛出异常
*
* 注意事项:
* - 此方法会修改传入的messages数组
* - 可能多次调用OpenAI API
*/
private async toolCalls(response: OpenAI.Chat.Completions.ChatCompletion, messages: ChatCompletionMessageParam[]) {
let currentResponse = response;
// 直到下一次交互 AI 没有选择调用工具时退出循环
while (currentResponse.choices[0].message.tool_calls) {
if (currentResponse.choices[0].message.content) {
console.log("\n🤖 AI: tool_calls", JSON.stringify(currentResponse.choices[0].message));
}
// AI 一次交互中可能会调用多个工具
for (const toolCall of currentResponse.choices[0].message.tool_calls) {
const toolName = toolCall.function.name;
const rawArgs = toolCall.function.arguments;
let toolArgs;
try {
console.log(`rawArgs is ===== ${rawArgs}`)
toolArgs = "{}" == JSON.parse(rawArgs) ? {} : JSON.parse(rawArgs);
if (typeof toolArgs === "string") {
toolArgs = JSON.parse(toolArgs);
}
} catch (error) {
console.error('⚠️ 参数解析失败,使用空对象替代');
toolArgs = {};
}
console.log(`\n🔧 调用工具 ${toolName}`);
console.log(`📝 参数:`, toolArgs);
// 调用工具获取结果
const result = await this.client.callTool({
name: toolName,
arguments: toolArgs
});
console.log(`\n result is ${JSON.stringify(result)}`);
// 添加 AI 的响应和工具调用结果到消息历史
// console.log(`📝 currentResponse.choices[0].message:`, currentResponse.choices[0].message);
messages.push(currentResponse.choices[0].message);
messages.push({
role: "tool",
tool_call_id: toolCall.id,
content: JSON.stringify(result.content),
} as ChatCompletionMessageParam);
}
// console.log(`📝 messages: `, messages);
// 获取下一个响应
currentResponse = await this.openai.chat.completions.create({
model: process.env.OPENAI_MODEL as string,
messages: messages,
tools: this.availableTools,
});
}
return currentResponse;
}
/**
* 处理用户查询
*
* @param {string} query - 用户输入的查询字符串
* @returns {Promise<string>} AI生成的响应内容
*
* 处理流程:
* 1. 将用户查询添加到消息历史
* 2. 调用OpenAI API获取初始响应
* 3. 如果有工具调用,处理工具调用链
* 4. 返回最终响应内容
*
* 错误处理:
* - OpenAI API调用失败会抛出异常
* - 工具调用链中的错误会被捕获并记录
*
* 注意事项:
* - 此方法会更新内部消息历史
* - 可能触发多个工具调用
*/
async processQuery(query: string): Promise<string> {
// 添加用户查询到消息历史
this.messages.push({
role: "user",
content: query,
});
// 初始OpenAI API调用
let response = await this.openai.chat.completions.create({
model: process.env.OPENAI_MODEL as string,
messages: this.messages,
tools: this.availableTools,
});
// 打印初始响应
if (response.choices[0].message.content) {
console.log("\n🤖 AI:", response.choices[0].message);
}
// 处理工具调用链
if (response.choices[0].message.tool_calls) {
response = await this.toolCalls(response, this.messages);
}
// 更新消息历史
this.messages.push(response.choices[0].message);
return response.choices[0].message.content || "";
}
/**
* 启动交互式聊天循环
*
* @returns {Promise<void>} 当用户退出时解析
*
* 功能:
* 1. 持续接收用户输入
* 2. 处理用户查询
* 3. 显示AI响应
* 4. 输入'quit'退出
*
* 实现细节:
* - 使用readline模块实现交互式输入输出
* - 循环处理直到用户输入退出命令
* - 捕获并显示处理过程中的错误
*
* 注意事项:
* - 此方法是阻塞调用,会一直运行直到用户退出
* - 确保在调用前已连接服务器
*/
async chatLoop() {
console.log("\nMCP Client Started!");
console.log("Type your queries or 'quit' to exit.");
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
});
while (true) {
const query = await new Promise<string>((resolve) => {
rl.question("\nQuery: ", resolve);
});
if (query.toLowerCase() === 'quit') {
break;
}
try {
const response = await this.processQuery(query);
console.log("\n" + response);
} catch (e) {
console.error("\nError:", e instanceof Error ? e.message : String(e));
}
}
rl.close();
}
/**
* 清理资源
*
* @returns {Promise<void>} 资源清理完成后解析
*
* 关闭以下资源:
* 1. MCP客户端连接
* 2. 任何打开的句柄
*
* 最佳实践:
* - 应在程序退出前调用
* - 建议在finally块中调用以确保执行
*
* 注意事项:
* - 多次调用是安全的
* - 清理后实例不可再用
*/
async cleanup() {
if (this.client) {
await this.client.close();
}
}
}
/**
* 主函数
*
* 程序入口点,执行流程:
* 1. 检查命令行参数
* 2. 创建MCP客户端实例
* 3. 连接到指定服务器脚本
* 4. 启动交互式聊天循环
* 5. 退出时清理资源
*
* @throws {Error} 如果缺少命令行参数或连接失败
*
* 使用示例:
* ```bash
* node index.js /path/to/server.js
* ```
*
* 退出码:
* - 0: 正常退出
* - 1: 参数错误或运行时错误
*/
async function main() {
if (process.argv.length < 3) {
console.log("Usage: node dist/index.js <path_to_server_script>");
process.exit(1);
}
const client = new MCPClient();
try {
await client.connectToServer(process.argv[2]);
await client.chatLoop();
} finally {
await client.cleanup();
}
}
main().catch((error) => {
console.error("Error:", error);
process.exit(1);
});
#!/usr/bin/env node import { Server } from '@modelcontextprotocol/sdk/server/index.js'; import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'; import { CallToolRequestSchema, ErrorCode, ListToolsRequestSchema, McpError, } from '@modelcontextprotocol/sdk/types.js'; import axios from 'axios';
interface Product {
id: string;
name: string;
supplier: string;
省略…
}interface Offer {
productId: string;
title: string;
companyName: string;
省略…
}interface ApiResponse {
ret: string;
encode: string;
code: number;
traceId: string;
msg: string;
time: number;
data: {
offers: Offer;
resultCount: string;
totalCount: number;
};
}class ProductOffersServer {
private server: Server;
private baseUrl = ‘换成你要调用的 url’;constructor() {
this.server = new Server(
{
name: ‘search-assistant-server’,
version: ‘0.1.0’,
},
{
capabilities: {
tools: {},
},
}
);this.setupToolHandlers();
// Error handling
this.server.onerror = (error) => console.error(‘[MCP Error]’, error);
process.on(‘SIGINT’, async () => {
await this.server.close();
process.exit(0);
});
}private async fetchOffers(keywords?: string, pageSize?: number): Promise<Product> {
try {
const params: Record<string, any> = {};
if (keywords) params.keywords = keywords;
if (pageSize) params.pageSize = pageSize;
const response = await axios.get<ApiResponse>(this.baseUrl, { params });
return response.data.data.offers.map(offer => ({
id: offer.productId,
name: offer.title,
supplier: offer.companyName
省略…
}));
} catch (error) {
if (axios.isAxiosError(error)) {
throw new McpError(
ErrorCode.InternalError,
Failed to fetch offers: ${error.message}
);
}
throw error;
}
}private setupToolHandlers() {
this.server.setRequestHandler(ListToolsRequestSchema, async () => ({
tools: [
{
name: ‘get_offers’,
description: ‘Get product offers from API’,
inputSchema: {
type: ‘object’,
properties: {
keywords: {
type: ‘string’,
description: ‘Keywords to search for products’,
default: ‘’
},
pageSize: {
type: ‘number’,
description: ‘Number of items per page’,
minimum: 1,
maximum: 100,
default: 10
}
}
}
}
]
}));this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
if (request.params.name !== ‘get_offers’) {
throw new McpError(
ErrorCode.MethodNotFound,
Unknown tool: ${request.params.name}
);
}const args = request.params.arguments as { keywords?: string; pageSize?: number };
try {
const products = await this.fetchOffers(args.keywords, args.pageSize);
return {
content: [
{
type: ‘text’,
text: JSON.stringify({
products: products,
totalCount: products.length
}, null, 2)
}
]
};
} catch (error) {
if (error instanceof McpError) {
throw error;
}
throw new McpError(
ErrorCode.InternalError,
Failed to fetch offers: ${error}
);
}
});
}async run() {
const transport = new StdioServerTransport();
await this.server.connect(transport);
console.error(‘Product Offers MCP server running on stdio’);
}
}
const server = new ProductOffersServer();
server.run().catch(console.error);
SelectDB 实现日志高效存储与实时分析
企业级日志数据具有数据量巨大、写入和查询速度快、结构多样的特点,本方案基于阿里云云数据库 SelectDB 版构建高性能、低成本、分析能力强大的日志存储与分析解决方案,覆盖运维监控、安全审计、业务分析等场景,并通过智能索引与分级存储实现数据亚秒级检索。
点击阅读原文查看详情。