MCP协议解析与实践:打造你的AI助手

探索MCP协议:构建AI助手,实现安全数据访问与智能化任务执行。附带实践代码,助力开发者快速上手。

原文标题:手搓Manus?MCP 原理解析与MCP Client实践

原文作者:阿里云开发者

冷月清谈:

本文深入解析了Anthropic开源的MCP(Model Context Protocol)协议,该协议旨在为AI系统提供安全、标准化的数据访问方式。文章首先介绍了MCP的基础概念,包括MCP Hosts、Clients、Servers以及数据源,阐述了协议层与传输层的工作原理,以及消息类型和生命周期。随后,作者分享了基于MCP的实践经验,旨在构建一个能够理解用户意图并调用工具的AI助手。通过一个搜索助手的MCP Server示例,详细展示了AI如何识别用户意图、选择合适的工具、与MCP Server通信并最终将结果返回给用户的全过程。文章还探讨了在工程应用中如何简化交互流程,以及如何利用MCP Server的结果进行多轮交互,实现更复杂的任务。

怜星夜思:

1、文章中提到AI在决定调用工具时,有时识别不准确或返回结构不标准,除了文中的方法外,大家有什么更好的解决思路吗?
2、文章中提到了MCP Client可以使用Python和TypeScript实现,你认为在开发AI Agent时,选择哪种语言更合适?
3、文章中提到可以将MCP Server的结果交给AI进行总结润色,也可以由系统直接接管返回。在什么场景下,你认为交给AI润色是必要的?

原文内容

阿里妹导读


本文讲述了MCP 原理解析和作者的MCP Client实践,希望能实现一个自己的 agent,让 AI 不仅能与人交流,还能协助工作。文末附源码!

MCP(Model Context Protocol)是由Anthropic于2024年底提出并开源的一种协议,旨在为AI系统(如AI编程助手、Agent等)提供安全、标准化的数据访问方式。它采用客户端-服务器架构,使AI工具(如Claude Desktop、IDE插件等)能够通过MCP客户端与MCP服务端交互,访问本地或远程数据源。  

官方文档MCP Quickstart

https://modelcontextprotocol.io/quickstart/server

基础概念

基础概念讲解总结自官方文档

MCP 是客户端-服务端架构,一个 Host 可以连接多个 MCP Server。





  • MCP Hosts(宿主程序)如Claude Desktop、IDE等,通过MCP访问数据。  

  • MCP Clients(客户端)与服务器建立1:1连接,处理通信。  

  • MCP Servers(服务端)轻量级程序,提供标准化的数据或工具访问能力。  

  • Local Data Sources(本地数据源)如文件、数据库等,由MCP服务端安全访问。  

  • Remote Services(远程服务)如API、云服务等,MCP服务端可代理访问。

协议层与传输层





协议层(Protocol Layer)
  • 负责消息封装(framing)、请求/响应关联、高级通信模式管理。

传输层(Transport Layer)

支持两种通信方式: 

1.Stdio传输(标准输入/输出)  

  • 适用于本地进程间通信。

2.HTTP + SSE传输  

  • 服务端→客户端:Server-Sent Events(SSE) 

  • 客户端→服务端:HTTP POST 

  • 适用于远程网络通信。

所有传输均采用JSON-RPC 2.0进行消息交换。  

消息类型

MCP 拥有多种类型的消息来处理不同的场景

请求(Request)(期望获得响应)

interface Request {
  method: string;
  params?: { ... };
}

成功响应(Result)

interface Result {
  [key: string]: unknown;
}

错误响应(Error)

interface Error {
  code: number;
  message: string;
  data?: unknown;
}

通知(Notification)(单向,无需响应)

interface Notification {
  method: string;
  params?: { ... };
}

生命周期

类似于三次握手,MCP客户端与MCP服务端初始化建立连接会进行以下步骤:





初始化(Initialization)  

1.客户端发送initialize请求(含协议版本、能力集)。  

2.服务端返回版本及能力信息。 

3.客户端发送initialized通知确认。  

4.进入正常通信阶段。

消息交换(Message Exchange)  

当初始化完毕,就可以进行通信了,目前支持:

  • 请求-响应模式(Request-Response):双向通信。  

  • 通知模式(Notification):单向消息。

终止(Termination)  

有以下几种方式会关闭连接

  • 主动关闭(close())。  

  • 传输层断开。 

  • 错误触发终止。

实践

基础概念介绍完毕,接下来进行实践,我希望能实现一个自己的 agent,让 AI 不仅能和我交流,还能帮我干活。换一句话就是

myAgent is a general AI agent that turns your thoughts into actions. It excels at various tasks in work and life, getting everything done while you rest.

 [手动doge][手动doge]

先画一个图:





如图,要实现一个这样的效果,实现一个 myAgent,启动时,MCP Client建立与 MCP 服务端的连接,此时 MCP Server 上的能力或者可调用的工具会注册进来, 让 Client 感知到这个MCP服务能够干啥。

当用户与 Agent 进行交互时,Agent 会让 MCP Client 将用户的输入发送给 AI,让 AI 解析用户意图,一并发送的还有注册在 Client 上的能力集合。

我写了一个搜索助手的 MCP Server ,能力集的数据是这样的,可以看到目前只有一个 function,get_offers,可以看到工具里有它的名字,能力描述,需要的字段(包含字段类型,字段描述)。

available_tools is: [{'type': 'function', 'function': {'name': 'get_offers', 'description': 'Get product offers from API', 'parameters': {'type': 'object', 'properties': {'keywords': {'type': 'string', 'description': 'Keywords to search for products', 'default': ''}, 'pageSize': {'type': 'number', 'description': 'Number of items per page', 'minimum': 1, 'maximum': 100, 'default': 10}}}}}]

AI 会理解用户意图,决定是否用自然语言回答用户,或者选择合适的工具,告诉 client,帮我调用它。

当输入 你好 时,传给 AI 的 message 是这样的,这里系统预设了 AI 的一个身份,用于更好的完成特定领域的任务。

[
    {
        "role": "user",
        "content": "你好"
    },
    {
        "role": "system",
        "content": "You're a helpful digital assistant who can answer questions and support users in completing tasks."
    }
]

此时 client 接收到 AI 的消息后,会解析数据,当没有工具要调用时

AI返回的是这样的:

ChatCompletionMessage(content='你好!有什么可以帮助你的吗?', refusal=None, role='assistant', annotations=None, audio=None, function_call=None, tool_calls=None)

可以看到,AI 返回是带有角色信息的,然后本次并没有识别到需要调用工具的地方,因此直接返回给用户就好,当然,在工程应用时,可以进行额外的逻辑处理。

让 AI 长出手,AI调用 MCP Server流程揭秘

当输入 帮我找一些手表 时,输入是:

[{'role': 'user', 'content': '帮我找一些手表'}, {'role': 'system', 'content': 'You are a helpful assistant that can answer questions and help with tasks.'}]
第一次AI 交互

AI返回的是

AI response is 
ChatCompletionMessage(content='', refusal=None, role='assistant', annotations=None, audio=None, function_call=None, 
tool_calls=[ChatCompletionMessageToolCall(id='0195c8c06aaf3ea050e6d8eed17380ec', function=Function(arguments='{"keywords": "手表", "pageSize": 10}', name='get_offers'), type='function')])

可以看到,AI 识别到了用户的意图,是要寻找一些手表,并自动的选择了一个工具进行调用,根据工具使用说明,决定了选择的工具应该输入什么入参。(这里和模型很相关,是一个重要的节点,识别用户意图并决定要调用工具,有时识别的并不准确,或者返回的结构不是标准可解析的,这时就触发不了工具的调用,还会引入一些辣鸡信息,可能的解决方案是 换效果更好的模型,或者用提示词来约束模型返回,或者系统自己增加鲁棒性,提升成功率)

下面举一个效果不好的例子,大家如果知道有其他解决方法欢迎留言。

AI response is ChatCompletionMessage(content='leton\n{{"name": "get_offers", "arguments": {"keywords": "手表", "pageSize": 10}}}\n</tool_call>', refusal=None, role='assistant', annotations=None, audio=None, function_call=None, tool_calls=None)
Client 调用 MCP Server

client 接收到AI 的消息后,发现要调用工具,并且也有了工具所需的参数,就会与通过协议与 MCP Server 进行通信,告诉 MCP Server 该使用 get_offers 能力了,并且期待 MCP Server 将结果返回回来:

result = await self.session.call_tool(tool_name, tool_args)
获取 MCP Server 数据

 MCP Server 不负众望,将结果返回了,可以看看返回的格式是什么样的:

meta=None content=[TextContent(type='text', text='some...product...info', annotations=None)] isError=False

MCP Client 拿到数据后,再将数据发送给 AI,输入是这样的:

{
  "messages": [
    {
      "role": "user",
      "content": "帮我找一些手表"
    },
    {
      "role": "system",
      "content": "You are a helpful assistant that can answer questions and help with tasks."
    },
    {
      "role": "assistant",
      "content": "",
      "tool_calls": [
        {
          "id": "0195c8c06aaf3ea050e6d8eed17380ec",
          "type": "function",
          "function": {
            "name": "get_offers",
            "arguments": "{\"keywords\": \"手表\", \"pageSize\": 10}"
          }
        }
      ]
    },
    {
      "role": "tool",
      "tool_call_id": "0195c8c06aaf3ea050e6d8eed17380ec",
      "content": {
        "type": "text",
        "text": "some...product...info"
      }
    }
  ]
}
第二次 AI 交互

最后,AI 将 MCPServer 的结果,进行总结润色,结构化返回:

🤖AI: [📞调用工具 get_offers 🔑参数是 {'keywords': '手表', 'pageSize': 10}]
根据您的搜索,这里有几款手表供您参考:

1. 款式 ID: ididid2
   价格: $0.24
   供应商: Guangzhou Huapan Cosmetics Co., Ltd.
   评分: **
   收评次数: **
   供应商年限: **年
   推荐指数: ★★

2. 款式 ID: ididid
   价格: $3.99
   供应商: Shenzhen Top Quality Electronics Technology Co., Ltd.
   评分: **
   收评次数: **
   供应商年限: **年
   推荐指数: ★★

这两款手表的评价和销售情况都还不错,您可以根据自己的需求选择合适的款式。如果还有其他问题或需要更多信息,请随时告诉我。

这里给了 它 10 个品 ,但是只总结了两个品,可能适合我之前的输入 帮我找一些手表 有关,看来AI 也会偷懒😅。

实际效果

图片

实践后的总结

上面的交互过程,其实可以化简,如果在工程应用上,调用的 MCP Server 是一个预期内的结构化的结果或者触发某个任务时,可以不必进行二次 AI 调用。如上面的例子中,MCP Server 是一个搜索助手,内部发起调用的是搜索的接口,并进行结构化返回。此时在 AI 识别到用户意图并告诉 Client 该调用什么工具时,与 AI 的交互就可以结束了,由系统接管决定应该返回给用户什么,不必再将结果给到 AI 进行润色总结。

给到 AI 进行润色总结的好处是可以根据用户的输入,再结合工具获取的数据,更智能友好的返回给用户信息,这一点可以在工程应用时,进行衡量取舍。

将MCP Server 的结果交给 AI,在需要进行多轮交互场景是有必要的,根据 MCP Server的结果,进行分析及决策,动态调整要使用的工具,可以将一个复杂的任务交给 AI , 它会自己拆解成小任务,然后自动完成。 

对于该场景,也进行了一些实践。

例如,让Al agent拆解抽象任务,并自己主动与系统进行多轮交互,完成任务场景。

后续文章我们会详细介绍具体实现方法,讲讲如何让Al在我的电脑上玩起贪吃蛇?

图片

附录

MCP Client代码 (Python实现)
import asyncio
import json
import os
import traceback
from typing import Optional
from contextlib import AsyncExitStack
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import OpenAI
from dotenv import load_dotenv
load_dotenv()  # load environment variables from .env
class MCPClient:
    def __init__(self):
        # Initialize session and client objects
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        self.client = OpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            base_url=os.getenv("OPENAI_BASE_URL")
        )
        self.model = os.getenv("OPENAI_MODEL")
        self.messages = [
            {
                "role": "system",
                "content": "You are a versatile assistant capable of answering questions, completing tasks, and intelligently invoking specialized tools to deliver optimal results."
            }
        ]
        self.available_tools = []
    
    @staticmethod
    def convert_custom_object(obj):
        """
        将自定义对象转换为字典
        """
        if hasattr(obj, "__dict__"):  # 如果对象有 __dict__ 属性,直接使用
            return obj.__dict__
        elif isinstance(obj, (list, tuple)):  # 如果是列表或元组,递归处理
            return [MCPClient.convert_custom_object(item) for item in obj]
        elif isinstance(obj, dict):  # 如果是字典,递归处理值
            return {key: MCPClient.convert_custom_object(value) for key, value in obj.items()}
        else:  # 其他类型(如字符串、数字等)直接返回
            return obj
        
    async def connect_to_server(self, server_script_path: str):
        """Connect to an MCP server
        
        Args:
            server_script_path: Path to the server script (.py or .js)
        """
        is_python = server_script_path.endswith('.py')
        is_js = server_script_path.endswith('.js')
        if not (is_python or is_js):
            raise ValueError("Server script must be a .py or .js file")
            
        command = "python" if is_python else "node"
        server_params = StdioServerParameters(
            command=command,
            args=[server_script_path],
            env=None
        )
        
        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
        
        await self.session.initialize()
        
        # List available tools
        response = await self.session.list_tools()
        tools = response.tools
        print("\nConnected to server with tools:", [tool.name for tool in tools])
    async def process_query(self, query: str) -> str:
        """Process a query with multi-turn tool calling support"""
        # Add user query to message history
        self.messages.append({
            "role": "user",
            "content": query
        })
        # Get available tools if not already set
        if not self.available_tools:
            response = await self.session.list_tools()
            self.available_tools = [{
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.inputSchema
                }
            } for tool in response.tools]
        current_response = self.client.chat.completions.create(
            model=self.model,
            messages=self.messages,
            tools=self.available_tools,
            stream=False
        )
        # Print initial response if exists
        if current_response.choices[0].message.content:
            print("\n🤖 AI:", current_response.choices[0].message.content)
        # 直到下一次交互 AI 没有选择调用工具时退出循环
        while current_response.choices[0].message.tool_calls:
            # AI 一次交互中可能会调用多个工具
            for tool_call in current_response.choices[0].message.tool_calls:
                tool_name = tool_call.function.name
                try:
                    tool_args = json.loads(tool_call.function.arguments)
                except json.JSONDecodeError:
                    tool_args = {}
                print(f"\n🔧 调用工具 {tool_name}")
                print(f"📝 参数: {tool_args}")
                # Execute tool call
                result = await self.session.call_tool(tool_name, tool_args)
                print(f"\n工具结果: {result}")
                # Add AI message and tool result to history
                self.messages.append(current_response.choices[0].message)
                self.messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": json.dumps(result.content)
                })
            # Get next response
            current_response = self.client.chat.completions.create(
                model=self.model,
                messages=self.messages,
                tools=self.available_tools,
                stream=False
            )
        # Add final response to history
        self.messages.append(current_response.choices[0].message)
        return current_response.choices[0].message.content or ""
    async def chat_loop(self):
        """Run an interactive chat loop"""
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")
        
        while True:
            try:
                query = input("\nCommend: ").strip()
                
                if query.lower() == 'quit':
                    break
                    
                response = await self.process_query(query)
                print("\n🤖AI: " + response)
                    
            except Exception as e:
                print(f"\nError occurs: {e}")
                traceback.print_exc()
    
    async def cleanup(self):
        """Clean up resources"""
        await self.exit_stack.aclose()
async def main():
    if len(sys.argv) < 2:
        print("Usage: python client.py <path_to_server_script>")
        sys.exit(1)
        
    client = MCPClient()
    try:
        await client.connect_to_server(sys.argv[1])
        await client.chat_loop()
    finally:
        await client.cleanup()
if __name__ == "__main__":
    import sys
    asyncio.run(main())
MCP Client代码 (TypeScript实现)
/**
 * MCP客户端实现
 * 
 * 提供与MCP服务器的连接、工具调用和聊天交互功能
 * 
 * 主要功能:
 * 1. 连接Python或JavaScript实现的MCP服务器
 * 2. 获取服务器提供的工具列表
 * 3. 通过OpenAI API处理用户查询
 * 4. 自动处理工具调用链
 * 5. 提供交互式命令行界面
 * 
 * 使用说明:
 * 1. 确保设置OPENAI_API_KEY环境变量
 * 2. 通过命令行参数指定MCP服务器脚本路径
 * 3. 启动后输入查询或'quit'退出
 * 
 * 依赖:
 * - @modelcontextprotocol/sdk: MCP协议SDK
 * - openai: OpenAI API客户端
 * - dotenv: 环境变量加载
 */
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
import OpenAI from "openai";
import type { ChatCompletionMessageParam } from "openai/resources/chat/completions";
import type { Tool } from "@modelcontextprotocol/sdk/types.js";
import * as dotenv from "dotenv";
import * as readline from 'readline';
// 加载环境变量配置
dotenv.config();
/**
 * MCP客户端类,封装与MCP服务器的交互逻辑
 */
class MCPClient {
    private openai: OpenAI; // OpenAI API客户端实例
    private client: Client; // MCP协议客户端实例
    private messages: ChatCompletionMessageParam[] = [
        {
            role: "system",
            content: "You are a versatile assistant capable of answering questions, completing tasks, and intelligently invoking specialized tools to deliver optimal results."
        },
    ]; // 聊天消息历史记录,用于维护对话上下文
    private availableTools: any[] = []; // 服务器提供的可用工具列表,格式化为OpenAI工具格式
    /**
     * 构造函数,初始化OpenAI和MCP客户端
     * 
     * @throws {Error} 如果OPENAI_API_KEY环境变量未设置
     * 
     * 初始化过程:
     * 1. 检查必要的环境变量
     * 2. 创建OpenAI客户端实例
     * 3. 创建MCP客户端实例
     * 4. 初始化消息历史记录
     */
    constructor() {
        if (!process.env.OPENAI_API_KEY) {
            throw new Error("OPENAI_API_KEY环境变量未设置");
        }
        this.openai = new OpenAI({
            apiKey: process.env.OPENAI_API_KEY,
            baseURL: process.env.OPENAI_BASE_URL,
        });
        this.client = new Client(
            {
                name: "my-mcp-client",
                version: "1.0.0",
            },
        );
    }
    /**
     * 连接到MCP服务器
     * 
     * @param {string} serverScriptPath - 服务器脚本路径(.py或.js)
     * @returns {Promise<void>} 连接成功时解析
     * @throws {Error} 如果服务器脚本不是.py或.js文件,或连接失败
     * 
     * 连接过程:
     * 1. 检查脚本文件扩展名
     * 2. 根据扩展名决定使用python或node执行
     * 3. 通过stdio建立连接
     * 4. 获取服务器工具列表并转换为OpenAI工具格式
     * 
     * 注意事项:
     * - 服务器脚本必须具有可执行权限
     * - 连接成功后会自动获取工具列表
     */
    async connectToServer(serverScriptPath: string) {
        const isPython = serverScriptPath.endsWith('.py');
        const isJs = serverScriptPath.endsWith('.js');
        if (!isPython && !isJs) {
            throw new Error("Server script must be a .py or .js file");
        }
        const command = isPython ? "python" : "node";
        const transport = new StdioClientTransport({
            command,
            args: [serverScriptPath],
        });
        await this.client.connect(transport);
        // 获取并转换可用工具列表
        const tools = (await this.client.listTools()).tools as unknown as Tool[];
        this.availableTools = tools.map(tool => ({
            type: "function" as const,
            function: {
                name: tool.name as string,
                description: tool.description as string,
                parameters: {
                    type: "object",
                    properties: tool.inputSchema.properties as Record<string, unknown>,
                    required: tool.inputSchema.required as string[],
                },
            }
        }));
        console.log("\n已连接到服务器,可用工具:", tools.map(tool => tool.name));
    }
    /**
     * 处理工具调用链
     * 
     * @param {OpenAI.Chat.Completions.ChatCompletion} response - 初始OpenAI响应,包含工具调用
     * @param {ChatCompletionMessageParam[]} messages - 当前消息历史记录
     * @returns {Promise<OpenAI.Chat.Completions.ChatCompletion>} 最终OpenAI响应
     * 
     * 处理流程:
     * 1. 检查响应中是否包含工具调用
     * 2. 循环处理所有工具调用
     * 3. 解析每个工具调用的参数
     * 4. 执行工具调用
     * 5. 将工具结果添加到消息历史
     * 6. 获取下一个OpenAI响应
     * 
     * 错误处理:
     * - 参数解析失败时使用空对象继续执行
     * - 工具调用失败会抛出异常
     * 
     * 注意事项:
     * - 此方法会修改传入的messages数组
     * - 可能多次调用OpenAI API
     */
    private async toolCalls(response: OpenAI.Chat.Completions.ChatCompletion, messages: ChatCompletionMessageParam[]) {
        let currentResponse = response;
        // 直到下一次交互 AI 没有选择调用工具时退出循环
        while (currentResponse.choices[0].message.tool_calls) {
            if (currentResponse.choices[0].message.content) {
                console.log("\n🤖 AI: tool_calls", JSON.stringify(currentResponse.choices[0].message));
            }
            // AI 一次交互中可能会调用多个工具
            for (const toolCall of currentResponse.choices[0].message.tool_calls) {
                const toolName = toolCall.function.name;
                const rawArgs = toolCall.function.arguments;
                let toolArgs;
                try {
                    console.log(`rawArgs is ===== ${rawArgs}`)
                    toolArgs = "{}" == JSON.parse(rawArgs) ? {} : JSON.parse(rawArgs);
                    if (typeof toolArgs === "string") {
                        toolArgs = JSON.parse(toolArgs);
                    }
                } catch (error) {
                    console.error('⚠️ 参数解析失败,使用空对象替代');
                    toolArgs = {};
                }
                console.log(`\n🔧 调用工具 ${toolName}`);
                console.log(`📝 参数:`, toolArgs);
                // 调用工具获取结果
                const result = await this.client.callTool({
                    name: toolName,
                    arguments: toolArgs
                });
                console.log(`\n result is ${JSON.stringify(result)}`);
                // 添加 AI 的响应和工具调用结果到消息历史
                // console.log(`📝 currentResponse.choices[0].message:`, currentResponse.choices[0].message);
                messages.push(currentResponse.choices[0].message);
                messages.push({
                    role: "tool",
                    tool_call_id: toolCall.id,
                    content: JSON.stringify(result.content),
                } as ChatCompletionMessageParam);
            }
            // console.log(`📝 messages: `, messages);
            // 获取下一个响应
            currentResponse = await this.openai.chat.completions.create({
                model: process.env.OPENAI_MODEL as string,
                messages: messages,
                tools: this.availableTools,
            });
        }
        return currentResponse;
    }
    /**
     * 处理用户查询
     * 
     * @param {string} query - 用户输入的查询字符串
     * @returns {Promise<string>} AI生成的响应内容
     * 
     * 处理流程:
     * 1. 将用户查询添加到消息历史
     * 2. 调用OpenAI API获取初始响应
     * 3. 如果有工具调用,处理工具调用链
     * 4. 返回最终响应内容
     * 
     * 错误处理:
     * - OpenAI API调用失败会抛出异常
     * - 工具调用链中的错误会被捕获并记录
     * 
     * 注意事项:
     * - 此方法会更新内部消息历史
     * - 可能触发多个工具调用
     */
    async processQuery(query: string): Promise<string> {
        // 添加用户查询到消息历史
        this.messages.push({
            role: "user",
            content: query,
        });
        // 初始OpenAI API调用
        let response = await this.openai.chat.completions.create({
            model: process.env.OPENAI_MODEL as string,
            messages: this.messages,
            tools: this.availableTools,
        });
        // 打印初始响应
        if (response.choices[0].message.content) {
            console.log("\n🤖 AI:", response.choices[0].message);
        }
        // 处理工具调用链
        if (response.choices[0].message.tool_calls) {
            response = await this.toolCalls(response, this.messages);
        }
        // 更新消息历史
        this.messages.push(response.choices[0].message);
        return response.choices[0].message.content || "";
    }
    /**
     * 启动交互式聊天循环
     * 
     * @returns {Promise<void>} 当用户退出时解析
     * 
     * 功能:
     * 1. 持续接收用户输入
     * 2. 处理用户查询
     * 3. 显示AI响应
     * 4. 输入'quit'退出
     * 
     * 实现细节:
     * - 使用readline模块实现交互式输入输出
     * - 循环处理直到用户输入退出命令
     * - 捕获并显示处理过程中的错误
     * 
     * 注意事项:
     * - 此方法是阻塞调用,会一直运行直到用户退出
     * - 确保在调用前已连接服务器
     */
    async chatLoop() {
        console.log("\nMCP Client Started!");
        console.log("Type your queries or 'quit' to exit.");
        const rl = readline.createInterface({
            input: process.stdin,
            output: process.stdout,
        });
        while (true) {
            const query = await new Promise<string>((resolve) => {
                rl.question("\nQuery: ", resolve);
            });
            if (query.toLowerCase() === 'quit') {
                break;
            }
            try {
                const response = await this.processQuery(query);
                console.log("\n" + response);
            } catch (e) {
                console.error("\nError:", e instanceof Error ? e.message : String(e));
            }
        }
        rl.close();
    }
    /**
     * 清理资源
     * 
     * @returns {Promise<void>} 资源清理完成后解析
     * 
     * 关闭以下资源:
     * 1. MCP客户端连接
     * 2. 任何打开的句柄
     * 
     * 最佳实践:
     * - 应在程序退出前调用
     * - 建议在finally块中调用以确保执行
     * 
     * 注意事项:
     * - 多次调用是安全的
     * - 清理后实例不可再用
     */
    async cleanup() {
        if (this.client) {
            await this.client.close();
        }
    }
}
/**
 * 主函数
 * 
 * 程序入口点,执行流程:
 * 1. 检查命令行参数
 * 2. 创建MCP客户端实例
 * 3. 连接到指定服务器脚本
 * 4. 启动交互式聊天循环
 * 5. 退出时清理资源
 * 
 * @throws {Error} 如果缺少命令行参数或连接失败
 * 
 * 使用示例:
 * ```bash
 * node index.js /path/to/server.js
 * ```
 * 
 * 退出码:
 * - 0: 正常退出
 * - 1: 参数错误或运行时错误
 */
async function main() {
    if (process.argv.length < 3) {
        console.log("Usage: node dist/index.js <path_to_server_script>");
        process.exit(1);
    }
    const client = new MCPClient();
    try {
        await client.connectToServer(process.argv[2]);
        await client.chatLoop();
    } finally {
        await client.cleanup();
    }
}
main().catch((error) => {
    console.error("Error:", error);
    process.exit(1);
});
MCP Server 代码 (TypeScript实现)
#!/usr/bin/env node
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
  CallToolRequestSchema,
  ErrorCode,
  ListToolsRequestSchema,
  McpError,
} from '@modelcontextprotocol/sdk/types.js';
import axios from 'axios';

interface Product {
  id: string;
  name: string;
  supplier: string;
  省略…
}

interface Offer {
  productId: string;
  title: string;
  companyName: string;
  省略…
}

interface ApiResponse {
  ret: string;
  encode: string;
  code: number;
  traceId: string;
  msg: string;
  time: number;
  data: {
    offers: Offer;
    resultCount: string;
    totalCount: number;
  };
}

class ProductOffersServer {
  private server: Server;
  private baseUrl = ‘换成你要调用的 url’;

  constructor() {
    this.server = new Server(
      {
        name: ‘search-assistant-server’,
        version: ‘0.1.0’,
      },
      {
        capabilities: {
          tools: {},
        },
      }
    );

    this.setupToolHandlers();
    
    // Error handling
    this.server.onerror = (error) => console.error(‘[MCP Error]’, error);
    process.on(‘SIGINT’, async () => {
      await this.server.close();
      process.exit(0);
    });
  }

  private async fetchOffers(keywords?: string, pageSize?: number): Promise<Product> {
    try {
      const params: Record<string, any> = {};
      if (keywords) params.keywords = keywords;
      if (pageSize) params.pageSize = pageSize;
      const response = await axios.get<ApiResponse>(this.baseUrl, { params });
      
      return response.data.data.offers.map(offer => ({
        id: offer.productId,
        name: offer.title,
        supplier: offer.companyName
        省略…
      }));
    } catch (error) {
      if (axios.isAxiosError(error)) {
        throw new McpError(
          ErrorCode.InternalError,
          Failed to fetch offers:&nbsp;${error.message}
        );
      }
      throw error;
    }
  }

  private setupToolHandlers() {
    this.server.setRequestHandler(ListToolsRequestSchema, async () => ({
      tools: [
        {
          name: ‘get_offers’,
          description: ‘Get product offers from API’,
          inputSchema: {
            type: ‘object’,
            properties: {
              keywords: {
                type: ‘string’,
                description: ‘Keywords to search for products’,
                default: ‘’
              },
              pageSize: {
                type: ‘number’,
                description: ‘Number of items per page’,
                minimum: 1,
                maximum: 100,
                default: 10
              }
            }
          }
        }
      ]
    }));

    this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
      if (request.params.name !== ‘get_offers’) {
        throw new McpError(
          ErrorCode.MethodNotFound,
          Unknown tool:&nbsp;${request.params.name}
        );
      }

      const args = request.params.arguments as { keywords?: string; pageSize?: number };

      try {
        const products = await this.fetchOffers(args.keywords, args.pageSize);
        return {
          content: [
            {
              type: ‘text’,
              text: JSON.stringify({
                products: products,
                totalCount: products.length
              }, null, 2)
            }
          ]
        };
      } catch (error) {
        if (error instanceof McpError) {
          throw error;
        }
        throw new McpError(
          ErrorCode.InternalError,
          Failed to fetch offers:&nbsp;${error}
        );
      }
    });
  }

  async run() {
    const transport = new StdioServerTransport();
    await this.server.connect(transport);
    console.error(‘Product Offers MCP server running on stdio’);
  }
}

const server = new ProductOffersServer();
server.run().catch(console.error);

SelectDB 实现日志高效存储与实时分析


企业级日志数据具有数据量巨大、写入和查询速度快、结构多样的特点,本方案基于阿里云云数据库 SelectDB 版构建高性能、低成本、分析能力强大的日志存储与分析解决方案,覆盖运维监控、安全审计、业务分析等场景,并通过智能索引与分级存储实现数据亚秒级检索。


点击阅读原文查看详情。

Python一时爽,重构火葬场。TypeScript 虽然前期麻烦点,但后期维护起来真的省心。特别是大型项目,类型检查能帮你避免好多低级错误。而且现在Vscode 对 TS 的支持也很好,写起来也挺舒服的。

可以考虑在模型输出后增加一个校验层,使用正则表达式或JSON Schema等方式对模型输出进行校验,如果不符合预期则进行重试或者纠正。此外,还可以引入人工干预,对识别不准确的案例进行标注,然后用这些数据微调模型,提高识别准确率。

我觉得这取决于你团队的技术栈。如果团队成员更熟悉JavaScript/TypeScript,那么选择TypeScript可以降低学习成本,提高开发效率。如果团队更擅长Python,那么就选择Python。技术选型要服务于团队和项目,而不是反过来。

当MCP Server返回的结果比较raw,或者缺乏上下文信息时,交给AI润色是很有必要的。AI可以根据用户的输入,结合工具返回的数据,生成更自然、更易理解的回复。例如,当用户查询一个比较宽泛的问题,而工具返回了大量结果时,AI可以对结果进行过滤、排序和总结,提供更符合用户需求的信息。

如果是快速原型验证,我会选择Python。Python生态中有大量的AI/ML库,例如OpenAI的Python SDK使用起来非常方便。但如果是对性能有较高要求的生产环境,TypeScript可能更合适,它可以提供更好的类型安全性和编译时错误检查,有助于构建更健壮的系统。

别想太复杂,问题往往出在数据质量上。检查一下你的训练数据,看看是不是有噪声数据或者标注错误。清洗数据,重新训练,可能效果比你想象的要好。实在不行,就加钱上更牛的模型。

我觉得可以尝试引入Prompt Engineering的技巧,在Prompt中加入更明确的约束条件,例如指定返回的JSON格式,或者加入一些示例,让模型更好地理解任务要求。另外,可以考虑使用一些专门针对工具调用进行优化的模型微调方法。

看情况吧,如果只是简单的信息展示,系统接管也OK。但如果涉及到情感交流、复杂推理或者需要根据用户反馈进行调整的场景,那还是得交给AI。记住,AI的价值在于它的智能,而不是简单地替代人工。

如果你的产品注重用户体验,那么交给AI润色绝对是加分项。AI可以像一个贴心的助手一样,理解用户的语气和情感,生成更人性化的回复。此外,在多轮对话的场景下,AI还可以根据上下文信息,动态调整回复的内容和风格,提供更流畅的对话体验。