Graph-based metadata filtering for improving vector search in RAG applications

基于图的元数据过滤,以提高 RAG 应用中的向量搜索性能

11 分钟阅读

使用 LangChain 和 Neo4j 通过先进的基于图的元数据技术优化向量检索

编者注:以下是由 Tomaz Bratanic 撰写的客座博客文章,他专注于 Graph ML 和 GenAI 在 Neo4j的研究。 Neo4j 是一家图数据库和分析公司,帮助组织深入、轻松、快速地在数十亿的数据连接中找到隐藏的关系和模式。

文本嵌入和向量相似性搜索帮助我们通过理解文档的含义以及它们彼此之间的相似程度来查找文档。然而,当基于特定标准(如日期或类别)对信息进行排序时,文本嵌入的效果并不理想;例如,如果您需要查找特定年份创建的所有文档或标记在特定类别(如“科幻小说”)下的文档。这就是元数据过滤或过滤向量搜索发挥作用的地方,因为它可以有效地处理这些结构化过滤器,允许用户根据特定属性缩小搜索结果。

使用预过滤方法的元数据过滤,您首先缩小相关文档的集合,然后在缩小的集合上应用向量相似性搜索。图片由作者提供。

在提供的图像中,流程从用户询问 2021 年是否实施了任何新政策开始。然后使用元数据过滤器按指定的年份(在本例中为 2021 年)对较大的索引文档池进行排序。这产生了仅来自该年份的过滤文档子集。为了进一步缩小到最相关的文档,在此子集中执行向量相似性搜索。此方法允许系统从 2021 年在上下文相关的文档池中找到与感兴趣主题密切相关的文档。这个两步过程,元数据过滤后跟向量相似性搜索,提高了搜索结果的准确性和相关性。

最近,我们推出了 LangChain 对 Neo4j 中基于节点属性的元数据过滤的支持。然而,像 Neo4j 这样的图数据库可以存储高度复杂和连接的结构化数据以及非结构化数据。让我们看下面的例子

高度连接的数据以及非结构化数据的图表示。图片由作者提供。

数据集的非结构化部分表示文章及其文本块,位于可视化的右上角。文本块节点包含文本及其文本嵌入值,并链接到文章节点,其中存在有关文章的更多信息,例如日期、情感、作者等。然而,文章随后进一步链接到它们提及的组织。在本例中,文章提到了 Neo4j。此外,我们的数据集还包括关于 Neo4j 的丰富结构化信息,例如其投资者、董事会成员、供应商等等。

因此,我们可以利用这种广泛的结构化信息来执行复杂的元数据过滤,使我们能够使用结构化标准精确地优化我们的文档选择,例如

  • Rod Johnson 担任董事会成员的公司中,是否有任何公司实施了新的在家办公政策?
  • 是否有关于 Neo4j 投资的公司的任何负面新闻?
  • 与现代汽车供应商的公司相关的供应链问题是否有任何值得关注的新闻?

通过所有这些示例问题,您可以使用基于结构化图的元数据过滤器大大缩小相关文档子集。

在这篇博文中,我将向您展示如何结合 LangChain 和 OpenAI 函数调用代理来实现基于图的元数据过滤。代码可在 GitHub上找到。

议程

我们将使用所谓的公司图数据集,该数据集在 Neo4j 托管的公共演示服务器上可用。您可以使用以下凭据访问它。

Neo4j Browser URI: https://demo.neo4jlabs.com:7473/browser/
username: companies
password: companies
database: companies

数据集的完整模式如下

图模式。图片由作者提供。

图模式围绕着组织节点展开。关于它们的供应商、竞争对手、地点、董事会成员等有大量可用信息。如前所述,还有提及特定组织的文章及其相应的文本块。

我们将实现一个带有单个工具的 OpenAI 代理,该工具可以根据用户输入动态生成 Cypher 语句,并从图数据库中检索相关文本块。在本例中,该工具将有四个可选输入参数

  • 主题:用户感兴趣的除组织、国家和情感之外的任何特定信息或主题。
  • 组织:用户想要查找信息的组织
  • 国家:用户感兴趣的组织所在的国家。使用全名,如美利坚合众国和法国。
  • 情感:文章的情感

基于这四个输入参数,我们将动态但确定性地构建相应的 Cypher 语句,以从图中检索相关信息,并将其用作上下文,以使用 LLM 生成最终答案。

您将需要一个 OpenAI API 密钥才能继续学习代码。

函数实现

我们将首先定义凭据和与 Neo4j 的相关连接。

import os

os.environ["OPENAI_API_KEY"] = "sk-"
os.environ["NEO4J_URI"] = "neo4j+s://demo.neo4jlabs.com"
os.environ["NEO4J_USERNAME"] = "companies"
os.environ["NEO4J_PASSWORD"] = "companies"
os.environ["NEO4J_DATABASE"] = "companies"

embeddings = OpenAIEmbeddings()
graph = Neo4jGraph()
vector_index = Neo4jVector.from_existing_index(
    embeddings, 
    index_name="news"
)

如前所述,我们将使用 OpenAI 嵌入,为此您需要他们的 API 密钥。接下来,我们定义与 Neo4j 的图连接,允许我们执行任意 Cypher 语句。最后,我们实例化一个 Neo4jVector 连接,它可以通​​过查询现有的向量索引来检索信息。在撰写本文时,您不能将向量索引与预过滤方法结合使用;您只能将后过滤与向量索引结合使用。然而,关于后过滤的辩论超出了本文的范围,因为我们将重点关注预过滤方法与详尽的向量相似性搜索相结合。

或多或少,整篇博文都归结为以下 get_organization_news 函数,该函数动态生成 Cypher 语句并检索相关信息。为了清晰起见,我将代码分成多个部分。

def get_organization_news(
    topic: Optional[str] = None,
    organization: Optional[str] = None,
    country: Optional[str] = None,
    sentiment: Optional[str] = None,
) -> str:
    # If there is no prefiltering, we can use vector index
    if topic and not organization and not country and not sentiment:
        return vector_index.similarity_search(topic)
    # Uses parallel runtime where available
    base_query = (
        "CYPHER runtime = parallel parallelRuntimeSupport=all "
        "MATCH (c:Chunk)<-[:HAS_CHUNK]-(a:Article) WHERE "
    )
    where_queries = []
    params = {"k": 5} # Define the number of text chunks to retrieve

我们首先定义输入参数。如您所见,它们都是可选字符串。topic 参数用于在文档中查找特定信息。在实践中,我们嵌入 topic 参数的值,并将其用作向量相似性搜索的输入。其他三个参数将用于演示预过滤方法。

如果所有预过滤参数都为空,我们可以使用现有的向量索引找到相关文档。否则,我们开始准备将用于预过滤元数据方法的基础 Cypher 语句。子句 CYPHER runtime = parallel parallelRuntimeSupport=all 指示 Neo4j 数据库在可用时使用 并行运行时。接下来,我们准备一个 match 语句,该语句选择 Chunk 节点及其对应的 Article 节点。

现在我们准备好将元数据过滤器动态附加到 Cypher 语句。我们将从 Organization过滤器开始。

if organization:
    # Map to database
    candidates = get_candidates(organization)
    if len(candidates) > 1:  # Ask for follow up if too many options
        return (
         "Ask a follow up question which of the available organizations "
         f"did the user mean. Available options: {candidates}"
        )
    where_queries.append(
        "EXISTS {(a)-[:MENTIONS]->(:Organization {name: $organization})}"
    )
    params["organization"] = candidates[0]

如果 LLM 识别出用户感兴趣的任何特定组织,我们必须首先使用 get_candidates函数将该值映射到数据库。在底层,get_candidates函数使用 关键字搜索,利用全文索引来查找候选节点。如果找到多个候选节点,我们指示 LLM 向用户提出后续问题,以明确他们具体指的是哪个组织。否则,我们附加一个 存在性子查询,该子查询将提及特定组织的文章过滤到过滤器列表中。为了防止任何 Cypher 注入,我们使用查询参数而不是连接查询。

接下来,我们处理用户想要根据提及的组织所在的国家/地区预过滤文本块的情况。

if country:
    # No need to disambiguate
    where_queries.append(
        "EXISTS {(a)-[:MENTIONS]->(:Organization)-[:IN_CITY]->()-[:IN_COUNTRY]->(:Country {name: $country})}"
    )
    params["country"] = country

由于国家/地区遵循标准命名,因此我们不必将值映射到数据库,因为 LLM 熟悉大多数国家/地区命名标准。

类似地,我们也处理情感元数据过滤。

if sentiment:
    if sentiment == "positive":
        where_queries.append("a.sentiment > $sentiment")
        params["sentiment"] = 0.5
    else:
        where_queries.append("a.sentiment < $sentiment")
        params["sentiment"] = -0.5

我们将指示 LLM 仅对情感输入值使用两个值,即正面或负面。然后,我们将这两个值映射到适当的过滤器值。

我们以稍微不同的方式处理 topic参数,因为它不用于预过滤,而是用于向量相似性搜索。

if topic:  # Do vector comparison
    vector_snippet = (
        " WITH c, a, vector.similarity.cosine(c.embedding,$embedding) AS score "
        "ORDER BY score DESC LIMIT toInteger($k) "
    )
    params["embedding"] = embeddings.embed_query(topic)
else:  # Just return the latest data
    vector_snippet = " WITH c, a ORDER BY a.date DESC LIMIT toInteger($k) "

如果 LLM 识别出用户对新闻中的特定主题感兴趣,我们使用主题输入的文本嵌入来查找最相关的文档。另一方面,如果没有识别出特定主题,我们只需返回最近的几篇文章,并完全避免向量相似性搜索。

现在,我们必须将 Cypher 语句放在一起,并使用它从数据库中检索信息。

return_snippet = "RETURN '#title ' + a.title + '\n#date ' + toString(a.date) + '\n#text ' + c.text AS output"

complete_query = (
    base_query + " AND ".join(where_queries) + vector_snippet + return_snippet
)

# Retrieve information from the database
data = graph.query(complete_query, params)
print(f"Cypher: {complete_query}\n")
# Safely remove embedding before printing
params.pop('embedding', None)
print(f"Parameters: {params}")
return "###Article: ".join([el["output"] for el in data])

我们通过组合所有查询片段来构建最终的 complete_query。之后,我们使用动态生成的 Cypher 语句从数据库中检索信息并将其返回给 LLM。让我们检查针对示例输入生成的 Cypher 语句。

get_organization_news(
  organization='neo4j',
  sentiment='positive',
  topic='remote work'
)

# Cypher: CYPHER runtime = parallel parallelRuntimeSupport=all
# MATCH (c:Chunk)<-[:HAS_CHUNK]-(a:Article) WHERE 
# EXISTS {(a)-[:MENTIONS]->(:Organization {name: $organization})} AND 
# a.sentiment > $sentiment 
# WITH c, a, vector.similarity.cosine(c.embedding,$embedding) AS score 
# ORDER BY score DESC LIMIT toInteger($k) 
# RETURN '#title ' + a.title + '\ndate ' + toString(a.date) + '\ntext ' + c.text AS output

# Parameters: {'k': 5, 'organization': 'Neo4j', 'sentiment': 0.5}

动态查询生成按预期工作,并且能够从数据库中检索相关信息。

定义 OpenAI 代理

接下来,我们需要将该函数包装为代理工具。首先,我们将添加输入参数描述。

fewshot_examples = """{Input:What are the health benefits for Google employees in the news? Query: Health benefits}
{Input: What is the latest positive news about Google? Query: None}
{Input: Are there any news about VertexAI regarding Google? Query: VertexAI}
{Input: Are there any news about new products regarding Google? Query: new products}
"""

class NewsInput(BaseModel):
    topic: Optional[str] = Field(
        description="Any specific information or topic besides organization, country, and sentiment that the user is interested in. Here are some examples: "
        + fewshot_examples
    )
    organization: Optional[str] = Field(
        description="Organization that the user wants to find information about"
    )
    country: Optional[str] = Field(
        description="Country of organizations that the user is interested in. Use full names like United States of America and France."
    )
    sentiment: Optional[str] = Field(
        description="Sentiment of articles", enum=["positive", "negative"]
    )

预过滤参数非常容易描述,但是我在使 topic 参数按预期工作方面遇到了一些问题。最后,我决定添加一些示例,以便 LLM 更好地理解它。此外,您可以观察到我们向 LLM 提供了关于 country命名格式的信息,并为 sentiment 提供了枚举。

现在,我们可以通过给自定义工具命名和描述(其中包含关于 LLM 何时使用它的说明)来定义自定义工具。

class NewsTool(BaseTool):
    name = "NewsInformation"
    description = (
        "useful for when you need to find relevant information in the news"
    )
    args_schema: Type[BaseModel] = NewsInput

    def _run(
        self,
        topic: Optional[str] = None,
        organization: Optional[str] = None,
        country: Optional[str] = None,
        sentiment: Optional[str] = None,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        """Use the tool."""
        return get_organization_news(topic, organization, country, sentiment)

最后一件事是定义代理执行器。我只是重用了我前段时间实现的 OpenAI 代理的 LCEL 实现

llm = ChatOpenAI(temperature=0, model="gpt-4-turbo", streaming=True)
tools = [NewsTool()]

llm_with_tools = llm.bind(functions=[format_tool_to_openai_function(t) for t in tools])

prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful assistant that finds information about movies "
            " and recommends them. If tools require follow up questions, "
            "make sure to ask the user for clarification. Make sure to include any "
            "available options that need to be clarified in the follow up questions "
            "Do only the things the user specifically requested. ",
        ),
        MessagesPlaceholder(variable_name="chat_history"),
        ("user", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ]
)

agent = (
    {
        "input": lambda x: x["input"],
        "chat_history": lambda x: _format_chat_history(x["chat_history"])
        if x.get("chat_history")
        else [],
        "agent_scratchpad": lambda x: format_to_openai_function_messages(
            x["intermediate_steps"]
        ),
    }
    | prompt
    | llm_with_tools
    | OpenAIFunctionsAgentOutputParser()
)

agent_executor = AgentExecutor(agent=agent, tools=tools)

该代理有一个工具可以用来检索有关新闻的信息。我们还添加了 chat_history 消息占位符,使代理具有对话性,并允许后续问题和回复。

实现测试

让我们运行几个输入并检查生成的 Cypher 语句和参数。

agent_executor.invoke(
  {"input": "What are some positive news regarding neo4j?"}
)

# Cypher: CYPHER runtime = parallel parallelRuntimeSupport=all 
# MATCH (c:Chunk)<-[:HAS_CHUNK]-(a:Article) WHERE 
# EXISTS {(a)-[:MENTIONS]->(:Organization {name: $organization})} AND 
# a.sentiment > $sentiment WITH c, a 
# ORDER BY a.date DESC LIMIT toInteger($k) 
# RETURN '#title ' + a.title + 'date ' + toString(a.date) + 'text ' + c.text AS output
# Parameters: {'k': 5, 'organization': 'Neo4j', 'sentiment': 0.5}

生成的 Cypher 语句是有效的。由于我们没有指定任何特定主题,因此它返回提及 Neo4j 的正面文章的最后五个文本块。让我们做一些更复杂的事情

agent_executor.invoke(
   {"input": "What are some of the latest negative news about employee happiness for companies from France?"}
)

# Cypher: CYPHER runtime = parallel parallelRuntimeSupport=all 
# MATCH (c:Chunk)<-[:HAS_CHUNK]-(a:Article) WHERE 
# EXISTS {(a)-[:MENTIONS]->(:Organization)-[:IN_CITY]->()-[:IN_COUNTRY]->(:Country {name: $country})} AND 
# a.sentiment < $sentiment 
# WITH c, a, vector.similarity.cosine(c.embedding,$embedding) AS score 
# ORDER BY score DESC LIMIT toInteger($k) 
# RETURN '#title ' + a.title + 'date ' + toString(a.date) + 'text ' + c.text AS output
# Parameters: {'k': 5, 'country': 'France', 'sentiment': -0.5, 'topic': 'employee happiness'}

LLM 代理正确生成了预过滤参数,但也识别出了一个特定的员工幸福感主题。此主题用作向量相似性搜索的输入,使我们能够进一步优化检索过程。

总结

在这篇博文中,我们实现了基于图的元数据过滤器示例,从而提高了向量搜索的准确性。然而,数据集具有广泛且互连的选项,这些选项允许进行更复杂的预过滤查询。借助图数据表示,当与 LLM 函数调用功能结合使用以动态生成 Cypher 语句时,结构化过滤器的可能性几乎是无限的。

此外,您的代理可以具有检索非结构化文本的工具(如本博文所示),以及其他可以检索结构化信息的工具,从而使知识图谱成为许多 RAG 应用程序的绝佳解决方案。

代码可在 GitHub上找到。