TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_WHEN_ANY_HPP
11 : #define BOOST_CAPY_WHEN_ANY_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/concept/executor.hpp>
15 : #include <boost/capy/concept/io_awaitable.hpp>
16 : #include <coroutine>
17 : #include <boost/capy/ex/executor_ref.hpp>
18 : #include <boost/capy/ex/frame_allocator.hpp>
19 : #include <boost/capy/ex/io_env.hpp>
20 : #include <boost/capy/task.hpp>
21 :
22 : #include <array>
23 : #include <atomic>
24 : #include <exception>
25 : #include <optional>
26 : #include <ranges>
27 : #include <stdexcept>
28 : #include <stop_token>
29 : #include <tuple>
30 : #include <type_traits>
31 : #include <utility>
32 : #include <variant>
33 : #include <vector>
34 :
35 : /*
36 : when_any - Race multiple tasks, return first completion
37 : ========================================================
38 :
39 : OVERVIEW:
40 : ---------
41 : when_any launches N tasks concurrently and completes when the FIRST task
42 : finishes (success or failure). It then requests stop for all siblings and
43 : waits for them to acknowledge before returning.
44 :
45 : ARCHITECTURE:
46 : -------------
47 : The design mirrors when_all but with inverted completion semantics:
48 :
49 : when_all: complete when remaining_count reaches 0 (all done)
50 : when_any: complete when has_winner becomes true (first done)
51 : BUT still wait for remaining_count to reach 0 for cleanup
52 :
53 : Key components:
54 : - when_any_state: Shared state tracking winner and completion
55 : - when_any_runner: Wrapper coroutine for each child task
56 : - when_any_launcher: Awaitable that starts all runners concurrently
57 :
58 : CRITICAL INVARIANTS:
59 : --------------------
60 : 1. Exactly one task becomes the winner (via atomic compare_exchange)
61 : 2. All tasks must complete before parent resumes (cleanup safety)
62 : 3. Stop is requested immediately when winner is determined
63 : 4. Only the winner's result/exception is stored
64 :
65 : POSITIONAL VARIANT:
66 : -------------------
67 : The variadic overload returns a std::variant with one alternative per
68 : input task, preserving positional correspondence. Use .index() on
69 : the variant to identify which task won.
70 :
71 : Example: when_any(task<int>, task<string>, task<int>)
72 : - Raw types after void->monostate: int, string, int
73 : - Result variant: std::variant<int, string, int>
74 : - variant.index() tells you which task won (0, 1, or 2)
75 :
76 : VOID HANDLING:
77 : --------------
78 : void tasks contribute std::monostate to the variant.
79 : All-void tasks result in: variant<monostate, monostate, monostate>
80 :
81 : MEMORY MODEL:
82 : -------------
83 : Synchronization chain from winner's write to parent's read:
84 :
85 : 1. Winner thread writes result_/winner_exception_ (non-atomic)
86 : 2. Winner thread calls signal_completion() → fetch_sub(acq_rel) on remaining_count_
87 : 3. Last task thread (may be winner or non-winner) calls signal_completion()
88 : → fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
89 : 4. Last task returns caller_ex_.dispatch(continuation_) via symmetric transfer
90 : 5. Parent coroutine resumes and reads result_/winner_exception_
91 :
92 : Synchronization analysis:
93 : - All fetch_sub operations on remaining_count_ form a release sequence
94 : - Winner's fetch_sub releases; subsequent fetch_sub operations participate
95 : in the modification order of remaining_count_
96 : - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
97 : modification order, establishing happens-before from winner's writes
98 : - Executor dispatch() is expected to provide queue-based synchronization
99 : (release-on-post, acquire-on-execute) completing the chain to parent
100 : - Even inline executors work (same thread = sequenced-before)
101 :
102 : Alternative considered: Adding winner_ready_ atomic (set with release after
103 : storing winner data, acquired before reading) would make synchronization
104 : self-contained and not rely on executor implementation details. Current
105 : approach is correct but requires careful reasoning about release sequences
106 : and executor behavior.
107 :
108 : EXCEPTION SEMANTICS:
109 : --------------------
110 : Unlike when_all (which captures first exception, discards others), when_any
111 : treats exceptions as valid completions. If the winning task threw, that
112 : exception is rethrown. Exceptions from non-winners are silently discarded.
113 : */
114 :
115 : namespace boost {
116 : namespace capy {
117 :
118 : namespace detail {
119 :
120 : /** Convert void to monostate for variant storage.
121 :
122 : std::variant<void, ...> is ill-formed, so void tasks contribute
123 : std::monostate to the result variant instead. Non-void types
124 : pass through unchanged.
125 :
126 : @tparam T The type to potentially convert (void becomes monostate).
127 : */
128 : template<typename T>
129 : using void_to_monostate_t = std::conditional_t<std::is_void_v<T>, std::monostate, T>;
130 :
131 : // Result variant: one alternative per task, preserving positional
132 : // correspondence. Use .index() to identify which task won.
133 : // void results become monostate.
134 : template<typename T0, typename... Ts>
135 : using when_any_variant_t = std::variant<void_to_monostate_t<T0>, void_to_monostate_t<Ts>...>;
136 :
137 : /** Core shared state for when_any operations.
138 :
139 : Contains all members and methods common to both heterogeneous (variadic)
140 : and homogeneous (range) when_any implementations. State classes embed
141 : this via composition to avoid CRTP destructor ordering issues.
142 :
143 : @par Thread Safety
144 : Atomic operations protect winner selection and completion count.
145 : */
146 : struct when_any_core
147 : {
148 : std::atomic<std::size_t> remaining_count_;
149 : std::size_t winner_index_{0};
150 : std::exception_ptr winner_exception_;
151 : std::stop_source stop_source_;
152 :
153 : // Bridges parent's stop token to our stop_source
154 : struct stop_callback_fn
155 : {
156 : std::stop_source* source_;
157 HIT 9 : void operator()() const noexcept { source_->request_stop(); }
158 : };
159 : using stop_callback_t = std::stop_callback<stop_callback_fn>;
160 : std::optional<stop_callback_t> parent_stop_callback_;
161 :
162 : std::coroutine_handle<> continuation_;
163 : io_env const* caller_env_ = nullptr;
164 :
165 : // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
166 : std::atomic<bool> has_winner_{false};
167 :
168 65 : explicit when_any_core(std::size_t count) noexcept
169 65 : : remaining_count_(count)
170 : {
171 65 : }
172 :
173 : /** Atomically claim winner status; exactly one task succeeds. */
174 190 : bool try_win(std::size_t index) noexcept
175 : {
176 190 : bool expected = false;
177 190 : if(has_winner_.compare_exchange_strong(
178 : expected, true, std::memory_order_acq_rel))
179 : {
180 65 : winner_index_ = index;
181 65 : stop_source_.request_stop();
182 65 : return true;
183 : }
184 125 : return false;
185 : }
186 :
187 : /** @pre try_win() returned true. */
188 8 : void set_winner_exception(std::exception_ptr ep) noexcept
189 : {
190 8 : winner_exception_ = ep;
191 8 : }
192 :
193 : // Runners signal completion directly via final_suspend; no member function needed.
194 : };
195 :
196 : /** Shared state for heterogeneous when_any operation.
197 :
198 : Coordinates winner selection, result storage, and completion tracking
199 : for all child tasks in a when_any operation. Uses composition with
200 : when_any_core for shared functionality.
201 :
202 : @par Lifetime
203 : Allocated on the parent coroutine's frame, outlives all runners.
204 :
205 : @tparam T0 First task's result type.
206 : @tparam Ts Remaining tasks' result types.
207 : */
208 : template<typename T0, typename... Ts>
209 : struct when_any_state
210 : {
211 : static constexpr std::size_t task_count = 1 + sizeof...(Ts);
212 : using variant_type = when_any_variant_t<T0, Ts...>;
213 :
214 : when_any_core core_;
215 : std::optional<variant_type> result_;
216 : std::array<std::coroutine_handle<>, task_count> runner_handles_{};
217 :
218 43 : when_any_state()
219 43 : : core_(task_count)
220 : {
221 43 : }
222 :
223 : // Runners self-destruct in final_suspend. No destruction needed here.
224 :
225 : /** @pre core_.try_win() returned true.
226 : @note Uses in_place_index (not type) for positional variant access.
227 : */
228 : template<std::size_t I, typename T>
229 35 : void set_winner_result(T value)
230 : noexcept(std::is_nothrow_move_constructible_v<T>)
231 : {
232 35 : result_.emplace(std::in_place_index<I>, std::move(value));
233 35 : }
234 :
235 : /** @pre core_.try_win() returned true. */
236 : template<std::size_t I>
237 3 : void set_winner_void() noexcept
238 : {
239 3 : result_.emplace(std::in_place_index<I>, std::monostate{});
240 3 : }
241 : };
242 :
243 : /** Wrapper coroutine that runs a single child task for when_any.
244 :
245 : Propagates executor/stop_token to the child, attempts to claim winner
246 : status on completion, and signals completion for cleanup coordination.
247 :
248 : @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
249 : */
250 : template<typename StateType>
251 : struct when_any_runner
252 : {
253 : struct promise_type // : frame_allocating_base // DISABLED FOR TESTING
254 : {
255 : StateType* state_ = nullptr;
256 : std::size_t index_ = 0;
257 : io_env env_;
258 :
259 190 : when_any_runner get_return_object() noexcept
260 : {
261 190 : return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
262 : }
263 :
264 : // Starts suspended; launcher sets up state/ex/token then resumes
265 190 : std::suspend_always initial_suspend() noexcept
266 : {
267 190 : return {};
268 : }
269 :
270 190 : auto final_suspend() noexcept
271 : {
272 : struct awaiter
273 : {
274 : promise_type* p_;
275 190 : bool await_ready() const noexcept { return false; }
276 190 : std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept
277 : {
278 : // Extract everything needed before self-destruction.
279 190 : auto& core = p_->state_->core_;
280 190 : auto* counter = &core.remaining_count_;
281 190 : auto* caller_env = core.caller_env_;
282 190 : auto cont = core.continuation_;
283 :
284 190 : h.destroy();
285 :
286 : // If last runner, dispatch parent for symmetric transfer.
287 190 : auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
288 190 : if(remaining == 1)
289 65 : return caller_env->executor.dispatch(cont);
290 125 : return std::noop_coroutine();
291 : }
292 MIS 0 : void await_resume() const noexcept {}
293 : };
294 HIT 190 : return awaiter{this};
295 : }
296 :
297 178 : void return_void() noexcept {}
298 :
299 : // Exceptions are valid completions in when_any (unlike when_all)
300 12 : void unhandled_exception()
301 : {
302 12 : if(state_->core_.try_win(index_))
303 8 : state_->core_.set_winner_exception(std::current_exception());
304 12 : }
305 :
306 : /** Injects executor and stop token into child awaitables. */
307 : template<class Awaitable>
308 : struct transform_awaiter
309 : {
310 : std::decay_t<Awaitable> a_;
311 : promise_type* p_;
312 :
313 190 : bool await_ready() { return a_.await_ready(); }
314 190 : auto await_resume() { return a_.await_resume(); }
315 :
316 : template<class Promise>
317 185 : auto await_suspend(std::coroutine_handle<Promise> h)
318 : {
319 : #ifdef _MSC_VER
320 : using R = decltype(a_.await_suspend(h, &p_->env_));
321 : if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
322 : a_.await_suspend(h, &p_->env_).resume();
323 : else
324 : return a_.await_suspend(h, &p_->env_);
325 : #else
326 185 : return a_.await_suspend(h, &p_->env_);
327 : #endif
328 : }
329 : };
330 :
331 : template<class Awaitable>
332 190 : auto await_transform(Awaitable&& a)
333 : {
334 : using A = std::decay_t<Awaitable>;
335 : if constexpr (IoAwaitable<A>)
336 : {
337 : return transform_awaiter<Awaitable>{
338 380 : std::forward<Awaitable>(a), this};
339 : }
340 : else
341 : {
342 : static_assert(sizeof(A) == 0, "requires IoAwaitable");
343 : }
344 190 : }
345 : };
346 :
347 : std::coroutine_handle<promise_type> h_;
348 :
349 190 : explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
350 190 : : h_(h)
351 : {
352 190 : }
353 :
354 : // Enable move for all clang versions - some versions need it
355 : when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
356 :
357 : // Non-copyable
358 : when_any_runner(when_any_runner const&) = delete;
359 : when_any_runner& operator=(when_any_runner const&) = delete;
360 : when_any_runner& operator=(when_any_runner&&) = delete;
361 :
362 190 : auto release() noexcept
363 : {
364 190 : return std::exchange(h_, nullptr);
365 : }
366 : };
367 :
368 : /** Indexed overload for heterogeneous when_any (compile-time index).
369 :
370 : Uses compile-time index I for variant construction via in_place_index.
371 : Called from when_any_launcher::launch_one<I>().
372 : */
373 : template<std::size_t I, IoAwaitable Awaitable, typename StateType>
374 : when_any_runner<StateType>
375 105 : make_when_any_runner(Awaitable inner, StateType* state)
376 : {
377 : using T = awaitable_result_t<Awaitable>;
378 : if constexpr (std::is_void_v<T>)
379 : {
380 : co_await std::move(inner);
381 : if(state->core_.try_win(I))
382 : state->template set_winner_void<I>();
383 : }
384 : else
385 : {
386 : auto result = co_await std::move(inner);
387 : if(state->core_.try_win(I))
388 : {
389 : try
390 : {
391 : state->template set_winner_result<I>(std::move(result));
392 : }
393 : catch(...)
394 : {
395 : state->core_.set_winner_exception(std::current_exception());
396 : }
397 : }
398 : }
399 210 : }
400 :
401 : /** Runtime-index overload for homogeneous when_any (range path).
402 :
403 : Uses requires-expressions to detect state capabilities:
404 : - set_winner_void(): for heterogeneous void tasks (stores monostate)
405 : - set_winner_result(): for non-void tasks
406 : - Neither: for homogeneous void tasks (no result storage)
407 : */
408 : template<IoAwaitable Awaitable, typename StateType>
409 : when_any_runner<StateType>
410 85 : make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
411 : {
412 : using T = awaitable_result_t<Awaitable>;
413 : if constexpr (std::is_void_v<T>)
414 : {
415 : co_await std::move(inner);
416 : if(state->core_.try_win(index))
417 : {
418 : if constexpr (requires { state->set_winner_void(); })
419 : state->set_winner_void();
420 : }
421 : }
422 : else
423 : {
424 : auto result = co_await std::move(inner);
425 : if(state->core_.try_win(index))
426 : {
427 : try
428 : {
429 : state->set_winner_result(std::move(result));
430 : }
431 : catch(...)
432 : {
433 : state->core_.set_winner_exception(std::current_exception());
434 : }
435 : }
436 : }
437 170 : }
438 :
439 : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
440 : template<IoAwaitable... Awaitables>
441 : class when_any_launcher
442 : {
443 : using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
444 :
445 : std::tuple<Awaitables...>* tasks_;
446 : state_type* state_;
447 :
448 : public:
449 43 : when_any_launcher(
450 : std::tuple<Awaitables...>* tasks,
451 : state_type* state)
452 43 : : tasks_(tasks)
453 43 : , state_(state)
454 : {
455 43 : }
456 :
457 43 : bool await_ready() const noexcept
458 : {
459 43 : return sizeof...(Awaitables) == 0;
460 : }
461 :
462 : /** CRITICAL: If the last task finishes synchronously, parent resumes and
463 : destroys this object before await_suspend returns. Must not reference
464 : `this` after the final launch_one call.
465 : */
466 43 : std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
467 : {
468 43 : state_->core_.continuation_ = continuation;
469 43 : state_->core_.caller_env_ = caller_env;
470 :
471 43 : if(caller_env->stop_token.stop_possible())
472 : {
473 18 : state_->core_.parent_stop_callback_.emplace(
474 9 : caller_env->stop_token,
475 9 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
476 :
477 9 : if(caller_env->stop_token.stop_requested())
478 3 : state_->core_.stop_source_.request_stop();
479 : }
480 :
481 43 : auto token = state_->core_.stop_source_.get_token();
482 86 : [&]<std::size_t... Is>(std::index_sequence<Is...>) {
483 43 : (..., launch_one<Is>(caller_env->executor, token));
484 43 : }(std::index_sequence_for<Awaitables...>{});
485 :
486 86 : return std::noop_coroutine();
487 43 : }
488 :
489 43 : void await_resume() const noexcept
490 : {
491 43 : }
492 :
493 : private:
494 : /** @pre Ex::dispatch() and std::coroutine_handle<>::resume() must not throw (handle may leak). */
495 : template<std::size_t I>
496 105 : void launch_one(executor_ref caller_ex, std::stop_token token)
497 : {
498 105 : auto runner = make_when_any_runner<I>(
499 105 : std::move(std::get<I>(*tasks_)), state_);
500 :
501 105 : auto h = runner.release();
502 105 : h.promise().state_ = state_;
503 105 : h.promise().index_ = I;
504 105 : h.promise().env_ = io_env{caller_ex, token, state_->core_.caller_env_->frame_allocator};
505 :
506 105 : std::coroutine_handle<> ch{h};
507 105 : state_->runner_handles_[I] = ch;
508 105 : caller_ex.post(ch);
509 210 : }
510 : };
511 :
512 : } // namespace detail
513 :
514 : /** Wait for the first awaitable to complete.
515 :
516 : Races multiple heterogeneous awaitables concurrently and returns when the
517 : first one completes. The result is a variant with one alternative per
518 : input task, preserving positional correspondence.
519 :
520 : @par Suspends
521 : The calling coroutine suspends when co_await is invoked. All awaitables
522 : are launched concurrently and execute in parallel. The coroutine resumes
523 : only after all awaitables have completed, even though the winner is
524 : determined by the first to finish.
525 :
526 : @par Completion Conditions
527 : @li Winner is determined when the first awaitable completes (success or exception)
528 : @li Only one task can claim winner status via atomic compare-exchange
529 : @li Once a winner exists, stop is requested for all remaining siblings
530 : @li Parent coroutine resumes only after all siblings acknowledge completion
531 : @li The winner's result is returned; if the winner threw, the exception is rethrown
532 :
533 : @par Cancellation Semantics
534 : Cancellation is supported via stop_token propagated through the
535 : IoAwaitable protocol:
536 : @li Each child awaitable receives a stop_token derived from a shared stop_source
537 : @li When the parent's stop token is activated, the stop is forwarded to all children
538 : @li When a winner is determined, stop_source_.request_stop() is called immediately
539 : @li Siblings must handle cancellation gracefully and complete before parent resumes
540 : @li Stop requests are cooperative; tasks must check and respond to them
541 :
542 : @par Concurrency/Overlap
543 : All awaitables are launched concurrently before any can complete.
544 : The launcher iterates through the arguments, starting each task on the
545 : caller's executor. Tasks may execute in parallel on multi-threaded
546 : executors or interleave on single-threaded executors. There is no
547 : guaranteed ordering of task completion.
548 :
549 : @par Notable Error Conditions
550 : @li Winner exception: if the winning task threw, that exception is rethrown
551 : @li Non-winner exceptions: silently discarded (only winner's result matters)
552 : @li Cancellation: tasks may complete via cancellation without throwing
553 :
554 : @par Example
555 : @code
556 : task<void> example() {
557 : auto result = co_await when_any(
558 : fetch_int(), // task<int>
559 : fetch_string() // task<std::string>
560 : );
561 : // result.index() is 0 or 1
562 : if (result.index() == 0)
563 : std::cout << "Got int: " << std::get<0>(result) << "\n";
564 : else
565 : std::cout << "Got string: " << std::get<1>(result) << "\n";
566 : }
567 : @endcode
568 :
569 : @tparam A0 First awaitable type (must satisfy IoAwaitable).
570 : @tparam As Remaining awaitable types (must satisfy IoAwaitable).
571 : @param a0 The first awaitable to race.
572 : @param as Additional awaitables to race concurrently.
573 : @return A task yielding a variant with one alternative per awaitable.
574 : Use .index() to identify the winner. Void awaitables contribute
575 : std::monostate.
576 :
577 : @throws Rethrows the winner's exception if the winning task threw an exception.
578 :
579 : @par Remarks
580 : Awaitables are moved into the coroutine frame; original objects become
581 : empty after the call. The variant preserves one alternative per input
582 : task. Use .index() to determine which awaitable completed first.
583 : Void awaitables contribute std::monostate to the variant.
584 :
585 : @see when_all, IoAwaitable
586 : */
587 : template<IoAwaitable A0, IoAwaitable... As>
588 43 : [[nodiscard]] auto when_any(A0 a0, As... as)
589 : -> task<detail::when_any_variant_t<
590 : detail::awaitable_result_t<A0>,
591 : detail::awaitable_result_t<As>...>>
592 : {
593 : detail::when_any_state<
594 : detail::awaitable_result_t<A0>,
595 : detail::awaitable_result_t<As>...> state;
596 : std::tuple<A0, As...> awaitable_tuple(std::move(a0), std::move(as)...);
597 :
598 : co_await detail::when_any_launcher<A0, As...>(&awaitable_tuple, &state);
599 :
600 : if(state.core_.winner_exception_)
601 : std::rethrow_exception(state.core_.winner_exception_);
602 :
603 : co_return std::move(*state.result_);
604 86 : }
605 :
606 : /** Concept for ranges of full I/O awaitables.
607 :
608 : A range satisfies `IoAwaitableRange` if it is a sized input range
609 : whose value type satisfies @ref IoAwaitable. This enables when_any
610 : to accept any container or view of awaitables, not just std::vector.
611 :
612 : @tparam R The range type.
613 :
614 : @par Requirements
615 : @li `R` must satisfy `std::ranges::input_range`
616 : @li `R` must satisfy `std::ranges::sized_range`
617 : @li `std::ranges::range_value_t<R>` must satisfy @ref IoAwaitable
618 :
619 : @par Syntactic Requirements
620 : Given `r` of type `R`:
621 : @li `std::ranges::begin(r)` is valid
622 : @li `std::ranges::end(r)` is valid
623 : @li `std::ranges::size(r)` returns `std::ranges::range_size_t<R>`
624 : @li `*std::ranges::begin(r)` satisfies @ref IoAwaitable
625 :
626 : @par Example
627 : @code
628 : template<IoAwaitableRange R>
629 : task<void> race_all(R&& awaitables) {
630 : auto winner = co_await when_any(std::forward<R>(awaitables));
631 : // Process winner...
632 : }
633 : @endcode
634 :
635 : @see when_any, IoAwaitable
636 : */
637 : template<typename R>
638 : concept IoAwaitableRange =
639 : std::ranges::input_range<R> &&
640 : std::ranges::sized_range<R> &&
641 : IoAwaitable<std::ranges::range_value_t<R>>;
642 :
643 : namespace detail {
644 :
645 : /** Shared state for homogeneous when_any (range overload).
646 :
647 : Uses composition with when_any_core for shared functionality.
648 : Simpler than heterogeneous: optional<T> instead of variant, vector
649 : instead of array for runner handles.
650 : */
651 : template<typename T>
652 : struct when_any_homogeneous_state
653 : {
654 : when_any_core core_;
655 : std::optional<T> result_;
656 : std::vector<std::coroutine_handle<>> runner_handles_;
657 :
658 19 : explicit when_any_homogeneous_state(std::size_t count)
659 19 : : core_(count)
660 38 : , runner_handles_(count)
661 : {
662 19 : }
663 :
664 : // Runners self-destruct in final_suspend. No destruction needed here.
665 :
666 : /** @pre core_.try_win() returned true. */
667 17 : void set_winner_result(T value)
668 : noexcept(std::is_nothrow_move_constructible_v<T>)
669 : {
670 17 : result_.emplace(std::move(value));
671 17 : }
672 : };
673 :
674 : /** Specialization for void tasks (no result storage needed). */
675 : template<>
676 : struct when_any_homogeneous_state<void>
677 : {
678 : when_any_core core_;
679 : std::vector<std::coroutine_handle<>> runner_handles_;
680 :
681 3 : explicit when_any_homogeneous_state(std::size_t count)
682 3 : : core_(count)
683 6 : , runner_handles_(count)
684 : {
685 3 : }
686 :
687 : // Runners self-destruct in final_suspend. No destruction needed here.
688 :
689 : // No set_winner_result - void tasks have no result to store
690 : };
691 :
692 : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
693 : template<IoAwaitableRange Range>
694 : class when_any_homogeneous_launcher
695 : {
696 : using Awaitable = std::ranges::range_value_t<Range>;
697 : using T = awaitable_result_t<Awaitable>;
698 :
699 : Range* range_;
700 : when_any_homogeneous_state<T>* state_;
701 :
702 : public:
703 22 : when_any_homogeneous_launcher(
704 : Range* range,
705 : when_any_homogeneous_state<T>* state)
706 22 : : range_(range)
707 22 : , state_(state)
708 : {
709 22 : }
710 :
711 22 : bool await_ready() const noexcept
712 : {
713 22 : return std::ranges::empty(*range_);
714 : }
715 :
716 : /** CRITICAL: If the last task finishes synchronously, parent resumes and
717 : destroys this object before await_suspend returns. Must not reference
718 : `this` after dispatching begins.
719 :
720 : Two-phase approach:
721 : 1. Create all runners (safe - no dispatch yet)
722 : 2. Dispatch all runners (any may complete synchronously)
723 : */
724 22 : std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
725 : {
726 22 : state_->core_.continuation_ = continuation;
727 22 : state_->core_.caller_env_ = caller_env;
728 :
729 22 : if(caller_env->stop_token.stop_possible())
730 : {
731 14 : state_->core_.parent_stop_callback_.emplace(
732 7 : caller_env->stop_token,
733 7 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
734 :
735 7 : if(caller_env->stop_token.stop_requested())
736 4 : state_->core_.stop_source_.request_stop();
737 : }
738 :
739 22 : auto token = state_->core_.stop_source_.get_token();
740 :
741 : // Phase 1: Create all runners without dispatching.
742 : // This iterates over *range_ safely because no runners execute yet.
743 22 : std::size_t index = 0;
744 107 : for(auto&& a : *range_)
745 : {
746 85 : auto runner = make_when_any_runner(
747 85 : std::move(a), state_, index);
748 :
749 85 : auto h = runner.release();
750 85 : h.promise().state_ = state_;
751 85 : h.promise().index_ = index;
752 85 : h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
753 :
754 85 : state_->runner_handles_[index] = std::coroutine_handle<>{h};
755 85 : ++index;
756 : }
757 :
758 : // Phase 2: Post all runners. Any may complete synchronously.
759 : // After last post, state_ and this may be destroyed.
760 : // Use raw pointer/count captured before posting.
761 22 : std::coroutine_handle<>* handles = state_->runner_handles_.data();
762 22 : std::size_t count = state_->runner_handles_.size();
763 107 : for(std::size_t i = 0; i < count; ++i)
764 85 : caller_env->executor.post(handles[i]);
765 :
766 44 : return std::noop_coroutine();
767 107 : }
768 :
769 22 : void await_resume() const noexcept
770 : {
771 22 : }
772 : };
773 :
774 : } // namespace detail
775 :
776 : /** Wait for the first awaitable to complete (range overload).
777 :
778 : Races a range of awaitables with the same result type. Accepts any
779 : sized input range of IoAwaitable types, enabling use with arrays,
780 : spans, or custom containers.
781 :
782 : @par Suspends
783 : The calling coroutine suspends when co_await is invoked. All awaitables
784 : in the range are launched concurrently and execute in parallel. The
785 : coroutine resumes only after all awaitables have completed, even though
786 : the winner is determined by the first to finish.
787 :
788 : @par Completion Conditions
789 : @li Winner is determined when the first awaitable completes (success or exception)
790 : @li Only one task can claim winner status via atomic compare-exchange
791 : @li Once a winner exists, stop is requested for all remaining siblings
792 : @li Parent coroutine resumes only after all siblings acknowledge completion
793 : @li The winner's index and result are returned; if the winner threw, the exception is rethrown
794 :
795 : @par Cancellation Semantics
796 : Cancellation is supported via stop_token propagated through the
797 : IoAwaitable protocol:
798 : @li Each child awaitable receives a stop_token derived from a shared stop_source
799 : @li When the parent's stop token is activated, the stop is forwarded to all children
800 : @li When a winner is determined, stop_source_.request_stop() is called immediately
801 : @li Siblings must handle cancellation gracefully and complete before parent resumes
802 : @li Stop requests are cooperative; tasks must check and respond to them
803 :
804 : @par Concurrency/Overlap
805 : All awaitables are launched concurrently before any can complete.
806 : The launcher iterates through the range, starting each task on the
807 : caller's executor. Tasks may execute in parallel on multi-threaded
808 : executors or interleave on single-threaded executors. There is no
809 : guaranteed ordering of task completion.
810 :
811 : @par Notable Error Conditions
812 : @li Empty range: throws std::invalid_argument immediately (not via co_return)
813 : @li Winner exception: if the winning task threw, that exception is rethrown
814 : @li Non-winner exceptions: silently discarded (only winner's result matters)
815 : @li Cancellation: tasks may complete via cancellation without throwing
816 :
817 : @par Example
818 : @code
819 : task<void> example() {
820 : std::array<task<Response>, 3> requests = {
821 : fetch_from_server(0),
822 : fetch_from_server(1),
823 : fetch_from_server(2)
824 : };
825 :
826 : auto [index, response] = co_await when_any(std::move(requests));
827 : }
828 : @endcode
829 :
830 : @par Example with Vector
831 : @code
832 : task<Response> fetch_fastest(std::vector<Server> const& servers) {
833 : std::vector<task<Response>> requests;
834 : for (auto const& server : servers)
835 : requests.push_back(fetch_from(server));
836 :
837 : auto [index, response] = co_await when_any(std::move(requests));
838 : co_return response;
839 : }
840 : @endcode
841 :
842 : @tparam R Range type satisfying IoAwaitableRange.
843 : @param awaitables Range of awaitables to race concurrently (must not be empty).
844 : @return A task yielding a pair of (winner_index, result).
845 :
846 : @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
847 : @throws Rethrows the winner's exception if the winning task threw an exception.
848 :
849 : @par Remarks
850 : Elements are moved from the range; for lvalue ranges, the original
851 : container will have moved-from elements after this call. The range
852 : is moved onto the coroutine frame to ensure lifetime safety. Unlike
853 : the variadic overload, no variant wrapper is needed since all tasks
854 : share the same return type.
855 :
856 : @see when_any, IoAwaitableRange
857 : */
858 : template<IoAwaitableRange R>
859 : requires (!std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>)
860 21 : [[nodiscard]] auto when_any(R&& awaitables)
861 : -> task<std::pair<std::size_t, detail::awaitable_result_t<std::ranges::range_value_t<R>>>>
862 : {
863 : using Awaitable = std::ranges::range_value_t<R>;
864 : using T = detail::awaitable_result_t<Awaitable>;
865 : using result_type = std::pair<std::size_t, T>;
866 : using OwnedRange = std::remove_cvref_t<R>;
867 :
868 : auto count = std::ranges::size(awaitables);
869 : if(count == 0)
870 : throw std::invalid_argument("when_any requires at least one awaitable");
871 :
872 : // Move/copy range onto coroutine frame to ensure lifetime
873 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
874 :
875 : detail::when_any_homogeneous_state<T> state(count);
876 :
877 : co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
878 :
879 : if(state.core_.winner_exception_)
880 : std::rethrow_exception(state.core_.winner_exception_);
881 :
882 : co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
883 42 : }
884 :
885 : /** Wait for the first awaitable to complete (void range overload).
886 :
887 : Races a range of void-returning awaitables. Since void awaitables have
888 : no result value, only the winner's index is returned.
889 :
890 : @par Suspends
891 : The calling coroutine suspends when co_await is invoked. All awaitables
892 : in the range are launched concurrently and execute in parallel. The
893 : coroutine resumes only after all awaitables have completed, even though
894 : the winner is determined by the first to finish.
895 :
896 : @par Completion Conditions
897 : @li Winner is determined when the first awaitable completes (success or exception)
898 : @li Only one task can claim winner status via atomic compare-exchange
899 : @li Once a winner exists, stop is requested for all remaining siblings
900 : @li Parent coroutine resumes only after all siblings acknowledge completion
901 : @li The winner's index is returned; if the winner threw, the exception is rethrown
902 :
903 : @par Cancellation Semantics
904 : Cancellation is supported via stop_token propagated through the
905 : IoAwaitable protocol:
906 : @li Each child awaitable receives a stop_token derived from a shared stop_source
907 : @li When the parent's stop token is activated, the stop is forwarded to all children
908 : @li When a winner is determined, stop_source_.request_stop() is called immediately
909 : @li Siblings must handle cancellation gracefully and complete before parent resumes
910 : @li Stop requests are cooperative; tasks must check and respond to them
911 :
912 : @par Concurrency/Overlap
913 : All awaitables are launched concurrently before any can complete.
914 : The launcher iterates through the range, starting each task on the
915 : caller's executor. Tasks may execute in parallel on multi-threaded
916 : executors or interleave on single-threaded executors. There is no
917 : guaranteed ordering of task completion.
918 :
919 : @par Notable Error Conditions
920 : @li Empty range: throws std::invalid_argument immediately (not via co_return)
921 : @li Winner exception: if the winning task threw, that exception is rethrown
922 : @li Non-winner exceptions: silently discarded (only winner's result matters)
923 : @li Cancellation: tasks may complete via cancellation without throwing
924 :
925 : @par Example
926 : @code
927 : task<void> example() {
928 : std::vector<task<void>> tasks;
929 : for (int i = 0; i < 5; ++i)
930 : tasks.push_back(background_work(i));
931 :
932 : std::size_t winner = co_await when_any(std::move(tasks));
933 : // winner is the index of the first task to complete
934 : }
935 : @endcode
936 :
937 : @par Example with Timeout
938 : @code
939 : task<void> with_timeout() {
940 : std::vector<task<void>> tasks;
941 : tasks.push_back(long_running_operation());
942 : tasks.push_back(delay(std::chrono::seconds(5)));
943 :
944 : std::size_t winner = co_await when_any(std::move(tasks));
945 : if (winner == 1) {
946 : // Timeout occurred
947 : }
948 : }
949 : @endcode
950 :
951 : @tparam R Range type satisfying IoAwaitableRange with void result.
952 : @param awaitables Range of void awaitables to race concurrently (must not be empty).
953 : @return A task yielding the winner's index (zero-based).
954 :
955 : @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
956 : @throws Rethrows the winner's exception if the winning task threw an exception.
957 :
958 : @par Remarks
959 : Elements are moved from the range; for lvalue ranges, the original
960 : container will have moved-from elements after this call. The range
961 : is moved onto the coroutine frame to ensure lifetime safety. Unlike
962 : the non-void overload, no result storage is needed since void tasks
963 : produce no value.
964 :
965 : @see when_any, IoAwaitableRange
966 : */
967 : template<IoAwaitableRange R>
968 : requires std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>
969 3 : [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t>
970 : {
971 : using OwnedRange = std::remove_cvref_t<R>;
972 :
973 : auto count = std::ranges::size(awaitables);
974 : if(count == 0)
975 : throw std::invalid_argument("when_any requires at least one awaitable");
976 :
977 : // Move/copy range onto coroutine frame to ensure lifetime
978 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
979 :
980 : detail::when_any_homogeneous_state<void> state(count);
981 :
982 : co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
983 :
984 : if(state.core_.winner_exception_)
985 : std::rethrow_exception(state.core_.winner_exception_);
986 :
987 : co_return state.core_.winner_index_;
988 6 : }
989 :
990 : } // namespace capy
991 : } // namespace boost
992 :
993 : #endif
|