1 /*******************************************************************************
2
3 Example implementation/usage of stream processor which does no I/O and thus
4 can also be used as a unit test
5
6 Copyright:
7 Copyright (c) 2009-2016 dunnhumby Germany GmbH.
8 All rights reserved.
9
10 License:
11 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
12 Alternatively, this file may be distributed under the terms of the Tango
13 3-Clause BSD License (see LICENSE_BSD.txt for details).
14
15 *******************************************************************************/
16
17 module ocean.task.ThrottledTaskPool_test;
18
19
20 import ocean.task.Task;
21 import ocean.task.Scheduler;
22 import ocean.task.ThrottledTaskPool;
23 import ocean.task.util.Timer;
24
25 import ocean.io.model.ISuspendable;
26 import ocean.io.Stdout;
27 import ocean.io.select.client.TimerEvent;
28
29 import ocean.core.Test;
30 import ocean.core.Verify;
31
32 static import core.thread;
33
34 /*******************************************************************************
35
36 Example of processing task, one which is managed by StreamProcessor and
37 gets scheduled for each new record arriving from Generator
38
39 *******************************************************************************/
40
class ProcessingTask : Task
{
    /// Incremented once each time `run` is entered; shared by all instances
    static int total;

    /// Incremented once each time `recycle` is called; shared by all instances
    static size_t recycle_count;

    /// Record value stored by the most recent `copyArguments` call
    int x;

    /***************************************************************************

        Stores the record value this task instance will work on.

        NOTE(review): presumably invoked by the task pool with the arguments
        passed to its `start` method - confirm against ThrottledTaskPool.

        Params:
            x = record value to remember for the following `run`

    ***************************************************************************/

    void copyArguments ( int x )
    {
        this.x = x;
    }

    /***************************************************************************

        Recycle hook; only counts how many times it has been called so that
        the unit test can compare it against the number of started runs.

    ***************************************************************************/

    override public void recycle ( )
    {
        ++recycle_count;
    }

    /***************************************************************************

        Task body: checks that the scheduler reports no busy worker fibers,
        counts the run, waits, and shuts the scheduler down once the record
        with value 1000 has been handled.

    ***************************************************************************/

    override void run ( )
    {
        // NOTE(review): presumably holds because this task type is executed
        // by a specialized pool rather than the generic worker fibers -
        // confirm against the scheduler configuration used by the test
        test!("==")(theScheduler.getStats.worker_fiber_busy, 0);

        total += 1;

        // module-level `wait`; argument unit is presumably microseconds
        // (ocean.task.util.Timer) - TODO confirm
        .wait(100);

        if (this.x == 1000)
        {
            theScheduler.shutdown();
        }
    }
}
69
70 /*******************************************************************************
71
72 "stream" or "generator" class, one which keeps producing new records for
73 processing at throttled rate
74
75 *******************************************************************************/
76
class Generator : ISuspendable
{
    /// Timer whose handler (`generate`) produces one new record per expiry
    TimerEvent timer;

    /// Pool that runs a `ProcessingTask` for every generated record
    ThrottledTaskPool!(ProcessingTask) pool;

    /// Value of the next record to be generated
    int counter;

    /***************************************************************************

        Creates the timer and the task pool and adds this generator to the
        pool's throttler as a suspendable.

    ***************************************************************************/

    this ( )
    {
        this.timer = new TimerEvent(&this.generate);
        // NOTE(review): (10, 0) are presumably the throttler's suspend and
        // resume thresholds - confirm against ThrottledTaskPool constructor
        this.pool = new ThrottledTaskPool!(ProcessingTask)(10, 0);
        this.pool.throttler.addSuspendable(this);
    }

    /***************************************************************************

        Arms the timer and starts generating records.

    ***************************************************************************/

    void start ( )
    {
        // NOTE(review): presumably "first expiry after 0s 1ms, then every
        // 0s 1ms" - confirm against TimerEvent.set
        this.timer.set(0, 1, 0, 1);
        this.resume();
    }

    /***************************************************************************

        Resumes record generation by registering the timer with the
        scheduler's epoll instance.

    ***************************************************************************/

    override void resume ( )
    {
        theScheduler.epoll.register(this.timer);
    }

    /***************************************************************************

        Suspends record generation by unregistering the timer from the
        scheduler's epoll instance.

    ***************************************************************************/

    override void suspend ( )
    {
        theScheduler.epoll.unregister(this.timer);
    }

    /***************************************************************************

        Returns:
            true if record generation is currently suspended, i.e. the timer
            is NOT registered with epoll (`resume` registers it, `suspend`
            unregisters it)

    ***************************************************************************/

    override bool suspended ( )
    {
        // A registered timer means the generator is running, so the
        // registration state has to be negated to report suspension.
        return !this.timer.is_registered;
    }

    /***************************************************************************

        Timer handler: starts a pool task for the current counter value and
        advances the counter.

        Returns:
            always true (NOTE(review): presumably tells the event loop to
            keep the timer registered - confirm against TimerEvent handler
            contract)

    ***************************************************************************/

    bool generate ( )
    {
        this.pool.start(this.counter);
        ++this.counter;
        return true;
    }
}
118
// Runs the generator against a scheduler with a specialized pool for
// ProcessingTask until the task handling record 1000 shuts the scheduler
// down, then checks task and fiber accounting.
unittest
{
    // name under which the specialized pool is configured and later queried
    auto pool_key = ProcessingTask.classinfo.name;

    SchedulerConfiguration config;
    with (config)
    {
        worker_fiber_limit = 10;
        task_queue_limit = 30;
        specialized_pools = [ PoolDescription(pool_key, 10240) ];
    }
    initScheduler(config);

    auto source = new Generator;
    source.start();

    // returns once ProcessingTask.run has called theScheduler.shutdown()
    theScheduler.eventLoop();

    // every started task must have been recycled and none may remain busy
    test!("==")(ProcessingTask.recycle_count, ProcessingTask.total);
    test!("==")(source.pool.num_busy(), 0);

    // The exact number of tasks processed before the shutdown may vary, but
    // it must always be at least 1000 and at most 1000 + task_queue_limit.
    auto upper_bound = 1000 + config.task_queue_limit;
    test!(">=")(ProcessingTask.total, 1000);
    test!("<=")(ProcessingTask.total, upper_bound);

    auto pool_stats = theScheduler.getSpecializedPoolStats(pool_key);
    pool_stats.visit(
        // no stats available for the pool: must not happen
        ( ) { test(false); },
        (ref Scheduler.SpecializedPoolStats found) {
            // Not an exact comparison because tasks may finish fast enough
            // that throttling is never hit.
            test!("<=")(found.total_fibers, 10);
            test!("==")(found.used_fibers, 0);
        }
    );
}