Wednesday, December 22, 2010

Scala Pros and Cons

Some pros and cons of the Scala programming language as compared to Java from a business management perspective.

Introduction

I have been promoting Scala for quite some time now, and I have encouraged all of the developers in my office to try it out, hoping that at some point we can start using it for new projects.

It was suggested to me that I put together a list of the pros and cons of Scala to help other people in the company evaluate whether they should consider using it for their projects. After pondering this I realized that this would likely be of interest to a wider audience than just at my company, and that it would make a good topic for my blog. Thus this post in which I list what I believe to be the most important factors that a company that wants to stay on the JVM platform should consider when deciding whether or not to move from Java to Scala.

Because this comparison is specifically Scala versus Java, I do not include the facts that Scala runs on the JVM, has performance on par with Java, interoperates easily with Java, and after compiling is essentially indistinguishable from Java. While these might be benefits for some companies when comparing Scala against other languages such as Ruby or Python or other JVM languages, they do not distinguish Scala from Java.

The intended audience for this post is not the developers who might be using Scala, but the technical managers who are involved in the decision about what technology to use. I don't discuss Scala's cool features such as closures and continuations, easy interoperation with legacy Java code, traits with code, or a sophisticated type system with type inference, but focus instead on the business-level questions of how using Scala rather than Java might affect the success of a project, a team, or the company.

My company has not yet produced a product using Scala, so my experience is limited to some prototypes at the office and some personal work at home. The points below are based on that experience plus my interpretation of comments from others that I have read in the last couple of years. Everyone's viewpoint differs, so you may disagree with my statements or my evaluation of their importance. Similarly, the specifics of your situation may make your experience with Scala significantly different than mine. It is easy to find both praise and dismissal of Scala on the 'net. As they say, YMMV.

Pros

Higher Productivity

Studies and anecdotal evidence indicate that a Scala program has from 1/2 to 1/10 as many lines of code as a functionally equivalent Java program. The larger the application, the more apparent this difference becomes. If you believe, as I do, that a given programmer can produce roughly the same number of lines of code per day independent of the language used, you can see how this reduction in lines of code can translate into a substantial increase in productivity and a faster time-to-market.

Higher Quality

Scala encourages a functional programming style, which in turn leads to code with fewer bugs. Also, fewer lines of code means fewer bugs. Fewer bugs means higher productivity and a higher quality product.

Scala's Actor library and its encouragement of immutable variables help to avoid race conditions among multiple threads and thus improve reliability in concurrent applications.

Better Developers

Programmers who learn Scala pick up programming practices such as functional programming that they can (at least to some extent) apply to other languages, such as Java. This makes them better Java programmers. Providing an environment in which developers can learn something new should also make for a more positive environment for those developers, so they will be happier with their jobs and less likely to leave the company. Likewise, a shop that uses a new and advanced language such as Scala is more likely to attract top-notch developers (which Paul Graham has named as "The Python Paradox").

Rapidly Improving Ecosystem

The early adopters of Scala such as Twitter are well down that road. Here at the end of 2010 it is tempting to say that the early adopter window is closed and anyone who starts with Scala in 2011 or later is no longer in that category.

Some of the cons listed below may never change, or may change very slowly, but at least the tools issue will certainly improve. The rate of improvement of the Scala ecosystem is much faster than that of most other languages at the moment. The IDE tools are improving, the testing tools are improving, the documentation is improving, and the functionality of available libraries is improving. All of these improvements will continue to make the Scala ecosystem more attractive over time. If you evaluate the current situation and find it close to satisfactory, and the problem areas are the ones that will improve with time, then you probably won't have to wait very long for that to happen.

Cons

Learning Curve

Scala incorporates features and concepts that are not familiar to many programmers, such as functional programming and continuations. It can take some time to learn these concepts. Some people say the Scala syntax is more complicated than Java, but in fact the language spec for Scala is significantly smaller (191 pages) than the language spec for Java (684 pages). The difference is that Scala is much more regular and allows things to be combined in ways that can't be done in Java, which provides more power and expressiveness but also can take some time to learn.

I have seen comments to the effect that it could take about two months for a Java developer to come up to speed on Scala, although a developer who already has experience using other languages with functional constructs such as Python or Ruby should be able to pick it up faster.

Limited Developer Pool

There are far fewer developers who already know Scala than who already know Java, which means it will be more difficult if you need to staff up quickly and don't want to spend any time training.

Because of the more advanced concepts embodied in Scala, or at least concepts which are different from the concepts that many Java developers have internalized, it may be possible that some Java developers will have a difficult time coming up to speed on Scala. This would effectively decrease the overall pool of available developers even if you are willing to spend the time to train someone on Scala.

Until a much larger number of current Java developers try to learn Scala it will be difficult to know how much of a problem this will be. One possibility is that many current Java developers will not want to put the effort into learning the new concepts required to use Scala effectively, and that a large chunk of the growth in the pool of Scala developers will come from new developers who were educated with those concepts from the start.

Better Developers

Wait, wasn't this listed as a pro for Scala? Yes, this one is a two-edged sword. Even after accepting the Learning Curve and the current Limited Developer Pool, there is a possibility that not every Java developer has what it takes to become an effective Scala developer. Scala provides tools that allow highly competent developers to write very concise code, and if you give them those tools they will use them. Developers who are less competent may then have trouble understanding that code. Java suffers less from this particular problem because it doesn't provide the kinds of high-level constructs (such as higher-order functions and continuations) that can cause difficulty for less capable developers. If you take the Scala road and hire highly capable developers, you may be making a commitment to continue to hire only highly capable developers for some time.

You should ask yourself this question: would your company culture allow you to hire and keep a developer who is twice as expensive and ten times as productive? If your answer is "probably not", then you should stick with Java where you can hire those developers who are half as expensive and one-tenth as productive.

Limited Commercial Support

If you have a large Java project and you find you quickly need some additional developers, you can call a consulting company such as Accenture or Cognizant and they can throw an army of Java developers your way. If your project is written in Scala, I suspect they will happily tell you they can provide you with those resources, but they might have more difficulty doing that due to the above-mentioned Limited Developer Pool.

You could probably post a request to the Scala or Lift mailing lists and find some contractors with Scala experience to help you out, but there is not yet much in the way of explicit commercial support for Scala.

The recently founded company Scala Solutions is the first I am aware of that is advertising support for Scala in mission-critical applications. Large companies considering Scala may not be comfortable without having more choice here.

Tool Immaturity

The development tools for Scala are not as advanced as for Java. In particular, the IDE plugins are not yet as sophisticated, so developers who use IDEs may encounter some frustration. On the other hand, this issue should be addressed by Scala's Rapidly Improving Ecosystem.

Former Cons

While browsing the web you may run across people complaining about the items listed in this section. As I note for each item, as of this posting at the end of 2010 I believe these issues have been resolved.

Risk of Breaking Changes

As a new and evolving language, Scala has gone through some breaking changes a few times as new versions have been released. This has caused a lot of pain for some people, a number of whom have declared in their blog posts that this demonstrates that Scala is just an academic language and not ready for the real world. I believe that, while there still may be some changes to be made, with the release of version 2.8 the language has reached a level of stability that makes it suitable for production use.

Risk of Abandonment

Scala is a relatively new language, and some people feel that it may not "make it" and survive as an ongoing language, in which case any code written in Scala would become hard to maintain. I believe this risk is now relatively small, given how it has been growing over the last few years and the number of big names who have adopted Scala (see the list below).

Limited Documentation

While this may still be an issue for non-English teams, I believe there are now enough Scala books in English that this is no longer an issue for anyone who reads English. The Books on Scala page lists 19 books about Scala and Lift, including five Scala books in English and five in other languages (some of which are translations of the English versions) that are currently available.

References

Web Sites using Scala

The Scala web site has a page on Scala in the Enterprise, last updated a couple of months ago (as of this posting), listing quite a few companies that are using Scala.

Here are a few well-known web sites using Scala or Scala/Lift and some tech articles about them:

Other Posts

There are a fair number of posts listing technical pros and cons of Scala, or comparing Scala to various other languages such as Clojure or the others mentioned above, but it is more difficult to find commentary that is more specifically directed at the business or management questions. Below are links to some posts that, while often containing some technical content, also contain at least some more general comments. The posts are listed from newest to oldest.

Originally posted 2010-12-22.
Updated 2010-12-22: removed "Higher Performance" item per James Iry's comment, fixed Foursquare per harryh's comment.

Tuesday, November 30, 2010

Nondeterministic Evaluation in Scala

Scala continuations can be used to implement nondeterministic evaluation.

Background

Nondeterministic evaluation is an approach in which choices among multiple alternatives are factored out of an application so as to allow writing the application as if all alternatives were simultaneously being considered. Factoring the application in this way allows using different strategies for searching the space of available choices with no or only small changes to the application code.

A truly nondeterministic evaluation would potentially give different results (possibly the same set of results in a different order) each time it is run. In the simplest implementations, the evaluation is actually deterministic and does give the same results every time it is run. A different way of interpreting the phrase "nondeterministic evaluation" is to think of it from the point of view of the application built on top of such an evaluator: the application is written as if the evaluation were nondeterministic, and the decision as to whether the implementation is deterministic or not is entirely within the evaluation package. In other words, the application is not determining the order of evaluation among alternatives, therefore from its perspective the evaluation is nondeterministic. If at some later time the evaluator is replaced by one with the same API but which is truly nondeterministic, the correctly-written application will continue to deliver valid results.

One of the benefits of a nondeterministic evaluation environment is that an application using that environment can (with some assumptions about the application's avoidance of side-effects) transparently be run in a multi-thread, multi-processor or multi-host version of that environment so as to parallelize the computation. Development of such an application can be done on a single-processor workstation using a small dataset, then moved to a more powerful environment for use on the real problem with a much larger dataset.

One simple model of nondeterministic evaluation is the amb evaluator discussed in the MIT "Wizard Book", Structure and Interpretation of Computer Programs (SICP) in Section 4.3. The amb operator was introduced by John McCarthy in his 1963 paper A Basis For a Mathematical Theory of Computation.

Goal

Before getting into the implementation, let's take a look at an example to show where we want to go. SICP starts its section on Logic Puzzles with this example:
The following puzzle (taken from Dinesman 1968) is typical of a large class of simple logic puzzles:
Baker, Cooper, Fletcher, Miller, and Smith live on different floors of an apartment house that contains only five floors. Baker does not live on the top floor. Cooper does not live on the bottom floor. Fletcher does not live on either the top or the bottom floor. Miller lives on a higher floor than does Cooper. Smith does not live on a floor adjacent to Fletcher's. Fletcher does not live on a floor adjacent to Cooper's. Where does everyone live?
We can determine who lives on each floor in a straightforward way by enumerating all the possibilities and imposing the given restrictions:
(define (multiple-dwelling)
  (let ((baker (amb 1 2 3 4 5))
        (cooper (amb 1 2 3 4 5))
        (fletcher (amb 1 2 3 4 5))
        (miller (amb 1 2 3 4 5))
        (smith (amb 1 2 3 4 5)))
    (require
     (distinct? (list baker cooper fletcher miller smith)))
    (require (not (= baker 5)))
    (require (not (= cooper 1)))
    (require (not (= fletcher 5)))
    (require (not (= fletcher 1)))
    (require (> miller cooper))
    (require (not (= (abs (- smith fletcher)) 1)))
    (require (not (= (abs (- fletcher cooper)) 1)))
    (list (list 'baker baker)
          (list 'cooper cooper)
          (list 'fletcher fletcher)
          (list 'miller miller)
          (list 'smith smith))))
Evaluating the expression (multiple-dwelling) produces the result
((baker 3) (cooper 2) (fletcher 4) (miller 5) (smith 1))
Here is what the above looks like in my Scala implementation (see the Examples section below for boilerplate):


MultipleDwelling.scala
class MultipleDwelling extends AmbEval[List[(String,Int)]] {
    def distinct(vals:List[Int]):Boolean = {
        vals.distinct.length == vals.length
    }
    generate {
        val baker = amb(List(1,2,3,4,5))
        val cooper = amb(List(1,2,3,4,5))
        val fletcher = amb(List(1,2,3,4,5))
        val miller = amb(List(1,2,3,4,5))
        val smith = amb(List(1,2,3,4,5))
        require(distinct(List(baker,cooper,fletcher,miller,smith)))
        require(baker!=5)
        require(cooper!=1)
        require(fletcher!=5)
        require(fletcher!=1)
        require(miller>cooper)
        require(scala.math.abs(smith-fletcher)!=1)
        require(scala.math.abs(fletcher-cooper)!=1)
        yld(List(
            ("baker",baker),
            ("cooper",cooper),
            ("fletcher",fletcher),
            ("miller",miller),
            ("smith",smith)))
    }
}
As you can see, it is quite similar to the lisp implementation. In particular, the code that solves this specific problem includes very little other than the statement of the problem: a set of possible values, a set of requirements, and a yielding of the solution. The implementation choices about how to make choices among the alternatives listed in the amb expressions and how to evaluate those alternatives are all handled in the code for the AmbEval class.

Of course, you could write something very similar to the above using for-comprehensions with guards, and that would likely be more practical for most situations. But you can't currently use a for-comprehension in suspendable (CPS) code, and thus can't use that construct in a generator without modifications such as those introduced in this post. Besides, that's just not as interesting as the amb evaluator.
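For comparison, here is a rough sketch of what the for-comprehension version of the puzzle might look like. It runs outside of any generator and does not use AmbEval at all; the object name MultipleDwellingFor and the solutions method are placeholders I made up for this illustration.

```scala
object MultipleDwellingFor {
  def distinct(vals: List[Int]): Boolean =
    vals.distinct.length == vals.length

  // Each generator stands in for one amb call, each guard for one require.
  def solutions: List[List[(String, Int)]] = {
    val floors = List(1, 2, 3, 4, 5)
    for {
      baker <- floors
      cooper <- floors
      fletcher <- floors
      miller <- floors
      smith <- floors
      if distinct(List(baker, cooper, fletcher, miller, smith))
      if baker != 5
      if cooper != 1
      if fletcher != 5
      if fletcher != 1
      if miller > cooper
      if scala.math.abs(smith - fletcher) != 1
      if scala.math.abs(fletcher - cooper) != 1
    } yield List(
      ("baker", baker),
      ("cooper", cooper),
      ("fletcher", fletcher),
      ("miller", miller),
      ("smith", smith))
  }

  def main(args: Array[String]): Unit =
    solutions.foreach(println)
}
```

The guards are kept in the same order as the require calls so the two versions can be read side by side. In this form the iteration order is fixed by the for-comprehension itself rather than by a separate evaluator.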

The authors of SICP note that the evaluation of their straightforward implementation of multiple-dwelling is "very slow". When I execute my version on my desktop machine, it runs in less than 1/100th of a second. If that line was written in the original version of 1980, that same program could have taken 10,000 times as long to run, a couple of minutes - a long time for what seems like a small and simple program.

Implementation

In this post I implement a simple nondeterministic evaluator modeled on the amb evaluator introduced above that allows specific problems to be stated as in the above example and solved by the nondeterministic evaluator.

In SICP they build the amb evaluator in lisp along with a REPL that knows how to retrieve and print the multiple results returned by the evaluation. Rather than building my own Scala REPL, I chose to treat the nondeterministic evaluation as a generator. Thus a nondeterministic computation is implemented as a special kind of generator, and it returns its values as does a generator.

In my previous blog post I showed how to use Scala's delimited continuations to create a standalone generator. I now extend that generator to support nondeterministic evaluation. The use of continuations is an essential part of this implementation. Unlike in my previous examples of the use of continuations, where continuations were captured but only executed a single time, in this case a single continuation is executed multiple times.

As seen in the example in the Goal section above, the implementation uses a class called AmbEval, which defines the amb and require methods.
AmbEval.scala
package net.jimmc.scoroutine

import scala.collection.Iterator
import scala.util.continuations._

class AmbEval[T] extends StandaloneGenerator[T] {
    def amb[A](seq:Iterable[A]):A @cpsParam[Unit,Unit] = {
        shift { k:(A=>Unit) =>
            val it = seq.iterator
            reset[Unit,Unit] {
                //Use of var for v is workaround for Scala bug #3501
                var v:A = null.asInstanceOf[A]
                while (it.hasNext) {
                    v = it.next
                    k(v)
                    stepUntilDone
                }
            }
        }
    }

    def require(condition: =>Boolean):Unit @cpsParam[Unit,Unit] = {
        if (!condition) { amb(List()) }
    }
}
The amb method is mostly straightforward: it iterates through the values provided to it and calls the supplied continuation for each value. The continuation represents the remainder of the code following the call to amb, which means all of the code following the call to amb will be executed multiple times, once with each value supplied to the call to amb. Each of these calls is "searching" the solution space using one value from the set of alternatives supplied to amb.

When the calling code calls amb a second time, amb once again turns around and calls the remaining code multiple times, once for each value passed to amb. Thus the application code following the second call to amb gets executed once for each combination of every value in the first call to amb times every value in the second call to amb, which is the cross-product of those two sets of alternatives. Likewise if the application has a third or fourth call to amb; each call to amb multiplies the number of times the following code is executed.
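The multiplication effect can be seen in a small sketch that passes the continuation explicitly as a function instead of capturing it with shift. This is only a simplified model, not the AmbEval code: the object AmbCrossProduct, its amb method and the pairs helper are hypothetical names used just for this illustration.

```scala
import scala.collection.mutable.ListBuffer

object AmbCrossProduct {
  // Simplified stand-in for AmbEval.amb: "rest" plays the role of the
  // captured continuation and is called once per alternative.
  def amb[A](choices: Iterable[A])(rest: A => Unit): Unit =
    choices.foreach(rest)

  // Records every (x, y) combination reached after the second amb call.
  def pairs(xs: List[Int], ys: List[String]): List[(Int, String)] = {
    val seen = ListBuffer.empty[(Int, String)]
    amb(xs) { x =>
      amb(ys) { y =>
        seen += ((x, y))
      }
    }
    seen.toList
  }

  def main(args: Array[String]): Unit =
    println(pairs(List(1, 2, 3), List("a", "b")))
}
```

With three alternatives in the first call and two in the second, the innermost code runs six times, once per element of the cross-product.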

One subtlety of the implementation is the call to stepUntilDone. The call to the continuation k(v), which calls the code in the application following its call to amb, will return when that code has finished running; but if that code calls yld, it will be suspended, and control will return to amb after that suspension. At this point we need to suspend the amb code as well, and when it is resumed, ensure that we resume the continuation code that we called as k(v). As long as the continuation called from amb continues to suspend itself by calling yld, we need to continue suspending ourselves in the same way. Once the continuation called from amb finishes and returns to amb without suspending itself, then amb can continue in its loop and invoke the continuation with the next selected value.

Note that using the yld method to return values from the generator, and allowing multiple calls to yld by ensuring that every branch finishes its execution, gives us a capability not in the lisp implementation defined in SICP and shown above. In that implementation, a valid choice is indicated by successfully reaching the end of a branch; the valid value is the return value of the function. In our implementation, a single branch can return multiple valid values by calling yld multiple times. This means that, in the Scala spirit, you are free to write your own code that imperatively determines for itself the validity of some of the potential alternatives rather than always using the amb and require methods to prune out invalid alternatives.

The stepUntilDone method implements the above algorithm to ensure that the called continuation completes. In order to know if the called continuation has suspended itself or completed execution, stepUntilDone needs to examine private data in the StandaloneGenerator class, so the cleanest solution is to add the stepUntilDone method to that class:
//Part of StandaloneGenerator.scala
    def stepUntilDone:Unit @suspendable = {
        //Use of var for saveStep is workaround for Scala bug #3501
        var saveStep:(Unit=>Unit) = null
        while (nextStep.isDefined) {
            saveStep = nextStep.get
            suspend     //sets nextStep to point here
            saveStep()
        }
    }
Note that the previously saved continuation from nextStep is saved in the local variable saveStep, which in turn is captured as part of the continuation then saved by the call to suspend. This provides a stacking mechanism that allows us to nest multiple calls to amb and properly manage the required backtracking. Since we are using local storage for this, we can have multiple instances of nondeterministic generators simultaneously suspended.

In both SICP and my implementation, the amb function does a depth-first search of the space of alternatives. This means it is not suitable for problems that include multiple sets of alternatives where one or more sets are infinite. In fact, it really only does well on collections of sets that are small enough that the cross-product of all of the sets of choices can be fully enumerated, since the code choosing among alternatives makes its choice trivially and without any knowledge of which choices might be better than others. Also, the search space must not include any non-terminating branches, since eventually the algorithm would select that branch and get stuck.
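To make the problem with infinite sets concrete, here is a small model of depth-first enumeration using plain Scala iterators rather than AmbEval. The object DepthFirstInfinite and the firstVisited method are hypothetical names for this sketch; the nested iteration only imitates the order in which a depth-first amb would visit combinations.

```scala
object DepthFirstInfinite {
  // Returns the first n combinations visited when both sets are infinite.
  def firstVisited(n: Int): List[(Int, Int)] = {
    val visited = for {
      i <- Iterator.from(1) // outer set of alternatives
      j <- Iterator.from(1) // inner set of alternatives, never exhausted
    } yield (i, j)
    visited.take(n).toList
  }

  def main(args: Array[String]): Unit =
    println(firstVisited(4)) // prints List((1,1), (1,2), (1,3), (1,4))
}
```

Because the inner set never runs out, the outer choice stays at its first value forever, so any solution that needs a different outer value is never reached.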

As in SICP, the require method simply calls amb with no alternatives if its predicate is false, thus pruning the search tree at that point.
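The pruning behavior can be modeled without continuations by passing the rest of the computation as an explicit function. The sketch below is an approximation under that assumption, not the AmbEval code itself; the object RequirePruning, its explicit-continuation amb and require, and the triples helper are names invented for this example.

```scala
import scala.collection.mutable.ListBuffer

object RequirePruning {
  // "rest" plays the role of the captured continuation.
  def amb[A](choices: Iterable[A])(rest: A => Unit): Unit =
    choices.foreach(rest)

  // A failed requirement is an amb over no alternatives, so "rest" never runs
  // and the current branch simply ends.
  def require(condition: Boolean)(rest: Unit => Unit): Unit =
    if (condition) rest(()) else amb(List.empty[Unit])(rest)

  def triples(low: Int, high: Int): List[(Int, Int, Int)] = {
    val found = ListBuffer.empty[(Int, Int, Int)]
    amb(low to high) { i =>
      amb(i to high) { j =>
        amb(j to high) { k =>
          require(i * i + j * j == k * k) { _ =>
            found += ((i, j, k))
          }
        }
      }
    }
    found.toList
  }

  def main(args: Array[String]): Unit =
    triples(1, 20).foreach(println)
}
```

Every combination that fails the requirement falls into the empty amb, which has nothing to iterate over, so control returns to the enclosing amb loop and the next alternative is tried.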

I did not attempt to implement any kind of side-effects backtracking such as capturing changes to global variables and undoing them after a branch finishes. Instead, regarding attempting to use code with side effects within a nondeterministic evaluation block, I offer this common suggestion: Don't Do That!

Examples

Using the AmbEval class we can easily code up the examples given in SICP. We start with the imports we use for all of the examples below:
import scala.collection.Iterator
import scala.util.continuations._
import net.jimmc.scoroutine._
Our examples will all be implemented as generator classes that extend the AmbEval class. We can see the results of each such computation by creating an instance of that class and printing out all of the values returned by the iterator. To simplify this, we define a little helper function to dump all of the results of an iterator:
def dump[T](gen:Iterator[T]):Unit = {
    for (x <- gen) println(x)
}
Here's how we can use this to print the results of evaluating the MultipleDwelling class given in the Goal section above:
dump(new MultipleDwelling)
output:
List((baker,3), (cooper,2), (fletcher,4), (miller,5), (smith,1))
Here are a few more examples, with their output given in a comment at the end of each code block.
A Pythagorean Triple Between
This problem is from SICP (exercise 4.35):
"implement a procedure that finds Pythagorean triples, i.e., triples of integers (i,j,k) between the given bounds such that i ≤ j and i^2 + j^2 = k^2"
APythagoreanTripleBetween.scala
class APythagoreanTripleBetween(low:Int,high:Int) extends AmbEval[(Int,Int,Int)] {
    generate {
        val i = amb(low to high)
        val j = amb(i to high)
        val k = amb(j to high)
        require(i*i + j*j == k*k)
        yld((i,j,k))
    }
}
output from dump(new APythagoreanTripleBetween(1,20)):
(3,4,5)
(5,12,13)
(6,8,10)
(8,15,17)
(9,12,15)
(12,16,20)
Liars
This problem is from SICP (exercise 4.42):
Solve the following "Liars" puzzle (from Phillips 1934):
Five schoolgirls sat for an examination. Their parents -- so they thought -- showed an undue degree of interest in the result. They therefore agreed that, in writing home about the examination, each girl should make one true statement and one untrue one. The following are the relevant passages from their letters:
  • Betty: "Kitty was second in the examination. I was only third."
  • Ethel: "You'll be glad to hear that I was on top. Joan was second."
  • Joan: "I was third, and poor old Ethel was bottom."
  • Kitty: "I came out second. Mary was only fourth."
  • Mary: "I was fourth. Top place was taken by Betty."
What in fact was the order in which the five girls were placed?
Liars.scala
class Liars extends AmbEval[List[(String,Int)]] {
    def distinct(vals:List[Int]):Boolean = {
        vals.distinct.length == vals.length
    }
    generate {
        val betty = amb(List(1,2,3,4,5))
        val ethel = amb(List(1,2,3,4,5))
        val joan = amb(List(1,2,3,4,5))
        val kitty = amb(List(1,2,3,4,5))
        val mary = amb(List(1,2,3,4,5))
        require(distinct(List(betty,ethel,joan,kitty,mary)))
        require((kitty==2 && betty!=3) || (kitty!=2 && betty==3))
        require((ethel==1 && joan!=2) || (ethel!=1 && joan==2))
        require((joan==3 && ethel!=5) || (joan!=3 && ethel==5))
        require((kitty==2 && mary!=4) || (kitty!=2 && mary==4))
        require((mary==4 && betty!=1) || (mary!=4 && betty==1))
        yld(List(
            ("betty",betty),
            ("ethel",ethel),
            ("joan",joan),
            ("kitty",kitty),
            ("mary",mary)))
    }
}
output from dump(new Liars):
List((betty,3), (ethel,5), (joan,2), (kitty,1), (mary,4))
RosettaExample
This problem is from the rosettacode.org wiki page for Amb:
The example is using amb to choose four words from the following strings:

set 1: "the" "that" "a"

set 2: "frog" "elephant" "thing"

set 3: "walked" "treaded" "grows"

set 4: "slowly" "quickly"

It is a failure if the last character of word 1 is not equal to the first character of word 2, and similarly with word 2 and word 3, as well as word 3 and word 4. (the only successful sentence is "that thing grows slowly").
RosettaExample.scala
class RosettaExample extends AmbEval[List[String]] {
    generate {
        def joins(s1:String, s2:String) = s1.endsWith(s2.substring(0,1))
        val w1 = amb(List("the","that","a"))
        val w2 = amb(List("frog","elephant","thing"))
        val w3 = amb(List("walked","treaded","grows"))
        val w4 = amb(List("slowly","quickly"))
        require(joins(w1,w2))
        require(joins(w2,w3))
        require(joins(w3,w4))
        yld(List(w1,w2,w3,w4))
    }
}
output from dump(new RosettaExample):
List(that, thing, grows, slowly)

Other Implementations

If you poke around on the net you can find implementations of nondeterministic evaluators such as the amb evaluator. Some of these implementations suffer from one or more of three problems:
  • They use a for-comprehension or direct sequence iteration rather than an external amb evaluator. To see the difference, consider how the code would have to be modified in order to choose alternatives from each set of choices in random order rather than left-to-right. In an amb evaluator, this change can be made in one place.
  • They don't separate the application requirements from the mechanism that makes choices among alternatives. This can be seen by considering how a second example problem would be implemented in the same framework. It should be possible to implement the second problem by sharing code used in the implementation of the first problem but without either modifying or duplicating any code from the first problem.
  • They use a global variable to store a stack of continuations. This makes it impossible to run two independent nondeterministic evaluations at the same time.
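As a rough illustration of the first two points, here is a sketch in which the order of choices is decided in exactly one place. It uses explicit continuation functions instead of shift and reset, and every name in it (ChoiceOrder, Strategy, LeftToRight, Shuffled, Evaluator, sentences) is hypothetical, chosen only for this example.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.Random

object ChoiceOrder {
  // Decides the order in which alternatives are tried.
  trait Strategy { def order[A](choices: List[A]): List[A] }

  object LeftToRight extends Strategy {
    def order[A](choices: List[A]): List[A] = choices
  }

  class Shuffled(seed: Long) extends Strategy {
    private val rng = new Random(seed)
    def order[A](choices: List[A]): List[A] = rng.shuffle(choices)
  }

  // The evaluator is the only code that knows about the strategy.
  class Evaluator(strategy: Strategy) {
    def amb[A](choices: List[A])(rest: A => Unit): Unit =
      strategy.order(choices).foreach(rest)
  }

  // Application code: states the problem and nothing about ordering.
  def sentences(ev: Evaluator): List[List[String]] = {
    def joins(s1: String, s2: String) = s1.last == s2.head
    val found = ListBuffer.empty[List[String]]
    ev.amb(List("the", "that", "a")) { w1 =>
      ev.amb(List("frog", "elephant", "thing")) { w2 =>
        ev.amb(List("walked", "treaded", "grows")) { w3 =>
          ev.amb(List("slowly", "quickly")) { w4 =>
            if (joins(w1, w2) && joins(w2, w3) && joins(w3, w4))
              found += List(w1, w2, w3, w4)
          }
        }
      }
    }
    found.toList
  }

  def main(args: Array[String]): Unit = {
    println(sentences(new Evaluator(LeftToRight)))
    println(sentences(new Evaluator(new Shuffled(42L))))
  }
}
```

Switching from left-to-right to random order means passing a different Strategy to the evaluator; the sentences method is not touched. Since the puzzle has a single solution, both runs find the same sentence.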
Implementations of amb in various languages:
  • RosettaCode, implementations in many languages - but many of the implementations suffer from the problems listed above. In particular, the Scala implementation uses direct sequence iteration (in the form of a recursive tail operation on a list) and does not separate the application requirements (of first and last letters of adjacent words being the same) from the mechanism that makes choices among alternatives.
  • Nondeterministic evaluation in under 300 bytes of Python - Uses a global variable to store a stack of continuations.
  • Ruby - Uses a global variable to store a stack of continuations.
  • C#, Linq - Uses direct sequence iteration.
  • Scheme
Some interesting terms:
  • Angelic choice - always avoids choices that lead to nonterminating branches.
  • Demonic choice - always makes choices that lead to nonterminating branches.
  • Erratic choice - choices may or may not lead to nonterminating branches.

Wednesday, September 8, 2010

Standalone Generic Scala Generator

A generic Scala generator class built directly on Scala's shift and reset operators.

In my previous post I showed a generic generator class built on top of my coroutines library. I commented in that post that I was taking the expedient approach of using my existing library, but that it would be possible to package all of the functionality into the Generator class.

I soon realized that it would in fact be relatively easy to do that packaging, and that the resulting relatively simple class would probably make a good vehicle for demonstrating the usefulness of Scala's delimited continuations. Thus I give you the StandaloneGenerator class:

package net.jimmc.scoroutine

import scala.collection.Iterator
import scala.util.continuations._

class StandaloneGenerator[T] extends Iterator[T] {
    private var nextValue:Option[T] = None
    private var nextStep:Option[Unit=>Unit] = None

    /** Subclass calls this method to generate values.
     * @param body The code for your generator.
     */
    protected def generate(body: => Unit @suspendable) {
        reset {
            suspend
            body
        }
    }

    /** Yield the next generated value.
     * Call this code from your generator to deliver the next value.
     */
    protected def yld(x:T):Unit @suspendable = {
        nextValue = Some(x)
        suspend
    }

    /** Retrieve the next generated value.
     * Call this from your main code.
     */
    def next:T = {
        step
        nextValue match {
            case None => throw new NoSuchElementException("next on empty generator")
                    //make it similar to the equivalent Iterator exception
            case Some(x) => nextValue = None; x
        }
    }

    /** True if there is another value to retrieve.
     * Call this from your main code.
     */
    def hasNext:Boolean = {
        step
        nextValue.isDefined
    }

    /** Save our continuation to resume later. */
    private def suspend:Unit @suspendable = {
        shift { k:(Unit=>Unit) =>
            nextStep = Some(k)
        }
    }

    /** If we have a next step but we don't have a value queued up,
     * run the generator until it calls yld or completes. */
    private def step = {
        if (nextValue.isEmpty) nextStep foreach { nextStep = None; _() }
    }
}
The StandaloneGenerator class is a plug-compatible replacement for the Generator class described in my previous post, as long as the derived generator does not use fancy scheduling as in the Primes Generator example. So, for example, you could take the Integers Generator example from that post, replace the two occurrences of Generator with StandaloneGenerator, and everything would work the same way.

We have two variables: nextValue is our one-element queue where we store the next value to be returned by the generator, and nextStep is our "scheduler queue" where we store our continuation each time we suspend the generator to return a value. Both of these variables are of type Option so that we can tell when they hold a value.

Control is managed by the two functions suspend and step. The suspend method has a shift block that could not be much simpler: all it does is store the passed-in continuation in the nextStep variable. Since the body of a shift block is always the last thing executed within the scope of the CPS code (delimited either by the enclosing reset block or an explicitly invoked continuation), the fact that suspend does not execute its continuation means that after suspend does its thing, control returns to the point just past the enclosing reset, or to the next statement after the explicit continuation call.

I considered calling the step method resume instead, to make it clear that it was the complement to suspend, but from the point of view of the code calling step, by the time control returns to the point after the call to step, the generator code has already been suspended again: it has completed running one step, which is from one yld call to the next.

The step function executes our continuation if we have one, but only if we don't already have a value in nextValue. Using foreach on an Option is a neat way to execute a block of code only if the Option has a value (i.e. is not None). In this case, since the contents of nextStep (if it has any) is a function, the placeholder variable _ gets set to the continuation and the code fragment _() is what actually executes the continuation. Here are two other ways this function could be implemented that do the same thing:
    private def step1 = {
        if (nextValue.isEmpty) nextStep match {
            case None => //do nothing
            case Some(p) => nextStep = None; p()
        }
    }

    private def step2 = {
        if (nextValue.isEmpty && nextStep.isDefined) {
            val p = nextStep.get; nextStep = None; p()
        }
    }
Let's walk through this and see how it works (assuming the generator generates at least one value):
  1. The main code instantiates a generator, which passes its generator body to the generate function, which calls suspend, which saves that body in nextStep without executing it.
  2. The main code calls hasNext, which calls step, which runs the continuation. This starts the generator body code running, which runs until it calls yld with the first value. That first value is stored in nextValue and the generator code is suspended, with the continuation stored in nextStep. Since we now have a value stored in nextValue, hasNext returns true.
  3. The main code calls next. Since we have a value in nextValue, the call to step does nothing (it is only there in case the caller calls next without first calling hasNext). The value in nextValue is returned, and that variable is cleared (set to None).
  4. Each subsequent time the main code calls hasNext, it calls step, which executes the continuation stored in nextStep. The generator code calls yld with the next value, that value is stored in nextValue, the new continuation is stored in nextStep, the generator is suspended, and hasNext returns true.
  5. Eventually (for a finite generator) there is a call to step where the generator finishes execution without calling yld. When this happens, no values are stored in either nextValue or nextStep. Since there is no value in nextValue, hasNext returns false. Since there is also no value in nextStep, further calls to step do nothing, and hasNext will continue to return false.
I dare say this might be one of the simplest realistic examples of the use of Scala's reset and shift operators that you will find. Two variables, six functions each not more than four lines of body code, 16 lines of function body code in total, 35 lines of code altogether.
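
To make the walkthrough above concrete without the continuations compiler plugin, here is a minimal sketch in which the continuation is written out by hand as an explicit function argument. The names ManualGenerator and ManualIntGen are hypothetical and are not part of net.jimmc.scoroutine, and I am assuming Scala 2.13 or later. It only mimics the nextValue/nextStep protocol; it is not what the plugin generates.

```scala
// Hand-written CPS sketch of the nextValue/nextStep protocol (hypothetical names).
class ManualGenerator[T] extends Iterator[T] {
  private var nextValue: Option[T] = None
  private var nextStep: Option[() => Unit] = None

  /** Plays the role of generate: remember the body without running it. */
  protected def generate(body: () => Unit): Unit = {
    nextStep = Some(body)
  }

  /** Plays the role of yld plus suspend: store the value and the explicit continuation k. */
  protected def yld(x: T)(k: () => Unit): Unit = {
    nextValue = Some(x)
    nextStep = Some(k)
  }

  /** Run the saved continuation, but only if no value is queued up. */
  private def step(): Unit = {
    if (nextValue.isEmpty) {
      val k = nextStep
      nextStep = None
      k.foreach(f => f())
    }
  }

  def hasNext: Boolean = { step(); nextValue.isDefined }

  def next(): T = {
    step()
    nextValue match {
      case Some(x) => nextValue = None; x
      case None => throw new NoSuchElementException("next on empty generator")
    }
  }
}

// The while loop of the integers example becomes a function that passes
// "the rest of the loop" to yld as the continuation.
class ManualIntGen(max: Int) extends ManualGenerator[Int] {
  private def loop(x: Int): Unit =
    if (x <= max) yld(x)(() => loop(x + 1))
  generate(() => loop(1))
}
```

With these definitions, new ManualIntGen(4).toList yields List(1, 2, 3, 4). The point of shift and reset is that the compiler builds the () => loop(x + 1) argument for you.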

Tuesday, September 7, 2010

Scala Generators

An implementation of generators on top of Scala's delimited continuations.

In my previous post I described a library that supports coroutines on top of Scala's delimited continuations capability. In this post I show how you can easily create generators on top of that coroutine library (net.jimmc.scoroutine). This is a second example of the kind of interesting construct that can be built on top of Scala's delimited continuations.

As with my previous post on coroutines, you don't need to understand reset and shift if you just want to use the Generator class shown here to write and use your own generators. But, as with coroutines, you should have a basic understanding of CPS code and its restrictions when writing generators.

Generators

A generator is a routine that produces values like an iterator but is structured as a function. The generated values are returned by calling a special function, typically called yield, with each value that is generated. In our case, since yield is a reserved word in Scala, we will use yld instead.

Generators and coroutines are closely related. Depending on the implementation, generators and coroutines may be almost the same thing or fairly different, but in any case, if you have either one, you can implement the other one on top of it. Since we already have coroutines in the net.jimmc.scoroutine library described in my previous post, we will implement generators on top of coroutines using that library.

You can think of this approach as using the Producer-Consumer pattern, where we set up a generator as the producer and we allow the main code to act as the consumer. We create a generic Generator class that does the following:
  • Creates a CoScheduler that we use to control the generator.
  • Creates a CoQueue buffer into which we will place the generated values.
  • Provides convenience functions yld (in place of the reserved word yield) and generate.
  • Provides next and hasNext functions for the consuming code to call from a non-CPS context, and so that a Generator can be used as an Iterator.
This is all simple and straightforward. Here is the code for Generator:

package net.jimmc.scoroutine

import scala.collection.Iterator
import scala.util.continuations._

/** Generic generator class.
 */
class Generator[T] extends Iterator[T] {
    val sched = new DefaultCoScheduler
    val buf = new CoQueue[T](sched,1)

    /** Subclass calls this method to generate values.
     * @param body The code for your generator.
     */
    def generate(body: => Unit @suspendable) {
        sched.addRoutine("gen") { body }
        sched.run
    }

    /** Yield the next generated value.
     * Call this code from your generator to deliver the next value.
     */
    protected def yld(x:T):Unit @suspendable = {
        buf.blockingEnqueue(x)
    }

    /** Retrieve the next generated value.
     * Call this from your main code.
     */
    def next:T = {
        sched.run
        buf.dequeue
    }

    /** True if there is another value to retrieve.
     * Call this from your main code.
     */
    def hasNext:Boolean = {
        sched.run
        !buf.dequeueBlocker.isBlocked
    }
}
We are not concerning ourselves with performance here, so we are simply using the available DefaultCoScheduler as our scheduler. As a future optimization, we could develop a scheduler optimized for a single coroutine and use that as our scheduler for simple generators that fit that criterion. We could go further and use neither a scheduler nor CoQueue, packaging all of the functionality directly into the Generator class; but we are using the more expedient approach of using those two pieces, since we already have them and are familiar with their use from our experience with coroutines.

Integers Generator

Here is how we would use our generic Generator class to create a generator that will generate integers up to a specified maximum value:
import net.jimmc.scoroutine.Generator

class IntGen(max:Int) extends Generator[Int] {
    generate {
        var x = 1
        while (x<=max) {
            yld(x)
            x = x + 1
        }
    }
}
The one catch to remember here is that the body of the generate call is CPS code, so as with the body of a coroutine, there are some restrictions on what control constructs we can use. Thus we use a while loop with a var rather than a for loop, since the latter does not yet work with the continuations compiler plugin.

Given the above generator class, here is a simple GenInts object with a main function that creates an instance of that generator, then calls it to print out its values:
object GenInts {
    def main(args:Array[String]) = {
        val gen = new IntGen(4)
        for (i <- gen)
            println(i)
    }
}
Alternatively, we could replace the for loop with direct calls to hasNext and next:
object GenInts {
    def main(args:Array[String]) = {
        val gen = new IntGen(4)
        while (gen.hasNext)
            println(gen.next)
    }
}

Primes Generator

It is possible to use shift and reset directly to code up a generator, but our coroutine library implements a scheduler to which new coroutines can be added at any time, so building on it gives you the ability to create generators that include dynamic filter pipelines.

The example I use for this is the Sieve of Eratosthenes, a method of calculating primes in which, each time a prime is found, it is added to a list of prime divisors that are used for testing each new candidate. In this GenPrimes example, I create a new filter for each prime and add it to the pipeline. You can do this much more efficiently in Scala using a Stream, but this example illustrates the technique of dynamically building a pipeline within a generator.
import scala.util.continuations._

import net.jimmc.scoroutine.CoQueue
import net.jimmc.scoroutine.Generator

object GenPrimes {
    def main(args:Array[String]) = {
        val gen = new PrimeGen()
        for (i <- gen) {
            println("Prime: "+i)
        }
    }
}

class PrimeGen extends Generator[Int] {
    val bufSize = 1
    val out1 = new CoQueue[Int](sched,bufSize)

    sched.addRoutine("prime2")(nextPrime(2,out1))
    generate {
        def gen(n:Int):Unit @suspendable = {
            out1.blockingEnqueue(n)
            gen(n+1)
        }
        gen(2)
    }

    def nextPrime(p:Int, in:CoQueue[Int]):Unit @suspendable = {
        var out:Option[CoQueue[Int]] = None
        yld(p)
        def sieve():Unit @suspendable = {
            val n = in.blockingDequeue()
            if ((n%p)!=0) {
                if (!out.isDefined) {
                    out = Some(new CoQueue[Int](sched,bufSize))
                    val rName = "prime"+n
                    sched.addRoutine(rName)(nextPrime(n,out.get))
                }
                out.get.blockingEnqueue(n)
            } else {
                in.dequeueBlocker.waitUntilNotBlocked
            }
            sieve()
        }
        sieve()
    }
}
This example starts by setting up two coroutines: the addRoutine call sets up the first filter in the pipeline, which reads values from the out1 queue and filters out all numbers divisible by 2. The generate call sets up the other initial coroutine, which generates every integer and feeds it into the first filter in the pipeline. We start off this counting generator with the first prime number, 2.

The nextPrime function is called each time we see a new prime. It starts by outputting its prime parameter value p as a value of the PrimeGen generator. It then goes into a loop reading its input buffer and looking for values which are not divisible by its prime number. The first time it finds one (when out is not yet defined) it registers (with a call to addRoutine) a new coroutine based on a new instance of nextPrime that uses our output as its input. It then passes each candidate prime along to that next filter in the sieve pipeline.

You can tell this is CPS code because of the suspendable annotations, which are a cue that the code might not behave quite as you expect. For example, the gen function within the body of the generate call is recursive, so you might think it would cause a stack overflow. But since we are in a CPS function and the call to blockingEnqueue is a call to a CPS function, the recursive call to gen is turned into a continuation and executed later from the scheduler, so it is in fact not recursive. Likewise the recursive call to sieve is not really recursive for the same reason.

Another CPS detail is the call to waitUntilNotBlocked. It would seem to be functionally unnecessary, since the first thing in the sieve function is a call to blockingDequeue. However, this is the same attempt to avoid blocking as discussed in my previous post; without this call our code will not work.
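
For comparison, here is a minimal sketch of the lazy-collection alternative mentioned at the start of this section. I am assuming Scala 2.13 or later, where LazyList takes the place of Stream; the object name LazyPrimes is my own. It is the classic filter-per-prime formulation, which mirrors the pipeline above, rather than an optimized sieve.

```scala
object LazyPrimes {
  // Each level keeps the head as a prime and filters its multiples out of
  // the tail, much as each nextPrime coroutine filters its input queue.
  def sieve(s: LazyList[Int]): LazyList[Int] =
    s.head #:: sieve(s.tail.filter(_ % s.head != 0))

  // Candidates start at 2, like the counting generator in PrimeGen.
  val primes: LazyList[Int] = sieve(LazyList.from(2))
}
```

Because the list is lazy, LazyPrimes.primes.take(5).toList evaluates only as many filters as are needed and returns List(2, 3, 5, 7, 11).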

Same Fringe

The SameFringe problem has been called the "killer application" for coroutines. Given two trees, they have the same fringe if the leaves of the two trees, read from left to right, are the same.

With coroutines, or in this case generators, the simple solution to this problem is to create a generator that takes a tree and returns the sequence of leaves of that tree, then compare the outputs of two of those generators on the two trees to be compared.

We start with a simple tree definition:
sealed abstract class Tree[T]
case class Branch[T](left:Tree[T], right:Tree[T]) extends Tree[T]
case class Leaf[T](x:T) extends Tree[T]
Given this tree definition, we write a generator that walks a tree and yields all of the leaves:
import scala.util.continuations._
import net.jimmc.scoroutine.Generator

class TreeFringe[T](tree:Tree[T]) extends Generator[T] {
    generate {
        def walk(t:Tree[T]):Unit @suspendable = {
            t match {
                case Leaf(x) => yld(x)
                case Branch(left,right) => walk(left); walk(right)
            }
        }
        walk(tree)
    }
}
Since our generators implement the Iterator trait, we can compare two generators as two iterators with this little piece of code, making the assumption that the tree leaf values are never null:
    def sameFringe[T](tree1:Tree[T], tree2:Tree[T]):Boolean = {
        !((new TreeFringe(tree1)).zipAll(new TreeFringe(tree2),null,null).
            exists(p=>p._1!=p._2))
    }
Alternatively, we could use this more verbose version:
    def sameFringe[T](tree1:Tree[T], tree2:Tree[T]):Boolean = {
        val fringe1 = new TreeFringe(tree1)
        val fringe2 = new TreeFringe(tree2)
        while(fringe1.hasNext && fringe2.hasNext) {
            if (fringe1.next != fringe2.next)
                return false;
        }
        !(fringe1.hasNext || fringe2.hasNext)
    }
We add a SameFringe object with a main method that creates some test trees, prints out the leaves of each tree using our generator, then calls our sameFringe method to check for equality.
object SameFringe {
    def main(args:Array[String]) = {
        val t1 = Branch(Branch(Leaf(1),Leaf(2)),Leaf(3))
        val t2 = Branch(Leaf(1),Branch(Leaf(2),Leaf(3)))
        val t3 = Branch(Leaf(1),Branch(Leaf(2),Leaf(4)))
        println("t1:"); for (x <- (new TreeFringe(t1))) println(x)
        println("t2:"); for (x <- (new TreeFringe(t2))) println(x)
        println("t3:"); for (x <- (new TreeFringe(t3))) println(x)
        println("sameFringe(t1,t2)="+sameFringe(t1,t2))
        println("sameFringe(t1,t3)="+sameFringe(t1,t3))
    }
    //include the sameFringe method in this object
}
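
If you want to experiment with the same comparison without the continuations plugin, the following sketch builds the fringe with plain iterators, assuming Scala 2.13 or later. The object name IteratorFringe is hypothetical, and the tree classes are repeated so that the block stands alone. It also pads with Option values instead of null, which removes the assumption that leaf values are never null.

```scala
sealed abstract class Tree[T]
case class Branch[T](left: Tree[T], right: Tree[T]) extends Tree[T]
case class Leaf[T](x: T) extends Tree[T]

object IteratorFringe {
  // The right-hand side of ++ is passed by name, so the right subtree is
  // walked only when the consumer actually asks for its leaves.
  def fringe[T](t: Tree[T]): Iterator[T] = t match {
    case Leaf(x)      => Iterator.single(x)
    case Branch(l, r) => fringe(l) ++ fringe(r)
  }

  // Wrap every leaf in Some and pad the shorter fringe with None, so a
  // length mismatch shows up as Some(x) compared with None.
  def sameFringe[T](t1: Tree[T], t2: Tree[T]): Boolean = {
    val a: Iterator[Option[T]] = fringe(t1).map(x => Some(x))
    val b: Iterator[Option[T]] = fringe(t2).map(x => Some(x))
    a.zipAll(b, None, None).forall { case (x, y) => x == y }
  }
}
```

This gives up the generator style of writing walk as an ordinary recursive function with yld calls, which is the part that coroutines make pleasant.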

More Possibilities

Some other possible uses for generators or coroutines:
  • Pipelines: A sequence of tasks can operate on a stream of data, with each task reading data from an input queue and writing to an output queue which is the input queue of the next task in the sequence.
  • Fan-out: A single producer with multiple consumers can be implemented by using a fan-out coroutine that reads from its input queue and writes the same data to multiple output queues, each of which is the input queue to one of the multiple consumers.
  • Fan-in: Multiple producers can use a single shared output queue so that the coroutine using that queue as its input queue receives data from multiple sources. If you stick with a single-thread scheduler, you don't have to worry about synchronization or other concurrent access issues on the shared queue. By combining Pipelines, Fan-out and Fan-in, we can create arbitrary networks of communicating coroutines.
  • State machines: For any situation in which a task has to maintain state based on one or more inputs, a coroutine or generator can be used to allow some of that state to be stored as the location of current execution in the code, which often makes the code simpler to write and maintain.
  • Parsers: A parser is a typical example of a producer that reads an input stream and maintains state. As the parser collects input characters (which could be provided by another coroutine in a pipeline) and resolves them into tokens, it writes them to its output queue where the tokens are available to the routine handling the next level of analysis.
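
As a rough illustration of the pipeline and parser ideas in the list above, here is a sketch that uses plain Scala iterators as the stages rather than coroutines and queues. I am assuming Scala 2.13 or later, and all of the names are hypothetical. Note how the tokens stage has to keep its state in a field (the buffered iterator); a coroutine version could keep that state as its position in the code.

```scala
object PipelineSketch {
  // Stage 1: a producer of characters.
  def chars(s: String): Iterator[Char] = s.iterator

  // Stage 2: groups characters into whitespace-separated tokens.
  def tokens(in: Iterator[Char]): Iterator[String] = new Iterator[String] {
    private val buffered = in.buffered

    private def skipSpaces(): Unit =
      while (buffered.hasNext && buffered.head.isWhitespace) buffered.next()

    def hasNext: Boolean = { skipSpaces(); buffered.hasNext }

    def next(): String = {
      skipSpaces()
      if (!buffered.hasNext) throw new NoSuchElementException("no more tokens")
      val sb = new StringBuilder
      while (buffered.hasNext && !buffered.head.isWhitespace) sb.append(buffered.next())
      sb.toString
    }
  }

  // Stage 3: a downstream stage that consumes tokens.
  def lengths(in: Iterator[String]): Iterator[Int] = in.map(_.length)
}
```

Chaining the stages as lengths(tokens(chars("ab cde"))) pulls one token at a time through the pipeline, on demand from the final consumer.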

Monday, September 6, 2010

Scala Coroutines

An implementation of coroutines on top of Scala's delimited continuations.

In my previous post I said that delimited continuations could be used to create interesting control constructs. In this post I give examples of one such construct: coroutines.

I describe the implementation of a library to make it easier to write coroutines, and I give an example that is built on that library. If you want to go straight to the example, see the section Producer-Consumer.

Like my previous post on delimited continuations, this is a long post. However, though long, it should be much easier going than that post. You don't need to read and understand that post in order to understand this post, but it will help in two places:
  1. In the discussion of the CoScheduler API and implementation, an understanding of reset and shift is required.
  2. You need to understand at some level about CPS code and its restrictions when writing coroutines or using the CPS-only methods in CoQueue and Blocker.

Coroutines

The Wikipedia coroutines page says coroutines are a generalization of subroutines: whereas a subroutine starts at the beginning every time it is called, a coroutine starts at the beginning the first time it is called, but can start at any other point on successive calls. Generally this means that on successive calls, it continues running from the point where it most recently returned.

Instead of using a return statement, coroutines typically use a yield statement. A yield statement indicates that the routine is done executing for now, and will resume execution following the yield statement the next time it is called.

In the classic definition of coroutines, the yield statement indicates which coroutine is to be run next. With two coroutines, each always yields to the other; with more than two, a coroutine might have code to determine which other coroutine to yield to.

Unfortunately for coroutines, yield is already a keyword in Scala, so we can't use it for our purposes. We can either pick a slightly different word such as yld or yieldTo, or we can just use a different term altogether.

The classic example of the use of coroutines is a pair of routines, one of which (the producer) generates data and one of which (the consumer) consumes data. These two routines are connected by a queue; the producer puts data into the queue and the consumer takes data out of the queue.

This same producer-consumer pair is also a typical example of multi-thread code. Ted Neward uses this producer-consumer example in his blog post describing the concurrency benefits of using Scala.

In the multi-thread producer-consumer example, the producer thread runs and places data into the queue until the queue is full, at which point that thread stops running until the queue empties out enough for it to add more data. Meanwhile the consumer thread runs and takes data out of the queue until the queue is empty, at which point that thread stops running until the queue contains more data. If the host on which the multi-thread application is running contains more than one processor, both of these threads might be running at the same time.

I like to think of coroutines as being like multi-thread code, only without multiple threads. The coroutine version of the producer-consumer example works essentially the same as the multi-thread version, except that only one of the two is ever running at one point in time. The producer runs and places data into the queue until it is full, at which point it pauses and the consumer starts running. The consumer takes data out of the queue until it is empty, at which point it pauses and the producer starts running again.
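
The order of execution described above can be sketched in a few lines of ordinary single-threaded Scala. This is only an illustration of the alternation, with hypothetical names, assuming Scala 2.13 or later; the two routines are hard-coded as inner loops, so nothing here shows how a real coroutine saves and restores its state.

```scala
import scala.collection.mutable.{ListBuffer, Queue}

object AlternationSketch {
  /** Returns the sequence of queue operations for `count` items and a queue of size `capacity`. */
  def run(count: Int, capacity: Int): List[String] = {
    require(capacity > 0, "capacity must be positive")
    val queue = Queue.empty[Int]
    val trace = ListBuffer.empty[String]
    var produced = 0
    var consumed = 0
    while (consumed < count) {
      // The producer runs until the queue is full or it has nothing left to produce.
      while (queue.length < capacity && produced < count) {
        produced += 1
        queue.enqueue(produced)
        trace += s"put $produced"
      }
      // The consumer runs until the queue is empty.
      while (queue.nonEmpty) {
        consumed += 1
        trace += s"get ${queue.dequeue()}"
      }
    }
    trace.toList
  }
}
```

For example, AlternationSketch.run(3, 2) returns List("put 1", "put 2", "get 1", "get 2", "put 3", "get 3").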

In both the multi-thread and the coroutine version of this example, there is some state that is saved while each routine is paused waiting for the queue to fill or empty. In the multi-thread example, that state is saved in the thread. In our coroutine example, we use a different mechanism to save that state: a delimited continuation.

Why Use Coroutines

If coroutines are like multi-thread code, why use them and have to deal with continuations rather than just using threads? Here are some possible reasons:
  • With the default scheduler that runs everything in one thread, you don't need to worry about concurrency issues such as multiple concurrent access to shared state.
  • You can create your own scheduler to control when to run each of your coroutines. If you want that scheduler to use a thread pool and run coroutines concurrently, you can do that (assuming you then deal with concurrency issues in your coroutines).
  • An application can handle more suspended continuations than it can suspended threads (for example, see slide 19 of Philipp Haller's presentation on Actors, where he says an app can handle up to 5,000 threads, but up to 1,200,000 actors).

Building a Coroutine Library

It is possible to write coroutines directly in Scala code using reset and shift, but dealing with delimited continuations can be tricky, so I wanted to isolate all of that code into a reusable library that would make it easier to write coroutines as well as allow encapsulating more sophisticated capabilities within the library. The package name I selected for this library is net.jimmc.scoroutine. The source code is available on github.

I started with a change that makes these coroutines look less like coroutines and more like the multi-thread model: rather than have a coroutine specify what other coroutine is to be run, I wanted to be able to specify only that the coroutine is ready to give up control. Essentially, rather than yielding to another coroutine, I always yield to a scheduler, and the scheduler selects and then yields to the next coroutine. Given such a scheduler (described below), we can create a few simple constructs on which to build our coroutine API.

Blocker

In the typical producer-consumer example, there is an explicit check to see if the routine is blocked, and if so, then a call to yield is made. I wanted something more generic, so I created an abstract trait Blocker to represent the condition that a routine could be blocked by something:

package net.jimmc.scoroutine

trait Blocker {
    def isBlocked: Boolean
}
The implementations of this for the producer and consumer are straightforward: for the producer, isBlocked returns true when the queue is full; for the consumer, isBlocked returns true when the queue is empty.
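
As a standalone sketch of those two conditions, assuming Scala 2.13 or later: the names SimpleBlocker and BoundedQueue are hypothetical, chosen so as not to be confused with the library's own Blocker and CoQueue, and there is no scheduler involved yet.

```scala
import scala.collection.mutable.Queue

// Same shape as the first version of Blocker shown above.
trait SimpleBlocker {
  def isBlocked: Boolean
}

class BoundedQueue[A](capacity: Int) {
  val q: Queue[A] = Queue.empty[A]

  // The producer is blocked while the queue is full.
  val producerBlocker: SimpleBlocker = new SimpleBlocker {
    def isBlocked: Boolean = q.length >= capacity
  }

  // The consumer is blocked while the queue is empty.
  val consumerBlocker: SimpleBlocker = new SimpleBlocker {
    def isBlocked: Boolean = q.isEmpty
  }
}
```

Each blocker reads the queue's current state every time isBlocked is called, so the answer changes as items are added and removed.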

Given the isBlocked method, the typical coroutine always includes a code fragment that looks something like this:
while (isBlocked)
    yield control
Since we will always be yielding control to the scheduler, we can encapsulate this into a more convenient method, which I have called waitUntilNotBlocked. I added this function to my Blocker trait, and delegated it to a scheduler of type CoScheduler:
package net.jimmc.scoroutine

trait Blocker {
    val scheduler: CoScheduler  //class must override to provide instance
    def isBlocked: Boolean

    def waitUntilNotBlocked:Unit = {
        scheduler.waitUntilNotBlocked(this)
    }
}
We pass this to the scheduler so that it can call our isBlocked method and continue our execution only when isBlocked returns false.

There is one more detail to be added to Blocker, but it is an important one. I stated above that this implementation is built on top of delimited continuations. When we call waitUntilNotBlocked and we are blocked, we want the coroutine library to wait until we are no longer blocked and then continue execution of our routine. The coroutine library will be using delimited continuations to do this, and since our routine might be suspended, the signature for the waitUntilNotBlocked method must include the suspendable annotation. We add that annotation along with a suitable import statement to get our final version of Blocker:
package net.jimmc.scoroutine

import scala.util.continuations._

trait Blocker {
    val scheduler: CoScheduler  //class must override to provide instance
    def isBlocked: Boolean

    def waitUntilNotBlocked:Unit @suspendable = {
        scheduler.waitUntilNotBlocked(this)
    }
}

CoQueue

Once we have Blocker, it is easy to compose a blocking version of the standard Queue class (where by "blocking" I mean that the coroutine will be suspended until the specified Blocker is no longer blocked). We want a version of the Queue.enqueue function that blocks when the queue is full, and a version of the Queue.dequeue function that blocks when the queue is empty. We create a thin wrapper around the Queue class, which we call CoQueue, to implement our blocking methods for use with our coroutines. For each of our two blocking conditions, we create an instance of Blocker whose isBlocked function tests that condition, and we use those Blocker instances to create our blockingEnqueue and blockingDequeue methods.

Because the blockingEnqueue and blockingDequeue methods might block, they must be annotated as suspendable, which means they can only be called from within CPS code. Here is our entire CoQueue class:
package net.jimmc.scoroutine

import scala.util.continuations._
import scala.collection.mutable.Queue

class CoQueue[A](val scheduler:CoScheduler, val waitSize:Int)
        extends Queue[A] { coqueue =>

    val enqueueBlocker = new Blocker() {
        val scheduler = coqueue.scheduler
        def isBlocked() = length >= waitSize
    }

    val dequeueBlocker = new Blocker() {
        val scheduler = coqueue.scheduler
        def isBlocked() = isEmpty
    }

    def blockingEnqueue(x:A):Unit @suspendable = {
        enqueueBlocker.waitUntilNotBlocked
        enqueue(x)
    }

    def blockingDequeue():A @suspendable = {
        dequeueBlocker.waitUntilNotBlocked
        dequeue
    }
}

An Attempt to Avoid Blocking

You might wonder if we could implement blockingEnqueue as follows so as to avoid the call to the scheduler's waitUntilNotBlocked method:
//this won't compile
    def blockingEnqueue(x:A):Unit @suspendable = {
        if (enqueueBlocker.isBlocked)
            enqueueBlocker.waitUntilNotBlocked
        enqueue(x)
    }
However, the compiler gives this error message:
CoQueue.scala:22: error: type mismatch;
 found   : Unit
 required: Unit @scala.util.continuations.cpsParam[Unit,Unit]
        if (enqueueBlocker.isBlocked)
        ^
Syntactically, the problem is that the above one-sided if statement is equivalent to
        if (enqueueBlocker.isBlocked)
            enqueueBlocker.waitUntilNotBlocked
        else
            ()
The type of () is Unit, but the type of waitUntilNotBlocked is Unit @suspendable, so there is a type mismatch.

You might think we could use some trickery such as this:
//compiles, but won't run properly
    def unitSuspendable:Unit @suspendable = ()

    def blockingEnqueue(x:A):Unit @suspendable = {
        if (enqueueBlocker.isBlocked)
            enqueueBlocker.waitUntilNotBlocked
        else
            unitSuspendable
        enqueue(x)
    }
This will compile without errors, but will not work properly. The problem is that we are trying to define one code path that is CPS (through waitUntilNotBlocked) and one path that is not (through unitSuspendable). The CPS compiler plugin transforms the code to use continuations, which, as you might recall from the discussion in my previous post, packages the rest of the function up in a continuation and passes it along to the called function. But the unitSuspendable expression does nothing with that continuation; it neither executes it nor saves it for later execution. Thus as soon as this code path is taken, the continuation - which represents the remainder of the execution of the entire delimited continuation, including the caller of this function - is dropped, and the whole delimited continuation is done.
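
The failure mode described above can be modeled in hand-written continuation-passing style, where the continuation is an explicit function argument. This is a sketch of the explanation only, with hypothetical names and assuming Scala 2.13 or later; it does not reproduce the code that the compiler plugin actually generates.

```scala
import scala.collection.mutable.ListBuffer

class CpsDemo {
  val log: ListBuffer[String] = ListBuffer.empty[String]
  var saved: Option[() => Unit] = None

  // Saves the continuation for later, as a scheduler would.
  def suspendCps(k: () => Unit): Unit = { saved = Some(k) }

  // Neither runs nor saves the continuation: everything after it is lost.
  def dropCps(k: () => Unit): Unit = ()

  // Runs the continuation immediately.
  def passCps(k: () => Unit): Unit = k()

  // Hand-written CPS form of blockingEnqueue. `rest` stands for the caller's
  // remaining work; `notBlockedPath` is the code path taken when not blocked.
  def blockingEnqueueCps(blocked: Boolean, notBlockedPath: (() => Unit) => Unit)
                        (rest: () => Unit): Unit = {
    val k = () => { log += "enqueue"; rest() }
    if (blocked) suspendCps(k) else notBlockedPath(k)
  }
}
```

Calling blockingEnqueueCps(false, dropCps) with any rest function leaves the log empty, because neither the enqueue nor the caller's remaining work ever runs. With passCps in place of dropCps, the log receives "enqueue" followed by whatever rest records.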

CoScheduler API

The scheduler is the piece of the coroutine library that saves the continuations for all of the participating coroutines and determines when to execute those continuations. By design, all of the tricky stuff is encapsulated in this class. I have divided the discussion of the scheduler into two sections: this first section discusses at a high level the tasks for which the scheduler must take responsibility, and defines an API to perform those tasks. The following section describes the implementation of that API. If you don't want to get too far into the delimited continuation stuff, you can read just this section and skip the implementation section.

I started with the requirements that it should be possible to write a scheduler that:
  1. uses one thread for all participating coroutines, or uses a thread pool so that multiple coroutines can run at once;
  2. uses a simple algorithm to select which coroutine to run next, such as round-robin, or a more complex algorithm, such as a priority-based approach;
  3. can be instantiated multiple times so that different collections of coroutines can be managed with different schedulers.
In order to allow all of the above, the scheduler API is defined in a trait, which I call CoScheduler. CoScheduler needs to define three basic functions:
  1. Register a coroutine. I chose to do this with a method that accepts a block of code which is the coroutine body. Since the coroutine body might be suspended, the signature for that argument must include the suspendable annotation. I call this method addRoutine. To improve tracing, I also pass in a name argument that can be used to identify that coroutine.
  2. Wait until a coroutine is no longer blocked. This is the method to which Blocker.waitUntilNotBlocked will delegate. It is also called waitUntilNotBlocked, and takes an argument which is the Blocker whose isBlocked method will be used to make that determination. Since this method will be called from the coroutine body or a method called from that body, it must be marked as suspendable.
  3. Run a coroutine. There are two flavors of this: run a single step, returning as soon as one coroutine has run and has returned control to the scheduler, or run as many steps as possible, until there are no more unblocked coroutines to run. I call these two methods runNextUnblockedRoutine and runUntilBlockedOrDone, respectively. Since these methods are meant to be called from the main application, not from within a coroutine, they are not marked as suspendable.

    When one of the run methods returns, we would like to know whether there are some blocked coroutines, all of our coroutines have completed, or we don't know which because we only ran one routine. In order to return this indication, we define a sealed class RunStatus with three case objects representing these three possibilities.
The API of our CoScheduler trait thus looks like this:
package net.jimmc.scoroutine

import scala.util.continuations._    //needed for the suspendable annotation

sealed abstract class RunStatus
case object RanOneRoutine extends RunStatus
case object SomeRoutinesBlocked extends RunStatus
case object AllRoutinesDone extends RunStatus

trait CoScheduler {
    def addRoutine(name:String)(body: => Unit @suspendable)

    def waitUntilNotBlocked(b:Blocker):Unit @suspendable

    def runNextUnblockedRoutine():RunStatus
    def runUntilBlockedOrDone():RunStatus
}
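
As a small illustration of how a caller might consume the value returned by the run methods, the sketch below matches on the three status objects. The RunStatus definitions are repeated locally, inside a wrapper object, only so that the sketch compiles on its own. The describe helper and its messages are assumptions made for this example, not part of the library.

```scala
object RunStatusSketch {
    // Local copies of the status type from the API above.
    sealed abstract class RunStatus
    case object RanOneRoutine extends RunStatus
    case object SomeRoutinesBlocked extends RunStatus
    case object AllRoutinesDone extends RunStatus

    // Hypothetical helper: turn a status into a message for the caller.
    // Because RunStatus is sealed, the compiler can check that all three
    // cases are covered.
    def describe(status: RunStatus): String = status match {
        case RanOneRoutine => "ran one routine; others may still be runnable"
        case SomeRoutinesBlocked => "nothing runnable; some routines are blocked"
        case AllRoutinesDone => "all routines have completed"
    }
}
```

A main application could, for instance, treat SomeRoutinesBlocked after runUntilBlockedOrDone as a sign that something outside the scheduler has to change before any coroutine can make progress.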
If you are interested in using the scoroutine library to write coroutines, but you don't care how it works internally, then you can skip the next few sections and pick up again at Producer-Consumer.

CoScheduler Implementation

The reason for implementing CoScheduler as a trait rather than a class is to make it easier to create multiple implementations. On the other hand, there is some functionality which is likely to be the same in all implementations, and since Scala allows us to include code in traits, we will add the implementation of those methods to the trait.

For example, given the runNextUnblockedRoutine method, the runUntilBlockedOrDone method is a simple loop that calls runNextUnblockedRoutine repeatedly until it returns a status other than RanOneRoutine.

Likewise, if we internally create an instance of a Blocker in addRoutine, we can implement both that method and waitUntilNotBlocked on top of a single method that stores a continuation, which we will call setRoutineContinuation.

This means we can create a concrete scheduler class that extends the CoScheduler trait by implementing just two functions: runNextUnblockedRoutine and setRoutineContinuation.

Below is what our implementation of CoScheduler looks like after making the above changes.
package net.jimmc.scoroutine

import scala.util.continuations._

sealed abstract class RunStatus
case object RanOneRoutine extends RunStatus
case object SomeRoutinesBlocked extends RunStatus
case object AllRoutinesDone extends RunStatus

trait CoScheduler { cosched =>
    private[scoroutine] def setRoutineContinuation(
            b:Blocker, cont:Option[Unit=>Unit]):Unit
    def runNextUnblockedRoutine():RunStatus

    /* We use a class rather than an object because we are using the
     * instance as a key to find more info about the associated routine. */
    class BlockerNever() extends Blocker {
        val scheduler = cosched
        val isBlocked = false
    }

    def addRoutine(name:String)(body: => Unit @suspendable) {
        reset {
            val blocker = new BlockerNever()
            waitUntilNotBlocked(blocker)
            body
        }
    }

    def runUntilBlockedOrDone():RunStatus = {
        var status:RunStatus = RanOneRoutine
        while (status==RanOneRoutine) {
            status = runNextUnblockedRoutine()
        }
        status
    }

    def waitUntilNotBlocked(b:Blocker):Unit @suspendable = {
        shift( (cont: Unit=>Unit) => {
            setRoutineContinuation(b,Some(cont))
        })
    }
}
As we already knew based on the existence of the suspendable annotation, there are two methods that deal with CPS code: addRoutine and waitUntilNotBlocked.

Before examining these functions, it is worth reviewing one point about how reset and shift work. When there is a shift call within a reset block, the CPS compiler plugin transforms the code within the reset block such that the code after the shift block is passed as a continuation argument to the shift block. The code within the shift block is thus the last code to be executed within the reset block. The code that is within the reset but outside of the shift is CPS code, but the code within the shift block is not CPS code. In other words, once within a reset block, we are executing CPS code until we get into the shift block, at which point we are no longer executing CPS code. The continuation contains CPS code; if we call the continuation from within our shift code, we transition from non-CPS code into CPS code. But if we happen to save the continuation, we can directly execute it later from any non-CPS code, and that will likewise transition us into the CPS code of the continuation.
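
The following sketch spells out that behavior by hand, without the CPS plugin. It is only an approximation of what the plugin generates, and every name in it (resetBlock, saved, demo) is invented for the illustration. The code after the "shift" becomes an ordinary function value, the "shift body" stores it instead of calling it, and plain non-CPS code runs it later.

```scala
import scala.collection.mutable.ListBuffer

object ShiftResetSketch {
    val log = new ListBuffer[String]
    // Where the hand-written "shift body" stores the continuation.
    var saved: Option[Unit => Unit] = None

    // Hand-written approximation of:
    //   reset { <log "before shift">; shift { k => saved = Some(k) }; <log "after shift"> }
    def resetBlock(): Unit = {
        log += "before shift"
        // Everything after the shift, packaged as a continuation.
        val k: Unit => Unit = _ => { log += "after shift"; () }
        // The shift body: ordinary code that receives k and saves it.
        saved = Some(k)
    }

    def demo(): List[String] = {
        resetBlock()                   // returns without running k
        log += "past reset"
        saved.foreach(k => k(()))      // non-CPS code resumes the saved work
        log.toList
    }
}
```

Calling demo() once yields the entries "before shift", "past reset", "after shift", in that order: control leaves the reset block as soon as the continuation has been saved, and the remainder runs only when the saved continuation is invoked.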

The waitUntilNotBlocked function takes as an argument the Blocker that will tell us when the calling coroutine is allowed to run again, and, along with the continuation passed in to the shift, passes it to setRoutineContinuation. That function saves the continuation and its associated Blocker and returns without executing the continuation. Because we are returning without executing the continuation, control returns to the first statement past the end of the enclosing reset block, or, if the current code/continuation (i.e. the CPS code containing the call to waitUntilNotBlocked) was directly executed from non-CPS code, to the statement after the point at which that continuation was executed.

The addRoutine function creates a Blocker that never blocks, then calls waitUntilNotBlocked with that blocker. Since waitUntilNotBlocked is a CPS function (marked with the suspendable annotation), the remainder of the code in the reset block is turned into a continuation and passed along to waitUntilNotBlocked. When that method calls shift, the continuation we passed to waitUntilNotBlocked - i.e. our call to body - is part of the continuation passed to the shift. Thus when that method saves the continuation, that saved continuation includes the call to the coroutine body. Since the continuation is not immediately executed, control returns to the end of the reset block, and we return from addRoutine with our coroutine sitting in the scheduler ready to start running.

DefaultCoScheduler

Given the CoScheduler trait described above, the only functionality that remains for our concrete class is to implement a mechanism for storing continuations and selecting the next one to run. The DefaultCoScheduler implements a simple round-robin scheduling mechanism, selecting the next runnable continuation each time it is invoked. Note that this implementation has been designed to be simple, but is not very efficient. In particular, it will not exhibit good performance when there are a large number of coroutines of which only a few are runnable at any time.

We define a case class BlockerInfo to tie together a Blocker and its associated continuation, an ArrayBuffer to store an ordered set of those, and a map to find one given a Blocker.

The setRoutineContinuation function adds a new BlockerInfo to our array if we don't already have one for the given Blocker, or updates the existing one if we do.

The runNextUnblockedRoutine function does a simple linear scan through the array of items, starting just past where we left off the last time, looking for the first unblocked continuation and running it. If there were no runnable continuations, we return a status code without running anything.
package net.jimmc.scoroutine

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap

class DefaultCoScheduler extends CoScheduler {
    val blockerIndexMap = new HashMap[Blocker,Int]
    case class BlockerInfo(val blocker:Blocker, index:Int,
            var cont:Option[Unit=>Unit])
    val blockerList = new ArrayBuffer[BlockerInfo]
    var nextIndex = 0

    private[scoroutine] def setRoutineContinuation(
            b:Blocker,cont:Option[Unit=>Unit]) {
        if (blockerIndexMap.get(b).isEmpty) {
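            //New entries are appended, so the new index is the current size.
            //(Named newIndex to avoid shadowing the nextIndex field.)
            val newIndex = blockerIndexMap.size
            blockerIndexMap.put(b,newIndex)
            blockerList += BlockerInfo(b, newIndex, cont)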
        } else {
            val n = blockerIndexMap(b)
            blockerList(n).cont = cont
        }
    }

    def runNextUnblockedRoutine():RunStatus = {
        var blockedCount = 0
        for (i <- 0 until blockerList.size) {
            val index = (nextIndex + i) % blockerList.size
            val bInfo = blockerList(index)
            if (bInfo.cont.isDefined && bInfo.blocker.isBlocked) {
                blockedCount += 1
            }
            if (bInfo.cont.isDefined && !bInfo.blocker.isBlocked) {
                nextIndex = index + 1
                val nextCont = bInfo.cont
                bInfo.cont = None
                nextCont.get()          //run the continuation
                return RanOneRoutine
            }
        }
        if (blockedCount > 0) {
            SomeRoutinesBlocked
        } else {
            AllRoutinesDone
        }
    }
}
Note that DefaultCoScheduler does not import the continuations package. It does not use reset, shift, or any of the CPS annotations such as suspendable. This is because none of this code is CPS code. runNextUnblockedRoutine is called from non-CPS code, and although setRoutineContinuation is called from CPS code, it does not itself call any CPS functions nor does it use shift, so it does not need to be declared as CPS code.
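
To see the round-robin scan in isolation, here is a cut-down sketch that can be run without the CPS plugin. It is not the scoroutine code: SimpleBlocker, Slot, MiniScheduler, register and runNext are simplified stand-ins invented for this example, the lookup map is omitted, and the continuations are written by hand as plain functions.

```scala
import scala.collection.mutable.ArrayBuffer

object RoundRobinSketch {
    // Simplified stand-in for Blocker: only the isBlocked test is kept.
    trait SimpleBlocker { def isBlocked: Boolean }

    class Slot(val blocker: SimpleBlocker, var cont: Option[Unit => Unit])

    class MiniScheduler {
        private val slots = new ArrayBuffer[Slot]
        private var nextIndex = 0

        def register(b: SimpleBlocker, k: Unit => Unit): Unit = {
            slots += new Slot(b, Some(k))
        }

        // Scan from just past the last position for the first runnable
        // continuation and run it; return true if something was run.
        def runNext(): Boolean = {
            var i = 0
            while (i < slots.size) {
                val index = (nextIndex + i) % slots.size
                val slot = slots(index)
                if (slot.cont.isDefined && !slot.blocker.isBlocked) {
                    nextIndex = index + 1
                    val k = slot.cont.get
                    slot.cont = None       // a continuation runs at most once
                    k(())
                    return true
                }
                i += 1
            }
            false
        }
    }

    def demo(): List[String] = {
        val log = new ArrayBuffer[String]
        var gateOpen = false
        val never = new SimpleBlocker { def isBlocked = false }
        val gate = new SimpleBlocker { def isBlocked = !gateOpen }
        val sched = new MiniScheduler
        sched.register(gate, _ => { log += "gated"; () })
        sched.register(never, _ => { log += "first"; gateOpen = true })
        sched.register(never, _ => { log += "second"; () })
        while (sched.runNext()) {}
        log.toList
    }
}
```

In demo() the gated entry is registered first but is skipped while its blocker reports blocked, so the recorded order is "first", "second", "gated". Each call to runNext may scan every slot, which is the linear cost mentioned earlier.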

Other Schedulers

DefaultCoScheduler implements a basic scheduling algorithm. It is intended for use with small numbers of coroutines and has not been optimized. Other schedulers could be written that are optimized for other situations, such as large numbers of coroutines, coroutines with different priorities, or "stickiness" so that a running coroutine continues to run until it is blocked before the next coroutine runs. Since the code that creates the coroutines starts by creating the scheduler that controls those coroutines, it would be simple to create a scheduler other than DefaultCoScheduler for use with a particular set of coroutines.
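
As a rough idea of what the selection step of a priority-based scheduler might look like, here is a minimal sketch. Entry and selectNext are hypothetical names, the priority field is an assumption (the Blocker described in this post carries no priority), and a real scheduler would still have to implement setRoutineContinuation and runNextUnblockedRoutine around this step.

```scala
object PrioritySelectionSketch {
    // Hypothetical record: a priority, a blocked test and a saved continuation.
    case class Entry(priority: Int, isBlocked: () => Boolean, cont: Unit => Unit)

    // Pick the runnable entry with the highest priority, if there is one.
    def selectNext(entries: Seq[Entry]): Option[Entry] = {
        val runnable = entries.filter(e => !e.isBlocked())
        if (runnable.isEmpty) None else Some(runnable.maxBy(_.priority))
    }
}
```

Always picking the highest priority can starve low-priority coroutines, so a practical scheduler would probably combine this with some form of round-robin among equal priorities.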

Producer-Consumer

Let's see how the Producer-Consumer example (ProdConTest) looks using the scoroutine library:
import scala.util.continuations._

import net.jimmc.scoroutine.DefaultCoScheduler
import net.jimmc.scoroutine.CoQueue

object ProdConTest {
    def main(args:Array[String]) = {
        val prodcon = new ProdCon()
        prodcon.run
    }
}

class ProdCon() {
    val sched = new DefaultCoScheduler
    val buf = new CoQueue[Int](sched,2)

    def run() {
        sched.addRoutine("producer"){
            var i = 0
            while (i < 4) {
                buf.blockingEnqueue(i)
                i += 1      //advance, otherwise the loop never terminates
            }
        }
        sched.addRoutine("consumer"){
            val total = buf.blockingDequeue +
                    buf.blockingDequeue + buf.blockingDequeue
            println("consume total is "+total)
        }
        sched.runUntilBlockedOrDone
    }
}
After the imports and a simple main method for testing, we have the ProdCon class with the actual producer-consumer definition.

We start by setting up two objects: the scheduler that will control our two coroutines, and a queue for communication between them. We then register two coroutines with our scheduler, one for the producer and one for the consumer, and we call sched.runUntilBlockedOrDone to run the coroutines until there is nothing runnable left on that scheduler.

You can't tell in this example, but the code blocks being passed to addRoutine are CPS code, the same as the body of a reset block. If you decide to refactor this code and push the blockingEnqueue or blockingDequeue calls down into a subroutine, that subroutine will have to be marked with the suspendable annotation. Also, because coroutine bodies are CPS code, there are some restrictions on what control constructs can be used. You can see what such a refactoring looks like in the ProdConTestWithSubs source code in the scoroutine examples.

I will give some more examples in my next post, a followup about Generators.

Resources