Mastering Backpressure in Swift Combine: A Deep Dive into Custom Subscribers

Published by The adllm Team. Tags: Swift, Combine, Backpressure, Concurrency, iOS, macOS, Custom Subscriber, Asynchronous Programming

The Swift Combine framework provides a powerful declarative API for processing values over time. However, managing the flow of data between fast Publishers and potentially slower Subscribers, a concept known as backpressure, is crucial for building robust and performant applications. Combine builds backpressure into its protocols through demand, and it offers built-in operators such as buffer, throttle, and debounce. Still, some scenarios call for finer-grained control over data consumption, and this is where implementing a custom Subscriber shines.

This article explores the intricacies of backpressure in Combine and provides a comprehensive guide to creating custom Subscriber implementations. We’ll delve into how to meticulously manage demand, process items at a controlled pace, and avoid common pitfalls, all illustrated with practical code examples.

Understanding Backpressure and Demand in Combine

Before diving into custom subscribers, let’s solidify our understanding of core Combine concepts related to data flow control.

  • Publisher: A type that emits a sequence of values over time. More details can be found in the Apple Developer Documentation for Publisher.
  • Subscriber: A type that receives values from a Publisher. See the Apple Developer Documentation for Subscriber.
  • Subscription: A protocol representing the connection between a Publisher and a Subscriber. The Subscriber uses the Subscription to request items and to cancel the connection. Details are in the Apple Developer Documentation for Subscription.
  • Backpressure: A mechanism that allows a Subscriber to control how much data it receives from a Publisher. This prevents the Subscriber from being overwhelmed if the Publisher emits values too quickly.
  • Demand (Subscribers.Demand): The number of values a Subscriber is willing and able to process. Subscribers signal demand to Publishers via the Subscription’s request(_:) method. Learn more from the Apple Developer Documentation for Subscribers.Demand.

The core issue often arises with Combine’s default subscribers, sink and assign(to:on:). Both request Subscribers.Demand.unlimited as soon as they subscribe, so the Publisher is free to deliver values as fast as it produces them. If the Subscriber performs complex or time-consuming work for each item, this unbounded demand can lead to excessive memory consumption, UI freezes, or even crashes. Custom Subscribers allow us to move beyond this default and implement more sophisticated demand management.
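To see this default for yourself, you can log the demand that sink sends upstream. The sketch below relies only on Combine itself. handleEvents(receiveRequest:) records each request(_:) call that passes through it, and observedDemands and received are illustrative names.

```swift
import Combine

var observedDemands: [Subscribers.Demand] = []
var received: [Int] = []

// handleEvents(receiveRequest:) fires when the downstream subscriber requests values.
let cancellable = [1, 2, 3].publisher
    .handleEvents(receiveRequest: { demand in observedDemands.append(demand) })
    .sink { value in received.append(value) }

print(observedDemands.first == Subscribers.Demand.unlimited) // true: sink asks for everything up front
print(received)                                              // [1, 2, 3]
cancellable.cancel()
```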

The Custom Subscriber Solution

To gain precise control over backpressure, we can implement our own type conforming to the Subscriber protocol. This protocol has three key methods we must implement:

  1. receive(subscription: Subscription): This method is called by the Publisher exactly once when the Subscriber successfully subscribes. It’s our opportunity to store the provided Subscription object and, crucially, to make an initial request(_:) for data. Without an initial request, no data will flow.

  2. receive(_ input: Input) -> Subscribers.Demand: This method is called by the Publisher each time it delivers a new value (of type Input). Here, we process the received item. The method must return a Subscribers.Demand indicating how many additional items we are now ready to process. Returning .none signals that we are not ready for more items right now, while .max(1) (or another count) signals readiness for more. The returned demand is added to any demand that is still outstanding. Demand can only grow, so returning .none does not revoke items requested earlier.

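  3. receive(completion: Subscribers.Completion<Failure>): This method is called by the Publisher at most once, when the stream of values terminates, either with a successful completion (.finished) or with an error (.failure(Failure)). This is where we perform any cleanup tasks. Cancelling a subscription does not trigger this method, so cleanup that must also happen on cancellation belongs in the code path that cancels (for example, a cancel() method).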
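The interplay of initial demand and returned demand is easiest to observe with a publisher that honors demand. Combine provides one for any Swift Sequence. In the sketch below, ThresholdSubscriber is a hypothetical type written only for this illustration. It asks for two values up front and then adds one unit of demand for each value below a threshold.

```swift
import Combine

// Hypothetical subscriber illustrating cumulative demand: it requests 2 values
// up front, then adds 1 unit of demand per value below `threshold`.
final class ThresholdSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private let threshold: Int
    private var subscription: Subscription?
    private(set) var received: [Int] = []
    private(set) var isFinished = false

    init(threshold: Int) {
        self.threshold = threshold
    }

    func receive(subscription: Subscription) {
        self.subscription = subscription
        subscription.request(.max(2)) // initial demand: 2 values
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        // Whatever we return is added to the demand still outstanding.
        if input < threshold {
            return .max(1)
        }
        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) {
        isFinished = true
        subscription = nil
    }
}

let subscriber = ThresholdSubscriber(threshold: 4)
let source = (1...10).publisher
source.subscribe(subscriber)
print(subscriber.received)   // [1, 2, 3, 4, 5]
print(subscriber.isFinished) // false
```

Value 5 still arrives even though receive(_:) returned .none for 4, because one unit of earlier demand was still outstanding. Values 6 through 10 are never requested, so no completion is delivered.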

Implementing a Backpressure-Aware Custom Subscriber: Step-by-Step

Let’s build a custom Subscriber that processes integer values one at a time, simulating a slow processing task.

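First, we define the basic structure of our SlowSubscriber. It conforms to Subscriber and also to Cancellable, so that code holding a reference to the subscriber can stop it explicitly. Its cancel() method simply forwards to the stored Subscription, which is what actually tears down the connection.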

import Combine
import Foundation // For Thread.sleep

// Our custom subscriber for Int values, potentially failing with an Error.
class SlowSubscriber: Subscriber, Cancellable {
    typealias Input = Int
    typealias Failure = Error

    private var subscription: Subscription?
    private let processingDelay: TimeInterval // How long to simulate work

    init(processingDelay: TimeInterval = 0.5) {
        self.processingDelay = processingDelay
        print("SlowSubscriber: Initialized.")
    }

    // Called when the subscription is first established.
    func receive(subscription: Subscription) {
        print("SlowSubscriber: Received subscription.")
        self.subscription = subscription
        // Crucial: Request the first item. Without this, nothing happens.
        print("SlowSubscriber: Requesting initial 1 item.")
        subscription.request(.max(1))
    }

    // Called for each new value received from the publisher.
    func receive(_ input: Input) -> Subscribers.Demand {
        print("SlowSubscriber: Received input \(input). Processing...")
        
        // Simulate a time-consuming task
        Thread.sleep(forTimeInterval: processingDelay) 
        
        print("SlowSubscriber: Finished processing \(input). Requesting next 1.")
        // After processing, request one more item.
        // Return .none if you are not ready for more items yet.
        return .max(1)
    }

    // Called when the publisher completes (either successfully or with an error).
    func receive(completion: Subscribers.Completion<Failure>) {
        switch completion {
        case .finished:
            print("SlowSubscriber: Received completion - Finished.")
        case .failure(let error):
            print("SlowSubscriber: Received completion - Error: \(error).")
        }
        // Clean up the subscription reference.
        self.subscription = nil
    }

    // From Cancellable protocol: Allows external cancellation.
    func cancel() {
        print("SlowSubscriber: Cancel called explicitly.")
        // Important: Propagate cancellation to the upstream publisher.
        subscription?.cancel()
        subscription = nil
    }
}

Using the SlowSubscriber:

Now, let’s see how to use our SlowSubscriber with a simple PassthroughSubject.

let subject = PassthroughSubject<Int, Error>()
let slowSubscriber = SlowSubscriber(processingDelay: 1.0)

// Attach the subscriber to the publisher
subject.subscribe(slowSubscriber)

// Start sending values. PassthroughSubject delivers each value synchronously
// on the calling thread, so each send(_:) returns only after the subscriber
// has finished processing the value and returned its new demand.
print("Publisher: Sending 1")
subject.send(1) // Blocks while the subscriber processes 1, which then requests 1 more

print("Publisher: Sending 2")
subject.send(2) // Same: processed, then demand for 1 more

print("Publisher: Sending 3")
subject.send(3) // Same: processed, then demand for 1 more

// Optionally, send completion
// subject.send(completion: .finished)

// To stop early, cancel through the subscriber, which forwards the
// cancellation to its stored Subscription:
// slowSubscriber.cancel()

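When you run this code, the “SlowSubscriber: Received input X” and “SlowSubscriber: Finished processing X” messages are spaced out by processingDelay. Each “Publisher: Sending X” message appears only after the previous value has been fully processed. That is because PassthroughSubject.send(_:) calls the subscriber synchronously on the sending thread, so each send blocks until receive(_:) returns its new demand. In this example, the pacing therefore comes from that blocking call rather than from the publisher waiting for demand. Keep in mind that a PassthroughSubject does not buffer: it drops any value sent while its subscriber has no outstanding demand. Demand-based backpressure pays off most with publishers that produce values on request, such as a sequence publisher ([1, 2, 3].publisher), or with a buffer(size:prefetch:whenFull:) operator placed upstream.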
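A PassthroughSubject does not queue values for a subscriber that has no outstanding demand; it drops them. The following sketch makes that visible. SingleDemandSubscriber and its requestOneMore() method are hypothetical helpers, not Combine API. The subscriber asks for one value and returns .none, and it adds demand only when requestOneMore() is called.

```swift
import Combine

// Hypothetical subscriber: requests one value and adds demand only on request.
final class SingleDemandSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var received: [Int] = []
    private var subscription: Subscription?

    func receive(subscription: Subscription) {
        self.subscription = subscription
        subscription.request(.max(1))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        return .none // no additional demand
    }

    func receive(completion: Subscribers.Completion<Never>) {
        subscription = nil
    }

    // Adds one unit of demand from outside the Combine callbacks.
    func requestOneMore() {
        subscription?.request(.max(1))
    }
}

let subject = PassthroughSubject<Int, Never>()
let subscriber = SingleDemandSubscriber()
subject.subscribe(subscriber)

subject.send(1) // delivered: one unit of demand was outstanding
subject.send(2) // dropped: demand is zero
subject.send(3) // dropped
subscriber.requestOneMore()
subject.send(4) // delivered
print(subscriber.received) // [1, 4]
```

Values 2 and 3 are lost rather than delayed. For that reason, demand-based pacing needs either a publisher that produces values on request or an explicit buffer upstream.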

Advanced Backpressure Strategies with Custom Subscribers

Beyond simple one-by-one processing, custom subscribers enable more sophisticated strategies.

Controlled Buffering

You might want your subscriber to accept a small batch of items, process them, and only then request more. This can be efficient if the overhead of requesting each item individually is high or if batch processing is more natural for your task.

class BufferingSubscriber: Subscriber, Cancellable {
    typealias Input = String
    typealias Failure = Never // This subscriber won't handle publisher errors

    private var subscription: Subscription?
    private var buffer: [String] = []
    private let bufferCapacity: Int
    private let itemsToRequestWhenEmpty: Int
    private var currentlyProcessing = false // Simple flag for sync processing

    init(bufferCapacity: Int = 3, itemsToRequestWhenEmpty: Int = 3) {
        self.bufferCapacity = bufferCapacity
        self.itemsToRequestWhenEmpty = itemsToRequestWhenEmpty
        print("BufferingSubscriber: Initialized. Capacity: \(bufferCapacity)")
    }

    func receive(subscription: Subscription) {
        print("BufferingSubscriber: Received subscription.")
        self.subscription = subscription
        print("BufferingSubscriber: Req. initial \(itemsToRequestWhenEmpty) items.")
        subscription.request(.max(itemsToRequestWhenEmpty))
    }

    func receive(_ input: Input) -> Subscribers.Demand {
        print("BufferingSubscriber: Received '\(input)'. Buf: \(buffer.count)")
        buffer.append(input)
        
        // Asynchronously process buffer to avoid blocking receive(_:)
        // A more robust solution might use a serial dispatch queue for processing.
        DispatchQueue.global().async { self.processBuffer() }

        // We don't request more on each item receipt.
        // Demand is managed by processBuffer after it clears items.
        return .none 
    }

    private func processBuffer() {
        // NOTE: This flag check is not atomic and races with receive(_:).
        // For production, confine buffer and flag to one serial queue.
        guard !currentlyProcessing else { return }
        currentlyProcessing = true

        while !buffer.isEmpty {
            let item = buffer.removeFirst()
            print("BufferingSubscriber: Processing '\(item)'. Buf: \(buffer.count)")
            Thread.sleep(forTimeInterval: 0.3) // Simulate work
            print("BufferingSubscriber: Finished '\(item)'.")
        }
        
        currentlyProcessing = false
        print("BufferingSubscriber: Buffer empty. Requesting more.")
        // Buffer is empty, request more items
        subscription?.request(.max(itemsToRequestWhenEmpty))
    }

    func receive(completion: Subscribers.Completion<Failure>) {
        print("BufferingSubscriber: Received completion: \(completion).")
        // Process any remaining items on completion
        DispatchQueue.global().async {
            self.processBuffer()
            // Ensure subscription is nilled out after final processing
            DispatchQueue.main.async { self.subscription = nil }
        }
    }

    func cancel() {
        print("BufferingSubscriber: Cancel called.")
        subscription?.cancel()
        subscription = nil
        buffer.removeAll() // Clear buffer on cancellation
    }
}

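In this BufferingSubscriber, items are appended to an internal buffer, and processBuffer drains that buffer on a background queue. New items are requested only after the buffer has been emptied. The demand returned by receive(_ input:) is .none, so new demand is driven by processBuffer rather than by each arrival. Treat this class as a simplified illustration, for three reasons. First, buffer and currentlyProcessing are mutated from both the publisher’s thread and a global queue without synchronization, which is a data race. Second, an item appended just as a processing pass finishes can sit in the buffer until the next value arrives. Third, a new batch is requested every time the buffer drains, so outstanding demand can grow beyond one batch when items trickle in. Confining all of this state to a single serial dispatch queue addresses the race.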
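The BufferingSubscriber above shares its buffer and flag between the publisher’s thread and a global queue. Below is a minimal sketch of the serial-queue alternative, under a few assumptions. It uses a fixed batch size and a caller-supplied process closure, and SerialBatchSubscriber, process, and onFinish are illustrative names rather than Combine API. Every read and write of buffer state happens on one private serial DispatchQueue. The next batch is requested only after the current batch is full and processed, so outstanding demand never exceeds one batch.

```swift
import Combine
import Foundation

// Sketch: batches values and confines all mutable state to one serial queue.
final class SerialBatchSubscriber<Value>: Subscriber {
    typealias Input = Value
    typealias Failure = Never

    private let queue = DispatchQueue(label: "SerialBatchSubscriber.state")
    private let batchSize: Int
    private let process: ([Value]) -> Void // runs on `queue`
    private let onFinish: () -> Void       // runs on `queue` after the final flush
    private var subscription: Subscription?
    private var batch: [Value] = []

    init(batchSize: Int,
         process: @escaping ([Value]) -> Void,
         onFinish: @escaping () -> Void = {}) {
        precondition(batchSize > 0, "batchSize must be positive")
        self.batchSize = batchSize
        self.process = process
        self.onFinish = onFinish
    }

    func receive(subscription: Subscription) {
        queue.sync { self.subscription = subscription }
        subscription.request(.max(batchSize)) // demand for exactly one batch
    }

    func receive(_ input: Value) -> Subscribers.Demand {
        queue.async {
            self.batch.append(input)
            guard self.batch.count == self.batchSize else { return }
            self.process(self.batch)
            self.batch.removeAll()
            // The previous batch's demand is used up; ask for the next batch.
            self.subscription?.request(.max(self.batchSize))
        }
        return .none // demand is signalled from the queue instead
    }

    func receive(completion: Subscribers.Completion<Never>) {
        queue.async {
            if !self.batch.isEmpty { self.process(self.batch) } // flush a partial batch
            self.batch.removeAll()
            self.subscription = nil
            self.onFinish()
        }
    }
}

// Usage: the sequence publisher honors demand, so values arrive one batch at a time.
var batches: [[Int]] = []
let done = DispatchSemaphore(value: 0)
let subscriber = SerialBatchSubscriber<Int>(
    batchSize: 3,
    process: { batches.append($0) },
    onFinish: { done.signal() })
let source = (1...7).publisher
source.subscribe(subscriber)
done.wait()
print(batches) // [[1, 2, 3], [4, 5, 6], [7]]
```

receive(_:) uses async rather than sync, so the request(_:) call made from the queue can deliver new values synchronously without deadlocking.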

Rate Limiting

A custom subscriber can also implement rate limiting by delaying its requests for more data.

class RateLimitedSubscriber: Subscriber, Cancellable {
    typealias Input = Int
    typealias Failure = Error

    private var subscription: Subscription?
    private let processingInterval: TimeInterval // Min interval between requests

    init(processingInterval: TimeInterval = 1.0) {
        self.processingInterval = processingInterval
        print("RateLimitedSubscriber: Initialized.")
    }

    func receive(subscription: Subscription) {
        print("RateLimitedSubscriber: Received subscription.")
        self.subscription = subscription
        print("RateLimitedSubscriber: Requesting initial 1 item.")
        subscription.request(.max(1))
    }

    func receive(_ input: Input) -> Subscribers.Demand {
        print("RateLimitedSubscriber: Received input \(input). Processing...")
        // Simulate actual processing of the input
        // For this example, processing is considered instant.
        
        print("RateLimitedSubscriber: Finished processing \(input).")
        
        // Schedule the next request after the defined interval.
        // Use a weak self to avoid retain cycles if subscriber lives long.
        DispatchQueue.main.asyncAfter(deadline: .now() + processingInterval) {
            [weak self] in
            // Ensure subscription still exists (not cancelled or completed)
            guard let self = self, self.subscription != nil else { return }
            print("RateLimitedSubscriber: Interval elapsed. Requesting next 1.")
            self.subscription?.request(.max(1))
        }
        
        // We don't request more immediately; the asyncAfter block handles it.
        return .none 
    }

    func receive(completion: Subscribers.Completion<Failure>) {
        print("RateLimitedSubscriber: Received completion: \(completion).")
        self.subscription = nil // Clear subscription on completion
    }

    func cancel() {
        print("RateLimitedSubscriber: Cancel called.")
        subscription?.cancel()
        subscription = nil
    }
}

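This RateLimitedSubscriber processes an item and then uses DispatchQueue.main.asyncAfter to wait for processingInterval before requesting the next item. It therefore consumes at most one item per processingInterval, and scheduling latency can only add to that gap. Two caveats apply. First, the delayed request runs only while the main queue is being serviced, as it is in an app’s run loop. Second, a publisher that does not buffer, such as PassthroughSubject, drops any values sent during the wait.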
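DispatchQueue.main only runs its blocks while something services the main queue, such as an app’s run loop or dispatchMain() in a command-line tool. For tests and scripts, a variant that schedules its requests on a private serial queue can be handier. This is a sketch under that assumption: QueueRateLimitedSubscriber, onValue, and onFinish are illustrative names. The spacing it provides is a lower bound, because asyncAfter runs its block no earlier than the deadline but possibly later.

```swift
import Combine
import Foundation

// Sketch: requests one value at a time, waiting `interval` between requests.
// The delayed request runs on a private serial queue instead of DispatchQueue.main.
final class QueueRateLimitedSubscriber<Value>: Subscriber {
    typealias Input = Value
    typealias Failure = Never

    private let queue = DispatchQueue(label: "QueueRateLimitedSubscriber.timer")
    private let interval: DispatchTimeInterval
    private let onValue: (Value) -> Void
    private let onFinish: () -> Void
    private var subscription: Subscription? // only touched on `queue`

    init(interval: DispatchTimeInterval,
         onValue: @escaping (Value) -> Void,
         onFinish: @escaping () -> Void = {}) {
        self.interval = interval
        self.onValue = onValue
        self.onFinish = onFinish
    }

    func receive(subscription: Subscription) {
        queue.sync { self.subscription = subscription }
        subscription.request(.max(1))
    }

    func receive(_ input: Value) -> Subscribers.Demand {
        onValue(input)
        // Ask for the next value only after the interval has elapsed.
        queue.asyncAfter(deadline: .now() + interval) { [weak self] in
            self?.subscription?.request(.max(1)) // no-op once completed
        }
        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) {
        queue.async {
            self.subscription = nil
            self.onFinish()
        }
    }
}

// Usage: record when each value arrives, in nanoseconds of system uptime.
var arrivals: [UInt64] = []
let done = DispatchSemaphore(value: 0)
let subscriber = QueueRateLimitedSubscriber<Int>(
    interval: .milliseconds(100),
    onValue: { _ in arrivals.append(DispatchTime.now().uptimeNanoseconds) },
    onFinish: { done.signal() })
let source = [1, 2, 3].publisher
source.subscribe(subscriber)
done.wait()
let gaps = zip(arrivals.dropFirst(), arrivals).map { pair in pair.0 - pair.1 }
print(arrivals.count, gaps.map { $0 / 1_000_000 }) // count, then gaps in ms
```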

Common Pitfalls and Best Practices

When implementing custom subscribers, be mindful of these common issues:

  • Forgetting Initial Demand: Not calling subscription.request(_:) in receive(subscription:) is a common mistake that results in no data being delivered.
  • Requesting .unlimited Inadvertently: If your custom subscriber always requests .unlimited or a very large number without a proper processing strategy, you negate the benefits of custom backpressure.
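  • Blocking in receive(_ input:): Performing long-running, synchronous operations directly within receive(_ input:) blocks the thread the publisher delivers on (for a PassthroughSubject, the thread that called send(_:)). Offload heavy work to a background queue if necessary. In that case, return .none and signal demand explicitly with subscription.request(_:) once the work finishes.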
  • Not Storing the Subscription: You need to store the Subscription object to request more data or to cancel it.
  • Keeping the Subscription After Cancellation/Completion: Ensure that if the subscriber is cancelled, it also cancels the upstream Subscription, and release the stored reference in both cancel() and receive(completion:). A subscription typically holds its subscriber strongly, so a lingering reference can also keep both objects alive in a retain cycle.
  • Demand Arithmetic: Remember that demand is cumulative. Suppose you request .max(5), receive one value, and return .max(3) from receive(_ input:). The publisher may now send 5 - 1 + 3 = 7 more values. Usually, you request an initial amount, and then receive(_ input:) returns .max(1) or .none to control flow one item at a time or in small batches after processing.
  • Resource Cleanup: Ensure any resources (buffers, timers, etc.) are properly released in receive(completion:) or cancel().
  • Thread Safety: If your subscriber’s methods can be called from different threads, or if internal state is mutated from background tasks (as processBuffer does in the BufferingSubscriber example), ensure thread-safe access to shared mutable state (e.g., using locks, serial dispatch queues, or atomic operations).
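Subscribers.Demand implements demand arithmetic itself, with + and - operators for both Demand and Int operands and a max property that is nil for .unlimited. The sketch below uses only those members. The outstanding variable stands in for the bookkeeping a publisher performs; it is not something a subscriber normally tracks.

```swift
import Combine

typealias Demand = Subscribers.Demand

var outstanding: Demand = .max(5) // initial request(.max(5))
outstanding -= 1                  // one value delivered
outstanding += Demand.max(3)      // receive(_:) returned .max(3)
print(outstanding.max ?? -1)      // 7

// Demand saturates rather than overflowing or going negative.
print(Demand.unlimited + Demand.max(1) == Demand.unlimited) // true
print(Demand.max(2) - Demand.max(5) == Demand.none)         // true
print(Demand.unlimited.max == nil)                          // true
```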

Debugging Your Custom Subscriber

Debugging Combine flows can be tricky. Here are a few tips:

  • print() Operator: Use Combine’s built-in print(_:to:) operator or the handleEvents operator for detailed logging.
    
    // publisher
    //     .print("UpstreamDebug") // Basic logging
    //     .handleEvents( // More detailed event handling
    //         receiveSubscription: { sub in print("Event: Subscription \(sub)") },
    //         receiveOutput: { val in print("Event: Output \(val)") },
    //         receiveCompletion: { comp in print("Event: Completion \(comp)") },
    //         receiveCancel: { print("Event: Cancel") },
    //         receiveRequest: { demand in print("Event: Demand \(demand)") }
    //     )
    //     .subscribe(myCustomSubscriber)
    
    Third-party packages such as CombineExt add further operators, but print and handleEvents already cover most event and demand logging.
  • Logging within Subscriber: Add detailed print statements inside your subscriber’s methods, as shown in the examples.
  • Combine Timelane: Timelane, an Instruments package by Marin Todorov that hooks into Combine through its TimelaneCombine library, provides a visual representation of Combine events. This can be invaluable for understanding complex flows and backpressure. Implementations and forks are available on GitHub, including one from the SwiftUI-Plus community.
  • Xcode Debugger: Set breakpoints in your subscriber’s methods to inspect its state and the received values.

When to Use a Custom Subscriber (and When Not To)

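Before building a custom Subscriber, always consider whether Combine’s built-in operators can achieve your goal. Candidates include buffer(size:prefetch:whenFull:), collect(_:), throttle(for:scheduler:latest:), debounce(for:scheduler:), and flatMap(maxPublishers:_:).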

A custom Subscriber is generally preferred when:

  • You need highly specific, stateful logic for requesting items based on complex conditions (e.g., available resources, external signals, content of the data).
  • You are interfacing with a non-Combine system that has its own pull-based consumption model.
  • You require fine-grained control over buffering strategies beyond what buffer offers.
  • You need to implement unique rate-limiting or item-processing logic that standard operators don’t cover.

If a standard operator can solve your problem with less code, it’s usually the more maintainable choice.
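For example, fixed-size batching like BufferingSubscriber’s can often be expressed with the built-in collect(_:) operator. It groups values into arrays of the given count and emits any partial array when the upstream finishes. Note that this covers only the batching. With sink downstream, collect(_:) still passes unlimited demand upstream, so it does not slow the publisher down.

```swift
import Combine

var batches: [[Int]] = []
let cancellable = (1...7).publisher
    .collect(3) // groups values into arrays of 3; the last may be shorter
    .sink { batch in batches.append(batch) }
print(batches) // [[1, 2, 3], [4, 5, 6], [7]]
cancellable.cancel()
```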

Conclusion

Implementing custom Subscribers in Swift Combine is a powerful technique for mastering backpressure and managing complex asynchronous data flows. By taking explicit control over demand, developers can build more resilient, efficient, and responsive applications. While it requires a deeper understanding of Combine’s mechanics, the ability to tailor data consumption precisely to your application’s needs is an invaluable skill for any Swift developer working with asynchronous event streams. Remember to weigh the benefits against the complexity and always consider simpler, built-in alternatives first.