关于我们

质量为本、客户为根、勇于拼搏、务实创新

< 返回新闻公共列表

Future

发布时间:2023-06-26 14:00:15

Future解析

  • Future是什么?
    • 类如如下:
    • Future提供了任务检索、任务取消的高层规范
    • 表示异步计算的结果
    • Future的Get需要产生结果才能获取;否则会阻塞
  • FutureTask结构图
    • 如下:
    • FutureTask是基于Future的基本实现;
    • FutureTask提供了启动和取消计算、查询计算是否完成以及检索计算结果的方法
    • FutureTask内部支持Callable、Runnable
    • FutureTask实现了Runnable,可以交给Executor执行
  • FutureTask的内部状态
    • NEW:构造方法赋予的初始状态
    • COMPLETING:Future计算完成的状态
    • NORMAL:Future计算完成并且正确给出结果的状态
    • EXCEPTIONAL:Future计算完成但是抛出异常的状态
    • CANCELLED:Future中止线程的状态;未中止正在运行的线程
    • INTERRUPTING:中间状态
    • INTERRUPTED:中止正在运行的线程;强制中断正在运行的线程
      • Future的内部状态转换序列
        • NEW -> COMPLETING -> NORMAL(正常执行)
        • NEW -> COMPLETING -> EXCEPTIONAL(执行时发生异常)
        • NEW -> CANCELLED(中断任务,但是不中断正在执行的线程)
        • NEW -> INTERRUPTING -> INTERRUPTED(中断任务,且中断正在执行的任务)

FutureTask源码分析

• FutureTask#run() • 执行Thread  public void run() {  //通过CAS获取runner属性并且设置为当前线程的ThreadID;  //保证在计算出结果之前,不会重复计算;  if (state != NEW ||  !UNSAFE.compareAndSwapObject(this, runnerOffset,  null, Thread.currentThread()))  return;  try {  //callable在初始化的时候传递进来的  Callablec = callable;  if (c != null && state == NEW) {  V result;  boolean ran;  try {  //执行callable的结果通过result字段保存  result = c.call();  ran = true;  } catch (Throwable ex) {  result = null;  ran = false;  //保存callable发生错误的情况  setException(ex);  }  if (ran)  set(result);  }  } finally {  // runner must be non-null until state is settled to  // prevent concurrent calls to run()  //初始化自身状态;在CAS时需要使用runnerOffset;  //只有runner为null,才会执行任务  runner = null;  // state must be re-read after nulling runner to prevent  // leaked interrupts  int s = state;  if (s >= INTERRUPTING)  //在后面  handlePossibleCancellationInterrupt(s);  }  }

   

  • FutureTask#setException() and FutureTask#set()
    • 保存Thread执行结果
//Future执行发生异常时  //此时的状态流为NEW -> COMPLETING -> EXCEPTIONAL  protected void setException(Throwable t) {  //CAS修改状态;保证计算结果的正确性;并且修改状态为COMPLETING  if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {  outcome = t;  //修改状态为EXCEPTIONAL  UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state  finishCompletion();  }  }  //Future正常执行  //此时的状态流为NEW -> COMPLETING -> NORMAL  protected void set(V v) {  //CAS修改状态;  if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {  outcome = v;  //修改状态为NORMAL  UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state  finishCompletion();  }  }

   

  • FutureTask#finishCompletion()
    • 清除Get操纵导致阻塞的队列;重置Callable
private void finishCompletion() {  // assert state > COMPLETING;  for (WaitNode q; (q = waiters) != null;) {  //清除等待队列  if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {  for (;;) {  Thread t = q.thread;  if (t != null) {  q.thread = null;  //唤醒等待的线程  LockSupport.unpark(t);  }  WaitNode next = q.next;  if (next == null)  break;  q.next = null; // unlink to help gc  q = next;  }  break;  }  }  //执行done  done();  //清除callable  callable = null; // to reduce footprint  }

   

  • FutureTask#get()
    • 返回结果、或者抛出异常    
//不包含超时的Get  public V get() throws InterruptedException, ExecutionException {  int s = state;  //如果当前Thread的state没有转化为完成状态  if (s <= COMPLETING)  //则等待;  s = awaitDone(false, 0L);  //返回执行的结果或者异常信息 return report(s);  }  //包含超时时间的Get  public V get(long timeout, TimeUnit unit)  throws InterruptedException, ExecutionException, TimeoutException {  if (unit == null)  throw new NullPointerException();  int s = state;  //如果在规定时间内没有做出响应且没有完成计算;则抛出TimeOut异常  if (s <= COMPLETING &&  (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)  throw new TimeoutException();  return report(s); }

   

  • Future#awaitDone()
    • 等待线程执行完成;等待的结果不确定
private int awaitDone(boolean timed, long nanos)  throws InterruptedException {  //计算等待结束时间  final long deadline = timed ? System.nanoTime() + nanos : 0L;  WaitNode q = null;  boolean queued = false;  for (;;) {  //如果线程以及中止  if (Thread.interrupted()) {  //将WaitNode q移除等待队列  removeWaiter(q);  //抛出异常  throw new InterruptedException();  }  int s = state;  //如果线程线程以及执行完成,则直接返回结果  if (s > COMPLETING) {  if (q != null)  q.thread = null;  return s;  }  //如果线程状态即将转化为完成;则让CPU当前执行线程让出CPU  else if (s == COMPLETING) // cannot time out yet  Thread.yield();  //新建队列节点  else if (q == null)  q = new WaitNode();  //将当前节点置于等待队列  else if (!queued)  queued = UNSAFE.compareAndSwapObject(this, waitersOffset,  q.next = waiters, q);  //指定线程在nanos时间后被唤醒  else if (timed) {  nanos = deadline - System.nanoTime();  //如果时间不合法  if (nanos <= 0L) {  //则将超时的等待线程移除等待队列  removeWaiter(q);  return state;  }  //线程等待单位时间后被唤醒  LockSupport.parkNanos(this, nanos);  }  //线程一直阻塞,直到被其余线程唤醒  else  LockSupport.park(this);  }  }

   

  • FutureTask#cancel(boolean mayInterruptIfRunning)
    • 尝试取消此任务的执行


public boolean cancel(boolean mayInterruptIfRunning) {  //mayInterruptIfRunning变量声明对于Thread是否执行强制中断  //基于mayInterruptIfRunning变量;通过CAS修改状态  //NEW -> INTERRUPTING或者NEW -> CANCELLED  //INTERRUPTING只是中间状态  if (!(state == NEW &&  UNSAFE.compareAndSwapInt(this, stateOffset, NEW,  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))  return false;  try { // in case call to interrupt throws exception  //如果允许强制中断  if (mayInterruptIfRunning) {  try {  Thread t = runner;  if (t != null)  //中断当前线程  t.interrupt();  } finally { // final state  //修改线程状态  //NEW -> INTERRUPTING -> INTERRUPTED  UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);  }  }  } finally {  finishCompletion();  }  return true;  }

   



/template/Home/leiyu/PC/Static