Spring Tips: The Reactor Context

Video Statistics and Information

Video
Captions Word Cloud
Reddit Comments
Captions
hi spring fans welcome to another installment of spring tips in this installment we're going to look at the solution to a very common problem something that I grant you is a little bit of a sort of small corner case in the ecosystem but one that none of s kind of intrigues a lot of people who start to think about it which is how do i propagate state across the sort of different stages of a reactive pipeline so think about what happens here in the reactive world in stark contrast to the more traditional approaches to to handling what web requests into a and to addressing those kinds of concerns in the reactive world code it's not guaranteed to execute on a given thread so this is as I say different than what you might have expected when using the reactive sorry the non reactive code in for example a servlet engine in a servlet engine you have a unless you specify otherwise by default it's one thread per request so a request comes in and there's a thread that's hosting that request and the response that gets produced for that request and that all happens in the context of a single thread and so machinery you know a mechanisms that we are with with which we are accustomed things like thread locals those serve us well in those use cases but they start to fall apart as soon as you introduce a synchronicity that is to say the possibility that work will hop to a different thread of execution this is a very real possibility in the world of reactive programming where execution can very easily very often hop threads of a you know hop from one thread to another in the in the process of producing a response so how do we solve this and we do need to solve this right this is this is something that has to be solved in order to address things like transaction management and demarcation until in order to address things like security context propagation right these are things that spring itself relies upon at lower levels but with which you may not have to directly interact it's a concern with it you may not have to directly interact but that needs to be addressed right today we're going to look at that mechanism it's called the reactor context so build an application called the weirdest use we're just gonna use the reactive web support here nothing else Oh Lombok weasel I'm back to make my life a little easier well hit generate that'll give us a new zip file which I'll I'll then open up here we go and we're gonna demonstrate a very simple example first we're gonna demonstrate just you know a simple rest endpoint that produces some data and we'll take a look at how we can use that data okay so let's suppose in our basic application this is we're gonna build an application let's just get rid of this one get rid of the basic one here I'll create one called well let's see we want to create a package I suppose okay it that first so simple and then in the simple package when you've got a new class called the simple application alright and this will just be just like any other springboard application it'll be a public static void main okay orgs and I appreciate that IntelliJ is trying to remind me after 25 years that I should I should have the brackets next to the type instead of the argument variable name like that which is what I've been doing so if I do that it gives me the yellow highlight I'll run there around the variable name thank you and I've been doing that because I came from C so sorry background so anyway spring application dot run simple application dot class and what we're gonna do is very simple when the application starts up we're gonna have a rest endpoint so we're gonna call this just data and we're going to return some data okay and that data will come from a publisher when you create that publisher in a second we want this to be a rest controller of course so we'll say at rest controller and we're just going to provide some data so where do I get that publisher well a flux of screen now keep in mind this is a reactive publisher to reactive stream it's gonna emit items of data that a subscriber can then subscribe to and process on different threads it may be in one nanosecond may be one year right there's no guarantee you any of temporal sort of coupling okay so what we're gonna do is going to build a publisher that returns just let's just do something simple let's say when I'm create a publisher that has names or letters I guess you can call flux dot just a B C okay and then we have a flux of integer all right numbers flux dot just one two three all right good now we've got that now we need to let's compose them let's say flux dot zip numbers dot map tuple and we're gonna take the tuple get t1 concatenate it with a colon and tuple get t2 is this useful no but it does demonstrate where we're creating publishers locating pipelines okay so combined and we're then gonna send that back now I want this data to I want these publishers to first of all announce where they are and what on what thread they're running and then I also want them to to possibly run on a separate thread so I'm gonna create a method here private static flux of longest T okay we're just gonna have a generic parameter there okay and so we're gonna say that we want to do in dot do on next right so what we're gonna do here is we're gonna provide a a callback that gets invoked whenever there is any kind of a processing okay actually we get to do on each as well that's probably you know more interesting I don't know if that's more interesting it might be yeah so let's do do one next I suppose that says that's the first step okay and when it's all we're gonna do here is we're going to consume the string and then we're going to say subscribe on and we're going to subscribe on a custom scheduler well the question then of course is what stitch what scheduler so I'm going to create one I guess or means final but scheduler scheduler equals scheduler dot from executors executors dot new fix thread pool alright there you go so there's my new scheduler and I'm cutting a custom schedule should you ever do this probably not right most of the time your scheduler will be the default one which is backed by the number of by a number of threads that corresponds to the number of cores you've got and each one acts as an event loop and so long as no none of your processing monopolizes the thread on which is running that's to say it doesn't do anything along with and takes to switch to the i/o then you don't really need this right don't need to manage this but if you are doing something with JDBC or something that blocks otherwise like Fibonacci or cryptography or Bitcoin or whatever that work should be done on a you know you have to scale out that interaction by having more threads and so it might become useful to do this in this case I'm just gonna provide extra threads alright so there we go I'm overriding I'm changing the incoming publisher by first of all configuring that I want to log out information I'm going to log out that information by using any log4j longer that'll plug in okay so long time info T and of course this could be written as a lambda and that can be written as root method reference like so alright very good so that's not too bad so we're gonna say prepare all right one and prepare and there we go okay and there's my combined you know pre-processed to publisher and I'm just gonna be turn the combined one all right so now let's just say we say we turn read all right and now we can restart this application let's see what we get go go go suppose we can inline this while fiddling around here good until today still thinks we have the old one that the older public static void main on which we sell them we first initialize the project so that's no longer true alrighty so now curl HTP localhost:8080 /data all right very simple all right there's our data and when we log it out you can see that things are executing on different threads right the number is executing on thread 3 here's the combined one on thread 3 as well here both letters end oh yeah that's all the letters on thread 2 and here is 3 for the numbers but here is thread for for the combined one so it was on thread 3 so they combined one and then it was on come on thread forward so you can see that the scheduler based on whatever criteria it needs to it moves work around different threads and that's that's one of the features right over the reactive API alright but that does make things a bit more complicated that means that we can't take for granted that if we insert something in a well-known place here that that data will then be visible and accessible to everything happening in this pipeline by the by this point here right we can't that we can't take that for granted because while these things may look imperative while it may look as though we've written things to be sequential they're a fact of the matter is that these things could be contemporaneous they could be concurrent right things might be happening at the same time on different threads and so we need to to rely of punch some other mechanism one way to do that is to provide a subscriber context okay now though I can text is an object is it's just a dictionary basically it's a dictionary that you have access to in in your pipeline that that reactor will take care to perpetuate for you right and the benefit of that is that you can make this data available you can make this data available at the very beginning of the pipeline and even if it moves across different thread boundaries it'll be visible only to the stages in that pipeline it doesn't matter which thread it's running on so we can use that to propagate for example you know application tokens and headers and three authentication principles and transactions and you know whatever thing you want to be accessible for a lifetime of a given publisher okay so let's do that we're gonna we have here this prepare method let's just try it with just this one okay so I'm going to give this a subscriber context okay I'm gonna say subscriber context and my job is very simple given a context return the context and I can do whatever I want with the context that can change it I can override it I can contribute to it etc etc etc that's one option and you know works for me I could actually do a set or app or whatever another option is to instead just provide the reference itself using the convenient builder method so here I can say context will be UID you know this would be my off ID or whatever okay your user ID there we go let's let's see and then we can actually store UID dot random to string never go that's a random string now that context will be visible inside the inside the stages of this pipeline so let's actually add that pipeline that's actually now you know cuz we've seen that the combined letters can execute across different threads so now let's prove that out let's say do on each okay so I'm gonna put a consumer that will consume a signal and here the signal is a representation of what method was a called was it the on next was it on subscribe etc so signal no I say if signal dot is on next right or put another way if it's not on next then return now otherwise we want to process it we want actually you know take the data from the con from the signal and so we can say signal don't get context okay and with that we can actually look up the the key all right so the key is called user ID so you can say get or default and we can say get that key all right there's our UID and we can log out log info user ID for this pipeline stage or data and then this will be signal dot get string dot is signal is user ID weather so here I'm logging out the the data that's in the on next method call and then I'm logging out the user ID that's been perpetuated along with that with that interaction okay let's try this out there you go look at that so now when I make the request user ID for this pipeline stage for a 1 B 2 and C 3 is exactly the same thing now keep in mind this one these are both on thread 10 these are both and you know different threads so you can see we have the fact that these things are being moved across different roads this is 9 that's a 1 and here's the same thing thread 9 so that's here's thread 10 for the B 2 C 3 is on beat is on 10 as well okay pretty straightforward right I mean you can kind of see what's happening here we just use the subscriber context to create a context object which acts as basically a dictionary and you can see that of there's a bunch of them here you know that you can that you can work with or you could provide a transforming function that takes the current context and you can use that you could a new context that has the existing values from the existing context and that also has new values that when our insert in there okay so that's the the theoretical and this is used to great effect in spring for example in in spring security right so if you go to spring security here we're gonna look for the the reactor context web filter reactor context web filters security okay and that's there you go and this code for example is in spring security in the integration for reactive code and this is actually a reactive web filter right this is a spring what web flex filter it's not a servlet filter - web flex filter and it in turn has a method called filter which takes a curtain HTTP request and chain and we configure the filter we say chained-up filter exchange propagating you know allowing things downstream to contribute to the the pipeline three active pipeline and then finally at the very tail end of that we add I subscribe a context taking the current context checking to see if it has a key if it does then we turn it otherwise we create a new context or actually which alter the context by adding the security context to it so they react so the spring security security context is a thing that holds the currently authenticated user so when you make a HTTP post or a form based login that information gets passed to some sort of authentication manager which then either confirms the you know it confirms that this is the person or client or whatever that is claiming it you know that it is claiming to be and if that's the case it then stashes that resulting authentication object somewhere well that's the security context holder and in turn that's the context that's made available through this filter at the very beginning of all web requests in spring web flex so now anywhere in your code in spring security you can inject the current authenticated principal because spring security can look at the current pipeline that's been modified by this reactive web filter and look at the context and extract out this object of type of security context our class that's the other thing I'm using a key here but imagine I wanted to store some more interesting kind of data I could say stat you know class user ID all right and I can do something like this I'm string UID well certainly you know I could do that and that would be here I can say instead of using a key I can use the class type right I can stash that and have that perpetuated along with the the pipeline as well obviously it'd be a smaller the data the better but the point is you can do both okay all right so that's a very simple abstract idea and I hope with spring security you can see the possible applications now let's be a little bit more concrete here I'm gonna take an example taken directly not even like I'm not even gonna bother writing this I don't even have to write this you know myself I didn't even have to imagine this there's a great example by reactor team legend the reactor reactive ninja extraordinaire seaman Bosley that is from C Mon Bosley contextual logging let's see if I can find this this amazing book okay so this blog from earlier this year looks at using the MDC looking at the reactive context object and using it as a way to perpetuate the MDC there the context that's made available for logging so when you log out stuff in your logs you have information keys and values that can be associated with that log that has traditionally historically been based on that thread local and so in this blog Simone looks at how to to you know rework things for ever so slightly to take advantage of the new context object in reactor so I'm just gonna show you that that's actually what we're gonna see next it's a very simple example and it demonstrates things beautifully beautifully so I I'm rather than do something completely different which would fail to meet the standard that he is said I'm just gonna use that so we're gonna create a very simple MDC application and this application it's going to be a Springwood application it's going to be I'm gonna have a logger and it'll be a rescue core and what we're gonna do is we're going to demonstrate that we can respond to an HTTP request and in providing that response we can associate information with the request that gets visible that's then made available to the logging context name the logging master data context okay so what I'm gonna do now is create a service just as he did a restaurant service and the service come on the service will provide a answer to the question of which restaurants come he gives it visit by max price all right there we go and we'll just use a double and price all right and when I'm a detail here class restaurant double price for person private string name all right this would be public static void main wouldn't get very far without that I expect all right spring application spring application got run MDC application dot class ARDS very good so there we go there's that right now what are we gonna do we provide a application that has constructors all right and we're going to return the data when we turn the data here and the data we're just gonna have some hard coded data I'm gonna just hard quote some data because I don't actually have a database connected to this though of course it's all reactive so we could talk to our to UBC we could use a reactive MongoDB you could use reactive Couchbase or Cassandra or Redis anything would work just fine but I am just gonna synthesize some data for our demo in the constructor here and in order to do that I am going to create a collection of data a hard coded response okay private final collection okay restaurant restaurants equals you concurrent skip this set all righty and that has to be comparable so we'll set new compare tour I'm gonna say oh one get price the person I'm going to do double one to the turn one compared to two alright and we'll create that collection and in the collection we're just gonna have some data I'm just gonna write some beta the old-fashioned way by generating it's all say I'm in stream dot I guess I just want to build a range let's just build a range zero to a thousand two objects I I integer to string restaurant I okay so that of my string Oh okay new restaurant and it'll be a double so we'll say new random dot next double I don't dot 100 okay and then the name will be string and then for each one of those I guess we can just we want to set that as a we're gonna add that to the collection there so for each this dot restaurant collection and right okay seems very straightforward okay so we're just gonna start up the application in the constructor we're gonna write a bunch of records to that collection which is gonna be comparable it's optimized for reading that for writing that's why it's it's no it's probably not a great fit but whatever it's a demo okay very simple demo and this is no longer true good restaurants okay so there's our data now here's our publisher here's our reactive endpoint and we're gonna say assuming that we find a record when say get the data this dot this dot restaurants dot hello stream so if I suppose filter when you say restaurant and then that will be when I say restaurant doctor kit price per person less than or equal to max price right so that'll be a stream of restaurant that matches our predicate here okay simple and from that we can get a publisher so flex dot from stream is okay so that's the that's the basic demo okay that's pretty simple right we're gonna create a little rest controller here just in case to have been that data so user ID you know crisis whatever restaurants restaurants let's just make this easy okay so I'm gonna assume a price as a category as a as a as a discriminator here but I want the user ID information to be visible in the life of the of the request so I'm going to say that this is the path variable and you can use request parameters as well it doesn't matter here and the double will be path variable double price alright so we want to inject the restaurant service like so and then already they turned this docked restaurant service dot get by max price price and I want that context when I log out the information in this publisher in this reactive publisher what I want to do is I want to log out information about the search then I want to log out the results of the search and then I want to log out the current you know the current application ID so let's now transform our little publisher here okay so private static lot specific to that okay teeth consumer key log on next and this will be a consumer long statement okay so there's my I am my handler and what I'm gonna do is I'm gonna change this I'm gonna change you know avoid adapt the results okay so I want to take the flux of restaurants in and we'll do work with it so I want to say adapt you could do this with a filter certainly results and I will return a flux of restaurant as before and here's we're gonna do all the work of sort of changing this result so I'm gonna say first return mono dot just strained format so I want to log out the beginning of the pipeline finding the restaurants and I guess we can actually um passing the price UID and the price okay string double price having price lower then okay so the dollar sign for that user ID okay and then when you string that format so UID that's right price UID and from there I'm going to say do on each and I'm gonna log out the results that come back so log on next and this will just be on in the consumer that I pass into this method all I'm gonna do is log and with that I'll then say I want to get the results so restaurant service start get my max price and actually we already have that don't be so I don't need to redo the search we just pass that along okay and then from there I want to log some more data so I'm saying here remember the restaurant data will be what we get back at this point so here I'm going to say log that info found restaurant for okay and then we are that get name or got to get price per person and then finally I'm going to set the subscriber context was that context of API ID and it'll be the UID expose that's what I'm that's what I really want to encode there okay so what's wrong with this now I'll do on each okay what is the problem we are going to ah don't want this to be a signal of T okay so that's gonna be a restaurant perfect alright so we've got that the restaurant was a little redundant and I'm fighting a problem that was real glad we fixed that now what I'm gonna do is I'm gonna say in this method this is where the actual work gets done so found restaurant okay it's a nice Dante an Italian with an eye so forgive me if the spelling is all yours all right so now we have our simple jobs given a signal we're gonna return a when I say if signal is on next or if it's not rather then just return okay but then we want to get the data out of the current context so I'm gonna say a PID like so I'm going to ask for the current context get or empty and the empty when you look for the API ID that's a UID I suppose is what we calling it and I'm gonna say okay if that's not no we're gonna process if it's there when you use it if it's not we're gonna process and I I made a mistake I generated this project using Java 1.8 but use 1111 actually it's a better version anyway and there's some nice API is here that make what I want to do here just a little bit easier okay okay so what I want to do is I want to say if that idea is there I'm going to say give me it or an optional that's empty I'm gonna say okay API ID dot if present or else so if it's there use it otherwise run the up runnable that doesn't exist in java 8 but it's there in job and 9 or 11 in this case so what I'm gonna do is I'm going to say if it's there there's a consumer of string yep present all right and the present you know like it's there I'll say let's just say UID otherwise run a bull run now or else okay not bad huh so and that runnable will be very simple we just say I'm gonna log the data out let's say log statement doc accept and I'm gonna call signal time to get some logging I'm remember this method takes a consumer and the consumer I'm passing in the value that's that's been emitted and I can do with it whatever I want if it's there however I want to take the UID and attach it to the current MDC and I can do this conveniently using the auto closeable feature here so there's an MDC closeable so I can say try closable equals MDC that put closable so it'll be alive for the scope of this demo of this interaction but no longer right that's convenient so while you know inside there let me put this up here dot run okay so it's a little confusing I grant you if credit or else so let's review here what's happening in this consumer which gets run on every each incantation I guess we could have called this on each but actually we only care about the on next events that's what we're doing is we're filtering those out if we're running on an on next event or method or whatever then we look at the current context and we ask for a that's actually just call this UID just be consistent and in fact I could have extracted all this out and I could do I could certainly have done a better job here a private final static string and would use the same key everywhere whether it's in the context or in my MDC scope or anything else okay so that'll make things a little bit simpler and I'm gonna rename this to UID optional there we go okay so now we can see I'm gonna have some sort of UID that's that's been packaged along the scope in packages along the life of this publisher and that UID I'll access here inside of this on next call back if the our next method gets called then I'll I'll check and see if it's on next if it is on next then I will run this block of code attaching the data from the current context that has the UID adding it to the current MDC context for the logger under the same key doesn't have to be the same key could be anything you want but I'm gonna attach it that way and of course one time once it's attached then I'll run the actual thing that does the work that we want to do which is the callback here that the consumer that the client code will pass to us so I'm just calling logs I'm basically invoking the function that somebody has given me I'm calling I'm gonna invoke it with the value that's been you know that's been provided for me in the current signal in the current generation of the current value that's been emitted okay so either it's present either the UID is present in the reactor context in which case I insert it into the logging MDC and then I run the consumer or it's not present in which case I just run the consumer but either way this has to get run it's just a matter of whether we set up anything inside of the Emma DMDC before we run it okay so that's the basic thing that sets up the reactor context that's a found to expecting one what did I miss oh all right is that right looks right there that should work okay that's better so now the only thing that remains of course is to set up the logging pattern so that we can actually see how this gets logged out and that's pretty easy to do in spring but-- so we're just gonna customize a logging pattern here pattern console equals and we want to say pound magenta thread cyan X UID okay highlight five-level logger 36 dot M message and and there we go that's our our custom logger that should kick in so you can see up until now I've gotten this sort of default spring boot logging pattern which is fine works just fine but we want to see that current app ID so let's do that are the current UI e oops ports already running applications already running on the same port rather so let's stop it and restart just the MDC application okay it's up and running and now I want to go visit this endpoint when I say I'm one too and so gonna be restaurants and the price will be so we don't know what price we have to it you should have logged out those prices well I suppose statistically something is gonna be less than that okay so JQ dot name it looks about right let's see statistically let's just see that smaller yeah okay so it's diminishing if I choose something you know statistically there's only 1% of the results is gonna be less than one right so so that's uh that's about fine okay um less than or equal to whether and so when I make these requests now let's see now I want to log out the information don't I so what do we have here log statement log info log info found restaurant get name here's a thread here's the app ID right versus 1212 we have 12 restaurants so now if I changes to be 15 you can see it says 15 so it's actually is so the request with a thread it's very hard to sort of prove this otherwise I mean let's see curl there should be a little pull post hey you know 20 restaurants 101 okay so there's two different sets of data I've actually just got two sets of results there but now we should see 20 and 15 and so on okay so this is a very simple example but it does demonstrate how useful this API is it's nice that the MDC API here in SL 4j is so convenient you can just it's an abstraction so we can easily in a generic way in certain things in the context it's nice that we have this this API you can use it in other ways if you want there's I'm sure you can think of things that you would like to perpetuate along the lifecycle of a given reactive pipeline this is how you do that you just add things to the to the context to the subscriber context and then you can pull those things out in either your your handlers or anywhere else in the pipeline you could stash it somewhere you could do all sorts of interesting things so my friend with that thank you so much for watching and thank you again to seaman Bosley who's got that great example it was bland and for the blog which you should all definitely go read remember this mechanism it underpins a lot of what we do transaction demarcation in and security context propagation all these kinds of things that you can imagine wanting to do in the old in the old world using thread locals you can now do in reactor and I think this is one of those things that sets reactor ahead of some of the other technologies in the same sort of category the same pack because this is actually one of the things that a lot of them don't have so I think it's a pretty interesting place to be alright with that with that my friend like I said thank you so much for watching and with that we'll see you next time you
Info
Channel: SpringDeveloper
Views: 9,508
Rating: undefined out of 5
Keywords: Web Development (Interest), spring, pivotal, Web Application (Industry) Web Application Framework (Software Genre), Java (Programming Language), Spring Framework, Software Developer (Project Role), Java (Software), Weblogic, IBM WebSphere Application Server (Software), IBM WebSphere (Software), WildFly (Software), JBoss (Venture Funded Company), cloud foundry, spring boot, spring cloud
Id: 5tlZddM5Jo0
Channel Id: undefined
Length: 45min 35sec (2735 seconds)
Published: Tue Jan 29 2019
Related Videos
Note
Please note that this website is currently a work in progress! Lots of interesting data and statistics to come.