1 | // Copyright (c) 2019 Alexander Medvednikov. All rights reserved. |
2 | // Use of this source code is governed by an MIT license |
3 | // that can be found in the LICENSE file. |
4 | module sync |
5 | |
6 | [trusted] |
7 | fn C.atomic_fetch_add_u32(voidptr, u32) u32 |
8 | |
9 | [trusted] |
10 | fn C.atomic_load_u32(voidptr) u32 |
11 | |
12 | [trusted] |
13 | fn C.atomic_compare_exchange_weak_u32(voidptr, voidptr, u32) bool |
14 | |
15 | // WaitGroup |
16 | // Do not copy an instance of WaitGroup, use a ref instead. |
17 | // |
18 | // usage: in main thread: |
19 | // `wg := sync.new_waitgroup() |
20 | // `wg.add(nr_jobs)` before starting jobs with `go ...` |
21 | // `wg.wait()` to wait for all jobs to have finished |
22 | // |
23 | // in each parallel job: |
24 | // `wg.done()` when finished |
25 | // |
26 | // [init_with=new_waitgroup] // TODO: implement support for init_with struct attribute, and disallow WaitGroup{} from outside the sync.new_waitgroup() function. |
27 | [heap] |
28 | pub struct WaitGroup { |
29 | mut: |
30 | task_count u32 // current task count - reading/writing should be atomic |
31 | wait_count u32 // current wait count - reading/writing should be atomic |
32 | sem Semaphore // This blocks wait() until tast_countreleased by add() |
33 | } |
34 | |
35 | pub fn new_waitgroup() &WaitGroup { |
36 | mut wg := WaitGroup{} |
37 | wg.init() |
38 | return &wg |
39 | } |
40 | |
41 | pub fn (mut wg WaitGroup) init() { |
42 | wg.sem.init(0) |
43 | } |
44 | |
45 | // add increments (+ve delta) or decrements (-ve delta) task count by delta |
46 | // and unblocks any wait() calls if task count becomes zero. |
47 | // add panics if task count drops below zero. |
48 | pub fn (mut wg WaitGroup) add(delta int) { |
49 | old_nrjobs := int(C.atomic_fetch_add_u32(&wg.task_count, u32(delta))) |
50 | new_nrjobs := old_nrjobs + delta |
51 | mut num_waiters := C.atomic_load_u32(&wg.wait_count) |
52 | if new_nrjobs < 0 { |
53 | panic('Negative number of jobs in waitgroup') |
54 | } |
55 | |
56 | if new_nrjobs == 0 && num_waiters > 0 { |
57 | // clear waiters |
58 | for !C.atomic_compare_exchange_weak_u32(&wg.wait_count, &num_waiters, 0) { |
59 | if num_waiters == 0 { |
60 | return |
61 | } |
62 | } |
63 | for (num_waiters > 0) { |
64 | wg.sem.post() |
65 | num_waiters-- |
66 | } |
67 | } |
68 | } |
69 | |
70 | // done is a convenience fn for add(-1) |
71 | pub fn (mut wg WaitGroup) done() { |
72 | wg.add(-1) |
73 | } |
74 | |
75 | // wait blocks until all tasks are done (task count becomes zero) |
76 | pub fn (mut wg WaitGroup) wait() { |
77 | nrjobs := int(C.atomic_load_u32(&wg.task_count)) |
78 | if nrjobs == 0 { |
79 | // no need to wait |
80 | return |
81 | } |
82 | C.atomic_fetch_add_u32(&wg.wait_count, 1) |
83 | wg.sem.wait() // blocks until task_count becomes 0 |
84 | } |