From 1fb7242ead25a89542dab655d7c5d64e4db6817e Mon Sep 17 00:00:00 2001 From: Aaron Marburg Date: Wed, 31 Jan 2024 10:11:58 -0800 Subject: [PATCH 1/4] Implement connection tracking and basic exception handler. --- include/liboculus/DataRx.h | 6 +++++- lib/DataRx.cpp | 5 ++++- lib/IoServiceThread.cpp | 10 +++++++++- 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/include/liboculus/DataRx.h b/include/liboculus/DataRx.h index 3e9738b..1e6af94 100644 --- a/include/liboculus/DataRx.h +++ b/include/liboculus/DataRx.h @@ -49,7 +49,10 @@ class DataRx : public OculusMessageHandler { explicit DataRx(const IoServiceThread::IoContextPtr &iosrv); ~DataRx(); - bool isConnected() const { return _socket.is_open(); } + // The socket interface can't actually track if a socket is _good_ + // just that it's _open_. + // So we track this state outselves. + bool isConnected() const { return _is_connected; } void connect(const boost::asio::ip::address &addr); void connect(const std::string &strAddr); @@ -105,6 +108,7 @@ class DataRx : public OculusMessageHandler { shared_ptr _buffer; OnConnectCallback _onConnectCallback; + bool _is_connected; }; // class DataRx diff --git a/lib/DataRx.cpp b/lib/DataRx.cpp index 494b45f..eb31e84 100644 --- a/lib/DataRx.cpp +++ b/lib/DataRx.cpp @@ -42,7 +42,8 @@ DataRx::DataRx(const IoServiceThread::IoContextPtr& iosrv) : OculusMessageHandler(), _socket(*iosrv), _buffer(std::make_shared()), - _onConnectCallback() {} + _onConnectCallback(), + _is_connected(false) {} DataRx::~DataRx() {} @@ -68,12 +69,14 @@ void DataRx::connect(const std::string& strAddr) { void DataRx::onConnect(const boost::system::error_code& ec) { if (ec) { LOG(WARNING) << "Error on connect: " << ec.message(); + _is_connected = false; _socket.close(); return; } LOG(DEBUG) << "Connected to sonar!"; + _is_connected = true; restartReceiveCycle(); if (_onConnectCallback) _onConnectCallback(); } diff --git a/lib/IoServiceThread.cpp b/lib/IoServiceThread.cpp index 9c9ffdf..9196a8a 100644 --- a/lib/IoServiceThread.cpp +++ b/lib/IoServiceThread.cpp @@ -31,6 +31,7 @@ #include "liboculus/IoServiceThread.h" #include +#include namespace liboculus { @@ -67,6 +68,13 @@ void IoServiceThread::join() { _thread.reset(); } -void IoServiceThread::threadExec() { _context->run(); } +void IoServiceThread::threadExec() { + try { + _context->run(); + } catch (std::exception& ex) { + std::cerr << "!! Unhandled ASIO exception in IoServiceThread: " << ex.what() + << std::endl; + } +} } // namespace liboculus From 92b8e9b2032feef208f5f23aae4491b5ee882041 Mon Sep 17 00:00:00 2001 From: Aaron Marburg Date: Wed, 31 Jan 2024 10:40:00 -0800 Subject: [PATCH 2/4] Added explicit check on send --- include/liboculus/DataRx.h | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/include/liboculus/DataRx.h b/include/liboculus/DataRx.h index 1e6af94..da16002 100644 --- a/include/liboculus/DataRx.h +++ b/include/liboculus/DataRx.h @@ -126,9 +126,14 @@ void DataRx::sendSimpleFireMessage(const SonarConfiguration &config) { data = config.serialize(); if (data.size() > 0) { - auto result = _socket.send(boost::asio::buffer(data)); - LOG(DEBUG) << "Sent " << result << " bytes to sonar"; - haveWritten(data); + try { + auto result = _socket.send(boost::asio::buffer(data)); + LOG(DEBUG) << "Sent " << result << " bytes to sonar"; + haveWritten(data); + } catch (boost::system::system_error &ex) { + LOG(WARNING) << "Exception when sending: " << ex.what(); + _is_connected = false; + } } } From 6973a7fb88d2dec133c5e2c9a2882d56b862e556 Mon Sep 17 00:00:00 2001 From: Aaron Marburg Date: Fri, 9 Feb 2024 11:08:39 -0800 Subject: [PATCH 3/4] Added onDisconnect and Timeout callbacks --- include/liboculus/DataRx.h | 21 ++++++++++++++-- lib/DataRx.cpp | 49 +++++++++++++++++++++++++++----------- 2 files changed, 54 insertions(+), 16 deletions(-) diff --git a/include/liboculus/DataRx.h b/include/liboculus/DataRx.h index da16002..b092cfa 100644 --- a/include/liboculus/DataRx.h +++ b/include/liboculus/DataRx.h @@ -30,6 +30,7 @@ #pragma once +#include #include #include #include @@ -57,11 +58,23 @@ class DataRx : public OculusMessageHandler { void connect(const boost::asio::ip::address &addr); void connect(const std::string &strAddr); + void disconnect(); + typedef std::function OnConnectCallback; void setOnConnectCallback(OnConnectCallback callback) { _onConnectCallback = callback; } + typedef std::function OnDisconnectCallback; + void setOnDisconnectCallback(OnDisconnectCallback callback) { + _onDisconnectCallback = callback; + } + + typedef std::function OnTimeoutCallback; + void setOnTimeoutCallback(OnTimeoutCallback callback) { + _onTimeoutCallback = callback; + } + // By default, this function sends the config to the sonar // as an OculusSimpleFireMessage2. // @@ -77,6 +90,7 @@ class DataRx : public OculusMessageHandler { private: void onConnect(const boost::system::error_code &error); + void onTimeout(const boost::system::error_code &error); // Initiates a network read. // Note this function reads until the **total number of bytes @@ -108,8 +122,11 @@ class DataRx : public OculusMessageHandler { shared_ptr _buffer; OnConnectCallback _onConnectCallback; - bool _is_connected; + OnDisconnectCallback _onDisconnectCallback; + OnTimeoutCallback _onTimeoutCallback; + boost::asio::deadline_timer timeout_timer_; + bool _is_connected; }; // class DataRx template @@ -132,7 +149,7 @@ void DataRx::sendSimpleFireMessage(const SonarConfiguration &config) { haveWritten(data); } catch (boost::system::system_error &ex) { LOG(WARNING) << "Exception when sending: " << ex.what(); - _is_connected = false; + disconnect(); } } } diff --git a/lib/DataRx.cpp b/lib/DataRx.cpp index eb31e84..43b220e 100644 --- a/lib/DataRx.cpp +++ b/lib/DataRx.cpp @@ -43,7 +43,8 @@ DataRx::DataRx(const IoServiceThread::IoContextPtr& iosrv) _socket(*iosrv), _buffer(std::make_shared()), _onConnectCallback(), - _is_connected(false) {} + _is_connected(false), + timeout_timer_(*iosrv, boost::posix_time::seconds(2)) {} DataRx::~DataRx() {} @@ -53,7 +54,8 @@ void DataRx::connect(const asio::ip::address& addr) { uint16_t port = liboculus::DataPort; boost::asio::ip::tcp::endpoint sonarEndpoint(addr, port); - LOG(INFO) << "Connecting to sonar at " << sonarEndpoint; + LOG(DEBUG) << "Connecting to sonar at " << sonarEndpoint; + _is_connected = true; _socket.async_connect(sonarEndpoint, boost::bind(&DataRx::onConnect, this, _1)); @@ -68,19 +70,33 @@ void DataRx::connect(const std::string& strAddr) { void DataRx::onConnect(const boost::system::error_code& ec) { if (ec) { - LOG(WARNING) << "Error on connect: " << ec.message(); - _is_connected = false; - _socket.close(); - + LOG(DEBUG) << "Error on connect: " << ec.message(); + disconnect(); return; } - LOG(DEBUG) << "Connected to sonar!"; - _is_connected = true; + LOG(WARNING) << "Successful connection to sonar!"; restartReceiveCycle(); if (_onConnectCallback) _onConnectCallback(); } +void DataRx::disconnect() { + LOG(DEBUG) << " ... disconnecting"; + _socket.close(); + _is_connected = false; + if (_onDisconnectCallback) _onDisconnectCallback(); +} + +void DataRx::onTimeout(const boost::system::error_code& ec) { + if (ec == boost::asio::error::operation_aborted) { + return; + } else if (ec) { + LOG(WARNING) << "Error on timeout " << ec.message(); + } + LOG(DEBUG) << "!! timeout"; + if (_onTimeoutCallback) _onTimeoutCallback(); +} + //=== Readers void DataRx::readUpTo(size_t bytes, StateMachineCallback callback) { const size_t current_sz = _buffer->size(); @@ -104,6 +120,12 @@ void DataRx::restartReceiveCycle() { } else { _buffer->clear(); } + + // Reset timeout timer; this should cancel existing pending timeouts + timeout_timer_.expires_from_now(boost::posix_time::seconds(2)); + timeout_timer_.async_wait( + boost::bind(&DataRx::onTimeout, this, boost::placeholders::_1)); + readUpTo(sizeof(uint8_t), boost::bind(&DataRx::rxFirstByteOculusId, this, _1, _2)); } @@ -113,22 +135,21 @@ void DataRx::restartReceiveCycle() { void DataRx::rxFirstByteOculusId(const boost::system::error_code& ec, std::size_t bytes_transferred) { if (ec) { + // Failure of this first read usually indicates a network failure LOG(WARNING) << "Error on receive of header: " << ec.message(); - goto exit; + disconnect(); + return; } if (bytes_transferred != sizeof(uint8_t)) { - goto exit; + restartReceiveCycle(); + return; } if (_buffer->data()[0] == liboculus::PacketHeaderLSB) { readUpTo(sizeof(uint16_t), boost::bind(&DataRx::rxSecondByteOculusId, this, _1, _2)); - return; } - -exit: - restartReceiveCycle(); } void DataRx::rxSecondByteOculusId(const boost::system::error_code& ec, From c0756f01c3496aed11b75232ae0362840d021aa3 Mon Sep 17 00:00:00 2001 From: Aaron Marburg Date: Sun, 11 Feb 2024 10:49:04 -0800 Subject: [PATCH 4/4] Add code to catch timeouts --- include/liboculus/DataRx.h | 36 +++++++++++++++++++++++-- lib/DataRx.cpp | 54 +++++++++++++++++++++++++++----------- 2 files changed, 72 insertions(+), 18 deletions(-) diff --git a/include/liboculus/DataRx.h b/include/liboculus/DataRx.h index b092cfa..3b66f98 100644 --- a/include/liboculus/DataRx.h +++ b/include/liboculus/DataRx.h @@ -45,6 +45,28 @@ namespace liboculus { using std::shared_ptr; +template +class MutexedVariable { + public: + MutexedVariable(const T &initial_value) : var_(initial_value), mutex_() { ; } + + T get() const { + std::lock_guard guard(mutex_); + return var_; + } + + T set(const T &value) { + std::lock_guard guard(mutex_); + var_ = value; + return var_; + } + + private: + // Mutable to allow locking mutex w/o breaking const correctness + mutable std::mutex mutex_; + T var_; +}; + class DataRx : public OculusMessageHandler { public: explicit DataRx(const IoServiceThread::IoContextPtr &iosrv); @@ -53,7 +75,7 @@ class DataRx : public OculusMessageHandler { // The socket interface can't actually track if a socket is _good_ // just that it's _open_. // So we track this state outselves. - bool isConnected() const { return _is_connected; } + bool isConnected() const { return is_connected_.get(); } void connect(const boost::asio::ip::address &addr); void connect(const std::string &strAddr); @@ -124,9 +146,19 @@ class DataRx : public OculusMessageHandler { OnConnectCallback _onConnectCallback; OnDisconnectCallback _onDisconnectCallback; OnTimeoutCallback _onTimeoutCallback; + + int timeout_secs_; boost::asio::deadline_timer timeout_timer_; - bool _is_connected; + MutexedVariable is_connected_; + + // There are many cases where a sonar disappearing (e.g. failing) + // can't be distinguished from a sonar simply not being present. + // + // This flag is essentially "send each error message once" + // to reduce driver verbosity. It is reset on good communications + // with the sonar. + bool has_complained_; }; // class DataRx template diff --git a/lib/DataRx.cpp b/lib/DataRx.cpp index 43b220e..3bfcf2f 100644 --- a/lib/DataRx.cpp +++ b/lib/DataRx.cpp @@ -43,8 +43,9 @@ DataRx::DataRx(const IoServiceThread::IoContextPtr& iosrv) _socket(*iosrv), _buffer(std::make_shared()), _onConnectCallback(), - _is_connected(false), - timeout_timer_(*iosrv, boost::posix_time::seconds(2)) {} + is_connected_(false), + timeout_secs_(2), + timeout_timer_(*iosrv) {} DataRx::~DataRx() {} @@ -54,8 +55,8 @@ void DataRx::connect(const asio::ip::address& addr) { uint16_t port = liboculus::DataPort; boost::asio::ip::tcp::endpoint sonarEndpoint(addr, port); - LOG(DEBUG) << "Connecting to sonar at " << sonarEndpoint; - _is_connected = true; + LOG(DEBUG) << "Attempting to connect to sonar at " << sonarEndpoint; + is_connected_.set(true); _socket.async_connect(sonarEndpoint, boost::bind(&DataRx::onConnect, this, _1)); @@ -73,9 +74,12 @@ void DataRx::onConnect(const boost::system::error_code& ec) { LOG(DEBUG) << "Error on connect: " << ec.message(); disconnect(); return; + } else if (!isConnected()) { + // A separate thread could have failed independently.. + return; } - LOG(WARNING) << "Successful connection to sonar!"; + LOG(INFO) << "Successful connection to sonar!"; restartReceiveCycle(); if (_onConnectCallback) _onConnectCallback(); } @@ -83,7 +87,7 @@ void DataRx::onConnect(const boost::system::error_code& ec) { void DataRx::disconnect() { LOG(DEBUG) << " ... disconnecting"; _socket.close(); - _is_connected = false; + is_connected_.set(false); if (_onDisconnectCallback) _onDisconnectCallback(); } @@ -93,7 +97,8 @@ void DataRx::onTimeout(const boost::system::error_code& ec) { } else if (ec) { LOG(WARNING) << "Error on timeout " << ec.message(); } - LOG(DEBUG) << "!! timeout"; + LOG(DEBUG) << "!! No data from sonar in " << timeout_secs_ + << " seconds, timeout"; if (_onTimeoutCallback) _onTimeoutCallback(); } @@ -122,7 +127,8 @@ void DataRx::restartReceiveCycle() { } // Reset timeout timer; this should cancel existing pending timeouts - timeout_timer_.expires_from_now(boost::posix_time::seconds(2)); + const auto timeout = boost::posix_time::seconds(timeout_secs_); + timeout_timer_.expires_from_now(timeout); timeout_timer_.async_wait( boost::bind(&DataRx::onTimeout, this, boost::placeholders::_1)); @@ -134,9 +140,13 @@ void DataRx::restartReceiveCycle() { void DataRx::rxFirstByteOculusId(const boost::system::error_code& ec, std::size_t bytes_transferred) { - if (ec) { + if (ec.value() == boost::asio::error::basic_errors::operation_aborted) { + LOG(DEBUG) << "Receive cancelled, giving up..."; + return; + } else if (ec) { // Failure of this first read usually indicates a network failure - LOG(WARNING) << "Error on receive of header: " << ec.message(); + LOG(WARNING) << "Error on receive of header: " << ec.value() << " " + << ec.message(); disconnect(); return; } @@ -154,8 +164,12 @@ void DataRx::rxFirstByteOculusId(const boost::system::error_code& ec, void DataRx::rxSecondByteOculusId(const boost::system::error_code& ec, std::size_t bytes_transferred) { - if (ec) { - LOG(WARNING) << "Error on receive of header: " << ec.message(); + if (ec.value() == boost::asio::error::basic_errors::operation_aborted) { + LOG(DEBUG) << "Receive ancelled, giving up..."; + return; + } else if (ec) { + LOG(WARNING) << "Error on receive of header: " << ec.value() << " " + << ec.message(); goto exit; } @@ -177,8 +191,12 @@ void DataRx::rxSecondByteOculusId(const boost::system::error_code& ec, void DataRx::rxHeader(const boost::system::error_code& ec, std::size_t bytes_transferred) { - if (ec) { - LOG(WARNING) << "Error on receive of header: " << ec.message(); + if (ec.value() == boost::asio::error::basic_errors::operation_aborted) { + LOG(DEBUG) << "Receive cancelled, giving up..."; + return; + } else if (ec) { + LOG(WARNING) << "Error on receive of header: " << ec.value() << " " + << ec.message(); return; } @@ -209,8 +227,12 @@ void DataRx::rxPacket(const boost::system::error_code& ec, std::size_t bytes_transferred) { MessageHeader hdr(_buffer); - if (ec) { - LOG(WARNING) << "Error on receive of packet data: " << ec.message(); + if (ec.value() == boost::asio::error::basic_errors::operation_aborted) { + LOG(DEBUG) << "Receive cancelled, giving up..."; + return; + } else if (ec) { + LOG(WARNING) << "Error on receive of packet data: " << ec.value() << " " + << ec.message(); goto exit; }