1 /*******************************************************************************
2 
3     Utility functions for loading and dumping task pools for preserving
4     tasks between application restarts.
5 
6     To use the serialization and deserialization funtionality the derived
7     task must implement 'serialize' and 'deserialize'.
8 
9     public void serialize ( ref void[] buffer )
10 
11     public void deserialize ( void[] buffer )
12 
13     See usage example in the unit test for example implementation.
14 
15     Copyright:
16         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
17         All rights reserved.
18 
19     License:
20         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
21         Alternatively, this file may be distributed under the terms of the Tango
22         3-Clause BSD License (see LICENSE_BSD.txt for details).
23 
24 *******************************************************************************/
25 
26 module ocean.task.util.TaskPoolSerializer;
27 
28 /*******************************************************************************
29 
30     Utility functions for loading and dumping task pools for preserving
31     tasks between application restarts.
32 
33 *******************************************************************************/
34 
35 public class TaskPoolSerializer
36 {
37     import ocean.meta.types.Qualifiers;
38 
39     import ocean.core.Array: concat;
40     import ocean.core.Enforce;
41     import ocean.core.TypeConvert;
42     import ocean.meta.traits.Aggregates /* : hasMethod */;
43     import ocean.io.device.File;
44     import ocean.io.model.IConduit;
45     import ocean.io.FilePath;
46     import ocean.io.serialize.SimpleStreamSerializer;
47     import ocean.task.Task;
48 
49     /***************************************************************************
50 
51         Reusable serialization and deserialization buffer for tasks.
52 
53     ***************************************************************************/
54 
55     private void[] serialize_buffer;
56 
57     /***************************************************************************
58 
59         Temporary file path to be used while dumping tasks to disk.
60 
61     ***************************************************************************/
62 
63     private mstring temp_dump_file_path;
64 
65     /***************************************************************************
66 
67         Dump the current contents of the task pool to disk.
68         If the task pool is empty then no file will be created.
69 
70         Uses `ocean.io.device.File` instead of `ocean.io.device.TempFile` due to
71         issues copying across partitions in the AUFS storage driver.
72 
73         Params:
74             task_pool = The task pool to dump to disk.
75             dump_file_path = Dump current active tasks to the file path.
76 
77         Returns:
78             The number of items dumped to disk.
79 
80         Throws:
81             Exception on failure to create file.
82 
83     ***************************************************************************/
84 
85     public size_t dump ( TaskPoolT ) ( TaskPoolT task_pool, cstring dump_file_path )
86     {
87         if ( task_pool.num_busy == 0 ) return 0;
88 
89         concat(this.temp_dump_file_path, dump_file_path, ".tmp");
90         scope file = new File(this.temp_dump_file_path, File.WriteCreate);
91 
92         auto items_written = this.dump(task_pool, file);
93         file.sync();
94         file.close();
95         FilePath(this.temp_dump_file_path).rename(dump_file_path);
96         return items_written;
97     }
98 
99     /***************************************************************************
100 
101         Dump the current contents of the task pool to the output stream.
102 
103         Params:
104             task_pool = The task pool to dump to the output stream.
105             stream = The stream to dump tasks to.
106 
107     ***************************************************************************/
108 
109     public size_t dump ( TaskPoolT ) ( TaskPoolT task_pool, OutputStream stream )
110     {
111         static assert(hasMethod!(TaskPoolT.TaskType, "serialize",
112                       void delegate ( ref void[] )));
113 
114         size_t num_busy = task_pool.num_busy;
115         SimpleStreamSerializerArrays.write(stream, num_busy);
116 
117         scope pool_itr = task_pool..new BusyItemsIterator;
118         foreach ( raw_task; pool_itr )
119         {
120             this.serialize_buffer.length = 0;
121             assumeSafeAppend(this.serialize_buffer);
122 
123             auto task = downcast!(TaskPoolT.TaskType)(raw_task);
124             assert(task);
125             task.serialize(this.serialize_buffer);
126 
127             SimpleStreamSerializerArrays.write(stream, this.serialize_buffer);
128         }
129 
130         return num_busy;
131     }
132 
133     /***************************************************************************
134 
135         Loads serialized tasks from disk. Does nothing if no file exists.
136 
137         Params:
138             task_pool = The task pool that the tasks will be loaded in to.
139             load_file_path = The file path of the file to load.
140             args = Parameters matching the task's 'deserialize()' excluding
141                    the deserialized buffer itself.
142 
143         Returns:
144             The number of items loaded from the file.
145 
146         Throws:
147             When serialized data has been corrupted and expected number of
148             items is not the amount read.
149 
150     ***************************************************************************/
151 
152     public size_t load ( TaskPoolT, Args ... ) ( TaskPoolT task_pool,
153         cstring load_file_path, Args args )
154     {
155         if ( !FilePath(load_file_path).exists ) return 0;
156 
157         scope file = new File(load_file_path);
158         scope ( success )
159             FilePath(load_file_path).remove();
160 
161         return this.load(task_pool, file, args);
162     }
163 
164     /***************************************************************************
165 
166         Restores tasks from the InputStream to the TaskPool.
167 
168         Params:
169             task_pool = The task pool that the tasks will be loaded in to.
170             stream = InputStream containing the serialized tasks.
171             args = Parameters matching the task's 'deserialize()' excluding
172                    the deserialized buffer itself.
173 
174         Returns:
175             The number of tasks loaded from the stream.
176 
177         Throws:
178             When serialized data has been corrupted and expected number of
179             items is not the amount read.
180 
181     ***************************************************************************/
182 
183     public size_t load ( TaskPoolT, Args ... ) ( TaskPoolT task_pool,
184         InputStream stream, Args args)
185     {
186         static assert(is(typeof(TaskPoolT.TaskType.deserialize)),
187             "Must contain `deserialize` method for restoring");
188 
189         size_t total_items;
190 
191         SimpleStreamSerializerArrays.read(stream, total_items);
192 
193         size_t len, tasks_loaded;
194 
195         while ( tasks_loaded < total_items )
196         {
197             SimpleStreamSerializerArrays.read(stream, this.serialize_buffer);
198             task_pool.restore(this.serialize_buffer, args);
199             ++tasks_loaded;
200         }
201 
202         enforce!("==")(tasks_loaded, total_items);
203         return tasks_loaded;
204     }
205 }