Swift - Combine in-depth

The introduction of Combine will lead to a major shift in the app development ecosystem as Swift will be gradually embracing the power of reactive programming.

The Combine framework provides a declarative Swift API for processing values over time.

These values can represent many kinds of asynchronous events.

Combine declares publishers to expose values that can change over time, and subscribers to receive those values from the publishers.

It provides a native way of handling asynchronous events.

Combine‘s underlying Publisher and Subscriber protocols lay down the foundations of the framework.

— Publishers and Subscribers

A publisher act as a values provider as it will deliver elements to one or more subscribers and must be tied to a subscriber through the receive(subscriber:) method.

Publishers communicate with their subscribers through several methods:

receive(subscription:)
receive(_ input: Int) -> Subscribers.Demand
receive(completion:)

Every Publisher must adhere to this contract for downstream subscribers to function correctly.

Publishers provide a stream of elements which, once subscripted to, become accessible by their downstream subscribers.

The Publisher protocol is composed of two associated generic types, namely Output and Failure which respectively represents a type to produce and a failure type.

Conversely, the subscriber takes two associated generics types, Input and Failure.

Because a subscriber and a publisher are co-dependent, their associated types must match.

Declaring a publisher is pretty straightforward.

Publishers.Sequence<[Int], Never>(sequence: [1, 2, 3, 4])

This declares a Sequence publisher using the Publishers enum type with an underlying Sequence struct. It needs to have one or more subscribers using the receive(Subscriber) method to start delivering elements.

let subscriber = Subscribers.Sink<Int, Never>(
                    receiveCompletion: { 
                       completion in
                      print(completion)
                  }) { value in
                      print(value)
                 }
Publishers
      .Sequence<[Int], Never>(sequence: [1, 2, 3, 4])
      .receive(subscriber: subscriber)

The Sink class creates the subscriber and immediately requests an unlimited number of values, prior to returning the subscriber . It is initialized using both receiveCompletion(:) and receiveValue(:) closures providing a convenient way to process the values from the publisher, as they come, and be notified once it has completed.

A concrete Input type is explicitly declared — e.g Int — as well as a concrete Failure type — e.g Never — .

// prints 1
// prints 2
// prints 3
// prints 4
// prints finished

At core

First, the Publisher is receiving a subscription event acknowledging that a subscribe request has occurred thus returning a Subscription instance — e.g [Int]

Then, the subscriber is requesting — implicitly with the Sink class — an unlimited number of values. Each value is being received from the upstream publisher.

At last, the Publisher, because it can no longer produce elements, sends a finished completion event which ends its relationship with the subscriber.
Both will no longer be communicating.

Combine’s core implementation lies around the upstream publishers and the downstream subscribers, both tightly dependent to one another.

A Sequence publisher could also be declared in a simpler way using the built-in publisher variable.

[1, 2, 3, 4].publisher
     .sink(receiveCompletion: { completion in
         print(completion)
      }) { value in
         print(value)
      }

The sink() method immediately creates the subscriber, values are delivered straight away and the sequence terminates with a completion event, just like before.

— Subjects

Subjects act both as a Subcriber and a Publisher.

While a subject can receive values from an upstream publisher, it can also pass along these values to its downstream subscribers.

let currentSubject = CurrentValueSubject(1)

A CurrentValueSubject will hold an initial value.

let passthroughSubject = PassthroughSubject()

A passthrough subject will start emitting values only when it gets subscribed to.

At core

First, let’s declare an upstream publisher.

let publisher = [1, 2, 3, 4].publisher

Then, a passthrough subject.

let passthroughSubject = PassthroughSubject<Int, Never>()

We need a way to bind them together as a Publisher can only receive a Subscription type. AnySubscriber comes in handy as it provides an init(s: Subject) which wrap our PassThroughSubject thus becoming useable by our publisher.

let anySubscriber = AnySubscriber(passthroughSubject)
let publisher = [1, 2, 3, 4].publisher()

publisher.receive(subscriber: anySubscriber)

We declared an upstream publisher and a downstream subscriber, we could go a bit further.

As mentioned earlier, since a subscriber also act as a Publisher, we could use the sink() method on the PassThroughSubject to immediately create another downstream subscriber which will be tied to that subject.

let passthroughSubject = PassthroughSubject<Int, Never>()
let anySubscriber = AnySubscriber(passthroughSubject)

let newSubscriber = passthroughSubject
            .sink(receiveCompletion: { completion in
                 print(completion)
             }) { value in
                 print(value)
             }
             
let publisher = [1, 2, 3, 4].publisher
publisher.receive(subscriber: anySubscriber)

The passthroughSubject instance act as a subscriber since it receive elements from its upstream publisher — the publisher instance — while also acting as a publisher passing the received elements to its downstream subscriber — the newSubscriber instance.

The CurrentValueSubject allows direct access to its underlying value using the value property as well as providing a way to pass a new value using the send(Output) method.

let currentValueSubject = CurrentValueSubject<Int, Never>(1)
print(currentValueSubject.value)
// prints 1

currentValueSubject.send(2)
print(currentValueSubject.value)
// prints 2

currentValueSubject.send(completion: .finished)
// sending a completion event

currentValueSubject.send(10) // won’t be printed
print(currentValueSubject.value)
// prints 2

Once a subject — or a subscriber — receives a completion event, it will no longer receive inputs.

— Operators

Combine includes a handful of operators providing a convenient way to process values from an ongoing stream prior to delivering them to the subscriber.

The various operators defined as extensions on Publisher implement their functionality as classes or structures that extend this enumeration.
For example, the contains(_:) operator returns a Publishers.Contains instance.

Some of these operators already exist in the Swift standard library.

map // Publishers.Map
flatMap // Publishers.FlatMap
filter // Publishers.Filter
reduce // Publishers.Reduce

Some of them are new.

tryMap // Publishers.TryMap
tryCatch // Publishers.TryCatch
decode // Publishers.Decode
replaceError // Publishers.ReplaceError
let passthroughSubject = PassthroughSubject<String, Never>()
let anySubscriber = AnySubscriber(passthroughSubject)

passthroughSubject.sink(
     receiveCompletion: { completion in
          print(completion)
     }) { value in
          print(value)
     }
     
[1, 2, 3, 4].publisher
    .filter { $0.isMultiple(of: 2) }
    .map { "My even number is \(String($0))" }
    .receive(subscriber: anySubscriber)
    
// prints my even number is 2
// prints my even number is 4
// prints finished

— URL Session

Let’s fully explore the power of reactive programming by making an asynchronous network request using Combine’s built-in publisher dataTaskPublisher.

var subscriptions = Set<AnyCancellable>()

let dataTaskPublisher = URLSession.shared.dataTaskPublisher(for: URL(string: "https://jsonplaceholder.typicode.com/posts")!)

dataTaskPublisher
    .retry(2)
    .map(\.data)
    .decode(type: [Post].self, decoder: JSONDecoder())
    .replaceError(with: [])
    .receive(on: DispatchQueue.main)
    .sink { posts in
        print("There are \(posts.count) posts")
    }.store(in: &subscriptions)
    
// prints There are 100 posts

Such a declarative API is very convenient when it comes to handling complex asynchronous requests while maintaining code readability.

The Publishers enum provides a struct called CombineLatest allowing us to receive the latest elements from two publishers.

let postsDataTaskPublisher = URLSession.shared.dataTaskPublisher(for: URL(string: "https://jsonplaceholder.typicode.com/posts")!)
let commentsDataTaskPublisher = URLSession.shared.dataTaskPublisher(for: URL(string: "https://jsonplaceholder.typicode.com/comments")!)
        
let postsPublisher = postsDataTaskPublisher
     .retry(2)
     .map(\.data)
     .decode(type: [Post].self, decoder: JSONDecoder())
     .replaceError(with: [])
     
let commentsPublisher = commentsDataTaskPublisher
    .retry(2)
    .map(\.data)
    .decode(type: [Comment].self, decoder: JSONDecoder())
    .replaceError(with: [])
    
Publishers.CombineLatest(postsPublisher, commentsPublisher)
     .sink { posts, comments in
          print("There are \(posts.count) posts")
          print("There are \(comments.count) comments")
     }.store(in: &subscriptions)
     
// prints There are 100 posts
// prints There are 500 comments

Alternatively, we could use a fetch() method which returns a publisher.

private enum Error: Swift.Error {
    case invalidResponse
    case invalidJSON
}

private func fetchPosts() -> AnyPublisher<[Post], Swift.Error> {
    let url = URL(string: "https://jsonplaceholder.typicode.com/posts")!
    
    return URLSession.shared.dataTaskPublisher(for: url)
        .tryMap { data, response -> Data in
            guard let httpResponse = response as? HTTPURLResponse,
                      httpResponse.statusCode == 200 else {
                throw Error.invalidResponse
            }
            return data
        }
        .tryMap { data -> [Post] in
             guard let posts = try?
                 JSONDecoder().decode([Post].self, from: data) else
             {
                  throw Error.invalidJSON
             }
    
             return posts
        }
        .eraseToAnyPublisher()
     }

— UI Binding

The assign(to: on) is helpful when it comes to binding the value of a KVO-compliant property from the publisher. Let’s populate our cells from our post objects.

func tableView(_ tableView: UITableView, cellForRowAt indexPath: IndexPath) -> UITableViewCell {
    let cell = tableView.dequeueReusableCell(withIdentifier: PostCell.identifier, for: indexPath) as! PostCell
    
    let postsDataTaskPublisher = 
         URLSession
        .shared
        .dataTaskPublisher(for: URL(string:"https://jsonplaceholder.typicode.com/posts")!)
        
     cell.subscriber = postsDataTaskPublisher
         .retry(2)
         .map(\.data)
         .decode(type: [Post].self, decoder: JSONDecoder())
         .replaceError(with: [])
         .map { return $0[indexPath.row].title }
         .receive(on: DispatchQueue.main)
         .assign(to: \.textLabel!.text, on: cell)
     
     return cell
}

final class PostCell: UITableViewCell {
    var subscriber: AnyCancellable?
    static var identifier = "PostCell"
    
    override func prepareForReuse() {
        subscriber?.cancel()
    }
    
    override init(style: UITableViewCell.CellStyle, reuseIdentifier: String?) {
        super.init(style: style, reuseIdentifier: reuseIdentifier)
    }
    
    required init?(coder: NSCoder) {
         fatalError("init(coder:) has not been implemented")
    }
}

— Cancellable

Whenever a subscriber no longer needs to receive elements from a publisher, it may cancel its subscription.
The subscriber types created by sink(receiveCompletion:receiveValue:) and assign(to:on:) both implement the Cancellable protocol, which provides a cancel() method.

var subscriptions = Set<AnyCancellable>()

let dataTaskPublisher = URLSession.shared.dataTaskPublisher(for: URL(string: "https://jsonplaceholder.typicode.com/posts")!)

let postsPublisher = dataTaskPublisher
    .retry(2)
    .map(\.data)
    .decode(type: [Post].self, decoder: JSONDecoder())
    .replaceError(with: [])
    .sink { posts in
        print("There are \(posts.count) posts")
    }
    .store(in: &subscriptions)
    
postsPublisher.cancel()

This won’t print anything yet because reactive programming deals with asynchronous events however the cancel() method is called before data has been fetched from the service.
Adding the following code block will address the issue.

DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    postsPublisher.cancel()
}
// prints There are 100 posts

— Type erasers

AnySubscriber and AnyPublisher wraps respectively the Subscriber and Publisher protocols. They come in handy when the details of an underlying object mustn’t be exposed using access controls.

A good use case is when dealing with closures.

Your app may also have the common pattern of using a closure as a property to invoke when certain events happen. With Combine, you can replace this pattern by using a Subject.

A subject allows you to imperatively publish a new element at any time by calling the send() method. 

Adopt this pattern by using a private PassthroughSubject or CurrentValueSubject, then expose this publicly as an AnyPublisher.</em>

A View Controller.

private lazy var passthroughSubject = PassthroughSubject<String, Never>()
lazy var anySubscriber = AnySubscriber(passthroughSubject)

passthroughSubject.send("Hello world !")

Another View Controller.

vc.anySubscriber.sink(receiveCompletion: { completion in
    print(completion)
}) { value in
    print(value)
}.store(in: &subscriptions)

AnyCancellable is a type-erased class that will automatically call cancel() on deinit using Swift’s powerful memory management system.

— RxSwift vs. Combine

In RxSwift..

Publishers are Observables
Subscribers are Observers
Cancellable is Disposable
CurrentValueSubject is BehaviorSubject
PassthroughSubject is PublishSubject
eraseToAnyPublisher is asObservable
handleEvents() is do()

Conclusion

With Combine, Swift takes a significant leap towards reactive programming making it easier to deal with asynchronous events in our apps.

While the adoption will be progressive — Combine is still in its early days — , the power of such a declarative API will definitely enhance the app development process.