MCP协议内部详解
今天我们说下MCP的内部代码细节,通过这些细节来讲述下MCP的工作流程。
对于MCP协议,我们的理解是MCP协议主要是让大模型具有调用工具的能力,在没有MCP协议的话,想要让模型调用工具,就需要写好API调用,认证,错误处理的逻辑。MCP协议将上下文管理和工具调用能力抽象出来,让大模型无需感应如何调用,而是只关注可以用什么资源来进行调用并进行相关的调用。
在架构之中,分为了三个部分,分别是MCP Host,比如桌面端的Claude客户端,或者Tare这样的客户端。其内部会创建MCP Client,从而和多个MCP Server建立连接,并通过诸如stdio或者HTTP+SSE协议来通信。
之后是MCP Client,其将大模型的请求进行解析并封装,打包为符合JSON-RPC 2.0的消息,然后交给Server,之后将结果返回给Host,传递给大模型的上下文之中。
最后是MCP Server,其主要提供对应的工具列表和工具执行能力。比如FastMCP,会自动注册一系列的接口,并监听Client的JSON-RPC消息,收到某个调用请求的时候,调用对应的工具并封装Result进行返回。
三者在生命周期的交互流程分为三阶段
- 初始化操作,Host的MCP Client向着Server发起initialize请求,进行确认
- 进行通信,发送各类的事件进行通信
- 关闭,任意一方调用close来断开连接。
而在其中的通信周期中,可以传递的消息类型包括
Request 发送请求,等待响应
Result 返回响应
Error 请求失败时候进行返回
Notification 一个单向的通知消息
那么我们接下来,就是根据上面说的交互逻辑和交互中涉及的请求,进行MCP内部的拆解,探讨MCP中实现的BaseSession,ClientSession,ServerSession三个基础类。
blob/main/src/mcp/client/session.py
首先是最为主要的BaseSession类,作为实现底层协议层的基础类
其负责具体消息的封装,请求和响应返回。
在这个类中,承担了对于交互相关消息的统一管理和连接的生命周期管理。
其实现了诸如 发出请求 等待响应, 发出单向通知,接收并分发对端消息的逻辑。
方便派生类的扩展
针对内部,我们看下相关的伪代码
| class Session(BaseSession[RequestT, NotificationT, ResultT]):
async def send_request( self, request: RequestT, result_type: type[Result] ) -> Result: “””发送请求并等待响应。若响应包含错误,则抛出 McpError。””” # 请求发送和响应处理逻辑 async def send_notification( self, notification: NotificationT ) -> None: “””发送无需响应的单向通知。””” # 通知发送逻辑 async def _received_request( self, responder: RequestResponder[ReceiveRequestT, ResultT] ) -> None: “””处理对方发来的请求。””” # 收到请求后的业务处理 async def _received_notification( self, notification: ReceiveNotificationT ) -> None: “””处理对方发来的通知。””” # 收到通知后的业务处理 |
在这些函数的内部,我们定义了相关的泛型,这一点可以在方法签名上看出,其定义了交互相关的类型
| Generic[
SendRequestT, # 我们能发出的 Request 类型 SendNotificationT, # 我们能发出的 Notification 类型 SendResultT, # 我们发出的 Response(Result)类型 ReceiveRequestT, # 我们会接收的 Request 类型 ReceiveNotificationT# 我们会接收的 Notification 类型 ReceiveResultT # 我们会接收的 Response(Result)类型 ] |
BaseSession的核心属性包含以下几项
- _read_stream / _write_stream:AnyIO 内存流,用于异步收发 SessionMessage(JSONRPC 封装 + 元数据)。
- _request_id:内部自增计数器,用来给每个发出的请求分配唯一 ID。
- _response_streams: dict[RequestId, MemoryObjectSendStream]:每发一个请求就开一个「答案流」,收到对应 ID 的响应或错误时就往流里写,send_request 在此流上等待结果。
- _in_flight: dict[RequestId, RequestResponder]:跟踪当前正在处理但尚未完成的对端请求,方便在收到取消通知时进行清理或超时管理。
- _exit_stack:AnyIO 的异步上下文管理栈,用来统一关闭流和任务。
除此外还有两个重要函数,分别是aenter 创建一个TaskGroup并启动一个_reveive_loop 来并行监听对端消息, aexit 依次关闭所有资源,取消轮训接收
关于业务的函数则是包含
- send_request 将高层的Pydantic模型打包为JSON-RPC消息发送出去。
其内部的逻辑为
分配请求ID,使用request_id 自增
创建响应流,利用ID注册一个缓冲大小为1的内存流
构造 JSONRPCRequest:将 Pydantic 模型 request 序列化为 JSONRPC 格式,附带 ID。
之后写入发送stream,发给对端
等待响应
并在出现错误的时候,抛出错误码
最后进行清理
- _send_response() 无论结果成功与否,都会生成JSON-RPC响应病协会
根据结果分别构建SendResultT和ErrorData进行写入。
- _receive_loop()
不断循环从流中读入任意一方的JSON-RPC消息,并分配到请求处理,回应分发的管道中
常见的比如JSONRPCRequest,这种就将请求接收到,并提取相关的响应,将responder传递给_handle_incoming等钩子。
JSONRPCResponse,根据消息ID,查找对应的response_streams并进行写入。最终给上层建筑进行返回。
在BaseSession还定义了一系列的钩子函数,方便子类去实现。
_received_request(self, responder), _handle_incoming(self, req_or_notification_or_exception)
诸如ServerSession或ClientSession会重写这些方法,并进行相关逻辑实现。
这也是我们接下来要说的两个类
这里首先是ClientSession
每一次开启一个沟通,都会ClientSession,在其内部进行初始化的时候,会将自己的超时设置,各类回调注入进实例,并且发送initialize请求,收到返回后,发出一个InitializedNotification通知完成握手。
其次是ServerSession
握手的逻辑由两部分,一部分是收到客户发来的请求,另一份则是收到第三次握手通知
因此有两个修改接口 _received_request 和 _received_notification
握手完成后,还有着check_client_capability(cap)等方法来读取客户端在初始化时候声明的能力集。从而进行权限等管理。
并且为了调用更加自然,ServerSession提供了一系列的高层接口,诸如send_log_message()
send_resource_updated,send_tool_list_changed。同样地,create_message(), list_roots()等方法封装了JSON-RPC请求的构建和发送,让服务器可以像调用本地函数一样驱动模型。
在最后,整体上来看,客户端和服务器端建立连接后,双方都只需要像在本地发起函数调用一样。诸如客户端执行await session.list_tools函数。ClientSession在内部构成一个JSON-RPC请求,通过send_request发出,由ServerSession的_receive_loop接受。封装为一个RequestResponder交给工具进行函数处理,完成后调用responder.respond()
ServerSession调用底层的_send_response将结果返回。在客户端的send_request反序列化为ListToolsResult返回给调用者。

这样,我们总结一下
Session类是MCP SDK在底层对JSON-RPC 消息手法的统一封装,由框架内部创建和管理。
通过这种方式以及诸如FastMCP的高层框架的额封装,最终让我们只需要声明相关工具函数即可。
只有需要做非常定制化的底层消息拦截,自定义请求路由的时候,才考虑修改Session。