MCP协议内部详解

今天我们说下MCP的内部代码细节,通过这些细节来讲述下MCP的工作流程。

对于MCP协议,我们的理解是MCP协议主要是让大模型具有调用工具的能力,在没有MCP协议的话,想要让模型调用工具,就需要写好API调用,认证,错误处理的逻辑。MCP协议将上下文管理和工具调用能力抽象出来,让大模型无需感应如何调用,而是只关注可以用什么资源来进行调用并进行相关的调用。

在架构之中,分为了三个部分,分别是MCP Host,比如桌面端的Claude客户端,或者Tare这样的客户端。其内部会创建MCP Client,从而和多个MCP Server建立连接,并通过诸如stdio或者HTTP+SSE协议来通信。

之后是MCP Client,其将大模型的请求进行解析并封装,打包为符合JSON-RPC 2.0的消息,然后交给Server,之后将结果返回给Host,传递给大模型的上下文之中。

最后是MCP Server,其主要提供对应的工具列表和工具执行能力。比如FastMCP,会自动注册一系列的接口,并监听Client的JSON-RPC消息,收到某个调用请求的时候,调用对应的工具并封装Result进行返回。

三者在生命周期的交互流程分为三阶段

  1. 初始化操作,Host的MCP Client向着Server发起initialize请求,进行确认
  2. 进行通信,发送各类的事件进行通信
  3. 关闭,任意一方调用close来断开连接。

而在其中的通信周期中,可以传递的消息类型包括

Request 发送请求,等待响应

Result 返回响应

Error 请求失败时候进行返回

Notification 一个单向的通知消息

那么我们接下来,就是根据上面说的交互逻辑和交互中涉及的请求,进行MCP内部的拆解,探讨MCP中实现的BaseSession,ClientSession,ServerSession三个基础类。

blob/main/src/mcp/client/session.py

首先是最为主要的BaseSession类,作为实现底层协议层的基础类

其负责具体消息的封装,请求和响应返回。

在这个类中,承担了对于交互相关消息的统一管理和连接的生命周期管理。

其实现了诸如 发出请求 等待响应, 发出单向通知,接收并分发对端消息的逻辑。

方便派生类的扩展

针对内部,我们看下相关的伪代码

class Session(BaseSession[RequestT, NotificationT, ResultT]):

async def send_request(

self,

request: RequestT,

result_type: type[Result]

) -> Result:

“””发送请求并等待响应。若响应包含错误,则抛出 McpError。”””

# 请求发送和响应处理逻辑

async def send_notification(

self,

notification: NotificationT

) -> None:

“””发送无需响应的单向通知。”””

# 通知发送逻辑

async def _received_request(

self,

responder: RequestResponder[ReceiveRequestT, ResultT]

) -> None:

“””处理对方发来的请求。”””

# 收到请求后的业务处理

async def _received_notification(

self,

notification: ReceiveNotificationT

) -> None:

“””处理对方发来的通知。”””

# 收到通知后的业务处理

在这些函数的内部,我们定义了相关的泛型,这一点可以在方法签名上看出,其定义了交互相关的类型

Generic[

SendRequestT, # 我们能发出的 Request 类型

SendNotificationT, # 我们能发出的 Notification 类型

SendResultT, # 我们发出的 Response(Result)类型

ReceiveRequestT, # 我们会接收的 Request 类型

ReceiveNotificationT# 我们会接收的 Notification 类型

ReceiveResultT # 我们会接收的 Response(Result)类型

]

BaseSession的核心属性包含以下几项

  1. _read_stream / _write_stream:AnyIO 内存流,用于异步收发 SessionMessage(JSONRPC 封装 + 元数据)。
  2. _request_id:内部自增计数器,用来给每个发出的请求分配唯一 ID。
  3. _response_streams: dict[RequestId, MemoryObjectSendStream]:每发一个请求就开一个「答案流」,收到对应 ID 的响应或错误时就往流里写,send_request 在此流上等待结果。
  4. _in_flight: dict[RequestId, RequestResponder]:跟踪当前正在处理但尚未完成的对端请求,方便在收到取消通知时进行清理或超时管理。
  5. _exit_stack:AnyIO 的异步上下文管理栈,用来统一关闭流和任务。

除此外还有两个重要函数,分别是aenter 创建一个TaskGroup并启动一个_reveive_loop 来并行监听对端消息, aexit 依次关闭所有资源,取消轮训接收

关于业务的函数则是包含

  1. send_request 将高层的Pydantic模型打包为JSON-RPC消息发送出去。

其内部的逻辑为

分配请求ID,使用request_id 自增

创建响应流,利用ID注册一个缓冲大小为1的内存流

构造 JSONRPCRequest:将 Pydantic 模型 request 序列化为 JSONRPC 格式,附带 ID。

之后写入发送stream,发给对端

等待响应

并在出现错误的时候,抛出错误码

最后进行清理

  1. _send_response() 无论结果成功与否,都会生成JSON-RPC响应病协会

根据结果分别构建SendResultT和ErrorData进行写入。

  1. _receive_loop()

不断循环从流中读入任意一方的JSON-RPC消息,并分配到请求处理,回应分发的管道中

常见的比如JSONRPCRequest,这种就将请求接收到,并提取相关的响应,将responder传递给_handle_incoming等钩子。

JSONRPCResponse,根据消息ID,查找对应的response_streams并进行写入。最终给上层建筑进行返回。

在BaseSession还定义了一系列的钩子函数,方便子类去实现。

_received_request(self, responder), _handle_incoming(self, req_or_notification_or_exception)

诸如ServerSession或ClientSession会重写这些方法,并进行相关逻辑实现。

这也是我们接下来要说的两个类

这里首先是ClientSession

每一次开启一个沟通,都会ClientSession,在其内部进行初始化的时候,会将自己的超时设置,各类回调注入进实例,并且发送initialize请求,收到返回后,发出一个InitializedNotification通知完成握手。

其次是ServerSession

握手的逻辑由两部分,一部分是收到客户发来的请求,另一份则是收到第三次握手通知

因此有两个修改接口 _received_request 和 _received_notification

握手完成后,还有着check_client_capability(cap)等方法来读取客户端在初始化时候声明的能力集。从而进行权限等管理。

并且为了调用更加自然,ServerSession提供了一系列的高层接口,诸如send_log_message()

send_resource_updated,send_tool_list_changed。同样地,create_message(), list_roots()等方法封装了JSON-RPC请求的构建和发送,让服务器可以像调用本地函数一样驱动模型。

在最后,整体上来看,客户端和服务器端建立连接后,双方都只需要像在本地发起函数调用一样。诸如客户端执行await session.list_tools函数。ClientSession在内部构成一个JSON-RPC请求,通过send_request发出,由ServerSession的_receive_loop接受。封装为一个RequestResponder交给工具进行函数处理,完成后调用responder.respond()

ServerSession调用底层的_send_response将结果返回。在客户端的send_request反序列化为ListToolsResult返回给调用者。

这样,我们总结一下

Session类是MCP SDK在底层对JSON-RPC 消息手法的统一封装,由框架内部创建和管理。

通过这种方式以及诸如FastMCP的高层框架的额封装,最终让我们只需要声明相关工具函数即可。

只有需要做非常定制化的底层消息拦截,自定义请求路由的时候,才考虑修改Session。