Functional Reactive Programming - Publisher and Observable
RxSwift and Combine are reactive programming solutions whose purpose is to handle asynchronous events. RxSwift is the most popular framework whereas Combine is Apple’s recently introduced built-in solution.
Dealing with such a different programming paradigm may be tricky at first; however, when it comes to networking, one can really feel the benefits of these solutions.
Both frameworks provide different ways of handling network requests in a declarative manner.
— Combine
import Combine
import Foundation

struct Post: Decodable {
    let userId: Int
    let id: Int
    let title: String
    let body: String
}

final class Webservice {
    private enum Error: Swift.Error {
        case invalidResponse(URLResponse)
        case invalidJSON(Swift.Error)
        case invalidData
    }

    func fetchPosts() -> AnyPublisher<[Post], Swift.Error> {
        let url = URL(
            string: "https://jsonplaceholder.typicode.com/posts"
        )!
        return URLSession.shared.dataTaskPublisher(for: url)
            // Fail unless the server answered with HTTP 200.
            .tryMap { data, response -> Data in
                guard
                    let httpResponse = response as? HTTPURLResponse,
                    httpResponse.statusCode == 200 else {
                        throw Error.invalidResponse(response)
                }
                return data
            }
            .decode(type: [Post].self, decoder: JSONDecoder())
            // Wrap decoding failures only; other errors pass through unchanged.
            .mapError { error -> Swift.Error in
                if error is DecodingError {
                    return Error.invalidJSON(error)
                }
                return error
            }
            .receive(on: DispatchQueue.main)
            .eraseToAnyPublisher()
    }
}
Combine is designed around the Publisher and the Subscriber protocols, which are tightly bound to each other.
A Publisher emits events over time while a Subscriber observes them.
A handful of operators make it possible to process these events as they occur and deliver the expected result.
The URLSession class has a built-in dataTaskPublisher that enables us to deal with the following operations:
- Map the URL response
- Decode the data
- Deal with the errors
- Specify a scheduler
The power of functional reactive programming lies in the ability to write such operations in a declarative style that reads like synchronous code, while the asynchronous request is actually performed in the background.
The return type AnyPublisher<[Post], Swift.Error> takes two generic types: an Array<Post> output type and an Error failure type.
Using Never as the error type would mean the Publisher cannot fail.
Incoming errors would then have to be handled with the replaceError operator in order to provide the requested return type.
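As a minimal sketch of the Never case, the snippet below assumes a hypothetical helper named postsOrEmpty and a simplified Post type (neither belongs to the Webservice above). It uses replaceError to swap any failure for an empty array, which turns the failure type into Never.

```swift
import Combine
import Foundation

// Simplified stand-in for the article's Post type.
struct Post: Decodable {
    let id: Int
    let title: String
}

// Hypothetical helper: replaces any upstream failure with an empty array,
// which changes the failure type from Error to Never.
func postsOrEmpty(
    _ upstream: AnyPublisher<[Post], Error>
) -> AnyPublisher<[Post], Never> {
    upstream
        .replaceError(with: [])
        .eraseToAnyPublisher()
}

// Feeds the helper a publisher that fails immediately and records the size
// of every array the subscriber receives.
func receivedCounts() -> [Int] {
    let failing = Fail<[Post], Error>(error: URLError(.badServerResponse))
        .eraseToAnyPublisher()
    var counts: [Int] = []
    // With a Never failure type, sink only needs a value handler.
    let cancellable = postsOrEmpty(failing).sink(receiveValue: { posts in
        counts.append(posts.count)
    })
    cancellable.cancel()
    return counts
}
```

The trade-off is that the subscriber can no longer tell an empty server response apart from a failed request, so this only fits cases where a fallback value is acceptable.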
Let’s look at some other operators.
- tryMap() transforms every element from the upstream publisher with a provided error-throwing closure.
- decode() decodes the upstream output into the specified type using a given decoder (e.g. JSONDecoder).
- mapError() converts any failure from the upstream publisher into a new error.
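To see these three operators in isolation, here is a small sketch that runs them over an in-memory payload rather than a network response. The PipelineError type and the decodePosts and decodedTitles functions are hypothetical names introduced for this illustration only.

```swift
import Combine
import Foundation

struct Post: Decodable {
    let userId: Int
    let id: Int
    let title: String
    let body: String
}

// Hypothetical error type used only for this illustration.
enum PipelineError: Error {
    case emptyPayload
    case wrapped(Error)
}

// Chains tryMap, decode and mapError over a payload that is already in memory.
func decodePosts(from payload: Data) -> AnyPublisher<[Post], PipelineError> {
    Just(payload)
        // tryMap: validate the element, throwing on an empty payload.
        .tryMap { data -> Data in
            guard !data.isEmpty else { throw PipelineError.emptyPayload }
            return data
        }
        // decode: turn the raw bytes into [Post].
        .decode(type: [Post].self, decoder: JSONDecoder())
        // mapError: funnel every failure into PipelineError.
        .mapError { error -> PipelineError in
            (error as? PipelineError) ?? .wrapped(error)
        }
        .eraseToAnyPublisher()
}

// Subscribes to the pipeline and collects the decoded titles.
// Just delivers synchronously, so the result is ready when sink returns.
func decodedTitles() -> [String] {
    let json = Data(
        #"[{"userId": 1, "id": 1, "title": "Hello", "body": "World"}]"#.utf8
    )
    var titles: [String] = []
    let cancellable = decodePosts(from: json).sink(
        receiveCompletion: { _ in },
        receiveValue: { posts in titles = posts.map { $0.title } }
    )
    cancellable.cancel()
    return titles
}
```

Because every failure ends up as a PipelineError, the subscriber deals with a single error type regardless of which step failed.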
let webService = Webservice()
let posts = webService.fetchPosts()
var cancellable = Set<AnyCancellable>()
The posts publisher needs to be tied to a subscriber to retrieve the events.
The sink method comes in handy as it creates the subscriber and immediately requests an unlimited number of values, prior to returning the subscriber.
posts.sink(receiveCompletion: { completion in
    print(completion)
}) { posts in
    print(posts.count)
}.store(in: &cancellable)
// prints 100
// prints finished
The Publisher emits a single array of a hundred posts and then terminates with a finished event. As mentioned in Apple’s documentation, a publisher continues to emit elements until it completes normally or fails.
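The two ways a publisher can terminate can be sketched with publishers that need no network access. DemoError and the events(of:) helper are hypothetical names used only here, and the helper assumes the publisher delivers all of its events synchronously.

```swift
import Combine

// Hypothetical error used only to trigger a failure in this illustration.
enum DemoError: Error {
    case boom
}

// Records every event a publisher delivers, in order.
// Only meaningful for publishers that emit synchronously on subscription.
func events<P: Publisher>(of publisher: P) -> [String] {
    var log: [String] = []
    let cancellable = publisher.sink(
        receiveCompletion: { completion in
            switch completion {
            case .finished:
                log.append("finished")
            case .failure(let error):
                log.append("failed: \(error)")
            }
        },
        receiveValue: { value in
            log.append("value: \(value)")
        }
    )
    cancellable.cancel()
    return log
}

// Completes normally after two values.
let normal = events(of: [1, 2].publisher)

// Fails immediately without emitting any value.
let failed = events(of: Fail<Int, DemoError>(error: .boom))
```

In both cases the completion is the last event: once a publisher has finished or failed, it delivers nothing further.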
Using the Sink class from the Subscribers namespace enables us to create and store the subscriber ourselves.
let subscriber = Subscribers.Sink<[Post], Swift.Error>(receiveCompletion: { completion in
    print(completion)
}) { posts in
    print(posts.count)
}
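The subscriber needs to be attached to its publisher using the subscribe method, which Apple’s documentation says to call instead of receive(subscriber:).
posts.subscribe(subscriber)
// prints 100
// prints finished
// Sink is Cancellable; wrap it in AnyCancellable so that it can be stored.
AnyCancellable(subscriber).store(in: &cancellable)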
The sink method returns an AnyCancellable, a type-erased object that automatically cancels the subscription when it is deinitialized.
Storing it in a Set with the store method ties the subscription’s lifetime to the owner of the set: the subscription stays alive while the set exists and is cancelled once the set is released.
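The effect of storing a cancellable can be sketched with a PassthroughSubject standing in for the network publisher. SubscriptionOwner and lifetimeDemo are hypothetical names introduced for this illustration.

```swift
import Combine

// Hypothetical owner that keeps its subscriptions in a Set<AnyCancellable>.
final class SubscriptionOwner {
    private var cancellables = Set<AnyCancellable>()

    init(
        subject: PassthroughSubject<Int, Never>,
        onValue: @escaping (Int) -> Void
    ) {
        subject
            .sink(receiveValue: onValue)
            .store(in: &cancellables)
    }
}

// Sends one value while the owner is alive and one after it has been released.
func lifetimeDemo() -> [Int] {
    let subject = PassthroughSubject<Int, Never>()
    var received: [Int] = []
    do {
        let owner = SubscriptionOwner(subject: subject) { value in
            received.append(value)
        }
        // Keep the owner alive while the first value is sent.
        withExtendedLifetime(owner) {
            subject.send(1)
        }
    }
    // The owner and its set are gone, so the subscription has been cancelled
    // and this value is not delivered.
    subject.send(2)
    return received
}
```

No explicit cancel call is needed: releasing the owner releases the set, and each AnyCancellable cancels its subscription as it is deinitialized.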
— RxSwift
RxSwift is designed around the ObservableType and the ObserverType protocols.
Just like in Combine, an Observable emits events over time and an Observer listens to them.
The framework, or more precisely its RxCocoa component, provides a way to use URLSession reactively through its rx extension.
URLSession.shared.rx.response(request: request)
The following fetch() method returns an Observable of the specified type, e.g. Array<Post>:
import Foundation
import RxCocoa
import RxSwift

final class Webservice {
    let disposeBag = DisposeBag()

    private enum Error: Swift.Error {
        case invalidResponse(URLResponse?)
        case invalidJSON(Swift.Error)
        // Used by the Observable.create variant of fetch().
        case invalidData
    }

    func fetch() -> Observable<[Post]> {
        let url = URL(
            string: "https://jsonplaceholder.typicode.com/posts"
        )!
        let request = URLRequest(url: url)
        return URLSession.shared.rx.response(request: request)
            // Fail unless the server answered with HTTP 200.
            .map { result -> Data in
                guard result.response.statusCode == 200 else {
                    throw Error.invalidResponse(result.response)
                }
                return result.data
            }
            // Decode the payload, wrapping any decoding failure.
            .map { data -> [Post] in
                do {
                    return try JSONDecoder().decode(
                        [Post].self, from: data
                    )
                } catch {
                    throw Error.invalidJSON(error)
                }
            }
            // Named observe(on:) from RxSwift 6 onwards.
            .observeOn(MainScheduler.instance)
            .asObservable()
    }
}
The RxSwift equivalent of Combine’s sink is the subscribe method, which subscribes an element handler, an error handler, a completion handler and a disposed handler to an observable sequence.
It is the way of notifying an observable sequence that a subscription has been made.
let webService = Webservice()
// Keeps the subscription alive until the bag is deallocated.
let disposeBag = DisposeBag()
webService.fetch().subscribe(onNext: { posts in
    print(posts.count)
}, onError: { error in
    print(error)
}).disposed(by: disposeBag)
The subscribe method returns a Disposable that enables the framework to dispose of (cancel) a subscription at deinit.
The DisposeBag class holds a reference to each subscription and disposes of them all when the bag itself is deallocated.
RxSwift provides another way to create and return an Observable: the static create function from the ObservableType protocol extension.
// Lives inside Webservice; assumes its Error enum also declares invalidData.
func fetch() -> Observable<[Post]> {
    return Observable.create { observer in
        let url = URL(
            string: "https://jsonplaceholder.typicode.com/posts"
        )!
        let session = URLSession.shared
        let dataTask = session.dataTask(with: url) {
            (data, response, error) in
            // Fail unless the server answered with HTTP 200.
            guard
                let httpResponse = response as? HTTPURLResponse,
                httpResponse.statusCode == 200 else {
                    observer.onError(
                        Error.invalidResponse(response)
                    )
                    return
            }
            guard let data = data else {
                observer.onError(Error.invalidData)
                return
            }
            do {
                let posts = try JSONDecoder().decode(
                    [Post].self, from: data
                )
                // Emit the single value, then complete the sequence.
                observer.onNext(posts)
                observer.onCompleted()
            } catch let error {
                observer.onError(error)
            }
        }
        dataTask.resume()
        // Cancel the request if the subscription is disposed early.
        return Disposables.create {
            dataTask.cancel()
        }
    }
}
Inside the block, the observer argument is used to emit events through onNext, onError and onCompleted.
A disposable is returned so that the data task can be cancelled and the subscription properly released from memory when it is disposed.
Although the RxSwift and Combine implementations using Publisher and Observable are well suited to processing a stream of values, both frameworks provide variations of these types that are particularly well adapted to networking requests.
Meet Future and Single.