66#include < condition_variable>
77#include < vector>
88#include < thread>
9- #include < type_traits>
109#include < queue>
1110#include < functional>
1211#include < mutex>
13- #include < algorithm>
14- #include < memory>
15- #include < future>
1612#include < expected>
13+ #include < future>
1714#include " Log.hpp"
1815#include " NonCopyable.h"
1916
2017namespace slark {
2118
2219struct ThreadPoolConfig {
2320 uint32_t threadCount = 4 ;
24- uint32_t maxQueueSize = 10000 ;
25- std::chrono::milliseconds timeout{100 };
2621};
2722
2823class ThreadPool : public NonCopyable {
@@ -45,16 +40,21 @@ class ThreadPool: public NonCopyable {
4540 template <class F , typename ... Args>
4641 auto submit (F&& f, Args&&... args) -> std::expected<std::future<std::invoke_result_t<F, Args...>>, bool>;
4742
43+ size_t getTaskCount () const noexcept {
44+ std::lock_guard<std::mutex> lock (mutex_);
45+ return tasks_.size ();
46+ }
47+
4848 void waitForAll () noexcept {
4949 std::unique_lock<std::mutex> lock (mutex_);
50- notFull_ .wait (lock, [this ] {
50+ allTasksDone_ .wait (lock, [this ] {
5151 return tasks_.empty () && activeThreads_ == 0 ;
5252 });
5353 }
54-
55- size_t getTaskCount () const noexcept {
54+
55+ void clear () noexcept {
5656 std::lock_guard<std::mutex> lock (mutex_);
57- return tasks_.size ();
57+ tasks_.clear ();
5858 }
5959
6060 size_t getThreadCount () const noexcept {
@@ -70,11 +70,11 @@ class ThreadPool: public NonCopyable {
7070 void workThread ();
7171
7272 std::vector<std::thread> workers_;
73- std::queue <std::function<void ()>> tasks_;
73+ std::deque <std::function<void ()>> tasks_;
7474
7575 mutable std::mutex mutex_;
7676 std::condition_variable notEmpty_;
77- std::condition_variable notFull_ ;
77+ std::condition_variable allTasksDone_ ;
7878
7979 std::atomic<bool > shutdown_{false };
8080 std::atomic<size_t > activeThreads_{0 };
@@ -106,8 +106,7 @@ inline void ThreadPool::workThread() {
106106 }
107107
108108 task = std::move (tasks_.front ());
109- tasks_.pop ();
110- notFull_.notify_one ();
109+ tasks_.pop_front ();
111110 }
112111
113112 ++activeThreads_;
@@ -117,6 +116,7 @@ inline void ThreadPool::workThread() {
117116 LogE (" error in executing task" );
118117 }
119118 --activeThreads_;
119+ allTasksDone_.notify_all ();
120120 }
121121}
122122
@@ -133,23 +133,15 @@ auto ThreadPool::submit(F&& f, Args&&... args)
133133 std::future<ReturnType> future = task->get_future ();
134134 {
135135 std::unique_lock<std::mutex> lock (mutex_);
136-
137- if (!notFull_.wait_for (lock, config_.timeout , [this ] {
138- return tasks_.size () < config_.maxQueueSize || shutdown_;
139- })) {
140- LogE (" The task queue is full" );
141- return std::unexpected (false );
142- }
143-
144136 if (shutdown_) {
145137 LogE (" The thread pool is exiting" );
146138 return std::unexpected (false );
147139 }
148- tasks_.emplace ([task]() { (*task)(); });
140+ tasks_.emplace_back ([task]() { (*task)(); });
149141 }
150142
151143 notEmpty_.notify_one ();
152144 return future;
153145}
154146
155- }
147+ }
0 commit comments