BluFedora Job System v1.0.0
This is a C++ job system library for use in game engines.
Job Namespace Reference

Namespaces

namespace  detail
 

Classes

struct  InitializationLock
 
struct  InitializationToken
 
struct  JobSystemContext
 
struct  JobSystemCreateOptions
 The runtime configuration for the Job System. More...
 
struct  JobSystemMemoryRequirements
 The memory requirements for a given configuration JobSystemCreateOptions. More...
 
class  LockedQueue
 
class  MPMCQueue
 
struct  Splitter
 
class  SPMCDeque
 
class  SPSCQueue
 
struct  Task
 
struct  TaskData
 A buffer for user-data you can write to, which may be large enough to store task data inline. More...
 
union  TaskFnStorage
 
union  TaskMemoryBlock
 
struct  TaskPool
 
struct  TaskPtr
 
struct  ThreadLocalState
 

Typedefs

using WorkerID = std::uint16_t
 The id type of each worker thread. More...
 
using TaskFn = void(*)(Task *)
 The signature of the type of function for a single Task. More...
 
using TaskHandle = std::uint16_t
 
using TaskHandleType = TaskHandle
 
using AtomicTaskHandleType = std::atomic< TaskHandle >
 
using WorkerIDType = WorkerID
 
using AtomicInt32 = std::atomic_int32_t
 
using Byte = unsigned char
 
using AtomicTaskPtr = std::atomic< TaskPtr >
 

Enumerations

enum class  QueueType : std::uint8_t { NORMAL = 0 , MAIN = 1 , WORKER = 2 }
 Determines which threads the task will be allowed to run on. More...
 
enum class  SPMCDequeStatus { SUCCESS , FAILED_RACE , FAILED_SIZE }
 

Functions

std::size_t NumSystemThreads () noexcept
 Makes some system calls to grab the number of threads / processors on the device. This function can be called by any thread concurrently. More...
 
InitializationToken Initialize (const JobSystemMemoryRequirements &memory_requirements={}, void *const memory=nullptr) noexcept
 Sets up the Job system and creates all the worker threads. The thread that calls 'Job::Initialize' is considered the main thread. More...
 
void SetupUserThread ()
 Must be called in the callstack of the thread to be set up. More...
 
const char * ProcessorArchitectureName () noexcept
 An implementation defined name for the CPU architecture of the device. This function can be called by any thread concurrently. More...
 
std::uint16_t NumWorkers () noexcept
 Returns the number of workers created by the system. This function can be called by any thread concurrently. More...
 
WorkerID CurrentWorker () noexcept
 The id of the current thread. This function can be called by any thread concurrently. More...
 
bool IsMainThread () noexcept
 Allows for querying if we are currently executing in the main thread. More...
 
void Shutdown () noexcept
 This will deallocate any memory used by the system and shut down any threads created by 'Job::Initialize'. More...
 
Task * TaskMake (const TaskFn function, Task *const parent=nullptr) noexcept
 Creates a new Task that should be later submitted by calling 'TaskSubmit'. More...
 
TaskData TaskGetData (Task *const task, const std::size_t alignment) noexcept
 Returns the user-data buffer you may write to in order to get data into your TaskFn. More...
 
void TaskAddContinuation (Task *const self, Task *const continuation, const QueueType queue=QueueType::NORMAL) noexcept
 A 'continuation' is a task that will be added to a queue after the 'self' Task has finished running. More...
 
template<typename T >
T * TaskDataAs (Task *const task) noexcept
 Grabs the user-data pointer as the T you specified. No safety is guaranteed, this is just a dumb cast. More...
 
template<typename T , typename... Args>
void TaskEmplaceData (Task *const task, Args &&... args)
 Calls the constructor of T on the user-data buffer. More...
 
template<typename T >
void TaskSetData (Task *const task, const T &data)
 Copies 'data' into the user-data buffer by calling the T copy constructor. More...
 
template<typename T >
void TaskDestructData (Task *const task)
 Helper for calling destructor on the task's user data. More...
 
template<typename Closure >
Task * TaskMake (Closure &&function, Task *const parent=nullptr)
 Creates a new task making a copy of the closure. More...
 
void TaskIncRef (Task *const task) noexcept
 Increments the task's ref count preventing it from being garbage collected. More...
 
void TaskDecRef (Task *const task) noexcept
 Decrements the task's ref count, allowing it to be garbage collected. More...
 
bool TaskIsDone (const Task *const task) noexcept
 Returns the done status of the task. More...
 
template<typename ConditionFn >
void TickMainQueue (ConditionFn &&condition) noexcept
 Runs tasks from the main queue as long as there are tasks available and condition returns true. More...
 
void TickMainQueue () noexcept
 Runs tasks from the main queue until it is empty. More...
 
void TaskSubmit (Task *const self, const QueueType queue=QueueType::NORMAL) noexcept
 Submits the task to the specified queue. More...
 
void WaitOnTask (const Task *const task) noexcept
 Waits until the specified task is done executing. This function will block but do work while being blocked so there is no wasted time. More...
 
void TaskSubmitAndWait (Task *const self, const QueueType queue=QueueType::NORMAL) noexcept
 Same as calling 'TaskSubmit' followed by 'WaitOnTask'. More...
 
void PauseProcessor () noexcept
 CPU pause instruction to indicate when you are in a spin wait loop. More...
 
void YieldTimeSlice () noexcept
 Asks the OS to yield this thread's execution to another thread on the current CPU core. More...
 
template<typename F , typename S >
Task * ParallelFor (const std::size_t start, const std::size_t count, S &&splitter, F &&fn, Task *parent=nullptr)
 Parallel for algorithm, splits the work up recursively splitting based on the splitter passed in. More...
 
template<typename Splitter , typename Reducer >
Task * ParallelReduce (const std::size_t start, const std::size_t count, Splitter &&splitter, Reducer &&reduce, Task *parent=nullptr)
 
template<typename T , typename F , typename S >
Task * ParallelFor (T *const data, const std::size_t count, S &&splitter, F &&fn, Task *parent=nullptr)
 Parallel for algorithm, splits the work up recursively splitting based on the splitter passed in. This version is a helper for array data. More...
 
template<typename... F>
Task * ParallelInvoke (Task *const parent, F &&... fns)
 Invokes each passed in function object in parallel. More...
 

Variables

static constexpr std::size_t k_FalseSharingPadSize = std::hardware_destructive_interference_size
 
static constexpr std::size_t k_CachelineSize = 64u
 
static constexpr std::size_t k_ExpectedTaskSize = std::max(std::size_t(128u), k_CachelineSize)
 
static constexpr QueueType k_InvalidQueueType = QueueType(int(QueueType::WORKER) + 1)
 
static constexpr TaskHandle NullTaskHandle = std::numeric_limits<TaskHandle>::max()
 

Class Documentation

◆ Job::InitializationLock

struct Job::InitializationLock

Definition at line 192 of file job_system.cpp.

Class Members
mutex init_mutex
condition_variable init_cv
atomic_uint32_t num_workers_ready

◆ Job::JobSystemContext

struct Job::JobSystemContext

Definition at line 199 of file job_system.cpp.

Class Members
ThreadLocalState * workers
uint32_t num_workers
uint32_t num_owned_workers
atomic_uint32_t num_user_threads_setup
uint32_t num_tasks_per_worker
InitializationLock init_lock
const char * sys_arch_str
size_t system_alloc_size
size_t system_alloc_alignment
bool needs_delete
atomic_bool is_running
LockedQueue< TaskPtr > main_queue
mutex worker_sleep_mutex
condition_variable worker_sleep_cv
atomic_uint32_t num_available_jobs

◆ Job::JobSystemCreateOptions

struct Job::JobSystemCreateOptions

The runtime configuration for the Job System.

Definition at line 85 of file job_api.hpp.

Class Members
uint8_t num_user_threads The number of threads not owned by this system that want access to the Job API (each such thread must call Job::SetupUserThread).
uint8_t num_threads Use 0 to indicate using the number of cores available on the system.
uint16_t main_queue_size Number of tasks in the job system's QueueType::MAIN queue. (Must be power of two)
uint16_t normal_queue_size Number of tasks in each worker's QueueType::NORMAL queue. (Must be power of two)
uint16_t worker_queue_size Number of tasks in each worker's QueueType::WORKER queue. (Must be power of two)
uint64_t job_steal_rng_seed The RNG for work queue stealing will be seeded with this value.
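
The power-of-two requirement on the queue sizes is not explained here. A common reason, assumed for this sketch, is that a ring buffer with a power-of-two capacity can wrap its indices with a bit mask instead of a division. The helpers IsPowerOfTwo and WrapIndex below are hypothetical names for illustration and are not part of the Job API.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical helper (not part of the Job API): true when `value` has exactly one bit set.
constexpr bool IsPowerOfTwo(const std::uint32_t value) noexcept
{
  return value != 0u && (value & (value - 1u)) == 0u;
}

// Hypothetical helper: wraps an ever-increasing index into a ring buffer whose
// capacity is a power of two, using a mask instead of the modulo operator.
constexpr std::size_t WrapIndex(const std::size_t index, const std::size_t capacity) noexcept
{
  return index & (capacity - 1u);
}

static_assert(IsPowerOfTwo(256u), "256 would be accepted as a queue size");
static_assert(!IsPowerOfTwo(300u), "300 would be rejected as a queue size");
static_assert(WrapIndex(1027u, 1024u) == 3u, "index 1027 wraps to slot 3");
```

A check like IsPowerOfTwo could be run on main_queue_size, normal_queue_size and worker_queue_size before the options are used to compute memory requirements.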

◆ Job::TaskData

struct Job::TaskData

A buffer for user-data you can write to, which may be large enough to store task data inline.

If you store non-trivial data, remember to manually call its destructor at the end of the task function.

If you call 'TaskEmplaceData' or 'TaskSetData' and need to update the data once more be sure to destruct the previous contents correctly if the data stored in the buffer is non trivial.

Definition at line 203 of file job_api.hpp.

Class Members
void * ptr The start of the buffer you may write to.
size_t size The size of the buffer.
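
The construct / destruct discipline described above can be sketched without the library. The block below is a minimal illustration, assuming a plain (ptr, size) pair; BufferView, EmplaceInBuffer, DestructInBuffer and ReplaceString are hypothetical names and are not the library's TaskData, TaskEmplaceData or TaskDestructData.

```cpp
#include <cassert>
#include <cstddef>
#include <new>
#include <string>
#include <utility>

// Stand-in for the (ptr, size) pair described above; this is not the library's TaskData.
struct BufferView
{
  void*       ptr;
  std::size_t size;
};

// Constructs a T in the buffer with placement new; returns nullptr when T does not fit.
// Alignment is assumed to have been handled by whoever produced `buffer`.
template<typename T, typename... Args>
T* EmplaceInBuffer(const BufferView buffer, Args&&... args)
{
  if (buffer.size < sizeof(T)) { return nullptr; }
  return ::new (buffer.ptr) T(std::forward<Args>(args)...);
}

// Ends the lifetime of a T previously constructed in the buffer.
template<typename T>
void DestructInBuffer(T* const object) noexcept
{
  object->~T();
}

// Replacing non-trivial contents: destruct the old value first, then construct the new one.
inline std::string* ReplaceString(const BufferView buffer, std::string* const old_value, const char* const text)
{
  DestructInBuffer(old_value);
  return EmplaceInBuffer<std::string>(buffer, text);
}
```

Skipping the DestructInBuffer step for a type such as std::string would leak whatever the old value owned, which is the situation the note above warns about.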

◆ Job::TaskMemoryBlock

union Job::TaskMemoryBlock

Definition at line 167 of file job_system.cpp.

Class Members
TaskMemoryBlock * next
unsigned char storage[sizeof(Task)]

◆ Job::TaskPool

struct Job::TaskPool

Definition at line 174 of file job_system.cpp.

Class Members
TaskMemoryBlock * memory
TaskMemoryBlock * freelist

◆ Job::ThreadLocalState

struct Job::ThreadLocalState

Definition at line 180 of file job_system.cpp.

Class Members
SPMCDeque< TaskPtr > normal_queue
SPMCDeque< TaskPtr > worker_queue
TaskPool task_allocator
TaskHandle * allocated_tasks
TaskHandleType num_allocated_tasks
ThreadLocalState * last_stolen_worker
pcg_state_setseq_64 rng_state
thread thread_id

Typedef Documentation

◆ WorkerID

using Job::WorkerID = typedef std::uint16_t

The id type of each worker thread.

Definition at line 54 of file job_api.hpp.

◆ TaskFn

using Job::TaskFn = typedef void (*)(Task*)

The signature of the type of function for a single Task.

Definition at line 55 of file job_api.hpp.

◆ TaskHandle

using Job::TaskHandle = typedef std::uint16_t

Definition at line 82 of file job_system.cpp.

◆ TaskHandleType

using Job::TaskHandleType = typedef TaskHandle

Definition at line 83 of file job_system.cpp.

◆ AtomicTaskHandleType

using Job::AtomicTaskHandleType = typedef std::atomic<TaskHandle>

Definition at line 84 of file job_system.cpp.

◆ WorkerIDType

using Job::WorkerIDType = typedef WorkerID

Definition at line 85 of file job_system.cpp.

◆ AtomicInt32

using Job::AtomicInt32 = typedef std::atomic_int32_t

Definition at line 86 of file job_system.cpp.

◆ Byte

using Job::Byte = typedef unsigned char

Definition at line 87 of file job_system.cpp.

◆ AtomicTaskPtr

using Job::AtomicTaskPtr = typedef std::atomic<TaskPtr>

Definition at line 115 of file job_system.cpp.

Enumeration Type Documentation

◆ QueueType

enum class Job::QueueType : std::uint8_t

Determines which threads the task will be allowed to run on.

Enumerator
NORMAL 

Tasks in this queue will run on either the main or worker threads.

MAIN 

Tasks in this queue will only be run by the main thread.

WORKER 

Tasks in this queue will never run on the main thread.

Definition at line 45 of file job_api.hpp.

{
  NORMAL = 0,
  MAIN   = 1,
  WORKER = 2,
};

◆ SPMCDequeStatus

enum class Job::SPMCDequeStatus
Enumerator
SUCCESS 

Returned from Push, Pop and Steal.

FAILED_RACE 

Returned from Pop and Steal.

FAILED_SIZE 

Returned from Push, Pop and Steal.

Definition at line 231 of file job_queue.hpp.

{
  SUCCESS,
  FAILED_RACE,
  FAILED_SIZE,
};

Function Documentation

◆ NumSystemThreads()

std::size_t Job::NumSystemThreads ( )
noexcept

Makes some system calls to grab the number of threads / processors on the device. This function can be called by any thread concurrently.

Can be called even if the job system has not been initialized.

Returns
std::size_t The number of threads / processors on the computer.

Definition at line 866 of file job_system.cpp.

{
#if IS_SINGLE_THREADED
  return 1;
#else
  const auto n = std::thread::hardware_concurrency();
  return n != 0 ? n : 1;
#endif

#if 0

#if IS_WINDOWS
  SYSTEM_INFO sysinfo;
  GetSystemInfo(&sysinfo);
  return sysinfo.dwNumberOfProcessors;
#elif IS_POSIX
  return sysconf(_SC_NPROCESSORS_ONLN) /* * 2*/;
#elif 0  // FreeBSD, MacOS X, NetBSD, OpenBSD
  int mib[4];
  int numCPU;
  std::size_t len = sizeof(numCPU);

  /* set the mib for hw.ncpu */
  mib[0] = CTL_HW;
  mib[1] = HW_AVAILCPU;  // alternatively, try HW_NCPU;

  /* get the number of CPUs from the system */
  sysctl(mib, 2, &numCPU, &len, NULL, 0);

  if (numCPU < 1)
  {
    mib[1] = HW_NCPU;
    sysctl(mib, 2, &numCPU, &len, NULL, 0);
    if (numCPU < 1)
      numCPU = 1;
  }

  return numCPU;
#elif 0  // HPUX
  return mpctl(MPC_GETNUMSPUS, NULL, NULL);
#elif 0  // IRIX
  return sysconf(_SC_NPROC_ONLN);
#elif 0  // Objective-C (Mac OS X >=10.5 or iOS)
  NSUInteger a = [[NSProcessInfo processInfo] processorCount];
  NSUInteger b = [[NSProcessInfo processInfo] activeProcessorCount];

  return a;
#endif

#endif
}

◆ Initialize()

Job::InitializationToken Job::Initialize ( const JobSystemMemoryRequirements &  memory_requirements = {},
void *const  memory = nullptr 
)
noexcept

Sets up the Job system and creates all the worker threads. The thread that calls 'Job::Initialize' is considered the main thread.

Parameters
memory_requirements  The customization parameters to initialize the system with, obtained from Job::MemRequirementsForConfig.
memory  Must be memory_requirements.byte_size bytes in size and aligned to memory_requirements.alignment. If nullptr, the system heap will be used.
Returns
The InitializationToken can be used by other subsystems to verify that the Job System has been initialized.

Definition at line 741 of file job_system.cpp.

{
  JobAssert(g_JobSystem == nullptr, "Already initialized.");

  const bool needs_delete = memory == nullptr;

  if (!memory)
  {
    memory = ::operator new[](memory_requirements.byte_size, std::align_val_t{memory_requirements.alignment});
  }

  JobAssert(memory != nullptr, "memory must be a valid pointer.");
  JobAssert(IsPointerAligned(memory, memory_requirements.alignment), "memory must be a aligned to `memory_requirements.alignment`.");

  const JobSystemCreateOptions& options = memory_requirements.options;
  const std::uint64_t rng_seed = options.job_steal_rng_seed;
  const WorkerID num_threads = config::WorkerCount(options);
  const WorkerID owned_threads = num_threads - options.num_user_threads;
  const std::uint16_t num_tasks_per_worker = config::NumTasksPerWorker(options);
  const std::uint32_t total_num_tasks = config::TotalNumTasks(num_threads, num_tasks_per_worker);

  void* alloc_ptr = memory;
  JobSystemContext* job_system = LinearAlloc<JobSystemContext>(alloc_ptr, 1u).ptr;
  Span<ThreadLocalState> all_workers = LinearAlloc<ThreadLocalState>(alloc_ptr, num_threads);
  Span<TaskMemoryBlock> all_tasks = LinearAlloc<TaskMemoryBlock>(alloc_ptr, total_num_tasks);
  Span<TaskPtr> main_tasks_ptrs = LinearAlloc<TaskPtr>(alloc_ptr, options.main_queue_size);
  Span<AtomicTaskPtr> worker_task_ptrs = LinearAlloc<AtomicTaskPtr>(alloc_ptr, total_num_tasks);
  Span<TaskHandle> all_task_handles = LinearAlloc<TaskHandle>(alloc_ptr, total_num_tasks);

  job_system->main_queue.Initialize(SpanAlloc(&main_tasks_ptrs, options.main_queue_size), options.main_queue_size);
  job_system->workers = all_workers.ptr;
  job_system->num_workers = num_threads;
  job_system->num_owned_workers = owned_threads;
  job_system->num_user_threads_setup.store(0, std::memory_order_relaxed);
  job_system->num_tasks_per_worker = num_tasks_per_worker;
  job_system->sys_arch_str = "Unknown Arch";
  job_system->num_available_jobs.store(0, std::memory_order_relaxed);
  job_system->needs_delete = needs_delete;
  job_system->system_alloc_size = memory_requirements.byte_size;
  job_system->system_alloc_alignment = memory_requirements.alignment;
  job_system->init_lock.num_workers_ready.store(1u, std::memory_order_relaxed);  // Main thread already initialized.

#if IS_WINDOWS
  SYSTEM_INFO sysinfo;
  GetSystemInfo(&sysinfo);

  switch (sysinfo.wProcessorArchitecture)
  {
    case PROCESSOR_ARCHITECTURE_AMD64:
    {
      job_system->sys_arch_str = "x64 (Intel or AMD)";
      break;
    }
    case PROCESSOR_ARCHITECTURE_ARM:
    {
      job_system->sys_arch_str = "ARM";
      break;
    }
    case PROCESSOR_ARCHITECTURE_ARM64:
    {
      job_system->sys_arch_str = "ARM64";
      break;
    }
    case PROCESSOR_ARCHITECTURE_IA64:
    {
      job_system->sys_arch_str = "Intel Itanium-Based";
      break;
    }
    case PROCESSOR_ARCHITECTURE_INTEL:
    {
      job_system->sys_arch_str = "Intel x86";
      break;
    }
    case PROCESSOR_ARCHITECTURE_UNKNOWN:
    default:
    {
      job_system->sys_arch_str = "Unknown Arch";
      break;
    }
  }
#endif

  ThreadLocalState* const main_thread_worker = job_system->workers;

  for (std::uint64_t worker_index = 0; worker_index < num_threads; ++worker_index)
  {
    ThreadLocalState* const worker = SpanAlloc(&all_workers, 1u);

    worker->normal_queue.Initialize(SpanAlloc(&worker_task_ptrs, options.normal_queue_size), options.normal_queue_size);
    worker->worker_queue.Initialize(SpanAlloc(&worker_task_ptrs, options.worker_queue_size), options.worker_queue_size);
    task_pool::Initialize(&worker->task_allocator, SpanAlloc(&all_tasks, num_tasks_per_worker), num_tasks_per_worker);
    worker->allocated_tasks = SpanAlloc(&all_task_handles, num_tasks_per_worker);
    worker->num_allocated_tasks = 0u;
    pcg32_srandom_r(&worker->rng_state, worker_index + rng_seed, worker_index * 2u + 1u + rng_seed);
    worker->last_stolen_worker = main_thread_worker;
  }

  g_JobSystem = job_system;
  g_CurrentWorker = main_thread_worker;

  std::atomic_thread_fence(std::memory_order_release);
  for (std::uint64_t worker_index = 1; worker_index < owned_threads; ++worker_index)
  {
    worker::InitializeThread(job_system->workers + worker_index);
  }

  JobAssert(all_workers.num_elements == 0u, "All elements expected to be allocated out.");
  JobAssert(all_tasks.num_elements == 0u, "All elements expected to be allocated out.");
  JobAssert(main_tasks_ptrs.num_elements == 0u, "All elements expected to be allocated out.");
  JobAssert(worker_task_ptrs.num_elements == 0u, "All elements expected to be allocated out.");
  JobAssert(all_task_handles.num_elements == 0u, "All elements expected to be allocated out.");

  return Job::InitializationToken{owned_threads};
}
#define JobAssert(expr, msg)
Definition: job_assert.hpp:27
static thread_local Job::ThreadLocalState * g_CurrentWorker
Definition: job_system.cpp:227
static Job::JobSystemContext * g_JobSystem
Definition: job_system.cpp:226
std::size_t alignment
The base alignment the pointer should be.
Definition: job_api.hpp:103
const JobSystemCreateOptions options
The options used to create the memory requirements.
Definition: job_api.hpp:101
std::size_t byte_size
The number of bytes the job system needed.
Definition: job_api.hpp:102

References g_CurrentWorker, g_JobSystem, Job::JobSystemContext::init_lock, Initialize(), Job::JobSystemCreateOptions::job_steal_rng_seed, JobAssert, Job::JobSystemContext::main_queue, Job::JobSystemCreateOptions::main_queue_size, Job::JobSystemContext::needs_delete, Job::JobSystemCreateOptions::normal_queue_size, Job::JobSystemContext::num_available_jobs, Job::JobSystemContext::num_owned_workers, Job::JobSystemContext::num_tasks_per_worker, Job::JobSystemCreateOptions::num_user_threads, Job::JobSystemContext::num_user_threads_setup, Job::JobSystemContext::num_workers, Job::InitializationLock::num_workers_ready, Job::JobSystemContext::sys_arch_str, Job::JobSystemContext::system_alloc_alignment, Job::JobSystemContext::system_alloc_size, Job::JobSystemCreateOptions::worker_queue_size, and Job::JobSystemContext::workers.

Referenced by Initialize().
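
Initialize carves every internal array out of one memory block and then asserts that each span was fully handed out. The sketch below shows one way such a bump ("linear") allocator can work. It is an assumption-based illustration: SpanSketch, LinearAllocSketch and SpanAllocSketch are hypothetical stand-ins, and the library's own Span, LinearAlloc and SpanAlloc may be implemented differently.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Stand-in for the Span<T> used in the listing; the layout here is an assumption.
template<typename T>
struct SpanSketch
{
  T*          ptr;
  std::size_t num_elements;
};

// Rounds `cursor` up to alignof(T), reserves `count` elements and advances the cursor.
// No bounds checking: the caller is assumed to have sized the block beforehand.
template<typename T>
SpanSketch<T> LinearAllocSketch(void*& cursor, const std::size_t count) noexcept
{
  const std::uintptr_t address = reinterpret_cast<std::uintptr_t>(cursor);
  const std::uintptr_t aligned = (address + alignof(T) - 1u) & ~(std::uintptr_t(alignof(T)) - 1u);
  T* const             first   = reinterpret_cast<T*>(aligned);

  cursor = first + count;
  return {first, count};
}

// Hands out the first `count` elements of a span and shrinks it, so a fully
// consumed span ends with num_elements == 0.
template<typename T>
T* SpanAllocSketch(SpanSketch<T>* const span, const std::size_t count) noexcept
{
  assert(count <= span->num_elements);
  T* const result = span->ptr;
  span->ptr += count;
  span->num_elements -= count;
  return result;
}
```

With this shape, checking num_elements == 0 on every span after setup verifies that the sizes computed up front match what the setup code actually consumed.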

◆ SetupUserThread()

void Job::SetupUserThread ( )

Must be called in the callstack of the thread to be set up.

Sets up the state needed to be able to use the job system from this thread. The job system will not start up until all user threads have been set up.

Warning
Must never be called by either a thread set up by this system or the main thread.

Definition at line 856 of file job_system.cpp.

{
  Job::JobSystemContext* const job_system = g_JobSystem;
  const std::uint32_t user_thread_id = job_system->num_owned_workers + job_system->num_user_threads_setup.fetch_add(1, std::memory_order_relaxed);

  JobAssert(user_thread_id < job_system->num_workers, "Too many calls to `SetupUserThread`.");

  worker::WorkerThreadSetup(job_system->workers + user_thread_id);
}

References g_JobSystem, JobAssert, Job::JobSystemContext::num_owned_workers, Job::JobSystemContext::num_user_threads_setup, and Job::JobSystemContext::workers.
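
The slot assignment in the listing relies on an atomic counter: user threads occupy the worker slots after the ones owned by the system, and fetch_add gives each caller a distinct offset. The following is a reduced sketch of that idea only. ContextSketch and ClaimUserThreadSlot are hypothetical names, and the real function also performs per-thread setup that is not shown here.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Reduced stand-in for the fields SetupUserThread reads; not the library's JobSystemContext.
struct ContextSketch
{
  std::uint32_t              num_workers;
  std::uint32_t              num_owned_workers;
  std::atomic<std::uint32_t> num_user_threads_setup;

  ContextSketch(const std::uint32_t workers, const std::uint32_t owned) :
    num_workers(workers),
    num_owned_workers(owned),
    num_user_threads_setup(0u)
  {
  }
};

// Returns the next free user-thread slot. fetch_add hands every caller a distinct
// previous value, so concurrent callers cannot claim the same slot.
inline std::uint32_t ClaimUserThreadSlot(ContextSketch& context) noexcept
{
  const std::uint32_t slot = context.num_owned_workers + context.num_user_threads_setup.fetch_add(1u, std::memory_order_relaxed);
  assert(slot < context.num_workers && "Too many user threads registered.");
  return slot;
}
```

For a system with six workers of which four are owned, the first two callers would receive slots 4 and 5, and a third call would trip the assertion.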

◆ ProcessorArchitectureName()

const char * Job::ProcessorArchitectureName ( )
noexcept

An implementation defined name for the CPU architecture of the device. This function can be called by any thread concurrently.

Returns
const char* NUL-terminated name for the CPU architecture of the device.

Definition at line 923 of file job_system.cpp.

924{
926}

References g_JobSystem, and Job::JobSystemContext::sys_arch_str.

◆ NumWorkers()

std::uint16_t Job::NumWorkers ( )
noexcept

Returns the number of workers created by the system. This function can be called by any thread concurrently.

Returns
std::uint16_t The number of workers created by the system.

Definition at line 918 of file job_system.cpp.

{
  return std::uint16_t(g_JobSystem->num_workers);
}

References g_JobSystem, and Job::JobSystemContext::num_workers.

Referenced by Job::Splitter::EvenSplit(), and TaskSubmit().

◆ CurrentWorker()

WorkerID Job::CurrentWorker ( )
noexcept

The id of the current thread. This function can be called by any thread concurrently.

The main thread will always be 0.

Returns
WorkerID The id of the current thread.

Definition at line 928 of file job_system.cpp.

929{
930 JobAssert(g_CurrentWorker != nullptr, "This thread was not created by the job system.");
932}

References g_CurrentWorker, g_JobSystem, JobAssert, and Job::JobSystemContext::workers.

Referenced by WaitOnTask().

◆ IsMainThread()

bool Job::IsMainThread ( )
noexcept

Allows for querying if we are currently executing in the main thread.

Returns
True if we are in the main thread, false otherwise.
Warning
Must only be called from a thread registered with the job system.

Definition at line 934 of file job_system.cpp.

935{
937}

References g_CurrentWorker, and IsMainThread().

Referenced by IsMainThread(), and Job::detail::mainQueueTryRunTask().

◆ Shutdown()

void Job::Shutdown ( )
noexcept

This will deallocate any memory used by the system and shut down any threads created by 'Job::Initialize'.

Warning
This function may only be called by the main thread.

Definition at line 939 of file job_system.cpp.

{
  JobAssert(g_JobSystem != nullptr, "Cannot shutdown when never initialized.");

  static_assert(std::is_trivially_destructible_v<TaskMemoryBlock>, "TaskMemoryBlock's destructor not called.");
  static_assert(std::is_trivially_destructible_v<TaskPtr>, "TaskPtr's destructor not called.");
  static_assert(std::is_trivially_destructible_v<AtomicTaskPtr>, "AtomicTaskPtr's destructor not called.");
  static_assert(std::is_trivially_destructible_v<TaskHandle>, "TaskHandle's destructor not called.");

  JobSystemContext* const job_system = g_JobSystem;
  const std::uint32_t num_workers = job_system->num_owned_workers;

  // In case all threads are not initialized by the time shutdown is called.
  while (job_system->is_running.load(std::memory_order_relaxed) != true) {}

  {
    std::unique_lock<std::mutex> lock(job_system->worker_sleep_mutex);
    job_system->is_running.store(false, std::memory_order_relaxed);
  }

  // Allow one last update loop to allow them to end.
  system::WakeUpAllWorkers();

  for (std::uint32_t i = 0; i < num_workers; ++i)
  {
    ThreadLocalState* const worker = job_system->workers + i;

    if (i != 0)
    {
      worker::ShutdownThread(worker);
    }

    worker->~ThreadLocalState();
  }

  const bool needs_delete = job_system->needs_delete;

  job_system->~JobSystemContext();
  g_CurrentWorker = nullptr;
  g_JobSystem = nullptr;

  if (needs_delete)
  {
    ::operator delete[](job_system, job_system->system_alloc_size, std::align_val_t{job_system->system_alloc_alignment});
  }
}

References g_CurrentWorker, g_JobSystem, Job::JobSystemContext::is_running, JobAssert, Job::JobSystemContext::needs_delete, Job::JobSystemContext::num_owned_workers, Job::JobSystemContext::system_alloc_size, Job::JobSystemContext::worker_sleep_mutex, and Job::JobSystemContext::workers.

◆ TaskMake() [1/2]

Task * Job::TaskMake ( const TaskFn  function,
Task *const  parent = nullptr 
)
noexcept

Creates a new Task that should be later submitted by calling 'TaskSubmit'.

Parameters
function  The function you want run by the scheduler.
parent  An optional parent Task used in conjunction with 'WaitOnTask' to force dependencies.
Returns
Task* The newly created task.

Definition at line 986 of file job_system.cpp.

{
  const WorkerID worker_id = worker::GetCurrentID();
  ThreadLocalState* const worker = system::GetWorker(worker_id);
  const std::uint32_t max_tasks_per_worker = g_JobSystem->num_tasks_per_worker;

  if (worker->num_allocated_tasks == max_tasks_per_worker)
  {
    worker::GarbageCollectAllocatedTasks(worker);

    if (worker->num_allocated_tasks == max_tasks_per_worker)
    {
      // While we cannot allocate do some work.
      system::WakeUpAllWorkers();
      while (worker->num_allocated_tasks == max_tasks_per_worker)
      {
        worker::TryRunTask(worker);
        worker::GarbageCollectAllocatedTasks(worker);
      }
    }
  }

  JobAssert(worker->num_allocated_tasks < max_tasks_per_worker, "Too many tasks allocated.");

  Task* const task = task_pool::AllocateTask(&worker->task_allocator, worker_id, function, task::PointerToTaskPtr(parent));
  const TaskHandle task_hdl = task_pool::TaskToIndex(worker->task_allocator, task);

  if (parent)
  {
    parent->num_unfinished_tasks.fetch_add(1u, std::memory_order_release);
  }

  worker->allocated_tasks[worker->num_allocated_tasks++] = task_hdl;

  return task;
}
std::uint16_t TaskHandle
Definition: job_system.cpp:82
AtomicInt32 num_unfinished_tasks
The number of children tasks.
Definition: job_system.cpp:151

References g_JobSystem, JobAssert, and Job::JobSystemContext::num_tasks_per_worker.

Referenced by ParallelFor(), ParallelInvoke(), ParallelReduce(), and TaskMake().
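
The listing increments the parent's num_unfinished_tasks when a child is created. How the counter is decremented is not shown in this section, so the sketch below is an assumption based on a common job-system design: each task starts at one (for its own work), finishing a task decrements its counter, and reaching zero also notifies the parent. TaskSketch, FinishTask and IsDone are hypothetical names, not the library's types or functions.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Minimal stand-in for a task; only the bookkeeping needed for parent/child tracking.
struct TaskSketch
{
  TaskSketch*               parent;
  std::atomic<std::int32_t> num_unfinished_tasks;

  explicit TaskSketch(TaskSketch* const parent_task = nullptr) :
    parent(parent_task),
    num_unfinished_tasks(1)  // Assumption: the task counts itself as one unfinished unit.
  {
    if (parent) { parent->num_unfinished_tasks.fetch_add(1, std::memory_order_release); }
  }
};

// Marks one unit of work on `task` as finished and propagates completion upward.
inline void FinishTask(TaskSketch* const task) noexcept
{
  // fetch_sub returns the previous value, so 1 means this call brought the count to zero.
  if (task->num_unfinished_tasks.fetch_sub(1, std::memory_order_acq_rel) == 1 && task->parent)
  {
    FinishTask(task->parent);
  }
}

inline bool IsDone(const TaskSketch& task) noexcept
{
  return task.num_unfinished_tasks.load(std::memory_order_acquire) == 0;
}
```

Under this scheme, waiting on a parent implicitly waits on every child created with that parent, which matches the "force dependencies" wording in the parameter description.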

◆ TaskGetData()

TaskData Job::TaskGetData ( Task *const  task,
const std::size_t  alignment 
)
noexcept

Returns the user-data buffer you may write to in order to get data into your TaskFn.

Parameters
task  The task whose user-data you want to grab.
alignment  The required alignment the returned pointer must have.
Returns
TaskData The user-data buffer you may read and write.

Definition at line 1023 of file job_system.cpp.

{
  Byte* const user_storage_start = static_cast<Byte*>(AlignPointer(task->user_data + task->user_data_start, alignment));
  const Byte* const user_storage_end = std::end(task->user_data);

  if (user_storage_start <= user_storage_end)
  {
    const std::size_t user_storage_size = user_storage_end - user_storage_start;

    return {user_storage_start, user_storage_size};
  }

  return {nullptr, 0u};
}
unsigned char Byte
Definition: job_system.cpp:87

Referenced by TaskDataAs(), and TaskEmplaceData().
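
The pointer arithmetic in the listing can be tried in isolation. The sketch below aligns the start of a byte range upward and reports how many bytes remain, returning an empty result when the aligned start falls past the end. UserDataView, AlignedSubrange and ViewAs are hypothetical names for illustration; the library's AlignPointer helper is not shown in this section and may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Stand-in for the result type; mirrors the (ptr, size) pair of TaskData.
struct UserDataView
{
  void*       ptr;
  std::size_t size;
};

// Returns the aligned sub-range of [begin, end), or {nullptr, 0} when the aligned
// start lies past `end`. `alignment` must be a power of two.
inline UserDataView AlignedSubrange(unsigned char* const begin, unsigned char* const end, const std::size_t alignment) noexcept
{
  const std::uintptr_t start   = reinterpret_cast<std::uintptr_t>(begin);
  const std::uintptr_t aligned = (start + alignment - 1u) & ~(std::uintptr_t(alignment) - 1u);
  const std::uintptr_t limit   = reinterpret_cast<std::uintptr_t>(end);

  if (aligned <= limit)
  {
    return {begin + (aligned - start), static_cast<std::size_t>(limit - aligned)};
  }

  return {nullptr, 0u};
}

// Same size check as TaskDataAs: only hand back a T* when a whole T fits.
template<typename T>
T* ViewAs(const UserDataView view) noexcept
{
  return view.size >= sizeof(T) ? static_cast<T*>(view.ptr) : nullptr;
}
```

Note that a stricter alignment can shrink the usable size, so a type that fits at alignment 1 may not fit once its natural alignment is requested.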

◆ TaskAddContinuation()

void Job::TaskAddContinuation ( Task *const  self,
Task *const  continuation,
const QueueType  queue = QueueType::NORMAL 
)
noexcept

A 'continuation' is a task that will be added to a queue after the 'self' Task has finished running.

Continuations will be added to the same queue as the queue from the task that submits it.

Parameters
self  The task to add the 'continuation' to.
continuation  The Task to run after 'self' has finished. This task must not have already been submitted to a queue.
queue  The queue you want the task to run on.

Definition at line 1038 of file job_system.cpp.

{
  JobAssert(self->q_type == k_InvalidQueueType, "The parent task should not have already been submitted to a queue.");
  JobAssert(continuation->q_type == k_InvalidQueueType, "A continuation must not have already been submitted to a queue or already added as a continuation.");
  JobAssert(continuation->next_continuation.isNull(), "A continuation must not have already been added to another task.");

  const TaskPtr new_head = task::PointerToTaskPtr(continuation);
  continuation->q_type = queue;
  continuation->next_continuation = self->first_continuation.load(std::memory_order_relaxed);

  while (!std::atomic_compare_exchange_strong(&self->first_continuation, &continuation->next_continuation, new_head))
  {
  }
}
static constexpr QueueType k_InvalidQueueType
Definition: job_system.cpp:78
QueueType q_type
The queue type this task has been submitted to, initialized to k_InvalidQueueType.
Definition: job_system.cpp:157
TaskPtr next_continuation
Next element in the linked list of continuations.
Definition: job_system.cpp:155
AtomicTaskPtr first_continuation
Head of linked list of tasks to be added on completion.
Definition: job_system.cpp:154
bool isNull() const noexcept
Definition: job_system.cpp:112

References JobAssert, and k_InvalidQueueType.
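
The compare-and-swap loop above is a lock-free push onto the front of a singly linked list. Below is a minimal standalone sketch of that pattern, assuming raw pointers instead of TaskPtr handles. `Node`, `PushFront` and `Length` are hypothetical names, not part of the library, and the sketch uses compare_exchange_weak with explicit memory orders where the library code uses the strong variant with the default ordering.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for a task; not the library's Task type.
struct Node
{
  int   id;
  Node* next = nullptr;
};

// Pushes `node` onto the front of the list headed by `head`.
inline void PushFront(std::atomic<Node*>& head, Node* const node)
{
  assert(node->next == nullptr && "node is already linked into a list");

  node->next = head.load(std::memory_order_relaxed);

  // On failure the current head is written back into node->next,
  // so the loop body can stay empty and simply retry.
  while (!head.compare_exchange_weak(node->next, node,
                                     std::memory_order_release,
                                     std::memory_order_relaxed))
  {
  }
}

// Counts the nodes in the list (helper for single-threaded inspection).
inline int Length(const std::atomic<Node*>& head)
{
  int count = 0;
  for (const Node* it = head.load(std::memory_order_acquire); it != nullptr; it = it->next)
  {
    ++count;
  }
  return count;
}
```

Because each push links in front of the current head, the most recently added node is the first one visited when the list is walked.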

◆ TaskDataAs()

template<typename T >
T * Job::TaskDataAs ( Task *const  task)
noexcept

Grabs the user-data pointer as the T you specified. No safety is guaranteed; this is just a dumb cast.

Template Parameters
T: The type you want to receive the user-data buffer as.
Parameters
task: The task whose data you are retrieving.
Returns
T* The user-data buffer cast as a T, or nullptr if the buffer is too small to hold a T.

Definition at line 479 of file job_api.hpp.

480 {
481 const TaskData data = TaskGetData(task, alignof(T));
482
483 return data.size >= sizeof(T) ? static_cast<T*>(data.ptr) : nullptr;
484 }
TaskData TaskGetData(Task *const task, const std::size_t alignment) noexcept
Returns the user-data buffer you may write to in order to get data into your TaskFn.
std::size_t size
The size of the buffer.
Definition: job_api.hpp:206
void * ptr
The start of the buffer you may write to.
Definition: job_api.hpp:205
A buffer for user-data you can write to, maybe large enough to store task data inline.
Definition: job_api.hpp:204

References Job::TaskData::ptr, Job::TaskData::size, and TaskGetData().

◆ TaskEmplaceData()

template<typename T , typename... Args>
void Job::TaskEmplaceData ( Task *const  task,
Args &&...  args 
)

Calls the constructor of T on the user-data buffer.

Template Parameters
T: The type you want constructed in place in the user-data buffer.
Args: The argument types passed into the T constructor.
Parameters
task: The task whose user-data buffer is affected.
args: The arguments passed into the constructor of the T that is constructed in the user-data buffer.

Definition at line 487 of file job_api.hpp.

488 {
489 const TaskData data = TaskGetData(task, alignof(T));
490
491 JobAssert(data.size >= sizeof(T), "Attempting to store an object too large to fit within a task's storage buffer.");
492
493 new (data.ptr) T(std::forward<Args>(args)...);
494 }

References JobAssert, Job::TaskData::ptr, Job::TaskData::size, and TaskGetData().

◆ TaskSetData()

template<typename T >
void Job::TaskSetData ( Task *const  task,
const T &  data 
)

Copies 'data' into the user-data buffer by calling the T copy constructor.

Template Parameters
T: The data type that will be emplaced into the user-data buffer.
Parameters
task: The task whose user-data buffer is affected.
data: The data copied into the user-data buffer.

Definition at line 497 of file job_api.hpp.

498 {
499 TaskEmplaceData<T>(task, data);
500 }

◆ TaskDestructData()

template<typename T >
void Job::TaskDestructData ( Task *const  task)

Helper for calling destructor on the task's user data.

Template Parameters
T: The expected type of the data; ~T is called on the user-data buffer.
Parameters
task: The task whose user-data buffer is affected.

Definition at line 503 of file job_api.hpp.

504 {
505 TaskDataAs<T>(task)->~T();
506 }
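
TaskEmplaceData, TaskDataAs and TaskDestructData together follow the usual "construct in place, use, destroy manually" lifecycle for an inline buffer. Below is a minimal standalone sketch of that lifecycle, assuming a plain 32-byte buffer. `Buffer`, `BufferSlot`, `BufferEmplace` and `BufferDestruct` are hypothetical names that only mirror the shape of the API above.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <new>
#include <utility>

// Hypothetical fixed-size user-data buffer, standing in for a task's storage.
struct Buffer
{
  alignas(std::max_align_t) unsigned char bytes[32];
};

// Returns a T-aligned pointer into the buffer, or nullptr if a T does not fit.
template<typename T>
T* BufferSlot(Buffer& buffer) noexcept
{
  void*       ptr   = buffer.bytes;
  std::size_t space = sizeof(buffer.bytes);
  return static_cast<T*>(std::align(alignof(T), sizeof(T), ptr, space));
}

// Constructs a T in the buffer with placement new.
template<typename T, typename... Args>
T* BufferEmplace(Buffer& buffer, Args&&... args)
{
  T* const slot = BufferSlot<T>(buffer);
  assert(slot != nullptr && "object too large for the buffer");
  return ::new (static_cast<void*>(slot)) T(std::forward<Args>(args)...);
}

// Ends the lifetime of an object created by BufferEmplace.
template<typename T>
void BufferDestruct(T* const object) noexcept
{
  object->~T();
}
```

The buffer never frees memory itself, so every emplace of a type with a non-trivial destructor needs a matching manual destructor call.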

◆ TaskMake() [2/2]

template<typename Closure >
Task * Job::TaskMake ( Closure &&  function,
Task *const  parent = nullptr 
)

Creates a new task making a copy of the closure.

Template Parameters
Closure: The type of the callable.
Parameters
function: The non-pointer callable you want to store.
parent: An optional parent Task used in conjunction with 'WaitOnTask' to force dependencies.
Returns
Task* The newly created task.

Definition at line 509 of file job_api.hpp.

510 {
511 Task* const task = TaskMake(
512 +[](Task* const task) -> void {
513 Closure& function = *static_cast<Closure*>(detail::taskGetPrivateUserData(task, alignof(Closure)));
514 function(task);
515 function.~Closure();
516 },
517 parent);
518 void* const private_data = detail::taskReservePrivateUserData(task, sizeof(Closure), alignof(Closure));
519 new (private_data) Closure(std::forward<Closure>(function));
520
521 return task;
522 }
void * taskReservePrivateUserData(Task *const task, const std::size_t num_bytes, const std::size_t alignment) noexcept
void * taskGetPrivateUserData(Task *const task, const std::size_t alignment) noexcept

References Job::detail::taskGetPrivateUserData(), TaskMake(), and Job::detail::taskReservePrivateUserData().
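
The listing above stores the closure inside the task and runs it through a captureless lambda converted to a plain function pointer. Below is a minimal standalone sketch of that technique, assuming a fixed 48-byte storage area. `MiniTask`, `MiniTaskInit` and `MiniTaskRun` are hypothetical names, and unlike the listing this sketch decays the closure type so that lvalue callables are also accepted.

```cpp
#include <cassert>
#include <cstddef>
#include <new>
#include <type_traits>
#include <utility>

// Hypothetical miniature task: a function pointer plus inline storage.
struct MiniTask
{
  using Fn = void (*)(MiniTask*);

  Fn fn = nullptr;
  alignas(std::max_align_t) unsigned char storage[48];
};

// Copies or moves `function` into the task's storage and installs a trampoline for it.
template<typename Closure>
void MiniTaskInit(MiniTask& task, Closure&& function)
{
  using C = std::decay_t<Closure>;
  static_assert(sizeof(C) <= sizeof(MiniTask::storage), "closure too large for inline storage");
  static_assert(alignof(C) <= alignof(std::max_align_t), "closure is over-aligned");

  // A captureless lambda converts to a function pointer; unary '+' forces the conversion.
  task.fn = +[](MiniTask* const self) {
    C& stored = *std::launder(reinterpret_cast<C*>(self->storage));
    stored(self);
    stored.~C();  // The closure is destroyed right after its single run.
  };

  ::new (static_cast<void*>(task.storage)) C(std::forward<Closure>(function));
}

// Runs the stored closure exactly once.
inline void MiniTaskRun(MiniTask& task)
{
  task.fn(&task);
}
```

Because the trampoline destroys the closure after calling it, such a task must not be run a second time.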

◆ TaskIncRef()

void Job::TaskIncRef ( Task *const  task)
noexcept

Increments the task's ref count, preventing it from being garbage collected.

This function should be called before TaskSubmit.

Parameters
task: The task whose ref count should be incremented.

Definition at line 1053 of file job_system.cpp.

1054{
1055 const auto old_ref_count = task->ref_count.fetch_add(1, std::memory_order_relaxed);
1056
1057 JobAssert(old_ref_count >= std::int16_t(1) || task->q_type == k_InvalidQueueType, "First call to taskIncRef should not happen after the task has been submitted.");
1058 (void)old_ref_count;
1059}

References JobAssert, and k_InvalidQueueType.

◆ TaskDecRef()

void Job::TaskDecRef ( Task *const  task)
noexcept

Decrements the task's ref count, allowing it to be garbage collected.

Parameters
task: The task whose ref count should be decremented.

Definition at line 1061 of file job_system.cpp.

1062{
1063 const auto old_ref_count = task->ref_count.fetch_sub(1, std::memory_order_relaxed);
1064
1065 JobAssert(old_ref_count >= 0, "taskDecRef: Called too many times.");
1066 (void)old_ref_count;
1067}

References JobAssert.

◆ TaskIsDone()

bool Job::TaskIsDone ( const Task *const  task)
noexcept

Returns the done status of the task.

This is only safe to call after submitting the task if you have an active reference to the task through a call to TaskIncRef.

Parameters
task: The task to check whether or not it's done.
Returns
true - The task is done running. false - The task is still running.
See also
TaskIncRef

Definition at line 1069 of file job_system.cpp.

1070{
1071 return task->num_unfinished_tasks.load(std::memory_order_acquire) == -1;
1072}

Referenced by WaitOnTask().

◆ TickMainQueue() [1/2]

template<typename ConditionFn >
void Job::TickMainQueue ( ConditionFn &&  condition)
noexcept

Runs tasks from the main queue as long as there are tasks available and condition returns true.

Calling this function is optional, since the main queue is also evaluated during other calls to this API, but it offers an easy way to flush the main queue and keep its latency low.

Template Parameters
ConditionFn: The type of the callable. Must be callable like: bool operator()(void).
Parameters
condition: The function object indicating whether the main queue should continue being evaluated. It is called after each task has completed.
Warning
Must only be called from the main thread.

Definition at line 397 of file job_api.hpp.

398 {
399 do
400 {
401 if (!detail::mainQueueTryRunTask())
402 {
403 break;
404 }
405 } while (condition());
406 }
bool mainQueueTryRunTask(void) noexcept

References Job::detail::mainQueueTryRunTask().

Referenced by TickMainQueue().
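
The do/while shape means at least one task is attempted before the condition is first consulted. Below is a minimal single-threaded sketch of that control flow, assuming a std::deque of std::function in place of the real main queue. `SimpleQueue`, `TryRunOne` and `TickQueue` are hypothetical names.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical single-threaded stand-in for the main queue.
using SimpleQueue = std::deque<std::function<void()>>;

// Pops and runs one task; returns false when the queue is empty.
inline bool TryRunOne(SimpleQueue& queue)
{
  if (queue.empty())
  {
    return false;
  }

  std::function<void()> task = std::move(queue.front());
  queue.pop_front();
  assert(task && "empty std::function stored in the queue");
  task();
  return true;
}

// Runs tasks until the queue is empty or `condition` returns false.
// `condition` is only consulted after a task has run.
template<typename ConditionFn>
void TickQueue(SimpleQueue& queue, ConditionFn&& condition)
{
  do
  {
    if (!TryRunOne(queue))
    {
      break;
    }
  } while (condition());
}
```

A condition that counts down a per-frame budget is one way to cap how many tasks a single call may run.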

◆ TickMainQueue() [2/2]

void Job::TickMainQueue ( )
inline noexcept

Runs tasks from the main queue until it is empty.

Calling this function is optional, since the main queue is also evaluated during other calls to this API, but it offers an easy way to flush the main queue and keep its latency low.

Warning
Must only be called from the main thread.

Definition at line 418 of file job_api.hpp.

419 {
420 TickMainQueue([]() { return true; });
421 }

References TickMainQueue().

◆ TaskSubmit()

void Job::TaskSubmit ( Task *const  self,
const QueueType  queue = QueueType::NORMAL 
)
noexcept

Submits the task to the specified queue.

The Task is not required to have been created on the same thread that submits.

You may now wait on this task using 'WaitOnTask'.

Parameters
self: The task to submit.
queue: The queue you want the task to run on.

Definition at line 1074 of file job_system.cpp.

1075{
1076 JobAssert(self->q_type == k_InvalidQueueType, "A task cannot be submitted to a queue multiple times.");
1077
1078 const WorkerID num_workers = NumWorkers();
1079
1080 // If we only have one thread running, using the worker queue is invalid.
1081 if (num_workers == 1u && queue == QueueType::WORKER)
1082 {
1083 queue = QueueType::NORMAL;
1084 }
1085
1086 ThreadLocalState* const worker = worker::GetCurrent();
1087 const TaskPtr task_ptr = task::PointerToTaskPtr(self);
1088
1089 self->q_type = queue;
1090
1091 switch (queue)
1092 {
1093 case QueueType::NORMAL:
1094 {
1095 task::SubmitQPushHelper(task_ptr, worker, &worker->normal_queue);
1096 break;
1097 }
1098 case QueueType::MAIN:
1099 {
1100 LockedQueue<TaskPtr>* const main_queue = &g_JobSystem->main_queue;
1101
1102 // NOTE(SR):
1103 // The only way `main_queue` will be emptied
1104 // is by the main thread, so there is a chance
1105 // that if it does not get flushed frequently
1106 // enough then we have this thread spinning indefinitely.
1107 //
1108 while (!main_queue->Push(task_ptr))
1109 {
1110 // If we could not push to the queue then just do some work.
1111 worker::TryRunTask(worker);
1112 }
1113 break;
1114 }
1115 case QueueType::WORKER:
1116 {
1117 task::SubmitQPushHelper(task_ptr, worker, &worker->worker_queue);
1118 break;
1119 }
1120 default:
1121#if defined(__GNUC__) // GCC, Clang, ICC
1122 __builtin_unreachable();
1123#elif defined(_MSC_VER) // MSVC
1124 __assume(false);
1125#endif
1126 break;
1127 }
1128
1129 if (queue != QueueType::MAIN)
1130 {
1131 const std::int32_t num_pending_jobs = g_JobSystem->num_available_jobs.fetch_add(1, std::memory_order_relaxed);
1132
1133 if (num_pending_jobs >= num_workers)
1134 {
1135 system::WakeUpAllWorkers();
1136 }
1137 else
1138 {
1139 system::WakeUpOneWorker();
1140 }
1141 }
1142}
bool Push(const T &value)
Definition: job_queue.hpp:60
std::uint16_t NumWorkers() noexcept
Returns the number of workers created by the system. This function can be called by any thread concur...
Definition: job_system.cpp:918

References g_JobSystem, JobAssert, k_InvalidQueueType, MAIN, Job::JobSystemContext::main_queue, NORMAL, Job::JobSystemContext::num_available_jobs, NumWorkers(), Job::LockedQueue< T >::Push(), and WORKER.

Referenced by ParallelFor(), ParallelInvoke(), and TaskSubmitAndWait().

◆ WaitOnTask()

void Job::WaitOnTask ( const Task *const  task)
noexcept

Waits until the specified task is done executing. This function blocks, but it runs other tasks while blocked so no time is wasted.

You may only call this function with a task created on the current 'Worker'.

It is a logic error to call this function on a task that has not been submitted (TaskSubmit).

Parameters
task: The task to wait to finish executing.

Definition at line 1144 of file job_system.cpp.

1145{
1146 const WorkerID worker_id = CurrentWorker();
1147
1148 JobAssert(task->q_type != k_InvalidQueueType, "The Task must be submitted to a queue before you wait on it.");
1149 JobAssert(task->owning_worker == worker_id, "You may only call this function with a task created on the current 'Worker'.");
1150
1151 system::WakeUpAllWorkers();
1152
1153 ThreadLocalState* const worker = system::GetWorker(worker_id);
1154
1155 while (!TaskIsDone(task))
1156 {
1157 worker::TryRunTask(worker);
1158 }
1159}
WorkerID CurrentWorker() noexcept
The current id of the current thread. This function can be called by any thread concurrently.
Definition: job_system.cpp:928

References CurrentWorker(), JobAssert, k_InvalidQueueType, and TaskIsDone().

Referenced by TaskSubmitAndWait().

◆ TaskSubmitAndWait()

void Job::TaskSubmitAndWait ( Task *const  self,
const QueueType  queue = QueueType::NORMAL 
)
noexcept

Same as calling TaskSubmit followed by WaitOnTask.

Parameters
self: The task to submit and wait to finish executing.
queue: The queue you want the task to run on.

Definition at line 1161 of file job_system.cpp.

1162{
1163 TaskSubmit(self, queue);
1164 WaitOnTask(self);
1165}

References TaskSubmit(), and WaitOnTask().

Referenced by ParallelReduce().

◆ PauseProcessor()

void Job::PauseProcessor ( )
noexcept

CPU pause instruction to indicate when you are in a spin wait loop.

Definition at line 1198 of file job_system.cpp.

1199{
1200 NativePause();
1201}
#define NativePause

References NativePause.

Referenced by Job::MPMCQueue::Commit().

◆ YieldTimeSlice()

void Job::YieldTimeSlice ( )
noexcept

Asks the OS to yield this thread's execution to another thread on the current CPU core.

Definition at line 1205 of file job_system.cpp.

1206{
1207 // Windows : SwitchToThread()
1208 // Linux : sched_yield()
1209 std::this_thread::yield();
1210}

◆ ParallelFor() [1/2]

template<typename F , typename S >
Task * Job::ParallelFor ( const std::size_t  start,
const std::size_t  count,
S &&  splitter,
F &&  fn,
Task *  parent = nullptr 
)

Parallel-for algorithm that recursively splits the work based on the splitter passed in.

Assumes all callable objects passed in can be invoked on multiple threads at the same time.

Template Parameters
F: Type of function object passed in. Must be callable like: fn(Task* task, std::size_t index)
S: Callable splitter, must be callable like: splitter(std::size_t count)
Parameters
start: Start index for the range to be parallelized.
count: Number of indices to process; start + count is the exclusive end of the range.
splitter: Callable splitter, must be callable like: splitter(std::size_t count). A range is split in half while it holds more than one index and the splitter returns true.
fn: Function object, must be callable like: fn(Job::Task* const task, const std::size_t index)
parent: Parent task to add this task as a child of.
Returns
The new task holding the work of the parallel for.

Definition at line 608 of file job_api.hpp.

609 {
610 return TaskMake(
611 [=, splitter = std::move(splitter), fn = std::move(fn)](Task* const task) {
612 if (count > 1u && splitter(count))
613 {
614 const std::size_t left_count = count / 2;
615 const std::size_t right_count = count - left_count;
616 const QueueType parent_q_type = detail::taskQType(task);
617
618 TaskSubmit(ParallelFor(start, left_count, splitter, fn, task), parent_q_type);
619 TaskSubmit(ParallelFor(start + left_count, right_count, splitter, fn, task), parent_q_type);
620 }
621 else
622 {
623 for (std::size_t offset = 0u; offset < count; ++offset)
624 {
625 fn(task, start + offset);
626 }
627 }
628 },
629 parent);
630 }
QueueType taskQType(const Task *task) noexcept
QueueType
Determines which threads the task will be allowed to run on.
Definition: job_api.hpp:46

References ParallelFor(), TaskMake(), Job::detail::taskQType(), and TaskSubmit().

Referenced by ParallelFor(), and ParallelReduce().
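
The recursive halving in the listing can be followed more easily without the task machinery. Below is a minimal serial sketch of the same splitting logic, assuming both halves run inline on the calling thread rather than being submitted as child tasks. `MaxCountSplitter` and `RecursiveFor` are hypothetical names, and the splitter shown is only one possible policy.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical splitter: keep splitting while a range holds more than `max_count` items.
struct MaxCountSplitter
{
  std::size_t max_count;

  bool operator()(const std::size_t count) const { return count > max_count; }
};

// Recursively halves [start, start + count) until the splitter declines,
// then calls `fn` once per index in the remaining range.
template<typename S, typename F>
void RecursiveFor(const std::size_t start, const std::size_t count, const S& splitter, const F& fn)
{
  if (count > 1u && splitter(count))
  {
    const std::size_t left_count  = count / 2;
    const std::size_t right_count = count - left_count;

    // The library submits these two halves as child tasks; here they run in sequence.
    RecursiveFor(start, left_count, splitter, fn);
    RecursiveFor(start + left_count, right_count, splitter, fn);
  }
  else
  {
    for (std::size_t offset = 0u; offset < count; ++offset)
    {
      fn(start + offset);
    }
  }
}
```

Whatever the splitter decides, the two halves always cover the parent range exactly, so every index is visited once.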

◆ ParallelReduce()

template<typename Splitter , typename Reducer >
Task * Job::ParallelReduce ( const std::size_t  start,
const std::size_t  count,
Splitter &&  splitter,
Reducer &&  reduce,
Task *  parent = nullptr 
)

Definition at line 633 of file job_api.hpp.

634 {
635 return TaskMake(
636 [=, splitter = std::move(splitter), reduce = std::move(reduce)](Task* const task) {
637 // NOTE(SR):
638 // Could also have a stride that increases each step.
639 // This would be bad for Cuda GPU (Shared Memory Bank Conflict)
640 // But good on CPU with better locality.
641 // https://developer.download.nvidia.com/assets/cuda/files/reduction.pdf
642
643 const QueueType parent_q_type = detail::taskQType(task);
644 std::size_t count_left = count;
645 while (count_left > 1)
646 {
647 const std::size_t stride = count_left / 2;
648
649 const auto ReduceRange = [stride, &reduce](Task* const sub_task, const std::size_t index) {
650 reduce(sub_task, index, index + stride);
651 };
652
653 TaskSubmitAndWait(ParallelFor(start, stride, splitter, ReduceRange, nullptr), parent_q_type);
654
655 if ((count_left & 1) != 0)
656 {
657 reduce(task, start, start + count_left - 1);
658 }
659
660 count_left = stride;
661 }
662 },
663 parent);
664 }

References ParallelFor(), TaskMake(), Job::detail::taskQType(), and TaskSubmitAndWait().
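
ParallelReduce has no description in this reference, so the following is only a reading of the listing: each pass folds the upper half of the remaining range into the lower half, and an odd trailing element is folded into the first slot. Below is a minimal serial sketch of that stride-halving loop, assuming a reducer of the form reduce(dst_index, src_index) without the Task argument. `StrideReduce` is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Serial model of the stride-halving loop. `reduce(dst, src)` folds element `src` into `dst`.
template<typename Reducer>
void StrideReduce(const std::size_t start, const std::size_t count, Reducer&& reduce)
{
  std::size_t count_left = count;

  while (count_left > 1)
  {
    const std::size_t stride = count_left / 2;

    // In the listing these pairwise reductions are distributed with ParallelFor.
    for (std::size_t index = start; index < start + stride; ++index)
    {
      reduce(index, index + stride);
    }

    // An odd count leaves one trailing element unpaired; fold it into the first slot.
    if ((count_left & 1) != 0)
    {
      reduce(start, start + count_left - 1);
    }

    count_left = stride;
  }
}
```

With an additive reducer such as values[dst] += values[src], the total ends up in the element at `start`.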

◆ ParallelFor() [2/2]

template<typename T , typename F , typename S >
Task * Job::ParallelFor ( T *const  data,
const std::size_t  count,
S &&  splitter,
F &&  fn,
Task *  parent = nullptr 
)

Parallel-for algorithm that recursively splits the work based on the splitter passed in. This version is a helper for array data.

Assumes all callable objects passed in can be invoked on multiple threads at the same time.

Template Parameters
T: Element type of the array to process.
F: Type of function object passed in. Must be callable like: fn(Task* task, T* item)
S: Callable splitter, must be callable like: splitter(std::size_t count)
Parameters
data: The start of the array to process.
count: The number of elements in the data array.
splitter: Callable splitter, must be callable like: splitter(std::size_t count)
fn: Function object, must be callable like: fn(Job::Task* task, T* item). It is invoked once per element with a pointer to that element.
parent: Parent task to add this task as a child of.
Returns
The new task holding the work of the parallel for.

Definition at line 702 of file job_api.hpp.

703 {
704 return ParallelFor(
705 std::size_t(0), count, std::move(splitter), [data, fn = std::move(fn)](Task* const task, const std::size_t index) {
706 fn(task, data + index);
707 },
708 parent);
709 }

References ParallelFor().

◆ ParallelInvoke()

template<typename... F>
Task * Job::ParallelInvoke ( Task *const  parent,
F &&...  fns 
)

Invokes each passed in function object in parallel.

Template Parameters
...F: The function object types. Must be callable like: fn(Task* task)
Parameters
parent: Parent task to add this task as a child of.
...fns: Function objects, each callable like: fn(Task* task)
Returns
The new task holding the work of the parallel invoke.

Definition at line 729 of file job_api.hpp.

730 {
731 return TaskMake(
732 [=](Task* const parent_task) mutable {
733 const QueueType parent_q_type = detail::taskQType(parent_task);
734 (TaskSubmit(TaskMake(std::move(fns), parent_task), parent_q_type), ...);
735 },
736 parent);
737 }

References TaskMake(), Job::detail::taskQType(), and TaskSubmit().
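
The listing expands the parameter pack with a C++17 fold over the comma operator, producing one TaskSubmit call per function object. Below is a minimal standalone sketch of that expansion, assuming the callables take no arguments and are invoked directly instead of being wrapped in tasks. `InvokeEach` is a hypothetical name.

```cpp
#include <cassert>
#include <utility>

// Calls every function object once, left to right, using a comma fold expression.
template<typename... F>
void InvokeEach(F&&... fns)
{
  (std::forward<F>(fns)(), ...);
}
```

A comma fold evaluates its operands in order, so in this serial sketch the callables run in the order they were passed. Once each callable is submitted as its own task, as in the listing, the execution order is no longer defined by the fold.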

Variable Documentation

◆ k_FalseSharingPadSize

constexpr std::size_t Job::k_FalseSharingPadSize = std::hardware_destructive_interference_size
static constexpr

Definition at line 30 of file job_queue.hpp.
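
A pad size like this is typically used to give each concurrently written variable its own cache line. Below is a minimal sketch of that use, assuming a 64-byte fallback when the standard library does not provide std::hardware_destructive_interference_size. `k_PadSize` and `PaddedCounter` are hypothetical names, and how the library itself applies k_FalseSharingPadSize is not shown on this page.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <new>

#if defined(__cpp_lib_hardware_interference_size)
constexpr std::size_t k_PadSize = std::hardware_destructive_interference_size;
#else
constexpr std::size_t k_PadSize = 64u;  // Assumed fallback; a common cache line size.
#endif

// Each counter is aligned to the pad size, so neighbouring counters
// in an array never share a pad-sized block.
struct alignas(k_PadSize) PaddedCounter
{
  std::atomic<std::size_t> value{0};
};

static_assert(alignof(PaddedCounter) == k_PadSize, "counter must be aligned to the pad size");
static_assert(sizeof(PaddedCounter) % k_PadSize == 0, "counter must occupy whole pad-sized blocks");
```

The cost is memory: each counter now occupies a full pad-sized block instead of a few bytes.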

◆ k_CachelineSize

constexpr std::size_t Job::k_CachelineSize = 64u
static constexpr

Definition at line 74 of file job_system.cpp.

◆ k_ExpectedTaskSize

constexpr std::size_t Job::k_ExpectedTaskSize = std::max(std::size_t(128u), k_CachelineSize)
static constexpr

Definition at line 77 of file job_system.cpp.

◆ k_InvalidQueueType

constexpr QueueType Job::k_InvalidQueueType = QueueType(int(QueueType::WORKER) + 1)
static constexpr

Definition at line 78 of file job_system.cpp.

Referenced by TaskAddContinuation(), TaskIncRef(), TaskSubmit(), and WaitOnTask().

◆ NullTaskHandle

constexpr TaskHandle Job::NullTaskHandle = std::numeric_limits<TaskHandle>::max()
static constexpr

Definition at line 89 of file job_system.cpp.

Referenced by Job::TaskPtr::isNull(), and Job::TaskPtr::TaskPtr().