Skip to main content
Open In ColabOpen on GitHub

Upstash 限流回调

在本指南中,我们将介绍如何使用 UpstashRatelimitHandler 根据请求数量或令牌数量添加速率限制。此处理器使用了 Upstash 的 ratelimit 库,它利用了 Upstash Redis

Upstash限流通过在每次调用limit方法时向Upstash Redis发送HTTP请求来工作。用户的剩余令牌/请求数量会被检查并更新。根据剩余的令牌数量,我们可以停止执行昂贵的操作,例如调用大型语言模型(LLM)或查询向量存储:

response = ratelimit.limit()
if response.allowed:
execute_costly_operation()

UpstashRatelimitHandler 可让您在几分钟内将速率限制逻辑集成到您的链中。

首先,您需要前往 Upstash 控制台 并创建一个 Redis 数据库 (参阅我们的文档)。创建数据库后,您需要设置环境变量:

UPSTASH_REDIS_REST_URL="****"
UPSTASH_REDIS_REST_TOKEN="****"

接下来,你需要使用以下命令安装 Upstash Ratelimit 和 Redis 库:

pip install upstash-ratelimit upstash-redis

你现在可以为你的链添加速率限制了!

请求限速

让我们想象一下,我们希望允许我们的用户每分钟调用我们的链10次。实现这一点非常简单:

# set env variables
import os

os.environ["UPSTASH_REDIS_REST_URL"] = "****"
os.environ["UPSTASH_REDIS_REST_TOKEN"] = "****"

from langchain_community.callbacks import UpstashRatelimitError, UpstashRatelimitHandler
from langchain_core.runnables import RunnableLambda
from upstash_ratelimit import FixedWindow, Ratelimit
from upstash_redis import Redis

# create ratelimit
ratelimit = Ratelimit(
redis=Redis.from_env(),
# 10 requests per window, where window size is 60 seconds:
limiter=FixedWindow(max_requests=10, window=60),
)

# create handler
user_id = "user_id" # should be a method which gets the user id
handler = UpstashRatelimitHandler(identifier=user_id, request_ratelimit=ratelimit)

# create mock chain
chain = RunnableLambda(str)

# invoke chain with handler:
try:
result = chain.invoke("Hello world!", config={"callbacks": [handler]})
except UpstashRatelimitError:
print("Handling ratelimit.", UpstashRatelimitError)
Error in UpstashRatelimitHandler.on_chain_start callback: UpstashRatelimitError('Request limit reached!')
``````output
Handling ratelimit. <class 'langchain_community.callbacks.upstash_ratelimit_callback.UpstashRatelimitError'>

请注意,我们将处理器传递给 invoke 方法,而不是在定义链时传递处理器。

对于除 FixedWindow 以外的限速算法,请参阅 upstash-ratelimit 文档

在执行管道中的任何步骤之前,ratelimit 会检查用户是否已超过请求限制。如果是,则抛出 UpstashRatelimitError

按令牌限制速率

另一种选择是根据以下内容对链调用进行速率限制:

  1. 提示中的标记数量
  2. 提示词和大型语言模型完成时的标记数量

这仅在链中包含一个大型语言模型(LLM)时才有效。另一个要求是,你使用的LLM应该在其 LLMOutput 中返回令牌使用情况。

工作原理

处理器将在调用大型语言模型(LLM)之前获取剩余的标记。如果剩余标记数大于 0,则会调用 LLM。否则将引发 UpstashRatelimitError

在调用大型语言模型(LLM)之后,将使用令牌使用信息从用户的剩余令牌中扣除。在此链阶段不会引发错误。

配置

对于第一个配置,只需像这样初始化处理器:

ratelimit = Ratelimit(
redis=Redis.from_env(),
# 1000 tokens per window, where window size is 60 seconds:
limiter=FixedWindow(max_requests=1000, window=60),
)

handler = UpstashRatelimitHandler(identifier=user_id, token_ratelimit=ratelimit)

对于第二个配置,以下是初始化处理器的方法:

ratelimit = Ratelimit(
redis=Redis.from_env(),
# 1000 tokens per window, where window size is 60 seconds:
limiter=FixedWindow(max_requests=1000, window=60),
)

handler = UpstashRatelimitHandler(
identifier=user_id,
token_ratelimit=ratelimit,
include_output_tokens=True, # set to True
)

您还可以同时根据请求和令牌使用速率限制,只需传递 request_ratelimittoken_ratelimit 参数即可。

以下是一个使用大型语言模型(LLM)的链的例子:

# set env variables
import os

os.environ["UPSTASH_REDIS_REST_URL"] = "****"
os.environ["UPSTASH_REDIS_REST_TOKEN"] = "****"
os.environ["OPENAI_API_KEY"] = "****"

from langchain_community.callbacks import UpstashRatelimitError, UpstashRatelimitHandler
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI
from upstash_ratelimit import FixedWindow, Ratelimit
from upstash_redis import Redis

# create ratelimit
ratelimit = Ratelimit(
redis=Redis.from_env(),
# 500 tokens per window, where window size is 60 seconds:
limiter=FixedWindow(max_requests=500, window=60),
)

# create handler
user_id = "user_id" # should be a method which gets the user id
handler = UpstashRatelimitHandler(identifier=user_id, token_ratelimit=ratelimit)

# create mock chain
as_str = RunnableLambda(str)
model = ChatOpenAI()

chain = as_str | model

# invoke chain with handler:
try:
result = chain.invoke("Hello world!", config={"callbacks": [handler]})
except UpstashRatelimitError:
print("Handling ratelimit.", UpstashRatelimitError)
Error in UpstashRatelimitHandler.on_llm_start callback: UpstashRatelimitError('Token limit reached!')
``````output
Handling ratelimit. <class 'langchain_community.callbacks.upstash_ratelimit_callback.UpstashRatelimitError'>