High Performance Data Streaming with Amazon Kinesis: Best Practices and Common Pitfalls

Captions
All right, looks like we're going to get started here. Hello everybody, my name is Randy Ridgley. I'm a principal solutions architect, and today I'm going to be presenting probably the longest title slide of the show, on Kinesis. So we're going to talk about best practices with Kinesis. We're going to go over an overview of Kinesis streaming first, just to make sure everybody is up to speed on what we're going to be talking about. Then we're going to build on each step of the way, showing how we can leverage Kinesis Data Streams, integrating that with Kinesis Data Firehose, and building out solutions to not only respond to data in real time but also land that data, already preprocessed and curated, into the data lake solution that you're working on. And then finally, at the end of the slide deck there'll be a link for a takeaway notebook, so you can do any of the demos that we did here today through the slides.

So let's start with just a streaming data overview really quickly. The reason why real-time streaming has come about is because we want to respond faster to our customers' needs. That may be building out mobile applications and being able to respond through notifications in real time. It could be generating clickstream data, capturing that, reporting on it, and utilizing that for feedback back into your products. It could be doing log analysis, so capturing all of the log data you have from your applications and providing insight on that. Many of you may be interested in smart cities and IoT solutions. All of these have a real-time component to them, and this is really where Kinesis can help you deliver those solutions. And when we talk about the time frames for each of these types of workloads, we're talking not only in milliseconds, where people typically see real-time streaming as instantly having this data back, but it could also be in the minutes range, to do some ETL processing before landing that data into your data lake, and a little bit of
everything in between. So hopefully some of these use cases fit what you're looking to deliver with Kinesis as well.

But what we found in building these real-time streaming solutions is that there are some real challenges to overcome to make them work consistently, to stop them being such high-touch applications, and to allow them to be automated more. A lot of these solutions have tended to be difficult to set up. They're hard to scale as your business scales. Having high availability requires a number of components to be available: durability, coordination, things of that nature. And that ends up being very expensive to maintain as you're deploying and scaling these solutions out. This is really why we came out with Kinesis.

With Kinesis we have a number of services within the suite of tools. We start with Kinesis Data Streams. This is going to be your typical producer-consumer semantics of sending data to a stream and then processing it with a consumer on the back end. Next up we have Kinesis Data Firehose. Firehose is a solution that allows you to pick which persistent data store you want to land that data in. It could be S3, it could be Elasticsearch, it could be Redshift as your data warehouse solution. You point it to that and have that data delivered directly there. We also have Kinesis Data Analytics, and this has the ability to inject itself into the real-time stream to do aggregations and analysis on that data, leveraging either SQL or Java applications to do that processing, and maybe curate and persist to another stream of data. And then we also have support for video streams as well. But for the majority of the talk today I'm going to be focusing mostly on how you can leverage both Kinesis Data Streams and Kinesis Data Firehose, and all of the different configuration options that are there that you could leverage for the various workloads that you're going
after. And hopefully what you'll find out of this is that there are some real key benefits to using a managed service like Kinesis, because there are a lot of things that you will no longer have to worry about: things like provisioning infrastructure and scaling these solutions up as your business scales. If there is no streaming going through the system at a given time, you're not incurring costs, because you're not utilizing those services. And you'll see that they're highly available as well as secure. You can encrypt your data at rest, and it provides the ability to ensure that the stream of data keeps going.

When we talk about streaming, we need to get some of the terminology out there so that everybody's familiar with it. On the producer side there are a number of tools and utilities for ingesting data into your streaming solution, and with Kinesis specifically we really break it up into three different categories. We've got the toolkits and libraries that we have support for: the CLI and the SDKs. We have a Kinesis Agent, which is an agent you can install on your systems to aggregate the data at the source and then persist it out in batches, as well as the Kinesis Producer Library, which can do similar functionality. When we're talking about some of those workloads like smart cities or log aggregation and analysis, we can leverage our IoT service as well as CloudWatch Events and Logs, so that as you're pushing data there you can push it through the real-time streaming solution to provide new insight. And then there's a whole slew of third-party offerings in the big data space. There are a number of open source tools and commercial tools out there. I'm going to call out a few of them here that we see a lot of use for: the integrations with Log4j if you're building Java applications, or maybe you're using Flume or Fluentd to pull that log data off of those source systems and throw it into the
stream.

Now on the back-end side of that, for the consumption of that data, we again break it down into three different categories. As I talked about, with Kinesis Data Analytics you can use ANSI SQL or you could use a Flink service to pick up that data and process it. We also have a Kinesis Client Library that will allow you to create your own applications off of it and scale those out by leveraging EC2 instances, potentially with auto scaling, and have that client library process that data. What we're going to be focusing on more today, though, is showing somewhat of a serverless process to build out these real-time streams, and we'll be using AWS Lambda as the consumer of the Kinesis data stream that we're leveraging. And then again, just like there are a number of third-party open source ingestion mechanisms, there are also a number of third-party sources for consumption of that data. The most popular out there today is probably Apache Spark, being able to utilize Spark Streaming off of a Kinesis stream to process that data, but there's a whole slew of them, things like Apache Storm and Databricks, to do that processing as well.

So to get started, I'd like to start with the Kinesis Data Streams side of processing, and we'll walk through what it takes and some of the guidelines around building best practices into your streaming solutions. As I mentioned, Kinesis Data Streams is very easy to get started with. We'll walk through what it looks like to create a stream through the console and then start leveraging that stream. It's real time, and it has elastic performance: you can utilize application auto scaling to increase the number of shards that you're using as the number of messages through the system increases.

So, walking through what the console looks like today: you would go into the Kinesis console and you're going to create a data stream. You're going to
give it a name, because we need to know where we're putting that data, and then finally you have really one lever that you're working with here, and that's going to be the number of shards that you want this data stream to have. The shard itself is a unit of scale for determining how much data can go through that stream at any given time. You can see we've got an estimator there, so if you know the types of data that you're pushing through, you'll be able to figure out how many shards you need for this particular stream. I'll show a diagram that will explain this a little better shortly.

Some of the key metrics to think about when you're building these Kinesis Data Streams solutions: you can write up to one megabyte or a thousand records per second per shard. So if you're thinking about the types of systems that you're building for real-time streaming, you'll need to have an understanding of how much data you're pushing through, so that you're not throttling at the stream based on the ingestion that you're working with. On the consumer side, the consumers are going to make a call called GetRecords, and that's going to poll that stream per shard to get that data and start consuming each of the shards within the stream. The data will be returned, and with a single consumer on a standard Kinesis data stream you can read two megabytes per second per shard, and you can have up to five transactions per second per shard. So a single consumer can poll about every 200 milliseconds. That's where it stands today.

But what happens if I want to have a number of consumers on there? What you'll need to be aware of is that the throughput of each of those consumer applications goes down to about 400 KB per second, and the propagation delay, how often each one of those consumers can poll, will be affected as well. So if I have five consumer applications on this particular stream, each one of those
consumer applications will only be able to poll for that data once a second. So those are some of the limitations of the way standard consumers work today.

I mentioned pitfalls in the initial slide. Some of the pitfalls to be aware of are things like poison messages. In this example I have 300 records that have been pushed through the Kinesis data stream, and I have a Lambda consumer on that. When you configure a consumer, you're going to say what the batch size is, the number of records to pull back from the stream to process each time, and in this case I'm saying I want to pull back 200 messages. So it's going to invoke a function from Lambda to do that processing, and let's say for some reason it errors out processing one of those messages within the batch. It won't have the ability to checkpoint back to the service to allow the stream to continue processing that data. What will happen is, when it invokes a new Lambda function, it's going to invoke it with that same 200 records, and as you can see you're going to get the same problem: it's going to have that poison pill message. So you're really going to want to be aware of how that Lambda function, that Kinesis consumer, is processing those messages. That will continue until the message expires out of the data stream, which would be anywhere from the 24-hour period that is the default for Kinesis Data Streams up to seven days.

So one of the things that you really want to be cognizant of is to capture and log the exceptions, to be able to process those poison pill messages at a later time. If we take that same example, and now in the Lambda function we have a try-catch, we're catching the error that we can't process that message. Let's just log that out to CloudWatch Logs, maybe write it out to be picked up by an alarm or some other mechanism that you have, and continue processing to return a successful response from that batch of records
that you're executing on. Because now, once I've processed those 200 records, I can take the other hundred records in that stream and invoke a new Lambda function to process the rest of it.

So from a Lambda perspective, how do we create these consumers? I'm always using the examples of the console, but keep in mind that typically you're going to do this in script. You might be leveraging CloudFormation templates, or you might be leveraging the newish CDK to have infrastructure as code. But when you're just starting to kick the tires on how some of these services work, it's always good to go through the console and figure out how these things work. In this example I'm going to leverage one of the existing blueprints out there, the Kinesis process record blueprint in Python. I chose Python; it could have been Node.js or any number of languages. I'm going to give that function a name, because this is the consumer I want to utilize for that Kinesis data stream. I give it a role that's going to allow me GetRecords access on that stream; in this example I have a sample Kinesis IAM role. Then you're going to select the stream that it's going to process data from. As you can see here, I want to grab it from that demo stream that I just created earlier, and I want to have a batch size of a hundred records at a time. Now I can go in there and start that Lambda function, and it will start polling and consuming that data.

The last piece there is the starting position, which could be important depending on how you're leveraging the applications. As you're writing records out to the stream, it's keeping a pointer to where the consumer has finished processing that particular stream. So I can say, you know what, this is a brand new consumer, I don't care about the data that was at the very beginning, just start with the
latest messages that are in here and start processing that data. Or, if I want to make sure that I start with everything that's in there, I can select the trim horizon and pick up from the beginning of the stream as opposed to the end of the stream.

What you'll have from a code perspective is a very simple Lambda handler. The event contains a number of records, based on that batch size, that you can loop through, decode the payload, and then start doing your processing. This is really the totality of the default code you need to get started with a Kinesis consumer leveraging Lambda. Everything else will be taken care of for you.

As we talked about, with the Kinesis Data Streams standard consumers we had some of those limitations: you could only process five transactions per second per shard, and only two megabytes per second per shard across the board. A lot of our customers wanted to add more than just five consumers. They may have 20 consumers that they want to have leverage that same data stream, and they don't want to look at things like fan-out strategies that push the data into other streams and process it that way. So what we announced were enhanced fan-out consumers. These consumers are no longer going to be polling. They're going to be requesting to subscribe to that shard and start processing that data. What happens is, you'll see a subscribe-to-shard call, and it's going to leverage an HTTP/2 mechanism to start aggregating that data. As new messages come in, it's going to persist that shard to disk, so we have the durability there, and then it's going to start pushing those records in an array to that consumer to be processed.

So what does that look like if I start adding multiple consumers on there? Before, we had all of those limitations. With enhanced fan-out I can register a number of consumers. Each of those consumers is going to have its
own enhanced fan-out pipe where that data is being delivered, where the subscription from that consumer occurs, and it's going to get the full two megabytes per second for that consumer itself instead of sharing those resources like the standard consumers were. So if I have another consumer on here, it's going to go through that same process: a new EFO pipe is going to be generated, and we can start streaming that data to that enhanced consumer.

With Lambda, because Lambda was a polling mechanism when we created that blueprint, what we can do is make some modifications to the original setup that we started with. What will happen is the Lambda service will subscribe to that stream just like a consumer application would, and then, based on that batch size, it's going to batch those records up on the Lambda service and invoke a function for you. So you really don't have to change anything within your Lambda code to take advantage of that. What will have to happen, though, is that you will need to register as an enhanced fan-out consumer. I showed an example with the CLI here, but you can leverage the SDKs as well: you're going to register that consumer based on a name that you give it, and then point it to the stream ARN that you want to register it with. On that Kinesis data stream, you'll see in the enhanced fan-out tab that you do have a consumer registered. So now I can go back to that Lambda function that I just built and select the consumer in that second option there, and now I've converted that Lambda function from a standard consumer to an enhanced fan-out consumer. That'll give you the ability to scale beyond the limitations of the standard consumer.

Then typically the next question that comes out of this is: so when do I use which one? There are a lot of different reasons why you would pick an enhanced fan-out consumer
over a standard consumer, but I've really broken it down into these few options. A lot of times you want to leverage Kinesis Data Streams because you want that real-time streaming element and separation of services, but maybe you don't have a lot of services, maybe you don't have a lot of consumers connected to that particular stream. Look at using the standard consumers and see if you're getting the performance you're looking for. You may not have latency-sensitive solutions, and maybe you really want to optimize on cost. But if you do want to be able to access that data faster and to scale out the number of consumers, then you're going to want to look at the enhanced fan-out solution for consumption.

As some of you may know, we really take care to understand what our customers are looking for in the services that we build, and up to 90% of our roadmap is driven by feedback. When Kinesis Data Streams first came about, I was one of the lucky few as a client as well. Before I was here at Amazon, I was using Kinesis Data Streams to aggregate data in memory, wait a particular period of time, and then write that out to S3 for the initial creation of the data lake that I was building. What the Kinesis team found was that I wasn't the only one doing this. There were a number of customers duplicating that same functionality, maybe buffering data up to a number of megabytes, or buffering data based on a time frame, and then persisting that out. To help simplify that persistence process is really where Kinesis Data Firehose came in.

Kinesis Data Firehose has all the same semantics as a Kinesis data stream when publishing data to it, but the difference is that you're not building custom code for the consumers of that particular stream. What you're doing is pointing to a persistent store and letting the data buffer in the Firehose before it gets
persisted out. The three options that we have today are S3 for object storage, to build out that data lake strategy, loading directly into Amazon Redshift through that data stream, and Elasticsearch, and then with one of our partners, Splunk, we can push the data into a Splunk instance as well.

Some of the key features of leveraging Firehose: you have that raw data coming in, and a lot of times on the consumer side you're doing some work to enrich the data or maybe filter that data. With Kinesis Data Firehose you're going to have the ability to execute a Lambda transformation on those batches that we're persisting out and do things like enrich the data stream. In this case I have raw data coming in that has an IP address in it, and maybe I want to do some geo-IP lookups to add the metadata that it was from Boston, Massachusetts. I can leverage Lambda with Kinesis Data Firehose to enrich that data before it persists to whatever data store I'm using.

But I can also filter that data. Maybe I'm pushing log data in and I don't really care about the info-level logging, the warnings, any of that stuff. I just want to capture all of the errors and then maybe send them back to the app dev team and say, fix this. I can take the Kinesis Data Firehose and apply that Lambda transformation so that only the error types get persisted to the data store that I'm working with.

And finally, I can also convert the data stream. Maybe I'm capturing Apache logs that have a row format that isn't really structured. I wouldn't even say it's semi-structured; maybe semi-structured, give or take. But I want to convert that data into a JSON document before I land it in my data lake or data warehouse. I can use that same Lambda functionality to convert that row-based data into a JSON document.

And finally, one of the features that we released just a few months ago, I believe it was, is the ability to do record format
conversion, and this becomes really critical when you're building out a data lake strategy and you want to leverage a columnar format to improve the performance of the analytics engines that you're querying with on the back end. Record format conversion relies on our Glue Data Catalog. For those that aren't familiar with it, the Glue Data Catalog is a Hive-compliant metadata repository for the information that you want to persist into your data lake. The Glue Data Catalog will have a schema for what the messages going through the Firehose look like, and I can convert that data, if it's in JSON format, into Parquet or into ORC, depending on which columnar format you're leveraging. One of the benefits is that I can really simplify a lot of my ETL processing, not having to spin up an EMR cluster or use Glue jobs to execute these things. If I have simpler formats and don't have to do a lot of ETL transformation, I can do that directly in the stream and land that data in Parquet format, so it's ready to be utilized by Athena, or Redshift through Spectrum, or EMR, or any number of partner tools that we have out there. And the nice thing is, if for any reason any of those records fail to be processed, they can be pushed into a prefix in the S3 bucket in a failed status, so you can go and investigate that deeper.

Another option that came out, again just a few months ago, is the ability to have custom S3 prefixes for the data running through your Kinesis Data Firehose. If anybody's used Kinesis Data Firehose before, the standard format for delivering data into S3 would be a year/month/day format with just the integer values in there, so they weren't Hive-compliant prefixes that allow easy partitioning of the data for many of these engines. In this example here I can have that JSON data, I can define the custom
prefix, and I'll show you what that looks like in a moment, and now when that data lands, instead of just being in year/month format, I'll have it as year equals the year, month equals the month, day equals the day, and so on. That's yet another step that I don't have to take to do some conversion after I've streamed that data. I can do all of that inline, and so you'll have Hive-compatible partitioning for the naming conventions of the data you're landing in your data lake when leveraging S3.

That was a lot of words. If you are going on a data lake journey, you may understand some of the best practices around storing your data. A lot of times you may hear gold, silver, bronze, or you may hear raw data, processed data, curated data. A lot of times you want to save that raw data, and just a minute ago I said, well, do all these conversions, and who cares about the raw data? One of the things they've added is the ability to have a source record backup for that Kinesis Data Firehose. So if I do a lot of that conversion, converting into Parquet, maybe stripping out some columns, doing all of that data manipulation, that ETL, inline, I can still store the raw data in another location. In this example here, maybe I have a Lambda transformation calling out to an enrichment service. I want to process that data and put it in Parquet format, but then I also want to be able to store the raw data for consumption by whoever. It could be your BI folks, or it could be a data scientist looking at the raw data for a machine learning model they're looking to go after. There are any number of reasons to store that raw data.

One thing I don't have on the slides, but want to call out: as you're storing that raw data, right now I have it shown in S3, and you can always add lifecycle transitions between the different storage tiers that we have. So maybe you keep that raw data in S3 for 30 days or 90 days, but maybe then push that
data into Glacier for long-term storage. Maybe you have a compliance reason that you need to do that. You can set those lifecycle policies in S3 so that you don't have to manually go manipulate and mess with the data. We really want to work on having low-touch streaming solutions into a data lake.

So now, what does that look like? As I showed you, I want to convert the data into Parquet, and to do that I need to use the Glue Data Catalog. So if I go into Glue (again, CLI, SDK, this always applies), I can create a database, just giving it a name, weblogs, because I'm going to be processing some Apache web logs. I can then use something like Athena, or I can use the CreateTable API call with Glue, to create the DDL of my Parquet version of those streaming logs. What you'll see here is that I've given it a number of attributes: it has columns for the request path, the request size, and the host address, and I've also included the partition information in there, so I want to partition this data by year, month, day, and hour. Now that I have that definition in the Glue Data Catalog, you're going to see a table, sorry, a database. I can use that database and that table and specify that I'm leveraging Parquet. When I dig into the details of that, I'll have all of that information up front: I can see that I'm going to land that data in a particular S3 bucket, I can see that it's Parquet, and I can see the full schema and partitions of the table where I'm going to use Kinesis Data Firehose to land that data. So I can pre-prep all of that stuff.

Now, just like I did with the Kinesis data stream before, I create the Firehose and give it a name. In the data stream I was defining shards, but Firehose doesn't have to work with that mechanism; it's working with persistent storage. I'm then going to have an option for directly putting data to it, but even better, to be able to chain these solutions
together, I can select that data stream that I defined earlier and say, anything that gets streamed to there, push it into this Firehose for me to do some work on it as well. The benefit of that is that with the Kinesis data stream I can hang off consumers to do real-time notification back to applications and real-time processing of any of that data coming in, but I can also capture all of that real-time data in the Firehose to persist out for later usage.

The next step is that I'm going to enable record transformation, because what I'm going to be pushing through here is an Apache log. If we remember from a few slides ago, those logs are in a row-based format, and I want to take that, filter out some of the columns that came in to match the DDL that I created for the Glue table, and then convert it into JSON so that I can do the automatic record format conversion into Parquet. So there's a Lambda function with about 15 lines of code to do that processing.

Now I want to enable record format conversion. Here I can enable it, and I have the options of Parquet or ORC. Then I want to show it the table metadata that I'm leveraging in the catalog by selecting that weblogs database and the p_streaming_logs table, and I want to use the latest version that I have in there to match up when it's doing that Parquet conversion. And finally I give it the location where I want to land that data. Typically all of the destinations would be enabled if I was just creating a standard data Firehose. Here I only have S3 enabled, because support for Parquet conversion is only for S3 today, so the other three options will be grayed out for you.

I give it the bucket, so I know where I want that data to be. I want to leverage those custom prefixes, because I don't want just the year/month/day in the prefixes. I want to specifically call out year equals, and there's a
variable that I can leverage called timestamp, which is the approximate arrival timestamp of the record that came into the Kinesis stream, and I can use that by grabbing the year, the month, and the day. There's another variable in there called random string, so if I want to generate a random string in that path when I'm landing the data, I can do that as well. When I select these custom prefixes, I'll have to select an error prefix as well. This would be for any of those Parquet-converted records that failed for any reason. I can put those in this failed prefix and decide how I want to process them secondarily after that.

Now that I've done all of these different conversions (I've basically thrown the kitchen sink at you with all of the options that exist for Kinesis Data Firehose), I want to keep in mind that I also want to keep the source data that went into the stream originally. So I can also enable source record backup. I'm going to select a bucket, and then I'm going to select a prefix where I want that data to land. Once the Firehose is created, all of that will be taken care of for you from a configuration perspective. You can then just focus on the business of pushing your data from whatever ingestion sources you have, and we'll take care of making sure all of that processing happens downstream.

With Firehose, as I mentioned, because we're not working with shards anymore, we're working with buffering of those records before persisting, you're going to have a buffer size that you can define and you're going to have a buffer interval. The way it works is that whichever one it hits first is when that data is going to get written to the persistent store that you've selected. So if I have 128 megabytes of data filled up in less than 300 seconds, that data will be persisted. But we don't want to leave data just sitting in a buffer that never fills up, maybe because I'm not streaming a lot of data to
it. So once I have any data in there, if I hit that 300-second mark, I'm going to persist that data out.

Then I'm going to show you this lovely picture of creating the stream. That's going to take all of those configuration items and make sure everything's ready for you. It takes maybe 30 to 90 seconds to get the stream set up, and you're ready to go. So now I can start streaming that data, and after I hit those buffer interval limits, I can take a look at querying that data with things like Athena. Now that the table is in my Glue Data Catalog, I can see the weblogs database that I created and the p_streaming_logs table that I created, and after all of that I have the results out leveraging Athena, one of our serverless ad hoc query engines, so that I can completely build out this serverless data lake architecture.

To summarize all of the different options that we've leveraged: what you can do is really build these serverless data lake ingestion architectures by just leveraging the Kinesis suite of tools. Here I have an Apache web server. I can sideload the Kinesis Agent and point it at the Apache logs for the web server that I'm leveraging, and I can push those logs in that raw form to the Kinesis data stream. From there, maybe I have a consumer that I want to take a look at the data and filter out just the 500 errors, and report them to Simple Notification Service, or aggregate those and persist them somewhere else, or feed those back into a web application, a dashboard, whatever the case may be. I have the capability of doing that. But then, as I mentioned, I can now connect a Kinesis Data Firehose to that Kinesis data stream through just configuration. There's no infrastructure to maintain, there's no patching you're doing. You're literally just configuring these resources that we're managing for you to do these
actions. I can utilize those Lambda functions for record transformation. I've got a processing service that's doing some work, and once that comes back in, I can convert that record into Parquet format and persist it, leveraging those custom S3 prefixes. So now I have data in S3 ready to be utilized in that data lake fashion. I can also store the raw original Apache log data, and that data can then be utilized just like the Parquet data in the data lake strategy that you're leveraging.

I wanted to talk about all of these different options, but I also wanted to show finally that not only did we build these for our customers to leverage, we also utilize these in-house as well, for things like AWS metering and S3 events. So we're not only the creator of these, we're also clients of the services that we build, and we want to make sure that they're meeting the expectations that you have when building these solutions.

So with that, that's all I had for today. There's a demo link up there; everyone take your pictures. It's a Jupyter notebook that's going to walk through each one of the steps that I walked through, leveraging Python, so that you can go and exercise these in your own accounts and see how all of these things work. So with that, thank you very much. I appreciate the time, and enjoy the rest of the conference.
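
The per-shard limits quoted in the talk (1 MB or 1,000 records per second in, 2 MB per second out, shared across standard consumers) can be turned into a rough shard estimate. The following is a minimal sketch under those stated limits, not the estimator in the Kinesis console; the function name and parameters are illustrative assumptions.

```python
import math

# Per-shard limits as quoted in the talk for standard consumers.
WRITE_MB_PER_SEC = 1.0
WRITE_RECORDS_PER_SEC = 1000
READ_MB_PER_SEC = 2.0


def estimate_shards(in_mb_per_sec, in_records_per_sec, consumers=1):
    """Return the smallest shard count that satisfies all three limits.

    Hypothetical helper: every standard consumer reads the full stream,
    so read demand is the ingest rate multiplied by the consumer count.
    """
    by_write_bytes = math.ceil(in_mb_per_sec / WRITE_MB_PER_SEC)
    by_write_records = math.ceil(in_records_per_sec / WRITE_RECORDS_PER_SEC)
    by_read_bytes = math.ceil(in_mb_per_sec * consumers / READ_MB_PER_SEC)
    return max(1, by_write_bytes, by_write_records, by_read_bytes)
```

With five standard consumers on a 1 MB per second stream, the read side becomes the bottleneck, which is the situation enhanced fan-out is meant to relieve.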
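
The poison-message advice from the talk (catch the error, log it, and still return successfully so the batch is not retried forever) might look roughly like the handler below. This is a sketch, not the blueprint code shown on the slides: `process` is a hypothetical stand-in for real business logic, and the returned summary dict is an assumption added for illustration. The event shape (`Records`, `kinesis.data` as base64, `kinesis.sequenceNumber`) is the standard Kinesis event that Lambda delivers.

```python
import base64
import json
import logging

logger = logging.getLogger(__name__)


def process(payload):
    """Hypothetical business logic: here, just parse the payload as JSON."""
    return json.loads(payload)


def lambda_handler(event, context):
    processed, failed = 0, []
    for record in event["Records"]:
        kinesis = record["kinesis"]
        try:
            # Kinesis record data arrives base64-encoded.
            payload = base64.b64decode(kinesis["data"]).decode("utf-8")
            process(payload)
            processed += 1
        except Exception:
            # Log the poison record (this goes to CloudWatch Logs in Lambda)
            # and keep going, so the invocation succeeds and the shard advances.
            logger.exception("Skipping record %s", kinesis.get("sequenceNumber"))
            failed.append(kinesis.get("sequenceNumber"))
    return {"processed": processed, "failed": failed}
```

Swallowing the exception trades retries for progress, so the logged sequence numbers are the only trail for reprocessing the skipped records later.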
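
The Firehose record transformation described in the talk, turning a row-based Apache log line into a JSON document before format conversion, could be sketched as below. This is not the speaker's 15-line function. The regular expression assumes the Apache common log format, and the field names (`host`, `time`, `verb`, `request`, `status`, `size`) are illustrative assumptions rather than the columns of the table in the demo. The response contract (`recordId`, `result` of `Ok` or `ProcessingFailed`, base64 `data`) is the one Firehose expects from a transformation Lambda.

```python
import base64
import json
import re

# Assumed input: Apache common log format, one line per record.
LOG_PATTERN = re.compile(
    r'(?P<host>\S+) \S+ \S+ \[(?P<time>[^\]]+)\] '
    r'"(?P<verb>\S+) (?P<request>\S+) \S+" (?P<status>\d{3}) (?P<size>\d+|-)'
)


def lambda_handler(event, context):
    output = []
    for record in event["records"]:
        line = base64.b64decode(record["data"]).decode("utf-8").strip()
        match = LOG_PATTERN.match(line)
        if match is None:
            # Return the record untouched and flagged, so Firehose
            # treats it as a failed transformation.
            output.append({"recordId": record["recordId"],
                           "result": "ProcessingFailed",
                           "data": record["data"]})
            continue
        doc = match.groupdict()
        doc["status"] = int(doc["status"])
        doc["size"] = 0 if doc["size"] == "-" else int(doc["size"])
        # Newline-delimit the JSON documents so they stay separable downstream.
        data = base64.b64encode((json.dumps(doc) + "\n").encode("utf-8"))
        output.append({"recordId": record["recordId"],
                       "result": "Ok",
                       "data": data.decode("ascii")})
    return {"records": output}
```

Filtering, as in the errors-only example from the talk, would use the same shape with a `Dropped` result for records that should not be delivered.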
Info
Channel: Amazon Web Services
Views: 50,544
Keywords: AWS, Amazon Web Services, Cloud, cloud computing, AWS Cloud
Id: MELPeni0p04
Length: 38min 5sec (2285 seconds)
Published: Thu Jun 20 2019