diff --git a/.github/workflows/ci-prb.yml b/.github/workflows/ci-prb.yml index db8770dfef..51519889e4 100644 --- a/.github/workflows/ci-prb.yml +++ b/.github/workflows/ci-prb.yml @@ -41,7 +41,7 @@ jobs: - name: Build and Test env: CI: true - JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8 + JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8 -Dservicetalk.logger.wireLogLevel=TRACE run: ./gradlew --parallel clean test - name: Upload Test Results if: always() diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java index b167401729..ad729714b2 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java @@ -74,6 +74,11 @@ private H2ServerParentConnectionContext(final Channel channel, final BufferAlloc this.listenAddress = requireNonNull(listenAddress); } + @Override + public void acceptConnections(final boolean accept) { + channel().config().setAutoRead(accept); + } + @Override public SocketAddress listenAddress() { return listenAddress; diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java index feeb2b9205..ff5ec6a2a1 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java @@ -205,6 +205,11 @@ public SocketAddress listenAddress() { return delegate.listenAddress(); } + @Override + public void acceptConnections(final boolean accept) { + delegate.acceptConnections(accept); + } + @Override public ExecutionContext executionContext() { return delegate.executionContext(); diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/AbstractNettyHttpServerTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/AbstractNettyHttpServerTest.java index 0af04ee7f2..f534f16e87 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/AbstractNettyHttpServerTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/AbstractNettyHttpServerTest.java @@ -162,6 +162,7 @@ private void startServer() throws Exception { .protocols(protocol) .transportObserver(serverTransportObserver) .enableWireLogging("servicetalk-tests-wire-logger", TRACE, () -> true); + configureServerBuilder(serverBuilder); if (sslEnabled) { serverBuilder.sslConfig(new ServerSslConfigBuilder(DefaultTestCerts::loadServerPem, DefaultTestCerts::loadServerKey).build()); @@ -194,7 +195,10 @@ private void startServer() throws Exception { httpConnection = httpClient.reserveConnection(httpClient.get("/")).toFuture().get(); } - private SingleAddressHttpClientBuilder newClientBuilder() { + protected void configureServerBuilder(final HttpServerBuilder serverBuilder) { + } + + protected SingleAddressHttpClientBuilder newClientBuilder() { return HttpClients.forResolvedAddress(serverHostAndPort(serverContext)); } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionAcceptingNettyHttpServerTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionAcceptingNettyHttpServerTest.java new file mode 100644 index 0000000000..8d4796210c --- /dev/null +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionAcceptingNettyHttpServerTest.java @@ -0,0 +1,130 @@ +/* + * Copyright © 2021 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.http.netty; + +import io.servicetalk.client.api.ConnectTimeoutException; +import io.servicetalk.http.api.DefaultStreamingHttpRequestResponseFactory; +import io.servicetalk.http.api.HttpServerBuilder; +import io.servicetalk.http.api.StreamingHttpClient; +import io.servicetalk.http.api.StreamingHttpRequest; +import io.servicetalk.http.api.StreamingHttpRequestResponseFactory; +import io.servicetalk.http.api.StreamingHttpResponse; +import io.servicetalk.transport.api.ServiceTalkSocketOptions; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR; +import static io.servicetalk.client.api.AutoRetryStrategyProvider.DISABLE_AUTO_RETRIES; +import static io.servicetalk.client.api.LimitingConnectionFactoryFilter.withMax; +import static io.servicetalk.concurrent.api.BlockingTestUtils.await; +import static io.servicetalk.concurrent.api.BlockingTestUtils.awaitIndefinitely; +import static io.servicetalk.http.api.DefaultHttpHeadersFactory.INSTANCE; +import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1; +import static io.servicetalk.http.api.HttpRequestMethod.GET; +import static io.servicetalk.http.api.HttpResponseStatus.OK; +import static io.servicetalk.http.netty.AbstractNettyHttpServerTest.ExecutorSupplier.CACHED; +import static io.servicetalk.http.netty.TestServiceStreaming.SVC_ECHO; +import static io.servicetalk.logging.api.LogLevel.TRACE; +import static io.servicetalk.transport.api.ServiceTalkSocketOptions.CONNECT_TIMEOUT; +import static java.lang.Boolean.TRUE; +import static java.time.Duration.ofSeconds; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class ConnectionAcceptingNettyHttpServerTest extends AbstractNettyHttpServerTest { + + static { + System.setProperty("servicetalk.logger.wireLogLevel", "TRACE"); + } + + private static final int TCP_BACKLOG = 1; + private static final int CONNECT_TIMEOUT_MILLIS = (int) ofSeconds(5).toMillis(); + private static final int VERIFY_REQUEST_AWAIT_SECS = 5; + private static final int TRY_REQUEST_AWAIT_SECS = 1; + + private final StreamingHttpRequestResponseFactory reqRespFactory = + new DefaultStreamingHttpRequestResponseFactory(DEFAULT_ALLOCATOR, INSTANCE, HTTP_1_1); + + @Override + protected void configureServerBuilder(final HttpServerBuilder serverBuilder) { + serverBuilder.listenSocketOption(ServiceTalkSocketOptions.SO_BACKLOG, TCP_BACKLOG); + } + + @Test + void testStopAcceptingAndResume() throws Exception { + setUp(CACHED, CACHED); + final StreamingHttpRequest request = reqRespFactory.newRequest(GET, SVC_ECHO); + + verifyNewConnectionRequest(request); + + serverContext().acceptConnections(false); + // Netty will evaluate the auto-read on the next round, so the next connection will go through. + verifyNewConnectionRequest(request); + // This connection should get established but not accepted. + tryNewConnectionRequest(request); + + // Restoring auto-read will resume accepting. + serverContext().acceptConnections(true); + verifyNewConnectionRequest(request); + } + + @Test + void testIdleTimeout() throws Exception { + setUp(CACHED, CACHED); + final StreamingHttpRequest request = reqRespFactory.newRequest(GET, SVC_ECHO); + verifyNewConnectionRequest(request); + + serverContext().acceptConnections(false); + // Netty will evaluate the auto-read on the next round, so the next connection will go through. + verifyNewConnectionRequest(request); + // Connection will establish but remain in the accept-queue + // (i.e., NOT accepted by the server => occupying 1 backlog entry) + tryNewConnectionRequest(request); + final StreamingHttpClient httpClient2 = newClient(); + // Since we control the backlog size, this connection won't establish (i.e., NO syn-ack) + // timeout operator can be used to kill it or socket connection-timeout + final ExecutionException executionException = + assertThrows(ExecutionException.class, () -> awaitIndefinitely(httpClient2.request(request))); + assertThat(executionException.getCause(), instanceOf(ConnectTimeoutException.class)); + } + + private void tryNewConnectionRequest(final StreamingHttpRequest request) { + final StreamingHttpClient httpClient = newClient(); + assertThrows(TimeoutException.class, () -> await(httpClient.request(request), TRY_REQUEST_AWAIT_SECS, SECONDS)); + } + + private StreamingHttpClient newClient() { + return newClientBuilder() + .appendConnectionFactoryFilter(withMax(1)) + .autoRetryStrategy(DISABLE_AUTO_RETRIES) + .enableWireLogging("servicetalk-tests-wire-logger", TRACE, TRUE::booleanValue) + // It's important to use CONNECT_TIMEOUT here to verify that connections aren't establishing. + .socketOption(CONNECT_TIMEOUT, CONNECT_TIMEOUT_MILLIS) + .buildStreaming(); + } + + private void verifyNewConnectionRequest(final StreamingHttpRequest request) throws Exception { + final StreamingHttpClient httpClient = newClientBuilder().buildStreaming(); + final StreamingHttpResponse response = await(httpClient.request(request), VERIFY_REQUEST_AWAIT_SECS, SECONDS); + assert response != null; + assertResponse(response, HTTP_1_1, OK, ""); + } +} diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionDrainTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionDrainTest.java index 3acf37acac..0e4c53bd23 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionDrainTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionDrainTest.java @@ -169,6 +169,11 @@ public ExecutionContext executionContext() { return serverContext.executionContext(); } + @Override + public void acceptConnections(final boolean accept) { + serverContext.acceptConnections(accept); + } + @Override public Completable onClose() { return serverContext.onClose(); diff --git a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServerContext.java b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServerContext.java index e08401b5f9..7dc197830d 100644 --- a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServerContext.java +++ b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServerContext.java @@ -1,5 +1,5 @@ /* - * Copyright © 2018 Apple Inc. and the ServiceTalk project authors + * Copyright © 2018, 2021 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,6 +50,35 @@ default void awaitShutdown() { awaitTermination(onClose().toFuture()); } + /** + * Toggles the server's channel accepting connection ability. + *

+ * Passing a {@code false} value, will stop accepting connections on the server channel, without affecting any other + * interaction to currently open child channels (i.e., reads / writes). + *

+ * Depending on the transport, connections may still get ESTABLISHED, see. + * {@link ServiceTalkSocketOptions#SO_BACKLOG backlog} and + * SOMAXCONN. + * For instance, in case of TCP the 3-way handshake may finish, and the connection will await in the + * accept queue to be accepted. If the accept queue is full, connection SYNs will await in the + * SYN backlog (i.e., + * tcp_max_syn_backlog). These additional parameters may affect the behavior of new flows when the service + * is not accepting. + *

+ * Depending on how long this stays in the {@code false} state, it may affect other timeouts (i.e., connect-timeout + * or idleness) on the peer-side and/or the other flows to the peer (i.e., proxies). + *

+ * Notes: + *

+ * @param accept Toggles the server's accepting connection ability. + */ + void acceptConnections(boolean accept); + @Override default void close() throws Exception { awaitTermination(closeAsync().toFuture()); diff --git a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServiceTalkSocketOptions.java b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServiceTalkSocketOptions.java index 1f955dab2b..d1ac762a7a 100644 --- a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServiceTalkSocketOptions.java +++ b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ServiceTalkSocketOptions.java @@ -28,19 +28,19 @@ public final class ServiceTalkSocketOptions { /** - * The connect timeout in milliseconds. + * Connect timeout in milliseconds. */ public static final SocketOption CONNECT_TIMEOUT = new ServiceTalkSocketOption<>("CONNECT_TIMEOUT", Integer.class); /** - * The threshold after which the the Endpoint is not writable anymore. + * The threshold after which the Endpoint is not writable anymore. */ public static final SocketOption WRITE_BUFFER_THRESHOLD = new ServiceTalkSocketOption<>("WRITE_BUFFER_THRESHOLD", Integer.class); /** - * Allow to idle timeout in milli seconds after which the connection is closed. + * Connection idle timeout in milliseconds after which the connection is closed. */ public static final SocketOption IDLE_TIMEOUT = new ServiceTalkSocketOption<>("IDLE_TIMEOUT", Long.class); diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyServerContext.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyServerContext.java index 75f92c202f..f6c7ffa406 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyServerContext.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyServerContext.java @@ -1,5 +1,5 @@ /* - * Copyright © 2018 Apple Inc. and the ServiceTalk project authors + * Copyright © 2018, 2021 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -83,6 +83,11 @@ public SocketAddress listenAddress() { return listenChannel.localAddress(); } + @Override + public void acceptConnections(final boolean accept) { + listenChannel.config().setAutoRead(accept); + } + @Override public ExecutionContext executionContext() { return executionContext;