1 /*******************************************************************************
2
3 Copyright:
4 Copyright (c) 20017 dunnhumby Germany GmbH.
5 All rights reserved.
6
7 License:
8 Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
9 Alternatively, this file may be distributed under the terms of the Tango
10 3-Clause BSD License (see LICENSE_BSD.txt for details).
11
12 *******************************************************************************/
13
14 module ocean.task.util.StreamProcessor_test;
15
16
17 import ocean.task.Task;
18 import ocean.task.Scheduler;
19 import ocean.task.util.StreamProcessor;
20 import ocean.task.util.Timer;
21
22 import ocean.io.model.ISuspendable;
23 import ocean.io.Stdout;
24 import ocean.io.select.client.TimerEvent;
25
26 import ocean.core.Test;
27 import ocean.core.Verify;
28
29 /*******************************************************************************
30
31 Example of processing task, one which is managed by StreamProcessor and
32 gets scheduled for each new record arriving from Generator
33
34 *******************************************************************************/
35
36 class ProcessingTask : Task
37 {
38 int x;
39
40 static int total;
41
42 void copyArguments ( int x )
43 {
44 this.x = x;
45 }
46
47 override void run ( )
48 {
49 .wait(100);
50 ++total;
51
52 if (this.x == 1000)
53 theScheduler.shutdown();
54 }
55 }
56
57 /*******************************************************************************
58
59 "stream" or "generator" class, one which keeps producing new records for
60 processing at throttled rate
61
62 *******************************************************************************/
63
64 class Generator : ISuspendable
65 {
66 TimerEvent timer;
67 StreamProcessor!(ProcessingTask) stream_processor;
68 int counter;
69
70 this ( )
71 {
72 this.timer = new TimerEvent(&this.generate);
73
74 auto throttler_config = ThrottlerConfig(10, 1);
75 this.stream_processor = new StreamProcessor!(ProcessingTask)(throttler_config);
76 this.stream_processor.addStream(this);
77 }
78
79 void start ( )
80 {
81 this.timer.set(0, 1, 0, 1);
82 this.resume();
83 }
84
85 override void resume ( )
86 {
87 theScheduler.epoll.register(this.timer);
88 }
89
90 override void suspend ( )
91 {
92 theScheduler.epoll.unregister(this.timer);
93 }
94
95 override bool suspended ( )
96 {
97 return this.timer.is_registered;
98 }
99
100 bool generate ( )
101 {
102 this.stream_processor.process(this.counter);
103 ++this.counter;
104 return true;
105 }
106 }
107
108 unittest
109 {
110 SchedulerConfiguration config;
111 config.worker_fiber_limit = 10;
112 config.task_queue_limit = 30;
113 initScheduler(config);
114
115 auto generator = new Generator;
116 generator.start();
117 theScheduler.eventLoop();
118
119 // exact number of tasks that will be processed before the shutdown
120 // may vary but it must always be at most 1000 + task_queue_limit
121 test!(">=")(ProcessingTask.total, 1000);
122 test!("<=")(ProcessingTask.total, 1000 + config.task_queue_limit);
123 }
124
125 unittest
126 {
127 SchedulerConfiguration config;
128 initScheduler(config);
129
130 static class DummyTask : Task
131 {
132 override public void run ( ) { }
133 public void copyArguments ( ) { }
134 }
135
136 {
137 // suspend point >= task queue
138 auto throttler_config = ThrottlerConfig(config.task_queue_limit, 1);
139 testThrown!(ThrottlerFailureException)(new StreamProcessor!(DummyTask)(
140 throttler_config));
141 }
142
143 {
144 // resume point >= task queue
145 auto throttler_config = ThrottlerConfig(1, config.task_queue_limit);
146 testThrown!(ThrottlerFailureException)(new StreamProcessor!(DummyTask)(
147 throttler_config));
148 }
149
150 {
151 // works
152 auto throttler_config = ThrottlerConfig(config.task_queue_limit - 1, 1);
153 auto processor = new StreamProcessor!(DummyTask)(throttler_config);
154 }
155 }
156