ANTIMATERIAL
Back to all posts
by Philip Kluz

Better View Models: Taming Concurrent Effects

When async effects complete out of order, your app's state becomes a race condition waiting to happen. Let's make our view models more predicable.

The Action Queue: Taming Concurrent Effects

In Part 1 of this series, we built a minimal reducer-effect system. It worked - until it didn't. The moment two effects completed out of order, state became unpredictable.

Today we fix that. By the end, you'll have an actor-backed execution context that processes actions serially, no matter when effects fire.

The Problem, Revisited

Here's our send(_:) from Part 1:

func send(_ action: Action) {
    let (newState, effect) = reduce(action: action)
    state = newState
    
    Task {
        await effect.run { [weak self] action in
            self?.send(action)
        }
    }
}

The bug hides in plain sight. Each effect spawns its own Task. Each task calls send(_:) when it completes. Nothing coordinates them.

Picture a search field. User types "swift":

Time ─────────────────────────────────────────────────►

User types:    s      w      i      f      t
               │      │      │      │      │
Effects:       ▼      ▼      ▼      ▼      ▼
            Task 1  Task 2  Task 3  Task 4  Task 5
               │      │      │      │      │
               │      │      │      └──────┼───► completes first (fast server)
               │      │      └─────────────┼───► completes second
               │      └────────────────────┼───► completes fourth
               └───────────────────────────┼───► completes third
                                           └───► completes last

Final state: results for "s" (Task 1 finished last)
Expected:    results for "swift"

The network doesn't care about your intentions. Responses arrive when they arrive. Without coordination, the last response wins - even if it's for the wrong query.

First Attempt: An Action Queue

The fix seems obvious. Don't process actions immediately - queue them:

final class FeatureViewModel: Reducer {
    var state: State
    private var actionQueue: [Action] = []
    
    func send(_ action: Action) {
        actionQueue.append(action)
        processNextAction()
    }
    
    private func processNextAction() {
        guard !actionQueue.isEmpty else { return }
        
        let action = actionQueue.removeFirst()
        let (newState, effect) = reduce(action: action)
        state = newState
        
        Task {
            await effect.run { [weak self] action in
                self?.send(action)
            }
        }
    }
}

Better? Not really. We moved the problem, not solved it.

processNextAction() still fires effects in unstructured tasks. Those tasks still call send(_:) concurrently. We've added a queue but nothing enforces serial processing.

Second Attempt: Process Until Empty

What if we kept processing until the queue drains?

private func processNextAction() {
    while !actionQueue.isEmpty {
        let action = actionQueue.removeFirst()
        let (newState, effect) = reduce(action: action)
        state = newState
        
        Task {
            await effect.run { [weak self] action in
                self?.send(action)
            }
        }
    }
}

Same problem. The while loop processes the current queue contents synchronously. But effects are async. By the time an effect completes and calls send(_:), the loop has long since exited.

We need a loop that waits for work. An infinite loop that suspends when idle and resumes when actions arrive.

Third Attempt: The Infinite Loop

private func processLoop() async {
    while true {
        if actionQueue.isEmpty {
            // ??? wait somehow ???
            continue
        }
        
        let action = actionQueue.removeFirst()
        let (newState, effect) = reduce(action: action)
        state = newState
        
        await effect.run { [weak self] action in
            self?.send(action)
        }
    }
}

Two problems:

  1. How do we wait? Busy-waiting (while actionQueue.isEmpty {}) burns CPU. We need a suspension point that resumes when work arrives.

  2. Where does this loop run? It's async, so it needs a Task. But that Task lives forever. Who owns it? When does it stop?

Let's solve the waiting problem first. Swift gives us AsyncStream - a channel that suspends consumers until producers send values:

final class FeatureViewModel: Reducer {
    var state: State
    
    private var actionStream: AsyncStream<Action>!
    private var actionContinuation: AsyncStream<Action>.Continuation!
    
    init(state: State = State()) {
        self.state = state
        
        self.actionStream = AsyncStream { continuation in
            self.actionContinuation = continuation
        }
        
        // Start the processing loop
        Task {
            await processLoop()
        }
    }
    
    func send(_ action: Action) {
        actionContinuation.yield(action)
    }
    
    private func processLoop() async {
        for await action in actionStream {
            let (newState, effect) = reduce(action: action)
            state = newState
            
            await effect.run { [weak self] action in
                self?.send(action)
            }
        }
    }
}

Now we're getting somewhere. send(_:) pushes actions into the stream. processLoop() pulls them out one at a time. The for await suspends when the stream is empty and resumes when new actions arrive.

But we've introduced a new bug. A serious one.

The Data Race

func send(_ action: Action) {
    actionContinuation.yield(action)  // Called from anywhere
}

private func processLoop() async {
    for await action in actionStream {
        // ...
        state = newState  // Mutated here
    }
}

send(_:) can be called from any context - the main thread, a background task, an effect callback. processLoop() runs in its own task. Both touch shared mutable state: the continuation, and state itself.

This is a data race. Swift 6's strict concurrency checking would flag it immediately. Even in Swift 5, it's undefined behavior waiting to corrupt your state.

We need mutual exclusion. One execution context that owns the queue, the state, and the processing loop. Only one piece of code should touch these at a time.

Swift has exactly the right tool: actors.

Fourth Attempt: The Actor

actor ReducerExecutor<R: Reducer> {
    private var reducer: R
    private var actionStream: AsyncStream<R.Action>!
    private var actionContinuation: AsyncStream<R.Action>.Continuation!
    
    var state: R.State {
        reducer.state
    }
    
    init(reducer: R) {
        self.reducer = reducer
        
        self.actionStream = AsyncStream { continuation in
            self.actionContinuation = continuation
        }
        
        Task {
            await processLoop()
        }
    }
    
    func send(_ action: R.Action) {
        actionContinuation.yield(action)
    }
    
    private func processLoop() async {
        for await action in actionStream {
            let (newState, effect) = reducer.reduce(action: action)
            reducer.state = newState
            
            await effect.run { [weak self] action in
                guard let self else { return }
                await self.send(action)
            }
        }
    }
}

The actor guarantees:

  1. Serial access. Only one task executes actor-isolated code at a time. No data races.
  2. Safe mutation. reducer.state is only modified inside processLoop(), which runs serially.
  3. Thread-safe sends. send(_:) can be called from anywhere. The actor serializes access to the continuation.

Let's trace through our search example again:

Time ─────────────────────────────────────────────────►

User types:    s      w      i      f      t
               │      │      │      │      │
send():        ▼      ▼      ▼      ▼      ▼
            yield   yield  yield  yield  yield
               │      │      │      │      │
               └──────┴──────┴──────┴──────┘
                              │
                              ▼
                    ┌─────────────────┐
                    │  ActionStream   │
                    │  [s, w, i, f, t]│
                    └────────┬────────┘
                             │
                    for await action
                             │
                             ▼
                    ┌─────────────────┐
                    │  processLoop()  │
                    │  (serial)       │
                    └─────────────────┘
                             │
              processes one at a time
                             │
        ┌────────────────────┼────────────────────┐
        ▼                    ▼                    ▼
    action: s            action: w           ... etc
        │                    │
   reduce → effect      reduce → effect
        │                    │
   await effect         await effect
        │                    │
   (cancels prev?)      (cancels prev?)

Wait. There's still a problem.

The Await Trap

Look at processLoop() again:

for await action in actionStream {
    let (newState, effect) = reducer.reduce(action: action)
    reducer.state = newState
    
    await effect.run { ... }  // ⚠️ We wait here
}

We await the effect. That means we don't process the next action until the current effect completes.

For our search field, that's catastrophic. User types "s", we fire an API call, and block the queue until it returns. Keystrokes "w", "i", "f", "t" pile up, waiting. The UI feels frozen.

We don't want to wait for effects. We want to fire and forget - let effects run concurrently while we keep processing actions. But we still need effect results (the actions they produce) to flow back through the queue.

private func processLoop() async {
    for await action in actionStream {
        let (newState, effect) = reducer.reduce(action: action)
        reducer.state = newState
        
        // Fire and forget - don't await
        Task { [weak self] in
            await effect.run { action in
                guard let self else { return }
                await self.send(action)
            }
        }
    }
}

Now effects run concurrently, but their result actions still flow through send(), which yields to the stream, which gets processed serially by processLoop().

The state updates are serial. The effects are concurrent. That's exactly what we want.

The Complete Implementation

actor ReducerExecutor<R: Reducer> {
    private var reducer: R
    private var actionStream: AsyncStream<R.Action>!
    private var actionContinuation: AsyncStream<R.Action>.Continuation!
    
    var state: R.State {
        reducer.state
    }
    
    init(reducer: R) {
        self.reducer = reducer
        
        self.actionStream = AsyncStream { continuation in
            self.actionContinuation = continuation
        }
        
        Task { await processLoop() }
    }
    
    func send(_ action: R.Action) {
        actionContinuation.yield(action)
    }
    
    private func processLoop() async {
        for await action in actionStream {
            let (newState, effect) = reducer.reduce(action: action)
            reducer.state = newState
            
            Task { [weak self] in
                await effect.run { action in
                    guard let self else { return }
                    await self.send(action)
                }
            }
        }
    }
}

Usage:

let executor = ReducerExecutor(reducer: FeatureViewModel())

await executor.send(.incrementTapped)
await executor.send(.loadTapped)

// Later, from an effect callback, from any thread:
await executor.send(.dataLoaded(42))

What We Solved

Problem Solution
Actions processed out of order AsyncStream + serial for await loop
Infinite loop blocks Async iteration suspends when idle
Data races on queue/state Actor isolation
Effects blocking the queue Fire-and-forget with Task { }

What's Still Broken

We're not done. The actor solves ordering and safety, but:

No main thread guarantee. The actor runs on the cooperative thread pool. If SwiftUI observes state, we're updating a @Published property off the main thread. Crash.

No cancellation. Those fire-and-forget effect tasks? They run until completion even if the executor is deallocated. Memory leaks, wasted work, potential crashes.

No lifecycle management. The processing loop runs forever. When should it stop? What happens when the view disappears?

We'll tackle the main thread problem in Part 3: The Execution Context.

Homework Solution: Effect.merge

Part 1's challenge: extend Effect to combine multiple effects.

Here's one approach:

extension Effect {
    static func merge(_ effects: Effect...) -> Effect {
        Effect { send in
            await withTaskGroup(of: Void.self) { group in
                for effect in effects {
                    group.addTask {
                        await effect.run(send)
                    }
                }
            }
        }
    }
}

Usage in a reducer:

case .loginTapped:
    state.isLoading = true
    return (state, .merge(
        .run { _ in await analytics.track("login_attempted") },
        .run { send in
            let result = await api.login(email: state.email, password: state.password)
            send(.loginCompleted(result))
        }
    ))

Both effects run concurrently. The merged effect completes when all children complete. The send callback is shared - both effects can emit actions.

Alternative: sequential execution. If order matters, use a loop instead of withTaskGroup:

static func concatenate(_ effects: Effect...) -> Effect {
    Effect { send in
        for effect in effects {
            await effect.run(send)
        }
    }
}

Your Challenge

Our ReducerExecutor spawns effect tasks but never tracks them. If the user navigates away mid-request, those tasks keep running.

Challenge: Add a way to track in-flight effect tasks. When ReducerExecutor is deallocated (or a new cancel method is called), cancel all tracked tasks.

Hint: You'll need to handle task cleanup when effects complete naturally, not just on cancellation.

Next Up

Part 3: The Execution Context - Guaranteeing state mutations happen on the main actor, without sacrificing the benefits of actor isolation.

Enjoyed this post?

Back to all posts