長沙市做網(wǎng)站百度查關(guān)鍵詞顯示排名
SpringBoot集成消息處理框架
Spring framework提供了對JMS和AMQP消息框架的無縫集成,為Spring項目使用消息處理框架提供了極大的便利。
與Spring framework相比,Spring Boot更近了一步,通過auto-configuration機制實現(xiàn)了對jms及amqp主流框架如ActiveMQ、RabbitMQ以及Kafka的自動配置,應(yīng)用層開發(fā)過程中無需自己配置,只要classpath下加入了相應(yīng)的消息處理框架包,Spring Boot會自動完成加載,程序員直接就可以使用,簡直不能再方便了。
JMS
The Java Message Service (JMS) API is a messaging standard that allows application components based on the Java Platform Enterprise Edition (Java EE) to create, send, receive, and read messages. It enables distributed communication that is loosely coupled, reliable, and asynchronous.
JMS是java message service的簡稱,是一個基于java的消息處理規(guī)范,允許基于JAVA EE平臺的應(yīng)用組件創(chuàng)建、發(fā)送、接收、讀取消息。使得分布式通訊耦合度更低、更加可靠、異步處理。
JMS提供兩種編程模式:
Point-to-Point—Messages are sent to a single consumer using a JMS queue.
Publish and Subscribe—Messages are broadcast to all registered listeners through JMS topics.
點對點通訊:消息通過隊列發(fā)送給單一的消費者。
發(fā)布訂閱模式:消息通過主題以廣播的形式發(fā)送給所有的訂閱者。
JMS規(guī)范規(guī)定了5種不同類型的消息體:
- StreamMessage:流式消息,順序讀取。
- MapMessage:鍵值對消息,可順序讀取,也可以通過鍵隨機讀取。
- TextMessage:文本消息,當(dāng)初制定規(guī)范時候認為xml會成為最主流的消息載體,通過TextMessage可以發(fā)送xml格式的文本數(shù)據(jù)。
- ObjectMessage:對象消息,java對象序列化后發(fā)送。
- BytesMessage:字節(jié)消息。
JMS API對消息的發(fā)送、接收、存儲等操作做了約定,每一個JMS消息框架的提供者(實現(xiàn)者)都必須遵守這些約定。
ActiveMQ
ActiveMQ是Apache旗下的、基于JMS的一款開源消息處理中間件,官網(wǎng)介紹:
Apache ActiveMQ? is the most popular open source, multi-protocol, Java-based message broker. It supports industry standard protocols so users get the benefits of client choices across a broad range of languages and platforms. Connect from clients written in JavaScript, C, C++, Python, .Net, and more. Integrate your multi-platform applications using the ubiquitous AMQP protocol. Exchange messages between your web applications using STOMP over websockets. Manage your IoT devices using MQTT. Support your existing JMS infrastructure and beyond. ActiveMQ offers the power and flexibility to support any messaging use-case.
最流行的開源、多協(xié)議、基于JAVA的消息處理中間件。支持工業(yè)級協(xié)議所以用戶可以從多語言、跨平臺的客戶端選擇中受益。等等…
目前ActiveMQ有兩個主流版本:
There are currently two “flavors” of ActiveMQ available - the well-known “classic” broker and the “next generation” broker code-named Artemis. Once Artemis reaches a sufficient level of feature parity with the “Classic” code-base it will become the next major version of ActiveMQ. Initial migration documentation is available as well as a development roadmap for Artemis.
等到代表“下一代”的Artemis成熟之后,就會替代“classic”成為ActiveMQ的主版本。
ActiveMQ的下載安裝
官網(wǎng)找一個合適的版本下載安裝即可,非常簡單。
安裝后提供了一個管理端口:
可以通過管理端口做測試,管理端口是8161,而默認的MQ服務(wù)端口是61616。
SpringBoot項目自動配置ActiveMQ
首先初步了解一下SpringBoot對ActiveMQ的集成情況,輕車熟路的,檢查一下auto-configuration:
找到這個JmsAutoConfiguration類:
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ Message.class, JmsTemplate.class })
@ConditionalOnBean(ConnectionFactory.class)
@EnableConfigurationProperties(JmsProperties.class)
@Import(JmsAnnotationDrivenConfiguration.class)
public class JmsAutoConfiguration {
很明顯,他就是SpringBoot的自動配置類,如果classpath下存在Message.class, JmsTemplate.class 類、以及Spring容器中存在ConnectionFactory Bean的話就會被啟用。檢查一下代碼發(fā)現(xiàn)他會自動裝配JmsTemplate、JmsMessagingTemplate等對象到Spring IoC容器中。
SpringBoot項目引入ActiveMQ
POM文件引入:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency>
以上starter包含了spring-jms以及activemq的相關(guān)配置,所以通過以上對auto-configuration的分析,JmsTemplate以及JmsMessagingTemplate等相關(guān)組件會被SpringBoot自動配置好,后面我們就可以直接拿來使用了。
ps:JmsTemplate組件的自動配置過程源碼也比較復(fù)雜,今天暫時不涉及。
到此,其實我們并沒做什么具體的工作,但是ActiveMQ已經(jīng)準(zhǔn)備好了,我們項目中就可以直接使用JmsTemplate收發(fā)消息了。
消息發(fā)送
在application.yml文件中加入activeMQ的配置:
spring:activemq:broker-url: tcp://localhost:61616user: adminpassword: admin
消息發(fā)送類中通過自動裝配引入JmsTemplate:
@Autowiredprivate JmsTemplate jmsTemplate;
消息發(fā)送方法:
public void sendMessage(String message){jmsTemplate.send("active.myqueue", new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {// 也可以創(chuàng)建對象 session.createObjectMessage()TextMessage textMessage = session.createTextMessage();textMessage.setText(message);return textMessage;}});}
Controller中加入對sendMessage方法的調(diào)用,以便測試:
@GetMapping ("/sendmessage/{msg}")public String sendMessage(@PathVariable String msg){userService.sendMessage(msg);return "hello";}
消息接收
同樣,消息接收方法中通過自動裝配引入JmsTemplate。
消息接收方法:
public String revieveMessage(){String message = (String)jmsTemplate.receiveAndConvert("active.myqueue");log.info("revieved message:"+message);return message;}
Controller方法中加入接收方法以便測試:
@GetMapping ("/recievemsg")public String recieveMessage(){return userService.revieveMessage();}
這樣,activeMQ的“點對點模式”收發(fā)消息的代碼準(zhǔn)備完畢,非常簡單。
驗證
首先啟動activeMQ,啟動之后在ActiveMQ的管理端監(jiān)控隊列(截圖之前我已經(jīng)測試過一次消息收發(fā)了,如果沒測試過的話,隊列是空的,不會存在active.myqueue這個隊列):
啟動我們剛才創(chuàng)建的測試應(yīng)用,測試發(fā)送消息:
發(fā)送消息 1230230,收到反饋“hello”,說明消息發(fā)送成功。
我們從ActiveMQ管理端查看隊列:
Number of pending Message 隊列中尚未消費的消息數(shù)量為1,說明我們剛才的消息已經(jīng)成功發(fā)送到隊列中了。
消息接收測試:
成功接收到消息1230230。
再次從ActiveMQ管理端驗證:
隊列中的pending message數(shù)量變?yōu)?,入隊數(shù)量和出隊數(shù)量都為1,說明剛才的消息已經(jīng)被成功消費。
此時,隊列中尚未消費的數(shù)量為0的情況下,如果再次執(zhí)行消息消費方法(recievemsg方法),消費方法會阻塞等待,直到再次調(diào)用消息發(fā)送方法發(fā)送一條新消息到隊列、消費方法獲取到新消息后結(jié)束阻塞等待。
監(jiān)聽器方式接收消息
發(fā)布訂閱模式與點對點模式的區(qū)別主要是在消息接收端,Spring提供了接收消息的注解@JmsListener。
@JmsListener需要與@Component家族的注解結(jié)合使用,UserService中編寫兩個listener監(jiān)聽active.listenqueue::
@Service
@Slf4j
public class UserService {@Autowiredprivate JmsTemplate jmsTemplate;@JmsListener(destination = "active.listenqueue")public void revieveMsgListener(String content){log.info("revieveMsgListener:"+ content);}@JmsListener(destination = "active.listenqueue")public void revieveMsgListenerA(String content){log.info("revieveMsgListenerA"+content);}
UserService編寫一個向該ActiveMQ發(fā)送消息的方法:
public void sendMessage(String message){jmsTemplate.send("active.listenqueue", new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {// 也可以創(chuàng)建對象 session.createObjectMessage()TextMessage textMessage = session.createTextMessage();textMessage.setText(message);return textMessage;}});}
Controller中調(diào)用發(fā)送方法:
@GetMapping ("/sendmessage/{msg}")public String sendMessage(@PathVariable String msg){userService.sendMessage(msg);return "hello";}
}
啟動activeMQ,啟動項目之后,查看activemq的admin端:
active.listenqueue以及2個監(jiān)聽器已經(jīng)注冊到ActiveMQ中,發(fā)送消息:
檢查應(yīng)用已經(jīng)接收到了消息:
但是只有一個監(jiān)聽器接收到了消息,反復(fù)發(fā)送消息,后臺log發(fā)現(xiàn)兩個監(jiān)聽器輪番收到消息、但是一條消息不能被兩個監(jiān)聽器同時接收到:
發(fā)布訂閱模式
發(fā)布訂閱模式下,消息發(fā)送給topic,訂閱者僅訂閱感興趣的topic內(nèi)的消息,消息消費完成后并不會從topic中消失,多個消費者可以從同一個topic內(nèi)消費消息,所以,一條消息允許被多次消費。
默認情況下,SpringBoot集成ActiveMQ采用的是點對點隊列模式,application.yml文件配置 spring:jms:pub-sub-domain參數(shù)為true開啟topic模式:
spring:activemq:broker-url: tcp://localhost:61616user: adminpassword: adminjms:pub-sub-domain: true
啟動activeMQ,啟動項目,topic下看到項目中啟動的兩個topic:
調(diào)用sendmessage方法發(fā)送消息:
檢查ActiveMQ狀態(tài),可以發(fā)現(xiàn)1條消息成功發(fā)送到隊列中,被2個消費者分別消費了一次、消息共被消費了2次:
檢查log:
兩個監(jiān)聽方法都接收到了消息。
OK,let’s say it’s a day!