BluFedora Job System v1.0.0
This is a C++ job system library for use in game engines.
job_api.hpp
Go to the documentation of this file.
1/******************************************************************************/
18/******************************************************************************/
19#ifndef JOB_API_HPP
20#define JOB_API_HPP
21
#include "job_assert.hpp"     // JobAssert
#include "job_init_token.hpp" // InitializationToken

#include <cstddef>     // size_t
#include <cstdint>     // sized integer types
#include <new>         // placement new
#include <tuple>       // tuple, apply
#include <type_traits> // decay_t
#include <utility>     // forward, move
28
29namespace Job
30{
// Fwd Declarations

/// Opaque task type; this header only forward-declares it.
struct Task;

// Enums

/// Determines which threads the task will be allowed to run on.
enum class QueueType : std::uint8_t
{
  NORMAL = 0, ///< Tasks in this queue will run on either the main or worker threads.
  MAIN   = 1, ///< Tasks in this queue will only be run by the main thread.
  WORKER = 2, ///< Tasks in this queue will never run on the main thread.
};

// Type Aliases

/// The id type of each worker thread.
using WorkerID = std::uint16_t;

/// The signature of the type of function for a single Task.
using TaskFn = void (*)(Task* task);
56
57 // Private
58
59 namespace detail
60 {
61 QueueType taskQType(const Task* task) noexcept;
62 void* taskGetPrivateUserData(Task* const task, const std::size_t alignment) noexcept;
63 void* taskReservePrivateUserData(Task* const task, const std::size_t num_bytes, const std::size_t alignment) noexcept;
64 bool mainQueueTryRunTask(void) noexcept;
65 } // namespace detail
66
77 std::size_t NumSystemThreads() noexcept;
78
// Main System API

/// The runtime configuration for the Job System.
/// (Passed to JobSystemMemoryRequirements, which is in turn passed to Initialize.)
struct JobSystemCreateOptions
{
  std::uint8_t  num_user_threads   = 0;    ///< Number of extra user threads (see SetupUserThread) — presumably; TODO confirm.
  std::uint8_t  num_threads        = 0;    ///< Thread count; presumably 0 means "choose automatically" — TODO confirm.
  std::uint16_t main_queue_size    = 256;  ///< Capacity of the QueueType::MAIN queue (inferred from the name).
  std::uint16_t normal_queue_size  = 1024; ///< Capacity of the QueueType::NORMAL queue(s) (inferred from the name).
  std::uint16_t worker_queue_size  = 32;   ///< Capacity of the QueueType::WORKER queue(s) (inferred from the name).
  std::uint64_t job_steal_rng_seed = 0u;   ///< Seed for the work-stealing RNG (inferred from the name).
};
94
100 {
102 std::size_t byte_size;
103 std::size_t alignment;
104
106 };
107
124 InitializationToken Initialize(const JobSystemMemoryRequirements& memory_requirements = {}, void* const memory = nullptr) noexcept;
125
136 void SetupUserThread();
137
146 const char* ProcessorArchitectureName() noexcept;
147
156 std::uint16_t NumWorkers() noexcept;
157
168 WorkerID CurrentWorker() noexcept;
169
180 bool IsMainThread() noexcept;
181
190 void Shutdown() noexcept;
191
// Task API

/// A buffer for user data you can write to, possibly large enough to store task data inline.
struct TaskData
{
  void*       ptr;  ///< The start of the buffer you may write to.
  std::size_t size; ///< The size of the buffer in bytes.
};
208
222 Task* TaskMake(const TaskFn function, Task* const parent = nullptr) noexcept;
223
237 TaskData TaskGetData(Task* const task, const std::size_t alignment) noexcept;
238
255 void TaskAddContinuation(Task* const self, Task* const continuation, const QueueType queue = QueueType::NORMAL) noexcept;
256
271 template<typename T>
272 T* TaskDataAs(Task* const task) noexcept;
273
290 template<typename T, typename... Args>
291 void TaskEmplaceData(Task* const task, Args&&... args);
292
306 template<typename T>
307 void TaskSetData(Task* const task, const T& data);
308
319 template<typename T>
320 void TaskDestructData(Task* const task);
321
338 template<typename Closure>
339 Task* TaskMake(Closure&& function, Task* const parent = nullptr);
340
350 void TaskIncRef(Task* const task) noexcept;
351
359 void TaskDecRef(Task* const task) noexcept;
360
377 bool TaskIsDone(const Task* const task) noexcept;
378
396 template<typename ConditionFn>
397 void TickMainQueue(ConditionFn&& condition) noexcept
398 {
399 do
400 {
402 {
403 break;
404 }
405 } while (condition());
406 }
407
418 inline void TickMainQueue() noexcept
419 {
420 TickMainQueue([]() { return true; });
421 }
422
437 void TaskSubmit(Task* const self, const QueueType queue = QueueType::NORMAL) noexcept;
438
451 void WaitOnTask(const Task* const task) noexcept;
452
462 void TaskSubmitAndWait(Task* const self, const QueueType queue = QueueType::NORMAL) noexcept;
463
468 void PauseProcessor() noexcept;
469
474 void YieldTimeSlice() noexcept;
475
476 // Template Function Implementation //
477
478 template<typename T>
479 T* TaskDataAs(Task* const task) noexcept
480 {
481 const TaskData data = TaskGetData(task, alignof(T));
482
483 return data.size >= sizeof(T) ? static_cast<T*>(data.ptr) : nullptr;
484 }
485
486 template<typename T, typename... Args>
487 void TaskEmplaceData(Task* const task, Args&&... args)
488 {
489 const TaskData data = TaskGetData(task, alignof(T));
490
491 JobAssert(data.size >= sizeof(T), "Attempting to store an object too large to fit within a task's storage buffer.");
492
493 new (data.ptr) T(std::forward<Args>(args)...);
494 }
495
496 template<typename T>
497 void TaskSetData(Task* const task, const T& data)
498 {
499 TaskEmplaceData<T>(task, data);
500 }
501
502 template<typename T>
504 {
505 TaskDataAs<T>(task)->~T();
506 }
507
508 template<typename Closure>
509 Task* TaskMake(Closure&& function, Task* const parent)
510 {
511 Task* const task = TaskMake(
512 +[](Task* const task) -> void {
513 Closure& function = *static_cast<Closure*>(detail::taskGetPrivateUserData(task, alignof(Closure)));
514 function(task);
515 function.~Closure();
516 },
517 parent);
518 void* const private_data = detail::taskReservePrivateUserData(task, sizeof(Closure), alignof(Closure));
519 new (private_data) Closure(std::forward<Closure>(function));
520
521 return task;
522 }
523
524 // Parallel Algorithms API
525
526 struct Splitter
527 {
549 static Splitter EvenSplit(const std::size_t total_num_items, std::size_t num_groups_per_thread = 1u)
550 {
551 if (num_groups_per_thread < 1u)
552 {
553 num_groups_per_thread = 1u;
554 }
555
556 return Splitter{(total_num_items / num_groups_per_thread) / NumWorkers()};
557 }
558
559 static constexpr Splitter MaxItemsPerTask(const std::size_t max_items)
560 {
561 return Splitter{max_items};
562 }
563
564 template<typename T>
565 static constexpr Splitter MaxDataSize(const std::size_t max_data_size)
566 {
567 return Splitter{max_data_size / sizeof(T)};
568 }
569
570 std::size_t max_count = 0u;
571
572 constexpr bool operator()(const std::size_t count) const { return count > max_count; }
573 };
574
607 template<typename F, typename S>
608 Task* ParallelFor(const std::size_t start, const std::size_t count, S&& splitter, F&& fn, Task* parent = nullptr)
609 {
610 return TaskMake(
611 [=, splitter = std::move(splitter), fn = std::move(fn)](Task* const task) {
612 if (count > 1u && splitter(count))
613 {
614 const std::size_t left_count = count / 2;
615 const std::size_t right_count = count - left_count;
616 const QueueType parent_q_type = detail::taskQType(task);
617
618 TaskSubmit(ParallelFor(start, left_count, splitter, fn, task), parent_q_type);
619 TaskSubmit(ParallelFor(start + left_count, right_count, splitter, fn, task), parent_q_type);
620 }
621 else
622 {
623 for (std::size_t offset = 0u; offset < count; ++offset)
624 {
625 fn(task, start + offset);
626 }
627 }
628 },
629 parent);
630 }
631
632 template<typename Splitter, typename Reducer>
633 Task* ParallelReduce(const std::size_t start, const std::size_t count, Splitter&& splitter, Reducer&& reduce, Task* parent = nullptr)
634 {
635 return TaskMake(
636 [=, splitter = std::move(splitter), reduce = std::move(reduce)](Task* const task) {
637 // NOTE(SR):
638 // Could also have a stride that increases each step.
639 // This would be bad for Cuda GPU (Shared Memory Bank Conflict)
640 // But good on CPU with better locality.
641 // https://developer.download.nvidia.com/assets/cuda/files/reduction.pdf
642
643 const QueueType parent_q_type = detail::taskQType(task);
644 std::size_t count_left = count;
645 while (count_left > 1)
646 {
647 const std::size_t stride = count_left / 2;
648
649 const auto ReduceRange = [stride, &reduce](Task* const sub_task, const std::size_t index) {
650 reduce(sub_task, index, index + stride);
651 };
652
653 TaskSubmitAndWait(ParallelFor(start, stride, splitter, ReduceRange, nullptr), parent_q_type);
654
655 if ((count_left & 1) != 0)
656 {
657 reduce(task, start, start + count_left - 1);
658 }
659
660 count_left = stride;
661 }
662 },
663 parent);
664 }
665
701 template<typename T, typename F, typename S>
702 Task* ParallelFor(T* const data, const std::size_t count, S&& splitter, F&& fn, Task* parent = nullptr)
703 {
704 return ParallelFor(
705 std::size_t(0), count, std::move(splitter), [data, fn = std::move(fn)](Task* const task, const std::size_t index) {
706 fn(task, data + index);
707 },
708 parent);
709 }
710
728 template<typename... F>
729 Task* ParallelInvoke(Task* const parent, F&&... fns)
730 {
731 return TaskMake(
732 [=](Task* const parent_task) mutable {
733 const QueueType parent_q_type = detail::taskQType(parent_task);
734 (TaskSubmit(TaskMake(std::move(fns), parent_task), parent_q_type), ...);
735 },
736 parent);
737 }
738} // namespace Job
739
740#endif // JOB_API_HPP
741
742/******************************************************************************/
743/*
744 MIT License
745
746 Copyright (c) 2020-2024 Shareef Abdoul-Raheem
747
748 Permission is hereby granted, free of charge, to any person obtaining a copy
749 of this software and associated documentation files (the "Software"), to deal
750 in the Software without restriction, including without limitation the rights
751 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
752 copies of the Software, and to permit persons to whom the Software is
753 furnished to do so, subject to the following conditions:
754
755 The above copyright notice and this permission notice shall be included in all
756 copies or substantial portions of the Software.
757
758 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
759 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
760 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
761 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
762 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
763 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
764 SOFTWARE.
765*/
766/******************************************************************************/
Assertion macro for this library.
#define JobAssert(expr, msg)
Definition: job_assert.hpp:27
Token type used for other subsystems that rely on the job system to verify that the Job system has be...
void * taskReservePrivateUserData(Task *const task, const std::size_t num_bytes, const std::size_t alignment) noexcept
void * taskGetPrivateUserData(Task *const task, const std::size_t alignment) noexcept
QueueType taskQType(const Task *task) noexcept
bool mainQueueTryRunTask(void) noexcept
Definition: job_api.hpp:30
TaskData TaskGetData(Task *const task, const std::size_t alignment) noexcept
Returns the user-data buffer you may write to in order to pass data into your TaskFn.
std::size_t size
The size of the buffer.
Definition: job_api.hpp:206
void * ptr
The start of the buffer you may write to.
Definition: job_api.hpp:205
void TaskSubmitAndWait(Task *const self, const QueueType queue=QueueType::NORMAL) noexcept
Same as calling TaskSubmit followed by WaitOnTask.
void WaitOnTask(const Task *const task) noexcept
Waits until the specified task is done executing. This function will block, but does work while blocked.
void(*)(Task *) TaskFn
The signature of the type of function for a single Task.
Definition: job_api.hpp:55
T * TaskDataAs(Task *const task) noexcept
Grabs the user-data pointer as the T you specified, or nullptr if the buffer is smaller than sizeof(T). No other safety is guaranteed; beyond that size check this is just a cast.
Definition: job_api.hpp:479
void TaskAddContinuation(Task *const self, Task *const continuation, const QueueType queue=QueueType::NORMAL) noexcept
A 'continuation' is a task that will be added to a queue after the 'self' Task has finished running.
InitializationToken Initialize(const JobSystemMemoryRequirements &memory_requirements={}, void *const memory=nullptr) noexcept
Sets up the Job system and creates all the worker threads. The thread that calls 'Job::Initialize' is...
Definition: job_system.cpp:741
Task * ParallelInvoke(Task *const parent, F &&... fns)
Invokes each passed in function object in parallel.
Definition: job_api.hpp:729
void YieldTimeSlice() noexcept
Asks the OS to yield this thread's execution to another thread on the current CPU core.
std::uint16_t NumWorkers() noexcept
Returns the number of workers created by the system. This function can be called by any thread concur...
Definition: job_system.cpp:918
QueueType
Determines which threads the task will be allowed to run on.
Definition: job_api.hpp:46
@ MAIN
Tasks in this queue will only be run by the main thread.
@ NORMAL
Tasks in this queue will run on either the main or worker threads.
@ WORKER
Tasks in this queue will never run on the main thread.
Task * TaskMake(const TaskFn function, Task *const parent=nullptr) noexcept
Creates a new Task that should be later submitted by calling 'TaskSubmit'.
Definition: job_system.cpp:986
const char * ProcessorArchitectureName() noexcept
An implementation defined name for the CPU architecture of the device. This function can be called by...
Definition: job_system.cpp:923
void TaskSetData(Task *const task, const T &data)
Copies 'data' into the user-data buffer by calling the T copy constructor.
Definition: job_api.hpp:497
void SetupUserThread()
Must be called in the callstack of the thread to be setup.
Definition: job_system.cpp:856
void Shutdown() noexcept
This will deallocate any memory used by the system and shut down any threads created by 'Job::Initialize'.
Definition: job_system.cpp:939
Task * ParallelFor(const std::size_t start, const std::size_t count, S &&splitter, F &&fn, Task *parent=nullptr)
Parallel for algorithm, splits the work up recursively splitting based on the splitter passed in.
Definition: job_api.hpp:608
void TaskEmplaceData(Task *const task, Args &&... args)
Calls the constructor of T on the user-data buffer.
Definition: job_api.hpp:487
WorkerID CurrentWorker() noexcept
The id of the current thread. This function can be called by any thread concurrently.
Definition: job_system.cpp:928
void TaskIncRef(Task *const task) noexcept
Increments the task's ref count preventing it from being garbage collected.
std::uint16_t WorkerID
The id type of each worker thread.
Definition: job_api.hpp:54
bool IsMainThread() noexcept
Allows for querying if we are currently executing in the main thread.
Definition: job_system.cpp:934
void PauseProcessor() noexcept
CPU pause instruction to indicate when you are in a spin wait loop.
void TaskDecRef(Task *const task) noexcept
Decrements the task's ref count, allowing it to be garbage collected.
void TickMainQueue(ConditionFn &&condition) noexcept
Runs tasks from the main queue as long as there are tasks available and condition returns true.
Definition: job_api.hpp:397
std::size_t NumSystemThreads() noexcept
Makes some system calls to grab the number of threads / processors on the device. This function can be c...
Definition: job_system.cpp:866
void TaskSubmit(Task *const self, const QueueType queue=QueueType::NORMAL) noexcept
Submits the task to the specified queue.
void TaskDestructData(Task *const task)
Helper for calling destructor on the task's user data.
Definition: job_api.hpp:503
Task * ParallelReduce(const std::size_t start, const std::size_t count, Splitter &&splitter, Reducer &&reduce, Task *parent=nullptr)
Definition: job_api.hpp:633
bool TaskIsDone(const Task *const task) noexcept
Returns the done status of the task.
The runtime configuration for the Job System.
Definition: job_api.hpp:86
A buffer for user data you can write to, possibly large enough to store task data inline.
Definition: job_api.hpp:204
The memory requirements for a given configuration JobSystemCreateOptions.
Definition: job_api.hpp:100
JobSystemMemoryRequirements(const JobSystemCreateOptions &options={}) noexcept
Definition: job_system.cpp:720
std::size_t alignment
The base alignment the pointer should be.
Definition: job_api.hpp:103
const JobSystemCreateOptions options
The options used to create the memory requirements.
Definition: job_api.hpp:101
std::size_t byte_size
The number of bytes the job system needed.
Definition: job_api.hpp:102
std::size_t max_count
Definition: job_api.hpp:570
static Splitter EvenSplit(const std::size_t total_num_items, std::size_t num_groups_per_thread=1u)
Splits work evenly across the threads depending on the number of workers.
Definition: job_api.hpp:549
static constexpr Splitter MaxDataSize(const std::size_t max_data_size)
Definition: job_api.hpp:565
constexpr bool operator()(const std::size_t count) const
Definition: job_api.hpp:572
static constexpr Splitter MaxItemsPerTask(const std::size_t max_items)
Definition: job_api.hpp:559