Skip to content

Commit

Permalink
[FIX] Fix thread safety of parallel warp task.
Browse files Browse the repository at this point in the history
  • Loading branch information
paintdream committed Jun 10, 2024
1 parent 6291f8e commit 6f26fb6
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 17 deletions.
4 changes: 2 additions & 2 deletions src/iris_coroutine.h
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ namespace iris {

constexpr void await_resume() const noexcept {}

void reset() {
void reset() noexcept {
signaled.store(0, std::memory_order_release);
}

Expand Down Expand Up @@ -773,7 +773,7 @@ namespace iris {
return !elements.empty();
}

void await_suspend(std::coroutine_handle<> handle) {
void await_suspend(std::coroutine_handle<> handle) noexcept {
auto guard = out_fence();

info_t info;
Expand Down
41 changes: 26 additions & 15 deletions src/iris_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ namespace iris {
}

// initialize with specified priority, all tasks that runs on this warp will be scheduled with this priority
explicit iris_warp_t(async_worker_t& worker, size_t prior = 0) : async_worker(worker), priority(prior), stack_next_warp(nullptr) {
explicit iris_warp_t(async_worker_t& worker, size_t prior = 0) : async_worker(worker), priority(prior), stack_next_warp(nullptr), parallel_task_resurrect_head(nullptr) {
init_storage<strand>(worker.get_thread_count());

thread_warp.store(nullptr, std::memory_order_relaxed);
Expand All @@ -272,13 +272,14 @@ namespace iris {
queueing.store(queue_state_idle, std::memory_order_release);
}

iris_warp_t(iris_warp_t&& rhs) noexcept : async_worker(rhs.async_worker), storage(std::move(rhs.storage)), priority(rhs.priority), stack_next_warp(rhs.stack_next_warp) {
iris_warp_t(iris_warp_t&& rhs) noexcept : async_worker(rhs.async_worker), storage(std::move(rhs.storage)), priority(rhs.priority), stack_next_warp(rhs.stack_next_warp), parallel_task_resurrect_head(rhs.parallel_task_resurrect_head) {
thread_warp.store(rhs.thread_warp.load(std::memory_order_relaxed), std::memory_order_relaxed);
parallel_task_head.store(rhs.parallel_task_head.load(std::memory_order_relaxed), std::memory_order_relaxed);
suspend_count.store(rhs.suspend_count.load(std::memory_order_relaxed), std::memory_order_relaxed);
queueing.store(rhs.queueing.load(std::memory_order_relaxed), std::memory_order_relaxed);

rhs.stack_next_warp = nullptr;
rhs.parallel_task_resurrect_head = nullptr;
rhs.thread_warp.store(nullptr, std::memory_order_relaxed);
rhs.parallel_task_head.store(nullptr, std::memory_order_relaxed);
rhs.suspend_count.store(0, std::memory_order_relaxed);
Expand All @@ -292,8 +293,8 @@ namespace iris {
// execute remaining tasks on destruction
while (!join<true, true>()) {}

IRIS_ASSERT(parallel_task_head.load(std::memory_order_acquire) == nullptr);
IRIS_ASSERT(storage.empty());
IRIS_ASSERT(!has_parallel_task());
}

// get stack warp pointer
Expand Down Expand Up @@ -421,7 +422,7 @@ namespace iris {
// do cleanup
bool empty = true;
for (iterator_t p = begin; p != end; ++p) {
empty = empty && (*p).empty() && (*p).parallel_task_head.load(std::memory_order_acquire) == nullptr;
empty = empty && (*p).empty() && !(*p).has_parallel_task();

while (true) {
preempt_guard_t preempt_guard(*p, ~size_t(0));
Expand Down Expand Up @@ -473,6 +474,10 @@ namespace iris {
}

protected:
bool has_parallel_task() const noexcept {
return parallel_task_head.load(std::memory_order_acquire) != nullptr || parallel_task_resurrect_head != nullptr;
}

// take execution atomically, returns true on success.
bool preempt() noexcept {
iris_warp_t** expected = nullptr;
Expand Down Expand Up @@ -573,7 +578,7 @@ namespace iris {
}

while (p != nullptr) {
storage.executing_head = p->next;
storage.executing_head = p->next; // mark next for exception safety
p->next = nullptr;
async_worker.execute_task(p);
execute_counter++;
Expand All @@ -582,6 +587,7 @@ namespace iris {
return;
}

// go over next task
p = storage.executing_head;
}
} while (execute_counter != 0);
Expand Down Expand Up @@ -659,26 +665,30 @@ namespace iris {
}
}

if (parallel_task_head.load(std::memory_order_acquire) != nullptr) {
// always try to execute parallel tasks
if (has_parallel_task()) {
preempt_guard_t preempt_guard(*this, ~size_t(0));
if (preempt_guard) {
execute_parallel();
}
}
}

// parallel tasks are not guaranteed to be queued sequentially as input order
void execute_parallel() {
while (parallel_task_head.load(std::memory_order_acquire) != nullptr) {
task_t* task = parallel_task_head.exchange(nullptr, std::memory_order_acquire);
if (task != nullptr) {
while (has_parallel_task()) {
task_t* p = parallel_task_resurrect_head;
if (p == nullptr) {
p = parallel_task_head.exchange(nullptr, std::memory_order_acquire);
}

while (p != nullptr) {
IRIS_ASSERT(is_suspended());
parallel_task_resurrect_head = p->next;
p->next = nullptr;
async_worker.queue_task(p, priority);

while (task != nullptr) {
task_t* p = task->next;
task->next = nullptr;
async_worker.queue_task(task, priority);
task = p;
}
p = parallel_task_resurrect_head;
}
}
}
Expand Down Expand Up @@ -735,6 +745,7 @@ namespace iris {
std::atomic<size_t> suspend_count; // current suspend count
std::atomic<size_t> queueing; // is flush request sent to async_worker? 0 : not yet, 1 : yes, 2 : is to flush right away.
std::atomic<task_t*> parallel_task_head; // linked-list for pending parallel tasks
task_t* parallel_task_resurrect_head;
typename std::conditional<strand, chain_storage_t, grid_storage_t>::type storage; // task storage
size_t priority;
iris_warp_t* stack_next_warp;
Expand Down

0 comments on commit 6f26fb6

Please sign in to comment.