Distributing the F# Mailbox Processor


Note: This blog post is part of the 2014 F# Advent Calendar. Be sure to check out yesterday’s Intro to Data Science post by Jon Wood!

Mailbox Processors 101

If you’ve been using F# for any reasonable length of time, you’ll have come across the MailboxProcessor, AKA the F# Agent (or Actor). Mailbox Processors are cool. They let us offload work to background processors without worrying about managing the thread they live on (agents silently “go to sleep” when they aren’t processing anything), and they take away the pain of locking by ensuring that only one message is processed at a time whilst automatically queuing up any backlog. They also let us think about problems differently to how we might with a raw Task, in terms of message passing: we can partition data by pushing it to different actors, and in some cases eliminate locking issues and transactions altogether.

Here’s a sample Mailbox Processor that receives an arbitrary “message” and “priority” for a specific user and accumulates them into a file (let’s imagine it’s a distributed file, e.g. Azure Blob Storage or similar): –


// Our simple domain model
type Priority =
    | Low
    | Normal
    | High

type Message =
    { Text : string
      Priority : Priority }

type MessageCollection =
    { Count : int
      Messages : Message list }

type Command =
    | Record of Message // Add a new message to file
    | Clear             // Wipe out all messages

/// Create a Mailbox Processor for a specific actor.
let CreateActor (ActorKey name) =
    MailboxProcessor.Start(fun mailbox ->
        let messageStore = GetMessageStore name
        let rec loop data =
            async {
                // Wait asynchronously until we get a message
                let! message = mailbox.Receive()
                // Process the message
                match message with
                | Clear ->
                    // Delete file and loop around.
                    messageStore.DeleteIfExists()
                    return! loop { Count = 0; Messages = [] }
                | Record message ->
                    // Append the new message to file.
                    let updatedData =
                        { data with
                            Count = data.Count + 1
                            Messages = data.Messages @ [ message ] }
                    messageStore.SetData(updatedData)
                    return! loop updatedData
            }
        // Start by loading the existing data, then loop indefinitely
        let data = defaultArg (messageStore.GetData()) { Count = 0; Messages = [] }
        loop data)

// Create an actor and post to it locally
let isaac = CreateActor (ActorKey "isaac")
isaac.Post(Record { Text = "Hello"; Priority = Priority.Low })


As is common in F#, we use a discriminated union to model the different types of messages that can be received, and pattern match on them to determine the outcome. Also notice how the agent code itself is wrapped in a generator function so that we can easily create new agents as required.

All of this is great – what’s not to love? Well, it turns out that the Mailbox Processor does have a few limitations: –

  • In-process only. You can’t distribute workloads across multiple machines or processes.
  • Does not handle routing. If you want multiple instances of the same agent, you need to manually create / destroy those agents on demand, and ensure that there is a way to route messages to the correct agent. This might be something as simple as a Map from string to agent (see the sketch after this list), or something more complex.
  • Does not have any fault tolerance built-in. If your agent dies whilst processing a message, your message will be lost (unless you have written your own fault handling mechanism). If the agent crashes, all queued messages will also be lost.
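To make the routing limitation concrete, here’s a minimal sketch of the sort of hand-rolled registry you end up writing around the CreateActor function from the sample above (postToActor and the mutable map are purely illustrative – they’re not part of any library):

// A hand-rolled, in-process router: one Mailbox Processor per actor name, held in a Map.
// Note that this is not thread-safe and does nothing about distribution or fault
// tolerance – exactly the boilerplate we'd rather not be writing.
let mutable agents : Map<string, MailboxProcessor<Command>> = Map.empty

let postToActor name command =
    let agent =
        match agents |> Map.tryFind name with
        | Some existing -> existing
        | None ->
            let created = CreateActor (ActorKey name)
            agents <- agents |> Map.add name created
            created
    agent.Post command

postToActor "isaac" (Record { Text = "Hello"; Priority = Priority.Low })
postToActor "joe" Clear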

These capabilities are sometimes native to other languages such as Erlang, or provided by dedicated frameworks.

Actors on F#

At this point, it’s worth pointing out that there are already several actor frameworks that run on F# (some of more utility than others…): –

  • Akka .NET – a port of the Scala framework Akka. Akka is a tried-and-tested framework, and the port is apparently very close to the original version, so it might be a good call if you’re coming from that background.
  • Cricket – an extensible F# actor framework that has support for multiple routing and supervision mechanisms.
  • Orleans – an actor framework by Microsoft that was designed for C# but can be made to work with F#. However, it’s not particularly idiomatic in terms of F#, and seems to have been in alpha and beta forever.
  • MBrace – a general distributed compute / data framework that can be made to work as an actor framework.

One commonality between the above is that none of them use the native Mailbox Processor as a starting point. I wanted something that would enable me to simply lift my existing Mailbox Processor code into a distributed pool of workers. So, I wrote CloudAgent – a simple, easy-to-use library that is designed to do one thing and one thing only – distribute Mailbox Processors with the minimal amount of change to your existing codebase. It adds the above three features (distribution, routing and resilience) to Mailbox Processors by pushing the bulk of the work into another component – Azure Service Bus (ASB).

Azure Service Bus

ASB is a resilient, cheap and high-performance service bus offered on Azure as a platform-as-a-service (PaaS). As such, you don’t need to worry about the physical provisioning of the service – you simply sign into the Azure portal, create a service bus namespace, and then create things like FIFO queues or multicast Topics underneath it. The billing is cheap – something like $0.01 per 10,000 messages – and it takes literally seconds to create a service bus and queue.
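If you’d rather create the queue from code than through the portal, the (2014-era) Microsoft.ServiceBus SDK’s NamespaceManager will do it – here’s a minimal sketch. The RequiresSession flag is an assumption on my part, based on the session-based actor routing described below; it isn’t needed for a plain worker pool.

open Microsoft.ServiceBus
open Microsoft.ServiceBus.Messaging

// Create the queue if it doesn't already exist (NamespaceManager ships in the
// WindowsAzure.ServiceBus NuGet package). RequiresSession enables ASB sessions,
// which the per-actor routing described later relies on.
let namespaceManager =
    NamespaceManager.CreateFromConnectionString "Endpoint=sb://myservicebus.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=..."

if not (namespaceManager.QueueExists "myqueue") then
    namespaceManager.CreateQueue(QueueDescription("myqueue", RequiresSession = true)) |> ignore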

How do we use ASB to distribute Mailbox Processors? Well, CloudAgent uses it as a resilient backing store for the Mailbox Processor queue: rather than messages stacking up in the mailbox processor itself, they stack up in Azure and are pulled one at a time into the mailbox processor. CloudAgent automatically serializes and deserializes the messages, so as far as the Mailbox Processor is concerned this happens transparently (currently this is JSON, but I’m looking to plug in other frameworks such as FSPickler in the future). We’ll now see how we use the features of ASB to provide the three features we previously said we wanted to add to Mailbox Processors.
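One practical implication is that your message types need to survive a round trip through a serializer. A quick sanity check with Json.NET – purely illustrative, this is not CloudAgent’s internal code – might look like this:

open Newtonsoft.Json

// Round-trip a Command through JSON to check it serializes cleanly.
// (Illustrative only – CloudAgent performs the serialization for you.)
let original = Record { Text = "Hello"; Priority = Priority.Low }
let json = JsonConvert.SerializeObject original
let roundTripped = JsonConvert.DeserializeObject<Command> json
printfn "%s -> %A" json roundTripped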

Distribution and Routing

These first two characteristics can essentially be dealt with by answering one question: “How can I scale the Mailbox Processor?”. Firstly, as we’re using Service Bus, it automatically handles both multiple publishers and multiple subscribers for a given FIFO queue. This allows us to push many messages onto a queue and have many worker processes handling messages simultaneously. This is something CloudAgent does automatically for us – when a consumer starts listening to a Service Bus queue, it will immediately start polling for new messages (or, as we’ll see shortly, sessions), and then route them to an “appropriate” worker. What does this mean? To answer that, we need to understand that there are two worker models: –

Worker Pools

Worker Pools in CloudAgent are what I would classify as “dumb” agents. They do not fit the “actor” paradigm; rather, they are for processing generic messages that do not necessarily need to be handled in a specific sequence, or by a single instance. This might be useful where you need “burst out” capability for purely functional computations that can be scaled horizontally without reliance on external sources of data. In this model, we use a standard ASB queue to hold messages, and set up as many machines as we want to process them. (By default, CloudAgent will create 512 workers per node.) Each CloudAgent node will simply poll for messages and allocate each one to a random agent in its local pool.
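For comparison with the actor pool example further down, here’s a rough sketch of what a worker pool connection might look like. The WorkerCloudConnection and SendToWorkerPool names here should be treated as assumptions rather than gospel – check them against the CloudAgent API before relying on them:

open FSharp.CloudAgent
open FSharp.CloudAgent.Messaging

// A "dumb" worker pool: no actor keys, no sessions – any node may pick up any message.
let workerConnection =
    CloudConnection.WorkerCloudConnection(
        ServiceBusConnection "Endpoint=sb://myservicebus.servicebus.windows.net/;...", // same connection string as before
        Connections.Queue "myworkerqueue")

// Producer: push messages into the pool.
let sendToPool = ConnectionFactory.SendToWorkerPool workerConnection
sendToPool (Record { Text = "Hello"; Priority = Priority.Low }) |> Async.RunSynchronously

// Consumer: a plain agent factory. I'm assuming the actor key argument is not meaningful
// for worker pools, so it is simply ignored here.
let createWorker _ =
    MailboxProcessor.Start(fun mailbox ->
        let rec loop () = async {
            let! message = mailbox.Receive()
            printfn "Processing %A" message
            return! loop () }
        loop ())

let workerListener = ConnectionFactory.StartListening(workerConnection, createWorker >> BasicCloudAgent)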

Actor Pools

Actor Pools fit more closely with the classic Agent / Actor paradigm. Here, messages are tagged with a specific ActorKey, which ensures that only a single instance of an F# Mailbox Processor will process messages for that actor at any one time. We use a feature of ASB queues called “Sessions” to perform the routing: each CloudAgent node will request the next available “session” to process; the session represents the stream of messages for a particular actor. Once a session is made available (by a message being sent to the queue with a new actor key), it will be allocated to a particular worker node, and subsequently to a new instance of the Mailbox Processor for that actor (CloudAgent maintains a map of actor keys to Mailbox Processors for local routing).

So if you send 10 messages to Actor “Joe Bloggs”, these will all be routed to the same physical machine, and the same instance of Mailbox Processor. Once the messages “dry up”, that specific mailbox processor will be disposed of by the CloudAgent node; when new messages appear, a new instance will be allocated within the pool and the whole cycle starts again.

Here’s an example of how we would connect our existing Mailbox Processor from above into CloudAgent, covering both producing messages (the equivalent of Post) and consuming them:


open FSharp.CloudAgent
open FSharp.CloudAgent.Messaging

// A connection to an Azure Service Bus-backed CloudAgent actor pool
let cloudConnection =
    CloudConnection.ActorCloudConnection(
        ServiceBusConnection "Endpoint=sb://myservicebus.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=fsdoifsm0983m29048098dfs",
        Connections.Queue "myqueue")

(* Posting messages – run this code on every producer *)
let sendTo = ConnectionFactory.SendToActorPool cloudConnection
let sendToIsaac = sendTo (ActorKey "isaac")
sendToIsaac Clear |> Async.RunSynchronously // Send a Clear message to the Isaac actor to wipe the file
sendToIsaac (Record { Text = "Hello"; Priority = Priority.Low }) |> Async.RunSynchronously // Send a new message to Isaac

(* Consuming messages – run this code on every consumer *)
// Start listening! CloudAgent will poll Service Bus for messages and create Mailbox Processors as required.
let disposable = ConnectionFactory.StartListening(cloudConnection, CreateActor >> BasicCloudAgent)


Notice that there is no change to the Mailbox Processor code whatsoever. All we have done is bind up the creation of a Mailbox Processor to ASB through CloudAgent. Instead of us calling Post on an agent directly, we send a message to Service Bus, which in turn is captured by CloudAgent on a worker node, and then internally Posted to the appropriate Mailbox Processor. In this context, you can think of CloudAgent as a framework over Mailbox Processors to route and receive messages through Azure Service Bus Queues.

Resiliency

An orthogonal concern to routing and distribution is message resiliency. One of the features we get for free by pushing messages through Azure Service Bus is that messages waiting to be processed are, by definition, resilient – they’re stored on Service Bus for a user-defined period until they are picked up off the queue and processed. You might set this to a minute, a day, or a week – it doesn’t matter. So until a message starts to be processed, we do not have to worry if no consumers are available. But what about messages that are being processed – what if the Mailbox Processor crashes part way through? Again, CloudAgent offers us a way of solving this: –

Basic Agents

Basic Agents contain no fault tolerance around message processing. This actually fits nicely with the standard “fire-and-forget” mechanism of Posting messages to F# Mailbox Processors. Whilst it’s your responsibility to handle failures yourself, you don’t have to worry about repeats of messages – they will only ever be sent to the pool once. You might use this for messages where an occasional failure is not the end of the world. With a Basic Agent, the above Mailbox Processor code sample would not need to change at all.

Resilient Agents

Service Bus also optionally gives us “at least once” processing. This means that once we finish processing a message, we must respond to Service Bus and “confirm” that we successfully processed it. If we don’t respond in time, Service Bus will assume that the processor has failed and resend the message; if too many attempts fail, the message will automatically get dead-lettered. How do we map this “confirmation” process onto Mailbox Processors? That’s easy – through a variant of the native PostAndReply mechanism offered by Mailbox Processors. Here, every message we receive contains a payload and a reply channel that we call with a choice of Completed, Failed or Abandoned. Failed tells Service Bus to retry (until it exceeds the retry limit), whilst Abandoned will immediately dead-letter the message. Abandoned is therefore useful for “bad data”, as opposed to transient failures such as database connection problems, where you would probably want the retry behaviour of Failed.
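The lock duration, retry limit and dead-letter threshold are all properties of the queue itself rather than of CloudAgent. If you’re creating the queue in code, these are the standard Service Bus settings involved – the values below are purely illustrative, not CloudAgent defaults:

open System
open Microsoft.ServiceBus.Messaging

// Standard Service Bus queue settings that govern the "at least once" behaviour.
// The values here are examples only.
let resilientQueue =
    QueueDescription(
        "myqueue",
        LockDuration = TimeSpan.FromMinutes 1.,           // how long a consumer has to confirm before redelivery
        MaxDeliveryCount = 10,                            // failed deliveries allowed before automatic dead-lettering
        DefaultMessageTimeToLive = TimeSpan.FromDays 7.)  // how long unprocessed messages survive on the queue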

Here’s how we change our Mailbox Processor code to take advantage of this resilient behaviour; notice that the Receive() call now returns a payload plus a callback function whose result is translated into the ASB confirmation. We also add a new business rule that we can never have more than 5 messages saved at once; any message that would exceed that limit is rejected by Abandoning it: –


let CreateActor (ActorKey name) =
    MailboxProcessor.Start(fun mailbox ->
        let messageStore = GetMessageStore name
        let rec loop data =
            async {
                // Wait asynchronously until we get a message + reply channel
                let! message, replyWith = mailbox.Receive()
                match message with
                | Clear ->
                    messageStore.DeleteIfExists()
                    replyWith Completed // confirm processing!
                    return! loop { Count = 0; Messages = [] }
                | Record message when data.Count < 5 ->
                    let updatedData =
                        { data with
                            Count = data.Count + 1
                            Messages = data.Messages @ [ message ] }
                    messageStore.SetData(updatedData)
                    replyWith Completed // confirm processing!
                    return! loop updatedData
                | Record _ ->
                    // Bad data – the maximum number of saved messages is 5. Dead-letter it and keep looping.
                    replyWith Abandoned
                    return! loop data
            }
        let data = defaultArg (messageStore.GetData()) { Count = 0; Messages = [] }
        loop data)

// Change how we start listening to the pool as well – ResilientCloudAgent instead of BasicCloudAgent
let disposable = ConnectionFactory.StartListening(cloudConnection, CreateActor >> ResilientCloudAgent)


At the cost of having to explicitly reply to every message we process, we now get retry functionality with automatic dead-lettering. If the agent crashes and does not respond within a specific time, Service Bus will automatically resend the message for a new agent to pick up. Bear in mind, though, that this also means that if the first consumer does not respond in time (even if it is merely slow rather than dead), Service Bus will assume it has died and repost the message – so a message may be delivered more than once. Therefore, in this model, you should design your agents to process messages in an idempotent manner.
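One simple way to achieve that idempotency is to stamp each message with an identifier and skip anything you’ve already applied. The Id field below isn’t part of the sample domain model above – it’s a hypothetical extension, shown purely to illustrate the idea:

open System

// Hypothetical extension of the earlier domain model: each message carries an Id
// so that redelivered messages can be detected and ignored rather than double-counted.
type IdentifiedMessage = { Id : Guid; Body : Message }

type DedupedCollection =
    { Count : int
      Messages : Message list
      SeenIds : Set<Guid> }

let applyMessage (data : DedupedCollection) (message : IdentifiedMessage) =
    if data.SeenIds.Contains message.Id then
        // Already processed – this must be a redelivery, so applying it again would double-count.
        data
    else
        { data with
            Count = data.Count + 1
            Messages = data.Messages @ [ message.Body ]
            SeenIds = data.SeenIds.Add message.Id }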

Conclusion

Here’s a screenshot of three processes (all running on the same machine, though they could just as easily be distributed) subscribing to the same service bus through CloudAgent and being sent messages for three different Actors. Notice how all the messages for a given actor have an affinity with a particular console window (and therefore consumer and agent): –

[Screenshot: three CloudAgent consumer console windows, one per actor]

CloudAgent and Service Bus together enable us to quickly and easily convert Mailbox Processors from single-process constructs into massively scalable and fault-tolerant workers that can be used as dumb workers or as actors with built-in routing. The actual CloudAgent code is pretty small – just four or five .fs files and around 500 lines of code. This isn’t only because F# is extremely succinct and powerful, but also because Azure does the heavy lifting of everything we want from a messaging subsystem; when coupled with the Mailbox Processor / Agent paradigm, I believe this forms a simple yet compelling offering for distributing processing in a fairly friction-free manner.

12 thoughts on “Distributing the F# Mailbox Processor”

  1. Great article, great project – love it! A couple of questions, if I may?

    1 – Sagas
    I’m guessing a combination of resilient agents and idempotent messages would go a long way here, but how should sagas be handled?

    2 – Hosting
    What are the hosting options (VM process, Worker Role, WebJob, Azure Website, etc.)?

    1. Hi Sean!

      If I’m honest, I’d never heard the term Sagas before 🙂 But it seems like this simply refers to long-running processes. You would need to be careful to ensure that the service bus queue has a reasonable length of time set to process messages, but yes, resilient messages + idempotent processing would seem to be how this would best be managed – you would probably need to keep state somewhere, e.g. blob storage or SQL, and when your actor starts up, read that state back in so it can continue processing in case it died, say, 59 minutes into a one-hour process. An alternative might be to split the work into distinct sub-messages, each one considered standalone – when one is completed, it would post a new message onto the service bus for the next stage to be processed independently.

      Hosting could be any of the options you’ve listed, I suppose, although as WebJobs sit within a Website anyway, you may as well just run directly within a website – plus running an always-on actor pool as a WebJob isn’t really what they were designed for. I personally wouldn’t bother with a direct IaaS-style VM process either – websites and worker roles both offer much easier deployment and maintenance mechanisms, and one of them should probably be your first port of call.

      1. Hi Isaac

        Thanks for the response.

        For sagas, unless you’re planning another implementation, I’m sure that will all work fine for me right now.

        There are a few implementations in the .NET space that wrap sagas really well. One of those is Rebus (https://github.com/rebus-org/Rebus/wiki), which is fully open source. Rebus is based on the NServiceBus implementation (www.particular.net), but that is now freemium.

        For the general saga case, serialisation and correlation/retrieval of stages is important. That’s handled by the likes of Rebus/NSB/etc., and they also bring gifts like timeout handling, amongst others.

        However, none of them have the joyful elegance of F#!

        For deployment – happy to take the website or worker role route. Are you planning any example projects for these?

        Many thanks

        Sean

  2. Yep, I was actually – probably over the next week or so I’ll put together a demonstrator using a Worker Role 🙂 If you want a slightly more flexible framework (albeit one that does not support the native Mailbox Processor and that requires a little bit more effort), you might still want to look at either Cricket or Akka .NET, although I must admit I’ve not used either of them in anger.

    1. Awesome – looking forward to seeing that! It would be nice to have a website example too as they’re free, so it would increase the reach imo! $0.02 🙂

      I’ve looked at Akka .NET, but not Cricket – I’ll check that out soon. What I’m liking about your CloudAgent compared to Akka.NET is (1) durable messaging and Azure queue integration baked right in, and (2) the staying close to the standard F# implementation. The latter is important for me right now as this is a candidate for my first live F# use case!
