on
Combine - Creating a custom subscriber
While Combine provides with a sink
method that conveniently « creates the subscriber and immediately requests an unlimited number of values, prior to returning the subscriber. », the framework also enables us to define our own subscriber by conforming to the Subscriber
protocol.
— Use case
class CustomSubscriber: Subscriber {
typealias Input = Int
typealias Failure = Never
func receive(subscription: Subscription) {
subscription.request(.max(1))
}
func receive(_ input: Int) -> Subscribers.Demand {
print("Value:", input)
return .none
}
func receive(completion: Subscribers.Completion<Never>) {
print("Completion: \(completion)")
}
}
Since the Subscriber
protocol has two associated types namely Input
and Failure
, we must specify the type of values our custom subscriber must receive as well as an error type.
There are three additional methods requirements to implement.
- receive(subscription: Subscription)
- receive(_ input:) -> Subscribers.Demand
- receive(completion: Subscribers.Completion
)
The receive(subscription: Subscription)
method will be triggered one time as soon as the publisher is bound to the subscriber.
It requests a certain amount of values to the subscription object through an enum of three cases : none, unlimited, max(value:)).
The receive(_ input:) -> Subscribers.Demand
method will be triggered each time a new value is delivered, this is the right moment to reevaluate the amount of values needed.
For instance, you could return .none
to stick to the initial request or ask for two more values each time a new input comes in.
Note that it will be added to the initial request, so you can only increase the amount of max
values.
let publisher = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10].publisher
let subscriber = CustomSubscriber()
publisher.subscribe(subscriber)
// Value: 1
No completion event is printed out since the stream has a finite number of values and we requested just one.
Let’s now demand an unlimited amount of values.
func receive(_ input: Int) -> Subscribers.Demand {
print("Value:", input)
return .unlimited
}
// Value: 1
// Value: 2
// Value: 3
// Value: 4
// Value: 5
// Value: 6
// Value: 7
// Value: 8
// Value: 9
// Value: 10
// Completion: finished
Let’s control our flow by conditioning our demand in the receive(_ input:) -> Subscribers.Demand
method.
func receive(_ input: Int) -> Subscribers.Demand {
print("Value:", input)
return (input == 3) ? .none: .max(1)
}
// Value: 1
// Value: 2
// Value: 3
— Conclusion
Managing our stream through the Subscriber
protocol enables us to have better control over our flow while opening up larger reactive implementation options.