LCOV - code coverage report
Current view: top level - libs/capy/src/ex - thread_pool.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 91.8 % 61 56
Test Date: 2026-01-15 18:27:21 Functions: 91.7 % 12 11

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/boostorg/capy
       8              : //
       9              : 
      10              : #include <boost/capy/ex/thread_pool.hpp>
      11              : #include <boost/capy/core/intrusive_queue.hpp>
      12              : #include <condition_variable>
      13              : #include <mutex>
      14              : #include <thread>
      15              : #include <vector>
      16              : 
      17              : namespace boost {
      18              : namespace capy {
      19              : 
      20              : //------------------------------------------------------------------------------
      21              : 
      22              : // Pimpl implementation hides threading details from the header
      23              : class thread_pool::impl
      24              : {
      25              :     // Wraps a coroutine handle for queue storage
      26              :     struct work : intrusive_queue<work>::node
      27              :     {
      28              :         any_coro h_;
      29              : 
      30           83 :         explicit work(any_coro h) noexcept
      31           83 :             : h_(h)
      32              :         {
      33           83 :         }
      34              : 
      35           83 :         void run()
      36              :         {
      37              :             // delete before dispatch
      38           83 :             auto h = h_;
      39           83 :             delete this;
      40           83 :             h.resume();
      41           83 :         }
      42              : 
      43            0 :         void destroy()
      44              :         {
      45            0 :             delete this;
      46            0 :         }
      47              :     };
      48              : 
      49              :     std::mutex mutex_;
      50              :     std::condition_variable cv_;
      51              :     intrusive_queue<work> q_;
      52              :     std::vector<std::thread> threads_;
      53              :     bool stop_;
      54              : 
      55              : public:
      56           14 :     ~impl()
      57              :     {
      58              :         {
      59           14 :             std::lock_guard<std::mutex> lock(mutex_);
      60           14 :             stop_ = true;
      61           14 :         }
      62           14 :         cv_.notify_all();
      63              : 
      64           35 :         for(auto& t : threads_)
      65           21 :             t.join();
      66              : 
      67              :         // Destroy any work items that were never executed
      68           14 :         while(auto* w = q_.pop())
      69            0 :             w->destroy();
      70           14 :     }
      71              : 
      72              :     explicit
      73           14 :     impl(std::size_t num_threads)
      74           14 :         : stop_(false)
      75              :     {
      76           14 :         if( num_threads == 0)
      77            1 :             num_threads = std::thread::hardware_concurrency();
      78              :         // Fallback
      79           14 :         if( num_threads == 0)
      80            0 :             num_threads = 1;
      81              : 
      82           14 :         threads_.reserve(num_threads);
      83           35 :         for(std::size_t i = 0; i < num_threads; ++i)
      84           42 :             threads_.emplace_back([this]{ run(); });
      85           14 :     }
      86              : 
      87              :     void
      88           83 :     post(any_coro h)
      89              :     {
      90           83 :         auto* w = new work(h);
      91              :         {
      92           83 :             std::lock_guard<std::mutex> lock(mutex_);
      93           83 :             q_.push(w);
      94           83 :         }
      95           83 :         cv_.notify_one();
      96           83 :     }
      97              : 
      98              : private:
      99              :     void
     100           21 :     run()
     101              :     {
     102              :         for(;;)
     103              :         {
     104          104 :             work* w = nullptr;
     105              :             {
     106          104 :                 std::unique_lock<std::mutex> lock(mutex_);
     107          104 :                 cv_.wait(lock, [this]{
     108          156 :                     return stop_ || !q_.empty();
     109              :                 });
     110              : 
     111              :                 // Only exit when stopped AND queue is drained
     112          104 :                 if(stop_ && q_.empty())
     113           42 :                     return;
     114              : 
     115           83 :                 w = q_.pop();
     116          104 :             }
     117              : 
     118           83 :             w->run();
     119           83 :         }
     120              :     }
     121              : };
     122              : 
     123              : //------------------------------------------------------------------------------
     124              : 
     125           14 : thread_pool::
     126              : ~thread_pool()
     127              : {
     128              :     // Order matters: shutdown services, then impl, then base
     129           14 :     shutdown();
     130           14 :     delete impl_;
     131           14 :     destroy();
     132           14 : }
     133              : 
     134           14 : thread_pool::
     135           14 : thread_pool(std::size_t num_threads)
     136           14 :     : impl_(new impl(num_threads))
     137              : {
     138           14 : }
     139              : 
     140              : //------------------------------------------------------------------------------
     141              : 
     142              : void
     143           83 : thread_pool::executor_type::
     144              : post(any_coro h) const
     145              : {
     146           83 :     pool_->impl_->post(h);
     147           83 : }
     148              : 
     149              : } // capy
     150              : } // boost
        

Generated by: LCOV version 2.3