Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement connection tracking and basic exception handler. #35

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 62 additions & 4 deletions include/liboculus/DataRx.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#pragma once

#include <chrono>
#include <memory>
#include <string>
#include <thread>
Expand All @@ -44,21 +45,58 @@ namespace liboculus {

using std::shared_ptr;

template <typename T>
class MutexedVariable {
public:
MutexedVariable(const T &initial_value) : var_(initial_value), mutex_() { ; }

T get() const {
std::lock_guard<std::mutex> guard(mutex_);
return var_;
}

T set(const T &value) {
std::lock_guard<std::mutex> guard(mutex_);
var_ = value;
return var_;
}

private:
// Mutable to allow locking mutex w/o breaking const correctness
mutable std::mutex mutex_;
T var_;
};

class DataRx : public OculusMessageHandler {
public:
explicit DataRx(const IoServiceThread::IoContextPtr &iosrv);
~DataRx();

bool isConnected() const { return _socket.is_open(); }
// The socket interface can't actually track if a socket is _good_
// just that it's _open_.
// So we track this state outselves.
bool isConnected() const { return is_connected_.get(); }

void connect(const boost::asio::ip::address &addr);
void connect(const std::string &strAddr);

void disconnect();

typedef std::function<void()> OnConnectCallback;
void setOnConnectCallback(OnConnectCallback callback) {
_onConnectCallback = callback;
}

typedef std::function<void()> OnDisconnectCallback;
void setOnDisconnectCallback(OnDisconnectCallback callback) {
_onDisconnectCallback = callback;
}

typedef std::function<void()> OnTimeoutCallback;
void setOnTimeoutCallback(OnTimeoutCallback callback) {
_onTimeoutCallback = callback;
}

// By default, this function sends the config to the sonar
// as an OculusSimpleFireMessage2.
//
Expand All @@ -74,6 +112,7 @@ class DataRx : public OculusMessageHandler {

private:
void onConnect(const boost::system::error_code &error);
void onTimeout(const boost::system::error_code &error);

// Initiates a network read.
// Note this function reads until the **total number of bytes
Expand Down Expand Up @@ -105,7 +144,21 @@ class DataRx : public OculusMessageHandler {
shared_ptr<ByteVector> _buffer;

OnConnectCallback _onConnectCallback;
OnDisconnectCallback _onDisconnectCallback;
OnTimeoutCallback _onTimeoutCallback;

int timeout_secs_;
boost::asio::deadline_timer timeout_timer_;

MutexedVariable<bool> is_connected_;

// There are many cases where a sonar disappearing (e.g. failing)
// can't be distinguished from a sonar simply not being present.
//
// This flag is essentially "send each error message once"
// to reduce driver verbosity. It is reset on good communications
// with the sonar.
bool has_complained_;
}; // class DataRx

template <typename FireMsg_t = OculusSimpleFireMessage2>
Expand All @@ -122,9 +175,14 @@ void DataRx::sendSimpleFireMessage(const SonarConfiguration &config) {
data = config.serialize<FireMsg_t>();

if (data.size() > 0) {
auto result = _socket.send(boost::asio::buffer(data));
LOG(DEBUG) << "Sent " << result << " bytes to sonar";
haveWritten(data);
try {
auto result = _socket.send(boost::asio::buffer(data));
LOG(DEBUG) << "Sent " << result << " bytes to sonar";
haveWritten(data);
} catch (boost::system::system_error &ex) {
LOG(WARNING) << "Exception when sending: " << ex.what();
disconnect();
}
}
}

Expand Down
86 changes: 66 additions & 20 deletions lib/DataRx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ DataRx::DataRx(const IoServiceThread::IoContextPtr& iosrv)
: OculusMessageHandler(),
_socket(*iosrv),
_buffer(std::make_shared<ByteVector>()),
_onConnectCallback() {}
_onConnectCallback(),
is_connected_(false),
timeout_secs_(2),
timeout_timer_(*iosrv) {}

DataRx::~DataRx() {}

Expand All @@ -52,7 +55,8 @@ void DataRx::connect(const asio::ip::address& addr) {
uint16_t port = liboculus::DataPort;

boost::asio::ip::tcp::endpoint sonarEndpoint(addr, port);
LOG(INFO) << "Connecting to sonar at " << sonarEndpoint;
LOG(DEBUG) << "Attempting to connect to sonar at " << sonarEndpoint;
is_connected_.set(true);

_socket.async_connect(sonarEndpoint,
boost::bind(&DataRx::onConnect, this, _1));
Expand All @@ -67,17 +71,37 @@ void DataRx::connect(const std::string& strAddr) {

void DataRx::onConnect(const boost::system::error_code& ec) {
if (ec) {
LOG(WARNING) << "Error on connect: " << ec.message();
_socket.close();

LOG(DEBUG) << "Error on connect: " << ec.message();
disconnect();
return;
} else if (!isConnected()) {
// A separate thread could have failed independently..
return;
}

LOG(DEBUG) << "Connected to sonar!";
LOG(INFO) << "Successful connection to sonar!";
restartReceiveCycle();
if (_onConnectCallback) _onConnectCallback();
}

void DataRx::disconnect() {
LOG(DEBUG) << " ... disconnecting";
_socket.close();
is_connected_.set(false);
if (_onDisconnectCallback) _onDisconnectCallback();
}

void DataRx::onTimeout(const boost::system::error_code& ec) {
if (ec == boost::asio::error::operation_aborted) {
return;
} else if (ec) {
LOG(WARNING) << "Error on timeout " << ec.message();
}
LOG(DEBUG) << "!! No data from sonar in " << timeout_secs_
<< " seconds, timeout";
if (_onTimeoutCallback) _onTimeoutCallback();
}

//=== Readers
void DataRx::readUpTo(size_t bytes, StateMachineCallback callback) {
const size_t current_sz = _buffer->size();
Expand All @@ -101,6 +125,13 @@ void DataRx::restartReceiveCycle() {
} else {
_buffer->clear();
}

// Reset timeout timer; this should cancel existing pending timeouts
const auto timeout = boost::posix_time::seconds(timeout_secs_);
timeout_timer_.expires_from_now(timeout);
timeout_timer_.async_wait(
boost::bind(&DataRx::onTimeout, this, boost::placeholders::_1));

readUpTo(sizeof(uint8_t),
boost::bind(&DataRx::rxFirstByteOculusId, this, _1, _2));
}
Expand All @@ -109,29 +140,36 @@ void DataRx::restartReceiveCycle() {

void DataRx::rxFirstByteOculusId(const boost::system::error_code& ec,
std::size_t bytes_transferred) {
if (ec) {
LOG(WARNING) << "Error on receive of header: " << ec.message();
goto exit;
if (ec.value() == boost::asio::error::basic_errors::operation_aborted) {
LOG(DEBUG) << "Receive cancelled, giving up...";
return;
} else if (ec) {
// Failure of this first read usually indicates a network failure
LOG(WARNING) << "Error on receive of header: " << ec.value() << " "
<< ec.message();
disconnect();
return;
}

if (bytes_transferred != sizeof(uint8_t)) {
goto exit;
restartReceiveCycle();
return;
}

if (_buffer->data()[0] == liboculus::PacketHeaderLSB) {
readUpTo(sizeof(uint16_t),
boost::bind(&DataRx::rxSecondByteOculusId, this, _1, _2));
return;
}

exit:
restartReceiveCycle();
}

void DataRx::rxSecondByteOculusId(const boost::system::error_code& ec,
std::size_t bytes_transferred) {
if (ec) {
LOG(WARNING) << "Error on receive of header: " << ec.message();
if (ec.value() == boost::asio::error::basic_errors::operation_aborted) {
LOG(DEBUG) << "Receive ancelled, giving up...";
return;
} else if (ec) {
LOG(WARNING) << "Error on receive of header: " << ec.value() << " "
<< ec.message();
goto exit;
}

Expand All @@ -153,8 +191,12 @@ void DataRx::rxSecondByteOculusId(const boost::system::error_code& ec,

void DataRx::rxHeader(const boost::system::error_code& ec,
std::size_t bytes_transferred) {
if (ec) {
LOG(WARNING) << "Error on receive of header: " << ec.message();
if (ec.value() == boost::asio::error::basic_errors::operation_aborted) {
LOG(DEBUG) << "Receive cancelled, giving up...";
return;
} else if (ec) {
LOG(WARNING) << "Error on receive of header: " << ec.value() << " "
<< ec.message();
return;
}

Expand Down Expand Up @@ -185,8 +227,12 @@ void DataRx::rxPacket(const boost::system::error_code& ec,
std::size_t bytes_transferred) {
MessageHeader hdr(_buffer);

if (ec) {
LOG(WARNING) << "Error on receive of packet data: " << ec.message();
if (ec.value() == boost::asio::error::basic_errors::operation_aborted) {
LOG(DEBUG) << "Receive cancelled, giving up...";
return;
} else if (ec) {
LOG(WARNING) << "Error on receive of packet data: " << ec.value() << " "
<< ec.message();
goto exit;
}

Expand Down
10 changes: 9 additions & 1 deletion lib/IoServiceThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "liboculus/IoServiceThread.h"

#include <boost/bind.hpp>
#include <iostream>

namespace liboculus {

Expand Down Expand Up @@ -67,6 +68,13 @@ void IoServiceThread::join() {
_thread.reset();
}

void IoServiceThread::threadExec() { _context->run(); }
void IoServiceThread::threadExec() {
try {
_context->run();
} catch (std::exception& ex) {
std::cerr << "!! Unhandled ASIO exception in IoServiceThread: " << ex.what()
<< std::endl;
}
}

} // namespace liboculus