鄭州公司網(wǎng)站設(shè)計(jì)宣傳產(chǎn)品的方式
生產(chǎn)者將信道設(shè)置成 confirm 模式,一旦信道進(jìn)入 confirm 模式, 所有在該信道上面發(fā)布的消息都將會(huì)被指派一個(gè)唯一的 ID (從 1 開始),一旦消息被投遞到所有匹配的隊(duì)列之后,broker就會(huì)發(fā)送一個(gè)確認(rèn)給生產(chǎn)者(包含消息的唯一 ID),這就使得生產(chǎn)者知道消息已經(jīng)正確到達(dá)目的隊(duì)列了,如果消息和隊(duì)列是可持久化的,那么確認(rèn)消息會(huì)在將消息寫入磁盤之后發(fā)出,broker 回傳給生產(chǎn)者的確認(rèn)消息中 delivery-tag 域包含了確認(rèn)消息的序列號(hào),此外 broker 也可以設(shè)置basic.ack 的 multiple 域,表示到這個(gè)序列號(hào)之前的所有消息都已經(jīng)得到了處理。
單個(gè)確認(rèn)發(fā)布 ?
這是一種簡(jiǎn)單的確認(rèn)方式,它是一種 同步確認(rèn)發(fā)布 的方式,也就是發(fā)布一個(gè)消息之后只有它
被確認(rèn)發(fā)布,后續(xù)的消息才能繼續(xù)發(fā)布,waitForConfirmsOrDie(long)這個(gè)方法只有在消息被確認(rèn)
的時(shí)候才返回,如果在指定時(shí)間范圍內(nèi)這個(gè)消息沒(méi)有被確認(rèn)那么它將拋出異常。
這種確認(rèn)方式有一個(gè)最大的缺點(diǎn)就是: 發(fā)布速度特別的慢, 因?yàn)槿绻麤](méi)有確認(rèn)發(fā)布的消息就會(huì)
阻塞所有后續(xù)消息的發(fā)布,這種方式最多提供每秒不超過(guò)數(shù)百條發(fā)布消息的吞吐量。當(dāng)然對(duì)于某
些應(yīng)用程序來(lái)說(shuō)這可能已經(jīng)足夠了。
import cn.hutool.core.lang.UUID;
import com.rabbitmq.client.Channel;public class publishMessageIndividually {private static final int MESSAGE_COUNT = 5;public static void publishMessageIndividually() throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName, false, false, false, null);//開啟發(fā)布確認(rèn)channel.confirmSelect();long begin = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i + "";channel.basicPublish("", queueName, null, message.getBytes());//服務(wù)端返回 false 或超時(shí)時(shí)間內(nèi)未返回,生產(chǎn)者可以消息重發(fā)boolean flag = channel.waitForConfirms();if (flag) {System.out.println("消息發(fā)送成功");}}long end = System.currentTimeMillis();System.out.println("發(fā)布" + MESSAGE_COUNT + "個(gè)單獨(dú)確認(rèn)消息,耗時(shí)" + (end - begin) +"ms");}}
}
耗時(shí)
?批量確認(rèn)發(fā)布
上面那種方式非常慢,與單個(gè)等待確認(rèn)消息相比,先發(fā)布一批消息然后一起確認(rèn)可以極大地
提高吞吐量,當(dāng)然這種方式的缺點(diǎn)就是:當(dāng)發(fā)生故障導(dǎo)致發(fā)布出現(xiàn)問(wèn)題時(shí),不知道是哪個(gè)消息出現(xiàn)
問(wèn)題了,我們必須將整個(gè)批處理保存在內(nèi)存中,以記錄重要的信息而后重新發(fā)布消息。當(dāng)然這種
方案仍然是同步的,也一樣阻塞消息的發(fā)布。
import cn.hutool.core.lang.UUID;
import com.rabbitmq.client.Channel;public class publishMessageBatch {private static final int MESSAGE_COUNT = 5;public static void publishMessageBatch() throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName, false, false, false, null);//開啟發(fā)布確認(rèn)channel.confirmSelect();//批量確認(rèn)消息大小int batchSize = 100;//未確認(rèn)消息個(gè)數(shù)int outstandingMessageCount = 0;long begin = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i + "";channel.basicPublish("", queueName, null, message.getBytes());outstandingMessageCount++;if (outstandingMessageCount == batchSize) {channel.waitForConfirms();outstandingMessageCount = 0;}}//為了確保還有剩余沒(méi)有確認(rèn)消息 再次確認(rèn)if (outstandingMessageCount > 0) {channel.waitForConfirms();}long end = System.currentTimeMillis();System.out.println("發(fā)布" + MESSAGE_COUNT + "個(gè)批量確認(rèn)消息,耗時(shí)" + (end - begin) +"ms");}}public static void main(String[] args) throws Exception {publishMessageBatch.publishMessageBatch();}
}
?耗時(shí)
?異步確認(rèn)發(fā)布
異步確認(rèn)雖然編程邏輯比上兩個(gè)要復(fù)雜,但是性價(jià)比最高,無(wú)論是可靠性還是效率都沒(méi)得說(shuō),
他是利用回調(diào)函數(shù)來(lái)達(dá)到消息可靠性傳遞的,這個(gè)中間件也是通過(guò)函數(shù)回調(diào)來(lái)保證是否投遞成功,
下面就讓我們來(lái)詳細(xì)講解異步確認(rèn)是怎么實(shí)現(xiàn)的。
import cn.hutool.core.lang.UUID;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;public class publishMessageAsync {private static final int MESSAGE_COUNT = 5;public static void publishMessageAsync() throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName, false, false, false, null);//開啟發(fā)布確認(rèn)channel.confirmSelect();/*** 線程安全有序的一個(gè)哈希表,適用于高并發(fā)的情況* 1.輕松的將序號(hào)與消息進(jìn)行關(guān)聯(lián)* 2.輕松批量刪除條目 只要給到序列號(hào)* 3.支持并發(fā)訪問(wèn)*/ConcurrentSkipListMap<Long, String> outstandingConfirms = newConcurrentSkipListMap<>();/*** 確認(rèn)收到消息的一個(gè)回調(diào)* 1.消息序列號(hào)* 2.true 可以確認(rèn)小于等于當(dāng)前序列號(hào)的消息* false 確認(rèn)當(dāng)前序列號(hào)消息*/ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {if (multiple) {//返回的是小于等于當(dāng)前序列號(hào)的未確認(rèn)消息集合 是一個(gè) mapConcurrentNavigableMap<Long, String> confirmed =outstandingConfirms.headMap(sequenceNumber, true);//清除該部分未確認(rèn)消息集合confirmed.clear();}else{//只清除當(dāng)前序列號(hào)的消息outstandingConfirms.remove(sequenceNumber);}};ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {String message = outstandingConfirms.get(sequenceNumber);System.out.println("發(fā)布的消息"+message+"未被確認(rèn),序列號(hào)"+sequenceNumber);};/*** 添加一個(gè)異步確認(rèn)的監(jiān)聽器* 1.確認(rèn)收到消息的回調(diào)* 2.未收到消息的回調(diào)*/channel.addConfirmListener(ackCallback, nackCallback);long begin = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "消息" + i;/*** channel.getNextPublishSeqNo()獲取下一個(gè)消息的序列號(hào)* 通過(guò)序列號(hào)與消息體進(jìn)行一個(gè)關(guān)聯(lián)* 全部都是未確認(rèn)的消息體*/outstandingConfirms.put(channel.getNextPublishSeqNo(), message);channel.basicPublish("", queueName, null, message.getBytes());}long end = System.currentTimeMillis();System.out.println("發(fā)布" + MESSAGE_COUNT + "個(gè)異步確認(rèn)消息,耗時(shí)" + (end - begin) +"ms");}}public static void main(String[] args) throws Exception {publishMessageAsync.publishMessageAsync();}
}
耗時(shí)
?以上 3 種發(fā)布確認(rèn)速度對(duì)比
單獨(dú)發(fā)布消息
同步等待確認(rèn),簡(jiǎn)單,但吞吐量非常有限。
批量發(fā)布消息
批量同步等待確認(rèn),簡(jiǎn)單,合理的吞吐量,一旦出現(xiàn)問(wèn)題但很難推斷出是那條
消息出現(xiàn)了問(wèn)題。
異步處理: 最佳性能和資源使用,在出現(xiàn)錯(cuò)誤的情況下可以很好地控制,但是實(shí)現(xiàn)起來(lái)稍微難些