diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..fbb6335 --- /dev/null +++ b/Makefile @@ -0,0 +1,12 @@ +test: clean + mvn verify + +clean: + mvn clean + rm -rf coverage + +publish: clean + mvn deploy + +purge: clean + rm -rf .idea *.iml pom.xml.releaseBackup release.properties diff --git a/src/main/java/com/simplaex/clients/druid/DruidClient.java b/src/main/java/com/simplaex/clients/druid/DruidClient.java index aa64744..5e5c586 100644 --- a/src/main/java/com/simplaex/clients/druid/DruidClient.java +++ b/src/main/java/com/simplaex/clients/druid/DruidClient.java @@ -6,6 +6,9 @@ import io.druid.query.Query; import io.druid.query.QueryPlus; +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.time.Duration; import java.util.List; @@ -18,7 +21,7 @@ interface EventEmitter extends Emitter { default void start() { } - void emit(Event var1); + void emit(Event event); @Override default void flush() { @@ -30,30 +33,36 @@ default void close() { } - static DruidClient create(final String hostname, final int port) { + @Nonnull + static DruidClient create(final @Nonnull String hostname, final @Nonnegative int port) { return create(DruidClientConfig.builder().host(hostname).port(port).build()); } - static DruidClient create(final String hostname, final int port, final EventEmitter eventEmitter) { + @Nonnull + static DruidClient create(final @Nonnull String hostname, final @Nonnegative int port, final @Nonnull EventEmitter eventEmitter) { return new DruidClientImpl(DruidClientConfig.builder().host(hostname).port(port).eventEmitter(eventEmitter).build()); } - static DruidClient create(final DruidClientConfig config) { + @Nonnull + static DruidClient create(final @Nonnull DruidClientConfig config) { return new DruidClientImpl(config); } - default DruidResult run(final Query query) { + @Nonnull + default DruidResult run(final @Nonnull Query query) { return run(QueryPlus.wrap(query)); } - DruidResult run(QueryPlus queryPlus); + @Nonnull + DruidResult run(@Nonnull QueryPlus queryPlus); - void cancel(DruidResult druidResult); + void cancel(@Nonnull DruidResult druidResult); - default Promise> run(final Query query, final Duration timeout) { + @Nonnull + default Promise> run(final @Nonnull Query query, final @Nullable Duration timeout) { return run(QueryPlus.wrap(query), timeout); } - Promise> run(QueryPlus queryPlus, Duration timeout); + Promise> run(@Nonnull QueryPlus queryPlus, @Nullable Duration timeout); } diff --git a/src/main/java/com/simplaex/clients/druid/DruidClientConfig.java b/src/main/java/com/simplaex/clients/druid/DruidClientConfig.java index 73afa58..b342fac 100644 --- a/src/main/java/com/simplaex/clients/druid/DruidClientConfig.java +++ b/src/main/java/com/simplaex/clients/druid/DruidClientConfig.java @@ -2,6 +2,8 @@ import lombok.*; +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Supplier; @@ -19,6 +21,7 @@ public class DruidClientConfig { private final DruidClient.EventEmitter eventEmitter; + @Nonnull public ExecutorService getExecutorService() { if (executorService == null) { if (executorServiceFactory == null) { @@ -29,6 +32,7 @@ public ExecutorService getExecutorService() { return executorService; } + @Nonnull public DruidClient.EventEmitter getEventEmitter() { if (eventEmitter == null) { return __ -> { @@ -37,6 +41,7 @@ public DruidClient.EventEmitter getEventEmitter() { return eventEmitter; } + @Nonnegative public int getPort() { return port != null && port > 0 ? port : 8080; } diff --git a/src/main/java/com/simplaex/clients/druid/DruidClientImpl.java b/src/main/java/com/simplaex/clients/druid/DruidClientImpl.java index 8384c7f..6347848 100644 --- a/src/main/java/com/simplaex/clients/druid/DruidClientImpl.java +++ b/src/main/java/com/simplaex/clients/druid/DruidClientImpl.java @@ -41,6 +41,8 @@ import io.druid.query.topn.*; import io.druid.server.QueryManager; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.time.Duration; import java.util.HashMap; @@ -285,8 +287,9 @@ private static HttpClient createHttpClient() { ); } + @Nonnull @Override - public DruidResult run(final QueryPlus queryPlus) { + public DruidResult run(@Nonnull final QueryPlus queryPlus) { final Query query = queryPlus.getQuery(); final Query queryWithId = query.getId() == null ? query.withId(UUID.randomUUID().toString()) : query; @@ -300,26 +303,39 @@ public DruidResult run(final QueryPlus queryPlus) { return new DruidResult<>(resultSequence, finalQuery); } + @Nonnull @Override - public Promise> run(final QueryPlus queryPlus, final Duration timeout) { + public Promise> run(@Nonnull final QueryPlus queryPlus, @Nullable final Duration timeout) { final Promise> promise = Promise.promise(); - final Future> future = executorService.submit(() -> { - final DruidResult result = run(queryPlus); - return result.toList(); - }); - executorService.submit(() -> { - try { - final List resultList = future.get(timeout.toMillis(), TimeUnit.MILLISECONDS); - promise.fulfill(resultList); - } catch (final Exception exc) { - promise.fail(exc); - } - }); + if (timeout == null || timeout.isZero() || timeout.isNegative()) { + executorService.submit(() -> { + try { + final DruidResult result = run(queryPlus); + final List resultList = result.toList(); + promise.fulfill(resultList); + } catch (final Exception exc) { + promise.fail(exc); + } + }); + } else { + final Future> future = executorService.submit(() -> { + final DruidResult result = run(queryPlus); + return result.toList(); + }); + executorService.submit(() -> { + try { + final List resultList = future.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + promise.fulfill(resultList); + } catch (final Exception exc) { + promise.fail(exc); + } + }); + } return promise; } @Override - public void cancel(final DruidResult druidResult) { + public void cancel(@Nonnull final DruidResult druidResult) { queryManager.cancelQuery(druidResult.getQueryId()); }