diff options
Diffstat (limited to 'v_windows/v/vlib/sync/pool')
| -rw-r--r-- | v_windows/v/vlib/sync/pool/README.md | 36 | ||||
| -rw-r--r-- | v_windows/v/vlib/sync/pool/pool.v | 165 | ||||
| -rw-r--r-- | v_windows/v/vlib/sync/pool/pool_test.v | 52 | 
3 files changed, 253 insertions, 0 deletions
diff --git a/v_windows/v/vlib/sync/pool/README.md b/v_windows/v/vlib/sync/pool/README.md new file mode 100644 index 0000000..bdea5b3 --- /dev/null +++ b/v_windows/v/vlib/sync/pool/README.md @@ -0,0 +1,36 @@ + +The `sync.pool` module provides a convenient way to run identical tasks over +an array of items *in parallel*, without worrying about thread synchronization, +waitgroups, mutexes etc.., you just need to supply a callback function, that +will be called once per each item in your input array. + +After all the work is done in parallel by the worker threads in the pool, +pool.work_on_items will return. You can then call pool.get_results<Result>() +to retrieve a list of all the results, that the worker callbacks returned +for each input item. Example: + +```v +import sync.pool + +struct SResult { +	s string +} + +fn sprocess(pp &pool.PoolProcessor, idx int, wid int) &SResult { +	item := pp.get_item<string>(idx) +	println('idx: $idx, wid: $wid, item: ' + item) +	return &SResult{item.reverse()} +} + +fn main() { +	mut pp := pool.new_pool_processor(callback: sprocess) +	pp.work_on_items(['1abc', '2abc', '3abc', '4abc', '5abc', '6abc', '7abc']) +	// optionally, you can iterate over the results too: +	for x in pp.get_results<SResult>() { +		println('result: $x.s') +	} +} +``` + +See https://github.com/vlang/v/blob/master/vlib/sync/pool/pool_test.v for a +more detailed usage example. diff --git a/v_windows/v/vlib/sync/pool/pool.v b/v_windows/v/vlib/sync/pool/pool.v new file mode 100644 index 0000000..b2c5340 --- /dev/null +++ b/v_windows/v/vlib/sync/pool/pool.v @@ -0,0 +1,165 @@ +module pool + +import sync +import runtime + +[trusted] +fn C.atomic_fetch_add_u32(voidptr, u32) u32 + +pub const ( +	no_result = voidptr(0) +) + +pub struct PoolProcessor { +	thread_cb voidptr +mut: +	njobs           int +	items           []voidptr +	results         []voidptr +	ntask           u32 // reading/writing to this should be atomic +	waitgroup       sync.WaitGroup +	shared_context  voidptr +	thread_contexts []voidptr +} + +pub type ThreadCB = fn (p &PoolProcessor, idx int, task_id int) voidptr + +pub struct PoolProcessorConfig { +	maxjobs  int +	callback ThreadCB +} + +// new_pool_processor returns a new PoolProcessor instance. +// The parameters of new_pool_processor are: +//    context.maxjobs: when 0 (the default), the PoolProcessor will use a +//      number of threads, that is optimal for your system to process your items. +//    context.callback: this should be a callback function, that each worker +//      thread in the pool will run for each item. +//      The callback function will receive as parameters: +//      1) the PoolProcessor instance, so it can call +//            p.get_item<int>(idx) to get the actual item at index idx +//      2) idx - the index of the currently processed item +//      3) task_id - the index of the worker thread in which the callback +//            function is running. +pub fn new_pool_processor(context PoolProcessorConfig) &PoolProcessor { +	if isnil(context.callback) { +		panic('You need to pass a valid callback to new_pool_processor.') +	} +	mut pool := PoolProcessor{ +		items: [] +		results: [] +		shared_context: voidptr(0) +		thread_contexts: [] +		njobs: context.maxjobs +		ntask: 0 +		thread_cb: voidptr(context.callback) +	} +	pool.waitgroup.init() +	return &pool +} + +// set_max_jobs gives you the ability to override the number +// of jobs *after* the PoolProcessor had been created already. +pub fn (mut pool PoolProcessor) set_max_jobs(njobs int) { +	pool.njobs = njobs +} + +// work_on_items receives a list of items of type T, +// then starts a work pool of pool.njobs threads, each running +// pool.thread_cb in a loop, untill all items in the list, +// are processed. +// When pool.njobs is 0, the number of jobs is determined +// by the number of available cores on the system. +// work_on_items returns *after* all threads finish. +// You can optionally call get_results after that. +pub fn (mut pool PoolProcessor) work_on_items<T>(items []T) { +	pool.work_on_pointers(unsafe { items.pointers() }) +} + +pub fn (mut pool PoolProcessor) work_on_pointers(items []voidptr) { +	mut njobs := runtime.nr_jobs() +	if pool.njobs > 0 { +		njobs = pool.njobs +	} +	pool.items = [] +	pool.results = [] +	pool.thread_contexts = [] +	pool.items << items +	pool.results = []voidptr{len: (pool.items.len)} +	pool.thread_contexts << []voidptr{len: (pool.items.len)} +	pool.waitgroup.add(njobs) +	for i := 0; i < njobs; i++ { +		if njobs > 1 { +			go process_in_thread(mut pool, i) +		} else { +			// do not run concurrently, just use the same thread: +			process_in_thread(mut pool, i) +		} +	} +	pool.waitgroup.wait() +} + +// process_in_thread does the actual work of worker thread. +// It is a workaround for the current inability to pass a +// method in a callback. +fn process_in_thread(mut pool PoolProcessor, task_id int) { +	cb := ThreadCB(pool.thread_cb) +	ilen := pool.items.len +	for { +		idx := int(C.atomic_fetch_add_u32(&pool.ntask, 1)) +		if idx >= ilen { +			break +		} +		pool.results[idx] = cb(pool, idx, task_id) +	} +	pool.waitgroup.done() +} + +// get_item - called by the worker callback. +// Retrieves a type safe instance of the currently processed item +pub fn (pool &PoolProcessor) get_item<T>(idx int) T { +	return *(&T(pool.items[idx])) +} + +// get_result - called by the main thread to get a specific result. +// Retrieves a type safe instance of the produced result. +pub fn (pool &PoolProcessor) get_result<T>(idx int) T { +	return *(&T(pool.results[idx])) +} + +// get_results - get a list of type safe results in the main thread. +pub fn (pool &PoolProcessor) get_results<T>() []T { +	mut res := []T{} +	for i in 0 .. pool.results.len { +		res << *(&T(pool.results[i])) +	} +	return res +} + +// set_shared_context - can be called during the setup so that you can +// provide a context that is shared between all worker threads, like +// common options/settings. +pub fn (mut pool PoolProcessor) set_shared_context(context voidptr) { +	pool.shared_context = context +} + +// get_shared_context - can be called in each worker callback, to get +// the context set by pool.set_shared_context +pub fn (pool &PoolProcessor) get_shared_context() voidptr { +	return pool.shared_context +} + +// set_thread_context - can be called during the setup at the start of +// each worker callback, so that the worker callback can have some thread +// local storage area where it can write/read information that is private +// to the given thread, without worrying that it will get overwritten by +// another thread +pub fn (mut pool PoolProcessor) set_thread_context(idx int, context voidptr) { +	pool.thread_contexts[idx] = context +} + +// get_thread_context - returns a pointer, that was set with +// pool.set_thread_context . This pointer is private to each thread. +pub fn (pool &PoolProcessor) get_thread_context(idx int) voidptr { +	return pool.thread_contexts[idx] +} diff --git a/v_windows/v/vlib/sync/pool/pool_test.v b/v_windows/v/vlib/sync/pool/pool_test.v new file mode 100644 index 0000000..629b524 --- /dev/null +++ b/v_windows/v/vlib/sync/pool/pool_test.v @@ -0,0 +1,52 @@ +import time +import sync.pool + +pub struct SResult { +	s string +} + +pub struct IResult { +	i int +} + +fn worker_s(p &pool.PoolProcessor, idx int, worker_id int) &SResult { +	item := p.get_item<string>(idx) +	println('worker_s worker_id: $worker_id | idx: $idx | item: $item') +	time.sleep(3 * time.millisecond) +	return &SResult{'$item $item'} +} + +fn worker_i(p &pool.PoolProcessor, idx int, worker_id int) &IResult { +	item := p.get_item<int>(idx) +	println('worker_i worker_id: $worker_id | idx: $idx | item: $item') +	time.sleep(5 * time.millisecond) +	return &IResult{item * 1000} +} + +fn test_work_on_strings() { +	mut pool_s := pool.new_pool_processor( +		callback: worker_s +		maxjobs: 8 +	) + +	pool_s.work_on_items(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']) +	for x in pool_s.get_results<SResult>() { +		println(x.s) +		assert x.s.len > 1 +	} +} + +fn test_work_on_ints() { +	// NB: since maxjobs is left empty here, +	// the pool processor will use njobs = runtime.nr_jobs so that +	// it will work optimally without overloading the system +	mut pool_i := pool.new_pool_processor( +		callback: worker_i +	) + +	pool_i.work_on_items([1, 2, 3, 4, 5, 6, 7, 8]) +	for x in pool_i.get_results<IResult>() { +		println(x.i) +		assert x.i > 100 +	} +}  | 
