#if defined(EIGEN_USE_THREADS) && !defined(EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H)
#define EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H

#include "./InternalHeaderCheck.h"

namespace Eigen {
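
// Runs an arbitrary function and then calls Notify() on the passed-in
// Notification, if any.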
template <typename Function, typename... Args>
struct FunctionWrapperWithNotification {
  static void run(Notification* n, Function f, Args... args) {
    f(args...);
    if (n) {
      n->Notify();
    }
  }
};

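// Runs an arbitrary function and then notifies the passed-in Barrier, if any.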
template <typename Function, typename... Args>
struct FunctionWrapperWithBarrier {
  static void run(Barrier* b, Function f, Args... args) {
    f(args...);
    if (b) {
      b->Notify();
    }
  }
};

template <typename SyncType>
static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
  if (n) {
    n->Wait();
  }
}

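// An abstract interface to a device-specific memory allocator.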
class Allocator {
 public:
  virtual ~Allocator() {}
  virtual void* allocate(size_t num_bytes) const = 0;
  virtual void deallocate(void* buffer) const = 0;
};

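// Build a ThreadPoolDevice on top of an existing pool of threads. Ownership of
// the pool and of the optional Allocator remains with the caller.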
struct ThreadPoolDevice {
  ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores, Allocator* allocator = nullptr)
      : pool_(pool), num_threads_(num_cores), allocator_(allocator) {}
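
  // Usage sketch (illustrative only, not part of this header): a ThreadPoolDevice
  // is typically built on an Eigen::ThreadPool and handed to Tensor expressions
  // via .device(); the thread count and tensor sizes below are arbitrary.
  //
  //   Eigen::ThreadPool pool(4);
  //   Eigen::ThreadPoolDevice device(&pool, /*num_cores=*/4);
  //   Eigen::Tensor<float, 2> a(256, 256), b(256, 256), c(256, 256);
  //   a.setRandom();
  //   b.setRandom();
  //   c.device(device) = a + b;  // evaluated in parallel on the pool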

  EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
    return allocator_ ? allocator_->allocate(num_bytes)
                      : internal::aligned_malloc(num_bytes);
  }

  EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
    if (allocator_) {
      allocator_->deallocate(buffer);
    } else {
      internal::aligned_free(buffer);
    }
  }

  EIGEN_STRONG_INLINE void* allocate_temp(size_t num_bytes) const {
    return allocate(num_bytes);
  }

  EIGEN_STRONG_INLINE void deallocate_temp(void* buffer) const {
    deallocate(buffer);
  }

  template <typename Type>
  EIGEN_STRONG_INLINE Type get(Type data) const {
    return data;
  }

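  // For large copies (outside of Android builds), the range is split into up to
  // four blocks that are copied in parallel on the pool; using more than four
  // copy threads tends to just contend for memory bandwidth.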
  EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
#ifdef __ANDROID__
    ::memcpy(dst, src, n);
#else
    const size_t kMinBlockSize = 32768;
    const size_t num_threads = CostModel::numThreads(n, TensorOpCost(1.0, 1.0, 0), 4);
    if (n <= kMinBlockSize || num_threads < 2) {
      ::memcpy(dst, src, n);
    } else {
      const char* src_ptr = static_cast<const char*>(src);
      char* dst_ptr = static_cast<char*>(dst);
      const size_t blocksize = (n + (num_threads - 1)) / num_threads;
      Barrier barrier(static_cast<int>(num_threads - 1));
      // Launch the last num_threads - 1 blocks on worker threads.
      for (size_t i = 1; i < num_threads; ++i) {
        enqueue_with_barrier(&barrier, [n, i, src_ptr, dst_ptr, blocksize] {
          ::memcpy(dst_ptr + i * blocksize, src_ptr + i * blocksize,
                   numext::mini(blocksize, n - (i * blocksize)));
        });
      }
      // Copy the first block on the calling thread, then wait for the others.
      ::memcpy(dst_ptr, src_ptr, blocksize);
      barrier.Wait();
    }
#endif
  }

  EIGEN_STRONG_INLINE void memcpyHostToDevice(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }
  EIGEN_STRONG_INLINE void memcpyDeviceToHost(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }

  EIGEN_STRONG_INLINE void memset(void* buffer, int c, size_t n) const {
    ::memset(buffer, c, n);
  }

  template <typename T>
  EIGEN_STRONG_INLINE void fill(T* begin, T* end, const T& value) const {
    std::fill(begin, end, value);
  }

  EIGEN_STRONG_INLINE int numThreads() const {
    return num_threads_;
  }

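  // Number of threads available in the underlying thread pool; this can differ
  // from the value returned by numThreads().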
  EIGEN_STRONG_INLINE int numThreadsInPool() const {
    return pool_->NumThreads();
  }

  EIGEN_STRONG_INLINE size_t firstLevelCacheSize() const {
    return l1CacheSize();
  }

  EIGEN_STRONG_INLINE size_t lastLevelCacheSize() const {
    // The L3 cache is shared between all the cores.
    return l3CacheSize() / num_threads_;
  }

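  // Schedules f(args...) on the pool and returns a heap-allocated Notification
  // that is notified once the task has run; the device does not own the
  // returned Notification.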
  template <class Function, class... Args>
  EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const {
    Notification* n = new Notification();
    pool_->Schedule(
        std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n,
                  std::move(f), args...));
    return n;
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b, Function&& f, Args&&... args) const {
    pool_->Schedule(
        std::bind(&FunctionWrapperWithBarrier<Function, Args...>::run, b,
                  std::move(f), args...));
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const {
    if (sizeof...(args) > 0) {
      pool_->Schedule(std::bind(std::move(f), args...));
    } else {
      pool_->Schedule(std::move(f));
    }
  }

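  // Returns a logical thread index between 0 and pool_->NumThreads() - 1 if
  // called from one of the threads in pool_, and -1 otherwise.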
  EIGEN_STRONG_INLINE int currentThreadId() const {
    return pool_->CurrentThreadId();
  }

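  // parallelFor splits [0, n) into disjoint half-open ranges [firstIdx, lastIdx),
  // evaluates f on them in parallel, and waits for completion. The block size is
  // chosen from the per-iteration cost and the resulting parallel efficiency; if
  // block_align is not nullptr it is called to round the block size up.
  //
  // Usage sketch (illustrative; the cost figures are arbitrary):
  //
  //   std::vector<float> data(1 << 20, 1.0f);
  //   Eigen::TensorOpCost cost(/*bytes_loaded=*/sizeof(float),
  //                            /*bytes_stored=*/sizeof(float),
  //                            /*compute_cycles=*/1);
  //   device.parallelFor(data.size(), cost,
  //                      [&](Eigen::Index first, Eigen::Index last) {
  //                        for (Eigen::Index i = first; i < last; ++i) data[i] *= 2.0f;
  //                      });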
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<Index(Index)> block_align,
                   std::function<void(Index, Index)> f) const {
    if (EIGEN_PREDICT_FALSE(n <= 0)) {
      return;
    } else if (n == 1 || numThreads() == 1 ||
               CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      // Compute small problems directly in the caller thread.
      f(0, n);
      return;
    }

    // Compute the block size and the total number of blocks.
    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);

    // Recursively split [0, n) into halves until the pieces reach block.size,
    // which yields exactly block.count leaves that do the actual computation.
    Barrier barrier(static_cast<unsigned int>(block.count));
    std::function<void(Index, Index)> handleRange;
    handleRange = [=, &handleRange, &barrier, &f](Index firstIdx, Index lastIdx) {
      while (lastIdx - firstIdx > block.size) {
        // Split into halves and schedule the second half on another thread.
        const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
        pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
        lastIdx = midIdx;
      }
      // Single block or less: execute it directly.
      f(firstIdx, lastIdx);
      barrier.Notify();
    };

    if (block.count <= numThreads()) {
      // Avoid a thread hop by running the root of the recursion tree (and one
      // block) on the calling thread.
      handleRange(0, n);
    } else {
      // Execute the root in the pool to avoid running work on more than
      // numThreads() threads.
      pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
    }

    barrier.Wait();
  }

  // Convenience wrapper for parallelFor that does not align blocks.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<void(Index, Index)> f) const {
    parallelFor(n, cost, nullptr, std::move(f));
  }

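  // parallelForAsync evaluates f over [0, n) in parallel without waiting for
  // completion; the 'done' callback is invoked after the last block finishes.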
  void parallelForAsync(Index n, const TensorOpCost& cost,
                        std::function<Index(Index)> block_align,
                        std::function<void(Index, Index)> f,
                        std::function<void()> done) const {
    // Compute small problems directly in the caller thread.
    if (n <= 1 || numThreads() == 1 ||
        CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      done();
      return;
    }

    // Compute the block size and the total number of blocks.
    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);

    ParallelForAsyncContext* const ctx =
        new ParallelForAsyncContext(block.count, std::move(f), std::move(done));

    // Recursively split [0, n) into halves until the pieces reach block.size,
    // which yields exactly block.count leaves that do the actual computation.
    ctx->handle_range = [this, ctx, block](Index firstIdx, Index lastIdx) {
      while (lastIdx - firstIdx > block.size) {
        // Split into halves and schedule the second half on another thread.
        const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
        pool_->Schedule(
            [ctx, midIdx, lastIdx]() { ctx->handle_range(midIdx, lastIdx); });
        lastIdx = midIdx;
      }

      // Single block or less: execute it directly.
      ctx->f(firstIdx, lastIdx);

      // Delete the async context if this was the last block.
      if (ctx->count.fetch_sub(1) == 1) delete ctx;
    };

    if (block.count <= numThreads()) {
      // Avoid a thread hop by running the root of the recursion tree (and one
      // block) on the calling thread.
      ctx->handle_range(0, n);
    } else {
      // Execute the root in the pool to avoid running work on more than
      // numThreads() threads.
      pool_->Schedule([ctx, n]() { ctx->handle_range(0, n); });
    }
  }

  // Convenience wrapper for parallelForAsync that does not align blocks.
  void parallelForAsync(Index n, const TensorOpCost& cost,
                        std::function<void(Index, Index)> f,
                        std::function<void()> done) const {
    parallelForAsync(n, cost, nullptr, std::move(f), std::move(done));
  }

  // Thread pool accessor.
  ThreadPoolInterface* getPool() const { return pool_; }

  // Allocator accessor.
  Allocator* allocator() const { return allocator_; }

 private:
  typedef TensorCostModel<ThreadPoolDevice> CostModel;

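  // For parallelForAsync the passed-in closures must live on the heap; the
  // context runs 'done' and deletes itself only after the last block finishes.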
  struct ParallelForAsyncContext {
    ParallelForAsyncContext(Index block_count,
                            std::function<void(Index, Index)> block_f,
                            std::function<void()> done_callback)
        : count(block_count),
          f(std::move(block_f)),
          done(std::move(done_callback)) {}
    ~ParallelForAsyncContext() { done(); }

    std::atomic<Index> count;
    std::function<void(Index, Index)> f;
    std::function<void()> done;

    std::function<void(Index, Index)> handle_range;
  };

  struct ParallelForBlock {
    Index size;   // block size
    Index count;  // number of blocks
  };

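  // Calculates the block size from (1) the per-iteration cost and (2) the
  // resulting parallel efficiency: blocks should not be too small (to amortize
  // parallelization overhead) nor too large (to limit tail effects and load
  // imbalance), and the block count should divide evenly across threads.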
  ParallelForBlock CalculateParallelForBlock(const Index n, const TensorOpCost& cost,
                                             std::function<Index(Index)> block_align) const {
    const double block_size_f = 1.0 / CostModel::taskSize(1, cost);
    const Index max_oversharding_factor = 4;
    Index block_size = numext::mini(
        n, numext::maxi<Index>(
               divup<Index>(n, max_oversharding_factor * numThreads()),
               block_size_f));
    const Index max_block_size = numext::mini(n, 2 * block_size);

    if (block_align) {
      Index new_block_size = block_align(block_size);
      eigen_assert(new_block_size >= block_size);
      block_size = numext::mini(n, new_block_size);
    }

    Index block_count = divup(n, block_size);

    // Calculate parallel efficiency as the fraction of total CPU time spent on
    // actual computation:
    double max_efficiency =
        static_cast<double>(block_count) /
        (divup<int>(block_count, numThreads()) * numThreads());

    // Now try to increase the block size up to max_block_size as long as this
    // does not decrease parallel efficiency.
    for (Index prev_block_count = block_count;
         max_efficiency < 1.0 && prev_block_count > 1;) {
      // The next block size that divides the range into fewer blocks than the
      // current one.
      Index coarser_block_size = divup(n, prev_block_count - 1);
      if (block_align) {
        Index new_block_size = block_align(coarser_block_size);
        eigen_assert(new_block_size >= coarser_block_size);
        coarser_block_size = numext::mini(n, new_block_size);
      }
      if (coarser_block_size > max_block_size) {
        break;  // Reached the maximum block size. Stop.
      }
      // Recalculate parallel efficiency.
      const Index coarser_block_count = divup(n, coarser_block_size);
      prev_block_count = coarser_block_count;
      const double coarser_efficiency =
          static_cast<double>(coarser_block_count) /
          (divup<int>(coarser_block_count, numThreads()) * numThreads());
      if (coarser_efficiency + 0.01 >= max_efficiency) {
        // Take it: efficiency is at least as good (within 1%).
        block_size = coarser_block_size;
        block_count = coarser_block_count;
        if (max_efficiency < coarser_efficiency) {
          max_efficiency = coarser_efficiency;
        }
      }
    }

    return {block_size, block_count};
  }

  ThreadPoolInterface* pool_;
  int num_threads_;
  Allocator* allocator_;
};

}  // namespace Eigen

#endif  // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H