BluFedora Job System v1.0.0
This is a C++ job system library for use in game engines.
job_api.hpp
Go to the documentation of this file.
1/******************************************************************************/
18/******************************************************************************/
19#ifndef JOB_API_HPP
20#define JOB_API_HPP
21
#include "job_assert.hpp" // JobAssert

#include <cstddef>     // size_t
#include <cstdint>     // sized integer types
#include <new>         // placement new
#include <type_traits> // remove_cv_t, remove_reference_t
#include <utility>     // forward, move
27
28namespace Job
29{
// Fwd Declarations

/// A unit of work scheduled by the job system.
/// Only ever handled through pointers in this header.
struct Task;

// Enums

/// Determines which threads a task will be allowed to run on.
enum class QueueType : std::uint8_t
{
  NORMAL = 0,  ///< Tasks in this queue will run on either the main or worker threads.
  MAIN   = 1,  ///< Tasks in this queue will only be run by the main thread.
  WORKER = 2,  ///< Tasks in this queue will never run on the main thread.
};

// Type Aliases

/// The id type of each worker thread.
using WorkerID = std::uint16_t;

/// The signature of the function a single Task runs.
using TaskFn = void (*)(Task*);
55
56 // Private
57
58 namespace detail
59 {
60 QueueType taskQType(const Task* task) noexcept;
61 void* taskGetPrivateUserData(Task* const task, const std::size_t alignment) noexcept;
62 void* taskReservePrivateUserData(Task* const task, const std::size_t num_bytes, const std::size_t alignment) noexcept;
63 bool mainQueueTryRunTask(void) noexcept;
64 } // namespace detail
65
/// Makes some system calls to grab the number of threads / processors on the device.
std::size_t NumSystemThreads() noexcept;

// Main System API

/// The runtime configuration for the Job System.
struct JobSystemCreateOptions
{
  std::uint8_t  num_user_threads   = 0;
  std::uint8_t  num_threads        = 0;
  std::uint16_t main_queue_size    = 256;   ///< NOTE(review): presumably the capacity of the MAIN queue.
  std::uint16_t normal_queue_size  = 1024;  ///< NOTE(review): presumably the capacity of the NORMAL queue.
  std::uint16_t worker_queue_size  = 32;    ///< NOTE(review): presumably the capacity of each WORKER queue.
  std::uint64_t job_steal_rng_seed = 0u;
};

/// The memory requirements for a given JobSystemCreateOptions configuration.
/// Initialize takes this together with an optional memory pointer.
struct JobSystemMemoryRequirements
{
  JobSystemCreateOptions options;    ///< The options used to create the memory requirements.
  std::size_t            byte_size;  ///< The number of bytes the job system needs.
  std::size_t            alignment;  ///< The base alignment the memory pointer should have.

  /// Computes the requirements for `options` (defined in job_system.cpp).
  /// Intentionally not `explicit` so `Initialize(... = {})` can copy-list-initialize it.
  JobSystemMemoryRequirements(const JobSystemCreateOptions& options = {}) noexcept;
};
106
120 void Initialize(const JobSystemMemoryRequirements& memory_requirements = {}, void* const memory = nullptr) noexcept;
121
132 void SetupUserThread();
133
142 const char* ProcessorArchitectureName() noexcept;
143
152 std::uint16_t NumWorkers() noexcept;
153
164 WorkerID CurrentWorker() noexcept;
165
176 bool IsMainThread() noexcept;
177
186 void Shutdown() noexcept;
187
// Task API

/// A buffer for user data you can write to; possibly large enough to store task data inline.
struct TaskData
{
  void*       ptr;   ///< The start of the buffer you may write to.
  std::size_t size;  ///< The size of the buffer, in bytes.
};
204
218 Task* TaskMake(const TaskFn function, Task* const parent = nullptr) noexcept;
219
233 TaskData TaskGetData(Task* const task, const std::size_t alignment) noexcept;
234
251 void TaskAddContinuation(Task* const self, Task* const continuation, const QueueType queue = QueueType::NORMAL) noexcept;
252
267 template<typename T>
268 T* TaskDataAs(Task* const task) noexcept;
269
286 template<typename T, typename... Args>
287 void TaskEmplaceData(Task* const task, Args&&... args);
288
302 template<typename T>
303 void TaskSetData(Task* const task, const T& data);
304
315 template<typename T>
316 void TaskDestructData(Task* const task);
317
334 template<typename Closure>
335 Task* TaskMake(Closure&& function, Task* const parent = nullptr);
336
346 void TaskIncRef(Task* const task) noexcept;
347
355 void TaskDecRef(Task* const task) noexcept;
356
373 bool TaskIsDone(const Task* const task) noexcept;
374
392 template<typename ConditionFn>
393 void TickMainQueue(ConditionFn&& condition) noexcept
394 {
395 do
396 {
398 {
399 break;
400 }
401 } while (condition());
402 }
403
414 inline void TickMainQueue() noexcept
415 {
416 TickMainQueue([]() { return true; });
417 }
418
433 void TaskSubmit(Task* const self, const QueueType queue = QueueType::NORMAL) noexcept;
434
447 void WaitOnTask(const Task* const task) noexcept;
448
458 void TaskSubmitAndWait(Task* const self, const QueueType queue = QueueType::NORMAL) noexcept;
459
464 void PauseProcessor() noexcept;
465
470 void YieldTimeSlice() noexcept;
471
472 // Template Function Implementation //
473
474 template<typename T>
475 T* TaskDataAs(Task* const task) noexcept
476 {
477 const TaskData data = TaskGetData(task, alignof(T));
478
479 return data.size >= sizeof(T) ? static_cast<T*>(data.ptr) : nullptr;
480 }
481
482 template<typename T, typename... Args>
483 void TaskEmplaceData(Task* const task, Args&&... args)
484 {
485 const TaskData data = TaskGetData(task, alignof(T));
486
487 JobAssert(data.size >= sizeof(T), "Attempting to store an object too large to fit within a task's storage buffer.");
488
489 new (data.ptr) T(std::forward<Args>(args)...);
490 }
491
492 template<typename T>
493 void TaskSetData(Task* const task, const T& data)
494 {
495 TaskEmplaceData<T>(task, data);
496 }
497
498 template<typename T>
500 {
501 TaskDataAs<T>(task)->~T();
502 }
503
504 template<typename Closure>
505 Task* TaskMake(Closure&& function, Task* const parent)
506 {
507 Task* const task = TaskMake(
508 +[](Task* const task) -> void {
509 Closure& function = *static_cast<Closure*>(detail::taskGetPrivateUserData(task, alignof(Closure)));
510 function(task);
511 function.~Closure();
512 },
513 parent);
514 void* const private_data = detail::taskReservePrivateUserData(task, sizeof(Closure), alignof(Closure));
515 ::new (private_data) Closure(std::forward<Closure>(function));
516
517 return task;
518 }
519
520 // Parallel Algorithms API
521
522 struct Splitter
523 {
545 static Splitter EvenSplit(const std::size_t total_num_items, std::size_t num_groups_per_thread = 1u)
546 {
547 if (num_groups_per_thread < 1u)
548 {
549 num_groups_per_thread = 1u;
550 }
551
552 return Splitter{(total_num_items / num_groups_per_thread) / NumWorkers()};
553 }
554
555 static constexpr Splitter MaxItemsPerTask(const std::size_t max_items)
556 {
557 return Splitter{max_items};
558 }
559
560 template<typename T>
561 static constexpr Splitter MaxDataSize(const std::size_t max_data_size)
562 {
563 return Splitter{max_data_size / sizeof(T)};
564 }
565
566 std::size_t max_count = 0u;
567
568 constexpr bool operator()(const std::size_t count) const { return count > max_count; }
569 };
570
603 template<typename F, typename S>
604 Task* ParallelFor(const std::size_t start, const std::size_t count, S&& splitter, F&& fn, Task* parent = nullptr)
605 {
606 return TaskMake(
607 [=, splitter = std::move(splitter), fn = std::move(fn)](Task* const task) {
608 if (count > 1u && splitter(count))
609 {
610 const std::size_t left_count = count / 2;
611 const std::size_t right_count = count - left_count;
612 const QueueType parent_q_type = detail::taskQType(task);
613
614 TaskSubmit(ParallelFor(start, left_count, splitter, fn, task), parent_q_type);
615 TaskSubmit(ParallelFor(start + left_count, right_count, splitter, fn, task), parent_q_type);
616 }
617 else
618 {
619 for (std::size_t offset = 0u; offset < count; ++offset)
620 {
621 fn(task, start + offset);
622 }
623 }
624 },
625 parent);
626 }
627
628 template<typename Splitter, typename Reducer>
629 Task* ParallelReduce(const std::size_t start, const std::size_t count, Splitter&& splitter, Reducer&& reduce, Task* parent = nullptr)
630 {
631 return TaskMake(
632 [=, splitter = std::move(splitter), reduce = std::move(reduce)](Task* const task) {
633 // NOTE(SR):
634 // Could also have a stride that increases each step.
635 // This would be bad for Cuda GPU (Shared Memory Bank Conflict)
636 // But good on CPU with better locality.
637 // https://developer.download.nvidia.com/assets/cuda/files/reduction.pdf
638
639 const QueueType parent_q_type = detail::taskQType(task);
640 std::size_t count_left = count;
641 while (count_left > 1)
642 {
643 const std::size_t stride = count_left / 2;
644
645 const auto ReduceRange = [stride, &reduce](Task* const sub_task, const std::size_t index) {
646 reduce(sub_task, index, index + stride);
647 };
648
649 TaskSubmitAndWait(ParallelFor(start, stride, splitter, ReduceRange, nullptr), parent_q_type);
650
651 if ((count_left & 1) != 0)
652 {
653 reduce(task, start, start + count_left - 1);
654 }
655
656 count_left = stride;
657 }
658 },
659 parent);
660 }
661
697 template<typename T, typename F, typename S>
698 Task* ParallelFor(T* const data, const std::size_t count, S&& splitter, F&& fn, Task* parent = nullptr)
699 {
700 return ParallelFor(
701 std::size_t(0), count, std::move(splitter), [data, fn = std::move(fn)](Task* const task, const std::size_t index) {
702 fn(task, data + index);
703 },
704 parent);
705 }
706
724 template<typename... F>
725 Task* ParallelInvoke(Task* const parent, F&&... fns)
726 {
727 return TaskMake(
728 [=](Task* const parent_task) mutable {
729 const QueueType parent_q_type = detail::taskQType(parent_task);
730 (TaskSubmit(TaskMake(std::move(fns), parent_task), parent_q_type), ...);
731 },
732 parent);
733 }
734} // namespace Job
735
736#endif // JOB_API_HPP
737
738/******************************************************************************/
739/*
740 MIT License
741
742 Copyright (c) 2020-2025 Shareef Abdoul-Raheem
743
744 Permission is hereby granted, free of charge, to any person obtaining a copy
745 of this software and associated documentation files (the "Software"), to deal
746 in the Software without restriction, including without limitation the rights
747 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
748 copies of the Software, and to permit persons to whom the Software is
749 furnished to do so, subject to the following conditions:
750
751 The above copyright notice and this permission notice shall be included in all
752 copies or substantial portions of the Software.
753
754 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
755 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
756 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
757 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
758 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
759 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
760 SOFTWARE.
761*/
762/******************************************************************************/
Assertion macro for this library.
#define JobAssert(expr, msg)
Definition: job_assert.hpp:27
void * taskReservePrivateUserData(Task *const task, const std::size_t num_bytes, const std::size_t alignment) noexcept
void * taskGetPrivateUserData(Task *const task, const std::size_t alignment) noexcept
QueueType taskQType(const Task *task) noexcept
bool mainQueueTryRunTask(void) noexcept
Definition: job_api.hpp:29
TaskData TaskGetData(Task *const task, const std::size_t alignment) noexcept
Returns the user-data buffer you may write to in order to pass data to your TaskFn.
std::size_t size
The size of the buffer.
Definition: job_api.hpp:202
void * ptr
The start of the buffer you may write to.
Definition: job_api.hpp:201
void TaskSubmitAndWait(Task *const self, const QueueType queue=QueueType::NORMAL) noexcept
Same as calling TaskSubmit followed by WaitOnTask.
void WaitOnTask(const Task *const task) noexcept
Waits until the specified task is done executing. This function blocks, but does work while it is blocked.
void(*)(Task *) TaskFn
The signature of the type of function for a single Task.
Definition: job_api.hpp:54
T * TaskDataAs(Task *const task) noexcept
Grabs the user-data pointer as the T you specified, or nullptr if the buffer is smaller than sizeof(T). No other safety is guaranteed; this is essentially a cast.
Definition: job_api.hpp:475
void TaskAddContinuation(Task *const self, Task *const continuation, const QueueType queue=QueueType::NORMAL) noexcept
A 'continuation' is a task that will be added to a queue after the 'self' Task has finished running.
Task * ParallelInvoke(Task *const parent, F &&... fns)
Invokes each passed in function object in parallel.
Definition: job_api.hpp:725
void YieldTimeSlice() noexcept
Asks the OS to yield this thread's execution to another thread on the current CPU core.
std::uint16_t NumWorkers() noexcept
Returns the number of workers created by the system. This function can be called by any thread concurrently.
Definition: job_system.cpp:916
QueueType
Determines which threads the task will be allowed to run on.
Definition: job_api.hpp:45
@ MAIN
Tasks in this queue will only be run by the main thread.
@ NORMAL
Tasks in this queue will run on either the main or worker threads.
@ WORKER
Tasks in this queue will never run on the main thread.
Task * TaskMake(const TaskFn function, Task *const parent=nullptr) noexcept
Creates a new Task that should be later submitted by calling 'TaskSubmit'.
Definition: job_system.cpp:984
const char * ProcessorArchitectureName() noexcept
An implementation-defined name for the CPU architecture of the device. This function can be called by any thread concurrently.
Definition: job_system.cpp:921
void TaskSetData(Task *const task, const T &data)
Copies 'data' into the user-data buffer by calling the T copy constructor.
Definition: job_api.hpp:493
void SetupUserThread()
Must be called in the callstack of the thread to be set up.
Definition: job_system.cpp:854
void Shutdown() noexcept
This will deallocate any memory used by the system and shut down any threads created by 'Job::Initialize'.
Definition: job_system.cpp:937
Task * ParallelFor(const std::size_t start, const std::size_t count, S &&splitter, F &&fn, Task *parent=nullptr)
Parallel for algorithm: splits the work up recursively based on the splitter passed in.
Definition: job_api.hpp:604
void TaskEmplaceData(Task *const task, Args &&... args)
Calls the constructor of T on the user-data buffer.
Definition: job_api.hpp:483
WorkerID CurrentWorker() noexcept
The id of the current thread. This function can be called by any thread concurrently.
Definition: job_system.cpp:926
void TaskIncRef(Task *const task) noexcept
Increments the task's ref count, preventing it from being garbage collected.
std::uint16_t WorkerID
The id type of each worker thread.
Definition: job_api.hpp:53
bool IsMainThread() noexcept
Allows for querying if we are currently executing in the main thread.
Definition: job_system.cpp:932
void PauseProcessor() noexcept
CPU pause instruction to indicate when you are in a spin wait loop.
void TaskDecRef(Task *const task) noexcept
Decrements the task's ref count, allowing it to be garbage collected.
void TickMainQueue(ConditionFn &&condition) noexcept
Runs tasks from the main queue as long as there are tasks available and condition returns true.
Definition: job_api.hpp:393
std::size_t NumSystemThreads() noexcept
Makes some system calls to grab the number of threads / processors on the device. This function can be called by any thread concurrently.
Definition: job_system.cpp:864
void Initialize(const JobSystemMemoryRequirements &memory_requirements={}, void *const memory=nullptr) noexcept
Sets up the Job system and creates all the worker threads. The thread that calls 'Job::Initialize' is...
Definition: job_system.cpp:741
void TaskSubmit(Task *const self, const QueueType queue=QueueType::NORMAL) noexcept
Submits the task to the specified queue.
void TaskDestructData(Task *const task)
Helper for calling destructor on the task's user data.
Definition: job_api.hpp:499
Task * ParallelReduce(const std::size_t start, const std::size_t count, Splitter &&splitter, Reducer &&reduce, Task *parent=nullptr)
Definition: job_api.hpp:629
bool TaskIsDone(const Task *const task) noexcept
Returns the done status of the task.
The runtime configuration for the Job System.
Definition: job_api.hpp:85
A buffer for user data you can write to, possibly large enough to store task data inline.
Definition: job_api.hpp:200
The memory requirements for a given configuration JobSystemCreateOptions.
Definition: job_api.hpp:99
JobSystemMemoryRequirements(const JobSystemCreateOptions &options={}) noexcept
Definition: job_system.cpp:720
std::size_t alignment
The base alignment the memory pointer should have.
Definition: job_api.hpp:102
std::size_t byte_size
The number of bytes the job system needs.
Definition: job_api.hpp:101
JobSystemCreateOptions options
The options used to create the memory requirements.
Definition: job_api.hpp:100
std::size_t max_count
Definition: job_api.hpp:566
static Splitter EvenSplit(const std::size_t total_num_items, std::size_t num_groups_per_thread=1u)
Splits work evenly across the threads depending on the number of workers.
Definition: job_api.hpp:545
static constexpr Splitter MaxDataSize(const std::size_t max_data_size)
Definition: job_api.hpp:561
constexpr bool operator()(const std::size_t count) const
Definition: job_api.hpp:568
static constexpr Splitter MaxItemsPerTask(const std::size_t max_items)
Definition: job_api.hpp:555