Rob Story | Data Engineering Architecture at Simple

Captions
I'm a data engineer at Simple; my Twitter handle is @oceankidbilly, and I'm going to talk about data engineering architecture. I'll focus a little bit on Python, but mostly on higher-level work: how we've structured our pipelines, how we've built data engineering, and how we use the warehouse. I'm also going to talk a lot about mistakes we've made and things we've learned because we did it wrong the first time; I have a few examples of that.

What I really want to start with, as a leading question, is: why does Simple have a data team? Why is there a data engineering team at Simple, and what is our goal within the company? If you look at engineering as a whole and its functional teams, it's pretty clear what all of those teams do. Back-end builds core banking services. Front-end and mobile build clients: if you go to simple.com, sign up for a bank account, and log on to the app, you're interacting with clients built by front-end or mobile. Infrastructure builds core infrastructure and helps us ship code. Security does security things; it's always good for a bank to have a big security team. So where does data fit into all of this? It turns out the rest of the company needs to answer questions about how the business is doing with data, and everyone wants to know about facets of customer behavior, whether it's marketing and product trying to figure out how customers interact with the application, or our huge customer service team, which needs data to figure out whether they're serving customers well. The data team exists to help the company make decisions and tell stories with our data.

So how do we get this data? Simple started out with a service-oriented architecture, and the company now runs on more than 35 separate services, almost all of which have their own Postgres database. That's a lot of databases, and a lot of places to get data from. So let's pose a question: in a world without a data team or a data warehouse, what would it look like to answer what should be a pretty simple business question about our data? We want to find all the users who have made a transaction greater than $1 in the past week. In theory that should be an easy question to answer, but what does it look like without a data warehouse or a data team? You've got scary production database number one and scary production database number two, and assuming your ops and DBA folks will even let you log into production databases (either that, or they have to export the data to CSV for you), you pull the CSVs into Python, do the join in memory, and go from there. That's kind of a pain, it makes a lot of assumptions about how you get the data, and once you start answering more complex questions across 35 different databases, it gets tedious. This is how Simple used to work in the early days: the young data team, which was two people, would do dumps from production and do their work in Python.

What does this look like when you have a data warehouse? If all of those databases are in one warehouse, it's a pretty straightforward SQL statement: just a regular join. The data warehouse lets us collapse 35 individual databases, each for a separate service, into essentially one database. To the analyst, it just looks like a database.
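As a rough illustration of that "regular join" in the warehouse, here is a minimal sketch using psycopg2 against Redshift. The table and column names (users, transactions, amount, when_recorded) and the connection details are assumptions for illustration, not Simple's actual schema.

```python
# Minimal sketch: the "one warehouse" version of the question, via psycopg2.
# Table, column, and connection details are hypothetical.
import psycopg2

QUERY = """
    SELECT DISTINCT u.user_id, u.email
    FROM users u
    JOIN transactions t ON t.user_id = u.user_id
    WHERE t.amount > 1.00
      AND t.when_recorded > GETDATE() - INTERVAL '7 days';
"""

conn = psycopg2.connect(
    host="example-cluster.abc123.us-west-2.redshift.amazonaws.com",  # hypothetical
    port=5439,
    dbname="warehouse",
    user="analyst",
    password="...",
)
with conn, conn.cursor() as cur:
    cur.execute(QUERY)
    for user_id, email in cur.fetchall():
        print(user_id, email)
```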
The engineers and the business get at it through dashboards — dashboards that work with SQL, and there are a lot of SQL dashboarding tools — so they appreciate having all the data in one place, and we have controls over that in terms of permissions. The warehouse lets us get data to everyone from one place. So what does data engineering do? We build the tools and systems to make data available to everyone who needs it. That is our job.

Our data warehouse of choice is Redshift, so I'm going to talk a little bit about what Redshift is, what we've learned while using it, and some pointers if you choose it as your data warehouse. To start with, why would you want to use Redshift and not Hadoop or some other data store? One of the biggest reasons, honestly, is that it's SQL: it was forked from Postgres 8.2, so it's more or less just SQL, and if you know SQL you can query Redshift. It has incredibly fast load times from S3, so using S3 as an intermediate stage to get data into Redshift, which is something we do a lot, is incredibly fast. It has very fast query times: Redshift is really fast if you model your data correctly, which we'll get to in a minute. The security model is the same as the rest of AWS's security model, so you can leverage IAM roles and that sort of thing to give people permission to get in. And one of the biggest things is that the rest of Simple already runs on EC2 and AWS; we already have services on EC2 and data in S3, so Redshift was a pretty clear choice of warehouse for us.

Here's a wall of text, written by the Amazon Redshift documentation writers, describing the database. It's a little business-y, but there are actually some good things in it, and I'm going to walk through the highlighted phrases: enterprise-class relational database query and management system, massively parallel processing, columnar data storage, and targeted data compression. These read like marketing lines, but they're actually really important to how you use the warehouse.

First, "enterprise-class relational database query and management system": it quacks like a SQL database. It looks like any other SQL database — like I said, it was forked from Postgres 8.2. There are some functions available in Postgres that aren't available in Redshift, but for the most part it complies with the SQL standard, and the JDBC and ODBC connectors you already have mostly work out of the box, which is really nice. A lot of my work with Redshift happens through psycopg2, the Python connector, and it just works. The query language looks a little different in places, but it's pretty close.

"Massively parallel processing": what does that mean? If you stand up a Postgres instance, you've got a one-node database: one EC2 box or RDS instance running one instance of Postgres. Redshift is parallel: it splits your data across many compute nodes, and within those nodes the data is split among what they call slices, each of which is allocated a portion of the node's memory and disk space. So Redshift can parallelize both data storage and data queries.
Part of that parallelization is handled with distribution keys and sort keys. Redshift not only allows you but forces you to decide how your data is both distributed and sorted, and this can make a huge difference in your query times and in how effective Redshift is for you as a database. Take a look at a typical CREATE TABLE statement: it looks a lot like a normal Postgres CREATE TABLE, except you specify a compression encoding for each column, and then a distribution key and a sort key. In this case we distribute the data by user_id (you can assume it's a UUID) and sort the data by the timestamp when the record was created; it's often really useful to have your data sorted by timestamp. Redshift emphasizes these two keys for specifying both distribution and sortedness. You can leave them out, but if you do, Redshift makes some assumptions for you, and those assumptions might result in your queries being quite slow. Performance can suffer if you don't think about them.

Let's look at an example. In that CREATE TABLE statement we distributed the data by user_id, so every piece of data that comes in with user ID 1 goes to compute node 1, everything with user ID 2 goes to compute node 2, and the same for 3, because Redshift is doing hash-based partitioning on user_id to route rows to the different nodes. Here's something we actually saw in production. We figured that since we're using UUIDs, the data should be very evenly distributed, but we noticed one node just kept growing and growing, and we were getting really close to the disk limit on that box. What was happening? We had a lot of messages come through with user_id equal to null. These weren't invalid data; this was valid data where, for this particular use case, there is sometimes no user ID attached — and all of those null keys hashed to the same node. So we had to recreate the table and redistribute the data. In the new table we set the distribution style to EVEN, so the data is distributed evenly across all nodes, which fixed our disk-space problem but actually made some queries slower. It's a trade-off we had to make: before, all rows for a given user ID lived on a single node, so when you joined two tables on user_id, that join could happen entirely on that node. Now that user IDs are distributed across the whole cluster, those joins require shuffles across all nodes. We fixed our disk problem, but our analysts noticed that some queries slowed down a little. It requires trade-offs, and this is one we saw in production.
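Here is a sketch of the two table definitions described above, executed through psycopg2. The column names and encodings are illustrative assumptions, not Simple's real DDL.

```python
# Hypothetical DDL illustrating distribution and sort keys, and the
# DISTSTYLE EVEN variant we switched to after the null-user_id skew.
import psycopg2

DDL_BY_USER = """
CREATE TABLE transactions (
    user_id       CHAR(36)       ENCODE lzo,       -- UUID; null for some records
    amount        DECIMAL(12, 2) ENCODE raw,
    when_recorded TIMESTAMP      ENCODE delta32k
)
DISTKEY (user_id)          -- co-locates a user's rows on one node
SORTKEY (when_recorded);   -- range-restricted scans on timestamp stay cheap
"""

DDL_EVEN = """
CREATE TABLE transactions (
    user_id       CHAR(36)       ENCODE lzo,
    amount        DECIMAL(12, 2) ENCODE raw,
    when_recorded TIMESTAMP      ENCODE delta32k
)
DISTSTYLE EVEN             -- round-robin distribution; fixes the skew,
SORTKEY (when_recorded);   -- but user_id joins now shuffle across nodes
"""

conn = psycopg2.connect(dbname="warehouse", host="...", port=5439,
                        user="etl", password="...")
with conn, conn.cursor() as cur:
    cur.execute(DDL_EVEN)   # or DDL_BY_USER, depending on the trade-off you want
```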
Here's another one we saw; this one is about sort keys — data sortedness. In this case I'm joining two tables on user_id. Distribution aside, if your data is not sorted, if you don't have a sort key that helps this join, Redshift is going to do a hash join. If you've never used EXPLAIN in Postgres or Redshift, it tells you exactly how the database is going to execute your query, and it's really useful for figuring out "my query is slow — why is my query slow?" In this case we're joining the transactions table with the goals table. The goals table is the smaller one, so Redshift builds a hash table from the goals table and then probes it as it walks through the transactions table. And we learned that in Redshift, the hash table for a join can spill to disk. Not only can it spill to disk, it can fill up all the available disk space on your cluster and cause the entire cluster to go down. We had this happen more than once before we fixed it, and we still see it get close these days. Redshift has query timeouts, but they're not on by default, so if you're running Redshift I really recommend setting some sort of reasonable timeout for your queries, because this is a very real failure mode: you don't have to have that much data before you start building giant hash tables for joins, seeing them spill to disk, and suddenly watching disk usage hit basically 100% on multiple nodes.

Here's an interesting fix. Redshift lets you create temp tables, and it's actually really cheap and fast to create temp tables that have different data distributions and different sort keys. In this case we create two temp tables, one for transactions and one for goals, both distributed by user_id and both sorted by user_id. Now when we do the join, the query plan does a merge join, which can basically be streamed in memory, and it's really fast. It has been faster for us to create temp tables to get merge-join behavior than to just do the hash join, even when the hash join isn't spilling to disk; merge joins in Redshift are that fast. So this is an example of why thinking about sort keys is really important for your queries.
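Here is a hedged sketch of that temp-table trick. The table and column names are assumptions, and the plan-operator names in the comment are indicative rather than exact output.

```python
# Sketch: force merge-join behavior by staging both sides of a join into
# temp tables with identical DISTKEY and SORTKEY. Names are hypothetical.
import psycopg2

STATEMENTS = [
    # Redshift has no query timeout by default; set something reasonable.
    "SET statement_timeout TO 300000;",   # milliseconds
    """
    CREATE TEMP TABLE tmp_transactions
        DISTKEY (user_id) SORTKEY (user_id) AS
        SELECT user_id, amount, when_recorded FROM transactions;
    """,
    """
    CREATE TEMP TABLE tmp_goals
        DISTKEY (user_id) SORTKEY (user_id) AS
        SELECT user_id, goal_name FROM goals;
    """,
    # EXPLAIN on this join should now show a merge join rather than a hash
    # join, since both inputs are co-located and sorted on user_id.
    """
    SELECT t.user_id, t.amount, g.goal_name
    FROM tmp_transactions t
    JOIN tmp_goals g ON t.user_id = g.user_id;
    """,
]

conn = psycopg2.connect(dbname="warehouse", host="...", port=5439,
                        user="analyst", password="...")
with conn, conn.cursor() as cur:
    for statement in STATEMENTS:
        cur.execute(statement)
    print(cur.fetchmany(10))
```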
Back to the business-speak: columnar data storage. I think this was touched on a little in the Feather talk earlier. If you have a columnar database and you're querying it, you really want to avoid SELECT * queries; you always want to limit the query to only the columns you're actually working with. The reason is that Postgres stores data row-wise, so if you select columns a and b from a table, the only way to get all of a and b is a full table scan: you scan every row and throw away everything except a and b. Redshift doesn't store data in rows; it stores data in columns, so when you select a and b, all it has to do is fetch columns a and b from disk. If you do a SELECT * and you don't actually need all of that data, you're throwing most of the I/O away. So always try to limit how many columns you're selecting.

Now compression. Redshift has very efficient, targeted data compression encoding schemes. I believe that by default it won't encode your data, but you can select an encoding for each column to compress it. A really nice thing Redshift will do, if you already have data in a table, is ANALYZE COMPRESSION: it takes a subset of that data and recommends a compression encoding for you. One quick caveat: ANALYZE COMPRESSION requires a full table lock, so I would not recommend running it on a table that is actively loading data or serving queries, because it will lock that table until it is done. Compression is really important for Redshift, especially for keeping your disk usage reasonable, and it improves both query performance and storage overhead: query performance because pulling encoded data off disk is much faster than pulling unencoded data, and storage overhead because you're simply storing less data on disk. I gave a short lightning talk on columnar compression if you're interested in how Redshift encodes data; there are a lot of really clever tricks Redshift uses for different types, and it's up on GitHub.
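A minimal sketch of asking Redshift for encoding recommendations, with the table-lock caveat above in mind (so this assumes a quiet table). The table name is hypothetical.

```python
# Sketch: ask Redshift to sample a quiet table and recommend column encodings.
# ANALYZE COMPRESSION takes a full table lock, so avoid tables that are
# actively loading data or serving queries. Table name is hypothetical.
import psycopg2

conn = psycopg2.connect(dbname="warehouse", host="...", port=5439,
                        user="etl", password="...")
conn.autocommit = True  # ANALYZE COMPRESSION can't run inside a transaction block
cur = conn.cursor()

# COMPROWS caps how many rows are sampled.
cur.execute("ANALYZE COMPRESSION transactions COMPROWS 100000;")
for row in cur.fetchall():
    print(row)  # one row per column: recommended encoding and estimated savings
```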
Now for the grab bag of important operational notes — things we've learned while using Redshift. The first is that Redshift is an ACID-compliant database and supports a fully serializable isolation level. What does that mean for you running Redshift? It means transactions can be kind of expensive, and the commit queue for transactions can back up. If you're thinking "I'm going to stream data into Redshift, I'm going to load data every minute," that probably won't work, because we tried it: we tried loading data every three to five minutes and had to back off to about ten, except for critical tables, because we found that with our query load plus all those loads happening, the commit queue was backing up. Remember this when you're thinking about loading data into Redshift and trying to stream data in.

Redshift has a maintenance window every week; ours is 2 a.m. on Monday. If you're building systems that require uptime — systems that expect Redshift to be up, systems that have health checks and might page an engineer in the middle of the night if Redshift goes down — I highly recommend building those systems to be resilient to this maintenance window. We still haven't fixed this everywhere; we got paged for it last week, because there's some system that doesn't properly account for the Redshift maintenance window.

VARCHAR is a common data type in SQL databases, and Redshift lets you specify VARCHAR(MAX), which is basically a VARCHAR of 65,535 bytes. You might think, "why would I ever specify something that big?" The answer is that Postgres has a TEXT type, you can put about as much data as you want into a Postgres TEXT column, a lot of people use it, and it translates most closely to VARCHAR(MAX). But it's really expensive: some joins won't work, and you'll get errors like "ERROR: 8001 — the combined length of columns processed in the SQL statement exceeded the query-processing limit of 65535 characters." So when you're reaching for VARCHAR(MAX), remember it can be pretty sensitive when doing joins.

The last one is that Redshift is a black box. They ship software updates the way Google Chrome does — actually worse than that: they just ship them behind the scenes. You never know when Redshift is updating; it just updates. They've broken the query planner on us only once, but it happened: we had a query that simply stopped working because the query planner broke. And when Redshift goes down, there's nothing you can do; you just have to wait for Amazon to fix it. That's both good and bad: bad because Redshift is down, good because you're not the one fighting the fire. It's a pretty big trade-off, but honestly Redshift has been incredibly stable for us. I can count on one hand the number of times we've seen it go down, and when I say go down I mean for a period of ten or fifteen minutes, usually when a node fails and has to be replaced.

So: your queries look like Postgres, and Redshift is forked from Postgres, but it's not Postgres, and you need to remember to think about its distributed nature when building tables and queries.

A word for a Python conference: what are the ways you can talk to Redshift from Python? There's the venerable psycopg2 library, which can be kind of a pain to install, especially if you've ever tried to install it with SSL. There's a brand-new asyncio PostgreSQL driver for Python 3.5+, asyncpg, that I haven't used yet, but it's supposed to be incredibly fast and looks quite good; we might look at switching some of our psycopg2 uses to it, and for analysts who have trouble installing psycopg2 we might start pointing them at asyncpg. There's a SQLAlchemy dialect for Redshift, built on top of the existing SQLAlchemy machinery with some Redshift-specific operators. And finally, at Simple we have an open-source library for interacting with Redshift called shiftmanager. It has a bunch of helpers: creating users, schema reflection (which helps with deep copies of tables), deduplication, migrations, and — poorly documented, but in there — some tools for doing bulk loads of data from Postgres to Redshift, and the same for bulk JSON. We probably need to write some docs, but they're there if you go look at the code. If you're working with Redshift, you might find some useful stuff in it.

All right, let's talk about Simple's pipelines. This is the entirety of Simple's data pipeline: boxes with borders are our services, and everything else is connective tissue — Kafka and RabbitMQ as message brokers, and things holding state like S3, Redshift, and Elasticsearch. It's kind of an eye chart, so let's walk through what each of these pieces does for us.

We have a service called Sawtooth. It's old; it's basically Postgres replication, in that it crawls Postgres on a schedule, every 10 or 15 minutes, and uses ZooKeeper to store the timestamps of the last crawl. When it crawls, it fetches the last 15 minutes or so of data, puts it on RabbitMQ, and that goes to Redshift. We found some really tricky bugs in this library around row updates: crawling this way can be subtle when you have a lot of row updates, and some of our tables have poor data models where it's not clear when a row was updated. So we're moving away from this toward a more robust solution we call pg-kafka. In Postgres 9.4+ there's a new feature called logical replication slots, which lets Postgres stream every change in the database to something — for us, that something is Kafka. It's more or less a real-time stream of the write-ahead log into Kafka: every update, every insert, every delete comes through Kafka, and downstream those changes get loaded into Redshift, which I'll talk about in a minute.
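pg-kafka itself is Simple's internal service, but here is a minimal sketch of the same idea — reading a logical replication slot with psycopg2 and producing each change to Kafka with kafka-python. It assumes a slot already created with a textual output plugin such as test_decoding, and all names here are hypothetical; this is not Simple's implementation.

```python
# Sketch of logical-decoding-to-Kafka, not Simple's pg-kafka implementation.
# Assumes Postgres 9.4+, wal_level=logical, and a slot created with:
#   SELECT * FROM pg_create_logical_replication_slot('changes_slot', 'test_decoding');
import psycopg2
import psycopg2.extras
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")  # hypothetical broker

conn = psycopg2.connect(
    "dbname=app host=localhost user=replicator",
    connection_factory=psycopg2.extras.LogicalReplicationConnection,
)
cur = conn.cursor()
cur.start_replication(slot_name="changes_slot", decode=True)

def forward(msg):
    # Each msg.payload is one decoded change (insert/update/delete).
    producer.send("postgres.changes", msg.payload.encode("utf-8"))
    # Acknowledge so Postgres can recycle WAL up to this point.
    msg.cursor.send_feedback(flush_lsn=msg.data_start)

cur.consume_stream(forward)  # blocks, streaming changes indefinitely
```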
A question I've gotten before is: when engineers on the back-end teams make changes to their Postgres schemas, how do you know those changes are reflected in Redshift? Right now the answer is that an analyst realizes something is missing, comes and talks to us, and we backfill the data. The answer in the future is that we can also replicate DDL statements, like CREATE TABLE and ALTER TABLE, downstream and apply them to downstream databases. We're obviously not going to do this with destructive statements — automatically applying drop-table or drop-column statements downstream would be very bad, so that's not going to happen — but for anything additive, we're working on streaming those downstream.

Then there's the grab bag. I think any data engineering team eventually ends up with a pile of services that handle different things for the business. We've got a service that handles data transformation. We've got a service that handles emails, so any time a customer signs up and we need to send them lifecycle emails, there's a service that does that. We're folding some of these things into one bigger service called Estuary, which is built on Kafka Streams and will let us build individual jobs and keep all of this in one place; it's really tough to manage multiple services, and every one of them is a weight on the team in terms of being on call. And finally we've got a service that handles user events from clients: every time you interact with Simple on your phone or on the web, it sends events to this HTTP endpoint, and those go into Redshift.

All right — I've talked a lot about putting data onto message queues; how do we get that data from the message queues into Redshift? We have a service called Horizon, which is our ETL service. We read from both Kafka and RabbitMQ, we batch those messages and put them to S3, and then we have a loader that loads those batches from S3 into Redshift. One kind of neat piece: Redshift has this thing called a NOLOAD copy, where you can attempt a COPY and it tells you pass/fail — "hey, this copy you're about to do will work" or "hey, this failed" — without actually loading anything. If we have a batch that fails, we start bisecting the batch, extract the bad messages, and put them to S3 for something to happen in the future. Right now that something is that they just sit there, and none of us actually wants to go deal with them; they're there for the future. So that's basically it: we've got a batcher, we've got a loader, and we've got six or seven instances of Horizon running at the same time, loading from S3 into Redshift. We use ZooKeeper to keep a table lock: those instances ask ZooKeeper, "is the transactions table being loaded right now?" — "yes" — "okay, I'll move on and load a different table." ZooKeeper is our locking mechanism, so all the instances of Horizon can load in parallel.
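Here is a hedged sketch of that pass/fail check: COPY ... NOLOAD validates a batch in S3 without loading it, and a failing batch can be bisected into smaller batches. The bucket, table, and IAM role names are hypothetical, and this is a simplification, not Horizon's actual logic.

```python
# Sketch: validate an S3 batch with COPY ... NOLOAD, bisecting on failure.
# Bucket, table, and role names are hypothetical.
import json
import uuid

import boto3
import psycopg2

s3 = boto3.client("s3")
BUCKET = "example-etl-bucket"   # hypothetical

def write_manifest(keys):
    """Write a Redshift COPY manifest listing the given S3 keys; return its key."""
    manifest = {"entries": [{"url": "s3://%s/%s" % (BUCKET, k), "mandatory": True}
                            for k in keys]}
    manifest_key = "manifests/%s.manifest" % uuid.uuid4()
    s3.put_object(Bucket=BUCKET, Key=manifest_key, Body=json.dumps(manifest))
    return manifest_key

def copy_sql(table, manifest_key, noload=False):
    return ("COPY %s FROM 's3://%s/%s' "
            "IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-copy' "
            "FORMAT AS JSON 'auto' MANIFEST %s;"
            % (table, BUCKET, manifest_key, "NOLOAD" if noload else ""))

def load_batch(cur, table, keys):
    """Validate with NOLOAD; on failure bisect, setting aside single bad keys."""
    manifest_key = write_manifest(keys)
    try:
        cur.execute(copy_sql(table, manifest_key, noload=True))  # validate only
    except psycopg2.Error:
        cur.connection.rollback()
        if len(keys) == 1:
            print("bad batch, setting aside for later:", keys[0])
            return
        mid = len(keys) // 2
        load_batch(cur, table, keys[:mid])
        load_batch(cur, table, keys[mid:])
        return
    cur.execute(copy_sql(table, manifest_key))   # validation passed: real load
    cur.connection.commit()
```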
One decision about the data schema, made early on before I actually showed up, that I thought was really smart: this is all it takes to send data to Redshift — a table name, and a datum with column-name/value pairs. If you want to put data into Redshift at Simple, all you need to do is create the table. Horizon will pick that table up, because Horizon queries Redshift every 10 minutes or so for table schemas; create the table in Redshift, and after 10 minutes Horizon picks it up, puts it in a schema registry, and any data sent to Horizon for that table automatically starts loading into Redshift. You don't have to do a bunch of fiddling between you and Redshift: you create the table, you start sending data to the queue Horizon consumes from, and boom, you're done.

Moving on to a couple of our other services. We have a transaction search service: if you sign up for a Simple account right now, spend some money, go search your transactions, and look at some analytics, that's all served by a service from the data team. It ingests from Kafka, puts data into Elasticsearch, and serves HTTP requests from web and mobile clients. And finally, our biggest Python service: at some point somebody said "what we really need is a janky version of cron," and at that point "jank cron" was born. What it does is purely run scheduled tasks — basically scheduled tasks that get data from third parties and put it into Redshift. We pull data from our third-party email provider and other vendors, and we actually pull a lot of GitHub data: we run GitHub Enterprise, and every night we pull everything that happened in GitHub that day and put it into Redshift, so our engineering leadership can look at how we're working on GitHub. That's been a pretty fun data set to analyze.

So those are all the pipes; that's all of Simple's data engineering. What have we learned from building all of this? The first thing is that the whole company is moving from RabbitMQ to Kafka. RabbitMQ was in place when Simple started, and we're transitioning to Kafka. Why? One reason is that Kafka has the idea of a retention period: you tell Kafka how long to hold on to your data, and if your service fails or goes down for some period of time, Kafka stores the last offset you consumed and lets you go back in time to any point. You can say, "hey, I advanced all the way to offset 2,000,000, but I need to go back to offset 1,500,000; let's go back and replay all of that data through the pipeline." That's super valuable, whereas with Rabbit, once you ack a message, you've got that message and you'd better not drop it. The other thing is that Kafka has real network partition tolerance. We've had incidents in the past where — and to be fair, Rabbit is very clear about this up front, basically the first line of the documentation: it does not tolerate network partitions well — two Rabbit nodes stopped talking to one another, and the node that got separated was essentially saying, "I'm in a bad state, you need to fix me," and fixing it meant draining those messages manually to your own machine and putting them back through the pipeline by hand. That didn't happen too often, but when it did, it was kind of a big deal and super painful.
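The offset-replay idea is the main draw of Kafka here: a consumer can rewind to an earlier offset and re-read everything from that point. A minimal sketch with kafka-python; the topic, group, offsets, and handler are hypothetical.

```python
# Sketch: rewind a Kafka consumer to an earlier offset and replay messages.
# Topic, group, offsets, and the handler are hypothetical.
from kafka import KafkaConsumer, TopicPartition

def reprocess(payload):
    print("replaying:", payload)   # placeholder handler

consumer = KafkaConsumer(
    bootstrap_servers="localhost:9092",
    group_id="horizon-loader",      # hypothetical consumer group
    enable_auto_commit=False,
)
partition = TopicPartition("postgres.changes", 0)
consumer.assign([partition])

# We had advanced to roughly offset 2,000,000; go back and replay from 1,500,000.
consumer.seek(partition, 1500000)

for message in consumer:
    reprocess(message.value)
```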
The next lesson is metrics and health checks. I just talked about a bunch of services; that's a ton of moving parts to keep track of, and if you don't have your services instrumented with metrics and health checks, deployment is scary and you don't know when your stuff is broken. It's really hard to know the health of your systems without these things in place. Flask and Django both have libraries that support metrics and health checks. Honestly, I'd never written an HTTP service with a metric or a health check before I got to Simple, but all of mine in the future will have them, because observability for your systems is really important; knowing the health of your systems is incredibly important.

Postgres is a really good default; it's a great data store. Do you need relational, JSON, key-value? It's a great solution. But you should also be flexible: in the case of transaction search, Elasticsearch, which is built on Lucene, has a lot more flexibility in terms of NLP and scales better — the scaling story for Postgres full-text search wasn't quite clear, and its query capabilities were a bit more limited. So I always tell people: default to Postgres, but for certain specific use cases you might need to look at other databases.

I mentioned this earlier: the flexibility of our data model with Horizon has made scaling our Redshift schemas a lot easier. Along with that, table migration tools for Redshift have made schema management easier. We use Flyway, which is a Java library, but if you haven't used Alembic for Python — a database migration library written, I believe, by the same person who wrote SQLAlchemy — it's really good. If you're doing your database migrations by hand, you should take a look at it, because it's just a really nice software practice to have.

Celery, the Python task queue: I love Celery. I used it before Simple, and we use it at Simple. All of our ETL jobs that aren't streaming are batch jobs, so we didn't really need Airflow (which was out at that point); we don't have DAGs, we don't need a graph of jobs, they're just scheduled batch jobs. We really needed something that looks a lot like cron but lets you write your jobs in Python, because we had analysts writing jobs. So Celery was great, and it continues to work well for us.
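As a rough illustration of that "cron, but in Python" pattern, here is a minimal Celery beat sketch of a nightly job that pulls third-party data for loading into Redshift. The broker URL, schedule, task name, and task body are assumptions, not Simple's actual jobs.

```python
# Sketch: a scheduled, cron-like batch job with Celery beat.
# Broker URL, schedule, and task contents are hypothetical.
from celery import Celery
from celery.schedules import crontab

app = Celery("etl", broker="amqp://guest@localhost//")

app.conf.beat_schedule = {
    "nightly-github-pull": {
        "task": "etl.pull_github_events",
        "schedule": crontab(hour=2, minute=30),   # every night at 02:30
    },
}

@app.task(name="etl.pull_github_events")
def pull_github_events():
    # In a real job: fetch yesterday's events from the GitHub Enterprise API,
    # write them to S3, then COPY them into Redshift. Placeholder here.
    print("pulling GitHub events...")
```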
Okay, this is PyData, so I'm going to talk a little bit more about Python. Confession: most of Simple runs on the JVM. A lot of our services are written in Scala, Java, and Clojure, partly because one of the cofounders of Simple was an early Twitter engineer and co-authored a Scala book. But a lot of our engineers use Python every day for data science, analysis, ETL, and connecting to databases. I think every single JVM service I've worked on has a Python script in the root that does something, whether it's load testing or smoke testing, and I end up using Python for all of my scripting work.

The other thing is that you can build this entire pipeline with Python. Running JVM services like ZooKeeper and Kafka doesn't mean you need to interact with them from JVM languages; there are some really good Python libraries for talking to these JVM tools, so you don't need to be writing Java — you can write Python. There are companies doing this: Parse.ly streams billions of messages through Python services with libraries like these. There's a really good ZooKeeper library called kazoo, and we use Pika and kafka-python to talk to RabbitMQ and Kafka from within our Celery task workers. There's a really good blog post that benchmarks the Python Kafka clients, and they hold up well; you can push a huge volume through them.

And the last thing: Elasticsearch. We have this search service, and I think Elasticsearch and dynamic languages are really perfect for one another. If you look at what it's like to write an Elasticsearch query in Java, it's basically one big string, or a string builder with multiple builders, and you end up doing string manipulation to build queries, which is painful. In a language like Python, Elasticsearch takes JSON, so a query is just a dictionary: you can compose and combine dictionaries that represent concrete ideas about queries. So when working with Elasticsearch, I think dynamic languages are the way to go.
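For contrast with the Java string-building approach, here is a minimal sketch of composing an Elasticsearch query as plain Python dictionaries with the elasticsearch-py client. The index name and fields are hypothetical, not Simple's actual search schema.

```python
# Sketch: an Elasticsearch query is just nested dictionaries in Python.
# Index and field names are hypothetical.
from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])

# Small, reusable pieces of a query, composed as plain dicts.
match_memo = {"match": {"memo": "coffee"}}
amount_over_one = {"range": {"amount": {"gt": 1.00}}}

query = {
    "query": {
        "bool": {
            "must": [match_memo],
            "filter": [amount_over_one],
        }
    },
    "size": 25,
}

results = es.search(index="transactions", body=query)
for hit in results["hits"]["hits"]:
    print(hit["_source"])
```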
And that's it; that's all I've got. Any questions?

Do we have systems for setting up the infrastructure? Yes. We work with Amazon's CloudFormation, and we have some helper libraries to work with CloudFormation and build templates to set up our systems, so we have CloudFormation and Jenkins handling deployment. I'm very lucky that I don't have to build those systems; we have an infrastructure team whose job is to make it easier for us to stand up new services. For us it's: go change the CloudFormation template, let them know we're building this thing, that sort of thing.

The question was: do we hire entry-level data engineers? Simple's engineering staff is pretty top-heavy, to be honest; we have very few junior engineers, and I hope that changes — I think we need to hire more. I came to Simple never having written a single line of code that runs on the JVM: I knew Python pretty well, I knew JavaScript pretty well, but I had to learn Scala and that sort of thing. To be honest, there used to be more flexibility for that, and I think we're going to focus more on junior developers as we get bigger. Once an organization gets big enough, you can't keep hiring senior developers the whole time; I feel like you owe it to the community to hire juniors and build them up into seniors. So the answer is: I hope we get better at it.

The question was: how do we use Segment? I have a very complex relationship with Segment. We use it because marketing and business folks often want to use various analytics providers, ad-serving providers, and that sort of thing. Basically, all of our client interactions go through Segment first and then come through Segment's webhooks to us for us to put into Redshift, and once they're in Segment, the marketing people can send them to whatever third-party integration they want. All of our email integration goes through Segment first: we send data to Segment, and they send it to ExactTarget, our email provider.

And your second question was about anecdotal data volumes. What I say about Simple is that Simple doesn't have big data. We don't have a huge data stream where we can afford to drop a little bit; we've got a small data stream where we can afford to drop nothing, because we're a bank, and you can't drop transactions — that's very bad. To give you an example, our search service gets maybe ten or fifteen messages a second, and I think our highest-volume service is in the hundreds; we're not dealing with thousands, tens of thousands, or millions of messages a second. It's a really nice problem — it's nice not having huge piles of data — but we have different problems to solve, because we can't really afford to drop any of it, so we have to have really resilient systems.

All right, thank you.
Info
Channel: PyData
Views: 27,919
Rating: 4.9087453 out of 5
Id: 9nX35zrN20E
Length: 34min 7sec (2047 seconds)
Published: Fri Sep 23 2016