When we first start learning Combine, the introductory example is often something like this:
var subscriptions = Set<AnyCancellable>()
let publisher = [1, 2, 3, 4].publisher
publisher
.sink(
receiveCompletion: { print("Received completion: \($0)") },
receiveValue: { print("Received value: \($0)") }
)
.store(in: &subscriptions)
This prints the following to the console:
Received value: 1
Received value: 2
Received value: 3
Received value: 4
Received completion: finished
The sink
method returns an instance of AnyCancellable
, which wraps an internal instance of type Sink
.
Both Sink
and Assign
are built-in subscribers provided by the Combine framework. According to Apple’s documentation, Sink
is “a simple subscriber that requests an unlimited number of values upon subscription.”
In this post, we'll re-implement Sink
ourselves, calling it MySink
, to better understand how Subscriber
works under the hood.
Step 1: Conform to Subscriber
Because MySink
is a subscriber, it must conform to the Subscriber
protocol:
final class MySink<Input, Failure>: Subscriber where Failure: Error {
func receive(subscription: Subscription) {
subscription.request(.unlimited)
}
func receive(_ input: Input) -> Subscribers.Demand {
print("Received value: \(input)")
return .none
}
func receive(completion: Subscribers.Completion<Failure>) {
print("Completion: \(completion)")
}
}
In the receive(subscription:)
method, we request an unlimited number of values, as expected from the standard Sink
. In receive(_:)
, we return none
, meaning we don’t change the demand. (Note: demand is additive in Combine.)
Let’s test it:
let publisher = [1, 2, 3, 4].publisher
let mySink1 = MySink<Int, Never>()
publisher.subscribe(mySink1)
Output
Received value: 1
Received value: 2
Received value: 3
Received value: 4
Received completion: finished
So far, so good!
Step 2: Create a mySink Extension
Now let’s extend Publisher
with a method that mimics sink
:
extension Publisher {
func mySink() -> AnyCancellable {
let sink = MySink<Output, Failure>()
subscribe(sink)
return AnyCancellable(sink)
}
}
But this won't compile. You'll get the following error:
Initializer ’init(_:)’ requires that ‘MySink<Self.Output, Self.Failure>’ conform to ‘Cancellable’
To fix that, let’s make MySink
conform to Cancellable
:
final class MySink<Input, Failure>: Subscriber, Cancellable where Failure: Error {
func cancel() {}
}
Testing it again:
Received value: 1
Received value: 2
Received value: 3
Received value: 4
Received completion: finished
It works! But let’s take it a step further.
Step 3: Support Cancellation Properly
Let’s try using a PassthroughSubject
instead:
let subject = PassthroughSubject<String, Never>()
let mySink2 = subject.mySink()
mySink2.store(in: &subscriptions)
subject.send("A")
subject.send("B")
// Cancel the subscription
mySink2.cancel()
subject.send("C")
Console output:
Received value: A
Received value: B
Received value: C
Something's wrong! We’re still receiving values after cancellation!
That’s because our cancel()
method doesn’t inform the publisher that we want to stop. To fix this, we need to store the Subscription instance and cancel it properly:
final class MySink<Input, Failure>: Subscriber, Cancellable where Failure: Error {
private var subscription: Subscription?
func receive(subscription: Subscription) {
self.subscription = subscription
subscription.request(.unlimited)
}
func cancel() {
subscription?.cancel()
subscription = nil
}
// Other methods remain the same...
}
Now when we run the code:
Received value: A
Received value: B
Perfect.
Step 4: Make MySink More Flexible
So far, we’re just printing values. But we want to allow custom behavior like the original sink. Let's update MySink
to accept closures:
final class MySink<Input, Failure>: Subscriber, Cancellable where Failure: Error {
private let receiveCompletion: (Subscribers.Completion<Failure>) -> Void
private let receiveValue: (Input) -> Void
private var subscription: Subscription?
init(
receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void,
receiveValue: @escaping (Input) -> Void
) {
self.receiveValue = receiveValue
self.receiveCompletion = receiveCompletion
}
func receive(subscription: Subscription) {
self.subscription = subscription
subscription.request(.unlimited)
}
func receive(_ input: Input) -> Subscribers.Demand {
receiveValue(input)
return .none
}
func receive(completion: Subscribers.Completion<Failure>) {
receiveCompletion(completion)
subscription = nil
}
func cancel() {
subscription?.cancel()
subscription = nil
}
}
And don’t forget to update the mySink
extension:
extension Publisher {
func mySink(
receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void,
receiveValue: @escaping (Output) -> Void
) -> AnyCancellable {
let sink = MySink<Output, Failure>(
receiveCompletion: receiveCompletion,
receiveValue: receiveValue
)
subscribe(sink)
return AnyCancellable(sink)
}
}
Wrapping Up
And that’s it! We’ve re-implemented the core behavior of Combine’s Sink
and sink
. While the real implementation is more complex and handles more edge cases, our version captures the key ideas. Rebuilding components like this is a great way to deepen your understanding of Combine’s architecture.
👉 You can find the full source code here.