[MOD] Use linked-list for strands.
paintdream committed May 28, 2024
1 parent c6bfe39 commit f8402d6
Showing 2 changed files with 133 additions and 84 deletions.
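
What the change does: in strand mode, a warp's pending tasks previously sat in a mutex-guarded queue buffer (the storage_t specialization removed below); they now form an intrusive singly-linked list headed by an atomic pointer. Producers push with a compare-exchange loop, which yields newest-first (LIFO) order, and the consumer detaches the entire list with a single exchange and reverses it in place to restore submission (FIFO) order. Below is a minimal standalone sketch of that pattern; node_t, push, and drain are illustrative names, not the library's API.

#include <atomic>

struct node_t {
	node_t* next;
	// task payload omitted
};

std::atomic<node_t*> queueing_head { nullptr };

// producer side: lock-free push; the list grows newest-first
void push(node_t* task) {
	node_t* node = queueing_head.load(std::memory_order_relaxed);
	do {
		task->next = node;
	} while (!queueing_head.compare_exchange_weak(node, task, std::memory_order_acq_rel, std::memory_order_relaxed));
}

// consumer side: detach everything at once, then reverse in place
// so tasks run in the order they were queued
node_t* drain() {
	node_t* p = queueing_head.exchange(nullptr, std::memory_order_acquire);
	node_t* q = nullptr;

	while (p != nullptr) {
		node_t* t = p->next;
		p->next = q;
		q = p;
		p = t;
	}

	return q;
}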
215 changes: 132 additions & 83 deletions src/iris_dispatcher.h
@@ -38,61 +38,7 @@ SOFTWARE.
#include <condition_variable>

namespace iris {
namespace impl {
// storage for queued tasks
template <typename queue_buffer_t, bool>
struct storage_t {
storage_t() noexcept {}
storage_t(storage_t&& rhs) noexcept {
queue_buffer = std::move(rhs.queue_buffer);
}

storage_t& operator = (storage_t&& rhs) noexcept {
if (this != &rhs) {
queue_buffer = std::move(rhs.queue_buffer);
}

return *this;
}

bool empty() const noexcept {
return queue_buffer.empty();
}

queue_buffer_t queue_buffer;
std::mutex mutex;
};

template <typename queue_buffer_t>
struct storage_t<queue_buffer_t, false> {
storage_t() noexcept : current_version(0), next_version(0) {
barrier_version.store(0, std::memory_order_relaxed);
}

storage_t(storage_t&& rhs) noexcept {
barrier_version.store(rhs.barrier_version.load(std::memory_order_acquire), std::memory_order_release);
queue_buffers = std::move(rhs.queue_buffers);
queue_versions = std::move(rhs.queue_versions);
current_version = rhs.current_version;
next_version = rhs.next_version;
}

bool empty() const noexcept {
for (size_t i = 0; i < queue_buffers.size(); i++) {
if (!queue_buffers[i].empty())
return false;
}

return true;
}

std::atomic<size_t> barrier_version;
std::vector<queue_buffer_t> queue_buffers;
std::vector<size_t> queue_versions;
size_t current_version;
size_t next_version;
};

namespace impl {
// for exception safety, roll back atomic operations as needed
enum guard_operation {
add, sub, invalidate
@@ -251,6 +197,54 @@ namespace iris {
callable_t callable;
};

// storage for queued tasks
struct chain_storage_t {
chain_storage_t() noexcept : executing_head(nullptr) {
queueing_head.store(nullptr, std::memory_order_relaxed);
}

chain_storage_t(chain_storage_t&& rhs) noexcept : executing_head(rhs.executing_head) {
rhs.executing_head = nullptr;
queueing_head.store(rhs.queueing_head.exchange(nullptr, std::memory_order_acquire), std::memory_order_release);
}

bool empty() const noexcept {
return executing_head == nullptr && queueing_head.load(std::memory_order_acquire) == nullptr;
}

task_t* executing_head;
std::atomic<task_t*> queueing_head;
};

struct grid_storage_t {
grid_storage_t() noexcept : current_version(0), next_version(0) {
barrier_version.store(0, std::memory_order_relaxed);
}

grid_storage_t(grid_storage_t&& rhs) noexcept {
barrier_version.store(rhs.barrier_version.load(std::memory_order_acquire), std::memory_order_release);
queue_buffers = std::move(rhs.queue_buffers);
queue_versions = std::move(rhs.queue_versions);
current_version = rhs.current_version;
next_version = rhs.next_version;
}

bool empty() const noexcept {
for (size_t i = 0; i < queue_buffers.size(); i++) {
if (!queue_buffers[i].empty())
return false;
}

return true;
}

std::atomic<size_t> barrier_version;
std::vector<queue_buffer_t> queue_buffers;
std::vector<size_t> queue_versions;
size_t current_version;
size_t next_version;
};

// do not copy this structure, only move it
iris_warp_t(const iris_warp_t& rhs) = delete;
iris_warp_t& operator = (const iris_warp_t& rhs) = delete;
@@ -260,17 +254,17 @@ namespace iris {
// for warps, we prepare one queue for each thread to remove mutex requirements

template <bool s>
typename std::enable_if<s>::type init_buffers(size_t thread_count) noexcept {}
typename std::enable_if<s>::type init_storage(size_t thread_count) noexcept {}

template <bool s>
typename std::enable_if<!s>::type init_buffers(size_t thread_count) noexcept(noexcept(std::declval<iris_warp_t>().storage.queue_buffers.resize(thread_count))) {
typename std::enable_if<!s>::type init_storage(size_t thread_count) noexcept(noexcept(std::declval<iris_warp_t>().storage.queue_buffers.resize(thread_count))) {
storage.queue_buffers.resize(thread_count);
storage.queue_versions.resize(thread_count);
}

// initialize with the specified priority; all tasks that run on this warp will be scheduled with this priority
explicit iris_warp_t(async_worker_t& worker, size_t prior = 0) : async_worker(worker), priority(prior), stack_next_warp(nullptr) {
init_buffers<strand>(worker.get_thread_count());
init_storage<strand>(worker.get_thread_count());

thread_warp.store(nullptr, std::memory_order_relaxed);
parallel_task_head.store(nullptr, std::memory_order_relaxed);
@@ -298,7 +292,8 @@ namespace iris {
// execute remaining tasks on destruction
while (!join<true, true>()) {}

IRIS_ASSERT(parallel_task_head.load(std::memory_order_relaxed) == nullptr);
IRIS_ASSERT(parallel_task_head.load(std::memory_order_acquire) == nullptr);
IRIS_ASSERT(storage.empty());
}

// get stack warp pointer
@@ -391,7 +386,7 @@ namespace iris {
template <typename callable_t>
void queue_routine_external(callable_t&& func) {
IRIS_ASSERT(async_worker.get_current_thread_index() == ~size_t(0));
async_worker.queue(external_t<typename std::remove_reference<callable_t>::type>(*this, std::forward<callable_t>(func)), priority);
queue_routine_external_internal<strand>(std::forward<callable_t>(func));
}

// queue a barrier here, any routines queued after this barrier must be scheduled after any routines before this barrier
@@ -493,9 +488,18 @@ namespace iris {
}
}

template <bool s, typename callable_t>
typename std::enable_if<s>::type queue_routine_external_internal(callable_t&& func) {
queue_routine_post(std::forward<callable_t>(func));
}

template <bool s, typename callable_t>
typename std::enable_if<!s>::type queue_routine_external_internal(callable_t&& func) {
async_worker.queue(external_t<typename std::remove_reference<callable_t>::type>(*this, std::forward<callable_t>(func)), priority);
}

template <bool self_execute, typename callable_t>
void queue_routine_parallel_internal(callable_t&& func) {
// already detached
suspend();

suspend_guard_t guard(this);
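
The two queue_routine_external_internal overloads above use enable_if tag dispatch on the strand flag: the call site is identical, but only one overload participates in overload resolution for a given value of s, so the strand path (post straight to the warp's own list) and the non-strand path (wrap the callable and hand it to the async worker) are selected at compile time with no runtime branch. A stripped-down sketch of the idiom, with hypothetical names:

#include <type_traits>

template <bool s, typename callable_t>
typename std::enable_if<s>::type route(callable_t&& func) {
	// chosen when s == true, e.g. the strand path
}

template <bool s, typename callable_t>
typename std::enable_if<!s>::type route(callable_t&& func) {
	// chosen when s == false, e.g. the shared-worker path
}

// route<true>(f) selects the first overload, route<false>(f) the second;
// SFINAE removes the non-matching one from the candidate set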
@@ -550,22 +554,40 @@ namespace iris {
queueing.store(queue_state_executing, std::memory_order_release);
iris_warp_t** warp_ptr = &get_current_warp_internal();
IRIS_ASSERT(*warp_ptr == this);
queue_buffer_t& buffer = storage.queue_buffer;

// execute tasks in queue_buffer until suspended
size_t execute_counter;

do {
execute_counter = 0;
while (!buffer.empty()) {
typename queue_buffer_t::element_t func = std::move(buffer.top());
buffer.pop();
task_t* p = storage.executing_head;
if (p == nullptr) {
p = storage.queueing_head.exchange(nullptr, std::memory_order_relaxed);
task_t* q = nullptr;

while (p != nullptr) {
task_t* t = p->next;
p->next = q;
q = p;
p = t;
}

func(); // we have already thread_fence acquired above
p = q;
} else {
storage.executing_head = nullptr;
}

while (p != nullptr) {
task_t* q = p->next;
p->next = nullptr;
async_worker.execute_task(p);
execute_counter++;

if ((!force && suspend_count.load(std::memory_order_relaxed) != 0) || *warp_ptr != this)
if ((!force && suspend_count.load(std::memory_order_relaxed) != 0) || *warp_ptr != this) {
IRIS_ASSERT(storage.executing_head == nullptr);
storage.executing_head = q;
return;
}

p = q;
}
} while (execute_counter != 0);
}
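
A subtlety in the loop above: if the warp is suspended or preempted partway through a drained batch, the unexecuted tail is parked in executing_head, and the next pass resumes from it before touching queueing_head again, so ordering survives the interruption. A simplified model of that control flow, building on the push/drain sketch earlier (run and should_yield stand in for the real task invocation and suspension checks):

void run(node_t* task);  // execute and release one task (assumed)
bool should_yield();     // suspension/preemption check (assumed)

node_t* executing_head = nullptr;

void execute_batch() {
	node_t* p = executing_head;

	if (p == nullptr) {
		p = drain(); // detach + reverse, as sketched above
	} else {
		executing_head = nullptr;
	}

	while (p != nullptr) {
		node_t* q = p->next;
		run(p);

		if (should_yield()) {
			executing_head = q; // park the unexecuted tail for the next pass
			return;
		}

		p = q;
	}
}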
@@ -641,7 +663,9 @@ namespace iris {
queueing.store(queue_state_pending, std::memory_order_relaxed);
}
}
} else if (parallel_task_head.load(std::memory_order_acquire) != nullptr) {
}

if (parallel_task_head.load(std::memory_order_acquire) != nullptr) {
preempt_guard_t preempt_guard(*this, ~size_t(0));
if (preempt_guard) {
execute_parallel();
@@ -679,10 +703,14 @@ namespace iris {
// queue task from specified thread.
template <bool s, typename callable_t>
typename std::enable_if<s>::type push(callable_t&& func) {
task_t* task = async_worker.make_task(std::forward<callable_t>(func));

// avoid legacy compiler bugs
// see https://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange
task_t* node = storage.queueing_head.load(std::memory_order_relaxed);
do {
std::lock_guard<std::mutex> guard(storage.mutex);
storage.queue_buffer.push(std::forward<callable_t>(func));
} while (false);
task->next = node;
} while (!storage.queueing_head.compare_exchange_weak(node, task, std::memory_order_acq_rel, std::memory_order_relaxed));

flush();
}
@@ -713,7 +741,7 @@ namespace iris {
std::atomic<size_t> suspend_count; // current suspend count
std::atomic<size_t> queueing; // is flush request sent to async_worker? 0 : not yet, 1 : yes, 2 : is to flush right away.
std::atomic<task_t*> parallel_task_head; // linked-list for pending parallel tasks
impl::storage_t<queue_buffer_t, strand> storage; // task storage
typename std::conditional<strand, chain_storage_t, grid_storage_t>::type storage; // task storage
size_t priority;
iris_warp_t* stack_next_warp;
};
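
The storage member declared above is where the two layouts meet: std::conditional selects chain_storage_t (one atomic linked list) when strand is true and grid_storage_t (one ring buffer per worker thread plus barrier versions) otherwise, so the rest of iris_warp_t is written once against whichever type was chosen. In miniature, with stand-in types:

#include <type_traits>

struct chain_storage_t {}; // stand-in for the linked-list storage above
struct grid_storage_t {};  // stand-in for the per-thread buffer storage above

template <bool strand>
using warp_storage_t = typename std::conditional<strand, chain_storage_t, grid_storage_t>::type;

static_assert(std::is_same<warp_storage_t<true>, chain_storage_t>::value, "strand warps use the linked list");
static_assert(std::is_same<warp_storage_t<false>, grid_storage_t>::value, "non-strand warps keep per-thread buffers");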
@@ -1153,6 +1181,8 @@ namespace iris {
~iris_async_worker_t() noexcept {
terminate();
join();

IRIS_ASSERT(task_count.load(std::memory_order_acquire) == 0);
}

// get current thread index
@@ -1182,6 +1212,12 @@ namespace iris {
return task;
}

void execute_task(task_t* task) {
task_count.fetch_sub(1, std::memory_order_release);
poll_guard_t guard(task_allocator, task);
task->task();
}

void queue_task(task_t* task, size_t priority = 0) {
IRIS_ASSERT(task != nullptr && task->next == nullptr);
if (!is_terminated()) {
@@ -1235,8 +1271,7 @@ namespace iris {
} else {
// terminate finished, just run at here
IRIS_ASSERT(get_current_thread_index_internal() == ~size_t(0));
poll_guard_t guard(task_allocator, task);
task->task();
execute_task(task);
}
}
}
@@ -1328,13 +1363,29 @@ namespace iris {
std::atomic<task_t*>& task_head = task_heads[i];
task_t* task = task_head.exchange(nullptr, std::memory_order_acquire);
empty = empty && (task == nullptr);

while (task != nullptr) {
task_t* p = task;
task = task->next;
task_head.store(task, std::memory_order_relaxed); // in case of exception
task_t* org = task_head.exchange(task->next, std::memory_order_release);

// return the remaining
if (org != nullptr) {
do {
task_t* next = org->next;

// avoid legacy compiler bugs
// see https://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange
task_t* node = task_head.load(std::memory_order_relaxed);
do {
org->next = node;
} while (!task_head.compare_exchange_weak(node, org, std::memory_order_relaxed, std::memory_order_relaxed));

org = next;
} while (org != nullptr);
}

poll_guard_t guard(task_allocator, p);
p->task();
execute_task(p);
task = task_head.exchange(nullptr, std::memory_order_acquire);
}
}
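
The reworked poll loop above is careful not to hold the whole list while it runs a task: it detaches everything, immediately publishes the tail back with an exchange, and if that exchange reveals tasks pushed concurrently in the meantime (org), it CAS-pushes those back one by one before executing the single task it kept. A condensed model of that take-one, give-back-the-rest step, reusing the hypothetical node_t and run from the earlier sketches:

void poll_once(std::atomic<node_t*>& head) {
	node_t* task = head.exchange(nullptr, std::memory_order_acquire);

	while (task != nullptr) {
		node_t* p = task;
		// publish the tail back before running; `org` is whatever
		// other threads pushed while we briefly held the list
		node_t* org = head.exchange(task->next, std::memory_order_release);

		while (org != nullptr) {
			node_t* next = org->next;
			node_t* node = head.load(std::memory_order_relaxed);
			do {
				org->next = node;
			} while (!head.compare_exchange_weak(node, org, std::memory_order_relaxed, std::memory_order_relaxed));

			org = next;
		}

		run(p); // p is fully detached, safe even if run() throws
		task = head.exchange(nullptr, std::memory_order_acquire);
	}
}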

@@ -1394,10 +1445,8 @@ namespace iris {
wakeup_one_with_priority(priority);
}

task_count.fetch_sub(1, std::memory_order_release);
// in case task->task() throws exceptions
poll_guard_t guard(task_allocator, task);
task->task();
execute_task(task);
}
}

2 changes: 1 addition & 1 deletion test/iris_dispatcher_demo.cpp
@@ -34,7 +34,7 @@ template <typename element_t>
using worker_allocator_t = iris_object_allocator_t<element_t>;

void external_poll() {
static constexpr size_t thread_count = 2;
static constexpr size_t thread_count = 4;
static constexpr size_t warp_count = 8;

using worker_t = iris_async_worker_t<std::thread, std::function<void()>, worker_allocator_t>;