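Install the RabbitMQ plugin rabbitmq-delayed-message-exchange.
Once exchanges offer the x-delayed-message type,
the plugin has been installed successfully.
The producer sets a delay value when it sends a message. The message is held at the exchange until the specified delay has elapsed, then it enters the queue and is consumed by the consumer.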
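At the protocol level, the delay value travels with the message as a header named `x-delay`, in milliseconds. The sketch below only builds such a header map with the JDK; the class and method names (`DelayHeaderSketch`, `headersWithDelay`) are made up for illustration and are not part of the plugin or of Spring.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative helper (hypothetical name): builds message headers carrying a delay. */
public final class DelayHeaderSketch {

    /** Header the delayed-message plugin reads the delay from, in milliseconds. */
    public static final String X_DELAY = "x-delay";

    private DelayHeaderSketch() {
    }

    /** Returns a mutable header map containing only the delay header. */
    public static Map<String, Object> headersWithDelay(int delayMillis) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(X_DELAY, delayMillis);
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(headersWithDelay(3000)); // prints {x-delay=3000}
    }
}
```

With the plain RabbitMQ Java client, a map like this would be passed to `AMQP.BasicProperties.Builder.headers(...)` before publishing. With Spring AMQP, `MessageProperties.setDelay(...)` should end up in the same header, so the map does not have to be built by hand.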
Configuration class:
package com.esint.configs;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayedQueueConfig {

    // Exchange
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    // Queue
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    // Routing key
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    /**
     * Declares a custom exchange whose type is provided by the plugin.
     * @return the delayed exchange
     */
    @Bean
    public CustomExchange delayedExchange() {
        // CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)
        Map<String, Object> arguments = new HashMap<>();
        // Routing behaviour applied once the delay has elapsed
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
    }

    @Bean
    public Queue delayedQueue() {
        return QueueBuilder.durable(DELAYED_QUEUE_NAME).build();
    }

    @Bean
    public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,
                                                      @Qualifier("delayedExchange") CustomExchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}
Producer implementation:
package com.esint.controller;

import com.esint.configs.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

// Sends delayed messages
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMesController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendDelayMsg/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
        log.info("Current time: {}, sending a message with a delay of {} ms to the delayed exchange: {}",
                new Date().toString(), delayTime, message);
        rabbitTemplate.convertAndSend(
                DelayedQueueConfig.DELAYED_EXCHANGE_NAME,
                DelayedQueueConfig.DELAYED_ROUTING_KEY,
                message,
                mes -> {
                    // Delay in milliseconds, applied by the delayed exchange
                    mes.getMessageProperties().setDelay(delayTime);
                    return mes;
                });
    }
}
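The controller passes the delay from the URL straight to the broker. If you would rather reject a missing or negative delay before publishing, a small guard along the following lines could be used. `DelayValidator` and `requireValidDelay` are hypothetical names that do not appear in the original code, and the choice to reject negative values is an assumption of this sketch.

```java
/** Hypothetical helper, not part of the original controller. */
public final class DelayValidator {

    private DelayValidator() {
    }

    /**
     * Returns the delay unchanged when it is usable.
     * Throws IllegalArgumentException when it is null or negative.
     */
    public static int requireValidDelay(Integer delayMillis) {
        if (delayMillis == null) {
            throw new IllegalArgumentException("delay is required");
        }
        if (delayMillis < 0) {
            throw new IllegalArgumentException("delay must not be negative: " + delayMillis);
        }
        return delayMillis;
    }

    public static void main(String[] args) {
        System.out.println(requireValidDelay(3000)); // prints 3000
    }
}
```

In the producer this would wrap the existing call, for example `setDelay(DelayValidator.requireValidDelay(delayTime))`.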
Consumer implementation:
package com.esint.consumer;

import com.esint.configs.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Consumes delayed messages produced via the plugin.
 */
@Slf4j
@Component
public class DelayQueueConsumer {

    // Listens on the delayed queue
    @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
    public void receiveDelayQueue(Message message) {
        String msg = new String(message.getBody());
        log.info("Current time: {}, received delayed message: {}", new Date().toString(), msg);
    }
}
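Test:
http://127.0.0.1:19092/ttl/sendDelayMsg/helloDelay1/30000
http://127.0.0.1:19092/ttl/sendDelayMsg/helloDelay2/3000
First message sent: helloDelay1, delayed by 30 s
Second message sent: helloDelay2, delayed by 3 s
The result is as expected: helloDelay2, sent second but with the shorter delay, is consumed first.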
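To make the delivery order of the two test messages concrete, here is a toy in-memory model built on the JDK's `DelayQueue`. It is only an analogy for "hold each message until its own delay expires" and says nothing about how the plugin is implemented. The class name `DelayedDeliveryModel` is made up, and the delays are scaled down by a factor of 100 (300 ms and 30 ms) so the example finishes quickly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Toy model of delayed delivery (hypothetical); NOT the plugin's implementation. */
public class DelayedDeliveryModel {

    /** A message that becomes available once its own delay has elapsed. */
    static final class HeldMessage implements Delayed {
        final String body;
        final long releaseAtNanos;

        HeldMessage(String body, long delayMillis) {
            this.body = body;
            this.releaseAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(releaseAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // Only HeldMessage instances are ever placed in the queue below.
            return Long.compare(releaseAtNanos, ((HeldMessage) other).releaseAtNanos);
        }
    }

    /** Sends both test messages in the original order and returns the delivery order. */
    public static List<String> deliveryOrder() {
        DelayQueue<HeldMessage> held = new DelayQueue<>();
        held.put(new HeldMessage("helloDelay1", 300)); // stands in for 30000 ms
        held.put(new HeldMessage("helloDelay2", 30));  // stands in for 3000 ms

        List<String> delivered = new ArrayList<>();
        try {
            while (delivered.size() < 2) {
                // take() blocks until the message with the earliest expiry is due
                delivered.add(held.take().body);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(deliveryOrder()); // prints [helloDelay2, helloDelay1]
    }
}
```

The message sent second comes out first because release is driven by each message's own expiry time, not by send order.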
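Summary:
Messages are held at the exchange, not in the queue.
The delay can be set flexibly for each message; whichever message reaches its delay first is consumed first.
The delayed-message plugin must be installed.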