手機(jī)怎么做網(wǎng)站免費的百度學(xué)術(shù)官網(wǎng)入口
MQ
同步通信
異步通信
事件驅(qū)動優(yōu)勢:
-
服務(wù)解耦
-
性能提升,吞吐量提高
-
服務(wù)沒有強(qiáng)依賴,不擔(dān)心級聯(lián)失敗問題
-
流量消峰
? 小結(jié): 大多情況對時效性要求較高,所有大多數(shù)時間用同步。而如果不需要對方的結(jié)果,且吞吐量,并發(fā)量較高則需要使用異步通信
MQ常見框架
MQ(MessageQueue),消息隊列,字面來看就是存放消息的隊列,也就是事件驅(qū)動架構(gòu)中的Broker
消息:就是事件,比如支付成功了這個事件,在MQ中就是一個消息
RabbitMQ,RocketMQ 適合處理業(yè)務(wù)(若需要優(yōu)化定制則選Rocket,因為用Java寫的)
Kafka 適合處理日志(海量數(shù)據(jù)且對數(shù)據(jù)安全性要求不高的場景),ActiveMQ用的較少
RabbitMQ
RabbitMQ概述與安裝
RabbitMQ是基于Erlang語言(面向并發(fā)的語言,天生為分布式系統(tǒng)而設(shè)計的)開發(fā)的開源消息通信中間件,官網(wǎng)地址:https://www.rabbitmq.com/
參考課前資料(鏈接:https://pan.baidu.com/s/1JuVKKFpUXg8TFxa_FoV3Gg
提取碼:1468) 來安裝RabbitMQ
之后在瀏覽器輸入:http://192.168.83.130:15672/ 進(jìn)入RabbitMQ管理頁面,按docker run中設(shè)置的賬號密碼進(jìn)行登錄
結(jié)果如下
mq整體架構(gòu)
小結(jié)
常見消息模型
HelloWorld 案例
動手實踐
案例: 完成官方Demo中的hello world案例(鏈接:https://pan.baidu.com/s/1JuVKKFpUXg8TFxa_FoV3Gg
提取碼:1468)
打開項目,將ip調(diào)成自己的rabbitmq使用虛擬機(jī)(或電腦)的ip,再運(yùn)行一次PublisherTest中的 testSendMessage() 方法
發(fā)送一條消息。再運(yùn)行ConsumerTest 中main方法來接收消息。
小結(jié)
SpringAMQP
AMOP(Advanced Message Queuing Protocol)高級消息隊列協(xié)議,大大簡化消息發(fā)送和接收的代碼量,且與語言無關(guān)
SpringAmqp的官方地址:https://spring.io/projects/spring-amqp
AMQP依賴
<!--AMQP依賴,包含RabbitMQ-->
<dependency> <groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在配置文件中添加mq連接信息
spring:rabbitmq:host: 192.168.83.130 # 主機(jī)名port: 5672 # 端口virtual-host: / # 虛擬主機(jī) username: itcast # 用戶名password: 123321 # 密碼
Basic Queue 簡單隊列模型
案例:利用SpringAMQP實現(xiàn)HelloWorld中的基礎(chǔ)消息隊列功能
流程如下:
1.在父工程中引入spring-amqp的依賴,以及在publisher服務(wù)中編寫配置
2.在publisher服務(wù)中利用RabbitTemplate的convertAndSend方法,發(fā)送消息到simple.queue這個隊列
SpringAMQP發(fā)送消息步驟:引入依賴和設(shè)置配置---->利用RabbitTemplate的convertAndSend方法
3.在consumer中編寫代碼,接收消息
SpringAMQP接收消息步驟:引入依賴和設(shè)置配置—》定義類,添加Component注解,類中聲明方法添加@RabbitListener注解
Work Queue 工作隊列模型
Work queue,工作隊列,可以提高消息處理速度,避免隊列消息堆積
比如隊列 一秒來50條消息 一個消費者一秒處理40條消息,那么需要兩個消費者才能使得隊列中消息被處理不丟失
案例:實現(xiàn)一個隊列綁定多個消費者
問題:rabbitMQ消息預(yù)取,會將50條消息平均分給消費者1和消費者2,但消費者2處理速度慢,因此在1s內(nèi)處理不完publisher發(fā)過來的50條消息
解決方案:讓能者多勞,設(shè)置preFetch
,控制預(yù)取消息的上限
小結(jié)
發(fā)布、訂閱模型-Fanout
注意:exchange負(fù)責(zé)消息路由,而不是存儲(queue負(fù)責(zé)存儲),路由失敗則消息丟失
Fanout Exchange 會將接收到的消息路由到每一個跟其綁定的queue(廣播)
案例:利用SpringAMQP演示FanoutExchange的使用
step1 在consumer服務(wù)中聲明Exchange、Queue、Binding(綁定關(guān)系)
step2 在consumer服務(wù)聲明兩個消費者
在consumer服務(wù)的SpringRabbitListener類中,添加兩個方法,分別監(jiān)聽fanout.queue1和fanout.queue2:
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {System.out.println("消費者1接收到Fanout消息:【" + msg + "】");
}@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {System.out.println("消費者2接收到Fanout消息:【" + msg + "】");
}
step3 在publisher服務(wù)發(fā)送消息到FanoutExchange
在publisher服務(wù)的SpringAmqpTest類中添加測試方法:
@Test
public void testFanoutExchange() {// 隊列名稱 String exchangeName = "itcast.fanout"; // 消息String message = "hello, everyone!";// 發(fā)送消息,參數(shù)分別是:交互機(jī)名稱、RoutingKey(暫時為空)、消息 rabbitTemplate.convertAndSend(exchangeName, "", message);
}
小結(jié)
發(fā)布、訂閱模型-Direct
案例:利用SpringAMQP演示DirectExchange的使用
步驟一 在consumer服務(wù)聲明Exchange、Queue
1.在consumer服務(wù)中,編寫兩個消費者方法,分別監(jiān)聽direct.queue1和direct.queue2,
2.并利用@RabbitListener聲明Exchange、Queue、RoutingKey
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red","blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消費者1........接收到路由消息:【" + msg + "】" + LocalTime.now());
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red","yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消費者2........接收到消路由息:【" + msg + "】" + LocalTime.now());
}
步驟二 在publisher服務(wù)發(fā)送消息到DirectExchange
在publisher服務(wù)的SpringAmqpTest類中添加測試方法:
@Test
public void testDirectExchange() {//交換機(jī)名字String exchangeName = "itcast.direct";//消息String message = "紅色警報!日本亂排核廢水,導(dǎo)致海洋生物變異,驚現(xiàn)哥斯拉!";//發(fā)送消息,參數(shù)依次為:交換機(jī)名稱,RoutingKey,消息rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}
從blue->yellow->red 運(yùn)行三次,得到結(jié)果如下
小結(jié)
發(fā)布、訂閱模型-Topic
案例 利用SpringAMQP演示TopicExchange的使用
步驟一:在consumer服務(wù)聲明Exchange、Queue
1.在consumer服務(wù)中,編寫兩個消費者方法,分別監(jiān)聽topic.queue1和topic.queue2,
2.并利用@RabbitListener聲明Exchange、Queue、RoutingKey
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "china.#"
))
public void listenTopicQueue1(String msg){System.out.println("消費者1........接收到路由消息:【" + msg + "】" + LocalTime.now());
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue2(String msg){System.out.println("消費者2........接收到消路由息:【" + msg + "】" + LocalTime.now());
}
步驟二:在publisher服務(wù)發(fā)送消息到TopicExchange
在publisher服務(wù)的SpringAmqpTest類中添加測試方法:
@Test
public void testTopicExchange() {//交換機(jī)名字String exchangeName = "itcast.topic";//消息String message = "喜報!孫悟空大戰(zhàn)哥斯拉,勝!";//發(fā)送消息,參數(shù)依次為:交換機(jī)名稱,RoutingKey,消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
小結(jié)
消息轉(zhuǎn)化器
Spring的對消息對象的處理是由org.springframework.amqp.support.converter.MessageConverter來處理的。而默認(rèn)實現(xiàn)是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。如果要修改只需要定義一個MessageConverter 類型的Bean即可。推薦用JSON方式序列化,步驟如下:
? 在publisher服務(wù)引入依賴
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId>
</dependency>
? 在publisher服務(wù)聲明MessageConverter。(原本應(yīng)該放到配置類中,但啟動類也是配置類,所以可以放啟動類中)
@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}
案例 測試發(fā)送Object類型消息
結(jié)果如下(沒有更改JDK序列化方式)
使用json序列化器之后
consumer接收消息過程
step1:加jackson依賴,依賴上面已經(jīng)放父工程中,就不用做了
step2: 將pulisher中相同的MessageConverter放入consumer 啟動類中(發(fā)送方與接收方必須相同)
@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}
step3: 定義一個消費者,監(jiān)聽object.queue隊列并消費消息
@RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String,Object> msg){System.out.println("消費者........接收到對象消息:【" + msg + "】" + LocalTime.now());
}