手機(jī)微信網(wǎng)站模板買賣交易平臺
Routing Direct
在Fanout模式中,一條消息,會被所有訂閱的隊(duì)列都消費(fèi)。但是在某些場景下,我們希望不同的消息被不同的隊(duì)列消費(fèi)。這時就要用到Direct類型的Exchange。
在Direct模型下:
- 隊(duì)列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個
RoutingKey
(路由key) - 消息的發(fā)送方在向 Exchange發(fā)送消息時,也必須指定消息的
RoutingKey
。 - Exchange不再把消息交給每一個綁定的隊(duì)列,而是根據(jù)消息的
Routing Key
進(jìn)行判斷,只有隊(duì)列的Routingkey
與消息的Routing key
完全一致,才會接收到消息
- P:生產(chǎn)者,向Exchange發(fā)送消息,發(fā)送消息時,會指定一個routing key。
- X:Exchange(交換機(jī)),接收生產(chǎn)者的消息,然后把消息遞交給與routing key完全匹配的隊(duì)列。
- C1:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 error 的消息。
- C2:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 info、error、warning 的消息。
創(chuàng)建生產(chǎn)者
public class MyProducer {@Testpublic void test() throws Exception {// 交換機(jī)String exchange = "logs_direct";// 創(chuàng)建工廠ConnectionFactory factory = new ConnectionFactory();factory.setVirtualHost("/");factory.setHost("xuewei.world");factory.setUsername("xuewei");factory.setPassword("123456");factory.setPort(5672);// 創(chuàng)建連接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 聲明交換機(jī)channel.exchangeDeclare(exchange, "direct");for (int i = 0; i < 3; i++) {// 發(fā)布消息channel.basicPublish(exchange, "DEBUG", null, ("DEBUG LOG -> " + i).getBytes());channel.basicPublish(exchange, "INFO", null, ("INFO LOG -> " + i).getBytes());channel.basicPublish(exchange, "WARN", null, ("WARN LOG -> " + i).getBytes());channel.basicPublish(exchange, "ERROR", null, ("ERROR LOG -> " + i).getBytes());}}
}
創(chuàng)建消費(fèi)者1
public class MyConsumer1 {public static void main(String[] args) throws Exception {// 指定交換機(jī)String exchange = "logs_direct";// 創(chuàng)建工廠ConnectionFactory factory = new ConnectionFactory();factory.setVirtualHost("/");factory.setHost("xuewei.world");factory.setUsername("xuewei");factory.setPassword("123456");factory.setPort(5672);// 創(chuàng)建連接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 綁定交換機(jī)channel.exchangeDeclare(exchange, "direct");// 創(chuàng)建臨時隊(duì)列String queue = channel.queueDeclare().getQueue();// 將臨時隊(duì)列綁定exchangechannel.queueBind(queue, exchange, "WARN");channel.queueBind(queue, exchange, "ERROR");// 處理消息channel.basicConsume(queue, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費(fèi)者1: " + new String(body));// TODO 業(yè)務(wù)處理}});}
}
創(chuàng)建消費(fèi)者2
public class MyConsumer2 {public static void main(String[] args) throws Exception {// 指定交換機(jī)String exchange = "logs_direct";// 創(chuàng)建工廠ConnectionFactory factory = new ConnectionFactory();factory.setVirtualHost("/");factory.setHost("xuewei.world");factory.setUsername("xuewei");factory.setPassword("123456");factory.setPort(5672);// 創(chuàng)建連接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 綁定交換機(jī)channel.exchangeDeclare(exchange, "direct");// 創(chuàng)建臨時隊(duì)列String queue = channel.queueDeclare().getQueue();// 將臨時隊(duì)列綁定exchangechannel.queueBind(queue, exchange, "DEBUG");channel.queueBind(queue, exchange, "INFO");// 處理消息channel.basicConsume(queue, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費(fèi)者2: " + new String(body));// TODO 業(yè)務(wù)處理}});}
}