From 23a724cf5c3026155550dc802d283d713fdfd97e Mon Sep 17 00:00:00 2001 From: Rogiel Sulzbach Date: Sat, 9 Sep 2023 23:30:17 -0300 Subject: [PATCH] Ensure that continuables that are resolved immediately are always symmetrically transferable --- .../continuable/detail/other/coroutines.hpp | 25 ++++++- .../multi/test-continuable-await.cpp | 72 +++++++++++++++++++ 2 files changed, 95 insertions(+), 2 deletions(-) diff --git a/include/continuable/detail/other/coroutines.hpp b/include/continuable/detail/other/coroutines.hpp index df15036..e4cf69f 100644 --- a/include/continuable/detail/other/coroutines.hpp +++ b/include/continuable/detail/other/coroutines.hpp @@ -96,6 +96,16 @@ class awaitable { /// A cache which is used to pass the result of the continuation /// to the coroutine. result_t result_; + /// Enumeration that represents the suspension state of the awaitable. + enum class state : std::uint8_t { + suspended, + pending, + resolved, + }; + /// An atomic that specifies whether the awaitable has suspended or not. + /// Allows to perform symmetric transfer on continuables that are + /// immediately resolved. + std::atomic state_{state::pending}; public: explicit constexpr awaitable(Continuable&& continuable) @@ -117,16 +127,27 @@ class awaitable { /// Suspend the current context // TODO Convert this to an r-value function once possible - void await_suspend(coroutine_handle<> h) { + bool await_suspend(coroutine_handle<> h) { assert(result_.is_empty()); // Forward every result to the current awaitable std::move(continuable_) .next([h, this](auto&&... args) mutable { assert(result_.is_empty()); result_ = result_t::from(std::forward(args)...); - h.resume(); + + // If true, it means that the promise was suspended (i.e., the + // awaitable await_suspend method has already returned). That + // means we must call the resume coroutine from the continuation + // chain. + if (state_.exchange(state::resolved, std::memory_order_acq_rel) == + state::suspended) { + return h.resume(); + } }) .done(); + + return state_.exchange(state::suspended, std::memory_order_acq_rel) != + state::resolved; } /// Resume the coroutine represented by the handle diff --git a/test/unit-test/multi/test-continuable-await.cpp b/test/unit-test/multi/test-continuable-await.cpp index 9d63ba4..d262913 100644 --- a/test/unit-test/multi/test-continuable-await.cpp +++ b/test/unit-test/multi/test-continuable-await.cpp @@ -169,6 +169,78 @@ TYPED_TEST(single_dimension_tests, are_awaitable_with_cancellation_from_coro) { ASSERT_ASYNC_CANCELLATION(resolve_coro_canceled(supply)) } +template +cti::continuable<> test_symmetric_transfer(S&& supplier) { + // If symmetric transfer is not working properly, large + // loops will quickly cause stack overflows. + for (size_t index = 0; index < 10000; index++) { + co_await supplier(); + } + co_return; +} + +TYPED_TEST(single_dimension_tests, are_symmetric_transferable) { + auto const& supply = [&]() { + return cti::make_continuable([](auto&& promise) { + promise.set_value(0); + }); + }; + + ASSERT_ASYNC_COMPLETION(test_symmetric_transfer(supply)); +} + +TYPED_TEST(single_dimension_tests, are_symmetric_transferable_type_erased) { + auto const& supply = [&]() -> cti::continuable { + return cti::make_continuable([](auto&& promise) { + promise.set_value(0); + }); + }; + + ASSERT_ASYNC_COMPLETION(test_symmetric_transfer(supply)); +} + +TYPED_TEST(single_dimension_tests, + are_symmetric_transferable_using_make_ready) { + auto const& supply = [&]() { + return cti::make_ready_continuable(0); + }; + + ASSERT_ASYNC_COMPLETION(test_symmetric_transfer(supply)); +} + +TYPED_TEST(single_dimension_tests, + are_symmetric_transferable_using_type_erased_make_ready) { + auto const& supply = [&]() -> cti::continuable { + return cti::make_ready_continuable(0); + }; + + ASSERT_ASYNC_COMPLETION(test_symmetric_transfer(supply)); +} + +TYPED_TEST(single_dimension_tests, are_symmetric_transferable_using_type_erased_from_thread) { + auto const& supply = [&]() -> cti::continuable { + return cti::make_continuable([](auto&& promise) { + std::async(std::launch::async, std::forward(promise), 0); + }); + }; + + ASSERT_ASYNC_COMPLETION(test_symmetric_transfer(supply)); +} + +TYPED_TEST(single_dimension_tests, are_symmetric_transferable_except) { + size_t count = 0; + auto const& supply = [&]() -> cti::continuable { + // NOTE: The symmetric transfer loop does 10000 iterations. + if(++count == 5000) { + return cti::make_exceptional_continuable( + std::make_exception_ptr(std::runtime_error("Failed"))); + } + return cti::make_ready_continuable(0); + }; + + ASSERT_ASYNC_EXCEPTION_COMPLETION(test_symmetric_transfer(supply)); +} + # endif // CONTINUABLE_WITH_NO_EXCEPTIONS #endif // CONTINUABLE_HAS_EXPERIMENTAL_COROUTINE