Knowledge Base Vectorization in Practice

What is RAG?

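RAG (Retrieval-Augmented Generation) is an architecture that combines information retrieval with generative AI. Its core idea:

  1. Retrieval: fetch the content in the knowledge base that is relevant to the user's question
  2. Generation: pass the retrieved content to the LLM as context and let it generate the answer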
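User question
             │
             ▼
┌─────────────────────────┐
│ Vector database search  │ ←── knowledge base already vectorized
└────────────┬────────────┘
             ▼
┌─────────────────────────┐
│ Build prompt            │ ←── system prompt + retrieved content + user question
└────────────┬────────────┘
             ▼
┌─────────────────────────┐
│ LLM generates answer    │
└─────────────────────────┘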
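A toy, in-memory version of the retrieve, build-prompt, generate flow can make the diagram concrete. It assumes the embeddings are already computed (hard-coded vectors here), ranks chunks by cosine similarity, and stops after building the prompt; the model call is left out. All names are illustrative and not taken from the project.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Toy retrieve-then-generate pipeline. Vectors are hard-coded; a real system would get
// them from an embedding model (assumption for illustration).
class RagSketch {

    record Chunk(String text, double[] vector) {}

    // Cosine similarity: dot(a, b) / (|a| * |b|)
    static double cosine(double[] a, double[] b) {
        double dot = 0, na = 0, nb = 0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            na += a[i] * a[i];
            nb += b[i] * b[i];
        }
        return dot / (Math.sqrt(na) * Math.sqrt(nb));
    }

    // Retrieval stage: rank chunks by similarity to the query vector and keep the top k
    static List<Chunk> retrieve(List<Chunk> kb, double[] query, int k) {
        List<Chunk> sorted = new ArrayList<>(kb);
        sorted.sort(Comparator.comparingDouble((Chunk c) -> cosine(c.vector(), query)).reversed());
        return sorted.subList(0, Math.min(k, sorted.size()));
    }

    // Prompt construction: system instructions + retrieved context + user question
    static String buildPrompt(String system, List<Chunk> context, String question) {
        StringBuilder sb = new StringBuilder(system).append("\n\nContext:\n");
        for (Chunk c : context) {
            sb.append("- ").append(c.text()).append('\n');
        }
        return sb.append("\nQuestion: ").append(question).toString();
    }

    public static void main(String[] args) {
        List<Chunk> kb = List.of(
                new Chunk("A process is a program in execution.", new double[]{0.9, 0.1, 0.0}),
                new Chunk("HNSW is a graph-based ANN index.", new double[]{0.0, 0.2, 0.9}));
        List<Chunk> hits = retrieve(kb, new double[]{1.0, 0.0, 0.0}, 1);
        // Generation stage is stubbed: print the prompt that would be sent to the model
        System.out.println(buildPrompt("Answer only from the context.", hits, "What is a process?"));
    }
}
```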

Vectorization Architecture in This Project

┌──────────────────────────────────────────────────────────────────────┐
│                        Knowledge base upload                         │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  ┌───────────────┐    ┌───────────────┐    ┌───────────────┐         │
│  │ File upload   │───▶│ File parsing  │───▶│ Dedup check   │         │
│  │ (RustFS)      │    │ (Tika)        │    │ (SHA-256)     │         │
│  └───────────────┘    └───────────────┘    └───────┬───────┘         │
│                                                    ▼                 │
│  ┌───────────────┐    ┌───────────────┐    ┌───────────────┐         │
│  │ Save metadata │◀───│ Status PENDING│◀───│ Publish task  │         │
│  │ (PostgreSQL)  │    │               │    │ (Redis Stream)│         │
│  └───────────────┘    └───────────────┘    └───────────────┘         │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────────┐
│                        Vectorization consumer                        │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  ┌───────────────┐    ┌───────────────┐    ┌───────────────┐         │
│  │ Consume task  │───▶│ Chunk text    │───▶│ Add metadata  │         │
│  │               │    │ (800 chars)   │    │ (kb_id)       │         │
│  └───────────────┘    └───────────────┘    └───────┬───────┘         │
│                                                    ▼                 │
│  ┌───────────────┐    ┌───────────────┐    ┌───────────────┐         │
│  │ Store batch   │◀───│ Embed         │◀───│ Batches of    │         │
│  │ (VectorStore) │    │ (DashScope)   │    │ ≤10 chunks    │         │
│  └───────────────┘    └───────────────┘    └───────────────┘         │
│                                                                      │
│  ┌───────────────┐    ┌───────────────┐                              │
│  │ Update status │◀───│ COMPLETED     │                              │
│  │               │    │ / FAILED      │                              │
│  └───────────────┘    └───────────────┘                              │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘
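The SHA-256 dedup step in the upload flow can be sketched as follows. The entity stores the digest in `fileHash` (64 characters, unique); in this sketch an in-memory `Set` stands in for that unique column (an assumption for illustration), and `register` returns `false` for content that has already been uploaded.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.HexFormat;
import java.util.Set;

// Content-hash deduplication. The Set is a stand-in for the unique fileHash column (assumption).
class FileHashDedup {

    // Hex-encoded SHA-256 is always 64 characters, matching @Column(length = 64)
    static String sha256Hex(byte[] content) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            return HexFormat.of().formatHex(digest.digest(content));
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to support SHA-256
            throw new IllegalStateException(e);
        }
    }

    private final Set<String> knownHashes = new HashSet<>();

    // true: new file, continue with parsing and queueing; false: duplicate, skip it
    boolean register(byte[] content) {
        return knownHashes.add(sha256Hex(content));
    }

    public static void main(String[] args) {
        FileHashDedup dedup = new FileHashDedup();
        byte[] file = "same bytes".getBytes(StandardCharsets.UTF_8);
        System.out.println(dedup.register(file)); // true: first upload
        System.out.println(dedup.register(file)); // false: duplicate
    }
}
```

Hashing the raw bytes means that the same document saved with different encodings or metadata produces different hashes; only byte-identical uploads are caught.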

Core Implementation

1. Knowledge base entity

import jakarta.persistence.*;
import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
@Entity
@Table(name = "knowledge_bases")
public class KnowledgeBaseEntity {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    // SHA-256 hex digest of the file, used for deduplication.
    // unique = true already creates a unique constraint, so a separate unique @Index is redundant.
    @Column(nullable = false, unique = true, length = 64)
    private String fileHash;

    @Column(nullable = false)
    private String name;

    @Column(columnDefinition = "TEXT")
    private String content;  // parsed plain text

    @Enumerated(EnumType.STRING)
    @Column(length = 20)
    private VectorStatus vectorStatus = VectorStatus.PENDING;

    private Integer chunkCount = 0;  // number of chunks

    private String category;

    private String originalFilename;

    private String contentType;  // content type of the uploaded file
}

2. Vectorization status enum

public enum VectorStatus {
    PENDING,     // waiting to be processed
    PROCESSING,  // being processed
    COMPLETED,   // finished
    FAILED       // failed
}
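The article only defines the four states, so the transition rules below are a hypothetical sketch: PENDING moves to PROCESSING, PROCESSING ends in COMPLETED or FAILED, and a finished or failed knowledge base can go back to PENDING when it is vectorized again. Guarding transitions like this makes it harder for a late or duplicate message to overwrite a final status.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

class VectorStatusTransitions {

    // Mirrors the article's enum
    enum VectorStatus { PENDING, PROCESSING, COMPLETED, FAILED }

    // Hypothetical rules; COMPLETED/FAILED -> PENDING models a re-vectorization
    private static final Map<VectorStatus, Set<VectorStatus>> ALLOWED = Map.of(
            VectorStatus.PENDING, EnumSet.of(VectorStatus.PROCESSING),
            VectorStatus.PROCESSING, EnumSet.of(VectorStatus.COMPLETED, VectorStatus.FAILED),
            VectorStatus.COMPLETED, EnumSet.of(VectorStatus.PENDING),
            VectorStatus.FAILED, EnumSet.of(VectorStatus.PENDING));

    static boolean canMove(VectorStatus from, VectorStatus to) {
        return ALLOWED.get(from).contains(to);
    }

    // Returns the new status or rejects an illegal transition
    static VectorStatus move(VectorStatus from, VectorStatus to) {
        if (!canMove(from, to)) {
            throw new IllegalStateException(from + " -> " + to + " is not allowed");
        }
        return to;
    }

    public static void main(String[] args) {
        VectorStatus s = VectorStatus.PENDING;
        s = move(s, VectorStatus.PROCESSING);
        s = move(s, VectorStatus.COMPLETED);
        System.out.println(s);                                                        // COMPLETED
        System.out.println(canMove(VectorStatus.PENDING, VectorStatus.COMPLETED));    // false
    }
}
```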

3. Vectorization service

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.document.Document;
import org.springframework.ai.vectorstore.SearchRequest;
import org.springframework.ai.vectorstore.VectorStore;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KnowledgeBaseVectorService {

    // The DashScope embedding API accepts at most 10 texts per request
    private static final int MAX_BATCH_SIZE = 10;
    // Chunk size and overlap are measured in characters, not tokens
    private static final int CHUNK_MAX_SIZE = 800;
    private static final int CHUNK_CONTEXT_SIZE = 200;

    private final VectorStore vectorStore;
    private final VectorRepository vectorRepository;

    // Chunking is done by doSplitDocuments below, so no TextSplitter is injected
    public KnowledgeBaseVectorService(VectorStore vectorStore, VectorRepository vectorRepository) {
        this.vectorStore = vectorStore;
        this.vectorRepository = vectorRepository;
    }

    /**
     * Re-vectorizes a knowledge base and returns the number of chunks stored.
     */
    public int vectorizeAndStore(Long knowledgeBaseId, String content) {
        log.info("Start vectorizing knowledge base: kbId={}, contentLength={}",
                knowledgeBaseId, content == null ? 0 : content.length());
        try {
            // 1. Delete this knowledge base's old vectors first
            deleteByKnowledgeBaseId(knowledgeBaseId);

            // 2. Split the text into chunks
            List<Document> documents = doSplitDocuments(content);
            if (documents.isEmpty()) {
                log.warn("Knowledge base {} has no content", knowledgeBaseId);
                throw new BusinessException(ErrorCode.KNOWLEDGE_BASE_VECTORIZATION_FAILED, "Knowledge base content is empty");
            }
            int chunks = documents.size();
            log.info("Text split into {} chunks", chunks);

            // 3. Tag every chunk with its knowledge base ID (stored as a string)
            documents.forEach(document -> document.getMetadata().put("kb_id", knowledgeBaseId.toString()));

            // 4. Embed and store in batches (DashScope: batch size <= 10)
            int batchCount = vectorBatchAdd(documents);
            log.info("Vectorization finished: kbId={}, chunks={}, batches={}", knowledgeBaseId, chunks, batchCount);
            return chunks;
        } catch (BusinessException e) {
            log.error("Vectorization failed: kbId={}, error={}", knowledgeBaseId, e.getMessage(), e);
            throw e;
        } catch (Exception e) {
            // Embedding API or database errors: wrap them so the consumer can mark the task FAILED
            log.error("Vectorization failed: kbId={}, error={}", knowledgeBaseId, e.getMessage(), e);
            throw new BusinessException(ErrorCode.KNOWLEDGE_BASE_VECTORIZATION_FAILED,
                    "Vectorization failed: " + e.getMessage());
        }
    }

    /**
     * Adds the chunks in batches of at most MAX_BATCH_SIZE and returns the number of batches.
     */
    private int vectorBatchAdd(List<Document> documents) {
        int totalChunks = documents.size();
        int batchCount = (totalChunks + MAX_BATCH_SIZE - 1) / MAX_BATCH_SIZE;  // ceiling division
        log.info("Embedding in batches: {} chunks, {} batches, at most {} per batch", totalChunks, batchCount, MAX_BATCH_SIZE);
        for (int i = 0; i < batchCount; i++) {
            int start = i * MAX_BATCH_SIZE;
            int end = Math.min(start + MAX_BATCH_SIZE, totalChunks);
            log.debug("Batch {}/{}: chunks [{}, {})", i + 1, batchCount, start, end);
            vectorStore.add(documents.subList(start, end));
        }
        return batchCount;
    }

    private void deleteByKnowledgeBaseId(Long knowledgeBaseId) {
        try {
            vectorRepository.deleteByKnowledgeBaseId(knowledgeBaseId);
        } catch (Exception e) {
            // Logged and ignored: vectorization continues, but stale chunks may remain
            log.error("Failed to delete old vectors: kbId={}, error={}", knowledgeBaseId, e.getMessage(), e);
        }
    }

    private List<Document> doSplitDocuments(String content) {
        if (content == null || content.isEmpty()) {
            return Collections.emptyList();
        }
        List<Document> documents = new ArrayList<>();
        int length = content.length();
        int step = CHUNK_MAX_SIZE - CHUNK_CONTEXT_SIZE; // step = 800 - 200 = 600 characters

        for (int i = 0; i < length; i += step) {
            int end = Math.min(i + CHUNK_MAX_SIZE, length);
            documents.add(new Document(content.substring(i, end)));
            if (end == length) {
                break; // text fully covered; avoids a trailing chunk contained in this one
            }
        }
        return documents;
    }

    public List<Document> similaritySearch(String query, List<Long> knowledgeIds, int topK, double minScore) {
        log.info("Vector similarity search: query={}, kbIds={}, topK={}, minScore={}", query, knowledgeIds, topK, minScore);
        int limit = Math.max(topK, 1);

        try {
            SearchRequest.Builder builder = SearchRequest.builder()
                    .query(query)
                    .topK(limit);

            if (minScore > 0) {
                builder.similarityThreshold(minScore);
            }
            if (knowledgeIds != null && !knowledgeIds.isEmpty()) {
                // Pre-filter inside the vector store on the kb_id metadata
                builder.filterExpression(buildKbFilterExpression(knowledgeIds));
            }
            List<Document> documents = vectorStore.similaritySearch(builder.build());
            if (documents == null || documents.isEmpty()) {
                return List.of();
            }
            return documents.stream().limit(limit).toList();
        } catch (Exception e) {
            log.warn("Vector search with pre-filter failed, falling back to local filtering: {}", e.getMessage());
            return similaritySearchFallback(query, knowledgeIds, limit, minScore);
        }
    }

    private List<Document> similaritySearchFallback(String query, List<Long> knowledgeIds, int topK, double minScore) {
        try {
            // Over-fetch 3x because the knowledge base filter is applied after retrieval
            SearchRequest.Builder builder = SearchRequest.builder()
                    .query(query)
                    .topK(topK * 3);

            if (minScore > 0) {
                builder.similarityThreshold(minScore);
            }

            List<Document> documents = vectorStore.similaritySearch(builder.build());

            if (knowledgeIds != null && !knowledgeIds.isEmpty()) {
                documents = documents.stream()
                        .filter(document -> isDocInKnowledgeBases(document, knowledgeIds))
                        .toList();
            }

            return documents.stream().limit(topK).toList();
        } catch (Exception e) {
            log.error("Vector search failed: {}", e.getMessage(), e);
            throw new BusinessException(ErrorCode.KNOWLEDGE_BASE_QUERY_FAILED,
                    "Vector search failed: " + e.getMessage());
        }
    }

    private boolean isDocInKnowledgeBases(Document document, List<Long> knowledgeIds) {
        Object kbId = document.getMetadata().get("kb_id");
        if (kbId == null) {
            return false;  // a chunk without kb_id cannot belong to a selected knowledge base
        }
        try {
            return knowledgeIds.contains(Long.valueOf(kbId.toString()));
        } catch (NumberFormatException e) {
            return false;
        }
    }

    // kb_id is stored as a string, so the IDs are quoted: kb_id in ['1','2']
    private String buildKbFilterExpression(List<Long> knowledgeIds) {
        String ids = knowledgeIds.stream()
                .filter(Objects::nonNull)
                .map(id -> "'" + id + "'")
                .collect(Collectors.joining(","));
        return "kb_id in [" + ids + "]";
    }
}
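The batching in `vectorBatchAdd` is integer ceiling division followed by `subList` views. A generic version, using a hypothetical `partition` helper that is not part of the project, shows the arithmetic for 25 chunks and a batch size of 10:

```java
import java.util.ArrayList;
import java.util.List;

// Ceiling-division batching; MAX_BATCH_SIZE = 10 mirrors the article's DashScope limit
class BatchPartition {

    static final int MAX_BATCH_SIZE = 10;

    static <T> List<List<T>> partition(List<T> items, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        // Integer ceiling division, e.g. (25 + 10 - 1) / 10 = 3
        int batchCount = (items.size() + size - 1) / size;
        List<List<T>> batches = new ArrayList<>(batchCount);
        for (int i = 0; i < batchCount; i++) {
            int start = i * size;
            int end = Math.min(start + size, items.size());
            batches.add(items.subList(start, end));  // view, not a copy
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> chunks = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            chunks.add(i);
        }
        for (List<Integer> batch : partition(chunks, MAX_BATCH_SIZE)) {
            System.out.println(batch.size()); // 10, 10, 5
        }
    }
}
```

Each inner list corresponds to one `vectorStore.add(...)` call. Because `subList` returns views, the source list must not be structurally modified while the batches are in use.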

4. Asynchronous processing with Redis Stream

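// Task payload carried through the stream
public record VectorizeTaskPayload(Long kbId, String content) {}

// Producer
@Service
public class VectorizeStreamProducer extends AbstractStreamProducer<VectorizeTaskPayload> {

    public void sendVectorizeTask(Long id, String content) {
        sendTask(new VectorizeTaskPayload(id, content));
    }

    @Override
    protected String streamKey() {
        return "knowledgebase:vectorize:stream";
    }
}

// Consumer
@Service
@RequiredArgsConstructor
public class VectorizeStreamConsumer extends AbstractStreamConsumer<VectorizeTaskPayload> {

    private final KnowledgeBaseVectorService vectorService;
    // Spring Data JPA repository for KnowledgeBaseEntity (type name assumed)
    private final KnowledgeBaseRepository knowledgeRepository;

    @Override
    protected void processBusiness(VectorizeTaskPayload payload) {
        updateStatus(payload.kbId(), VectorStatus.PROCESSING, null);
        try {
            int chunks = vectorService.vectorizeAndStore(payload.kbId(), payload.content());
            updateStatus(payload.kbId(), VectorStatus.COMPLETED, chunks);
        } catch (RuntimeException e) {
            updateStatus(payload.kbId(), VectorStatus.FAILED, null);
            throw e;  // rethrow so the base class can apply its retry/ack handling
        }
    }

    // Updates the status and, when known, the chunk count
    private void updateStatus(Long kbId, VectorStatus status, Integer chunks) {
        knowledgeRepository.findById(kbId).ifPresent(entity -> {
            entity.setVectorStatus(status);
            if (chunks != null) {
                entity.setChunkCount(chunks);
            }
            knowledgeRepository.save(entity);
        });
    }
}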
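The hand-off can be tried without Redis. This sketch uses an in-memory `LinkedBlockingQueue` as a stand-in for the stream (an assumption for illustration: it has no persistence, consumer groups, or acknowledgements). The producer only records PENDING and enqueues; a background thread does the slow work and moves the status to PROCESSING and then COMPLETED or FAILED, as in the consumer flow above. The chunk count is a placeholder, not the real chunker.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

class AsyncVectorizeSketch {

    record Task(long kbId, String content) {}

    // Sentinel that tells the consumer thread to stop
    private static final Task POISON = new Task(-1, "");

    final BlockingQueue<Task> queue = new LinkedBlockingQueue<>();
    final Map<Long, String> status = new ConcurrentHashMap<>();
    final Map<Long, Integer> chunkCount = new ConcurrentHashMap<>();

    // Producer side (upload request): mark PENDING, enqueue, return immediately
    void submit(long kbId, String content) {
        status.put(kbId, "PENDING");
        queue.add(new Task(kbId, content));
    }

    // Consumer side: PROCESSING, then COMPLETED or FAILED
    Thread startConsumer() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Task task = queue.take();
                    if (task == POISON) {
                        return;
                    }
                    status.put(task.kbId(), "PROCESSING");
                    try {
                        if (task.content().isBlank()) {
                            throw new IllegalArgumentException("empty content");
                        }
                        // Placeholder for vectorizeAndStore: one "chunk" per 800 characters
                        int chunks = (task.content().length() + 799) / 800;
                        chunkCount.put(task.kbId(), chunks);
                        status.put(task.kbId(), "COMPLETED");
                    } catch (RuntimeException e) {
                        status.put(task.kbId(), "FAILED");
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        return worker;
    }

    // FIFO queue: the sentinel is taken only after all earlier tasks
    void shutdown() {
        queue.add(POISON);
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncVectorizeSketch sketch = new AsyncVectorizeSketch();
        Thread consumer = sketch.startConsumer();
        sketch.submit(1L, "x".repeat(1000));
        sketch.submit(2L, "   ");
        sketch.shutdown();
        consumer.join();
        System.out.println(sketch.status.get(1L) + " " + sketch.status.get(2L)); // COMPLETED FAILED
    }
}
```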

5. Deleting vectors

@Repository
@RequiredArgsConstructor
public class VectorRepository {

    private final JdbcTemplate jdbcTemplate;

    /**
     * Deletes every chunk of a knowledge base from the vector_store table.
     * Matches the string kb_id key and, where present, a numeric kb_id_long key.
     */
    public int deleteByKnowledgeBaseId(Long knowledgeBaseId) {
        String delSql = """
            DELETE FROM vector_store
            WHERE metadata->>'kb_id' = ?
               OR (metadata->>'kb_id_long' IS NOT NULL AND (metadata->>'kb_id_long')::bigint = ?)
            """;
        return jdbcTemplate.update(delSql, knowledgeBaseId.toString(), knowledgeBaseId);
    }
}

Text Chunking Strategies

Fixed-length chunking (what the project actually uses)

The project splits the text into chunks of a fixed number of characters, with step = chunk size - overlap size:

private List<Document> doSplitDocuments(String content) {
    if (content == null || content.isEmpty()) {
        return Collections.emptyList();
    }
    List<Document> documents = new ArrayList<>();
    int length = content.length();
    int step = CHUNK_MAX_SIZE - CHUNK_CONTEXT_SIZE; // step = 800 - 200 = 600 characters

    for (int i = 0; i < length; i += step) {
        int end = Math.min(i + CHUNK_MAX_SIZE, length);
        documents.add(new Document(content.substring(i, end)));
        if (end == length) {
            break; // text fully covered; avoids a trailing chunk contained in this one
        }
    }
    return documents;
}
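To see where the fixed-length chunks fall, a small helper (hypothetical, not in the project) can return the `[start, end)` ranges for the article's numbers: 800-character chunks with a 200-character overlap, so each chunk starts 600 characters after the previous one. It stops as soon as a chunk reaches the end of the text; without that check, a 700-character text would also yield a redundant `[600, 700)` chunk that lies entirely inside the first one.

```java
import java.util.ArrayList;
import java.util.List;

class ChunkBoundaries {

    // Returns [start, end) ranges for sliding-window chunking with overlap
    static List<int[]> ranges(int length, int maxSize, int overlap) {
        int step = maxSize - overlap;
        if (step <= 0) {
            throw new IllegalArgumentException("overlap must be smaller than maxSize");
        }
        List<int[]> result = new ArrayList<>();
        for (int start = 0; start < length; start += step) {
            int end = Math.min(start + maxSize, length);
            result.add(new int[]{start, end});
            if (end == length) {
                break; // the rest of the text is already covered
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (int[] r : ranges(2000, 800, 200)) {
            System.out.println(r[0] + "-" + r[1]); // 0-800, 600-1400, 1200-2000
        }
    }
}
```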

TokenTextSplitter (recommended with Spring AI)

Spring AI provides a token-based splitter, which measures chunk size in tokens rather than characters:

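// Configure it in the constructor (assumes a `private final TextSplitter textSplitter` field)
public KnowledgeBaseVectorService(VectorStore vectorStore, VectorRepository vectorRepository) {
    this.vectorStore = vectorStore;
    this.vectorRepository = vectorRepository;
    this.textSplitter = TokenTextSplitter.builder()
        .withChunkSize(800)          // target chunk size in tokens
        .withMinChunkSizeChars(100)  // minimum chunk size in characters
        .withMaxNumChunks(100)       // maximum chunks per document; keep it high enough for long files
        .build();
}

// Splitting: TextSplitter is a Function<List<Document>, List<Document>>
List<Document> documents = textSplitter.apply(List.of(new Document(content)));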

Paragraph-based chunking

private List<Document> splitByParagraphs(String content) {
    List<Document> documents = new ArrayList<>();
    if (content == null || content.isBlank()) {
        return documents;
    }
    // Paragraphs are separated by blank lines (newline, optional whitespace, newline)
    String[] paragraphs = content.split("\\n\\s*\\n");

    StringBuilder currentChunk = new StringBuilder();
    for (String para : paragraphs) {
        // Flush the current chunk if this paragraph would push it over the limit.
        // A single paragraph longer than CHUNK_MAX_SIZE still becomes one oversized chunk.
        if (currentChunk.length() + para.length() > CHUNK_MAX_SIZE && currentChunk.length() > 0) {
            documents.add(new Document(currentChunk.toString().trim()));
            currentChunk = new StringBuilder();
        }
        currentChunk.append(para).append("\n\n");
    }
    if (currentChunk.length() > 0) {
        documents.add(new Document(currentChunk.toString().trim()));
    }
    return documents;
}
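`splitByParagraphs` keeps any paragraph longer than the limit as a single oversized chunk. If a hard upper bound matters (for example, because of an embedding model's input limit), one option is to cut such paragraphs into fixed-size pieces. The sketch below does that; the class name and the fallback are illustrations rather than project code, and the length check includes the two-character paragraph separator.

```java
import java.util.ArrayList;
import java.util.List;

class ParagraphChunker {

    // Packs paragraphs into chunks of at most maxSize characters; oversized paragraphs are hard-split
    static List<String> split(String content, int maxSize) {
        List<String> chunks = new ArrayList<>();
        if (content == null || content.isBlank()) {
            return chunks;
        }
        StringBuilder current = new StringBuilder();
        for (String raw : content.split("\\n\\s*\\n")) {
            String para = raw.trim();
            if (para.isEmpty()) {
                continue;
            }
            // "\n\n" separator is only needed when the chunk already has content
            int separator = current.length() == 0 ? 0 : 2;
            if (current.length() + separator + para.length() > maxSize && current.length() > 0) {
                chunks.add(current.toString());
                current.setLength(0);
                separator = 0;
            }
            if (para.length() > maxSize) {
                // Oversized paragraph: cut into maxSize-character pieces
                for (int i = 0; i < para.length(); i += maxSize) {
                    chunks.add(para.substring(i, Math.min(i + maxSize, para.length())));
                }
                continue;
            }
            if (separator > 0) {
                current.append("\n\n");
            }
            current.append(para);
        }
        if (current.length() > 0) {
            chunks.add(current.toString());
        }
        return chunks;
    }

    public static void main(String[] args) {
        for (String chunk : split("aaa\n\nbbb\n\ncccccccccc", 8)) {
            System.out.println("[" + chunk + "]");
        }
    }
}
```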

RAG Question Answering

KnowledgeBaseQueryService

The RAG Q&A service is the core of the system. It offers both streaming and non-streaming answers:

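import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.prompt.PromptTemplate;
import org.springframework.ai.document.Document;
import org.springframework.core.io.ResourceLoader;
import org.springframework.stereotype.Service;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

@Service
@Slf4j
public class KnowledgeBaseQueryService {

    private static final String NO_RESULT_RESPONSE = "Sorry, no relevant information was found in the selected knowledge bases. Please try a more specific keyword or add more context.";
    // Chinese question prefixes ("what is", "how", "why", "what is called", "explain", ...):
    // the rest of the question is the core term. Longer alternatives come first,
    // so "什么叫做X" yields "X" rather than "做X".
    private static final Pattern ZH_QUESTION_PREFIX = Pattern.compile(
            "^(?:什么是|如何|怎么|怎样|为什么|什么叫做|什么叫|讲一下|解释一下|介绍一下|说一下|谈谈|描述)(.+)$");
    // Chinese question suffixes ("... is what", "... has which", ...): the part before is the core term
    private static final Pattern ZH_QUESTION_SUFFIX = Pattern.compile(
            "^(.+?)(?:是什么|怎么样|如何|有哪些|有什么|是啥|是干什么的).*$");
    // Leading characters buffered to detect a "no information" answer in the stream
    private static final int STREAM_PROBE_CHARS = 120;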

    private final KnowledgeBaseVectorService vectorService;
    private final KnowledgeBaseListService listService;
    private final ChatClient chatClient;
    private final PromptTemplate sysPromptTemplate;
    private final PromptTemplate userPromptTemplate;
    private final PromptTemplate reWritePromptTemplate;

    // Query rewrite settings
    private final boolean reWriteEnable;
    private final int shortQueryLength;

    // Dynamic topK / similarity-threshold settings
    private final int topKShort;
    private final int topKMedium;
    private final int topKLong;
    private final double minScoreShort;
    private final double minScoreDefault;

    public KnowledgeBaseQueryService(KnowledgeBaseVectorService vectorService,
            KnowledgeBaseListService listService,
            ChatClient.Builder builder,
            KnowledgeBaseQueryProperties queryProperties,
            ResourceLoader resourceLoader) {
        this.vectorService = vectorService;
        this.listService = listService;
        this.chatClient = builder.build();
        this.sysPromptTemplate = new PromptTemplate(resourceLoader.getResource(queryProperties.getSystemPromptPath()));
        this.userPromptTemplate = new PromptTemplate(resourceLoader.getResource(queryProperties.getUserPromptPath()));
        this.reWritePromptTemplate = new PromptTemplate(resourceLoader.getResource(queryProperties.getRewritePromptPath()));
        this.reWriteEnable = queryProperties.getRewrite().isEnabled();
        this.shortQueryLength = queryProperties.getSearch().getShortQueryLength();
        this.topKShort = queryProperties.getSearch().getTopkShort();
        this.topKMedium = queryProperties.getSearch().getTopkMedium();
        this.topKLong = queryProperties.getSearch().getTopkLong();
        this.minScoreShort = queryProperties.getSearch().getMinScoreShort();
        this.minScoreDefault = queryProperties.getSearch().getMinScoreDefault();
    }

    /**
     * Streaming answer (SSE).
     */
    public Flux<String> answerQuestionStream(List<Long> knowledgeBaseIds, String question) {
        try {
            log.info("Received streaming knowledge base question: kbIds={}, question={}", knowledgeBaseIds, question);

            if (knowledgeBaseIds == null || knowledgeBaseIds.isEmpty() || normalizeQuestion(question).isBlank()) {
                return Flux.just(NO_RESULT_RESPONSE);
            }

            // 1. Verify the knowledge bases exist and increment their question counts
            listService.updateQuestionCounts(knowledgeBaseIds);

            // 2. Query rewrite + retrieval with length-dependent parameters
            QueryContext queryContext = buildQueryContext(question);
            List<Document> documents = retrieveRelevantDocs(queryContext, knowledgeBaseIds);

            if (!hasEffectiveHit(question, documents)) {
                return Flux.just(NO_RESULT_RESPONSE);
            }

            // 3. Build the context
            String context = documents.stream()
                    .map(Document::getText)
                    .collect(Collectors.joining("\n\n---\n\n"));

            log.debug("Retrieved {} relevant chunks", documents.size());

            // 4. Build the prompts
            String systemPrompt = buildSystemPrompt();
            String userPrompt = buildUserPrompt(context, queryContext.candidateQueries.getFirst());

            // 5. Streaming call, normalized through a probe window
            Flux<String> responseFlux = chatClient.prompt()
                    .system(systemPrompt)
                    .user(userPrompt)
                    .stream()
                    .content();
            log.info("Streaming answer (probe window active): kbIds={}", knowledgeBaseIds);
            return normalizeStreamOutput(responseFlux)
                    .doOnComplete(() -> log.info("Streaming finished: kbIds={}", knowledgeBaseIds))
                    .onErrorResume(e -> {
                        log.error("Streaming failed: kbIds={}, error={}", knowledgeBaseIds, e.getMessage(), e);
                        return Flux.just("[Error] Knowledge base query failed: the AI service is temporarily unavailable, please try again later.");
                    });

        } catch (Exception e) {
            log.error("Streaming knowledge base Q&A failed: {}", e.getMessage(), e);
            return Flux.just("[Error] Knowledge base query failed: " + e.getMessage());
        }
    }

    /**
     * Non-streaming answer.
     */
    public String answerQuestion(List<Long> knowledgeIds, String question) {
        log.info("Received knowledge base question: kbIds={}, question={}", knowledgeIds, question);
        if (knowledgeIds == null || knowledgeIds.isEmpty() || normalizeQuestion(question).isBlank()) {
            return NO_RESULT_RESPONSE;
        }

        // 1. Verify the knowledge bases exist and increment their question counts
        listService.updateQuestionCounts(knowledgeIds);

        // 2. Query rewrite + length-dependent search parameters
        QueryContext queryContext = buildQueryContext(question);

        // 3. Vector search
        List<Document> relevantDocs = retrieveRelevantDocs(queryContext, knowledgeIds);
        if (!hasEffectiveHit(question, relevantDocs)) {
            return NO_RESULT_RESPONSE;
        }

        // 4. Build the context
        String context = relevantDocs.stream()
                .map(Document::getText)
                .collect(Collectors.joining("\n\n---\n\n"));
        log.debug("Retrieved {} relevant chunks", relevantDocs.size());

        // 5. Build the prompts
        String systemPrompt = buildSystemPrompt();
        String userPrompt = buildUserPrompt(context, queryContext.candidateQueries.getFirst());

        // 6. Call the model
        try {
            String content = chatClient
                    .prompt()
                    .system(systemPrompt)
                    .user(userPrompt)
                    .call()
                    .content();
            content = normalizeAnswer(content);
            return content;
        } catch (Exception e) {
            log.error("Knowledge base Q&A failed: {}", e.getMessage(), e);
            throw new BusinessException(ErrorCode.KNOWLEDGE_BASE_QUERY_FAILED, "Knowledge base query failed: " + e.getMessage());
        }
    }

    /**
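     * Normalizes the streamed answer:
     * - buffer the first STREAM_PROBE_CHARS (120) characters to spot a "no information" answer early
     * - if it is one, emit the fixed NO_RESULT_RESPONSE and complete
     * - otherwise flush the buffer once and pass later chunks straight through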
     */
    private Flux<String> normalizeStreamOutput(Flux<String> rawFlux) {
        return Flux.create(sink -> {
            StringBuilder probeBuffer = new StringBuilder();
            AtomicBoolean passthrough = new AtomicBoolean(false);
            AtomicBoolean completed = new AtomicBoolean(false);
            final Disposable[] disposableRef = new Disposable[1];

            disposableRef[0] = rawFlux.subscribe(
                    chunk -> {
                        if (completed.get() || sink.isCancelled()) {
                            return;
                        }
                        if (passthrough.get()) {
                            sink.next(chunk);
                            return;
                        }

                        probeBuffer.append(chunk);
                        String probeText = probeBuffer.toString();
                        if (isNoResultLike(probeText)) {
                            completed.set(true);
                            sink.next(NO_RESULT_RESPONSE);
                            sink.complete();
                            if (disposableRef[0] != null) {
                                disposableRef[0].dispose();
                            }
                            return;
                        }

                        if (probeBuffer.length() >= STREAM_PROBE_CHARS) {
                            passthrough.set(true);
                            sink.next(probeText);
                            probeBuffer.setLength(0);
                        }
                    },
                    sink::error,
                    () -> {
                        if (completed.get() || sink.isCancelled()) {
                            return;
                        }
                        if (!passthrough.get()) {
                            sink.next(normalizeAnswer(probeBuffer.toString()));
                        }
                        sink.complete();
                    }
            );

            sink.onCancel(() -> {
                if (disposableRef[0] != null) {
                    disposableRef[0].dispose();
                }
            });
        });
    }

    private String normalizeAnswer(String answer) {
        if (answer == null || answer.isBlank()) {
            return NO_RESULT_RESPONSE;
        }
        String normalized = answer.trim();
        if (isNoResultLike(normalized)) {
            return NO_RESULT_RESPONSE;
        }
        return normalized;
    }

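    // Chinese phrases the model tends to use when the context does not answer the question:
    // "no relevant information found", "nothing relevant retrieved", "insufficient information",
    // "outside the knowledge base", "cannot answer from the provided content"
    private boolean isNoResultLike(String text) {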
        return text.contains("没有找到相关信息")
                || text.contains("未检索到相关信息")
                || text.contains("信息不足")
                || text.contains("超出知识库范围")
                || text.contains("无法根据提供内容回答");
    }

    private String buildUserPrompt(String context, String question) {
        Map<String, Object> map = Map.of("context", context, "question", question);
        return userPromptTemplate.render(map);
    }

    private String buildSystemPrompt() {
        return sysPromptTemplate.render();
    }

    /**
     * Tries each candidate query in turn and returns the first result set that passes the core-term check
     */
    private List<Document> retrieveRelevantDocs(QueryContext queryContext, List<Long> knowledgeIds) {
        for (String candidateQuery : queryContext.candidateQueries) {
            if (candidateQuery.isBlank()) {
                continue;
            }
            List<Document> docs = vectorService.similaritySearch(
                    candidateQuery,
                    knowledgeIds,
                    queryContext.searchParams().topK(),
                    queryContext.searchParams().minScore()
            );
            log.info("Candidate query '{}' returned {} hits", candidateQuery, docs.size());
            // Accept only if some chunk literally contains the question's core term
            if (hasEffectiveHit(candidateQuery, docs)) {
                return docs;
            }
        }
        return List.of();
    }

    /**
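     * Checks whether the retrieved chunks really match the question by extracting the
     * core term of a Chinese question and looking for it literally in the chunk text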
     */
    private boolean hasEffectiveHit(String candidateQuery, List<Document> docs) {
        if (Func.isEmpty(candidateQuery) || Func.isEmpty(docs)) {
            return false;
        }
        String question = normalizeQuestion(candidateQuery);
        String coreTerm = extractCoreTerm(question).toLowerCase();
        for (Document doc : docs) {
            String text = doc.getText();
            if (!Func.isEmpty(text) && text.toLowerCase().contains(coreTerm)) {
                return true;
            }
        }
        return false;
    }

    /**
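     * Extracts the core term of a Chinese question:
     * "什么是进程" ("what is a process") → "进程"
     * "进程是什么" ("a process is what") → "进程"
     */
    private String extractCoreTerm(String question) {
        // Strip trailing punctuation first; otherwise "什么是进程?" would yield "进程?",
        // which rarely appears literally in a chunk
        String q = question.replaceAll("[??!!。.,,\\s]+$", "");
        Matcher m = ZH_QUESTION_PREFIX.matcher(q);
        if (m.matches()) {
            return m.group(1).trim();
        }
        m = ZH_QUESTION_SUFFIX.matcher(q);
        if (m.matches()) {
            return m.group(1).trim();
        }
        return q;
    }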

    /**
     * Builds the query context (query rewrite + dynamic search parameters)
     */
    private QueryContext buildQueryContext(String question) {
        String normalizeQuestion = normalizeQuestion(question);
        String rewrittenQuestion = rewriteQuestion(normalizeQuestion);

        Set<String> candidates = new LinkedHashSet<>();
        candidates.add(rewrittenQuestion);
        candidates.add(normalizeQuestion);

        SearchParams searchParams = resolveSearchParams(normalizeQuestion);
        return new QueryContext(normalizeQuestion, new ArrayList<>(candidates), searchParams);
    }

    /**
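     * Picks search parameters from the question length (whitespace ignored):
     * short queries retrieve more chunks with a lower similarity threshold (minScoreShort),
     * medium and long queries retrieve fewer chunks with minScoreDefault.
     */
    private SearchParams resolveSearchParams(String question) {
        int compactLength = question.replaceAll("\\s+", "").length();
        if (compactLength <= shortQueryLength) {
            return new SearchParams(topKShort, minScoreShort);
        }
        if (compactLength <= 12) {  // medium-length cutoff (hard-coded)
            return new SearchParams(topKMedium, minScoreDefault);
        }
        return new SearchParams(topKLong, minScoreDefault);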
    }

    /**
     * Query rewrite: asks the model to restate and enrich the user's question
     */
    private String rewriteQuestion(String normalizeQuestion) {
        if (normalizeQuestion.isBlank() || !reWriteEnable) {
            return normalizeQuestion;
        }
        try {
            Map<String, Object> param = Map.of("question", normalizeQuestion);
            String render = reWritePromptTemplate.render(param);
            String rewritten = chatClient.prompt()
                    .user(render)
                    .call()
                    .content();
            if (rewritten == null || rewritten.isBlank()) {
                return normalizeQuestion;
            }
            log.info("Query rewrite: origin='{}', rewritten='{}'", normalizeQuestion, rewritten);
            return rewritten.trim();
        } catch (Exception e) {
            log.warn("Query rewrite failed, continuing retrieval with the original question: {}", e.getMessage());
        }
        return normalizeQuestion;
    }

    private String normalizeQuestion(String question) {
        return question == null ? "" : question.trim();
    }

    private record SearchParams(int topK, double minScore) {}
    private record QueryContext(String originalQuestion, List<String> candidateQueries, SearchParams searchParams) {}
}
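For experimenting with the core-term check, here is a standalone version of `extractCoreTerm`. The two regexes come from `KnowledgeBaseQueryService`, with "什么叫做" listed before "什么叫" so the longer prefix wins; stripping trailing punctuation (for example a full-width question mark) is needed because the captured term would otherwise keep it and fail the literal `contains` match. The punctuation class is an assumption about typical user input.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class CoreTermExtractor {

    // Question prefixes: "what is", "how", "why", "what is called", "explain", "introduce", ...
    private static final Pattern ZH_QUESTION_PREFIX = Pattern.compile(
            "^(?:什么是|如何|怎么|怎样|为什么|什么叫做|什么叫|讲一下|解释一下|介绍一下|说一下|谈谈|描述)(.+)$");
    // Question suffixes: "... is what", "... how about", "... has which", ...
    private static final Pattern ZH_QUESTION_SUFFIX = Pattern.compile(
            "^(.+?)(?:是什么|怎么样|如何|有哪些|有什么|是啥|是干什么的).*$");
    // Full-width and ASCII ?, !, period and comma, plus trailing whitespace (assumed set)
    private static final Pattern TRAILING_PUNCTUATION = Pattern.compile("[??!!。.,,\\s]+$");

    static String extract(String question) {
        String q = TRAILING_PUNCTUATION.matcher(question.trim()).replaceAll("");
        Matcher m = ZH_QUESTION_PREFIX.matcher(q);
        if (m.matches()) {
            return m.group(1).trim();
        }
        m = ZH_QUESTION_SUFFIX.matcher(q);
        if (m.matches()) {
            return m.group(1).trim();
        }
        return q;  // no known pattern: use the whole question
    }

    public static void main(String[] args) {
        System.out.println(extract("什么是进程?"));   // 进程
        System.out.println(extract("进程是什么"));     // 进程
        System.out.println(extract("什么叫做死锁"));   // 死锁
    }
}
```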

Request and Response Models

// Request: supports joint retrieval across several knowledge bases
public record QueryRequest(
        @NotEmpty(message = "Select at least one knowledge base")
        List<Long> knowledgeBaseIds,

        @NotBlank(message = "Question must not be blank")
        String question
) {
    // Convenience constructor for a single knowledge base ID
    public QueryRequest(Long knowledgeBaseId, String question) {
        this(List.of(knowledgeBaseId), question);
    }
}

// Response
public record QueryResponse(
        String answer,              // answer generated by the model
        Long knowledgeBaseId,       // primary knowledge base ID
        String knowledgeBaseName    // knowledge base names (comma-separated)
) {}

API Endpoints

@Slf4j
@RestController
@RequestMapping("/api/knowledgebase")
@RequiredArgsConstructor
@Tag(name = "Knowledge base management", description = "Knowledge base upload, download, query, categorization and vectorization")
public class KnowledgeBaseController {

    private final KnowledgeBaseListService listService;
    private final KnowledgeBaseDeleteService deleteService;
    private final KnowledgeBaseUploadService uploadService;
    private final KnowledgeBaseQueryService queryService;

    /**
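     * Non-streaming query. queryService.queryKnowledgeBase(...) is not shown in this article;
     * it presumably wraps answerQuestion(...) in a QueryResponse.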
     */
    @PostMapping("/query")
    @RateLimit(dimension = RateLimit.Dimension.GLOBAL, count = 10)
    @RateLimit(dimension = RateLimit.Dimension.IP, count = 10)
    public Result<QueryResponse> queryKnowledgeBase(@Valid @RequestBody QueryRequest request) {
        return Result.success(queryService.queryKnowledgeBase(request));
    }

    /**
     * Streaming query (SSE).
     */
    @PostMapping(value = "/query/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    @RateLimit(dimension = RateLimit.Dimension.GLOBAL, count = 5)
    @RateLimit(dimension = RateLimit.Dimension.IP, count = 5)
    public Flux<String> queryKnowledgeBaseStream(@Valid @RequestBody QueryRequest request) {
        log.debug("Received streaming query: kbIds={}, question={}, thread: {} (virtual: {})",
                request.knowledgeBaseIds(), request.question(), Thread.currentThread(), Thread.currentThread().isVirtual());
        return queryService.answerQuestionStream(request.knowledgeBaseIds(), request.question());
    }
}
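The streaming endpoint returns whatever `answerQuestionStream` emits, and that output first passes through the probe window in `normalizeStreamOutput`. The same idea, applied to a plain `List<String>` of chunks instead of a Reactor `Flux`, is easy to test in isolation. The marker phrase and the template below are English placeholders (an assumption), not the project's Chinese phrases.

```java
import java.util.ArrayList;
import java.util.List;

class ProbeWindow {

    static final int PROBE_CHARS = 120;
    // Placeholder template; the project uses NO_RESULT_RESPONSE
    static final String NO_RESULT = "Sorry, no relevant information was found.";

    // Placeholder marker; the project checks several Chinese phrases
    static boolean isNoResultLike(String text) {
        return text.contains("no relevant information");
    }

    static List<String> normalize(List<String> rawChunks) {
        List<String> out = new ArrayList<>();
        StringBuilder probe = new StringBuilder();
        boolean passthrough = false;
        for (String chunk : rawChunks) {
            if (passthrough) {
                out.add(chunk);           // after the window: forward as-is
                continue;
            }
            probe.append(chunk);
            if (isNoResultLike(probe.toString())) {
                out.add(NO_RESULT);       // short-circuit: ignore the rest of the stream
                return out;
            }
            if (probe.length() >= PROBE_CHARS) {
                passthrough = true;
                out.add(probe.toString()); // flush the buffered prefix once
            }
        }
        if (!passthrough) {
            // Answer shorter than the window: emit it once, or the template if blank
            String text = probe.toString().trim();
            out.add(text.isEmpty() ? NO_RESULT : text);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(normalize(List.of("Sorry, there is no relevant ", "information in the context.")));
        System.out.println(normalize(List.of("A process is ", "a program in execution.")));
    }
}
```

The trade-off is latency: the client sees nothing until either the window fills or the stream ends, which is why the window is kept small.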

Configuration Properties

@Data
@Component
@ConfigurationProperties(prefix = "app.ai.rag")
public class KnowledgeBaseQueryProperties {

    private Rewrite rewrite = new Rewrite();
    private Search search = new Search();
    private String systemPromptPath = "classpath:prompts/knowledgebase-query-system.st";
    private String userPromptPath = "classpath:prompts/knowledgebase-query-user.st";
    private String rewritePromptPath = "classpath:prompts/knowledgebase-query-rewrite.st";

    @Data
    public static class Rewrite {
        private boolean enabled = true;
    }

    @Data
    public static class Search {
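        private int shortQueryLength = 4;      // queries up to this many characters count as short
        private int topkShort = 20;            // topK for short queries
        private int topkMedium = 12;           // topK for medium-length queries
        private int topkLong = 8;              // topK for long queries
        private double minScoreShort = 0.18;   // minimum similarity for short queries
        private double minScoreDefault = 0.28; // default minimum similarity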
    }
}
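With these defaults, the length-based parameter selection can be sketched on its own. The medium cutoff of 12 characters is the value hard-coded in `resolveSearchParams`; this sketch uses `minScoreShort` for short queries, which is what the `minScoreShort` setting describes. The class is illustrative, not project code.

```java
class SearchParamsResolver {

    record SearchParams(int topK, double minScore) {}

    static final int SHORT_QUERY_LENGTH = 4;    // shortQueryLength
    static final int MEDIUM_QUERY_LENGTH = 12;  // hard-coded cutoff in resolveSearchParams

    static SearchParams resolve(String question) {
        // Whitespace is ignored, so "what is HNSW" counts as 10 characters
        int compactLength = question.replaceAll("\\s+", "").length();
        if (compactLength <= SHORT_QUERY_LENGTH) {
            return new SearchParams(20, 0.18);  // topkShort, minScoreShort
        }
        if (compactLength <= MEDIUM_QUERY_LENGTH) {
            return new SearchParams(12, 0.28);  // topkMedium, minScoreDefault
        }
        return new SearchParams(8, 0.28);       // topkLong, minScoreDefault
    }

    public static void main(String[] args) {
        System.out.println(resolve("HNSW"));          // SearchParams[topK=20, minScore=0.18]
        System.out.println(resolve("what is HNSW"));  // SearchParams[topK=12, minScore=0.28]
    }
}
```

The reasoning behind the lower threshold: short queries carry little context, so their embeddings tend to score lower against long chunks, and a wider, looser net followed by the core-term check compensates for that.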

Vector Database Integration

PostgreSQL + pgvector

Integrating Spring AI with PostgreSQL + pgvector:

spring:
  ai:
    vectorstore:
      pgvector:
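        index-type: HNSW                 # NONE, IVFFLAT or HNSW
        distance-type: COSINE_DISTANCE   # COSINE_DISTANCE, EUCLIDEAN_DISTANCE or NEGATIVE_INNER_PRODUCT
        dimensions: 1536                 # must equal the embedding model's output size (1536 for e.g. OpenAI text-embedding-3-small; check the DashScope model in use)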
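`COSINE_DISTANCE` means pgvector compares vectors by cosine distance (its `<=>` operator), which equals 1 - cosine similarity and ranges from 0 to 2. As far as I can tell, Spring AI's `PgVectorStore` reports a document score of 1 - distance and turns `similarityThreshold` into a distance cut-off of 1 - threshold, so `minScoreDefault = 0.28` would mean "cosine distance below 0.72". Treat that mapping as an assumption to verify against your Spring AI version; the arithmetic itself is shown below.

```java
class CosineDistance {

    static double similarity(double[] a, double[] b) {
        double dot = 0, normA = 0, normB = 0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            normA += a[i] * a[i];
            normB += b[i] * b[i];
        }
        return dot / (Math.sqrt(normA) * Math.sqrt(normB));
    }

    // Same quantity as pgvector's <=>: 0 = same direction, 1 = orthogonal, 2 = opposite
    static double distance(double[] a, double[] b) {
        return 1.0 - similarity(a, b);
    }

    public static void main(String[] args) {
        double[] query = {1.0, 0.0};
        double[] chunk = {1.0, 1.0};
        // prints similarity=0.7071 distance=0.2929
        System.out.printf("similarity=%.4f distance=%.4f%n", similarity(query, chunk), distance(query, chunk));
        // Under the assumed mapping, minScore 0.28 corresponds to a maximum distance of 1 - 0.28 = 0.72
    }
}
```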

Custom VectorStore

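import org.springframework.ai.embedding.EmbeddingModel;
import org.springframework.ai.vectorstore.pgvector.PgVectorStore;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;

@Configuration
public class VectorStoreConfig {

    // Spring AI 1.0 PgVectorStore builder; the values mirror the YAML above
    @Bean
    public PgVectorStore vectorStore(JdbcTemplate jdbcTemplate, EmbeddingModel embeddingModel) {
        return PgVectorStore.builder(jdbcTemplate, embeddingModel)
                .dimensions(1536)
                .distanceType(PgVectorStore.PgDistanceType.COSINE_DISTANCE)
                .indexType(PgVectorStore.PgIndexType.HNSW)
                .initializeSchema(true)  // create the vector_store table and index if missing
                .build();
    }
}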

Performance Optimization

1. Embedding batch size limit

The DashScope embedding API accepts at most 10 texts per request, so chunks are embedded in batches of at most 10:

private static final int MAX_BATCH_SIZE = 10;

2. Asynchronous processing

Vectorization runs asynchronously through a Redis Stream, so the upload request is not blocked:

vectorizeStreamProducer.sendVectorizeTask(knowledgeBaseId, content);

3. Re-queue pending knowledge bases at startup

On startup, knowledge bases that are still PENDING are sent to the queue again:

@PostConstruct
public void preloadKnowledge() {
    List<KnowledgeBaseEntity> kbs = knowledgeRepository.findByVectorStatus(VectorStatus.PENDING);
    for (KnowledgeBaseEntity kb : kbs) {
        vectorizeStreamProducer.sendVectorizeTask(kb.getId(), kb.getContent());
    }
}

4. Vector index tuning

-- pgvector HNSW index for cosine distance (m = 16 and ef_construction = 64 are pgvector's defaults)
CREATE INDEX idx_vector_store_embedding
ON vector_store
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

Troubleshooting

1. Vectorization fails

  • Check the API quota
  • Make sure the text is not empty
  • Check that the chunk size is reasonable

2. Inaccurate search results

  • Tune topK
  • Improve the chunking strategy
  • Use a better embedding model

3. Performance problems

  • Use an HNSW index
  • Process in batches
  • Re-queue pending work asynchronously at startup

Summary

The project's vectorization pipeline:

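  1. Upload → Tika text extraction → SHA-256 deduplication
  2. Async → Redis Stream queue → consumer processing
  3. Chunking → fixed-size chunks (800 characters, step 600, i.e. 200 characters of overlap)
  4. Storage → add kb_id metadata → embed in batches (≤10 per batch)
  5. Retrieval → similarity search → RAG Q&A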

Key features of the RAG Q&A:

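  • Query rewrite: when enabled, the model restates and enriches the user's question to improve retrieval
  • Dynamic parameters: topK and minScore are chosen from the question length
  • Multiple candidate queries: the rewritten and the original question are tried in turn, and the first result set that passes the core-term check is used
  • Core-term extraction: the core term of a Chinese question is extracted for a second, literal check against the retrieved chunks
  • Streaming output: answers are streamed over SSE, and a "no result" answer is cut short early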