mpmc_node_queue.h
#pragma once
#include "../C/Baselib_Memory.h"
#include "../C/Baselib_Atomic_LLSC.h"
#include "mpmc_node.h"
namespace baselib
{
BASELIB_CPP_INTERFACE
{
// In computer science, a queue is a collection in which the entities in the collection are kept in order and the principal (or only) operations on the
// collection are the addition of entities to the rear terminal position, known as enqueue, and removal of entities from the front terminal position, known
// as dequeue. This makes the queue a First-In-First-Out (FIFO) data structure. In a FIFO data structure, the first element added to the queue will be the
// first one to be removed. This is equivalent to the requirement that once a new element is added, all elements that were added before have to be removed
// before the new element can be removed. Often a peek or front operation is also entered, returning the value of the front element without dequeuing it.
// A queue is an example of a linear data structure, or more abstractly a sequential collection.
//
// "Queue (abstract data type)", Wikipedia: The Free Encyclopedia
// https://en.wikipedia.org/w/index.php?title=Queue_(abstract_data_type)&oldid=878671332
//
// This implementation is a lockless node queue capable of handling multiple concurrent producers and consumers
//
// Node types are required to inherit the mpmc_node class. No data from the inherited class is modified/copied, so no restrictions apply.
// The node memory is allocated and destroyed by the user (user owned).
// Dequeued nodes may be overwritten/discarded and/or reused.
// Dequeued nodes may not be deleted (released from user space memory) while any consumer thread is in the scope of a dequeue call.
//
// Notes on consumer threads:
// While dequeued nodes may be reused and/or overwritten, they must remain in application readable memory (user space memory) until it can be
// guaranteed that no consumer thread is still processing the node, i.e. no thread is within the scope of a dequeue call.
// Even though the value is ignored (discarded by version check), any consumer thread may still read the node link information.
// Consumer threads concurrently attempt to dequeue the front node in a DCAS loop; the first to succeed updates the queue front and the other
// threads continue processing the next front node in the queue. Threads are guaranteed to progress dequeuing nodes even if another consumer
// thread falls asleep during a dequeue, but a dequeue may fail in the combination of the queue getting emptied and the thread resetting the
// state (reloading the back) falling asleep while swapping the back (between the two consecutive CAS operations).
// This is an extremely infrequent occurrence due to the combination required (it can not happen unless there is exactly one item in the queue).
// Producer threads always progress independently.
//
// Notes on producer threads:
// A producer thread swaps the back and writes the link information in two consecutive atomic operations. If a producer thread falls asleep after the
// swap and before the link information has been written, the consumer thread(s) will not advance past this point since the link information
// is not yet available. Therefore calls from consumer threads will yield null until that particular producer thread wakes back up.
//
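// Usage example (an illustrative sketch, not part of this header): a minimal node type deriving from baselib::mpmc_node and a single
// producer/consumer round trip. The "Job" type and its "payload" member are hypothetical names used for illustration only.
//
//     struct Job : baselib::mpmc_node
//     {
//         int payload;
//     };
//
//     baselib::mpmc_node_queue<Job> queue;
//     Job job;                              // node memory is user owned
//     job.payload = 42;
//     queue.push_back(&job);                // producer side
//     Job* front = queue.try_pop_front();   // consumer side; may be null while a push is still in progress
//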
template<typename T>
class alignas(sizeof(intptr_t) * 2) mpmc_node_queue
{
public:
// Create a new queue instance.
mpmc_node_queue()
{
m_FrontIntPtr = 1;
m_Front.obj.idx = 1;
m_Back.obj = 0;
atomic_thread_fence(memory_order_seq_cst);
}
// Returns true if queue is empty.
bool empty() const
{
return m_Back.load(memory_order_relaxed) == 0;
}
// Push a node to the back of the queue.
void push_back(T* node)
{
node->next.store(0, memory_order_relaxed);
if (T* prev = m_Back.exchange(node, memory_order_release))
{
prev->next.store(node, memory_order_release);
}
else
{
// store the new front (reload) and add one which will put idx back to an
// even number, releasing the consumer threads (ptr is always null and idx odd at this point).
if (PLATFORM_LLSC_NATIVE_SUPPORT)
{
m_FrontPair.ptr.store(node, memory_order_release);
}
else
{
m_FrontPair.ptr.store(node, memory_order_relaxed);
m_FrontPair.idx.fetch_add(1, memory_order_release);
}
}
}
// Push a linked list of nodes to the back of the queue.
void push_back(T* first_node, T* last_node)
{
last_node->next.store(0, memory_order_relaxed);
if (T* prev = m_Back.exchange(last_node, memory_order_release))
{
prev->next.store(first_node, memory_order_release);
}
else
{
if (PLATFORM_LLSC_NATIVE_SUPPORT)
{
m_FrontPair.ptr.store(first_node, memory_order_release);
}
else
{
m_FrontPair.ptr.store(first_node, memory_order_relaxed);
m_FrontPair.idx.fetch_add(1, memory_order_release);
}
}
}
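// Illustrative sketch for the batch overload above (same hypothetical "Job" node type as in the usage example near the top of this file):
// the caller links the intermediate nodes through their next pointers, then passes the first and last node of the chain. The memory order
// shown follows the conventions of this file and is an assumption on the caller side.
//
//     Job a, b, c;
//     a.next.store(&b, memory_order_relaxed);   // link a -> b
//     b.next.store(&c, memory_order_relaxed);   // link b -> c (the queue itself terminates c)
//     queue.push_back(&a, &c);                  // enqueues a, b, c in that order
//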
// Try to pop frontmost node of the queue.
//
// Note that if null is returned, there may still be push operations in progress in a producer thread.
// Use the "empty" function to check if a queue is empty.
//
// \returns front node of the queue or null.
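//
// Illustrative consumer sketch (hypothetical "Job" node type and "process" function): a null result does not imply the queue is empty, it
// may also mean a producer has not finished publishing its node yet, so an exhaustive drain checks "empty" as well.
//
//     while (!queue.empty())
//     {
//         if (Job* job = queue.try_pop_front())
//             process(*job);
//         // else: a push is still in progress in a producer thread; retry (optionally yielding)
//     }
//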
T* try_pop_front()
{
T* node, *next;
if (PLATFORM_LLSC_NATIVE_SUPPORT)
{
intptr_t value;
Baselib_atomic_llsc_ptr_acquire_release_v(&m_Front, &node, &next,
{
// If front bit 0 is set, queue back is being reloaded or queue is empty.
value = reinterpret_cast<intptr_t>(node);
if (value & 1)
{
Baselib_atomic_llsc_break();
return 0;
}
// Fetch next node. If zero, node is the current back node. LLSC Monitor is internally cleared by subsequent cmpxchg.
if (!(next = static_cast<T*>(node->next.obj)))
goto BackNode;
});
return node;
BackNode:
// - filters obsolete nodes
// - Exclusive access (re-entrant block)
T * front = node;
if (!m_FrontPair.ptr.compare_exchange_strong(front, reinterpret_cast<T*>(value | 1), memory_order_acquire, memory_order_relaxed))
return 0;
// - filters incomplete nodes
// - check if node is back == retrigger new back
if (!m_Back.compare_exchange_strong(front, 0, memory_order_acquire, memory_order_relaxed))
{
// Back progressed or node is incomplete, restore access and return 0
m_FrontIntPtr.fetch_and(~1, memory_order_release);
return 0;
}
// Success, back == front node, back was set to zero above and index / access is restored by producers, so we return the back node.
// LLSC monitors invalidates any obsolete nodes still in process in other threads.
return node;
}
else
{
SequencedFrontPtr front, value;
// Get front node. The DCAS while operation will update front on retry
front = m_Front.load(memory_order_acquire);
do
{
// If front idx bit 0 is set, queue back is being reloaded or queue is empty.
if (front.idx & 1)
return 0;
// Fetch next node. If zero, node is the current back node
node = front.ptr;
if (!(next = static_cast<T*>(node->next.load(memory_order_relaxed))))
goto BackNodeDCAS;
// On success, replace the current with the next node and return node. On fail, retry with updated front.
value.ptr = next;
value.idx = front.idx + 2;
}
while (!m_Front.compare_exchange_strong(front, value, memory_order_acquire, memory_order_relaxed));
return node;
BackNodeDCAS:
// - filters obsolete nodes
// - Exclusive access (re-entrant block)
value.ptr = front.ptr;
value.idx = front.idx | 1;
if (!m_Front.compare_exchange_strong(front, value, memory_order_acquire, memory_order_relaxed))
return 0;
// - filters incomplete nodes
// - check if node is back == retrigger new back
value.ptr = node;
if (!m_Back.compare_exchange_strong(value.ptr, 0, memory_order_acquire, memory_order_relaxed))
{
// Back progressed or node is incomplete, restore access and return 0
m_FrontPair.idx.fetch_and(~1, memory_order_release);
return 0;
}
// Success, back == front node, back was set to zero above and index / access is restored by producers, so we return the back node.
// Version check invalidates any obsolete nodes still in process in other threads.
return node;
}
}
private:
typedef struct
{
T* ptr;
intptr_t idx;
} SequencedFrontPtr;
typedef struct
{
atomic<T*> ptr;
atomic<intptr_t> idx;
} FrontPair;
// Space out atomic members to individual cache lines. This is required for native LLSC operations on some architectures and avoids false sharing on others.
char _cachelineSpacer0[PLATFORM_CACHE_LINE_SIZE];
union
{
atomic<intptr_t> m_FrontIntPtr;
FrontPair m_FrontPair;
atomic<SequencedFrontPtr> m_Front;
};
char _cachelineSpacer1[PLATFORM_CACHE_LINE_SIZE - sizeof(SequencedFrontPtr)];
atomic<T*> m_Back;
char _cachelineSpacer2[PLATFORM_CACHE_LINE_SIZE - sizeof(T*)];
// FrontPair is an atomic reflection of the SequencedFrontPtr fields, used for CAS vs. DCAS ops. They must match in size and layout.
// Do note that we can not check the layout (offsetof) as the template class is incomplete!
static_assert(sizeof(mpmc_node_queue::m_FrontPair) == sizeof(mpmc_node_queue::m_Front), "SequencedFrontPtr and FrontPair must be of equal size");
// Verify mpmc_node is base of T
static_assert(std::is_base_of<baselib::mpmc_node, T>::value, "Node class/struct used with baselib::mpmc_node_queue must derive from baselib::mpmc_node.");
};
}
}