include/boost/capy/when_any.hpp

99.3% Lines (133/134) 93.0% Functions (452/486)
include/boost/capy/when_any.hpp
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_WHEN_ANY_HPP
11 #define BOOST_CAPY_WHEN_ANY_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/concept/executor.hpp>
15 #include <boost/capy/concept/io_awaitable.hpp>
16 #include <coroutine>
17 #include <boost/capy/ex/executor_ref.hpp>
18 #include <boost/capy/ex/frame_allocator.hpp>
19 #include <boost/capy/ex/io_env.hpp>
20 #include <boost/capy/task.hpp>
21
22 #include <array>
23 #include <atomic>
24 #include <exception>
25 #include <optional>
26 #include <ranges>
27 #include <stdexcept>
28 #include <stop_token>
29 #include <tuple>
30 #include <type_traits>
31 #include <utility>
32 #include <variant>
33 #include <vector>
34
35 /*
36 when_any - Race multiple tasks, return first completion
37 ========================================================
38
39 OVERVIEW:
40 ---------
41 when_any launches N tasks concurrently and completes when the FIRST task
42 finishes (success or failure). It then requests stop for all siblings and
43 waits for them to acknowledge before returning.
44
45 ARCHITECTURE:
46 -------------
47 The design mirrors when_all but with inverted completion semantics:
48
49 when_all: complete when remaining_count reaches 0 (all done)
50 when_any: complete when has_winner becomes true (first done)
51 BUT still wait for remaining_count to reach 0 for cleanup
52
53 Key components:
54 - when_any_state: Shared state tracking winner and completion
55 - when_any_runner: Wrapper coroutine for each child task
56 - when_any_launcher: Awaitable that starts all runners concurrently
57
58 CRITICAL INVARIANTS:
59 --------------------
60 1. Exactly one task becomes the winner (via atomic compare_exchange)
61 2. All tasks must complete before parent resumes (cleanup safety)
62 3. Stop is requested immediately when winner is determined
63 4. Only the winner's result/exception is stored
64
65 POSITIONAL VARIANT:
66 -------------------
67 The variadic overload returns a std::variant with one alternative per
68 input task, preserving positional correspondence. Use .index() on
69 the variant to identify which task won.
70
71 Example: when_any(task<int>, task<string>, task<int>)
72 - Raw types after void->monostate: int, string, int
73 - Result variant: std::variant<int, string, int>
74 - variant.index() tells you which task won (0, 1, or 2)
75
76 VOID HANDLING:
77 --------------
78 void tasks contribute std::monostate to the variant.
79 All-void tasks result in: variant<monostate, monostate, monostate>
80
81 MEMORY MODEL:
82 -------------
83 Synchronization chain from winner's write to parent's read:
84
85 1. Winner thread writes result_/winner_exception_ (non-atomic)
86 2. Winner thread calls signal_completion() → fetch_sub(acq_rel) on remaining_count_
87 3. Last task thread (may be winner or non-winner) calls signal_completion()
88 → fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
89 4. Last task returns caller_ex_.dispatch(continuation_) via symmetric transfer
90 5. Parent coroutine resumes and reads result_/winner_exception_
91
92 Synchronization analysis:
93 - All fetch_sub operations on remaining_count_ form a release sequence
94 - Winner's fetch_sub releases; subsequent fetch_sub operations participate
95 in the modification order of remaining_count_
96 - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
97 modification order, establishing happens-before from winner's writes
98 - Executor dispatch() is expected to provide queue-based synchronization
99 (release-on-post, acquire-on-execute) completing the chain to parent
100 - Even inline executors work (same thread = sequenced-before)
101
102 Alternative considered: Adding winner_ready_ atomic (set with release after
103 storing winner data, acquired before reading) would make synchronization
104 self-contained and not rely on executor implementation details. Current
105 approach is correct but requires careful reasoning about release sequences
106 and executor behavior.
107
108 EXCEPTION SEMANTICS:
109 --------------------
110 Unlike when_all (which captures first exception, discards others), when_any
111 treats exceptions as valid completions. If the winning task threw, that
112 exception is rethrown. Exceptions from non-winners are silently discarded.
113 */
114
115 namespace boost {
116 namespace capy {
117
118 namespace detail {
119
120 /** Convert void to monostate for variant storage.
121
122 std::variant<void, ...> is ill-formed, so void tasks contribute
123 std::monostate to the result variant instead. Non-void types
124 pass through unchanged.
125
126 @tparam T The type to potentially convert (void becomes monostate).
127 */
128 template<typename T>
129 using void_to_monostate_t = std::conditional_t<std::is_void_v<T>, std::monostate, T>;
130
131 // Result variant: one alternative per task, preserving positional
132 // correspondence. Use .index() to identify which task won.
133 // void results become monostate.
134 template<typename T0, typename... Ts>
135 using when_any_variant_t = std::variant<void_to_monostate_t<T0>, void_to_monostate_t<Ts>...>;
136
137 /** Core shared state for when_any operations.
138
139 Contains all members and methods common to both heterogeneous (variadic)
140 and homogeneous (range) when_any implementations. State classes embed
141 this via composition to avoid CRTP destructor ordering issues.
142
143 @par Thread Safety
144 Atomic operations protect winner selection and completion count.
145 */
146 struct when_any_core
147 {
148 std::atomic<std::size_t> remaining_count_;
149 std::size_t winner_index_{0};
150 std::exception_ptr winner_exception_;
151 std::stop_source stop_source_;
152
153 // Bridges parent's stop token to our stop_source
154 struct stop_callback_fn
155 {
156 std::stop_source* source_;
157 9 void operator()() const noexcept { source_->request_stop(); }
158 };
159 using stop_callback_t = std::stop_callback<stop_callback_fn>;
160 std::optional<stop_callback_t> parent_stop_callback_;
161
162 std::coroutine_handle<> continuation_;
163 io_env const* caller_env_ = nullptr;
164
165 // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
166 std::atomic<bool> has_winner_{false};
167
168 65 explicit when_any_core(std::size_t count) noexcept
169 65 : remaining_count_(count)
170 {
171 65 }
172
173 /** Atomically claim winner status; exactly one task succeeds. */
174 190 bool try_win(std::size_t index) noexcept
175 {
176 190 bool expected = false;
177 190 if(has_winner_.compare_exchange_strong(
178 expected, true, std::memory_order_acq_rel))
179 {
180 65 winner_index_ = index;
181 65 stop_source_.request_stop();
182 65 return true;
183 }
184 125 return false;
185 }
186
187 /** @pre try_win() returned true. */
188 8 void set_winner_exception(std::exception_ptr ep) noexcept
189 {
190 8 winner_exception_ = ep;
191 8 }
192
193 // Runners signal completion directly via final_suspend; no member function needed.
194 };
195
196 /** Shared state for heterogeneous when_any operation.
197
198 Coordinates winner selection, result storage, and completion tracking
199 for all child tasks in a when_any operation. Uses composition with
200 when_any_core for shared functionality.
201
202 @par Lifetime
203 Allocated on the parent coroutine's frame, outlives all runners.
204
205 @tparam T0 First task's result type.
206 @tparam Ts Remaining tasks' result types.
207 */
208 template<typename T0, typename... Ts>
209 struct when_any_state
210 {
211 static constexpr std::size_t task_count = 1 + sizeof...(Ts);
212 using variant_type = when_any_variant_t<T0, Ts...>;
213
214 when_any_core core_;
215 std::optional<variant_type> result_;
216 std::array<std::coroutine_handle<>, task_count> runner_handles_{};
217
218 43 when_any_state()
219 43 : core_(task_count)
220 {
221 43 }
222
223 // Runners self-destruct in final_suspend. No destruction needed here.
224
225 /** @pre core_.try_win() returned true.
226 @note Uses in_place_index (not type) for positional variant access.
227 */
228 template<std::size_t I, typename T>
229 35 void set_winner_result(T value)
230 noexcept(std::is_nothrow_move_constructible_v<T>)
231 {
232 35 result_.emplace(std::in_place_index<I>, std::move(value));
233 35 }
234
235 /** @pre core_.try_win() returned true. */
236 template<std::size_t I>
237 3 void set_winner_void() noexcept
238 {
239 3 result_.emplace(std::in_place_index<I>, std::monostate{});
240 3 }
241 };
242
243 /** Wrapper coroutine that runs a single child task for when_any.
244
245 Propagates executor/stop_token to the child, attempts to claim winner
246 status on completion, and signals completion for cleanup coordination.
247
248 @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
249 */
250 template<typename StateType>
251 struct when_any_runner
252 {
253 struct promise_type // : frame_allocating_base // DISABLED FOR TESTING
254 {
255 StateType* state_ = nullptr;
256 std::size_t index_ = 0;
257 io_env env_;
258
259 190 when_any_runner get_return_object() noexcept
260 {
261 190 return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
262 }
263
264 // Starts suspended; launcher sets up state/ex/token then resumes
265 190 std::suspend_always initial_suspend() noexcept
266 {
267 190 return {};
268 }
269
270 190 auto final_suspend() noexcept
271 {
272 struct awaiter
273 {
274 promise_type* p_;
275 bool await_ready() const noexcept { return false; }
276 std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept
277 {
278 // Extract everything needed before self-destruction.
279 auto& core = p_->state_->core_;
280 auto* counter = &core.remaining_count_;
281 auto* caller_env = core.caller_env_;
282 auto cont = core.continuation_;
283
284 h.destroy();
285
286 // If last runner, dispatch parent for symmetric transfer.
287 auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
288 if(remaining == 1)
289 return caller_env->executor.dispatch(cont);
290 return std::noop_coroutine();
291 }
292 void await_resume() const noexcept {}
293 };
294 190 return awaiter{this};
295 }
296
297 178 void return_void() noexcept {}
298
299 // Exceptions are valid completions in when_any (unlike when_all)
300 12 void unhandled_exception()
301 {
302 12 if(state_->core_.try_win(index_))
303 8 state_->core_.set_winner_exception(std::current_exception());
304 12 }
305
306 /** Injects executor and stop token into child awaitables. */
307 template<class Awaitable>
308 struct transform_awaiter
309 {
310 std::decay_t<Awaitable> a_;
311 promise_type* p_;
312
313 190 bool await_ready() { return a_.await_ready(); }
314 190 auto await_resume() { return a_.await_resume(); }
315
316 template<class Promise>
317 185 auto await_suspend(std::coroutine_handle<Promise> h)
318 {
319 #ifdef _MSC_VER
320 using R = decltype(a_.await_suspend(h, &p_->env_));
321 if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
322 a_.await_suspend(h, &p_->env_).resume();
323 else
324 return a_.await_suspend(h, &p_->env_);
325 #else
326 185 return a_.await_suspend(h, &p_->env_);
327 #endif
328 }
329 };
330
331 template<class Awaitable>
332 190 auto await_transform(Awaitable&& a)
333 {
334 using A = std::decay_t<Awaitable>;
335 if constexpr (IoAwaitable<A>)
336 {
337 return transform_awaiter<Awaitable>{
338 380 std::forward<Awaitable>(a), this};
339 }
340 else
341 {
342 static_assert(sizeof(A) == 0, "requires IoAwaitable");
343 }
344 190 }
345 };
346
347 std::coroutine_handle<promise_type> h_;
348
349 190 explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
350 190 : h_(h)
351 {
352 190 }
353
354 // Enable move for all clang versions - some versions need it
355 when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
356
357 // Non-copyable
358 when_any_runner(when_any_runner const&) = delete;
359 when_any_runner& operator=(when_any_runner const&) = delete;
360 when_any_runner& operator=(when_any_runner&&) = delete;
361
362 190 auto release() noexcept
363 {
364 190 return std::exchange(h_, nullptr);
365 }
366 };
367
368 /** Indexed overload for heterogeneous when_any (compile-time index).
369
370 Uses compile-time index I for variant construction via in_place_index.
371 Called from when_any_launcher::launch_one<I>().
372 */
373 template<std::size_t I, IoAwaitable Awaitable, typename StateType>
374 when_any_runner<StateType>
375 105 make_when_any_runner(Awaitable inner, StateType* state)
376 {
377 using T = awaitable_result_t<Awaitable>;
378 if constexpr (std::is_void_v<T>)
379 {
380 co_await std::move(inner);
381 if(state->core_.try_win(I))
382 state->template set_winner_void<I>();
383 }
384 else
385 {
386 auto result = co_await std::move(inner);
387 if(state->core_.try_win(I))
388 {
389 try
390 {
391 state->template set_winner_result<I>(std::move(result));
392 }
393 catch(...)
394 {
395 state->core_.set_winner_exception(std::current_exception());
396 }
397 }
398 }
399 210 }
400
401 /** Runtime-index overload for homogeneous when_any (range path).
402
403 Uses requires-expressions to detect state capabilities:
404 - set_winner_void(): for heterogeneous void tasks (stores monostate)
405 - set_winner_result(): for non-void tasks
406 - Neither: for homogeneous void tasks (no result storage)
407 */
408 template<IoAwaitable Awaitable, typename StateType>
409 when_any_runner<StateType>
410 85 make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
411 {
412 using T = awaitable_result_t<Awaitable>;
413 if constexpr (std::is_void_v<T>)
414 {
415 co_await std::move(inner);
416 if(state->core_.try_win(index))
417 {
418 if constexpr (requires { state->set_winner_void(); })
419 state->set_winner_void();
420 }
421 }
422 else
423 {
424 auto result = co_await std::move(inner);
425 if(state->core_.try_win(index))
426 {
427 try
428 {
429 state->set_winner_result(std::move(result));
430 }
431 catch(...)
432 {
433 state->core_.set_winner_exception(std::current_exception());
434 }
435 }
436 }
437 170 }
438
439 /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
440 template<IoAwaitable... Awaitables>
441 class when_any_launcher
442 {
443 using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
444
445 std::tuple<Awaitables...>* tasks_;
446 state_type* state_;
447
448 public:
449 43 when_any_launcher(
450 std::tuple<Awaitables...>* tasks,
451 state_type* state)
452 43 : tasks_(tasks)
453 43 , state_(state)
454 {
455 43 }
456
457 43 bool await_ready() const noexcept
458 {
459 43 return sizeof...(Awaitables) == 0;
460 }
461
462 /** CRITICAL: If the last task finishes synchronously, parent resumes and
463 destroys this object before await_suspend returns. Must not reference
464 `this` after the final launch_one call.
465 */
466 43 std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
467 {
468 43 state_->core_.continuation_ = continuation;
469 43 state_->core_.caller_env_ = caller_env;
470
471 43 if(caller_env->stop_token.stop_possible())
472 {
473 18 state_->core_.parent_stop_callback_.emplace(
474 9 caller_env->stop_token,
475 9 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
476
477 9 if(caller_env->stop_token.stop_requested())
478 3 state_->core_.stop_source_.request_stop();
479 }
480
481 43 auto token = state_->core_.stop_source_.get_token();
482 [&]<std::size_t... Is>(std::index_sequence<Is...>) {
483 (..., launch_one<Is>(caller_env->executor, token));
484 43 }(std::index_sequence_for<Awaitables...>{});
485
486 86 return std::noop_coroutine();
487 43 }
488
489 43 void await_resume() const noexcept
490 {
491 43 }
492
493 private:
494 /** @pre Ex::dispatch() and std::coroutine_handle<>::resume() must not throw (handle may leak). */
495 template<std::size_t I>
496 105 void launch_one(executor_ref caller_ex, std::stop_token token)
497 {
498 105 auto runner = make_when_any_runner<I>(
499 105 std::move(std::get<I>(*tasks_)), state_);
500
501 105 auto h = runner.release();
502 105 h.promise().state_ = state_;
503 105 h.promise().index_ = I;
504 105 h.promise().env_ = io_env{caller_ex, token, state_->core_.caller_env_->frame_allocator};
505
506 105 std::coroutine_handle<> ch{h};
507 105 state_->runner_handles_[I] = ch;
508 105 caller_ex.post(ch);
509 210 }
510 };
511
512 } // namespace detail
513
514 /** Wait for the first awaitable to complete.
515
516 Races multiple heterogeneous awaitables concurrently and returns when the
517 first one completes. The result is a variant with one alternative per
518 input task, preserving positional correspondence.
519
520 @par Suspends
521 The calling coroutine suspends when co_await is invoked. All awaitables
522 are launched concurrently and execute in parallel. The coroutine resumes
523 only after all awaitables have completed, even though the winner is
524 determined by the first to finish.
525
526 @par Completion Conditions
527 @li Winner is determined when the first awaitable completes (success or exception)
528 @li Only one task can claim winner status via atomic compare-exchange
529 @li Once a winner exists, stop is requested for all remaining siblings
530 @li Parent coroutine resumes only after all siblings acknowledge completion
531 @li The winner's result is returned; if the winner threw, the exception is rethrown
532
533 @par Cancellation Semantics
534 Cancellation is supported via stop_token propagated through the
535 IoAwaitable protocol:
536 @li Each child awaitable receives a stop_token derived from a shared stop_source
537 @li When the parent's stop token is activated, the stop is forwarded to all children
538 @li When a winner is determined, stop_source_.request_stop() is called immediately
539 @li Siblings must handle cancellation gracefully and complete before parent resumes
540 @li Stop requests are cooperative; tasks must check and respond to them
541
542 @par Concurrency/Overlap
543 All awaitables are launched concurrently before any can complete.
544 The launcher iterates through the arguments, starting each task on the
545 caller's executor. Tasks may execute in parallel on multi-threaded
546 executors or interleave on single-threaded executors. There is no
547 guaranteed ordering of task completion.
548
549 @par Notable Error Conditions
550 @li Winner exception: if the winning task threw, that exception is rethrown
551 @li Non-winner exceptions: silently discarded (only winner's result matters)
552 @li Cancellation: tasks may complete via cancellation without throwing
553
554 @par Example
555 @code
556 task<void> example() {
557 auto result = co_await when_any(
558 fetch_int(), // task<int>
559 fetch_string() // task<std::string>
560 );
561 // result.index() is 0 or 1
562 if (result.index() == 0)
563 std::cout << "Got int: " << std::get<0>(result) << "\n";
564 else
565 std::cout << "Got string: " << std::get<1>(result) << "\n";
566 }
567 @endcode
568
569 @tparam A0 First awaitable type (must satisfy IoAwaitable).
570 @tparam As Remaining awaitable types (must satisfy IoAwaitable).
571 @param a0 The first awaitable to race.
572 @param as Additional awaitables to race concurrently.
573 @return A task yielding a variant with one alternative per awaitable.
574 Use .index() to identify the winner. Void awaitables contribute
575 std::monostate.
576
577 @throws Rethrows the winner's exception if the winning task threw an exception.
578
579 @par Remarks
580 Awaitables are moved into the coroutine frame; original objects become
581 empty after the call. The variant preserves one alternative per input
582 task. Use .index() to determine which awaitable completed first.
583 Void awaitables contribute std::monostate to the variant.
584
585 @see when_all, IoAwaitable
586 */
587 template<IoAwaitable A0, IoAwaitable... As>
588 43 [[nodiscard]] auto when_any(A0 a0, As... as)
589 -> task<detail::when_any_variant_t<
590 detail::awaitable_result_t<A0>,
591 detail::awaitable_result_t<As>...>>
592 {
593 detail::when_any_state<
594 detail::awaitable_result_t<A0>,
595 detail::awaitable_result_t<As>...> state;
596 std::tuple<A0, As...> awaitable_tuple(std::move(a0), std::move(as)...);
597
598 co_await detail::when_any_launcher<A0, As...>(&awaitable_tuple, &state);
599
600 if(state.core_.winner_exception_)
601 std::rethrow_exception(state.core_.winner_exception_);
602
603 co_return std::move(*state.result_);
604 86 }
605
606 /** Concept for ranges of full I/O awaitables.
607
608 A range satisfies `IoAwaitableRange` if it is a sized input range
609 whose value type satisfies @ref IoAwaitable. This enables when_any
610 to accept any container or view of awaitables, not just std::vector.
611
612 @tparam R The range type.
613
614 @par Requirements
615 @li `R` must satisfy `std::ranges::input_range`
616 @li `R` must satisfy `std::ranges::sized_range`
617 @li `std::ranges::range_value_t<R>` must satisfy @ref IoAwaitable
618
619 @par Syntactic Requirements
620 Given `r` of type `R`:
621 @li `std::ranges::begin(r)` is valid
622 @li `std::ranges::end(r)` is valid
623 @li `std::ranges::size(r)` returns `std::ranges::range_size_t<R>`
624 @li `*std::ranges::begin(r)` satisfies @ref IoAwaitable
625
626 @par Example
627 @code
628 template<IoAwaitableRange R>
629 task<void> race_all(R&& awaitables) {
630 auto winner = co_await when_any(std::forward<R>(awaitables));
631 // Process winner...
632 }
633 @endcode
634
635 @see when_any, IoAwaitable
636 */
637 template<typename R>
638 concept IoAwaitableRange =
639 std::ranges::input_range<R> &&
640 std::ranges::sized_range<R> &&
641 IoAwaitable<std::ranges::range_value_t<R>>;
642
643 namespace detail {
644
645 /** Shared state for homogeneous when_any (range overload).
646
647 Uses composition with when_any_core for shared functionality.
648 Simpler than heterogeneous: optional<T> instead of variant, vector
649 instead of array for runner handles.
650 */
651 template<typename T>
652 struct when_any_homogeneous_state
653 {
654 when_any_core core_;
655 std::optional<T> result_;
656 std::vector<std::coroutine_handle<>> runner_handles_;
657
658 19 explicit when_any_homogeneous_state(std::size_t count)
659 19 : core_(count)
660 38 , runner_handles_(count)
661 {
662 19 }
663
664 // Runners self-destruct in final_suspend. No destruction needed here.
665
666 /** @pre core_.try_win() returned true. */
667 17 void set_winner_result(T value)
668 noexcept(std::is_nothrow_move_constructible_v<T>)
669 {
670 17 result_.emplace(std::move(value));
671 17 }
672 };
673
674 /** Specialization for void tasks (no result storage needed). */
675 template<>
676 struct when_any_homogeneous_state<void>
677 {
678 when_any_core core_;
679 std::vector<std::coroutine_handle<>> runner_handles_;
680
681 3 explicit when_any_homogeneous_state(std::size_t count)
682 3 : core_(count)
683 6 , runner_handles_(count)
684 {
685 3 }
686
687 // Runners self-destruct in final_suspend. No destruction needed here.
688
689 // No set_winner_result - void tasks have no result to store
690 };
691
692 /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
693 template<IoAwaitableRange Range>
694 class when_any_homogeneous_launcher
695 {
696 using Awaitable = std::ranges::range_value_t<Range>;
697 using T = awaitable_result_t<Awaitable>;
698
699 Range* range_;
700 when_any_homogeneous_state<T>* state_;
701
702 public:
703 22 when_any_homogeneous_launcher(
704 Range* range,
705 when_any_homogeneous_state<T>* state)
706 22 : range_(range)
707 22 , state_(state)
708 {
709 22 }
710
711 22 bool await_ready() const noexcept
712 {
713 22 return std::ranges::empty(*range_);
714 }
715
716 /** CRITICAL: If the last task finishes synchronously, parent resumes and
717 destroys this object before await_suspend returns. Must not reference
718 `this` after dispatching begins.
719
720 Two-phase approach:
721 1. Create all runners (safe - no dispatch yet)
722 2. Dispatch all runners (any may complete synchronously)
723 */
724 22 std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
725 {
726 22 state_->core_.continuation_ = continuation;
727 22 state_->core_.caller_env_ = caller_env;
728
729 22 if(caller_env->stop_token.stop_possible())
730 {
731 14 state_->core_.parent_stop_callback_.emplace(
732 7 caller_env->stop_token,
733 7 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
734
735 7 if(caller_env->stop_token.stop_requested())
736 4 state_->core_.stop_source_.request_stop();
737 }
738
739 22 auto token = state_->core_.stop_source_.get_token();
740
741 // Phase 1: Create all runners without dispatching.
742 // This iterates over *range_ safely because no runners execute yet.
743 22 std::size_t index = 0;
744 107 for(auto&& a : *range_)
745 {
746 85 auto runner = make_when_any_runner(
747 85 std::move(a), state_, index);
748
749 85 auto h = runner.release();
750 85 h.promise().state_ = state_;
751 85 h.promise().index_ = index;
752 85 h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
753
754 85 state_->runner_handles_[index] = std::coroutine_handle<>{h};
755 85 ++index;
756 }
757
758 // Phase 2: Post all runners. Any may complete synchronously.
759 // After last post, state_ and this may be destroyed.
760 // Use raw pointer/count captured before posting.
761 22 std::coroutine_handle<>* handles = state_->runner_handles_.data();
762 22 std::size_t count = state_->runner_handles_.size();
763 107 for(std::size_t i = 0; i < count; ++i)
764 85 caller_env->executor.post(handles[i]);
765
766 44 return std::noop_coroutine();
767 107 }
768
769 22 void await_resume() const noexcept
770 {
771 22 }
772 };
773
774 } // namespace detail
775
776 /** Wait for the first awaitable to complete (range overload).
777
778 Races a range of awaitables with the same result type. Accepts any
779 sized input range of IoAwaitable types, enabling use with arrays,
780 spans, or custom containers.
781
782 @par Suspends
783 The calling coroutine suspends when co_await is invoked. All awaitables
784 in the range are launched concurrently and execute in parallel. The
785 coroutine resumes only after all awaitables have completed, even though
786 the winner is determined by the first to finish.
787
788 @par Completion Conditions
789 @li Winner is determined when the first awaitable completes (success or exception)
790 @li Only one task can claim winner status via atomic compare-exchange
791 @li Once a winner exists, stop is requested for all remaining siblings
792 @li Parent coroutine resumes only after all siblings acknowledge completion
793 @li The winner's index and result are returned; if the winner threw, the exception is rethrown
794
795 @par Cancellation Semantics
796 Cancellation is supported via stop_token propagated through the
797 IoAwaitable protocol:
798 @li Each child awaitable receives a stop_token derived from a shared stop_source
799 @li When the parent's stop token is activated, the stop is forwarded to all children
800 @li When a winner is determined, stop_source_.request_stop() is called immediately
801 @li Siblings must handle cancellation gracefully and complete before parent resumes
802 @li Stop requests are cooperative; tasks must check and respond to them
803
804 @par Concurrency/Overlap
805 All awaitables are launched concurrently before any can complete.
806 The launcher iterates through the range, starting each task on the
807 caller's executor. Tasks may execute in parallel on multi-threaded
808 executors or interleave on single-threaded executors. There is no
809 guaranteed ordering of task completion.
810
811 @par Notable Error Conditions
812 @li Empty range: throws std::invalid_argument immediately (not via co_return)
813 @li Winner exception: if the winning task threw, that exception is rethrown
814 @li Non-winner exceptions: silently discarded (only winner's result matters)
815 @li Cancellation: tasks may complete via cancellation without throwing
816
817 @par Example
818 @code
819 task<void> example() {
820 std::array<task<Response>, 3> requests = {
821 fetch_from_server(0),
822 fetch_from_server(1),
823 fetch_from_server(2)
824 };
825
826 auto [index, response] = co_await when_any(std::move(requests));
827 }
828 @endcode
829
830 @par Example with Vector
831 @code
832 task<Response> fetch_fastest(std::vector<Server> const& servers) {
833 std::vector<task<Response>> requests;
834 for (auto const& server : servers)
835 requests.push_back(fetch_from(server));
836
837 auto [index, response] = co_await when_any(std::move(requests));
838 co_return response;
839 }
840 @endcode
841
842 @tparam R Range type satisfying IoAwaitableRange.
843 @param awaitables Range of awaitables to race concurrently (must not be empty).
844 @return A task yielding a pair of (winner_index, result).
845
846 @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
847 @throws Rethrows the winner's exception if the winning task threw an exception.
848
849 @par Remarks
850 Elements are moved from the range; for lvalue ranges, the original
851 container will have moved-from elements after this call. The range
852 is moved onto the coroutine frame to ensure lifetime safety. Unlike
853 the variadic overload, no variant wrapper is needed since all tasks
854 share the same return type.
855
856 @see when_any, IoAwaitableRange
857 */
858 template<IoAwaitableRange R>
859 requires (!std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>)
860 21 [[nodiscard]] auto when_any(R&& awaitables)
861 -> task<std::pair<std::size_t, detail::awaitable_result_t<std::ranges::range_value_t<R>>>>
862 {
863 using Awaitable = std::ranges::range_value_t<R>;
864 using T = detail::awaitable_result_t<Awaitable>;
865 using result_type = std::pair<std::size_t, T>;
866 using OwnedRange = std::remove_cvref_t<R>;
867
868 auto count = std::ranges::size(awaitables);
869 if(count == 0)
870 throw std::invalid_argument("when_any requires at least one awaitable");
871
872 // Move/copy range onto coroutine frame to ensure lifetime
873 OwnedRange owned_awaitables = std::forward<R>(awaitables);
874
875 detail::when_any_homogeneous_state<T> state(count);
876
877 co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
878
879 if(state.core_.winner_exception_)
880 std::rethrow_exception(state.core_.winner_exception_);
881
882 co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
883 42 }
884
885 /** Wait for the first awaitable to complete (void range overload).
886
887 Races a range of void-returning awaitables. Since void awaitables have
888 no result value, only the winner's index is returned.
889
890 @par Suspends
891 The calling coroutine suspends when co_await is invoked. All awaitables
892 in the range are launched concurrently and execute in parallel. The
893 coroutine resumes only after all awaitables have completed, even though
894 the winner is determined by the first to finish.
895
896 @par Completion Conditions
897 @li Winner is determined when the first awaitable completes (success or exception)
898 @li Only one task can claim winner status via atomic compare-exchange
899 @li Once a winner exists, stop is requested for all remaining siblings
900 @li Parent coroutine resumes only after all siblings acknowledge completion
901 @li The winner's index is returned; if the winner threw, the exception is rethrown
902
903 @par Cancellation Semantics
904 Cancellation is supported via stop_token propagated through the
905 IoAwaitable protocol:
906 @li Each child awaitable receives a stop_token derived from a shared stop_source
907 @li When the parent's stop token is activated, the stop is forwarded to all children
908 @li When a winner is determined, stop_source_.request_stop() is called immediately
909 @li Siblings must handle cancellation gracefully and complete before parent resumes
910 @li Stop requests are cooperative; tasks must check and respond to them
911
912 @par Concurrency/Overlap
913 All awaitables are launched concurrently before any can complete.
914 The launcher iterates through the range, starting each task on the
915 caller's executor. Tasks may execute in parallel on multi-threaded
916 executors or interleave on single-threaded executors. There is no
917 guaranteed ordering of task completion.
918
919 @par Notable Error Conditions
920 @li Empty range: throws std::invalid_argument immediately (not via co_return)
921 @li Winner exception: if the winning task threw, that exception is rethrown
922 @li Non-winner exceptions: silently discarded (only winner's result matters)
923 @li Cancellation: tasks may complete via cancellation without throwing
924
925 @par Example
926 @code
927 task<void> example() {
928 std::vector<task<void>> tasks;
929 for (int i = 0; i < 5; ++i)
930 tasks.push_back(background_work(i));
931
932 std::size_t winner = co_await when_any(std::move(tasks));
933 // winner is the index of the first task to complete
934 }
935 @endcode
936
937 @par Example with Timeout
938 @code
939 task<void> with_timeout() {
940 std::vector<task<void>> tasks;
941 tasks.push_back(long_running_operation());
942 tasks.push_back(delay(std::chrono::seconds(5)));
943
944 std::size_t winner = co_await when_any(std::move(tasks));
945 if (winner == 1) {
946 // Timeout occurred
947 }
948 }
949 @endcode
950
951 @tparam R Range type satisfying IoAwaitableRange with void result.
952 @param awaitables Range of void awaitables to race concurrently (must not be empty).
953 @return A task yielding the winner's index (zero-based).
954
955 @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
956 @throws Rethrows the winner's exception if the winning task threw an exception.
957
958 @par Remarks
959 Elements are moved from the range; for lvalue ranges, the original
960 container will have moved-from elements after this call. The range
961 is moved onto the coroutine frame to ensure lifetime safety. Unlike
962 the non-void overload, no result storage is needed since void tasks
963 produce no value.
964
965 @see when_any, IoAwaitableRange
966 */
967 template<IoAwaitableRange R>
968 requires std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>
969 3 [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t>
970 {
971 using OwnedRange = std::remove_cvref_t<R>;
972
973 auto count = std::ranges::size(awaitables);
974 if(count == 0)
975 throw std::invalid_argument("when_any requires at least one awaitable");
976
977 // Move/copy range onto coroutine frame to ensure lifetime
978 OwnedRange owned_awaitables = std::forward<R>(awaitables);
979
980 detail::when_any_homogeneous_state<void> state(count);
981
982 co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
983
984 if(state.core_.winner_exception_)
985 std::rethrow_exception(state.core_.winner_exception_);
986
987 co_return state.core_.winner_index_;
988 6 }
989
990 } // namespace capy
991 } // namespace boost
992
993 #endif
994