Learn you Akka Streams for great Good! by Heiko Seeberger

Captions
Okay, let's get started. I'm Heiko, and I'm here to talk about Akka Streams. But something terrible has happened. If you went to Martin's keynote yesterday (who went? okay), did you see what happened to his slides? He lost a couple of slides. Very bad, total disaster. Something even worse happened to me: I lost all my slides. I don't have slides, so let's do some live coding, live demos. Maybe you can help me a little bit: when I'm out of ideas, please jump in. I'm so glad to have Viktor here, one of the inventors of this thing here, Reactive Streams. If you don't know what Akka Streams is, make sure you understand Reactive Streams. But let's write code.

First, a little bit about the basics. Akka Streams of course is built on top of Akka, so in order to use the API we also need to create an actor system, and we need to create this strange materializer, which we'll cover later in some detail. Before we start hacking, let's take a look at what we would like to do. I did prepare something, so let me just run the application [inaudible]. This is a very advanced example of streams: as you can see, it's printing "Learn you Akka Streams for great Good!" seven times, seven lines, always indented a little bit more, and obviously pretty slowly, character by character. That's what we are going to implement now, step by step. Oh no, that's the solution; let me go back to the initial state. I hope you can see that. All right, here we go.

First, let's think about how a stream processing topology that behaves like this could look. In Akka Streams there's an API to express linear stream processing topologies, and it is comprised of mainly three building blocks called source, flow, and sink. A source can produce elements, a sink can consume elements, and a flow is the thing in between. Let's start with a source. We have to import the scaladsl one, because for some reason there is also a javadsl, a Java API.
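A minimal sketch of the setup described so far, assuming the Akka 2.5-era API that was current at the time of the talk (in Akka 2.6 and later an implicit actor system is enough and `ActorMaterializer` is deprecated). The actor system name and the upper-casing flow are illustrative assumptions, not taken from the talk.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Sink, Source } // scaladsl, not javadsl

object Basics {
  // The actor system hosts the actors that will eventually execute the stream.
  implicit val system: ActorSystem = ActorSystem("learn-you-akka-streams")
  // The materializer turns stream blueprints into running actors.
  implicit val mat: ActorMaterializer = ActorMaterializer()

  // The three building blocks. These are blueprints only; nothing runs yet.
  val source = Source.single("Learn you Akka Streams for great Good!") // produces elements
  val flow   = Flow[String].map(_.toUpperCase)                         // sits in between
  val sink   = Sink.foreach[String](println)                           // consumes elements
}
```

Connecting them, for example with `Basics.source.via(Basics.flow).to(Basics.sink)`, yields a runnable graph that still does nothing until `run()` is called.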
We start with a single element; let me copy that: Source.single. If we look at the type of this expression, we see that it is a Source of String, and let's just ignore the NotUsed for the time being. So the source is producing strings. So far so good. What can we do with it? To get the minimal stream that can be run, we need to feed the element into a sink, so we use the "to" combinator and we create a Sink.foreach (okay, let me import that) with println. Let me format that nicely. And now we have a RunnableGraph. Whenever you connect all the inlets and outlets of the sinks, sources, and flows, you get a runnable graph: nothing is unconnected, and therefore it can be run. Running means invoking the run method. Until here, before invoking the run method, this is just a blueprint. It's a cold stream: nothing is flowing, no element, nothing has happened. When we call run, we actually run the stream; it becomes hot. The catch with this run method is that it takes an implicit materializer, and that's the reason why we had to define it up here. The materializer takes the blueprint and maps it, in this case, since it's an ActorMaterializer, onto actors. So what do we get as a return value? NotUsed. All right, let's give it a try. Oh, sorry [inaudible], and run it. By the way, look who is already using Akka Streams. Okay, cool, we will do some more of that stuff later, no worries.

You can see it's working. Unfortunately the program is still running; it hasn't terminated although the stream has completed. That's because the actor system is still there: we haven't terminated it. So let's try to do that next. It would be nice to know when the stream has completed, and this is where materialized values enter the stage. If we look at this Sink.foreach, we get a Sink of String and Future of Done. So now let's talk about the second type parameter. In Akka Streams every stage (sink, flow, source, custom stages, whatever) has a materialized value, or rather
produces a materialized value when you run the stream, and this value lets you interact with the hot, running stream. Some of the stages don't have interesting materialized values. Source.single, for example, just has the value NotUsed, and that's like Unit: nothing interesting. But Sink.foreach gives us a Future of Done that signals when the stream has completed. So the question is: why don't we see that? If we inspect this, it's a RunnableGraph of NotUsed, and the NotUsed comes from Source.single. Where have we lost the materialized value of Sink.foreach? Well, the reason we lost it is this "to" combinator: "to" just keeps the left materialized value. So maybe we can do a little trick and use toMat. toMat is more or less the same, but we have to specify which materialized values to keep. Let's keep the right one. If we look at the type now, it's a RunnableGraph of Future of Done. Great. We could also use Keep.both and we would get a tuple, and that way we could get the materialized values of all the stages in a huge stream. Usually we are just interested in one or two.

So if we run that, we get this future, and now we can call onComplete: regardless of what it is, failure or normal completion, we go to the system and terminate it. Hooray. And we should not forget to import the almighty dispatcher; we need that for the future. Here we go. If we run that again, we will see that the process has exited. Great. By the way, if you have questions, you can ask [inaudible] the mic; sorry for that.

So we are getting close to the solution, right? But single is just creating a single element. We need more than that: we need seven. So let's use Source.repeat. But that creates a real stream, an infinite stream, so we have to limit that to seven. Could you help with that? Ah yeah, thank you: take, of course. Hooray. Okay, the next thing we have to do is take care of the indentation. Any ideas how we could do that? Yeah.
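The steps up to this point can be sketched as follows, again assuming the Akka 2.5-era API. The object name `SevenLines` is a hypothetical wrapper; the combinators (`repeat`, `take`, `toMat`, `Keep.right`, `onComplete`, `terminate`) are the ones named in the talk.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Keep, RunnableGraph, Sink, Source }
import scala.concurrent.Future

object SevenLines {
  implicit val system: ActorSystem = ActorSystem()
  implicit val mat: ActorMaterializer = ActorMaterializer()
  import system.dispatcher // execution context for the Future callback below

  // `to` would keep the left materialized value (NotUsed from the source).
  // `toMat` with Keep.right keeps the sink's Future[Done] instead.
  val graph: RunnableGraph[Future[Done]] =
    Source
      .repeat("Learn you Akka Streams for great Good!") // infinite stream
      .take(7)                                          // limited to seven elements
      .toMat(Sink.foreach[String](println))(Keep.right)

  // Still a cold blueprint until run() materializes it.
  val done: Future[Done] = graph.run()

  // Terminate the actor system on failure or normal completion alike,
  // so that the process can exit.
  done.onComplete(_ => system.terminate())
}
```

With `Keep.both` instead of `Keep.right`, the materialized value would be the tuple `(NotUsed, Future[Done])`.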
We could zip with an index or something. Sources, or streams, can be zipped. Let's zip it up here; it doesn't really matter where. There's even a zipWithIndex, but we can also zip it with another source, just to show you how we could create a source which produces these indices. Okay, and now if you look at that, we have a Source of String, or sorry, a Source of String and Int tuples. With this we could now use map. Unfortunately this is not Dotty, so we have to use case: it's the string s and the Int n. And now we can do something wonderful: val i equals blank times n. In Scala we can multiply strings. Awesome. And then we do s, sorry, string interpolation. Wonderful, almost like a boss. John, are you here? No. Anyway, that's it. Cool.

So there's only one piece missing, which is that we should print character by character. Any idea how we could do that? Sorry? Throttling, yeah, we do that later. All right, yeah, exactly: we have strings and we should probably map each string to all its chars. The problem is that a string is a single element and the chars are multiple elements. So let's see. Yeah, exactly, you know your API: there's mapConcat. mapConcat expects (let me just show it here) a function that takes a String, in our case, to an Iterable of some other type, Char in our case. So that's a perfect fit, right? The cool thing is that Scala treats strings as sequences of characters implicitly if necessary, so we don't even have to change anything. Well, we have a small problem, which we will see here: we're printing each character, so we don't use println here but print. And now we don't have line breaks, so we have to add a line break, using string interpolation with the f string context. Great. Well, that was too fast, right? And you already mentioned we should throttle that thing. This is another nice feature, another nice API method.
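The indentation and character-splitting steps can be sketched like this, assuming the Akka 2.5-era API. The names `Indentation`, `text`, `indices`, and `chars` are hypothetical; `_.toList` is used in place of the implicit String-to-sequence conversion mentioned in the talk, so that the sketch does not depend on the exact `mapConcat` signature of a given Akka version.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

object Indentation {
  implicit val system: ActorSystem = ActorSystem()
  implicit val mat: ActorMaterializer = ActorMaterializer()

  val text = "Learn you Akka Streams for great Good!"

  // A second source that produces the indices 0, 1, 2, ...
  val indices: Source[Int, NotUsed] = Source.fromIterator(() => Iterator.from(0))

  val chars: Source[Char, NotUsed] =
    Source
      .repeat(text)
      .take(7)
      .zip(indices) // Source[(String, Int), NotUsed]
      .map { case (s, n) =>
        val i = " " * n // strings can be multiplied in Scala
        s"$i$s\n"       // indented line plus the line break that print needs
      }
      .mapConcat(_.toList) // one String element becomes many Char elements
}
```

Running `Indentation.chars.runForeach(print)` prints the seven indented lines, though without throttling it does so all at once.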
Here we have throttle. We can say we want one element, or no, of course 42, per second (I think I have to import duration), and then we have to give the maximum burst, 1, and the throttle mode, shaping. The other one is enforcing, which would fail the stream if upstream is too fast. I think we are now getting there. Yeah, cool. [inaudible] But that's it. Thanks for your help, guys.

What we have seen here is a simple example of a linear stream processing topology using sources and sinks and a couple of the built-in stages like zip, take, mapConcat, and so on. Typically Akka Streams would map each of those stages onto an actor when we are using this ActorMaterializer, but by default there's an optimization turned on called auto-fusing, so it will fuse all those operations into a single actor. You could turn that off in the configuration, but I think it's a good default. What you should rather do, if you really want to have asynchronous boundaries in your stream, is introduce them explicitly, like this, by calling async. This would now effectively map this blueprint onto actors: one for those stages, sorry, and one for what is left.

One of the design principles of Akka Streams was to make it easy to create parts of stream processing topologies, to create sources or sinks or flows and to reuse them. Therefore each of those blueprints is immutable and can be freely shared, because it's only the blueprint, right? Only when you materialize it do things start happening and elements start flowing. What we could also do (I won't do that live now, I will just show you the final solution) is break the graph into reusable parts. So I have split that into several reusable parts. This is sevenLines, which is just the Source.repeat take seven, so it has the type Source of String. Here I have defined a flow, and I have connected sevenLines via this flow. So via is another combinator, which combines a source and a flow.
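A sketch of the throttled pipeline with an explicit asynchronous boundary and the first reusable parts, assuming the Akka 2.5-era API. `sevenLines` is named in the talk; `toChars`, `Throttled`, and the `run` helper with its default argument are hypothetical names introduced for illustration.

```scala
import akka.{ Done, NotUsed }
import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, ThrottleMode }
import akka.stream.scaladsl.{ Flow, Sink, Source }
import scala.concurrent.Future
import scala.concurrent.duration._

object Throttled {
  implicit val system: ActorSystem = ActorSystem()
  implicit val mat: ActorMaterializer = ActorMaterializer()

  // Reusable, immutable blueprints that can be shared freely.
  val sevenLines: Source[String, NotUsed] =
    Source.repeat("Learn you Akka Streams for great Good!\n").take(7)

  val toChars: Flow[String, Char, NotUsed] =
    Flow[String].mapConcat(_.toList)

  def run(lines: Source[String, NotUsed] = sevenLines): Future[Done] =
    lines
      .via(toChars) // `via` combines a source and a flow into a new source
      // At most 42 elements per second with a burst of 1. Shaping slows the
      // stream down; ThrottleMode.Enforcing would fail it instead.
      .throttle(42, 1.second, 1, ThrottleMode.Shaping)
      .async // everything above this line runs in its own actor
      .runWith(Sink.foreach[Char](print))
}
```

Without the `async` call, the default fusing would execute all of these stages in a single actor.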
That flow has the type Flow of String, Char, NotUsed: a flow that takes strings as input and outputs chars. And finally printThrottled is a sink: it's a flow with throttle and then toMat with this Sink.foreach. We can then reuse those components. You can use them like we did here, but we could reuse them throughout our code base.

All right, this is the flow API. This is the API to define linear stream processing topologies, which is probably what you need in most of the cases, 80%, I don't know. But sometimes you want to write a stream topology that is more complicated, and Akka Streams allows you to do that. It has another API, the graph API, and when you use this graph API you can build almost arbitrary graphs, even ones having cycles. I want to show you that, but before that I would like to say that even if you use the flow API, under the hood everything is represented as a graph. So this is like the one-stop-shop API for the linear stuff.

The other use case I have for you is an access log for an HTTP server written with Akka HTTP. Last week I was at a client and they were asking me: okay, Akka HTTP is an HTTP server, so why doesn't it write an access log like Apache? I said: wait a moment, Akka HTTP is just a library, and you can use it to build an HTTP server, but because it's a library it doesn't have an access log built in. And I started thinking, and I said it should probably be pretty easy to implement. Who knows Akka HTTP? Okay, many, almost most. That's great. For those who don't know it, I think it's not terribly complicated to understand what I'm going to show. In Akka HTTP you invoke this bindAndHandle method with an interface and port for the socket and a route, which is the handler for handling requests. It's a flow from request to response. It is defined in this method, in this example here, using directives, a pretty neat DSL to define those flows as a route. And the bindAndHandle method, as I said, expects a handler which is a flow from request to response.
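To make the "handler is a flow from request to response" point concrete, here is a minimal sketch assuming Akka HTTP 10.0/10.1, where `Http().bindAndHandle` is the binding method (it was deprecated in later versions). The handler is written as a plain `Flow` rather than with the routing directives used in the talk, and the names `Server`, `handler`, and `bind` are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Flow
import scala.concurrent.Future

object Server {
  implicit val system: ActorSystem = ActorSystem()
  implicit val mat: ActorMaterializer = ActorMaterializer()

  // The handler is nothing more than a flow from request to response.
  // This one echoes the request method and path back to the client.
  val handler: Flow[HttpRequest, HttpResponse, NotUsed] =
    Flow[HttpRequest].map { request =>
      HttpResponse(entity = s"${request.method.value} ${request.uri.path}")
    }

  // Binds the handler to the given interface and port.
  def bind(interface: String, port: Int): Future[Http.ServerBinding] =
    Http().bindAndHandle(handler, interface, port)
}
```

Because the handler is an ordinary flow, it can be wrapped in a larger flow before it is passed to `bindAndHandle`.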
So I started thinking: it should be possible to wrap the handler in a larger flow which has a step before the request is moved into the handler and a stage which is applied afterwards to all the responses, right? So I came up with that idea here. Now where is it? That was not the demo, sorry. Here we go: I came up with that idea for a graph. If you look at the shape of the whole graph, it's a flow, because it has an inlet and an outlet, and within that graph we have a couple of stages. Here we have a broadcast stage which takes requests and broadcasts each request to two downstream stages. One copy goes to the handler of the requests, and from the handler it goes to another broadcast. That other broadcast, for the responses, emits elements through the outlet of the overall flow but also to a zip stage. The other copy of the request goes to another flow which enriches it, so we could add a timestamp or something here, and feeds it to the zip stage too. And then here we have our access log, a sink for enriched requests and responses. In the simplest case you would just write stuff to a log.

Like in this demo here, we want to be able to do "route with access log". This is a function enriching the request, so it's tupling up the request with a timestamp. And then we say accessLog, and that's just a sink, as you can see here, which takes the request and timestamp tuple plus the response in another, outer tuple, and logs the method, the status, and the time in micros. That makes sense.

So the only question is how we can write that nice graph, because it's not yet written. Let's try that. In order to do that we need to use the graph API. We create a Flow.fromGraph, that's the API to do that, because we want to have a flow at the end of the day. Here we have to use the GraphDSL; we have a create method on it. And here we have our AccessLog: AccessLog is a type alias for a sink which takes the enriched request and the response.
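The topology just described can be sketched with the GraphDSL as follows, assuming the Akka 2.5-era Streams API and the Akka HTTP model classes. The names `AccessLog`, `withAccessLog`, and `enrich` follow the talk loosely; the exact signatures are assumptions and are not the API of the speaker's library.

```scala
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse }
import akka.stream.FlowShape
import akka.stream.scaladsl.{ Broadcast, Flow, GraphDSL, Sink, Zip }

object AccessLogSketch {
  // A sink for pairs of enriched request and response.
  type AccessLog[A, M] = Sink[(A, HttpResponse), M]

  def withAccessLog[A, M](
      handler: Flow[HttpRequest, HttpResponse, Any],
      enrich: HttpRequest => A,
      accessLog: AccessLog[A, M]
  ): Flow[HttpRequest, HttpResponse, M] =
    Flow.fromGraph(GraphDSL.create(accessLog) { implicit builder => log =>
      import GraphDSL.Implicits._ // brings the ~> operator into scope

      // The builder is mutable, but only inside this function.
      val bcastRequest  = builder.add(Broadcast[HttpRequest](2))
      val bcastResponse = builder.add(Broadcast[HttpResponse](2))
      val enrichRequest = builder.add(Flow[HttpRequest].map(enrich))
      val zip           = builder.add(Zip[A, HttpResponse]())

      bcastRequest.out(0)  ~> handler       ~> bcastResponse.in
      bcastRequest.out(1)  ~> enrichRequest ~> zip.in0
      bcastResponse.out(1) ~> zip.in1
      zip.out ~> log

      // The overall shape is a flow: one open inlet and one open outlet.
      FlowShape(bcastRequest.in, bcastResponse.out(0))
    })
}
```

Passing the access log sink to `GraphDSL.create` makes its materialized value the materialized value of the resulting flow. Zipping requests with responses relies on the handler emitting exactly one response per request, in order.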
The first thing we need to provide as an argument to the create method is a function that takes a builder, and in this case we also have to give it the access log back. That's just a detail; I don't want to dive into that. Let's make the compiler happy by adding a couple of question marks; that's always important. Thank you. What we should return from that function is the shape, and we want to return a FlowShape. A FlowShape has an inlet and an outlet. We don't yet know what they are, but from looking at this picture here we can already tell that we probably want to add a broadcast stage to that graph. Before we begin, we also have to import the GraphDSL.Implicits, and then we can get started.

So let's add this broadcast for requests to our graph by using the builder. The builder is a mutable builder, but it is only mutated within this function; it produces an immutable blueprint which can be freely shared again. So we want to add a broadcast, and the broadcast is, if you look here, defined using the apply method. We have to provide the type of the elements. It's a uniform fan-out shape: it emits the same elements to all its outlets, and we have to define their number. The type of the elements is HttpRequest, right? And we want to have two outlets. Good. So we can already use this broadcast request stage and use its inlet to connect it; this is the first connection we are drawing. We also add another broadcast for the response; that was easy. We have our handler and some other stuff, so let's start drawing the graph. Oh sorry, the broadcast response can of course also be used to connect to the flow shape, that's why it's all red, so we have to say broadcast response out, zero or one, doesn't really matter.

In order to connect those various stages within the graph we could use normal methods like addEdge (I don't even know the exact names of those methods), but we can use a very nice
operator-based syntax; that's what I'm going to show you now. If you want to connect the broadcast request with the handler, we can do it the following way: broadcast request dot out zero, arrow, handler. Okay, and I think we need an implicit builder here. Okay, cool. So that adds the edge, the connection between the broadcast request stage and the handler stage. And as you can probably guess, we can continue from the handler to the broadcast response. So we have already implemented this part here. Cool.

The next thing we need to do is enrich the request. That is a flow that applies the given function here, from a request to an A. So let's add that: val enrichRequest equals builder add, and we need to add a flow taking requests and mapping them to an A. Okay, so the type of this thing is FlowShape of HttpRequest to A. What we can do now is connect the other outlet of this thing here to our flow, enrichRequest. Great.

We finally need the zip stage: builder add, and we need to add a Zip. Zip is not a uniform fan-in; it has two types. I think in this case it depends on how we want to zip it. I think I forgot it, damn. So let's say the first element is this one here, the A which comes from the enrichRequest flow, and the other one is the HttpResponse. Yeah, great. Once we have that, we can say: feed that into zip dot in0, that's the inlet for the A. Great. And then we also need to feed something else into zip in1, namely our broadcast response dot out one, because we have already used out zero to stream elements to the outlet of the flow shape. So therefore we connect it with zip this way. I mean, this looks a little bit like a cycle. You can see you can use the operators in both directions; that's pretty neat. In this case I wouldn't even do it, because it's not a cycle, so it's maybe confusing. So instead we can write it like that, with the broadcast response here. So maybe to make it nice we can put it up there, like this. I don't know; I will later show the sample solution, there our
source is very nicely formatted. So I think there's only one thing missing, and that is the access log: we need to connect it to zip. So let's say zip dot out is connected to the access log that is given up here as a parameter. Great. Yeah, that's a neat graph that does its job. Let me just show you that it's really working. I'd better undo my changes and show you the sample solution, as I just said. In the sample solution I have tried to make it look a little nicer, to really resemble this drawing. It's not always possible in a perfect fashion, but hopefully I wasn't too bad at that. You can see that from the broadcast request we flow to the handler and to enrichRequest, and from there to zip, and so on.

So when we run that (maybe I run it in the console, that makes the most sense), okay, I'm running it here. Cool, listening. If we send some HTTP requests, we should see the access log up here: method, path, status code, and so on. Great. Interestingly, I haven't found something like this mini library or micro library on the interwebs so far, so I have created an open-source project. You can use it if you think that's useful; it's called Accessus. It's a micro library (micro all the things, right?), and that's really the whole code. Okay, cool.

We still have ten minutes, so I would like to show you one more example of an even more powerful graph. Initially I told you that graphs could even have cycles. Don't worry, we don't have to code that ourselves; that would be too hard. I will show you a neat little example. In Akka HTTP there is support for server-sent events. You know server-sent events? A couple of you. It's almost like WebSockets, but only in one direction, from the server to the client. Let me stop that and, oh, start, sorry. Cool, now we have a server which (oh, I don't know the URL, sorry) produces server-sent events. And here we have another program.
It wants to consume those server-sent events. In order to do that, Akka, no, Alpakka, the "Camel done right" version for Akka, has a connector for server-sent events. It's called EventSource, and it has to be given a URI to connect to the HTTP server to get the source of server-sent events, and a way to send requests. It can start with a last event ID, which is optional, and here, that's the most important one, is a reconnect delay in case connections are interrupted or just completed by the server. If we run this here, we will see the following, hopefully. Oh yeah, cool. You can see that via foreach println we get server-sent events (these are case classes), and we start with 43 because the last one we told the server we had seen was 42. Perfect number, right? What you also notice here is that after ten elements there's a delay, like at 82 here: a three-second delay. So this server is not really giving us a continuous source of events; it just gives us ten and then we have to reconnect, and this library does it. We can even stop the server, and then no more events are printed down here, of course. We start it again: this EventSource client is retrying every three seconds, and you can see it continues as soon as the server is up again. So this is quite nice for the integration of microservices, for example.

What I would like to show you is how one can implement such an event source which retries to get stuff from the HTTP server; this is code from the Alpakka project. Again we write something using the GraphDSL. In this case it's a source, not a flow, but it looks very similar to what you have seen before. And here, this is the essence: we have a graph that has a cycle. You can see that something flows from here to a merge, to another flow, to a broadcast, and from the broadcast it goes to that flow but also back in a cycle, via that flow and that flow, to the merge. Oh, here's even a nice picture, but that's too much anyway, almost. So if I close this, you can
see here that nothing special needs to be done, as you can see down there, to implement the graph with our little cycle. You just need to connect the various shapes in the way that is necessary. Of course you need to pay attention to cycles, because you can easily introduce deadlocks or something, which hopefully is not the case in this graph here. I don't want to dive into the details; if you're interested, just check it out, it's in the Alpakka SSE connector. And that's it. No slides, sorry for that. Thanks for your help with the coding, and I think we still have a couple of minutes for questions.

Once this is compiled, and you have the nice little drawing up top, is there enough information in the bytecode to produce the drawing from the code, or is some of that lost at that point?

No, I had to do that drawing manually.

I know that you did that, but does the in-memory model contain enough detail that you could?

Yeah, probably.

Okay, thanks.

I have a question regarding the graph you showed for the access log. Given the at-most-once delivery semantics in Akka, I think you might end up zipping wrong requests with wrong responses in the zip operator. How would you go about that?

Very good question. You just mentioned the at-most-once delivery guarantee; yeah, that's a good point. But looking at the actual API in question, we are dealing with bindAndHandle, which expects a flow from request to response. What I haven't told you is that Akka HTTP materializes a sub-flow for each connection, so per connection such a handler is materialized, or run, and the HTTP protocol, or HTTP/1.1, demands that there's a strict ordering. So what I did here is valid, but it is not valid in general. Putting it the other way: the sequence of elements that flows into a flow may not be the same sequence that flows out of the flow in general. So that could happen in
general, but not in the case of HTTP. If this happened, it would be a bug in Akka HTTP. Okay, up there, two questions.

Yes, I have a question about state in graphs. I had a problem where I wanted to implement a TCP protocol in which each message started off with a marker saying what kind of message was in the payload, and each kind was of a different size, and I had a really hard time getting that working in a graph. So the general question is: how would you keep state, to sort of become different types of flows within the graph? I ended up building a custom stage, and I'm curious if there's a smart way to handle these problems.

Using a custom stage is certainly a smart way to handle that problem; I have done that several times. But of course it's preferable to use the built-in stages if they serve the purpose. It all depends on how complicated your problem is. There's statefulMapConcat, for example: it's a built-in stage which, well, is stateful, it can accumulate state, or mutable state. So maybe give that a try, and if that's not enough, write your own graph stage, which is not too hard in my opinion. It's a good API.

Thanks.

Just a question about how you handle two sinks after a broadcast where one sink is becoming significantly slower than the other, and you basically want to make sure that the fast sink is the one that gets the data, so that you start dropping for the other. When the second one, which is always best effort, is going below, say, 80 percent of the speed of the first one, you want to start dropping. With a broadcast, I think it will normally result in slowing down the whole graph. But I want one sink to be the primary sink and the secondary sink to be best effort, and at the moment when it's dropping down to 50%, start ignoring it.

Okay. There are various built-in stages which you can use to decouple the speeds of upstream and downstream. In this case, how is it called, is it
called conflate? Viktor, is it called conflate? Oh, he's not paying attention. [Laughter] Okay, let me just check that. So there's a conflate stage: it allows a faster upstream to progress independently of... oh yeah, that would be one. So we fix it with that stage, which accumulates everything, maybe calculates the average (thank you) or whatever, and then the slow sink can consume the average once it is ready to do that. And then there are a couple of others. Okay, I'm running out of time. I'll be around, so if you're interested in talking about it, just find me somewhere. I don't want to stand between you and lunch, so thanks for your attention. [Applause]
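The conflate answer can be sketched as follows, assuming the Akka 2.5-era API. `conflateWithSeed` is the variant that allows the summary type to differ from the element type; the names `BestEffort`, `average`, and `run`, and the choice of `Int` elements, are hypothetical.

```scala
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{ Flow, Sink, Source }

object BestEffort {
  // While downstream is busy, incoming elements are folded into a running
  // (sum, count) summary. Once downstream asks again, it receives the average
  // of everything it missed instead of back-pressuring upstream.
  val average: Flow[Int, Double, NotUsed] =
    Flow[Int]
      .conflateWithSeed(x => (x.toDouble, 1)) { case ((sum, n), x) => (sum + x, n + 1) }
      .map { case (sum, n) => sum / n }

  // The primary sink sees every element; the secondary sink is best effort.
  def run(source: Source[Int, NotUsed], primary: Sink[Int, Any], secondary: Sink[Double, Any])(
      implicit mat: Materializer
  ): NotUsed =
    source.alsoTo(average.to(secondary)).to(primary).run()
}
```

Placing `average` in front of the slow sink means the broadcast inside `alsoTo` no longer has to wait for it, so the primary sink is not slowed down. How many elements end up in each average depends on timing.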
Info
Channel: Scala Days Conferences
Views: 4,385
Rating: 4.9375 out of 5
Keywords: scala, scaladays, scala days, copenhagen, 2017, #scaladays, heiko, seeberger, heiko seeberger, akka, streams, reactive, cyclic, graph, code, learn coding, zero slides, talk, session, live demos, advanced features, stream, @hseeberger
Id: ryxrWVI3PMA
Length: 41min 55sec (2515 seconds)
Published: Wed Jun 28 2017