diff --git a/include/spead2/send_stream.h b/include/spead2/send_stream.h index d038d7372..ec3f891b5 100644 --- a/include/spead2/send_stream.h +++ b/include/spead2/send_stream.h @@ -419,6 +419,34 @@ class stream s_item_pointer_t cnt = -1, std::size_t substream_index = 0); + /** + * Send @a h asynchronously, with an arbitrary completion token. This + * overload is not used if the completion token is convertible to + * @ref completion_handler. + * + * Refer to the other overload for details. The boolean return of the other + * overload is absent. You will need to retrieve the asynchronous result + * and check for @c boost::asio::error::would_block to determine if the + * heaps were rejected due to lack of buffer space. + */ + template + auto async_send_heap(const heap &h, CompletionToken &&token, + s_item_pointer_t cnt = -1, + std::enable_if_t< + !std::is_convertible_v, + std::size_t + > substream_index = 0) + { + auto init = [this, &h, cnt, substream_index](auto handler) + { + // Explicit this-> is to work around bogus warning from clang + this->async_send_heap(h, std::move(handler), cnt, substream_index); + }; + return boost::asio::async_initiate< + CompletionToken, void(const boost::system::error_code &, item_pointer_t) + >(init, token); + } + /** * Send a group of heaps asynchronously, with @a handler called on * completion. The caller must ensure that the @ref heap objects @@ -466,6 +494,43 @@ class stream return async_send_heaps_impl(first, last, std::move(handler), mode); } + /** + * Send a group of heaps asynchronously, with an arbitrary completion + * token (e.g., @c boost::asio::use_future). This overload is not used + * if the completion token is convertible to @ref completion_handler. + * + * Refer to the other overload for details. There are a few differences: + * + * - The boolean return of the other overload is absent. You will need to + * retrieve the asynchronous result and check for @c + * boost::asio::error::would_block to determine if the heaps were + * rejected due to lack of buffer space. + * - Depending on the completion token, the iterators might not be used + * immediately. Using @c boost::asio::use_future causes them to be used + * immediately, but @c boost::asio::deferred or @c + * boost::asio::use_awaitable does not (they are only used when + * awaiting the result). If they are not used immediately, the caller + * must keep them valid (as well as the data they reference) until they + * are used. + */ + template + auto async_send_heaps(Iterator first, Iterator last, + CompletionToken &&token, + std::enable_if_t< + !std::is_convertible_v, + group_mode + > mode) + { + auto init = [this, first, last, mode](auto handler) + { + // Explicit this-> is to work around bogus warning from clang + this->async_send_heaps(first, last, std::move(handler), mode); + }; + return boost::asio::async_initiate< + CompletionToken, void(const boost::system::error_code &, item_pointer_t) + >(init, token); + } + /** * Get the number of substreams in this stream. */ diff --git a/src/meson.build b/src/meson.build index ce7bf66c6..a2eac19ec 100644 --- a/src/meson.build +++ b/src/meson.build @@ -150,6 +150,7 @@ if not get_option('python') 'unittest_recv_custom_memcpy.cpp', 'unittest_recv_stream_stats.cpp', 'unittest_semaphore.cpp', + 'unittest_send_completion.cpp', 'unittest_send_heap.cpp', 'unittest_send_streambuf.cpp', 'unittest_send_tcp.cpp', diff --git a/src/unittest_send_completion.cpp b/src/unittest_send_completion.cpp new file mode 100644 index 000000000..171121911 --- /dev/null +++ b/src/unittest_send_completion.cpp @@ -0,0 +1,174 @@ +/* Copyright 2023 National Research Foundation (SARAO) + * + * This program is free software: you can redistribute it and/or modify it under + * the terms of the GNU Lesser General Public License as published by the Free + * Software Foundation, either version 3 of the License, or (at your option) any + * later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more + * details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see . + */ + +/** + * @file + * + * Unit tests for send completion tokens. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace spead2::unittest +{ + +BOOST_AUTO_TEST_SUITE(send) +BOOST_AUTO_TEST_SUITE(completion) + +// empty heap: header, 4 standard items, padding item, padding +static constexpr std::size_t heap_size = 9 + 5 * sizeof(item_pointer_t); + +class promise_handler +{ + std::promise &promise; + +public: + explicit promise_handler(std::promise &promise) : promise(promise) {} + + void operator()(const boost::system::error_code &ec, item_pointer_t bytes_transferred) const + { + if (ec) + promise.set_exception(std::make_exception_ptr(boost::system::system_error(ec))); + else + promise.set_value(bytes_transferred); + } +}; + +static bool is_would_block(const boost::system::system_error &ex) +{ + return ex.code() == boost::asio::error::would_block; +} + +// Test async_send_heap with a completion handler +BOOST_AUTO_TEST_CASE(async_send_heap_handler) +{ + thread_pool tp; + auto queue = std::make_shared(); + spead2::send::inproc_stream stream(tp, {queue}); + spead2::send::heap heap; + + std::promise promise; + auto future = promise.get_future(); + bool result = stream.async_send_heap(heap, promise_handler(promise)); + BOOST_CHECK_EQUAL(result, true); + BOOST_CHECK_EQUAL(future.get(), heap_size); +} + +// Test async_send_heap with a generic token +BOOST_AUTO_TEST_CASE(async_send_heap_token) +{ + thread_pool tp; + auto queue = std::make_shared(); + spead2::send::inproc_stream stream(tp, {queue}); + spead2::send::heap heap; + + std::future future = stream.async_send_heap(heap, boost::asio::use_future); + BOOST_CHECK_EQUAL(future.get(), heap_size); +} + +// Test async_send_heaps with a completion handler +BOOST_AUTO_TEST_CASE(async_send_heaps_handler) +{ + thread_pool tp; + auto queue = std::make_shared(); + spead2::send::inproc_stream stream(tp, {queue}); + std::array heaps; + + std::promise promise; + auto future = promise.get_future(); + bool result = stream.async_send_heaps( + heaps.begin(), heaps.end(), + promise_handler(promise), + spead2::send::group_mode::SERIAL + ); + BOOST_CHECK_EQUAL(result, true); + BOOST_CHECK_EQUAL(future.get(), heap_size * heaps.size()); +} + +// Test async_send_heaps with a completion token +BOOST_AUTO_TEST_CASE(async_send_heaps_token) +{ + thread_pool tp; + auto queue = std::make_shared(); + spead2::send::inproc_stream stream(tp, {queue}); + std::array heaps; + + std::future future = stream.async_send_heaps( + heaps.begin(), heaps.end(), + boost::asio::use_future, + spead2::send::group_mode::SERIAL + ); + BOOST_CHECK_EQUAL(future.get(), heap_size * heaps.size()); +} + +// Test async_send_heaps failure case with a completion handler +BOOST_AUTO_TEST_CASE(async_send_heaps_failure_handler) +{ + thread_pool tp; + auto queue = std::make_shared(); + spead2::send::inproc_stream stream(tp, {queue}, spead2::send::stream_config().set_max_heaps(1)); + std::array heaps; + + std::promise promise; + auto future = promise.get_future(); + bool result = stream.async_send_heaps( + heaps.begin(), heaps.end(), + promise_handler(promise), + spead2::send::group_mode::SERIAL + ); + BOOST_CHECK_EQUAL(result, false); + BOOST_CHECK_EXCEPTION(future.get(), boost::system::system_error, is_would_block); +} + +// Test async_send_heaps failure case with a completion token +BOOST_AUTO_TEST_CASE(async_send_heaps_failure_token) +{ + thread_pool tp; + auto queue = std::make_shared(); + spead2::send::inproc_stream stream(tp, {queue}, spead2::send::stream_config().set_max_heaps(1)); + std::array heaps; + + std::promise promise; + auto future = promise.get_future(); + bool result = stream.async_send_heaps( + heaps.begin(), heaps.end(), + [&promise](const boost::system::error_code &ec, item_pointer_t bytes_transferred) + { + if (ec) + promise.set_exception(std::make_exception_ptr(boost::system::system_error(ec))); + else + promise.set_value(bytes_transferred); + }, + spead2::send::group_mode::SERIAL + ); + BOOST_CHECK_EQUAL(result, false); + BOOST_CHECK_EXCEPTION(future.get(), boost::system::system_error, is_would_block); +} + +BOOST_AUTO_TEST_SUITE_END() // completion +BOOST_AUTO_TEST_SUITE_END() // send + +} // namespace spead2::unittest