aboutsummaryrefslogtreecommitdiff
path: root/v_windows/v/vlib/sync
diff options
context:
space:
mode:
authorIndrajith K L2022-12-03 17:00:20 +0530
committerIndrajith K L2022-12-03 17:00:20 +0530
commitf5c4671bfbad96bf346bd7e9a21fc4317b4959df (patch)
tree2764fc62da58f2ba8da7ed341643fc359873142f /v_windows/v/vlib/sync
downloadcli-tools-windows-f5c4671bfbad96bf346bd7e9a21fc4317b4959df.tar.gz
cli-tools-windows-f5c4671bfbad96bf346bd7e9a21fc4317b4959df.tar.bz2
cli-tools-windows-f5c4671bfbad96bf346bd7e9a21fc4317b4959df.zip
Adds most of the toolsHEADmaster
Diffstat (limited to 'v_windows/v/vlib/sync')
-rw-r--r--v_windows/v/vlib/sync/array_rlock_test.v38
-rw-r--r--v_windows/v/vlib/sync/atomic2/atomic.v88
-rw-r--r--v_windows/v/vlib/sync/atomic2/atomic_test.v105
-rw-r--r--v_windows/v/vlib/sync/bench/channel_bench_go.go68
-rw-r--r--v_windows/v/vlib/sync/bench/channel_bench_v.v64
-rw-r--r--v_windows/v/vlib/sync/bench/many_writers_and_receivers_on_1_channel.v150
-rw-r--r--v_windows/v/vlib/sync/bench/results.md48
-rw-r--r--v_windows/v/vlib/sync/channel_1_test.v25
-rw-r--r--v_windows/v/vlib/sync/channel_2_test.v19
-rw-r--r--v_windows/v/vlib/sync/channel_3_test.v32
-rw-r--r--v_windows/v/vlib/sync/channel_4_test.v32
-rw-r--r--v_windows/v/vlib/sync/channel_array_mut_test.v35
-rw-r--r--v_windows/v/vlib/sync/channel_close_test.v104
-rw-r--r--v_windows/v/vlib/sync/channel_fill_test.v22
-rw-r--r--v_windows/v/vlib/sync/channel_opt_propagate_test.v39
-rw-r--r--v_windows/v/vlib/sync/channel_polling_test.v56
-rw-r--r--v_windows/v/vlib/sync/channel_push_or_1_test.v65
-rw-r--r--v_windows/v/vlib/sync/channel_push_or_2_test.v25
-rw-r--r--v_windows/v/vlib/sync/channel_select_2_test.v62
-rw-r--r--v_windows/v/vlib/sync/channel_select_3_test.v122
-rw-r--r--v_windows/v/vlib/sync/channel_select_4_test.v43
-rw-r--r--v_windows/v/vlib/sync/channel_select_5_test.v61
-rw-r--r--v_windows/v/vlib/sync/channel_select_6_test.v75
-rw-r--r--v_windows/v/vlib/sync/channel_select_test.v84
-rw-r--r--v_windows/v/vlib/sync/channel_try_buf_test.v17
-rw-r--r--v_windows/v/vlib/sync/channel_try_unbuf_test.v13
-rw-r--r--v_windows/v/vlib/sync/channels.v730
-rw-r--r--v_windows/v/vlib/sync/pool/README.md36
-rw-r--r--v_windows/v/vlib/sync/pool/pool.v165
-rw-r--r--v_windows/v/vlib/sync/pool/pool_test.v52
-rw-r--r--v_windows/v/vlib/sync/select_close_test.v92
-rw-r--r--v_windows/v/vlib/sync/struct_chan_init_test.v14
-rw-r--r--v_windows/v/vlib/sync/sync_default.c.v193
-rw-r--r--v_windows/v/vlib/sync/sync_macos.c.v232
-rw-r--r--v_windows/v/vlib/sync/sync_windows.c.v212
-rw-r--r--v_windows/v/vlib/sync/threads/threads.c.v13
-rw-r--r--v_windows/v/vlib/sync/threads/threads.v4
-rw-r--r--v_windows/v/vlib/sync/waitgroup.v84
-rw-r--r--v_windows/v/vlib/sync/waitgroup_test.v41
39 files changed, 3360 insertions, 0 deletions
diff --git a/v_windows/v/vlib/sync/array_rlock_test.v b/v_windows/v/vlib/sync/array_rlock_test.v
new file mode 100644
index 0000000..ad4f778
--- /dev/null
+++ b/v_windows/v/vlib/sync/array_rlock_test.v
@@ -0,0 +1,38 @@
+fn test_shared_modification() {
+ shared foo := &[2, 0, 5]
+ lock foo {
+ unsafe {
+ foo[1] = 3
+ foo[0] *= 7
+ foo[1]--
+ foo[2] -= 2
+ }
+ }
+ rlock foo {
+ unsafe {
+ assert foo[0] == 14
+ assert foo[1] == 2
+ assert foo[2] == 3
+ }
+ }
+}
+
+[direct_array_access]
+fn test_shared_direct_modification() {
+ shared foo := &[2, 0, 5]
+ lock foo {
+ unsafe {
+ foo[1] = 3
+ foo[0] *= 7
+ foo[1]--
+ foo[2] -= 2
+ }
+ }
+ rlock foo {
+ unsafe {
+ assert foo[0] == 14
+ assert foo[1] == 2
+ assert foo[2] == 3
+ }
+ }
+}
diff --git a/v_windows/v/vlib/sync/atomic2/atomic.v b/v_windows/v/vlib/sync/atomic2/atomic.v
new file mode 100644
index 0000000..2ff64f2
--- /dev/null
+++ b/v_windows/v/vlib/sync/atomic2/atomic.v
@@ -0,0 +1,88 @@
+module atomic2
+
+/*
+Implements the atomic operations. For now TCC does not support
+the atomic versions on nix so it uses locks to simulate the same behavor.
+On windows tcc can simulate with other atomic operations.
+
+The @VEXEROOT/thirdparty/stdatomic contains compability header files
+for stdatomic that supports both nix, windows and c++.
+
+This implementations should be regarded as alpha stage and be
+further tested.
+*/
+
+#flag windows -I @VEXEROOT/thirdparty/stdatomic/win
+#flag linux -I @VEXEROOT/thirdparty/stdatomic/nix
+#flag darwin -I @VEXEROOT/thirdparty/stdatomic/nix
+#flag freebsd -I @VEXEROOT/thirdparty/stdatomic/nix
+#flag solaris -I @VEXEROOT/thirdparty/stdatomic/nix
+
+$if linux {
+ $if tinyc {
+ $if amd64 {
+ // most Linux distributions have /usr/lib/libatomic.so, but Ubuntu uses gcc version specific dir
+ #flag -L/usr/lib/gcc/x86_64-linux-gnu/6
+ #flag -L/usr/lib/gcc/x86_64-linux-gnu/7
+ #flag -L/usr/lib/gcc/x86_64-linux-gnu/8
+ #flag -L/usr/lib/gcc/x86_64-linux-gnu/9
+ #flag -L/usr/lib/gcc/x86_64-linux-gnu/10
+ #flag -L/usr/lib/gcc/x86_64-linux-gnu/11
+ #flag -L/usr/lib/gcc/x86_64-linux-gnu/12
+ } $else $if arm64 {
+ #flag -L/usr/lib/gcc/aarch64-linux-gnu/6
+ #flag -L/usr/lib/gcc/aarch64-linux-gnu/7
+ #flag -L/usr/lib/gcc/aarch64-linux-gnu/8
+ #flag -L/usr/lib/gcc/aarch64-linux-gnu/9
+ #flag -L/usr/lib/gcc/aarch64-linux-gnu/10
+ #flag -L/usr/lib/gcc/aarch64-linux-gnu/11
+ #flag -L/usr/lib/gcc/aarch64-linux-gnu/12
+ }
+ #flag -latomic
+ }
+}
+#include <atomic.h>
+// add_u64 adds provided delta as an atomic operation
+pub fn add_u64(ptr &u64, delta int) bool {
+ res := C.atomic_fetch_add_u64(voidptr(ptr), delta)
+ return res == 0
+}
+
+// sub_u64 subtracts provided delta as an atomic operation
+pub fn sub_u64(ptr &u64, delta int) bool {
+ res := C.atomic_fetch_sub_u64(voidptr(ptr), delta)
+ return res == 0
+}
+
+// add_i64 adds provided delta as an atomic operation
+pub fn add_i64(ptr &i64, delta int) bool {
+ res := C.atomic_fetch_add_u64(voidptr(ptr), delta)
+ return res == 0
+}
+
+// add_i64 subtracts provided delta as an atomic operation
+pub fn sub_i64(ptr &i64, delta int) bool {
+ res := C.atomic_fetch_sub_u64(voidptr(ptr), delta)
+ return res == 0
+}
+
+// atomic store/load operations have to be used when there might be another concurrent access
+// atomicall set a value
+pub fn store_u64(ptr &u64, val u64) {
+ C.atomic_store_u64(voidptr(ptr), val)
+}
+
+// atomicall get a value
+pub fn load_u64(ptr &u64) u64 {
+ return C.atomic_load_u64(voidptr(ptr))
+}
+
+// atomicall set a value
+pub fn store_i64(ptr &i64, val i64) {
+ C.atomic_store_u64(voidptr(ptr), val)
+}
+
+// atomicall get a value
+pub fn load_i64(ptr &i64) i64 {
+ return i64(C.atomic_load_u64(voidptr(ptr)))
+}
diff --git a/v_windows/v/vlib/sync/atomic2/atomic_test.v b/v_windows/v/vlib/sync/atomic2/atomic_test.v
new file mode 100644
index 0000000..7a5ffd8
--- /dev/null
+++ b/v_windows/v/vlib/sync/atomic2/atomic_test.v
@@ -0,0 +1,105 @@
+import sync.atomic2
+import sync
+
+const (
+ iterations_per_cycle = 100_000
+)
+
+struct Counter {
+mut:
+ counter u64
+}
+
+// without proper syncronization this would fail
+fn test_count_10_times_1_cycle_should_result_10_cycles_with_sync() {
+ desired_iterations := 10 * iterations_per_cycle
+ mut wg := sync.new_waitgroup()
+ mut counter := &Counter{}
+ wg.add(10)
+ for i := 0; i < 10; i++ {
+ go count_one_cycle(mut counter, mut wg)
+ }
+ wg.wait()
+ assert counter.counter == desired_iterations
+ eprintln(' with synchronization the counter is: ${counter.counter:10} , expectedly == ${desired_iterations:10}')
+}
+
+// This test just to make sure that we have an anti-test to prove it works
+fn test_count_10_times_1_cycle_should_not_be_10_cycles_without_sync() {
+ desired_iterations := 10 * iterations_per_cycle
+ mut wg := sync.new_waitgroup()
+ mut counter := &Counter{}
+ wg.add(10)
+ for i := 0; i < 10; i++ {
+ go count_one_cycle_without_sync(mut counter, mut wg)
+ }
+ wg.wait()
+ // NB: we do not assert here, just print, because sometimes by chance counter.counter may be == desired_iterations
+ eprintln('without synchronization the counter is: ${counter.counter:10} , expectedly != ${desired_iterations:10}')
+}
+
+fn test_count_plus_one_u64() {
+ mut c := u64(0)
+ atomic2.add_u64(&c, 1)
+ assert atomic2.load_u64(&c) == 1
+}
+
+fn test_count_plus_one_i64() {
+ mut c := i64(0)
+ atomic2.add_i64(&c, 1)
+ assert atomic2.load_i64(&c) == 1
+}
+
+fn test_count_plus_greater_than_one_u64() {
+ mut c := u64(0)
+ atomic2.add_u64(&c, 10)
+ assert atomic2.load_u64(&c) == 10
+}
+
+fn test_count_plus_greater_than_one_i64() {
+ mut c := i64(0)
+ atomic2.add_i64(&c, 10)
+ assert atomic2.load_i64(&c) == 10
+}
+
+fn test_count_minus_one_u64() {
+ mut c := u64(1)
+ atomic2.sub_u64(&c, 1)
+ assert atomic2.load_u64(&c) == 0
+}
+
+fn test_count_minus_one_i64() {
+ mut c := i64(0)
+ atomic2.sub_i64(&c, 1)
+ assert atomic2.load_i64(&c) == -1
+}
+
+fn test_count_minus_greater_than_one_u64() {
+ mut c := u64(0)
+ atomic2.store_u64(&c, 10)
+ atomic2.sub_u64(&c, 10)
+ assert atomic2.load_u64(&c) == 0
+}
+
+fn test_count_minus_greater_than_one_i64() {
+ mut c := i64(0)
+ atomic2.store_i64(&c, 10)
+ atomic2.sub_i64(&c, 20)
+ assert atomic2.load_i64(&c) == -10
+}
+
+// count_one_cycle counts the common counter iterations_per_cycle times in thread-safe way
+fn count_one_cycle(mut counter Counter, mut group sync.WaitGroup) {
+ for i := 0; i < iterations_per_cycle; i++ {
+ atomic2.add_u64(&counter.counter, 1)
+ }
+ group.done()
+}
+
+// count_one_cycle_without_sync counts the common counter iterations_per_cycle times in none thread-safe way
+fn count_one_cycle_without_sync(mut counter Counter, mut group sync.WaitGroup) {
+ for i := 0; i < iterations_per_cycle; i++ {
+ counter.counter++
+ }
+ group.done()
+}
diff --git a/v_windows/v/vlib/sync/bench/channel_bench_go.go b/v_windows/v/vlib/sync/bench/channel_bench_go.go
new file mode 100644
index 0000000..a0afbbc
--- /dev/null
+++ b/v_windows/v/vlib/sync/bench/channel_bench_go.go
@@ -0,0 +1,68 @@
+package main
+
+import "fmt"
+import "log"
+import "os"
+import "time"
+import "strconv"
+
+func assert_eq(a, b int64) {
+ if a != b {
+ log.Fatalf("assertion failed\nleft: %d, right: %d\n", a, b)
+ }
+}
+
+func do_rec(ch chan int32, resch chan int64, n int32) {
+ var sum int64
+ var i int32
+ for i = 0; i < n; i++ {
+ sum += int64(<- ch)
+ }
+ fmt.Println(sum)
+ resch <- sum
+}
+
+func do_send(ch chan int32, start, end int32) {
+ for i := start; i < end; i++ {
+ ch <- i
+ }
+}
+
+func main() {
+ if len(os.Args) != 5 {
+ log.Fatalf("usage:\n\t%s <nsend> <nrec> <buflen> <nobj>\n", os.Args[0])
+ }
+ nsend, _ := strconv.Atoi(os.Args[1])
+ nrec, _ := strconv.Atoi(os.Args[2])
+ buflen, _ := strconv.Atoi(os.Args[3])
+ nobj, _ := strconv.Atoi(os.Args[4])
+ stopwatch := time.Now()
+ ch := make(chan int32, buflen)
+ resch := make(chan int64, 0)
+ no := nobj
+ for i := 0; i < nrec; i++ {
+ n := no / (nrec - i)
+ go do_rec(ch, resch, int32(n))
+ no -= n
+ }
+ assert_eq(int64(no), 0)
+ no = nobj
+ for i := 0; i < nsend; i++ {
+ n := no / (nsend - i)
+ end := no
+ no -= n
+ go do_send(ch, int32(no), int32(end))
+ }
+ assert_eq(int64(no), 0)
+ var sum int64
+ for i := 0; i < nrec; i++ {
+ sum += <-resch
+ }
+ elapsed := time.Now().Sub(stopwatch)
+ rate := float64(nobj)/float64(elapsed.Nanoseconds())*1000.0
+ duration := 1.0e-09 * float64(elapsed.Nanoseconds())
+ fmt.Printf("%d objects in %g s (%.2f objs/µs)\n", nobj, duration, rate)
+ expected_sum := int64(nobj)*int64(nobj-1)/2
+ fmt.Printf("got: %d, expected: %d\n", sum, expected_sum)
+ assert_eq(sum, expected_sum)
+}
diff --git a/v_windows/v/vlib/sync/bench/channel_bench_v.v b/v_windows/v/vlib/sync/bench/channel_bench_v.v
new file mode 100644
index 0000000..54dcfe9
--- /dev/null
+++ b/v_windows/v/vlib/sync/bench/channel_bench_v.v
@@ -0,0 +1,64 @@
+// Channel Benchmark
+//
+// `nobj` integers are sent thru a channel with queue length`buflen`
+// using `nsend` sender threads and `nrec` receiver threads.
+//
+// The receive threads add all received numbers and send them to the
+// main thread where the total sum is compare to the expected value.
+import time
+import os
+
+fn do_rec(ch chan int, resch chan i64, n int) {
+ mut sum := i64(0)
+ for _ in 0 .. n {
+ sum += <-ch
+ }
+ println(sum)
+ resch <- sum
+}
+
+fn do_send(ch chan int, start int, end int) {
+ for i in start .. end {
+ ch <- i
+ }
+}
+
+fn main() {
+ if os.args.len != 5 {
+ eprintln('usage:\n\t${os.args[0]} <nsend> <nrec> <buflen> <nobj>')
+ exit(1)
+ }
+ nsend := os.args[1].int()
+ nrec := os.args[2].int()
+ buflen := os.args[3].int()
+ nobj := os.args[4].int()
+ stopwatch := time.new_stopwatch()
+ ch := chan int{cap: buflen}
+ resch := chan i64{}
+ mut no := nobj
+ for i in 0 .. nrec {
+ n := no / (nrec - i)
+ go do_rec(ch, resch, n)
+ no -= n
+ }
+ assert no == 0
+ no = nobj
+ for i in 0 .. nsend {
+ n := no / (nsend - i)
+ end := no
+ no -= n
+ go do_send(ch, no, end)
+ }
+ assert no == 0
+ mut sum := i64(0)
+ for _ in 0 .. nrec {
+ sum += <-resch
+ }
+ elapsed := stopwatch.elapsed()
+ rate := f64(nobj) / elapsed * time.microsecond
+ println('$nobj objects in ${f64(elapsed) / time.second} s (${rate:.2f} objs/µs)')
+ // use sum formula by Gauß to calculate the expected result
+ expected_sum := i64(nobj) * (nobj - 1) / 2
+ println('got: $sum, expected: $expected_sum')
+ assert sum == expected_sum
+}
diff --git a/v_windows/v/vlib/sync/bench/many_writers_and_receivers_on_1_channel.v b/v_windows/v/vlib/sync/bench/many_writers_and_receivers_on_1_channel.v
new file mode 100644
index 0000000..999bb1d
--- /dev/null
+++ b/v_windows/v/vlib/sync/bench/many_writers_and_receivers_on_1_channel.v
@@ -0,0 +1,150 @@
+import os
+import os.cmdline
+import time
+import sync
+
+// Usage:
+// many_writers_and_receivers_on_1_channel [-readers 1] [-writers 4] [-chan_cap 100] [-iterations 25000] > results.csv
+//
+// You can then open results.csv in Excel/Calc and for example plot the first vs the second column.
+enum EventKind {
+ push
+ pop
+}
+
+struct Event {
+ is_set bool
+ id int
+ gtime u64 // nanoseconds
+ i int
+ kind EventKind
+ elapsed i64 // nanoseconds, elapsed after the previous event of the same kind
+}
+
+struct Context {
+mut:
+ n_iters int
+ n_readers int
+ n_writers int
+ //
+ pops_wg &sync.WaitGroup
+ pops []Event
+ //
+ pushes_wg &sync.WaitGroup
+ pushes []Event
+}
+
+fn do_rec(ch chan int, id int, mut ctx Context) {
+ eprintln('start of do_rec id: $id')
+ mut timer_sw_x := time.new_stopwatch()
+ mut tmp := int(0)
+ mut i := int(0)
+ // NB: a single receiver thread can get slightly more
+ // than its fair share of sends, that is why
+ // the receiver's Event array is much larger,
+ // enough so a single receiver can potentially process all
+ // writers pushes, and it is partitioned over all of
+ // id, ctx.n_writers and n_iters:
+ n_iters := ctx.n_iters
+ base := id * n_iters * ctx.n_writers
+ for {
+ for ch.try_pop(tmp) == .success {
+ ctx.pops[base + i] = Event{
+ is_set: true
+ id: id
+ gtime: time.sys_mono_now()
+ i: i
+ kind: .pop
+ elapsed: timer_sw_x.elapsed().nanoseconds()
+ }
+ timer_sw_x.restart()
+ i++
+ if tmp == 1 {
+ ctx.pops_wg.done()
+ return
+ }
+ }
+ }
+}
+
+fn do_send(ch chan int, id int, mut ctx Context) {
+ eprintln('start of do_send id: $id')
+ mut timer_sw_x := time.new_stopwatch()
+ n_iters := ctx.n_iters
+ base := n_iters * id // sender events can not overlap
+ for i := 0; i < n_iters; i++ {
+ idx := base + i
+ ctx.pushes[idx] = Event{
+ is_set: true
+ id: id
+ gtime: time.sys_mono_now()
+ i: i
+ kind: .push
+ elapsed: timer_sw_x.elapsed().nanoseconds()
+ }
+ timer_sw_x.restart()
+ tmp := int(0)
+ ch <- tmp
+ }
+ ctx.pushes_wg.done()
+}
+
+fn main() {
+ //
+ args := os.args[1..]
+ if '-h' in args || '--help' in args {
+ eprintln('Usage:\n many_writers_and_receivers_on_1_channel [-readers 1] [-writers 4] [-chan_cap 100] [-iterations 25000]')
+ exit(0)
+ }
+ n_iters := cmdline.option(args, '-iterations', '25000').int()
+ n_readers := cmdline.option(args, '-readers', '1').int()
+ n_writers := cmdline.option(args, '-writers', '4').int()
+ chan_cap := cmdline.option(args, '-chan_cap', '100').int()
+ eprintln('> n_iters, $n_iters, n_writers, $n_writers, n_readers, $n_readers, chan_cap, $chan_cap')
+ //
+ ch := chan int{cap: chan_cap}
+ max_number_of_pushes := n_writers * (n_iters + 2)
+ max_number_of_pops := max_number_of_pushes * n_readers
+ eprintln('> max_number_of_pushes, $max_number_of_pushes, max_number_of_pops (per receiver), $max_number_of_pops')
+ mut ctx := &Context{
+ n_iters: n_iters
+ n_readers: n_readers
+ n_writers: n_writers
+ pushes_wg: sync.new_waitgroup()
+ pops_wg: sync.new_waitgroup()
+ pushes: []Event{len: max_number_of_pushes}
+ pops: []Event{len: max_number_of_pops}
+ }
+ ctx.pops_wg.add(n_readers)
+ for i := 0; i < n_readers; i++ {
+ go do_rec(ch, i, mut ctx)
+ }
+ ctx.pushes_wg.add(n_writers)
+ for i := 0; i < n_writers; i++ {
+ go do_send(ch, i, mut ctx)
+ }
+ ctx.pushes_wg.wait()
+ eprintln('>> all pushes done')
+ for i := 0; i < n_readers; i++ {
+ ch <- 1
+ }
+ ctx.pops_wg.wait()
+ eprintln('>> all pops done')
+ mut all_events := []Event{}
+ all_events << ctx.pops
+ all_events << ctx.pushes
+ all_events.sort(a.elapsed < b.elapsed)
+ mut i := 0
+ for e in all_events {
+ if !e.is_set {
+ continue
+ }
+ i++
+ if e.kind == .pop {
+ println('${i:8} , ${e.elapsed:10}, ns , do_rec id:, ${e.id:3} , i=, ${e.i:5} , ${e.gtime:20}')
+ }
+ if e.kind == .push {
+ println('${i:8} , ${e.elapsed:10}, ns , do_send id:, ${e.id:3} , i=, ${e.i:5} , ${e.gtime:20}')
+ }
+ }
+}
diff --git a/v_windows/v/vlib/sync/bench/results.md b/v_windows/v/vlib/sync/bench/results.md
new file mode 100644
index 0000000..882471c
--- /dev/null
+++ b/v_windows/v/vlib/sync/bench/results.md
@@ -0,0 +1,48 @@
+# Channel Benchmark Results
+
+This documents lists several benchmark results for different platforms in order to
+identify performance regressions and improvements.
+
+The are measured using the command
+
+```
+> channel_bench_* <nsend> <nrec> <buflen> <nobj>
+
+nsend ... number of threads that push objects into the channel
+nrec .... number of threads that pop objects from the channel
+buflen .. length of channel buffer queue - `0` means unbuffered channel
+nobj .... number of objects to pass thru the channel
+```
+
+## AMD Ryzen 7 3800X, Ubuntu-20.04 x86_64
+
+10000000 Objects transfered, results in Objects/µs
+
+| nsend | nrec | buflen | **V (gcc -O2)** | **V (clang)** | **V (tcc)** | **Go (golang)** | **Go (gccgo -O2)** |
+| :---: | :---:| :---: | :---: | :---: | :---: | :---: | :---: |
+| 1 | 1 | 0 | 1.97 | 1.63 | 2.08 | 4.65 | 0.56 |
+| 1 | 1 | 100 | 3.05 | 2.29 | 1.93 | 18.90 | 6.08 |
+| 4 | 4 | 0 | 0.87 | 0.90 | 0.99 | 1.84 | 0.84 |
+| 4 | 4 | 100 | 3.35 | 3.07 | 2.92 | 7.43 | 3.71 |
+
+## AMD Ryzen 7 3800X, Windows 10 2004 x64
+
+| nsend | nrec | buflen | **V (gcc -O2)** | **V (msvc /O2)** | **V (tcc)** | **Go (golang)** |
+| :---: | :---:| :---: | :---: | :---: | :---: | :---: |
+| 1 | 1 | 0 | 2.30 | 3.76 | 2.02 | 4.67 |
+| 1 | 1 | 100 | 2.96 | 3.12 | 2.26 | 23.31 |
+| 4 | 4 | 0 | 0.90 | 1.05 | 0.83 | 1.38 |
+| 4 | 4 | 100 | 2.28 | 2.16 | 2.43 | 4.63 |
+
+## Raspberry Pi 3B+, Void Linux musl 32 bit
+
+10000000 Objects transfered, results in Objects/µs
+
+| nsend | nrec | buflen | **V (gcc -O2)** | **Go (golang)** |
+| :---: | :---:| :---: | :---: | :---: |
+| 1 | 1 | 0 | 0.37 | 0.21 |
+| 1 | 1 | 100 | 1.03 | 0.74 |
+| 4 | 4 | 0 | 0.04 | 0.38 |
+| 4 | 4 | 100 | 2.78 | 2.63 |
+| 2 | 2 | 0 | 0.05 | 0.38 |
+| 2 | 2 | 100 | 1.26 | 0.75 |
diff --git a/v_windows/v/vlib/sync/channel_1_test.v b/v_windows/v/vlib/sync/channel_1_test.v
new file mode 100644
index 0000000..17588fd
--- /dev/null
+++ b/v_windows/v/vlib/sync/channel_1_test.v
@@ -0,0 +1,25 @@
+const (
+ num_iterations = 10000
+)
+
+fn do_send(ch chan int) {
+ for i in 0 .. num_iterations {
+ ch <- i
+ }
+}
+
+fn test_channel_buffered() {
+ ch := chan int{cap: 1000}
+ go do_send(ch)
+ mut sum := i64(0)
+ for _ in 0 .. num_iterations {
+ sum += <-ch
+ }
+ assert sum == u64(num_iterations) * (num_iterations - 1) / 2
+}
+
+fn test_builtin_enum() {
+ x := ChanState.closed
+ assert x == .closed
+ println(x)
+}
diff --git a/v_windows/v/vlib/sync/channel_2_test.v b/v_windows/v/vlib/sync/channel_2_test.v
new file mode 100644
index 0000000..5e8251d
--- /dev/null
+++ b/v_windows/v/vlib/sync/channel_2_test.v
@@ -0,0 +1,19 @@
+const (
+ num_iterations = 10000
+)
+
+fn do_send(ch chan int) {
+ for i in 0 .. num_iterations {
+ ch <- i
+ }
+}
+
+fn test_channel_unbuffered() {
+ ch := chan int{}
+ go do_send(ch)
+ mut sum := i64(0)
+ for _ in 0 .. num_iterations {
+ sum += <-ch
+ }
+ assert sum == u64(num_iterations) * (num_iterations - 1) / 2
+}
diff --git a/v_windows/v/vlib/sync/channel_3_test.v b/v_windows/v/vlib/sync/channel_3_test.v
new file mode 100644
index 0000000..d07276b
--- /dev/null
+++ b/v_windows/v/vlib/sync/channel_3_test.v
@@ -0,0 +1,32 @@
+fn do_rec(ch chan int, resch chan i64) {
+ mut sum := i64(0)
+ for _ in 0 .. 2000 {
+ sum += <-ch
+ }
+ println(sum)
+ resch <- sum
+}
+
+fn do_send(ch chan int) {
+ for i in 0 .. 2000 {
+ ch <- i
+ }
+}
+
+fn test_channel_multi_unbuffered() {
+ ch := chan int{}
+ resch := chan i64{}
+ go do_rec(ch, resch)
+ go do_rec(ch, resch)
+ go do_rec(ch, resch)
+ go do_rec(ch, resch)
+ go do_send(ch)
+ go do_send(ch)
+ go do_send(ch)
+ go do_send(ch)
+ mut sum := i64(0)
+ for _ in 0 .. 4 {
+ sum += <-resch
+ }
+ assert sum == i64(4) * 2000 * (2000 - 1) / 2
+}
diff --git a/v_windows/v/vlib/sync/channel_4_test.v b/v_windows/v/vlib/sync/channel_4_test.v
new file mode 100644
index 0000000..3792668
--- /dev/null
+++ b/v_windows/v/vlib/sync/channel_4_test.v
@@ -0,0 +1,32 @@
+fn do_rec(ch chan int, resch chan i64) {
+ mut sum := i64(0)
+ for _ in 0 .. 2000 {
+ sum += <-ch
+ }
+ println(sum)
+ resch <- sum
+}
+
+fn do_send(ch chan int) {
+ for i in 0 .. 2000 {
+ ch <- i
+ }
+}
+
+fn test_channel_multi_buffered() {
+ ch := chan int{cap: 100}
+ resch := chan i64{}
+ go do_rec(ch, resch)
+ go do_rec(ch, resch)
+ go do_rec(ch, resch)
+ go do_rec(ch, resch)
+ go do_send(ch)
+ go do_send(ch)
+ go do_send(ch)
+ go do_send(ch)
+ mut sum := i64(0)
+ for _ in 0 .. 4 {
+ sum += <-resch
+ }
+ assert sum == i64(4) * 2000 * (2000 - 1) / 2
+}
diff --git a/v_windows/v/vlib/sync/channel_array_mut_test.v b/v_windows/v/vlib/sync/channel_array_mut_test.v
new file mode 100644
index 0000000..bfd53a1
--- /dev/null
+++ b/v_windows/v/vlib/sync/channel_array_mut_test.v
@@ -0,0 +1,35 @@
+const (
+ num_iterations = 10000
+)
+
+struct St {
+mut:
+ dummy i64
+ dummy2 u32
+ dummy3 i64
+ n int
+ dummy4 int
+}
+
+// this function gets an array of channels for `St` references
+fn do_rec_calc_send(chs []chan mut St) {
+ for {
+ mut s := <-chs[0] or { break }
+ s.n++
+ chs[1] <- s
+ }
+}
+
+fn test_channel_array_mut() {
+ mut chs := [chan mut St{cap: 1}, chan mut St{}]
+ go do_rec_calc_send(chs)
+ mut t := &St{
+ n: 100
+ }
+ for _ in 0 .. num_iterations {
+ chs[0] <- t
+ t = <-chs[1]
+ }
+ chs[0].close()
+ assert t.n == 100 + num_iterations
+}
diff --git a/v_windows/v/vlib/sync/channel_close_test.v b/v_windows/v/vlib/sync/channel_close_test.v
new file mode 100644
index 0000000..a31bfa9
--- /dev/null
+++ b/v_windows/v/vlib/sync/channel_close_test.v
@@ -0,0 +1,104 @@
+import time
+
+fn do_rec(ch chan int, resch chan i64) {
+ mut sum := i64(0)
+ for {
+ a := <-ch or { break }
+ sum += a
+ }
+ assert ch.closed == true
+ println(sum)
+ resch <- sum
+}
+
+fn do_send(ch chan int) {
+ for i in 0 .. 8000 {
+ ch <- i
+ }
+ assert ch.closed == false
+ ch.close()
+ assert ch.closed == true
+}
+
+fn test_channel_close_buffered_multi() {
+ ch := chan int{cap: 10}
+ resch := chan i64{}
+ go do_rec(ch, resch)
+ go do_rec(ch, resch)
+ go do_rec(ch, resch)
+ go do_rec(ch, resch)
+ go do_send(ch)
+ mut sum := i64(0)
+ for _ in 0 .. 4 {
+ sum += <-resch
+ }
+ assert sum == i64(8000) * (8000 - 1) / 2
+}
+
+fn test_channel_close_unbuffered_multi() {
+ ch := chan int{}
+ resch := chan i64{}
+ go do_rec(ch, resch)
+ go do_rec(ch, resch)
+ go do_rec(ch, resch)
+ go do_rec(ch, resch)
+ go do_send(ch)
+ mut sum := i64(0)
+ for _ in 0 .. 4 {
+ sum += <-resch
+ }
+ assert sum == i64(8000) * (8000 - 1) / 2
+}
+
+fn test_channel_close_buffered() {
+ ch := chan int{cap: 100}
+ resch := chan i64{}
+ go do_rec(ch, resch)
+ go do_send(ch)
+ mut sum := i64(0)
+ sum += <-resch
+ assert sum == i64(8000) * (8000 - 1) / 2
+}
+
+fn test_channel_close_unbuffered() {
+ ch := chan int{}
+ resch := chan i64{cap: 100}
+ go do_rec(ch, resch)
+ go do_send(ch)
+ mut sum := i64(0)
+ sum += <-resch
+ assert sum == i64(8000) * (8000 - 1) / 2
+}
+
+fn test_channel_send_close_buffered() {
+ ch := chan int{cap: 1}
+ t := go fn (ch chan int) {
+ ch <- 31
+ mut x := 45
+ ch <- 17 or { x = -133 }
+
+ assert x == -133
+ }(ch)
+ time.sleep(100 * time.millisecond)
+ ch.close()
+ mut r := <-ch
+ r = <-ch or { 23 }
+ assert r == 23
+ t.wait()
+}
+
+fn test_channel_send_close_unbuffered() {
+ time.sleep(1 * time.second)
+ ch := chan int{}
+ t := go fn (ch chan int) {
+ mut x := 31
+ ch <- 177 or { x = -71 }
+
+ assert x == -71
+ }(ch)
+ time.sleep(100 * time.millisecond)
+ ch.close()
+ r := <-ch or { 238 }
+ assert r == 238
+ t.wait()
+}
diff --git a/v_windows/v/vlib/sync/channel_fill_test.v b/v_windows/v/vlib/sync/channel_fill_test.v
new file mode 100644
index 0000000..b4eabc0
--- /dev/null
+++ b/v_windows/v/vlib/sync/channel_fill_test.v
@@ -0,0 +1,22 @@
+import sync
+
+const (
+ queue_len = 1000
+ queue_fill = 763
+)
+
+fn do_send(ch chan int, mut fin sync.Semaphore) {
+ for i in 0 .. queue_fill {
+ ch <- i
+ }
+ fin.post()
+}
+
+fn test_channel_len_cap() {
+ ch := chan int{cap: queue_len}
+ mut sem := sync.new_semaphore()
+ go do_send(ch, mut sem)
+ sem.wait()
+ assert ch.cap == queue_len
+ assert ch.len == queue_fill
+}
diff --git a/v_windows/v/vlib/sync/channel_opt_propagate_test.v b/v_windows/v/vlib/sync/channel_opt_propagate_test.v
new file mode 100644
index 0000000..a7e26fe
--- /dev/null
+++ b/v_windows/v/vlib/sync/channel_opt_propagate_test.v
@@ -0,0 +1,39 @@
+import sync
+
+const (
+ num_iterations = 10000
+)
+
+fn get_val_from_chan(ch chan i64) ?i64 {
+ r := <-ch ?
+ return r
+}
+
+// this function gets an array of channels for `i64`
+fn do_rec_calc_send(chs []chan i64, mut sem sync.Semaphore) {
+ mut msg := ''
+ for {
+ mut s := get_val_from_chan(chs[0]) or {
+ msg = err.msg
+ break
+ }
+ s++
+ chs[1] <- s
+ }
+ assert msg == 'channel closed'
+ sem.post()
+}
+
+fn test_channel_array_mut() {
+ mut chs := [chan i64{}, chan i64{cap: 10}]
+ mut sem := sync.new_semaphore()
+ go do_rec_calc_send(chs, mut sem)
+ mut t := i64(100)
+ for _ in 0 .. num_iterations {
+ chs[0] <- t
+ t = <-chs[1]
+ }
+ chs[0].close()
+ sem.wait()
+ assert t == 100 + num_iterations
+}
diff --git a/v_windows/v/vlib/sync/channel_polling_test.v b/v_windows/v/vlib/sync/channel_polling_test.v
new file mode 100644
index 0000000..846dcfd
--- /dev/null
+++ b/v_windows/v/vlib/sync/channel_polling_test.v
@@ -0,0 +1,56 @@
+// Channel Benchmark
+//
+// `nobj` integers are sent thru a channel with queue length`buflen`
+// using `nsend` sender threads and `nrec` receiver threads.
+//
+// The receive threads add all received numbers and send them to the
+// main thread where the total sum is compare to the expected value.
+const (
+ nsend = 2
+ nrec = 2
+ buflen = 100
+ nobj = 10000
+ objs_per_thread = 5000
+)
+
+fn do_rec(ch chan int, resch chan i64, n int) {
+ mut sum := i64(0)
+ for _ in 0 .. n {
+ mut r := 0
+ for ch.try_pop(mut r) != .success {
+ }
+ sum += r
+ }
+ println(sum)
+ resch <- sum
+}
+
+fn do_send(ch chan int, start int, end int) {
+ for i in start .. end {
+ for ch.try_push(i) != .success {
+ }
+ }
+}
+
+fn test_channel_polling() {
+ ch := chan int{cap: buflen}
+ resch := chan i64{}
+ for _ in 0 .. nrec {
+ go do_rec(ch, resch, objs_per_thread)
+ }
+ mut n := nobj
+ for _ in 0 .. nsend {
+ end := n
+ n -= objs_per_thread
+ go do_send(ch, n, end)
+ }
+ mut sum := i64(0)
+ for _ in 0 .. nrec {
+ sum += <-resch
+ println('> running sum: $sum')
+ }
+ // use sum formula by Gauß to calculate the expected result
+ expected_sum := i64(nobj) * (nobj - 1) / 2
+ println('expected sum: $expected_sum | sum: $sum')
+ assert sum == expected_sum
+}
diff --git a/v_windows/v/vlib/sync/channel_push_or_1_test.v b/v_windows/v/vlib/sync/channel_push_or_1_test.v
new file mode 100644
index 0000000..1551d83
--- /dev/null
+++ b/v_windows/v/vlib/sync/channel_push_or_1_test.v
@@ -0,0 +1,65 @@
+const n = 1000
+
+const c = 100
+
+fn f(ch chan int) {
+ for _ in 0 .. n {
+ _ := <-ch
+ }
+ ch.close()
+}
+
+fn test_push_or_unbuffered() {
+ ch := chan int{}
+ go f(ch)
+ mut j := 0
+ for {
+ ch <- j or { break }
+
+ j++
+ }
+ assert j == n
+}
+
+fn test_push_or_buffered() {
+ ch := chan int{cap: c}
+ go f(ch)
+ mut j := 0
+ for {
+ ch <- j or { break }
+
+ j++
+ }
+ // we don't know how many elements are in the buffer when the channel
+ // is closed, so check j against an interval
+ assert j >= n
+ assert j <= n + c
+}
+
+fn g(ch chan int, res chan int) {
+ mut j := 0
+ for {
+ ch <- j or { break }
+
+ j++
+ }
+ println('done $j')
+ res <- j
+}
+
+fn test_many_senders() {
+ ch := chan int{}
+ res := chan int{}
+ go g(ch, res)
+ go g(ch, res)
+ go g(ch, res)
+ mut k := 0
+ for _ in 0 .. 3 * n {
+ k = <-ch
+ }
+ ch.close()
+ mut sum := <-res
+ sum += <-res
+ sum += <-res
+ assert sum == 3 * n
+}
diff --git a/v_windows/v/vlib/sync/channel_push_or_2_test.v b/v_windows/v/vlib/sync/channel_push_or_2_test.v
new file mode 100644
index 0000000..451d9e7
--- /dev/null
+++ b/v_windows/v/vlib/sync/channel_push_or_2_test.v
@@ -0,0 +1,25 @@
+const n = 1000
+
+fn f(ch chan f64) {
+ mut s := 0.0
+ for _ in 0 .. n {
+ s += <-ch
+ }
+ assert s == f64(n * (n + 1) / 2)
+ ch.close()
+}
+
+fn do_send(ch chan f64, val f64) ?f64 {
+ ch <- val ?
+ return val + 1.0
+}
+
+fn test_push_propargate() {
+ ch := chan f64{}
+ go f(ch)
+ mut s := 1.0
+ for {
+ s = do_send(ch, s) or { break }
+ }
+ assert s == f64(n + 1)
+}
diff --git a/v_windows/v/vlib/sync/channel_select_2_test.v b/v_windows/v/vlib/sync/channel_select_2_test.v
new file mode 100644
index 0000000..189aaf6
--- /dev/null
+++ b/v_windows/v/vlib/sync/channel_select_2_test.v
@@ -0,0 +1,62 @@
+import time
+
+fn do_rec_i64(ch chan i64) {
+ mut sum := i64(0)
+ for _ in 0 .. 300 {
+ sum += <-ch
+ }
+ assert sum == 300 * (300 - 1) / 2
+}
+
+fn do_send_int(ch chan int) {
+ for i in 0 .. 300 {
+ ch <- i
+ }
+}
+
+fn do_send_byte(ch chan byte) {
+ for i in 0 .. 300 {
+ ch <- byte(i)
+ }
+}
+
+fn do_send_i64(ch chan i64) {
+ for i in 0 .. 300 {
+ ch <- i
+ }
+}
+
+fn test_select() {
+ chi := chan int{}
+ chl := chan i64{cap: 1}
+ chb := chan byte{cap: 10}
+ recch := chan i64{cap: 0}
+ go do_rec_i64(recch)
+ go do_send_int(chi)
+ go do_send_byte(chb)
+ go do_send_i64(chl)
+ mut sum := i64(0)
+ mut rl := i64(0)
+ mut sl := i64(0)
+ for _ in 0 .. 1200 {
+ select {
+ ri := <-chi {
+ sum += ri
+ }
+ recch <- sl {
+ sl++
+ }
+ rl = <-chl {
+ sum += rl
+ }
+ rb := <-chb {
+ sum += rb
+ }
+ }
+ }
+ // Use Gauß' formula for the first 2 contributions
+ // the 3rd contribution is `byte` and must be seen modulo 256
+ expected_sum := 2 * (300 * (300 - 1) / 2) + 256 * (256 - 1) / 2 + 44 * (44 - 1) / 2
+ assert sum == expected_sum
+ time.sleep(20 * time.millisecond) // to give assert in coroutine enough time
+}
diff --git a/v_windows/v/vlib/sync/channel_select_3_test.v b/v_windows/v/vlib/sync/channel_select_3_test.v
new file mode 100644
index 0000000..fdf6096
--- /dev/null
+++ b/v_windows/v/vlib/sync/channel_select_3_test.v
@@ -0,0 +1,122 @@
+import time
+import sync
+
+struct St {
+ a int
+}
+
+fn getint() int {
+ return 8
+}
+
+fn f1(ch1 chan int, ch2 chan St, ch3 chan int, ch4 chan int, ch5 chan int, mut sem sync.Semaphore) {
+ mut a := 5
+ select {
+ a = <-ch3 {
+ a = 0
+ }
+ b := <-ch2 {
+ a = b.a
+ }
+ ch3 <- 5 {
+ a = 1
+ }
+ ch2 <- St{
+ a: 37
+ } {
+ a = 2
+ }
+ ch4 <- (6 + 7 * 9) {
+ a = 8
+ }
+ ch5 <- getint() {
+ a = 9
+ }
+ 300 * time.millisecond {
+ a = 3
+ }
+ }
+ assert a == 3
+ sem.post()
+}
+
+fn f2(ch1 chan St, ch2 chan int, mut sem sync.Semaphore) {
+ mut r := 23
+ for i in 0 .. 2 {
+ select {
+ b := <-ch1 {
+ r = b.a
+ }
+ ch2 <- r {
+ r = 17
+ }
+ }
+ if i == 0 {
+ assert r == 17
+ } else {
+ assert r == 13
+ }
+ }
+ sem.post()
+}
+
+fn test_select_blocks() {
+ ch1 := chan int{cap: 1}
+ ch2 := chan St{}
+ ch3 := chan int{}
+ ch4 := chan int{}
+ ch5 := chan int{}
+ mut sem := sync.new_semaphore()
+ mut r := false
+ t := select {
+ b := <-ch1 {
+ println(b)
+ }
+ else {
+ // no channel ready
+ r = true
+ }
+ }
+ assert r == true
+ assert t == true
+ go f2(ch2, ch3, mut sem)
+ n := <-ch3
+ assert n == 23
+ ch2 <- St{
+ a: 13
+ }
+ sem.wait()
+ stopwatch := time.new_stopwatch()
+ go f1(ch1, ch2, ch3, ch4, ch5, mut sem)
+ sem.wait()
+ elapsed_ms := f64(stopwatch.elapsed()) / time.millisecond
+ // https://docs.microsoft.com/en-us/windows-hardware/drivers/kernel/high-resolution-timers
+ // > For example, for Windows running on an x86 processor, the default interval between
+ // > system clock ticks is typically about 15 milliseconds, and the minimum interval
+ // > between system clock ticks is about 1 millisecond.
+ assert elapsed_ms >= 280.0 // 300 - (15ms + 5ms just in case)
+
+ ch1.close()
+ ch2.close()
+ mut h := 7
+ mut is_open := true
+ if select {
+ _ := <-ch2 {
+ h = 0
+ }
+ ch1 <- h {
+ h = 1
+ }
+ else {
+ h = 2
+ }
+ } {
+ panic('channel is still open')
+ } else {
+ is_open = false
+ }
+ // no branch should have run
+ assert h == 7
+ // since all channels are closed `select` should return `false`
+ assert is_open == false
+}
diff --git a/v_windows/v/vlib/sync/channel_select_4_test.v b/v_windows/v/vlib/sync/channel_select_4_test.v
new file mode 100644
index 0000000..77cd557
--- /dev/null
+++ b/v_windows/v/vlib/sync/channel_select_4_test.v
@@ -0,0 +1,43 @@
+fn do_rec_i64(ch chan i64, sumch chan i64) {
+ mut sum := i64(0)
+ for _ in 0 .. 30000 {
+ sum += <-ch
+ }
+ sumch <- sum
+}
+
+fn do_send_int(ch chan int) {
+ for i in 0 .. 30000 {
+ ch <- i
+ }
+}
+
+fn test_select() {
+ chi := chan int{cap: 10}
+ recch := chan i64{cap: 10}
+ chsum := chan i64{}
+ go do_rec_i64(recch, chsum)
+ go do_send_int(chi)
+ mut sum := i64(0)
+ mut sl := i64(0)
+ for _ in 0 .. 60000 + recch.cap {
+ select {
+ ri := <-chi {
+ sum += ri
+ }
+ recch <- sl {
+ sl++
+ }
+ }
+ }
+ // Use Gauß' formula
+ expected_sum := i64(30000) * (30000 - 1) / 2
+ assert sum == expected_sum
+
+ mut sumrec := <-chsum
+ // Empty receive buffer
+ for _ in 0 .. recch.cap {
+ sumrec += <-recch
+ }
+ assert sumrec == i64(30000 + recch.cap) * (30000 + recch.cap - 1) / 2
+}
diff --git a/v_windows/v/vlib/sync/channel_select_5_test.v b/v_windows/v/vlib/sync/channel_select_5_test.v
new file mode 100644
index 0000000..97228fe
--- /dev/null
+++ b/v_windows/v/vlib/sync/channel_select_5_test.v
@@ -0,0 +1,61 @@
+fn do_rec_i64(ch chan i64, sumch chan i64) {
+ mut sum := i64(0)
+ for _ in 0 .. 10000 {
+ sum += <-ch
+ }
+ sumch <- sum
+}
+
+fn do_send_int(ch chan int) {
+ for i in 0 .. 10000 {
+ ch <- i
+ }
+}
+
+fn do_send_int2(ch chan int) {
+ for i in 10000 .. 20000 {
+ ch <- i
+ }
+}
+
+fn do_send_int3(ch chan int) {
+ for i in 20000 .. 30000 {
+ ch <- i
+ }
+}
+
+fn test_select() {
+ chi := chan int{cap: 10}
+ recch := chan i64{cap: 10}
+ chsum := chan i64{}
+ go do_rec_i64(recch, chsum)
+ go do_rec_i64(recch, chsum)
+ go do_rec_i64(recch, chsum)
+ go do_send_int(chi)
+ go do_send_int2(chi)
+ go do_send_int3(chi)
+ mut sum := i64(0)
+ mut sl := i64(0)
+ for _ in 0 .. 60000 + recch.cap {
+ select {
+ ri := <-chi {
+ sum += ri
+ }
+ recch <- sl {
+ sl++
+ }
+ }
+ }
+ // Use Gauß' formula
+ expected_sum := i64(30000) * (30000 - 1) / 2
+ assert sum == expected_sum
+
+ mut sumrec := <-chsum
+ sumrec += <-chsum
+ sumrec += <-chsum
+ // Empty receive buffer
+ for _ in 0 .. recch.cap {
+ sumrec += <-recch
+ }
+ assert sumrec == i64(30000 + recch.cap) * (30000 + recch.cap - 1) / 2
+}
diff --git a/v_windows/v/vlib/sync/channel_select_6_test.v b/v_windows/v/vlib/sync/channel_select_6_test.v
new file mode 100644
index 0000000..9b5f2d4
--- /dev/null
+++ b/v_windows/v/vlib/sync/channel_select_6_test.v
@@ -0,0 +1,75 @@
+// This test case runs concurrent 3 instances of `do_select` that
+// communicate with 6 other threads doing send and receive operations.
+// There are buffered and unbuffered channels - handled by one or two
+// concurrend threads on the other side
+
+fn do_select(ch1 chan int, ch2 chan int, chf1 chan f64, chf2 chan f64, sumch1 chan i64, sumch2 chan i64) {
+ mut sum1 := i64(0)
+ mut sum2 := i64(0)
+ f1 := 17.0
+ f2 := 7.0
+ for _ in 0 .. 20000 + chf1.cap / 3 {
+ select {
+ chf1 <- f1 {}
+ i := <-ch1 {
+ sum1 += i
+ }
+ j := <-ch2 {
+ sum2 += j
+ }
+ chf2 <- f2 {}
+ }
+ }
+ sumch1 <- sum1
+ sumch2 <- sum2
+}
+
+fn do_send_int(ch chan int, factor int) {
+ for i in 0 .. 10000 {
+ ch <- (i * factor)
+ }
+}
+
+fn do_rec_f64(ch chan f64, sumch chan f64) {
+ mut sum := 0.0
+ for _ in 0 .. 10000 {
+ sum += <-ch
+ }
+ sumch <- sum
+}
+
+fn test_select() {
+ ch1 := chan int{cap: 3}
+ ch2 := chan int{}
+ // buffer length of chf1 mus be mutiple of 3 (# select threads)
+ chf1 := chan f64{cap: 30}
+ chf2 := chan f64{}
+ chsum1 := chan i64{}
+ chsum2 := chan i64{}
+ chsumf1 := chan f64{}
+ chsumf2 := chan f64{}
+ go do_send_int(ch1, 3)
+ go do_select(ch1, ch2, chf1, chf2, chsum1, chsum2)
+ go do_rec_f64(chf1, chsumf1)
+ go do_rec_f64(chf2, chsumf2)
+ go do_rec_f64(chf2, chsumf2)
+ go do_select(ch1, ch2, chf1, chf2, chsum1, chsum2)
+ go do_send_int(ch2, 7)
+ go do_send_int(ch2, 17)
+ go do_select(ch1, ch2, chf1, chf2, chsum1, chsum2)
+
+ sum1 := <-chsum1 + <-chsum1 + <-chsum1
+ sum2 := <-chsum2 + <-chsum2 + <-chsum2
+ mut sumf1 := <-chsumf1
+ // empty channel buffer
+ for _ in 0 .. chf1.cap {
+ sumf1 += <-chf1
+ }
+ sumf2 := <-chsumf2 + <-chsumf2
+ // Use Gauß' formula
+ expected_sum := i64(10000) * (10000 - 1) / 2
+ assert sum1 == 3 * expected_sum
+ assert sum2 == (7 + 17) * expected_sum
+ assert sumf1 == 17.0 * f64(10000 + chf1.cap)
+ assert sumf2 == 7.0 * 20000
+}
diff --git a/v_windows/v/vlib/sync/channel_select_test.v b/v_windows/v/vlib/sync/channel_select_test.v
new file mode 100644
index 0000000..26ed641
--- /dev/null
+++ b/v_windows/v/vlib/sync/channel_select_test.v
@@ -0,0 +1,84 @@
+/*
+* ATTENTION! Do not use this file as an example!
+ * For that, please look at `channel_select_2_test.v` or `channel_select_3_test.v`
+ *
+ * This test case uses the implementation in `sync/channels.v` directly
+ * in order to test it independently from the support in the core language
+*/
+
+module sync
+
+import time
+
+fn do_rec_i64(mut ch Channel) {
+ mut sum := i64(0)
+ for _ in 0 .. 300 {
+ mut a := i64(0)
+ ch.pop(&a)
+ sum += a
+ }
+ assert sum == 300 * (300 - 1) / 2
+}
+
+fn do_send_int(mut ch Channel) {
+ for i in 0 .. 300 {
+ ch.push(&i)
+ }
+}
+
+fn do_send_byte(mut ch Channel) {
+ for i in 0 .. 300 {
+ ii := byte(i)
+ ch.push(&ii)
+ }
+}
+
+fn do_send_i64(mut ch Channel) {
+ for i in 0 .. 300 {
+ ii := i64(i)
+ ch.push(&ii)
+ }
+}
+
+fn test_select() {
+ mut chi := new_channel<int>(0)
+ mut chl := new_channel<i64>(1)
+ mut chb := new_channel<byte>(10)
+ mut recch := new_channel<i64>(0)
+ go do_rec_i64(mut recch)
+ go do_send_int(mut chi)
+ go do_send_byte(mut chb)
+ go do_send_i64(mut chl)
+ mut channels := [chi, recch, chl, chb]
+ directions := [Direction.pop, .push, .pop, .pop]
+ mut sum := i64(0)
+ mut rl := i64(0)
+ mut ri := int(0)
+ mut rb := byte(0)
+ mut sl := i64(0)
+ mut objs := [voidptr(&ri), &sl, &rl, &rb]
+ for _ in 0 .. 1200 {
+ idx := channel_select(mut channels, directions, mut objs, time.infinite)
+ match idx {
+ 0 {
+ sum += ri
+ }
+ 1 {
+ sl++
+ }
+ 2 {
+ sum += rl
+ }
+ 3 {
+ sum += rb
+ }
+ else {
+ println('got $idx (timeout)')
+ }
+ }
+ }
+ // Use Gauß' formula for the first 2 contributions
+ // the 3rd contribution is `byte` and must be seen modulo 256
+ expected_sum := 2 * (300 * (300 - 1) / 2) + 256 * (256 - 1) / 2 + 44 * (44 - 1) / 2
+ assert sum == expected_sum
+}
diff --git a/v_windows/v/vlib/sync/channel_try_buf_test.v b/v_windows/v/vlib/sync/channel_try_buf_test.v
new file mode 100644
index 0000000..e521314
--- /dev/null
+++ b/v_windows/v/vlib/sync/channel_try_buf_test.v
@@ -0,0 +1,17 @@
+fn test_channel_try_buffered() {
+ ch := chan int{cap: 5}
+ for z in 2 .. 13 {
+ if ch.try_push(z) == .not_ready {
+ assert z == 7
+ break
+ }
+ }
+ mut obj := int(0)
+ for ch.try_pop(mut obj) == .success {
+ println(obj)
+ }
+ assert obj == 6
+ ch <- 17
+ obj = <-ch
+ assert obj == 17
+}
diff --git a/v_windows/v/vlib/sync/channel_try_unbuf_test.v b/v_windows/v/vlib/sync/channel_try_unbuf_test.v
new file mode 100644
index 0000000..ee11468
--- /dev/null
+++ b/v_windows/v/vlib/sync/channel_try_unbuf_test.v
@@ -0,0 +1,13 @@
+fn test_channel_try_unbuffered() {
+ ch := chan int{}
+ for z in 5 .. 8 {
+ if ch.try_push(z) == .not_ready {
+ assert z == 5
+ break
+ }
+ panic('push on non-ready channel not detected')
+ }
+ mut obj := -17
+ for ch.try_pop(mut obj) == .success {}
+ assert obj == -17
+}
diff --git a/v_windows/v/vlib/sync/channels.v b/v_windows/v/vlib/sync/channels.v
new file mode 100644
index 0000000..49f1396
--- /dev/null
+++ b/v_windows/v/vlib/sync/channels.v
@@ -0,0 +1,730 @@
+module sync
+
+import time
+import rand
+
+$if windows {
+ #flag -I @VEXEROOT/thirdparty/stdatomic/win
+} $else {
+ #flag -I @VEXEROOT/thirdparty/stdatomic/nix
+}
+
+$if linux {
+ $if tinyc {
+ $if amd64 {
+ // most Linux distributions have /usr/lib/libatomic.so, but Ubuntu uses gcc version specific dir
+ #flag -L/usr/lib/gcc/x86_64-linux-gnu/6
+ #flag -L/usr/lib/gcc/x86_64-linux-gnu/7
+ #flag -L/usr/lib/gcc/x86_64-linux-gnu/8
+ #flag -L/usr/lib/gcc/x86_64-linux-gnu/9
+ #flag -L/usr/lib/gcc/x86_64-linux-gnu/10
+ #flag -L/usr/lib/gcc/x86_64-linux-gnu/11
+ #flag -L/usr/lib/gcc/x86_64-linux-gnu/12
+ } $else $if arm64 {
+ #flag -L/usr/lib/gcc/aarch64-linux-gnu/6
+ #flag -L/usr/lib/gcc/aarch64-linux-gnu/7
+ #flag -L/usr/lib/gcc/aarch64-linux-gnu/8
+ #flag -L/usr/lib/gcc/aarch64-linux-gnu/9
+ #flag -L/usr/lib/gcc/aarch64-linux-gnu/10
+ #flag -L/usr/lib/gcc/aarch64-linux-gnu/11
+ #flag -L/usr/lib/gcc/aarch64-linux-gnu/12
+ }
+ #flag -latomic
+ }
+}
+
+#include <atomic.h>
+// The following functions are actually generic in C
+fn C.atomic_load_ptr(voidptr) voidptr
+fn C.atomic_store_ptr(voidptr, voidptr)
+fn C.atomic_compare_exchange_weak_ptr(voidptr, voidptr, voidptr) bool
+fn C.atomic_compare_exchange_strong_ptr(voidptr, voidptr, voidptr) bool
+fn C.atomic_exchange_ptr(voidptr, voidptr) voidptr
+fn C.atomic_fetch_add_ptr(voidptr, voidptr) voidptr
+fn C.atomic_fetch_sub_ptr(voidptr, voidptr) voidptr
+
+fn C.atomic_load_u16(voidptr) u16
+fn C.atomic_store_u16(voidptr, u16)
+fn C.atomic_compare_exchange_weak_u16(voidptr, voidptr, u16) bool
+fn C.atomic_compare_exchange_strong_u16(voidptr, voidptr, u16) bool
+fn C.atomic_exchange_u16(voidptr, u16) u16
+fn C.atomic_fetch_add_u16(voidptr, u16) u16
+fn C.atomic_fetch_sub_u16(voidptr, u16) u16
+
+fn C.atomic_load_u32(voidptr) u32
+fn C.atomic_store_u32(voidptr, u32)
+fn C.atomic_compare_exchange_weak_u32(voidptr, voidptr, u32) bool
+fn C.atomic_compare_exchange_strong_u32(voidptr, voidptr, u32) bool
+fn C.atomic_exchange_u32(voidptr, u32) u32
+fn C.atomic_fetch_add_u32(voidptr, u32) u32
+fn C.atomic_fetch_sub_u32(voidptr, u32) u32
+
+fn C.atomic_load_u64(voidptr) u64
+fn C.atomic_store_u64(voidptr, u64)
+fn C.atomic_compare_exchange_weak_u64(voidptr, voidptr, u64) bool
+fn C.atomic_compare_exchange_strong_u64(voidptr, voidptr, u64) bool
+fn C.atomic_exchange_u64(voidptr, u64) u64
+fn C.atomic_fetch_add_u64(voidptr, u64) u64
+fn C.atomic_fetch_sub_u64(voidptr, u64) u64
+
+const (
+ // how often to try to get data without blocking before to wait for semaphore
+ spinloops = 750
+ spinloops_sem = 4000
+)
+
+enum BufferElemStat {
+ unused = 0
+ writing
+ written
+ reading
+}
+
+struct Subscription {
+mut:
+ sem &Semaphore
+ prev &&Subscription
+ nxt &Subscription
+}
+
+enum Direction {
+ pop
+ push
+}
+
+struct Channel {
+ ringbuf &byte // queue for buffered channels
+ statusbuf &byte // flags to synchronize write/read in ringbuf
+ objsize u32
+mut: // atomic
+ writesem Semaphore // to wake thread that wanted to write, but buffer was full
+ readsem Semaphore // to wake thread that wanted to read, but buffer was empty
+ writesem_im Semaphore
+ readsem_im Semaphore
+ write_adr C.atomic_uintptr_t // if != NULL the next obj can be written here without wait
+ read_adr C.atomic_uintptr_t // if != NULL an obj can be read from here without wait
+ adr_read C.atomic_uintptr_t // used to identify origin of writesem
+ adr_written C.atomic_uintptr_t // used to identify origin of readsem
+ write_free u32 // for queue state
+ read_avail u32
+ buf_elem_write_idx u32
+ buf_elem_read_idx u32
+ // for select
+ write_subscriber &Subscription
+ read_subscriber &Subscription
+ write_sub_mtx u16
+ read_sub_mtx u16
+ closed u16
+pub:
+ cap u32 // queue length in #objects
+}
+
+pub fn new_channel<T>(n u32) &Channel {
+ st := sizeof(T)
+ if isreftype(T) {
+ return new_channel_st(n, st)
+ } else {
+ return new_channel_st_noscan(n, st)
+ }
+}
+
+fn new_channel_st(n u32, st u32) &Channel {
+ wsem := if n > 0 { n } else { 1 }
+ rsem := if n > 0 { u32(0) } else { 1 }
+ rbuf := if n > 0 { unsafe { malloc(int(n * st)) } } else { &byte(0) }
+ sbuf := if n > 0 { vcalloc_noscan(int(n * 2)) } else { &byte(0) }
+ mut ch := Channel{
+ objsize: st
+ cap: n
+ write_free: n
+ read_avail: 0
+ ringbuf: rbuf
+ statusbuf: sbuf
+ write_subscriber: 0
+ read_subscriber: 0
+ }
+ ch.writesem.init(wsem)
+ ch.readsem.init(rsem)
+ ch.writesem_im.init(0)
+ ch.readsem_im.init(0)
+ return &ch
+}
+
+fn new_channel_st_noscan(n u32, st u32) &Channel {
+ $if gcboehm_opt ? {
+ wsem := if n > 0 { n } else { 1 }
+ rsem := if n > 0 { u32(0) } else { 1 }
+ rbuf := if n > 0 { unsafe { malloc_noscan(int(n * st)) } } else { &byte(0) }
+ sbuf := if n > 0 { vcalloc_noscan(int(n * 2)) } else { &byte(0) }
+ mut ch := Channel{
+ objsize: st
+ cap: n
+ write_free: n
+ read_avail: 0
+ ringbuf: rbuf
+ statusbuf: sbuf
+ write_subscriber: 0
+ read_subscriber: 0
+ }
+ ch.writesem.init(wsem)
+ ch.readsem.init(rsem)
+ ch.writesem_im.init(0)
+ ch.readsem_im.init(0)
+ return &ch
+ } $else {
+ return new_channel_st(n, st)
+ }
+}
+
+pub fn (ch &Channel) auto_str(typename string) string {
+ return 'chan $typename{cap: $ch.cap, closed: $ch.closed}'
+}
+
+pub fn (mut ch Channel) close() {
+ open_val := u16(0)
+ if !C.atomic_compare_exchange_strong_u16(&ch.closed, &open_val, 1) {
+ return
+ }
+ mut nulladr := voidptr(0)
+ for !C.atomic_compare_exchange_weak_ptr(unsafe { &voidptr(&ch.adr_written) }, &nulladr,
+ voidptr(-1)) {
+ nulladr = voidptr(0)
+ }
+ ch.readsem_im.post()
+ ch.readsem.post()
+ mut null16 := u16(0)
+ for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) {
+ null16 = u16(0)
+ }
+ if ch.read_subscriber != voidptr(0) {
+ ch.read_subscriber.sem.post()
+ }
+ C.atomic_store_u16(&ch.read_sub_mtx, u16(0))
+ null16 = u16(0)
+ for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) {
+ null16 = u16(0)
+ }
+ if ch.write_subscriber != voidptr(0) {
+ ch.write_subscriber.sem.post()
+ }
+ C.atomic_store_u16(&ch.write_sub_mtx, u16(0))
+ ch.writesem.post()
+ if ch.cap == 0 {
+ C.atomic_store_ptr(unsafe { &voidptr(&ch.read_adr) }, voidptr(0))
+ }
+ ch.writesem_im.post()
+}
+
+[inline]
+pub fn (mut ch Channel) len() int {
+ return int(C.atomic_load_u32(&ch.read_avail))
+}
+
+[inline]
+pub fn (mut ch Channel) closed() bool {
+ return C.atomic_load_u16(&ch.closed) != 0
+}
+
+[inline]
+pub fn (mut ch Channel) push(src voidptr) {
+ if ch.try_push_priv(src, false) == .closed {
+ panic('push on closed channel')
+ }
+}
+
+[inline]
+pub fn (mut ch Channel) try_push(src voidptr) ChanState {
+ return ch.try_push_priv(src, true)
+}
+
+fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState {
+ if C.atomic_load_u16(&ch.closed) != 0 {
+ return .closed
+ }
+ spinloops_sem_, spinloops_ := if no_block { 1, 1 } else { sync.spinloops, sync.spinloops_sem }
+ mut have_swapped := false
+ for {
+ mut got_sem := false
+ mut wradr := C.atomic_load_ptr(unsafe { &voidptr(&ch.write_adr) })
+ for wradr != C.NULL {
+ if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.write_adr) },
+ &wradr, voidptr(0))
+ {
+ // there is a reader waiting for us
+ unsafe { C.memcpy(wradr, src, ch.objsize) }
+ mut nulladr := voidptr(0)
+ for !C.atomic_compare_exchange_weak_ptr(unsafe { &voidptr(&ch.adr_written) },
+ &nulladr, wradr) {
+ nulladr = voidptr(0)
+ }
+ ch.readsem_im.post()
+ return .success
+ }
+ }
+ if no_block && ch.cap == 0 {
+ return .not_ready
+ }
+ // get token to read
+ for _ in 0 .. spinloops_sem_ {
+ if got_sem {
+ break
+ }
+ got_sem = ch.writesem.try_wait()
+ }
+ if !got_sem {
+ if no_block {
+ return .not_ready
+ }
+ ch.writesem.wait()
+ }
+ if C.atomic_load_u16(&ch.closed) != 0 {
+ ch.writesem.post()
+ return .closed
+ }
+ if ch.cap == 0 {
+ // try to advertise current object as readable
+ mut read_in_progress := false
+ C.atomic_store_ptr(unsafe { &voidptr(&ch.read_adr) }, src)
+ wradr = C.atomic_load_ptr(unsafe { &voidptr(&ch.write_adr) })
+ if wradr != C.NULL {
+ mut src2 := src
+ if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.read_adr) },
+ &src2, voidptr(0))
+ {
+ ch.writesem.post()
+ continue
+ } else {
+ read_in_progress = true
+ }
+ }
+ if !read_in_progress {
+ mut null16 := u16(0)
+ for !C.atomic_compare_exchange_weak_u16(voidptr(&ch.read_sub_mtx), &null16,
+ u16(1)) {
+ null16 = u16(0)
+ }
+ if ch.read_subscriber != voidptr(0) {
+ ch.read_subscriber.sem.post()
+ }
+ C.atomic_store_u16(&ch.read_sub_mtx, u16(0))
+ }
+ mut src2 := src
+ for sp := u32(0); sp < spinloops_ || read_in_progress; sp++ {
+ if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_read) },
+ &src2, voidptr(0))
+ {
+ have_swapped = true
+ read_in_progress = true
+ break
+ }
+ src2 = src
+ }
+ mut got_im_sem := false
+ for sp := u32(0); sp < spinloops_sem_ || read_in_progress; sp++ {
+ got_im_sem = ch.writesem_im.try_wait()
+ if got_im_sem {
+ break
+ }
+ }
+ for {
+ if got_im_sem {
+ got_im_sem = false
+ } else {
+ ch.writesem_im.wait()
+ }
+ if C.atomic_load_u16(&ch.closed) != 0 {
+ if have_swapped
+ || C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_read) }, &src2, voidptr(0)) {
+ ch.writesem.post()
+ return .success
+ } else {
+ return .closed
+ }
+ }
+ if have_swapped
+ || C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_read) }, &src2, voidptr(0)) {
+ ch.writesem.post()
+ break
+ } else {
+ // this semaphore was not for us - repost in
+ ch.writesem_im.post()
+ if src2 == voidptr(-1) {
+ ch.readsem.post()
+ return .closed
+ }
+ src2 = src
+ }
+ }
+ return .success
+ } else {
+ // buffered channel
+ mut space_in_queue := false
+ mut wr_free := C.atomic_load_u32(&ch.write_free)
+ for wr_free > 0 {
+ space_in_queue = C.atomic_compare_exchange_weak_u32(&ch.write_free, &wr_free,
+ wr_free - 1)
+ if space_in_queue {
+ break
+ }
+ }
+ if space_in_queue {
+ mut wr_idx := C.atomic_load_u32(&ch.buf_elem_write_idx)
+ for {
+ mut new_wr_idx := wr_idx + 1
+ for new_wr_idx >= ch.cap {
+ new_wr_idx -= ch.cap
+ }
+ if C.atomic_compare_exchange_strong_u32(&ch.buf_elem_write_idx, &wr_idx,
+ new_wr_idx)
+ {
+ break
+ }
+ }
+ mut wr_ptr := ch.ringbuf
+ mut status_adr := ch.statusbuf
+ unsafe {
+ wr_ptr += wr_idx * ch.objsize
+ status_adr += wr_idx * sizeof(u16)
+ }
+ mut expected_status := u16(BufferElemStat.unused)
+ for !C.atomic_compare_exchange_weak_u16(status_adr, &expected_status,
+ u16(BufferElemStat.writing)) {
+ expected_status = u16(BufferElemStat.unused)
+ }
+ unsafe {
+ C.memcpy(wr_ptr, src, ch.objsize)
+ }
+ C.atomic_store_u16(unsafe { &u16(status_adr) }, u16(BufferElemStat.written))
+ C.atomic_fetch_add_u32(&ch.read_avail, 1)
+ ch.readsem.post()
+ mut null16 := u16(0)
+ for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) {
+ null16 = u16(0)
+ }
+ if ch.read_subscriber != voidptr(0) {
+ ch.read_subscriber.sem.post()
+ }
+ C.atomic_store_u16(&ch.read_sub_mtx, u16(0))
+ return .success
+ } else {
+ if no_block {
+ return .not_ready
+ }
+ ch.writesem.post()
+ }
+ }
+ }
+ // we should not get here but the V compiler want's to see a return statement
+ assert false
+ return .success
+}
+
+[inline]
+pub fn (mut ch Channel) pop(dest voidptr) bool {
+ return ch.try_pop_priv(dest, false) == .success
+}
+
+[inline]
+pub fn (mut ch Channel) try_pop(dest voidptr) ChanState {
+ return ch.try_pop_priv(dest, true)
+}
+
+fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState {
+ spinloops_sem_, spinloops_ := if no_block { 1, 1 } else { sync.spinloops, sync.spinloops_sem }
+ mut have_swapped := false
+ mut write_in_progress := false
+ for {
+ mut got_sem := false
+ if ch.cap == 0 {
+ // unbuffered channel - first see if a `push()` has adversized
+ mut rdadr := C.atomic_load_ptr(unsafe { &voidptr(&ch.read_adr) })
+ for rdadr != C.NULL {
+ if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.read_adr) },
+ &rdadr, voidptr(0))
+ {
+ // there is a writer waiting for us
+ unsafe { C.memcpy(dest, rdadr, ch.objsize) }
+ mut nulladr := voidptr(0)
+ for !C.atomic_compare_exchange_weak_ptr(unsafe { &voidptr(&ch.adr_read) },
+ &nulladr, rdadr) {
+ nulladr = voidptr(0)
+ }
+ ch.writesem_im.post()
+ return .success
+ }
+ }
+ if no_block {
+ if C.atomic_load_u16(&ch.closed) == 0 {
+ return .not_ready
+ } else {
+ return .closed
+ }
+ }
+ }
+ // get token to read
+ for _ in 0 .. spinloops_sem_ {
+ if got_sem {
+ break
+ }
+ got_sem = ch.readsem.try_wait()
+ }
+ if !got_sem {
+ if no_block {
+ if C.atomic_load_u16(&ch.closed) == 0 {
+ return .not_ready
+ } else {
+ return .closed
+ }
+ }
+ ch.readsem.wait()
+ }
+ if ch.cap > 0 {
+ // try to get buffer token
+ mut obj_in_queue := false
+ mut rd_avail := C.atomic_load_u32(&ch.read_avail)
+ for rd_avail > 0 {
+ obj_in_queue = C.atomic_compare_exchange_weak_u32(&ch.read_avail, &rd_avail,
+ rd_avail - 1)
+ if obj_in_queue {
+ break
+ }
+ }
+ if obj_in_queue {
+ mut rd_idx := C.atomic_load_u32(&ch.buf_elem_read_idx)
+ for {
+ mut new_rd_idx := rd_idx + 1
+ for new_rd_idx >= ch.cap {
+ new_rd_idx -= ch.cap
+ }
+ if C.atomic_compare_exchange_weak_u32(&ch.buf_elem_read_idx, &rd_idx,
+ new_rd_idx)
+ {
+ break
+ }
+ }
+ mut rd_ptr := ch.ringbuf
+ mut status_adr := ch.statusbuf
+ unsafe {
+ rd_ptr += rd_idx * ch.objsize
+ status_adr += rd_idx * sizeof(u16)
+ }
+ mut expected_status := u16(BufferElemStat.written)
+ for !C.atomic_compare_exchange_weak_u16(status_adr, &expected_status,
+ u16(BufferElemStat.reading)) {
+ expected_status = u16(BufferElemStat.written)
+ }
+ unsafe {
+ C.memcpy(dest, rd_ptr, ch.objsize)
+ }
+ C.atomic_store_u16(unsafe { &u16(status_adr) }, u16(BufferElemStat.unused))
+ C.atomic_fetch_add_u32(&ch.write_free, 1)
+ ch.writesem.post()
+ mut null16 := u16(0)
+ for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) {
+ null16 = u16(0)
+ }
+ if ch.write_subscriber != voidptr(0) {
+ ch.write_subscriber.sem.post()
+ }
+ C.atomic_store_u16(&ch.write_sub_mtx, u16(0))
+ return .success
+ }
+ }
+ // try to advertise `dest` as writable
+ C.atomic_store_ptr(unsafe { &voidptr(&ch.write_adr) }, dest)
+ if ch.cap == 0 {
+ mut rdadr := C.atomic_load_ptr(unsafe { &voidptr(&ch.read_adr) })
+ if rdadr != C.NULL {
+ mut dest2 := dest
+ if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.write_adr) },
+ &dest2, voidptr(0))
+ {
+ ch.readsem.post()
+ continue
+ } else {
+ write_in_progress = true
+ }
+ }
+ }
+ if ch.cap == 0 && !write_in_progress {
+ mut null16 := u16(0)
+ for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) {
+ null16 = u16(0)
+ }
+ if ch.write_subscriber != voidptr(0) {
+ ch.write_subscriber.sem.post()
+ }
+ C.atomic_store_u16(&ch.write_sub_mtx, u16(0))
+ }
+ mut dest2 := dest
+ for sp := u32(0); sp < spinloops_ || write_in_progress; sp++ {
+ if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_written) },
+ &dest2, voidptr(0))
+ {
+ have_swapped = true
+ break
+ } else if dest2 == voidptr(-1) {
+ ch.readsem.post()
+ return .closed
+ }
+ dest2 = dest
+ }
+ mut got_im_sem := false
+ for sp := u32(0); sp < spinloops_sem_ || write_in_progress; sp++ {
+ got_im_sem = ch.readsem_im.try_wait()
+ if got_im_sem {
+ break
+ }
+ }
+ for {
+ if got_im_sem {
+ got_im_sem = false
+ } else {
+ ch.readsem_im.wait()
+ }
+ if have_swapped
+ || C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_written) }, &dest2, voidptr(0)) {
+ ch.readsem.post()
+ break
+ } else {
+ // this semaphore was not for us - repost in
+ ch.readsem_im.post()
+ if dest2 == voidptr(-1) {
+ ch.readsem.post()
+ return .closed
+ }
+ dest2 = dest
+ }
+ }
+ break
+ }
+ return .success
+}
+
+// Wait `timeout` on any of `channels[i]` until one of them can push (`is_push[i] = true`) or pop (`is_push[i] = false`)
+// object referenced by `objrefs[i]`. `timeout = time.infinite` means wait unlimited time. `timeout <= 0` means return
+// immediately if no transaction can be performed without waiting.
+// return value: the index of the channel on which a transaction has taken place
+// -1 if waiting for a transaction has exceeded timeout
+// -2 if all channels are closed
+
+pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []voidptr, timeout time.Duration) int {
+ assert channels.len == dir.len
+ assert dir.len == objrefs.len
+ mut subscr := []Subscription{len: channels.len}
+ mut sem := unsafe { Semaphore{} }
+ sem.init(0)
+ for i, ch in channels {
+ subscr[i].sem = unsafe { &sem }
+ if dir[i] == .push {
+ mut null16 := u16(0)
+ for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) {
+ null16 = u16(0)
+ }
+ subscr[i].prev = unsafe { &ch.write_subscriber }
+ unsafe {
+ subscr[i].nxt = C.atomic_exchange_ptr(&voidptr(&ch.write_subscriber),
+ &subscr[i])
+ }
+ if voidptr(subscr[i].nxt) != voidptr(0) {
+ subscr[i].nxt.prev = unsafe { &subscr[i].nxt }
+ }
+ C.atomic_store_u16(&ch.write_sub_mtx, u16(0))
+ } else {
+ mut null16 := u16(0)
+ for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) {
+ null16 = u16(0)
+ }
+ subscr[i].prev = unsafe { &ch.read_subscriber }
+ unsafe {
+ subscr[i].nxt = C.atomic_exchange_ptr(&voidptr(&ch.read_subscriber), &subscr[i])
+ }
+ if voidptr(subscr[i].nxt) != voidptr(0) {
+ subscr[i].nxt.prev = unsafe { &subscr[i].nxt }
+ }
+ C.atomic_store_u16(&ch.read_sub_mtx, u16(0))
+ }
+ }
+ stopwatch := if timeout == time.infinite || timeout <= 0 {
+ time.StopWatch{}
+ } else {
+ time.new_stopwatch()
+ }
+ mut event_idx := -1 // negative index means `timed out`
+
+ outer: for {
+ rnd := rand.u32_in_range(0, u32(channels.len))
+ mut num_closed := 0
+ for j, _ in channels {
+ mut i := j + int(rnd)
+ if i >= channels.len {
+ i -= channels.len
+ }
+ if dir[i] == .push {
+ stat := channels[i].try_push_priv(objrefs[i], true)
+ if stat == .success {
+ event_idx = i
+ break outer
+ } else if stat == .closed {
+ num_closed++
+ }
+ } else {
+ stat := channels[i].try_pop_priv(objrefs[i], true)
+ if stat == .success {
+ event_idx = i
+ break outer
+ } else if stat == .closed {
+ num_closed++
+ }
+ }
+ }
+ if num_closed == channels.len {
+ event_idx = -2
+ break outer
+ }
+ if timeout <= 0 {
+ break outer
+ }
+ if timeout != time.infinite {
+ remaining := timeout - stopwatch.elapsed()
+ if !sem.timed_wait(remaining) {
+ break outer
+ }
+ } else {
+ sem.wait()
+ }
+ }
+ // reset subscribers
+ for i, ch in channels {
+ if dir[i] == .push {
+ mut null16 := u16(0)
+ for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) {
+ null16 = u16(0)
+ }
+ unsafe {
+ *subscr[i].prev = subscr[i].nxt
+ }
+ if subscr[i].nxt != 0 {
+ subscr[i].nxt.prev = subscr[i].prev
+ // just in case we have missed a semaphore during restore
+ subscr[i].nxt.sem.post()
+ }
+ C.atomic_store_u16(&ch.write_sub_mtx, u16(0))
+ } else {
+ mut null16 := u16(0)
+ for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) {
+ null16 = u16(0)
+ }
+ unsafe {
+ *subscr[i].prev = subscr[i].nxt
+ }
+ if subscr[i].nxt != 0 {
+ subscr[i].nxt.prev = subscr[i].prev
+ subscr[i].nxt.sem.post()
+ }
+ C.atomic_store_u16(&ch.read_sub_mtx, u16(0))
+ }
+ }
+ sem.destroy()
+ return event_idx
+}
diff --git a/v_windows/v/vlib/sync/pool/README.md b/v_windows/v/vlib/sync/pool/README.md
new file mode 100644
index 0000000..bdea5b3
--- /dev/null
+++ b/v_windows/v/vlib/sync/pool/README.md
@@ -0,0 +1,36 @@
+
+The `sync.pool` module provides a convenient way to run identical tasks over
+an array of items *in parallel*, without worrying about thread synchronization,
+waitgroups, mutexes etc.., you just need to supply a callback function, that
+will be called once per each item in your input array.
+
+After all the work is done in parallel by the worker threads in the pool,
+pool.work_on_items will return. You can then call pool.get_results<Result>()
+to retrieve a list of all the results, that the worker callbacks returned
+for each input item. Example:
+
+```v
+import sync.pool
+
+struct SResult {
+ s string
+}
+
+fn sprocess(pp &pool.PoolProcessor, idx int, wid int) &SResult {
+ item := pp.get_item<string>(idx)
+ println('idx: $idx, wid: $wid, item: ' + item)
+ return &SResult{item.reverse()}
+}
+
+fn main() {
+ mut pp := pool.new_pool_processor(callback: sprocess)
+ pp.work_on_items(['1abc', '2abc', '3abc', '4abc', '5abc', '6abc', '7abc'])
+ // optionally, you can iterate over the results too:
+ for x in pp.get_results<SResult>() {
+ println('result: $x.s')
+ }
+}
+```
+
+See https://github.com/vlang/v/blob/master/vlib/sync/pool/pool_test.v for a
+more detailed usage example.
diff --git a/v_windows/v/vlib/sync/pool/pool.v b/v_windows/v/vlib/sync/pool/pool.v
new file mode 100644
index 0000000..b2c5340
--- /dev/null
+++ b/v_windows/v/vlib/sync/pool/pool.v
@@ -0,0 +1,165 @@
+module pool
+
+import sync
+import runtime
+
+[trusted]
+fn C.atomic_fetch_add_u32(voidptr, u32) u32
+
+pub const (
+ no_result = voidptr(0)
+)
+
+pub struct PoolProcessor {
+ thread_cb voidptr
+mut:
+ njobs int
+ items []voidptr
+ results []voidptr
+ ntask u32 // reading/writing to this should be atomic
+ waitgroup sync.WaitGroup
+ shared_context voidptr
+ thread_contexts []voidptr
+}
+
+pub type ThreadCB = fn (p &PoolProcessor, idx int, task_id int) voidptr
+
+pub struct PoolProcessorConfig {
+ maxjobs int
+ callback ThreadCB
+}
+
+// new_pool_processor returns a new PoolProcessor instance.
+// The parameters of new_pool_processor are:
+// context.maxjobs: when 0 (the default), the PoolProcessor will use a
+// number of threads, that is optimal for your system to process your items.
+// context.callback: this should be a callback function, that each worker
+// thread in the pool will run for each item.
+// The callback function will receive as parameters:
+// 1) the PoolProcessor instance, so it can call
+// p.get_item<int>(idx) to get the actual item at index idx
+// 2) idx - the index of the currently processed item
+// 3) task_id - the index of the worker thread in which the callback
+// function is running.
+pub fn new_pool_processor(context PoolProcessorConfig) &PoolProcessor {
+ if isnil(context.callback) {
+ panic('You need to pass a valid callback to new_pool_processor.')
+ }
+ mut pool := PoolProcessor{
+ items: []
+ results: []
+ shared_context: voidptr(0)
+ thread_contexts: []
+ njobs: context.maxjobs
+ ntask: 0
+ thread_cb: voidptr(context.callback)
+ }
+ pool.waitgroup.init()
+ return &pool
+}
+
+// set_max_jobs gives you the ability to override the number
+// of jobs *after* the PoolProcessor had been created already.
+pub fn (mut pool PoolProcessor) set_max_jobs(njobs int) {
+ pool.njobs = njobs
+}
+
+// work_on_items receives a list of items of type T,
+// then starts a work pool of pool.njobs threads, each running
+// pool.thread_cb in a loop, untill all items in the list,
+// are processed.
+// When pool.njobs is 0, the number of jobs is determined
+// by the number of available cores on the system.
+// work_on_items returns *after* all threads finish.
+// You can optionally call get_results after that.
+pub fn (mut pool PoolProcessor) work_on_items<T>(items []T) {
+ pool.work_on_pointers(unsafe { items.pointers() })
+}
+
+pub fn (mut pool PoolProcessor) work_on_pointers(items []voidptr) {
+ mut njobs := runtime.nr_jobs()
+ if pool.njobs > 0 {
+ njobs = pool.njobs
+ }
+ pool.items = []
+ pool.results = []
+ pool.thread_contexts = []
+ pool.items << items
+ pool.results = []voidptr{len: (pool.items.len)}
+ pool.thread_contexts << []voidptr{len: (pool.items.len)}
+ pool.waitgroup.add(njobs)
+ for i := 0; i < njobs; i++ {
+ if njobs > 1 {
+ go process_in_thread(mut pool, i)
+ } else {
+ // do not run concurrently, just use the same thread:
+ process_in_thread(mut pool, i)
+ }
+ }
+ pool.waitgroup.wait()
+}
+
+// process_in_thread does the actual work of worker thread.
+// It is a workaround for the current inability to pass a
+// method in a callback.
+fn process_in_thread(mut pool PoolProcessor, task_id int) {
+ cb := ThreadCB(pool.thread_cb)
+ ilen := pool.items.len
+ for {
+ idx := int(C.atomic_fetch_add_u32(&pool.ntask, 1))
+ if idx >= ilen {
+ break
+ }
+ pool.results[idx] = cb(pool, idx, task_id)
+ }
+ pool.waitgroup.done()
+}
+
+// get_item - called by the worker callback.
+// Retrieves a type safe instance of the currently processed item
+pub fn (pool &PoolProcessor) get_item<T>(idx int) T {
+ return *(&T(pool.items[idx]))
+}
+
+// get_result - called by the main thread to get a specific result.
+// Retrieves a type safe instance of the produced result.
+pub fn (pool &PoolProcessor) get_result<T>(idx int) T {
+ return *(&T(pool.results[idx]))
+}
+
+// get_results - get a list of type safe results in the main thread.
+pub fn (pool &PoolProcessor) get_results<T>() []T {
+ mut res := []T{}
+ for i in 0 .. pool.results.len {
+ res << *(&T(pool.results[i]))
+ }
+ return res
+}
+
+// set_shared_context - can be called during the setup so that you can
+// provide a context that is shared between all worker threads, like
+// common options/settings.
+pub fn (mut pool PoolProcessor) set_shared_context(context voidptr) {
+ pool.shared_context = context
+}
+
+// get_shared_context - can be called in each worker callback, to get
+// the context set by pool.set_shared_context
+pub fn (pool &PoolProcessor) get_shared_context() voidptr {
+ return pool.shared_context
+}
+
+// set_thread_context - can be called during the setup at the start of
+// each worker callback, so that the worker callback can have some thread
+// local storage area where it can write/read information that is private
+// to the given thread, without worrying that it will get overwritten by
+// another thread
+pub fn (mut pool PoolProcessor) set_thread_context(idx int, context voidptr) {
+ pool.thread_contexts[idx] = context
+}
+
+// get_thread_context - returns a pointer, that was set with
+// pool.set_thread_context . This pointer is private to each thread.
+pub fn (pool &PoolProcessor) get_thread_context(idx int) voidptr {
+ return pool.thread_contexts[idx]
+}
diff --git a/v_windows/v/vlib/sync/pool/pool_test.v b/v_windows/v/vlib/sync/pool/pool_test.v
new file mode 100644
index 0000000..629b524
--- /dev/null
+++ b/v_windows/v/vlib/sync/pool/pool_test.v
@@ -0,0 +1,52 @@
+import time
+import sync.pool
+
+pub struct SResult {
+ s string
+}
+
+pub struct IResult {
+ i int
+}
+
+fn worker_s(p &pool.PoolProcessor, idx int, worker_id int) &SResult {
+ item := p.get_item<string>(idx)
+ println('worker_s worker_id: $worker_id | idx: $idx | item: $item')
+ time.sleep(3 * time.millisecond)
+ return &SResult{'$item $item'}
+}
+
+fn worker_i(p &pool.PoolProcessor, idx int, worker_id int) &IResult {
+ item := p.get_item<int>(idx)
+ println('worker_i worker_id: $worker_id | idx: $idx | item: $item')
+ time.sleep(5 * time.millisecond)
+ return &IResult{item * 1000}
+}
+
+fn test_work_on_strings() {
+ mut pool_s := pool.new_pool_processor(
+ callback: worker_s
+ maxjobs: 8
+ )
+
+ pool_s.work_on_items(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'])
+ for x in pool_s.get_results<SResult>() {
+ println(x.s)
+ assert x.s.len > 1
+ }
+}
+
+fn test_work_on_ints() {
+ // NB: since maxjobs is left empty here,
+ // the pool processor will use njobs = runtime.nr_jobs so that
+ // it will work optimally without overloading the system
+ mut pool_i := pool.new_pool_processor(
+ callback: worker_i
+ )
+
+ pool_i.work_on_items([1, 2, 3, 4, 5, 6, 7, 8])
+ for x in pool_i.get_results<IResult>() {
+ println(x.i)
+ assert x.i > 100
+ }
+}
diff --git a/v_windows/v/vlib/sync/select_close_test.v b/v_windows/v/vlib/sync/select_close_test.v
new file mode 100644
index 0000000..3a40e2d
--- /dev/null
+++ b/v_windows/v/vlib/sync/select_close_test.v
@@ -0,0 +1,92 @@
+module sync
+
+import time
+
+fn do_rec_i64(mut ch Channel) {
+ mut sum := i64(0)
+ for i in 0 .. 300 {
+ if i == 200 {
+ ch.close()
+ }
+ mut a := i64(0)
+ if ch.pop(&a) {
+ sum += a
+ }
+ }
+ assert sum == 200 * (200 - 1) / 2
+}
+
+fn do_send_int(mut ch Channel) {
+ for i in 0 .. 300 {
+ ch.push(&i)
+ }
+ ch.close()
+}
+
+fn do_send_byte(mut ch Channel) {
+ for i in 0 .. 300 {
+ ii := byte(i)
+ ch.push(&ii)
+ }
+ ch.close()
+}
+
+fn do_send_i64(mut ch Channel) {
+ for i in 0 .. 300 {
+ ii := i64(i)
+ ch.push(&ii)
+ }
+ ch.close()
+}
+
+fn test_select() {
+ mut chi := new_channel<int>(0)
+ mut chl := new_channel<i64>(1)
+ mut chb := new_channel<byte>(10)
+ mut recch := new_channel<i64>(0)
+ go do_rec_i64(mut recch)
+ go do_send_int(mut chi)
+ go do_send_byte(mut chb)
+ go do_send_i64(mut chl)
+ mut channels := [chi, recch, chl, chb]
+ directions := [Direction.pop, .push, .pop, .pop]
+ mut sum := i64(0)
+ mut rl := i64(0)
+ mut ri := int(0)
+ mut rb := byte(0)
+ mut sl := i64(0)
+ mut objs := [voidptr(&ri), &sl, &rl, &rb]
+ for j in 0 .. 1101 {
+ idx := channel_select(mut channels, directions, mut objs, time.infinite)
+ match idx {
+ 0 {
+ sum += ri
+ }
+ 1 {
+ sl++
+ }
+ 2 {
+ sum += rl
+ }
+ 3 {
+ sum += rb
+ }
+ -2 {
+ // channel was closed - last item
+ assert j == 1100
+ }
+ else {
+ println('got $idx (timeout)')
+ assert false
+ }
+ }
+ if j == 1100 {
+ // check also in other direction
+ assert idx == -2
+ }
+ }
+ // Use Gauß' formula for the first 2 contributions
+ // the 3rd contribution is `byte` and must be seen modulo 256
+ expected_sum := 2 * (300 * (300 - 1) / 2) + 256 * (256 - 1) / 2 + 44 * (44 - 1) / 2
+ assert sum == expected_sum
+}
diff --git a/v_windows/v/vlib/sync/struct_chan_init_test.v b/v_windows/v/vlib/sync/struct_chan_init_test.v
new file mode 100644
index 0000000..a51ea4b
--- /dev/null
+++ b/v_windows/v/vlib/sync/struct_chan_init_test.v
@@ -0,0 +1,14 @@
+struct Abc {
+ ch chan int
+}
+
+fn f(st Abc) {
+ st.ch <- 47
+}
+
+fn test_chan_init() {
+ st := Abc{}
+ go f(st)
+ i := <-st.ch
+ assert i == 47
+}
diff --git a/v_windows/v/vlib/sync/sync_default.c.v b/v_windows/v/vlib/sync/sync_default.c.v
new file mode 100644
index 0000000..76f0b44
--- /dev/null
+++ b/v_windows/v/vlib/sync/sync_default.c.v
@@ -0,0 +1,193 @@
+// Copyright (c) 2019-2021 Alexander Medvednikov. All rights reserved.
+// Use of this source code is governed by an MIT license
+// that can be found in the LICENSE file.
+module sync
+
+import time
+
+// There's no additional linking (-lpthread) needed for Android.
+// See https://stackoverflow.com/a/31277163/1904615
+$if !android {
+ #flag -lpthread
+}
+
+#include <semaphore.h>
+
+[trusted]
+fn C.pthread_mutex_init(voidptr, voidptr) int
+fn C.pthread_mutex_lock(voidptr) int
+fn C.pthread_mutex_unlock(voidptr) int
+fn C.pthread_mutex_destroy(voidptr) int
+fn C.pthread_rwlockattr_init(voidptr) int
+fn C.pthread_rwlockattr_setkind_np(voidptr, int) int
+fn C.pthread_rwlockattr_setpshared(voidptr, int) int
+fn C.pthread_rwlock_init(voidptr, voidptr) int
+fn C.pthread_rwlock_rdlock(voidptr) int
+fn C.pthread_rwlock_wrlock(voidptr) int
+fn C.pthread_rwlock_unlock(voidptr) int
+fn C.sem_init(voidptr, int, u32) int
+fn C.sem_post(voidptr) int
+fn C.sem_wait(voidptr) int
+fn C.sem_trywait(voidptr) int
+fn C.sem_timedwait(voidptr, voidptr) int
+fn C.sem_destroy(voidptr) int
+
+// [init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function.
+[heap]
+pub struct Mutex {
+ mutex C.pthread_mutex_t
+}
+
+[heap]
+pub struct RwMutex {
+ mutex C.pthread_rwlock_t
+}
+
+struct RwMutexAttr {
+ attr C.pthread_rwlockattr_t
+}
+
+[heap]
+struct Semaphore {
+ sem C.sem_t
+}
+
+pub fn new_mutex() &Mutex {
+ mut m := &Mutex{}
+ m.init()
+ return m
+}
+
+pub fn (mut m Mutex) init() {
+ C.pthread_mutex_init(&m.mutex, C.NULL)
+}
+
+pub fn new_rwmutex() &RwMutex {
+ mut m := &RwMutex{}
+ m.init()
+ return m
+}
+
+pub fn (mut m RwMutex) init() {
+ a := RwMutexAttr{}
+ C.pthread_rwlockattr_init(&a.attr)
+ // Give writer priority over readers
+ C.pthread_rwlockattr_setkind_np(&a.attr, C.PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP)
+ C.pthread_rwlockattr_setpshared(&a.attr, C.PTHREAD_PROCESS_PRIVATE)
+ C.pthread_rwlock_init(&m.mutex, &a.attr)
+}
+
+// @lock(), for *manual* mutex handling, since `lock` is a keyword
+pub fn (mut m Mutex) @lock() {
+ C.pthread_mutex_lock(&m.mutex)
+}
+
+pub fn (mut m Mutex) unlock() {
+ C.pthread_mutex_unlock(&m.mutex)
+}
+
+// RwMutex has separate read- and write locks
+pub fn (mut m RwMutex) @rlock() {
+ C.pthread_rwlock_rdlock(&m.mutex)
+}
+
+pub fn (mut m RwMutex) @lock() {
+ C.pthread_rwlock_wrlock(&m.mutex)
+}
+
+// Windows SRWLocks have different function to unlock
+// So provide two functions here, too, to have a common interface
+pub fn (mut m RwMutex) runlock() {
+ C.pthread_rwlock_unlock(&m.mutex)
+}
+
+pub fn (mut m RwMutex) unlock() {
+ C.pthread_rwlock_unlock(&m.mutex)
+}
+
+[inline]
+pub fn new_semaphore() &Semaphore {
+ return new_semaphore_init(0)
+}
+
+pub fn new_semaphore_init(n u32) &Semaphore {
+ mut sem := &Semaphore{}
+ sem.init(n)
+ return sem
+}
+
+pub fn (mut sem Semaphore) init(n u32) {
+ C.sem_init(&sem.sem, 0, n)
+}
+
+pub fn (mut sem Semaphore) post() {
+ C.sem_post(&sem.sem)
+}
+
+pub fn (mut sem Semaphore) wait() {
+ for {
+ if C.sem_wait(&sem.sem) == 0 {
+ return
+ }
+ e := C.errno
+ match e {
+ C.EINTR {
+ continue // interrupted by signal
+ }
+ else {
+ panic(unsafe { tos_clone(&byte(C.strerror(C.errno))) })
+ }
+ }
+ }
+}
+
+// `try_wait()` should return as fast as possible so error handling is only
+// done when debugging
+pub fn (mut sem Semaphore) try_wait() bool {
+ $if !debug {
+ return C.sem_trywait(&sem.sem) == 0
+ } $else {
+ if C.sem_trywait(&sem.sem) != 0 {
+ e := C.errno
+ match e {
+ C.EAGAIN {
+ return false
+ }
+ else {
+ panic(unsafe { tos_clone(&byte(C.strerror(C.errno))) })
+ }
+ }
+ }
+ return true
+ }
+}
+
+pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool {
+ t_spec := timeout.timespec()
+ for {
+ if C.sem_timedwait(&sem.sem, &t_spec) == 0 {
+ return true
+ }
+ e := C.errno
+ match e {
+ C.EINTR {
+ continue // interrupted by signal
+ }
+ C.ETIMEDOUT {
+ break
+ }
+ else {
+ panic(unsafe { tos_clone(&byte(C.strerror(e))) })
+ }
+ }
+ }
+ return false
+}
+
+pub fn (sem Semaphore) destroy() {
+ res := C.sem_destroy(&sem.sem)
+ if res == 0 {
+ return
+ }
+ panic(unsafe { tos_clone(&byte(C.strerror(res))) })
+}
diff --git a/v_windows/v/vlib/sync/sync_macos.c.v b/v_windows/v/vlib/sync/sync_macos.c.v
new file mode 100644
index 0000000..3f83198
--- /dev/null
+++ b/v_windows/v/vlib/sync/sync_macos.c.v
@@ -0,0 +1,232 @@
+// Copyright (c) 2019-2021 Alexander Medvednikov. All rights reserved.
+// Use of this source code is governed by an MIT license
+// that can be found in the LICENSE file.
+module sync
+
+import time
+
+#flag -lpthread
+#include <semaphore.h>
+#include <sys/errno.h>
+
+[trusted]
+fn C.pthread_mutex_init(voidptr, voidptr) int
+fn C.pthread_mutex_lock(voidptr) int
+fn C.pthread_mutex_unlock(voidptr) int
+fn C.pthread_mutex_destroy(voidptr) int
+fn C.pthread_rwlockattr_init(voidptr) int
+fn C.pthread_rwlockattr_setkind_np(voidptr, int) int
+fn C.pthread_rwlockattr_setpshared(voidptr, int) int
+fn C.pthread_rwlock_init(voidptr, voidptr) int
+fn C.pthread_rwlock_rdlock(voidptr) int
+fn C.pthread_rwlock_wrlock(voidptr) int
+fn C.pthread_rwlock_unlock(voidptr) int
+fn C.pthread_condattr_init(voidptr) int
+fn C.pthread_condattr_setpshared(voidptr, int) int
+fn C.pthread_condattr_destroy(voidptr) int
+fn C.pthread_cond_init(voidptr, voidptr) int
+fn C.pthread_cond_signal(voidptr) int
+fn C.pthread_cond_wait(voidptr, voidptr) int
+fn C.pthread_cond_timedwait(voidptr, voidptr, voidptr) int
+fn C.pthread_cond_destroy(voidptr) int
+
+// [init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function.
+[heap]
+pub struct Mutex {
+ mutex C.pthread_mutex_t
+}
+
+[heap]
+pub struct RwMutex {
+ mutex C.pthread_rwlock_t
+}
+
+struct RwMutexAttr {
+ attr C.pthread_rwlockattr_t
+}
+
+struct CondAttr {
+ attr C.pthread_condattr_t
+}
+
+/*
+MacOSX has no unnamed semaphores and no `timed_wait()` at all
+ so we emulate the behaviour with other devices
+*/
+[heap]
+struct Semaphore {
+ mtx C.pthread_mutex_t
+ cond C.pthread_cond_t
+mut:
+ count u32
+}
+
+pub fn new_mutex() &Mutex {
+ mut m := &Mutex{}
+ m.init()
+ return m
+}
+
+pub fn (mut m Mutex) init() {
+ C.pthread_mutex_init(&m.mutex, C.NULL)
+}
+
+pub fn new_rwmutex() &RwMutex {
+ mut m := &RwMutex{}
+ m.init()
+ return m
+}
+
+pub fn (mut m RwMutex) init() {
+ a := RwMutexAttr{}
+ C.pthread_rwlockattr_init(&a.attr)
+ // Give writer priority over readers
+ C.pthread_rwlockattr_setkind_np(&a.attr, C.PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP)
+ C.pthread_rwlockattr_setpshared(&a.attr, C.PTHREAD_PROCESS_PRIVATE)
+ C.pthread_rwlock_init(&m.mutex, &a.attr)
+}
+
+// @lock(), for *manual* mutex handling, since `lock` is a keyword
+pub fn (mut m Mutex) @lock() {
+ C.pthread_mutex_lock(&m.mutex)
+}
+
+pub fn (mut m Mutex) unlock() {
+ C.pthread_mutex_unlock(&m.mutex)
+}
+
+// RwMutex has separate read- and write locks
+pub fn (mut m RwMutex) @rlock() {
+ C.pthread_rwlock_rdlock(&m.mutex)
+}
+
+pub fn (mut m RwMutex) @lock() {
+ C.pthread_rwlock_wrlock(&m.mutex)
+}
+
+// Windows SRWLocks have different function to unlock
+// So provide two functions here, too, to have a common interface
+pub fn (mut m RwMutex) runlock() {
+ C.pthread_rwlock_unlock(&m.mutex)
+}
+
+pub fn (mut m RwMutex) unlock() {
+ C.pthread_rwlock_unlock(&m.mutex)
+}
+
+[inline]
+pub fn new_semaphore() &Semaphore {
+ return new_semaphore_init(0)
+}
+
+pub fn new_semaphore_init(n u32) &Semaphore {
+ mut sem := &Semaphore{}
+ sem.init(n)
+ return sem
+}
+
+pub fn (mut sem Semaphore) init(n u32) {
+ C.atomic_store_u32(&sem.count, n)
+ C.pthread_mutex_init(&sem.mtx, C.NULL)
+ attr := CondAttr{}
+ C.pthread_condattr_init(&attr.attr)
+ C.pthread_condattr_setpshared(&attr.attr, C.PTHREAD_PROCESS_PRIVATE)
+ C.pthread_cond_init(&sem.cond, &attr.attr)
+ C.pthread_condattr_destroy(&attr.attr)
+}
+
+pub fn (mut sem Semaphore) post() {
+ mut c := C.atomic_load_u32(&sem.count)
+ for c > 1 {
+ if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c + 1) {
+ return
+ }
+ }
+ C.pthread_mutex_lock(&sem.mtx)
+ c = C.atomic_fetch_add_u32(&sem.count, 1)
+ if c == 0 {
+ C.pthread_cond_signal(&sem.cond)
+ }
+ C.pthread_mutex_unlock(&sem.mtx)
+}
+
+pub fn (mut sem Semaphore) wait() {
+ mut c := C.atomic_load_u32(&sem.count)
+ for c > 0 {
+ if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) {
+ return
+ }
+ }
+ C.pthread_mutex_lock(&sem.mtx)
+ c = C.atomic_load_u32(&sem.count)
+
+ outer: for {
+ if c == 0 {
+ C.pthread_cond_wait(&sem.cond, &sem.mtx)
+ c = C.atomic_load_u32(&sem.count)
+ }
+ for c > 0 {
+ if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) {
+ if c > 1 {
+ C.pthread_cond_signal(&sem.cond)
+ }
+ break outer
+ }
+ }
+ }
+ C.pthread_mutex_unlock(&sem.mtx)
+}
+
+pub fn (mut sem Semaphore) try_wait() bool {
+ mut c := C.atomic_load_u32(&sem.count)
+ for c > 0 {
+ if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) {
+ return true
+ }
+ }
+ return false
+}
+
+pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool {
+ mut c := C.atomic_load_u32(&sem.count)
+ for c > 0 {
+ if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) {
+ return true
+ }
+ }
+ C.pthread_mutex_lock(&sem.mtx)
+ t_spec := timeout.timespec()
+ mut res := 0
+ c = C.atomic_load_u32(&sem.count)
+
+ outer: for {
+ if c == 0 {
+ res = C.pthread_cond_timedwait(&sem.cond, &sem.mtx, &t_spec)
+ if res == C.ETIMEDOUT {
+ break outer
+ }
+ c = C.atomic_load_u32(&sem.count)
+ }
+ for c > 0 {
+ if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) {
+ if c > 1 {
+ C.pthread_cond_signal(&sem.cond)
+ }
+ break outer
+ }
+ }
+ }
+ C.pthread_mutex_unlock(&sem.mtx)
+ return res == 0
+}
+
+pub fn (mut sem Semaphore) destroy() {
+ mut res := C.pthread_cond_destroy(&sem.cond)
+ if res == 0 {
+ res = C.pthread_mutex_destroy(&sem.mtx)
+ if res == 0 {
+ return
+ }
+ }
+ panic(unsafe { tos_clone(&byte(C.strerror(res))) })
+}
diff --git a/v_windows/v/vlib/sync/sync_windows.c.v b/v_windows/v/vlib/sync/sync_windows.c.v
new file mode 100644
index 0000000..d0942b7
--- /dev/null
+++ b/v_windows/v/vlib/sync/sync_windows.c.v
@@ -0,0 +1,212 @@
+// Copyright (c) 2019-2021 Alexander Medvednikov. All rights reserved.
+// Use of this source code is governed by an MIT license
+// that can be found in the LICENSE file.
+module sync
+
+import time
+
+#include <synchapi.h>
+#include <time.h>
+
+fn C.GetSystemTimeAsFileTime(lpSystemTimeAsFileTime &C._FILETIME)
+fn C.InitializeConditionVariable(voidptr)
+fn C.WakeConditionVariable(voidptr)
+fn C.SleepConditionVariableSRW(voidptr, voidptr, u32, u32) int
+
+// TODO: The suggestion of using CriticalSection instead of mutex
+// was discussed. Needs consideration.
+
+// Mutex HANDLE
+type MHANDLE = voidptr
+
+// Semaphore HANDLE
+type SHANDLE = voidptr
+
+//[init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function.
+
+// `SRWLOCK` is much more performant that `Mutex` on Windows, so use that in both cases since we don't want to share with other processes
+[heap]
+pub struct Mutex {
+mut:
+ mx C.SRWLOCK // mutex handle
+}
+
+[heap]
+pub struct RwMutex {
+mut:
+ mx C.SRWLOCK // mutex handle
+}
+
+[heap]
+struct Semaphore {
+ mtx C.SRWLOCK
+ cond C.CONDITION_VARIABLE
+mut:
+ count u32
+}
+
+pub fn new_mutex() &Mutex {
+ mut m := &Mutex{}
+ m.init()
+ return m
+}
+
+pub fn new_rwmutex() &RwMutex {
+ mut m := &RwMutex{}
+ m.init()
+ return m
+}
+
+pub fn (mut m Mutex) init() {
+ C.InitializeSRWLock(&m.mx)
+}
+
+pub fn (mut m RwMutex) init() {
+ C.InitializeSRWLock(&m.mx)
+}
+
+pub fn (mut m Mutex) @lock() {
+ C.AcquireSRWLockExclusive(&m.mx)
+}
+
+pub fn (mut m Mutex) unlock() {
+ C.ReleaseSRWLockExclusive(&m.mx)
+}
+
+// RwMutex has separate read- and write locks
+pub fn (mut m RwMutex) @rlock() {
+ C.AcquireSRWLockShared(&m.mx)
+}
+
+pub fn (mut m RwMutex) @lock() {
+ C.AcquireSRWLockExclusive(&m.mx)
+}
+
+// Windows SRWLocks have different function to unlock
+// So provide two functions here, too, to have a common interface
+pub fn (mut m RwMutex) runlock() {
+ C.ReleaseSRWLockShared(&m.mx)
+}
+
+pub fn (mut m RwMutex) unlock() {
+ C.ReleaseSRWLockExclusive(&m.mx)
+}
+
+pub fn (mut m Mutex) destroy() {
+ // nothing to do
+}
+
+[inline]
+pub fn new_semaphore() &Semaphore {
+ return new_semaphore_init(0)
+}
+
+pub fn new_semaphore_init(n u32) &Semaphore {
+ mut sem := &Semaphore{}
+ sem.init(n)
+ return sem
+}
+
+pub fn (mut sem Semaphore) init(n u32) {
+ C.atomic_store_u32(&sem.count, n)
+ C.InitializeSRWLock(&sem.mtx)
+ C.InitializeConditionVariable(&sem.cond)
+}
+
+pub fn (mut sem Semaphore) post() {
+ mut c := C.atomic_load_u32(&sem.count)
+ for c > 1 {
+ if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c + 1) {
+ return
+ }
+ }
+ C.AcquireSRWLockExclusive(&sem.mtx)
+ c = C.atomic_fetch_add_u32(&sem.count, 1)
+ if c == 0 {
+ C.WakeConditionVariable(&sem.cond)
+ }
+ C.ReleaseSRWLockExclusive(&sem.mtx)
+}
+
+pub fn (mut sem Semaphore) wait() {
+ mut c := C.atomic_load_u32(&sem.count)
+ for c > 0 {
+ if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) {
+ return
+ }
+ }
+ C.AcquireSRWLockExclusive(&sem.mtx)
+ c = C.atomic_load_u32(&sem.count)
+
+ outer: for {
+ if c == 0 {
+ C.SleepConditionVariableSRW(&sem.cond, &sem.mtx, C.INFINITE, 0)
+ c = C.atomic_load_u32(&sem.count)
+ }
+ for c > 0 {
+ if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) {
+ if c > 1 {
+ C.WakeConditionVariable(&sem.cond)
+ }
+ break outer
+ }
+ }
+ }
+ C.ReleaseSRWLockExclusive(&sem.mtx)
+}
+
+pub fn (mut sem Semaphore) try_wait() bool {
+ mut c := C.atomic_load_u32(&sem.count)
+ for c > 0 {
+ if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) {
+ return true
+ }
+ }
+ return false
+}
+
+pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool {
+ mut c := C.atomic_load_u32(&sem.count)
+ for c > 0 {
+ if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) {
+ return true
+ }
+ }
+ mut ft_start := C._FILETIME{}
+ C.GetSystemTimeAsFileTime(&ft_start)
+ time_end := ((u64(ft_start.dwHighDateTime) << 32) | ft_start.dwLowDateTime) +
+ u64(timeout / (100 * time.nanosecond))
+ mut t_ms := timeout.sys_milliseconds()
+ C.AcquireSRWLockExclusive(&sem.mtx)
+ mut res := 0
+ c = C.atomic_load_u32(&sem.count)
+
+ outer: for {
+ if c == 0 {
+ res = C.SleepConditionVariableSRW(&sem.cond, &sem.mtx, t_ms, 0)
+ if res == 0 {
+ break outer
+ }
+ c = C.atomic_load_u32(&sem.count)
+ }
+ for c > 0 {
+ if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) {
+ if c > 1 {
+ C.WakeConditionVariable(&sem.cond)
+ }
+ break outer
+ }
+ }
+ C.GetSystemTimeAsFileTime(&ft_start)
+ time_now := ((u64(ft_start.dwHighDateTime) << 32) | ft_start.dwLowDateTime) // in 100ns
+ if time_now > time_end {
+ break outer // timeout exceeded
+ }
+ t_ms = u32((time_end - time_now) / 10000)
+ }
+ C.ReleaseSRWLockExclusive(&sem.mtx)
+ return res != 0
+}
+
+pub fn (s Semaphore) destroy() {
+}
diff --git a/v_windows/v/vlib/sync/threads/threads.c.v b/v_windows/v/vlib/sync/threads/threads.c.v
new file mode 100644
index 0000000..02506c2
--- /dev/null
+++ b/v_windows/v/vlib/sync/threads/threads.c.v
@@ -0,0 +1,13 @@
+module threads
+
+// This module adds the necessary compiler flags for using threads.
+// It is automatically imported by code that does `go func()` .
+// See vlib/v/parser/pratt.v, search for ast.GoExpr .
+// The goal is that programs, that do not use threads at all will not need
+// to link to -lpthread etc.
+// NB: on some platforms like Android, linking -lpthread is not needed too.
+// See https://stackoverflow.com/a/31277163/1904615
+
+$if !windows && !android {
+ #flag -lpthread
+}
diff --git a/v_windows/v/vlib/sync/threads/threads.v b/v_windows/v/vlib/sync/threads/threads.v
new file mode 100644
index 0000000..f20fc0e
--- /dev/null
+++ b/v_windows/v/vlib/sync/threads/threads.v
@@ -0,0 +1,4 @@
+module threads
+
+// This file is just a placeholder.
+// The actual implementation is backend/platform specific, so see threads.c.v, threads.js.v etc.
diff --git a/v_windows/v/vlib/sync/waitgroup.v b/v_windows/v/vlib/sync/waitgroup.v
new file mode 100644
index 0000000..3e9ac39
--- /dev/null
+++ b/v_windows/v/vlib/sync/waitgroup.v
@@ -0,0 +1,84 @@
+// Copyright (c) 2019 Alexander Medvednikov. All rights reserved.
+// Use of this source code is governed by an MIT license
+// that can be found in the LICENSE file.
+module sync
+
+[trusted]
+fn C.atomic_fetch_add_u32(voidptr, u32) u32
+
+[trusted]
+fn C.atomic_load_u32(voidptr) u32
+
+[trusted]
+fn C.atomic_compare_exchange_weak_u32(voidptr, voidptr, u32) bool
+
+// WaitGroup
+// Do not copy an instance of WaitGroup, use a ref instead.
+//
+// usage: in main thread:
+// `wg := sync.new_waitgroup()
+// `wg.add(nr_jobs)` before starting jobs with `go ...`
+// `wg.wait()` to wait for all jobs to have finished
+//
+// in each parallel job:
+// `wg.done()` when finished
+//
+// [init_with=new_waitgroup] // TODO: implement support for init_with struct attribute, and disallow WaitGroup{} from outside the sync.new_waitgroup() function.
+[heap]
+struct WaitGroup {
+mut:
+ task_count u32 // current task count - reading/writing should be atomic
+ wait_count u32 // current wait count - reading/writing should be atomic
+ sem Semaphore // This blocks wait() until tast_countreleased by add()
+}
+
+pub fn new_waitgroup() &WaitGroup {
+ mut wg := WaitGroup{}
+ wg.init()
+ return &wg
+}
+
+pub fn (mut wg WaitGroup) init() {
+ wg.sem.init(0)
+}
+
+// add increments (+ve delta) or decrements (-ve delta) task count by delta
+// and unblocks any wait() calls if task count becomes zero.
+// add panics if task count drops below zero.
+pub fn (mut wg WaitGroup) add(delta int) {
+ old_nrjobs := int(C.atomic_fetch_add_u32(&wg.task_count, u32(delta)))
+ new_nrjobs := old_nrjobs + delta
+ mut num_waiters := C.atomic_load_u32(&wg.wait_count)
+ if new_nrjobs < 0 {
+ panic('Negative number of jobs in waitgroup')
+ }
+
+ if new_nrjobs == 0 && num_waiters > 0 {
+ // clear waiters
+ for !C.atomic_compare_exchange_weak_u32(&wg.wait_count, &num_waiters, 0) {
+ if num_waiters == 0 {
+ return
+ }
+ }
+ for (num_waiters > 0) {
+ wg.sem.post()
+ num_waiters--
+ }
+ }
+}
+
+// done is a convenience fn for add(-1)
+pub fn (mut wg WaitGroup) done() {
+ wg.add(-1)
+}
+
+// wait blocks until all tasks are done (task count becomes zero)
+pub fn (mut wg WaitGroup) wait() {
+ nrjobs := int(C.atomic_load_u32(&wg.task_count))
+ if nrjobs == 0 {
+ // no need to wait
+ return
+ }
+ C.atomic_fetch_add_u32(&wg.wait_count, 1)
+ wg.sem.wait() // blocks until task_count becomes 0
+}
diff --git a/v_windows/v/vlib/sync/waitgroup_test.v b/v_windows/v/vlib/sync/waitgroup_test.v
new file mode 100644
index 0000000..493665f
--- /dev/null
+++ b/v_windows/v/vlib/sync/waitgroup_test.v
@@ -0,0 +1,41 @@
+module sync
+
+import time
+
+fn test_waitgroup_reuse() {
+ mut wg := new_waitgroup()
+
+ wg.add(1)
+ wg.done()
+
+ wg.add(1)
+ mut executed := false
+ go fn (mut wg WaitGroup, executed voidptr) {
+ defer {
+ wg.done()
+ }
+ unsafe {
+ *(&bool(executed)) = true
+ }
+ time.sleep(100 * time.millisecond)
+ assert wg.wait_count == 1
+ }(mut wg, voidptr(&executed))
+
+ wg.wait()
+ assert executed
+ assert wg.wait_count == 0
+}
+
+fn test_waitgroup_no_use() {
+ mut done := false
+ go fn (done voidptr) {
+ time.sleep(1 * time.second)
+ if *(&bool(done)) == false {
+ panic('test_waitgroup_no_use did not complete in time')
+ }
+ }(voidptr(&done))
+
+ mut wg := new_waitgroup()
+ wg.wait()
+ done = true
+}