diff --git a/components/data-bridge/org.wso2.carbon.databridge.agent/src/main/java/org/wso2/carbon/databridge/agent/endpoint/DataEndpoint.java b/components/data-bridge/org.wso2.carbon.databridge.agent/src/main/java/org/wso2/carbon/databridge/agent/endpoint/DataEndpoint.java index 8f9c316e36..7f592ebe19 100644 --- a/components/data-bridge/org.wso2.carbon.databridge.agent/src/main/java/org/wso2/carbon/databridge/agent/endpoint/DataEndpoint.java +++ b/components/data-bridge/org.wso2.carbon.databridge.agent/src/main/java/org/wso2/carbon/databridge/agent/endpoint/DataEndpoint.java @@ -73,6 +73,8 @@ public abstract class DataEndpoint { public static HashMap delayMap=new HashMap(); + public boolean invalidateTransportPool = false; + public long getReConnectTimestamp() { return reConnectTimestamp; } @@ -349,6 +351,15 @@ public void setPoolSemaphore(Semaphore semaphore) { private void publish() throws DataEndpointException, SessionTimeoutException, UndefinedEventTypeException { Object client = getClient(); + if (invalidateTransportPool) { + log.debug( + "invalidateTransportPool' is 'true'. Going to discard existing client and get new client " + + "for the DataEndpoint"); + discardClient(client); + client = getClient(); + invalidateTransportPool = false; + log.debug("'invalidateTransportPool' is set to 'false' for the DataEndpoint"); + } try { send(client, this.events); semaphoreRelease(); diff --git a/components/data-bridge/org.wso2.carbon.databridge.agent/src/main/java/org/wso2/carbon/databridge/agent/endpoint/binary/BinaryDataEndpoint.java b/components/data-bridge/org.wso2.carbon.databridge.agent/src/main/java/org/wso2/carbon/databridge/agent/endpoint/binary/BinaryDataEndpoint.java index 431fedc1d4..8f6d392340 100644 --- a/components/data-bridge/org.wso2.carbon.databridge.agent/src/main/java/org/wso2/carbon/databridge/agent/endpoint/binary/BinaryDataEndpoint.java +++ b/components/data-bridge/org.wso2.carbon.databridge.agent/src/main/java/org/wso2/carbon/databridge/agent/endpoint/binary/BinaryDataEndpoint.java @@ -17,6 +17,8 @@ */ package org.wso2.carbon.databridge.agent.endpoint.binary; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.wso2.carbon.databridge.agent.endpoint.DataEndpoint; import org.wso2.carbon.databridge.agent.exception.DataEndpointAuthenticationException; import org.wso2.carbon.databridge.agent.exception.DataEndpointException; @@ -35,6 +37,8 @@ */ public class BinaryDataEndpoint extends DataEndpoint { + private static Log log = LogFactory.getLog(BinaryDataEndpoint.class); + @Override protected String login(Object client, String userName, String password) throws DataEndpointAuthenticationException, DataEndpointLoginException { @@ -84,6 +88,8 @@ protected void send(Object client, List events) throws DataEndpointExcept } else if (e instanceof SessionTimeoutException) { throw new SessionTimeoutException("Binary Session Expired Exception ", e); } else { + log.debug("Setting 'invalidateTransportPool' to 'true' for binary data transport"); + this.invalidateTransportPool = true; throw new DataEndpointException("Error while trying to publish events to data receiver :" + socket.getRemoteSocketAddress().toString(), e); }