From d30fe4c2c3064f8ec9dc48380b84472f9626f96c Mon Sep 17 00:00:00 2001 From: Florian Reimold <11774314+FlorianReimold@users.noreply.github.com> Date: Mon, 26 Feb 2024 09:38:25 +0100 Subject: [PATCH] Added doc and test for thread-safety of join/leaveMulticastGroup (#11) --- tests/udpcap_test/src/udpcap_test.cpp | 160 +++++++++++++++++++++++++- udpcap/include/udpcap/udpcap_socket.h | 23 +++- 2 files changed, 177 insertions(+), 6 deletions(-) diff --git a/tests/udpcap_test/src/udpcap_test.cpp b/tests/udpcap_test/src/udpcap_test.cpp index a7fe88b..2c504cd 100644 --- a/tests/udpcap_test/src/udpcap_test.cpp +++ b/tests/udpcap_test/src/udpcap_test.cpp @@ -705,6 +705,7 @@ TEST(udpcap, MulticastReceive) receive_thread2.join(); } +// Create and destroy a bound many Udpcap sockets with a thread waiting for a datagram TEST(udpcap, ManySockets) { constexpr int num_udpcap_socket = 100; @@ -805,4 +806,161 @@ TEST(udpcap, ManySockets) // Join the send thread send_thread.join(); -} \ No newline at end of file +} + +// Create many Udpcap multicast sockets and join / leave multicast groups while receiving datagrams +TEST(udpcap, ManyMulticastSockets) +{ + constexpr int num_udpcap_socket = 10; + constexpr int num_test_loops = 5; + constexpr char* multicast_group_1 = "225.0.0.1"; + constexpr char* multicast_group_2 = "225.0.0.2"; + constexpr uint16_t port = 14000; + + // Create asio sockets to send datagrams to the multicast groups + asio::io_service io_service; + asio::ip::udp::socket asio_socket1(io_service, asio::ip::udp::v4()); + asio::ip::udp::socket asio_socket2(io_service, asio::ip::udp::v4()); + asio::ip::udp::endpoint endpoint1(asio::ip::make_address(multicast_group_1), port); + asio::ip::udp::endpoint endpoint2(asio::ip::make_address(multicast_group_2), port); + asio_socket1.set_option(asio::ip::multicast::hops(1)); + asio_socket2.set_option(asio::ip::multicast::hops(1)); + asio_socket1.set_option(asio::ip::multicast::enable_loopback(true)); + asio_socket2.set_option(asio::ip::multicast::enable_loopback(true)); + + asio_socket1.connect(endpoint1); + asio_socket2.connect(endpoint2); + + // Thread that constantly pushes datagrams via the asio sockets + std::thread send_thread1([&asio_socket1]() + { + std::string buffer_string = "Hello World"; + while (true) + { + asio::error_code ec; + asio_socket1.send(asio::buffer(buffer_string), 0, ec); + if (ec) + { + break; + } + } + }); + + std::thread send_thread2([&asio_socket2]() + { + std::string buffer_string = "Hello World"; + while (true) + { + asio::error_code ec; + asio_socket2.send(asio::buffer(buffer_string), 0, ec); + if (ec) + { + break; + } + } + }); + + // Create num_udpcap_socket udpcap sockets + std::vector udpcap_sockets; + std::vector receive_threads; + + // Reserve space for the sockets + udpcap_sockets.reserve(num_udpcap_socket); + + for (int i = 0; i < num_udpcap_socket; i++) + { + udpcap_sockets.emplace_back(); + ASSERT_TRUE(udpcap_sockets.back().isValid()); + const bool success = udpcap_sockets.back().bind(Udpcap::HostAddress::Any(), port); + ASSERT_TRUE(success); + udpcap_sockets.back().setMulticastLoopbackEnabled(true); + + // Create a receive thread that constantly receives datagrams + receive_threads.emplace_back([&udpcap_sockets, i, multicast_group_1, multicast_group_2]() + { + while (true) + { + // Initialize variables for the sender's address and port + Udpcap::HostAddress sender_address; + uint16_t sender_port(0); + Udpcap::Error error = Udpcap::Error::ErrorCode::GENERIC_ERROR; + + // Allocate buffer with max udp datagram size + std::vector received_datagram; + received_datagram.resize(65536); + + // blocking receive + const size_t received_bytes = udpcap_sockets[i].receiveDatagram(received_datagram.data(), received_datagram.size(), &sender_address, &sender_port, error); + received_datagram.resize(received_bytes); + + if (error) + { + // Indicates that somebody closed the socket + ASSERT_EQ(error, Udpcap::Error(Udpcap::Error::ErrorCode::SOCKET_CLOSED)); + + // Check that the socket is closed + ASSERT_TRUE(udpcap_sockets[i].isClosed()); + + break; + } + else + { + // Check if the received datagram is valid and contains "Hello World" + ASSERT_FALSE(received_datagram.empty()); + ASSERT_EQ(std::string(received_datagram.data(), received_datagram.size()), "Hello World"); + } + } + }); + } + + for (int i = 0; i < num_test_loops; i++) + { + // Join the multicast group 1 + for (auto& udpcap_socket : udpcap_sockets) + { + const bool success = udpcap_socket.joinMulticastGroup(Udpcap::HostAddress(multicast_group_1)); + ASSERT_TRUE(success); + } + + // Join the multicast group 2 + for (auto& udpcap_socket : udpcap_sockets) + { + const bool success = udpcap_socket.joinMulticastGroup(Udpcap::HostAddress(multicast_group_2)); + ASSERT_TRUE(success); + } + + // Leave the multicast group 1 + for (auto& udpcap_socket : udpcap_sockets) + { + const bool success = udpcap_socket.leaveMulticastGroup(Udpcap::HostAddress(multicast_group_1)); + ASSERT_TRUE(success); + } + + // Leave the multicast group 2 + for (auto& udpcap_socket : udpcap_sockets) + { + const bool success = udpcap_socket.leaveMulticastGroup(Udpcap::HostAddress(multicast_group_2)); + ASSERT_TRUE(success); + } + } + + // Close the sockets + for (auto& udpcap_socket : udpcap_sockets) + { + udpcap_socket.close(); + } + + // Join the threads + for (auto& receive_thread : receive_threads) + { + receive_thread.join(); + } + + // Close the asio sockets + asio_socket1.close(); + asio_socket2.close(); + + // Join the send threads + send_thread1.join(); + send_thread2.join(); +} diff --git a/udpcap/include/udpcap/udpcap_socket.h b/udpcap/include/udpcap/udpcap_socket.h index 205eb01..5b03cfb 100644 --- a/udpcap/include/udpcap/udpcap_socket.h +++ b/udpcap/include/udpcap/udpcap_socket.h @@ -66,7 +66,7 @@ namespace Udpcap * * Thread safety: * - There must only be 1 thread calling receiveDatagram() at the same time - * - It is safe to call close() while another thread is calling receiveDatagram() + * - It is safe to call close(), join and leave multicast groups while another thread is calling receiveDatagram() * - Other modifications to the socket must not be made while another thread is calling receiveDatagram() */ class UdpcapSocket @@ -152,9 +152,13 @@ namespace Udpcap * * Thread safety: * - This method must not be called from multiple threads at the same time - * - While one thread is calling this method, another thread may call close() - * - While one thread is calling this method, no modifications must be made to the socket (except close()) - * + * - While one thread is calling this method, another thread may call one (and only one) of the following functions: + * - close() + * - joinMulticastGroup() + * - leaveMulticastGroup() + * - setMulticastLoopbackEnabled() + * - While one thread is calling this method, no other modifications must be made to the socket + * * @param data [out]: The destination memory * @param max_len [in]: The maximum bytes available at the destination * @param timeout_ms [in]: Maximum time to wait for a datagram in ms. If -1, the method will block until a datagram is available @@ -195,6 +199,9 @@ namespace Udpcap * Joining a multicast group fails, when the Socket is invalid, not bound, * the given address is not a multicast address or this Socket has already * joined the group. + * + * Thread safety: + * - This function may be called while another thread is calling receiveDatagram() * * @param group_address: The multicast group to join * @@ -208,6 +215,9 @@ namespace Udpcap * Leaving a multicast group fails, when the Socket is invalid, not bound, * the given address is not a multicast address or this Socket has not * joined the group, yet. + * + * Thread safety: + * - This function may be called while another thread is calling receiveDatagram() * * @param group_address: The multicast group to leave * @@ -219,6 +229,9 @@ namespace Udpcap * @brief Sets whether local multicast traffic should be received * * If not set, the default value is true. + * + * Thread safety: + * - This function may be called while another thread is calling receiveDatagram() * * @param enables whether local multicast traffic should be received */ @@ -233,7 +246,7 @@ namespace Udpcap * @brief Closes the socket * * Thread safety: - * - It is safe to call this method while another thread is calling receiveDatagram() + * - This function may be called while another thread is calling receiveDatagram() */ UDPCAP_EXPORT void close();