#define Job_CacheAlign alignas(k_FalseSharingPadSize)
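For orientation, a minimal sketch of the false-sharing padding this macro exists for; the 64-byte figure and the Indices struct are assumptions for illustration, not the library's actual layout or value of k_FalseSharingPadSize.

#include <atomic>
#include <cstddef>

// Assumes a 64-byte cache line; the library's k_FalseSharingPadSize may differ.
constexpr std::size_t kCacheLine = 64;

struct Indices
{
  // Giving each index its own cache line keeps producer writes from
  // invalidating the line the consumer is polling, and vice versa.
  alignas(kCacheLine) std::atomic<std::size_t> producer{0};
  alignas(kCacheLine) std::atomic<std::size_t> consumer{0};
};

static_assert(sizeof(Indices) >= 2 * kCacheLine, "Each index should occupy its own cache line.");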
// Mutex-guarded queue: Push serializes on the queue's mutex before writing.
const std::lock_guard<std::mutex> guard(m_Lock);

// Pop validates its output pointer, then takes the same lock before reading.
bool Pop(T* const out_value)
{
  JobAssert(out_value != nullptr, "`out_value` cannot be a nullptr.");
  const std::lock_guard<std::mutex> guard(m_Lock);
  ...
}
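A compact sketch of the same mutex-guarded ring-buffer pattern in isolation; this is not the library's class, and the names LockedRingQueue, m_Buffer, m_Head, and m_Size are made up.

#include <cassert>
#include <cstddef>
#include <mutex>
#include <utility>
#include <vector>

template <typename T>
class LockedRingQueue
{
 public:
  explicit LockedRingQueue(const std::size_t capacity) : m_Buffer(capacity) {}

  bool Push(const T& value)
  {
    const std::lock_guard<std::mutex> guard(m_Lock);
    if (m_Size == m_Buffer.size()) { return false; }  // Full.
    m_Buffer[(m_Head + m_Size) % m_Buffer.size()] = value;
    ++m_Size;
    return true;
  }

  bool Pop(T* const out_value)
  {
    assert(out_value != nullptr);
    const std::lock_guard<std::mutex> guard(m_Lock);
    if (m_Size == 0u) { return false; }  // Empty.
    *out_value = std::move(m_Buffer[m_Head]);
    m_Head = (m_Head + 1u) % m_Buffer.size();
    --m_Size;
    return true;
  }

 private:
  std::mutex     m_Lock;
  std::vector<T> m_Buffer;
  std::size_t    m_Head = 0u;
  std::size_t    m_Size = 0u;
};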
// Lock-free queue with a lazy (callback-based) push/pop API; the index type
// must be lock-free.
static_assert(atomic_size_type::is_always_lock_free, "Expected to be lockfree.");

// Push forwards to the lazy API and copy-constructs the value in place:
return PushLazy([&value](T* const destination) { ::new (destination) T(value); });

bool Pop(T* const out_value)
{
  JobAssert(out_value != nullptr, "`out_value` cannot be a nullptr.");
  return PopLazy([out_value](T&& value) { *out_value = std::move(value); });
}

// PushLazy and PopLazy are callback templates; PopLazy hands the stored
// element to the callback as an rvalue:
template<typename CallbackFn>
bool PushLazy(CallbackFn&& callback)

template<typename CallbackFn>
bool PopLazy(CallbackFn&& callback)

T* const element = ElementAt(read_index);
callback(std::move(*element));
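A usage sketch of the lazy API, assuming the PushLazy/PopLazy signatures listed further down; the Job struct and the full-queue spin are made up for illustration.

#include <new>
#include <utility>

struct Job
{
  void (*fn)(void*);
  void* data;
};

template <typename QueueT>
void Produce(QueueT& queue, const Job& job)
{
  // Construct the element directly in the queue's storage; no temporary copy.
  while (!queue.PushLazy([&job](Job* const destination) { ::new (destination) Job(job); }))
  {
    // Queue is full; a real caller would back off here (e.g. PauseProcessor()).
  }
}

template <typename QueueT>
bool Consume(QueueT& queue)
{
  Job job{};
  const bool popped = queue.PopLazy([&job](Job&& stored) { job = std::move(stored); });
  if (popped)
  {
    job.fn(job.data);
  }
  return popped;
}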
static_assert(AtomicT::is_always_lock_free,
              "T Should be a small pointer-like type, expected to be lock-free when atomic.");
// SPMC deque Push (owner): compute the current size, store at the write index.
const size_type size = write_index - read_index;
ElementAt(write_index)->store(value, std::memory_order_relaxed);

// SPMC deque Pop (owner): fence, then race Steal for the last remaining element.
std::atomic_thread_fence(std::memory_order_seq_cst);
if (consumer_index <= producer_index)
{
  if (consumer_index == producer_index)
  {
    const bool successful_pop = m_ConsumerIndex.compare_exchange_strong(
        consumer_index, consumer_index + 1, std::memory_order_seq_cst, std::memory_order_relaxed);
    if (successful_pop) { *out_value = ElementAt(producer_index)->load(std::memory_order_relaxed); }
  }
  else
  {
    *out_value = ElementAt(producer_index)->load(std::memory_order_relaxed);
  }
}
// SPMC deque Steal (any thread): check that the deque is non-empty, read the
// element speculatively, then claim it by advancing the consumer index.
std::atomic_thread_fence(std::memory_order_seq_cst);
if (read_index < write_index)
{
  T result = ElementAt(read_index)->load(std::memory_order_relaxed);
  if (m_ConsumerIndex.compare_exchange_strong(read_index, read_index + 1, std::memory_order_seq_cst, std::memory_order_relaxed))
  {
    *out_value = std::move(result);
  }
}
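A sketch of how the owning worker might drain its own deque, assuming the Push/Pop/Steal members listed below; the int payload and the Status enum are stand-ins, and the reading of the failure codes in the comments is inferred from their names.

enum class Status { SUCCESS, FAILED_RACE, FAILED_SIZE };  // Stand-in for SPMCDequeStatus.

template <typename DequeT>
void RunLocalTasks(DequeT& my_deque)
{
  // Only the owning worker calls Push and Pop; other workers use Steal.
  int task = 0;
  for (;;)
  {
    const Status status = my_deque.Pop(&task);
    if (status == Status::FAILED_SIZE)
    {
      break;  // Nothing left locally; time to steal from another worker.
    }
    if (status == Status::SUCCESS)
    {
      // Execute `task` here. FAILED_RACE presumably means a thief claimed the
      // last element, so the loop simply tries again and then sees FAILED_SIZE.
    }
  }
}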
// PushExact: all-or-nothing; succeeds only if the whole batch fits.
return PushImpl<true>(elements, num_elements) != 0u;
// PushUpTo: best-effort; returns how many elements were written.
return PushImpl<false>(elements, num_elements);
// PopExact: all-or-nothing; succeeds only if enough elements are available.
return PopImpl<true>(out_elements, num_elements) != 0u;
// PopUpTo: best-effort; returns how many elements were read.
return PopImpl<false>(out_elements, num_elements);
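A usage sketch contrasting the two flavours, assuming only the PushExact/PushUpTo signatures from the member list below; SubmitBatch and the back-off comment are made up.

#include <cstddef>

template <typename QueueT, typename JobT>
void SubmitBatch(QueueT& queue, const JobT* jobs, const std::size_t count)
{
  // PushExact enqueues the whole batch or nothing at all.
  if (queue.PushExact(jobs, count))
  {
    return;
  }

  // Fall back to PushUpTo: enqueue what fits now and retry the remainder.
  std::size_t pushed = 0u;
  while (pushed < count)
  {
    pushed += queue.PushUpTo(jobs + pushed, count - pushed);
    // A real caller would back off here (e.g. PauseProcessor()) instead of spinning hard.
  }
}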
// PushImpl / PopImpl: reserve a range of slots, copy the elements across,
// commit the range, and return how many elements were transferred.
template<bool allOrNothing>
  if (RequestWriteRange<allOrNothing>(&range, num_elements))
  return written_elements;

template<bool allOrNothing>
  if (RequestPopRange<allOrNothing>(&range, num_elements))
  return read_elements;

// RequestWriteRange: a CAS loop on the pending producer index claims
// [old_head, new_head); the all-or-nothing flavour fails if the batch
// cannot fit in one go.
template<bool allOrNothing>
do
{
  if constexpr (allOrNothing)
  {
    if (capacity_left < num_items) { return false; }
  }
  if (capacity_left == 0) { return false; }

  const size_type num_element_to_write = capacity_left < num_items ? capacity_left : num_items;
  new_head = old_head + num_element_to_write;
} while (!m_ProducerPending.compare_exchange_weak(old_head, new_head, std::memory_order_relaxed, std::memory_order_relaxed));

*out_range = {old_head, new_head};
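A standalone, simplified sketch of the reservation step (not the library's code): a pending producer index is advanced with a CAS to claim a contiguous range. Only the all-or-nothing flavour is shown, the consumer position is passed in as a snapshot, and the function and parameter names are made up.

#include <atomic>
#include <cstddef>

struct IndexRange { std::size_t start, end; };

inline bool ReserveWriteRange(std::atomic<std::size_t>& producer_pending,
                              const std::size_t         consumer_committed,
                              const std::size_t         capacity,
                              const std::size_t         num_items,
                              IndexRange* const         out_range)
{
  std::size_t old_head = producer_pending.load(std::memory_order_relaxed);
  std::size_t new_head;
  do
  {
    // Indices grow monotonically; they are wrapped only when storage is touched.
    const std::size_t used          = old_head - consumer_committed;
    const std::size_t capacity_left = capacity - used;

    if (capacity_left < num_items)
    {
      return false;  // All-or-nothing: the whole batch must fit.
    }
    new_head = old_head + num_items;
  } while (!producer_pending.compare_exchange_weak(old_head, new_head,
                                                   std::memory_order_relaxed,
                                                   std::memory_order_relaxed));

  *out_range = {old_head, new_head};
  return true;
}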
// RequestPopRange mirrors RequestWriteRange: a CAS loop on the pending
// consumer index claims [old_tail, new_tail).
template<bool allOrNothing>
do
{
  size_t capacity_left = (m_Capacity - distance);
  if constexpr (allOrNothing)
  {
    if (capacity_left < num_items) { return false; }
  }

  const size_type num_element_to_read = capacity_left < num_items ? capacity_left : num_items;
  new_tail = old_tail + num_element_to_read;
} while (!m_ConsumerPending.compare_exchange_weak(old_tail, new_tail, std::memory_order_relaxed, std::memory_order_relaxed));

*out_range = {old_tail, new_tail};
// WriteElements: the claimed range may wrap around the end of the ring, so the
// copy is split at the physical end of m_Queue.
const size_type num_items_before_split = write_size < capacity_before_split ? write_size : capacity_before_split;
const size_type num_items_after_split  = write_size - num_items_before_split;

std::copy_n(elements + 0u, num_items_before_split, m_Queue + real_start);
std::copy_n(elements + num_items_before_split, num_items_after_split, m_Queue + 0u);

// ReadElements: the same split, but moving out of the ring into the caller's buffer.
const size_type num_items_before_split = read_size < capacity_before_split ? read_size : capacity_before_split;
const size_type num_items_after_split  = read_size - num_items_before_split;

std::copy_n(std::make_move_iterator(m_Queue + real_start), num_items_before_split, out_elements + 0u);
std::copy_n(std::make_move_iterator(m_Queue + 0u), num_items_after_split, out_elements + num_items_before_split);
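A standalone, runnable illustration of the split copy around the wrap point; the buffer size, start offset, and values are made up.

#include <algorithm>
#include <cstddef>
#include <cstdio>

int main()
{
  int ring[8] = {};
  const std::size_t capacity   = 8;
  const std::size_t real_start = 6;             // Writing starts near the end.
  const int         elements[] = {1, 2, 3, 4};  // Four items: two fit, two wrap.

  const std::size_t write_size            = 4;
  const std::size_t capacity_before_split = capacity - real_start;  // 2 slots left before the end.
  const std::size_t before                = std::min(write_size, capacity_before_split);
  const std::size_t after                 = write_size - before;

  std::copy_n(elements, before, ring + real_start);  // Fills ring[6], ring[7].
  std::copy_n(elements + before, after, ring);       // Wraps into ring[0], ring[1].

  for (const int v : ring) { std::printf("%d ", v); }  // Prints: 3 4 0 0 0 0 1 2
  std::printf("\n");
  return 0;
}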
// Commit: spin until the shared committed index reaches this range's start
// (i.e. all earlier reservations have committed), then publish the range end
// with release ordering.
while (!commit->compare_exchange_weak(
           start_copy = range.start,
           range.end,
           std::memory_order_release,
           std::memory_order_relaxed))
{
}

// Distance from index a to index b on the ring, accounting for wrap-around.
return (b > a) ? (b - a) : m_Capacity - a + b;
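A standalone sketch of the in-order commit step (not the library's code; the IndexRange field names and CommitRange are partly assumed).

#include <atomic>
#include <cstddef>

struct IndexRange { std::size_t start, end; };

inline void CommitRange(std::atomic<std::size_t>& committed, const IndexRange range)
{
  std::size_t expected = range.start;
  // Ranges are claimed in order, so spinning here only waits for earlier
  // producers that are still copying their elements.
  while (!committed.compare_exchange_weak(expected, range.end,
                                          std::memory_order_release,
                                          std::memory_order_relaxed))
  {
    expected = range.start;  // compare_exchange_weak overwrote it on failure.
  }
}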
void Initialize(T *const memory_backing, const size_type capacity) noexcept
bool Push(const T &value)
bool Pop(T *const out_value)
T * elementAt(const size_type raw_index) const noexcept
size_type mask(const size_type raw_index) const noexcept
unsigned char m_Padding0[k_FalseSharingPadSize - sizeof(atomic_size_type) *2]
size_type PushUpTo(const value_type *elements, const size_type num_elements)
void Commit(atomic_size_type *commit, const IndexRange range) const
atomic_size_type m_ProducerPending
size_type PopImpl(value_type *out_elements, const size_type num_elements)
size_type ReadElements(value_type *const out_elements, const IndexRange range) const
atomic_size_type m_ProducerCommited
size_type PopUpTo(value_type *out_elements, const size_type num_elements)
bool RequestPopRange(IndexRange *out_range, const size_type num_items)
bool PopExact(value_type *out_elements, const size_type num_elements)
std::atomic< size_type > atomic_size_type
unsigned char m_Padding2[k_FalseSharingPadSize - sizeof(m_Queue) - sizeof(m_Capacity)]
atomic_size_type m_ConsumerCommited
size_type Distance(const size_type a, const size_type b) const
atomic_size_type m_ConsumerPending
bool RequestWriteRange(IndexRange *out_range, const size_type num_items)
size_type WriteElements(const value_type *const elements, const IndexRange range)
size_type PushImpl(const value_type *elements, const size_type num_elements)
unsigned char m_Padding1[k_FalseSharingPadSize - sizeof(atomic_size_type) *2]
void Initialize(value_type *const memory_backing, const size_type capacity) noexcept
bool PushExact(const value_type *elements, const size_type num_elements)
AtomicT * ElementAt(const size_type index) const noexcept
atomic_size_type m_ConsumerIndex
SPMCDequeStatus Pop(T *const out_value)
atomic_size_type m_ProducerIndex
SPMCDequeStatus Push(const T &value)
void Initialize(AtomicT *const memory_backing, const size_type capacity) noexcept
SPMCDequeStatus Steal(T *const out_value)
unsigned char m_Padding0[k_FalseSharingPadSize - sizeof(m_ProducerIndex) - sizeof(m_ConsumerIndex)]
std::atomic< size_type > atomic_size_type
unsigned char m_Padding4[k_FalseSharingPadSize - sizeof(m_Data) - sizeof(m_Capacity) - sizeof(m_CapacityMask)]
std::atomic< size_type > atomic_size_type
atomic_size_type m_ConsumerIndex
T * ElementAt(const size_type index) const noexcept
unsigned char m_Padding2[k_FalseSharingPadSize - sizeof(m_ConsumerIndex)]
bool Push(const T &value)
bool IsFull(const size_type head, const size_type tail) const noexcept
unsigned char m_Padding1[k_FalseSharingPadSize - sizeof(m_CachedConsumerIndex)]
unsigned char m_Padding0[k_FalseSharingPadSize - sizeof(m_ProducerIndex)]
size_type m_CachedProducerIndex
unsigned char m_Padding3[k_FalseSharingPadSize - sizeof(m_CachedProducerIndex)]
size_type m_CachedConsumerIndex
bool PopLazy(CallbackFn &&callback)
bool Pop(T *const out_value)
atomic_size_type m_ProducerIndex
void Initialize(T *const memory_backing, const size_type capacity) noexcept
bool PushLazy(CallbackFn &&callback)
static bool IsEmpty(const size_type head, const size_type tail) noexcept
API for a multi-threading job system.
Assertion macro for this library.
#define JobAssert(expr, msg)
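A hypothetical definition for illustration only; the library's actual JobAssert may differ (for example, it may compile to nothing in release builds).

#include <cstdio>
#include <cstdlib>

// Sketch of an assertion macro with this shape; not the library's definition.
#define JobAssert(expr, msg)                                        \
  do                                                                \
  {                                                                 \
    if (!(expr))                                                    \
    {                                                               \
      std::fprintf(stderr, "JobAssert failed: %s (%s:%d): %s\n",    \
                   #expr, __FILE__, __LINE__, (msg));               \
      std::abort();                                                 \
    }                                                               \
  } while (0)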
static constexpr std::size_t k_FalseSharingPadSize
void PauseProcessor() noexcept
Emits a CPU pause instruction to hint that the caller is in a spin-wait loop.
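A usage sketch of the intended spin-wait pattern; WaitUntilSet and the flag are made up, and PauseProcessor is assumed to wrap the platform's pause/yield instruction.

#include <atomic>

void PauseProcessor() noexcept;  // Declared by the job system API above.

inline void WaitUntilSet(const std::atomic<bool>& flag) noexcept
{
  // The pause hint lowers power draw and frees pipeline resources for the
  // sibling hardware thread while the flag is polled.
  while (!flag.load(std::memory_order_acquire))
  {
    PauseProcessor();
  }
}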
SPMCDequeStatus values:
  FAILED_RACE - Returned from Pop and Steal.
  FAILED_SIZE - Returned from Push, Pop, and Steal.
  SUCCESS     - Returned from Push, Pop, and Steal.
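A sketch of how a stealing worker might react to each status; the Status enum is a stand-in for SPMCDequeStatus, and the readings of the failure codes are inferred from their names.

enum class Status { SUCCESS, FAILED_RACE, FAILED_SIZE };  // Stand-in for SPMCDequeStatus.

template <typename DequeT, typename TaskT>
bool TryStealOne(DequeT& victim, TaskT* const out_task)
{
  switch (victim.Steal(out_task))
  {
    case Status::SUCCESS:     return true;   // Got a task to run.
    case Status::FAILED_RACE: return false;  // Lost to another thread; retrying this victim is reasonable.
    case Status::FAILED_SIZE: return false;  // Victim looks empty; try a different one.
  }
  return false;
}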