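How to handle multiple queries when doing query analysis
Sometimes a query analysis technique may generate multiple queries. In that case, we need to remember to run every query and then combine the results. We will walk through a simple example (using mock data) of how to do that.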
Setup
Install dependencies
%pip install -qU langchain langchain-community langchain-openai langchain-chroma
Note: you may need to restart the kernel to use updated packages.
Set environment variables
We will use OpenAI in this example:
import getpass
import os
if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass.getpass()
# Optional, uncomment to trace runs with LangSmith. Sign up here: https://smith.langchain.com.
# os.environ["LANGSMITH_TRACING"] = "true"
# os.environ["LANGSMITH_API_KEY"] = getpass.getpass()
Create the index
We will create a vector store over fake information.
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
texts = ["Harrison worked at Kensho", "Ankush worked at Facebook"]
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma.from_texts(
    texts,
    embeddings,
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 1})
Query analysis
We will use function calling to structure the output, and we will let it return multiple queries.
from typing import List, Optional
from pydantic import BaseModel, Field
class Search(BaseModel):
    """Search over a database of job records."""
    queries: List[str] = Field(
        ...,
        description="Distinct queries to search for",
    )
from langchain_core.output_parsers.openai_tools import PydanticToolsParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
output_parser = PydanticToolsParser(tools=[Search])
system = """You have the ability to issue search queries to get information to help answer user information.
If you need to look up two distinct pieces of information, you are allowed to do that!"""
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "{question}"),
    ]
)
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
structured_llm = llm.with_structured_output(Search)
query_analyzer = {"question": RunnablePassthrough()} | prompt | structured_llm
We can see that this allows multiple queries to be generated:
query_analyzer.invoke("where did Harrison Work")
Search(queries=['Harrison Work', 'Harrison employment history'])
query_analyzer.invoke("where did Harrison and ankush Work")
Search(queries=['Harrison work history', 'Ankush work history'])
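Retrieval with query analysis
So how do we include this in a chain? Calling the retriever asynchronously makes this much easier: we can loop over the queries without blocking the event loop while waiting on each response.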
from langchain_core.runnables import chain
@chain
async def custom_chain(question):
    response = await query_analyzer.ainvoke(question)
    docs = []
    for query in response.queries:
        new_docs = await retriever.ainvoke(query)
        docs.extend(new_docs)
    # You probably want to think about reranking or deduplicating documents here
    # But that is a separate topic
    return docs
await custom_chain.ainvoke("where did Harrison Work")
[Document(page_content='Harrison worked at Kensho'),
Document(page_content='Harrison worked at Kensho')]
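Both generated queries returned the same document here, so the merged list contains a duplicate. One simple way to handle this is to drop documents whose text has already been seen. The following is a minimal sketch, not part of the original example: `Doc` is a hypothetical stand-in for a retrieved document (only a `page_content` attribute is assumed), and `dedupe_by_content` is an illustrative helper name.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Doc:
    """Hypothetical stand-in for a retrieved document; only page_content is used."""

    page_content: str


def dedupe_by_content(docs):
    """Return docs with repeated page_content removed, keeping first-seen order."""
    seen = set()
    unique = []
    for doc in docs:
        if doc.page_content not in seen:
            seen.add(doc.page_content)
            unique.append(doc)
    return unique


# Mirrors the duplicated result above: two queries, one underlying document.
docs = [Doc("Harrison worked at Kensho"), Doc("Harrison worked at Kensho")]
unique_docs = dedupe_by_content(docs)
print(unique_docs)
```

Exact-text matching is the crudest possible criterion. Depending on the data, comparing document IDs or metadata may be a better fit.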
await custom_chain.ainvoke("where did Harrison and ankush Work")
[Document(page_content='Harrison worked at Kensho'),
Document(page_content='Ankush worked at Facebook')]
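The loop in `custom_chain` awaits each retrieval in turn, so the queries still run one after another. If the queries are independent, they could instead be issued concurrently with `asyncio.gather`. The sketch below illustrates the pattern only: `fake_retrieve` is a hypothetical stand-in for `retriever.ainvoke`, and `retrieve_all` is an illustrative name, so that the example runs without an API key.

```python
import asyncio


async def fake_retrieve(query):
    """Hypothetical stand-in for `retriever.ainvoke(query)`."""
    await asyncio.sleep(0.01)  # simulate network latency
    return [f"result for {query!r}"]


async def retrieve_all(queries):
    """Run every query concurrently and flatten the results in query order."""
    # asyncio.gather returns results in the same order as the awaitables passed in.
    results = await asyncio.gather(*(fake_retrieve(q) for q in queries))
    return [doc for docs in results for doc in docs]


merged = asyncio.run(retrieve_all(["Harrison work history", "Ankush work history"]))
print(merged)
```

Inside a notebook, where an event loop is already running, `await retrieve_all(...)` would be used in place of `asyncio.run(...)`.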