Skip to content

Commit

Permalink
Shared BlockingUtils in concurrent api internal (#2185)
Browse files Browse the repository at this point in the history
Motivation:

BlockingUtils offer useful functionality that could be used outside the http package.

Modifications:

Moved BlockingUtils to concurrent-api-internal

Result:

Shared utilities among modules.
  • Loading branch information
tkountis authored May 23, 2022
1 parent b952931 commit ed9d360
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 43 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018 Apple Inc. and the ServiceTalk project authors
* Copyright © 2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,40 +13,36 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.http.api;
package io.servicetalk.concurrent.api.internal;

import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Single;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import static io.servicetalk.concurrent.api.Completable.fromRunnable;
import static io.servicetalk.utils.internal.PlatformDependent.throwException;

final class BlockingUtils {
/**
* Common utility functions to unwrap {@link ExecutionException} from async operations.
*/
public final class BlockingUtils {

private BlockingUtils() {
// no instances
}

interface RunnableCheckedException {
void run() throws Exception;

default void runUnchecked() {
try {
run();
} catch (Exception e) {
throwException(e);
}
}
}

static Completable blockingToCompletable(RunnableCheckedException r) {
return fromRunnable(r::runUnchecked);
}

static <T> T futureGetCancelOnInterrupt(Future<T> future) throws Exception {
/**
* Completes a {@link Future} by invoking {@link Future#get()}.
* Any occurred {@link Exception} will be converted to unchecked, and {@link ExecutionException}s will be unwrapped.
* Upon interruption, the {@link Future} is cancelled.
*
* @param future The future to operate on.
* @param <T> The type of the result.
* @return The result of the future.
* @throws Exception InterrupedException upon interruption or unchecked exceptions for any other exception.
*/
public static <T> T futureGetCancelOnInterrupt(Future<T> future) throws Exception {
try {
return future.get();
} catch (InterruptedException e) {
Expand All @@ -58,14 +54,16 @@ static <T> T futureGetCancelOnInterrupt(Future<T> future) throws Exception {
}
}

static HttpResponse request(final StreamingHttpRequester requester, final HttpRequest request) throws Exception {
// It is assumed that users will always apply timeouts at the StreamingHttpService layer (e.g. via filter). So
// we don't apply any explicit timeout here and just wait forever.
return blockingInvocation(requester.request(request.toStreamingRequest())
.flatMap(response -> response.toResponse().shareContextOnSubscribe()));
}

static <T> T blockingInvocation(Single<T> source) throws Exception {
/**
* Subscribes a {@link Single} immediately and awaits result.
* Any occurred {@link Exception} will be converted to unchecked, and {@link ExecutionException}s will be unwrapped.
*
* @param source The {@link Single} to operate on.
* @param <T> The type of the result.
* @return The result of the single.
* @throws Exception InterrupedException upon interruption or unchecked exceptions for any other exception.
*/
public static <T> T blockingInvocation(Single<T> source) throws Exception {
// It is assumed that users will always apply timeouts at the StreamingHttpService layer (e.g. via filter). So
// we don't apply any explicit timeout here and just wait forever.
try {
Expand All @@ -75,7 +73,14 @@ static <T> T blockingInvocation(Single<T> source) throws Exception {
}
}

static void blockingInvocation(Completable source) throws Exception {
/**
* Subscribes a {@link Completable} immediately and awaits result.
* Any occurred {@link Exception} will be converted to unchecked, and {@link ExecutionException}s will be unwrapped.
*
* @param source The {@link Completable} to operate on.
* @throws Exception unchecked exceptions for any exception that occurs.
*/
public static void blockingInvocation(Completable source) throws Exception {
// It is assumed that users will always apply timeouts at the StreamingHttpService layer (e.g. via filter). So
// we don't apply any explicit timeout here and just wait forever.
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright © 2018, 2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.http.api;

import static io.servicetalk.concurrent.api.internal.BlockingUtils.blockingInvocation;

final class BlockingRequestUtils {

private BlockingRequestUtils() {
// no instances
}

static HttpResponse request(final StreamingHttpRequester requester, final HttpRequest request) throws Exception {
// It is assumed that users will always apply timeouts at the StreamingHttpService layer (e.g. via filter). So
// we don't apply any explicit timeout here and just wait forever.
return blockingInvocation(requester.request(request.toStreamingRequest())
.flatMap(response -> response.toResponse().shareContextOnSubscribe()));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018, 2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,7 +19,6 @@
import io.servicetalk.concurrent.api.Single;

import static io.servicetalk.concurrent.api.Single.fromCallable;
import static io.servicetalk.http.api.BlockingUtils.blockingToCompletable;
import static io.servicetalk.http.api.DefaultHttpExecutionStrategy.OFFLOAD_RECEIVE_DATA_STRATEGY;
import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy;
import static java.util.Objects.requireNonNull;
Expand All @@ -43,11 +42,17 @@ public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,

@Override
public Completable closeAsync() {
return blockingToCompletable(original::close);
return Completable.fromCallable(() -> {
original.close();
return null;
});
}

@Override
public Completable closeAsyncGracefully() {
return blockingToCompletable(original::closeGracefully);
return Completable.fromCallable(() -> {
original.closeGracefully();
return null;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;

import static io.servicetalk.http.api.BlockingUtils.blockingInvocation;
import static io.servicetalk.concurrent.api.internal.BlockingUtils.blockingInvocation;

/**
* A builder for building HTTP Servers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import io.servicetalk.concurrent.BlockingIterable;

import static io.servicetalk.http.api.BlockingUtils.blockingInvocation;
import static io.servicetalk.concurrent.api.internal.BlockingUtils.blockingInvocation;
import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY;
import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy;
import static io.servicetalk.http.api.RequestResponseFactories.toAggregated;
Expand Down Expand Up @@ -59,7 +59,7 @@ public StreamingHttpClient asStreamingClient() {
@Override
public HttpResponse request(final HttpRequest request) throws Exception {
request.context().putIfAbsent(HTTP_EXECUTION_STRATEGY_KEY, strategy);
return BlockingUtils.request(client, request);
return BlockingRequestUtils.request(client, request);
}

@Override
Expand Down Expand Up @@ -146,7 +146,7 @@ public <T> BlockingIterable<? extends T> transportEventIterable(final HttpEventK

@Override
public HttpResponse request(final HttpRequest request) throws Exception {
return BlockingUtils.request(connection, request);
return BlockingRequestUtils.request(connection, request);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import io.servicetalk.concurrent.BlockingIterable;

import static io.servicetalk.http.api.BlockingUtils.blockingInvocation;
import static io.servicetalk.concurrent.api.internal.BlockingUtils.blockingInvocation;
import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY;
import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy;
import static io.servicetalk.http.api.RequestResponseFactories.toBlockingStreaming;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public StreamingHttpConnection asStreamingConnection() {

@Override
public HttpResponse request(final HttpRequest request) throws Exception {
return BlockingUtils.request(connection, request);
return BlockingRequestUtils.request(connection, request);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import io.servicetalk.concurrent.BlockingIterable;

import static io.servicetalk.http.api.BlockingUtils.blockingInvocation;
import static io.servicetalk.concurrent.api.internal.BlockingUtils.blockingInvocation;
import static io.servicetalk.http.api.DefaultHttpExecutionStrategy.OFFLOAD_SEND_EVENT_STRATEGY;
import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy;
import static io.servicetalk.http.api.RequestResponseFactories.toBlockingStreaming;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.servicetalk.http.api;

import static io.servicetalk.http.api.BlockingUtils.futureGetCancelOnInterrupt;
import static io.servicetalk.concurrent.api.internal.BlockingUtils.futureGetCancelOnInterrupt;
import static java.util.Objects.requireNonNull;

final class StreamingHttpServiceToBlockingHttpService implements BlockingHttpService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.http.api.BlockingUtils.futureGetCancelOnInterrupt;
import static io.servicetalk.concurrent.api.internal.BlockingUtils.futureGetCancelOnInterrupt;
import static io.servicetalk.utils.internal.PlatformDependent.throwException;
import static java.util.Objects.requireNonNull;

Expand Down

0 comments on commit ed9d360

Please sign in to comment.