Articles, podcasts and news about Swift development, by John Sundell.

Calling async functions within a Combine pipeline

Published on 25 Jun 2021
Discover page available: Combine

When starting to adopt Swift’s new async/await feature, we’re likely going to have to find ways to connect and bridge that pattern to any existing asynchronous code that we’ve already written within a given project.

Earlier this week, we took a look at how that can be done when it comes to completion handler-based APIs, and how single-output Combine publishers can be made async/await-compatible. In this article, let’s continue that journey by exploring how we can make it possible to call async-marked functions within a Combine pipeline.

Future-proofing

Let’s start by extending Combine’s Publisher protocol with a custom operator called asyncMap. Internally, we’ll then perform a flatMap using Combine’s built-in Future type, and we’ll then use a Task to trigger our asynchronous transform — like this:

extension Publisher {
    func asyncMap<T>(
        _ transform: @escaping (Output) async -> T
    ) -> Publishers.FlatMap<Future<T, Never>, Self> {
        flatMap { value in
            Future { promise in
                Task {
                    let output = await transform(value)
                    promise(.success(output))
                }
            }
        }
    }
}

It could be argued that the above operator should instead be an overload of the standard flatMap operator (since that’s the type operation that we’re actually performing under the hood). Naming is hard, but in this case, I think using a new name makes things more clear (given that we’re not actually returning a new publisher within our transformation closure), and also mimics the design of other map variants, such as tryMap.

With the above in place, we’ll now be able to freely call any non-throwing async function within a Combine pipeline. For example, here we’re mixing a Combine-powered call to a ColorProfile API with a series of async function calls:

struct PhotoRenderer {
    var colorProfile: ColorProfile
    var effect: PhotoEffect

    func render(_ photo: Photo) -> AnyPublisher<UIImage, Never> {
        colorProfile
            .apply(to: photo)
            .asyncMap { photo in
                let photo = await effect.apply(to: photo)
return await uiImage(from: photo)
            }
            .eraseToAnyPublisher()
    }

    private func uiImage(from photo: Photo) async -> UIImage {
        ...
    }
}

What’s really nice about the above new capability is that it enables us to more seamlessly migrate parts of a code base to async/await, while still keeping all of our existing Combine-based APIs completely intact.

Throwing asynchronous functions

Next, let’s also enable throwing async functions to be called within our Combine pipelines. That requires a second asyncMap overload that accepts a closure that throws, and that automatically forwards any error that was thrown to our wrapping Future:

extension Publisher {
    func asyncMap<T>(
        _ transform: @escaping (Output) async throws -> T
    ) -> Publishers.FlatMap<Future<T, Error>, Self> {
        flatMap { value in
            Future { promise in
                Task {
                    do {
    let output = try await transform(value)
    promise(.success(output))
} catch {
    promise(.failure(error))
}
                }
            }
        }
    }
}

Using the above new overload, we’ll now be able to easily insert any throwing async function calls into our Combine pipelines as well. For example, the following LoginStateController uses a Combine-powered Keychain API, which is then connected to an async/await-powered UserLoader to produce our final pipeline:

class LoginStateController {
    private let keychain: Keychain
    private let userLoader: UserLoader
    ...

    func loadLoggedInUser() -> AnyPublisher<User, Error> {
        keychain
            .loadLoggedInUserID()
            .asyncMap { [userLoader] userID in
                try await userLoader.loadUser(withID: userID)
            }
            .eraseToAnyPublisher()
    }
}

Note how we’re explicitly capturing our userLoader within the above asyncMap closure, to avoid having to capture a self reference. To learn more about that technique, check out “Swift’s closure capturing mechanics”.

Although we’ve now covered the vast majority of use cases when it comes to Combine and async/await interoperability, there’s one more type of situation that might be good to handle — and that’s to enable a throwing async function to be called within a non-throwing Combine pipeline.

Thankfully, this is something that the built-in flatMap operator is already handling (through a dedicated overload), so all that we have to do is to create a third overload of asyncMap that has an output type that matches that specific version of flatMap:

extension Publisher {
    func asyncMap<T>(
        _ transform: @escaping (Output) async throws -> T
    ) -> Publishers.FlatMap<Future<T, Error>,
                            Publishers.SetFailureType<Self, Error>> {
        flatMap { value in
            Future { promise in
                Task {
                    do {
                        let output = try await transform(value)
                        promise(.success(output))
                    } catch {
                        promise(.failure(error))
                    }
                }
            }
        }
    }
}

With the above in place, we’ll now be able to extend any non-throwing Combine pipeline with a throwing asyncMap call. For example, here we’re connecting the non-throwing PhotoRenderer API that we took a look at earlier to a throwing call to one of the new, async/await-powered URLSession APIs for performing uploads:

struct PhotoUploader {
    var renderer: PhotoRenderer
    var urlSession = URLSession.shared

    func upload(_ photo: Photo,
                to url: URL) -> AnyPublisher<URLResponse, Error> {
        renderer
            .render(photo)
            .asyncMap { image in
                guard let data = image.pngData() else {
                    throw PhotoUploadingError.invalidImage(image)
                }

                var request = URLRequest(url: url)
                request.httpMethod = "POST"

                let (_, response) = try await urlSession.upload(
    for: request,
    from: data
)

                return response
            }
            .eraseToAnyPublisher()
    }
}

With those three asyncMap overloads in place, we’ll now be able to insert any kind of async-marked call into any Combine pipeline. Pretty great!

Conclusion

Although I still think that Combine will remain an incredibly useful tool when it comes to reactive programming, many asynchronous APIs will likely move to async/await over time. Thankfully, that doesn’t mean that we have to rewrite any existing Combine-based code that our projects already use, since it’s definitely possible to bridge the gap between those two worlds.

I’ll continue exploring how async/await, actors, and the rest of Swift’s new concurrency system can be integrated with Combine in various ways, and will keep sharing my findings with you. I hope you enjoyed this article, and if you have any questions, comments or feedback, then feel free to reach out via either Twitter or email.

Thanks for reading!