LangGraph如何搭建Agent
对于LangChain,想必诸如并不陌生,之前我们也讲过LangChain相关框架的使用。
LangChain早期是大模型开发框架,其将提示词,模型,工具,RAG等模块进行组合,让开发者可以清晰的组合这些组件,进行应用构建。
但是后续LangChain暴露了问题: 1. 不方便调试,没有暴露中间日志 2. 缺乏并发和分支控制 3. Agent 难以控制
后续就是LangChain团队推出了LCEL这个新的开发语言,基于更加明确的语法组织调用逻辑,将流程变成一个声明式的数据流图,LCEL让开发者像是搭建积木一样构建模块化,清晰可控的流程图。
并后续基于这个开发语言,推出了LLM应用编排框架LangGraph。
利用LCEL定义单个节点的调用逻辑,利用LangGraph来编排多个节点,控制整个工作流的调度。
并进一步,让LangGraph和测试辅助平台LangSmith进行搭配使用,进一步实现错误跟踪和性能分析。
那么我们就首先构建一个最简化版本的LangGraph Agent,用于汇率兑换。
在最初版的Agent之中,我们先不使用AI能力,而是利用硬编码的方式去实现一个Agent。
首先是导入相关的库
| import os
import httpx from typing import TypedDict, Annotated from dotenv import load_dotenv from langchain_core.messages import HumanMessage, AIMessage from langchain_core.tools import tool from langchain_google_genai import ChatGoogleGenerativeAI from langgraph.graph import StateGraph, END from langgraph.checkpoint.memory import MemorySaver # 加载环境变量 load_dotenv() |
之后是声明一个类,作为数据存储类,并且在类外定义一个函数作为工具
| class CurrencyState(TypedDict):
“””汇率转换状态””” messages: Annotated[list, “对话消息列表”] exchange_data: Annotated[dict, “汇率数据”] @tool def get_exchange_rate(currency_from: str = ‘USD’, currency_to: str = ‘CNY’) -> dict: “””获取汇率信息 参数: currency_from: 源货币代码 (例如: “USD”, “CNY”) currency_to: 目标货币代码 (例如: “EUR”, “JPY”) 返回: 包含汇率数据的字典 “”” try: response = httpx.get( f’https://api.frankfurter.app/latest’, params={‘from’: currency_from, ‘to’: currency_to}, timeout=10.0 ) response.raise_for_status() return response.json() except Exception as e: return {‘error’: f’获取汇率失败: {e}’} |
之后我们构建Agent
在这个Agent类之中,我们主要定义三个函数,对应的是用户意图识别 process_query,汇率获取get_rate,响应生成respond三部分。
| class SimpleCurrencyAgent:
“””极简汇率Agent””” def __init__(self): “””初始化Agent””” # 配置LLM self.llm = ChatGoogleGenerativeAI( model=’gemini-2.0-flash’, api_key=os.getenv(‘GOOGLE_API_KEY’) ) # 创建图 self.graph = self._create_graph() def _process_query(self, state: CurrencyState) -> CurrencyState: “””处理用户查询””” # 获取用户消息 user_message = state[“messages”][-1].content # 简单的关键词匹配 currencies = { ‘美元’: ‘USD’, ‘人民币’: ‘CNY’, ‘欧元’: ‘EUR’, ‘日元’: ‘JPY’, ‘英镑’: ‘GBP’, ‘澳元’: ‘AUD’, ‘港币’: ‘HKD’, ‘韩元’: ‘KRW’ } # 检查是否包含货币相关关键词 currency_keywords = [‘汇率’, ‘兑’, ‘换’, ‘美元’, ‘人民币’, ‘欧元’, ‘日元’, ‘英镑’, ‘澳元’, ‘港币’, ‘韩元’] is_currency_query = any(keyword in user_message for keyword in currency_keywords) if not is_currency_query: # 非货币查询 state[“exchange_data”] = { “error”: “抱歉,我只能协助货币转换和汇率查询。请询问汇率相关的问题。” } return state # 货币查询处理 found_currencies = [] for cn_name, code in currencies.items(): if cn_name in user_message: found_currencies.append(code) # 设置默认货币对 if len(found_currencies) >= 2: from_currency = found_currencies[0] to_currency = found_currencies[1] elif len(found_currencies) == 1: # 如果只找到一个货币,根据查询内容判断 if ‘兑人民币’ in user_message or ‘换人民币’ in user_message: from_currency = found_currencies[0] to_currency = ‘CNY’ elif ‘人民币兑’ in user_message or ‘人民币换’ in user_message: from_currency = ‘CNY’ to_currency = found_currencies[0] else: # 默认查询该货币兑人民币 from_currency = found_currencies[0] to_currency = ‘CNY’ else: # 没有找到货币,使用默认 from_currency = ‘USD’ to_currency = ‘CNY’ # 避免相同货币的API调用 if from_currency == to_currency: state[“exchange_data”] = { “error”: f”无法查询{from_currency}兑{to_currency}的汇率,因为它们是同一种货币。” } return state state[“exchange_data”] = { “from_currency”: from_currency, “to_currency”: to_currency, “user_query”: user_message } return state def _get_rate(self, state: CurrencyState) -> CurrencyState: “””获取汇率数据””” exchange_data = state[“exchange_data”] # 如果已经有错误信息,直接返回 if “error” in exchange_data: return state # 调用汇率工具 rate_data = get_exchange_rate.invoke({ “currency_from”: exchange_data[“from_currency”], “currency_to”: exchange_data[“to_currency”] }) exchange_data.update(rate_data) return state def _respond(self, state: CurrencyState) -> CurrencyState: “””生成响应””” exchange_data = state[“exchange_data”] if “error” in exchange_data: response_content = f”抱歉,获取汇率时出现错误:{exchange_data[‘error’]}” else: rates = exchange_data.get(‘rates’, {}) base = exchange_data.get(‘base’, ‘EUR’) date = exchange_data.get(‘date’, ‘latest’) if rates: rate_info = [] for currency, rate in rates.items(): rate_info.append(f”{base} -> {currency}: {rate}”) response_content = f”汇率信息 ({date}):\n” + “\n”.join(rate_info) else: response_content = “无法获取汇率信息” # 创建AI响应 ai_message = AIMessage(content=response_content) state[“messages”].append(ai_message) return state |
在代码之中,并没有去实际使用大模型能力,而是通过简单的字符串匹配的方式获取要不要处理汇率问题。
并且最后通过respond函数进行返回
三个函数之间通过CurrencyState作为上下文进行消息传递。
在三个函数下面,则是一个重要的创建图的函数,将上面几个工作用的函数串起来。
| def _create_graph(self):
“””创建LangGraph工作流””” # 创建内存检查点 memory = MemorySaver() # 创建工作流图 workflow = StateGraph(CurrencyState) # 添加节点 workflow.add_node(“process_query”, self._process_query) workflow.add_node(“get_rate”, self._get_rate) workflow.add_node(“respond”, self._respond) # 设置入口点 workflow.set_entry_point(“process_query”) # 添加边 workflow.add_edge(“process_query”, “get_rate”) workflow.add_edge(“get_rate”, “respond”) workflow.add_edge(“respond”, END) # 编译图 return workflow.compile(checkpointer=memory) def process_query(self, query: str, session_id: str = “default”) -> dict: “””处理用户查询””” # 初始化状态 initial_state = { “messages”: [HumanMessage(content=query)], “exchange_data”: {} } # 配置会话 config = {‘configurable’: {‘thread_id’: session_id}} try: # 执行工作流 result = self.graph.invoke(initial_state, config) # 获取最终响应 if result[“messages”]: content = result[“messages”][-1].content else: content = “无法处理请求” # 确定任务状态 is_complete = “error” not in result[“exchange_data”] return { ‘is_task_complete’: is_complete, ‘require_user_input’: not is_complete, ‘content’: content, ‘session_id’: session_id } except Exception as e: return { ‘is_task_complete’: False, ‘require_user_input’: True, ‘content’: f”处理请求时出现错误: {str(e)}”, ‘session_id’: session_id |
其中MemorySaver是使用内存型检查点,来记录每一步的状态数据。
然后StateGraph创建一个有状态图。
之后将三个函数通过add_node添加进去。并利用set_entry_point来设置流程起始点 _process_query,利用add_edge来明确节点之间的转移路径。
最后调用compile函数将其编译为可运行的LangGraph工作流对象。
在上面表格中,除了create_graph函数外,还有一个专门对外暴露的函数 process_query
这个函数和_process_query不一样,这是一个专门对外的函数,方便被调用触发。
invoke对应的LangGraph流程,返回一个结构化的字典,其中包含是否成功。是否需要进一步输出。
到此为止,一个简单的LangGraph智能体就搭建好了。
上面的工作流虽然清晰,但是没有用到任何AI能力,所以我们增加一些小功能,对process_query的设计进行分叉,在小功能内部,利用LLM来进行意图识别。判断要不要进入汇率计算。
首先是架构上的修改
| # 创建工作流图
workflow = StateGraph(CurrencyState) # 添加节点 workflow.add_node(“process_query”, self._process_query) workflow.add_node(“get_rate”, self._get_rate) workflow.add_node(“respond”, self._respond) workflow.add_node(“respond_irrelevant”, self._respond_irrelevant) # 设置入口点 workflow.set_entry_point(“process_query”) # 添加条件边 workflow.add_conditional_edges( “process_query”, self._should_get_rate, { “get_rate”: “get_rate”, “respond_irrelevant”: “respond_irrelevant” } ) # 添加普通边 workflow.add_edge(“get_rate”, “respond”) workflow.add_edge(“respond”, END) workflow.add_edge(“respond_irrelevant”, END) # 编译图 return workflow.compile(checkpointer=memory) |
在其中set_entry_point之后,利用add_conditional_edges来添加了条件边,利用should_get_rate函数来分叉到不同路径。
在这个函数中,可以利用LLM来进行判断
如果判断没有符合意图,则进入_respond_irrelevant函数
| def _respond_irrelevant(self, state: CurrencyState) -> CurrencyState:
“””生成不相关查询的响应””” exchange_data = state[“exchange_data”] response_content = exchange_data.get(“error”, “抱歉,我只能协助货币转换和汇率查询。请询问汇率相关的问题。”) # 创建AI响应 ai_message = AIMessage(content=response_content) state[“messages”].append(ai_message) return state |
通过这样,实现了智能意图识别的功能。
之后是将这个创建好的LangGraph构建的货币转换Agent,集成到了A2A协议中。
首先考虑将LangGraph以ReAct模式进行修改,主要是利用系统级别的提示词来集成ReAct模式。
| class CurrencyAgent:
SYSTEM_INSTRUCTION = ( ‘你是专门进行货币转换的助手。’ “你的唯一目的是使用’get_exchange_rate’工具来回答有关汇率的问题。” ‘如果用户询问除货币转换或汇率之外的任何内容,’ ‘请礼貌地说明你无法帮助该主题,只能协助货币相关的查询。’ ‘不要尝试回答不相关的问题或将工具用于其他目的。’ ‘如果用户需要提供更多信息,请将响应状态设置为input_required。’ ‘如果在处理请求时出现错误,请将响应状态设置为error。’ ‘如果请求完成,请将响应状态设置为completed。’ ) def __init__(self): self.model = ChatGoogleGenerativeAI(model=’gemini-2.0-flash’) self.tools = [get_exchange_rate] # 创建ReAct Agent self.graph = create_react_agent( self.model, tools=self.tools, checkpointer=memory, prompt=self.SYSTEM_INSTRUCTION, response_format=ResponseFormat, ) |
同时也是使用tools来注册外部工具,工具调用是LangGraph Agent的核心能力,通过装饰器来实现
| @tool
def get_exchange_rate( currency_from: str = ‘USD’, currency_to: str = ‘EUR’, currency_date: str = ‘latest’, ): “””使用此工具获取当前汇率。 参数: currency_from: 要转换的货币(例如,”USD”)。 currency_to: 要转换到的货币(例如,”EUR”)。 currency_date: 汇率的日期或”latest”。默认为”latest”。 返回: 包含汇率数据的字典,如果请求失败则返回错误消息。 “”” try: response = httpx.get( f’https://api.frankfurter.app/{currency_date}’, params={‘from’: currency_from, ‘to’: currency_to}, ) response.raise_for_status() data = response.json() if ‘rates’ not in data: return {‘error’: ‘API响应格式无效。’} return data except httpx.HTTPError as e: return {‘error’: f’API请求失败: {e}’} except ValueError: return {‘error’: ‘API返回的JSON响应无效。’} |
在拥有固定的Agent之后,可以在task manager之中增加流式的响应处理。
| async def stream(self, query, sessionId) -> AsyncIterable[dict[str, Any]]:
inputs = {‘messages’: [(‘user’, query)]} config = {‘configurable’: {‘thread_id’: sessionId}} for item in self.graph.stream(inputs, config, stream_mode=’values’): message = item[‘messages’][-1] if ( isinstance(message, AIMessage) and message.tool_calls and len(message.tool_calls) > 0 ): yield { ‘is_task_complete’: False, ‘require_user_input’: False, ‘content’: ‘Looking up the exchange rates…’, } elif isinstance(message, ToolMessage): yield { ‘is_task_complete’: False, ‘require_user_input’: False, ‘content’: ‘Processing the exchange rates..’, } yield self.get_agent_response(config) |
在此过程中,流式处理机制可以识别状态,进行进度反馈,并返回最终结果。
这个流式机制还可以和上层的TaskManager相结合
实现任务状态的记录,更新和通知
| class AgentTaskManager(InMemoryTaskManager):
def __init__( self, agent: CurrencyAgent, notification_sender_auth: PushNotificationSenderAuth, ): super().__init__() self.agent = agent self.notification_sender_auth = notification_sender_auth |
在TaskManager之中,将具体的CurrencyAgnet注入到任务管理器中,便于任务调度器的统一调用,实现通知机制。
在TaskManager之中,也利用相关的机制使用流式回填
| async def _run_streaming_agent(self, request: SendTaskStreamingRequest):
task_send_params: TaskSendParams = request.params query = self._get_user_query(task_send_params) try: async for item in self.agent.stream( query, task_send_params.sessionId ): is_task_complete = item[‘is_task_complete’] require_user_input = item[‘require_user_input’] artifact = None message = None parts = [{‘type’: ‘text’, ‘text’: item[‘content’]}] end_stream = False if not is_task_complete and not require_user_input: task_state = TaskState.WORKING message = Message(role=’agent’, parts=parts) elif require_user_input: task_state = TaskState.INPUT_REQUIRED message = Message(role=’agent’, parts=parts) end_stream = True else: task_state = TaskState.COMPLETED artifact = Artifact(parts=parts, index=0, append=False) end_stream = True task_status = TaskStatus(state=task_state, message=message) latest_task = await self.update_store( task_send_params.id, task_status, None if artifact is None else [artifact], ) await self.send_task_notification(latest_task) if artifact: task_artifact_update_event = TaskArtifactUpdateEvent( id=task_send_params.id, artifact=artifact ) await self.enqueue_events_for_sse( task_send_params.id, task_artifact_update_event ) task_update_event = TaskStatusUpdateEvent( id=task_send_params.id, status=task_status, final=end_stream ) await self.enqueue_events_for_sse( task_send_params.id, task_update_event ) except Exception as e: logger.error(f’An error occurred while streaming the response: {e}’) await self.enqueue_events_for_sse( task_send_params.id, InternalError( message=f’An error occurred while streaming the response: {e}’ ), ) |
最后在主函数之中,通过A2AServer来进行启动,并实现Agent的使用
| server = A2AServer(
agent_card=agent_card, task_manager=AgentTaskManager( agent=CurrencyAgent(), notification_sender_auth=notification_sender_auth, ), host=host, port=port, ) |
通过上述流程,实现了从LangGraph构建到A2A的集成流程