RunPod chat models
Get started with RunPod chat models.
Overview
This guide covers how to use the LangChain ChatRunPod class to interact with chat models hosted on RunPod Serverless.
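Setup
- Install the package:
pip install -qU langchain-runpod
- Deploy a chat model endpoint: Follow the setup steps in the RunPod provider guide to deploy a compatible chat model endpoint on RunPod Serverless and obtain its endpoint ID.
- Set environment variables: Make sure RUNPOD_API_KEY and RUNPOD_ENDPOINT_ID (or the chat-specific RUNPOD_CHAT_ENDPOINT_ID) are set.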
import getpass
import os

# Make sure environment variables are set (or pass them directly to ChatRunPod)
if "RUNPOD_API_KEY" not in os.environ:
    os.environ["RUNPOD_API_KEY"] = getpass.getpass("Enter your RunPod API Key: ")
if "RUNPOD_ENDPOINT_ID" not in os.environ:
    os.environ["RUNPOD_ENDPOINT_ID"] = input(
        "Enter your RunPod Endpoint ID (used if RUNPOD_CHAT_ENDPOINT_ID is not set): "
    )

# Optionally use a different endpoint ID specifically for chat models
# if "RUNPOD_CHAT_ENDPOINT_ID" not in os.environ:
#     os.environ["RUNPOD_CHAT_ENDPOINT_ID"] = input("Enter your RunPod Chat Endpoint ID (Optional): ")

# Prefer the chat-specific endpoint ID and fall back to the general one
chat_endpoint_id = os.environ.get(
    "RUNPOD_CHAT_ENDPOINT_ID", os.environ.get("RUNPOD_ENDPOINT_ID")
)
if not chat_endpoint_id:
    raise ValueError(
        "No RunPod Endpoint ID found. Please set RUNPOD_ENDPOINT_ID or RUNPOD_CHAT_ENDPOINT_ID."
    )
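The fallback order used above (chat-specific ID first, then the general one) can be factored into a small helper that is easy to test without touching the real environment. This is a minimal sketch: `resolve_endpoint_id` is a hypothetical name for illustration and is not part of langchain-runpod.

```python
import os
from typing import Mapping, Optional


def resolve_endpoint_id(env: Optional[Mapping[str, str]] = None) -> str:
    """Return RUNPOD_CHAT_ENDPOINT_ID if set, otherwise RUNPOD_ENDPOINT_ID.

    Hypothetical helper for illustration; not part of langchain-runpod.
    """
    env = os.environ if env is None else env
    # `or` treats an empty string the same as a missing variable.
    endpoint_id = env.get("RUNPOD_CHAT_ENDPOINT_ID") or env.get("RUNPOD_ENDPOINT_ID")
    if not endpoint_id:
        raise ValueError(
            "No RunPod Endpoint ID found. "
            "Please set RUNPOD_ENDPOINT_ID or RUNPOD_CHAT_ENDPOINT_ID."
        )
    return endpoint_id


# The chat-specific variable takes precedence when both are present.
example_env = {
    "RUNPOD_ENDPOINT_ID": "general-id",
    "RUNPOD_CHAT_ENDPOINT_ID": "chat-id",
}
print(resolve_endpoint_id(example_env))
```

Unlike the snippet above, this version also falls back when RUNPOD_CHAT_ENDPOINT_ID is set but empty, which may or may not be the behavior you want.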
Instantiation
Initialize the ChatRunPod class. You can pass model-specific parameters through model_kwargs and configure the polling behavior.
from langchain_runpod import ChatRunPod

chat = ChatRunPod(
    runpod_endpoint_id=chat_endpoint_id,  # Specify the correct endpoint ID
    model_kwargs={
        "max_new_tokens": 512,
        "temperature": 0.7,
        "top_p": 0.9,
        # Add other parameters supported by your endpoint handler
    },
    # Optional: Adjust polling
    # poll_interval=0.2,
    # max_polling_attempts=150
)
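When tuning the two polling parameters, it helps to think of them as a time budget. The sketch below assumes the client waits `poll_interval` seconds between status checks and gives up after `max_polling_attempts` checks; that is an assumption about the polling loop rather than something this guide confirms, and `max_wait_seconds` is a hypothetical helper.

```python
def max_wait_seconds(poll_interval: float, max_polling_attempts: int) -> float:
    """Approximate upper bound on time spent waiting between polls.

    Assumes one sleep of `poll_interval` seconds per attempt and ignores
    network latency, which adds to the real wall-clock time.
    """
    return poll_interval * max_polling_attempts


# The values shown in the commented-out arguments above.
budget = max_wait_seconds(poll_interval=0.2, max_polling_attempts=150)
print(f"Approximate polling budget: {budget:.1f} s")
```

Under that assumption, a model with long cold starts or long generations may need a larger `max_polling_attempts` or a longer `poll_interval`.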
Invocation
Use the standard LangChain .invoke() and .ainvoke() methods to call the model. Streaming through .stream() and .astream() is also supported; it is simulated by polling the RunPod /stream endpoint.
from langchain_core.messages import HumanMessage, SystemMessage

messages = [
    SystemMessage(content="You are a helpful AI assistant."),
    HumanMessage(content="What is the RunPod Serverless API flow?"),
]

# Invoke (Sync)
try:
    response = chat.invoke(messages)
    print("--- Sync Invoke Response ---")
    print(response.content)
except Exception as e:
    print(
        f"Error invoking Chat Model: {e}. Ensure endpoint ID/API key are correct and endpoint is active/compatible."
    )

# Stream (Sync, simulated via polling /stream)
print("\n--- Sync Stream Response ---")
try:
    for chunk in chat.stream(messages):
        print(chunk.content, end="", flush=True)
    print()  # Newline
except Exception as e:
    print(
        f"\nError streaming Chat Model: {e}. Ensure endpoint handler supports streaming output format."
    )
### Async Usage
# Note: top-level `await` works in Jupyter/IPython. In a plain script, wrap
# these calls in an async function and run it with asyncio.run().

# AInvoke (Async)
try:
    async_response = await chat.ainvoke(messages)
    print("--- Async Invoke Response ---")
    print(async_response.content)
except Exception as e:
    print(f"Error invoking Chat Model asynchronously: {e}.")

# AStream (Async)
print("\n--- Async Stream Response ---")
try:
    async for chunk in chat.astream(messages):
        print(chunk.content, end="", flush=True)
    print()  # Newline
except Exception as e:
    print(
        f"\nError streaming Chat Model asynchronously: {e}. Ensure endpoint handler supports streaming output format.\n"
    )
API Reference: HumanMessage | SystemMessage
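To make "simulated streaming via polling" concrete, the sketch below shows the general shape of such a loop. It is not the ChatRunPod implementation: `stream_by_polling` and `fetch_status` are hypothetical names, `fetch_status` stands in for an HTTP GET on the endpoint's /stream route, and the response shape, the status names, and the idea that each poll returns only new chunks are all assumptions.

```python
import time
from typing import Callable, Dict, Iterator

# Assumed terminal job states; adjust to whatever your endpoint reports.
TERMINAL_STATUSES = {"COMPLETED", "FAILED", "CANCELLED", "TIMED_OUT"}


def stream_by_polling(
    fetch_status: Callable[[], Dict],
    poll_interval: float = 0.2,
    max_polling_attempts: int = 150,
) -> Iterator[str]:
    """Yield text chunks by polling repeatedly until the job finishes.

    `fetch_status` is assumed to return a dict shaped like
    {"status": "...", "stream": [{"output": "..."}]}.
    """
    for _ in range(max_polling_attempts):
        payload = fetch_status()
        # Emit every chunk delivered by this poll, in order.
        for item in payload.get("stream", []):
            yield str(item.get("output", ""))
        if payload.get("status") in TERMINAL_STATUSES:
            return
        time.sleep(poll_interval)
    raise TimeoutError("Job did not finish within the polling budget.")


# Fake responses standing in for three successive polls.
fake_responses = iter(
    [
        {"status": "IN_PROGRESS", "stream": [{"output": "Hello"}]},
        {"status": "IN_PROGRESS", "stream": [{"output": ", "}, {"output": "world"}]},
        {"status": "COMPLETED", "stream": []},
    ]
)
chunks = list(stream_by_polling(lambda: next(fake_responses), poll_interval=0.0))
print("".join(chunks))
```

Because chunks only arrive when a poll returns, the perceived latency is bounded below by the poll interval, which is why this is described as simulated rather than true token streaming.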
Chaining
The chat model integrates seamlessly with LangChain Expression Language (LCEL) chains.
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant."),
        ("human", "{input}"),
    ]
)
parser = StrOutputParser()

# Pipe the prompt into the model, then parse the reply into a plain string
chain = prompt | chat | parser

try:
    chain_response = chain.invoke(
        {"input": "Explain the concept of serverless computing in simple terms."}
    )
    print("--- Chain Response ---")
    print(chain_response)
except Exception as e:
    print(f"Error running chain: {e}")

# Async chain (top-level `await` works in Jupyter/IPython; in a plain script,
# wrap this in an async function and run it with asyncio.run())
try:
    async_chain_response = await chain.ainvoke(
        {"input": "What are the benefits of using RunPod for AI/ML workloads?"}
    )
    print("--- Async Chain Response ---")
    print(async_chain_response)
except Exception as e:
    print(f"Error running async chain: {e}")
API Reference: StrOutputParser | ChatPromptTemplate
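For readers new to the `prompt | chat | parser` syntax, the toy sketch below illustrates the idea behind the `|` operator: each step's output becomes the next step's input. It uses plain Python rather than LangChain's own classes, and `Step` plus the three stand-in steps are hypothetical names chosen for illustration.

```python
from typing import Any, Callable


class Step:
    """A toy pipeline step that can be composed with `|` (illustration only)."""

    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func

    def __or__(self, other: "Step") -> "Step":
        # Compose left to right: run this step, then feed the result to `other`.
        return Step(lambda value: other.invoke(self.invoke(value)))

    def invoke(self, value: Any) -> Any:
        return self.func(value)


# Stand-ins for a prompt template, a chat model, and an output parser.
toy_prompt = Step(lambda inputs: f"human: {inputs['input']}")
toy_model = Step(lambda text: {"content": text.upper()})
toy_parser = Step(lambda message: message["content"])

toy_chain = toy_prompt | toy_model | toy_parser
print(toy_chain.invoke({"input": "hi"}))
```

The real chain works on prompt values and message objects rather than strings and dicts, but the left-to-right data flow is the same.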
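Model features (endpoint dependent)
The availability of advanced features depends heavily on the specific implementation of your RunPod endpoint handler. The ChatRunPod integration provides the basic framework, but the handler must support the underlying functionality.
| Feature | Integration support | Endpoint dependent? | Notes |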
|---|---|---|---|
| Tool calling | ❌ | ✅ | Requires handler to process tool definitions and return tool calls (e.g., OpenAI format). Integration needs parsing logic. |
| Structured output | ❌ | ✅ | Requires handler support for forcing structured output (JSON mode, function calling). Integration needs parsing logic. |
| JSON mode | ❌ | ✅ | Requires handler to accept a json_mode parameter (or similar) and guarantee JSON output. |
| Image input | ❌ | ✅ | Requires multimodal handler accepting image data (e.g., base64). Integration does not support multimodal messages. |
| Audio input | ❌ | ✅ | Requires handler accepting audio data. Integration does not support audio messages. |
| Video input | ❌ | ✅ | Requires handler accepting video data. Integration does not support video messages. |
| Token-level streaming | ✅ (Simulated) | ✅ | Polls /stream. Requires handler to populate stream list in status response with token chunks (e.g., [{"output": "token"}]). True low-latency streaming not built-in. |
| Native async | ✅ | ✅ | Core ainvoke/astream implemented. Relies on endpoint handler performance. |
| Token usage | ❌ | ✅ | Requires handler to return prompt_tokens, completion_tokens in the final response. Integration currently does not parse this. |
| Logprobs | ❌ | ✅ | Requires handler to return log probabilities. Integration currently does not parse this. |
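Key takeaway: Standard chat invocation and simulated streaming work as long as the endpoint follows the basic RunPod API conventions. Advanced features require a specific handler implementation and may require extending or customizing this integration package.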
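On the endpoint side, a handler that produces token chunks for the streaming row of the table might look roughly like the sketch below. It assumes the RunPod Python SDK convention that a generator handler's yielded values surface as stream entries. `fake_generate` and the `job["input"]["prompt"]` schema are hypothetical placeholders for a real model call and for whatever payload format your endpoint actually receives.

```python
from typing import Dict, Iterator


def fake_generate(prompt: str) -> Iterator[str]:
    """Placeholder for a real model's token generator (hypothetical)."""
    for word in prompt.split():
        yield word + " "


def handler(job: Dict) -> Iterator[str]:
    """Yield one chunk at a time so each can surface as a stream entry."""
    prompt = job["input"]["prompt"]  # assumed input schema
    for token in fake_generate(prompt):
        yield token


# Local check that needs no RunPod dependency.
print(list(handler({"input": {"prompt": "serverless is simple"}})))

# To deploy, the handler would be registered with the RunPod SDK, for example:
# import runpod
# runpod.serverless.start({"handler": handler})
```

Features such as token usage or tool calls would need the handler to return additional fields, and, as the table notes, the integration would also need logic to parse them.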
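API reference
For detailed documentation of the ChatRunPod class, its parameters, and its methods, see the source code or the generated API reference (if available).
Source code: https://github.com/runpod/langchain-runpod/blob/main/langchain_runpod/chat_models.py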