做設(shè)計比較好的網(wǎng)站網(wǎng)站推廣排名服務(wù)
目錄
11.4 SpringAMQP
11.4.2 Work Queue工作隊列
11.4.3 發(fā)布訂閱模型
11.4.4 FanoutExchange(廣播交換機(jī))
11.4.5 DirectExchange(路由模式交換機(jī))
11.4.6 TopicExchange
11.5 消息轉(zhuǎn)換器
11.4 SpringAMQP
父工程引入AMQP依賴
<!--AMQP依賴,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
編寫測試方法
yml配置文件中編寫配置
spring:rabbitmq:host: 192.168.142.130 ? # rabbitmq的ip地址port: 5672 # 端口username: xxxxxpassword: xxxxxxxvirtual-host: /
發(fā)消息測試
@SpringBootTestpublic class AMQPTest {?@Autowiredprivate RabbitTemplate rabbitTemplate;?@Testpublic void testSendMessage2SimpleQueue(){String queueName = "simple.queue";String message = "hello,spring amqp";rabbitTemplate.convertAndSend(queueName,message);}}
在consumer中編寫消費(fèi)邏輯,監(jiān)聽simple.queue
配置文件配置 :
spring:rabbitmq:host: 192.168.142.129 ? # rabbitmq的ip地址port: 5672 # 端口username: xxxxxpassword: xxxxxvirtual-host: /
編寫監(jiān)聽類
@Componentpublic class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void ListenSimpleQueue(String msg){System.out.println("消費(fèi)者接收到simple.queue的消息 : " + msg);}}
啟動主啟動類,控制臺可看到輸出的監(jiān)聽到的消息
消息一旦被消費(fèi),就會從隊列中刪除,沒有回收機(jī)制
11.4.2 Work Queue工作隊列
publisher代碼
@Testpublic void testSendMessage2WorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello,spring amqp__";for(int i = 1 ; i <= 50 ; i ++){rabbitTemplate.convertAndSend(queueName,message + i); ? Thread.sleep(20);}}
consumer接收消息
// 消費(fèi)者1@RabbitListener(queues = "simple.queue")public void ListenWork1Queue(String msg) throws InterruptedException {System.out.println("消費(fèi)者1接收到simple.queue的消息 : " + msg + LocalTime.now());Thread.sleep(20);}?// 消費(fèi)者2@RabbitListener(queues = "simple.queue")public void ListenWork2Queue(String msg) throws InterruptedException {System.err.println("消費(fèi)者2接收到simple.queue的消息 : " + msg + LocalTime.now());Thread.sleep(200);}
消息預(yù)取機(jī)制使得兩者平均分配消息 不符預(yù)期
配置文件中 :
處理預(yù)取值
spring:rabbitmq:host: 192.168.142.129 ? # rabbitmq的ip地址port: 5672 ?# 端口username: xxxxxxpassword: xxxxxxxvirtual-host: /listener:simple:prefetch: 1 ? ?# 每次只能獲取一條消息 ,處理完成才能獲取下一個信息
11.4.3 發(fā)布訂閱模型
11.4.4 FanoutExchange(廣播交換機(jī))
步驟一 : 聲明交換機(jī),隊列 , 并綁定隊列和交換機(jī)
在consumer中編寫配置類
@Configurationpublic class FanoutConfig {// 聲明交換機(jī)@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("xinbo.fanout");}?// 聲明隊列1@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}?// 綁定隊列1到交換機(jī)@Beanpublic Binding fanoutBindind(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}?// 聲明隊列2@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}?// 綁定隊列2到交換機(jī)@Beanpublic Binding fanoutBindind2(Queue fanoutQueue2,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}
消息監(jiān)聽 :
@Componentpublic class SpringRabbitListener {?// 消費(fèi)者1@RabbitListener(queues = "fanout.queue1")public void ListenWork1Queue(String msg) throws InterruptedException {System.out.println("消費(fèi)者1接收到fanout.queue1的消息 : " + msg + LocalTime.now());Thread.sleep(20);}?// 消費(fèi)者2@RabbitListener(queues = "fanout.queue2")public void ListenWork2Queue(String msg) throws InterruptedException {System.err.println("消費(fèi)者2接收到fanout.queue2的消息 : " + msg + LocalTime.now());Thread.sleep(200);}}
消息發(fā)送 :
@Testpublic void testSendFanoutExchange(){String exchangeName = "xinbo.fanout"; ? ? ? ? ? ? ? // 交換機(jī)名稱String message = "hello,everyone";rabbitTemplate.convertAndSend(exchangeName,null,message);}
11.4.5 DirectExchange(路由模式交換機(jī))
利用@RabbitListener聲明Exchange Queue RoutingKey
SpirngRabbitListener中
@Componentpublic class SpringRabbitListener {?@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "xinbo.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void ListenDirectQueue1(String msg) throws InterruptedException {System.out.println("消費(fèi)者接收到direct.queue1的消息 : " + msg + LocalTime.now());Thread.sleep(20);}?@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "xinbo.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void ListenDirectQueue2(String msg) throws InterruptedException {System.out.println("消費(fèi)者接收到direct.queue2的消息 : " + msg + LocalTime.now());Thread.sleep(20);}?}
發(fā)送消息測試 :
@Testpublic void testSendDirectExchange(){// 交換機(jī)名稱String exchangeName = "xinbo.direct";String message = "hello,blue";rabbitTemplate.convertAndSend(exchangeName,"blue",message);}
11.4.6 TopicExchange
綁定隊列和交換機(jī)的關(guān)系 :
@Componentpublic class SpringRabbitListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name="xinbo.topic",type = ExchangeTypes.TOPIC),key = "china.#"))public void ListenTopicQueue1(String msg){System.out.println("消費(fèi)者接收到topic.queue1的消息 : " + msg + LocalTime.now());}?@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name="xinbo.topic",type = ExchangeTypes.TOPIC),key = "#.news"))public void ListenTopicQueue2(String msg){System.out.println("消費(fèi)者接收到topic.queue2的消息 : " + msg + LocalTime.now());}?}
發(fā)送消息 :
@Testpublic void testSendTopicExchange(){// 交換機(jī)名稱String exchangeName = "xinbo.topic";String message = "中國發(fā)生了xxxxx";rabbitTemplate.convertAndSend(exchangeName,"china.news",message);}
11.5 消息轉(zhuǎn)換器
發(fā)送和接受json類型的消息
添加依賴 :
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId></dependency>
在配置類中
@Beanpublic MessageConverter messageCondition(){return new Jackson2JsonMessageConverter();}
接收消息 :
引依賴 :同上
在Listener中 :
@RabbitListener(queues = "object.queue")public void ListenObjectQueue(Map<String,Object> msg){System.out.println(msg);}