前言
很多 AI 项目只停留在调用一个 API。而真实的工程问题包括:如何处理大模型响应慢的问题?如何让大模型输出格式固定?如何让大模型基于私有文档回答?
本文聚焦工程化落地:异步处理、事务安全、稳定性、成本控制。
一、异步任务场景
1.1 典型异步任务
| 场景 |
说明 |
耗时 |
| 知识库向量化 |
文档切分、生成向量嵌入并存储到 pgvector |
5-30 秒 |
| 简历 AI 分析 |
调用 LLM 对简历进行评分和建议生成 |
5-15 秒 |
| 面试报告生成 |
对面试会话进行综合评估,生成详细报告 |
5-20 秒 |
1.2 为什么需要异步处理
- 用户体验:上传即返回,后台稳如狗
- 避免超时:LLM 响应 10 秒~1 分钟,同步等待容易触发 HTTP 超时
- 系统稳定性:防止同步阻塞导致服务雪崩
二、Redis Stream 异步处理
2.1 为什么选择 Redis Stream
| 特性 |
Redis Stream |
Redis List |
Redis pub/sub |
Kafka |
RabbitMQ |
| 消费者组 |
✅ 原生支持 |
❌ 需自己实现 |
❌ 不支持 |
✅ 支持 |
✅ 支持 |
| 消息确认 |
✅ ACK 机制 |
❌ 无 |
❌ 无 |
✅ 支持 |
✅ 支持 |
| 消息持久化 |
✅ 支持 |
✅ 支持 |
❌ 不支持 |
✅ 支持 |
✅ 支持 |
| 消息回溯 |
✅ 支持 |
❌ 不支持 |
❌ 不支持 |
✅ 支持 |
❌ 不支持 |
| 部署复杂度 |
低 |
低 |
低 |
高 |
中 |
| 适用规模 |
中小规模 |
简单队列 |
实时通知 |
大规模 |
中大规模 |
选择 Redis Stream 的理由:
- 复用现有基础设施:项目已经使用 Redis 做缓存
- 消费者组支持:天然支持多实例部署
- 消息确认机制:通过 ACK 机制确保消息不丢失
- 轻量级:相比 Kafka/RabbitMQ,运维成本更低
2.2 消息队列对比
| 维度 |
Redis Stream |
RabbitMQ |
Kafka |
| 吞吐量 |
高(十万级 QPS) |
中(万级 QPS) |
极高(百万级) |
| 延迟 |
极低(亚毫秒级) |
低(毫秒级) |
中(毫秒到十毫秒级) |
| 消息堆积 |
一般(受限于内存) |
中(磁盘堆积) |
极强(TB 级) |
| 可靠性 |
中 |
高 |
极高 |
2.3 状态流转
1
2
3
|
PENDING → PROCESSING → COMPLETED
↓
FAILED
|
| 状态 |
说明 |
| PENDING |
用户刚上传文件,任务已写入 DB 并推送到 Stream |
| PROCESSING |
消费者从 Stream 拉取到消息,开始执行业务逻辑 |
| COMPLETED |
消费者业务逻辑执行成功,结果已保存 |
| FAILED |
消费者处理过程中发生异常 |
2.4 核心实现
通用常量定义:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public final class AsyncTaskStreamConstants {
// 通用消费者配置
public static final int MAX_RETRY_COUNT = 3; // 最大重试次数
public static final int BATCH_SIZE = 10; // 每次拉取消息数
public static final long POLL_INTERVAL_MS = 1000; // 轮询间隔
// 知识库向量化
public static final String KB_VECTORIZE_STREAM_KEY = "knowledgebase:vectorize:stream";
public static final String KB_VECTORIZE_GROUP_NAME = "vectorize-group";
// 简历分析
public static final String RESUME_ANALYZE_STREAM_KEY = "resume:analyze:stream";
public static final String RESUME_ANALYZE_GROUP_NAME = "analyze-group";
}
|
生产者实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
public abstract class AbstractStreamProducer<T> {
private final RedisService redisService;
public void sendMessage(T payload) {
Map<String, String> message = buildMessage(payload);
try {
// 写入 Stream
redisService.streamAdd(getStreamKey(), message);
} catch (Exception e) {
// 发送失败时更新状态
onSendFailed(payload, e);
}
}
}
|
消费者实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
public abstract class AbstractStreamConsumer<T> {
public void start() {
// 初始化消费者组
redisService.streamCreateGroup(getStreamKey(), getGroupName());
// 消费循环
while (running) {
List<Map<String, String>> messages =
redisService.streamRead(getStreamKey(), getGroupName(), getConsumerName());
for (Map<String, String> message : messages) {
try {
T payload = parsePayload(message);
processBusiness(payload);
ack(message.getId());
} catch (Exception e) {
handleError(message, e);
}
}
}
}
}
|
2.5 Template Method 模式
为什么提取抽象类?
三条异步链路(向量化、简历分析、面试评估)的 Producer/Consumer"骨架代码"高度重复:
- Consumer 都有:初始化消费者组、生成唯一 consumerName、启动循环、ACK、重试
- Producer 都有:组装消息、写入 Stream、发送失败后的状态兜底
采用 Template Method 做收敛:
1
2
3
4
5
6
|
AbstractStreamConsumer<T>
├── init() // 固定:初始化消费者组
├── consumeLoop() // 固定:消费循环
├── parsePayload() // 钩子:解析消息
├── processBusiness() // 钩子:业务处理
└── ack()/retry() // 固定:确认/重试
|
三、数据库与事务安全
3.1 事务反模式
错误做法:
1
2
3
4
5
6
7
8
|
@Transactional
public void processResume(Long resumeId) {
// ❌ 禁止在事务内调用 LLM!
ResumeAnalysisResult result = llmClient.analyze(resumeContent);
// 业务保存
resumeRepository.update(resumeId, result);
}
|
问题:LLM 调用可能耗时 10~60 秒,长时间占用数据库连接,导致连接池耗尽。
3.2 正确做法:事务极小化
1
2
3
4
5
6
7
8
9
10
11
12
13
|
public void processResume(Long resumeId) {
// 1. LLM 调用游离在事务外
ResumeAnalysisResult result = llmClient.analyze(resumeContent);
// 2. 校验结果
validate(result);
// 3. 简短事务落盘
transactionTemplate.execute(status -> {
resumeRepository.update(resumeId, result);
return true;
});
}
|
3.3 事务边界设计原则
| 原则 |
说明 |
| LLM 调用在事务外 |
LLM 调用耗时长,不能占用数据库连接 |
| 事务极小化 |
只包含必要的数据库操作 |
| 结果校验 |
LLM 输出必须校验后再落盘 |
四、高并发与流式响应
4.1 线程池雪崩风险
问题:LLM 响应 10 秒~1 分钟,传统 Spring MVC 同步阻塞会卡死 Tomcat 线程。
1
2
3
4
5
6
|
Tomcat 线程池 (200 线程)
├── 请求1 ──▶ LLM 调用(等待30秒) ──占用线程──▶ 响应返回
├── 请求2 ──▶ LLM 调用(等待30秒) ──占用线程──▶ ...
├── ...
├── 请求199 ──▶ LLM 调用(等待30秒) ──占用线程──▶ ...
└── 请求200 ──✗ 拒绝服务
|
4.2 解决方案:SseEmitter + 异步线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamChat(@RequestParam String question) {
SseEmitter emitter = new SseEmitter();
// 异步处理,不占用 Tomcat 线程
asyncExecutor.execute(() -> {
try {
chatClient.prompt()
.system(systemPrompt)
.user(question)
.stream()
.content()
.subscribe(chunk -> {
emitter.send(SseEmitter.event()
.data(chunk.replace("\n", "\\n")));
});
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}
|
4.3 虚拟线程配置
1
2
3
4
|
spring:
threads:
virtual:
enabled: true # 启用 Java 21 虚拟线程
|
虚拟线程 vs 平台线程:
| 场景 |
平台线程 |
虚拟线程 |
| 200 并发 SSE |
线程池满,排队 |
轻松处理 |
| AI 调用等待 3 秒 |
线程阻塞,占用资源 |
自动挂起,让出资源 |
| 10000 并发请求 |
拒绝服务 |
正常处理 |
五、分布式限流
5.1 限流必要性
| 风险 |
后果 |
| 用户恶意刷接口 |
资源耗尽,正常用户无法访问 |
| LLM API 限流 |
请求被拒绝,服务不可用 |
| Token 消耗失控 |
Agent 死循环导致账单爆炸 |
5.2 Redis + Lua 限流实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
-- 滑动窗口限流
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local expire = tonumber(ARGV[4])
-- 删除窗口外的旧数据
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
-- 统计当前窗口内的请求数
local count = redis.call('ZCARD', key)
if count < limit then
-- 未超限,写入请求
redis.call('ZADD', key, now, now)
redis.call('EXPIRE', key, expire)
return 1
else
return 0
end
|
5.3 多维度限流
| 维度 |
说明 |
配置 |
| 用户维度 |
限制单个用户 |
user:{userId}:rate |
| IP 维度 |
限制单个 IP |
ip:{ip}:rate |
| 接口维度 |
限制单个接口 |
api:{apiPath}:rate |
| Token 维度 |
限制 Token 消耗 |
user:{userId}:token |
5.4 限流组件封装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
@Component
public class RateLimiter {
private final RedisTemplate<String, String> redisTemplate;
public boolean tryAcquire(String key, int limit, long windowMs) {
String luaScript = loadLuaScript("rate_limiter.lua");
return redisTemplate.execute(
new DefaultRedisScript<>(luaScript, Long.class),
Collections.singletonList(key),
String.valueOf(limit),
String.valueOf(windowMs),
String.valueOf(System.currentTimeMillis()),
String.valueOf(windowMs / 1000)
) == 1L;
}
}
|
六、FinOps 成本控制
6.1 LLM 成本构成
| 成本项 |
说明 |
控制策略 |
| Input Tokens |
Prompt 消耗 |
提示词压缩、语义缓存 |
| Output Tokens |
回复消耗 |
限制 max_tokens |
| API 调用次数 |
请求频率 |
限流、批处理 |
| 重试开销 |
失败重试 |
指数退避、熔断 |
6.2 Token 计量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
@Component
public class TokenCounter {
public void countTokens(String prompt, String completion) {
int promptTokens = countTokens(prompt);
int completionTokens = countTokens(completion);
// 记录到 Prometheus
tokenCounter.labels("prompt").inc(promptTokens);
tokenCounter.labels("completion").inc(completionTokens);
// 记录到日志
log.info("Token 消耗 - prompt: {}, completion: {}, total: {}",
promptTokens, completionTokens, promptTokens + completionTokens);
}
}
|
6.3 成本监控
1
2
3
4
5
|
management:
metrics:
export:
prometheus:
enabled: true
|
关键指标:
| 指标 |
说明 |
告警阈值 |
llm_tokens_total |
Token 总消耗 |
单日消耗 > 预算 80% |
llm_requests_total |
请求次数 |
QPS 异常波动 |
llm_cost_dollars |
成本金额 |
超过日预算 |
llm_latency_seconds |
响应延迟 |
P99 > 30s |
6.4 成本预警
1
2
3
4
5
6
7
8
9
10
11
|
@Scheduled(cron = "0 0 * * * ?") // 每小时检查
public void checkCostBudget() {
BigDecimal dailyCost = calculateDailyCost();
if (dailyCost.compareTo(budgetThreshold) > 0) {
// 发送告警
alertService.send(AlertChannel.DINGTALK,
"LLM 成本预警:今日消耗 $" + dailyCost +
",超过预算 $" + budgetThreshold);
}
}
|
6.5 成本优化策略
| 策略 |
说明 |
效果 |
| 语义缓存 |
相似问题命中缓存 |
拦截 30~50% LLM 调用 |
| 提示词压缩 |
减少无效上下文 |
节省 20~30% Token |
| 模型降级 |
主模型失败切备用 |
保证可用性 |
| 批量处理 |
合并多次调用 |
减少固定开销 |
七、安全防护
7.1 Prompt Injection 防护
攻击形式:用户恶意输入篡改 System Prompt
防护策略:
1
2
3
4
5
6
7
8
|
public String sanitizeUserInput(String input) {
// 移除可能破坏 Prompt 结构的符号
return input.replace("<script", "<script")
.replace("</script>", "</script>")
.replace("<iframe", "<iframe")
.replace("ignore", "❌ignore")
.trim();
}
|
7.2 权限校验
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
@Aspect
@Component
public class ToolPermissionAspect {
@Before("@annotation(toolCall)")
public void checkPermission(ToolCall toolCall) {
String userId = SecurityContext.getCurrentUserId();
String toolName = toolCall.value();
if (!permissionService.hasPermission(userId, toolName)) {
throw new PermissionDeniedException("用户 " + userId + " 无权限调用工具 " + toolName);
}
}
}
|
7.3 工具调用超时熔断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
@Bean
public ToolExecutor toolExecutor() {
return new ToolExecutor() {
@Override
public Object execute(ToolRequest request) {
// 5 秒超时
return CompletableFuture.supplyAsync(() -> toolService.execute(request))
.orTimeout(5, TimeUnit.SECONDS)
.exceptionally(ex -> {
log.error("工具执行超时: {}", request.getToolName());
throw new ToolExecutionException("工具执行超时");
})
.join();
}
};
}
|
八、常见问题与解决方案
| 问题 |
原因 |
解决方案 |
| 线程池耗尽 |
同步调用 LLM 阻塞 |
虚拟线程 + 异步处理 |
| 事务超时 |
LLM 调用在事务内 |
事务极小化 |
| 成本失控 |
Token 消耗无监控 |
语义缓存 + 成本预警 |
| 限流误杀 |
滑动窗口配置不合理 |
多维度限流 + 白名单 |
| 消息丢失 |
消费者未 ACK |
Redis Stream ACK 机制 |
九、总结
工程化落地的核心问题与解决方案:
- 异步处理:Redis Stream 实现"上传即返回,后台稳如狗"
- 事务安全:LLM 调用游离在事务外,事务极小化
- 高并发:虚拟线程 + SseEmitter 流式响应
- 分布式限流:Redis + Lua 多维度限流
- 成本控制:Token 计量 + Prometheus + 成本预警
- 安全防护:输入清洗 + 权限校验 + 超时熔断