//===-- llvm/Support/ThreadPool.h - A ThreadPool implementation -*- C++ -*-===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
//
// This file defines a crude C++11 based thread pool.
//
//===----------------------------------------------------------------------===//
12 | |||
13 | #ifndef LLVM_SUPPORT_THREADPOOL_H |
||
14 | #define LLVM_SUPPORT_THREADPOOL_H |
||
15 | |||
#include "llvm/ADT/DenseMap.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/RWMutex.h"
#include "llvm/Support/Threading.h"
#include "llvm/Support/thread.h"

#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>
30 | |||
31 | namespace llvm { |
||
32 | |||
33 | class ThreadPoolTaskGroup; |
||
34 | |||
35 | /// A ThreadPool for asynchronous parallel execution on a defined number of |
||
36 | /// threads. |
||
37 | /// |
||
38 | /// The pool keeps a vector of threads alive, waiting on a condition variable |
||
39 | /// for some work to become available. |
||
40 | /// |
||
41 | /// It is possible to reuse one thread pool for different groups of tasks |
||
42 | /// by grouping tasks using ThreadPoolTaskGroup. All tasks are processed using |
||
43 | /// the same queue, but it is possible to wait only for a specific group of |
||
44 | /// tasks to finish. |
||
45 | /// |
||
46 | /// It is also possible for worker threads to submit new tasks and wait for |
||
47 | /// them. Note that this may result in a deadlock in cases such as when a task |
||
48 | /// (directly or indirectly) tries to wait for its own completion, or when all |
||
49 | /// available threads are used up by tasks waiting for a task that has no thread |
||
50 | /// left to run on (this includes waiting on the returned future). It should be |
||
51 | /// generally safe to wait() for a group as long as groups do not form a cycle. |
||
52 | class ThreadPool { |
||
53 | public: |
||
54 | /// Construct a pool using the hardware strategy \p S for mapping hardware |
||
55 | /// execution resources (threads, cores, CPUs) |
||
56 | /// Defaults to using the maximum execution resources in the system, but |
||
57 | /// accounting for the affinity mask. |
||
58 | ThreadPool(ThreadPoolStrategy S = hardware_concurrency()); |
||
59 | |||
60 | /// Blocking destructor: the pool will wait for all the threads to complete. |
||
61 | ~ThreadPool(); |
||
62 | |||
63 | /// Asynchronous submission of a task to the pool. The returned future can be |
||
64 | /// used to wait for the task to finish and is *non-blocking* on destruction. |
||
65 | template <typename Function, typename... Args> |
||
66 | auto async(Function &&F, Args &&...ArgList) { |
||
67 | auto Task = |
||
68 | std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...); |
||
69 | return async(std::move(Task)); |
||
70 | } |
||
71 | |||
72 | /// Overload, task will be in the given task group. |
||
73 | template <typename Function, typename... Args> |
||
74 | auto async(ThreadPoolTaskGroup &Group, Function &&F, Args &&...ArgList) { |
||
75 | auto Task = |
||
76 | std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...); |
||
77 | return async(Group, std::move(Task)); |
||
78 | } |
||
79 | |||
80 | /// Asynchronous submission of a task to the pool. The returned future can be |
||
81 | /// used to wait for the task to finish and is *non-blocking* on destruction. |
||
82 | template <typename Func> |
||
83 | auto async(Func &&F) -> std::shared_future<decltype(F())> { |
||
84 | return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)), |
||
85 | nullptr); |
||
86 | } |
||
87 | |||
88 | template <typename Func> |
||
89 | auto async(ThreadPoolTaskGroup &Group, Func &&F) |
||
90 | -> std::shared_future<decltype(F())> { |
||
91 | return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)), |
||
92 | &Group); |
||
93 | } |
||
94 | |||
95 | /// Blocking wait for all the threads to complete and the queue to be empty. |
||
96 | /// It is an error to try to add new tasks while blocking on this call. |
||
97 | /// Calling wait() from a task would deadlock waiting for itself. |
||
98 | void wait(); |
||
99 | |||
100 | /// Blocking wait for only all the threads in the given group to complete. |
||
101 | /// It is possible to wait even inside a task, but waiting (directly or |
||
102 | /// indirectly) on itself will deadlock. If called from a task running on a |
||
103 | /// worker thread, the call may process pending tasks while waiting in order |
||
104 | /// not to waste the thread. |
||
105 | void wait(ThreadPoolTaskGroup &Group); |
||
106 | |||
107 | // TODO: misleading legacy name warning! |
||
108 | // Returns the maximum number of worker threads in the pool, not the current |
||
109 | // number of threads! |
||
110 | unsigned getThreadCount() const { return MaxThreadCount; } |
||
111 | |||
112 | /// Returns true if the current thread is a worker thread of this thread pool. |
||
113 | bool isWorkerThread() const; |
||
114 | |||
115 | private: |
||
116 | /// Helpers to create a promise and a callable wrapper of \p Task that sets |
||
117 | /// the result of the promise. Returns the callable and a future to access the |
||
118 | /// result. |
||
119 | template <typename ResTy> |
||
120 | static std::pair<std::function<void()>, std::future<ResTy>> |
||
121 | createTaskAndFuture(std::function<ResTy()> Task) { |
||
122 | std::shared_ptr<std::promise<ResTy>> Promise = |
||
123 | std::make_shared<std::promise<ResTy>>(); |
||
124 | auto F = Promise->get_future(); |
||
125 | return { |
||
126 | [Promise = std::move(Promise), Task]() { Promise->set_value(Task()); }, |
||
127 | std::move(F)}; |
||
128 | } |
||
129 | static std::pair<std::function<void()>, std::future<void>> |
||
130 | createTaskAndFuture(std::function<void()> Task) { |
||
131 | std::shared_ptr<std::promise<void>> Promise = |
||
132 | std::make_shared<std::promise<void>>(); |
||
133 | auto F = Promise->get_future(); |
||
134 | return {[Promise = std::move(Promise), Task]() { |
||
135 | Task(); |
||
136 | Promise->set_value(); |
||
137 | }, |
||
138 | std::move(F)}; |
||
139 | } |
||
140 | |||
141 | /// Returns true if all tasks in the given group have finished (nullptr means |
||
142 | /// all tasks regardless of their group). QueueLock must be locked. |
||
143 | bool workCompletedUnlocked(ThreadPoolTaskGroup *Group) const; |
||
144 | |||
145 | /// Asynchronous submission of a task to the pool. The returned future can be |
||
146 | /// used to wait for the task to finish and is *non-blocking* on destruction. |
||
147 | template <typename ResTy> |
||
148 | std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task, |
||
149 | ThreadPoolTaskGroup *Group) { |
||
150 | |||
151 | #if LLVM_ENABLE_THREADS |
||
152 | /// Wrap the Task in a std::function<void()> that sets the result of the |
||
153 | /// corresponding future. |
||
154 | auto R = createTaskAndFuture(Task); |
||
155 | |||
156 | int requestedThreads; |
||
157 | { |
||
158 | // Lock the queue and push the new task |
||
159 | std::unique_lock<std::mutex> LockGuard(QueueLock); |
||
160 | |||
161 | // Don't allow enqueueing after disabling the pool |
||
162 | assert(EnableFlag && "Queuing a thread during ThreadPool destruction"); |
||
163 | Tasks.emplace_back(std::make_pair(std::move(R.first), Group)); |
||
164 | requestedThreads = ActiveThreads + Tasks.size(); |
||
165 | } |
||
166 | QueueCondition.notify_one(); |
||
167 | grow(requestedThreads); |
||
168 | return R.second.share(); |
||
169 | |||
170 | #else // LLVM_ENABLE_THREADS Disabled |
||
171 | |||
172 | // Get a Future with launch::deferred execution using std::async |
||
173 | auto Future = std::async(std::launch::deferred, std::move(Task)).share(); |
||
174 | // Wrap the future so that both ThreadPool::wait() can operate and the |
||
175 | // returned future can be sync'ed on. |
||
176 | Tasks.emplace_back(std::make_pair([Future]() { Future.get(); }, Group)); |
||
177 | return Future; |
||
178 | #endif |
||
179 | } |
||
180 | |||
181 | #if LLVM_ENABLE_THREADS |
||
182 | // Grow to ensure that we have at least `requested` Threads, but do not go |
||
183 | // over MaxThreadCount. |
||
184 | void grow(int requested); |
||
185 | |||
186 | void processTasks(ThreadPoolTaskGroup *WaitingForGroup); |
||
187 | #endif |
||
188 | |||
189 | /// Threads in flight |
||
190 | std::vector<llvm::thread> Threads; |
||
191 | /// Lock protecting access to the Threads vector. |
||
192 | mutable llvm::sys::RWMutex ThreadsLock; |
||
193 | |||
194 | /// Tasks waiting for execution in the pool. |
||
195 | std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks; |
||
196 | |||
197 | /// Locking and signaling for accessing the Tasks queue. |
||
198 | std::mutex QueueLock; |
||
199 | std::condition_variable QueueCondition; |
||
200 | |||
201 | /// Signaling for job completion (all tasks or all tasks in a group). |
||
202 | std::condition_variable CompletionCondition; |
||
203 | |||
204 | /// Keep track of the number of thread actually busy |
||
205 | unsigned ActiveThreads = 0; |
||
206 | /// Number of threads active for tasks in the given group (only non-zero). |
||
207 | DenseMap<ThreadPoolTaskGroup *, unsigned> ActiveGroups; |
||
208 | |||
209 | #if LLVM_ENABLE_THREADS // avoids warning for unused variable |
||
210 | /// Signal for the destruction of the pool, asking thread to exit. |
||
211 | bool EnableFlag = true; |
||
212 | #endif |
||
213 | |||
214 | const ThreadPoolStrategy Strategy; |
||
215 | |||
216 | /// Maximum number of threads to potentially grow this pool to. |
||
217 | const unsigned MaxThreadCount; |
||
218 | }; |
||
219 | |||
220 | /// A group of tasks to be run on a thread pool. Thread pool tasks in different |
||
221 | /// groups can run on the same threadpool but can be waited for separately. |
||
222 | /// It is even possible for tasks of one group to submit and wait for tasks |
||
223 | /// of another group, as long as this does not form a loop. |
||
224 | class ThreadPoolTaskGroup { |
||
225 | public: |
||
226 | /// The ThreadPool argument is the thread pool to forward calls to. |
||
227 | ThreadPoolTaskGroup(ThreadPool &Pool) : Pool(Pool) {} |
||
228 | |||
229 | /// Blocking destructor: will wait for all the tasks in the group to complete |
||
230 | /// by calling ThreadPool::wait(). |
||
231 | ~ThreadPoolTaskGroup() { wait(); } |
||
232 | |||
233 | /// Calls ThreadPool::async() for this group. |
||
234 | template <typename Function, typename... Args> |
||
235 | inline auto async(Function &&F, Args &&...ArgList) { |
||
236 | return Pool.async(*this, std::forward<Function>(F), |
||
237 | std::forward<Args>(ArgList)...); |
||
238 | } |
||
239 | |||
240 | /// Calls ThreadPool::wait() for this group. |
||
241 | void wait() { Pool.wait(*this); } |
||
242 | |||
243 | private: |
||
244 | ThreadPool &Pool; |
||
245 | }; |
||
246 | |||
247 | } // namespace llvm |
||
248 | |||
249 | #endif // LLVM_SUPPORT_THREADPOOL_H |