Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issues/cluster from dns #352

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/redis/client/Redis.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ static Redis createClient(Vertx vertx) {
}

/**
* Create a new redis client using the default client options. Does not support rediss (redis over ssl scheme) for now.
* Create a new redis client using the default client options.
* @param connectionString a string URI following the scheme: redis://[username:password@][host][:port][/database]
* @param vertx the vertx instance
* @return the client
Expand Down
12 changes: 11 additions & 1 deletion src/main/java/io/vertx/redis/client/Response.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,17 @@ default Number toNumber() {
default @Nullable Double toDouble() {
final String msg = toString();
if (msg != null) {
return Double.parseDouble(msg);
switch (msg) {
case "-nan":
case "nan":
return Double.NaN;
case "inf":
return Double.POSITIVE_INFINITY;
case "-inf":
return Double.NEGATIVE_INFINITY;
default:
return Double.parseDouble(msg);
}
}
return null;
}
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/vertx/redis/client/impl/ReadableBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,12 @@ Number readNumber(int end, NumericType type) {
case DECIMAL:
if (bytes.length == 3 && bytes[0] == 'i' && bytes[1] == 'n' && bytes[2] == 'f') {
number = Double.POSITIVE_INFINITY;
} else if (bytes.length == 3 && bytes[0] == 'n' && bytes[1] == 'a' && bytes[2] == 'n') {
number = Double.NaN;
} else if (bytes.length == 4 && bytes[0] == '-' && bytes[1] == 'i' && bytes[2] == 'n' && bytes[3] == 'f') {
number = Double.NEGATIVE_INFINITY;
} else if (bytes.length == 4 && bytes[0] == '-' && bytes[1] == 'n' && bytes[2] == 'a' && bytes[3] == 'n') {
number = Double.NaN;
} else {
number = Double.parseDouble(new String(bytes, StandardCharsets.US_ASCII));
}
Expand Down
19 changes: 18 additions & 1 deletion src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.vertx.redis.client.impl;

import io.vertx.core.*;
import io.vertx.core.dns.DnsClient;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.redis.client.*;
Expand Down Expand Up @@ -120,10 +121,14 @@ public static void addMasterOnlyCommand(Command command) {
}

private final RedisOptions options;
private final DnsClient dns;

public RedisClusterClient(Vertx vertx, RedisOptions options) {
super(vertx, options);
this.options = options;
// TODO: allow DNS options
this.dns = vertx.createDnsClient();

// validate options
if (options.getMaxPoolWaiting() < options.getMaxPoolSize()) {
throw new IllegalStateException("Invalid options: maxPoolWaiting < maxPoolSize");
Expand All @@ -136,7 +141,19 @@ public RedisClusterClient(Vertx vertx, RedisOptions options) {
public Future<RedisConnection> connect() {
final Promise<RedisConnection> promise = vertx.promise();
// attempt to load the slots from the first good endpoint
connect(options.getEndpoints(), 0, promise);
List<String> endpoints = options.getEndpoints();
if (endpoints.size() == 1) {
dns.resolveA(endpoints.get(0))
.onFailure(err -> {
// default to regular single node lookup
connect(options.getEndpoints(), 0, promise);
})
.onSuccess(resolvedEndpoints -> {
connect(resolvedEndpoints, 0, promise);
});
} else {
connect(options.getEndpoints(), 0, promise);
}
return promise.future();
}

Expand Down
49 changes: 49 additions & 0 deletions src/test/java/io/vertx/redis/client/test/RedisClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1114,4 +1114,53 @@ public void testCommandWithoutReadOrWrite(TestContext should) {
});
});
}

@Test(timeout = 30_000)
public void autoFindNodesByName(TestContext should) {
final Async test = should.async();

final RedisOptions options = new RedisOptions()
.setType(RedisClientType.CLUSTER)
.setConnectionString("redis://localhost:7000")
// we will flood the redis server
.setMaxWaitingHandlers(128 * 1024)
.setMaxPoolSize(8)
.setMaxPoolWaiting(16);

// we only provide 1 node

final Redis client2 = Redis.createClient(rule.vertx(), options);

client2
.connect(onCreate -> {
should.assertTrue(onCreate.succeeded());

final RedisConnection cluster = onCreate.result();
cluster.exceptionHandler(should::fail);

final int len = (int) Math.pow(2, 17);
final AtomicInteger counter = new AtomicInteger();

for (int i = 0; i < len; i++) {
final String id = Integer.toString(i);
cluster.send(cmd(SET).arg(id).arg(id), set -> {
should.assertTrue(set.succeeded());
cluster.send(cmd(GET).arg(id), get -> {
should.assertTrue(get.succeeded());
should.assertEquals(id, get.result().toString());

final int cnt = counter.incrementAndGet();
if (cnt % 1024 == 0) {
System.out.print('.');
}

if (cnt == len) {
client2.close();
test.complete();
}
});
});
}
});
}
}
13 changes: 13 additions & 0 deletions src/test/java/io/vertx/test/redis/RedisClient7Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -254,4 +254,17 @@ public void testBooleanVarArgs(TestContext should) {
});
});
}

@Test
public void testNaN(TestContext should) {
final Async test = should.async();
final String key = makeKey();

client.send(cmd(EVAL).arg("return tostring(0/0)").arg(0))
.onFailure(should::fail)
.onSuccess(ok -> {
should.assertTrue(Double.isNaN(ok.toDouble()));
test.complete();
});
}
}