LangGraph如何搭建Agent

对于LangChain,想必诸如并不陌生,之前我们也讲过LangChain相关框架的使用。

LangChain早期是大模型开发框架,其将提示词,模型,工具,RAG等模块进行组合,让开发者可以清晰的组合这些组件,进行应用构建。

但是后续LangChain暴露了问题: 1. 不方便调试,没有暴露中间日志 2. 缺乏并发和分支控制 3. Agent 难以控制

后续就是LangChain团队推出了LCEL这个新的开发语言,基于更加明确的语法组织调用逻辑,将流程变成一个声明式的数据流图,LCEL让开发者像是搭建积木一样构建模块化,清晰可控的流程图。

并后续基于这个开发语言,推出了LLM应用编排框架LangGraph。

利用LCEL定义单个节点的调用逻辑,利用LangGraph来编排多个节点,控制整个工作流的调度。

并进一步,让LangGraph和测试辅助平台LangSmith进行搭配使用,进一步实现错误跟踪和性能分析。

那么我们就首先构建一个最简化版本的LangGraph Agent,用于汇率兑换。

在最初版的Agent之中,我们先不使用AI能力,而是利用硬编码的方式去实现一个Agent。

首先是导入相关的库

import os

import httpx

from typing import TypedDict, Annotated

from dotenv import load_dotenv

from langchain_core.messages import HumanMessage, AIMessage

from langchain_core.tools import tool

from langchain_google_genai import ChatGoogleGenerativeAI

from langgraph.graph import StateGraph, END

from langgraph.checkpoint.memory import MemorySaver

# 加载环境变量

load_dotenv()

之后是声明一个类,作为数据存储类,并且在类外定义一个函数作为工具

class CurrencyState(TypedDict):

“””汇率转换状态”””

messages: Annotated[list, “对话消息列表”]

exchange_data: Annotated[dict, “汇率数据”]

@tool

def get_exchange_rate(currency_from: str = ‘USD’, currency_to: str = ‘CNY’) -> dict:

“””获取汇率信息

参数:

currency_from: 源货币代码 (例如: “USD”, “CNY”)

currency_to: 目标货币代码 (例如: “EUR”, “JPY”)

返回:

包含汇率数据的字典

“””

try:

response = httpx.get(

f’https://api.frankfurter.app/latest’,

params={‘from’: currency_from, ‘to’: currency_to},

timeout=10.0

)

response.raise_for_status()

return response.json()

except Exception as e:

return {‘error’: f’获取汇率失败: {e}’}

之后我们构建Agent

在这个Agent类之中,我们主要定义三个函数,对应的是用户意图识别 process_query,汇率获取get_rate,响应生成respond三部分。

class SimpleCurrencyAgent:

“””极简汇率Agent”””

def __init__(self):

“””初始化Agent”””

# 配置LLM

self.llm = ChatGoogleGenerativeAI(

model=’gemini-2.0-flash’,

api_key=os.getenv(‘GOOGLE_API_KEY’)

)

# 创建图

self.graph = self._create_graph()

def _process_query(self, state: CurrencyState) -> CurrencyState:

“””处理用户查询”””

# 获取用户消息

user_message = state[“messages”][-1].content

# 简单的关键词匹配

currencies = {

‘美元’: ‘USD’, ‘人民币’: ‘CNY’, ‘欧元’: ‘EUR’, ‘日元’: ‘JPY’,

‘英镑’: ‘GBP’, ‘澳元’: ‘AUD’, ‘港币’: ‘HKD’, ‘韩元’: ‘KRW’

}

# 检查是否包含货币相关关键词

currency_keywords = [‘汇率’, ‘兑’, ‘换’, ‘美元’, ‘人民币’, ‘欧元’, ‘日元’, ‘英镑’, ‘澳元’, ‘港币’, ‘韩元’]

is_currency_query = any(keyword in user_message for keyword in currency_keywords)

if not is_currency_query:

# 非货币查询

state[“exchange_data”] = {

“error”: “抱歉,我只能协助货币转换和汇率查询。请询问汇率相关的问题。”

}

return state

# 货币查询处理

found_currencies = []

for cn_name, code in currencies.items():

if cn_name in user_message:

found_currencies.append(code)

# 设置默认货币对

if len(found_currencies) >= 2:

from_currency = found_currencies[0]

to_currency = found_currencies[1]

elif len(found_currencies) == 1:

# 如果只找到一个货币,根据查询内容判断

if ‘兑人民币’ in user_message or ‘换人民币’ in user_message:

from_currency = found_currencies[0]

to_currency = ‘CNY’

elif ‘人民币兑’ in user_message or ‘人民币换’ in user_message:

from_currency = ‘CNY’

to_currency = found_currencies[0]

else:

# 默认查询该货币兑人民币

from_currency = found_currencies[0]

to_currency = ‘CNY’

else:

# 没有找到货币,使用默认

from_currency = ‘USD’

to_currency = ‘CNY’

# 避免相同货币的API调用

if from_currency == to_currency:

state[“exchange_data”] = {

“error”: f”无法查询{from_currency}兑{to_currency}的汇率,因为它们是同一种货币。”

}

return state

state[“exchange_data”] = {

“from_currency”: from_currency,

“to_currency”: to_currency,

“user_query”: user_message

}

return state

def _get_rate(self, state: CurrencyState) -> CurrencyState:

“””获取汇率数据”””

exchange_data = state[“exchange_data”]

# 如果已经有错误信息,直接返回

if “error” in exchange_data:

return state

# 调用汇率工具

rate_data = get_exchange_rate.invoke({

“currency_from”: exchange_data[“from_currency”],

“currency_to”: exchange_data[“to_currency”]

})

exchange_data.update(rate_data)

return state

def _respond(self, state: CurrencyState) -> CurrencyState:

“””生成响应”””

exchange_data = state[“exchange_data”]

if “error” in exchange_data:

response_content = f”抱歉,获取汇率时出现错误:{exchange_data[‘error’]}”

else:

rates = exchange_data.get(‘rates’, {})

base = exchange_data.get(‘base’, ‘EUR’)

date = exchange_data.get(‘date’, ‘latest’)

if rates:

rate_info = []

for currency, rate in rates.items():

rate_info.append(f”{base} -> {currency}: {rate}”)

response_content = f”汇率信息 ({date}):\n” + “\n”.join(rate_info)

else:

response_content = “无法获取汇率信息”

# 创建AI响应

ai_message = AIMessage(content=response_content)

state[“messages”].append(ai_message)

return state

在代码之中,并没有去实际使用大模型能力,而是通过简单的字符串匹配的方式获取要不要处理汇率问题。

并且最后通过respond函数进行返回

三个函数之间通过CurrencyState作为上下文进行消息传递。

在三个函数下面,则是一个重要的创建图的函数,将上面几个工作用的函数串起来。

def _create_graph(self):

“””创建LangGraph工作流”””

# 创建内存检查点

memory = MemorySaver()

# 创建工作流图

workflow = StateGraph(CurrencyState)

# 添加节点

workflow.add_node(“process_query”, self._process_query)

workflow.add_node(“get_rate”, self._get_rate)

workflow.add_node(“respond”, self._respond)

# 设置入口点

workflow.set_entry_point(“process_query”)

# 添加边

workflow.add_edge(“process_query”, “get_rate”)

workflow.add_edge(“get_rate”, “respond”)

workflow.add_edge(“respond”, END)

# 编译图

return workflow.compile(checkpointer=memory)

def process_query(self, query: str, session_id: str = “default”) -> dict:

“””处理用户查询”””

# 初始化状态

initial_state = {

“messages”: [HumanMessage(content=query)],

“exchange_data”: {}

}

# 配置会话

config = {‘configurable’: {‘thread_id’: session_id}}

try:

# 执行工作流

result = self.graph.invoke(initial_state, config)

# 获取最终响应

if result[“messages”]:

content = result[“messages”][-1].content

else:

content = “无法处理请求”

# 确定任务状态

is_complete = “error” not in result[“exchange_data”]

return {

‘is_task_complete’: is_complete,

‘require_user_input’: not is_complete,

‘content’: content,

‘session_id’: session_id

}

except Exception as e:

return {

‘is_task_complete’: False,

‘require_user_input’: True,

‘content’: f”处理请求时出现错误: {str(e)}”,

‘session_id’: session_id

其中MemorySaver是使用内存型检查点,来记录每一步的状态数据。

然后StateGraph创建一个有状态图。

之后将三个函数通过add_node添加进去。并利用set_entry_point来设置流程起始点 _process_query,利用add_edge来明确节点之间的转移路径。

最后调用compile函数将其编译为可运行的LangGraph工作流对象。

在上面表格中,除了create_graph函数外,还有一个专门对外暴露的函数 process_query

这个函数和_process_query不一样,这是一个专门对外的函数,方便被调用触发。

invoke对应的LangGraph流程,返回一个结构化的字典,其中包含是否成功。是否需要进一步输出。

到此为止,一个简单的LangGraph智能体就搭建好了。

上面的工作流虽然清晰,但是没有用到任何AI能力,所以我们增加一些小功能,对process_query的设计进行分叉,在小功能内部,利用LLM来进行意图识别。判断要不要进入汇率计算。

首先是架构上的修改

# 创建工作流图

workflow = StateGraph(CurrencyState)

# 添加节点

workflow.add_node(“process_query”, self._process_query)

workflow.add_node(“get_rate”, self._get_rate)

workflow.add_node(“respond”, self._respond)

workflow.add_node(“respond_irrelevant”, self._respond_irrelevant)

# 设置入口点

workflow.set_entry_point(“process_query”)

# 添加条件边

workflow.add_conditional_edges(

“process_query”,

self._should_get_rate,

{

“get_rate”: “get_rate”,

“respond_irrelevant”: “respond_irrelevant”

}

)

# 添加普通边

workflow.add_edge(“get_rate”, “respond”)

workflow.add_edge(“respond”, END)

workflow.add_edge(“respond_irrelevant”, END)

# 编译图

return workflow.compile(checkpointer=memory)

在其中set_entry_point之后,利用add_conditional_edges来添加了条件边,利用should_get_rate函数来分叉到不同路径。

在这个函数中,可以利用LLM来进行判断

如果判断没有符合意图,则进入_respond_irrelevant函数

def _respond_irrelevant(self, state: CurrencyState) -> CurrencyState:

“””生成不相关查询的响应”””

exchange_data = state[“exchange_data”]

response_content = exchange_data.get(“error”, “抱歉,我只能协助货币转换和汇率查询。请询问汇率相关的问题。”)

# 创建AI响应

ai_message = AIMessage(content=response_content)

state[“messages”].append(ai_message)

return state

通过这样,实现了智能意图识别的功能。

之后是将这个创建好的LangGraph构建的货币转换Agent,集成到了A2A协议中。

首先考虑将LangGraph以ReAct模式进行修改,主要是利用系统级别的提示词来集成ReAct模式。

class CurrencyAgent:

SYSTEM_INSTRUCTION = (

‘你是专门进行货币转换的助手。’

“你的唯一目的是使用’get_exchange_rate’工具来回答有关汇率的问题。”

‘如果用户询问除货币转换或汇率之外的任何内容,’

‘请礼貌地说明你无法帮助该主题,只能协助货币相关的查询。’

‘不要尝试回答不相关的问题或将工具用于其他目的。’

‘如果用户需要提供更多信息,请将响应状态设置为input_required。’

‘如果在处理请求时出现错误,请将响应状态设置为error。’

‘如果请求完成,请将响应状态设置为completed。’

)

def __init__(self):

self.model = ChatGoogleGenerativeAI(model=’gemini-2.0-flash’)

self.tools = [get_exchange_rate]

# 创建ReAct Agent

self.graph = create_react_agent(

self.model,

tools=self.tools,

checkpointer=memory,

prompt=self.SYSTEM_INSTRUCTION,

response_format=ResponseFormat,

)

同时也是使用tools来注册外部工具,工具调用是LangGraph Agent的核心能力,通过装饰器来实现

@tool

def get_exchange_rate(

currency_from: str = ‘USD’,

currency_to: str = ‘EUR’,

currency_date: str = ‘latest’,

):

“””使用此工具获取当前汇率。

参数:

currency_from: 要转换的货币(例如,”USD”)。

currency_to: 要转换到的货币(例如,”EUR”)。

currency_date: 汇率的日期或”latest”。默认为”latest”。

返回:

包含汇率数据的字典,如果请求失败则返回错误消息。

“””

try:

response = httpx.get(

f’https://api.frankfurter.app/{currency_date}’,

params={‘from’: currency_from, ‘to’: currency_to},

)

response.raise_for_status()

data = response.json()

if ‘rates’ not in data:

return {‘error’: ‘API响应格式无效。’}

return data

except httpx.HTTPError as e:

return {‘error’: f’API请求失败: {e}’}

except ValueError:

return {‘error’: ‘API返回的JSON响应无效。’}

在拥有固定的Agent之后,可以在task manager之中增加流式的响应处理。

async def stream(self, query, sessionId) -> AsyncIterable[dict[str, Any]]:

inputs = {‘messages’: [(‘user’, query)]}

config = {‘configurable’: {‘thread_id’: sessionId}}

for item in self.graph.stream(inputs, config, stream_mode=’values’):

message = item[‘messages’][-1]

if (

isinstance(message, AIMessage)

and message.tool_calls

and len(message.tool_calls) > 0

):

yield {

‘is_task_complete’: False,

‘require_user_input’: False,

‘content’: ‘Looking up the exchange rates…’,

}

elif isinstance(message, ToolMessage):

yield {

‘is_task_complete’: False,

‘require_user_input’: False,

‘content’: ‘Processing the exchange rates..’,

}

yield self.get_agent_response(config)

在此过程中,流式处理机制可以识别状态,进行进度反馈,并返回最终结果。

这个流式机制还可以和上层的TaskManager相结合

实现任务状态的记录,更新和通知

class AgentTaskManager(InMemoryTaskManager):

def __init__(

self,

agent: CurrencyAgent,

notification_sender_auth: PushNotificationSenderAuth,

):

super().__init__()

self.agent = agent

self.notification_sender_auth = notification_sender_auth

在TaskManager之中,将具体的CurrencyAgnet注入到任务管理器中,便于任务调度器的统一调用,实现通知机制。

在TaskManager之中,也利用相关的机制使用流式回填

async def _run_streaming_agent(self, request: SendTaskStreamingRequest):

task_send_params: TaskSendParams = request.params

query = self._get_user_query(task_send_params)

try:

async for item in self.agent.stream(

query, task_send_params.sessionId

):

is_task_complete = item[‘is_task_complete’]

require_user_input = item[‘require_user_input’]

artifact = None

message = None

parts = [{‘type’: ‘text’, ‘text’: item[‘content’]}]

end_stream = False

if not is_task_complete and not require_user_input:

task_state = TaskState.WORKING

message = Message(role=’agent’, parts=parts)

elif require_user_input:

task_state = TaskState.INPUT_REQUIRED

message = Message(role=’agent’, parts=parts)

end_stream = True

else:

task_state = TaskState.COMPLETED

artifact = Artifact(parts=parts, index=0, append=False)

end_stream = True

task_status = TaskStatus(state=task_state, message=message)

latest_task = await self.update_store(

task_send_params.id,

task_status,

None if artifact is None else [artifact],

)

await self.send_task_notification(latest_task)

if artifact:

task_artifact_update_event = TaskArtifactUpdateEvent(

id=task_send_params.id, artifact=artifact

)

await self.enqueue_events_for_sse(

task_send_params.id, task_artifact_update_event

)

task_update_event = TaskStatusUpdateEvent(

id=task_send_params.id, status=task_status, final=end_stream

)

await self.enqueue_events_for_sse(

task_send_params.id, task_update_event

)

except Exception as e:

logger.error(f’An error occurred while streaming the response: {e}’)

await self.enqueue_events_for_sse(

task_send_params.id,

InternalError(

message=f’An error occurred while streaming the response: {e}’

),

)

最后在主函数之中,通过A2AServer来进行启动,并实现Agent的使用

server = A2AServer(

agent_card=agent_card,

task_manager=AgentTaskManager(

agent=CurrencyAgent(),

notification_sender_auth=notification_sender_auth,

),

host=host,

port=port,

)

通过上述流程,实现了从LangGraph构建到A2A的集成流程