#define Job_CacheAlign alignas(k_FalseSharingPadSize)
const std::lock_guard<std::mutex> guard(m_Lock);
bool Pop(T* const out_value)
JobAssert(out_value != nullptr, "`out_value` cannot be a nullptr.");
const std::lock_guard<std::mutex> guard(m_Lock);
static_assert(atomic_size_type::is_always_lock_free, "Expected to be lockfree.");
return PushLazy([&value](T* const destination) { ::new (destination) T(value); });
bool Pop(T* const out_value)
JobAssert(out_value != nullptr, "`out_value` cannot be a nullptr.");
return PopLazy([out_value](T&& value) { *out_value = std::move(value); });
template<typename CallbackFn>
template<typename CallbackFn>
T* const element = ElementAt(read_index);
callback(std::move(*element));
static_assert(AtomicT::is_always_lock_free, "T Should be a small pointer-like type, expected to be lock-free when atomic.");
const size_type size = write_index - read_index;
ElementAt(write_index)->store(value, std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_seq_cst);
if (consumer_index <= producer_index)
if (consumer_index == producer_index)
const bool successful_pop = m_ConsumerIndex.compare_exchange_strong(consumer_index, consumer_index + 1, std::memory_order_seq_cst, std::memory_order_relaxed);
*out_value = ElementAt(producer_index)->load(std::memory_order_relaxed);
*out_value = ElementAt(producer_index)->load(std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_seq_cst);
if (read_index < write_index)
T result = ElementAt(read_index)->load(std::memory_order_relaxed);
if (m_ConsumerIndex.compare_exchange_strong(read_index, read_index + 1, std::memory_order_seq_cst, std::memory_order_relaxed))
*out_value = std::move(result);
return PushImpl<true>(elements, num_elements) != 0u;
return PushImpl<false>(elements, num_elements);
return PopImpl<true>(out_elements, num_elements) != 0u;
return PopImpl<false>(out_elements, num_elements);
template<bool allOrNothing>
if (RequestWriteRange<allOrNothing>(&range, num_elements))
return written_elements;
template<bool allOrNothing>
if (RequestPopRange<allOrNothing>(&range, num_elements))
return read_elements;
template<bool allOrNothing>
if constexpr (allOrNothing)
if (capacity_left < num_items)
if (capacity_left == 0)
const size_type num_element_to_write = capacity_left < num_items ? capacity_left : num_items;
new_head = old_head + num_element_to_write;
} while (!m_ProducerPending.compare_exchange_weak(old_head, new_head, std::memory_order_relaxed, std::memory_order_relaxed));
*out_range = {old_head, new_head};
template<bool allOrNothing>
size_t capacity_left = (m_Capacity - distance);
if constexpr (allOrNothing)
if (capacity_left < num_items)
const size_type num_element_to_read = capacity_left < num_items ? capacity_left : num_items;
new_tail = old_tail + num_element_to_read;
} while (!m_ConsumerPending.compare_exchange_weak(old_tail, new_tail, std::memory_order_relaxed, std::memory_order_relaxed));
*out_range = {old_tail, new_tail};
const size_type num_items_before_split = write_size < capacity_before_split ? write_size : capacity_before_split;
const size_type num_items_after_split = write_size - num_items_before_split;
std::copy_n(elements + 0u, num_items_before_split, m_Queue + real_start);
std::copy_n(elements + num_items_before_split, num_items_after_split, m_Queue + 0u);
const size_type num_items_before_split = read_size < capacity_before_split ? read_size : capacity_before_split;
const size_type num_items_after_split = read_size - num_items_before_split;
std::copy_n(std::make_move_iterator(m_Queue + real_start), num_items_before_split, out_elements + 0u);
std::copy_n(std::make_move_iterator(m_Queue + 0u), num_items_after_split, out_elements + num_items_before_split);
while (!commit->compare_exchange_weak(start_copy = range.start,
                                      std::memory_order_release,
                                      std::memory_order_relaxed))
return (b > a) ? (b - a) : m_Capacity - a + b;
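Taken together, the PushImpl/PopImpl fragments show the intent of the batch API: the Exact variants are instantiated with allOrNothing = true and succeed only if the whole batch fits, while the UpTo variants transfer as many elements as currently fit and return that count. A minimal usage sketch follows; the MPMCQueue type name and the int payload are illustrative assumptions, while Initialize, PushExact, PushUpTo and PopUpTo are taken from the member listing below.

MPMCQueue<int> queue;                    // queue type name is an assumption
int            backing[1024];            // caller-owned ring-buffer storage
queue.Initialize(backing, 1024);

const int batch[4] = {1, 2, 3, 4};

const bool all_pushed = queue.PushExact(batch, 4);   // enqueues all four elements or none
const auto num_pushed = queue.PushUpTo(batch, 4);    // enqueues as many as currently fit

int out[4];
const auto num_popped = queue.PopUpTo(out, 4);       // may return fewer than requested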
void Initialize(T *const memory_backing, const size_type capacity) noexcept
bool Push(const T &value)
bool Pop(T *const out_value)
T * elementAt(const size_type raw_index) const noexcept
size_type mask(const size_type raw_index) const noexcept
unsigned char m_Padding0[k_FalseSharingPadSize - sizeof(atomic_size_type) * 2]
size_type PushUpTo(const value_type *elements, const size_type num_elements)
void Commit(atomic_size_type *commit, const IndexRange range) const
atomic_size_type m_ProducerPending
size_type PopImpl(value_type *out_elements, const size_type num_elements)
size_type ReadElements(value_type *const out_elements, const IndexRange range) const
atomic_size_type m_ProducerCommited
size_type PopUpTo(value_type *out_elements, const size_type num_elements)
bool RequestPopRange(IndexRange *out_range, const size_type num_items)
bool PopExact(value_type *out_elements, const size_type num_elements)
std::atomic< size_type > atomic_size_type
unsigned char m_Padding2[k_FalseSharingPadSize - sizeof(m_Queue) - sizeof(m_Capacity)]
atomic_size_type m_ConsumerCommited
size_type Distance(const size_type a, const size_type b) const
atomic_size_type m_ConsumerPending
bool RequestWriteRange(IndexRange *out_range, const size_type num_items)
size_type WriteElements(const value_type *const elements, const IndexRange range)
size_type PushImpl(const value_type *elements, const size_type num_elements)
unsigned char m_Padding1[k_FalseSharingPadSize - sizeof(atomic_size_type) * 2]
void Initialize(value_type *const memory_backing, const size_type capacity) noexcept
bool PushExact(const value_type *elements, const size_type num_elements)
AtomicT * ElementAt(const size_type index) const noexcept
atomic_size_type m_ConsumerIndex
SPMCDequeStatus Pop(T *const out_value)
atomic_size_type m_ProducerIndex
SPMCDequeStatus Push(const T &value)
void Initialize(AtomicT *const memory_backing, const size_type capacity) noexcept
SPMCDequeStatus Steal(T *const out_value)
unsigned char m_Padding0[k_FalseSharingPadSize - sizeof(m_ProducerIndex) - sizeof(m_ConsumerIndex)]
std::atomic< size_type > atomic_size_type
unsigned char m_Padding4[k_FalseSharingPadSize - sizeof(m_Data) - sizeof(m_Capacity) - sizeof(m_CapacityMask)]
std::atomic< size_type > atomic_size_type
atomic_size_type m_ConsumerIndex
T * ElementAt(const size_type index) const noexcept
unsigned char m_Padding2[k_FalseSharingPadSize - sizeof(m_ConsumerIndex)]
bool Push(const T &value)
bool IsFull(const size_type head, const size_type tail) const noexcept
unsigned char m_Padding1[k_FalseSharingPadSize - sizeof(m_CachedConsumerIndex)]
unsigned char m_Padding0[k_FalseSharingPadSize - sizeof(m_ProducerIndex)]
size_type m_CachedProducerIndex
unsigned char m_Padding3[k_FalseSharingPadSize - sizeof(m_CachedProducerIndex)]
size_type m_CachedConsumerIndex
bool PopLazy(CallbackFn &&callback)
bool Pop(T *const out_value)
atomic_size_type m_ProducerIndex
void Initialize(T *const memory_backing, const size_type capacity) noexcept
bool PushLazy(CallbackFn &&callback)
static bool IsEmpty(const size_type head, const size_type tail) noexcept
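For the queue that exposes PushLazy and PopLazy, the callback receives a raw destination pointer on push (for placement-new construction) and an rvalue reference on pop (for moving the element out), as the fragments near the top show. A usage sketch, with the SPSCQueue type name and the Task payload as illustrative assumptions (<new> and <utility> are needed for placement new and std::move):

struct Task { int id; };                 // illustrative payload type

SPSCQueue<Task> queue;                   // queue type name is an assumption
Task            storage[256];            // caller-owned backing memory
queue.Initialize(storage, 256);

// Producer: construct the element directly inside the queue's storage.
queue.PushLazy([](Task* const destination) { ::new (destination) Task{42}; });

// Consumer: move the element out of the queue's storage.
Task received;
queue.PopLazy([&received](Task&& value) { received = std::move(value); });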
API for a multi-threading job system.
Assertion macro for this library.
#define JobAssert(expr, msg)
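The expansion of JobAssert is not shown here; a typical definition for this kind of macro would look roughly like the sketch below. It is purely illustrative, gated on a hypothetical JOB_SYS_ASSERTIONS switch, and the library's real definition may differ.

#include <cassert>

#if defined(JOB_SYS_ASSERTIONS)          // hypothetical configuration switch
  #define JobAssert(expr, msg) assert((expr) && (msg))
#else
  #define JobAssert(expr, msg) ((void)0) // compiled out when assertions are disabled
#endif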
static constexpr std::size_t k_FalseSharingPadSize
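k_FalseSharingPadSize drives both the Job_CacheAlign macro and the m_Padding members of the queue classes: indices written by different threads are padded apart so they never share a cache line. A minimal sketch of the same layout trick, with illustrative field names (the constant is commonly 64 or 128 bytes, matching the destructive interference size of the target CPU):

#include <atomic>
#include <cstddef>

struct Job_CacheAlign ExampleIndices     // Job_CacheAlign expands to alignas(k_FalseSharingPadSize)
{
  std::atomic<std::size_t> producer{0};
  unsigned char            pad[k_FalseSharingPadSize - sizeof(std::atomic<std::size_t>)];
  std::atomic<std::size_t> consumer{0};  // lands on a different cache line than producer
};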
void PauseProcessor() noexcept
Emits a CPU pause instruction to indicate that the calling thread is in a spin-wait loop.
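PauseProcessor is meant to be called on every iteration of a busy-wait loop so the core backs off instead of saturating the memory system. A usage sketch; the atomic flag is illustrative:

#include <atomic>

std::atomic<bool> work_available{false};     // illustrative flag set by another thread

// Spin until work is published, hinting the CPU on every iteration.
while (!work_available.load(std::memory_order_acquire))
{
  PauseProcessor();
}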
FAILED_RACE: Returned from Pop and Steal.
FAILED_SIZE: Returned from Push, Pop and Steal.
SUCCESS: Returned from Push, Pop and Steal.
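Since Push, Pop and Steal report failure through these status codes rather than blocking, callers branch on the result; FAILED_RACE in particular means another thread won the compare-exchange and the call can simply be retried. A sketch of a thief-side check, with the SPMCDeque type name, the Job payload, and Execute as illustrative assumptions (the status values come from the listing above):

SPMCDeque<Job*> deque;                   // deque type name is an assumption
Job*            stolen = nullptr;

switch (deque.Steal(&stolen))
{
  case SPMCDequeStatus::SUCCESS:     Execute(stolen);                break;
  case SPMCDequeStatus::FAILED_RACE: /* lost the race, retry later */ break;
  case SPMCDequeStatus::FAILED_SIZE: /* deque was empty */            break;
}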