diff options
Diffstat (limited to 'v_windows/v/old/vlib/sync/channels.v')
-rw-r--r-- | v_windows/v/old/vlib/sync/channels.v | 730 |
1 files changed, 730 insertions, 0 deletions
diff --git a/v_windows/v/old/vlib/sync/channels.v b/v_windows/v/old/vlib/sync/channels.v new file mode 100644 index 0000000..49f1396 --- /dev/null +++ b/v_windows/v/old/vlib/sync/channels.v @@ -0,0 +1,730 @@ +module sync + +import time +import rand + +$if windows { + #flag -I @VEXEROOT/thirdparty/stdatomic/win +} $else { + #flag -I @VEXEROOT/thirdparty/stdatomic/nix +} + +$if linux { + $if tinyc { + $if amd64 { + // most Linux distributions have /usr/lib/libatomic.so, but Ubuntu uses gcc version specific dir + #flag -L/usr/lib/gcc/x86_64-linux-gnu/6 + #flag -L/usr/lib/gcc/x86_64-linux-gnu/7 + #flag -L/usr/lib/gcc/x86_64-linux-gnu/8 + #flag -L/usr/lib/gcc/x86_64-linux-gnu/9 + #flag -L/usr/lib/gcc/x86_64-linux-gnu/10 + #flag -L/usr/lib/gcc/x86_64-linux-gnu/11 + #flag -L/usr/lib/gcc/x86_64-linux-gnu/12 + } $else $if arm64 { + #flag -L/usr/lib/gcc/aarch64-linux-gnu/6 + #flag -L/usr/lib/gcc/aarch64-linux-gnu/7 + #flag -L/usr/lib/gcc/aarch64-linux-gnu/8 + #flag -L/usr/lib/gcc/aarch64-linux-gnu/9 + #flag -L/usr/lib/gcc/aarch64-linux-gnu/10 + #flag -L/usr/lib/gcc/aarch64-linux-gnu/11 + #flag -L/usr/lib/gcc/aarch64-linux-gnu/12 + } + #flag -latomic + } +} + +#include <atomic.h> +// The following functions are actually generic in C +fn C.atomic_load_ptr(voidptr) voidptr +fn C.atomic_store_ptr(voidptr, voidptr) +fn C.atomic_compare_exchange_weak_ptr(voidptr, voidptr, voidptr) bool +fn C.atomic_compare_exchange_strong_ptr(voidptr, voidptr, voidptr) bool +fn C.atomic_exchange_ptr(voidptr, voidptr) voidptr +fn C.atomic_fetch_add_ptr(voidptr, voidptr) voidptr +fn C.atomic_fetch_sub_ptr(voidptr, voidptr) voidptr + +fn C.atomic_load_u16(voidptr) u16 +fn C.atomic_store_u16(voidptr, u16) +fn C.atomic_compare_exchange_weak_u16(voidptr, voidptr, u16) bool +fn C.atomic_compare_exchange_strong_u16(voidptr, voidptr, u16) bool +fn C.atomic_exchange_u16(voidptr, u16) u16 +fn C.atomic_fetch_add_u16(voidptr, u16) u16 +fn C.atomic_fetch_sub_u16(voidptr, u16) u16 + +fn C.atomic_load_u32(voidptr) u32 +fn C.atomic_store_u32(voidptr, u32) +fn C.atomic_compare_exchange_weak_u32(voidptr, voidptr, u32) bool +fn C.atomic_compare_exchange_strong_u32(voidptr, voidptr, u32) bool +fn C.atomic_exchange_u32(voidptr, u32) u32 +fn C.atomic_fetch_add_u32(voidptr, u32) u32 +fn C.atomic_fetch_sub_u32(voidptr, u32) u32 + +fn C.atomic_load_u64(voidptr) u64 +fn C.atomic_store_u64(voidptr, u64) +fn C.atomic_compare_exchange_weak_u64(voidptr, voidptr, u64) bool +fn C.atomic_compare_exchange_strong_u64(voidptr, voidptr, u64) bool +fn C.atomic_exchange_u64(voidptr, u64) u64 +fn C.atomic_fetch_add_u64(voidptr, u64) u64 +fn C.atomic_fetch_sub_u64(voidptr, u64) u64 + +const ( + // how often to try to get data without blocking before to wait for semaphore + spinloops = 750 + spinloops_sem = 4000 +) + +enum BufferElemStat { + unused = 0 + writing + written + reading +} + +struct Subscription { +mut: + sem &Semaphore + prev &&Subscription + nxt &Subscription +} + +enum Direction { + pop + push +} + +struct Channel { + ringbuf &byte // queue for buffered channels + statusbuf &byte // flags to synchronize write/read in ringbuf + objsize u32 +mut: // atomic + writesem Semaphore // to wake thread that wanted to write, but buffer was full + readsem Semaphore // to wake thread that wanted to read, but buffer was empty + writesem_im Semaphore + readsem_im Semaphore + write_adr C.atomic_uintptr_t // if != NULL the next obj can be written here without wait + read_adr C.atomic_uintptr_t // if != NULL an obj can be read from here without wait + adr_read C.atomic_uintptr_t // used to identify origin of writesem + adr_written C.atomic_uintptr_t // used to identify origin of readsem + write_free u32 // for queue state + read_avail u32 + buf_elem_write_idx u32 + buf_elem_read_idx u32 + // for select + write_subscriber &Subscription + read_subscriber &Subscription + write_sub_mtx u16 + read_sub_mtx u16 + closed u16 +pub: + cap u32 // queue length in #objects +} + +pub fn new_channel<T>(n u32) &Channel { + st := sizeof(T) + if isreftype(T) { + return new_channel_st(n, st) + } else { + return new_channel_st_noscan(n, st) + } +} + +fn new_channel_st(n u32, st u32) &Channel { + wsem := if n > 0 { n } else { 1 } + rsem := if n > 0 { u32(0) } else { 1 } + rbuf := if n > 0 { unsafe { malloc(int(n * st)) } } else { &byte(0) } + sbuf := if n > 0 { vcalloc_noscan(int(n * 2)) } else { &byte(0) } + mut ch := Channel{ + objsize: st + cap: n + write_free: n + read_avail: 0 + ringbuf: rbuf + statusbuf: sbuf + write_subscriber: 0 + read_subscriber: 0 + } + ch.writesem.init(wsem) + ch.readsem.init(rsem) + ch.writesem_im.init(0) + ch.readsem_im.init(0) + return &ch +} + +fn new_channel_st_noscan(n u32, st u32) &Channel { + $if gcboehm_opt ? { + wsem := if n > 0 { n } else { 1 } + rsem := if n > 0 { u32(0) } else { 1 } + rbuf := if n > 0 { unsafe { malloc_noscan(int(n * st)) } } else { &byte(0) } + sbuf := if n > 0 { vcalloc_noscan(int(n * 2)) } else { &byte(0) } + mut ch := Channel{ + objsize: st + cap: n + write_free: n + read_avail: 0 + ringbuf: rbuf + statusbuf: sbuf + write_subscriber: 0 + read_subscriber: 0 + } + ch.writesem.init(wsem) + ch.readsem.init(rsem) + ch.writesem_im.init(0) + ch.readsem_im.init(0) + return &ch + } $else { + return new_channel_st(n, st) + } +} + +pub fn (ch &Channel) auto_str(typename string) string { + return 'chan $typename{cap: $ch.cap, closed: $ch.closed}' +} + +pub fn (mut ch Channel) close() { + open_val := u16(0) + if !C.atomic_compare_exchange_strong_u16(&ch.closed, &open_val, 1) { + return + } + mut nulladr := voidptr(0) + for !C.atomic_compare_exchange_weak_ptr(unsafe { &voidptr(&ch.adr_written) }, &nulladr, + voidptr(-1)) { + nulladr = voidptr(0) + } + ch.readsem_im.post() + ch.readsem.post() + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + if ch.read_subscriber != voidptr(0) { + ch.read_subscriber.sem.post() + } + C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) + null16 = u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + if ch.write_subscriber != voidptr(0) { + ch.write_subscriber.sem.post() + } + C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) + ch.writesem.post() + if ch.cap == 0 { + C.atomic_store_ptr(unsafe { &voidptr(&ch.read_adr) }, voidptr(0)) + } + ch.writesem_im.post() +} + +[inline] +pub fn (mut ch Channel) len() int { + return int(C.atomic_load_u32(&ch.read_avail)) +} + +[inline] +pub fn (mut ch Channel) closed() bool { + return C.atomic_load_u16(&ch.closed) != 0 +} + +[inline] +pub fn (mut ch Channel) push(src voidptr) { + if ch.try_push_priv(src, false) == .closed { + panic('push on closed channel') + } +} + +[inline] +pub fn (mut ch Channel) try_push(src voidptr) ChanState { + return ch.try_push_priv(src, true) +} + +fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState { + if C.atomic_load_u16(&ch.closed) != 0 { + return .closed + } + spinloops_sem_, spinloops_ := if no_block { 1, 1 } else { sync.spinloops, sync.spinloops_sem } + mut have_swapped := false + for { + mut got_sem := false + mut wradr := C.atomic_load_ptr(unsafe { &voidptr(&ch.write_adr) }) + for wradr != C.NULL { + if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.write_adr) }, + &wradr, voidptr(0)) + { + // there is a reader waiting for us + unsafe { C.memcpy(wradr, src, ch.objsize) } + mut nulladr := voidptr(0) + for !C.atomic_compare_exchange_weak_ptr(unsafe { &voidptr(&ch.adr_written) }, + &nulladr, wradr) { + nulladr = voidptr(0) + } + ch.readsem_im.post() + return .success + } + } + if no_block && ch.cap == 0 { + return .not_ready + } + // get token to read + for _ in 0 .. spinloops_sem_ { + if got_sem { + break + } + got_sem = ch.writesem.try_wait() + } + if !got_sem { + if no_block { + return .not_ready + } + ch.writesem.wait() + } + if C.atomic_load_u16(&ch.closed) != 0 { + ch.writesem.post() + return .closed + } + if ch.cap == 0 { + // try to advertise current object as readable + mut read_in_progress := false + C.atomic_store_ptr(unsafe { &voidptr(&ch.read_adr) }, src) + wradr = C.atomic_load_ptr(unsafe { &voidptr(&ch.write_adr) }) + if wradr != C.NULL { + mut src2 := src + if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.read_adr) }, + &src2, voidptr(0)) + { + ch.writesem.post() + continue + } else { + read_in_progress = true + } + } + if !read_in_progress { + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(voidptr(&ch.read_sub_mtx), &null16, + u16(1)) { + null16 = u16(0) + } + if ch.read_subscriber != voidptr(0) { + ch.read_subscriber.sem.post() + } + C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) + } + mut src2 := src + for sp := u32(0); sp < spinloops_ || read_in_progress; sp++ { + if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_read) }, + &src2, voidptr(0)) + { + have_swapped = true + read_in_progress = true + break + } + src2 = src + } + mut got_im_sem := false + for sp := u32(0); sp < spinloops_sem_ || read_in_progress; sp++ { + got_im_sem = ch.writesem_im.try_wait() + if got_im_sem { + break + } + } + for { + if got_im_sem { + got_im_sem = false + } else { + ch.writesem_im.wait() + } + if C.atomic_load_u16(&ch.closed) != 0 { + if have_swapped + || C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_read) }, &src2, voidptr(0)) { + ch.writesem.post() + return .success + } else { + return .closed + } + } + if have_swapped + || C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_read) }, &src2, voidptr(0)) { + ch.writesem.post() + break + } else { + // this semaphore was not for us - repost in + ch.writesem_im.post() + if src2 == voidptr(-1) { + ch.readsem.post() + return .closed + } + src2 = src + } + } + return .success + } else { + // buffered channel + mut space_in_queue := false + mut wr_free := C.atomic_load_u32(&ch.write_free) + for wr_free > 0 { + space_in_queue = C.atomic_compare_exchange_weak_u32(&ch.write_free, &wr_free, + wr_free - 1) + if space_in_queue { + break + } + } + if space_in_queue { + mut wr_idx := C.atomic_load_u32(&ch.buf_elem_write_idx) + for { + mut new_wr_idx := wr_idx + 1 + for new_wr_idx >= ch.cap { + new_wr_idx -= ch.cap + } + if C.atomic_compare_exchange_strong_u32(&ch.buf_elem_write_idx, &wr_idx, + new_wr_idx) + { + break + } + } + mut wr_ptr := ch.ringbuf + mut status_adr := ch.statusbuf + unsafe { + wr_ptr += wr_idx * ch.objsize + status_adr += wr_idx * sizeof(u16) + } + mut expected_status := u16(BufferElemStat.unused) + for !C.atomic_compare_exchange_weak_u16(status_adr, &expected_status, + u16(BufferElemStat.writing)) { + expected_status = u16(BufferElemStat.unused) + } + unsafe { + C.memcpy(wr_ptr, src, ch.objsize) + } + C.atomic_store_u16(unsafe { &u16(status_adr) }, u16(BufferElemStat.written)) + C.atomic_fetch_add_u32(&ch.read_avail, 1) + ch.readsem.post() + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + if ch.read_subscriber != voidptr(0) { + ch.read_subscriber.sem.post() + } + C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) + return .success + } else { + if no_block { + return .not_ready + } + ch.writesem.post() + } + } + } + // we should not get here but the V compiler want's to see a return statement + assert false + return .success +} + +[inline] +pub fn (mut ch Channel) pop(dest voidptr) bool { + return ch.try_pop_priv(dest, false) == .success +} + +[inline] +pub fn (mut ch Channel) try_pop(dest voidptr) ChanState { + return ch.try_pop_priv(dest, true) +} + +fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState { + spinloops_sem_, spinloops_ := if no_block { 1, 1 } else { sync.spinloops, sync.spinloops_sem } + mut have_swapped := false + mut write_in_progress := false + for { + mut got_sem := false + if ch.cap == 0 { + // unbuffered channel - first see if a `push()` has adversized + mut rdadr := C.atomic_load_ptr(unsafe { &voidptr(&ch.read_adr) }) + for rdadr != C.NULL { + if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.read_adr) }, + &rdadr, voidptr(0)) + { + // there is a writer waiting for us + unsafe { C.memcpy(dest, rdadr, ch.objsize) } + mut nulladr := voidptr(0) + for !C.atomic_compare_exchange_weak_ptr(unsafe { &voidptr(&ch.adr_read) }, + &nulladr, rdadr) { + nulladr = voidptr(0) + } + ch.writesem_im.post() + return .success + } + } + if no_block { + if C.atomic_load_u16(&ch.closed) == 0 { + return .not_ready + } else { + return .closed + } + } + } + // get token to read + for _ in 0 .. spinloops_sem_ { + if got_sem { + break + } + got_sem = ch.readsem.try_wait() + } + if !got_sem { + if no_block { + if C.atomic_load_u16(&ch.closed) == 0 { + return .not_ready + } else { + return .closed + } + } + ch.readsem.wait() + } + if ch.cap > 0 { + // try to get buffer token + mut obj_in_queue := false + mut rd_avail := C.atomic_load_u32(&ch.read_avail) + for rd_avail > 0 { + obj_in_queue = C.atomic_compare_exchange_weak_u32(&ch.read_avail, &rd_avail, + rd_avail - 1) + if obj_in_queue { + break + } + } + if obj_in_queue { + mut rd_idx := C.atomic_load_u32(&ch.buf_elem_read_idx) + for { + mut new_rd_idx := rd_idx + 1 + for new_rd_idx >= ch.cap { + new_rd_idx -= ch.cap + } + if C.atomic_compare_exchange_weak_u32(&ch.buf_elem_read_idx, &rd_idx, + new_rd_idx) + { + break + } + } + mut rd_ptr := ch.ringbuf + mut status_adr := ch.statusbuf + unsafe { + rd_ptr += rd_idx * ch.objsize + status_adr += rd_idx * sizeof(u16) + } + mut expected_status := u16(BufferElemStat.written) + for !C.atomic_compare_exchange_weak_u16(status_adr, &expected_status, + u16(BufferElemStat.reading)) { + expected_status = u16(BufferElemStat.written) + } + unsafe { + C.memcpy(dest, rd_ptr, ch.objsize) + } + C.atomic_store_u16(unsafe { &u16(status_adr) }, u16(BufferElemStat.unused)) + C.atomic_fetch_add_u32(&ch.write_free, 1) + ch.writesem.post() + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + if ch.write_subscriber != voidptr(0) { + ch.write_subscriber.sem.post() + } + C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) + return .success + } + } + // try to advertise `dest` as writable + C.atomic_store_ptr(unsafe { &voidptr(&ch.write_adr) }, dest) + if ch.cap == 0 { + mut rdadr := C.atomic_load_ptr(unsafe { &voidptr(&ch.read_adr) }) + if rdadr != C.NULL { + mut dest2 := dest + if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.write_adr) }, + &dest2, voidptr(0)) + { + ch.readsem.post() + continue + } else { + write_in_progress = true + } + } + } + if ch.cap == 0 && !write_in_progress { + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + if ch.write_subscriber != voidptr(0) { + ch.write_subscriber.sem.post() + } + C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) + } + mut dest2 := dest + for sp := u32(0); sp < spinloops_ || write_in_progress; sp++ { + if C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_written) }, + &dest2, voidptr(0)) + { + have_swapped = true + break + } else if dest2 == voidptr(-1) { + ch.readsem.post() + return .closed + } + dest2 = dest + } + mut got_im_sem := false + for sp := u32(0); sp < spinloops_sem_ || write_in_progress; sp++ { + got_im_sem = ch.readsem_im.try_wait() + if got_im_sem { + break + } + } + for { + if got_im_sem { + got_im_sem = false + } else { + ch.readsem_im.wait() + } + if have_swapped + || C.atomic_compare_exchange_strong_ptr(unsafe { &voidptr(&ch.adr_written) }, &dest2, voidptr(0)) { + ch.readsem.post() + break + } else { + // this semaphore was not for us - repost in + ch.readsem_im.post() + if dest2 == voidptr(-1) { + ch.readsem.post() + return .closed + } + dest2 = dest + } + } + break + } + return .success +} + +// Wait `timeout` on any of `channels[i]` until one of them can push (`is_push[i] = true`) or pop (`is_push[i] = false`) +// object referenced by `objrefs[i]`. `timeout = time.infinite` means wait unlimited time. `timeout <= 0` means return +// immediately if no transaction can be performed without waiting. +// return value: the index of the channel on which a transaction has taken place +// -1 if waiting for a transaction has exceeded timeout +// -2 if all channels are closed + +pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []voidptr, timeout time.Duration) int { + assert channels.len == dir.len + assert dir.len == objrefs.len + mut subscr := []Subscription{len: channels.len} + mut sem := unsafe { Semaphore{} } + sem.init(0) + for i, ch in channels { + subscr[i].sem = unsafe { &sem } + if dir[i] == .push { + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + subscr[i].prev = unsafe { &ch.write_subscriber } + unsafe { + subscr[i].nxt = C.atomic_exchange_ptr(&voidptr(&ch.write_subscriber), + &subscr[i]) + } + if voidptr(subscr[i].nxt) != voidptr(0) { + subscr[i].nxt.prev = unsafe { &subscr[i].nxt } + } + C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) + } else { + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + subscr[i].prev = unsafe { &ch.read_subscriber } + unsafe { + subscr[i].nxt = C.atomic_exchange_ptr(&voidptr(&ch.read_subscriber), &subscr[i]) + } + if voidptr(subscr[i].nxt) != voidptr(0) { + subscr[i].nxt.prev = unsafe { &subscr[i].nxt } + } + C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) + } + } + stopwatch := if timeout == time.infinite || timeout <= 0 { + time.StopWatch{} + } else { + time.new_stopwatch() + } + mut event_idx := -1 // negative index means `timed out` + + outer: for { + rnd := rand.u32_in_range(0, u32(channels.len)) + mut num_closed := 0 + for j, _ in channels { + mut i := j + int(rnd) + if i >= channels.len { + i -= channels.len + } + if dir[i] == .push { + stat := channels[i].try_push_priv(objrefs[i], true) + if stat == .success { + event_idx = i + break outer + } else if stat == .closed { + num_closed++ + } + } else { + stat := channels[i].try_pop_priv(objrefs[i], true) + if stat == .success { + event_idx = i + break outer + } else if stat == .closed { + num_closed++ + } + } + } + if num_closed == channels.len { + event_idx = -2 + break outer + } + if timeout <= 0 { + break outer + } + if timeout != time.infinite { + remaining := timeout - stopwatch.elapsed() + if !sem.timed_wait(remaining) { + break outer + } + } else { + sem.wait() + } + } + // reset subscribers + for i, ch in channels { + if dir[i] == .push { + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + unsafe { + *subscr[i].prev = subscr[i].nxt + } + if subscr[i].nxt != 0 { + subscr[i].nxt.prev = subscr[i].prev + // just in case we have missed a semaphore during restore + subscr[i].nxt.sem.post() + } + C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) + } else { + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + unsafe { + *subscr[i].prev = subscr[i].nxt + } + if subscr[i].nxt != 0 { + subscr[i].nxt.prev = subscr[i].prev + subscr[i].nxt.sem.post() + } + C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) + } + } + sem.destroy() + return event_idx +} |