
How to stream runnables

Prerequisites

This guide assumes familiarity with the following concepts:

  • Chat models
  • LangChain Expression Language
  • Output parsers

Streaming is critical for making applications built on LLMs feel responsive to end users.

Important LangChain primitives like chat models, output parsers, prompts, retrievers, and agents implement the LangChain Runnable Interface.

This interface provides two general approaches to stream content:

  1. sync stream and async astream: a default implementation of streaming that streams the final output from the chain
  2. async astream_events and async astream_log: these provide a way to stream both intermediate steps and the final output from the chain

Let's take a look at both approaches and try to understand how to use them.

Info

For a higher-level overview of streaming techniques in LangChain, see this section of the conceptual guide.

Using Stream

All Runnable objects implement a sync method called stream and an async variant called astream.

These methods are designed to stream the final output in chunks, yielding each chunk as soon as it is available.

Streaming is only possible if all steps in the program know how to process an input stream; that is, they process input chunks one at a time and yield corresponding output chunks.

The complexity of this processing can vary, from straightforward tasks like emitting the tokens produced by an LLM, to more challenging ones like streaming parts of a JSON result before the full JSON is complete.
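
For intuition, here is a plain-Python analogy (not LangChain-specific code): each stage consumes its input stream one chunk at a time and yields output chunks as soon as they are ready.

def tokens():
    # A toy "model" that yields tokens one at a time
    yield from ["The ", "sky ", "is ", "blue."]


def shout(stream):
    for chunk in stream:  # process one input chunk at a time...
        yield chunk.upper()  # ...and emit the corresponding output chunk


for chunk in shout(tokens()):
    print(chunk, end="", flush=True)  # THE SKY IS BLUE.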

The best place to start exploring streaming is with the single most important component in LLM applications: the LLMs themselves!

LLMs and Chat Models

Large language models and their chat variants are the primary bottleneck in LLM-powered applications.

An LLM can take several seconds to generate a complete response to a query. This is far slower than the ~200-300 ms threshold at which an application feels responsive to an end user.

The key strategy to make an application feel more responsive is to show intermediate progress; that is, to stream the output from the model token by token.

We will show examples of streaming using a chat model. Choose one from the options below:

pip install -qU "langchain[openai]"
import getpass
import os

if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")

from langchain.chat_models import init_chat_model

model = init_chat_model("gpt-4o-mini", model_provider="openai")

Let's start with the sync stream API:

chunks = []
for chunk in model.stream("what color is the sky?"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)
The| sky| appears| blue| during| the| day|.|

Alternatively, if you're working in an async environment, you may consider using the async astream API:

chunks = []
async for chunk in model.astream("what color is the sky?"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)
The| sky| appears| blue| during| the| day|.|

Let's inspect one of the chunks:

chunks[0]
AIMessageChunk(content='The', id='run-b36bea64-5511-4d7a-b6a3-a07b3db0c8e7')

We got back something called an AIMessageChunk. This chunk represents a part of an AIMessage.

Message chunks are additive by design: simply add them up to get the state of the response so far!

chunks[0] + chunks[1] + chunks[2] + chunks[3] + chunks[4]
AIMessageChunk(content='The sky appears blue during', id='run-b36bea64-5511-4d7a-b6a3-a07b3db0c8e7')
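
Because chunks support +, you can also fold an entire list of them back into one message, e.g. with functools.reduce (a small convenience sketch, assuming chunks from above is non-empty):

from functools import reduce

# Re-assemble the full message from all streamed pieces
full_message = reduce(lambda acc, c: acc + c, chunks)
print(full_message.content)  # the complete response text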

Chains

Virtually all LLM applications involve more steps than just a call to a language model.

Let's build a simple chain using LangChain Expression Language (LCEL) that combines a prompt, a model, and a parser, and verify that streaming works.

We will use StrOutputParser to parse the output from the model. This is a simple parser that extracts the content field from an AIMessageChunk, giving us the token returned by the model.

Tip

LCEL is a declarative way to specify a "program" by chaining together different LangChain primitives. Chains created using LCEL benefit from an automatic implementation of stream and astream, allowing streaming of the final output. In fact, chains created with LCEL implement the entire standard Runnable interface.

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser

async for chunk in chain.astream({"topic": "parrot"}):
    print(chunk, end="|", flush=True)
Here|'s| a| joke| about| a| par|rot|:|

A man| goes| to| a| pet| shop| to| buy| a| par|rot|.| The| shop| owner| shows| him| two| stunning| pa|rr|ots| with| beautiful| pl|um|age|.|

"|There|'s| a| talking| par|rot| an|d a| non|-|talking| par|rot|,"| the| owner| says|.| "|The| talking| par|rot| costs| $|100|,| an|d the| non|-|talking| par|rot| is| $|20|."|

The| man| says|,| "|I|'ll| take| the| non|-|talking| par|rot| at| $|20|."|

He| pays| an|d leaves| with| the| par|rot|.| As| he|'s| walking| down| the| street|,| the| par|rot| looks| up| at| him| an|d says|,| "|You| know|,| you| really| are| a| stupi|d man|!"|

The| man| is| stun|ne|d an|d looks| at| the| par|rot| in| dis|bel|ief|.| The| par|rot| continues|,| "|Yes|,| you| got| r|ippe|d off| big| time|!| I| can| talk| just| as| well| as| that| other| par|rot|,| an|d you| only| pai|d $|20| |for| me|!"|

Note that we're getting streaming output even though we used the parser at the end of the chain above. The parser operates on each streaming chunk individually. Many of the LCEL primitives also support this kind of transform-style passthrough streaming, which can be very convenient when constructing apps.

Custom functions can be designed to return generators, which are able to operate on streams.

Certain runnables, like prompt templates and chat models, cannot process individual chunks and instead aggregate all previous steps. Such runnables can interrupt the streaming process.

Note

The LangChain Expression Language lets you separate the construction of a chain from the mode in which it is used (e.g., sync/async, batch/streaming, etc.). If this isn't relevant to what you're building, you can also rely on a standard imperative programming approach: call invoke, batch, or stream on each component individually, assign the results to variables, and then use them downstream as you see fit.
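
As an illustration, here is one possible imperative sketch (reusing the prompt, model, and parser objects defined above; the helper name stream_joke is made up):

def stream_joke(topic: str):
    prompt_value = prompt.invoke({"topic": topic})  # format the prompt
    for chunk in model.stream(prompt_value):  # stream the chat model
        yield parser.invoke(chunk)  # parse each chunk individually


for token in stream_joke("parrot"):
    print(token, end="|", flush=True)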

Working with Input Streams

What if you wanted to stream JSON from the output as it is being generated?

If you were to rely on json.loads to parse the partial JSON, the parsing would fail, because the partial JSON is not valid JSON.
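
A quick illustration of why naive parsing fails on a partial payload:

import json

try:
    json.loads('{"countries": [{"name": "Fr')  # partial JSON from a stream
except json.JSONDecodeError as err:
    print(err)  # parsing fails until the document is complete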

You'd likely be at a complete loss about what to do and claim that it's impossible to stream JSON.

Well, it turns out there is a way to do it: the parser needs to operate on the input stream and attempt to "auto-complete" the partial JSON into a valid state.

Let's see such a parser in action to understand what this means.

from langchain_core.output_parsers import JsonOutputParser

chain = (
    model | JsonOutputParser()
)  # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models
async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`"
):
    print(text, flush=True)
API Reference: JsonOutputParser
{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': ''}]}
{'countries': [{'name': 'France'}]}
{'countries': [{'name': 'France', 'population': 67}]}
{'countries': [{'name': 'France', 'population': 67413}]}
{'countries': [{'name': 'France', 'population': 67413000}]}
{'countries': [{'name': 'France', 'population': 67413000}, {}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain'}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan'}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan', 'population': 125}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan', 'population': 125584}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan', 'population': 125584000}]}

Now, let's break streaming. We'll use the previous example and append an extraction function at the end that extracts the country names from the finalized JSON.

Warning

Any steps in the chain that operate on finalized inputs rather than on input streams can break streaming functionality via stream or astream.

Tip

Later, we will discuss the astream_events API, which streams results from intermediate steps. This API will stream results from intermediate steps even if the chain contains steps that only operate on finalized inputs.

from langchain_core.output_parsers import (
    JsonOutputParser,
)


# A function that operates on finalized inputs
# rather than on an input_stream
def _extract_country_names(inputs):
    """A function that does not operate on input streams and breaks streaming."""
    if not isinstance(inputs, dict):
        return ""

    if "countries" not in inputs:
        return ""

    countries = inputs["countries"]

    if not isinstance(countries, list):
        return ""

    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names


chain = model | JsonOutputParser() | _extract_country_names

async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
):
    print(text, end="|", flush=True)
API Reference: JsonOutputParser
['France', 'Spain', 'Japan']|

Generator Functions

Let's fix the streaming using a generator function that can operate on the input stream.

Tip

A generator function (a function that uses yield) allows writing code that operates on input streams.

from langchain_core.output_parsers import JsonOutputParser


async def _extract_country_names_streaming(input_stream):
    """A function that operates on input streams."""
    country_names_so_far = set()

    async for input in input_stream:
        if not isinstance(input, dict):
            continue

        if "countries" not in input:
            continue

        countries = input["countries"]

        if not isinstance(countries, list):
            continue

        for country in countries:
            name = country.get("name")
            if not name:
                continue
            if name not in country_names_so_far:
                yield name
                country_names_so_far.add(name)


chain = model | JsonOutputParser() | _extract_country_names_streaming

async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
):
    print(text, end="|", flush=True)
API Reference: JsonOutputParser
France|Spain|Japan|
Note

Because the code above relies on JSON auto-completion, you may see partial country names (e.g., Sp and Spain), which is not what one would want for an extraction result!

We're focusing on streaming concepts here, not necessarily the results of the chains.

Non-streaming components

Some built-in components, like retrievers, do not offer any streaming. What happens if we try to stream them? 🤨

from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings

template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

vectorstore = FAISS.from_texts(
    ["harrison worked at kensho", "harrison likes spicy food"],
    embedding=OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever()

chunks = [chunk for chunk in retriever.stream("where did harrison work?")]
chunks
[[Document(page_content='harrison worked at kensho'),
Document(page_content='harrison likes spicy food')]]

Stream just yielded the final result from that component.

That's fine 🥹! Not all components have to implement streaming; in some cases streaming is either unnecessary, difficult, or just doesn't make sense.

Tip

An LCEL chain constructed with non-streaming components will still be able to stream in many cases; streaming of partial output starts after the last non-streaming step in the chain.

retrieval_chain = (
    {
        "context": retriever.with_config(run_name="Docs"),
        "question": RunnablePassthrough(),
    }
    | prompt
    | model
    | StrOutputParser()
)
for chunk in retrieval_chain.stream(
    "Where did harrison work? " "Write 3 made up sentences about this place."
):
    print(chunk, end="|", flush=True)
Base|d on| the| given| context|,| Harrison| worke|d at| K|ens|ho|.|

Here| are| |3| |made| up| sentences| about| this| place|:|

1|.| K|ens|ho| was| a| cutting|-|edge| technology| company| known| for| its| innovative| solutions| in| artificial| intelligence| an|d data| analytics|.|

2|.| The| modern| office| space| at| K|ens|ho| feature|d open| floor| plans|,| collaborative| work|sp|aces|,| an|d a| vib|rant| atmosphere| that| fos|tere|d creativity| an|d team|work|.|

3|.| With| its| prime| location| in| the| heart| of| the| city|,| K|ens|ho| attracte|d top| talent| from| aroun|d the| worl|d,| creating| a| diverse| an|d dynamic| work| environment|.|

Now that we've seen how stream and astream work, let's venture into the world of streaming events. 🏞️

Using Stream Events

Event Streaming is a beta API. It may change slightly based on feedback.

Note

This guide demonstrates the V2 API and requires langchain-core >= 0.2. For the V1 API compatible with older versions of LangChain, see here.

import langchain_core

langchain_core.__version__

For the astream_events API to work properly:

  • Use async throughout the code where possible (e.g., async tools, etc.)
  • Propagate callbacks if defining custom functions / runnables
  • Whenever using runnables without LCEL, make sure to call .astream() on LLMs rather than .ainvoke to force the LLM to stream tokens (see the sketch after this list)
  • Let us know if anything doesn't work as expected! :)
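
As a sketch of the third point (the helper name and prompt below are made up for illustration), a custom async function can call .astream() on the model so that token-level events keep flowing:

from langchain_core.runnables import RunnableLambda


async def summarize(text: str, config):
    # Using model.astream (rather than model.ainvoke) forces the LLM to
    # emit tokens, which astream_events can surface as stream events.
    # Passing config along propagates callbacks to the model.
    result = ""
    async for chunk in model.astream(f"Summarize: {text}", config=config):
        result += chunk.content
    return result


summarizer = RunnableLambda(summarize)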

Event Reference

Below is a reference table that shows some of the events that might be emitted by the various Runnable objects.

Note

When streaming is implemented properly, the inputs to a runnable won't be known until after the input stream has been entirely consumed. This means that inputs will often be included only for end events, not for start events.

| event | name | chunk | input | output |
| --- | --- | --- | --- | --- |
| on_chat_model_start | [model name] | | {"messages": [[SystemMessage, HumanMessage]]} | |
| on_chat_model_stream | [model name] | AIMessageChunk(content="hello") | | |
| on_chat_model_end | [model name] | | {"messages": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content="hello world") |
| on_llm_start | [model name] | | {'input': 'hello'} | |
| on_llm_stream | [model name] | 'Hello' | | |
| on_llm_end | [model name] | | | 'Hello human!' |
| on_chain_start | format_docs | | | |
| on_chain_stream | format_docs | "hello world!, goodbye world!" | | |
| on_chain_end | format_docs | | [Document(...)] | "hello world!, goodbye world!" |
| on_tool_start | some_tool | | {"x": 1, "y": "2"} | |
| on_tool_end | some_tool | | | {"x": 1, "y": "2"} |
| on_retriever_start | [retriever name] | | {"query": "hello"} | |
| on_retriever_end | [retriever name] | | {"query": "hello"} | [Document(...), ..] |
| on_prompt_start | [template_name] | | {"question": "hello"} | |
| on_prompt_end | [template_name] | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...]) |

Chat Model

Let's start off by looking at the events produced by a chat model.

events = []
async for event in model.astream_events("hello"):
    events.append(event)
Note

For langchain-core<0.3.37, set the version keyword argument explicitly (e.g., model.astream_events("hello", version="v2")).

Let's take a look at a few of the start events and a few of the end events.

events[:3]
[{'event': 'on_chat_model_start',
'data': {'input': 'hello'},
'name': 'ChatAnthropic',
'tags': [],
'run_id': 'b18d016d-8b9b-49e7-a555-44db498fcf66',
'metadata': {'ls_provider': 'anthropic',
'ls_model_name': 'claude-3-sonnet-20240229',
'ls_model_type': 'chat',
'ls_temperature': 0.0,
'ls_max_tokens': 1024},
'parent_ids': []},
{'event': 'on_chat_model_stream',
'run_id': 'b18d016d-8b9b-49e7-a555-44db498fcf66',
'name': 'ChatAnthropic',
'tags': [],
'metadata': {'ls_provider': 'anthropic',
'ls_model_name': 'claude-3-sonnet-20240229',
'ls_model_type': 'chat',
'ls_temperature': 0.0,
'ls_max_tokens': 1024},
'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-b18d016d-8b9b-49e7-a555-44db498fcf66', usage_metadata={'input_tokens': 8, 'output_tokens': 4, 'total_tokens': 12, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})},
'parent_ids': []},
{'event': 'on_chat_model_stream',
'run_id': 'b18d016d-8b9b-49e7-a555-44db498fcf66',
'name': 'ChatAnthropic',
'tags': [],
'metadata': {'ls_provider': 'anthropic',
'ls_model_name': 'claude-3-sonnet-20240229',
'ls_model_type': 'chat',
'ls_temperature': 0.0,
'ls_max_tokens': 1024},
'data': {'chunk': AIMessageChunk(content='Hello! How can', additional_kwargs={}, response_metadata={}, id='run-b18d016d-8b9b-49e7-a555-44db498fcf66')},
'parent_ids': []}]
events[-2:]
[{'event': 'on_chat_model_stream',
'run_id': 'b18d016d-8b9b-49e7-a555-44db498fcf66',
'name': 'ChatAnthropic',
'tags': [],
'metadata': {'ls_provider': 'anthropic',
'ls_model_name': 'claude-3-sonnet-20240229',
'ls_model_type': 'chat',
'ls_temperature': 0.0,
'ls_max_tokens': 1024},
'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={'stop_reason': 'end_turn', 'stop_sequence': None}, id='run-b18d016d-8b9b-49e7-a555-44db498fcf66', usage_metadata={'input_tokens': 0, 'output_tokens': 12, 'total_tokens': 12, 'input_token_details': {}})},
'parent_ids': []},
{'event': 'on_chat_model_end',
'data': {'output': AIMessageChunk(content='Hello! How can I assist you today?', additional_kwargs={}, response_metadata={'stop_reason': 'end_turn', 'stop_sequence': None}, id='run-b18d016d-8b9b-49e7-a555-44db498fcf66', usage_metadata={'input_tokens': 8, 'output_tokens': 16, 'total_tokens': 24, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})},
'run_id': 'b18d016d-8b9b-49e7-a555-44db498fcf66',
'name': 'ChatAnthropic',
'tags': [],
'metadata': {'ls_provider': 'anthropic',
'ls_model_name': 'claude-3-sonnet-20240229',
'ls_model_type': 'chat',
'ls_temperature': 0.0,
'ls_max_tokens': 1024},
'parent_ids': []}]

Chains

Let's revisit the example chain that parsed streaming JSON to explore the streaming events API.

chain = (
    model | JsonOutputParser()
)  # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models

events = [
    event
    async for event in chain.astream_events(
        "output a list of the countries france, spain and japan and their populations in JSON format. "
        'Use a dict with an outer key of "countries" which contains a list of countries. '
        "Each country should have the key `name` and `population`",
    )
]

If you examine the first few events, you'll notice that there are 3 different start events rather than 2 start events.

The three start events correspond to:

  1. The chain (model + parser)
  2. The model
  3. The parser
events[:3]
[{'event': 'on_chain_start',
'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'},
'name': 'RunnableSequence',
'tags': [],
'run_id': '4765006b-16e2-4b1d-a523-edd9fd64cb92',
'metadata': {}},
{'event': 'on_chat_model_start',
'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`')]]}},
'name': 'ChatAnthropic',
'tags': ['seq:step:1'],
'run_id': '0320c234-7b52-4a14-ae4e-5f100949e589',
'metadata': {}},
{'event': 'on_chat_model_stream',
'data': {'chunk': AIMessageChunk(content='{', id='run-0320c234-7b52-4a14-ae4e-5f100949e589')},
'run_id': '0320c234-7b52-4a14-ae4e-5f100949e589',
'name': 'ChatAnthropic',
'tags': ['seq:step:1'],
'metadata': {}}]

What do you think you'd see if you looked at the last 3 events? What about the ones in the middle?
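
You can check for yourself with a couple of quick slices (what they contain depends on your run):

events[-3:]  # the last few events
events[len(events) // 2]  # an event from the middle of the stream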

Let's use this API to output the stream events from the model and the parser. We're ignoring start events, end events, and events from the chain.

num_events = 0

async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(
            f"Chat model chunk: {repr(event['data']['chunk'].content)}",
            flush=True,
        )
    if kind == "on_parser_stream":
        print(f"Parser chunk: {event['data']['chunk']}", flush=True)
    num_events += 1
    if num_events > 30:
        # Truncate the output
        print("...")
        break
Chat model chunk: ''
Chat model chunk: '{'
Parser chunk: {}
Chat model chunk: '\n "countries'
Chat model chunk: '": [\n '
Parser chunk: {'countries': []}
Chat model chunk: '{\n "'
Parser chunk: {'countries': [{}]}
Chat model chunk: 'name": "France'
Parser chunk: {'countries': [{'name': 'France'}]}
Chat model chunk: '",\n "'
Chat model chunk: 'population": 67'
Parser chunk: {'countries': [{'name': 'France', 'population': 67}]}
Chat model chunk: '413'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413}]}
Chat model chunk: '000\n },'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}]}
Chat model chunk: '\n {'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}, {}]}
Chat model chunk: '\n "name":'
...

Because both the model and the parser support streaming, we see streaming events from both components in real time! Kind of cool, isn't it? 🦜

Filtering Events

Because this API produces so many events, it's useful to be able to filter on them.

You can filter by component name, component tags, or component type.

By Name

chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
    {"run_name": "my_parser"}
)

max_events = 0
async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
    include_names=["my_parser"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break
{'event': 'on_parser_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'my_parser', 'tags': ['seq:step:2'], 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'metadata': {}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': []}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France'}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413000}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413000}, {}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain'}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
...

By Type

chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
    {"run_name": "my_parser"}
)

max_events = 0
async for event in chain.astream_events(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    include_types=["chat_model"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break
{'event': 'on_chat_model_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'model', 'tags': ['seq:step:1'], 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c', usage_metadata={'input_tokens': 56, 'output_tokens': 1, 'total_tokens': 57, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\n "countries', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='": [\n ', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{\n "', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='name": "France', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='",\n "', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='population": 67', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='413', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='000\n },', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
...

By Tags

Note

Tags are inherited by the child components of a given runnable.

If you're using tags to filter, make sure this is what you want.

chain = (model | JsonOutputParser()).with_config({"tags": ["my_chain"]})

max_events = 0
async for event in chain.astream_events(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    include_tags=["my_chain"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break
{'event': 'on_chain_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'RunnableSequence', 'tags': ['my_chain'], 'run_id': '58d1302e-36ce-4df7-a3cb-47cb73d57e44', 'metadata': {}, 'parent_ids': []}
{'event': 'on_chat_model_start', 'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`', additional_kwargs={}, response_metadata={})]]}}, 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'run_id': '8222e8a1-d978-4f30-87fc-b2dba838774b', 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-8222e8a1-d978-4f30-87fc-b2dba838774b', usage_metadata={'input_tokens': 56, 'output_tokens': 1, 'total_tokens': 57, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})}, 'run_id': '8222e8a1-d978-4f30-87fc-b2dba838774b', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_parser_start', 'data': {}, 'name': 'JsonOutputParser', 'tags': ['seq:step:2', 'my_chain'], 'run_id': '75604c84-e1e6-494a-8b2a-950f45d932e8', 'metadata': {}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{', additional_kwargs={}, response_metadata={}, id='run-8222e8a1-d978-4f30-87fc-b2dba838774b')}, 'run_id': '8222e8a1-d978-4f30-87fc-b2dba838774b', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_parser_stream', 'run_id': '75604c84-e1e6-494a-8b2a-950f45d932e8', 'name': 'JsonOutputParser', 'tags': ['seq:step:2', 'my_chain'], 'metadata': {}, 'data': {'chunk': {}}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_chain_stream', 'run_id': '58d1302e-36ce-4df7-a3cb-47cb73d57e44', 'name': 'RunnableSequence', 'tags': ['my_chain'], 'metadata': {}, 'data': {'chunk': {}}, 'parent_ids': []}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\n "countries', additional_kwargs={}, response_metadata={}, id='run-8222e8a1-d978-4f30-87fc-b2dba838774b')}, 'run_id': '8222e8a1-d978-4f30-87fc-b2dba838774b', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='": [\n ', additional_kwargs={}, response_metadata={}, id='run-8222e8a1-d978-4f30-87fc-b2dba838774b')}, 'run_id': '8222e8a1-d978-4f30-87fc-b2dba838774b', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_parser_stream', 'run_id': '75604c84-e1e6-494a-8b2a-950f45d932e8', 'name': 'JsonOutputParser', 'tags': ['seq:step:2', 'my_chain'], 'metadata': {}, 'data': {'chunk': {'countries': []}}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_chain_stream', 'run_id': '58d1302e-36ce-4df7-a3cb-47cb73d57e44', 'name': 'RunnableSequence', 'tags': ['my_chain'], 'metadata': {}, 'data': {'chunk': {'countries': []}}, 'parent_ids': []}
...

Non-streaming components

Remember how some components don't stream well because they don't operate on input streams?

While such components can break streaming of the final output when using astream, astream_events will still yield streaming events from intermediate steps that support streaming!

# Function that does not support streaming.
# It operates on finalized inputs rather than
# operating on the input stream.
def _extract_country_names(inputs):
    """A function that does not operate on input streams and breaks streaming."""
    if not isinstance(inputs, dict):
        return ""

    if "countries" not in inputs:
        return ""

    countries = inputs["countries"]

    if not isinstance(countries, list):
        return ""

    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names


chain = (
    model | JsonOutputParser() | _extract_country_names
)  # This parser only works with OpenAI right now

As expected, the astream API doesn't work correctly, because _extract_country_names doesn't operate on streams.

async for chunk in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
):
    print(chunk, flush=True)
['France', 'Spain', 'Japan']

Now, let's confirm that with astream_events we still see streaming output from the model and the parser.

num_events = 0

async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(
            f"Chat model chunk: {repr(event['data']['chunk'].content)}",
            flush=True,
        )
    if kind == "on_parser_stream":
        print(f"Parser chunk: {event['data']['chunk']}", flush=True)
    num_events += 1
    if num_events > 30:
        # Truncate the output
        print("...")
        break
Chat model chunk: ''
Chat model chunk: '{'
Parser chunk: {}
Chat model chunk: '\n "countries'
Chat model chunk: '": [\n '
Parser chunk: {'countries': []}
Chat model chunk: '{\n "'
Parser chunk: {'countries': [{}]}
Chat model chunk: 'name": "France'
Parser chunk: {'countries': [{'name': 'France'}]}
Chat model chunk: '",\n "'
Chat model chunk: 'population": 67'
Parser chunk: {'countries': [{'name': 'France', 'population': 67}]}
Chat model chunk: '413'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413}]}
Chat model chunk: '000\n },'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}]}
Chat model chunk: '\n {'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}, {}]}
Chat model chunk: '\n "name":'
Chat model chunk: ' "Spain",'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain'}]}
Chat model chunk: '\n "population":'
Chat model chunk: ' 47'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47}]}
Chat model chunk: '351'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351}]}
...

Propagating Callbacks

Note

If you're invoking runnables inside your tools, you need to propagate callbacks to the runnable; otherwise, no stream events will be generated.

Note

When using RunnableLambdas or the @chain decorator, callbacks are propagated automatically behind the scenes.

from langchain_core.runnables import RunnableLambda
from langchain_core.tools import tool


def reverse_word(word: str):
    return word[::-1]


reverse_word = RunnableLambda(reverse_word)


@tool
def bad_tool(word: str):
    """Custom tool that doesn't propagate callbacks."""
    return reverse_word.invoke(word)


async for event in bad_tool.astream_events("hello"):
    print(event)
{'event': 'on_tool_start', 'data': {'input': 'hello'}, 'name': 'bad_tool', 'tags': [], 'run_id': 'ea900472-a8f7-425d-b627-facdef936ee8', 'metadata': {}}
{'event': 'on_chain_start', 'data': {'input': 'hello'}, 'name': 'reverse_word', 'tags': [], 'run_id': '77b01284-0515-48f4-8d7c-eb27c1882f86', 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': 'olleh', 'input': 'hello'}, 'run_id': '77b01284-0515-48f4-8d7c-eb27c1882f86', 'name': 'reverse_word', 'tags': [], 'metadata': {}}
{'event': 'on_tool_end', 'data': {'output': 'olleh'}, 'run_id': 'ea900472-a8f7-425d-b627-facdef936ee8', 'name': 'bad_tool', 'tags': [], 'metadata': {}}

Here's a re-implementation that propagates callbacks correctly. You'll notice that we now also get events from the reverse_word runnable.

@tool
def correct_tool(word: str, callbacks):
    """A tool that correctly propagates callbacks."""
    return reverse_word.invoke(word, {"callbacks": callbacks})


async for event in correct_tool.astream_events("hello"):
    print(event)
{'event': 'on_tool_start', 'data': {'input': 'hello'}, 'name': 'correct_tool', 'tags': [], 'run_id': 'd5ea83b9-9278-49cc-9f1d-aa302d671040', 'metadata': {}}
{'event': 'on_chain_start', 'data': {'input': 'hello'}, 'name': 'reverse_word', 'tags': [], 'run_id': '44dafbf4-2f87-412b-ae0e-9f71713810df', 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': 'olleh', 'input': 'hello'}, 'run_id': '44dafbf4-2f87-412b-ae0e-9f71713810df', 'name': 'reverse_word', 'tags': [], 'metadata': {}}
{'event': 'on_tool_end', 'data': {'output': 'olleh'}, 'run_id': 'd5ea83b9-9278-49cc-9f1d-aa302d671040', 'name': 'correct_tool', 'tags': [], 'metadata': {}}

If you're invoking runnables from within Runnable Lambdas or @chains, callbacks will be passed automatically on your behalf.

from langchain_core.runnables import RunnableLambda


async def reverse_and_double(word: str):
    return await reverse_word.ainvoke(word) * 2


reverse_and_double = RunnableLambda(reverse_and_double)

await reverse_and_double.ainvoke("1234")

async for event in reverse_and_double.astream_events("1234"):
    print(event)
API Reference: RunnableLambda
{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_and_double', 'tags': [], 'run_id': '03b0e6a1-3e60-42fc-8373-1e7829198d80', 'metadata': {}}
{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_word', 'tags': [], 'run_id': '5cf26fc8-840b-4642-98ed-623dda28707a', 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': '4321', 'input': '1234'}, 'run_id': '5cf26fc8-840b-4642-98ed-623dda28707a', 'name': 'reverse_word', 'tags': [], 'metadata': {}}
{'event': 'on_chain_stream', 'data': {'chunk': '43214321'}, 'run_id': '03b0e6a1-3e60-42fc-8373-1e7829198d80', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': '43214321'}, 'run_id': '03b0e6a1-3e60-42fc-8373-1e7829198d80', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}

And with the @chain decorator:

from langchain_core.runnables import chain


@chain
async def reverse_and_double(word: str):
    return await reverse_word.ainvoke(word) * 2


await reverse_and_double.ainvoke("1234")

async for event in reverse_and_double.astream_events("1234"):
    print(event)
API Reference: chain
{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_and_double', 'tags': [], 'run_id': '1bfcaedc-f4aa-4d8e-beee-9bba6ef17008', 'metadata': {}}
{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_word', 'tags': [], 'run_id': '64fc99f0-5d7d-442b-b4f5-4537129f67d1', 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': '4321', 'input': '1234'}, 'run_id': '64fc99f0-5d7d-442b-b4f5-4537129f67d1', 'name': 'reverse_word', 'tags': [], 'metadata': {}}
{'event': 'on_chain_stream', 'data': {'chunk': '43214321'}, 'run_id': '1bfcaedc-f4aa-4d8e-beee-9bba6ef17008', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': '43214321'}, 'run_id': '1bfcaedc-f4aa-4d8e-beee-9bba6ef17008', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}

Next steps

Now you've learned some ways to stream both final outputs and internal steps with LangChain.

To learn more, check out the other how-to guides in this section, or the conceptual guide on the LangChain Expression Language.