Skip to content

Commit

Permalink
Fixed #313.
Browse files Browse the repository at this point in the history
Applied enable_shared_from_this to `client` similar as `basic_endpoint`.
  • Loading branch information
redboltz committed Jun 24, 2024
1 parent 77a7ba8 commit c1332b9
Show file tree
Hide file tree
Showing 11 changed files with 80 additions and 56 deletions.
20 changes: 10 additions & 10 deletions example/cl_cpp17_mqtt_pub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ using client_t = am::client<am::protocol_version::v5, am::protocol::mqtt>;

struct app {
app(as::any_io_executor exe, std::string_view host, std::string_view port)
: cli_{exe}
: cli_{client_t::create(exe)}
{
am::async_underlying_handshake(
cli_.next_layer(),
cli_->next_layer(),
host,
port,
[this](auto&&... args) {
Expand All @@ -45,7 +45,7 @@ struct app {
) {
std::cout << "underlying_handshake:" << ec.message() << std::endl;
if (ec) return;
cli_.async_start(
cli_->async_start(
true, // clean_start
std::uint16_t(0), // keep_alive
"", // Client Identifier, empty means generated by the broker
Expand All @@ -66,7 +66,7 @@ struct app {
if (ec) return;
if (connack_opt) {
std::cout << *connack_opt << std::endl;
cli_.async_publish(
cli_->async_publish(
"topic1",
"payload1",
am::qos::at_most_once,
Expand All @@ -76,8 +76,8 @@ struct app {
);
}
);
cli_.async_publish(
*cli_.acquire_unique_packet_id(), // sync version only works thread safe context
cli_->async_publish(
*cli_->acquire_unique_packet_id(), // sync version only works thread safe context
"topic2",
"payload2",
am::qos::at_least_once,
Expand All @@ -87,8 +87,8 @@ struct app {
);
}
);
cli_.async_publish(
*cli_.acquire_unique_packet_id(), // sync version only works thread safe context
cli_->async_publish(
*cli_->acquire_unique_packet_id(), // sync version only works thread safe context
"topic3",
"payload3",
am::qos::exactly_once,
Expand All @@ -115,11 +115,11 @@ struct app {
}
if (pubres.pubcomp_opt) {
std::cout << *pubres.pubcomp_opt << std::endl;
cli_.async_disconnect(as::detached);
cli_->async_disconnect(as::detached);
}
}

client_t cli_;
std::shared_ptr<client_t> cli_;
};

int main(int argc, char* argv[]) {
Expand Down
18 changes: 9 additions & 9 deletions example/cl_cpp17_mqtt_sub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ using client_t = am::client<am::protocol_version::v5, am::protocol::mqtt>;

struct app {
app(as::any_io_executor exe, std::string_view host, std::string_view port)
: cli_{exe}, host_{host}, port_{port}
: cli_{client_t::create(exe)}, host_{host}, port_{port}
{
connect();
}

private:
void connect() {
am::async_underlying_handshake(
cli_.next_layer(),
cli_->next_layer(),
host_,
port_,
[this](auto&&... args) {
Expand Down Expand Up @@ -63,7 +63,7 @@ struct app {
reconnect();
return;
}
cli_.async_start(
cli_->async_start(
true, // clean_start
std::uint16_t(0), // keep_alive
"", // Client Identifier, empty means generated by the broker
Expand Down Expand Up @@ -96,8 +96,8 @@ struct app {
{"topic2", am::qos::at_least_once},
{"topic3", am::qos::exactly_once},
};
cli_.async_subscribe(
*cli_.acquire_unique_packet_id(), // sync version only works thread safe context
cli_->async_subscribe(
*cli_->acquire_unique_packet_id(), // sync version only works thread safe context
am::force_move(sub_entry),
[this](auto&&... args) {
handle_subscribe_response(
Expand All @@ -119,7 +119,7 @@ struct app {
if (suback_opt) {
std::cout << *suback_opt << std::endl;
}
cli_.async_recv(
cli_->async_recv(
[this](auto&&... args) {
handle_recv(
std::forward<std::remove_reference_t<decltype(args)>>(args)...
Expand All @@ -140,7 +140,7 @@ struct app {
BOOST_ASSERT(pv);
std::cout << pv << std::endl;
// next receive
cli_.async_recv(
cli_->async_recv(
[this](auto&&... args) {
handle_recv(
std::forward<std::remove_reference_t<decltype(args)>>(args)...
Expand All @@ -149,10 +149,10 @@ struct app {
);
}

client_t cli_;
std::shared_ptr<client_t> cli_;
std::string host_;
std::string port_;
as::steady_timer tim_{cli_.get_executor()};
as::steady_timer tim_{cli_->get_executor()};
};

int main(int argc, char* argv[]) {
Expand Down
4 changes: 2 additions & 2 deletions example/cl_cpp20coro_mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ int main(int argc, char* argv[]) {
return -1;
}
as::io_context ioc;
client_t amcl{ioc.get_executor()};
as::co_spawn(amcl.get_executor(), proc(amcl, argv[1], argv[2]), as::detached);
auto amcl = client_t::create(ioc.get_executor());
as::co_spawn(amcl->get_executor(), proc(*amcl, argv[1], argv[2]), as::detached);
ioc.run();
}
4 changes: 2 additions & 2 deletions example/cl_cpp20coro_mqtt_pub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ int main(int argc, char* argv[]) {
return -1;
}
as::io_context ioc;
awaitable_client amcl{ioc.get_executor()};
as::co_spawn(amcl.get_executor(), proc(amcl, argv[1], argv[2]), as::detached);
auto amcl = awaitable_client::create(ioc.get_executor());
as::co_spawn(amcl->get_executor(), proc(*amcl, argv[1], argv[2]), as::detached);
ioc.run();
}
4 changes: 2 additions & 2 deletions example/cl_cpp20coro_mqtt_sub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ int main(int argc, char* argv[]) {
return -1;
}
as::io_context ioc;
awaitable_client amcl{ioc.get_executor()};
as::co_spawn(amcl.get_executor(), proc(amcl, argv[1], argv[2]), as::detached);
auto amcl = awaitable_client::create(ioc.get_executor());
as::co_spawn(amcl->get_executor(), proc(*amcl, argv[1], argv[2]), as::detached);
ioc.run();
}
22 changes: 11 additions & 11 deletions example/footprint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,23 @@ int main() {

am::setup_log(am::severity_level::warning);
as::io_context ioc;
am::client<am::protocol_version::v5, am::protocol::mqtt> amcl{ioc.get_executor()};
auto amcl = am::client<am::protocol_version::v5, am::protocol::mqtt>::create(ioc.get_executor());
as::co_spawn(
amcl.get_executor(),
amcl->get_executor(),
[&] () -> as::awaitable<void> {
auto exe = co_await as::this_coro::executor;

// Resolve hostname
auto [ec_und] = co_await am::async_underlying_handshake(
amcl.next_layer(),
amcl->next_layer(),
"127.0.0.1",
"1883",
as::as_tuple(as::deferred)
);
if (ec_und) co_return;

// MQTT connect and receive loop start
auto [ec_con, connack_opt] = co_await amcl.async_start(
auto [ec_con, connack_opt] = co_await amcl->async_start(
true, // clean_session
std::uint16_t(0x1234), // keep_alive
"cid1",
Expand All @@ -54,8 +54,8 @@ int main() {
am::sub::retain_handling::send
},
};
auto [ec_sub, suback_opt] = co_await amcl.async_subscribe(
*amcl.acquire_unique_packet_id(), // sync version only in strand
auto [ec_sub, suback_opt] = co_await amcl->async_subscribe(
*amcl->acquire_unique_packet_id(), // sync version only in strand
am::force_move(sub_entry), // sub_entry variable is required to avoid g++ bug
as::as_tuple(as::use_awaitable)
);
Expand All @@ -64,29 +64,29 @@ int main() {

// publish
// MQTT publish QoS0 and wait response (socket write complete)
auto [ec_pub0, pubres0] = co_await amcl.async_publish(
auto [ec_pub0, pubres0] = co_await amcl->async_publish(
"topic1",
"payload1",
am::qos::at_most_once,
as::as_tuple(as::use_awaitable)
);
if (ec_pub0) co_return;

auto [ec_recv, pv] = co_await amcl.async_recv(
auto [ec_recv, pv] = co_await amcl->async_recv(
as::as_tuple(as::deferred)
);

// unsubscribe
auto [ec_unsub, unsuback_opt] = co_await amcl.async_unsubscribe(
*amcl.acquire_unique_packet_id(), // sync version only in strand
auto [ec_unsub, unsuback_opt] = co_await amcl->async_unsubscribe(
*amcl->acquire_unique_packet_id(), // sync version only in strand
std::vector<am::topic_sharename>{"topic1"},
as::as_tuple(as::use_awaitable)
);
if (ec_unsub) co_return;
if (!unsuback_opt) co_return;

// disconnect
auto [ec_discon] = co_await amcl.async_disconnect(
auto [ec_discon] = co_await amcl->async_disconnect(
as::as_tuple(as::use_awaitable)
);
if (ec_discon) co_return;
Expand Down
4 changes: 2 additions & 2 deletions example/separate_client/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ int main(int argc, char* argv[]) {
return -1;
}
as::io_context ioc;
client_t amcl{ioc.get_executor()};
as::co_spawn(amcl.get_executor(), proc(amcl, argv[1], argv[2]), as::detached);
auto amcl = client_t::create(ioc.get_executor());
as::co_spawn(amcl->get_executor(), proc(*amcl, argv[1], argv[2]), as::detached);
ioc.run();
}
55 changes: 39 additions & 16 deletions include/async_mqtt/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <async_mqtt/packet/packet_id_type.hpp>
#include <async_mqtt/endpoint_fwd.hpp>
#include <async_mqtt/detail/client_packet_type_getter.hpp>
#include <async_mqtt/util/make_shared_helper.hpp>

/**
* @defgroup client client (High level MQTT client)
Expand Down Expand Up @@ -78,8 +79,12 @@ namespace as = boost::asio;
* @tparam NextLayer Just next layer for basic_endpoint. mqtt, mqtts, ws, and wss are predefined.
*/
template <protocol_version Version, typename NextLayer>
class client {
class client : public std::enable_shared_from_this<client<Version, NextLayer>> {
using this_type = client<Version, NextLayer>;
using this_type_sp = std::shared_ptr<this_type>;

template <typename T>
friend class make_shared_helper;

public:
/// @brief type of endpoint
Expand Down Expand Up @@ -128,31 +133,22 @@ class client {
};

/**
* @brief constructor
* @brief create
* @tparam Args Types for the next layer
* @param args args for the next layer.
* - There are predefined next layer types:
* - protocol::mqtt
* - protocol::mqtts
* - protocol::ws
* - protocol::wss
* @return shared_ptr of client.
*/
template <typename... Args>
explicit
client(
static std::shared_ptr<this_type> create(
Args&&... args
);

/**
* @brief Rebinding constructor
* This constructor creates a client from the client with a different executor.
* @param other The other client to construct from.
*/
template <typename Other>
explicit
client(
client<Version, Other>&& other
);
) {
return make_shared_helper<this_type>::make_shared(std::forward<Args>(args)...);
}

/**
* @brief copy constructor **deleted**
Expand Down Expand Up @@ -729,6 +725,33 @@ class client {

private:

/**
* @brief constructor
* @tparam Args Types for the next layer
* @param args args for the next layer.
* - There are predefined next layer types:
* - protocol::mqtt
* - protocol::mqtts
* - protocol::ws
* - protocol::wss
*/
template <typename... Args>
explicit
client(
Args&&... args
);

/**
* @brief Rebinding constructor
* This constructor creates a client from the client with a different executor.
* @param other The other client to construct from.
*/
template <typename Other>
explicit
client(
client<Version, Other>&& other
);

template <
typename CompletionToken = as::default_completion_token_t<executor_type>
>
Expand Down
2 changes: 1 addition & 1 deletion include/async_mqtt/endpoint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ enum class filter {
};

template <role Role, std::size_t PacketIdBytes, typename NextLayer>
class basic_endpoint : public std::enable_shared_from_this<basic_endpoint<Role, PacketIdBytes, NextLayer>>{
class basic_endpoint : public std::enable_shared_from_this<basic_endpoint<Role, PacketIdBytes, NextLayer>> {
enum class connection_status {
connecting,
connected,
Expand Down
2 changes: 1 addition & 1 deletion include/async_mqtt/impl/client_impl.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ ASYNC_MQTT_HEADER_ONLY_INLINE
void
client<Version, NextLayer>::recv_loop() {
ep_->async_recv(
[this]
[this, sp = this->shared_from_this()]
(error_code const& ec, packet_variant pv) mutable {
if (ec) {
recv_queue_.emplace_back(ec);
Expand Down
1 change: 1 addition & 0 deletions include/async_mqtt/impl/endpoint_recv.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ recv_op {
std::set<control_packet_type> types = {};
std::optional<error_code> decided_error = std::nullopt;
enum { initiate, disconnect, close, read } state = initiate;
this_type_sp life_keeper = ep.shared_from_this();

template <typename Self>
void operator()(
Expand Down

0 comments on commit c1332b9

Please sign in to comment.