Featured image of post 知识库向量化处理实战

知识库向量化处理实战

什么是 RAG?

RAG(Retrieval-Augmented Generation,检索增强生成)是一种结合了信息检索和生成式 AI 的技术架构。它的核心思想是:

  1. 检索阶段:从知识库中检索与用户问题相关的内容
  2. 生成阶段:将检索到的内容作为上下文,让大模型生成答案
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
用户问题
┌─────────────────┐
│  向量数据库检索   │ ←── 知识库已向量化
└────────┬────────┘
┌─────────────────┐
│  构建 Prompt    │ ←── 系统提示词 + 检索内容 + 用户问题
└────────┬────────┘
┌─────────────────┐
│  大模型生成答案  │
└─────────────────┘

项目中的向量化架构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
┌──────────────────────────────────────────────────────────────────────┐
│                      知识库上传流程                                   │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐             │
│  │ 文件上传     │───▶│ 文件解析    │───▶│ 去重检查    │             │
│  │ (RustFS)    │    │ (Tika)      │    │ (SHA-256)   │             │
│  └─────────────┘    └─────────────┘    └─────────────┘             │
│                                                │                     │
│                                                ▼                     │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐             │
│  │ 元数据入库   │◀───│ 状态 PENDING│◀───│ 发送任务    │             │
│  │ (PostgreSQL)│    │             │    │ (Redis Stream)│            │
│  └─────────────┘    └─────────────┘    └─────────────┘             │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────────┐
│                      向量化消费流程                                   │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐             │
│  │ 消费消息    │───▶│ 文本分块    │───▶│ 添加元数据  │             │
│  │             │    │ (800 tokens)│    │ (kb_id)     │             │
│  └─────────────┘    └─────────────┘    └─────────────┘             │
│                                                │                     │
│                                                ▼                     │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐             │
│  │ 分批存储    │◀───│ 向量化嵌入  │◀───│ 每批 ≤10    │             │
│  │ (VectorStore)│   │ (DashScope) │    │             │             │
│  └─────────────┘    └─────────────┘    └─────────────┘             │
│                                                                      │
│  ┌─────────────┐    ┌─────────────┐                                │
│  │ 更新状态    │◀───│ COMPLETED   │                                │
│  │             │    │ /FAILED     │                                │
│  └─────────────┘    └─────────────┘                                │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘

核心实现

1. 知识库实体

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Entity
@Table(name = "knowledge_bases", indexes = {
    @Index(name = "idx_kb_hash", columnList = "fileHash", unique = true)
})
public class KnowledgeBaseEntity {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true, length = 64)
    private String fileHash;  // 用于去重

    @Column(nullable = false)
    private String name;

    @Enumerated(EnumType.STRING)
    @Column(length = 20)
    private VectorStatus vectorStatus = VectorStatus.PENDING;

    private Integer chunkCount = 0;  // 分块数量
}

2. 向量化状态枚举

1
2
3
4
5
6
public enum VectorStatus {
    PENDING,     // 待处理
    PROCESSING,  // 处理中
    COMPLETED,   // 完成
    FAILED       // 失败
}

3. 向量化服务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Service
@Slf4j
public class KnowledgeBaseVectorService {

    private static final int MAX_BATCH_SIZE = 10;     // 每批最多 10 个
    private static final int CHUNK_MAX_SIZE = 800;   // 每个 chunk 最大 tokens

    private final VectorStore vectorStore;
    private final TextSplitter textSplitter;

    public int vectorizeAndStore(Long knowledgeBaseId, String content) {
        // 1. 删除旧向量数据
        deleteByKnowledgeBaseId(knowledgeBaseId);

        // 2. 文本分块
        List<Document> documents = doSplitDocuments(content);

        // 3. 添加元数据
        documents.forEach(doc -> doc.getMetadata().put("kb_id", knowledgeBaseId.toString()));

        // 4. 分批向量化
        int batchCount = vectorBatchAdd(documents);
        return batchCount;
    }

    private int vectorBatchAdd(List<Document> documents) {
        int totalChunks = documents.size();
        int batchCount = (totalChunks + MAX_BATCH_SIZE - 1) / MAX_BATCH_SIZE;

        for (int i = 0; i < batchCount; i++) {
            int start = i * MAX_BATCH_SIZE;
            int end = Math.min(start + MAX_BATCH_SIZE, documents.size());
            vectorStore.add(documents.subList(start, end));
        }
        return totalChunks;
    }

    private List<Document> doSplitDocuments(String content) {
        List<Document> documents = new ArrayList<>();
        int length = content.length();
        int step = CHUNK_MAX_SIZE - CHUNK_CONTEXT_SIZE;

        for (int i = 0; i < length; i += step) {
            int end = Math.min(i + CHUNK_MAX_SIZE, length);
            documents.add(new Document(content.substring(i, end)));
        }
        return documents;
    }
}

4. Redis Stream 异步处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 生产者
@Service
public class VectorizeStreamProducer extends AbstractStreamProducer<VectorizeTaskPayload> {

    public void sendVectorizeTask(Long id, String content) {
        sendTask(new VectorizeTaskPayload(id, content));
    }

    @Override
    protected String streamKey() {
        return "knowledgebase:vectorize:stream";
    }
}

// 消费者
@Service
public class VectorizeStreamConsumer extends AbstractStreamConsumer<VectorizeTaskPayload> {

    private final KnowledgeBaseVectorService vectorService;

    @Override
    protected void processBusiness(VectorizeTaskPayload payload) {
        int chunks = vectorService.vectorizeAndStore(payload.kbId(), payload.content());
        // 更新分块数
        knowledgeRepository.findById(payload.kbId).ifPresent(entity -> {
            entity.setChunkCount(chunks);
            knowledgeRepository.save(entity);
        });
    }
}

文本分块策略

固定长度分块

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
private List<Document> doSplitDocuments(String content) {
    List<Document> documents = new ArrayList<>();
    int chunkSize = 800;
    int overlap = 200;  // 重叠区域

    for (int i = 0; i < content.length(); i += (chunkSize - overlap)) {
        int end = Math.min(i + chunkSize, content.length());
        String chunk = content.substring(i, end);
        documents.add(new Document(chunk));
    }
    return documents;
}

TokenTextSplitter(推荐)

Spring AI 提供了基于 token 的分块器:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
@Service
public class KnowledgeBaseVectorService {

    private final TextSplitter textSplitter;

    public KnowledgeBaseVectorService(VectorStore vectorStore, TextSplitter textSplitter) {
        this.textSplitter = new TokenTextSplitter.Builder()
            .withChunkSize(800)        // 每块 token 数
            .withMinChunkSizeChars(100) // 最小字符数
            .withMaxNumChunks(100)      // 最大块数
            .build();
    }
}

按段落分块

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
private List<Document> splitByParagraphs(String content) {
    List<Document> documents = new ArrayList<>();
    // 按换行符分割段落
    String[] paragraphs = content.split("\\n\\s*\\n");

    StringBuilder currentChunk = new StringBuilder();
    for (String para : paragraphs) {
        if (currentChunk.length() + para.length() > MAX_CHUNK_SIZE) {
            if (currentChunk.length() > 0) {
                documents.add(new Document(currentChunk.toString().trim()));
                currentChunk = new StringBuilder();
            }
        }
        currentChunk.append(para).append("\n\n");
    }
    if (currentChunk.length() > 0) {
        documents.add(new Document(currentChunk.toString().trim()));
    }
    return documents;
}

元数据设计

知识库元数据

1
2
3
4
5
6
7
documents.forEach(doc -> {
    Map<String, Object> metadata = doc.getMetadata();
    metadata.put("kb_id", knowledgeBaseId.toString());
    metadata.put("kb_name", knowledgeBase.getName());
    metadata.put("category", knowledgeBase.getCategory());
    metadata.put("uploaded_at", knowledgeBase.getUploadedAt().toString());
});

查询时过滤

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// 只查询特定知识库的内容
VectorStoreFilter filter = VectorStoreFilter.builder()
    .key("kb_id")
    .is(knowledgeBaseId.toString())
    .build();

List<Document> results = vectorStore.similaritySearch(
    SearchRequest.builder()
        .query(userQuestion)
        .topK(5)
        .filter(filter)
        .build()
);

向量数据库集成

PostgreSQL + pgvector

Spring AI 与 PostgreSQL + pgvector 集成:

1
2
3
4
5
6
7
spring:
  ai:
    vectorstore:
      pgvector:
        index-type: HNSW  # HNSW 或 IVF_FLAT
        distance-type: COSINE_DISTANCE  # COSINE_DISTANCE 或 EUCLIDEAN_DISTANCE
        dimensions: 1536  # OpenAI embedding 维度

自定义 VectorStore

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
@Configuration
public class VectorStoreConfig {

    @Bean
    public VectorStore vectorStore(DataSource dataSource, EntityManagerFactory entityManagerFactory) {
        return SpringJdbcVectorStore.builder(dataSource)
            .configureBuilder(builder -> builder
                .getCommonBuilder()
                .withDistanceType(VectorStoreCommonBuilder.DistanceType.COSINE_DISTANCE)
            )
            .build();
    }
}

相似度搜索

基础搜索

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
@Service
public class RagService {

    private final VectorStore vectorStore;

    public List<String> searchSimilar(String query, int topK) {
        List<Document> results = vectorStore.similaritySearch(
            SearchRequest.builder()
                .query(query)
                .topK(topK)
                .build()
        );
        return results.stream()
            .map(Document::getContent)
            .toList();
    }
}

带过滤条件的搜索

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
public List<String> searchByKnowledgeBase(String query, Long knowledgeBaseId) {
    VectorStoreFilter filter = VectorStoreFilter.builder()
        .key("kb_id")
        .is(knowledgeBaseId.toString())
        .build();

    List<Document> results = vectorStore.similaritySearch(
        SearchRequest.builder()
            .query(query)
            .topK(5)
            .filter(filter)
            .build()
    );
    return results.stream().map(Document::getContent).toList();
}

RAG 问答实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Service
@Slf4j
public class RagQuestionService {

    private final VectorStore vectorStore;
    private final ChatClient chatClient;

    public String answer(String question, Long knowledgeBaseId) {
        // 1. 检索相关知识
        List<String> relevantDocs = searchByKnowledgeBase(question, knowledgeBaseId);

        if (relevantDocs.isEmpty()) {
            return "抱歉,未找到相关内容。";
        }

        // 2. 构建上下文
        String context = relevantDocs.stream()
            .collect(Collectors.joining("\n---\n"));

        // 3. 构建 Prompt
        String prompt = String.format("""
            基于以下知识库内容回答用户问题。如果知识库中没有相关信息,请说明无法回答。

            知识库内容:
            %s

            用户问题:%s
            """, context, question);

        // 4. 调用大模型
        return chatClient.prompt()
            .system("你是一个专业的知识库问答助手。")
            .user(prompt)
            .call()
            .content();
    }
}

性能优化

1. 批量向量化限制

1
2
// DashScope API 限制每批最多 10 条
private static final int MAX_BATCH_SIZE = 10;

2. 异步处理

1
2
// 使用 Redis Stream 异步处理,避免阻塞请求
vectorizeStreamProducer.sendVectorizeTask(knowledgeBaseId, content);

3. 预加载向量

1
2
3
4
5
6
7
8
// 启动时预加载常用知识库
@PostConstruct
public void preloadKnowledge() {
    List<KnowledgeBaseEntity> kbs = knowledgeRepository.findByVectorStatus(VectorStatus.PENDING);
    for (KnowledgeBaseEntity kb : kbs) {
        vectorizeStreamProducer.sendVectorizeTask(kb.getId(), kb.getContent());
    }
}

4. 向量索引优化

1
2
3
4
5
-- PostgreSQL pgvector 索引
CREATE INDEX idx_vector_store_embedding
ON vector_store
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

向量删除处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
@Transactional
public int deleteByKnowledgeBaseId(Long knowledgeBaseId) {
    // 使用 JdbcTemplate 直接操作向量表
    String delSql = """
        DELETE FROM vector_store
        WHERE metadata->>'kb_id' = ?
           OR (metadata->>'kb_id_long' IS NOT NULL AND (metadata->>'kb_id_long')::bigint = ?)
        """;
    return jdbcTemplate.update(delSql, knowledgeBaseId.toString(), knowledgeBaseId);
}

常见问题

1. 向量化失败

  • 检查 API 配额
  • 确认文本不为空
  • 分块大小是否合理

2. 搜索结果不准确

  • 调整 topK 值
  • 优化分块策略
  • 使用更好的 embedding 模型

3. 性能问题

  • 使用 HNSW 索引
  • 批量处理
  • 异步预加载

总结

本项目的向量化处理流程:

  1. 上传 → Tika 解析文本 → SHA-256 去重
  2. 异步 → Redis Stream 队列 → 消费者处理
  3. 分块 → 固定大小分块(800 tokens)
  4. 存储 → 添加 kb_id 元数据 → 分批向量化
  5. 检索 → 相似度搜索 → RAG 问答

通过这套流程,实现了:

  • 知识库的自动向量化
  • 异步处理,不阻塞主流程
  • 精确的向量过滤(按知识库 ID)
  • 完整的错误处理和重试机制
使用 Hugo 构建
主题 StackJimmy 设计

发布了 32 篇文章 | 共 75016 字