以 Tomcat 为例分析 Java 中的线程池

news/2025/2/25 5:59:48

以 Tomcat 为例分析 Java 中的线程池

首先,为什么会有“池”的概念?

我们的项目在运行过程中,需要使用系统资源(CPU、内存、网络、磁盘等)来完成信息的处理,比如在 JVM 中新建对象就需要消耗 CPU 和内存资源,当需要频繁创建大量的对象,并且这些对象的存活时间短,就意味着需要进行频繁销毁,那么很有可能这部分代码会成为性能的瓶颈。

而“池”就是用来解决这个问题的,简单来说,对象池就是把用过的对象保存起来,等下一次需要这种对象的时候,直接从对象池中拿出来重复使用,避免频繁地创建和销毁。

Java 线程池

ThreadPoolExecutor

看下 java.util.concurrent.ThreadPoolExecutor 中的构造方法

java">    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

每次提交任务时,如果线程数还没达到核心线程数 corePoolSize线程池就创建新线程来执行。当线程数达到**corePoolSize **后,新增的任务就放到工作队列 workQueue 里,而线程池中的线程则努力地从 workQueue 里拉活来干,也就是调用 poll 方法来获取任务。

如果任务很多,并且 workQueue 是个有界队列,队列可能会满,此时线程池就会紧急创建新的临时线程来救场,如果总的线程数达到了最大线程数 maximumPoolSize,则不能再创建新的临时线程了,转而执行拒绝策略 handler,比如抛出异常或者由调用者线程来执行任务等。

如果高峰过去了,线程池比较闲了怎么办?临时线程使用 poll(keepAliveTime, unit)方法从工作队列中拉活干,请注意 poll 方法设置了超时时间,如果超时了仍然两手空空没拉到活,表明它太闲了,这个线程会被销毁回收。

那还有一个参数 threadFactory 是用来做什么的呢?通过它你可以扩展原生的线程工厂,比如给创建出来的线程取个有意义的名字。

注意这些默认策略是可以修改的:

  • 声明线程池后立即调用 prestartAllCoreThreads 方法,来启动所有核心线程;
  • 传入 true 给 allowCoreThreadTimeOut 方法,来让线程池在空闲的时候同样回收核心线程。

FixedThreadPool/CachedThreadPool

Java 提供了一些默认的线程池实现,比如 FixedThreadPool 和 CachedThreadPool,它们的本质就是给 ThreadPoolExecutor 设置了不同的参数,是定制版的 ThreadPoolExecutor。

java">public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                 new LinkedBlockingQueue<Runnable>());
}
 
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

对比一下我们可以发现,线程池的两个关键点就是:

  • 是否限制线程个数。
  • 是否限制队列长度。

FixedThreadPool 的核心线程数就是最大线程数,当忙不过来时 task 会被丢到 LinkedBlockingQueue 队列中,注意:这是一个无界队列,也就是在任务量足够大时会触发 OOM。

CachedThreadPool 的核心线程数为 0,最大线程数是 Integer 的最大值,因此它对线程个数不做限制,忙不过来时无限创建临时线程,闲下来时再回收。它的任务队列是 SynchronousQueue,表明队列长度为 0。在任务量足够大时会触发 OOM,因为资源是有限的,无法一直创建新线程。

因此,不建议使用 Executors 提供的两种快捷的线程池,原因如下:

  • 我们需要根据自己的场景、并发情况来评估线程池的几个核心参数,包括核心线程数、最大线程数、线程回收策略、工作队列的类型,以及拒绝策略,确保线程池的工作行为符合需求,一般都需要设置有界的工作队列和可控的线程数。
  • 任何时候,都应该为自定义线程池指定有意义的名称,以方便排查问题。当出现线程数量暴增、线程死锁、线程占用大量 CPU、线程执行出现异常等问题时,我们往往会抓取线程栈。此时,有意义的线程名称,就可以方便我们定位问题。

拒绝策略

拒绝策略特点适用场景
AbortPolicy默认策略,抛出 RejectedExecutionException 异常。不允许丢失任务,任务必须立即处理的场景。
CallerRunsPolicy任务由调用者线程执行,降低并发度。调用者线程可以处理任务,适合减少任务提交速度的场景。
DiscardPolicy丢弃无法执行的任务,不抛出异常。可以容忍任务丢失,适合不重要的任务丢弃。
DiscardOldestPolicy丢弃队列中最旧的任务,加入新任务。适合丢弃最旧任务,保持队列中的新任务。

其中,注意 CallerRunsPolicy,在实际业务开发中,可能会导致 tomcat 的工作线程来进行业务的处理,进一步降低系统并发度。

Tomcat 线程池

Java 线程池是先用工作队列来存放来不及处理的任务,满了之后再扩容线程池。当我们的工作队列设置得很大时,最大线程数这个参数显得没有意义,因为队列很难满,或者到满的时候再去扩容线程池已经于事无补了。那么,我们有没有办法让线程池更激进一点,优先开启更多的线程,而把队列当成一个后备方案呢?

有的兄弟,有的

按照正常的线程池流程,在任务到来时,如果队列已满,就会创建新的非核心线程,那么可以重写队列的 offer 方法,造成队列已满的假象,在线程数达到最大线程数时,执行拒绝策略的时候,把任务尝试加入队列,如果这时队列真的满了,再按照拒绝策略处理。

ThreadPoolExecutor

java">    public void execute(Runnable command, long timeout, TimeUnit unit) {
        // 计数器 + 1,维护提交到了线程池但是还没执行完成的任务数量
        submittedCount.incrementAndGet();
        try {
            // 尝试进行处理
            executeInternal(command);
        } catch (RejectedExecutionException rx) {
            if (getQueue() instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue) getQueue();
                try {
                    // 继续尝试把任务放到任务队列中去
                    if (!queue.force(command, timeout, unit)) {
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
                    }
                } catch (InterruptedException x) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } else {
                // 计数器 - 1,抛出异常
                submittedCount.decrementAndGet();
                throw rx;
            }
        }
    }

    private void executeInternal(Runnable command) {
        int c = ctl.get();
        
        // 线程数 < 核心线程数,创建一个核心线程来接收
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true)) {
                return;
            }
            c = ctl.get();
        }
        
        // 线程数 >= 核心线程数,尝试让队列接收
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (!isRunning(recheck) && remove(command)) {
                reject(command);
            } else if (workerCountOf(recheck) == 0) { // 有可能此时有线程死亡了,再次检查是否需要添加线程
                addWorker(null, false);
            }
        } else if (!addWorker(command, false)) { // 队列接收可能返回 false,尝试创建非核心线程来处理
            reject(command);
        }
    }

TaskQuene,注意继承了 LinkedBlockingQueue 无界队列,如果不重写线程池执行方法,新请求只会放入队列,直到 OOM。

java">    // TaskQueue extends LinkedBlockingQueue<Runnable>
	// 进入此方法的前提是 当前线程数已经达到了核心线程数
    public boolean offer(Runnable o) {
        //we can't do any checks
        if (parent == null) {
            return super.offer(o);
        }

        // we are maxed out on threads, simply queue the object
        // 线程数已经到了最大值,不能创建新线程了,只能把任务添加到任务队列。
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) {
            return super.offer(o);
        }

        // we have idle threads, just add it to the queue
        // 已提交的任务数 <= 当前线程数,表示还有空闲线程,无需创建新线程
        // AtomicInteger submittedCount 维护已经提交到了线程池,但是还没有执行完的任务个数
        if (parent.getSubmittedCount() <= (parent.getPoolSize())) {
            return super.offer(o);
        }

        // if we have less threads than maximum force creation of a new thread
        // 已提交的任务数 > 当前线程数,并且当前线程数 < 最大线程数,返回 false 创建新线程
        if (parent.getPoolSize() < parent.getMaximumPoolSize()) {
            return false;
        }

        // if we reached here, we need to add it to the queue
        // 其他情况下添加到队列中
        return super.offer(o);
    }

可以看到,在 24 行如果 当前线程数 > 核心线程数,且 < 最大线程数,会优先创建新的非核心线程,而不是优先使用队列。原因是队列继承了无界队列,如果先放入队列会导致最大线程数失效,定制版的任务队列,重写了 offer 方法,使得在任务队列长度无限制的情况下,线程池仍然有机会创建新的线程。

Tomcat 的线程池与 Java 原生线程池的最大区别是:在线程数达到最大线程数后,继续尝试把任务添加到任务队列中去,如果这时候插入失败,再真正执行拒绝策略

最佳实践

要根据任务的“轻重缓急”来指定线程池的核心参数,包括线程数、回收策略和任务队列,既然使用了线程池就需要确保线程池是在复用的,每次 new 一个线程池出来可能比不用线程池还糟糕。复用线程池不代表应用程序始终使用同一个线程池,我们应该根据任务的性质来选用不同的线程池。特别注意 IO 绑定的任务和 CPU 绑定的任务对于线程池属性的偏好,如果希望减少任务间的相互干扰,考虑按需使用隔离的线程池

  • 对于执行比较慢、数量不大的 IO 任务,或许要考虑更多的线程数,而不需要太大的队列。
  • 而对于吞吐量较大的计算型任务,线程数量不宜过多,可以是 CPU 核数或核数 *2(理由是,线程一定调度到某个 CPU 进行执行,如果任务本身是 CPU 绑定的任务,那么过多的线程只会增加线程切换的开销,并不能提升吞吐量),但可能需要较长的队列来做缓冲。

http://www.niftyadmin.cn/n/5865068.html

相关文章

docker 一键部署wvp+zlm

拉取容器 docker pull 648540858/wvp_pro启动容器 docker run --env WVP_IP"自己电脑的ip" -it -p 18080:18080 -p 30000-30500:30000-30500/udp -p 30000-30500:30000-30500/tcp -p 80:80 -p 5060:5060 -p 5060:5060/udp 648540858/wvp_pro3.浏览器访问测试摄像头…

【Docker】如何在Linux、Windows、MacOS中安装Docker

Linux安装Docker 在终端中执行一键安装脚本命令安装docker sudo curl -fsSL https://gitee.com/tech-shrimp/docker_installer/releases/download/latest/linux.sh | bash -s docker --mirror Aliyun1.1 配置docker镜像源 在终端执行 一行命令&#xff0c;编辑配置文件 sudo …

图数据库Neo4j面试内容整理-索引(Index)

索引(Index) 是数据库中用来提高查询性能的技术,特别是在处理大量数据时,索引能够大大加速查询操作。在 Neo4j 这样的图数据库中,索引也起着非常重要的作用,尤其是在图中查找节点时,使用索引可以避免全图扫描,从而提高查询效率。 1. Neo4j 中的索引概念

相似性搜索(2)

在本篇中&#xff0c;我们通过播客相似性搜索为例&#xff0c;进一步研究基于chroma 的相似性搜索&#xff1a; 参考&#xff1a; https://www.kaggle.com/code/switkowski/building-a-podcast-recommendation-engine/notebook 数据集来源&#xff1a; https://www.kaggle.…

【JavaEE】SpringMVC 请求传参

目录 一、请求二、传递单个参数三、传递多个参数四、传递对象五、RequestParam注解 后端参数重命名&#xff08;后端参数映射&#xff09;六、传递数组七、传递集合&#xff0c;RequestParam八、传递JSON数据8.1 JSON字符串和Java对象互转8.1.1 Test注解8.1.2 Java对象转JSON8.…

opencv交叉编译报错:undefined reference to `png_riffle_palette_neon

序偶NEON 概述 NEON&#xff08;Nested Enhanced Vector Instruction Set&#xff09;是 ARM 架构中的一种高级 SIMD&#xff08;Single Instruction, Multiple Data&#xff0c;单指令多数据&#xff09;扩展技术。它专为加速多媒体和信号处理任务而设计&#xff0c;允许在单…

【一起学Rust | 框架篇 | Tauri2.0框架】在Tauri应用中设置Http头(Headers)

文章目录 前言一、配置准备1. 检查版本2. 使用条件3. 支持的请求头&#xff08;并不是全部支持&#xff09; 二、使用步骤1. 如何配置header2. 框架集成1. 对于Vite系列、Nuxt、Next.js这种前端框架Vite系列框架Angular系列框架Nuxt系列框架Next.js系列框架 2. 对于Yew和Leptos…

Quickwit获取Kafka数据源消息

介绍 Quickwit可以将数据从一个或多个源插入到索引中。创建索引后&#xff0c;可以使用CLI 命令quickwit source create添加源&#xff0c;支持的源有&#xff1a;file、kafka、kinesis、pulsar。 本章讲解如何从Quickwit搜索引擎中创建Kafka源和获取Kafka源主题数据流&#…