Skip to content

Commit

Permalink
support wildcard mqtt subscribe (#215)
Browse files Browse the repository at this point in the history
* feat: support wildcard mqtt subscribe

Subscribe supports wildcard topics
There is a second callback option that provides the full topic of the received
message.

Also added const where appropriate
New fucntion to check whether external MQTT is configured

Signed-off-by: James Chapman <[email protected]>

* proposal for unique_ptr and const ref

Signed-off-by: aw <[email protected]>

* bugfix(lifetime): changed from unique_ptr to shared_ptr

- as there might be multiple handlers (running in different), which could consume the same message, it is not clear which handler will release the shared message
- hence, a shared pointer is used

Signed-off-by: aw <[email protected]>

* Bump version to 0.18.1

Signed-off-by: Kai-Uwe Hermann <[email protected]>

---------

Signed-off-by: James Chapman <[email protected]>
Signed-off-by: aw <[email protected]>
Signed-off-by: Kai-Uwe Hermann <[email protected]>
Co-authored-by: aw <[email protected]>
Co-authored-by: Kai-Uwe Hermann <[email protected]>
  • Loading branch information
3 people authored Dec 3, 2024
1 parent bb43ed6 commit 09fb55f
Show file tree
Hide file tree
Showing 11 changed files with 155 additions and 102 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 3.14)

project(everest-framework
VERSION 0.18.0
VERSION 0.18.1
DESCRIPTION "The open operating system for e-mobility charging stations"
LANGUAGES CXX C
)
Expand Down
6 changes: 6 additions & 0 deletions include/framework/ModuleAdapter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ struct ModuleAdapter {
using GetErrorStateMonitorReqFunc = std::function<std::shared_ptr<error::ErrorStateMonitor>(const Requirement&)>;
using ExtMqttPublishFunc = std::function<void(const std::string&, const std::string&)>;
using ExtMqttSubscribeFunc = std::function<UnsubscribeToken(const std::string&, StringHandler)>;
using ExtMqttSubscribePairFunc = std::function<UnsubscribeToken(const std::string&, StringPairHandler)>;
using TelemetryPublishFunc =
std::function<void(const std::string&, const std::string&, const std::string&, const TelemetryMap&)>;
using GetMappingFunc = std::function<std::optional<ModuleTierMappings>()>;
Expand All @@ -106,6 +107,7 @@ struct ModuleAdapter {
GetGlobalErrorStateMonitorFunc get_global_error_state_monitor;
ExtMqttPublishFunc ext_mqtt_publish;
ExtMqttSubscribeFunc ext_mqtt_subscribe;
ExtMqttSubscribePairFunc ext_mqtt_subscribe_pair;
std::vector<cmd> registered_commands;
TelemetryPublishFunc telemetry_publish;
GetMappingFunc get_mapping;
Expand Down Expand Up @@ -158,6 +160,10 @@ class MqttProvider {
return ev.ext_mqtt_subscribe(topic, std::move(handler));
}

UnsubscribeToken subscribe(const std::string& topic, StringPairHandler handler) const {
return ev.ext_mqtt_subscribe_pair(topic, std::move(handler));
}

private:
ModuleAdapter& ev;
};
Expand Down
24 changes: 24 additions & 0 deletions include/framework/everest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#define FRAMEWORK_EVEREST_HPP

#include <chrono>
#include <functional>
#include <future>
#include <map>
#include <set>
Expand Down Expand Up @@ -127,6 +128,11 @@ class Everest {
///
UnsubscribeToken provide_external_mqtt_handler(const std::string& topic, const StringHandler& handler);

///
/// \brief Allows a module to indicate that it provides a external mqtt \p handler at the given \p topic
///
UnsubscribeToken provide_external_mqtt_handler(const std::string& topic, const StringPairHandler& handler);

///
/// \brief publishes the given telemetry \p data on the given \p topic
///
Expand Down Expand Up @@ -243,6 +249,24 @@ class Everest {
/// raised. The given \p clear_callback is called when an error is cleared
///
void subscribe_global_all_errors(const error::ErrorCallback& callback, const error::ErrorCallback& clear_callback);

///
/// \brief Check that external MQTT is configured - raises exception on error
///
void check_external_mqtt();

///
/// \brief Check that external MQTT is configured - raises exception on error
/// \returns the full external MQTT topic
///
std::string check_external_mqtt(const std::string& topic);

///
/// \brief Create external MQTT with an unsubscribe token
/// \returns the unsubscribe token
///
UnsubscribeToken create_external_handler(const std::string& topic, const std::string& external_topic,
const StringPairHandler& handler);
};

///
Expand Down
20 changes: 13 additions & 7 deletions include/utils/message_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,33 @@ using json = nlohmann::json;
struct Message {
std::string topic; ///< The MQTT topic where this message originated from
std::string payload; ///< The message payload
};

Message(const std::string& topic, const std::string& payload);
struct ParsedMessage {
std::string topic;
json data;
};

using MessageCallback = std::function<void(const Message&)>;

/// \brief Simple message queue that takes std::string messages, parsed them and dispatches them to handlers
class MessageQueue {

private:
std::thread worker_thread;
std::queue<std::shared_ptr<Message>> message_queue;
std::queue<std::unique_ptr<Message>> message_queue;
std::mutex queue_ctrl_mutex;
std::function<void(std::shared_ptr<Message> message)> message_callback;
MessageCallback message_callback;
std::condition_variable cv;
bool running;

public:
/// \brief Creates a message queue with the provided \p message_callback
explicit MessageQueue(const std::function<void(std::shared_ptr<Message> message)>& message_callback);
explicit MessageQueue(MessageCallback);
~MessageQueue();

/// \brief Adds a \p message to the message queue which will then be delivered to the message callback
void add(std::shared_ptr<Message> message);
void add(std::unique_ptr<Message>);

/// \brief Stops the message queue
void stop();
Expand All @@ -54,7 +60,7 @@ class MessageHandler {
private:
std::unordered_set<std::shared_ptr<TypedHandler>> handlers;
std::thread handler_thread;
std::queue<std::shared_ptr<json>> message_queue;
std::queue<std::shared_ptr<ParsedMessage>> message_queue;
std::mutex handler_ctrl_mutex;
std::mutex handler_list_mutex;
std::condition_variable cv;
Expand All @@ -68,7 +74,7 @@ class MessageHandler {
~MessageHandler();

/// \brief Adds a \p message to the message queue which will be delivered to the registered handlers
void add(std::shared_ptr<json> message);
void add(std::shared_ptr<ParsedMessage>);

/// \brief Stops the message handler
void stop();
Expand Down
2 changes: 1 addition & 1 deletion include/utils/mqtt_abstraction_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class MQTTAbstractionImpl {
static int open_nb_socket(const char* addr, const char* port);
bool connectBroker(std::string& socket_path);
bool connectBroker(const char* host, const char* port);
void on_mqtt_message(std::shared_ptr<Message> message);
void on_mqtt_message(const Message& message);
void on_mqtt_connect();
static void on_mqtt_disconnect();

Expand Down
3 changes: 2 additions & 1 deletion include/utils/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ using ModuleConfigs = std::map<std::string, ConfigMap>;
using Array = json::array_t;
using Object = json::object_t;
// TODO (aw): can we pass the handler arguments by const ref?
using Handler = std::function<void(json)>;
using Handler = std::function<void(const std::string&, json)>;
using StringHandler = std::function<void(std::string)>;
using StringPairHandler = std::function<void(const std::string& topic, const std::string& data)>;

enum class HandlerType {
Call,
Expand Down
104 changes: 58 additions & 46 deletions lib/everest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Copyright 2020 - 2023 Pionix GmbH and Contributors to EVerest
#include <future>
#include <map>
#include <memory>
#include <set>

#include <boost/any.hpp>
Expand Down Expand Up @@ -38,7 +39,7 @@ Everest::Everest(std::string module_id_, const Config& config_, bool validate_da
const std::string& telemetry_prefix, bool telemetry_enabled) :
mqtt_abstraction(mqtt_server_socket_path, mqtt_server_address, std::to_string(mqtt_server_port),
mqtt_everest_prefix, mqtt_external_prefix),
config(std::move(config_)),
config(config_),
module_id(std::move(module_id_)),
remote_cmd_res_timeout(remote_cmd_res_timeout_seconds),
validate_data_with_schema(validate_data_with_schema),
Expand Down Expand Up @@ -185,7 +186,7 @@ Everest::Everest(std::string module_id_, const Config& config_, bool validate_da
}

// register handler for global ready signal
Handler handle_ready_wrapper = [this](json data) { this->handle_ready(data); };
auto handle_ready_wrapper = [this](const std::string&, json data) { this->handle_ready(data); };
std::shared_ptr<TypedHandler> everest_ready =
std::make_shared<TypedHandler>(HandlerType::ExternalMQTT, std::make_shared<Handler>(handle_ready_wrapper));
this->mqtt_abstraction.register_handler(fmt::format("{}ready", mqtt_everest_prefix), everest_ready, QOS::QOS2);
Expand Down Expand Up @@ -235,7 +236,7 @@ void Everest::publish_metadata() {
if (manifest.contains("provides")) {
metadata["provides"] = json({});

for (auto& provides : manifest.at("provides").items()) {
for (const auto& provides : manifest.at("provides").items()) {
metadata["provides"][provides.key()] = json({});
metadata["provides"][provides.key()]["interface"] = provides.value().at("interface");
}
Expand All @@ -261,7 +262,7 @@ void Everest::check_code() {

json module_manifest =
this->config.get_manifests()[this->config.get_main_config()[this->module_id]["module"].get<std::string>()];
for (auto& element : module_manifest["provides"].items()) {
for (const auto& element : module_manifest["provides"].items()) {
auto const& impl_id = element.key();
auto impl_manifest = element.value();
auto interface_definition = this->config.get_interface_definition(impl_manifest.at("interface"));
Expand Down Expand Up @@ -361,7 +362,7 @@ json Everest::call_cmd(const Requirement& req, const std::string& cmd_name, json
std::promise<json> res_promise;
std::future<json> res_future = res_promise.get_future();

Handler res_handler = [this, &res_promise, call_id, connection, cmd_name, return_type](json data) {
auto res_handler = [this, &res_promise, call_id, connection, cmd_name, return_type](const std::string&, json data) {
auto& data_id = data.at("id");
if (data_id != call_id) {
EVLOG_debug << fmt::format("RES: data_id != call_id ({} != {})", data_id, call_id);
Expand Down Expand Up @@ -477,8 +478,8 @@ void Everest::subscribe_var(const Requirement& req, const std::string& var_name,

auto requirement_manifest_vardef = requirement_impl_manifest["vars"][var_name];

Handler handler = [this, requirement_module_id, requirement_impl_id, requirement_manifest_vardef, var_name,
callback](json const& data) {
auto handler = [this, requirement_module_id, requirement_impl_id, requirement_manifest_vardef, var_name,
callback](const std::string&, json const& data) {
EVLOG_verbose << fmt::format(
"Incoming {}->{}", this->config.printable_identifier(requirement_module_id, requirement_impl_id), var_name);

Expand Down Expand Up @@ -543,16 +544,17 @@ void Everest::subscribe_error(const Requirement& req, const error::ErrorType& er
return;
}

Handler raise_handler = [this, requirement_module_id, requirement_impl_id, error_type, callback](json const& data) {
auto raise_handler = [this, requirement_module_id, requirement_impl_id, error_type, callback](const std::string&,
json const& data) {
EVLOG_debug << fmt::format("Incoming error {}->{}",
this->config.printable_identifier(requirement_module_id, requirement_impl_id),
error_type);

callback(data.get<error::Error>());
};

Handler clear_handler = [this, requirement_module_id, requirement_impl_id, error_type,
clear_callback](json const& data) {
auto clear_handler = [this, requirement_module_id, requirement_impl_id, error_type,
clear_callback](const std::string&, json const& data) {
EVLOG_debug << fmt::format("Error cleared {}->{}",
this->config.printable_identifier(requirement_module_id, requirement_impl_id),
error_type);
Expand Down Expand Up @@ -640,15 +642,15 @@ void Everest::subscribe_global_all_errors(const error::ErrorCallback& callback,
return;
}

Handler raise_handler = [this, callback](json const& data) {
auto raise_handler = [this, callback](const std::string&, json const& data) {
error::Error error = data.get<error::Error>();
EVLOG_debug << fmt::format(
"Incoming error {}->{}",
this->config.printable_identifier(error.origin.module_id, error.origin.implementation_id), error.type);
callback(error);
};

Handler clear_handler = [this, clear_callback](json const& data) {
auto clear_handler = [this, clear_callback](const std::string&, json const& data) {
error::Error error = data.get<error::Error>();
EVLOG_debug << fmt::format(
"Incoming error cleared {}->{}",
Expand Down Expand Up @@ -705,43 +707,32 @@ void Everest::publish_cleared_error(const std::string& impl_id, const error::Err

void Everest::external_mqtt_publish(const std::string& topic, const std::string& data) {
BOOST_LOG_FUNCTION();

// check if external mqtt is enabled
if (!this->module_manifest.contains("enable_external_mqtt") &&
this->module_manifest["enable_external_mqtt"] == false) {
EVLOG_AND_THROW(EverestApiError(fmt::format("Module {} tries to subscribe to an external MQTT topic, but "
"didn't set 'enable_external_mqtt' to 'true' in its manifest",
this->config.printable_identifier(this->module_id))));
}

check_external_mqtt();
this->mqtt_abstraction.publish(fmt::format("{}{}", this->mqtt_external_prefix, topic), data);
}

UnsubscribeToken Everest::provide_external_mqtt_handler(const std::string& topic, const StringHandler& handler) {
BOOST_LOG_FUNCTION();
auto external_topic = check_external_mqtt(topic);
return create_external_handler(
topic, external_topic, [handler, external_topic](const std::string&, json const& data) {
EVLOG_verbose << fmt::format("Incoming external mqtt data for topic '{}'...", external_topic);
if (!data.is_string()) {
EVLOG_AND_THROW(
EverestInternalError("External mqtt result is not a string (that should never happen)"));
}
handler(data.get<std::string>());
});
}

// check if external mqtt is enabled
if (!this->module_manifest.contains("enable_external_mqtt") &&
this->module_manifest["enable_external_mqtt"] == false) {
EVLOG_AND_THROW(EverestApiError(fmt::format("Module {} tries to provide an external MQTT handler, but didn't "
"set 'enable_external_mqtt' to 'true' in its manifest",
this->config.printable_identifier(this->module_id))));
}

std::string external_topic = fmt::format("{}{}", this->mqtt_external_prefix, topic);

Handler external_handler = [handler, external_topic](json const& data) {
EVLOG_verbose << fmt::format("Incoming external mqtt data for topic '{}'...", external_topic);
if (!data.is_string()) {
EVLOG_AND_THROW(EverestInternalError("External mqtt result is not a string (that should never happen)"));
}
handler(data.get<std::string>());
};

std::shared_ptr<TypedHandler> token =
std::make_shared<TypedHandler>(HandlerType::ExternalMQTT, std::make_shared<Handler>(external_handler));
this->mqtt_abstraction.register_handler(external_topic, token, QOS::QOS0);
return [this, topic, token]() { this->mqtt_abstraction.unregister_handler(topic, token); };
UnsubscribeToken Everest::provide_external_mqtt_handler(const std::string& topic, const StringPairHandler& handler) {
BOOST_LOG_FUNCTION();
auto external_topic = check_external_mqtt(topic);
return create_external_handler(topic, external_topic, [handler](const std::string& topic, const json& data) {
EVLOG_verbose << fmt::format("Incoming external mqtt data for topic '{}'...", topic);
const std::string data_s = (data.is_string()) ? std::string(data) : data.dump();
handler(topic, data_s);
});
}

void Everest::telemetry_publish(const std::string& topic, const std::string& data) {
Expand Down Expand Up @@ -838,7 +829,7 @@ void Everest::provide_cmd(const std::string impl_id, const std::string cmd_name,
const auto cmd_topic = fmt::format("{}/cmd", this->config.mqtt_prefix(this->module_id, impl_id));

// define command wrapper
Handler wrapper = [this, cmd_topic, impl_id, cmd_name, handler, cmd_definition](json data) {
auto wrapper = [this, cmd_topic, impl_id, cmd_name, handler, cmd_definition](const std::string&, json data) {
BOOST_LOG_FUNCTION();

std::set<std::string> arg_names;
Expand Down Expand Up @@ -931,7 +922,7 @@ void Everest::provide_cmd(const cmd& cmd) {
json cmd_definition = get_cmd_definition(this->module_id, impl_id, cmd_name, false);

std::set<std::string> arg_names;
for (auto& arg_type : arg_types) {
for (const auto& arg_type : arg_types) {
arg_names.insert(arg_type.first);
}

Expand Down Expand Up @@ -1043,7 +1034,7 @@ std::string Everest::check_args(const Arguments& func_args, json manifest_args)
}
}

return std::string();
return {};
}

bool Everest::check_arg(ArgumentType arg_types, json manifest_arg) {
Expand Down Expand Up @@ -1081,6 +1072,27 @@ bool Everest::check_arg(ArgumentType arg_types, json manifest_arg) {
return true;
}

void Everest::check_external_mqtt() {
// check if external mqtt is enabled
if (!module_manifest.contains("enable_external_mqtt") && !module_manifest["enable_external_mqtt"]) {
EVLOG_AND_THROW(EverestApiError(fmt::format("Module {} tries to provide an external MQTT handler, but didn't "
"set 'enable_external_mqtt' to 'true' in its manifest",
config.printable_identifier(module_id))));
}
}

std::string Everest::check_external_mqtt(const std::string& topic) {
check_external_mqtt();
return fmt::format("{}{}", mqtt_external_prefix, topic);
}

UnsubscribeToken Everest::create_external_handler(const std::string& topic, const std::string& external_topic,
const StringPairHandler& handler) {
auto token = std::make_shared<TypedHandler>(HandlerType::ExternalMQTT, std::make_shared<Handler>(handler));
mqtt_abstraction.register_handler(external_topic, token, QOS::QOS0);
return [this, topic, token]() { this->mqtt_abstraction.unregister_handler(topic, token); };
}

std::optional<Mapping> get_impl_mapping(std::optional<ModuleTierMappings> module_tier_mappings,
const std::string& impl_id) {
if (not module_tier_mappings.has_value()) {
Expand Down
Loading

0 comments on commit 09fb55f

Please sign in to comment.