System Design Interview - Distributed Message Queue

Captions
Hi, and welcome to the system design interview channel. Today we design a distributed message queue. First, let's make sure we are on the same page regarding the problem statement. What is a distributed message queue? Let's say there are two web services called producer and consumer, and they need to communicate with each other. One option is to set up synchronous communication, where the producer makes a call to the consumer and waits for a response. This approach has its own pros and cons. Synchronous communication is easier and faster to implement. At the same time, synchronous communication makes it harder to deal with consumer service failures. We need to think about when and how to properly retry failed requests, how not to overwhelm the consumer service with too many requests, and how to deal with a slow consumer service host. Another option is to introduce a new component that helps to set up asynchronous communication. The producer sends data to that component, and exactly one consumer gets this data a short time later. Such a component is called a queue. And it is distributed, because data is stored across several machines. Please do not confuse a queue with a topic. In the case of a topic, a published message goes to each and every subscriber. In the case of a queue, a message is received by one and only one consumer.

And as often happens with interview questions, the statement is ambiguous. What are the functional requirements? Which non-functional requirements have priority over others? What is the scale we need to deal with? All these questions need to be clarified with the interviewer. Let's do our best and define the requirements ourselves. Starting with functional requirements. At this stage of the interview it may be hard to come up with a definitive set of requirements, and it's usually not needed. The time limit only allows us to focus on several core APIs, like send message and receive message. As for non-functional requirements, we want our system to be scalable and handle load increases, highly available and tolerant of hardware and network failures, highly performant, so that both send and receive operations are fast, and durable, so that data is persisted once submitted to the queue.

And of course, there may be many other requirements, either explicitly stated by the interviewer or intentionally omitted. Among functional requirements, we may be asked to support create and delete queue APIs, or a delete message API. There may be specific requirements for the producer (for example, the system needs to avoid duplicate submissions), or security requirements, or an ask to implement a specific ordering guarantee. As for non-functional requirements, the interviewer may define specific service level agreement numbers (so-called SLAs, for example the minimum throughput our system needs to support), or requirements around cost-effectiveness (for example, the system needs to minimize hardware cost or operational support cost). But do not worry if you can't think of all the possible requirements. The interviewer is your friend and will help to scope the problem. You just need to be proactive and outline the main use cases.
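To make the core send and receive APIs a bit more concrete, here is a toy, in-memory stand-in for the two operations. It is a minimal sketch of the contract only; a real client would talk to the service over the network, and all class and method names here are illustrative rather than taken from any real product.

# A toy, in-memory stand-in for the send/receive API shape discussed above.
# Real implementations would call the FrontEnd service over HTTP; this only
# illustrates the contract, not the distributed behavior.
import collections
import uuid

class ToyQueueService:
    def __init__(self):
        self._queues = collections.defaultdict(collections.deque)

    def send_message(self, queue_name: str, body: str) -> str:
        """Append a message to the named queue and return its id."""
        message_id = str(uuid.uuid4())
        self._queues[queue_name].append({"id": message_id, "body": body})
        return message_id

    def receive_message(self, queue_name: str):
        """Return a single message from the named queue, or None if it is empty."""
        queue = self._queues[queue_name]
        return queue.popleft() if queue else None

# Usage:
svc = ToyQueueService()
svc.send_message("q1", "hello")
print(svc.receive_message("q1"))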
Now let's start drafting the architecture, beginning with components that are common for many distributed systems. First, we need a virtual IP. A VIP refers to the symbolic hostname (for example myWebService.domain.com) that resolves to a load balancer system. So next, we have a load balancer. A load balancer is a device that routes client requests across a number of servers. Next, we have a FrontEnd web service, a component responsible for initial request processing, like validation, authentication, and so on. Queue metadata, like its name, creation date and time, owner, and any other configuration settings, will be stored in a database. And best practices dictate that databases should be hidden behind some facade, a dedicated web service responsible for handling calls to the database. And we need a place to store queue messages. So, let's introduce a backend web service that will be responsible for message persistence and processing.

Now, let's take a look at each component one by one. Load balancing is a big topic, and unless the interviewer encourages you to dive deep into it, we had better not deviate too much from the main question of the interview. Always try to stay focused on what really matters. The internals of how load balancers work may not matter, but in order to make sure the non-functional requirements of the system we build are fully met, we need to explain how load balancers will help us achieve high throughput and availability. When the domain name is hit, the request is transferred to one of the VIPs registered in DNS for our domain name. The VIP is resolved to a load balancer device, which has knowledge of the FrontEnd hosts. By looking at this architecture, several questions have probably popped into your head. First, the load balancer seems like a single point of failure. What happens if the load balancer device goes down? Second, load balancers have limits with regard to the number of requests they can process and the number of bytes they can transfer. What happens when our distributed message queue service becomes so popular that the load balancer limits are reached? To address high availability concerns, load balancers utilize the concept of primary and secondary nodes. The primary node accepts connections and serves requests, while the secondary node monitors the primary. If, for any reason, the primary node is unable to accept connections, the secondary node takes over. As for scalability concerns, the concept of multiple VIPs (sometimes referred to as VIP partitioning) can be utilized. In DNS we assign multiple A records to the same DNS name for the service. As a result, requests are partitioned across several load balancers. And by spreading load balancers across several data centers, we improve both availability and performance.

Let's move on to the next component, which is the FrontEnd web service. The FrontEnd is a lightweight web service consisting of stateless machines located across several data centers. The FrontEnd service is responsible for: request validation, authentication and authorization, TLS termination, server-side data encryption, caching, rate limiting (also known as throttling), request dispatching, request deduplication, and usage data collection. Let's discuss some basics of these features. Request validation helps to ensure that all the required parameters are present in the request and that the values of these parameters honor constraints. For example, in our case we want to make sure the queue name comes with every send message request, and that the message size does not exceed a specified threshold.
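For the validation step just described, a minimal sketch could look like the following; the 256 KB limit is an arbitrary example threshold, not a number from the video.

# Illustrative request validation on the FrontEnd; the size limit is a made-up example.
MAX_MESSAGE_SIZE_BYTES = 256 * 1024

def validate_send_request(request: dict) -> None:
    """Raise ValueError if a send message request is missing fields or violates limits."""
    queue_name = request.get("queue_name")
    if not queue_name:
        raise ValueError("queue_name is required")
    body = request.get("body", b"")
    if len(body) > MAX_MESSAGE_SIZE_BYTES:
        raise ValueError("message exceeds the maximum allowed size")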
During the authentication check, we verify that the message sender is a registered customer of our distributed queue service. And during the authorization check, we verify that the sender is allowed to publish messages to the queue it claims. TLS is a protocol that aims to provide privacy and data integrity. TLS termination refers to the process of decrypting a request and passing on an unencrypted request to the backend service. And we want to do TLS termination on FrontEnd hosts, because TLS termination on the load balancer is expensive. Termination is usually handled not by the FrontEnd service itself, but by a separate HTTP proxy that runs as a process on the same host.

Next is server-side encryption. Because we want to store messages securely on backend hosts, messages are encrypted as soon as the FrontEnd receives them. Messages are stored in encrypted form, and the FrontEnd decrypts them only when they are sent back to a consumer. A cache stores copies of source data. In the FrontEnd cache we will store metadata about the most actively used queues, as well as user identity information, to save on calls to the authentication and authorization services. Rate limiting, or throttling, is the process of limiting the number of requests that can be submitted to a given operation in a given amount of time. Throttling protects the web service from being overwhelmed with requests. The leaky bucket algorithm is one of the most famous ones. Rate limiting is quite a popular system design question on its own, and we will have a separate video for it. The FrontEnd service makes remote calls to at least two other web services: the Metadata service and the backend service. The FrontEnd service creates HTTP clients for both services and makes sure that calls to these services are properly isolated. It means that when one service, let's say the Metadata service, experiences a slowdown, requests to the backend service are not impacted. There are common patterns, like bulkhead and circuit breaker, that help to implement resource isolation and make a service more resilient when remote calls start to fail. Next, we have request deduplication. It may be needed when a response to a successful send message request fails to reach the client. It is less of an issue for 'at least once' delivery semantics, and a bigger issue for 'exactly once' and 'at most once' delivery semantics, when we need to guarantee that a message is never processed more than one time. Caching is usually used to store previously seen request ids, so that duplicates can be detected. Last but not least is usage data collection, where we gather real-time information that can be used for audit. And even though the FrontEnd service has many responsibilities, the rule of thumb is to keep it as simple as possible.
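As a quick illustration of the rate limiting idea mentioned above, here is a minimal leaky bucket sketch; the capacity and leak rate are arbitrary example numbers, and a real limiter would track state per client or per queue.

import time

class LeakyBucket:
    """Minimal leaky bucket rate limiter sketch; numbers are illustrative only."""
    def __init__(self, capacity: float = 100.0, leak_rate_per_sec: float = 10.0):
        self.capacity = capacity
        self.leak_rate = leak_rate_per_sec
        self.water = 0.0
        self.last_checked = time.monotonic()

    def allow_request(self) -> bool:
        now = time.monotonic()
        # Water drains out of the bucket at a constant rate since the last check.
        self.water = max(0.0, self.water - (now - self.last_checked) * self.leak_rate)
        self.last_checked = now
        if self.water + 1.0 <= self.capacity:
            self.water += 1.0   # the request fits into the bucket
            return True
        return False            # bucket would overflow, throttle the request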
Moving on to the next component, which is the Metadata service. The Metadata service stores information about queues. Every time a queue is created, we store information about it in the database. Conceptually, the Metadata service is a caching layer between the FrontEnd and the persistent storage. It handles many reads and a relatively small number of writes, as we read every time a message arrives and write only when a new queue is created. Even though strongly consistent storage is preferred, to avoid potential concurrent updates, it is not strictly required.

Let's take a look at different approaches to organizing the cache cluster. The first option is when the cache is relatively small and we can store the whole data set on every cluster node. The FrontEnd host calls a randomly chosen Metadata service host, because all the cache cluster nodes contain the same information. The second approach is to partition data into small chunks, called shards, because the data set is too big and cannot fit into the memory of a single host. So, we store each such chunk of data on a separate node in the cluster. The FrontEnd then knows which shard stores the data and calls that shard directly. And the third option is similar to the second one: we also partition data into shards, but the FrontEnd does not know on which shard the data is stored. So, the FrontEnd calls a random Metadata service host, and the host itself knows where to forward the request. In option one, we can introduce a load balancer between the FrontEnd and the Metadata service, as all Metadata service hosts are equal and the FrontEnd does not care which Metadata host handles the request. In options two and three, the Metadata hosts represent a consistent hashing ring. Do not worry if this term is completely new to you. The distributed cache topic is big, and we will have a separate video on how to design a distributed cache.

The components we have built so far were relatively straightforward. Not easy, of course, but if you have an understanding of several core design principles, you will at least progress this far in the interview. By the way, the set of components we just discussed (VIP + load balancer + FrontEnd web service + Metadata web service that represents a caching layer on top of a database) is so popular in the world of distributed systems that you may consider it a standard and apply it to many system designs.

Now, let's take a look at the backend component. This is where the real challenge starts. To understand how the backend service architecture may look, let's try to answer some important questions first. By the way, if you get stuck during the interview, not knowing how to progress further, start asking yourself questions. Asking the right questions helps to split the problem into more manageable pieces. Plus, it helps to establish a better communication channel with the interviewer. The interviewer will let you know whether you are on the right path or not. So, what may those questions be? We need to figure out where and how messages are stored, right? Is a database an option? Yes, it is. But not the best one, and let me explain why. We are building a distributed message queue, a system that should be able to handle a very high throughput. And this means that all this throughput will be offloaded to the database. In other words, the problem of building a distributed message queue becomes the problem of building a database that can handle high throughput. And we know that highly available and scalable databases exist out there. And if you are a junior software engineer, it is totally reasonable to say that we will utilize a third-party database solution and stop right there. But for a senior position, we need to either explain how to build a distributed database (and we promise you a separate video on this) or keep looking for other options. And if not a database, where else can we store data? Who thought about memory? Please let me know in the comments. And you are correct, by the way, as are those who said the file system. Since we may need to store messages for days or even weeks, we need more durable storage, like a local disk. At the same time, newly arrived messages may live in memory for a short period of time, or until memory on the backend host is fully utilized. The next question we should ask ourselves: how do we replicate data? And I believe you may have already figured this out. We will send copies of messages to some other hosts, so that data can survive host hardware or software failures. And finally, let's think about how FrontEnd hosts select backend hosts, both for storing messages and for retrieving them. We can leverage the Metadata service, right?
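The consistent hashing ring mentioned for the Metadata cache (and equally usable for mapping queues to hosts) can be sketched as follows; the hash function choice, virtual-node count, and host names are illustrative assumptions, not details from the video.

# Toy consistent hashing ring for routing a queue name to a cache shard.
import bisect
import hashlib

class ConsistentHashRing:
    """Minimal consistent hashing sketch; node names and vnode count are made up."""
    def __init__(self, nodes, vnodes: int = 100):
        self._ring = []  # sorted list of (hash, node)
        for node in nodes:
            for i in range(vnodes):
                self._ring.append((self._hash(f"{node}#{i}"), node))
        self._ring.sort()
        self._keys = [h for h, _ in self._ring]

    @staticmethod
    def _hash(key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def node_for(self, key: str) -> str:
        # Walk clockwise on the ring to the first node at or after the key's hash.
        idx = bisect.bisect(self._keys, self._hash(key)) % len(self._ring)
        return self._ring[idx][1]

ring = ConsistentHashRing(["metadata-host-1", "metadata-host-2", "metadata-host-3"])
print(ring.node_for("q1"))  # which shard owns metadata for queue q1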
So, let's summarize what we have just discussed. A message comes to the FrontEnd, the FrontEnd consults the Metadata service about which backend host to send the data to, the message is sent to the selected backend host, and the data is replicated. And when a receive message call comes, the FrontEnd talks to the Metadata service to identify the backend host that stores the data.

Now, let's dive deep into the backend service architecture. We will consider two options for how backend hosts relate to each other. In the first option, each backend instance is considered a leader for a particular set of queues. And by leader we mean that all requests for a particular queue (like send message and receive message requests) go to this leader instance. Let's look at an example. A send message request comes to a FrontEnd instance. The message goes to a queue with ID equal to q1. The FrontEnd service calls the Metadata service to identify the leader backend instance for this queue. In this particular example, instance B is the leader for q1. The message is sent to the leader, and the leader is fully responsible for data replication. When a receive message request comes to a FrontEnd instance, it also makes a request to the Metadata service to identify the leader for the queue. The message is then retrieved from the leader instance, and the leader is responsible for cleaning up the original message and all the replicas. We need a component that will help us with leader election and management. Let's call it the in-cluster manager. And as already mentioned, the in-cluster manager is responsible for maintaining a mapping between queues, leaders, and followers. The in-cluster manager is a very sophisticated component. It has to be reliable, scalable, and performant. Creating such a component from scratch is not an easy task. Let's see if we can avoid leader election in the first place. Can you think of an option where all instances are equal? Please pause this video and think for a while.

In the second option, we have a set of small clusters, where each cluster consists of three to four machines distributed across several data centers. When a send message request comes, similar to the previous design option, we also need to call the Metadata service to identify which cluster is responsible for storing messages for the q1 queue. After that, we just make a call to a randomly selected instance in the cluster, and that instance is responsible for data replication across all nodes in the cluster. When a receive message request comes and we have identified which cluster stores messages for the q1 queue, we once again call a randomly selected host and retrieve the message. The selected host is responsible for the message cleanup. As you may see, we no longer need a component for leader election, but we still need something that will help us manage queue-to-cluster assignments. Let's call this component the out-cluster manager (not the best name, I know, but naming is hard). This component will be responsible for maintaining a mapping between queues and clusters.

Is the out-cluster manager a simpler component than the in-cluster manager? It turns out that, not really. While the in-cluster manager manages queue assignment within the cluster, the out-cluster manager manages queue assignment across clusters. The in-cluster manager needs to know about each and every instance in the cluster. The out-cluster manager may not know about each particular instance, but it needs to know about each cluster. The in-cluster manager listens to heartbeats from instances. The out-cluster manager monitors the health of each independent cluster. And while the in-cluster manager deals with host failures and needs to adjust to the fact that instances may die and new instances may be added to the cluster, the out-cluster manager is responsible for tracking each cluster's utilization and dealing with overheated clusters, meaning that new queues may no longer be assigned to clusters that have reached their capacity limits.
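To make the out-cluster manager's job a bit more tangible, here is a toy sketch of the queue-to-cluster mapping it could maintain; the cluster names, capacity numbers, and least-utilized assignment policy are all invented for illustration and are not prescribed by the design.

# Toy sketch of a queue-to-cluster mapping kept by an out-cluster manager.
class OutClusterManager:
    def __init__(self, clusters):
        # cluster name -> remaining capacity, in arbitrary units
        self.capacity = dict(clusters)
        self.assignments = {}  # queue name -> cluster name

    def assign_queue(self, queue_name: str) -> str:
        # Pick the cluster with the most remaining capacity ("coolest" cluster).
        cluster = max((c for c, cap in self.capacity.items() if cap > 0),
                      key=lambda c: self.capacity[c])
        self.capacity[cluster] -= 1
        self.assignments[queue_name] = cluster
        return cluster

    def cluster_for(self, queue_name: str) -> str:
        return self.assignments[queue_name]

mgr = OutClusterManager({"cluster-a": 100, "cluster-b": 80})
print(mgr.assign_queue("q1"))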
And what about really big queues, when a single queue gets so many messages that a single leader (in design option A) or a single cluster (in design option B) cannot handle such a big load? The in-cluster manager splits a queue into parts (partitions), and each partition gets a leader server. The out-cluster manager may split a queue across several clusters, so that messages for the same queue are evenly distributed across several clusters.

So far we have covered all the main components of the high-level architecture. Let's see what else is important to mention while discussing distributed message queues. Queue creation and deletion. A queue can be auto-created, for example when the first message for the queue hits the FrontEnd service, or we can define an API for queue creation. An API is a better option, as we will have more control over queue configuration parameters. The delete queue operation is a bit controversial, as it may cause a lot of harm and must be executed with caution. For this reason, you may find examples of well-known distributed queues that do not expose a deleteQueue API via a public REST endpoint. Instead, this operation may be exposed through a command line utility, so that only experienced admin users may call it.

As for message deletion, there are several options at our disposal. One option is not to delete a message right after it is consumed. In this case, consumers have to be responsible for tracking what they have already consumed. And it is not as easy as it sounds, as we need to maintain some kind of order for messages in the queue and keep track of the offset, which is the position of a message within a queue. Messages can then be deleted several days later by a background job. This idea is used by Apache Kafka. The second option is to do something similar to what Amazon SQS is doing. Messages are also not deleted immediately, but marked as invisible, so that other consumers do not get an already retrieved message. The consumer that retrieved the message then needs to call a delete message API to remove the message from the backend host. And if the message was not explicitly deleted by the consumer, it becomes visible again and may be delivered and processed twice.

We know that messages need to be replicated to achieve high durability. Otherwise, if we only have one copy of the data, it may be lost due to an unexpected hardware failure. Messages can be replicated synchronously or asynchronously. Synchronously means that when a backend host receives a new message, it waits until the data is replicated to other hosts, and only when replication has fully completed is a successful response returned to the producer. Asynchronous replication means that a response is returned to the producer as soon as the message is stored on a single backend host; the message is replicated to other hosts later. Both options have pros and cons. Synchronous replication provides higher durability, but at the cost of higher latency for the send message operation. Asynchronous replication is more performant, but does not guarantee that a message will survive a backend host failure.
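The following toy code contrasts the two replication modes just described. The replicas are plain in-memory objects here; real backend hosts would make remote calls, so this is only a sketch of the control flow, not an actual replication protocol.

# Toy illustration of synchronous vs asynchronous replication on a backend host.
import threading

class Replica:
    def __init__(self):
        self.messages = []
    def store(self, message):
        self.messages.append(message)

class BackendHost:
    def __init__(self, replicas):
        self.local = Replica()
        self.replicas = replicas

    def send_message_sync(self, message):
        self.local.store(message)
        for replica in self.replicas:   # wait for every replica before acknowledging
            replica.store(message)
        return "OK"                     # higher durability, higher latency

    def send_message_async(self, message):
        self.local.store(message)
        # Acknowledge immediately; replicate in the background.
        threading.Thread(target=lambda: [r.store(message) for r in self.replicas]).start()
        return "OK"                     # lower latency, message may be lost on host failure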
There are three main message delivery guarantees. At most once, when messages may be lost but are never redelivered. At least once, when messages are never lost but may be redelivered. And exactly once, when each message is delivered once and only once. And you probably have a question already: why do we need three? Will anyone ever want anything other than exactly once delivery? Great question, and the simple answer is that exactly once delivery is hard to achieve in practice. In a distributed message queue system there are many potential points of failure: the producer may fail to deliver or may deliver multiple times, data replication may fail, and consumers may fail to retrieve or process the message. All this adds complexity and leads to the fact that most distributed queue solutions today support at-least-once delivery, as it provides a good balance between durability, availability, and performance.

With a pull model, the consumer constantly sends retrieve message requests, and when a new message is available in the queue, it is sent back to the consumer. With a push model, the consumer is not constantly bombarding the FrontEnd service with receive calls. Instead, the consumer is notified as soon as a new message arrives in the queue. And as always, there are pros and cons. I will not enumerate all of them here; I will simply state that from the distributed message queue's perspective, pull is easier to implement than push, but from the consumer's perspective, we need to do more work if we pull.

Many of us think of the FIFO acronym when we hear about queues. FIFO stands for first-in, first-out, meaning that the oldest message in a queue is always processed first. But in distributed systems, it is hard to maintain a strict order. Message A may be produced prior to message B, but it is hard to guarantee that message A will be stored and consumed prior to message B. For these reasons, many distributed queue solutions out there either do not guarantee a strict order, or have limitations around throughput, as a queue cannot be fast while it is doing many additional validations and coordination to guarantee a strict order.

With regard to security, we need to make sure that messages are securely transferred to and from the queue. Encryption using SSL over HTTPS helps to protect messages in transit. And we may also encrypt messages while storing them on backend hosts, as we discussed when we talked about FrontEnd service responsibilities.

Monitoring is critical for every system. With regard to a distributed message queue, we need to monitor the components (or microservices) that we built: the FrontEnd, Metadata, and backend services, as well as provide visibility into the customer's experience. In other words, we need to monitor the health of our distributed queue system and give customers the ability to track the state of their queues. Each service we built has to emit metrics and write log data. As operators of these services, we need to create dashboards for each microservice and set up alerts. And customers of our queue have to be able to create dashboards and set up alerts as well. For this purpose, integration with a monitoring system is required. Do not forget to mention the monitoring aspect to the interviewer. This topic is often omitted by candidates, but it is very important.
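Tying several of these ideas together (the pull model, at-least-once delivery, and SQS-style invisibility with an explicit delete), here is a sketch of a consumer loop. The client object and its methods are hypothetical stand-ins for the APIs discussed in this design, not a real SDK.

# Sketch of an at-least-once consumer using a pull model with explicit deletes.
import time

def consume_forever(client, queue_name, handler):
    while True:
        message = client.receive_message(queue_name)   # message becomes invisible to others
        if message is None:
            time.sleep(1)                              # back off when the queue is empty
            continue
        try:
            handler(message["body"])
            client.delete_message(queue_name, message["id"])  # ack only after processing
        except Exception:
            # No delete: once the visibility timeout expires the message is redelivered,
            # which is exactly the at-least-once behavior described above.
            pass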
Let's take one final look at the architecture we have built and evaluate whether the non-functional requirements are fulfilled. Is our system scalable? Yes, as every component is scalable. When load increases, we just add more load balancers, more FrontEnd hosts, more Metadata service cache shards, and more backend clusters and hosts. Is our system highly available? Yes, as there is no single point of failure and each component is deployed across several data centers. Individual hosts may die and network partitions may happen, but with this redundancy in place our system will continue to operate. Is our system highly performant? That very much depends on the implementation, hardware, and network setup. Each individual microservice needs to be fast, and we need to run our software in high-performance data centers. Is our system durable? Sure. We replicate data while storing it and ensure messages are not lost during the transfer from the producer and to the consumer.

And that is it for today's system design interview question. Thank you for watching this video. If you have any questions, please leave them in the comments below. And I will see you next time.
Info
Channel: System Design Interview
Views: 136,273
Keywords: system design interview
Id: iJLL-KPqBpM
Length: 26min 28sec (1588 seconds)
Published: Thu Mar 14 2019