ElixirConf 2019 - Build Efficient Data Processing Pipelines... Marlus Saraiva

Captions
Good afternoon. My name is Marlus, I work at Plataformatec, and I'm here today to talk about building efficient data pipelines in Elixir. Before I begin, I just want to say that I was very excited to see the previous talk from Emerson. It's really satisfying to see something you built being used by companies in production, solving real problems; that's the whole point. I've been meeting people here who say "oh, we're using Broadway in production, it's great, it's stable", and it's really nice to have that feedback.

Here's our agenda. We're going to start with some basic concepts, which is important to give you some context before we move to the implementation details, which are more advanced topics. When we get to those details we're going to focus on fault tolerance and graceful shutdown, because I think they're the most important ones, and usually the ones that are really tricky when you try to solve this kind of problem. At the end I'll show you a new feature we've been working on, the integration with telemetry.

So what is Broadway? Broadway is an open source tool developed by Plataformatec that aims to streamline data processing pipelines. That sounds nice, but what is a data pipeline? A data pipeline is a set of data processing elements connected in series, where the output of one element is the input of the next one. Here's a simple example. Imagine you have an application and you want to collect statistics about your users, so you can maybe later use that information for machine learning or whatever. One approach would be to use the main endpoint of your application and send that information along with the application's own data. The problem with this approach is that, depending on how you process that information, it can have a very negative impact on response time: you're receiving and processing data that is not directly related to your application, so response time may increase, and a failure in that processing can influence the main flow. A better approach is, instead of using your main endpoint, to use a separate endpoint and just send that data to a queue; it could be SQS, RabbitMQ, Kafka, whatever you need.

Here is where Broadway comes in: the idea is that you use Broadway to streamline that process, to create that pipeline. In our example, all the information we gather from the user is sent to SQS, so it can be processed asynchronously later, without any relation to the actual flow of your application. The pipeline you need to create consumes those messages from SQS, does some kind of processing where you can transform the data, and then probably uploads the result into an S3 bucket, so that a machine learning job can later read that information and do whatever it wants with it.
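As a rough illustration (this isn't code from the talk), here's a minimal sketch of such a pipeline. It assumes the broadway_sqs producer package and the option names Broadway used around the time of this talk (producers:/stages:); the queue name is hypothetical and the S3 upload is stubbed out:

```elixir
defmodule StatsPipeline do
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producers: [
        default: [
          # Producer from the broadway_sqs package; the queue name is made up.
          module: {BroadwaySQS.Producer, queue_name: "user-stats"},
          stages: 2
        ]
      ],
      processors: [default: [stages: 4]],
      batchers: [s3: [stages: 2, batch_size: 100]]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    message
    |> Message.update_data(&transform/1)
    |> Message.put_batcher(:s3)
  end

  @impl true
  def handle_batch(:s3, messages, _batch_info, _context) do
    # Upload the whole batch to an S3 bucket in a single request
    # (e.g. with ExAws); stubbed out here.
    messages
  end

  defp transform(data), do: data
end
```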
So why Broadway? Why did Plataformatec decide to invest in such a tool? One thing we realized was that many companies were using Elixir to build data processing pipelines. We saw a lot of companies using GenStage, which is a great tool; Broadway is actually built on top of GenStage, and the concurrent nature of Elixir, along with GenStage, makes it a perfect fit for this kind of problem. But what we saw was that many of those companies were reimplementing the same features over and over. Everybody we talked to had implemented this feature or that one, and to get all the features they wanted they ended up assembling very complex GenStage pipelines, adding more and more stages, and then they started running into the same pitfalls.

Here are some examples of those features. If you want to process data efficiently you need concurrency. You need back-pressure, so you don't drain all the resources of your machine. You're probably going to need batching, especially if you work with, for instance, S3: you can improve throughput by an order of magnitude if you use batches instead of sending message by message. Graceful shutdown is really important and kind of hard to implement. And you also want to consume from different sources: SQS, RabbitMQ, Kafka, and many others.

Some of those features can really make your pipeline hard to maintain. Let's take back-pressure, for example. GenStage has a built-in mechanism for back-pressure; however, if you're using RabbitMQ and you want to consume messages efficiently, you should not pull messages yourself, you should use an active client that receives messages as they arrive. The problem is that you lose back-pressure, at least the mechanism built into GenStage; you cannot use it, because you don't control the messages that are coming. You need to use RabbitMQ's prefetch count to do this instead. So even though GenStage already has a solution, it doesn't apply to RabbitMQ if you want to consume data efficiently.
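To make the RabbitMQ point concrete, here's roughly what the producer section looks like with the broadway_rabbitmq package; the queue name and numbers are made up, and I'm assuming its qos/prefetch_count options:

```elixir
# Since RabbitMQ pushes messages to an active client, back-pressure is
# delegated to the broker via the channel's prefetch count instead of
# GenStage demand.
Broadway.start_link(MyPipeline,
  name: MyPipeline,
  producers: [
    default: [
      module: {BroadwayRabbitMQ.Producer,
        queue: "user_stats",
        # at most 50 unacknowledged messages in flight per channel
        qos: [prefetch_count: 50]},
      stages: 2
    ]
  ],
  processors: [default: [stages: 4]]
)
```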
Batching is another example. If you want to create batches from your messages efficiently, you need to create more stages; if you try to do it in the same stage, you're going to stop the flow while you build the batches, which is bad, because you want concurrency. Graceful shutdown gets even more interesting, because you're going to need an external process to control it: if I want to shut down my pipeline without losing data, I need an external process to synchronize how each step of the pipeline can safely be shut down and flush its messages. And it's interesting because that process doesn't belong in the pipeline, but it has to belong to the same supervision tree. So it starts to get really complicated as you add more features.

To define those pipelines we basically had to answer three questions, the questions we faced during development. The first is how to define the right topology for the pipeline; in the case of Broadway it's even more complicated, because Broadway is supposed to be a library, so we don't actually know the problem you're trying to solve, and we needed a way of defining the topology that works for many different cases. The second is how to structure the supervision tree correctly: we want fault tolerance in the pipeline, so if there's a failure in one of the stages, how should the supervisors work in order to restart the parts of the system so that it can come back to a stable state? And the last is how to achieve a graceful shutdown without data loss.

We're going to start with the first one and how we addressed it in Broadway: how to define the right topology for the pipeline. The idea is quite simple. What you have in Broadway is a configuration and a behavior, and you let Broadway create the pipeline for you. Instead of using GenStage and manually assembling your pipeline, you have a standard way to define the topology through a configuration, and then you implement a behavior, just like a GenServer: you have callbacks, you implement those callbacks, you inject your code, and the pipeline behaves as one unit. Here's how it looks: you `use Broadway`, which is a behavior, you pass the configuration you want, which defines the topology, and you implement the callbacks; that's the code that contains your business logic.

Taking a closer look, this is a very simple configuration, the simplest topology you can create with Broadway. We define one group of producers called :default; the producer module is Counter, which could be any GenStage producer you already have, and that group of producers has two stages. We also have a group of processors, also called :default, with three stages. So it's a very simple pipeline. Next we have a more complex pipeline, because now we want to use batchers. As I told you, you can get an order of magnitude improvement in throughput if you use batches when working with S3, and most of the Amazon services let you work in batches. In that case the topology changes, because now I'm defining two batchers: I define one producer and two processors, then one batcher called :sqs with two stages and another called :s3. I start receiving messages, I process them, and depending on information I can retrieve from the message, I forward it to :sqs; in other cases I send it to :s3. The messages are forwarded to the right batcher according to that information.
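Reconstructed from the slides as described, the two configurations would look roughly like this; Counter is the example producer module from the talk, and the batch_size values are my own assumptions:

```elixir
# The simplest topology: one producer group named :default with two
# stages, and one processor group named :default with three stages.
Broadway.start_link(MyPipeline,
  name: MyPipeline,
  producers: [default: [module: {Counter, 1}, stages: 2]],
  processors: [default: [stages: 3]]
)

# The richer topology: one producer, two processors, and two batchers,
# so each message can be routed to either :sqs or :s3.
Broadway.start_link(MyPipeline,
  name: MyPipeline,
  producers: [default: [module: {Counter, 1}, stages: 1]],
  processors: [default: [stages: 2]],
  batchers: [
    sqs: [stages: 2, batch_size: 10],
    s3: [stages: 1, batch_size: 10]
  ]
)
```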
Here's a quick demo of how you can define a topology through the configuration. This is a simple dashboard implemented using LiveView, and for now we only care about this part, the configuration. We have only four options here, but since everything that defines the topology lives in that configuration, you could add all the other options as well. So we can change the topology: let's say three producers, four processors, three batch consumers for S3, and maybe four for SQS. Now we've changed the topology of the pipeline, and this pipeline is probably going to be able to process a much larger number of messages, at least compared to the first one. You'll need to forgive me for my voice; I recently damaged my vocal cords, so it's been really hard for me to keep this tone, and I'm not fully recovered.

So when you use Broadway you need to do two things: the first is to define the configuration, and the second is to implement the behavior, which means implementing the callbacks. One important thing to pay attention to here is that, unlike a GenServer, where all the callbacks (handle_info, handle_call, whatever) run in the same process, with Broadway it's not like that: each callback is executed in a different process. For example, handle_message is the callback executed for each message retrieved from the producer. If the producer gets 100 messages, handle_message runs for each one of them, and in this case they run in the processors. In this example we're using batchers, so inside handle_message is where I say: if the message contains this kind of information, send it to :sqs, otherwise send it to :s3; from there it's forwarded to a different process in the pipeline. Then handle_batch runs in yet another process, one of the batch consumers, which are the batch processors: if the batcher is :sqs, handle_batch executes in one of those processes, and if it's :s3 it runs in another one. This is important to keep in mind: you don't have state in Broadway. It's not like a GenServer, where you hold state and can update it; you cannot keep state across your callbacks.
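A sketch of what that routing could look like; the shape of message.data and the "kind" field are invented for illustration:

```elixir
defmodule RoutingPipeline do
  use Broadway
  # start_link/1 with the producers/processors/batchers configuration
  # omitted; see the configuration sketches above.

  # Runs once per message, inside one of the processor processes.
  @impl true
  def handle_message(_processor, message, _context) do
    case message.data do
      %{"kind" => "event"} -> Broadway.Message.put_batcher(message, :sqs)
      _other -> Broadway.Message.put_batcher(message, :s3)
    end
  end

  # Runs once per batch, inside a batch consumer process for the batcher
  # the messages were routed to. No state is shared between invocations.
  @impl true
  def handle_batch(:sqs, messages, _batch_info, _context), do: messages
  def handle_batch(:s3, messages, _batch_info, _context), do: messages
end
```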
Let me quickly demonstrate how the batching works. We can change things here and send some messages; let's send 5,000. We have a naive implementation of Fibonacci just to simulate CPU processing, and we can define the percentage of messages that should go to SQS or to S3. In this case we'll say that 100% of the messages should be forwarded to S3. As you can see, the whole flow always goes to S3; the SQS batcher doesn't even change color, because it's not processing anything. The producers retrieve messages from RabbitMQ, a message can go to any one of the processors, but inside the processors we decide whether it should go to SQS or to S3.

With this we've answered the first question, how to define the right topology for the pipeline. Now let's address the second one: how to structure the supervision tree correctly. Here's an example of the supervision tree of a Broadway pipeline. Since it's a lot of information, we're going to look at each supervisor individually. The first one is the main supervisor, where we use the rest_for_one strategy, and you can see we have a supervisor for each of the steps: a producer supervisor, processor supervisors, and a batcher supervisor. The last child, the Terminator, we're going to talk about later in the talk. For now, the thing to keep in mind is that we use rest_for_one here. rest_for_one works like this: if one of the children fails, that child is restarted, and all the children defined after it are also restarted. So if the processor supervisor fails, because its children also failed or for any other reason, all the children after it will be restarted as well. It's a very conservative approach, but we need it, because these things shouldn't actually crash: it's mostly generic code, and even the code that isn't generic, the code you put inside the callbacks, is wrapped in a try/catch, so you shouldn't have any trouble here. If one of these processes really does fail, it's better to just restart everything and try to get back to a stable state.

The producer supervisor takes another approach, because each producer is completely independent of the others. If I say I want five producers retrieving messages from RabbitMQ, each one holds its own connection and opens its own channel, so it's better to use one_for_one: if one of the children dies, only that child is restarted by the supervisor, which makes sense since they are independent and don't share state. If one of them fails, I want to keep receiving messages through the others.

The processor supervisor uses yet another strategy, the third one we use across these four supervisors: one_for_all. As I told you, all the code that runs inside the callbacks is wrapped in a try/catch; we capture any kind of error that could happen there, which makes sense, because all the code in Broadway belongs to us, but the code inside the callbacks comes from the user, and we don't have access to it. So we wrap it, and if there's an error we mark the message as failed and move on. Given that, if there is a crash in a processor, it's a bug in Broadway; it's not supposed to happen. If it does happen, one_for_all restarts everything, and since we have max_restarts set to zero, it will actually shut down the supervisor itself; and since the parent uses rest_for_one, that restarts the whole thing.

The batcher supervisor also uses one_for_one, just like the producers. Why? Because each batcher is completely independent of the others; they don't share state, so we can safely restart one if it fails without restarting the rest. Then we have the batcher consumer supervisors, which use rest_for_one again, because you have the batcher, the single process we saw in the pipeline before, and then the other processes that process those batches, the batch consumers. If there's a problem in the batcher, I can no longer trust the information I have, which makes sense: it's a pipeline, and just like a physical pipeline, if one of the steps fails, the following steps are compromised. If the batcher fails, I have no idea whether the information I have is safe, so on a batcher failure we also restart all of its consumers. And the last one is the consumer supervisor, which is one_for_all again, just like the processors: it's not supposed to crash, since there's no code running there that isn't wrapped in a try/catch. If it crashes, the problem is in Broadway, and in that case it's better to restart the whole thing, including the supervisor itself.

With this we answer the second question, how to structure the supervision tree correctly. It's a very conservative approach, because we know the code we have, and we know which parts of the code don't belong to us. For instance, the producer: we don't have access to the producer's code. The producer can be implemented by anybody and can consume data from anywhere, so it can crash; it might hold a connection, and the connection might be unstable, so there we expect errors. In other places, like the processors or the batch consumers, we don't expect errors.
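These aren't Broadway's internals, just a minimal illustration of how the strategies discussed behave, using a plain Supervisor and three placeholder children:

```elixir
# Placeholder children standing in for the producer, processor, and
# batcher supervisors.
children = [
  Supervisor.child_spec({Agent, fn -> :producers end}, id: :producers),
  Supervisor.child_spec({Agent, fn -> :processors end}, id: :processors),
  Supervisor.child_spec({Agent, fn -> :batchers end}, id: :batchers)
]

# :rest_for_one - if :processors crashes, it and everything started
#                 after it (:batchers) restart; :producers stays up.
# :one_for_one  - only the crashed child restarts.
# :one_for_all  - any crash restarts every child; combined with
#                 max_restarts: 0, a single crash shuts the supervisor
#                 down and escalates the failure to its parent.
Supervisor.start_link(children, strategy: :rest_for_one)
```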
The last question is how to achieve a graceful shutdown without data loss, and that's where we come back to the Terminator. I told you before that in order to have a graceful shutdown you cannot rely on the pipeline itself; you need a separate process to synchronize how all the stages shut down. What we want is this: if I'm going to upgrade my system, I don't want to lose messages, and maybe I don't want to duplicate messages either; it depends on what kind of source you use and what strategy you're using. But if I just want to upgrade my system, I don't want to worry about whether I'm going to lose messages or not. So basically a graceful shutdown means: I tell the system it's going to shut down, but I want it to process all the messages that are already in the pipeline. It's important to notice that you might have thousands of messages in flight, because each stage has a buffer, and those buffers can hold a lot of messages. This was an important feature for us; I remember when we discussed it, the conclusion was: we have to do this, because this is probably the single feature that most people get wrong.

The first step of the graceful shutdown, for the Terminator, is to tell the processors to stop subscribing to the producers. Why? When we do a graceful shutdown, since we have a lot of messages in the pipeline, it might take a couple of seconds to process them all, and during that period maybe a connection fails and a new producer gets restarted by the supervisor. If that happened during the shutdown, the processors would try to resubscribe, so the first important thing is to tell the processors not to resubscribe. The second step is to cancel and shut down the producers. To do this, we first tell the producer to stop fetching messages; there's a callback for that, so if you implement a new producer you have a callback called prepare_for_draining, where the Terminator tells the producer: please don't fetch any more messages, because I'm going to shut down the pipeline. Then we tell it to stop accepting demand: since we're shutting down, there's no need to handle more demand, so just accumulate it, because we're only going to process the messages we already have. Then we flush the events in the buffer, because we don't want to lose any message, and finally we cancel the consumers.

The last step is that the Terminator monitors the last set of stages in the pipeline. In GenStage, when a consumer realizes that all of its producers were cancelled, it processes the messages it already has and then cancels itself. So by cancelling the producers we create a cascade effect: the processors flush their messages and cancel themselves, then the batchers realize they have no more producers (in this case the producers of the batchers were the processors), and so on down the pipeline, each stage flushing and processing its messages, until the last set of stages, the ones that process the batches, are also cancelled and die. At that moment, since the Terminator is monitoring them and waiting for them to die, it knows it can also die, and finally the producers are terminated.
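For producer authors, this is roughly where that hook lives. A minimal sketch, assuming the optional prepare_for_draining/1 callback of the Broadway.Producer behaviour, with the actual fetching stubbed out:

```elixir
defmodule MyQueueProducer do
  use GenStage
  @behaviour Broadway.Producer

  def start_link(opts), do: GenStage.start_link(__MODULE__, opts)

  @impl GenStage
  def init(_opts), do: {:producer, %{draining?: false}}

  # Invoked before shutdown: stop fetching from the external source;
  # messages already buffered in the pipeline will still be processed.
  @impl Broadway.Producer
  def prepare_for_draining(state) do
    {:noreply, [], %{state | draining?: true}}
  end

  @impl GenStage
  def handle_demand(_demand, %{draining?: true} = state) do
    # While draining, accumulate/ignore demand: no new messages.
    {:noreply, [], state}
  end

  def handle_demand(_demand, state) do
    # Fetch up to `demand` messages from the source (stubbed out).
    {:noreply, [], state}
  end
end
```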
If you look at the supervision tree and the rest_for_one strategy we use, you can see why the Terminator is the last child: in OTP, when a supervisor shuts down its children, it always does so in the reverse order they were created. What we're doing here is actually cheating the default shutdown behavior. The supervisor gets a signal that it should shut down its children; when the signal reaches the Terminator, which is the first one to be shut down, the Terminator traps exits, and inside its terminate callback it does all the things we just saw: it tells the processors to stop resubscribing, cancels the producers, stops the demand. So in a way the Terminator tricks the supervisor, saying: look, wait for me, don't stop the other children yet, I'm going to handle this.

Here's one thing that shows why graceful shutdown is important. I'm going to send some messages, let's say twenty thousand, so the pipeline is working. Now I'm going to purge this and reset, because I want to start from zero. What we're going to do is send 10,000 messages, and while the pipeline is running we're going to change the topology: let's say one producer, five processors, maybe three here, whatever. I can change the topology while the pipeline is running, as you can see, and it keeps working, because we have graceful shutdown: to apply the change we actually shut the pipeline down and start it again with the new configuration, which is much easier than trying to synchronize the change without shutting the pipeline down. As you can see, I sent 10,000 messages, I changed the topology of the pipeline, and we got all 10,000 messages without losing a single one. This answers the third challenge.

That covers what I think are the important aspects of Broadway you should know about, because these are the trickiest problems you have to face if you want to roll your own solution. The last thing I want to show you today is the new feature we're working on, the integration with telemetry. We've actually already seen it in the demo: as I told you, this dashboard is a LiveView dashboard, and we have two different kinds of metrics. There's the throughput, which is how many messages you can process per second, and then, for each stage, the ratio between processing time and idle time. That can help you find bottlenecks in your pipeline.
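The metrics flow through the telemetry library. The exact event names and measurements Broadway emits were still experimental when this talk was given, so the ones below are assumptions, but attaching a handler looks like this:

```elixir
# Attach a handler to a (hypothetical) processor event; :telemetry calls
# it with the event name, measurements, metadata, and handler config.
:telemetry.attach(
  "broadway-processor-time",
  [:broadway, :processor, :stop],
  fn _event, measurements, _metadata, _config ->
    ms = System.convert_time_unit(measurements.duration, :native, :millisecond)
    IO.puts("processor ran for #{ms}ms")
  end,
  nil
)
```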
Let's try it very quickly. Let's start with a very bad pipeline and send 100,000 messages. Actually, let's purge this first, because everything is set to go to S3 and we don't want that; let's send maybe twenty percent of the messages to S3, that would be better, and maybe 100 here, so we can simulate IO by putting a delay in the batch processors. OK, I've sent the messages, and as you can see it's a very bad pipeline: we only get to about 120 messages per second, because we have only one process in each step. You can see that the second stage, the processor, barely blinks, so we're not really using the CPU; you can see in the CPU usage that there's a lot of room left. The bottleneck is the last stage, the red one at the end that doesn't even blink: the batch processor. So let's increase that and see if we get some improvement.

All right, we got some improvement. Now the bottleneck is no longer the batch processor; it's probably the processor. We have only one processor, and it's red, meaning it's processing all the time. We can do something about that as well: increase the number of processors, and we get better results. You can still see that a couple of the processors are barely blinking, so we can probably add a few more producers and squeeze out a bit more, and maybe add a little more here too.

So this is the telemetry integration we're working on. We're still playing around and experimenting with some of the metrics, but the goal is to give you a set of tools you can use to improve your pipeline, because you need to know exactly what to do in this kind of situation, and you need to know your data. For instance, just setting 100 producers is not going to be effective, because maybe the pipeline cannot process all that information; and if you're using RabbitMQ, each connection you open consumes a couple of megabytes on the server, so with 100 producers you use a lot of memory on the server, and as you create more pipelines your memory is gone. So this is where we're heading and what we're working on right now.

That's basically what I have for you today. Just quickly, what's next: metrics and statistics, which we're working on; multiple processors; a Kafka connector, which we're also working on; a rate limiter; and a bunch of other features. If you want to discuss any new features, as Emerson mentioned in his presentation, there are a lot of open issues on GitHub; if you're interested, take a look at the issue tracker and follow the discussions. If you have any questions, I'm going to be around; if you want to talk about Broadway or anything else, just come by and say hello. Thank you.
Info
Channel: ElixirConf
Views: 5,319
Rating: 4.9595962 out of 5
Id: tPu-P97-cbE
Length: 39min 49sec (2389 seconds)
Published: Fri Aug 30 2019