Combine - Single-valued publisher
While building a custom operator from Combine’s built-in ones in a Publisher extension is a useful (and quick!) way to standardize common reactive behaviors, some goals call for more in-depth customization.
Combine enables us to conform to the Publisher, Subscription and Subscriber protocols to create a tailored operator and gain control over our flow.
Our goal is to create a single-valued publisher that delivers one value and then completes immediately, or fails.
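For contrast, here is a minimal sketch of the quick approach mentioned above, composing only built-in operators. The name firstValueOnly is hypothetical and not part of Combine. It covers the "one value, then finish" half of the goal, but an empty upstream simply finishes without a value, which is one reason a tailored publisher can be worth the extra work.

```swift
import Combine

extension Publisher {
    // Hypothetical operator name: emits the first upstream value, then finishes.
    func firstValueOnly() -> Publishers.First<Self> {
        first()
    }
}

var received: [Int] = []
var didFinish = false

let cancellable = [1, 2, 3].publisher
    .firstValueOnly()
    .sink(receiveCompletion: { completion in
        // Only the .finished case is tracked in this sketch.
        if case .finished = completion { didFinish = true }
    }, receiveValue: { value in
        received.append(value)
    })
```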
— Part 1: Creating a custom Publisher
First, we extend the Publishers enum, which is a namespace for types related to the Publisher protocol. Inside it, we create an AsSingle struct that conforms to the Publisher protocol and implements its one required method, receive<S>(subscriber: S).
From that point on, we’ll be dealing with generics, namely the Upstream type, which stands for the publisher, and the Downstream type, which stands for the subscriber.
import Combine

extension Publishers {
    struct AsSingle<Upstream: Publisher>: Publisher {
        typealias Output = Upstream.Output
        typealias Failure = Upstream.Failure

        private let upstream: Upstream

        init(upstream: Upstream) {
            self.upstream = upstream
        }

        // Hand the subscriber a subscription that owns the sink (see Part 2).
        func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
            subscriber.receive(subscription: Subscription(upstream: upstream, downstream: subscriber))
        }
    }
}
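To isolate what the Publisher conformance itself requires, here is a stripped-down sketch of a wrapper that does nothing but forward its upstream. The Forward type is hypothetical and exists only for illustration; it shows the same Upstream generic and the same constraints on the subscriber, without any custom subscription.

```swift
import Combine

extension Publishers {
    // Hypothetical wrapper: forwards every upstream event unchanged.
    struct Forward<Upstream: Publisher>: Publisher {
        typealias Output = Upstream.Output
        typealias Failure = Upstream.Failure

        let upstream: Upstream

        // The only requirement of the Publisher protocol.
        func receive<S>(subscriber: S) where S: Subscriber, S.Input == Output, S.Failure == Failure {
            upstream.subscribe(subscriber)
        }
    }
}

var forwarded: [String] = []

let forwardCancellable = Publishers.Forward(upstream: ["a", "b"].publisher)
    .sink { forwarded.append($0) }
```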
— Part 2: Creating a custom Subscription
When the publisher receives a subscriber, the subscriber must in turn receive a subscription, which holds an instance of an AsSingleSink object (a custom subscriber).
The sink instance is set to nil whenever the cancel() method is called, which causes the publisher to stop delivering values.
extension Publishers.AsSingle {
    class Subscription<Downstream: Subscriber>: Combine.Subscription where Upstream.Output == Downstream.Input, Upstream.Failure == Downstream.Failure {
        // Keeps the sink alive for as long as the subscription is active.
        private var sink: AsSingleSink<Upstream, Downstream>?

        init(upstream: Upstream, downstream: Downstream) {
            sink = .init(upstream: upstream, downstream: downstream)
        }

        // Demand is ignored here: the sink itself requests exactly one value from upstream.
        func request(_ demand: Subscribers.Demand) { }

        func cancel() {
            sink = nil
        }
    }
}
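The effect of cancellation described above can be observed from the outside with built-in types only. This is a small sketch using PassthroughSubject rather than the AsSingle publisher: once cancel() is called, later values no longer reach the subscriber.

```swift
import Combine

let subject = PassthroughSubject<Int, Never>()
var delivered: [Int] = []

let token = subject.sink { delivered.append($0) }

subject.send(1)   // delivered while the subscription is active
token.cancel()    // tears the subscription down
subject.send(2)   // no longer delivered
```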
— Part 3: Creating a custom Subscriber
The next step is to create a subscriber that manages our flow through three required methods:
- receive(subscription: Subscription)
- receive(_ input: Upstream.Output) -> Subscribers.Demand
- receive(completion: Subscribers.Completion<Upstream.Failure>)
We’ll request only a single value from the upstream publisher using subscription.request(.max(1)).
The first input is delivered to the downstream subscriber, immediately followed by a finished event, which matches the desired behavior.
We also need to hold an optional variable to check whether the sequence was empty; if it was, we raise a fatalError in the completion event.
class AsSingleSink<Upstream: Publisher, Downstream: Subscriber>: Subscriber where Upstream.Output == Downstream.Input, Downstream.Failure == Upstream.Failure {
    private let downstream: Downstream
    // Stays nil until the first value arrives; used to detect an empty sequence.
    private var _element: Upstream.Output?

    init(upstream: Upstream, downstream: Downstream) {
        self.downstream = downstream
        upstream.subscribe(self)
    }

    func receive(subscription: Subscription) {
        // Ask upstream for exactly one value.
        subscription.request(.max(1))
    }

    func receive(_ input: Upstream.Output) -> Subscribers.Demand {
        _element = input
        // Forward the value, then finish right away.
        _ = downstream.receive(input)
        downstream.receive(completion: .finished)
        return .none
    }

    func receive(completion: Subscribers.Completion<Upstream.Failure>) {
        switch completion {
        case .failure(let err):
            downstream.receive(completion: .failure(err))
        case .finished:
            // Upstream finished without ever emitting a value.
            if _element == nil {
                fatalError("❌ Sequence doesn't contain any elements.")
            }
        }
    }
}
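The demand mechanics used in receive(subscription:) can be seen in isolation with a much smaller subscriber. This is only a sketch: OneValueSubscriber is a hypothetical type that requests a single value from a sequence publisher and records what it gets, without forwarding anything downstream.

```swift
import Combine

// Hypothetical subscriber used only to illustrate .max(1) demand.
final class OneValueSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var values: [Int] = []

    func receive(subscription: Subscription) {
        // Request exactly one value from upstream.
        subscription.request(.max(1))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        values.append(input)
        // Returning .none means no additional demand is added.
        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) { }
}

let oneValueSubscriber = OneValueSubscriber()
[1, 2, 3].publisher.subscribe(oneValueSubscriber)
```

Only the first element is delivered here, because the subscriber never asks for more than one.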
— Part 4: Creating the operator
The last step is to add an asSingle() method in a Publisher extension that simply returns an AsSingle instance.
extension Publisher {
    func asSingle() -> Publishers.AsSingle<Self> {
        return Publishers.AsSingle(upstream: self)
    }
}
— Part 5: Use case
The subscription terminates as soon as the subscriber receives one value.
var subscriptions = Set<AnyCancellable>()

[1, 2, 3].publisher
    .asSingle()
    .sink(receiveCompletion: { c in
        print(c)
    }, receiveValue: { val in
        print(val)
    }).store(in: &subscriptions)
// 1
// finished
If the sequence doesn’t contain any elements, it ends up with a fatalError.
// The element type is spelled out because an empty literal alone cannot be inferred.
[Int]().publisher
    .asSingle()
    .sink(receiveCompletion: { c in
        print(c)
    }, receiveValue: { val in
        print(val)
    }).store(in: &subscriptions)
// Fatal error: ❌ Sequence doesn't contain any elements.
Note: In RxSwift, the asSingle() operator behaves quite similarly. However, turning an Observable sequence with more than one value, or with no value at all, into a Single and subscribing to it ends up with an RxError.
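For readers who prefer an error over a crash on empty sequences, here is a rough sketch of how that part of the RxSwift behavior could be approximated with built-in operators. It is not the approach used in this article: SingleError and firstOrError are hypothetical names, and unlike RxSwift this version does not fail when the upstream holds more than one value.

```swift
import Combine

// Hypothetical error type, loosely mirroring RxSwift's "no elements" case.
enum SingleError: Error {
    case noElements
}

extension Publisher {
    // Hypothetical operator: first value then finish; fails if upstream is empty.
    func firstOrError() -> AnyPublisher<Output, Error> {
        first()
            .map { Optional<Output>.some($0) }
            .replaceEmpty(with: nil)          // an empty upstream becomes a single nil
            .tryMap { value -> Output in
                guard let value = value else { throw SingleError.noElements }
                return value
            }
            .eraseToAnyPublisher()
    }
}

var firstValues: [Int] = []
var sawNoElements = false

let nonEmpty = [1, 2, 3].publisher
    .firstOrError()
    .sink(receiveCompletion: { _ in },
          receiveValue: { firstValues.append($0) })

let empty = [Int]().publisher
    .firstOrError()
    .sink(receiveCompletion: { completion in
        if case .failure(let error) = completion,
           (error as? SingleError) == .noElements {
            sawNoElements = true
        }
    }, receiveValue: { _ in })
```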
The above implementation is largely based on the CombineExt repository on GitHub.