BluFedora Job System v1.0.0
This is a C++ job system library for use in game engines.
Job::SPMCDeque< T > Class Template Reference

#include <job_queue.hpp>

Public Types

using size_type = std::int64_t
 
using atomic_size_type = std::atomic< size_type >
 

Public Member Functions

 SPMCDeque ()=default
 
 ~SPMCDeque ()=default
 
void Initialize (AtomicT *const memory_backing, const size_type capacity) noexcept
 
SPMCDequeStatus Push (const T &value)
 
SPMCDequeStatus Pop (T *const out_value)
 
SPMCDequeStatus Steal (T *const out_value)
 

Private Types

using AtomicT = std::atomic< T >
 

Private Member Functions

AtomicTElementAt (const size_type index) const noexcept
 

Private Attributes

atomic_size_type m_ProducerIndex
 
atomic_size_type m_ConsumerIndex
 
unsigned char m_Padding0 [k_FalseSharingPadSize - sizeof(m_ProducerIndex) - sizeof(m_ConsumerIndex)]
 
AtomicTm_Data
 
size_type m_Capacity
 
size_type m_CapacityMask
 

Detailed Description

template<typename T>
class Job::SPMCDeque< T >

Definition at line 240 of file job_queue.hpp.

Member Typedef Documentation

◆ AtomicT

template<typename T >
using Job::SPMCDeque< T >::AtomicT = std::atomic<T>
private

Definition at line 243 of file job_queue.hpp.

◆ size_type

template<typename T >
using Job::SPMCDeque< T >::size_type = std::int64_t

Definition at line 248 of file job_queue.hpp.

◆ atomic_size_type

template<typename T >
using Job::SPMCDeque< T >::atomic_size_type = std::atomic<size_type>

Definition at line 249 of file job_queue.hpp.

Constructor & Destructor Documentation

◆ SPMCDeque()

template<typename T >
Job::SPMCDeque< T >::SPMCDeque ( )
default

◆ ~SPMCDeque()

template<typename T >
Job::SPMCDeque< T >::~SPMCDeque ( )
default

Member Function Documentation

◆ Initialize()

template<typename T >
void Job::SPMCDeque< T >::Initialize ( AtomicT *const  memory_backing,
const size_type  capacity 
)
inlinenoexcept

Definition at line 266 of file job_queue.hpp.

267 {
268 m_ProducerIndex = 0;
269 m_ConsumerIndex = 0;
270 m_Data = memory_backing;
271 m_Capacity = capacity;
272 m_CapacityMask = capacity - 1;
273
274 JobAssert((m_Capacity & m_CapacityMask) == 0, "Capacity must be a power of 2.");
275 }
size_type m_CapacityMask
Definition: job_queue.hpp:260
atomic_size_type m_ConsumerIndex
Definition: job_queue.hpp:253
atomic_size_type m_ProducerIndex
Definition: job_queue.hpp:252
size_type m_Capacity
Definition: job_queue.hpp:259
AtomicT * m_Data
Definition: job_queue.hpp:258
#define JobAssert(expr, msg)
Definition: job_assert.hpp:27

References JobAssert, Job::SPMCDeque< T >::m_Capacity, Job::SPMCDeque< T >::m_CapacityMask, Job::SPMCDeque< T >::m_ConsumerIndex, Job::SPMCDeque< T >::m_Data, and Job::SPMCDeque< T >::m_ProducerIndex.

◆ Push()

template<typename T >
SPMCDequeStatus Job::SPMCDeque< T >::Push ( const T &  value)
inline

Definition at line 279 of file job_queue.hpp.

280 {
281 const size_type write_index = m_ProducerIndex.load(std::memory_order_relaxed);
282 const size_type read_index = m_ConsumerIndex.load(std::memory_order_acquire);
283 const size_type size = write_index - read_index;
284
285 if (size > m_CapacityMask)
286 {
288 }
289
290 ElementAt(write_index)->store(value, std::memory_order_relaxed);
291
292 m_ProducerIndex.store(write_index + 1, std::memory_order_release);
293
295 }
AtomicT * ElementAt(const size_type index) const noexcept
Definition: job_queue.hpp:369
std::int64_t size_type
Definition: job_queue.hpp:248
@ FAILED_SIZE
Returned from Push, Pop and Steal.
@ SUCCESS
Returned from Push, Pop and Steal.

References Job::SPMCDeque< T >::ElementAt(), Job::FAILED_SIZE, Job::SPMCDeque< T >::m_CapacityMask, Job::SPMCDeque< T >::m_ConsumerIndex, Job::SPMCDeque< T >::m_ProducerIndex, and Job::SUCCESS.

◆ Pop()

template<typename T >
SPMCDequeStatus Job::SPMCDeque< T >::Pop ( T *const  out_value)
inline

Definition at line 297 of file job_queue.hpp.

298 {
299 const size_type producer_index = m_ProducerIndex.load(std::memory_order_relaxed) - 1;
300
301 // Reserve the slot at the producer end.
302 m_ProducerIndex.store(producer_index, std::memory_order_relaxed);
303
304 // The above store needs to happen before this next read
305 // to have consistent view of the buffer.
306 //
307 // `m_ProducerIndex` can only be written to by this thread
308 // so first reserve a slot then we read what the other threads have to say.
309 //
310 std::atomic_thread_fence(std::memory_order_seq_cst);
311
312 size_type consumer_index = m_ConsumerIndex.load(std::memory_order_relaxed);
313
314 if (consumer_index <= producer_index)
315 {
316 if (consumer_index == producer_index) // Only one item in queue
317 {
318 const bool successful_pop = m_ConsumerIndex.compare_exchange_strong(consumer_index, consumer_index + 1, std::memory_order_seq_cst, std::memory_order_relaxed);
319
320 if (successful_pop)
321 {
322 *out_value = ElementAt(producer_index)->load(std::memory_order_relaxed);
323 }
324
325 m_ProducerIndex.store(producer_index + 1, std::memory_order_relaxed);
327 }
328
329 *out_value = ElementAt(producer_index)->load(std::memory_order_relaxed);
331 }
332
333 // Empty Queue, so restore to canonical empty.
334 m_ProducerIndex.store(producer_index + 1, std::memory_order_seq_cst);
336 }
@ FAILED_RACE
Returned from Pop and Steal.

References Job::SPMCDeque< T >::ElementAt(), Job::FAILED_RACE, Job::FAILED_SIZE, Job::SPMCDeque< T >::m_ConsumerIndex, Job::SPMCDeque< T >::m_ProducerIndex, and Job::SUCCESS.

◆ Steal()

template<typename T >
SPMCDequeStatus Job::SPMCDeque< T >::Steal ( T *const  out_value)
inline

Definition at line 340 of file job_queue.hpp.

341 {
342 size_type read_index = m_ConsumerIndex.load(std::memory_order_acquire);
343
344 // Must fully read `m_ConsumerIndex` before we read the producer owned `m_ProducerIndex`.
345 std::atomic_thread_fence(std::memory_order_seq_cst);
346
347 const size_type write_index = m_ProducerIndex.load(std::memory_order_acquire);
348
349 // if (next_read_index <= write_index)
350 if (read_index < write_index)
351 {
352 // Must load result before the CAS, since a push can happen concurrently right after the CAS.
353 T result = ElementAt(read_index)->load(std::memory_order_relaxed);
354
355 // Need strong memory ordering to read the element before the cas.
356 if (m_ConsumerIndex.compare_exchange_strong(read_index, read_index + 1, std::memory_order_seq_cst, std::memory_order_relaxed))
357 {
358 *out_value = std::move(result);
360 }
361
363 }
364
366 }

References Job::SPMCDeque< T >::ElementAt(), Job::FAILED_RACE, Job::FAILED_SIZE, Job::SPMCDeque< T >::m_ConsumerIndex, Job::SPMCDeque< T >::m_ProducerIndex, and Job::SUCCESS.

◆ ElementAt()

template<typename T >
AtomicT * Job::SPMCDeque< T >::ElementAt ( const size_type  index) const
inlineprivatenoexcept

Member Data Documentation

◆ m_ProducerIndex

template<typename T >
atomic_size_type Job::SPMCDeque< T >::m_ProducerIndex
private

◆ m_ConsumerIndex

template<typename T >
atomic_size_type Job::SPMCDeque< T >::m_ConsumerIndex
private

◆ m_Padding0

template<typename T >
unsigned char Job::SPMCDeque< T >::m_Padding0[k_FalseSharingPadSize - sizeof(m_ProducerIndex) - sizeof(m_ConsumerIndex)]
private

Definition at line 254 of file job_queue.hpp.

◆ m_Data

template<typename T >
AtomicT* Job::SPMCDeque< T >::m_Data
private

◆ m_Capacity

template<typename T >
size_type Job::SPMCDeque< T >::m_Capacity
private

Definition at line 259 of file job_queue.hpp.

Referenced by Job::SPMCDeque< T >::Initialize().

◆ m_CapacityMask

template<typename T >
size_type Job::SPMCDeque< T >::m_CapacityMask
private

The documentation for this class was generated from the following file: