Erlang Factory 2014 - That's 'Billion' with a 'B': Scaling to the Next Level at WhatsApp

Video Statistics and Information

Video
Captions Word Cloud
Reddit Comments
Captions
thank you as he said I'm Rick Reed I'm from a small startup you've probably never heard of and I want to start by apologizing for the title when I submitted the talk in December I did not anticipate that it would mean something different in march so i i've been with whatsapp since 2011 I learned Erlang at whatsapp and my focus is mostly on scalability in the multimedia system I'm honored to work with a fantastic team but we're small there's only about ten of us plus or minus that work with Erlang and we handle all the development in the ops now before I get into why all the things that you need to tweak and change and patch and whatnot to get something of the scale of whatsapp working on Erlang I want to start out by saying that it's been a superb choice for for whatsapp primarily because of scalability in a number of directions dimensions just in terms of programming productivity I don't think that it would be you'd be hard-pressed to find a better language to be able to support so many users with so few engineers than Erlang secondly just in terms of the the scale of systems that you can build using it and the the programming model that comes naturally and all the supporting infrastructure with Erlang and finally in terms of SMP scalability allows us to run very large boxes and keep our node count small and and our sort of our operational complexity scales with the number of nodes not with the number of cores so the great SMP scalability really gives us an advantage the other great thing about Erlang is our ability to to basically update this thing on the fly literally on the fly so that we always think about in terms of we're flying an airplane that can never land but we constantly have to rebuild it to make it bigger we're constantly adding seats gets too heavy we need to change the engines we need more fuels we got to change the wings and we're doing all this wall while it's all flying now the question is is Erlang are our secret sauce and you could make the argument that it is but I think an innovation of ours is really what tells a story we call it founder directed development where Yan and Brian personally supervised every line of code as it's written and it really gives us an advantage in terms of just the quality and what we can produce so some quick numbers most of these you've probably seen in the press if you were following a few of the stories we've got about 400 closing in on 500 million monthly users 19 billion messages in and 40 billion out per day you can see the breakdown in terms of pictures voice messages and videos at peak we have about almost 150 million phones connected to the system at one time and at peak phones are connecting and disconnecting at about 230,000 per second and finally our peak message rates you can see coming in about close to 350 K and going out stoever 700 K in terms of multimedia the holidays become our biggest throughput time so on Christmas Eve we were pushing out at peak about 146 gigabits per second which when you consider that we're not streaming HD to the home these are small video files going to phones is quite a bit of bandwidth so we had 360 million videos go out on Christmas Eve about 2 billion photos were downloaded on New Year's Eve at a peak creative 46 thousand a second and we had one picture that was extremely vault viral on New Year's Eve that got forwarded 32 million times so that's kind of the scale of things wow what a revolutionary architecture so we've got some clients we've got some friends that connect to the Kelantan ter that the clients connect to back-end systems a little bit of storage too to hold on to the the messages while they're in transit between the the senders and receivers and that's about it so I'm going to drill into some of this the scale of how those work now it when we're sending messages to the phones some percentage of those are the actual text messages but but the we're also sending it right at the protocol level extra messages that include patience these are things like group subjects profile photo changes things of that nature and then that that red band is our presence messages so typing idle connected not connected so when you sum those all up although the the peak message rate out is just over 700 K the total protocol message rate is about 1.6 million a second on the back end here's a here's some stats coming out of a couple of our backends in the first two lines are from a system which this this particular box is hosting one quarter of the traffic now in this in the column where you see disc message in message out that's that's tracking distribution messages so you can see this one box is taking about 230,000 just mentions a second and going out with about it close to 150 K out on the second line you can see our amnesia stats and that's tracking the number transactions that are going in and out of Niva on that particular box it's about 11 K going in on and out now this other box on the bottom here is doing less in terms of distributions so there's a little less traffic going between nodes so only about 175,000 messages coming in but you can see it's doing a lot more and nisha updates so over in the the TM n TM out columns there's it's more like 75,000 writes per second on that particular system so you know we have the identical counter but you know we have on the order of maybe a dozen of these types of backends which are running at these kinds of speeds to support the traffic and the things that are going on in terms of scaling the in terms of data storage so I mentioned that for message we don't store messages except while they're just in the time between when somebody sends it and the receivers phone connects and picks it up for multimedia it's the same thing we only store the media during the time but it's in transit but we but we need to keep track of where what media we have and who it's being sent to so this is one sixteenth of the the database for the multimedia so you can see it's a little over a billion records for 116 so the total multimedia database is about 18 billion records in in media and that's you know it's close to well there 200 gigabytes per so you know closing in on 2 terabytes of RAM to hold that database so what's the hardware platform looked like to to just support all this so we have about 550 servers plus some standby gear of those 150 are the chat servers which are the ones that that are handled the messaging traffic that the phones connect directly to so that so given that our our peak concurrent connections about 150 million so that's each each one's holding about a million phones now if for those of you see my talk from two years ago we were pushing upwards of 2 million connections per server at that point we're running lower now basically because a we run with more Headroom because we got a lot more users and B actually there's three reasons second reason is that our users are more active so we have more users and they're sending more messages so that the server's get busier and then we've also moved some functionality that used to be outside those servers into those servers so they're so they're a lot busier so we're not getting as many connections per server there's about 250 multi media servers and our standard config is this dual 2690 v2 Ivy Bridge they're 10 core packages so they hate so a total of 20 cores but with hyper-threading x' we've got 40 cpu threads total and we put between 64 and and half a terabyte of RAM depending on like the the database ones have the the large Ram the just the standard compute nodes have more like 64 gig in general we use SSD primarily for reliability except in the cases in second in the case of video where we need a lot more storage SSD just becomes a little untenable in terms of cost there in terms of networking they're all dual link GigE times 2 so we have a front end which is user facing and the distribution runs over the back end so there's dual linked on both of those both for load balancing and for reliability and when you add it all up it's probably a little over eleven thousand cores that are running the Erlang system now as far as software we're running FreeBSD nine two with Erlang R 16 B zero 1 plus our patch set that's that's basically the platform that we're on now when we think about scalability you know no surprises here a lot of decoupling just stuff that you would expect parallelizing as stuff as much as we can we do some optimization in the application as we can occasionally have to patch OTP or beam and then we're always monitoring and measuring how we're doing in terms of capacity and what bottlenecks we might have so it drilling down a little bit into some of the decoupling topics so you know the primary thing for us in terms of decoupling is the ability to isolate bottlenecks so they don't spread through the system so back-end services that are deeper down in the stack don't don't bubble up if they have problems out to the front end and part well everything's partitioned and so one partition has trouble we don't want the other partitions to be affected so we can keep as much throughput going as we can while we address whatever issues we might have now if we have more a more coupled system then then things are going to cascade pretty quickly at the at the rates that we run stuff through the system now we try to be as asynchronous as possible that allows us to keep the throughput as high as possible even when there are you know even when there's latency at various points in the system or when latency is a little unpredictable since we have everything flowing as asynchronously as possible again it reduces the amount of coupling and it allows us to keep going as fast as we can so some specific examples of things that we've done so we avoid transaction coupling in amnesia we use them nisha extensively but most of the time we're doing it with a sink dirty without transactions we use calls only when we're returning data we need to get something back from a database otherwise we're casting everything all around again to preserve that asynchronous mode of operation what we do need to make calls we do it using timeouts only so we don't set monitors so like a gen server call we'll typically set a monitor to make sure that the the targets there so we just saw we just rely on the timeouts of a calf you know cast that goes nowhere we'll eventually timeout and so we handle it that way and that just reduces contention on procs on the far end and it reduces traffic over the distribution channel in cases where we only need best effort delivery of something will often use no suspend on the casts so this allows us to not be affected by downstream problems when either a node starts to have problems or the network between the nodes start to have problems in the cases where that happens eventually the distribution buffers will start to back up on the sending node and procs it try to send into that try to send into that node will start getting suspended by the scheduler and that just cascades further and further up the system so in the cases where we don't need to have an assuredness of aura you know you know like I say if we just need best effort to get it there then we use no suspend and that way those procs aren't going to get suspended even if there's trouble downstream in what they're trying to get to and we use large distribution buffers to absorb to better absorb those you sort of mormon terry things that we might see on a node downstream node or in the the distribution network between those nodes switching over to parallelizing so obviously we need to distribute this work over all these 11,000 course that we have so if we start with a gen server you know a single threaded approach we've split that into multiple workers in something we call gen factory like a factory for there's a dispatch process that spreads to work but it turns out that once we got to a certain point the dispatch process itself became a bottleneck and not necessarily just because of the execute time but as we have a lot of nodes coming in to a particular box and there's a dispatch process the locks on that process become a bottleneck between the distribution ports that are coming in and the process itself so by by spreading that dispatch out across so we have a gen industry which is a layer above gen factory multiple dispatched procs and so that allows us to parallel ice all the input coming into the box as well as doing the dispatch down to the workers themselves now that workers are selected by a key for most of the sort of the database type of operations but in the cases where we have a non-deterministic latency we use a use a FIFO model for distributing that stuff so that we don't get head of line blocking now as you'd expect all our services are partitioned we usually at this point are partitioning between two and thirty two ways most stuff is thirty two right now we use PG two addressing to address those partitions throughout the the cluster and usually what we do is we run them in pairs so a particular node will be primary for one partition and secondary for another and then and then they'll have a peer which is the flip and if one or the other goes down then then they'll be handling both primary traffic and secondary traffic as far as parallelizing what happens within Asia I could mention we were doing mostly async dirty and to give us as much consistency as possible since we don't have transactions running around this stuff we isolate all the record access to a single process on a single node by hashing so we're hashing to a partition which is hashing to our to an immediate fragment it's going to actually end up being dispatched down to one factory one worker and so all the access on a single record is going through a single process we also ensure that each amnesic fragments only being written to or read from on one node at the application level and that allows us to have transit our replication streams that only go in one direction so if we have two peers one sides doing access to one set of fragments and replicating to the other side and and the other it's pure node is doing the same in the reverse direction but they're not they're not sharing application access on the same fragments at the same time now at the rate that we're doing writes and on amnesia even oops even at even doing it a sink dirty we have bottleneck in terms of how much we can push through a replication channel so actually I'm jumping ahead this is a different one same issue though if there's a replication stream be going but going between peers the immunity em becomes a bottleneck in in terms of how fast it can update the fragments so we patched OTP to have multiple Amasia teams running for 8 per a sink dirty only there's still a you know a master amnesia cam that's running for the transactional stuff but for a sink dirty applying records it all happens in in parallel so that gave us a lot more throughput in terms of replication another patch we applied was to allow the amnesia library directory to be split over multiple directories which allows you to put on multiple drives and get increased throughput to the disks especially when obviously when they're dumping and it's a real issue during load time when when amis is loading from the pier its load load load the load and then it's a whole bunch of dumping and being able to spread that over that IO over multiple drives even SSD gives us a lot more scalability in terms of how fast we can load our databases finally we've been shrinking our amnesia Islands more and more that at this point we're basically running two nodes per Island so an island is is basically the amnesia cluster so even if we've got say 32 partitions we only have 16 we'd have 16 separate islands that support that particular table so that in this we do this for two reasons it gives us better opportunity to complete scheme operations under load because there's only two nodes that have to get into a state where they can complete the schema operation and it also reduces the amount of load time coordination if we're trying to bring them if we're trying to bring either one node or both nodes up at the same time there's not as many nodes in the amnesia cluster that we need to coordinate that load moving back to some more decoupling so I mentioned had a line blocking we separate read and write queues in places especially where we're doing transactions on the tables so that if there's any latency issues on the right side that doesn't block the read side because the in general the read side is going much faster so if there's any backup we can we can quickly get a queue that's out of control if we don't split those another thing that we ran into early on was again if a node runs into trouble or a network between a particular node and another set of nodes rinse into trouble that distribution channel backs up you started getting disparate bizzy's and that backs up all the way into the the application that's doing the casts and even if they're casting to multiple nodes so what we do is we can't so when we're doing stuff in turn-up internode we hand that off into separate procs so that if there is a problem with a particular node only the the only the messages that are destined for that node are going to get backed up which they have to anyway because it's not working right but messages that are going to other nodes can flow freely without without interruptions so it keeps again it keeps the throughput up and isolates the the problem to exactly where it is so that so we use that for when we're doing node to node messaging for message forwarding especially on those front ends and then also I made a patched amnesia to do that at the sink dirty replication time so so ordinarily when you do a write an async dirty write with replication the the local process does a write into the X table on the local node and then does the cast to the remote node so what we do is we hand that cast off to a set of workers that are specific to that node so that the replication streams are both running in parallel so we've got multiple replication streams outbound to each node and there's separate ones per node so if a single node has a trouble or the network has trouble then again the replication stream for it stops but but the but that back pressure doesn't go all the way up into the application that's doing the original write again we're trying to decouple this as much as possible so that if there's issues they get isolated into just the places that that actually you know there's nothing we can do about it but then everything think else keeps flowing and then I mentioned when we're doing things with a non-deterministic latency we'll we'll use the Q model where a FIFO worker dispatch so that we don't get head of line blocking on that on those types of operations so here's an example of some optimizing work we did the off lines the offline storage system used to be in a big bottleneck when we had load spikes because we just couldn't push stuff to the to the file system fast enough now it turns out that most messages are picked up very quickly I think the last time I looked it's something like 50 percent within 60 seconds so in a lot of cases we'd like to be able to deliver those messages before we even have to push them into the file system so we added a write back cache with a variable sync delay how long it would sit in the cache before it actually gets pushed into the file system and what that allowed us to do is for a lot of messages they get served directly out of the cache without ever touching the filesystem and in cases where the i/o system falls behind we get a large load excursion that sync delay can basically absorb guess it's extra the right cat the write back cache gives us extra buffering so that we can continue to deliver messages at full rate while the i/o system tries to catch up so you can see here's some statistics I don't know if it's readable back there but but basically under normal conditions that that one right in the middle X kak that's the percentage of messages which are the percentage of pops a pop is when a phone connects we pop their mailbox and deliver all their messages so the percentage of pops that can be handled exclusively out of the cache its 98.7 percent so extremely high percentage just to the right the sync age so we usually run with about a 20 second sync age that's the delay between when it gets put in the cache and when it gets flush to the file system and then the max-age the next one the 50 1000 seconds so the better part of a day of messages or the oldest message in the cache at that point is the better part of a day old so that one's been long since fish flushed to disk but it's in the cache so we don't actually have to touch the disk to pull it back out some recent improvements to offline we fixed a head-of-line blocking problem in a sync file i/o so beam wants to hash the file port and us and assign all the i/o for a file port to a single async thread which for us was a problem because the way the offline system works is each cache partition then there's 48 of cache partitions per node each one only has one file port that it's doing all its i/o over so we'd have a we're a large mailbox or a slow desk or whatever would would slow down a particular async thread and then everything on that cache partition would get blocked behind it so I patched beam so that we could round-robin our file port requests to all the async worker threads and that's that smooths everything out and I'm not quite sure why it hashes that way because all the file i/o is synchronous at that level but I'm sure there's a good reason but for us this was a this was a big win in terms of keeping that offline system performant in the face of some varying sizes and Layton sees on the on the i/o and finally we got better at handling large mailboxes who found that they were at very large mailboxes people who either are in a tremendous amount of groups and are getting thousands of messages per hour we're polluting the cache and really slowing things down so by evicting and keeping large mailboxes out of the cache it greatly an enhancer improvement or enhance the performance of that system ok another example this was a fun one our account table has 512 fragments amnesia fragments and you know we're partitioning this into those islands and which means that there's basically a sparse mapping of the users over these 512 fragments so any given partition is going to be only accessing a small percentage of the fragments and all the rest are going to be empty and idle so we so we doubled the number of hosts because we we were running close to capacity on that system and the throughput actually went down and and after a little bit digging we discovered that the the record access was at the X level was really slow so I took a look at sin foe stats on a few of the amiga pets fragments and discovered the hash chains were that the maximum hash change size was being reported over 2000 the target for heads is like 6 or 7 so this was a problem so what was happening is X uses basically uses P hash to in the as its mechanism for hashing over its linear hash and we were using P hash - with higher level to select partitions so since the hashes were coincident we had a large number of empty buckets in the ATS hash linear hash and a few that were extremely long and ETSU's --is the average chain length to decide when to split the hash so the average was 7 even though the maximum was huge so this two line change to basically seed the hash so that they did so that p hash - and the x hash didn't coincide basically eliminated that so that we get the full width of the linear hash in X and that two line change this was this was one of those really exciting days where we des played this and it gave us like a 4 to 1 an improvement of speed on that particular box now as far as patching so we're not patching FreeBSD anymore and the configuration changes are pretty minor just the things that you'd expect for a large network stack and large Ram I'm not going to go through the the the patches that we're still running from my talk two years ago that we've been ported forward R 16 but but I described those in the other talk some of the new ones since then we ran into we did run into problems with contention on the timer wheels it was sort of like the last big chunk of contention that lock counting showed in being and it's easy to understand because if we have a million or two million connections into a single host and each of those is setting and resetting a timer every time something happens with that particular phone we're talking about hundreds of thousands of timer resets and resets per second so with having just one timer wheel one lock that was a big source of contention so I split that into multiple timer wheels there's basically one per schedule er that I mean they're not tied to a scheduler but that it's on the same order and that basically eliminated that that contention and and gave us pretty clean stats on lock contention using lock counting a couple problems with there was a problem there is a problem with amnesia TM in that it's a big select receive loop so under load trying to load tables created a problem because we'd get backlog in the amnesia TM as it's loading and it could it would get to a point of no return because because of the Selective receive so there was a little patch to pull stuff out of the incoming transaction state stream and save it to process later so that the Select reflective receive would would run quickly I mentioned before that we added the multiple async dirty senders I added marking set on a port because prim file has the Selective receive so when you're doing file i/o what an easier was trying to dump a table is doing file i/o in the context of that same process if it's taking a lot of message input this would be in the transaction manager again it that would get into a point of no return amount of backlog because of that selectively receive so at adding the ability to mark and set on that file port allowed that selected received to be taken out of the equation and so that we could grow and run full speed dumping those those tables and then finally some of our clusters span the continent and so I taught amnesia how to load from a nearby node rather than pulling stuff cross country I mentioned the round-robin scheduling for a sync file i/o and breaking the the hash coincidence made a pass on optimizing the s main and name tables which are accessed on every its operation so on those nodes where we have lots of fragments several tables with lots of fragments it's lots of its tables there was a lot of contention on those tables and then finally we're running with a patch so that if Nisha is already dumping a table and another dump gets triggered that it doesn't queue that up because we get in a situation where there were so many cute where we'd fall behind on dumping there would be so many dumps queued up they would never complete and you can't complete a schema operation in media while there's a dump pending so we get in this situation where we couldn't do schema ops while the system was running because all these queued up dumps and there's really no point in queuing the extra dumps because it's it's dumping as fast as it can all right a little more about decoupling so our you know you see how the size of our cluster and it's getting a little a bit a little bit large I mean we're not running into too many problems but we needed a way to be able to contain the size of the cluster and also allowed to span long distances so I built this dislike transport over gen TCP which basically runs like dist only instead of connecting all nodes it's a met it's the the mesh is just between the nodes that talk need to talk to each other and we use a transparent routing layer just about PG to to determine whether you know if we have a lookup for a particular PG to group we look in the in the local PG to is it on the on the current cluster or it can look at each of the other connection randa's connections it has and says oh I see this group on this when disconnection and it can forward it across and it's all just single hop routing there's no multiple hops to get through this thing that looks like this so you've got two main clusters in two different data centers to multi media clusters in two different data centers and then we might have a shared set of global clusters between the two and they all have landis connections between them drilling down they look like this where where the mesh is just between the nodes that need to talk to each other but not it's not a fully connected mesh and again this is what the routing looks like so any given client will have a list of Brandes links with the with the PG to groups that have been published from the far end on to those groups and so they can do make their local routing decisions and I apologize I'm going fast I need to run out of here to catch a flight so I'm trying to leave lots of time for questions alright so scalability for us is kind of like clearing a minefield and we're generally able to diffuse them detect them and diffuse them before they explode because we get lots of world events especially live sports creates big vertical load spikes when you say goals happen or or the games over we get server failures it's usually Ram we have network problems and and hypothetically we could have software bad software pushes I mean never happens at whatsapp but now we're not always successful you may have read about an outage on the 22nd which although it was severe at least the timing was almost as bad as it could possibly be so contrary to popular belief it had nothing to do with load it started with a back-end router glitch we actually had a short incident the night before where that that same back-end router dropped to SPF but it didn't affect us very much in the in the morning however it dropped our veal and which caused a massive node disconnect and reconnect throughout the cluster and when everything reconnected it was in it was in this unstable state that we had never ever seen before we tried unsuccessfully to stabilize it and we finally decided that we had to stop everything so that plane that never lands had to land so this is the first time we had ever had to do this in years and it was you know it was a time-consuming process to bring everything down start it all back up and get it and get it running it was in the process of doing that we found one one overly coupled subsystem it was just a side-effect it wasn't it wasn't the cause but it turns out that we believe what what happened was with that massive disconnect reconnect pg to actually can get into a state worth doing n cubed messaging because what we were saying was basically PG Q's PG to message queues going from like 0 to 4 million within seconds so we're rolling out a patch to try to cut down on to try to cut down on that messaging so that if we ever see that sort of disconnect reconnect again that it won't get into that state and I'm just going to stop here and hopefully leave some time a lot of time for questions yeah how do we deal with network partitions in amnesia so we alert on it quickly and then and then basically have a reconciliation and we continue running even though the nodes are now they've disconnected at the amnesia level so we continue running and then we have a reconciliation process which is mostly manual now where we dump one site and reconcile on the other side and do the byte and do the vice-versa to basically recombine them and we join it back oh well we join them back together immediately and then we go back through and reconcile the tables basically by a dump and load yeah right right exactly yeah so so even if we yeah so the question is the the islands of two so even if we've got say sixteen partitions each of those partitions will be its own little amnesia cluster of only two nodes so there's only replication flowing between two peers and only in one direction yeah scalability issues with s not not that you wouldn't expect I mean we generally partition things enough so that we don't have a lot of concurrent access from a lot of procs so we generally try to limit the number of procs that might simultaneously access a single at table or a single amnesia fragment usually to about eight so as long as we as long as we keep the potential number of workers that might access a given fragment down to about eight then that keeps the the contention the lock contention pretty much under control and so we can it doesn't become the primary bottleneck on that particular case but but of course I mean you know it's locking right so we were always looking for ways to improve that and we've done I actually ran an experiment with because amnesia I unless it's changed recently last time I look didn't have a way to turn on read and write concurrency ah nets so I experimented with runny with a patch patch Tunisia on one of our high-volume cables to turn those on and I actually didn't wasn't able to see a difference but I was it was just a quick test so I think we need to do some more analysis on how that what why that was because it was unexpected I expected it to be a lot better and it wasn't yeah yeah well even even two years ago I was saying you know how the question is how do we simulate this load and the answer is we don't because it's extremely difficult I mean even two years ago when our traffic was one-tenth what it is now it was it was just so difficult to simulate the load that we do so if we have something you know extremely disruptive that word that we're trying out we you know we roll it out very slowly you know we'll put it on something that isn't taking that there's not only a small piece of the traffic but isn't taking all of that traffic and just quickly iterate on on that until we're confident that it's that it's working well and then roll it out across the rest of the cluster because otherwise it just it it's so difficult at this scale to simulate what's actually going to happen especially in these weird cases you know in cases where you know there's huge excursions you know it's New Year's Eve at midnight or those types of things yeah in the back how do we roll them out well basically we basically we create we just have a set of versions that we're just rolling forward so we we call them W so we're on like W a 42 now which is basically all of the OTP package built with our patch set and then we just we do rolling upgrades so everything's redundant so in the case that we want to do a beam upgrade across a you know a subsystem or or whatever it is we'll install it and then gradually do restarts across that cluster to pick up the new changes in a rare circumstance if it's just a Erlang patch that we can hot load you know something in the OTP libraries as opposed to beam then then we'll roll it out without a complete restart but that's that's rare we usually just upgrade the whole thing more questions yeah mm-hm well it's one way because we ensure okay the question is how why is replication only one way in our in our enemies Islands it's because we ensure that the traffic is only steered one direction or the other so so in the case of a set of right so let's say we had a to fragment table with in an island of two we'd make sure that all the one traffic goes to one note and all the two traffic goes to the other node so they so they won't so so one node will only see application access for one of its fragments in a replicate the other direction on the other node all its seeing on the fragments that its secondary for that it's not primary for the only traffic it sees is replication traffic so the other reason we you know primary reason we do that is first of all it's better for operational simplicity but also it reduces the amount of contention so instead of having both the application access and the transaction manager applying updates competing for the same locks on those debts tables it's either one or the other so it just it reduces the amount of contention on on those replication streams how we doing the time we still got more time so I can I can talk more if because I blew through some stuff yeah Oh ch a legit yeah why do I do that and then I'll answer your question Anthony so remaining challenges okay so DB scaling especially our whole team ET databases is a issue now as I mentioned yeah I'm sure well I'm not sure but some of you may have tables are big as 18 million 18 18 billion records but it get it gets a little unwieldy especially trying to load so we had direct experience with this two weeks ago but I mean these databases actually get reloaded on a fairly regular basis as we do upgrades and we're limited by how much we can push in at a at a time so those databases can load about a million records a second but the real problem is that sometimes the load will fail because right when it gets to the end of loading you know if you don't know if you're familiar with the way and I'm using table loads but it basically finds out it finds another node in the network that has the the table fragment that wants the load and sets up a load stream from that fragment so a load load load loads all goes into memory once it's into memory on the the load this noting the the node that's loading then it dumps it to disk and the problem is when we start we don't want to do one loader at a time so we run multiple loaders at a time we try to we try to run with sixteen loaders simultaneously sometimes that works sometimes it doesn't sometimes we have to back it off to like eight loaders but the problem is insert since all these tables are partitioned pretty accurately they're all the same size so they all finish loading at the same time and they all start dumping at the same time so on this multimedia database it'll load load load the load at million records a second after fifteen minutes it goes to let goes to dump and well it's loading multiple fragments during there but but they all tend to line up and so we had to add that parallel a disk i/o because because amnesia would like to dump you know that as the system's loading it can it can push over oh I have it on there over 700 megabytes a second to the disk but even that's not enough because at the point that it's dumping its it turns on it turns on the replication stream the replication streams coming on and it's it's I don't completely understand how what the state is but it's kind of the tables kind of blocked but it's also accepting the replication stream so there's so you start generating backlog and if the dump finishes in time before the backlog gets to appoint a no-return we're okay but occasionally they'll line up in such a way that that that backlog just becomes unrecoverable and the node engine just blown up and crashing and we have to restart the load so I patched one selected receive issue there but I think there are a couple more that need to be fixed so that we get reliable starts on these big databases the cluster is becoming big enough that that sort of real-time cluster status and control at scale is an issue for us and I don't mean in terms of sort of the typical operation I mean the reason there are plenty of cool operational tools out there even some that are that our laying base you know people who are here but you know there's kind of a specific set of things we need to look at when when we're diagnosing problems in a specific set of actions that we need to take and in the past just having a bunch of shell windows was enough but but now that we have say 150 front ends for sure Chad and 254 MMS that you can't really get 250 windows on there on a screen so so we're going to have to figure out a better solution to that and then finally power to partitioning is going to be an issue before too long I mean we're at thirty two partitions now next one sixty-four that's not too bad when we start jumping in increments of 64 128 and 256 and that at a certain point we're gonna have to say okay we need something we need something better than that for now for now it's okay but we're a couple maybe one or two steps away from where it's going to be kind of impractical to take the next step without doing something more intelligent let me answer Anthony's first yeah uh-huh I think so I think yeah I think you know Eugene you asked me that a couple weeks ago we'll think about yeah no yeah I think the most actually I usually I run so I have a 30-inch monitor so usually I run out of pity's before I run out of screen real estate but yeah yeah well that's true good point thank you you solved the problem yeah most entertaining part of the project to build entertaining that's that's a good question well yeah I mean the thing that's really exciting for us I mean the number that I'm probably I showed you all those numbers the number I'm proudest of is number of users - number of engineers I mean because it you know like 40 million users per engineer that's I'm really proud of the way we've been able to scale the system with a small team and and be able to operate the whole thing that's not really a source of entertainment source of pride but entertaining part of the system oh gosh well I think so I rebuilt that the multimedia system two years ago and what was exciting about that was that we made it a lot we were just a lot of friction in being able to send pictures and send video and then eventually we added that you know the voice message is the easy push-to-talk voice messages last year and seeing how users responded to that was really satisfying they're like wow this is easy and then we you know watch our bandwidth will go like whew but but just being able to see how people wanted to use how people wanted to connect in ways other than just text messages was was was really satisfying Yeah right it's mostly the most of the rest of the tech team is our work on our clients so since we strive to support every smartphone platform there is you know we've got teams of several engineers on each platform so the bulk of those are working on that we have a couple engineers that are working on our upcoming voice over IP stuff I mean we're all contributing to that but a few that are a few that are dedicated to that and that's that's basically it I mean like I said we the Erlang team handles all the operations of the cluster basically and they're relays anybody else technical and the rest of the whatsapp team is our customer sort people and a few GA folks anything else well we run in the cloud so so yeah it's not quite fair to say that we're 40 million users per engineer because yeah there's there's a large team underneath of us underneath us running the infrastructure in terms of networking and the data center and that which isn't counted in the 55 people at whatsapp so yeah I mean we interface with them in terms of problems that we're having especially when you know things like that outage happened but we don't we don't do any direct stuff on the network site or the server side I thought let's see I have time for a couple more questions I'm sorry um I don't know if I can say that it's well that's true the university knows its SoftLayer we like we have we have a great relationship with them we get these very large boxes bare-metal we do all the you know we install the OS we install the application we do all that they take care of the hardware in the network and it's worked out really well for us yeah you know we there's still a certain amount of shared tendency going on with the network but but for the most part where we're running isolated yeah well I answer I you know as I mentioned the other guy occasionally it doesn't happen a lot we get it we get a really reliable network on the back end which we like but there are occasions where where the nodes will run into problems but occasionally like there there will be Network problems and we'll get no disconnects in being consistent they'll run split will join them back together and do that reconciliation process but other than that well other than that and the 22nd where the whole thing disconnected and it created a huge distributed system problem it's been super reliable I mean even at the scale of the cluster that we're running which is 450 nodes or something like that yeah all right I got to run and catch a flight but thank you for your attention
Info
Channel: Erlang Solutions
Views: 41,563
Rating: 4.9474835 out of 5
Keywords:
Id: c12cYAUTXXs
Channel Id: undefined
Length: 49min 15sec (2955 seconds)
Published: Fri Mar 07 2014
Related Videos
Note
Please note that this website is currently a work in progress! Lots of interesting data and statistics to come.