on
Swift - Combine in-depth
The introduction of Combine will lead to a major shift in the app development ecosystem as Swift will be gradually embracing the power of reactive programming.
The Combine framework provides a declarative Swift API for processing values over time.
These values can represent many kinds of asynchronous events.
Combine declares publishers to expose values that can change over time, and subscribers to receive those values from the publishers.
It provides a native way of handling asynchronous events.
Combine‘s underlying Publisher
and Subscriber
protocols lay down the foundations of the framework.
— Publishers and Subscribers
A publisher act as a values provider as it will deliver elements to one or more subscribers and must be tied to a subscriber through the receive(subscriber:)
method.
Publishers communicate with their subscribers through several methods:
receive(subscription:)
receive(_ input: Int) -> Subscribers.Demand
receive(completion:)
Every Publisher must adhere to this contract for downstream subscribers to function correctly.
Publishers provide a stream of elements which, once subscripted to, become accessible by their downstream subscribers.
The Publisher protocol is composed of two associated generic types, namely Output
and Failure
which respectively represents a type to produce and a failure type.
Conversely, the subscriber takes two associated generics types, Input
and Failure
.
Because a subscriber and a publisher are co-dependent, their associated types must match.
Declaring a publisher is pretty straightforward.
Publishers.Sequence<[Int], Never>(sequence: [1, 2, 3, 4])
This declares a Sequence
publisher using the Publishers
enum type with an underlying Sequence
struct.
It needs to have one or more subscribers using the receive(Subscriber)
method to start delivering elements.
let subscriber = Subscribers.Sink<Int, Never>(
receiveCompletion: {
completion in
print(completion)
}) { value in
print(value)
}
Publishers
.Sequence<[Int], Never>(sequence: [1, 2, 3, 4])
.receive(subscriber: subscriber)
The Sink
class creates the subscriber and immediately requests an unlimited number of values, prior to returning the subscriber .
It is initialized using both receiveCompletion(:)
and receiveValue(:)
closures providing a convenient way to process the values from the publisher, as they come, and be notified once it has completed.
A concrete Input
type is explicitly declared — e.g Int
— as well as a concrete Failure
type — e.g Never
— .
// prints 1
// prints 2
// prints 3
// prints 4
// prints finished
At core
First, the Publisher is receiving a subscription event acknowledging that a subscribe request has occurred thus returning a Subscription
instance — e.g [Int]
—
Then, the subscriber is requesting — implicitly with the Sink
class — an unlimited number of values.
Each value is being received from the upstream publisher.
At last, the Publisher
, because it can no longer produce elements, sends a finished
completion event which ends its relationship with the subscriber.
Both will no longer be communicating.
Combine’s core implementation lies around the upstream publishers and the downstream subscribers, both tightly dependent to one another.
A Sequence
publisher could also be declared in a simpler way using the built-in publisher
variable.
[1, 2, 3, 4].publisher
.sink(receiveCompletion: { completion in
print(completion)
}) { value in
print(value)
}
The sink()
method immediately creates the subscriber, values are delivered straight away and the sequence terminates with a completion event, just like before.
— Subjects
Subjects act both as a Subcriber
and a Publisher
.
While a subject can receive values from an upstream publisher, it can also pass along these values to its downstream subscribers.
let currentSubject = CurrentValueSubject(1)
A CurrentValueSubject will hold an initial value.
let passthroughSubject = PassthroughSubject()
A passthrough subject will start emitting values only when it gets subscribed to.
At core
First, let’s declare an upstream publisher.
let publisher = [1, 2, 3, 4].publisher
Then, a passthrough subject.
let passthroughSubject = PassthroughSubject<Int, Never>()
We need a way to bind them together as a Publisher
can only receive a Subscription
type.
AnySubscriber
comes in handy as it provides an init(s: Subject)
which wrap our PassThroughSubject
thus becoming useable by our publisher.
let anySubscriber = AnySubscriber(passthroughSubject)
let publisher = [1, 2, 3, 4].publisher()
publisher.receive(subscriber: anySubscriber)
We declared an upstream publisher and a downstream subscriber, we could go a bit further.
As mentioned earlier, since a subscriber also act as a Publisher
, we could use the sink()
method on the PassThroughSubject
to immediately create another downstream subscriber which will be tied to that subject.
let passthroughSubject = PassthroughSubject<Int, Never>()
let anySubscriber = AnySubscriber(passthroughSubject)
let newSubscriber = passthroughSubject
.sink(receiveCompletion: { completion in
print(completion)
}) { value in
print(value)
}
let publisher = [1, 2, 3, 4].publisher
publisher.receive(subscriber: anySubscriber)
The passthroughSubject
instance act as a subscriber since it receive elements from its upstream publisher — the publisher
instance — while also acting as a publisher passing the received elements to its downstream subscriber — the newSubscriber
instance.
The CurrentValueSubject
allows direct access to its underlying value using the value
property as well as providing a way to pass a new value using the send(Output)
method.
let currentValueSubject = CurrentValueSubject<Int, Never>(1)
print(currentValueSubject.value)
// prints 1
currentValueSubject.send(2)
print(currentValueSubject.value)
// prints 2
currentValueSubject.send(completion: .finished)
// sending a completion event
currentValueSubject.send(10) // won’t be printed
print(currentValueSubject.value)
// prints 2
Once a subject — or a subscriber — receives a completion event, it will no longer receive inputs.
— Operators
Combine includes a handful of operators providing a convenient way to process values from an ongoing stream prior to delivering them to the subscriber.
The various operators defined as extensions on Publisher implement their functionality as classes or structures that extend this enumeration.
For example, the contains(_:) operator returns a Publishers.Contains instance.
Some of these operators already exist in the Swift standard library.
map // Publishers.Map
flatMap // Publishers.FlatMap
filter // Publishers.Filter
reduce // Publishers.Reduce
Some of them are new.
tryMap // Publishers.TryMap
tryCatch // Publishers.TryCatch
decode // Publishers.Decode
replaceError // Publishers.ReplaceError
let passthroughSubject = PassthroughSubject<String, Never>()
let anySubscriber = AnySubscriber(passthroughSubject)
passthroughSubject.sink(
receiveCompletion: { completion in
print(completion)
}) { value in
print(value)
}
[1, 2, 3, 4].publisher
.filter { $0.isMultiple(of: 2) }
.map { "My even number is \(String($0))" }
.receive(subscriber: anySubscriber)
// prints my even number is 2
// prints my even number is 4
// prints finished
— URL Session
Let’s fully explore the power of reactive programming by making an asynchronous network request using Combine’s built-in publisher dataTaskPublisher
.
var subscriptions = Set<AnyCancellable>()
let dataTaskPublisher = URLSession.shared.dataTaskPublisher(for: URL(string: "https://jsonplaceholder.typicode.com/posts")!)
dataTaskPublisher
.retry(2)
.map(\.data)
.decode(type: [Post].self, decoder: JSONDecoder())
.replaceError(with: [])
.receive(on: DispatchQueue.main)
.sink { posts in
print("There are \(posts.count) posts")
}.store(in: &subscriptions)
// prints There are 100 posts
Such a declarative API is very convenient when it comes to handling complex asynchronous requests while maintaining code readability.
The Publishers
enum provides a struct called CombineLatest
allowing us to receive the latest elements from two publishers.
let postsDataTaskPublisher = URLSession.shared.dataTaskPublisher(for: URL(string: "https://jsonplaceholder.typicode.com/posts")!)
let commentsDataTaskPublisher = URLSession.shared.dataTaskPublisher(for: URL(string: "https://jsonplaceholder.typicode.com/comments")!)
let postsPublisher = postsDataTaskPublisher
.retry(2)
.map(\.data)
.decode(type: [Post].self, decoder: JSONDecoder())
.replaceError(with: [])
let commentsPublisher = commentsDataTaskPublisher
.retry(2)
.map(\.data)
.decode(type: [Comment].self, decoder: JSONDecoder())
.replaceError(with: [])
Publishers.CombineLatest(postsPublisher, commentsPublisher)
.sink { posts, comments in
print("There are \(posts.count) posts")
print("There are \(comments.count) comments")
}.store(in: &subscriptions)
// prints There are 100 posts
// prints There are 500 comments
Alternatively, we could use a fetch()
method which returns a publisher.
private enum Error: Swift.Error {
case invalidResponse
case invalidJSON
}
private func fetchPosts() -> AnyPublisher<[Post], Swift.Error> {
let url = URL(string: "https://jsonplaceholder.typicode.com/posts")!
return URLSession.shared.dataTaskPublisher(for: url)
.tryMap { data, response -> Data in
guard let httpResponse = response as? HTTPURLResponse,
httpResponse.statusCode == 200 else {
throw Error.invalidResponse
}
return data
}
.tryMap { data -> [Post] in
guard let posts = try?
JSONDecoder().decode([Post].self, from: data) else
{
throw Error.invalidJSON
}
return posts
}
.eraseToAnyPublisher()
}
— UI Binding
The assign(to: on)
is helpful when it comes to binding the value of a KVO-compliant property from the publisher.
Let’s populate our cells from our post objects.
func tableView(_ tableView: UITableView, cellForRowAt indexPath: IndexPath) -> UITableViewCell {
let cell = tableView.dequeueReusableCell(withIdentifier: PostCell.identifier, for: indexPath) as! PostCell
let postsDataTaskPublisher =
URLSession
.shared
.dataTaskPublisher(for: URL(string:"https://jsonplaceholder.typicode.com/posts")!)
cell.subscriber = postsDataTaskPublisher
.retry(2)
.map(\.data)
.decode(type: [Post].self, decoder: JSONDecoder())
.replaceError(with: [])
.map { return $0[indexPath.row].title }
.receive(on: DispatchQueue.main)
.assign(to: \.textLabel!.text, on: cell)
return cell
}
final class PostCell: UITableViewCell {
var subscriber: AnyCancellable?
static var identifier = "PostCell"
override func prepareForReuse() {
subscriber?.cancel()
}
override init(style: UITableViewCell.CellStyle, reuseIdentifier: String?) {
super.init(style: style, reuseIdentifier: reuseIdentifier)
}
required init?(coder: NSCoder) {
fatalError("init(coder:) has not been implemented")
}
}
— Cancellable
Whenever a subscriber no longer needs to receive elements from a publisher, it may cancel its subscription.
The subscriber types created by sink(receiveCompletion:receiveValue:) and assign(to:on:) both implement the Cancellable protocol, which provides a cancel() method.
var subscriptions = Set<AnyCancellable>()
let dataTaskPublisher = URLSession.shared.dataTaskPublisher(for: URL(string: "https://jsonplaceholder.typicode.com/posts")!)
let postsPublisher = dataTaskPublisher
.retry(2)
.map(\.data)
.decode(type: [Post].self, decoder: JSONDecoder())
.replaceError(with: [])
.sink { posts in
print("There are \(posts.count) posts")
}
.store(in: &subscriptions)
postsPublisher.cancel()
This won’t print anything yet because reactive programming deals with asynchronous events however the cancel()
method is called before data has been fetched from the service.
Adding the following code block will address the issue.
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
postsPublisher.cancel()
}
// prints There are 100 posts
— Type erasers
AnySubscriber
and AnyPublisher
wraps respectively the Subscriber
and Publisher
protocols.
They come in handy when the details of an underlying object mustn’t be exposed using access controls.
A good use case is when dealing with closures.
Your app may also have the common pattern of using a closure as a property to invoke when certain events happen. With Combine, you can replace this pattern by using a Subject.
A subject allows you to imperatively publish a new element at any time by calling the send() method.
Adopt this pattern by using a private PassthroughSubject or CurrentValueSubject, then expose this publicly as an AnyPublisher.</em>
A View Controller.
private lazy var passthroughSubject = PassthroughSubject<String, Never>()
lazy var anySubscriber = AnySubscriber(passthroughSubject)
passthroughSubject.send("Hello world !")
Another View Controller.
vc.anySubscriber.sink(receiveCompletion: { completion in
print(completion)
}) { value in
print(value)
}.store(in: &subscriptions)
AnyCancellable
is a type-erased class that will automatically call cancel()
on deinit using Swift’s powerful memory management system.
— RxSwift vs. Combine
In RxSwift..
Publishers
are Observables
Subscribers
are Observers
Cancellable
is Disposable
CurrentValueSubject
is BehaviorSubject
PassthroughSubject
is PublishSubject
eraseToAnyPublisher
is asObservable
handleEvents()
is do()
Conclusion
With Combine, Swift takes a significant leap towards reactive programming making it easier to deal with asynchronous events in our apps.
While the adoption will be progressive — Combine is still in its early days — , the power of such a declarative API will definitely enhance the app development process.