Commit

[MOD] Use strong-typed enum. Remove iris_update_version.
paintdream committed Jun 22, 2024
1 parent 96fd9e6 commit f95d0f2
Showing 4 changed files with 26 additions and 88 deletions.
6 changes: 2 additions & 4 deletions src/iris_buffer.h
@@ -36,10 +36,8 @@ SOFTWARE.
namespace iris {
template <typename element_t, size_t storage_size = (sizeof(element_t*) * 4 - sizeof(size_t)) / sizeof(element_t)>
struct iris_buffer_t {
enum : size_t {
ext_store_mask = (size_t(1) << (sizeof(size_t) * 8 - 1)), // has external storage?
data_view_mask = (size_t(1) << (sizeof(size_t) * 8 - 2)) // is data view?
};
static constexpr size_t ext_store_mask = (size_t(1) << (sizeof(size_t) * 8 - 1)); // has external storage?
static constexpr size_t data_view_mask = (size_t(1) << (sizeof(size_t) * 8 - 2)); // is data view?

using iterator = element_t*;
using const_iterator = const element_t*;
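
A note on the hunk above: with the old enum : size_t, ext_store_mask and data_view_mask have an unnamed enumeration type and only convert to size_t, while the new static constexpr members are size_t outright, which is friendlier to templates and overload resolution. The standalone sketch below (illustrative only, not code from the repository) makes the difference visible; keep in mind that before C++17, odr-using such a constexpr member still requires an out-of-class definition.

    // Standalone sketch (illustrative, not repository code): the constant keeps its
    // value, but its type changes from an unnamed enumeration to size_t itself.
    #include <cstddef>
    #include <type_traits>

    struct with_enum_t {
        enum : std::size_t {
            ext_store_mask = (std::size_t(1) << (sizeof(std::size_t) * 8 - 1))
        };
    };

    struct with_constexpr_t {
        static constexpr std::size_t ext_store_mask = (std::size_t(1) << (sizeof(std::size_t) * 8 - 1));
    };

    static_assert(with_enum_t::ext_store_mask == with_constexpr_t::ext_store_mask, "same value either way");
    static_assert(!std::is_same<decltype(with_enum_t::ext_store_mask), std::size_t>::value,
                  "the enumerator has its own enumeration type");
    static_assert(std::is_same<decltype(with_constexpr_t::ext_store_mask), const std::size_t>::value,
                  "the constexpr member is size_t (constexpr implies const)");

    int main() { return 0; }
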
29 changes: 6 additions & 23 deletions src/iris_common.h
@@ -511,19 +511,6 @@ namespace iris {
}
}

template <typename version_t, typename update_t>
bool iris_update_version(std::atomic<version_t>& version, version_t target, update_t&& update) {
static_assert(std::is_signed<version_t>::value, "Version type must be signed.");
version_t current = version.load(std::memory_order_acquire);
do {
if (target - current <= 0) {
return false; // already updated by a newer routine
}
} while (!version.compare_exchange_weak(current, target, std::memory_order_release));

return update([&version, target]() { return version.load(std::memory_order_acquire) == target; });
}

template <typename type_t, typename index_t>
void iris_union_set_init(type_t&& vec, index_t from, index_t to) {
while (from != to) {
@@ -663,13 +650,11 @@ namespace iris {
// k = element size, m = block size, r = max recycled block count, 0 for not limited, w = control block count
template <size_t k, size_t m = default_block_size, size_t r = 8, size_t s = default_page_size / m, size_t w = 8>
struct iris_allocator_t : protected enable_read_write_fence_t<> {
enum {
block_size = m,
item_count = m / k,
bits = 8 * sizeof(size_t),
bitmap_block_size = (item_count + bits - 1) / bits,
mask = bits - 1
};
static constexpr size_t block_size = m;
static constexpr size_t item_count = m / k;
static constexpr size_t bits = 8 * sizeof(size_t);
static constexpr size_t bitmap_block_size = (item_count + bits - 1) / bits;
static constexpr size_t mask = bits - 1;

struct control_block_t {
iris_allocator_t* allocator;
@@ -679,9 +664,7 @@
std::atomic<size_t> bitmap[bitmap_block_size];
};

enum {
offset = (sizeof(control_block_t) + k - 1) / k
};
static constexpr size_t offset = (sizeof(control_block_t) + k - 1) / k;

iris_allocator_t() noexcept {
static_assert(item_count / 2 * k > sizeof(control_block_t), "item_count is too small");
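
For the constants above, a worked example may help: take a hypothetical configuration of k = 16 (16-byte items) and m = 4096 (4096-byte blocks); the real default_block_size and default_page_size are defined elsewhere in iris_common.h and are not restated here. The standalone sketch below (illustrative, not repository code) evaluates the same formulas; the numeric comments assume a 64-bit size_t.

    // Standalone sketch (illustrative, not repository code): the allocator's
    // constants evaluated for a hypothetical k = 16, m = 4096 configuration.
    #include <cstddef>

    constexpr std::size_t k = 16;   // element size in bytes
    constexpr std::size_t m = 4096; // block size in bytes
    constexpr std::size_t block_size = m;
    constexpr std::size_t item_count = m / k;                                 // 256 slots per block
    constexpr std::size_t bits = 8 * sizeof(std::size_t);                     // 64 on a 64-bit target
    constexpr std::size_t bitmap_block_size = (item_count + bits - 1) / bits; // 4 bitmap words on a 64-bit target
    constexpr std::size_t mask = bits - 1;                                    // 63 on a 64-bit target

    static_assert(item_count == 256, "4096-byte blocks hold 256 16-byte slots");
    static_assert(bitmap_block_size * bits >= item_count, "the bitmap covers every slot");
    static_assert((137 & mask) == 137 % bits, "x & mask equals x % bits because bits is a power of two");

    int main() { return 0; }
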
36 changes: 18 additions & 18 deletions src/iris_dispatcher.h
@@ -39,18 +39,18 @@ SOFTWARE.
namespace iris {
namespace impl {
// for exception safe, roll back atomic operations as needed
enum guard_operation {
enum class guard_operation_t : size_t {
add, sub, invalidate
};

template <guard_operation operation>
template <guard_operation_t operation>
struct atomic_guard_t {
atomic_guard_t(std::atomic<size_t>& var) : variable(&var) {}
~atomic_guard_t() noexcept {
if (variable != nullptr) {
if /* constexpr */ (operation == add) {
if /* constexpr */ (operation == guard_operation_t::add) {
variable->fetch_add(1, std::memory_order_release);
} else if /* constexpr */ (operation == sub) {
} else if /* constexpr */ (operation == guard_operation_t::sub) {
variable->fetch_sub(1, std::memory_order_release);
} else {
variable->store(~size_t(0), std::memory_order_release);
@@ -152,9 +152,9 @@ namespace iris {
using task_t = typename async_worker_t::task_t;

static constexpr size_t block_size = iris_extract_block_size<function_t, allocator_t>::value;
static constexpr size_t queue_state_idle = 0u;
static constexpr size_t queue_state_pending = 1u;
static constexpr size_t queue_state_executing = 2u;
enum class queue_state_t : size_t {
idle, pending, executing
};

// moving capture is not supported until C++ 14
// so we wrap some functors here
@@ -268,7 +268,7 @@ namespace iris {
thread_warp.store(nullptr, std::memory_order_relaxed);
parallel_task_head.store(nullptr, std::memory_order_relaxed);
suspend_count.store(0, std::memory_order_relaxed);
queueing.store(queue_state_idle, std::memory_order_release);
queueing.store(queue_state_t::idle, std::memory_order_release);
}

iris_warp_t(iris_warp_t&& rhs) noexcept : async_worker(rhs.async_worker), storage(std::move(rhs.storage)), priority(rhs.priority), stack_next_warp(rhs.stack_next_warp), parallel_task_resurrect_head(rhs.parallel_task_resurrect_head) {
@@ -282,7 +282,7 @@ namespace iris {
rhs.thread_warp.store(nullptr, std::memory_order_relaxed);
rhs.parallel_task_head.store(nullptr, std::memory_order_relaxed);
rhs.suspend_count.store(0, std::memory_order_relaxed);
rhs.queueing.store(queue_state_idle, std::memory_order_release);
rhs.queueing.store(queue_state_t::idle, std::memory_order_release);
}

~iris_warp_t() noexcept {
@@ -326,7 +326,7 @@ namespace iris {
stack_next_warp = nullptr;
thread_warp.store(nullptr, std::memory_order_release);

if (queueing.exchange(queue_state_idle, std::memory_order_relaxed) == queue_state_pending) {
if (queueing.exchange(queue_state_t::idle, std::memory_order_relaxed) == queue_state_t::pending) {
flush();
}

@@ -349,7 +349,7 @@

if (ret) {
// all suspend requests removed, try to flush me
queueing.store(queue_state_idle, std::memory_order_relaxed);
queueing.store(queue_state_t::idle, std::memory_order_relaxed);
flush();
}

@@ -555,7 +555,7 @@ namespace iris {
IRIS_PROFILE_SCOPE(__FUNCTION__);

// mark for queueing, avoiding flush me more than once.
queueing.store(queue_state_executing, std::memory_order_release);
queueing.store(queue_state_t::executing, std::memory_order_release);
iris_warp_t** warp_ptr = &get_current_warp_internal();
IRIS_ASSERT(*warp_ptr == this);

@@ -598,7 +598,7 @@ namespace iris {
IRIS_PROFILE_SCOPE(__FUNCTION__);

// mark for queueing, avoiding flush me more than once.
queueing.store(queue_state_executing, std::memory_order_release);
queueing.store(queue_state_t::executing, std::memory_order_release);
iris_warp_t** warp_ptr = &get_current_warp_internal();
IRIS_ASSERT(*warp_ptr == this);

@@ -660,7 +660,7 @@ namespace iris {
flush();
}
} else {
queueing.store(queue_state_pending, std::memory_order_relaxed);
queueing.store(queue_state_t::pending, std::memory_order_relaxed);
}
}
}
@@ -697,8 +697,8 @@ namespace iris {
void flush() noexcept(noexcept(std::declval<iris_warp_t>().async_worker.queue(std::declval<function_t>(), 0))) {
// if current state is executing, the executing routine will reinvoke flush() if it detected pending state while exiting
// so we just need to queue a flush routine as soon as current state is idle
if (queueing.load(std::memory_order_acquire) != queue_state_pending) {
if (queueing.exchange(queue_state_pending, std::memory_order_acq_rel) == queue_state_idle) {
if (queueing.load(std::memory_order_acquire) != queue_state_t::pending) {
if (queueing.exchange(queue_state_t::pending, std::memory_order_acq_rel) == queue_state_t::idle) {
async_worker.queue(execute_t(*this), priority);
}
}
@@ -743,7 +743,7 @@ namespace iris {
async_worker_t& async_worker; // host async worker
std::atomic<iris_warp_t**> thread_warp; // save the running thread warp address.
std::atomic<size_t> suspend_count; // current suspend count
std::atomic<size_t> queueing; // is flush request sent to async_worker? 0 : not yet, 1 : yes, 2 : is to flush right away.
std::atomic<queue_state_t> queueing; // is flush request sent to async_worker? 0 : not yet, 1 : yes, 2 : is to flush right away.
std::atomic<task_t*> parallel_task_head; // linked-list for pending parallel tasks
task_t* parallel_task_resurrect_head;
typename std::conditional<strand, chain_storage_t, grid_storage_t>::type storage; // task storage
@@ -857,7 +857,7 @@ namespace iris {

// dispatch a task
void dispatch(routine_t* routine) {
impl::atomic_guard_t<impl::guard_operation::add> guard(routine->lock_count);
impl::atomic_guard_t<impl::guard_operation_t::add> guard(routine->lock_count);
if (routine->lock_count.fetch_sub(1, std::memory_order_relaxed) == 1) {
if (routine->routine) {
// if not a warped routine, queue it to worker directly.
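
Since queueing is now a std::atomic<queue_state_t>, it is worth noting that std::atomic works with a scoped enum just as it did with the raw size_t constants: load, store, and exchange are unchanged, while accidental mixing with unrelated integers no longer compiles. A minimal standalone sketch of the flush() gating pattern above (illustrative only, not repository code):

    // Standalone sketch (illustrative, not repository code): std::atomic over the
    // scoped queue_state_t supports the same operations the dispatcher uses above.
    #include <atomic>
    #include <cstddef>
    #include <cstdio>

    enum class queue_state_t : std::size_t { idle, pending, executing };

    int main() {
        std::atomic<queue_state_t> queueing(queue_state_t::idle);

        // the flush() gate: only the caller that moves idle -> pending queues the execute routine
        if (queueing.load(std::memory_order_acquire) != queue_state_t::pending) {
            if (queueing.exchange(queue_state_t::pending, std::memory_order_acq_rel) == queue_state_t::idle) {
                std::printf("queue the execute routine here\n");
            }
        }

        // queueing.store(1); // compiled with the old size_t constants, rejected now
        queueing.store(queue_state_t::executing, std::memory_order_release);
        return 0;
    }
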
43 changes: 0 additions & 43 deletions test/iris_dispatcher_demo.cpp
@@ -14,7 +14,6 @@ static void garbage_collection();
static void acquire_release();
static void graph_dispatch();
static void graph_dispatch_exception();
static void update_version();

int main(void) {
external_poll();
@@ -26,7 +25,6 @@ int main(void) {
acquire_release();
graph_dispatch();
graph_dispatch_exception();
update_version();

return 0;
}
@@ -596,44 +594,3 @@ void acquire_release() {
main_warp.join([] { std::this_thread::sleep_for(std::chrono::milliseconds(50)); });
}

void update_version() {
static constexpr size_t thread_count = 8;
static constexpr int version_count = 12;
iris_async_worker_t<> worker(thread_count);
std::atomic<int> version;
std::mutex mutex;
version.store(0, std::memory_order_relaxed);
size_t final_version = 0;
std::atomic<int> success_count;
std::atomic<int> finish_count;
success_count.store(0, std::memory_order_relaxed);
finish_count.store(0, std::memory_order_relaxed);
worker.start();

for (int i = 0; i < version_count; i++) {
worker.queue([i, &version, &mutex, &final_version, &success_count, &finish_count, &worker]() {
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 50));
bool success = iris_update_version(version, i + 1, [&mutex, i, &final_version](auto&& cond) {
std::lock_guard<std::mutex> guard(mutex);
if (cond()) {
final_version = i + 1;
return true;
} else {
return false;
}
});

if (success) {
success_count.fetch_add(1, std::memory_order_relaxed);
}

if (finish_count.fetch_add(1, std::memory_order_relaxed) + 1 == version_count) {
worker.terminate();
}
});
}

worker.join();
IRIS_ASSERT(final_version == version_count);
printf("Success of update %d/%d\n", success_count.load(std::memory_order_acquire), version_count);
}

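Projects that relied on the removed iris_update_version can keep a private copy of the helper. The standalone sketch below restates the deleted implementation from the iris_common.h hunk above under a hypothetical name, local_update_version, with a tiny usage example; like the removed demo, it needs C++14 for the generic lambda.

    // Standalone sketch (illustrative): a local copy of the removed helper under a
    // hypothetical name, adapted from the deleted iris_common.h code shown above.
    #include <atomic>
    #include <type_traits>

    template <typename version_t, typename update_t>
    bool local_update_version(std::atomic<version_t>& version, version_t target, update_t&& update) {
        static_assert(std::is_signed<version_t>::value, "Version type must be signed.");
        version_t current = version.load(std::memory_order_acquire);
        do {
            if (target - current <= 0) {
                return false; // a newer (or equal) version already won
            }
        } while (!version.compare_exchange_weak(current, target, std::memory_order_release));

        // the callback receives a predicate telling it whether it is still the newest writer
        return update([&version, target]() { return version.load(std::memory_order_acquire) == target; });
    }

    int main() {
        std::atomic<int> version(0);
        bool applied = local_update_version(version, 3, [](auto&& still_newest) { return still_newest(); });
        return applied ? 0 : 1;
    }
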