濟南公司網(wǎng)站建設(shè)體驗式營銷案例
??一、介紹????
????????RabbitMQ消息傳遞模型的核心思想是:生產(chǎn)者生產(chǎn)的消息從不會直接發(fā)送到隊列。實際上,通常生產(chǎn)者甚至不知道這些消息傳遞到了哪些隊列中。
???????相反,生產(chǎn)者只能將消息發(fā)送到交換機,交換機工作的內(nèi)容非常簡單,一方面他接受來自生產(chǎn)者的消息,另一方面他將他們推入隊列。交換機必須確切知道如何處理收到的消息。是應該把這些消息放到特定隊列還是說把他們放到許多隊列還是說應該丟棄他們。這就由交換機來決定。
??二、類型
?????????1、類型
????????總共有以下類型:直接(direct)[路由],主題(topic),標題(headers),扇出(fanout)[發(fā)布訂閱],
??????????默認類型[無名類型]?通過("")進行標識
channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());
??????????第一個參數(shù)是交換機名稱,空字符串表示默認或無名的交換機;消息能路由發(fā)送到隊列中,其實是由routingKey(bindingKey)綁定key指定的,如果它存在的話。
??????2、?臨時隊列
???????每當我們連接到Rabbit時,我們都需要一個全新的空隊列,為此我們可以創(chuàng)建一個具有隨機名稱的隊列,或者能讓服務(wù)器為我們選擇一個隨機隊列名稱。其次一旦我們斷開了消費者連接,隊列將被自動刪除。
???????創(chuàng)建臨時隊列的方式如下
String queueName = channel.queueDeclare().getQueue();
?????3、綁定(bingings)
???????binding其實時exchange和queue之間的橋梁,他告訴我們exchange和哪個隊列進行了綁定關(guān)系
????4、fanout
????????他是將接收到的所有消息廣播到他知道的所有隊列中
????????消費者,另一個復制即可
public class ReceiveLogs01 {//交換機的名稱public static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//聲明一個交換機channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//聲明一個隊列 臨時隊列/*** 隊列的名稱是隨機的* 當消費者斷開與隊列的連接的時候,隊列就自動刪除*/String queueName = channel.queueDeclare().getQueue();/*** 綁定隊列與交換機*/channel.queueBind(queueName,EXCHANGE_NAME,"");System.out.println("等待接收消息,把接收到的消息打印在屏幕上......");//接收消息DeliverCallback deliverCallback = (consumerTag,message) ->{System.out.println("01控制臺打印接收到的消息:"+new String(message.getBody()));};CancelCallback cancelCallback = (consumerTag) ->{};channel.basicConsume(queueName,true,deliverCallback,cancelCallback);}
}
???生產(chǎn)者
public class EmitLog {//交換機的名稱public static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//聲明交換機channel.exchangeDeclare(EXCHANGE_NAME,"fanout");Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String message = scanner.next();channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());System.out.println("生產(chǎn)者發(fā)出消息:"+message);}}
}
?????結(jié)果

???5、direct
??????消息只去到他綁定的routingKey隊列中,支持多重綁定,當exchange的綁定類型是direct,但是他綁定的多個隊列的key如果都相同,在這種情況下雖然綁定類型是direct但是他表現(xiàn)的就和fanout有點類似了。
?????生產(chǎn)者
public class DirectLogs {//交換機的名稱public static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//聲明交換機channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String message = scanner.next();channel.basicPublish(EXCHANGE_NAME,"warning",null,message.getBytes());System.out.println("生產(chǎn)者發(fā)出消息:"+message);}}
}
???消費者1
public class ReceiveLogs01 {//交換機名稱public static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//聲明一個交換機channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//聲明一個隊列channel.queueDeclare("console",false,false,false,null);//綁定隊列與交換機channel.queueBind("console",EXCHANGE_NAME,"info");channel.queueBind("console",EXCHANGE_NAME,"warning");DeliverCallback deliverCallback = (consumerTag,message) ->{System.out.println("direct01控制臺打印接收到的消息:"+new String(message.getBody()));};CancelCallback cancelCallback = (consumerTag) ->{};channel.basicConsume("console",true,deliverCallback,cancelCallback);}
}
??消費者2
public class ReceiveLogs02 {//交換機名稱public static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//聲明一個交換機channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//聲明一個隊列channel.queueDeclare("disk",false,false,false,null);//綁定隊列與交換機channel.queueBind("disk",EXCHANGE_NAME,"error");DeliverCallback deliverCallback = (consumerTag,message) ->{System.out.println("direct02控制臺打印接收到的消息:"+new String(message.getBody()));};CancelCallback cancelCallback = (consumerTag) ->{};channel.basicConsume("disk",true,deliverCallback,cancelCallback);}
}
??????5、topic
???????????發(fā)送到類型是topic交換機的消息的routing_key不能隨意寫,必須滿足一定的要求,它必須是一個單詞列表,以點號分隔開。這些單詞可以是任意單詞,比如說“stock.usd.nyse”,"nyse.vmw","quick.orange.rabbit"這種類型的。但是這個單詞列表最多不能超過255個字節(jié)?!??可以代替一個單詞;#可以替代零個或多個單詞】
??????????例如Q1->綁定的是orange帶三個單詞的字符串(*.orange.*)
?????????????????Q2->綁定的是最后一個是rabbit的3個單詞(*.*.rabbit)
??????????????????????????第一個單詞是lazy的多個單詞(lazy.#)
?????????當一個隊列綁定鍵是#,那么這個隊列將接收所有數(shù)據(jù),有點像fanout;如果隊列綁定鍵當中沒有#h和*出現(xiàn),那么該隊列綁定類型就是direct了。
????????生產(chǎn)者
public class EmitLogTopic {//交換機名稱public static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();Map<String, String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("quick.orange.rabbit","被隊列Q1Q2接收到");bindingKeyMap.put("lazy.orange.eleplant","被隊列Q1Q2接收到");bindingKeyMap.put("quick.orange.fox","被隊列Q1接收到");bindingKeyMap.put("lazy.brown.fox","被隊列Q2接收到");bindingKeyMap.put("lazy.pink.rabbit","雖然滿足兩個綁定但只被隊列Q2接收一次");bindingKeyMap.put("quick.brown.fox","不匹配任何綁定不會被任何隊列接收到會被丟棄");bindingKeyMap.put("quick.orange.male.rabbit","是四個單詞不匹配任何綁定會被丟棄");bindingKeyMap.put("lazy.orange.male.rabbit","是四個單詞但匹配Q2");for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {String routingKey = bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());System.out.println("生產(chǎn)者發(fā)出消息:"+message);}}
}
??消費者1
public class ReceiveLogsTopic01 {//交換機名稱public static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//聲明交換機channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);//聲明隊列String queueName = "Q1";channel.queueDeclare(queueName,false,false,false,null);channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");System.out.println("Q1等待接收消息。。。。。。");//接收消息channel.basicConsume(queueName,true,(consumeTag,message)->{System.out.println(new String(message.getBody()));System.out.println(" 接收隊列:"+queueName+" 綁定鍵:"+message.getEnvelope().getRoutingKey());},(message)->{});}
}
???消費者2
public class ReceiveLogsTopic02 {//交換機名稱public static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//聲明交換機channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);//聲明隊列String queueName = "Q2";channel.queueDeclare(queueName,false,false,false,null);channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");System.out.println("Q2等待接收消息。。。。。。");//接收消息channel.basicConsume(queueName,true,(consumeTag,message)->{System.out.println(new String(message.getBody()));System.out.println(" 接收隊列:"+queueName+" 綁定鍵:"+message.getEnvelope().getRoutingKey());},(message)->{});}
}