main.tex (42931B)
1 \documentclass[runningheads]{llncs} 2 3 \usepackage{amsmath} 4 \usepackage{amssymb} 5 \usepackage{booktabs} 6 \usepackage{cite} 7 \usepackage{graphicx} 8 \usepackage{url} 9 \usepackage{listings} 10 \usepackage{tikz} 11 \usepackage{textcomp} 12 \usepackage{textcomp} 13 14 % https://github.com/stuhlmueller/scheme-listings/blob/master/lstlang0.sty 15 \lstdefinelanguage{scheme}{ 16 morekeywords=[1]{define, define-syntax, define-macro, lambda, define-stream, stream-lambda, 17 define*}, 18 morekeywords=[2]{begin, call-with-current-continuation, call/cc, 19 call-with-input-file, call-with-output-file, case, cond, 20 do, else, for-each, if, 21 let*, let, let-syntax, letrec, letrec-syntax, 22 let-values, let*-values, 23 and, or, not, delay, force, 24 quasiquote, quote, unquote, unquote-splicing, 25 syntax, syntax-rules, eval, environment, query, 26 car, cdr, cons}, 27 morekeywords=[3]{import, export}, 28 alsodigit=!\$\%&*+-./:<=>?@^_~, 29 sensitive=true, 30 morecomment=[l]{;}, 31 morecomment=[s]{\#|}{|\#}, 32 morestring=[b]", 33 basicstyle=\small\rmfamily, 34 keywordstyle={\bf\rmfamily\color[HTML]{4081ec}}, 35 commentstyle=\color[rgb]{0.33,0.33,0.33}, 36 stringstyle={\color[HTML]{00a000}}, 37 upquote=true, 38 breaklines=true, 39 breakatwhitespace=true, 40 literate=*{`}{{`}}{1}, 41 showstringspaces=false 42 } 43 44 \lstdefinelanguage{cpp}{ 45 morekeywords=[1]{class,struct,enum,public,private,protected,virtual,override,const, 46 void,int,new,delete}, 47 morecomment=[l]{//}, 48 basicstyle=\small\rmfamily, 49 keywordstyle={\bf\color[HTML]{4081ec}}, 50 commentstyle=\color[rgb]{0.33,0.33,0.33}, 51 stringstyle={\color[HTML]{00a000}}, 52 escapeinside={LATEX}{END}, 53 } 54 55 \begin{document} 56 57 \title{Functional programming interface for parallel and distributed computing% 58 \thanks{Supported by Council for grants of the President of the Russian Federation (grant no.~\mbox{MK-383.2020.9}).}} 59 \author{% 60 Ivan Petriakov\orcidID{0000-0001-5835-9313}\and\\ 61 Ivan Gankevich\textsuperscript{*}\orcidID{0000-0001-7067-6928} 62 } 63 64 \titlerunning{Functional programming interface} 65 \authorrunning{I.\,Petriakov et al.} 66 67 \institute{Saint Petersburg State University\\ 68 7-9 Universitetskaya Emb., St Petersburg 199034, Russia\\ 69 \email{st049350@student.spbu.ru},\\ 70 \email{i.gankevich@spbu.ru},\\ 71 \url{https://spbu.ru/}} 72 73 \maketitle 74 75 \begin{abstract} 76 77 There are many programming frameworks and languages for parallel and 78 distributed computing which are successful both in industry and academia, 79 however, all of them are isolated and self-contained. We believe that the main 80 reason that there is no common denominator between them is that there is no 81 intermediate representation for distributed computations. For sequential 82 computations we have bytecode that is used as an intermediate, portable and 83 universal representation of a programme written in any language, but bytecode 84 lacks an ability to describe process 85 communication. If we add this feature, we get low-level representation on top of 86 which all the frameworks and languages for parallel and distributed 87 computations can be built. In this paper we explore how such intermediate 88 representation can be made, how it can reduce programming effort and how it may 89 simplify internal structure of existing frameworks. We also demonstrate how 90 high-level interface can be build for a functional language that completely hides 91 all the difficulties that a programmer encounters when he or she works with 92 distributed systems. 93 94 \keywords{% 95 \and API 96 \and intermediate representation 97 \and C++ 98 \and Guile. 99 } 100 \end{abstract} 101 102 \section{Introduction} 103 104 There are many programming frameworks and languages for parallel and 105 distributed 106 computing~\cite{spark2016,fault-tolerant-distributed-haskell,wilde2011swift,fetterly2009dryadlinq,pinho2014oopp} 107 which are successful both in industry and academia, however, all of them are 108 isolated and self-contained. We believe that the main reason that there is no 109 common denominator between these frameworks and languages is that there is no 110 common protocol or low-level representation for distributed computations. For sequential 111 computations we have bytecode (e.g.~LLVM~\cite{llvm}, Java bytecode, Guile 112 bytecode) that is used as an intermediate, portable and universal 113 representation of a programme written in any language; also we have assembler 114 which is non-portable, but still popular intermediate representation. One 115 important feature, that bytecode and assembler lack, is an ability to 116 communicate between parallel processes. This communication is the common 117 denominator on top of which all the frameworks and languages for parallel and 118 distributed computations can be built, however, there is no universal low-level 119 representation that describes communication. 120 121 Why common low-level representation exists for sequential computations, but does not 122 exist for parallel and distributed ones? One of the reasons, which applies to 123 both distributed and parallel computations, is the fact that people still think 124 about programmes as sequences of steps~--- the same way as people themselves 125 perform complex tasks. Imperative languages, in which programmes are written 126 as series of steps, are still prevalent in industry and academia; this is in 127 contrast to unpopular functional languages in which programmes are written as 128 compositions of functions with no implied order of computation. Another reason 129 which applies to distributed computations is the fact that these computations 130 are inherently unreliable and there is no universal approach for handling 131 cluster node failures. While imperative languages produce more efficient 132 programmes, they do not provide safety from deadlocks and fault tolerance 133 guarantees. Also, they are much more difficult to write, as a human have to 134 work with mutable state (local and global variables, objects etc.) and it is 135 difficult to keep this state in mind while writing the code. Functional 136 languages minimise the usage of mutable state, provide partial safety from 137 deadlocks (under the assumption that a programmer does not use locks manually) 138 and can be modified to provide fault tolerance. From the authors' perspective 139 people understand the potential of functional languages, but have not yet 140 realised this potential to get all their advantages; people realised the full 141 potential of imperative languages, but do not know how to get rid of their 142 disadvantages. 143 144 In this paper we describe low-level representation based on \emph{kernels} 145 which is suitable for distributed and parallel computations. Kernels provide 146 automatic fault tolerance and can be used to exchange the data between 147 programmes written in different languages. We implement kernels in C++ and 148 build a reference cluster scheduler that uses kernels as the protocol to run 149 applications that span multiple cluster nodes. Then we use kernels as an 150 intermediate representation for Guile programming language, run benchmarks 151 using the scheduler and compare the performance of different implementations of 152 the same programme. 153 154 To prevent ambiguity we use the term \emph{parallel} to describe computations 155 that use several processor cores of single cluster node, the term 156 \emph{distributed} to describe computations that use several cluster nodes and 157 any number of cores on each node, and term \emph{cluster} to describe anything 158 that refers to local cluster (as opposed to geographically distributed clusters 159 which are not studied in this paper). \emph{Intermediate representation} in 160 our paper is a particular form of abstract syntax tree, e.g.~in functional 161 languages \emph{continuation passing style} is popular intermediate 162 representation of the code. 163 164 %TODO \cite{lang-virt} 165 %\cite{fetterly2009dryadlinq} 166 %\cite{wilde2011swift} 167 %\cite{pinho2014oopp} 168 169 170 \section{Methods} 171 172 \subsection{Parallel and distributed computing technologies as components of 173 unified system} 174 175 In order to write parallel and distributed programmes the same way as we write 176 sequential programmes, we need the following components. 177 \begin{itemize} 178 \item Portable low-level representation of 179 the code and the data and includes means of decomposition of the code and the data into 180 parts that can be computed in parallel. The closest sequential 181 counterpart is LLVM. 182 \item Cluster scheduler that executes 183 parallel and distributed applications and uses the low-level representation to implement 184 communication between these applications running on different cluster nodes. 185 The closest single-node counterpart is operating system kernel that executes 186 user processes. 187 \item High-level interface that wraps the low-level representation for existing 188 popular programming languages in a form of a framework or a library. This interface 189 uses cluster scheduler, if it is available and node parallelism is needed by 190 the application, otherwise the code is executed on the local node and parallelism 191 is limited to the parallelism of the node. The closest 192 single-node counterpart is C library that provides an interface to system 193 calls of the operating system kernel. 194 \end{itemize} 195 These three components are built on top of each other as in classical object-oriented 196 programming approach, and all the complexity is pushed down to the lowest layer: 197 low-level representation is responsible for providing parallelism and fault tolerance to 198 the applications, cluster scheduler uses these facilities to provide transparent execution of 199 the applications on multiple cluster nodes, and high-level interface maps 200 the underlying system to the target language to simplify the work for 201 application programmers. 202 203 High-performance computing technologies have the same three-component 204 structure: message passing library (MPI) is widely considered a low-level 205 language of parallel computing, batch job schedulers are used to allocate 206 resources and high-level interface is some library that is built on top of MPI; 207 however, the responsibilities of the components are not clearly separated and 208 the hierarchical structure is not maintained. MPI provides means of 209 communication between parallel processes, but does not provide data 210 decomposition facilities and fault tolerance guarantees: data decomposition is 211 done either in high-level language or manually, and fault tolerance is provided 212 by batch job scheduler. Batch jobs schedulers provide means to allocate 213 resources (cluster nodes, processor cores, memory etc.) and launch parallel MPI 214 processes, but do not have control over messages that are sent between these 215 processes and do not control the actual number of resources used by the 216 programme (all resources are owned exclusively by the programme, and the programme decides 217 how to use them), i.e.~cluster 218 schedulers and MPI programmes do not talk to each other after the parallel 219 processes were launched. Consequently, high-level interface is also separated 220 from the scheduler. Although, high-level interface is built on top of the 221 low-level interface, batch job scheduler is fully integrated with neither of 222 them: the cluster-wide counterpart of operating system kernel does not have 223 control over communication of the applications that are run on the cluster, but 224 is used as resource allocator instead. 225 226 The situation in newer big data technologies is different: there are the same 227 three components with hierarchical structure, but the low-level representation is 228 integrated in the scheduler. There is YARN cluster 229 scheduler~\cite{vavilapalli2013yarn} with API that is used as a low-level 230 representation for parallel and distributed computing, and there are many high-level 231 libraries that are built on top of YARN~\cite{hadoop,oozie,spark2016,storm}. 232 The scheduler has more control over job execution as jobs are decomposed into 233 tasks and execution of tasks is controlled by the scheduler. Unfortunately, the 234 lack of common low-level representation made all high-level frameworks that are built 235 on top of YARN API use their own custom protocol for communication, shift 236 responsibility of providing fault tolerance to the scheduler and shift 237 responsibility of data decomposition to higher level frameworks. 238 239 To summarise, the current state-of-the-art technologies for parallel and 240 distributed computing can be divided into three classes: low-level representations, 241 cluster schedulers and high-level interfaces; however, responsibilities of each 242 class are not clearly separated by the developers of these technologies. 243 Although, the structure of the components resembles the operating system kernel 244 and its application interface, the components sometimes are not built on top of 245 each other, but integrated horizontally, and as a result the complexity of the 246 parallel and distributed computations is sometimes visible on the highest 247 levels of abstraction. 248 249 Our proposal is to design a low-level representation that provides fault 250 tolerance, means of data and code decomposition and means of communication for 251 parallel and distributed applications. Having such a representation at your disposal 252 makes it easy to build higher level components, because the complexity of 253 cluster systems is hidden from the programmer, the duplicated effort of 254 implementing the same facilities in higher level interfaces is reduced, and 255 cluster scheduler has full control of the programme execution as it speaks the 256 same protocol and uses the same low-level representation internally: the representation is 257 general enough to describe any distributed programme including the scheduler 258 itself. 259 260 \subsection{Kernels as objects that control the programme flow} 261 \label{sec-kernels} 262 263 In order to create low-level representation for parallel and distributed computing we 264 borrow familiar features from sequential low-level representations and augment them 265 with asynchronous function calls and an ability to read and write call stack 266 frames. 267 268 In assembler and LLVM the programme is written in imperative style as a series 269 of processor instructions. The variables are stored either on the stack (a 270 special area of the computer's main memory) or in processor registers. Logical 271 parts of the programme are represented by functions. A call to a function 272 places all function arguments on the stack and then jumps to the address of the 273 function. When the function returns, the result of the computation is written 274 to the processor register and control flow is returned to the calling function. 275 When the main function returns, the programme terminates. 276 277 There are two problems with the assembler that need to be solved in order for 278 it to be useful in parallel and distributed computing. First, the contents of 279 the stack can not be copied between cluster nodes or saved to and read from the 280 file, because they often contain pointers to memory blocks that may be invalid 281 on another cluster node or in the process that reads the stack from the file. 282 Second, there is no natural way to express parallelism in this language: all 283 function calls are synchronous and all instructions are executed in the 284 specified order. In order to solve these problems we use object-oriented 285 techniques. 286 287 We represent each stack frame with an object: local variables become object 288 fields, and each function call is decomposed into the code that goes before 289 function call, the code that performs function call, and the code that goes 290 after. The code that goes before the call is placed into \texttt{act} method of 291 the object and after this code the new object is created to call the function 292 asynchronously. The code that goes after the call is placed into \texttt{react} 293 method of the object, and this code is called asynchronously when the function 294 call returns (this method takes the corresponding object as an argument). The 295 object also has \texttt{read} and \texttt{write} methods that are used to read 296 and write its fields to and from file or to copy the object to another cluster 297 node. In this model each object contains enough information to perform the 298 corresponding function call, and we can make these calls in any order we like. 299 Also, the object is self-contained, and we can ask another cluster node to 300 perform the function call or save the object to disk to perform the call when 301 the user wants to resume the computation (e.g.~after the computer is upgraded 302 and rebooted). 303 304 The function calls are made asynchronous with help of thread pool. Each thread 305 pool consists of an object queue and an array of threads. When the object is 306 placed in the queue, one of the threads extracts it and calls \texttt{act} or 307 \texttt{react} method depending on the state of the object. There are two 308 states that are controlled by the programmer: when the state is 309 \textit{upstream} \texttt{act} method is called, when the state is 310 \textit{downstream} \texttt{react} method of the parent object is called with 311 the current object as the argument. When the state is \textit{downstream} and 312 there is no parent, the programme terminates. 313 314 We call these objects \emph{control flow objects} or \emph{kernels} for short. 315 These objects contain the input data in object fields, the code that processes 316 this data in object methods and the output data (the result of the computation) 317 also in object fields. The programmer decides which data is input and output. 318 To reduce network usage the programmer may delete input data when the kernel 319 enters \textit{downstream} state: that way only output data is copied back to 320 the parent kernel over the network. The example programme written using kernels 321 and using regular function call stack is shown in table~\ref{tab-call-stack}. 322 323 \begin{table} 324 \centering 325 \begin{minipage}[t]{0.39\textwidth} 326 \begin{lstlisting}[language=cpp] 327 int nested(int a) { 328 return 123 + a; 329 } 330 \end{lstlisting} 331 \end{minipage} 332 \begin{minipage}[t]{0.59\textwidth} 333 \begin{lstlisting}[language=cpp] 334 struct Nested: public Kernel { 335 int result; 336 int a; 337 Nested(int a): a(a) {} 338 void act() override { 339 result = a + 123; 340 LATEX{\color[HTML]{ac4040}\bf{}async\_return}END(); 341 } 342 }; 343 \end{lstlisting} 344 \end{minipage} 345 \begin{minipage}[t]{0.39\textwidth} 346 \begin{lstlisting}[language=cpp] 347 void main() { 348 // code before 349 int result = nested(); 350 // code after 351 print(result); 352 } 353 \end{lstlisting} 354 \end{minipage} 355 \begin{minipage}[t]{0.59\textwidth} 356 \begin{lstlisting}[language=cpp] 357 struct Main: public Kernel { 358 void act() override { 359 // code before 360 LATEX{\color[HTML]{ac4040}\bf{}async\_call}END(new Nested); 361 } 362 void react(Kernel* child) override { 363 int result = ((Nested*)child)->result; 364 // code after 365 print(result); 366 LATEX{\color[HTML]{ac4040}\bf{}async\_return}END(); 367 } 368 }; 369 370 void main() { 371 LATEX{\color[HTML]{ac4040}\bf{}async\_call}END(new Main); 372 wait(); 373 } 374 \end{lstlisting} 375 \end{minipage} 376 \caption{The same programme written using regular function call stack 377 (left) and kernels (right). Here \texttt{async\_call} performs asynchronous 378 function call by pushing the child kernel to the queue, \texttt{async\_return} 379 performs asynchronous return from the function call by pushing the current 380 kernel to the queue.\label{tab-call-stack}} 381 \end{table} 382 383 This low-level representation can be seen as an adaptation of classic function call 384 stack, but with asynchronous function calls and an ability to read and write 385 stack frames. These differences give kernels the following advantages. 386 \begin{itemize} 387 \item Kernels define dependencies between function calls, but do not define 388 the order of the computation. This gives natural way of expressing parallel 389 computations on the lowest possible level. 390 \item Kernels can be written to and read from any medium: files, network 391 connections, serial connections etc. This allows to implement fault 392 tolerance efficiently using any existing methods: in order to implement 393 checkpoints 394 a programmer no longer need to save memory contents of each parallel 395 process, only the fields of the main kernel are needed to restart 396 the programme from the last sequential step. However, with kernels 397 checkpoints can be replaced with simple restart: when the node 398 to which the child kernel was sent fails, the copy of this kernel 399 can be sent to another node without stopping the programme and no 400 additional configuration from the programmer. 401 \item Finally, kernels are simple enough to be used as an intermediate 402 representation for high-level languages: either via a compiler 403 modification, or via wrapper library that calls the low-level implementation 404 directly. Kernels can not replace LLVM or assembler, because their level of 405 abstraction is higher, therefore, compiler modification is possible 406 only for languages that use high-level intermediate representation 407 (e.g.~LISP-like languages and purely functional languages that have 408 natural way of expressing parallelism by computing arguments of 409 functions in parallel). 410 \end{itemize} 411 412 \subsection{Reference cluster scheduler based on kernels} 413 414 Kernels are general enough to write any programme, and the first programme that 415 we wrote using them was cluster scheduler that uses kernels to implement its 416 internal logic and to run applications spanning multiple cluster nodes. 417 Single-node version of the scheduler is as simple as thread pool attached to 418 kernel queue described in section~\ref{sec-kernels}. The programme starts with 419 pushing the first (or \emph{main}) kernel to the queue and ends when the main 420 kernel changes its state to \textit{downstream} and pushes itself to the queue. 421 The number of threads in the pool equals the number of processor cores, but can 422 be set manually to limit the amount of parallelism. Cluster version of the 423 scheduler is more involved and uses kernels to implement its logic. 424 425 Cluster scheduler runs in a separate daemon process on each cluster node, and 426 processes communicate with each other using kernels: process on node \(A\) 427 writes some kernel to network connection with node \(B\) and process on node 428 \(B\) reads the kernel and performs useful operation with it. Here kernels are 429 used like messages rather than stack frames: kernel that always resides in node 430 \(A\) creates child message kernel and sends it to the kernel that always 431 resides in node \(B\). In order to implement this logic we added 432 \textit{point-to-point} state and a field that specifies the identifier of the 433 target kernel. In addition to that we added source and destination address 434 fields to be able to route the kernel to the target cluster node and return it 435 back to the source node: \((\text{parent-kernel},\text{source-address})\) tuple 436 uniquely identifies the location of the parent kernel, and 437 \((\text{target-kernel},\text{destination-address})\) tuple uniquely identifies 438 the location of the target kernel. The first tuple is also used by 439 \textit{downstream} kernels that return back to their parents, but the second 440 tuple is used only by \textit{point-to-point} kernels. 441 442 There are several responsibilities of cluster scheduler: 443 \begin{itemize} 444 \item run applications in child processes, 445 \item route kernels between application processes running on different cluster nodes, 446 \item maintain a list of available cluster nodes. 447 \end{itemize} 448 In order to implement them we created a kernel queue and a thread pool for each 449 concern that the scheduler has to deal with (see~figure~\ref{fig-local-routing}): we have 450 \begin{itemize} 451 \item timer queue for scheduled and periodic tasks, 452 \item network queue for sending kernels to and receiving from other cluster nodes, 453 \item process queue for creating child processes and sending kernels to and receiving 454 from them, and 455 \item the main processor queue for processing kernels in parallel using multiple processor cores. 456 \end{itemize} 457 This separation of concerns allows us to overlap data transfer and data processing: 458 while the main queue processes kernels in parallel, process and network queues 459 send and receive other kernels. This approach leads to small amount of oversubscription 460 as separate threads are used to send and receive kernels, but benchmarks showed that 461 this is not a big problem as most of the time these threads wait for the operating 462 system kernel to transfer the data. 463 464 \begin{figure} 465 \centering 466 \tikzset{Rect/.style={text width=1.30cm,draw,align=center,thick,rounded corners}} 467 \begin{tikzpicture}[x=1.75cm,y=-1.25cm] 468 \node[Rect] (parallel) at (0,0) {Processor queue\strut}; 469 \node[Rect] (timer) at (1,0) {Timer queue\strut}; 470 \node[Rect] (disk) at (2,0) {Disk queue\strut}; 471 \node[Rect] (nic) at (3,0) {Network queue\strut}; 472 \node[Rect] (process) at (4,0) {Process queue\strut}; 473 \node[Rect] (cpu0) at (0,-1) {CPU 0\strut}; 474 \node[Rect] (cpu1) at (0,1) {CPU 1\strut}; 475 \node[Rect] (disk0) at (2,-1) {Disk 0\strut}; 476 \node[Rect] (disk1) at (2,1) {Disk 1\strut}; 477 \node[Rect] (timer0) at (1,-1) {Timer 0\strut}; 478 \node[Rect] (nic0) at (3,-1) {NIC 0\strut}; 479 \node[Rect] (nic1) at (3,1) {NIC 1\strut}; 480 \node[Rect] (parent) at (4,-1) {Parent\strut}; 481 \node[Rect] (child) at (4,1) {Child\strut}; 482 \path[draw,thick] (parallel) -- (cpu0); 483 \path[draw,thick] (parallel) -- (cpu1); 484 \path[draw,thick] (timer) -- (timer0); 485 \path[draw,thick] (disk) -- (disk0); 486 \path[draw,thick] (disk) -- (disk1); 487 \path[draw,thick] (nic) -- (nic0); 488 \path[draw,thick] (nic) -- (nic1); 489 \path[draw,thick] (process) -- (parent); 490 \path[draw,thick] (process) -- (child); 491 \end{tikzpicture} 492 \caption{Default kernel queues for each cluster scheduler concern.\label{fig-local-routing}} 493 \end{figure} 494 495 Cluster scheduler runs applications in child processes; this approach is 496 natural for UNIX-like operating systems as the parent process has full control 497 of its children: the amount of resources can be limited (the number of 498 processor cores, the amount of memory etc.) and the process can be terminated 499 at any time. After introducing child processes into the scheduler we added 500 cluster-wide source (target) application identifier field that uniquely 501 identifies the source (the target) application from which the kernel originated 502 (to which the kernel was sent). Also each kernel carries an application 503 descriptor that specifies how to run the application (command line arguments, 504 environment variables etc.) and if the corresponding process is not running, it is 505 launched automatically by the scheduler. Child processes are needed only as 506 means of controlling resources usage: a process is a scheduling unit for 507 operating system kernel, but in cluster scheduler a child process performs 508 something useful only when the kernel (which is a unit of scheduling in our 509 scheduler) is sent to the corresponding application and launched automatically 510 if there is no such application. Application spans multiple cluster nodes and 511 may have any number of child processes (but no more than one process per node). 512 These processes are launched on-demand and do nothing until the kernel is 513 received. This behaviour allows us to implement dynamic parallelism: we do not 514 need to specify the number of parallel processes on application launch, the 515 scheduler will automatically create them. To reduce memory consumption stale 516 processes, that have not received any kernel for a long period of time, may be 517 terminated (new processes will be created automatically, when the kernel arrives 518 anyway). Kernels can be sent from one application to another by specifying 519 different application descriptor. 520 521 Child process communicates with its parent using optimised child process queue. 522 If the parent process does not want to communicate, the child process continues 523 execution on the local node: the applications written using cluster scheduler 524 interface work correctly even when the scheduler is not available, but use 525 local node instead of the cluster. 526 527 Since the node may have multiple child processes, we may have a situation 528 when all of them try to use all processor cores, which will lead to 529 oversubscription and suboptimal performance. In order to solve this problem, we 530 introduce weight field which tells how many threads will be used by the kernel. 531 The default is one thread for ordinary kernels and nought threads for cluster 532 scheduler kernels. Process queue tracks the total weight of the kernels that 533 were sent to child processes and queues incoming kernels if the weight reaches 534 the number of processor cores. Also, each cluster node reports this information to 535 other nodes for better load balancing decisions. 536 537 The scheduler acts as a router for the kernels: when the kernel is received 538 from the application, the scheduler analyses its fields and decides to which 539 cluster node it can be sent. If the kernel has \textit{downstream} or 540 \textit{point-to-point} state, the kernel is sent to the node where the target 541 kernel resides; if the kernel has \textit{upstream} state, load balancing 542 algorithm decides which node to send the kernel to. Load balancing algorithm 543 tracks the total weight of the kernels that were sent to the specified node and 544 also receives the same information from the node (in case other nodes also send 545 there their kernels), then it chooses the node with the lowest weight and sends 546 the kernel to this node. If all nodes are full, the kernel is retained in the 547 queue until the enough processor cores become available. The algorithm is very 548 conservative and does not use work-stealing for improved performance, however, 549 the fault tolerance is easier to 550 implement~\cite{gankevich2016factory,gankevich2017subord} when the target and 551 the source node fields do not change during kernel lifetime which is not the 552 case for work-stealing scenario. 553 554 The last but not the least responsibility of the scheduler is to discover and 555 maintain a list of cluster nodes and establish persistent network connections 556 to neighbours. Cluster scheduler does this automatically by scanning the 557 network using efficient algorithm: the nodes in the network are organised in 558 artificial tree topology with the specified fan-out value and each node tries to 559 communicate with the nodes which are closer to the root of the tree. This 560 approach significantly reduces the number of data that needs to be sent over 561 the network to find all cluster nodes: in ideal case only one kernel is sent to 562 and received from the parent node. The algorithm is described 563 in~\cite{gankevich2015subord}. After the connections are established, all the 564 \textit{upstream} kernels that are received from the applications' child 565 processes are routed to neighbour nodes in the tree topology (both parent 566 and child nodes). This creates a problem because the number of nodes ``behind'' 567 the parent node is generally different than the number of nodes behind the 568 child nodes. In order to solve this problem we track not only the total weight 569 of all kernels of the neighbour node, but the total weight of each node in 570 the cluster and sum the weight of all nodes that are behind the node \(A\) to 571 compute the total weight of node \(A\) for load balancing. Also, we 572 apply load balancing recursively: when the kernel arrives at the node, load 573 balancing algorithm is executed once more to decide whether the kernel can be 574 sent locally or to another cluster node (except the source node). This approach 575 solves the problem, and now applications can be launched not only on the root 576 node, but on any node without load balancing problems. This approach adds small 577 overhead, as the kernel goes through intermediate node, but if the overhead is 578 undesirable, the application can be launched on the root node. Node discovery 579 and node state updates are implemented using \textit{point-to-point} kernels. 580 581 To summarise, cluster scheduler uses kernels as unit of scheduling and as 582 communication protocol between its daemon processes running on different 583 cluster nodes. Daemon process acts as an intermediary between application 584 processes running on different cluster nodes, and all application kernels are 585 sent through this process to other cluster nodes. Kernels that are sent through 586 the scheduler are heavy-weight: they have more fields than local kernels and 587 the routing through the scheduler introduces multiple overheads compared to 588 direct communication. However, using cluster scheduler hugely simplifies 589 application development, as application developer does not need to worry about 590 networking, fault tolerance, load balancing and ``how many parallel processes 591 are enough for my application'': this is now handled by the scheduler. For 592 maximum efficiency and embedded applications the application can be linked 593 directly to the scheduler to be able to run in the same daemon process, that 594 way application kernels are no longer sent though daemon process and the 595 overhead of the scheduler is minimal. 596 597 \subsection{Parallel and distributed evaluation of Guile expressions using kernels} 598 599 Kernels low-level interface and cluster scheduler are written in C++ language. 600 From the authors' perspective C is too low-level and Java has too much overhead 601 for cluster computing, whereas C++ is the middleground choice. The 602 implementation is the direct mapping of the ideas discussed in previous 603 sections on C++ abstractions: kernel is a base class 604 (see~listing~\ref{lst-kernel-api}) for all control flow 605 objects with common fields (\texttt{parent}, \texttt{target} and all others) 606 and \texttt{act}, \texttt{react}, \texttt{read}, \texttt{write} virtual 607 functions that are overridden in subclasses. This direct mapping is natural for 608 a mixed-paradigm language like C++, but functional languages may benefit from 609 implementing the same ideas in the compiler or interpreter. 610 611 \begin{lstlisting}[language=cpp,% 612 caption={Public interface of the kernel and the queue classes in C++ (simplified for clarity).},% 613 captionpos=b, 614 label={lst-kernel-api}] 615 enum class states {upstream, downstream, point_to_point}; 616 617 class kernel { 618 public: 619 virtual void act(); 620 virtual void react(kernel* child); 621 virtual void write(buffer& out) const; 622 virtual void read(buffer& in); 623 kernel* parent = nullptr; 624 kernel* target = nullptr; 625 states state = states::upstream; 626 }; 627 628 class queue { 629 public: 630 void push(kernel* k); 631 }; 632 \end{lstlisting} 633 634 We made a reference implementation of kernels for Guile 635 language~\cite{galassi2002guile}. Guile is a dialect of 636 Scheme~\cite{sussman1998} which in turn is a dialect of 637 LISP~\cite{mccarthy1960}. The distinct feature of LISP-like languages is 638 homoiconicity, i.e.~the code and the data is represented by tree-like 639 structure (lists that contain atoms or other lists as elements). This feature makes 640 it possible to express parallelism directly in the language: every list element 641 can be computed independently and it can be sent to other cluster nodes for 642 parallel computation. To implement parallelism we created a Guile interpreter 643 that evaluates every list element in parallel using kernels. In practice this 644 means that every argument of a procedure call (a procedure call is also a list 645 with the first element being the procedure name) is computed in parallel. This 646 interpreter is able to run any existing Guile programme (provided that it does 647 not use threads, locks and semaphores explicitly) and the output will be the 648 same as with the original interpreter, the programme will automatically use 649 cluster nodes for parallel computations, and fault tolerance will be 650 automatically provided by our cluster scheduler. From the authors' perspective 651 this approach is the most transparent and safe way of writing parallel and 652 distributed programmes with clear separation of concerns: the programmer takes 653 care of the application logic, and cluster scheduler takes care of the 654 parallelism, load balancing and fault tolerance. 655 656 Our interpreter consists of standard \textit{read-eval-print} loop out of which 657 only \textit{eval} step uses kernels for parallel and distributed computations. 658 Inside \textit{eval} we use hybrid approach for parallelism: we use kernels to 659 evaluate arguments of procedure calls and arguments of \texttt{cons} primitive 660 asynchronously only if these arguments contain other procedure calls. This 661 means that all simple arguments (variables, symbols, other primitives etc.) are 662 computed sequentially without creating child kernels. 663 664 Evaluating procedure calls and \texttt{cons} using kernels is enough to make 665 \texttt{map} form parallel, but we had to rewrite \texttt{fold} form to make it 666 parallel. Our parallelism is based on the fact that procedure arguments can be 667 evaluated in parallel without affecting the correctness of the procedure, 668 however, evaluating arguments in parallel in \texttt{fold} does not give 669 speedup because of the nested \texttt{fold} and \texttt{proc} calls: the next 670 recursive call to \texttt{fold} waits until the call to \texttt{proc} 671 completes. Alternative procedure \texttt{fold-pairwise} does not have this 672 deficiency, but is only correct for \texttt{proc} that does not care about the 673 order of the arguments (\texttt{+}, \texttt{*} operators etc.). In this 674 procedure we apply \texttt{proc} to successive pairs of elements from the 675 initial list, after that we recursively call \texttt{fold-pairwise} for the 676 resulting list. The iteration is continued until only one element is left in 677 the list, then we return this element as the result of the procedure call. This 678 new procedure is also iterative, but parallel inside each iteration. We choose 679 \texttt{map} and \texttt{fold} forms to illustrate automatic parallelism 680 because many other forms are based on them~\cite{hutton-fold}. Our 681 implementation is shown in listing~\ref{lst-parallel-forms}. 682 683 \begin{lstlisting}[language=Scheme,% 684 caption={Parallel \texttt{map} and \texttt{fold} forms in Guile.}, 685 captionpos=b, 686 label={lst-parallel-forms}] 687 (define (map proc lst) "Parallel map." 688 (if (null? lst) lst 689 (cons (proc (car lst)) (map proc (cdr lst))))) 690 (define (fold proc init lst) "Sequential fold." 691 (if (null? lst) init 692 (fold proc (proc (car lst) init) (cdr lst)))) 693 (define (do-fold-pairwise proc lst) 694 (if (null? lst) '() 695 (if (null? (cdr lst)) lst 696 (do-fold-pairwise proc 697 (cons (proc (car lst) (car (cdr lst))) 698 (do-fold-pairwise proc (cdr (cdr lst)))))))) 699 (define (fold-pairwise proc lst) "Parallel pairwise fold." 700 (car (do-fold-pairwise proc lst))) 701 \end{lstlisting} 702 703 \section{Results} 704 705 We tested performance of our interpreter using the forms in 706 listing~\ref{lst-parallel-forms}. For each form we applied synthetic procedure 707 that sleeps 200 milliseconds to the list with 96 elements. Then we ran the 708 resulting script using native Guile interpreter and our interpreter and 709 measured total running time for different number of threads. For native Guile 710 interpreter the running time of all forms is the same for any number of 711 threads. For our interpreter \texttt{map} and \texttt{fold-pairwise} forms run 712 time decreases with the number of threads and for \texttt{fold} form run time 713 stays the same (figure~\ref{fig-results}). 714 715 \begin{figure} 716 \centering 717 \includegraphics{build/gnuplot/results.eps} 718 \caption{The run time of the forms from listing~\ref{lst-parallel-forms} 719 for different number of parallel threads and different interpreters.\label{fig-results}} 720 \end{figure} 721 722 723 724 725 %In order to test performance of our interpreter we used a programme that 726 %processes frequency-directional spectra of ocean waves from NDBC 727 %dataset~\cite{ndbc-web-data-guide,ndbc-techreport}. Each spectrum consists of 728 %five variables, each of which is stored in a separate file in a form of time 729 %series. First, we find five files that correspond to the same station where 730 %the data was collected and the same year. Then we merge the corresponding 731 %records from these files into single vector-valued time series. Incomplete 732 %groups of files and incomplete records are removed. After that we write the 733 %resulting groups to disk. We wrote this programme in C++ with kernels, 734 %in Guile with kernels and in Guile without kernels. 735 736 \section{Discussion} 737 738 Computing procedure arguments in parallel is a natural way of expressing 739 parallelism in functional language, and in our tests the performance of the 740 programme is close to the one with manual parallelism. Lower performance is 741 explained by the fact that we introduce more overhead by using asynchronous 742 kernels to compute procedure arguments where this does not give large 743 performance gains (even with ideal parallelism with no overhead). If we remove 744 these overheads we will get the same time as the original programme with manual 745 parallelism. This is explained by the fact that the main loop of the programme 746 is written as an application of \texttt{map} form and our interpreter makes it 747 parallel. Executing this loop in parallel gives the largest performance gains 748 compared to other parts of the programme. We expect that the difference between 749 automatic and manual parallelism to be more visible in larger and more complex 750 programmes, and in future plan to benchmark more algorithms with known parallel 751 implementations. 752 753 754 \section{Conclusion} 755 756 Using procedure arguments to define parallel programme parts gives new 757 perspective on writing parallel programmes. In imperative languages programmers 758 are used to rearranging loops to optimise memory access patterns and help the 759 compiler vectorise the code, but with parallel-arguments approach in functional 760 languages they can rearrange the forms to help the interpreter to extract more 761 parallelism. This parallelism is automatic and does not affect the correctness 762 of the programme (of course, you need to serialise access and modification of 763 the global variables). With help of kernels these parallel computations are 764 extended to distributed computations. Kernels provide standard way of 765 expressing parallel and distributed programme parts, automatic fault tolerance 766 for master and worker nodes and automatic load balancing via cluster scheduler. 767 Together kernels and arguments-based parallelism provide low- and high-level 768 programming interface for clusters and single nodes that conveniently hide 769 the shortcomings of parallel and distributed computations allowing the 770 programmer to focus on the actual problem being solved rather than fixing 771 bugs in his or her parallel and distributed code. 772 773 Future work is to re-implement LISP language features that are relevant for 774 parallelism in a form of C++ library and use this library for parallelism, but 775 implement the actual computations in C++. This would allow to improve 776 performance of purely functional programmes by using the functional language 777 for parallelism and imperative language for performance-critical code. 778 779 \subsubsection*{Acknowledgements.} 780 Research work is supported by Council for grants of the President of the 781 Russian Federation (grant no.~MK-383.2020.9). 782 783 \bibliographystyle{splncs04} 784 \bibliography{main.bib} 785 786 \end{document}