網(wǎng)站 線框圖seo搜論壇
消息隊(duì)列中的可靠性主要是分為三部分:
- 消息不丟失:確保消息從生產(chǎn)者發(fā)送到消費(fèi)者消息不丟失
- 消息不重復(fù):確保消息不被重復(fù)消費(fèi)
- 消息順序性:確保消費(fèi)的順序性
解決方案主要有以下幾部分:
- 消息不丟失
-
- 生產(chǎn)者確認(rèn)機(jī)制
- 持久化機(jī)制
- 消費(fèi)者確認(rèn)機(jī)制
- 高可用
- 消息不重復(fù):
-
- 消費(fèi)者確認(rèn)機(jī)制
- 消息重試機(jī)制
- 冪等性設(shè)計(jì)
- 消息順序性
-
- 單消費(fèi)者模式
- 消息編號(hào)
生產(chǎn)者確認(rèn)機(jī)制
作用
確保生產(chǎn)者發(fā)送的消息成功到達(dá)mq,避免消息在傳輸過程中丟失
實(shí)現(xiàn)原理
- 生產(chǎn)者會(huì)發(fā)送消息之后,mq會(huì)向生產(chǎn)者返回確認(rèn)(ack)或未確認(rèn)(nack)信號(hào)
- 如果生產(chǎn)者收到ack,說明消息已經(jīng)成功發(fā)送
配置方式
- 開啟生產(chǎn)者確認(rèn)模式
Channel channel = connection.createChannel();
channel.confirmSelect(); // 開啟確認(rèn)模式
- 處理確認(rèn)和未確認(rèn)
channel.addConfirmListener((sequenceNumber, multiple) -> {// 消息確認(rèn)System.out.println("Message confirmed: " + sequenceNumber);
}, (sequenceNumber, multiple) -> {// 消息未確認(rèn)System.out.println("Message not confirmed: " + sequenceNumber);
});
適用場景
對消息可靠性要求高的場景,如金融交易、訂單處理等
持久化機(jī)制
作用
將消息和隊(duì)列持久化到磁盤,防止mq重啟或者崩潰時(shí)消息丟失
實(shí)現(xiàn)原理
- 隊(duì)列持久化:將隊(duì)列中的數(shù)據(jù)保存在磁盤
- 消息持久化:將消息內(nèi)容保存到磁盤
配置方式
- 隊(duì)列持久化
boolean durable = true;
channel.queueDeclare("my_queue", durable, false, false, null);
- 消息持久化
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2) // 2 表示持久化消息.build();
channel.basicPublish("", "my_queue", properties, message.getBytes());
適用場景
需要保證消息不丟失的場景,如日志存儲(chǔ)、重要數(shù)據(jù)同步
消費(fèi)者確認(rèn)機(jī)制
作用
確保消費(fèi)者成功處理消息之后,mq才會(huì)將消息從消息隊(duì)列中刪除,避免消息丟失
實(shí)現(xiàn)原理
- 消費(fèi)者處理完成消息后,手動(dòng)發(fā)送ack信號(hào)
- 如果消費(fèi)者未發(fā)送ack,mq會(huì)將消息重新投遞給其他消費(fèi)者
配置方式
- 開啟消費(fèi)者確認(rèn)
boolean autoAck = false; // 關(guān)閉自動(dòng)確認(rèn)
channel.basicConsume("my_queue", autoAck, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 處理消息System.out.println("Received: " + new String(body));// 手動(dòng)發(fā)送 ackchannel.basicAck(envelope.getDeliveryTag(), false);}
});
適用場景
需要確保消息成功處理的場景,如訂單處理、消息通知等
死信隊(duì)列
作用
處理無法正常消費(fèi)的消息(被拒絕或者過期),避免消息丟失
實(shí)現(xiàn)原理
- 當(dāng)消息無法被正常消費(fèi)時(shí),mq將其轉(zhuǎn)發(fā)到死信隊(duì)列中
- 死信隊(duì)列可以配置獨(dú)立的交換機(jī)和路由規(guī)則
配置方式
- 配置死信隊(duì)列
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "my_dlx_exchange"); // 設(shè)置死信交換機(jī)
args.put("x-dead-letter-routing-key", "my_dlx_routing_key"); // 設(shè)置死信路由鍵
channel.queueDeclare("my_queue", true, false, false, args);
適用場景
需要處理異常消息的場景,如消息重試,失敗信息分析等
消息重試機(jī)制
作用
當(dāng)消息處理失敗的時(shí)候,通過重試機(jī)制重新投遞消息,確保消息最終被成功處理
實(shí)現(xiàn)原理
- 消費(fèi)者捕獲異常,并拒絕消息,同時(shí)設(shè)置requeue=true
- 使用死信隊(duì)列和TTL實(shí)現(xiàn)延遲重試
配置方式
- 拒絕消息,重新入隊(duì)
channel.basicConsume("my_queue", false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {// 處理消息System.out.println("Received: " + new String(body));channel.basicAck(envelope.getDeliveryTag(), false);} catch (Exception e) {// 處理失敗,拒絕消息并重新入隊(duì)channel.basicNack(envelope.getDeliveryTag(), false, true);}}
});
適用場景
需要重試機(jī)制的場景,如網(wǎng)絡(luò)抖動(dòng)、依賴服務(wù)不可用等
高可用性
作用
通過集群和經(jīng)銷隊(duì)里額,確保mq在節(jié)點(diǎn)出現(xiàn)故障仍然可以要運(yùn)行,避免消息丟失。
鏡像隊(duì)列結(jié)構(gòu)是一主多從,所有操作都是主節(jié)點(diǎn)完成,然后同步給鏡像節(jié) 點(diǎn),如果主節(jié)點(diǎn)宕機(jī)后,鏡像節(jié)點(diǎn)會(huì)替代成新的主節(jié)點(diǎn)
實(shí)現(xiàn)原理
- 集群:多個(gè)mq節(jié)點(diǎn)組成的集群,共享數(shù)據(jù)和隊(duì)列狀態(tài)
- 鏡像隊(duì)列:將隊(duì)列鏡像到多個(gè)節(jié)點(diǎn),確保隊(duì)列的高可用性
配置方式
- 集群配置
rabbitmqctl join_cluster rabbit@node1
- 鏡像隊(duì)列配置
Map<String, Object> args = new HashMap<>();
args.put("x-ha-policy", "all"); // 鏡像到所有節(jié)點(diǎn)
channel.queueDeclare("my_queue", true, false, false, args);
適用場景
適用于高可用的場景,如分布式系統(tǒng)、關(guān)鍵業(yè)務(wù)系統(tǒng)等
高可用性數(shù)據(jù)丟失怎么解決
在鏡像隊(duì)列主從同步的過程中,會(huì)出現(xiàn)在主從同步完成前,主節(jié)點(diǎn)就已經(jīng)宕機(jī),可能出現(xiàn)數(shù)據(jù)丟失。
我們可以通過仲裁隊(duì)列來進(jìn)行解決,和鏡像隊(duì)列一樣,都是主從模式,支持主從數(shù)據(jù)同步但是不一樣的點(diǎn)就是主從同步基于Raft協(xié)議,強(qiáng)一致性
并且使用起來也非常簡單,不需要額外的配置,在聲明隊(duì)列的時(shí)候只要指定 這個(gè)是仲裁隊(duì)列即可