/*******************************************************************************

    Utility functions for loading and dumping task pools for preserving
    tasks between application restarts.

    To use the serialization and deserialization functionality the derived
    task must implement 'serialize' and 'deserialize'.

        public void serialize ( ref void[] buffer )

        public void deserialize ( void[] buffer )

    See usage example in the unit test for example implementation.

    Copyright:
        Copyright (c) 2009-2016 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

*******************************************************************************/

module ocean.task.util.TaskPoolSerializer;

/*******************************************************************************

    Utility functions for loading and dumping task pools for preserving
    tasks between application restarts.

*******************************************************************************/

public class TaskPoolSerializer
{
    import ocean.meta.types.Qualifiers;

    import ocean.core.Array: concat;
    import ocean.core.Enforce;
    import ocean.core.TypeConvert;
    import ocean.meta.traits.Aggregates /* : hasMethod */;
    import ocean.io.device.File;
    import ocean.io.model.IConduit;
    import ocean.io.FilePath;
    import ocean.io.serialize.SimpleStreamSerializer;
    import ocean.task.Task;

    /***************************************************************************

        Reusable serialization and deserialization buffer for tasks.

    ***************************************************************************/

    private void[] serialize_buffer;

    /***************************************************************************

        Temporary file path to be used while dumping tasks to disk.

    ***************************************************************************/

    private mstring temp_dump_file_path;

    /***************************************************************************

        Dump the current contents of the task pool to disk.
        If the task pool is empty then no file will be created.

        The tasks are first written to `dump_file_path ~ ".tmp"`; that file is
        synced, closed and only then renamed to `dump_file_path`.

        Uses `ocean.io.device.File` instead of `ocean.io.device.TempFile` due to
        issues copying across partitions in the AUFS storage driver.

        Params:
            task_pool = The task pool to dump to disk.
            dump_file_path = Dump current active tasks to the file path.

        Returns:
            The number of items dumped to disk.

        Throws:
            Exception on failure to create file.

    ***************************************************************************/

    public size_t dump ( TaskPoolT ) ( TaskPoolT task_pool, cstring dump_file_path )
    {
        // Nothing to persist: do not create an empty dump file.
        if ( task_pool.num_busy == 0 ) return 0;

        // Build "<dump_file_path>.tmp" in the reusable path buffer.
        concat(this.temp_dump_file_path, dump_file_path, ".tmp");
        scope file = new File(this.temp_dump_file_path, File.WriteCreate);

        auto items_written = this.dump(task_pool, file);

        // Flush and close the temporary file before moving it into place.
        file.sync();
        file.close();
        FilePath(this.temp_dump_file_path).rename(dump_file_path);

        return items_written;
    }

    /***************************************************************************

        Dump the current contents of the task pool to the output stream.

        The number of busy tasks is written first, followed by the serialized
        buffer of each busy task.

        Params:
            task_pool = The task pool to dump to the output stream.
            stream = The stream to dump tasks to.

        Returns:
            The number of busy tasks reported by the pool when the dump
            started (the count written at the head of the stream).

    ***************************************************************************/

    public size_t dump ( TaskPoolT ) ( TaskPoolT task_pool, OutputStream stream )
    {
        static assert(hasMethod!(TaskPoolT.TaskType, "serialize",
            void delegate ( ref void[] )));

        size_t num_busy = task_pool.num_busy;
        SimpleStreamSerializerArrays.write(stream, num_busy);

        // BusyItemsIterator is nested in the pool type, hence `pool.new`.
        scope pool_itr = task_pool.new BusyItemsIterator;
        foreach ( raw_task; pool_itr )
        {
            // Reset the shared buffer but allow its memory to be reused.
            this.serialize_buffer.length = 0;
            assumeSafeAppend(this.serialize_buffer);

            auto task = downcast!(TaskPoolT.TaskType)(raw_task);
            assert(task);
            task.serialize(this.serialize_buffer);

            SimpleStreamSerializerArrays.write(stream, this.serialize_buffer);
        }

        return num_busy;
    }

    /***************************************************************************

        Loads serialized tasks from disk. Does nothing if no file exists.

        The file is removed after all tasks have been loaded successfully; if
        loading throws, the file is left in place.

        Params:
            task_pool = The task pool that the tasks will be loaded in to.
            load_file_path = The file path of the file to load.
            args = Parameters matching the task's 'deserialize()' excluding
                the deserialized buffer itself.

        Returns:
            The number of items loaded from the file.

        Throws:
            When serialized data has been corrupted and expected number of
            items is not the amount read.

    ***************************************************************************/

    public size_t load ( TaskPoolT, Args ... ) ( TaskPoolT task_pool,
        cstring load_file_path, Args args )
    {
        if ( !FilePath(load_file_path).exists ) return 0;

        scope file = new File(load_file_path);

        // Scope guards run in reverse order of declaration: the file is
        // closed first (always), then removed (only on success).
        scope ( success )
            FilePath(load_file_path).remove();
        scope ( exit )
            file.close();

        return this.load(task_pool, file, args);
    }

    /***************************************************************************

        Restores tasks from the InputStream to the TaskPool.

        Reads the item count from the head of the stream, then reads that many
        serialized buffers, passing each one (together with `args`) to
        `task_pool.restore`.

        Params:
            task_pool = The task pool that the tasks will be loaded in to.
            stream = InputStream containing the serialized tasks.
            args = Parameters matching the task's 'deserialize()' excluding
                the deserialized buffer itself.

        Returns:
            The number of tasks loaded from the stream.

        Throws:
            When serialized data has been corrupted and expected number of
            items is not the amount read.

    ***************************************************************************/

    public size_t load ( TaskPoolT, Args ... ) ( TaskPoolT task_pool,
        InputStream stream, Args args)
    {
        static assert(is(typeof(TaskPoolT.TaskType.deserialize)),
            "Must contain `deserialize` method for restoring");

        size_t total_items;

        SimpleStreamSerializerArrays.read(stream, total_items);

        size_t tasks_loaded;

        while ( tasks_loaded < total_items )
        {
            SimpleStreamSerializerArrays.read(stream, this.serialize_buffer);
            task_pool.restore(this.serialize_buffer, args);
            ++tasks_loaded;
        }

        // NOTE(review): this holds trivially once the loop above terminates;
        // a truncated stream presumably surfaces as an exception thrown by
        // the reads instead -- confirm against SimpleStreamSerializer.
        enforce!("==")(tasks_loaded, total_items);
        return tasks_loaded;
    }
}