Skip to content

Commit

Permalink
API to support start/stop accepting connections
Browse files Browse the repository at this point in the history
Motivation:

Expose a way a user can yield accepting connection on server side, and resume on demand.?

Modifications:

ServerContext now supports an additional `acceptConnections(bool)` API that can be used to
hint to the server the need for start/stop accepting.

Result:

More ways to control the service on-demand.
  • Loading branch information
tkountis committed Aug 18, 2021
1 parent 9e9f27f commit d2ac0b3
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ private H2ServerParentConnectionContext(final Channel channel, final BufferAlloc
this.listenAddress = requireNonNull(listenAddress);
}

@Override
public void acceptConnections(final boolean accept) {
channel().config().setAutoRead(accept);
}

@Override
public SocketAddress listenAddress() {
return listenAddress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ public SocketAddress listenAddress() {
return delegate.listenAddress();
}

@Override
public void acceptConnections(final boolean accept) {
delegate.acceptConnections(accept);
}

@Override
public ExecutionContext executionContext() {
return delegate.executionContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.servicetalk.transport.api.IoExecutor;
import io.servicetalk.transport.api.ServerContext;
import io.servicetalk.transport.api.ServerSslConfigBuilder;
import io.servicetalk.transport.api.ServiceTalkSocketOptions;
import io.servicetalk.transport.api.TransportObserver;
import io.servicetalk.transport.netty.NettyIoExecutors;
import io.servicetalk.transport.netty.internal.IoThreadFactory;
Expand Down Expand Up @@ -162,6 +163,7 @@ private void startServer() throws Exception {
.protocols(protocol)
.transportObserver(serverTransportObserver)
.enableWireLogging("servicetalk-tests-wire-logger", TRACE, () -> true);
configureServerBuilder(serverBuilder);
if (sslEnabled) {
serverBuilder.sslConfig(new ServerSslConfigBuilder(DefaultTestCerts::loadServerPem,
DefaultTestCerts::loadServerKey).build());
Expand Down Expand Up @@ -194,7 +196,10 @@ private void startServer() throws Exception {
httpConnection = httpClient.reserveConnection(httpClient.get("/")).toFuture().get();
}

private SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> newClientBuilder() {
protected void configureServerBuilder(final HttpServerBuilder serverBuilder) {
}

protected SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> newClientBuilder() {
return HttpClients.forResolvedAddress(serverHostAndPort(serverContext));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright © 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.http.netty;

import io.servicetalk.client.api.ConnectTimeoutException;
import io.servicetalk.http.api.DefaultStreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.HttpServerBuilder;
import io.servicetalk.http.api.StreamingHttpClient;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.transport.api.ServiceTalkSocketOptions;

import org.junit.jupiter.api.Test;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR;
import static io.servicetalk.client.api.AutoRetryStrategyProvider.DISABLE_AUTO_RETRIES;
import static io.servicetalk.client.api.LimitingConnectionFactoryFilter.withMax;
import static io.servicetalk.concurrent.api.BlockingTestUtils.await;
import static io.servicetalk.concurrent.api.BlockingTestUtils.awaitIndefinitely;
import static io.servicetalk.http.api.DefaultHttpHeadersFactory.INSTANCE;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
import static io.servicetalk.http.api.HttpRequestMethod.GET;
import static io.servicetalk.http.api.HttpResponseStatus.OK;
import static io.servicetalk.http.netty.AbstractNettyHttpServerTest.ExecutorSupplier.CACHED;
import static io.servicetalk.http.netty.TestServiceStreaming.SVC_ECHO;
import static io.servicetalk.transport.api.ServiceTalkSocketOptions.CONNECT_TIMEOUT;
import static java.time.Duration.ofSeconds;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class ConnectionAcceptingNettyHttpServerTest extends AbstractNettyHttpServerTest {

private static final int TCP_BACKLOG = 1;
private static final int CONNECT_TIMEOUT_MILLIS = (int) ofSeconds(5).toMillis();
private static final int VERIFY_REQUEST_AWAIT_SECS = 5;
private static final int TRY_REQUEST_AWAIT_SECS = 1;

private final StreamingHttpRequestResponseFactory reqRespFactory =
new DefaultStreamingHttpRequestResponseFactory(DEFAULT_ALLOCATOR, INSTANCE, HTTP_1_1);

@Override
protected void configureServerBuilder(final HttpServerBuilder serverBuilder) {
serverBuilder.listenSocketOption(ServiceTalkSocketOptions.SO_BACKLOG, TCP_BACKLOG);
}

@Test
void testStopAcceptingAndResume() throws Exception {
setUp(CACHED, CACHED);
final StreamingHttpRequest request = reqRespFactory.newRequest(GET, SVC_ECHO);

verifyNewConnectionRequest(request);

serverContext().acceptConnections(false);
// Netty will evaluate the auto-read on the next round, so the next connection will go through.
verifyNewConnectionRequest(request);
// This connection should get established but not accepted.
tryNewConnectionRequest(request);

// Restoring auto-read will resume accepting.
serverContext().acceptConnections(true);
verifyNewConnectionRequest(request);
}

@Test
void testIdleTimeout() throws Exception {
setUp(CACHED, CACHED);
final StreamingHttpRequest request = reqRespFactory.newRequest(GET, SVC_ECHO);
verifyNewConnectionRequest(request);

serverContext().acceptConnections(false);
// Netty will evaluate the auto-read on the next round, so the next connection will go through.
verifyNewConnectionRequest(request);
// Connection will establish but remain in the accept-queue (i.e., NOT accepted by the server)
tryNewConnectionRequest(request);
final StreamingHttpClient httpClient2 = newClient();
// Since we control the backlog size, this connection won't establish (i.e., NO syn-ack)
// timeout operator can be used to kill it or socket connection-timeout
final ExecutionException executionException =
assertThrows(ExecutionException.class, () -> awaitIndefinitely(httpClient2.request(request)));
assertThat(executionException.getCause(), instanceOf(ConnectTimeoutException.class));
}

private void tryNewConnectionRequest(final StreamingHttpRequest request) {
final StreamingHttpClient httpClient = newClient();
assertThrows(TimeoutException.class, () -> await(httpClient.request(request), TRY_REQUEST_AWAIT_SECS, SECONDS));
}

private StreamingHttpClient newClient() {
return newClientBuilder()
.appendConnectionFactoryFilter(withMax(1))
.autoRetryStrategy(DISABLE_AUTO_RETRIES)
// It's important to use CONNECT_TIMEOUT here to verify that connections aren't establishing.
.socketOption(CONNECT_TIMEOUT, CONNECT_TIMEOUT_MILLIS)
.buildStreaming();
}

private void verifyNewConnectionRequest(final StreamingHttpRequest request) throws Exception {
final StreamingHttpClient httpClient = newClientBuilder().buildStreaming();
final StreamingHttpResponse response = await(httpClient.request(request), VERIFY_REQUEST_AWAIT_SECS, SECONDS);
assert response != null;
assertResponse(response, HTTP_1_1, OK, "");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ public ExecutionContext executionContext() {
return serverContext.executionContext();
}

@Override
public void acceptConnections(final boolean accept) {
serverContext.acceptConnections(accept);
}

@Override
public Completable onClose() {
return serverContext.onClose();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018, 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,6 +50,35 @@ default void awaitShutdown() {
awaitTermination(onClose().toFuture());
}

/**
* Toggles the server's channel accepting connection ability.
* <p>
* Passing a {@code false} value, will stop accepting connections on the server channel, without affecting any other
* interaction to currently open child channels (i.e., reads / writes).
* <p>
* Depending on the transport, connections may still get ESTABLISHED, see.
* {@link ServiceTalkSocketOptions#SO_BACKLOG backlog} and
* <a href="https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt">SOMAXCONN</a>.
* For instance, in case of TCP the 3-way handshake may finish, and the connection will await in the
* accept queue to be accepted. If the accept queue is full, connection SYNs will await in the
* SYN backlog (i.e., <a href="https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt">
* tcp_max_syn_backlog</a>). These additional parameters may affect the behavior of new flows when the service
* is not accepting.
* <p>
* Depending on how long this stays in the {@code false} state, it may affect other timeouts (i.e., connect-timeout
* or idleness) on the peer-side and/or the other flows to the peer (i.e., proxies).
* <p>
* Notes:
* <ul>
* <li>Upon resumption, {@code accept == true}, backlogged connection will be processed first,
* which may be inactive by that time.</li>
* <li>The effect of toggling connection acceptance may be lazy evaluated (implementation detail), meaning
* that connections may still go through even after setting this to {@code false}.</li>
* </ul>
* @param accept Toggles the server's accepting connection ability.
*/
void acceptConnections(boolean accept);

@Override
default void close() throws Exception {
awaitTermination(closeAsync().toFuture());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,19 @@
public final class ServiceTalkSocketOptions {

/**
* The connect timeout in milliseconds.
* Connect timeout in milliseconds.
*/
public static final SocketOption<Integer> CONNECT_TIMEOUT =
new ServiceTalkSocketOption<>("CONNECT_TIMEOUT", Integer.class);

/**
* The threshold after which the the Endpoint is not writable anymore.
* The threshold after which the Endpoint is not writable anymore.
*/
public static final SocketOption<Integer> WRITE_BUFFER_THRESHOLD =
new ServiceTalkSocketOption<>("WRITE_BUFFER_THRESHOLD", Integer.class);

/**
* Allow to idle timeout in milli seconds after which the connection is closed.
* Connection idle timeout in milliseconds after which the connection is closed.
*/
public static final SocketOption<Long> IDLE_TIMEOUT = new ServiceTalkSocketOption<>("IDLE_TIMEOUT", Long.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018, 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -83,6 +83,11 @@ public SocketAddress listenAddress() {
return listenChannel.localAddress();
}

@Override
public void acceptConnections(final boolean accept) {
listenChannel.config().setAutoRead(accept);
}

@Override
public ExecutionContext executionContext() {
return executionContext;
Expand Down

0 comments on commit d2ac0b3

Please sign in to comment.