Friday, August 23, 2013

An Aggregator Pattern for the Akka Actor Framework

Before I even start, I need to give Jamie Allen full credit for the prior work in his tutorial at http://jaxenter.com/tutorial-asynchronous-programming-with-akka-actors-46220.html. This post is merely an attempt to improve the parts of that code that I still feel are hard to understand and risky to use, no pun intended.

I also need to give tremendous credit to the Typesafe team, who gave me the inspiration for a solution to what I feel is a slight problem in the programming model. You can read the original posts here.

Problem Statements


Let's talk a bit about some of the difficult areas of developing with the Akka actor framework:

  1. Your receive block is statically defined for all the messages an actor wants to receive. Yes, you can implement state changes using become() and unbecome(), but even then the actor can only receive one set of messages at a time. If you create an actor to aggregate data from multiple sources and there are dependencies between those sources, some messages may only be expected if certain preconditions hold or if the logic follows a certain path in the dependency graph. Declaring the expected message patterns statically forces you to add qualifiers (when is it OK to receive this message?) that decide whether to accept a message under given conditions.
  2. Whenever the logic flow decides that a certain message is expected, a matching pattern must be added to the receive block. This is not DRY: the developer, and any reader of the code, sees the logic fragmented between the code that sends the request and the receive block.
  3. The risk of unintentionally closing over actor state. Yes, good coders can avoid it, but the risk of running into this issue is very real. It commonly arises in many places in actor code:
    • When using ask
    • When using inner actors
    • When using futures
    • Even when using the scheduler to schedule a timeout to yourself.
  4. Using ask to expect a certain message when a request is sent seems like a good way out of problem #1 (statically defining the receive block), but ask has deficiencies of its own:
    • It creates a temporary actor for each ask, which results in higher overhead.
    • Processing the results of the ask via a future can easily close over the asking actor.
    For the reasons above, and probably more... tell, don't ask!
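
To make problems #1 and #2 concrete, here is a rough sketch in plain Scala, with no Akka dependency. It assumes only that a receive block is a PartialFunction[Any, Unit], which is what Akka's Actor.Receive is; StaticReceiveSketch, moneyMarketRequested and the reply type are hypothetical names made up for this illustration. The pattern stays in the handler for the actor's whole life, so a guard has to decide whether the message is welcome right now, and the flag behind that guard gets flipped somewhere else, next to the code that sends the request.

```scala
// Hypothetical, Akka-free illustration of a statically declared receive block.
object StaticReceiveSketch {
  // Same shape as Akka's Actor.Receive.
  type Receive = PartialFunction[Any, Unit]

  // Hypothetical reply type, modeled on the sample later in this post.
  final case class MoneyMarketAccountBalances(balances: Option[List[(Long, BigDecimal)]])

  // State the static handler must consult to decide whether a reply is welcome.
  // In a real actor this flag would be set wherever the request is sent.
  var moneyMarketRequested = false
  var received = List.empty[Any]

  // The pattern is declared up front for the actor's whole lifetime; the guard
  // is the "qualifier" that decides whether the message is accepted right now.
  val receive: Receive = {
    case m: MoneyMarketAccountBalances if moneyMarketRequested =>
      received = m :: received
  }
}
```

Note that isDefinedAt re-evaluates the guard on every call, so the same message is rejected or accepted depending on state that lives far away from the handler.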

Some Rules to Follow or Break


The close-over problem in particular led me to set a few concrete rules for writing actors:
  • No inner actors. If you need to transfer the state from a stateless (listening) actor to a stateful (executing) actor, create the actor instance and forward the message to the actor instance instead.
  • Scheduler calls ONLY send this actor a message. No direct calls to the actor implementation.
Also, Promises and Futures are to be used with extreme care: Future callbacks can run on different threads and close over their actors.

Again, rules are rules and can be broken, but only if you really know what you're doing. For the rest of us mere mortals, please stick to them, or you can end up in no man's land where the behavior becomes undefined.
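
The scheduler rule can be illustrated without Akka. The sketch below is a hypothetical model, not Akka code: MiniActor, its mailbox, and demo() are made-up names, and a Java ScheduledExecutorService stands in for Akka's scheduler. The point is that the scheduled task only enqueues TimedOut, while the flag it eventually flips is read and written solely by the thread that drains the mailbox. In Akka 2.2 itself, the scheduleOnce overload that takes a receiver and a message, e.g. scheduleOnce(250.milliseconds, self, TimedOut), follows the same rule without creating any closure over the actor.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

// Hypothetical, Akka-free model of the "scheduler only sends a message" rule.
// The mailbox is the only thing other threads may touch; the actor's state is
// read and written solely by the thread that drains the mailbox.
object SchedulerRuleSketch {
  case object TimedOut

  final class MiniActor {
    val mailbox = new LinkedBlockingQueue[AnyRef]()
    private var timedOut = false // touched only inside processNext()

    // Takes one message (waiting up to waitMs) and handles it on the calling thread.
    def processNext(waitMs: Long): Boolean =
      Option(mailbox.poll(waitMs, TimeUnit.MILLISECONDS)) match {
        case Some(TimedOut) =>
          timedOut = true
          true
        case _ => false
      }

    def isTimedOut: Boolean = timedOut
  }

  def demo(): Boolean = {
    val actor = new MiniActor
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    // Following the rule: the scheduled task only enqueues a message;
    // it never calls into the actor's implementation or mutates its state.
    scheduler.schedule(new Runnable {
      def run(): Unit = actor.mailbox.put(TimedOut)
    }, 50L, TimeUnit.MILLISECONDS)
    try actor.processNext(waitMs = 1000L) && actor.isTimedOut
    finally scheduler.shutdown()
  }
}
```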

The Aggregator Pattern


To get an almost ask-like behavior, where we can dynamically add the messages we expect but without the drawbacks of ask, I put together the Aggregator trait to be mixed into actors. This trait takes over the receive block (just like many other traits, such as FSM). The trait provides expect, expectOnce, and unexpect calls that can be used as in the samples below:

  // Sending a message expecting one response  
  context.actorOf(Props[FooBarActor]) ! FooBarRequest(reqId) 
  expectOnce {
    case FooBarResponse(reqId, content) => println(content) 
  }  

  // Always expecting a certain message like an exception from other actors   
  expect {
    case FooBarException(message, cause) => println(message) 
  }  

  // Expecting a message  
  val expectHandle = expect {
    case FooBarException(message, cause) => println(message) 
  }

  // At some point, we want to no longer expect - this also works with expectOnce
  unexpect(expectHandle)  

The ready-to-use code for the aggregator trait is available here.
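
To give a feel for the mechanics behind expect, expectOnce, and unexpect, here is a deliberately simplified model in plain Scala. It is not the Aggregator trait's actual source: MiniAggregator and handle() are hypothetical names, handle() stands in for the receive block the trait takes over, and there is no Akka dependency. In this model, handlers are tried in registration order, and an expectOnce handler is dropped as soon as it matches.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of an Aggregator-style handler registry.
// It mirrors the expect/expectOnce/unexpect calls described above, but it is
// NOT the actual trait's source and it has no Akka dependency.
trait MiniAggregator {
  type Receive = PartialFunction[Any, Unit]

  // Each registered handler remembers whether it fires only once.
  private final case class Handler(fn: Receive, once: Boolean)
  private val handlers = mutable.ArrayBuffer.empty[Handler]

  // Registers a handler that stays active until unexpect() is called.
  def expect(fn: Receive): Receive = { handlers += Handler(fn, once = false); fn }

  // Registers a handler that is removed after its first match.
  def expectOnce(fn: Receive): Receive = { handlers += Handler(fn, once = true); fn }

  // Removes the handler registered with this exact function instance, if present.
  def unexpect(fn: Receive): Unit = {
    val idx = handlers.indexWhere(_.fn eq fn)
    if (idx >= 0) handlers.remove(idx)
  }

  // Plays the role of the receive block: dispatches to the first handler that
  // matches, in registration order, and reports whether anything matched.
  def handle(msg: Any): Boolean = {
    val idx = handlers.indexWhere(_.fn.isDefinedAt(msg))
    if (idx < 0) false
    else {
      val h = handlers(idx)
      if (h.once) handlers.remove(idx) // remove first, so the handler may register new ones
      h.fn(msg)
      true
    }
  }
}
```

Removing a once-handler before invoking it is a deliberate choice in this model: it lets a handler register follow-up expectations while it runs, which is what happens when the GetCustomerAccountBalances handler in the sample below creates an AccountAggregator that calls expectOnce.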

The Real Sample


We'll base the code on the AccountBalanceRetriever sample, adding just one twist: the caller can select which account types to display, so the actor responds with only that subset of account types. The first part of the code is a shameless copy of what is already out there, with case objects added to identify the account types and a few other case objects for timing out, and so forth...

import scala.collection._
import scala.concurrent.duration._
import scala.math.BigDecimal.int2bigDecimal

import akka.actor._

sealed trait AccountType
case object Checking extends AccountType
case object Savings extends AccountType
case object MoneyMarket extends AccountType

case class GetCustomerAccountBalances(id: Long, accountTypes: Set[AccountType])
case class GetAccountBalances(id: Long)

case class AccountBalances(accountType: AccountType, 
                           balance: Option[List[(Long, BigDecimal)]])

case class CheckingAccountBalances(balances: Option[List[(Long, BigDecimal)]])
case class SavingsAccountBalances(balances: Option[List[(Long, BigDecimal)]])
case class MoneyMarketAccountBalances(balances: Option[List[(Long, BigDecimal)]])

case object TimedOut
case object CantUnderstand

class SavingsAccountProxy extends Actor {
  def receive = {
    case GetAccountBalances(id: Long) =>
      sender ! SavingsAccountBalances(Some(List((1, 150000), (2, 29000))))
  }
}
class CheckingAccountProxy extends Actor {
  def receive = {
    case GetAccountBalances(id: Long) =>
      sender ! CheckingAccountBalances(Some(List((3, 15000))))
  }
}
class MoneyMarketAccountProxy extends Actor {
  def receive = {
    case GetAccountBalances(id: Long) =>
      sender ! MoneyMarketAccountBalances(None)
  }
}

Next is the actual AccountBalanceRetriever using the aggregator pattern.

class AccountBalanceRetriever extends Actor with Aggregator {

  import context._

  expectOnce {
    case GetCustomerAccountBalances(id, types) =>
      new AccountAggregator(sender, id, types)
    case _ =>
      sender ! CantUnderstand
      context.stop(self)
  }

  class AccountAggregator(originalSender: ActorRef,
                          id: Long, types: Set[AccountType]) {

    val results =
      mutable.ArrayBuffer.empty[(AccountType, Option[List[(Long, BigDecimal)]])]

    if (types.size > 0)
      types foreach {
        case Checking => fetchCheckingAccountsBalance()
        case Savings => fetchSavingsAccountsBalance()
        case MoneyMarket => fetchMoneyMarketAccountsBalance()
      }
    else collectBalances() // Empty type list yields empty response

    // Schedule a plain message to self: no closure, and no postfix-ops warning
    context.system.scheduler.scheduleOnce(250.milliseconds, self, TimedOut)
    expect {
      case TimedOut => collectBalances(force = true)
    }

    def fetchCheckingAccountsBalance() {
      context.actorOf(Props[CheckingAccountProxy]) ! GetAccountBalances(id)
      expectOnce {
        case CheckingAccountBalances(balances) =>
          results += (Checking -> balances)
          collectBalances()
      }
    }

    def fetchSavingsAccountsBalance() {
      context.actorOf(Props[SavingsAccountProxy]) ! GetAccountBalances(id)
      expectOnce {
        case SavingsAccountBalances(balances) =>
          results += (Savings -> balances)
          collectBalances()
      }
    }

    def fetchMoneyMarketAccountsBalance() {
      context.actorOf(Props[MoneyMarketAccountProxy]) ! GetAccountBalances(id)
      expectOnce {
        case MoneyMarketAccountBalances(balances) =>
          results += (MoneyMarket -> balances)
          collectBalances()
      }
    }

    def collectBalances(force: Boolean = false) {
      if (results.size == types.size || force) {
        originalSender ! results.toList // Make sure it becomes immutable
        context.stop(self)
      }
    }
  }
}

As you can see, the aggregator pattern allows specifying custom expect or expectOnce blocks dynamically, based on which message we expect under which conditions. For instance, if the requested types do not include the money-market account, we never expect a MoneyMarketAccountBalances message, and if such a rogue message arrives, the actor simply does not handle it. The code for this AccountBalanceRetriever can be found here.
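
The behavior described above (rogue replies are ignored, and the result goes out once every requested type has answered or when TimedOut forces it) can be modeled without Akka. The sketch below is a hypothetical stand-in for AccountAggregator's bookkeeping: BalanceCollector, onReply, onTimeout, and sent are illustrative names, a method returning false plays the role of an unhandled message, and setting sent plays the role of originalSender ! results.toList.

```scala
import scala.collection.mutable

// Hypothetical, Akka-free model of AccountAggregator's bookkeeping, used only
// to illustrate which replies are accepted and when the result is sent.
object CollectorSketch {
  sealed trait AccountType
  case object Checking extends AccountType
  case object Savings extends AccountType
  case object MoneyMarket extends AccountType

  type Balances = Option[List[(Long, BigDecimal)]]

  final class BalanceCollector(types: Set[AccountType]) {
    private val results = mutable.ArrayBuffer.empty[(AccountType, Balances)]
    // Stands in for "originalSender ! results.toList": set once, then frozen.
    var sent: Option[List[(AccountType, Balances)]] = None

    // An empty type list yields an empty response right away, as in the sample.
    if (types.isEmpty) collect(force = false)

    // Only replies for requested, not-yet-answered types are "expected";
    // anything else (a rogue reply) is simply not handled.
    def onReply(t: AccountType, b: Balances): Boolean =
      if (sent.isEmpty && types.contains(t) && !results.exists(_._1 == t)) {
        results += (t -> b)
        collect(force = false)
        true
      } else false

    // Mirrors "case TimedOut => collectBalances(force = true)".
    def onTimeout(): Unit = collect(force = true)

    private def collect(force: Boolean): Unit =
      if (sent.isEmpty && (results.size == types.size || force))
        sent = Some(results.toList)
  }
}
```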

Conclusion


Orchestration flows commonly use the aggregator pattern to obtain data from multiple sources. These data sources are themselves dynamic: the same aggregator may touch some data sources under some conditions and others under other conditions, which demands a more dynamic receive configuration for the actor. In addition, the use of inner actors and techniques for timing out an actor carry a high risk of closing over, which can be hard for a casual developer to spot. With the Aggregator trait and the simple set of rules above to prevent closing over, we should have a solid, safe-to-use programming pattern.

Happy coding, and follow the rules. Don't engage in risky behavior!!!

2 comments:

  1. Very nice pattern, thanks for sharing! I have only one remark on your sample code: why do you use “new AccountAggregator” instead of putting that code into a method? The constructor’s side effects are all that you are after, and wrapping this in an object is a little unconventional.

    1. You're absolutely right. In this particular example it would make total sense to use a method "aggregateAccount" instead of an "AccountAggregator" class. The fetch...AccountsBalance and collectBalances methods would then need to become inner methods of aggregateAccount in order to have access to the results field. The finishing call, collectBalances, just reads the results field and sends the results back to the original sender.

      In more complicated use cases there will be multiple vals, possibly of type Promise, that need to be accessed by methods and expect blocks. The finishing call often invokes a transformation on this object and sends back or forwards the result of that transformation. In this more common scenario, a transformation on an object is much more natural than a transformation on a method or function.

      Then again, we could make these vals members of the actor and run a transformation on the actor itself. In that case, all the vals would be initialized before the initial request is received, unless we use lazy initialization. I think it boils down to developer preference at that point. Thanks!
