Functional Reactive Programming - Publisher and Observable

RxSwift and Combine are reactive programming solutions whose purpose is to handle asynchronous events. RxSwift is the most popular framework whereas Combine is Apple’s recently introduced built-in solution.

Dealing with such a different programming paradigm may be tricky at first however, when it comes to networking, one can really feels the benefits of these solutions.

Both frameworks provide with different ways of handling network requests in a declarative manner.

— Combine

struct Post: Decodable {
    let userId: Int
    let id: Int
    let title: String
    let body: String
}

final class Webservice {
    private enum Error: Swift.Error {
        case invalidResponse(URLResponse)
        case invalidJSON(Swift.Error)
        case invalidData
    }
    
    func fetchPosts() -> AnyPublisher<[Post], Swift.Error> {
        let url = URL(
            string: “https://jsonplaceholder.typicode.com/posts"
        )!
        return URLSession.shared.dataTaskPublisher(for: url)
            .tryMap { data, response -> Data in
                guard 
                    let httpResponse = response as? HTTPURLResponse, 
                    httpResponse.statusCode == 200 else {
                        throw Error.invalidResponse(response)
                }
                return data
            }
            .decode(type: [Post].self, decoder: JSONDecoder())
            .mapError { Error.invalidJSON($0) }
            .receive(on: DispatchQueue.main)
            .eraseToAnyPublisher()
        }
}

Combine is designed around the Publisher and the Subscriber protocols which are tightly bound to each other. A Publisher will emit events over time while a Subscriber will observe them.

A handful of operators gives the ability to process these events as they occur and deliver the expected result. The URLSession class has a built-in dataTaskPublisher that enables us to deal with the following operations:

  1. Map the URL response
  2. Decode the data
  3. Deal with the errors
  4. Specify a scheduler

The power of functional reactive programming lies in the ability to write such operations in a declarative, synchronous way while asynchronous requests are actually performed in the background.

The return type AnyPublisher<[Post], Swift.Error> takes two generics types that is an Array<Post> type and an Error type.

Using Never as en error type would mean the Publisher cannot error out. Incoming errors would then have to be handled with the replaceError operator to provide with the requested return type.

Let’s look at some other operators.

let webService = Webservice()
let posts = webService.fetchPosts()
var cancellable = Set<AnyCancellable>()

The posts publisher needs to be tied to a subscriber to retrieve the events. The sink method comes in handy as it creates the subscriber and immediately requests an unlimited number of values, prior to returning the subscriber.

posts.sink(receiveCompletion: { completion in
    print(completion)
}) { posts in
    print(posts.count)
}.store(in: &cancellable)
// prints 100
// prints finished

The Publisher emit a hundred posts and terminates (e.g finished event) as mentioned in Apple’s documentation a publisher continues to emit elements until it completes normally or fails.

Using the Sink class from the Subscribers namespace enables us to create and store the subscriber.

let subscriber = Subscribers.Sink<[Post], Swift.Error>(receiveCompletion: { completion in
    print(completion)
}) { posts in
    print(posts.count)
}

The subscriber needs to be attached to its publisher using the receive method.

posts.receive(subscriber: subscriber)
// prints 100
// prints finished
subscriber.store(in: &cancellable)

The sink method returns AnyCancellable, a type-erased object, enabling the framework to automatically cancel a subscription at deinit.

Storing the cancellable object in a Set using the store method enables Combine to effectively manage the memory while avoiding retain cycles.

— RxSwift

RxSwift is designed around the ObservableType and the ObserverType protocols. Just like in Combine, Observable emits events over time and Observer listen to them.

The framework, precisely its RxCocoa component, provides a way to use URLSession in a reactive way through its rx extension.

URLSession.shared.rx.response(request: request)

The following fetch() method will return an Observable of the specified type — e.g Array<Post>

final class Webservice {
    let disposeBag = DisposeBag()
    private enum Error: Swift.Error {
        case invalidResponse(URLResponse?)
        case invalidJSON(Swift.Error)
    }
    func fetch() -> Observable<[Post]> {
        let url = URL(
            string: "https://jsonplaceholder.typicode.com/posts"
        )!
        let request = URLRequest(url: url)
        return URLSession.shared.rx.response(request: request)
            .map { result -> Data in
                guard result.response.statusCode == 200 else {
                    throw Error.invalidResponse(result.response)
                }
                return result.data
            }.map { data in
                do {
                    let posts = try JSONDecoder().decode(
                        [Post].self, from: data
                    )
                    return posts
                } catch let error {
                    throw Error.invalidJSON(error)
                }
            }
            .observeOn(MainScheduler.instance)
            .asObservable()
   }
}

The RxSwift equivalent to Combine’s sink is the subscribe function which subscribes an element handler, an error handler, a completion handler and disposed handler to an observable sequence. . A way of notifying that a subscription has been made to some observable sequence.

let webService = Webservice()
webService.fetch().subscribe(onNext: { posts in
    print(posts.count)
}, onError: { error in
    print(error)
}).disposed(by: disposeBag)

The subscribe method returns a Disposable type that enables the framework to dispose (cancel) a subscription at deinit. The DisposeBag class holds a reference to each suscription and release them when necessary.

RxSwift provides another way to create and return an Observable using the static func create from the ObservableType protocol extension.

func fetch() -> Observable<[Post]> {
    return Observable.create { observer in
        let url = URL(
            string: "https://jsonplaceholder.typicode.com/posts"
        )!
        let session = URLSession.shared
        let dataTask = session.dataTask(with: url) { 
            (data, response, error) in
            guard
                let httpResponse = response as? HTTPURLResponse,
                httpResponse.statusCode == 200 {
                    observer.onError(
                        Error.invalidResponse(response)
                    )
                    return
            }
            guard let data = date else {            
                observer.onError(Error.invalidData)
                return
            }
            do {
                let posts = try JSONDecoder().decode(
                    [Post].self, from: data
                )
                observer.onNext(posts)
                observer.onCompleted()
            } catch let error {
                observer.onError(error)
                return
            }
        }
    
        dataTask.resume()
    
        return Disposables.create {
            dataTask.cancel()
        }
     }
}

Inside the block, the observer function argument is used to emit events such as onError or onNext or onCompleted.

A disposable is returned so the reference to the subscription can be properly released from memory at deinit.

Although RxSwift and Combine implementations using Publisher and Observable are well-suited when it comes to processing a stream of values, both frameworks provide with variations of these protocols particularly adapted to deal with networking requests.

Meet Future and Single.