diff options
| author | Indrajith K L | 2022-12-03 17:00:20 +0530 | 
|---|---|---|
| committer | Indrajith K L | 2022-12-03 17:00:20 +0530 | 
| commit | f5c4671bfbad96bf346bd7e9a21fc4317b4959df (patch) | |
| tree | 2764fc62da58f2ba8da7ed341643fc359873142f /v_windows/v/old/vlib/sync | |
| download | cli-tools-windows-f5c4671bfbad96bf346bd7e9a21fc4317b4959df.tar.gz cli-tools-windows-f5c4671bfbad96bf346bd7e9a21fc4317b4959df.tar.bz2 cli-tools-windows-f5c4671bfbad96bf346bd7e9a21fc4317b4959df.zip | |
Diffstat (limited to 'v_windows/v/old/vlib/sync')
39 files changed, 3360 insertions, 0 deletions
| diff --git a/v_windows/v/old/vlib/sync/array_rlock_test.v b/v_windows/v/old/vlib/sync/array_rlock_test.v new file mode 100644 index 0000000..ad4f778 --- /dev/null +++ b/v_windows/v/old/vlib/sync/array_rlock_test.v @@ -0,0 +1,38 @@ +fn test_shared_modification() { +	shared foo := &[2, 0, 5] +	lock foo { +		unsafe { +			foo[1] = 3 +			foo[0] *= 7 +			foo[1]-- +			foo[2] -= 2 +		} +	} +	rlock foo { +		unsafe { +			assert foo[0] == 14 +			assert foo[1] == 2 +			assert foo[2] == 3 +		} +	} +} + +[direct_array_access] +fn test_shared_direct_modification() { +	shared foo := &[2, 0, 5] +	lock foo { +		unsafe { +			foo[1] = 3 +			foo[0] *= 7 +			foo[1]-- +			foo[2] -= 2 +		} +	} +	rlock foo { +		unsafe { +			assert foo[0] == 14 +			assert foo[1] == 2 +			assert foo[2] == 3 +		} +	} +} diff --git a/v_windows/v/old/vlib/sync/atomic2/atomic.v b/v_windows/v/old/vlib/sync/atomic2/atomic.v new file mode 100644 index 0000000..2ff64f2 --- /dev/null +++ b/v_windows/v/old/vlib/sync/atomic2/atomic.v @@ -0,0 +1,88 @@ +module atomic2 + +/* +Implements the atomic operations. For now TCC does not support +the atomic versions on nix so it uses locks to simulate the same behavor. +On windows tcc can simulate with other atomic operations. + +The @VEXEROOT/thirdparty/stdatomic contains compability header files +for stdatomic that supports both nix, windows and c++. + +This implementations should be regarded as alpha stage and be +further tested. +*/ + +#flag windows -I @VEXEROOT/thirdparty/stdatomic/win +#flag linux -I @VEXEROOT/thirdparty/stdatomic/nix +#flag darwin -I @VEXEROOT/thirdparty/stdatomic/nix +#flag freebsd -I @VEXEROOT/thirdparty/stdatomic/nix +#flag solaris -I @VEXEROOT/thirdparty/stdatomic/nix + +$if linux { +	$if tinyc { +		$if amd64 { +			// most Linux distributions have /usr/lib/libatomic.so, but Ubuntu uses gcc version specific dir +			#flag -L/usr/lib/gcc/x86_64-linux-gnu/6 +			#flag -L/usr/lib/gcc/x86_64-linux-gnu/7 +			#flag -L/usr/lib/gcc/x86_64-linux-gnu/8 +			#flag -L/usr/lib/gcc/x86_64-linux-gnu/9 +			#flag -L/usr/lib/gcc/x86_64-linux-gnu/10 +			#flag -L/usr/lib/gcc/x86_64-linux-gnu/11 +			#flag -L/usr/lib/gcc/x86_64-linux-gnu/12 +		} $else $if arm64 { +			#flag -L/usr/lib/gcc/aarch64-linux-gnu/6 +			#flag -L/usr/lib/gcc/aarch64-linux-gnu/7 +			#flag -L/usr/lib/gcc/aarch64-linux-gnu/8 +			#flag -L/usr/lib/gcc/aarch64-linux-gnu/9 +			#flag -L/usr/lib/gcc/aarch64-linux-gnu/10 +			#flag -L/usr/lib/gcc/aarch64-linux-gnu/11 +			#flag -L/usr/lib/gcc/aarch64-linux-gnu/12 +		} +		#flag -latomic +	} +} +#include <atomic.h> +// add_u64 adds provided delta as an atomic operation +pub fn add_u64(ptr &u64, delta int) bool { +	res := C.atomic_fetch_add_u64(voidptr(ptr), delta) +	return res == 0 +} + +// sub_u64 subtracts provided delta as an atomic operation +pub fn sub_u64(ptr &u64, delta int) bool { +	res := C.atomic_fetch_sub_u64(voidptr(ptr), delta) +	return res == 0 +} + +// add_i64 adds provided delta as an atomic operation +pub fn add_i64(ptr &i64, delta int) bool { +	res := C.atomic_fetch_add_u64(voidptr(ptr), delta) +	return res == 0 +} + +// add_i64 subtracts provided delta as an atomic operation +pub fn sub_i64(ptr &i64, delta int) bool { +	res := C.atomic_fetch_sub_u64(voidptr(ptr), delta) +	return res == 0 +} + +// atomic store/load operations have to be used when there might be another concurrent access +// atomicall set a value +pub fn store_u64(ptr &u64, val u64) { +	C.atomic_store_u64(voidptr(ptr), val) +} + +// atomicall get a value +pub fn load_u64(ptr &u64) u64 { +	return C.atomic_load_u64(voidptr(ptr)) +} + +// atomicall set a value +pub fn store_i64(ptr &i64, val i64) { +	C.atomic_store_u64(voidptr(ptr), val) +} + +// atomicall get a value +pub fn load_i64(ptr &i64) i64 { +	return i64(C.atomic_load_u64(voidptr(ptr))) +} diff --git a/v_windows/v/old/vlib/sync/atomic2/atomic_test.v b/v_windows/v/old/vlib/sync/atomic2/atomic_test.v new file mode 100644 index 0000000..7a5ffd8 --- /dev/null +++ b/v_windows/v/old/vlib/sync/atomic2/atomic_test.v @@ -0,0 +1,105 @@ +import sync.atomic2 +import sync + +const ( +	iterations_per_cycle = 100_000 +) + +struct Counter { +mut: +	counter u64 +} + +// without proper syncronization this would fail +fn test_count_10_times_1_cycle_should_result_10_cycles_with_sync() { +	desired_iterations := 10 * iterations_per_cycle +	mut wg := sync.new_waitgroup() +	mut counter := &Counter{} +	wg.add(10) +	for i := 0; i < 10; i++ { +		go count_one_cycle(mut counter, mut wg) +	} +	wg.wait() +	assert counter.counter == desired_iterations +	eprintln('   with synchronization the counter is: ${counter.counter:10} , expectedly == ${desired_iterations:10}') +} + +// This test just to make sure that we have an anti-test to prove it works +fn test_count_10_times_1_cycle_should_not_be_10_cycles_without_sync() { +	desired_iterations := 10 * iterations_per_cycle +	mut wg := sync.new_waitgroup() +	mut counter := &Counter{} +	wg.add(10) +	for i := 0; i < 10; i++ { +		go count_one_cycle_without_sync(mut counter, mut wg) +	} +	wg.wait() +	// NB: we do not assert here, just print, because sometimes by chance counter.counter may be == desired_iterations +	eprintln('without synchronization the counter is: ${counter.counter:10} , expectedly != ${desired_iterations:10}') +} + +fn test_count_plus_one_u64() { +	mut c := u64(0) +	atomic2.add_u64(&c, 1) +	assert atomic2.load_u64(&c) == 1 +} + +fn test_count_plus_one_i64() { +	mut c := i64(0) +	atomic2.add_i64(&c, 1) +	assert atomic2.load_i64(&c) == 1 +} + +fn test_count_plus_greater_than_one_u64() { +	mut c := u64(0) +	atomic2.add_u64(&c, 10) +	assert atomic2.load_u64(&c) == 10 +} + +fn test_count_plus_greater_than_one_i64() { +	mut c := i64(0) +	atomic2.add_i64(&c, 10) +	assert atomic2.load_i64(&c) == 10 +} + +fn test_count_minus_one_u64() { +	mut c := u64(1) +	atomic2.sub_u64(&c, 1) +	assert atomic2.load_u64(&c) == 0 +} + +fn test_count_minus_one_i64() { +	mut c := i64(0) +	atomic2.sub_i64(&c, 1) +	assert atomic2.load_i64(&c) == -1 +} + +fn test_count_minus_greater_than_one_u64() { +	mut c := u64(0) +	atomic2.store_u64(&c, 10) +	atomic2.sub_u64(&c, 10) +	assert atomic2.load_u64(&c) == 0 +} + +fn test_count_minus_greater_than_one_i64() { +	mut c := i64(0) +	atomic2.store_i64(&c, 10) +	atomic2.sub_i64(&c, 20) +	assert atomic2.load_i64(&c) == -10 +} + +// count_one_cycle counts the common counter iterations_per_cycle times in thread-safe way +fn count_one_cycle(mut counter Counter, mut group sync.WaitGroup) { +	for i := 0; i < iterations_per_cycle; i++ { +		atomic2.add_u64(&counter.counter, 1) +	} +	group.done() +} + +// count_one_cycle_without_sync counts the common counter iterations_per_cycle times in none thread-safe way +fn count_one_cycle_without_sync(mut counter Counter, mut group sync.WaitGroup) { +	for i := 0; i < iterations_per_cycle; i++ { +		counter.counter++ +	} +	group.done() +} diff --git a/v_windows/v/old/vlib/sync/bench/channel_bench_go.go b/v_windows/v/old/vlib/sync/bench/channel_bench_go.go new file mode 100644 index 0000000..a0afbbc --- /dev/null +++ b/v_windows/v/old/vlib/sync/bench/channel_bench_go.go @@ -0,0 +1,68 @@ +package main + +import "fmt" +import "log" +import "os" +import "time" +import "strconv" + +func assert_eq(a, b int64) { +	if a != b { +		log.Fatalf("assertion failed\nleft: %d, right: %d\n", a, b) +	} +} + +func do_rec(ch chan int32, resch chan int64, n int32) { +	var sum int64 +	var i int32 +	for i = 0; i < n; i++ { +		sum += int64(<- ch) +	} +	fmt.Println(sum) +	resch <- sum +} + +func do_send(ch chan int32, start, end int32) { +	for i := start; i < end; i++ { +		ch <- i +	} +} + +func main() { +	if len(os.Args) != 5 { +		log.Fatalf("usage:\n\t%s <nsend> <nrec> <buflen> <nobj>\n", os.Args[0]) +	} +	nsend, _ := strconv.Atoi(os.Args[1]) +	nrec, _ := strconv.Atoi(os.Args[2]) +	buflen, _ := strconv.Atoi(os.Args[3]) +	nobj, _ := strconv.Atoi(os.Args[4]) +	stopwatch := time.Now() +	ch := make(chan int32, buflen) +	resch := make(chan int64, 0) +	no := nobj +	for i := 0; i < nrec; i++ { +		n := no / (nrec - i) +		go do_rec(ch, resch, int32(n)) +		no -= n +	} +	assert_eq(int64(no), 0)	 +	no = nobj +	for i := 0; i < nsend; i++ { +		n := no / (nsend - i) +		end := no +		no -= n +		go do_send(ch, int32(no), int32(end)) +	} +	assert_eq(int64(no), 0)	 +	var sum int64 +	for i := 0; i < nrec; i++ { +		sum += <-resch +	} +	elapsed := time.Now().Sub(stopwatch) +	rate := float64(nobj)/float64(elapsed.Nanoseconds())*1000.0 +	duration := 1.0e-09 * float64(elapsed.Nanoseconds()) +	fmt.Printf("%d objects in %g s (%.2f objs/µs)\n", nobj, duration, rate) +	expected_sum := int64(nobj)*int64(nobj-1)/2 +	fmt.Printf("got: %d, expected: %d\n", sum, expected_sum) +	assert_eq(sum, expected_sum) +} diff --git a/v_windows/v/old/vlib/sync/bench/channel_bench_v.v b/v_windows/v/old/vlib/sync/bench/channel_bench_v.v new file mode 100644 index 0000000..54dcfe9 --- /dev/null +++ b/v_windows/v/old/vlib/sync/bench/channel_bench_v.v @@ -0,0 +1,64 @@ +// Channel Benchmark +// +// `nobj` integers are sent thru a channel with queue length`buflen` +// using `nsend` sender threads and `nrec` receiver threads. +// +// The receive threads add all received numbers and send them to the +// main thread where the total sum is compare to the expected value. +import time +import os + +fn do_rec(ch chan int, resch chan i64, n int) { +	mut sum := i64(0) +	for _ in 0 .. n { +		sum += <-ch +	} +	println(sum) +	resch <- sum +} + +fn do_send(ch chan int, start int, end int) { +	for i in start .. end { +		ch <- i +	} +} + +fn main() { +	if os.args.len != 5 { +		eprintln('usage:\n\t${os.args[0]} <nsend> <nrec> <buflen> <nobj>') +		exit(1) +	} +	nsend := os.args[1].int() +	nrec := os.args[2].int() +	buflen := os.args[3].int() +	nobj := os.args[4].int() +	stopwatch := time.new_stopwatch() +	ch := chan int{cap: buflen} +	resch := chan i64{} +	mut no := nobj +	for i in 0 .. nrec { +		n := no / (nrec - i) +		go do_rec(ch, resch, n) +		no -= n +	} +	assert no == 0 +	no = nobj +	for i in 0 .. nsend { +		n := no / (nsend - i) +		end := no +		no -= n +		go do_send(ch, no, end) +	} +	assert no == 0 +	mut sum := i64(0) +	for _ in 0 .. nrec { +		sum += <-resch +	} +	elapsed := stopwatch.elapsed() +	rate := f64(nobj) / elapsed * time.microsecond +	println('$nobj objects in ${f64(elapsed) / time.second} s (${rate:.2f} objs/µs)') +	// use sum formula by Gauß to calculate the expected result +	expected_sum := i64(nobj) * (nobj - 1) / 2 +	println('got: $sum, expected: $expected_sum') +	assert sum == expected_sum +} diff --git a/v_windows/v/old/vlib/sync/bench/many_writers_and_receivers_on_1_channel.v b/v_windows/v/old/vlib/sync/bench/many_writers_and_receivers_on_1_channel.v new file mode 100644 index 0000000..999bb1d --- /dev/null +++ b/v_windows/v/old/vlib/sync/bench/many_writers_and_receivers_on_1_channel.v @@ -0,0 +1,150 @@ +import os +import os.cmdline +import time +import sync + +// Usage: +// many_writers_and_receivers_on_1_channel [-readers 1] [-writers 4] [-chan_cap 100] [-iterations 25000] > results.csv +// +// You can then open results.csv in Excel/Calc and for example plot the first vs the second column. +enum EventKind { +	push +	pop +} + +struct Event { +	is_set  bool +	id      int +	gtime   u64 // nanoseconds +	i       int +	kind    EventKind +	elapsed i64 // nanoseconds, elapsed after the previous event of the same kind +} + +struct Context { +mut: +	n_iters   int +	n_readers int +	n_writers int +	// +	pops_wg &sync.WaitGroup +	pops    []Event +	// +	pushes_wg &sync.WaitGroup +	pushes    []Event +} + +fn do_rec(ch chan int, id int, mut ctx Context) { +	eprintln('start of  do_rec id: $id') +	mut timer_sw_x := time.new_stopwatch() +	mut tmp := int(0) +	mut i := int(0) +	// NB: a single receiver thread can get slightly more +	// than its fair share of sends, that is why +	// the receiver's Event array is much larger, +	// enough so a single receiver can potentially process all +	// writers pushes, and it is partitioned over all of +	// id, ctx.n_writers and n_iters: +	n_iters := ctx.n_iters +	base := id * n_iters * ctx.n_writers +	for { +		for ch.try_pop(tmp) == .success { +			ctx.pops[base + i] = Event{ +				is_set: true +				id: id +				gtime: time.sys_mono_now() +				i: i +				kind: .pop +				elapsed: timer_sw_x.elapsed().nanoseconds() +			} +			timer_sw_x.restart() +			i++ +			if tmp == 1 { +				ctx.pops_wg.done() +				return +			} +		} +	} +} + +fn do_send(ch chan int, id int, mut ctx Context) { +	eprintln('start of do_send id: $id') +	mut timer_sw_x := time.new_stopwatch() +	n_iters := ctx.n_iters +	base := n_iters * id // sender events can not overlap +	for i := 0; i < n_iters; i++ { +		idx := base + i +		ctx.pushes[idx] = Event{ +			is_set: true +			id: id +			gtime: time.sys_mono_now() +			i: i +			kind: .push +			elapsed: timer_sw_x.elapsed().nanoseconds() +		} +		timer_sw_x.restart() +		tmp := int(0) +		ch <- tmp +	} +	ctx.pushes_wg.done() +} + +fn main() { +	// +	args := os.args[1..] +	if '-h' in args || '--help' in args { +		eprintln('Usage:\n many_writers_and_receivers_on_1_channel [-readers 1] [-writers 4] [-chan_cap 100] [-iterations 25000]') +		exit(0) +	} +	n_iters := cmdline.option(args, '-iterations', '25000').int() +	n_readers := cmdline.option(args, '-readers', '1').int() +	n_writers := cmdline.option(args, '-writers', '4').int() +	chan_cap := cmdline.option(args, '-chan_cap', '100').int() +	eprintln('> n_iters, $n_iters, n_writers, $n_writers, n_readers, $n_readers, chan_cap, $chan_cap') +	// +	ch := chan int{cap: chan_cap} +	max_number_of_pushes := n_writers * (n_iters + 2) +	max_number_of_pops := max_number_of_pushes * n_readers +	eprintln('> max_number_of_pushes, $max_number_of_pushes, max_number_of_pops (per receiver), $max_number_of_pops') +	mut ctx := &Context{ +		n_iters: n_iters +		n_readers: n_readers +		n_writers: n_writers +		pushes_wg: sync.new_waitgroup() +		pops_wg: sync.new_waitgroup() +		pushes: []Event{len: max_number_of_pushes} +		pops: []Event{len: max_number_of_pops} +	} +	ctx.pops_wg.add(n_readers) +	for i := 0; i < n_readers; i++ { +		go do_rec(ch, i, mut ctx) +	} +	ctx.pushes_wg.add(n_writers) +	for i := 0; i < n_writers; i++ { +		go do_send(ch, i, mut ctx) +	} +	ctx.pushes_wg.wait() +	eprintln('>> all pushes done') +	for i := 0; i < n_readers; i++ { +		ch <- 1 +	} +	ctx.pops_wg.wait() +	eprintln('>> all pops done') +	mut all_events := []Event{} +	all_events << ctx.pops +	all_events << ctx.pushes +	all_events.sort(a.elapsed < b.elapsed) +	mut i := 0 +	for e in all_events { +		if !e.is_set { +			continue +		} +		i++ +		if e.kind == .pop { +			println('${i:8} , ${e.elapsed:10}, ns ,  do_rec id:, ${e.id:3} , i=, ${e.i:5} , ${e.gtime:20}') +		} +		if e.kind == .push { +			println('${i:8} , ${e.elapsed:10}, ns , do_send id:, ${e.id:3} , i=, ${e.i:5} , ${e.gtime:20}') +		} +	} +} diff --git a/v_windows/v/old/vlib/sync/bench/results.md b/v_windows/v/old/vlib/sync/bench/results.md new file mode 100644 index 0000000..882471c --- /dev/null +++ b/v_windows/v/old/vlib/sync/bench/results.md @@ -0,0 +1,48 @@ +# Channel Benchmark Results + +This documents lists several benchmark results for different platforms in order to +identify performance regressions and improvements. + +The are measured using the command + +``` +> channel_bench_* <nsend> <nrec> <buflen> <nobj> + +nsend ... number of threads that push objects into the channel +nrec .... number of threads that pop objects from the channel +buflen .. length of channel buffer queue - `0` means unbuffered channel +nobj .... number of objects to pass thru the channel +``` + +## AMD Ryzen 7 3800X, Ubuntu-20.04 x86_64 + +10000000 Objects transfered, results in Objects/µs + +| nsend | nrec | buflen | **V (gcc -O2)** | **V (clang)** | **V (tcc)** | **Go (golang)** | **Go (gccgo -O2)** | +| :---: | :---:| :---:  |      :---:      |    :---:      |    :---:    |     :---:      |      :---:         | +|   1   |   1  |    0   |      1.97       |    1.63       |    2.08     |      4.65      |       0.56         | +|   1   |   1  |   100  |      3.05       |    2.29       |    1.93     |     18.90      |       6.08         | +|   4   |   4  |    0   |      0.87       |    0.90       |    0.99     |      1.84      |       0.84         | +|   4   |   4  |   100  |      3.35       |    3.07       |    2.92     |      7.43      |       3.71         | + +## AMD Ryzen 7 3800X, Windows 10 2004 x64 + +| nsend | nrec | buflen | **V (gcc -O2)**  | **V (msvc /O2)** | **V (tcc)** | **Go (golang)** | +| :---: | :---:| :---:  |      :---:       |      :---:       |    :---:    |     :---:       | +|   1   |   1  |    0   |      2.30        |      3.76        |    2.02     |      4.67       | +|   1   |   1  |   100  |      2.96        |      3.12        |    2.26     |     23.31       | +|   4   |   4  |    0   |      0.90        |      1.05        |    0.83     |      1.38       | +|   4   |   4  |   100  |      2.28        |      2.16        |    2.43     |      4.63       | + +## Raspberry Pi 3B+, Void Linux musl 32 bit + +10000000 Objects transfered, results in Objects/µs + +| nsend | nrec | buflen | **V (gcc -O2)** | **Go (golang)** | +| :---: | :---:| :---:  |      :---:      |     :---:      | +|   1   |   1  |    0   |      0.37       |     0.21       | +|   1   |   1  |   100  |      1.03       |     0.74       | +|   4   |   4  |    0   |      0.04       |     0.38       | +|   4   |   4  |   100  |      2.78       |     2.63       | +|   2   |   2  |    0   |      0.05       |     0.38       | +|   2   |   2  |   100  |      1.26       |     0.75       | diff --git a/v_windows/v/old/vlib/sync/channel_1_test.v b/v_windows/v/old/vlib/sync/channel_1_test.v new file mode 100644 index 0000000..17588fd --- /dev/null +++ b/v_windows/v/old/vlib/sync/channel_1_test.v @@ -0,0 +1,25 @@ +const ( +	num_iterations = 10000 +) + +fn do_send(ch chan int) { +	for i in 0 .. num_iterations { +		ch <- i +	} +} + +fn test_channel_buffered() { +	ch := chan int{cap: 1000} +	go do_send(ch) +	mut sum := i64(0) +	for _ in 0 .. num_iterations { +		sum += <-ch +	} +	assert sum == u64(num_iterations) * (num_iterations - 1) / 2 +} + +fn test_builtin_enum() { +	x := ChanState.closed +	assert x == .closed +	println(x) +} diff --git a/v_windows/v/old/vlib/sync/channel_2_test.v b/v_windows/v/old/vlib/sync/channel_2_test.v new file mode 100644 index 0000000..5e8251d --- /dev/null +++ b/v_windows/v/old/vlib/sync/channel_2_test.v @@ -0,0 +1,19 @@ +const ( +	num_iterations = 10000 +) + +fn do_send(ch chan int) { +	for i in 0 .. num_iterations { +		ch <- i +	} +} + +fn test_channel_unbuffered() { +	ch := chan int{} +	go do_send(ch) +	mut sum := i64(0) +	for _ in 0 .. num_iterations { +		sum += <-ch +	} +	assert sum == u64(num_iterations) * (num_iterations - 1) / 2 +} diff --git a/v_windows/v/old/vlib/sync/channel_3_test.v b/v_windows/v/old/vlib/sync/channel_3_test.v new file mode 100644 index 0000000..d07276b --- /dev/null +++ b/v_windows/v/old/vlib/sync/channel_3_test.v @@ -0,0 +1,32 @@ +fn do_rec(ch chan int, resch chan i64) { +	mut sum := i64(0) +	for _ in 0 .. 2000 { +		sum += <-ch +	} +	println(sum) +	resch <- sum +} + +fn do_send(ch chan int) { +	for i in 0 .. 2000 { +		ch <- i +	} +} + +fn test_channel_multi_unbuffered() { +	ch := chan int{} +	resch := chan i64{} +	go do_rec(ch, resch) +	go do_rec(ch, resch) +	go do_rec(ch, resch) +	go do_rec(ch, resch) +	go do_send(ch) +	go do_send(ch) +	go do_send(ch) +	go do_send(ch) +	mut sum := i64(0) +	for _ in 0 .. 4 { +		sum += <-resch +	} +	assert sum == i64(4) * 2000 * (2000 - 1) / 2 +} diff --git a/v_windows/v/old/vlib/sync/channel_4_test.v b/v_windows/v/old/vlib/sync/channel_4_test.v new file mode 100644 index 0000000..3792668 --- /dev/null +++ b/v_windows/v/old/vlib/sync/channel_4_test.v @@ -0,0 +1,32 @@ +fn do_rec(ch chan int, resch chan i64) { +	mut sum := i64(0) +	for _ in 0 .. 2000 { +		sum += <-ch +	} +	println(sum) +	resch <- sum +} + +fn do_send(ch chan int) { +	for i in 0 .. 2000 { +		ch <- i +	} +} + +fn test_channel_multi_buffered() { +	ch := chan int{cap: 100} +	resch := chan i64{} +	go do_rec(ch, resch) +	go do_rec(ch, resch) +	go do_rec(ch, resch) +	go do_rec(ch, resch) +	go do_send(ch) +	go do_send(ch) +	go do_send(ch) +	go do_send(ch) +	mut sum := i64(0) +	for _ in 0 .. 4 { +		sum += <-resch +	} +	assert sum == i64(4) * 2000 * (2000 - 1) / 2 +} diff --git a/v_windows/v/old/vlib/sync/channel_array_mut_test.v b/v_windows/v/old/vlib/sync/channel_array_mut_test.v new file mode 100644 index 0000000..bfd53a1 --- /dev/null +++ b/v_windows/v/old/vlib/sync/channel_array_mut_test.v @@ -0,0 +1,35 @@ +const ( +	num_iterations = 10000 +) + +struct St { +mut: +	dummy  i64 +	dummy2 u32 +	dummy3 i64 +	n      int +	dummy4 int +} + +// this function gets an array of channels for `St` references +fn do_rec_calc_send(chs []chan mut St) { +	for { +		mut s := <-chs[0] or { break } +		s.n++ +		chs[1] <- s +	} +} + +fn test_channel_array_mut() { +	mut chs := [chan mut St{cap: 1}, chan mut St{}] +	go do_rec_calc_send(chs) +	mut t := &St{ +		n: 100 +	} +	for _ in 0 .. num_iterations { +		chs[0] <- t +		t = <-chs[1] +	} +	chs[0].close() +	assert t.n == 100 + num_iterations +} diff --git a/v_windows/v/old/vlib/sync/channel_close_test.v b/v_windows/v/old/vlib/sync/channel_close_test.v new file mode 100644 index 0000000..a31bfa9 --- /dev/null +++ b/v_windows/v/old/vlib/sync/channel_close_test.v @@ -0,0 +1,104 @@ +import time + +fn do_rec(ch chan int, resch chan i64) { +	mut sum := i64(0) +	for { +		a := <-ch or { break } +		sum += a +	} +	assert ch.closed == true +	println(sum) +	resch <- sum +} + +fn do_send(ch chan int) { +	for i in 0 .. 8000 { +		ch <- i +	} +	assert ch.closed == false +	ch.close() +	assert ch.closed == true +} + +fn test_channel_close_buffered_multi() { +	ch := chan int{cap: 10} +	resch := chan i64{} +	go do_rec(ch, resch) +	go do_rec(ch, resch) +	go do_rec(ch, resch) +	go do_rec(ch, resch) +	go do_send(ch) +	mut sum := i64(0) +	for _ in 0 .. 4 { +		sum += <-resch +	} +	assert sum == i64(8000) * (8000 - 1) / 2 +} + +fn test_channel_close_unbuffered_multi() { +	ch := chan int{} +	resch := chan i64{} +	go do_rec(ch, resch) +	go do_rec(ch, resch) +	go do_rec(ch, resch) +	go do_rec(ch, resch) +	go do_send(ch) +	mut sum := i64(0) +	for _ in 0 .. 4 { +		sum += <-resch +	} +	assert sum == i64(8000) * (8000 - 1) / 2 +} + +fn test_channel_close_buffered() { +	ch := chan int{cap: 100} +	resch := chan i64{} +	go do_rec(ch, resch) +	go do_send(ch) +	mut sum := i64(0) +	sum += <-resch +	assert sum == i64(8000) * (8000 - 1) / 2 +} + +fn test_channel_close_unbuffered() { +	ch := chan int{} +	resch := chan i64{cap: 100} +	go do_rec(ch, resch) +	go do_send(ch) +	mut sum := i64(0) +	sum += <-resch +	assert sum == i64(8000) * (8000 - 1) / 2 +} + +fn test_channel_send_close_buffered() { +	ch := chan int{cap: 1} +	t := go fn (ch chan int) { +		ch <- 31 +		mut x := 45 +		ch <- 17 or { x = -133 } + +		assert x == -133 +	}(ch) +	time.sleep(100 * time.millisecond) +	ch.close() +	mut r := <-ch +	r = <-ch or { 23 } +	assert r == 23 +	t.wait() +} + +fn test_channel_send_close_unbuffered() { +	time.sleep(1 * time.second) +	ch := chan int{} +	t := go fn (ch chan int) { +		mut x := 31 +		ch <- 177 or { x = -71 } + +		assert x == -71 +	}(ch) +	time.sleep(100 * time.millisecond) +	ch.close() +	r := <-ch or { 238 } +	assert r == 238 +	t.wait() +} diff --git a/v_windows/v/old/vlib/sync/channel_fill_test.v b/v_windows/v/old/vlib/sync/channel_fill_test.v new file mode 100644 index 0000000..b4eabc0 --- /dev/null +++ b/v_windows/v/old/vlib/sync/channel_fill_test.v @@ -0,0 +1,22 @@ +import sync + +const ( +	queue_len  = 1000 +	queue_fill = 763 +) + +fn do_send(ch chan int, mut fin sync.Semaphore) { +	for i in 0 .. queue_fill { +		ch <- i +	} +	fin.post() +} + +fn test_channel_len_cap() { +	ch := chan int{cap: queue_len} +	mut sem := sync.new_semaphore() +	go do_send(ch, mut sem) +	sem.wait() +	assert ch.cap == queue_len +	assert ch.len == queue_fill +} diff --git a/v_windows/v/old/vlib/sync/channel_opt_propagate_test.v b/v_windows/v/old/vlib/sync/channel_opt_propagate_test.v new file mode 100644 index 0000000..a7e26fe --- /dev/null +++ b/v_windows/v/old/vlib/sync/channel_opt_propagate_test.v @@ -0,0 +1,39 @@ +import sync + +const ( +	num_iterations = 10000 +) + +fn get_val_from_chan(ch chan i64) ?i64 { +	r := <-ch ? +	return r +} + +// this function gets an array of channels for `i64` +fn do_rec_calc_send(chs []chan i64, mut sem sync.Semaphore) { +	mut msg := '' +	for { +		mut s := get_val_from_chan(chs[0]) or { +			msg = err.msg +			break +		} +		s++ +		chs[1] <- s +	} +	assert msg == 'channel closed' +	sem.post() +} + +fn test_channel_array_mut() { +	mut chs := [chan i64{}, chan i64{cap: 10}] +	mut sem := sync.new_semaphore() +	go do_rec_calc_send(chs, mut sem) +	mut t := i64(100) +	for _ in 0 .. num_iterations { +		chs[0] <- t +		t = <-chs[1] +	} +	chs[0].close() +	sem.wait() +	assert t == 100 + num_iterations +} diff --git a/v_windows/v/old/vlib/sync/channel_polling_test.v b/v_windows/v/old/vlib/sync/channel_polling_test.v new file mode 100644 index 0000000..846dcfd --- /dev/null +++ b/v_windows/v/old/vlib/sync/channel_polling_test.v @@ -0,0 +1,56 @@ +// Channel Benchmark +// +// `nobj` integers are sent thru a channel with queue length`buflen` +// using `nsend` sender threads and `nrec` receiver threads. +// +// The receive threads add all received numbers and send them to the +// main thread where the total sum is compare to the expected value. +const ( +	nsend           = 2 +	nrec            = 2 +	buflen          = 100 +	nobj            = 10000 +	objs_per_thread = 5000 +) + +fn do_rec(ch chan int, resch chan i64, n int) { +	mut sum := i64(0) +	for _ in 0 .. n { +		mut r := 0 +		for ch.try_pop(mut r) != .success { +		} +		sum += r +	} +	println(sum) +	resch <- sum +} + +fn do_send(ch chan int, start int, end int) { +	for i in start .. end { +		for ch.try_push(i) != .success { +		} +	} +} + +fn test_channel_polling() { +	ch := chan int{cap: buflen} +	resch := chan i64{} +	for _ in 0 .. nrec { +		go do_rec(ch, resch, objs_per_thread) +	} +	mut n := nobj +	for _ in 0 .. nsend { +		end := n +		n -= objs_per_thread +		go do_send(ch, n, end) +	} +	mut sum := i64(0) +	for _ in 0 .. nrec { +		sum += <-resch +		println('> running sum: $sum') +	} +	// use sum formula by Gauß to calculate the expected result +	expected_sum := i64(nobj) * (nobj - 1) / 2 +	println('expected sum: $expected_sum | sum: $sum') +	assert sum == expected_sum +} diff --git a/v_windows/v/old/vlib/sync/channel_push_or_1_test.v b/v_windows/v/old/vlib/sync/channel_push_or_1_test.v new file mode 100644 index 0000000..1551d83 --- /dev/null +++ b/v_windows/v/old/vlib/sync/channel_push_or_1_test.v @@ -0,0 +1,65 @@ +const n = 1000 + +const c = 100 + +fn f(ch chan int) { +	for _ in 0 .. n { +		_ := <-ch +	} +	ch.close() +} + +fn test_push_or_unbuffered() { +	ch := chan int{} +	go f(ch) +	mut j := 0 +	for { +		ch <- j or { break } + +		j++ +	} +	assert j == n +} + +fn test_push_or_buffered() { +	ch := chan int{cap: c} +	go f(ch) +	mut j := 0 +	for { +		ch <- j or { break } + +		j++ +	} +	// we don't know how many elements are in the buffer when the channel +	// is closed, so check j against an interval +	assert j >= n +	assert j <= n + c +} + +fn g(ch chan int, res chan int) { +	mut j := 0 +	for { +		ch <- j or { break } + +		j++ +	} +	println('done $j') +	res <- j +} + +fn test_many_senders() { +	ch := chan int{} +	res := chan int{} +	go g(ch, res) +	go g(ch, res) +	go g(ch, res) +	mut k := 0 +	for _ in 0 .. 3 * n { +		k = <-ch +	} +	ch.close() +	mut sum := <-res +	sum += <-res +	sum += <-res +	assert sum == 3 * n +} diff --git a/v_windows/v/old/vlib/sync/channel_push_or_2_test.v b/v_windows/v/old/vlib/sync/channel_push_or_2_test.v new file mode 100644 index 0000000..8f95395 --- /dev/null +++ b/v_windows/v/old/vlib/sync/channel_push_or_2_test.v @@ -0,0 +1,25 @@ +const n = 1000 + +fn f(ch chan f64) { +	mut s := 0. +	for _ in 0 .. n { +		s += <-ch +	} +	assert s == f64(n * (n + 1) / 2) +	ch.close() +} + +fn do_send(ch chan f64, val f64) ?f64 { +	ch <- val ? +	return val + 1.0 +} + +fn test_push_propargate() { +	ch := chan f64{} +	go f(ch) +	mut s := 1.0 +	for { +		s = do_send(ch, s) or { break } +	} +	assert s == f64(n + 1) +} diff --git a/v_windows/v/old/vlib/sync/channel_select_2_test.v b/v_windows/v/old/vlib/sync/channel_select_2_test.v new file mode 100644 index 0000000..189aaf6 --- /dev/null +++ b/v_windows/v/old/vlib/sync/channel_select_2_test.v @@ -0,0 +1,62 @@ +import time + +fn do_rec_i64(ch chan i64) { +	mut sum := i64(0) +	for _ in 0 .. 300 { +		sum += <-ch +	} +	assert sum == 300 * (300 - 1) / 2 +} + +fn do_send_int(ch chan int) { +	for i in 0 .. 300 { +		ch <- i +	} +} + +fn do_send_byte(ch chan byte) { +	for i in 0 .. 300 { +		ch <- byte(i) +	} +} + +fn do_send_i64(ch chan i64) { +	for i in 0 .. 300 { +		ch <- i +	} +} + +fn test_select() { +	chi := chan int{} +	chl := chan i64{cap: 1} +	chb := chan byte{cap: 10} +	recch := chan i64{cap: 0} +	go do_rec_i64(recch) +	go do_send_int(chi) +	go do_send_byte(chb) +	go do_send_i64(chl) +	mut sum := i64(0) +	mut rl := i64(0) +	mut sl := i64(0) +	for _ in 0 .. 1200 { +		select { +			ri := <-chi { +				sum += ri +			} +			recch <- sl { +				sl++ +			} +			rl = <-chl { +				sum += rl +			} +			rb := <-chb { +				sum += rb +			} +		} +	} +	// Use Gauß' formula for the first 2 contributions +	// the 3rd contribution is `byte` and must be seen modulo 256 +	expected_sum := 2 * (300 * (300 - 1) / 2) + 256 * (256 - 1) / 2 + 44 * (44 - 1) / 2 +	assert sum == expected_sum +	time.sleep(20 * time.millisecond) // to give assert in coroutine enough time +} diff --git a/v_windows/v/old/vlib/sync/channel_select_3_test.v b/v_windows/v/old/vlib/sync/channel_select_3_test.v new file mode 100644 index 0000000..fdf6096 --- /dev/null +++ b/v_windows/v/old/vlib/sync/channel_select_3_test.v @@ -0,0 +1,122 @@ +import time +import sync + +struct St { +	a int +} + +fn getint() int { +	return 8 +} + +fn f1(ch1 chan int, ch2 chan St, ch3 chan int, ch4 chan int, ch5 chan int, mut sem sync.Semaphore) { +	mut a := 5 +	select { +		a = <-ch3 { +			a = 0 +		} +		b := <-ch2 { +			a = b.a +		} +		ch3 <- 5 { +			a = 1 +		} +		ch2 <- St{ +			a: 37 +		} { +			a = 2 +		} +		ch4 <- (6 + 7 * 9) { +			a = 8 +		} +		ch5 <- getint() { +			a = 9 +		} +		300 * time.millisecond { +			a = 3 +		} +	} +	assert a == 3 +	sem.post() +} + +fn f2(ch1 chan St, ch2 chan int, mut sem sync.Semaphore) { +	mut r := 23 +	for i in 0 .. 2 { +		select { +			b := <-ch1 { +				r = b.a +			} +			ch2 <- r { +				r = 17 +			} +		} +		if i == 0 { +			assert r == 17 +		} else { +			assert r == 13 +		} +	} +	sem.post() +} + +fn test_select_blocks() { +	ch1 := chan int{cap: 1} +	ch2 := chan St{} +	ch3 := chan int{} +	ch4 := chan int{} +	ch5 := chan int{} +	mut sem := sync.new_semaphore() +	mut r := false +	t := select { +		b := <-ch1 { +			println(b) +		} +		else { +			// no channel ready +			r = true +		} +	} +	assert r == true +	assert t == true +	go f2(ch2, ch3, mut sem) +	n := <-ch3 +	assert n == 23 +	ch2 <- St{ +		a: 13 +	} +	sem.wait() +	stopwatch := time.new_stopwatch() +	go f1(ch1, ch2, ch3, ch4, ch5, mut sem) +	sem.wait() +	elapsed_ms := f64(stopwatch.elapsed()) / time.millisecond +	// https://docs.microsoft.com/en-us/windows-hardware/drivers/kernel/high-resolution-timers +	// > For example, for Windows running on an x86 processor, the default interval between +	// > system clock ticks is typically about 15 milliseconds, and the minimum interval +	// > between system clock ticks is about 1 millisecond. +	assert elapsed_ms >= 280.0 // 300 - (15ms + 5ms just in case) + +	ch1.close() +	ch2.close() +	mut h := 7 +	mut is_open := true +	if select { +		_ := <-ch2 { +			h = 0 +		} +		ch1 <- h { +			h = 1 +		} +		else { +			h = 2 +		} +	} { +		panic('channel is still open') +	} else { +		is_open = false +	} +	// no branch should have run +	assert h == 7 +	// since all channels are closed `select` should return `false` +	assert is_open == false +} diff --git a/v_windows/v/old/vlib/sync/channel_select_4_test.v b/v_windows/v/old/vlib/sync/channel_select_4_test.v new file mode 100644 index 0000000..77cd557 --- /dev/null +++ b/v_windows/v/old/vlib/sync/channel_select_4_test.v @@ -0,0 +1,43 @@ +fn do_rec_i64(ch chan i64, sumch chan i64) { +	mut sum := i64(0) +	for _ in 0 .. 30000 { +		sum += <-ch +	} +	sumch <- sum +} + +fn do_send_int(ch chan int) { +	for i in 0 .. 30000 { +		ch <- i +	} +} + +fn test_select() { +	chi := chan int{cap: 10} +	recch := chan i64{cap: 10} +	chsum := chan i64{} +	go do_rec_i64(recch, chsum) +	go do_send_int(chi) +	mut sum := i64(0) +	mut sl := i64(0) +	for _ in 0 .. 60000 + recch.cap { +		select { +			ri := <-chi { +				sum += ri +			} +			recch <- sl { +				sl++ +			} +		} +	} +	// Use Gauß' formula +	expected_sum := i64(30000) * (30000 - 1) / 2 +	assert sum == expected_sum + +	mut sumrec := <-chsum +	// Empty receive buffer +	for _ in 0 .. recch.cap { +		sumrec += <-recch +	} +	assert sumrec == i64(30000 + recch.cap) * (30000 + recch.cap - 1) / 2 +} diff --git a/v_windows/v/old/vlib/sync/channel_select_5_test.v b/v_windows/v/old/vlib/sync/channel_select_5_test.v new file mode 100644 index 0000000..97228fe --- /dev/null +++ b/v_windows/v/old/vlib/sync/channel_select_5_test.v @@ -0,0 +1,61 @@ +fn do_rec_i64(ch chan i64, sumch chan i64) { +	mut sum := i64(0) +	for _ in 0 .. 10000 { +		sum += <-ch +	} +	sumch <- sum +} + +fn do_send_int(ch chan int) { +	for i in 0 .. 10000 { +		ch <- i +	} +} + +fn do_send_int2(ch chan int) { +	for i in 10000 .. 20000 { +		ch <- i +	} +} + +fn do_send_int3(ch chan int) { +	for i in 20000 .. 30000 { +		ch <- i +	} +} + +fn test_select() { +	chi := chan int{cap: 10} +	recch := chan i64{cap: 10} +	chsum := chan i64{} +	go do_rec_i64(recch, chsum) +	go do_rec_i64(recch, chsum) +	go do_rec_i64(recch, chsum) +	go do_send_int(chi) +	go do_send_int2(chi) +	go do_send_int3(chi) +	mut sum := i64(0) +	mut sl := i64(0) +	for _ in 0 .. 60000 + recch.cap { +		select { +			ri := <-chi { +				sum += ri +			} +			recch <- sl { +				sl++ +			} +		} +	} +	// Use Gauß' formula +	expected_sum := i64(30000) * (30000 - 1) / 2 +	assert sum == expected_sum + +	mut sumrec := <-chsum +	sumrec += <-chsum +	sumrec += <-chsum +	// Empty receive buffer +	for _ in 0 .. recch.cap { +		sumrec += <-recch +	} +	assert sumrec == i64(30000 + recch.cap) * (30000 + recch.cap - 1) / 2 +} diff --git a/v_windows/v/old/vlib/sync/channel_select_6_test.v b/v_windows/v/old/vlib/sync/channel_select_6_test.v new file mode 100644 index 0000000..e1b4ff8 --- /dev/null +++ b/v_windows/v/old/vlib/sync/channel_select_6_test.v @@ -0,0 +1,75 @@ +// This test case runs concurrent 3 instances of `do_select` that +// communicate with 6 other threads doing send and receive operations. +// There are buffered and unbuffered channels - handled by one or two +// concurrend threads on the other side + +fn do_select(ch1 chan int, ch2 chan int, chf1 chan f64, chf2 chan f64, sumch1 chan i64, sumch2 chan i64) { +	mut sum1 := i64(0) +	mut sum2 := i64(0) +	f1 := 17. +	f2 := 7. +	for _ in 0 .. 20000 + chf1.cap / 3 { +		select { +			chf1 <- f1 {} +			i := <-ch1 { +				sum1 += i +			} +			j := <-ch2 { +				sum2 += j +			} +			chf2 <- f2 {} +		} +	} +	sumch1 <- sum1 +	sumch2 <- sum2 +} + +fn do_send_int(ch chan int, factor int) { +	for i in 0 .. 10000 { +		ch <- (i * factor) +	} +} + +fn do_rec_f64(ch chan f64, sumch chan f64) { +	mut sum := 0. +	for _ in 0 .. 10000 { +		sum += <-ch +	} +	sumch <- sum +} + +fn test_select() { +	ch1 := chan int{cap: 3} +	ch2 := chan int{} +	// buffer length of chf1 mus be mutiple of 3 (# select threads) +	chf1 := chan f64{cap: 30} +	chf2 := chan f64{} +	chsum1 := chan i64{} +	chsum2 := chan i64{} +	chsumf1 := chan f64{} +	chsumf2 := chan f64{} +	go do_send_int(ch1, 3) +	go do_select(ch1, ch2, chf1, chf2, chsum1, chsum2) +	go do_rec_f64(chf1, chsumf1) +	go do_rec_f64(chf2, chsumf2) +	go do_rec_f64(chf2, chsumf2) +	go do_select(ch1, ch2, chf1, chf2, chsum1, chsum2) +	go do_send_int(ch2, 7) +	go do_send_int(ch2, 17) +	go do_select(ch1, ch2, chf1, chf2, chsum1, chsum2) + +	sum1 := <-chsum1 + <-chsum1 + <-chsum1 +	sum2 := <-chsum2 + <-chsum2 + <-chsum2 +	mut sumf1 := <-chsumf1 +	// empty channel buffer +	for _ in 0 .. chf1.cap { +		sumf1 += <-chf1 +	} +	sumf2 := <-chsumf2 + <-chsumf2 +	// Use Gauß' formula +	expected_sum := i64(10000) * (10000 - 1) / 2 +	assert sum1 == 3 * expected_sum +	assert sum2 == (7 + 17) * expected_sum +	assert sumf1 == 17. * f64(10000 + chf1.cap) +	assert sumf2 == 7. * 20000 +} diff --git a/v_windows/v/old/vlib/sync/channel_select_test.v b/v_windows/v/old/vlib/sync/channel_select_test.v new file mode 100644 index 0000000..26ed641 --- /dev/null +++ b/v_windows/v/old/vlib/sync/channel_select_test.v @@ -0,0 +1,84 @@ +/* +* ATTENTION! Do not use this file as an example! + * For that, please look at `channel_select_2_test.v` or `channel_select_3_test.v` + * + * This test case uses the implementation in `sync/channels.v` directly + * in order to test it independently from the support in the core language +*/ + +module sync + +import time + +fn do_rec_i64(mut ch Channel) { +	mut sum := i64(0) +	for _ in 0 .. 300 { +		mut a := i64(0) +		ch.pop(&a) +		sum += a +	} +	assert sum == 300 * (300 - 1) / 2 +} + +fn do_send_int(mut ch Channel) { +	for i in 0 .. 300 { +		ch.push(&i) +	} +} + +fn do_send_byte(mut ch Channel) { +	for i in 0 .. 300 { +		ii := byte(i) +		ch.push(&ii) +	} +} + +fn do_send_i64(mut ch Channel) { +	for i in 0 .. 300 { +		ii := i64(i) +		ch.push(&ii) +	} +} + +fn test_select() { +	mut chi := new_channel<int>(0) +	mut chl := new_channel<i64>(1) +	mut chb := new_channel<byte>(10) +	mut recch := new_channel<i64>(0) +	go do_rec_i64(mut recch) +	go do_send_int(mut chi) +	go do_send_byte(mut chb) +	go do_send_i64(mut chl) +	mut channels := [chi, recch, chl, chb] +	directions := [Direction.pop, .push, .pop, .pop] +	mut sum := i64(0) +	mut rl := i64(0) +	mut ri := int(0) +	mut rb := byte(0) +	mut sl := i64(0) +	mut objs := [voidptr(&ri), &sl, &rl, &rb] +	for _ in 0 .. 1200 { +		idx := channel_select(mut channels, directions, mut objs, time.infinite) +		match idx { +			0 { +				sum += ri +			} +			1 { +				sl++ +			} +			2 { +				sum += rl +			} +			3 { +				sum += rb +			} +			else { +				println('got $idx (timeout)') +			} +		} +	} +	// Use Gauß' formula for the first 2 contributions +	// the 3rd contribution is `byte` and must be seen modulo 256 +	expected_sum := 2 * (300 * (300 - 1) / 2) + 256 * (256 - 1) / 2 + 44 * (44 - 1) / 2 +	assert sum == expected_sum +} diff --git a/v_windows/v/old/vlib/sync/channel_try_buf_test.v b/v_windows/v/old/vlib/sync/channel_try_buf_test.v new file mode 100644 index 0000000..e521314 --- /dev/null +++ b/v_windows/v/old/vlib/sync/channel_try_buf_test.v @@ -0,0 +1,17 @@ +fn test_channel_try_buffered() { +	ch := chan int{cap: 5} +	for z in 2 .. 13 { +		if ch.try_push(z) == .not_ready { +			assert z == 7 +			break +		} +	} +	mut obj := int(0) +	for ch.try_pop(mut obj) == .success { +		println(obj) +	} +	assert obj == 6 +	ch <- 17 +	obj = <-ch +	assert obj == 17 +} diff --git a/v_windows/v/old/vlib/sync/channel_try_unbuf_test.v b/v_windows/v/old/vlib/sync/channel_try_unbuf_test.v new file mode 100644 index 0000000..ee11468 --- /dev/null +++ b/v_windows/v/old/vlib/sync/channel_try_unbuf_test.v @@ -0,0 +1,13 @@ +fn test_channel_try_unbuffered() { +	ch := chan int{} +	for z in 5 .. 8 { +		if ch.try_push(z) == .not_ready { +			assert z == 5 +			break +		} +		panic('push on non-ready channel not detected') +	} +	mut obj := -17 +	for ch.try_pop(mut obj) == .success {} +	assert obj == -17 +} diff --git a/v_windows/v/old/vlib/sync/channels.v b/v_windows/v/old/vlib/sync/channels.v new file mode 100644 index 0000000..49f1396 --- /dev/null +++ b/v_windows/v/old/vlib/sync/channels.v @@ -0,0 +1,730 @@ +module sync + +import time +import rand + +$if windows { +	#flag -I @VEXEROOT/thirdparty/stdatomic/win +} $else { +	#flag -I @VEXEROOT/thirdparty/stdatomic/nix +} + +$if linux { +	$if tinyc { +		$if amd64 { +			// most Linux distributions have /usr/lib/libatomic.so, but Ubuntu uses gcc version specific dir +			#flag -L/usr/lib/gcc/x86_64-linux-gnu/6 +			#flag -L/usr/lib/gcc/x86_64-linux-gnu/7 +			#flag -L/usr/lib/gcc/x86_64-linux-gnu/8 +			#flag -L/usr/lib/gcc/x86_64-linux-gnu/9 +			#flag -L/usr/lib/gcc/x86_64-linux-gnu/10 +			#flag -L/usr/lib/gcc/x86_64-linux-gnu/11 +			#flag -L/usr/lib/gcc/x86_64-linux-gnu/12 +		} $else $if arm64 { +			#flag -L/usr/lib/gcc/aarch64-linux-gnu/6 +			#flag -L/usr/lib/gcc/aarch64-linux-gnu/7 +			#flag -L/usr/lib/gcc/aarch64-linux-gnu/8 +			#flag -L/usr/lib/gcc/aarch64-linux-gnu/9 +			#flag -L/usr/lib/gcc/aarch64-linux-gnu/10 +			#flag -L/usr/lib/gcc/aarch64-linux-gnu/11 +			#flag -L/usr/lib/gcc/aarch64-linux-gnu/12 +		} +		#flag -latomic +	} +} + +#include <atomic.h> +// The following functions are actually generic in C +fn C.atomic_load_ptr(voidptr) voidptr +fn C.atomic_store_ptr(voidptr, voidptr) +fn C.atomic_compare_exchange_weak_ptr(voidptr, voidptr, voidptr) bool +fn C.atomic_compare_exchange_strong_ptr(voidptr, voidptr, voidptr) bool +fn C.atomic_exchange_ptr(voidptr, voidptr) voidptr +fn C.atomic_fetch_add_ptr(voidptr, voidptr) voidptr +fn C.atomic_fetch_sub_ptr(voidptr, voidptr) voidptr + +fn C.atomic_load_u16(voidptr) u16 +fn C.atomic_store_u16(voidptr, u16) +fn C.atomic_compare_exchange_weak_u16(voidptr, voidptr, u16) bool +fn C.atomic_compare_exchange_strong_u16(voidptr, voidptr, u16) bool +fn C.atomic_exchange_u16(voidptr, u16) u16 +fn C.atomic_fetch_add_u16(voidptr, u16) u16 +fn C.atomic_fetch_sub_u16(voidptr, u16) u16 + +fn C.atomic_load_u32(voidptr) u32 +fn C.atomic_store_u32(voidptr, u32) +fn C.atomic_compare_exchange_weak_u32(voidptr, voidptr, u32) bool +fn C.atomic_compare_exchange_strong_u32(voidptr, voidptr, u32) bool +fn C.atomic_exchange_u32(voidptr, u32) u32 +fn C.atomic_fetch_add_u32(voidptr, u32) u32 +fn C.atomic_fetch_sub_u32(voidptr, u32) u32 + +fn C.atomic_load_u64(voidptr) u64 +fn C.atomic_store_u64(voidptr, u64) +fn C.atomic_compare_exchange_weak_u64(voidptr, voidptr, u64) bool +fn C.atomic_compare_exchange_strong_u64(voidptr, voidptr, u64) bool +fn C.atomic_exchange_u64(voidptr, u64) u64 +fn C.atomic_fetch_add_u64(voidptr, u64) u64 +fn C.atomic_fetch_sub_u64(voidptr, u64) u64 + +const ( +	// how often to try to get data without blocking before to wait for semaphore +	spinloops     = 750 +	spinloops_sem = 4000 +) + +enum BufferElemStat { +	unused = 0 +	writing +	written +	reading +} + +struct Subscription { +mut: +	sem  &Semaphore +	prev &&Subscription +	nxt  &Subscription +} + +enum Direction { +	pop +	push +} + +struct Channel { +	ringbuf   &byte // queue for buffered channels +	statusbuf &byte // flags to synchronize write/read in ringbuf +	objsize   u32 +mut: // atomic +	writesem           Semaphore // to wake thread that wanted to write, but buffer was full +	readsem            Semaphore // to wake thread that wanted to read, but buffer was empty +	writesem_im        Semaphore +	readsem_im         Semaphore +	write_adr          C.atomic_uintptr_t // if != NULL the next obj can be written here without wait +	read_adr           C.atomic_uintptr_t // if != NULL an obj can be read from here without wait +	adr_read           C.atomic_uintptr_t // used to identify origin of writesem +	adr_written        C.atomic_uintptr_t // used to identify origin of readsem +	write_free         u32 // for queue state +	read_avail         u32 +	buf_elem_write_idx u32 +	buf_elem_read_idx  u32 +	// for select +	write_subscriber &Subscription +	read_subscriber  &Subscription +	write_sub_mtx    u16 +	read_sub_mtx     u16 +	closed           u16 +pub: +	cap u32 // queue length in #objects +} + +pub fn new_channel<T>(n u32) &Channel { +	st := sizeof(T) +	if isreftype(T) { +		return new_channel_st(n, st) +	} else { +		return new_channel_st_noscan(n, st) +	} +} + +fn new_channel_st(n u32, st u32) &Channel { +	wsem := if n > 0 { n } else { 1 } +	rsem := if n > 0 { u32(0) } else { 1 } +	rbuf := if n > 0 { unsafe { malloc(int(n * st)) } } else { &byte(0) } +	sbuf := if n > 0 { vcalloc_noscan(int(n * 2)) } else { &byte(0) } +	mut ch := Channel{ +		objsize: st +		cap: n +		write_free: n +		read_avail: 0 +		ringbuf: rbuf +		statusbuf: sbuf +		write_subscriber: 0 +		read_subscriber: 0 +	} +	ch.writesem.init(wsem) +	ch.readsem.init(rsem) +	ch.writesem_im.init(0) +	ch.readsem_im.init(0) +	return &ch +} + +fn new_channel_st_noscan(n u32, st u32) &Channel { +	$if gcboehm_opt ? { +		wsem := if n > 0 { n } else { 1 } +		rsem := if n > 0 { u32(0) } else { 1 } +		rbuf := if n > 0 { unsafe { malloc_noscan(int(n * st)) } } else { &byte(0) } +		sbuf := if n > 0 { vcalloc_noscan(int(n * 2)) } else { &byte(0) } +		mut ch := Channel{ +			objsize: st +			cap: n +			write_free: n +			read_avail: 0 +			ringbuf: rbuf +			statusbuf: sbuf +			write_subscriber: 0 +			read_subscriber: 0 +		} +		ch.writesem.init(wsem) +		ch.readsem.init(rsem) +		ch.writesem_im.init(0) +		ch.readsem_im.init(0) +		return &ch +	} $else { +		return new_channel_st(n, st) +	} +} + +pub fn (ch &Channel) auto_str(typename string) string { +	return 'chan $typename{cap: $ch.cap, closed: $ch.closed}' +} + +pub fn (mut ch Channel) close() { +	open_val := u16(0) +	if !C.atomic_compare_exchange_strong_u16(&ch.closed, &open_val, 1) { +		return +	} +	mut nulladr := voidptr(0) +	for !C.atomic_compare_exchange_weak_ptr(unsafe { &voidptr(&ch.adr_written) }, &nulladr, +		voidptr(-1)) { +		nulladr = voidptr(0) +	} +	ch.readsem_im.post() +	ch.readsem.post() +	mut null16 := u16(0) +	for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) { +		null16 = u16(0) +	} +	if ch.read_subscriber != voidptr(0) { +		ch.read_subscriber.sem.post() +	} +	C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) +	null16 = u16(0) +	for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { +		null16 = u16(0) +	} +	if ch.write_subscriber != voidptr(0) { +		ch.write_subscriber.sem.post() +	} +	C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) +	ch.writesem.post() +	if ch.cap == 0 { +		C.atomic_store_ptr(unsafe { &voidptr(&ch.read_adr) }, voidptr(0)) +	} +	ch.writesem_im.post() +} + +[inline] +pub fn (mut ch Channel) len() int { +	return int(C.atomic_load_u32(&ch.read_avail)) +} + +[inline] +pub fn (mut ch Channel) closed() bool { +	return C.atomic_load_u16(&ch.closed) != 0 +} + +[inline] +pub fn (mut ch Channel) push(src voidptr) { +	if ch.try_push_priv(src, false) == .closed { +		panic('push on closed channel') +	} +} + +[inline] +pub fn (mut ch Channel) try_push(src voidptr) ChanState { +	return ch.try_push_priv(src, true) +} + +fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState { +	if C.atomic_load_u16(&ch.closed) != 0 { +		return .closed +	} +	spinloops_sem_, spinloops_ := if no_block { 1, 1 } else { sync.spinloops, sync.spinloops_sem } +	mut have_swapped := false +	for { +		mut got_sem := false +		mut wradr := C.atomic_load_ptr(unsafe { &voidptr(&ch.write_adr) }) +		for wradr != C.NULL { +			if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.write_adr) }, +				&wradr, voidptr(0)) +			{ +				// there is a reader waiting for us +				unsafe { C.memcpy(wradr, src, ch.objsize) } +				mut nulladr := voidptr(0) +				for !C.atomic_compare_exchange_weak_ptr(unsafe { &voidptr(&ch.adr_written) }, +					&nulladr, wradr) { +					nulladr = voidptr(0) +				} +				ch.readsem_im.post() +				return .success +			} +		} +		if no_block && ch.cap == 0 { +			return .not_ready +		} +		// get token to read +		for _ in 0 .. spinloops_sem_ { +			if got_sem { +				break +			} +			got_sem = ch.writesem.try_wait() +		} +		if !got_sem { +			if no_block { +				return .not_ready +			} +			ch.writesem.wait() +		} +		if C.atomic_load_u16(&ch.closed) != 0 { +			ch.writesem.post() +			return .closed +		} +		if ch.cap == 0 { +			// try to advertise current object as readable +			mut read_in_progress := false +			C.atomic_store_ptr(unsafe { &voidptr(&ch.read_adr) }, src) +			wradr = C.atomic_load_ptr(unsafe { &voidptr(&ch.write_adr) }) +			if wradr != C.NULL { +				mut src2 := src +				if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.read_adr) }, +					&src2, voidptr(0)) +				{ +					ch.writesem.post() +					continue +				} else { +					read_in_progress = true +				} +			} +			if !read_in_progress { +				mut null16 := u16(0) +				for !C.atomic_compare_exchange_weak_u16(voidptr(&ch.read_sub_mtx), &null16, +					u16(1)) { +					null16 = u16(0) +				} +				if ch.read_subscriber != voidptr(0) { +					ch.read_subscriber.sem.post() +				} +				C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) +			} +			mut src2 := src +			for sp := u32(0); sp < spinloops_ || read_in_progress; sp++ { +				if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_read) }, +					&src2, voidptr(0)) +				{ +					have_swapped = true +					read_in_progress = true +					break +				} +				src2 = src +			} +			mut got_im_sem := false +			for sp := u32(0); sp < spinloops_sem_ || read_in_progress; sp++ { +				got_im_sem = ch.writesem_im.try_wait() +				if got_im_sem { +					break +				} +			} +			for { +				if got_im_sem { +					got_im_sem = false +				} else { +					ch.writesem_im.wait() +				} +				if C.atomic_load_u16(&ch.closed) != 0 { +					if have_swapped +						|| C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_read) }, &src2, voidptr(0)) { +						ch.writesem.post() +						return .success +					} else { +						return .closed +					} +				} +				if have_swapped +					|| C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_read) }, &src2, voidptr(0)) { +					ch.writesem.post() +					break +				} else { +					// this semaphore was not for us - repost in +					ch.writesem_im.post() +					if src2 == voidptr(-1) { +						ch.readsem.post() +						return .closed +					} +					src2 = src +				} +			} +			return .success +		} else { +			// buffered channel +			mut space_in_queue := false +			mut wr_free := C.atomic_load_u32(&ch.write_free) +			for wr_free > 0 { +				space_in_queue = C.atomic_compare_exchange_weak_u32(&ch.write_free, &wr_free, +					wr_free - 1) +				if space_in_queue { +					break +				} +			} +			if space_in_queue { +				mut wr_idx := C.atomic_load_u32(&ch.buf_elem_write_idx) +				for { +					mut new_wr_idx := wr_idx + 1 +					for new_wr_idx >= ch.cap { +						new_wr_idx -= ch.cap +					} +					if C.atomic_compare_exchange_strong_u32(&ch.buf_elem_write_idx, &wr_idx, +						new_wr_idx) +					{ +						break +					} +				} +				mut wr_ptr := ch.ringbuf +				mut status_adr := ch.statusbuf +				unsafe { +					wr_ptr += wr_idx * ch.objsize +					status_adr += wr_idx * sizeof(u16) +				} +				mut expected_status := u16(BufferElemStat.unused) +				for !C.atomic_compare_exchange_weak_u16(status_adr, &expected_status, +					u16(BufferElemStat.writing)) { +					expected_status = u16(BufferElemStat.unused) +				} +				unsafe { +					C.memcpy(wr_ptr, src, ch.objsize) +				} +				C.atomic_store_u16(unsafe { &u16(status_adr) }, u16(BufferElemStat.written)) +				C.atomic_fetch_add_u32(&ch.read_avail, 1) +				ch.readsem.post() +				mut null16 := u16(0) +				for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) { +					null16 = u16(0) +				} +				if ch.read_subscriber != voidptr(0) { +					ch.read_subscriber.sem.post() +				} +				C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) +				return .success +			} else { +				if no_block { +					return .not_ready +				} +				ch.writesem.post() +			} +		} +	} +	// we should not get here but the V compiler want's to see a return statement +	assert false +	return .success +} + +[inline] +pub fn (mut ch Channel) pop(dest voidptr) bool { +	return ch.try_pop_priv(dest, false) == .success +} + +[inline] +pub fn (mut ch Channel) try_pop(dest voidptr) ChanState { +	return ch.try_pop_priv(dest, true) +} + +fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState { +	spinloops_sem_, spinloops_ := if no_block { 1, 1 } else { sync.spinloops, sync.spinloops_sem } +	mut have_swapped := false +	mut write_in_progress := false +	for { +		mut got_sem := false +		if ch.cap == 0 { +			// unbuffered channel - first see if a `push()` has adversized +			mut rdadr := C.atomic_load_ptr(unsafe { &voidptr(&ch.read_adr) }) +			for rdadr != C.NULL { +				if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.read_adr) }, +					&rdadr, voidptr(0)) +				{ +					// there is a writer waiting for us +					unsafe { C.memcpy(dest, rdadr, ch.objsize) } +					mut nulladr := voidptr(0) +					for !C.atomic_compare_exchange_weak_ptr(unsafe { &voidptr(&ch.adr_read) }, +						&nulladr, rdadr) { +						nulladr = voidptr(0) +					} +					ch.writesem_im.post() +					return .success +				} +			} +			if no_block { +				if C.atomic_load_u16(&ch.closed) == 0 { +					return .not_ready +				} else { +					return .closed +				} +			} +		} +		// get token to read +		for _ in 0 .. spinloops_sem_ { +			if got_sem { +				break +			} +			got_sem = ch.readsem.try_wait() +		} +		if !got_sem { +			if no_block { +				if C.atomic_load_u16(&ch.closed) == 0 { +					return .not_ready +				} else { +					return .closed +				} +			} +			ch.readsem.wait() +		} +		if ch.cap > 0 { +			// try to get buffer token +			mut obj_in_queue := false +			mut rd_avail := C.atomic_load_u32(&ch.read_avail) +			for rd_avail > 0 { +				obj_in_queue = C.atomic_compare_exchange_weak_u32(&ch.read_avail, &rd_avail, +					rd_avail - 1) +				if obj_in_queue { +					break +				} +			} +			if obj_in_queue { +				mut rd_idx := C.atomic_load_u32(&ch.buf_elem_read_idx) +				for { +					mut new_rd_idx := rd_idx + 1 +					for new_rd_idx >= ch.cap { +						new_rd_idx -= ch.cap +					} +					if C.atomic_compare_exchange_weak_u32(&ch.buf_elem_read_idx, &rd_idx, +						new_rd_idx) +					{ +						break +					} +				} +				mut rd_ptr := ch.ringbuf +				mut status_adr := ch.statusbuf +				unsafe { +					rd_ptr += rd_idx * ch.objsize +					status_adr += rd_idx * sizeof(u16) +				} +				mut expected_status := u16(BufferElemStat.written) +				for !C.atomic_compare_exchange_weak_u16(status_adr, &expected_status, +					u16(BufferElemStat.reading)) { +					expected_status = u16(BufferElemStat.written) +				} +				unsafe { +					C.memcpy(dest, rd_ptr, ch.objsize) +				} +				C.atomic_store_u16(unsafe { &u16(status_adr) }, u16(BufferElemStat.unused)) +				C.atomic_fetch_add_u32(&ch.write_free, 1) +				ch.writesem.post() +				mut null16 := u16(0) +				for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { +					null16 = u16(0) +				} +				if ch.write_subscriber != voidptr(0) { +					ch.write_subscriber.sem.post() +				} +				C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) +				return .success +			} +		} +		// try to advertise `dest` as writable +		C.atomic_store_ptr(unsafe { &voidptr(&ch.write_adr) }, dest) +		if ch.cap == 0 { +			mut rdadr := C.atomic_load_ptr(unsafe { &voidptr(&ch.read_adr) }) +			if rdadr != C.NULL { +				mut dest2 := dest +				if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.write_adr) }, +					&dest2, voidptr(0)) +				{ +					ch.readsem.post() +					continue +				} else { +					write_in_progress = true +				} +			} +		} +		if ch.cap == 0 && !write_in_progress { +			mut null16 := u16(0) +			for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { +				null16 = u16(0) +			} +			if ch.write_subscriber != voidptr(0) { +				ch.write_subscriber.sem.post() +			} +			C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) +		} +		mut dest2 := dest +		for sp := u32(0); sp < spinloops_ || write_in_progress; sp++ { +			if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_written) }, +				&dest2, voidptr(0)) +			{ +				have_swapped = true +				break +			} else if dest2 == voidptr(-1) { +				ch.readsem.post() +				return .closed +			} +			dest2 = dest +		} +		mut got_im_sem := false +		for sp := u32(0); sp < spinloops_sem_ || write_in_progress; sp++ { +			got_im_sem = ch.readsem_im.try_wait() +			if got_im_sem { +				break +			} +		} +		for { +			if got_im_sem { +				got_im_sem = false +			} else { +				ch.readsem_im.wait() +			} +			if have_swapped +				|| C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_written) }, &dest2, voidptr(0)) { +				ch.readsem.post() +				break +			} else { +				// this semaphore was not for us - repost in +				ch.readsem_im.post() +				if dest2 == voidptr(-1) { +					ch.readsem.post() +					return .closed +				} +				dest2 = dest +			} +		} +		break +	} +	return .success +} + +// Wait `timeout` on any of `channels[i]` until one of them can push (`is_push[i] = true`) or pop (`is_push[i] = false`) +// object referenced by `objrefs[i]`. `timeout = time.infinite` means wait unlimited time. `timeout <= 0` means return +// immediately if no transaction can be performed without waiting. +// return value: the index of the channel on which a transaction has taken place +//               -1 if waiting for a transaction has exceeded timeout +//               -2 if all channels are closed + +pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []voidptr, timeout time.Duration) int { +	assert channels.len == dir.len +	assert dir.len == objrefs.len +	mut subscr := []Subscription{len: channels.len} +	mut sem := unsafe { Semaphore{} } +	sem.init(0) +	for i, ch in channels { +		subscr[i].sem = unsafe { &sem } +		if dir[i] == .push { +			mut null16 := u16(0) +			for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { +				null16 = u16(0) +			} +			subscr[i].prev = unsafe { &ch.write_subscriber } +			unsafe { +				subscr[i].nxt = C.atomic_exchange_ptr(&voidptr(&ch.write_subscriber), +					&subscr[i]) +			} +			if voidptr(subscr[i].nxt) != voidptr(0) { +				subscr[i].nxt.prev = unsafe { &subscr[i].nxt } +			} +			C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) +		} else { +			mut null16 := u16(0) +			for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) { +				null16 = u16(0) +			} +			subscr[i].prev = unsafe { &ch.read_subscriber } +			unsafe { +				subscr[i].nxt = C.atomic_exchange_ptr(&voidptr(&ch.read_subscriber), &subscr[i]) +			} +			if voidptr(subscr[i].nxt) != voidptr(0) { +				subscr[i].nxt.prev = unsafe { &subscr[i].nxt } +			} +			C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) +		} +	} +	stopwatch := if timeout == time.infinite || timeout <= 0 { +		time.StopWatch{} +	} else { +		time.new_stopwatch() +	} +	mut event_idx := -1 // negative index means `timed out` + +	outer: for { +		rnd := rand.u32_in_range(0, u32(channels.len)) +		mut num_closed := 0 +		for j, _ in channels { +			mut i := j + int(rnd) +			if i >= channels.len { +				i -= channels.len +			} +			if dir[i] == .push { +				stat := channels[i].try_push_priv(objrefs[i], true) +				if stat == .success { +					event_idx = i +					break outer +				} else if stat == .closed { +					num_closed++ +				} +			} else { +				stat := channels[i].try_pop_priv(objrefs[i], true) +				if stat == .success { +					event_idx = i +					break outer +				} else if stat == .closed { +					num_closed++ +				} +			} +		} +		if num_closed == channels.len { +			event_idx = -2 +			break outer +		} +		if timeout <= 0 { +			break outer +		} +		if timeout != time.infinite { +			remaining := timeout - stopwatch.elapsed() +			if !sem.timed_wait(remaining) { +				break outer +			} +		} else { +			sem.wait() +		} +	} +	// reset subscribers +	for i, ch in channels { +		if dir[i] == .push { +			mut null16 := u16(0) +			for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { +				null16 = u16(0) +			} +			unsafe { +				*subscr[i].prev = subscr[i].nxt +			} +			if subscr[i].nxt != 0 { +				subscr[i].nxt.prev = subscr[i].prev +				// just in case we have missed a semaphore during restore +				subscr[i].nxt.sem.post() +			} +			C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) +		} else { +			mut null16 := u16(0) +			for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) { +				null16 = u16(0) +			} +			unsafe { +				*subscr[i].prev = subscr[i].nxt +			} +			if subscr[i].nxt != 0 { +				subscr[i].nxt.prev = subscr[i].prev +				subscr[i].nxt.sem.post() +			} +			C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) +		} +	} +	sem.destroy() +	return event_idx +} diff --git a/v_windows/v/old/vlib/sync/pool/README.md b/v_windows/v/old/vlib/sync/pool/README.md new file mode 100644 index 0000000..bdea5b3 --- /dev/null +++ b/v_windows/v/old/vlib/sync/pool/README.md @@ -0,0 +1,36 @@ + +The `sync.pool` module provides a convenient way to run identical tasks over +an array of items *in parallel*, without worrying about thread synchronization, +waitgroups, mutexes etc.., you just need to supply a callback function, that +will be called once per each item in your input array. + +After all the work is done in parallel by the worker threads in the pool, +pool.work_on_items will return. You can then call pool.get_results<Result>() +to retrieve a list of all the results, that the worker callbacks returned +for each input item. Example: + +```v +import sync.pool + +struct SResult { +	s string +} + +fn sprocess(pp &pool.PoolProcessor, idx int, wid int) &SResult { +	item := pp.get_item<string>(idx) +	println('idx: $idx, wid: $wid, item: ' + item) +	return &SResult{item.reverse()} +} + +fn main() { +	mut pp := pool.new_pool_processor(callback: sprocess) +	pp.work_on_items(['1abc', '2abc', '3abc', '4abc', '5abc', '6abc', '7abc']) +	// optionally, you can iterate over the results too: +	for x in pp.get_results<SResult>() { +		println('result: $x.s') +	} +} +``` + +See https://github.com/vlang/v/blob/master/vlib/sync/pool/pool_test.v for a +more detailed usage example. diff --git a/v_windows/v/old/vlib/sync/pool/pool.v b/v_windows/v/old/vlib/sync/pool/pool.v new file mode 100644 index 0000000..b2c5340 --- /dev/null +++ b/v_windows/v/old/vlib/sync/pool/pool.v @@ -0,0 +1,165 @@ +module pool + +import sync +import runtime + +[trusted] +fn C.atomic_fetch_add_u32(voidptr, u32) u32 + +pub const ( +	no_result = voidptr(0) +) + +pub struct PoolProcessor { +	thread_cb voidptr +mut: +	njobs           int +	items           []voidptr +	results         []voidptr +	ntask           u32 // reading/writing to this should be atomic +	waitgroup       sync.WaitGroup +	shared_context  voidptr +	thread_contexts []voidptr +} + +pub type ThreadCB = fn (p &PoolProcessor, idx int, task_id int) voidptr + +pub struct PoolProcessorConfig { +	maxjobs  int +	callback ThreadCB +} + +// new_pool_processor returns a new PoolProcessor instance. +// The parameters of new_pool_processor are: +//    context.maxjobs: when 0 (the default), the PoolProcessor will use a +//      number of threads, that is optimal for your system to process your items. +//    context.callback: this should be a callback function, that each worker +//      thread in the pool will run for each item. +//      The callback function will receive as parameters: +//      1) the PoolProcessor instance, so it can call +//            p.get_item<int>(idx) to get the actual item at index idx +//      2) idx - the index of the currently processed item +//      3) task_id - the index of the worker thread in which the callback +//            function is running. +pub fn new_pool_processor(context PoolProcessorConfig) &PoolProcessor { +	if isnil(context.callback) { +		panic('You need to pass a valid callback to new_pool_processor.') +	} +	mut pool := PoolProcessor{ +		items: [] +		results: [] +		shared_context: voidptr(0) +		thread_contexts: [] +		njobs: context.maxjobs +		ntask: 0 +		thread_cb: voidptr(context.callback) +	} +	pool.waitgroup.init() +	return &pool +} + +// set_max_jobs gives you the ability to override the number +// of jobs *after* the PoolProcessor had been created already. +pub fn (mut pool PoolProcessor) set_max_jobs(njobs int) { +	pool.njobs = njobs +} + +// work_on_items receives a list of items of type T, +// then starts a work pool of pool.njobs threads, each running +// pool.thread_cb in a loop, untill all items in the list, +// are processed. +// When pool.njobs is 0, the number of jobs is determined +// by the number of available cores on the system. +// work_on_items returns *after* all threads finish. +// You can optionally call get_results after that. +pub fn (mut pool PoolProcessor) work_on_items<T>(items []T) { +	pool.work_on_pointers(unsafe { items.pointers() }) +} + +pub fn (mut pool PoolProcessor) work_on_pointers(items []voidptr) { +	mut njobs := runtime.nr_jobs() +	if pool.njobs > 0 { +		njobs = pool.njobs +	} +	pool.items = [] +	pool.results = [] +	pool.thread_contexts = [] +	pool.items << items +	pool.results = []voidptr{len: (pool.items.len)} +	pool.thread_contexts << []voidptr{len: (pool.items.len)} +	pool.waitgroup.add(njobs) +	for i := 0; i < njobs; i++ { +		if njobs > 1 { +			go process_in_thread(mut pool, i) +		} else { +			// do not run concurrently, just use the same thread: +			process_in_thread(mut pool, i) +		} +	} +	pool.waitgroup.wait() +} + +// process_in_thread does the actual work of worker thread. +// It is a workaround for the current inability to pass a +// method in a callback. +fn process_in_thread(mut pool PoolProcessor, task_id int) { +	cb := ThreadCB(pool.thread_cb) +	ilen := pool.items.len +	for { +		idx := int(C.atomic_fetch_add_u32(&pool.ntask, 1)) +		if idx >= ilen { +			break +		} +		pool.results[idx] = cb(pool, idx, task_id) +	} +	pool.waitgroup.done() +} + +// get_item - called by the worker callback. +// Retrieves a type safe instance of the currently processed item +pub fn (pool &PoolProcessor) get_item<T>(idx int) T { +	return *(&T(pool.items[idx])) +} + +// get_result - called by the main thread to get a specific result. +// Retrieves a type safe instance of the produced result. +pub fn (pool &PoolProcessor) get_result<T>(idx int) T { +	return *(&T(pool.results[idx])) +} + +// get_results - get a list of type safe results in the main thread. +pub fn (pool &PoolProcessor) get_results<T>() []T { +	mut res := []T{} +	for i in 0 .. pool.results.len { +		res << *(&T(pool.results[i])) +	} +	return res +} + +// set_shared_context - can be called during the setup so that you can +// provide a context that is shared between all worker threads, like +// common options/settings. +pub fn (mut pool PoolProcessor) set_shared_context(context voidptr) { +	pool.shared_context = context +} + +// get_shared_context - can be called in each worker callback, to get +// the context set by pool.set_shared_context +pub fn (pool &PoolProcessor) get_shared_context() voidptr { +	return pool.shared_context +} + +// set_thread_context - can be called during the setup at the start of +// each worker callback, so that the worker callback can have some thread +// local storage area where it can write/read information that is private +// to the given thread, without worrying that it will get overwritten by +// another thread +pub fn (mut pool PoolProcessor) set_thread_context(idx int, context voidptr) { +	pool.thread_contexts[idx] = context +} + +// get_thread_context - returns a pointer, that was set with +// pool.set_thread_context . This pointer is private to each thread. +pub fn (pool &PoolProcessor) get_thread_context(idx int) voidptr { +	return pool.thread_contexts[idx] +} diff --git a/v_windows/v/old/vlib/sync/pool/pool_test.v b/v_windows/v/old/vlib/sync/pool/pool_test.v new file mode 100644 index 0000000..629b524 --- /dev/null +++ b/v_windows/v/old/vlib/sync/pool/pool_test.v @@ -0,0 +1,52 @@ +import time +import sync.pool + +pub struct SResult { +	s string +} + +pub struct IResult { +	i int +} + +fn worker_s(p &pool.PoolProcessor, idx int, worker_id int) &SResult { +	item := p.get_item<string>(idx) +	println('worker_s worker_id: $worker_id | idx: $idx | item: $item') +	time.sleep(3 * time.millisecond) +	return &SResult{'$item $item'} +} + +fn worker_i(p &pool.PoolProcessor, idx int, worker_id int) &IResult { +	item := p.get_item<int>(idx) +	println('worker_i worker_id: $worker_id | idx: $idx | item: $item') +	time.sleep(5 * time.millisecond) +	return &IResult{item * 1000} +} + +fn test_work_on_strings() { +	mut pool_s := pool.new_pool_processor( +		callback: worker_s +		maxjobs: 8 +	) + +	pool_s.work_on_items(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']) +	for x in pool_s.get_results<SResult>() { +		println(x.s) +		assert x.s.len > 1 +	} +} + +fn test_work_on_ints() { +	// NB: since maxjobs is left empty here, +	// the pool processor will use njobs = runtime.nr_jobs so that +	// it will work optimally without overloading the system +	mut pool_i := pool.new_pool_processor( +		callback: worker_i +	) + +	pool_i.work_on_items([1, 2, 3, 4, 5, 6, 7, 8]) +	for x in pool_i.get_results<IResult>() { +		println(x.i) +		assert x.i > 100 +	} +} diff --git a/v_windows/v/old/vlib/sync/select_close_test.v b/v_windows/v/old/vlib/sync/select_close_test.v new file mode 100644 index 0000000..3a40e2d --- /dev/null +++ b/v_windows/v/old/vlib/sync/select_close_test.v @@ -0,0 +1,92 @@ +module sync + +import time + +fn do_rec_i64(mut ch Channel) { +	mut sum := i64(0) +	for i in 0 .. 300 { +		if i == 200 { +			ch.close() +		} +		mut a := i64(0) +		if ch.pop(&a) { +			sum += a +		} +	} +	assert sum == 200 * (200 - 1) / 2 +} + +fn do_send_int(mut ch Channel) { +	for i in 0 .. 300 { +		ch.push(&i) +	} +	ch.close() +} + +fn do_send_byte(mut ch Channel) { +	for i in 0 .. 300 { +		ii := byte(i) +		ch.push(&ii) +	} +	ch.close() +} + +fn do_send_i64(mut ch Channel) { +	for i in 0 .. 300 { +		ii := i64(i) +		ch.push(&ii) +	} +	ch.close() +} + +fn test_select() { +	mut chi := new_channel<int>(0) +	mut chl := new_channel<i64>(1) +	mut chb := new_channel<byte>(10) +	mut recch := new_channel<i64>(0) +	go do_rec_i64(mut recch) +	go do_send_int(mut chi) +	go do_send_byte(mut chb) +	go do_send_i64(mut chl) +	mut channels := [chi, recch, chl, chb] +	directions := [Direction.pop, .push, .pop, .pop] +	mut sum := i64(0) +	mut rl := i64(0) +	mut ri := int(0) +	mut rb := byte(0) +	mut sl := i64(0) +	mut objs := [voidptr(&ri), &sl, &rl, &rb] +	for j in 0 .. 1101 { +		idx := channel_select(mut channels, directions, mut objs, time.infinite) +		match idx { +			0 { +				sum += ri +			} +			1 { +				sl++ +			} +			2 { +				sum += rl +			} +			3 { +				sum += rb +			} +			-2 { +				// channel was closed - last item +				assert j == 1100 +			} +			else { +				println('got $idx (timeout)') +				assert false +			} +		} +		if j == 1100 { +			// check also in other direction +			assert idx == -2 +		} +	} +	// Use Gauß' formula for the first 2 contributions +	// the 3rd contribution is `byte` and must be seen modulo 256 +	expected_sum := 2 * (300 * (300 - 1) / 2) + 256 * (256 - 1) / 2 + 44 * (44 - 1) / 2 +	assert sum == expected_sum +} diff --git a/v_windows/v/old/vlib/sync/struct_chan_init_test.v b/v_windows/v/old/vlib/sync/struct_chan_init_test.v new file mode 100644 index 0000000..a51ea4b --- /dev/null +++ b/v_windows/v/old/vlib/sync/struct_chan_init_test.v @@ -0,0 +1,14 @@ +struct Abc { +	ch chan int +} + +fn f(st Abc) { +	st.ch <- 47 +} + +fn test_chan_init() { +	st := Abc{} +	go f(st) +	i := <-st.ch +	assert i == 47 +} diff --git a/v_windows/v/old/vlib/sync/sync_default.c.v b/v_windows/v/old/vlib/sync/sync_default.c.v new file mode 100644 index 0000000..76f0b44 --- /dev/null +++ b/v_windows/v/old/vlib/sync/sync_default.c.v @@ -0,0 +1,193 @@ +// Copyright (c) 2019-2021 Alexander Medvednikov. All rights reserved. +// Use of this source code is governed by an MIT license +// that can be found in the LICENSE file. +module sync + +import time + +// There's no additional linking (-lpthread) needed for Android. +// See https://stackoverflow.com/a/31277163/1904615 +$if !android { +	#flag -lpthread +} + +#include <semaphore.h> + +[trusted] +fn C.pthread_mutex_init(voidptr, voidptr) int +fn C.pthread_mutex_lock(voidptr) int +fn C.pthread_mutex_unlock(voidptr) int +fn C.pthread_mutex_destroy(voidptr) int +fn C.pthread_rwlockattr_init(voidptr) int +fn C.pthread_rwlockattr_setkind_np(voidptr, int) int +fn C.pthread_rwlockattr_setpshared(voidptr, int) int +fn C.pthread_rwlock_init(voidptr, voidptr) int +fn C.pthread_rwlock_rdlock(voidptr) int +fn C.pthread_rwlock_wrlock(voidptr) int +fn C.pthread_rwlock_unlock(voidptr) int +fn C.sem_init(voidptr, int, u32) int +fn C.sem_post(voidptr) int +fn C.sem_wait(voidptr) int +fn C.sem_trywait(voidptr) int +fn C.sem_timedwait(voidptr, voidptr) int +fn C.sem_destroy(voidptr) int + +// [init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function. +[heap] +pub struct Mutex { +	mutex C.pthread_mutex_t +} + +[heap] +pub struct RwMutex { +	mutex C.pthread_rwlock_t +} + +struct RwMutexAttr { +	attr C.pthread_rwlockattr_t +} + +[heap] +struct Semaphore { +	sem C.sem_t +} + +pub fn new_mutex() &Mutex { +	mut m := &Mutex{} +	m.init() +	return m +} + +pub fn (mut m Mutex) init() { +	C.pthread_mutex_init(&m.mutex, C.NULL) +} + +pub fn new_rwmutex() &RwMutex { +	mut m := &RwMutex{} +	m.init() +	return m +} + +pub fn (mut m RwMutex) init() { +	a := RwMutexAttr{} +	C.pthread_rwlockattr_init(&a.attr) +	// Give writer priority over readers +	C.pthread_rwlockattr_setkind_np(&a.attr, C.PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP) +	C.pthread_rwlockattr_setpshared(&a.attr, C.PTHREAD_PROCESS_PRIVATE) +	C.pthread_rwlock_init(&m.mutex, &a.attr) +} + +// @lock(), for *manual* mutex handling, since `lock` is a keyword +pub fn (mut m Mutex) @lock() { +	C.pthread_mutex_lock(&m.mutex) +} + +pub fn (mut m Mutex) unlock() { +	C.pthread_mutex_unlock(&m.mutex) +} + +// RwMutex has separate read- and write locks +pub fn (mut m RwMutex) @rlock() { +	C.pthread_rwlock_rdlock(&m.mutex) +} + +pub fn (mut m RwMutex) @lock() { +	C.pthread_rwlock_wrlock(&m.mutex) +} + +// Windows SRWLocks have different function to unlock +// So provide two functions here, too, to have a common interface +pub fn (mut m RwMutex) runlock() { +	C.pthread_rwlock_unlock(&m.mutex) +} + +pub fn (mut m RwMutex) unlock() { +	C.pthread_rwlock_unlock(&m.mutex) +} + +[inline] +pub fn new_semaphore() &Semaphore { +	return new_semaphore_init(0) +} + +pub fn new_semaphore_init(n u32) &Semaphore { +	mut sem := &Semaphore{} +	sem.init(n) +	return sem +} + +pub fn (mut sem Semaphore) init(n u32) { +	C.sem_init(&sem.sem, 0, n) +} + +pub fn (mut sem Semaphore) post() { +	C.sem_post(&sem.sem) +} + +pub fn (mut sem Semaphore) wait() { +	for { +		if C.sem_wait(&sem.sem) == 0 { +			return +		} +		e := C.errno +		match e { +			C.EINTR { +				continue // interrupted by signal +			} +			else { +				panic(unsafe { tos_clone(&byte(C.strerror(C.errno))) }) +			} +		} +	} +} + +// `try_wait()` should return as fast as possible so error handling is only +// done when debugging +pub fn (mut sem Semaphore) try_wait() bool { +	$if !debug { +		return C.sem_trywait(&sem.sem) == 0 +	} $else { +		if C.sem_trywait(&sem.sem) != 0 { +			e := C.errno +			match e { +				C.EAGAIN { +					return false +				} +				else { +					panic(unsafe { tos_clone(&byte(C.strerror(C.errno))) }) +				} +			} +		} +		return true +	} +} + +pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool { +	t_spec := timeout.timespec() +	for { +		if C.sem_timedwait(&sem.sem, &t_spec) == 0 { +			return true +		} +		e := C.errno +		match e { +			C.EINTR { +				continue // interrupted by signal +			} +			C.ETIMEDOUT { +				break +			} +			else { +				panic(unsafe { tos_clone(&byte(C.strerror(e))) }) +			} +		} +	} +	return false +} + +pub fn (sem Semaphore) destroy() { +	res := C.sem_destroy(&sem.sem) +	if res == 0 { +		return +	} +	panic(unsafe { tos_clone(&byte(C.strerror(res))) }) +} diff --git a/v_windows/v/old/vlib/sync/sync_macos.c.v b/v_windows/v/old/vlib/sync/sync_macos.c.v new file mode 100644 index 0000000..3f83198 --- /dev/null +++ b/v_windows/v/old/vlib/sync/sync_macos.c.v @@ -0,0 +1,232 @@ +// Copyright (c) 2019-2021 Alexander Medvednikov. All rights reserved. +// Use of this source code is governed by an MIT license +// that can be found in the LICENSE file. +module sync + +import time + +#flag -lpthread +#include <semaphore.h> +#include <sys/errno.h> + +[trusted] +fn C.pthread_mutex_init(voidptr, voidptr) int +fn C.pthread_mutex_lock(voidptr) int +fn C.pthread_mutex_unlock(voidptr) int +fn C.pthread_mutex_destroy(voidptr) int +fn C.pthread_rwlockattr_init(voidptr) int +fn C.pthread_rwlockattr_setkind_np(voidptr, int) int +fn C.pthread_rwlockattr_setpshared(voidptr, int) int +fn C.pthread_rwlock_init(voidptr, voidptr) int +fn C.pthread_rwlock_rdlock(voidptr) int +fn C.pthread_rwlock_wrlock(voidptr) int +fn C.pthread_rwlock_unlock(voidptr) int +fn C.pthread_condattr_init(voidptr) int +fn C.pthread_condattr_setpshared(voidptr, int) int +fn C.pthread_condattr_destroy(voidptr) int +fn C.pthread_cond_init(voidptr, voidptr) int +fn C.pthread_cond_signal(voidptr) int +fn C.pthread_cond_wait(voidptr, voidptr) int +fn C.pthread_cond_timedwait(voidptr, voidptr, voidptr) int +fn C.pthread_cond_destroy(voidptr) int + +// [init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function. +[heap] +pub struct Mutex { +	mutex C.pthread_mutex_t +} + +[heap] +pub struct RwMutex { +	mutex C.pthread_rwlock_t +} + +struct RwMutexAttr { +	attr C.pthread_rwlockattr_t +} + +struct CondAttr { +	attr C.pthread_condattr_t +} + +/* +MacOSX has no unnamed semaphores and no `timed_wait()` at all +   so we emulate the behaviour with other devices +*/ +[heap] +struct Semaphore { +	mtx  C.pthread_mutex_t +	cond C.pthread_cond_t +mut: +	count u32 +} + +pub fn new_mutex() &Mutex { +	mut m := &Mutex{} +	m.init() +	return m +} + +pub fn (mut m Mutex) init() { +	C.pthread_mutex_init(&m.mutex, C.NULL) +} + +pub fn new_rwmutex() &RwMutex { +	mut m := &RwMutex{} +	m.init() +	return m +} + +pub fn (mut m RwMutex) init() { +	a := RwMutexAttr{} +	C.pthread_rwlockattr_init(&a.attr) +	// Give writer priority over readers +	C.pthread_rwlockattr_setkind_np(&a.attr, C.PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP) +	C.pthread_rwlockattr_setpshared(&a.attr, C.PTHREAD_PROCESS_PRIVATE) +	C.pthread_rwlock_init(&m.mutex, &a.attr) +} + +// @lock(), for *manual* mutex handling, since `lock` is a keyword +pub fn (mut m Mutex) @lock() { +	C.pthread_mutex_lock(&m.mutex) +} + +pub fn (mut m Mutex) unlock() { +	C.pthread_mutex_unlock(&m.mutex) +} + +// RwMutex has separate read- and write locks +pub fn (mut m RwMutex) @rlock() { +	C.pthread_rwlock_rdlock(&m.mutex) +} + +pub fn (mut m RwMutex) @lock() { +	C.pthread_rwlock_wrlock(&m.mutex) +} + +// Windows SRWLocks have different function to unlock +// So provide two functions here, too, to have a common interface +pub fn (mut m RwMutex) runlock() { +	C.pthread_rwlock_unlock(&m.mutex) +} + +pub fn (mut m RwMutex) unlock() { +	C.pthread_rwlock_unlock(&m.mutex) +} + +[inline] +pub fn new_semaphore() &Semaphore { +	return new_semaphore_init(0) +} + +pub fn new_semaphore_init(n u32) &Semaphore { +	mut sem := &Semaphore{} +	sem.init(n) +	return sem +} + +pub fn (mut sem Semaphore) init(n u32) { +	C.atomic_store_u32(&sem.count, n) +	C.pthread_mutex_init(&sem.mtx, C.NULL) +	attr := CondAttr{} +	C.pthread_condattr_init(&attr.attr) +	C.pthread_condattr_setpshared(&attr.attr, C.PTHREAD_PROCESS_PRIVATE) +	C.pthread_cond_init(&sem.cond, &attr.attr) +	C.pthread_condattr_destroy(&attr.attr) +} + +pub fn (mut sem Semaphore) post() { +	mut c := C.atomic_load_u32(&sem.count) +	for c > 1 { +		if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c + 1) { +			return +		} +	} +	C.pthread_mutex_lock(&sem.mtx) +	c = C.atomic_fetch_add_u32(&sem.count, 1) +	if c == 0 { +		C.pthread_cond_signal(&sem.cond) +	} +	C.pthread_mutex_unlock(&sem.mtx) +} + +pub fn (mut sem Semaphore) wait() { +	mut c := C.atomic_load_u32(&sem.count) +	for c > 0 { +		if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) { +			return +		} +	} +	C.pthread_mutex_lock(&sem.mtx) +	c = C.atomic_load_u32(&sem.count) + +	outer: for { +		if c == 0 { +			C.pthread_cond_wait(&sem.cond, &sem.mtx) +			c = C.atomic_load_u32(&sem.count) +		} +		for c > 0 { +			if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) { +				if c > 1 { +					C.pthread_cond_signal(&sem.cond) +				} +				break outer +			} +		} +	} +	C.pthread_mutex_unlock(&sem.mtx) +} + +pub fn (mut sem Semaphore) try_wait() bool { +	mut c := C.atomic_load_u32(&sem.count) +	for c > 0 { +		if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) { +			return true +		} +	} +	return false +} + +pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool { +	mut c := C.atomic_load_u32(&sem.count) +	for c > 0 { +		if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) { +			return true +		} +	} +	C.pthread_mutex_lock(&sem.mtx) +	t_spec := timeout.timespec() +	mut res := 0 +	c = C.atomic_load_u32(&sem.count) + +	outer: for { +		if c == 0 { +			res = C.pthread_cond_timedwait(&sem.cond, &sem.mtx, &t_spec) +			if res == C.ETIMEDOUT { +				break outer +			} +			c = C.atomic_load_u32(&sem.count) +		} +		for c > 0 { +			if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) { +				if c > 1 { +					C.pthread_cond_signal(&sem.cond) +				} +				break outer +			} +		} +	} +	C.pthread_mutex_unlock(&sem.mtx) +	return res == 0 +} + +pub fn (mut sem Semaphore) destroy() { +	mut res := C.pthread_cond_destroy(&sem.cond) +	if res == 0 { +		res = C.pthread_mutex_destroy(&sem.mtx) +		if res == 0 { +			return +		} +	} +	panic(unsafe { tos_clone(&byte(C.strerror(res))) }) +} diff --git a/v_windows/v/old/vlib/sync/sync_windows.c.v b/v_windows/v/old/vlib/sync/sync_windows.c.v new file mode 100644 index 0000000..d0942b7 --- /dev/null +++ b/v_windows/v/old/vlib/sync/sync_windows.c.v @@ -0,0 +1,212 @@ +// Copyright (c) 2019-2021 Alexander Medvednikov. All rights reserved. +// Use of this source code is governed by an MIT license +// that can be found in the LICENSE file. +module sync + +import time + +#include <synchapi.h> +#include <time.h> + +fn C.GetSystemTimeAsFileTime(lpSystemTimeAsFileTime &C._FILETIME) +fn C.InitializeConditionVariable(voidptr) +fn C.WakeConditionVariable(voidptr) +fn C.SleepConditionVariableSRW(voidptr, voidptr, u32, u32) int + +// TODO: The suggestion of using CriticalSection instead of mutex +// was discussed. Needs consideration. + +// Mutex HANDLE +type MHANDLE = voidptr + +// Semaphore HANDLE +type SHANDLE = voidptr + +//[init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function. + +// `SRWLOCK` is much more performant that `Mutex` on Windows, so use that in both cases since we don't want to share with other processes +[heap] +pub struct Mutex { +mut: +	mx C.SRWLOCK // mutex handle +} + +[heap] +pub struct RwMutex { +mut: +	mx C.SRWLOCK // mutex handle +} + +[heap] +struct Semaphore { +	mtx  C.SRWLOCK +	cond C.CONDITION_VARIABLE +mut: +	count u32 +} + +pub fn new_mutex() &Mutex { +	mut m := &Mutex{} +	m.init() +	return m +} + +pub fn new_rwmutex() &RwMutex { +	mut m := &RwMutex{} +	m.init() +	return m +} + +pub fn (mut m Mutex) init() { +	C.InitializeSRWLock(&m.mx) +} + +pub fn (mut m RwMutex) init() { +	C.InitializeSRWLock(&m.mx) +} + +pub fn (mut m Mutex) @lock() { +	C.AcquireSRWLockExclusive(&m.mx) +} + +pub fn (mut m Mutex) unlock() { +	C.ReleaseSRWLockExclusive(&m.mx) +} + +// RwMutex has separate read- and write locks +pub fn (mut m RwMutex) @rlock() { +	C.AcquireSRWLockShared(&m.mx) +} + +pub fn (mut m RwMutex) @lock() { +	C.AcquireSRWLockExclusive(&m.mx) +} + +// Windows SRWLocks have different function to unlock +// So provide two functions here, too, to have a common interface +pub fn (mut m RwMutex) runlock() { +	C.ReleaseSRWLockShared(&m.mx) +} + +pub fn (mut m RwMutex) unlock() { +	C.ReleaseSRWLockExclusive(&m.mx) +} + +pub fn (mut m Mutex) destroy() { +	// nothing to do +} + +[inline] +pub fn new_semaphore() &Semaphore { +	return new_semaphore_init(0) +} + +pub fn new_semaphore_init(n u32) &Semaphore { +	mut sem := &Semaphore{} +	sem.init(n) +	return sem +} + +pub fn (mut sem Semaphore) init(n u32) { +	C.atomic_store_u32(&sem.count, n) +	C.InitializeSRWLock(&sem.mtx) +	C.InitializeConditionVariable(&sem.cond) +} + +pub fn (mut sem Semaphore) post() { +	mut c := C.atomic_load_u32(&sem.count) +	for c > 1 { +		if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c + 1) { +			return +		} +	} +	C.AcquireSRWLockExclusive(&sem.mtx) +	c = C.atomic_fetch_add_u32(&sem.count, 1) +	if c == 0 { +		C.WakeConditionVariable(&sem.cond) +	} +	C.ReleaseSRWLockExclusive(&sem.mtx) +} + +pub fn (mut sem Semaphore) wait() { +	mut c := C.atomic_load_u32(&sem.count) +	for c > 0 { +		if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) { +			return +		} +	} +	C.AcquireSRWLockExclusive(&sem.mtx) +	c = C.atomic_load_u32(&sem.count) + +	outer: for { +		if c == 0 { +			C.SleepConditionVariableSRW(&sem.cond, &sem.mtx, C.INFINITE, 0) +			c = C.atomic_load_u32(&sem.count) +		} +		for c > 0 { +			if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) { +				if c > 1 { +					C.WakeConditionVariable(&sem.cond) +				} +				break outer +			} +		} +	} +	C.ReleaseSRWLockExclusive(&sem.mtx) +} + +pub fn (mut sem Semaphore) try_wait() bool { +	mut c := C.atomic_load_u32(&sem.count) +	for c > 0 { +		if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) { +			return true +		} +	} +	return false +} + +pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool { +	mut c := C.atomic_load_u32(&sem.count) +	for c > 0 { +		if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) { +			return true +		} +	} +	mut ft_start := C._FILETIME{} +	C.GetSystemTimeAsFileTime(&ft_start) +	time_end := ((u64(ft_start.dwHighDateTime) << 32) | ft_start.dwLowDateTime) + +		u64(timeout / (100 * time.nanosecond)) +	mut t_ms := timeout.sys_milliseconds() +	C.AcquireSRWLockExclusive(&sem.mtx) +	mut res := 0 +	c = C.atomic_load_u32(&sem.count) + +	outer: for { +		if c == 0 { +			res = C.SleepConditionVariableSRW(&sem.cond, &sem.mtx, t_ms, 0) +			if res == 0 { +				break outer +			} +			c = C.atomic_load_u32(&sem.count) +		} +		for c > 0 { +			if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) { +				if c > 1 { +					C.WakeConditionVariable(&sem.cond) +				} +				break outer +			} +		} +		C.GetSystemTimeAsFileTime(&ft_start) +		time_now := ((u64(ft_start.dwHighDateTime) << 32) | ft_start.dwLowDateTime) // in 100ns +		if time_now > time_end { +			break outer // timeout exceeded +		} +		t_ms = u32((time_end - time_now) / 10000) +	} +	C.ReleaseSRWLockExclusive(&sem.mtx) +	return res != 0 +} + +pub fn (s Semaphore) destroy() { +} diff --git a/v_windows/v/old/vlib/sync/threads/threads.c.v b/v_windows/v/old/vlib/sync/threads/threads.c.v new file mode 100644 index 0000000..02506c2 --- /dev/null +++ b/v_windows/v/old/vlib/sync/threads/threads.c.v @@ -0,0 +1,13 @@ +module threads + +// This module adds the necessary compiler flags for using threads. +// It is automatically imported by code that does `go func()` . +// See vlib/v/parser/pratt.v, search for ast.GoExpr . +// The goal is that programs, that do not use threads at all will not need +// to link to -lpthread etc. +// NB: on some platforms like Android, linking -lpthread is not needed too. +// See https://stackoverflow.com/a/31277163/1904615 + +$if !windows && !android { +	#flag -lpthread +} diff --git a/v_windows/v/old/vlib/sync/threads/threads.v b/v_windows/v/old/vlib/sync/threads/threads.v new file mode 100644 index 0000000..f20fc0e --- /dev/null +++ b/v_windows/v/old/vlib/sync/threads/threads.v @@ -0,0 +1,4 @@ +module threads + +// This file is just a placeholder. +// The actual implementation is backend/platform specific, so see threads.c.v, threads.js.v etc. diff --git a/v_windows/v/old/vlib/sync/waitgroup.v b/v_windows/v/old/vlib/sync/waitgroup.v new file mode 100644 index 0000000..3e9ac39 --- /dev/null +++ b/v_windows/v/old/vlib/sync/waitgroup.v @@ -0,0 +1,84 @@ +// Copyright (c) 2019 Alexander Medvednikov. All rights reserved. +// Use of this source code is governed by an MIT license +// that can be found in the LICENSE file. +module sync + +[trusted] +fn C.atomic_fetch_add_u32(voidptr, u32) u32 + +[trusted] +fn C.atomic_load_u32(voidptr) u32 + +[trusted] +fn C.atomic_compare_exchange_weak_u32(voidptr, voidptr, u32) bool + +// WaitGroup +// Do not copy an instance of WaitGroup, use a ref instead. +// +// usage: in main thread: +// `wg := sync.new_waitgroup() +// `wg.add(nr_jobs)` before starting jobs with `go ...` +// `wg.wait()` to wait for all jobs to have finished +// +// in each parallel job: +// `wg.done()` when finished +// +// [init_with=new_waitgroup] // TODO: implement support for init_with struct attribute, and disallow WaitGroup{} from outside the sync.new_waitgroup() function. +[heap] +struct WaitGroup { +mut: +	task_count u32       // current task count - reading/writing should be atomic +	wait_count u32       // current wait count - reading/writing should be atomic +	sem        Semaphore // This blocks wait() until tast_countreleased by add() +} + +pub fn new_waitgroup() &WaitGroup { +	mut wg := WaitGroup{} +	wg.init() +	return &wg +} + +pub fn (mut wg WaitGroup) init() { +	wg.sem.init(0) +} + +// add increments (+ve delta) or decrements (-ve delta) task count by delta +// and unblocks any wait() calls if task count becomes zero. +// add panics if task count drops below zero. +pub fn (mut wg WaitGroup) add(delta int) { +	old_nrjobs := int(C.atomic_fetch_add_u32(&wg.task_count, u32(delta))) +	new_nrjobs := old_nrjobs + delta +	mut num_waiters := C.atomic_load_u32(&wg.wait_count) +	if new_nrjobs < 0 { +		panic('Negative number of jobs in waitgroup') +	} + +	if new_nrjobs == 0 && num_waiters > 0 { +		// clear waiters +		for !C.atomic_compare_exchange_weak_u32(&wg.wait_count, &num_waiters, 0) { +			if num_waiters == 0 { +				return +			} +		} +		for (num_waiters > 0) { +			wg.sem.post() +			num_waiters-- +		} +	} +} + +// done is a convenience fn for add(-1) +pub fn (mut wg WaitGroup) done() { +	wg.add(-1) +} + +// wait blocks until all tasks are done (task count becomes zero) +pub fn (mut wg WaitGroup) wait() { +	nrjobs := int(C.atomic_load_u32(&wg.task_count)) +	if nrjobs == 0 { +		// no need to wait +		return +	} +	C.atomic_fetch_add_u32(&wg.wait_count, 1) +	wg.sem.wait() // blocks until task_count becomes 0 +} diff --git a/v_windows/v/old/vlib/sync/waitgroup_test.v b/v_windows/v/old/vlib/sync/waitgroup_test.v new file mode 100644 index 0000000..493665f --- /dev/null +++ b/v_windows/v/old/vlib/sync/waitgroup_test.v @@ -0,0 +1,41 @@ +module sync + +import time + +fn test_waitgroup_reuse() { +	mut wg := new_waitgroup() + +	wg.add(1) +	wg.done() + +	wg.add(1) +	mut executed := false +	go fn (mut wg WaitGroup, executed voidptr) { +		defer { +			wg.done() +		} +		unsafe { +			*(&bool(executed)) = true +		} +		time.sleep(100 * time.millisecond) +		assert wg.wait_count == 1 +	}(mut wg, voidptr(&executed)) + +	wg.wait() +	assert executed +	assert wg.wait_count == 0 +} + +fn test_waitgroup_no_use() { +	mut done := false +	go fn (done voidptr) { +		time.sleep(1 * time.second) +		if *(&bool(done)) == false { +			panic('test_waitgroup_no_use did not complete in time') +		} +	}(voidptr(&done)) + +	mut wg := new_waitgroup() +	wg.wait() +	done = true +} | 
