//===-- llvm/Support/ThreadPool.h - A ThreadPool implementation -*- C++ -*-===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
//
// This file defines a crude C++11 based thread pool.
//
//===----------------------------------------------------------------------===//

#ifndef LLVM_SUPPORT_THREADPOOL_H
#define LLVM_SUPPORT_THREADPOOL_H

#include "llvm/ADT/DenseMap.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/RWMutex.h"
#include "llvm/Support/Threading.h"
#include "llvm/Support/thread.h"

#include <future>

#include <condition_variable>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>

namespace llvm {

class ThreadPoolTaskGroup;

/// A ThreadPool for asynchronous parallel execution on a defined number of
/// threads.
///
/// The pool keeps a vector of threads alive, waiting on a condition variable
/// for some work to become available.
///
/// It is possible to reuse one thread pool for different groups of tasks
/// by grouping tasks using ThreadPoolTaskGroup. All tasks are processed using
/// the same queue, but it is possible to wait only for a specific group of
/// tasks to finish.
///
/// It is also possible for worker threads to submit new tasks and wait for
/// them. Note that this may result in a deadlock in cases such as when a task
/// (directly or indirectly) tries to wait for its own completion, or when all
/// available threads are used up by tasks waiting for a task that has no thread
/// left to run on (this includes waiting on the returned future). It should be
/// generally safe to wait() for a group as long as groups do not form a cycle.
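///
/// A minimal usage sketch (illustrative; the variable names and lambda bodies
/// are hypothetical, and the caller is assumed to include <atomic>):
/// \code
///   ThreadPool Pool;                         // hardware_concurrency() threads
///   std::atomic<unsigned> Counter{0};
///   for (unsigned I = 0; I != 8; ++I)
///     Pool.async([&Counter] { ++Counter; }); // enqueue fire-and-forget tasks
///   Pool.wait();                             // block until the queue drains
///   // Counter is 8 once wait() returns.
/// \endcode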
class ThreadPool {
public:
  /// Construct a pool using the hardware strategy \p S for mapping hardware
  /// execution resources (threads, cores, CPUs).
  /// Defaults to using the maximum execution resources in the system, while
  /// accounting for the affinity mask.
  ThreadPool(ThreadPoolStrategy S = hardware_concurrency());

  /// Blocking destructor: the pool will wait for all the threads to complete.
  ~ThreadPool();

  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  template <typename Function, typename... Args>
  auto async(Function &&F, Args &&...ArgList) {
    auto Task =
        std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
    return async(std::move(Task));
  }

  /// Overload: the submitted task will belong to the given task group.
  template <typename Function, typename... Args>
  auto async(ThreadPoolTaskGroup &Group, Function &&F, Args &&...ArgList) {
    auto Task =
        std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
    return async(Group, std::move(Task));
  }

  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
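  ///
  /// For example, a sketch assuming an already-constructed pool named \c Pool
  /// (the names here are illustrative):
  /// \code
  ///   auto Future = Pool.async([] { return 6 * 7; }); // std::shared_future<int>
  ///   // ... do other work on this thread ...
  ///   int Answer = Future.get();                      // blocks until the task ran
  /// \endcode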
  template <typename Func>
  auto async(Func &&F) -> std::shared_future<decltype(F())> {
    return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)),
                     nullptr);
  }

  template <typename Func>
  auto async(ThreadPoolTaskGroup &Group, Func &&F)
      -> std::shared_future<decltype(F())> {
    return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)),
                     &Group);
  }

  /// Blocking wait for all the threads to complete and the queue to be empty.
  /// It is an error to try to add new tasks while blocking on this call.
  /// Calling wait() from a task would deadlock waiting for itself.
  void wait();

  /// Blocking wait for only the tasks in the given group to complete.
  /// It is possible to wait even inside a task, but waiting (directly or
  /// indirectly) on itself will deadlock. If called from a task running on a
  /// worker thread, the call may process pending tasks while waiting in order
  /// not to waste the thread.
  void wait(ThreadPoolTaskGroup &Group);

  // TODO: misleading legacy name warning!
  // Returns the maximum number of worker threads in the pool, not the current
  // number of threads!
  unsigned getThreadCount() const { return MaxThreadCount; }

  /// Returns true if the current thread is a worker thread of this thread pool.
  bool isWorkerThread() const;

private:
  /// Helpers to create a promise and a callable wrapper of \p Task that sets
  /// the result of the promise. Returns the callable and a future to access the
  /// result.
  template <typename ResTy>
  static std::pair<std::function<void()>, std::future<ResTy>>
  createTaskAndFuture(std::function<ResTy()> Task) {
    std::shared_ptr<std::promise<ResTy>> Promise =
        std::make_shared<std::promise<ResTy>>();
    auto F = Promise->get_future();
    return {
        [Promise = std::move(Promise), Task]() { Promise->set_value(Task()); },
        std::move(F)};
  }
  static std::pair<std::function<void()>, std::future<void>>
  createTaskAndFuture(std::function<void()> Task) {
    std::shared_ptr<std::promise<void>> Promise =
        std::make_shared<std::promise<void>>();
    auto F = Promise->get_future();
    return {[Promise = std::move(Promise), Task]() {
              Task();
              Promise->set_value();
            },
            std::move(F)};
  }

  /// Returns true if all tasks in the given group have finished (nullptr means
  /// all tasks regardless of their group). QueueLock must be locked.
  bool workCompletedUnlocked(ThreadPoolTaskGroup *Group) const;

  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  template <typename ResTy>
  std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task,
                                      ThreadPoolTaskGroup *Group) {

#if LLVM_ENABLE_THREADS
    /// Wrap the Task in a std::function<void()> that sets the result of the
    /// corresponding future.
    auto R = createTaskAndFuture(Task);

    int requestedThreads;
    {
      // Lock the queue and push the new task
      std::unique_lock<std::mutex> LockGuard(QueueLock);

      // Don't allow enqueueing after disabling the pool
      assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
      Tasks.emplace_back(std::make_pair(std::move(R.first), Group));
      requestedThreads = ActiveThreads + Tasks.size();
    }
    QueueCondition.notify_one();
    grow(requestedThreads);
    return R.second.share();

#else // LLVM_ENABLE_THREADS Disabled

    // Get a Future with launch::deferred execution using std::async
    auto Future = std::async(std::launch::deferred, std::move(Task)).share();
    // Wrap the future so that both ThreadPool::wait() can operate and the
    // returned future can be sync'ed on.
    Tasks.emplace_back(std::make_pair([Future]() { Future.get(); }, Group));
    return Future;
#endif
  }

#if LLVM_ENABLE_THREADS
  // Grow to ensure that we have at least `requested` Threads, but do not go
  // over MaxThreadCount.
  void grow(int requested);

  void processTasks(ThreadPoolTaskGroup *WaitingForGroup);
#endif

  /// Threads in flight
  std::vector<llvm::thread> Threads;
  /// Lock protecting access to the Threads vector.
  mutable llvm::sys::RWMutex ThreadsLock;

  /// Tasks waiting for execution in the pool.
  std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks;

  /// Locking and signaling for accessing the Tasks queue.
  std::mutex QueueLock;
  std::condition_variable QueueCondition;

  /// Signaling for job completion (all tasks or all tasks in a group).
  std::condition_variable CompletionCondition;

  /// Keep track of the number of threads actually busy.
  unsigned ActiveThreads = 0;
  /// Number of threads active for tasks in each group (only groups with a
  /// non-zero count are kept in the map).
  DenseMap<ThreadPoolTaskGroup *, unsigned> ActiveGroups;

#if LLVM_ENABLE_THREADS // avoids warning for unused variable
  /// Signal for the destruction of the pool, asking threads to exit.
  bool EnableFlag = true;
#endif

  const ThreadPoolStrategy Strategy;

  /// Maximum number of threads to potentially grow this pool to.
  const unsigned MaxThreadCount;
};

/// A group of tasks to be run on a thread pool. Tasks in different groups can
/// run on the same thread pool but can be waited for separately.
/// It is even possible for tasks of one group to submit and wait for tasks
/// of another group, as long as this does not form a cycle.
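///
/// A usage sketch (the group and task names are illustrative):
/// \code
///   ThreadPool Pool;
///   ThreadPoolTaskGroup Checks(Pool);
///   ThreadPoolTaskGroup Reports(Pool);
///   Checks.async([] { /* run a check */ });
///   Reports.async([] { /* emit a report */ });
///   Checks.wait();  // waits only for tasks submitted through Checks
///   Reports.wait(); // waits only for tasks submitted through Reports
/// \endcode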
class ThreadPoolTaskGroup {
public:
  /// The ThreadPool argument is the thread pool to forward calls to.
  ThreadPoolTaskGroup(ThreadPool &Pool) : Pool(Pool) {}

  /// Blocking destructor: will wait for all the tasks in the group to complete
  /// by calling ThreadPool::wait().
  ~ThreadPoolTaskGroup() { wait(); }

  /// Calls ThreadPool::async() for this group.
  template <typename Function, typename... Args>
  inline auto async(Function &&F, Args &&...ArgList) {
    return Pool.async(*this, std::forward<Function>(F),
                      std::forward<Args>(ArgList)...);
  }

  /// Calls ThreadPool::wait() for this group.
  void wait() { Pool.wait(*this); }

private:
  ThreadPool &Pool;
};

} // namespace llvm

#endif // LLVM_SUPPORT_THREADPOOL_H