1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_ANY_READ_STREAM_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_READ_STREAM_HPP
11  
#define BOOST_CAPY_IO_ANY_READ_STREAM_HPP
11  
#define BOOST_CAPY_IO_ANY_READ_STREAM_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
15  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers.hpp>
16  
#include <boost/capy/buffers/buffer_array.hpp>
16  
#include <boost/capy/buffers/buffer_array.hpp>
17  
#include <boost/capy/concept/io_awaitable.hpp>
17  
#include <boost/capy/concept/io_awaitable.hpp>
18  
#include <boost/capy/concept/read_stream.hpp>
18  
#include <boost/capy/concept/read_stream.hpp>
19  
#include <boost/capy/ex/io_env.hpp>
19  
#include <boost/capy/ex/io_env.hpp>
20  
#include <boost/capy/io_result.hpp>
20  
#include <boost/capy/io_result.hpp>
21  

21  

22  
#include <concepts>
22  
#include <concepts>
23  
#include <coroutine>
23  
#include <coroutine>
24 -
#include <exception>
 
25  
#include <cstddef>
24  
#include <cstddef>
26  
#include <new>
25  
#include <new>
27  
#include <span>
26  
#include <span>
28  
#include <stop_token>
27  
#include <stop_token>
29  
#include <system_error>
28  
#include <system_error>
30  
#include <utility>
29  
#include <utility>
31  

30  

32  
namespace boost {
31  
namespace boost {
33  
namespace capy {
32  
namespace capy {
34  

33  

35  
/** Type-erased wrapper for any ReadStream.
34  
/** Type-erased wrapper for any ReadStream.
36  

35  

37  
    This class provides type erasure for any type satisfying the
36  
    This class provides type erasure for any type satisfying the
38  
    @ref ReadStream concept, enabling runtime polymorphism for
37  
    @ref ReadStream concept, enabling runtime polymorphism for
39  
    read operations. It uses cached awaitable storage to achieve
38  
    read operations. It uses cached awaitable storage to achieve
40  
    zero steady-state allocation after construction.
39  
    zero steady-state allocation after construction.
41  

40  

42  
    The wrapper supports two construction modes:
41  
    The wrapper supports two construction modes:
43  
    - **Owning**: Pass by value to transfer ownership. The wrapper
42  
    - **Owning**: Pass by value to transfer ownership. The wrapper
44  
      allocates storage and owns the stream.
43  
      allocates storage and owns the stream.
45  
    - **Reference**: Pass a pointer to wrap without ownership. The
44  
    - **Reference**: Pass a pointer to wrap without ownership. The
46  
      pointed-to stream must outlive this wrapper.
45  
      pointed-to stream must outlive this wrapper.
47  

46  

48  
    @par Awaitable Preallocation
47  
    @par Awaitable Preallocation
49  
    The constructor preallocates storage for the type-erased awaitable.
48  
    The constructor preallocates storage for the type-erased awaitable.
50  
    This reserves all virtual address space at server startup
49  
    This reserves all virtual address space at server startup
51  
    so memory usage can be measured up front, rather than
50  
    so memory usage can be measured up front, rather than
52  
    allocating piecemeal as traffic arrives.
51  
    allocating piecemeal as traffic arrives.
53  

52  

54  
    @par Immediate Completion
53  
    @par Immediate Completion
55  
    When the underlying stream's awaitable reports ready immediately
54  
    When the underlying stream's awaitable reports ready immediately
56  
    (e.g. buffered data already available), the wrapper skips
55  
    (e.g. buffered data already available), the wrapper skips
57  
    coroutine suspension entirely and returns the result inline.
56  
    coroutine suspension entirely and returns the result inline.
58  

57  

59  
    @par Thread Safety
58  
    @par Thread Safety
60  
    Not thread-safe. Concurrent operations on the same wrapper
59  
    Not thread-safe. Concurrent operations on the same wrapper
61  
    are undefined behavior.
60  
    are undefined behavior.
62  

61  

63  
    @par Example
62  
    @par Example
64  
    @code
63  
    @code
65  
    // Owning - takes ownership of the stream
64  
    // Owning - takes ownership of the stream
66  
    any_read_stream stream(socket{ioc});
65  
    any_read_stream stream(socket{ioc});
67  

66  

68  
    // Reference - wraps without ownership
67  
    // Reference - wraps without ownership
69  
    socket sock(ioc);
68  
    socket sock(ioc);
70  
    any_read_stream stream(&sock);
69  
    any_read_stream stream(&sock);
71  

70  

72  
    mutable_buffer buf(data, size);
71  
    mutable_buffer buf(data, size);
73  
    auto [ec, n] = co_await stream.read_some(buf);
72  
    auto [ec, n] = co_await stream.read_some(buf);
74  
    @endcode
73  
    @endcode
75  

74  

76  
    @see any_write_stream, any_stream, ReadStream
75  
    @see any_write_stream, any_stream, ReadStream
77  
*/
76  
*/
78  
class any_read_stream
77  
class any_read_stream
79  
{
78  
{
80  
    struct vtable;
79  
    struct vtable;
81  

80  

82  
    template<ReadStream S>
81  
    template<ReadStream S>
83  
    struct vtable_for_impl;
82  
    struct vtable_for_impl;
84  

83  

85  
    // ordered for cache line coherence
84  
    // ordered for cache line coherence
86  
    void* stream_ = nullptr;
85  
    void* stream_ = nullptr;
87  
    vtable const* vt_ = nullptr;
86  
    vtable const* vt_ = nullptr;
88  
    void* cached_awaitable_ = nullptr;
87  
    void* cached_awaitable_ = nullptr;
89  
    void* storage_ = nullptr;
88  
    void* storage_ = nullptr;
90  
    bool awaitable_active_ = false;
89  
    bool awaitable_active_ = false;
91  

90  

92  
public:
91  
public:
93  
    /** Destructor.
92  
    /** Destructor.
94  

93  

95  
        Destroys the owned stream (if any) and releases the cached
94  
        Destroys the owned stream (if any) and releases the cached
96  
        awaitable storage.
95  
        awaitable storage.
97  
    */
96  
    */
98  
    ~any_read_stream();
97  
    ~any_read_stream();
99  

98  

100  
    /** Default constructor.
99  
    /** Default constructor.
101  

100  

102  
        Constructs an empty wrapper. Operations on a default-constructed
101  
        Constructs an empty wrapper. Operations on a default-constructed
103  
        wrapper result in undefined behavior.
102  
        wrapper result in undefined behavior.
104  
    */
103  
    */
105  
    any_read_stream() = default;
104  
    any_read_stream() = default;
106  

105  

107  
    /** Non-copyable.
106  
    /** Non-copyable.
108  

107  

109  
        The awaitable cache is per-instance and cannot be shared.
108  
        The awaitable cache is per-instance and cannot be shared.
110  
    */
109  
    */
111  
    any_read_stream(any_read_stream const&) = delete;
110  
    any_read_stream(any_read_stream const&) = delete;
112  
    any_read_stream& operator=(any_read_stream const&) = delete;
111  
    any_read_stream& operator=(any_read_stream const&) = delete;
113  

112  

114  
    /** Move constructor.
113  
    /** Move constructor.
115  

114  

116  
        Transfers ownership of the wrapped stream (if owned) and
115  
        Transfers ownership of the wrapped stream (if owned) and
117  
        cached awaitable storage from `other`. After the move, `other` is
116  
        cached awaitable storage from `other`. After the move, `other` is
118  
        in a default-constructed state.
117  
        in a default-constructed state.
119  

118  

120  
        @param other The wrapper to move from.
119  
        @param other The wrapper to move from.
121  
    */
120  
    */
122  
    any_read_stream(any_read_stream&& other) noexcept
121  
    any_read_stream(any_read_stream&& other) noexcept
123  
        : stream_(std::exchange(other.stream_, nullptr))
122  
        : stream_(std::exchange(other.stream_, nullptr))
124  
        , vt_(std::exchange(other.vt_, nullptr))
123  
        , vt_(std::exchange(other.vt_, nullptr))
125  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
124  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
126  
        , storage_(std::exchange(other.storage_, nullptr))
125  
        , storage_(std::exchange(other.storage_, nullptr))
127  
        , awaitable_active_(std::exchange(other.awaitable_active_, false))
126  
        , awaitable_active_(std::exchange(other.awaitable_active_, false))
128  
    {
127  
    {
129  
    }
128  
    }
130  

129  

131  
    /** Move assignment operator.
130  
    /** Move assignment operator.
132  

131  

133  
        Destroys any owned stream and releases existing resources,
132  
        Destroys any owned stream and releases existing resources,
134  
        then transfers ownership from `other`.
133  
        then transfers ownership from `other`.
135  

134  

136  
        @param other The wrapper to move from.
135  
        @param other The wrapper to move from.
137  
        @return Reference to this wrapper.
136  
        @return Reference to this wrapper.
138  
    */
137  
    */
139  
    any_read_stream&
138  
    any_read_stream&
140  
    operator=(any_read_stream&& other) noexcept;
139  
    operator=(any_read_stream&& other) noexcept;
141  

140  

142  
    /** Construct by taking ownership of a ReadStream.
141  
    /** Construct by taking ownership of a ReadStream.
143  

142  

144  
        Allocates storage and moves the stream into this wrapper.
143  
        Allocates storage and moves the stream into this wrapper.
145  
        The wrapper owns the stream and will destroy it.
144  
        The wrapper owns the stream and will destroy it.
146  

145  

147  
        @param s The stream to take ownership of.
146  
        @param s The stream to take ownership of.
148  
    */
147  
    */
149  
    template<ReadStream S>
148  
    template<ReadStream S>
150  
        requires (!std::same_as<std::decay_t<S>, any_read_stream>)
149  
        requires (!std::same_as<std::decay_t<S>, any_read_stream>)
151  
    any_read_stream(S s);
150  
    any_read_stream(S s);
152  

151  

153  
    /** Construct by wrapping a ReadStream without ownership.
152  
    /** Construct by wrapping a ReadStream without ownership.
154  

153  

155  
        Wraps the given stream by pointer. The stream must remain
154  
        Wraps the given stream by pointer. The stream must remain
156  
        valid for the lifetime of this wrapper.
155  
        valid for the lifetime of this wrapper.
157  

156  

158  
        @param s Pointer to the stream to wrap.
157  
        @param s Pointer to the stream to wrap.
159  
    */
158  
    */
160  
    template<ReadStream S>
159  
    template<ReadStream S>
161  
    any_read_stream(S* s);
160  
    any_read_stream(S* s);
162  

161  

163  
    /** Check if the wrapper contains a valid stream.
162  
    /** Check if the wrapper contains a valid stream.
164  

163  

165  
        @return `true` if wrapping a stream, `false` if default-constructed
164  
        @return `true` if wrapping a stream, `false` if default-constructed
166  
            or moved-from.
165  
            or moved-from.
167  
    */
166  
    */
168  
    bool
167  
    bool
169  
    has_value() const noexcept
168  
    has_value() const noexcept
170  
    {
169  
    {
171  
        return stream_ != nullptr;
170  
        return stream_ != nullptr;
172  
    }
171  
    }
173  

172  

174  
    /** Check if the wrapper contains a valid stream.
173  
    /** Check if the wrapper contains a valid stream.
175  

174  

176  
        @return `true` if wrapping a stream, `false` if default-constructed
175  
        @return `true` if wrapping a stream, `false` if default-constructed
177  
            or moved-from.
176  
            or moved-from.
178  
    */
177  
    */
179  
    explicit
178  
    explicit
180  
    operator bool() const noexcept
179  
    operator bool() const noexcept
181  
    {
180  
    {
182  
        return has_value();
181  
        return has_value();
183  
    }
182  
    }
184  

183  

185  
    /** Initiate an asynchronous read operation.
184  
    /** Initiate an asynchronous read operation.
186  

185  

187  
        Reads data into the provided buffer sequence. The operation
186  
        Reads data into the provided buffer sequence. The operation
188  
        completes when at least one byte has been read, or an error
187  
        completes when at least one byte has been read, or an error
189  
        occurs.
188  
        occurs.
190  

189  

191  
        @param buffers The buffer sequence to read into. Passed by
190  
        @param buffers The buffer sequence to read into. Passed by
192  
            value to ensure the sequence lives in the coroutine frame
191  
            value to ensure the sequence lives in the coroutine frame
193  
            across suspension points.
192  
            across suspension points.
194  

193  

195  
        @return An awaitable yielding `(error_code,std::size_t)`.
194  
        @return An awaitable yielding `(error_code,std::size_t)`.
196  

195  

197  
        @par Immediate Completion
196  
        @par Immediate Completion
198  
        The operation completes immediately without suspending
197  
        The operation completes immediately without suspending
199  
        the calling coroutine when the underlying stream's
198  
        the calling coroutine when the underlying stream's
200  
        awaitable reports immediate readiness via `await_ready`.
199  
        awaitable reports immediate readiness via `await_ready`.
201  

200  

202  
        @note This is a partial operation and may not process the
201  
        @note This is a partial operation and may not process the
203  
        entire buffer sequence. Use the composed @ref read algorithm
202  
        entire buffer sequence. Use the composed @ref read algorithm
204  
        for guaranteed complete transfer.
203  
        for guaranteed complete transfer.
205  

204  

206  
        @par Preconditions
205  
        @par Preconditions
207  
        The wrapper must contain a valid stream (`has_value() == true`).
206  
        The wrapper must contain a valid stream (`has_value() == true`).
208  
        The caller must not call this function again after a prior
207  
        The caller must not call this function again after a prior
209  
        call returned an error (including EOF).
208  
        call returned an error (including EOF).
210  
    */
209  
    */
211  
    template<MutableBufferSequence MB>
210  
    template<MutableBufferSequence MB>
212  
    auto
211  
    auto
213  
    read_some(MB buffers);
212  
    read_some(MB buffers);
214  

213  

215  
protected:
214  
protected:
216  
    /** Rebind to a new stream after move.
215  
    /** Rebind to a new stream after move.
217  

216  

218  
        Updates the internal pointer to reference a new stream object.
217  
        Updates the internal pointer to reference a new stream object.
219  
        Used by owning wrappers after move assignment when the owned
218  
        Used by owning wrappers after move assignment when the owned
220  
        object has moved to a new location.
219  
        object has moved to a new location.
221  

220  

222  
        @param new_stream The new stream to bind to. Must be the same
221  
        @param new_stream The new stream to bind to. Must be the same
223  
            type as the original stream.
222  
            type as the original stream.
224  

223  

225  
        @note Terminates if called with a stream of different type
224  
        @note Terminates if called with a stream of different type
226  
            than the original.
225  
            than the original.
227  
    */
226  
    */
228  
    template<ReadStream S>
227  
    template<ReadStream S>
229  
    void
228  
    void
230  
    rebind(S& new_stream) noexcept
229  
    rebind(S& new_stream) noexcept
231  
    {
230  
    {
232  
        if(vt_ != &vtable_for_impl<S>::value)
231  
        if(vt_ != &vtable_for_impl<S>::value)
233  
            std::terminate();
232  
            std::terminate();
234  
        stream_ = &new_stream;
233  
        stream_ = &new_stream;
235  
    }
234  
    }
236  
};
235  
};
237  

236  

238  
//----------------------------------------------------------
237  
//----------------------------------------------------------
239  

238  

240  
struct any_read_stream::vtable
239  
struct any_read_stream::vtable
241  
{
240  
{
242  
    // ordered by call frequency for cache line coherence
241  
    // ordered by call frequency for cache line coherence
243  
    void (*construct_awaitable)(
242  
    void (*construct_awaitable)(
244  
        void* stream,
243  
        void* stream,
245  
        void* storage,
244  
        void* storage,
246  
        std::span<mutable_buffer const> buffers);
245  
        std::span<mutable_buffer const> buffers);
247  
    bool (*await_ready)(void*);
246  
    bool (*await_ready)(void*);
248  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
247  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
249  
    io_result<std::size_t> (*await_resume)(void*);
248  
    io_result<std::size_t> (*await_resume)(void*);
250  
    void (*destroy_awaitable)(void*) noexcept;
249  
    void (*destroy_awaitable)(void*) noexcept;
251  
    std::size_t awaitable_size;
250  
    std::size_t awaitable_size;
252  
    std::size_t awaitable_align;
251  
    std::size_t awaitable_align;
253  
    void (*destroy)(void*) noexcept;
252  
    void (*destroy)(void*) noexcept;
254  
};
253  
};
255  

254  

256  
template<ReadStream S>
255  
template<ReadStream S>
257  
struct any_read_stream::vtable_for_impl
256  
struct any_read_stream::vtable_for_impl
258  
{
257  
{
259  
    using Awaitable = decltype(std::declval<S&>().read_some(
258  
    using Awaitable = decltype(std::declval<S&>().read_some(
260  
        std::span<mutable_buffer const>{}));
259  
        std::span<mutable_buffer const>{}));
261  

260  

262  
    static void
261  
    static void
263  
    do_destroy_impl(void* stream) noexcept
262  
    do_destroy_impl(void* stream) noexcept
264  
    {
263  
    {
265  
        static_cast<S*>(stream)->~S();
264  
        static_cast<S*>(stream)->~S();
266  
    }
265  
    }
267  

266  

268  
    static void
267  
    static void
269  
    construct_awaitable_impl(
268  
    construct_awaitable_impl(
270  
        void* stream,
269  
        void* stream,
271  
        void* storage,
270  
        void* storage,
272  
        std::span<mutable_buffer const> buffers)
271  
        std::span<mutable_buffer const> buffers)
273  
    {
272  
    {
274  
        auto& s = *static_cast<S*>(stream);
273  
        auto& s = *static_cast<S*>(stream);
275  
        ::new(storage) Awaitable(s.read_some(buffers));
274  
        ::new(storage) Awaitable(s.read_some(buffers));
276  
    }
275  
    }
277  

276  

278  
    static constexpr vtable value = {
277  
    static constexpr vtable value = {
279  
        &construct_awaitable_impl,
278  
        &construct_awaitable_impl,
280  
        +[](void* p) {
279  
        +[](void* p) {
281  
            return static_cast<Awaitable*>(p)->await_ready();
280  
            return static_cast<Awaitable*>(p)->await_ready();
282  
        },
281  
        },
283  
        +[](void* p, std::coroutine_handle<> h, io_env const* env) {
282  
        +[](void* p, std::coroutine_handle<> h, io_env const* env) {
284  
            return detail::call_await_suspend(
283  
            return detail::call_await_suspend(
285  
                static_cast<Awaitable*>(p), h, env);
284  
                static_cast<Awaitable*>(p), h, env);
286  
        },
285  
        },
287  
        +[](void* p) {
286  
        +[](void* p) {
288  
            return static_cast<Awaitable*>(p)->await_resume();
287  
            return static_cast<Awaitable*>(p)->await_resume();
289  
        },
288  
        },
290  
        +[](void* p) noexcept {
289  
        +[](void* p) noexcept {
291  
            static_cast<Awaitable*>(p)->~Awaitable();
290  
            static_cast<Awaitable*>(p)->~Awaitable();
292  
        },
291  
        },
293  
        sizeof(Awaitable),
292  
        sizeof(Awaitable),
294  
        alignof(Awaitable),
293  
        alignof(Awaitable),
295  
        &do_destroy_impl
294  
        &do_destroy_impl
296  
    };
295  
    };
297  
};
296  
};
298  

297  

299  
//----------------------------------------------------------
298  
//----------------------------------------------------------
300  

299  

301  
inline
300  
inline
302  
any_read_stream::~any_read_stream()
301  
any_read_stream::~any_read_stream()
303  
{
302  
{
304  
    if(storage_)
303  
    if(storage_)
305  
    {
304  
    {
306  
        vt_->destroy(stream_);
305  
        vt_->destroy(stream_);
307  
        ::operator delete(storage_);
306  
        ::operator delete(storage_);
308  
    }
307  
    }
309  
    if(cached_awaitable_)
308  
    if(cached_awaitable_)
310  
    {
309  
    {
311  
        if(awaitable_active_)
310  
        if(awaitable_active_)
312  
            vt_->destroy_awaitable(cached_awaitable_);
311  
            vt_->destroy_awaitable(cached_awaitable_);
313  
        ::operator delete(cached_awaitable_);
312  
        ::operator delete(cached_awaitable_);
314  
    }
313  
    }
315  
}
314  
}
316  

315  

317  
/*  Move assignment.

    Releases the current contents and then takes over the contents
    of `other`, leaving `other` empty. Self-assignment is a no-op.

    An awaitable that is still active was created from the wrapped
    stream and may refer to it, so it is destroyed before the owned
    stream is destroyed and its storage freed.
*/
inline any_read_stream&
any_read_stream::operator=(any_read_stream&& other) noexcept
{
    if(this != &other)
    {
        if(cached_awaitable_)
        {
            // Run the destructor only for a live awaitable; the raw
            // storage is freed either way.
            if(awaitable_active_)
                vt_->destroy_awaitable(cached_awaitable_);
            ::operator delete(cached_awaitable_);
        }
        if(storage_)
        {
            // storage_ is non-null only for an owned stream.
            vt_->destroy(stream_);
            ::operator delete(storage_);
        }

        // Take over the source's state and leave it empty.
        stream_ = std::exchange(other.stream_, nullptr);
        vt_ = std::exchange(other.vt_, nullptr);
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
        storage_ = std::exchange(other.storage_, nullptr);
        awaitable_active_ = std::exchange(other.awaitable_active_, false);
    }
    return *this;
}
341  

340  

342  
template<ReadStream S>
341  
template<ReadStream S>
343  
    requires (!std::same_as<std::decay_t<S>, any_read_stream>)
342  
    requires (!std::same_as<std::decay_t<S>, any_read_stream>)
344  
any_read_stream::any_read_stream(S s)
343  
any_read_stream::any_read_stream(S s)
345  
    : vt_(&vtable_for_impl<S>::value)
344  
    : vt_(&vtable_for_impl<S>::value)
346  
{
345  
{
347  
    struct guard {
346  
    struct guard {
348  
        any_read_stream* self;
347  
        any_read_stream* self;
349  
        bool committed = false;
348  
        bool committed = false;
350  
        ~guard() {
349  
        ~guard() {
351  
            if(!committed && self->storage_) {
350  
            if(!committed && self->storage_) {
352  
                self->vt_->destroy(self->stream_);
351  
                self->vt_->destroy(self->stream_);
353  
                ::operator delete(self->storage_);
352  
                ::operator delete(self->storage_);
354  
                self->storage_ = nullptr;
353  
                self->storage_ = nullptr;
355  
                self->stream_ = nullptr;
354  
                self->stream_ = nullptr;
356  
            }
355  
            }
357  
        }
356  
        }
358  
    } g{this};
357  
    } g{this};
359  

358  

360  
    storage_ = ::operator new(sizeof(S));
359  
    storage_ = ::operator new(sizeof(S));
361  
    stream_ = ::new(storage_) S(std::move(s));
360  
    stream_ = ::new(storage_) S(std::move(s));
362  

361  

363  
    // Preallocate the awaitable storage
362  
    // Preallocate the awaitable storage
364  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
363  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
365  

364  

366  
    g.committed = true;
365  
    g.committed = true;
367  
}
366  
}
368  

367  

369  
template<ReadStream S>
368  
template<ReadStream S>
370  
any_read_stream::any_read_stream(S* s)
369  
any_read_stream::any_read_stream(S* s)
371  
    : stream_(s)
370  
    : stream_(s)
372  
    , vt_(&vtable_for_impl<S>::value)
371  
    , vt_(&vtable_for_impl<S>::value)
373  
{
372  
{
374  
    // Preallocate the awaitable storage
373  
    // Preallocate the awaitable storage
375  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
374  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
376  
}
375  
}
377  

376  

378  
//----------------------------------------------------------
377  
//----------------------------------------------------------
379  

378  

380  
template<MutableBufferSequence MB>
379  
template<MutableBufferSequence MB>
381  
auto
380  
auto
382  
any_read_stream::read_some(MB buffers)
381  
any_read_stream::read_some(MB buffers)
383  
{
382  
{
384  
    // VFALCO in theory, we could use if constexpr to detect a
383  
    // VFALCO in theory, we could use if constexpr to detect a
385  
    // span and then pass that through to read_some without the array
384  
    // span and then pass that through to read_some without the array
386  
    struct awaitable
385  
    struct awaitable
387  
    {
386  
    {
388  
        any_read_stream* self_;
387  
        any_read_stream* self_;
389  
        mutable_buffer_array<detail::max_iovec_> ba_;
388  
        mutable_buffer_array<detail::max_iovec_> ba_;
390  

389  

391  
        bool
390  
        bool
392  
        await_ready()
391  
        await_ready()
393  
        {
392  
        {
394  
            self_->vt_->construct_awaitable(
393  
            self_->vt_->construct_awaitable(
395  
                self_->stream_,
394  
                self_->stream_,
396  
                self_->cached_awaitable_,
395  
                self_->cached_awaitable_,
397  
                ba_.to_span());
396  
                ba_.to_span());
398  
            self_->awaitable_active_ = true;
397  
            self_->awaitable_active_ = true;
399  

398  

400  
            return self_->vt_->await_ready(
399  
            return self_->vt_->await_ready(
401  
                self_->cached_awaitable_);
400  
                self_->cached_awaitable_);
402  
        }
401  
        }
403  

402  

404  
        std::coroutine_handle<>
403  
        std::coroutine_handle<>
405  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
404  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
406  
        {
405  
        {
407  
            return self_->vt_->await_suspend(
406  
            return self_->vt_->await_suspend(
408  
                self_->cached_awaitable_, h, env);
407  
                self_->cached_awaitable_, h, env);
409  
        }
408  
        }
410  

409  

411  
        io_result<std::size_t>
410  
        io_result<std::size_t>
412  
        await_resume()
411  
        await_resume()
413  
        {
412  
        {
414  
            struct guard {
413  
            struct guard {
415  
                any_read_stream* self;
414  
                any_read_stream* self;
416  
                ~guard() {
415  
                ~guard() {
417  
                    self->vt_->destroy_awaitable(self->cached_awaitable_);
416  
                    self->vt_->destroy_awaitable(self->cached_awaitable_);
418  
                    self->awaitable_active_ = false;
417  
                    self->awaitable_active_ = false;
419  
                }
418  
                }
420  
            } g{self_};
419  
            } g{self_};
421  
            return self_->vt_->await_resume(
420  
            return self_->vt_->await_resume(
422  
                self_->cached_awaitable_);
421  
                self_->cached_awaitable_);
423  
        }
422  
        }
424  
    };
423  
    };
425  
    return awaitable{this,
424  
    return awaitable{this,
426  
        mutable_buffer_array<detail::max_iovec_>(buffers)};
425  
        mutable_buffer_array<detail::max_iovec_>(buffers)};
427  
}
426  
}
428  

427  

429  
} // namespace capy
428  
} // namespace capy
430  
} // namespace boost
429  
} // namespace boost
431  

430  

432  
#endif
431  
#endif