Subversion Repositories QNX 8.QNX8 LLVM/Clang compiler suite

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
14 pmbaty 1
//===-- llvm/Support/ThreadPool.h - A ThreadPool implementation -*- C++ -*-===//
2
//
3
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4
// See https://llvm.org/LICENSE.txt for license information.
5
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6
//
7
//===----------------------------------------------------------------------===//
8
//
9
// This file defines a crude C++11 based thread pool.
10
//
11
//===----------------------------------------------------------------------===//
12
 
13
#ifndef LLVM_SUPPORT_THREADPOOL_H
14
#define LLVM_SUPPORT_THREADPOOL_H
15
 
16
#include "llvm/ADT/DenseMap.h"
17
#include "llvm/Config/llvm-config.h"
18
#include "llvm/Support/RWMutex.h"
19
#include "llvm/Support/Threading.h"
20
#include "llvm/Support/thread.h"
21
 
22
#include <future>
23
 
24
#include <condition_variable>
25
#include <deque>
26
#include <functional>
27
#include <memory>
28
#include <mutex>
29
#include <utility>
30
 
31
namespace llvm {
32
 
33
class ThreadPoolTaskGroup;
34
 
35
/// A ThreadPool for asynchronous parallel execution on a defined number of
36
/// threads.
37
///
38
/// The pool keeps a vector of threads alive, waiting on a condition variable
39
/// for some work to become available.
40
///
41
/// It is possible to reuse one thread pool for different groups of tasks
42
/// by grouping tasks using ThreadPoolTaskGroup. All tasks are processed using
43
/// the same queue, but it is possible to wait only for a specific group of
44
/// tasks to finish.
45
///
46
/// It is also possible for worker threads to submit new tasks and wait for
47
/// them. Note that this may result in a deadlock in cases such as when a task
48
/// (directly or indirectly) tries to wait for its own completion, or when all
49
/// available threads are used up by tasks waiting for a task that has no thread
50
/// left to run on (this includes waiting on the returned future). It should be
51
/// generally safe to wait() for a group as long as groups do not form a cycle.
52
class ThreadPool {
53
public:
54
  /// Construct a pool using the hardware strategy \p S for mapping hardware
55
  /// execution resources (threads, cores, CPUs)
56
  /// Defaults to using the maximum execution resources in the system, but
57
  /// accounting for the affinity mask.
58
  ThreadPool(ThreadPoolStrategy S = hardware_concurrency());
59
 
60
  /// Blocking destructor: the pool will wait for all the threads to complete.
61
  ~ThreadPool();
62
 
63
  /// Asynchronous submission of a task to the pool. The returned future can be
64
  /// used to wait for the task to finish and is *non-blocking* on destruction.
65
  template <typename Function, typename... Args>
66
  auto async(Function &&F, Args &&...ArgList) {
67
    auto Task =
68
        std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
69
    return async(std::move(Task));
70
  }
71
 
72
  /// Overload, task will be in the given task group.
73
  template <typename Function, typename... Args>
74
  auto async(ThreadPoolTaskGroup &Group, Function &&F, Args &&...ArgList) {
75
    auto Task =
76
        std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
77
    return async(Group, std::move(Task));
78
  }
79
 
80
  /// Asynchronous submission of a task to the pool. The returned future can be
81
  /// used to wait for the task to finish and is *non-blocking* on destruction.
82
  template <typename Func>
83
  auto async(Func &&F) -> std::shared_future<decltype(F())> {
84
    return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)),
85
                     nullptr);
86
  }
87
 
88
  template <typename Func>
89
  auto async(ThreadPoolTaskGroup &Group, Func &&F)
90
      -> std::shared_future<decltype(F())> {
91
    return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)),
92
                     &Group);
93
  }
94
 
95
  /// Blocking wait for all the threads to complete and the queue to be empty.
96
  /// It is an error to try to add new tasks while blocking on this call.
97
  /// Calling wait() from a task would deadlock waiting for itself.
98
  void wait();
99
 
100
  /// Blocking wait for only all the threads in the given group to complete.
101
  /// It is possible to wait even inside a task, but waiting (directly or
102
  /// indirectly) on itself will deadlock. If called from a task running on a
103
  /// worker thread, the call may process pending tasks while waiting in order
104
  /// not to waste the thread.
105
  void wait(ThreadPoolTaskGroup &Group);
106
 
107
  // TODO: misleading legacy name warning!
108
  // Returns the maximum number of worker threads in the pool, not the current
109
  // number of threads!
110
  unsigned getThreadCount() const { return MaxThreadCount; }
111
 
112
  /// Returns true if the current thread is a worker thread of this thread pool.
113
  bool isWorkerThread() const;
114
 
115
private:
116
  /// Helpers to create a promise and a callable wrapper of \p Task that sets
117
  /// the result of the promise. Returns the callable and a future to access the
118
  /// result.
119
  template <typename ResTy>
120
  static std::pair<std::function<void()>, std::future<ResTy>>
121
  createTaskAndFuture(std::function<ResTy()> Task) {
122
    std::shared_ptr<std::promise<ResTy>> Promise =
123
        std::make_shared<std::promise<ResTy>>();
124
    auto F = Promise->get_future();
125
    return {
126
        [Promise = std::move(Promise), Task]() { Promise->set_value(Task()); },
127
        std::move(F)};
128
  }
129
  static std::pair<std::function<void()>, std::future<void>>
130
  createTaskAndFuture(std::function<void()> Task) {
131
    std::shared_ptr<std::promise<void>> Promise =
132
        std::make_shared<std::promise<void>>();
133
    auto F = Promise->get_future();
134
    return {[Promise = std::move(Promise), Task]() {
135
              Task();
136
              Promise->set_value();
137
            },
138
            std::move(F)};
139
  }
140
 
141
  /// Returns true if all tasks in the given group have finished (nullptr means
142
  /// all tasks regardless of their group). QueueLock must be locked.
143
  bool workCompletedUnlocked(ThreadPoolTaskGroup *Group) const;
144
 
145
  /// Asynchronous submission of a task to the pool. The returned future can be
146
  /// used to wait for the task to finish and is *non-blocking* on destruction.
147
  template <typename ResTy>
148
  std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task,
149
                                      ThreadPoolTaskGroup *Group) {
150
 
151
#if LLVM_ENABLE_THREADS
152
    /// Wrap the Task in a std::function<void()> that sets the result of the
153
    /// corresponding future.
154
    auto R = createTaskAndFuture(Task);
155
 
156
    int requestedThreads;
157
    {
158
      // Lock the queue and push the new task
159
      std::unique_lock<std::mutex> LockGuard(QueueLock);
160
 
161
      // Don't allow enqueueing after disabling the pool
162
      assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
163
      Tasks.emplace_back(std::make_pair(std::move(R.first), Group));
164
      requestedThreads = ActiveThreads + Tasks.size();
165
    }
166
    QueueCondition.notify_one();
167
    grow(requestedThreads);
168
    return R.second.share();
169
 
170
#else // LLVM_ENABLE_THREADS Disabled
171
 
172
    // Get a Future with launch::deferred execution using std::async
173
    auto Future = std::async(std::launch::deferred, std::move(Task)).share();
174
    // Wrap the future so that both ThreadPool::wait() can operate and the
175
    // returned future can be sync'ed on.
176
    Tasks.emplace_back(std::make_pair([Future]() { Future.get(); }, Group));
177
    return Future;
178
#endif
179
  }
180
 
181
#if LLVM_ENABLE_THREADS
182
  // Grow to ensure that we have at least `requested` Threads, but do not go
183
  // over MaxThreadCount.
184
  void grow(int requested);
185
 
186
  void processTasks(ThreadPoolTaskGroup *WaitingForGroup);
187
#endif
188
 
189
  /// Threads in flight
190
  std::vector<llvm::thread> Threads;
191
  /// Lock protecting access to the Threads vector.
192
  mutable llvm::sys::RWMutex ThreadsLock;
193
 
194
  /// Tasks waiting for execution in the pool.
195
  std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks;
196
 
197
  /// Locking and signaling for accessing the Tasks queue.
198
  std::mutex QueueLock;
199
  std::condition_variable QueueCondition;
200
 
201
  /// Signaling for job completion (all tasks or all tasks in a group).
202
  std::condition_variable CompletionCondition;
203
 
204
  /// Keep track of the number of thread actually busy
205
  unsigned ActiveThreads = 0;
206
  /// Number of threads active for tasks in the given group (only non-zero).
207
  DenseMap<ThreadPoolTaskGroup *, unsigned> ActiveGroups;
208
 
209
#if LLVM_ENABLE_THREADS // avoids warning for unused variable
210
  /// Signal for the destruction of the pool, asking thread to exit.
211
  bool EnableFlag = true;
212
#endif
213
 
214
  const ThreadPoolStrategy Strategy;
215
 
216
  /// Maximum number of threads to potentially grow this pool to.
217
  const unsigned MaxThreadCount;
218
};
219
 
220
/// A group of tasks to be run on a thread pool. Thread pool tasks in different
221
/// groups can run on the same threadpool but can be waited for separately.
222
/// It is even possible for tasks of one group to submit and wait for tasks
223
/// of another group, as long as this does not form a loop.
224
class ThreadPoolTaskGroup {
225
public:
226
  /// The ThreadPool argument is the thread pool to forward calls to.
227
  ThreadPoolTaskGroup(ThreadPool &Pool) : Pool(Pool) {}
228
 
229
  /// Blocking destructor: will wait for all the tasks in the group to complete
230
  /// by calling ThreadPool::wait().
231
  ~ThreadPoolTaskGroup() { wait(); }
232
 
233
  /// Calls ThreadPool::async() for this group.
234
  template <typename Function, typename... Args>
235
  inline auto async(Function &&F, Args &&...ArgList) {
236
    return Pool.async(*this, std::forward<Function>(F),
237
                      std::forward<Args>(ArgList)...);
238
  }
239
 
240
  /// Calls ThreadPool::wait() for this group.
241
  void wait() { Pool.wait(*this); }
242
 
243
private:
244
  ThreadPool &Pool;
245
};
246
 
247
} // namespace llvm
248
 
249
#endif // LLVM_SUPPORT_THREADPOOL_H