diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 4591abc..c02f273 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -15,7 +15,7 @@ jobs: os: - ubuntu-24.04 - ubuntu-22.04 - - ubuntu-20.04 + #- ubuntu-20.04 -> disabled during introduction of KDMqtt - windows-2022 - macos-13 - macos-14 @@ -26,6 +26,9 @@ jobs: - ON - OFF + env: + MOSQUITTO_VERSION: 2.0.20 + steps: - name: Checkout sources uses: actions/checkout@v4 @@ -43,7 +46,20 @@ jobs: sudo apt update -qq sudo apt install -y libxkbcommon-dev libxcb-xkb-dev \ libxkbcommon-x11-dev wayland-scanner++ wayland-protocols \ - libwayland-dev xvfb ninja-build + libwayland-dev xvfb ninja-build \ + libmosquitto-dev + + - name: Download mosquitto (MacOS) + if: runner.os == 'macOS' + run: | + brew install mosquitto + + - name: Download mosquitto (Windows) + if: runner.os == 'Windows' + run: | + curl --no-progress-meter --location --remote-name ` + https://mosquitto.org/files/binary/win64/mosquitto-$env:MOSQUITTO_VERSION-install-windows-x64.exe + & .\mosquitto-$env:MOSQUITTO_VERSION-install-windows-x64.exe /S - name: Configure project run: > diff --git a/.github/workflows/linters.yml b/.github/workflows/linters.yml index 49735ad..f9b9857 100644 --- a/.github/workflows/linters.yml +++ b/.github/workflows/linters.yml @@ -42,7 +42,7 @@ jobs: sudo apt update -qq sudo apt install -y libxkbcommon-dev libxcb-xkb-dev \ libxkbcommon-x11-dev wayland-scanner++ wayland-protocols \ - libwayland-dev xvfb ninja-build cppcheck + libwayland-dev xvfb ninja-build cppcheck libmosquitto-dev - name: Configure project run: > diff --git a/CMakeLists.txt b/CMakeLists.txt index 91a1991..425de70 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,7 +23,12 @@ if(ANDROID) else() option(KDUTILS_BUILD_TESTS "Build the tests" ON) option(BUILD_SHARED_LIBS "Build shared libraries" ON) +endif() +if(LINUX) + option(KDUTILS_BUILD_MQTT_SUPPORT "Build KDMqtt" ON) +else() + option(KDUTILS_BUILD_MQTT_SUPPORT "Build KDMqtt" OFF) endif() list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/cmake") @@ -61,6 +66,10 @@ add_subdirectory(src/KDUtils) add_subdirectory(src/KDFoundation) add_subdirectory(src/KDGui) +if(KDUTILS_BUILD_MQTT_SUPPORT) + add_subdirectory(src/KDMqtt) +endif() + if(KDUTILS_BUILD_EXAMPLES) add_subdirectory(examples) endif() diff --git a/README.md b/README.md index b5b72d9..d40b6a7 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,21 @@ You get: - Keyboard and mouse events - GuiApplication +## KDMqtt + +KDMqtt provides a cross-platform MQTT client solution, based on [mosquitto](https://mosquitto.org/). + +You get a thin wrapper around libmosquitto that + +* seamlessly integrates into KDFoundation's `CoreAppliation` +* comes with an API featuring properties and signals using [KDBindings](https://github.com/KDAB/KDBindings) + +For now, we don't support building mosquitto from source so you need to have it installed on your machine. + +You can find setup instructions and download links at: https://mosquitto.org/download/ + +You may also want to have a look at the [CI configuration](https://github.com/KDAB/KDUtils/blob/main/.github/workflows/build.yml) of this project to see which dependencies we use to build KDMqtt. + ## KDUtils/KDUtils This namespace is the namesake of the entire library, and contains the STL @@ -52,7 +67,7 @@ You get: On Linux: ```bash -sudo apt install libxkbcommon-dev libxcb-xkb-dev libxkbcommon-x11-dev wayland-scanner++ wayland-protocols libwayland-dev +sudo apt install libxkbcommon-dev libxcb-xkb-dev libxkbcommon-x11-dev wayland-scanner++ wayland-protocols libwayland-dev libmosquitto-dev ``` For debug builds with code coverage: diff --git a/cmake/FindMosquitto.cmake b/cmake/FindMosquitto.cmake new file mode 100644 index 0000000..e71dec0 --- /dev/null +++ b/cmake/FindMosquitto.cmake @@ -0,0 +1,76 @@ +# This file is part of KDUtils. +# +# SPDX-FileCopyrightText: 2024 Klarälvdalens Datakonsult AB, a KDAB Group company +# Author: Marco Thaller +# +# SPDX-License-Identifier: MIT +# +# Contact KDAB at for commercial licensing options. +# + +find_file( + MOSQUITTO_HEADER + NAMES mosquitto.h + PATHS /usr/include + /usr/local/include + /usr/local/opt/mosquitto/include + $ENV{PROGRAMFILES}/mosquitto/devel + $ENV{PROGRAMFILES\(X86\)}/mosquitto/devel +) + +if(APPLE OR UNIX) + find_library( + MOSQUITTO_LIBRARY + NAMES mosquitto + PATHS /usr/lib /usr/local/lib /usr/local/opt/mosquitto/lib + ) + + if(MOSQUITTO_HEADER AND MOSQUITTO_LIBRARY) + set(Mosquitto_FOUND TRUE) + endif() +endif() + +if(WIN32) + find_file( + MOSQUITTO_DLL + NAMES mosquitto.dll + PATHS $ENV{PROGRAMFILES}/mosquitto $ENV{PROGRAMFILES\(X86\)}/mosquitto + ) + + find_library( + MOSQUITTO_LIBRARY + NAMES mosquitto + PATHS $ENV{PROGRAMFILES}/mosquitto/devel $ENV{PROGRAMFILES\(X86\)}/mosquitto/devel + ) + + file(GLOB MOSQUITTO_RUNTIME_DLLS "$ENV{PROGRAMFILES}/mosquitto/*.dll" "$ENV{PROGRAMFILES\(X86\)}/mosquitto/*.dll") + + if(MOSQUITTO_HEADER + AND MOSQUITTO_DLL + AND MOSQUITTO_LIBRARY + ) + set(Mosquitto_FOUND TRUE) + endif() +endif() + +if(Mosquitto_FOUND) + cmake_path(GET MOSQUITTO_HEADER PARENT_PATH MOSQUITTO_INCLUDE_DIRECTORY) + + add_library(Mosquitto::Mosquitto SHARED IMPORTED) + + if(APPLE OR UNIX) + set_target_properties( + Mosquitto::Mosquitto PROPERTIES IMPORTED_LOCATION "${MOSQUITTO_LIBRARY}" INTERFACE_INCLUDE_DIRECTORIES + "${MOSQUITTO_INCLUDE_DIRECTORY}" + ) + endif() + + if(WIN32) + set_target_properties( + Mosquitto::Mosquitto + PROPERTIES IMPORTED_IMPLIB "${MOSQUITTO_LIBRARY}" + IMPORTED_LOCATION "${MOSQUITTO_DLL}" + INTERFACE_INCLUDE_DIRECTORIES "${MOSQUITTO_INCLUDE_DIRECTORY}" + ) + endif() +endif() diff --git a/cmake/dependencies.cmake b/cmake/dependencies.cmake index f42d8e5..6356815 100644 --- a/cmake/dependencies.cmake +++ b/cmake/dependencies.cmake @@ -101,3 +101,8 @@ if(NOT TARGET mio::mio) ) fetchcontent_makeavailable(mio) endif() + +# mosquitto library +if(KDUTILS_BUILD_MQTT_SUPPORT) + find_package(Mosquitto REQUIRED) +endif() diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 2fde198..c32b1f0 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -8,3 +8,7 @@ # add_subdirectory(gui_window) + +if(KDUTILS_BUILD_MQTT_SUPPORT) + add_subdirectory(mqtt_client) +endif() diff --git a/examples/mqtt_client/CMakeLists.txt b/examples/mqtt_client/CMakeLists.txt new file mode 100644 index 0000000..d50b483 --- /dev/null +++ b/examples/mqtt_client/CMakeLists.txt @@ -0,0 +1,34 @@ +# This file is part of KDUtils. +# +# SPDX-FileCopyrightText: 2024 Klarälvdalens Datakonsult AB, a KDAB Group company +# Author: Marco Thaller +# +# SPDX-License-Identifier: MIT +# +# Contact KDAB at for commercial licensing options. +# + +project(mqtt_client_example LANGUAGES CXX) + +add_executable( + ${PROJECT_NAME} + mqtt_client.cpp +) + +target_link_libraries( + ${PROJECT_NAME} KDMqtt +) + +if(WIN32) + # Deployment: On Windows, copy all DLLs from the mosquitto install directory next to the application binary so that they're found. + if(Mosquitto_FOUND AND MOSQUITTO_RUNTIME_DLLS) + foreach(MOSQUITTO_RUNTIME_DLL ${MOSQUITTO_RUNTIME_DLLS}) + add_custom_command( + TARGET ${PROJECT_NAME} + POST_BUILD + COMMAND ${CMAKE_COMMAND} -E copy_if_different "${MOSQUITTO_RUNTIME_DLL}" + $ + ) + endforeach() + endif() +endif() diff --git a/examples/mqtt_client/mqtt_client.cpp b/examples/mqtt_client/mqtt_client.cpp new file mode 100644 index 0000000..63a2c21 --- /dev/null +++ b/examples/mqtt_client/mqtt_client.cpp @@ -0,0 +1,54 @@ +/* + This file is part of KDUtils. + + SPDX-FileCopyrightText: 2024 Klarälvdalens Datakonsult AB, a KDAB Group company + Author: Marco Thaller + + SPDX-License-Identifier: MIT + + Contact KDAB at for commercial licensing options. +*/ +#include +#include + +using namespace KDFoundation; +using namespace KDMqtt; + +int main() +{ + const Url url("test.mosquitto.org"); + const std::string topic = "mytopic"; + const std::string payload = "Hello World!"; + + CoreApplication app; + + MqttLib::instance().init(); + MqttClient mqttClient("KDMqttClient", MqttClient::Option::CLEAN_SESSION | MqttClient::Option::VERBOSE); + + auto onMqttConnectionStateChanged = [&](const MqttClient::ConnectionState &connectionState) { + if (connectionState == MqttClient::ConnectionState::CONNECTED) { + mqttClient.subscribe(topic.c_str()); + } + }; + std::ignore = mqttClient.connectionState.valueChanged().connect(onMqttConnectionStateChanged); + + auto onMqttSubscriptionStateChanged = [&](const MqttClient::SubscriptionState &subscriptionState) { + if (subscriptionState == MqttClient::SubscriptionState::SUBSCRIBED) { + mqttClient.publish(nullptr, topic.c_str(), payload.length(), payload.c_str()); + } + }; + std::ignore = mqttClient.subscriptionState.valueChanged().connect(onMqttSubscriptionStateChanged); + + auto onMqttMessageReceived = [&](const MqttClient::Message message) { + const auto timestamp = std::time(nullptr); + const auto timestring = std::string(std::asctime(std::localtime(×tamp))); + const auto topic = message.topic; + const auto payload = std::string(message.payload.toStdString()); + spdlog::info("Received MQTT message. Topic: {}. Payload: {}", topic, payload); + }; + std::ignore = mqttClient.msgReceived.connect(onMqttMessageReceived); + + mqttClient.connect(url); + + app.exec(); +} diff --git a/src/KDMqtt/CMakeLists.txt b/src/KDMqtt/CMakeLists.txt new file mode 100644 index 0000000..17f8d06 --- /dev/null +++ b/src/KDMqtt/CMakeLists.txt @@ -0,0 +1,85 @@ +# This file is part of KDUtils. +# +# SPDX-FileCopyrightText: 2024 Klarälvdalens Datakonsult AB, a KDAB Group company +# Author: Marco Thaller +# +# SPDX-License-Identifier: MIT +# +# Contact KDAB at for commercial licensing options. +# + +set(SOURCES mqtt.cpp) + +set(HEADERS mosquitto_wrapper.h mqtt.h) + +add_library( + KDMqtt + ${SOURCES} ${HEADERS} +) + +add_library( + KDUtils::KDMqtt ALIAS KDMqtt +) + +target_link_libraries( + KDMqtt + PUBLIC KDUtils::KDFoundation + PUBLIC Mosquitto::Mosquitto +) + +target_include_directories( + KDMqtt + PRIVATE $ +) + +# +# Logging configuration +# +# Compile out some of the SPDLOG macros based on build type +target_compile_definitions( + KDMqtt PRIVATE SPDLOG_ACTIVE_LEVEL=$,SPDLOG_LEVEL_TRACE,SPDLOG_LEVEL_WARN> +) +target_link_libraries( + KDMqtt + PRIVATE spdlog::spdlog +) + +generate_export_header(KDMqtt BASE_NAME kdmqtt) +configure_file(${CMAKE_CURRENT_BINARY_DIR}/kdmqtt_export.h ${CMAKE_BINARY_DIR}/include/KDMqtt/kdmqtt_export.h) +install( + FILES ${CMAKE_CURRENT_BINARY_DIR}/kdmqtt_export.h + DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/KDMqtt +) + +foreach(file ${HEADERS}) + get_filename_component(dir ${file} DIRECTORY) + install(FILES ${file} DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/KDMqtt/${dir}) +endforeach() + +set(project_config_in "${CMAKE_CURRENT_LIST_DIR}/cmake/KDMqttConfig.cmake.in") +set(project_config_out "${CMAKE_CURRENT_BINARY_DIR}/KDMqttConfig.cmake") + +install( + TARGETS KDMqtt + EXPORT KDMqtt + LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} + ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} + RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} +) + +install( + EXPORT KDMqtt + DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/KDMqtt + NAMESPACE KDUtils:: + FILE KDMqttConfigTargets.cmake +) +include(CMakePackageConfigHelpers) +configure_file("${project_config_in}" "${project_config_out}" @ONLY) +install( + FILES "${project_config_out}" + DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/KDMqtt +) +install( + FILES ${HEADERS} + DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/KDMqtt +) diff --git a/src/KDMqtt/cmake/KDMqttConfig.cmake.in b/src/KDMqtt/cmake/KDMqttConfig.cmake.in new file mode 100644 index 0000000..046531a --- /dev/null +++ b/src/KDMqtt/cmake/KDMqttConfig.cmake.in @@ -0,0 +1,15 @@ +# This file is part of KDUtils. +# +# SPDX-FileCopyrightText: 2024 Klarälvdalens Datakonsult AB, a KDAB Group company +# Author: Marco Thaller +# +# SPDX-License-Identifier: MIT +# +# Contact KDAB at for commercial licensing options. +# + +include(CMakeFindDependencyMacro) +find_dependency(KDFoundation) +find_dependency(mosquitto) + +include("${CMAKE_CURRENT_LIST_DIR}/KDMqttConfigTargets.cmake") diff --git a/src/KDMqtt/kdmqtt_global.h b/src/KDMqtt/kdmqtt_global.h new file mode 100644 index 0000000..c31f947 --- /dev/null +++ b/src/KDMqtt/kdmqtt_global.h @@ -0,0 +1,16 @@ +/* + This file is part of KDUtils. + + SPDX-FileCopyrightText: 2024 Klarälvdalens Datakonsult AB, a KDAB Group company + Author: Marco Thaller + + SPDX-License-Identifier: MIT + + Contact KDAB at for commercial licensing options. +*/ + +#pragma once + +#include + +#define KDMQTT_API KDMQTT_EXPORT diff --git a/src/KDMqtt/mosquitto_wrapper.h b/src/KDMqtt/mosquitto_wrapper.h new file mode 100644 index 0000000..beaebb5 --- /dev/null +++ b/src/KDMqtt/mosquitto_wrapper.h @@ -0,0 +1,247 @@ +/* + This file is part of KDUtils. + + SPDX-FileCopyrightText: 2024 Klarälvdalens Datakonsult AB, a KDAB Group company + Author: Marco Thaller + + SPDX-License-Identifier: MIT + + Contact KDAB at for commercial licensing options. + + + This file implements C++ wrappers around the mosquitto C library. + + The classes implemented in this file differ from mosquittopp, + a wrapper class maintained in the Eclipse mosquitto repository, + in respect to the following aspects: + - library and client functions are separated into individual classes + - all functions are virtual for the purpose of convenient mocking + - some functions wrapped by mosquittopp are not wrapped here [1] + - some functions wrapped here are not wrapped by mosquittopp [1] + - mosquitto event callbacks are exposed as KDBindings::Signals + + [1] as of January 2024 + */ + +#pragma once + +#include +#include +#include +#include +#include + +namespace KDMqtt { + +/* + * This is a C++ class wrapping library specific functions + * of the mosquitto C library. + */ +class MosquittoLib +{ +private: + MosquittoLib() = default; + ~MosquittoLib() = default; + + MosquittoLib(const MosquittoLib &) = delete; + MosquittoLib &operator=(const MosquittoLib &) = delete; + +public: + static MosquittoLib &instance() + { + static MosquittoLib instance; + return instance; + } + + virtual int init() + { + return mosquitto_lib_init(); + } + + virtual int cleanup() + { + return mosquitto_lib_cleanup(); + } + + virtual int version(int *major, int *minor, int *revision) const + { + return mosquitto_lib_version(major, minor, revision); + } + + [[nodiscard]] virtual std::string_view connackString(int connack_code) const + { + return mosquitto_connack_string(connack_code); + } + + [[nodiscard]] virtual std::string_view errorString(int mosq_errno) const + { + return mosquitto_strerror(mosq_errno); + } + + [[nodiscard]] virtual std::string_view reasonString(int reason_code) const + { + return mosquitto_reason_string(reason_code); + } + + [[nodiscard]] virtual bool isValidTopicNameForSubscription(const std::string &topic) const + { + const auto result = mosquitto_sub_topic_check(topic.c_str()); + return (result == MOSQ_ERR_SUCCESS); + } +}; + +/* + * This is a C++ class wrapping client specific functions + * of the mosquitto C library. + */ +class MosquittoClient +{ +public: + MosquittoClient(const std::string &clientId, bool clean_session = true) + { + m_clientInstance = mosquitto_new(clientId.c_str(), clean_session, this); + + mosquitto_connect_callback_set(m_clientInstance, onConnect); + mosquitto_disconnect_callback_set(m_clientInstance, onDisconnect); + mosquitto_publish_callback_set(m_clientInstance, onPublish); + mosquitto_message_callback_set(m_clientInstance, onMessage); + mosquitto_subscribe_callback_set(m_clientInstance, onSubscribe); + mosquitto_unsubscribe_callback_set(m_clientInstance, onUnsubscribe); + mosquitto_log_callback_set(m_clientInstance, onLog); + } + + ~MosquittoClient() + { + mosquitto_destroy(m_clientInstance); + } + + virtual int connect(const std::string &host, int port, int keepalive) + { + // TODO_MT -> async or not? + return mosquitto_connect_async(m_clientInstance, host.c_str(), port, keepalive); + // return mosquitto_connect(m_clientInstance, host.c_str(), port, keepalive); + } + + virtual int disconnect() + { + return mosquitto_disconnect(m_clientInstance); + } + + virtual int publish(int *msg_id, const std::string &topic, int payloadlen = 0, const void *payload = nullptr, int qos = 0, bool retain = false) + { + return mosquitto_publish(m_clientInstance, msg_id, topic.c_str(), payloadlen, payload, qos, retain); + } + + virtual int subscribe(int *msg_id, const std::string &sub, int qos = 0) + { + return mosquitto_subscribe(m_clientInstance, msg_id, sub.c_str(), qos); + } + + virtual int unsubscribe(int *msg_id, const std::string &sub) + { + return mosquitto_unsubscribe(m_clientInstance, msg_id, sub.c_str()); + } + + virtual int loopMisc() + { + return mosquitto_loop_misc(m_clientInstance); + } + + virtual int loopRead(int max_packets = 1) + { + return mosquitto_loop_read(m_clientInstance, max_packets); + } + + virtual int loopWrite(int max_packets = 1) + { + return mosquitto_loop_write(m_clientInstance, max_packets); + } + + virtual int socket() + { + return mosquitto_socket(m_clientInstance); + } + + virtual bool wantWrite() + { + return mosquitto_want_write(m_clientInstance); + } + + virtual void *sslGet() + { + return mosquitto_ssl_get(m_clientInstance); + } + + virtual int tlsSet(std::optional cafile, std::optional capath, std::optional certfile, std::optional keyfile, int (*pw_callback)(char *buf, int size, int rwflag, void *userdata) = nullptr) + { + return mosquitto_tls_set(m_clientInstance, cafile ? cafile->c_str() : nullptr, capath ? capath->c_str() : nullptr, certfile ? certfile->c_str() : nullptr, keyfile ? keyfile->c_str() : nullptr, pw_callback); + } + + // NOTE: on Windows, OpenSSL used by mosquitto doesn't use the system store by default + virtual int tlsEnableUseOsCertificates() + { + return mosquitto_int_option(m_clientInstance, MOSQ_OPT_TLS_USE_OS_CERTS, 1); + } + + virtual int tlsDisableUseOsCertificates() + { + return mosquitto_int_option(m_clientInstance, MOSQ_OPT_TLS_USE_OS_CERTS, 0); + } + + virtual int usernamePasswordSet(const std::string &username, const std::string &password) + { + return mosquitto_username_pw_set(m_clientInstance, username.c_str(), password.c_str()); + } + + virtual int willSet(const std::string &topic, int payloadlen = 0, const void *payload = nullptr, int qos = 0, bool retain = false) + { + return mosquitto_will_set(m_clientInstance, topic.c_str(), payloadlen, payload, qos, retain); + } + + KDBindings::Signal connected; + KDBindings::Signal disconnected; + KDBindings::Signal published; + KDBindings::Signal message; + KDBindings::Signal subscribed; + KDBindings::Signal unsubscribed; + KDBindings::Signal log; + KDBindings::Signal<> error; + +private: + static void onConnect([[maybe_unused]] struct mosquitto *client, void *self, int connackCode) + { + static_cast(self)->connected.emit(connackCode); + } + static void onDisconnect([[maybe_unused]] struct mosquitto *client, void *self, int reasonCode) + { + static_cast(self)->disconnected.emit(reasonCode); + } + static void onPublish([[maybe_unused]] struct mosquitto *client, void *self, int msgId) + { + static_cast(self)->published.emit(msgId); + } + static void onMessage([[maybe_unused]] struct mosquitto *client, void *self, const struct mosquitto_message *message) + { + static_cast(self)->message.emit(message); + } + static void onSubscribe([[maybe_unused]] struct mosquitto *client, void *self, int msg_id, int qos_count, const int *granted_qos) + { + static_cast(self)->subscribed.emit(msg_id, qos_count, granted_qos); + } + static void onUnsubscribe([[maybe_unused]] struct mosquitto *client, void *self, int msg_id) + { + static_cast(self)->unsubscribed.emit(msg_id); + } + static void onLog([[maybe_unused]] struct mosquitto *client, void *self, int level, const char *str) + { + static_cast(self)->log.emit(level, str); + } + static void onError([[maybe_unused]] struct mosquitto *client, void *self) + { + static_cast(self)->error.emit(); + } + + struct mosquitto *m_clientInstance; +}; + +} // namespace KDMqtt diff --git a/src/KDMqtt/mqtt.cpp b/src/KDMqtt/mqtt.cpp new file mode 100644 index 0000000..78cbb76 --- /dev/null +++ b/src/KDMqtt/mqtt.cpp @@ -0,0 +1,548 @@ +/* + This file is part of KDUtils. + + SPDX-FileCopyrightText: 2024 Klarälvdalens Datakonsult AB, a KDAB Group company + Author: Marco Thaller + + SPDX-License-Identifier: MIT + + Contact KDAB at for commercial licensing options. +*/ +#include "mqtt.h" +#include +#include + +namespace KDMqtt { + +constexpr std::chrono::duration c_miscTaskInterval = std::chrono::seconds(1); + +using namespace KDFoundation; + +MqttLib::MqttLib() + : m_isInitialized{ false } + , m_mosquittoLib(&MosquittoLib::instance()) +{ +} + +MqttLib::~MqttLib() +{ + cleanup(); +} + +MqttLib &MqttLib::instance() +{ + static MqttLib s_instance; + return s_instance; +} + +int MqttLib::init() +{ + int result = MOSQ_ERR_UNKNOWN; + + if (!m_isInitialized) { + result = m_mosquittoLib->init(); + const auto hasError = checkMosquittoResultAndDoDebugPrints(result, "MqttLib::init()"); + m_isInitialized = !hasError; + if (m_isInitialized) { + int major, minor, revision = 0; + version(&major, &minor, &revision); + spdlog::info("MqttLib::init() - using libmosquitto v{}.{}.{}", major, minor, revision); + } + } else { + spdlog::warn("MqttLib::init() - Library is already initialized."); + } + return result; +} + +int MqttLib::cleanup() +{ + const auto result = m_mosquittoLib->cleanup(); + const auto hasError = checkMosquittoResultAndDoDebugPrints(result, "MqttLib::cleanup()"); + m_isInitialized = hasError ? m_isInitialized : false; + return result; +} + +bool MqttLib::isInitialized() const +{ + return m_isInitialized; +} + +bool MqttLib::isValidTopicNameForSubscription(const std::string &topic) +{ + return m_mosquittoLib->isValidTopicNameForSubscription(topic); +} + +int MqttLib::version(int *major, int *minor, int *revision) +{ + return m_mosquittoLib->version(major, minor, revision); +} + +bool MqttLib::checkMosquittoResultAndDoDebugPrints(int result, std::string_view func) +{ + const auto isError = (result != MOSQ_ERR_SUCCESS); + if (isError) { + const auto funcString = func.empty() ? "mosquitto function" : func; + spdlog::error("{} - error: {}", funcString, errorString(result)); + } + return isError; +} + +std::string_view MqttLib::connackString(int connackCode) +{ + return m_mosquittoLib->connackString(connackCode); +} + +std::string_view MqttLib::errorString(int errorCode) +{ + return m_mosquittoLib->errorString(errorCode); +} + +std::string_view MqttLib::reasonString(int reasonCode) +{ + return m_mosquittoLib->reasonString(reasonCode); +} + +MqttClient::MqttClient(const std::string &clientId, Options options) + : m_verbose{ static_cast(options & VERBOSE) } +{ + if (!MqttLib::instance().isInitialized()) { + spdlog::error("MqttClient::MqttClient() - CTOR called before MqttLib::init(). Initialize lib before instantiating MqttClient object!"); + } + + auto *client = new MosquittoClient(clientId, static_cast(options & CLEAN_SESSION)); + m_mosquitto.init(client, this); + + m_eventLoopHook.init(c_miscTaskInterval, this); +} + +int MqttClient::setTls(std::optional cafile, bool useOsCertStore) +{ + spdlog::debug("MqttClient::setTls() - cafile: {}, useOsCertStore:{}", cafile ? cafile->path() : "none", useOsCertStore); + + if (connectionState.get() != ConnectionState::DISCONNECTED) { + spdlog::error("MqttClient::setTls() - Setting TLS is only allowed when disconnected."); + return MOSQ_ERR_UNKNOWN; + } + + if (cafile.has_value() && !cafile->exists()) { + spdlog::error("MqttClient::setTls() - Specified cafile does not exist."); + return MOSQ_ERR_UNKNOWN; + } + + if (!cafile.has_value() && !useOsCertStore) { + spdlog::error("MqttClient::setTls() - No cafile specified and use of OS cert store is disabled."); + return MOSQ_ERR_UNKNOWN; + } + + const auto caFilePath = cafile ? std::optional(cafile->path()) : std::nullopt; + auto result = m_mosquitto.client()->tlsSet(caFilePath, std::nullopt, std::nullopt, std::nullopt); + MqttLib::instance().checkMosquittoResultAndDoDebugPrints(result, "MqttClient::setTls()"); + + if (result == MOSQ_ERR_SUCCESS && useOsCertStore) { + // NOTE: on Windows, OpenSSL used by mosquitto doesn't use the system store by default + result = m_mosquitto.client()->tlsEnableUseOsCertificates(); + MqttLib::instance().checkMosquittoResultAndDoDebugPrints(result, "MqttClient::setTls()"); + } + + return result; +} + +int MqttClient::setUsernameAndPassword(const std::string &username, const std::string &password) +{ + spdlog::debug("MqttClient::setUsernameAndPassword()"); + + if (connectionState.get() != ConnectionState::DISCONNECTED) { + spdlog::error("MqttClient::setUsernameAndPassword() - Setting AUTH is only allowed when disconnected."); + return MOSQ_ERR_UNKNOWN; + } + + const auto result = m_mosquitto.client()->usernamePasswordSet(username, password); + MqttLib::instance().checkMosquittoResultAndDoDebugPrints(result, "MqttClient::setUsernameAndPassword()"); + return result; +} + +int MqttClient::setWill(const std::string &topic, int payloadlen, const void *payload, int qos, bool retain) +{ + spdlog::debug("MqttClient::setWill() - topic:{}, qos:{}, retain:{})", topic, qos, retain); + + if (connectionState.get() != ConnectionState::DISCONNECTED) { + spdlog::error("MqttClient::setWill() - Setting will is only allowed when disconnected."); + return MOSQ_ERR_UNKNOWN; + } + + const auto result = m_mosquitto.client()->willSet(topic.c_str(), payloadlen, payload, qos, retain); + MqttLib::instance().checkMosquittoResultAndDoDebugPrints(result, "MqttClient::setWill()"); + return result; +} + +int MqttClient::connect(const Url &host, int port, int keepalive) +{ + spdlog::debug("MqttClient::connect() - host:{}, port:{}, keepalive:{})", host.url(), port, keepalive); + + if (connectionState.get() == ConnectionState::CONNECTING) { + spdlog::error("MqttClient::connect() - Already connecting to host."); + return MOSQ_ERR_UNKNOWN; + } + + if (connectionState.get() == ConnectionState::CONNECTED) { + spdlog::error("MqttClient::connect() - Already connected to a host. Disconnect from current host first."); + return MOSQ_ERR_UNKNOWN; + } + + connectionState.set(ConnectionState::CONNECTING); + // TODO -> TLS only works with connect(), it does not work with connect_async() + // I'm uncertain if there is a setup that allows for connect_async() with TLS (I didn't manage to find one) + // (the use of non-blocking connect_async() would be preferred from our POV) + // other people seem to have encountered similar behaviour before, though this issue should have been fixed a while ago + // -> https://github.com/eclipse/mosquitto/issues/990 + const auto start = clock(); + const auto result = m_mosquitto.client()->connect(host.url().c_str(), port, keepalive); + const auto end = clock(); + const auto elapsedTimeMs = std::round((double(end - start) / double(CLOCKS_PER_SEC)) * 1000000.0); + spdlog::info("MqttClient::connect() - blocking call of MosquittoClient::connect() took {} µs", elapsedTimeMs); + + const auto hasError = MqttLib::instance().checkMosquittoResultAndDoDebugPrints(result, "MqttClient::connect()"); + if (!hasError) { + m_eventLoopHook.engage(m_mosquitto.client()->socket()); + } + return result; +} + +int MqttClient::disconnect() +{ + spdlog::debug("MqttClient::disconnect()"); + + if (connectionState.get() == ConnectionState::DISCONNECTING) { + spdlog::error("MqttClient::disconnect() - Already disconnecting from host."); + return MOSQ_ERR_UNKNOWN; + } + + if (connectionState.get() == ConnectionState::DISCONNECTED) { + spdlog::error("MqttClient::disconnect() - Not connected to any host."); + return MOSQ_ERR_UNKNOWN; + } + + connectionState.set(ConnectionState::DISCONNECTING); + const auto result = m_mosquitto.client()->disconnect(); + MqttLib::instance().checkMosquittoResultAndDoDebugPrints(result, "MqttClient::disconnect()"); + return result; +} + +int MqttClient::publish(int *msgId, const char *topic, int payloadlen, const void *payload, int qos, bool retain) +{ + spdlog::debug("MqttClient::publish() - topic:{}, qos:{}, retain:{}", topic, qos, retain); + + if (connectionState.get() == ConnectionState::DISCONNECTED) { + spdlog::error("MqttClient::publish() - Not connected to any host."); + return MOSQ_ERR_UNKNOWN; + } + + const auto result = m_mosquitto.client()->publish(msgId, topic, payloadlen, payload, qos, retain); + MqttLib::instance().checkMosquittoResultAndDoDebugPrints(result, "MqttClient::publish()"); + return result; +} + +int MqttClient::subscribe(const char *pattern, int qos) +{ + spdlog::debug("MqttClient::subscribe() - subscribe pattern:{}, qos:{}", pattern, qos); + + if (connectionState.get() == ConnectionState::DISCONNECTED) { + spdlog::error("MqttClient::subscribe() - Not connected to any host."); + return MOSQ_ERR_UNKNOWN; + } + + int msgId; + const auto result = m_mosquitto.client()->subscribe(&msgId, pattern, qos); + const auto hasError = MqttLib::instance().checkMosquittoResultAndDoDebugPrints(result, "MqttClient::subscribe()"); + if (!hasError) { + const auto topic = std::string(pattern); + m_subscriptionsRegistry.registerPendingRegistryOperation(topic, msgId); + subscriptionState.set(SubscriptionState::SUBSCRIBING); + } + return result; +} + +int MqttClient::unsubscribe(const char *pattern) +{ + spdlog::debug("MqttClient::unsubscribe() - unsubscribe pattern:{}", pattern); + + if (connectionState.get() == ConnectionState::DISCONNECTED) { + spdlog::error("MqttClient::unsubscribe() - Not connected to any host."); + return MOSQ_ERR_UNKNOWN; + } + + int msgId; + const auto result = m_mosquitto.client()->unsubscribe(&msgId, pattern); + const auto hasError = MqttLib::instance().checkMosquittoResultAndDoDebugPrints(result, "MqttClient::unsubscribe()"); + if (!hasError) { + const auto topic = std::string(pattern); + m_subscriptionsRegistry.registerPendingRegistryOperation(topic, msgId); + subscriptionState.set(SubscriptionState::UNSUBSCRIBING); + } + return result; +} + +void MqttClient::onConnected(int connackCode) +{ + spdlog::debug("MqttClient::onConnected() - connackCode({}): {}", connackCode, MqttLib::instance().connackString(connackCode)); + const auto hasError = (connackCode != 0); + if (hasError) { + // TODO -> I'm uncertain if calling unhookFromEventLoop() here is perfectly fine in every case + // I noticed on_diconnect (sometimes) gets called after on_connect was called with CONNACK!=0 + // in this case we may want to stay hooked to the event loop until we're finally disconnected + // and on_disconnect is called. + // For now I won't call unhookFromEventLoop() here and see if we ever run into a sw-path were + // we never unhook from event loop. In this case we would need to add the following call here (only for certain connackCodes): + // unhookFromEventLoop(); + } + + if (m_verbose) { + const auto tlsIsEnabled = (m_mosquitto.client()->sslGet() != nullptr); + spdlog::info("MqttClient::onConnected() - This connection {} TLS encrypted", tlsIsEnabled ? "is" : "is not"); + } + + const auto state = hasError ? ConnectionState::DISCONNECTED : ConnectionState::CONNECTED; + connectionState.set(state); +} + +void MqttClient::onDisconnected(int reasonCode) +{ + spdlog::debug("MqttClient::onDisconnected() - reasonCode({}): {}", reasonCode, MqttLib::instance().reasonString(reasonCode)); + + m_eventLoopHook.disengage(); + + connectionState.set(ConnectionState::DISCONNECTED); +} + +void MqttClient::onPublished(int msgId) +{ + spdlog::debug("MqttClient::onPublished() - msgId:{}", msgId); + msgPublished.emit(msgId); +} + +void MqttClient::onMessage(const mosquitto_message *msg) +{ + spdlog::debug("MqttClient::onMessage() - message.id:{}, message.topic:{}", msg->mid, msg->topic); + + Message message{ + .msgId = msg->mid, + .topic = msg->topic, + .payload = ByteArray(static_cast(msg->payload), msg->payloadlen), + .qos = msg->qos, + .retain = msg->retain + }; + msgReceived.emit(std::move(message)); +} + +void MqttClient::onSubscribed(int msgId, int qosCount, const int *grantedQos) +{ + // we only handle subscriptions to one single topic with one single QOS value for now. + // in case mosquitto_subscribe_multiple is added to MosquittoClient some time in the future, + // add handling of multiple topic/QOS pairs here. + assert(qosCount == 1); + + const auto topic = m_subscriptionsRegistry.registerTopicSubscriptionAndReturnTopicName(msgId, grantedQos[0]); + spdlog::debug("MqttClient::onSubscribed() - msgId:{}, topic:{}, qosCount:{}, grantedQos:{}", msgId, topic, qosCount, grantedQos[0]); + + const auto state = m_subscriptionsRegistry.subscribedTopics().empty() ? SubscriptionState::UNSUBSCRIBED : SubscriptionState::SUBSCRIBED; + subscriptionState.set(state); + subscriptions.set(m_subscriptionsRegistry.subscribedTopics()); +} + +void MqttClient::onUnsubscribed(int msgId) +{ + const auto topic = m_subscriptionsRegistry.unregisterTopicSubscriptionAndReturnTopicName(msgId); + spdlog::debug("MqttClient::onUnsubscribed() - msgId:{}, topic:{}", msgId, topic); + + const auto state = m_subscriptionsRegistry.subscribedTopics().empty() ? SubscriptionState::UNSUBSCRIBED : SubscriptionState::SUBSCRIBED; + subscriptionState.set(state); + subscriptions.set(m_subscriptionsRegistry.subscribedTopics()); +} + +void MqttClient::onLog(int level, const char *str) const +{ + if (m_verbose) { + spdlog::info("MqttClient::onLog() - level:{}, string:{})", level, str); + } +} + +void MqttClient::onError() +{ + spdlog::error("MqttClient::onError()"); + error.emit(); +} + +void MqttClient::onReadOpRequested() +{ + auto result = m_mosquitto.client()->loopRead(); + MqttLib::instance().checkMosquittoResultAndDoDebugPrints(result, "loopRead()"); +} + +void MqttClient::onWriteOpRequested() +{ + const auto writeOpIsPending = m_mosquitto.client()->wantWrite(); + if (!writeOpIsPending) { + return; + } + + auto result = m_mosquitto.client()->loopWrite(); + MqttLib::instance().checkMosquittoResultAndDoDebugPrints(result, "loopWrite()"); +} + +void MqttClient::onMiscTaskRequested() +{ + auto result = m_mosquitto.client()->loopMisc(); + MqttLib::instance().checkMosquittoResultAndDoDebugPrints(result, "loopMisc()"); +} + +void MqttClient::EventLoopHook::init(const std::chrono::milliseconds miscTaskInterval, MqttClient *parent) +{ + spdlog::debug("MqttClient::EventLoopHook::init()"); + assert(parent != nullptr); + + this->parent = parent; + + miscTaskTimer = std::make_unique(); + miscTaskTimer->interval.set(miscTaskInterval); + miscTaskTimer->running.set(false); + std::ignore = miscTaskTimer->timeout.connect(&MqttClient::onMiscTaskRequested, parent); +} + +void MqttClient::EventLoopHook::engage(const int socket) +{ + spdlog::debug("MqttClient::EventLoopHook::engage()"); + + if (!isSetup()) { + spdlog::error("MqttClient::EventLoopHook::engage() - EventLoopHook is not initialized. Call MqttClient::EventLoopHook::init() first."); + return; + } + + if (isEngaged()) { + spdlog::error("MqttClient::EventLoopHook::engage() - Already engaged."); + return; + } + + if (socket < 0) { + spdlog::error("MqttClient::EventLoopHook::engage() - Invalid socket."); + return; + } + + readOpNotifier = std::make_unique(socket, FileDescriptorNotifier::NotificationType::Read); + writeOpNotifier = std::make_unique(socket, FileDescriptorNotifier::NotificationType::Write); + + std::ignore = readOpNotifier->triggered.connect(&MqttClient::onReadOpRequested, parent); + std::ignore = writeOpNotifier->triggered.connect(&MqttClient::onWriteOpRequested, parent); + + miscTaskTimer->running.set(true); +} + +void MqttClient::EventLoopHook::disengage() +{ + spdlog::debug("MqttClient::EventLoopHook::disengage()"); + + if (!isEngaged()) { + spdlog::error("MqttClient::EventLoopHook::disengage() - Already disengaged."); + return; + } + + miscTaskTimer->running.set(false); + + readOpNotifier->triggered.disconnectAll(); + writeOpNotifier->triggered.disconnectAll(); + + readOpNotifier = {}; + writeOpNotifier = {}; +} + +bool MqttClient::EventLoopHook::isSetup() const +{ + return (miscTaskTimer && (parent != nullptr)); +} + +bool MqttClient::EventLoopHook::isEngaged() const +{ + return (readOpNotifier && writeOpNotifier); +} + +void MqttClient::MosquittoClientDependency::init(MosquittoClient *client, MqttClient *parent) +{ + spdlog::debug("MqttClient::MosquittoClientDependency::init()"); + assert(parent != nullptr); + + delete mosquittoClient; + mosquittoClient = client; + + std::ignore = mosquittoClient->connected.connect(&MqttClient::onConnected, parent); + std::ignore = mosquittoClient->disconnected.connect(&MqttClient::onDisconnected, parent); + std::ignore = mosquittoClient->published.connect(&MqttClient::onPublished, parent); + std::ignore = mosquittoClient->message.connect(&MqttClient::onMessage, parent); + std::ignore = mosquittoClient->subscribed.connect(&MqttClient::onSubscribed, parent); + std::ignore = mosquittoClient->unsubscribed.connect(&MqttClient::onUnsubscribed, parent); + std::ignore = mosquittoClient->log.connect(&MqttClient::onLog, parent); + std::ignore = mosquittoClient->error.connect(&MqttClient::onError, parent); +} + +MosquittoClient *MqttClient::MosquittoClientDependency::client() +{ + return mosquittoClient; +} + +void MqttClient::SubscriptionsRegistry::registerPendingRegistryOperation(std::string_view topic, int msgId) +{ + spdlog::debug("MqttClient::SubscriptionsRegistry::registerPendingRegistryOperation() - topic:{}, msgId:{}", topic, msgId); + topicByMsgIdOfPendingOperations[msgId] = topic; +} + +std::string MqttClient::SubscriptionsRegistry::registerTopicSubscriptionAndReturnTopicName(int msgId, int grantedQos) +{ + spdlog::debug("MqttClient::SubscriptionsRegistry::registerTopicSubscriptionAndReturnTopicName() - msgId:{}, grantedQos:{}", msgId, grantedQos); + + auto it = topicByMsgIdOfPendingOperations.find(msgId); + if (it == topicByMsgIdOfPendingOperations.end()) { + spdlog::error("MqttClient::SubscriptionsRegistry::registerTopicSubscriptionAndReturnTopicName() - No pending operation with msgId: {}.", msgId); + return {}; + } + + auto topic = it->second; + topicByMsgIdOfPendingOperations.erase(it); + + qosByTopicOfActiveSubscriptions[topic] = grantedQos; + + return topic; +} + +std::string MqttClient::SubscriptionsRegistry::unregisterTopicSubscriptionAndReturnTopicName(int msgId) +{ + spdlog::debug("MqttClient::SubscriptionsRegistry::unregisterTopicSubscriptionAndReturnTopicName() - msgId:{}", msgId); + + auto it = topicByMsgIdOfPendingOperations.find(msgId); + if (it == topicByMsgIdOfPendingOperations.end()) { + spdlog::error("MqttClient::SubscriptionsRegistry::unregisterTopicSubscriptionAndReturnTopicName() - No pending operation with msgId: {}.", msgId); + return {}; + } + + const auto &topic = it->second; + topicByMsgIdOfPendingOperations.erase(it); + + qosByTopicOfActiveSubscriptions.erase(topic); + + return topic; +} + +std::vector MqttClient::SubscriptionsRegistry::subscribedTopics() const +{ + std::vector keys; + keys.reserve(qosByTopicOfActiveSubscriptions.size()); + for (const auto &pair : qosByTopicOfActiveSubscriptions) { + keys.push_back(pair.first); + } + std::sort(keys.begin(), keys.end()); + return keys; +} + +int MqttClient::SubscriptionsRegistry::grantedQosForTopic(const std::string &topic) const +{ + return qosByTopicOfActiveSubscriptions.at(topic); +} + +} // namespace KDMqtt diff --git a/src/KDMqtt/mqtt.h b/src/KDMqtt/mqtt.h new file mode 100644 index 0000000..daccb9f --- /dev/null +++ b/src/KDMqtt/mqtt.h @@ -0,0 +1,266 @@ +/* + This file is part of KDUtils. + + SPDX-FileCopyrightText: 2024 Klarälvdalens Datakonsult AB, a KDAB Group company + Author: Marco Thaller + + SPDX-License-Identifier: MIT + + Contact KDAB at for commercial licensing options. +*/ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace KDFoundation; +using namespace KDUtils; + +namespace KDMqtt { + +constexpr int c_defaultPort = 1883; +constexpr int c_defaultKeepAliveSeconds = 60; + +/* + * Class: IMqttLib + * + * This is an abstract class specifying a generic + * interface exposed to application / business logic + * to access MQTT library implementations. + */ +class KDMQTT_API IMqttLib +{ +protected: + IMqttLib() = default; + virtual ~IMqttLib() { } + +public: + IMqttLib(const IMqttLib &) = delete; + IMqttLib &operator=(const IMqttLib &) = delete; + + virtual int init() = 0; + virtual int cleanup() = 0; + + [[nodiscard]] virtual bool isInitialized() const = 0; + virtual bool isValidTopicNameForSubscription(const std::string &topic) = 0; +}; + +/* + * Class: MqttLib + * + * This class exposes mosquitto library functions to + * the application / business logic. + */ +class KDMQTT_API MqttLib : public IMqttLib +{ + friend class MqttClient; + friend class MqttUnitTestHarness; + +private: + MqttLib(); + ~MqttLib(); + +public: + static MqttLib &instance(); + + int init() override; + int cleanup() override; + + [[nodiscard]] bool isInitialized() const override; + bool isValidTopicNameForSubscription(const std::string &topic) override; + +protected: + int version(int *major, int *minor, int *revision); + + bool checkMosquittoResultAndDoDebugPrints(int result, std::string_view func = {}); + + std::string_view connackString(int connackCode); + std::string_view errorString(int errorCode); + std::string_view reasonString(int reasonCode); + +private: + MosquittoLib *m_mosquittoLib; + bool m_isInitialized; +}; + +/* + * Class: IMqttClient + * + * This is an abstract class specifying a generic + * interface exposed to application / business logic + * to access MQTT client implementations. + */ +class KDMQTT_API IMqttClient +{ +public: + enum class ConnectionState { + CONNECTING, + CONNECTED, + DISCONNECTING, + DISCONNECTED + }; + + enum class SubscriptionState { + SUBSCRIBING, + SUBSCRIBED, + UNSUBSCRIBING, + UNSUBSCRIBED + }; + + struct Message { + int msgId; + std::string topic; + ByteArray payload; + int qos; + bool retain; + }; + + KDBindings::Property connectionState{ ConnectionState::DISCONNECTED }; + KDBindings::Property subscriptionState{ SubscriptionState::UNSUBSCRIBED }; + + KDBindings::Property> subscriptions{}; + + KDBindings::Signal msgPublished; + KDBindings::Signal msgReceived; + + KDBindings::Signal<> error; + + virtual int setTls(std::optional cafile, bool useOsCertStore = true) = 0; + virtual int setUsernameAndPassword(const std::string &username, const std::string &password) = 0; + virtual int setWill(const std::string &topic, int payloadlen = 0, const void *payload = nullptr, int qos = 0, bool retain = false) = 0; + + virtual int connect(const Url &host, int port = c_defaultPort, int keepalive = c_defaultKeepAliveSeconds) = 0; + virtual int disconnect() = 0; + + virtual int publish(int *msgId, const char *topic, int payloadlen = 0, const void *payload = nullptr, int qos = 0, bool retain = false) = 0; + + virtual int subscribe(const char *pattern, int qos = 0) = 0; + virtual int unsubscribe(const char *pattern) = 0; +}; + +/* + * Class: MqttClient + * + * This class exposes mosquitto client functions to + * the application / business logic. + */ +class KDMQTT_API MqttClient : public IMqttClient +{ + friend class MqttUnitTestHarness; + +public: + enum Option { + CLEAN_SESSION = 0x00000001, + VERBOSE = 0x00000002 + }; + using Options = uint32_t; + + MqttClient(const std::string &clientId, Options options = CLEAN_SESSION); + ~MqttClient() = default; + + int setTls(std::optional cafile, bool useOsCertStore = true) override; + int setUsernameAndPassword(const std::string &username, const std::string &password) override; + int setWill(const std::string &topic, int payloadlen = 0, const void *payload = nullptr, int qos = 0, bool retain = false) override; + + int connect(const Url &host, int port = c_defaultPort, int keepalive = c_defaultKeepAliveSeconds) override; + int disconnect() override; + + int publish(int *msgId, const char *topic, int payloadlen = 0, const void *payload = nullptr, int qos = 0, bool retain = false) override; + + int subscribe(const char *pattern, int qos = 0) override; + int unsubscribe(const char *pattern) override; + +private: + bool m_verbose; + + /* + * Mosquitto client event handlers + */ + void onConnected(int connackCode); + void onDisconnected(int reasonCode); + void onPublished(int msgId); + void onMessage(const struct mosquitto_message *msg); + void onSubscribed(int msgId, int qosCount, const int *grantedQos); + void onUnsubscribed(int msgId); + void onLog(int level, const char *str) const; + void onError(); + + /* + * Event loop handlers + */ + void onReadOpRequested(); + void onWriteOpRequested(); + void onMiscTaskRequested(); + + /* + * This struct modularizes the mechanism to hook mosquitto's + * so called Network Loop to the application's event loop. + * This is done by monitoring the client's network socket + * using FileDescriptorNotifiers and having an additional + * timer to trigger cyclic misc tasks. + */ + struct EventLoopHook { + public: + void init(std::chrono::milliseconds miscTaskInterval, MqttClient *parent); + + void engage(int socket); + void disengage(); + + [[nodiscard]] bool isSetup() const; + [[nodiscard]] bool isEngaged() const; + + private: + std::unique_ptr readOpNotifier; + std::unique_ptr writeOpNotifier; + std::unique_ptr miscTaskTimer; + MqttClient *parent{ nullptr }; + }; + EventLoopHook m_eventLoopHook; + + /* + * This struct modularizes the dependency to the mosquitto + * library's client implementation. + * It owns the mosquitto client instance and is responsible + * for initializing MqttClient with the provided mosquitto + * client instance. + * This is also relevant when passing a mosquitto client mock + * for unit testing. + */ + struct MosquittoClientDependency { + public: + void init(MosquittoClient *client, MqttClient *parent); + MosquittoClient *client(); + + private: + MosquittoClient *mosquittoClient{ nullptr }; + }; + MosquittoClientDependency m_mosquitto; + + /* + * This struct modularizes the registry maintaining all + * subscriptions to MQTT topics this MqttClient has. + */ + struct SubscriptionsRegistry { + public: + void registerPendingRegistryOperation(std::string_view topic, int msgId); + std::string registerTopicSubscriptionAndReturnTopicName(int msgId, int grantedQos); + std::string unregisterTopicSubscriptionAndReturnTopicName(int msgId); + + std::vector subscribedTopics() const; + int grantedQosForTopic(const std::string &topic) const; + + private: + std::unordered_map qosByTopicOfActiveSubscriptions; + std::unordered_map topicByMsgIdOfPendingOperations; + }; + SubscriptionsRegistry m_subscriptionsRegistry; +}; + +} // namespace KDMqtt