AsyncHttpClient ListenableFuture: A Source Code Walkthrough
Preface
This article looks at how ListenableFuture is defined and implemented in AsyncHttpClient.
ListenableFuture
org/asynchttpclient/ListenableFuture.java
public interface ListenableFuture<V> extends Future<V> {

    /**
     * Terminate and if there is no exception, mark this Future as done and release the internal lock.
     */
    void done();

    /**
     * Abort the current processing, and propagate the {@link Throwable} to the {@link AsyncHandler} or {@link Future}
     *
     * @param t the exception
     */
    void abort(Throwable t);

    /**
     * Touch the current instance to prevent external service to times out.
     */
    void touch();

    /**
     * Adds a listener and executor to the ListenableFuture.
     * The listener will be {@linkplain java.util.concurrent.Executor#execute(Runnable) passed
     * to the executor} for execution when the {@code Future}'s computation is
     * {@linkplain Future#isDone() complete}.
     * <br>
     * Executor can be <code>null</code>, in that case executor will be executed
     * in the thread where completion happens.
     * <br>
     * There is no guaranteed ordering of execution of listeners, they may get
     * called in the order they were added and they may get called out of order,
     * but any listener added through this method is guaranteed to be called once
     * the computation is complete.
     *
     * @param listener the listener to run when the computation is complete.
     * @param exec     the executor to run the listener in.
     * @return this Future
     */
    ListenableFuture<V> addListener(Runnable listener, Executor exec);

    CompletableFuture<V> toCompletableFuture();

    //......
}
ListenableFuture extends java.util.concurrent.Future and adds the done, abort, touch, addListener, and toCompletableFuture methods.
CompletedFailure
org/asynchttpclient/ListenableFuture.java
class CompletedFailure<T> implements ListenableFuture<T> {

    private final ExecutionException e;

    public CompletedFailure(Throwable t) {
        e = new ExecutionException(t);
    }

    public CompletedFailure(String message, Throwable t) {
        e = new ExecutionException(message, t);
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return true;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return true;
    }

    @Override
    public T get() throws ExecutionException {
        throw e;
    }

    @Override
    public T get(long timeout, TimeUnit unit) throws ExecutionException {
        throw e;
    }

    @Override
    public void done() {
    }

    @Override
    public void abort(Throwable t) {
    }

    @Override
    public void touch() {
    }

    @Override
    public ListenableFuture<T> addListener(Runnable listener, Executor exec) {
        if (exec != null) {
            exec.execute(listener);
        } else {
            listener.run();
        }
        return this;
    }

    @Override
    public CompletableFuture<T> toCompletableFuture() {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(e);
        return future;
    }
}
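CompletedFailure implements the ListenableFuture interface and represents a future that has already failed. Its cancel method returns true, isCancelled returns false, and isDone returns true; both get methods throw the stored ExecutionException; done, abort, and touch are no-ops; addListener runs the listener immediately, on the executor if one is given and otherwise in the calling thread.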
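To make the behaviour of an already-failed future concrete, here is a minimal sketch that mirrors the shape of CompletedFailure using only the JDK. The class name PreFailedFuture is hypothetical and the class does not implement the real interface; it is an illustration under those assumptions, not the library code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;

// Hypothetical stand-in that mirrors the shape of CompletedFailure; not the library class.
final class PreFailedFuture<T> {
    private final ExecutionException error;

    PreFailedFuture(Throwable cause) {
        // The failure is wrapped once, at construction time.
        this.error = new ExecutionException(cause);
    }

    boolean isDone() {
        return true; // failed from the start, so always done
    }

    T get() throws ExecutionException {
        throw error; // never blocks, always rethrows the stored failure
    }

    PreFailedFuture<T> addListener(Runnable listener, Executor exec) {
        // Already complete: run at once, on the executor if one was supplied.
        if (exec != null) {
            exec.execute(listener);
        } else {
            listener.run();
        }
        return this;
    }

    CompletableFuture<T> toCompletableFuture() {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(error);
        return future;
    }
}
```

A caller can therefore treat a request that failed before it was sent the same way as one that failed in flight: get() throws, listeners still fire, and the CompletableFuture view is completed exceptionally.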
NettyResponseFuture
org/asynchttpclient/netty/NettyResponseFuture.java
public final class NettyResponseFuture<V> implements ListenableFuture<V> {

    private static final Logger LOGGER = LoggerFactory.getLogger(NettyResponseFuture.class);

    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<NettyResponseFuture> REDIRECT_COUNT_UPDATER = AtomicIntegerFieldUpdater
            .newUpdater(NettyResponseFuture.class, "redirectCount");
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<NettyResponseFuture> CURRENT_RETRY_UPDATER = AtomicIntegerFieldUpdater
            .newUpdater(NettyResponseFuture.class, "currentRetry");
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IS_DONE_FIELD = AtomicIntegerFieldUpdater
            .newUpdater(NettyResponseFuture.class, "isDone");
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IS_CANCELLED_FIELD = AtomicIntegerFieldUpdater
            .newUpdater(NettyResponseFuture.class, "isCancelled");
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IN_AUTH_FIELD = AtomicIntegerFieldUpdater
            .newUpdater(NettyResponseFuture.class, "inAuth");
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IN_PROXY_AUTH_FIELD = AtomicIntegerFieldUpdater
            .newUpdater(NettyResponseFuture.class, "inProxyAuth");
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<NettyResponseFuture> CONTENT_PROCESSED_FIELD = AtomicIntegerFieldUpdater
            .newUpdater(NettyResponseFuture.class, "contentProcessed");
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<NettyResponseFuture> ON_THROWABLE_CALLED_FIELD = AtomicIntegerFieldUpdater
            .newUpdater(NettyResponseFuture.class, "onThrowableCalled");
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<NettyResponseFuture, TimeoutsHolder> TIMEOUTS_HOLDER_FIELD = AtomicReferenceFieldUpdater
            .newUpdater(NettyResponseFuture.class, TimeoutsHolder.class, "timeoutsHolder");
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<NettyResponseFuture, Object> PARTITION_KEY_LOCK_FIELD = AtomicReferenceFieldUpdater
            .newUpdater(NettyResponseFuture.class, Object.class, "partitionKeyLock");

    private final long start = unpreciseMillisTime();
    private final ChannelPoolPartitioning connectionPoolPartitioning;
    private final ConnectionSemaphore connectionSemaphore;
    private final ProxyServer proxyServer;
    private final int maxRetry;
    private final CompletableFuture<V> future = new CompletableFuture<>();

    //......

    @Override
    public V get() throws InterruptedException, ExecutionException {
        return future.get();
    }

    @Override
    public V get(long l, TimeUnit tu) throws InterruptedException, TimeoutException, ExecutionException {
        return future.get(l, tu);
    }
}
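NettyResponseFuture implements the ListenableFuture interface. It tracks its state (redirectCount, currentRetry, isDone, isCancelled, and so on) through static AtomicIntegerFieldUpdater and AtomicReferenceFieldUpdater instances rather than per-instance atomic objects, and both get methods delegate to an internal CompletableFuture.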
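The field-updater pattern used above can be sketched in isolation. The example below is a minimal illustration, assuming a retry counter bounded by maxRetry; the class RetryState and its method names are hypothetical and chosen only for this sketch.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical sketch of the field-updater pattern; names are illustrative only.
final class RetryState {
    // One static updater serves every instance, so no per-instance AtomicInteger is allocated.
    private static final AtomicIntegerFieldUpdater<RetryState> CURRENT_RETRY =
            AtomicIntegerFieldUpdater.newUpdater(RetryState.class, "currentRetry");

    // The target field must be a volatile int for the updater to work on it.
    private volatile int currentRetry;
    private final int maxRetry;

    RetryState(int maxRetry) {
        this.maxRetry = maxRetry;
    }

    /** Atomically counts one more attempt; returns true while the budget is not exhausted. */
    boolean incrementRetryAndCheck() {
        return maxRetry > 0 && CURRENT_RETRY.incrementAndGet(this) <= maxRetry;
    }

    /** Exhausts the budget so that later checks fail. */
    void noMoreRetry() {
        CURRENT_RETRY.set(this, maxRetry);
    }
}
```

The design trade-off is memory: a future is created per request, so keeping plain volatile int fields and sharing static updaters avoids allocating several atomic wrapper objects for each request.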
done
public final void done() {
    if (terminateAndExit())
        return;

    try {
        loadContent();
    } catch (ExecutionException ignored) {
    } catch (RuntimeException t) {
        future.completeExceptionally(t);
    } catch (Throwable t) {
        future.completeExceptionally(t);
        throw t;
    }
}

private boolean terminateAndExit() {
    releasePartitionKeyLock();
    cancelTimeouts();
    this.channel = null;
    this.reuseChannel = false;
    return IS_DONE_FIELD.getAndSet(this, 1) != 0 || isCancelled != 0;
}

private void loadContent() throws ExecutionException {
    if (future.isDone()) {
        try {
            future.get();
        } catch (InterruptedException e) {
            throw new RuntimeException("unreachable", e);
        }
    }

    // No more retry
    CURRENT_RETRY_UPDATER.set(this, maxRetry);
    if (CONTENT_PROCESSED_FIELD.getAndSet(this, 1) == 0) {
        try {
            future.complete(asyncHandler.onCompleted());
        } catch (Throwable ex) {
            if (ON_THROWABLE_CALLED_FIELD.getAndSet(this, 1) == 0) {
                try {
                    try {
                        asyncHandler.onThrowable(ex);
                    } catch (Throwable t) {
                        LOGGER.debug("asyncHandler.onThrowable", t);
                    }
                } finally {
                    cancelTimeouts();
                }
            }
            future.completeExceptionally(ex);
        }
    }
    future.getNow(null);
}
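The done method first calls terminateAndExit, which releases the partition key lock, cancels the timeouts, clears the channel reference, and returns true if the future was already done or cancelled; in that case done returns immediately. Otherwise it calls loadContent. If the internal future is already done, loadContent calls future.get() so that an exceptional completion surfaces as an ExecutionException, which done ignores. It then sets currentRetry to maxRetry to rule out further retries and, if the content has not been processed yet, completes the future with the result of asyncHandler.onCompleted(). If onCompleted throws, asyncHandler.onThrowable is invoked at most once and the future is completed exceptionally.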
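The core of this flow, a getAndSet guard followed by completing the future from a handler callback, can be sketched with JDK classes alone. DoneFlowSketch is a hypothetical, much simplified model: a Callable stands in for asyncHandler.onCompleted(), and timeouts, channels, and retries are left out.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified model of the done() flow; not the library implementation.
final class DoneFlowSketch<V> {
    private final CompletableFuture<V> future = new CompletableFuture<>();
    private final AtomicBoolean isDone = new AtomicBoolean();
    private final Callable<V> onCompleted; // stands in for asyncHandler.onCompleted()

    DoneFlowSketch(Callable<V> onCompleted) {
        this.onCompleted = onCompleted;
    }

    void done() {
        // getAndSet returns the previous value, so only the first caller gets past this guard.
        if (isDone.getAndSet(true)) {
            return;
        }
        try {
            future.complete(onCompleted.call());
        } catch (Throwable ex) {
            // A failing handler becomes an exceptional completion instead of escaping.
            future.completeExceptionally(ex);
        }
    }

    CompletableFuture<V> toCompletableFuture() {
        return future;
    }
}
```

Calling done() twice on the same instance runs the handler once; the second call exits at the guard.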
abort
public final void abort(final Throwable t) {
    if (terminateAndExit())
        return;

    future.completeExceptionally(t);

    if (ON_THROWABLE_CALLED_FIELD.compareAndSet(this, 0, 1)) {
        try {
            asyncHandler.onThrowable(t);
        } catch (Throwable te) {
            LOGGER.debug("asyncHandler.onThrowable", te);
        }
    }
}
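The abort method likewise returns immediately when terminateAndExit returns true. Otherwise it calls future.completeExceptionally(t) and then triggers the asyncHandler.onThrowable(t) callback; the compareAndSet on ON_THROWABLE_CALLED_FIELD ensures the callback fires at most once, and any exception thrown by the callback itself is only logged at debug level.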
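Because done and abort share the same termination guard, whichever is called first decides the outcome. The following sketch models that mutual exclusion under simplifying assumptions: TerminateOnceSketch is a hypothetical class, done takes the result directly, and a Consumer stands in for asyncHandler.onThrowable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical model of how done() and abort() exclude each other; not the library code.
final class TerminateOnceSketch<V> {
    private final CompletableFuture<V> future = new CompletableFuture<>();
    private final AtomicBoolean terminated = new AtomicBoolean();
    private final AtomicBoolean onThrowableCalled = new AtomicBoolean();
    private final Consumer<Throwable> onThrowable; // stands in for asyncHandler.onThrowable(t)

    TerminateOnceSketch(Consumer<Throwable> onThrowable) {
        this.onThrowable = onThrowable;
    }

    void done(V value) {
        if (terminated.getAndSet(true)) {
            return; // abort() or an earlier done() already won
        }
        future.complete(value);
    }

    void abort(Throwable t) {
        if (terminated.getAndSet(true)) {
            return; // done() or an earlier abort() already won
        }
        future.completeExceptionally(t);
        // compareAndSet makes sure the callback is delivered at most once.
        if (onThrowableCalled.compareAndSet(false, true)) {
            try {
                onThrowable.accept(t);
            } catch (Throwable te) {
                // Swallowed in this sketch; the original logs it at debug level.
            }
        }
    }

    CompletableFuture<V> toCompletableFuture() {
        return future;
    }
}
```

In this model a late done after abort, or a late abort after done, is silently dropped, and a buggy error callback cannot break the caller of abort.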
touch
public void touch() {
    touch = unpreciseMillisTime();
}
The touch method updates the touch field with the current timestamp obtained from unpreciseMillisTime().
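How the timestamp is consumed is not shown in this article. Assuming a timeout check compares the last touch against the current time, the idea could look like the sketch below. TouchSketch, isExpired, and the readTimeoutMillis parameter are all hypothetical, and the clock is passed in explicitly to keep the example deterministic.

```java
// Hypothetical sketch of an idle-timeout check driven by a "last touch" timestamp.
final class TouchSketch {
    private volatile long lastTouch;
    private final long readTimeoutMillis;

    TouchSketch(long nowMillis, long readTimeoutMillis) {
        this.lastTouch = nowMillis;
        this.readTimeoutMillis = readTimeoutMillis;
    }

    /** Records activity, pushing the expiry point forward. */
    void touch(long nowMillis) {
        lastTouch = nowMillis;
    }

    /** True once no activity has been recorded for at least readTimeoutMillis. */
    boolean isExpired(long nowMillis) {
        return nowMillis - lastTouch >= readTimeoutMillis;
    }
}
```

Under this assumption, each chunk of received data would call touch, so a slow but steadily progressing response does not count as idle.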
addListener
public ListenableFuture<V> addListener(Runnable listener, Executor exec) {
    if (exec == null) {
        exec = Runnable::run;
    }
    future.whenCompleteAsync((r, v) -> listener.run(), exec);
    return this;
}
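The addListener method registers the listener through future.whenCompleteAsync((r, v) -> listener.run(), exec). When exec is null it is replaced with Runnable::run, so the listener runs directly in the thread that completes the future. Because whenCompleteAsync fires on both normal and exceptional completion, the listener is called in either case.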
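The same-thread fallback can be tried out on a plain CompletableFuture. The helper below is a minimal sketch; ListenerSketch is a hypothetical name, and the method takes the future as a parameter only so that the example is self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical helper that reproduces the null-executor fallback on a plain CompletableFuture.
final class ListenerSketch {
    static <V> void addListener(CompletableFuture<V> future, Runnable listener, Executor exec) {
        // Runnable::run is an Executor that runs the task in the calling thread.
        Executor target = (exec != null) ? exec : Runnable::run;
        // The callback ignores both the result and the error: the listener only learns
        // that the computation finished, not how.
        future.whenCompleteAsync((result, error) -> listener.run(), target);
    }
}
```

A listener added before completion runs when complete or completeExceptionally is called; a listener added afterwards runs right away in the thread that adds it.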
toCompletableFuture
public CompletableFuture<V> toCompletableFuture() {
    return future;
}
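The toCompletableFuture method returns the internal future directly rather than a copy, so callers work with the same CompletableFuture instance that done and abort complete.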
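Once a CompletableFuture is available, the usual composition methods apply. The sketch below assumes a future that carries an HTTP status code; ChainSketch and describe are hypothetical names used only to show the chaining style.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical example of composing on the CompletableFuture view of a response future.
final class ChainSketch {
    /** Maps a status-code future to a label, with a fallback when the request failed. */
    static CompletableFuture<String> describe(CompletableFuture<Integer> statusFuture) {
        return statusFuture
                .thenApply(status -> status >= 200 && status < 300 ? "ok" : "error " + status)
                .exceptionally(t -> "failed");
    }
}
```

With the real client, the input would come from calling toCompletableFuture() on the ListenableFuture returned for a request and mapping the response to its status code.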
newNettyResponseFuture
org/asynchttpclient/netty/request/NettyRequestSender.java
private <T> NettyResponseFuture<T> newNettyResponseFuture(Request request,
                                                          AsyncHandler<T> asyncHandler,
                                                          NettyRequest nettyRequest,
                                                          ProxyServer proxyServer) {
    NettyResponseFuture<T> future = new NettyResponseFuture<>(
            request,
            asyncHandler,
            nettyRequest,
            config.getMaxRequestRetry(),
            request.getChannelPoolPartitioning(),
            connectionSemaphore,
            proxyServer);

    String expectHeader = request.getHeaders().get(EXPECT);
    if (HttpHeaderValues.CONTINUE.contentEqualsIgnoreCase(expectHeader))
        future.setDontWriteBodyBecauseExpectContinue(true);
    return future;
}

private <T> ListenableFuture<T> sendRequestWithCertainForceConnect(Request request,
                                                                   AsyncHandler<T> asyncHandler,
                                                                   NettyResponseFuture<T> future,
                                                                   ProxyServer proxyServer,
                                                                   boolean performConnectRequest) {
    NettyResponseFuture<T> newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, proxyServer,
            performConnectRequest);

    Channel channel = getOpenChannel(future, request, proxyServer, asyncHandler);

    return Channels.isChannelActive(channel)
            ? sendRequestWithOpenChannel(newFuture, asyncHandler, channel)
            : sendRequestWithNewChannel(request, proxyServer, newFuture, asyncHandler);
}
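NettyRequestSender's newNettyResponseFuture creates the NettyResponseFuture and, when the request carries an Expect: 100-continue header, tells the future not to write the body yet. sendRequestWithCertainForceConnect then passes the NettyResponseFuture to sendRequestWithOpenChannel if an active channel is available, or to sendRequestWithNewChannel otherwise, to send the request.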
Summary
AsyncHttpClient's ListenableFuture extends java.util.concurrent.Future and adds the done, abort, touch, addListener, and toCompletableFuture methods. It has two implementations, CompletedFailure and NettyResponseFuture. NettyRequestSender's sendRequestWithCertainForceConnect method passes the NettyResponseFuture to sendRequestWithOpenChannel or sendRequestWithNewChannel to send the request.