概述
常用于定时任务,如:定时关机。常用方法
int compareTo(Delayed o)
:比较大小,自动升序。比较方法建议和getDelay方法配合完成。如果任务是需要按时完成的计划任务,必须配合getDelay方法完成。long getDelay(TimeUnit unit)
:获取计划时长的方法,根据参数TimeUnit来决定,如何返回结果值。
delayQueue和workQueue
延迟队列DelayQueue是一个无界阻塞队列,它的队列元素只能在该元素的延迟已经结束或者说过期才能被出队,DelayQueue就是基于PriorityQueue实现的,DelayQueue队列实际上就是将队列元素保存到内部的一个PriorityQueue实例中的(所以也不支持插入null值)。
DelayedWorkQueue也是一种设计为定时任务的延迟队列,它的实现和DelayQueue一样,不过是将优先级队列和DelayQueue的实现过程迁移到本身方法体中,从而可以在该过程当中灵活的加入定时任务特有的方法调用。
delayQueue实现延迟任务
实现DelayQueue入参Delayed接口的getDelay方法和compareTo方法,构建延迟任务。delayQueue基于PriorityBlockingQueue实现。
案例
@Slf4j
public class DelayQueueDemo {
public static void main(String[] args) {
DelayQueue<SanYouTask> sanYouTaskDelayQueue = new DelayQueue<>();
new Thread(() -> {
while (true) {
try {
SanYouTask sanYouTask = sanYouTaskDelayQueue.take();
log.info("获取到延迟任务:{}", sanYouTask.getTaskContent());
} catch (Exception e) {
}
}
}).start();
log.info("提交延迟任务");
sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日记5s", 5L));
sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日记3s", 3L));
sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日记8s", 8L));
}
}
@Getter
public class SanYouTask implements Delayed {
private final String taskContent;
private final Long triggerTime;
public SanYouTask(String taskContent, Long delayTime) {
this.taskContent = taskContent;
this.triggerTime = System.currentTimeMillis() + delayTime * 1000;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(triggerTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return this.triggerTime.compareTo(((SanYouTask) o).triggerTime);
}
}
queue.offer方法提交任务时会根据compareTo的实现对任务进行排序,最先需要被执行的任务放到队列头;queue.take方法获取任务的时候会拿队列头部的元素,通过getDelay返回值判断任务是否需要立刻执行,如果需要就返回任务,不需要则等待延迟时间到了返回任务。
源码分析
offer - 入队
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
上锁后执行内部PriorityQueue的offer方法,构造小顶堆。
入队后激活出队的condition
take/poll - 阻塞出队非阻塞出队
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0L)
return q.poll();
出队走一个无限自旋,如果没有first元素,证明已经空了,这里阻塞,如果成功拿到了first元素,这里要判断延迟,如果延迟到了,才能出队,脱离自旋
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
如果没到延迟时间,那就说明第一个元素还没到点,这里把引用解锁掉,是为了防止其他线程先拿到,结果本线程又拿一次出问题
然后判断阻塞Thread是否存在,如果不存在,设置为当前线程,然后进行条件等待,同时释放锁,直到到时间,停止阻塞,重新获取锁
这时判断阻塞线程如果是自己,清空,然后重新尝试获取头节点
应用
Timer和ScheduledThreadPoolExecutor
Timer和ScheduledThreadPoolExecutor是延迟队列的应用,例如:
@Slf4j
public class ScheduledThreadPoolExecutorDemo {
public static void main(String[] args) {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, new ThreadPoolExecutor.CallerRunsPolicy());
log.info("提交延迟任务");
executor.schedule(() -> log.info("执行延迟任务"), 5, TimeUnit.SECONDS);
}
}
ScheduledThreadPoolExecutor在构造的时候会传入一个DelayedWorkQueue阻塞队列,所以线程池内部的阻塞队列是DelayedWorkQueue。
Timer类似封装了DelayQueue,但是是单线程,且没有对运行时异常进行处理,更推荐用scheduledThreadPoolExecutor,具体分析见ScheduledThreadPoolExecutor部分。
评论区