Understanding Subscriber in Combine by re-implementing Sink

June 7, 2020

When we first start learning Combine, the introductory example is often something like this:

var subscriptions = Set<AnyCancellable>()
let publisher = [1, 2, 3, 4].publisher

publisher
    .sink(
        receiveCompletion: { print("Received completion: \($0)") },
        receiveValue: { print("Received value: \($0)") }
    )
    .store(in: &subscriptions)

This prints the following to the console:

Received value: 1
Received value: 2
Received value: 3
Received value: 4
Received completion: finished

The sink method returns an instance of AnyCancellable, which wraps an internal instance of type Sink.

Both Sink and Assign are built-in subscribers provided by the Combine framework. According to Apple’s documentation, Sink is “a simple subscriber that requests an unlimited number of values upon subscription.” In this post, we'll re-implement Sink ourselves, calling it MySink, to better understand how Subscriber works under the hood.

Step 1: Conform to Subscriber

Because MySink is a subscriber, it must conform to the Subscriber protocol:

final class MySink<Input, Failure>: Subscriber where Failure: Error {
    func receive(subscription: Subscription) {
        subscription.request(.unlimited)
    }
    
    func receive(_ input: Input) -> Subscribers.Demand {
        print("Received value: \(input)")
        return .none
    }
    
    func receive(completion: Subscribers.Completion<Failure>) {
        print("Completion: \(completion)")
    }
}

In the receive(subscription:) method, we request an unlimited number of values, as expected from the standard Sink. In receive(_:), we return none, meaning we don’t change the demand. (Note: demand is additive in Combine.)

Let’s test it:

let publisher = [1, 2, 3, 4].publisher
let mySink1 = MySink<Int, Never>()
publisher.subscribe(mySink1)

Output

Received value: 1
Received value: 2
Received value: 3
Received value: 4
Received completion: finished

So far, so good!

Step 2: Create a mySink Extension

Now let’s extend Publisher with a method that mimics sink:

extension Publisher {
    func mySink() -> AnyCancellable {
        let sink = MySink<Output, Failure>()
        subscribe(sink)
        return AnyCancellable(sink)
    }
}

But this won't compile. You'll get the following error:

Initializer ’init(_:) requires that ‘MySink<Self.Output, Self.Failure>’ conform to ‘Cancellable’

To fix that, let’s make MySink conform to Cancellable:

final class MySink<Input, Failure>: Subscriber, Cancellable where Failure: Error {
    func cancel() {}
}

Testing it again:

Received value: 1
Received value: 2
Received value: 3
Received value: 4
Received completion: finished

It works! But let’s take it a step further.

Step 3: Support Cancellation Properly

Let’s try using a PassthroughSubject instead:

let subject = PassthroughSubject<String, Never>()
let mySink2 = subject.mySink()
mySink2.store(in: &subscriptions)

subject.send("A")
subject.send("B")

// Cancel the subscription
mySink2.cancel()
subject.send("C")

Console output:

Received value: A
Received value: B
Received value: C

Something's wrong! We’re still receiving values after cancellation!

That’s because our cancel() method doesn’t inform the publisher that we want to stop. To fix this, we need to store the Subscription instance and cancel it properly:

final class MySink<Input, Failure>: Subscriber, Cancellable where Failure: Error {
    private var subscription: Subscription?

    func receive(subscription: Subscription) {
        self.subscription = subscription
        subscription.request(.unlimited)
    }

    func cancel() {
        subscription?.cancel()
        subscription = nil
    }

    // Other methods remain the same...
}

Now when we run the code:

Received value: A
Received value: B

Perfect.

Step 4: Make MySink More Flexible

So far, we’re just printing values. But we want to allow custom behavior like the original sink. Let's update MySink to accept closures:

final class MySink<Input, Failure>: Subscriber, Cancellable where Failure: Error {
    private let receiveCompletion: (Subscribers.Completion<Failure>) -> Void
    private let receiveValue: (Input) -> Void
    private var subscription: Subscription?
    
    init(
        receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void,
        receiveValue: @escaping (Input) -> Void
    ) {
        self.receiveValue = receiveValue
        self.receiveCompletion = receiveCompletion
    }
    
    func receive(subscription: Subscription) {
        self.subscription = subscription
        subscription.request(.unlimited)
    }

    func receive(_ input: Input) -> Subscribers.Demand {
        receiveValue(input)
        return .none
    }
    
    func receive(completion: Subscribers.Completion<Failure>) {
        receiveCompletion(completion)
        subscription = nil
    }

    func cancel() {
        subscription?.cancel()
        subscription = nil
    }
}

And don’t forget to update the mySink extension:

extension Publisher {
    func mySink(
        receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void,
        receiveValue: @escaping (Output) -> Void
    ) -> AnyCancellable {
        let sink = MySink<Output, Failure>(
            receiveCompletion: receiveCompletion,
            receiveValue: receiveValue
        )
        subscribe(sink)
        return AnyCancellable(sink)
    }
}

Wrapping Up

And that’s it! We’ve re-implemented the core behavior of Combine’s Sink and sink. While the real implementation is more complex and handles more edge cases, our version captures the key ideas. Rebuilding components like this is a great way to deepen your understanding of Combine’s architecture.

👉 You can find the full source code here.