2011-03-15

Async Scala

In my last blog post http://naedyr.blogspot.com/2011/03/atomic-scala.html I described a scala wrapper for java.util.concurrent.AtomicReference, which made shared mutable state easy to use from multiple threads.

The next step is actually creating separate threads.

The old java.lang.Thread class is how most people would do multiple threads in java; but this meant managing starting and stopping the thread and managing your own thread pools.
The java.util.concurrent.Executors class and related libraries was a big step forward. But maybe, just like AtomicReference, it's use wasn't exactly obvious. My favorite part of it was the java.util.concurrent.Future class; which allows a return value from a thread, as well as exceptions to be thrown from the thread; and caught by the calling thread.

What I really wanted though, was to be able to run any block of code in another thread, and not have to think about the specifics. So I put together this wrapper class in scala:

case class Result[T](private val future: java.util.concurrent.Future[T]) {
  private lazy val value = future.get()
  def await(): T = value
}
object Async {
  val threadPool = Executors.newCachedThreadPool()
  def async[T](func: => T): Result[T] = {
    Result(Async.threadPool.submit(new Callable[T]() {
      def call(): T = func
    }))
  }
  def await[T](result: Result[T]): T = result.await()
}

Full source available at : http://code.google.com/p/naedyrscala/

Here's some example usage, with a couple of variations on syntax:
val sum = async { (1 to 100000000).reduceLeft(_ + _) ; println("finished1")}
val sum2 = async { (1 to 100000000).reduceLeft(_ + _); println("finished2") }
val sum3 = async { (1 to 100000000).reduceLeft(_ + _); println("finished3") }
println("do something else")
println(sum.await)
println(await { sum })
println(await(sum3))

With this wrapper, the entire usage boils down to the async and await functions.

  • Pass any block of code to async, and it will run in another thread.
  • To get the result of that thread, call await.

I was inspired by the C# version of async/await, but this scala version is far more general and simpler to use.

If you use await directly on an async call, the effect is to run the block in another thread, but then wait on that thread in the current thread, which is pretty pointless:
val sum = await{async {
      (1 to 100000000).reduceLeft(_ + _)
     println("finished3")
    }}
println("do something else")
println(sum)
You can have other threads await on each other. I don't think it's possible to create a deadlock situation with these semantics.
val sum = async { (1 to 100000000).reduceLeft(_ + _) }
val result = async {
      val x = await(sum)
      println("callback " + x)
      x
}
println(await(result)+await(sum))
If you run the below code several times, you'll see differing numbers and orderings of the "+/- amount comments", but the final amount, after the awaits should always be the same.
case class Account(private val initialAmount: Int) {
  val balance = Atom(initialAmount)
  def withdraw(amount: Int) = {
    balance.set { x => println("-" + amount); x - amount }
  }
  def deposit(amount: Int) = {
    balance.set { x => println("+" + amount); x + amount }
  }
}
val account = Account(500)
   val results1 = async {
   account.deposit(100)
   account.withdraw(100)
}
val results2 = async {
  account.withdraw(10)
  account.withdraw(10)
}
val results3 = async { account.withdraw(10) }
await(results1)
await(results2)
await(results3)
assertEquals((500 + 100 - 100 - 10 - 10 - 10), account.balance.get)

When combined, async/await and atom provide some very easy to use libraries for doing shared state multi threaded programming; with a minimum of effort.

Both the atom and async/await are made possible due to closures. I can't wait to Java 8.

No comments:

Post a Comment