\section{Methods}

\subsection{Model of computation}

To infer a fault tolerance model suitable for big data applications we use the
bulk-synchronous parallel model~\citep{valiant1990bridging} as the basis. This
model assumes that a parallel programme is composed of a number of sequential
steps that are internally parallel, and that global synchronisation of all
parallel processes occurs after each step. In our model all sequential steps
are pipelined where possible. The evolution of the computational model is
described as follows.

Given a programme that is sequential and large enough to be decomposed into a
number of sequential steps, the simplest way to make it run faster is to
exploit data parallelism. Usually this means finding multi-dimensional arrays
and the loops that access their elements, and making those loops parallel.
After transforming the loops the programme still has the same number of
sequential steps, but every step is (ideally) internally parallel.

After that, the only remaining possibility to speed up the programme is to
overlap the execution of code blocks that work with different hardware devices.
The most common pattern is to overlap computation with network or disk I/O.
This approach makes sense because the devices operate with little
synchronisation, and issuing commands to them in parallel makes the whole
programme perform better. This behaviour can be achieved by allocating a
separate task queue for each device and submitting tasks to these queues
asynchronously with respect to the execution of the main thread. After this
optimisation the programme is composed of a number of steps chained into a
pipeline, with each step implemented as a task queue for a particular device.
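The per-device task queues can be illustrated with a minimal sketch (Python; the class and queue names are illustrative, not part of the framework). Each device gets its own queue with a dedicated worker thread, so the main thread submits work and continues immediately:

```python
import threading
import queue

class DeviceQueue:
    """A task queue with one worker thread, allocated per device
    (e.g. disk, network, processor); an illustrative sketch only."""
    def __init__(self, name):
        self.name = name
        self.tasks = queue.Queue()
        self.results = []
        self.worker = threading.Thread(target=self._run, daemon=True)
        self.worker.start()

    def _run(self):
        # Tasks are executed in submission order by the worker thread.
        while True:
            task = self.tasks.get()
            if task is None:
                break
            self.results.append(task())

    def submit(self, task):
        # Returns immediately; the main thread is never blocked.
        self.tasks.put(task)

    def join(self):
        self.tasks.put(None)
        self.worker.join()

# Two pipeline steps overlap: while the "disk" queue reads blocks,
# the main thread is free, and the "cpu" queue processes them later.
disk = DeviceQueue("disk")
cpu = DeviceQueue("cpu")
for i in range(3):
    disk.submit(lambda i=i: f"block-{i}")
disk.join()
for block in disk.results:
    cpu.submit(lambda b=block: b.upper())
cpu.join()
print(cpu.results)
```

Because each queue has a single worker, results within one device stay ordered, while different devices run concurrently.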
Pipelining of otherwise sequential steps is beneficial not only for code
accessing different devices, but also for code whose different branches are
suitable for execution by multiple hardware threads of the same core, i.e.\
branches accessing different regions of memory or performing mixed
(floating-point and integer) arithmetic. In other words, code branches which
use different modules of the processor are good candidates to run in parallel
on a processor core with multiple hardware threads.

Even though pipelining may not add parallelism for a programme that uses only
one input file (or one set of input parameters), it adds parallelism when the
programme can process multiple input files: each input generates tasks which
travel through the whole pipeline in parallel with tasks generated by other
inputs. With a pipeline, an array of files is processed in parallel by the same
set of resources allocated for a batch job. The pipeline is likely to deliver
greater efficiency on busy HPC clusters than executing a separate job for each
input file, because the time that each subsequent job would spend in the queue
is eliminated. A diagram of a pipeline is presented in
fig.~\ref{fig:pipeline} and discussed in sec.~\ref{sec:results}.

The computational model with a pipeline can be seen as a
\emph{bulk-asynchronous model}, because of the parallel nature of otherwise
sequential execution steps. This model is the basis of the fault-tolerance
model developed here.

\subsection{Programming model principles}

The data processing pipeline model is based on the following principles that
maximise the efficiency of a programme:
\begin{itemize}

\item There is no notion of a message in the model: a kernel is itself a
message that can be sent over the network to another node and can directly
access any kernel on the local node. Only programme logic may guarantee the
existence of the kernel.
\item A kernel is a \emph{cooperative routine}, which is submitted to the
kernel pool upon the call and is executed asynchronously by the scheduler.
There can be any number of calls to other subroutines inside the routine body.
Every call submits the corresponding subroutine to the kernel pool and returns
immediately. Kernels in the pool can be executed in any order; this fact is
used by the scheduler to exploit the parallelism offered by the computer by
distributing kernels from the pool across available cluster nodes and
processor cores.

\item Asynchronous execution precludes explicit synchronisation after a call
to a subroutine is made; the system scheduler returns control flow to the
routine each time one of its subroutines returns. Such cooperation transforms
each routine which calls subroutines into an event handler, where each event
is a subroutine and the handler is the routine that called them.

\item A routine may communicate with any number of local kernels whose
addresses it knows; communication with kernels which are not adjacent in the
call stack complicates the control flow, and the call stack loses its tree
shape. Only the programme logic may guarantee the presence of communicating
kernels in memory. One way to ensure this is to perform communication between
subroutines which are called from the same routine. Since such communication
is possible within the hierarchy through the parent routine, it may be treated
as an optimisation that eliminates the overhead of transferring data over an
intermediate node. The situation is different for interactive or event-based
programmes (e.g.\ servers and programmes with graphical interfaces) in which
this is the primary type of communication.

\item In addition, communication which does not occur along hierarchical
links and is executed over the cluster network complicates the design of
resiliency algorithms.
Since it is impossible to ensure that a kernel resides in the memory of a
neighbouring node, a node may fail in the middle of executing the
corresponding routine. As a result, upon a failure of a routine all of its
subroutines must be restarted. This encourages a programmer to construct
\begin{itemize}
\item deep tree hierarchies of tightly coupled kernels (which communicate
on the same level of the hierarchy) to reduce the overhead of recomputation;
\item fat tree hierarchies of loosely coupled kernels, providing the maximal
degree of parallelism.
\end{itemize}
A deep hierarchy is not only a requirement of the technology; it helps
optimise communication among a large number of cluster nodes by reducing it to
communication between adjacent nodes.

\end{itemize}
Thus, kernels possess properties of both cooperative routines and event
handlers.

\subsection{Fail over model}

Although fault tolerance and high availability are different terms, in essence
they describe the same property --- the ability of a system to switch
processing from a failed component to its live spare or backup component. In
the case of fault tolerance it is the ability to switch from a failed slave
node to a spare one, i.e.\ to repeat a computation step on a healthy slave
node. In the case of high availability it is the ability to switch from a
failed master node to a backup node with full recovery of the execution state.
These are the core abilities that constitute a distributed system's ability to
\emph{fail over}.

The key feature that is missing in current parallel programming and big data
processing technologies is the possibility to specify hierarchical
(parent--child) dependencies between parallel tasks (kernels). With such a
dependency it is trivial to determine which kernel should be responsible for
re-executing a failed kernel on a healthy node.
To re-execute the kernel at the
top of the hierarchy, its copy is created and executed on a different node
(the detailed algorithm is discussed in sec.~\ref{sec:master-node-failure}).
There exist a number of engines that are capable of executing directed acyclic
graphs of kernels in parallel~\citep{acun2014charmpp,islam2012oozie}, but
graphs are not suitable for inferring the parent--child relationship between
kernels, because a node in the graph may have multiple parent nodes.

\subsection{Programming model}

This work is based on the results of previous research.
In~\citep{gankevich2015subordination,gankevich2015iccsa} we developed an
algorithm that builds a tree hierarchy from a strictly ordered set of
cluster nodes. The sole purpose of this hierarchy is to make a cluster more
fault-tolerant by introducing multiple master nodes. If a master node fails,
then its subordinates try to connect to another node from the same or a higher
level of the hierarchy. If there is no such node, one of the subordinates
becomes the master. In~\citep{gankevich2015spec} we developed a framework for
big data processing without fault tolerance; here this framework is
combined with the fault-tolerance techniques described in this paper.

Each programme that runs on top of the tree hierarchy is composed of
computational kernels---objects that contain data and the code to process it.
To exploit parallelism a kernel may create an arbitrary number of subordinate
kernels, which are automatically spread first across available processor cores
and second across subordinate nodes in the tree hierarchy. The programme is
itself a kernel (without a parent, as it is executed by a user), which either
solves the problem sequentially on its own or creates subordinate kernels to
solve it in parallel.
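The node-level fail-over rule described above --- subordinates of a failed node reconnect to a node from the same or a higher level, or, if none exists, the first surviving node is promoted --- can be sketched as follows (Python; the linear node order and the level map are illustrative stand-ins for the framework's data structures):

```python
def new_principal(failed, nodes, level):
    """Choose the node to which a subordinate reconnects after its
    principal has failed.  `nodes` is the strictly ordered list of
    cluster nodes; `level` maps a node to its level in the tree
    hierarchy (0 is the root).  Illustrative sketch only."""
    # Prefer a surviving node on the same or a higher level.
    candidates = [n for n in nodes
                  if n != failed and level[n] <= level[failed]]
    if candidates:
        return min(candidates, key=lambda n: (level[n], nodes.index(n)))
    # No such node: the first surviving subordinate becomes the master.
    return [n for n in nodes if n != failed][0]

nodes = ["n0", "n1", "n2", "n3"]            # strictly ordered set
level = {"n0": 0, "n1": 1, "n2": 1, "n3": 2}
print(new_principal("n1", nodes, level))    # "n0": a higher level exists
print(new_principal("n0", nodes, level))    # "n1": a subordinate is promoted
```

The strict ordering of nodes guarantees that every surviving subordinate deterministically selects the same new principal without any extra coordination.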
In contrast to HPC applications, in big data applications it is inefficient to
run computational kernels on arbitrarily chosen nodes. A more practical
approach is to bind every kernel to a file location in a parallel file system
and transfer the kernel to that location before processing the file. That way
expensive data transfer is eliminated, and the file is always read from a
local drive. This approach is more deterministic than existing ones; e.g.\ the
MapReduce framework runs jobs on nodes that are ``close'' to the file
location, but not necessarily on the exact node where the file is
located~\citep{dean2008mapreduce}. However, this approach does not come
without disadvantages: the scalability of a big data application is limited by
the strategy that was employed to distribute its input files across cluster
nodes. The more nodes are used to store input files, the higher the read
performance that is achieved. The advantage of our approach is that the I/O
performance is more predictable than that of the hybrid approach with
streaming files over the network.

The main purpose of the model is to simplify the development of distributed
batch processing applications and middleware. The focus is to make an
application resilient to failures, i.e.\ to make it fault-tolerant and highly
available, and to do it transparently to the programmer. The implementation is
divided into two layers: the lower layer consists of routines and classes for
single-node applications (with no network interactions), and the upper layer
is for applications that run on an arbitrary number of nodes. There are two
kinds of tightly coupled entities in the model~--- \emph{control flow objects}
(or \emph{kernels}) and \emph{pipelines}~--- which are used together to
compose a programme.
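To fix the idea, here is a minimal single-node sketch of how the two entities compose (Python; the \texttt{act}/\texttt{react} method names anticipate the next paragraphs, and the single-threaded scheduler is a deliberate simplification of the real thread pool):

```python
import queue

class Kernel:
    """Carries data and code: act() computes or spawns subordinates,
    react(child) folds a finished subordinate back into its parent."""
    def __init__(self):
        self.parent = None
    def act(self, ppl): pass
    def react(self, ppl, child): pass

class Pipeline:
    """Single-threaded stand-in for the kernel pool and thread pool."""
    def __init__(self):
        self.pool = queue.Queue()
    def submit(self, kernel, parent=None):
        kernel.parent = parent
        self.pool.put(kernel)            # returns immediately
    def run(self):
        while not self.pool.empty():
            k = self.pool.get()
            k.act(self)
            if k.parent is not None:     # subordinate finished: notify parent
                k.parent.react(self, k)

class Part(Kernel):                      # subordinate: computes directly
    def __init__(self, numbers):
        super().__init__()
        self.numbers = numbers
    def act(self, ppl):
        self.result = sum(self.numbers)

class Sum(Kernel):                       # parent: decomposes the problem
    def __init__(self, numbers):
        super().__init__()
        self.numbers, self.total = numbers, 0
    def act(self, ppl):
        for half in (self.numbers[:2], self.numbers[2:]):
            ppl.submit(Part(half), parent=self)
    def react(self, ppl, child):
        self.total += child.result

ppl = Pipeline()
root = Sum([1, 2, 3, 4])                 # the programme itself is a kernel
ppl.submit(root)
ppl.run()
print(root.total)                        # 10
```

In the real model the pipeline runs \texttt{act} calls in parallel threads and serialises all \texttt{react} calls of one parent into a single thread; the sketch only shows the data flow between the two methods.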
Kernels implement control flow logic in their \texttt{act} and \texttt{react}
methods and store the state of the current control flow branch. Both the logic
and the state are implemented by the programmer. In the \texttt{act} method
some function is either computed directly or decomposed into nested functions
(represented by a set of subordinate kernels) which are subsequently sent to a
pipeline. In the \texttt{react} method subordinate kernels that return from
the pipeline are processed by their parent. Calls to the \texttt{act} and
\texttt{react} methods are asynchronous and are made within threads attached
to a pipeline. For each kernel \texttt{act} is called only once, and for
multiple kernels the calls are made in parallel with each other, whereas the
\texttt{react} method is called once for each subordinate kernel, and all the
calls are made in the same thread to prevent race conditions (for different
parent kernels different threads may be used).

Pipelines implement asynchronous calls to \texttt{act} and \texttt{react}, and
try to make as many parallel calls as possible given the concurrency of the
platform (the number of cores per node and the number of nodes in the
cluster). A pipeline consists of a kernel pool, which contains all the
subordinate kernels sent by their parents, and a thread pool that processes
kernels in accordance with the rules outlined in the previous paragraph. A
separate pipeline is used for each device: there are pipelines for parallel
processing, for schedule-based processing (periodic and delayed tasks), and a
proxy pipeline for processing kernels on other cluster nodes.

In principle, the kernel and pipeline machinery mirrors that of procedures and
call stacks, with the advantage that kernel methods are called asynchronously
and in parallel with each other (as much as the programme logic allows).
The kernel fields
are the stack, the \texttt{act} method is the sequence of processor
instructions before a nested procedure call, and the \texttt{react} method is
the sequence of processor instructions after the call. Constructing and
sending subordinate kernels to the pipeline is the nested procedure call. The
two methods are necessary to make calls asynchronous and to replace the active
wait for the completion of subordinate kernels with a passive one. Pipelines,
in turn, allow implementing the passive wait and call the correct kernel
methods by analysing their internal state.

\subsection{Handling master node failures}
\label{sec:master-node-failure}

A possible way of handling a failure of the node where the first kernel is
located (the master node) is to replicate this kernel to a backup node, and to
make all updates to its state propagate to the backup node by means of a
distributed transaction. This approach requires synchronisation between all
nodes that execute subordinates of the first kernel and the node that hosts
the first kernel itself. When the node with the first kernel goes offline, the
nodes with subordinate kernels must know which node is the backup one.
However, if the backup node also goes offline in the middle of the execution
of some subordinate kernel, then it is impossible for this kernel to discover
the next backup node to return to, because it has not yet discovered the
unavailability of the master node. One can think of a consensus-based
algorithm to ensure that subordinate kernels always know where the backup node
is, but distributed consensus algorithms do not scale well to a large number
of nodes and are not reliable~\citep{fischer1985impossibility}. So a
consensus-based approach does not play well with the asynchronous nature of
computational kernels, as it may inhibit the scalability of a parallel
programme.
Fortunately, the first kernel usually does not perform operations in parallel;
rather it launches execution steps sequentially one by one, so it has only one
subordinate at a time. Such behaviour is described by the bulk-synchronous
parallel programming model, in the framework of which a programme consists of
sequential supersteps which are internally
parallel~\citep{valiant1990bridging}. Keeping this in mind, we can simplify
the synchronisation of its state: we send the first kernel along with its
subordinate to the subordinate node. When the node with the first kernel
fails, its copy receives its subordinate, and no execution time is lost. When
the node with the copy fails, its subordinate is rescheduled on some other
node, and in the worst case a whole step of computation is lost.

The described approach works only for kernels that do not have a parent and
have only one subordinate at a time; such kernels act like manually triggered
checkpoints. The advantages are that they
\begin{itemize}
\item save results after each sequential step, when the memory footprint of
the programme is low,
\item save only relevant data,
\item and use the memory of a subordinate node instead of stable storage.
\end{itemize}

\section{Evaluation}
\label{sec:results}

The master node fail over technique is evaluated on the example of a wave
energy spectra processing application. This programme uses the NDBC
dataset~\citep{ndbc-dataset} to reconstruct frequency-directional spectra from
wave rider buoy measurements and compute their variance. Each spectrum is
reconstructed from five variables using the following
formula~\citep{earle1996nondirectional}:
\begin{equation*}
S(\omega, \theta) = \frac{1}{\pi}
\left[
\frac{1}{2} +
r_1 \cos \left( \theta - \alpha_1 \right) +
r_2 \sin \left( 2 \left( \theta - \alpha_2 \right) \right)
\right]
S_0(\omega).
\end{equation*}
Here $\omega$ denotes frequency, $\theta$ is wave direction, $r_{1,2}$ and
$\alpha_{1,2}$ are parameters of the spectrum decomposition and $S_0$ is the
non-directional spectrum; $r_{1,2}$, $\alpha_{1,2}$ and $S_0$ are acquired
through measurements. Properties of the dataset used in the evaluation are
listed in Table~\ref{tab:ndbc-dataset}.

\begin{table}
\centering
\caption{NDBC dataset properties.}
\begin{tabular}{ll}
\toprule
Dataset size & 144~MB \\
Dataset size (uncompressed) & 770~MB \\
No. of wave stations & 24 \\
Time span & 3 years (2010--2012) \\
Total no. of spectra & 445422 \\
\bottomrule
\end{tabular}
\label{tab:ndbc-dataset}
\end{table}

The algorithm for processing the spectra is as follows. First, the current
directory is recursively scanned for input files. Data for all buoys are
distributed across cluster nodes, and each buoy's data processing is
distributed across the processor cores of a node. Processing begins with
joining the corresponding measurements for each spectrum's variables into a
tuple; then for each tuple the frequency-directional spectrum is reconstructed
and its variance is computed. Results are gradually copied back to the machine
where the application was executed, and when the processing is complete the
programme terminates. The data processing pipeline corresponding to the
algorithm is presented in fig.~\ref{fig:pipeline}.

\begin{figure}
\centering
\includegraphics[scale=0.6]{ppl}
\caption{Data processing pipeline of the programme.}
\label{fig:pipeline}
\end{figure}


\begin{table}
\centering
\caption{Test platform configuration.}
\begin{tabular}{ll}
\toprule
CPU & Intel Xeon E5440, 2.83~GHz \\
RAM & 4~GB \\
HDD & ST3250310NS, 7200~rpm \\
No. of nodes & 12 \\
No. of CPU cores per node & 8 \\
\bottomrule
\end{tabular}
\label{tab:cluster}
\end{table}

In a series of test runs we benchmarked the performance of the application in
the presence of different types of failures:
\begin{itemize}
\item failure of a master node (the node where the first kernel runs),
\item failure of a slave node (a node where spectra from a particular
station are reconstructed), and
\item failure of a backup node (the node to which the first kernel is copied).
\end{itemize}
A tree hierarchy with a sufficiently large fan-out value was chosen to make
all cluster nodes connect directly to the first one, so that only one master
node exists in the cluster. In each run the first kernel was launched on a
different node to make the mapping of the kernel hierarchy to the tree
hierarchy optimal. A victim node was taken offline a fixed amount of time
after the programme start, early in the run. To make up for the node failure,
all data files have replicas stored on different cluster nodes. All relevant
parameters are summarised in Table~\ref{tab:benchmark} (here ``root'' and
``leaf'' refer to a node's position in the tree hierarchy). The results of
these runs were compared to a run without node failures
(Figure~\ref{fig:benchmark-bigdata}).

\begin{table}
\centering
\caption{Benchmark parameters.}
\begin{tabular}{llll}
\toprule
Experiment no. & Master node & Victim node & Time to offline, s \\
\midrule
1 & root & & \\
2 & root & leaf & 30 \\
3 & leaf & leaf & 30 \\
4 & leaf & root & 30 \\
\bottomrule
\end{tabular}
\label{tab:benchmark}
\end{table}

The benchmark showed that only a backup node failure results in a significant
performance penalty; in all other cases the performance is roughly equal to
that of a run without failures but with one node fewer.
This happens because
the backup node not only stores a copy of the state of the current computation
step but also executes this step in parallel with the other subordinate nodes.
So, when the backup node fails, the master node executes the whole step once
again on an arbitrarily chosen healthy subordinate node.

% \begin{figure}
% \centering
% \includegraphics{factory-3000}
% \caption{Performance of hydrodynamics HPC application in the presence of node failures.}
% \label{fig:benchmark-hpc}
% \end{figure}

\begin{figure}
\centering
\includegraphics{spec}
\caption{Performance of the spectrum processing application in the presence of different types of node failures.}
\label{fig:benchmark-bigdata}
\end{figure}

To measure how much time is lost due to a failure, we divide the total
execution time with a failure by the total execution time without the failure
but with one node fewer. The results of this calculation are obtained from the
same benchmark and are presented in fig.~\ref{fig:benchmark-bigdata-2}. The
difference in performance lies within a 20\% margin for six nodes or fewer. As
expected, the slowdown is noticeable for a small number of nodes, and the
performance penalty decreases for a larger number of nodes, as a smaller
portion of the execution state is lost. The picture is different for more than
six nodes: the performance ratio fluctuates within a 50\% margin. The reason
for this behaviour is that the spectrum processing application does not scale
beyond six nodes, and the benchmark does not give reliable results beyond this
point.
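The slowdown metric is simply a ratio of the two wall-clock times; a one-line sketch with illustrative timings (the numbers below are hypothetical, not taken from the benchmark):

```python
def slowdown(t_with_failure, t_no_failure_fewer_node):
    """Ratio of the total execution time with a failure (on N nodes,
    one of which is lost) to the total execution time without failures
    on N-1 nodes.  A value of 1.0 means the failure cost nothing."""
    return t_with_failure / t_no_failure_fewer_node

# Hypothetical timings for illustration only:
print(round(slowdown(118.0, 100.0), 2))  # 1.18, i.e. within the 20% margin
```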
\begin{figure}
\centering
\includegraphics{build/ratio.eps}
\caption{Slowdown of the spectrum processing application in the presence of
different types of node failures compared to execution without failures but
with one node fewer.}
\label{fig:benchmark-bigdata-2}
\end{figure}

\section{Discussion}

The described algorithm is guaranteed to handle one failure per computational
step; more failures can be tolerated if they do not affect the master node.
The system handles the simultaneous failure of all subordinate nodes; however,
if both the master and the backup node fail, there is no chance for the
application to survive. In this case the state of the current computation step
is lost, and the only way to restore it is to restart the application.

Computational kernels are a means of abstraction that decouples a distributed
application from the physical hardware: it does not matter how many nodes are
online for the application to run successfully. Computational kernels
eliminate the need to allocate a physical backup node to make the master node
highly available: with the computational kernels approach any node can act as
a backup one. Finally, computational kernels can handle subordinate node
failures in a way that is transparent to the programmer.

The disadvantage of this approach is evident: there is no way of making
existing middleware highly available without rewriting its source code.
Although our programming framework is lightweight, it is not easy to map the
architecture of existing middleware systems onto it: most systems are
developed with static assignment of server/client roles in mind, which is not
easy to make dynamic. Nevertheless, our approach may simplify the design of
future middleware systems.