Skip to content

Commit

Permalink
Added status notification support via named pipe
Browse files Browse the repository at this point in the history
- for better control, especially when testing, the manager now supports
  the option "status-fifo", which shall be a path to a named pipe
  (created via mkfifo)
- this file needs to be opened for reading (in order to listen on status
  updates of the manager), when the manager gets started, otherwise it
  will terminate assuming that the caller did something unintended
- on status updates like "all modules started" or similar, the manager
  will print simple strings on this pipe
- the manager does not read from this pipe
- if the caller stops reading from the pipe, the manager continues
  running but stops writing to the pipe when it becomes aware of that
  there is no reader anymore on the pipe

Signed-off-by: aw <[email protected]>
  • Loading branch information
a-w50 committed May 17, 2023
1 parent e85ffac commit 2f4e8a2
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 4 deletions.
36 changes: 36 additions & 0 deletions include/utils/status_fifo.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright 2020 - 2023 Pionix GmbH and Contributors to EVerest
#ifndef STATUS_FIFO_HPP
#define STATUS_FIFO_HPP

#include <string>

namespace Everest {

class StatusFifo {
public:
// defined messages
static constexpr auto ALL_MODULES_STARTED = "ALL_MODULES_STARTED\n";

static StatusFifo create_from_path(const std::string&);
void update(const std::string&);

StatusFifo(StatusFifo const&) = delete;
StatusFifo& operator=(StatusFifo const&) = delete;
// NOTE (aw): the move constructor could be implementented, but we don't need it for now
StatusFifo(StatusFifo&&) = delete;
StatusFifo& operator=(StatusFifo&&) = delete;
~StatusFifo();

private:
StatusFifo() = default;
explicit StatusFifo(int fd_) : fd(fd_), disabled(false), opened(true){};

int fd{-1};
bool disabled{true};
bool opened{false};
};

} // namespace Everest

#endif // STATUS_FIFO_HPP
1 change: 1 addition & 0 deletions lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ target_sources(framework
conversions.cpp
thread.cpp
serial.cpp
status_fifo.cpp
date.cpp
runtime.cpp
yaml_loader.cpp
Expand Down
57 changes: 57 additions & 0 deletions lib/status_fifo.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright 2020 - 2023 Pionix GmbH and Contributors to EVerest
#include <utils/status_fifo.hpp>

#include <stdexcept>

#include <errno.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

namespace Everest {

StatusFifo StatusFifo::create_from_path(const std::string& fifo_path) {
if (fifo_path.length() == 0) {
return StatusFifo();
}

// try to open the file
auto fd = open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK);

// note: for fifo files, opening for write only in non-blocking mode will fail with ENXIO if the other end hasn't
// opened the file yet
if (fd == -1) {
if (errno == ENXIO) {
auto msg = std::string("Failed to open status fifo at ") + fifo_path + " (fifo not opened for read?)";
throw std::runtime_error(msg);
} else {
auto msg =
std::string("Failed to open status fifo at ") + fifo_path + " (fifo file not created with mkfifo?)";
throw std::runtime_error(msg);
}
}

return StatusFifo(fd);
}

void StatusFifo::update(const std::string& message) {
if (disabled) {
return;
}

const auto ret = write(fd, message.c_str(), message.length());
if (ret == -1) {
// NOTE (aw): if we fail to write, we might assume, that the reader of the fifo is not interested in us anymore
// so we won't send any further messages
disabled = true;
}
}

StatusFifo::~StatusFifo() {
if (opened) {
close(fd);
}
}
} // namespace Everest
18 changes: 14 additions & 4 deletions src/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <framework/runtime.hpp>
#include <utils/config.hpp>
#include <utils/mqtt_abstraction.hpp>
#include <utils/status_fifo.hpp>

#include "controller/ipc.hpp"

Expand Down Expand Up @@ -309,7 +310,7 @@ std::mutex modules_ready_mutex;
static std::map<pid_t, std::string> start_modules(Config& config, MQTTAbstraction& mqtt_abstraction,
const std::vector<std::string>& ignored_modules,
const std::vector<std::string>& standalone_modules,
const RuntimeSettings& rs) {
const RuntimeSettings& rs, StatusFifo& status_fifo) {

std::vector<ModuleStartInfo> modules_to_spawn;

Expand All @@ -328,7 +329,8 @@ static std::map<pid_t, std::string> start_modules(Config& config, MQTTAbstractio
auto module_it = modules_ready.emplace(module_name, ModuleReadyInfo{false, nullptr}).first;

Handler module_ready_handler = [module_name, &mqtt_abstraction, standalone_modules,
mqtt_everest_prefix = rs.mqtt_everest_prefix](nlohmann::json json) {
mqtt_everest_prefix = rs.mqtt_everest_prefix,
&status_fifo](nlohmann::json json) {
EVLOG_debug << fmt::format("received module ready signal for module: {}({})", module_name, json.dump());
std::unique_lock<std::mutex> lock(modules_ready_mutex);
// FIXME (aw): here are race conditions, if the ready handler gets called while modules are shut down!
Expand All @@ -350,6 +352,7 @@ static std::map<pid_t, std::string> start_modules(Config& config, MQTTAbstractio
[](const auto& element) { return element.second.ready; })) {
EVLOG_info << fmt::format(TERMINAL_STYLE_OK,
">>> All modules are initialized. EVerest up and running <<<");
status_fifo.update(StatusFifo::ALL_MODULES_STARTED);
mqtt_abstraction.publish(fmt::format("{}ready", mqtt_everest_prefix), nlohmann::json(true));
} else if (!standalone_modules.empty()) {
if (modules_spawned == modules_ready.size() - standalone_modules.size()) {
Expand Down Expand Up @@ -586,6 +589,9 @@ int boot(const po::variables_map& vm) {
ignored_modules = vm["ignore"].as<std::vector<std::string>>();
}

// create StatusFifo object
auto status_fifo = StatusFifo::create_from_path(vm["status-fifo"].as<std::string>());

MQTTAbstraction& mqtt_abstraction = MQTTAbstraction::get_instance(
rs.mqtt_broker_host, std::to_string(rs.mqtt_broker_port), rs.mqtt_everest_prefix, rs.mqtt_external_prefix);
if (!mqtt_abstraction.connect()) {
Expand All @@ -596,7 +602,8 @@ int boot(const po::variables_map& vm) {
mqtt_abstraction.spawn_main_loop_thread();

auto controller_handle = start_controller(rs);
auto module_handles = start_modules(*config, mqtt_abstraction, ignored_modules, standalone_modules, rs);
auto module_handles =
start_modules(*config, mqtt_abstraction, ignored_modules, standalone_modules, rs, status_fifo);
bool modules_started = true;
bool restart_modules = false;

Expand Down Expand Up @@ -640,7 +647,8 @@ int boot(const po::variables_map& vm) {
}

if (module_handles.size() == 0 && restart_modules) {
module_handles = start_modules(*config, mqtt_abstraction, ignored_modules, standalone_modules, rs);
module_handles =
start_modules(*config, mqtt_abstraction, ignored_modules, standalone_modules, rs, status_fifo);
restart_modules = false;
modules_started = true;
}
Expand Down Expand Up @@ -704,6 +712,8 @@ int main(int argc, char* argv[]) {
desc.add_options()("config", po::value<std::string>(),
"Full path to a config file. If the file does not exist and has no extension, it will be "
"looked up in the default config directory");
desc.add_options()("status-fifo", po::value<std::string>()->default_value(""),
"Path to a named pipe, that shall be used for status updates from the manager");

po::variables_map vm;

Expand Down

0 comments on commit 2f4e8a2

Please sign in to comment.