10 #ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
11 #define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
17 template <
typename Environment>
20 typedef typename Environment::Task
Task;
27 Environment env = Environment())
53 #ifndef EIGEN_THREAD_LOCAL
60 env_.CreateThread([
this,
i]() { WorkerLoop(i); }));
62 #ifndef EIGEN_THREAD_LOCAL
95 const auto& pair = partitions[
i];
96 unsigned start = pair.first,
end = pair.second;
108 int limit)
override {
109 Task t =
env_.CreateTask(std::move(fn));
111 if (pt->
pool ==
this) {
120 int num_queues = limit - start;
121 int rnd =
Rand(&pt->
rand) % num_queues;
145 #ifdef EIGEN_THREAD_ENV_SUPPORTS_CANCELLATION
159 if (pt->
pool ==
this) {
194 thread_data_[
i].steal_partition.store(val, std::memory_order_relaxed);
198 return thread_data_[
i].steal_partition.load(std::memory_order_relaxed);
202 for (
int i = 1;
i <= N;
i++) {
217 typedef typename Environment::EnvThread
Thread;
224 #ifndef EIGEN_THREAD_LOCAL
249 #ifndef EIGEN_THREAD_LOCAL
257 #ifndef EIGEN_THREAD_LOCAL
258 std::unique_ptr<PerThread> new_pt(
new PerThread());
277 const int spin_count =
288 for (
int i = 0;
i < spin_count && !t.f;
i++) {
289 if (!
cancelled_.load(std::memory_order_relaxed)) {
312 for (
int i = 0;
i < spin_count && !t.f;
i++) {
313 if (!
cancelled_.load(std::memory_order_relaxed)) {
340 const size_t size = limit - start;
349 for (
unsigned i = 0;
i <
size;
i++) {
356 if (victim >=
size) {
370 unsigned start, limit;
374 return Steal(start, limit);
441 unsigned victim = r %
size;
442 for (
unsigned i = 0;
i <
size;
i++) {
447 if (victim >=
size) {
455 return std::hash<std::thread::id>()(std::this_thread::get_id());
459 #ifndef EIGEN_THREAD_LOCAL
465 return it->second.get();
468 EIGEN_THREAD_LOCAL
PerThread per_thread_;
477 *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
479 return static_cast<unsigned>((current ^ (current >> 22)) >>
480 (22 + (current >> 61)));
#define eigen_plain_assert(condition)
#define EIGEN_UNUSED_VARIABLE(var)
void CommitWait(Waiter *w)
void Notify(bool notifyAll)
void push_back(const T &t)
void AssertBounds(int start, int end)
void Cancel() EIGEN_OVERRIDE
static uint64_t GlobalThreadIdHash()
bool WaitForWork(EventCount::Waiter *waiter, Task *t)
std::atomic< unsigned > blocked_
int CurrentThreadId() const EIGEN_FINAL
void Schedule(std::function< void()> fn) EIGEN_OVERRIDE
EIGEN_MUTEX per_thread_map_mutex_
void ComputeCoprimes(int N, MaxSizeVector< unsigned > *coprimes)
ThreadPoolTempl(int num_threads, bool allow_spinning, Environment env=Environment())
void ScheduleWithHint(std::function< void()> fn, int start, int limit) override
void SetStealPartition(size_t i, unsigned val)
static unsigned Rand(uint64_t *state)
MaxSizeVector< ThreadData > thread_data_
std::unordered_map< uint64_t, std::unique_ptr< PerThread > > per_thread_map_
static const int kMaxPartitionBits
Environment::EnvThread Thread
const bool allow_spinning_
static const int kMaxThreads
std::unique_ptr< Barrier > init_barrier_
int NumThreads() const EIGEN_FINAL
void DecodePartition(unsigned val, unsigned *start, unsigned *limit)
unsigned GetStealPartition(int i)
ThreadPoolTempl(int num_threads, Environment env=Environment())
unsigned EncodePartition(unsigned start, unsigned limit)
std::atomic< bool > spinning_
MaxSizeVector< MaxSizeVector< unsigned > > all_coprimes_
std::atomic< bool > cancelled_
PerThread * GetPerThread()
void SetStealPartitions(const std::vector< std::pair< unsigned, unsigned >> &partitions)
RunQueue< Task, 1024 > Queue
std::atomic< bool > done_
unsigned global_steal_partition_
MaxSizeVector< EventCount::Waiter > waiters_
void WorkerLoop(int thread_id)
Task Steal(unsigned start, unsigned limit)
static const lastp1_t end
ThreadPoolTempl< StlThreadEnvironment > ThreadPool
std::unique_ptr< Thread > thread
std::atomic< unsigned > steal_partition