on
Combine - Single-valued publisher
While creating a custom operator out of Combine’s built-in ones in the Publisher
extension is a useful (and quick!) technique to harmonize some common reactive behaviors, it is sometimes necessary to provide with some more in-depth customization to achieve our goal.
Combine enables us to conform to the Publisher
, Subscription
and Subscriber
protocols to create a tailored operator and gain control over our flow.
Our purpose is to create a single-valued publisher which will deliver one value and completes immediately, or fails.
— Part 1: Creating a custom Publisher
First, we need to extend the Publishers
enum which is a namespace for types related to the Publisher protocol in which we create a AsSingle
struct that conforms to the Publisher
protocol and has one required receive<S>(subscriber: S)
method.
From that point on, we’ll be dealing with generics namely the Upstream
type which stands for the publisher and the Downstream
type which is the subscriber.
extension Publishers {
struct AsSingle<Upstream: Publisher>: Publisher {
typealias Output = Upstream.Output
typealias Failure = Upstream.Failure
private let upstream: Upstream
init(upstream: Upstream) {
self.upstream = upstream
}
func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
subscriber.receive(subscription: Subscription(upstream: upstream, downstream: subscriber))
}
}
}
— Part 2: Creating a custom Subscription
When the publisher receives a subscriber, the subscriber needs to receive a subscription that holds an instance of a AsSingleSink
object (a custom subscriber).
The sink
instance will be nullified whenever the cancel()
method is triggered which causes the publisher to stop delivering values.
extension Publishers.Single {
class Subscription<Downstream: Subscriber>: Combine.Subscription where Upstream.Output == Downstream.Input, Upstream.Failure == Downstream.Failure {
private var sink: AsSingleSink<Upstream, Downstream>?
init(upstream: Upstream, downstream: Downstream) {
sink = .init(upstream: upstream, downstream: downstream)
}
func request(_ demand: Subscribers.Demand) { }
func cancel() {
sink = nil
}
}
}
— Part 3: Creating a custom Subscriber
The final part is to create a subscriber to manage our flow using three required methods.
- receive(subscription: Subscription)
- receive(_ input: Upstream.Output) -> Subscribers.Demand
- receive(completion: Subscribers.Completion
)
We’ll only request a single value to the upstream publisher using subscription.request(.max(1)
The first input will be delivered to the downstream subscriber immediately followed by a finished
event so we can match the desired behavior.
We also need to hold an optional variable to check whether the sequence is not empty otherwise we provide with a fatalError
in the completion event.
class AsSingleSink<Upstream: Publisher, Downstream: Subscriber>: Subscriber where Upstream.Output == Downstream.Input, Downstream.Failure == Upstream.Failure {
private var downstream: Downstream
private var _element: Upstream.Output?
init(upstream: Upstream, downstream: Downstream) {
self.downstream = downstream
upstream.subscribe(self)
}
func receive(subscription: Subscription) {
subscription.request(.max(1))
}
func receive(_ input: Upstream.Output) -> Subscribers.Demand {
_element = input
_ = downstream.receive(input)
downstream.receive(completion: .finished)
return .none
}
func receive(completion: Subscribers.Completion<Upstream.Failure>) {
switch completion {
case .failure(let err):
downstream.receive(completion: .failure(err))
case .finished:
if _element == nil {
fatalError("❌ Sequence doesn't contain any elements.")
}
}
}
}
— Part 5: Creating the operator
The final part is to create the asSingle
method in a Publisher
extension and simply returns an AsSingle
instance.
extension Publisher {
func asSingle() -> Publishers.AsSingle<Self> {
return Publishers.AsSingle(upstream: self)
}
}
— Part 6: Use case
The subscription terminates as soon as the subscriber receives one value.
var subscriptions = Set<AnyCancellable>()
[1, 2, 3].publisher
.asSingle()
.sink(receiveCompletion: { c in
print(c)
}, receiveValue: { val in
print(val)
}).store(in: &subscriptions)
// 1
// finished
If the sequence doesn’t contain any elements, it ends up with a fatalError
.
[].publisher
.asSingle()
.sink(receiveCompletion: { c in
print(c)
}, receiveValue: { val in
print(val)
}).store(in: &subscriptions)
// Fatal error: ❌ Sequence doesn't contain any elements.
Note: In RxSwift, the asSingle()
operator behaves quite similarly however turning an Observable
sequence with more than one value or no value at all into a Single
and subscribing to it ends up with an RxError
.
The above implementation is highly based upon the CombineExt repository on Github.