1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
11  
#define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
11  
#define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
15  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers.hpp>
16  
#include <boost/capy/buffers/buffer_array.hpp>
16  
#include <boost/capy/buffers/buffer_array.hpp>
17  
#include <boost/capy/concept/io_awaitable.hpp>
17  
#include <boost/capy/concept/io_awaitable.hpp>
18  
#include <boost/capy/concept/write_stream.hpp>
18  
#include <boost/capy/concept/write_stream.hpp>
19  
#include <coroutine>
19  
#include <coroutine>
20  
#include <boost/capy/ex/io_env.hpp>
20  
#include <boost/capy/ex/io_env.hpp>
21  
#include <boost/capy/io_result.hpp>
21  
#include <boost/capy/io_result.hpp>
22  

22  

23  
#include <concepts>
23  
#include <concepts>
24  
#include <coroutine>
24  
#include <coroutine>
25 -
#include <exception>
 
26  
#include <cstddef>
25  
#include <cstddef>
27  
#include <new>
26  
#include <new>
28  
#include <span>
27  
#include <span>
29  
#include <stop_token>
28  
#include <stop_token>
30  
#include <system_error>
29  
#include <system_error>
31  
#include <utility>
30  
#include <utility>
32  

31  

33  
namespace boost {
32  
namespace boost {
34  
namespace capy {
33  
namespace capy {
35  

34  

36  
/** Type-erased wrapper for any WriteStream.
35  
/** Type-erased wrapper for any WriteStream.
37  

36  

38  
    This class provides type erasure for any type satisfying the
37  
    This class provides type erasure for any type satisfying the
39  
    @ref WriteStream concept, enabling runtime polymorphism for
38  
    @ref WriteStream concept, enabling runtime polymorphism for
40  
    write operations. It uses cached awaitable storage to achieve
39  
    write operations. It uses cached awaitable storage to achieve
41  
    zero steady-state allocation after construction.
40  
    zero steady-state allocation after construction.
42  

41  

43  
    The wrapper supports two construction modes:
42  
    The wrapper supports two construction modes:
44  
    - **Owning**: Pass by value to transfer ownership. The wrapper
43  
    - **Owning**: Pass by value to transfer ownership. The wrapper
45  
      allocates storage and owns the stream.
44  
      allocates storage and owns the stream.
46  
    - **Reference**: Pass a pointer to wrap without ownership. The
45  
    - **Reference**: Pass a pointer to wrap without ownership. The
47  
      pointed-to stream must outlive this wrapper.
46  
      pointed-to stream must outlive this wrapper.
48  

47  

49  
    @par Awaitable Preallocation
48  
    @par Awaitable Preallocation
50  
    The constructor preallocates storage for the type-erased awaitable.
49  
    The constructor preallocates storage for the type-erased awaitable.
51  
    This reserves all virtual address space at server startup
50  
    This reserves all virtual address space at server startup
52  
    so memory usage can be measured up front, rather than
51  
    so memory usage can be measured up front, rather than
53  
    allocating piecemeal as traffic arrives.
52  
    allocating piecemeal as traffic arrives.
54  

53  

55  
    @par Immediate Completion
54  
    @par Immediate Completion
56  
    Operations complete immediately without suspending when the
55  
    Operations complete immediately without suspending when the
57  
    buffer sequence is empty, or when the underlying stream's
56  
    buffer sequence is empty, or when the underlying stream's
58  
    awaitable reports readiness via `await_ready`.
57  
    awaitable reports readiness via `await_ready`.
59  

58  

60  
    @par Thread Safety
59  
    @par Thread Safety
61  
    Not thread-safe. Concurrent operations on the same wrapper
60  
    Not thread-safe. Concurrent operations on the same wrapper
62  
    are undefined behavior.
61  
    are undefined behavior.
63  

62  

64  
    @par Example
63  
    @par Example
65  
    @code
64  
    @code
66  
    // Owning - takes ownership of the stream
65  
    // Owning - takes ownership of the stream
67  
    any_write_stream stream(socket{ioc});
66  
    any_write_stream stream(socket{ioc});
68  

67  

69  
    // Reference - wraps without ownership
68  
    // Reference - wraps without ownership
70  
    socket sock(ioc);
69  
    socket sock(ioc);
71  
    any_write_stream stream(&sock);
70  
    any_write_stream stream(&sock);
72  

71  

73  
    const_buffer buf(data, size);
72  
    const_buffer buf(data, size);
74  
    auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
73  
    auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
75  
    @endcode
74  
    @endcode
76  

75  

77  
    @see any_read_stream, any_stream, WriteStream
76  
    @see any_read_stream, any_stream, WriteStream
78  
*/
77  
*/
79  
class any_write_stream
78  
class any_write_stream
80  
{
79  
{
81  
    struct vtable;
80  
    struct vtable;
82  

81  

83  
    template<WriteStream S>
82  
    template<WriteStream S>
84  
    struct vtable_for_impl;
83  
    struct vtable_for_impl;
85  

84  

86  
    // ordered for cache line coherence
85  
    // ordered for cache line coherence
87  
    void* stream_ = nullptr;
86  
    void* stream_ = nullptr;
88  
    vtable const* vt_ = nullptr;
87  
    vtable const* vt_ = nullptr;
89  
    void* cached_awaitable_ = nullptr;
88  
    void* cached_awaitable_ = nullptr;
90  
    void* storage_ = nullptr;
89  
    void* storage_ = nullptr;
91  
    bool awaitable_active_ = false;
90  
    bool awaitable_active_ = false;
92  

91  

93  
public:
92  
public:
94  
    /** Destructor.
93  
    /** Destructor.
95  

94  

96  
        Destroys the owned stream (if any) and releases the cached
95  
        Destroys the owned stream (if any) and releases the cached
97  
        awaitable storage.
96  
        awaitable storage.
98  
    */
97  
    */
99  
    ~any_write_stream();
98  
    ~any_write_stream();
100  

99  

101  
    /** Default constructor.
100  
    /** Default constructor.
102  

101  

103  
        Constructs an empty wrapper. Operations on a default-constructed
102  
        Constructs an empty wrapper. Operations on a default-constructed
104  
        wrapper result in undefined behavior.
103  
        wrapper result in undefined behavior.
105  
    */
104  
    */
106  
    any_write_stream() = default;
105  
    any_write_stream() = default;
107  

106  

108  
    /** Non-copyable.
107  
    /** Non-copyable.
109  

108  

110  
        The awaitable cache is per-instance and cannot be shared.
109  
        The awaitable cache is per-instance and cannot be shared.
111  
    */
110  
    */
112  
    any_write_stream(any_write_stream const&) = delete;
111  
    any_write_stream(any_write_stream const&) = delete;
113  
    any_write_stream& operator=(any_write_stream const&) = delete;
112  
    any_write_stream& operator=(any_write_stream const&) = delete;
114  

113  

115  
    /** Move constructor.
114  
    /** Move constructor.
116  

115  

117  
        Transfers ownership of the wrapped stream (if owned) and
116  
        Transfers ownership of the wrapped stream (if owned) and
118  
        cached awaitable storage from `other`. After the move, `other` is
117  
        cached awaitable storage from `other`. After the move, `other` is
119  
        in a default-constructed state.
118  
        in a default-constructed state.
120  

119  

121  
        @param other The wrapper to move from.
120  
        @param other The wrapper to move from.
122  
    */
121  
    */
123  
    any_write_stream(any_write_stream&& other) noexcept
122  
    any_write_stream(any_write_stream&& other) noexcept
124  
        : stream_(std::exchange(other.stream_, nullptr))
123  
        : stream_(std::exchange(other.stream_, nullptr))
125  
        , vt_(std::exchange(other.vt_, nullptr))
124  
        , vt_(std::exchange(other.vt_, nullptr))
126  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
125  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
127  
        , storage_(std::exchange(other.storage_, nullptr))
126  
        , storage_(std::exchange(other.storage_, nullptr))
128  
        , awaitable_active_(std::exchange(other.awaitable_active_, false))
127  
        , awaitable_active_(std::exchange(other.awaitable_active_, false))
129  
    {
128  
    {
130  
    }
129  
    }
131  

130  

132  
    /** Move assignment operator.
131  
    /** Move assignment operator.
133  

132  

134  
        Destroys any owned stream and releases existing resources,
133  
        Destroys any owned stream and releases existing resources,
135  
        then transfers ownership from `other`.
134  
        then transfers ownership from `other`.
136  

135  

137  
        @param other The wrapper to move from.
136  
        @param other The wrapper to move from.
138  
        @return Reference to this wrapper.
137  
        @return Reference to this wrapper.
139  
    */
138  
    */
140  
    any_write_stream&
139  
    any_write_stream&
141  
    operator=(any_write_stream&& other) noexcept;
140  
    operator=(any_write_stream&& other) noexcept;
142  

141  

143  
    /** Construct by taking ownership of a WriteStream.
142  
    /** Construct by taking ownership of a WriteStream.
144  

143  

145  
        Allocates storage and moves the stream into this wrapper.
144  
        Allocates storage and moves the stream into this wrapper.
146  
        The wrapper owns the stream and will destroy it.
145  
        The wrapper owns the stream and will destroy it.
147  

146  

148  
        @param s The stream to take ownership of.
147  
        @param s The stream to take ownership of.
149  
    */
148  
    */
150  
    template<WriteStream S>
149  
    template<WriteStream S>
151  
        requires (!std::same_as<std::decay_t<S>, any_write_stream>)
150  
        requires (!std::same_as<std::decay_t<S>, any_write_stream>)
152  
    any_write_stream(S s);
151  
    any_write_stream(S s);
153  

152  

154  
    /** Construct by wrapping a WriteStream without ownership.
153  
    /** Construct by wrapping a WriteStream without ownership.
155  

154  

156  
        Wraps the given stream by pointer. The stream must remain
155  
        Wraps the given stream by pointer. The stream must remain
157  
        valid for the lifetime of this wrapper.
156  
        valid for the lifetime of this wrapper.
158  

157  

159  
        @param s Pointer to the stream to wrap.
158  
        @param s Pointer to the stream to wrap.
160  
    */
159  
    */
161  
    template<WriteStream S>
160  
    template<WriteStream S>
162  
    any_write_stream(S* s);
161  
    any_write_stream(S* s);
163  

162  

164  
    /** Check if the wrapper contains a valid stream.
163  
    /** Check if the wrapper contains a valid stream.
165  

164  

166  
        @return `true` if wrapping a stream, `false` if default-constructed
165  
        @return `true` if wrapping a stream, `false` if default-constructed
167  
            or moved-from.
166  
            or moved-from.
168  
    */
167  
    */
169  
    bool
168  
    bool
170  
    has_value() const noexcept
169  
    has_value() const noexcept
171  
    {
170  
    {
172  
        return stream_ != nullptr;
171  
        return stream_ != nullptr;
173  
    }
172  
    }
174  

173  

175  
    /** Check if the wrapper contains a valid stream.
174  
    /** Check if the wrapper contains a valid stream.
176  

175  

177  
        @return `true` if wrapping a stream, `false` if default-constructed
176  
        @return `true` if wrapping a stream, `false` if default-constructed
178  
            or moved-from.
177  
            or moved-from.
179  
    */
178  
    */
180  
    explicit
179  
    explicit
181  
    operator bool() const noexcept
180  
    operator bool() const noexcept
182  
    {
181  
    {
183  
        return has_value();
182  
        return has_value();
184  
    }
183  
    }
185  

184  

186  
    /** Initiate an asynchronous write operation.
185  
    /** Initiate an asynchronous write operation.
187  

186  

188  
        Writes data from the provided buffer sequence. The operation
187  
        Writes data from the provided buffer sequence. The operation
189  
        completes when at least one byte has been written, or an error
188  
        completes when at least one byte has been written, or an error
190  
        occurs.
189  
        occurs.
191  

190  

192  
        @param buffers The buffer sequence containing data to write.
191  
        @param buffers The buffer sequence containing data to write.
193  
            Passed by value to ensure the sequence lives in the
192  
            Passed by value to ensure the sequence lives in the
194  
            coroutine frame across suspension points.
193  
            coroutine frame across suspension points.
195  

194  

196  
        @return An awaitable yielding `(error_code,std::size_t)`.
195  
        @return An awaitable yielding `(error_code,std::size_t)`.
197  

196  

198  
        @par Immediate Completion
197  
        @par Immediate Completion
199  
        The operation completes immediately without suspending
198  
        The operation completes immediately without suspending
200  
        the calling coroutine when:
199  
        the calling coroutine when:
201  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
200  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
202  
        @li The underlying stream's awaitable reports immediate
201  
        @li The underlying stream's awaitable reports immediate
203  
            readiness via `await_ready`.
202  
            readiness via `await_ready`.
204  

203  

205  
        @note This is a partial operation and may not process the
204  
        @note This is a partial operation and may not process the
206  
        entire buffer sequence. Use the composed @ref write algorithm
205  
        entire buffer sequence. Use the composed @ref write algorithm
207  
        for guaranteed complete transfer.
206  
        for guaranteed complete transfer.
208  

207  

209  
        @par Preconditions
208  
        @par Preconditions
210  
        The wrapper must contain a valid stream (`has_value() == true`).
209  
        The wrapper must contain a valid stream (`has_value() == true`).
211  
    */
210  
    */
212  
    template<ConstBufferSequence CB>
211  
    template<ConstBufferSequence CB>
213  
    auto
212  
    auto
214  
    write_some(CB buffers);
213  
    write_some(CB buffers);
215  

214  

216  
protected:
215  
protected:
217  
    /** Rebind to a new stream after move.
216  
    /** Rebind to a new stream after move.
218  

217  

219  
        Updates the internal pointer to reference a new stream object.
218  
        Updates the internal pointer to reference a new stream object.
220  
        Used by owning wrappers after move assignment when the owned
219  
        Used by owning wrappers after move assignment when the owned
221  
        object has moved to a new location.
220  
        object has moved to a new location.
222  

221  

223  
        @param new_stream The new stream to bind to. Must be the same
222  
        @param new_stream The new stream to bind to. Must be the same
224  
            type as the original stream.
223  
            type as the original stream.
225  

224  

226  
        @note Terminates if called with a stream of different type
225  
        @note Terminates if called with a stream of different type
227  
            than the original.
226  
            than the original.
228  
    */
227  
    */
229  
    template<WriteStream S>
228  
    template<WriteStream S>
230  
    void
229  
    void
231  
    rebind(S& new_stream) noexcept
230  
    rebind(S& new_stream) noexcept
232  
    {
231  
    {
233  
        if(vt_ != &vtable_for_impl<S>::value)
232  
        if(vt_ != &vtable_for_impl<S>::value)
234  
            std::terminate();
233  
            std::terminate();
235  
        stream_ = &new_stream;
234  
        stream_ = &new_stream;
236  
    }
235  
    }
237  
};
236  
};
238  

237  

239  
//----------------------------------------------------------
238  
//----------------------------------------------------------
240  

239  

241  
struct any_write_stream::vtable
240  
struct any_write_stream::vtable
242  
{
241  
{
243  
    // ordered by call frequency for cache line coherence
242  
    // ordered by call frequency for cache line coherence
244  
    void (*construct_awaitable)(
243  
    void (*construct_awaitable)(
245  
        void* stream,
244  
        void* stream,
246  
        void* storage,
245  
        void* storage,
247  
        std::span<const_buffer const> buffers);
246  
        std::span<const_buffer const> buffers);
248  
    bool (*await_ready)(void*);
247  
    bool (*await_ready)(void*);
249  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
248  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
250  
    io_result<std::size_t> (*await_resume)(void*);
249  
    io_result<std::size_t> (*await_resume)(void*);
251  
    void (*destroy_awaitable)(void*) noexcept;
250  
    void (*destroy_awaitable)(void*) noexcept;
252  
    std::size_t awaitable_size;
251  
    std::size_t awaitable_size;
253  
    std::size_t awaitable_align;
252  
    std::size_t awaitable_align;
254  
    void (*destroy)(void*) noexcept;
253  
    void (*destroy)(void*) noexcept;
255  
};
254  
};
256  

255  

257  
template<WriteStream S>
256  
template<WriteStream S>
258  
struct any_write_stream::vtable_for_impl
257  
struct any_write_stream::vtable_for_impl
259  
{
258  
{
260  
    using Awaitable = decltype(std::declval<S&>().write_some(
259  
    using Awaitable = decltype(std::declval<S&>().write_some(
261  
        std::span<const_buffer const>{}));
260  
        std::span<const_buffer const>{}));
262  

261  

263  
    static void
262  
    static void
264  
    do_destroy_impl(void* stream) noexcept
263  
    do_destroy_impl(void* stream) noexcept
265  
    {
264  
    {
266  
        static_cast<S*>(stream)->~S();
265  
        static_cast<S*>(stream)->~S();
267  
    }
266  
    }
268  

267  

269  
    static void
268  
    static void
270  
    construct_awaitable_impl(
269  
    construct_awaitable_impl(
271  
        void* stream,
270  
        void* stream,
272  
        void* storage,
271  
        void* storage,
273  
        std::span<const_buffer const> buffers)
272  
        std::span<const_buffer const> buffers)
274  
    {
273  
    {
275  
        auto& s = *static_cast<S*>(stream);
274  
        auto& s = *static_cast<S*>(stream);
276  
        ::new(storage) Awaitable(s.write_some(buffers));
275  
        ::new(storage) Awaitable(s.write_some(buffers));
277  
    }
276  
    }
278  

277  

279  
    static constexpr vtable value = {
278  
    static constexpr vtable value = {
280  
        &construct_awaitable_impl,
279  
        &construct_awaitable_impl,
281  
        +[](void* p) {
280  
        +[](void* p) {
282  
            return static_cast<Awaitable*>(p)->await_ready();
281  
            return static_cast<Awaitable*>(p)->await_ready();
283  
        },
282  
        },
284  
        +[](void* p, std::coroutine_handle<> h, io_env const* env) {
283  
        +[](void* p, std::coroutine_handle<> h, io_env const* env) {
285  
            return detail::call_await_suspend(
284  
            return detail::call_await_suspend(
286  
                static_cast<Awaitable*>(p), h, env);
285  
                static_cast<Awaitable*>(p), h, env);
287  
        },
286  
        },
288  
        +[](void* p) {
287  
        +[](void* p) {
289  
            return static_cast<Awaitable*>(p)->await_resume();
288  
            return static_cast<Awaitable*>(p)->await_resume();
290  
        },
289  
        },
291  
        +[](void* p) noexcept {
290  
        +[](void* p) noexcept {
292  
            static_cast<Awaitable*>(p)->~Awaitable();
291  
            static_cast<Awaitable*>(p)->~Awaitable();
293  
        },
292  
        },
294  
        sizeof(Awaitable),
293  
        sizeof(Awaitable),
295  
        alignof(Awaitable),
294  
        alignof(Awaitable),
296  
        &do_destroy_impl
295  
        &do_destroy_impl
297  
    };
296  
    };
298  
};
297  
};
299  

298  

300  
//----------------------------------------------------------
299  
//----------------------------------------------------------
301  

300  

302  
inline
301  
inline
303  
any_write_stream::~any_write_stream()
302  
any_write_stream::~any_write_stream()
304  
{
303  
{
305  
    if(storage_)
304  
    if(storage_)
306  
    {
305  
    {
307  
        vt_->destroy(stream_);
306  
        vt_->destroy(stream_);
308  
        ::operator delete(storage_);
307  
        ::operator delete(storage_);
309  
    }
308  
    }
310  
    if(cached_awaitable_)
309  
    if(cached_awaitable_)
311  
    {
310  
    {
312  
        if(awaitable_active_)
311  
        if(awaitable_active_)
313  
            vt_->destroy_awaitable(cached_awaitable_);
312  
            vt_->destroy_awaitable(cached_awaitable_);
314  
        ::operator delete(cached_awaitable_);
313  
        ::operator delete(cached_awaitable_);
315  
    }
314  
    }
316  
}
315  
}
317  

316  

318  
inline any_write_stream&
317  
inline any_write_stream&
319  
any_write_stream::operator=(any_write_stream&& other) noexcept
318  
any_write_stream::operator=(any_write_stream&& other) noexcept
320  
{
319  
{
321  
    if(this != &other)
320  
    if(this != &other)
322  
    {
321  
    {
323  
        if(storage_)
322  
        if(storage_)
324  
        {
323  
        {
325  
            vt_->destroy(stream_);
324  
            vt_->destroy(stream_);
326  
            ::operator delete(storage_);
325  
            ::operator delete(storage_);
327  
        }
326  
        }
328  
        if(cached_awaitable_)
327  
        if(cached_awaitable_)
329  
        {
328  
        {
330  
            if(awaitable_active_)
329  
            if(awaitable_active_)
331  
                vt_->destroy_awaitable(cached_awaitable_);
330  
                vt_->destroy_awaitable(cached_awaitable_);
332  
            ::operator delete(cached_awaitable_);
331  
            ::operator delete(cached_awaitable_);
333  
        }
332  
        }
334  
        stream_ = std::exchange(other.stream_, nullptr);
333  
        stream_ = std::exchange(other.stream_, nullptr);
335  
        vt_ = std::exchange(other.vt_, nullptr);
334  
        vt_ = std::exchange(other.vt_, nullptr);
336  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
335  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
337  
        storage_ = std::exchange(other.storage_, nullptr);
336  
        storage_ = std::exchange(other.storage_, nullptr);
338  
        awaitable_active_ = std::exchange(other.awaitable_active_, false);
337  
        awaitable_active_ = std::exchange(other.awaitable_active_, false);
339  
    }
338  
    }
340  
    return *this;
339  
    return *this;
341  
}
340  
}
342  

341  

343  
template<WriteStream S>
342  
template<WriteStream S>
344  
    requires (!std::same_as<std::decay_t<S>, any_write_stream>)
343  
    requires (!std::same_as<std::decay_t<S>, any_write_stream>)
345  
any_write_stream::any_write_stream(S s)
344  
any_write_stream::any_write_stream(S s)
346  
    : vt_(&vtable_for_impl<S>::value)
345  
    : vt_(&vtable_for_impl<S>::value)
347  
{
346  
{
348  
    struct guard {
347  
    struct guard {
349  
        any_write_stream* self;
348  
        any_write_stream* self;
350  
        bool committed = false;
349  
        bool committed = false;
351  
        ~guard() {
350  
        ~guard() {
352  
            if(!committed && self->storage_) {
351  
            if(!committed && self->storage_) {
353  
                self->vt_->destroy(self->stream_);
352  
                self->vt_->destroy(self->stream_);
354  
                ::operator delete(self->storage_);
353  
                ::operator delete(self->storage_);
355  
                self->storage_ = nullptr;
354  
                self->storage_ = nullptr;
356  
                self->stream_ = nullptr;
355  
                self->stream_ = nullptr;
357  
            }
356  
            }
358  
        }
357  
        }
359  
    } g{this};
358  
    } g{this};
360  

359  

361  
    storage_ = ::operator new(sizeof(S));
360  
    storage_ = ::operator new(sizeof(S));
362  
    stream_ = ::new(storage_) S(std::move(s));
361  
    stream_ = ::new(storage_) S(std::move(s));
363  

362  

364  
    // Preallocate the awaitable storage
363  
    // Preallocate the awaitable storage
365  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
364  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
366  

365  

367  
    g.committed = true;
366  
    g.committed = true;
368  
}
367  
}
369  

368  

370  
template<WriteStream S>
369  
template<WriteStream S>
371  
any_write_stream::any_write_stream(S* s)
370  
any_write_stream::any_write_stream(S* s)
372  
    : stream_(s)
371  
    : stream_(s)
373  
    , vt_(&vtable_for_impl<S>::value)
372  
    , vt_(&vtable_for_impl<S>::value)
374  
{
373  
{
375  
    // Preallocate the awaitable storage
374  
    // Preallocate the awaitable storage
376  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
375  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
377  
}
376  
}
378  

377  

379  
//----------------------------------------------------------
378  
//----------------------------------------------------------
380  

379  

381  
template<ConstBufferSequence CB>
380  
template<ConstBufferSequence CB>
382  
auto
381  
auto
383  
any_write_stream::write_some(CB buffers)
382  
any_write_stream::write_some(CB buffers)
384  
{
383  
{
385  
    struct awaitable
384  
    struct awaitable
386  
    {
385  
    {
387  
        any_write_stream* self_;
386  
        any_write_stream* self_;
388  
        const_buffer_array<detail::max_iovec_> ba_;
387  
        const_buffer_array<detail::max_iovec_> ba_;
389  

388  

390  
        awaitable(
389  
        awaitable(
391  
            any_write_stream* self,
390  
            any_write_stream* self,
392  
            CB const& buffers) noexcept
391  
            CB const& buffers) noexcept
393  
            : self_(self)
392  
            : self_(self)
394  
            , ba_(buffers)
393  
            , ba_(buffers)
395  
        {
394  
        {
396  
        }
395  
        }
397  

396  

398  
        bool
397  
        bool
399  
        await_ready() const noexcept
398  
        await_ready() const noexcept
400  
        {
399  
        {
401  
            return ba_.to_span().empty();
400  
            return ba_.to_span().empty();
402  
        }
401  
        }
403  

402  

404  
        std::coroutine_handle<>
403  
        std::coroutine_handle<>
405  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
404  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
406  
        {
405  
        {
407  
            self_->vt_->construct_awaitable(
406  
            self_->vt_->construct_awaitable(
408  
                self_->stream_,
407  
                self_->stream_,
409  
                self_->cached_awaitable_,
408  
                self_->cached_awaitable_,
410  
                ba_.to_span());
409  
                ba_.to_span());
411  
            self_->awaitable_active_ = true;
410  
            self_->awaitable_active_ = true;
412  

411  

413  
            if(self_->vt_->await_ready(self_->cached_awaitable_))
412  
            if(self_->vt_->await_ready(self_->cached_awaitable_))
414  
                return h;
413  
                return h;
415  

414  

416  
            return self_->vt_->await_suspend(
415  
            return self_->vt_->await_suspend(
417  
                self_->cached_awaitable_, h, env);
416  
                self_->cached_awaitable_, h, env);
418  
        }
417  
        }
419  

418  

420  
        io_result<std::size_t>
419  
        io_result<std::size_t>
421  
        await_resume()
420  
        await_resume()
422  
        {
421  
        {
423  
            if(!self_->awaitable_active_)
422  
            if(!self_->awaitable_active_)
424  
                return {{}, 0};
423  
                return {{}, 0};
425  
            struct guard {
424  
            struct guard {
426  
                any_write_stream* self;
425  
                any_write_stream* self;
427  
                ~guard() {
426  
                ~guard() {
428  
                    self->vt_->destroy_awaitable(self->cached_awaitable_);
427  
                    self->vt_->destroy_awaitable(self->cached_awaitable_);
429  
                    self->awaitable_active_ = false;
428  
                    self->awaitable_active_ = false;
430  
                }
429  
                }
431  
            } g{self_};
430  
            } g{self_};
432  
            return self_->vt_->await_resume(
431  
            return self_->vt_->await_resume(
433  
                self_->cached_awaitable_);
432  
                self_->cached_awaitable_);
434  
        }
433  
        }
435  
    };
434  
    };
436  
    return awaitable{this, buffers};
435  
    return awaitable{this, buffers};
437  
}
436  
}
438  

437  

439  
} // namespace capy
438  
} // namespace capy
440  
} // namespace boost
439  
} // namespace boost
441  

440  

442  
#endif
441  
#endif