/*******************************************************************************

    Unit test wiring a TaskSuspender (wrapping a generator task) into a
    StreamProcessor as one of its streams.

    A generator task feeds consecutive integers into a stream processor in an
    endless loop; the processing task shuts the scheduler down once it sees
    the value 100. The test then checks how far the generator's counter got.

*******************************************************************************/

module ocean.task.util.TaskSuspender_test;

import ocean.task.util.TaskSuspender;
import ocean.task.util.StreamProcessor;
import ocean.task.Task;
import ocean.task.Scheduler;

import ocean.core.Test;

/*******************************************************************************

    Task that handles a single integer record.

*******************************************************************************/

class Processor : Task
{
    /// Value handed to this task via `copyArguments`.
    int x;

    /***************************************************************************

        Stores the value to handle.
        NOTE(review): presumably invoked by StreamProcessor before the task
        is run -- confirm against the StreamProcessor documentation.

        Params:
            x = value to remember for the subsequent `run`

    ***************************************************************************/

    void copyArguments ( int x )
    {
        this.x = x;
    }

    /***************************************************************************

        Requests scheduler shutdown when the stored value is 100, then (in
        every case) calls `theScheduler.processEvents()`.

    ***************************************************************************/

    override void run ( )
    {
        if (this.x == 100)
        {
            theScheduler.shutdown();
        }

        // NOTE(review): presumably hands control back to the scheduler so
        // that processing tasks pile up -- confirm.
        theScheduler.processEvents();
    }
}

/*******************************************************************************

    Task that endlessly pushes an increasing counter into a stream processor.

*******************************************************************************/

class Generator : Task
{
    /// Stream processor receiving the generated values.
    StreamProcessor!(Processor) sp;

    /// Next value to submit; read by the test after the event loop ends.
    int i;

    /***************************************************************************

        Creates the stream processor with the default throttler
        configuration.

    ***************************************************************************/

    this ( )
    {
        this.sp = new StreamProcessor!(Processor)(ThrottlerConfig.init);
    }

    /***************************************************************************

        Submits the current counter value and then increments it, forever.
        The loop has no exit condition of its own.

    ***************************************************************************/

    override void run ( )
    {
        for (;;)
        {
            // The increment deliberately happens only after `process`
            // returns, so the counter reflects completed submissions.
            this.sp.process(this.i);
            this.i += 1;
        }
    }
}

unittest
{
    // Upper bound allowed for overshooting the shutdown value.
    auto queue_limit = SchedulerConfiguration.init.task_queue_limit;

    initScheduler(SchedulerConfiguration.init);

    auto producer = new Generator;

    // NOTE(review): presumably lets the stream processor suspend/resume the
    // generator task when throttling -- confirm against TaskSuspender docs.
    producer.sp.addStream(new TaskSuspender(producer));

    theScheduler.queue(producer);
    theScheduler.eventLoop();

    // The generator must have reached the value that triggers shutdown ...
    test!(">=")(producer.i, 100);
    // ... and must not have run ahead by more than the task queue limit.
    test!("<=")(producer.i, 100 + queue_limit);
}