
RunPod Chat Models

Get started with RunPod chat models.

Overview

This guide covers how to use the LangChain `ChatRunPod` class to interact with chat models hosted on RunPod Serverless.

Setup

  1. Install the package:
    pip install -qU langchain-runpod
  2. Deploy a chat model endpoint: Follow the setup steps in the RunPod provider guide to deploy a compatible chat model endpoint on RunPod Serverless and note its endpoint ID.
  3. Set environment variables: Make sure `RUNPOD_API_KEY` and `RUNPOD_ENDPOINT_ID` (or a model-specific `RUNPOD_CHAT_ENDPOINT_ID`) are set.
```python
import getpass
import os

# Make sure environment variables are set (or pass them directly to ChatRunPod)
if "RUNPOD_API_KEY" not in os.environ:
    os.environ["RUNPOD_API_KEY"] = getpass.getpass("Enter your RunPod API Key: ")

if "RUNPOD_ENDPOINT_ID" not in os.environ:
    os.environ["RUNPOD_ENDPOINT_ID"] = input(
        "Enter your RunPod Endpoint ID (used if RUNPOD_CHAT_ENDPOINT_ID is not set): "
    )

# Optionally use a different endpoint ID specifically for chat models
# if "RUNPOD_CHAT_ENDPOINT_ID" not in os.environ:
#     os.environ["RUNPOD_CHAT_ENDPOINT_ID"] = input("Enter your RunPod Chat Endpoint ID (Optional): ")

chat_endpoint_id = os.environ.get(
    "RUNPOD_CHAT_ENDPOINT_ID", os.environ.get("RUNPOD_ENDPOINT_ID")
)
if not chat_endpoint_id:
    raise ValueError(
        "No RunPod Endpoint ID found. Please set RUNPOD_ENDPOINT_ID or RUNPOD_CHAT_ENDPOINT_ID."
    )
```

Instantiation

Initialize the `ChatRunPod` class. You can pass model-specific parameters via `model_kwargs` and configure the polling behavior.

```python
from langchain_runpod import ChatRunPod

chat = ChatRunPod(
    runpod_endpoint_id=chat_endpoint_id,  # Specify the correct endpoint ID
    model_kwargs={
        "max_new_tokens": 512,
        "temperature": 0.7,
        "top_p": 0.9,
        # Add other parameters supported by your endpoint handler
    },
    # Optional: Adjust polling
    # poll_interval=0.2,
    # max_polling_attempts=150
)
```

Invocation

Use the standard LangChain `.invoke()` and `.ainvoke()` methods to call the model. Streaming via `.stream()` and `.astream()` is also supported (simulated by polling the RunPod `/stream` endpoint).

```python
from langchain_core.messages import HumanMessage, SystemMessage

messages = [
    SystemMessage(content="You are a helpful AI assistant."),
    HumanMessage(content="What is the RunPod Serverless API flow?"),
]

# Invoke (Sync)
try:
    response = chat.invoke(messages)
    print("--- Sync Invoke Response ---")
    print(response.content)
except Exception as e:
    print(
        f"Error invoking Chat Model: {e}. Ensure endpoint ID/API key are correct and endpoint is active/compatible."
    )

# Stream (Sync, simulated via polling /stream)
print("\n--- Sync Stream Response ---")
try:
    for chunk in chat.stream(messages):
        print(chunk.content, end="", flush=True)
    print()  # Newline
except Exception as e:
    print(
        f"\nError streaming Chat Model: {e}. Ensure endpoint handler supports streaming output format."
    )
```

### Async Usage

```python
# AInvoke (Async)
try:
    async_response = await chat.ainvoke(messages)
    print("--- Async Invoke Response ---")
    print(async_response.content)
except Exception as e:
    print(f"Error invoking Chat Model asynchronously: {e}.")

# AStream (Async)
print("\n--- Async Stream Response ---")
try:
    async for chunk in chat.astream(messages):
        print(chunk.content, end="", flush=True)
    print()  # Newline
except Exception as e:
    print(
        f"\nError streaming Chat Model asynchronously: {e}. Ensure endpoint handler supports streaming output format.\n"
    )
```
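Because `.stream()` and `.astream()` simulate streaming by polling RunPod's `/stream` endpoint, it can help to see the mechanism in isolation. Below is an illustrative sketch, not the integration's actual code: `poll_stream` and `fetch_status` are hypothetical names, and the `{"status": ..., "stream": [...]}` payload shape follows the convention described later in this guide.

```python
# Sketch of polling-based streaming: repeatedly fetch a job-status payload and
# yield any new entries from its "stream" list until the job reports COMPLETED.
# `fetch_status` is a hypothetical stand-in for the HTTP call to /stream.
from typing import Callable, Dict, Iterator, List


def poll_stream(fetch_status: Callable[[], Dict], max_attempts: int = 100) -> Iterator[str]:
    seen = 0  # Number of stream chunks already yielded
    for _ in range(max_attempts):
        status = fetch_status()
        chunks: List[Dict] = status.get("stream", [])
        # Yield only the chunks we have not emitted yet
        for item in chunks[seen:]:
            yield item["output"]
        seen = len(chunks)
        if status.get("status") == "COMPLETED":
            return
    raise TimeoutError("Job did not complete within the polling budget")
```

In the real integration, each sleep-and-poll round trip adds latency, which is why the feature table below describes this as simulated rather than true token-level streaming.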

Chaining

The chat model integrates seamlessly with LangChain Expression Language (LCEL) chains.

```python
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant."),
        ("human", "{input}"),
    ]
)

parser = StrOutputParser()

chain = prompt | chat | parser

try:
    chain_response = chain.invoke(
        {"input": "Explain the concept of serverless computing in simple terms."}
    )
    print("--- Chain Response ---")
    print(chain_response)
except Exception as e:
    print(f"Error running chain: {e}")


# Async chain
try:
    async_chain_response = await chain.ainvoke(
        {"input": "What are the benefits of using RunPod for AI/ML workloads?"}
    )
    print("--- Async Chain Response ---")
    print(async_chain_response)
except Exception as e:
    print(f"Error running async chain: {e}")
```

Model Features (Endpoint-Dependent)

The availability of advanced features depends heavily on the specific implementation of your RunPod endpoint handler. The ChatRunPod integration provides the basic framework, but the handler must support the underlying functionality.

| Feature | Integration Support | Endpoint-Dependent? | Notes |
| --- | --- | --- | --- |
| Tool calling | ❌ | ✅ | Requires the handler to process tool definitions and return tool calls (e.g., OpenAI format). Integration needs parsing logic. |
| Structured output | ❌ | ✅ | Requires handler support for forcing structured output (JSON mode, function calling). Integration needs parsing logic. |
| JSON mode | ❌ | ✅ | Requires the handler to accept a `json_mode` parameter (or similar) and guarantee JSON output. |
| Image input | ❌ | ✅ | Requires a multimodal handler accepting image data (e.g., base64). Integration does not support multimodal messages. |
| Audio input | ❌ | ✅ | Requires a handler accepting audio data. Integration does not support audio messages. |
| Video input | ❌ | ✅ | Requires a handler accepting video data. Integration does not support video messages. |
| Token-level streaming | ✅ (Simulated) | ✅ | Polls `/stream`. Requires the handler to populate a `stream` list in the status response with token chunks (e.g., `[{"output": "token"}]`). True low-latency streaming is not built in. |
| Native async | ✅ | ✅ | Core `ainvoke`/`astream` implemented. Relies on endpoint handler performance. |
| Token usage | ❌ | ✅ | Requires the handler to return `prompt_tokens`/`completion_tokens` in the final response. Integration currently does not parse this. |
| Logprobs | ❌ | ✅ | Requires the handler to return log probabilities. Integration currently does not parse this. |

Key takeaway: Standard chat invocation and simulated streaming work as long as the endpoint follows basic RunPod API conventions. Advanced features require specific handler implementations and may require extending or customizing this integration package.
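To make "basic RunPod API conventions" concrete, here is a minimal, hypothetical echo handler. The `messages` input shape and the `output` field name are illustrative assumptions, not the integration's guaranteed contract; check the payload your deployed handler actually receives. In a real deployment this function would be registered via `runpod.serverless.start({"handler": handler})`.

```python
# Hypothetical echo handler showing a minimal request/response shape for a
# chat endpoint. A real handler would run a model; this one echoes the most
# recent user message so the request/response flow is easy to follow.
def handler(job: dict) -> dict:
    job_input = job.get("input", {})
    messages = job_input.get("messages", [])
    # Find the most recent user message, defaulting to an empty string
    last_user = next(
        (m["content"] for m in reversed(messages) if m.get("role") == "user"),
        "",
    )
    # A real handler would invoke the model here instead of echoing
    return {"output": f"Echo: {last_user}"}
```

Anything beyond this (tool calls, JSON mode, token usage) means agreeing on extra fields between your handler and client-side parsing, which is where the customization noted above comes in.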

API reference

For detailed documentation of the `ChatRunPod` class, its parameters, and its methods, refer to the source code or the generated API reference (if available).

Source code: https://github.com/runpod/langchain-runpod/blob/main/langchain_runpod/chat_models.py