MailboxProcessor & Agents
Build actor-based systems with MailboxProcessor for message-passing concurrency and isolated mutable state.
MailboxProcessor & Agents in F#
Master agent-based concurrency in F# with free flashcards and spaced repetition practice. This lesson covers MailboxProcessor fundamentals, message-passing patterns, and asynchronous agent designβessential concepts for building responsive, concurrent applications in functional programming.
Welcome to the World of Agents π»
In the realm of concurrent programming, managing shared state and coordinating multiple tasks can quickly become a nightmare of locks, race conditions, and deadlocks. F# offers an elegant solution through the MailboxProcessor, an implementation of the agent (or actor) model that encapsulates state and processes messages asynchronously.
π€ Did you know? The actor model was invented by Carl Hewitt in 1973, long before modern multicore processors made concurrency a daily concern. It's the same pattern used by Erlang to build massively concurrent telecom systems with 99.9999999% uptime!
Core Concepts: Understanding Agents
What is a MailboxProcessor?
A MailboxProcessor<'Msg> (often called an "agent") is a lightweight, independent processing unit that:
- π Encapsulates mutable state privately
- π¬ Receives messages through an asynchronous queue (the "mailbox")
- β‘ Processes messages sequentially in the order received
- π Runs on its own async workflow without blocking
π Real-World Analogy
Think of an agent like a customer service representative at a help desk. They:
- Have their own workspace (private state)
- Receive tickets in a queue (mailbox)
- Handle one ticket at a time (sequential processing)
- Work independently without blocking others (asynchronous)
The Agent Architecture
βββββββββββββββββββββββββββββββββββββββββββ β MAILBOXPROCESSOR β βββββββββββββββββββββββββββββββββββββββββββ€ β β β π¬ MAILBOX (Message Queue) β β βββββ βββββ βββββ βββββ β β βMsgβββMsgβββMsgβββMsgββ ... β β βββββ βββββ βββββ βββββ β β β β β βββββββββββββββββββββββββββ β β β MESSAGE PROCESSOR β β β β (async workflow) β β β β βββββββββββββββ β β β β β Match Msg β β β β β β Update Stateβ β β β β β Send Reply β β β β β βββββββββββββββ β β β βββββββββββββββββββββββββββ β β β β β π PRIVATE STATE β β (mutable, encapsulated) β β β βββββββββββββββββββββββββββββββββββββββββββ
Message Types and Patterns
Agents communicate through discriminated unions that define the message protocol:
// Define message types
type CounterMsg =
| Increment
| Decrement
| GetCount of AsyncReplyChannel<int>
| Reset
Key message patterns:
- Fire-and-forget: Send a message without expecting a reply (
Increment,Reset) - Request-reply: Send a message and await a response (
GetCount) - Broadcast: Send the same message to multiple agents
- Pipeline: Chain agents where output becomes input
Creating Your First Agent π
Basic Counter Agent
Let's build a simple counter that handles concurrent increment/decrement operations:
open System
// Define message types
type CounterMsg =
| Increment
| Decrement
| GetCount of AsyncReplyChannel<int>
// Create the agent
let counterAgent = MailboxProcessor<CounterMsg>.Start(fun inbox ->
// Define the message processing loop
let rec loop count = async {
// Wait for the next message
let! msg = inbox.Receive()
// Pattern match on message type
match msg with
| Increment ->
return! loop (count + 1)
| Decrement ->
return! loop (count - 1)
| GetCount replyChannel ->
replyChannel.Reply(count)
return! loop count
}
// Start with initial count of 0
loop 0
)
// Usage
counterAgent.Post(Increment) // Fire-and-forget
counterAgent.Post(Increment)
counterAgent.Post(Decrement)
// Request-reply pattern
let currentCount = counterAgent.PostAndReply(fun replyChannel ->
GetCount replyChannel)
printfn "Current count: %d" currentCount // Output: 1
π‘ Key points:
MailboxProcessor.Startcreates and starts the agentinbox.Receive()asynchronously waits for messages- The recursive
loopfunction maintains state through parameters Postsends fire-and-forget messagesPostAndReplysends a message and blocks until reply
Bank Account Agent (Stateful Example)
A more realistic example managing account transactions:
type AccountMsg =
| Deposit of decimal
| Withdraw of decimal * AsyncReplyChannel<Result<decimal, string>>
| GetBalance of AsyncReplyChannel<decimal>
let createBankAccount initialBalance =
MailboxProcessor<AccountMsg>.Start(fun inbox ->
let rec loop balance = async {
let! msg = inbox.Receive()
match msg with
| Deposit amount ->
let newBalance = balance + amount
printfn "Deposited $%.2f. New balance: $%.2f" amount newBalance
return! loop newBalance
| Withdraw (amount, replyChannel) ->
if amount > balance then
replyChannel.Reply(Error "Insufficient funds")
return! loop balance
else
let newBalance = balance - amount
printfn "Withdrew $%.2f. New balance: $%.2f" amount newBalance
replyChannel.Reply(Ok newBalance)
return! loop newBalance
| GetBalance replyChannel ->
replyChannel.Reply(balance)
return! loop balance
}
loop initialBalance
)
// Usage
let account = createBankAccount 1000.0m
account.Post(Deposit 500.0m)
let result = account.PostAndReply(fun reply -> Withdraw(200.0m, reply))
match result with
| Ok newBalance -> printfn "Success! Balance: $%.2f" newBalance
| Error msg -> printfn "Error: %s" msg
Timeout and TryReceive
Sometimes you need to handle timeouts or check for messages without blocking:
type TimerMsg =
| Tick
| Stop of AsyncReplyChannel<int>
let timerAgent = MailboxProcessor<TimerMsg>.Start(fun inbox ->
let rec loop tickCount = async {
// Wait up to 1 second for a message
let! msgOption = inbox.TryReceive(1000)
match msgOption with
| Some Tick ->
printfn "Tick %d" (tickCount + 1)
return! loop (tickCount + 1)
| Some (Stop replyChannel) ->
replyChannel.Reply(tickCount)
printfn "Timer stopped at %d ticks" tickCount
| None ->
// Timeout - no message received
printfn "Auto-tick %d" (tickCount + 1)
return! loop (tickCount + 1)
}
loop 0
)
// The timer will auto-tick every second
// Manual ticks can also be sent
timerAgent.Post(Tick)
Async.Sleep(2500) |> Async.RunSynchronously
let finalCount = timerAgent.PostAndReply(Stop)
Advanced Patterns π―
Agent Coordination: Producer-Consumer
Multiple agents can work together in pipelines:
type ProcessorMsg =
| Process of string
| Shutdown
// Consumer agent that processes items
let createConsumer name =
MailboxProcessor<ProcessorMsg>.Start(fun inbox ->
let rec loop() = async {
let! msg = inbox.Receive()
match msg with
| Process item ->
printfn "[%s] Processing: %s" name item
do! Async.Sleep(100) // Simulate work
printfn "[%s] Completed: %s" name item
return! loop()
| Shutdown ->
printfn "[%s] Shutting down" name
}
loop()
)
// Producer that distributes work
let createProducer (consumers: MailboxProcessor<ProcessorMsg> list) =
MailboxProcessor<string>.Start(fun inbox ->
let rec loop index = async {
let! item = inbox.Receive()
// Round-robin distribution
let consumer = consumers.[index % consumers.Length]
consumer.Post(Process item)
return! loop ((index + 1) % consumers.Length)
}
loop 0
)
// Create pipeline
let consumers = [1..3] |> List.map (fun i -> createConsumer (sprintf "Worker-%d" i))
let producer = createProducer consumers
// Send work items
["Task-A"; "Task-B"; "Task-C"; "Task-D"; "Task-E"; "Task-F"]
|> List.iter producer.Post
Error Handling and Supervision
β οΈ Critical: Unhandled exceptions in agents can crash them silently!
type SafeMsg =
| DoWork of string
| GetStatus of AsyncReplyChannel<string>
let createSupervisedAgent() =
MailboxProcessor<SafeMsg>.Start(fun inbox ->
let rec loop status = async {
try
let! msg = inbox.Receive()
match msg with
| DoWork data ->
// Potentially dangerous operation
if data = "crash" then
failwith "Intentional error"
printfn "Processed: %s" data
return! loop "OK"
| GetStatus reply ->
reply.Reply(status)
return! loop status
with
| ex ->
printfn "Error: %s. Recovering..." ex.Message
return! loop "ERROR"
}
loop "STARTED"
)
let agent = createSupervisedAgent()
agent.Post(DoWork "valid data")
agent.Post(DoWork "crash") // Causes error but agent survives
agent.Post(DoWork "more data") // Continues working
Scan and Selective Message Processing
The Scan method allows processing messages based on current state:
type PriorityMsg =
| Urgent of string
| Normal of string
| GetNextUrgent of AsyncReplyChannel<string option>
let priorityAgent = MailboxProcessor<PriorityMsg>.Start(fun inbox ->
let rec loop() = async {
// Scan for urgent messages first
let! msg = inbox.Scan(function
| Urgent text -> Some(async { return Urgent text })
| GetNextUrgent reply -> Some(async { return GetNextUrgent reply })
| _ -> None // Skip normal messages for now
)
match msg with
| Urgent text ->
printfn "URGENT: %s" text
return! loop()
| GetNextUrgent reply ->
reply.Reply(None)
return! loop()
| _ -> return! loop()
}
loop()
)
Real-World Example: Download Manager π
Let's build a concurrent download manager using agents:
open System.Net.Http
type DownloadMsg =
| Download of url: string * id: int
| GetStatus of AsyncReplyChannel<Map<int, string>>
| CancelAll
let createDownloadManager maxConcurrent =
let httpClient = new HttpClient()
MailboxProcessor<DownloadMsg>.Start(fun inbox ->
let rec loop (activeDownloads: Map<int, string>) currentCount = async {
let! msg = inbox.Receive()
match msg with
| Download (url, id) when currentCount < maxConcurrent ->
// Start async download without waiting
Async.Start(async {
try
printfn "[%d] Starting download: %s" id url
let! content = httpClient.GetStringAsync(url) |> Async.AwaitTask
printfn "[%d] Completed. Size: %d bytes" id content.Length
with
| ex -> printfn "[%d] Failed: %s" id ex.Message
})
let newMap = activeDownloads |> Map.add id "Downloading"
return! loop newMap (currentCount + 1)
| Download (url, id) ->
// Queue is full, repost to try later
do! Async.Sleep(100)
inbox.Post(Download (url, id))
return! loop activeDownloads currentCount
| GetStatus reply ->
reply.Reply(activeDownloads)
return! loop activeDownloads currentCount
| CancelAll ->
printfn "Cancelling all downloads"
return! loop Map.empty 0
}
loop Map.empty 0
)
// Usage
let downloadMgr = createDownloadManager 3
let urls = [
"https://example.com/file1.txt"
"https://example.com/file2.txt"
"https://example.com/file3.txt"
"https://example.com/file4.txt"
"https://example.com/file5.txt"
]
urls |> List.iteri (fun i url ->
downloadMgr.Post(Download (url, i)))
Common Mistakes β οΈ
1. Forgetting to Call the Recursive Loop
β Wrong:
let badAgent = MailboxProcessor.Start(fun inbox ->
let rec loop count = async {
let! msg = inbox.Receive()
match msg with
| Increment -> loop (count + 1) // Missing return!
}
loop 0
)
β Correct:
let goodAgent = MailboxProcessor.Start(fun inbox ->
let rec loop count = async {
let! msg = inbox.Receive()
match msg with
| Increment -> return! loop (count + 1) // return! is essential
}
loop 0
)
2. Blocking Operations in Message Handlers
β Wrong:
| ProcessFile filename ->
let content = System.IO.File.ReadAllText(filename) // Synchronous blocking!
return! loop ()
β Correct:
| ProcessFile filename ->
let! content = System.IO.File.ReadAllTextAsync(filename) |> Async.AwaitTask
return! loop ()
3. Not Handling PostAndReply Timeouts
β Wrong:
let result = agent.PostAndReply(GetData) // Could hang forever!
β Correct:
let result = agent.PostAndReply(GetData, timeout = 5000) // 5 second timeout
// Or use PostAndTryAsyncReply for async timeout handling
4. Sharing Mutable State Between Agents
β Wrong:
let sharedList = ResizeArray<string>() // Mutable shared state
let agent1 = MailboxProcessor.Start(fun inbox ->
// ... modifies sharedList ...
)
let agent2 = MailboxProcessor.Start(fun inbox ->
// ... also modifies sharedList - RACE CONDITION!
)
β Correct:
// Use message passing instead
type CoordinatorMsg =
| AddItem of string
| GetItems of AsyncReplyChannel<string list>
let coordinator = MailboxProcessor.Start(fun inbox ->
let rec loop items = async {
let! msg = inbox.Receive()
match msg with
| AddItem item -> return! loop (item :: items)
| GetItems reply ->
reply.Reply(items)
return! loop items
}
loop []
)
// Both agents communicate through coordinator
5. Ignoring Agent Errors
π‘ Tip: Subscribe to the Error event to catch unhandled exceptions:
let agent = MailboxProcessor.Start(fun inbox -> ...)
agent.Error.Add(fun ex ->
printfn "Agent crashed: %s" ex.Message
// Restart logic here
)
Performance Considerations π
| Scenario | Agents Suitable? | Alternative |
|---|---|---|
| 1-100 concurrent tasks | β Excellent | N/A |
| Thousands of agents | β Good (lightweight) | Consider pooling |
| High-throughput data | β οΈ May bottleneck | Parallel collections |
| CPU-bound work | β Good with Async | Task.Parallel |
| Stateful coordination | β Perfect fit | N/A |
π§ Memory Device: "SIPS"
- State encapsulation (agents own their state)
- Isolation (no shared memory)
- Post messages (communication method)
- Sequential processing (one message at a time)
Key Takeaways π―
- MailboxProcessor provides lightweight agents for concurrent programming without locks
- Messages are processed sequentially within each agent, eliminating race conditions
- Use
Postfor fire-and-forget,PostAndReplyfor request-reply patterns - Encapsulate state within the agent's recursive loop function
- Handle errors explicitly with try-catch or error event handlers
- Agents communicate through messages, never shared mutable state
- The agent model scales well for coordination and state management tasks
- Always use
return!when recursing to continue the message loop
π Quick Reference Card
| Create Agent | MailboxProcessor.Start(fun inbox -> ...) |
| Receive Message | let! msg = inbox.Receive() |
| Receive w/ Timeout | let! msgOpt = inbox.TryReceive(1000) |
| Send Fire-Forget | agent.Post(message) |
| Send w/ Reply | agent.PostAndReply(fun ch -> Msg ch) |
| Selective Receive | inbox.Scan(function ...) |
| Error Handling | agent.Error.Add(handler) |
| Message Type | Discriminated union with reply channels |
Further Study π
- Official F# Documentation - Agents: https://fsharp.github.io/fsharp-core-docs/reference/fsharp-control-mailboxprocessor-1.html
- Real-World Functional Programming (Chapter on Agents): https://www.manning.com/books/real-world-functional-programming
- F# for Fun and Profit - MailboxProcessor Tutorial: https://fsharpforfunandprofit.com/posts/concurrency-actor-model/
π§ Try This: Build your own chat server using agents! Create one agent per connected client and a coordinator agent to broadcast messages. This is a perfect real-world application of the agent model and will solidify your understanding of message-passing concurrency.
Remember: Agents aren't just a concurrency toolβthey're a design pattern that helps you think about your application as independent components that communicate through well-defined protocols. Master this pattern, and you'll write cleaner, more maintainable concurrent code! π