Skip to content

Commit

Permalink
Added javax.annotation Annotations
Browse files Browse the repository at this point in the history
  • Loading branch information
scravy committed Mar 11, 2018
1 parent cb40ad7 commit eb8243d
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 24 deletions.
12 changes: 12 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
test: clean
mvn verify

clean:
mvn clean
rm -rf coverage

publish: clean
mvn deploy

purge: clean
rm -rf .idea *.iml pom.xml.releaseBackup release.properties
27 changes: 18 additions & 9 deletions src/main/java/com/simplaex/clients/druid/DruidClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import io.druid.query.Query;
import io.druid.query.QueryPlus;

import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.time.Duration;
import java.util.List;

Expand All @@ -18,7 +21,7 @@ interface EventEmitter extends Emitter {
default void start() {
}

void emit(Event var1);
void emit(Event event);

@Override
default void flush() {
Expand All @@ -30,30 +33,36 @@ default void close() {

}

static DruidClient create(final String hostname, final int port) {
@Nonnull
static DruidClient create(final @Nonnull String hostname, final @Nonnegative int port) {
return create(DruidClientConfig.builder().host(hostname).port(port).build());
}

static DruidClient create(final String hostname, final int port, final EventEmitter eventEmitter) {
@Nonnull
static DruidClient create(final @Nonnull String hostname, final @Nonnegative int port, final @Nonnull EventEmitter eventEmitter) {
return new DruidClientImpl(DruidClientConfig.builder().host(hostname).port(port).eventEmitter(eventEmitter).build());
}

static DruidClient create(final DruidClientConfig config) {
@Nonnull
static DruidClient create(final @Nonnull DruidClientConfig config) {
return new DruidClientImpl(config);
}

default <T> DruidResult<T> run(final Query<T> query) {
@Nonnull
default <T> DruidResult<T> run(final @Nonnull Query<T> query) {
return run(QueryPlus.wrap(query));
}

<T> DruidResult<T> run(QueryPlus<T> queryPlus);
@Nonnull
<T> DruidResult<T> run(@Nonnull QueryPlus<T> queryPlus);

void cancel(DruidResult<?> druidResult);
void cancel(@Nonnull DruidResult<?> druidResult);

default <T> Promise<List<T>> run(final Query<T> query, final Duration timeout) {
@Nonnull
default <T> Promise<List<T>> run(final @Nonnull Query<T> query, final @Nullable Duration timeout) {
return run(QueryPlus.wrap(query), timeout);
}

<T> Promise<List<T>> run(QueryPlus<T> queryPlus, Duration timeout);
<T> Promise<List<T>> run(@Nonnull QueryPlus<T> queryPlus, @Nullable Duration timeout);

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import lombok.*;

import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
Expand All @@ -19,6 +21,7 @@ public class DruidClientConfig {

private final DruidClient.EventEmitter eventEmitter;

@Nonnull
public ExecutorService getExecutorService() {
if (executorService == null) {
if (executorServiceFactory == null) {
Expand All @@ -29,6 +32,7 @@ public ExecutorService getExecutorService() {
return executorService;
}

@Nonnull
public DruidClient.EventEmitter getEventEmitter() {
if (eventEmitter == null) {
return __ -> {
Expand All @@ -37,6 +41,7 @@ public DruidClient.EventEmitter getEventEmitter() {
return eventEmitter;
}

@Nonnegative
public int getPort() {
return port != null && port > 0 ? port : 8080;
}
Expand Down
46 changes: 31 additions & 15 deletions src/main/java/com/simplaex/clients/druid/DruidClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import io.druid.query.topn.*;
import io.druid.server.QueryManager;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.HashMap;
Expand Down Expand Up @@ -285,8 +287,9 @@ private static HttpClient createHttpClient() {
);
}

@Nonnull
@Override
public <T> DruidResult<T> run(final QueryPlus<T> queryPlus) {
public <T> DruidResult<T> run(@Nonnull final QueryPlus<T> queryPlus) {
final Query<T> query = queryPlus.getQuery();
final Query<T> queryWithId =
query.getId() == null ? query.withId(UUID.randomUUID().toString()) : query;
Expand All @@ -300,26 +303,39 @@ public <T> DruidResult<T> run(final QueryPlus<T> queryPlus) {
return new DruidResult<>(resultSequence, finalQuery);
}

@Nonnull
@Override
public <T> Promise<List<T>> run(final QueryPlus<T> queryPlus, final Duration timeout) {
public <T> Promise<List<T>> run(@Nonnull final QueryPlus<T> queryPlus, @Nullable final Duration timeout) {
final Promise<List<T>> promise = Promise.promise();
final Future<List<T>> future = executorService.submit(() -> {
final DruidResult<T> result = run(queryPlus);
return result.toList();
});
executorService.submit(() -> {
try {
final List<T> resultList = future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
promise.fulfill(resultList);
} catch (final Exception exc) {
promise.fail(exc);
}
});
if (timeout == null || timeout.isZero() || timeout.isNegative()) {
executorService.submit(() -> {
try {
final DruidResult<T> result = run(queryPlus);
final List<T> resultList = result.toList();
promise.fulfill(resultList);
} catch (final Exception exc) {
promise.fail(exc);
}
});
} else {
final Future<List<T>> future = executorService.submit(() -> {
final DruidResult<T> result = run(queryPlus);
return result.toList();
});
executorService.submit(() -> {
try {
final List<T> resultList = future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
promise.fulfill(resultList);
} catch (final Exception exc) {
promise.fail(exc);
}
});
}
return promise;
}

@Override
public void cancel(final DruidResult<?> druidResult) {
public void cancel(@Nonnull final DruidResult<?> druidResult) {
queryManager.cancelQuery(druidResult.getQueryId());
}

Expand Down

0 comments on commit eb8243d

Please sign in to comment.