aboutsummaryrefslogtreecommitdiff
path: root/gitstatus/src/thread_pool.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gitstatus/src/thread_pool.cc')
-rw-r--r--gitstatus/src/thread_pool.cc87
1 files changed, 87 insertions, 0 deletions
diff --git a/gitstatus/src/thread_pool.cc b/gitstatus/src/thread_pool.cc
new file mode 100644
index 00000000..b37eb203
--- /dev/null
+++ b/gitstatus/src/thread_pool.cc
@@ -0,0 +1,87 @@
+#include "thread_pool.h"
+
+#include <cassert>
+#include <utility>
+
+#include "check.h"
+#include "logging.h"
+
+namespace gitstatus {
+
+ThreadPool::ThreadPool(size_t num_threads) : num_inflight_(num_threads) {
+ for (size_t i = 0; i != num_threads; ++i) {
+ threads_.emplace_back([=]() { Loop(i + 1); });
+ }
+}
+
+ThreadPool::~ThreadPool() {
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ exit_ = true;
+ }
+ cv_.notify_all();
+ sleeper_cv_.notify_one();
+ for (std::thread& t : threads_) t.join();
+}
+
+void ThreadPool::Schedule(Time t, std::function<void()> f) {
+ std::condition_variable* wake = nullptr;
+ {
+ std::unique_lock<std::mutex> lock(mutex_);
+ work_.push(Work{std::move(t), ++last_idx_, std::move(f)});
+ if (work_.top().idx == last_idx_) wake = have_sleeper_ ? &sleeper_cv_ : &cv_;
+ }
+ if (wake) wake->notify_one();
+}
+
+void ThreadPool::Loop(size_t tid) {
+ auto Next = [&]() -> std::function<void()> {
+ std::unique_lock<std::mutex> lock(mutex_);
+ --num_inflight_;
+ if (work_.empty() && num_inflight_ == 0) idle_cv_.notify_all();
+ while (true) {
+ if (exit_) return nullptr;
+ if (work_.empty()) {
+ cv_.wait(lock);
+ continue;
+ }
+ Time now = Clock::now();
+ const Work& top = work_.top();
+ if (top.t <= now) {
+ std::function<void()> res = std::move(top.f);
+ work_.pop();
+ ++num_inflight_;
+ bool notify = !work_.empty() && !have_sleeper_;
+ lock.unlock();
+ if (notify) cv_.notify_one();
+ return res;
+ }
+ if (have_sleeper_) {
+ cv_.wait(lock);
+ continue;
+ }
+ have_sleeper_ = true;
+ sleeper_cv_.wait_until(lock, top.t);
+ assert(have_sleeper_);
+ have_sleeper_ = false;
+ }
+ };
+ while (std::function<void()> f = Next()) f();
+}
+
+void ThreadPool::Wait() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ idle_cv_.wait(lock, [&] { return work_.empty() && num_inflight_ == 0; });
+}
+
+static ThreadPool* g_thread_pool = nullptr;
+
+void InitGlobalThreadPool(size_t num_threads) {
+ CHECK(!g_thread_pool);
+ LOG(INFO) << "Spawning " << num_threads << " thread(s)";
+ g_thread_pool = new ThreadPool(num_threads);
+}
+
+ThreadPool* GlobalThreadPool() { return g_thread_pool; }
+
+} // namespace gitstatus