From 95921f8a426deaddeab76ba6c833013e5d444d65 Mon Sep 17 00:00:00 2001 From: Thomas Kountis Date: Mon, 16 Aug 2021 14:58:20 +0100 Subject: [PATCH 1/3] API to support start/stop accepting connections Motivation: Expose a way a user can yield accepting connection on server side, and resume on demand.? Modifications: ServerContext now supports an additional `acceptConnections(bool)` API that can be used to hint to the server the need for start/stop accepting. Result: More ways to control the service on-demand. --- .../api/DelegatingHttpServiceContext.java | 5 + .../http/api/HttpServiceContext.java | 4 +- .../http/api/TestHttpServiceContext.java | 5 + .../H2ServerParentConnectionContext.java | 5 + .../http/netty/NettyHttpServer.java | 11 ++ .../netty/AbstractNettyHttpServerTest.java | 6 +- ...onnectionAcceptingNettyHttpServerTest.java | 131 ++++++++++++++++++ .../NettyHttpServerConnectionDrainTest.java | 5 + .../transport/api/ServerContext.java | 4 +- .../transport/api/ServerListenContext.java | 50 +++++++ .../api/ServiceTalkSocketOptions.java | 6 +- .../netty/internal/NettyServerContext.java | 7 +- 12 files changed, 231 insertions(+), 8 deletions(-) create mode 100644 servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionAcceptingNettyHttpServerTest.java create mode 100644 servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServerListenContext.java diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DelegatingHttpServiceContext.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DelegatingHttpServiceContext.java index 19df9431b7..86873cebb6 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DelegatingHttpServiceContext.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DelegatingHttpServiceContext.java @@ -86,4 +86,9 @@ public Completable closeAsync() { public Completable closeAsyncGracefully() { return delegate.closeAsyncGracefully(); } + + @Override + public void acceptConnections(final boolean accept) { + delegate.acceptConnections(accept); + } } diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpServiceContext.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpServiceContext.java index ad9dc5ed09..820319e7e0 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpServiceContext.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpServiceContext.java @@ -15,12 +15,14 @@ */ package io.servicetalk.http.api; +import io.servicetalk.transport.api.ServerListenContext; + import static java.util.Objects.requireNonNull; /** * A {@link HttpConnectionContext} for use in the {@link HttpService} context. */ -public abstract class HttpServiceContext implements HttpConnectionContext { +public abstract class HttpServiceContext implements HttpConnectionContext, ServerListenContext { private final HttpHeadersFactory headersFactory; private final HttpResponseFactory factory; private final StreamingHttpResponseFactory streamingFactory; diff --git a/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/TestHttpServiceContext.java b/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/TestHttpServiceContext.java index 19ca93eabb..ed1e75d874 100644 --- a/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/TestHttpServiceContext.java +++ b/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/TestHttpServiceContext.java @@ -102,4 +102,9 @@ public Completable onClose() { public Completable closeAsync() { return completed(); } + + @Override + public void acceptConnections(final boolean accept) { + throw new UnsupportedOperationException(); + } } diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java index b167401729..ad729714b2 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java @@ -74,6 +74,11 @@ private H2ServerParentConnectionContext(final Channel channel, final BufferAlloc this.listenAddress = requireNonNull(listenAddress); } + @Override + public void acceptConnections(final boolean accept) { + channel().config().setAutoRead(accept); + } + @Override public SocketAddress listenAddress() { return listenAddress; diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java index feeb2b9205..2e6aef169b 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java @@ -205,6 +205,11 @@ public SocketAddress listenAddress() { return delegate.listenAddress(); } + @Override + public void acceptConnections(final boolean accept) { + delegate.acceptConnections(accept); + } + @Override public ExecutionContext executionContext() { return delegate.executionContext(); @@ -484,6 +489,12 @@ public Channel nettyChannel() { return connection.nettyChannel(); } + @Override + public void acceptConnections(final boolean accept) { + assert connection.nettyChannel().parent() != null; + connection.nettyChannel().parent().config().setAutoRead(accept); + } + @Override public String toString() { return getClass().getSimpleName() + '(' + connection + ')'; diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/AbstractNettyHttpServerTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/AbstractNettyHttpServerTest.java index 0af04ee7f2..f534f16e87 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/AbstractNettyHttpServerTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/AbstractNettyHttpServerTest.java @@ -162,6 +162,7 @@ private void startServer() throws Exception { .protocols(protocol) .transportObserver(serverTransportObserver) .enableWireLogging("servicetalk-tests-wire-logger", TRACE, () -> true); + configureServerBuilder(serverBuilder); if (sslEnabled) { serverBuilder.sslConfig(new ServerSslConfigBuilder(DefaultTestCerts::loadServerPem, DefaultTestCerts::loadServerKey).build()); @@ -194,7 +195,10 @@ private void startServer() throws Exception { httpConnection = httpClient.reserveConnection(httpClient.get("/")).toFuture().get(); } - private SingleAddressHttpClientBuilder newClientBuilder() { + protected void configureServerBuilder(final HttpServerBuilder serverBuilder) { + } + + protected SingleAddressHttpClientBuilder newClientBuilder() { return HttpClients.forResolvedAddress(serverHostAndPort(serverContext)); } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionAcceptingNettyHttpServerTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionAcceptingNettyHttpServerTest.java new file mode 100644 index 0000000000..f362815c84 --- /dev/null +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionAcceptingNettyHttpServerTest.java @@ -0,0 +1,131 @@ +/* + * Copyright © 2021 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.http.netty; + +import io.servicetalk.client.api.ConnectTimeoutException; +import io.servicetalk.http.api.DefaultStreamingHttpRequestResponseFactory; +import io.servicetalk.http.api.HttpServerBuilder; +import io.servicetalk.http.api.StreamingHttpClient; +import io.servicetalk.http.api.StreamingHttpRequest; +import io.servicetalk.http.api.StreamingHttpRequestResponseFactory; +import io.servicetalk.http.api.StreamingHttpResponse; +import io.servicetalk.transport.api.ServiceTalkSocketOptions; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import static io.netty.util.internal.PlatformDependent.normalizedOs; +import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR; +import static io.servicetalk.client.api.AutoRetryStrategyProvider.DISABLE_AUTO_RETRIES; +import static io.servicetalk.client.api.LimitingConnectionFactoryFilter.withMax; +import static io.servicetalk.concurrent.api.BlockingTestUtils.await; +import static io.servicetalk.concurrent.api.BlockingTestUtils.awaitIndefinitely; +import static io.servicetalk.http.api.DefaultHttpHeadersFactory.INSTANCE; +import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1; +import static io.servicetalk.http.api.HttpRequestMethod.GET; +import static io.servicetalk.http.api.HttpResponseStatus.OK; +import static io.servicetalk.http.netty.AbstractNettyHttpServerTest.ExecutorSupplier.CACHED; +import static io.servicetalk.http.netty.TestServiceStreaming.SVC_ECHO; +import static io.servicetalk.logging.api.LogLevel.TRACE; +import static io.servicetalk.transport.api.ServiceTalkSocketOptions.CONNECT_TIMEOUT; +import static java.lang.Boolean.TRUE; +import static java.time.Duration.ofSeconds; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class ConnectionAcceptingNettyHttpServerTest extends AbstractNettyHttpServerTest { + + private static final boolean IS_LINUX = "linux".equals(normalizedOs()); + // There is an off-by-one behavior difference between macOS & Linux. + // Linux has a greater-than check + // (see. https://github.com/torvalds/linux/blob/5bfc75d92efd494db37f5c4c173d3639d4772966/include/net/sock.h#L941) + private static final int TCP_BACKLOG = IS_LINUX ? 0 : 1; + private static final int CONNECT_TIMEOUT_MILLIS = (int) ofSeconds(5).toMillis(); + private static final int VERIFY_REQUEST_AWAIT_SECS = 5; + private static final int TRY_REQUEST_AWAIT_SECS = 1; + + private final StreamingHttpRequestResponseFactory reqRespFactory = + new DefaultStreamingHttpRequestResponseFactory(DEFAULT_ALLOCATOR, INSTANCE, HTTP_1_1); + + @Override + protected void configureServerBuilder(final HttpServerBuilder serverBuilder) { + serverBuilder.listenSocketOption(ServiceTalkSocketOptions.SO_BACKLOG, TCP_BACKLOG); + } + + @Test + void testStopAcceptingAndResume() throws Exception { + setUp(CACHED, CACHED); + final StreamingHttpRequest request = reqRespFactory.newRequest(GET, SVC_ECHO); + + assertConnectionRequestSucceeds(request); + + serverContext().acceptConnections(false); + // Netty will evaluate the auto-read on the next round, so the next connection will go through. + assertConnectionRequestSucceeds(request); + // This connection should get established but not accepted. + assertConnectionRequestReceiveTimesOut(request); + + // Restoring auto-read will resume accepting. + serverContext().acceptConnections(true); + assertConnectionRequestSucceeds(request); + } + + @Test + void testIdleTimeout() throws Exception { + setUp(CACHED, CACHED); + final StreamingHttpRequest request = reqRespFactory.newRequest(GET, SVC_ECHO); + assertConnectionRequestSucceeds(request); + + serverContext().acceptConnections(false); + // Netty will evaluate the auto-read on the next round, so the next connection will go through. + assertConnectionRequestSucceeds(request); + // Connection will establish but remain in the accept-queue + // (i.e., NOT accepted by the server => occupying 1 backlog entry) + assertConnectionRequestReceiveTimesOut(request); + final StreamingHttpClient httpClient2 = newClient(); + // Since we control the backlog size, this connection won't establish (i.e., NO syn-ack) + // timeout operator can be used to kill it or socket connection-timeout + final ExecutionException executionException = + assertThrows(ExecutionException.class, () -> awaitIndefinitely(httpClient2.request(request))); + assertThat(executionException.getCause(), instanceOf(ConnectTimeoutException.class)); + } + + private void assertConnectionRequestReceiveTimesOut(final StreamingHttpRequest request) { + final StreamingHttpClient httpClient = newClient(); + assertThrows(TimeoutException.class, () -> await(httpClient.request(request), TRY_REQUEST_AWAIT_SECS, SECONDS)); + } + + private StreamingHttpClient newClient() { + return newClientBuilder() + .appendConnectionFactoryFilter(withMax(1)) + .autoRetryStrategy(DISABLE_AUTO_RETRIES) + .enableWireLogging("servicetalk-tests-wire-logger", TRACE, TRUE::booleanValue) + // It's important to use CONNECT_TIMEOUT here to verify that connections aren't establishing. + .socketOption(CONNECT_TIMEOUT, CONNECT_TIMEOUT_MILLIS) + .buildStreaming(); + } + + private void assertConnectionRequestSucceeds(final StreamingHttpRequest request) throws Exception { + final StreamingHttpClient httpClient = newClientBuilder().buildStreaming(); + final StreamingHttpResponse response = await(httpClient.request(request), VERIFY_REQUEST_AWAIT_SECS, SECONDS); + assert response != null; + assertResponse(response, HTTP_1_1, OK, ""); + } +} diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionDrainTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionDrainTest.java index 3acf37acac..0e4c53bd23 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionDrainTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionDrainTest.java @@ -169,6 +169,11 @@ public ExecutionContext executionContext() { return serverContext.executionContext(); } + @Override + public void acceptConnections(final boolean accept) { + serverContext.acceptConnections(accept); + } + @Override public Completable onClose() { return serverContext.onClose(); diff --git a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServerContext.java b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServerContext.java index e08401b5f9..ae707ceb32 100644 --- a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServerContext.java +++ b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServerContext.java @@ -1,5 +1,5 @@ /* - * Copyright © 2018 Apple Inc. and the ServiceTalk project authors + * Copyright © 2018, 2021 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,7 +25,7 @@ /** * Context for servers. */ -public interface ServerContext extends ListenableAsyncCloseable, GracefulAutoCloseable { +public interface ServerContext extends ServerListenContext, ListenableAsyncCloseable, GracefulAutoCloseable { /** * Listen address for the server associated with this context. diff --git a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServerListenContext.java b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServerListenContext.java new file mode 100644 index 0000000000..b5c0db2213 --- /dev/null +++ b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServerListenContext.java @@ -0,0 +1,50 @@ +/* + * Copyright © 2021 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.transport.api; + +/** + * Context for controlling listen behavior. + */ +public interface ServerListenContext { + /** + * Toggles the server's channel accepting connection ability. + *

+ * Passing a {@code false} value, will stop accepting connections on the server channel, without affecting any other + * interaction to currently open child channels (i.e., reads / writes). + *

+ * Depending on the transport, connections may still get ESTABLISHED, see. + * {@link ServiceTalkSocketOptions#SO_BACKLOG backlog} and (in case of Linux) + * SOMAXCONN. + * For instance, in case of TCP the 3-way handshake may finish, and the connection will await in the + * accept queue to be accepted. If the accept queue is full, connection SYNs will await in the + * SYN backlog (i.e., in case of Linux + * tcp_max_syn_backlog). These additional parameters may affect the behavior of new flows when the service + * is not accepting. + *

+ * Depending on how long this stays in the {@code false} state, it may affect other timeouts (i.e., connect-timeout + * or idleness) on the peer-side and/or the other flows to the peer (i.e., proxies). + *

+ * Considerations: + *

    + *
  • Upon resumption, {@code accept == true}, backlogged connections will be processed first, + * which may be inactive by that time.
  • + *
  • The effect of toggling connection acceptance may be lazy evaluated (implementation detail), meaning + * that connections may still go through even after setting this to {@code false}.
  • + *
+ * @param accept Toggles the server's accepting connection ability. + */ + void acceptConnections(boolean accept); +} diff --git a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServiceTalkSocketOptions.java b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServiceTalkSocketOptions.java index 1f955dab2b..d1ac762a7a 100644 --- a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServiceTalkSocketOptions.java +++ b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServiceTalkSocketOptions.java @@ -28,19 +28,19 @@ public final class ServiceTalkSocketOptions { /** - * The connect timeout in milliseconds. + * Connect timeout in milliseconds. */ public static final SocketOption CONNECT_TIMEOUT = new ServiceTalkSocketOption<>("CONNECT_TIMEOUT", Integer.class); /** - * The threshold after which the the Endpoint is not writable anymore. + * The threshold after which the Endpoint is not writable anymore. */ public static final SocketOption WRITE_BUFFER_THRESHOLD = new ServiceTalkSocketOption<>("WRITE_BUFFER_THRESHOLD", Integer.class); /** - * Allow to idle timeout in milli seconds after which the connection is closed. + * Connection idle timeout in milliseconds after which the connection is closed. */ public static final SocketOption IDLE_TIMEOUT = new ServiceTalkSocketOption<>("IDLE_TIMEOUT", Long.class); diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyServerContext.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyServerContext.java index 75f92c202f..f6c7ffa406 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyServerContext.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyServerContext.java @@ -1,5 +1,5 @@ /* - * Copyright © 2018 Apple Inc. and the ServiceTalk project authors + * Copyright © 2018, 2021 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -83,6 +83,11 @@ public SocketAddress listenAddress() { return listenChannel.localAddress(); } + @Override + public void acceptConnections(final boolean accept) { + listenChannel.config().setAutoRead(accept); + } + @Override public ExecutionContext executionContext() { return executionContext; From ca60a1c311ee13eb9d8d2ca10d3e37b5ab05a0c2 Mon Sep 17 00:00:00 2001 From: Thomas Kountis Date: Thu, 26 Aug 2021 00:01:03 +0100 Subject: [PATCH 2/3] Comments --- .../H2ServerParentConnectionContext.java | 2 +- ...onnectionAcceptingNettyHttpServerTest.java | 64 +++++++++---------- .../tcp/netty/internal/TcpServerBinder.java | 2 +- .../transport/api/ServerListenContext.java | 22 ++++--- 4 files changed, 47 insertions(+), 43 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java index ad729714b2..0d4a70dc63 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java @@ -76,7 +76,7 @@ private H2ServerParentConnectionContext(final Channel channel, final BufferAlloc @Override public void acceptConnections(final boolean accept) { - channel().config().setAutoRead(accept); + channel().parent().config().setAutoRead(accept); } @Override diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionAcceptingNettyHttpServerTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionAcceptingNettyHttpServerTest.java index f362815c84..b883756a83 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionAcceptingNettyHttpServerTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionAcceptingNettyHttpServerTest.java @@ -16,26 +16,24 @@ package io.servicetalk.http.netty; import io.servicetalk.client.api.ConnectTimeoutException; -import io.servicetalk.http.api.DefaultStreamingHttpRequestResponseFactory; +import io.servicetalk.concurrent.api.Single; import io.servicetalk.http.api.HttpServerBuilder; -import io.servicetalk.http.api.StreamingHttpClient; +import io.servicetalk.http.api.SingleAddressHttpClientBuilder; import io.servicetalk.http.api.StreamingHttpRequest; -import io.servicetalk.http.api.StreamingHttpRequestResponseFactory; import io.servicetalk.http.api.StreamingHttpResponse; -import io.servicetalk.transport.api.ServiceTalkSocketOptions; +import io.servicetalk.transport.api.HostAndPort; import org.junit.jupiter.api.Test; +import java.net.InetSocketAddress; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import static io.netty.util.internal.PlatformDependent.normalizedOs; -import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR; import static io.servicetalk.client.api.AutoRetryStrategyProvider.DISABLE_AUTO_RETRIES; import static io.servicetalk.client.api.LimitingConnectionFactoryFilter.withMax; import static io.servicetalk.concurrent.api.BlockingTestUtils.await; import static io.servicetalk.concurrent.api.BlockingTestUtils.awaitIndefinitely; -import static io.servicetalk.http.api.DefaultHttpHeadersFactory.INSTANCE; import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1; import static io.servicetalk.http.api.HttpRequestMethod.GET; import static io.servicetalk.http.api.HttpResponseStatus.OK; @@ -43,9 +41,10 @@ import static io.servicetalk.http.netty.TestServiceStreaming.SVC_ECHO; import static io.servicetalk.logging.api.LogLevel.TRACE; import static io.servicetalk.transport.api.ServiceTalkSocketOptions.CONNECT_TIMEOUT; +import static io.servicetalk.transport.api.ServiceTalkSocketOptions.SO_BACKLOG; import static java.lang.Boolean.TRUE; import static java.time.Duration.ofSeconds; -import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -57,22 +56,29 @@ class ConnectionAcceptingNettyHttpServerTest extends AbstractNettyHttpServerTest // Linux has a greater-than check // (see. https://github.com/torvalds/linux/blob/5bfc75d92efd494db37f5c4c173d3639d4772966/include/net/sock.h#L941) private static final int TCP_BACKLOG = IS_LINUX ? 0 : 1; - private static final int CONNECT_TIMEOUT_MILLIS = (int) ofSeconds(5).toMillis(); - private static final int VERIFY_REQUEST_AWAIT_SECS = 5; - private static final int TRY_REQUEST_AWAIT_SECS = 1; - - private final StreamingHttpRequestResponseFactory reqRespFactory = - new DefaultStreamingHttpRequestResponseFactory(DEFAULT_ALLOCATOR, INSTANCE, HTTP_1_1); + private static final int CONNECT_TIMEOUT_MILLIS = (int) ofSeconds(1).toMillis(); + private static final int VERIFY_REQUEST_AWAIT_MILLIS = 500; + private static final int TRY_REQUEST_AWAIT_MILLIS = 500; @Override protected void configureServerBuilder(final HttpServerBuilder serverBuilder) { - serverBuilder.listenSocketOption(ServiceTalkSocketOptions.SO_BACKLOG, TCP_BACKLOG); + serverBuilder.listenSocketOption(SO_BACKLOG, TCP_BACKLOG); + } + + @Override + protected SingleAddressHttpClientBuilder newClientBuilder() { + return super.newClientBuilder() + .appendConnectionFactoryFilter(withMax(5)) + .autoRetryStrategy(DISABLE_AUTO_RETRIES) + .enableWireLogging("servicetalk-tests-wire-logger", TRACE, TRUE::booleanValue) + // It's important to use CONNECT_TIMEOUT here to verify that connections aren't establishing. + .socketOption(CONNECT_TIMEOUT, CONNECT_TIMEOUT_MILLIS); } @Test void testStopAcceptingAndResume() throws Exception { setUp(CACHED, CACHED); - final StreamingHttpRequest request = reqRespFactory.newRequest(GET, SVC_ECHO); + final StreamingHttpRequest request = streamingHttpClient().newRequest(GET, SVC_ECHO); assertConnectionRequestSucceeds(request); @@ -90,7 +96,8 @@ void testStopAcceptingAndResume() throws Exception { @Test void testIdleTimeout() throws Exception { setUp(CACHED, CACHED); - final StreamingHttpRequest request = reqRespFactory.newRequest(GET, SVC_ECHO); + final StreamingHttpRequest request = streamingHttpClient().newRequest(GET, SVC_ECHO); + assertConnectionRequestSucceeds(request); serverContext().acceptConnections(false); @@ -99,32 +106,25 @@ void testIdleTimeout() throws Exception { // Connection will establish but remain in the accept-queue // (i.e., NOT accepted by the server => occupying 1 backlog entry) assertConnectionRequestReceiveTimesOut(request); - final StreamingHttpClient httpClient2 = newClient(); + final Single response = + streamingHttpClient().reserveConnection(request).flatMap(conn -> conn.request(request)); // Since we control the backlog size, this connection won't establish (i.e., NO syn-ack) // timeout operator can be used to kill it or socket connection-timeout final ExecutionException executionException = - assertThrows(ExecutionException.class, () -> awaitIndefinitely(httpClient2.request(request))); + assertThrows(ExecutionException.class, () -> awaitIndefinitely(response)); assertThat(executionException.getCause(), instanceOf(ConnectTimeoutException.class)); } private void assertConnectionRequestReceiveTimesOut(final StreamingHttpRequest request) { - final StreamingHttpClient httpClient = newClient(); - assertThrows(TimeoutException.class, () -> await(httpClient.request(request), TRY_REQUEST_AWAIT_SECS, SECONDS)); - } - - private StreamingHttpClient newClient() { - return newClientBuilder() - .appendConnectionFactoryFilter(withMax(1)) - .autoRetryStrategy(DISABLE_AUTO_RETRIES) - .enableWireLogging("servicetalk-tests-wire-logger", TRACE, TRUE::booleanValue) - // It's important to use CONNECT_TIMEOUT here to verify that connections aren't establishing. - .socketOption(CONNECT_TIMEOUT, CONNECT_TIMEOUT_MILLIS) - .buildStreaming(); + assertThrows(TimeoutException.class, + () -> await(streamingHttpClient().reserveConnection(request).flatMap(conn -> conn.request(request)), + TRY_REQUEST_AWAIT_MILLIS, MILLISECONDS)); } private void assertConnectionRequestSucceeds(final StreamingHttpRequest request) throws Exception { - final StreamingHttpClient httpClient = newClientBuilder().buildStreaming(); - final StreamingHttpResponse response = await(httpClient.request(request), VERIFY_REQUEST_AWAIT_SECS, SECONDS); + final StreamingHttpResponse response = + await(streamingHttpClient().reserveConnection(request).flatMap(conn -> conn.request(request)), + VERIFY_REQUEST_AWAIT_MILLIS, MILLISECONDS); assert response != null; assertResponse(response, HTTP_1_1, OK, ""); } diff --git a/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpServerBinder.java b/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpServerBinder.java index 947b20fdbb..a259d61914 100644 --- a/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpServerBinder.java +++ b/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpServerBinder.java @@ -107,7 +107,7 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) { ((ReferenceCounted) msg).release(); } } - if (msg instanceof Channel && !channelSet.addIfAbsent((Channel) msg)) { + if (msg instanceof Channel && (!((Channel) msg).isActive() || !channelSet.addIfAbsent((Channel) msg))) { LOGGER.warn("Channel ({}) not added to ChannelSet", msg); } ctx.fireChannelRead(msg); diff --git a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServerListenContext.java b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServerListenContext.java index b5c0db2213..c9e372958c 100644 --- a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServerListenContext.java +++ b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServerListenContext.java @@ -20,19 +20,23 @@ */ public interface ServerListenContext { /** - * Toggles the server's channel accepting connection ability. + * Toggles the server's ability to accept new connections. *

- * Passing a {@code false} value, will stop accepting connections on the server channel, without affecting any other - * interaction to currently open child channels (i.e., reads / writes). + * Passing a {@code false} value will signal the server to stop accepting new connections. + * It won't affect any other interactions to currently open connections (i.e., reads / writes). *

- * Depending on the transport, connections may still get ESTABLISHED, see. - * {@link ServiceTalkSocketOptions#SO_BACKLOG backlog} and (in case of Linux) - * SOMAXCONN. + * Depending on the transport, connections may still get ESTABLISHED, see + * {@link ServiceTalkSocketOptions#SO_BACKLOG backlog} or OS wide settings: + *

* For instance, in case of TCP the 3-way handshake may finish, and the connection will await in the * accept queue to be accepted. If the accept queue is full, connection SYNs will await in the - * SYN backlog (i.e., in case of Linux - * tcp_max_syn_backlog). These additional parameters may affect the behavior of new flows when the service - * is not accepting. + * SYN backlog (in the case of linux). This can be tuned: + * tcp_max_syn_backlog + * These additional parameters may affect the behavior of new flows when the service is not accepting. *

* Depending on how long this stays in the {@code false} state, it may affect other timeouts (i.e., connect-timeout * or idleness) on the peer-side and/or the other flows to the peer (i.e., proxies). From 981ee6de5ae87b27531b6327eb139c1f78f836b1 Mon Sep 17 00:00:00 2001 From: Thomas Kountis Date: Thu, 26 Aug 2021 11:25:22 +0100 Subject: [PATCH 3/3] Comments --- .../tcp/netty/internal/TcpServerBinder.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpServerBinder.java b/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpServerBinder.java index a259d61914..4c63e6e499 100644 --- a/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpServerBinder.java +++ b/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpServerBinder.java @@ -107,8 +107,16 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) { ((ReferenceCounted) msg).release(); } } - if (msg instanceof Channel && (!((Channel) msg).isActive() || !channelSet.addIfAbsent((Channel) msg))) { - LOGGER.warn("Channel ({}) not added to ChannelSet", msg); + if (msg instanceof Channel) { + final Channel channel = (Channel) msg; + if (!channel.isActive()) { + channel.close(); + LOGGER.debug("Channel ({}) is accepted, but was already inactive", msg); + return; + } else if (!channelSet.addIfAbsent(channel)) { + LOGGER.warn("Channel ({}) not added to ChannelSet", msg); + return; + } } ctx.fireChannelRead(msg); }