Skip to content

Commit

Permalink
Resolve the timeout exception (#336)
Browse files Browse the repository at this point in the history
* fix timeout problem

* skip deploy example
  • Loading branch information
laura-ding authored Aug 17, 2021
1 parent 450b103 commit 35297b6
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,17 @@ public boolean validateObject(PooledObject<SyncConnection> p) {
return true;
}

@Override
public void activateObject(PooledObject<SyncConnection> p) throws Exception {
if (p.getObject() == null) {
throw new RuntimeException("The connection is null.");
}
if (!p.getObject().ping()) {
throw new RuntimeException("The connection is broken.");
}
super.activateObject(p);
}

public boolean init() {
return loadBalancer.isServersOK();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public HostAddress getServerAddress() {

public abstract void open(HostAddress address, int timeout) throws IOErrorException;

public abstract void reopen() throws IOErrorException;

public abstract void close();

public abstract boolean ping();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.pool2.impl.AbandonedConfig;
import org.apache.commons.pool2.impl.BaseObjectPoolConfig;
import org.apache.commons.pool2.impl.GenericObjectPool;
Expand All @@ -29,6 +30,8 @@ public class NebulaPool {
private final Logger log = LoggerFactory.getLogger(this.getClass());
// the wait time to get idle connection, unit ms
private int waitTime = 0;
private AtomicBoolean hasInit = new AtomicBoolean(false);
private AtomicBoolean isClosed = new AtomicBoolean(false);

private List<HostAddress> hostToIp(List<HostAddress> addresses)
throws UnknownHostException {
Expand Down Expand Up @@ -77,6 +80,8 @@ private void checkConfig(NebulaPoolConfig config) {
*/
public boolean init(List<HostAddress> addresses, NebulaPoolConfig config)
throws UnknownHostException, InvalidConfigException {
checkInit();
hasInit.set(true);
checkConfig(config);
this.waitTime = config.getWaitTime();
List<HostAddress> newAddrs = hostToIp(addresses);
Expand Down Expand Up @@ -105,6 +110,8 @@ public boolean init(List<HostAddress> addresses, NebulaPoolConfig config)
* close the pool, all connections will be closed
*/
public void close() {
checkClosed();
isClosed.set(true);
this.loadBalancer.close();
this.objectPool.close();
}
Expand All @@ -121,16 +128,19 @@ public void close() {
*/
public Session getSession(String userName, String password, boolean reconnect)
throws NotValidConnectionException, IOErrorException, AuthFailedException {
checkNoInitAndClosed();
SyncConnection connection = null;
try {
SyncConnection connection = getConnection();
connection = getConnection();
AuthResult authResult = connection.authenticate(userName, password);
return new Session(connection, authResult, this, reconnect);
} catch (NotValidConnectionException | AuthFailedException | IOErrorException e) {
throw e;
} catch (IllegalStateException e) {
throw new NotValidConnectionException(e.getMessage());
} catch (Exception e) {
throw new IOErrorException(IOErrorException.E_UNKNOWN, e.getMessage());
// if get the connection succeeded, but authenticate failed,
// needs to return connection to pool
if (connection != null) {
setInvalidateConnection(connection);
}
throw e;
}
}

Expand All @@ -139,6 +149,7 @@ public Session getSession(String userName, String password, boolean reconnect)
* @return the active connection number
*/
public int getActiveConnNum() {
checkNoInitAndClosed();
return objectPool.getNumActive();
}

Expand All @@ -147,6 +158,7 @@ public int getActiveConnNum() {
* @return the idle connection number
*/
public int getIdleConnNum() {
checkNoInitAndClosed();
return objectPool.getNumIdle();
}

Expand All @@ -155,6 +167,7 @@ public int getIdleConnNum() {
* @return the waiting connection number
*/
public int getWaitersNum() {
checkNoInitAndClosed();
return objectPool.getNumWaiters();
}

Expand All @@ -163,6 +176,7 @@ public int getWaitersNum() {
* it is called by Session and NebulaPool
*/
protected void updateServerStatus() {
checkNoInitAndClosed();
if (objectPool.getFactory() instanceof ConnObjectPool) {
((ConnObjectPool)objectPool.getFactory()).updateServerStatus();
}
Expand All @@ -173,6 +187,7 @@ protected void updateServerStatus() {
* @param connection the invalidate connection
*/
protected void setInvalidateConnection(SyncConnection connection) {
checkNoInitAndClosed();
try {
objectPool.invalidateObject(connection);
} catch (Exception e) {
Expand All @@ -185,52 +200,42 @@ protected void setInvalidateConnection(SyncConnection connection) {
* @param connection the return connection
*/
protected void returnConnection(SyncConnection connection) {
checkNoInitAndClosed();
objectPool.returnObject(connection);
}

protected SyncConnection getConnection() throws NotValidConnectionException, IOErrorException {
// If no idle connection, try once
int idleConnNum = getIdleConnNum();
int retry = idleConnNum == 0 ? 1 : idleConnNum;
SyncConnection connection = null;
boolean hasOkConn = false;
protected SyncConnection getConnection() throws NotValidConnectionException {
checkNoInitAndClosed();
try {
while (retry-- > 0) {
connection = objectPool.borrowObject(waitTime);
if (connection == null) {
continue;
}
if (!connection.ping()) {
log.info("The connection is broken, set invalidateObject");
setInvalidateConnection(connection);
continue;
}
hasOkConn = true;
break;
}
// All idle connections are broken, so need to create new one
if (!hasOkConn) {
connection = objectPool.borrowObject(waitTime);
if (connection == null) {
throw new NotValidConnectionException("Get null connection from the pool");
}
if (!connection.ping()) {
log.info("The connection is broken, set invalidateObject");
setInvalidateConnection(connection);
throw new NotValidConnectionException("The connection ping failed.");
}
log.info("Create new connection");
}
log.info(String.format("Get connection to %s:%d",
connection.getServerAddress().getHost(),
connection.getServerAddress().getPort()));
return connection;
} catch (NotValidConnectionException | IOErrorException e) {
throw e;
} catch (IllegalStateException e) {
throw new NotValidConnectionException(e.getMessage());
return objectPool.borrowObject(waitTime);
} catch (Exception e) {
throw new IOErrorException(IOErrorException.E_UNKNOWN, e.getMessage());
throw new NotValidConnectionException(e.getMessage());
}
}

private void checkNoInit() throws RuntimeException {
if (!hasInit.get()) {
throw new RuntimeException(
"The pool has not been initialized, please initialize it first.");
}
}

private void checkInit() throws RuntimeException {
if (hasInit.get()) {
throw new RuntimeException(
"The pool has already been initialized. "
+ "Please do not initialize the pool repeatedly.");
}
}

private void checkNoInitAndClosed() throws RuntimeException {
checkNoInit();
checkClosed();
}

private void checkClosed() throws RuntimeException {
if (isClosed.get()) {
throw new RuntimeException("The pool has closed. Couldn't use again.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ public synchronized void release() {
if (connection == null) {
return;
}
connection.signout(sessionID);
try {
connection.signout(sessionID);
pool.returnConnection(connection);
} catch (Exception e) {
log.warn("Return object to pool failed.");
log.warn("Release session or return object to pool failed:" + e.getMessage());
}
connection = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ public class SyncConnection extends Connection {
protected TTransport transport = null;
protected TProtocol protocol = null;
private GraphService.Client client = null;
private int timeout = 0;

@Override
public void open(HostAddress address, int timeout) throws IOErrorException {
this.serverAddr = address;
try {
int newTimeout = timeout <= 0 ? Integer.MAX_VALUE : timeout;
this.timeout = timeout <= 0 ? Integer.MAX_VALUE : timeout;
this.transport = new TSocket(
address.getHost(), address.getPort(), newTimeout, newTimeout);
address.getHost(), address.getPort(), this.timeout, this.timeout);
this.transport.open();
this.protocol = new TCompactProtocol(transport);
client = new GraphService.Client(protocol);
Expand All @@ -41,20 +42,44 @@ public void open(HostAddress address, int timeout) throws IOErrorException {
}
}

/*
* Because the code generated by Fbthrift does not handle the seqID,
* the message will be dislocation when the timeout occurs,
* resulting in unexpected response,
* so when the timeout occurs,
* the connection will be reopened to avoid the impact of the message.
* So when timeout happend need to use reopen
*
* @throws IOErrorException if io problem happen
*/
@Override
public void reopen() throws IOErrorException {
close();
open(serverAddr, timeout);
}

public AuthResult authenticate(String user, String password)
throws AuthFailedException, IOErrorException {
throws AuthFailedException, IOErrorException {
try {
AuthResponse resp = client.authenticate(user.getBytes(), password.getBytes());
if (resp.error_code != ErrorCode.SUCCEEDED) {
throw new AuthFailedException(new String(resp.error_msg).intern());
if (resp.error_msg != null) {
throw new AuthFailedException(new String(resp.error_msg));
} else {
throw new AuthFailedException(
"The error_msg is null, "
+ "maybe the service not set or the response is disorder.");
}
}
return new AuthResult(resp.getSession_id(), resp.getTime_zone_offset_seconds());
} catch (TException e) {
if (e instanceof TTransportException) {
TTransportException te = (TTransportException)e;
if (te.getType() == TTransportException.END_OF_FILE) {
throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN, te.getMessage());
} else if (te.getType() == TTransportException.TIMED_OUT) {
} else if (te.getType() == TTransportException.TIMED_OUT
|| te.getMessage().contains("Read timed out")) {
reopen();
throw new IOErrorException(IOErrorException.E_TIME_OUT, te.getMessage());
} else if (te.getType() == TTransportException.NOT_OPEN) {
throw new IOErrorException(IOErrorException.E_NO_OPEN, te.getMessage());
Expand All @@ -73,18 +98,20 @@ public ExecutionResponse execute(long sessionID, String stmt)
TTransportException te = (TTransportException) e;
if (te.getType() == TTransportException.END_OF_FILE) {
throw new IOErrorException(IOErrorException.E_CONNECT_BROKEN, te.getMessage());
} else if (te.getType() == TTransportException.NOT_OPEN) {
throw new IOErrorException(IOErrorException.E_NO_OPEN, te.getMessage());
} else if (te.getType() == TTransportException.TIMED_OUT
|| te.getMessage().contains("Read timed out")) {
reopen();
throw new IOErrorException(IOErrorException.E_TIME_OUT, te.getMessage());
}
}
throw new IOErrorException(IOErrorException.E_UNKNOWN, e.getMessage());
}
}

public void signout(long sessionId) {
try {
client.signout(sessionId);
} catch (TException e) {
this.close();
}
client.signout(sessionId);
}

@Override
Expand Down
Loading

0 comments on commit 35297b6

Please sign in to comment.