Skip to content

Commit

Permalink
Refactor the connection manager to become an endpoint manager, it als…
Browse files Browse the repository at this point in the history
…o becomes agnostic of connections
  • Loading branch information
vietj committed Nov 2, 2023
1 parent 0d63959 commit 04640c2
Show file tree
Hide file tree
Showing 30 changed files with 224 additions and 275 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import io.netty.resolver.DefaultAddressResolverGroup;
import io.vertx.core.Future;
import io.vertx.core.dns.AddressResolverOptions;
import io.vertx.core.spi.dns.AddressResolverProvider;
import io.vertx.core.spi.resolver.dns.AddressResolverProvider;

import java.net.InetSocketAddress;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.vertx.core.impl.HostnameResolver;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.spi.dns.AddressResolverProvider;
import io.vertx.core.spi.resolver.dns.AddressResolverProvider;

import java.io.File;
import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@

import io.vertx.core.Future;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.net.impl.pool.Endpoint;
import io.vertx.core.net.impl.endpoint.Endpoint;
import io.vertx.core.spi.metrics.ClientMetrics;

/**
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
abstract class ClientHttpEndpointBase<C> extends Endpoint<C> {
abstract class ClientHttpEndpointBase<C> extends Endpoint {

private final ClientMetrics metrics; // Shall be removed later combining the PoolMetrics with HttpClientMetrics

Expand All @@ -28,7 +28,6 @@ abstract class ClientHttpEndpointBase<C> extends Endpoint<C> {
this.metrics = metrics;
}

@Override
public Future<C> requestConnection(ContextInternal ctx, long timeout) {
Future<C> fut = requestConnection2(ctx, timeout);
if (metrics != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.loadbalancing.LoadBalancer;
import io.vertx.core.spi.net.AddressResolver;
import io.vertx.core.spi.resolver.address.AddressResolver;

import java.util.function.Function;

Expand Down
30 changes: 16 additions & 14 deletions src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import io.vertx.core.impl.*;
import io.vertx.core.loadbalancing.LoadBalancer;
import io.vertx.core.net.*;
import io.vertx.core.net.impl.endpoint.EndpointManager;
import io.vertx.core.net.impl.endpoint.EndpointProvider;
import io.vertx.core.net.impl.pool.*;
import io.vertx.core.net.impl.resolver.EndpointRequest;
import io.vertx.core.net.impl.resolver.EndpointResolverManager;
import io.vertx.core.net.impl.resolver.EndpointLookup;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.core.spi.metrics.MetricsProvider;
import io.vertx.core.spi.net.AddressResolver;
import io.vertx.core.spi.resolver.address.AddressResolver;

import java.lang.ref.WeakReference;
import java.net.URI;
Expand Down Expand Up @@ -100,7 +102,7 @@ public class HttpClientImpl extends HttpClientBase implements HttpClientInternal
};

private final PoolOptions poolOptions;
private final ConnectionManager<EndpointKey, Lease<HttpClientConnection>> httpCM;
private final EndpointManager<EndpointKey, SharedClientHttpStreamEndpoint> httpCM;
private final EndpointResolverManager<?, ?, ?> endpointResolverManager;
private volatile Function<HttpClientResponse, Future<RequestOptions>> redirectHandler = DEFAULT_HANDLER;
private long timerID;
Expand All @@ -116,7 +118,7 @@ public HttpClientImpl(VertxInternal vertx, AddressResolver<?, ?, ?> addressResol
}

this.poolOptions = poolOptions;
httpCM = new ConnectionManager<>();
httpCM = new EndpointManager<>();
if (poolOptions.getCleanerPeriod() > 0 && (options.getKeepAliveTimeout() > 0L || options.getHttp2KeepAliveTimeout() > 0L)) {
PoolChecker checker = new PoolChecker(this);
ContextInternal timerContext = vertx.createEventLoopContext();
Expand Down Expand Up @@ -174,7 +176,7 @@ protected void checkExpired(Handler<Long> checker) {
}
}

private EndpointProvider<EndpointKey, Lease<HttpClientConnection>> httpEndpointProvider() {
private EndpointProvider<EndpointKey, SharedClientHttpStreamEndpoint> httpEndpointProvider() {
return (key, dispose) -> {
int maxPoolSize = Math.max(poolOptions.getHttp1MaxSize(), poolOptions.getHttp2MaxSize());
ClientMetrics metrics = HttpClientImpl.this.metrics != null ? HttpClientImpl.this.metrics.createEndpointMetrics(key.server, maxPoolSize) : null;
Expand Down Expand Up @@ -315,12 +317,12 @@ private Future<HttpClientRequest> doRequest(
if (server instanceof SocketAddress) {
proxyOptions = computeProxyOptions(proxyConfig, (SocketAddress) server);
EndpointKey key = new EndpointKey(useSSL, proxyOptions, (SocketAddress) server, authority);
future = httpCM.withEndpoint(key, httpEndpointProvider(), (endpoint, created) -> {
Future<Lease<HttpClientConnection>> fut = endpoint.getConnection(connCtx, timeout);
future = httpCM.withEndpointAsync(key, httpEndpointProvider(), (endpoint, created) -> {
Future<Lease<HttpClientConnection>> fut = endpoint.requestConnection(connCtx, timeout);
if (fut == null) {
return Optional.empty();
return null;
} else {
return Optional.of(fut.compose(lease -> {
return fut.compose(lease -> {
HttpClientConnection conn = lease.get();
return conn.createStream(ctx).andThen(ar -> {
if (ar.succeeded()) {
Expand All @@ -330,21 +332,21 @@ private Future<HttpClientRequest> doRequest(
});
}
});
}));
});
}
});
} else {
Future<EndpointLookup> fut = endpointResolverManager.lookupEndpoint(ctx, server);
future = fut.compose(lookup -> {
SocketAddress address = lookup.address();
EndpointKey key = new EndpointKey(useSSL, proxyConfig, address, authority != null ? authority : HostAndPort.create(address.host(), address.port()));
return httpCM.withEndpoint(key, httpEndpointProvider(), (endpoint, created) -> {
Future<Lease<HttpClientConnection>> fut2 = endpoint.getConnection(connCtx, timeout);
return httpCM.withEndpointAsync(key, httpEndpointProvider(), (endpoint, created) -> {
Future<Lease<HttpClientConnection>> fut2 = endpoint.requestConnection(connCtx, timeout);
if (fut2 == null) {
return Optional.empty();
return null;
} else {
EndpointRequest endpointRequest = lookup.initiateRequest();
return Optional.of(fut2.andThen(ar -> {
return fut2.andThen(ar -> {
if (ar.failed()) {
endpointRequest.reportFailure(ar.cause());
}
Expand All @@ -355,7 +357,7 @@ private Future<HttpClientRequest> doRequest(
wrapped.closeHandler(v -> lease.recycle());
return wrapped;
});
}));
});
}
});
});
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/io/vertx/core/http/impl/WebSocketClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import io.vertx.core.net.HostAndPort;
import io.vertx.core.net.ProxyOptions;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.impl.pool.ConnectionManager;
import io.vertx.core.net.impl.pool.EndpointProvider;
import io.vertx.core.net.impl.endpoint.EndpointManager;
import io.vertx.core.net.impl.endpoint.EndpointProvider;
import io.vertx.core.spi.metrics.ClientMetrics;

import java.net.URI;
Expand All @@ -31,7 +31,7 @@
public class WebSocketClientImpl extends HttpClientBase implements WebSocketClient {

private final WebSocketClientOptions options;
private final ConnectionManager<EndpointKey, HttpClientConnection> webSocketCM;
private final EndpointManager<EndpointKey, WebSocketEndpoint> webSocketCM;

public WebSocketClientImpl(VertxInternal vertx, HttpClientOptions options, WebSocketClientOptions wsOptions) {
super(vertx, options);
Expand All @@ -40,8 +40,8 @@ public WebSocketClientImpl(VertxInternal vertx, HttpClientOptions options, WebSo
this.webSocketCM = webSocketConnectionManager();
}

private ConnectionManager<EndpointKey, HttpClientConnection> webSocketConnectionManager() {
return new ConnectionManager<>();
private EndpointManager<EndpointKey, WebSocketEndpoint> webSocketConnectionManager() {
return new EndpointManager<>();
}

protected void doShutdown(Promise<Void> p) {
Expand Down Expand Up @@ -73,14 +73,14 @@ void webSocket(ContextInternal ctx, WebSocketConnectOptions connectOptions, Prom
eventLoopContext = vertx.createEventLoopContext(ctx.nettyEventLoop(), ctx.workerPool(), ctx.classLoader());
}
// todo: cache
EndpointProvider<EndpointKey, HttpClientConnection> provider = (key_, dispose) -> {
EndpointProvider<EndpointKey, WebSocketEndpoint> provider = (key_, dispose) -> {
int maxPoolSize = options.getMaxConnections();
ClientMetrics metrics = WebSocketClientImpl.this.metrics != null ? WebSocketClientImpl.this.metrics.createEndpointMetrics(key_.server, maxPoolSize) : null;
HttpChannelConnector connector = new HttpChannelConnector(WebSocketClientImpl.this, netClient, key_.proxyOptions, metrics, HttpVersion.HTTP_1_1, key_.ssl, false, key_.authority, key_.server);
return new WebSocketEndpoint(null, maxPoolSize, connector, dispose);
};
webSocketCM
.getConnection(eventLoopContext, provider, key)
.withEndpointAsync(key, provider, (endpoint, created) -> endpoint.requestConnection(ctx, 0L))
.onComplete(c -> {
if (c.succeeded()) {
Http1xClientConnection conn = (Http1xClientConnection) c.result();
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/vertx/core/impl/HostnameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.Address;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.dns.AddressResolverProvider;
import io.vertx.core.spi.net.AddressResolver;
import io.vertx.core.spi.net.Endpoint;
import io.vertx.core.spi.resolver.dns.AddressResolverProvider;
import io.vertx.core.spi.resolver.address.AddressResolver;
import io.vertx.core.spi.resolver.address.Endpoint;

import java.io.File;
import java.net.InetAddress;
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/vertx/core/loadbalancing/LoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
*/
package io.vertx.core.loadbalancing;

import io.vertx.core.spi.loadbalancing.DefaultEndpointMetrics;
import io.vertx.core.spi.loadbalancing.EndpointMetrics;
import io.vertx.core.spi.loadbalancing.EndpointSelector;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/net/AddressResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import io.vertx.core.Vertx;

/**
* A generic address resolver market interface. Implementation must also implement the SPI interface {@link io.vertx.core.spi.net.AddressResolver}
* A generic address resolver market interface. Implementation must also implement the SPI interface {@link io.vertx.core.spi.resolver.address.AddressResolver}
* and can be cast to this type.
*/
public interface AddressResolver {
Expand All @@ -24,6 +24,6 @@ public interface AddressResolver {
* @param vertx the vertx instance
* @return the resolver
*/
io.vertx.core.spi.net.AddressResolver<?, ?, ?> resolver(Vertx vertx);
io.vertx.core.spi.resolver.address.AddressResolver<?, ?, ?> resolver(Vertx vertx);

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,14 @@
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.net.impl.pool;

import io.vertx.core.Future;
import io.vertx.core.impl.ContextInternal;
package io.vertx.core.net.impl.endpoint;

/**
* An endpoint, i.e a set of connection to the same address.
*
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
public abstract class Endpoint<C> {
public abstract class Endpoint {

private final Runnable dispose;
private boolean closed;
Expand All @@ -30,28 +27,28 @@ public Endpoint(Runnable dispose) {
this.dispose = dispose;
}

public Future<C> getConnection(ContextInternal ctx, long timeout) {
boolean before() {
synchronized (this) {
if (disposed) {
return null;
return false;
}
pendingRequestCount++;
}
return requestConnection(ctx, timeout).andThen(ar -> {
boolean dispose;
synchronized (Endpoint.this) {
pendingRequestCount--;
dispose = checkDispose();
}
// Dispose before callback otherwise we can have the callback handler retrying the same
// endpoint and never get the callback it expects to creating an infinite loop
if (dispose) {
disposeInternal();
}
});
return true;
}

public abstract Future<C> requestConnection(ContextInternal ctx, long timeout);
void after() {
boolean dispose;
synchronized (Endpoint.this) {
pendingRequestCount--;
dispose = checkDispose();
}
// Dispose before callback otherwise we can have the callback handler retrying the same
// endpoint and never get the callback it expects to creating an infinite loop
if (dispose) {
disposeInternal();
}
}

protected void checkExpired() {
}
Expand Down Expand Up @@ -96,7 +93,7 @@ protected void dispose() {
}

/**
* Close the endpoint, this will close all connections, this method is called by the {@link ConnectionManager} when
* Close the endpoint, this will close all connections, this method is called by the {@link EndpointManager} when
* it is closed.
*/
protected void close() {
Expand Down
Loading

0 comments on commit 04640c2

Please sign in to comment.