Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Diregard whitespaces in connection string + Implement connection retries to limit connection attempts. #18

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,16 @@ Factory method to create a new zookeeper [client](#client) instance.
* `sessionTimeout` Session timeout in milliseconds, defaults to 30 seconds.
* `spinDelay` The delay (in milliseconds) between each connection attempts.
* `retries` The number of retry attempts for connection loss exception.
* `connectionRetries` The number of retry attempts to connect to ZooKeeper (intial connection + connection loss), defaults to -1 in which the client will try to connect indefinetly.

Defaults options:

```javascript
{
sessionTimeout: 30000,
spinDelay : 1000,
retries : 0
retries : 0,
connectionRetries : -1
}
```

Expand Down Expand Up @@ -637,6 +639,7 @@ instances:
* `State.DISCONNECTED` - The connection between client and server is dropped.
* `State.EXPIRED` - The client session is expired.
* `State.AUTH_FAILED` - Failed to authenticate with the server.
* `State.CONNECT_TIMEOUT` - Timeout trying to connect to server.

```javascript
client.on('state', function (state) {
Expand Down
6 changes: 5 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ var ConnectionManager = require('./lib/ConnectionManager.js');
var CLIENT_DEFAULT_OPTIONS = {
sessionTimeout : 30000, // Default to 30 seconds.
spinDelay : 1000, // Defaults to 1 second.
retries : 0 // Defaults to 0, no retry.
retries : 0, // Defaults to 0, no retry.
connectionRetries : -1 // Defaults to -1, retry indefinetly.
};

var DATA_SIZE_LIMIT = 1048576; // 1 mega bytes.
Expand Down Expand Up @@ -252,6 +253,9 @@ Client.prototype.onConnectionManagerState = function (connectionManagerState) {
case ConnectionManager.STATES.AUTHENTICATION_FAILED:
state = State.AUTH_FAILED;
break;
case ConnectionManager.STATES.CONNECT_TIMEOUT:
state = State.CONNECT_TIMEOUT;
break;
default:
// Not a event in which client is interested, so skip it.
return;
Expand Down
65 changes: 45 additions & 20 deletions lib/ConnectionManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ var STATES = { // Connection States.
CLOSING : -1,
CLOSED : -2,
SESSION_EXPIRED : -3,
AUTHENTICATION_FAILED : -4
AUTHENTICATION_FAILED : -4,
CONNECT_TIMEOUT : -5
};


Expand Down Expand Up @@ -58,6 +59,8 @@ function ConnectionManager(connectionString, options, stateListener) {

this.options = options;
this.spinDelay = options.spinDelay;
this.connectionRetries = options.connectionRetries * this.servers.length;
this.connectionRetriesAttempts = 0;

this.updateTimeout(options.sessionTimeout);
this.connectTimeoutHandler = null;
Expand Down Expand Up @@ -134,19 +137,27 @@ ConnectionManager.prototype.findNextServer = function (callback) {
self.nextServerIndex %= self.servers.length;

if (self.serverAttempts === self.servers.length) {
setTimeout(function () {
callback(self.servers[self.nextServerIndex]);
self.nextServerIndex += 1;
if ((self.connectionRetries >= 0) && (self.connectionRetriesAttempts > self.connectionRetries)) {
callback(null);
} else {
setTimeout(function () {
callback(self.servers[self.nextServerIndex]);
self.nextServerIndex += 1;

// reset attempts since we already waited for enough time.
self.serverAttempts = 0;
}, Math.random() * self.spinDelay);
// reset attempts since we already waited for enough time.
self.serverAttempts = 0;

if (self.connectionRetries >= 0) {
self.connectionRetriesAttempts += 1;
}
}, Math.random() * self.spinDelay);
}
} else {
self.serverAttempts += 1;

process.nextTick(function () {
callback(self.servers[self.nextServerIndex]);
self.nextServerIndex += 1;
self.findNextServer(callback);
});
}
};
Expand Down Expand Up @@ -217,21 +228,32 @@ ConnectionManager.prototype.connect = function () {
self.setState(STATES.CONNECTING);

self.findNextServer(function (server) {
self.socket = net.connect(server);

self.connectTimeoutHandler = setTimeout(
self.onSocketConnectTimeout.bind(self),
self.connectTimeout
);
if (server === null) {
self.socket.destroy();

// Disable the Nagle algorithm.
self.socket.setNoDelay();
if (this.connectTimeoutHandler) {
clearTimeout(this.connectTimeoutHandler);
}

self.socket.on('connect', self.onSocketConnected.bind(self));
self.socket.on('data', self.onSocketData.bind(self));
self.socket.on('drain', self.onSocketDrain.bind(self));
self.socket.on('close', self.onSocketClosed.bind(self));
self.socket.on('error', self.onSocketError.bind(self));
self.setState(STATES.CONNECT_TIMEOUT);
} else {
self.socket = net.connect(server);

self.connectTimeoutHandler = setTimeout(
self.onSocketConnectTimeout.bind(self),
self.connectTimeout
);

// Disable the Nagle algorithm.
self.socket.setNoDelay();

self.socket.on('connect', self.onSocketConnected.bind(self));
self.socket.on('data', self.onSocketData.bind(self));
self.socket.on('drain', self.onSocketDrain.bind(self));
self.socket.on('close', self.onSocketClosed.bind(self));
self.socket.on('error', self.onSocketError.bind(self));
}
});
};

Expand Down Expand Up @@ -435,6 +457,9 @@ ConnectionManager.prototype.onSocketData = function (buffer) {
// Reset the server connection attempts since we connected now.
self.serverAttempts = 0;

// Reset connection retries
this.connectionRetriesAttempts = 0;

self.sessionId = connectResponse.sessionId;
self.sessionPassword = connectResponse.passwd;
self.updateTimeout(connectResponse.timeOut);
Expand Down
20 changes: 15 additions & 5 deletions lib/ConnectionStringParser.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ function ConnectionStringParser(connectionString) {
'connectionString must be a non-empty string.'
);

// Remove all white spaces
connectionString = connectionString.replace(/\s+/g, '');

assert(
connectionString.trim() !== '',
'connectionString must be a non-empty string.'
);

this.connectionString = connectionString;

// Handle chroot
Expand All @@ -56,13 +64,15 @@ function ConnectionStringParser(connectionString) {

hostList.filter(function (item) {
// Filter out empty string.
return item;
return item.trim();
}).forEach(function (item) {
var parts = item.split(':');
servers.push({
host : parts[0],
port : parts[1] || DEFAULT_PORT
});
if (parts[0] !== '') {
servers.push({
host: parts[0],
port : parts[1] || DEFAULT_PORT
});
}
});

assert(
Expand Down
3 changes: 2 additions & 1 deletion lib/State.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ var STATES = {
AUTH_FAILED : new State('AUTH_FAILED', 4),
CONNECTED_READ_ONLY : new State('CONNECTED_READ_ONLY', 5),
SASL_AUTHENTICATED : new State('SASL_AUTHENTICATED', 6),
EXPIRED : new State('EXPIRED', -122)
EXPIRED : new State('EXPIRED', -122),
CONNECT_TIMEOUT : new State('CONNECT_TIMEOUT', -123),
};

module.exports = STATES;
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "node-zookeeper-client",
"version": "0.2.0",
"version": "0.2.2",
"description": "A pure Javascript ZooKeeper client for Node.js.",
"author": "Alex Guan <[email protected]>",
"main": "index.js",
Expand Down
22 changes: 22 additions & 0 deletions test/lib/ConnectionStringParser.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ describe('ConnectionStringParser', function () {
to.throw('non-empty string');
expect(function () { return new ConnectionStringParser(''); }).
to.throw('non-empty string');
expect(function () { return new ConnectionStringParser(' '); }).
to.throw('non-empty string');
});

it('should reject invalid chroot path', function () {
Expand All @@ -40,6 +42,13 @@ describe('ConnectionStringParser', function () {

expect(parser.getConnectionString()).to.equal(s);
});

it('should diregard white spaces', function () {
var s = ' localhost : 2181 ',
parser = new ConnectionStringParser(s);

expect(parser.getConnectionString()).to.equal('localhost:2181');
});
});

describe('getChrootPath', function () {
Expand Down Expand Up @@ -75,7 +84,20 @@ describe('ConnectionStringParser', function () {
expect(servers).to.have.deep.property('[0].port').match(/218[12]/);
expect(servers).to.have.deep.property('[1].host', 'localhost');
expect(servers).to.have.deep.property('[1].port').match(/218[12]/);
});

it('should return an array of host:port objects diregarding white spaces', function () {
var s = ', localhost : 2181, localhost , localhost:, , ',
parser = new ConnectionStringParser(s),
servers = parser.getServers();

expect(servers).to.be.instanceof(Array).that.have.length(3);
expect(servers).to.have.deep.property('[0].host', 'localhost');
expect(servers).to.have.deep.property('[0].port').match(/218[12]/);
expect(servers).to.have.deep.property('[1].host', 'localhost');
expect(servers).to.have.deep.property('[1].port').match(/218[12]/);
expect(servers).to.have.deep.property('[2].host', 'localhost');
expect(servers).to.have.deep.property('[2].port').match(/218[12]/);
});

it('should add default port if port is not provided', function () {
Expand Down
1 change: 1 addition & 0 deletions test/lib/State.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ describe('State', function () {
expect(State.CONNECTED_READ_ONLY).to.exist;
expect(State.SASL_AUTHENTICATED).to.exist;
expect(State.EXPIRED).to.exist;
expect(State.CONNECT_TIMEOUT).to.exist;
});
});
});
Expand Down