/*******************************************************************************

    Example implementation/usage of the throttled task pool which does no I/O
    and thus is also used as a unit test

    Copyright:
        Copyright (c) 2009-2016 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

*******************************************************************************/

module ocean.task.ThrottledTaskPool_test;


import ocean.task.Task;
import ocean.task.Scheduler;
import ocean.task.ThrottledTaskPool;
import ocean.task.util.Timer;

import ocean.io.model.ISuspendable;
import ocean.io.Stdout;
import ocean.io.select.client.TimerEvent;

import ocean.core.Test;
import ocean.core.Verify;

static import core.thread;

/*******************************************************************************

    Example of processing task, one which is managed by the throttled task
    pool owned by Generator and gets started for each new record produced by
    Generator

*******************************************************************************/

class ProcessingTask : Task
{
    /// Record value handed over by the pool through `copyArguments`
    int x;

    /// Number of times `run` has been entered, across all instances
    static int total;

    /// Number of times `recycle` has been called, across all instances
    static size_t recycle_count;

    /***************************************************************************

        Stores the argument for the next run of this task.

        Params:
            x = value to process; 1000 triggers scheduler shutdown in `run`

    ***************************************************************************/

    void copyArguments ( int x )
    {
        this.x = x;
    }

    /***************************************************************************

        Counts recycle events so the unittest can compare them to the number
        of started tasks.

    ***************************************************************************/

    override public void recycle ( )
    {
        recycle_count++;
    }

    /***************************************************************************

        Task entry point: counts the run, waits, and shuts the scheduler down
        once the record with value 1000 is reached.

    ***************************************************************************/

    override void run ( )
    {
        // presumably verifies that this task does not occupy a regular
        // worker fiber because it runs in its specialized pool - TODO confirm
        test!("==")(theScheduler.getStats.worker_fiber_busy, 0);

        ++total;
        // simulates work without doing I/O (argument presumably microseconds)
        .wait(100);

        if (this.x == 1000)
            theScheduler.shutdown();
    }
}

/*******************************************************************************

    "stream" or "generator" class, one which keeps producing new records for
    processing at throttled rate

*******************************************************************************/

class Generator : ISuspendable
{
    /// Timer whose firing calls `generate`
    TimerEvent timer;

    /// Pool that runs one ProcessingTask per generated record
    ThrottledTaskPool!(ProcessingTask) pool;

    /// Value handed to the next started task; incremented on each `generate`
    int counter;

    /***************************************************************************

        Creates the timer and the task pool and registers this generator as
        a suspendable of the pool's throttler.

    ***************************************************************************/

    this ( )
    {
        this.timer = new TimerEvent(&this.generate);
        // NOTE(review): arguments are presumably the suspend / resume
        // thresholds of the throttler - confirm against ThrottledTaskPool
        this.pool = new ThrottledTaskPool!(ProcessingTask)(10, 0);
        this.pool.throttler.addSuspendable(this);
    }

    /***************************************************************************

        Arms the timer and registers it so that records start being
        generated.

    ***************************************************************************/

    void start ( )
    {
        // presumably (first_s, first_ms, interval_s, interval_ms) - TODO confirm
        this.timer.set(0, 1, 0, 1);
        this.resume();
    }

    /***************************************************************************

        Resumes record generation by registering the timer with epoll.

    ***************************************************************************/

    override void resume ( )
    {
        theScheduler.epoll.register(this.timer);
    }

    /***************************************************************************

        Suspends record generation by unregistering the timer from epoll.

    ***************************************************************************/

    override void suspend ( )
    {
        theScheduler.epoll.unregister(this.timer);
    }

    /***************************************************************************

        Returns:
            true if generation is currently suspended, i.e. the timer is not
            registered (`suspend` unregisters it, `resume` registers it)

    ***************************************************************************/

    override bool suspended ( )
    {
        // The generator is active while the timer is registered, so the
        // suspended state is the negation of the registration state.
        return !this.timer.is_registered;
    }

    /***************************************************************************

        Timer callback: starts a new task for the current counter value and
        advances the counter.

        Returns:
            always true (presumably keeps the timer registered - TODO confirm)

    ***************************************************************************/

    bool generate ( )
    {
        this.pool.start(this.counter);
        ++this.counter;
        return true;
    }
}

/*******************************************************************************

    Runs the generator until a task shuts the scheduler down, then checks the
    task / recycle counters and the statistics of the specialized pool.

*******************************************************************************/

unittest
{
    SchedulerConfiguration config;
    with (config)
    {
        worker_fiber_limit = 10;
        task_queue_limit = 30;
        specialized_pools = [
            PoolDescription(ProcessingTask.classinfo.name, 10240)
        ];
    }
    initScheduler(config);

    auto generator = new Generator;

    generator.start();
    theScheduler.eventLoop();

    // every started task must have been recycled and none may still be busy
    test!("==")(ProcessingTask.recycle_count, ProcessingTask.total);
    test!("==")(generator.pool.num_busy(), 0);

    // exact number of tasks that will be processed before the shutdown
    // may vary but it must always be at most 1000 + task_queue_limit
    test!(">=")(ProcessingTask.total, 1000);
    test!("<=")(ProcessingTask.total, 1000 + config.task_queue_limit);

    auto stats
        = theScheduler.getSpecializedPoolStats(ProcessingTask.classinfo.name);
    stats.visit(
        // no stats available for the pool: must not happen
        ( ) { test(false); },
        (ref Scheduler.SpecializedPoolStats s) {
            // not exact comparison because tasks may finish fast
            // enough that throttling is never hit
            test!("<=")(s.total_fibers, 10);
            test!("==")(s.used_fibers, 0);
        }
    );
}