1 /*******************************************************************************
2 
3     Utility functions for loading and dumping task pools for preserving
4     tasks between application restarts.
5 
    To use the serialization and deserialization functionality, the derived
    task must implement 'serialize' and 'deserialize'.
8 
9     public void serialize ( ref void[] buffer )
10 
11     public void deserialize ( void[] buffer )
12 
13     See usage example in the unit test for example implementation.
14 
15     Copyright:
16         Copyright (c) 2009-2016 dunnhumby Germany GmbH.
17         All rights reserved.
18 
19     License:
20         Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
21         Alternatively, this file may be distributed under the terms of the Tango
22         3-Clause BSD License (see LICENSE_BSD.txt for details).
23 
24 *******************************************************************************/
25 
26 module ocean.task.util.TaskPoolSerializer;
27 
28 /*******************************************************************************
29 
30     Utility functions for loading and dumping task pools for preserving
31     tasks between application restarts.
32 
33 *******************************************************************************/
34 
public class TaskPoolSerializer
{
    import ocean.transition;

    import ocean.core.Array: concat;
    import ocean.core.Enforce;
    import ocean.core.TypeConvert;
    import ocean.meta.traits.Aggregates /* : hasMethod */;
    import ocean.io.device.File;
    import ocean.io.model.IConduit;
    import ocean.io.FilePath;
    import ocean.io.serialize.SimpleStreamSerializer;
    import ocean.task.Task;

    /***************************************************************************

        Reusable buffer holding the serialized form of a single task. It is
        shared by the dumping and the loading code paths, so its content is
        only meaningful for the duration of one task's (de)serialization.

    ***************************************************************************/

    private void[] serialize_buffer;

    /***************************************************************************

        Reusable buffer for the path of the temporary file that is written
        while dumping tasks to disk (the target path with ".tmp" appended).

    ***************************************************************************/

    private mstring temp_dump_file_path;

    /***************************************************************************

        Dump the current contents of the task pool to disk.
        If the task pool has no busy tasks then no file will be created and
        nothing is written.

        The tasks are first written to `dump_file_path ~ ".tmp"`, which is
        synced, closed and only then renamed to `dump_file_path`, so that the
        target path never refers to a partially written dump.

        Uses `ocean.io.device.File` instead of `ocean.io.device.TempFile` due to
        issues copying across partitions in the AUFS storage driver.

        Params:
            TaskPoolT = type of the task pool; its `TaskType` must implement
                `serialize`
            task_pool = The task pool to dump to disk.
            dump_file_path = Dump current active tasks to the file path.

        Returns:
            The number of items dumped to disk (0 if the pool had no busy
            tasks).

        Throws:
            Exception on failure to create file.

    ***************************************************************************/

    public size_t dump ( TaskPoolT ) ( TaskPoolT task_pool, cstring dump_file_path )
    {
        if ( task_pool.num_busy == 0 ) return 0;

        concat(this.temp_dump_file_path, dump_file_path, ".tmp");
        scope file = new File(this.temp_dump_file_path, File.WriteCreate);

        auto items_written = this.dump(task_pool, file);

        // Flush and close the temporary file before moving it into place.
        file.sync();
        file.close();
        FilePath(this.temp_dump_file_path).rename(dump_file_path);

        return items_written;
    }

    /***************************************************************************

        Dump the current contents of the task pool to the output stream.

        The stream layout is: the number of busy tasks, followed by the
        serialized buffer of each busy task in iteration order.

        Params:
            TaskPoolT = type of the task pool; its `TaskType` must implement
                `void serialize ( ref void[] )`
            task_pool = The task pool to dump to the output stream.
            stream = The stream to dump tasks to.

        Returns:
            The number of busy tasks that was written as the stream header.

    ***************************************************************************/

    public size_t dump ( TaskPoolT ) ( TaskPoolT task_pool, OutputStream stream )
    {
        static assert(hasMethod!(TaskPoolT.TaskType, "serialize",
                      void delegate ( ref void[] )));

        size_t num_busy = task_pool.num_busy;
        SimpleStreamSerializerArrays.write(stream, num_busy);

        // `BusyItemsIterator` is nested in the pool, hence the `outer.new`
        // instantiation syntax.
        scope pool_itr = task_pool.new BusyItemsIterator;
        foreach ( raw_task; pool_itr )
        {
            // Reset the shared buffer for this task while allowing its
            // existing memory to be reused.
            this.serialize_buffer.length = 0;
            enableStomping(this.serialize_buffer);

            auto task = downcast!(TaskPoolT.TaskType)(raw_task);
            assert(task);
            task.serialize(this.serialize_buffer);

            SimpleStreamSerializerArrays.write(stream, this.serialize_buffer);
        }

        return num_busy;
    }

    /***************************************************************************

        Loads serialized tasks from disk. Does nothing if no file exists.

        When loading completes without throwing, the file is removed.
        If loading throws, the file is left in place.

        Params:
            TaskPoolT = type of the task pool; its `TaskType` must implement
                `deserialize`
            task_pool = The task pool that the tasks will be loaded in to.
            load_file_path = The file path of the file to load.

        Returns:
            The number of items loaded from the file (0 if the file does not
            exist).

        Throws:
            When serialized data has been corrupted and expected number of
            items is not the amount read.

    ***************************************************************************/

    public size_t load ( TaskPoolT ) ( TaskPoolT task_pool, cstring load_file_path )
    {
        if ( !FilePath(load_file_path).exists ) return 0;

        scope file = new File(load_file_path);

        // Only discard the dump once every task has been restored.
        scope ( success )
            FilePath(load_file_path).remove();

        return this.load(task_pool, file);
    }

    /***************************************************************************

        Restores tasks from the InputStream to the TaskPool.

        Expects the layout produced by `dump`: the number of tasks followed
        by one serialized buffer per task. Each buffer is handed to
        `task_pool.restore`.

        Params:
            TaskPoolT = type of the task pool; its `TaskType` must implement
                `deserialize`
            task_pool = The task pool that the tasks will be loaded in to.
            stream = InputStream containing the serialized tasks.

        Returns:
            The number of tasks loaded from the stream.

        Throws:
            When serialized data has been corrupted and expected number of
            items is not the amount read.

    ***************************************************************************/

    public size_t load ( TaskPoolT ) ( TaskPoolT task_pool, InputStream stream )
    {
        static assert(is(typeof(TaskPoolT.TaskType.deserialize)),
            "Must contain `deserialize` method for restoring");

        size_t total_items;

        SimpleStreamSerializerArrays.read(stream, total_items);

        size_t tasks_loaded;

        while ( tasks_loaded < total_items )
        {
            SimpleStreamSerializerArrays.read(stream, this.serialize_buffer);
            task_pool.restore(this.serialize_buffer);
            ++tasks_loaded;
        }

        // Sanity check that exactly the announced number of tasks was read.
        enforce!("==")(tasks_loaded, total_items);
        return tasks_loaded;
    }

}