Skip to main content
Open In ColabOpen on GitHub

Apache Cassandra

本页面提供了使用 Apache Cassandra® 作为向量存储的快速入门指南。

Cassandra is a NoSQL, row-oriented, highly scalable and highly available database.Starting with version 5.0, the database ships with vector search capabilities.

注意:除了访问数据库外,运行完整示例还需要一个 OpenAI API 密钥。

设置和通用依赖项

使用该集成需要以下 Python 包。

%pip install --upgrade --quiet langchain-community "cassio>=0.1.4"

注意:根据您的 LangChain 设置,您可能需要安装/升级此演示所需的其他依赖项 (具体而言,需要 datasetsopenaipypdftiktoken 的最新版本,以及 langchain-community)。

import os
from getpass import getpass

from datasets import (
load_dataset,
)
from langchain_community.document_loaders import PyPDFLoader
from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
if "OPENAI_API_KEY" not in os.environ:
os.environ["OPENAI_API_KEY"] = getpass("OPENAI_API_KEY = ")
embe = OpenAIEmbeddings()

导入向量存储

from langchain_community.vectorstores import Cassandra
API 参考:Cassandra

连接参数

本页展示的向量存储集成不仅适用于 Cassandra,也适用于其他衍生数据库(例如 Astra DB),这些数据库均使用 CQL(Cassandra 查询语言)协议。

DataStax Astra DB is a managed serverless database built on Cassandra, offering the same interface and strengths.

根据您是通过 CQL 连接到 Cassandra 集群还是 Astra DB,在创建向量存储对象时您将提供不同的参数。

连接到 Cassandra 集群

您首先需要创建一个cassandra.cluster.Session对象,如Cassandra 驱动程序文档中所述。具体细节会有所不同(例如网络设置和身份验证),但可能类似于:

from cassandra.cluster import Cluster

cluster = Cluster(["127.0.0.1"])
session = cluster.connect()

您现在可以将会话以及所需的键空间名称设置为全局 CassIO 参数:

import cassio

CASSANDRA_KEYSPACE = input("CASSANDRA_KEYSPACE = ")

cassio.init(session=session, keyspace=CASSANDRA_KEYSPACE)

现在您可以创建向量存储:

vstore = Cassandra(
embedding=embe,
table_name="cassandra_vector_demo",
# session=None, keyspace=None # Uncomment on older versions of LangChain
)

注意:在创建向量存储时,您也可以直接将会话和密钥空间作为参数传递。然而,使用全局 cassio.init 设置非常方便,如果您的应用程序以多种方式使用 Cassandra(例如,用于向量存储、聊天内存和 LLM 响应缓存),因为它允许将凭据和数据库连接管理集中在一处。

通过 CQL 连接到 Astra DB

在这种情况下,您可以使用以下连接参数初始化 CassIO:

  • 数据库 ID,例如 01234567-89ab-cdef-0123-456789abcdef
  • 令牌,例如 AstraCS:6gBhNmsk135....(它必须是“数据库管理员”令牌)
  • 可选的 Keyspace 名称(如果省略,将使用数据库的默认名称)
ASTRA_DB_ID = input("ASTRA_DB_ID = ")
ASTRA_DB_APPLICATION_TOKEN = getpass("ASTRA_DB_APPLICATION_TOKEN = ")

desired_keyspace = input("ASTRA_DB_KEYSPACE (optional, can be left empty) = ")
if desired_keyspace:
ASTRA_DB_KEYSPACE = desired_keyspace
else:
ASTRA_DB_KEYSPACE = None
import cassio

cassio.init(
database_id=ASTRA_DB_ID,
token=ASTRA_DB_APPLICATION_TOKEN,
keyspace=ASTRA_DB_KEYSPACE,
)

现在您可以创建向量存储:

vstore = Cassandra(
embedding=embe,
table_name="cassandra_vector_demo",
# session=None, keyspace=None # Uncomment on older versions of LangChain
)

加载数据集

将源数据集中的每个条目转换为 Document,然后将其写入向量存储:

philo_dataset = load_dataset("datastax/philosopher-quotes")["train"]

docs = []
for entry in philo_dataset:
metadata = {"author": entry["author"]}
doc = Document(page_content=entry["quote"], metadata=metadata)
docs.append(doc)

inserted_ids = vstore.add_documents(docs)
print(f"\nInserted {len(inserted_ids)} documents.")

在上文中,从源数据创建了 metadata 个字典,它们是 Document 的一部分。

添加更多条目,这次使用 add_texts

texts = ["I think, therefore I am.", "To the things themselves!"]
metadatas = [{"author": "descartes"}, {"author": "husserl"}]
ids = ["desc_01", "huss_xy"]

inserted_ids_2 = vstore.add_texts(texts=texts, metadatas=metadatas, ids=ids)
print(f"\nInserted {len(inserted_ids_2)} documents.")

注意:您可能希望通过提高并发级别来加速add_textsadd_documents的执行 - 请查看这些批量操作方法的batch_size参数以获取更多详情。根据网络和客户端机器的规格,您的最佳性能参数选择可能会有所不同。

运行搜索

本节演示元数据过滤并获取相似度分数:

results = vstore.similarity_search("Our life is what we make of it", k=3)
for res in results:
print(f"* {res.page_content} [{res.metadata}]")
results_filtered = vstore.similarity_search(
"Our life is what we make of it",
k=3,
filter={"author": "plato"},
)
for res in results_filtered:
print(f"* {res.page_content} [{res.metadata}]")
results = vstore.similarity_search_with_score("Our life is what we make of it", k=3)
for res, score in results:
print(f"* [SIM={score:3f}] {res.page_content} [{res.metadata}]")
results = vstore.max_marginal_relevance_search(
"Our life is what we make of it",
k=3,
filter={"author": "aristotle"},
)
for res in results:
print(f"* {res.page_content} [{res.metadata}]")

删除已存储的文档

delete_1 = vstore.delete(inserted_ids[:3])
print(f"all_succeed={delete_1}") # True, all documents deleted
delete_2 = vstore.delete(inserted_ids[2:5])
print(f"some_succeeds={delete_2}") # True, though some IDs were gone already

一个极简的 RAG 链

接下来的单元格将实现一个简单的 RAG 流水线:

  • 下载一个示例 PDF 文件并将其加载到存储中;
  • 使用 LCEL(LangChain 表达式语言)创建一个以向量存储为核心的 RAG 链;
  • 运行问答链。
!curl -L \
"https://github.com/awesome-astra/datasets/blob/main/demo-resources/what-is-philosophy/what-is-philosophy.pdf?raw=true" \
-o "what-is-philosophy.pdf"
pdf_loader = PyPDFLoader("what-is-philosophy.pdf")
splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=64)
docs_from_pdf = pdf_loader.load_and_split(text_splitter=splitter)

print(f"Documents from PDF: {len(docs_from_pdf)}.")
inserted_ids_from_pdf = vstore.add_documents(docs_from_pdf)
print(f"Inserted {len(inserted_ids_from_pdf)} documents.")
retriever = vstore.as_retriever(search_kwargs={"k": 3})

philo_template = """
You are a philosopher that draws inspiration from great thinkers of the past
to craft well-thought answers to user questions. Use the provided context as the basis
for your answers and do not make up new reasoning paths - just mix-and-match what you are given.
Your answers must be concise and to the point, and refrain from answering about other topics than philosophy.

CONTEXT:
{context}

QUESTION: {question}

YOUR ANSWER:"""

philo_prompt = ChatPromptTemplate.from_template(philo_template)

llm = ChatOpenAI()

chain = (
{"context": retriever, "question": RunnablePassthrough()}
| philo_prompt
| llm
| StrOutputParser()
)
chain.invoke("How does Russel elaborate on Peirce's idea of the security blanket?")

了解更多,请查看通过 CQL 使用 Astra DB 的完整 RAG 模板此处

清理

以下内容本质上是从 CassIO 检索 Session 对象,并使用它运行一条 CQL DROP TABLE 语句:

您将丢失其中存储的数据。

cassio.config.resolve_session().execute(
f"DROP TABLE {cassio.config.resolve_keyspace()}.cassandra_vector_demo;"
)

了解更多

如需更多信息、扩展的快速入门指南以及其他使用示例,请访问 CassIO 文档,了解更多关于使用 LangChain Cassandra 向量存储的内容。

引用声明

Apache Cassandra, Cassandra and Apache are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.