免費(fèi)ppt大全網(wǎng)seo的中文含義是什么
/*** 使用rabbitMQ* 1.引用amqp場景 RabbitAutoConfiguration就會(huì)自動(dòng)生效* 2.給容器中自動(dòng)配置了各種api RabbitTemplate AmqpAdmin CachingConnectionFactory RabbitMessagingTemplate* 所有屬性都是 spring.rabbitmq開頭* 3.通過注解@EnableRabbit使用* 4.監(jiān)聽消息 使用@RabbitListener 注解 必須有@EnableRabbit才能生效 如果是創(chuàng)建交換機(jī),創(chuàng)建隊(duì)列 不需要有@EnableRabbit注解* @RabbitListener 可以標(biāo)在類和方法上* @RabbitHandler 可以標(biāo)在方法上 場景 一個(gè)隊(duì)列返回的類型不同 使用這個(gè)注解來重載*/
1.引入依賴
<!-- mq依賴--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2.配置文件
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
#虛擬主機(jī)
spring.rabbitmq.virtual-host=/#開啟消息生產(chǎn)者發(fā)送消息確認(rèn)
# NONE:禁用發(fā)布確認(rèn)模式,是默認(rèn)值
#CORRELATED:發(fā)布消息成功到交換器后會(huì)觸發(fā)回調(diào)方法
#SIMPLEspring.rabbitmq.publisher-confirm-type=correlated
# 開啟發(fā)送端消息抵達(dá)隊(duì)列的確認(rèn)
spring.rabbitmq.publisher-returns=true
# 只要抵達(dá)隊(duì)列,以異步發(fā)送優(yōu)先回調(diào)這個(gè)
spring.rabbitmq.template.mandatory=true#手動(dòng)ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
3.注入
@AutowiredAmqpAdmin amqpAdmin;
操作創(chuàng)建交換機(jī),創(chuàng)建隊(duì)列,創(chuàng)建綁定關(guān)系
/*** 1.如何創(chuàng)建Exchange Queue Binding? 使用AmqpAdmin進(jìn)行創(chuàng)建* 2.如何接收消息*/@Testpublic void createExchange(){//創(chuàng)建一個(gè)交換機(jī)//String name 交換機(jī)名稱, boolean durable 是否持久化, boolean autoDelete 是否自動(dòng)刪除DirectExchange directExchange = new DirectExchange("hallo-java-exchange", true, false);amqpAdmin.declareExchange(directExchange);System.out.println("單點(diǎn)交換機(jī)創(chuàng)建成功"+directExchange);}@Testpublic void createQueue(){//String name 隊(duì)列名稱, boolean durable 是否持久化, boolean exclusive 是否排他,// boolean autoDelete 是否自動(dòng)刪除, @Nullable Map<String, Object> argumentsQueue queue = new Queue("hello-java-queue",true,false,false);amqpAdmin.declareQueue(queue);}/*** 測試創(chuàng)建綁定關(guān)系*/@Testpublic void createBing(){//String destination, 目的地// DestinationType destinationType,目的地類型//String exchange 交換機(jī)// String exchange, String routingKey, 路由key// @Nullable Map<String, Object> arguments 自定義參數(shù)Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,"hallo-java-exchange","hello.java",null);amqpAdmin.declareBinding(binding);}
/ 用來發(fā)送消息的@AutowiredRabbitTemplate rabbitTemplate;
/*** 測試發(fā)送消息功能*/@Testpublic void sendMessageTest(){for(int i=0;i<10;i++){if(i%2 == 0){OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();reasonEntity.setId(1L);reasonEntity.setCreateTime(new Date());reasonEntity.setName("哈哈");//發(fā)送消息 如果發(fā)送的消息是個(gè)對(duì)象,我們會(huì)使用序列化機(jī)制,將對(duì)象寫出去 對(duì)象必須實(shí)現(xiàn) Serializable//也可以用config的方法 將對(duì)象類型的消息轉(zhuǎn)為json//String exchange 發(fā)送的交換機(jī), String routingKey 路由key, Object object 發(fā)送的內(nèi)容,// correlationData 指定uuid,生產(chǎn)者發(fā)送消息給服務(wù)時(shí)候,獲得這個(gè)參數(shù),從而確定是哪條消息rabbitTemplate.convertAndSend("hallo-java-exchange","hello.java",reasonEntity,new CorrelationData(UUID.randomUUID().toString()));}else{OrderEntity orderEntity = new OrderEntity();orderEntity.setOrderSn(UUID.randomUUID().toString());rabbitTemplate.convertAndSend("hallo-java-exchange","hello.java",orderEntity,new CorrelationData(UUID.randomUUID().toString()));}}}
接收消息在業(yè)務(wù)層操作 相關(guān)的注解有
@RabbitListener 可以標(biāo)在類和方法上
@RabbitHandler 可以標(biāo)在方法上 場景 一個(gè)隊(duì)列返回的類型不同 使用這個(gè)注解來重載
@Service("orderItemService")
@RabbitListener(queues = {"hello-java-queue"})
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {/*** 測試接收消息* 注解中的queues 聲明需要監(jiān)聽的隊(duì)列,可以是多個(gè)* org.springframework.amqp.core.Message 第一個(gè)參數(shù)是原生的詳細(xì)信息* OrderReturnApplyEntity 第二個(gè)參數(shù) 消息的返回類型* Channel 第三個(gè)參數(shù) 服務(wù)獲取消息的通道** 場景* 多個(gè)客戶端啟動(dòng)時(shí),一個(gè)消息只能被一個(gè)客戶端接收 輪詢制度* 多個(gè)消息等待服務(wù)處理時(shí),處理完一個(gè)后,才會(huì)接收下一個(gè)** 設(shè)置ack配置 確認(rèn)前是unack狀態(tài) 若宕機(jī)或其他原因失敗,系統(tǒng)再次重啟狀態(tài)為ready 手動(dòng)簽收后消息才被處理掉**/@RabbitHandlerpublic void recieveMessage(Message message, OrderReturnApplyEntity returnApplyEntity,Channel channel) throws IOException {//獲取消息體byte[] body = message.getBody();//獲取消息頭MessageProperties messageProperties = message.getMessageProperties();System.out.println(returnApplyEntity);long deliveryTag = message.getMessageProperties().getDeliveryTag();//channel內(nèi)按照順自增的if(deliveryTag%2 == 0){//當(dāng)業(yè)務(wù)流程執(zhí)行完后 手動(dòng)簽收消息 deliveryTag 簽收的消息序號(hào), b 是否批量簽收channel.basicAck(deliveryTag,false);}else{//拒簽 拒簽的序號(hào),是否批量操作,是否歸隊(duì) 若false則丟棄 若ture則歸隊(duì)channel.basicNack(deliveryTag,false,false);// 和上面作用一樣 只是不能設(shè)置批量操作這個(gè)參數(shù)// channel.basicReject(deliveryTag,false);}}@RabbitHandlerpublic void recieveMessage2(Message message, OrderEntity order,Channel channel){//獲取消息體byte[] body = message.getBody();//獲取消息頭MessageProperties messageProperties = message.getMessageProperties();System.out.println(order);}
4.config文件
@Configuration
public class MyRabbitConfig {@AutowiredRabbitTemplate rabbitTemplate;@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}/*** 定制RabbitTemplate*/@PostConstruct //MyRabbitConfig對(duì)象創(chuàng)建完成后,執(zhí)行這個(gè)方法public void initRabbitTemplate(){//設(shè)置確認(rèn)回調(diào)//1.生產(chǎn)者發(fā)送消息到服務(wù)這一步的確認(rèn)rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {//CorrelationData correlationData, 當(dāng)前消息的唯一關(guān)聯(lián)數(shù)據(jù)(可以理解為id)// boolean b, 判斷消息是否成功收到// String s 如果失敗,顯示失敗原因@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("correlationData==="+correlationData+"b==="+b+"s==="+s);}});//2.設(shè)置消息沒有投遞給指定的隊(duì)列,就觸發(fā)這個(gè)失敗的回調(diào)rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {//獲得投遞失敗的消息的詳細(xì)信息Message message = returnedMessage.getMessage();//獲得投遞消息失敗的交換機(jī)String exchange = returnedMessage.getExchange();//獲得投遞消息失敗的狀態(tài)碼int replyCode = returnedMessage.getReplyCode();//獲得投遞消息失敗的文本String replyText = returnedMessage.getReplyText();//獲得投遞消息失敗的路由keyString routingKey = returnedMessage.getRoutingKey();}});//3.消費(fèi)端確認(rèn)(保證每個(gè)消息 被正確消費(fèi),此時(shí)才可以刪除這個(gè)消息)}
保證消息可靠投遞
生產(chǎn)者到交換機(jī) 使用confirmCallback; 得到消息投遞的結(jié)果
交換機(jī)到隊(duì)列 使用 returnCallback; 如果消息接收失敗,將會(huì)觸發(fā),得到失敗消息的信息
隊(duì)列到消費(fèi)者 使用ack機(jī)制;進(jìn)行手動(dòng)確認(rèn)
常見的模式
1.點(diǎn)對(duì)點(diǎn)模式,如果路由key和綁定關(guān)系完全匹配,交換機(jī)才能收到;
2.主題訂閱模式,路由key和綁定關(guān)系以單詞維度匹配,路由key中可以用? “#”代表匹配多個(gè)或0個(gè)
“*” 代表匹配一個(gè)
3.廣播模式,不受路由key局限,只要交換機(jī)和隊(duì)列有綁定關(guān)系,就可以收到消息;
4.helder性能比較低,一般不適用,私信隊(duì)列不太熟悉