Skip to main content
Open on GitHub

流式传输

流式传输对于提升基于大型语言模型(LLMs)构建的应用程序的响应能力至关重要。通过逐步显示输出,即使在完整响应准备好之前也能展示部分内容,流式传输显著改善了用户体验(UX),特别是在处理大型语言模型的延迟时。

概览

生成完整的响应通常会导致几秒钟的延迟,这在具有多个模型调用的复杂应用中尤为明显。幸运的是,LLM 是迭代生成响应的,因此可以在生成过程中显示中间结果。通过流式传输这些中间输出,LangChain 使基于 LLM 的应用程序拥有更流畅的用户体验,并在其设计核心提供了内置的流式传输支持。

在本指南中,我们将讨论大语言模型(LLM)应用中的流式处理,并探讨 LangChain 的流式 API 如何促进您应用中各个组件实现实时输出。

在LLM应用中流式传输什么

在涉及大型语言模型(LLM)的应用中,可以流式传输多种类型的数据,以减少感知延迟并提高透明度,从而改善用户体验。这些数据类型包括:

1. 流式输出 LLM

最常用且关键的数据流是 LLM 本身生成的输出。LLM 通常需要时间才能生成完整响应,而通过实时流式传输输出,用户可以看到正在生成的部分结果。这提供了即时反馈,并有助于减少用户的等待时间。

2. 流式管道或工作流程进度

除了流式传输 LLM 输出外,更复杂的流程或管道中流式传输进度也很有用,这能使用户对应用程序的整体进展有直观感受。这可能包括:

  • 在 LangGraph 工作流中: 使用 LangGraph,工作流由代表各个步骤的节点和边组成。此处的流式传输涉及跟踪图状态的变化,因为各个节点会请求更新。这使得能够更细致地监控工作流中当前活跃的节点,从而在工作流推进到不同阶段时提供关于其状态的实时更新。

  • 在 LCEL 管道中:LCEL 管道流式传输更新涉及捕获各个子可运行对象(sub-runnables)的进度。例如,随着管道的不同步骤或组件执行,您可以流式传输当前正在运行的子可运行对象,从而提供对整体管道进度的实时洞察。

流式管道或工作流程的进度对于向用户提供应用程序在执行过程中的清晰视图至关重要。

3. 流式传输自定义数据

在某些情况下,您可能需要流式传输自定义数据,这些数据超出了管道或工作流结构所提供的信息。这种自定义信息会注入到工作流中的特定步骤内,无论该步骤是工具还是 LangGraph 节点。例如,您可以实时流式传输有关工具正在执行的操作的更新,或者展示通过 LangGraph 节点的进度。这种直接从步骤内部发出的细粒度数据,提供了对工作流执行的更详细洞察,在需要更高可见性的复杂过程中尤其有用。

流式API

LangChain 有两个主要的 API 用于实时流式输出。这些 API 由任何实现了 Runnable 接口 的组件支持,包括 LLMs编译后的 LangGraph 图,以及使用 LCEL 生成的任何 Runnable。

  1. sync stream and async astream: 用于流式传输由各个 Runnable(例如聊天模型)生成的输出,或流式传输使用 LangGraph 创建的任意工作流。
  2. 仅异步的 astream_events:使用此 API 访问完全基于 LCEL 构建的 LLM 应用程序的自定义事件和中间输出。注意,在使用 LangGraph 时,虽然该 API 可用,但通常不需要。
注意

此外,还有一个遗留的异步 astream_log API。该 API 不推荐用于新项目,因为它比其他流式 API 更复杂且功能较少。

stream() and astream()

stream() 方法返回一个迭代器,该迭代器在输出产生时同步地生成输出块。您可以使用 for 循环来实时处理每个块。例如,在使用大型语言模型(LLM)时,这允许输出在生成过程中以流式方式逐步输出,从而减少用户的等待时间。

stream()astream() 方法生成的 chunk 类型取决于被流式传输的组件。例如,当从 LLM 进行流式传输时,每个组件都将是 AIMessageChunk;然而,对于其他组件,chunk 的类型可能不同。

The stream() method returns an iterator that yields these chunks as they are produced. For example,

for chunk in component.stream(some_input):
# IMPORTANT: Keep the processing of each chunk as efficient as possible.
# While you're processing the current chunk, the upstream component is
# waiting to produce the next one. For example, if working with LangGraph,
# graph execution is paused while the current chunk is being processed.
# In extreme cases, this could even result in timeouts (e.g., when llm outputs are
# streamed from an API that has a timeout).
print(chunk)

The 异步版本astream(),工作方式类似,但专为非阻塞工作流设计。您可以在异步代码中使用它来实现相同的实时流式行为。

与聊天模型配合使用

当对聊天模型使用stream()astream()时,输出会以流式方式呈现为AIMessageChunks,由大语言模型(LLM)实时生成。这允许你在生成的过程中逐步展示或处理 LLM 的输出,尤其在交互式应用或界面中非常有用。

与 LangGraph 配合使用

LangGraph 编译的图是 可运行对象,并支持标准的流式 API。

当在 LangGraph 中使用 streamastream 方法时,您可以选择一个或多个 流式模式,从而控制流式输出的类型。可用的流式模式包括:

  • "values": 为每个步骤发射 state 的所有值。
  • "updates": 仅发射每个步骤后由节点返回的节点名称和更新内容。
  • "调试": 为每个步骤发出调试事件。
  • "messages": 发射 LLM 消息 逐词
  • "custom": 发射使用 LangGraph 的 StreamWriter 编写的自定义输出。

有关更多信息,请参阅:

使用 LCEL

如果您使用 LangChain 的表达式语言 (LCEL) 组合多个 Runnables,根据惯例,stream()astream() 方法将流式传输链中最后一步的输出。这允许最终处理结果以增量方式流式传输。LCEL 致力于优化流水线中的流式传输延迟,以便尽可能快地获取来自最后一步的流式传输结果。

astream_events

提示

使用 astream_events API 访问完全基于 LCEL 构建的 LLM 应用的自定义数据和中间输出。

虽然此 API 也可与 LangGraph 配合使用,但在处理 LangGraph 时通常不需要它,因为 streamastream 方法为 LangGraph 图提供了全面的流式传输功能。

对于使用 LCEL 构建的链,.stream() 方法仅流式传输链中最后一步的输出。这对某些应用可能已经足够,但随着您构建包含多个 LLM 调用的更复杂链时,您可能希望将链的中间值与最终输出一起使用。例如,在构建基于文档的聊天应用时,您可能希望将来源与最终生成结果一同返回。

有几种方法可以实现这一点,例如使用回调,或者以某种方式构建您的链,使其像级联的.assign()调用一样将中间值传递给最终结果,但 LangChain 还提供了一个.astream_events()方法,它将回调的灵活性与.stream()的人体工程学相结合。当被调用时,它会返回一个迭代器,该迭代器生成各种类型的事件,您可以根据项目需求对其进行过滤和处理。

这里有一个小示例,仅打印包含流式聊天模型输出的事件:

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model="claude-3-sonnet-20240229")

prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser

async for event in chain.astream_events({"topic": "parrot"}):
kind = event["event"]
if kind == "on_chat_model_stream":
print(event, end="|", flush=True)

你可以大致将其视为回调事件的迭代器(尽管格式有所不同)——并且它几乎可以用于所有 LangChain 组件!

查看 本指南 以获取有关如何使用 .astream_events() 的更详细信息,包括列出可用事件的表格。

将自定义数据写入流

要编写自定义数据到流中,您需要根据正在使用的组件选择以下方法之一:

  1. LangGraph 的 StreamWriter 可用于写入自定义数据,这些数据将在与 LangGraph 配合使用时通过 streamastream API 显示。重要提示:这是 LangGraph 的一项功能,因此在使用纯 LCEL 时不可用。有关更多信息,请参阅 如何流式传输自定义数据
  2. dispatch_events / adispatch_events可用于写入将通过astream_eventsAPI显示的数据。有关如何分派自定义回调事件的更多信息,请参阅如何分派自定义回调事件

"自动流式传输"聊天模型

LangChain 简化了来自 聊天模型 的流式传输,它会在某些情况下自动启用流式模式,即使您没有显式调用流式方法。当您使用非流式的 invoke 方法但仍希望流式传输整个应用程序(包括来自聊天模型的中间结果)时,这一点特别有用。

工作原理

当您调用聊天模型的invoke(或ainvoke)方法时,如果您尝试流式传输整个应用程序,LangChain 将自动切换到流式模式。

在底层,它将使用 invoke(或 ainvoke)调用 stream(或 astream)方法来生成其输出。对于使用该代码的部分而言,调用的结果将完全相同;然而,当聊天模型被流式传输时,LangChain 会在 LangChain 的 回调系统 中负责触发 on_llm_new_token 事件。这些回调事件允许 LangGraph stream/astreamastream_events 实时展示聊天模型的输出。

示例:

def node(state):
...
# The code below uses the invoke method, but LangChain will
# automatically switch to streaming mode
# when it detects that the overall
# application is being streamed.
ai_message = model.invoke(state["messages"])
...

for chunk in compiled_graph.stream(..., mode="messages"):
...

异步编程

LangChain 为其许多方法同时提供了同步(sync)和异步(async)版本。异步方法通常以字母"a"为前缀(例如,ainvokeastream)。在编写异步代码时,务必始终使用这些异步方法,以确保非阻塞行为并获得最佳性能。

如果流式数据未能实时出现,请确保您的工作流使用了正确的异步方法。

请查阅 LangChain 指南中的异步编程,以获取有关使用 LangChain 编写异步代码的更多信息。

请查看以下操操作指南,以获取 LangChain 中流式处理的特定示例:

对于编写自定义数据到流,请参阅以下资源: