\documentclass[runningheads]{llncs}

\usepackage{amsmath}
\usepackage{amssymb}
\usepackage{booktabs}
\usepackage{cite}
\usepackage{graphicx}
\usepackage{url}
\usepackage{listings}
\usepackage{tikz}
\usepackage{textcomp}

% https://github.com/stuhlmueller/scheme-listings/blob/master/lstlang0.sty
\lstdefinelanguage{scheme}{
  morekeywords=[1]{define, define-syntax, define-macro, lambda, define-stream, stream-lambda,
  define*},
  morekeywords=[2]{begin, call-with-current-continuation, call/cc,
    call-with-input-file, call-with-output-file, case, cond,
    do, else, for-each, if,
    let*, let, let-syntax, letrec, letrec-syntax,
    let-values, let*-values,
    and, or, not, delay, force,
    quasiquote, quote, unquote, unquote-splicing,
    syntax, syntax-rules, eval, environment, query,
    car, cdr, cons},
  morekeywords=[3]{import, export},
  alsodigit=!\$\%&*+-./:<=>?@^_~,
  sensitive=true,
  morecomment=[l]{;},
  morecomment=[s]{\#|}{|\#},
  morestring=[b]",
  basicstyle=\small\rmfamily,
  keywordstyle={\bf\rmfamily\color[HTML]{4081ec}},
  commentstyle=\color[rgb]{0.33,0.33,0.33},
  stringstyle={\color[HTML]{00a000}},
  upquote=true,
  breaklines=true,
  breakatwhitespace=true,
  literate=*{`}{{`}}{1},
  showstringspaces=false
}

\lstdefinelanguage{cpp}{
  morekeywords=[1]{class,struct,enum,public,private,protected,virtual,override,const,
                   void,int,new,delete},
  morecomment=[l]{//},
  basicstyle=\small\rmfamily,
  keywordstyle={\bf\color[HTML]{4081ec}},
  commentstyle=\color[rgb]{0.33,0.33,0.33},
  stringstyle={\color[HTML]{00a000}},
  escapeinside={LATEX}{END},
}

\begin{document}

\title{Functional programming interface for parallel and distributed computing%
    \thanks{Supported by Council for grants of the President of the Russian Federation (grant no.~\mbox{MK-383.2020.9}).}}
\author{%
    Ivan Petriakov\orcidID{0000-0001-5835-9313}\and\\
    Ivan Gankevich\textsuperscript{*}\orcidID{0000-0001-7067-6928}
}

\titlerunning{Functional programming interface}
\authorrunning{I.\,Petriakov et al.}

\institute{Saint Petersburg State University\\
    7-9 Universitetskaya Emb., St Petersburg 199034, Russia\\
    \email{st049350@student.spbu.ru},\\
    \email{i.gankevich@spbu.ru},\\
    \url{https://spbu.ru/}}

\maketitle

\begin{abstract}

There are many programming frameworks and languages for parallel and
distributed computing which are successful both in industry and academia;
however, all of them are isolated and self-contained. We believe that the main
reason that there is no common denominator between them is that there is no
intermediate representation for distributed computations. For sequential
computations we have bytecode that is used as an intermediate, portable and
universal representation of a programme written in any language, but bytecode
lacks the ability to describe process
communication. If we add this feature, we get a low-level representation on top of
which all the frameworks and languages for parallel and distributed
computations can be built. In this paper we explore how such an intermediate
representation can be made, how it can reduce programming effort and how it may
simplify the internal structure of existing frameworks. We also demonstrate how
a high-level interface can be built for a functional language that completely hides
all the difficulties that a programmer encounters when working with
distributed systems.

\keywords{%
API
\and intermediate representation
\and C++
\and Guile
}
\end{abstract}

\section{Introduction}

There are many programming frameworks and languages for parallel and
distributed
computing~\cite{spark2016,fault-tolerant-distributed-haskell,wilde2011swift,fetterly2009dryadlinq,pinho2014oopp}
which are successful both in industry and academia; however, all of them are
isolated and self-contained. We believe that the main reason that there is no
common denominator between these frameworks and languages is that there is no
common protocol or low-level representation for distributed computations. For sequential
computations we have bytecode (e.g.~LLVM~\cite{llvm}, Java bytecode, Guile
bytecode) that is used as an intermediate, portable and universal
representation of a programme written in any language; we also have assembler,
which is a non-portable, but still popular intermediate representation. One
important feature that bytecode and assembler lack is the ability to
communicate between parallel processes. This communication is the common
denominator on top of which all the frameworks and languages for parallel and
distributed computations can be built; however, there is no universal low-level
representation that describes communication.

Why does a common low-level representation exist for sequential computations, but not
for parallel and distributed ones? One of the reasons, which applies to
both distributed and parallel computations, is the fact that people still think
about programmes as sequences of steps~--- the same way as people themselves
perform complex tasks. Imperative languages, in which programmes are written
as series of steps, are still prevalent in industry and academia; this is in
contrast to unpopular functional languages in which programmes are written as
compositions of functions with no implied order of computation. Another reason,
which applies to distributed computations, is the fact that these computations
are inherently unreliable and there is no universal approach for handling
cluster node failures. While imperative languages produce more efficient
programmes, they do not provide safety from deadlocks and fault tolerance
guarantees. Also, they are much more difficult to write, as a human has to
work with mutable state (local and global variables, objects etc.) and it is
difficult to keep this state in mind while writing the code. Functional
languages minimise the usage of mutable state, provide partial safety from
deadlocks (under the assumption that a programmer does not use locks manually)
and can be modified to provide fault tolerance. From the authors' perspective,
people understand the potential of functional languages, but have not yet
realised this potential to get all their advantages; people have realised the full
potential of imperative languages, but do not know how to get rid of their
disadvantages.

In this paper we describe a low-level representation based on \emph{kernels}
which is suitable for distributed and parallel computations. Kernels provide
automatic fault tolerance and can be used to exchange data between
programmes written in different languages. We implement kernels in C++ and
build a reference cluster scheduler that uses kernels as the protocol to run
applications that span multiple cluster nodes. Then we use kernels as an
intermediate representation for the Guile programming language, run benchmarks
using the scheduler and compare the performance of different implementations of
the same programme.

To prevent ambiguity we use the term \emph{parallel} to describe computations
that use several processor cores of a single cluster node, the term
\emph{distributed} to describe computations that use several cluster nodes and
any number of cores on each node, and the term \emph{cluster} to describe anything
that refers to a local cluster (as opposed to geographically distributed clusters,
which are not studied in this paper). \emph{Intermediate representation} in
our paper is a particular form of abstract syntax tree, e.g.~in functional
languages \emph{continuation passing style} is a popular intermediate
representation of the code.

%TODO \cite{lang-virt}
%\cite{fetterly2009dryadlinq}
%\cite{wilde2011swift}
%\cite{pinho2014oopp}

\section{Methods}

\subsection{Parallel and distributed computing technologies as components of
unified system}

In order to write parallel and distributed programmes the same way as we write
sequential programmes, we need the following components.
\begin{itemize}
    \item Portable low-level representation of
        the code and the data that includes means of decomposition of the code and the data into
        parts that can be computed in parallel. The closest sequential
        counterpart is LLVM.
    \item Cluster scheduler that executes
        parallel and distributed applications and uses the low-level representation to implement
        communication between these applications running on different cluster nodes.
        The closest single-node counterpart is the operating system kernel that executes
        user processes.
    \item High-level interface that wraps the low-level representation for existing
        popular programming languages in the form of a framework or a library. This interface
        uses the cluster scheduler, if it is available and node parallelism is needed by
        the application; otherwise the code is executed on the local node and parallelism
        is limited to the parallelism of the node. The closest
        single-node counterpart is the C library that provides an interface to system
        calls of the operating system kernel.
\end{itemize}
These three components are built on top of each other as in the classical object-oriented
programming approach, and all the complexity is pushed down to the lowest layer:
the low-level representation is responsible for providing parallelism and fault tolerance to
the applications, the cluster scheduler uses these facilities to provide transparent execution of
the applications on multiple cluster nodes, and the high-level interface maps
the underlying system to the target language to simplify the work for
application programmers.

High-performance computing technologies have the same three-component
structure: the message passing library (MPI) is widely considered a low-level
language of parallel computing, batch job schedulers are used to allocate
resources, and the high-level interface is some library that is built on top of MPI;
however, the responsibilities of the components are not clearly separated and
the hierarchical structure is not maintained. MPI provides means of
communication between parallel processes, but does not provide data
decomposition facilities and fault tolerance guarantees: data decomposition is
done either in a high-level language or manually, and fault tolerance is provided
by the batch job scheduler. Batch job schedulers provide means to allocate
resources (cluster nodes, processor cores, memory etc.) and launch parallel MPI
processes, but do not have control over messages that are sent between these
processes and do not control the actual number of resources used by the
programme (all resources are owned exclusively by the programme, and the programme decides
how to use them), i.e.~cluster
schedulers and MPI programmes do not talk to each other after the parallel
processes were launched. Consequently, the high-level interface is also separated
from the scheduler. Although the high-level interface is built on top of the
low-level interface, the batch job scheduler is fully integrated with neither of
them: the cluster-wide counterpart of the operating system kernel does not have
control over communication of the applications that are run on the cluster, but
is used as a resource allocator instead.

The situation in newer big data technologies is different: there are the same
three components with hierarchical structure, but the low-level representation is
integrated into the scheduler. There is the YARN cluster
scheduler~\cite{vavilapalli2013yarn} with an API that is used as a low-level
representation for parallel and distributed computing, and there are many high-level
libraries that are built on top of YARN~\cite{hadoop,oozie,spark2016,storm}.
The scheduler has more control over job execution as jobs are decomposed into
tasks and execution of tasks is controlled by the scheduler. Unfortunately, the
lack of a common low-level representation made all high-level frameworks that are built
on top of the YARN API use their own custom protocols for communication, shift
the responsibility of providing fault tolerance to the scheduler and shift the
responsibility of data decomposition to higher level frameworks.

To summarise, the current state-of-the-art technologies for parallel and
distributed computing can be divided into three classes: low-level representations,
cluster schedulers and high-level interfaces; however, the responsibilities of each
class are not clearly separated by the developers of these technologies.
Although the structure of the components resembles the operating system kernel
and its application interface, the components sometimes are not built on top of
each other, but integrated horizontally, and as a result the complexity of
parallel and distributed computations is sometimes visible on the highest
levels of abstraction.

Our proposal is to design a low-level representation that provides fault
tolerance, means of data and code decomposition and means of communication for
parallel and distributed applications. Having such a representation at one's disposal
makes it easy to build higher level components, because the complexity of
cluster systems is hidden from the programmer, the duplicated effort of
implementing the same facilities in higher level interfaces is reduced, and
the cluster scheduler has full control of the programme execution as it speaks the
same protocol and uses the same low-level representation internally: the representation is
general enough to describe any distributed programme including the scheduler
itself.

\subsection{Kernels as objects that control the programme flow}
\label{sec-kernels}

In order to create a low-level representation for parallel and distributed computing we
borrow familiar features from sequential low-level representations and augment them
with asynchronous function calls and an ability to read and write call stack
frames.

In assembler and LLVM the programme is written in imperative style as a series
of processor instructions. The variables are stored either on the stack (a
special area of the computer's main memory) or in processor registers. Logical
parts of the programme are represented by functions. A call to a function
places all function arguments on the stack and then jumps to the address of the
function. When the function returns, the result of the computation is written
to a processor register and control flow is returned to the calling function.
When the main function returns, the programme terminates.

There are two problems with assembler that need to be solved in order for
it to be useful in parallel and distributed computing. First, the contents of
the stack can not be copied between cluster nodes or saved to and read from a
file, because they often contain pointers to memory blocks that may be invalid
on another cluster node or in the process that reads the stack from the file.
Second, there is no natural way to express parallelism in this language: all
function calls are synchronous and all instructions are executed in the
specified order. In order to solve these problems we use object-oriented
techniques.

We represent each stack frame with an object: local variables become object
fields, and each function call is decomposed into the code that goes before the
function call, the code that performs the function call, and the code that goes
after. The code that goes before the call is placed into the \texttt{act} method of
the object, and after this code a new object is created to call the function
asynchronously. The code that goes after the call is placed into the \texttt{react}
method of the object, and this code is called asynchronously when the function
call returns (this method takes the corresponding object as an argument). The
object also has \texttt{read} and \texttt{write} methods that are used to read
and write its fields from and to a file or to copy the object to another cluster
node. In this model each object contains enough information to perform the
corresponding function call, and we can make these calls in any order we like.
Also, the object is self-contained, and we can ask another cluster node to
perform the function call or save the object to disk to perform the call when
the user wants to resume the computation (e.g.~after the computer is upgraded
and rebooted).

The function calls are made asynchronous with the help of a thread pool. Each thread
pool consists of an object queue and an array of threads. When an object is
placed in the queue, one of the threads extracts it and calls its \texttt{act} or
\texttt{react} method depending on the state of the object. There are two
states that are controlled by the programmer: when the state is
\textit{upstream}, the \texttt{act} method is called; when the state is
\textit{downstream}, the \texttt{react} method of the parent object is called with
the current object as the argument. When the state is \textit{downstream} and
there is no parent, the programme terminates.

We call these objects \emph{control flow objects} or \emph{kernels} for short.
These objects contain the input data in object fields, the code that processes
this data in object methods and the output data (the result of the computation)
also in object fields. The programmer decides which data is input and output.
To reduce network usage the programmer may delete the input data when the kernel
enters the \textit{downstream} state: that way only the output data is copied back to
the parent kernel over the network. The example programme written using kernels
and using the regular function call stack is shown in table~\ref{tab-call-stack}.

\begin{table}
    \centering
    \begin{minipage}[t]{0.39\textwidth}
\begin{lstlisting}[language=cpp]
int nested(int a) {
  return 123 + a;
}
\end{lstlisting}
    \end{minipage}
    \begin{minipage}[t]{0.59\textwidth}
\begin{lstlisting}[language=cpp]
struct Nested: public Kernel {
  int result;
  int a;
  Nested(int a): a(a) {}
  void act() override {
    result = a + 123;
    LATEX{\color[HTML]{ac4040}\bf{}async\_return}END();
  }
};
\end{lstlisting}
    \end{minipage}
    \begin{minipage}[t]{0.39\textwidth}
\begin{lstlisting}[language=cpp]
void main() {
  // code before
  int result = nested(5);
  // code after
  print(result);
}
\end{lstlisting}
    \end{minipage}
    \begin{minipage}[t]{0.59\textwidth}
\begin{lstlisting}[language=cpp]
struct Main: public Kernel {
  void act() override {
    // code before
    LATEX{\color[HTML]{ac4040}\bf{}async\_call}END(new Nested(5));
  }
  void react(Kernel* child) override {
    int result = ((Nested*)child)->result;
    // code after
    print(result);
    LATEX{\color[HTML]{ac4040}\bf{}async\_return}END();
  }
};

void main() {
  LATEX{\color[HTML]{ac4040}\bf{}async\_call}END(new Main);
  wait();
}
\end{lstlisting}
    \end{minipage}
    \caption{The same programme written using the regular function call stack
    (left) and kernels (right). Here \texttt{async\_call} performs an asynchronous
    function call by pushing the child kernel to the queue, and \texttt{async\_return}
    performs an asynchronous return from the function call by pushing the current
    kernel to the queue.\label{tab-call-stack}}
\end{table}

This low-level representation can be seen as an adaptation of the classic function call
stack, but with asynchronous function calls and an ability to read and write
stack frames. These differences give kernels the following advantages.
\begin{itemize}
    \item Kernels define dependencies between function calls, but do not define
        the order of the computation. This gives a natural way of expressing parallel
        computations on the lowest possible level.
    \item Kernels can be written to and read from any medium: files, network
        connections, serial connections etc. This makes it possible to implement fault
        tolerance efficiently using any existing methods: in order to implement
        checkpoints
        a programmer no longer needs to save the memory contents of each parallel
        process; only the fields of the main kernel are needed to restart
        the programme from the last sequential step. However, with kernels
        checkpoints can be replaced with a simple restart: when the node
        to which the child kernel was sent fails, a copy of this kernel
        can be sent to another node without stopping the programme and without
        additional configuration from the programmer.
    \item Finally, kernels are simple enough to be used as an intermediate
        representation for high-level languages: either via a compiler
        modification, or via a wrapper library that calls the low-level implementation
        directly. Kernels can not replace LLVM or assembler, because their level of
        abstraction is higher; therefore, compiler modification is possible
        only for languages that use a high-level intermediate representation
        (e.g.~LISP-like languages and purely functional languages that have
        a natural way of expressing parallelism by computing arguments of
        functions in parallel).
\end{itemize}

\subsection{Reference cluster scheduler based on kernels}

Kernels are general enough to write any programme, and the first programme that
we wrote using them was a cluster scheduler that uses kernels to implement its
internal logic and to run applications spanning multiple cluster nodes. The
single-node version of the scheduler is as simple as a thread pool attached to the
kernel queue described in section~\ref{sec-kernels}. The programme starts with
pushing the first (or \emph{main}) kernel to the queue and ends when the main
kernel changes its state to \textit{downstream} and pushes itself to the queue.
The number of threads in the pool equals the number of processor cores, but can
be set manually to limit the amount of parallelism. The cluster version of the
scheduler is more involved and uses kernels to implement its logic.

The cluster scheduler runs in a separate daemon process on each cluster node, and
the processes communicate with each other using kernels: the process on node \(A\)
writes some kernel to a network connection with node \(B\), and the process on node
\(B\) reads the kernel and performs a useful operation with it. Here kernels are
used like messages rather than stack frames: a kernel that always resides on node
\(A\) creates a child message kernel and sends it to the kernel that always
resides on node \(B\). In order to implement this logic we added a
\textit{point-to-point} state and a field that specifies the identifier of the
target kernel. In addition to that we added source and destination address
fields to be able to route the kernel to the target cluster node and return it
back to the source node: the \((\text{parent-kernel},\text{source-address})\) tuple
uniquely identifies the location of the parent kernel, and the
\((\text{target-kernel},\text{destination-address})\) tuple uniquely identifies
the location of the target kernel. The first tuple is also used by
\textit{downstream} kernels that return back to their parents; the second
tuple is used only by \textit{point-to-point} kernels.

The cluster scheduler has several responsibilities:
\begin{itemize}
    \item run applications in child processes,
    \item route kernels between application processes running on different cluster nodes,
    \item maintain a list of available cluster nodes.
\end{itemize}
In order to implement them we created a kernel queue and a thread pool for each
concern that the scheduler has to deal with (see~figure~\ref{fig-local-routing}): we have
\begin{itemize}
    \item a timer queue for scheduled and periodic tasks,
    \item a network queue for sending kernels to and receiving them from other cluster nodes,
    \item a process queue for creating child processes and sending kernels to and receiving
        them from these processes, and
    \item the main processor queue for processing kernels in parallel using multiple processor cores.
\end{itemize}
This separation of concerns allows us to overlap data transfer and data processing:
while the main queue processes kernels in parallel, the process and network queues
send and receive other kernels. This approach leads to a small amount of oversubscription,
as separate threads are used to send and receive kernels, but benchmarks showed that
this is not a big problem, as most of the time these threads wait for the operating
system kernel to transfer the data.

\begin{figure}
    \centering
    \tikzset{Rect/.style={text width=1.30cm,draw,align=center,thick,rounded corners}}
    \begin{tikzpicture}[x=1.75cm,y=-1.25cm]
        \node[Rect] (parallel) at (0,0) {Processor queue\strut};
        \node[Rect] (timer) at (1,0) {Timer queue\strut};
        \node[Rect] (disk) at (2,0) {Disk queue\strut};
        \node[Rect] (nic) at (3,0) {Network queue\strut};
        \node[Rect] (process) at (4,0) {Process queue\strut};
        \node[Rect] (cpu0) at (0,-1) {CPU 0\strut};
        \node[Rect] (cpu1) at (0,1) {CPU 1\strut};
        \node[Rect] (disk0) at (2,-1) {Disk 0\strut};
        \node[Rect] (disk1) at (2,1) {Disk 1\strut};
        \node[Rect] (timer0) at (1,-1) {Timer 0\strut};
        \node[Rect] (nic0) at (3,-1) {NIC 0\strut};
        \node[Rect] (nic1) at (3,1) {NIC 1\strut};
        \node[Rect] (parent) at (4,-1) {Parent\strut};
        \node[Rect] (child) at (4,1) {Child\strut};
        \path[draw,thick] (parallel) -- (cpu0);
        \path[draw,thick] (parallel) -- (cpu1);
        \path[draw,thick] (timer) -- (timer0);
        \path[draw,thick] (disk) -- (disk0);
        \path[draw,thick] (disk) -- (disk1);
        \path[draw,thick] (nic) -- (nic0);
        \path[draw,thick] (nic) -- (nic1);
        \path[draw,thick] (process) -- (parent);
        \path[draw,thick] (process) -- (child);
    \end{tikzpicture}
    \caption{Default kernel queues for each cluster scheduler concern.\label{fig-local-routing}}
\end{figure}

The cluster scheduler runs applications in child processes; this approach is
natural for UNIX-like operating systems, as the parent process has full control
of its children: the amount of resources can be limited (the number of
processor cores, the amount of memory etc.) and the process can be terminated
at any time. After introducing child processes into the scheduler we added a
cluster-wide source (target) application identifier field that uniquely
identifies the source (the target) application from which the kernel originated
(to which the kernel was sent). Each kernel also carries an application
descriptor that specifies how to run the application (command line arguments,
environment variables etc.), and if the corresponding process is not running, it is
launched automatically by the scheduler. Child processes are needed only as a
means of controlling resource usage: a process is the scheduling unit for the
operating system kernel, whereas in our cluster scheduler the unit of scheduling
is the kernel, so a child process performs useful work only when a kernel is
sent to the corresponding application, and the application is launched
automatically if it is not running. An application spans multiple cluster nodes and
may have any number of child processes (but no more than one process per node).
These processes are launched on demand and do nothing until a kernel is
received. This behaviour allows us to implement dynamic parallelism: we do not
need to specify the number of parallel processes on application launch; the
scheduler will create them automatically. To reduce memory consumption, stale
processes that have not received any kernel for a long period of time may be
terminated (new processes will be created automatically when a kernel arrives
anyway). Kernels can be sent from one application to another by specifying a
different application descriptor.

A child process communicates with its parent using an optimised child process queue.
If the parent process does not want to communicate, the child process continues
execution on the local node: applications written using the cluster scheduler
interface work correctly even when the scheduler is not available, but use
the local node instead of the cluster.
Since a node may have multiple child processes, we may have a situation where
all of them try to use all processor cores, which leads to oversubscription
and suboptimal performance. To solve this problem, we introduce a weight field
that tells how many threads the kernel will use. The default is one thread for
ordinary kernels and zero threads for cluster scheduler kernels. The process
queue tracks the total weight of the kernels that were sent to child processes
and queues incoming kernels when the weight reaches the number of processor
cores. Each cluster node also reports this information to other nodes for
better load balancing decisions.

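
This admission policy can be sketched in a few lines of C++ (an illustration
under our own naming, not the actual scheduler code):

\begin{lstlisting}[language=cpp,%
caption={Sketch of weight-based admission of kernels (illustrative).},%
captionpos=b,
label={lst-weight-queue}]
#include <deque>
#include <iostream>

struct kernel { int weight = 1; }; // 0 for cluster scheduler kernels

class process_queue {
public:
  explicit process_queue(int cores): _cores(cores) {}
  // Forward the kernel to a child process if enough cores are free,
  // otherwise keep it in the pending queue. Returns true if forwarded.
  bool push(kernel* k) {
    if (_total_weight + k->weight > _cores) {
      _pending.push_back(k);
      return false;
    }
    _total_weight += k->weight;
    return true;
  }
  // Called when a child process completes a kernel: release its weight
  // and admit as many pending kernels as now fit.
  void finished(kernel* k) {
    _total_weight -= k->weight;
    while (!_pending.empty() &&
           _total_weight + _pending.front()->weight <= _cores) {
      _total_weight += _pending.front()->weight;
      _pending.pop_front();
    }
  }
private:
  int _cores;
  int _total_weight = 0;
  std::deque<kernel*> _pending;
};

int main() {
  process_queue q(2); // a node with two processor cores
  kernel a, b, c;
  bool r1 = q.push(&a), r2 = q.push(&b), r3 = q.push(&c);
  std::cout << r1 << r2 << r3 << '\n'; // prints 110
}
\end{lstlisting}

Here the third kernel is queued rather than forwarded, because its weight
would exceed the number of processor cores.
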
The scheduler acts as a router for kernels: when a kernel is received from an
application, the scheduler analyses its fields and decides to which cluster
node it should be sent. If the kernel is in the \textit{downstream} or
\textit{point-to-point} state, it is sent to the node where the target kernel
resides; if the kernel is in the \textit{upstream} state, a load balancing
algorithm decides which node to send it to. The load balancing algorithm
tracks the total weight of the kernels that were sent to each node and also
receives the same information from the node itself (in case other nodes send
their kernels there too); then it chooses the node with the lowest weight and
sends the kernel to that node. If all nodes are full, the kernel is retained
in the queue until enough processor cores become available. The algorithm is
deliberately conservative and does not use work-stealing to improve
performance; however, fault tolerance is easier to
implement~\cite{gankevich2016factory,gankevich2017subord} when the target and
source node fields do not change during the kernel lifetime, which is not the
case in a work-stealing scenario.

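
The routing rule can be condensed into a single function (a sketch with
illustrative names; the real scheduler works on network addresses and richer
node state):

\begin{lstlisting}[language=cpp,%
caption={Sketch of the kernel routing decision (illustrative).},%
captionpos=b,
label={lst-routing}]
#include <cstdio>
#include <map>
#include <string>

enum class states {upstream, downstream, point_to_point};

struct kernel {
  states state = states::upstream;
  std::string target_node; // node where the target kernel resides
};

// Downstream and point-to-point kernels go to the node of their target
// kernel; upstream kernels go to the node with the lowest total weight.
std::string route(const kernel& k,
                  const std::map<std::string, int>& weights) {
  if (k.state != states::upstream) { return k.target_node; }
  std::string best;
  int best_weight = -1;
  for (const auto& [node, weight] : weights) {
    if (best_weight < 0 || weight < best_weight) {
      best = node;
      best_weight = weight;
    }
  }
  return best;
}

int main() {
  std::map<std::string, int> weights{
    {"node-a", 3}, {"node-b", 1}, {"node-c", 2}};
  kernel up; // upstream: sent to the least loaded node
  kernel down{states::downstream, "node-c"};
  std::printf("%s %s\n",
              route(up, weights).c_str(), route(down, weights).c_str());
}
\end{lstlisting}
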
The last but not least responsibility of the scheduler is to discover and
maintain a list of cluster nodes and establish persistent network connections
to neighbours. The cluster scheduler does this automatically by scanning the
network using an efficient algorithm: the nodes in the network are organised
into an artificial tree topology with a specified fan-out value, and each node
tries to communicate with the nodes that are closer to the root of the tree.
This approach significantly reduces the amount of data that needs to be sent
over the network to find all cluster nodes: in the ideal case only one kernel
is sent to and received from the parent node. The algorithm is described
in~\cite{gankevich2015subord}. After the connections are established, all
\textit{upstream} kernels that are received from the applications' child
processes are routed to neighbour nodes in the tree topology (both parent
and child nodes). This creates a problem because the number of nodes ``behind''
the parent node is generally different from the number of nodes behind the
child nodes. To solve this problem, we track not only the total weight of all
kernels of each neighbour node, but the total weight of every node in the
cluster, and sum the weights of all nodes that are behind node \(A\) to
compute the total weight of node \(A\) for load balancing. We also apply load
balancing recursively: when a kernel arrives at a node, the load balancing
algorithm is executed once more to decide whether the kernel should be
processed locally or sent to another cluster node (except the source node).
This approach solves the problem, and applications can now be launched not
only on the root node but on any node without load balancing problems. It
adds a small overhead, as a kernel goes through an intermediate node, but if
the overhead is undesirable, the application can be launched on the root node.
Node discovery and node state updates are implemented using
\textit{point-to-point} kernels.

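
To illustrate why the discovery traffic is small, assume the nodes are
linearly ordered (for example, by IP address) and numbered from zero; in a
tree with fan-out \(f\) the position of the parent of node \(i\) is then pure
arithmetic, so each node knows whom to probe without any broadcast (a sketch
under this assumption, not the actual implementation):

\begin{lstlisting}[language=cpp,%
caption={Computing the parent of a node in the artificial tree topology (illustrative).},%
captionpos=b,
label={lst-tree}]
#include <cstdio>

// Nodes 0..n-1 form a tree with the given fan-out: node 0 is the root,
// nodes 1..fanout are its children, and so on.
int parent(int position, int fanout) {
  return position == 0 ? -1 : (position - 1) / fanout;
}

int main() {
  const int fanout = 2;
  for (int i = 0; i < 7; ++i) {
    std::printf("node %d -> parent %d\n", i, parent(i, fanout));
  }
}
\end{lstlisting}
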
To summarise, the cluster scheduler uses kernels as the unit of scheduling and
as the communication protocol between its daemon processes running on
different cluster nodes. The daemon process acts as an intermediary between
application processes running on different cluster nodes, and all application
kernels are sent through this process to other cluster nodes. Kernels that are
sent through the scheduler are heavyweight: they have more fields than local
kernels, and routing through the scheduler introduces multiple overheads
compared to direct communication. However, using the cluster scheduler greatly
simplifies application development, as the application developer does not need
to worry about networking, fault tolerance, load balancing and ``how many
parallel processes are enough for my application'': this is now handled by the
scheduler. For maximum efficiency and for embedded applications, the
application can be linked directly to the scheduler so that it runs in the
same daemon process; that way application kernels are no longer sent through
the daemon process and the overhead of the scheduler is minimal.

\subsection{Parallel and distributed evaluation of Guile expressions using kernels}

The low-level kernel interface and the cluster scheduler are written in C++.
From the authors' perspective, C is too low-level and Java has too much
overhead for cluster computing, whereas C++ is the middle-ground choice. The
implementation is a direct mapping of the ideas discussed in the previous
sections onto C++ abstractions: a kernel is a base class
(see~listing~\ref{lst-kernel-api}) for all control flow objects with common
fields (\texttt{parent}, \texttt{target} and others) and \texttt{act},
\texttt{react}, \texttt{read}, \texttt{write} virtual functions that are
overridden in subclasses. This direct mapping is natural for a mixed-paradigm
language like C++, but functional languages may benefit from implementing the
same ideas in the compiler or interpreter.

\begin{lstlisting}[language=cpp,%
caption={Public interface of the kernel and the queue classes in C++ (simplified for clarity).},%
captionpos=b,
label={lst-kernel-api}]
enum class states {upstream, downstream, point_to_point};

class kernel {
public:
  virtual void act();
  virtual void react(kernel* child);
  virtual void write(buffer& out) const;
  virtual void read(buffer& in);
  kernel* parent = nullptr;
  kernel* target = nullptr;
  states state = states::upstream;
};

class queue {
public:
  void push(kernel* k);
};
\end{lstlisting}

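
To show how this interface is meant to be used, the following self-contained
sketch replaces the real scheduler with a toy sequential queue and computes a
sum of squares with one parent kernel and three children (everything except
the \texttt{kernel} and \texttt{queue} names is our own illustration):

\begin{lstlisting}[language=cpp,%
caption={A toy parent--child kernel pair (illustrative, sequential stand-in for the scheduler).},%
captionpos=b,
label={lst-kernel-usage}]
#include <iostream>

enum class states {upstream, downstream, point_to_point};

class kernel {
public:
  virtual ~kernel() = default;
  virtual void act() {}
  virtual void react(kernel* child) {}
  kernel* parent = nullptr;
  states state = states::upstream;
};

// Toy sequential queue: upstream kernels are executed immediately,
// downstream kernels are returned to their parents.
class queue {
public:
  void push(kernel* k) {
    if (k->state == states::downstream && k->parent) {
      k->parent->react(k);
    } else {
      k->act();
    }
  }
};

queue q;

// Child kernel: squares a number and goes downstream to its parent.
class square: public kernel {
public:
  explicit square(int x): _x(x) {}
  void act() override {
    _x *= _x;
    state = states::downstream;
    q.push(this);
  }
  int result() const { return _x; }
private:
  int _x;
};

// Parent kernel: spawns three children and accumulates their results.
class sum_of_squares: public kernel {
public:
  void act() override {
    for (int i = 1; i <= 3; ++i) {
      auto* child = new square(i);
      child->parent = this;
      q.push(child);
    }
  }
  void react(kernel* child) override {
    _sum += static_cast<square*>(child)->result();
    delete child;
    if (++_count == 3) { std::cout << _sum << '\n'; } // prints 14
  }
private:
  int _sum = 0;
  int _count = 0;
};

int main() {
  sum_of_squares root;
  root.act();
}
\end{lstlisting}

In the real scheduler \texttt{queue::push} runs kernels asynchronously in a
thread pool and may send them to other cluster nodes; the control flow of
\texttt{act} and \texttt{react} stays the same.
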
We made a reference implementation of kernels for the Guile
language~\cite{galassi2002guile}. Guile is a dialect of
Scheme~\cite{sussman1998}, which in turn is a dialect of
LISP~\cite{mccarthy1960}. The distinctive feature of LISP-like languages is
homoiconicity, i.e.~code and data are represented by the same tree-like
structure (lists that contain atoms or other lists as elements). This feature
makes it possible to express parallelism directly in the language: every list
element can be computed independently, and it can be sent to other cluster
nodes for parallel computation. To implement parallelism we created a Guile
interpreter that evaluates every list element in parallel using kernels. In
practice this means that every argument of a procedure call (a procedure call
is also a list, with the first element being the procedure name) is computed
in parallel. This interpreter is able to run any existing Guile programme
(provided that it does not use threads, locks or semaphores explicitly), and
the output will be the same as with the original interpreter; the programme
will automatically use cluster nodes for parallel computations, and fault
tolerance will be provided automatically by our cluster scheduler. From the
authors' perspective this approach is the most transparent and safe way of
writing parallel and distributed programmes, with a clear separation of
concerns: the programmer takes care of the application logic, and the cluster
scheduler takes care of parallelism, load balancing and fault tolerance.

Our interpreter consists of the standard \textit{read-eval-print} loop, of
which only the \textit{eval} step uses kernels for parallel and distributed
computations. Inside \textit{eval} we use a hybrid approach to parallelism:
kernels are used to evaluate arguments of procedure calls and arguments of the
\texttt{cons} primitive asynchronously, and only if these arguments contain
other procedure calls. This means that all simple arguments (variables,
symbols, other primitives etc.) are computed sequentially, without creating
child kernels.

Evaluating procedure calls and \texttt{cons} using kernels is enough to make
the \texttt{map} form parallel, but we had to rewrite the \texttt{fold} form
to make it parallel. Our parallelism is based on the fact that procedure
arguments can be evaluated in parallel without affecting the correctness of
the procedure; however, evaluating arguments in parallel in \texttt{fold} does
not give a speedup because of the nested \texttt{fold} and \texttt{proc}
calls: the next recursive call to \texttt{fold} waits until the call to
\texttt{proc} completes. The alternative procedure \texttt{fold-pairwise} does
not have this deficiency, but is correct only for a \texttt{proc} that does
not depend on the order of its arguments (the \texttt{+} and \texttt{*}
operators etc.). In this procedure we apply \texttt{proc} to successive pairs
of elements from the initial list, and then recursively call
\texttt{fold-pairwise} on the resulting list. The iteration continues until
only one element is left in the list; this element is returned as the result
of the procedure call. The new procedure is still iterative, but parallel
inside each iteration. We chose the \texttt{map} and \texttt{fold} forms to
illustrate automatic parallelism because many other forms are based on
them~\cite{hutton-fold}. Our implementation is shown in
listing~\ref{lst-parallel-forms}.

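
Assuming perfect parallelism inside each iteration and a procedure
\texttt{proc} that takes time \(t\), the advantage of \texttt{fold-pairwise}
over \texttt{fold} for a list of \(n\) elements can be stated briefly as
\[
  T_{\text{fold}} = n\,t,
  \qquad
  T_{\text{fold-pairwise}} \approx \lceil \log_2 n \rceil\, t,
\]
since each iteration halves the length of the list. (This is an idealised
estimate that ignores scheduling overhead.)
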
\begin{lstlisting}[language=scheme,%
caption={Parallel \texttt{map} and \texttt{fold} forms in Guile.},
captionpos=b,
label={lst-parallel-forms}]
(define (map proc lst) "Parallel map."
  (if (null? lst) lst
    (cons (proc (car lst)) (map proc (cdr lst)))))
(define (fold proc init lst) "Sequential fold."
  (if (null? lst) init
    (fold proc (proc (car lst) init) (cdr lst))))
(define (do-fold-pairwise proc lst)
  (if (null? lst) '()
    (if (null? (cdr lst)) lst
      (do-fold-pairwise proc
        (cons (proc (car lst) (car (cdr lst)))
              (do-fold-pairwise proc (cdr (cdr lst))))))))
(define (fold-pairwise proc lst) "Parallel pairwise fold."
  (car (do-fold-pairwise proc lst)))
\end{lstlisting}

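
The same pairwise scheme can also be written in C++ over a vector, which makes
the level-by-level structure explicit: every level combines independent pairs,
so all calls of one level could run in parallel (a sketch of the idea, shown
sequentially; not part of our interpreter):

\begin{lstlisting}[language=cpp,%
caption={A C++ analogue of \texttt{fold-pairwise} (illustrative).},%
captionpos=b,
label={lst-fold-pairwise-cpp}]
#include <cstddef>
#include <functional>
#include <iostream>
#include <utility>
#include <vector>

// Reduce successive pairs until one element is left. Correct only for
// operations that do not depend on the order of the arguments.
template <class T, class Op>
T fold_pairwise(std::vector<T> lst, Op op) {
  while (lst.size() > 1) {
    std::vector<T> next;
    for (std::size_t i = 0; i + 1 < lst.size(); i += 2) {
      next.push_back(op(lst[i], lst[i+1])); // independent pairs
    }
    if (lst.size() % 2 == 1) { next.push_back(lst.back()); }
    lst = std::move(next);
  }
  return lst.at(0);
}

int main() {
  std::vector<int> v{1, 2, 3, 4, 5};
  std::cout << fold_pairwise(v, std::plus<int>{}) << '\n'; // prints 15
}
\end{lstlisting}
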
\section{Results}

We tested the performance of our interpreter using the forms in
listing~\ref{lst-parallel-forms}. For each form we applied a synthetic
procedure that sleeps for 200 milliseconds to a list of 96 elements. Then we
ran the resulting script using the native Guile interpreter and our
interpreter, and measured the total running time for different numbers of
threads. For the native Guile interpreter the running time of all forms is the
same for any number of threads. For our interpreter the run time of the
\texttt{map} and \texttt{fold-pairwise} forms decreases with the number of
threads, while the run time of the \texttt{fold} form stays the same
(figure~\ref{fig-results}).

\begin{figure}
    \centering
    \includegraphics{build/gnuplot/results.eps}
    \caption{The run time of the forms from listing~\ref{lst-parallel-forms}
    for different numbers of parallel threads and different interpreters.\label{fig-results}}
\end{figure}

%In order to test performance of our interpreter we used a programme that
%processes frequency-directional spectra of ocean waves from NDBC
%dataset~\cite{ndbc-web-data-guide,ndbc-techreport}.  Each spectrum consists of
%five variables, each of which is stored in a separate file in a form of time
%series.  First, we find five files that correspond to the same station where
%the data was collected and the same year.  Then we merge the corresponding
%records from these files into single vector-valued time series.  Incomplete
%groups of files and incomplete records are removed.  After that we write the
%resulting groups to disk. We wrote this programme in C++ with kernels,
%in Guile with kernels and in Guile without kernels.

\section{Discussion}

Computing procedure arguments in parallel is a natural way of expressing
parallelism in a functional language, and in our tests the performance of the
programme is close to that of the version with manual parallelism. The lower
performance is explained by the fact that we introduce more overhead by using
asynchronous kernels to compute procedure arguments where this does not give
large performance gains (even with ideal parallelism and no overhead). If we
removed these overheads, we would get the same time as the original programme
with manual parallelism. This is explained by the fact that the main loop of
the programme is written as an application of the \texttt{map} form, which our
interpreter makes parallel. Executing this loop in parallel gives the largest
performance gains compared to other parts of the programme. We expect the
difference between automatic and manual parallelism to be more visible in
larger and more complex programmes, and in the future we plan to benchmark
more algorithms with known parallel implementations.

\section{Conclusion}

Using procedure arguments to define parallel parts of a programme gives a new
perspective on writing parallel programmes. In imperative languages
programmers are used to rearranging loops to optimise memory access patterns
and to help the compiler vectorise the code; with the parallel-arguments
approach in functional languages they can rearrange the forms to help the
interpreter extract more parallelism. This parallelism is automatic and does
not affect the correctness of the programme (although access to and
modification of global variables must, of course, be serialised). With the
help of kernels these parallel computations are extended to distributed
computations. Kernels provide a standard way of expressing parallel and
distributed programme parts, automatic fault tolerance for master and worker
nodes, and automatic load balancing via the cluster scheduler. Together,
kernels and arguments-based parallelism provide low- and high-level
programming interfaces for clusters and single nodes that conveniently hide
the shortcomings of parallel and distributed computations, allowing the
programmer to focus on the actual problem being solved rather than on fixing
bugs in parallel and distributed code.

Future work is to re-implement the LISP language features that are relevant
for parallelism in the form of a C++ library and use this library for
parallelism, while implementing the actual computations in C++. This would
make it possible to improve the performance of purely functional programmes by
using the functional language for parallelism and the imperative language for
performance-critical code.

\subsubsection*{Acknowledgements.}
Research work is supported by the Council for grants of the President of the
Russian Federation (grant no.~MK-383.2020.9).

\bibliographystyle{splncs04}
\bibliography{main.bib}

\end{document}