/*******************************************************************************

    Copyright:
        Copyright (c) 2017 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

*******************************************************************************/

module ocean.task.util.StreamProcessor_test;


import ocean.task.Task;
import ocean.task.Scheduler;
import ocean.task.util.StreamProcessor;
import ocean.task.util.Timer;

import ocean.io.model.ISuspendable;
import ocean.io.Stdout;
import ocean.io.select.client.TimerEvent;

import ocean.core.Test;
import ocean.core.Verify;

/*******************************************************************************

    Example of processing task, one which is managed by StreamProcessor and
    gets scheduled for each new record arriving from Generator

*******************************************************************************/

class ProcessingTask : Task
{
    /// Record value handed over by the stream processor for this run
    int x;

    /// Number of task runs that got past the wait, shared by all instances
    static int total;

    /***************************************************************************

        Stores the record to be processed by the next `run` call.

        Params:
            x = record value to process

    ***************************************************************************/

    void copyArguments ( int x )
    {
        this.x = x;
    }

    /***************************************************************************

        Waits for a short period to imitate work, counts the run and shuts
        the scheduler down once the record with the value 1000 is processed.

    ***************************************************************************/

    override void run ( )
    {
        // leading dot: module-scope `wait`, presumably the one provided by
        // ocean.task.util.Timer (argument unit to be confirmed there)
        .wait(100);
        ++total;

        // record 1000 is the end-of-test marker
        if (this.x == 1000)
            theScheduler.shutdown();
    }
}

/*******************************************************************************

    "stream" or "generator" class, one which keeps producing new records for
    processing at throttled rate

*******************************************************************************/

class Generator : ISuspendable
{
    /// Timer whose handler (`generate`) produces the records
    TimerEvent timer;

    /// Processor that schedules one ProcessingTask per produced record
    StreamProcessor!(ProcessingTask) stream_processor;

    /// Value of the next record to produce
    int counter;

    /***************************************************************************

        Creates the timer and the stream processor and adds this instance to
        the processor as a stream, so that the processor can suspend and
        resume it.

    ***************************************************************************/

    this ( )
    {
        this.timer = new TimerEvent(&this.generate);

        // arguments are the suspend point and the resume point (in that
        // order, as used by the throttler config checks in the unittest below)
        auto throttler_config = ThrottlerConfig(10, 1);
        this.stream_processor = new StreamProcessor!(ProcessingTask)(
            throttler_config);
        this.stream_processor.addStream(this);
    }

    /***************************************************************************

        Arms the timer and starts producing records.

    ***************************************************************************/

    void start ( )
    {
        // presumably: first expiry after 1 ms, then every 1 ms -- confirm
        // against the parameter order of TimerEvent.set
        this.timer.set(0, 1, 0, 1);
        this.resume();
    }

    /***************************************************************************

        Resumes record production by registering the timer with the
        scheduler's epoll instance.

    ***************************************************************************/

    override void resume ( )
    {
        theScheduler.epoll.register(this.timer);
    }

    /***************************************************************************

        Suspends record production by unregistering the timer from the
        scheduler's epoll instance.

    ***************************************************************************/

    override void suspend ( )
    {
        theScheduler.epoll.unregister(this.timer);
    }

    /***************************************************************************

        Returns:
            true if record production is currently suspended, i.e. the timer
            is not registered with epoll (`resume` registers it, `suspend`
            unregisters it)

    ***************************************************************************/

    override bool suspended ( )
    {
        return !this.timer.is_registered;
    }

    /***************************************************************************

        Timer handler: passes the current counter value to the stream
        processor as a new record and advances the counter.

        Returns:
            always true (presumably keeps the timer registered -- confirm
            against the TimerEvent handler contract)

    ***************************************************************************/

    bool generate ( )
    {
        this.stream_processor.process(this.counter);
        ++this.counter;
        return true;
    }
}

/*******************************************************************************

    Runs the generator against a scheduler with a limited task queue until
    ProcessingTask requests the shutdown and checks how many records have been
    processed by then.

*******************************************************************************/

unittest
{
    SchedulerConfiguration config;
    config.worker_fiber_limit = 10;
    config.task_queue_limit = 30;
    initScheduler(config);

    auto generator = new Generator;
    generator.start();
    theScheduler.eventLoop();

    // exact number of tasks that will be processed before the shutdown
    // may vary but it must always be at most 1000 + task_queue_limit
    test!(">=")(ProcessingTask.total, 1000);
    test!("<=")(ProcessingTask.total, 1000 + config.task_queue_limit);
}

/*******************************************************************************

    Checks that StreamProcessor rejects throttler configurations whose suspend
    or resume point is not below the scheduler's task queue limit, and accepts
    one where both are.

*******************************************************************************/

unittest
{
    SchedulerConfiguration config;
    initScheduler(config);

    static class DummyTask : Task
    {
        override public void run ( ) { }
        public void copyArguments ( ) { }
    }

    {
        // suspend point >= task queue
        auto throttler_config = ThrottlerConfig(config.task_queue_limit, 1);
        testThrown!(ThrottlerFailureException)(new StreamProcessor!(DummyTask)(
            throttler_config));
    }

    {
        // resume point >= task queue
        auto throttler_config = ThrottlerConfig(1, config.task_queue_limit);
        testThrown!(ThrottlerFailureException)(new StreamProcessor!(DummyTask)(
            throttler_config));
    }

    {
        // works
        auto throttler_config = ThrottlerConfig(config.task_queue_limit - 1, 1);
        auto processor = new StreamProcessor!(DummyTask)(throttler_config);
    }
}