/*******************************************************************************

    Utility functions for loading and dumping task pools for preserving
    tasks between application restarts.

    To use the serialization and deserialization functionality the derived
    task must implement 'serialize' and 'deserialize'.

        public void serialize ( ref void[] buffer )

        public void deserialize ( void[] buffer )

    See usage example in the unit test for example implementation.

    Copyright:
        Copyright (c) 2009-2016 dunnhumby Germany GmbH.
        All rights reserved.

    License:
        Boost Software License Version 1.0. See LICENSE_BOOST.txt for details.
        Alternatively, this file may be distributed under the terms of the Tango
        3-Clause BSD License (see LICENSE_BSD.txt for details).

*******************************************************************************/

module ocean.task.util.TaskPoolSerializer;

/*******************************************************************************

    Utility functions for loading and dumping task pools for preserving
    tasks between application restarts.

    An instance owns a reusable buffer, so repeated dumps and loads through
    the same serializer do not need a fresh buffer per task.

*******************************************************************************/

public class TaskPoolSerializer
{
    import ocean.transition;

    import ocean.core.Array: concat;
    import ocean.core.Enforce;
    import ocean.core.TypeConvert;
    import ocean.meta.traits.Aggregates /* : hasMethod */;
    import ocean.io.device.File;
    import ocean.io.model.IConduit;
    import ocean.io.FilePath;
    import ocean.io.serialize.SimpleStreamSerializer;
    import ocean.task.Task;

    /***************************************************************************

        Reusable serialization and deserialization buffer for tasks.

    ***************************************************************************/

    private void[] serialize_buffer;

    /***************************************************************************

        Temporary file path to be used while dumping tasks to disk.

    ***************************************************************************/

    private mstring temp_dump_file_path;

    /***************************************************************************

        Dump the current contents of the task pool to disk.
        If the task pool has no busy tasks then no file will be created.

        The tasks are first written to `dump_file_path ~ ".tmp"`, which is
        synced and closed and then renamed to `dump_file_path`, so the final
        path only ever appears once the dump has been written completely.

        Uses `ocean.io.device.File` instead of `ocean.io.device.TempFile` due to
        issues copying across partitions in the AUFS storage driver.

        Params:
            task_pool = The task pool to dump to disk.
            dump_file_path = Dump current active tasks to the file path.

        Returns:
            The number of items dumped to disk.

        Throws:
            Exception on failure to create file.

    ***************************************************************************/

    public size_t dump ( TaskPoolT ) ( TaskPoolT task_pool, cstring dump_file_path )
    {
        if ( task_pool.num_busy == 0 ) return 0;

        concat(this.temp_dump_file_path, dump_file_path, ".tmp");
        scope file = new File(this.temp_dump_file_path, File.WriteCreate);

        auto items_written = this.dump(task_pool, file);
        file.sync();
        file.close();
        FilePath(this.temp_dump_file_path).rename(dump_file_path);
        return items_written;
    }

    /***************************************************************************

        Dump the current contents of the task pool to the output stream.

        The number of busy tasks is written first, followed by one serialized
        buffer per busy task.

        Params:
            task_pool = The task pool to dump to the output stream.
            stream = The stream to dump tasks to.

        Returns:
            The number of busy tasks reported by the pool, which is the count
            written at the start of the stream.

    ***************************************************************************/

    public size_t dump ( TaskPoolT ) ( TaskPoolT task_pool, OutputStream stream )
    {
        static assert(hasMethod!(TaskPoolT.TaskType, "serialize",
            void delegate ( ref void[] )),
            "Must contain `serialize` method for dumping");

        size_t num_busy = task_pool.num_busy;
        SimpleStreamSerializerArrays.write(stream, num_busy);

        // BusyItemsIterator is nested in the pool, hence the `pool.new` form.
        scope pool_itr = task_pool.new BusyItemsIterator;
        foreach ( raw_task; pool_itr )
        {
            // Reset the shared buffer so each task serializes from scratch
            // while reusing the memory that is already allocated.
            this.serialize_buffer.length = 0;
            enableStomping(this.serialize_buffer);

            auto task = downcast!(TaskPoolT.TaskType)(raw_task);
            assert(task);
            task.serialize(this.serialize_buffer);

            SimpleStreamSerializerArrays.write(stream, this.serialize_buffer);
        }

        return num_busy;
    }

    /***************************************************************************

        Loads serialized tasks from disk. Does nothing if no file exists.

        The file is removed once the tasks have been loaded successfully; if
        loading throws, the file is left in place.

        Params:
            task_pool = The task pool that the tasks will be loaded in to.
            load_file_path = The file path of the file to load.

        Returns:
            The number of items loaded from the file.

        Throws:
            When serialized data has been corrupted and expected number of
            items is not the amount read.

    ***************************************************************************/

    public size_t load ( TaskPoolT ) ( TaskPoolT task_pool, cstring load_file_path )
    {
        if ( !FilePath(load_file_path).exists ) return 0;

        scope file = new File(load_file_path);
        scope ( success )
            FilePath(load_file_path).remove();
        // Registered last so it runs first: the file is closed explicitly
        // before it is removed, mirroring the explicit close in `dump`.
        scope ( exit )
            file.close();

        return this.load(task_pool, file);
    }

    /***************************************************************************

        Restores tasks from the InputStream to the TaskPool.

        Reads the item count from the stream, then reads that many serialized
        buffers and passes each one to `task_pool.restore`.

        Params:
            task_pool = The task pool that the tasks will be loaded in to.
            stream = InputStream containing the serialized tasks.

        Returns:
            The number of tasks loaded from the stream.

        Throws:
            When serialized data has been corrupted and expected number of
            items is not the amount read.

    ***************************************************************************/

    public size_t load ( TaskPoolT ) ( TaskPoolT task_pool, InputStream stream )
    {
        static assert(is(typeof(TaskPoolT.TaskType.deserialize)),
            "Must contain `deserialize` method for restoring");

        size_t total_items;

        SimpleStreamSerializerArrays.read(stream, total_items);

        size_t tasks_loaded;

        while ( tasks_loaded < total_items )
        {
            SimpleStreamSerializerArrays.read(stream, this.serialize_buffer);
            task_pool.restore(this.serialize_buffer);
            ++tasks_loaded;
        }

        enforce!("==")(tasks_loaded, total_items);
        return tasks_loaded;
    }

}