Airbyte Salesforce(已弃用)
注意:此特定于连接器的加载器已弃用。请改用 AirbyteLoader。
Airbyte is a data integration platform for ELT pipelines from APIs, databases & files to warehouses & lakes. It has the largest catalog of ELT connectors to data warehouses and databases.
此加载器将 Salesforce 连接器暴露为文档加载器,允许您将各种 Salesforce 对象作为文档进行加载。
安装
首先,你需要安装 airbyte-source-salesforce Python 包。
%pip install --upgrade --quiet airbyte-source-salesforce
示例
查看 Airbyte 文档页面 以了解如何配置读取器。 配置对象应遵循的 JSON 模式可在 Github 上找到:https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml。
总体形状如下:
{
"client_id": "<oauth client id>",
"client_secret": "<oauth client secret>",
"refresh_token": "<oauth refresh token>",
"start_date": "<date from which to start retrieving records from in ISO format, e.g. 2020-10-20T00:00:00Z>",
"is_sandbox": False, # set to True if you're using a sandbox environment
"streams_criteria": [ # Array of filters for salesforce objects that should be loadable
{"criteria": "exacts", "value": "Account"}, # Exact name of salesforce object
{"criteria": "starts with", "value": "Asset"}, # Prefix of the name
# Other allowed criteria: ends with, contains, starts not with, ends not with, not contains, not exacts
],
}
默认情况下,所有字段都作为元数据存储在文档中,而文本被设置为空字符串。请通过对阅读器返回的文档进行转换来构建文档的文本内容。
from langchain_community.document_loaders.airbyte import AirbyteSalesforceLoader
config = {
# your salesforce configuration
}
loader = AirbyteSalesforceLoader(
config=config, stream_name="asset"
) # check the documentation linked above for a list of all streams
现在您可以按常规方式加载文档
docs = loader.load()
由于 load 返回一个列表,它会阻塞直到所有文档加载完成。为了更好地控制此过程,您也可以使用 lazy_load 方法,该方法返回一个迭代器:
docs_iterator = loader.lazy_load()
请注意,默认情况下页面内容为空,而元数据对象包含记录中的所有信息。若要以不同方式创建文档,请在创建加载器时传入 record_handler 函数:
from langchain_core.documents import Document
def handle_record(record, id):
return Document(page_content=record.data["title"], metadata=record.data)
loader = AirbyteSalesforceLoader(
config=config, record_handler=handle_record, stream_name="asset"
)
docs = loader.load()
增量加载
某些流支持增量加载,这意味着源会跟踪已同步的记录,不会再次加载它们。这对于数据量大且频繁更新的源非常有用。
为了利用这一点,请存储加载器的 last_state 属性,并在重新创建加载器时传入该属性。这将确保仅加载新记录。
last_state = loader.last_state # store safely
incremental_loader = AirbyteSalesforceLoader(
config=config, stream_name="asset", state=last_state
)
new_docs = incremental_loader.load()