Better View Models: Taming Concurrent Effects
When async effects complete out of order, your app's state becomes a race condition waiting to happen. Let's make our view models more predictable.
The Action Queue: Taming Concurrent Effects
In Part 1 of this series, we built a minimal reducer-effect system. It worked - until it didn't. The moment two effects completed out of order, state became unpredictable.
Today we fix that. By the end, you'll have an actor-backed execution context that processes actions serially, no matter when effects fire.
The Problem, Revisited
Here's our send(_:) from Part 1:
```swift
func send(_ action: Action) {
    let (newState, effect) = reduce(action: action)
    state = newState
    Task {
        await effect.run { [weak self] action in
            self?.send(action)
        }
    }
}
```

The bug hides in plain sight. Each effect spawns its own Task. Each task calls send(_:) when it completes. Nothing coordinates them.
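If you're jumping in here, a minimal sketch of the Part 1 types these snippets build on may help. The names match this article's usage; Part 1's exact definitions may differ in detail:

```swift
// Minimal sketch of the Part 1 types these snippets build on.
// Names match the usage in this article; Part 1's exact
// definitions may differ in detail.
protocol Reducer {
    associatedtype State
    associatedtype Action

    var state: State { get set }

    // Pure step: current state + action -> new state + effect to run.
    func reduce(action: Action) -> (State, Effect<Action>)
}

struct Effect<Action> {
    // The body receives a send callback for feeding result actions back in.
    let operation: (@escaping (Action) async -> Void) async -> Void

    func run(_ send: @escaping (Action) async -> Void) async {
        await operation(send)
    }

    // An effect that does nothing.
    static var none: Effect { Effect { _ in } }
}
```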
Picture a search field. User types "swift":
```
Time ─────────────────────────────────────────────────►

User types:    s        w        i        f        t
               │        │        │        │        │
Effects:       ▼        ▼        ▼        ▼        ▼
            Task 1   Task 2   Task 3   Task 4   Task 5
               │        │        │        │        │
               │        │        │        └────────┼──► completes first (fast server)
               │        │        └─────────────────┼──► completes second
               │        └──────────────────────────┼──► completes third
               │                                   └──► completes fourth
               └──────────────────────────────────────► completes last

Final state: results for "s"      (Task 1 finished last)
Expected:    results for "swift"
```

The network doesn't care about your intentions. Responses arrive when they arrive. Without coordination, the last response wins - even if it's for the wrong query.
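You can reproduce this last-writer-wins behavior in a few lines. This is an illustrative sketch, not the series' code: each simulated request sleeps for a different duration, and the first keystroke's request is the slowest, so its stale result lands last:

```swift
// Simulated search state. An actor keeps the concurrent writes safe
// so we observe pure ordering behavior, not memory corruption.
actor SearchState {
    var results = ""
    func setResults(for query: String) { results = "results for \(query)" }
}

let searchState = SearchState()
let queries = ["s", "sw", "swi", "swif", "swift"]

await withTaskGroup(of: Void.self) { group in
    for (index, query) in queries.enumerated() {
        group.addTask {
            // Earlier keystrokes get longer simulated latency:
            // "s" sleeps 500ms, "swift" sleeps 100ms.
            let nanos = UInt64(queries.count - index) * 100_000_000
            try? await Task.sleep(nanoseconds: nanos)
            await searchState.setResults(for: query)
        }
    }
}

let finalResults = await searchState.results
// finalResults is "results for s" - the stale response won
```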
First Attempt: An Action Queue
The fix seems obvious. Don't process actions immediately - queue them:
```swift
final class FeatureViewModel: Reducer {
    var state: State
    private var actionQueue: [Action] = []

    func send(_ action: Action) {
        actionQueue.append(action)
        processNextAction()
    }

    private func processNextAction() {
        guard !actionQueue.isEmpty else { return }
        let action = actionQueue.removeFirst()
        let (newState, effect) = reduce(action: action)
        state = newState
        Task {
            await effect.run { [weak self] action in
                self?.send(action)
            }
        }
    }
}
```

Better? Not really. We've moved the problem, not solved it.
processNextAction() still fires effects in unstructured tasks. Those tasks still call send(_:) concurrently. We've added a queue but nothing enforces serial processing.
Second Attempt: Process Until Empty
What if we kept processing until the queue drains?
```swift
private func processNextAction() {
    while !actionQueue.isEmpty {
        let action = actionQueue.removeFirst()
        let (newState, effect) = reduce(action: action)
        state = newState
        Task {
            await effect.run { [weak self] action in
                self?.send(action)
            }
        }
    }
}
```

Same problem. The while loop processes the current queue contents synchronously. But effects are async. By the time an effect completes and calls send(_:), the loop has long since exited.
We need a loop that waits for work. An infinite loop that suspends when idle and resumes when actions arrive.
Third Attempt: The Infinite Loop
```swift
private func processLoop() async {
    while true {
        if actionQueue.isEmpty {
            // ??? wait somehow ???
            continue
        }
        let action = actionQueue.removeFirst()
        let (newState, effect) = reduce(action: action)
        state = newState
        await effect.run { [weak self] action in
            self?.send(action)
        }
    }
}
```

Two problems:
1. How do we wait? Busy-waiting (while actionQueue.isEmpty {}) burns CPU. We need a suspension point that resumes when work arrives.
2. Where does this loop run? It's async, so it needs a Task. But that Task lives forever. Who owns it? When does it stop?
Let's solve the waiting problem first. Swift gives us AsyncStream - a channel that suspends consumers until producers send values:
```swift
final class FeatureViewModel: Reducer {
    var state: State
    private var actionStream: AsyncStream<Action>!
    private var actionContinuation: AsyncStream<Action>.Continuation!

    init(state: State = State()) {
        self.state = state
        self.actionStream = AsyncStream { continuation in
            self.actionContinuation = continuation
        }
        // Start the processing loop
        Task {
            await processLoop()
        }
    }

    func send(_ action: Action) {
        actionContinuation.yield(action)
    }

    private func processLoop() async {
        for await action in actionStream {
            let (newState, effect) = reduce(action: action)
            state = newState
            await effect.run { [weak self] action in
                self?.send(action)
            }
        }
    }
}
```

Now we're getting somewhere. send(_:) pushes actions into the stream. processLoop() pulls them out one at a time. The for await suspends when the stream is empty and resumes when new actions arrive.
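If AsyncStream is new to you, here's the push/pull behavior in isolation. A standalone sketch using Swift 5.9's AsyncStream.makeStream (on earlier versions, use the continuation-capturing init shown above):

```swift
// makeStream returns the stream and its continuation as a pair.
let (stream, continuation) = AsyncStream.makeStream(of: Int.self)

// Producer side: push values in, then finish.
// The default buffering policy is unbounded, so values yielded
// before anyone consumes them are held in the stream's buffer.
continuation.yield(1)
continuation.yield(2)
continuation.yield(3)
continuation.finish()

// Consumer side: for-await pulls values in yield order,
// suspending whenever the stream is empty.
var received: [Int] = []
for await value in stream {
    received.append(value)
}
// received == [1, 2, 3]
```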
But we've introduced a new bug. A serious one.
The Data Race
```swift
func send(_ action: Action) {
    actionContinuation.yield(action)  // Called from anywhere
}

private func processLoop() async {
    for await action in actionStream {
        // ...
        state = newState  // Mutated here
    }
}
```

send(_:) can be called from any context - the main thread, a background task, an effect callback. processLoop() runs in its own task. Both touch shared mutable state: the continuation, and state itself.
This is a data race. Swift 6's strict concurrency checking would flag it immediately. Even in Swift 5, it's undefined behavior waiting to corrupt your state.
We need mutual exclusion. One execution context that owns the queue, the state, and the processing loop. Only one piece of code should touch these at a time.
Swift has exactly the right tool: actors.
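As a one-minute refresher (illustrative, not part of the series' code): an actor serializes every access to its state, so even a thousand concurrent mutations land exactly once each:

```swift
actor Counter {
    private var value = 0

    func increment() {
        value += 1
    }

    func current() -> Int {
        value
    }
}

let counter = Counter()

// 1,000 concurrent tasks all mutate the same counter.
// The actor serializes them; no increment is lost.
await withTaskGroup(of: Void.self) { group in
    for _ in 0..<1_000 {
        group.addTask { await counter.increment() }
    }
}

let total = await counter.current()
// total is exactly 1000
```

With a plain class and no locking, the same test would intermittently lose increments - that's the data race the actor eliminates.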
Fourth Attempt: The Actor
```swift
actor ReducerExecutor<R: Reducer> {
    private var reducer: R
    private var actionStream: AsyncStream<R.Action>!
    private var actionContinuation: AsyncStream<R.Action>.Continuation!

    var state: R.State {
        reducer.state
    }

    init(reducer: R) {
        self.reducer = reducer
        self.actionStream = AsyncStream { continuation in
            self.actionContinuation = continuation
        }
        Task {
            await processLoop()
        }
    }

    func send(_ action: R.Action) {
        actionContinuation.yield(action)
    }

    private func processLoop() async {
        for await action in actionStream {
            let (newState, effect) = reducer.reduce(action: action)
            reducer.state = newState
            await effect.run { [weak self] action in
                guard let self else { return }
                await self.send(action)
            }
        }
    }
}
```

The actor guarantees:
- Serial access. Only one task executes actor-isolated code at a time. No data races.
- Safe mutation. reducer.state is only modified inside processLoop(), which runs serially.
- Thread-safe sends. send(_:) can be called from anywhere. The actor serializes access to the continuation.
Let's trace through our search example again:
```
Time ─────────────────────────────────────────────────►

User types:    s       w       i       f       t
               │       │       │       │       │
send():        ▼       ▼       ▼       ▼       ▼
             yield   yield   yield   yield   yield
               │       │       │       │       │
               └───────┴───────┼───────┴───────┘
                               │
                               ▼
                      ┌─────────────────┐
                      │  ActionStream   │
                      │ [s, w, i, f, t] │
                      └────────┬────────┘
                               │
                        for await action
                               │
                               ▼
                      ┌─────────────────┐
                      │  processLoop()  │
                      │    (serial)     │
                      └─────────────────┘
                               │
                    processes one at a time
                               │
             ┌─────────────────┼─────────────────┐
             ▼                 ▼                 ▼
         action: s         action: w           ... etc
             │                 │
       reduce → effect   reduce → effect
             │                 │
        await effect      await effect
             │                 │
       (cancels prev?)   (cancels prev?)
```

Wait. There's still a problem.
The Await Trap
Look at processLoop() again:
```swift
for await action in actionStream {
    let (newState, effect) = reducer.reduce(action: action)
    reducer.state = newState
    await effect.run { ... }  // ⚠️ We wait here
}
```

We await the effect. That means we don't process the next action until the current effect completes.
For our search field, that's catastrophic. User types "s", we fire an API call, and block the queue until it returns. Keystrokes "w", "i", "f", "t" pile up, waiting. The UI feels frozen.
We don't want to wait for effects. We want to fire and forget - let effects run concurrently while we keep processing actions. But we still need effect results (the actions they produce) to flow back through the queue.
```swift
private func processLoop() async {
    for await action in actionStream {
        let (newState, effect) = reducer.reduce(action: action)
        reducer.state = newState
        // Fire and forget - don't await
        Task { [weak self] in
            await effect.run { action in
                guard let self else { return }
                await self.send(action)
            }
        }
    }
}
```

Now effects run concurrently, but their result actions still flow through send(), which yields to the stream, which gets processed serially by processLoop().
The state updates are serial. The effects are concurrent. That's exactly what we want.
The Complete Implementation
```swift
actor ReducerExecutor<R: Reducer> {
    private var reducer: R
    private var actionStream: AsyncStream<R.Action>!
    private var actionContinuation: AsyncStream<R.Action>.Continuation!

    var state: R.State {
        reducer.state
    }

    init(reducer: R) {
        self.reducer = reducer
        self.actionStream = AsyncStream { continuation in
            self.actionContinuation = continuation
        }
        Task { await processLoop() }
    }

    func send(_ action: R.Action) {
        actionContinuation.yield(action)
    }

    private func processLoop() async {
        for await action in actionStream {
            let (newState, effect) = reducer.reduce(action: action)
            reducer.state = newState
            Task { [weak self] in
                await effect.run { action in
                    guard let self else { return }
                    await self.send(action)
                }
            }
        }
    }
}
```

Usage:

```swift
let executor = ReducerExecutor(reducer: FeatureViewModel())

await executor.send(.incrementTapped)
await executor.send(.loadTapped)

// Later, from an effect callback, from any thread:
await executor.send(.dataLoaded(42))
```

What We Solved
| Problem | Solution |
|---|---|
| Actions processed out of order | AsyncStream + serial for await loop |
| Infinite loop blocks | Async iteration suspends when idle |
| Data races on queue/state | Actor isolation |
| Effects blocking the queue | Fire-and-forget with Task { } |
What's Still Broken
We're not done. The actor solves ordering and safety, but:
No main thread guarantee. The actor runs on the cooperative thread pool. If SwiftUI observes state, we're updating a @Published property off the main thread. Crash.
No cancellation. Those fire-and-forget effect tasks? They run until completion even if the executor is deallocated. Memory leaks, wasted work, potential crashes.
No lifecycle management. The processing loop runs forever. When should it stop? What happens when the view disappears?
We'll tackle the main thread problem in Part 3: The Execution Context.
Homework Solution: Effect.merge
Part 1's challenge: extend Effect to combine multiple effects.
Here's one approach:
```swift
extension Effect {
    static func merge(_ effects: Effect...) -> Effect {
        Effect { send in
            await withTaskGroup(of: Void.self) { group in
                for effect in effects {
                    group.addTask {
                        await effect.run(send)
                    }
                }
            }
        }
    }
}
```

Usage in a reducer:
```swift
case .loginTapped:
    state.isLoading = true
    return (state, .merge(
        .run { _ in await analytics.track("login_attempted") },
        .run { send in
            let result = await api.login(email: state.email, password: state.password)
            send(.loginCompleted(result))
        }
    ))
```

Both effects run concurrently. The merged effect completes when all children complete. The send callback is shared - both effects can emit actions.
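Here's merge exercised outside a reducer. A self-contained sketch: the Effect type is re-sketched to match Part 1's shape, and the action values are plain strings:

```swift
// Minimal Effect sketch, assumed to match Part 1's shape.
struct Effect<Action> {
    let operation: (@escaping (Action) async -> Void) async -> Void

    func run(_ send: @escaping (Action) async -> Void) async {
        await operation(send)
    }

    static func merge(_ effects: Effect...) -> Effect {
        Effect { send in
            await withTaskGroup(of: Void.self) { group in
                for effect in effects {
                    group.addTask { await effect.run(send) }
                }
            }
        }
    }
}

// Collect emitted actions through an actor so concurrent sends are safe.
actor Collector {
    var actions: [String] = []
    func append(_ action: String) { actions.append(action) }
}

let collector = Collector()

let merged = Effect<String>.merge(
    Effect { send in await send("analytics") },
    Effect { send in await send("login") }
)

await merged.run { action in
    await collector.append(action)
}

let emitted = await collector.actions
// Both actions arrive; their order may vary because the effects run concurrently.
```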
Alternative: sequential execution. If order matters, use a loop instead of withTaskGroup:
```swift
static func concatenate(_ effects: Effect...) -> Effect {
    Effect { send in
        for effect in effects {
            await effect.run(send)
        }
    }
}
```

Your Challenge
Our ReducerExecutor spawns effect tasks but never tracks them. If the user navigates away mid-request, those tasks keep running.
Challenge: Add a way to track in-flight effect tasks. When ReducerExecutor is deallocated (or a new cancel method is called), cancel all tracked tasks.
Hint: You'll need to handle task cleanup when effects complete naturally, not just on cancellation.
Next Up
Part 3: The Execution Context - Guaranteeing state mutations happen on the main actor, without sacrificing the benefits of actor isolation.