Diffstat (limited to 'v_windows/v/vlib/sync')
39 files changed, 3360 insertions, 0 deletions
diff --git a/v_windows/v/vlib/sync/array_rlock_test.v b/v_windows/v/vlib/sync/array_rlock_test.v new file mode 100644 index 0000000..ad4f778 --- /dev/null +++ b/v_windows/v/vlib/sync/array_rlock_test.v @@ -0,0 +1,38 @@ +fn test_shared_modification() { + shared foo := &[2, 0, 5] + lock foo { + unsafe { + foo[1] = 3 + foo[0] *= 7 + foo[1]-- + foo[2] -= 2 + } + } + rlock foo { + unsafe { + assert foo[0] == 14 + assert foo[1] == 2 + assert foo[2] == 3 + } + } +} + +[direct_array_access] +fn test_shared_direct_modification() { + shared foo := &[2, 0, 5] + lock foo { + unsafe { + foo[1] = 3 + foo[0] *= 7 + foo[1]-- + foo[2] -= 2 + } + } + rlock foo { + unsafe { + assert foo[0] == 14 + assert foo[1] == 2 + assert foo[2] == 3 + } + } +}
diff --git a/v_windows/v/vlib/sync/atomic2/atomic.v b/v_windows/v/vlib/sync/atomic2/atomic.v new file mode 100644 index 0000000..2ff64f2 --- /dev/null +++ b/v_windows/v/vlib/sync/atomic2/atomic.v @@ -0,0 +1,88 @@ +module atomic2 + +/* +Implements the atomic operations. For now TCC does not support +the atomic versions on nix, so it uses locks to simulate the same behavior. +On windows tcc can simulate with other atomic operations. + +The @VEXEROOT/thirdparty/stdatomic directory contains compatibility header files +for stdatomic that support nix, windows and c++. + +This implementation should be regarded as alpha stage and needs +further testing. +*/ + +#flag windows -I @VEXEROOT/thirdparty/stdatomic/win +#flag linux -I @VEXEROOT/thirdparty/stdatomic/nix +#flag darwin -I @VEXEROOT/thirdparty/stdatomic/nix +#flag freebsd -I @VEXEROOT/thirdparty/stdatomic/nix +#flag solaris -I @VEXEROOT/thirdparty/stdatomic/nix + +$if linux { + $if tinyc { + $if amd64 { + // most Linux distributions have /usr/lib/libatomic.so, but Ubuntu uses gcc version specific dir + #flag -L/usr/lib/gcc/x86_64-linux-gnu/6 + #flag -L/usr/lib/gcc/x86_64-linux-gnu/7 + #flag -L/usr/lib/gcc/x86_64-linux-gnu/8 + #flag -L/usr/lib/gcc/x86_64-linux-gnu/9 + #flag -L/usr/lib/gcc/x86_64-linux-gnu/10 + #flag -L/usr/lib/gcc/x86_64-linux-gnu/11 + #flag -L/usr/lib/gcc/x86_64-linux-gnu/12 + } $else $if arm64 { + #flag -L/usr/lib/gcc/aarch64-linux-gnu/6 + #flag -L/usr/lib/gcc/aarch64-linux-gnu/7 + #flag -L/usr/lib/gcc/aarch64-linux-gnu/8 + #flag -L/usr/lib/gcc/aarch64-linux-gnu/9 + #flag -L/usr/lib/gcc/aarch64-linux-gnu/10 + #flag -L/usr/lib/gcc/aarch64-linux-gnu/11 + #flag -L/usr/lib/gcc/aarch64-linux-gnu/12 + } + #flag -latomic + } +} +#include <atomic.h> +// add_u64 adds the provided delta as an atomic operation +pub fn add_u64(ptr &u64, delta int) bool { + res := C.atomic_fetch_add_u64(voidptr(ptr), delta) + return res == 0 +} + +// sub_u64 subtracts the provided delta as an atomic operation +pub fn sub_u64(ptr &u64, delta int) bool { + res := C.atomic_fetch_sub_u64(voidptr(ptr), delta) + return res == 0 +} + +// add_i64 adds the provided delta as an atomic operation +pub fn add_i64(ptr &i64, delta int) bool { + res := C.atomic_fetch_add_u64(voidptr(ptr), delta) + return res == 0 +} + +// sub_i64 subtracts the provided delta as an atomic operation +pub fn sub_i64(ptr &i64, delta int) bool { + res := C.atomic_fetch_sub_u64(voidptr(ptr), delta) + return res == 0 +} + +// atomic store/load operations have to be used when there might be another concurrent access +// atomically set a value +pub fn store_u64(ptr &u64, val u64) { + C.atomic_store_u64(voidptr(ptr), val) +} + +// atomically get a value +pub fn load_u64(ptr &u64) u64 { + return C.atomic_load_u64(voidptr(ptr)) +} + +// atomically set a value +pub fn store_i64(ptr &i64, val i64) { +
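// note: the module reuses the u64 intrinsics for i64 values; the bit pattern is stored unchanged and load_i64 casts it back, so signed values round-trip +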
C.atomic_store_u64(voidptr(ptr), val) +} + +// atomically get a value +pub fn load_i64(ptr &i64) i64 { + return i64(C.atomic_load_u64(voidptr(ptr))) +}
diff --git a/v_windows/v/vlib/sync/atomic2/atomic_test.v b/v_windows/v/vlib/sync/atomic2/atomic_test.v new file mode 100644 index 0000000..7a5ffd8 --- /dev/null +++ b/v_windows/v/vlib/sync/atomic2/atomic_test.v @@ -0,0 +1,105 @@ +import sync.atomic2 +import sync + +const ( + iterations_per_cycle = 100_000 +) + +struct Counter { +mut: + counter u64 +} + +// without proper synchronization this would fail +fn test_count_10_times_1_cycle_should_result_10_cycles_with_sync() { + desired_iterations := 10 * iterations_per_cycle + mut wg := sync.new_waitgroup() + mut counter := &Counter{} + wg.add(10) + for i := 0; i < 10; i++ { + go count_one_cycle(mut counter, mut wg) + } + wg.wait() + assert counter.counter == desired_iterations + eprintln(' with synchronization the counter is: ${counter.counter:10} , expectedly == ${desired_iterations:10}') +} + +// This test is just to make sure that we have an anti-test to prove it works +fn test_count_10_times_1_cycle_should_not_be_10_cycles_without_sync() { + desired_iterations := 10 * iterations_per_cycle + mut wg := sync.new_waitgroup() + mut counter := &Counter{} + wg.add(10) + for i := 0; i < 10; i++ { + go count_one_cycle_without_sync(mut counter, mut wg) + } + wg.wait() + // NB: we do not assert here, just print, because sometimes by chance counter.counter may be == desired_iterations + eprintln('without synchronization the counter is: ${counter.counter:10} , expectedly != ${desired_iterations:10}') +} + +fn test_count_plus_one_u64() { + mut c := u64(0) + atomic2.add_u64(&c, 1) + assert atomic2.load_u64(&c) == 1 +} + +fn test_count_plus_one_i64() { + mut c := i64(0) + atomic2.add_i64(&c, 1) + assert atomic2.load_i64(&c) == 1 +} + +fn test_count_plus_greater_than_one_u64() { + mut c := u64(0) + atomic2.add_u64(&c, 10) + assert atomic2.load_u64(&c) == 10 +} + +fn test_count_plus_greater_than_one_i64() { + mut c := i64(0) + atomic2.add_i64(&c, 10) + assert atomic2.load_i64(&c) == 10 +} + +fn test_count_minus_one_u64() { + mut c := u64(1) + atomic2.sub_u64(&c, 1) + assert atomic2.load_u64(&c) == 0 +} + +fn test_count_minus_one_i64() { + mut c := i64(0) + atomic2.sub_i64(&c, 1) + assert atomic2.load_i64(&c) == -1 +} + +fn test_count_minus_greater_than_one_u64() { + mut c := u64(0) + atomic2.store_u64(&c, 10) + atomic2.sub_u64(&c, 10) + assert atomic2.load_u64(&c) == 0 +} + +fn test_count_minus_greater_than_one_i64() { + mut c := i64(0) + atomic2.store_i64(&c, 10) + atomic2.sub_i64(&c, 20) + assert atomic2.load_i64(&c) == -10 +} + +// count_one_cycle counts the common counter iterations_per_cycle times in a thread-safe way +fn count_one_cycle(mut counter Counter, mut group sync.WaitGroup) { + for i := 0; i < iterations_per_cycle; i++ { + atomic2.add_u64(&counter.counter, 1) + } + group.done() +} + +// count_one_cycle_without_sync counts the common counter iterations_per_cycle times in a non-thread-safe way
fn count_one_cycle_without_sync(mut counter Counter, mut group sync.WaitGroup) { + for i := 0; i < iterations_per_cycle; i++ { + counter.counter++ + } + group.done() +}
diff --git a/v_windows/v/vlib/sync/bench/channel_bench_go.go b/v_windows/v/vlib/sync/bench/channel_bench_go.go new file mode 100644 index 0000000..a0afbbc --- /dev/null +++ b/v_windows/v/vlib/sync/bench/channel_bench_go.go @@ -0,0 +1,68 @@ +package main + +import "fmt" +import "log" +import "os" +import "time" +import "strconv" + +func 
assert_eq(a, b int64) { + if a != b { + log.Fatalf("assertion failed\nleft: %d, right: %d\n", a, b) + } +} + +func do_rec(ch chan int32, resch chan int64, n int32) { + var sum int64 + var i int32 + for i = 0; i < n; i++ { + sum += int64(<- ch) + } + fmt.Println(sum) + resch <- sum +} + +func do_send(ch chan int32, start, end int32) { + for i := start; i < end; i++ { + ch <- i + } +} + +func main() { + if len(os.Args) != 5 { + log.Fatalf("usage:\n\t%s <nsend> <nrec> <buflen> <nobj>\n", os.Args[0]) + } + nsend, _ := strconv.Atoi(os.Args[1]) + nrec, _ := strconv.Atoi(os.Args[2]) + buflen, _ := strconv.Atoi(os.Args[3]) + nobj, _ := strconv.Atoi(os.Args[4]) + stopwatch := time.Now() + ch := make(chan int32, buflen) + resch := make(chan int64, 0) + no := nobj + for i := 0; i < nrec; i++ { + n := no / (nrec - i) + go do_rec(ch, resch, int32(n)) + no -= n + } + assert_eq(int64(no), 0) + no = nobj + for i := 0; i < nsend; i++ { + n := no / (nsend - i) + end := no + no -= n + go do_send(ch, int32(no), int32(end)) + } + assert_eq(int64(no), 0) + var sum int64 + for i := 0; i < nrec; i++ { + sum += <-resch + } + elapsed := time.Now().Sub(stopwatch) + rate := float64(nobj)/float64(elapsed.Nanoseconds())*1000.0 + duration := 1.0e-09 * float64(elapsed.Nanoseconds()) + fmt.Printf("%d objects in %g s (%.2f objs/µs)\n", nobj, duration, rate) + expected_sum := int64(nobj)*int64(nobj-1)/2 + fmt.Printf("got: %d, expected: %d\n", sum, expected_sum) + assert_eq(sum, expected_sum) +}
diff --git a/v_windows/v/vlib/sync/bench/channel_bench_v.v b/v_windows/v/vlib/sync/bench/channel_bench_v.v new file mode 100644 index 0000000..54dcfe9 --- /dev/null +++ b/v_windows/v/vlib/sync/bench/channel_bench_v.v @@ -0,0 +1,64 @@ +// Channel Benchmark +// +// `nobj` integers are sent through a channel with queue length `buflen` +// using `nsend` sender threads and `nrec` receiver threads. +// +// The receive threads add all received numbers and send them to the +// main thread where the total sum is compared to the expected value. +import time +import os + +fn do_rec(ch chan int, resch chan i64, n int) { + mut sum := i64(0) + for _ in 0 .. n { + sum += <-ch + } + println(sum) + resch <- sum +} + +fn do_send(ch chan int, start int, end int) { + for i in start .. end { + ch <- i + } +} + +fn main() { + if os.args.len != 5 { + eprintln('usage:\n\t${os.args[0]} <nsend> <nrec> <buflen> <nobj>') + exit(1) + } + nsend := os.args[1].int() + nrec := os.args[2].int() + buflen := os.args[3].int() + nobj := os.args[4].int() + stopwatch := time.new_stopwatch() + ch := chan int{cap: buflen} + resch := chan i64{} + mut no := nobj + for i in 0 .. nrec { + n := no / (nrec - i) + go do_rec(ch, resch, n) + no -= n + } + assert no == 0 + no = nobj + for i in 0 .. nsend { + n := no / (nsend - i) + end := no + no -= n + go do_send(ch, no, end) + } + assert no == 0 + mut sum := i64(0) + for _ in 0 .. 
nrec { + sum += <-resch + } + elapsed := stopwatch.elapsed() + rate := f64(nobj) / elapsed * time.microsecond + println('$nobj objects in ${f64(elapsed) / time.second} s (${rate:.2f} objs/µs)') + // use sum formula by Gauß to calculate the expected result + expected_sum := i64(nobj) * (nobj - 1) / 2 + println('got: $sum, expected: $expected_sum') + assert sum == expected_sum +} diff --git a/v_windows/v/vlib/sync/bench/many_writers_and_receivers_on_1_channel.v b/v_windows/v/vlib/sync/bench/many_writers_and_receivers_on_1_channel.v new file mode 100644 index 0000000..999bb1d --- /dev/null +++ b/v_windows/v/vlib/sync/bench/many_writers_and_receivers_on_1_channel.v @@ -0,0 +1,150 @@ +import os +import os.cmdline +import time +import sync + +// Usage: +// many_writers_and_receivers_on_1_channel [-readers 1] [-writers 4] [-chan_cap 100] [-iterations 25000] > results.csv +// +// You can then open results.csv in Excel/Calc and for example plot the first vs the second column. +enum EventKind { + push + pop +} + +struct Event { + is_set bool + id int + gtime u64 // nanoseconds + i int + kind EventKind + elapsed i64 // nanoseconds, elapsed after the previous event of the same kind +} + +struct Context { +mut: + n_iters int + n_readers int + n_writers int + // + pops_wg &sync.WaitGroup + pops []Event + // + pushes_wg &sync.WaitGroup + pushes []Event +} + +fn do_rec(ch chan int, id int, mut ctx Context) { + eprintln('start of do_rec id: $id') + mut timer_sw_x := time.new_stopwatch() + mut tmp := int(0) + mut i := int(0) + // NB: a single receiver thread can get slightly more + // than its fair share of sends, that is why + // the receiver's Event array is much larger, + // enough so a single receiver can potentially process all + // writers pushes, and it is partitioned over all of + // id, ctx.n_writers and n_iters: + n_iters := ctx.n_iters + base := id * n_iters * ctx.n_writers + for { + for ch.try_pop(tmp) == .success { + ctx.pops[base + i] = Event{ + is_set: true + id: id + gtime: time.sys_mono_now() + i: i + kind: .pop + elapsed: timer_sw_x.elapsed().nanoseconds() + } + timer_sw_x.restart() + i++ + if tmp == 1 { + ctx.pops_wg.done() + return + } + } + } +} + +fn do_send(ch chan int, id int, mut ctx Context) { + eprintln('start of do_send id: $id') + mut timer_sw_x := time.new_stopwatch() + n_iters := ctx.n_iters + base := n_iters * id // sender events can not overlap + for i := 0; i < n_iters; i++ { + idx := base + i + ctx.pushes[idx] = Event{ + is_set: true + id: id + gtime: time.sys_mono_now() + i: i + kind: .push + elapsed: timer_sw_x.elapsed().nanoseconds() + } + timer_sw_x.restart() + tmp := int(0) + ch <- tmp + } + ctx.pushes_wg.done() +} + +fn main() { + // + args := os.args[1..] 
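+ // cmdline.option scans `args` for the given flag and returns the value that follows it, or the supplied default when the flag is absent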
+ if '-h' in args || '--help' in args { + eprintln('Usage:\n many_writers_and_receivers_on_1_channel [-readers 1] [-writers 4] [-chan_cap 100] [-iterations 25000]') + exit(0) + } + n_iters := cmdline.option(args, '-iterations', '25000').int() + n_readers := cmdline.option(args, '-readers', '1').int() + n_writers := cmdline.option(args, '-writers', '4').int() + chan_cap := cmdline.option(args, '-chan_cap', '100').int() + eprintln('> n_iters, $n_iters, n_writers, $n_writers, n_readers, $n_readers, chan_cap, $chan_cap') + // + ch := chan int{cap: chan_cap} + max_number_of_pushes := n_writers * (n_iters + 2) + max_number_of_pops := max_number_of_pushes * n_readers + eprintln('> max_number_of_pushes, $max_number_of_pushes, max_number_of_pops (per receiver), $max_number_of_pops') + mut ctx := &Context{ + n_iters: n_iters + n_readers: n_readers + n_writers: n_writers + pushes_wg: sync.new_waitgroup() + pops_wg: sync.new_waitgroup() + pushes: []Event{len: max_number_of_pushes} + pops: []Event{len: max_number_of_pops} + } + ctx.pops_wg.add(n_readers) + for i := 0; i < n_readers; i++ { + go do_rec(ch, i, mut ctx) + } + ctx.pushes_wg.add(n_writers) + for i := 0; i < n_writers; i++ { + go do_send(ch, i, mut ctx) + } + ctx.pushes_wg.wait() + eprintln('>> all pushes done') + for i := 0; i < n_readers; i++ { + ch <- 1 + } + ctx.pops_wg.wait() + eprintln('>> all pops done') + mut all_events := []Event{} + all_events << ctx.pops + all_events << ctx.pushes + all_events.sort(a.elapsed < b.elapsed) + mut i := 0 + for e in all_events { + if !e.is_set { + continue + } + i++ + if e.kind == .pop { + println('${i:8} , ${e.elapsed:10}, ns , do_rec id:, ${e.id:3} , i=, ${e.i:5} , ${e.gtime:20}') + } + if e.kind == .push { + println('${i:8} , ${e.elapsed:10}, ns , do_send id:, ${e.id:3} , i=, ${e.i:5} , ${e.gtime:20}') + } + } +}
diff --git a/v_windows/v/vlib/sync/bench/results.md b/v_windows/v/vlib/sync/bench/results.md new file mode 100644 index 0000000..882471c --- /dev/null +++ b/v_windows/v/vlib/sync/bench/results.md @@ -0,0 +1,48 @@ +# Channel Benchmark Results + +This document lists several benchmark results for different platforms in order to +identify performance regressions and improvements. + +They are measured using the command + +``` +> channel_bench_* <nsend> <nrec> <buflen> <nobj> + +nsend ... number of threads that push objects into the channel +nrec .... number of threads that pop objects from the channel +buflen .. length of channel buffer queue - `0` means unbuffered channel +nobj .... 
number of objects to pass through the channel +``` + +## AMD Ryzen 7 3800X, Ubuntu-20.04 x86_64 + +10000000 Objects transferred, results in Objects/µs + +| nsend | nrec | buflen | **V (gcc -O2)** | **V (clang)** | **V (tcc)** | **Go (golang)** | **Go (gccgo -O2)** | +| :---: | :---:| :---: | :---: | :---: | :---: | :---: | :---: | +| 1 | 1 | 0 | 1.97 | 1.63 | 2.08 | 4.65 | 0.56 | +| 1 | 1 | 100 | 3.05 | 2.29 | 1.93 | 18.90 | 6.08 | +| 4 | 4 | 0 | 0.87 | 0.90 | 0.99 | 1.84 | 0.84 | +| 4 | 4 | 100 | 3.35 | 3.07 | 2.92 | 7.43 | 3.71 | + +## AMD Ryzen 7 3800X, Windows 10 2004 x64 + +| nsend | nrec | buflen | **V (gcc -O2)** | **V (msvc /O2)** | **V (tcc)** | **Go (golang)** | +| :---: | :---:| :---: | :---: | :---: | :---: | :---: | +| 1 | 1 | 0 | 2.30 | 3.76 | 2.02 | 4.67 | +| 1 | 1 | 100 | 2.96 | 3.12 | 2.26 | 23.31 | +| 4 | 4 | 0 | 0.90 | 1.05 | 0.83 | 1.38 | +| 4 | 4 | 100 | 2.28 | 2.16 | 2.43 | 4.63 | + +## Raspberry Pi 3B+, Void Linux musl 32 bit + +10000000 Objects transferred, results in Objects/µs + +| nsend | nrec | buflen | **V (gcc -O2)** | **Go (golang)** | +| :---: | :---:| :---: | :---: | :---: | +| 1 | 1 | 0 | 0.37 | 0.21 | +| 1 | 1 | 100 | 1.03 | 0.74 | +| 4 | 4 | 0 | 0.04 | 0.38 | +| 4 | 4 | 100 | 2.78 | 2.63 | +| 2 | 2 | 0 | 0.05 | 0.38 | +| 2 | 2 | 100 | 1.26 | 0.75 |
diff --git a/v_windows/v/vlib/sync/channel_1_test.v b/v_windows/v/vlib/sync/channel_1_test.v new file mode 100644 index 0000000..17588fd --- /dev/null +++ b/v_windows/v/vlib/sync/channel_1_test.v @@ -0,0 +1,25 @@ +const ( + num_iterations = 10000 +) + +fn do_send(ch chan int) { + for i in 0 .. num_iterations { + ch <- i + } +} + +fn test_channel_buffered() { + ch := chan int{cap: 1000} + go do_send(ch) + mut sum := i64(0) + for _ in 0 .. num_iterations { + sum += <-ch + } + assert sum == u64(num_iterations) * (num_iterations - 1) / 2 +} + +fn test_builtin_enum() { + x := ChanState.closed + assert x == .closed + println(x) +}
diff --git a/v_windows/v/vlib/sync/channel_2_test.v b/v_windows/v/vlib/sync/channel_2_test.v new file mode 100644 index 0000000..5e8251d --- /dev/null +++ b/v_windows/v/vlib/sync/channel_2_test.v @@ -0,0 +1,19 @@ +const ( + num_iterations = 10000 +) + +fn do_send(ch chan int) { + for i in 0 .. num_iterations { + ch <- i + } +} + +fn test_channel_unbuffered() { + ch := chan int{} + go do_send(ch) + mut sum := i64(0) + for _ in 0 .. num_iterations { + sum += <-ch + } + assert sum == u64(num_iterations) * (num_iterations - 1) / 2 +}
diff --git a/v_windows/v/vlib/sync/channel_3_test.v b/v_windows/v/vlib/sync/channel_3_test.v new file mode 100644 index 0000000..d07276b --- /dev/null +++ b/v_windows/v/vlib/sync/channel_3_test.v @@ -0,0 +1,32 @@ +fn do_rec(ch chan int, resch chan i64) { + mut sum := i64(0) + for _ in 0 .. 2000 { + sum += <-ch + } + println(sum) + resch <- sum +} + +fn do_send(ch chan int) { + for i in 0 .. 2000 { + ch <- i + } +} + +fn test_channel_multi_unbuffered() { + ch := chan int{} + resch := chan i64{} + go do_rec(ch, resch) + go do_rec(ch, resch) + go do_rec(ch, resch) + go do_rec(ch, resch) + go do_send(ch) + go do_send(ch) + go do_send(ch) + go do_send(ch) + mut sum := i64(0) + for _ in 0 .. 4 { + sum += <-resch + } + assert sum == i64(4) * 2000 * (2000 - 1) / 2 +}
diff --git a/v_windows/v/vlib/sync/channel_4_test.v b/v_windows/v/vlib/sync/channel_4_test.v new file mode 100644 index 0000000..3792668 --- /dev/null +++ b/v_windows/v/vlib/sync/channel_4_test.v @@ -0,0 +1,32 @@ +fn do_rec(ch chan int, resch chan i64) { + mut sum := i64(0) + for _ in 0 .. 
2000 { + sum += <-ch + } + println(sum) + resch <- sum +} + +fn do_send(ch chan int) { + for i in 0 .. 2000 { + ch <- i + } +} + +fn test_channel_multi_buffered() { + ch := chan int{cap: 100} + resch := chan i64{} + go do_rec(ch, resch) + go do_rec(ch, resch) + go do_rec(ch, resch) + go do_rec(ch, resch) + go do_send(ch) + go do_send(ch) + go do_send(ch) + go do_send(ch) + mut sum := i64(0) + for _ in 0 .. 4 { + sum += <-resch + } + assert sum == i64(4) * 2000 * (2000 - 1) / 2 +} diff --git a/v_windows/v/vlib/sync/channel_array_mut_test.v b/v_windows/v/vlib/sync/channel_array_mut_test.v new file mode 100644 index 0000000..bfd53a1 --- /dev/null +++ b/v_windows/v/vlib/sync/channel_array_mut_test.v @@ -0,0 +1,35 @@ +const ( + num_iterations = 10000 +) + +struct St { +mut: + dummy i64 + dummy2 u32 + dummy3 i64 + n int + dummy4 int +} + +// this function gets an array of channels for `St` references +fn do_rec_calc_send(chs []chan mut St) { + for { + mut s := <-chs[0] or { break } + s.n++ + chs[1] <- s + } +} + +fn test_channel_array_mut() { + mut chs := [chan mut St{cap: 1}, chan mut St{}] + go do_rec_calc_send(chs) + mut t := &St{ + n: 100 + } + for _ in 0 .. num_iterations { + chs[0] <- t + t = <-chs[1] + } + chs[0].close() + assert t.n == 100 + num_iterations +} diff --git a/v_windows/v/vlib/sync/channel_close_test.v b/v_windows/v/vlib/sync/channel_close_test.v new file mode 100644 index 0000000..a31bfa9 --- /dev/null +++ b/v_windows/v/vlib/sync/channel_close_test.v @@ -0,0 +1,104 @@ +import time + +fn do_rec(ch chan int, resch chan i64) { + mut sum := i64(0) + for { + a := <-ch or { break } + sum += a + } + assert ch.closed == true + println(sum) + resch <- sum +} + +fn do_send(ch chan int) { + for i in 0 .. 8000 { + ch <- i + } + assert ch.closed == false + ch.close() + assert ch.closed == true +} + +fn test_channel_close_buffered_multi() { + ch := chan int{cap: 10} + resch := chan i64{} + go do_rec(ch, resch) + go do_rec(ch, resch) + go do_rec(ch, resch) + go do_rec(ch, resch) + go do_send(ch) + mut sum := i64(0) + for _ in 0 .. 4 { + sum += <-resch + } + assert sum == i64(8000) * (8000 - 1) / 2 +} + +fn test_channel_close_unbuffered_multi() { + ch := chan int{} + resch := chan i64{} + go do_rec(ch, resch) + go do_rec(ch, resch) + go do_rec(ch, resch) + go do_rec(ch, resch) + go do_send(ch) + mut sum := i64(0) + for _ in 0 .. 
4 { + sum += <-resch + } + assert sum == i64(8000) * (8000 - 1) / 2 +} + +fn test_channel_close_buffered() { + ch := chan int{cap: 100} + resch := chan i64{} + go do_rec(ch, resch) + go do_send(ch) + mut sum := i64(0) + sum += <-resch + assert sum == i64(8000) * (8000 - 1) / 2 +} + +fn test_channel_close_unbuffered() { + ch := chan int{} + resch := chan i64{cap: 100} + go do_rec(ch, resch) + go do_send(ch) + mut sum := i64(0) + sum += <-resch + assert sum == i64(8000) * (8000 - 1) / 2 +} + +fn test_channel_send_close_buffered() { + ch := chan int{cap: 1} + t := go fn (ch chan int) { + ch <- 31 + mut x := 45 + ch <- 17 or { x = -133 } + + assert x == -133 + }(ch) + time.sleep(100 * time.millisecond) + ch.close() + mut r := <-ch + r = <-ch or { 23 } + assert r == 23 + t.wait() +} + +fn test_channel_send_close_unbuffered() { + time.sleep(1 * time.second) + ch := chan int{} + t := go fn (ch chan int) { + mut x := 31 + ch <- 177 or { x = -71 } + + assert x == -71 + }(ch) + time.sleep(100 * time.millisecond) + ch.close() + r := <-ch or { 238 } + assert r == 238 + t.wait() +}
diff --git a/v_windows/v/vlib/sync/channel_fill_test.v b/v_windows/v/vlib/sync/channel_fill_test.v new file mode 100644 index 0000000..b4eabc0 --- /dev/null +++ b/v_windows/v/vlib/sync/channel_fill_test.v @@ -0,0 +1,22 @@ +import sync + +const ( + queue_len = 1000 + queue_fill = 763 +) + +fn do_send(ch chan int, mut fin sync.Semaphore) { + for i in 0 .. queue_fill { + ch <- i + } + fin.post() +} + +fn test_channel_len_cap() { + ch := chan int{cap: queue_len} + mut sem := sync.new_semaphore() + go do_send(ch, mut sem) + sem.wait() + assert ch.cap == queue_len + assert ch.len == queue_fill +}
diff --git a/v_windows/v/vlib/sync/channel_opt_propagate_test.v b/v_windows/v/vlib/sync/channel_opt_propagate_test.v new file mode 100644 index 0000000..a7e26fe --- /dev/null +++ b/v_windows/v/vlib/sync/channel_opt_propagate_test.v @@ -0,0 +1,39 @@ +import sync + +const ( + num_iterations = 10000 +) + +fn get_val_from_chan(ch chan i64) ?i64 { + r := <-ch ? + return r +} + +// this function gets an array of channels for `i64` +fn do_rec_calc_send(chs []chan i64, mut sem sync.Semaphore) { + mut msg := '' + for { + mut s := get_val_from_chan(chs[0]) or { + msg = err.msg + break + } + s++ + chs[1] <- s + } + assert msg == 'channel closed' + sem.post() +} + +fn test_channel_array_mut() { + mut chs := [chan i64{}, chan i64{cap: 10}] + mut sem := sync.new_semaphore() + go do_rec_calc_send(chs, mut sem) + mut t := i64(100) + for _ in 0 .. num_iterations { + chs[0] <- t + t = <-chs[1] + } + chs[0].close() + sem.wait() + assert t == 100 + num_iterations +}
diff --git a/v_windows/v/vlib/sync/channel_polling_test.v b/v_windows/v/vlib/sync/channel_polling_test.v new file mode 100644 index 0000000..846dcfd --- /dev/null +++ b/v_windows/v/vlib/sync/channel_polling_test.v @@ -0,0 +1,56 @@ +// Channel Benchmark +// +// `nobj` integers are sent through a channel with queue length `buflen` +// using `nsend` sender threads and `nrec` receiver threads. +// +// The receive threads add all received numbers and send them to the +// main thread where the total sum is compared to the expected value. +const ( + nsend = 2 + nrec = 2 + buflen = 100 + nobj = 10000 + objs_per_thread = 5000 +) + +fn do_rec(ch chan int, resch chan i64, n int) { + mut sum := i64(0) + for _ in 0 .. n { + mut r := 0 + for ch.try_pop(mut r) != .success { + } + sum += r + } + println(sum) + resch <- sum +} + +fn do_send(ch chan int, start int, end int) { + for i in start .. 
end { + for ch.try_push(i) != .success { + } + } +} + +fn test_channel_polling() { + ch := chan int{cap: buflen} + resch := chan i64{} + for _ in 0 .. nrec { + go do_rec(ch, resch, objs_per_thread) + } + mut n := nobj + for _ in 0 .. nsend { + end := n + n -= objs_per_thread + go do_send(ch, n, end) + } + mut sum := i64(0) + for _ in 0 .. nrec { + sum += <-resch + println('> running sum: $sum') + } + // use sum formula by Gauß to calculate the expected result + expected_sum := i64(nobj) * (nobj - 1) / 2 + println('expected sum: $expected_sum | sum: $sum') + assert sum == expected_sum +} diff --git a/v_windows/v/vlib/sync/channel_push_or_1_test.v b/v_windows/v/vlib/sync/channel_push_or_1_test.v new file mode 100644 index 0000000..1551d83 --- /dev/null +++ b/v_windows/v/vlib/sync/channel_push_or_1_test.v @@ -0,0 +1,65 @@ +const n = 1000 + +const c = 100 + +fn f(ch chan int) { + for _ in 0 .. n { + _ := <-ch + } + ch.close() +} + +fn test_push_or_unbuffered() { + ch := chan int{} + go f(ch) + mut j := 0 + for { + ch <- j or { break } + + j++ + } + assert j == n +} + +fn test_push_or_buffered() { + ch := chan int{cap: c} + go f(ch) + mut j := 0 + for { + ch <- j or { break } + + j++ + } + // we don't know how many elements are in the buffer when the channel + // is closed, so check j against an interval + assert j >= n + assert j <= n + c +} + +fn g(ch chan int, res chan int) { + mut j := 0 + for { + ch <- j or { break } + + j++ + } + println('done $j') + res <- j +} + +fn test_many_senders() { + ch := chan int{} + res := chan int{} + go g(ch, res) + go g(ch, res) + go g(ch, res) + mut k := 0 + for _ in 0 .. 3 * n { + k = <-ch + } + ch.close() + mut sum := <-res + sum += <-res + sum += <-res + assert sum == 3 * n +} diff --git a/v_windows/v/vlib/sync/channel_push_or_2_test.v b/v_windows/v/vlib/sync/channel_push_or_2_test.v new file mode 100644 index 0000000..451d9e7 --- /dev/null +++ b/v_windows/v/vlib/sync/channel_push_or_2_test.v @@ -0,0 +1,25 @@ +const n = 1000 + +fn f(ch chan f64) { + mut s := 0.0 + for _ in 0 .. n { + s += <-ch + } + assert s == f64(n * (n + 1) / 2) + ch.close() +} + +fn do_send(ch chan f64, val f64) ?f64 { + ch <- val ? + return val + 1.0 +} + +fn test_push_propargate() { + ch := chan f64{} + go f(ch) + mut s := 1.0 + for { + s = do_send(ch, s) or { break } + } + assert s == f64(n + 1) +} diff --git a/v_windows/v/vlib/sync/channel_select_2_test.v b/v_windows/v/vlib/sync/channel_select_2_test.v new file mode 100644 index 0000000..189aaf6 --- /dev/null +++ b/v_windows/v/vlib/sync/channel_select_2_test.v @@ -0,0 +1,62 @@ +import time + +fn do_rec_i64(ch chan i64) { + mut sum := i64(0) + for _ in 0 .. 300 { + sum += <-ch + } + assert sum == 300 * (300 - 1) / 2 +} + +fn do_send_int(ch chan int) { + for i in 0 .. 300 { + ch <- i + } +} + +fn do_send_byte(ch chan byte) { + for i in 0 .. 300 { + ch <- byte(i) + } +} + +fn do_send_i64(ch chan i64) { + for i in 0 .. 300 { + ch <- i + } +} + +fn test_select() { + chi := chan int{} + chl := chan i64{cap: 1} + chb := chan byte{cap: 10} + recch := chan i64{cap: 0} + go do_rec_i64(recch) + go do_send_int(chi) + go do_send_byte(chb) + go do_send_i64(chl) + mut sum := i64(0) + mut rl := i64(0) + mut sl := i64(0) + for _ in 0 .. 
1200 { + select { + ri := <-chi { + sum += ri + } + recch <- sl { + sl++ + } + rl = <-chl { + sum += rl + } + rb := <-chb { + sum += rb + } + } + } + // Use Gauß' formula for the first 2 contributions + // the 3rd contribution is `byte` and must be seen modulo 256 + expected_sum := 2 * (300 * (300 - 1) / 2) + 256 * (256 - 1) / 2 + 44 * (44 - 1) / 2 + assert sum == expected_sum + time.sleep(20 * time.millisecond) // to give assert in coroutine enough time +} diff --git a/v_windows/v/vlib/sync/channel_select_3_test.v b/v_windows/v/vlib/sync/channel_select_3_test.v new file mode 100644 index 0000000..fdf6096 --- /dev/null +++ b/v_windows/v/vlib/sync/channel_select_3_test.v @@ -0,0 +1,122 @@ +import time +import sync + +struct St { + a int +} + +fn getint() int { + return 8 +} + +fn f1(ch1 chan int, ch2 chan St, ch3 chan int, ch4 chan int, ch5 chan int, mut sem sync.Semaphore) { + mut a := 5 + select { + a = <-ch3 { + a = 0 + } + b := <-ch2 { + a = b.a + } + ch3 <- 5 { + a = 1 + } + ch2 <- St{ + a: 37 + } { + a = 2 + } + ch4 <- (6 + 7 * 9) { + a = 8 + } + ch5 <- getint() { + a = 9 + } + 300 * time.millisecond { + a = 3 + } + } + assert a == 3 + sem.post() +} + +fn f2(ch1 chan St, ch2 chan int, mut sem sync.Semaphore) { + mut r := 23 + for i in 0 .. 2 { + select { + b := <-ch1 { + r = b.a + } + ch2 <- r { + r = 17 + } + } + if i == 0 { + assert r == 17 + } else { + assert r == 13 + } + } + sem.post() +} + +fn test_select_blocks() { + ch1 := chan int{cap: 1} + ch2 := chan St{} + ch3 := chan int{} + ch4 := chan int{} + ch5 := chan int{} + mut sem := sync.new_semaphore() + mut r := false + t := select { + b := <-ch1 { + println(b) + } + else { + // no channel ready + r = true + } + } + assert r == true + assert t == true + go f2(ch2, ch3, mut sem) + n := <-ch3 + assert n == 23 + ch2 <- St{ + a: 13 + } + sem.wait() + stopwatch := time.new_stopwatch() + go f1(ch1, ch2, ch3, ch4, ch5, mut sem) + sem.wait() + elapsed_ms := f64(stopwatch.elapsed()) / time.millisecond + // https://docs.microsoft.com/en-us/windows-hardware/drivers/kernel/high-resolution-timers + // > For example, for Windows running on an x86 processor, the default interval between + // > system clock ticks is typically about 15 milliseconds, and the minimum interval + // > between system clock ticks is about 1 millisecond. + assert elapsed_ms >= 280.0 // 300 - (15ms + 5ms just in case) + + ch1.close() + ch2.close() + mut h := 7 + mut is_open := true + if select { + _ := <-ch2 { + h = 0 + } + ch1 <- h { + h = 1 + } + else { + h = 2 + } + } { + panic('channel is still open') + } else { + is_open = false + } + // no branch should have run + assert h == 7 + // since all channels are closed `select` should return `false` + assert is_open == false +} diff --git a/v_windows/v/vlib/sync/channel_select_4_test.v b/v_windows/v/vlib/sync/channel_select_4_test.v new file mode 100644 index 0000000..77cd557 --- /dev/null +++ b/v_windows/v/vlib/sync/channel_select_4_test.v @@ -0,0 +1,43 @@ +fn do_rec_i64(ch chan i64, sumch chan i64) { + mut sum := i64(0) + for _ in 0 .. 30000 { + sum += <-ch + } + sumch <- sum +} + +fn do_send_int(ch chan int) { + for i in 0 .. 30000 { + ch <- i + } +} + +fn test_select() { + chi := chan int{cap: 10} + recch := chan i64{cap: 10} + chsum := chan i64{} + go do_rec_i64(recch, chsum) + go do_send_int(chi) + mut sum := i64(0) + mut sl := i64(0) + for _ in 0 .. 
60000 + recch.cap { + select { + ri := <-chi { + sum += ri + } + recch <- sl { + sl++ + } + } + } + // Use Gauß' formula + expected_sum := i64(30000) * (30000 - 1) / 2 + assert sum == expected_sum + + mut sumrec := <-chsum + // Empty receive buffer + for _ in 0 .. recch.cap { + sumrec += <-recch + } + assert sumrec == i64(30000 + recch.cap) * (30000 + recch.cap - 1) / 2 +}
diff --git a/v_windows/v/vlib/sync/channel_select_5_test.v b/v_windows/v/vlib/sync/channel_select_5_test.v new file mode 100644 index 0000000..97228fe --- /dev/null +++ b/v_windows/v/vlib/sync/channel_select_5_test.v @@ -0,0 +1,61 @@ +fn do_rec_i64(ch chan i64, sumch chan i64) { + mut sum := i64(0) + for _ in 0 .. 10000 { + sum += <-ch + } + sumch <- sum +} + +fn do_send_int(ch chan int) { + for i in 0 .. 10000 { + ch <- i + } +} + +fn do_send_int2(ch chan int) { + for i in 10000 .. 20000 { + ch <- i + } +} + +fn do_send_int3(ch chan int) { + for i in 20000 .. 30000 { + ch <- i + } +} + +fn test_select() { + chi := chan int{cap: 10} + recch := chan i64{cap: 10} + chsum := chan i64{} + go do_rec_i64(recch, chsum) + go do_rec_i64(recch, chsum) + go do_rec_i64(recch, chsum) + go do_send_int(chi) + go do_send_int2(chi) + go do_send_int3(chi) + mut sum := i64(0) + mut sl := i64(0) + for _ in 0 .. 60000 + recch.cap { + select { + ri := <-chi { + sum += ri + } + recch <- sl { + sl++ + } + } + } + // Use Gauß' formula + expected_sum := i64(30000) * (30000 - 1) / 2 + assert sum == expected_sum + + mut sumrec := <-chsum + sumrec += <-chsum + sumrec += <-chsum + // Empty receive buffer + for _ in 0 .. recch.cap { + sumrec += <-recch + } + assert sumrec == i64(30000 + recch.cap) * (30000 + recch.cap - 1) / 2 +}
diff --git a/v_windows/v/vlib/sync/channel_select_6_test.v b/v_windows/v/vlib/sync/channel_select_6_test.v new file mode 100644 index 0000000..9b5f2d4 --- /dev/null +++ b/v_windows/v/vlib/sync/channel_select_6_test.v @@ -0,0 +1,75 @@ +// This test case runs 3 concurrent instances of `do_select` that +// communicate with 6 other threads doing send and receive operations. +// There are buffered and unbuffered channels - handled by one or two +// concurrent threads on the other side. + +fn do_select(ch1 chan int, ch2 chan int, chf1 chan f64, chf2 chan f64, sumch1 chan i64, sumch2 chan i64) { + mut sum1 := i64(0) + mut sum2 := i64(0) + f1 := 17.0 + f2 := 7.0 + for _ in 0 .. 20000 + chf1.cap / 3 { + select { + chf1 <- f1 {} + i := <-ch1 { + sum1 += i + } + j := <-ch2 { + sum2 += j + } + chf2 <- f2 {} + } + } + sumch1 <- sum1 + sumch2 <- sum2 +} + +fn do_send_int(ch chan int, factor int) { + for i in 0 .. 10000 { + ch <- (i * factor) + } +} + +fn do_rec_f64(ch chan f64, sumch chan f64) { + mut sum := 0.0 + for _ in 0 .. 10000 { + sum += <-ch + } + sumch <- sum +} + +fn test_select() { + ch1 := chan int{cap: 3} + ch2 := chan int{} + // buffer length of chf1 must be a multiple of 3 (# of select threads) + chf1 := chan f64{cap: 30} + chf2 := chan f64{} + chsum1 := chan i64{} + chsum2 := chan i64{} + chsumf1 := chan f64{} + chsumf2 := chan f64{} + go do_send_int(ch1, 3) + go do_select(ch1, ch2, chf1, chf2, chsum1, chsum2) + go do_rec_f64(chf1, chsumf1) + go do_rec_f64(chf2, chsumf2) + go do_rec_f64(chf2, chsumf2) + go do_select(ch1, ch2, chf1, chf2, chsum1, chsum2) + go do_send_int(ch2, 7) + go do_send_int(ch2, 17) + go do_select(ch1, ch2, chf1, chf2, chsum1, chsum2) + + sum1 := <-chsum1 + <-chsum1 + <-chsum1 + sum2 := <-chsum2 + <-chsum2 + <-chsum2 + mut sumf1 := <-chsumf1 + // empty channel buffer + for _ in 0 .. 
chf1.cap { + sumf1 += <-chf1 + } + sumf2 := <-chsumf2 + <-chsumf2 + // Use Gauß' formula + expected_sum := i64(10000) * (10000 - 1) / 2 + assert sum1 == 3 * expected_sum + assert sum2 == (7 + 17) * expected_sum + assert sumf1 == 17.0 * f64(10000 + chf1.cap) + assert sumf2 == 7.0 * 20000 +} diff --git a/v_windows/v/vlib/sync/channel_select_test.v b/v_windows/v/vlib/sync/channel_select_test.v new file mode 100644 index 0000000..26ed641 --- /dev/null +++ b/v_windows/v/vlib/sync/channel_select_test.v @@ -0,0 +1,84 @@ +/* +* ATTENTION! Do not use this file as an example! + * For that, please look at `channel_select_2_test.v` or `channel_select_3_test.v` + * + * This test case uses the implementation in `sync/channels.v` directly + * in order to test it independently from the support in the core language +*/ + +module sync + +import time + +fn do_rec_i64(mut ch Channel) { + mut sum := i64(0) + for _ in 0 .. 300 { + mut a := i64(0) + ch.pop(&a) + sum += a + } + assert sum == 300 * (300 - 1) / 2 +} + +fn do_send_int(mut ch Channel) { + for i in 0 .. 300 { + ch.push(&i) + } +} + +fn do_send_byte(mut ch Channel) { + for i in 0 .. 300 { + ii := byte(i) + ch.push(&ii) + } +} + +fn do_send_i64(mut ch Channel) { + for i in 0 .. 300 { + ii := i64(i) + ch.push(&ii) + } +} + +fn test_select() { + mut chi := new_channel<int>(0) + mut chl := new_channel<i64>(1) + mut chb := new_channel<byte>(10) + mut recch := new_channel<i64>(0) + go do_rec_i64(mut recch) + go do_send_int(mut chi) + go do_send_byte(mut chb) + go do_send_i64(mut chl) + mut channels := [chi, recch, chl, chb] + directions := [Direction.pop, .push, .pop, .pop] + mut sum := i64(0) + mut rl := i64(0) + mut ri := int(0) + mut rb := byte(0) + mut sl := i64(0) + mut objs := [voidptr(&ri), &sl, &rl, &rb] + for _ in 0 .. 1200 { + idx := channel_select(mut channels, directions, mut objs, time.infinite) + match idx { + 0 { + sum += ri + } + 1 { + sl++ + } + 2 { + sum += rl + } + 3 { + sum += rb + } + else { + println('got $idx (timeout)') + } + } + } + // Use Gauß' formula for the first 2 contributions + // the 3rd contribution is `byte` and must be seen modulo 256 + expected_sum := 2 * (300 * (300 - 1) / 2) + 256 * (256 - 1) / 2 + 44 * (44 - 1) / 2 + assert sum == expected_sum +} diff --git a/v_windows/v/vlib/sync/channel_try_buf_test.v b/v_windows/v/vlib/sync/channel_try_buf_test.v new file mode 100644 index 0000000..e521314 --- /dev/null +++ b/v_windows/v/vlib/sync/channel_try_buf_test.v @@ -0,0 +1,17 @@ +fn test_channel_try_buffered() { + ch := chan int{cap: 5} + for z in 2 .. 13 { + if ch.try_push(z) == .not_ready { + assert z == 7 + break + } + } + mut obj := int(0) + for ch.try_pop(mut obj) == .success { + println(obj) + } + assert obj == 6 + ch <- 17 + obj = <-ch + assert obj == 17 +} diff --git a/v_windows/v/vlib/sync/channel_try_unbuf_test.v b/v_windows/v/vlib/sync/channel_try_unbuf_test.v new file mode 100644 index 0000000..ee11468 --- /dev/null +++ b/v_windows/v/vlib/sync/channel_try_unbuf_test.v @@ -0,0 +1,13 @@ +fn test_channel_try_unbuffered() { + ch := chan int{} + for z in 5 .. 
8 { + if ch.try_push(z) == .not_ready { + assert z == 5 + break + } + panic('push on non-ready channel not detected') + } + mut obj := -17 + for ch.try_pop(mut obj) == .success {} + assert obj == -17 +} diff --git a/v_windows/v/vlib/sync/channels.v b/v_windows/v/vlib/sync/channels.v new file mode 100644 index 0000000..49f1396 --- /dev/null +++ b/v_windows/v/vlib/sync/channels.v @@ -0,0 +1,730 @@ +module sync + +import time +import rand + +$if windows { + #flag -I @VEXEROOT/thirdparty/stdatomic/win +} $else { + #flag -I @VEXEROOT/thirdparty/stdatomic/nix +} + +$if linux { + $if tinyc { + $if amd64 { + // most Linux distributions have /usr/lib/libatomic.so, but Ubuntu uses gcc version specific dir + #flag -L/usr/lib/gcc/x86_64-linux-gnu/6 + #flag -L/usr/lib/gcc/x86_64-linux-gnu/7 + #flag -L/usr/lib/gcc/x86_64-linux-gnu/8 + #flag -L/usr/lib/gcc/x86_64-linux-gnu/9 + #flag -L/usr/lib/gcc/x86_64-linux-gnu/10 + #flag -L/usr/lib/gcc/x86_64-linux-gnu/11 + #flag -L/usr/lib/gcc/x86_64-linux-gnu/12 + } $else $if arm64 { + #flag -L/usr/lib/gcc/aarch64-linux-gnu/6 + #flag -L/usr/lib/gcc/aarch64-linux-gnu/7 + #flag -L/usr/lib/gcc/aarch64-linux-gnu/8 + #flag -L/usr/lib/gcc/aarch64-linux-gnu/9 + #flag -L/usr/lib/gcc/aarch64-linux-gnu/10 + #flag -L/usr/lib/gcc/aarch64-linux-gnu/11 + #flag -L/usr/lib/gcc/aarch64-linux-gnu/12 + } + #flag -latomic + } +} + +#include <atomic.h> +// The following functions are actually generic in C +fn C.atomic_load_ptr(voidptr) voidptr +fn C.atomic_store_ptr(voidptr, voidptr) +fn C.atomic_compare_exchange_weak_ptr(voidptr, voidptr, voidptr) bool +fn C.atomic_compare_exchange_strong_ptr(voidptr, voidptr, voidptr) bool +fn C.atomic_exchange_ptr(voidptr, voidptr) voidptr +fn C.atomic_fetch_add_ptr(voidptr, voidptr) voidptr +fn C.atomic_fetch_sub_ptr(voidptr, voidptr) voidptr + +fn C.atomic_load_u16(voidptr) u16 +fn C.atomic_store_u16(voidptr, u16) +fn C.atomic_compare_exchange_weak_u16(voidptr, voidptr, u16) bool +fn C.atomic_compare_exchange_strong_u16(voidptr, voidptr, u16) bool +fn C.atomic_exchange_u16(voidptr, u16) u16 +fn C.atomic_fetch_add_u16(voidptr, u16) u16 +fn C.atomic_fetch_sub_u16(voidptr, u16) u16 + +fn C.atomic_load_u32(voidptr) u32 +fn C.atomic_store_u32(voidptr, u32) +fn C.atomic_compare_exchange_weak_u32(voidptr, voidptr, u32) bool +fn C.atomic_compare_exchange_strong_u32(voidptr, voidptr, u32) bool +fn C.atomic_exchange_u32(voidptr, u32) u32 +fn C.atomic_fetch_add_u32(voidptr, u32) u32 +fn C.atomic_fetch_sub_u32(voidptr, u32) u32 + +fn C.atomic_load_u64(voidptr) u64 +fn C.atomic_store_u64(voidptr, u64) +fn C.atomic_compare_exchange_weak_u64(voidptr, voidptr, u64) bool +fn C.atomic_compare_exchange_strong_u64(voidptr, voidptr, u64) bool +fn C.atomic_exchange_u64(voidptr, u64) u64 +fn C.atomic_fetch_add_u64(voidptr, u64) u64 +fn C.atomic_fetch_sub_u64(voidptr, u64) u64 + +const ( + // how often to try to get data without blocking before to wait for semaphore + spinloops = 750 + spinloops_sem = 4000 +) + +enum BufferElemStat { + unused = 0 + writing + written + reading +} + +struct Subscription { +mut: + sem &Semaphore + prev &&Subscription + nxt &Subscription +} + +enum Direction { + pop + push +} + +struct Channel { + ringbuf &byte // queue for buffered channels + statusbuf &byte // flags to synchronize write/read in ringbuf + objsize u32 +mut: // atomic + writesem Semaphore // to wake thread that wanted to write, but buffer was full + readsem Semaphore // to wake thread that wanted to read, but buffer was empty + writesem_im Semaphore + readsem_im Semaphore + 
write_adr C.atomic_uintptr_t // if != NULL the next obj can be written here without wait + read_adr C.atomic_uintptr_t // if != NULL an obj can be read from here without wait + adr_read C.atomic_uintptr_t // used to identify origin of writesem + adr_written C.atomic_uintptr_t // used to identify origin of readsem + write_free u32 // for queue state + read_avail u32 + buf_elem_write_idx u32 + buf_elem_read_idx u32 + // for select + write_subscriber &Subscription + read_subscriber &Subscription + write_sub_mtx u16 + read_sub_mtx u16 + closed u16 +pub: + cap u32 // queue length in #objects +} + +pub fn new_channel<T>(n u32) &Channel { + st := sizeof(T) + if isreftype(T) { + return new_channel_st(n, st) + } else { + return new_channel_st_noscan(n, st) + } +} + +fn new_channel_st(n u32, st u32) &Channel { + wsem := if n > 0 { n } else { 1 } + rsem := if n > 0 { u32(0) } else { 1 } + rbuf := if n > 0 { unsafe { malloc(int(n * st)) } } else { &byte(0) } + sbuf := if n > 0 { vcalloc_noscan(int(n * 2)) } else { &byte(0) } + mut ch := Channel{ + objsize: st + cap: n + write_free: n + read_avail: 0 + ringbuf: rbuf + statusbuf: sbuf + write_subscriber: 0 + read_subscriber: 0 + } + ch.writesem.init(wsem) + ch.readsem.init(rsem) + ch.writesem_im.init(0) + ch.readsem_im.init(0) + return &ch +} + +fn new_channel_st_noscan(n u32, st u32) &Channel { + $if gcboehm_opt ? { + wsem := if n > 0 { n } else { 1 } + rsem := if n > 0 { u32(0) } else { 1 } + rbuf := if n > 0 { unsafe { malloc_noscan(int(n * st)) } } else { &byte(0) } + sbuf := if n > 0 { vcalloc_noscan(int(n * 2)) } else { &byte(0) } + mut ch := Channel{ + objsize: st + cap: n + write_free: n + read_avail: 0 + ringbuf: rbuf + statusbuf: sbuf + write_subscriber: 0 + read_subscriber: 0 + } + ch.writesem.init(wsem) + ch.readsem.init(rsem) + ch.writesem_im.init(0) + ch.readsem_im.init(0) + return &ch + } $else { + return new_channel_st(n, st) + } +} + +pub fn (ch &Channel) auto_str(typename string) string { + return 'chan $typename{cap: $ch.cap, closed: $ch.closed}' +} + +pub fn (mut ch Channel) close() { + open_val := u16(0) + if !C.atomic_compare_exchange_strong_u16(&ch.closed, &open_val, 1) { + return + } + mut nulladr := voidptr(0) + for !C.atomic_compare_exchange_weak_ptr(unsafe { &voidptr(&ch.adr_written) }, &nulladr, + voidptr(-1)) { + nulladr = voidptr(0) + } + ch.readsem_im.post() + ch.readsem.post() + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + if ch.read_subscriber != voidptr(0) { + ch.read_subscriber.sem.post() + } + C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) + null16 = u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + if ch.write_subscriber != voidptr(0) { + ch.write_subscriber.sem.post() + } + C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) + ch.writesem.post() + if ch.cap == 0 { + C.atomic_store_ptr(unsafe { &voidptr(&ch.read_adr) }, voidptr(0)) + } + ch.writesem_im.post() +} + +[inline] +pub fn (mut ch Channel) len() int { + return int(C.atomic_load_u32(&ch.read_avail)) +} + +[inline] +pub fn (mut ch Channel) closed() bool { + return C.atomic_load_u16(&ch.closed) != 0 +} + +[inline] +pub fn (mut ch Channel) push(src voidptr) { + if ch.try_push_priv(src, false) == .closed { + panic('push on closed channel') + } +} + +[inline] +pub fn (mut ch Channel) try_push(src voidptr) ChanState { + return ch.try_push_priv(src, true) +} + +fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState { + 
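// Outline: for unbuffered channels a blocked reader advertises its destination address in `write_adr`; the sender claims it via CAS and copies the object over directly. + // For buffered channels a slot in `ringbuf` is reserved by decrementing `write_free` and guarded by the per-slot status flags in `statusbuf`. + // Semaphores are only waited on after polling `spinloops`/`spinloops_sem` times without success. +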
if C.atomic_load_u16(&ch.closed) != 0 { + return .closed + } + spinloops_sem_, spinloops_ := if no_block { 1, 1 } else { sync.spinloops, sync.spinloops_sem } + mut have_swapped := false + for { + mut got_sem := false + mut wradr := C.atomic_load_ptr(unsafe { &voidptr(&ch.write_adr) }) + for wradr != C.NULL { + if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.write_adr) }, + &wradr, voidptr(0)) + { + // there is a reader waiting for us + unsafe { C.memcpy(wradr, src, ch.objsize) } + mut nulladr := voidptr(0) + for !C.atomic_compare_exchange_weak_ptr(unsafe { &voidptr(&ch.adr_written) }, + &nulladr, wradr) { + nulladr = voidptr(0) + } + ch.readsem_im.post() + return .success + } + } + if no_block && ch.cap == 0 { + return .not_ready + } + // get token to read + for _ in 0 .. spinloops_sem_ { + if got_sem { + break + } + got_sem = ch.writesem.try_wait() + } + if !got_sem { + if no_block { + return .not_ready + } + ch.writesem.wait() + } + if C.atomic_load_u16(&ch.closed) != 0 { + ch.writesem.post() + return .closed + } + if ch.cap == 0 { + // try to advertise current object as readable + mut read_in_progress := false + C.atomic_store_ptr(unsafe { &voidptr(&ch.read_adr) }, src) + wradr = C.atomic_load_ptr(unsafe { &voidptr(&ch.write_adr) }) + if wradr != C.NULL { + mut src2 := src + if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.read_adr) }, + &src2, voidptr(0)) + { + ch.writesem.post() + continue + } else { + read_in_progress = true + } + } + if !read_in_progress { + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(voidptr(&ch.read_sub_mtx), &null16, + u16(1)) { + null16 = u16(0) + } + if ch.read_subscriber != voidptr(0) { + ch.read_subscriber.sem.post() + } + C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) + } + mut src2 := src + for sp := u32(0); sp < spinloops_ || read_in_progress; sp++ { + if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_read) }, + &src2, voidptr(0)) + { + have_swapped = true + read_in_progress = true + break + } + src2 = src + } + mut got_im_sem := false + for sp := u32(0); sp < spinloops_sem_ || read_in_progress; sp++ { + got_im_sem = ch.writesem_im.try_wait() + if got_im_sem { + break + } + } + for { + if got_im_sem { + got_im_sem = false + } else { + ch.writesem_im.wait() + } + if C.atomic_load_u16(&ch.closed) != 0 { + if have_swapped + || C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_read) }, &src2, voidptr(0)) { + ch.writesem.post() + return .success + } else { + return .closed + } + } + if have_swapped + || C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_read) }, &src2, voidptr(0)) { + ch.writesem.post() + break + } else { + // this semaphore was not for us - repost in + ch.writesem_im.post() + if src2 == voidptr(-1) { + ch.readsem.post() + return .closed + } + src2 = src + } + } + return .success + } else { + // buffered channel + mut space_in_queue := false + mut wr_free := C.atomic_load_u32(&ch.write_free) + for wr_free > 0 { + space_in_queue = C.atomic_compare_exchange_weak_u32(&ch.write_free, &wr_free, + wr_free - 1) + if space_in_queue { + break + } + } + if space_in_queue { + mut wr_idx := C.atomic_load_u32(&ch.buf_elem_write_idx) + for { + mut new_wr_idx := wr_idx + 1 + for new_wr_idx >= ch.cap { + new_wr_idx -= ch.cap + } + if C.atomic_compare_exchange_strong_u32(&ch.buf_elem_write_idx, &wr_idx, + new_wr_idx) + { + break + } + } + mut wr_ptr := ch.ringbuf + mut status_adr := ch.statusbuf + unsafe { + wr_ptr += wr_idx * ch.objsize + status_adr += wr_idx * sizeof(u16) + } 
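+ // claim the reserved slot: spin until its status flag is `unused`, hold it as `writing` during the copy, then mark it `written` and wake a reader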
+ mut expected_status := u16(BufferElemStat.unused) + for !C.atomic_compare_exchange_weak_u16(status_adr, &expected_status, + u16(BufferElemStat.writing)) { + expected_status = u16(BufferElemStat.unused) + } + unsafe { + C.memcpy(wr_ptr, src, ch.objsize) + } + C.atomic_store_u16(unsafe { &u16(status_adr) }, u16(BufferElemStat.written)) + C.atomic_fetch_add_u32(&ch.read_avail, 1) + ch.readsem.post() + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + if ch.read_subscriber != voidptr(0) { + ch.read_subscriber.sem.post() + } + C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) + return .success + } else { + if no_block { + return .not_ready + } + ch.writesem.post() + } + } + } + // we should not get here but the V compiler wants to see a return statement + assert false + return .success +} + +[inline] +pub fn (mut ch Channel) pop(dest voidptr) bool { + return ch.try_pop_priv(dest, false) == .success +} + +[inline] +pub fn (mut ch Channel) try_pop(dest voidptr) ChanState { + return ch.try_pop_priv(dest, true) +} + +fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState { + spinloops_sem_, spinloops_ := if no_block { 1, 1 } else { sync.spinloops, sync.spinloops_sem } + mut have_swapped := false + mut write_in_progress := false + for { + mut got_sem := false + if ch.cap == 0 { + // unbuffered channel - first see if a `push()` has advertised an object + mut rdadr := C.atomic_load_ptr(unsafe { &voidptr(&ch.read_adr) }) + for rdadr != C.NULL { + if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.read_adr) }, + &rdadr, voidptr(0)) + { + // there is a writer waiting for us + unsafe { C.memcpy(dest, rdadr, ch.objsize) } + mut nulladr := voidptr(0) + for !C.atomic_compare_exchange_weak_ptr(unsafe { &voidptr(&ch.adr_read) }, + &nulladr, rdadr) { + nulladr = voidptr(0) + } + ch.writesem_im.post() + return .success + } + } + if no_block { + if C.atomic_load_u16(&ch.closed) == 0 { + return .not_ready + } else { + return .closed + } + } + } + // get token to read + for _ in 0 .. 
spinloops_sem_ { + if got_sem { + break + } + got_sem = ch.readsem.try_wait() + } + if !got_sem { + if no_block { + if C.atomic_load_u16(&ch.closed) == 0 { + return .not_ready + } else { + return .closed + } + } + ch.readsem.wait() + } + if ch.cap > 0 { + // try to get buffer token + mut obj_in_queue := false + mut rd_avail := C.atomic_load_u32(&ch.read_avail) + for rd_avail > 0 { + obj_in_queue = C.atomic_compare_exchange_weak_u32(&ch.read_avail, &rd_avail, + rd_avail - 1) + if obj_in_queue { + break + } + } + if obj_in_queue { + mut rd_idx := C.atomic_load_u32(&ch.buf_elem_read_idx) + for { + mut new_rd_idx := rd_idx + 1 + for new_rd_idx >= ch.cap { + new_rd_idx -= ch.cap + } + if C.atomic_compare_exchange_weak_u32(&ch.buf_elem_read_idx, &rd_idx, + new_rd_idx) + { + break + } + } + mut rd_ptr := ch.ringbuf + mut status_adr := ch.statusbuf + unsafe { + rd_ptr += rd_idx * ch.objsize + status_adr += rd_idx * sizeof(u16) + } + mut expected_status := u16(BufferElemStat.written) + for !C.atomic_compare_exchange_weak_u16(status_adr, &expected_status, + u16(BufferElemStat.reading)) { + expected_status = u16(BufferElemStat.written) + } + unsafe { + C.memcpy(dest, rd_ptr, ch.objsize) + } + C.atomic_store_u16(unsafe { &u16(status_adr) }, u16(BufferElemStat.unused)) + C.atomic_fetch_add_u32(&ch.write_free, 1) + ch.writesem.post() + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + if ch.write_subscriber != voidptr(0) { + ch.write_subscriber.sem.post() + } + C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) + return .success + } + } + // try to advertise `dest` as writable + C.atomic_store_ptr(unsafe { &voidptr(&ch.write_adr) }, dest) + if ch.cap == 0 { + mut rdadr := C.atomic_load_ptr(unsafe { &voidptr(&ch.read_adr) }) + if rdadr != C.NULL { + mut dest2 := dest + if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.write_adr) }, + &dest2, voidptr(0)) + { + ch.readsem.post() + continue + } else { + write_in_progress = true + } + } + } + if ch.cap == 0 && !write_in_progress { + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + if ch.write_subscriber != voidptr(0) { + ch.write_subscriber.sem.post() + } + C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) + } + mut dest2 := dest + for sp := u32(0); sp < spinloops_ || write_in_progress; sp++ { + if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_written) }, + &dest2, voidptr(0)) + { + have_swapped = true + break + } else if dest2 == voidptr(-1) { + ch.readsem.post() + return .closed + } + dest2 = dest + } + mut got_im_sem := false + for sp := u32(0); sp < spinloops_sem_ || write_in_progress; sp++ { + got_im_sem = ch.readsem_im.try_wait() + if got_im_sem { + break + } + } + for { + if got_im_sem { + got_im_sem = false + } else { + ch.readsem_im.wait() + } + if have_swapped + || C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_written) }, &dest2, voidptr(0)) { + ch.readsem.post() + break + } else { + // this semaphore was not for us - repost in + ch.readsem_im.post() + if dest2 == voidptr(-1) { + ch.readsem.post() + return .closed + } + dest2 = dest + } + } + break + } + return .success +} + +// Wait `timeout` on any of `channels[i]` until one of them can push (`is_push[i] = true`) or pop (`is_push[i] = false`) +// object referenced by `objrefs[i]`. `timeout = time.infinite` means wait unlimited time. 
`timeout <= 0` means return +// immediately if no transaction can be performed without waiting. +// return value: the index of the channel on which a transaction has taken place +// -1 if waiting for a transaction has exceeded timeout +// -2 if all channels are closed + +pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []voidptr, timeout time.Duration) int { + assert channels.len == dir.len + assert dir.len == objrefs.len + mut subscr := []Subscription{len: channels.len} + mut sem := unsafe { Semaphore{} } + sem.init(0) + for i, ch in channels { + subscr[i].sem = unsafe { &sem } + if dir[i] == .push { + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + subscr[i].prev = unsafe { &ch.write_subscriber } + unsafe { + subscr[i].nxt = C.atomic_exchange_ptr(&voidptr(&ch.write_subscriber), + &subscr[i]) + } + if voidptr(subscr[i].nxt) != voidptr(0) { + subscr[i].nxt.prev = unsafe { &subscr[i].nxt } + } + C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) + } else { + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + subscr[i].prev = unsafe { &ch.read_subscriber } + unsafe { + subscr[i].nxt = C.atomic_exchange_ptr(&voidptr(&ch.read_subscriber), &subscr[i]) + } + if voidptr(subscr[i].nxt) != voidptr(0) { + subscr[i].nxt.prev = unsafe { &subscr[i].nxt } + } + C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) + } + } + stopwatch := if timeout == time.infinite || timeout <= 0 { + time.StopWatch{} + } else { + time.new_stopwatch() + } + mut event_idx := -1 // negative index means `timed out` + + outer: for { + rnd := rand.u32_in_range(0, u32(channels.len)) + mut num_closed := 0 + for j, _ in channels { + mut i := j + int(rnd) + if i >= channels.len { + i -= channels.len + } + if dir[i] == .push { + stat := channels[i].try_push_priv(objrefs[i], true) + if stat == .success { + event_idx = i + break outer + } else if stat == .closed { + num_closed++ + } + } else { + stat := channels[i].try_pop_priv(objrefs[i], true) + if stat == .success { + event_idx = i + break outer + } else if stat == .closed { + num_closed++ + } + } + } + if num_closed == channels.len { + event_idx = -2 + break outer + } + if timeout <= 0 { + break outer + } + if timeout != time.infinite { + remaining := timeout - stopwatch.elapsed() + if !sem.timed_wait(remaining) { + break outer + } + } else { + sem.wait() + } + } + // reset subscribers + for i, ch in channels { + if dir[i] == .push { + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + unsafe { + *subscr[i].prev = subscr[i].nxt + } + if subscr[i].nxt != 0 { + subscr[i].nxt.prev = subscr[i].prev + // just in case we have missed a semaphore during restore + subscr[i].nxt.sem.post() + } + C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) + } else { + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + unsafe { + *subscr[i].prev = subscr[i].nxt + } + if subscr[i].nxt != 0 { + subscr[i].nxt.prev = subscr[i].prev + subscr[i].nxt.sem.post() + } + C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) + } + } + sem.destroy() + return event_idx +} diff --git a/v_windows/v/vlib/sync/pool/README.md b/v_windows/v/vlib/sync/pool/README.md new file mode 100644 index 0000000..bdea5b3 --- /dev/null +++ b/v_windows/v/vlib/sync/pool/README.md @@ -0,0 +1,36 @@ + +The `sync.pool` module provides a convenient 
way to run identical tasks over
+an array of items *in parallel*, without worrying about thread synchronization,
+waitgroups, mutexes etc. You just need to supply a callback function that
+will be called once for each item in your input array.
+
+After all the work is done in parallel by the worker threads in the pool,
+pool.work_on_items will return. You can then call pool.get_results<Result>()
+to retrieve a list of all the results that the worker callbacks returned
+for each input item. Example:
+
+```v
+import sync.pool
+
+struct SResult {
+	s string
+}
+
+fn sprocess(pp &pool.PoolProcessor, idx int, wid int) &SResult {
+	item := pp.get_item<string>(idx)
+	println('idx: $idx, wid: $wid, item: ' + item)
+	return &SResult{item.reverse()}
+}
+
+fn main() {
+	mut pp := pool.new_pool_processor(callback: sprocess)
+	pp.work_on_items(['1abc', '2abc', '3abc', '4abc', '5abc', '6abc', '7abc'])
+	// optionally, you can iterate over the results too:
+	for x in pp.get_results<SResult>() {
+		println('result: $x.s')
+	}
+}
+```
+
+See https://github.com/vlang/v/blob/master/vlib/sync/pool/pool_test.v for a
+more detailed usage example.
diff --git a/v_windows/v/vlib/sync/pool/pool.v b/v_windows/v/vlib/sync/pool/pool.v
new file mode 100644
index 0000000..b2c5340
--- /dev/null
+++ b/v_windows/v/vlib/sync/pool/pool.v
@@ -0,0 +1,165 @@
+module pool
+
+import sync
+import runtime
+
+[trusted]
+fn C.atomic_fetch_add_u32(voidptr, u32) u32
+
+pub const (
+	no_result = voidptr(0)
+)
+
+pub struct PoolProcessor {
+	thread_cb voidptr
+mut:
+	njobs           int
+	items           []voidptr
+	results         []voidptr
+	ntask           u32 // reading/writing to this should be atomic
+	waitgroup       sync.WaitGroup
+	shared_context  voidptr
+	thread_contexts []voidptr
+}
+
+pub type ThreadCB = fn (p &PoolProcessor, idx int, task_id int) voidptr
+
+pub struct PoolProcessorConfig {
+	maxjobs  int
+	callback ThreadCB
+}
+
+// new_pool_processor returns a new PoolProcessor instance.
+// The parameters of new_pool_processor are:
+// context.maxjobs: when 0 (the default), the PoolProcessor will use a
+// number of threads that is optimal for your system to process your items.
+// context.callback: this should be a callback function that each worker
+// thread in the pool will run for each item.
+// The callback function will receive as parameters:
+// 1) the PoolProcessor instance, so it can call
+// p.get_item<int>(idx) to get the actual item at index idx
+// 2) idx - the index of the currently processed item
+// 3) task_id - the index of the worker thread in which the callback
+// function is running.
+pub fn new_pool_processor(context PoolProcessorConfig) &PoolProcessor {
+	if isnil(context.callback) {
+		panic('You need to pass a valid callback to new_pool_processor.')
+	}
+	mut pool := PoolProcessor{
+		items: []
+		results: []
+		shared_context: voidptr(0)
+		thread_contexts: []
+		njobs: context.maxjobs
+		ntask: 0
+		thread_cb: voidptr(context.callback)
+	}
+	pool.waitgroup.init()
+	return &pool
+}
+
+// set_max_jobs gives you the ability to override the number
+// of jobs *after* the PoolProcessor has already been created.
+pub fn (mut pool PoolProcessor) set_max_jobs(njobs int) {
+	pool.njobs = njobs
+}
+
+// work_on_items receives a list of items of type T,
+// then starts a work pool of pool.njobs threads, each running
+// pool.thread_cb in a loop, until all items in the list
+// are processed.
+// When pool.njobs is 0, the number of jobs is determined
+// by the number of available cores on the system.
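+// For example, a typical call could look like this - a sketch only, where
+// the callback `my_cb` and the result struct `MyResult` are hypothetical
+// names, not part of this module:
+//     mut pp := pool.new_pool_processor(callback: my_cb)
+//     pp.work_on_items([10, 20, 30])
+//     results := pp.get_results<MyResult>()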
+// work_on_items returns *after* all threads finish. +// You can optionally call get_results after that. +pub fn (mut pool PoolProcessor) work_on_items<T>(items []T) { + pool.work_on_pointers(unsafe { items.pointers() }) +} + +pub fn (mut pool PoolProcessor) work_on_pointers(items []voidptr) { + mut njobs := runtime.nr_jobs() + if pool.njobs > 0 { + njobs = pool.njobs + } + pool.items = [] + pool.results = [] + pool.thread_contexts = [] + pool.items << items + pool.results = []voidptr{len: (pool.items.len)} + pool.thread_contexts << []voidptr{len: (pool.items.len)} + pool.waitgroup.add(njobs) + for i := 0; i < njobs; i++ { + if njobs > 1 { + go process_in_thread(mut pool, i) + } else { + // do not run concurrently, just use the same thread: + process_in_thread(mut pool, i) + } + } + pool.waitgroup.wait() +} + +// process_in_thread does the actual work of worker thread. +// It is a workaround for the current inability to pass a +// method in a callback. +fn process_in_thread(mut pool PoolProcessor, task_id int) { + cb := ThreadCB(pool.thread_cb) + ilen := pool.items.len + for { + idx := int(C.atomic_fetch_add_u32(&pool.ntask, 1)) + if idx >= ilen { + break + } + pool.results[idx] = cb(pool, idx, task_id) + } + pool.waitgroup.done() +} + +// get_item - called by the worker callback. +// Retrieves a type safe instance of the currently processed item +pub fn (pool &PoolProcessor) get_item<T>(idx int) T { + return *(&T(pool.items[idx])) +} + +// get_result - called by the main thread to get a specific result. +// Retrieves a type safe instance of the produced result. +pub fn (pool &PoolProcessor) get_result<T>(idx int) T { + return *(&T(pool.results[idx])) +} + +// get_results - get a list of type safe results in the main thread. +pub fn (pool &PoolProcessor) get_results<T>() []T { + mut res := []T{} + for i in 0 .. pool.results.len { + res << *(&T(pool.results[i])) + } + return res +} + +// set_shared_context - can be called during the setup so that you can +// provide a context that is shared between all worker threads, like +// common options/settings. +pub fn (mut pool PoolProcessor) set_shared_context(context voidptr) { + pool.shared_context = context +} + +// get_shared_context - can be called in each worker callback, to get +// the context set by pool.set_shared_context +pub fn (pool &PoolProcessor) get_shared_context() voidptr { + return pool.shared_context +} + +// set_thread_context - can be called during the setup at the start of +// each worker callback, so that the worker callback can have some thread +// local storage area where it can write/read information that is private +// to the given thread, without worrying that it will get overwritten by +// another thread +pub fn (mut pool PoolProcessor) set_thread_context(idx int, context voidptr) { + pool.thread_contexts[idx] = context +} + +// get_thread_context - returns a pointer, that was set with +// pool.set_thread_context . This pointer is private to each thread. 
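+// A minimal sketch of the thread context API - `MyTLS` and `njobs` are
+// hypothetical names, used only for illustration:
+//     for i in 0 .. njobs { pool.set_thread_context(i, &MyTLS{}) } // during setup
+//     tls := unsafe { &MyTLS(p.get_thread_context(task_id)) } // in the worker callback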
+pub fn (pool &PoolProcessor) get_thread_context(idx int) voidptr { + return pool.thread_contexts[idx] +} diff --git a/v_windows/v/vlib/sync/pool/pool_test.v b/v_windows/v/vlib/sync/pool/pool_test.v new file mode 100644 index 0000000..629b524 --- /dev/null +++ b/v_windows/v/vlib/sync/pool/pool_test.v @@ -0,0 +1,52 @@ +import time +import sync.pool + +pub struct SResult { + s string +} + +pub struct IResult { + i int +} + +fn worker_s(p &pool.PoolProcessor, idx int, worker_id int) &SResult { + item := p.get_item<string>(idx) + println('worker_s worker_id: $worker_id | idx: $idx | item: $item') + time.sleep(3 * time.millisecond) + return &SResult{'$item $item'} +} + +fn worker_i(p &pool.PoolProcessor, idx int, worker_id int) &IResult { + item := p.get_item<int>(idx) + println('worker_i worker_id: $worker_id | idx: $idx | item: $item') + time.sleep(5 * time.millisecond) + return &IResult{item * 1000} +} + +fn test_work_on_strings() { + mut pool_s := pool.new_pool_processor( + callback: worker_s + maxjobs: 8 + ) + + pool_s.work_on_items(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']) + for x in pool_s.get_results<SResult>() { + println(x.s) + assert x.s.len > 1 + } +} + +fn test_work_on_ints() { + // NB: since maxjobs is left empty here, + // the pool processor will use njobs = runtime.nr_jobs so that + // it will work optimally without overloading the system + mut pool_i := pool.new_pool_processor( + callback: worker_i + ) + + pool_i.work_on_items([1, 2, 3, 4, 5, 6, 7, 8]) + for x in pool_i.get_results<IResult>() { + println(x.i) + assert x.i > 100 + } +} diff --git a/v_windows/v/vlib/sync/select_close_test.v b/v_windows/v/vlib/sync/select_close_test.v new file mode 100644 index 0000000..3a40e2d --- /dev/null +++ b/v_windows/v/vlib/sync/select_close_test.v @@ -0,0 +1,92 @@ +module sync + +import time + +fn do_rec_i64(mut ch Channel) { + mut sum := i64(0) + for i in 0 .. 300 { + if i == 200 { + ch.close() + } + mut a := i64(0) + if ch.pop(&a) { + sum += a + } + } + assert sum == 200 * (200 - 1) / 2 +} + +fn do_send_int(mut ch Channel) { + for i in 0 .. 300 { + ch.push(&i) + } + ch.close() +} + +fn do_send_byte(mut ch Channel) { + for i in 0 .. 300 { + ii := byte(i) + ch.push(&ii) + } + ch.close() +} + +fn do_send_i64(mut ch Channel) { + for i in 0 .. 300 { + ii := i64(i) + ch.push(&ii) + } + ch.close() +} + +fn test_select() { + mut chi := new_channel<int>(0) + mut chl := new_channel<i64>(1) + mut chb := new_channel<byte>(10) + mut recch := new_channel<i64>(0) + go do_rec_i64(mut recch) + go do_send_int(mut chi) + go do_send_byte(mut chb) + go do_send_i64(mut chl) + mut channels := [chi, recch, chl, chb] + directions := [Direction.pop, .push, .pop, .pop] + mut sum := i64(0) + mut rl := i64(0) + mut ri := int(0) + mut rb := byte(0) + mut sl := i64(0) + mut objs := [voidptr(&ri), &sl, &rl, &rb] + for j in 0 .. 
1101 { + idx := channel_select(mut channels, directions, mut objs, time.infinite) + match idx { + 0 { + sum += ri + } + 1 { + sl++ + } + 2 { + sum += rl + } + 3 { + sum += rb + } + -2 { + // channel was closed - last item + assert j == 1100 + } + else { + println('got $idx (timeout)') + assert false + } + } + if j == 1100 { + // check also in other direction + assert idx == -2 + } + } + // Use Gauß' formula for the first 2 contributions + // the 3rd contribution is `byte` and must be seen modulo 256 + expected_sum := 2 * (300 * (300 - 1) / 2) + 256 * (256 - 1) / 2 + 44 * (44 - 1) / 2 + assert sum == expected_sum +} diff --git a/v_windows/v/vlib/sync/struct_chan_init_test.v b/v_windows/v/vlib/sync/struct_chan_init_test.v new file mode 100644 index 0000000..a51ea4b --- /dev/null +++ b/v_windows/v/vlib/sync/struct_chan_init_test.v @@ -0,0 +1,14 @@ +struct Abc { + ch chan int +} + +fn f(st Abc) { + st.ch <- 47 +} + +fn test_chan_init() { + st := Abc{} + go f(st) + i := <-st.ch + assert i == 47 +} diff --git a/v_windows/v/vlib/sync/sync_default.c.v b/v_windows/v/vlib/sync/sync_default.c.v new file mode 100644 index 0000000..76f0b44 --- /dev/null +++ b/v_windows/v/vlib/sync/sync_default.c.v @@ -0,0 +1,193 @@ +// Copyright (c) 2019-2021 Alexander Medvednikov. All rights reserved. +// Use of this source code is governed by an MIT license +// that can be found in the LICENSE file. +module sync + +import time + +// There's no additional linking (-lpthread) needed for Android. +// See https://stackoverflow.com/a/31277163/1904615 +$if !android { + #flag -lpthread +} + +#include <semaphore.h> + +[trusted] +fn C.pthread_mutex_init(voidptr, voidptr) int +fn C.pthread_mutex_lock(voidptr) int +fn C.pthread_mutex_unlock(voidptr) int +fn C.pthread_mutex_destroy(voidptr) int +fn C.pthread_rwlockattr_init(voidptr) int +fn C.pthread_rwlockattr_setkind_np(voidptr, int) int +fn C.pthread_rwlockattr_setpshared(voidptr, int) int +fn C.pthread_rwlock_init(voidptr, voidptr) int +fn C.pthread_rwlock_rdlock(voidptr) int +fn C.pthread_rwlock_wrlock(voidptr) int +fn C.pthread_rwlock_unlock(voidptr) int +fn C.sem_init(voidptr, int, u32) int +fn C.sem_post(voidptr) int +fn C.sem_wait(voidptr) int +fn C.sem_trywait(voidptr) int +fn C.sem_timedwait(voidptr, voidptr) int +fn C.sem_destroy(voidptr) int + +// [init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function. 
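+//
+// A minimal usage sketch for manual locking from calling code (assuming the
+// caller does `import sync`):
+//
+//     mut m := sync.new_mutex()
+//     m.@lock()
+//     // ... critical section ...
+//     m.unlock()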
+[heap] +pub struct Mutex { + mutex C.pthread_mutex_t +} + +[heap] +pub struct RwMutex { + mutex C.pthread_rwlock_t +} + +struct RwMutexAttr { + attr C.pthread_rwlockattr_t +} + +[heap] +struct Semaphore { + sem C.sem_t +} + +pub fn new_mutex() &Mutex { + mut m := &Mutex{} + m.init() + return m +} + +pub fn (mut m Mutex) init() { + C.pthread_mutex_init(&m.mutex, C.NULL) +} + +pub fn new_rwmutex() &RwMutex { + mut m := &RwMutex{} + m.init() + return m +} + +pub fn (mut m RwMutex) init() { + a := RwMutexAttr{} + C.pthread_rwlockattr_init(&a.attr) + // Give writer priority over readers + C.pthread_rwlockattr_setkind_np(&a.attr, C.PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP) + C.pthread_rwlockattr_setpshared(&a.attr, C.PTHREAD_PROCESS_PRIVATE) + C.pthread_rwlock_init(&m.mutex, &a.attr) +} + +// @lock(), for *manual* mutex handling, since `lock` is a keyword +pub fn (mut m Mutex) @lock() { + C.pthread_mutex_lock(&m.mutex) +} + +pub fn (mut m Mutex) unlock() { + C.pthread_mutex_unlock(&m.mutex) +} + +// RwMutex has separate read- and write locks +pub fn (mut m RwMutex) @rlock() { + C.pthread_rwlock_rdlock(&m.mutex) +} + +pub fn (mut m RwMutex) @lock() { + C.pthread_rwlock_wrlock(&m.mutex) +} + +// Windows SRWLocks have different function to unlock +// So provide two functions here, too, to have a common interface +pub fn (mut m RwMutex) runlock() { + C.pthread_rwlock_unlock(&m.mutex) +} + +pub fn (mut m RwMutex) unlock() { + C.pthread_rwlock_unlock(&m.mutex) +} + +[inline] +pub fn new_semaphore() &Semaphore { + return new_semaphore_init(0) +} + +pub fn new_semaphore_init(n u32) &Semaphore { + mut sem := &Semaphore{} + sem.init(n) + return sem +} + +pub fn (mut sem Semaphore) init(n u32) { + C.sem_init(&sem.sem, 0, n) +} + +pub fn (mut sem Semaphore) post() { + C.sem_post(&sem.sem) +} + +pub fn (mut sem Semaphore) wait() { + for { + if C.sem_wait(&sem.sem) == 0 { + return + } + e := C.errno + match e { + C.EINTR { + continue // interrupted by signal + } + else { + panic(unsafe { tos_clone(&byte(C.strerror(C.errno))) }) + } + } + } +} + +// `try_wait()` should return as fast as possible so error handling is only +// done when debugging +pub fn (mut sem Semaphore) try_wait() bool { + $if !debug { + return C.sem_trywait(&sem.sem) == 0 + } $else { + if C.sem_trywait(&sem.sem) != 0 { + e := C.errno + match e { + C.EAGAIN { + return false + } + else { + panic(unsafe { tos_clone(&byte(C.strerror(C.errno))) }) + } + } + } + return true + } +} + +pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool { + t_spec := timeout.timespec() + for { + if C.sem_timedwait(&sem.sem, &t_spec) == 0 { + return true + } + e := C.errno + match e { + C.EINTR { + continue // interrupted by signal + } + C.ETIMEDOUT { + break + } + else { + panic(unsafe { tos_clone(&byte(C.strerror(e))) }) + } + } + } + return false +} + +pub fn (sem Semaphore) destroy() { + res := C.sem_destroy(&sem.sem) + if res == 0 { + return + } + panic(unsafe { tos_clone(&byte(C.strerror(res))) }) +} diff --git a/v_windows/v/vlib/sync/sync_macos.c.v b/v_windows/v/vlib/sync/sync_macos.c.v new file mode 100644 index 0000000..3f83198 --- /dev/null +++ b/v_windows/v/vlib/sync/sync_macos.c.v @@ -0,0 +1,232 @@ +// Copyright (c) 2019-2021 Alexander Medvednikov. All rights reserved. +// Use of this source code is governed by an MIT license +// that can be found in the LICENSE file. 
+module sync + +import time + +#flag -lpthread +#include <semaphore.h> +#include <sys/errno.h> + +[trusted] +fn C.pthread_mutex_init(voidptr, voidptr) int +fn C.pthread_mutex_lock(voidptr) int +fn C.pthread_mutex_unlock(voidptr) int +fn C.pthread_mutex_destroy(voidptr) int +fn C.pthread_rwlockattr_init(voidptr) int +fn C.pthread_rwlockattr_setkind_np(voidptr, int) int +fn C.pthread_rwlockattr_setpshared(voidptr, int) int +fn C.pthread_rwlock_init(voidptr, voidptr) int +fn C.pthread_rwlock_rdlock(voidptr) int +fn C.pthread_rwlock_wrlock(voidptr) int +fn C.pthread_rwlock_unlock(voidptr) int +fn C.pthread_condattr_init(voidptr) int +fn C.pthread_condattr_setpshared(voidptr, int) int +fn C.pthread_condattr_destroy(voidptr) int +fn C.pthread_cond_init(voidptr, voidptr) int +fn C.pthread_cond_signal(voidptr) int +fn C.pthread_cond_wait(voidptr, voidptr) int +fn C.pthread_cond_timedwait(voidptr, voidptr, voidptr) int +fn C.pthread_cond_destroy(voidptr) int + +// [init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function. +[heap] +pub struct Mutex { + mutex C.pthread_mutex_t +} + +[heap] +pub struct RwMutex { + mutex C.pthread_rwlock_t +} + +struct RwMutexAttr { + attr C.pthread_rwlockattr_t +} + +struct CondAttr { + attr C.pthread_condattr_t +} + +/* +MacOSX has no unnamed semaphores and no `timed_wait()` at all + so we emulate the behaviour with other devices +*/ +[heap] +struct Semaphore { + mtx C.pthread_mutex_t + cond C.pthread_cond_t +mut: + count u32 +} + +pub fn new_mutex() &Mutex { + mut m := &Mutex{} + m.init() + return m +} + +pub fn (mut m Mutex) init() { + C.pthread_mutex_init(&m.mutex, C.NULL) +} + +pub fn new_rwmutex() &RwMutex { + mut m := &RwMutex{} + m.init() + return m +} + +pub fn (mut m RwMutex) init() { + a := RwMutexAttr{} + C.pthread_rwlockattr_init(&a.attr) + // Give writer priority over readers + C.pthread_rwlockattr_setkind_np(&a.attr, C.PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP) + C.pthread_rwlockattr_setpshared(&a.attr, C.PTHREAD_PROCESS_PRIVATE) + C.pthread_rwlock_init(&m.mutex, &a.attr) +} + +// @lock(), for *manual* mutex handling, since `lock` is a keyword +pub fn (mut m Mutex) @lock() { + C.pthread_mutex_lock(&m.mutex) +} + +pub fn (mut m Mutex) unlock() { + C.pthread_mutex_unlock(&m.mutex) +} + +// RwMutex has separate read- and write locks +pub fn (mut m RwMutex) @rlock() { + C.pthread_rwlock_rdlock(&m.mutex) +} + +pub fn (mut m RwMutex) @lock() { + C.pthread_rwlock_wrlock(&m.mutex) +} + +// Windows SRWLocks have different function to unlock +// So provide two functions here, too, to have a common interface +pub fn (mut m RwMutex) runlock() { + C.pthread_rwlock_unlock(&m.mutex) +} + +pub fn (mut m RwMutex) unlock() { + C.pthread_rwlock_unlock(&m.mutex) +} + +[inline] +pub fn new_semaphore() &Semaphore { + return new_semaphore_init(0) +} + +pub fn new_semaphore_init(n u32) &Semaphore { + mut sem := &Semaphore{} + sem.init(n) + return sem +} + +pub fn (mut sem Semaphore) init(n u32) { + C.atomic_store_u32(&sem.count, n) + C.pthread_mutex_init(&sem.mtx, C.NULL) + attr := CondAttr{} + C.pthread_condattr_init(&attr.attr) + C.pthread_condattr_setpshared(&attr.attr, C.PTHREAD_PROCESS_PRIVATE) + C.pthread_cond_init(&sem.cond, &attr.attr) + C.pthread_condattr_destroy(&attr.attr) +} + +pub fn (mut sem Semaphore) post() { + mut c := C.atomic_load_u32(&sem.count) + for c > 1 { + if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c + 1) { + return + } + } + C.pthread_mutex_lock(&sem.mtx) + 
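// slow path: the count was 0 or 1, so a waiter may be blocked on the
+	// condition variable - do the increment under the mutex and wake a
+	// waiter if the count was 0 before
+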
c = C.atomic_fetch_add_u32(&sem.count, 1) + if c == 0 { + C.pthread_cond_signal(&sem.cond) + } + C.pthread_mutex_unlock(&sem.mtx) +} + +pub fn (mut sem Semaphore) wait() { + mut c := C.atomic_load_u32(&sem.count) + for c > 0 { + if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) { + return + } + } + C.pthread_mutex_lock(&sem.mtx) + c = C.atomic_load_u32(&sem.count) + + outer: for { + if c == 0 { + C.pthread_cond_wait(&sem.cond, &sem.mtx) + c = C.atomic_load_u32(&sem.count) + } + for c > 0 { + if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) { + if c > 1 { + C.pthread_cond_signal(&sem.cond) + } + break outer + } + } + } + C.pthread_mutex_unlock(&sem.mtx) +} + +pub fn (mut sem Semaphore) try_wait() bool { + mut c := C.atomic_load_u32(&sem.count) + for c > 0 { + if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) { + return true + } + } + return false +} + +pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool { + mut c := C.atomic_load_u32(&sem.count) + for c > 0 { + if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) { + return true + } + } + C.pthread_mutex_lock(&sem.mtx) + t_spec := timeout.timespec() + mut res := 0 + c = C.atomic_load_u32(&sem.count) + + outer: for { + if c == 0 { + res = C.pthread_cond_timedwait(&sem.cond, &sem.mtx, &t_spec) + if res == C.ETIMEDOUT { + break outer + } + c = C.atomic_load_u32(&sem.count) + } + for c > 0 { + if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) { + if c > 1 { + C.pthread_cond_signal(&sem.cond) + } + break outer + } + } + } + C.pthread_mutex_unlock(&sem.mtx) + return res == 0 +} + +pub fn (mut sem Semaphore) destroy() { + mut res := C.pthread_cond_destroy(&sem.cond) + if res == 0 { + res = C.pthread_mutex_destroy(&sem.mtx) + if res == 0 { + return + } + } + panic(unsafe { tos_clone(&byte(C.strerror(res))) }) +} diff --git a/v_windows/v/vlib/sync/sync_windows.c.v b/v_windows/v/vlib/sync/sync_windows.c.v new file mode 100644 index 0000000..d0942b7 --- /dev/null +++ b/v_windows/v/vlib/sync/sync_windows.c.v @@ -0,0 +1,212 @@ +// Copyright (c) 2019-2021 Alexander Medvednikov. All rights reserved. +// Use of this source code is governed by an MIT license +// that can be found in the LICENSE file. +module sync + +import time + +#include <synchapi.h> +#include <time.h> + +fn C.GetSystemTimeAsFileTime(lpSystemTimeAsFileTime &C._FILETIME) +fn C.InitializeConditionVariable(voidptr) +fn C.WakeConditionVariable(voidptr) +fn C.SleepConditionVariableSRW(voidptr, voidptr, u32, u32) int + +// TODO: The suggestion of using CriticalSection instead of mutex +// was discussed. Needs consideration. + +// Mutex HANDLE +type MHANDLE = voidptr + +// Semaphore HANDLE +type SHANDLE = voidptr + +//[init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function. 
+ +// `SRWLOCK` is much more performant that `Mutex` on Windows, so use that in both cases since we don't want to share with other processes +[heap] +pub struct Mutex { +mut: + mx C.SRWLOCK // mutex handle +} + +[heap] +pub struct RwMutex { +mut: + mx C.SRWLOCK // mutex handle +} + +[heap] +struct Semaphore { + mtx C.SRWLOCK + cond C.CONDITION_VARIABLE +mut: + count u32 +} + +pub fn new_mutex() &Mutex { + mut m := &Mutex{} + m.init() + return m +} + +pub fn new_rwmutex() &RwMutex { + mut m := &RwMutex{} + m.init() + return m +} + +pub fn (mut m Mutex) init() { + C.InitializeSRWLock(&m.mx) +} + +pub fn (mut m RwMutex) init() { + C.InitializeSRWLock(&m.mx) +} + +pub fn (mut m Mutex) @lock() { + C.AcquireSRWLockExclusive(&m.mx) +} + +pub fn (mut m Mutex) unlock() { + C.ReleaseSRWLockExclusive(&m.mx) +} + +// RwMutex has separate read- and write locks +pub fn (mut m RwMutex) @rlock() { + C.AcquireSRWLockShared(&m.mx) +} + +pub fn (mut m RwMutex) @lock() { + C.AcquireSRWLockExclusive(&m.mx) +} + +// Windows SRWLocks have different function to unlock +// So provide two functions here, too, to have a common interface +pub fn (mut m RwMutex) runlock() { + C.ReleaseSRWLockShared(&m.mx) +} + +pub fn (mut m RwMutex) unlock() { + C.ReleaseSRWLockExclusive(&m.mx) +} + +pub fn (mut m Mutex) destroy() { + // nothing to do +} + +[inline] +pub fn new_semaphore() &Semaphore { + return new_semaphore_init(0) +} + +pub fn new_semaphore_init(n u32) &Semaphore { + mut sem := &Semaphore{} + sem.init(n) + return sem +} + +pub fn (mut sem Semaphore) init(n u32) { + C.atomic_store_u32(&sem.count, n) + C.InitializeSRWLock(&sem.mtx) + C.InitializeConditionVariable(&sem.cond) +} + +pub fn (mut sem Semaphore) post() { + mut c := C.atomic_load_u32(&sem.count) + for c > 1 { + if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c + 1) { + return + } + } + C.AcquireSRWLockExclusive(&sem.mtx) + c = C.atomic_fetch_add_u32(&sem.count, 1) + if c == 0 { + C.WakeConditionVariable(&sem.cond) + } + C.ReleaseSRWLockExclusive(&sem.mtx) +} + +pub fn (mut sem Semaphore) wait() { + mut c := C.atomic_load_u32(&sem.count) + for c > 0 { + if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) { + return + } + } + C.AcquireSRWLockExclusive(&sem.mtx) + c = C.atomic_load_u32(&sem.count) + + outer: for { + if c == 0 { + C.SleepConditionVariableSRW(&sem.cond, &sem.mtx, C.INFINITE, 0) + c = C.atomic_load_u32(&sem.count) + } + for c > 0 { + if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) { + if c > 1 { + C.WakeConditionVariable(&sem.cond) + } + break outer + } + } + } + C.ReleaseSRWLockExclusive(&sem.mtx) +} + +pub fn (mut sem Semaphore) try_wait() bool { + mut c := C.atomic_load_u32(&sem.count) + for c > 0 { + if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) { + return true + } + } + return false +} + +pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool { + mut c := C.atomic_load_u32(&sem.count) + for c > 0 { + if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) { + return true + } + } + mut ft_start := C._FILETIME{} + C.GetSystemTimeAsFileTime(&ft_start) + time_end := ((u64(ft_start.dwHighDateTime) << 32) | ft_start.dwLowDateTime) + + u64(timeout / (100 * time.nanosecond)) + mut t_ms := timeout.sys_milliseconds() + C.AcquireSRWLockExclusive(&sem.mtx) + mut res := 0 + c = C.atomic_load_u32(&sem.count) + + outer: for { + if c == 0 { + res = C.SleepConditionVariableSRW(&sem.cond, &sem.mtx, t_ms, 0) + if res == 0 { + break outer + } + c = C.atomic_load_u32(&sem.count) + } + for c > 0 { + if 
C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) {
+				res = 1 // the semaphore was obtained, so this is a success even without sleeping
+				if c > 1 {
+					C.WakeConditionVariable(&sem.cond)
+				}
+				break outer
+			}
+		}
+		C.GetSystemTimeAsFileTime(&ft_start)
+		time_now := ((u64(ft_start.dwHighDateTime) << 32) | ft_start.dwLowDateTime) // in units of 100ns
+		if time_now > time_end {
+			break outer // timeout exceeded
+		}
+		t_ms = u32((time_end - time_now) / 10000)
+	}
+	C.ReleaseSRWLockExclusive(&sem.mtx)
+	return res != 0
+}
+
+pub fn (s Semaphore) destroy() {
+}
diff --git a/v_windows/v/vlib/sync/threads/threads.c.v b/v_windows/v/vlib/sync/threads/threads.c.v
new file mode 100644
index 0000000..02506c2
--- /dev/null
+++ b/v_windows/v/vlib/sync/threads/threads.c.v
@@ -0,0 +1,13 @@
+module threads
+
+// This module adds the necessary compiler flags for using threads.
+// It is automatically imported by code that does `go func()`.
+// See vlib/v/parser/pratt.v, search for ast.GoExpr.
+// The goal is that programs that do not use threads at all will not
+// need to link to -lpthread etc.
+// NB: on some platforms like Android, linking -lpthread is not needed either.
+// See https://stackoverflow.com/a/31277163/1904615
+
+$if !windows && !android {
+	#flag -lpthread
+}
diff --git a/v_windows/v/vlib/sync/threads/threads.v b/v_windows/v/vlib/sync/threads/threads.v
new file mode 100644
index 0000000..f20fc0e
--- /dev/null
+++ b/v_windows/v/vlib/sync/threads/threads.v
@@ -0,0 +1,4 @@
+module threads
+
+// This file is just a placeholder.
+// The actual implementation is backend/platform specific, so see threads.c.v, threads.js.v etc.
diff --git a/v_windows/v/vlib/sync/waitgroup.v b/v_windows/v/vlib/sync/waitgroup.v
new file mode 100644
index 0000000..3e9ac39
--- /dev/null
+++ b/v_windows/v/vlib/sync/waitgroup.v
@@ -0,0 +1,84 @@
+// Copyright (c) 2019 Alexander Medvednikov. All rights reserved.
+// Use of this source code is governed by an MIT license
+// that can be found in the LICENSE file.
+module sync
+
+[trusted]
+fn C.atomic_fetch_add_u32(voidptr, u32) u32
+
+[trusted]
+fn C.atomic_load_u32(voidptr) u32
+
+[trusted]
+fn C.atomic_compare_exchange_weak_u32(voidptr, voidptr, u32) bool
+
+// WaitGroup
+// Do not copy an instance of WaitGroup, use a ref instead.
+//
+// usage: in main thread:
+// `wg := sync.new_waitgroup()`
+// `wg.add(nr_jobs)` before starting jobs with `go ...`
+// `wg.wait()` to wait for all jobs to have finished
+//
+// in each parallel job:
+// `wg.done()` when finished
+//
+// [init_with=new_waitgroup] // TODO: implement support for the init_with struct attribute, and disallow WaitGroup{} from outside the sync.new_waitgroup() function.
+[heap]
+struct WaitGroup {
+mut:
+	task_count u32 // current task count - reading/writing should be atomic
+	wait_count u32 // current wait count - reading/writing should be atomic
+	sem Semaphore // this blocks wait() until task_count is released (reaches zero) by add()
+}
+
+pub fn new_waitgroup() &WaitGroup {
+	mut wg := WaitGroup{}
+	wg.init()
+	return &wg
+}
+
+pub fn (mut wg WaitGroup) init() {
+	wg.sem.init(0)
+}
+
+// add increments (+ve delta) or decrements (-ve delta) the task count by delta
+// and unblocks any wait() calls if the task count becomes zero.
+// add panics if the task count drops below zero.
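+//
+// For example (illustrative only - `worker` stands for any fn that calls wg.done()):
+//     wg.add(3)         // before spawning the threads
+//     go worker(mut wg) // spawned three times, each calls wg.done() when finished
+//     wg.wait()         // unblocks after the third done()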
+pub fn (mut wg WaitGroup) add(delta int) {
+	old_nrjobs := int(C.atomic_fetch_add_u32(&wg.task_count, u32(delta)))
+	new_nrjobs := old_nrjobs + delta
+	mut num_waiters := C.atomic_load_u32(&wg.wait_count)
+	if new_nrjobs < 0 {
+		panic('Negative number of jobs in waitgroup')
+	}
+
+	if new_nrjobs == 0 && num_waiters > 0 {
+		// clear waiters
+		for !C.atomic_compare_exchange_weak_u32(&wg.wait_count, &num_waiters, 0) {
+			if num_waiters == 0 {
+				return
+			}
+		}
+		for num_waiters > 0 {
+			wg.sem.post()
+			num_waiters--
+		}
+	}
+}
+
+// done is a convenience fn for add(-1)
+pub fn (mut wg WaitGroup) done() {
+	wg.add(-1)
+}
+
+// wait blocks until all tasks are done (task count becomes zero)
+pub fn (mut wg WaitGroup) wait() {
+	nrjobs := int(C.atomic_load_u32(&wg.task_count))
+	if nrjobs == 0 {
+		// no need to wait
+		return
+	}
+	C.atomic_fetch_add_u32(&wg.wait_count, 1)
+	wg.sem.wait() // blocks until task_count becomes 0
+}
diff --git a/v_windows/v/vlib/sync/waitgroup_test.v b/v_windows/v/vlib/sync/waitgroup_test.v
new file mode 100644
index 0000000..493665f
--- /dev/null
+++ b/v_windows/v/vlib/sync/waitgroup_test.v
@@ -0,0 +1,41 @@
+module sync
+
+import time
+
+fn test_waitgroup_reuse() {
+	mut wg := new_waitgroup()
+
+	wg.add(1)
+	wg.done()
+
+	wg.add(1)
+	mut executed := false
+	go fn (mut wg WaitGroup, executed voidptr) {
+		defer {
+			wg.done()
+		}
+		unsafe {
+			*(&bool(executed)) = true
+		}
+		time.sleep(100 * time.millisecond)
+		assert wg.wait_count == 1
+	}(mut wg, voidptr(&executed))
+
+	wg.wait()
+	assert executed
+	assert wg.wait_count == 0
+}
+
+fn test_waitgroup_no_use() {
+	mut done := false
+	go fn (done voidptr) {
+		time.sleep(1 * time.second)
+		if *(&bool(done)) == false {
+			panic('test_waitgroup_no_use did not complete in time')
+		}
+	}(voidptr(&done))
+
+	mut wg := new_waitgroup()
+	wg.wait()
+	done = true
+}