BluFedora Job System v1.0.0
This is a C++ job system library for use in game engines.
job_system.cpp
Go to the documentation of this file.
1/******************************************************************************/
19/******************************************************************************/
21
22#include "concurrent/job_assert.hpp" //
24
25#include "pcg_basic.h" /* pcg_state_setseq_64, pcg32_srandom_r, pcg32_boundedrand_r */
26
27#include <algorithm> /* partition, for_each, distance */
28#include <cstdio> /* fprintf, stderr */
29#include <cstdlib> /* abort */
30#include <limits> /* numeric_limits */
31#include <new> /* hardware_constructive_interference_size, hardware_destructive_interference_size */
32#include <thread> /* thread */
33
34#if _WIN32
35#define IS_WINDOWS 1
36#define IS_POSIX 0
37#define IS_SINGLE_THREADED 0
38#elif __APPLE__
39#define IS_WINDOWS 0
40#define IS_POSIX 1
41#define IS_SINGLE_THREADED 0
42#elif (__ANDROID__ || __linux || __unix || __posix)
43#define IS_WINDOWS 0
44#define IS_POSIX 1
45#define IS_SINGLE_THREADED 0
46#elif __EMSCRIPTEN__
47#define IS_WINDOWS 0
48#define IS_POSIX 0
49#define IS_SINGLE_THREADED 1
50#endif
51
52#if IS_WINDOWS
53
54#define WIN32_LEAN_AND_MEAN
55#define NOMINMAX
56#define VC_EXTRALEAN
57#define WINDOWS_EXTRA_LEAN
58
59#include <Windows.h> /* SYSTEM_INFO, GetSystemInfo */
60#elif IS_POSIX
61#include <unistd.h> // also macOS 10.5+
62#else // macOS <= 10.4
63// #include <sys/param.h>
64// #include <sys/sysctl.h>
65#endif
66
67namespace Job
68{
69 // Constants
70
71#ifdef __cpp_lib_hardware_interference_size
72 static constexpr std::size_t k_CachelineSize = std::max(std::hardware_constructive_interference_size, std::hardware_destructive_interference_size);
73#else
74 static constexpr std::size_t k_CachelineSize = 64u;
75#endif
76
77 static constexpr std::size_t k_ExpectedTaskSize = std::max(std::size_t(128u), k_CachelineSize);
79
80 // Type Aliases
81
82 using TaskHandle = std::uint16_t;
84 using AtomicTaskHandleType = std::atomic<TaskHandle>;
86 using AtomicInt32 = std::atomic_int32_t;
87 using Byte = unsigned char;
88
89 static constexpr TaskHandle NullTaskHandle = std::numeric_limits<TaskHandle>::max();
90
91 // Struct Definitions
92
93 struct TaskPtr
94 {
97
98 TaskPtr() noexcept = default;
99
100 TaskPtr(WorkerID worker_id, TaskHandle task_idx) noexcept :
102 task_index{task_idx}
103 {
104 }
105
106 TaskPtr(std::nullptr_t) noexcept :
109 {
110 }
111
112 bool isNull() const noexcept { return task_index == NullTaskHandle; }
113 };
114
115 using AtomicTaskPtr = std::atomic<TaskPtr>;
116
117 static_assert(sizeof(TaskPtr) == sizeof(std::uint16_t) * 2u, "Expected to be the size of two uint16's.");
118 static_assert(sizeof(AtomicTaskPtr) == sizeof(TaskPtr) && AtomicTaskPtr::is_always_lock_free, "Expected to be lock-free so no extra data members should have been added.");
119
120 // NOTE(SR):
121 // So that 32-bit and 64-bit builds have the same `Task` layout.
122 // Allows for the user data Task::padding to always be 32-byte aligned.
124 {
126 std::uint64_t pad;
127
128 TaskFnStorage(const TaskFn fn) noexcept :
129 fn{fn}
130 {
131 }
132 };
133 static_assert(sizeof(TaskFnStorage) == sizeof(std::uint64_t) && alignof(TaskFnStorage) == alignof(std::uint64_t), "Expected to always be 8 bytes.");
134
135 struct alignas(k_CachelineSize) Task
136 {
137 static constexpr std::size_t k_SizeOfMembers =
138 sizeof(TaskFnStorage) +
139 sizeof(AtomicInt32) +
140 sizeof(AtomicInt32) +
141 sizeof(std::uint8_t) +
142 sizeof(QueueType) +
143 sizeof(TaskPtr) +
144 sizeof(AtomicTaskPtr) +
145 sizeof(TaskPtr) +
146 sizeof(WorkerID);
147
148 static constexpr std::size_t k_TaskPaddingDataSize = k_ExpectedTaskSize - k_SizeOfMembers;
149
158 std::uint8_t user_data_start;
159 Byte user_data[k_TaskPaddingDataSize];
160
161 Task(WorkerID worker, TaskFn fn, TaskPtr parent) noexcept;
162 };
163
164 static_assert(sizeof(Task) == k_ExpectedTaskSize, "The task struct is expected to be this size.");
165 static_assert(std::is_trivially_destructible_v<Task>, "Task must be trivially destructible.");
166
167 union alignas(Task) TaskMemoryBlock
168 {
170 unsigned char storage[sizeof(Task)];
171 };
172 static_assert(sizeof(TaskMemoryBlock) == sizeof(Task) && alignof(TaskMemoryBlock) == alignof(Task), "TaskMemoryBlock should have no overhead.");
173
174 struct TaskPool
175 {
178 };
179
181 {
188 pcg_state_setseq_64 rng_state;
189 std::thread thread_id;
190 };
191
193 {
194 std::mutex init_mutex = {};
195 std::condition_variable init_cv = {};
196 std::atomic_uint32_t num_workers_ready = {};
197 };
198
200 {
201 // State that won't change during the system's runtime.
202
204 std::uint32_t num_workers;
205 std::uint32_t num_owned_workers;
206 std::atomic_uint32_t num_user_threads_setup;
207 std::uint32_t num_tasks_per_worker;
209 const char* sys_arch_str;
210 std::size_t system_alloc_size;
213 std::atomic_bool is_running;
214
215 // Shared Mutable State
216
219 std::condition_variable worker_sleep_cv;
220 std::atomic_uint32_t num_available_jobs;
221 };
222} // namespace Job
223
224// System Globals
225
227static thread_local Job::ThreadLocalState* g_CurrentWorker = nullptr;
228
229// Internal API
230
231#if JOB_SYS_ASSERTIONS
232
233void Job::detail::assertHandler(const bool condition, const char* const filename, const int line_number, const char* const msg)
234{
235 if (!condition)
236 {
237 std::fprintf(stderr, "JobSystem [%s:%i] Assertion '%s' Failed.\n", filename, line_number, msg);
238 std::abort();
239 }
240}
241
242#endif
243
244namespace
245{
246 using namespace Job;
247
248 namespace system
249 {
250 static void WakeUpAllWorkers() noexcept
251 {
252 g_JobSystem->worker_sleep_cv.notify_all();
253 }
254
255 static void WakeUpOneWorker() noexcept
256 {
257 g_JobSystem->worker_sleep_cv.notify_one();
258 }
259
260 static void Sleep() noexcept
261 {
262 Job::JobSystemContext* const job_system = g_JobSystem;
263
264 if (job_system->is_running.load(std::memory_order_relaxed))
265 {
267
268 if (job_system->num_available_jobs.load(std::memory_order_relaxed) == 0u)
269 {
270 std::unique_lock<std::mutex> lock(job_system->worker_sleep_mutex);
271 job_system->worker_sleep_cv.wait(lock, [job_system]() {
272 // NOTE(SR):
273 // Because the stl wants 'false' to mean continue waiting the logic is a bit confusing :/
274 //
275 // Returns false if the waiting should be continued, aka num_queued_jobs == 0u (also return true if not running).
276 //
277 // Wait If: running AND num_available_jobs == 0.
278 // Do Not Wait If: not running OR num_available_jobs != 0.
279 //
280 return !job_system->is_running || job_system->num_available_jobs.load(std::memory_order_relaxed) != 0; });
281 }
282 }
283 }
284
285 static ThreadLocalState* GetWorker(const WorkerID worker_id) noexcept
286 {
287 JobAssert(worker_id < NumWorkers(), "This thread was not created by the job system.");
288 return g_JobSystem->workers + worker_id;
289 }
290
291 } // namespace system
292
293 namespace task_pool
294 {
295 static void Initialize(Job::TaskPool* const pool, Job::TaskMemoryBlock* const memory, const Job::TaskHandleType capacity) noexcept
296 {
297 const Job::TaskHandleType capacity_minus_one = capacity - 1;
298
299 for (std::size_t i = 0u; i < capacity_minus_one; ++i)
300 {
301 memory[i].next = &memory[i + 1u];
302 }
303 memory[capacity_minus_one].next = nullptr;
304
305 pool->memory = memory;
306 pool->freelist = &memory[0];
307 }
308
309 static TaskHandle TaskToIndex(const Job::TaskPool& pool, const Task* const task) noexcept
310 {
311 const TaskMemoryBlock* const block = reinterpret_cast<const TaskMemoryBlock*>(task);
312
313 return TaskHandle(block - pool.memory);
314 }
315
316 static Task* TaskFromIndex(const Job::TaskPool& pool, const std::size_t idx) noexcept
317 {
318 return reinterpret_cast<Task*>(&pool.memory[idx].storage);
319 }
320
321 static Task* AllocateTask(Job::TaskPool* const pool, WorkerID worker, TaskFn fn, TaskPtr parent) noexcept
322 {
323 TaskMemoryBlock* const result = std::exchange(pool->freelist, pool->freelist->next);
324
325 JobAssert(result != nullptr, "Allocation failure.");
326
327 return new (result) Task(worker, fn, parent);
328 }
329
330 static void DeallocateTask(Job::TaskPool* const pool, Task* const task) noexcept
331 {
332 task->~Task();
333
334 TaskMemoryBlock* const block = new (task) TaskMemoryBlock();
335
336 block->next = std::exchange(pool->freelist, block);
337 }
338 } // namespace task_pool
339
340 namespace task
341 {
342 static Task* TaskPtrToPointer(const TaskPtr ptr) noexcept
343 {
344 if (!ptr.isNull())
345 {
346 ThreadLocalState* const worker = system::GetWorker(ptr.worker_id);
347 Task* const result = task_pool::TaskFromIndex(worker->task_allocator, ptr.task_index);
348
349 JobAssert(ptr.worker_id == result->owning_worker, "Corrupted worker ID.");
350
351 return result;
352 }
353
354 return nullptr;
355 }
356
357 static void TaskOnFinish(Task* const self) noexcept
358 {
359 const std::int32_t num_jobs_left = self->num_unfinished_tasks.fetch_sub(1, std::memory_order_relaxed) - 1;
360
361 if (num_jobs_left == 0)
362 {
363 Task* const parent_task = task::TaskPtrToPointer(self->parent);
364
365 if (parent_task)
366 {
367 TaskOnFinish(parent_task);
368 }
369
370 std::atomic_signal_fence(std::memory_order_release);
371
372 self->num_unfinished_tasks.fetch_sub(1, std::memory_order_relaxed);
373
374 TaskPtr continuation_ptr = self->first_continuation.load(std::memory_order_relaxed);
375
376 while (!continuation_ptr.isNull())
377 {
378 Task* const continuation = task::TaskPtrToPointer(continuation_ptr);
379 const TaskPtr next_task = continuation->next_continuation;
380 const QueueType q_type = std::exchange(continuation->q_type, k_InvalidQueueType);
381
382 TaskSubmit(continuation, q_type);
383
384 continuation_ptr = next_task;
385 }
386
387 self->ref_count.fetch_sub(1, std::memory_order_relaxed);
388 }
389 }
390
391 static void RunTaskFunction(Task* const self) noexcept
392 {
393 self->fn_storage.fn(self);
394 TaskOnFinish(self);
395 }
396
397 static TaskPtr PointerToTaskPtr(const Task* const self) noexcept
398 {
399 if (self)
400 {
401 const ThreadLocalState& worker = *system::GetWorker(self->owning_worker);
402 const TaskHandle self_index = task_pool::TaskToIndex(worker.task_allocator, self);
403
404 return TaskPtr{self->owning_worker, self_index};
405 }
406
407 return TaskPtr(nullptr);
408 }
409
410 } // namespace task
411
412 namespace worker
413 {
414 static void GarbageCollectAllocatedTasks(Job::ThreadLocalState* const worker) noexcept
415 {
416 TaskHandle* const allocated_tasks = worker->allocated_tasks;
417 Job::TaskPool& task_pool = worker->task_allocator;
418 const TaskHandleType num_tasks = worker->num_allocated_tasks;
419 TaskHandleType read_idx = 0u;
420 TaskHandleType write_idx = 0u;
421
422 while (read_idx != num_tasks)
423 {
424 const TaskHandle task_handle = allocated_tasks[read_idx++];
425 Task* const task_ptr = task_pool::TaskFromIndex(task_pool, task_handle);
426 const bool task_is_finished = task_ptr->ref_count.load(std::memory_order_acquire) == 0u;
427
428 if (task_is_finished)
429 {
430 task_pool::DeallocateTask(&task_pool, task_ptr);
431 }
432 else
433 {
434 allocated_tasks[write_idx++] = task_handle;
435 }
436 }
437
438 worker->num_allocated_tasks = write_idx;
439 }
440
441 static Job::ThreadLocalState* RandomWorker(Job::ThreadLocalState* const worker) noexcept
442 {
443 const std::uint32_t num_workers = g_JobSystem->num_workers;
444 const std::uint32_t other_worker_id = pcg32_boundedrand_r(&worker->rng_state, num_workers);
445
446 return system::GetWorker(WorkerID(other_worker_id));
447 }
448
449 static bool IsMainThread(const ThreadLocalState* const worker) noexcept
450 {
451 return worker == g_JobSystem->workers;
452 }
453
454 static bool TryRunTask(ThreadLocalState* const worker) noexcept
455 {
456 const bool is_main_thread = IsMainThread(worker);
457
458 TaskPtr task_ptr = nullptr;
459 worker->normal_queue.Pop(&task_ptr);
460
461 if (task_ptr.isNull() && !is_main_thread)
462 {
463 worker->worker_queue.Pop(&task_ptr);
464 }
465
466 const auto TrySteal = [is_main_thread, worker](ThreadLocalState* const other_worker) -> TaskPtr {
467 TaskPtr result = nullptr;
468
469 if (other_worker != worker)
470 {
471 other_worker->normal_queue.Steal(&result);
472
473 if (result.isNull() && !is_main_thread)
474 {
475 other_worker->worker_queue.Steal(&result);
476 }
477 }
478
479 return result;
480 };
481
482 if (task_ptr.isNull())
483 {
484 task_ptr = TrySteal(worker->last_stolen_worker);
485 }
486
487 if (task_ptr.isNull())
488 {
489 Job::ThreadLocalState* const random_worker = RandomWorker(worker);
490
491 task_ptr = TrySteal(random_worker);
492
493 if (task_ptr.isNull())
494 {
495 return false;
496 }
497
498 worker->last_stolen_worker = random_worker;
499 }
500
501 g_JobSystem->num_available_jobs.fetch_sub(1, std::memory_order_relaxed);
502
503 Task* const task = task::TaskPtrToPointer(task_ptr);
504 task::RunTaskFunction(task);
505
506 return true;
507 }
508
509 static void WaitForAllThreadsReady(Job::JobSystemContext* const job_system) noexcept
510 {
511 Job::InitializationLock* const init_lock = &job_system->init_lock;
512
513 if ((init_lock->num_workers_ready.fetch_add(1u, std::memory_order_relaxed) + 1) == g_JobSystem->num_workers)
514 {
515 job_system->is_running.store(true, std::memory_order_relaxed);
516 init_lock->init_cv.notify_all();
517 }
518 else
519 {
520 std::unique_lock<std::mutex> lock(init_lock->init_mutex);
521 init_lock->init_cv.wait(lock, [init_lock]() -> bool {
522 return init_lock->num_workers_ready.load(std::memory_order_relaxed) == g_JobSystem->num_workers;
523 });
524 }
525 }
526
527 static Job::JobSystemContext* WorkerThreadSetup(Job::ThreadLocalState* const worker)
528 {
529 std::atomic_thread_fence(std::memory_order_acquire);
530
531 Job::JobSystemContext* const job_system = g_JobSystem;
532
533#if IS_WINDOWS
534 const HANDLE handle = GetCurrentThread();
535
536#if 0
537 // Put each thread on to dedicated core
538
539 const DWORD_PTR affinity_mask = 1ull << thread_id;
540 const DWORD_PTR affinity_result = SetThreadAffinityMask(handle, affinity_mask);
541
542 if (affinity_result > 0)
543 {
544 // Increase thread priority
545
546 // const BOOL priority_result = SetThreadPriority(handle, THREAD_PRIORITY_HIGHEST);
547 // JobAssert(priority_result != 0, "Failed to set thread priority.");
548 }
549#endif
550 // Name the thread
551
552 const unsigned int thread_index = unsigned int(worker - job_system->workers);
553
554 char thread_name[32] = u8"";
555 wchar_t thread_name_w[sizeof(thread_name)] = L"";
556
557 const char* const format = thread_index >= job_system->num_owned_workers ? "Job::User_%u" : "Job::Owned_%u";
558
559 const int c_size = std::snprintf(thread_name, sizeof(thread_name), format, thread_index);
560
561 std::mbstowcs(thread_name_w, thread_name, c_size);
562
563 const HRESULT hr = SetThreadDescription(handle, thread_name_w);
564 JobAssert(SUCCEEDED(hr), "Failed to set thread name.");
565 (void)hr;
566#endif
567
569
570 WaitForAllThreadsReady(job_system);
571
572 return job_system;
573 }
574
575 static void InitializeThread(Job::ThreadLocalState* const worker) noexcept
576 {
577 worker->thread_id = std::thread([worker]() {
578 Job::JobSystemContext* const job_system = WorkerThreadSetup(worker);
579
580 while (job_system->is_running.load(std::memory_order_relaxed))
581 {
582 if (!worker::TryRunTask(worker))
583 {
584 system::Sleep();
585 }
586 }
587 });
588 }
589
590 static ThreadLocalState* GetCurrent() noexcept
591 {
592 JobAssert(g_CurrentWorker != nullptr, "This thread was not created by the job system.");
593 return g_CurrentWorker;
594 }
595
596 static WorkerID GetCurrentID() noexcept
597 {
598 JobAssert(g_CurrentWorker != nullptr, "This thread was not created by the job system.");
600 }
601
602 static void ShutdownThread(Job::ThreadLocalState* const worker) noexcept
603 {
604 // Join throws an exception if the thread is not joinable. this should always be true.
605 worker->thread_id.join();
606 }
607 } // namespace worker
608
609 namespace task
610 {
611 static void SubmitQPushHelper(const TaskPtr task_ptr, ThreadLocalState* const worker, SPMCDeque<TaskPtr>* queue) noexcept
612 {
613 if (queue->Push(task_ptr) != SPMCDequeStatus::SUCCESS)
614 {
615 // Loop until we have successfully pushed to the queue.
616 system::WakeUpAllWorkers();
617 while (queue->Push(task_ptr) != SPMCDequeStatus::SUCCESS)
618 {
619 // If we could not push to the queues then just do some work.
620 worker::TryRunTask(worker);
621 }
622 }
623 }
624 } // namespace task
625
// True when the address of `ptr` has none of the low bits covered by
// `alignment - 1` set. `alignment` is expected to be a power of two.
static bool IsPointerAligned(const void* const ptr, const std::size_t alignment) noexcept
{
  const std::uintptr_t address  = reinterpret_cast<std::uintptr_t>(ptr);
  const std::size_t    low_bits = alignment - 1u;

  return (address & low_bits) == 0u;
}
630
// Rounds the address of `ptr` up to the next multiple of `alignment`
// (a power of two); an already aligned address is returned unchanged.
static void* AlignPointer(const void* const ptr, const std::size_t alignment) noexcept
{
  const std::size_t    mask    = alignment - 1;
  const std::uintptr_t address = reinterpret_cast<std::uintptr_t>(ptr);
  const std::uintptr_t rounded = (address + mask) & ~mask;

  return reinterpret_cast<void*>(rounded);
}
637
// Pointer + element-count pair describing a run of `T` objects.
template<typename T>
struct Span
{
  T*          ptr;          // First element of the run.
  std::size_t num_elements; // Number of elements available starting at `ptr`.
};
644
645 template<typename T>
646 static Span<T> LinearAlloc(void*& ptr, const std::size_t num_elements) noexcept
647 {
648 void* const result = AlignPointer(ptr, alignof(T));
649
650 ptr = static_cast<unsigned char*>(result) + sizeof(T) * num_elements;
651
652 for (std::size_t i = 0; i < num_elements; ++i)
653 {
654 new (static_cast<T*>(result) + i) T;
655 }
656
657 return Span<T>{static_cast<T*>(result), num_elements};
658 }
659
660 template<typename T>
661 static T* SpanAlloc(Span<T>* const span, const std::size_t num_elements) noexcept
662 {
663 JobAssert(num_elements <= span->num_elements, "Out of bounds span alloc.");
664
665 T* const result = span->ptr;
666
667 span->ptr += num_elements;
668 span->num_elements -= num_elements;
669
670 return result;
671 }
672
// Rounds `size` up to the next multiple of `alignment`; a size that is
// already a multiple is returned unchanged.
static std::size_t AlignedSizeUp(const std::size_t size, const std::size_t alignment) noexcept
{
  const std::size_t excess = size % alignment;

  if (excess == 0)
  {
    return size;
  }

  return size + (alignment - excess);
}
679
680 template<typename T>
681 static void MemoryRequirementsPush(Job::JobSystemMemoryRequirements* in_out_reqs, const std::size_t num_elements) noexcept
682 {
683 in_out_reqs->byte_size = AlignedSizeUp(in_out_reqs->byte_size, alignof(T));
684 in_out_reqs->alignment = in_out_reqs->alignment < alignof(T) ? alignof(T) : in_out_reqs->alignment;
685
686 in_out_reqs->byte_size += sizeof(T) * num_elements;
687 }
688
// True when `value` is a power of two (exactly one bit set).
// Zero is not a power of two; the bit trick alone would accept it.
static bool IsPowerOf2(const std::size_t value) noexcept
{
  return value != 0u && (value & (value - 1u)) == 0u;
}
693
694 namespace config
695 {
696 static Job::WorkerID WorkerCount(const Job::JobSystemCreateOptions& options) noexcept
697 {
698 return (options.num_threads ? options.num_threads : Job::WorkerID(Job::NumSystemThreads())) + options.num_user_threads;
699 }
700
701 static std::uint16_t NumTasksPerWorker(const Job::JobSystemCreateOptions& options) noexcept
702 {
703 const std::size_t num_tasks_per_worker = std::size_t(options.normal_queue_size) + std::size_t(options.worker_queue_size);
704
705 JobAssert(num_tasks_per_worker <= std::uint16_t(-1), "Too many task items per worker.");
706
707 return std::uint16_t(num_tasks_per_worker);
708 }
709
710 static std::uint32_t TotalNumTasks(const Job::WorkerID num_threads, const std::uint16_t num_tasks_per_worker) noexcept
711 {
712 return num_tasks_per_worker * num_threads;
713 }
714 } // namespace config
715
716} // namespace
717
718// Public API
719
721 options{options},
722 byte_size{0},
723 alignment{0}
724{
725 JobAssert(IsPowerOf2(options.main_queue_size), "Main queue size must be a power of two.");
726 JobAssert(IsPowerOf2(options.normal_queue_size), "Normal queue size must be a power of two.");
727 JobAssert(IsPowerOf2(options.worker_queue_size), "Worker queue size must be a power of two.");
728
729 const WorkerID num_threads = config::WorkerCount(options);
730 const std::uint16_t num_tasks_per_worker = config::NumTasksPerWorker(options);
731 const std::uint32_t total_num_tasks = config::TotalNumTasks(num_threads, num_tasks_per_worker);
732
733 MemoryRequirementsPush<JobSystemContext>(this, 1u);
734 MemoryRequirementsPush<ThreadLocalState>(this, num_threads);
735 MemoryRequirementsPush<TaskMemoryBlock>(this, total_num_tasks);
736 MemoryRequirementsPush<TaskPtr>(this, options.main_queue_size);
737 MemoryRequirementsPush<AtomicTaskPtr>(this, total_num_tasks);
738 MemoryRequirementsPush<TaskHandle>(this, total_num_tasks);
739}
740
741Job::InitializationToken Job::Initialize(const Job::JobSystemMemoryRequirements& memory_requirements, void* memory) noexcept
742{
743 JobAssert(g_JobSystem == nullptr, "Already initialized.");
744
745 const bool needs_delete = memory == nullptr;
746
747 if (!memory)
748 {
749 memory = ::operator new[](memory_requirements.byte_size, std::align_val_t{memory_requirements.alignment});
750 }
751
752 JobAssert(memory != nullptr, "memory must be a valid pointer.");
753 JobAssert(IsPointerAligned(memory, memory_requirements.alignment), "memory must be a aligned to `memory_requirements.alignment`.");
754
755 const JobSystemCreateOptions& options = memory_requirements.options;
756 const std::uint64_t rng_seed = options.job_steal_rng_seed;
757 const WorkerID num_threads = config::WorkerCount(options);
758 const WorkerID owned_threads = num_threads - options.num_user_threads;
759 const std::uint16_t num_tasks_per_worker = config::NumTasksPerWorker(options);
760 const std::uint32_t total_num_tasks = config::TotalNumTasks(num_threads, num_tasks_per_worker);
761
762 void* alloc_ptr = memory;
763 JobSystemContext* job_system = LinearAlloc<JobSystemContext>(alloc_ptr, 1u).ptr;
764 Span<ThreadLocalState> all_workers = LinearAlloc<ThreadLocalState>(alloc_ptr, num_threads);
765 Span<TaskMemoryBlock> all_tasks = LinearAlloc<TaskMemoryBlock>(alloc_ptr, total_num_tasks);
766 Span<TaskPtr> main_tasks_ptrs = LinearAlloc<TaskPtr>(alloc_ptr, options.main_queue_size);
767 Span<AtomicTaskPtr> worker_task_ptrs = LinearAlloc<AtomicTaskPtr>(alloc_ptr, total_num_tasks);
768 Span<TaskHandle> all_task_handles = LinearAlloc<TaskHandle>(alloc_ptr, total_num_tasks);
769
770 job_system->main_queue.Initialize(SpanAlloc(&main_tasks_ptrs, options.main_queue_size), options.main_queue_size);
771 job_system->workers = all_workers.ptr;
772 job_system->num_workers = num_threads;
773 job_system->num_owned_workers = owned_threads;
774 job_system->num_user_threads_setup.store(0, std::memory_order_relaxed);
775 job_system->num_tasks_per_worker = num_tasks_per_worker;
776 job_system->sys_arch_str = "Unknown Arch";
777 job_system->num_available_jobs.store(0, std::memory_order_relaxed);
778 job_system->needs_delete = needs_delete;
779 job_system->system_alloc_size = memory_requirements.byte_size;
780 job_system->system_alloc_alignment = memory_requirements.alignment;
781 job_system->init_lock.num_workers_ready.store(1u, std::memory_order_relaxed); // Main thread already initialized.
782
783#if IS_WINDOWS
784 SYSTEM_INFO sysinfo;
785 GetSystemInfo(&sysinfo);
786
787 switch (sysinfo.wProcessorArchitecture)
788 {
789 case PROCESSOR_ARCHITECTURE_AMD64:
790 {
791 job_system->sys_arch_str = "x64 (Intel or AMD)";
792 break;
793 }
794 case PROCESSOR_ARCHITECTURE_ARM:
795 {
796 job_system->sys_arch_str = "ARM";
797 break;
798 }
799 case PROCESSOR_ARCHITECTURE_ARM64:
800 {
801 job_system->sys_arch_str = "ARM64";
802 break;
803 }
804 case PROCESSOR_ARCHITECTURE_IA64:
805 {
806 job_system->sys_arch_str = "Intel Itanium-Based";
807 break;
808 }
809 case PROCESSOR_ARCHITECTURE_INTEL:
810 {
811 job_system->sys_arch_str = "Intel x86";
812 break;
813 }
814 case PROCESSOR_ARCHITECTURE_UNKNOWN:
815 default:
816 {
817 job_system->sys_arch_str = "Unknown Arch";
818 break;
819 }
820 }
821#endif
822
823 ThreadLocalState* const main_thread_worker = job_system->workers;
824
825 for (std::uint64_t worker_index = 0; worker_index < num_threads; ++worker_index)
826 {
827 ThreadLocalState* const worker = SpanAlloc(&all_workers, 1u);
828
829 worker->normal_queue.Initialize(SpanAlloc(&worker_task_ptrs, options.normal_queue_size), options.normal_queue_size);
830 worker->worker_queue.Initialize(SpanAlloc(&worker_task_ptrs, options.worker_queue_size), options.worker_queue_size);
831 task_pool::Initialize(&worker->task_allocator, SpanAlloc(&all_tasks, num_tasks_per_worker), num_tasks_per_worker);
832 worker->allocated_tasks = SpanAlloc(&all_task_handles, num_tasks_per_worker);
833 worker->num_allocated_tasks = 0u;
834 pcg32_srandom_r(&worker->rng_state, worker_index + rng_seed, worker_index * 2u + 1u + rng_seed);
835 worker->last_stolen_worker = main_thread_worker;
836 }
837
838 g_JobSystem = job_system;
839 g_CurrentWorker = main_thread_worker;
840
841 std::atomic_thread_fence(std::memory_order_release);
842 for (std::uint64_t worker_index = 1; worker_index < owned_threads; ++worker_index)
843 {
844 worker::InitializeThread(job_system->workers + worker_index);
845 }
846
847 JobAssert(all_workers.num_elements == 0u, "All elements expected to be allocated out.");
848 JobAssert(all_tasks.num_elements == 0u, "All elements expected to be allocated out.");
849 JobAssert(main_tasks_ptrs.num_elements == 0u, "All elements expected to be allocated out.");
850 JobAssert(worker_task_ptrs.num_elements == 0u, "All elements expected to be allocated out.");
851 JobAssert(all_task_handles.num_elements == 0u, "All elements expected to be allocated out.");
852
853 return Job::InitializationToken{owned_threads};
854}
855
857{
858 Job::JobSystemContext* const job_system = g_JobSystem;
859 const std::uint32_t user_thread_id = job_system->num_owned_workers + job_system->num_user_threads_setup.fetch_add(1, std::memory_order_relaxed);
860
861 JobAssert(user_thread_id < job_system->num_workers, "Too many calls to `SetupUserThread`.");
862
863 worker::WorkerThreadSetup(job_system->workers + user_thread_id);
864}
865
866std::size_t Job::NumSystemThreads() noexcept
867{
868#if IS_SINGLE_THREADED
869 return 1;
870#else
871 const auto n = std::thread::hardware_concurrency();
872 return n != 0 ? n : 1;
873#endif
874
875#if 0
876
877#if IS_WINDOWS
878 SYSTEM_INFO sysinfo;
879 GetSystemInfo(&sysinfo);
880 return sysinfo.dwNumberOfProcessors;
881#elif IS_POSIX
882 return sysconf(_SC_NPROCESSORS_ONLN) /* * 2*/;
883#elif 0 // FreeBSD, MacOS X, NetBSD, OpenBSD
884 nt mib[4];
885 int numCPU;
886 std::size_t len = sizeof(numCPU);
887
888 /* set the mib for hw.ncpu */
889 mib[0] = CTL_HW;
890 mib[1] = HW_AVAILCPU; // alternatively, try HW_NCPU;
891
892 /* get the number of CPUs from the system */
893 sysctl(mib, 2, &numCPU, &len, NULL, 0);
894
895 if (numCPU < 1)
896 {
897 mib[1] = HW_NCPU;
898 sysctl(mib, 2, &numCPU, &len, NULL, 0);
899 if (numCPU < 1)
900 numCPU = 1;
901 }
902
903 return numCPU;
904#elif 0 // HPUX
905 return mpctl(MPC_GETNUMSPUS, NULL, NULL);
906#elif 0 // IRIX
907 return sysconf(_SC_NPROC_ONLN);
908#elif 0 // Objective-C (Mac OS X >=10.5 or iOS)
909 NSUInteger a = [[NSProcessInfo processInfo] processorCount];
910 NSUInteger b = [[NSProcessInfo processInfo] activeProcessorCount];
911
912 return a;
913#endif
914
915#endif
916}
917
918std::uint16_t Job::NumWorkers() noexcept
919{
920 return std::uint16_t(g_JobSystem->num_workers);
921}
922
923const char* Job::ProcessorArchitectureName() noexcept
924{
926}
927
929{
930 JobAssert(g_CurrentWorker != nullptr, "This thread was not created by the job system.");
932}
933
934bool Job::IsMainThread() noexcept
935{
937}
938
939void Job::Shutdown() noexcept
940{
941 JobAssert(g_JobSystem != nullptr, "Cannot shutdown when never initialized.");
942
943 static_assert(std::is_trivially_destructible_v<TaskMemoryBlock>, "TaskMemoryBlock's destructor not called.");
944 static_assert(std::is_trivially_destructible_v<TaskPtr>, "TaskPtr's destructor not called.");
945 static_assert(std::is_trivially_destructible_v<AtomicTaskPtr>, "AtomicTaskPtr's destructor not called.");
946 static_assert(std::is_trivially_destructible_v<TaskHandle>, "TaskHandle's destructor not called.");
947
948 JobSystemContext* const job_system = g_JobSystem;
949 const std::uint32_t num_workers = job_system->num_owned_workers;
950
951 // Incase all threads are not initialized by the time shutdown is called.
952 while (job_system->is_running.load(std::memory_order_relaxed) != true) {}
953
954 {
955 std::unique_lock<std::mutex> lock(job_system->worker_sleep_mutex);
956 job_system->is_running.store(false, std::memory_order_relaxed);
957 }
958
959 // Allow one last update loop to allow them to end.
960 system::WakeUpAllWorkers();
961
962 for (std::uint32_t i = 0; i < num_workers; ++i)
963 {
964 ThreadLocalState* const worker = job_system->workers + i;
965
966 if (i != 0)
967 {
968 worker::ShutdownThread(worker);
969 }
970
971 worker->~ThreadLocalState();
972 }
973
974 const bool needs_delete = job_system->needs_delete;
975
976 job_system->~JobSystemContext();
977 g_CurrentWorker = nullptr;
978 g_JobSystem = nullptr;
979
980 if (needs_delete)
981 {
982 ::operator delete[](job_system, job_system->system_alloc_size, std::align_val_t{job_system->system_alloc_alignment});
983 }
984}
985
986Task* Job::TaskMake(const TaskFn function, Task* const parent) noexcept
987{
988 const WorkerID worker_id = worker::GetCurrentID();
989 ThreadLocalState* const worker = system::GetWorker(worker_id);
990 const std::uint32_t max_tasks_per_worker = g_JobSystem->num_tasks_per_worker;
991
992 if (worker->num_allocated_tasks == max_tasks_per_worker)
993 {
994 worker::GarbageCollectAllocatedTasks(worker);
995
996 if (worker->num_allocated_tasks == max_tasks_per_worker)
997 {
998 // While we cannot allocate do some work.
999 system::WakeUpAllWorkers();
1000 while (worker->num_allocated_tasks == max_tasks_per_worker)
1001 {
1002 worker::TryRunTask(worker);
1003 worker::GarbageCollectAllocatedTasks(worker);
1004 }
1005 }
1006 }
1007
1008 JobAssert(worker->num_allocated_tasks < max_tasks_per_worker, "Too many tasks allocated.");
1009
1010 Task* const task = task_pool::AllocateTask(&worker->task_allocator, worker_id, function, task::PointerToTaskPtr(parent));
1011 const TaskHandle task_hdl = task_pool::TaskToIndex(worker->task_allocator, task);
1012
1013 if (parent)
1014 {
1015 parent->num_unfinished_tasks.fetch_add(1u, std::memory_order_release);
1016 }
1017
1018 worker->allocated_tasks[worker->num_allocated_tasks++] = task_hdl;
1019
1020 return task;
1021}
1022
1023TaskData Job::TaskGetData(Task* const task, const std::size_t alignment) noexcept
1024{
1025 Byte* const user_storage_start = static_cast<Byte*>(AlignPointer(task->user_data + task->user_data_start, alignment));
1026 const Byte* const user_storage_end = std::end(task->user_data);
1027
1028 if (user_storage_start <= user_storage_end)
1029 {
1030 const std::size_t user_storage_size = user_storage_end - user_storage_start;
1031
1032 return {user_storage_start, user_storage_size};
1033 }
1034
1035 return {nullptr, 0u};
1036}
1037
1038void Job::TaskAddContinuation(Task* const self, Task* const continuation, const QueueType queue) noexcept
1039{
1040 JobAssert(self->q_type == k_InvalidQueueType, "The parent task should not have already been submitted to a queue.");
1041 JobAssert(continuation->q_type == k_InvalidQueueType, "A continuation must not have already been submitted to a queue or already added as a continuation.");
1042 JobAssert(continuation->next_continuation.isNull(), "A continuation must not have already been added to another task.");
1043
1044 const TaskPtr new_head = task::PointerToTaskPtr(continuation);
1045 continuation->q_type = queue;
1046 continuation->next_continuation = self->first_continuation.load(std::memory_order_relaxed);
1047
1048 while (!std::atomic_compare_exchange_strong(&self->first_continuation, &continuation->next_continuation, new_head))
1049 {
1050 }
1051}
1052
1053void Job::TaskIncRef(Task* const task) noexcept
1054{
1055 const auto old_ref_count = task->ref_count.fetch_add(1, std::memory_order_relaxed);
1056
1057 JobAssert(old_ref_count >= std::int16_t(1) || task->q_type == k_InvalidQueueType, "First call to taskIncRef should not happen after the task has been submitted.");
1058 (void)old_ref_count;
1059}
1060
1061void Job::TaskDecRef(Task* const task) noexcept
1062{
1063 const auto old_ref_count = task->ref_count.fetch_sub(1, std::memory_order_relaxed);
1064
1065 JobAssert(old_ref_count >= 0, "taskDecRef: Called too many times.");
1066 (void)old_ref_count;
1067}
1068
1069bool Job::TaskIsDone(const Task* const task) noexcept
1070{
1071 return task->num_unfinished_tasks.load(std::memory_order_acquire) == -1;
1072}
1073
1074void Job::TaskSubmit(Task* const self, QueueType queue) noexcept
1075{
1076 JobAssert(self->q_type == k_InvalidQueueType, "A task cannot be submitted to a queue multiple times.");
1077
1078 const WorkerID num_workers = NumWorkers();
1079
1080 // If we only have one thread running using the worker queue is invalid.
1081 if (num_workers == 1u && queue == QueueType::WORKER)
1082 {
1083 queue = QueueType::NORMAL;
1084 }
1085
1086 ThreadLocalState* const worker = worker::GetCurrent();
1087 const TaskPtr task_ptr = task::PointerToTaskPtr(self);
1088
1089 self->q_type = queue;
1090
1091 switch (queue)
1092 {
1093 case QueueType::NORMAL:
1094 {
1095 task::SubmitQPushHelper(task_ptr, worker, &worker->normal_queue);
1096 break;
1097 }
1098 case QueueType::MAIN:
1099 {
1100 LockedQueue<TaskPtr>* const main_queue = &g_JobSystem->main_queue;
1101
1102 // NOTE(SR):
1103 // The only way `main_queue` will be emptied
1104 // is by the main thread, so there is a chance
1105 // that if it does not get flushed frequently
1106 // enough then we have a this thread spinning indefinitely.
1107 //
1108 while (!main_queue->Push(task_ptr))
1109 {
1110 // If we could not push to the queue then just do some work.
1111 worker::TryRunTask(worker);
1112 }
1113 break;
1114 }
1115 case QueueType::WORKER:
1116 {
1117 task::SubmitQPushHelper(task_ptr, worker, &worker->worker_queue);
1118 break;
1119 }
1120 default:
1121#if defined(__GNUC__) // GCC, Clang, ICC
1122 __builtin_unreachable();
1123#elif defined(_MSC_VER) // MSVC
1124 __assume(false);
1125#endif
1126 break;
1127 }
1128
1129 if (queue != QueueType::MAIN)
1130 {
1131 const std::int32_t num_pending_jobs = g_JobSystem->num_available_jobs.fetch_add(1, std::memory_order_relaxed);
1132
1133 if (num_pending_jobs >= num_workers)
1134 {
1135 system::WakeUpAllWorkers();
1136 }
1137 else
1138 {
1139 system::WakeUpOneWorker();
1140 }
1141 }
1142}
1143
1144void Job::WaitOnTask(const Task* const task) noexcept
1145{
1146 const WorkerID worker_id = CurrentWorker();
1147
1148 JobAssert(task->q_type != k_InvalidQueueType, "The Task must be submitted to a queue before you wait on it.");
1149 JobAssert(task->owning_worker == worker_id, "You may only call this function with a task created on the current 'Worker'.");
1150
1151 system::WakeUpAllWorkers();
1152
1153 ThreadLocalState* const worker = system::GetWorker(worker_id);
1154
1155 while (!TaskIsDone(task))
1156 {
1157 worker::TryRunTask(worker);
1158 }
1159}
1160
1161void Job::TaskSubmitAndWait(Task* const self, const QueueType queue) noexcept
1162{
1163 TaskSubmit(self, queue);
1164 WaitOnTask(self);
1165}
1166
1167// Member Fn Definitions
1168
1169Task::Task(WorkerID worker, TaskFn fn, TaskPtr parent) noexcept :
1170 fn_storage{fn},
1171 num_unfinished_tasks{1},
1172 ref_count{1},
1173 parent{parent},
1174 first_continuation{nullptr},
1175 next_continuation{nullptr},
1176 owning_worker{worker},
1177 q_type{k_InvalidQueueType}, /* Set to a valid value in 'Job::submitTask' */
1178 user_data_start{0},
1179 user_data{}
1180{
1181}
1182
1183#if defined(_MSC_VER)
1184#define NativePause YieldProcessor
1185#elif defined(__clang__) && defined(__SSE__) || defined(__INTEL_COMPILER) // || defined(__GNUC_PREREQ) && (__GNUC_PREREQ(4, 7) && defined(__SSE__))
1186#include <xmmintrin.h>
1187#define NativePause _mm_pause
1188#elif defined(__arm__)
1189#ifdef __CC_ARM
1190#define NativePause __yield
1191#else
1192#define NativePause() __asm__ __volatile__("yield")
1193#endif
1194#else
1195#define NativePause std::this_thread::yield
1196#endif
1197
1198void Job::PauseProcessor() noexcept
1199{
1200 NativePause();
1201}
1202
1203#undef NativePause
1204
1205void Job::YieldTimeSlice() noexcept
1206{
1207 // Windows : SwitchToThread()
1208 // Linux : sched_yield()
1209 std::this_thread::yield();
1210}
1211
1212// Private Helpers
1213
1215{
1216 return task->q_type;
1217}
1218
1219void* Job::detail::taskGetPrivateUserData(Task* const task, const std::size_t alignment) noexcept
1220{
1221 return AlignPointer(task->user_data, alignment);
1222}
1223
1224void* Job::detail::taskReservePrivateUserData(Task* const task, const std::size_t num_bytes, const std::size_t alignment) noexcept
1225{
1226 const Byte* const user_storage_end = std::end(task->user_data);
1227 Byte* const requested_storage_start = static_cast<Byte*>(AlignPointer(task->user_data, alignment));
1228 const Byte* const requested_storage_end = requested_storage_start + num_bytes;
1229
1230 JobAssert(requested_storage_end <= user_storage_end, "Cannot store object within the task's user storage. ");
1231
1232 task->user_data_start = static_cast<std::uint8_t>(requested_storage_end - task->user_data);
1233
1234 return requested_storage_start;
1235}
1236
1238{
1239 JobAssert(worker::IsMainThread(worker::GetCurrent()), "Must only be called by main thread.");
1240
1241 TaskPtr task_ptr;
1242 if (g_JobSystem->main_queue.Pop(&task_ptr))
1243 {
1244 Task* const task = task::TaskPtrToPointer(task_ptr);
1245 task::RunTaskFunction(task);
1246 return true;
1247 }
1248
1249 return false;
1250}
1251
1252#undef IS_WINDOWS
1253#undef IS_POSIX
1254#undef IS_SINGLE_THREADED
1255
1256#if defined(_MSC_VER)
1257
1258#pragma warning(push)
1259#pragma warning(disable : 4244)
1260#pragma warning(disable : 4146)
1261
1262#endif
1263
1264#include "pcg_basic.c"
1265
1266#if defined(_MSC_VER)
1267
1268#pragma warning(pop)
1269
1270#endif
1271
1272/******************************************************************************/
1273/*
1274 MIT License
1275
1276 Copyright (c) 2020-2024 Shareef Abdoul-Raheem
1277
1278 Permission is hereby granted, free of charge, to any person obtaining a copy
1279 of this software and associated documentation files (the "Software"), to deal
1280 in the Software without restriction, including without limitation the rights
1281 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
1282 copies of the Software, and to permit persons to whom the Software is
1283 furnished to do so, subject to the following conditions:
1284
1285 The above copyright notice and this permission notice shall be included in all
1286 copies or substantial portions of the Software.
1287
1288 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
1289 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
1290 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
1291 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
1292 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
1293 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
1294 SOFTWARE.
1295*/
1296/******************************************************************************/
bool Push(const T &value)
Definition: job_queue.hpp:60
API for a multi-threading job system.
Assertion macro for this library.
#define JobAssert(expr, msg)
Definition: job_assert.hpp:27
Concurrent Queue Implementations for different situations.
static thread_local Job::ThreadLocalState * g_CurrentWorker
Definition: job_system.cpp:227
#define NativePause
static Job::JobSystemContext * g_JobSystem
Definition: job_system.cpp:226
void * taskReservePrivateUserData(Task *const task, const std::size_t num_bytes, const std::size_t alignment) noexcept
void * taskGetPrivateUserData(Task *const task, const std::size_t alignment) noexcept
QueueType taskQType(const Task *task) noexcept
bool mainQueueTryRunTask(void) noexcept
void assertHandler(const bool condition, const char *const filename, const int line_number, const char *const msg)
Definition: job_api.hpp:30
LockedQueue< TaskPtr > main_queue
Definition: job_system.cpp:217
std::atomic< TaskPtr > AtomicTaskPtr
Definition: job_system.cpp:115
std::uint64_t job_steal_rng_seed
The RNG for work queue stealing will be seeded with this value.
Definition: job_api.hpp:92
InitializationLock init_lock
Definition: job_system.cpp:208
TaskData TaskGetData(Task *const task, const std::size_t alignment) noexcept
Returns the user-data buffer you may write to in order to get data into your TaskFn.
SPMCDeque< TaskPtr > worker_queue
Definition: job_system.cpp:183
void TaskSubmitAndWait(Task *const self, const QueueType queue=QueueType::NORMAL) noexcept
Same as calling taskSubmit followed by waitOnTask.
std::condition_variable worker_sleep_cv
Definition: job_system.cpp:219
void WaitOnTask(const Task *const task) noexcept
Waits until the specified task is done executing. This function will block but do work while being bl...
void(*)(Task *) TaskFn
The signature of the type of function for a single Task.
Definition: job_api.hpp:55
std::atomic_int32_t AtomicInt32
Definition: job_system.cpp:86
std::atomic_uint32_t num_user_threads_setup
Definition: job_system.cpp:206
void TaskAddContinuation(Task *const self, Task *const continuation, const QueueType queue=QueueType::NORMAL) noexcept
A 'continuation' is a task that will be added to a queue after the 'self' Task has finished running.
InitializationToken Initialize(const JobSystemMemoryRequirements &memory_requirements={}, void *const memory=nullptr) noexcept
Sets up the Job system and creates all the worker threads. The thread that calls 'Job::Initialize' is...
Definition: job_system.cpp:741
TaskMemoryBlock * freelist
Definition: job_system.cpp:177
TaskMemoryBlock * next
Definition: job_system.cpp:169
static constexpr TaskHandle NullTaskHandle
Definition: job_system.cpp:89
void YieldTimeSlice() noexcept
Asks the OS to yield this threads execution to another thread on the current cpu core.
std::uint16_t NumWorkers() noexcept
Returns the number of workers created by the system. This function can be called by any thread concur...
Definition: job_system.cpp:918
ThreadLocalState * last_stolen_worker
Definition: job_system.cpp:187
std::uint16_t normal_queue_size
Number of tasks in each worker's QueueType::NORMAL queue. (Must be power of two)
Definition: job_api.hpp:90
std::uint16_t TaskHandle
Definition: job_system.cpp:82
QueueType
Determines which threads the task will be allowed to run on.
Definition: job_api.hpp:46
@ MAIN
Tasks in this queue will only be run by the main thread.
@ NORMAL
Tasks in this queue will run on either the main or worker threads.
@ WORKER
Tasks in this queue will never run on the main thread.
TaskHandleType num_allocated_tasks
Definition: job_system.cpp:186
Task * TaskMake(const TaskFn function, Task *const parent=nullptr) noexcept
Creates a new Task that should be later submitted by calling 'TaskSubmit'.
Definition: job_system.cpp:986
pcg_state_setseq_64 rng_state
Definition: job_system.cpp:188
SPMCDeque< TaskPtr > normal_queue
Definition: job_system.cpp:182
ThreadLocalState * workers
Definition: job_system.cpp:203
const char * ProcessorArchitectureName() noexcept
An implementation defined name for the CPU architecture of the device. This function can be called by...
Definition: job_system.cpp:923
std::uint32_t num_owned_workers
Definition: job_system.cpp:205
std::uint32_t num_workers
Definition: job_system.cpp:204
std::uint32_t num_tasks_per_worker
Definition: job_system.cpp:207
std::atomic_bool is_running
Definition: job_system.cpp:213
unsigned char storage[sizeof(Task)]
Definition: job_system.cpp:170
const char * sys_arch_str
Definition: job_system.cpp:209
void SetupUserThread()
Must be called in the callstack of the thread to be setup.
Definition: job_system.cpp:856
std::atomic_uint32_t num_available_jobs
Definition: job_system.cpp:220
void Shutdown() noexcept
This will deallocate any memory used by the system and shutdown any threads created by 'bfJob::initia...
Definition: job_system.cpp:939
std::uint16_t main_queue_size
Number of tasks in the job system's QueueType::MAIN queue. (Must be power of two)
Definition: job_api.hpp:89
static constexpr std::size_t k_ExpectedTaskSize
Definition: job_system.cpp:77
std::atomic_uint32_t num_workers_ready
Definition: job_system.cpp:196
std::size_t system_alloc_alignment
Definition: job_system.cpp:211
std::mutex worker_sleep_mutex
Definition: job_system.cpp:218
WorkerID CurrentWorker() noexcept
The current id of the current thread. This function can be called by any thread concurrently.
Definition: job_system.cpp:928
void TaskIncRef(Task *const task) noexcept
Increments the task's ref count preventing it from being garbage collected.
static constexpr QueueType k_InvalidQueueType
Definition: job_system.cpp:78
TaskHandle * allocated_tasks
Definition: job_system.cpp:185
std::size_t system_alloc_size
Definition: job_system.cpp:210
std::uint16_t WorkerID
The id type of each worker thread.
Definition: job_api.hpp:54
std::uint8_t num_user_threads
The number of threads not owned by this system but wants access to the Job API (The thread must call ...
Definition: job_api.hpp:87
bool IsMainThread() noexcept
Allows for querying if we are currently executing in the main thread.
Definition: job_system.cpp:934
void PauseProcessor() noexcept
CPU pause instruction to indicate when you are in a spin wait loop.
TaskMemoryBlock * memory
Definition: job_system.cpp:176
std::uint16_t worker_queue_size
Number of tasks in each worker's QueueType::WORKER queue. (Must be power of two)
Definition: job_api.hpp:91
void TaskDecRef(Task *const task) noexcept
Decrements the task's ref count, allowing it to be garbage collected.
std::size_t NumSystemThreads() noexcept
Makes some system calls to grab the number threads / processors on the device. This function can be c...
Definition: job_system.cpp:866
std::atomic< TaskHandle > AtomicTaskHandleType
Definition: job_system.cpp:84
unsigned char Byte
Definition: job_system.cpp:87
void TaskSubmit(Task *const self, const QueueType queue=QueueType::NORMAL) noexcept
Submits the task to the specified queue.
std::condition_variable init_cv
Definition: job_system.cpp:195
std::uint8_t num_threads
Use 0 to indicate using the number of cores available on the system.
Definition: job_api.hpp:88
static constexpr std::size_t k_CachelineSize
Definition: job_system.cpp:74
WorkerID WorkerIDType
Definition: job_system.cpp:85
@ SUCCESS
Returned from Push, Pop and Steal.
TaskHandle TaskHandleType
Definition: job_system.cpp:83
bool TaskIsDone(const Task *const task) noexcept
Returns the done status of the task.
The runtime configuration for the Job System.
Definition: job_api.hpp:86
A buffer for user-data you can write to, maybe large enough to store task data inline.
Definition: job_api.hpp:204
The memory requirements for a given configuration JobSystemCreateOptions.
Definition: job_api.hpp:100
JobSystemMemoryRequirements(const JobSystemCreateOptions &options={}) noexcept
Definition: job_system.cpp:720
TaskFnStorage fn_storage
The function that will be run.
Definition: job_system.cpp:150
AtomicInt32 ref_count
Keeps the task from being garbage collected.
Definition: job_system.cpp:152
QueueType q_type
The queue type this task has been submitted to, initialized to k_InvalidQueueType.
Definition: job_system.cpp:157
TaskPtr next_continuation
Next element in the linked list of continuations.
Definition: job_system.cpp:155
AtomicInt32 num_unfinished_tasks
The number of children tasks.
Definition: job_system.cpp:151
WorkerID owning_worker
The worker this task has been created on, needed for Task::toTaskPtr and various assertions.
Definition: job_system.cpp:156
Task(WorkerID worker, TaskFn fn, TaskPtr parent) noexcept
AtomicTaskPtr first_continuation
Head of linked list of tasks to be added on completion.
Definition: job_system.cpp:154
TaskPtr parent
The parent task, can be null.
Definition: job_system.cpp:153
std::uint8_t user_data_start
Offset into padding that can be used for user data.
Definition: job_system.cpp:158
bool isNull() const noexcept
Definition: job_system.cpp:112
TaskHandle task_index
Definition: job_system.cpp:96
WorkerID worker_id
Definition: job_system.cpp:95
TaskPtr() noexcept=default
TaskPtr(std::nullptr_t) noexcept
Definition: job_system.cpp:106
std::uint64_t pad
Definition: job_system.cpp:126
TaskFnStorage(const TaskFn fn) noexcept
Definition: job_system.cpp:128