目 录CONTENT

文章目录

线程池

FatFish1
2024-10-28 / 0 评论 / 0 点赞 / 72 阅读 / 0 字 / 正在检测是否收录...

线程池基础知识

什么是线程池

线程池是一种多线程处理形式,处理过程中可以将任务添加到队列中,然后再创建线程完成这些任务。

线程:java中的thread

任务:实现了runnable或callable接口的实例对象

线程池的优势

  • 线程和任务分离,提升线程重用性

  • 控制线程并发数量,降低服务器压力

  • 节约了每次创建和销毁线程的时间,提升响应速度

线程池应用场景

云盘上传下载、网购商品秒杀等

线程池的5种状态

RUNNING、STOP、SHUTDOWN、TIDYING、TERMINATED

线程池的工作流程

提交任务,如果任务数没达到核心线程数,执行任务,如果达到了,进任务队列等待,如果任务队列也满了,创建非核心线程,如果已道达到了最大线程数,执行饱和策略

线程池源码分析

核心属性

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

原子类型ctl代表线程池状态

从ctl到CAPACITY

ctl在线程池中用于记录线程池生命周期状态和当前工作线程数,在线程池运行流程中,其生命周期状态和当前工作线程数也是具有原子性的,为了实现这个原子性要求,ctl使用一个32位二进制(整型的最大位数)进行分割,两部分分别表示两个信息

那么分割值是几?答案是3,因为线程池有5种状态,3个二进制位能表示5种状态。因此线程池固化了两个常量

// 分割位数:29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 把分割位数进行左移,用于后续所有拆装箱的函数
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

CAPACITY是1 << 29 - 1,<<的作用是左移,1 << 29即1后面加29个0,再减1即29个1,总位数32位,即前三位是0,后29位是1CAPACITY构造成这样,并不是具有什么业务意义,它只是一个用于计算ctl的工具,用于后面ctl的运算更加方便:即某个数与CAPACITY做与运算,一定能取得它的后29位,与CAPACITY的反码做与运算,一定能取得它的前3位

打包与拆包

//拆包函数
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
//打包函数
private static int ctlOf(int rs, int wc) { return rs | wc; }

根据CAPACITY的作用,就可以知道,当有一个任意值c,想获得其中的state值(即前3位),直接与~CAPACITY(反码)做与运算即可,想获得其中的工作先参数(即后29位),直接与CAPACITY做与运算即可

而打包函数ctlOf就更好理解了,直接把代表runstate的rs和工作线程数的wc两个数做或操作,因为他们两个分别代表的不同位数,或操作绝对不会冲突

运行状态及判断函数

private static final int RUNNING    = -1 << COUNT_BITS;  111
private static final int SHUTDOWN   =  0 << COUNT_BITS;  000
private static final int STOP       =  1 << COUNT_BITS;  001
private static final int TIDYING    =  2 << COUNT_BITS;  010
private static final int TERMINATED =  3 << COUNT_BITS;  011
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

线程池中将五种运行状态分别取-1、0、1、2、3,都做了左移29位的操作,五种状态判断,后29位均为0,判断时就可以直接使用符号,因为高位比较,跟低位就没啥关系了

线程数量修改

private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}
private void decrementWorkerCount() {
    do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

线程数量的修改直接使用CAS操作,在当前数值基础上加减即可,因为线程数量占据的是低29位,加减甚至不影响当前线程状态

线程池状态修改

ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))

线程池状态是直接新算,结果替换老值。首先获取c中的线程数量,然后调用ctlOf方法拼接目标状态,得到一个最新的32位二进制,直接通过CAS操作替换进去

构造函数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • corePoolSize:核心线程数量,正常情况下创建出来的线程数量。一般核心线程数量通过时间规划计算,例如1个线程处理一个任务0.1s,想要1s内处理100个任务,则需要10个核心线程。通常可以基于8020原则,即通常80%的情况下,每秒产生多少个任务,计算得到核心线程,另外的就给非核心

  • maximumPoolSize:最大线程数量,允许创建的线程数量上限。最大线程数一般为(最大任务数-任务队列长度)*单个任务执行时间

  • keepAliveTime:最大空闲时间,无用线程的最长存活时间,可以通过allowCoreThreadTimeout属性修改是否允许核心线程被销毁,参考系统运行环境和硬件压力,合理设置即可

  • unit:时间单位,枚举常量,时分秒之类的

  • workQueue:任务队列,一个集合,如果线程已经达到核心线程数量,如果有新的任务进来,会被添加到任务队列中,如果任务队列已经满了,才会在最大线程数的范围内创建新线程

    • 任务队列长度一般为核心线程数/单个任务执行时间*2

  • threadFactory:线程工厂,允许自己通过独立代码创建线程,重写newThread方法

    • 常用的模板有谷歌的new ThreadFactoryBuilder()

  • handler:饱和处理机制:实现RejectedExecutionHandler接口,重写rejectedExecution方法可以自定义超过任务超过队列长度后的饱和机制

execute与submit

对应Runnable和Callable接口,线程池也提供了分别执行两种可执行任务的方法:

  • execute()方法是在Executor接口中定定义的,只能提交runnable类型的任务。

  • submit()方法定义在ExecutorService接口中,而ExecutorService接口继承了Executor接口,入参可以是runnable的task也可以是callable的task,均调用了newTaskFor方法,传入task

submit

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

先看submit,通过newTaskFor(task)方法把传入的Callable对象封装成FutureTask对象,实现了执行任务和结果分离,然后执行的还是execute方法,再返回FutureTask对象(已经封装好了执行结果)。FutureTask实现执行和结果分离的原理可以参考FutureTask部分。

从这里可以看出来,线程池执行的核心逻辑在execute()方法

execute

    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

从这部分可以看出,状态是用ctl封装的,是一个AtomicInteger。

  • 第一个if是判断判断当前工作线程数与核心线程数的值,小于的话,直接使用addWorker添加线程执行任务,否则走第二个if

  • 第二个if判断线程池是否处于running状态,线程池running,就要往workQueue里面放,这里调用的是非阻塞的offer方法,因此提交任务默认是不阻塞的,如果想实现提交任务阻塞,必须自行实现队列,重写offer方法改为阻塞方法。

  • 最后一个else-if是如果队列都塞不进去了,先尝试增加非核心线程,如果非核心线程也加不了了,直接执行reject方法

其中用了ctl,ctl 本质就是一个 int 类型数值,默认值为0,共有32位,表示两个状态:表示线程池当前状态(高三位),标识线程池当前的工作线程个数(低29位),即线程池的5状态。

从execute方法中可以看出,当一个任务被执行时,线程池的工作流程是:

  • 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)

  • 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue

  • 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)

  • 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法

addWorker - 添加线程执行方法

根据addWorker的方法体,可以大致将方法分成两部分,第一部分是通过retry标签包裹的两重循环,这一部分是真正添加执行线程数量的,另一部分是retry后面的内容,这一部分是为了真正启动执行任务。

先看第一部分:

retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

第一层循环体是通过ctl状态值判断是否可以添加worker,如果添加不了直接返回fale了。否则进入第二层循环体。这层的if判断条件是:

  • ctl的状态部分已经超过SHUTDOWN(即SHUTDOW、STOP、TIDYING、TERMINATED)

  • 排除场景:(是SHUTDOWN且没有传入的任务且任务队列非空)

第二层循环体是在判断线程池状态是可以接受新任务的前提下,判断ctl的workCount部分,有以下场景:

  • 如果workCount已经触顶,或是打算向核心线程池添加线程但核心线程池已满时,retrun false

  • 否则就可以使用CAS操作增加一个位数,即增加一个Worker数,

    • 成功增加就跳出retry部分。

    • 没有成功增加,判断状态是否和传入时一致,不一致则继续循环判断状态。

再看第二部分:

先根据传入的可执行任务构建Worker,并拿到对应的执行线程。worker的概念见前

    w = new Worker(firstTask);
    final Thread t = w.thread;

确认可以加线程执行任务时,先加锁防止同一时间线程池被ShutDown,枷锁后要重新拿ctl并判断状态。

    mainLock.lock();
    try {
        int rs = runStateOf(ctl.get());

条件符合,添加worker实际执行的是下面这部分。

    workers.add(w);
    int s = workers.size();
    if (s > largestPoolSize)
        largestPoolSize = s;
    workerAdded = true;

添加成功才执行start方法启动任务。这里有个关键点:

复习线程部分,t.start()执行时调用的native方法start0,然后调用到自己实现的run方法,由run方法调用Thread中的可执行对象target.run()方法。

这里的thread中的target是Worker对象,因此调用到Worker#run方法,进而调用到runWorker方法

   if (workerAdded) {
       t.start();
       workerStarted = true;
   }

不成功才执行补救方法

   if (! workerStarted)
        //移除workers中的工作线程
        //工作线程数-1
        //尝试修改线程池状态为Tidying
        addWorkerFailed(w);

参数2 boolean core在addWorker方法中扮演什么角色?

可以看到这一部分代码:

    if (wc >= CAPACITY ||
        wc >= (core ? corePoolSize : maximumPoolSize))
        return false;

实际在线程池中是没有一个成员变量标志谁是核心线程池谁是非核心线程池的。而是在addWorker方法中用一个if做判断。如果core是True即添加核心线程,是将目前的Worker数量与核心线程池大小进行比较,反之与最大线程池大小进行比较。

这里可以映射execute方法,如果比的是核心线程池,且返回false,这里addWorker必然会失败,那么execute的流程就会走到入队的流程,但在后面启动非核心线程就会成功

addWorkerFailed - 补救方法

如果workerAdded是true,就意味着work没被加入到works中,work中的thread也没有执行start方法,就需要进行补救。

先对mainLock进行加锁,防止操作workers并发。获取锁后分表执行移除操作、计数操作、状态转换操作。

    if (w != null)
        workers.remove(w);
    decrementWorkerCount();
    tryTerminate();

tryTerminate方法完成状态转换。

tryTerminate - 转换terminate状态

tryTerminate内部是一个死循环,首先判断状态,下面是跳出循环的情况:

  • 线程池处于RUNNING状态

  • 线程池状态处于TIDYING状态

  • 线程池状态处于SHUTDOWN状态并且队列不为空

    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;

如果不满足上述的情况,那么目前状态属于SHUTDOWN且队列为空,或者状态属于STOP,那么调用interruptIdleWorkers方法停止一个Worker线程,然后退出。

    if (workerCountOf(c) != 0) { // Eligible to terminate
        interruptIdleWorkers(ONLY_ONE);
        return;
    }

接下来如果没有退出循环的话,那么就首先将状态设置成TIDYING,然后调用terminated方法,最后设置状态为TERMINATED。terminated方法是个空实现,用于当线程池终结时处理一些事情。

    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
        try {
            terminated();
        } finally {
            ctl.set(ctlOf(TERMINATED, 0));
            termination.signalAll();
        }

最后释放锁

reject - 拒绝方法

    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

这里就是直接调了创建线程池时自己实现的handler中的reject策略。

abortPolicy

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }

直接把异常抛出去了,再看e的toString方法

    return super.toString() +
        "[" + rs +
        ", pool size = " + nworkers +
        ", active threads = " + nactive +
        ", queued tasks = " + workQueue.size() +
        ", completed tasks = " + ncompleted +
        "]";

这一部分active threads数量是返回的nactive的值。向上追溯其赋值点:

    for (Worker w : workers) {
        ncompleted += w.completedTasks;
        if (w.isLocked())
            ++nactive;
}

可以看到是加锁的才会被赋值。这里就说明了active数量和corePoolSize可能不一样。

判断是否加锁的方法用的是

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

这里state值是0意味着解锁,1意味着加锁

内部类Worker

worker是线程池执行任务的执行器,继承AQS,实现Runnable

  • 继承AQS是为了提供其在捞任务的时候的同步控制能力

  • 实现Runnable是为了让它具备执行能力

核心属性

final Thread thread;
Runnable firstTask;
volatile long completedTasks;

可以看到worker是在thread和runnable的基础上又封装了一层,它是任务执行工人的一个抽象。

还记得在Thread部分研究过为什么要在thread里面封装Runnable对象,是因为实现线程和任务的解耦。在这里增加一个worker也是同样的效果。worker、thread、task三级解耦,实现复用

构造函数

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

run

实现Runnable,因此实现了run方法,用于thread.start执行时调用

因为Worker构造方法中thread属性传的是自己,所以在线程池addWorker方法中调用

final Thread t = w.thread;
t.start();

实际上执行的是worker自己。

因为worker同时是Runnable可执行对象,因此执行start方法实际执行的是thread中run方法,进而执行target实现的run方法,即最终调用的还是Worker的run方法。

    public void run() {
        runWorker(this);
    }

最后调到runWoker方法中

runWoker

方法开始先把自己的firstTask取出来并清空了

    Runnable task = w.firstTask;
    w.firstTask = null;

然后执行了unlock操作:

    w.unlock(); // allow interrupts
    ……
    public void unlock()      { release(1); }
    ……
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
}

这里的unlock操作实际调用到了AQS的release操作,而AQS的释放操作最终释放的执行实际是tryRelease在锁中自己实现的,因此还是在Worker中。这里tryRelease方法比较简单,就是把持锁线程设置为null,然后状态修改为0。

这里执行unlock的目的是什么?

    while (task != null || (task = getTask()) != null) {
        w.lock();

回想一个问题:异常处理中toString方法nactive取的是workers中AQS非0的Worker的数量,为什么不会有-1被取到呢,概率应该是非常非常小的。

首先在addWorker中Worker构造出来是-1状态,这时还没有加入workers, toString方法统计nactive时统计不到;接着mainLock加锁,toString方法就势必不会再拿到锁了,因此也不会再执行统计操作;在持锁期间,worker加入workers,然后释放锁,立即执行了start方法,可见worker处于-1状态的大部分时间都在持锁时间段内,被取到-1状态的可能性应该是不会太大的

然后这里捞任务,优先从firstTask中取,即execute提交的,如果firstTask没有通过getTask方法从队列中取的,这时候就是从队列里面捞取入队最久的了这里可以回看execute方法,addWorker时核心线程池参数1传的是task,非核心线程池先入队,然后addWorker时传的是null。捞到任务后就可以加锁了。lock方法同样也是实现自AQS。

此外,还需要注意:捞任务这里是while循环,只要能捞到任务,这个worker就会一直工作下去,直到捞不到任务,才让这个worker走向消亡,从而实现了线程池中线程的复用

加锁成功后判断线程池状态,如果线程池停止了,那么中断线程。

    if ((runStateAtLeast(ctl.get(), STOP) ||
         (Thread.interrupted() &&
          runStateAtLeast(ctl.get(), STOP))) &&
        !wt.isInterrupted())
        wt.interrupt();

最后在执行task.run操作前后做了两个可供扩展的点:

    beforeExecute(wt, task);
    task.run();
    afterExecute(task, thrown);

上面的while是一个死循环,只有当firstTask为null且getTask()为null时才会跳出循环。跳出循环后走finally这部分

    completedAbruptly = false;
} finally {
    processWorkerExit(w, completedAbruptly);
}

先看跳出循环的结果:worker线程执行完毕,等待回收。

再看跳出循环的条件:没有新任务,且队列中也没有任务。

因此看下队列是如何判断的。

getTask - 从队列中取任务

getTask的作用是从队列中捞取没处理完的任务。当有任务积压,Worker是不会释放的,反之,线程池会清理掉多余的Worker。

可以发现getTask中也是一个死循环,即捞的时候要么捞不到继续捞,直到确认线程池状态或队列为空,要么捞到返回。

    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
        decrementWorkerCount();
        return null;
    }

首先判断线程池状态,当出现以下情况:

  • 线程池状态走入shutdown状态以上

  • 且线程池状态走入STOP以上,或队列为空

这里回想线程池状态,SHUTDOWN为线程池停止但还继续处理队列中的任务,STOP为停止且不处理任务,比它俩值大的只有TYDING和TERMINATED状态,即线程池确认已经停止了。这个时候,走Worker数量减1,且return null,对应在runWorker中执行Worker清理。

    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    if ((wc > maximumPoolSize || (timed && timedOut))
        && (wc > 1 || workQueue.isEmpty())) {
        if (compareAndDecrementWorkerCount(c))
            return null;
        continue;
    }

第二个判断是判断核心线程池数量:

  • allowCoreThreadTimeOut属性表示核心线程池是否应用超时,默认为False(核心线程池不清理保持alive),但如果配置核心线程池超时,即这里为True

  • 或Worker数量已经超过核心线程池数量了

这个情况下判断上面的T、F,与Worker数量大于最大线程池数量,任意成立一个,且队列为空或Worker数量大于1,即触发Worker清理。但是这里清理之前还做了一步判断,能削减,才返回null清理,不能,继续捞任务。

这里也可以映射前面的execute方法和addWorker方法中涉及核心线程池的部分,可以看到线程池中没有谁一定是核心线程,而是通过一个大小和if比较,将能回收的先回收掉,剩下到阈值的就没有必要回收了

当上面两步判断都走完,才真正是在从队列中拉取任务。拉到则返回任务执行。

try {
    Runnable r = timed ?
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
        workQueue.take();
    if (r != null)
        return r;
    timedOut = true;
} catch (InterruptedException retry) {
    timedOut = false;
}

这里队列的take方法底层也涉及到了AQS的condition流程,这里以LinkedBlockingQueue为例。

0

评论区