1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
10  
#ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
11  
#define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
11  
#define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/ex/frame_allocator.hpp>
14  
#include <boost/capy/ex/frame_allocator.hpp>
15  

15  

16  
#include <coroutine>
16  
#include <coroutine>
17  
#include <cstddef>
17  
#include <cstddef>
18  
#include <exception>
18  
#include <exception>
19  

19  

20  
namespace boost {
20  
namespace boost {
21  
namespace capy {
21  
namespace capy {
22  
namespace detail {
22  
namespace detail {
23  

23  

24  
class strand_queue;
24  
class strand_queue;
25  

25  

26  
//----------------------------------------------------------
26  
//----------------------------------------------------------
27  

27  

28  
// Metadata stored before the coroutine frame
28  
// Metadata stored before the coroutine frame
29  
struct frame_prefix
29  
struct frame_prefix
30  
{
30  
{
31  
    frame_prefix* next;
31  
    frame_prefix* next;
32  
    strand_queue* queue;
32  
    strand_queue* queue;
33  
    std::size_t alloc_size;
33  
    std::size_t alloc_size;
34  
};
34  
};
35  

35  

36  
//----------------------------------------------------------
36  
//----------------------------------------------------------
37  

37  

38  
/** Wrapper coroutine for strand queue dispatch operations.
38  
/** Wrapper coroutine for strand queue dispatch operations.
39  

39  

40  
    This coroutine wraps a target coroutine handle and resumes
40  
    This coroutine wraps a target coroutine handle and resumes
41  
    it when dispatched. The wrapper ensures control returns to
41  
    it when dispatched. The wrapper ensures control returns to
42  
    the dispatch loop after the target suspends or completes.
42  
    the dispatch loop after the target suspends or completes.
43  

43  

44  
    The promise contains an intrusive list node for queue
44  
    The promise contains an intrusive list node for queue
45  
    storage and supports a custom allocator that recycles
45  
    storage and supports a custom allocator that recycles
46  
    coroutine frames via a free list.
46  
    coroutine frames via a free list.
47  
*/
47  
*/
48  
struct strand_op
48  
struct strand_op
49  
{
49  
{
50  
    struct promise_type
50  
    struct promise_type
51  
    {
51  
    {
52  
        promise_type* next = nullptr;
52  
        promise_type* next = nullptr;
53  

53  

54  
        void*
54  
        void*
55  
        operator new(
55  
        operator new(
56  
            std::size_t size,
56  
            std::size_t size,
57  
            strand_queue& q,
57  
            strand_queue& q,
58  
            std::coroutine_handle<void>);
58  
            std::coroutine_handle<void>);
59  

59  

60  
        void
60  
        void
61  
        operator delete(void* p, std::size_t);
61  
        operator delete(void* p, std::size_t);
62  

62  

63  
        strand_op
63  
        strand_op
64  
        get_return_object() noexcept
64  
        get_return_object() noexcept
65  
        {
65  
        {
66  
            return {std::coroutine_handle<promise_type>::from_promise(*this)};
66  
            return {std::coroutine_handle<promise_type>::from_promise(*this)};
67  
        }
67  
        }
68  

68  

69  
        std::suspend_always
69  
        std::suspend_always
70  
        initial_suspend() noexcept
70  
        initial_suspend() noexcept
71  
        {
71  
        {
72  
            return {};
72  
            return {};
73  
        }
73  
        }
74  

74  

75  
        std::suspend_always
75  
        std::suspend_always
76  
        final_suspend() noexcept
76  
        final_suspend() noexcept
77  
        {
77  
        {
78  
            return {};
78  
            return {};
79  
        }
79  
        }
80  

80  

81  
        void
81  
        void
82  
        return_void() noexcept
82  
        return_void() noexcept
83  
        {
83  
        {
84  
        }
84  
        }
85  

85  

86  
        void
86  
        void
87  
        unhandled_exception()
87  
        unhandled_exception()
88  
        {
88  
        {
89  
            std::terminate();
89  
            std::terminate();
90  
        }
90  
        }
91  
    };
91  
    };
92  

92  

93  
    std::coroutine_handle<promise_type> h_;
93  
    std::coroutine_handle<promise_type> h_;
94  
};
94  
};
95  

95  

96  
//----------------------------------------------------------
96  
//----------------------------------------------------------
97  

97  

98  
/** Single-threaded dispatch queue for coroutine handles.
98  
/** Single-threaded dispatch queue for coroutine handles.
99  

99  

100  
    This queue stores coroutine handles and resumes them
100  
    This queue stores coroutine handles and resumes them
101  
    sequentially when dispatch() is called. Each pushed
101  
    sequentially when dispatch() is called. Each pushed
102  
    handle is wrapped in a strand_op coroutine that ensures
102  
    handle is wrapped in a strand_op coroutine that ensures
103  
    control returns to the dispatch loop after the target
103  
    control returns to the dispatch loop after the target
104  
    suspends or completes.
104  
    suspends or completes.
105  

105  

106  
    The queue uses an intrusive singly-linked list through
106  
    The queue uses an intrusive singly-linked list through
107  
    the promise type to avoid separate node allocations.
107  
    the promise type to avoid separate node allocations.
108  
    A free list recycles wrapper coroutine frames to reduce
108  
    A free list recycles wrapper coroutine frames to reduce
109  
    allocation overhead during repeated push/dispatch cycles.
109  
    allocation overhead during repeated push/dispatch cycles.
110  

110  

111  
    @par Thread Safety
111  
    @par Thread Safety
112  
    This class is not thread-safe. All operations must be
112  
    This class is not thread-safe. All operations must be
113  
    called from a single thread.
113  
    called from a single thread.
114  
*/
114  
*/
115  
class strand_queue
115  
class strand_queue
116  
{
116  
{
117  
    using promise_type = strand_op::promise_type;
117  
    using promise_type = strand_op::promise_type;
118  

118  

119  
    promise_type* head_ = nullptr;
119  
    promise_type* head_ = nullptr;
120  
    promise_type* tail_ = nullptr;
120  
    promise_type* tail_ = nullptr;
121  
    frame_prefix* free_list_ = nullptr;
121  
    frame_prefix* free_list_ = nullptr;
122  

122  

123  
    friend struct strand_op::promise_type;
123  
    friend struct strand_op::promise_type;
124  

124  

125  
    static
125  
    static
126  
    strand_op
126  
    strand_op
127  
    make_strand_op(
127  
    make_strand_op(
128  
        strand_queue& q,
128  
        strand_queue& q,
129  
        std::coroutine_handle<void> target)
129  
        std::coroutine_handle<void> target)
130  
    {
130  
    {
131  
        (void)q;
131  
        (void)q;
132  
        safe_resume(target);
132  
        safe_resume(target);
133  
        co_return;
133  
        co_return;
134  
    }
134  
    }
135  

135  

136  
public:
136  
public:
137  
    strand_queue() = default;
137  
    strand_queue() = default;
138  

138  

139  
    strand_queue(strand_queue const&) = delete;
139  
    strand_queue(strand_queue const&) = delete;
140  
    strand_queue& operator=(strand_queue const&) = delete;
140  
    strand_queue& operator=(strand_queue const&) = delete;
141  

141  

142  
    /** Destructor.
142  
    /** Destructor.
143  

143  

144  
        Destroys any pending wrappers without resuming them,
144  
        Destroys any pending wrappers without resuming them,
145  
        then frees all memory in the free list.
145  
        then frees all memory in the free list.
146  
    */
146  
    */
147  
    ~strand_queue()
147  
    ~strand_queue()
148  
    {
148  
    {
149  
        // Destroy pending wrappers
149  
        // Destroy pending wrappers
150  
        while(head_)
150  
        while(head_)
151  
        {
151  
        {
152  
            promise_type* p = head_;
152  
            promise_type* p = head_;
153  
            head_ = p->next;
153  
            head_ = p->next;
154  

154  

155  
            auto h = std::coroutine_handle<promise_type>::from_promise(*p);
155  
            auto h = std::coroutine_handle<promise_type>::from_promise(*p);
156  
            h.destroy();
156  
            h.destroy();
157  
        }
157  
        }
158  

158  

159  
        // Free the free list memory
159  
        // Free the free list memory
160  
        while(free_list_)
160  
        while(free_list_)
161  
        {
161  
        {
162  
            frame_prefix* prefix = free_list_;
162  
            frame_prefix* prefix = free_list_;
163  
            free_list_ = prefix->next;
163  
            free_list_ = prefix->next;
164  
            ::operator delete(prefix);
164  
            ::operator delete(prefix);
165  
        }
165  
        }
166  
    }
166  
    }
167  

167  

168  
    /** Returns true if there are no pending operations.
168  
    /** Returns true if there are no pending operations.
169  
    */
169  
    */
170  
    bool
170  
    bool
171  
    empty() const noexcept
171  
    empty() const noexcept
172  
    {
172  
    {
173  
        return head_ == nullptr;
173  
        return head_ == nullptr;
174  
    }
174  
    }
175  

175  

176  
    /** Push a coroutine handle to the queue.
176  
    /** Push a coroutine handle to the queue.
177  

177  

178  
        Creates a wrapper coroutine and appends it to the
178  
        Creates a wrapper coroutine and appends it to the
179  
        queue. The wrapper will resume the target handle
179  
        queue. The wrapper will resume the target handle
180  
        when dispatch() processes it.
180  
        when dispatch() processes it.
181  

181  

182  
        @param h The coroutine handle to dispatch.
182  
        @param h The coroutine handle to dispatch.
183  
    */
183  
    */
184  
    void
184  
    void
185  
    push(std::coroutine_handle<void> h)
185  
    push(std::coroutine_handle<void> h)
186  
    {
186  
    {
187  
        strand_op op = make_strand_op(*this, h);
187  
        strand_op op = make_strand_op(*this, h);
188  

188  

189  
        promise_type* p = &op.h_.promise();
189  
        promise_type* p = &op.h_.promise();
190  
        p->next = nullptr;
190  
        p->next = nullptr;
191  

191  

192  
        if(tail_)
192  
        if(tail_)
193  
            tail_->next = p;
193  
            tail_->next = p;
194  
        else
194  
        else
195  
            head_ = p;
195  
            head_ = p;
196  
        tail_ = p;
196  
        tail_ = p;
197  
    }
197  
    }
198  

198  

199  
    /** Resume all queued coroutines in sequence.
199  
    /** Resume all queued coroutines in sequence.
200  

200  

201  
        Processes each wrapper in FIFO order, resuming its
201  
        Processes each wrapper in FIFO order, resuming its
202  
        target coroutine. After each target suspends or
202  
        target coroutine. After each target suspends or
203  
        completes, the wrapper is destroyed and its frame
203  
        completes, the wrapper is destroyed and its frame
204  
        is added to the free list for reuse.
204  
        is added to the free list for reuse.
205  

205  

206  
        Coroutines resumed during dispatch may push new
206  
        Coroutines resumed during dispatch may push new
207  
        handles, which will also be processed in the same
207  
        handles, which will also be processed in the same
208  
        dispatch call.
208  
        dispatch call.
209  

209  

210  
        @warning Not thread-safe. Do not call while another
210  
        @warning Not thread-safe. Do not call while another
211  
            thread may be calling push().
211  
            thread may be calling push().
212  
    */
212  
    */
213  
    void
213  
    void
214  
    dispatch()
214  
    dispatch()
215  
    {
215  
    {
216  
        while(head_)
216  
        while(head_)
217  
        {
217  
        {
218  
            promise_type* p = head_;
218  
            promise_type* p = head_;
219  
            head_ = p->next;
219  
            head_ = p->next;
220  
            if(!head_)
220  
            if(!head_)
221  
                tail_ = nullptr;
221  
                tail_ = nullptr;
222  

222  

223  
            auto h = std::coroutine_handle<promise_type>::from_promise(*p);
223  
            auto h = std::coroutine_handle<promise_type>::from_promise(*p);
224  
            safe_resume(h);
224  
            safe_resume(h);
225  
            h.destroy();
225  
            h.destroy();
226  
        }
226  
        }
227  
    }
227  
    }
228  

228  

229  
    /** Batch of taken items for thread-safe dispatch. */
229  
    /** Batch of taken items for thread-safe dispatch. */
230  
    struct taken_batch
230  
    struct taken_batch
231  
    {
231  
    {
232  
        promise_type* head = nullptr;
232  
        promise_type* head = nullptr;
233  
        promise_type* tail = nullptr;
233  
        promise_type* tail = nullptr;
234  
    };
234  
    };
235  

235  

236  
    /** Take all pending items atomically.
236  
    /** Take all pending items atomically.
237  

237  

238  
        Removes all items from the queue and returns them
238  
        Removes all items from the queue and returns them
239  
        as a batch. The queue is left empty.
239  
        as a batch. The queue is left empty.
240  

240  

241  
        @return The batch of taken items.
241  
        @return The batch of taken items.
242  
    */
242  
    */
243  
    taken_batch
243  
    taken_batch
244  
    take_all() noexcept
244  
    take_all() noexcept
245  
    {
245  
    {
246  
        taken_batch batch{head_, tail_};
246  
        taken_batch batch{head_, tail_};
247  
        head_ = tail_ = nullptr;
247  
        head_ = tail_ = nullptr;
248  
        return batch;
248  
        return batch;
249  
    }
249  
    }
250  

250  

251  
    /** Dispatch a batch of taken items.
251  
    /** Dispatch a batch of taken items.
252  

252  

253  
        @param batch The batch to dispatch.
253  
        @param batch The batch to dispatch.
254  

254  

255  
        @note This is thread-safe w.r.t. push() because it doesn't
255  
        @note This is thread-safe w.r.t. push() because it doesn't
256  
            access the queue's free_list_. Frames are deleted directly
256  
            access the queue's free_list_. Frames are deleted directly
257  
            rather than recycled.
257  
            rather than recycled.
258  
    */
258  
    */
259  
    static
259  
    static
260  
    void
260  
    void
261  
    dispatch_batch(taken_batch& batch)
261  
    dispatch_batch(taken_batch& batch)
262  
    {
262  
    {
263  
        while(batch.head)
263  
        while(batch.head)
264  
        {
264  
        {
265  
            promise_type* p = batch.head;
265  
            promise_type* p = batch.head;
266  
            batch.head = p->next;
266  
            batch.head = p->next;
267  

267  

268  
            auto h = std::coroutine_handle<promise_type>::from_promise(*p);
268  
            auto h = std::coroutine_handle<promise_type>::from_promise(*p);
269  
            safe_resume(h);
269  
            safe_resume(h);
270  
            // Don't use h.destroy() - it would call operator delete which
270  
            // Don't use h.destroy() - it would call operator delete which
271  
            // accesses the queue's free_list_ (race with push).
271  
            // accesses the queue's free_list_ (race with push).
272  
            // Instead, manually free the frame without recycling.
272  
            // Instead, manually free the frame without recycling.
273  
            // h.address() returns the frame base (what operator new returned).
273  
            // h.address() returns the frame base (what operator new returned).
274  
            frame_prefix* prefix = static_cast<frame_prefix*>(h.address()) - 1;
274  
            frame_prefix* prefix = static_cast<frame_prefix*>(h.address()) - 1;
275  
            ::operator delete(prefix);
275  
            ::operator delete(prefix);
276  
        }
276  
        }
277  
        batch.tail = nullptr;
277  
        batch.tail = nullptr;
278  
    }
278  
    }
279  
};
279  
};
280  

280  

281  
//----------------------------------------------------------
281  
//----------------------------------------------------------
282  

282  

283  
inline
283  
inline
284  
void*
284  
void*
285  
strand_op::promise_type::operator new(
285  
strand_op::promise_type::operator new(
286  
    std::size_t size,
286  
    std::size_t size,
287  
    strand_queue& q,
287  
    strand_queue& q,
288  
    std::coroutine_handle<void>)
288  
    std::coroutine_handle<void>)
289  
{
289  
{
290  
    // Total size includes prefix
290  
    // Total size includes prefix
291  
    std::size_t alloc_size = size + sizeof(frame_prefix);
291  
    std::size_t alloc_size = size + sizeof(frame_prefix);
292  
    void* raw;
292  
    void* raw;
293  
    
293  
    
294  
    // Try to reuse from free list
294  
    // Try to reuse from free list
295  
    if(q.free_list_)
295  
    if(q.free_list_)
296  
    {
296  
    {
297  
        frame_prefix* prefix = q.free_list_;
297  
        frame_prefix* prefix = q.free_list_;
298  
        q.free_list_ = prefix->next;
298  
        q.free_list_ = prefix->next;
299  
        raw = prefix;
299  
        raw = prefix;
300  
    }
300  
    }
301  
    else
301  
    else
302  
    {
302  
    {
303  
        raw = ::operator new(alloc_size);
303  
        raw = ::operator new(alloc_size);
304  
    }
304  
    }
305  

305  

306  
    // Initialize prefix
306  
    // Initialize prefix
307  
    frame_prefix* prefix = static_cast<frame_prefix*>(raw);
307  
    frame_prefix* prefix = static_cast<frame_prefix*>(raw);
308  
    prefix->next = nullptr;
308  
    prefix->next = nullptr;
309  
    prefix->queue = &q;
309  
    prefix->queue = &q;
310  
    prefix->alloc_size = alloc_size;
310  
    prefix->alloc_size = alloc_size;
311  

311  

312  
    // Return pointer AFTER the prefix (this is where coroutine frame goes)
312  
    // Return pointer AFTER the prefix (this is where coroutine frame goes)
313  
    return prefix + 1;
313  
    return prefix + 1;
314  
}
314  
}
315  

315  

316  
inline
316  
inline
317  
void
317  
void
318  
strand_op::promise_type::operator delete(void* p, std::size_t)
318  
strand_op::promise_type::operator delete(void* p, std::size_t)
319  
{
319  
{
320  
    // Calculate back to get the prefix
320  
    // Calculate back to get the prefix
321  
    frame_prefix* prefix = static_cast<frame_prefix*>(p) - 1;
321  
    frame_prefix* prefix = static_cast<frame_prefix*>(p) - 1;
322  

322  

323  
    // Add to free list
323  
    // Add to free list
324  
    prefix->next = prefix->queue->free_list_;
324  
    prefix->next = prefix->queue->free_list_;
325  
    prefix->queue->free_list_ = prefix;
325  
    prefix->queue->free_list_ = prefix;
326  
}
326  
}
327  

327  

328  
} // namespace detail
328  
} // namespace detail
329  
} // namespace capy
329  
} // namespace capy
330  
} // namespace boost
330  
} // namespace boost
331  

331  

332  
#endif
332  
#endif