问题一:在tomcat中的工作线程是复用的,当我们使用ThreadLocal来传递当前线程的变量时,可能会出现信息错乱的问题
ThreadLocal是线程安全的,这个毋庸置疑;但是当ThreadLocal 加上线程池时就不一定是安全的了
话不多说,直接上代码
-
创建spring-boot项目
-
在application.properties文件中配置
1
2
3
|
server.port=8001
spring.application.name=error-thread
server.tomcat.threads.max=1 # 模拟达到当访问人数达到最大线程时
|
-
测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
private final ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> null);
@GetMapping("/getUser")
public String getUser(@RequestParam("uId") Integer uId) {
//设置值前先查询一次
String before = Thread.currentThread().getName() + ":" + threadLocal.get();
System.out.println("before" + before);
threadLocal.set(uId);
//设置完值后在查询一次
String after = Thread.currentThread().getName() + ":" + threadLocal.get();
System.out.println("after" + after);
return "";
}
|
-
访问http://localhost:8001/getUser?uId=1 以及 http://localhost:8001/getUser?uId=2,观察打印日志
1
2
3
4
|
beforehttp-nio-8001-exec-1:null
afterhttp-nio-8001-exec-1:1
beforehttp-nio-8001-exec-1:1
afterhttp-nio-8001-exec-1:2
|
观察上述日志可知,第一次请求没有问题,但是在第二次请求时却能获得上一个请求的数据,这样肯定会导致信息错乱
-
解决办法:每次使用完后都需要清除信息
private final ThreadLocal threadLocal = ThreadLocal.withInitial(() -> null);
1
2
3
4
5
6
7
8
9
10
11
12
|
@GetMapping("/getUser")
public String getUser(@RequestParam("uId") Integer uId) {
//设置值前先查询一次
String before = Thread.currentThread().getName() + ":" + threadLocal.get();
System.out.println("before" + before);
threadLocal.set(uId);
//设置完值后在查询一次
String after = Thread.currentThread().getName() + ":" + threadLocal.get();
System.out.println("after" + after);
threadLocal.remove();
return "";
}
|
日志:
1
2
3
4
|
beforehttp-nio-8001-exec-1:null
afterhttp-nio-8001-exec-1:1
beforehttp-nio-8001-exec-1:null
afterhttp-nio-8001-exec-1:2
|
这个时候就正常了
问题二:ConcurrentHashMap的线程不安全事件。
都知道ConcurrentHashMap是线程安全的键值对存储容器,但是当你使用不规范的时候,也是会发生意外的。
ConcurrentHashMap 的线程安全保证:
- 单个操作是原子的(如
put, get, putAll)
- 内部结构不会损坏
- 不会出现
ConcurrentModificationException
但无法保证:
请看下面代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
private static int ITEM_NUM = 1000;
private static int WORK_THREAD = 10;
private ConcurrentHashMap<String, Long> getData(int count) {
return LongStream.rangeClosed(1, count)
.boxed()
.collect(
Collectors.toConcurrentMap(i -> UUID.randomUUID().toString()
, Function.identity()
, (o1, o2) -> o1
, ConcurrentHashMap::new));
}
@GetMapping("/putData")
public String putData() throws InterruptedException {
ConcurrentHashMap<String, Long> data = getData(900);
log.info("初始化完后现有数量:" + data.size());
ForkJoinPool forkJoinPool = new ForkJoinPool(WORK_THREAD);
List<Callable<Void>> tasks = IntStream.rangeClosed(1, 10)
.mapToObj(i -> (Callable<Void>) () -> {
// synchronized (data) {
int size = ITEM_NUM - data.size();
log.info("剩余待添加元素:" + size);
if (size > 0) {
data.putAll(getData(size));
}
//}
return null;
})
.collect(Collectors.toList());
List<Future<Void>> futures = forkJoinPool.invokeAll(tasks);
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
log.info("添加完成后元素数量:" + data.size());
return "";
}
|
这个demo是使用十个线程往一个已经有900个元素的ConcurrentHashMap填充到1000。打印结果发现最后添加了1900个元素,问题就在与data.size() 和 data.putAll(getData(size))操作并不是原子性的,所以每个线程都可能得到其他线程还未添加进ConcurrentHashMap之前的size,导致出现了问题。
解决办法:
放开同步代码块,synchronized (data),这样就能保证同步代码块内的操作每次只有一个线程能访问。但是这样也就打打的降低了效率,而且也没有使用上ConcurrentHashMap的特性。
接下来我们再举一个例子来对比说明ConcurrentHashMap的妙用。
案例:
使用10个线程对10000000数量进行词频分析
先上同步代码块的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
//循环次数
private static int LOOP_COUNT = 10000000;
//线程数量
private static int THREAD_COUNT = 10;
//元素数量
private static int ITEM_COUNT = 1000;
private Map<String, Long> normalUse() throws InterruptedException {
ConcurrentHashMap<String, Long> freqs = new ConcurrentHashMap<>(ITEM_COUNT);
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(
() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
synchronized (freqs) {
if (freqs.containsKey(key)) {
freqs.put(key, freqs.get(key) + 1);
} else {
freqs.put(key, 1L);
}
}
})
);
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
return freqs;
}
|
再看看使用ConcurrentHashMap中自带的原子操作实现查询并插入的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
private Map<String, Long> goodUse() throws InterruptedException {
ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>(ITEM_COUNT);
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
freqs.computeIfAbsent(key, k -> new LongAdder()).increment();
}));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
return freqs.entrySet().stream().collect(Collectors.toMap(
e -> e.getKey(),
e -> e.getValue().longValue())
);
}
|
computeIfAbsent()方法表示如果该key未被发现,则进行初始化,否则执行其他操作
对比效率
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
@GetMapping("/comparison")
public String comparison() throws InterruptedException {
StopWatch stopWatch = new StopWatch();
stopWatch.start("normaluse");
Map<String, Long> normaluse = normalUse();
stopWatch.stop();
//校验元素数量
Assert.isTrue(normaluse.size() == ITEM_COUNT, "normaluse size error");
//校验累计总数
Assert.isTrue(normaluse.entrySet().stream()
.mapToLong(item -> item.getValue()).reduce(0, Long::sum) == LOOP_COUNT
, "normaluse count error");
stopWatch.start("gooduse");
Map<String, Long> gooduse = goodUse();
stopWatch.stop();
Assert.isTrue(gooduse.size() == ITEM_COUNT, "gooduse size error");
Assert.isTrue(gooduse.entrySet().stream()
.mapToLong(item -> item.getValue())
.reduce(0, Long::sum) == LOOP_COUNT
, "gooduse count error");
log.info(stopWatch.prettyPrint());
return "";
}
|
得到结果
1
2
3
4
5
|
----------------------------------------
Seconds % Task name
----------------------------------------
3.2756417 91% normaluse
0.3094116 09% gooduse
|
效果显著
接下来我们看看为什么computeIfAbsent这么快
首先看看computeIfAbsent方法内的大致逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
int hash = spread(key.hashCode());
Node<K,V>[] tab;
while (true) {
// 找到对应的bucket(数组槽位)
int i = (tab.length - 1) & hash;
Node<K,V> f = tabAt(tab, i);
if (f == null) {
// 空bucket:使用CAS创建新节点
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break;
} else {
// 非空bucket:同步这个bucket
synchronized (f) {
// 在bucket内链表/树中查找或插入
// 只锁这个bucket,不影响其他bucket
}
}
}
}
|
从上面可以看到,synchronized同步代码块的范围很小,只锁住了key的所在bucket。而由我们自己去加锁的方式去实现的添加则是将整个map都锁上了,所以对于所有操作都是串行的。
问题三:不考虑实际读写情况滥用CopyOnWriteArrayList,导致接口超时。
我们写一段测试代码,来比较下使用 CopyOnWriteArrayList 和普通加锁方式 ArrayList 的读写性能吧。在这段代码中我们针对并发读和并发写分别写了一个测试方法,测试两者一 定次数的写或读操作的耗时。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
//测试并发写的性能
@GetMapping("write")
public String testWrite() {
CopyOnWriteArrayList<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<Integer>());
StopWatch stopWatch = new StopWatch();
int loopCount = 100000;
stopWatch.start("Write:copyOnWriteArrayList");
//循环100000次并发往CopyOnWriteArrayList写入随机元素
IntStream.rangeClosed(1,loopCount).parallel().forEach(__->copyOnWriteArrayList.add(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
stopWatch.start("Write:synchronizedList");
//循环100000次并发往synchronizedList写入随机元素
IntStream.rangeClosed(1,loopCount).parallel().forEach(__->synchronizedList.add(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
log.info(stopWatch.prettyPrint());
return "success";
}
|
结果
1
2
3
4
5
|
----------------------------------------
Seconds % Task name
----------------------------------------
3.5899543 99% Write:copyOnWriteArrayList
0.024815 01% Write:synchronizedList
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
//测试并发写的性能
@GetMapping("read")
public String testRead() {
CopyOnWriteArrayList<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<Integer>());
//先添加号元素
addAll(copyOnWriteArrayList);
addAll(synchronizedList);
StopWatch stopWatch = new StopWatch();
int loopCount = 1000000;
stopWatch.start("Read:copyOnWriteArrayList");
//循环100000次并发从CopyOnWriteArrayList查询元素
IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.get(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
stopWatch.start("Read:synchronizedList");
//循环100000次并发从synchronizedList查询元素
IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> synchronizedList.get(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
log.info(stopWatch.prettyPrint());
return "success";
}
|
结果
1
2
3
4
5
|
----------------------------------------
Seconds % Task name
----------------------------------------
0.0091182 04% Read:copyOnWriteArrayList
0.2139405 96% Read:synchronizedList
|
从上面代码来看CopyOnWriteArrayList和普通加锁方式 ArrayList 的读写性能是两种截然相反的结果。CopyOnWriteArrayList在写操作时性能时简直是太糟糕了,而普通加锁方式 ArrayList 在读时性能又比CopyOnWriteArrayList逊色太多。所以要评估自己需求,来决定是否要使用CopyOnWriteArrayList。
为什么CopyOnWriteArrayList的写入性能会这么差呢?
原因就是它每次执行插入操作时都会进行复制Object[] newElements = Arrays.copyOf(elements, len + 1);
问题四:创建线程池Executors不规范导致程序出现OOM
eg1:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
/**
* newFixedThreadPool 创建线程池时的OOM
*/
public void test2(){
ExecutorService executorService = Executors.newFixedThreadPool(1);
for (int i = 0; i < 100000000; i++) {
executorService.execute(()->{
String payload = IntStream.rangeClosed(1,1000000)
.mapToObj(__->"a")
.collect(Collectors.joining("")) + UUID.randomUUID().toString();
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(payload);
});
}
}
|
由newFixedThreadPool()创建线程的参数如下
1
2
3
4
5
|
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
|
其中new LinkedBlockingQueue<Runnable>()的大小是Integer.MAX_VALUE , 当任务量多并且任务的处理时间比较慢时,LinkedBlockingQueue带运行任务队列中数量激增,导致OOM
eg2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
/**
* newCachedThreadPool 创建线程池时的OOM
*/
public void test3(){
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 100000000; i++) {
executorService.execute(()->{
String payload = IntStream.rangeClosed(1,1000000)
.mapToObj(__->"a")
.collect(Collectors.joining("")) + UUID.randomUUID().toString();
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(payload);
});
}
}
|
由newCachedThreadPool()创建线程的参数如下
1
2
3
4
5
|
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
|
SynchronousQueue 是一个没有存储空间的阻塞队列 ,而且这里设置的最大线程数是Integer.MAX_VALUE,当任务量多并且任务的处理时间比较慢时,会一直创建线程,导致OOM。
我们定义一个方法来监控线程池状态
1
2
3
4
5
6
7
8
9
10
|
public void printStatus(ThreadPoolExecutor threadPoolExecutor) {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
System.out.println("========================");
System.out.println("线程数:" + threadPoolExecutor.getPoolSize());
System.out.println("活跃线程数:" + threadPoolExecutor.getActiveCount());
System.out.println("完成了多少任务:" + threadPoolExecutor.getCompletedTaskCount());
System.out.println("队列中还有多少积压:" + threadPoolExecutor.getQueue().size());
System.out.println("========================");
}, 0, 1, TimeUnit.SECONDS);
}
|
自定义一个线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
public void test4() throws InterruptedException {
//具有 2 个核心线程、5 个最大线程、使用容量为 10
//的 ArrayBlockingQueue 阻塞队列作为工作队列,使用默认的 AbortPolicy 拒绝策略,也
//就是任务添加到线程池失败会抛出 RejectedExecutionException。此外,我们借助了
//Jodd 类库的 ThreadFactoryBuilder 方法来构造一个线程工厂,实现线程池线程的自定义
//命名。
AtomicInteger atomicInteger = new AtomicInteger();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 5, 5, TimeUnit.SECONDS
, new ArrayBlockingQueue<>(10)
, new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").get()
, new ThreadPoolExecutor.AbortPolicy()
);
printStatus(threadPoolExecutor);
IntStream.rangeClosed(1,20).forEach(i->{
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
int id = atomicInteger.incrementAndGet();
try {
threadPoolExecutor.submit(()->{
System.out.println(id + "start。。。。。");
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(id + "end。。。。。");
});
} catch (Exception e) {
System.out.println(id + "error submit。。。。。");
throw new RuntimeException(e);
}
});
TimeUnit.SECONDS.sleep(60);
}
|
运行结果打印
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
========================
线程数:0
活跃线程数:0
完成了多少任务:0
队列中还有多少积压:0
========================
1start。。。。。
========================
线程数:1
活跃线程数:1
完成了多少任务:0
队列中还有多少积压:0
========================
....... 省略一部分
========================
线程数:2
活跃线程数:2
完成了多少任务:2
队列中还有多少积压:8
========================
========================
线程数:2
活跃线程数:2
完成了多少任务:2
队列中还有多少积压:9
========================
========================
线程数:2
活跃线程数:2
完成了多少任务:2
队列中还有多少积压:10
========================
15start。。。。。
========================
线程数:3
活跃线程数:3
完成了多少任务:2
队列中还有多少积压:10
========================
16start。。。。。
========================
线程数:4
活跃线程数:4
完成了多少任务:2
队列中还有多少积压:10
========================
17start。。。。。
========================
线程数:5
活跃线程数:5
完成了多少任务:2
队列中还有多少积压:10
========================
18error submit。。。。。
Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@30946e09[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@6e5e91e4[Wrapped task = com.error.thread.thread.ThreadDemo4$$Lambda$24/0x000001eb1b003450@2cdf8d8a]] rejected from java.util.concurrent.ThreadPoolExecutor@5cb0d902[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 2]
at com.error.thread.thread.ThreadDemo4.lambda$test4$8(ThreadDemo4.java:127)
at java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
at java.base/java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:617)
at com.error.thread.thread.ThreadDemo4.test4(ThreadDemo4.java:107)
at com.error.thread.thread.ThreadDemo4.main(ThreadDemo4.java:137)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@30946e09[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@6e5e91e4[Wrapped task = com.error.thread.thread.ThreadDemo4$$Lambda$24/0x000001eb1b003450@2cdf8d8a]] rejected from java.util.concurrent.ThreadPoolExecutor@5cb0d902[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 2]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:123)
at com.error.thread.thread.ThreadDemo4.lambda$test4$8(ThreadDemo4.java:116)
... 4 more
....... 省略一部分
========================
线程数:2
活跃线程数:0
完成了多少任务:17
队列中还有多少积压:0
========================
|
⭐⭐⭐根据自定义线程池检查结果,我们可以总结出线程池的执行逻辑如下
- 当有任务提交到线程池中,使用核心线程执行
- 当核心线程使用完后再有任务提交到线程池中时,不会立马扩充线程,而是会将任务先加入到工作队列中
- 当工作队列到达指定的容量大小时,会开始扩充线程,最大数量不可超过指定最大线程数
- 当达到最大线程数还有任务提交时,则会使用指定的拒绝策略。
- 最后当线程数大于核心线程数时,且在指定
keepAliveTime内没有任务提交过来,则会进行收缩核心线程数到指定数量。
由于线程池在工作队列满了无法入队的情况下会扩容线程池,那么我们是否可以重写队 列的 offer 方法,造成这个队列已满的假象呢?
自定义工作队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
public class MyArrayBlockingQueue<E> extends ArrayBlockingQueue<E> {
// 持有线程池引用,用于获取实时线程数和最大线程数
private ThreadPoolExecutor threadPoolExecutor;
public MyArrayBlockingQueue(int capacity, ThreadPoolExecutor threadPoolExecutor) {
super(capacity);
this.threadPoolExecutor = threadPoolExecutor;
}
public MyArrayBlockingQueue(int capacity, boolean fair, ThreadPoolExecutor threadPoolExecutor) {
super(capacity, fair);
this.threadPoolExecutor = threadPoolExecutor;
}
public MyArrayBlockingQueue(int capacity, boolean fair, Collection c, ThreadPoolExecutor threadPoolExecutor) {
super(capacity, fair, c);
this.threadPoolExecutor = threadPoolExecutor;
}
@Override
public boolean offer(E o) {
Objects.requireNonNull(o);
if (threadPoolExecutor.getMaximumPoolSize() > threadPoolExecutor.getPoolSize()) {
return false;
}
return super.offer(o);
}
|
创建线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
public static ThreadPoolExecutor newMyThreadPoolExecutor() {
/**
* 由于线程池在工作队列满了无法入队的情况下会扩容线程池,那么我们是否可以重写队
* 列的 offer 方法,造成这个队列已满的假象呢?
*/
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 5, TimeUnit.SECONDS
, new ArrayBlockingQueue<>(10) // 临时的,等下通过反射进行替换
, new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").get()
, new ThreadPoolExecutor.AbortPolicy()
);
//通过反射替换工作队列
MyArrayBlockingQueue workQueue = new MyArrayBlockingQueue<>(10, threadPoolExecutor);
try {
Field workQueueField = ThreadPoolExecutor.class.getDeclaredField("workQueue");
workQueueField.setAccessible(true);
workQueueField.set(threadPoolExecutor , workQueue);
} catch (Exception e) {
throw new RuntimeException("替换队列失败", e);
}
return threadPoolExecutor;
}
|
⭐⭐⭐要根据任务的“轻重缓急”来指定线程池的核心 参数,包括线程数、回收策略和任务队列:
对于执行比较慢、数量不大的 IO 任务,或许要考虑更多的线程数,而不需要太大的队 列。 而对于吞吐量较大的计算型任务,线程数量不宜过多,可以是 CPU 核数或核数 *2(理 由是,线程一定调度到某个 CPU 进行执行,如果任务本身是 CPU 绑定的任务,那么过 多的线程只会增加线程切换的开销,并不能提升吞吐量),但可能需要较长的队列来做 缓冲。