Spring Tips: Spring Cloud Stream

Video Statistics and Information

Video
Captions Word Cloud
Reddit Comments
Captions
bring fans in today's installment of spring tips we're going applications using spring cloud stream in an earlier video we looked at spring integration which is a way of building event-driven applications in a clean consistent way one thing that we saw in that spring integration video was that message channels can be used to connect different components the components that are in the same virtual machine or indeed components that are distributed because you can have at the terminus ease of these different channels adapters and these adapters are one of the many types of components of spring integration shipped with a an adapter is meant to transform a Spring Framework message as it transits through a Spring Framework message channel into something that can be used in the outside world so we could use a Patrick Kafka we could use uki you know we could use them Redis we could use a Twitter we could use email we could use whatever anything you can imagine thinking about in terms of events or messages can be adapted into a Spring Framework message so I can listen to messages or events from the outside world I can say whenever a new email arrives in an IMAP server IMAP idle supporting email service publish a message or whenever a new message arrives on an JMS broker publish a message and then I will listen for the message at the end of the other side of that of that channel so we have two things happening here we've got a component that's monitoring the outside world and as soon as it perceives an event it publishes a message into that channel and on the other side of course we have a component that listens to it similarly we could have a component that publishes messages something that takes a message or generates a message and then writes it out to an outbound adapter in this way the message enters the channel and then on the other side we have a listener an outbound adapter that translates the message from Spring Framework into something that can be used on out in the outside world maybe translate the Spring Framework message into a Twitter message or a file on an FTP server or whatever write the number of things you can talk to we're using these adapters is myriad and so spring negation makes a very very nice basis for messaging based integration right this is a the most durable way of integrating services especially those separated by a network partition messaging offers a lot of benefits it's temporarily decoupled by is to say it doesn't matter if both the producer and the consumer are alive and awake and able to respond at the same time I can write a message to a message broker and as long as that broker has acknowledged the message and stored it durably then the consumer can come along later on and then retroactively Li consume the message so that's one benefit of messaging it's also locational decoupling right I don't care as a producer of a message or a consumer where the message ends up or where it originates right I just care where about where that broker is so I can have as many different clients as I want and they can live wherever they're wherever they want right they don't have to be in a specific IP or location or anything else all I need to worry about is finding that broker and indeed that broker can scale out right so we can have as many instances of that broker as we need to and to scale it horizontally so it's very easy to say I want to use multicast I'm going to use a ring to find a broker and then talk to it now by by working in terms of the message queue we're forced to commoditize our interactions we're supposed to force to use a commodity sort of protocol or payload rather to talk to each other right so it doesn't matter if one system speaks you know Java and then one speaks net and then one speaks C for example they can all talk so long as they can talk to that message broker so we've also now extended the reach of our of our integration to other different kinds of clients this is different for example than what you'd get if you use RPC or something like that that was technology specific so we get a lot of benefits by using messaging that we don't get with synchronous RPC kind of interactions such as you such as you would have with a for example EJB right so messaging is not a new idea but it happens to be a really great fit for what we're trying to do in the micro services world where everything really is just a distributed system a different component and messaging in particular allows it allows us to talk to legacy systems systems that don't perhaps speak the same language as the one we're trying to speak so integration spring integration particular is a great fit for that but when we're trying to build modern micro services we're not trying to integrate legacy systems right we're building systems in in 2016 or beyond and these components are probably going to be able to work in working in terms of JSON or XML or something sort of modern like that there they're probably going to support access to modern brokers map modern message queues right so we can take for granted that our mut our messaging based micro services today will have access to things like RabbitMQ or Apache Kafka or whatever right so in this case we don't really need the full flexibility that spring innovation gives us it's nice to be able to talk to any old system under the Sun and and be able to talk to it in terms of messages but that that extra flexibility comes at the cost of having to configure and specify the sort of adapters which again it's nice it's nice that we have those adapters it's nice that we have these components that are in the sort of crayon box if you will the sort of you know the pallet full of colors we have those options but if we are just going to use rabbitmq or if we're going to use Apache Kafka for example then having to specify that for every single new communication you know relay every single new interaction between components can become tedious what I want is the flexibility to work in terms of messages I want the flexibility to work in terms of the of a message queue and to take advantage of all those benefits but I don't want to have to write so much code that I end up regretting the the decision to use messaging right so if we're willing to take for granted that we're going to use something modern and commodity like a Apache Kafka RabbitMQ or anything else that has for which we can find a binder then we can use something called spring cloud stream spring cloud stream builds on top of spring integration it has that it features the same idea of a of a message channel through which messages are passed but it makes them it makes a matter of configuration and convention the the wiring of these components so let's go ahead and build a simple producer here okay we'll say producer and the producer will be a web application so we have a rest endpoint and we're going to send some data in the rest endpoint to another service so we're going to use spring cloud stream we'll use random cue stream processing here at startup spring and we'll use the actuator because it's nice to be able to visualize and we can use integration as well we might we don't need the integration of support here because spring cloud stream and sprinkled stream RabbitMQ bring in the minimal bits that we need to do the messaging but it's nice to have the integration component set there if we want actually we don't really need it for the producer do we we can we can get a lot we can get by without that that said that's our producer and then let's generate a consumer something that'll listen for messages and and then you know accept the data and then do something with them so we use h2 we use JP a will remove the web support we don't really need that anymore and I think that'll do that'll be oh we can use integration for this we use integration to to use the spray notation component model to respond to the incoming message okay so there's that now let's build a simple producer now I have on my machine right now I have a rabid MQ already running right so I've got RabbitMQ on my machine as we talked and it's running in the background right running on another node which we can get to and what we're going to do is we want to set up an application that produces a message whenever somebody calls a rest endpoint and then we're gonna write that message to REM cue and then have it delivered to our consumer all in terms of spring cloud streams so again if you if you haven't seen the spring negation video then it'll be useful to to understand what's happening there first before we get too far into the weeds on this one you don't need to know about spray innovation but it's nice to know about the full power that sort of underneath the hood of what we're doing because you can at any time drop down into spring notation so here we go we have an application and we're going to talk to another service and the service you know in this case we're just gonna call it consumer because it doesn't particularly have a an interesting denomination I'm not really setting up a particularly plausible example here I'm just setting up a basic pub/sub or producer-consumer kind of exchange so if we have another service and we want to talk to that service we need to have a channel into which we can send messages so let's let's first build a simple wrist controller okay we'll build the rest controller and now when somebody goes to get mapping we're going to say send a message actually I guess we can just do a post mapping what about that let's just do a post mapping and we will just accept a message at /and so now when public public void publish right here we are publish and what we're going to expect is a JSON structure of type map okay so look we're going to send a a JSON body here and what we want to do is we want to write that message whenever somebody makes a call to forward slash you know greet let's just call it greet and we'll pass in a parameter there a path variable there called name so whenever somebody sends a a message that's even better we'll do that we'll just send a message explicitly that it's very simple we don't have to reason or think about it too much right so whenever somebody sends a whenever somebody posts to this endpoint we're going to send a message to somebody right we're going to say greeting equals hello name okay so now we want to publish that message to a downstream service and I could if I had a rest service if the downstream service book rest I could use rest in this case but I can use the rest template and then make a call to that service if I want to but I don't want to really take that for granted I don't want to take for granted that that service is available all the time it may well not be available at this time and if it if it isn't available then I'm going to call a service that isn't there I'm going to end up dropping State on the floor here right I need to guarantee that this data is eventually delivered whenever possible so when I'm going to build you know sort of cognitive systems we need to make sure that these things do the right thing even in the face of topology changes and failures and in this case messaging is a better fit so we're going to use spring cloud stream as I said before we have spring cloud stream RabbitMQ on the class path here there are other binders that we could use certainly but we've got random queue on my machine it works fine so we'll use that and in order to make this work I need to find a a connection to this other service so we do this declaratively do this using an interface so I'm going to call this the producer channels interface and the name is completely and utterly arbitrary and what we need to define here is the message channels to which we would like to communicate in this case I've only got one channel and the name of the channel in this case because I'm lazy and I don't really have a more imaginative name is consumer that is the service that we're going to send a message to it's our local logical perspective this has no bearing on anything in the broker itself this is entirely our perspective of of the downstream messaging micro service we don't care about rabbim queue right now we don't care about anything really we just care about what are we doing in the code right now right this is a channel that can send a message out so I'm going to annotate this with ad output now we could have multiple definitions here right we in a micro services system you might have different services that are sort of a set up and in service to different parts of the domain parts of the business logic right so you might have a channel for the you know the customer service and another one for the order service and another one for the fulfillment service and another one for whatever right different channels for different downstream messaging services in this case we've only got one and we're going to call it the consumer so there was that okay now in order to activate spring cloud stream we say at enable binding and we tell it about this interface definition that we've just created and now this beam this producer channels thing is injectable I can inject the producer channels and and use it right so I'm going to create a constructor here I'll create a constructor and I'm going to inject the producer channels here and remember I'm using Spring Framework 4.3 because I'm using spring boot 1.4 and shrink 4.3 I don't need the ad on a wired annotation so well I'll leave that off here I'm trying to remember to do that and all I'm going to do is I'm going to store the message channel itself right I don't care about the intermediate interface but I it's nice to know that I can get access to that if I want so I can say this Dutch consumer equals channels output or that consumer rather and now I can send a message so this is going to be a Spring Framework message this is the very same type of message that you use if you use Spring Framework support for WebSockets for example - it's all you know the message object and the message builder and all this the message channel all that operetta came from spring integration and Spring Framework for it was merged into the framework proper right so it's now part of Spring Framework and it's been around for more than a year so if you have spring integration and spring framework on clasp and prefer the spring framework version of these types right okay so message builder with payload and our payload of course is going to be a string there's nothing fancy there I'm going to then build it right so uh you know just building up a message this is a builder so I could set you know headers for example I can add errors and so on I could add an error channel but in this case I just want to you know to describe the message as quickly as possible so this dot consumer dot send message okay now we've written this entire application in terms of our logical use case the logical thing that we're trying to do we're trying to send a message to a rest endpoint and then send it out to the consumer service we haven't thus far defined or made concrete what that means what is the consumer service to do this we need to configure we need to configure our application to talk to random to write and we do this using convention so remember we have spring cloud stream rabbitmq on the class path and the auto configuration the spring boot auto configuration for RabbitMQ has been transitively added to the class path so transitively we're going to have a rabid mq connection factory and i could specify you know the the details for that rabid mq connection factory by using any of the well-known properties all for spring that rabbitmq so spring that Reb mq at addresses and and host and the port and the password all of this stuff but the one we have by default right the one that we have by default here spring that random queue which connects to localhost and and so on that will work just fine right so here's the okay it's going to connect to 57 56 72 it's already there it's going to connect to localhost you can see that here so that's going to be enough we it's running on my local machine but keep in mind that you should and could override those when you need to what I do need to do is I need to define the bindings though so I say spring plowed stream bindings dot consumer that destination equals consumer and this is going to correspond to a agreed upon rendezvous point in the rev MQ broker so remember in RabbitMQ RabbitMQ is a is a little bit different for example than JMS in JMS you have a a deluxe JMS destination a destination is a logical address if you will in the broker upon which both the producer and the consumer need to agree in random queue there's a little bit of an extra indirection there's something called an exchange in Rabbid mq and if you send a message to the exchange the the exchange has the ability to then decide how to route that message to the appropriate queue so the consumer connects to a queue the producer connects to the exchange and the in the handoff between the exchange and the queue isn't isn't static it can change later right you can change it in the broker itself thus adding the ability to do extra kind of routing in fact one of the most handy things you can do there's actually have Reb mq write the message from an exchange to another queue which then gets routed to another queue etc all before it's ever delivered to the consumer so you can add extra steps you can add extra bits of a indirection and processing all within rather than queue and within AMQP protocol so we're going to send it to an exchange called consumer by convention that'll this will turn into a queue on the other side we're going to you know give the same name and it's gonna use a routing key right it's gonna do the routing key as well to figure out which queue it should talk to right from the from the exchange alright so there's that let's start the application on port 8000 there's this as well that'll be enough right that'll be our simple rabbim queue producer let's start that up okay there's that that should be enough now let's go ahead and look at our consumer the other side of this equation right we have to go ahead and build a consumer that's going to take the incoming message from the producer from RabbitMQ and then just print it out we'll just acknowledge the receipt of that message right so we'll say consumer application and in this case we're going to do the same thing in Reverse we're going to say interface consumer channels and we're going to say at input this is the reverse right so it's a message that will accept delivery it's a channel it accept delivery of incoming messages and in this case the the channel name is whatever you want we can actually just call this the producer if you want right so when the message comes in we're going to route it to this channel and we want to listen to that message so let's say at enable binding and we're going to say consumer channels class and then we'll define a spring integration integration flow right this is my preferred way I like doing this a lot I like using spring negation since I have a lot of back on with spring regression but that's it there's also a component model you can use interpreting code stream right you can use a component model it's been clubbed stream to declaratively you know wire a a method invocation to the arrival of a new message so integration flows dot from and we're going to take advantage of our newly continually created consumer channels right so there's this I'm going to say C dot input sorry producer handle and what we want to do is when I want to you know just accept the incoming payload and the payload is of type string so I'm going to say string that class and then we have to provide a lambda that will accept the headers and the payload and then we'll do something with it and what we're going to do right now is we're just gonna be turn no then we're going to create the the pipeline the flow there and now we can revisit this now that we know things sort of wired together so all I'm going to do is I'm going to log the incoming data right so I'll use a a logger here or the type and if you saw the more the recent video on you know components that you can use inside of spring framework to make your code more spring aware one of the nice things that we have there is a an injection point and that's a very convenient object there so with that lets us do is we can define loggers like this so injection point IP it turn logger dot get logger IP dot get declared type get name right so there's that and now we can inject the logger here as well and we can just say log or dot info new message payload ok there we are okay so now there's our consumer it's going to spit up it's going to listen for messages that are coming in on this this input channel and as soon as the messages arrive we're going to pass it to this this service activator this handle method and then the handle methods going to simply print out the messages that is arrives ok so here we go spin that up okay so this is up and running our other services on port 8000 to go ahead and send a message to it and we'll say let's see we want to line things up so we can still see what's happening here on the console we're going to say curl minus D that's an mth to be post to localhost 8004 session I've already forgotten what the name of the thing was greet name so greet world okay so we should see if everything goes to plan hello world okay ah it didn't quite work did it we didn't see the message arrived because we haven't specified the consumer we didn't see it we didn't we didn't see it arrived because we haven't specified the consumer the the the reverse configuration so spring cloud stream bindings dot consumer that destination equals and what do we say over here for the agreed-upon rendezvous point equals well this is the producer the equals consumer right so we're going to be upon sending a message to this exchange and pointing this could be you know whatever actually we could just call this greetings hey we should do that we'll call this greetings on both the producer and the consumer there it doesn't really matter right that's just a great agreed-upon rendezvous point in the rebbe MQ broker it doesn't you know that can change if I want to this is the Java channel this is the message channel that we created in our Java code so this can't change this has to match what's in the Java code but as long as both producer and consumer agree upon this and they're both talking to the same rabbitmq broker then things will work just fine so this is the consumer and this will work right this is going to work just fine you know we can start the application try again now there's the message right it says hello world things are fine but but what's going to happen if we set the port to be zero and then we start this and started a few different times all right we're going to start a few different instances I'm going to disable a single instance only thing here I'm going to hit apply hit okay and then I'll restart my consumer create another instance of the consumer as well so I'll have two of them running at the same time so this one's on fifty one three seven seven here's another one running on port well I don't know we'll see so five one four one one and five one three seven seven so what I'm going to do now is I'm going to send the same message I'm going to send another message to my producer which is going to then publish the message and we've got two consumers now what do you suppose will happen hello world there on the consumer on one port and hello world there again another port that's not quite what I think most of us were probably expecting is it we've broadcast the message to two different consumers which is fine maybe that's fine but if I'm building a system where I want to you know deliver the message to one and one layer one and only one consumer then this is probably not what I want and instead of load-balancing by adding more consumers to divide the work I've duplicated the work I've broadcast the work and that's that's not what I want so what I want to do in my consumer is is a define a group okay so I can call this whatever I want this the name here the group name is completely arbitrary this is a consumer group and in spring cloud stream parlance that means that if I have one or more consumers in the same group only one instance in that group will get any one given message so it's exclusive to among the among them the members of that group the message will be delivered exclusively to one node so I'm going to call this the greetings group and we're gonna go ahead and restart this instance here and then this instance whoops on that one there okay there's this and we'll restart this one so now I've restarted both of them with this new configuration right now two everything goes to plan we should see it this is 51 469 this is 51 472 we should see the message delivered to one and only one consumer right we should see it on the canto of just one node so let's try again here we go okay it didn't get delivered to this one hopefully got delivered to this one there it is hello world so now our consumer group has done what we expected it to let's see if I make another call may be the load bounced for us yep the the second one got it right so we're effectively load balancing the work now where this is called a work stealing right we can have as many consumers as we need another way to another pattern another name for this pattern is called the the aggressive consumer right so what's happening is we're taking as much work as we can from the queue but if we have more consumers and we're able to divide our ability to handle that work which is ideal if you're for example using Cloud Foundry okay so this is we've looked at what happens if we have one or more instances of the consumer what happens if we have zero what happens then how do we well you know how do we make sure that if we deliver a message that the the messages is processed eventually even if there is no consumer by default this is not the behavior that we'll have so if we have if we have zero instances of the service available then that message will sit dormant on the broker it's still there it's been acknowledged by the broker but we don't have anything to take delivery of that message in to do something with it so what we need to do now is to make our our subscription durable right so we're going to take this here and we're going to say producer that durable sub script chin equals true okay so now we've got three attributes here for this producer channel that we've just described in our definition here producer is this channel now I'm going to say that the subscription is durable so if we kill this and kill this so both of them are gone I have no instance here goodbye and I have no instance here and now I'm going to send another message I'll say hello world durable world right we should see this now delivered on start up I'm going to send the message oh whoops we'll send the message I'm going to say you know again for example so we should now have two messages that are sent to the broker but not yet confirmed not yet delivered so we're going to start up an instance of this of this consumer and we should see it retro actively consume the messages that have been delivered to RabbitMQ on the console here right there's Hello durable and hello again so it worked right we've we successfully sent a message from the consumer from the producer rather to the broker and then to the consumer but we've also taken care of uh you know we've looked at how to do publish/subscribe as well as point-to-point kind of messaging and we've looked at how to build systems that do the right thing in the face of service outages and failures after all that is one of the big benefits of using a message broker is the ability to store and then forward those messages that gives us the ability to take then take down a service and still guarantee that everything gets processed eventually right we're not able to take down our consumer if we need to and upgrade it and you know deploy a new version whatever without losing the data it's eventually consistent we will eventually process the data and the world will converge upon a a well known state eventually so this has been a brief look at spring cloud stream now spring cloud stream builds a punch spring integration in a later video we're going to look at something called spring cloud dataflow which makes it dead simple to then compose or to sort of Koya graph lots of small messaging based micro services like this in support of stream processing right the the sort of perpetuation of messages through a series of messaging based services to arrive at a more complex result sort of like the pipes and filters model that you're you may be filled with if you've used bash in the UNIX command line shell alright thanks very much for watching and I'll see you next time you
Info
Channel: SpringDeveloper
Views: 27,139
Rating: 4.9250937 out of 5
Keywords: Web Development (Interest), spring, pivotal, Web Application (Industry) Web Application Framework (Software Genre), Java (Programming Language), Spring Framework, Software Developer (Project Role), Java (Software), Weblogic, IBM WebSphere Application Server (Software), IBM WebSphere (Software), WildFly (Software), JBoss (Venture Funded Company), cloud foundry, spring boot, spring cloud, spring cloud stream, rabbitmq, apache kafka
Id: HQ00E60kB6c
Channel Id: undefined
Length: 30min 43sec (1843 seconds)
Published: Mon Nov 21 2016
Related Videos
Note
Please note that this website is currently a work in progress! Lots of interesting data and statistics to come.