Skip to content

Commit

Permalink
Remove get_instance() logic
Browse files Browse the repository at this point in the history
- the classes Everest, MQTTAbstraction and MQTTAbstractionImpl are no
  longer singletons, which makes it possible to create multiple sessions
  to Everest within one process

Signed-off-by: aw <[email protected]>
  • Loading branch information
a-w50 committed May 17, 2023
1 parent 2f4e8a2 commit 33ceb8b
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 161 deletions.
58 changes: 30 additions & 28 deletions everestjs/everestjs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
namespace EverestJs {

struct EvModCtx {
EvModCtx(Everest::Everest& everest, const Everest::json& module_manifest, const Napi::Env& env) :
everest(everest),
EvModCtx(std::unique_ptr<Everest::Everest> everest_, const Everest::json& module_manifest, const Napi::Env& env) :
everest(std::move(everest_)),
module_manifest(module_manifest),
framework_ready_deferred(Napi::Promise::Deferred::New(env)),
framework_ready_flag{false} {
framework_ready_promise = Napi::Persistent(framework_ready_deferred.Promise());
};
Everest::Everest& everest;
std::unique_ptr<Everest::Everest> everest;
const Everest::json module_manifest;

const Napi::Promise::Deferred framework_ready_deferred;
Expand All @@ -56,7 +56,7 @@ static Napi::Value publish_var(const std::string& impl_id, const std::string& va
const auto& env = info.Env();

try {
ctx->everest.publish_var(impl_id, var_name, convertToJson(info[0]));
ctx->everest->publish_var(impl_id, var_name, convertToJson(info[0]));
} catch (std::exception& e) {
EVLOG_AND_RETHROW(env);
}
Expand Down Expand Up @@ -84,7 +84,7 @@ static Napi::Value setup_cmd_handler(const std::string& impl_id, const std::stri
cmd_handlers.insert({cmd_key, Napi::Persistent(handler)});
// FIXME (aw): in principle we could also pass this reference down to js_cb

ctx->everest.provide_cmd(impl_id, cmd_name, [cmd_key](Everest::json input) -> Everest::json {
ctx->everest->provide_cmd(impl_id, cmd_name, [cmd_key](Everest::json input) -> Everest::json {
Everest::json result;

ctx->js_cb->exec(
Expand Down Expand Up @@ -131,7 +131,7 @@ static Napi::Value set_var_subscription_handler(const Requirement& req, const st
var_subs.insert({sub_key, Napi::Persistent(handler)});

// FIXME (aw): in principle we could also pass this reference down to js_cb
ctx->everest.subscribe_var(req, var_name, [sub_key](Everest::json input) {
ctx->everest->subscribe_var(req, var_name, [sub_key](Everest::json input) {
ctx->js_cb->exec(
[&input, &sub_key](Napi::Env& env) {
const auto& arg = convertToNapiValue(env, input);
Expand All @@ -155,7 +155,7 @@ static Napi::Value signal_ready(const Napi::CallbackInfo& info) {
Napi::Value retval;

try {
ctx->everest.signal_ready();
ctx->everest->signal_ready();
retval = ctx->framework_ready_promise.Value();
} catch (std::exception& e) {
EVLOG_AND_RETHROW(env);
Expand Down Expand Up @@ -189,7 +189,7 @@ static Napi::Value mqtt_publish(const Napi::CallbackInfo& info) {
const auto& topic_alias = info[0].ToString().Utf8Value();
const auto& data = info[1].ToString().Utf8Value();

ctx->everest.external_mqtt_publish(topic_alias, data);
ctx->everest->external_mqtt_publish(topic_alias, data);
} catch (std::exception& e) {
EVLOG_AND_RETHROW(env);
}
Expand All @@ -215,7 +215,7 @@ static Napi::Value mqtt_subscribe(const Napi::CallbackInfo& info) {

ctx->mqtt_subscriptions.insert({topic_alias, Napi::Persistent(handler)});

ctx->everest.provide_external_mqtt_handler(topic_alias, [topic_alias](std::string data) {
ctx->everest->provide_external_mqtt_handler(topic_alias, [topic_alias](std::string data) {
ctx->js_cb->exec(
[&topic_alias, &data](Napi::Env& env) {
// in case we're not ready, the mod argument of the subscribe handler will be undefined, so no
Expand Down Expand Up @@ -247,11 +247,11 @@ static Napi::Value telemetry_publish(const Napi::CallbackInfo& info) {
if (length == 3) {
// assume it's category, subcategory, telemetry
auto telemetry = convertToTelemetryMap(info[2].As<Napi::Object>());
ctx->everest.telemetry_publish(category, subcategory, subcategory, telemetry);
ctx->everest->telemetry_publish(category, subcategory, subcategory, telemetry);
} else if (length == 4) {
// assume it's category, subcategory, type, telemetry
auto telemetry = convertToTelemetryMap(info[3].As<Napi::Object>());
ctx->everest.telemetry_publish(category, subcategory, type, telemetry);
ctx->everest->telemetry_publish(category, subcategory, type, telemetry);
}
}
} catch (std::exception& e) {
Expand All @@ -269,7 +269,7 @@ static Napi::Value call_cmd(const Requirement& req, const std::string& cmd_name,

try {
const auto& argument = convertToJson(info[0]);
const auto& retval = ctx->everest.call_cmd(req, cmd_name, argument);
const auto& retval = ctx->everest->call_cmd(req, cmd_name, argument);

cmd_result = convertToNapiValue(info.Env(), retval["retval"]);
} catch (std::exception& e) {
Expand Down Expand Up @@ -325,20 +325,6 @@ static Napi::Value boot_module(const Napi::CallbackInfo& info) {

Everest::Logging::update_process_name(module_identifier);

// connect to mqtt server and start mqtt mainloop thread
ctx = new EvModCtx(Everest::Everest::get_instance(module_id, *config, validate_schema, rs.mqtt_broker_host,
rs.mqtt_broker_port, rs.mqtt_everest_prefix,
rs.mqtt_external_prefix, rs.telemetry_prefix,
rs.telemetry_enabled),
module_manifest, env);
ctx->everest.connect();

ctx->js_module_ref = Napi::Persistent(module_this);

ctx->everest.spawn_main_loop_thread();

ctx->js_cb = std::make_unique<JsExecCtx>(env, callback_wrapper);

//
// fill in everything we know about the module
//
Expand Down Expand Up @@ -570,11 +556,27 @@ static Napi::Value boot_module(const Napi::CallbackInfo& info) {

// set telemetry_enabled
module_info_prop.DefineProperty(Napi::PropertyDescriptor::Value(
"telemetry_enabled", Napi::Boolean::New(env, ctx->everest.is_telemetry_enabled()), napi_enumerable));
"telemetry_enabled", Napi::Boolean::New(env, rs.telemetry_enabled), napi_enumerable));

module_this.DefineProperty(Napi::PropertyDescriptor::Value("info", module_info_prop, napi_enumerable));

ctx->everest.register_on_ready_handler(framework_ready_handler);
// connect to mqtt server and start mqtt mainloop thread
auto everest_handle = std::make_unique<Everest::Everest>(
module_id, *config, validate_schema, rs.mqtt_broker_host, rs.mqtt_broker_port, rs.mqtt_everest_prefix,
rs.mqtt_external_prefix, rs.telemetry_prefix, rs.telemetry_enabled);

ctx = new EvModCtx(std::move(everest_handle), module_manifest, env);

// FIXME (aw): passing the handle away and then still accessing it is bad design
// all of this should be solved, if we would have a module instance on the js side
ctx->everest->connect();

ctx->js_module_ref = Napi::Persistent(module_this);
ctx->js_cb = std::make_unique<JsExecCtx>(env, callback_wrapper);
ctx->everest->register_on_ready_handler(framework_ready_handler);

ctx->everest->spawn_main_loop_thread();

} catch (std::exception& e) {
EVLOG_AND_RETHROW(env);
}
Expand Down
20 changes: 10 additions & 10 deletions everestpy/everestpy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ int initialize(const std::string& prefix, const std::string& config_file, const
}
Everest::Logging::update_process_name(module_identifier);

Everest::Everest& everest =
Everest::Everest::get_instance(module_id, config, rs.validate_schema, rs.mqtt_broker_host,
rs.mqtt_broker_port, rs.mqtt_everest_prefix, rs.mqtt_external_prefix,
rs.telemetry_prefix, rs.telemetry_enabled);
// FIXME (aw): we do not want to have static here, but the module itself doesn't support any storage up to now
static auto everest =
Everest::Everest(module_id, config, rs.validate_schema, rs.mqtt_broker_host, rs.mqtt_broker_port,
rs.mqtt_everest_prefix, rs.mqtt_external_prefix, rs.telemetry_prefix, rs.telemetry_enabled);

EVLOG_info << fmt::format("Initializing module {}...", module_identifier);

Expand All @@ -156,7 +156,7 @@ int initialize(const std::string& prefix, const std::string& config_file, const
if (impl_intf.contains("vars")) {
for (const auto& var_entry : impl_intf["vars"].items()) {
const auto& var_name = var_entry.key();
reqs.pub_vars[impl_id][var_name] = [&everest, impl_id, var_name](json json_value) {
reqs.pub_vars[impl_id][var_name] = [impl_id, var_name](json json_value) {
everest.publish_var(impl_id, var_name, json_value);
};
}
Expand Down Expand Up @@ -207,14 +207,14 @@ int initialize(const std::string& prefix, const std::string& config_file, const

for (auto var_name : requirement_vars) {
reqs.vars[requirement_id][requirement_module_id][var_name] =
[&everest, requirement_id, i, var_name](std::function<void(json json_value)> callback) {
[requirement_id, i, var_name](std::function<void(json json_value)> callback) {
everest.subscribe_var({requirement_id, i}, var_name, callback);
};
}

for (auto const& cmd_name : requirement_cmds) {
reqs.call_cmds[requirement_id][requirement_module_id][cmd_name] = {
[&everest, requirement_id, i, cmd_name](json parameters) {
[requirement_id, i, cmd_name](json parameters) {
return everest.call_cmd({requirement_id, i}, cmd_name, parameters);
},
requirement_impl_intf.at("cmds").at(cmd_name).at("arguments")};
Expand All @@ -230,15 +230,15 @@ int initialize(const std::string& prefix, const std::string& config_file, const

Everest::ModuleAdapter module_adapter;

module_adapter.call = [&everest](const Requirement& req, const std::string& cmd_name, Parameters args) {
module_adapter.call = [](const Requirement& req, const std::string& cmd_name, Parameters args) {
return everest.call_cmd(req, cmd_name, args);
};

module_adapter.publish = [&everest](const std::string& param1, const std::string& param2, Value param3) {
module_adapter.publish = [](const std::string& param1, const std::string& param2, Value param3) {
return everest.publish_var(param1, param2, param3);
};

module_adapter.subscribe = [&everest](const Requirement& req, const std::string& var_name,
module_adapter.subscribe = [](const Requirement& req, const std::string& var_name,
const ValueCallback& callback) {
return everest.subscribe_var(req, var_name, callback);
};
Expand Down
76 changes: 32 additions & 44 deletions include/framework/everest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,41 +35,17 @@ using TelemetryMap = std::map<std::string, TelemetryEntry>;
/// \brief Contains the EVerest framework that provides convenience functionality for implementing EVerest modules
///
class Everest {

private:
MQTTAbstraction& mqtt_abstraction;
Config config;
std::string module_id;
std::map<std::string, std::set<std::string>> registered_cmds;
bool ready_received;
std::chrono::seconds remote_cmd_res_timeout;
bool validate_data_with_schema;
std::unique_ptr<std::function<void()>> on_ready;
std::thread heartbeat_thread;
std::string module_name;
std::future<void> main_loop_end{};
json module_manifest;
json module_classes;
std::string mqtt_everest_prefix;
std::string mqtt_external_prefix;
std::string telemetry_prefix;
boost::optional<TelemetryConfig> telemetry_config;
bool telemetry_enabled;

public:
Everest(std::string module_id, Config config, bool validate_data_with_schema,
const std::string& mqtt_server_address, int mqtt_server_port, const std::string& mqtt_everest_prefix,
const std::string& mqtt_external_prefix, const std::string& telemetry_prefix, bool telemetry_enabled);

void handle_ready(json data);

void heartbeat();

void publish_metadata();

static std::string check_args(const Arguments& func_args, json manifest_args);
static bool check_arg(ArgumentType arg_types, json manifest_arg);
// forbid copy assignment and copy construction
// NOTE (aw): move assignment and construction are also not supported because we're creating explicit references to
// our instance due to callback registration
Everest(Everest const&) = delete;
void operator=(Everest const&) = delete;

public:
json get_cmd_definition(const std::string& module_id, const std::string& impl_id, const std::string& cmd_name,
bool is_call);
json get_cmd_definition(const std::string& module_id, const std::string& impl_id, const std::string& cmd_name);
Expand Down Expand Up @@ -160,22 +136,34 @@ class Everest {
///
void register_on_ready_handler(const std::function<void()>& handler);

///
/// \returns the instance of the Everest singleton taking a \p module_id, the \p config, a \p mqtt_server_address
/// , \p mqtt_server_port , \p mqtt_everest_prefix and \p mqtt_external_prefix as parameters. If validation of data
/// with the known json schemas is needed this can be activated by setting \p validate_data_with_schema to true
static Everest& get_instance(std::string module_id, Config config, bool validate_data_with_schema,
const std::string& mqtt_server_address, int mqtt_server_port,
const std::string& mqtt_everest_prefix, const std::string& mqtt_external_prefix,
const std::string& telemetry_prefix, bool telemetry_enabled) {
static Everest instance(module_id, config, validate_data_with_schema, mqtt_server_address, mqtt_server_port,
mqtt_everest_prefix, mqtt_external_prefix, telemetry_prefix, telemetry_enabled);
private:
MQTTAbstraction mqtt_abstraction;
Config config;
std::string module_id;
std::map<std::string, std::set<std::string>> registered_cmds;
bool ready_received;
std::chrono::seconds remote_cmd_res_timeout;
bool validate_data_with_schema;
std::unique_ptr<std::function<void()>> on_ready;
std::thread heartbeat_thread;
std::string module_name;
std::future<void> main_loop_end{};
json module_manifest;
json module_classes;
std::string mqtt_everest_prefix;
std::string mqtt_external_prefix;
std::string telemetry_prefix;
boost::optional<TelemetryConfig> telemetry_config;
bool telemetry_enabled;

return instance;
}
void handle_ready(json data);

Everest(Everest const&) = delete;
void operator=(Everest const&) = delete;
void heartbeat();

void publish_metadata();

static std::string check_args(const Arguments& func_args, json manifest_args);
static bool check_arg(ArgumentType arg_types, json manifest_arg);
};
} // namespace Everest

Expand Down
28 changes: 10 additions & 18 deletions include/utils/mqtt_abstraction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,23 @@
namespace Everest {
using json = nlohmann::json;

// forward declaration
class MQTTAbstractionImpl;

///
/// \brief Contains a C++ abstraction for using MQTT in EVerest modules
///
class MQTTAbstraction {

private:
public:
MQTTAbstraction(const std::string& mqtt_server_address, const std::string& mqtt_server_port,
const std::string& mqtt_everest_prefix, const std::string& mqtt_external_prefix);
MQTTAbstractionImpl& mqtt_abstraction;

public:
// forbid copy assignment and copy construction
MQTTAbstraction(MQTTAbstraction const&) = delete;
void operator=(MQTTAbstraction const&) = delete;

~MQTTAbstraction();

///
/// \copydoc MQTTAbstractionImpl::connect()
bool connect();
Expand Down Expand Up @@ -73,20 +77,8 @@ class MQTTAbstraction {
/// \copydoc MQTTAbstractionImpl::unregister_handler(const std::string&, const Token&)
void unregister_handler(const std::string& topic, const Token& token);

///
/// \returns the instance of the MQTTAbstraction singleton taking a \p mqtt_server_address , \p mqtt_server_port ,
/// \p mqtt_everest_prefix and \p mqtt_external_prefix as parameters
static MQTTAbstraction& get_instance(const std::string& mqtt_server_address, const std::string& mqtt_server_port,
const std::string& mqtt_everest_prefix,
const std::string& mqtt_external_prefix) {
static MQTTAbstraction instance(mqtt_server_address, mqtt_server_port, mqtt_everest_prefix,
mqtt_external_prefix);

return instance;
}

MQTTAbstraction(MQTTAbstraction const&) = delete;
void operator=(MQTTAbstraction const&) = delete;
private:
std::unique_ptr<MQTTAbstractionImpl> mqtt_abstraction;
};
} // namespace Everest

Expand Down
Loading

0 comments on commit 33ceb8b

Please sign in to comment.