Master node fault tolerance in distributed big data processing clusters

\section{Methods}

\subsection{Model of computation}

To infer a fault tolerance model suitable for big data applications we use the
bulk-synchronous parallel model~\citep{valiant1990bridging} as the basis. This
model assumes that a parallel programme is composed of a number of sequential
steps that are internally parallel, and global synchronisation of all parallel
processes occurs after each step. In our model all sequential steps are
pipelined where possible. The evolution of the computational model is
described as follows.

Given a programme that is sequential and large enough to be decomposed into a
number of sequential steps, the simplest way to make it run faster is to
exploit data parallelism. Usually this means finding multi-dimensional arrays
and the loops that access their elements, and making those loops parallel.
After transforming the loops the programme still has the same number of
sequential steps, but every step is (ideally) internally parallel.

After that the only remaining way to speed up the programme is to overlap
execution of code blocks that work with different hardware devices. The most
common pattern is to overlap computation with network or disk I/O. This
approach makes sense because all devices operate with little synchronisation,
and issuing commands in parallel makes the whole programme perform better.
This behaviour can be achieved by allocating a separate task queue for each
device and submitting tasks to these queues asynchronously with respect to the
main thread. After this optimisation the programme is composed of a number of
steps chained into a pipeline, with each step implemented as a task queue for
a particular device.

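The per-device task queues described above can be sketched as follows (an
illustrative Python sketch, not part of the framework; all names are
hypothetical):

```python
import queue
import threading

# One task queue (and one worker thread) per device, so that computation
# and I/O commands are issued in parallel with the main thread.
class DeviceQueue:
    def __init__(self, name):
        self.name = name
        self.tasks = queue.Queue()
        self.results = []
        self.worker = threading.Thread(target=self._run, daemon=True)
        self.worker.start()

    def _run(self):
        while True:
            task = self.tasks.get()
            if task is None:          # sentinel: stop the worker
                break
            self.results.append(task())

    def submit(self, task):
        # returns immediately; the main thread is never blocked
        self.tasks.put(task)

    def close(self):
        self.tasks.put(None)
        self.worker.join()

cpu = DeviceQueue('cpu')
disk = DeviceQueue('disk')
cpu.submit(lambda: sum(x * x for x in range(10)))   # "computation"
disk.submit(lambda: 'block-0')                      # "disk read"
cpu.close()
disk.close()
print(cpu.results, disk.results)   # [285] ['block-0']
```

Each queue's worker drains its tasks in FIFO order while the main thread
continues submitting, so commands for different devices proceed concurrently.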
Pipelining of otherwise sequential steps is beneficial not only for code that
accesses different devices, but also for code whose branches can be executed
by multiple hardware threads of the same core, i.e.\ branches accessing
different regions of memory or performing mixed (floating point and integer)
arithmetic. In other words, code branches that use different modules of the
processor are good candidates to run in parallel on a processor core with
multiple hardware threads.

Even though pipelining may not add parallelism for a programme that uses only
one input file (or a set of input parameters), it adds parallelism when the
programme can process multiple input files: each input generates tasks which
travel through the whole pipeline in parallel with tasks generated by other
inputs. With a pipeline an array of files is processed in parallel by the same
set of resources allocated for a batch job. The pipeline is likely to deliver
greater efficiency for busy HPC clusters compared to executing a separate job
for each input file, because the time that each subsequent job spends in the
queue is eliminated. A diagram of a pipeline is presented in
fig.~\ref{fig:pipeline} and discussed in sec.~\ref{sec:results}.

The computational model with a pipeline can be seen as a
\emph{bulk-asynchronous model}, because of the parallel nature of otherwise
sequential execution steps. This model is the basis of the fault-tolerance
model developed here.

\subsection{Programming model principles}

The data processing pipeline model is based on the following principles, which
maximise the efficiency of a programme:
\begin{itemize}

\item There is no notion of a message in the model: a kernel is itself a
	message that can be sent over the network to another node and can directly
	access any kernel on the local node. Only programme logic may guarantee
	the existence of a kernel.

\item A kernel is a \emph{cooperative routine}, which is submitted to the
  kernel pool upon the call and is executed asynchronously by the scheduler.
  There can be any number of calls to other subroutines inside a routine body.
  Every call submits the corresponding subroutine to the kernel pool and
  returns immediately. Kernels in the pool can be executed in any order; this
  fact is used by the scheduler to exploit the parallelism offered by the
  computer, distributing kernels from the pool across available cluster nodes
  and processor cores.

\item Asynchronous execution precludes explicit synchronisation after the call
  to a subroutine is made; the system scheduler returns control to the routine
  each time one of its subroutines returns. Such cooperation transforms each
  routine that calls subroutines into an event handler, where each event is a
  subroutine and the handler is the routine that called them.

\item A routine may communicate with any number of local kernels whose
  addresses it knows; communication with kernels that are not adjacent in the
  call stack complicates the control flow, and the call stack loses its tree
  shape. Only the programme logic may guarantee the presence of communicating
  kernels in memory. One way to ensure this is to perform communication
  between subroutines that are called from the same routine. Since such
  communication is possible within the hierarchy through the parent routine,
  it may be treated as an optimisation that eliminates the overhead of
  transferring data over an intermediate node. The situation is different for
  interactive or event-based programmes (e.g.\ servers and programmes with
  graphical interfaces), in which this is the primary type of communication.

\item In addition, communication that does not occur along hierarchical links
  and is executed over the cluster network complicates the design of
  resiliency algorithms. Since it is impossible to ensure that a kernel
  resides in the memory of a neighbouring node, a node may fail in the middle
  of executing the corresponding routine. As a result, upon a failure of a
  routine all of its subroutines must be restarted. This encourages a
  programmer to construct
  \begin{itemize}
	\item deep tree hierarchies of tightly coupled kernels (which communicate
		on the same level of the hierarchy) to reduce the overhead of
		recomputation;
	\item fat tree hierarchies of loosely coupled kernels, providing the
		maximal degree of parallelism.
	\end{itemize}
	A deep hierarchy is not only a requirement of the technology; it helps
	optimise communication among a large number of cluster nodes by reducing
	it to communication between adjacent nodes.

\end{itemize}
Thus, kernels possess properties of both cooperative routines and event
handlers.

\subsection{Fail over model}

Although fault tolerance and high availability are different terms, in essence
they describe the same property --- the ability of a system to switch
processing from a failed component to a live spare or backup component. In the
case of fault tolerance it is the ability to switch from a failed slave node
to a spare one, i.e.\ to repeat a computation step on a healthy slave node. In
the case of high availability it is the ability to switch from a failed master
node to a backup node with full recovery of the execution state. These are the
core abilities that constitute a distributed system's ability to \emph{fail
over}.

The key feature that is missing in current parallel programming and big data
processing technologies is the possibility to specify hierarchical
(parent-child) dependencies between parallel tasks (kernels). When such a
dependency exists, it is trivial to determine which kernel is responsible for
re-executing a failed kernel on a healthy node. To re-execute the kernel at
the top of the hierarchy, its copy is created and executed on a different node
(the detailed algorithm is discussed in
sec.~\ref{sec:master-node-failure}). There exist a number of engines that are
capable of executing directed acyclic graphs of kernels in
parallel~\citep{acun2014charmpp,islam2012oozie}, but graphs are not suitable
for inferring parent-child relationships between kernels, because a node in a
graph may have multiple parent nodes.

\subsection{Programming model}

This work is based on the results of previous research.
In~\citep{gankevich2015subordination,gankevich2015iccsa} we developed an
algorithm that builds a tree hierarchy from a strictly ordered set of cluster
nodes. The sole purpose of this hierarchy is to make a cluster more
fault-tolerant by introducing multiple master nodes. If a master node fails,
then its subordinates try to connect to another node from the same or a
higher level of the hierarchy. If there is no such node, one of the
subordinates becomes the master. In~\citep{gankevich2015spec} we developed a
framework for big data processing without fault tolerance, and here this
framework is combined with the fault-tolerance techniques described in this
paper.

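The hierarchy construction can be illustrated by the following sketch
(hypothetical Python; it assumes the simplest placement rule, in which the
parent of node $i$ in the strictly ordered node list is node
$\lfloor (i-1)/f \rfloor$ for fan-out $f$ --- the actual algorithm is given in
the cited papers):

```python
def parent(i, fanout):
    # node 0 is the principal master; hypothetical placement rule
    return None if i == 0 else (i - 1) // fanout

def subordinates(i, n, fanout):
    return [j for j in range(1, n) if (j - 1) // fanout == i]

def survivors(order, failed):
    # nodes keep their strict order, so after a failure ranks shift and
    # the next node in the order takes over the vacated position
    return [node for node in order if node != failed]

nodes = ['n0', 'n1', 'n2', 'n3', 'n4', 'n5', 'n6']
print([parent(i, 2) for i in range(7)])   # [None, 0, 0, 1, 1, 2, 2]
print(subordinates(1, 7, 2))              # [3, 4]
print(survivors(nodes, 'n0')[0])          # n1 becomes the new master
```

With this rule a subordinate that loses its parent can compute candidate
parents from the node order alone, without any additional coordination.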
Each programme that runs on top of the tree hierarchy is composed of
computational kernels---objects that contain data and the code to process it.
To exploit parallelism a kernel may create an arbitrary number of subordinate
kernels, which are automatically spread first across available processor
cores and then across subordinate nodes in the tree hierarchy. The programme
is itself a kernel (without a parent, as it is executed by a user), which
either solves the problem sequentially on its own or creates subordinate
kernels to solve it in parallel.

In contrast to HPC applications, in big data applications it is inefficient to
run computational kernels on arbitrarily chosen nodes. A more practical
approach is to bind every kernel to a file location in a parallel file system
and transfer the kernel to that location before processing the file. That way
expensive data transfer is eliminated, and the file is always read from a
local drive. This approach is more deterministic than existing ones; e.g.\ the
MapReduce framework runs jobs on nodes that are ``close'' to the file
location, but not necessarily on the exact node where the file is
located~\citep{dean2008mapreduce}. However, this approach does not come
without disadvantages: the scalability of a big data application is limited by
the strategy that was employed to distribute its input files across cluster
nodes. The more nodes are used to store the input files, the higher the read
performance that is achieved. The advantage of our approach is that its I/O
performance is more predictable than that of a hybrid approach with streaming
files over the network.

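The binding of kernels to file locations can be sketched as follows
(illustrative Python; the \texttt{locations} table stands in for the parallel
file system's metadata, and all file and node names are made up):

```python
# `locations` stands in for the parallel file system metadata: for each
# input file it lists the nodes that hold its replicas (names made up).
locations = {
    '/data/buoy-41001.csv': ['node3', 'node7'],
    '/data/buoy-41002.csv': ['node5', 'node2'],
}

def node_for(path):
    # bind the kernel to the node holding the primary replica, so the
    # file is always read from a local drive
    return locations[path][0]

print(node_for('/data/buoy-41001.csv'))   # node3
```

Replicas on other nodes allow the same lookup to fall back to a healthy node
when the primary one fails.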
The main purpose of the model is to simplify the development of distributed
batch processing applications and middleware. The focus is to make an
application resilient to failures, i.e.\ to make it fault tolerant and highly
available, and to do so transparently to a programmer. The implementation is
divided into two layers: the lower layer consists of routines and classes for
single node applications (with no network interactions), and the upper layer
is for applications that run on an arbitrary number of nodes. There are two
kinds of tightly coupled entities in the model~--- \emph{control flow objects}
(or \emph{kernels}) and \emph{pipelines}~--- which are used together to
compose a programme.

Kernels implement control flow logic in their \texttt{act} and \texttt{react}
methods and store the state of the current control flow branch. Both the logic
and the state are implemented by a programmer. In the \texttt{act} method some
function is either directly computed or decomposed into nested functions
(represented by a set of subordinate kernels) which are subsequently sent to a
pipeline. In the \texttt{react} method subordinate kernels that returned from
the pipeline are processed by their parent. Calls to the \texttt{act} and
\texttt{react} methods are asynchronous and are made within threads attached
to a pipeline. For each kernel \texttt{act} is called only once, and for
multiple kernels the calls are done in parallel with each other, whereas the
\texttt{react} method is called once for each subordinate kernel, and all the
calls are made in the same thread to prevent race conditions (for different
parent kernels different threads may be used).

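A minimal sketch of these rules (hypothetical Python, not the framework's
API): \texttt{act} is called once per kernel and possibly in parallel, while
the \texttt{react} calls of any one parent are serialised:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class Kernel:
    def __init__(self, parent=None):
        self.parent = parent
    def act(self, pipeline):
        pass
    def react(self, child):
        pass

class Pipeline:
    def __init__(self, nthreads=4):
        self.pool = ThreadPoolExecutor(nthreads)
        self.locks = {}        # one lock per parent serialises its react()
        self.pending = []

    def submit(self, kernel):
        self.pending.append(self.pool.submit(self._run, kernel))

    def _run(self, kernel):
        kernel.act(self)       # act() is called exactly once per kernel
        if kernel.parent is not None:
            lock = self.locks.setdefault(id(kernel.parent), threading.Lock())
            with lock:         # react() calls of one parent never overlap
                kernel.parent.react(kernel)

    def wait(self):
        while self.pending:
            self.pending.pop().result()

class Child(Kernel):
    def __init__(self, parent, value):
        super().__init__(parent)
        self.value = value
    def act(self, pipeline):
        self.value *= self.value      # the parallel part of the step

class Root(Kernel):
    def __init__(self):
        super().__init__()
        self.total = 0
    def act(self, pipeline):
        for v in (1, 2, 3):           # decompose into subordinate kernels
            pipeline.submit(Child(self, v))
    def react(self, child):
        self.total += child.value     # gather results in the parent

p = Pipeline()
root = Root()
p.submit(root)
p.wait()
print(root.total)   # 1 + 4 + 9 = 14
```

The per-parent lock plays the role of the single thread that the framework
uses for all \texttt{react} calls of one parent.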
Pipelines implement asynchronous calls to \texttt{act} and \texttt{react}, and
try to make as many parallel calls as possible given the concurrency of the
platform (the number of cores per node and the number of nodes in a cluster).
A pipeline consists of a kernel pool, which contains all the subordinate
kernels sent by their parents, and a thread pool that processes kernels in
accordance with the rules outlined in the previous paragraph. A separate
pipeline is used for each device: there are pipelines for parallel processing,
schedule-based processing (periodic and delayed tasks), and a proxy pipeline
for processing kernels on other cluster nodes.

In principle, the machinery of kernels and pipelines mirrors that of
procedures and call stacks, with the advantage that kernel methods are called
asynchronously and in parallel with each other (as much as programme logic
allows). The fields of a kernel are the stack, the \texttt{act} method is the
sequence of processor instructions before a nested procedure call, and the
\texttt{react} method is the sequence of processor instructions after the
call. Constructing and sending subordinate kernels to the pipeline is the
nested procedure call. Two methods are necessary to make calls asynchronous
and to replace active waiting for the completion of subordinate kernels with
passive waiting. Pipelines, in turn, allow implementing passive waiting, and
they call the correct kernel methods by analysing the kernels' internal
state.

\subsection{Handling master node failures}
\label{sec:master-node-failure}

A possible way of handling a failure of the node where the first kernel is
located (a master node) is to replicate this kernel to a backup node, and to
propagate all updates to its state to the backup node by means of a
distributed transaction. This approach requires synchronisation between all
nodes that execute subordinates of the first kernel and the node with the
first kernel itself. When the node with the first kernel goes offline, the
nodes with subordinate kernels must know which node is the backup one.
However, if the backup node also goes offline in the middle of the execution
of some subordinate kernel, then it is impossible for this kernel to discover
the next backup node to return to, because it has not yet discovered the
unavailability of the master node. One can think of a consensus-based
algorithm to ensure that subordinate kernels always know where the backup node
is, but distributed consensus algorithms do not scale well to a large number
of nodes and they are not reliable~\citep{fischer1985impossibility}. So, a
consensus-based approach does not play well with the asynchronous nature of
computational kernels, as it may inhibit the scalability of a parallel
programme.

Fortunately, the first kernel usually does not perform operations in parallel;
rather, it launches execution steps sequentially one by one, so it has only
one subordinate at a time. Such behaviour is described by the bulk-synchronous
parallel programming model, in the framework of which a programme consists of
sequential supersteps which are internally
parallel~\citep{valiant1990bridging}. Keeping this in mind, we can simplify
the synchronisation of its state: we send the first kernel along with its
subordinate to the subordinate node. When the node with the first kernel
fails, its copy receives its subordinate, and no execution time is lost. When
the node with the copy fails, its subordinate is rescheduled on some other
node, and in the worst case a whole step of computation is lost.

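The scheme can be illustrated with a small simulation (hypothetical Python;
the step count and the failure schedule are made up):

```python
def run_steps(nsteps, failures):
    # failures maps a step index to the node that fails during that step:
    # 'master' (holds the first kernel) or 'backup' (holds its copy and
    # the currently running subordinate)
    completed, recomputed, step = 0, 0, 0
    while step < nsteps:
        who = failures.get(step)
        if who == 'backup':
            # the copy and the running subordinate are lost:
            # redo this whole step on another healthy node
            recomputed += 1
            del failures[step]
        else:
            if who == 'master':
                # the backup node already holds the first kernel's copy
                # and the subordinate, so the step finishes without loss
                pass
            completed += 1
            step += 1
    return completed, recomputed

print(run_steps(3, {1: 'backup', 2: 'master'}))   # (3, 1)
```

A master failure costs nothing, while a backup failure costs at most one
recomputed step, which matches the worst case stated above.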
The described approach works only for kernels that do not have a parent and
have only one subordinate at a time; such kernels act similarly to manually
triggered checkpoints. The advantage is that they
\begin{itemize}
    \item save results after each sequential step, when the memory footprint
      of a programme is low,
    \item save only relevant data,
    \item and use the memory of a subordinate node instead of stable storage.
\end{itemize}

\section{Evaluation}
\label{sec:results}

The master node fail over technique is evaluated on the example of a wave
energy spectra processing application. This programme uses the NDBC
dataset~\citep{ndbc-dataset} to reconstruct frequency-directional spectra from
wave rider buoy measurements and to compute their variance. Each spectrum is
reconstructed from five variables using the following
formula~\citep{earle1996nondirectional}.
\begin{equation*}
    S(\omega, \theta) = \frac{1}{\pi}
    \left[
        \frac{1}{2} +
        r_1 \cos \left( \theta - \alpha_1 \right) +
        r_2 \cos \left( 2 \left( \theta - \alpha_2 \right) \right)
    \right]
    S_0(\omega).
\end{equation*}
Here $\omega$ denotes frequency, $\theta$ is wave direction, $r_{1,2}$ and
$\alpha_{1,2}$ are parameters of the spectrum decomposition and $S_0$ is the
non-directional spectrum; $r_{1,2}$, $\alpha_{1,2}$ and $S_0$ are acquired
through measurements. Properties of the dataset used in the evaluation are
listed in Table~\ref{tab:ndbc-dataset}.

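A direct transcription of the reconstruction formula (illustrative Python; the
parameter values below are made up, and the second harmonic is written in its
cosine form, the standard form of the directional decomposition):

```python
import math

def spectrum(omega, theta, r1, r2, alpha1, alpha2, s0):
    # frequency-directional spectrum reconstructed from the
    # non-directional spectrum s0 and the decomposition parameters
    return (1.0 / math.pi) * (
        0.5
        + r1 * math.cos(theta - alpha1)
        + r2 * math.cos(2.0 * (theta - alpha2))
    ) * s0(omega)

s0 = lambda omega: 1.0   # unit non-directional spectrum (made up)
# with r1 = r2 = 0 the directional distribution is uniform: 1/(2*pi)
value = spectrum(1.0, 0.3, 0.0, 0.0, 0.0, 0.0, s0)
print(abs(value - 1.0 / (2.0 * math.pi)) < 1e-12)   # True
```

In the application this function is evaluated once per tuple of joined
measurements, and the variance is then computed over the reconstructed
spectrum.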
\begin{table}
    \centering
    \caption{NDBC dataset properties.}
    \begin{tabular}{ll}
        \toprule
        Dataset size                & 144~MB \\
        Dataset size (uncompressed) & 770~MB \\
        No. of wave stations        & 24 \\
        Time span                   & 3 years (2010--2012) \\
        Total no. of spectra        & 445422 \\
        \bottomrule
    \end{tabular}
    \label{tab:ndbc-dataset}
\end{table}

The algorithm for processing spectra is as follows. First, the current
directory is recursively scanned for input files. Data for all buoys are
distributed across cluster nodes, and each buoy's data processing is
distributed across the processor cores of a node. Processing begins with
joining the corresponding measurements for each spectrum variable into a
tuple; then for each tuple the frequency-directional spectrum is reconstructed
and its variance is computed. Results are gradually copied back to the machine
where the application was executed, and when the processing is complete the
programme terminates. The data processing pipeline corresponding to this
algorithm is presented in fig.~\ref{fig:pipeline}.

\begin{figure}
	\centering
	\includegraphics[scale=0.6]{ppl}
	\caption{Data processing pipeline of a programme.}
	\label{fig:pipeline}
\end{figure}

\begin{table}
    \centering
    \caption{Test platform configuration.}
    \begin{tabular}{ll}
         \toprule
         CPU & Intel Xeon E5440, 2.83~GHz \\
         RAM & 4~GB \\
         HDD & ST3250310NS, 7200~rpm \\
         No. of nodes & 12 \\
         No. of CPU cores per node & 8 \\
         \bottomrule
    \end{tabular}
    \label{tab:cluster}
\end{table}

In a series of test runs we benchmarked the performance of the application in
the presence of different types of failures:
\begin{itemize}
    \item failure of a master node (the node where the first kernel is run),
    \item failure of a slave node (a node where spectra from a particular
      station are reconstructed), and
    \item failure of a backup node (the node to which the first kernel is
      copied).
\end{itemize}
A tree hierarchy with a sufficiently large fan-out value was chosen to make
all cluster nodes connect directly to the first one, so that only one master
node exists in the cluster. In each run the first kernel was launched on a
different node to make the mapping of the kernel hierarchy to the tree
hierarchy optimal. A victim node was taken offline a fixed amount of time
after the programme start. To make up for a node failure, all data files have
replicas stored on different cluster nodes. All relevant parameters are
summarised in Table~\ref{tab:benchmark} (here ``root'' and ``leaf'' refer to a
node's position in the tree hierarchy). The results of these runs were
compared to a run without node failures
(Figure~\ref{fig:benchmark-bigdata}).

\begin{table}
    \centering
    \caption{Benchmark parameters.}
    \begin{tabular}{llll}
         \toprule
         Experiment no. & Master node & Victim node & Time to offline, s \\
         \midrule
         1 & root &      & \\
         2 & root & leaf & 30 \\
         3 & leaf & leaf & 30 \\
         4 & leaf & root & 30 \\
         \bottomrule
    \end{tabular}
    \label{tab:benchmark}
\end{table}

The benchmark showed that only a backup node failure results in a significant
performance penalty; in all other cases the performance is roughly equal to
that of a run without failures but with one node less. This happens because
the backup node not only stores a copy of the state of the current computation
step but also executes this step in parallel with the other subordinate nodes.
So, when a backup node fails, the master node has to execute the whole step
once again on an arbitrarily chosen healthy subordinate node.

% \begin{figure}
%     \centering
%     \includegraphics{factory-3000}
%     \caption{Performance of hydrodynamics HPC application in the presence of node failures.}
%     \label{fig:benchmark-hpc}
% \end{figure}

\begin{figure}
    \centering
    \includegraphics{spec}
    \caption{Performance of spectrum processing application in the presence of different types of node failures.}
    \label{fig:benchmark-bigdata}
\end{figure}

To measure how much time is lost due to a failure, we divide the total
execution time with a failure by the total execution time without the failure
but with one node less. The results of this calculation are obtained from the
same benchmark and are presented in fig.~\ref{fig:benchmark-bigdata-2}. The
difference in performance lies within a 20\% margin for six nodes or fewer. As
expected, the slowdown is noticeable for a small number of nodes, and the
performance penalty decreases for a large number of nodes, as a smaller
portion of the execution state is lost. The picture is different for more than
six nodes: the performance ratio fluctuates within a 50\% margin. The reason
for this behaviour is that the spectrum processing application does not scale
beyond six nodes, and the benchmark does not give reliable results beyond this
point.

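The slowdown metric reduces to a one-line computation (illustrative Python
with made-up timings, not values from the benchmark):

```python
def slowdown(time_with_failure, time_without_failure):
    # time_without_failure is measured on one node less, so the ratio
    # isolates the cost of the failure itself
    return time_with_failure / time_without_failure

print(slowdown(120.0, 100.0))   # 1.2, i.e. within the 20% margin
```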
\begin{figure}
    \centering
    \includegraphics{build/ratio.eps}
	\caption{Slowdown of the spectrum processing application in the presence of
	different types of node failures compared to execution without failures but
	with one node less.}
    \label{fig:benchmark-bigdata-2}
\end{figure}

\section{Discussion}

The described algorithm is guaranteed to handle one failure per computational
step; more failures can be tolerated if they do not affect the master node.
The system handles a simultaneous failure of all subordinate nodes; however,
if both the master and the backup node fail, there is no way for the
application to survive. In this case the state of the current computation step
is lost, and the only way to restore it is to restart the application.

Computational kernels are a means of abstraction that decouples a distributed
application from the physical hardware: it does not matter how many nodes are
online for an application to run successfully. Computational kernels eliminate
the need to allocate a physical backup node to make the master node highly
available: with the computational kernel approach any node can act as a backup
node. Finally, computational kernels can handle subordinate node failures in a
way that is transparent to a programmer.

The disadvantage of this approach is evident: there is no way of making
existing middleware highly available without rewriting its source code.
Although our programming framework is lightweight, it is not easy to map the
architecture of existing middleware systems onto it: most systems are
developed with a static assignment of server/client roles in mind, which is
not easy to make dynamic. Nevertheless, our approach may simplify the design
of future middleware systems.