From e98817e5cefdbfd554a0d35d89a60c3b0dec07ea Mon Sep 17 00:00:00 2001 From: Miccah Date: Tue, 27 Jul 2021 07:49:51 -0500 Subject: [PATCH] sync: only release semaphore in WaitGroup when there are waiters (#10967) --- vlib/sync/waitgroup.v | 28 ++++++++++++++++++++++++-- vlib/sync/waitgroup_test.v | 41 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 2 deletions(-) create mode 100644 vlib/sync/waitgroup_test.v diff --git a/vlib/sync/waitgroup.v b/vlib/sync/waitgroup.v index ffa9af52c..3e9ac39b6 100644 --- a/vlib/sync/waitgroup.v +++ b/vlib/sync/waitgroup.v @@ -6,6 +6,12 @@ module sync [trusted] fn C.atomic_fetch_add_u32(voidptr, u32) u32 +[trusted] +fn C.atomic_load_u32(voidptr) u32 + +[trusted] +fn C.atomic_compare_exchange_weak_u32(voidptr, voidptr, u32) bool + // WaitGroup // Do not copy an instance of WaitGroup, use a ref instead. // @@ -22,6 +28,7 @@ fn C.atomic_fetch_add_u32(voidptr, u32) u32 struct WaitGroup { mut: task_count u32 // current task count - reading/writing should be atomic + wait_count u32 // current wait count - reading/writing should be atomic sem Semaphore // This blocks wait() until tast_countreleased by add() } @@ -41,11 +48,22 @@ pub fn (mut wg WaitGroup) init() { pub fn (mut wg WaitGroup) add(delta int) { old_nrjobs := int(C.atomic_fetch_add_u32(&wg.task_count, u32(delta))) new_nrjobs := old_nrjobs + delta + mut num_waiters := C.atomic_load_u32(&wg.wait_count) if new_nrjobs < 0 { panic('Negative number of jobs in waitgroup') } - if new_nrjobs == 0 { - wg.sem.post() + + if new_nrjobs == 0 && num_waiters > 0 { + // clear waiters + for !C.atomic_compare_exchange_weak_u32(&wg.wait_count, &num_waiters, 0) { + if num_waiters == 0 { + return + } + } + for (num_waiters > 0) { + wg.sem.post() + num_waiters-- + } } } @@ -56,5 +74,11 @@ pub fn (mut wg WaitGroup) done() { // wait blocks until all tasks are done (task count becomes zero) pub fn (mut wg WaitGroup) wait() { + nrjobs := int(C.atomic_load_u32(&wg.task_count)) + if nrjobs == 0 { + // no need to wait + return + } + C.atomic_fetch_add_u32(&wg.wait_count, 1) wg.sem.wait() // blocks until task_count becomes 0 } diff --git a/vlib/sync/waitgroup_test.v b/vlib/sync/waitgroup_test.v new file mode 100644 index 000000000..493665f52 --- /dev/null +++ b/vlib/sync/waitgroup_test.v @@ -0,0 +1,41 @@ +module sync + +import time + +fn test_waitgroup_reuse() { + mut wg := new_waitgroup() + + wg.add(1) + wg.done() + + wg.add(1) + mut executed := false + go fn (mut wg WaitGroup, executed voidptr) { + defer { + wg.done() + } + unsafe { + *(&bool(executed)) = true + } + time.sleep(100 * time.millisecond) + assert wg.wait_count == 1 + }(mut wg, voidptr(&executed)) + + wg.wait() + assert executed + assert wg.wait_count == 0 +} + +fn test_waitgroup_no_use() { + mut done := false + go fn (done voidptr) { + time.sleep(1 * time.second) + if *(&bool(done)) == false { + panic('test_waitgroup_no_use did not complete in time') + } + }(voidptr(&done)) + + mut wg := new_waitgroup() + wg.wait() + done = true +} -- 2.30.2