Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API to support start/stop accepting connections #1741

Merged
merged 3 commits into from
Aug 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,9 @@ public Completable closeAsync() {
public Completable closeAsyncGracefully() {
return delegate.closeAsyncGracefully();
}

@Override
public void acceptConnections(final boolean accept) {
delegate.acceptConnections(accept);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
*/
package io.servicetalk.http.api;

import io.servicetalk.transport.api.ServerListenContext;

import static java.util.Objects.requireNonNull;

/**
* A {@link HttpConnectionContext} for use in the {@link HttpService} context.
*/
public abstract class HttpServiceContext implements HttpConnectionContext {
public abstract class HttpServiceContext implements HttpConnectionContext, ServerListenContext {
private final HttpHeadersFactory headersFactory;
private final HttpResponseFactory factory;
private final StreamingHttpResponseFactory streamingFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,9 @@ public Completable onClose() {
public Completable closeAsync() {
return completed();
}

@Override
public void acceptConnections(final boolean accept) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ private H2ServerParentConnectionContext(final Channel channel, final BufferAlloc
this.listenAddress = requireNonNull(listenAddress);
}

@Override
public void acceptConnections(final boolean accept) {
channel().parent().config().setAutoRead(accept);
}

@Override
public SocketAddress listenAddress() {
return listenAddress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ public SocketAddress listenAddress() {
return delegate.listenAddress();
}

@Override
public void acceptConnections(final boolean accept) {
delegate.acceptConnections(accept);
}

@Override
public ExecutionContext executionContext() {
return delegate.executionContext();
Expand Down Expand Up @@ -484,6 +489,12 @@ public Channel nettyChannel() {
return connection.nettyChannel();
}

@Override
public void acceptConnections(final boolean accept) {
assert connection.nettyChannel().parent() != null;
connection.nettyChannel().parent().config().setAutoRead(accept);
}

@Override
public String toString() {
return getClass().getSimpleName() + '(' + connection + ')';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ private void startServer() throws Exception {
.protocols(protocol)
.transportObserver(serverTransportObserver)
.enableWireLogging("servicetalk-tests-wire-logger", TRACE, () -> true);
configureServerBuilder(serverBuilder);
if (sslEnabled) {
serverBuilder.sslConfig(new ServerSslConfigBuilder(DefaultTestCerts::loadServerPem,
DefaultTestCerts::loadServerKey).build());
Expand Down Expand Up @@ -194,7 +195,10 @@ private void startServer() throws Exception {
httpConnection = httpClient.reserveConnection(httpClient.get("/")).toFuture().get();
}

private SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> newClientBuilder() {
protected void configureServerBuilder(final HttpServerBuilder serverBuilder) {
}

protected SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> newClientBuilder() {
return HttpClients.forResolvedAddress(serverHostAndPort(serverContext));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright © 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.http.netty;

import io.servicetalk.client.api.ConnectTimeoutException;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.http.api.HttpServerBuilder;
import io.servicetalk.http.api.SingleAddressHttpClientBuilder;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.transport.api.HostAndPort;

import org.junit.jupiter.api.Test;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static io.netty.util.internal.PlatformDependent.normalizedOs;
import static io.servicetalk.client.api.AutoRetryStrategyProvider.DISABLE_AUTO_RETRIES;
import static io.servicetalk.client.api.LimitingConnectionFactoryFilter.withMax;
import static io.servicetalk.concurrent.api.BlockingTestUtils.await;
import static io.servicetalk.concurrent.api.BlockingTestUtils.awaitIndefinitely;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
import static io.servicetalk.http.api.HttpRequestMethod.GET;
import static io.servicetalk.http.api.HttpResponseStatus.OK;
import static io.servicetalk.http.netty.AbstractNettyHttpServerTest.ExecutorSupplier.CACHED;
import static io.servicetalk.http.netty.TestServiceStreaming.SVC_ECHO;
import static io.servicetalk.logging.api.LogLevel.TRACE;
import static io.servicetalk.transport.api.ServiceTalkSocketOptions.CONNECT_TIMEOUT;
import static io.servicetalk.transport.api.ServiceTalkSocketOptions.SO_BACKLOG;
import static java.lang.Boolean.TRUE;
import static java.time.Duration.ofSeconds;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;

class ConnectionAcceptingNettyHttpServerTest extends AbstractNettyHttpServerTest {

private static final boolean IS_LINUX = "linux".equals(normalizedOs());
// There is an off-by-one behavior difference between macOS & Linux.
// Linux has a greater-than check
// (see. https://github.com/torvalds/linux/blob/5bfc75d92efd494db37f5c4c173d3639d4772966/include/net/sock.h#L941)
private static final int TCP_BACKLOG = IS_LINUX ? 0 : 1;
tkountis marked this conversation as resolved.
Show resolved Hide resolved
private static final int CONNECT_TIMEOUT_MILLIS = (int) ofSeconds(1).toMillis();
private static final int VERIFY_REQUEST_AWAIT_MILLIS = 500;
private static final int TRY_REQUEST_AWAIT_MILLIS = 500;

@Override
protected void configureServerBuilder(final HttpServerBuilder serverBuilder) {
serverBuilder.listenSocketOption(SO_BACKLOG, TCP_BACKLOG);
}

@Override
protected SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> newClientBuilder() {
return super.newClientBuilder()
.appendConnectionFactoryFilter(withMax(5))
.autoRetryStrategy(DISABLE_AUTO_RETRIES)
.enableWireLogging("servicetalk-tests-wire-logger", TRACE, TRUE::booleanValue)
// It's important to use CONNECT_TIMEOUT here to verify that connections aren't establishing.
.socketOption(CONNECT_TIMEOUT, CONNECT_TIMEOUT_MILLIS);
}

@Test
void testStopAcceptingAndResume() throws Exception {
setUp(CACHED, CACHED);
final StreamingHttpRequest request = streamingHttpClient().newRequest(GET, SVC_ECHO);

assertConnectionRequestSucceeds(request);

serverContext().acceptConnections(false);
// Netty will evaluate the auto-read on the next round, so the next connection will go through.
assertConnectionRequestSucceeds(request);
// This connection should get established but not accepted.
assertConnectionRequestReceiveTimesOut(request);

// Restoring auto-read will resume accepting.
serverContext().acceptConnections(true);
assertConnectionRequestSucceeds(request);
}

@Test
void testIdleTimeout() throws Exception {
setUp(CACHED, CACHED);
final StreamingHttpRequest request = streamingHttpClient().newRequest(GET, SVC_ECHO);

assertConnectionRequestSucceeds(request);

serverContext().acceptConnections(false);
// Netty will evaluate the auto-read on the next round, so the next connection will go through.
assertConnectionRequestSucceeds(request);
// Connection will establish but remain in the accept-queue
// (i.e., NOT accepted by the server => occupying 1 backlog entry)
assertConnectionRequestReceiveTimesOut(request);
final Single<StreamingHttpResponse> response =
streamingHttpClient().reserveConnection(request).flatMap(conn -> conn.request(request));
// Since we control the backlog size, this connection won't establish (i.e., NO syn-ack)
// timeout operator can be used to kill it or socket connection-timeout
final ExecutionException executionException =
assertThrows(ExecutionException.class, () -> awaitIndefinitely(response));
assertThat(executionException.getCause(), instanceOf(ConnectTimeoutException.class));
}

private void assertConnectionRequestReceiveTimesOut(final StreamingHttpRequest request) {
assertThrows(TimeoutException.class,
() -> await(streamingHttpClient().reserveConnection(request).flatMap(conn -> conn.request(request)),
TRY_REQUEST_AWAIT_MILLIS, MILLISECONDS));
}

private void assertConnectionRequestSucceeds(final StreamingHttpRequest request) throws Exception {
final StreamingHttpResponse response =
await(streamingHttpClient().reserveConnection(request).flatMap(conn -> conn.request(request)),
VERIFY_REQUEST_AWAIT_MILLIS, MILLISECONDS);
assert response != null;
assertResponse(response, HTTP_1_1, OK, "");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ public ExecutionContext executionContext() {
return serverContext.executionContext();
}

@Override
public void acceptConnections(final boolean accept) {
serverContext.acceptConnections(accept);
}

@Override
public Completable onClose() {
return serverContext.onClose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,16 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
((ReferenceCounted) msg).release();
}
}
if (msg instanceof Channel && !channelSet.addIfAbsent((Channel) msg)) {
LOGGER.warn("Channel ({}) not added to ChannelSet", msg);
if (msg instanceof Channel) {
final Channel channel = (Channel) msg;
if (!channel.isActive()) {
channel.close();
LOGGER.debug("Channel ({}) is accepted, but was already inactive", msg);
return;
} else if (!channelSet.addIfAbsent(channel)) {
LOGGER.warn("Channel ({}) not added to ChannelSet", msg);
return;
}
}
ctx.fireChannelRead(msg);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018, 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,7 +25,7 @@
/**
* Context for servers.
*/
public interface ServerContext extends ListenableAsyncCloseable, GracefulAutoCloseable {
public interface ServerContext extends ServerListenContext, ListenableAsyncCloseable, GracefulAutoCloseable {

/**
* Listen address for the server associated with this context.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright © 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.transport.api;

/**
* Context for controlling listen behavior.
*/
public interface ServerListenContext {
/**
* Toggles the server's ability to accept new connections.
* <p>
* Passing a {@code false} value will signal the server to stop accepting new connections.
* It won't affect any other interactions to currently open connections (i.e., reads / writes).
* <p>
* Depending on the transport, connections may still get ESTABLISHED, see
* {@link ServiceTalkSocketOptions#SO_BACKLOG backlog} or OS wide settings:
* <ul>
* <li>Linux: <a href="https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt">SOMAXCONN</a></li>
* <li>MacOS/BSD: <a href="https://docs.freebsd.org/en/books/handbook/config/#configtuning-kernel-limits">
* kern.ipc.somaxconn / kern.ipc.soacceptqueue</a></li>
* </ul>
* For instance, in case of TCP the 3-way handshake may finish, and the connection will await in the
* accept queue to be accepted. If the accept queue is full, connection SYNs will await in the
* SYN backlog (in the case of linux). This can be tuned:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider clarifying that it can be tuned at OS level or per server instance using ServiceTalkSocketOptions#SO_BACKLOG option.

I know that you don't have access to HTTP classes from here, but maybe even without referencing an exact method, consider clarifying that they need to use listenSocketOption on the builder, not a socketOption method. It might be a useful hint.

Copy link
Contributor Author

@tkountis tkountis Aug 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not tunable on the per server instance. so_backlog = accept_queue. SYN backlog is a different setting, only at the OS level. I am not elaborating further on it, because its very OS specific.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for clarifying!

* <a href="https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt">tcp_max_syn_backlog</a>
* These additional parameters may affect the behavior of new flows when the service is not accepting.
* <p>
* Depending on how long this stays in the {@code false} state, it may affect other timeouts (i.e., connect-timeout
* or idleness) on the peer-side and/or the other flows to the peer (i.e., proxies).
tkountis marked this conversation as resolved.
Show resolved Hide resolved
* <p>
* Considerations:
* <ul>
* <li>Upon resumption, {@code accept == true}, backlogged connections will be processed first,
* which may be inactive by that time.</li>
tkountis marked this conversation as resolved.
Show resolved Hide resolved
* <li>The effect of toggling connection acceptance may be lazy evaluated (implementation detail), meaning
* that connections may still go through even after setting this to {@code false}.</li>
* </ul>
* @param accept Toggles the server's accepting connection ability.
*/
void acceptConnections(boolean accept);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,19 @@
public final class ServiceTalkSocketOptions {

/**
* The connect timeout in milliseconds.
* Connect timeout in milliseconds.
*/
public static final SocketOption<Integer> CONNECT_TIMEOUT =
new ServiceTalkSocketOption<>("CONNECT_TIMEOUT", Integer.class);

/**
* The threshold after which the the Endpoint is not writable anymore.
* The threshold after which the Endpoint is not writable anymore.
tkountis marked this conversation as resolved.
Show resolved Hide resolved
*/
public static final SocketOption<Integer> WRITE_BUFFER_THRESHOLD =
new ServiceTalkSocketOption<>("WRITE_BUFFER_THRESHOLD", Integer.class);

/**
* Allow to idle timeout in milli seconds after which the connection is closed.
* Connection idle timeout in milliseconds after which the connection is closed.
*/
public static final SocketOption<Long> IDLE_TIMEOUT = new ServiceTalkSocketOption<>("IDLE_TIMEOUT", Long.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018, 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -83,6 +83,11 @@ public SocketAddress listenAddress() {
return listenChannel.localAddress();
}

@Override
public void acceptConnections(final boolean accept) {
listenChannel.config().setAutoRead(accept);
}

@Override
public ExecutionContext executionContext() {
return executionContext;
Expand Down