Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc over http/2 poc #26

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
- No @author tags in any javadoc.
- Use try-with-resources blocks whenever is possible.
- TODOs should be associated to at least one issue.
- Always format the contributed code.
- Always format the contributed code. In Intellij, it is recommended to enable "Reformat Code" & "Optimize Imports"
via "Tools > Actions on Save".

## Unit tests

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ repositories {
ext {
lombok_version = "1.18.26"
slf4j_version = "2.0.7"
vertx_version = "4.4.1"
javax_version = "1.3.2"
vertx_version = "4.4.4"
protoc_version = "3.23.2"
grpc_version = "1.56.0"

otl_version = "1.25.0"
micrometer_version = "1.10.6"
Expand Down
33 changes: 32 additions & 1 deletion server/build.gradle
Original file line number Diff line number Diff line change
@@ -1,9 +1,36 @@
import org.gradle.nativeplatform.platform.internal.DefaultNativePlatform

buildscript {
dependencies {
// ASSUMES GRADLE 2.12 OR HIGHER. Use plugin version 0.7.5 with earlier gradle versions
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.9.3'
}
}

plugins {
id 'com.flipkart.varadhi.java-application-conventions'
id 'com.google.protobuf' version "0.9.3"
}

protobuf {
protoc {
artifact = "com.google.protobuf:protoc:$protoc_version"
}
plugins {
grpc {
artifact = "io.grpc:protoc-gen-grpc-java:$grpc_version"
}
vertx {
artifact = "io.vertx:vertx-grpc-protoc-plugin2:$vertx_version"
}
}
generateProtoTasks {
all()*.plugins {
grpc
vertx
}
}
}

dependencies {

Expand All @@ -16,19 +43,23 @@ dependencies {
implementation('org.apache.logging.log4j:log4j-core')

implementation('com.fasterxml.jackson.core:jackson-databind')
implementation("javax.annotation:javax.annotation-api:$javax_version")

implementation("io.vertx:vertx-core")
implementation("io.vertx:vertx-config")
implementation("io.vertx:vertx-config-yaml")
implementation("io.vertx:vertx-web")
implementation("io.vertx:vertx-grpc-server:$vertx_version")
implementation("io.vertx:vertx-grpc-client:$vertx_version")
implementation("io.vertx:vertx-grpc-context-storage:$vertx_version")
implementation("io.vertx:vertx-auth-common")
implementation("io.vertx:vertx-auth-jwt")
implementation("io.vertx:vertx-opentelemetry")
implementation("io.vertx:vertx-micrometer-metrics")

// TODO: check why still getting warning on class not found.
if (DefaultNativePlatform.getCurrentOperatingSystem().isMacOsX()) {
runtimeOnly('io.netty:netty-resolver-dns-native-macos')
runtimeOnly('io.netty:netty-resolver-dns-native-macos:4.1.91.Final:osx-x86_64')
}

implementation("io.opentelemetry:opentelemetry-sdk")
Expand Down
12 changes: 3 additions & 9 deletions server/src/main/java/com/flipkart/varadhi/CoreServices.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import com.flipkart.varadhi.db.MetaStoreOptions;
import com.flipkart.varadhi.db.MetaStoreProvider;
import com.flipkart.varadhi.exceptions.InvalidConfigException;
import com.flipkart.varadhi.services.MessagingStackProvider;
import com.flipkart.varadhi.services.MessagingStackOptions;
import com.flipkart.varadhi.services.MessagingStackProvider;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.jmx.JmxConfig;
Expand Down Expand Up @@ -66,7 +66,7 @@ private MetaStoreProvider setupMetaStoreProvider(MetaStoreOptions metaStoreOptio
provider.init(metaStoreOptions);
return provider;
}

private MessagingStackProvider setupMessagingStackProvider(MessagingStackOptions messagingStackOptions) {
MessagingStackProvider provider = loadClass(messagingStackOptions.getProviderClassName());
provider.init(messagingStackOptions);
Expand All @@ -81,9 +81,7 @@ private <T> T loadClass(String className) {
}
throw new InvalidConfigException("No class provided.");
} catch (Exception e) {
String errorMsg = String.format("Fail to load class %s.", className);
log.error(errorMsg, e);
throw new InvalidConfigException(e);
throw new InvalidConfigException(String.format("Fail to load class %s.", className), e);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any thoughts for removing logging ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logging error and then throwing the same leads to redundant logs.

For eg:

Before

2023-06-26 12:19:05 ERROR CoreServices:85 - Fail to load class com.flipkart.varadhi.pulsar.PulsarStackProvider.
java.lang.ClassNotFoundException: com.flipkart.varadhi.pulsar.PulsarStackProvider
        at jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641) ~[?:?]
        at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188) ~[?:?]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:520) ~[?:?]
        at java.lang.Class.forName0(Native Method) ~[?:?]
        at java.lang.Class.forName(Class.java:375) ~[?:?]
        at com.flipkart.varadhi.CoreServices.loadClass(CoreServices.java:79) [main/:?]
        at com.flipkart.varadhi.CoreServices.setupMessagingStackProvider(CoreServices.java:71) [main/:?]
        at com.flipkart.varadhi.CoreServices.<init>(CoreServices.java:41) [main/:?]
        at com.flipkart.varadhi.Server.main(Server.java:22) [main/:?]
2023-06-26 12:19:05 ERROR Server:27 - Failed to initialise the server.
com.flipkart.varadhi.exceptions.InvalidConfigException: com.flipkart.varadhi.pulsar.PulsarStackProvider
        at com.flipkart.varadhi.CoreServices.loadClass(CoreServices.java:86) ~[main/:?]
        at com.flipkart.varadhi.CoreServices.setupMessagingStackProvider(CoreServices.java:71) ~[main/:?]
        at com.flipkart.varadhi.CoreServices.<init>(CoreServices.java:41) ~[main/:?]
        at com.flipkart.varadhi.Server.main(Server.java:22) [main/:?]
Caused by: java.lang.ClassNotFoundException: com.flipkart.varadhi.pulsar.PulsarStackProvider
        at jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641) ~[?:?]
        at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188) ~[?:?]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:520) ~[?:?]
        at java.lang.Class.forName0(Native Method) ~[?:?]
        at java.lang.Class.forName(Class.java:375) ~[?:?]
        at com.flipkart.varadhi.CoreServices.loadClass(CoreServices.java:79) ~[main/:?]
        ... 3 more
Failed to initialise the server:com.flipkart.varadhi.exceptions.InvalidConfigException: com.flipkart.varadhi.pulsar.PulsarStackProvider

After

2023-06-26 12:19:53 ERROR Server:27 - Failed to initialise the server.
com.flipkart.varadhi.exceptions.InvalidConfigException: Fail to load class com.flipkart.varadhi.pulsar.PulsarStackProvider.
        at com.flipkart.varadhi.CoreServices.loadClass(CoreServices.java:84) ~[main/:?]
        at com.flipkart.varadhi.CoreServices.setupMessagingStackProvider(CoreServices.java:71) ~[main/:?]
        at com.flipkart.varadhi.CoreServices.<init>(CoreServices.java:41) ~[main/:?]
        at com.flipkart.varadhi.Server.main(Server.java:22) [main/:?]
Caused by: java.lang.ClassNotFoundException: com.flipkart.varadhi.pulsar.PulsarStackProvider
        at jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641) ~[?:?]
        at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188) ~[?:?]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:520) ~[?:?]
        at java.lang.Class.forName0(Native Method) ~[?:?]
        at java.lang.Class.forName(Class.java:375) ~[?:?]
        at com.flipkart.varadhi.CoreServices.loadClass(CoreServices.java:79) ~[main/:?]
        ... 3 more

Copy link
Collaborator Author

@gauravAshok gauravAshok Jun 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I try to follow the rule for error logs which says: Don't log if you are throwing. can log if you are handling.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generally I tend to follow log at source of the event to capture all data along that leads to exception.
But I guess, we can avoid exception logging from that and keep only data part and leave exception logging to either default exception handler at base or log where it is being handled ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can still capture what we want to capture in the "exception" that we are throwing.

}
}

Expand Down Expand Up @@ -115,10 +113,6 @@ private ObservabilityStack setupObservabilityStack(ServerConfiguration configura
return new ObservabilityStack(openTelemetry, meterRegistry);
}





@Getter
@AllArgsConstructor
public static class ObservabilityStack {
Expand Down
39 changes: 35 additions & 4 deletions server/src/main/java/com/flipkart/varadhi/RestVerticle.java
Original file line number Diff line number Diff line change
@@ -1,30 +1,38 @@
package com.flipkart.varadhi;

import com.flipkart.varadhi.exceptions.InvalidStateException;
import com.flipkart.varadhi.web.Extensions;
import com.flipkart.varadhi.web.FailureHandler;
import com.flipkart.varadhi.web.routes.RouteBehaviour;
import com.flipkart.varadhi.web.routes.RouteBehaviourProvider;
import com.flipkart.varadhi.web.routes.RouteConfigurator;
import com.flipkart.varadhi.web.routes.RouteDefinition;
import com.flipkart.varadhi.web.v1.proto.MessageProducerGrpc;
import com.flipkart.varadhi.web.v1.proto.SingleMessageResponse;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpVersion;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import io.vertx.grpc.common.GrpcStatus;
import io.vertx.grpc.server.GrpcServer;
import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

@Slf4j
public class RestVerticle extends AbstractVerticle {
private final List<RouteDefinition> apiRoutes;
private final Map<RouteBehaviour, RouteBehaviourProvider> behaviorProviders;
private final Map<RouteBehaviour, RouteConfigurator> behaviorProviders;

private HttpServer httpServer;

public RestVerticle(
List<RouteDefinition> apiRoutes, Map<RouteBehaviour, RouteBehaviourProvider> behaviorProviders
List<RouteDefinition> apiRoutes, Map<RouteBehaviour, RouteConfigurator> behaviorProviders
) {
this.apiRoutes = apiRoutes;
this.behaviorProviders = behaviorProviders;
Expand All @@ -36,11 +44,32 @@ public void start(Promise<Void> startPromise) {
log.info("HttpServer Starting.");
Router router = Router.router(vertx);

// grpc
GrpcServer grpcServer = GrpcServer.server(vertx);

grpcServer.callHandler(MessageProducerGrpc.getProduceMethod(), request -> {
request.response().status(GrpcStatus.OK).statusMessage("OK")
.end(SingleMessageResponse.newBuilder().setOffset("0001").build());
});

router.route()
.consumes("application/grpc")
.produces("application/grpc")
.handler(req -> grpcServer.handle(req.request()));

router.route()
.consumes("application/json")
.produces("application/json")
.path("/topics/produce")
.method(HttpMethod.POST)
.handler(rc -> Extensions.RoutingContextExtension.endRequestWithResponse(rc, 200, "0001"));

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Planning to keep it or move it appropriately to ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ya. its a TODO, will fix it in this PR.

// http
FailureHandler failureHandler = new FailureHandler();
for (RouteDefinition def : apiRoutes) {
Route route = router.route().method(def.method()).path(def.path());
def.behaviours().forEach(behaviour -> {
RouteBehaviourProvider behaviorProvider = behaviorProviders.getOrDefault(behaviour, null);
RouteConfigurator behaviorProvider = behaviorProviders.getOrDefault(behaviour, null);
if (null != behaviorProvider) {
behaviorProvider.configure(route, def);
} else {
Expand All @@ -58,6 +87,8 @@ public void start(Promise<Void> startPromise) {
HttpServerOptions options = new HttpServerOptions();
// TODO: why?
options.setDecompressionSupported(false);
options.setAlpnVersions(Arrays.asList(HttpVersion.HTTP_2, HttpVersion.HTTP_1_1));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DEFAULT_ALPN_VERSIONS constant ?

options.setUseAlpn(true);

// TODO: create config for http server
httpServer = vertx.createHttpServer(options).requestHandler(router).listen(8080, h -> {
Expand Down
1 change: 0 additions & 1 deletion server/src/main/java/com/flipkart/varadhi/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public static void main(String[] args) {
log.info("Server Started.");
} catch (Exception e) {
log.error("Failed to initialise the server.", e);
System.out.println("Failed to initialise the server:" + e);
System.exit(-1);
}
// TODO: check need for shutdown hook
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import com.flipkart.varadhi.services.VaradhiTopicService;
import com.flipkart.varadhi.web.AuthHandlers;
import com.flipkart.varadhi.web.routes.RouteBehaviour;
import com.flipkart.varadhi.web.routes.RouteBehaviourProvider;
import com.flipkart.varadhi.web.routes.RouteConfigurator;
import com.flipkart.varadhi.web.routes.RouteDefinition;
import com.flipkart.varadhi.web.v1.HealthCheckHandler;
import com.flipkart.varadhi.web.v1.TopicHandlers;
Expand All @@ -29,7 +29,7 @@
public class VerticleDeployer {
private final TopicHandlers topicHandlers;
private final HealthCheckHandler healthCheckHandler;
private final Map<RouteBehaviour, RouteBehaviourProvider> behaviorProviders = new HashMap<>();
private final Map<RouteBehaviour, RouteConfigurator> behaviorProviders = new HashMap<>();

public VerticleDeployer(
Vertx vertx,
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import com.flipkart.varadhi.auth.AuthorizationProvider;
import com.flipkart.varadhi.exceptions.InvalidConfigException;
import com.flipkart.varadhi.exceptions.VaradhiException;
import com.flipkart.varadhi.web.routes.RouteBehaviourProvider;
import com.flipkart.varadhi.web.routes.RouteConfigurator;
import com.flipkart.varadhi.web.routes.RouteDefinition;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
Expand All @@ -25,7 +25,7 @@

import static java.net.HttpURLConnection.HTTP_OK;

public class AuthHandlers implements RouteBehaviourProvider {
public class AuthHandlers implements RouteConfigurator {
private final Handler<RoutingContext> authenticationHandler;
private final AuthorizationHandlerBuilder authorizationHandlerBuilder;

Expand Down
52 changes: 52 additions & 0 deletions server/src/main/java/com/flipkart/varadhi/web/Extensions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.flipkart.varadhi.web;

import com.flipkart.varadhi.utils.JsonMapper;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.ext.web.RequestBody;
import io.vertx.ext.web.RoutingContext;
import lombok.extern.slf4j.Slf4j;

public class Extensions {

public static class RequestBodyExtension {

/*
Extension method for vertx RequestBody.
builtin asPojo() method is not working because of jackson issues i.e.
it needs default constructor and none final fields.

Extending RequestBody to have asPojo() custom deserializer to convert requestBody to appropriate Pojo.
*/
public static <T> T asPojo(RequestBody body, Class<T> clazz) {
return JsonMapper.jsonDeserialize(body.asString(), clazz);
}
}

@Slf4j
public static class RoutingContextExtension {
public static <T> void endRequestWithResponse(RoutingContext ctx, T response) {
String responseBody = JsonMapper.jsonSerialize(response);
ctx.response().putHeader(HttpHeaders.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);
ctx.response().putHeader(HttpHeaders.CONTENT_ENCODING, "utf-8");
ctx.response().end(responseBody, (r) -> {
HttpServerRequest request = ctx.request();
if (r.succeeded()) {
log.debug("Request {}:{} completed successfully.", request.method(), request.path());
} else {
log.error("Request {}:{} Failed to send response: {}", request.method(), request.path(), r.cause());
}
});
}

public static <T> void endRequestWithResponse(RoutingContext ctx, int status, T response) {
ctx.response().setStatusCode(status);
endRequestWithResponse(ctx, response);
}

public static void todo(RoutingContext context) {
context.response().setStatusCode(500).setStatusMessage("Not Implemented").end();
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

import io.vertx.ext.web.Route;

public interface RouteBehaviourProvider {
public interface RouteConfigurator {
void configure(Route route, RouteDefinition routeDef);
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.flipkart.varadhi.web.v1;

import com.flipkart.varadhi.utils.ResponseExtension;
import com.flipkart.varadhi.web.routes.RouteProvider;
import com.flipkart.varadhi.web.Extensions.RoutingContextExtension;
import com.flipkart.varadhi.web.routes.RouteDefinition;
import com.flipkart.varadhi.web.routes.RouteProvider;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpMethod;
import io.vertx.ext.web.RoutingContext;
Expand All @@ -15,7 +15,7 @@
import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;

@ExtensionMethod({ResponseExtension.class})
@ExtensionMethod({RoutingContextExtension.class})
public class HealthCheckHandler implements Handler<RoutingContext>, RouteProvider {

// TODO: add appropriate checks
Expand Down
Loading