目 录CONTENT

文章目录

Eexcutors框架 - JDK提供的预设线程池

FatFish1
2024-10-28 / 0 评论 / 0 点赞 / 57 阅读 / 0 字 / 正在检测是否收录...

FixedThreadPool - 定长线程池

固定长度线程池,是一个corePoolSize=maxPoolSize的线程池,这就意味着当核心线程池满,新任务就开始向阻塞队列中提交,不会继续扩展线程池线程数

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

可用看到参数1 corePoolSize和参数2 maximumPoolSize一致

同时参数3 keepAliveTime取的0,因为不取0也用不上,因为线程池线程数削减在getTask方法中,判断当前线程数大于核心线程数且空闲,就做削减

参数5使用的是无界队列LinkedBlockingQueue根据execute方法,创建线程的顺序是核心线程不满创建核心线程,核心线程满存队列,队列满且最大线程不满创建非核心线程,在使用无界队列的情况下,队列永远不满,因此maximumPoolSize也无效

因此可知定长线程池的特征:

  • corePoolSize=maximumPoolSize, keepAliveTime无效,则定长线程池不需要做线程的剔除

  • 使用无界队列,maximumPoolSize无效,因此未运行shutdown或shutdownNow的池子不会拒绝任务

SingleThreadExecutor - 单线程线程池

使用1作为corePoolSize的定长线程池,因此它具有全部定长线程池的特征

CachedThreadPool - 可缓存线程池

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

可缓存线程池内部使用了同步转移队列SynchronousQueue,因此它不会存元素,但是线程的execute方法要求如果队列塞不进去,且当前线程数大于核心线程数小于最大线程数,就构造新线程

观察CachedThreadPool,核心是0,但它的maximumPoolSize取的是最大值,因此来一个新任务就新增一条线程

ScheduledThreadPoolExecutor - 计划任务线程池

ScheduledThreadPool继承自Executor,父类是ThreadPoolExecutor,和其他线程池最大的区别是使用的阻塞队列是 DelayedWorkQueue,而且多了两个定时执行的方法scheduleAtFixedRate和scheduleWithFixedDelay,提供了定时任务的能力

线程池执行方法

schedule方法

  • 入参可选runnable类型和callable类型的任务,long类型的延迟执行时间,时间单位。

  • schedule方法只支持延迟执行,可以用future接收返回值

scheduleAtFixedRate方法-周期执行任务

参数

  • command:执行线程

  • initialDelay:初始化延时

  • period:两次开始执行最小间隔时间

  • unit:计时单位

返回值

scheduleAtFixedRate()方法返回的ScheduledFuture对象无法获得返回值,也就是scheduleAtFixedRate()方法不具有获得返回值的功能,而schedule()方法却可以获得返回值。所以当使用scheduleAtFixedRate()方法实现重复运行任务的效果时,需要结合自定义的Runnable接口的实现类,不要使用FutureTask类,因为FutureTask类并不能实现重复运行的效果

scheduleWithFixedDelay方法-周期执行任务

  • command:执行线程

  • initialDelay:初始化延时

  • period:两次开始执行最小间隔时间

  • unit:计时单位

该方法与scheduleAtFixedRate的区别是没有超时与非超时的情况

局限

计划任务线程池scheduleAtFixedRate跑不起来callable类型的任务,如果还想做一下结果的监控,可以跑runnable方法,例如改数据库,如果成功,状态改1,不成功改0,其中通过调用带返回值的excute方法,if判断返回值是否符合预期,如果不符合,改0,符合改1

源码

schedule - 定时执行

RunnableScheduledFuture<V> t = decorateTask(…

把执行任务包装成自己的RunnableFuture实现RunnableScheduledFuture

delayedExecute(t);

核心在这里

delayedExecute - 任务执行框架

super.getQueue().add(task);
……
ensurePrestart();

入队,然后判断状态,最后调用执行方法,ensurePrestart里面调的addworker方法,根据线程数量判断加核心还是非核心,然后在里面调用thread.start()方法再通过native方法调用到Worker重写的run方法,里面有getTask取任务的流程,这里又到了workQueue.poll,ScheduledThreadPoolExecutor重写的是delayQueue,从而在poll方法中实现了延迟任务选取。这里实现的是延迟

delayQueue可以获取任务,就可以继续执行runnable.run方法,即schedule方法中封装的ScheduledFutureTask

ScheduledFutureTask#run

内部类,是RunnableScheduledFuture的实现

else if (!isPeriodic())
    super.run();
else if (super.runAndReset()) {
    setNextRunTime();
    reExecutePeriodic(outerTask);

如果非周期性任务,直接调用父类FutureTask.run,如果是周期任务,先执行runAndReset方法,执行并重置状态,然后修改下一次执行时间,最后调用reExecutePeriodic方法,再次调用ensurePrestart方法,实现循环执行

scheduleAtFixedRate - 周期任务

与schedule相比,多了一个period入参,构造的ScheduledFutureTask是period的

Timer - 下位替代

Timer是简单定时任务线程,目前已被scheduledThreadPoolExecutor替代,不推荐使用。原因有二:

  • Timer的任务都会维护在一个queue里面,串行捞取执行,前一个任务执行超时会影响下一个任务执行

  • Timer捞任务是一个线程中循环,前一个任务抛异常,后续任务可能全都执行不了

可以通过Timer的源码分析下:

核心成员变量

// 一个Timer会维护一个queue和一个thread,用于循环这一个任务不断执行
private final TaskQueue queue = new TaskQueue();
private final TimerThread thread = new TimerThread(queue);

构造方法

public Timer(String name) {
    thread.setName(name);
    thread.start();
}

可以看出,当一个Timer创建出来的时候,就已经启动了线程的start方法,就会直接调用到task的run方法,而不是向一般线程池一样需要去通过execute启动

run

try {
    mainLoop();

run方法中就是调用了mainLoop,这个是Timer的核心方法

schedule - 不做启动只做任务投入

sched(task, System.currentTimeMillis()+delay, -period);

通过调用sched方法投入任务

sched

synchronized(queue) {
    ……
queue.add(task);
if (queue.getMin() == task)
        queue.notify();

加锁不提,这里做了两个操作,一个是向队列中添加任务,一个是判定队列中是否有task,有就发唤醒信号,这里唤醒的是mainLoop循环中的线程

mainLoop - 核心执行线程

while (queue.isEmpty() && newTasksMayBeScheduled)
    queue.wait();

可以看到是一个循环

如果队列里面没有东西执行就让线程自己WAITING

synchronized(task.lock) {
    ……
    queue.rescheduleMin(
  task.period<0 ? currentTime   - task.period
                : executionTime + task.period);

取到task加锁之后,执行前先做任务重新存入,以便下一次执行

task.run();

最后再执行这个task

0

评论区