High Performance Batch Processing

Captions
Thanks, everybody, for coming. I know this was a slot in the conference where it was hard to pick and choose, and we appreciate you coming to this one. My name is Michael Minella — you already know about me, so you can skip over this — and I'll let my co-presenter introduce himself. Hi, my name is Mahmoud Ben Hassine. I work for Pivotal as a member of the Spring Batch team, so I have the privilege of working closely with Michael on Spring Batch on a daily basis. I also run a couple of open source projects that you can find at jeasy.org, and you can find me on GitHub and Twitter, so please feel free to reach out. This is my first talk at SpringOne Platform, so I'm very happy and excited to be part of this great event. Cool. A little shameless self-promotion: I'm on a podcast called Off-Heap, at javaoffheap.com. It's a pundit podcast, so if you want to hear a bunch of people hanging out in a bar talking about what's going on in the JVM — Oracle versus Google, what's going on with Linus Torvalds, that kind of stuff — check us out, like I said, at javaoffheap.com. I also have a book on batch coming out later this year from Apress; it'll be The Definitive Guide to Spring Batch. It's basically my previous book updated for all the Spring Boot stuff, so check that out later this year. All the slides and demos are out on GitHub, so everything you see here today you'll be able to download, play with, etc. I'll wait for the pictures on this one — cameras, cameras — five, four, three, two, one. All right. Please ask questions when you have them; I don't want you to have to wait until the end and forget, and frankly I'd rather make sure you get what you need out of this session than us talking at you for the next 70 minutes. Quick lay of the land: I'm assuming, since this is SpringOne, everybody's familiar with Spring — is that the case? Yes, good. How many people have used Spring Batch? Awesome, cool. So, just like before, this is going to be the most exciting talk you've heard all week long, because we're going to talk all about batch processing. While it has that stigma of being boring, I personally find it one of the more interesting areas of computer science. Not many people build a website that handles the volumes that most batch processes handle every day; there are some really interesting challenges within batch processing that you just don't get in most other areas, and we're going to look at ways of handling some of those today. The concept goes all the way back to literally the beginning of computing. Batch processing is really about the efficient use of resources. You hear people talk about streaming data, reactive, real-time processing — those are great for speed and low latency, but if you're looking to use your compute in the most efficient way possible, there's no better way than batch processing. Today we're going to talk about how to scale Spring Batch. The example I showed in the keynote a few minutes ago was a single-threaded job doing one thing: importing one file, which wasn't very big, into a single table. Not very complex. Let's not kid ourselves — that's not what most of you see in your enterprises. You see much more complex use cases, with a lot more going on and much larger volumes of data. So today we're going to walk through the evolution of a Spring Batch application and talk about how you can enhance it with the Spring Batch features that allow you to scale.
There are five different ways that we'll talk about. Before we get there, though: about half the hands went up when I asked who is familiar with Spring Batch, so I'm going to do a quick 101 of Spring Batch lingo so that everybody is comfortable with what I'm talking about. A funny fact about this slide: when we generated this word cloud, the word "skip" was somehow skipped, so I wonder what kind of AI is behind the scenes that skips the word "skip". Anyway. A job — you've heard me talk about jobs — is the flow that you're going to execute within batch processing. It may be multiple steps, multiple things that need to happen, but when you kick off a job the expectation is that there's no additional interaction or interruption until it completes. Jobs are made up of steps, so a job can have one or many steps. The one in my keynote a minute ago only had one step, and the ones we'll be looking at today also only have one step, but you can have multiple steps that transition one to the next. They can be sequential, like I just mentioned, or you can have steps that run in parallel, which we'll look at today, and you can have all kinds of different configurations — conditional steps as well, where a step may or may not run based on some type of logic. Within a step, most Spring Batch step types are a tasklet type. A Tasklet is essentially an interface that we provide within Spring Batch; it has a single method on it called execute, and we execute that within the scope of a transaction. The idea is that if you implement the interface yourself, we'll wrap whatever you do in that transaction, and then you get to choose: do I run this again in a loop, or do I run it once and move on to the next step? We also provide a couple of custom tasklets; the main one we'll be talking about today is the chunk-oriented tasklet. That's where you get chunk processing, which leads me to chunks. A chunk is what it sounds like: a chunk of data. If you want to process, say, a million records, you probably don't want to process all of those in a single transaction; you want to divide them up into smaller transactions and commit along the way, also maintaining state so that if something fails you can go back to the last known good state instead of restarting from day one. Chunking allows you to do that. The chunk-oriented tasklet executes a chunk of the processing in a single transaction, and it will keep doing that until it runs out of data, and then the step will complete. A chunk is made up of items. We define the chunk size either by hard coding or dynamically providing a fixed number of items, or you can implement a strategy that will allow you to dynamically determine the chunk size. This is a key point when you're designing your batch applications: what is an item? If you think of a bank that has customers that have transactions, is the item the transaction, or is it the customer with a collection of transactions? The art around that is probably one of the harder things you have to work out when you're designing your batch jobs.
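For reference, the Tasklet contract just described is a single method. Here is a minimal sketch (the class name and the cleanup scenario are invented for illustration, not taken from the talk):

```java
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

public class ArchiveCleanupTasklet implements Tasklet {

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // whatever happens here runs inside the transaction Spring Batch opens for this tasklet,
        // e.g. truncating a staging table or moving a processed file out of the way
        return RepeatStatus.FINISHED;   // return RepeatStatus.CONTINUABLE to be called again in a loop
    }
}
```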
Within a batch application, this is what you might see: in step one you'll have an ItemReader, an optional ItemProcessor, and an ItemWriter. The ItemReader within the step is responsible for the input of the step. The ItemProcessor does whatever processing you need: it takes an item, does something, and returns an item. The ItemWriter is responsible for the output of the step. It's important to note that step one is going to start, process, and finish, then step two will start, process, and finish, then step three will start, process, and finish, each one of them going through that loop of reader, processor, writer over and over inside it. Does that make sense? So the question was whether there's a way to parallelize the pieces within a step — the reading and writing. We'll actually get to that in just a few minutes. There are five different methods for scaling Spring Batch applications. Multi-threaded steps, which is actually what you're asking for — that's where we execute each chunk in its own thread. Parallel steps — we run two steps, or multiple steps, in parallel. The AsyncItemProcessor and AsyncItemWriter — two components used in conjunction with each other to execute the processing piece of a step in parallel, with the reader and writer essentially staying synchronous. There's partitioning, which, if you're familiar with how that works in, say, Kafka, is pretty much the same concept. And then finally remote chunking. We're going to walk through all of these in much more detail over the rest of the time. The use case we're going to be doing is really complicated: we've got some transaction files to import, and we're going to run a batch job that imports them into a database. Really complicated. The idea here is that I don't want you to focus on the use case or the domain model; I want you to be able to focus on the Spring Batch features themselves. I'll start off with multi-threaded steps. With a multi-threaded step, we provide a TaskExecutor that launches additional threads for the step, and it will launch each chunk in its own thread. When I mentioned earlier how the chunk-oriented tasklet essentially wraps this execution — the reader, processor, writer, all of that is wrapped within that chunk-oriented tasklet — each call to that tasklet is made in its own thread, so that allows you to execute chunks in parallel. Now, there are some caveats. Parallel processing typically means you don't get restartability — or you may, depending on how you write the writer, but most of our out-of-the-box components do not support restartability in this particular configuration. There are other ways to handle this that we'll get to later, but this is by far the easiest and fastest way to scale Spring Batch up. Let's take a look at an example. So the question was basically how the chunks are divided when you're using this multi-threaded model. Files are actually a really bad example, because files aren't a data model that works very well with this; a database is a little more accurate. It's not the way you'd think, though. The example you gave was that records 1 through 100 would be chunk one, 101 to 200 would be chunk two, and those would each be in their own thread. That is not the case with multi-threaded steps — we'll get to a way to do that specific thing — but with multi-threaded steps the reader reads whatever is next available. If I have, say, 200 records and two threads going, chunk one may be records 1, 5, 7, 8, 9, 15, 20, and the other chunk will pick up the others. There's no ordering guaranteed here. So the question was how the job repository works with multi-threaded steps.
In most cases, our documentation tells you to turn off the save-state piece, and for a reason: most of our readers are not thread-safe — well, I shouldn't say most of them; we specifically call out which ones to turn save state off on. Things like cursors aren't even thread-safe in the first place, and a lot of the way we keep track of the state of what has been processed is by things like item counts, so if you have multiple threads going at the same time, that count may not be accurate. So you turn off save state, the job repository works fine, but you lose restartability. We'll get to more restartable options in other examples. All right, so the application we are going to scale is a Spring Boot application, and as Michael described, it will be a Spring Batch job. Can everybody read down the back? Is this OK? So the input file is this one. It's pretty big, so I'm going to show you just some records. We have some transaction data: the account, the amount, and the timestamp. Basically we will read data from this input file and write it into a relational database, into this transaction table. Here I have MySQL running locally on my laptop. We enable batch processing with this annotation, so we get a handle on the JobBuilderFactory and StepBuilderFactory to create jobs and steps. The job definition is this one: a single step, step one. This step is a chunk-oriented step; we will read 100 items at a time in a single transaction, and we have a reader and a writer. The reader is a FlatFileItemReader, since we are reading data from a flat file. We give it a name and a resource, which is the input file passed as a job parameter. It's a delimited file — by default the delimiter is a comma — so we specify the fields we want to map to the domain object Transaction (the account, amount, and timestamp) and how to map those fields onto the domain object, basically the format of the data and so on. That's it for reading. For writing we use a JdbcBatchItemWriter: we provide the data source, which is configured automatically by Spring Boot according to the properties I have in my properties file (it points to localhost), the SQL statement — in this case an insert into the transaction table for these fields — and we basically say get those fields from this Transaction POJO. So the question is, can you provide the SQL in a file? Yes — you'd use regular Spring Boot property injection, so anything you can do with regular property injection in the Spring Framework you can do here. OK, so that's it for writing, and that's it for the step. By default this step will run in a single thread. In order to make it a multi-threaded step, all we need to do is add a task executor here. TaskExecutor is an interface from the Spring Framework, and we can use whatever implementation we want; for instance, here I'm going to use a ThreadPoolTaskExecutor, to reuse threads, with four threads, and that's it. Let's run the app. The input file contains 60,000 records, so I'm expecting 60,000 records in the transaction table. It's done — the job is finished — so let's take a look at how many records we have: 60,000. That's it for multi-threaded steps. Does that make sense?
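A condensed sketch of the configuration just walked through might look roughly like this (the bean names, the Transaction POJO with account/amount/timestamp properties that map straight from the file, and the table layout are assumptions for illustration; the one line that makes the step multi-threaded is the taskExecutor() call):

```java
import javax.sql.DataSource;

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableBatchProcessing
public class MultiThreadedJobConfiguration {

    @Bean
    @StepScope
    public FlatFileItemReader<Transaction> reader(
            @Value("#{jobParameters['inputFile']}") Resource inputFile) {
        return new FlatFileItemReaderBuilder<Transaction>()
                .name("transactionReader")
                .resource(inputFile)
                .saveState(false)   // reader state tracking is not reliable across threads
                .delimited()
                .names(new String[] {"account", "amount", "timestamp"})
                .targetType(Transaction.class)
                .build();
    }

    @Bean
    public JdbcBatchItemWriter<Transaction> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Transaction>()
                .dataSource(dataSource)
                .sql("INSERT INTO TRANSACTION (ACCOUNT, AMOUNT, TIMESTAMP) "
                        + "VALUES (:account, :amount, :timestamp)")
                .beanMapped()
                .build();
    }

    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory,
                      FlatFileItemReader<Transaction> reader,
                      JdbcBatchItemWriter<Transaction> writer) {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(4);    // four threads, each running whole chunks
        taskExecutor.afterPropertiesSet();

        return stepBuilderFactory.get("step1")
                .<Transaction, Transaction>chunk(100)   // 100 items per transaction
                .reader(reader)
                .writer(writer)
                .taskExecutor(taskExecutor)             // remove this line and the step is single-threaded again
                .build();
    }
}
```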
All right, the second option for scaling Spring Batch applications is parallel steps. Here you have two typically single-threaded steps — it doesn't have to be — that are completely unrelated, but both have to be done before something else. Let's say I've got a store import and a customer import; there's no referential integrity between them, I just need to get both files imported into my database before I run some type of aggregation later on down the line. I can run those in parallel instead of sequentially and speed things up. Here's an example of what it would look like: you've got step one, and steps two and three will execute in parallel. It should be noted that by default Spring Batch will not execute them in parallel; you have to provide a TaskExecutor that will do this for you. By default we use the synchronous task executor, really because there's no other good default — otherwise you'd just be randomly saying "four threads sounds good," which isn't a good default. So you provide a TaskExecutor that will execute these in threads, and then all sides of the split need to execute and finish before the job continues. If, instead of one step on this side, I had two steps that ran in sequence, and one step on the other side, both of the ones on this side and the one on the other side would have to complete before the step after them continues. Does that make sense? All right. In order to show you how parallel steps work in practice, let's clear the database first. In this case we are going to read data from two different files: one is the CSV file we saw earlier, and the other is an XML file. The XML file looks like this — the same transaction data, but in XML format, so we have a tag per transaction with the account and amount. We are going to create two different steps, step one and step two, and we want those to run in parallel. Step one is the same as we saw earlier, same reader and writer; the reader reads data from the input flat file. Step two is exactly the same except for the reader, which is a StaxEventItemReader for reading the XML data. We give it a name and a resource, which is the input XML file passed as a job parameter — here we pass both files as parameters to the job — and we need to specify the root element tag name. On our domain object we have @XmlRootElement; we are going to use JAXB to unmarshal the transaction data into this object, and we specify the unmarshaller here. That's it. Now, if we wanted to run these two steps in sequence, we would write something like this: we create a job, we start with step one, and then we run step two, and those will run in sequence. But in this case we want a parallel flow, so we will create one using the flow API from Spring Batch. This parallel flow will start with step one, then we split the flow using a task executor so each step runs in parallel on a different thread, and when we split the flow we add the second flow, which will execute step two. So: start with step one, split the flow, step two, and we run them in parallel. We build the flow, and the job will of course run the parallel flow. This is how step one and step two run in parallel in two different threads, thanks to the task executor here. All right, let's run the app; we're going to clean up the database first. As you can see here, we have step one and step two running in parallel in two different threads.
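A minimal sketch of that split configuration using the flow API (step1() reads the CSV and step2() the XML, as just described; the flow and job names are made up, and the two step beans are assumed to be defined elsewhere in the same configuration):

```java
import org.springframework.batch.core.Job;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Bean
public Job parallelStepsJob(JobBuilderFactory jobBuilderFactory) {
    // wrap each step in a flow; the split runs the flows on separate threads
    Flow csvFlow = new FlowBuilder<SimpleFlow>("csvFlow").start(step1()).build();
    Flow xmlFlow = new FlowBuilder<SimpleFlow>("xmlFlow").start(step2()).build();

    Flow splitFlow = new FlowBuilder<SimpleFlow>("splitFlow")
            .split(new SimpleAsyncTaskExecutor())   // without a TaskExecutor the flows run sequentially
            .add(csvFlow, xmlFlow)
            .build();

    return jobBuilderFactory.get("parallelStepsJob")
            .start(splitFlow)
            .build()    // builds the FlowJobBuilder
            .build();   // builds the Job
}
```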
OK, the CSV input file is the same as we saw before; it contains 60,000 records. For the XML file, let's quickly check how many records are in there — I'll grep and count the transaction tag in the big transaction XML file — and we have 110,000 tags. So I'm expecting a total of 170,000 records in the transaction table when the job is finished. It should finish shortly... here we go, the job is completed. Cool, let's check the database: we have 170,000 records in there. That's how we run two steps in parallel within a job. So the question was whether it's important to write thread-safe types when writing your jobs. When you say thread-safe types, do you mean the item itself or the other components that Batch uses? The job components themselves — it depends what you want to use them for. Thread safety is a good thing to aspire to, but quite frankly not all of the readers in Spring Batch are thread-safe: the JDBC cursor one isn't, but that's because a DataSource in Java isn't thread-safe, and there's nothing we can do about that. So it really depends on your use case. Obviously do the best you can and use those components in the right manner. There are ways to use threading within Spring Batch, and there are ways to do parallelization without using threads, so you can take advantage of those parallel mechanisms without having to get into those issues. So the question was how Spring Batch knows where in the file it is — is that fair? The readers are stateful, so the reader opens the file when the step starts and then reads from a single input stream; all the threads are hitting that same input stream in the file case. In a database situation you'd be hitting the same result set — there are some nuances to that, but it's really dependent on the implementation. Other questions? Excellent question: the question was whether there's a way to parallelize reading from the same file. We've looked into it quite a bit. There was actually a pull request opened about three years ago where somebody tried doing that with offsets. We never accepted that pull request because our fear was that a lot of people see "more threads equals faster" and go with it, and all of our benchmarks said it was a wash — sometimes it was faster, sometimes it wasn't, and it was very hardware-specific. We'd love it if somebody were interested in contributing back and working with us on that idea again — we'd be open to it — but based on the hardware we had access to and were able to try it out with, I couldn't get a definitive "these are the checkboxes you need to check in order to make that faster." And I just saw Stack Overflow blowing up. So the question was basically how Spark is able to do it and why Batch can't. Spark has a much more dedicated infrastructure than Batch does. Batch runs on commodity hardware; with Spark you set up a special cluster for it, and it handles a lot more things at a much lower level than Spring Batch does. Spring Batch goes through more regular Java APIs for a lot of this; Spark doesn't. I'll leave it at that. There are other ways to gain performance similar to Spark from a reading perspective with Spring Batch, just not "here's one big massive file, do it that way."
When you think of how most Spark jobs run, they typically run on HDFS, right? Well, that by nature is a distributed file system; that's not what we're talking about here. If you throw Spring Batch with a partitioned job against a distributed file system, you can see read numbers that are in the same ballpark. Does that make sense? And again, we'd be interested — like I said, I'm looking for that list of checkboxes to identify what that hardware is, because just doing the same thing on a laptop, for example, is actually slower. So the question was: if you've got a file of, say, a few gigs, do you pull the whole thing into your heap space? With this chunking model, the only things kept in the heap are the items currently within that chunk. If I've got a file that's 10 gigs large and my chunk size is a hundred records, and those hundred records are, say, 100K, I'll only have 100K sitting in the heap at a time. It's an input stream, so it only pulls data off as it needs it — Java doesn't load everything into memory when you open an input stream, otherwise no Java app would work. So you will have at most a chunk's worth of items in your heap, not the whole file. That's the idea of chunk processing. Cool, let's move on. AsyncItemProcessor and AsyncItemWriter. Most batch processes are I/O bound: the bottleneck in batch processing is typically how fast you can get stuff off disk, how fast you can get stuff across a network to a database — those kinds of things are typically the bottleneck in a batch process. However, that isn't always the case, so this offers a way to parallelize just the processing piece. What happens is you take your normal ItemProcessor — let's say, using the uppercase example from riff, I want to uppercase a string for each item that goes through. Your custom ItemProcessor is decorated with the AsyncItemProcessor, which launches that item processor in a separate thread — for each item it will launch a separate thread, and obviously you can use thread pools and that kind of thing — just for the processing piece. The item processor, instead of returning the result of the transformation, returns a Future. The Future is then passed to the item writer, and the item writer calls Future.get() for each one, getting the values out and writing them as they return. Does that make sense? It's important to note that you do have to use the AsyncItemProcessor and AsyncItemWriter together, because otherwise you'd have to unwrap your own futures in the writer piece. So let's see it in action. In this demo we are going to reuse the same file reader and writer, and we will add an item processor to simulate some processing on each transaction. Let's imagine the processing takes five milliseconds; we're going to simulate that with Thread.sleep(5). By default the item processor runs in the main thread, and what we want to do is offload this processing to a separate thread. In order to do that, we wrap our processor in an AsyncItemProcessor. This AsyncItemProcessor has our regular processor as a delegate, and we need to set a task executor in order to execute the processing in a separate thread. As Michael described, this AsyncItemProcessor will return a Future of the processing result
coming from our regular processor. Now this Future needs to be unwrapped to get the actual processing result and then written using the item writer, and that is the role of the AsyncItemWriter. The AsyncItemWriter has our regular writer as a delegate; it unwraps the Future and then passes the result to our regular writer to write it to the database. As Michael said, it's important that these two components be used together in order to do the processing and writing asynchronously. Now that we have defined these two, we can use them in the step: the processor is the AsyncItemProcessor and the writer is the AsyncItemWriter, in place of the regular ones. The input file is the same, so let's run the app. Five milliseconds times 60,000 records — how long is that? If you do the math, that should take about five minutes sequentially, without the AsyncItemProcessor and writer. The job is running — I can hear the fans yelling — it should finish shortly. So the question was whether this is a new feature. It's not new, but it was kind of hidden for many years. There was a separate project called Spring Batch Admin for a while; we've since sunsetted that in favor of Spring Cloud Data Flow, but this and another component we're going to show in a bit lived in that project. At the time these were created, that project was iterating faster, so we kind of dumped them over there to get them out faster; it wasn't until Spring Batch 3 that we brought that stuff over. So the question was whether it's still processing chunks — yes, the chunk concept still applies. We're still processing one transaction of a hundred records; the only thing we're parallelizing is the transformation piece, that processor piece. That's the only thing we're changing here. So how fast did it run? So the question was whether this works with the FlatFileItemWriter — yes, because the item writer in this case is not impacted in any way. So the question was whether there's an ideal range for a chunk size. It's very use-case specific: how big a box you're running on, how much heap space you have, how big your items are. I'd love to say X, but... And the question was whether there's an empirical way of determining what that chunk size should be — benchmarking; we'll get to that in just one sec. So, as we said, instead of five minutes the job took 35 seconds, which is a huge difference between synchronous processing and asynchronous processing. We can run the five-minute version if you want. There's a question in the back. So the question was whether there are any considerations or concerns about transaction boundaries with these async processes. All of these async pieces are still within the boundary of a transaction; nothing spans the boundary of a single transaction. A chunk is what defines the transaction boundaries, and everything we've shown so far is within the scope of one chunk. Sorry, somebody was coughing and I missed that — does it tie back to the overall transaction boundary of the job itself? The job itself doesn't have a transaction boundary; there is no global transaction for an entire job. If you want to roll back something — let's say a job failed and you need to roll back all of the work, all of the chunks within a job — you need to implement compensating controls for that, as opposed to just rolling back a global transaction. Other questions on that?
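A minimal sketch of that wiring (AsyncItemProcessor and AsyncItemWriter live in the spring-batch-integration module; the reader and JDBC writer beans are assumed to look like the earlier ones, and the 5 ms sleep stands in for real work):

```java
import java.util.concurrent.Future;

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Bean
public ItemProcessor<Transaction, Transaction> processor() {
    return transaction -> {
        Thread.sleep(5);    // simulate 5 ms of processing per item, as in the demo
        return transaction;
    };
}

@Bean
public AsyncItemProcessor<Transaction, Transaction> asyncItemProcessor() {
    AsyncItemProcessor<Transaction, Transaction> asyncProcessor = new AsyncItemProcessor<>();
    asyncProcessor.setDelegate(processor());                       // the "slow" processor above
    asyncProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor()); // processing happens off the main thread
    return asyncProcessor;
}

@Bean
public AsyncItemWriter<Transaction> asyncItemWriter(JdbcBatchItemWriter<Transaction> writer) {
    AsyncItemWriter<Transaction> asyncWriter = new AsyncItemWriter<>();
    asyncWriter.setDelegate(writer);    // unwraps each Future, then delegates the real write
    return asyncWriter;
}

@Bean
public Step step1(StepBuilderFactory stepBuilderFactory,
                  FlatFileItemReader<Transaction> reader,
                  AsyncItemWriter<Transaction> asyncItemWriter) {
    return stepBuilderFactory.get("step1")
            // note the output type: the processor now hands Futures to the writer
            .<Transaction, Future<Transaction>>chunk(100)
            .reader(reader)
            .processor(asyncItemProcessor())
            .writer(asyncItemWriter)
            .build();
}
```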
That's been the easy stuff. Up to now, everything we've talked about is thread-based, single JVM. I think we're all aware that not everything fits nicely into a single JVM; there are workloads that just need more processing power, whether it's more I/O, more compute, etc. Partitioning is the first of the two options we have for this, working both within a single JVM and, with remote partitioning, across multiple JVMs. Here we have a master/worker configuration. The master has two key components: a partitioner and a partition handler. The partitioner is responsible for understanding your data and how to divide it up. If I've got, say, a directory of files, the partitioner may create a partition per file; if I'm looking at a database table, it may look at a key and create partitions based on those keys. The partition handler is responsible for communicating the metadata about each partition to the workers. If I'm doing a partition per file, it will say: you work on file number one, you work on file number two, you work on file number three, and so on. It's important to note that in this model, if we're using remote partitioning, we're only sending metadata over the wire. For the partitioner, we provide one out of the box; it's a file-based one — you throw a directory or an expression of resources at it and we create a partition per resource. It's a simple interface, so it's really easy to do just about anything else you'd need (see the sketch after this section). For the partition handler there are actually three options: two within Spring Batch and one within Spring Cloud Task. The two within Spring Batch are a thread-based one, the TaskExecutorPartitionHandler, and a Spring Integration based one, the MessageChannelPartitionHandler, which uses Spring Integration channels to communicate between the master and the workers. The third option, provided by Spring Cloud Task, is the DeployerPartitionHandler. Backing up: the thread-based one launches threads inside a single JVM — that's the normal case. With the message channel one, your workers have to be already running and waiting for the work before we send it, or you can use durable middleware so the messages sit there waiting when you bring the workers up, but the processing can't happen until those workers exist; you're responsible for orchestrating the workers. The third one, the DeployerPartitionHandler from Spring Cloud Task, will launch the workers dynamically for you on a platform. If you want to run this on Cloud Foundry, you can configure it to launch the workers as tasks at runtime. Say I've got five files, so I need five partitions and five workers: it will dynamically launch five workers, they'll run, and they'll shut back down, giving you that cloud elasticity we've been looking for. So the question was whether the master basically has to be running at all times. It has to be running the whole time the workers are running, because this is a single step — if you had steps before and after, it wouldn't need to be up for those — and it's not going to advance until all the workers are done anyway; the master is responsible for the aggregation of "did all my workers complete or not." On the DeployerPartitionHandler: you can also configure it with a high watermark. If you have, like the example I used in my keynote a few minutes ago, a thousand files dumped at it, you may not want to launch a thousand tasks all at once — somebody might get mad at you for that — so you can configure what the max is, and then it will work through and keep basically that same number of workers busy until it has gotten through all of them.
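The Partitioner contract itself is tiny. For the database-key flavor Michael mentioned, here is a sketch along the lines of the ColumnRangePartitioner from the Spring Batch samples (the table/column names and the minValue/maxValue context keys are whatever your worker step's reader expects; for the per-file case, the out-of-the-box MultiResourcePartitioner already does this for you):

```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcOperations;

public class ColumnRangePartitioner implements Partitioner {

    private final JdbcOperations jdbcTemplate;
    private final String table;
    private final String column;

    public ColumnRangePartitioner(JdbcOperations jdbcTemplate, String table, String column) {
        this.jdbcTemplate = jdbcTemplate;
        this.table = table;
        this.column = column;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        long min = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") FROM " + table, Long.class);
        long max = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") FROM " + table, Long.class);
        long targetSize = (max - min) / gridSize + 1;

        // only this metadata is handed to the partition handler (and, for remote
        // partitioning, sent over the wire); the workers' readers use it in their queries
        Map<String, ExecutionContext> partitions = new HashMap<>();
        long start = min;
        for (int i = 0; start <= max; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putLong("minValue", start);
            context.putLong("maxValue", Math.min(start + targetSize - 1, max));
            partitions.put("partition" + i, context);
            start += targetSize;
        }
        return partitions;
    }
}
```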
So the question was whether this can partition a monolithic file. The answer is no; you'd have to split it yourself. You can use command-line tools — literally the Unix split command is fast — and I've seen plenty of jobs do that: step one splits the file, step two partitions over the pieces, that kind of thing. One nice thing with partitioning is that restartability all works. Each one of the partitions — each one of the workers — is its own independent step, and state is managed for each one, so if any one of them fails and all the others finish, when you restart the job only the ones that failed will be reprocessed; the other ones will not be. So the question was how you test something like this — any specific piece, or in general? These workers at the bottom are no different than if we took that step, dropped it in the top spot, and just let it run as part of your job, so testing these is exactly the same as testing a step in a regular job. The orchestration piece — yes, there's a little extra work there; you'll see how Mahmoud bootstraps things. It would actually be interesting to open up a couple of issues to talk about ways we could maybe help out with that. And maybe after the talk I can show you: we'll be running multiple workers and the master step, and we can kill one of the workers and see how the master handles the situation — so if you want, after the talk I can show you how that works. Now we're going to demo remote partitioning. If you're familiar with Spring Integration or the Enterprise Integration Patterns, these icons should look familiar; if you're not, I'm sorry. Basically, the way this flow works is you've got the master and the worker — obviously one master, many workers. We start off with the partition handler, because remember, it is responsible for communicating to the workers what to process. Going across the top, it sends the request messages to an outbound adapter; the adapter is connected to Rabbit in our case, so Rabbit sends the messages over to the inbound adapter at the top of the worker, which goes into a channel, which goes to a service activator, which executes the remote step. The remote step is a standard Spring Batch step: it logs its state to the job repository, all those kinds of things. The remote step then says "hey, I'm done," the service activator sends a message back to the replies outbound adapter, over Rabbit, back to the inbound adapter on the master, where it sits in a staging queue. We then use an aggregator, basically waiting for all of the partitions to complete; once they all come through, it releases, and the partition handler says yes or no, it passed or failed. The return trip here is optional: since the workers maintain their state in the job repository, we also support polling the job repository instead, if you don't want the round trip back. So the question was whether you can customize the metadata coming back to the master. You can, but there are some constraints about how that's promoted; let's talk about that one offline.
So the question was what happens if the worker fails — let's say it doesn't send a response. Typically the aggregator will have a timeout, and you can even configure it — the property is something like "send partial results" — so it will send what it has along, and then the master will say: hey, I didn't get all my stuff back, this step is a failure. And if a step fails, the job is considered failed anyway, so when it starts back up it will start back up at that partition step — assuming that's how you configured it — and it will only re-run the partitions that failed, and so forth. All right, for this demo of remote partitioning we are going to read data from three different files, and for each file we are going to create a partition. As Michael described, we have a master/worker configuration, so I have one configuration class for the master and another one for the worker. The first thing to do is define the round trip between the master and the worker, as we saw on the diagram. On the master side we have two flows: an outbound flow going from the master to the workers, and an inbound flow from the workers to the master. Using Spring Integration this is pretty easy to implement: we have a channel called requests and an outbound adapter which takes messages from this requests channel and sends them to the requests queue on RabbitMQ. I have RabbitMQ running locally on my laptop, with two queues defined, requests and replies. So that's the outbound flow from the master to the workers. For the inbound flow coming from the workers, we have a channel called replies and an integration flow with an inbound adapter which takes messages from the RabbitMQ queue called replies and puts them on the replies channel. This abstracts away how we communicate with RabbitMQ — we could change it to JMS if we wanted; it's another level of abstraction. Now that we've defined the outbound and inbound flows on the master side, we need to define the partitioner. Here we are going to use the MultiResourcePartitioner. It takes a bunch of files — here we pass them as a job parameter, with a wildcard that matches all of these files — and it creates a partition per file, keyed by file. Now, in order to glue all these pieces together, what we added in Spring Batch 4.1 is a remote partitioning master step builder. This is available in the application context if we use a new annotation called @EnableBatchIntegration, which enables the integration piece in Spring Batch. This builder helps with building a master step in a remote partitioning setup. If you think about it, on the master side all I have to say is: here is how I partition the data, here is where I send the requests to the workers, and here is where I get the replies. So I specify my partitioner and the name of the worker step that will be executed on the worker side, an output channel from the master to the workers — it's an output channel, so it's requests — and an input channel on which we will receive the replies. That's the master step. Behind the scenes the builder creates the partition handler Michael described, the MessageChannelPartitionHandler; before Spring Batch 4.1 you had to configure that manually, but with this new builder it's created automatically. So it's quite declarative: I'm a master step, here is how I partition data, I want to send requests on this channel and get replies on that channel, and that's it.
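Condensed, the master side just described might look roughly like this (the channel and queue names follow the demo; the file pattern is hard-coded here instead of being passed as a job parameter; @EnableBatchIntegration is what exposes the RemotePartitioningMasterStepBuilderFactory, and the builder creates the MessageChannelPartitionHandler for you):

```java
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.partition.support.MultiResourcePartitioner;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.integration.config.annotation.EnableBatchIntegration;
import org.springframework.batch.integration.partition.RemotePartitioningMasterStepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.io.Resource;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

@Configuration
@Profile("master")
@EnableBatchIntegration
public class MasterConfiguration {

    @Autowired
    private RemotePartitioningMasterStepBuilderFactory masterStepBuilderFactory;

    @Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel replies() {
        return new DirectChannel();
    }

    // outbound flow: requests channel -> "requests" queue on RabbitMQ
    @Bean
    public IntegrationFlow outboundFlow(AmqpTemplate amqpTemplate) {
        return IntegrationFlows.from(requests())
                .handle(Amqp.outboundAdapter(amqpTemplate).routingKey("requests"))
                .get();
    }

    // inbound flow: "replies" queue on RabbitMQ -> replies channel
    @Bean
    public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "replies"))
                .channel(replies())
                .get();
    }

    @Bean
    public Partitioner partitioner(@Value("file:/data/transactions*.csv") Resource[] resources) {
        MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
        partitioner.setResources(resources);   // one partition per matched file
        partitioner.setKeyName("file");        // the key the worker's reader looks up
        return partitioner;
    }

    @Bean
    public Step masterStep(Partitioner partitioner) {
        return this.masterStepBuilderFactory.get("masterStep")
                .partitioner("workerStep", partitioner)   // name of the step run on the workers
                .outputChannel(requests())                // partition metadata goes out here
                .inputChannel(replies())                  // worker results come back here
                .build();
    }
}
```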
Now, on the worker side, things are pretty similar. We have to define an inbound flow coming from the master and an outbound flow going to the master. For the inbound flow, requests come in on a channel via an inbound adapter from the Rabbit requests queue; to send replies back to the master we have a replies channel and an outbound adapter to the replies queue on RabbitMQ. Those are the flows. On the worker side, as we said, we execute a step — it's a Spring Batch step — and we created another builder, this time called the remote partitioning worker step builder, which simplifies the creation of a worker step. On the worker side, what we need to specify is where I get the requests from the master and where I send the replies back, and then I define the step as usual: a chunk-oriented step with 100 items, with the reader, processor, and writer we saw earlier (a sketch follows below). For the demo I'm going to use two Spring profiles, master and worker. So the question is whether it's mandatory to use these profiles. The answer is no: we could create two different jars, one for the master and another one for the worker, and just run them as separate Spring Boot apps. Here it's just for convenience — one jar, and we use the same jar for the workers and the master; it's just a matter of profiles. It's also not uncommon to have the master also be a worker: since it's just waiting while the other workers are processing, it can participate in the work as well. Another question, sorry — so the question is what happens if a new file arrives when the master has already started. Normally, when you start the job, the master creates the partitions and then that's done; the partitions are already defined, and we know the files. So if you have another file arriving after those partitions are created, I think you need to start another instance of the job, or otherwise use a different mechanism of listening to the input directory and creating a partition on the fly — that would need to be done differently. The reason for that is restartability: we need the data set to be immutable once the partitions are started, because if we were tailing the directory and that kind of thing, the data set could be changing at any time. Granted, you still have to do some things — files are the easy case, where you can just move them out of the way, or you know what the file names are — but for, say, database records, if those are changing between runs, you use something like a listener to flag which ones are being processed, that kind of thing. Usually, for batch processing, the data set we work on is fixed; otherwise it becomes streaming. The same issue happens when you write a batch job that, for instance, reads items from a queue: imagine you start the job, it reads some messages from the queue, and then another message comes in — that kind of input is not fixed. So normally, for the file case, you should operate on a fixed file set, so that when you restart the job you have the same job parameters and you can reprocess the failed files.
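And the worker side, condensed (the AMQP flows are the mirror image — an inbound adapter from the "requests" queue into requests(), an outbound adapter from replies() to the "replies" queue — and are omitted here; the step-scoped reader pulls its file out of the partition's ExecutionContext, assuming the "file" key set by the partitioner above):

```java
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.integration.config.annotation.EnableBatchIntegration;
import org.springframework.batch.integration.partition.RemotePartitioningWorkerStepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.io.Resource;
import org.springframework.integration.channel.DirectChannel;

@Configuration
@Profile("worker")
@EnableBatchIntegration
public class WorkerConfiguration {

    @Autowired
    private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;

    @Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel replies() {
        return new DirectChannel();
    }

    @Bean
    public Step workerStep(JdbcBatchItemWriter<Transaction> writer) {
        return this.workerStepBuilderFactory.get("workerStep")
                .inputChannel(requests())        // partition metadata arrives here
                .outputChannel(replies())        // completion status goes back to the master
                .<Transaction, Transaction>chunk(100)
                .reader(reader(null))            // step-scoped proxy; the real file is bound at runtime
                .writer(writer)
                .build();
    }

    // the reader resolves its input file from the partition's ExecutionContext
    @Bean
    @StepScope
    public FlatFileItemReader<Transaction> reader(
            @Value("#{stepExecutionContext['file']}") Resource file) {
        return new FlatFileItemReaderBuilder<Transaction>()
                .name("workerTransactionReader")
                .resource(file)
                .delimited()
                .names(new String[] {"account", "amount", "timestamp"})
                .targetType(Transaction.class)
                .build();
    }
}
```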
So here I've started a first worker, and I'm going to start another worker; these two workers are waiting for work. And now a master. As you can see here, the master sent messages to the workers, and the workers are processing transaction data. Each input file contains 20,000 records, so we are expecting 60,000 records in the database after the job is finished. It looks like the workers have finished, and the job is completed, so let's check how many records we have: 60,000 records, so all partitions were processed. If you noticed, we have three partitions, since we have three files, but I launched only two workers, so one of the workers processed two files, and the master then aggregated the replies from the workers and finished the job execution. One other thing that may be important to show: if we look at how many steps were executed, we have four step executions — one for the master and three, one for each partition — and each partition read and wrote 20,000 records. That's it. So the question was whether there is a way to dynamically start the right number of workers. On a platform you can, with the DeployerPartitionHandler: once the partitions have been figured out — OK, I need five partitions — it tells the partition handler, and the partition handler will launch, say, five workers, on Cloud Foundry or Kubernetes; yes, both are supported. And actually — I'm not going to put you on the spot to do it — but I think when you run your commands you should be able to, on that master one, use master comma worker, and that instance would serve as both a master and a worker. I'm not convinced — we can try, no worries — but indeed, while the workers are working the master is sitting there doing nothing, so we could use the master as a worker at the same time; that's a very good point. So what shuts down the workers in this model? It's up to you — it's your problem. That's the benefit of running this on a platform with the DeployerPartitionHandler, because it launches them in a platform-native way so that they automatically shut down. On Cloud Foundry it runs them as tasks, which shut down once the JVM is done; on Kubernetes they're run as — I forget if it's bare pods or jobs; I know we were on bare pods and I think we're going to switch back to jobs — but in either case they shut down once the processing is done. It's also possible to shut down the worker with a listener: once the step is done, you say "I'm finished" and shut down the JVM. So the question is how the master knows how many workers there are and when they finish. When we create the partitions, we know how many partitions we created, right? We send the messages with a correlation ID behind the scenes, so we know how many replies we are expecting. If one of the workers fails, the master will time out and say: look, I started three partitions, only two have been processed, the other guy is not here, so I fail after one hour, for instance. Sorry — the question was whether that timeout is configurable, and the answer is yes, it's configurable on the messaging template that you use to send the requests. The last option: remote partitioning is great for general use cases, especially places where you're I/O bound, but sometimes you need additional processing power beyond the single JVM.
This is where remote chunking comes into play. Here we still have a master/worker configuration. The master has a regular item reader, and then it has a special item writer. Instead of writing out metadata about what to process, that item writer sends the actual data over the wire to the workers. The workers then pick that work up, do the processing, and can either write locally or send a message back to the master to be written there. It's important to note that this is a very I/O intensive option, so it's only useful when processing is the bottleneck — that's why partitioning is by far used more frequently — but there are use cases where processing is the bottleneck, and then this comes in really handy. Unlike remote partitioning, where you do not have to use durable middleware between the master and the worker because the job repository takes care of lost messages and that kind of thing, in this case, because you're sending the actual data over the wire, you do need durable messaging middleware between the master and the worker. So what does this flow look like? It hinges on the ChunkMessageChannelItemWriter — that's the special item writer — which sends a message to the requests outbound adapter, over Rabbit, to the inbound adapter, the requests channel, and a service activator. Most of this should sound relatively familiar; it's the same basic flow, the only difference being that instead of sending metadata over the wire, we're sending the actual data. You'll also notice there's no aggregation on the reply side on the master, because all the writing in our case is done at the worker level, so there's nothing to aggregate. Let's take a look at this in action. Yes — the question was basically whether you can run the AsyncItemProcessor and AsyncItemWriter within remote chunking. You can, and actually, although I laughed, now that I think about it, depending on what your processing is it may actually make sense, depending on the size of the workers and whatnot. Just like in a partitioned job, you can also have a multi-threaded step inside each worker, for example. All right, so for remote chunking we are going to configure the same round trip between the master and the worker, so again I have two configuration classes. Let's start with the master side. On the master side it's the same flow: a requests channel with an outbound adapter to the requests queue on Rabbit, and a replies channel with an inbound adapter from the replies queue on Rabbit. As Michael described, on the master side we're going to read data and send it over the wire to the workers, so all we have to specify on the master side is a reader — I'm going to reuse the same FlatFileItemReader. And just like the remote partitioning master step builder we saw, here we have a remote chunking master step builder; it can be injected if we have the @EnableBatchIntegration annotation added. On the master side we have a chunk-oriented step: we read data using this reader, we send requests on this output channel, and we expect replies on this input channel. If you think about it, that's all we need on the master side — read the data and send it over the wire.
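On the master side for remote chunking, the builder keeps this about as small as it sounds. A sketch, assuming the same requests/replies channels and AMQP flows as in the partitioning demo and the same flat-file reader bean:

```java
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.integration.chunk.RemoteChunkingMasterStepBuilderFactory;
import org.springframework.batch.integration.config.annotation.EnableBatchIntegration;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.integration.channel.DirectChannel;

@Configuration
@Profile("master")
@EnableBatchIntegration
public class ChunkingMasterConfiguration {

    @Autowired
    private RemoteChunkingMasterStepBuilderFactory masterStepBuilderFactory;

    @Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel replies() {
        return new DirectChannel();
    }

    @Bean
    public TaskletStep masterStep(FlatFileItemReader<Transaction> reader) {
        return this.masterStepBuilderFactory.get("masterStep")
                .chunk(100)
                .reader(reader)             // only reading happens on the master
                .outputChannel(requests())  // the actual items are sent out to the workers
                .inputChannel(replies())    // chunk responses come back here
                .build();
    }
}
```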
On the worker side, we have the same inbound and outbound flows for requests and replies — I'm not going to repeat those. The thing is, on the worker side, for remote chunking we don't have a Spring Batch step; you can think of the workers as just regular message listeners. We get a chunk request with the items we have to process, we process them, and we send a chunk reply. There is no Spring Batch knowledge on the worker side; that's why the worker has only an integration flow rather than a step, and to create this integration flow we have a builder called the remote chunking worker builder — note there is no "step" in the name of this builder, because there is no step. The integration flow is defined like this: we have the input channel on which we get requests from the master, the output channel on which we send replies, and then what we do for each chunk request. Here we define an item processor and a writer — actually, we can use an ItemProcessor and ItemWriter, but we don't have to; you can use any other API. Here we are using Spring Batch as a library, if you like: since we have already defined the processor and the writer, we can reuse them, but we could use any kind of processing on the worker side. The processor simply processes the transaction — here we're just going to print a statement on the console — and the writer is the one that writes data to the database. As Michael said, it's important that the queues on RabbitMQ here are durable, so that if one of the workers fails the data is not lost: it stays on RabbitMQ and another worker can take it. That's it, so let's see how it works — I'll bring those up. Restartability in partitioning is a function of the state management in the job repository; the ability for remote chunking to restart is based on the fact that your data is sitting in a persistent queue. So I started a first worker here and a second worker here, and now I'm going to start a master. All right, as you can see, data is being sent over the wire and the workers are receiving transaction items and processing the chunks. It looks like the job is finished. The input file for remote chunking contains 10,000 records — that's easier to check on the command line — so let's see if we have them, and yes, we have 10,000 records. That's it. So one of the questions was what happens if one of the workers fails. On the remote partitioning side, we said that when we restart the job, only the partitions that failed will be restarted. For remote chunking, since we send the data, if one of the workers fails, the message that was taken by that worker will be sent back to RabbitMQ, so we don't lose the data, and another worker will pick it up and process the chunk request. It's a bit different from remote partitioning, and that's why we need durable middleware for remote chunking. Exactly, yep. So the question is whether we depend on the queue being transactional, and the answer is yes, we do, if you want restartability. Otherwise, if a worker fails and you're content with clearing out the queues and starting over again, you can do that. Those are your two options: either you have a queuing mechanism that is transactional, that we can do a true rollback with, or you don't, and that means you're going to have to do some cleanup before the restart. Yeah, so the question was whether you can use XA transactions for the workers so that everything stays in sync — yes, you can.
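And the worker side is just an IntegrationFlow built by the RemoteChunkingWorkerBuilder. A sketch, again assuming the mirror-image AMQP flows into requests() and out of replies(), and reusing processor and JDBC writer beans like the earlier ones:

```java
import org.springframework.batch.integration.chunk.RemoteChunkingWorkerBuilder;
import org.springframework.batch.integration.config.annotation.EnableBatchIntegration;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;

@Configuration
@Profile("worker")
@EnableBatchIntegration
public class ChunkingWorkerConfiguration {

    @Autowired
    private RemoteChunkingWorkerBuilder<Transaction, Transaction> workerBuilder;

    @Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel replies() {
        return new DirectChannel();
    }

    // no Spring Batch step here: the worker is essentially a message listener
    @Bean
    public IntegrationFlow workerFlow(ItemProcessor<Transaction, Transaction> processor,
                                      JdbcBatchItemWriter<Transaction> writer) {
        return this.workerBuilder
                .itemProcessor(processor)   // optional; any processing logic can live here
                .itemWriter(writer)         // items are written locally on the worker
                .inputChannel(requests())   // chunk requests (the actual items) arrive here
                .outputChannel(replies())   // chunk responses go back to the master
                .build();
    }
}
```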
We have time for a couple of questions as we close. What's the difference between Apache Camel and this? Camel is a closer relation to Spring Integration than to this; it's more about messaging, the adapters, and the Enterprise Integration Patterns — that's the Spring Integration side, not Spring Batch. OK, so the question is whether batch is still relevant in a cloud-native architecture. Yes, definitely — there are plenty of use cases where batch processing makes sense. And that's the conclusion. Thanks, everyone.
Info
Channel: SpringDeveloper
Views: 50,422
Keywords: Web Development (Interest), spring, pivotal, Web Application (Industry), Web Application Framework (Software Genre), Java (Programming Language), Spring Framework, Software Developer (Project Role), Java (Software), Weblogic, IBM WebSphere Application Server (Software), IBM WebSphere (Software), WildFly (Software), JBoss (Venture Funded Company), cloud foundry, spring boot, spring cloud task, batch, batch processing
Id: J6IPlfm7N6w
Length: 71min 22sec (4282 seconds)
Published: Thu Oct 04 2018