What is RAG?

RAG (Retrieval-Augmented Generation) is an architecture that combines information retrieval with generative AI. Its core idea:

Retrieval stage: fetch the content most relevant to the user's question from a knowledge base
Generation stage: pass the retrieved content to the LLM as context and let it generate the answer
```
User question
      │
      ▼
┌───────────────────────┐
│ Vector DB retrieval   │ ←── knowledge base already vectorized
└───────────┬───────────┘
            │
            ▼
┌───────────────────────┐
│ Build prompt          │ ←── system prompt + retrieved content + user question
└───────────┬───────────┘
            │
            ▼
┌───────────────────────┐
│ LLM generates answer  │
└───────────────────────┘
```
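The two stages above can be illustrated with a toy, dependency-free sketch (all names, vectors, and helpers here are made up for illustration, not the project's code): retrieval picks the snippet whose embedding is closest to the query embedding by cosine similarity, and generation assembles the prompt that would be sent to the LLM.

```java
import java.util.Comparator;
import java.util.Map;

// Toy illustration of the two RAG stages with hypothetical in-memory data.
public class RagSketch {

    // Cosine similarity between two embedding vectors
    static double cosine(double[] a, double[] b) {
        double dot = 0, na = 0, nb = 0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            na += a[i] * a[i];
            nb += b[i] * b[i];
        }
        return dot / (Math.sqrt(na) * Math.sqrt(nb));
    }

    // Retrieval stage: pick the snippet whose vector is closest to the query vector
    static String retrieve(double[] queryVec, Map<String, double[]> index) {
        return index.entrySet().stream()
                .max(Comparator.comparingDouble(
                        (Map.Entry<String, double[]> e) -> cosine(queryVec, e.getValue())))
                .map(Map.Entry::getKey)
                .orElseThrow();
    }

    // Generation stage (prompt assembly only; the actual LLM call is out of scope here)
    static String buildPrompt(String context, String question) {
        return "Answer using this context:\n" + context + "\n\nQuestion: " + question;
    }
}
```

A real system replaces the toy index with a vector database and the hand-written vectors with an embedding model, but the shape of the pipeline is the same.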
Vectorization Architecture in This Project
```
┌──────────────────────────────────────────────────────────────────────┐
│                      Knowledge Base Upload Flow                      │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│   ┌────────────────┐   ┌────────────────┐   ┌────────────────┐       │
│   │ File upload    │──▶│ Parse file     │──▶│ Dedup check    │       │
│   │ (RustFS)       │   │ (Tika)         │   │ (SHA-256)      │       │
│   └────────────────┘   └────────────────┘   └───────┬────────┘       │
│                                                     │                │
│                                                     ▼                │
│   ┌────────────────┐   ┌────────────────┐   ┌────────────────┐       │
│   │ Save metadata  │◀──│ Status:        │◀──│ Send task      │       │
│   │ (PostgreSQL)   │   │ PENDING        │   │ (Redis Stream) │       │
│   └────────────────┘   └────────────────┘   └────────────────┘       │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘
                                   │
                                   ▼
┌──────────────────────────────────────────────────────────────────────┐
│                     Vectorization Consumer Flow                      │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│   ┌────────────────┐   ┌────────────────┐   ┌────────────────┐       │
│   │ Consume msg    │──▶│ Chunk text     │──▶│ Add metadata   │       │
│   │                │   │ (800 tokens)   │   │ (kb_id)        │       │
│   └────────────────┘   └────────────────┘   └───────┬────────┘       │
│                                                     │                │
│                                                     ▼                │
│   ┌────────────────┐   ┌────────────────┐   ┌────────────────┐       │
│   │ Store batches  │◀──│ Embed          │◀──│ Batch ≤ 10     │       │
│   │ (VectorStore)  │   │ (DashScope)    │   │                │       │
│   └────────────────┘   └────────────────┘   └────────────────┘       │
│                                                                      │
│   ┌────────────────┐   ┌────────────────┐                            │
│   │ Update status  │◀──│ COMPLETED      │                            │
│   │                │   │ /FAILED        │                            │
│   └────────────────┘   └────────────────┘                            │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘
```
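The SHA-256 dedup step in the upload flow can be sketched in plain Java (the class and method names here are illustrative, not the project's actual code): the parsed file content is hashed, and the 64-character hex digest becomes the `fileHash` that the unique index rejects on re-upload.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

// Illustrative sketch: hash file content, then use the hex digest
// as the unique fileHash for deduplication.
public class FileDedup {

    static String sha256Hex(byte[] data) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            return HexFormat.of().formatHex(md.digest(data));  // 64 hex characters
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);  // SHA-256 is guaranteed on the JVM
        }
    }

    static String sha256Hex(String text) {
        return sha256Hex(text.getBytes(StandardCharsets.UTF_8));
    }
}
```

Two files with identical content always produce the same digest, so a unique index on `fileHash` is enough to reject duplicates at insert time.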
Core Implementation

1. Knowledge Base Entity
```java
@Entity
@Table(name = "knowledge_bases", indexes = {
    @Index(name = "idx_kb_hash", columnList = "fileHash", unique = true)
})
public class KnowledgeBaseEntity {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true, length = 64)
    private String fileHash;   // used for deduplication

    @Column(nullable = false)
    private String name;

    @Enumerated(EnumType.STRING)
    @Column(length = 20)
    private VectorStatus vectorStatus = VectorStatus.PENDING;

    private Integer chunkCount = 0;  // number of chunks
}
```
2. Vectorization Status Enum
```java
public enum VectorStatus {
    PENDING,     // waiting to be processed
    PROCESSING,  // in progress
    COMPLETED,   // done
    FAILED       // failed
}
```
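These values form a simple lifecycle: PENDING → PROCESSING → COMPLETED or FAILED. A hedged sketch of a transition check (not part of the project code; the FAILED → PENDING retry edge is an assumption based on the retry mechanism mentioned later):

```java
import java.util.Map;
import java.util.Set;

public class VectorStatusMachine {

    enum VectorStatus { PENDING, PROCESSING, COMPLETED, FAILED }

    // Allowed transitions: PENDING → PROCESSING → COMPLETED | FAILED
    static final Map<VectorStatus, Set<VectorStatus>> ALLOWED = Map.of(
            VectorStatus.PENDING, Set.of(VectorStatus.PROCESSING),
            VectorStatus.PROCESSING, Set.of(VectorStatus.COMPLETED, VectorStatus.FAILED),
            VectorStatus.COMPLETED, Set.of(),
            VectorStatus.FAILED, Set.of(VectorStatus.PENDING)  // assumption: failed tasks may be re-queued
    );

    static boolean canTransition(VectorStatus from, VectorStatus to) {
        return ALLOWED.getOrDefault(from, Set.of()).contains(to);
    }
}
```

Validating transitions like this in the service layer prevents, for example, a late consumer marking an already-COMPLETED record back to PROCESSING.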
3. Vectorization Service
```java
@Service
@Slf4j
public class KnowledgeBaseVectorService {

    private static final int MAX_BATCH_SIZE = 10;       // at most 10 documents per batch
    private static final int CHUNK_MAX_SIZE = 800;      // max tokens per chunk
    private static final int CHUNK_CONTEXT_SIZE = 200;  // overlap carried into the next chunk

    private final VectorStore vectorStore;
    private final TextSplitter textSplitter;

    public int vectorizeAndStore(Long knowledgeBaseId, String content) {
        // 1. Delete any existing vectors for this knowledge base
        deleteByKnowledgeBaseId(knowledgeBaseId);
        // 2. Split the text into chunks
        List<Document> documents = doSplitDocuments(content);
        // 3. Attach metadata
        documents.forEach(doc -> doc.getMetadata().put("kb_id", knowledgeBaseId.toString()));
        // 4. Embed and store in batches
        return vectorBatchAdd(documents);
    }

    private int vectorBatchAdd(List<Document> documents) {
        int totalChunks = documents.size();
        int batchCount = (totalChunks + MAX_BATCH_SIZE - 1) / MAX_BATCH_SIZE;  // ceiling division
        for (int i = 0; i < batchCount; i++) {
            int start = i * MAX_BATCH_SIZE;
            int end = Math.min(start + MAX_BATCH_SIZE, documents.size());
            vectorStore.add(documents.subList(start, end));
        }
        return totalChunks;
    }

    private List<Document> doSplitDocuments(String content) {
        List<Document> documents = new ArrayList<>();
        int length = content.length();
        int step = CHUNK_MAX_SIZE - CHUNK_CONTEXT_SIZE;
        for (int i = 0; i < length; i += step) {
            int end = Math.min(i + CHUNK_MAX_SIZE, length);
            documents.add(new Document(content.substring(i, end)));
        }
        return documents;
    }
}
```
4. Asynchronous Processing with Redis Stream
```java
// Producer
@Service
public class VectorizeStreamProducer extends AbstractStreamProducer<VectorizeTaskPayload> {

    public void sendVectorizeTask(Long id, String content) {
        sendTask(new VectorizeTaskPayload(id, content));
    }

    @Override
    protected String streamKey() {
        return "knowledgebase:vectorize:stream";
    }
}

// Consumer
@Service
public class VectorizeStreamConsumer extends AbstractStreamConsumer<VectorizeTaskPayload> {

    private final KnowledgeBaseVectorService vectorService;
    private final KnowledgeBaseRepository knowledgeRepository;

    @Override
    protected void processBusiness(VectorizeTaskPayload payload) {
        int chunks = vectorService.vectorizeAndStore(payload.kbId(), payload.content());
        // Update the chunk count
        knowledgeRepository.findById(payload.kbId()).ifPresent(entity -> {
            entity.setChunkCount(chunks);
            knowledgeRepository.save(entity);
        });
    }
}
```
Text Chunking Strategies

Fixed-Length Chunking
```java
private List<Document> doSplitDocuments(String content) {
    List<Document> documents = new ArrayList<>();
    int chunkSize = 800;
    int overlap = 200;  // overlapping region between adjacent chunks
    for (int i = 0; i < content.length(); i += (chunkSize - overlap)) {
        int end = Math.min(i + chunkSize, content.length());
        String chunk = content.substring(i, end);
        documents.add(new Document(chunk));
    }
    return documents;
}
```
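A quick way to sanity-check the overlap math: with a chunk size of 800 and an overlap of 200, the window advances by 600 characters per step, so adjacent chunks share 200 characters. A standalone version of the same loop (class and method names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class FixedChunker {

    // Same sliding-window logic as the service method: advance by (chunkSize - overlap)
    static List<String> chunk(String content, int chunkSize, int overlap) {
        List<String> chunks = new ArrayList<>();
        for (int i = 0; i < content.length(); i += (chunkSize - overlap)) {
            chunks.add(content.substring(i, Math.min(i + chunkSize, content.length())));
        }
        return chunks;
    }
}
```

For a 2000-character document this yields chunks starting at 0, 600, 1200, and 1800, with the final chunk holding the remaining 200 characters.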
TokenTextSplitter (Recommended)

Spring AI provides a token-based splitter:
```java
@Service
public class KnowledgeBaseVectorService {

    private final VectorStore vectorStore;
    private final TextSplitter textSplitter;

    public KnowledgeBaseVectorService(VectorStore vectorStore) {
        this.vectorStore = vectorStore;
        this.textSplitter = TokenTextSplitter.builder()
                .withChunkSize(800)          // target tokens per chunk
                .withMinChunkSizeChars(100)  // minimum characters per chunk
                .withMaxNumChunks(100)       // upper bound on chunk count
                .build();
    }
}
```
Paragraph-Based Chunking
```java
private static final int MAX_CHUNK_SIZE = 800;

private List<Document> splitByParagraphs(String content) {
    List<Document> documents = new ArrayList<>();
    // Split into paragraphs on blank lines
    String[] paragraphs = content.split("\\n\\s*\\n");
    StringBuilder currentChunk = new StringBuilder();
    for (String para : paragraphs) {
        // Flush the current chunk before it would exceed the size limit
        if (currentChunk.length() + para.length() > MAX_CHUNK_SIZE) {
            if (currentChunk.length() > 0) {
                documents.add(new Document(currentChunk.toString().trim()));
                currentChunk = new StringBuilder();
            }
        }
        currentChunk.append(para).append("\n\n");
    }
    if (currentChunk.length() > 0) {
        documents.add(new Document(currentChunk.toString().trim()));
    }
    return documents;
}
```
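The `\\n\\s*\\n` pattern treats any blank line as a paragraph boundary, including "blank" lines that contain only whitespace. A minimal check of that behavior:

```java
public class ParagraphSplitDemo {

    // Paragraphs are separated by blank lines, possibly containing spaces or tabs
    static String[] split(String content) {
        return content.split("\\n\\s*\\n");
    }
}
```

This matters for real documents: text extracted by Tika often has trailing spaces on otherwise empty lines, and a plain `\n\n` split would silently merge those paragraphs.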
Metadata Design

Knowledge Base Metadata
```java
documents.forEach(doc -> {
    Map<String, Object> metadata = doc.getMetadata();
    metadata.put("kb_id", knowledgeBaseId.toString());
    metadata.put("kb_name", knowledgeBase.getName());
    metadata.put("category", knowledgeBase.getCategory());
    metadata.put("uploaded_at", knowledgeBase.getUploadedAt().toString());
});
```
Filtering at Query Time
```java
// Restrict the search to a single knowledge base
FilterExpressionBuilder b = new FilterExpressionBuilder();
List<Document> results = vectorStore.similaritySearch(
        SearchRequest.builder()
                .query(userQuestion)
                .topK(5)
                .filterExpression(b.eq("kb_id", knowledgeBaseId.toString()).build())
                .build()
);
```
Vector Database Integration

PostgreSQL + pgvector

Spring AI integrates with PostgreSQL + pgvector:
```yaml
spring:
  ai:
    vectorstore:
      pgvector:
        index-type: HNSW                # HNSW or IVFFLAT
        distance-type: COSINE_DISTANCE  # COSINE_DISTANCE or EUCLIDEAN_DISTANCE
        dimensions: 1536                # embedding dimension (e.g. OpenAI embeddings)
```
Custom VectorStore
```java
@Configuration
public class VectorStoreConfig {

    @Bean
    public VectorStore vectorStore(JdbcTemplate jdbcTemplate, EmbeddingModel embeddingModel) {
        return PgVectorStore.builder(jdbcTemplate, embeddingModel)
                .distanceType(PgVectorStore.PgDistanceType.COSINE_DISTANCE)
                .indexType(PgVectorStore.PgIndexType.HNSW)
                .dimensions(1536)
                .build();
    }
}
```
Similarity Search

Basic Search
```java
@Service
public class RagService {

    private final VectorStore vectorStore;

    public List<String> searchSimilar(String query, int topK) {
        List<Document> results = vectorStore.similaritySearch(
                SearchRequest.builder()
                        .query(query)
                        .topK(topK)
                        .build()
        );
        return results.stream()
                .map(Document::getContent)
                .toList();
    }
}
```
Search with Filter Conditions
```java
public List<String> searchByKnowledgeBase(String query, Long knowledgeBaseId) {
    FilterExpressionBuilder b = new FilterExpressionBuilder();
    List<Document> results = vectorStore.similaritySearch(
            SearchRequest.builder()
                    .query(query)
                    .topK(5)
                    .filterExpression(b.eq("kb_id", knowledgeBaseId.toString()).build())
                    .build()
    );
    return results.stream().map(Document::getContent).toList();
}
```
RAG Question Answering
```java
@Service
@Slf4j
public class RagQuestionService {

    private final VectorStore vectorStore;
    private final ChatClient chatClient;

    public String answer(String question, Long knowledgeBaseId) {
        // 1. Retrieve relevant knowledge
        List<String> relevantDocs = searchByKnowledgeBase(question, knowledgeBaseId);
        if (relevantDocs.isEmpty()) {
            return "Sorry, no relevant content was found.";
        }
        // 2. Build the context
        String context = String.join("\n---\n", relevantDocs);
        // 3. Build the prompt
        String prompt = String.format("""
                Answer the user's question based on the knowledge base content below.
                If the knowledge base contains no relevant information, say you cannot answer.

                Knowledge base content:
                %s

                User question: %s
                """, context, question);
        // 4. Call the LLM
        return chatClient.prompt()
                .system("You are a professional knowledge-base Q&A assistant.")
                .user(prompt)
                .call()
                .content();
    }
}
```
Performance Optimization

1. Batch Limit for Vectorization
```java
// The DashScope embedding API accepts at most 10 texts per request
private static final int MAX_BATCH_SIZE = 10;
```
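Because of this limit, a list of N chunks must be split into ⌈N/10⌉ requests (ceiling division, as in `vectorBatchAdd`). A standalone sketch of the batching (names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class Batcher {

    // Split a list into consecutive batches of at most maxBatchSize elements
    static <T> List<List<T>> partition(List<T> items, int maxBatchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += maxBatchSize) {
            batches.add(items.subList(start, Math.min(start + maxBatchSize, items.size())));
        }
        return batches;
    }
}
```

For example, 25 chunks produce three requests of sizes 10, 10, and 5.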
2. Asynchronous Processing
```java
// Process asynchronously via Redis Stream so the upload request is not blocked
vectorizeStreamProducer.sendVectorizeTask(knowledgeBaseId, content);
```
3. Preloading Vectors
```java
// On startup, enqueue any knowledge bases still waiting to be vectorized
@PostConstruct
public void preloadKnowledge() {
    List<KnowledgeBaseEntity> kbs = knowledgeRepository.findByVectorStatus(VectorStatus.PENDING);
    for (KnowledgeBaseEntity kb : kbs) {
        vectorizeStreamProducer.sendVectorizeTask(kb.getId(), kb.getContent());
    }
}
```
4. Vector Index Optimization
```sql
-- PostgreSQL pgvector index
CREATE INDEX idx_vector_store_embedding
    ON vector_store
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);
```
Handling Vector Deletion
```java
@Transactional
public int deleteByKnowledgeBaseId(Long knowledgeBaseId) {
    // Operate on the vector table directly via JdbcTemplate
    String delSql = """
            DELETE FROM vector_store
            WHERE metadata->>'kb_id' = ?
               OR (metadata->>'kb_id_long' IS NOT NULL AND (metadata->>'kb_id_long')::bigint = ?)
            """;
    return jdbcTemplate.update(delSql, knowledgeBaseId.toString(), knowledgeBaseId);
}
```
Common Issues

1. Vectorization fails
Check the API quota
Make sure the text is not empty
Verify the chunk size is reasonable

2. Inaccurate search results
Tune the topK value
Improve the chunking strategy
Use a better embedding model

3. Performance problems
Summary

The vectorization pipeline in this project:

Upload → Tika parses the text → SHA-256 deduplication
Async → Redis Stream queue → consumer processing
Chunking → fixed-size chunks (800 tokens)
Storage → attach kb_id metadata → batch embedding
Retrieval → similarity search → RAG question answering

This pipeline delivers:

Automatic vectorization of uploaded knowledge bases
Asynchronous processing that does not block the main flow
Precise vector filtering (by knowledge base ID)
Complete error handling and retry