aboutsummaryrefslogtreecommitdiff
path: root/v_windows/v/vlib/sync/waitgroup.v
diff options
context:
space:
mode:
Diffstat (limited to 'v_windows/v/vlib/sync/waitgroup.v')
-rw-r--r--v_windows/v/vlib/sync/waitgroup.v84
1 files changed, 84 insertions, 0 deletions
diff --git a/v_windows/v/vlib/sync/waitgroup.v b/v_windows/v/vlib/sync/waitgroup.v
new file mode 100644
index 0000000..3e9ac39
--- /dev/null
+++ b/v_windows/v/vlib/sync/waitgroup.v
@@ -0,0 +1,84 @@
+// Copyright (c) 2019 Alexander Medvednikov. All rights reserved.
+// Use of this source code is governed by an MIT license
+// that can be found in the LICENSE file.
+module sync
+
+[trusted]
+fn C.atomic_fetch_add_u32(voidptr, u32) u32
+
+[trusted]
+fn C.atomic_load_u32(voidptr) u32
+
+[trusted]
+fn C.atomic_compare_exchange_weak_u32(voidptr, voidptr, u32) bool
+
+// WaitGroup
+// Do not copy an instance of WaitGroup, use a ref instead.
+//
+// usage: in main thread:
+// `wg := sync.new_waitgroup()
+// `wg.add(nr_jobs)` before starting jobs with `go ...`
+// `wg.wait()` to wait for all jobs to have finished
+//
+// in each parallel job:
+// `wg.done()` when finished
+//
+// [init_with=new_waitgroup] // TODO: implement support for init_with struct attribute, and disallow WaitGroup{} from outside the sync.new_waitgroup() function.
+[heap]
+struct WaitGroup {
+mut:
+ task_count u32 // current task count - reading/writing should be atomic
+ wait_count u32 // current wait count - reading/writing should be atomic
+ sem Semaphore // This blocks wait() until tast_countreleased by add()
+}
+
+pub fn new_waitgroup() &WaitGroup {
+ mut wg := WaitGroup{}
+ wg.init()
+ return &wg
+}
+
+pub fn (mut wg WaitGroup) init() {
+ wg.sem.init(0)
+}
+
+// add increments (+ve delta) or decrements (-ve delta) task count by delta
+// and unblocks any wait() calls if task count becomes zero.
+// add panics if task count drops below zero.
+pub fn (mut wg WaitGroup) add(delta int) {
+ old_nrjobs := int(C.atomic_fetch_add_u32(&wg.task_count, u32(delta)))
+ new_nrjobs := old_nrjobs + delta
+ mut num_waiters := C.atomic_load_u32(&wg.wait_count)
+ if new_nrjobs < 0 {
+ panic('Negative number of jobs in waitgroup')
+ }
+
+ if new_nrjobs == 0 && num_waiters > 0 {
+ // clear waiters
+ for !C.atomic_compare_exchange_weak_u32(&wg.wait_count, &num_waiters, 0) {
+ if num_waiters == 0 {
+ return
+ }
+ }
+ for (num_waiters > 0) {
+ wg.sem.post()
+ num_waiters--
+ }
+ }
+}
+
+// done is a convenience fn for add(-1)
+pub fn (mut wg WaitGroup) done() {
+ wg.add(-1)
+}
+
+// wait blocks until all tasks are done (task count becomes zero)
+pub fn (mut wg WaitGroup) wait() {
+ nrjobs := int(C.atomic_load_u32(&wg.task_count))
+ if nrjobs == 0 {
+ // no need to wait
+ return
+ }
+ C.atomic_fetch_add_u32(&wg.wait_count, 1)
+ wg.sem.wait() // blocks until task_count becomes 0
+}