Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API to support start/stop accepting connections #1741

Merged
merged 3 commits into from
Aug 26, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ private H2ServerParentConnectionContext(final Channel channel, final BufferAlloc

@Override
public void acceptConnections(final boolean accept) {
channel().config().setAutoRead(accept);
channel().parent().config().setAutoRead(accept);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,35 @@
package io.servicetalk.http.netty;

import io.servicetalk.client.api.ConnectTimeoutException;
import io.servicetalk.http.api.DefaultStreamingHttpRequestResponseFactory;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.http.api.HttpServerBuilder;
import io.servicetalk.http.api.StreamingHttpClient;
import io.servicetalk.http.api.SingleAddressHttpClientBuilder;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.transport.api.ServiceTalkSocketOptions;
import io.servicetalk.transport.api.HostAndPort;

import org.junit.jupiter.api.Test;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static io.netty.util.internal.PlatformDependent.normalizedOs;
import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR;
import static io.servicetalk.client.api.AutoRetryStrategyProvider.DISABLE_AUTO_RETRIES;
import static io.servicetalk.client.api.LimitingConnectionFactoryFilter.withMax;
import static io.servicetalk.concurrent.api.BlockingTestUtils.await;
import static io.servicetalk.concurrent.api.BlockingTestUtils.awaitIndefinitely;
import static io.servicetalk.http.api.DefaultHttpHeadersFactory.INSTANCE;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
import static io.servicetalk.http.api.HttpRequestMethod.GET;
import static io.servicetalk.http.api.HttpResponseStatus.OK;
import static io.servicetalk.http.netty.AbstractNettyHttpServerTest.ExecutorSupplier.CACHED;
import static io.servicetalk.http.netty.TestServiceStreaming.SVC_ECHO;
import static io.servicetalk.logging.api.LogLevel.TRACE;
import static io.servicetalk.transport.api.ServiceTalkSocketOptions.CONNECT_TIMEOUT;
import static io.servicetalk.transport.api.ServiceTalkSocketOptions.SO_BACKLOG;
import static java.lang.Boolean.TRUE;
import static java.time.Duration.ofSeconds;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand All @@ -57,22 +56,29 @@ class ConnectionAcceptingNettyHttpServerTest extends AbstractNettyHttpServerTest
// Linux has a greater-than check
// (see. https://github.com/torvalds/linux/blob/5bfc75d92efd494db37f5c4c173d3639d4772966/include/net/sock.h#L941)
private static final int TCP_BACKLOG = IS_LINUX ? 0 : 1;
tkountis marked this conversation as resolved.
Show resolved Hide resolved
private static final int CONNECT_TIMEOUT_MILLIS = (int) ofSeconds(5).toMillis();
private static final int VERIFY_REQUEST_AWAIT_SECS = 5;
private static final int TRY_REQUEST_AWAIT_SECS = 1;

private final StreamingHttpRequestResponseFactory reqRespFactory =
new DefaultStreamingHttpRequestResponseFactory(DEFAULT_ALLOCATOR, INSTANCE, HTTP_1_1);
private static final int CONNECT_TIMEOUT_MILLIS = (int) ofSeconds(1).toMillis();
private static final int VERIFY_REQUEST_AWAIT_MILLIS = 500;
private static final int TRY_REQUEST_AWAIT_MILLIS = 500;

@Override
protected void configureServerBuilder(final HttpServerBuilder serverBuilder) {
serverBuilder.listenSocketOption(ServiceTalkSocketOptions.SO_BACKLOG, TCP_BACKLOG);
serverBuilder.listenSocketOption(SO_BACKLOG, TCP_BACKLOG);
}

@Override
protected SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> newClientBuilder() {
return super.newClientBuilder()
.appendConnectionFactoryFilter(withMax(5))
.autoRetryStrategy(DISABLE_AUTO_RETRIES)
.enableWireLogging("servicetalk-tests-wire-logger", TRACE, TRUE::booleanValue)
// It's important to use CONNECT_TIMEOUT here to verify that connections aren't establishing.
.socketOption(CONNECT_TIMEOUT, CONNECT_TIMEOUT_MILLIS);
}

@Test
void testStopAcceptingAndResume() throws Exception {
setUp(CACHED, CACHED);
final StreamingHttpRequest request = reqRespFactory.newRequest(GET, SVC_ECHO);
final StreamingHttpRequest request = streamingHttpClient().newRequest(GET, SVC_ECHO);

assertConnectionRequestSucceeds(request);

Expand All @@ -90,7 +96,8 @@ void testStopAcceptingAndResume() throws Exception {
@Test
void testIdleTimeout() throws Exception {
setUp(CACHED, CACHED);
final StreamingHttpRequest request = reqRespFactory.newRequest(GET, SVC_ECHO);
final StreamingHttpRequest request = streamingHttpClient().newRequest(GET, SVC_ECHO);

assertConnectionRequestSucceeds(request);

serverContext().acceptConnections(false);
Expand All @@ -99,32 +106,25 @@ void testIdleTimeout() throws Exception {
// Connection will establish but remain in the accept-queue
// (i.e., NOT accepted by the server => occupying 1 backlog entry)
assertConnectionRequestReceiveTimesOut(request);
final StreamingHttpClient httpClient2 = newClient();
final Single<StreamingHttpResponse> response =
streamingHttpClient().reserveConnection(request).flatMap(conn -> conn.request(request));
// Since we control the backlog size, this connection won't establish (i.e., NO syn-ack)
// timeout operator can be used to kill it or socket connection-timeout
final ExecutionException executionException =
assertThrows(ExecutionException.class, () -> awaitIndefinitely(httpClient2.request(request)));
assertThrows(ExecutionException.class, () -> awaitIndefinitely(response));
assertThat(executionException.getCause(), instanceOf(ConnectTimeoutException.class));
}

private void assertConnectionRequestReceiveTimesOut(final StreamingHttpRequest request) {
final StreamingHttpClient httpClient = newClient();
assertThrows(TimeoutException.class, () -> await(httpClient.request(request), TRY_REQUEST_AWAIT_SECS, SECONDS));
}

private StreamingHttpClient newClient() {
return newClientBuilder()
.appendConnectionFactoryFilter(withMax(1))
.autoRetryStrategy(DISABLE_AUTO_RETRIES)
.enableWireLogging("servicetalk-tests-wire-logger", TRACE, TRUE::booleanValue)
// It's important to use CONNECT_TIMEOUT here to verify that connections aren't establishing.
.socketOption(CONNECT_TIMEOUT, CONNECT_TIMEOUT_MILLIS)
.buildStreaming();
assertThrows(TimeoutException.class,
() -> await(streamingHttpClient().reserveConnection(request).flatMap(conn -> conn.request(request)),
TRY_REQUEST_AWAIT_MILLIS, MILLISECONDS));
}

private void assertConnectionRequestSucceeds(final StreamingHttpRequest request) throws Exception {
final StreamingHttpClient httpClient = newClientBuilder().buildStreaming();
final StreamingHttpResponse response = await(httpClient.request(request), VERIFY_REQUEST_AWAIT_SECS, SECONDS);
final StreamingHttpResponse response =
await(streamingHttpClient().reserveConnection(request).flatMap(conn -> conn.request(request)),
VERIFY_REQUEST_AWAIT_MILLIS, MILLISECONDS);
assert response != null;
assertResponse(response, HTTP_1_1, OK, "");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
((ReferenceCounted) msg).release();
}
}
if (msg instanceof Channel && !channelSet.addIfAbsent((Channel) msg)) {
if (msg instanceof Channel && (!((Channel) msg).isActive() || !channelSet.addIfAbsent((Channel) msg))) {
idelpivnitskiy marked this conversation as resolved.
Show resolved Hide resolved
LOGGER.warn("Channel ({}) not added to ChannelSet", msg);
}
ctx.fireChannelRead(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,23 @@
*/
public interface ServerListenContext {
/**
* Toggles the server's channel accepting connection ability.
* Toggles the server's ability to accept new connections.
* <p>
* Passing a {@code false} value, will stop accepting connections on the server channel, without affecting any other
* interaction to currently open child channels (i.e., reads / writes).
* Passing a {@code false} value will signal the server to stop accepting new connections.
* It won't affect any other interactions to currently open connections (i.e., reads / writes).
* <p>
* Depending on the transport, connections may still get ESTABLISHED, see.
* {@link ServiceTalkSocketOptions#SO_BACKLOG backlog} and (in case of Linux)
* <a href="https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt">SOMAXCONN</a>.
* Depending on the transport, connections may still get ESTABLISHED, see
* {@link ServiceTalkSocketOptions#SO_BACKLOG backlog} or OS wide settings:
* <ul>
* <li>Linux: <a href="https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt">SOMAXCONN</a></li>
* <li>MacOS/BSD: <a href="https://docs.freebsd.org/en/books/handbook/config/#configtuning-kernel-limits">
* kern.ipc.somaxconn / kern.ipc.soacceptqueue</a></li>
* </ul>
* For instance, in case of TCP the 3-way handshake may finish, and the connection will await in the
* accept queue to be accepted. If the accept queue is full, connection SYNs will await in the
* SYN backlog (i.e., in case of Linux <a href="https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt">
* tcp_max_syn_backlog</a>). These additional parameters may affect the behavior of new flows when the service
* is not accepting.
* SYN backlog (in the case of linux). This can be tuned:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider clarifying that it can be tuned at OS level or per server instance using ServiceTalkSocketOptions#SO_BACKLOG option.

I know that you don't have access to HTTP classes from here, but maybe even without referencing an exact method, consider clarifying that they need to use listenSocketOption on the builder, not a socketOption method. It might be a useful hint.

Copy link
Contributor Author

@tkountis tkountis Aug 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not tunable on the per server instance. so_backlog = accept_queue. SYN backlog is a different setting, only at the OS level. I am not elaborating further on it, because its very OS specific.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for clarifying!

* <a href="https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt">tcp_max_syn_backlog</a>
* These additional parameters may affect the behavior of new flows when the service is not accepting.
* <p>
* Depending on how long this stays in the {@code false} state, it may affect other timeouts (i.e., connect-timeout
* or idleness) on the peer-side and/or the other flows to the peer (i.e., proxies).
tkountis marked this conversation as resolved.
Show resolved Hide resolved
Expand Down