Introducing Airbyte sources within LangChain

在 LangChain 中引入 Airbyte 数据源

4 分钟阅读

编者按:这篇文章是与 Airbyte 团队合作撰写的。他们使将更多数据源连接到 LangChain 作为文档加载器变得非常容易。

现在可以在您的基于 LangChain 的应用程序中直接使用 Airbyte 数据源,用于 GongHubspotSalesforceShopifyStripeTypeformZendesk Support,并以 文档加载器 的形式实现。

例如,要加载用户的 Stripe 发票,您可以使用 AirbyteStripeLoader。安装非常简单,当您在本地安装了 LangChain 后,您只需要安装您感兴趣的数据源,就可以开始了

pip install airbyte-source-stripe

之后,只需导入加载器并传入配置和您想要加载的数据流。

from langchain.document_loaders.airbyte import AirbyteStripeLoader
config = {
  "client_secret": "<secret key>",
  "account_id": "<account id>",
  "start_date": "<date from which to start retrieving records from in ISO format, e.g. 2020-10-20T00:00:00Z>"
}
loader = AirbyteStripeLoader(config=config, stream_name="invoices")
documents = loader.load()
# use documents in vector store or otherwise

为什么这很重要?

这是使 Airbyte 的 300 多个数据源 在 LangChain 中作为文档加载器提供的开端。

Airbyte 可以将数据从几乎任何来源移动到您的数据仓库或向量数据库,以支持您的 LLM 用例(查看此 教程 了解如何设置这样的数据管道!)。这通常通过使用 Airbyte Cloud 或本地 Airbyte 实例,设置连接,并在计划时间(或通过 API 触发)运行来实现,以确保您的数据保持新鲜。

但是,如果您刚开始并且所有内容都在本地运行,则使用完整的 Airbyte 实例(包括 UI、调度服务、横向扩展功能等)可能显得多余。

通过此版本,在 LangChain 中直接在您的 Python 运行时中运行任何基于 Python 的数据源比以往任何时候都容易 - 无需启动 Airbyte 实例或向 Airbyte Cloud 发出 API 调用。

在托管和嵌入式 Airbyte 之间切换

由于底层运行的是相同的代码,因此每个 Airbyte 构建的加载器都与 Airbyte 服务中的相应数据源兼容。这意味着将您的嵌入式加载管道提升到您的自托管 Airbyte 安装或您的 Airbyte Cloud 实例中是微不足道的。配置对象和记录的结构是 100% 兼容的。

在托管 Airbyte 上运行同步意味着

  • UI 用于跟踪正在运行的管道
  • 失败同步的警报
  • 轻松地按计划运行管道

使用 LangChain 加载器运行同步意味着

  • 没有运行另一个服务的开销
  • 完全控制时间和管道执行

将 Airbyte 记录映射到 LangChain 文档

默认情况下,每个记录都会作为加载器的一部分映射到一个 Document,记录中的所有各种字段都成为记录的元数据。文档的文本部分留空。您可以传入一个记录处理程序来自定义此行为,以根据数据构建记录的文本部分。

def handle_record(record, id):
    return Document(page_content=record.data["title"], metadata=record.data)
loader = AirbyteGongLoader(config=config, record_handler=handle_record, stream_name="calls")

增量加载

由于您的 python 应用程序基本上充当 Airbyte 平台,因此您可以完全控制“同步”的执行方式。例如,如果您的数据流支持,您仍然可以通过访问加载器的“last_state”属性从 增量同步 中受益。这使您能够仅加载自上次加载以来更改的文档,从而有效地更新现有的向量数据库。

import airbyte_cdk.models.airbyte_protocol import AirbyteMessage
with open('stripe_sync_checkpoint.json', 'wb') as file:
    file.write(loader.last_state.json())

// ... later
with open('stripe_sync_checkpoint.json', 'r') as file:
    current_state = AirbyteStateMessage.parse_raw(file.read())
incremental_loader = AirbyteStripeLoader(config=config, stream_name="invoices", state=current_state)
new_docs = incremental_loader.load()

自定义数据源

目前,以下 Airbyte 数据源以 pip 包的形式提供(未来会有更多)

  • Gong pip install airbyte-source-gong
  • Hubspot pip install airbyte-source-hubspot
  • Salesforce pip install airbyte-source-salesforce
  • Shopify pip install airbyte-source-shopify
  • Stripe pip install airbyte-source-stripe
  • Typeform pip install airbyte-source-typeform
  • Zendesk Support pip install airbyte-source-zendesk-support

但是,如果您已经实现了自己的自定义 Airbyte 数据源,也可以通过使用 AirbyteCDKLoader 基类来集成它们,该基类与 Airbyte CDK 的 Source 接口一起使用。

from langchain.document_loaders.airbyte import AirbyteCDKLoader
from my_source.source import MyCustomSource # plug in your own source here
config = {
   # your custom configuration
}
loader = AirbyteCDKLoader(source_class=MyCustomSource, config=config, stream_name="my-stream")

您还可以通过直接通过 git 安装来从主 Airbyte 存储库安装数据源 - 例如,要获取 Github 数据源,只需运行

pip install "source_github@git+https://github.com/airbytehq/airbyte.git@master#subdirectory=airbyte-integrations/connectors/source-github"

之后,该数据源就可以被插入到 AirbyteCDKLoader 中。

from source_github.source import SourceGithub
issues_loader = AirbyteCDKLoader(source_class=SourceGithub, config=config, stream_name="issues")

查看 连接器开发文档,了解如何开始编写您自己的数据源 - 入门很容易,并且可以让您根据需要从本地嵌入式加载器无缝过渡到使用托管的 Airbyte 实例。

有任何问题吗?我们很乐意听取您的意见

如果您有兴趣利用 Airbyte 将数据传输到您的基于 LLM 的应用程序,请花一点时间 填写我们的调查问卷,以便我们确保优先考虑最重要的功能。

如果您有任何问题或对以这种方式公开的其他现有数据源感兴趣,请随时在我们的 社区 Slack 频道Airbyte 频道 (在 LangChain Discord 服务器上) 中联系我们。