BluFedora Job System v1.0.0
This is a C++ job system library for use in game engines.
job_api.hpp
Go to the documentation of this file.
1/******************************************************************************/
18/******************************************************************************/
19#ifndef JOB_API_HPP
20#define JOB_API_HPP
21
22#include "job_assert.hpp" // JobAssert
23
24#include <cstdint> // sized integer types
25#include <new> // placement new
26#include <utility> // forward, move
27
28namespace Job
29{
30 // Fwd Declarations
31
36 struct Task;
37
38 // Enums
39
44 enum class QueueType : std::uint8_t // Determines which threads the task will be allowed to run on.
45 {
46 NORMAL = 0, // Tasks in this queue will run on either the main or worker threads.
47 MAIN = 1, // Tasks in this queue will only be run by the main thread.
48 WORKER = 2, // Tasks in this queue will never run on the main thread.
49 };
50
51 // Type Aliases
52
53 using WorkerID = std::uint16_t; // The id type of each worker thread.
54 using TaskFn = void (*)(Task*); // The signature of the type of function for a single Task.
55
56 // Private
57
58 namespace detail // Helpers referenced by the inline/template code later in this header (listed under "Private").
59 {
60 QueueType taskQType(const Task* task) noexcept; // Its result is passed as the queue argument when the parallel algorithms submit their sub-tasks.
61 void* taskGetPrivateUserData(Task* const task, const std::size_t alignment) noexcept; // Used by the closure overload of TaskMake to locate the stored closure when the task runs.
62 void* taskReservePrivateUserData(Task* const task, const std::size_t num_bytes, const std::size_t alignment) noexcept; // Used by the closure overload of TaskMake to obtain the storage the closure is constructed into.
63 bool mainQueueTryRunTask(void) noexcept; // NOTE(review): presumably returns whether a main-queue task was run - confirm against job_system.cpp.
64 } // namespace detail
65
76 std::size_t NumSystemThreads() noexcept; // Makes some system calls to grab the number of threads / processors on the device.
77
78 // Main System API
79
84 struct JobSystemCreateOptions // The runtime configuration for the Job System.
85 {
86 std::uint8_t num_user_threads = 0;
87 std::uint8_t num_threads = 0;
88 std::uint16_t main_queue_size = 256;
89 std::uint16_t normal_queue_size = 1024;
90 std::uint16_t worker_queue_size = 32;
91 std::uint64_t job_steal_rng_seed = 0u;
92 };
93
98 struct JobSystemMemoryRequirements // The memory requirements for a given configuration JobSystemCreateOptions.
99 {
100 JobSystemCreateOptions options; // The options used to create the memory requirements.
101 std::size_t byte_size; // The number of bytes the job system needed.
102 std::size_t alignment; // The base alignment the pointer should be.
103
104 JobSystemMemoryRequirements(const JobSystemCreateOptions& options = {}) noexcept; // Defined in job_system.cpp.
105 };
106
123 void Initialize(const JobSystemMemoryRequirements& memory_requirements = {}, void* const memory = nullptr) noexcept; // Sets up the Job system and creates all the worker threads.
124
135 void SetupUserThread(); // Must be called in the callstack of the thread to be setup.
136
145 const char* ProcessorArchitectureName() noexcept; // An implementation defined name for the CPU architecture of the device.
146
155 std::uint16_t NumWorkers() noexcept; // Returns the number of workers created by the system.
156
167 WorkerID CurrentWorker() noexcept; // The id of the current thread; can be called by any thread concurrently.
168
179 bool IsMainThread() noexcept; // Allows for querying if we are currently executing in the main thread.
180
189 void Shutdown() noexcept; // Deallocates any memory used by the system and shuts down the threads it created.
190
191 // Task API
192
202 struct TaskData // A buffer for user-data you can write to, maybe large enough to store task data inline.
203 {
204 void* ptr; // The start of the buffer you may write to.
205 std::size_t size; // The size of the buffer.
206 };
207
221 Task* TaskMake(const TaskFn function, Task* const parent = nullptr) noexcept; // Creates a new Task that should be later submitted by calling 'TaskSubmit'.
222
236 TaskData TaskGetData(Task* const task, const std::size_t alignment) noexcept; // Returns the user-data buffer you may write to in order to get data into your TaskFn.
237
254 void TaskAddContinuation(Task* const self, Task* const continuation, const QueueType queue = QueueType::NORMAL) noexcept; // A 'continuation' is a task that will be added to a queue after the 'self' Task has finished running.
255
270 template<typename T>
271 T* TaskDataAs(Task* const task) noexcept; // Returns the user-data buffer as a T*, or nullptr when the buffer is smaller than sizeof(T).
272
289 template<typename T, typename... Args>
290 void TaskEmplaceData(Task* const task, Args&&... args); // Calls the constructor of T on the user-data buffer.
291
305 template<typename T>
306 void TaskSetData(Task* const task, const T& data); // Copies 'data' into the user-data buffer by calling the T copy constructor.
307
318 template<typename T>
319 void TaskDestructData(Task* const task); // Helper for calling destructor on the task's user data.
320
337 template<typename Closure>
338 Task* TaskMake(Closure&& function, Task* const parent = nullptr); // Creates a Task that invokes 'function'; the callable is stored in the task's private user-data and destroyed after it has run.
339
349 void TaskIncRef(Task* const task) noexcept; // Increments the task's ref count preventing it from being garbage collected.
350
358 void TaskDecRef(Task* const task) noexcept; // Decrements the task's ref count allowing it to be garbage collected.
359
376 bool TaskIsDone(const Task* const task) noexcept; // Returns the done status of the task.
377
395 template<typename ConditionFn>
396 void TickMainQueue(ConditionFn&& condition) noexcept // Runs tasks from the main queue as long as there are tasks available and condition returns true.
397 {
398 do
399 {
400 if (!detail::mainQueueTryRunTask()) // No task could be run from the main queue: stop instead of spinning on 'condition'.
401 {
402 break;
403 }
404 } while (condition()); // 'condition' is only evaluated after a task has been run, so at least one attempt is always made.
405 }
406
417 inline void TickMainQueue() noexcept // Convenience overload: ticks the main queue with a condition that is always true.
418 {
419 TickMainQueue([]() { return true; }); // Delegates to the ConditionFn overload.
420 }
421
436 void TaskSubmit(Task* const self, const QueueType queue = QueueType::NORMAL) noexcept; // Submits the task to the specified queue.
437
450 void WaitOnTask(const Task* const task) noexcept; // Waits until the specified task is done executing; this function will block but do work while waiting.
451
461 void TaskSubmitAndWait(Task* const self, const QueueType queue = QueueType::NORMAL) noexcept; // Same as calling TaskSubmit followed by WaitOnTask.
462
467 void PauseProcessor() noexcept; // CPU pause instruction to indicate when you are in a spin wait loop.
468
473 void YieldTimeSlice() noexcept; // Asks the OS to yield this thread's execution to another thread on the current cpu core.
474
475 // Template Function Implementation //
476
477 template<typename T>
478 T* TaskDataAs(Task* const task) noexcept // Views the task's user-data buffer as a T.
479 {
480 const TaskData data = TaskGetData(task, alignof(T)); // alignof(T) is passed as the requested alignment.
481
482 return data.size >= sizeof(T) ? static_cast<T*>(data.ptr) : nullptr; // nullptr when the buffer is too small to hold a T.
483 }
484
485 template<typename T, typename... Args>
486 void TaskEmplaceData(Task* const task, Args&&... args) // Constructs a T in the task's user-data buffer from 'args'.
487 {
488 const TaskData data = TaskGetData(task, alignof(T)); // alignof(T) is passed as the requested alignment.
489
490 JobAssert(data.size >= sizeof(T), "Attempting to store an object too large to fit within a task's storage buffer.");
491
492 new (data.ptr) T(std::forward<Args>(args)...); // Placement new: no allocation, T is constructed directly in the buffer.
493 }
494
495 template<typename T>
496 void TaskSetData(Task* const task, const T& data) // Copies 'data' into the user-data buffer by calling the T copy constructor.
497 {
498 TaskEmplaceData<T>(task, data); // Copy-constructs through the emplace helper.
499 }
500
501 template<typename T>
502 void TaskDestructData(Task* const task) // Helper for calling destructor on the task's user data.
503 {
504 TaskDataAs<T>(task)->~T(); // TaskDataAs yields nullptr when the buffer is smaller than sizeof(T); only call this after a T has been stored.
505 }
506
507 template<typename Closure>
508 Task* TaskMake(Closure&& function, Task* const parent) // Wraps a callable in a Task: the callable lives in the task's private user-data and is invoked when the task runs.
509 {
510 Task* const task = TaskMake(
511 +[](Task* const task) -> void { // Unary '+' turns the captureless lambda into a function pointer for the TaskFn overload.
512 Closure& function = *static_cast<Closure*>(detail::taskGetPrivateUserData(task, alignof(Closure))); // Recover the callable constructed at the end of this function.
513 function(task);
514 function.~Closure(); // Destroy the callable once it has run; the storage itself is not released here.
515 },
516 parent);
517 void* const private_data = detail::taskReservePrivateUserData(task, sizeof(Closure), alignof(Closure)); // Storage sized and aligned for the callable.
518 ::new (private_data) Closure(std::forward<Closure>(function)); // NOTE(review): Closure is used as an object type (Closure*, placement new), so this appears to compile only for rvalue callables - confirm whether lvalues should be supported (would need std::decay_t<Closure>).
519
520 return task;
521 }
522
523 // Parallel Algorithms API
524
525 struct Splitter // Predicate object consulted by ParallelFor: operator() returning true means "split this range further".
526 {
548 static Splitter EvenSplit(const std::size_t total_num_items, std::size_t num_groups_per_thread = 1u) // Splits work evenly across the threads depending on the number of workers.
549 {
550 if (num_groups_per_thread < 1u) // Clamp to 1 so the division below never divides by this parameter being zero.
551 {
552 num_groups_per_thread = 1u;
553 }
554
555 return Splitter{(total_num_items / num_groups_per_thread) / NumWorkers()}; // NOTE(review): divides by NumWorkers() - presumably non-zero once the system is initialized; confirm.
556 }
557
558 static constexpr Splitter MaxItemsPerTask(const std::size_t max_items) // Ranges larger than 'max_items' are split.
559 {
560 return Splitter{max_items};
561 }
562
563 template<typename T>
564 static constexpr Splitter MaxDataSize(const std::size_t max_data_size) // Ranges holding more than max_data_size / sizeof(T) items are split.
565 {
566 return Splitter{max_data_size / sizeof(T)};
567 }
568
569 std::size_t max_count = 0u; // Largest item count that is not split.
570
571 constexpr bool operator()(const std::size_t count) const { return count > max_count; } // true when 'count' exceeds max_count.
572 };
573
606 template<typename F, typename S>
607 Task* ParallelFor(const std::size_t start, const std::size_t count, S&& splitter, F&& fn, Task* parent = nullptr) // Parallel for over [start, start + count): recursively halves the range while the splitter asks for it. The returned task is not submitted here.
608 {
609 return TaskMake(
610 [=, splitter = std::forward<S>(splitter), fn = std::forward<F>(fn)](Task* const task) { // std::forward (not std::move): S/F are forwarding references, so lvalue arguments are copied rather than moved-from.
611 if (count > 1u && splitter(count)) // Split only while more than one item remains and the splitter returns true.
612 {
613 const std::size_t left_count = count / 2;
614 const std::size_t right_count = count - left_count; // Right half takes the extra item when 'count' is odd.
615 const QueueType parent_q_type = detail::taskQType(task);
616
617 TaskSubmit(ParallelFor(start, left_count, splitter, fn, task), parent_q_type); // Both halves are created with this task as their parent.
618 TaskSubmit(ParallelFor(start + left_count, right_count, splitter, fn, task), parent_q_type);
619 }
620 else
621 {
622 for (std::size_t offset = 0u; offset < count; ++offset) // Leaf: run the items of this range sequentially.
623 {
624 fn(task, start + offset);
625 }
626 }
627 },
628 parent);
629 }
630
631 template<typename Splitter, typename Reducer>
632 Task* ParallelReduce(const std::size_t start, const std::size_t count, Splitter&& splitter, Reducer&& reduce, Task* parent = nullptr) // Pairwise reduction over [start, start + count): each pass calls reduce(task, i, i + stride) and halves the active range. The returned task is not submitted here.
633 {
634 return TaskMake(
635 [=, splitter = std::forward<Splitter>(splitter), reduce = std::forward<Reducer>(reduce)](Task* const task) { // std::forward (not std::move): these are forwarding references, so lvalue arguments are copied rather than moved-from.
636 // NOTE(SR):
637 // Could also have a stride that increases each step.
638 // This would be bad for Cuda GPU (Shared Memory Bank Conflict)
639 // But good on CPU with better locality.
640 // https://developer.download.nvidia.com/assets/cuda/files/reduction.pdf
641
642 const QueueType parent_q_type = detail::taskQType(task);
643 std::size_t count_left = count;
644 while (count_left > 1)
645 {
646 const std::size_t stride = count_left / 2;
647
648 const auto ReduceRange = [stride, &reduce](Task* const sub_task, const std::size_t index) { // 'reduce' is captured by reference; safe because the pass is waited on below.
649 reduce(sub_task, index, index + stride);
650 };
651
652 TaskSubmitAndWait(ParallelFor(start, stride, splitter, ReduceRange, nullptr), parent_q_type); // Each pass completes before the next one starts.
653
654 if ((count_left & 1) != 0) // Odd count: the last element has no partner in this pass, fold it into 'start'.
655 {
656 reduce(task, start, start + count_left - 1);
657 }
658
659 count_left = stride;
660 }
661 },
662 parent);
663 }
664
700 template<typename T, typename F, typename S>
701 Task* ParallelFor(T* const data, const std::size_t count, S&& splitter, F&& fn, Task* parent = nullptr) // Array flavour of ParallelFor: 'fn' receives a pointer to each of the 'count' elements starting at 'data'.
702 {
703 return ParallelFor(
704 std::size_t(0), count, std::forward<S>(splitter), [data, fn = std::forward<F>(fn)](Task* const task, const std::size_t index) { // std::forward (not std::move): S/F are forwarding references, so lvalue arguments are copied rather than moved-from.
705 fn(task, data + index); // Translate the index into an element pointer.
706 },
707 parent);
708 }
709
727 template<typename... F>
728 Task* ParallelInvoke(Task* const parent, F&&... fns) // Invokes each passed in function object in parallel. The returned task is not submitted here.
729 {
730 return TaskMake(
731 [=](Task* const parent_task) mutable { // 'fns' are captured by copy; 'mutable' lets the copies be moved out below.
732 const QueueType parent_q_type = detail::taskQType(parent_task);
733 (TaskSubmit(TaskMake(std::move(fns), parent_task), parent_q_type), ...); // Fold over ',': one child task per function, each created with this task as its parent.
734 },
735 parent);
736 }
737} // namespace Job
738
739#endif // JOB_API_HPP
740
741/******************************************************************************/
742/*
743 MIT License
744
745 Copyright (c) 2020-2025 Shareef Abdoul-Raheem
746
747 Permission is hereby granted, free of charge, to any person obtaining a copy
748 of this software and associated documentation files (the "Software"), to deal
749 in the Software without restriction, including without limitation the rights
750 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
751 copies of the Software, and to permit persons to whom the Software is
752 furnished to do so, subject to the following conditions:
753
754 The above copyright notice and this permission notice shall be included in all
755 copies or substantial portions of the Software.
756
757 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
758 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
759 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
760 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
761 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
762 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
763 SOFTWARE.
764*/
765/******************************************************************************/
Assertion macro for this library.
#define JobAssert(expr, msg)
Definition: job_assert.hpp:27
void * taskReservePrivateUserData(Task *const task, const std::size_t num_bytes, const std::size_t alignment) noexcept
void * taskGetPrivateUserData(Task *const task, const std::size_t alignment) noexcept
QueueType taskQType(const Task *task) noexcept
bool mainQueueTryRunTask(void) noexcept
Definition: job_api.hpp:29
TaskData TaskGetData(Task *const task, const std::size_t alignment) noexcept
Returns the user-data buffer you may write to in order to get data into your TaskFn.
std::size_t size
The size of the buffer.
Definition: job_api.hpp:205
void * ptr
The start of the buffer you may write to.
Definition: job_api.hpp:204
void TaskSubmitAndWait(Task *const self, const QueueType queue=QueueType::NORMAL) noexcept
Same as calling TaskSubmit followed by WaitOnTask.
void WaitOnTask(const Task *const task) noexcept
Waits until the specified task is done executing. This function will block but do work while being bl...
void(*)(Task *) TaskFn
The signature of the type of function for a single Task.
Definition: job_api.hpp:54
T * TaskDataAs(Task *const task) noexcept
Grabs the user-data pointer as the T you specified. No safety is guaranteed, this is just a dumb cast...
Definition: job_api.hpp:478
void TaskAddContinuation(Task *const self, Task *const continuation, const QueueType queue=QueueType::NORMAL) noexcept
A 'continuation' is a task that will be added to a queue after the 'self' Task has finished running.
Task * ParallelInvoke(Task *const parent, F &&... fns)
Invokes each passed in function object in parallel.
Definition: job_api.hpp:728
void YieldTimeSlice() noexcept
Asks the OS to yield this thread's execution to another thread on the current cpu core.
std::uint16_t NumWorkers() noexcept
Returns the number of workers created by the system. This function can be called by any thread concur...
Definition: job_system.cpp:916
QueueType
Determines which threads the task will be allowed to run on.
Definition: job_api.hpp:45
@ MAIN
Tasks in this queue will only be run by the main thread.
@ NORMAL
Tasks in this queue will run on either the main or worker threads.
@ WORKER
Tasks in this queue will never run on the main thread.
Task * TaskMake(const TaskFn function, Task *const parent=nullptr) noexcept
Creates a new Task that should be later submitted by calling 'TaskSubmit'.
Definition: job_system.cpp:984
const char * ProcessorArchitectureName() noexcept
An implementation defined name for the CPU architecture of the device. This function can be called by...
Definition: job_system.cpp:921
void TaskSetData(Task *const task, const T &data)
Copies 'data' into the user-data buffer by calling the T copy constructor.
Definition: job_api.hpp:496
void SetupUserThread()
Must be called in the callstack of the thread to be setup.
Definition: job_system.cpp:854
void Shutdown() noexcept
This will deallocate any memory used by the system and shutdown any threads created by 'bfJob::initia...
Definition: job_system.cpp:937
Task * ParallelFor(const std::size_t start, const std::size_t count, S &&splitter, F &&fn, Task *parent=nullptr)
Parallel for algorithm that recursively splits the work up based on the splitter passed in.
Definition: job_api.hpp:607
void TaskEmplaceData(Task *const task, Args &&... args)
Calls the constructor of T on the user-data buffer.
Definition: job_api.hpp:486
WorkerID CurrentWorker() noexcept
The current id of the current thread. This function can be called by any thread concurrently.
Definition: job_system.cpp:926
void TaskIncRef(Task *const task) noexcept
Increments the task's ref count preventing it from being garbage collected.
std::uint16_t WorkerID
The id type of each worker thread.
Definition: job_api.hpp:53
bool IsMainThread() noexcept
Allows for querying if we are currently executing in the main thread.
Definition: job_system.cpp:932
void PauseProcessor() noexcept
CPU pause instruction to indicate when you are in a spin wait loop.
void TaskDecRef(Task *const task) noexcept
Decrements the task's ref count, allowing it to be garbage collected.
void TickMainQueue(ConditionFn &&condition) noexcept
Runs tasks from the main queue as long as there are tasks available and condition returns true.
Definition: job_api.hpp:396
std::size_t NumSystemThreads() noexcept
Makes some system calls to grab the number of threads / processors on the device. This function can be c...
Definition: job_system.cpp:864
void Initialize(const JobSystemMemoryRequirements &memory_requirements={}, void *const memory=nullptr) noexcept
Sets up the Job system and creates all the worker threads. The thread that calls 'Job::Initialize' is...
Definition: job_system.cpp:741
void TaskSubmit(Task *const self, const QueueType queue=QueueType::NORMAL) noexcept
Submits the task to the specified queue.
void TaskDestructData(Task *const task)
Helper for calling destructor on the task's user data.
Definition: job_api.hpp:502
Task * ParallelReduce(const std::size_t start, const std::size_t count, Splitter &&splitter, Reducer &&reduce, Task *parent=nullptr)
Definition: job_api.hpp:632
bool TaskIsDone(const Task *const task) noexcept
Returns the done status of the task.
The runtime configuration for the Job System.
Definition: job_api.hpp:85
A buffer for user-data you can write to, maybe large enough to store task data inline.
Definition: job_api.hpp:203
The memory requirements for a given configuration JobSystemCreateOptions.
Definition: job_api.hpp:99
JobSystemMemoryRequirements(const JobSystemCreateOptions &options={}) noexcept
Definition: job_system.cpp:720
std::size_t alignment
The base alignment the pointer should be.
Definition: job_api.hpp:102
std::size_t byte_size
The number of bytes the job system needed.
Definition: job_api.hpp:101
JobSystemCreateOptions options
The options used to create the memory requirements.
Definition: job_api.hpp:100
std::size_t max_count
Definition: job_api.hpp:569
static Splitter EvenSplit(const std::size_t total_num_items, std::size_t num_groups_per_thread=1u)
Splits work evenly across the threads depending on the number of workers.
Definition: job_api.hpp:548
static constexpr Splitter MaxDataSize(const std::size_t max_data_size)
Definition: job_api.hpp:564
constexpr bool operator()(const std::size_t count) const
Definition: job_api.hpp:571
static constexpr Splitter MaxItemsPerTask(const std::size_t max_items)
Definition: job_api.hpp:558