1 /*******************************************************************************
2 
3     Copyright:
4         Copyright (c) 20017 dunnhumby Germany GmbH.
5         All rights reserved.
6 
7     License:
8         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
9         Alternatively, this file may be distributed under the terms of the Tango
10         3-Clause BSD License (see LICENSE_BSD.txt for details).
11 
12 *******************************************************************************/
13 
14 module ocean.task.util.StreamProcessor_test;
15 
16 
17 import ocean.task.Task;
18 import ocean.task.Scheduler;
19 import ocean.task.util.StreamProcessor;
20 import ocean.task.util.Timer;
21 
22 import ocean.io.model.ISuspendable;
23 import ocean.io.Stdout;
24 import ocean.io.select.client.TimerEvent;
25 
26 import ocean.core.Test;
27 import ocean.core.Verify;
28 
29 /*******************************************************************************
30 
31     Example of processing task, one which is managed by StreamProcessor and
32     gets scheduled for each new record arriving from Generator
33 
34 *******************************************************************************/
35 
36 class ProcessingTask : Task
37 {
38     int x;
39 
40     static int total;
41 
42     void copyArguments ( int x )
43     {
44         this.x = x;
45     }
46 
47     override void run ( )
48     {
49         .wait(100);
50         ++total;
51 
52         if (this.x == 1000)
53             theScheduler.shutdown();
54     }
55 }
56 
57 /*******************************************************************************
58 
59     "stream" or "generator" class, one which keeps producing new records for
60     processing at throttled rate
61 
62 *******************************************************************************/
63 
64 class Generator : ISuspendable
65 {
66     TimerEvent timer;
67     StreamProcessor!(ProcessingTask) stream_processor;
68     int counter;
69 
70     this ( )
71     {
72         this.timer = new TimerEvent(&this.generate);
73 
74         auto throttler_config = ThrottlerConfig(10, 1);
75         this.stream_processor = new StreamProcessor!(ProcessingTask)(throttler_config);
76         this.stream_processor.addStream(this);
77     }
78 
79     void start ( )
80     {
81         this.timer.set(0, 1, 0, 1);
82         this.resume();
83     }
84 
85     override void resume ( )
86     {
87         theScheduler.epoll.register(this.timer);
88     }
89 
90     override void suspend ( )
91     {
92         theScheduler.epoll.unregister(this.timer);
93     }
94 
95     override bool suspended ( )
96     {
97         return this.timer.is_registered;
98     }
99 
100     bool generate ( )
101     {
102         this.stream_processor.process(this.counter);
103         ++this.counter;
104         return true;
105     }
106 }
107 
108 unittest
109 {
110     SchedulerConfiguration config;
111     config.worker_fiber_limit = 10;
112     config.task_queue_limit = 30;
113     initScheduler(config);
114 
115     auto generator = new Generator;
116     generator.start();
117     theScheduler.eventLoop();
118 
119     // exact number of tasks that will be processed before the shutdown
120     // may vary but it must always be at most 1000 + task_queue_limit
121     test!(">=")(ProcessingTask.total, 1000);
122     test!("<=")(ProcessingTask.total, 1000 + config.task_queue_limit);
123 }
124 
125 unittest
126 {
127     SchedulerConfiguration config;
128     initScheduler(config);
129 
130     static class DummyTask : Task
131     {
132         override public void run ( ) { }
133         public void copyArguments ( ) { }
134     }
135 
136     {
137         // suspend point >= task queue
138         auto throttler_config = ThrottlerConfig(config.task_queue_limit, 1);
139         testThrown!(ThrottlerFailureException)(new StreamProcessor!(DummyTask)(
140             throttler_config));
141     }
142 
143     {
144         // resume point >= task queue
145         auto throttler_config = ThrottlerConfig(1, config.task_queue_limit);
146         testThrown!(ThrottlerFailureException)(new StreamProcessor!(DummyTask)(
147             throttler_config));
148     }
149 
150     {
151         // works
152         auto throttler_config = ThrottlerConfig(config.task_queue_limit - 1, 1);
153         auto processor = new StreamProcessor!(DummyTask)(throttler_config);
154     }
155 }
156