1 /*******************************************************************************
2 
    Example implementation/usage of a throttled task pool which does no I/O
    and thus can also be used as a unit test
5 
6     Copyright:
7         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
8         All rights reserved.
9 
10     License:
11         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
12         Alternatively, this file may be distributed under the terms of the Tango
13         3-Clause BSD License (see LICENSE_BSD.txt for details).
14 
15 *******************************************************************************/
16 
17 module ocean.task.ThrottledTaskPool_test;
18 
19 
20 import ocean.task.Task;
21 import ocean.task.Scheduler;
22 import ocean.task.ThrottledTaskPool;
23 import ocean.task.util.Timer;
24 
25 import ocean.io.model.ISuspendable;
26 import ocean.io.Stdout;
27 import ocean.io.select.client.TimerEvent;
28 
29 import ocean.core.Test;
30 import ocean.core.Verify;
31 
32 static import core.thread;
33 
34 /*******************************************************************************
35 
    Example of a processing task, one which is managed by ThrottledTaskPool
    and gets scheduled for each new record arriving from Generator
38 
39 *******************************************************************************/
40 
class ProcessingTask : Task
{
    /// Record value handed over through copyArguments()
    int x;

    /// Counts how many times run() has been entered, across all instances
    static int total;

    /// Counts how many times recycle() has been invoked, across all instances
    static size_t recycle_count;

    /***************************************************************************

        Stores the record the task is going to process.

        Params:
            x = value to keep for the upcoming run() call

    ***************************************************************************/

    void copyArguments ( int x )
    {
        this.x = x;
    }

    /***************************************************************************

        Recycle hook; only bumps the shared recycle counter so the unit test
        can compare it with the number of started tasks.

    ***************************************************************************/

    override public void recycle ( )
    {
        ProcessingTask.recycle_count += 1;
    }

    /***************************************************************************

        Task entry point: checks that no regular worker fiber is reported as
        busy, counts the run, waits, and requests scheduler shutdown once the
        record with value 1000 has been seen.

    ***************************************************************************/

    override void run ( )
    {
        // NOTE(review): presumably zero because this task type is configured
        // to run in a specialized pool rather than on the regular worker
        // fibers - confirm against the scheduler setup
        auto busy_workers = theScheduler.getStats.worker_fiber_busy;
        test!("==")(busy_workers, 0);

        ProcessingTask.total += 1;

        // module-level wait (leading dot = module scope lookup); the unit of
        // the argument is presumably microseconds - confirm in
        // ocean.task.util.Timer
        .wait(100);

        if (this.x != 1000)
            return;

        theScheduler.shutdown();
    }
}
69 
70 /*******************************************************************************
71 
72     "stream" or "generator" class, one which keeps producing new records for
73     processing at throttled rate
74 
75 *******************************************************************************/
76 
class Generator : ISuspendable
{
    /// Timer whose handler (generate) produces one record per expiration
    TimerEvent timer;

    /// Pool that runs a ProcessingTask for every generated record
    ThrottledTaskPool!(ProcessingTask) pool;

    /// Value of the next record to be generated
    int counter;

    /***************************************************************************

        Creates the timer and the task pool and registers this instance with
        the pool's throttler so that the throttler can suspend/resume record
        generation.

    ***************************************************************************/

    this ( )
    {
        this.timer = new TimerEvent(&this.generate);
        // NOTE(review): the arguments are presumably the throttler's suspend
        // and resume thresholds - confirm against ThrottledTaskPool
        this.pool = new ThrottledTaskPool!(ProcessingTask)(10, 0);
        this.pool.throttler.addSuspendable(this);
    }

    /***************************************************************************

        Arms the timer and starts generating records.

    ***************************************************************************/

    void start ( )
    {
        // NOTE(review): presumably (first_s, first_ms, interval_s,
        // interval_ms), i.e. fire after 1 ms and then every 1 ms - confirm
        // against TimerEvent.set
        this.timer.set(0, 1, 0, 1);
        this.resume();
    }

    /***************************************************************************

        Resumes record generation by registering the timer with epoll.

    ***************************************************************************/

    override void resume ( )
    {
        theScheduler.epoll.register(this.timer);
    }

    /***************************************************************************

        Suspends record generation by unregistering the timer from epoll.

    ***************************************************************************/

    override void suspend ( )
    {
        theScheduler.epoll.unregister(this.timer);
    }

    /***************************************************************************

        Returns:
            true if record generation is currently suspended, i.e. the timer
            is not registered with epoll (suspend() unregisters it, resume()
            registers it)

    ***************************************************************************/

    override bool suspended ( )
    {
        // the generator is running exactly while its timer is registered, so
        // "suspended" is the negation of the registration state
        return !this.timer.is_registered;
    }

    /***************************************************************************

        Timer handler: starts a pool task for the current counter value and
        advances the counter.

        Returns:
            always true (presumably tells the select dispatcher to keep the
            timer registered - confirm against TimerEvent)

    ***************************************************************************/

    bool generate ( )
    {
        this.pool.start(this.counter);
        ++this.counter;
        return true;
    }
}
118 
unittest
{
    // name under which the specialized pool for ProcessingTask is configured
    // and later looked up
    auto task_name = ProcessingTask.classinfo.name;

    SchedulerConfiguration config;
    config.worker_fiber_limit = 10;
    config.task_queue_limit = 30;
    config.specialized_pools = [
        PoolDescription(task_name, 10240)
    ];
    initScheduler(config);

    auto source = new Generator;

    source.start();
    // returns once a ProcessingTask has called theScheduler.shutdown()
    theScheduler.eventLoop();

    // every task that ran must have been recycled and none may remain busy
    test!("==")(ProcessingTask.recycle_count, ProcessingTask.total);
    test!("==")(source.pool.num_busy(), 0);

    // the exact number of tasks processed before the shutdown may vary,
    // but it must always be at most 1000 + task_queue_limit
    test!(">=")(ProcessingTask.total, 1000);
    test!("<=")(ProcessingTask.total, 1000 + config.task_queue_limit);

    auto pool_stats = theScheduler.getSpecializedPoolStats(task_name);
    pool_stats.visit(
        // no stats available for the pool: must never happen here
        ( ) { test(false); },
        (ref Scheduler.SpecializedPoolStats s) {
            // not an exact comparison because tasks may finish fast
            // enough that throttling is never hit
            test!("<=")(s.total_fibers, 10);
            test!("==")(s.used_fibers, 0);
        }
    );
}