中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

家用電腦如何做網(wǎng)站服務(wù)器百度總部公司地址在哪里

家用電腦如何做網(wǎng)站服務(wù)器,百度總部公司地址在哪里,商城網(wǎng)站設(shè)計(jì)圖,網(wǎng)站接廣告一.WorkQueues模型 Work queues,任務(wù)模型。簡(jiǎn)單來說就是讓多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列,共同消費(fèi)隊(duì)列中的消息。 當(dāng)消息處理比較耗時(shí)的時(shí)候,可能生產(chǎn)消息的速度會(huì)遠(yuǎn)遠(yuǎn)大于消息的消費(fèi)速度。長(zhǎng)此以往,消息就會(huì)堆積越來越多&#xff0c…

一.WorkQueues模型

Work queues,任務(wù)模型。簡(jiǎn)單來說就是讓多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列,共同消費(fèi)隊(duì)列中的消息。

在這里插入圖片描述
當(dāng)消息處理比較耗時(shí)的時(shí)候,可能生產(chǎn)消息的速度會(huì)遠(yuǎn)遠(yuǎn)大于消息的消費(fèi)速度。長(zhǎng)此以往,消息就會(huì)堆積越來越多,無法及時(shí)處理。
此時(shí)就可以使用work 模型,多個(gè)消費(fèi)者共同處理消息處理,消息處理的速度就能大大提高了。
接下來,我們就來模擬這樣的場(chǎng)景。
首先,我們?cè)诳刂婆_(tái)創(chuàng)建一個(gè)新的隊(duì)列,命名為work.queue
在這里插入圖片描述

1.消息發(fā)送

這次我們循環(huán)發(fā)送,模擬大量消息堆積現(xiàn)象。
在publisher服務(wù)中的SpringAmqpTest類中添加一個(gè)測(cè)試方法:

 /*** workQueue* 向隊(duì)列中不停發(fā)送消息,模擬消息堆積。*/@Testpublic void testWorkQueue() throws InterruptedException {// 隊(duì)列名稱String queueName = "work.queue";// 消息String message = "hello, message_";for (int i = 0; i < 50; i++) {// 發(fā)送消息,每20毫秒發(fā)送一次,相當(dāng)于每秒發(fā)送50條消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}}

2.消息接收

要模擬多個(gè)消費(fèi)者綁定同一個(gè)隊(duì)列,我們?cè)赾onsumer服務(wù)的SpringRabbitListener中添加2個(gè)新的方法:

@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消費(fèi)者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);
}@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消費(fèi)者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);
}
注意到這兩消費(fèi)者,都設(shè)置了`Thead.sleep`,模擬任務(wù)耗時(shí):
  • 消費(fèi)者1 sleep了20毫秒,相當(dāng)于每秒鐘處理50個(gè)消息
  • 消費(fèi)者2 sleep了200毫秒,相當(dāng)于每秒處理5個(gè)消息

在這里插入圖片描述
可以看到消費(fèi)者1和消費(fèi)者2竟然每人消費(fèi)了25條消息:

  • 消費(fèi)者1很快完成了自己的25條消息
  • 消費(fèi)者2卻在緩慢的處理自己的25條消息。

也就是說消息是平均分配給每個(gè)消費(fèi)者,并沒有考慮到消費(fèi)者的處理能力。導(dǎo)致1個(gè)消費(fèi)者空閑,另一個(gè)消費(fèi)者忙的不可開交。沒有充分利用每一個(gè)消費(fèi)者的能力,最終消息處理的耗時(shí)遠(yuǎn)遠(yuǎn)超過了1秒。這樣顯然是有問題的。

3.能者多勞

在spring中有一個(gè)簡(jiǎn)單的配置,可以解決這個(gè)問題。我們修改consumer服務(wù)的application.yml文件,添加配置:

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個(gè)消息

在這里插入圖片描述
可以發(fā)現(xiàn),由于消費(fèi)者1處理速度較快,所以處理了更多的消息;消費(fèi)者2處理速度較慢,只處理了6條消息。而最終總的執(zhí)行耗時(shí)也在1秒左右,大大提升。正所謂能者多勞,這樣充分利用了每一個(gè)消費(fèi)者的處理能力,可以有效避免消息積壓?jiǎn)栴}。

4.總結(jié)

Work模型的使用:

  • 多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列,同一條消息只會(huì)被一個(gè)消費(fèi)者處理
  • 通過設(shè)置prefetch來控制消費(fèi)者預(yù)取的消息數(shù)量

二.交換機(jī)類型

在之前的兩個(gè)測(cè)試案例中,都沒有交換機(jī),生產(chǎn)者直接發(fā)送消息到隊(duì)列。而一旦引入交換機(jī),消息發(fā)送的模式會(huì)有很大變化:
在這里插入圖片描述

可以看到,在訂閱模型中,多了一個(gè)exchange角色,而且過程略有變化:

  • Publisher:生產(chǎn)者,不再發(fā)送消息到隊(duì)列中,而是發(fā)給交換機(jī)
  • Exchange:交換機(jī),一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個(gè)特別隊(duì)列、遞交給所有隊(duì)列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。
  • Queue:消息隊(duì)列也與以前一樣,接收消息、緩存消息。不過隊(duì)列一定要與交換機(jī)綁定。
  • Consumer:消費(fèi)者,與以前一樣,訂閱隊(duì)列,沒有變化

Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力,因此如果沒有任何隊(duì)列與Exchange綁定,或者沒有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失!

交換機(jī)的類型有四種:

  • Fanout:廣播,將消息交給所有綁定到交換機(jī)的隊(duì)列。我們最早在控制臺(tái)使用的正是Fanout交換機(jī)
  • Direct:訂閱,基于RoutingKey(路由key)發(fā)送給訂閱了消息的隊(duì)列
  • Topic:通配符訂閱,與Direct類似,只不過RoutingKey可以使用通配符
  • Headers:頭匹配,基于MQ的消息頭匹配,用的較少。

本次記錄前面的三種交換機(jī)模式。

1.Fanout交換機(jī)

說明

Fanout,英文翻譯是扇出,我覺得在MQ中叫廣播更合適。
在廣播模式下,消息發(fā)送流程是這樣的:
在這里插入圖片描述

  • 1) 可以有多個(gè)隊(duì)列
  • 2) 每個(gè)隊(duì)列都要綁定到Exchange(交換機(jī))
  • 3) 生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機(jī)
  • 4) 交換機(jī)把消息發(fā)送給綁定過的所有隊(duì)列
  • 5) 訂閱隊(duì)列的消費(fèi)者都能拿到消息
    我們的計(jì)劃是這樣的:
    在這里插入圖片描述
  • 創(chuàng)建一個(gè)名為 hmall.fanout的交換機(jī),類型是Fanout
  • 創(chuàng)建兩個(gè)隊(duì)列fanout.queue1fanout.queue2,綁定到交換機(jī)hmall.fanout

1.在控制臺(tái)增加兩個(gè)新的隊(duì)列

在這里插入圖片描述
然后再創(chuàng)建一個(gè)交換機(jī):
在這里插入圖片描述
然后綁定兩個(gè)隊(duì)列到交換機(jī):
在這里插入圖片描述

測(cè)試

1.消息發(fā)送

在publisher服務(wù)的SpringAmqpTest類中添加測(cè)試方法:

@Test
public void testFanoutExchange() {// 交換機(jī)名稱String exchangeName = "hmall.fanout";// 消息String message = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, "", message);
}

2.消息接收

在consumer服務(wù)的SpringRabbitListener中添加兩個(gè)方法,作為消費(fèi)者:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {System.out.println("消費(fèi)者1接收到Fanout消息:【" + msg + "】");
}@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {System.out.println("消費(fèi)者2接收到Fanout消息:【" + msg + "】");
}

在這里插入圖片描述

3.總結(jié)

交換機(jī)的作用是什么?

  • 接收publisher發(fā)送的消息
  • 將消息按照規(guī)則路由到與之綁定的隊(duì)列
  • 不能緩存消息,路由失敗,消息丟失
  • FanoutExchange的會(huì)將消息路由到每個(gè)綁定的隊(duì)列

2.Direct交換機(jī)

說明

在Fanout模式中,一條消息,會(huì)被所有訂閱的隊(duì)列都消費(fèi)。但是,在某些場(chǎng)景下,我們希望不同的消息被不同的隊(duì)列消費(fèi)。這時(shí)就要用到Direct類型的Exchange。
在這里插入圖片描述
在Direct模型下:

  • 隊(duì)列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個(gè)RoutingKey(路由key)
  • 消息的發(fā)送方在 向 Exchange發(fā)送消息時(shí),也必須指定消息的 RoutingKey。
  • Exchange不再把消息交給每一個(gè)綁定的隊(duì)列,而是根據(jù)消息的Routing Key進(jìn)行判斷,只有隊(duì)列的Routingkey與消息的 Routing key完全一致,才會(huì)接收到消息

案例需求如圖
在這里插入圖片描述

  1. 聲明一個(gè)名為hmall.direct的交換機(jī)
  2. 聲明隊(duì)列direct.queue1,綁定hmall.directbindingKeybludred
  3. 聲明隊(duì)列direct.queue2,綁定hmall.directbindingKeyyellowred
  4. consumer服務(wù)中,編寫兩個(gè)消費(fèi)者方法,分別監(jiān)聽direct.queue1和direct.queue2
  5. 在publisher中編寫測(cè)試方法,向hmall.direct發(fā)送消息

聲明隊(duì)列和交換機(jī)

首先在控制臺(tái)聲明兩個(gè)隊(duì)列direct.queue1direct.queue2
在這里插入圖片描述
然后聲明一個(gè)direct類型的交換機(jī),命名為hmall.direct:
在這里插入圖片描述
然后使用redblue作為key,綁定direct.queue1hmall.direct
同理,使用redyellow作為key,綁定direct.queue2hmall.direct,步驟略,最終結(jié)果:
在這里插入圖片描述

測(cè)試

1.消息發(fā)送

在publisher服務(wù)的SpringAmqpTest類中添加測(cè)試方法:

@Test
public void testSendDirectExchange() {// 交換機(jī)名稱String exchangeName = "hmall.direct";// 消息String message = "紅色警報(bào)!日本亂排核廢水,導(dǎo)致海洋生物變異,驚現(xiàn)哥斯拉!";// 發(fā)送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

2.消息接收

在consumer服務(wù)的SpringRabbitListener中添加方法:

@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {System.out.println("消費(fèi)者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {System.out.println("消費(fèi)者2接收到direct.queue2的消息:【" + msg + "】");
}

由于使用的red這個(gè)key,所以兩個(gè)消費(fèi)者都收到了消息:
在這里插入圖片描述
我們?cè)偾袚Q為blue這個(gè)key:
在這里插入圖片描述

3.總結(jié)

描述下Direct交換機(jī)與Fanout交換機(jī)的差異?

  • Fanout交換機(jī)將消息路由給每一個(gè)與之綁定的隊(duì)列
  • Direct交換機(jī)根據(jù)RoutingKey判斷路由給哪個(gè)隊(duì)列
  • 如果多個(gè)隊(duì)列具有相同的RoutingKey,則與Fanout功能類似

3.Topic交換機(jī)

說明

Topic類型的ExchangeDirect相比,都是可以根據(jù)RoutingKey把消息路由到不同的隊(duì)列。
只不過Topic類型Exchange可以讓隊(duì)列在綁定RoutingKey 的時(shí)候使用通配符!

RoutingKey 一般都是有一個(gè)或多個(gè)單詞組成,多個(gè)單詞之間以.分割,例如: item.insert

通配符規(guī)則:

  • #:匹配一個(gè)或多個(gè)詞
  • *:匹配不多不少恰好1個(gè)詞

舉例:

  • item.#:能夠匹配item.spu.insert 或者 item.spu
  • item.*:只能匹配item.spu

在這里插入圖片描述

測(cè)試

假如此時(shí)publisher發(fā)送的消息使用的RoutingKey共有四種:

  • china.news 代表有中國(guó)的新聞消息;
  • china.weather 代表中國(guó)的天氣消息;
  • japan.news 則代表日本新聞
  • japan.weather 代表日本的天氣消息;

解釋:

  • topic.queue1:綁定的是china.# ,凡是以 china.開頭的routing key 都會(huì)被匹配到,包括:
    • china.news
    • china.weather
  • topic.queue2:綁定的是#.news ,凡是以 .news結(jié)尾的 routing key 都會(huì)被匹配。包括:
    • china.news
    • japan.news

接下來,我們就按照上圖所示,來演示一下Topic交換機(jī)的用法。
首先,在控制臺(tái)按照?qǐng)D示例子創(chuàng)建隊(duì)列、交換機(jī),并利用通配符綁定隊(duì)列和交換機(jī)。此處步驟略。最終結(jié)果如下:
在這里插入圖片描述

1.消息發(fā)送

在publisher服務(wù)的SpringAmqpTest類中添加測(cè)試方法:

/*** topicExchange*/
@Test
public void testSendTopicExchange() {// 交換機(jī)名稱String exchangeName = "hmall.topic";// 消息String message = "喜報(bào)!孫悟空大戰(zhàn)哥斯拉,勝!";// 發(fā)送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

2.消息接收

@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){System.out.println("消費(fèi)者1接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){System.out.println("消費(fèi)者2接收到topic.queue2的消息:【" + msg + "】");
}

在這里插入圖片描述

3.總結(jié)

描述下Direct交換機(jī)與Topic交換機(jī)的差異?

  • Topic交換機(jī)接收的消息RoutingKey必須是多個(gè)單詞,以 **.** 分割
  • Topic交換機(jī)與隊(duì)列綁定時(shí)的bindingKey可以指定通配符
  • #:代表0個(gè)或多個(gè)詞
  • *:代表1個(gè)詞

4.聲明隊(duì)列和交換機(jī)

在之前我們都是基于RabbitMQ控制臺(tái)來創(chuàng)建隊(duì)列、交換機(jī)。但是在實(shí)際開發(fā)時(shí),隊(duì)列和交換機(jī)是程序員定義的,將來項(xiàng)目上線,又要交給運(yùn)維去創(chuàng)建。那么程序員就需要把程序中運(yùn)行的所有隊(duì)列和交換機(jī)都寫下來,交給運(yùn)維。在這個(gè)過程中是很容易出現(xiàn)錯(cuò)誤的。
因此推薦的做法是由程序啟動(dòng)時(shí)檢查隊(duì)列和交換機(jī)是否存在,如果不存在自動(dòng)創(chuàng)建。

1.基本API

SpringAMQP提供了一個(gè)Queue類,用來創(chuàng)建隊(duì)列
在這里插入圖片描述
SpringAMQP還提供了一個(gè)Exchange接口,來表示所有不同類型的交換機(jī):
在這里插入圖片描述
在這里插入圖片描述
我們可以自己創(chuàng)建隊(duì)列和交換機(jī),不過SpringAMQP還提供了ExchangeBuilder來簡(jiǎn)化這個(gè)過程:
在這里插入圖片描述

在這里插入圖片描述
而在綁定隊(duì)列和交換機(jī)時(shí),則需要使用BindingBuilder來創(chuàng)建Binding對(duì)象:在這里插入圖片描述
在這里插入圖片描述

1.fanout示例

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {/*** 聲明交換機(jī)* @return Fanout類型交換機(jī)*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("hmall.fanout");}/*** 第1個(gè)隊(duì)列*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/*** 綁定隊(duì)列和交換機(jī)*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2個(gè)隊(duì)列*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/*** 綁定隊(duì)列和交換機(jī)*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

2.direct示例

direct模式由于要綁定多個(gè)KEY,會(huì)非常麻煩,每一個(gè)Key都要編寫一個(gè)binding:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DirectConfig {/*** 聲明交換機(jī)* @return Direct類型交換機(jī)*/@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange("hmall.direct").build();}/*** 第1個(gè)隊(duì)列*/@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}/*** 綁定隊(duì)列和交換機(jī)*/@Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}/*** 綁定隊(duì)列和交換機(jī)*/@Beanpublic Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}/*** 第2個(gè)隊(duì)列*/@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}/*** 綁定隊(duì)列和交換機(jī)*/@Beanpublic Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}/*** 綁定隊(duì)列和交換機(jī)*/@Beanpublic Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}
}

3.基于注解聲明

基于@Bean的方式聲明隊(duì)列和交換機(jī)比較麻煩,Spring還提供了基于注解方式來聲明。

例如,我們同樣聲明Direct模式的交換機(jī)和隊(duì)列:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消費(fèi)者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消費(fèi)者2接收到direct.queue2的消息:【" + msg + "】");
}

再試試Topic模式:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),key = "china.#"
))
public void listenTopicQueue1(String msg){System.out.println("消費(fèi)者1接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue2(String msg){System.out.println("消費(fèi)者2接收到topic.queue2的消息:【" + msg + "】");
}

4.消息轉(zhuǎn)換器

Spring的消息發(fā)送代碼接收的消息體是一個(gè)Object:
在這里插入圖片描述
而在數(shù)據(jù)傳輸時(shí),它會(huì)把你發(fā)送的消息序列化為字節(jié)發(fā)送給MQ,接收消息的時(shí)候,還會(huì)把字節(jié)反序列化為Java對(duì)象。只不過,默認(rèn)情況下Spring采用的序列化方式是JDK序列化。眾所周知,JDK序列化存在下列問題:

  • 數(shù)據(jù)體積過大
  • 有安全漏洞
  • 可讀性差
    我們來測(cè)試一下。
    1)創(chuàng)建測(cè)試隊(duì)列
    首先,我們?cè)赾onsumer服務(wù)中聲明一個(gè)新的配置類:
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MessageConfig {@Beanpublic Queue objectQueue() {return new Queue("object.queue");}
}

注意,這里我們先不要給這個(gè)隊(duì)列添加消費(fèi)者,我們要查看消息體的格式。

重啟consumer服務(wù)以后,該隊(duì)列就會(huì)被自動(dòng)創(chuàng)建出來了:
在這里插入圖片描述
2)發(fā)送消息
我們?cè)趐ublisher模塊的SpringAmqpTest中新增一個(gè)消息發(fā)送的代碼,發(fā)送一個(gè)Map對(duì)象:

@Test
public void testSendMap() throws InterruptedException {// 準(zhǔn)備消息Map<String,Object> msg = new HashMap<>();msg.put("name", "柳巖");msg.put("age", 21);// 發(fā)送消息rabbitTemplate.convertAndSend("object.queue", msg);
}

發(fā)送消息后查看控制臺(tái):
在這里插入圖片描述
可以看到消息格式非常不友好。

1.配置JSON轉(zhuǎn)換器

顯然,JDK序列化方式并不合適。我們希望消息體的體積更小、可讀性更高,因此可以使用JSON方式來做序列化和反序列化。
publisherconsumer兩個(gè)服務(wù)中都引入依賴:

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>

注意,如果項(xiàng)目中引入了spring-boot-starter-web依賴,則無需再次引入Jackson依賴。

配置消息轉(zhuǎn)換器,在publisherconsumer兩個(gè)服務(wù)的啟動(dòng)類中添加一個(gè)Bean即可:

@Bean
public MessageConverter messageConverter(){// 1.定義消息轉(zhuǎn)換器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自動(dòng)創(chuàng)建消息id,用于識(shí)別不同消息,也可以在業(yè)務(wù)中基于ID判斷是否是重復(fù)消息jackson2JsonMessageConverter.setCreate	MessageIds(true);return jackson2JsonMessageConverter;
}

消息轉(zhuǎn)換器中添加的messageId可以便于我們將來做冪等性判斷。
此時(shí),我們到MQ控制臺(tái)刪除object.queue中的舊的消息。然后再次執(zhí)行剛才的消息發(fā)送的代碼,到MQ的控制臺(tái)查看消息結(jié)構(gòu):
在這里插入圖片描述

2.消費(fèi)者接收Object
@RabbitListener(queues = "object.queue")
public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {System.out.println("消費(fèi)者接收到object.queue消息:【" + msg + "】");
}

三.發(fā)送者的可靠性

首先,我們一起分析一下消息丟失的可能性有哪些。
消息從發(fā)送者發(fā)送消息,到消費(fèi)者處理消息,需要經(jīng)過的流程是這樣的:
在這里插入圖片描述
消息從生產(chǎn)者到消費(fèi)者的每一步都可能導(dǎo)致消息丟失:

  • 發(fā)送消息時(shí)丟失:
    • 生產(chǎn)者發(fā)送消息時(shí)連接MQ失敗
    • 生產(chǎn)者發(fā)送消息到達(dá)MQ后未找到Exchange
    • 生產(chǎn)者發(fā)送消息到達(dá)MQ的Exchange后,未找到合適的Queue
    • 消息到達(dá)MQ后,處理消息的進(jìn)程發(fā)生異常
  • MQ導(dǎo)致消息丟失:
    • 消息到達(dá)MQ,保存到隊(duì)列后,尚未消費(fèi)就突然宕機(jī)
  • 消費(fèi)者處理消息時(shí):
    • 消息接收后尚未處理突然宕機(jī)
    • 消息接收后處理過程中拋出異常

綜上,我們要解決消息丟失問題,保證MQ的可靠性,就必須從3個(gè)方面入手:

  • 確保生產(chǎn)者一定把消息發(fā)送到MQ
  • 確保MQ不會(huì)將消息弄丟
  • 確保消費(fèi)者一定要處理消息

我們先來看如何確保生產(chǎn)者一定能把消息發(fā)送到MQ。

1.生產(chǎn)者重試機(jī)制(生產(chǎn)者發(fā)送消息時(shí)連接MQ失敗)

首先第一種情況,就是生產(chǎn)者發(fā)送消息時(shí),出現(xiàn)了網(wǎng)絡(luò)故障,導(dǎo)致與MQ的連接中斷。

為了解決這個(gè)問題,SpringAMQP提供的消息發(fā)送時(shí)的重試機(jī)制。即:當(dāng)RabbitTemplate與MQ連接超時(shí)后,多次重試。

修改publisher模塊的application.yaml文件,添加下面的內(nèi)容:

spring:rabbitmq:connection-timeout: 1s # 設(shè)置MQ的連接超時(shí)時(shí)間template:retry:enabled: true # 開啟超時(shí)重試機(jī)制initial-interval: 1000ms # 失敗后的初始等待時(shí)間multiplier: 1 # 失敗后下次的等待時(shí)長(zhǎng)倍數(shù),下次等待時(shí)長(zhǎng) = initial-interval * multipliermax-attempts: 3 # 最大重試次數(shù)

我們利用命令停掉RabbitMQ服務(wù):

docker stop mq

在這里插入圖片描述
然后測(cè)試發(fā)送一條消息,會(huì)發(fā)現(xiàn)會(huì)每隔1秒重試1次,總共重試了3次。消息發(fā)送的超時(shí)重試機(jī)制配置成功了!

:::warning
注意:當(dāng)網(wǎng)絡(luò)不穩(wěn)定的時(shí)候,利用重試機(jī)制可以有效提高消息發(fā)送的成功率。不過SpringAMQP提供的重試機(jī)制是阻塞式的重試,也就是說多次重試等待的過程中,當(dāng)前線程是被阻塞的。
如果對(duì)于業(yè)務(wù)性能有要求,建議禁用重試機(jī)制。如果一定要使用,請(qǐng)合理配置等待時(shí)長(zhǎng)和重試次數(shù),當(dāng)然也可以考慮使用異步線程來執(zhí)行發(fā)送消息的代碼。
:::

2生產(chǎn)者確認(rèn)機(jī)制

一般情況下,只要生產(chǎn)者與MQ之間的網(wǎng)路連接順暢,基本不會(huì)出現(xiàn)發(fā)送消息丟失的情況,因此大多數(shù)情況下我們無需考慮這種問題。
不過,在少數(shù)情況下,也會(huì)出現(xiàn)消息發(fā)送到MQ之后丟失的現(xiàn)象,比如:

  • MQ內(nèi)部處理消息的進(jìn)程發(fā)生了異常
  • 生產(chǎn)者發(fā)送消息到達(dá)MQ后未找到Exchange
  • 生產(chǎn)者發(fā)送消息到達(dá)MQ的Exchange后,未找到合適的Queue,因此無法路由

針對(duì)上述情況,RabbitMQ提供了生產(chǎn)者消息確認(rèn)機(jī)制,包括Publisher ConfirmPublisher Return兩種。在開啟確認(rèn)機(jī)制的情況下,當(dāng)生產(chǎn)者發(fā)送消息給MQ后,MQ會(huì)根據(jù)消息處理的情況返回不同的回執(zhí)。
具體如圖所示:
在這里插入圖片描述
總結(jié)如下:

  • 當(dāng)消息投遞到MQ,但是路由失敗時(shí),通過Publisher Return返回異常信息,同時(shí)返回ack的確認(rèn)信息,代表投遞成功
  • 臨時(shí)消息投遞到了MQ,并且入隊(duì)成功,返回ACK,告知投遞成功
  • 持久消息投遞到了MQ,并且入隊(duì)完成持久化,返回ACK ,告知投遞成功
  • 其它情況都會(huì)返回NACK,告知投遞失敗

其中acknack屬于Publisher Confirm機(jī)制,ack是投遞成功;nack是投遞失敗。而return則屬于Publisher Return機(jī)制。
默認(rèn)兩種機(jī)制都是關(guān)閉狀態(tài),需要通過配置文件來開啟。

1.實(shí)現(xiàn)生產(chǎn)者確認(rèn)機(jī)制

在publisher模塊的application.yaml中添加配置:

spring:rabbitmq:publisher-confirm-type: correlated # 開啟publisher confirm機(jī)制,并設(shè)置confirm類型publisher-returns: true # 開啟publisher return機(jī)制
  • none:關(guān)閉confirm機(jī)制
  • simple:同步阻塞等待MQ的回執(zhí)
  • correlated:MQ異步回調(diào)返回回執(zhí)

一般我們推薦使用correlated,回調(diào)機(jī)制。

logging:pattern:dateformat: MM-dd HH:mm:ss:SSSlevel:com.weijisheng: debug
spring:rabbitmq:host: 192.168.30.140port: 5672virtual-host: /username: guestpassword: guest
#    connection-timeout: 1s
#    template:
#      retry:
#        enabled: true
#        multiplier: 2publisher-confirm-type: correlatedpublisher-returns: true

2.定義ReturnCallback

每個(gè)RabbitTemplate只能配置一個(gè)ReturnCallback,因此我們可以在配置類中統(tǒng)一設(shè)置。我們?cè)趐ublisher模塊定義一個(gè)配置類:

@Slf4j
@AllArgsConstructor
@Configurationpublic class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("觸發(fā)return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}

3.定義ConfirmCallback

由于每個(gè)消息發(fā)送時(shí)的處理邏輯不一定相同,因此ConfirmCallback需要在每次發(fā)消息時(shí)定義。具體來說,是在調(diào)用RabbitTemplate中的convertAndSend方法時(shí),多傳遞一個(gè)參數(shù):
在這里插入圖片描述
這里的CorrelationData中包含兩個(gè)核心的東西:

  • id:消息的唯一標(biāo)示,MQ對(duì)不同的消息的回執(zhí)以此做判斷,避免混淆
  • SettableListenableFuture:回執(zhí)結(jié)果的Future對(duì)象

將來MQ的回執(zhí)就會(huì)通過這個(gè)Future來返回,我們可以提前給CorrelationData中的Future添加回調(diào)函數(shù)來處理消息回執(zhí):
在這里插入圖片描述
我們新建一個(gè)測(cè)試,向系統(tǒng)自帶的交換機(jī)發(fā)送消息,并且添加ConfirmCallback

 @Testvoid testConfirmCallback() throws InterruptedException {// 1.創(chuàng)建cdCorrelationData cd = new CorrelationData(UUID.randomUUID().toString());// 2.添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {log.error("消息回調(diào)失敗", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {log.debug("收到confirm callback回執(zhí)");if(result.isAck()){// 消息發(fā)送成功log.debug("消息發(fā)送成功,收到ack");}else{// 消息發(fā)送失敗log.error("消息發(fā)送失敗,收到nack, 原因:{}", result.getReason());}}});rabbitTemplate.convertAndSend("hmall.direct", "weijisheng", "hello", cd);Thread.sleep(2000);}
}

在這里記錄一個(gè)小插曲

我的idea突然出現(xiàn)問題,不能運(yùn)行了報(bào)如下這個(gè)錯(cuò)誤
在這里插入圖片描述
然后我看了一下我的jdk版本是21我在網(wǎng)上找了一下發(fā)現(xiàn)網(wǎng)上jdk21的lombok版本要1.18.30,但是我的是1.18.26,我重新更改了一下maven配置就可以了。
在這里插入圖片描述
上面執(zhí)行過程執(zhí)行結(jié)果如下:
在這里插入圖片描述
可以看到,由于傳遞的RoutingKey是錯(cuò)誤的,路由失敗后,觸發(fā)了return callback,同時(shí)也收到了ack。
當(dāng)我們修改為正確的RoutingKey以后,就不會(huì)觸發(fā)return callback了,只收到ack。
而如果連交換機(jī)都是錯(cuò)誤的,則只會(huì)收到nack。

:::warning
注意
開啟生產(chǎn)者確認(rèn)比較消耗MQ性能,一般不建議開啟。而且大家思考一下觸發(fā)確認(rèn)的幾種情況:

  • 路由失敗:一般是因?yàn)镽outingKey錯(cuò)誤導(dǎo)致,往往是編程導(dǎo)致
  • 交換機(jī)名稱錯(cuò)誤:同樣是編程錯(cuò)誤導(dǎo)致
  • MQ內(nèi)部故障:這種需要處理,但概率往往較低。因此只有對(duì)消息可靠性要求非常高的業(yè)務(wù)才需要開啟,而且僅僅需要開啟ConfirmCallback處理nack就可以了。
    :::

四.MQ的可靠性

消息到達(dá)MQ以后,如果MQ不能及時(shí)保存,也會(huì)導(dǎo)致消息丟失,所以MQ的可靠性也非常重要。

1.數(shù)據(jù)持久化

為了提升性能,默認(rèn)情況下MQ的數(shù)據(jù)都是在內(nèi)存存儲(chǔ)的臨時(shí)數(shù)據(jù),重啟后就會(huì)消失。為了保證數(shù)據(jù)的可靠性,必須配置數(shù)據(jù)持久化,包括:

  • 交換機(jī)持久化
  • 隊(duì)列持久化
  • 消息持久化

我們以控制臺(tái)界面為例來說明。

1.交換機(jī)持久化

在控制臺(tái)的Exchanges頁面,添加交換機(jī)時(shí)可以配置交換機(jī)的Durability參數(shù):
在這里插入圖片描述
設(shè)置為Durable就是持久化模式,Transient就是臨時(shí)模式。

2.隊(duì)列持久化

在控制臺(tái)的Queues頁面,添加隊(duì)列時(shí),同樣可以配置隊(duì)列的Durability參數(shù):
在這里插入圖片描述

3.消息持久化

在控制臺(tái)發(fā)送消息的時(shí)候,可以添加很多參數(shù),而消息的持久化是要配置一個(gè)properties
在這里插入圖片描述
1.注意消息非持久化發(fā)送到rabbitmq的時(shí)候如果大數(shù)據(jù)量會(huì)出現(xiàn)page out,在page out的時(shí)候消息發(fā)送速度會(huì)降低到零點(diǎn)
在這里插入圖片描述
2.當(dāng)消息持久化到rabbitmq的時(shí)候不會(huì)出現(xiàn)page out ,速度不會(huì)降低到零點(diǎn)一段時(shí)間的情況
在這里插入圖片描述

 @Testvoid testPageOut() {Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();for (int i = 0; i < 1000000; i++) {rabbitTemplate.convertAndSend("simple.queue", message);}}

:::warning
說明:在開啟持久化機(jī)制以后,如果同時(shí)還開啟了生產(chǎn)者確認(rèn),那么MQ會(huì)在消息持久化以后才發(fā)送ACK回執(zhí),進(jìn)一步確保消息的可靠性。
不過出于性能考慮,為了減少IO次數(shù),發(fā)送到MQ的消息并不是逐條持久化到數(shù)據(jù)庫的,而是每隔一段時(shí)間批量持久化。一般間隔在100毫秒左右,這就會(huì)導(dǎo)致ACK有一定的延遲,因此建議生產(chǎn)者確認(rèn)全部采用異步方式。
:::

2.LazyQueue

在默認(rèn)情況下,RabbitMQ會(huì)將接收到的信息保存在內(nèi)存中以降低消息收發(fā)的延遲。但在某些特殊情況下,這會(huì)導(dǎo)致消息積壓,比如:

  • 消費(fèi)者宕機(jī)或出現(xiàn)網(wǎng)絡(luò)故障
  • 消息發(fā)送量激增,超過了消費(fèi)者處理速度
  • 消費(fèi)者處理業(yè)務(wù)發(fā)生阻塞

一旦出現(xiàn)消息堆積問題,RabbitMQ的內(nèi)存占用就會(huì)越來越高,直到觸發(fā)內(nèi)存預(yù)警上限。此時(shí)RabbitMQ會(huì)將內(nèi)存消息刷到磁盤上,這個(gè)行為成為PageOut. PageOut會(huì)耗費(fèi)一段時(shí)間,并且會(huì)阻塞隊(duì)列進(jìn)程。因此在這個(gè)過程中RabbitMQ不會(huì)再處理新的消息,生產(chǎn)者的所有請(qǐng)求都會(huì)被阻塞。

為了解決這個(gè)問題,從RabbitMQ的3.6.0版本開始,就增加了Lazy Queues的模式,也就是惰性隊(duì)列。惰性隊(duì)列的特征如下:

  • 接收到消息后直接存入磁盤而非內(nèi)存
  • 消費(fèi)者要消費(fèi)消息時(shí)才會(huì)從磁盤中讀取并加載到內(nèi)存(也就是懶加載)
  • 支持?jǐn)?shù)百萬條的消息存儲(chǔ)

而在3.12版本之后,LazyQueue已經(jīng)成為所有隊(duì)列的默認(rèn)格式。因此官方推薦升級(jí)MQ為3.12版本或者所有隊(duì)列都設(shè)置為L(zhǎng)azyQueue模式。

1.控制臺(tái)配置Lazy模式

在添加隊(duì)列的時(shí)候,添加x-queue-mod=lazy參數(shù)即可設(shè)置隊(duì)列為L(zhǎng)azy模式:
在這里插入圖片描述

2.代碼配置Lazy模式

在利用SpringAMQP聲明隊(duì)列的時(shí)候,添加x-queue-mod=lazy參數(shù)也可設(shè)置隊(duì)列為L(zhǎng)azy模式:

@Bean
public Queue lazyQueue(){return QueueBuilder.durable("lazy.queue").lazy() // 開啟Lazy模式.build();
}

這里是通過QueueBuilderlazy()函數(shù)配置Lazy模式,底層源碼如下:
在這里插入圖片描述
當(dāng)然,我們也可以基于注解來聲明隊(duì)列并設(shè)置為L(zhǎng)azy模式:

@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){log.info("接收到 lazy.queue的消息:{}", msg);
}

3.更新已有隊(duì)列為lazy模式

對(duì)于已經(jīng)存在的隊(duì)列,也可以配置為lazy模式,但是要通過設(shè)置policy實(shí)現(xiàn)。
可以基于命令行設(shè)置policy:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  
命令解讀:- `rabbitmqctl` :RabbitMQ的命令行工具
- `set_policy` :添加一個(gè)策略
- `Lazy` :策略名稱,可以自定義
- `"^lazy-queue$"` :用正則表達(dá)式匹配隊(duì)列的名字
- `'{"queue-mode":"lazy"}'` :設(shè)置隊(duì)列模式為lazy模式
- `--apply-to queues`:策略的作用對(duì)象,是所有的隊(duì)列當(dāng)然,也可以在控制臺(tái)配置policy,進(jìn)入在控制臺(tái)的`Admin`頁面,點(diǎn)擊`Policies`,即可添加配置:

在這里插入圖片描述
lazy隊(duì)列在100W條件下是最好的性能
在這里插入圖片描述

總結(jié)

RabbitMQ如何保證消息的可靠性

  • 首先通過配置可以讓交換機(jī)、隊(duì)列、以及發(fā)送的消息都持久化。這樣隊(duì)列中的消息會(huì)持久化到磁盤,MQ重啟消息依然存在。
  • RabbitMQ在3.6版本引入了LazzyQueue,并且在3.12版本后會(huì)成為隊(duì)列的默認(rèn)模式。LazyQueue會(huì)將所有消息都持久化。
  • 開啟持久化和生產(chǎn)者確認(rèn)時(shí),RabbitMQ只有在消息持久化完成后才會(huì)給生產(chǎn)者返回ACK回執(zhí)。

四.消費(fèi)者的可靠性

當(dāng)RabbitMQ向消費(fèi)者投遞消息以后,需要知道消費(fèi)者的處理狀態(tài)如何。因?yàn)橄⑼哆f給消費(fèi)者并不代表就一定被正確消費(fèi)了,可能出現(xiàn)的故障有很多,比如:

  • 消息投遞的過程中出現(xiàn)了網(wǎng)絡(luò)故障
  • 消費(fèi)者接收到消息后突然宕機(jī)
  • 消費(fèi)者接收到消息后,因處理不當(dāng)導(dǎo)致異常

一旦發(fā)生上述情況,消息也會(huì)丟失。因此,RabbitMQ必須知道消費(fèi)者的處理狀態(tài),一旦消息處理失敗才能重新投遞消息。
但問題來了:RabbitMQ如何得知消費(fèi)者的處理狀態(tài)呢?

本章我們就一起研究一下消費(fèi)者處理消息時(shí)的可靠性解決方案。

1.消費(fèi)者確認(rèn)機(jī)制

為了確認(rèn)消費(fèi)者是否成功處理消息,RabbitMQ提供了消費(fèi)者確認(rèn)機(jī)制(Consumer Acknowledgement)。即:當(dāng)消費(fèi)者處理消息結(jié)束后,應(yīng)該向RabbitMQ發(fā)送一個(gè)回執(zhí),告知RabbitMQ自己消息處理狀態(tài)?;貓?zhí)有三種可選值:

  • ack:成功處理消息,RabbitMQ從隊(duì)列中刪除該消息
  • nack:消息處理失敗,RabbitMQ需要再次投遞消息
  • reject:消息處理失敗并拒絕該消息,RabbitMQ從隊(duì)列中刪除該消息
    在這里插入圖片描述

一般reject方式用的較少,除非是消息格式有問題,那就是開發(fā)問題了。因此大多數(shù)情況下我們需要將消息處理的代碼通過try catch機(jī)制捕獲,消息處理成功時(shí)返回ack,處理失敗時(shí)返回nack.

由于消息回執(zhí)的處理代碼比較統(tǒng)一,因此SpringAMQP幫我們實(shí)現(xiàn)了消息確認(rèn)。并允許我們通過配置文件設(shè)置ACK處理方式,有三種模式:

  • **none**:不處理。即消息投遞給消費(fèi)者后立刻ack,消息會(huì)立刻從MQ刪除。非常不安全,不建議使用
  • **manual**:手動(dòng)模式。需要自己在業(yè)務(wù)代碼中調(diào)用api,發(fā)送ackreject,存在業(yè)務(wù)入侵,但更靈活
  • **auto**:自動(dòng)模式。SpringAMQP利用AOP對(duì)我們的消息處理邏輯做了環(huán)繞增強(qiáng),當(dāng)業(yè)務(wù)正常執(zhí)行時(shí)則自動(dòng)返回ack. 當(dāng)業(yè)務(wù)出現(xiàn)異常時(shí),根據(jù)異常判斷返回不同結(jié)果:
    • 如果是業(yè)務(wù)異常,會(huì)自動(dòng)返回nack
    • 如果是消息處理或校驗(yàn)異常,自動(dòng)返回reject;

返回Reject的常見異常有:

Starting with version 1.3.2, the default ErrorHandler is now a ConditionalRejectingErrorHandler that rejects (and does not requeue) messages that fail with an irrecoverable error. Specifically, it rejects messages that fail with the following errors:- o.s.amqp…MessageConversionException: Can be thrown when converting the incoming message payload using a MessageConverter.
- o.s.messaging…MessageConversionException: Can be thrown by the conversion service if additional conversion is required when mapping to a @RabbitListener method.
- o.s.messaging…MethodArgumentNotValidException: Can be thrown if validation (for example, @Valid) is used in the listener and the validation fails.
- o.s.messaging…MethodArgumentTypeMismatchException: Can be thrown if the inbound message was converted to a type that is not correct for the target method. For example, the parameter is declared as Message<Foo> but Message<Bar> is received.
- java.lang.NoSuchMethodException: Added in version 1.6.3.
- java.lang.ClassCastException: Added in version 1.6.3.

通過下面的配置可以修改SpringAMQP的ACK處理方式:
consumer

 @RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消費(fèi)者接收到消息:【" + msg + "】");throw new RuntimeException("故意的");}

product

 @Testpublic void testSimpleQueue() {// 隊(duì)列名稱String queueName = "simple.queue";// 消息String message = "hello, spring amqp!";// 發(fā)送消息rabbitTemplate.convertAndSend(queueName, message);}
spring:rabbitmq:listener:simple:acknowledge-mode: none # 不做處理

測(cè)試可以發(fā)現(xiàn):當(dāng)消息處理發(fā)生異常時(shí),消息依然被RabbitMQ刪除了。

斷點(diǎn)還停留在這里,但是消息已經(jīng)被消費(fèi)了
在這里插入圖片描述
在這里插入圖片描述

我們?cè)俅伟汛_認(rèn)機(jī)制修改為auto:

spring:rabbitmq:listener:simple:acknowledge-mode: auto # 自動(dòng)ack

在異常位置打斷點(diǎn),再次發(fā)送消息,程序卡在斷點(diǎn)時(shí),可以發(fā)現(xiàn)此時(shí)消息狀態(tài)為unacked(未確定狀態(tài)):
在這里插入圖片描述
放行以后,消息處理失敗后,會(huì)回到RabbitMQ,并重新投遞到消費(fèi)者。

2.失敗重試機(jī)制

當(dāng)消費(fèi)者出現(xiàn)異常后,消息會(huì)不斷requeue(重入隊(duì))到隊(duì)列,再重新發(fā)送給消費(fèi)者。如果消費(fèi)者再次執(zhí)行依然出錯(cuò),消息會(huì)再次requeue到隊(duì)列,再次投遞,直到消息處理成功為止。
極端情況就是消費(fèi)者一直無法執(zhí)行成功,那么消息requeue就會(huì)無限循環(huán),導(dǎo)致mq的消息處理飆升,帶來不必要的壓力:
在這里插入圖片描述
當(dāng)然,上述極端情況發(fā)生的概率還是非常低的,不過不怕一萬就怕萬一。為了應(yīng)對(duì)上述情況Spring又提供了消費(fèi)者失敗重試機(jī)制:在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試,而不是無限制的requeue到mq隊(duì)列。

修改consumer服務(wù)的application.yml文件,添加內(nèi)容:

spring:rabbitmq:listener:simple:retry:enabled: true # 開啟消費(fèi)者失敗重試initial-interval: 1000ms # 初識(shí)的失敗等待時(shí)長(zhǎng)為1秒multiplier: 1 # 失敗的等待時(shí)長(zhǎng)倍數(shù),下次等待時(shí)長(zhǎng) = multiplier * last-intervalmax-attempts: 3 # 最大重試次數(shù)stateless: true # true無狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false

重啟consumer服務(wù),重復(fù)之前的測(cè)試。可以發(fā)現(xiàn):

  • 消費(fèi)者在失敗后消息沒有重新回到MQ無限重新投遞,而是在本地重試了3次
  • 本地重試3次以后,拋出了AmqpRejectAndDontRequeueException異常。查看RabbitMQ控制臺(tái),發(fā)現(xiàn)消息被刪除了,說明最后SpringAMQP返回的是reject

結(jié)論:

  • 開啟本地重試時(shí),消息處理過程中拋出異常,不會(huì)requeue到隊(duì)列,而是在消費(fèi)者本地重試
  • 重試達(dá)到最大次數(shù)后,Spring會(huì)返回reject,消息會(huì)被丟棄
    在這里插入圖片描述

失敗處理策略

在之前的測(cè)試中,本地測(cè)試達(dá)到最大重試次數(shù)后,消息會(huì)被丟棄。這在某些對(duì)于消息可靠性要求較高的業(yè)務(wù)場(chǎng)景下,顯然不太合適了。
因此Spring允許我們自定義重試次數(shù)耗盡后的消息處理策略,這個(gè)策略是由MessageRecovery接口來定義的,它有3個(gè)不同實(shí)現(xiàn):

  • RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認(rèn)就是這種方式
  • ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊(duì)
  • RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機(jī)

比較優(yōu)雅的一種處理方案是RepublishMessageRecoverer,失敗后將消息投遞到一個(gè)指定的,專門存放異常消息的隊(duì)列,后續(xù)由人工集中處理。

在這里插入圖片描述

1)在consumer服務(wù)中定義處理失敗消息的交換機(jī)和隊(duì)列

@Bean
public DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

2)定義一個(gè)RepublishMessageRecoverer,關(guān)聯(lián)隊(duì)列和交換機(jī)

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

完整代碼如下:


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ErrorConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue");}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

錯(cuò)誤的信息被存儲(chǔ)到mq中的錯(cuò)誤隊(duì)列中
**在這里插入圖片描述**

總結(jié)

消費(fèi)者如何保證消息一定被消費(fèi)?

  • 開啟消費(fèi)者確認(rèn)機(jī)制為auto,由spring確認(rèn)消息處理成功后返回ack,異常時(shí)返回nack
  • 開啟消費(fèi)者失敗重試機(jī)制,并設(shè)置MessageRecoverer,多次重試失敗后將消息投遞到異常交換機(jī),交由人工處理。

3.業(yè)務(wù)冪等性

何為冪等性?
冪等是一個(gè)數(shù)學(xué)概念,用函數(shù)表達(dá)式來描述是這樣的:f(x) = f(f(x)),例如求絕對(duì)值函數(shù)。
在程序開發(fā)中,則是指同一個(gè)業(yè)務(wù),執(zhí)行一次或多次對(duì)業(yè)務(wù)狀態(tài)的影響是一致的。例如:

  • 根據(jù)id刪除數(shù)據(jù)
  • 查詢數(shù)據(jù)

但數(shù)據(jù)的更新往往不是冪等的,如果重復(fù)執(zhí)行可能造成不一樣的后果。比如:

  • 取消訂單,恢復(fù)庫存的業(yè)務(wù)。如果多次恢復(fù)就會(huì)出現(xiàn)庫存重復(fù)增加的情況
  • 退款業(yè)務(wù)。重復(fù)退款對(duì)商家而言會(huì)有經(jīng)濟(jì)損失。

所以,我們要盡可能避免業(yè)務(wù)被重復(fù)執(zhí)行。
然而在實(shí)際業(yè)務(wù)場(chǎng)景中,由于意外經(jīng)常會(huì)出現(xiàn)業(yè)務(wù)被重復(fù)執(zhí)行的情況,例如:

  • 頁面卡頓時(shí)頻繁刷新導(dǎo)致表單重復(fù)提交
  • 服務(wù)間調(diào)用的重試
  • MQ消息的重復(fù)投遞

我們?cè)谟脩糁Ц冻晒髸?huì)發(fā)送MQ消息到交易服務(wù),修改訂單狀態(tài)為已支付,就可能出現(xiàn)消息重復(fù)投遞的情況。如果消費(fèi)者不做判斷,很有可能導(dǎo)致消息被消費(fèi)多次,出現(xiàn)業(yè)務(wù)故障。
舉例:

  1. 假如用戶剛剛支付完成,并且投遞消息到交易服務(wù),交易服務(wù)更改訂單為已支付狀態(tài)。
  2. 由于某種原因,例如網(wǎng)絡(luò)故障導(dǎo)致生產(chǎn)者沒有得到確認(rèn),隔了一段時(shí)間后重新投遞給交易服務(wù)。
  3. 但是,在新投遞的消息被消費(fèi)之前,用戶選擇了退款,將訂單狀態(tài)改為了已退款狀態(tài)。
  4. 退款完成后,新投遞的消息才被消費(fèi),那么訂單狀態(tài)會(huì)被再次改為已支付。業(yè)務(wù)異常。

因此,我們必須想辦法保證消息處理的冪等性。這里給出兩種方案:

  • 唯一消息ID
  • 業(yè)務(wù)狀態(tài)判斷

1.唯一消息ID

這個(gè)思路非常簡(jiǎn)單:

  1. 每一條消息都生成一個(gè)唯一的id,與消息一起投遞給消費(fèi)者。
  2. 消費(fèi)者接收到消息后處理自己的業(yè)務(wù),業(yè)務(wù)處理成功后將消息ID保存到數(shù)據(jù)庫
  3. 如果下次又收到相同消息,去數(shù)據(jù)庫查詢判斷是否存在,存在則為重復(fù)消息放棄處理。

我們?cè)撊绾谓o消息添加唯一ID呢?
其實(shí)很簡(jiǎn)單,SpringAMQP的MessageConverter自帶了MessageID的功能,我們只要開啟這個(gè)功能即可。
以Jackson的消息轉(zhuǎn)換器為例:

@Bean
public MessageConverter messageConverter(){// 1.定義消息轉(zhuǎn)換器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自動(dòng)創(chuàng)建消息id,用于識(shí)別不同消息,也可以在業(yè)務(wù)中基于ID判斷是否是重復(fù)消息jjmc.setCreateMessageIds(true);return jjmc;
}

在這里插入圖片描述

2.業(yè)務(wù)狀態(tài)判斷

業(yè)務(wù)判斷就是基于業(yè)務(wù)本身的邏輯或狀態(tài)來判斷是否是重復(fù)的請(qǐng)求或消息,不同的業(yè)務(wù)場(chǎng)景判斷的思路也不一樣。
例如我們當(dāng)前案例中,處理消息的業(yè)務(wù)邏輯是把訂單狀態(tài)從未支付修改為已支付。因此我們就可以在執(zhí)行業(yè)務(wù)時(shí)判斷訂單狀態(tài)是否是未支付,如果不是則證明訂單已經(jīng)被處理過,無需重復(fù)處理。

相比較而言,消息ID的方案需要改造原有的數(shù)據(jù)庫,所以我更推薦使用業(yè)務(wù)判斷的方案。

以支付修改訂單的業(yè)務(wù)為例,我們需要修改OrderServiceImpl中的markOrderPaySuccess方法:

  @Overridepublic void markOrderPaySuccess(Long orderId) {// 1.查詢訂單Order old = getById(orderId);// 2.判斷訂單狀態(tài)if (old == null || old.getStatus() != 1) {// 訂單不存在或者訂單狀態(tài)不是1,放棄處理return;}// 3.嘗試更新訂單Order order = new Order();order.setId(orderId);order.setStatus(2);order.setPayTime(LocalDateTime.now());updateById(order);}

上述代碼邏輯上符合了冪等判斷的需求,但是由于判斷和更新是兩步動(dòng)作,因此在極小概率下可能存在線程安全問題。

我們可以合并上述操作為這樣:
在這里插入圖片描述

@Override
public void markOrderPaySuccess(Long orderId) {// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1lambdaUpdate().set(Order::getStatus, 2).set(Order::getPayTime, LocalDateTime.now()).eq(Order::getId, orderId).eq(Order::getStatus, 1).update();
}

注意看,上述代碼等同于這樣的SQL語句:

UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1

我們?cè)趙here條件中除了判斷id以外,還加上了status必須為1的條件。如果條件不符(說明訂單已支付),則SQL匹配不到數(shù)據(jù),根本不會(huì)執(zhí)行。

總結(jié)

如何保證支付服務(wù)與交易服務(wù)之間的訂單狀態(tài)一致性?

  • 首先,支付服務(wù)會(huì)正在用戶支付成功以后利用MQ消息通知交易服務(wù),完成訂單狀態(tài)同步。
  • 其次,為了保證MQ消息的可靠性,我們采用了生產(chǎn)者確認(rèn)機(jī)制、消費(fèi)者確認(rèn)、消費(fèi)者失敗重試等策略,確保消息投遞和處理的可靠性。同時(shí)也開啟了MQ的持久化,避免因服務(wù)宕機(jī)導(dǎo)致消息丟失。
  • 最后,在交易服務(wù)更新訂單狀態(tài)時(shí)做了業(yè)務(wù)冪等判斷,避免因消息重復(fù)消費(fèi)導(dǎo)致訂單狀態(tài)異常。

如果交易服務(wù)消息處理失敗,有沒有什么兜底方法?

  • 可以在交易服務(wù)設(shè)置定時(shí)任務(wù),定期查詢訂單支付狀態(tài)。這樣即使MQ通知失敗,還可以利用定時(shí)任務(wù)作為兜底方案,確保訂單支付狀態(tài)的最終一致性。
http://www.risenshineclean.com/news/40497.html

相關(guān)文章:

  • 做網(wǎng)站可以用什么語言長(zhǎng)春網(wǎng)站優(yōu)化平臺(tái)
  • 購物網(wǎng)站建設(shè)公司網(wǎng)絡(luò)營(yíng)銷管理名詞解釋
  • wordpress國(guó)內(nèi)支付上海搜索優(yōu)化推廣
  • html可以做動(dòng)態(tài)網(wǎng)站嗎我是seo關(guān)鍵詞
  • 應(yīng)用商城下載seo服務(wù)是什么
  • 怎么用文件做網(wǎng)站快速優(yōu)化網(wǎng)站排名軟件
  • 淄博桓臺(tái)網(wǎng)站建設(shè)報(bào)價(jià)湘潭關(guān)鍵詞優(yōu)化公司
  • erp系統(tǒng)軟件免費(fèi)版優(yōu)化營(yíng)商環(huán)境心得體會(huì)2023
  • 番禺做網(wǎng)站公司哪家好網(wǎng)店推廣的方式
  • 一般做網(wǎng)站淘寶推廣軟件
  • 德州網(wǎng)站建設(shè)公司seo引擎優(yōu)化方案
  • 什么學(xué)做網(wǎng)站深圳網(wǎng)絡(luò)營(yíng)銷推廣外包
  • php網(wǎng)站搬家軟件成都疫情最新消息
  • 有沒有做美食的網(wǎng)站網(wǎng)絡(luò)營(yíng)銷的認(rèn)識(shí)與理解
  • 阿里云空間部署網(wǎng)站微信推廣加人
  • 設(shè)計(jì)網(wǎng)站推薦素材網(wǎng)站怎么去做推廣
  • 青島網(wǎng)站建設(shè)優(yōu)化長(zhǎng)沙官網(wǎng)seo推廣
  • 游戲推廣群seo網(wǎng)絡(luò)優(yōu)化招聘信息
  • sql數(shù)據(jù)庫查詢網(wǎng)站模板搜索引擎優(yōu)化的要點(diǎn)
  • 做網(wǎng)站必須得ipc百度熱搜榜今日頭條排名
  • 日照建網(wǎng)站廣告公司收費(fèi)價(jià)格表
  • 網(wǎng)站怎么弄二維碼服務(wù)營(yíng)銷理論
  • 官網(wǎng)網(wǎng)站建設(shè)收費(fèi)公司網(wǎng)站設(shè)計(jì)模板
  • 海南做網(wǎng)站請(qǐng)輸入搜索關(guān)鍵詞
  • 網(wǎng)頁版夢(mèng)幻西游火眼金睛seo人才招聘
  • 南京seo排名收費(fèi)廣州網(wǎng)站優(yōu)化軟件
  • 做網(wǎng)站的任務(wù)書淄博seo培訓(xùn)
  • 湖州做網(wǎng)站建設(shè)的公司女教師遭網(wǎng)課入侵直播錄屏曝光se
  • java做網(wǎng)站用什么軟件新聞報(bào)道最新消息今天
  • 庫車建設(shè)工程信息網(wǎng)站seo查詢愛站網(wǎng)