Hi, and welcome to the system design interview channel. Today we design a distributed message queue. First, let's make sure we are on the same page regarding the problem statement. What is a distributed message queue? Let's say there are two web services, called producer and consumer, that need to communicate with each other. One option is to set up synchronous communication, where the producer makes a call to the consumer and waits for a response. This approach has its pros and cons. Synchronous communication is easier and faster to implement. At the same time, it makes it harder to deal with consumer service failures. We need to think about when and how to retry failed requests, how not to overwhelm the consumer service with too many requests, and how to deal with a slow consumer host. Another option is to introduce a new component that enables asynchronous communication: the producer sends data to that component, and exactly one consumer gets this data a short time later. Such a component is called a queue. And it is distributed, because data is stored across several machines. Please do not confuse a queue with a topic. With a topic, a published message goes to each and every subscriber. With a queue, a message is received by one and only one consumer.
And as it often happens with interview questions, the statement is ambiguous. What are the functional requirements? Which non-functional requirements have priority over others? What scale do we need to deal with? All of these questions need to be clarified with the interviewer. Let's do our best and define the requirements ourselves, starting with functional requirements. At this stage of the interview it may be hard to come up with a definitive set of requirements, and it's usually not needed. The time limit only allows us to focus on several core APIs, like send message and receive message.
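Before moving on, here is a minimal sketch of what those two core APIs could look like. The names, types, and parameters are my assumptions for illustration, not a definitive interface:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Message:
    message_id: str   # assigned by the queue service
    body: str

class DistributedQueueClient:
    """Hypothetical client for our distributed message queue."""

    def send_message(self, queue_name: str, body: str) -> str:
        """Submits a message to the named queue; returns the message id."""
        ...

    def receive_message(self, queue_name: str) -> Optional[Message]:
        """Returns the next available message, or None if the queue is empty."""
        ...
```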
As for non-functional requirements, we want our system to be scalable, so that it handles load increases; highly available, so that it tolerates hardware and network failures; highly performant, so that both send and receive operations are fast; and durable, so that data is persisted once it is submitted to the queue.
And of course, there may be many other requirements, either explicitly stated by the interviewer or intentionally omitted. Among functional requirements, we may be asked to support create queue and delete queue APIs, or a delete message API. There may be specific requirements for the producer (for example, the system needs to avoid duplicate submissions), or security requirements, or an ask to implement a specific ordering guarantee. As for non-functional requirements, the interviewer may define specific service level agreement numbers (SLAs, for example a minimum throughput our system needs to support), or requirements around cost-effectiveness (for example, the system needs to minimize hardware cost or operational support cost). But do not worry if you can't think of all the possible requirements. The interviewer is your friend and will help to scope the problem. You just need to be proactive and outline the main use cases. Now let's start drafting the architecture, beginning with components that are common to many distributed systems.
First, we need a virtual IP. A VIP is the symbolic hostname (for example, myWebService.domain.com) that resolves to a load balancer. So next, we have a load balancer, a device that routes client requests across a number of servers. Next, we have a FrontEnd web service, a component responsible for initial request processing: validation, authentication, and so on. Queue metadata, like the queue's name, creation date and time, owner, and any other configuration settings, will be stored in a database. Best practices dictate that the database should be hidden behind a facade, a dedicated web service responsible for handling calls to the database. And we need a place to store queue messages, so let's introduce a backend web service that is responsible for message persistence and processing.
Now, let's take a look at each component one by one. Load balancing is a big topic, and unless the interviewer encourages you to dive deep into it, we'd better not deviate too much from the main question of the interview. Always try to stay focused on what really matters. The internals of how load balancers work may not matter, but to make sure the non-functional requirements of the system we build are fully met, we need to explain how load balancers help us achieve high throughput and availability. When the domain name is hit, the request is routed to one of the VIPs registered in DNS for our domain name. The VIP resolves to a load balancer device, which has knowledge of the FrontEnd hosts.
By looking at this architecture, several questions have probably popped into your head. First, the load balancer seems like a single point of failure. What happens if the load balancer device goes down? Second, load balancers have limits on the number of requests they can process and the number of bytes they can transfer. What happens when our distributed message queue service becomes so popular that these limits are reached? To address the high availability concern, load balancers utilize the concept of primary and secondary nodes. The primary node accepts connections and serves requests, while the secondary node monitors the primary. If, for any reason, the primary node is unable to accept connections, the secondary node takes over. As for the scalability concern, the concept of multiple VIPs (sometimes referred to as VIP partitioning) can be utilized: in DNS, we assign multiple A records to the same DNS name for the service. As a result, requests are partitioned across several load balancers. And by spreading load balancers across several data centers, we improve both availability and performance.
Let's move on to the next component, which is the FrontEnd web service. The FrontEnd is a lightweight web service consisting of stateless machines located across several data centers. The FrontEnd service is responsible for request validation, authentication and authorization, TLS termination, server-side data encryption, caching, rate limiting (also known as throttling), request dispatching, request deduplication, and usage data collection. Let's discuss the basics of these features.
Request validation helps to ensure that all the required parameters are present in the request and that the values of these parameters honor constraints. For example, in our case we want to make sure that a queue name comes with every send message request, and that the message size does not exceed a specified threshold.
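Here is a minimal sketch of what such validation might look like on the FrontEnd; the threshold value and exception type are assumptions for illustration:

```python
MAX_MESSAGE_SIZE_BYTES = 256 * 1024  # assumed limit, e.g. 256 KB

class ValidationError(Exception):
    pass

def validate_send_message_request(queue_name: str, body: str) -> None:
    """Rejects requests that are missing parameters or violate constraints."""
    if not queue_name:
        raise ValidationError("queue name is required")
    if not body:
        raise ValidationError("message body is required")
    if len(body.encode("utf-8")) > MAX_MESSAGE_SIZE_BYTES:
        raise ValidationError("message size exceeds the allowed threshold")
```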
During the authentication check, we verify that the message sender is a registered customer of our distributed queue service. And during the authorization check, we verify that the sender is allowed to publish messages to the queue it claims. TLS is a protocol that provides privacy and data integrity. TLS termination refers to the process of decrypting a request and passing it on unencrypted to the backend service. We want to do TLS termination on FrontEnd hosts, because doing it on the load balancer is expensive. Termination is usually handled not by the FrontEnd service itself, but by a separate HTTP proxy that runs as a process on the same host. Next is server-side encryption.
Because we want to store messages securely on backend hosts, messages are encrypted as soon as the FrontEnd receives them. Messages are stored in encrypted form, and the FrontEnd decrypts them only when they are sent back to a consumer.
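As a rough illustration of this idea, here is a sketch using symmetric encryption. The use of the Python cryptography library's Fernet recipe is my assumption; a real system would also need proper key management, such as per-queue keys fetched from a key management service and rotated regularly:

```python
from cryptography.fernet import Fernet

# In practice the key would come from a key management service,
# not be generated on the fly like this.
key = Fernet.generate_key()
cipher = Fernet(key)

def encrypt_on_receive(plaintext_body: str) -> bytes:
    """FrontEnd encrypts the message before it is stored on a backend host."""
    return cipher.encrypt(plaintext_body.encode("utf-8"))

def decrypt_on_deliver(stored_body: bytes) -> str:
    """FrontEnd decrypts the message just before returning it to a consumer."""
    return cipher.decrypt(stored_body).decode("utf-8")
```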
A cache stores copies of source data. In the FrontEnd cache we will store metadata about the most actively used queues, as well as user identity information, to save on calls to the authentication and authorization services.
Rate limiting, or throttling, is the process of limiting the number of requests that can be submitted to a given operation in a given amount of time. Throttling protects the web service from being overwhelmed with requests. The leaky bucket algorithm is one of the most famous ways to implement it.
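To make the idea concrete, here is a minimal in-memory sketch of a leaky-bucket-style limiter. This is a simplified single-host version; a real FrontEnd fleet would need per-host or shared limits, and the capacity and rate below are assumptions:

```python
import time

class LeakyBucket:
    """Admits requests at a fixed drain rate; excess requests are rejected."""

    def __init__(self, capacity: int, leak_rate_per_sec: float):
        self.capacity = capacity          # max "water" (queued requests)
        self.leak_rate = leak_rate_per_sec
        self.water = 0.0
        self.last_check = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # Leak water in proportion to the time elapsed since the last check.
        self.water = max(0.0, self.water - (now - self.last_check) * self.leak_rate)
        self.last_check = now
        if self.water + 1 <= self.capacity:
            self.water += 1
            return True
        return False  # bucket is full: throttle this request
```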
Rate limiting is quite a popular system design question in its own right, and we will have a separate video about it.
The FrontEnd service makes remote calls to at least two other web services: the Metadata service and the backend service. The FrontEnd service creates HTTP clients for both services and makes sure that calls to these services are properly isolated. This means that when one service, let's say the Metadata service, experiences a slowdown, requests to the backend service are not impacted. There are common patterns, like bulkhead and circuit breaker, that help to implement resource isolation and make a service more resilient when remote calls start to fail.
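As an illustration of the circuit breaker idea, here is a bare-bones sketch; the thresholds and timings are assumptions, and in production you would typically reach for a mature library rather than hand-rolling this:

```python
import time

class CircuitBreaker:
    """Stops calling a failing dependency for a cool-down period."""

    def __init__(self, failure_threshold: int = 5, reset_timeout_sec: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout_sec
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, remote_fn, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_timeout:
                raise RuntimeError("circuit open: failing fast")
            self.opened_at = None  # half-open: allow a single trial request
        try:
            result = remote_fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()  # trip the breaker
            raise
        self.failures = 0  # success resets the failure count
        return result
```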
Next, we have request deduplication. Duplicates may occur when a response to a successful send message request fails to reach the client and the client retries. This is less of an issue for 'at least once' delivery semantics, but a bigger issue for 'exactly once' and 'at most once' semantics, where we need to guarantee that a message is never processed more than once. Caching is usually used to store previously seen request ids, so that duplicates can be detected and dropped.
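A minimal sketch of that idea, assuming the producer attaches a client-generated request id to every send call; the TTL-based dict here is a stand-in for a real cache:

```python
import time

class DeduplicationCache:
    """Remembers recently seen request ids so retries are not processed twice."""

    def __init__(self, ttl_sec: float = 300.0):
        self.ttl = ttl_sec
        self.seen: dict[str, float] = {}  # request_id -> expiry timestamp

    def is_duplicate(self, request_id: str) -> bool:
        now = time.monotonic()
        # Evict expired entries (a real cache would do this lazily or via LRU).
        self.seen = {rid: exp for rid, exp in self.seen.items() if exp > now}
        if request_id in self.seen:
            return True
        self.seen[request_id] = now + self.ttl
        return False
```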
Last but not least is usage data collection, where we gather real-time information that can be used for audit purposes. And even though the FrontEnd service has many responsibilities, the rule of thumb is to keep it as simple as possible. Moving on to the next component, which is the Metadata service. The Metadata service stores information about queues. Every time a queue is created, we store information about it in the database. Conceptually, the Metadata service is a caching layer between the FrontEnd and persistent storage. It handles many reads and a relatively small number of writes, as we read every time a message arrives but write only when a new queue is created. Even though strongly consistent storage is preferred, to avoid potential concurrent updates, it is not strictly required.
Let's take a look at different approaches to organizing cache clusters. The first option is when the cache is relatively small and we can store the whole data set on every cluster node. A FrontEnd host then calls a randomly chosen Metadata service host, because all the cache cluster nodes contain the same information. The second approach is to partition the data into small chunks, called shards, because the data set is too big to fit in the memory of a single host. We store each such chunk of data on a separate node in the cluster, and the FrontEnd knows which shard stores the data and calls that shard directly. The third option is similar to the second one: we also partition the data into shards, but the FrontEnd does not know on which shard the data is stored. Instead, the FrontEnd calls a random Metadata service host, and the host itself knows where to forward the request. In option one, we can introduce a load balancer between the FrontEnd and the Metadata service, since all Metadata service hosts are equal and the FrontEnd does not care which host handles the request. In options two and three, Metadata hosts form a consistent hashing ring. Do not worry if this term is completely new to you. Distributed caching is a big topic, and we will have a separate video on how to design a distributed cache.
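For a taste of how a FrontEnd host might map a queue name onto a consistent hashing ring, here is a toy version using MD5; real implementations typically add virtual nodes per host for smoother balancing:

```python
import bisect
import hashlib

class ConsistentHashRing:
    """Maps keys (e.g. queue names) to cache hosts on a hash ring."""

    def __init__(self, hosts: list[str]):
        # Place each host at a point on the ring determined by its hash.
        self.ring = sorted((self._hash(h), h) for h in hosts)
        self.points = [p for p, _ in self.ring]

    @staticmethod
    def _hash(key: str) -> int:
        return int(hashlib.md5(key.encode("utf-8")).hexdigest(), 16)

    def host_for(self, key: str) -> str:
        # Walk clockwise to the first host at or after the key's position.
        idx = bisect.bisect(self.points, self._hash(key)) % len(self.ring)
        return self.ring[idx][1]

ring = ConsistentHashRing(["metadata-1", "metadata-2", "metadata-3"])
print(ring.host_for("q1"))  # the shard that owns queue q1's metadata
```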
The components we've built so far are relatively straightforward. Not easy, of course, but if you understand several core design principles, you will at least progress this far in the interview. By the way, the set of components we just discussed, VIP + load balancer + FrontEnd web service + Metadata web service acting as a caching layer on top of a database, is so popular in the world of distributed systems that you may consider it a standard pattern and apply it to many system designs. Now, let's take a look at the backend component. This is where the real challenge starts.
may look like, let’s try to answer some important questions first. By the way, if you stuck during the interview,
not knowing how to progress further, start asking yourself questions. Asking right questions helps to split the
problem into more manageable pieces. Plus, it helps to establish a better communication
channel with the interviewer. Interviewer will let you know whether you
are on the right path or not. So, what those question may be? We need to figure out where and how messages
are stored, right? Is database an option? Yes, it is. But not the best one and let me explain why. We are building a distributed message queue,
a system that should be able to handle a very high throughput. And this means that all this throughput will
be offloaded to the database. In other words, a problem of building a distributed
message queue becomes a problem of building a database that can handle high throughput. And we know that highly-available and scalable
databases exist out there. And if you are a junior software engineer,
it is totally reasonable to say that we will utilize a 3-rd party database solution and
stop right there. But for a senior position, we need to either
explain how to build a distributed database (and we promise you a separate video on this)
or we need to keep seeking for other options. And if not a database, where else can we store
data? Who thought about memory? Please let me know in the comments. And you are correct by the way. As well as those who said file system. As we may need to store messages for days
or even weeks, we need a more durable storage, like a local disk. At the same time newly arrived messages may
live in memory for a short period of time or until memory on the backend host is fully
utilized. Next question we should ask ourselves: how
The next question we should ask ourselves: how do we replicate data? And I believe you may have already figured this out. We will send copies of messages to some other hosts, so that the data can survive hardware or software failures on a single host. And finally, let's think about how FrontEnd hosts select backend hosts, both for storing messages and for retrieving them. We can leverage the Metadata service, right? So, let's summarize what we have just discussed. A message comes to the FrontEnd, and the FrontEnd consults the Metadata service to find out which backend host to send the data to. The message is sent to the selected backend host, and the data is replicated. When a receive message call comes, the FrontEnd again talks to the Metadata service to identify a backend host that stores the data.
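In rough pseudocode, the two paths we just described might look like this; the service interfaces (`lookup_backend`, `store_and_replicate`, `retrieve`) are assumed names for illustration:

```python
def send_message(metadata_service, queue_name: str, body: str) -> None:
    # FrontEnd asks the Metadata service which backend host owns this queue.
    backend_host = metadata_service.lookup_backend(queue_name)
    # The chosen backend host stores the message and triggers replication.
    backend_host.store_and_replicate(queue_name, body)

def receive_message(metadata_service, queue_name: str):
    # The same lookup happens on the read path.
    backend_host = metadata_service.lookup_backend(queue_name)
    return backend_host.retrieve(queue_name)
```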
Now, let's dive deep into the backend service architecture. We will consider two options for how backend hosts relate to each other. In the first option, each backend instance is considered a leader for a particular set of queues. By leader we mean that all requests for a particular queue (like send message and receive message requests) go to this leader instance. Let's look at an example. A send message request comes to a FrontEnd instance, addressed to a queue with ID equal to q1. The FrontEnd service calls the Metadata service to identify the leader backend instance for this queue. In this particular example, instance B is the leader for q1. The message is sent to the leader, and the leader is fully responsible for data replication. When a receive message request comes to a FrontEnd instance, it likewise asks the Metadata service to identify the leader for the queue. The message is then retrieved from the leader instance, and the leader is responsible for cleaning up the original message and all the replicas.
We need a component that will help us with leader election and management. Let's call it the in-cluster manager. As already mentioned, the in-cluster manager is responsible for maintaining a mapping between queues, leaders, and followers. The in-cluster manager is a very sophisticated component: it has to be reliable, scalable, and performant, and creating such a component from scratch is not an easy task. Let's see if we can avoid leader election in the first place. Can you think of an option in which all instances are equal? Please pause this video and think for a while.
In the second option, we have a set of small clusters, where each cluster consists of three to four machines distributed across several data centers. When a send message request comes in, similar to the previous design option, we call the Metadata service to identify which cluster is responsible for storing messages for the q1 queue. After that, we just make a call to a randomly selected instance in the cluster, and that instance is responsible for data replication across all nodes in the cluster. When a receive message request comes in and we have identified which cluster stores messages for the q1 queue, we once again call a randomly selected host and retrieve the message. The selected host is responsible for the message cleanup.
As you can see, we no longer need a component for leader election, but we still need something to help us manage queue-to-cluster assignments. Let's call this component the out-cluster manager (not the best name, I know, but naming is hard). This component is responsible for maintaining a mapping between queues and clusters. Is the out-cluster manager a simpler component than the in-cluster manager? It turns out, not really. While the in-cluster manager manages queue assignment within a cluster, the out-cluster manager manages queue assignment across clusters. The in-cluster manager needs to know about each and every instance in the cluster; the out-cluster manager may not know about each particular instance, but it needs to know about each cluster. The in-cluster manager listens to heartbeats from instances; the out-cluster manager monitors the health of each independent cluster. And while the in-cluster manager deals with host failures and needs to adjust to the fact that instances may die and new instances may be added to the cluster, the out-cluster manager is responsible for tracking each cluster's utilization and dealing with overheated clusters, meaning that new queues may no longer be assigned to clusters that have reached their capacity limits.
And what about really big queues, when a single queue gets so many messages that a single leader (in design option A) or a single cluster (in design option B) cannot handle the load? The in-cluster manager splits the queue into parts (partitions), and each partition gets a leader server. The out-cluster manager may split the queue across several clusters, so that messages for the same queue are evenly distributed between several clusters. So far we have covered all the main components of the high-level architecture. Let's see what else is important to mention when discussing distributed message queues. Queue creation and deletion.
A queue can be auto-created, for example when the first message for the queue hits the FrontEnd service, or we can define an API for queue creation. The API is a better option, as it gives us more control over queue configuration parameters. The delete queue operation is a bit controversial, as it can cause a lot of harm and must be executed with caution. For this reason, you may find examples of well-known distributed queues that do not expose a deleteQueue API via a public REST endpoint. Instead, this operation may be exposed through a command line utility, so that only experienced admin users may call it.
As for message deletion, there are several options at our disposal. One option is not to delete a message right after it is consumed. In this case, consumers have to keep track of what they have already consumed, and it is not as easy as it sounds: we need to maintain some kind of order for messages in the queue and keep track of the offset, which is the position of a message within the queue. Messages can then be deleted several days later by a cleanup job. This idea is used by Apache Kafka. The second option is to do something similar to what Amazon SQS does. Messages are also not deleted immediately, but are marked as invisible, so that other consumers cannot get an already retrieved message. The consumer that retrieved the message then needs to call a delete message API to remove the message from the backend host. If the message is not explicitly deleted by the consumer, it becomes visible again and may be delivered and processed twice.
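Here is a toy sketch of that visibility-timeout behavior, as a single-host, in-memory stand-in for what the backend would really do; the timeout value is an assumption:

```python
import time
from collections import deque

class VisibilityQueue:
    """SQS-style semantics: received messages are hidden, not deleted."""

    def __init__(self, visibility_timeout_sec: float = 30.0):
        self.timeout = visibility_timeout_sec
        self.visible: deque = deque()  # (message_id, body)
        self.invisible: dict = {}      # message_id -> (body, redeliver_at)

    def send(self, message_id: str, body: str) -> None:
        self.visible.append((message_id, body))

    def receive(self):
        now = time.monotonic()
        # Make visible again any messages whose timeout has expired.
        for mid, (body, redeliver_at) in list(self.invisible.items()):
            if redeliver_at <= now:
                self.visible.append((mid, body))
                del self.invisible[mid]
        if not self.visible:
            return None
        mid, body = self.visible.popleft()
        self.invisible[mid] = (body, now + self.timeout)  # hide, don't delete
        return mid, body

    def delete(self, message_id: str) -> None:
        # Consumer acknowledges successful processing.
        self.invisible.pop(message_id, None)
```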
We know that messages need to be replicated to achieve high durability; otherwise, if we only have one copy of the data, it may be lost due to an unexpected hardware failure. Messages can be replicated synchronously or asynchronously. Synchronously means that when a backend host receives a new message, it waits until the data is replicated to other hosts, and only when replication is fully complete is a successful response returned to the producer. Asynchronous replication means that a response is returned to the producer as soon as the message is stored on a single backend host; the message is replicated to other hosts later. Both options have pros and cons. Synchronous replication provides higher durability, but at the cost of higher latency for the send message operation. Asynchronous replication is more performant, but does not guarantee that a message will survive a backend host failure.
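The trade-off is easy to see in a sketch; the helper names here (`write_to_local_disk`, `replicate_to`) are assumptions for illustration:

```python
def write_to_local_disk(message: str) -> None:
    ...  # stand-in for the actual persistence logic

def store_sync(message: str, replicas: list) -> str:
    """Acknowledge only after every replica has the message: durable, slower."""
    write_to_local_disk(message)
    for replica in replicas:
        replica.replicate_to(message)  # blocks until each copy is confirmed
    return "OK"                        # producer waits for all of the above

def store_async(message: str, replicas: list, background_pool) -> str:
    """Acknowledge immediately and replicate later: faster, less durable."""
    write_to_local_disk(message)
    background_pool.submit(lambda: [r.replicate_to(message) for r in replicas])
    return "OK"                        # producer is unblocked right away
```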
There are three main message delivery guarantees. At most once, where messages may be lost but are never redelivered. At least once, where messages are never lost but may be redelivered. And exactly once, where each message is delivered once and only once. You probably have a question already: why do we need three? Would anyone ever want anything other than exactly once delivery? Great question, and the simple answer is that exactly once delivery is hard to achieve in practice. In a distributed message queue system there are many potential points of failure: the producer may fail to deliver or may deliver multiple times, data replication may fail, and consumers may fail to retrieve or process the message. All of this adds complexity and leads to the fact that most distributed queue solutions today support at-least-once delivery, as it provides a good balance between durability, availability, and performance.
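With at-least-once delivery, consumers typically compensate by processing messages idempotently. This is my aside rather than part of the design above, and the in-memory set below stands in for a store a real consumer would persist:

```python
processed_ids: set[str] = set()

def handle(message_id: str, body: str, process) -> None:
    """Process each message effectively once, even if it is redelivered."""
    if message_id in processed_ids:
        return             # duplicate delivery: safely ignore it
    process(body)          # the consumer's actual business logic
    processed_ids.add(message_id)
```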
With a pull model, the consumer constantly sends retrieve message requests, and when a new message is available in the queue, it is sent back to the consumer. With a push model, the consumer does not constantly bombard the FrontEnd service with receive calls; instead, the consumer is notified as soon as a new message arrives in the queue. As always, there are pros and cons. I will not enumerate all of them here, but will simply state that from the distributed message queue's perspective, pull is easier to implement than push, while from the consumer's perspective, we need to do more work if we pull.
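That extra consumer-side work is essentially a polling loop like the one below; the client interface and backoff values are assumptions:

```python
import time

def consume_forever(client, queue_name: str, handler) -> None:
    """Pull-model consumer: polls repeatedly, backing off when the queue is empty."""
    idle_delay = 0.1
    while True:
        message = client.receive_message(queue_name)
        if message is None:
            time.sleep(idle_delay)                 # avoid hammering the FrontEnd
            idle_delay = min(idle_delay * 2, 5.0)  # exponential backoff, capped
            continue
        idle_delay = 0.1
        handler(message)
        client.delete_message(queue_name, message.message_id)
```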
Many of us think of the FIFO acronym when we hear about queues. FIFO stands for first-in, first-out, meaning that the oldest message in the queue is always processed first. But in distributed systems, it is hard to maintain a strict order. Message A may be produced before message B, but it is hard to guarantee that message A will be stored and consumed before message B. For these reasons, many distributed queue solutions out there either do not guarantee a strict order, or have limitations around throughput, as a queue cannot be fast while doing the many additional validations and the coordination needed to guarantee a strict order.
With regards to security, we need to make sure that messages are securely transferred to and from the queue. Encrypting traffic with TLS (that is, using HTTPS) helps to protect messages in transit. We may also encrypt messages while storing them on backend hosts; we discussed this when we talked about the FrontEnd service responsibilities.
Monitoring is critical for every system. With regard to the distributed message queue, we need to monitor the components (or microservices) that we built, the FrontEnd, Metadata, and backend services, as well as provide visibility into the customer's experience. In other words, we need to monitor the health of our distributed queue system and give customers the ability to track the state of their queues. Each service we build has to emit metrics and write log data. As operators of these services, we need to create dashboards for each microservice and set up alerts. And customers of our queue have to be able to create dashboards and set up alerts as well; for this purpose, integration with a monitoring system is required. Do not forget to mention the monitoring aspect to the interviewer. This topic is often omitted by candidates, but it is very important.
Let's take one final look at the architecture we built and evaluate whether the non-functional requirements are fulfilled. Is our system scalable? Yes, as every component is scalable. When load increases, we just add more load balancers, more FrontEnd hosts, more Metadata service cache shards, and more backend clusters and hosts. Is our system highly available? Yes, as there is no single point of failure and each component is deployed across several data centers. Individual hosts may die and network partitions may happen, but with this redundancy in place our system will continue to operate. Is our system highly performant? That actually depends heavily on the implementation, hardware, and network setup: each individual microservice needs to be fast, and we need to run our software in high-performance data centers. Is our system durable? Sure. We replicate data while storing it and ensure messages are not lost in transit from the producer and to the consumer. And that is it for today's system design interview question. Thank you for watching this video. If you have any questions, please leave them in the comments below. And I will see you next time.