Articles, podcasts and news about Swift development, by John Sundell.

Async sequences, streams, and Combine

Published on 26 Aug 2021

When iterating over any Swift collection using a standard for loop, two key components decide which elements will be passed into our iteration code: a sequence and an iterator. For example, Swift’s standard Array type is a sequence, and uses IndexingIterator as its iterator type.

While we very often interact directly with sequences when writing our Swift code, we rarely have to deal with iterators ourselves, as the language will automatically manage those instances for us whenever we’re using a for loop.
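To make that division of labor a bit more concrete, here’s a rough sketch of what a for loop over an array does on our behalf. The exact code that the compiler generates is an implementation detail, so treat this as an approximation rather than a precise description:

```swift
// Sketch: roughly what `for number in numbers { ... }` does for us.
let numbers = [1, 2, 3]

// Array's iterator type is IndexingIterator<[Int]>.
var iterator: IndexingIterator<[Int]> = numbers.makeIterator()
var visited: [Int] = []

// Keep asking the iterator for its next element until it returns nil.
while let number = iterator.next() {
    visited.append(number)
}

print(visited) // Prints "[1, 2, 3]"
```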

The fact that iterations are explicitly controlled by dedicated types is really powerful, though, since that enables us to write our own, completely custom sequences that can be iterated over the exact same way as when looping through built-in types like Array, Dictionary, and Set.

In this article, let’s take a look at how this system works, and how it extends into the world of Swift concurrency — enabling us to create completely asynchronous sequences and streams that deliver their values over time.

Sequences and iterators

Let’s say that we wanted to build a custom sequence that lets us download data from a series of URLs. To make that happen, we’ll first need to implement a custom type that conforms to the Sequence protocol, which requires us to create an iterator within a makeIterator method, like this:

import Foundation

struct RemoteDataSequence: Sequence {
    var urls: [URL]

    func makeIterator() -> RemoteDataIterator {
        RemoteDataIterator(urls: urls)
    }
}

Next, let’s implement the above RemoteDataIterator type, which keeps track of its progress by incrementing an index property every time the system requests a new element:

struct RemoteDataIterator: IteratorProtocol {
    var urls: [URL]
    fileprivate var index = 0

    mutating func next() -> Data? {
        guard index < urls.count else {
            return nil
        }

        let url = urls[index]
        index += 1

        // If a download fails, we simply move on to
        // the next URL in this case:
        guard let data = try? Data(contentsOf: url) else {
            return next()
        }

        return data
    }
}

We don’t need to worry about managing multiple concurrent iterations, since the system automatically creates a new iterator (using our sequence’s makeIterator method) every time a for loop is started.
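To see that in action without any networking involved, here’s a small sketch using a hypothetical Countdown sequence (the Countdown and CountdownIterator names are just for illustration). Looping over the same instance twice produces the same elements both times, since each for loop starts out with a brand new iterator:

```swift
// Hypothetical, network-free sequence that counts down from `start` to 1.
struct Countdown: Sequence {
    var start: Int

    func makeIterator() -> CountdownIterator {
        CountdownIterator(current: start)
    }
}

struct CountdownIterator: IteratorProtocol {
    var current: Int

    mutating func next() -> Int? {
        guard current > 0 else {
            return nil
        }

        // Return the current value, then decrement it for the next call.
        defer { current -= 1 }
        return current
    }
}

let countdown = Countdown(start: 3)
var firstPass: [Int] = []
var secondPass: [Int] = []

// Each loop calls makeIterator() again, so both start over from 3.
for value in countdown {
    firstPass.append(value)
}

for value in countdown {
    secondPass.append(value)
}
```

As a bonus, conforming to Sequence also gives our custom type access to the standard library’s sequence operations, such as map, filter, and reduce, without any extra work.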

With the above in place, we’ll now be able to iterate over all of our downloaded data using the exact same syntax as we’d use when iterating over an array:

for data in RemoteDataSequence(urls: urls) {
    ...
}

Really cool! However, downloading data synchronously like that probably isn’t a very good idea (except when writing things like scripts or command line tools), as doing so will completely block the current thread until all downloads have completed. So let’s explore how we could turn the above into an asynchronous sequence instead.

Asynchronous iterations

Swift 5.5’s new concurrency system introduces the concept of asynchronous sequences and iterators, which are defined in almost the exact same way as their synchronous counterparts. So, to make our RemoteDataSequence asynchronous, all that we have to do is to make it conform to the AsyncSequence protocol and implement the makeAsyncIterator method — like this:

struct RemoteDataSequence: AsyncSequence {
    typealias Element = Data

    var urls: [URL]

    func makeAsyncIterator() -> RemoteDataIterator {
        RemoteDataIterator(urls: urls)
    }
}

Note that the above typealias shouldn’t really be needed, since the compiler should be able to infer our sequence’s Element type, but that doesn’t seem to be the case as of Xcode 13.0.

Next, let’s give our RemoteDataIterator an async makeover as well. That involves adding the async and throws keywords to its next method (since we want our iteration to be able to throw an error whenever a download fails), after which we can use the built-in URLSession networking API to download our data completely asynchronously:

struct RemoteDataIterator: AsyncIteratorProtocol {
    var urls: [URL]
    fileprivate var urlSession = URLSession.shared
    fileprivate var index = 0

    mutating func next() async throws -> Data? {
        guard index < urls.count else {
            return nil
        }

        let url = urls[index]
        index += 1

        let (data, _) = try await urlSession.data(from: url)
        return data
    }
}

To learn more about the new, async/await-powered URLSession APIs, check out this article over on WWDC by Sundell & Friends.

With the above changes in place, our RemoteDataSequence has now been turned into a fully asynchronous sequence, which requires us to use await (and, in this case, try) when iterating over its elements — since our data will now be downloaded in the background, and will be delivered into our for loop when ready:

for try await data in RemoteDataSequence(urls: urls) {
    ...
}
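Just like its synchronous counterpart, a for try await loop drives the sequence’s iterator for us. Here’s a rough, network-free sketch of that process, using a hypothetical AsyncNumbers sequence (which calls Task.yield() as a stand-in for real asynchronous work, such as a download), along with a hypothetical manuallyIterate function that performs roughly the same steps as the loop does:

```swift
// Hypothetical, network-free async sequence over an array of numbers.
struct AsyncNumbers: AsyncSequence {
    typealias Element = Int

    var numbers: [Int]

    func makeAsyncIterator() -> NumberIterator {
        NumberIterator(numbers: numbers)
    }

    struct NumberIterator: AsyncIteratorProtocol {
        var numbers: [Int]
        var index = 0

        mutating func next() async throws -> Int? {
            guard index < numbers.count else {
                return nil
            }

            let number = numbers[index]
            index += 1

            // Stand-in for real async work: suspend briefly so that
            // other tasks get a chance to run.
            await Task.yield()
            return number
        }
    }
}

// Roughly what `for try await number in sequence { ... }` does for us.
func manuallyIterate(_ sequence: AsyncNumbers) async throws -> [Int] {
    var iterator = sequence.makeAsyncIterator()
    var result: [Int] = []

    while let number = try await iterator.next() {
        result.append(number)
    }

    return result
}
```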

The fact that async iterators can throw errors is really powerful, as that lets us automatically exit out of a for loop as soon as an error is thrown, rather than requiring us to keep track of such errors manually. Of course, that doesn’t mean that all async sequences need to be capable of throwing. If we simply omit the throws keyword when declaring our iterator’s next method, then our sequence will be considered non-throwing (and we no longer need to use try when iterating over its elements).
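Here’s a sketch that illustrates both of those points without any networking, using two hypothetical sequences: FailingNumbers, whose iterator throws an (equally hypothetical) OutOfNumbersError once it runs past its limit, and SafeNumbers, whose next method isn’t marked as throws and can therefore be iterated using a plain for await loop:

```swift
// Hypothetical error that FailingNumbers throws once it passes its limit.
struct OutOfNumbersError: Error {}

// Hypothetical throwing sequence: yields 1, 2, ... up to `limit`, then throws.
struct FailingNumbers: AsyncSequence {
    typealias Element = Int

    var limit: Int

    func makeAsyncIterator() -> FailingIterator {
        FailingIterator(limit: limit)
    }

    struct FailingIterator: AsyncIteratorProtocol {
        var limit: Int
        var current = 0

        mutating func next() async throws -> Int? {
            current += 1
            guard current <= limit else {
                throw OutOfNumbersError()
            }
            return current
        }
    }
}

// Hypothetical non-throwing sequence: next() isn't marked as throws,
// so it ends by returning nil instead.
struct SafeNumbers: AsyncSequence {
    typealias Element = Int

    var limit: Int

    func makeAsyncIterator() -> SafeIterator {
        SafeIterator(limit: limit)
    }

    struct SafeIterator: AsyncIteratorProtocol {
        var limit: Int
        var current = 0

        mutating func next() async -> Int? {
            current += 1
            return current <= limit ? current : nil
        }
    }
}

func collectUntilFailure(limit: Int) async -> (values: [Int], failed: Bool) {
    var values: [Int] = []

    do {
        for try await value in FailingNumbers(limit: limit) {
            values.append(value)
        }
        return (values, false)
    } catch {
        // The thrown error ended the loop early; `values` contains
        // whatever was delivered before it.
        return (values, true)
    }
}

func collectSafely(limit: Int) async -> [Int] {
    var values: [Int] = []

    // No `try` required, since SafeNumbers can't throw.
    for await value in SafeNumbers(limit: limit) {
        values.append(value)
    }

    return values
}
```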

Asynchronous streams

While being able to define completely custom, asynchronous sequences is incredibly powerful, the standard library also ships with two stand-alone types that enable us to create such sequences without having to define any types of our own. Those types are AsyncStream and AsyncThrowingStream, with the former letting us create non-throwing async sequences, while the latter gives us the option to throw errors.
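As a quick, network-free example, here’s how a hypothetical wordStream function could use AsyncStream to turn an array of strings into a non-throwing async sequence. Since the stream’s default buffering policy is unbounded, any elements that we yield before iteration starts are buffered until they’re requested:

```swift
// Hypothetical helper that turns an array of words into a non-throwing stream.
func wordStream(_ words: [String]) -> AsyncStream<String> {
    AsyncStream { continuation in
        for word in words {
            continuation.yield(word)
        }

        // Tells any iterating code that no more elements are coming.
        continuation.finish()
    }
}

func collectWords(_ words: [String]) async -> [String] {
    var result: [String] = []

    // No `try` needed, since an AsyncStream can't throw.
    for await word in wordStream(words) {
        result.append(word)
    }

    return result
}
```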

Going back to the example of downloading data from a series of URLs, let’s take a look at how we could implement that same functionality using an AsyncThrowingStream, rather than declaring custom types. Doing so involves kicking off an async Task that yields each piece of downloaded data to the stream’s continuation, and then finishes the stream, either normally or by reporting any error that was encountered, like this:

func remoteDataStream(
    forURLs urls: [URL],
    urlSession: URLSession = .shared
) -> AsyncThrowingStream<Data, Error> {
    AsyncThrowingStream { continuation in
        Task {
            do {
                for url in urls {
                    let (data, _) = try await urlSession.data(from: url)
                    continuation.yield(data)
                }

                continuation.finish()
            } catch {
                continuation.finish(throwing: error)
            }
        }
    }
}
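One thing worth keeping in mind with the above implementation is that the Task it creates isn’t automatically cancelled if the code iterating over the stream gets cancelled. One way to handle that is to assign a closure to the continuation’s onTermination property, as in the following network-free sketch. Here, the hypothetical itemStream function takes a load closure that stands in for the actual downloads, and the LoadingError type is likewise just for illustration:

```swift
// Hypothetical error used to simulate a failed load.
struct LoadingError: Error {}

// Hypothetical, network-free stand-in for remoteDataStream(forURLs:),
// where `load` simulates downloading the item with the given ID.
func itemStream(
    for ids: [Int],
    load: @escaping @Sendable (Int) async throws -> String
) -> AsyncThrowingStream<String, Error> {
    AsyncThrowingStream { continuation in
        let task = Task {
            do {
                for id in ids {
                    // Bail out early if this task has been cancelled.
                    try Task.checkCancellation()
                    let item = try await load(id)
                    continuation.yield(item)
                }

                continuation.finish()
            } catch {
                continuation.finish(throwing: error)
            }
        }

        // If the stream terminates early (for example because the task
        // iterating over it gets cancelled), cancel the producing task too.
        continuation.onTermination = { _ in
            task.cancel()
        }
    }
}

// Collects every element of the given stream into an array.
func collectAll(_ stream: AsyncThrowingStream<String, Error>) async throws -> [String] {
    var items: [String] = []

    for try await item in stream {
        items.append(item)
    }

    return items
}
```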

While the above is a perfectly fine implementation, it can actually be simplified quite a bit using another AsyncThrowingStream initializer, init(unfolding:), which gives us a closure that’s already marked as async, within which we can focus on returning the next element in our stream (or nil once the stream should end):

func remoteDataStream(
    forURLs urls: [URL],
    urlSession: URLSession = .shared
) -> AsyncThrowingStream<Data, Error> {
    var index = 0

    return AsyncThrowingStream {
        guard index < urls.count else {
            return nil
        }

        let url = urls[index]
        index += 1

        let (data, _) = try await urlSession.data(from: url)
        return data
    }
}

Above we’re capturing the local index variable within our stream’s closure, enabling us to use it to keep track of the state of our iteration. To learn more about that technique, check out “Swift’s closure capturing mechanics”.
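For comparison, here’s a network-free sketch that uses the non-throwing AsyncStream type’s init(unfolding:) initializer to produce a series of square numbers. Instead of capturing a local var, it keeps its state within a hypothetical IndexCounter actor, since (depending on the Swift version and concurrency checking settings) mutating a captured variable from within such an async closure may be flagged as a potential data race:

```swift
// Hypothetical actor that hands out increasing indexes below a limit.
actor IndexCounter {
    private var index = 0

    func next(below limit: Int) -> Int? {
        guard index < limit else {
            return nil
        }

        // Return the current index, then advance it for the next call.
        defer { index += 1 }
        return index
    }
}

func squaresStream(count: Int) -> AsyncStream<Int> {
    let counter = IndexCounter()

    // init(unfolding:) calls this closure each time a new element is
    // requested; returning nil finishes the stream.
    return AsyncStream(unfolding: {
        guard let index = await counter.next(below: count) else {
            return nil
        }
        return index * index
    })
}

func collectSquares(count: Int) async -> [Int] {
    var result: [Int] = []

    for await square in squaresStream(count: count) {
        result.append(square)
    }

    return result
}
```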

With either of the above two implementations in place, we can now iterate over our new async stream just like how we previously looped over our custom AsyncSequence:

for try await data in remoteDataStream(forURLs: urls) {
    ...
}

So AsyncStream and AsyncThrowingStream can be seen as concrete implementations of the AsyncSequence protocol, just as Array is a concrete implementation of the synchronous Sequence protocol. In most situations, using a stream will probably be the most straightforward option, but if we want to gain complete control over a given iteration, then building a custom AsyncSequence will probably be the way to go.

How does all of this relate to Combine?

Now, if you’ve worked with Apple’s Combine framework, then you might be asking yourself how this new suite of async sequence APIs relates to that framework, given that they both enable us to emit and observe values over time.

While I already discussed this to some extent in my WWDC article “What Swift’s new concurrency features might mean for the future of Combine”, the way I look at it is that Combine is a fully-featured reactive programming framework, while this new async sequence system offers lower-level APIs for constructing any kind of async sequence, either with or without embracing a reactive programming style.

The good news, though, is that Combine has now been made AsyncSequence-compatible, which enables us to turn any Publisher into an async sequence of values. For example, here’s a Combine-powered version of our data downloading functionality from before:

import Combine
import Foundation

func remoteDataPublisher(
    forURLs urls: [URL],
    urlSession: URLSession = .shared
) -> AnyPublisher<Data, URLError> {
    urls.publisher
        .setFailureType(to: URLError.self)
        .flatMap(maxPublishers: .max(1)) {
            urlSession.dataTaskPublisher(for: $0)
        }
        .map(\.data)
        .eraseToAnyPublisher()
}

To then convert the AnyPublisher that the above function returns into an AsyncSequence, all that we have to do is to access its values property — and the system will take care of the rest:

let publisher = remoteDataPublisher(forURLs: urls)

for try await data in publisher.values {
    ...
}

Very neat! The above API should prove to be incredibly useful within code bases that have existing Combine code, since it lets us keep using that code (without modifications!) even when adopting Swift’s new concurrency system.

To be able to go the other way around, and use async-marked APIs within Combine-based code, check out “Calling async functions within a Combine pipeline”.

Conclusion

I hope that this article has given you a few new insights into how Swift’s new AsyncSequence and AsyncStream APIs work, how they can be used to implement various kinds of asynchronous sequences, and how those new APIs relate to Combine.

If you have any questions, comments, or feedback, then feel free to reach out via email.

Thanks for reading!