MANUEL VIVO: Suspend functions in Coroutines are meant for one-shot calls that could return a result. But how could you create a stream of data or return multiple results over time? With Flow. Yes, you nailed it. This video builds on "The ABC of Coroutines" episode. If you're a beginner, I would recommend watching that one first, as today we are going with the Flow. We'll cover the differences between suspend functions and Flow, the different entities involved in streams of data, and all the async possibilities that unfold when you use this powerful API. If you learn something new, like the video and subscribe to the channel, but only if you think we've earned it. [MUSIC PLAYING]
In Coroutines, a Flow is a type that can emit multiple values sequentially, as opposed to suspend functions that return only a single value. For example, you can use a Flow to receive live updates from a database. Here, we have the suspend function loadData that returns an object of type Data. In the UI layer, we can call that function from a Coroutine, for example, one created using a UI scope. When loadData returns, we update the UI with its result. This is a one-shot call. You call a suspend function, you get the result back. That's it.
But a Flow is different. Flow is built on top of Coroutines. And since a Flow can emit multiple values sequentially, it is conceptually a stream of data whose values can be computed asynchronously. Here, we have a function that returns a Flow of type Data, meaning that this Flow is able to emit Data objects. Notice that this function is no longer a suspend function. To trigger the Flow, we call collect, which is a suspend function and therefore needs to be called from a Coroutine. In the lambda, we specify what to do when we receive an element from the Flow. When a new element is emitted to the Flow, updateUi will be called with the new value. New values will be processed until there are no more elements to emit to the Flow, or the UI goes away.
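The contrast between the one-shot call and the stream can be sketched roughly as follows. This is a minimal illustration, not the code shown on screen: the `Data` class, the `dataStream` name, and the use of `println` in place of `updateUi` are assumptions, and it assumes kotlinx-coroutines-core 1.6 or newer so that `collect` accepts a lambda without an extra import.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical payload type standing in for the Data class mentioned in the talk.
data class Data(val value: Int)

// One-shot: a suspend function returns a single result.
suspend fun loadData(): Data = Data(1)

// Stream: a regular (non-suspend) function returns a Flow that can emit many values.
// dataStream is an assumed name used only for this sketch.
fun dataStream(): Flow<Data> = flow {
    for (i in 1..3) emit(Data(i))
}

fun main(): Unit = runBlocking {
    // One call, one result.
    println(loadData())

    // collect is a suspend function, so it has to run inside a coroutine.
    // The lambda runs once per emitted value; println stands in for updateUi.
    dataStream().collect { data -> println(data) }
}
```

Nothing is emitted until `collect` is called, which is why `dataStream` itself does not need to be a suspend function.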
What we've seen is a stream of data. There are three entities involved here. The producer produces data that is added to the stream. Thanks to Coroutines, Flows can also produce data asynchronously. The consumer consumes the values from the stream. In the previous example, updateUi was the consumer. But there can also be intermediaries that can modify each value emitted into the stream or the stream itself. For example, this intermediary changes the type of the emitted elements. The consumer, in this case, will receive a different type. In Android, a data source or repository is typically a producer of UI data that has the view model, or view, as the consumer. Other times, the view layer is a producer of user input events and other layers of the hierarchy consume them. Layers in between the producer and consumer usually act as intermediaries that modify the stream of data to adjust it to the requirements of the following layer.
Now let's start with the producer and see how to create a Flow. The flow builder function creates a new Flow where you can manually emit new values into the stream of data using the emit function. In the following example, we'll see an app that fetches the latest news periodically. Here, NewsRemoteDataSource has the property latestNews that returns a Flow of Lists of ArticleHeadline. As a single suspend function cannot return multiple consecutive values, the data source needs to create and return a Flow in order to fulfill the requirement of notifying consumers of the latest news every so often. In this case, the data source acts as the producer. NewsRemoteDataSource takes NewsApi as a dependency, which is the class that ultimately makes the network request, exposing a suspend function. Cool. So what's the implementation of the latestNews property? We'll use the flow builder function to create a new Flow. As the block of code will be executed in the context of a Coroutine, it can call suspend functions.
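To make the three roles concrete, here is a small sketch of a producer, an intermediary that changes the element type, and a consumer. The function names and the temperature values are invented for illustration and are not part of the talk's news example.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// Producer (hypothetical): emits raw Int values into the stream.
fun temperatureReadings(): Flow<Int> = flow {
    emit(18)
    emit(21)
    emit(25)
}

// Intermediary (hypothetical): changes the type of each element from Int to String.
fun temperatureLabels(): Flow<String> =
    temperatureReadings().map { celsius -> "$celsius C" }

fun main(): Unit = runBlocking {
    // Consumer: receives the transformed type, never the raw Int.
    temperatureLabels().collect { label -> println(label) }
}
```

The consumer only depends on `Flow<String>`, so the producer's raw type can change without touching the consuming layer, as long as the intermediary keeps adapting it.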
To repeatedly make network requests, we create a while loop. Inside it, we call the API to get the latest news. And with its result, we call emit to add that object to the Flow. Flows are sequential. And as the producer is in a Coroutine, when calling a suspend function, the producer suspends until the suspend function returns. In this case, the Coroutine suspends until fetchLatestNews returns the response. Only then is emit called. To make requests on a fixed interval, we can call delay with a refresh interval passed as a parameter. The delay function is a suspend function that suspends the Coroutine for some time. After that time, the Coroutine will resume, and another iteration of the while loop will happen. Something to keep in mind is that with the flow builder, the producer cannot emit values from a different Coroutine context. Therefore, don't call emit in new Coroutines or in withContext blocks of code. You can use other Flow builders, such as callbackFlow, which we'll cover later, for these cases.
Intermediaries can use intermediate operators to modify the stream of data without consuming the values. These operators are functions that, when applied to a stream of data, set up a chain of operations that aren't executed until the values are consumed in the future. Continuing our example, we have this NewsRepository. In its constructor, it takes the NewsRemoteDataSource class that we've seen before, as well as UserData to know more information about the logged-in user. It exposes this favouriteLatestNews property of type Flow of a List of ArticleHeadline. With this, we want to expose just the news articles that are relevant to the user. This property accesses the latestNews property from newsRemoteDataSource. And then it applies the map operator to modify the stream of data. We use filter on the list that has been emitted to choose those articles whose topic the user is interested in. The transformation happens on the Flow.
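The producer described at the start of this passage, with its while loop, emit call, and delay, might look roughly like the sketch below. The names `NewsRemoteDataSource`, `NewsApi`, `fetchLatestNews`, `latestNews`, and `ArticleHeadline` come from the talk; the fields of `ArticleHeadline`, the `refreshIntervalMs` parameter name, and the `FakeNewsApi` class are assumptions added so the example runs without a network.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking

// The fields of ArticleHeadline are assumed for this sketch.
data class ArticleHeadline(val title: String, val topic: String)

interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000 // assumed parameter name and default
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while (true) {
            // Suspends until the request returns; only then is emit called.
            val latestNews = newsApi.fetchLatestNews()
            // emit must stay in this coroutine context: no withContext or launch around it.
            emit(latestNews)
            // Suspends the coroutine until the next refresh.
            delay(refreshIntervalMs)
        }
    }
}

// Hypothetical in-memory fake so the sketch needs no real network call.
class FakeNewsApi : NewsApi {
    var calls = 0
        private set

    override suspend fun fetchLatestNews(): List<ArticleHeadline> {
        calls++
        return listOf(ArticleHeadline("Headline #$calls", "kotlin"))
    }
}

fun main(): Unit = runBlocking {
    val dataSource = NewsRemoteDataSource(FakeNewsApi(), refreshIntervalMs = 100)
    // take(3) ends collection after three emissions; without it the loop never finishes.
    dataSource.latestNews.take(3).collect { news -> println(news) }
}
```

Because the loop never ends on its own, something on the consuming side has to stop the collection, here `take(3)`, and in an app typically the cancellation of the collecting coroutine.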
And the consumer will see the filtered list instead of the original one. Also, we use onEach as a side effect to save the user's favorite news in the cache. Intermediate operators can be applied one after the other, forming a chain of operations that are executed lazily when an item is emitted into the Flow. Note that simply applying an intermediate operator to a stream does not start the Flow collection. To trigger the Flow and start listening for values, use a terminal operator. With collect, you get all values at the time they are emitted into the stream.
Now, in our view model, in this case the LatestNewsViewModel, we want to consume that Flow to get notified of the news and update the UI accordingly. In there, we can call collect, which triggers the Flow and starts listening for values. The lambda will be executed on every new value received. But, as we said, collect is a suspend function. Therefore, it needs to be executed within a Coroutine that we can create with the built-in viewModelScope. So what's really happening here? When the ViewModel is created, we create a new Coroutine to collect the results from favouriteLatestNews. This triggers the Flow in the data source layer, which will start fetching the latest news from the network. All emissions are modified by the map operator in the repository layer to keep only the user's favorite topics. After that, the repository will save that info in the cache. And the view model will get the latest filtered information. As the producer always remains active with the while loop, the stream of data will be closed when the ViewModel is cleared and viewModelScope is canceled.
There are two ways Flow collection can stop. One way is when the producer finishes emitting items. The stream of data is closed. And the Coroutine that called collect will resume executing. Alternatively, the Coroutine that collects is canceled, as in our example. This will also stop the underlying producer.
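The repository's operator chain and the cancel-to-stop behavior can be sketched as below. This is a simplified stand-in, not the talk's code: the repository takes a plain `Flow` instead of `NewsRemoteDataSource`, `UserData.isFavoriteTopic`, `cachedNews`, and `endlessNews` are hypothetical, and a coroutine launched inside `runBlocking` plays the role of `viewModelScope.launch` since no Android classes are available here.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

data class ArticleHeadline(val title: String, val topic: String)

// Hypothetical shape for UserData; the talk does not show its members.
class UserData(private val favoriteTopics: Set<String>) {
    fun isFavoriteTopic(article: ArticleHeadline): Boolean = article.topic in favoriteTopics
}

class NewsRepository(
    latestNews: Flow<List<ArticleHeadline>>, // stands in for newsRemoteDataSource.latestNews
    private val userData: UserData
) {
    // Assumed in-memory cache, used only to show the onEach side effect.
    var cachedNews: List<ArticleHeadline> = emptyList()
        private set

    val favouriteLatestNews: Flow<List<ArticleHeadline>> =
        latestNews
            // Intermediate operator: keep only the articles the user cares about.
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Side effect: save the filtered list; the value passes through unchanged.
            .onEach { news -> cachedNews = news }
}

// Hypothetical producer that never finishes, like the while(true) data source.
fun endlessNews(): Flow<List<ArticleHeadline>> = flow {
    var n = 0
    while (true) {
        n++
        emit(listOf(ArticleHeadline("Kotlin #$n", "kotlin"), ArticleHeadline("Sports #$n", "sports")))
        delay(50)
    }
}

fun main(): Unit = runBlocking {
    val repository = NewsRepository(endlessNews(), UserData(setOf("kotlin")))

    // Nothing has run yet: operators are lazy until a terminal operator is called.
    val job = launch {
        repository.favouriteLatestNews.collect { news -> println(news) }
    }

    delay(120)
    // Cancelling the collecting coroutine also stops the underlying producer,
    // similar to what happens when a ViewModel is cleared.
    job.cancelAndJoin()
    println("cached: ${repository.cachedNews}")
}
```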
Flows are cold and lazy unless specified with other intermediate operators. This means that the producer code will be executed each time a terminal operator is called on the Flow. In the example, having multiple collectors of the Flow makes the data source fetch the latest news multiple times, each on its own fixed interval. See the shareIn operator to optimize and share the Flow when multiple consumers collect at the same time.
The implementation of the producer can come from a third-party library. And as such, it could throw unexpected exceptions. To handle these exceptions, use the catch intermediate operator. Again, in the ViewModel layer, to catch unexpected exceptions, we can use the catch operator to handle them and show the right message to the user. As catch is an intermediate operator, it needs to be called before collect. But catch can also emit items to the Flow. If we wanted to handle those unexpected exceptions in the repository layer, we could use the catch operator and emit the latest cached news to the Flow using the emit function.
If we talk about Android specifically, Flow is integrated in many Jetpack libraries. And it's popular among Android third-party libraries as well. Flow is a great fit for data updates. For example, you can use Flow with Room to be notified of changes in your database. As shown in the code snippet, in the Dao, return a Flow type to get live updates. Every time there is a change in the Example table, a new List is emitted with the items in the database.
At this point, you pretty much know everything you need about Flows. However, there is another Flow builder I want to talk about, as it is used quite often. And that is callbackFlow, which lets you convert callback-based APIs into flows. As an example, the Firebase Firestore Android APIs use callbacks. Let's see how to convert those callbacks to a flow and listen for Firestore database updates.
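Two of the points above, that a cold Flow re-runs its producer for every collector and that catch can emit a fallback value, can be seen together in a small sketch. Everything here is invented for illustration: `flakyNews`, `newsWithFallback`, the `producerStarts` counter, and the use of plain strings instead of `ArticleHeadline`.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical counter that records how many times the producer block has started.
var producerStarts = 0

// Hypothetical producer that emits once and then fails, like a flaky third-party source.
fun flakyNews(): Flow<List<String>> = flow {
    producerStarts++
    emit(listOf("Fresh headline"))
    throw IllegalStateException("network down")
}

// catch is an intermediate operator, so it is applied before collect.
// Here it recovers from the upstream failure by emitting the cached list.
fun newsWithFallback(cached: List<String>): Flow<List<String>> =
    flakyNews().catch { exception ->
        println("recovering from: ${exception.message}")
        emit(cached)
    }

fun main(): Unit = runBlocking {
    val news = newsWithFallback(cached = listOf("Cached headline"))

    // Each collect call runs the producer block again from the start.
    news.collect { println("first collector: $it") }
    news.collect { println("second collector: $it") }

    println("producer ran $producerStarts times")
}
```

Note that catch only sees exceptions thrown upstream of it; an exception thrown inside the collect lambda would not be handled by this operator.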
Here, we have a FirestoreUserEventsDataSource, whose getUserEvents method returns a Flow of UserEvents. As you can see, we take an instance of FirebaseFirestore as a dependency. To create the flow, we use the callbackFlow API. As with the flow builder API, here, we are in the context of a Coroutine. But, unlike the flow builder, callbackFlow allows values to be emitted from a different Coroutine context, or outside a Coroutine with the offer method, as we'll see. The first thing to do is initialize Firebase and get the eventsCollection from it. Therefore, we write something like this code. However, this could potentially throw an exception if it fails to get the eventsCollection. If that happens, or Firebase cannot be initialized, we need to close the Flow. If Firebase can be initialized and we're able to get the eventsCollection, we need to add a callback using addSnapshotListener. The callback lambda will be executed every time there is a change to eventsCollection. In there, we check if the snapshot of the collection that we receive is null or not. And if not, we call offer to emit an item to the flow. The offer function is not a suspend function. That's why we can call it from inside a callback. So far, we initialized Firestore and added the subscription. What's next? Now we want to keep the communication open with the consumer of the flow so that it can receive all events sent by Firestore. For that, we use the awaitClose method, which will wait until the flow is closed or canceled. When that happens, the callback inside awaitClose gets called. In our case, we remove the subscription from Firebase.
Yay, you made it to the end. Hope you learned a lot in this video. Namely, what problems Flow solves, how you can create Flows and observe them, and how powerful they can be with the intermediate operators. You can learn more about Flow in our developer.android.com documentation. Thanks for watching. And go write better Android apps with Kotlin. [MUSIC PLAYING]
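The callbackFlow pattern walked through above can be sketched without Firebase by using a tiny callback-based class in its place. `EventSource`, its methods, and the `events` extension are hypothetical stand-ins for the Firestore collection and its snapshot listener. The sketch also uses `trySend` rather than `offer`, because recent kotlinx.coroutines releases deprecate `offer` in favor of `trySend`; like `offer`, it is not a suspend function.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical callback-based API standing in for the Firestore events collection.
class EventSource {
    private val listeners = mutableListOf<(String?) -> Unit>()
    val listenerCount: Int get() = listeners.size

    fun addListener(listener: (String?) -> Unit) {
        listeners.add(listener)
    }

    fun removeListener(listener: (String?) -> Unit) {
        listeners.remove(listener)
    }

    // Delivers an event (possibly null, like a missing snapshot) to every listener.
    fun publish(event: String?) = listeners.toList().forEach { it(event) }
}

// Converts the callback API into a Flow.
fun EventSource.events(): Flow<String> = callbackFlow {
    val listener: (String?) -> Unit = { event ->
        // Ignore null payloads; otherwise hand the value to the flow.
        // trySend does not suspend, so it is safe to call from a plain callback.
        if (event != null) {
            trySend(event)
        }
    }
    addListener(listener)

    // Keeps the flow open until it is closed or cancelled, then removes the subscription.
    awaitClose { removeListener(listener) }
}

fun main(): Unit = runBlocking {
    val source = EventSource()
    val collector = launch {
        source.events().take(2).collect { println("received: $it") }
    }

    // Wait until the flow has actually registered its listener before publishing.
    while (source.listenerCount == 0) yield()

    source.publish("login")
    source.publish(null) // skipped by the null check
    source.publish("purchase")

    collector.join()
    println("listeners left: ${source.listenerCount}")
}
```

Leaving out the `awaitClose` call would let the builder block return immediately, and the listener would never be unregistered, so it belongs at the end of every `callbackFlow` block.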
