Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/boostorg/capy
8 : //
9 :
10 : #include <boost/capy/ex/thread_pool.hpp>
11 : #include <boost/capy/core/intrusive_queue.hpp>
12 : #include <condition_variable>
13 : #include <mutex>
14 : #include <thread>
15 : #include <vector>
16 :
17 : namespace boost {
18 : namespace capy {
19 :
20 : //------------------------------------------------------------------------------
21 :
22 : // Pimpl implementation hides threading details from the header
23 : class thread_pool::impl
24 : {
25 : // Wraps a coroutine handle for queue storage
26 : struct work : intrusive_queue<work>::node
27 : {
28 : any_coro h_;
29 :
30 83 : explicit work(any_coro h) noexcept
31 83 : : h_(h)
32 : {
33 83 : }
34 :
35 83 : void run()
36 : {
37 : // delete before dispatch
38 83 : auto h = h_;
39 83 : delete this;
40 83 : h.resume();
41 83 : }
42 :
43 0 : void destroy()
44 : {
45 0 : delete this;
46 0 : }
47 : };
48 :
49 : std::mutex mutex_;
50 : std::condition_variable cv_;
51 : intrusive_queue<work> q_;
52 : std::vector<std::thread> threads_;
53 : bool stop_;
54 :
55 : public:
56 14 : ~impl()
57 : {
58 : {
59 14 : std::lock_guard<std::mutex> lock(mutex_);
60 14 : stop_ = true;
61 14 : }
62 14 : cv_.notify_all();
63 :
64 35 : for(auto& t : threads_)
65 21 : t.join();
66 :
67 : // Destroy any work items that were never executed
68 14 : while(auto* w = q_.pop())
69 0 : w->destroy();
70 14 : }
71 :
72 : explicit
73 14 : impl(std::size_t num_threads)
74 14 : : stop_(false)
75 : {
76 14 : if( num_threads == 0)
77 1 : num_threads = std::thread::hardware_concurrency();
78 : // Fallback
79 14 : if( num_threads == 0)
80 0 : num_threads = 1;
81 :
82 14 : threads_.reserve(num_threads);
83 35 : for(std::size_t i = 0; i < num_threads; ++i)
84 42 : threads_.emplace_back([this]{ run(); });
85 14 : }
86 :
87 : void
88 83 : post(any_coro h)
89 : {
90 83 : auto* w = new work(h);
91 : {
92 83 : std::lock_guard<std::mutex> lock(mutex_);
93 83 : q_.push(w);
94 83 : }
95 83 : cv_.notify_one();
96 83 : }
97 :
98 : private:
99 : void
100 21 : run()
101 : {
102 : for(;;)
103 : {
104 104 : work* w = nullptr;
105 : {
106 104 : std::unique_lock<std::mutex> lock(mutex_);
107 104 : cv_.wait(lock, [this]{
108 156 : return stop_ || !q_.empty();
109 : });
110 :
111 : // Only exit when stopped AND queue is drained
112 104 : if(stop_ && q_.empty())
113 42 : return;
114 :
115 83 : w = q_.pop();
116 104 : }
117 :
118 83 : w->run();
119 83 : }
120 : }
121 : };
122 :
123 : //------------------------------------------------------------------------------
124 :
125 14 : thread_pool::
126 : ~thread_pool()
127 : {
128 : // Order matters: shutdown services, then impl, then base
129 14 : shutdown();
130 14 : delete impl_;
131 14 : destroy();
132 14 : }
133 :
134 14 : thread_pool::
135 14 : thread_pool(std::size_t num_threads)
136 14 : : impl_(new impl(num_threads))
137 : {
138 14 : }
139 :
140 : //------------------------------------------------------------------------------
141 :
142 : void
143 83 : thread_pool::executor_type::
144 : post(any_coro h) const
145 : {
146 83 : pool_->impl_->post(h);
147 83 : }
148 :
149 : } // capy
150 : } // boost
|