编者按:这篇文章是与 Airbyte 团队合作撰写的。他们使将更多数据源连接到 LangChain 作为文档加载器变得非常容易。
现在可以在您的基于 LangChain 的应用程序中直接使用 Airbyte 数据源,用于 Gong、 Hubspot、 Salesforce、 Shopify、 Stripe、 Typeform 和 Zendesk Support,并以 文档加载器 的形式实现。
例如,要加载用户的 Stripe 发票,您可以使用 AirbyteStripeLoader。安装非常简单,当您在本地安装了 LangChain 后,您只需要安装您感兴趣的数据源,就可以开始了
pip install airbyte-source-stripe
之后,只需导入加载器并传入配置和您想要加载的数据流。
from langchain.document_loaders.airbyte import AirbyteStripeLoader
config = {
"client_secret": "<secret key>",
"account_id": "<account id>",
"start_date": "<date from which to start retrieving records from in ISO format, e.g. 2020-10-20T00:00:00Z>"
}
loader = AirbyteStripeLoader(config=config, stream_name="invoices")
documents = loader.load()
# use documents in vector store or otherwise
为什么这很重要?
这是使 Airbyte 的 300 多个数据源 在 LangChain 中作为文档加载器提供的开端。
Airbyte 可以将数据从几乎任何来源移动到您的数据仓库或向量数据库,以支持您的 LLM 用例(查看此 教程 了解如何设置这样的数据管道!)。这通常通过使用 Airbyte Cloud 或本地 Airbyte 实例,设置连接,并在计划时间(或通过 API 触发)运行来实现,以确保您的数据保持新鲜。
但是,如果您刚开始并且所有内容都在本地运行,则使用完整的 Airbyte 实例(包括 UI、调度服务、横向扩展功能等)可能显得多余。
通过此版本,在 LangChain 中直接在您的 Python 运行时中运行任何基于 Python 的数据源比以往任何时候都容易 - 无需启动 Airbyte 实例或向 Airbyte Cloud 发出 API 调用。
在托管和嵌入式 Airbyte 之间切换
由于底层运行的是相同的代码,因此每个 Airbyte 构建的加载器都与 Airbyte 服务中的相应数据源兼容。这意味着将您的嵌入式加载管道提升到您的自托管 Airbyte 安装或您的 Airbyte Cloud 实例中是微不足道的。配置对象和记录的结构是 100% 兼容的。
在托管 Airbyte 上运行同步意味着
- UI 用于跟踪正在运行的管道
- 失败同步的警报
- 轻松地按计划运行管道
使用 LangChain 加载器运行同步意味着
- 没有运行另一个服务的开销
- 完全控制时间和管道执行
将 Airbyte 记录映射到 LangChain 文档
默认情况下,每个记录都会作为加载器的一部分映射到一个 Document,记录中的所有各种字段都成为记录的元数据。文档的文本部分留空。您可以传入一个记录处理程序来自定义此行为,以根据数据构建记录的文本部分。
def handle_record(record, id):
return Document(page_content=record.data["title"], metadata=record.data)
loader = AirbyteGongLoader(config=config, record_handler=handle_record, stream_name="calls")
增量加载
由于您的 python 应用程序基本上充当 Airbyte 平台,因此您可以完全控制“同步”的执行方式。例如,如果您的数据流支持,您仍然可以通过访问加载器的“last_state”属性从 增量同步 中受益。这使您能够仅加载自上次加载以来更改的文档,从而有效地更新现有的向量数据库。
import airbyte_cdk.models.airbyte_protocol import AirbyteMessage
with open('stripe_sync_checkpoint.json', 'wb') as file:
file.write(loader.last_state.json())
// ... later
with open('stripe_sync_checkpoint.json', 'r') as file:
current_state = AirbyteStateMessage.parse_raw(file.read())
incremental_loader = AirbyteStripeLoader(config=config, stream_name="invoices", state=current_state)
new_docs = incremental_loader.load()
自定义数据源
目前,以下 Airbyte 数据源以 pip 包的形式提供(未来会有更多)
- Gong pip install airbyte-source-gong
- Hubspot pip install airbyte-source-hubspot
- Salesforce pip install airbyte-source-salesforce
- Shopify pip install airbyte-source-shopify
- Stripe pip install airbyte-source-stripe
- Typeform pip install airbyte-source-typeform
- Zendesk Support pip install airbyte-source-zendesk-support
但是,如果您已经实现了自己的自定义 Airbyte 数据源,也可以通过使用 AirbyteCDKLoader 基类来集成它们,该基类与 Airbyte CDK 的 Source 接口一起使用。
from langchain.document_loaders.airbyte import AirbyteCDKLoader
from my_source.source import MyCustomSource # plug in your own source here
config = {
# your custom configuration
}
loader = AirbyteCDKLoader(source_class=MyCustomSource, config=config, stream_name="my-stream")
您还可以通过直接通过 git 安装来从主 Airbyte 存储库安装数据源 - 例如,要获取 Github 数据源,只需运行
pip install "source_github@git+https://github.com/airbytehq/airbyte.git@master#subdirectory=airbyte-integrations/connectors/source-github"
之后,该数据源就可以被插入到 AirbyteCDKLoader 中。
from source_github.source import SourceGithub
issues_loader = AirbyteCDKLoader(source_class=SourceGithub, config=config, stream_name="issues")
查看 连接器开发文档,了解如何开始编写您自己的数据源 - 入门很容易,并且可以让您根据需要从本地嵌入式加载器无缝过渡到使用托管的 Airbyte 实例。
有任何问题吗?我们很乐意听取您的意见
如果您有兴趣利用 Airbyte 将数据传输到您的基于 LLM 的应用程序,请花一点时间 填写我们的调查问卷,以便我们确保优先考虑最重要的功能。
如果您有任何问题或对以这种方式公开的其他现有数据源感兴趣,请随时在我们的 社区 Slack 频道 或 Airbyte 频道 (在 LangChain Discord 服务器上) 中联系我们。