BluFedora Job System v1.0.0
This is a C++ job system library for use in game engines.
Job::SPMCDeque< T > Class Template Reference

#include <job_queue.hpp>

Public Types

using size_type = std::int64_t
 
using atomic_size_type = std::atomic< size_type >
 

Public Member Functions

 SPMCDeque ()=default
 
 ~SPMCDeque ()=default
 
void Initialize (AtomicT *const memory_backing, const size_type capacity) noexcept
 
SPMCDequeStatus Push (const T &value)
 
SPMCDequeStatus Pop (T *const out_value)
 
SPMCDequeStatus Steal (T *const out_value)
 

Private Types

using AtomicT = std::atomic< T >
 

Private Member Functions

AtomicTElementAt (const size_type index) const noexcept
 

Private Attributes

atomic_size_type m_ProducerIndex
 
atomic_size_type m_ConsumerIndex
 
unsigned char m_Padding0 [k_FalseSharingPadSize - sizeof(m_ProducerIndex) - sizeof(m_ConsumerIndex)]
 
AtomicTm_Data
 
size_type m_Capacity
 
size_type m_CapacityMask
 

Detailed Description

template<typename T>
class Job::SPMCDeque< T >

Definition at line 241 of file job_queue.hpp.

Member Typedef Documentation

◆ AtomicT

template<typename T >
using Job::SPMCDeque< T >::AtomicT = std::atomic<T>
private

Definition at line 244 of file job_queue.hpp.

◆ size_type

template<typename T >
using Job::SPMCDeque< T >::size_type = std::int64_t

Definition at line 249 of file job_queue.hpp.

◆ atomic_size_type

template<typename T >
using Job::SPMCDeque< T >::atomic_size_type = std::atomic<size_type>

Definition at line 250 of file job_queue.hpp.

Constructor & Destructor Documentation

◆ SPMCDeque()

template<typename T >
Job::SPMCDeque< T >::SPMCDeque ( )
default

◆ ~SPMCDeque()

template<typename T >
Job::SPMCDeque< T >::~SPMCDeque ( )
default

Member Function Documentation

◆ Initialize()

template<typename T >
void Job::SPMCDeque< T >::Initialize ( AtomicT *const  memory_backing,
const size_type  capacity 
)
inlinenoexcept

Definition at line 268 of file job_queue.hpp.

269 {
270 m_ProducerIndex = 0;
271 m_ConsumerIndex = 0;
272 m_Data = memory_backing;
273 m_Capacity = capacity;
274 m_CapacityMask = capacity - 1;
275
276 JobAssert((m_Capacity & m_CapacityMask) == 0, "Capacity must be a power of 2.");
277 }
size_type m_CapacityMask
Definition: job_queue.hpp:261
atomic_size_type m_ConsumerIndex
Definition: job_queue.hpp:254
atomic_size_type m_ProducerIndex
Definition: job_queue.hpp:253
size_type m_Capacity
Definition: job_queue.hpp:260
AtomicT * m_Data
Definition: job_queue.hpp:259
#define JobAssert(expr, msg)
Definition: job_assert.hpp:27

References JobAssert, Job::SPMCDeque< T >::m_Capacity, Job::SPMCDeque< T >::m_CapacityMask, Job::SPMCDeque< T >::m_ConsumerIndex, Job::SPMCDeque< T >::m_Data, and Job::SPMCDeque< T >::m_ProducerIndex.

◆ Push()

template<typename T >
SPMCDequeStatus Job::SPMCDeque< T >::Push ( const T &  value)
inline

Definition at line 281 of file job_queue.hpp.

282 {
283 const size_type write_index = m_ProducerIndex.load(std::memory_order_relaxed);
284 const size_type read_index = m_ConsumerIndex.load(std::memory_order_acquire);
285 const size_type size = write_index - read_index;
286
287 if (size > m_CapacityMask)
288 {
290 }
291
292 ElementAt(write_index)->store(value, std::memory_order_relaxed);
293
294 m_ProducerIndex.store(write_index + 1, std::memory_order_release);
295
297 }
AtomicT * ElementAt(const size_type index) const noexcept
Definition: job_queue.hpp:371
std::int64_t size_type
Definition: job_queue.hpp:249
@ FAILED_SIZE
Returned from Push, Pop and Steal.
@ SUCCESS
Returned from Push, Pop and Steal.

References Job::SPMCDeque< T >::ElementAt(), Job::FAILED_SIZE, Job::SPMCDeque< T >::m_CapacityMask, Job::SPMCDeque< T >::m_ConsumerIndex, Job::SPMCDeque< T >::m_ProducerIndex, and Job::SUCCESS.

◆ Pop()

template<typename T >
SPMCDequeStatus Job::SPMCDeque< T >::Pop ( T *const  out_value)
inline

Definition at line 299 of file job_queue.hpp.

300 {
301 const size_type producer_index = m_ProducerIndex.load(std::memory_order_relaxed) - 1;
302
303 // Reserve the slot at the producer end.
304 m_ProducerIndex.store(producer_index, std::memory_order_relaxed);
305
306 // The above store needs to happen before this next read
307 // to have consistent view of the buffer.
308 //
309 // `m_ProducerIndex` can only be written to by this thread
310 // so first reserve a slot then we read what the other threads have to say.
311 //
312 std::atomic_thread_fence(std::memory_order_seq_cst);
313
314 size_type consumer_index = m_ConsumerIndex.load(std::memory_order_relaxed);
315
316 if (consumer_index <= producer_index)
317 {
318 if (consumer_index == producer_index) // Only one item in queue
319 {
320 const bool successful_pop = m_ConsumerIndex.compare_exchange_strong(consumer_index, consumer_index + 1, std::memory_order_seq_cst, std::memory_order_relaxed);
321
322 if (successful_pop)
323 {
324 *out_value = ElementAt(producer_index)->load(std::memory_order_relaxed);
325 }
326
327 m_ProducerIndex.store(producer_index + 1, std::memory_order_relaxed);
329 }
330
331 *out_value = ElementAt(producer_index)->load(std::memory_order_relaxed);
333 }
334
335 // Empty Queue, so restore to canonical empty.
336 m_ProducerIndex.store(producer_index + 1, std::memory_order_seq_cst);
338 }
@ FAILED_RACE
Returned from Pop and Steal.

References Job::SPMCDeque< T >::ElementAt(), Job::FAILED_RACE, Job::FAILED_SIZE, Job::SPMCDeque< T >::m_ConsumerIndex, Job::SPMCDeque< T >::m_ProducerIndex, and Job::SUCCESS.

◆ Steal()

template<typename T >
SPMCDequeStatus Job::SPMCDeque< T >::Steal ( T *const  out_value)
inline

Definition at line 342 of file job_queue.hpp.

343 {
344 size_type read_index = m_ConsumerIndex.load(std::memory_order_acquire);
345
346 // Must fully read `m_ConsumerIndex` before we read the producer owned `m_ProducerIndex`.
347 std::atomic_thread_fence(std::memory_order_seq_cst);
348
349 const size_type write_index = m_ProducerIndex.load(std::memory_order_acquire);
350
351 // if (next_read_index <= write_index)
352 if (read_index < write_index)
353 {
354 // Must load result before the CAS, since a push can happen concurrently right after the CAS.
355 T result = ElementAt(read_index)->load(std::memory_order_relaxed);
356
357 // Need strong memory ordering to read the element before the cas.
358 if (m_ConsumerIndex.compare_exchange_strong(read_index, read_index + 1, std::memory_order_seq_cst, std::memory_order_relaxed))
359 {
360 *out_value = std::move(result);
362 }
363
365 }
366
368 }

References Job::SPMCDeque< T >::ElementAt(), Job::FAILED_RACE, Job::FAILED_SIZE, Job::SPMCDeque< T >::m_ConsumerIndex, Job::SPMCDeque< T >::m_ProducerIndex, and Job::SUCCESS.

◆ ElementAt()

template<typename T >
AtomicT * Job::SPMCDeque< T >::ElementAt ( const size_type  index) const
inlineprivatenoexcept

Member Data Documentation

◆ m_ProducerIndex

template<typename T >
atomic_size_type Job::SPMCDeque< T >::m_ProducerIndex
private

◆ m_ConsumerIndex

template<typename T >
atomic_size_type Job::SPMCDeque< T >::m_ConsumerIndex
private

◆ m_Padding0

template<typename T >
unsigned char Job::SPMCDeque< T >::m_Padding0[k_FalseSharingPadSize - sizeof(m_ProducerIndex) - sizeof(m_ConsumerIndex)]
private

Definition at line 255 of file job_queue.hpp.

◆ m_Data

template<typename T >
AtomicT* Job::SPMCDeque< T >::m_Data
private

◆ m_Capacity

template<typename T >
size_type Job::SPMCDeque< T >::m_Capacity
private

Definition at line 260 of file job_queue.hpp.

Referenced by Job::SPMCDeque< T >::Initialize().

◆ m_CapacityMask

template<typename T >
size_type Job::SPMCDeque< T >::m_CapacityMask
private

The documentation for this class was generated from the following file: