Apache Cassandra
本页面提供了使用 Apache Cassandra® 作为向量存储的快速入门指南。
Cassandra is a NoSQL, row-oriented, highly scalable and highly available database.Starting with version 5.0, the database ships with vector search capabilities.
注意:除了访问数据库外,运行完整示例还需要一个 OpenAI API 密钥。
设置和通用依赖项
使用该集成需要以下 Python 包。
%pip install --upgrade --quiet langchain-community "cassio>=0.1.4"
注意:根据您的 LangChain 设置,您可能需要安装/升级此演示所需的其他依赖项
(具体而言,需要 datasets、openai、pypdf 和 tiktoken 的最新版本,以及 langchain-community)。
import os
from getpass import getpass
from datasets import (
load_dataset,
)
from langchain_community.document_loaders import PyPDFLoader
from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
if "OPENAI_API_KEY" not in os.environ:
os.environ["OPENAI_API_KEY"] = getpass("OPENAI_API_KEY = ")
embe = OpenAIEmbeddings()
导入向量存储
from langchain_community.vectorstores import Cassandra
连接参数
本页展示的向量存储集成不仅适用于 Cassandra,也适用于其他衍生数据库(例如 Astra DB),这些数据库均使用 CQL(Cassandra 查询语言)协议。
DataStax Astra DB is a managed serverless database built on Cassandra, offering the same interface and strengths.
根据您是通过 CQL 连接到 Cassandra 集群还是 Astra DB,在创建向量存储对象时您将提供不同的参数。
连接到 Cassandra 集群
您首先需要创建一个cassandra.cluster.Session对象,如Cassandra 驱动程序文档中所述。具体细节会有所不同(例如网络设置和身份验证),但可能类似于:
from cassandra.cluster import Cluster
cluster = Cluster(["127.0.0.1"])
session = cluster.connect()
您现在可以将会话以及所需的键空间名称设置为全局 CassIO 参数:
import cassio
CASSANDRA_KEYSPACE = input("CASSANDRA_KEYSPACE = ")
cassio.init(session=session, keyspace=CASSANDRA_KEYSPACE)
现在您可以创建向量存储:
vstore = Cassandra(
embedding=embe,
table_name="cassandra_vector_demo",
# session=None, keyspace=None # Uncomment on older versions of LangChain
)
注意:在创建向量存储时,您也可以直接将会话和密钥空间作为参数传递。然而,使用全局 cassio.init 设置非常方便,如果您的应用程序以多种方式使用 Cassandra(例如,用于向量存储、聊天内存和 LLM 响应缓存),因为它允许将凭据和数据库连接管理集中在一处。
通过 CQL 连接到 Astra DB
在这种情况下,您可以使用以下连接参数初始化 CassIO:
- 数据库 ID,例如
01234567-89ab-cdef-0123-456789abcdef - 令牌,例如
AstraCS:6gBhNmsk135....(它必须是“数据库管理员”令牌) - 可选的 Keyspace 名称(如果省略,将使用数据库的默认名称)
ASTRA_DB_ID = input("ASTRA_DB_ID = ")
ASTRA_DB_APPLICATION_TOKEN = getpass("ASTRA_DB_APPLICATION_TOKEN = ")
desired_keyspace = input("ASTRA_DB_KEYSPACE (optional, can be left empty) = ")
if desired_keyspace:
ASTRA_DB_KEYSPACE = desired_keyspace
else:
ASTRA_DB_KEYSPACE = None
import cassio
cassio.init(
database_id=ASTRA_DB_ID,
token=ASTRA_DB_APPLICATION_TOKEN,
keyspace=ASTRA_DB_KEYSPACE,
)
现在您可以创建向量存储:
vstore = Cassandra(
embedding=embe,
table_name="cassandra_vector_demo",
# session=None, keyspace=None # Uncomment on older versions of LangChain
)
加载数据集
将源数据集中的每个条目转换为 Document,然后将其写入向量存储:
philo_dataset = load_dataset("datastax/philosopher-quotes")["train"]
docs = []
for entry in philo_dataset:
metadata = {"author": entry["author"]}
doc = Document(page_content=entry["quote"], metadata=metadata)
docs.append(doc)
inserted_ids = vstore.add_documents(docs)
print(f"\nInserted {len(inserted_ids)} documents.")
在上文中,从源数据创建了 metadata 个字典,它们是 Document 的一部分。
添加更多条目,这次使用 add_texts:
texts = ["I think, therefore I am.", "To the things themselves!"]
metadatas = [{"author": "descartes"}, {"author": "husserl"}]
ids = ["desc_01", "huss_xy"]
inserted_ids_2 = vstore.add_texts(texts=texts, metadatas=metadatas, ids=ids)
print(f"\nInserted {len(inserted_ids_2)} documents.")
注意:您可能希望通过提高并发级别来加速add_texts和add_documents的执行 - 请查看这些批量操作方法的batch_size参数以获取更多详情。根据网络和客户端机器的规格,您的最佳性能参数选择可能会有所不同。
运行搜索
本节演示元数据过滤并获取相似度分数:
results = vstore.similarity_search("Our life is what we make of it", k=3)
for res in results:
print(f"* {res.page_content} [{res.metadata}]")
results_filtered = vstore.similarity_search(
"Our life is what we make of it",
k=3,
filter={"author": "plato"},
)
for res in results_filtered:
print(f"* {res.page_content} [{res.metadata}]")
results = vstore.similarity_search_with_score("Our life is what we make of it", k=3)
for res, score in results:
print(f"* [SIM={score:3f}] {res.page_content} [{res.metadata}]")
MMR(最大边际相关性)搜索
results = vstore.max_marginal_relevance_search(
"Our life is what we make of it",
k=3,
filter={"author": "aristotle"},
)
for res in results:
print(f"* {res.page_content} [{res.metadata}]")
删除已存储的文档
delete_1 = vstore.delete(inserted_ids[:3])
print(f"all_succeed={delete_1}") # True, all documents deleted
delete_2 = vstore.delete(inserted_ids[2:5])
print(f"some_succeeds={delete_2}") # True, though some IDs were gone already
一个极简的 RAG 链
接下来的单元格将实现一个简单的 RAG 流水线:
- 下载一个示例 PDF 文件并将其加载到存储中;
- 使用 LCEL(LangChain 表达式语言)创建一个以向量存储为核心的 RAG 链;
- 运行问答链。
!curl -L \
"https://github.com/awesome-astra/datasets/blob/main/demo-resources/what-is-philosophy/what-is-philosophy.pdf?raw=true" \
-o "what-is-philosophy.pdf"
pdf_loader = PyPDFLoader("what-is-philosophy.pdf")
splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=64)
docs_from_pdf = pdf_loader.load_and_split(text_splitter=splitter)
print(f"Documents from PDF: {len(docs_from_pdf)}.")
inserted_ids_from_pdf = vstore.add_documents(docs_from_pdf)
print(f"Inserted {len(inserted_ids_from_pdf)} documents.")
retriever = vstore.as_retriever(search_kwargs={"k": 3})
philo_template = """
You are a philosopher that draws inspiration from great thinkers of the past
to craft well-thought answers to user questions. Use the provided context as the basis
for your answers and do not make up new reasoning paths - just mix-and-match what you are given.
Your answers must be concise and to the point, and refrain from answering about other topics than philosophy.
CONTEXT:
{context}
QUESTION: {question}
YOUR ANSWER:"""
philo_prompt = ChatPromptTemplate.from_template(philo_template)
llm = ChatOpenAI()
chain = (
{"context": retriever, "question": RunnablePassthrough()}
| philo_prompt
| llm
| StrOutputParser()
)
chain.invoke("How does Russel elaborate on Peirce's idea of the security blanket?")
了解更多,请查看通过 CQL 使用 Astra DB 的完整 RAG 模板此处。
清理
以下内容本质上是从 CassIO 检索 Session 对象,并使用它运行一条 CQL DROP TABLE 语句:
您将丢失其中存储的数据。
cassio.config.resolve_session().execute(
f"DROP TABLE {cassio.config.resolve_keyspace()}.cassandra_vector_demo;"
)
了解更多
如需更多信息、扩展的快速入门指南以及其他使用示例,请访问 CassIO 文档,了解更多关于使用 LangChain Cassandra 向量存储的内容。
引用声明
Apache Cassandra, Cassandra and Apache are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.