Combine - share() and multicast()

Most of Combine’s publishers are structs, which are value types, so the system makes a copy whenever a publisher is stored in a property or passed to a function. This lets a publisher deliver values without side effects.

extension Publishers {
    /// A publisher that publishes a given sequence of elements.
    ///
    /// When the publisher exhausts the elements in the sequence, the next request causes the publisher to finish.
    public struct Sequence<Elements, Failure> : Publisher where Elements : Sequence, Failure : Error {
        {...}
    }
}

When several subscribers attach to such a publisher, each one works with its own copy and its own subscription, so the values are produced once per subscriber.

let numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    .publisher
    .print()
        
numbers
    .sink(receiveValue: { _ in })
    .store(in: &cancellables)
    
numbers
    .sink(receiveValue: { _ in })
    .store(in: &cancellables)

Let’s check the console output to see both distinct subscriptions.

// First subscription
receive subscription: ([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
request unlimited
receive value: (1)
{..}
receive value: (10)
receive finished

// Second subscription
receive subscription: ([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
request unlimited
receive value: (1)
{..}
receive value: (10)
receive finished

The weakness of this copying shows up with resource-intensive operations, e.g. network requests: the work is duplicated for every subscriber rather than shared, which may lead to poor performance.
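To make the duplication visible without a network call, here is a minimal sketch. The `workCount` counter and the `expensive` publisher are hypothetical names introduced for illustration; `Deferred` is used only because it runs its closure once per subscriber, which stands in for an expensive operation.

```swift
import Combine

var cancellables = Set<AnyCancellable>()

// Hypothetical counter standing in for an expensive operation (e.g. a request).
var workCount = 0

// Deferred builds a fresh publisher for every subscriber,
// so the closure below runs once per subscription.
let expensive = Deferred { () -> Just<Int> in
    workCount += 1
    return Just(42)
}

// Two independent subscribers, as in the numbers example above.
expensive
    .sink(receiveValue: { _ in })
    .store(in: &cancellables)

expensive
    .sink(receiveValue: { _ in })
    .store(in: &cancellables)

print("work performed \(workCount) time(s)")
```

With two subscribers the closure runs twice, which is exactly the cost we would like to avoid for a network request.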

Combine provides the share() operator, which lets us obtain a publisher by reference rather than by value.

share() returns a Publishers.Share<Self> type which is a class.

/// A publisher implemented as a class, which otherwise behaves like its upstream publisher.
final public class Share<Upstream> : Publisher, Equatable where Upstream : Publisher

When a network request is built with the share() operator, only one subscription is made to the upstream publisher, and that single resource is shared with all downstream subscribers.

let url = URL(string: "https://jsonplaceholder.typicode.com/posts")!
        
let posts = URLSession.shared.dataTaskPublisher(for: url)
    .map { $0.data }
    .decode(type: [Post].self, decoder: JSONDecoder())
    .replaceError(with: [])
    .print()
    .share()            
        
posts
    .sink(receiveValue: { 
        print("subscription1 value: \($0)") })
    .store(in: &cancellables)
    
posts
    .sink(receiveValue: { 
        print("subscription2 value: \($0)") })
    .store(in: &cancellables)

Let’s check the console output to see the unique subscription.

receive subscription: (ReplaceError)
request unlimited
receive value: {..}

subscription1 value: {..}
subscription2 value: {..}

receive finished

The process is straightforward:

  1. The first subscriber subscribes to the posts publisher, triggering the receive subscription event.
  2. Through the sink() method, an unlimited number of values is requested.
  3. The value starts flowing.
  4. The first subscriber receives the value from the resource.
  5. The second subscriber receives the value from the same resource.
  6. The publisher’s job is done and a completion event is sent.

Try removing the share() operator and you will notice the exact same events occurring twice (since it is copied).

The share() operator does not involve any buffering system which means that if the second subscription occurs after the request has completed, it will only receive the completion event.
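The missing buffer can be observed with a small sketch that replaces the network request with a PassthroughSubject. The names `upstream`, `shared` and `log` are assumptions made for this illustration and are not part of the example above.

```swift
import Combine

var cancellables = Set<AnyCancellable>()
var log: [String] = []

// Hypothetical stand-in for the network request.
let upstream = PassthroughSubject<Int, Never>()
let shared = upstream.share()

// Early subscriber: attached before any value is sent.
shared
    .sink(receiveCompletion: { _ in log.append("early: finished") },
          receiveValue: { log.append("early: \($0)") })
    .store(in: &cancellables)

// The "request" delivers its value and completes.
upstream.send(1)
upstream.send(completion: .finished)

// Late subscriber: attached after the upstream has completed.
shared
    .sink(receiveCompletion: { _ in log.append("late: finished") },
          receiveValue: { log.append("late: \($0)") })
    .store(in: &cancellables)

log.forEach { print($0) }
```

The early subscriber logs the value and the completion, while the late subscriber should only log its completion, since share() does not replay the value it missed.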

Unfortunately, Combine doesn’t provide a shareReplay() operator like its RxSwift counterpart. However, we can use the multicast() operator to work around this by delaying the upstream subscription until every subscriber is attached.

The multicast() operator returns a ConnectablePublisher, which provides a connect() method to start the upstream publisher once all of the subscribers are set up.

A subject must be provided to deliver elements to the multiple subscribers.

let url = URL(string: "https://jsonplaceholder.typicode.com/posts")!
let postsSubject = PassthroughSubject<[Post], Never>()
        
let posts = URLSession.shared.dataTaskPublisher(for: url)
            .map { $0.data }
            .decode(type: [Post].self, decoder: JSONDecoder())
            .replaceError(with: [])
            .print()
            .multicast(subject: postsSubject)
        
posts
    .sink(receiveValue: { 
        print("subscription1 value: \($0)") })
    .store(in: &cancellables)
        
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    posts
        .sink(receiveValue: { print("subscription2 value: \($0)") })
        .store(in: &self.cancellables)
}

At this point, nothing happens because connect() has not been called yet, so the upstream publisher never starts.

Let’s fix it by adding the following code right after the second subscription in the DispatchQueue block.

posts
    .connect()
    .store(in: &self.cancellables)

Let’s check the console output to see both subscriptions receiving the requested value.

receive subscription: (ReplaceError)
request unlimited
receive value: {..}

subscription1 value: {..}
subscription2 value: {..}

receive finished

The multicast() operator is useful when sharing a single resource with multiple subscribers while keeping the actual network request implementation private.
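The connect-gated behaviour can also be reproduced without the network. The following is a minimal sketch that uses an array publisher as a stand-in for the request; `multicasted`, `received` and `countBeforeConnect` are hypothetical names chosen for this illustration.

```swift
import Combine

var cancellables = Set<AnyCancellable>()
var received: [String] = []

// The subject relays upstream values to every subscriber.
let subject = PassthroughSubject<Int, Never>()

// Hypothetical stand-in for the request: a simple sequence publisher.
let multicasted = [1, 2, 3].publisher.multicast(subject: subject)

multicasted
    .sink(receiveValue: { received.append("first: \($0)") })
    .store(in: &cancellables)

multicasted
    .sink(receiveValue: { received.append("second: \($0)") })
    .store(in: &cancellables)

// Nothing has been delivered yet: the upstream starts only on connect().
let countBeforeConnect = received.count

multicasted
    .connect()
    .store(in: &cancellables)

print("before connect: \(countBeforeConnect), after connect: \(received.count)")
```

Both subscribers are attached before connect() is called, so each of them receives every element from the single upstream subscription.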

A nice alternative is the makeConnectable() method, which wraps a publisher in a ConnectablePublisher without requiring a subject. Note that makeConnectable() is only available when the publisher’s Failure type is Never, which is the case here thanks to replaceError(with:).

Remove the postsSubject property and replace the multicast(subject:) operator with makeConnectable().

It works just as well as the previous implementation.
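As a rough sketch of the same idea without the network, the snippet below applies makeConnectable() to an array publisher whose Failure type is Never. The names `connectable` and `values` are assumptions made for this illustration.

```swift
import Combine

var cancellables = Set<AnyCancellable>()
var values: [String] = []

// Hypothetical stand-in for the request; its Failure type is Never,
// which makeConnectable() requires.
let connectable = [1, 2, 3].publisher.makeConnectable()

connectable
    .sink(receiveValue: { values.append("first: \($0)") })
    .store(in: &cancellables)

connectable
    .sink(receiveValue: { values.append("second: \($0)") })
    .store(in: &cancellables)

// No subject to manage: connect() alone starts the upstream publisher.
connectable
    .connect()
    .store(in: &cancellables)

print("received \(values.count) values")
```

Compared with multicast(subject:), there is one less object to create and keep around, at the cost of not being able to choose the kind of subject that relays the values.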

Conclusion

Combine lets us manage resources efficiently through operators such as share() and multicast(), along with connectable publishers, giving us the means to improve app performance when necessary.