Future解析


FutureTask源码分析
• FutureTask#run() • 执行Thread  public void run() {  //通过CAS获取runner属性并且设置为当前线程的ThreadID;  //保证在计算出结果之前,不会重复计算;  if (state != NEW ||  !UNSAFE.compareAndSwapObject(this, runnerOffset,  null, Thread.currentThread()))  return;  try {  //callable在初始化的时候传递进来的  Callablec = callable;  if (c != null && state == NEW) {  V result;  boolean ran;  try {  //执行callable的结果通过result字段保存  result = c.call();  ran = true;  } catch (Throwable ex) {  result = null;  ran = false;  //保存callable发生错误的情况  setException(ex);  }  if (ran)  set(result);  }  } finally {  // runner must be non-null until state is settled to  // prevent concurrent calls to run()  //初始化自身状态;在CAS时需要使用runnerOffset;  //只有runner为null,才会执行任务  runner = null;  // state must be re-read after nulling runner to prevent  // leaked interrupts  int s = state;  if (s >= INTERRUPTING)  //在后面  handlePossibleCancellationInterrupt(s);  }  }
//Future执行发生异常时  //此时的状态流为NEW -> COMPLETING -> EXCEPTIONAL  protected void setException(Throwable t) {  //CAS修改状态;保证计算结果的正确性;并且修改状态为COMPLETING  if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {  outcome = t;  //修改状态为EXCEPTIONAL  UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state  finishCompletion();  }  }  //Future正常执行  //此时的状态流为NEW -> COMPLETING -> NORMAL  protected void set(V v) {  //CAS修改状态;  if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {  outcome = v;  //修改状态为NORMAL  UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state  finishCompletion();  }  }
private void finishCompletion() {  // assert state > COMPLETING;  for (WaitNode q; (q = waiters) != null;) {  //清除等待队列  if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {  for (;;) {  Thread t = q.thread;  if (t != null) {  q.thread = null;  //唤醒等待的线程  LockSupport.unpark(t);  }  WaitNode next = q.next;  if (next == null)  break;  q.next = null; // unlink to help gc  q = next;  }  break;  }  }  //执行done  done();  //清除callable  callable = null; // to reduce footprint  }
//不包含超时的Get  public V get() throws InterruptedException, ExecutionException {  int s = state;  //如果当前Thread的state没有转化为完成状态  if (s <= COMPLETING)  //则等待;  s = awaitDone(false, 0L);  //返回执行的结果或者异常信息 return report(s);  }  //包含超时时间的Get  public V get(long timeout, TimeUnit unit)  throws InterruptedException, ExecutionException, TimeoutException {  if (unit == null)  throw new NullPointerException();  int s = state;  //如果在规定时间内没有做出响应且没有完成计算;则抛出TimeOut异常  if (s <= COMPLETING &&  (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)  throw new TimeoutException();  return report(s); }
private int awaitDone(boolean timed, long nanos)  throws InterruptedException {  //计算等待结束时间  final long deadline = timed ? System.nanoTime() + nanos : 0L;  WaitNode q = null;  boolean queued = false;  for (;;) {  //如果线程以及中止  if (Thread.interrupted()) {  //将WaitNode q移除等待队列  removeWaiter(q);  //抛出异常  throw new InterruptedException();  }  int s = state;  //如果线程线程以及执行完成,则直接返回结果  if (s > COMPLETING) {  if (q != null)  q.thread = null;  return s;  }  //如果线程状态即将转化为完成;则让CPU当前执行线程让出CPU  else if (s == COMPLETING) // cannot time out yet  Thread.yield();  //新建队列节点  else if (q == null)  q = new WaitNode();  //将当前节点置于等待队列  else if (!queued)  queued = UNSAFE.compareAndSwapObject(this, waitersOffset,  q.next = waiters, q);  //指定线程在nanos时间后被唤醒  else if (timed) {  nanos = deadline - System.nanoTime();  //如果时间不合法  if (nanos <= 0L) {  //则将超时的等待线程移除等待队列  removeWaiter(q);  return state;  }  //线程等待单位时间后被唤醒  LockSupport.parkNanos(this, nanos);  }  //线程一直阻塞,直到被其余线程唤醒  else  LockSupport.park(this);  }  }
public boolean cancel(boolean mayInterruptIfRunning) {  //mayInterruptIfRunning变量声明对于Thread是否执行强制中断  //基于mayInterruptIfRunning变量;通过CAS修改状态  //NEW -> INTERRUPTING或者NEW -> CANCELLED  //INTERRUPTING只是中间状态  if (!(state == NEW &&  UNSAFE.compareAndSwapInt(this, stateOffset, NEW,  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))  return false;  try { // in case call to interrupt throws exception  //如果允许强制中断  if (mayInterruptIfRunning) {  try {  Thread t = runner;  if (t != null)  //中断当前线程  t.interrupt();  } finally { // final state  //修改线程状态  //NEW -> INTERRUPTING -> INTERRUPTED  UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);  }  }  } finally {  finishCompletion();  }  return true;  }
Copyright © 2023 leiyu.cn. All Rights Reserved. 磊宇云计算 版权所有 许可证编号:B1-20233142/B2-20230630 山东磊宇云计算有限公司 鲁ICP备2020045424号
磊宇云计算致力于以最 “绿色节能” 的方式,让每一位上云的客户成为全球绿色节能和降低碳排放的贡献者