Scheduler

Implementation of the IScheduler API

Members

Functions

await
void await(Task task, void delegate(Task) finished_dg)

Schedules the argument and suspends calling task until the argument finishes.

awaitOrTimeout
bool awaitOrTimeout(Task task, uint micro_seconds)

Similar to await but also has waiting timeout. Calling task will be resumed either if awaited task finished or timeout is hit, whichever happens first.

awaitResult
typeof(TaskT.result) awaitResult(TaskT task)

Convenience shortcut on top of await to await for a task and return some value type as a result.

delayedResume
void delayedResume(Task task)

Orders scheduler to resume given task unconditionally after current epoll cycle. Must be used instead of plain Task.resume from termination hooks of other tasks.

epoll
EpollSelectDispatcher epoll()

Getter for scheduler epoll instance. Necessary for integration with ISelectClient utilities so that new select client can be registered.

eventLoop
void eventLoop()

Starts pseudo-infinite event loop. Event loop will keep running as long as there is at least one event registered.

getSpecializedPoolStats
Optional!(SpecializedPoolStats) getSpecializedPoolStats(cstring name)
getStats
SchedulerStats getStats()

Provides load stats for the scheduler

processEvents
void processEvents()

Suspends current fiber temporarily, allowing pending events to be processed. Current fiber will be resumed as soon as no immediate events are left.

queue
void queue(Task task)

Method used to queue the task for later execution.

schedule
void schedule(Task task)

Method used to execute new task.

shutdown
void shutdown()

Forces early termination of all active tasks and shuts down event loop.

task_queue_full_cb
void task_queue_full_cb(TaskQueueFullCB dg)

Set delegate to call each time task is attempted to be queue but size limit is reached. Both the queue and the task will be supplied as arguments.

task_queue_full_cb
TaskQueueFullCB task_queue_full_cb()

Gets current callback for case of full queue

Variables

exception_handler
void delegate(Task, Exception) exception_handler;

Called each time task terminates with an exception when being run in context of the scheduler or the event loop.

select_cycle_hook_registered
bool select_cycle_hook_registered;

Indicates if this.selectCycleHook is already registered for the next epoll cycle.

Inherited Members

From IScheduler

Configuration
struct Configuration

Aggregation of various size limits used internally by scheduler. Instance of this struct is used as initScheduler argument.

Stats
struct Stats

Aggregate of various statistics that indicate overall scheduler load and performance.

SpecializedPoolStats
struct SpecializedPoolStats

Usage stats of a single specialized task pool

epoll
EpollSelectDispatcher epoll()

Getter for scheduler epoll instance. Necessary for integration with ISelectClient utilities so that new select client can be registered.

shutdown
void shutdown()

Forces early termination of all active tasks and shuts down event loop.

getStats
Stats getStats()

Provides load stats for the scheduler

getSpecializedPoolStats
Optional!(SpecializedPoolStats) getSpecializedPoolStats(cstring name)
schedule
void schedule(Task task)

Method used to execute new task.

queue
void queue(Task task)

Method used to queue the task for later execution.

await
void await(Task task, void delegate(Task) finished_dg)

Schedules the argument and suspends calling task until the argument finishes.

awaitResult
typeof(TaskT.result) awaitResult(TaskT task)

Convenience shortcut on top of await to await for a task and return some value type as a result.

awaitOrTimeout
bool awaitOrTimeout(Task task, uint micro_seconds)

Similar to await but also has waiting timeout. Calling task will be resumed either if awaited task finished or timeout is hit, whichever happens first.

delayedResume
void delayedResume(Task task)

Orders scheduler to resume given task unconditionally after current epoll cycle. Must be used instead of plain Task.resume from termination hooks of other tasks.

processEvents
void processEvents()

Suspends current fiber temporarily, allowing pending events to be processed. Current fiber will be resumed as soon as no immediate events are left.

eventLoop
void eventLoop()

Starts pseudo-infinite event loop. Event loop will keep running as long as there is at least one event registered.

Examples

1 // mandatory scheduler initialziation, should happen only once in the app
2 SchedulerConfiguration config;
3 initScheduler(config);
4 
5 // example custom task type - its `run` method will be executed
6 // within the context of the worker fiber to which it is assigned
7 class TestTask : Task
8 {
9     static size_t started;
10     static size_t recycled;
11 
12     override public void run ( )
13     {
14         ++TestTask.started;
15 
16         static immutable very_long_loop = 5;
17 
18         for (int i = 0; i < very_long_loop; ++i)
19         {
20             // sometimes a task has to do lengthy computations
21             // without any I/O like requests to remote servers. To avoid
22             // completely blocking all other tasks, it should call the
23             // `processEvents` method to pause briefly:
24             theScheduler.processEvents();
25         }
26     }
27 
28     override public void recycle ( )
29     {
30         ++TestTask.recycled;
31     }
32 }
33 
34 // a task that can be processed by an idle worker fiber from the scheduler's
35 // pool will be run immediately upon scheduling and yield on a call to
36 // `processEvents`, keeping their assigned worker fibers busy:
37 for (int i = 0; i < config.worker_fiber_limit; ++i)
38     theScheduler.schedule(new TestTask);
39 
40 // new tasks scheduled while all worker fibers are busy will get pushed into
41 // the scheduler's task queue to be run later on, when a worker fiber
42 // finishes its current job:
43 for (int i = 0; i < config.task_queue_limit; ++i)
44     theScheduler.schedule(new TestTask);
45 
46 // when the task queue is full, any new scheduling attempt will result
47 // in a TaskQueueFullException being thrown. However, it is possible to
48 // specify a custom callback to handle the situation instead:
49 theScheduler.task_queue_full_cb = ( Task, FixedRingQueue!(Task) ) { };
50 theScheduler.schedule(new TestTask); // will do nothing now
51 
52 // all worker fibers are still waiting, suspended, now as the scheduler loop
53 // isn't running, so there was nothing to resume them
54 test!("==")(TestTask.started, config.worker_fiber_limit);
55 test!("==")(TestTask.recycled, 0);
56 
57 // in a real application, this call will block the main thread for the
58 // duration of the application and most resuming will be done by epoll
59 // events
60 theScheduler.eventLoop();
61 test!("==")(TestTask.recycled,
62     config.worker_fiber_limit + config.task_queue_limit);

Meta