TensorDeviceThreadPool.h
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog@gmail.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#if defined(EIGEN_USE_THREADS) && !defined(EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H)
#define EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H

#include "./InternalHeaderCheck.h"

namespace Eigen {

// Runs an arbitrary function and then calls Notify() on the passed in
// Notification.
template <typename Function, typename... Args>
struct FunctionWrapperWithNotification {
  static void run(Notification* n, Function f, Args... args) {
    f(args...);
    if (n) {
      n->Notify();
    }
  }
};

template <typename Function, typename... Args>
struct FunctionWrapperWithBarrier {
  static void run(Barrier* b, Function f, Args... args) {
    f(args...);
    if (b) {
      b->Notify();
    }
  }
};

template <typename SyncType>
static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
  if (n) {
    n->Wait();
  }
}

// An abstract interface to a device specific memory allocator.
class Allocator {
 public:
  virtual ~Allocator() {}
  virtual void* allocate(size_t num_bytes) const = 0;
  virtual void deallocate(void* buffer) const = 0;
};
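
// Example (illustrative sketch, not part of Eigen): a minimal concrete
// Allocator that forwards to Eigen's aligned allocation helpers. A real
// custom allocator might instead draw from a pinned-memory pool or an arena;
// the name `AlignedHeapAllocator` is made up for this example.
//
//   class AlignedHeapAllocator : public Allocator {
//    public:
//     void* allocate(size_t num_bytes) const override {
//       return internal::aligned_malloc(num_bytes);
//     }
//     void deallocate(void* buffer) const override {
//       internal::aligned_free(buffer);
//     }
//   };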

// Build a thread pool device on top of an existing pool of threads.
struct ThreadPoolDevice {
  // The ownership of the thread pool remains with the caller.
  ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores, Allocator* allocator = nullptr)
      : pool_(pool), num_threads_(num_cores), allocator_(allocator) { }

  EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
    return allocator_ ? allocator_->allocate(num_bytes)
                      : internal::aligned_malloc(num_bytes);
  }

  EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
    if (allocator_) {
      allocator_->deallocate(buffer);
    } else {
      internal::aligned_free(buffer);
    }
  }

  EIGEN_STRONG_INLINE void* allocate_temp(size_t num_bytes) const {
    return allocate(num_bytes);
  }

  EIGEN_STRONG_INLINE void deallocate_temp(void* buffer) const {
    deallocate(buffer);
  }

  template<typename Type>
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE Type get(Type data) const {
    return data;
  }

  EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
#ifdef __ANDROID__
    ::memcpy(dst, src, n);
#else
    // TODO(rmlarsen): Align blocks on cache lines.
    // We have observed that going beyond 4 threads usually just wastes
    // CPU cycles due to the threads competing for memory bandwidth, so we
    // statically schedule at most 4 block copies here.
    const size_t kMinBlockSize = 32768;
    const size_t num_threads = CostModel::numThreads(n, TensorOpCost(1.0, 1.0, 0), 4);
    if (n <= kMinBlockSize || num_threads < 2) {
      ::memcpy(dst, src, n);
    } else {
      const char* src_ptr = static_cast<const char*>(src);
      char* dst_ptr = static_cast<char*>(dst);
      const size_t blocksize = (n + (num_threads - 1)) / num_threads;
      Barrier barrier(static_cast<int>(num_threads - 1));
      // Launch the last 3 blocks on worker threads.
      for (size_t i = 1; i < num_threads; ++i) {
        enqueue_with_barrier(&barrier, [n, i, src_ptr, dst_ptr, blocksize] {
          ::memcpy(dst_ptr + i * blocksize, src_ptr + i * blocksize,
                   numext::mini(blocksize, n - (i * blocksize)));
        });
      }
      // Launch the first block on the main thread.
      ::memcpy(dst_ptr, src_ptr, blocksize);
      barrier.Wait();
    }
#endif
  }
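
  // Worked example for the parallel memcpy above (illustrative numbers,
  // assuming the cost model settles on the maximum of 4 threads): for
  // n = 1,000,001 bytes, blocksize = (1,000,001 + 3) / 4 = 250,001. Workers
  // i = 1..3 each copy the range starting at i * 250,001, with the last block
  // clamped to n - 3 * 250,001 = 249,998 bytes, while the calling thread
  // copies the first 250,001 bytes and then waits on the barrier.
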
  EIGEN_STRONG_INLINE void memcpyHostToDevice(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }
  EIGEN_STRONG_INLINE void memcpyDeviceToHost(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }

  EIGEN_STRONG_INLINE void memset(void* buffer, int c, size_t n) const {
    ::memset(buffer, c, n);
  }

  template<typename T>
  EIGEN_STRONG_INLINE void fill(T* begin, T* end, const T& value) const {
    std::fill(begin, end, value);
  }

  EIGEN_STRONG_INLINE int numThreads() const {
    return num_threads_;
  }

  // Number of threads available in the underlying thread pool. This number can
  // be different from the value returned by numThreads().
  EIGEN_STRONG_INLINE int numThreadsInPool() const {
    return pool_->NumThreads();
  }

  EIGEN_STRONG_INLINE size_t firstLevelCacheSize() const {
    return l1CacheSize();
  }

  EIGEN_STRONG_INLINE size_t lastLevelCacheSize() const {
    // The l3 cache size is shared between all the cores.
    return l3CacheSize() / num_threads_;
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void synchronize() const {
    // Nothing. Threadpool device operations are synchronous.
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int majorDeviceVersion() const {
    // Should return an enum that encodes the ISA supported by the CPU.
    return 1;
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE Notification* enqueue(Function&& f,
                                            Args&&... args) const {
    Notification* n = new Notification();
    pool_->Schedule(
        std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n,
                  std::move(f), args...));
    return n;
  }
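
  // Example usage (illustrative, not from the Eigen sources): enqueue a task
  // and block until it has run. `pool` is assumed to be an existing
  // Eigen::ThreadPool (or any other ThreadPoolInterface). The Notification is
  // heap-allocated by enqueue() and never freed by the device, so the caller
  // deletes it after waiting.
  //
  //   Eigen::ThreadPool pool(4);
  //   Eigen::ThreadPoolDevice device(&pool, /*num_cores=*/4);
  //   Notification* done = device.enqueue([] { /* some work */ });
  //   wait_until_ready(done);
  //   delete done;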

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b, Function&& f,
                                                Args&&... args) const {
    pool_->Schedule(
        std::bind(&FunctionWrapperWithBarrier<Function, Args...>::run, b,
                  std::move(f), args...));
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f,
                                                 Args&&... args) const {
    if (sizeof...(args) > 0) {
      pool_->Schedule(std::bind(std::move(f), args...));
    } else {
      pool_->Schedule(std::move(f));
    }
  }

  // Returns a logical thread index between 0 and pool_->NumThreads() - 1 if
  // called from one of the threads in pool_. Returns -1 otherwise.
  EIGEN_STRONG_INLINE int currentThreadId() const {
    return pool_->CurrentThreadId();
  }

  // WARNING: This function is synchronous and will block the calling thread.
  //
  // Synchronous parallelFor executes f with [0, n) arguments in parallel and
  // waits for completion. F accepts a half-open interval [first, last). Block
  // size is chosen based on the iteration cost and resulting parallel
  // efficiency. If block_align is not nullptr, it is called to round up the
  // block size.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<Index(Index)> block_align,
                   std::function<void(Index, Index)> f) const {
    if (EIGEN_PREDICT_FALSE(n <= 0)) {
      return;
      // Compute small problems directly in the caller thread.
    } else if (n == 1 || numThreads() == 1 ||
               CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      return;
    }

    // Compute block size and total count of blocks.
    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);

    // Recursively divide size into halves until we reach block_size.
    // Division code rounds mid to block_size, so we are guaranteed to get
    // block_count leaves that do actual computations.
    Barrier barrier(static_cast<unsigned int>(block.count));
    std::function<void(Index, Index)> handleRange;
    handleRange = [=, &handleRange, &barrier, &f](Index firstIdx,
                                                  Index lastIdx) {
      while (lastIdx - firstIdx > block.size) {
        // Split into halves and schedule the second half on a different thread.
        const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
        pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
        lastIdx = midIdx;
      }
      // Single block or less, execute directly.
      f(firstIdx, lastIdx);
      barrier.Notify();
    };

    if (block.count <= numThreads()) {
      // Avoid a thread hop by running the root of the tree and one block on the
      // main thread.
      handleRange(0, n);
    } else {
      // Execute the root in the thread pool to avoid running work on more than
      // numThreads() threads.
      pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
    }

    barrier.Wait();
  }

  // Convenience wrapper for parallelFor that does not align blocks.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<void(Index, Index)> f) const {
    parallelFor(n, cost, nullptr, std::move(f));
  }
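
  // Example usage (illustrative, not from the Eigen sources): double every
  // element of a buffer in parallel. `device` is a ThreadPoolDevice as
  // constructed in the enqueue example above; the TensorOpCost is a rough
  // per-element estimate (bytes loaded, bytes stored, compute cycles) that the
  // device only uses to choose a block size.
  //
  //   std::vector<float> data(1 << 20, 1.0f);
  //   device.parallelFor(static_cast<Index>(data.size()),
  //                      TensorOpCost(sizeof(float), sizeof(float), 1),
  //                      [&](Index first, Index last) {
  //                        for (Index i = first; i < last; ++i) data[i] *= 2.0f;
  //                      });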

  // WARNING: This function is asynchronous and will not block the calling thread.
  //
  // Asynchronous parallelFor executes f with [0, n) arguments in parallel
  // without waiting for completion. When the last block finishes, it calls the
  // 'done' callback. F accepts a half-open interval [first, last). Block size
  // is chosen based on the iteration cost and resulting parallel efficiency. If
  // block_align is not nullptr, it is called to round up the block size.
  void parallelForAsync(Index n, const TensorOpCost& cost,
                        std::function<Index(Index)> block_align,
                        std::function<void(Index, Index)> f,
                        std::function<void()> done) const {
    // Compute small problems directly in the caller thread.
    if (n <= 1 || numThreads() == 1 ||
        CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      done();
      return;
    }

    // Compute block size and total count of blocks.
    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);

    ParallelForAsyncContext* const ctx =
        new ParallelForAsyncContext(block.count, std::move(f), std::move(done));

    // Recursively divide size into halves until we reach block_size.
    // Division code rounds mid to block_size, so we are guaranteed to get
    // block_count leaves that do actual computations.
    ctx->handle_range = [this, ctx, block](Index firstIdx, Index lastIdx) {
      while (lastIdx - firstIdx > block.size) {
        // Split into halves and schedule the second half on a different thread.
        const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
        pool_->Schedule(
            [ctx, midIdx, lastIdx]() { ctx->handle_range(midIdx, lastIdx); });
        lastIdx = midIdx;
      }

      // Single block or less, execute directly.
      ctx->f(firstIdx, lastIdx);

      // Delete async context if it was the last block.
      if (ctx->count.fetch_sub(1) == 1) delete ctx;
    };

    if (block.count <= numThreads()) {
      // Avoid a thread hop by running the root of the tree and one block on the
      // main thread.
      ctx->handle_range(0, n);
    } else {
      // Execute the root in the thread pool to avoid running work on more than
      // numThreads() threads.
      pool_->Schedule([ctx, n]() { ctx->handle_range(0, n); });
    }
  }

  // Convenience wrapper for parallelForAsync that does not align blocks.
  void parallelForAsync(Index n, const TensorOpCost& cost,
                        std::function<void(Index, Index)> f,
                        std::function<void()> done) const {
    parallelForAsync(n, cost, nullptr, std::move(f), std::move(done));
  }
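
  // Example usage (illustrative, not from the Eigen sources): the same loop as
  // in the synchronous example above, but without blocking the caller; `data`
  // is the std::vector<float> from that example, and a Barrier stands in for
  // whatever completion mechanism the surrounding code uses.
  //
  //   Barrier all_done(1);
  //   device.parallelForAsync(static_cast<Index>(data.size()),
  //                           TensorOpCost(sizeof(float), sizeof(float), 1),
  //                           [&](Index first, Index last) {
  //                             for (Index i = first; i < last; ++i) data[i] *= 2.0f;
  //                           },
  //                           [&]() { all_done.Notify(); });
  //   // ... other work on the calling thread ...
  //   all_done.Wait();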

  // Thread pool accessor.
  ThreadPoolInterface* getPool() const { return pool_; }

  // Allocator accessor.
  Allocator* allocator() const { return allocator_; }

 private:
  typedef TensorCostModel<ThreadPoolDevice> CostModel;

  // For parallelForAsync we must keep the passed-in closures on the heap, and
  // delete them only after the `done` callback has finished.
  struct ParallelForAsyncContext {
    ParallelForAsyncContext(Index block_count,
                            std::function<void(Index, Index)> block_f,
                            std::function<void()> done_callback)
        : count(block_count),
          f(std::move(block_f)),
          done(std::move(done_callback)) {}
    ~ParallelForAsyncContext() { done(); }

    std::atomic<Index> count;
    std::function<void(Index, Index)> f;
    std::function<void()> done;

    std::function<void(Index, Index)> handle_range;
  };

  struct ParallelForBlock {
    Index size;   // block size
    Index count;  // number of blocks
  };

  // Calculates block size based on (1) the iteration cost and (2) parallel
  // efficiency. We want blocks to be not too small, to mitigate parallelization
  // overheads, and not too large, to mitigate the tail effect and potential
  // load imbalance; we also want the number of blocks to divide evenly across
  // threads.
  ParallelForBlock CalculateParallelForBlock(
      const Index n, const TensorOpCost& cost,
      std::function<Index(Index)> block_align) const {
    const double block_size_f = 1.0 / CostModel::taskSize(1, cost);
    const Index max_oversharding_factor = 4;
    Index block_size = numext::mini(
        n, numext::maxi<Index>(
               divup<Index>(n, max_oversharding_factor * numThreads()),
               block_size_f));
    const Index max_block_size = numext::mini(n, 2 * block_size);

    if (block_align) {
      Index new_block_size = block_align(block_size);
      eigen_assert(new_block_size >= block_size);
      block_size = numext::mini(n, new_block_size);
    }

    Index block_count = divup(n, block_size);

    // Calculate parallel efficiency as the fraction of total CPU time used for
    // computations:
    double max_efficiency =
        static_cast<double>(block_count) /
        (divup<int>(block_count, numThreads()) * numThreads());

    // Now try to increase block size up to max_block_size as long as it
    // doesn't decrease parallel efficiency.
    for (Index prev_block_count = block_count;
         max_efficiency < 1.0 && prev_block_count > 1;) {
      // This is the next block size that divides size into a smaller number
      // of blocks than the current block_size.
      Index coarser_block_size = divup(n, prev_block_count - 1);
      if (block_align) {
        Index new_block_size = block_align(coarser_block_size);
        eigen_assert(new_block_size >= coarser_block_size);
        coarser_block_size = numext::mini(n, new_block_size);
      }
      if (coarser_block_size > max_block_size) {
        break;  // Reached max block size. Stop.
      }
      // Recalculate parallel efficiency.
      const Index coarser_block_count = divup(n, coarser_block_size);
      eigen_assert(coarser_block_count < prev_block_count);
      prev_block_count = coarser_block_count;
      const double coarser_efficiency =
          static_cast<double>(coarser_block_count) /
          (divup<int>(coarser_block_count, numThreads()) * numThreads());
      if (coarser_efficiency + 0.01 >= max_efficiency) {
        // Taking it.
        block_size = coarser_block_size;
        block_count = coarser_block_count;
        if (max_efficiency < coarser_efficiency) {
          max_efficiency = coarser_efficiency;
        }
      }
    }

    return {block_size, block_count};
  }
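
  // Worked example for CalculateParallelForBlock above (illustrative numbers):
  // suppose n = 1000, numThreads() = 4, and the cost model yields
  // block_size_f = 50 elements per task. Then
  //   block_size  = min(1000, max(divup(1000, 4 * 4), 50)) = 63,
  //   block_count = divup(1000, 63) = 16,
  // and 16 blocks divide evenly over 4 threads (efficiency 16 / (4 * 4) = 1.0),
  // so the coarsening loop stops immediately. If block_align had rounded the
  // block size up to 80, block_count would drop to divup(1000, 80) = 13 and
  // the efficiency to 13 / (divup(13, 4) * 4) ~= 0.81, so the loop would then
  // search for a coarser block size (no larger than max_block_size = 126)
  // whose block count divides more evenly over the 4 threads.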

  ThreadPoolInterface* pool_;
  int num_threads_;
  Allocator* allocator_;
};

}  // end namespace Eigen

#endif  // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H