aboutsummaryrefslogtreecommitdiff
path: root/v_windows/v/vlib/sync/pool
diff options
context:
space:
mode:
Diffstat (limited to 'v_windows/v/vlib/sync/pool')
-rw-r--r--v_windows/v/vlib/sync/pool/README.md36
-rw-r--r--v_windows/v/vlib/sync/pool/pool.v165
-rw-r--r--v_windows/v/vlib/sync/pool/pool_test.v52
3 files changed, 253 insertions, 0 deletions
diff --git a/v_windows/v/vlib/sync/pool/README.md b/v_windows/v/vlib/sync/pool/README.md
new file mode 100644
index 0000000..bdea5b3
--- /dev/null
+++ b/v_windows/v/vlib/sync/pool/README.md
@@ -0,0 +1,36 @@
+
+The `sync.pool` module provides a convenient way to run identical tasks over
+an array of items *in parallel*, without worrying about thread synchronization,
+waitgroups, mutexes etc.., you just need to supply a callback function, that
+will be called once per each item in your input array.
+
+After all the work is done in parallel by the worker threads in the pool,
+pool.work_on_items will return. You can then call pool.get_results<Result>()
+to retrieve a list of all the results, that the worker callbacks returned
+for each input item. Example:
+
+```v
+import sync.pool
+
+struct SResult {
+ s string
+}
+
+fn sprocess(pp &pool.PoolProcessor, idx int, wid int) &SResult {
+ item := pp.get_item<string>(idx)
+ println('idx: $idx, wid: $wid, item: ' + item)
+ return &SResult{item.reverse()}
+}
+
+fn main() {
+ mut pp := pool.new_pool_processor(callback: sprocess)
+ pp.work_on_items(['1abc', '2abc', '3abc', '4abc', '5abc', '6abc', '7abc'])
+ // optionally, you can iterate over the results too:
+ for x in pp.get_results<SResult>() {
+ println('result: $x.s')
+ }
+}
+```
+
+See https://github.com/vlang/v/blob/master/vlib/sync/pool/pool_test.v for a
+more detailed usage example.
diff --git a/v_windows/v/vlib/sync/pool/pool.v b/v_windows/v/vlib/sync/pool/pool.v
new file mode 100644
index 0000000..b2c5340
--- /dev/null
+++ b/v_windows/v/vlib/sync/pool/pool.v
@@ -0,0 +1,165 @@
+module pool
+
+import sync
+import runtime
+
+[trusted]
+fn C.atomic_fetch_add_u32(voidptr, u32) u32
+
+pub const (
+ no_result = voidptr(0)
+)
+
+pub struct PoolProcessor {
+ thread_cb voidptr
+mut:
+ njobs int
+ items []voidptr
+ results []voidptr
+ ntask u32 // reading/writing to this should be atomic
+ waitgroup sync.WaitGroup
+ shared_context voidptr
+ thread_contexts []voidptr
+}
+
+pub type ThreadCB = fn (p &PoolProcessor, idx int, task_id int) voidptr
+
+pub struct PoolProcessorConfig {
+ maxjobs int
+ callback ThreadCB
+}
+
+// new_pool_processor returns a new PoolProcessor instance.
+// The parameters of new_pool_processor are:
+// context.maxjobs: when 0 (the default), the PoolProcessor will use a
+// number of threads, that is optimal for your system to process your items.
+// context.callback: this should be a callback function, that each worker
+// thread in the pool will run for each item.
+// The callback function will receive as parameters:
+// 1) the PoolProcessor instance, so it can call
+// p.get_item<int>(idx) to get the actual item at index idx
+// 2) idx - the index of the currently processed item
+// 3) task_id - the index of the worker thread in which the callback
+// function is running.
+pub fn new_pool_processor(context PoolProcessorConfig) &PoolProcessor {
+ if isnil(context.callback) {
+ panic('You need to pass a valid callback to new_pool_processor.')
+ }
+ mut pool := PoolProcessor{
+ items: []
+ results: []
+ shared_context: voidptr(0)
+ thread_contexts: []
+ njobs: context.maxjobs
+ ntask: 0
+ thread_cb: voidptr(context.callback)
+ }
+ pool.waitgroup.init()
+ return &pool
+}
+
+// set_max_jobs gives you the ability to override the number
+// of jobs *after* the PoolProcessor had been created already.
+pub fn (mut pool PoolProcessor) set_max_jobs(njobs int) {
+ pool.njobs = njobs
+}
+
+// work_on_items receives a list of items of type T,
+// then starts a work pool of pool.njobs threads, each running
+// pool.thread_cb in a loop, untill all items in the list,
+// are processed.
+// When pool.njobs is 0, the number of jobs is determined
+// by the number of available cores on the system.
+// work_on_items returns *after* all threads finish.
+// You can optionally call get_results after that.
+pub fn (mut pool PoolProcessor) work_on_items<T>(items []T) {
+ pool.work_on_pointers(unsafe { items.pointers() })
+}
+
+pub fn (mut pool PoolProcessor) work_on_pointers(items []voidptr) {
+ mut njobs := runtime.nr_jobs()
+ if pool.njobs > 0 {
+ njobs = pool.njobs
+ }
+ pool.items = []
+ pool.results = []
+ pool.thread_contexts = []
+ pool.items << items
+ pool.results = []voidptr{len: (pool.items.len)}
+ pool.thread_contexts << []voidptr{len: (pool.items.len)}
+ pool.waitgroup.add(njobs)
+ for i := 0; i < njobs; i++ {
+ if njobs > 1 {
+ go process_in_thread(mut pool, i)
+ } else {
+ // do not run concurrently, just use the same thread:
+ process_in_thread(mut pool, i)
+ }
+ }
+ pool.waitgroup.wait()
+}
+
+// process_in_thread does the actual work of worker thread.
+// It is a workaround for the current inability to pass a
+// method in a callback.
+fn process_in_thread(mut pool PoolProcessor, task_id int) {
+ cb := ThreadCB(pool.thread_cb)
+ ilen := pool.items.len
+ for {
+ idx := int(C.atomic_fetch_add_u32(&pool.ntask, 1))
+ if idx >= ilen {
+ break
+ }
+ pool.results[idx] = cb(pool, idx, task_id)
+ }
+ pool.waitgroup.done()
+}
+
+// get_item - called by the worker callback.
+// Retrieves a type safe instance of the currently processed item
+pub fn (pool &PoolProcessor) get_item<T>(idx int) T {
+ return *(&T(pool.items[idx]))
+}
+
+// get_result - called by the main thread to get a specific result.
+// Retrieves a type safe instance of the produced result.
+pub fn (pool &PoolProcessor) get_result<T>(idx int) T {
+ return *(&T(pool.results[idx]))
+}
+
+// get_results - get a list of type safe results in the main thread.
+pub fn (pool &PoolProcessor) get_results<T>() []T {
+ mut res := []T{}
+ for i in 0 .. pool.results.len {
+ res << *(&T(pool.results[i]))
+ }
+ return res
+}
+
+// set_shared_context - can be called during the setup so that you can
+// provide a context that is shared between all worker threads, like
+// common options/settings.
+pub fn (mut pool PoolProcessor) set_shared_context(context voidptr) {
+ pool.shared_context = context
+}
+
+// get_shared_context - can be called in each worker callback, to get
+// the context set by pool.set_shared_context
+pub fn (pool &PoolProcessor) get_shared_context() voidptr {
+ return pool.shared_context
+}
+
+// set_thread_context - can be called during the setup at the start of
+// each worker callback, so that the worker callback can have some thread
+// local storage area where it can write/read information that is private
+// to the given thread, without worrying that it will get overwritten by
+// another thread
+pub fn (mut pool PoolProcessor) set_thread_context(idx int, context voidptr) {
+ pool.thread_contexts[idx] = context
+}
+
+// get_thread_context - returns a pointer, that was set with
+// pool.set_thread_context . This pointer is private to each thread.
+pub fn (pool &PoolProcessor) get_thread_context(idx int) voidptr {
+ return pool.thread_contexts[idx]
+}
diff --git a/v_windows/v/vlib/sync/pool/pool_test.v b/v_windows/v/vlib/sync/pool/pool_test.v
new file mode 100644
index 0000000..629b524
--- /dev/null
+++ b/v_windows/v/vlib/sync/pool/pool_test.v
@@ -0,0 +1,52 @@
+import time
+import sync.pool
+
+pub struct SResult {
+ s string
+}
+
+pub struct IResult {
+ i int
+}
+
+fn worker_s(p &pool.PoolProcessor, idx int, worker_id int) &SResult {
+ item := p.get_item<string>(idx)
+ println('worker_s worker_id: $worker_id | idx: $idx | item: $item')
+ time.sleep(3 * time.millisecond)
+ return &SResult{'$item $item'}
+}
+
+fn worker_i(p &pool.PoolProcessor, idx int, worker_id int) &IResult {
+ item := p.get_item<int>(idx)
+ println('worker_i worker_id: $worker_id | idx: $idx | item: $item')
+ time.sleep(5 * time.millisecond)
+ return &IResult{item * 1000}
+}
+
+fn test_work_on_strings() {
+ mut pool_s := pool.new_pool_processor(
+ callback: worker_s
+ maxjobs: 8
+ )
+
+ pool_s.work_on_items(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'])
+ for x in pool_s.get_results<SResult>() {
+ println(x.s)
+ assert x.s.len > 1
+ }
+}
+
+fn test_work_on_ints() {
+ // NB: since maxjobs is left empty here,
+ // the pool processor will use njobs = runtime.nr_jobs so that
+ // it will work optimally without overloading the system
+ mut pool_i := pool.new_pool_processor(
+ callback: worker_i
+ )
+
+ pool_i.work_on_items([1, 2, 3, 4, 5, 6, 7, 8])
+ for x in pool_i.get_results<IResult>() {
+ println(x.i)
+ assert x.i > 100
+ }
+}