LCOV - code coverage report
Current view: top level - capy - when_any.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 99.3 % 147 146 1
Test Date: 2026-03-02 22:33:00 Functions: 90.6 % 556 504 52

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Michael Vandeberg
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_CAPY_WHEN_ANY_HPP
      11                 : #define BOOST_CAPY_WHEN_ANY_HPP
      12                 : 
      13                 : #include <boost/capy/detail/config.hpp>
      14                 : #include <boost/capy/concept/executor.hpp>
      15                 : #include <boost/capy/concept/io_awaitable.hpp>
      16                 : #include <coroutine>
      17                 : #include <boost/capy/ex/executor_ref.hpp>
      18                 : #include <boost/capy/ex/frame_allocator.hpp>
      19                 : #include <boost/capy/ex/io_env.hpp>
      20                 : #include <boost/capy/task.hpp>
      21                 : 
      22                 : #include <array>
      23                 : #include <atomic>
      24                 : #include <exception>
      25                 : #include <optional>
      26                 : #include <ranges>
      27                 : #include <stdexcept>
      28                 : #include <stop_token>
      29                 : #include <tuple>
      30                 : #include <type_traits>
      31                 : #include <utility>
      32                 : #include <variant>
      33                 : #include <vector>
      34                 : 
      35                 : /*
      36                 :    when_any - Race multiple tasks, return first completion
      37                 :    ========================================================
      38                 : 
      39                 :    OVERVIEW:
      40                 :    ---------
      41                 :    when_any launches N tasks concurrently and completes when the FIRST task
      42                 :    finishes (success or failure). It then requests stop for all siblings and
      43                 :    waits for them to acknowledge before returning.
      44                 : 
      45                 :    ARCHITECTURE:
      46                 :    -------------
      47                 :    The design mirrors when_all but with inverted completion semantics:
      48                 : 
      49                 :      when_all:  complete when remaining_count reaches 0 (all done)
      50                 :      when_any:  complete when has_winner becomes true (first done)
      51                 :                 BUT still wait for remaining_count to reach 0 for cleanup
      52                 : 
      53                 :    Key components:
      54                 :      - when_any_state:    Shared state tracking winner and completion
      55                 :      - when_any_runner:   Wrapper coroutine for each child task
      56                 :      - when_any_launcher: Awaitable that starts all runners concurrently
      57                 : 
      58                 :    CRITICAL INVARIANTS:
      59                 :    --------------------
      60                 :    1. Exactly one task becomes the winner (via atomic compare_exchange)
      61                 :    2. All tasks must complete before parent resumes (cleanup safety)
      62                 :    3. Stop is requested immediately when winner is determined
      63                 :    4. Only the winner's result/exception is stored
      64                 : 
      65                 :    POSITIONAL VARIANT:
      66                 :    -------------------
      67                 :    The variadic overload returns a std::variant with one alternative per
      68                 :    input task, preserving positional correspondence. Use .index() on
      69                 :    the variant to identify which task won.
      70                 : 
      71                 :    Example: when_any(task<int>, task<string>, task<int>)
      72                 :      - Raw types after void->monostate: int, string, int
      73                 :      - Result variant: std::variant<int, string, int>
      74                 :      - variant.index() tells you which task won (0, 1, or 2)
      75                 : 
      76                 :    VOID HANDLING:
      77                 :    --------------
      78                 :    void tasks contribute std::monostate to the variant.
      79                 :    All-void tasks result in: variant<monostate, monostate, monostate>
      80                 : 
      81                 :    MEMORY MODEL:
      82                 :    -------------
      83                 :    Synchronization chain from winner's write to parent's read:
      84                 : 
      85                 :    1. Winner thread writes result_/winner_exception_ (non-atomic)
      86                 :    2. Winner's runner reaches final_suspend → fetch_sub(acq_rel) on remaining_count_
      87                 :    3. Last runner (may be winner or non-winner) reaches final_suspend
      88                 :       → fetch_sub(acq_rel) on remaining_count_, observing the previous value 1
      89                 :    4. Last runner returns caller_env_->executor.dispatch(continuation_) via symmetric transfer
      90                 :    5. Parent coroutine resumes and reads result_/winner_exception_
      91                 : 
      92                 :    Synchronization analysis:
      93                 :    - All fetch_sub operations on remaining_count_ form a release sequence
      94                 :    - Winner's fetch_sub releases; subsequent fetch_sub operations participate
      95                 :      in the modification order of remaining_count_
      96                 :    - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
      97                 :      modification order, establishing happens-before from winner's writes
      98                 :    - Executor dispatch() is expected to provide queue-based synchronization
      99                 :      (release-on-post, acquire-on-execute) completing the chain to parent
     100                 :    - Even inline executors work (same thread = sequenced-before)
     101                 : 
     102                 :    Alternative considered: Adding winner_ready_ atomic (set with release after
     103                 :    storing winner data, acquired before reading) would make synchronization
     104                 :    self-contained and not rely on executor implementation details. Current
     105                 :    approach is correct but requires careful reasoning about release sequences
     106                 :    and executor behavior.
     107                 : 
     108                 :    EXCEPTION SEMANTICS:
     109                 :    --------------------
     110                 :    Unlike when_all (which captures first exception, discards others), when_any
     111                 :    treats exceptions as valid completions. If the winning task threw, that
     112                 :    exception is rethrown. Exceptions from non-winners are silently discarded.
     113                 : */
     114                 : 
     115                 : namespace boost {
     116                 : namespace capy {
     117                 : 
     118                 : namespace detail {
     119                 : 
     /** Map a task result type onto something std::variant can hold.

         A variant cannot have `void` as an alternative, so a `void` result
         is represented by std::monostate. Any other type is used as-is.

         @tparam T Result type of a task; `void` maps to std::monostate.
     */
     template<class T>
     using void_to_monostate_t =
         typename std::conditional<std::is_void<T>::value, std::monostate, T>::type;
     130                 : 
     131                 : // Result variant: one alternative per task, preserving positional
     132                 : // correspondence. Use .index() to identify which task won.
     133                 : // void results become monostate.
     134                 : template<typename T0, typename... Ts>
     135                 : using when_any_variant_t = std::variant<void_to_monostate_t<T0>, void_to_monostate_t<Ts>...>;
     136                 : 
     137                 : /** Core shared state for when_any operations.
     138                 : 
     139                 :     Contains all members and methods common to both heterogeneous (variadic)
     140                 :     and homogeneous (range) when_any implementations. State classes embed
     141                 :     this via composition to avoid CRTP destructor ordering issues.
     142                 : 
     143                 :     @par Thread Safety
     144                 :     Atomic operations protect winner selection and completion count.
     145                 : */
     146                 : struct when_any_core
     147                 : {
     148                 :     std::atomic<std::size_t> remaining_count_;
     149                 :     std::size_t winner_index_{0};
     150                 :     std::exception_ptr winner_exception_;
     151                 :     std::stop_source stop_source_;
     152                 : 
     153                 :     // Bridges parent's stop token to our stop_source
     154                 :     struct stop_callback_fn
     155                 :     {
     156                 :         std::stop_source* source_;
     157 HIT           9 :         void operator()() const noexcept { source_->request_stop(); }
     158                 :     };
     159                 :     using stop_callback_t = std::stop_callback<stop_callback_fn>;
     160                 :     std::optional<stop_callback_t> parent_stop_callback_;
     161                 : 
     162                 :     std::coroutine_handle<> continuation_;
     163                 :     io_env const* caller_env_ = nullptr;
     164                 : 
     165                 :     // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
     166                 :     std::atomic<bool> has_winner_{false};
     167                 : 
     168              65 :     explicit when_any_core(std::size_t count) noexcept
     169              65 :         : remaining_count_(count)
     170                 :     {
     171              65 :     }
     172                 : 
     173                 :     /** Atomically claim winner status; exactly one task succeeds. */
     174             190 :     bool try_win(std::size_t index) noexcept
     175                 :     {
     176             190 :         bool expected = false;
     177             190 :         if(has_winner_.compare_exchange_strong(
     178                 :             expected, true, std::memory_order_acq_rel))
     179                 :         {
     180              65 :             winner_index_ = index;
     181              65 :             stop_source_.request_stop();
     182              65 :             return true;
     183                 :         }
     184             125 :         return false;
     185                 :     }
     186                 : 
     187                 :     /** @pre try_win() returned true. */
     188               8 :     void set_winner_exception(std::exception_ptr ep) noexcept
     189                 :     {
     190               8 :         winner_exception_ = ep;
     191               8 :     }
     192                 : 
     193                 :     // Runners signal completion directly via final_suspend; no member function needed.
     194                 : };
     195                 : 
     196                 : /** Shared state for heterogeneous when_any operation.
     197                 : 
     198                 :     Coordinates winner selection, result storage, and completion tracking
     199                 :     for all child tasks in a when_any operation. Uses composition with
     200                 :     when_any_core for shared functionality.
     201                 : 
     202                 :     @par Lifetime
     203                 :     Allocated on the parent coroutine's frame, outlives all runners.
     204                 : 
     205                 :     @tparam T0 First task's result type.
     206                 :     @tparam Ts Remaining tasks' result types.
     207                 : */
     208                 : template<typename T0, typename... Ts>
     209                 : struct when_any_state
     210                 : {
     211                 :     static constexpr std::size_t task_count = 1 + sizeof...(Ts);
     212                 :     using variant_type = when_any_variant_t<T0, Ts...>;
     213                 : 
     214                 :     when_any_core core_;
     215                 :     std::optional<variant_type> result_;
     216                 :     std::array<std::coroutine_handle<>, task_count> runner_handles_{};
     217                 : 
     218              43 :     when_any_state()
     219              43 :         : core_(task_count)
     220                 :     {
     221              43 :     }
     222                 : 
     223                 :     // Runners self-destruct in final_suspend. No destruction needed here.
     224                 : 
     225                 :     /** @pre core_.try_win() returned true.
     226                 :         @note Uses in_place_index (not type) for positional variant access.
     227                 :     */
     228                 :     template<std::size_t I, typename T>
     229              35 :     void set_winner_result(T value)
     230                 :         noexcept(std::is_nothrow_move_constructible_v<T>)
     231                 :     {
     232              35 :         result_.emplace(std::in_place_index<I>, std::move(value));
     233              35 :     }
     234                 : 
     235                 :     /** @pre core_.try_win() returned true. */
     236                 :     template<std::size_t I>
     237               3 :     void set_winner_void() noexcept
     238                 :     {
     239               3 :         result_.emplace(std::in_place_index<I>, std::monostate{});
     240               3 :     }
     241                 : };
     242                 : 
     243                 : /** Wrapper coroutine that runs a single child task for when_any.
     244                 : 
     245                 :     Propagates executor/stop_token to the child, attempts to claim winner
     246                 :     status on completion, and signals completion for cleanup coordination.
     247                 : 
                               @par Lifetime
                               The coroutine frame destroys itself inside the final_suspend
                               awaiter. This wrapper object declares no destructor and never
                               destroys the handle it holds; callers take the handle with
                               release().

     248                 :     @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
     249                 : */
     250                 : template<typename StateType>
     251                 : struct when_any_runner
     252                 : {
     253                 :     struct promise_type // : frame_allocating_base  // DISABLED FOR TESTING
     254                 :     {
                                   // Shared when_any state; launch_one() assigns it before the
                                   // coroutine is started.
     255                 :         StateType* state_ = nullptr;
                                   // Position of this runner's task; passed to try_win() when the
                                   // task ends with an exception.
     256                 :         std::size_t index_ = 0;
                                   // Environment whose address is handed to every awaited child
                                   // (see transform_awaiter::await_suspend).
     257                 :         io_env env_;
     258                 : 
                                   // Wraps the handle of this promise's coroutine in a when_any_runner.
     259             190 :         when_any_runner get_return_object() noexcept
     260                 :         {
     261             190 :             return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
     262                 :         }
     263                 : 
     264                 :         // Starts suspended; launch_one() fills in state_/index_/env_ and then posts the handle
     265             190 :         std::suspend_always initial_suspend() noexcept
     266                 :         {
     267             190 :             return {};
     268                 :         }
     269                 : 
                                   // Always suspends at the end. The awaiter destroys this runner's
                                   // frame, decrements remaining_count_, and - if this was the last
                                   // runner - returns the handle produced by dispatching the parent
                                   // continuation on the caller's executor.
     270             190 :         auto final_suspend() noexcept
     271                 :         {
     272                 :             struct awaiter
     273                 :             {
     274                 :                 promise_type* p_;
     275             190 :                 bool await_ready() const noexcept { return false; }
     276             190 :                 std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept
     277                 :                 {
     278                 :                     // Extract everything needed before self-destruction.
     279             190 :                     auto& core = p_->state_->core_;
     280             190 :                     auto* counter = &core.remaining_count_;
     281             190 :                     auto* caller_env = core.caller_env_;
     282             190 :                     auto cont = core.continuation_;
     283                 : 
                                               // After this call only the locals copied above are
                                               // used; p_ is not touched again.
     284             190 :                     h.destroy();
     285                 : 
     286                 :                     // If last runner, dispatch parent for symmetric transfer.
                                               // fetch_sub returns the value held before the decrement,
                                               // so 1 means no other runner is outstanding.
     287             190 :                     auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
     288             190 :                     if(remaining == 1)
     289              65 :                         return caller_env->executor.dispatch(cont);
     290             125 :                     return std::noop_coroutine();
     291                 :                 }
                                           // Not expected to run: await_suspend destroys the frame, so
                                           // the coroutine is never resumed from this suspend point.
     292 MIS           0 :                 void await_resume() const noexcept {}
     293                 :             };
     294 HIT         190 :             return awaiter{this};
     295                 :         }
     296                 : 
                                   // Normal completion needs no work here; the coroutine body has
                                   // already attempted try_win() itself.
     297             178 :         void return_void() noexcept {}
     298                 : 
     299                 :         // Exceptions are valid completions in when_any (unlike when_all)
                                   // If this runner wins, the in-flight exception becomes the
                                   // winner's exception; otherwise it is dropped.
     300              12 :         void unhandled_exception()
     301                 :         {
     302              12 :             if(state_->core_.try_win(index_))
     303               8 :                 state_->core_.set_winner_exception(std::current_exception());
     304              12 :         }
     305                 : 
     306                 :         /** Injects executor and stop token into child awaitables. */
                                   // Holds a decayed copy of the awaited object. await_ready and
                                   // await_resume forward unchanged; await_suspend additionally
                                   // passes the address of the promise's env_.
     307                 :         template<class Awaitable>
     308                 :         struct transform_awaiter
     309                 :         {
     310                 :             std::decay_t<Awaitable> a_;
     311                 :             promise_type* p_;
     312                 : 
     313             190 :             bool await_ready() { return a_.await_ready(); }
     314             190 :             auto await_resume() { return a_.await_resume(); }
     315                 : 
     316                 :             template<class Promise>
     317             185 :             auto await_suspend(std::coroutine_handle<Promise> h)
     318                 :             {
                                           // MSVC only: when the wrapped await_suspend returns
                                           // std::coroutine_handle<>, that handle is resumed right
                                           // here and this function returns void instead of handing
                                           // the handle back. NOTE(review): presumably a workaround
                                           // for an MSVC symmetric-transfer problem - confirm.
     319                 : #ifdef _MSC_VER
     320                 :                 using R = decltype(a_.await_suspend(h, &p_->env_));
     321                 :                 if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
     322                 :                     a_.await_suspend(h, &p_->env_).resume();
     323                 :                 else
     324                 :                     return a_.await_suspend(h, &p_->env_);
     325                 : #else
     326             185 :                 return a_.await_suspend(h, &p_->env_);
     327                 : #endif
     328                 :             }
     329                 :         };
     330                 : 
                                   // Wraps every co_await operand that models IoAwaitable in a
                                   // transform_awaiter. Any other operand is a compile-time error.
     331                 :         template<class Awaitable>
     332             190 :         auto await_transform(Awaitable&& a)
     333                 :         {
     334                 :             using A = std::decay_t<Awaitable>;
     335                 :             if constexpr (IoAwaitable<A>)
     336                 :             {
     337                 :                 return transform_awaiter<Awaitable>{
     338             380 :                     std::forward<Awaitable>(a), this};
     339                 :             }
     340                 :             else
     341                 :             {
                                               // sizeof(A) == 0 is never true, so the assertion fires
                                               // only when this branch is instantiated.
     342                 :                 static_assert(sizeof(A) == 0, "requires IoAwaitable");
     343                 :             }
     344             190 :         }
     345                 :     };
     346                 : 
                               // Handle of the runner coroutine; null after release() or after
                               // being moved from.
     347                 :     std::coroutine_handle<promise_type> h_;
     348                 : 
     349             190 :     explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
     350             190 :         : h_(h)
     351                 :     {
     352             190 :     }
     353                 : 
     354                 :     // Enable move for all clang versions - some versions need it
                               // The moved-from runner is left holding a null handle.
     355                 :     when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
     356                 : 
     357                 :     // Non-copyable
     358                 :     when_any_runner(when_any_runner const&) = delete;
     359                 :     when_any_runner& operator=(when_any_runner const&) = delete;
     360                 :     when_any_runner& operator=(when_any_runner&&) = delete;
     361                 : 
                               // Returns the coroutine handle and leaves h_ null.
     362             190 :     auto release() noexcept
     363                 :     {
     364             190 :         return std::exchange(h_, nullptr);
     365                 :     }
     366                 : };
     367                 : 
     368                 : /** Indexed overload for heterogeneous when_any (compile-time index).
     369                 : 
     370                 :     Uses compile-time index I for variant construction via in_place_index.
     371                 :     Called from when_any_launcher::launch_one<I>().
     372                 : */
     373                 : template<std::size_t I, IoAwaitable Awaitable, typename StateType>
     374                 : when_any_runner<StateType>
     375             105 : make_when_any_runner(Awaitable inner, StateType* state)
     376                 : {
     377                 :     using T = awaitable_result_t<Awaitable>;
     378                 :     if constexpr (std::is_void_v<T>)
     379                 :     {
     380                 :         co_await std::move(inner);
     381                 :         if(state->core_.try_win(I))
     382                 :             state->template set_winner_void<I>();
     383                 :     }
     384                 :     else
     385                 :     {
     386                 :         auto result = co_await std::move(inner);
     387                 :         if(state->core_.try_win(I))
     388                 :         {
     389                 :             try
     390                 :             {
     391                 :                 state->template set_winner_result<I>(std::move(result));
     392                 :             }
     393                 :             catch(...)
     394                 :             {
     395                 :                 state->core_.set_winner_exception(std::current_exception());
     396                 :             }
     397                 :         }
     398                 :     }
     399             210 : }
     400                 : 
     401                 : /** Runtime-index overload for homogeneous when_any (range path).
     402                 : 
     403                 :     Uses requires-expressions to detect state capabilities:
     404                 :     - set_winner_void(): for heterogeneous void tasks (stores monostate)
     405                 :     - set_winner_result(): for non-void tasks
     406                 :     - Neither: for homogeneous void tasks (no result storage)
     407                 : */
     408                 : template<IoAwaitable Awaitable, typename StateType>
     409                 : when_any_runner<StateType>
     410              85 : make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
     411                 : {
     412                 :     using T = awaitable_result_t<Awaitable>;
     413                 :     if constexpr (std::is_void_v<T>)
     414                 :     {
     415                 :         co_await std::move(inner);
     416                 :         if(state->core_.try_win(index))
     417                 :         {
     418                 :             if constexpr (requires { state->set_winner_void(); })
     419                 :                 state->set_winner_void();
     420                 :         }
     421                 :     }
     422                 :     else
     423                 :     {
     424                 :         auto result = co_await std::move(inner);
     425                 :         if(state->core_.try_win(index))
     426                 :         {
     427                 :             try
     428                 :             {
     429                 :                 state->set_winner_result(std::move(result));
     430                 :             }
     431                 :             catch(...)
     432                 :             {
     433                 :                 state->core_.set_winner_exception(std::current_exception());
     434                 :             }
     435                 :         }
     436                 :     }
     437             170 : }
     438                 : 
     439                 : /** Launches all runners concurrently; see await_suspend for lifetime concerns.

                               Holds non-owning pointers to the tuple of awaitables and to the
                               shared state. Awaiting it records the parent continuation, links
                               the parent's stop token to the shared stop_source, creates one
                               runner per awaitable and posts each to the caller's executor.
                           */
     440                 : template<IoAwaitable... Awaitables>
     441                 : class when_any_launcher
     442                 : {
     443                 :     using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
     444                 : 
                               // Awaitables to run; each element is moved from by launch_one().
     445                 :     std::tuple<Awaitables...>* tasks_;
                               // Shared state used by every runner.
     446                 :     state_type* state_;
     447                 : 
     448                 : public:
     449              43 :     when_any_launcher(
     450                 :         std::tuple<Awaitables...>* tasks,
     451                 :         state_type* state)
     452              43 :         : tasks_(tasks)
     453              43 :         , state_(state)
     454                 :     {
     455              43 :     }
     456                 : 
                               // Ready (no suspension) only for an empty pack of awaitables.
     457              43 :     bool await_ready() const noexcept
     458                 :     {
     459              43 :         return sizeof...(Awaitables) == 0;
     460                 :     }
     461                 : 
     462                 :     /** CRITICAL: If the last task finishes synchronously, parent resumes and
     463                 :         destroys this object before await_suspend returns. Must not reference
     464                 :         `this` after the final launch_one call.

                                   @param continuation Parent coroutine; stored in the core and
                                          dispatched by the last runner's final_suspend.
                                   @param caller_env   Parent environment; supplies the executor,
                                          stop token and frame allocator used for the runners.
                                   @return std::noop_coroutine(), so the parent stays suspended.
     465                 :     */
     466              43 :     std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
     467                 :     {
     468              43 :         state_->core_.continuation_ = continuation;
     469              43 :         state_->core_.caller_env_ = caller_env;
     470                 : 
                                   // Bridge the parent's stop token to the shared stop_source, and
                                   // request stop right away if the parent is already stopped.
     471              43 :         if(caller_env->stop_token.stop_possible())
     472                 :         {
     473              18 :             state_->core_.parent_stop_callback_.emplace(
     474               9 :                 caller_env->stop_token,
     475               9 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
     476                 : 
     477               9 :             if(caller_env->stop_token.stop_requested())
     478               3 :                 state_->core_.stop_source_.request_stop();
     479                 :         }
     480                 : 
                                   // Every runner receives a token from the shared stop_source.
                                   // The fold over the comma operator calls launch_one<0>,
                                   // launch_one<1>, ... in index order.
     481              43 :         auto token = state_->core_.stop_source_.get_token();
     482              86 :         [&]<std::size_t... Is>(std::index_sequence<Is...>) {
     483              43 :             (..., launch_one<Is>(caller_env->executor, token));
     484              43 :         }(std::index_sequence_for<Awaitables...>{});
     485                 : 
     486              86 :         return std::noop_coroutine();
     487              43 :     }
     488                 : 
                               // Produces nothing; the winner's outcome is read from the shared state.
     489              43 :     void await_resume() const noexcept
     490                 :     {
     491              43 :     }
     492                 : 
     493                 : private:
     494                 :     /** @pre caller_ex.post() must not throw: the handle has already been released from its runner object, so the frame would leak. */
                               // Creates the runner for the I-th awaitable (moved out of the
                               // tuple), fills in its promise (state, index, env built from the
                               // caller's executor, the shared token and the caller's frame
                               // allocator), records the handle in runner_handles_[I] and posts
                               // it to the executor.
     495                 :     template<std::size_t I>
     496             105 :     void launch_one(executor_ref caller_ex, std::stop_token token)
     497                 :     {
     498             105 :         auto runner = make_when_any_runner<I>(
     499             105 :             std::move(std::get<I>(*tasks_)), state_);
     500                 : 
     501             105 :         auto h = runner.release();
     502             105 :         h.promise().state_ = state_;
     503             105 :         h.promise().index_ = I;
     504             105 :         h.promise().env_ = io_env{caller_ex, token, state_->core_.caller_env_->frame_allocator};
     505                 : 
     506             105 :         std::coroutine_handle<> ch{h};
     507             105 :         state_->runner_handles_[I] = ch;
                                   // Last statement on purpose: once the final runner is posted
                                   // this launcher may already be gone.
     508             105 :         caller_ex.post(ch);
     509             210 :     }
     510                 : };
     511                 : 
     512                 : } // namespace detail
     513                 : 
     514                 : /** Wait for the first awaitable to complete.
     515                 : 
     516                 :     Races multiple heterogeneous awaitables concurrently and returns when the
     517                 :     first one completes. The result is a variant with one alternative per
     518                 :     input task, preserving positional correspondence.
     519                 : 
     520                 :     @par Suspends
     521                 :     The calling coroutine suspends when co_await is invoked. All awaitables
     522                 :     are launched concurrently and execute in parallel. The coroutine resumes
     523                 :     only after all awaitables have completed, even though the winner is
     524                 :     determined by the first to finish.
     525                 : 
     526                 :     @par Completion Conditions
     527                 :     @li Winner is determined when the first awaitable completes (success or exception)
     528                 :     @li Only one task can claim winner status via atomic compare-exchange
     529                 :     @li Once a winner exists, stop is requested for all remaining siblings
     530                 :     @li Parent coroutine resumes only after all siblings acknowledge completion
     531                 :     @li The winner's result is returned; if the winner threw, the exception is rethrown
     532                 : 
     533                 :     @par Cancellation Semantics
     534                 :     Cancellation is supported via stop_token propagated through the
     535                 :     IoAwaitable protocol:
     536                 :     @li Each child awaitable receives a stop_token derived from a shared stop_source
     537                 :     @li When the parent's stop token is activated, the stop is forwarded to all children
     538                 :     @li When a winner is determined, stop_source_.request_stop() is called immediately
     539                 :     @li Siblings must handle cancellation gracefully and complete before parent resumes
     540                 :     @li Stop requests are cooperative; tasks must check and respond to them
     541                 : 
     542                 :     @par Concurrency/Overlap
     543                 :     All awaitables are launched concurrently before any can complete.
     544                 :     The launcher iterates through the arguments, starting each task on the
     545                 :     caller's executor. Tasks may execute in parallel on multi-threaded
     546                 :     executors or interleave on single-threaded executors. There is no
     547                 :     guaranteed ordering of task completion.
     548                 : 
     549                 :     @par Notable Error Conditions
     550                 :     @li Winner exception: if the winning task threw, that exception is rethrown
     551                 :     @li Non-winner exceptions: silently discarded (only winner's result matters)
     552                 :     @li Cancellation: tasks may complete via cancellation without throwing
     553                 : 
     554                 :     @par Example
     555                 :     @code
     556                 :     task<void> example() {
     557                 :         auto result = co_await when_any(
     558                 :             fetch_int(),      // task<int>
     559                 :             fetch_string()    // task<std::string>
     560                 :         );
     561                 :         // result.index() is 0 or 1
     562                 :         if (result.index() == 0)
     563                 :             std::cout << "Got int: " << std::get<0>(result) << "\n";
     564                 :         else
     565                 :             std::cout << "Got string: " << std::get<1>(result) << "\n";
     566                 :     }
     567                 :     @endcode
     568                 : 
     569                 :     @tparam A0 First awaitable type (must satisfy IoAwaitable).
     570                 :     @tparam As Remaining awaitable types (must satisfy IoAwaitable).
     571                 :     @param a0 The first awaitable to race.
     572                 :     @param as Additional awaitables to race concurrently.
     573                 :     @return A task yielding a variant with one alternative per awaitable.
     574                 :         Use .index() to identify the winner. Void awaitables contribute
     575                 :         std::monostate.
     576                 : 
     577                 :     @throws Rethrows the winner's exception if the winning task threw an exception.
     578                 : 
     579                 :     @par Remarks
     580                 :     Awaitables are moved into the coroutine frame; original objects become
     581                 :     empty after the call. The variant preserves one alternative per input
     582                 :     task. Use .index() to determine which awaitable completed first.
     583                 :     Void awaitables contribute std::monostate to the variant.
     584                 : 
     585                 :     @see when_all, IoAwaitable
     586                 : */
     587                 : template<IoAwaitable A0, IoAwaitable... As>
     588              43 : [[nodiscard]] auto when_any(A0 a0, As... as)
     589                 :     -> task<detail::when_any_variant_t<
     590                 :         detail::awaitable_result_t<A0>,
     591                 :         detail::awaitable_result_t<As>...>>
     592                 : {
     593                 :     detail::when_any_state<
     594                 :         detail::awaitable_result_t<A0>,
     595                 :         detail::awaitable_result_t<As>...> state; // shared state is a local of this coroutine
     596                 :     std::tuple<A0, As...> awaitable_tuple(std::move(a0), std::move(as)...); // by-value parameters are moved into one local tuple
     597                 : 
     598                 :     co_await detail::when_any_launcher<A0, As...>(&awaitable_tuple, &state); // launcher only receives pointers to the two locals above
     599                 : 
     600                 :     if(state.core_.winner_exception_) // a recorded winner exception is rethrown instead of returning a result
     601                 :         std::rethrow_exception(state.core_.winner_exception_);
     602                 : 
     603                 :     co_return std::move(*state.result_); // dereferenced unchecked - NOTE(review): presumably always set when no winner exception was recorded; confirm in when_any_state
     604              86 : }
     605                 : 
     606                 : /** Concept for ranges of full I/O awaitables.
     607                 : 
     608                 :     A range satisfies `IoAwaitableRange` if it is a sized input range
     609                 :     whose value type satisfies @ref IoAwaitable. This enables when_any
     610                 :     to accept any container or view of awaitables, not just std::vector.
     611                 : 
     612                 :     @tparam R The range type.
     613                 : 
     614                 :     @par Requirements
     615                 :     @li `R` must satisfy `std::ranges::input_range`
     616                 :     @li `R` must satisfy `std::ranges::sized_range`
     617                 :     @li `std::ranges::range_value_t<R>` must satisfy @ref IoAwaitable
     618                 : 
     619                 :     @par Syntactic Requirements
     620                 :     Given `r` of type `R`:
     621                 :     @li `std::ranges::begin(r)` is valid
     622                 :     @li `std::ranges::end(r)` is valid
     623                 :     @li `std::ranges::size(r)` returns `std::ranges::range_size_t<R>`
     624                 :     @li `std::ranges::range_value_t<R>` (the value type behind `*std::ranges::begin(r)`) satisfies @ref IoAwaitable
     625                 : 
     626                 :     @par Example
     627                 :     @code
     628                 :     template<IoAwaitableRange R>
     629                 :     task<void> race_all(R&& awaitables) {
     630                 :         auto winner = co_await when_any(std::forward<R>(awaitables));
     631                 :         // Process winner...
     632                 :     }
     633                 :     @endcode
     634                 : 
     635                 :     @see when_any, IoAwaitable
     636                 : */
     637                 : template<typename R>
     638                 : concept IoAwaitableRange =
     639                 :     std::ranges::input_range<R> &&
     640                 :     std::ranges::sized_range<R> &&
     641                 :     IoAwaitable<std::ranges::range_value_t<R>>; // checked on the range's value type, not on the iterator's reference type
     642                 : 
     643                 : namespace detail {
     644                 : 
     645                 : /** Shared state for homogeneous when_any (range overload).
     646                 : 
     647                 :     Uses composition with when_any_core for shared functionality.
     648                 :     Simpler than heterogeneous: optional<T> instead of variant, vector
     649                 :     instead of array for runner handles.
     650                 : */
     651                 : template<typename T>
     652                 : struct when_any_homogeneous_state
     653                 : {
     654                 :     when_any_core core_;
     655                 :     std::optional<T> result_; // empty until set_winner_result() stores a value
     656                 :     std::vector<std::coroutine_handle<>> runner_handles_; // one slot per runner; filled by the launcher
     657                 : 
     658              19 :     explicit when_any_homogeneous_state(std::size_t count) // count: number of awaitables being raced
     659              19 :         : core_(count)
     660              38 :         , runner_handles_(count) // `count` value-initialized (null) handles
     661                 :     {
     662              19 :     }
     663                 : 
     664                 :     // Runners self-destruct in final_suspend. No destruction needed here.
     665                 : 
     666                 :     /** Stores the winner's value in result_. @pre core_.try_win() returned true. */
     667              17 :     void set_winner_result(T value)
     668                 :         noexcept(std::is_nothrow_move_constructible_v<T>)
     669                 :     {
     670              17 :         result_.emplace(std::move(value)); // move-constructs the stored T from the by-value argument
     671              17 :     }
     672                 : };
     673                 : 
     674                 : /** Specialization for void tasks (no result storage needed): only the core and the runner handles are kept. */
     675                 : template<>
     676                 : struct when_any_homogeneous_state<void>
     677                 : {
     678                 :     when_any_core core_;
     679                 :     std::vector<std::coroutine_handle<>> runner_handles_; // one slot per runner; filled by the launcher
     680                 : 
     681               3 :     explicit when_any_homogeneous_state(std::size_t count) // count: number of awaitables being raced
     682               3 :         : core_(count)
     683               6 :         , runner_handles_(count) // `count` value-initialized (null) handles
     684                 :     {
     685               3 :     }
     686                 : 
     687                 :     // Runners self-destruct in final_suspend. No destruction needed here.
     688                 : 
     689                 :     // No set_winner_result - void tasks have no result to store
     690                 : };
     691                 : 
     692                 : /** Awaitable that creates one runner per range element and posts them all to the caller's executor; see await_suspend for lifetime concerns. */
     693                 : template<IoAwaitableRange Range>
     694                 : class when_any_homogeneous_launcher
     695                 : {
     696                 :     using Awaitable = std::ranges::range_value_t<Range>;
     697                 :     using T = awaitable_result_t<Awaitable>;
     698                 : 
     699                 :     Range* range_; // not owned; points at the range the awaitables are moved out of
     700                 :     when_any_homogeneous_state<T>* state_; // not owned; shared state written by this launcher
     701                 : 
     702                 : public:
     703              22 :     when_any_homogeneous_launcher(
     704                 :         Range* range,
     705                 :         when_any_homogeneous_state<T>* state)
     706              22 :         : range_(range)
     707              22 :         , state_(state)
     708                 :     {
     709              22 :     }
     710                 : 
     711              22 :     bool await_ready() const noexcept
     712                 :     {
     713              22 :         return std::ranges::empty(*range_); // an empty range reports ready, so await_suspend is skipped
     714                 :     }
     715                 : 
     716                 :     /** CRITICAL: If the last task finishes synchronously, parent resumes and
     717                 :         destroys this object before await_suspend returns. Must not reference
     718                 :         `this` after posting begins.
     719                 : 
     720                 :         Two-phase approach:
     721                 :         1. Create all runners (safe - nothing has been posted yet)
     722                 :         2. Post all runners (any may complete synchronously)
     723                 :     */
     724              22 :     std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
     725                 :     {
     726              22 :         state_->core_.continuation_ = continuation;
     727              22 :         state_->core_.caller_env_ = caller_env;
     728                 : 
     729              22 :         if(caller_env->stop_token.stop_possible()) // hook the parent token only when it can ever fire
     730                 :         {
     731              14 :             state_->core_.parent_stop_callback_.emplace(
     732               7 :                 caller_env->stop_token,
     733               7 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
     734                 : 
     735               7 :             if(caller_env->stop_token.stop_requested()) // parent already stopped: forward the request right away
     736               4 :                 state_->core_.stop_source_.request_stop();
     737                 :         }
     738                 : 
     739              22 :         auto token = state_->core_.stop_source_.get_token(); // every runner's env gets a copy of this token
     740                 : 
     741                 :         // Phase 1: Create all runners without dispatching.
     742                 :         // This iterates over *range_ safely because no runners execute yet.
     743              22 :         std::size_t index = 0;
     744             107 :         for(auto&& a : *range_)
     745                 :         {
     746              85 :             auto runner = make_when_any_runner(
     747              85 :                 std::move(a), state_, index); // the element is moved out of the range
     748                 : 
     749              85 :             auto h = runner.release();
     750              85 :             h.promise().state_ = state_;
     751              85 :             h.promise().index_ = index;
     752              85 :             h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator}; // caller's executor and allocator, shared stop token
     753                 : 
     754              85 :             state_->runner_handles_[index] = std::coroutine_handle<>{h}; // assumes the table holds at least as many slots as the range yields
     755              85 :             ++index;
     756                 :         }
     757                 : 
     758                 :         // Phase 2: Post all runners. Any may complete synchronously.
     759                 :         // After last post, state_ and this may be destroyed.
     760                 :         // Use raw pointer/count captured before posting.
     761              22 :         std::coroutine_handle<>* handles = state_->runner_handles_.data();
     762              22 :         std::size_t count = state_->runner_handles_.size();
     763             107 :         for(std::size_t i = 0; i < count; ++i)
     764              85 :             caller_env->executor.post(handles[i]); // touches only locals and the caller_env parameter, never `this`
     765                 : 
     766              44 :         return std::noop_coroutine(); // nothing is resumed inline; the runners were handed to the executor
     767             107 :     }
     768                 : 
     769              22 :     void await_resume() const noexcept
     770                 :     {
     771              22 :     }
     772                 : };
     773                 : 
     774                 : } // namespace detail
     775                 : 
     776                 : /** Wait for the first awaitable to complete (range overload).
     777                 : 
     778                 :     Races a range of awaitables with the same result type. Accepts any
     779                 :     sized input range of IoAwaitable types, enabling use with arrays,
     780                 :     spans, or custom containers.
     781                 : 
     782                 :     @par Suspends
     783                 :     The calling coroutine suspends when co_await is invoked. All awaitables
     784                 :     in the range are launched together and may run in parallel or interleaved,
     785                 :     depending on the executor. The coroutine resumes only after all awaitables
     786                 :     have completed, even though the winner is determined by the first to finish.
     787                 : 
     788                 :     @par Completion Conditions
     789                 :     @li Winner is determined when the first awaitable completes (success or exception)
     790                 :     @li Only one task can claim winner status via atomic compare-exchange
     791                 :     @li Once a winner exists, stop is requested for all remaining siblings
     792                 :     @li Parent coroutine resumes only after all siblings acknowledge completion
     793                 :     @li The winner's index and result are returned; if the winner threw, the exception is rethrown
     794                 : 
     795                 :     @par Cancellation Semantics
     796                 :     Cancellation is supported via stop_token propagated through the
     797                 :     IoAwaitable protocol:
     798                 :     @li Each child awaitable receives a stop_token derived from a shared stop_source
     799                 :     @li When the parent's stop token is activated, the stop is forwarded to all children
     800                 :     @li When a winner is determined, stop_source_.request_stop() is called immediately
     801                 :     @li Siblings must handle cancellation gracefully and complete before parent resumes
     802                 :     @li Stop requests are cooperative; tasks must check and respond to them
     803                 : 
     804                 :     @par Concurrency/Overlap
     805                 :     All awaitables are launched concurrently before any can complete.
     806                 :     The launcher iterates through the range, starting each task on the
     807                 :     caller's executor. Tasks may execute in parallel on multi-threaded
     808                 :     executors or interleave on single-threaded executors. There is no
     809                 :     guaranteed ordering of task completion.
     810                 : 
     811                 :     @par Notable Error Conditions
     812                 :     @li Empty range: std::invalid_argument is thrown from the coroutine body before any awaitable is launched
     813                 :     @li Winner exception: if the winning task threw, that exception is rethrown
     814                 :     @li Non-winner exceptions: silently discarded (only winner's result matters)
     815                 :     @li Cancellation: tasks may complete via cancellation without throwing
     816                 : 
     817                 :     @par Example
     818                 :     @code
     819                 :     task<void> example() {
     820                 :         std::array<task<Response>, 3> requests = {
     821                 :             fetch_from_server(0),
     822                 :             fetch_from_server(1),
     823                 :             fetch_from_server(2)
     824                 :         };
     825                 : 
     826                 :         auto [index, response] = co_await when_any(std::move(requests));
     827                 :     }
     828                 :     @endcode
     829                 : 
     830                 :     @par Example with Vector
     831                 :     @code
     832                 :     task<Response> fetch_fastest(std::vector<Server> const& servers) {
     833                 :         std::vector<task<Response>> requests;
     834                 :         for (auto const& server : servers)
     835                 :             requests.push_back(fetch_from(server));
     836                 : 
     837                 :         auto [index, response] = co_await when_any(std::move(requests));
     838                 :         co_return response;
     839                 :     }
     840                 :     @endcode
     841                 : 
     842                 :     @tparam R Range type satisfying IoAwaitableRange.
     843                 :     @param awaitables Range of awaitables to race concurrently (must not be empty).
     844                 :     @return A task yielding a pair of (winner_index, result).
     845                 : 
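     846                 :     @throws std::invalid_argument if range is empty (thrown inside the coroutine body before the launcher is awaited; NOTE(review): a throw in a coroutine body is routed through task's promise, so confirm when the caller observes it).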
     847                 :     @throws Rethrows the winner's exception if the winning task threw an exception.
     848                 : 
     849                 :     @par Remarks
     850                 :     Awaitables are moved out of a range stored on the coroutine frame. An
     851                 :     rvalue range is moved there; an lvalue range is copied there, so an
     852                 :     owning lvalue container keeps its elements (a non-owning view such as a
     853                 :     span still exposes the caller's elements to the move). Unlike the variadic
     854                 :     overload, no variant wrapper is needed since all tasks share the same return type.
     855                 : 
     856                 :     @see when_any, IoAwaitableRange
     857                 : */
     858                 : template<IoAwaitableRange R>
     859                 :     requires (!std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>)
     860              21 : [[nodiscard]] auto when_any(R&& awaitables) // NOTE(review): reference parameter of a coroutine; the referenced range must still be alive when the body first runs
     861                 :     -> task<std::pair<std::size_t, detail::awaitable_result_t<std::ranges::range_value_t<R>>>>
     862                 : {
     863                 :     using Awaitable = std::ranges::range_value_t<R>;
     864                 :     using T = detail::awaitable_result_t<Awaitable>;
     865                 :     using result_type = std::pair<std::size_t, T>;
     866                 :     using OwnedRange = std::remove_cvref_t<R>; // value type of the frame-owned range
     867                 : 
     868                 :     auto count = std::ranges::size(awaitables);
     869                 :     if(count == 0)
     870                 :         throw std::invalid_argument("when_any requires at least one awaitable"); // raised inside the coroutine body, before anything is launched
     871                 : 
     872                 :     // Move (rvalue argument) or copy (lvalue argument) the range onto the coroutine frame to ensure lifetime
     873                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
     874                 : 
     875                 :     detail::when_any_homogeneous_state<T> state(count); // one runner-handle slot per awaitable
     876                 : 
     877                 :     co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state); // launcher only receives pointers to the two locals above
     878                 : 
     879                 :     if(state.core_.winner_exception_) // a recorded winner exception is rethrown instead of returning a result
     880                 :         std::rethrow_exception(state.core_.winner_exception_);
     881                 : 
     882                 :     co_return result_type{state.core_.winner_index_, std::move(*state.result_)}; // result_ dereferenced unchecked - NOTE(review): presumably always set when no winner exception was recorded; confirm in the runner
     883              42 : }
     884                 : 
     885                 : /** Wait for the first awaitable to complete (void range overload).
     886                 : 
     887                 :     Races a range of void-returning awaitables. Since void awaitables have
     888                 :     no result value, only the winner's index is returned.
     889                 : 
     890                 :     @par Suspends
     891                 :     The calling coroutine suspends when co_await is invoked. All awaitables
     892                 :     in the range are launched together and may run in parallel or interleaved,
     893                 :     depending on the executor. The coroutine resumes only after all awaitables
     894                 :     have completed, even though the winner is determined by the first to finish.
     895                 : 
     896                 :     @par Completion Conditions
     897                 :     @li Winner is determined when the first awaitable completes (success or exception)
     898                 :     @li Only one task can claim winner status via atomic compare-exchange
     899                 :     @li Once a winner exists, stop is requested for all remaining siblings
     900                 :     @li Parent coroutine resumes only after all siblings acknowledge completion
     901                 :     @li The winner's index is returned; if the winner threw, the exception is rethrown
     902                 : 
     903                 :     @par Cancellation Semantics
     904                 :     Cancellation is supported via stop_token propagated through the
     905                 :     IoAwaitable protocol:
     906                 :     @li Each child awaitable receives a stop_token derived from a shared stop_source
     907                 :     @li When the parent's stop token is activated, the stop is forwarded to all children
     908                 :     @li When a winner is determined, stop_source_.request_stop() is called immediately
     909                 :     @li Siblings must handle cancellation gracefully and complete before parent resumes
     910                 :     @li Stop requests are cooperative; tasks must check and respond to them
     911                 : 
     912                 :     @par Concurrency/Overlap
     913                 :     All awaitables are launched concurrently before any can complete.
     914                 :     The launcher iterates through the range, starting each task on the
     915                 :     caller's executor. Tasks may execute in parallel on multi-threaded
     916                 :     executors or interleave on single-threaded executors. There is no
     917                 :     guaranteed ordering of task completion.
     918                 : 
     919                 :     @par Notable Error Conditions
     920                 :     @li Empty range: std::invalid_argument is thrown from the coroutine body before any awaitable is launched
     921                 :     @li Winner exception: if the winning task threw, that exception is rethrown
     922                 :     @li Non-winner exceptions: silently discarded (only winner's result matters)
     923                 :     @li Cancellation: tasks may complete via cancellation without throwing
     924                 : 
     925                 :     @par Example
     926                 :     @code
     927                 :     task<void> example() {
     928                 :         std::vector<task<void>> tasks;
     929                 :         for (int i = 0; i < 5; ++i)
     930                 :             tasks.push_back(background_work(i));
     931                 : 
     932                 :         std::size_t winner = co_await when_any(std::move(tasks));
     933                 :         // winner is the index of the first task to complete
     934                 :     }
     935                 :     @endcode
     936                 : 
     937                 :     @par Example with Timeout
     938                 :     @code
     939                 :     task<void> with_timeout() {
     940                 :         std::vector<task<void>> tasks;
     941                 :         tasks.push_back(long_running_operation());
     942                 :         tasks.push_back(delay(std::chrono::seconds(5)));
     943                 : 
     944                 :         std::size_t winner = co_await when_any(std::move(tasks));
     945                 :         if (winner == 1) {
     946                 :             // Timeout occurred
     947                 :         }
     948                 :     }
     949                 :     @endcode
     950                 : 
     951                 :     @tparam R Range type satisfying IoAwaitableRange with void result.
     952                 :     @param awaitables Range of void awaitables to race concurrently (must not be empty).
     953                 :     @return A task yielding the winner's index (zero-based).
     954                 : 
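     955                 :     @throws std::invalid_argument if range is empty (thrown inside the coroutine body before the launcher is awaited; NOTE(review): a throw in a coroutine body is routed through task's promise, so confirm when the caller observes it).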
     956                 :     @throws Rethrows the winner's exception if the winning task threw an exception.
     957                 : 
     958                 :     @par Remarks
     959                 :     Awaitables are moved out of a range stored on the coroutine frame. An
     960                 :     rvalue range is moved there; an lvalue range is copied there, so an
     961                 :     owning lvalue container keeps its elements (a non-owning view such as a
     962                 :     span still exposes the caller's elements to the move). Unlike the non-void
     963                 :     overload, no result storage is needed since void tasks produce no value.
     964                 : 
     965                 :     @see when_any, IoAwaitableRange
     966                 : */
     967                 : template<IoAwaitableRange R>
     968                 :     requires std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>
     969               3 : [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t> // NOTE(review): reference parameter of a coroutine; the referenced range must still be alive when the body first runs
     970                 : {
     971                 :     using OwnedRange = std::remove_cvref_t<R>; // value type of the frame-owned range
     972                 : 
     973                 :     auto count = std::ranges::size(awaitables);
     974                 :     if(count == 0)
     975                 :         throw std::invalid_argument("when_any requires at least one awaitable"); // raised inside the coroutine body, before anything is launched
     976                 : 
     977                 :     // Move (rvalue argument) or copy (lvalue argument) the range onto the coroutine frame to ensure lifetime
     978                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
     979                 : 
     980                 :     detail::when_any_homogeneous_state<void> state(count); // one runner-handle slot per awaitable
     981                 : 
     982                 :     co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state); // launcher only receives pointers to the two locals above
     983                 : 
     984                 :     if(state.core_.winner_exception_) // a recorded winner exception is rethrown instead of returning an index
     985                 :         std::rethrow_exception(state.core_.winner_exception_);
     986                 : 
     987                 :     co_return state.core_.winner_index_; // void tasks carry no value; only the winner's position is reported
     988               6 : }
     989                 : 
     990                 : } // namespace capy
     991                 : } // namespace boost
     992                 : 
     993                 : #endif
        

Generated by: LCOV version 2.3