diff options
author | romkatv <roman.perepelitsa@gmail.com> | 2020-05-10 16:58:05 +0300 |
---|---|---|
committer | romkatv <roman.perepelitsa@gmail.com> | 2020-05-10 16:58:05 +0300 |
commit | 97fac973afa021ae3ef49e0feae203fd09b231e1 (patch) | |
tree | 5c1ba4f09905cc53fdfc75d3668a876d3e14a447 /gitstatus/src/thread_pool.h | |
parent | c159f3aaefe13724421655d06df990b2ddf23e59 (diff) | |
parent | 1531d6e5439daae01627b2645684876b75eaf5eb (diff) |
Merge commit '1531d6e5439daae01627b2645684876b75eaf5eb' as 'gitstatus'
Diffstat (limited to 'gitstatus/src/thread_pool.h')
-rw-r--r-- | gitstatus/src/thread_pool.h | 74 |
1 files changed, 74 insertions, 0 deletions
diff --git a/gitstatus/src/thread_pool.h b/gitstatus/src/thread_pool.h new file mode 100644 index 00000000..1e39b915 --- /dev/null +++ b/gitstatus/src/thread_pool.h @@ -0,0 +1,74 @@ +#ifndef ROMKATV_GITSTATUS_THREAD_POOL_H_ +#define ROMKATV_GITSTATUS_THREAD_POOL_H_ + +#include <condition_variable> +#include <cstddef> +#include <cstdint> +#include <functional> +#include <mutex> +#include <queue> +#include <thread> +#include <tuple> +#include <utility> + +#include "time.h" + +namespace gitstatus { + +class ThreadPool { + public: + explicit ThreadPool(size_t num_threads); + ThreadPool(ThreadPool&&) = delete; + + // Waits for the currently running functions to finish. + // Does NOT wait for the queue of functions to drain. + // If you want the latter, call Wait() manually. + ~ThreadPool(); + + // Runs `f` on one of the threads at or after time `t`. Can be called + // from any thread. Can be called concurrently. + // + // Does not block. + void Schedule(Time t, std::function<void()> f); + + void Schedule(std::function<void()> f) { Schedule(Clock::now(), std::move(f)); } + + // Blocks until the work queue is empty and there are no currently + // running functions. + void Wait(); + + size_t num_threads() const { return threads_.size(); } + + private: + struct Work { + bool operator<(const Work& w) const { return std::tie(w.t, w.idx) < std::tie(t, idx); } + Time t; + int64_t idx; + mutable std::function<void()> f; + }; + + void Loop(size_t tid); + + int64_t last_idx_ = 0; + int64_t num_inflight_; + bool exit_ = false; + // Do we have a thread waiting on sleeper_cv_? + bool have_sleeper_ = false; + std::mutex mutex_; + // Any number of threads can wait on this condvar. Always without a timeout. + std::condition_variable cv_; + // At most one thread can wait on this condvar at a time. Always with a timeout. + std::condition_variable sleeper_cv_; + // Signalled when the work queue is empty and there is nothing inflight. + std::condition_variable idle_cv_; + std::priority_queue<Work> work_; + std::vector<std::thread> threads_; +}; + +void InitGlobalThreadPool(size_t num_threads); + +ThreadPool* GlobalThreadPool(); + +} // namespace gitstatus + +#endif // ROMKATV_GITSTATUS_THREAD_POOL_H_ |