iccsa-15-novel-appr

Novel Approaches for Distributing Workload on Commodity Computer Systems
git clone https://git.igankevich.com/iccsa-15-novel-appr.git
Log | Files | Refs

sec-2.tex (17862B)


      1 \section{Distributed pipeline}
      2 \label{sec:pipe}
      3 
      4 The main idea of distributed pipeline is to create virtual topology of processes running on different computer cluster nodes and update it in real-time as infrastructure changes. The changes include nodes going offline and online, joining or leaving the cluster, replacement of any hardware component or an entire network node (including switches and routers), and other changes affecting system performance. Each change results in virtual topology being updated for a new infrastructure configuration.
      5 
      6 The main purpose of distributed pipeline is to optimise performance of distributed low-level service tasks running on a computer cluster. Typically these tasks involve querying each cluster node for some information or updating files on each node, and often single master node sends and collects messages from all slave nodes. To reduce network congestion intermediate nodes should be used to communicate master and slave nodes, that is to collect information and send it to master or the other way round. In case of large cluster a distributed pipeline with virtual topology of a tree is formed.
      7 
      8 The other purpose of distributed pipeline is to improve efficiency of high-performance applications. These applications typically launch many parallel processes on a subset of cluster nodes, which communicate messages with each other. To make these communications efficient virtual topology should resemble a real one as much as possible and take into account nodes' performance (relative to each other) and communication link speed (see Section~\ref{sec:node-rating}). That is, if two nodes are adjacent to each other in virtual topology, then the node which is closest to the tree root should have higher performance than the other one, and the link between them should be of the highest throughput.
      9 
     10 Another aspect of high-performance applications is their fault tolerance and recovery time from node failure. Often these applications consist of long-running processes which are checkpointed with specified time period. If a node fails, then a new one is reserved and the application is recovered from the last checkpoint on each node. To reduce recovery time checkpointing can be made incremental and selective, that is to recover only those parts of a program that have failed and checkpoint the data that have actually changed. In distributed pipeline it is accomplished by logging messages sent to other nodes and resending them in case of a node failure (see Section~\ref{sec:impl}).
     11 
     12 To summarise, distributed pipeline creates virtual topology of a computer cluster in a form of a tree with high-performance nodes being closest to the root, and every virtual link having the highest-possible throughput. The purpose of this pipeline is to optimise performance of various cluster management tasks, but it can be beneficial for high-performance applications as well.
     13 
     14 \subsection{Implementation}
     15 \label{sec:impl}
     16 
     17 From technical point of view distributed pipeline is a collection of reliable network connections between principal and subordinate nodes, which are made unique and shared by multiple applications (or service tasks) in a controlled way. The absence of duplicate connections between any two nodes conserves operating system resources and makes it possible to build distributed pipeline crossing multiple NAT environments: If the node hidden behind NAT can reach a node with public Internet address, they can communicate in both directions. Additionally, it allows creating persistent connections which are closed only when a change in topology occurs or in an event of system failure.
     18 
     19 Connection between nodes can be shared by multiple applications in a controlled way. Each message is tagged with an application identifier (a number), and each application sends/receives messages from either standard input/output or a separate file descriptor (a pipe) which can be polled and operated asynchronously. The data is automatically converted to either portable binary (with network byte order) or text format. So, if high performance of asynchronous communication and small size of binary messages are not required, any programming language which can read and write data to standard streams can be used to develop an application for distributed pipeline.
     20 
     21 To simplify writing high-performance applications for distributed pipeline the notion of a message is removed from the framework, instead the communication is done by sending one object to another and passing it as an argument to a defined method call. Each object can create subordinates to process data in parallel, perform parallel computations, or run tasks concurrently, and then report results to their principals. The absence of messages simplifies the API: an object always processes either principal's local data or results collected from subordinate objects.
     22 
     23 Various aspects of reliable asynchronous communication have to be considered to make distributed pipeline fault-tolerant. If the communication between objects is not needed, the object is sent to a free node to perform some computation, and sent back to its principal when it is done. Objects which are sent to remote computer nodes are saved in a buffer and are removed from it when they return to their principals. That way even if the remote node fails after receiving the objects, they are sent back to their principals from the buffer with an error. After that it is principal that decides to rollback and resend objects to another node, or to fail and report to its own principal. In case the failure occurs before sending an object (e.g. node goes offline), then the object is sent to some other node bypassing its principal. To summarise, subordinate objects \textit{always} return to their principals either with a success or a failure which further simplifies writing high-performance applications.
     24 
     25 Principal/subordinate objects easily map to tree topology. If not specified explicitly, a principal sends a subordinate object to a local execution queue. In case of high load, it is extracted from the queue and sent to a remote node. If the remote node is also under high load, the object is sent further. So, the hierarchy of principals and subordinates is compactly mapped to tree topology: lower-level nodes are used on-demand when a higher-level node can not handle the flow of objects. In other words, when a higher-level node ``overflows'', the excessive objects are ``spilled'' to lower-level nodes.
     26 
     27 So, the main goal of the implementation is to simplify application programming by minimising dependencies for small service programmes, making asynchronous messaging reliable, and by using automatic on-demand load balancing.
     28 
     29 \subsection{Peer discovery}
     30 
     31 The core of distributed pipeline is an algorithm which builds and updates virtual topology, and there are several states in which a node can be. In \textit{disconnected} state a node is not a part of distributed pipeline, and to become the part it discovers its peers and connects to one of them. Prior to connecting the link speed and relative performance of a node are measured. After connecting the node enters \textit{connected} state. In this state it receives updates from subordinate nodes (if any) and sends them to its principal. So, updates propagate from leaves to the root of a tree.
     32 
     33 In initial state a node uses discovery algorithm to find its peers. The node queries operating system for a list of networks it is part of and filters out global and Internet host loopback addresses (/32 and 127.0.0.0/8 blocks in IPv4 standard). Then it sends a subordinate object to each address in the list and measures response time. In case of success the response time is saved in the table and in case of failure it is deleted. After receiving all subordinate objects principal repeats the process until minimal number of performance samples is collected for all peers. Then the rating (see~Section~\ref{sec:node-rating}) of each peer is calculated and the table is sorted by it. The principal declares the first peer in the table a leader and sends a subordinate object to it which increases peer's level by one. Then the whole algorithm repeats for the next level of a tree. 
     34 
     35 %The algorithm is best described with a listing in Figure~\ref{alg:discovery}.
     36 
     37 Sometimes two nodes in disconnected state can be chosen to be principals of each other, which creates a cycle in tree topology of distributed pipeline. This can happen if two nodes have the same rating. The cycle is eliminated by rejecting offer from the node with higher IP address, so that a node with lower IP address becomes the principal. To make rating conflicts rare IP address is used as the second field when sorting the table of peers.
     38 
     39 On initial installation the total number of subordinate objects sent by each node amounts to $m n^2$ where $n$ is the total number of nodes and $m$ is the minimal number of samples, however, for subsequent restarts of the whole cluster this number can be significantly reduced with help of peer caches (Section~\ref{sec:peer-caches}). Upon entering connected state or receiving updates from peers each node stores current peer table in a file. When the node restarts it runs discovery algorithm only for peers in the file. If no peers are found to be online, then the algorithm repeats with an empty cache.
     40 
     41 \subsection{Node's rating}
     42 
     43 Determining performance of a computer is a complex task on its own right. The same computers may show varying performance for different applications, and the same application may show varying performance for different computers. For distributed pipeline performance of a node equals the number of computed objects per unit of time which depends on a type of a workload. So, performance of a computer is rather a function of a workload, not a variable that can be measured for a node.
     44 
     45 In contrast to performance, concurrency (the ability to handle many workloads of the same type or a large workload) is a variable of a node often amounting to the number of processor cores. Using concurrency instead of processor speed for measuring performance is equivalent to assuming that all processors in a cluster have the same clock rate so that a number of cores determines performance. This assumption gives rough estimate of real performance, however, it allows determining performance just by counting the total number of cores in a computer node and relieves one from running resource-consuming performance tests on each node.
     46 
     47 \label{sec:node-rating}
     48 In~\cite{deg2003,soshmina2007,andrianov2007} the authors suggest generalisation of Amdahl's law formula for computer clusters from which node's rating can be devised. The formula
     49 \begin{equation*}
     50     S_N = \frac{N}{1 - \alpha + \alpha N + \beta\gamma N^3}
     51 \end{equation*}
     52 shows speedup of a parallel programme on a cluster taking into account communication overhead. Here $N$ is the number of nodes, $\alpha$ is the sequential portion of a program, $\beta$ is the diameter of a system (the maximal number of intermediate nodes a message passes through when any two nodes communicate), and $\gamma$ is the ratio of node performance to network link performance. Speedup reaches its maximal value at $N = \sqrt[3]{(1-\alpha)/(2\beta\gamma)}$, so the higher the link performance is the higher speedup is achieved. In distributed pipeline performance is measured as the number of objects processed by a node or transmitted by network link during a fixed time period. The ratio of these two numbers is used as a rating.
     53 
     54 So, when nodes are in disconnected state the rating is estimated as the ratio of a node concurrency to the response time. The rating of a remote node is re-estimated to be the ratio of number of transmitted objects to the number of processed objects per unit of time when nodes enter connected state and start processing objects.
     55 
     56 \subsection{Rating and level updates}
     57 
     58 A node in connected state may update its level if a new subordinate node with the same or higher level chooses it as a leader. The level of each node equals to the maximal level of subordinates plus one. So, if some high-level node connects to a principal, then its level is recalculated and this change propagates towards root of a tree. That way the root node level equals the maximal level of all nodes in the cluster plus one.
     59 
     60 The rating of principal can become smaller than the rating of one of its subordinates when the workload type is changed from compute-intensive to data-intensive or the other way round. This also may occur due to a delayed level update, a change in node's configuration, or as a result of higher level node being offline. When it happens, the principal and the subordinate swap their positions in virtual topology, that is the higher level subordinate node becomes principal. Thus high-performance node can make its way to the root of a tree, if there are no network bottlenecks in the path.
     61 
     62 %Large number of subordinates. Prime factors of number of subordinates. Or node performance.
     63 
     64 So, rating and level updates propagate from leaves to the root of a tree in virtual topology automatically adapting distributed pipeline for a new type of workload or new cluster configuration.
     65 
     66 \subsection{Node failures and network partitions}
     67 
     68 There are three types of node failures in distributed pipeline: a failure of a leaf node, a principal node, and the root node. When a leaf node fails, objects in the corresponding sent buffer of its principal node are returned to their principals, and objects in unsent buffer are sent to some other subordinate node. If error processing is not done by principal object, then returning subordinate objects are also re-sent to other subordinate nodes. In case of principal or root node failure the recovery mechanism is the same, but subordinate nodes that lost their principal enter disconnected state, and a new principal is chosen from these subordinate nodes.
     69 
     70 In case of root node failure all objects which were sent to subordinate nodes are lost and retransmitted one more time. Sometimes this results in restart of the whole application which discards previously computed objects. The obvious solution to this problem is to buffer subordinate objects returning to their principals so that in an event of a failure retransmit them to a new principal node. However, in case of a root node failure there is no way to recover objects residing in this particular node other than replicating them to some other node prior to failure. This makes the solution unnecessary complicated, so for now no simple solution to this problem has been found.
     71 
     72 In case of network partition the recovery mechanism is also the same, and it also possess disadvantages of a root node failure: The results computed by subordinate objects are discarded and potentially large part of the application has to be restarted.
     73 
     74 So, the main approach for dealing with failures consists of resending lost objects to healthy nodes which is equivalent of recomputing a part of the problem being solved. In case of root node failure or network partition a potentially large number of objects are recomputed, but no simple solution to this problem has been found.
     75 
     76 \subsection{Evaluation}
     77 
     78 Test platform consisted of a multi-processor node, and Linux network namespaces were used to consolidate virtual cluster of varying number of nodes on a physical node~\cite{lantz2010network,handigol2012reproducible,heller2013reproducible}. Distributed pipeline needs one daemon process on each node to operate correctly, so one virtual node was used for each daemon. Tests were repeated multiple times to reduce influences of processes running in background. Each subsequent test run was separated from previous one with a delay to give operating system time to release resources, cleanup and flush buffers.
     79 
     80 Discovery test was designed to measure effect of cache on the time an initial node discovery takes. For the first run all cache files were removed from file system so that a node went from disconnected to connected state. For the second run all caches for each node were generated and stored in file system so that each node started from connected state.
     81 
     82 Buffer test shows how many objects sent buffer holds under various load. Objects were sent between two nodes in a ``ping-pong'' manner. Every update of buffer size was captured and maximum value for each run was calculated. A delay between sending objects simulated the load on the system: the higher the load is the higher the delay between subsequent transfers is.
     83 
     84 \subsection{Test results}
     85 
     86 \label{sec:peer-caches}
     87 Discovery test showed that caching node peers can speed up transition from disconnected to connected state by up to 25 times for 32 nodes (Fig.~\ref{fig:discovery}), and this number increases with the number of nodes. This is expected behaviour since the whole discovery step is omitted and cached values are used to reconstruct peers' level and performance data. The absolute time of this test is expected to increase when executed on real network, and cache effect might become more dramatic.
     88 
     89 Buffer test showed that sent buffer contains many objects only under low load, i.e. when the ratio of computations to data transfers is low. Under high load computations and data transfer overlap, and the number of objects in sent buffer lowers (Fig.~\ref{fig:buffer}).
     90 
     91 \begin{figure}
     92     \centering
     93     \includegraphics[width=0.77\textwidth]{discovery}
     94     \caption{Time taken for various number of nodes to discover each other with and without help of cache.}
     95     \label{fig:discovery}
     96 \end{figure}
     97 
     98 \begin{figure}
     99     \centering
    100     \includegraphics[width=0.77\textwidth]{buffer}
    101     \caption{Number of objects in sent buffer for various delays between objects transfers.}
    102     \label{fig:buffer}
    103 \end{figure}