Details | Last modification | View Log | RSS feed
| Rev | Author | Line No. | Line |
|---|---|---|---|
| 14 | pmbaty | 1 | //===-- llvm/Support/ThreadPool.h - A ThreadPool implementation -*- C++ -*-===// |
| 2 | // |
||
| 3 | // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. |
||
| 4 | // See https://llvm.org/LICENSE.txt for license information. |
||
| 5 | // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception |
||
| 6 | // |
||
| 7 | //===----------------------------------------------------------------------===// |
||
| 8 | // |
||
| 9 | // This file defines a crude C++11 based thread pool. |
||
| 10 | // |
||
| 11 | //===----------------------------------------------------------------------===// |
||
| 12 | |||
| 13 | #ifndef LLVM_SUPPORT_THREADPOOL_H |
||
| 14 | #define LLVM_SUPPORT_THREADPOOL_H |
||
| 15 | |||
| 16 | #include "llvm/ADT/DenseMap.h" |
||
| 17 | #include "llvm/Config/llvm-config.h" |
||
| 18 | #include "llvm/Support/RWMutex.h" |
||
| 19 | #include "llvm/Support/Threading.h" |
||
| 20 | #include "llvm/Support/thread.h" |
||
| 21 | |||
| 22 | #include <future> |
||
| 23 | |||
| 24 | #include <condition_variable> |
||
| 25 | #include <deque> |
||
| 26 | #include <functional> |
||
| 27 | #include <memory> |
||
| 28 | #include <mutex> |
||
| 29 | #include <utility> |
||
| 30 | |||
| 31 | namespace llvm { |
||
| 32 | |||
| 33 | class ThreadPoolTaskGroup; |
||
| 34 | |||
| 35 | /// A ThreadPool for asynchronous parallel execution on a defined number of |
||
| 36 | /// threads. |
||
| 37 | /// |
||
| 38 | /// The pool keeps a vector of threads alive, waiting on a condition variable |
||
| 39 | /// for some work to become available. |
||
| 40 | /// |
||
| 41 | /// It is possible to reuse one thread pool for different groups of tasks |
||
| 42 | /// by grouping tasks using ThreadPoolTaskGroup. All tasks are processed using |
||
| 43 | /// the same queue, but it is possible to wait only for a specific group of |
||
| 44 | /// tasks to finish. |
||
| 45 | /// |
||
| 46 | /// It is also possible for worker threads to submit new tasks and wait for |
||
| 47 | /// them. Note that this may result in a deadlock in cases such as when a task |
||
| 48 | /// (directly or indirectly) tries to wait for its own completion, or when all |
||
| 49 | /// available threads are used up by tasks waiting for a task that has no thread |
||
| 50 | /// left to run on (this includes waiting on the returned future). It should be |
||
| 51 | /// generally safe to wait() for a group as long as groups do not form a cycle. |
||
| 52 | class ThreadPool { |
||
| 53 | public: |
||
| 54 | /// Construct a pool using the hardware strategy \p S for mapping hardware |
||
| 55 | /// execution resources (threads, cores, CPUs). |
||
| 56 | /// Defaults to using the maximum execution resources in the system, but |
||
| 57 | /// accounting for the affinity mask. |
||
| 58 | ThreadPool(ThreadPoolStrategy S = hardware_concurrency()); |
||
| 59 | |||
| 60 | /// Blocking destructor: the pool will wait for all the threads to complete. |
||
| 61 | ~ThreadPool(); |
||
| 62 | |||
| 63 | /// Asynchronous submission of a task to the pool; \p F and \p ArgList are combined with std::bind. The returned future can be |
||
| 64 | /// used to wait for the task to finish and is *non-blocking* on destruction. |
||
| 65 | template <typename Function, typename... Args> |
||
| 66 | auto async(Function &&F, Args &&...ArgList) { |
||
| 67 | auto Task = |
||
| 68 | std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...); |
||
| 69 | return async(std::move(Task)); |
||
| 70 | } |
||
| 71 | |||
| 72 | /// Overload of the above; the task is submitted as part of the task group \p Group. |
||
| 73 | template <typename Function, typename... Args> |
||
| 74 | auto async(ThreadPoolTaskGroup &Group, Function &&F, Args &&...ArgList) { |
||
| 75 | auto Task = |
||
| 76 | std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...); |
||
| 77 | return async(Group, std::move(Task)); |
||
| 78 | } |
||
| 79 | |||
| 80 | /// Asynchronous submission of a nullary callable \p F (no task group). The returned shared_future can be |
||
| 81 | /// used to wait for the task to finish and is *non-blocking* on destruction. |
||
| 82 | template <typename Func> |
||
| 83 | auto async(Func &&F) -> std::shared_future<decltype(F())> { |
||
| 84 | return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)), |
||
| 85 | nullptr); |
||
| 86 | } |
||
| 87 | |||
| 88 | template <typename Func> |
||
| 89 | auto async(ThreadPoolTaskGroup &Group, Func &&F) |
||
| 90 | -> std::shared_future<decltype(F())> { |
||
| 91 | return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)), |
||
| 92 | &Group); |
||
| 93 | } |
||
| 94 | |||
| 95 | /// Blocking wait for all the threads to complete and the queue to be empty. |
||
| 96 | /// It is an error to try to add new tasks while blocking on this call. |
||
| 97 | /// Calling wait() from a task would deadlock waiting for itself. |
||
| 98 | void wait(); |
||
| 99 | |||
| 100 | /// Blocking wait for only the tasks in the given group to complete. |
||
| 101 | /// It is possible to wait even inside a task, but waiting (directly or |
||
| 102 | /// indirectly) on itself will deadlock. If called from a task running on a |
||
| 103 | /// worker thread, the call may process pending tasks while waiting in order |
||
| 104 | /// not to waste the thread. |
||
| 105 | void wait(ThreadPoolTaskGroup &Group); |
||
| 106 | |||
| 107 | // TODO: misleading legacy name warning! |
||
| 108 | // Returns the maximum number of worker threads in the pool, not the current |
||
| 109 | // number of threads! |
||
| 110 | unsigned getThreadCount() const { return MaxThreadCount; } |
||
| 111 | |||
| 112 | /// Returns true if the current thread is a worker thread of this thread pool. |
||
| 113 | bool isWorkerThread() const; |
||
| 114 | |||
| 115 | private: |
||
| 116 | /// Helpers to create a promise and a callable wrapper of \p Task that sets |
||
| 117 | /// the result of the promise. Returns the callable and a future to access the |
||
| 118 | /// result. The non-template overload below handles tasks returning void. |
||
| 119 | template <typename ResTy> |
||
| 120 | static std::pair<std::function<void()>, std::future<ResTy>> |
||
| 121 | createTaskAndFuture(std::function<ResTy()> Task) { |
||
| 122 | std::shared_ptr<std::promise<ResTy>> Promise = |
||
| 123 | std::make_shared<std::promise<ResTy>>(); |
||
| 124 | auto F = Promise->get_future(); |
||
| 125 | return { |
||
| 126 | [Promise = std::move(Promise), Task]() { Promise->set_value(Task()); }, |
||
| 127 | std::move(F)}; |
||
| 128 | } |
||
| 129 | static std::pair<std::function<void()>, std::future<void>> |
||
| 130 | createTaskAndFuture(std::function<void()> Task) { |
||
| 131 | std::shared_ptr<std::promise<void>> Promise = |
||
| 132 | std::make_shared<std::promise<void>>(); |
||
| 133 | auto F = Promise->get_future(); |
||
| 134 | return {[Promise = std::move(Promise), Task]() { |
||
| 135 | Task(); |
||
| 136 | Promise->set_value(); |
||
| 137 | }, |
||
| 138 | std::move(F)}; |
||
| 139 | } |
||
| 140 | |||
| 141 | /// Returns true if all tasks in the given group have finished (nullptr means |
||
| 142 | /// all tasks regardless of their group). QueueLock must be locked. |
||
| 143 | bool workCompletedUnlocked(ThreadPoolTaskGroup *Group) const; |
||
| 144 | |||
| 145 | /// Implementation of async(): queues \p Task, tagged with \p Group (may be nullptr). The returned future can be |
||
| 146 | /// used to wait for the task to finish and is *non-blocking* on destruction. |
||
| 147 | template <typename ResTy> |
||
| 148 | std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task, |
||
| 149 | ThreadPoolTaskGroup *Group) { |
||
| 150 | |||
| 151 | #if LLVM_ENABLE_THREADS |
||
| 152 | /// Wrap the Task in a std::function<void()> that sets the result of the |
||
| 153 | /// corresponding future. |
||
| 154 | auto R = createTaskAndFuture(Task); |
||
| 155 | |||
| 156 | int requestedThreads; |
||
| 157 | { |
||
| 158 | // Lock the queue and push the new task |
||
| 159 | std::unique_lock<std::mutex> LockGuard(QueueLock); |
||
| 160 | |||
| 161 | // Don't allow enqueueing after disabling the pool |
||
| 162 | assert(EnableFlag && "Queuing a thread during ThreadPool destruction"); |
||
| 163 | Tasks.emplace_back(std::make_pair(std::move(R.first), Group)); |
||
| 164 | requestedThreads = ActiveThreads + Tasks.size(); |
||
| 165 | } |
||
| 166 | QueueCondition.notify_one(); |
||
| 167 | grow(requestedThreads); |
||
| 168 | return R.second.share(); |
||
| 169 | |||
| 170 | #else // LLVM_ENABLE_THREADS Disabled |
||
| 171 | |||
| 172 | // Get a Future with launch::deferred execution using std::async |
||
| 173 | auto Future = std::async(std::launch::deferred, std::move(Task)).share(); |
||
| 174 | // Wrap the future so that both ThreadPool::wait() can operate and the |
||
| 175 | // returned future can be sync'ed on. |
||
| 176 | Tasks.emplace_back(std::make_pair([Future]() { Future.get(); }, Group)); |
||
| 177 | return Future; |
||
| 178 | #endif |
||
| 179 | } |
||
| 180 | |||
| 181 | #if LLVM_ENABLE_THREADS |
||
| 182 | // Grow to ensure that we have at least `requested` Threads, but do not go |
||
| 183 | // over MaxThreadCount. |
||
| 184 | void grow(int requested); |
||
| 185 | |||
| 186 | void processTasks(ThreadPoolTaskGroup *WaitingForGroup); |
||
| 187 | #endif |
||
| 188 | |||
| 189 | /// Threads in flight |
||
| 190 | std::vector<llvm::thread> Threads; |
||
| 191 | /// Lock protecting access to the Threads vector. |
||
| 192 | mutable llvm::sys::RWMutex ThreadsLock; |
||
| 193 | |||
| 194 | /// Tasks waiting for execution in the pool. |
||
| 195 | std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks; |
||
| 196 | |||
| 197 | /// Locking and signaling for accessing the Tasks queue. |
||
| 198 | std::mutex QueueLock; |
||
| 199 | std::condition_variable QueueCondition; |
||
| 200 | |||
| 201 | /// Signaling for job completion (all tasks or all tasks in a group). |
||
| 202 | std::condition_variable CompletionCondition; |
||
| 203 | |||
| 204 | /// Keep track of the number of threads actually busy. |
||
| 205 | unsigned ActiveThreads = 0; |
||
| 206 | /// Number of threads active for tasks in the given group (only non-zero). |
||
| 207 | DenseMap<ThreadPoolTaskGroup *, unsigned> ActiveGroups; |
||
| 208 | |||
| 209 | #if LLVM_ENABLE_THREADS // avoids warning for unused variable |
||
| 210 | /// Signal for the destruction of the pool, asking threads to exit. |
||
| 211 | bool EnableFlag = true; |
||
| 212 | #endif |
||
| 213 | |||
| 214 | const ThreadPoolStrategy Strategy; |
||
| 215 | |||
| 216 | /// Maximum number of threads to potentially grow this pool to. |
||
| 217 | const unsigned MaxThreadCount; |
||
| 218 | }; |
||
| 219 | |||
| 220 | /// A group of tasks to be run on a thread pool. Thread pool tasks in different |
||
| 221 | /// groups can run on the same thread pool but can be waited for separately. |
||
| 222 | /// It is even possible for tasks of one group to submit and wait for tasks |
||
| 223 | /// of another group, as long as this does not form a loop. |
||
| 224 | class ThreadPoolTaskGroup { |
||
| 225 | public: |
||
| 226 | /// The ThreadPool argument is the thread pool to forward calls to. |
||
| 227 | ThreadPoolTaskGroup(ThreadPool &Pool) : Pool(Pool) {} |
||
| 228 | |||
| 229 | /// Blocking destructor: will wait for all the tasks in the group to complete |
||
| 230 | /// by calling ThreadPool::wait() for this group. |
||
| 231 | ~ThreadPoolTaskGroup() { wait(); } |
||
| 232 | |||
| 233 | /// Forwards \p F and \p ArgList to ThreadPool::async() with this group as the task group. |
||
| 234 | template <typename Function, typename... Args> |
||
| 235 | inline auto async(Function &&F, Args &&...ArgList) { |
||
| 236 | return Pool.async(*this, std::forward<Function>(F), |
||
| 237 | std::forward<Args>(ArgList)...); |
||
| 238 | } |
||
| 239 | |||
| 240 | /// Calls ThreadPool::wait() for this group. |
||
| 241 | void wait() { Pool.wait(*this); } |
||
| 242 | |||
| 243 | private: |
||
| 244 | ThreadPool &Pool; |
||
| 245 | }; |
||
| 246 | |||
| 247 | } // namespace llvm |
||
| 248 | |||
| 249 | #endif // LLVM_SUPPORT_THREADPOOL_H |