Skip to content

Commit

Permalink
Merge pull request #293 from ska-sa/completion-tokens
Browse files Browse the repository at this point in the history
Make async_send_heap and async_send_heaps work with completion tokens
  • Loading branch information
bmerry authored Nov 28, 2023
2 parents 2a80f54 + a3b39f6 commit 4cae56d
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 0 deletions.
65 changes: 65 additions & 0 deletions include/spead2/send_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,34 @@ class stream
s_item_pointer_t cnt = -1,
std::size_t substream_index = 0);

/**
* Send @a h asynchronously, with an arbitrary completion token. This
* overload is not used if the completion token is convertible to
* @ref completion_handler.
*
* Refer to the other overload for details. The boolean return of the other
* overload is absent. You will need to retrieve the asynchronous result
* and check for @c boost::asio::error::would_block to determine if the
* heaps were rejected due to lack of buffer space.
*/
template<typename CompletionToken>
auto async_send_heap(const heap &h, CompletionToken &&token,
s_item_pointer_t cnt = -1,
std::enable_if_t<
!std::is_convertible_v<CompletionToken, completion_handler>,
std::size_t
> substream_index = 0)
{
auto init = [this, &h, cnt, substream_index](auto handler)
{
// Explicit this-> is to work around bogus warning from clang
this->async_send_heap(h, std::move(handler), cnt, substream_index);
};
return boost::asio::async_initiate<
CompletionToken, void(const boost::system::error_code &, item_pointer_t)
>(init, token);
}

/**
* Send a group of heaps asynchronously, with @a handler called on
* completion. The caller must ensure that the @ref heap objects
Expand Down Expand Up @@ -466,6 +494,43 @@ class stream
return async_send_heaps_impl<unwinder, Iterator>(first, last, std::move(handler), mode);
}

/**
* Send a group of heaps asynchronously, with an arbitrary completion
* token (e.g., @c boost::asio::use_future). This overload is not used
* if the completion token is convertible to @ref completion_handler.
*
* Refer to the other overload for details. There are a few differences:
*
* - The boolean return of the other overload is absent. You will need to
* retrieve the asynchronous result and check for @c
* boost::asio::error::would_block to determine if the heaps were
* rejected due to lack of buffer space.
* - Depending on the completion token, the iterators might not be used
* immediately. Using @c boost::asio::use_future causes them to be used
* immediately, but @c boost::asio::deferred or @c
* boost::asio::use_awaitable does not (they are only used when
* awaiting the result). If they are not used immediately, the caller
* must keep them valid (as well as the data they reference) until they
* are used.
*/
template<typename Iterator, typename CompletionToken>
auto async_send_heaps(Iterator first, Iterator last,
CompletionToken &&token,
std::enable_if_t<
!std::is_convertible_v<CompletionToken, completion_handler>,
group_mode
> mode)
{
auto init = [this, first, last, mode](auto handler)
{
// Explicit this-> is to work around bogus warning from clang
this->async_send_heaps(first, last, std::move(handler), mode);
};
return boost::asio::async_initiate<
CompletionToken, void(const boost::system::error_code &, item_pointer_t)
>(init, token);
}

/**
* Get the number of substreams in this stream.
*/
Expand Down
1 change: 1 addition & 0 deletions src/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ if not get_option('python')
'unittest_recv_custom_memcpy.cpp',
'unittest_recv_stream_stats.cpp',
'unittest_semaphore.cpp',
'unittest_send_completion.cpp',
'unittest_send_heap.cpp',
'unittest_send_streambuf.cpp',
'unittest_send_tcp.cpp',
Expand Down
174 changes: 174 additions & 0 deletions src/unittest_send_completion.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/* Copyright 2023 National Research Foundation (SARAO)
*
* This program is free software: you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the Free
* Software Foundation, either version 3 of the License, or (at your option) any
* later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
* details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

/**
* @file
*
* Unit tests for send completion tokens.
*/

#include <exception>
#include <memory>
#include <future>
#include <utility>
#include <cstddef>
#include <boost/asio.hpp>
#include <boost/test/unit_test.hpp>
#include <spead2/common_defines.h>
#include <spead2/common_thread_pool.h>
#include <spead2/common_inproc.h>
#include <spead2/send_inproc.h>

namespace spead2::unittest
{

BOOST_AUTO_TEST_SUITE(send)
BOOST_AUTO_TEST_SUITE(completion)

// empty heap: header, 4 standard items, padding item, padding
static constexpr std::size_t heap_size = 9 + 5 * sizeof(item_pointer_t);

class promise_handler
{
std::promise<item_pointer_t> &promise;

public:
explicit promise_handler(std::promise<item_pointer_t> &promise) : promise(promise) {}

void operator()(const boost::system::error_code &ec, item_pointer_t bytes_transferred) const
{
if (ec)
promise.set_exception(std::make_exception_ptr(boost::system::system_error(ec)));
else
promise.set_value(bytes_transferred);
}
};

static bool is_would_block(const boost::system::system_error &ex)
{
return ex.code() == boost::asio::error::would_block;
}

// Test async_send_heap with a completion handler
BOOST_AUTO_TEST_CASE(async_send_heap_handler)
{
thread_pool tp;
auto queue = std::make_shared<inproc_queue>();
spead2::send::inproc_stream stream(tp, {queue});
spead2::send::heap heap;

std::promise<item_pointer_t> promise;
auto future = promise.get_future();
bool result = stream.async_send_heap(heap, promise_handler(promise));
BOOST_CHECK_EQUAL(result, true);
BOOST_CHECK_EQUAL(future.get(), heap_size);
}

// Test async_send_heap with a generic token
BOOST_AUTO_TEST_CASE(async_send_heap_token)
{
thread_pool tp;
auto queue = std::make_shared<inproc_queue>();
spead2::send::inproc_stream stream(tp, {queue});
spead2::send::heap heap;

std::future<item_pointer_t> future = stream.async_send_heap(heap, boost::asio::use_future);
BOOST_CHECK_EQUAL(future.get(), heap_size);
}

// Test async_send_heaps with a completion handler
BOOST_AUTO_TEST_CASE(async_send_heaps_handler)
{
thread_pool tp;
auto queue = std::make_shared<inproc_queue>();
spead2::send::inproc_stream stream(tp, {queue});
std::array<spead2::send::heap, 2> heaps;

std::promise<item_pointer_t> promise;
auto future = promise.get_future();
bool result = stream.async_send_heaps(
heaps.begin(), heaps.end(),
promise_handler(promise),
spead2::send::group_mode::SERIAL
);
BOOST_CHECK_EQUAL(result, true);
BOOST_CHECK_EQUAL(future.get(), heap_size * heaps.size());
}

// Test async_send_heaps with a completion token
BOOST_AUTO_TEST_CASE(async_send_heaps_token)
{
thread_pool tp;
auto queue = std::make_shared<inproc_queue>();
spead2::send::inproc_stream stream(tp, {queue});
std::array<spead2::send::heap, 2> heaps;

std::future<item_pointer_t> future = stream.async_send_heaps(
heaps.begin(), heaps.end(),
boost::asio::use_future,
spead2::send::group_mode::SERIAL
);
BOOST_CHECK_EQUAL(future.get(), heap_size * heaps.size());
}

// Test async_send_heaps failure case with a completion handler
BOOST_AUTO_TEST_CASE(async_send_heaps_failure_handler)
{
thread_pool tp;
auto queue = std::make_shared<inproc_queue>();
spead2::send::inproc_stream stream(tp, {queue}, spead2::send::stream_config().set_max_heaps(1));
std::array<spead2::send::heap, 2> heaps;

std::promise<item_pointer_t> promise;
auto future = promise.get_future();
bool result = stream.async_send_heaps(
heaps.begin(), heaps.end(),
promise_handler(promise),
spead2::send::group_mode::SERIAL
);
BOOST_CHECK_EQUAL(result, false);
BOOST_CHECK_EXCEPTION(future.get(), boost::system::system_error, is_would_block);
}

// Test async_send_heaps failure case with a completion token
BOOST_AUTO_TEST_CASE(async_send_heaps_failure_token)
{
thread_pool tp;
auto queue = std::make_shared<inproc_queue>();
spead2::send::inproc_stream stream(tp, {queue}, spead2::send::stream_config().set_max_heaps(1));
std::array<spead2::send::heap, 2> heaps;

std::promise<item_pointer_t> promise;
auto future = promise.get_future();
bool result = stream.async_send_heaps(
heaps.begin(), heaps.end(),
[&promise](const boost::system::error_code &ec, item_pointer_t bytes_transferred)
{
if (ec)
promise.set_exception(std::make_exception_ptr(boost::system::system_error(ec)));
else
promise.set_value(bytes_transferred);
},
spead2::send::group_mode::SERIAL
);
BOOST_CHECK_EQUAL(result, false);
BOOST_CHECK_EXCEPTION(future.get(), boost::system::system_error, is_would_block);
}

BOOST_AUTO_TEST_SUITE_END() // completion
BOOST_AUTO_TEST_SUITE_END() // send

} // namespace spead2::unittest

0 comments on commit 4cae56d

Please sign in to comment.