如何給網(wǎng)站死鏈接做404北京seo外包平臺
“如何實現(xiàn)Redis延時隊列”這個面試題應該也是比較常見的,解答如下:
使用sortedset(有序集合) ,拿時間戳作為 score ,消息內容作為key 調用 zadd 來生產(chǎn)消息,消費者用zrangebyscore 指令獲取 N 秒之前的數(shù)據(jù)輪詢進行處理。
目錄
- 實現(xiàn)思路
- 引入Jedis
- 指令簡介
- zadd
- zrem
- zrangeByScore
- Java實現(xiàn)Redis延時隊列
實現(xiàn)思路
Java實現(xiàn)Redis延時隊列,首先要了解何為延時隊列,即可以將消息存儲在隊列中,并在指定的延時時間后再將消息出隊。這種隊列在很多場景下都非常有用,例如消息延時處理,延時確認(訂單確認) 等,參考以上解答,思路應該拆分:
首先需要有個延時隊列,該隊列是通過一定順序(當前時間戳+延時時間)排序的(即優(yōu)先取到延時時間已結束的數(shù)據(jù)),然后消費者端就需要獲取到隊列中延時時間靠前結束的數(shù)據(jù)(即當前時間戳+延時時間靠前)。
引入Jedis
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>3.7.0</version>
</dependency>
指令簡介
zadd
zadd命令用于將一個成員Score值加入到有序集合中。Score值可以是整數(shù)或者浮點數(shù)。如果有序集合中已經(jīng)存在相同的成員,那么舊成員將被替代。
語法:ZADD key Score member [Score2 member2 …]
示例:ZADD students 100 alice 或 ZADD students 80 alice 90 bob (添加單個或多個情況)
zrem
zrem命令用于從有序集合中移除一個或多個成員。該命令接收兩個參數(shù):第一個參數(shù)是要操作的有序集合的鍵,第二個參數(shù)是將要移除的成員的值。
語法:ZREM key member [member …]
示例:ZREM students alice
zrangeByScore
zrangeByScore命令用于獲取分數(shù)在指定范圍內的所有成員。
語法:ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
參數(shù)說明:
key:Redis中的鍵。
min:最小分數(shù)。
max:最大分數(shù)。
WITHSCORES:可選參數(shù),如果設置為true,則返回分數(shù)以及成員。
LIMIT:可選參數(shù),用于限制返回的成員數(shù)量。
offset:偏移量,從第幾個成員開始。
count:限制返回的成員數(shù)量。
返回值:
按照分數(shù)升序返回成員列表。
如果設置了LIMIT參數(shù),則返回限制數(shù)量的成員。
示例:ZADD ZRANGEBYSCORE students 80 90 WITHSCORES
Java實現(xiàn)Redis延時隊列
核心部分,消息隊列工具類
import redis.clients.jedis.Jedis;
import java.util.Set;public class DelayQueueWithRedis {private Jedis jedis;private String queueKey;public DelayQueueWithRedis(Jedis jedis, String queueKey) {this.jedis = jedis;this.queueKey = queueKey;}// 添加消息到延遲隊列public void push(String message, long delaySeconds) {// 計算消息的分數(shù),這里使用消息進入隊列的時間加上延遲時間long score = System.currentTimeMillis() / 1000 + delaySeconds;//向有序集合添加一個成員,并設置其分數(shù)jedis.zadd(queueKey, score, message);}// 獲取并消費一條消息public String pop() {while (true) {long now = System.currentTimeMillis() / 1000;// 只獲取分數(shù)在0到當前時間的元素Set<String> messages = jedis.zrangeByScore(queueKey, 0, now, 0, 1);if (messages.isEmpty()) {System.out.println("No messages");// 沒有可消費的消息,休眠一會兒繼續(xù)嘗試try {Thread.sleep(500);} catch (InterruptedException e) {Thread.currentThread().interrupt();break;}} else {String message = messages.iterator().next();// 從有序集合中移除一個成員jedis.zrem(queueKey, message);return message;}}return null;}
}
生產(chǎn)者端測試
import redis.clients.jedis.Jedis;/*** @Author: zhangximing* @Email: 530659058@qq.com* @Date: 2024/2/19 16:53* @Description: 生產(chǎn)者端測試*/
public class MainP {public static void main(String[] args) {Jedis jedis = new Jedis("localhost",6379);DelayQueueWithRedis delayQueue = new DelayQueueWithRedis(jedis, "delay_queue");// 添加延時消息delayQueue.push("message1", 5);delayQueue.push("message2", 10);delayQueue.push("message3", 8);}}
消費者端測試
import redis.clients.jedis.Jedis;/*** @Author: zhangximing* @Email: 530659058@qq.com* @Date: 2024/2/19 16:51* @Description: 消費者端測試*/
public class MainC {public static void main(String[] args) {Jedis jedis = new Jedis("localhost",6379);DelayQueueWithRedis delayQueue = new DelayQueueWithRedis(jedis, "delay_queue");// 消費延時消息while (true) {String message = delayQueue.pop();if (message != null) {System.out.println("Consumed: " + message);}}}
}
測試結果:數(shù)據(jù)在延時指定時間后才正常打印