Eigen::ThreadLocal< T, Initialize, Release > Class Template Reference

Classes

struct  ThreadIdAndValue
 

Public Member Functions

void ForEach (std::function< void(std::thread::id, T &)> f)
 
Tlocal ()
 
 ThreadLocal (int capacity)
 
 ThreadLocal (int capacity, Initialize initialize)
 
 ThreadLocal (int capacity, Initialize initialize, Release release)
 
 ~ThreadLocal ()
 

Private Member Functions

TSpilledLocal (std::thread::id this_thread)
 

Private Attributes

const int capacity_
 
MaxSizeVector< ThreadIdAndValuedata_
 
std::atomic< int > filled_records_
 
Initialize initialize_
 
EIGEN_MUTEX mu_
 
std::unordered_map< std::thread::id, Tper_thread_map_
 
MaxSizeVector< std::atomic< ThreadIdAndValue * > > ptr_
 
Release release_
 

Detailed Description

template<typename T, typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
class Eigen::ThreadLocal< T, Initialize, Release >

Definition at line 113 of file ThreadLocal.h.

Constructor & Destructor Documentation

◆ ThreadLocal() [1/3]

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
Eigen::ThreadLocal< T, Initialize, Release >::ThreadLocal ( int  capacity)
inlineexplicit

Definition at line 119 of file ThreadLocal.h.

120  : ThreadLocal(capacity, internal::ThreadLocalNoOpInitialize<T>(),
121  internal::ThreadLocalNoOpRelease<T>()) {}
ThreadLocal(int capacity)
Definition: ThreadLocal.h:119

◆ ThreadLocal() [2/3]

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
Eigen::ThreadLocal< T, Initialize, Release >::ThreadLocal ( int  capacity,
Initialize  initialize 
)
inline

Definition at line 123 of file ThreadLocal.h.

124  : ThreadLocal(capacity, std::move(initialize),
125  internal::ThreadLocalNoOpRelease<T>()) {}

◆ ThreadLocal() [3/3]

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
Eigen::ThreadLocal< T, Initialize, Release >::ThreadLocal ( int  capacity,
Initialize  initialize,
Release  release 
)
inline

Definition at line 127 of file ThreadLocal.h.

128  : initialize_(std::move(initialize)),
129  release_(std::move(release)),
130  capacity_(capacity),
131  data_(capacity_),
132  ptr_(capacity_),
133  filled_records_(0) {
134  eigen_assert(capacity_ >= 0);
135  data_.resize(capacity_);
136  for (int i = 0; i < capacity_; ++i) {
137  ptr_.emplace_back(nullptr);
138  }
139  }
#define eigen_assert(x)
Definition: Macros.h:902
Initialize initialize_
Definition: ThreadLocal.h:275
MaxSizeVector< ThreadIdAndValue > data_
Definition: ThreadLocal.h:281
MaxSizeVector< std::atomic< ThreadIdAndValue * > > ptr_
Definition: ThreadLocal.h:285
std::atomic< int > filled_records_
Definition: ThreadLocal.h:288
const int capacity_
Definition: ThreadLocal.h:277

◆ ~ThreadLocal()

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
Eigen::ThreadLocal< T, Initialize, Release >::~ThreadLocal ( )
inline

Definition at line 235 of file ThreadLocal.h.

235  {
236  // Reading directly from `data_` is unsafe, because only CAS to the record
237  // in `ptr_` makes all changes visible to other threads.
238  for (auto& ptr : ptr_) {
239  ThreadIdAndValue* record = ptr.load();
240  if (record == nullptr) continue;
241  release_(record->value);
242  }
243 
244  // We did not spill into the map based storage.
245  if (filled_records_.load(std::memory_order_relaxed) < capacity_) return;
246 
247  // Adds a happens before edge from the last call to SpilledLocal().
248  EIGEN_MUTEX_LOCK lock(mu_);
249  for (auto& kv : per_thread_map_) {
250  release_(kv.second);
251  }
252  }
#define EIGEN_MUTEX_LOCK
Definition: ThreadPool:58
EIGEN_MUTEX mu_
Definition: ThreadLocal.h:293
std::unordered_map< std::thread::id, T > per_thread_map_
Definition: ThreadLocal.h:294

Member Function Documentation

◆ ForEach()

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
void Eigen::ThreadLocal< T, Initialize, Release >::ForEach ( std::function< void(std::thread::id, T &)>  f)
inline

Definition at line 215 of file ThreadLocal.h.

215  {
216  // Reading directly from `data_` is unsafe, because only CAS to the
217  // record in `ptr_` makes all changes visible to other threads.
218  for (auto& ptr : ptr_) {
219  ThreadIdAndValue* record = ptr.load();
220  if (record == nullptr) continue;
221  f(record->thread_id, record->value);
222  }
223 
224  // We did not spill into the map based storage.
225  if (filled_records_.load(std::memory_order_relaxed) < capacity_) return;
226 
227  // Adds a happens before edge from the last call to SpilledLocal().
228  EIGEN_MUTEX_LOCK lock(mu_);
229  for (auto& kv : per_thread_map_) {
230  f(kv.first, kv.second);
231  }
232  }

◆ local()

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
T& Eigen::ThreadLocal< T, Initialize, Release >::local ( )
inline

Definition at line 141 of file ThreadLocal.h.

141  {
142  std::thread::id this_thread = std::this_thread::get_id();
143  if (capacity_ == 0) return SpilledLocal(this_thread);
144 
145  std::size_t h = std::hash<std::thread::id>()(this_thread);
146  const int start_idx = h % capacity_;
147 
148  // NOTE: From the definition of `std::this_thread::get_id()` it is
149  // guaranteed that we never can have concurrent insertions with the same key
150  // to our hash-map like data structure. If we didn't find an element during
151  // the initial traversal, it's guaranteed that no one else could have
152  // inserted it while we are in this function. This allows to massively
153  // simplify out lock-free insert-only hash map.
154 
155  // Check if we already have an element for `this_thread`.
156  int idx = start_idx;
157  while (ptr_[idx].load() != nullptr) {
158  ThreadIdAndValue& record = *(ptr_[idx].load());
159  if (record.thread_id == this_thread) return record.value;
160 
161  idx += 1;
162  if (idx >= capacity_) idx -= capacity_;
163  if (idx == start_idx) break;
164  }
165 
166  // If we are here, it means that we found an insertion point in lookup
167  // table at `idx`, or we did a full traversal and table is full.
168 
169  // If lock-free storage is full, fallback on mutex.
170  if (filled_records_.load() >= capacity_) return SpilledLocal(this_thread);
171 
172  // We double check that we still have space to insert an element into a lock
173  // free storage. If old value in `filled_records_` is larger than the
174  // records capacity, it means that some other thread added an element while
175  // we were traversing lookup table.
176  int insertion_index =
177  filled_records_.fetch_add(1, std::memory_order_relaxed);
178  if (insertion_index >= capacity_) return SpilledLocal(this_thread);
179 
180  // At this point it's guaranteed that we can access to
181  // data_[insertion_index_] without a data race.
182  data_[insertion_index].thread_id = this_thread;
183  initialize_(data_[insertion_index].value);
184 
185  // That's the pointer we'll put into the lookup table.
186  ThreadIdAndValue* inserted = &data_[insertion_index];
187 
188  // We'll use nullptr pointer to ThreadIdAndValue in a compare-and-swap loop.
189  ThreadIdAndValue* empty = nullptr;
190 
191  // Now we have to find an insertion point into the lookup table. We start
192  // from the `idx` that was identified as an insertion point above, it's
193  // guaranteed that we will have an empty record somewhere in a lookup table
194  // (because we created a record in the `data_`).
195  const int insertion_idx = idx;
196 
197  do {
198  // Always start search from the original insertion candidate.
199  idx = insertion_idx;
200  while (ptr_[idx].load() != nullptr) {
201  idx += 1;
202  if (idx >= capacity_) idx -= capacity_;
203  // If we did a full loop, it means that we don't have any free entries
204  // in the lookup table, and this means that something is terribly wrong.
205  eigen_assert(idx != insertion_idx);
206  }
207  // Atomic CAS of the pointer guarantees that any other thread, that will
208  // follow this pointer will see all the mutations in the `data_`.
209  } while (!ptr_[idx].compare_exchange_weak(empty, inserted));
210 
211  return inserted->value;
212  }
T & SpilledLocal(std::thread::id this_thread)
Definition: ThreadLocal.h:261

◆ SpilledLocal()

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
T& Eigen::ThreadLocal< T, Initialize, Release >::SpilledLocal ( std::thread::id  this_thread)
inlineprivate

Definition at line 261 of file ThreadLocal.h.

261  {
262  EIGEN_MUTEX_LOCK lock(mu_);
263 
264  auto it = per_thread_map_.find(this_thread);
265  if (it == per_thread_map_.end()) {
266  auto result = per_thread_map_.emplace(this_thread, T());
267  eigen_assert(result.second);
268  initialize_((*result.first).second);
269  return (*result.first).second;
270  } else {
271  return it->second;
272  }
273  }
Eigen::Triplet< double > T

Member Data Documentation

◆ capacity_

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
const int Eigen::ThreadLocal< T, Initialize, Release >::capacity_
private

Definition at line 277 of file ThreadLocal.h.

◆ data_

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
MaxSizeVector<ThreadIdAndValue> Eigen::ThreadLocal< T, Initialize, Release >::data_
private

Definition at line 281 of file ThreadLocal.h.

◆ filled_records_

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
std::atomic<int> Eigen::ThreadLocal< T, Initialize, Release >::filled_records_
private

Definition at line 288 of file ThreadLocal.h.

◆ initialize_

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
Initialize Eigen::ThreadLocal< T, Initialize, Release >::initialize_
private

Definition at line 275 of file ThreadLocal.h.

◆ mu_

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
EIGEN_MUTEX Eigen::ThreadLocal< T, Initialize, Release >::mu_
private

Definition at line 293 of file ThreadLocal.h.

◆ per_thread_map_

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
std::unordered_map<std::thread::id, T> Eigen::ThreadLocal< T, Initialize, Release >::per_thread_map_
private

Definition at line 294 of file ThreadLocal.h.

◆ ptr_

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
MaxSizeVector<std::atomic<ThreadIdAndValue*> > Eigen::ThreadLocal< T, Initialize, Release >::ptr_
private

Definition at line 285 of file ThreadLocal.h.

◆ release_

template<typename T , typename Initialize = internal::ThreadLocalNoOpInitialize<T>, typename Release = internal::ThreadLocalNoOpRelease<T>>
Release Eigen::ThreadLocal< T, Initialize, Release >::release_
private

Definition at line 276 of file ThreadLocal.h.


The documentation for this class was generated from the following file: