AsyncHttpClient ListenableFuture源码流程解读

 更新时间:2023年12月18日 08:35:35   作者:codecraft  
这篇文章主要为大家介绍了AsyncHttpClient ListenableFuture源码流程解读,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

本文主要研究一下AsyncHttpClient的ListenableFuture

ListenableFuture

org/asynchttpclient/ListenableFuture.java

public interface ListenableFuture<V> extends Future<V> {
  /**
   * Terminate and if there is no exception, mark this Future as done and release the internal lock.
   */
  void done();
  /**
   * Abort the current processing, and propagate the {@link Throwable} to the {@link AsyncHandler} or {@link Future}
   *
   * @param t the exception
   */
  void abort(Throwable t);
  /**
   * Touch the current instance to prevent external service to times out.
   */
  void touch();
  /**
   * Adds a listener and executor to the ListenableFuture.
   * The listener will be {@linkplain java.util.concurrent.Executor#execute(Runnable) passed
   * to the executor} for execution when the {@code Future}'s computation is
   * {@linkplain Future#isDone() complete}.
   * <br>
   * Executor can be <code>null</code>, in that case executor will be executed
   * in the thread where completion happens.
   * <br>
   * There is no guaranteed ordering of execution of listeners, they may get
   * called in the order they were added and they may get called out of order,
   * but any listener added through this method is guaranteed to be called once
   * the computation is complete.
   *
   * @param listener the listener to run when the computation is complete.
   * @param exec     the executor to run the listener in.
   * @return this Future
   */
  ListenableFuture<V> addListener(Runnable listener, Executor exec);
  CompletableFuture<V> toCompletableFuture();
  //......
}
ListenableFuture继承了java.util.concurrent.Future,它定义了done、abort、touch、addListener、toCompletableFuture方法

CompletedFailure

org/asynchttpclient/ListenableFuture.java

class CompletedFailure<T> implements ListenableFuture<T> {
    private final ExecutionException e;
    public CompletedFailure(Throwable t) {
      e = new ExecutionException(t);
    }
    public CompletedFailure(String message, Throwable t) {
      e = new ExecutionException(message, t);
    }
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      return true;
    }
    @Override
    public boolean isCancelled() {
      return false;
    }
    @Override
    public boolean isDone() {
      return true;
    }
    @Override
    public T get() throws ExecutionException {
      throw e;
    }
    @Override
    public T get(long timeout, TimeUnit unit) throws ExecutionException {
      throw e;
    }
    @Override
    public void done() {
    }
    @Override
    public void abort(Throwable t) {
    }
    @Override
    public void touch() {
    }
    @Override
    public ListenableFuture<T> addListener(Runnable listener, Executor exec) {
      if (exec != null) {
        exec.execute(listener);
      } else {
        listener.run();
      }
      return this;
    }
    @Override
    public CompletableFuture<T> toCompletableFuture() {
      CompletableFuture<T> future = new CompletableFuture<>();
      future.completeExceptionally(e);
      return future;
    }
  }
CompletedFailure实现了ListenableFuture接口,其cancel方法返回true、isDone返回true

NettyResponseFuture

org/asynchttpclient/netty/NettyResponseFuture.java

public final class NettyResponseFuture<V> implements ListenableFuture<V> {
  private static final Logger LOGGER = LoggerFactory.getLogger(NettyResponseFuture.class);
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> REDIRECT_COUNT_UPDATER = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "redirectCount");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> CURRENT_RETRY_UPDATER = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "currentRetry");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IS_DONE_FIELD = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "isDone");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IS_CANCELLED_FIELD = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "isCancelled");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IN_AUTH_FIELD = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "inAuth");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IN_PROXY_AUTH_FIELD = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "inProxyAuth");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> CONTENT_PROCESSED_FIELD = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "contentProcessed");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> ON_THROWABLE_CALLED_FIELD = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "onThrowableCalled");
  @SuppressWarnings("rawtypes")
  private static final AtomicReferenceFieldUpdater<NettyResponseFuture, TimeoutsHolder> TIMEOUTS_HOLDER_FIELD = AtomicReferenceFieldUpdater
          .newUpdater(NettyResponseFuture.class, TimeoutsHolder.class, "timeoutsHolder");
  @SuppressWarnings("rawtypes")
  private static final AtomicReferenceFieldUpdater<NettyResponseFuture, Object> PARTITION_KEY_LOCK_FIELD = AtomicReferenceFieldUpdater
          .newUpdater(NettyResponseFuture.class, Object.class, "partitionKeyLock");
  private final long start = unpreciseMillisTime();
  private final ChannelPoolPartitioning connectionPoolPartitioning;
  private final ConnectionSemaphore connectionSemaphore;
  private final ProxyServer proxyServer;
  private final int maxRetry;
  private final CompletableFuture<V> future = new CompletableFuture<>();          
          //......
  @Override
  public V get() throws InterruptedException, ExecutionException {
    return future.get();
  }
  @Override
  public V get(long l, TimeUnit tu) throws InterruptedException, TimeoutException, ExecutionException {
    return future.get(l, tu);
  }          
}
NettyResponseFuture实现了ListenableFuture接口

done

public final void done() {
    if (terminateAndExit())
      return;
    try {
      loadContent();
    } catch (ExecutionException ignored) {
    } catch (RuntimeException t) {
      future.completeExceptionally(t);
    } catch (Throwable t) {
      future.completeExceptionally(t);
      throw t;
    }
  }
  private boolean terminateAndExit() {
    releasePartitionKeyLock();
    cancelTimeouts();
    this.channel = null;
    this.reuseChannel = false;
    return IS_DONE_FIELD.getAndSet(this, 1) != 0 || isCancelled != 0;
  }  
private void loadContent() throws ExecutionException {
    if (future.isDone()) {
      try {
        future.get();
      } catch (InterruptedException e) {
        throw new RuntimeException("unreachable", e);
      }
    }
    // No more retry
    CURRENT_RETRY_UPDATER.set(this, maxRetry);
    if (CONTENT_PROCESSED_FIELD.getAndSet(this, 1) == 0) {
      try {
        future.complete(asyncHandler.onCompleted());
      } catch (Throwable ex) {
        if (ON_THROWABLE_CALLED_FIELD.getAndSet(this, 1) == 0) {
          try {
            try {
              asyncHandler.onThrowable(ex);
            } catch (Throwable t) {
              LOGGER.debug("asyncHandler.onThrowable", t);
            }
          } finally {
            cancelTimeouts();
          }
        }
        future.completeExceptionally(ex);
      }
    }
    future.getNow(null);
  }
done方法对于terminateAndExit返回true的直接返回,否则执行loadContent,它对于future.isDone()的执行future.get(),然后执行future.complete(asyncHandler.onCompleted())回调

abort

public final void abort(final Throwable t) {
    if (terminateAndExit())
      return;
    future.completeExceptionally(t);
    if (ON_THROWABLE_CALLED_FIELD.compareAndSet(this, 0, 1)) {
      try {
        asyncHandler.onThrowable(t);
      } catch (Throwable te) {
        LOGGER.debug("asyncHandler.onThrowable", te);
      }
    }
  }
abort方法也是对于terminateAndExit返回true的直接返回,否则执行future.completeExceptionally(t),然后触发asyncHandler.onThrowable(t)回调

touch

public void touch() {
    touch = unpreciseMillisTime();
  }
touch方法用当前时间戳更新touch属性

addListener

public ListenableFuture<V> addListener(Runnable listener, Executor exec) {
    if (exec == null) {
      exec = Runnable::run;
    }
    future.whenCompleteAsync((r, v) -> listener.run(), exec);
    return this;
  }
addListener方法会执行future.whenCompleteAsync((r, v) -> listener.run(), exec)

toCompletableFuture

public CompletableFuture<V> toCompletableFuture() {
    return future;
  }
toCompletableFuture方法直接返回future

newNettyResponseFuture

org/asynchttpclient/netty/request/NettyRequestSender.java

private <T> NettyResponseFuture<T> newNettyResponseFuture(Request request,
                                                            AsyncHandler<T> asyncHandler,
                                                            NettyRequest nettyRequest,
                                                            ProxyServer proxyServer) {
    NettyResponseFuture<T> future = new NettyResponseFuture<>(
            request,
            asyncHandler,
            nettyRequest,
            config.getMaxRequestRetry(),
            request.getChannelPoolPartitioning(),
            connectionSemaphore,
            proxyServer);
    String expectHeader = request.getHeaders().get(EXPECT);
    if (HttpHeaderValues.CONTINUE.contentEqualsIgnoreCase(expectHeader))
      future.setDontWriteBodyBecauseExpectContinue(true);
    return future;
  }
  private <T> ListenableFuture<T> sendRequestWithCertainForceConnect(Request request,
                                                                     AsyncHandler<T> asyncHandler,
                                                                     NettyResponseFuture<T> future,
                                                                     ProxyServer proxyServer,
                                                                     boolean performConnectRequest) {
    NettyResponseFuture<T> newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, proxyServer,
            performConnectRequest);
    Channel channel = getOpenChannel(future, request, proxyServer, asyncHandler);
    return Channels.isChannelActive(channel)
            ? sendRequestWithOpenChannel(newFuture, asyncHandler, channel)
            : sendRequestWithNewChannel(request, proxyServer, newFuture, asyncHandler);
  }
NettyRequestSender的newNettyResponseFuture创建的是NettyResponseFuture;sendRequestWithCertainForceConnect则将NettyResponseFuture传递给sendRequestWithOpenChannel或者sendRequestWithNewChannel来发送请求

小结

AsyncHttpClient的ListenableFuture继承了java.util.concurrent.Future,它定义了done、abort、touch、addListener、toCompletableFuture方法;它有两个实现类,分别是CompletedFailure及NettyResponseFuture;NettyRequestSender的sendRequest方法将NettyResponseFuture传递给sendRequestWithOpenChannel或者sendRequestWithNewChannel来发送请求。

以上就是聊聊AsyncHttpClient的ListenableFuture的详细内容,更多关于AsyncHttpClient的ListenableFuture的资料请关注脚本之家其它相关文章!

以上就是AsyncHttpClient ListenableFuture源码流程解读的详细内容,更多关于AsyncHttpClient ListenableFuture的资料请关注脚本之家其它相关文章!

相关文章

  • 解析Java和IDEA中的文件打包问题

    解析Java和IDEA中的文件打包问题

    这篇文章主要介绍了Java和IDEA中的文件打包问题,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-07-07
  • java自定义类加载器代码示例

    java自定义类加载器代码示例

    这篇文章主要介绍了java自定义类加载器代码示例,具有一定借鉴价值,需要的朋友可以了解下。
    2017-12-12
  • maven下mybatis-plus和pagehelp冲突问题的解决方法

    maven下mybatis-plus和pagehelp冲突问题的解决方法

    这篇文章主要介绍了maven下mybatis-plus和pagehelp冲突的解决方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-08-08
  • Java web过滤器验证登录防止未登录进入界面

    Java web过滤器验证登录防止未登录进入界面

    这篇文章主要介绍了Java web过滤器验证登录防止未登录进入界面,在一些系统中经常可以用到此功能,对java web 验证登录知识感兴趣的朋友一起看下吧
    2016-08-08
  • Spring-全面详解(学习总结)

    Spring-全面详解(学习总结)

    这篇文章主要介绍了详解Spring框架入门,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧,希望能给你带来帮助
    2021-07-07
  • springboot后端存储富文本内容的思路与步骤(含图片内容)

    springboot后端存储富文本内容的思路与步骤(含图片内容)

    在所有的编辑器中,大概最受欢迎的就是富文本编辑器和MarkDown编辑器了,下面这篇文章主要给大家介绍了关于springboot后端存储富文本内容的思路与步骤的相关资料,需要的朋友可以参考下
    2023-04-04
  • Java并发包工具类CountDownLatch的应用详解

    Java并发包工具类CountDownLatch的应用详解

    CountDownLatch是Java并发包中非常实用的一个工具类,它可以帮助我们实现线程之间的同步和协作。本文主要介绍了CountDownLatch的应用场景及最佳实践,希望对大家有所帮助
    2023-04-04
  • SpringCache快速使用及入门案例

    SpringCache快速使用及入门案例

    Spring Cache 是Spring 提供的一整套的缓存解决方案,它不是具体的缓存实现,本文主要介绍了SpringCache快速使用及入门案例,感兴趣的可以了解一下
    2023-08-08
  • 解读@NoArgsConstructor,@AllArgsConstructor,@RequiredArgsConstructor的区别及在springboot常用地方

    解读@NoArgsConstructor,@AllArgsConstructor,@RequiredArgsConstr

    这篇文章主要介绍了解读@NoArgsConstructor,@AllArgsConstructor,@RequiredArgsConstructor的区别及在springboot常用地方,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-12-12
  • Java OpenCV学习之Mat的基本操作详解

    Java OpenCV学习之Mat的基本操作详解

    OpenCV用来存储图像,很多时候都会用到这个Mat方法。数字图像可看做一个数值矩阵, 其中的每一个元素表明一个像素点。Mat在 OpenCV 中表示的是 N 维稠密矩阵,与稠密矩阵相对的是稀疏矩阵。本文将重点介绍OpenCV中Mat的一些基本操作,需要的可以参考一下
    2022-03-03

最新评论