#44 Golang - Mastering Kafka with Golang: A Beginner's Guide

Captions
[Music] Hi everyone. Today we will explore how to use Kafka with Golang by building a coffee order and brewing system. We will cover how to send coffee orders to Kafka and how to process those orders to notify when the coffee is being brewed. Let's dive in.

Apache Kafka is a powerful distributed event streaming platform capable of handling trillions of events a day. It's perfect for systems that require a large volume of events or message exchange. We'll use Golang with the sarama library, a Go client library for Kafka. Without further ado, let's get started with the code.

This project has two parts: a producer, and a consumer or worker. Here is the code of the producer, and this is the worker code.

Let's start with the producer. The producer's job is to take coffee orders from users and send them to a Kafka topic. Here we have the Order struct, which holds the customer's name and the type of coffee they want. When this structure is converted to JSON, the fields get converted to these keys. In the main function we set up an HTTP server that listens for POST requests at the order endpoint. The place order function handles the incoming orders, and here the server is started, listening on port 3000.

Let's go to the request handler. It checks the request method: if it is not POST, an error is returned. To implement a producer that writes events to Kafka, we will take these steps. First, we will parse the request body into the Order structure. Then we convert it into bytes so that we can send the data to Kafka. Next, we will write the converted data to Kafka. At the end, we will respond back to the user.

Create a new Order instance. Let's decode the request body into an order. This returns an error; if there is an error, we will respond back to the user. Oh, this function should be NewDecoder. Let's respond with bad request. This variable, order, represents the order placed by the user. In the next step we marshal the order into bytes, as we need to send it over Kafka, which does not understand the structure of Order. It returns bytes and an error. Let's receive these values. The error needs to be handled.

Now that we have the order marshaled, we will write to Kafka. Let's create a separate function for this purpose. Let's call it push order to queue. Its arguments would be the topic, which is a string, and the message, which is bytes. It would return an error, if any. Let's define the URLs of the Kafka brokers. We have only one broker, running on localhost:9092.

Now we will make a new connection to the broker. We will put this in a function, connect producer. It should accept broker URLs. This function should return the producer object, a SyncProducer, and an error, if any. We will begin by creating a new config. We will use these configuration settings: on successful delivery the message is returned on the successes channel, we wait for all acknowledgements, and max retries is set to five. Now let's create a new producer. It requires the broker addresses and the config. This function returns the producer and an error, which matches the return type of our function, so let's return from here.

Let's go back to the push order to queue function. Let's create a new producer here using the connect producer function. If an error occurred, return the error. We need to close the producer once we are done, so let's defer it. Let's create a new Kafka message. It is of type ProducerMessage. It should have the topic and the value. The value will contain the message we want to send. This message is bytes; let's convert it to a string. Finally, we can send this message. This function, SendMessage, returns the partition, the offset in the queue, and an error. Let's receive them. We will handle the error here as well. Let's add a log for ourselves: here we print the topic, partition, and queue offset. In the end, let's return nil, as there has been no error.

Now we can use this function in the handler to send the message. Let's say the topic is coffee orders, and this is the message. Handle the error and return with internal server error. Next, we respond back to the user. This map is the response body: success is set to true and the message is set to this text. Then we write it to the response writer. Our producer is ready.

Let's move on to the consumer. The consumer listens to the coffee orders topic and processes the incoming orders. We will begin by creating a new Kafka consumer that will receive messages from the queue. We will also handle OS signals to stop the worker process and exit cleanly. Next, we will run the consumer in a goroutine. In the end, close the consumer.

Let's start with step one. We will create a new function to get the consumer connection, which is similar to creating the producer. Let's copy this and make changes to it. The function name is connect consumer instead of producer. It will return the consumer. Let's set the consumer config to return errors. Here, create a new consumer. Let's use this function to create a new consumer. Our broker is on port 9092. We have our worker ready. Let's panic if an error occurs.

We will need a Kafka topic. Let's define it at the beginning of the function. Now we will create the partition consumer that will receive the messages from the Kafka queue. This function, ConsumePartition, takes the topic, partition, and offset. Here is the topic, the partition is zero, and let's get the oldest offset. This returns the partition consumer and an error. Let's handle the error here as well. Let's add a print here.

Let's create a channel and receive OS signals in it. Next, we create a done channel that we will use shortly. Let's create a goroutine that will handle all messages. This will run infinitely. We will add a select statement for the different kinds of messages. If there are consumer errors, we will print those. This block receives a message from the consumer. Let's keep a count of the messages. For this, let's define the message count and set it to zero. We will increment the message count here. Let's add a log displaying the message count, the topic, and the message itself. Now extract the order from the message value. The value is a slice of bytes; convert it into a string. This print simulates coffee brewing.

Next, we will handle the OS signal channel. On receiving a signal, we signal the done channel. Outside the goroutine, in the main function, let's wait for the done channel to receive a message. After the done channel receives a message, let's add a print showing the total number of messages processed. In the end, close the worker. If there is an error, handle it. Our consumer is ready.

To see our system in action, start your Kafka server. I use Docker to run Kafka. Let's pull this image. Next, run this image on the required port; we are using port 9092. Let's run the producer HTTP server. In this terminal, we will run the consumer. Let's send a request to the producer server. Here the customer's name is Alice and the coffee ordered is a latte. This is the response from the server. Let's look at the logs. The producer pushed the message onto the queue. The consumer received the order on the coffee orders topic. Here is the message: customer name and coffee type. The brewing of the coffee started here. Let's place another order. The producer sent another message. The consumer received the second order, from Tom, for an iced americano. This was the complete message. Let's stop the consumer process. Here it prints the number of processed orders.

And that's it. You've now seen how to use Kafka with Golang to create a coffee order and brewing notification system. This example demonstrates the power of Kafka for managing event-driven applications, and how Golang makes it simple and efficient to interact with Kafka. I hope this was helpful. If you have any questions or comments, feel free to leave them below. Don't forget to like and subscribe for more tech tutorials. Happy coding!
Info
Channel: codeHeim
Views: 1,405
Id: 4EdrCc29vXY
Length: 17min 6sec (1026 seconds)
Published: Wed Jul 03 2024