mirror of
https://github.com/alibaba/higress.git
synced 2026-02-06 23:21:08 +08:00
fix: rag add python example code (#3043)
This commit is contained in:
@@ -132,7 +132,7 @@ data:
|
||||
- path_rewrite_prefix: ""
|
||||
upstream_type: ""
|
||||
enable_path_rewrite: false
|
||||
match_rule_domain: ""
|
||||
match_rule_domain: "*"
|
||||
match_rule_path: "/mcp-servers/rag"
|
||||
match_rule_type: "prefix"
|
||||
servers:
|
||||
@@ -323,5 +323,615 @@ Open your browser and navigate to http://localhost:8000
|
||||
```
|
||||
|
||||
|
||||
## 如何对接已有的向量数据库
|
||||
|
||||
### 1. 基于 langchain + langchain-milvus 代码样例,用于生成测试向量数据库。
|
||||
|
||||
```python
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
基于 LangChain Milvus 的文档处理系统
|
||||
功能:
|
||||
1. 使用 langchain UnstructuredFileLoader 加载文本文件成 Document
|
||||
2. 使用 RecursiveCharacterTextSplitter 对 Document 进行 chunk 分割
|
||||
3. 使用 OpenAI 兼容的 embedding 模型生成向量
|
||||
4. 使用 langchain_milvus.Milvus 进行向量存储和检索, 参考文档 https://python.langchain.com/docs/integrations/vectorstores/milvus/
|
||||
"""
|
||||
|
||||
import os
|
||||
import logging
|
||||
import uuid
|
||||
from typing import List, Dict, Any, Optional
|
||||
from pathlib import Path
|
||||
|
||||
# LangChain imports
|
||||
from langchain_community.document_loaders import UnstructuredFileLoader
|
||||
from langchain_text_splitters import RecursiveCharacterTextSplitter
|
||||
from langchain_core.documents import Document
|
||||
from langchain_milvus import Milvus
|
||||
from langchain_core.embeddings import Embeddings
|
||||
|
||||
# OpenAI client import
|
||||
from openai import OpenAI
|
||||
|
||||
# 配置日志
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DashScopeEmbeddings(Embeddings):
    """LangChain ``Embeddings`` adapter for an OpenAI-compatible embedding API.

    Requests are issued through the ``openai`` client. Unless overridden, they
    target DashScope's compatible-mode endpoint and authenticate with the
    ``DASHSCOPE_API_KEY`` environment variable.
    """

    def __init__(
        self,
        openai_api_key: Optional[str] = None,
        openai_api_base: Optional[str] = None,
        model: str = "text-embedding-v1",
        dim: int = 1536,
        batch_size: int = 10,
    ):
        """Create the underlying OpenAI client.

        Args:
            openai_api_key: API key; falls back to ``DASHSCOPE_API_KEY``.
            openai_api_base: Base URL; falls back to the DashScope
                compatible-mode endpoint.
            model: Embedding model name sent with every request.
            dim: Value passed as the ``dimensions`` request parameter.
            batch_size: Maximum number of texts sent per request.
                NOTE(review): the default of 10 is believed to match the
                per-request input limit of DashScope text-embedding-v3/v4;
                confirm against the provider documentation.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")

        self.client = OpenAI(
            api_key=openai_api_key or os.getenv("DASHSCOPE_API_KEY"),
            base_url=openai_api_base or "https://dashscope.aliyuncs.com/compatible-mode/v1",
        )
        self.model = model
        self.dim = dim
        self.batch_size = batch_size

    def _embed_batch(self, texts: List[str]) -> List[List[float]]:
        """Send one embedding request and return the vectors in response order."""
        response = self.client.embeddings.create(
            model=self.model,
            input=texts,
            dimensions=self.dim,
            encoding_format="float",
        )
        return [item.embedding for item in response.data]

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Embed ``texts``, splitting them into requests of ``batch_size`` items.

        Returns one vector per input text, in input order. An empty input
        returns an empty list without calling the API.
        """
        vectors: List[List[float]] = []
        # Batching keeps each request under the provider's per-request input
        # limit; a single oversized request would be rejected as a whole.
        for start in range(0, len(texts), self.batch_size):
            vectors.extend(self._embed_batch(texts[start:start + self.batch_size]))
        return vectors

    def embed_query(self, text: str) -> List[float]:
        """Embed a single query string and return its vector."""
        return self._embed_batch([text])[0]
|
||||
|
||||
|
||||
class LangChainMilvusProcessor:
    """Document ingestion and retrieval pipeline built on LangChain and Milvus.

    The processor loads files with ``UnstructuredFileLoader``, splits them with
    ``RecursiveCharacterTextSplitter``, embeds the chunks through
    ``DashScopeEmbeddings`` and stores/searches them in a ``langchain_milvus``
    vector store.
    """

    def __init__(
        self,
        milvus_uri: str = "http://localhost:19530",
        milvus_token: str = "",
        db_name: str = "default",
        collection_name: str = "langchain_rag",
        embedding_model: str = "text-embedding-v4",
        openai_api_key: Optional[str] = None,
        openai_api_base: Optional[str] = None,
        chunk_size: int = 500,
        chunk_overlap: int = 50,
        embedding_dim: int = 1024,
        drop_old: bool = False
    ):
        """Store the configuration and connect to Milvus.

        Args:
            milvus_uri: Milvus server URI.
            milvus_token: Milvus authentication token.
            db_name: Milvus database name.
            collection_name: Milvus collection name.
            embedding_model: Embedding model name.
            openai_api_key: API key for the OpenAI-compatible endpoint.
            openai_api_base: Base URL of the OpenAI-compatible endpoint.
            chunk_size: Maximum chunk length passed to the text splitter.
            chunk_overlap: Overlap between consecutive chunks.
            embedding_dim: Embedding vector dimension.
            drop_old: Forwarded to ``Milvus(drop_old=...)``.

        Raises:
            Exception: Whatever ``Milvus(...)`` raises when the vector store
                cannot be initialised (logged, then re-raised).
        """
        self.milvus_uri = milvus_uri
        self.milvus_token = milvus_token
        self.db_name = db_name
        self.collection_name = collection_name
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.embedding_dim = embedding_dim
        self.drop_old = drop_old
        self.embedding_model = embedding_model

        # Text splitter: tries paragraph, line, word and finally character
        # boundaries, in that order.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=["\n\n", "\n", " ", ""]
        )

        self.embeddings = DashScopeEmbeddings(
            openai_api_key=openai_api_key,
            openai_api_base=openai_api_base,
            model=embedding_model,
            dim=embedding_dim
        )

        # The vector store is created eagerly so that connection problems
        # surface at construction time.
        self.vectorstore = None
        self._init_vectorstore()

    def _init_vectorstore(self):
        """Create the ``Milvus`` vector store; log and re-raise on failure."""
        try:
            self.vectorstore = Milvus(
                embedding_function=self.embeddings,
                collection_name=self.collection_name,
                connection_args={
                    "uri": self.milvus_uri,
                    "token": self.milvus_token,
                    "db_name": self.db_name
                },
                index_params={
                    "index_type": "HNSW",
                    "metric_type": "IP",
                    "params": {"M": 8, "efConstruction": 64}
                },
                consistency_level="Strong",
                drop_old=self.drop_old,
                metadata_field="metadata"  # custom metadata field name
            )

            logger.info(f"成功初始化 Milvus 向量存储: {self.collection_name}")

        except Exception as e:
            logger.error(f"初始化 Milvus 向量存储失败: {e}")
            raise

    def load_document(self, file_path: str) -> List[Document]:
        """Load a file into ``Document`` objects with ``UnstructuredFileLoader``.

        Each document's metadata gets ``source`` (the file's base name) and
        ``filename`` (the path as given).

        Args:
            file_path: Path of the file to load.

        Returns:
            The loaded documents, or an empty list when the file is missing or
            loading fails (the failure is logged, not raised).
        """
        logger.info(f"加载文档: {file_path}")

        # A missing file is an expected condition: report it without a
        # traceback and keep the log message callers already see.
        if not os.path.exists(file_path):
            logger.error(f"加载文档失败: 文件不存在: {file_path}")
            return []

        try:
            loader = UnstructuredFileLoader(file_path)
            documents = loader.load()

            for doc in documents:
                doc.metadata["source"] = os.path.basename(file_path)
                doc.metadata["filename"] = file_path

            logger.info(f"成功加载 {len(documents)} 个文档")
            return documents

        except Exception as e:
            # logger.exception keeps the traceback that logger.error dropped.
            logger.exception(f"加载文档失败: {e}")
            return []

    def split_documents(self, documents: List[Document]) -> List[Document]:
        """Split documents into chunks with the configured text splitter.

        Every chunk's metadata gets a fresh UUID in ``chunk_id`` and its
        position in the returned list in ``chunk_index``.

        Args:
            documents: Documents to split.

        Returns:
            The chunks, or an empty list when splitting fails (logged).
        """
        try:
            logger.info(f"开始分割 {len(documents)} 个文档")

            chunks = self.text_splitter.split_documents(documents)
            for i, chunk in enumerate(chunks):
                chunk.metadata["chunk_id"] = str(uuid.uuid4())
                chunk.metadata["chunk_index"] = i

            logger.info(f"文档分割完成,共生成 {len(chunks)} 个 chunk")
            return chunks

        except Exception as e:
            logger.exception(f"文档分割失败: {e}")
            return []

    def add_documents(self, documents: List[Document], ids: Optional[List[str]] = None) -> List[str]:
        """Add documents to the vector store.

        Args:
            documents: Documents to add.
            ids: Optional primary keys, one per document. UUIDs are generated
                when omitted.

        Returns:
            The IDs reported by the vector store, or an empty list when there
            is nothing to add, when ``ids`` does not match ``documents`` in
            length, or when the insert fails (logged).
        """
        if not documents:
            logger.warning("没有文档需要添加")
            return []

        if ids is None:
            ids = [str(uuid.uuid4()) for _ in range(len(documents))]
        elif len(ids) != len(documents):
            # Fail early with a clear message instead of relying on whatever
            # error the vector store produces for mismatched inputs.
            logger.error(
                f"添加文档到向量存储失败: ids 数量 ({len(ids)}) 与文档数量 ({len(documents)}) 不一致"
            )
            return []

        try:
            logger.info(f"开始添加 {len(documents)} 个文档到向量存储")

            added_ids = self.vectorstore.add_documents(documents=documents, ids=ids)

            logger.info(f"成功添加 {len(added_ids)} 个文档到向量存储")
            return added_ids

        except Exception as e:
            logger.exception(f"添加文档到向量存储失败: {e}")
            return []

    def process_file(self, file_path: str) -> bool:
        """Run the full load -> split -> store pipeline for one file.

        Args:
            file_path: Path of the file to ingest.

        Returns:
            ``True`` when at least one chunk was stored, ``False`` otherwise.
        """
        try:
            logger.info(f"开始处理文件: {file_path}")

            # 1. Load
            documents = self.load_document(file_path)
            if not documents:
                return False

            # 2. Split
            chunks = self.split_documents(documents)
            if not chunks:
                return False

            # 3. Store
            added_ids = self.add_documents(chunks)
            if not added_ids:
                logger.error(f"文件处理失败: {file_path}")
                return False

            logger.info(f"文件处理完成: {file_path},添加了 {len(added_ids)} 个 chunk")
            return True

        except Exception as e:
            logger.exception(f"处理文件失败: {e}")
            return False

    def process_directory(self, directory_path: str, file_extensions: Optional[List[str]] = None) -> Dict[str, bool]:
        """Ingest every matching file under a directory (recursively).

        Args:
            directory_path: Directory to walk.
            file_extensions: Suffixes to accept, including the leading dot.
                Matching is case-insensitive. Defaults to ``['.txt', '.md']``.

        Returns:
            Mapping of file path to the ``process_file`` result. Empty when
            the path is not an existing directory.
        """
        if file_extensions is None:
            file_extensions = ['.txt', '.md']
        # File suffixes are lower-cased below, so the accepted set must be
        # lower-cased too; otherwise an entry such as '.TXT' never matches.
        accepted = {ext.lower() for ext in file_extensions}

        results: Dict[str, bool] = {}

        try:
            directory = Path(directory_path)
            if not directory.is_dir():
                logger.error(f"目录不存在: {directory_path}")
                return results

            # Sorted so files are processed (and reported) in a stable order.
            for file_path in sorted(directory.rglob('*')):
                if file_path.is_file() and file_path.suffix.lower() in accepted:
                    logger.info(f"处理文件: {file_path}")
                    results[str(file_path)] = self.process_file(str(file_path))

            success_count = sum(1 for success in results.values() if success)
            total_count = len(results)

            logger.info(f"目录处理完成: {success_count}/{total_count} 个文件成功处理")

        except Exception as e:
            logger.exception(f"处理目录失败: {e}")

        return results

    def similarity_search(self, query: str, k: int = 5) -> List[Document]:
        """Return the ``k`` documents most similar to ``query``.

        Note: this is only a native LangChain retrieval example. When
        retrieving through the Higress gateway, use the MCP tools the gateway
        exposes (e.g. the search tool) instead of calling this method.

        Args:
            query: Query text.
            k: Number of results to return.

        Returns:
            Matching documents, or an empty list when the search fails (logged).
        """
        try:
            logger.info(f"执行相似性搜索: {query}")

            results = self.vectorstore.similarity_search(query, k=k)

            logger.info(f"搜索完成,返回 {len(results)} 个结果")
            return results

        except Exception as e:
            logger.exception(f"相似性搜索失败: {e}")
            return []

    def similarity_search_with_score(self, query: str, k: int = 5) -> List[tuple]:
        """Return the ``k`` most similar documents together with their scores.

        Args:
            query: Query text.
            k: Number of results to return.

        Returns:
            List of ``(document, score)`` tuples, or an empty list when the
            search fails (logged).
        """
        try:
            logger.info(f"执行带分数的相似性搜索: {query}")

            results = self.vectorstore.similarity_search_with_score(query, k=k)

            logger.info(f"搜索完成,返回 {len(results)} 个结果")
            return results

        except Exception as e:
            logger.exception(f"带分数的相似性搜索失败: {e}")
            return []

    def get_collection_stats(self) -> Dict[str, Any]:
        """Return the processor's configuration as a dictionary.

        The values come from this instance's attributes; Milvus itself is not
        queried, so no live collection statistics (such as row counts) are
        included.

        Returns:
            Configuration dictionary, or an empty dict if reading it fails.
        """
        try:
            stats = {
                "collection_name": self.collection_name,
                "milvus_uri": self.milvus_uri,
                "db_name": self.db_name,
                "embedding_model": self.embedding_model,
                "embedding_dim": self.embedding_dim,
                "chunk_size": self.chunk_size,
                "chunk_overlap": self.chunk_overlap
            }

            logger.info(f"成功获取集合 {self.collection_name} 的统计信息")
            return stats

        except Exception as e:
            logger.exception(f"获取集合统计信息失败: {e}")
            return {}
|
||||
|
||||
|
||||
def main():
    """Example usage: ingest a file and sample documents, then run a search."""
    # Configuration. The API key is read from the environment so that a real
    # key never has to be written into the source; the placeholder is only a
    # fallback that keeps the example self-describing.
    config = {
        "milvus_uri": "http://localhost:19530",
        "milvus_token": "",
        "db_name": "default",
        "collection_name": "langchain_rag",
        "embedding_model": "text-embedding-v4",
        "openai_api_key": os.getenv("DASHSCOPE_API_KEY", "sk-xxxx"),
        "openai_api_base": "https://dashscope.aliyuncs.com/compatible-mode/v1",
        "chunk_size": 500,
        "chunk_overlap": 50,
        "embedding_dim": 1024,
        "drop_old": False
    }

    # Create the processor (connects to Milvus).
    processor = LangChainMilvusProcessor(**config)

    # Example: ingest a single file. Replace the path with a real file;
    # a missing file is logged and skipped.
    file_path = "/path/demo.txt"
    processor.process_file(file_path)

    # Example: a few in-memory test documents.
    test_documents = [
        Document(
            page_content="""Istio 介绍
服务网格是一个基础设施层,它为应用程序提供零信任安全、可观察性和高级流量管理等功能, 而无需更改代码。Istio 是最受欢迎、最强大、最值得信赖的服务网格。 Istio 由 Google、IBM 和 Lyft 于 2016 年创立,是云原生计算基金会的一个毕业项目, 与 Kubernetes 和 Prometheus 等项目并列。
Istio 可确保云原生和分布式系统具有弹性,帮助现代企业在保持连接和保护的同时跨不同平台维护其工作负载。 它启用安全和治理控制,包括 mTLS 加密、策略管理和访问控制、 支持网络功能,例如金丝雀部署、A/B 测试、负载平衡、故障恢复, 并增加对整个资产流量的可观察性。
Istio 并不局限于单个集群、网络或运行时的边界——在 Kubernetes 或 VM、多云、混合或本地上运行的服务都可以包含在单个网格中。
Istio 经过精心设计,具有可扩展性,并受到贡献者和合作伙伴的广泛生态系统的支持, 它为各种用例提供打包的集成和分发。您可以独立安装 Istio,也可以选择由提供基于 Istio 的解决方案的商业供应商提供的托管支持。""",
            metadata={"source": "istio introduction"}
        ),
        Document(
            page_content="""Istio 安全概述
Istio 安全功能提供了强大的身份、强大的策略、透明的 TLS 加密、 认证/授权/审计(AAA)工具来保护您的服务和数据。Istio 安全功能提供了强大的身份、强大的策略、透明的 TLS 加密、 认证/授权/审计(AAA)工具来保护您的服务和数据。
Istio 中的安全性涉及多个组件:
- 用于密钥和证书管理的证书颁发机构(CA)
- 配置 API 服务器分发给代理:
- 认证策略
- 授权策略
- 安全命名信息
- Sidecar 和边缘代理作为策略执行点(PEP) 以保护客户端和服务器之间的通信安全。
- 一组 Envoy 代理扩展,用于管理遥测和审计。""",
            metadata={"source": "istio security"}
        ),
        Document(
            page_content="""Istio 流量管理介绍
为了在网格中导流,Istio 需要知道所有的 endpoint 在哪以及它们属于哪些服务。 为了定位到 service registry(服务注册中心), Istio 会连接到一个服务发现系统。例如,如果您在 Kubernetes 集群上安装了 Istio, 那么它将自动检测该集群中的服务和 endpoint。
使用此服务注册中心,Envoy 代理可以将流量定向到相关服务。大多数基于微服务的应用程序, 每个服务的工作负载都有多个实例来处理流量,称为负载均衡池。默认情况下, Envoy 代理基于轮询调度模型在服务的负载均衡池内分发流量,按顺序将请求发送给池中每个成员, 一旦所有服务实例均接收过一次请求后,就重新回到第一个池成员。
Istio 基本的服务发现和负载均衡能力为您提供了一个可用的服务网格, 但它能做到的远比这多的多。在许多情况下,您可能希望对网格的流量情况进行更细粒度的控制。 作为 A/B 测试的一部分,您可能想将特定百分比的流量定向到新版本的服务, 或者为特定的服务实例子集应用不同的负载均衡策略。您可能还想对进出网格的流量应用特殊的规则, 或者将网格的外部依赖项添加到服务注册中心。通过使用 Istio 的流量管理 API 将流量配置添加到 Istio, 就可以完成所有这些甚至更多的工作。
和其他 Istio 配置一样,这些 API 也使用 Kubernetes 的自定义资源定义 (CRD)来声明,您可以像示例中看到的那样使用 YAML 进行配置。""",
            metadata={"source": "istio traffic management"}
        )
    ]

    # Store the test documents.
    processor.add_documents(test_documents)

    # Example: scored similarity search.
    query = "Istio 安全功能"
    search_results = processor.similarity_search_with_score(query, k=3)

    print(f"\n搜索查询: {query}")
    print("=" * 50)
    for doc, score in search_results:
        print(f"分数: {score:.4f}")
        print(f"内容: {doc.page_content[:100]}...")
        print(f"元数据: {doc.metadata}")
        print("-" * 30)

    # Print the processor configuration summary.
    stats = processor.get_collection_stats()
    print("\n集合统计信息:")
    print("=" * 50)
    for key, value in stats.items():
        print(f"{key}: {value}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
```
|
||||
|
||||
### 2. python 参考 requirements.txt
|
||||
|
||||
```
|
||||
langchain>=1.0.2
|
||||
langchain-community>=0.4
|
||||
unstructured[all-docs]
|
||||
openai>=1.14.3
|
||||
|
||||
# Milvus向量数据库
|
||||
pymilvus>=2.6.2
|
||||
langchain-milvus>=0.2.2
|
||||
```
|
||||
|
||||
### 3. Higress RAG mcp server config 配置
|
||||
|
||||
```yaml
|
||||
rag:
|
||||
splitter:
|
||||
provider: "nosplitter"
|
||||
chunk_size: 500
|
||||
chunk_overlap: 50
|
||||
threshold: 0.5
|
||||
top_k: 10
|
||||
|
||||
embedding:
|
||||
provider: "openai"
|
||||
base_url: "https://dashscope.aliyuncs.com/compatible-mode/v1"
|
||||
api_key: "sk-xxx"
|
||||
model: "text-embedding-v4"
|
||||
dimensions: 1024
|
||||
|
||||
vectordb:
|
||||
provider: "milvus"
|
||||
host: "localhost"
|
||||
port: 19530
|
||||
database: "default"
|
||||
collection: "langchain_rag"
|
||||
mapping:
|
||||
# 字段映射配置:当标准字段名与 Milvus Collection 中实际字段名不一致时,需要通过 mapping 进行映射
|
||||
# standard_name: 系统内部使用的标准字段名(如 id, content, vector, metadata, created_at)
|
||||
# raw_name: milvus collection 中的实际字段名
|
||||
fields:
|
||||
- standard_name: "id"
|
||||
raw_name: "pk"
|
||||
properties:
|
||||
max_length: 256
|
||||
auto_id: false
|
||||
- standard_name: "content"
|
||||
raw_name: "text"
|
||||
properties:
|
||||
max_length: 8192
|
||||
- standard_name: "vector"
|
||||
raw_name: "vector"
|
||||
properties: {}
|
||||
- standard_name: "metadata"
|
||||
raw_name: "metadata"
|
||||
properties: {}
|
||||
index:
|
||||
index_type: "HNSW"
|
||||
params:
|
||||
M: 8
|
||||
ef_construction: 64
|
||||
search:
|
||||
metric_type: "IP"
|
||||
params:
|
||||
ef: 32
|
||||
```
|
||||
|
||||
### 4. 关于 langchain-milvus 对 Document metadata 处理
|
||||
|
||||
在使用 langchain-milvus 进行文档处理时,有两种处理 metadata 的方法:
|
||||
|
||||
#### 方法一:JSON 字符串存储(推荐)
|
||||
- **特点**:metadata 会被转换为 JSON 字符串存储在 Milvus 中,查询时会将 JSON 字符串转换为 Python 字典
|
||||
- **优势**:可以动态添加字段
|
||||
- **支持**:Higress RAG 支持读写操作
|
||||
|
||||
**配置步骤**:
|
||||
1. 初始化 Milvus 时,需要指定 `metadata_field` 参数为实际的字段名称(这里为 "metadata")
|
||||
2. 在 mapping 配置中添加 metadata 字段
|
||||
|
||||
**Python 代码示例**:
|
||||
```python
|
||||
Milvus(
|
||||
...
|
||||
metadata_field="metadata" # 自定义元数据字段名
|
||||
)
|
||||
```
|
||||
|
||||
**YAML 配置示例**:
|
||||
```yaml
|
||||
mapping:
|
||||
fields:
|
||||
- standard_name: "metadata"
|
||||
raw_name: "metadata"
|
||||
properties: {}
|
||||
```
|
||||
|
||||
#### 方法二:字段展开存储
|
||||
- **特点**:metadata 中的字段会直接展开,metadata 里的 key 会作为字段名存储在 Milvus 中
|
||||
- **限制**:不可以动态添加字段
|
||||
- **支持**:Higress RAG 只支持读操作
|
||||
|
||||
**配置步骤**:
|
||||
1. 初始化 Milvus 时,不需要指定 `metadata_field` 参数
|
||||
2. 在 mapping 配置中移除 metadata 字段
|
||||
|
||||
**推荐使用方法一**,因为它提供了更好的灵活性和完整的读写支持。
|
||||
|
||||
|
||||
### 5. Higress RAG MCP 插件和 CherryStudio 集成
|
||||
|
||||
Higress RAG MCP 插件和 CherryStudio 集成,实现基于 RAG 的智能问答功能。
|
||||
|
||||
**配置步骤**:
|
||||
|
||||
1. 在 CherryStudio 中配置 Higress RAG MCP 插件的 endpoint: `http://<higress-gateway>:<port>/mcp-servers/rag/sse`, 如下图:
|
||||
|
||||

|
||||
|
||||
2. 查看 CherryStudio 中配置 Higress RAG MCP 插件的 Tools 列表, 如下图:
|
||||
|
||||

|
||||
|
||||
**对话**:
|
||||
|
||||
在 CherryStudio 对话中添加 Higress RAG MCP 插件,然后即可在对话中调用该插件提供的工具。如下图:
|
||||
|
||||

|
||||
|
||||
Binary file not shown.
|
After Width: | Height: | Size: 287 KiB |
Binary file not shown.
|
After Width: | Height: | Size: 146 KiB |
Binary file not shown.
|
After Width: | Height: | Size: 210 KiB |
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/alibaba/higress/plugins/golang-filter/mcp-server/servers/rag/textsplitter"
|
||||
"github.com/alibaba/higress/plugins/golang-filter/mcp-server/servers/rag/vectordb"
|
||||
"github.com/distribution/distribution/v3/uuid"
|
||||
"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -31,6 +32,7 @@ type RAGClient struct {
|
||||
|
||||
// NewRAGClient creates a new RAG client instance
|
||||
func NewRAGClient(config *config.Config) (*RAGClient, error) {
|
||||
api.LogDebugf("RAG NewRAGClient: %+v", config)
|
||||
ragclient := &RAGClient{
|
||||
config: config,
|
||||
}
|
||||
@@ -40,12 +42,14 @@ func NewRAGClient(config *config.Config) (*RAGClient, error) {
|
||||
}
|
||||
ragclient.textSplitter = textSplitter
|
||||
|
||||
api.LogDebugf("RAG New Embedding Provider: %+v", ragclient.config.Embedding)
|
||||
embeddingProvider, err := embedding.NewEmbeddingProvider(ragclient.config.Embedding)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create embedding provider failed, err: %w", err)
|
||||
}
|
||||
ragclient.embeddingProvider = embeddingProvider
|
||||
|
||||
api.LogDebugf("RAG New LLM Provider: %+v", ragclient.config.LLM)
|
||||
if ragclient.config.LLM.Provider == "" {
|
||||
ragclient.llmProvider = nil
|
||||
} else {
|
||||
@@ -56,6 +60,7 @@ func NewRAGClient(config *config.Config) (*RAGClient, error) {
|
||||
ragclient.llmProvider = llmProvider
|
||||
}
|
||||
|
||||
api.LogDebugf("RAG New VectorDB Provider: %+v", ragclient.config.VectorDB)
|
||||
dim := ragclient.config.Embedding.Dimensions
|
||||
provider, err := vectordb.NewVectorDBProvider(&ragclient.config.VectorDB, dim)
|
||||
if err != nil {
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
|
||||
"github.com/alibaba/higress/plugins/golang-filter/mcp-server/servers/rag/config"
|
||||
"github.com/alibaba/higress/plugins/golang-filter/mcp-session/common"
|
||||
"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
|
||||
"github.com/mark3labs/mcp-go/mcp"
|
||||
)
|
||||
|
||||
@@ -16,6 +17,7 @@ type RAGConfig struct {
|
||||
}
|
||||
|
||||
func init() {
|
||||
api.LogDebugf("RAG init")
|
||||
common.GlobalRegistry.RegisterServer("rag", &RAGConfig{
|
||||
config: &config.Config{
|
||||
RAG: config.RAGConfig{
|
||||
@@ -45,7 +47,7 @@ func init() {
|
||||
VectorDB: config.VectorDBConfig{
|
||||
Provider: "milvus",
|
||||
Host: "localhost",
|
||||
Port: 6379,
|
||||
Port: 19530,
|
||||
Database: "default",
|
||||
Collection: "rag",
|
||||
Username: "",
|
||||
@@ -98,7 +100,9 @@ func init() {
|
||||
}
|
||||
|
||||
func (c *RAGConfig) ParseConfig(cfg map[string]any) error {
|
||||
// Parse RAG configuration
|
||||
api.LogDebugf("RAG start to parse config: %+v", cfg)
|
||||
// Parse RAG configuration
|
||||
api.LogDebugf("RAG parse rag config")
|
||||
if ragConfig, ok := cfg["rag"].(map[string]any); ok {
|
||||
if splitter, exists := ragConfig["splitter"].(map[string]any); exists {
|
||||
if splitterType, exists := splitter["provider"].(string); exists {
|
||||
@@ -120,6 +124,7 @@ func (c *RAGConfig) ParseConfig(cfg map[string]any) error {
|
||||
}
|
||||
|
||||
// Parse Embedding configuration
|
||||
api.LogDebugf("RAG parse embedding config")
|
||||
if embeddingConfig, ok := cfg["embedding"].(map[string]any); ok {
|
||||
if provider, exists := embeddingConfig["provider"].(string); exists {
|
||||
c.config.Embedding.Provider = provider
|
||||
@@ -142,6 +147,7 @@ func (c *RAGConfig) ParseConfig(cfg map[string]any) error {
|
||||
}
|
||||
|
||||
// Parse llm configuration
|
||||
api.LogDebugf("RAG parse llm config")
|
||||
if llmConfig, ok := cfg["llm"].(map[string]any); ok {
|
||||
if provider, exists := llmConfig["provider"].(string); exists {
|
||||
c.config.LLM.Provider = provider
|
||||
@@ -164,6 +170,7 @@ func (c *RAGConfig) ParseConfig(cfg map[string]any) error {
|
||||
}
|
||||
|
||||
// Parse VectorDB configuration
|
||||
api.LogDebugf("RAG parse vectordb config")
|
||||
if vectordbConfig, ok := cfg["vectordb"].(map[string]any); ok {
|
||||
if provider, exists := vectordbConfig["provider"].(string); exists {
|
||||
c.config.VectorDB.Provider = provider
|
||||
@@ -241,10 +248,13 @@ func (c *RAGConfig) ParseConfig(cfg map[string]any) error {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
api.LogDebugf("RAG parse config successful with config:%+v", c.config)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *RAGConfig) NewServer(serverName string) (*common.MCPServer, error) {
|
||||
api.LogDebugf("RAG NewServer: %s", serverName)
|
||||
mcpServer := common.NewMCPServer(
|
||||
serverName,
|
||||
Version,
|
||||
@@ -252,11 +262,13 @@ func (c *RAGConfig) NewServer(serverName string) (*common.MCPServer, error) {
|
||||
)
|
||||
|
||||
// Initialize RAG client with configuration
|
||||
api.LogDebugf("RAG NewRAGClient: %+v", c.config)
|
||||
ragClient, err := NewRAGClient(c.config)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create rag client failed, err: %w", err)
|
||||
}
|
||||
|
||||
api.LogDebugf("RAG start add tool")
|
||||
// Knowledge Base Management Tools
|
||||
mcpServer.AddTool(
|
||||
mcp.NewToolWithRawSchema("create-chunks-from-text", "Process and segment input text into semantic chunks for knowledge base ingestion", GetCreateChunkFromTextSchema()),
|
||||
@@ -284,6 +296,6 @@ func (c *RAGConfig) NewServer(serverName string) (*common.MCPServer, error) {
|
||||
mcp.NewToolWithRawSchema("chat", "Answer user questions by retrieving relevant knowledge from the database and generating responses using RAG-enhanced LLM", GetChatSchema()),
|
||||
HandleChat(ragClient),
|
||||
)
|
||||
|
||||
api.LogDebugf("RAG NewServer successful")
|
||||
return mcpServer, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user