1 module ocean.task.util.TaskSuspender_test;
2 
3 import ocean.task.util.TaskSuspender;
4 import ocean.task.util.StreamProcessor;
5 import ocean.task.Task;
6 import ocean.task.Scheduler;
7 
8 import ocean.core.Test;
9 
10 class Processor : Task
11 {
12     int x;
13 
14     void copyArguments ( int x )
15     {
16         this.x = x;
17     }
18 
19     override void run ( )
20     {
21         if (x == 100)
22             theScheduler.shutdown();
23         theScheduler.processEvents();
24     }
25 }
26 
27 class Generator : Task
28 {
29     StreamProcessor!(Processor) sp;
30     int i;
31 
32     this ( )
33     {
34         this.sp = new typeof(this.sp)(ThrottlerConfig.init);
35     }
36 
37     override void run ( )
38     {
39         while (true)
40         {
41             this.sp.process(i);
42             ++i;
43         }
44     }
45 }
46 
47 unittest
48 {
49     initScheduler(SchedulerConfiguration.init);
50     auto generator = new Generator;
51     generator.sp.addStream(new TaskSuspender(generator));
52     theScheduler.queue(generator);
53     theScheduler.eventLoop();
54     test!(">=")(generator.i, 100);
55     test!("<=")(generator.i, 100 + SchedulerConfiguration.init.task_queue_limit);
56 }