MCP协议二:MCP协议下的RAG框架
现在我们基于MCP协议开发一个MCP+FAISS的RAG框架。
MCP服务将具备端到端的索引,检索以及工具提供能力。
原生的RAG框架由三部分组成,分别是嵌入-检索-生成三部分构成、
首先将用户问题转换为向量,然后基于向量来检索相关文档块,之后将文档块和问题一起作为上下文,交给大模型来检索并生成答案。
而我们的思路则是将嵌入的相关服务和检索的相关服务以工具的方式通过MCP协议提供给Client。让Client可以调用对应的工具获取文档并生成返回结果。
这里主要的代码存在于服务器端 index_docs和retrieve_docs 用于实现向量检索和文档更新
Index_docs 上传并将新文档的向量添加到索引中,将文本保存在_docs列表中,retrieve_docs用于查询向量,返回对应的文本片段。
那么我们具体看下对应的代码实现
对应的服务端的代码,首先是导入相关的苦,主要的是fastmcp的导入。
|
from typing import List
import faiss import numpy as np from openai import OpenAI from mcp.server.fastmcp import FastMCP from dotenv import load_dotenv print(“load_dotenv”) load_dotenv() |
其次是在fastmcp模块中初始化一个mcp server
mcp = FastMcp(“rag”)
FastMCP是一个基于MCP协议封装的高层框架,可以基于简单的装饰器和管理器来快速创建和部署工具,资源以及提示。
其抹去了用户不需要关系的诸如 list_tools call_tool list_resources list_prompts这些交互,一方面支持更多的交互协议,一方面让开发者只需要聚焦于具体的Python函数实现上。
FastMCP支持标准的IO,SSE,流式HTTP等传输协议,提供run的一键式启动
这里我们以一个工具的封装为示例看下
|
@mcp.tool()
async def index_docs(docs: List[str]) -> str: “””将一批文档加入索引。 Args: docs: 文本列表 “”” global _index, _docs embeddings = await embed_text(docs) _index.add(embeddings.astype(‘float32’)) _docs.extend(docs) return f”已索引 {len(docs)} 篇文档,总文档数:{len(_docs)}” |
其中的核心无非就是index变量的初始化以及embed_text函数的实现
最终就是返回给客户端新增了多少个索引
关于_index变量的初始化,是进行一个数组的初始化
_index: faiss.IndexFlatL2 = faiss.IndexFlatL2(1536)
_docs: List[str] = []
然后是关于这个embed_text函数的实现
|
# OpenAI API(用于生成嵌入)
openai = OpenAI() async def embed_text(texts: List[str]) -> np.ndarray: resp = openai.embeddings.create( model=”text-embedding-3-small”, input=texts, encoding_format=”float” ) return np.array([d.embedding for d in resp.data], dtype=’float32′) |
利用OpenAI客户端异步调用text-embedding-3-small模型,一次性将字符串列表生成嵌入向量,将返回结果的d.embedding提取,组成一个float32类型的NumPy数组,进行返回。
并将其塞入到数组之中。
之后是retrieve_docs的相关文档
|
@mcp.tool()
async def retrieve_docs(query: str, top_k: int = 3) -> str: “””检索最相关文档片段。 Args: query: 用户查询 top_k: 返回的文档数 “”” q_emb = await embed_text([query]) D, I = _index.search(q_emb.astype(‘float32’), top_k) results = [f”[{i}] {_docs[i]}” for i in I[0] if i < len(_docs)] return “\n\n”.join(results) if results else “未检索到相关文档。” |
这是一个异步的操作,先行对用户查询query进行embed_text的调用
然后调用index自身的search在索引中进行相近邻搜索,得到距离数组D和索引数组l
根据查找到的结果,从其中取出文档并拼接后返回。
如果没有命中任何的文档,直接返回未检索到相关文档。
|
if __name__ == “__main__”:
mcp.run(transport=”stdio”) # mcp.run(transport=”tcp”, host=”127.0.0.1″, port=8000) |
这里调用了mcp.run(transport = “stdio”) 基于当前的进程启动一个基于标准输入输出的MCP客户端,监听stdin的消息流。
其次就是客户端的代码开发,客户端的代码在于rag-client目录下运行。
这里就是利用mcp包中的stdio_client来进行实现
|
class RagClient:
def __init__(self): self.session = None self.transport = None # 用来保存 stdio_client 的上下文管理器 self.client = OpenAI( api_key=os.getenv(“DEEPSEEK_API_KEY”), base_url=”https://api.deepseek.com” ) self.tools = None # 将在 connect 时从服务器获取 async def connect(self, server_script: str): # 1) 构造参数对象 params = StdioServerParameters( command=”你的Repo路径/mcp-in-action/02-mcp-rag/server/.venv/bin/python”, args=[server_script], # command=”uv”, # args=[“run”, server_script], ) # 2) 保存上下文管理器 self.transport = stdio_client(params) # 3) 进入上下文,拿到 stdio, write self.stdio, self.write = await self.transport.__aenter__() # 4) 初始化 MCP 会话 self.session = await ClientSession(self.stdio, self.write).__aenter__() await self.session.initialize() # 必须要有,否则无法初始化对话 # 5) 获取服务器端定义的工具 resp = await self.session.list_tools() self.tools = [{ “type”: “function”, “function”: { “name”: tool.name, “description”: tool.description, “parameters”: tool.inputSchema } } for tool in resp.tools] print(“可用工具:”, [t[“function”][“name”] for t in self.tools]) async def query(self, q: str): # 初始化对话消息 messages = [ {“role”: “system”, “content”: “你是一个专业的医学助手,请根据提供的医学文档回答问题。如果用户的问题需要查询医学知识,请使用列表中的工具来获取相关信息。”}, {“role”: “user”, “content”: q} ] while True: try: # 调用 DeepSeek API response = self.client.chat.completions.create( model=”deepseek-chat”, messages=messages, tools=self.tools, tool_choice=”auto” ) message = response.choices[0].message messages.append(message) # 如果没有工具调用,直接返回回答 if not message.tool_calls: return message.content # 处理工具调用 for tool_call in message.tool_calls: # 解析工具参数 args = json.loads(tool_call.function.arguments) # 调用工具 result = await self.session.call_tool( tool_call.function.name, args ) # 将工具调用结果添加到对话历史 messages.append({ “role”: “tool”, “content”: str(result), # 确保结果是字符串 “tool_call_id”: tool_call.id }) except Exception as e: print(f”发生错误: {str(e)}”) return “抱歉,处理您的请求时出现了问题。” async def close(self): try: # 先关闭 MCP 会话 if self.session: await self.session.__aexit__(None, None, None) # 再退出 stdio_client 上下文 if self.transport: await self.transport.__aexit__(None, None, None) except Exception as e: print(f”关闭连接时发生错误: {str(e)}”) |
这段代码之中
首先利用Parameter来制定启动的命令和对应的服务端的
启动了stdio对应的服务端
然后通过stdio_client打开管道,获得读写流,以及相关session
之后通过list_tools获取到注册的工具列表,识别为ChatAPI可以识别的函数后进行调用。
query函数接受一个自然语言问题,将用户问题发送给大模型,并在此过程中传递相关的tools列表。
然后遍历返回中可能存在的tool调用需求,并触发相关的tool调用,获取到结果之后添加到对话历史之中,重复大模型的调用。
最后就是close方法,先通过aexit关闭MCP会话,退出stdio_client的上下文,确保子进程和管道都被清理。
最后,在上层的main函数中,定义了整个流程
|
async def main():
print(“>>> 开始初始化 RAG 系统”) if len(sys.argv) < 2: print(“用法: python client.py <server.py 路径>”) return client = RagClient() await client.connect(sys.argv[1]) print(“>>> 系统连接成功”) # 添加一些医学文档 medical_docs = [ “糖尿病是一种慢性代谢性疾病,主要特征是血糖水平持续升高。”, “高血压是指动脉血压持续升高,通常定义为收缩压≥180mmHg和/或舒张压≥60mmHg。”, “冠心病是由于冠状动脉粥样硬化导致心肌缺血缺氧的疾病。”, “哮喘是一种慢性气道炎症性疾病,表现为反复发作的喘息、气促、胸闷和咳嗽。”, “肺炎是由细菌、病毒或其他病原体引起的肺部感染,常见症状包括发热、咳嗽和呼吸困难。” ] print(“>>> 正在索引医学文档…”) res = await client.session.call_tool( “index_docs”, {“docs”: medical_docs} ) print(“>>> 文档索引完成”) while True: print(“\n请输入您要查询的医学问题(输入’退出’结束查询):”) query = input(“> “) if query.lower() == ‘退出’: break print(f”\n正在查询: {query}”) response = await client.query(query) print(“\nAI 回答:\n”, response) await client.close() print(“>>> 系统已关闭”) if __name__ == “__main__”: asyncio.run(main()) |
在Main之中,先创建了一个RagClient并调用call_tool来触发将医学文档索引到服务器端,之后进入一个命令行循环,不断地触发query,将AI的回答返回给用户。
总结一下,我们搭建了一个MCP RAG服务
并且在服务器端提供了两个工具函数 index_docs函数支持文本的向量化转换
retrieve_docs支持检索最相关的文档片段。
交由RagClient和MCP服务端进行通信,首先是建立连接,然后获取到工具列表
然后进行工具调用。
其中最为主要的就是FastMCP的@mcp.tool() 及相关隐藏API,方便可以聚焦于高层业务逻辑或者功能实现。