Skip to content

Commit

Permalink
Added test.
Browse files Browse the repository at this point in the history
  • Loading branch information
redboltz committed Oct 18, 2022
1 parent bb6ebdf commit bcc3dd5
Show file tree
Hide file tree
Showing 2 changed files with 529 additions and 116 deletions.
71 changes: 40 additions & 31 deletions include/mqtt/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1208,7 +1208,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
>* = nullptr
>
auto async_connect(CompletionToken&& token) {
return async_connect(any(), token);
return async_connect(any(), std::forward<CompletionToken>(token));
}

/**
Expand All @@ -1228,7 +1228,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
async_connect(
v5::properties{},
force_move(session_life_keeper),
token
std::forward<CompletionToken>(token)
);
}

Expand Down Expand Up @@ -1265,7 +1265,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
>* = nullptr
>
auto async_connect(v5::properties props, CompletionToken&& token) {
return async_connect(force_move(props), any(), token);
return async_connect(force_move(props), any(), std::forward<CompletionToken>(token));
}

/**
Expand Down Expand Up @@ -1344,7 +1344,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
async_connect(
force_move(socket),
v5::properties{}, any(),
token
std::forward<CompletionToken>(token)
);
}

Expand All @@ -1368,7 +1368,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
force_move(socket),
v5::properties{},
force_move(session_life_keeper),
token
std::forward<CompletionToken>(token)
);
}

Expand Down Expand Up @@ -1427,7 +1427,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
force_move(socket),
force_move(props),
any(),
token
std::forward<CompletionToken>(token)
);
}

Expand Down Expand Up @@ -1586,7 +1586,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
base::async_disconnect(
reason_code,
force_move(props),
token
std::forward<CompletionToken>(token)
);
}

Expand Down Expand Up @@ -1622,7 +1622,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
base::async_disconnect(
reason_code,
force_move(props),
token
std::forward<CompletionToken>(token)
);
}

Expand Down Expand Up @@ -1663,7 +1663,7 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
);
return
base::async_disconnect(
token
std::forward<CompletionToken>(token)
);
}

Expand Down Expand Up @@ -1962,9 +1962,11 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
) {
BOOST_ASSERT(state == initiate);
state = resolve;
resolver.async_resolve(
cl.host_,
cl.port_,
auto& a_cl{cl};
auto& a_resolver{resolver};
a_resolver.async_resolve(
a_cl.host_,
a_cl.port_,
force_move(self)
);
}
Expand All @@ -1989,13 +1991,14 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
async_start_session(force_move(self));
break;
#if defined(MQTT_USE_WS)
case tls_ws_handshake_tls:
case tls_ws_handshake_tls: {
state = tls_ws_handshake_ws;
auto& socket = *cl.socket_;
async_ws_handshake_socket(
*cl.socket_,
socket,
force_move(self)
);
break;
} break;
case tls_ws_handshake_ws:
async_start_session(force_move(self));
break;
Expand Down Expand Up @@ -2025,8 +2028,9 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
}
else {
state = connect;
auto& a_cl{cl};
as::async_connect(
cl.socket_->lowest_layer(), eps.begin(), eps.end(), force_move(self)
a_cl.socket_->lowest_layer(), eps.begin(), eps.end(), force_move(self)
);
}
}
Expand All @@ -2046,8 +2050,9 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
if (cl.ping_duration_ != std::chrono::steady_clock::duration::zero()) {
cl.set_timer();
}
auto& socket = *cl.socket_;
async_handshake_socket(
*cl.socket_,
socket,
force_move(self)
);
}
Expand All @@ -2067,9 +2072,10 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
ws_endpoint<as::ip::tcp::socket, Strand>& socket,
Self&& self) {
state = ws_handshake;
auto& a_cl{cl};
socket.async_handshake(
cl.host_,
cl.path_,
a_cl.host_,
a_cl.path_,
force_move(self)
);
}
Expand Down Expand Up @@ -2107,9 +2113,10 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
ws_endpoint<tls::stream<as::ip::tcp::socket>, Strand>& socket,
Self&& self) {
state = tls_ws_handshake_ws;
auto& a_cl{cl};
socket.async_handshake(
cl.host_,
cl.path_,
a_cl.host_,
a_cl.path_,
force_move(self)
);
}
Expand All @@ -2127,18 +2134,20 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
template <typename Self>
void async_start_session(Self&& self) {
state = complete;
cl.async_read_control_packet_type(force_move(session_life_keeper));
auto& a_cl{cl};
auto a_props = force_move(props);
a_cl.async_read_control_packet_type(force_move(session_life_keeper));
// sync base::connect() refer to parameters only in the function.
// So they can be passed as view.
cl.base_async_connect(
buffer(string_view(static_cast<base const&>(cl).get_client_id())),
(cl.user_name_ ? buffer(string_view(cl.user_name_.value()))
: optional<buffer>() ),
( cl.password_ ? buffer(string_view(cl.password_.value()))
: optional<buffer>() ),
cl.will_,
cl.keep_alive_sec_,
force_move(props),
a_cl.base_async_connect(
buffer(string_view(static_cast<base const&>(a_cl).get_client_id())),
(a_cl.user_name_ ? buffer(string_view(a_cl.user_name_.value()))
: optional<buffer>() ),
(a_cl.password_ ? buffer(string_view(a_cl.password_.value()))
: optional<buffer>() ),
a_cl.will_,
a_cl.keep_alive_sec_,
force_move(a_props),
force_move(self)
);
}
Expand Down
Loading

0 comments on commit bcc3dd5

Please sign in to comment.