mpsc_node_queue.h
6.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#pragma once
#include "../C/Baselib_Memory.h"
#include "mpsc_node.h"
namespace baselib
{
BASELIB_CPP_INTERFACE
{
// In computer science, a queue is a collection in which the entities in the collection are kept in order and the principal (or only) operations on the
// collection are the addition of entities to the rear terminal position, known as enqueue, and removal of entities from the front terminal position, known
// as dequeue. This makes the queue a First-In-First-Out (FIFO) data structure. In a FIFO data structure, the first element added to the queue will be the
// first one to be removed. This is equivalent to the requirement that once a new element is added, all elements that were added before have to be removed
// before the new element can be removed. Often a peek or front operation is also entered, returning the value of the front element without dequeuing it.
// A queue is an example of a linear data structure, or more abstractly a sequential collection.
//
// "Queue (abstract data type)", Wikipedia: The Free Encyclopedia
// https://en.wikipedia.org/w/index.php?title=Queue_(abstract_data_type)&oldid=878671332
//
// This implementation is a lockless node queue capable of handling multiple producers and a single consumer (exclusive access)
//
// Node types are required to inherit the mpsc_node class. No data from the inherited class is modified/copied, so no restrictions apply.
// The node memory is allocated and destroyed by the user (user owned).
// Dequeued nodes may be deleted, overwritten/discarded and/or reused.
//
// Notes consumer threads:
// Only one consumer thread will exclusively access the front node. Other consumer threads will always progress, either by failing to dequeue or
// successfully dequeuing the next node once the current thread thread opens access. As opposed to the parallel consumer implementation,
// this is significantly more performant as no DCAS-operations/loops are involved, but if the consumer thread with current exclusive access falls asleep
// when dequeuing, no other threads will successfully dequeue until the thread wakes up.
// Producer threads always progress independently.
//
// Notes on producer threads:
// A producer thread swaps the back and writes the link information in two consecutive atomic operations. If a producer thread falls asleep after the
// swap and before the link information has been written, the consumer thread(s) will not advance past this point since it doesn't have
// the information yet. Therefore the consumer threads calls will yield null until that particular producer thread wakes back up.
//
template<typename T>
class alignas(sizeof(intptr_t) * 2)mpsc_node_queue
{
public:
// Create a new queue instance.
mpsc_node_queue()
{
m_Front.obj = 0;
m_Back.obj = 0;
atomic_thread_fence(memory_order_seq_cst);
}
// Returns true if queue is empty.
bool empty() const
{
return m_Back.load(memory_order_relaxed) == 0;
}
// Push a node to the back of the queue.
void push_back(T* node)
{
node->next.store(0, memory_order_relaxed);
if (T* prev = m_Back.exchange(node, memory_order_release))
prev->next.store(node, memory_order_release);
else
m_Front.store(node, memory_order_release);
}
// Push a linked list of nodes to the back of the queue.
void push_back(T* first_node, T* last_node)
{
last_node->next.store(0, memory_order_relaxed);
if (T* prev = m_Back.exchange(last_node, memory_order_release))
prev->next.store(first_node, memory_order_release);
else
m_Front.store(first_node, memory_order_release);
}
// Try to pop frontmost node of the queue.
//
// Note that if null is returned, there may still be push operations in progress in a producer thread.
// Use the "empty" function to check if a queue is empty.
//
// \returns front node of the queue or null.
T* try_pop_front()
{
T* node, *next, *expected;
// acquire thread exclusive access of front node, return 0 if fail or queue is empty
intptr_t front = m_FrontIntPtr.fetch_or(1, memory_order_acquire);
if ((front & 1) | !(front >> 1))
return 0;
node = (T*)front;
next = static_cast<T*>(node->next.load(memory_order_relaxed));
if (!next)
{
// Set to zero, assuming we got the head. Exclusive access maintained as only producer can write zero.
m_Front.store(0, memory_order_release);
// - filters incomplete nodes
// - check if node is back == retrigger new back
expected = node;
if (!m_Back.compare_exchange_strong(expected, 0, memory_order_acquire, memory_order_relaxed))
{
// Back progressed or node is incomplete, reset front ptr and return 0.
m_Front.store(node, memory_order_release);
return 0;
}
// Successfully got the back, so just return node.
return node;
}
// Store next (clear block) and return node
m_Front.store(next, memory_order_release);
return node;
}
private:
// Space out atomic members to individual cache lines. Required for native LLSC operations on some architectures, others to avoid false sharing
char _cachelineSpacer0[PLATFORM_CACHE_LINE_SIZE];
union
{
atomic<T*> m_Front;
atomic<intptr_t> m_FrontIntPtr;
};
char _cachelineSpacer1[PLATFORM_CACHE_LINE_SIZE - sizeof(T*)];
atomic<T*> m_Back;
char _cachelineSpacer2[PLATFORM_CACHE_LINE_SIZE - sizeof(T*)];
// Verify mpsc_node is base of T
static_assert(std::is_base_of<baselib::mpsc_node, T>::value, "Node class/struct used with baselib::mpsc_node_queue must derive from baselib::mpsc_node.");
};
}
}