微信h5免費制作網(wǎng)站seo優(yōu)化與推廣招聘
簡介
MQ(message queue),從字面意思上看就個 FIFO 先入先出的隊列,只不過隊列中存放的內(nèi)容是 message 而已,它是一種具有接收數(shù)據(jù)、存儲數(shù)據(jù)、發(fā)送數(shù)據(jù)等功能的技術服務。
作用:流量削峰、應用解耦、異步處理。
生產(chǎn)者將消息發(fā)送到消息隊列中,消息隊列負責轉發(fā)消息給消費者,消費者在處理完消息后會對消息隊列進行應答,消息隊列收到應答信息會將相應的消息進行丟棄。
批量應答會導致高并發(fā)時消息的丟失,所以盡力以channel.ack()進行手動應答。
docker安裝
- 拉取鏡像并后臺運行
docker run -id --name=rabbitmq -v rabbitmq-home:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=yi -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq
需要將RABBITMQ_DEFAULT_USER、RABBITMQ_DEFAULT_PASS改成自己的用戶名、密碼。
- 開啟manager插件,可以在網(wǎng)頁進行管理。
docker exec -it 容器id /bin/bash #這里可以用docker ps 查詢剛剛開啟的容器id#進入容器后輸入,開啟rabbitmq-plugins enable rabbitmq_management
可以登錄 http://服務器IP:15672 訪問web管理界面,訪問成功則代表開啟成功。
JAVA環(huán)境搭建
jar包:
<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.25</version></dependency></dependencies>
Helloworld實例
生產(chǎn)者
public static void main(String[] args) throws IOException, TimeoutException {//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setUsername("yi");connectionFactory.setPassword("123456");//獲取連接Connection connection = connectionFactory.newConnection();//獲取信道,一個連接中有多個信道Channel channel = connection.createChannel();//聲明一個隊列 String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> argumentsAMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME,false,false,false,null);String message="hello world";//(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("發(fā)送成功");}
消費者
public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setUsername("yi");connectionFactory.setPassword("123456");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();DeliverCallback deliverCallback=(consumerTag,message)->{System.out.println(new String(message.getBody()));};CancelCallback cancelCallback=(String var1)->{System.out.println("消息消費被中斷");};channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
工作隊列(任務隊列)
RabbitMQ默認為工作隊列模式,消費者C1,C2為競爭關系,接收到的消息將輪詢發(fā)送給C1,C2處理,即C1一條C2一條依次循環(huán)。
手動應答ack
因為自動應答不會考慮消息是否處理成功,所以可能會導致消息丟失,需要在代碼中將自動應答改為手動應答。批量應答在高并發(fā)的時候也容易丟失消息,也應該關閉。
生產(chǎn)者的代碼無需修改。
消費者:
public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();System.out.println("work2 waiting:");DeliverCallback deliverCallback= (String s, Delivery delivery)->{System.out.println(new String(delivery.getBody()));// do something//手動回復ack,false為關閉批量應答channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback=(s)->{System.out.println("消息被打斷");};//false表示不自動應答ackchannel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);}
不公平分發(fā)
會存在有些線程能力差耗時長,有些能力強耗時短的情況,不公平分發(fā)將實現(xiàn)能者多勞。
設立channel的basicQos即可實現(xiàn)不公平分發(fā), basicQos的數(shù)值意味著channel的最大存儲上限,channel為1時,消費者最多同時緩存一條待處理消息。
channel.basicQos(1);
發(fā)布確認
在開啟隊列持久化、消息持久化后,RabbitMQ服務器仍然可能在將消息存儲在磁盤前宕機,需要發(fā)布確認才能保證消息不丟失,即RabbitMQ在存儲磁盤成功后,發(fā)送確認給生產(chǎn)者。
單個發(fā)布確認
每條消息存儲在磁盤后進行發(fā)布確認,只有發(fā)送者在接收到消費者對應的發(fā)布確認消息后才會給此消費者發(fā)送下一條消息。
public static void publicMsgIndividual()throws Exception{Channel channel = RabbitMQUtils.getChannel();String QUEUE_NAME = UUID.randomUUID().toString();//開啟持久化channel.queueDeclare(QUEUE_NAME,true,false,false,null);channel.confirmSelect();//開啟發(fā)布確認long begin = System.currentTimeMillis();for (int i = 0; i < 1000; i++) {channel.basicPublish("",QUEUE_NAME,null, new String(i+" ").getBytes()); boolean flag = channel.waitForConfirms(); //等待發(fā)布確認if(flag){System.out.println("消息發(fā)送成功");}}long end = System.currentTimeMillis();System.out.println("發(fā)布1000條耗時:"+(end-begin)+"ms");}
批量發(fā)布確認
每發(fā)送100條消息進行一次發(fā)布確認。速度快,但是不知道具體是哪一條消息發(fā)送失敗了。
public static void publicMsgIndividual()throws Exception{Channel channel = RabbitMQUtils.getChannel();String QUEUE_NAME = UUID.randomUUID().toString();//開啟持久化channel.queueDeclare(QUEUE_NAME,true,false,false,null);channel.confirmSelect();//開啟發(fā)布確認long begin = System.currentTimeMillis();for (int i = 0; i < 1000; i++) {channel.basicPublish("",QUEUE_NAME,null, new String(i+" ").getBytes());if(i%100==0){boolean flag = channel.waitForConfirms(); //等待發(fā)布確認if(flag){System.out.println("消息發(fā)送成功");}}}long end = System.currentTimeMillis();System.out.println("發(fā)布1000條耗時:"+(end-begin)+"ms");}
異步發(fā)布確認
推薦使用,需要加入確認發(fā)布監(jiān)聽器confirmListener,并且記錄序列號與消息的關聯(lián)(ConcurrentSkipListMap)。
public static void publicMsgAsync()throws Exception{Channel channel = RabbitMQUtils.getChannel();String QUEUE_NAME = UUID.randomUUID().toString();//開啟持久化channel.queueDeclare(QUEUE_NAME,true,false,false,null);channel.confirmSelect();//開啟發(fā)布確認// 將序列號與信息相關聯(lián),ConcurrentSkipListMap concurrentSkipListMap = new ConcurrentSkipListMap<Long,String>();//加入確認監(jiān)聽器channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long msgTag, boolean multiply) throws IOException {System.out.println("消息發(fā)送成功:"+msgTag);if(multiply) { //如果是批量確認,批量刪除//headMap返回小于msgTag的map視圖ConcurrentNavigableMap concurrentNavigableMap = concurrentSkipListMap.headMap(msgTag);//清理已經(jīng)標記的MapconcurrentNavigableMap.clear();}else {concurrentSkipListMap.remove(msgTag);}}@Overridepublic void handleNack(long msgTag, boolean multiply) throws IOException {System.out.println("未確認的消息:"+concurrentSkipListMap.get(msgTag));}});long begin = System.currentTimeMillis();for (int i = 0; i < 1000; i++) {channel.basicPublish("",QUEUE_NAME,null, new String(i+" ").getBytes());//記錄發(fā)送的信息與其序列號concurrentSkipListMap.put(channel.getNextPublishSeqNo(),new String(i+" "));}long end = System.currentTimeMillis();System.out.println("發(fā)布1000條耗時:"+(end-begin)+"ms");}
發(fā)布/訂閱模式(fanout交換機)
首先要弄明白交換機和隊列的關系,交換機負責信息的接收,通過不同的RountingKey將消息轉發(fā)到不同的隊列,每個隊列上的接收者都是競爭關系(即隊列上的消息只會被處理一次),那么當一個交換機對應多個隊列時,每個隊列僅有一個消費者,這個時候即發(fā)布/訂閱模式,消息會被每個消費者接收。
生產(chǎn)者代碼:向交換機中發(fā)送消息
public static final String EXCHANGE_NAME="logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String next = scanner.next();channel.basicPublish(EXCHANGE_NAME,"", null,next.getBytes());}}
消費者代碼:聲明匿名隊列,將隊列綁定到交換機上,不同的消費者用相同的RountingKey,以便同時接收到消息。
public static final String EXCHANGE_NAME="logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //FANOUT煽出,就是發(fā)布訂閱模式String queue = channel.queueDeclare().getQueue(); //聲明匿名隊列channel.queueBind(queue,EXCHANGE_NAME,""); //將隊列綁定到交換機上,RountingKey為“”DeliverCallback deliverCallback=(consumerTag,message)->{System.out.println("接收到消息:"+new String(message.getBody()));};channel.basicConsume(queue,true,deliverCallback, (consumerTag)->{});}
Direct交換機
與fanout模式相比,不同的隊列有不同的Rounting key,通過Rounting Key能夠直接向指定隊列發(fā)送消息。
Topic交換機
rountingKey作為匹配串,發(fā)送消息時,匹配上的則能進行發(fā)送。
routingKey必須是單詞列表,用.隔開。如aa.bb.cc
*可以代表一個單詞 ,#可以代表若干個單詞
比如向rountingKey為aa.orange.rabbit發(fā)送消息,Q1和Q2都能接收到消息,而向aa.orange.bb發(fā)送消息則只有Q1能夠接收到消息。
當隊列的rountingKey綁定的#,則相當于fanout煽出交換機。
當隊列的rountingKey綁定不帶#*時,相當于direct交換機。
死信隊列
在隊列中1消息超時、2無法處理、3隊列已滿時,消息會被送入死信隊列。