MANUEL VIVO: Suspend
functions in Coroutines are meant for one-shot calls
that could return a result. But how could you
create a stream of data or return multiple
results over time? With Flow. Yes, you nailed it. This video builds up on "The
ABC of Coroutines" episode. If you're a beginner,
I would recommend watching that one
first, as today we are going with the Flow. We'll cover the differences
between suspend functions and Flow, the different entities
involved in streams of data, and all the async
possibilities that unfold when you use this powerful API. If you learn something
new, like the video and subscribe to the channel,
but only if you think we've earned it. [MUSIC PLAYING] In Coroutines, a
Flow is a type that can emit multiple
values sequentially, as opposed to suspend
functions that return only a single value. For example, you can use a
Flow to receive live updates from a database. Here, we have the suspend
function loadData that returns an object of type Data. In the UI layer, we can call
that function from a Coroutine, for example, created
using a UI scope. When loadData returns, we
update the UI with its result. This is a one-shot call. You call a suspend function,
you get the result back. That's it. But a Flow is different. Flow is built on
top of Coroutines. And since a Flow can emit
multiple values sequentially, it is conceptually a
stream of data whose values can be computed asynchronously. Here, we have a function that
returns a Flow of type Data, meaning that this Flow is
able to emit data objects. Notice that this function is
no longer a suspend function. To trigger the Flow,
we call collect, which is suspend
function and therefore needs to be called
from a Coroutine. In the lambda, we specify
what to do when we receive an element from the Flow. When a new element is
emitted to the Flow, updateUi will be called
with the new value. New values will be
processed until there are no more elements to emit to
the Flow, or the UI goes away. What we've seen is
a stream of data. There are three
entities involved here. The producer produces data
that is added to the stream. Thanks to Coroutines,
Flows can also produce data asynchronously. The consumer consumes the
values from the stream. In the previous example,
updateUi was the consumer. But there can also
be intermediaries that can modify
each value emitted into the stream or
the stream itself. For example, this
intermediary changes the type of the emitted elements. The consumer, in this case,
will receive a different type. In Android, a data
source or repository is typically a
producer of UI data that has the view model,
or view, as the consumer. Other times, the view layer is
a producer of user input events and other layers of the
hierarchy consume them. Layers in between the
producer and consumer usually act as intermediaries
that modify the stream of data to adjust it to the requirements
of the following layer. Now let's start
with the producer and see how to create a Flow. The builder function Flow
creates a new Flow where you can manually emit new
values into a stream of data using the Emit function. In the following
example, we'll see an app that fetches the
latest news periodically. Here, NewsRemoteDataSource
has their property latestNews that returns a flow of
Lists of ArticleHeadline. As a single suspend function
cannot return multiple consecutive values, the
data source needs to create and return a Flow in order
to fulfill the requirement of having to notify all the
latest news every so often. In this case, the data
source acts as the producer. NewsRemoteDataSource
takes NewsApi as a dependency, which
is the class that ultimately makes the
network request exposing a suspend function. Cool. So what's the implementation
of the latestNews property? We'll use the builder function
Flow to create a new Flow. As the block of code
will be executed in the context of a Coroutine,
it can call suspend functions. To repeatedly make
network requests, we create a while loop. Inside it, we call the API
to get the latest news. And with its result, we
call emit to add that object to the Flow. Flows are sequential. And as the producer
is in a Coroutine, when calling a suspend
function, the producer suspends until the
suspend function returns. In this case, the Coroutine
suspends until fetchLatestNews returns the response. Only then emit is called. To make requests on
a fixed interval, we can call delay with a refresh
interval passed as a parameter. Delay is a suspend
function that suspends the Coroutine for some time. After that time, the
Coroutine will resume, and another iteration in
the while loop will happen. Something to keep in mind is
that with a Flow Coroutine builder the producer cannot
emit values from a different Coroutine context. Therefore, don't call
emit in your Coroutines or in with context
blocks of code. You can use other Flow
builders, such as Callback Flow that we'll cover
letter, for these cases. Intermediaries can use
intermediate operators to modify the stream of data
without consuming the values. These operators are
functions that, when applied to a stream
of data, set up a change of
operations that aren't executed until the values
are consumed in the future. Continuing our example, we
have this NewsRepository. In its constructor, it takes
the NewsRemoteDataSource class that we've seen
before, as well as UserData to know
more information about the logged in user. It exposes this
favouriteLatestNews latest news property of type Flow of
a List of ArticleHeadline. With this, we want to expose
just the news articles that are relevant to the user. This function accesses
the latestNews property from newsRemoteDataSource. And then it applies
the map operator to modify the stream of data. We use filter on
the list that has been emitted to choose those
articles whose topic the user is interested in. The transformation
happens on the Flow. And the consumer will
see the filtered list instead of the original one. Also, we use onEach
as a side effect to save the favorite
user news in the cache. Intermediate operators can be
applied one after the other, forming a chain
of operations that are executed lazily when an
item is emitted into the Flow. Note that simply applying
an intermediate operator to a stream does not
start the Flow collection. To trigger the Flow and
start listening for values, use a terminal operator. With collect, you get all
values at the time they are emitted into the stream. Now, in our view model-- in this case, the
LatestNewsViewModel-- we want to consume that Flow
to get notified of the news and update the UI accordingly. In there, we can call collect
that triggers the Flow and starts using it for values. The lambda will be executed
on every new value received. But, as we said, collect
is a suspend function. Therefore, it needs to be
executed within a Coroutine that we can create with the
built-in ViewModelScope. So what's really happening here? When the ViewModel
is created, we create a new Coroutine
to collect the results from favouriteLatestNews. This triggers the Flow
in the data source layer, which will start
fetching the latest news from the network. All emissions are modified
by the map operator in the repository layer to grab
the user's favorite topics. After that, the repository will
save that info in the cache. And the view model will get the
latest filtered information. As the producer remains always
active with the while loop, the stream of data
will be closed when the ViewModel is cleared
and ViewModelScope is canceled. There are two ways Flow
collection can stop. One way is when the producer
finishes emitting items. The stream of data is closed. And the Coroutine that called
collect will resume executing. Or alternatively, the
Coroutine that collects is canceled, as in our example. This will also stop the
underlying producer. Flows are cold and
lazy unless specified with other
intermediate operators. This means that
the producer code will be executed
each time a terminal operator is called on the Flow. In the example, multiple
collectors of the Flow makes the data source to fetch
the latest news multiple times on different fixed intervals. See the shareIn operator
to optimize and share the Flow when multiple consumers
collect at the same time. The implementation
of the producer can come from a
third-party library. And as such, it could throw
unexpected exceptions. To handle these
exceptions, use the catch intermediate operator. Again, in the ViewModel layer,
to catch unexpected exceptions, we can use the catch operator
to handle them and show the right message to the user. As catch is an
intermediate operator, it needs to be called
before collect. But catch can also
emit items to the Flow. If we wanted to handle
those unexpected exceptions in a repository layer, we
can use the catch operator and emit to the Flow using the
emit function with the latest cached news. If we talk about
Android specifically, Flow is integrated in
many Jetpack libraries. And it's popular among Android
third-party libraries as well. Flow is a great fit
for data updates. For example, you can
use Flow with Room to be notified of
changes in your database. As shown in the code
snippet, in the Dao, return a Flow type
to get live updates. Every time there is a
change in the Example table, a new List is emitted with
the items in the database. At this point, you pretty
much know everything you need about Flows. However, there is
another Flow builder I want to talk about, as
it is used quite often. And that is callbackFlow
that lets you convert callback-based APIs into flows. As an example, the Firebase
Firestore Android APIs use callbacks. Let's see how to convert those
callbacks to flow and listen for Firestore database updates. Here, we have a
FirestoreUserEventsDataSource, whose getUserEvents method
returns a Flow of UserEvents. As you can see, we take an
instance of FirebaseFirestore as a dependency. To create the flow, we
use the callbackFlow API. As with the Flow
builder API, here, we are in the context
of a Coroutine. But, unlike the Flow
builder, channelFlow allows values to be emitted from
a different Coroutine context or outside a Coroutine with
the offer method, as we'll see. The first thing to do is
initialize in Firebase and getting the
eventsCollection from it. Therefore, we write
something like this code. However, this could
potentially throw an exception if it fails getting
the eventsCollection. If that happens, or Firebase
cannot be initialized, we need to close the Flow. If Firebase can be
initialized and we're able to get the
eventsCollection, we need to add a callback
using addSnapshotListener. The callback lambda
will be executed every time there is a
change to eventsCollection. In there, we check if the
snapshot of the collection that we receive is new or not. And if not, we call offer
to emit an item to the flow. Offer is not a suspend function. That's why we can call it
from inside a callback. So far, we initialized Firestore
and added the subscription. What's next? Now we want to keep
the communication open with the consumer of the
flow so that it can receive all events sent by Firestore. For that, we use the
awaitClose method, which will wait until the
flow is closed or canceled. When that happens, the callback
inside awaitClose gets called. In our case, we remove the
subscription from Firebase. Yay, you made it to the end. Hope you learned a
lot in this video. Namely, what
problems Flow solve, how you can create
Flows and observe them, and how powerful
they can be with the intermediate operators. You can learn more about Flow
in our developer.android.com documentation. Thanks for watching. And go write better
Android apps with Kotlin. [MUSIC PLAYING]