Elasticsearch
Elasticsearch is a distributed, RESTful search and analytics engine, capable of performing both vector and lexical search. It is built on top of the Apache Lucene library.
本笔记本展示了如何使用与 Elasticsearch 向量存储相关的功能。
设置
为了使用 Elasticsearch 向量搜索,您必须安装 langchain-elasticsearch 包。
%pip install -qU langchain-elasticsearch
凭据
设置Elasticsearch实例以供使用有两种主要方法:
- Elastic Cloud:Elastic Cloud 是一项托管的 Elasticsearch 服务。注册 免费试用。
要连接一个不需要登录凭据的 Elasticsearch 实例(启动 Docker 实例时启用了安全功能),请将 Elasticsearch URL 和索引名称以及嵌入对象传递给构造函数。
- 本地安装Elasticsearch:通过在本地运行Elasticsearch来开始使用。最简单的方法是使用官方的Elasticsearch Docker镜像。更多信息请参阅Elasticsearch Docker文档。
通过 Docker 运行 Elasticsearch
示例:运行一个单节点的 Elasticsearch 实例,安全功能已禁用。 这不推荐用于生产环境。
%docker run -p 9200:9200 -e "discovery.type=single-node" -e "xpack.security.enabled=false" -e "xpack.security.http.ssl.enabled=false" docker.elastic.co/elasticsearch/elasticsearch:8.12.1
运行时启用身份验证
在生产环境中,我们建议您启用安全功能。要使用登录凭据进行连接,您可以使用参数 es_api_key 或 es_user 和 es_password。
pip install -qU langchain-openai
import getpass
import os
if not os.environ.get("OPENAI_API_KEY"):
os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
from langchain_elasticsearch import ElasticsearchStore
elastic_vector_search = ElasticsearchStore(
es_url="http://localhost:9200",
index_name="langchain_index",
embedding=embeddings,
es_user="elastic",
es_password="changeme",
)
如何获取默认“elastic”用户的密码?
要获取默认“elastic”用户的 Elastic Cloud 密码:
- 登录 Elastic Cloud 控制台 https://cloud.elastic.co
- 转到“安全”>“用户”
- 定位“elastic”用户并点击“编辑”。
- 点击“重置密码”
- 按照提示重置密码
如何获取API密钥?
获取API密钥:
- 登录 Elastic Cloud 控制台 https://cloud.elastic.co
- 打开 Kibana,进入堆栈管理 > API 密钥。
- 点击“创建API密钥”
- 输入 API 密钥的名称并点击“创建”。
- 复制 API 密钥并将其粘贴到
api_key参数中
Elastic Cloud
要连接到 Elastic Cloud 上的 Elasticsearch 实例,您可以使用 es_cloud_id 参数或 es_url。
elastic_vector_search = ElasticsearchStore(
es_cloud_id="<cloud_id>",
index_name="test_index",
embedding=embeddings,
es_user="elastic",
es_password="changeme",
)
如果您希望获得一流的模型调用自动追踪功能,还可以通过取消注释以下代码来设置您的 LangSmith API 密钥:
# os.environ["LANGSMITH_API_KEY"] = getpass.getpass("Enter your LangSmith API key: ")
# os.environ["LANGSMITH_TRACING"] = "true"
初始化
Elasticsearch 正在本地主机 localhost:9200 上运行,使用 docker。有关如何从 Elastic Cloud 连接到 Elasticsearch 的详细信息,请参阅上方的 通过身份验证连接。
from langchain_elasticsearch import ElasticsearchStore
vector_store = ElasticsearchStore(
"langchain-demo", embedding=embeddings, es_url="http://localhost:9201"
)
管理向量存储
将项目添加到向量存储
from uuid import uuid4
from langchain_core.documents import Document
document_1 = Document(
page_content="I had chocolate chip pancakes and scrambled eggs for breakfast this morning.",
metadata={"source": "tweet"},
)
document_2 = Document(
page_content="The weather forecast for tomorrow is cloudy and overcast, with a high of 62 degrees.",
metadata={"source": "news"},
)
document_3 = Document(
page_content="Building an exciting new project with LangChain - come check it out!",
metadata={"source": "tweet"},
)
document_4 = Document(
page_content="Robbers broke into the city bank and stole $1 million in cash.",
metadata={"source": "news"},
)
document_5 = Document(
page_content="Wow! That was an amazing movie. I can't wait to see it again.",
metadata={"source": "tweet"},
)
document_6 = Document(
page_content="Is the new iPhone worth the price? Read this review to find out.",
metadata={"source": "website"},
)
document_7 = Document(
page_content="The top 10 soccer players in the world right now.",
metadata={"source": "website"},
)
document_8 = Document(
page_content="LangGraph is the best framework for building stateful, agentic applications!",
metadata={"source": "tweet"},
)
document_9 = Document(
page_content="The stock market is down 500 points today due to fears of a recession.",
metadata={"source": "news"},
)
document_10 = Document(
page_content="I have a bad feeling I am going to get deleted :(",
metadata={"source": "tweet"},
)
documents = [
document_1,
document_2,
document_3,
document_4,
document_5,
document_6,
document_7,
document_8,
document_9,
document_10,
]
uuids = [str(uuid4()) for _ in range(len(documents))]
vector_store.add_documents(documents=documents, ids=uuids)
['21cca03c-9089-42d2-b41c-3d156be2b519',
'a6ceb967-b552-4802-bb06-c0e95fce386e',
'3a35fac4-e5f0-493b-bee0-9143b41aedae',
'176da099-66b1-4d6a-811b-dfdfe0808d30',
'ecfa1a30-3c97-408b-80c0-5c43d68bf5ff',
'c0f08baa-e70b-4f83-b387-c6e0a0f36f73',
'489b2c9c-1925-43e1-bcf0-0fa94cf1cbc4',
'408c6503-9ba4-49fd-b1cc-95584cd914c5',
'5248c899-16d5-4377-a9e9-736ca443ad4f',
'ca182769-c4fc-4e25-8f0a-8dd0a525955c']
从向量存储中删除项目
vector_store.delete(ids=[uuids[-1]])
True
查询向量存储
一旦你的向量存储创建完成并且相关文档已添加,你很可能希望在链或代理运行时对其进行查询。这些示例还展示了如何在搜索时使用过滤功能。
直接查询
相似性搜索
通过元数据过滤进行简单的相似性搜索可以如下操作:
results = vector_store.similarity_search(
query="LangChain provides abstractions to make working with LLMs easy",
k=2,
filter=[{"term": {"metadata.source.keyword": "tweet"}}],
)
for res in results:
print(f"* {res.page_content} [{res.metadata}]")
* Building an exciting new project with LangChain - come check it out! [{'source': 'tweet'}]
* LangGraph is the best framework for building stateful, agentic applications! [{'source': 'tweet'}]
带分数的相似性搜索
如果您想执行相似度搜索并获取相应的分数,可以运行:
results = vector_store.similarity_search_with_score(
query="Will it be hot tomorrow",
k=1,
filter=[{"term": {"metadata.source.keyword": "news"}}],
)
for doc, score in results:
print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")
* [SIM=0.765887] The weather forecast for tomorrow is cloudy and overcast, with a high of 62 degrees. [{'source': 'news'}]
通过转换为检索器进行查询
您还可以将向量存储转换为检索器,以便在链中更轻松地使用。
retriever = vector_store.as_retriever(
search_type="similarity_score_threshold", search_kwargs={"score_threshold": 0.2}
)
retriever.invoke("Stealing from the bank is a crime")
[Document(metadata={'source': 'news'}, page_content='Robbers broke into the city bank and stole $1 million in cash.'),
Document(metadata={'source': 'news'}, page_content='The stock market is down 500 points today due to fears of a recession.'),
Document(metadata={'source': 'website'}, page_content='Is the new iPhone worth the price? Read this review to find out.'),
Document(metadata={'source': 'tweet'}, page_content='Building an exciting new project with LangChain - come check it out!')]
距离相似性算法
Elasticsearch 支持以下向量距离相似性算法:
- 余弦
- 欧几里得
- dot_product
余弦相似度算法是默认的。
你可以通过 similarity 参数指定所需的相似性算法。
注意: 根据检索策略,相似性算法在查询时无法更改。它需要在创建字段的索引映射时设置。如果需要更改相似性算法,需要删除索引并使用正确的距离策略重新创建。
db = ElasticsearchStore.from_documents(
docs,
embeddings,
es_url="http://localhost:9200",
index_name="test",
distance_strategy="COSINE",
# distance_strategy="EUCLIDEAN_DISTANCE"
# distance_strategy="DOT_PRODUCT"
)
检索策略
Elasticsearch 在仅支持向量的数据库中具有显著优势,因为它能够支持多种检索策略。在本笔记本中,我们将配置 ElasticsearchStore 以支持一些最常见的检索策略。
默认情况下,ElasticsearchStore 使用 DenseVectorStrategy(在 0.2.0 版本之前称为 ApproxRetrievalStrategy)。
DenseVectorStrategy
这将返回与查询向量最相似的前 k 个向量。当初始化 ElasticsearchStore 时,k 参数被设置为 0。默认值为 10。
from langchain_elasticsearch import DenseVectorStrategy
db = ElasticsearchStore.from_documents(
docs,
embeddings,
es_url="http://localhost:9200",
index_name="test",
strategy=DenseVectorStrategy(),
)
docs = db.similarity_search(
query="What did the president say about Ketanji Brown Jackson?", k=10
)
示例:使用密集向量和关键词搜索的混合检索
此示例将展示如何配置 ElasticsearchStore 以执行混合检索,结合近似语义搜索和基于关键词的搜索。
我们使用RRF来平衡来自不同检索方法的两个分数。
为了启用混合检索,我们需要在 DenseVectorStrategy 构造函数中设置 hybrid=True。
db = ElasticsearchStore.from_documents(
docs,
embeddings,
es_url="http://localhost:9200",
index_name="test",
strategy=DenseVectorStrategy(hybrid=True),
)
当启用混合模式时,查询将结合近似语义搜索和基于关键词的搜索。
它将使用RRF(倒数排名融合)来平衡来自不同检索方法的两个分数。
注意:RRF 需要 Elasticsearch 8.9.0 或更高版本。
{
"retriever": {
"rrf": {
"retrievers": [
{
"standard": {
"query": {
"bool": {
"filter": [],
"must": [{"match": {"text": {"query": "foo"}}}],
}
},
},
},
{
"knn": {
"field": "vector",
"filter": [],
"k": 1,
"num_candidates": 50,
"query_vector": [1.0, ..., 0.0],
},
},
]
}
}
}
示例:在Elasticsearch中使用嵌入模型进行密集向量搜索
此示例将展示如何配置 ElasticsearchStore 以使用在 Elasticsearch 中部署的嵌入模型进行密集向量检索。
要使用此功能,请通过构造函数中的 query_model_id 参数指定模型 ID。
注意: 这需要模型在Elasticsearch ML节点中部署并运行。请参阅笔记本示例,了解如何使用eland部署模型。
DENSE_SELF_DEPLOYED_INDEX_NAME = "test-dense-self-deployed"
# Note: This does not have an embedding function specified
# Instead, we will use the embedding model deployed in Elasticsearch
db = ElasticsearchStore(
es_cloud_id="<your cloud id>",
es_user="elastic",
es_password="<your password>",
index_name=DENSE_SELF_DEPLOYED_INDEX_NAME,
query_field="text_field",
vector_query_field="vector_query_field.predicted_value",
strategy=DenseVectorStrategy(model_id="sentence-transformers__all-minilm-l6-v2"),
)
# Setup a Ingest Pipeline to perform the embedding
# of the text field
db.client.ingest.put_pipeline(
id="test_pipeline",
processors=[
{
"inference": {
"model_id": "sentence-transformers__all-minilm-l6-v2",
"field_map": {"query_field": "text_field"},
"target_field": "vector_query_field",
}
}
],
)
# creating a new index with the pipeline,
# not relying on langchain to create the index
db.client.indices.create(
index=DENSE_SELF_DEPLOYED_INDEX_NAME,
mappings={
"properties": {
"text_field": {"type": "text"},
"vector_query_field": {
"properties": {
"predicted_value": {
"type": "dense_vector",
"dims": 384,
"index": True,
"similarity": "l2_norm",
}
}
},
}
},
settings={"index": {"default_pipeline": "test_pipeline"}},
)
db.from_texts(
["hello world"],
es_cloud_id="<cloud id>",
es_user="elastic",
es_password="<cloud password>",
index_name=DENSE_SELF_DEPLOYED_INDEX_NAME,
query_field="text_field",
vector_query_field="vector_query_field.predicted_value",
strategy=DenseVectorStrategy(model_id="sentence-transformers__all-minilm-l6-v2"),
)
# Perform search
db.similarity_search("hello world", k=10)
稀疏向量策略 (ELSER)
该策略使用 Elasticsearch 的稀疏向量检索来获取前 k 个结果。目前我们仅支持我们自己的“ELSER”嵌入模型。
注意:这需要 ELSER 模型在 Elasticsearch ml 节点中部署并运行。
要使用此功能,请在 ElasticsearchStore 构造函数中指定 SparseVectorStrategy(在 0.2.0 版本之前称为 SparseVectorRetrievalStrategy)。您需要提供一个模型 ID。
from langchain_elasticsearch import SparseVectorStrategy
# Note that this example doesn't have an embedding function. This is because we infer the tokens at index time and at query time within Elasticsearch.
# This requires the ELSER model to be loaded and running in Elasticsearch.
db = ElasticsearchStore.from_documents(
docs,
es_cloud_id="<cloud id>",
es_user="elastic",
es_password="<cloud password>",
index_name="test-elser",
strategy=SparseVectorStrategy(model_id=".elser_model_2"),
)
db.client.indices.refresh(index="test-elser")
results = db.similarity_search(
"What did the president say about Ketanji Brown Jackson", k=4
)
print(results[0])
DenseVectorScriptScoreStrategy
此策略使用 Elasticsearch 的脚本评分查询执行精确向量检索(也称为暴力搜索)以检索前 k 个结果。(在 0.2.0 版本之前,此策略被称为 ExactRetrievalStrategy。)
要使用此功能,请在构造函数中指定 DenseVectorScriptScoreStrategy。
from langchain_elasticsearch import SparseVectorStrategy
db = ElasticsearchStore.from_documents(
docs,
embeddings,
es_url="http://localhost:9200",
index_name="test",
strategy=DenseVectorScriptScoreStrategy(),
)
BM25Strategy
最后,您可以使用全文关键词搜索。
要使用此功能,请在构造函数中指定 BM25Strategy。
from langchain_elasticsearch import BM25Strategy
db = ElasticsearchStore.from_documents(
docs,
es_url="http://localhost:9200",
index_name="test",
strategy=BM25Strategy(),
)
BM25RetrievalStrategy
这种策略允许用户使用纯 BM25 进行搜索,而不使用向量搜索。
要使用此功能,请在构造函数中指定 BM25RetrievalStrategy。
请注意,在下面的示例中,未指定嵌入选项,表示搜索不使用嵌入进行。
from langchain_elasticsearch import ElasticsearchStore
db = ElasticsearchStore(
es_url="http://localhost:9200",
index_name="test_index",
strategy=ElasticsearchStore.BM25RetrievalStrategy(),
)
db.add_texts(
["foo", "foo bar", "foo bar baz", "bar", "bar baz", "baz"],
)
results = db.similarity_search(query="foo", k=10)
print(results)
自定义查询
在搜索中使用 custom_query 个参数,您可以调整用于从 Elasticsearch 检索文档的查询。如果您希望使用更复杂的查询,或者支持字段的线性提升,这将非常有用。
# Example of a custom query thats just doing a BM25 search on the text field.
def custom_query(query_body: dict, query: str):
"""Custom query to be used in Elasticsearch.
Args:
query_body (dict): Elasticsearch query body.
query (str): Query string.
Returns:
dict: Elasticsearch query body.
"""
print("Query Retriever created by the retrieval strategy:")
print(query_body)
print()
new_query_body = {"query": {"match": {"text": query}}}
print("Query thats actually used in Elasticsearch:")
print(new_query_body)
print()
return new_query_body
results = db.similarity_search(
"What did the president say about Ketanji Brown Jackson",
k=4,
custom_query=custom_query,
)
print("Results:")
print(results[0])
自定义文档构建器
在搜索中使用 doc_builder 个参数,您可以调整如何使用从 Elasticsearch 检索的数据构建文档。如果您有未使用 Langchain 创建的索引,这将特别有用。
from typing import Dict
from langchain_core.documents import Document
def custom_document_builder(hit: Dict) -> Document:
src = hit.get("_source", {})
return Document(
page_content=src.get("content", "Missing content!"),
metadata={
"page_number": src.get("page_number", -1),
"original_filename": src.get("original_filename", "Missing filename!"),
},
)
results = db.similarity_search(
"What did the president say about Ketanji Brown Jackson",
k=4,
doc_builder=custom_document_builder,
)
print("Results:")
print(results[0])
检索增强生成的用法
有关如何使用此向量存储进行检索增强生成 (RAG) 的指南,请参阅以下部分:
FAQ
Question: 我在将文档索引到Elasticsearch时遇到超时错误。如何解决这个问题?
一个可能的问题是你的文档在索引到Elasticsearch时可能会花费更长的时间。ElasticsearchStore使用Elasticsearch的批量API,该API有一些默认设置,你可以调整这些设置以减少超时错误的可能性。
这也是使用 SparseVectorRetrievalStrategy 时的一个好主意。
默认值为:
chunk_size: 500max_chunk_bytes: 100MB
要调整这些设置,您可以将参数 chunk_size 和 max_chunk_bytes 传递给 ElasticsearchStore 的 add_texts 方法。
vector_store.add_texts(
texts,
bulk_kwargs={
"chunk_size": 50,
"max_chunk_bytes": 200000000
}
)
升级到ElasticsearchStore
如果您已经在基于LangChain的项目中使用Elasticsearch,您可能正在使用旧的实现:ElasticVectorSearch 和 ElasticKNNSearch,它们现在已被弃用。我们引入了一个新的实现版本 ElasticsearchStore,它更加灵活且易于使用。本笔记本将指导您完成升级到新实现的过程。
最新动态?
新的实现现在是一个名为 ElasticsearchStore 的类,可以通过不同的策略用于近似密集向量、精确密集向量、稀疏向量(ELSER)、BM25 检索以及混合检索。
我正在使用 ElasticKNNSearch
旧的实现方式:
from langchain_community.vectorstores.elastic_vector_search import ElasticKNNSearch
db = ElasticKNNSearch(
elasticsearch_url="http://localhost:9200",
index_name="test_index",
embedding=embedding
)
新实现:
from langchain_elasticsearch import ElasticsearchStore, DenseVectorStrategy
db = ElasticsearchStore(
es_url="http://localhost:9200",
index_name="test_index",
embedding=embedding,
# if you use the model_id
# strategy=DenseVectorStrategy(model_id="test_model")
# if you use hybrid search
# strategy=DenseVectorStrategy(hybrid=True)
)
我正在使用ElasticVectorSearch
旧的实现方式:
from langchain_community.vectorstores.elastic_vector_search import ElasticVectorSearch
db = ElasticVectorSearch(
elasticsearch_url="http://localhost:9200",
index_name="test_index",
embedding=embedding
)
新实现:
from langchain_elasticsearch import ElasticsearchStore, DenseVectorScriptScoreStrategy
db = ElasticsearchStore(
es_url="http://localhost:9200",
index_name="test_index",
embedding=embedding,
strategy=DenseVectorScriptScoreStrategy()
)
db.client.indices.delete(
index="test-metadata, test-elser, test-basic",
ignore_unavailable=True,
allow_no_indices=True,
)
API 参考
有关所有 ElasticSearchStore 功能和配置的详细文档,请访问 API 参考: https://python.langchain.com/api_reference/elasticsearch/vectorstores/langchain_elasticsearch.vectorstores.ElasticsearchStore.html