Skip to content

Commit

Permalink
Fix some error handling in subscriptions over websockets
Browse files Browse the repository at this point in the history
  • Loading branch information
jmartisk committed Jan 25, 2023
1 parent 8b4322c commit 7c05606
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public enum MessageType {
GQL_CONNECTION_ERROR("connection_error"),
GQL_CONNECTION_ACK("connection_ack"),
GQL_DATA("data"),
GQL_ERROR("connection_error"),
GQL_ERROR("error"),
GQL_COMPLETE("complete"),
GQL_CONNECTION_KEEP_ALIVE("ka");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ public void write(ExecutionResponse executionResponse) {
} else if (data instanceof Publisher) {
// this means the operation is a subscription
sendStreamingMessage(operationId, executionResponse);
} else if (data == null) {
// if isDataPresent() == true && but data == null,
// then this is probably a subscription, but the subscription
// method threw an exception instead of returning
// a failed Multi
sendErrorMessage(operationId, executionResponse);
} else {
logUnknownResult(executionResult);
}
Expand Down Expand Up @@ -167,8 +173,8 @@ private JsonObject createDataMessage(String operationId, JsonObject payload) {
}

private void logUnknownResult(ExecutionResult executionResult) {
LOG.warn("Unknown execution result of type "
+ executionResult.getClass());
LOG.warn("Unknown data type of execution result: "
+ executionResult.getData().getClass());
}

private void sendSingleMessage(String operationId, ExecutionResponse executionResponse) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public enum MessageType {
GQL_CONNECTION_ERROR("connection_error"),
GQL_CONNECTION_ACK("connection_ack"),
GQL_DATA("data"),
GQL_ERROR("connection_error"),
GQL_ERROR("error"),
GQL_COMPLETE("complete"),
GQL_CONNECTION_KEEP_ALIVE("ka");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,25 @@
import static org.junit.Assert.*;

import java.net.URL;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import jakarta.json.JsonValue;

import org.jboss.arquillian.container.test.api.Deployment;
import org.jboss.arquillian.test.api.ArquillianResource;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.EmptyAsset;
import org.jboss.shrinkwrap.api.spec.WebArchive;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

import io.smallrye.graphql.client.GraphQLClientException;
import io.smallrye.graphql.client.GraphQLError;
import io.smallrye.graphql.client.Response;
import io.smallrye.graphql.client.core.Document;
import io.smallrye.graphql.client.core.OperationType;
import io.smallrye.graphql.client.vertx.dynamic.VertxDynamicGraphQLClient;
import io.smallrye.mutiny.helpers.test.AssertSubscriber;

public abstract class AbstractDynamicClientSubscriptionTest {

Expand All @@ -38,6 +35,8 @@ public static WebArchive deployment() {
.addClasses(DynamicClientSubscriptionApi.class);
}

static Duration DURATION = Duration.ofSeconds(5);

@ArquillianResource
URL testingURL;

Expand Down Expand Up @@ -69,23 +68,28 @@ public void testFailingImmediately() throws InterruptedException {
Document document = document(
operation(OperationType.SUBSCRIPTION,
field("failingImmediately")));
AtomicReference<Response> response = new AtomicReference<>();
CountDownLatch finished = new CountDownLatch(1);
client.subscription(document)
.subscribe()
.with(item -> {
response.set(item);
}, throwable -> {
// nothing
}, () -> {
finished.countDown();
});
finished.await(10, TimeUnit.SECONDS);
Response actualResponse = response.get();
assertNotNull("One response was expected to arrive", actualResponse);
Assert.assertEquals(JsonValue.NULL, actualResponse.getData().get("failingImmediately"));
// FIXME: add an assertion about the contained error message
// right now, there is no error message present, which is a bug
AssertSubscriber<Response> subscriber = new AssertSubscriber<>(10);
client.subscription(document).subscribe(subscriber);
List<Response> messages = subscriber
.awaitNextItem(DURATION)
.awaitCompletion(DURATION)
.assertTerminated()
.getItems();
assertEquals(1, messages.size());
assertEquals("System error", messages.get(0).getErrors().get(0).getMessage());
}

@Test
public void testThrowingExceptionDirectly() throws InterruptedException {
Document document = document(
operation(OperationType.SUBSCRIPTION,
field("throwingExceptionDirectly")));
AssertSubscriber<Response> subscriber = new AssertSubscriber<>(10);
client.subscription(document).subscribe(subscriber);
Throwable failure = subscriber.awaitFailure(DURATION)
.getFailure();
assertTrue(failure instanceof GraphQLClientException);
assertEquals("System error", ((GraphQLClientException) failure).getErrors().get(0).getMessage());
}

private void assertNoErrors(List<GraphQLError> errors) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,9 @@ public Multi<Integer> failingImmediately() {
return Multi.createFrom().failure(new RuntimeException("blabla"));
}

@Subscription
public Multi<Integer> throwingExceptionDirectly() {
throw new RuntimeException("blabla");
}

}
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package io.smallrye.graphql.tests.client.typesafe.subscription;

import static io.smallrye.graphql.client.core.Field.field;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.net.URL;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -22,6 +24,7 @@

import io.smallrye.graphql.client.GraphQLClientException;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.test.AssertSubscriber;

public abstract class AbstractTypesafeClientSubscriptionTest {

Expand All @@ -37,6 +40,8 @@ public static WebArchive deployment() {

protected SubscriptionClientApi client;

static Duration DURATION = Duration.ofSeconds(5);

@After
public void cleanup() {
try {
Expand Down Expand Up @@ -107,4 +112,26 @@ public void failingSubscriptionShouldCloseClient() throws InterruptedException {
" to an exception in server-side processing", ended);
}

@Test
public void testFailingImmediately() {
AssertSubscriber<Integer> subscriber = new AssertSubscriber<>(10);
client.failingImmediately().subscribe(subscriber);
Throwable failure = subscriber
.awaitFailure(DURATION)
.assertHasNotReceivedAnyItem()
.getFailure();
assertTrue(failure instanceof GraphQLClientException);
}

@Test
public void testThrowingExceptionDirectly() {
AssertSubscriber<Integer> subscriber = new AssertSubscriber<>(10);
client.failingImmediately().subscribe(subscriber);
Throwable failure = subscriber
.awaitFailure(DURATION)
.assertHasNotReceivedAnyItem()
.getFailure();
assertTrue(failure instanceof GraphQLClientException);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,14 @@ public Integer failingSourceField(@Source Dummy dummy) {
throw new RuntimeException("FAILED SOURCE FIELD");
}

@Subscription
public Multi<Integer> failingImmediately() {
return Multi.createFrom().failure(new RuntimeException("blabla"));
}

@Subscription
public Multi<Integer> throwingExceptionDirectly() {
throw new RuntimeException("blabla");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,10 @@ public interface SubscriptionClientApi extends Closeable {
@Subscription(value = "countToFive")
Multi<DummyWithErrorOrOnFailingSourceField> countToFiveWithFailingSourceFieldInErrorOr(boolean shouldFail);

@Subscription
Multi<Integer> failingImmediately();

@Subscription
Multi<Integer> throwingExceptionDirectly();

}
6 changes: 6 additions & 0 deletions ui/graphiql/src/main/webapp/render.js
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ function graphQLFetcher(graphQLParams) {
break;
case 'pong':
break;
case 'error':
observer.next(data.payload);
webSocket.close();
default:
observer.next(data);
break;
Expand Down Expand Up @@ -182,6 +185,9 @@ function graphQLFetcher(graphQLParams) {
break;
case 'ka':
break;
case 'error':
observer.next(data);
webSocket.close();
default:
observer.next(data);
break;
Expand Down

0 comments on commit 7c05606

Please sign in to comment.