Future接口
Future接口定义了操作异步任务执行的一些方法,提供了一种异步并行计算的功能,例如:获取异步任务的执行结果、取消异步任务的执行、判断任务是否被取消、判断任务执行是否完毕等
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
RunnableFuture - 可执行的Future
继承了Future和Runnable两个接口,它出现的意义在于:
异步计算:在某些情况下,我们希望在后台执行某些任务,然后在需要的时候再获取结果。FutureTask 允许程序在提交任务后立即返回,并可以在将来的某个时间点获取结果。
任务取消:FutureTask 提供了取消任务的功能,你可以决定一个任务是否应该被中止。
避免回调地狱:通过 FutureTask,我们可以更清晰地编排代码,而不是使用复杂的回调机制。
FutureTask
是什么
FutureTask实现了RunnableFuture接口。因此,FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run()
)。
它存在的意义是给耗时任务增加一个并行执行其他任务的方案,同时能够获取结果。
根据FutureTask.run()方法被执行的时机,FutureTask可以处于下面3种状态:
未启动:还没执行
FutureTask#run
方法已启动:正在执行
FutureTask#run
方法已完成:
FutureTask#run
方法正常结束/被取消/抛出了异常
用法
提交FutureTask:
// 使用线程池执行
executorService.submit(futureTask);
// 自行执行
futureTask.run();
获取futureTask结果
futureTask.get();
多线程执行和获取,get方法会阻塞直到执行线程完成
源码分析
构造函数
public FutureTask(Callable<V> callable) {
……
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
两个构造函数,会将无论callable还是runnable类型的入参统一封装成callable来执行
run
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
首先通过CAS修改状态
Callable<V> c = callable;
……
result = c.call();
执行Callable类的call方法,拿到结果
set(result);
执行set方法处理结果
set
outcome = v;
STATE.setRelease(this, NORMAL); // final state
首先把结果v给到outcome,然后CAS修改状态
finishCompletion();
激活阻塞的等待结果的线程
finishCompletion - 激活等待线程
for (WaitNode q; (q = waiters) != null;) {
是一个自旋,当有线程等待,开启自旋
Thread t = q.thread;
……
q.thread = null;
LockSupport.unpark(t);
激活线程
get - 获取结果
s = awaitDone(false, 0L);
return report(s);
两个,方法,一个阻塞等待,一个返回结果
awaitDone - 阻塞等待
for (;;) {
// 场景1
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
也是一个自旋,如果当前状态已经完成,直接返回完成状态
场景1:当state>COMPLETING,说明已经完成,直接返回
// 场景2
else if (s == COMPLETING)
Thread.yield();
场景2:COMPLETING状态,说明已经完成了,正在设置结果,这里就不做阻塞了,先让出cpu资源,再开启下一轮循环,减少线程上下文切换
else if (q == null) {
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
如果没获取到值,且没构造过node,这里先构造node,但是本轮不阻塞,再循环一次,下次阻塞,也是为了减少上下文切换
// 场景3:超时阻塞
else if (timed) {
……
}
// 场景4:直接阻塞
else
LockSupport.park(this);
场景3、4就是阻塞,再循环一圈都没获取到值,就把自己挂起,等待执行线程调用finishCompletion方法唤醒自己
CompletableFuture
CompletableFuture不仅实现了Future相关接口,还实现了CompletionStage接口,是对Future的非常强大的扩展,提供函数式编程的能力。
CompletableFuture可以代表一个明确完成的 Future,也可以代表一个完成阶段 CompletionStage
ComletionStage
提供了许多函数式编程相关的能力
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenRun(Runnable action);
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
……
ComletableFuture源码分析
构造
四个静态方法基本可以分成两组,一组是无返回值的runAsync
,一组是有返回值的supplyAsync
// 无返回值的runAsync
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// 有返回值的supplyAsync
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
评论区