RunQueue.h
Go to the documentation of this file.
1 // This file is part of Eigen, a lightweight C++ template library
2 // for linear algebra.
3 //
4 // Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
5 //
6 // This Source Code Form is subject to the terms of the Mozilla
7 // Public License v. 2.0. If a copy of the MPL was not distributed
8 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
9 
10 #ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H
11 #define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H
12 
13 #include "./InternalHeaderCheck.h"
14 
15 namespace Eigen {
16 
17 // RunQueue is a fixed-size, partially non-blocking deque of Work items.
18 // Operations on front of the queue must be done by a single thread (owner),
19 // operations on back of the queue can be done by multiple threads concurrently.
20 //
21 // Algorithm outline:
22 // All remote threads operating on the queue back are serialized by a mutex.
23 // This ensures that at most two threads access state: owner and one remote
24 // thread (Size aside). The algorithm ensures that the occupied region of the
25 // underlying array is logically continuous (can wraparound, but no stray
26 // occupied elements). Owner operates on one end of this region, remote thread
27 // operates on the other end. Synchronization between these threads
28 // (potential consumption of the last element and take up of the last empty
29 // element) happens by means of state variable in each element. States are:
30 // empty, busy (in process of insertion or removal) and ready. Threads claim
31 // elements (empty->busy and ready->busy transitions) by means of a CAS
32 // operation. The finishing transition (busy->empty and busy->ready) are done
33 // with plain store as the element is exclusively owned by the current thread.
34 //
35 // Note: we could permit only pointers as elements, then we would not need
36 // separate state variable as null/non-null pointer value would serve as state,
37 // but that would require malloc/free per operation for large, complex values
38 // (and this is designed to store std::function<()>).
template <typename Work, unsigned kSize>
class RunQueue {
 public:
  // Creates an empty queue: both positions start at 0 and every slot is
  // marked kEmpty.
  RunQueue() : front_(0), back_(0) {
    // require power-of-two for fast masking
    eigen_plain_assert((kSize & (kSize - 1)) == 0);
    eigen_plain_assert(kSize > 2);            // why would you do this?
    eigen_plain_assert(kSize <= (64 << 10));  // leave enough space for counter
    for (unsigned i = 0; i < kSize; i++)
      array_[i].state.store(kEmpty, std::memory_order_relaxed);
  }

  // PushFront inserts w at the beginning of the queue.
  // If queue is full returns w, otherwise returns default-constructed Work.
  // Does not take mutex_: front operations must come from the owner thread.
  Work PushFront(Work w) {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[front & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    // Claim the slot (empty -> busy); a non-empty slot means the queue is full.
    if (s != kEmpty ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return w;
    // Advance the rolling index and bump the modification counter that lives
    // in the bits above kMask2.
    front_.store(front + 1 + (kSize << 1), std::memory_order_relaxed);
    e->w = std::move(w);
    // Publish the element (busy -> ready).
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopFront removes and returns the first element in the queue.
  // If the queue was empty returns default-constructed Work.
  // Does not take mutex_: front operations must come from the owner thread.
  Work PopFront() {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[(front - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    // Claim the slot (ready -> busy); anything else means nothing to pop.
    if (s != kReady ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
    // Step the rolling index back by one while keeping the counter bits.
    front = ((front - 1) & kMask2) | (front & ~kMask2);
    front_.store(front, std::memory_order_relaxed);
    return w;
  }

  // PushBack adds w at the end of the queue.
  // If queue is full returns w, otherwise returns default-constructed Work.
  Work PushBack(Work w) {
    // Back operations may come from several threads concurrently; they are
    // serialized by mutex_ exactly like PopBack/PopBackHalf.
    EIGEN_MUTEX_LOCK lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[(back - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    // Claim the slot (empty -> busy); a non-empty slot means the queue is full.
    if (s != kEmpty ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return w;
    // Step the rolling index back by one while keeping the counter bits.
    back = ((back - 1) & kMask2) | (back & ~kMask2);
    back_.store(back, std::memory_order_relaxed);
    e->w = std::move(w);
    // Publish the element (busy -> ready).
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopBack removes and returns the last element in the queue.
  // If the queue was empty returns default-constructed Work.
  Work PopBack() {
    // Cheap emptiness check before taking the lock.
    if (Empty()) return Work();
    EIGEN_MUTEX_LOCK lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[back & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    // Claim the slot (ready -> busy); anything else means nothing to pop.
    if (s != kReady ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
    // Advance the rolling index and bump the modification counter.
    back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
    return w;
  }

  // PopBackHalf removes about half of the elements (at most
  // (Size() - 1) / 2 + 1) from the back of the queue and appends them to
  // *result, starting with the one farthest from the back.
  // Returns number of elements removed.
  unsigned PopBackHalf(std::vector<Work>* result) {
    // Cheap emptiness check before taking the lock.
    if (Empty()) return 0;
    EIGEN_MUTEX_LOCK lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    unsigned size = Size();
    unsigned mid = back;
    if (size > 1) mid = back + (size - 1) / 2;
    unsigned n = 0;
    unsigned start = 0;
    // Walk from mid down to back; the signed difference ends the loop once
    // mid has moved below back, including across unsigned wraparound.
    for (; static_cast<int>(mid - back) >= 0; mid--) {
      Elem* e = &array_[mid & kMask];
      uint8_t s = e->state.load(std::memory_order_relaxed);
      if (n == 0) {
        // The first element must be claimed with a CAS; if that fails, try
        // the next slot closer to back.
        if (s != kReady || !e->state.compare_exchange_strong(
                               s, kBusy, std::memory_order_acquire))
          continue;
        start = mid;
      } else {
        // Note: no need to store temporal kBusy, we exclusively own these
        // elements.
        // Debug-only check of that invariant.
        eigen_plain_assert(s == kReady);
      }
      result->push_back(std::move(e->w));
      e->state.store(kEmpty, std::memory_order_release);
      n++;
    }
    // The new back is just past the first element taken; also bump the
    // modification counter.
    if (n != 0)
      back_.store(start + 1 + (kSize << 1), std::memory_order_relaxed);
    return n;
  }

  // Size returns current queue size.
  // Can be called by any thread at any time.
  unsigned Size() const { return SizeOrNotEmpty<true>(); }

  // Empty tests whether container is empty.
  // Can be called by any thread at any time.
  bool Empty() const { return SizeOrNotEmpty<false>() == 0; }

  // Delete all the elements from the queue.
  // Pops from the front, so it is subject to the same single-owner rule as
  // PopFront.
  void Flush() {
    while (!Empty()) {
      PopFront();
    }
  }

 private:
  // Mask selecting the slot index inside array_.
  static const unsigned kMask = kSize - 1;
  // Mask selecting the rolling index (one bit wider than kMask).
  static const unsigned kMask2 = (kSize << 1) - 1;
  // One queue slot: the stored item plus its synchronization state.
  struct Elem {
    std::atomic<uint8_t> state;
    Work w;
  };
  // Slot states; see the transitions described in the class comment.
  enum {
    kEmpty,
    kBusy,
    kReady,
  };
  // Serializes all operations on the back of the queue.
  EIGEN_MUTEX mutex_;
  // Low log(kSize) + 1 bits in front_ and back_ contain rolling index of
  // front/back, respectively. The remaining bits contain modification counters
  // that are incremented whenever the index advances (PushFront for front_,
  // PopBack/PopBackHalf for back_). This allows us to (1) distinguish
  // between empty and full conditions (if we would use log(kSize) bits for
  // position, these conditions would be indistinguishable); (2) obtain
  // consistent snapshot of front_/back_ for Size operation using the
  // modification counters.
  std::atomic<unsigned> front_;
  std::atomic<unsigned> back_;
  Elem array_[kSize];

  // SizeOrNotEmpty returns current queue size; if NeedSizeEstimate is false,
  // only whether the size is 0 is guaranteed to be correct.
  // Can be called by any thread at any time.
  template <bool NeedSizeEstimate>
  unsigned SizeOrNotEmpty() const {
    // Emptiness plays critical role in thread pool blocking. So we go to great
    // effort to not produce false positives (claim non-empty queue as empty).
    unsigned front = front_.load(std::memory_order_acquire);
    for (;;) {
      // Capture a consistent snapshot of front/back: retry until front_ reads
      // the same before and after loading back_.
      unsigned back = back_.load(std::memory_order_acquire);
      unsigned front1 = front_.load(std::memory_order_relaxed);
      if (front != front1) {
        front = front1;
        std::atomic_thread_fence(std::memory_order_acquire);
        continue;
      }
      if (NeedSizeEstimate) {
        return CalculateSize(front, back);
      } else {
        // This value will be 0 if the queue is empty, and undefined otherwise.
        unsigned maybe_zero = ((front ^ back) & kMask2);
        // Queue size estimate must agree with maybe zero check on the queue
        // empty/non-empty state.
        eigen_assert((CalculateSize(front, back) == 0) == (maybe_zero == 0));
        return maybe_zero;
      }
    }
  }

  // Computes the element count from a front_/back_ snapshot, clamped to
  // [0, kSize].
  EIGEN_ALWAYS_INLINE unsigned CalculateSize(unsigned front,
                                             unsigned back) const {
    int size = (front & kMask2) - (back & kMask2);
    // Fix overflow.
    if (size < 0) size += 2 * kSize;
    // Order of modification in push/pop is crafted to make the queue look
    // larger than it is during concurrent modifications. E.g. push can
    // increment size before the corresponding pop has decremented it.
    // So the computed size can be up to kSize + 1, fix it.
    if (size > static_cast<int>(kSize)) size = kSize;
    return static_cast<unsigned>(size);
  }

  RunQueue(const RunQueue&) = delete;
  void operator=(const RunQueue&) = delete;
};
235 
236 } // namespace Eigen
237 
238 #endif // EIGEN_CXX11_THREADPOOL_RUNQUEUE_H
#define eigen_plain_assert(condition)
Definition: Assert.h:156
int n
Array< double, 1, 3 > e(1./3., 0.5, 2.)
#define EIGEN_ALWAYS_INLINE
Definition: Macros.h:836
#define eigen_assert(x)
Definition: Macros.h:902
RowVector3d w
#define EIGEN_MUTEX
Definition: ThreadPool:55
#define EIGEN_MUTEX_LOCK
Definition: ThreadPool:58
void operator=(const RunQueue &)=delete
Work PushBack(Work w)
Definition: RunQueue.h:86
unsigned SizeOrNotEmpty() const
Definition: RunQueue.h:193
std::atomic< unsigned > front_
Definition: RunQueue.h:185
RunQueue(const RunQueue &)=delete
std::atomic< unsigned > back_
Definition: RunQueue.h:186
unsigned PopBackHalf(std::vector< Work > *result)
Definition: RunQueue.h:119
Work PopBack()
Definition: RunQueue.h:102
EIGEN_MUTEX mutex_
Definition: RunQueue.h:177
Work PopFront()
Definition: RunQueue.h:70
EIGEN_ALWAYS_INLINE unsigned CalculateSize(unsigned front, unsigned back) const
Definition: RunQueue.h:220
void Flush()
Definition: RunQueue.h:159
Work PushFront(Work w)
Definition: RunQueue.h:55
unsigned Size() const
Definition: RunQueue.h:152
static const unsigned kMask2
Definition: RunQueue.h:167
bool Empty() const
Definition: RunQueue.h:156
Elem array_[kSize]
Definition: RunQueue.h:187
static const unsigned kMask
Definition: RunQueue.h:166
std::uint8_t uint8_t
Definition: Meta.h:35
: InteropHeaders
Definition: Core:139
std::atomic< uint8_t > state
Definition: RunQueue.h:169