1 module ocean.task.util.TaskSuspender_test;
2
3 import ocean.task.util.TaskSuspender;
4 import ocean.task.util.StreamProcessor;
5 import ocean.task.Task;
6 import ocean.task.Scheduler;
7
8 import ocean.core.Test;
9
10 class Processor : Task
11 {
12 int x;
13
14 void copyArguments ( int x )
15 {
16 this.x = x;
17 }
18
19 override void run ( )
20 {
21 if (x == 100)
22 theScheduler.shutdown();
23 theScheduler.processEvents();
24 }
25 }
26
27 class Generator : Task
28 {
29 StreamProcessor!(Processor) sp;
30 int i;
31
32 this ( )
33 {
34 this.sp = new typeof(this.sp)(ThrottlerConfig.init);
35 }
36
37 override void run ( )
38 {
39 while (true)
40 {
41 this.sp.process(i);
42 ++i;
43 }
44 }
45 }
46
47 unittest
48 {
49 initScheduler(SchedulerConfiguration.init);
50 auto generator = new Generator;
51 generator.sp.addStream(new TaskSuspender(generator));
52 theScheduler.queue(generator);
53 theScheduler.eventLoop();
54 test!(">=")(generator.i, 100);
55 test!("<=")(generator.i, 100 + SchedulerConfiguration.init.task_queue_limit);
56 }