KotlinConf 2019: Asynchronous Data Streams with Kotlin Flow by Roman Elizarov

Captions
[Music] Welcome to KotlinConf here in Copenhagen. I'm really glad to see all of you. I'm always a little bit nervous speaking at KotlinConf, since it's our premier conference, gathering all the Kotlin enthusiasts here. People come from different parts of the world to talk about Kotlin in this two-day packed program. So, I'm talking about coroutines today. Who here is already using Kotlin coroutines in production? Please raise your hand. Oh, great, great crowd — about half of you. For the other half who don't, let's quickly recap what coroutines are and why we brought them to Kotlin.

What coroutines do for us is save us from callback hell. When we programmed asynchronous code, we used to have to wrap all our logic into callbacks calling callbacks, but with coroutines we don't have to do this anymore. We — oops, oh my god, what happened with my clicker? Whoa, hold on, hold on, we'll fix it... here it is. Yeah, works fine. No, that was too fast for you to notice. Let's pretend that did not happen.

So, with coroutines we program in direct style. We just write the logic the way we regularly write it, and with coroutines we still have asynchrony — our code can suspend and wait for something, a network response, some event — but the logic itself is laid out in a sequential way. Asynchronous becomes sequential: step by step, we do one thing, then we move to another thing, then another, so the logic is easy to reason about in our code, because it's — my clicker doesn't seem to work normally, which is fine, I should have taped it down. So it's sequential, and with coroutines we have these suspending functions. We can use them to say that our function returns a response: we will wait for the server, and over time we will get a response. Or, instead of one response, when we have many responses we can return a list of things. Do you hear me? Okay, good.

Well, we can return a list of responses from a suspending function, which is one way to work with many responses — lists, collections of stuff, things like that. So let's take a closer look at how this works. For example, one implementation of a list-returning function can use a hypothetical function like buildList. We don't have this in the standard library yet, but we will hopefully bring it soon. Hypothetically, it could be doing some computation — it could be computing, say, three things — and then returning a list of results. That's something that could happen in your application. And from the main function, how do you use it? You call foo, you get a list, then you iterate over the list and, for example, print the items, or process them into something else.

So let's take a closer look at how this really works — I'll try to use my clicker again; hopefully it works this time... no, it doesn't... well, now it works, I just need to stand closer; probably lots of interference here. So, how does it work? I call foo, and execution goes to foo. Foo computes value A and adds it to the list; it computes B and adds it to the list; it computes C and adds it to the list. Then the list gets returned, and execution goes back to the main function. The main function was just waiting all this time for the result, and then it iterates over the list and prints it. Easy — like normal sequential execution.

The problem is that if computing those values takes a lot of time, we could have started processing them as soon as they are available, but using lists we have to wait until the whole list of elements is ready, and only then do we start working. This is kind of inefficient — it adds unnecessary delay to data processing pipelines, especially if we're reading stuff from disk or from the network, which takes time. We could do better: we could start working on each item as soon as it's ready. But how?
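The list-based version the talk walks through can be sketched like this — a minimal sketch, where `compute` is a hypothetical suspending function standing in for real asynchronous work:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical asynchronous work: suspends for a while, then returns a result.
suspend fun compute(x: String): String {
    delay(100) // pretend this is a network call
    return "$x!"
}

// Returns all results at once: the caller sees nothing until the whole list is ready.
suspend fun foo(): List<String> = buildList {
    add(compute("a"))
    add(compute("b"))
    add(compute("c"))
}

fun main() = runBlocking {
    for (x in foo()) println(x) // printing starts only after all three are computed
}
```

Note how the caller is blocked (well, suspended) for the full duration of all three computations before it can process even the first result.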
One way to do that — which we have had, not always, but for a few years now, since we released coroutines — is channels. Channels are, conceptually, pipes: you send your items on one side and receive them on the other. What we can do with channels is change our code: instead of building a list we produce a channel, and instead of adding to a list we send results to the channel. In the receiving code, on the other side, we still call our function foo, but it gives us not a list but a channel. Just like we iterated over the list, we iterate over the channel, and then we process the elements.

What difference does it make? A big, big difference, actually. If we take a look at how this code executes, we see that the main function calls foo to get a channel, and foo returns quickly, giving a channel back to the caller. Now the caller has a channel, but foo is still working, producing the data — so we now have two coroutines working at the same time: one is producing the data, the other is consuming it. When we start receiving the data, foo starts computing: it computes the first value and sends it, and it gets printed as soon as it was sent. It computes the second value and sends it, and it gets printed as soon as it was sent. It computes the third one and sends it, and the third one gets printed. So we don't have to wait for all three of them to be computed. Then foo completes — it returns from its lambda, there are no more values — so the for loop terminates and we're done.

Essentially the code stayed the same — it's very similar to the code we had with the list — but now we can process our data as soon as it is available, which is better. But there is one problem with channels: channels are hot. What does that mean, and why is that a problem? Let's make a simple modification to our code and just comment out the piece of code that actually receives the data from the channel.
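The channel-based version can be sketched as follows — again `compute` is a hypothetical stand-in, and `produce` from kotlinx.coroutines is used as the channel builder (it is still marked as an experimental API):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce

suspend fun compute(x: String): String {
    delay(100) // stand-in for real asynchronous work
    return "$x!"
}

// Hot: a producer coroutine starts working as soon as foo is called.
// The CoroutineScope receiver is the convention marking it as hot.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.foo(): ReceiveChannel<String> = produce {
    send(compute("a"))
    send(compute("b"))
    send(compute("c"))
}

fun main() = runBlocking {
    val channel = foo()            // the producer coroutine is now running
    for (x in channel) println(x)  // each value prints as soon as it is sent
}
```

If the `for` loop is commented out, the producer coroutine suspends forever on its first `send`, which is exactly the leak the talk describes.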
I mean, it could be a mistake, or you could have an exception somewhere between calling foo and starting to receive the data, or maybe you checked some other condition and returned early, so this code that receives from the channel doesn't run anymore. What's going to happen if we run this code? As before, we get a reference to a channel and we have two coroutines working, but the problem now is that the producing coroutine tries to send a value — and there is no other coroutine on the other end that would receive it. So the whole thing just sits there, working, doing nothing; if you actually run this code, it just hangs. It's a real problem in practice: channels are like open files or open network connections — they are hot. If I open a file, I have to read from it, or I have to close it. The same with a network connection: I have to be communicating with the other side, because if the other side expects me to send something, I have to send the corresponding value. It's okay — we do have some hot entities in our software that we have to work with — but they're not that convenient to work with, and if we start using them all over the place they become a source of hard-to-track bugs: we forgot to close it, we forgot to receive from it, and the code becomes hard to debug.

So we can do better. And here the type system actually helps you see that channels are hot: if you look at the signature of the foo that returned a channel, it has this CoroutineScope before foo — foo is an extension on CoroutineScope. That's our convention to say that foo returns a hot entity: it starts a coroutine that keeps working, so be careful about it. But the fact that we have the type system helping us — that we have this extension receiver to indicate that this is a hot function — doesn't really help a lot. It's still error-prone to work with hot entities, and that's the big reason why we created Kotlin Flow.

Kotlin Flow is a new addition to the coroutines library that we stabilized just recently — a few months ago we released it in a stable form — and the whole reason for it is to avoid this problem with hot channels, and to bring lots of other features that we'll talk about today. So let's see what Flow is and how working with a flow is different from both lists and channels. To use a flow there are not that many changes we have to make in our code. First of all, we change the signature: instead of returning a channel or a list, the function now returns a Flow. We use the flow builder to create it — it's not building a list, it's not producing a channel, it's defining a flow. Inside the flow we emit values: that's the verb we use to say that values are being sent, or produced, in a flow. The receiving side looks very similar: we call foo, we get a flow, but instead of a loop we call a special function named collect. A flow is collected to process all the elements that the flow emits.

So what happens when we run this code? Main calls foo, and just like with channels, foo returns quickly and main gets back a flow. The difference, though, is that we don't have any coroutine active here: at this point in time main has a reference to the flow, but the flow is not doing anything just yet. Only when we start collecting the flow does the code in the flow activate and start emitting values, sending them to a collector. The collector prints a value and returns back to foo, which emits the next value; it gets processed, this code prints it, the next value is emitted, and it goes on until foo says: okay, I'm done, nothing more to emit — then collection completes. Very similar: I'm emitting my data and receiving values as soon as they are available. The big difference here is that, unlike channels, flows are cold. It means that if I don't collect my flow — if I checked some other condition and don't need the values, or some exception happened in my code — then nothing bad happens: the flow simply does not activate.
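The flow version of the same code can be sketched like this (with the same hypothetical `compute` stand-in as before):

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

suspend fun compute(x: String): String {
    delay(100) // stand-in for real asynchronous work
    return "$x!"
}

// Cold: nothing runs until the returned flow is collected.
fun foo(): Flow<String> = flow {
    emit(compute("a"))
    emit(compute("b"))
    emit(compute("c"))
}

fun main() = runBlocking {
    val f = foo()              // nothing is running yet — just a declaration
    f.collect { println(it) }  // now the flow activates, value by value
}
```

If we never call `collect`, the body of the `flow { ... }` block never executes, so there is nothing to leak.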
There are no leaking resources, no leftover channel, nothing — my code works normally when I work with cold entities like flows. The other big difference is that flow is declarative: it leads us to a more declarative style of programming. Let's look at our original code: it's not code that runs when we call foo, it's a declaration of what's going to happen when the flow is collected. We can actually refactor it a bit. Instead of calling compute three times, let's start with a flow of the strings "a", "b", "c" — the three strings we want to compute on — and then take those strings and use the map operator to compute the results. You see, we're now using operators: instead of the repetitive code we had before, we can define our execution pipeline with operators. And we can go further, because emitting three strings is kind of a standard thing: we can use flowOf, a ready-to-use builder that just gives us a flow of three items. Easy. That's not much different from how collections work, so if you've ever worked with collections in Kotlin, it shouldn't be hard for you to get flows. The types differ — List there, Flow here; four characters for List, four characters for Flow, the same amount of typing. The real difference is that the function that returns a List is marked with suspend. Why? Because when I call foo, I compute this list — whatever asynchronous computation is going on happens at the moment when I call foo. So this foo is an imperative piece of code: it runs when I call it, immediately. But the other foo, the one that returns a flow, does not do anything when I call it — it just returns my flow. That's just a declaration: it defines the flow, defines the data that will be produced when I need it, but it does not run. That's why it doesn't have the suspend modifier: it doesn't do anything, it just quickly returns. To run the flow, I use some terminal operator, like collect — only when I call collect does the flow actually run.
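The imperative/declarative contrast above can be sketched side by side — `compute` is still a hypothetical stand-in, and the names `fooList`/`fooFlow` are mine, just to keep both variants in one file:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

suspend fun compute(x: String): String {
    delay(100)
    return "$x!"
}

// Imperative: all the work happens right here, when fooList() is called.
// Hence the suspend modifier.
suspend fun fooList(): List<String> = listOf("a", "b", "c").map { compute(it) }

// Declarative: calling fooFlow() does nothing; the pipeline is just a description.
// Hence no suspend modifier.
fun fooFlow(): Flow<String> = flowOf("a", "b", "c").map { compute(it) }

fun main() = runBlocking {
    println(fooList())                 // runs now, returns the whole list
    fooFlow().collect { println(it) }  // runs only because we collect
}
```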
It's easy to figure out which operators on a flow are terminal and which are not, because terminal operators are marked with the suspend modifier — running them actually runs the flow. There are other terminal operators: for example, I can ask to get a list from the flow with toList, and it also suspends. It gets me the list, and then I have all the elements in my hands and can work on them.

The execution order between working with lists and working with flows is also different. When I work with a list, I take a list of two items and I have them in memory; then I send them to the map operator, and then I perform the computation and produce the result. So it's step by step: each stage in the pipeline happens one after another when I work with a list. But when I work with flows, it doesn't work this way: I emit A and I act on it, I compute the result; I emit B, I act on it and compute the result; I emit C and I act on it immediately. You see, it works in a reactive way: every next stage of the flow pipeline acts on the values emitted from the previous stage. It doesn't wait until the previous operation is over — it reacts immediately. So Flow is reactive: Flow is a useful reactive primitive that lets me reactively process streams of values.

At this point, when I say the word "reactive", you might remember that on the JVM — which is the primary platform for Kotlin — there are lots of other reactive libraries. The two most popular of them are RxJava, which has been there for ages and kind of opened up innovation in the reactive space on the JVM, and Project Reactor from the Spring team. Both of those libraries provide primitives to work with reactive streams, and they actually build on the common Reactive Streams specification, so they can interoperate with each other: I can take a stream of data from RxJava and process it with Reactor, and vice versa. So it's not surprising that Flow also works with the Reactive Streams specification, because it's also about reactive streams of data.
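The execution-order difference described above can be made visible by recording a trace in each pipeline — a small sketch with names of my own choosing:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// With a list, each pipeline stage processes ALL elements before the next stage runs.
fun listTrace(): List<String> {
    val trace = mutableListOf<String>()
    listOf("a", "b")
        .onEach { trace.add("emit $it") }
        .map { trace.add("map $it") }
    return trace // emit a, emit b, map a, map b
}

// With a flow, each element travels through the WHOLE pipeline
// before the next one is emitted.
suspend fun flowTrace(): List<String> {
    val trace = mutableListOf<String>()
    flowOf("a", "b")
        .onEach { trace.add("emit $it") }
        .map { trace.add("map $it") }
        .collect() // terminal operator: actually runs the flow
    return trace // emit a, map a, emit b, map b
}

fun main() = runBlocking {
    println(listTrace())
    println(flowTrace())
}
```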
In Reactive Streams, a reactive stream is represented by an interface called Publisher. With Flow we can convert a Publisher to a Flow with the asFlow extension, or convert a Flow to a Publisher with the asPublisher extension — so we can interoperate both ways between flows and reactive streams.

So now the question: why did we create Flow, if there are so many other reactive streams libraries? What difference does Flow make? I've told you how it's different from channels and how it's different from working with lists, but why didn't we just use RxJava, Project Reactor, or another implementation of the Reactive Streams specification? Of course, one answer would be "because multiplatform" — here at a Kotlin conference you hear about multiplatform a lot — but that wouldn't be the right answer, because if we had just cared about multiplatform, we could have ported RxJava to Kotlin and compiled it for Kotlin/JS and Kotlin/Native, and we'd have had a multiplatform implementation of reactive streams. So why didn't we do that? Why did we have to do something different? What's the real difference? Let's take a closer look.

Take RxJava, for example. If I have an Observable and I need to perform a transformation — map it to some other Observable — RxJava has an operator for that called map, and it takes a mapper function that turns one type into another, which is great. But this works only with synchronous mapping functions — functions that compute the result and return it. If the transformation I want to perform is asynchronous — it has to go over the network, wait for a result from some service or database or something else — then I have to use a different operator called flatMapSingle that works with asynchronous mappings. Now, if I want to filter my data and I have a predicate to filter with, there is an operator called filter, but it works only with synchronous predicates. Ah, but what if my predicate is asynchronous? What if the decision to take an item or not depends on a call to some third-party service? Who knows what to do — go figure. There are hundreds of operators; which one is the asynchronous analog of filter?

With Flow it's all different. With Flow, if I want to transform my data, I have a single operator called map, and the transformation in this operator is a lambda marked with the suspend modifier, so it doesn't care whether the code inside is synchronous or asynchronous — it works for both cases. One operator covers both. The same for filtering: I have filter, and its predicate lambda is marked with the suspend modifier. Suspension is a unique Kotlin feature — that's why you can't have this kind of convenience in RxJava or Project Reactor: those are Java libraries, and Java doesn't have anything like suspension, so they had to duplicate their interfaces for the synchronous and asynchronous cases. We don't have to. This reliance on suspending functions — the cornerstone of Kotlin coroutines — lets us avoid lots of operators: instead of hundreds of operators we can have just a few dozen. We don't need a special operator to start our stream with a value: we just say onStart { emit(value) }. We don't need a special operator to start our stream with the elements emitted by another flow: we just say onStart { emitAll(otherFlow) }. We don't need a special operator to delay the start of the stream by a certain time: we just say onStart { delay(time) }. We don't need a special operator to delay each value of the stream: we just say onEach { delay(time) }. We don't need a whole family of error-handling operators — nothing like onErrorReturn: we just say catch { emit(value) } if we want to catch an exception and emit a value, and if we want to catch an exception and do something else, we just say catch { emitAll(someOtherFlow) }. And we don't need a whole family of generator functions: if you want to produce a flow programmatically, you just say flow { ... } and write, in the curly braces, the code that emits the values the way you want them to be emitted.
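A few of these operators in one pipeline — `lookup` is a hypothetical suspending call standing in for a service request, to show that the same `map`/`filter` accept asynchronous lambdas:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Stand-in for a network or database call: a suspending function.
suspend fun lookup(x: Int): Int {
    delay(10)
    return x * 10
}

fun pipeline(): Flow<Int> =
    flowOf(1, 2, 3, 4)
        .filter { lookup(it) >= 20 } // asynchronous predicate — same filter operator
        .map { lookup(it) }          // asynchronous transform — same map operator
        .onStart { emit(0) }         // prepend a value: no special "startWith" needed
        .catch { emit(-1) }          // recover from an upstream exception

fun main() = runBlocking {
    pipeline().collect { println(it) }
}
```

Because the lambdas are `suspend`, one operator covers both the synchronous and the asynchronous case — there is no flatMapSingle-style duplication.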
It's way more composable. Because of the reliance on suspending functions we can do more with fewer operators: we can provide just the core basic operators, which you can combine in any imaginable combination to produce the data processing pipeline you need — or you can easily define your own operators if you use a certain combination too often in your particular code. We don't have to have those hundreds and hundreds of operators. So it boils down to the fact that Kotlin suspending functions combined with reactive streams are the best thing imaginable: it's really the power of Kotlin suspending functions together with the reactive streams concept. There's a real love between them — they augment each other beautifully, producing a simpler, easier API.

I said "API", so let's see what sits inside. For now it's like magic: somehow Flow magically reduces the number of operators we need and does all these beautiful things, so let's take a look under the hood. Flow itself turns out to be a very simple interface: it's an interface with a single suspending function called collect — nothing more. You can go to the source and click through, and I'm not joking, this is exactly what Flow is inside. This collect function takes another interface called FlowCollector, and if we click on that, we see that FlowCollector is again a simple interface with a single suspending function called emit. That's it; that's all Flow is. There's nothing more — not three, not four, not five interfaces: just two interfaces and two functions between them, nothing more. It's a really, really simple thing, but this simple thing, because it's based on suspending functions, produces amazing results. Let's see how it works. When I collect a flow, the call actually goes to the other side, to the definition of the flow — which is defined, for example, as flow { ... } with an emit of a value inside.
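The two interfaces can be sketched in miniature. The `Mini` names below are mine, chosen to avoid clashing with the real kotlinx.coroutines declarations (`Flow.collect` and `FlowCollector.emit`), but the shape is the same — and the sketch shows that collecting really is just direct function calls back and forth:

```kotlin
import kotlinx.coroutines.runBlocking

// A minimal sketch of the two interfaces Flow is built from.
interface MiniFlow<T> {
    suspend fun collect(collector: MiniCollector<T>)
}

interface MiniCollector<T> {
    suspend fun emit(value: T)
}

// A flow is just an object whose collect() calls emit() on the collector:
fun miniFlowOf(vararg values: Int): MiniFlow<Int> = object : MiniFlow<Int> {
    override suspend fun collect(collector: MiniCollector<Int>) {
        for (v in values) collector.emit(v) // direct function call — no magic
    }
}

// A "terminal operator" is just a collector implementation:
suspend fun MiniFlow<Int>.toList(): List<Int> {
    val result = mutableListOf<Int>()
    collect(object : MiniCollector<Int> {
        override suspend fun emit(value: Int) { result.add(value) }
    })
    return result
}

fun main() = runBlocking {
    println(miniFlowOf(1, 2, 3).toList())
}
```

Because both functions are `suspend`, either side is free to wait — which is where asynchronous emitters and automatic backpressure come from, as the talk explains next.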
Now the control goes to this block of code in the braces: the lambda starts running and calls emit. This immediately goes to the lambda on the left-hand side — the collector. It's just a direct function call from the right-hand side to the left-hand side. The left-hand side now has control, and in our case it prints the value. So the value gets printed, the lambda returns back to the emitter, and if the emitter has no other values to emit, it returns, handing control back to the collector. That's how it works. There's no magic in between — just function calls back and forth between collector and emitter. If the emitter emits two values, that's just one more call from emitter to collector; that's it, nothing more.

But the emitter, for example, can delay — it can be asynchronous. It doesn't have to emit immediately: it can wait for something to happen, wait for 100 milliseconds to pass, wait for some event, and then emit, which allows us to have an asynchronous emitter. So we can have asynchronous streams of events that tick every second, that produce some events, things like that. But the collector can also be asynchronous: the collector can delay, it can be slow, it can be saving values to a database, and that takes time because it's an asynchronous call to the database. And see, while the collector is suspended, waiting for something, there are no more emissions — the emitter cannot be called because the collector has not returned. This way we get backpressure support automatically: if our collector is slow, it just slows down the emitter that produces the values. There is no special magic to support it; it happens automatically because of the nature of suspending functions and the fact that these are just function calls back and forth, nothing more.

So this simple design not only makes it easier to work with flows because there are simply fewer moving parts — because there are fewer moving parts, flows are actually faster. Simple design leads to performance.
You've already seen these numbers in the keynote, but let's look at them in a little more detail. There is a popular benchmark called Scrabble. It was originally developed by José Paumard and was later adapted by David Karnok for RxJava. It's a typical data-stream workload — there are maps, filters, reduces; it takes Shakespeare's plays and a dictionary and figures out how many points each word would score in a game of Scrabble, which were the best words, et cetera. It has a fair amount of code, and it has been ported to different data-streaming frameworks, so we ported this benchmark to flows too. As a baseline, to see how fast you could possibly run this benchmark, we ran it on Kotlin sequences. Sequences are a primitive in the Kotlin standard library; they are not asynchronous and they don't support backpressure — they're simple, like iterating over collections — so they have as little overhead as possible. The fact that you can run this benchmark in 9 milliseconds gives you a baseline: that's how fast you can go with no overhead for asynchrony at all. Now, if we run the same benchmark with RxJava on the same machine, we see that it takes 23 milliseconds. That is the overhead of managing all the asynchrony and the reactive data flows in a typical reactive framework. And if you look at Project Reactor, it shows very similar numbers, because both are based on the same Reactive Streams specification and their inherent overhead is virtually the same. But if we run this benchmark on Flow, we see that it runs in just 13 milliseconds. It's still slower than sequences, of course — there is some overhead; you can't get asynchrony completely for free — but it's very little overhead: it runs almost twice as fast as RxJava and only a little slower than sequences, while giving you far more possibilities. You can have asynchronous data producers, you can have backpressure. It doesn't come for free, but it comes cheaply, without adding a lot of overhead or complexity to your code.

So Flow, as I've repeated several times, is asynchronous — it's for working with asynchronous data streams — but it's still sequential in nature, and that's a very important thing: asynchrony does not equal parallelism or concurrency. Asynchrony means that we work with events that happen after something — we have to wait for them; they don't happen synchronously and immediately. There's a network response we're waiting for, a button click we're waiting for, things like that. But those are sequential. And when I say sequential, that actually translates into the running times of our flow pipelines. If my flow pipeline takes 100 milliseconds to emit each value and 100 milliseconds to decide that there are no more values to emit, and my collector also takes 100 milliseconds to process each value, then my whole pipeline — collecting and emitting together — takes 700 milliseconds. It's all sequential: the collector and the emitter work in tandem, completely sequentially; no concurrency is happening here. Which might not be what we want, because in some cases this is not very efficient: I could have done it faster — maybe I don't want to wait for the previous event to be processed before I start computing the next one.

Fortunately, with flows this is easy to fix: we just need to go from this single coroutine doing single sequential steps to multiple coroutines. How do we do it? We have to go concurrent, and going concurrent with a flow doesn't mean we have to write complicated code — going concurrent with a flow is easy. We just use special concurrency operators like buffer. Buffer is a ready-to-use operator on flows that you insert between the emitter of your data and the collector. What it does is this: with buffer, as soon as the emitter has computed a value and handed it over, it can immediately start producing the next one while the collector works on the previous one in parallel — so you get the collector and the emitter working at the same time. The duration of the whole data-processing pipeline now shortens to 400 milliseconds, just by adding the buffer operator. Behind the scenes, what happens is that the collection now runs in a separate coroutine from the emitter: you have a separate coroutine for the emitter, a separate coroutine for the collector, and there is actually a channel behind the scenes sending data from the emitting coroutine to the collecting one. But you don't have to write any of this code: you don't have to set up this channel, you don't have to make sure it's closed properly, you don't have to do any of that. You just insert one operator — you declare your intent, you say "I want emissions to be buffered between this emitter and this collector" — and that's all; you don't have to write the code to establish separate coroutines and a channel between them. So it's declarative and safe: there's no risk you'll forget to clean something up.

That's all good, but in many environments — especially the ones we program UI applications in — it's important to understand where execution happens. Modern UI apps are peculiar in this respect: there is a dedicated UI thread, the main thread, and there are lots of things, like updating the UI, that we are only allowed to do on this dedicated main thread. It's not as critical in backend applications, even though it happens there too — in a backend application we can have some dedicated data-handling threads that we should not block, et cetera; it just happens to a lesser extent on the backend than on the frontend. So it's still important to understand, when I work with all this, where the execution is happening — what the execution context is. Let's take a look at how this relates to Flow.
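The timing argument above can be sketched and measured — a rough sketch, where the delays stand in for real work, so the measured numbers are approximate (~700 ms sequential, ~400 ms with `buffer`):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

// Emitting takes ~100 ms per value, plus ~100 ms to decide we are done.
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are computing i
        emit(i)
    }
    delay(100)     // pretend deciding "no more values" also takes time
}

fun main() = runBlocking {
    val sequentialMs = measureTimeMillis {
        foo().collect { delay(100) } // emitter and collector in tandem: ~700 ms
    }
    val bufferedMs = measureTimeMillis {
        foo().buffer().collect { delay(100) } // emitter and collector overlap: ~400 ms
    }
    println("sequential: $sequentialMs ms, buffered: $bufferedMs ms")
}
```

With `buffer`, emission and collection run in separate coroutines connected by a hidden channel, so the 100 ms emission delays and the 100 ms processing delays overlap instead of adding up.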
So here is my data producer — the definition of my flow that computes some values — and here is a collector that takes this flow and calls collect. The question we may ask is: where does this compute execute? What's the execution context of this computation? Is it some background thread, or something else? And the answer is simple: it always executes in the same context where we called collect. Because, remember, it's just a simple function call — collector and emitter work in tandem, just functions calling back and forth, nothing strange going on — so the context is always the same. But what if I need to adjust this context? What if my collector is on the main thread, which I shouldn't block, but my computation is CPU-intensive work that I cannot run on my precious main thread? What I do then is use an operator called flowOn: I say flowOn and specify a dispatcher, like Dispatchers.Default, and this operator affects everything above it — it changes the execution context for the preceding code. So now this code executes on a background thread. What's important here, though, is that even though my computation is now on a background thread and the data is emitted from the background, the collecting code still executes in the collector's context: the context of my emitter never leaks downstream. That's a huge difference from how the other reactive streams libraries you might be working with operate. In Flow, an upstream flow never, ever affects the context of the downstream. So when I look at this code in my main function, I don't have to know how foo is implemented; I don't have to read its documentation or source to learn what context it uses. If I'm running the main function on the main thread, then I know the code in this main function is going to be executed on the main thread too. This is called context preservation — a very important property that all flow operators maintain.
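A small sketch of `flowOn` and context preservation — the flow records which thread it emitted on, and the collector compares it with its own thread:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Everything UPSTREAM of flowOn runs on Dispatchers.Default;
// the collector below keeps running in its own context.
fun foo(): Flow<String> = flow {
    emit(Thread.currentThread().name) // a background worker thread
}.flowOn(Dispatchers.Default)

fun main() = runBlocking {
    foo().collect { emitterThread ->
        val collectorThread = Thread.currentThread().name
        println("emitted on:   $emitterThread")   // DefaultDispatcher worker
        println("collected on: $collectorThread") // the runBlocking thread
    }
}
```

The emitter's dispatcher never leaks downstream: the collect lambda stays on the caller's thread no matter what `foo` does internally.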
Using your flow in UIs: caring about context is a big thing in UI, and while Flow is a generic concept — it's for both backend and frontend — the frontend is big, so we have some special conveniences targeted specifically at it. Let's take a look at how we use a flow on the frontend. On the frontend we have events, and because a flow is an asynchronous data stream, we can use it to represent streams of events in our UI. So we can have some function that returns a flow of events, and in the UI what we usually want is to subscribe to those events: we launch a coroutine in our UI scope that collects those events. What do we do with those events? We update some UI, right? So we call updateUI. And because of context preservation, we don't have to look at how the events function is implemented: we know, because we launched this coroutine in the UI scope, that our updateUI is going to be called in the UI scope. Everything is great. This is a very common pattern — subscribing to events — but it's kind of a mouthful of code for such a common pattern: lots of indentation, two sets of nested braces; it almost reminds you of the callback hell that we wanted to avoid. So we have a better way to write it: instead, we can say events, onEach event do this, and then launch the whole data pipeline in the scope. There is a special launching operator designed specifically to make writing this kind of code convenient.

But that's not all we have to worry about in our UI applications. UI applications have lots of different lifetimes: we have windows that are open and working for a while and then get closed, and when I close a window, or an activity, or some other UI entity, I have to clean up everything it was doing. There are lots of these lifetimes of different durations in a typical UI application. So how would you manage those?
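The two forms of the subscription pattern can be sketched like this — `events` and `updateUi` are hypothetical stand-ins for a real event source and real UI code:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// A hypothetical stream of UI events.
fun events(): Flow<Int> = flowOf(1, 2, 3).onEach { delay(10) }

val updated = mutableListOf<Int>()

// Stand-in for real UI code; thanks to context preservation it runs
// in the scope's context, whatever events() does internally.
fun updateUi(event: Int) { updated.add(event) }

// The verbose form: launch + collect, two levels of nesting...
fun subscribeVerbose(uiScope: CoroutineScope): Job =
    uiScope.launch {
        events().collect { event ->
            updateUi(event)
        }
    }

// ...and the idiomatic one-liner with onEach + launchIn.
fun subscribe(uiScope: CoroutineScope): Job =
    events().onEach { updateUi(it) }.launchIn(uiScope)

fun main() = runBlocking {
    subscribe(this) // runBlocking waits for the launched child to finish
}
```

`launchIn` returns the Job of the launched coroutine, which is what ties the subscription to a lifetime, as discussed next.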
So how would we manage those lifetimes? Let's take a look at the traditional approach to managing lifetime first, for example in RxJava. In RxJava the data stream is represented by an Observable, and when I subscribe to this Observable I get a thing called a subscription. A subscription represents the active flow; it is a hot thing, it is working now, and because it is working I can't just lose it. If you just call subscribe and forget to do something with the result, that's bad, because if my activity, or whatever I was doing, is now closed, it would still be working in the background, consuming resources. So what I have to do in practice is create some CompositeDisposable, not forget to add the result of my subscribe to it, and when I don't need all my subscriptions anymore, clear the composite. That's a typical pattern with reactive streams. With flows it works in a different way. With Kotlin flows we represent an asynchronous data stream as a flow, and then, to start a subscription, we launch a coroutine, and from launching a coroutine we get a Job. The Job in this case is kind of a synonym for subscription, but there is one big difference between subscriptions and jobs: a job is always launched in some coroutine scope. This is the thing that's called structured concurrency. A job never stands alone, it is never on its own, it is always launched in some scope, and because of that the management of lifetime becomes more convenient. We have to specify a scope when we launch, and where do we take it from? In our application we can have some main scope for the UI, or we have utility extensions like in android-ktx, so we can take the scope of our activity or fragment or any other UI entity. Then when this UI entity closes, we cancel the scope, or it gets cancelled automatically. And with flow we can't forget to provide this scope: there is no way to launch a coroutine with a flow without specifying a scope.
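The contrast can be sketched like this: the coroutine scope plays the role that a CompositeDisposable plays in RxJava, and cancelling it cancels every collecting coroutine launched in it. Here ticks() is a made-up infinite source standing in for any hot subscription:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Made-up infinite source, standing in for any hot subscription.
fun ticks(): Flow<Int> = flow {
    var i = 0
    while (true) {
        emit(i++)
        delay(100)
    }
}

fun main() = runBlocking {
    // The scope owns every Job launched in it, much like a
    // CompositeDisposable owns every Disposable added to it.
    val uiScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    val job: Job = ticks()
        .onEach { println("tick $it") }
        .launchIn(uiScope)   // the scope is a required parameter

    delay(350)
    uiScope.cancel()         // e.g. when the activity is destroyed
    job.join()               // the collecting coroutine is now cancelled
    println("cancelled: ${job.isCancelled}")
}
```

Unlike a forgotten Disposable, the collecting coroutine here cannot leak past the scope: cancelling uiScope stops it automatically.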
There is no way we could forget to specify it and let our job run loose: the scope parameter in launchIn is required, we cannot omit it, we cannot do without it. It is actually similar if you look at UI-specific ways to represent asynchronous streams, like LiveData: the observe method on Android also has a required parameter for the lifecycle owner. So it is interesting how flow, while not being a UI-specific framework, because it is for both front-end and back-end, still manages to adopt those best practices from UI-oriented frameworks: whenever you start subscribing to data, you have to specify a lifetime within which you do that. What's next? That was the overview of flow as we currently have it, and we still have a little bit of time for me to tell you about the status, where we're standing, and the work we're planning to do next with flow. Flow, as I told you at the beginning, is now stable; it was released in a stable way in kotlinx.coroutines 1.3. Stable means that all the core APIs and core operators, like filtering, mapping, all the basic things, are stable. There is still a bunch of experimental operators that we will be finalizing in the next releases, and there are some new features coming up. What are those? First of all, we plan to do more things for UI applications: we plan to provide out-of-the-box support for different UI models, like representing UI state and representing UI events; that is going to come. We will also support sharing and caching in flows, because flows are cold things, so every time you collect one it starts again; but sometimes producing a flow is an expensive operation, so you may want to share one process that computes the values among different collectors. That is what caching and sharing are for. We also plan to provide more operators for concurrency and parallelism, so you can do concurrent mapping with convenient operators, without having to write this code
yourself in the old-fashioned way. We will also be adding more chunking and windowing operators, so you can split your flows into chunks, either by time or by size, for example if you want to periodically save data to a database on certain conditions, etc. That will all be added in one of the next releases. If there is something that's missing, if you have some use case that flow does not cover, don't hesitate to give us feedback. It is an open-source project on GitHub: come to the issues, create an issue, explain what your use case is, what you want to do, and we'll either give you a pointer to how you can do it with existing operators, or we'll figure out what operator we need to provide to make your life easier. If you want to learn more about flows, these are the resources I recommend. First of all, the official Kotlin language site has a specifically written chapter on flows, with lots of examples of how to use them and how they work. There are API docs that let you dig in depth into all the functions that are there. And you can also read posts on my Medium; they basically give the same kind of information as this talk, but in more depth and more detail, for those who like to read. So that's it, thank you very much, and remember to vote for the talk [Music] [Applause]
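As a footnote to the roadmap above: the concurrent mapping the talk mentions can already be approximated by hand with flatMapMerge. This is a sketch under assumptions, not code from the talk; process() is a hypothetical suspending call, and flatMapMerge is a preview API in kotlinx.coroutines:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical suspending transformation, e.g. a network call (simulated here).
suspend fun process(value: Int): Int {
    delay(100)  // pretend this takes a while
    return value * 2
}

// Hand-written concurrent mapping: up to `concurrency` transformations
// run at the same time; results are emitted as they complete.
@OptIn(FlowPreview::class)
fun Flow<Int>.mapConcurrently(concurrency: Int): Flow<Int> =
    flatMapMerge(concurrency) { value -> flow { emit(process(value)) } }

fun main() = runBlocking {
    (1..4).asFlow()
        .mapConcurrently(concurrency = 2)
        .collect { println(it) }  // order may differ from the input order
}
```

This is exactly the kind of boilerplate the planned operators are meant to remove: the pipeline above takes roughly two "rounds" of process() instead of four sequential ones.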
Info
Channel: JetBrainsTV
Views: 51,211
Rating: 4.9563317 out of 5
Keywords: JetBrains, software development, developer tools, programming, developer, kotlin, kotlinconf, kotlinconf 19, Asynchronous Data Streams, Roman Elizarov, kotlin flow, data streams, kotlin flows
Id: tYcqn48SMT8
Length: 45min 4sec (2704 seconds)
Published: Wed Dec 18 2019