
Migrating from MapReduceDocumentsChain

MapReduceDocumentsChain implements a map-reduce strategy over (potentially long) texts. The strategy is as follows:

  • Split a text into smaller documents;
  • Map a process onto the smaller documents;
  • Reduce or consolidate the results of the process into a final result.

Note that the map step is typically parallelized over the input documents.

A common process in this context is summarization, in which the map step summarizes individual documents and the reduce step generates a summary of those summaries.
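
Conceptually, the flow can be sketched in a few lines of plain Python. This is only an illustration: summarize below is a placeholder standing in for an LLM call, not a LangChain API.

# Conceptual sketch of map-reduce summarization (placeholder logic only).
def summarize(text: str) -> str:
    return text[:100]  # stand-in: a real implementation would call an LLM


def map_reduce_summarize(docs: list[str]) -> str:
    # Map: summarize each document independently (parallelizable).
    summaries = [summarize(doc) for doc in docs]
    # Reduce: consolidate into a summary of summaries.
    return summarize("\n\n".join(summaries))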

In the reduce step, MapReduceDocumentsChain supports a recursive "collapsing" of the summaries: the inputs are partitioned based on a token limit, and summaries are generated of the partitions. This step is repeated until the total length of the summaries is within a desired limit, allowing for the summarization of text of arbitrary length. This is particularly useful for models with smaller context windows.
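
The collapse loop itself can be sketched in the same spirit. This is again a rough illustration under stated assumptions: count_tokens is a crude stand-in for a real tokenizer, and summarize is the placeholder from the sketch above.

def count_tokens(text: str) -> int:
    return len(text.split())  # crude stand-in for a real tokenizer


def collapse(summaries: list[str], token_max: int) -> list[str]:
    # Repeatedly partition the summaries by token count and re-summarize
    # each partition, until the combined length fits within token_max.
    while sum(count_tokens(s) for s in summaries) > token_max:
        partitions, current, size = [], [], 0
        for s in summaries:
            n = count_tokens(s)
            if current and size + n > token_max:
                partitions.append(current)
                current, size = [], 0
            current.append(s)
            size += n
        if current:
            partitions.append(current)
        summaries = [summarize("\n\n".join(p)) for p in partitions]
    return summaries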

LangGraph supports map-reduce workflows and confers a number of advantages for this problem:

  • LangGraph allows individual steps (such as successive summarizations) to be streamed, enabling finer-grained control over execution;
  • LangGraph's checkpointing supports error recovery, extends human-in-the-loop workflows, and makes the flow easier to incorporate into conversational applications (a minimal checkpointing sketch follows this list);
  • The LangGraph implementation is easier to extend, as we will see below.
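
For example, enabling checkpointing only requires compiling a graph with a checkpointer. Below is a minimal, self-contained sketch; the toy State and greet node are illustrative and not part of this guide's pipeline.

from typing import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph


class State(TypedDict):
    value: str


def greet(state: State):
    return {"value": state["value"] + "!"}


graph = StateGraph(State)
graph.add_node("greet", greet)
graph.add_edge(START, "greet")
graph.add_edge("greet", END)

# Compiling with a checkpointer persists state per thread, enabling
# error recovery and human-in-the-loop workflows.
app = graph.compile(checkpointer=MemorySaver())
app.invoke({"value": "hi"}, {"configurable": {"thread_id": "1"}})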

Below we walk through both MapReduceDocumentsChain and a corresponding LangGraph implementation, first on a simple example for illustration, and then on a longer text to demonstrate the recursive reduction step.

Let's first load a chat model:

pip install -qU "langchain[openai]"
import getpass
import os

if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")

from langchain.chat_models import init_chat_model

llm = init_chat_model("gpt-4o-mini", model_provider="openai")

Basic example (short documents)

Let's use the following three documents for illustration.

from langchain_core.documents import Document

documents = [
    Document(page_content="Apples are red", metadata={"title": "apple_book"}),
    Document(page_content="Blueberries are blue", metadata={"title": "blueberry_book"}),
    Document(page_content="Bananas are yellow", metadata={"title": "banana_book"}),
]
API Reference: Document

Legacy

Details

Below is an implementation using MapReduceDocumentsChain. We define the prompt templates for the map and reduce steps, instantiate separate chains for these steps, and finally instantiate the MapReduceDocumentsChain:

from langchain.chains import MapReduceDocumentsChain, ReduceDocumentsChain
from langchain.chains.combine_documents.stuff import StuffDocumentsChain
from langchain.chains.llm import LLMChain
from langchain_core.prompts import ChatPromptTemplate
from langchain_text_splitters import CharacterTextSplitter

# Map
map_template = "Write a concise summary of the following: {docs}."
map_prompt = ChatPromptTemplate([("human", map_template)])
map_chain = LLMChain(llm=llm, prompt=map_prompt)


# Reduce
reduce_template = """
The following is a set of summaries:
{docs}
Take these and distill it into a final, consolidated summary
of the main themes.
"""
reduce_prompt = ChatPromptTemplate([("human", reduce_template)])
reduce_chain = LLMChain(llm=llm, prompt=reduce_prompt)


# Takes a list of documents, combines them into a single string, and passes this to an LLMChain
combine_documents_chain = StuffDocumentsChain(
    llm_chain=reduce_chain, document_variable_name="docs"
)

# Combines and iteratively reduces the mapped documents
reduce_documents_chain = ReduceDocumentsChain(
    # This is the final chain that is called.
    combine_documents_chain=combine_documents_chain,
    # If documents exceed context for `StuffDocumentsChain`
    collapse_documents_chain=combine_documents_chain,
    # The maximum number of tokens to group documents into.
    token_max=1000,
)

# Combining documents by mapping a chain over them, then combining results
map_reduce_chain = MapReduceDocumentsChain(
    # Map chain
    llm_chain=map_chain,
    # Reduce chain
    reduce_documents_chain=reduce_documents_chain,
    # The variable name in the llm_chain to put the documents in
    document_variable_name="docs",
    # Return the results of the map steps in the output
    return_intermediate_steps=False,
)
result = map_reduce_chain.invoke(documents)

print(result["output_text"])
Fruits come in a variety of colors, with apples being red, blueberries being blue, and bananas being yellow.

In the LangSmith trace we observe four LLM calls: one summarizing each of the three input documents, and one summarizing those summaries.

LangGraph

Below we show a LangGraph implementation, using the same prompt templates as above. The graph includes a node that generates summaries and is mapped across a list of input documents. That node then flows into a second node that generates the final summary.

Details

We will need to install langgraph:

%pip install -qU langgraph
import operator
from typing import Annotated, List, TypedDict

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langgraph.constants import Send
from langgraph.graph import END, START, StateGraph

map_template = "Write a concise summary of the following: {context}."

reduce_template = """
The following is a set of summaries:
{docs}
Take these and distill it into a final, consolidated summary
of the main themes.
"""

map_prompt = ChatPromptTemplate([("human", map_template)])
reduce_prompt = ChatPromptTemplate([("human", reduce_template)])

map_chain = map_prompt | llm | StrOutputParser()
reduce_chain = reduce_prompt | llm | StrOutputParser()

# Graph components: define the components that will make up the graph


# This will be the overall state of the main graph.
# It will contain the input document contents, corresponding
# summaries, and a final summary.
class OverallState(TypedDict):
    # Notice here we use the operator.add reducer.
    # This is because we want to combine all the summaries we generate
    # from individual nodes back into one list - this is essentially
    # the "reduce" part
    contents: List[str]
    summaries: Annotated[list, operator.add]
    final_summary: str


# This will be the state of the node that we will "map" all
# documents to in order to generate summaries
class SummaryState(TypedDict):
    content: str


# Here we generate a summary, given a document
async def generate_summary(state: SummaryState):
    response = await map_chain.ainvoke(state["content"])
    return {"summaries": [response]}


# Here we define the logic to map out over the documents
# We will use this as an edge in the graph
def map_summaries(state: OverallState):
    # We will return a list of `Send` objects
    # Each `Send` object consists of the name of a node in the graph
    # as well as the state to send to that node
    return [
        Send("generate_summary", {"content": content}) for content in state["contents"]
    ]


# Here we will generate the final summary
async def generate_final_summary(state: OverallState):
    response = await reduce_chain.ainvoke(state["summaries"])
    return {"final_summary": response}


# Construct the graph: here we put everything together to construct our graph
graph = StateGraph(OverallState)
graph.add_node("generate_summary", generate_summary)
graph.add_node("generate_final_summary", generate_final_summary)
graph.add_conditional_edges(START, map_summaries, ["generate_summary"])
graph.add_edge("generate_summary", "generate_final_summary")
graph.add_edge("generate_final_summary", END)
app = graph.compile()
from IPython.display import Image

Image(app.get_graph().draw_mermaid_png())

Note that invoking the graph in streaming mode lets us monitor the steps, and potentially act on them, during execution.

# Call the graph:
async for step in app.astream({"contents": [doc.page_content for doc in documents]}):
    print(step)
{'generate_summary': {'summaries': ['Apples are typically red in color.']}}
{'generate_summary': {'summaries': ['Bananas are yellow in color.']}}
{'generate_summary': {'summaries': ['Blueberries are a type of fruit that are blue in color.']}}
{'generate_final_summary': {'final_summary': 'The main themes are the colors of different fruits: apples are red, blueberries are blue, and bananas are yellow.'}}

In the LangSmith trace we recover the same four LLM calls as before.

Summarizing long documents

Map-reduce flows are particularly useful when texts are long relative to the context window of an LLM. MapReduceDocumentsChain supports a recursive "collapsing" of the summaries: inputs are partitioned based on a token limit, and a summary is generated for each partition. This step is repeated until the total length of the summaries falls within a desired limit, allowing for the summarization of text of arbitrary length.

This "collapse" step is implemented as a while loop within MapReduceDocumentsChain. We can demonstrate this step on a longer text: the LLM Powered Autonomous Agents blog post by Lilian Weng (which also appears in the RAG tutorial and elsewhere in the documentation).

First we load the post and split it into smaller "sub-documents":

from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import CharacterTextSplitter

loader = WebBaseLoader("https://lilianweng.github.io/posts/2023-06-23-agent/")
documents = loader.load()

text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=1000, chunk_overlap=0
)
split_docs = text_splitter.split_documents(documents)
print(f"Generated {len(split_docs)} documents.")
USER_AGENT environment variable not set, consider setting it to identify your requests.
Created a chunk of size 1003, which is longer than the specified 1000
Generated 14 documents.

Legacy

Details

We can invoke MapReduceDocumentsChain as before:

result = map_reduce_chain.invoke(split_docs)

print(result["output_text"])
The article discusses the use of Large Language Models (LLMs) to power autonomous agents in various tasks, showcasing their capabilities in problem-solving beyond generating written content. Key components such as planning, memory optimization, and tool use are explored, with proof-of-concept demos like AutoGPT and GPT-Engineer demonstrating the potential of LLM-powered agents. Challenges include limitations in historical information retention and natural language interface reliability, while the potential of LLMs in enhancing reasoning, problem-solving, and planning proficiency for autonomous agents is highlighted. Overall, the article emphasizes the versatility and power of LLMs in creating intelligent agents for tasks like scientific discovery and experiment design.

Consider the LangSmith trace for the above invocation. When instantiating our ReduceDocumentsChain, we set a token_max of 1,000 tokens. This results in a total of 17 LLM calls:

  • 14 calls summarize the 14 sub-documents generated by our text splitter.
  • These summaries total roughly 1,000 - 2,000 tokens. Because we set token_max to 1,000, two further calls summarize (or "collapse") these summaries.
  • One final call generates a summary of the two "collapsed" summaries.

LangGraph

Details

We can extend our original map-reduce implementation in LangGraph to perform the same recursive collapsing step. We make the following changes:

  • Add a collapsed_summaries key to the state to store the collapsed summaries;
  • Update the final summarization node to summarize the collapsed summaries;
  • Add a collapse_summaries node that partitions a list of documents based on token length (1,000 tokens here, as before), generates a summary of each partition, and stores the result in collapsed_summaries.

We add a conditional edge from collapse_summaries back to itself to form a loop: if the collapsed summaries total more than token_max, we re-run the node.

from typing import Literal

from langchain.chains.combine_documents.reduce import (
    acollapse_docs,
    split_list_of_docs,
)


def length_function(documents: List[Document]) -> int:
    """Get number of tokens for input contents."""
    return sum(llm.get_num_tokens(doc.page_content) for doc in documents)


token_max = 1000


class OverallState(TypedDict):
    contents: List[str]
    summaries: Annotated[list, operator.add]
    collapsed_summaries: List[Document]  # add key for collapsed summaries
    final_summary: str


# Add node to store summaries for collapsing
def collect_summaries(state: OverallState):
    return {
        "collapsed_summaries": [Document(summary) for summary in state["summaries"]]
    }


# Modify final summary to read off collapsed summaries
async def generate_final_summary(state: OverallState):
    response = await reduce_chain.ainvoke(state["collapsed_summaries"])
    return {"final_summary": response}


graph = StateGraph(OverallState)
graph.add_node("generate_summary", generate_summary) # same as before
graph.add_node("collect_summaries", collect_summaries)
graph.add_node("generate_final_summary", generate_final_summary)


# Add node to collapse summaries
async def collapse_summaries(state: OverallState):
    doc_lists = split_list_of_docs(
        state["collapsed_summaries"], length_function, token_max
    )
    results = []
    for doc_list in doc_lists:
        results.append(await acollapse_docs(doc_list, reduce_chain.ainvoke))

    return {"collapsed_summaries": results}


graph.add_node("collapse_summaries", collapse_summaries)


def should_collapse(
    state: OverallState,
) -> Literal["collapse_summaries", "generate_final_summary"]:
    num_tokens = length_function(state["collapsed_summaries"])
    if num_tokens > token_max:
        return "collapse_summaries"
    else:
        return "generate_final_summary"


graph.add_conditional_edges(START, map_summaries, ["generate_summary"])
graph.add_edge("generate_summary", "collect_summaries")
graph.add_conditional_edges("collect_summaries", should_collapse)
graph.add_conditional_edges("collapse_summaries", should_collapse)
graph.add_edge("generate_final_summary", END)
app = graph.compile()

LangGraph allows the graph structure to be plotted to help visualize its function:

from IPython.display import Image

Image(app.get_graph().draw_mermaid_png())

As before, we can stream the graph to observe its sequence of steps. Below, we simply print out the name of each step.

Note that because the graph contains a loop, it can be helpful to specify a recursion_limit on its execution. Analogous to ReduceDocumentsChain.token_max, this will raise a specific error when the specified limit is exceeded.

async for step in app.astream(
    {"contents": [doc.page_content for doc in split_docs]},
    {"recursion_limit": 10},
):
    print(list(step.keys()))
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['collect_summaries']
['collapse_summaries']
['generate_final_summary']
print(step)
{'generate_final_summary': {'final_summary': 'The summaries discuss the use of Large Language Models (LLMs) to power autonomous agents in various tasks such as problem-solving, planning, and tool use. Key components like planning, memory, and task decomposition are highlighted, along with challenges such as inefficient planning and hallucination. Techniques like Algorithm Distillation and Maximum Inner Product Search are explored for optimization, while frameworks like ReAct and Reflexion show improvements in knowledge-intensive tasks. The importance of accurate interpretation of user input and well-structured code for functional autonomy is emphasized, along with the potential of LLMs in prompting, reasoning, and emergent social behavior in simulation environments. Challenges in real-world scenarios and the use of LLMs with expert-designed tools for tasks like organic synthesis and drug discovery are also discussed.'}}

In the corresponding LangSmith trace we can see the same 17 LLM calls as before, this time grouped under their respective nodes.

Next steps

Check out the LangGraph documentation for details on building with LangGraph, including this guide on the specifics of map-reduce in LangGraph.

See this tutorial for more LLM-based summarization strategies.