Combine - Creating a custom subscriber

While Combine provides with a sink method that conveniently « creates the subscriber and immediately requests an unlimited number of values, prior to returning the subscriber. », the framework also enables us to define our own subscriber by conforming to the Subscriber protocol.

— Use case

class CustomSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never
    
    func receive(subscription: Subscription) {
        subscription.request(.max(1))
    }
    
    func receive(_ input: Int) -> Subscribers.Demand {
        print("Value:", input)
        return .none
    }
    
    func receive(completion: Subscribers.Completion<Never>) {
        print("Completion: \(completion)")
    }
}

Since the Subscriber protocol has two associated types namely Input and Failure, we must specify the type of values our custom subscriber must receive as well as an error type.

There are three additional methods requirements to implement.

  1. receive(subscription: Subscription)
  2. receive(_ input:) -> Subscribers.Demand
  3. receive(completion: Subscribers.Completion)

The receive(subscription: Subscription) method will be triggered one time as soon as the publisher is bound to the subscriber. It requests a certain amount of values to the subscription object through an enum of three cases : none, unlimited, max(value:)).

The receive(_ input:) -> Subscribers.Demand method will be triggered each time a new value is delivered, this is the right moment to reevaluate the amount of values needed.
For instance, you could return .none to stick to the initial request or ask for two more values each time a new input comes in.

Note that it will be added to the initial request, so you can only increase the amount of max values.

let publisher = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10].publisher
let subscriber = CustomSubscriber()

publisher.subscribe(subscriber)

// Value: 1

No completion event is printed out since the stream has a finite number of values and we requested just one.

Let’s now demand an unlimited amount of values.

func receive(_ input: Int) -> Subscribers.Demand {
    print("Value:", input)
    return .unlimited
}

// Value: 1
// Value: 2
// Value: 3
// Value: 4
// Value: 5
// Value: 6
// Value: 7
// Value: 8
// Value: 9
// Value: 10
// Completion: finished

Let’s control our flow by conditioning our demand in the receive(_ input:) -> Subscribers.Demand method.

func receive(_ input: Int) -> Subscribers.Demand {
    print("Value:", input)
    return (input == 3) ? .none: .max(1)
}

// Value: 1
// Value: 2
// Value: 3

— Conclusion

Managing our stream through the Subscriber protocol enables us to have better control over our flow while opening up larger reactive implementation options.