Combine - Single-valued publisher

While creating a custom operator out of Combine’s built-in ones in the Publisher extension is a useful (and quick!) technique to harmonize some common reactive behaviors, it is sometimes necessary to provide with some more in-depth customization to achieve our goal.

Combine enables us to conform to the Publisher, Subscription and Subscriber protocols to create a tailored operator and gain control over our flow.

Our purpose is to create a single-valued publisher which will deliver one value and completes immediately, or fails.

— Part 1: Creating a custom Publisher

First, we need to extend the Publishers enum which is a namespace for types related to the Publisher protocol in which we create a AsSingle struct that conforms to the Publisher protocol and has one required receive<S>(subscriber: S) method.

From that point on, we’ll be dealing with generics namely the Upstream type which stands for the publisher and the Downstream type which is the subscriber.

  extension Publishers {
    struct AsSingle<Upstream: Publisher>: Publisher {
        typealias Output = Upstream.Output
        typealias Failure = Upstream.Failure

         private let upstream: Upstream

         init(upstream: Upstream) {
            self.upstream = upstream
        }

         func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
            subscriber.receive(subscription: Subscription(upstream: upstream, downstream: subscriber))
        }
    }
}

— Part 2: Creating a custom Subscription

When the publisher receives a subscriber, the subscriber needs to receive a subscription that holds an instance of a AsSingleSink object (a custom subscriber).

The sink instance will be nullified whenever the cancel() method is triggered which causes the publisher to stop delivering values.

extension Publishers.Single {
    class Subscription<Downstream: Subscriber>: Combine.Subscription where Upstream.Output == Downstream.Input, Upstream.Failure == Downstream.Failure {
        private var sink: AsSingleSink<Upstream, Downstream>?

         init(upstream: Upstream, downstream: Downstream) {
            sink = .init(upstream: upstream, downstream: downstream)
        }

         func request(_ demand: Subscribers.Demand) { }

         func cancel() {
            sink = nil
        }
    }
}

— Part 3: Creating a custom Subscriber

The final part is to create a subscriber to manage our flow using three required methods.

We’ll only request a single value to the upstream publisher using subscription.request(.max(1)

The first input will be delivered to the downstream subscriber immediately followed by a finished event so we can match the desired behavior.

We also need to hold an optional variable to check whether the sequence is not empty otherwise we provide with a fatalError in the completion event.

class AsSingleSink<Upstream: Publisher, Downstream: Subscriber>: Subscriber where Upstream.Output == Downstream.Input, Downstream.Failure == Upstream.Failure {
    private var downstream: Downstream
    private var _element: Upstream.Output?

     init(upstream: Upstream, downstream: Downstream) {
        self.downstream = downstream
        upstream.subscribe(self)
    }

     func receive(subscription: Subscription) {
        subscription.request(.max(1))
    }

     func receive(_ input: Upstream.Output) -> Subscribers.Demand {
        _element = input
        _ = downstream.receive(input)
        downstream.receive(completion: .finished)
        
        return .none
    }

     func receive(completion: Subscribers.Completion<Upstream.Failure>) {
        switch completion {
        case .failure(let err):
            downstream.receive(completion: .failure(err))
        case .finished:
            if _element == nil {
                fatalError("❌ Sequence doesn't contain any elements.")
            }
        }
    }
}

— Part 5: Creating the operator

The final part is to create the asSingle method in a Publisher extension and simply returns an AsSingle instance.

  extension Publisher {
    func asSingle() -> Publishers.AsSingle<Self> {
        return Publishers.AsSingle(upstream: self)
    }
}

— Part 6: Use case

The subscription terminates as soon as the subscriber receives one value.

var subscriptions = Set<AnyCancellable>()

[1, 2, 3].publisher
    .asSingle()
    .sink(receiveCompletion: { c in
        print(c)
    }, receiveValue: { val in
        print(val)
    }).store(in: &subscriptions)

// 1
// finished

If the sequence doesn’t contain any elements, it ends up with a fatalError.

[].publisher
    .asSingle()
    .sink(receiveCompletion: { c in
        print(c)
    }, receiveValue: { val in
        print(val)
    }).store(in: &subscriptions)

// Fatal error: ❌ Sequence doesn't contain any elements.

Note: In RxSwift, the asSingle() operator behaves quite similarly however turning an Observable sequence with more than one value or no value at all into a Single and subscribing to it ends up with an RxError.

The above implementation is highly based upon the CombineExt repository on Github.