10. Parallelism using Actors

In this section, we compare the Java and Scala ways of solving the same problem, using a guiding example from concurrent/parallel computing. The example appeared in High Performance Java Platform Computing by Thomas W. Christopher and George K. Thiruvathukal [HPJPC]. We start from a previously worked-out Java solution that uses explicit concurrency mechanisms and show how it can be reworked into a side-effect-free Scala version that takes advantage of Scala’s support for basic actor-style parallelism.

10.1. Guiding Example: Longest Common Subsequence

A longest common subsequence (LCS) of two strings is a longest sequence of characters that occurs in order in the two strings. It differs from the longest common substring in that the characters in the longest common subsequence need not be contiguous. There may, of course, be more than one LCS, since there may be several subsequences with the same length.
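
To make the distinction between a subsequence and a substring concrete, here is a minimal Java sketch. The class and method names (SubsequenceDemo, isSubsequence) are illustrative and do not come from the original example.

```java
// Illustrative helper, not part of the original example.
class SubsequenceDemo {
    // Returns true if the characters of `sub` occur in `s` in the same order,
    // though not necessarily next to each other.
    static boolean isSubsequence(String sub, String s) {
        int k = 0; // index of the next character of `sub` still to be matched
        for (int i = 0; i < s.length() && k < sub.length(); i++) {
            if (s.charAt(i) == sub.charAt(k)) {
                k++;
            }
        }
        return k == sub.length();
    }

    public static void main(String[] args) {
        // "ace" is a subsequence of "abcde" but not a substring of it.
        System.out.println(isSubsequence("ace", "abcde")); // prints true
        System.out.println("abcde".contains("ace"));       // prints false
    }
}
```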

There is a folk algorithm to find the length of the LCS of two strings. The algorithm uses a form of dynamic programming. In divide-and-conquer algorithms, recall that the overall problem is broken into parts, the parts are solved individually, and the solutions are assembled into a solution to the overall problem. Dynamic programming is similar, except that the best way to divide the overall problem into parts is not known before the subproblems are solved, so dynamic programming solves all subproblems and then finds the best way to assemble them.

The algorithm works as follows: Let the two strings be c0 and c1. Create a two-dimensional array a:

int[][] a = new int[c0.length()+1][c1.length()+1];

Initialize a[i][0] to 0 for all i and a[0][j] to 0 for all j, since an empty substring has no characters in common with anything:

for (int i=0; i <= c0.length(); i++)
   a[i][0] = 0;

for (int j=0; j <= c1.length(); j++)
   a[0][j] = 0;

We then fill in the remaining elements so that a[i][j] is the length of the LCS of c0.substring(0,i) and c1.substring(0,j). Recall that s.substring(m,n) is the substring of s from position m up to, but not including, position n:

for (int i=1; i <= c0.length(); i++)
   for (int j=1; j <= c1.length(); j++)
      if (c0.charAt(i-1) == c1.charAt(j-1))
         a[i][j]=a[i-1][j-1]+1;
      else
         a[i][j]=Math.max(a[i][j-1],a[i-1][j]);

The code above is a traditional imperative solution: it fills in a matrix whose last element, a[c0.length()][c1.length()], is the length of the LCS of the two strings.

So how exactly does this method work?

Element a[i-1][j-1] holds the length of the LCS of c0.substring(0,i-1) and c1.substring(0,j-1). If the characters c0.charAt(i-1) and c1.charAt(j-1) are equal, that LCS can be extended by one, to length a[i-1][j-1]+1. If the characters don’t match, we ignore the last character of one string or the other. The length is then the larger of a[i][j-1] and a[i-1][j], the LCS lengths obtained by dropping the last character of c1.substring(0,j) or of c0.substring(0,i), respectively.
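
The fragments above can be assembled into a small runnable program. The following is a minimal sketch; the class and method names (LcsTable, fill, length) are illustrative, and the sample strings are a standard textbook pair rather than data from this chapter.

```java
// Self-contained version of the table-filling algorithm described above.
// LcsTable, fill, and length are illustrative names only.
class LcsTable {
    // Returns the full table; a[i][j] is the LCS length of
    // c0.substring(0, i) and c1.substring(0, j).
    static int[][] fill(String c0, String c1) {
        // Java zero-initializes int arrays, so row 0 and column 0 are already 0.
        int[][] a = new int[c0.length() + 1][c1.length() + 1];
        for (int i = 1; i <= c0.length(); i++) {
            for (int j = 1; j <= c1.length(); j++) {
                if (c0.charAt(i - 1) == c1.charAt(j - 1)) {
                    a[i][j] = a[i - 1][j - 1] + 1;                // extend the diagonal LCS
                } else {
                    a[i][j] = Math.max(a[i][j - 1], a[i - 1][j]); // drop one last character
                }
            }
        }
        return a;
    }

    static int length(String c0, String c1) {
        return fill(c0, c1)[c0.length()][c1.length()];
    }

    public static void main(String[] args) {
        System.out.println(length("ABCBDAB", "BDCABA")); // prints 4
    }
}
```

The explicit initialization loops from the text are omitted here only because Java already sets new int arrays to zero.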

The chore graph from [HPJPC] for calculation of the LCS is shown in the following figure.

[Figure: Systolic Array]

Any order of calculation that is consistent with the dependencies is permissible. Two are fairly obvious: (1) by rows, top to bottom, and (2) by columns, left to right.

Another possibility is along diagonals. All a[i][j] with i+j==m can be calculated at the same time, for m stepping from 2 to c0.length()+c1.length(). Visualizing waves of computation passing across arrays is a good technique for designing parallel array algorithms. It has been researched under the names systolic arrays and wavefront arrays [Wavefront].
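
The diagonal order can be sketched in Java as follows. This is an illustration under stated assumptions, not the implementation from [HPJPC]: the class name LcsWavefront is made up, and a parallel IntStream is used only as a convenient way to say that all cells on one diagonal may be computed at the same time.

```java
import java.util.stream.IntStream;

// Illustrative sketch of the wavefront (anti-diagonal) order of calculation.
class LcsWavefront {
    static int length(String c0, String c1) {
        int n0 = c0.length();
        int n1 = c1.length();
        int[][] a = new int[n0 + 1][n1 + 1]; // row 0 and column 0 stay 0

        // m is the diagonal index: every cell on it satisfies i + j == m.
        for (int m = 2; m <= n0 + n1; m++) {
            final int d = m;
            int iLow = Math.max(1, d - n1);  // keeps j = d - i at most n1
            int iHigh = Math.min(n0, d - 1); // keeps j = d - i at least 1
            // Cells on one diagonal depend only on the two previous diagonals,
            // so they are independent of each other.
            IntStream.rangeClosed(iLow, iHigh).parallel().forEach(i -> {
                int j = d - i;
                if (c0.charAt(i - 1) == c1.charAt(j - 1)) {
                    a[i][j] = a[i - 1][j - 1] + 1;
                } else {
                    a[i][j] = Math.max(a[i][j - 1], a[i - 1][j]);
                }
            });
        }
        return a[n0][n1];
    }

    public static void main(String[] args) {
        System.out.println(length("ABCBDAB", "BDCABA")); // prints 4
    }
}
```

For short strings the per-diagonal scheduling overhead is likely to outweigh any gain; the point of the sketch is the order of calculation, not performance.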

The following figure shows how a wavefront computation progresses.

[Figure: Wavefront Illustration]
[Wavefront] H. T. Kung, C. E. Leiserson: Algorithms for VLSI processor arrays; in: C. Mead, L. Conway (eds.): Introduction to VLSI Systems; Addison-Wesley, 1979.
[HPJPC] Thomas W. Christopher and George K. Thiruvathukal, High Performance Java Platform Computing, Prentice Hall PTR and Sun Microsystems Press, 2000.

10.2. Java Threads Implementation

Bitbucket ZIP File
https://bitbucket.org/loyolachicagocs_books/hpjpc-source-java/get/default.zip
Bitbucket via Mercurial
hg clone https://bitbucket.org/loyolachicagocs_books/hpjpc-source-java
Build and Run Instructions
https://bitbucket.org/loyolachicagocs_books/hpjpc-source-java

Our Java implementation (see LCS.java) of the LCS algorithm divides the array into vertical bands. Each band is filled in row by row from top to bottom. Each band (except the leftmost) must wait for the band to its left to fill in the last element of a row before it can start filling in that row. This is an instance of the producer-consumer relationship.

The following figure shows how our Java solution organizes the work in bands:

[Figure: Systolic Array/Wavefront Illustration]

LCS class

   int numThreads;
   char[] c0;
   char[] c1;
   int[][] a;
   Accumulator done;
   public LCS(char[] c0, char[] c1, int numThreads) {
      this.numThreads = numThreads;
      this.c0 = c0;
      this.c1 = c1;
      int i;
      done = new Accumulator(numThreads);

      a = new int[c0.length + 1][c1.length + 1];

      Semaphore left = new Semaphore(c0.length), right;
      for (i = 0; i < numThreads; i++) {
         right = new Semaphore();
         new Band(startOfBand(i, numThreads, c1.length), startOfBand(i + 1,
               numThreads, c1.length) - 1, left, right).start();
         left = right;
      }
   }
   public LCS(String s0, String s1, int numThreads) {
      this(s0.toCharArray(), s1.toCharArray(), numThreads);
   }
   int startOfBand(int i, int nb, int N) {
      return 1 + i * (N / nb) + Math.min(i, N % nb);
   }
   public int getLength() {
      try {
         done.getFuture().getValue();
      } catch (InterruptedException ex) {
         Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
      }
      return a[c0.length][c1.length];
   }
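
The arithmetic in startOfBand is easier to follow with concrete numbers. The sketch below copies the formula from the listing above into a standalone class (BandBounds is an illustrative name) and prints the column range of each band for N = 10 columns and nb = 3 bands. The first N % nb bands each receive one extra column, so the band widths differ by at most one.

```java
// Standalone copy of the startOfBand formula, for illustration only.
class BandBounds {
    // First column (1-based) of band i when N columns are split into nb bands.
    static int startOfBand(int i, int nb, int N) {
        return 1 + i * (N / nb) + Math.min(i, N % nb);
    }

    public static void main(String[] args) {
        int N = 10;
        int nb = 3;
        for (int i = 0; i < nb; i++) {
            int low = startOfBand(i, nb, N);
            int high = startOfBand(i + 1, nb, N) - 1; // one before the next band starts
            System.out.println("band " + i + ": columns " + low + ".." + high);
        }
        // prints:
        // band 0: columns 1..4
        // band 1: columns 5..7
        // band 2: columns 8..10
    }
}
```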

Band internal class (does the work)

      int low;
      int high;
      Semaphore left, right;
      Band(int low, int high, Semaphore left, Semaphore right) {
         this.low = low;
         this.high = high;
         this.left = left;
         this.right = right;
      }

Actual Runnable body

      public void run() {
         try {
            int i, j;
            for (i = 1; i < a.length; i++) {
               left.down();
               for (j = low; j <= high; j++) {
                  if (c0[i - 1] == c1[j - 1])
                     a[i][j] = a[i - 1][j - 1] + 1;
                  else
                     a[i][j] = Math.max(a[i - 1][j], a[i][j - 1]);
               }
               right.up();
            }
            done.signal();
         } catch (InterruptedException ex) {
         }
      }
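
The Semaphore and Accumulator classes used in these fragments are not the ones in java.util.concurrent (they expose down()/up() and signal()/getFuture()). For readers who want to experiment without that library, here is a self-contained sketch of the same banded scheme. It is an adaptation, not the book's code: java.util.concurrent.Semaphore with acquire/release stands in for down/up, Thread.join replaces the Accumulator, and the class name LcsBands is made up.

```java
import java.util.concurrent.Semaphore;

// Banded LCS with one thread per vertical band; an adaptation of the scheme
// above that uses only standard-library classes.
class LcsBands {
    static int startOfBand(int i, int nb, int N) {
        return 1 + i * (N / nb) + Math.min(i, N % nb);
    }

    static int length(String s0, String s1, int numThreads) {
        char[] c0 = s0.toCharArray();
        char[] c1 = s1.toCharArray();
        int[][] a = new int[c0.length + 1][c1.length + 1];
        Thread[] workers = new Thread[numThreads];

        // The leftmost band never has to wait: give it one permit per row.
        Semaphore left = new Semaphore(c0.length);
        for (int b = 0; b < numThreads; b++) {
            Semaphore right = new Semaphore(0);
            int low = startOfBand(b, numThreads, c1.length);
            int high = startOfBand(b + 1, numThreads, c1.length) - 1;
            Semaphore in = left;   // signalled by the band to the left
            Semaphore out = right; // signals the band to the right
            workers[b] = new Thread(() -> {
                for (int i = 1; i < a.length; i++) {
                    in.acquireUninterruptibly(); // wait until row i is ready on the left
                    for (int j = low; j <= high; j++) {
                        if (c0[i - 1] == c1[j - 1]) {
                            a[i][j] = a[i - 1][j - 1] + 1;
                        } else {
                            a[i][j] = Math.max(a[i - 1][j], a[i][j - 1]);
                        }
                    }
                    out.release(); // row i of this band is complete
                }
            });
            workers[b].start();
            left = right;
        }
        try {
            for (Thread t : workers) {
                t.join(); // wait for every band to finish
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for bands", ex);
        }
        return a[c0.length][c1.length];
    }

    public static void main(String[] args) {
        System.out.println(length("ABCBDAB", "BDCABA", 3)); // prints 4
    }
}
```

Each release/acquire pair is one producer-consumer handoff: band b produces the last element of row i, and band b + 1 consumes it before starting its own part of that row.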

Main

      public static void main(String[] args) {
         if (args.length < 2) {
            System.out.println("Usage: java LCS$Test1 string0 string1");
            System.exit(0);
         }
         int nt = 3;
         String s0 = args[0];
         String s1 = args[1];
         System.out.println(s0);
         System.out.println(s1);
         long t0 = System.currentTimeMillis();
         LCS w = new LCS(s0, s1, nt);
         int length = w.getLength(); // blocks until every band has finished
         long t1 = System.currentTimeMillis() - t0;
         System.out.println(length);
         System.out.println("Elapsed time " + t1 + " milliseconds");
      }

10.3. Scala Actors Implementation

Bitbucket ZIP File
https://bitbucket.org/loyolachicagocs_plsystems/lcs-systolicarray-scala/get/default.zip
Bitbucket via Mercurial
hg clone https://bitbucket.org/loyolachicagocs_plsystems/lcs-systolicarray-scala
Build and Run Instructions
https://bitbucket.org/loyolachicagocs_plsystems/lcs-systolicarray-scala

Trait

trait SystolicArray[T] {
  def start(): Unit
  def put(v: T): Unit
  def take(): T
  def stop(): Unit
}

The pieces of the SystolicArray implementation are shown below.

Logging

  private object logger {
    private val DEBUG = false
    // use call-by-name to ensure the argument is evaluated on demand only
    def debug(msg: => String) { if (DEBUG) println("debug: " + msg) }
    // add other log levels as needed
  }

Apply

  def apply[T](rows: Int, cols: Int, f: Acc[T]): SystolicArray[T] = {
    require { 0 < rows }
    require { 0 < cols }
    val result = new SyncVar[T]
    lazy val a: LazyArray[T] = Stream.tabulate(rows, cols) {
      (i, j) => new Cell(i, j, rows, cols, a, f, result)
    }
    val root = a(0)(0)
    new SystolicArray[T] {
      override def start() = root.start()
      override def put(v: T) { root ! ((-1, -1) -> v) }
      override def take() = result.take()
      override def stop() { root ! Stop }
    }
  }

The internal Cell class represents a single cell of the systolic array and is generic in the type T of the values it passes on.

  protected class Cell[T](row: Int, col: Int, rows: Int, cols: Int, a: => LazyArray[T],
      f: Acc[T], result: SyncVar[T]) extends Actor { self =>

    require { 0 <= row && row < rows }
    require { 0 <= col && col < cols }

    logger.debug("creating (" + row + ", " + col + ")")

    override def act() {
      logger.debug("starting (" + row + ", " + col + ")")
      var start = true
      loop {
        logger.debug("waiting  (" + row + ", " + col + ")")
        barrier(if (row == 0 || col == 0) 1 else 3) { ms =>
          if (start) { startNeighbors() ; start = false }
          propagate(ms)
        }
        // one-way message: anything below here is skipped!
      }
    }

    protected def barrier(n: Int)(f: Map[Pos, T] => Unit): Unit =
      barrier1(n)(f)(Map.empty)

    protected def barrier1(n: Int)(f: Map[Pos, T] => Unit)(ms: Map[Pos, T]): Unit = {
      if (n <= 0)
        f(ms)
      else
        react {
          case Stop => stopNeighbors() ; exit()
          case (p: Pos, v: T) => barrier1(n - 1)(f)(ms + (p -> v))
        }
      // one-way message: anything after react is skipped!
    }

    protected def applyToNeighbors(f: Cell[T] => Unit) {
      if (row < rows - 1)                   f(a(row + 1)(col    ))
      if (col < cols - 1)                   f(a(row    )(col + 1))
      if (row < rows - 1 && col < cols - 1) f(a(row + 1)(col + 1))
    }

    protected def startNeighbors() { applyToNeighbors { _.start() } }

    protected def propagate(ms: Map[Pos, T]) {
      val r = f((row, col), ms)
      val m = (row, col) -> r
      logger.debug("firing   " + m)
      if (row < rows - 1)                     a(row + 1)(col    ) ! m
      if (col < cols - 1)                     a(row    )(col + 1) ! m
      if (row < rows - 1 && col < cols - 1)   a(row + 1)(col + 1) ! m
      if (row >= rows - 1 && col >= cols - 1) result.put(r)
    }

    protected def stopNeighbors() { applyToNeighbors { _ ! Stop } }
  }
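
For comparison with the Java side of this chapter, the dataflow that Cell expresses with barrier and propagate (wait for the neighbors' values, compute, pass the result on) has a rough Java analogue in CompletableFuture. The sketch below is only an analogy, not a translation of the actor code: the class name LcsDataflow is made up, edge cells are completed with 0 up front instead of waiting for one message, and the LCS function is hard-wired rather than passed in as f.

```java
import java.util.concurrent.CompletableFuture;

// Rough Java analogue of the cell dataflow: each interior cell fires once
// its north, west, and northwest neighbors have produced their values.
class LcsDataflow {
    static int length(String c0, String c1) {
        int rows = c0.length() + 1;
        int cols = c1.length() + 1;
        @SuppressWarnings("unchecked")
        CompletableFuture<Integer>[][] cell = new CompletableFuture[rows][cols];

        for (int i = 0; i < rows; i++) {
            for (int j = 0; j < cols; j++) {
                if (i == 0 || j == 0) {
                    // Top and left edges are known immediately.
                    cell[i][j] = CompletableFuture.completedFuture(0);
                } else {
                    CompletableFuture<Integer> north = cell[i - 1][j];
                    CompletableFuture<Integer> west = cell[i][j - 1];
                    CompletableFuture<Integer> northwest = cell[i - 1][j - 1];
                    boolean match = c0.charAt(i - 1) == c1.charAt(j - 1);
                    // allOf plays the role of the three-message barrier;
                    // join() below cannot block because all three are done.
                    cell[i][j] = CompletableFuture.allOf(north, west, northwest)
                            .thenApplyAsync(ignored -> match
                                    ? northwest.join() + 1
                                    : Math.max(north.join(), west.join()));
                }
            }
        }
        // The bottom-right cell holds the overall result.
        return cell[rows - 1][cols - 1].join();
    }

    public static void main(String[] args) {
        System.out.println(length("ABCBDAB", "BDCABA")); // prints 4
    }
}
```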

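The Helper class below wires up the direction a message was fired from: north, west, and northwest look up the value received from the corresponding neighbor of the current position and fall back to a default value when no such message exists. It is an example of how Scala can help us avoid mistakes; in scientific computations, subscript errors are common.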

  implicit class Helper[T](ms: Map[Pos, T]) {
    def north    (implicit current: (Pos, T)): T = ms.get((current._1._1 - 1, current._1._2    )).getOrElse(current._2)
    def west     (implicit current: (Pos, T)): T = ms.get((current._1._1    , current._1._2 - 1)).getOrElse(current._2)
    def northwest(implicit current: (Pos, T)): T = ms.get((current._1._1 - 1, current._1._2 - 1)).getOrElse(current._2)
  }

The PosHelper class below names the row and column subscripts (north, west) and detects the left and top edges of the array (isOnEdge). Although easy enough to check, it is hard to remember which subscript is the row and which is the column, and Scala again makes this easy for us. As we’ll see, it also helps make the user function self-documenting (literate).

  implicit class PosHelper(p: Pos) {
    def north = p._1 - 1
    def west = p._2 - 1
    def isOnEdge = p._1 == 0 || p._2 == 0
  }

Wrapping it up with object lcs...

object lcs {
  import SystolicArray._

  def f(c0: String, c1: String)(p: Pos, ms: Map[Pos, Int]) = {
    implicit val currentPosAndDefaultValue = (p, 0)
    if (p.isOnEdge)
      0
    else if (c0(p.north) == c1(p.west))
      ms.northwest + 1
    else
      math.max(ms.west, ms.north)
  }

  def apply(c0: String, c1: String): Int = {
    val root = SystolicArray(c0.length + 1, c1.length + 1, f(c0, c1))
    root.start()
    root.put(1)
    root.take
  }
}

Setting up the test fixtures...

object Fixtures {
  import SystolicArray._

  val c0 = "Now is the time for all great women to come to the aid of their country"

  val c1 = "Now all great women will come to the aid of their country"

  val f1 = (p: Pos, ms: Map[Pos, Int]) => ms.values.sum

  val f2 = (p: Pos, ms: Map[Pos, Int]) => {
    implicit val currentPosAndDefaultValue = (p, 0)
    ms.north + ms.northwest + ms.west
  }

  val f3 = lcs.f(c0, c1) _

  // "bare-metal" version of lcs, does not run significantly faster
  val f4 = (p: Pos, ms: Map[Pos, Int]) => {
    if (p._1 == 0 || p._2 == 0)
      0
    else if (c0(p._1 - 1) == c1(p._2 - 1))
      ms.get((p._1 - 1, p._2 - 1)).getOrElse(0) + 1
    else
      math.max(
        ms.get((p._1 - 1, p._2)).getOrElse(0),
        ms.get((p._1, p._2 - 1)).getOrElse(0))
  }
}

Testing...

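// JUnit 4 is assumed here, based on the @Test annotation and assertEquals
import org.junit.Test
import org.junit.Assert.assertEquals
import Fixtures._

class Tests {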

  @Test def testSum() {
    val root = SystolicArray(3, 3, f1)
    root.start()
    root.put(1)
    assertEquals(13, root.take())
  }

  @Test def testSample() {
    assertEquals(53, lcs(c0, c1))
  }
}
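
The expected value 13 in testSum can be checked with a short sequential calculation. The sketch below is written in Java to match the earlier listings and rests on a reading of the Cell code above: a cell in row 0 or column 0 fires after a single message, every other cell fires after three, f1 sums the values received, and the root is seeded with 1. The class name SumArrayCheck is illustrative.

```java
// Sequential check of the value that the bottom-right cell produces under f1.
class SumArrayCheck {
    static int lastCell(int rows, int cols) {
        int[][] v = new int[rows][cols];
        for (int i = 0; i < rows; i++) {
            for (int j = 0; j < cols; j++) {
                if (i == 0 || j == 0) {
                    // Edge cells receive exactly one message, and it carries 1.
                    v[i][j] = 1;
                } else {
                    // Interior cells sum their north, west, and northwest inputs.
                    v[i][j] = v[i - 1][j] + v[i][j - 1] + v[i - 1][j - 1];
                }
            }
        }
        return v[rows - 1][cols - 1];
    }

    public static void main(String[] args) {
        System.out.println(lastCell(3, 3)); // prints 13
    }
}
```

For the 3 by 3 array the values are 1 1 1 / 1 3 5 / 1 5 13, which agrees with the assertion in testSum.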