
Business Development Pitfalls: Code Edition (Threads)

Problem 1: Tomcat worker threads are reused, so when we use ThreadLocal to pass per-thread variables, data from one request can leak into another.

ThreadLocal itself is unquestionably thread-safe; but combine ThreadLocal with a thread pool and that safety is no longer guaranteed.

Let's go straight to the code.

  1. Create a Spring Boot project

  2. Configure application.properties

    server.port=8001
    spring.application.name=error-thread
    # cap Tomcat at one worker thread, to simulate all workers being busy
    # (note: in a .properties file, '#' only starts a comment at the beginning of a line,
    # so the note must not be appended after the value)
    server.tomcat.threads.max=1
    
  3. Test code

     private final ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> null);
    
    
        @GetMapping("/getUser")
        public String getUser(@RequestParam("uId") Integer uId) {
        //read once before setting the value
            String before = Thread.currentThread().getName() + ":" + threadLocal.get();
            System.out.println("before" + before);
            threadLocal.set(uId);
        //read again after setting the value
            String after = Thread.currentThread().getName() + ":" + threadLocal.get();
            System.out.println("after" + after);
            return "";
        }
    
  4. Hit http://localhost:8001/getUser?uId=1 and then http://localhost:8001/getUser?uId=2, and watch the log output

     beforehttp-nio-8001-exec-1:null
     afterhttp-nio-8001-exec-1:1
     beforehttp-nio-8001-exec-1:1
     afterhttp-nio-8001-exec-1:2
    

From the log above, the first request looks fine, but the second request can read the previous request's value, which is guaranteed to cause data mix-ups.

  1. Fix: always clear the value after every use

    private final ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> null);

    @GetMapping("/getUser")
    public String getUser(@RequestParam("uId") Integer uId) {
    //read once before setting the value
        String before = Thread.currentThread().getName() + ":" + threadLocal.get();
        System.out.println("before" + before);
        threadLocal.set(uId);
    //read again after setting the value
        String after = Thread.currentThread().getName() + ":" + threadLocal.get();
        System.out.println("after" + after);
        threadLocal.remove();
        return "";
    }
    

    Log:

    beforehttp-nio-8001-exec-1:null
    afterhttp-nio-8001-exec-1:1
    beforehttp-nio-8001-exec-1:null
    afterhttp-nio-8001-exec-1:2
    

    Now the output is correct.
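Note that if the handler throws before reaching threadLocal.remove(), the stale value still leaks back into the pool. A more defensive variant of the same endpoint (a minimal sketch) does the cleanup in a finally block:

    @GetMapping("/getUser")
    public String getUser(@RequestParam("uId") Integer uId) {
        try {
            threadLocal.set(uId);
            // ... business logic that reads threadLocal ...
            return "";
        } finally {
            // runs even if the handler throws, so the pooled
            // worker thread never carries state into the next request
            threadLocal.remove();
        }
    }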

Problem 2: ConcurrentHashMap used in a thread-unsafe way.

Everyone knows ConcurrentHashMap is a thread-safe key-value container, but used carelessly it can still surprise you.

What ConcurrentHashMap does guarantee:

  • Single operations are atomic (such as put and get; putAll, despite appearances, is not atomic as a whole)
  • Its internal structure never gets corrupted
  • You never get a ConcurrentModificationException

What it cannot guarantee:

  • Atomicity of compound operations
  • Correctness of your business logic

Consider the following code:

private static int ITEM_NUM = 1000;

private static int WORK_THREAD = 10;


private ConcurrentHashMap<String, Long> getData(int count) {
    // build a map of `count` entries keyed by random UUIDs
    return LongStream.rangeClosed(1, count)
            .boxed()
            .collect(
                    Collectors.toConcurrentMap(i -> UUID.randomUUID().toString()
                            , Function.identity()
                            , (o1, o2) -> o1
                            , ConcurrentHashMap::new));
}


@GetMapping("/putData")
public String putData() throws InterruptedException {
    ConcurrentHashMap<String, Long> data = getData(900);
    log.info("size after initialization: " + data.size());
    ForkJoinPool forkJoinPool = new ForkJoinPool(WORK_THREAD);
    List<Callable<Void>> tasks = IntStream.rangeClosed(1, WORK_THREAD)
            .mapToObj(i -> (Callable<Void>) () -> {
                // synchronized (data) {
                    int size = ITEM_NUM - data.size();
                    log.info("elements still to add: " + size);
                    if (size > 0) {
                        data.putAll(getData(size));
                    }
                //}
                return null;
            })
            .collect(Collectors.toList());

    // invokeAll blocks until every task has completed
    List<Future<Void>> futures = forkJoinPool.invokeAll(tasks);
    forkJoinPool.shutdown();
    forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
    log.info("size after all tasks finished: " + data.size());
    return "";
}

This demo uses ten threads to top up a ConcurrentHashMap that already holds 900 elements to a target of 1000. The log shows the map ends up with about 1900 elements instead. The culprit: data.size() followed by data.putAll(getData(size)) is not atomic, so each thread can read the size before the other threads' inserts have landed, conclude that 100 elements are still missing, and add its own 100.

Fix:

Enable the synchronized (data) block, which guarantees that only one thread at a time executes the code inside it. Of course, this greatly reduces concurrency, and it means we are not really using any of ConcurrentHashMap's strengths.
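Concretely, uncommenting the block from the demo makes each task re-check the size inside the lock, so the check and the insert happen as one unit:

    Callable<Void> task = () -> {
        synchronized (data) {
            // size() and putAll() can no longer interleave
            // with another thread's insertions
            int size = ITEM_NUM - data.size();
            if (size > 0) {
                data.putAll(getData(size));
            }
        }
        return null;
    };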

Next, a second example to contrast this with ConcurrentHashMap used well.

Case study:

Run a word-frequency count over 10,000,000 samples with 10 threads

First, the synchronized-block version

//number of iterations
private static int LOOP_COUNT = 10000000;
//number of threads
private static int THREAD_COUNT = 10;
//number of distinct keys
private static int ITEM_COUNT = 1000;

private Map<String, Long> normalUse() throws InterruptedException {
    ConcurrentHashMap<String, Long> freqs = new ConcurrentHashMap<>(ITEM_COUNT);
    ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
    forkJoinPool.execute(
            () -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
                String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
                //lock the whole map just to do a check-then-put
                synchronized (freqs) {
                    if (freqs.containsKey(key)) {
                        freqs.put(key, freqs.get(key) + 1);
                    } else {
                        freqs.put(key, 1L);
                    }
                }
            })
    );
    forkJoinPool.shutdown();
    forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
    return freqs;
}

Now the version that uses ConcurrentHashMap's own atomic check-then-insert

private Map<String, Long> goodUse() throws InterruptedException {
    ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>(ITEM_COUNT);
    ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
    forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
        String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
        freqs.computeIfAbsent(key, k -> new LongAdder()).increment();

    }));
    forkJoinPool.shutdown();
    forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
    return freqs.entrySet().stream().collect(Collectors.toMap(
            e -> e.getKey(),
            e -> e.getValue().longValue())
    );
}

computeIfAbsent() atomically initializes the value when the key is absent, and otherwise just returns the existing mapping; here every call hands back the key's LongAdder, which we then increment.
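If you would rather keep plain Long values instead of LongAdder, merge() offers an equivalent single-call atomic update (a sketch, assuming freqs is a ConcurrentHashMap<String, Long>):

    // atomically inserts 1 for a new key, or adds 1 to the existing count
    freqs.merge(key, 1L, Long::sum);

LongAdder still tends to win under heavy contention on hot keys, because its striped counters avoid repeatedly locking the same bin to replace the value.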

Comparing performance

@GetMapping("/comparison")
public String comparison() throws InterruptedException {
    StopWatch stopWatch = new StopWatch();
    stopWatch.start("normaluse");
    Map<String, Long> normaluse = normalUse();
    stopWatch.stop();
    //verify the number of distinct keys
    Assert.isTrue(normaluse.size() == ITEM_COUNT, "normaluse size error");
    //verify the total count across all keys
    Assert.isTrue(normaluse.entrySet().stream()
                    .mapToLong(item -> item.getValue()).reduce(0, Long::sum) == LOOP_COUNT
            , "normaluse count error");
    stopWatch.start("gooduse");
    Map<String, Long> gooduse = goodUse();
    stopWatch.stop();
    Assert.isTrue(gooduse.size() == ITEM_COUNT, "gooduse size error");
    Assert.isTrue(gooduse.entrySet().stream()
                    .mapToLong(item -> item.getValue())
                    .reduce(0, Long::sum) == LOOP_COUNT
            , "gooduse count error");
    log.info(stopWatch.prettyPrint());

    return "";
}

Result

----------------------------------------
Seconds       %       Task name
----------------------------------------
3.2756417     91%     normaluse
0.3094116     09%     gooduse

A dramatic difference.

So why is computeIfAbsent so much faster?

Let's first look at the rough logic inside computeIfAbsent (simplified from the JDK source; tabAt, casTabAt, and spread are the real internals)

public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
    int hash = spread(key.hashCode());
    Node<K,V>[] tab = table;

    while (true) {
        // locate the bucket (array slot) for this hash
        int i = (tab.length - 1) & hash;
        Node<K,V> f = tabAt(tab, i);
        if (f == null) {
            // empty bucket: create the new node with a CAS, no lock at all
            V value = mappingFunction.apply(key);
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                return value;
        } else {
            // non-empty bucket: synchronize on its head node only
            synchronized (f) {
                // search or insert within this bucket's list/tree;
                // every other bucket stays fully accessible to other threads
            }
        }
    }
}

As the sketch shows, the synchronized block covers a very small scope: it locks only the bucket the key hashes into. Our hand-rolled synchronized (freqs) version, by contrast, locks the entire map, so every operation is serialized.

Problem 3: Using CopyOnWriteArrayList without considering the actual read/write mix, causing request timeouts.

Let's write some test code comparing the read and write performance of CopyOnWriteArrayList against a plain lock-based ArrayList. We write one test method for concurrent writes and one for concurrent reads, each timing a fixed number of operations on both collections.

//benchmark concurrent writes
@GetMapping("write")
public String testWrite() {
    CopyOnWriteArrayList<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
    List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<Integer>());
    StopWatch stopWatch = new StopWatch();
    int loopCount = 100000;
    stopWatch.start("Write:copyOnWriteArrayList");
    //concurrently add 100,000 random elements to the CopyOnWriteArrayList
    IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.add(ThreadLocalRandom.current().nextInt(loopCount)));
    stopWatch.stop();
    stopWatch.start("Write:synchronizedList");
    //concurrently add 100,000 random elements to the synchronizedList
    IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> synchronizedList.add(ThreadLocalRandom.current().nextInt(loopCount)));
    stopWatch.stop();
    log.info(stopWatch.prettyPrint());
    return "success";
}

Result

----------------------------------------
Seconds       %       Task name
----------------------------------------
3.5899543     99%     Write:copyOnWriteArrayList
0.024815      01%     Write:synchronizedList

And the read benchmark:

//benchmark concurrent reads
@GetMapping("read")
public String testRead() {
    CopyOnWriteArrayList<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
    List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<Integer>());
    //pre-fill both lists (addAll is a helper, not shown, that adds loopCount
    //elements so the random-index reads below always succeed)
    addAll(copyOnWriteArrayList);
    addAll(synchronizedList);

    StopWatch stopWatch = new StopWatch();
    int loopCount = 1000000;
    stopWatch.start("Read:copyOnWriteArrayList");
    //concurrently read 1,000,000 random elements from the CopyOnWriteArrayList
    IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.get(ThreadLocalRandom.current().nextInt(loopCount)));
    stopWatch.stop();
    stopWatch.start("Read:synchronizedList");
    //concurrently read 1,000,000 random elements from the synchronizedList
    IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> synchronizedList.get(ThreadLocalRandom.current().nextInt(loopCount)));
    stopWatch.stop();
    log.info(stopWatch.prettyPrint());
    return "success";
}

Result

----------------------------------------
Seconds       %       Task name
----------------------------------------
0.0091182     04%     Read:copyOnWriteArrayList
0.2139405     96%     Read:synchronizedList

The two collections give exactly opposite results: CopyOnWriteArrayList's write performance is dreadful, while the lock-based ArrayList falls far behind CopyOnWriteArrayList on reads. So assess your actual read/write ratio before deciding whether CopyOnWriteArrayList is appropriate.

Why is CopyOnWriteArrayList so slow to write?

Because every insert copies the entire backing array: Object[] newElements = Arrays.copyOf(elements, len + 1);
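For reference, this is roughly what add() looks like in the JDK 8 sources (newer JDKs replace the explicit lock with synchronized, but the copy remains):

    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            // the entire backing array is copied on every single add
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }

So a loop of 100,000 adds performs 100,000 array copies of ever-growing size, which is where those 3.6 seconds go.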

Problem 4: Creating thread pools carelessly via Executors, leading to OOM

Example 1:

/**
 * OOM from a pool created with newFixedThreadPool
 */
public void test2(){
    ExecutorService executorService = Executors.newFixedThreadPool(1);
    for (int i = 0; i < 100000000; i++) {
        executorService.execute(()->{
            String payload = IntStream.rangeClosed(1,1000000)
                    .mapToObj(__->"a")
                    .collect(Collectors.joining("")) + UUID.randomUUID().toString();
            try {
                TimeUnit.HOURS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(payload);
        });
    }
}

newFixedThreadPool() builds the pool with the following parameters

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

Here new LinkedBlockingQueue<Runnable>() is effectively unbounded (its capacity defaults to Integer.MAX_VALUE). When tasks arrive in volume and each one is slow, the backlog of pending tasks in the queue balloons until the process runs out of memory.
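A safer starting point (a sketch; the capacity of 100 and CallerRunsPolicy here are illustrative assumptions, not recommendations) is to construct ThreadPoolExecutor directly with a bounded queue and an explicit rejection policy:

    ExecutorService executorService = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(100),           // bounded: the backlog can no longer grow without limit
            new ThreadPoolExecutor.CallerRunsPolicy() // overflow runs on the caller, applying back-pressure
    );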

Example 2:

/**
 * OOM from a pool created with newCachedThreadPool
 */
public void test3(){
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (int i = 0; i < 100000000; i++) {
        executorService.execute(()->{
            String payload = IntStream.rangeClosed(1,1000000)
                    .mapToObj(__->"a")
                    .collect(Collectors.joining("")) + UUID.randomUUID().toString();
            try {
                TimeUnit.HOURS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(payload);
        });
    }
}

newCachedThreadPool() builds the pool with the following parameters

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

SynchronousQueue is a blocking queue with no storage capacity at all, and the maximum pool size here is Integer.MAX_VALUE, so when many slow tasks pile up the pool keeps creating new threads until it triggers an OOM.

Let's define a method that monitors the pool's state

public void printStatus(ThreadPoolExecutor threadPoolExecutor) {
    Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
        System.out.println("========================");
        System.out.println("pool size: " + threadPoolExecutor.getPoolSize());
        System.out.println("active threads: " + threadPoolExecutor.getActiveCount());
        System.out.println("completed tasks: " + threadPoolExecutor.getCompletedTaskCount());
        System.out.println("queued backlog: " + threadPoolExecutor.getQueue().size());
        System.out.println("========================");
    }, 0, 1, TimeUnit.SECONDS);
}

And build a custom thread pool

public void test4() throws InterruptedException {
    // A pool with 2 core threads, 5 max threads, and an ArrayBlockingQueue of
    // capacity 10 as the work queue, using the default AbortPolicy rejection
    // policy, i.e. a task the pool cannot accept throws
    // RejectedExecutionException. We also use Jodd's ThreadFactoryBuilder to
    // give the pool's threads custom names.
    AtomicInteger atomicInteger = new AtomicInteger();
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2, 5, 5, TimeUnit.SECONDS
            , new ArrayBlockingQueue<>(10)
            , new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").get()
            , new ThreadPoolExecutor.AbortPolicy()
    );
    printStatus(threadPoolExecutor);
    IntStream.rangeClosed(1, 20).forEach(i -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        int id = atomicInteger.incrementAndGet();

        try {
            threadPoolExecutor.submit(() -> {
                System.out.println(id + " start...");
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(id + " end...");
            });
        } catch (Exception e) {
            System.out.println(id + " submit rejected...");
            throw new RuntimeException(e);
        }
    });
    TimeUnit.SECONDS.sleep(60);
}

Run output

========================
pool size: 0
active threads: 0
completed tasks: 0
queued backlog: 0
========================
1 start...
========================
pool size: 1
active threads: 1
completed tasks: 0
queued backlog: 0
========================

....... (omitted) .......

========================
pool size: 2
active threads: 2
completed tasks: 2
queued backlog: 8
========================
========================
pool size: 2
active threads: 2
completed tasks: 2
queued backlog: 9
========================
========================
pool size: 2
active threads: 2
completed tasks: 2
queued backlog: 10
========================
15 start...
========================
pool size: 3
active threads: 3
completed tasks: 2
queued backlog: 10
========================
16 start...
========================
pool size: 4
active threads: 4
completed tasks: 2
queued backlog: 10
========================
17 start...
========================
pool size: 5
active threads: 5
completed tasks: 2
queued backlog: 10
========================
18 submit rejected...
Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@30946e09[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@6e5e91e4[Wrapped task = com.error.thread.thread.ThreadDemo4$$Lambda$24/0x000001eb1b003450@2cdf8d8a]] rejected from java.util.concurrent.ThreadPoolExecutor@5cb0d902[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 2]
	at com.error.thread.thread.ThreadDemo4.lambda$test4$8(ThreadDemo4.java:127)
	at java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
	at java.base/java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:617)
	at com.error.thread.thread.ThreadDemo4.test4(ThreadDemo4.java:107)
	at com.error.thread.thread.ThreadDemo4.main(ThreadDemo4.java:137)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@30946e09[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@6e5e91e4[Wrapped task = com.error.thread.thread.ThreadDemo4$$Lambda$24/0x000001eb1b003450@2cdf8d8a]] rejected from java.util.concurrent.ThreadPoolExecutor@5cb0d902[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 2]
	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
	at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:123)
	at com.error.thread.thread.ThreadDemo4.lambda$test4$8(ThreadDemo4.java:116)
	... 4 more



 ....... (omitted) .......
 
 
========================
pool size: 2
active threads: 0
completed tasks: 17
queued backlog: 0
========================

⭐⭐⭐ From this monitored run we can summarize a thread pool's execution logic:

  1. When a task is submitted, it runs on a core thread.
  2. Once all core threads are busy, a newly submitted task does not get a new thread right away; it is placed in the work queue first.
  3. Only when the work queue reaches capacity does the pool add threads, up to the configured maximum.
  4. When the pool is at maximum and the queue is full, further submissions hit the configured rejection policy.
  5. Finally, when the pool holds more threads than the core size and no work arrives within keepAliveTime, the pool shrinks back down to the core size.

Since the pool only expands when the work queue is full and an offer fails, could we override the queue's offer method and fake a "queue full" answer, so the pool scales up before queueing?

A custom work queue

public class MyArrayBlockingQueue<E> extends ArrayBlockingQueue<E> {
    // keep a reference to the pool so we can read its live size and maximum
    private ThreadPoolExecutor threadPoolExecutor;

    public MyArrayBlockingQueue(int capacity, ThreadPoolExecutor threadPoolExecutor) {
        super(capacity);
        this.threadPoolExecutor = threadPoolExecutor;
    }

    public MyArrayBlockingQueue(int capacity, boolean fair, ThreadPoolExecutor threadPoolExecutor) {
        super(capacity, fair);
        this.threadPoolExecutor = threadPoolExecutor;
    }

    public MyArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c, ThreadPoolExecutor threadPoolExecutor) {
        super(capacity, fair, c);
        this.threadPoolExecutor = threadPoolExecutor;
    }

    @Override
    public boolean offer(E o) {
        Objects.requireNonNull(o);
        // pretend the queue is full while the pool can still grow, so the
        // pool creates a new thread instead of queueing the task
        if (threadPoolExecutor.getMaximumPoolSize() > threadPoolExecutor.getPoolSize()) {
            return false;
        }
        return super.offer(o);
    }
}

Creating the pool

public static ThreadPoolExecutor newMyThreadPoolExecutor() {
    /**
     * Since the pool only expands when the work queue cannot accept a task,
     * we fake a full queue by overriding offer (see MyArrayBlockingQueue above).
     */
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 5, TimeUnit.SECONDS
            , new ArrayBlockingQueue<>(10) // placeholder; replaced via reflection below
            , new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").get()
            , new ThreadPoolExecutor.AbortPolicy()
    );
    // swap in the custom queue via reflection: it needs a reference to the
    // pool, which does not exist yet when the constructor runs
    MyArrayBlockingQueue<Runnable> workQueue = new MyArrayBlockingQueue<>(10, threadPoolExecutor);
    try {
        Field workQueueField = ThreadPoolExecutor.class.getDeclaredField("workQueue");
        workQueueField.setAccessible(true);
        workQueueField.set(threadPoolExecutor, workQueue);
    } catch (Exception e) {
        throw new RuntimeException("failed to replace work queue", e);
    }
    return threadPoolExecutor;
}
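To verify the effect (an illustrative run, reusing printStatus and the slow tasks from test4), submit more tasks than the core threads can absorb; the pool should now grow to 5 threads before the queue starts to fill, instead of the other way around:

    ThreadPoolExecutor pool = newMyThreadPoolExecutor();
    printStatus(pool);
    IntStream.rangeClosed(1, 20).forEach(i -> pool.submit(() -> {
        try {
            TimeUnit.SECONDS.sleep(10); // slow task, same as in test4
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }));

This is essentially the trick Tomcat's own TaskQueue plays on its executor.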

⭐⭐⭐ Choose a thread pool's core parameters (thread counts, recycling policy, and work queue) according to how urgent and how heavy its tasks are:

For slow IO tasks of modest volume, lean toward more threads and a smaller queue. For high-throughput compute-bound tasks, do not use too many threads, roughly the CPU core count or twice it (a thread always has to be scheduled onto some CPU, so for CPU-bound work extra threads only add context-switch overhead without raising throughput), but a longer queue may be needed as a buffer.
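As a rough illustration of that guidance (the multipliers and queue sizes below are assumptions to tune, not fixed rules):

    int cpus = Runtime.getRuntime().availableProcessors();

    // slow, low-volume IO tasks: more threads, small queue
    ThreadPoolExecutor ioPool = new ThreadPoolExecutor(
            cpus * 4, cpus * 8, 60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(50));

    // high-throughput CPU-bound tasks: about one thread per core, longer queue as a buffer
    ThreadPoolExecutor cpuPool = new ThreadPoolExecutor(
            cpus, cpus * 2, 60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1000));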
