一級a做愛av網(wǎng)站百度新聞官網(wǎng)
作者:vivo 互聯(lián)網(wǎng)服務(wù)器團(tuán)隊-Wang Fei
Redis是一種基于客戶端-服務(wù)端模型以及請求/響應(yīng)的TCP服務(wù)。在遇到批處理命令執(zhí)行時,Redis提供了Pipelining(管道)來提升批處理性能。本文結(jié)合實踐分析了Spring Boot框架下Redis的Lettuce客戶端和Redisson客戶端對Pipeline特性的支持原理,并針對實踐過程中遇到的問題進(jìn)行了分析,可以幫助開發(fā)者了解不同客戶端對Pipeline支持原理及避免實際使用中出現(xiàn)問題。
一、前言
Redis 已經(jīng)提供了像 mget 、mset 這種批量的命令,但是某些操作根本就不支持或沒有批量的操作,從而與 Redis 高性能背道而馳。為此, Redis基于管道機制,提供Redis Pipeline新特性。Redis Pipeline是一種通過一次性發(fā)送多條命令并在執(zhí)行完后一次性將結(jié)果返回,從而減少客戶端與redis的通信次數(shù)來實現(xiàn)降低往返延時時間提升操作性能的技術(shù)。目前,Redis Pipeline是被很多個版本的Redis 客戶端所支持的。?
二、Pipeline 底層原理分析
?2.1 Redis單個命令執(zhí)行基本步驟
Redis是一種基于客戶端-服務(wù)端模型以及請求/響應(yīng)的TCP服務(wù)。一次Redis客戶端發(fā)起的請求,經(jīng)過服務(wù)端的響應(yīng)后,大致會經(jīng)歷如下的步驟:
-
客戶端發(fā)起一個(查詢/插入)請求,并監(jiān)聽socket返回,通常情況都是阻塞模式等待Redis服務(wù)器的響應(yīng)。
-
服務(wù)端處理命令,并且返回處理結(jié)果給客戶端。
-
客戶端接收到服務(wù)的返回結(jié)果,程序從阻塞代碼處返回。
2.2 RTT 時間
Redis客戶端和服務(wù)端之間通過網(wǎng)絡(luò)連接進(jìn)行數(shù)據(jù)傳輸,數(shù)據(jù)包從客戶端到達(dá)服務(wù)器,并從服務(wù)器返回數(shù)據(jù)回復(fù)客戶端的時間被稱之為RTT(Round Trip Time - 往返時間)。我們可以很容易就意識到,Redis在連續(xù)請求服務(wù)端時,如果RTT時間為250ms, 即使Redis每秒能處理100k請求,但也會因為網(wǎng)絡(luò)傳輸花費大量時間,導(dǎo)致每秒最多也只能處理4個請求,導(dǎo)致整體性能的下降。
2.3 Redis Pipeline
為了提升效率,這時候Pipeline出現(xiàn)了。Pipelining不僅僅能夠降低RRT,實際上它極大的提升了單次執(zhí)行的操作數(shù)。這是因為如果不使用Pipelining,那么每次執(zhí)行單個命令,從訪問數(shù)據(jù)的結(jié)構(gòu)和服務(wù)端產(chǎn)生應(yīng)答的角度,它的成本是很低的。但是從執(zhí)行網(wǎng)絡(luò)IO的角度,它的成本其實是很高的。其中涉及到read()和write()的系統(tǒng)調(diào)用,這意味著需要從用戶態(tài)切換到內(nèi)核態(tài),而這個上下文的切換成本是巨大的。
當(dāng)使用Pipeline時,它允許多個命令的讀通過一次read()操作,多個命令的應(yīng)答使用一次write()操作,它允許客戶端可以一次發(fā)送多條命令,而不等待上一條命令執(zhí)行的結(jié)果。不僅減少了RTT,同時也減少了IO調(diào)用次數(shù)(IO調(diào)用涉及到用戶態(tài)到內(nèi)核態(tài)之間的切換),最終提升程序的執(zhí)行效率與性能。如下圖:
要支持Pipeline,其實既要服務(wù)端的支持,也要客戶端支持。對于服務(wù)端來說,所需要的是能夠處理一個客戶端通過同一個TCP連接發(fā)來的多個命令,可以理解為,這里將多個命令切分,和處理單個命令一樣,Redis就是這樣處理的。而客戶端,則是要將多個命令緩存起來,緩沖區(qū)滿了就發(fā)送,然后再寫緩沖,最后才處理Redis的應(yīng)答。
三、Pipeline 基本使用及性能比較
下面我們以給10w個set結(jié)構(gòu)分別插入一個整數(shù)值為例,分別使用jedis單個命令插入、jedis使用Pipeline模式進(jìn)行插入和redisson使用Pipeline模式進(jìn)行插入以及測試其耗時。
@Slf4j
public class RedisPipelineTestDemo {public static void main(String[] args) {//連接redisJedis jedis = new Jedis("10.101.17.180", 6379);//jedis逐一給每個set新增一個valueString zSetKey = "Pipeline-test-set";int size = 100000;long begin = System.currentTimeMillis();for (int i = 0; i < size; i++) {jedis.sadd(zSetKey + i, "aaa");}log.info("Jedis逐一給每個set新增一個value耗時:{}ms", (System.currentTimeMillis() - begin));//Jedis使用Pipeline模式 Pipeline Pipeline = jedis.Pipelined();begin = System.currentTimeMillis();for (int i = 0; i < size; i++) { Pipeline.sadd(zSetKey + i, "bbb");} Pipeline.sync();log.info("Jedis Pipeline模式耗時:{}ms", (System.currentTimeMillis() - begin));//Redisson使用Pipeline模式Config config = new Config();config.useSingleServer().setAddress("redis://10.101.17.180:6379");RedissonClient redisson = Redisson.create(config);RBatch redisBatch = redisson.createBatch();begin = System.currentTimeMillis();for (int i = 0; i < size; i++) {redisBatch.getSet(zSetKey + i).addAsync("ccc");}redisBatch.execute();log.info("Redisson Pipeline模式耗時:{}ms", (System.currentTimeMillis() - begin));//關(guān)閉 Pipeline.close();jedis.close();redisson.shutdown();}
}
測試結(jié)果如下:
Jedis逐一給每個set新增一個value耗時:162655ms
Jedis Pipeline模式耗時:504ms
Redisson Pipeline模式耗時:1399ms
我們發(fā)現(xiàn)使用Pipeline模式對應(yīng)的性能會明顯好于單個命令執(zhí)行的情況。
四、項目中實際應(yīng)用
在實際使用過程中有這樣一個場景,很多應(yīng)用在節(jié)假日的時候需要更新應(yīng)用圖標(biāo)樣式,在運營進(jìn)行后臺配置的時候, 可以根據(jù)圈選的用戶標(biāo)簽預(yù)先計算出單個用戶需要下發(fā)的圖標(biāo)樣式并存儲在Redis里面,從而提升性能,這里就涉及Redis的批量操作問題,業(yè)務(wù)流程如下:
為了提升Redis操作性能,我們決定使用Redis Pipelining機制進(jìn)行批量執(zhí)行。
4.1 Redis 客戶端對比
針對Java技術(shù)棧而言,目前Redis使用較多的客戶端為Jedis、Lettuce和Redisson。
目前項目主要是基于SpringBoot開發(fā),針對Redis,其默認(rèn)的客戶端為Lettuce,所以我們基于Lettuce客戶端進(jìn)行分析。
4.2 Spring環(huán)境下Lettuce客戶端對Pipeline的實現(xiàn)
在Spring環(huán)境下,使用Redis的Pipeline也是很簡單的。spring-data-redis提供了StringRedisTemplate簡化了對Redis的操作, ?只需要調(diào)用StringRedisTemplate的executePipelined方法就可以了,但是在參數(shù)中提供了兩種回調(diào)方式:SessionCallback和RedisCallback。
兩種使用方式如下(這里以操作set結(jié)構(gòu)為例):
-
RedisCallback的使用方式:
public void testRedisCallback() {List<Integer> ids= Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);Integer contentId = 1;redisTemplate.executePipelined(new InsertPipelineExecutionA(ids, contentId));}@AllArgsConstructorprivate static class InsertPipelineExecutionA implements RedisCallback<Void> {private final List<Integer> ids;private final Integer contentId;@Overridepublic Void doInRedis(RedisConnection connection) DataAccessException {RedisSetCommands redisSetCommands = connection.setCommands();ids.forEach(id-> {String redisKey = "aaa:" + id;String value = String.valueOf(contentId);redisSetCommands.sAdd(redisKey.getBytes(), value.getBytes());});return null;}}
- SessionCallback的使用方式:
public void testSessionCallback() {List<Integer> ids= Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);Integer contentId = 1;redisTemplate.executePipelined(new InsertPipelineExecutionB(ids, contentId));}@AllArgsConstructorprivate static class InsertPipelineExecutionB implements SessionCallback<Void> {private final List<Integer> ids;private final Integer contentId;@Overridepublic <K, V> Void execute(RedisOperations<K, V> operations) throws DataAccessException {SetOperations<String, String> setOperations = (SetOperations<String, String>) operations.opsForSet();ids.forEach(id-> {String redisKey = "aaa:" + id;String value = String.valueOf(contentId);setOperations.add(redisKey, value);});return null;}}
4.3 RedisCallBack和SessionCallback之間的比較
1、RedisCallBack和SessionCallback都可以實現(xiàn)回調(diào),通過它們可以在同一條連接中一次執(zhí)行多個redis命令。
2、RedisCallback使用的是原生RedisConnection,用起來比較麻煩,比如上面執(zhí)行set的add操作,key和value需要進(jìn)行轉(zhuǎn)換,可讀性差,但原生api提供的功能比較齊全。
3、SessionCalback提供了良好的封裝,可以優(yōu)先選擇使用這種回調(diào)方式。
最終的代碼實現(xiàn)如下:
public void executeB(List<Integer> userIds, Integer iconId) {redisTemplate.executePipelined(new InsertPipelineExecution(userIds, iconId));
}@AllArgsConstructor
private static class InsertPipelineExecution implements SessionCallback<Void> {private final List<Integer> userIds;private final Integer iconId;@Overridepublic <K, V> Void execute(RedisOperations<K, V> operations) throws DataAccessException {SetOperations<String, String> setOperations = (SetOperations<String, String>) operations.opsForSet();userIds.forEach(userId -> {String redisKey = "aaa:" + userId;String value = String.valueOf(iconId);setOperations.add(redisKey, value);});return null;}
}
4.4 源碼分析
那么為什么使用Pipeline方式會對性能有較大提升呢,我們現(xiàn)在從源碼入手著重分析一下:
4.4.1 Pipeline方式下獲取連接相關(guān)原理分析:
@Overridepublic List<Object> executePipelined(SessionCallback<?> session, @Nullable RedisSerializer<?> resultSerializer) {Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");Assert.notNull(session, "Callback object must not be null");//1. 獲取對應(yīng)的Redis連接工廠RedisConnectionFactory factory = getRequiredConnectionFactory();//2. 綁定連接過程RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);try {//3. 執(zhí)行命令流程, 這里請求參數(shù)為RedisCallback, 里面有對應(yīng)的回調(diào)操作return execute((RedisCallback<List<Object>>) connection -> {//具體的回調(diào)邏輯connection.openPipeline();boolean PipelinedClosed = false;try {//執(zhí)行命令Object result = executeSession(session);if (result != null) {throw new InvalidDataAccessApiUsageException("Callback cannot return a non-null value as it gets overwritten by the Pipeline");}List<Object> closePipeline = connection.closePipeline(); PipelinedClosed = true;return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);} finally {if (!PipelinedClosed) {connection.closePipeline();}}});} finally {RedisConnectionUtils.unbindConnection(factory);}}
① 獲取對應(yīng)的Redis連接工廠,這里要使用Pipeline特性需要使用LettuceConnectionFactory方式,這里獲取的連接工廠就是LettuceConnectionFactory。
② 綁定連接過程,具體指的是將當(dāng)前連接綁定到當(dāng)前線程上面, 核心方法為:doGetConnection。
public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind,boolean enableTransactionSupport) {Assert.notNull(factory, "No RedisConnectionFactory specified");//核心類,有緩存作用,下次可以從這里獲取已經(jīng)存在的連接RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);//如果connHolder不為null, 則獲取已經(jīng)存在的連接, 提升性能if (connHolder != null) {if (enableTransactionSupport) {potentiallyRegisterTransactionSynchronisation(connHolder, factory);}return connHolder.getConnection();}......//第一次獲取連接,需要從Redis連接工廠獲取連接RedisConnection conn = factory.getConnection();//bind = true 執(zhí)行綁定if (bind) {RedisConnection connectionToBind = conn;......connHolder = new RedisConnectionHolder(connectionToBind);//綁定核心代碼: 將獲取的連接和當(dāng)前線程綁定起來TransactionSynchronizationManager.bindResource(factory, connHolder);......return connHolder.getConnection();}return conn;}
里面有個核心類RedisConnectionHolder,我們看一下RedisConnectionHolder connHolder =?
(RedisConnectionHolder)?TransactionSynchronizationManager.getResource(factory);
@Nullablepublic static Object getResource(Object key) {Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);Object value = doGetResource(actualKey);if (value != null && logger.isTraceEnabled()) {logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +Thread.currentThread().getName() + "]");}return value;}
里面有一個核心方法doGetResource(actualKey),大家很容易猜測這里涉及到一個map結(jié)構(gòu),如果我們看源碼,也確實是這樣一個結(jié)構(gòu)。
@Nullableprivate static Object doGetResource(Object actualKey) {Map<Object, Object> map = resources.get();if (map == null) {return null;}Object value = map.get(actualKey);// Transparently remove ResourceHolder that was marked as void...if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {map.remove(actualKey);// Remove entire ThreadLocal if empty...if (map.isEmpty()) {resources.remove();}value = null;}return value;}
resources是一個ThreadLocal類型,這里會涉及到根據(jù)RedisConnectionFactory獲取到連接connection的邏輯,如果下一次是同一個actualKey,那么就直接使用已經(jīng)存在的連接,而不需要新建一個連接。第一次這里map為null,就直接返回了,然后回到doGetConnection方法,由于這里bind為true,我們會執(zhí)行TransactionSynchronizationManager.bindResource(factory, connHolder);,也就是將連接和當(dāng)前線程綁定了起來。
public static void bindResource(Object key, Object value) throws IllegalStateException {Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);Assert.notNull(value, "Value must not be null");Map<Object, Object> map = resources.get();// set ThreadLocal Map if none foundif (map == null) {map = new HashMap<>();resources.set(map);}Object oldValue = map.put(actualKey, value);......}
③?我們回到executePipelined,在獲取到連接工廠,將連接和當(dāng)前線程綁定起來以后,就開始需要正式去執(zhí)行命令了, 這里會調(diào)用execute方法
@Override
@Nullable
public <T> T execute(RedisCallback<T> action) {return execute(action, isExposeConnection());
}
這里我們注意到execute方法的入?yún)镽edisCallback<T>action,RedisCallback對應(yīng)的doInRedis操作如下,這里在后面的調(diào)用過程中會涉及到回調(diào)。
connection.openPipeline();
boolean PipelinedClosed = false;
try {Object result = executeSession(session);if (result != null) {throw new InvalidDataAccessApiUsageException("Callback cannot return a non-null value as it gets overwritten by the Pipeline");}List<Object> closePipeline = connection.closePipeline(); PipelinedClosed = true;return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);
} finally {if (!PipelinedClosed) {connection.closePipeline();}
}
我們再來看execute(action,?isExposeConnection())方法,這里最終會調(diào)用<T>execute(RedisCallback<T>action, boolean exposeConnection, boolean Pipeline)方法。
@Nullable
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean Pipeline) {Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");Assert.notNull(action, "Callback object must not be null");//獲取對應(yīng)的連接工廠RedisConnectionFactory factory = getRequiredConnectionFactory();RedisConnection conn = null;try {if (enableTransactionSupport) {// only bind resources in case of potential transaction synchronizationconn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);} else {//獲取對應(yīng)的連接(enableTransactionSupport=false) conn = RedisConnectionUtils.getConnection(factory);}boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);RedisConnection connToUse = preProcessConnection(conn, existingConnection);boolean PipelineStatus = connToUse.isPipelined();if (Pipeline && !PipelineStatus) {connToUse.openPipeline();}RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));//核心方法,這里就開始執(zhí)行回調(diào)操作T result = action.doInRedis(connToExpose);// close Pipelineif (Pipeline && !PipelineStatus) {connToUse.closePipeline();}// TODO: any other connection processing?return postProcessResult(result, connToUse, existingConnection);} finally {RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport);}
}
我們看到這里最開始也是獲取對應(yīng)的連接工廠,然后獲取對應(yīng)的連接(enableTransactionSupport=false),具體調(diào)用是RedisConnectionUtils.getConnection(factory)方法,最終會調(diào)用RedisConnection doGetConnection(RedisConnectionFactory factory, booleanallowCreate, boolean bind, boolean enableTransactionSupport),此時bind為false
public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind,boolean enableTransactionSupport) {Assert.notNull(factory, "No RedisConnectionFactory specified");//直接獲取與當(dāng)前線程綁定的Redis連接RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);if (connHolder != null) {if (enableTransactionSupport) {potentiallyRegisterTransactionSynchronisation(connHolder, factory);}return connHolder.getConnection();}......return conn;
}
前面我們分析過一次,這里調(diào)用RedisConnectionHolder connHolder =?(RedisConnectionHolder)TransactionSynchronizationManager.getResource(factory);會獲取到之前和當(dāng)前線程綁定的Redis,而不會新創(chuàng)建一個連接。
然后會去執(zhí)行T result = action.doInRedis(connToExpose),這里的action為RedisCallback,執(zhí)行doInRedis為:
//開啟Pipeline功能
connection.openPipeline();
boolean PipelinedClosed = false;
try {//執(zhí)行Redis命令Object result = executeSession(session);if (result != null) {throw new InvalidDataAccessApiUsageException("Callback cannot return a non-null value as it gets overwritten by the Pipeline");}List<Object> closePipeline = connection.closePipeline(); PipelinedClosed = true;return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);
} finally {if (!PipelinedClosed) {connection.closePipeline();}
}
這里最開始會開啟Pipeline功能,然后執(zhí)行Object result = executeSession(session);
private Object executeSession(SessionCallback<?> session) {return session.execute(this);
}
這里會調(diào)用我們自定義的execute方法
@AllArgsConstructor
private static class InsertPipelineExecution implements SessionCallback<Void> {private final List<Integer> userIds;private final Integer iconId;@Overridepublic <K, V> Void execute(RedisOperations<K, V> operations) throws DataAccessException {SetOperations<String, String> setOperations = (SetOperations<String, String>) operations.opsForSet();userIds.forEach(userId -> {String redisKey = "aaa:" + userId;String value = String.valueOf(iconId);setOperations.add(redisKey, value);});return null;}
}
進(jìn)入到foreach循環(huán),執(zhí)行DefaultSetOperations的add方法。
@Override
public Long add(K key, V... values) {byte[] rawKey = rawKey(key);byte[][] rawValues = rawValues((Object[]) values);//這里的connection.sAdd是后續(xù)回調(diào)要執(zhí)行的方法return execute(connection -> connection.sAdd(rawKey, rawValues), true);
}
這里會繼續(xù)執(zhí)行redisTemplate的execute方法,里面最終會調(diào)用我們之前分析過的
<T>T execute(RedisCallback<T>action, boolean exposeConnection, boolean Pipeline)方法。
@Nullable
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean Pipeline) {Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");Assert.notNull(action, "Callback object must not be null");RedisConnectionFactory factory = getRequiredConnectionFactory();RedisConnection conn = null;try {......//再次執(zhí)行回調(diào)方法,這里執(zhí)行的Redis基本數(shù)據(jù)結(jié)構(gòu)對應(yīng)的操作命令T result = action.doInRedis(connToExpose);......// TODO: any other connection processing?return postProcessResult(result, connToUse, existingConnection);} finally {RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport);}
}
這里會繼續(xù)執(zhí)行T result =?action.doInRedis(connToExpose);,這里其實執(zhí)行的doInRedis方法為:
connection -> connection.sAdd(rawKey, rawValues)
4.4.2 Pipeline方式下執(zhí)行命令的流程分析:
① 接著上面的流程分析,這里的sAdd方法實際調(diào)用的是DefaultStringRedisConnection的sAdd方法
@Override
public Long sAdd(byte[] key, byte[]... values) {return convertAndReturn(delegate.sAdd(key, values), identityConverter);
}
②?這里會進(jìn)一步調(diào)用DefaultedRedisConnection的sAdd方法
@Override
@Deprecated
default Long sAdd(byte[] key, byte[]... values) {return setCommands().sAdd(key, values);
}
③?接著調(diào)用LettuceSetCommands的sAdd方法
@Override
public Long sAdd(byte[] key, byte[]... values) {Assert.notNull(key, "Key must not be null!");Assert.notNull(values, "Values must not be null!");Assert.noNullElements(values, "Values must not contain null elements!");try {// 如果開啟了 Pipelined 模式,獲取的是 異步連接,進(jìn)行異步操作if (isPipelined()) { Pipeline(connection.newLettuceResult(getAsyncConnection().sadd(key, values)));return null;}if (isQueueing()) {transaction(connection.newLettuceResult(getAsyncConnection().sadd(key, values)));return null;}//常規(guī)模式下,使用的是同步操作return getConnection().sadd(key, values);} catch (Exception ex) {throw convertLettuceAccessException(ex);}
}
這里我們開啟了Pipeline, 實際會調(diào)用Pipeline(connection.newLettuceResult(getAsyncConnection().sadd(key, values))); 也就是獲取異步連接getAsyncConnection,然后進(jìn)行異步操作sadd,而常規(guī)模式下,使用的是同步操作,所以在Pipeline模式下,執(zhí)行效率更高。
從上面的獲取連接和具體命令執(zhí)行相關(guān)源碼分析可以得出使用Lettuce客戶端Pipeline模式高效的根本原因:
-
普通模式下,每執(zhí)行一個命令都需要先打開一個連接,命令執(zhí)行完畢以后又需要關(guān)閉這個連接,執(zhí)行下一個命令時,又需要經(jīng)過連接打開和關(guān)閉的流程;而Pipeline的所有命令的執(zhí)行只需要經(jīng)過一次連接打開和關(guān)閉。
-
普通模式下命令的執(zhí)行是同步阻塞模式,而Pipeline模式下命令的執(zhí)行是異步非阻塞模式。
五、項目中遇到的坑
前面介紹了涉及到批量操作,可以使用Redis Pipelining機制,那是不是任何批量操作相關(guān)的場景都可以使用呢,比如list類型數(shù)據(jù)的批量移除操作,我們的代碼最開始是這么寫的:
public void deleteSet(String updateKey, Set<Integer> userIds) {if (CollectionUtils.isEmpty(userIds)) {return;}redisTemplate.executePipelined(new DeleteListCallBack(userIds, updateKey));}@AllArgsConstructor
private static class DeleteListCallBack implements SessionCallback<Object> {private Set<Integer> userIds;private String updateKey;@Overridepublic <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {ListOperations<String, String> listOperations = (ListOperations<String, String>) operations.opsForList();userIds.forEach(userId -> listOperations.remove(updateKey, 1, userId.toString()));return null;}
}
在數(shù)據(jù)量比較小的時候沒有出現(xiàn)問題,直到有一條收到了Redis的內(nèi)存和cpu利用率的告警消息,我們發(fā)現(xiàn)這么使用是有問題的,核心原因在于list的lrem操作的時間復(fù)雜度是O(N+M),其中N是list的長度, M是要移除的元素的個數(shù),而我們這里還是一個一個移除的,當(dāng)然會導(dǎo)致Redis數(shù)據(jù)積壓和cpu每秒ops升高導(dǎo)致cpu利用率飚高。也就是說,即使使用Pipeline進(jìn)行批量操作,但是由于單次操作很耗時,是會導(dǎo)致整個Redis出現(xiàn)問題的。
后面我們進(jìn)行了優(yōu)化,選用了list的ltrim命令,一次命令執(zhí)行批量remove操作:
public void deleteSet(String updateKey, Set<Integer> deviceIds) {if (CollectionUtils.isEmpty(deviceIds)) {return;}int maxSize = 10000;redisTemplate.opsForList().trim(updateKey, maxSize + 1, -1);}
由于ltrim本身的時間復(fù)雜度為O(M), 其中M要移除的元素的個數(shù),相比于原始方案的lrem,效率提升很多,可以不需要使用Redis Pipeline,優(yōu)化結(jié)果使得Redis內(nèi)存利用率和cpu利用率都極大程度得到緩解。
六、Redisson 對 Redis Pipeline 特性支持
在redisson官方文檔中額外特性介紹中有說到批量命令執(zhí)行這個特性, 也就是多個命令在一次網(wǎng)絡(luò)調(diào)用中集中發(fā)送,該特性是RBatch這個類支持的,從這個類的描述來看,主要是為Redis Pipeline這個特性服務(wù)的,并且主要是通過隊列和異步實現(xiàn)的。
/*** Interface for using Redis Pipeline feature.* <p>* All method invocations on objects got through this interface* are batched to separate queue and could be executed later* with <code>execute()</code> or <code>executeAsync()</code> methods.*** @author Nikita Koksharov**/
public interface RBatch {/*** Returns stream instance by <code>name</code>** @param <K> type of key* @param <V> type of value* @param name of stream* @return RStream object*/<K, V> RStreamAsync<K, V> getStream(String name);/*** Returns stream instance by <code>name</code>* using provided <code>codec</code> for entries.** @param <K> type of key* @param <V> type of value* @param name - name of stream* @param codec - codec for entry* @return RStream object*/<K, V> RStreamAsync<K, V> getStream(String name, Codec codec);....../*** Returns list instance by name.** @param <V> type of object* @param name - name of object* @return List object*/<V> RListAsync<V> getList(String name);<V> RListAsync<V> getList(String name, Codec codec);....../*** Executes all operations accumulated during async methods invocations.* <p>* If cluster configuration used then operations are grouped by slot ids* and may be executed on different servers. Thus command execution order could be changed** @return List with result object for each command* @throws RedisException in case of any error**/BatchResult<?> execute() throws RedisException;/*** Executes all operations accumulated during async methods invocations asynchronously.* <p>* In cluster configurations operations grouped by slot ids* so may be executed on different servers. Thus command execution order could be changed** @return List with result object for each command*/RFuture<BatchResult<?>> executeAsync();/*** Discard batched commands and release allocated buffers used for parameters encoding.*/void discard();/*** Discard batched commands and release allocated buffers used for parameters encoding.** @return void*/RFuture<Void> discardAsync();}
簡單的測試代碼如下:
@Slf4j
public class RedisPipelineTest {public static void main(String[] args) {//Redisson使用Pipeline模式Config config = new Config();config.useSingleServer().setAddress("redis://xx.xx.xx.xx:6379");RedissonClient redisson = Redisson.create(config);RBatch redisBatch = redisson.createBatch();int size = 100000;String zSetKey = "Pipeline-test-set";long begin = System.currentTimeMillis();//將命令放入隊列中for (int i = 0; i < size; i++) {redisBatch.getSet(zSetKey + i).addAsync("ccc");}//批量執(zhí)行命令redisBatch.execute();log.info("Redisson Pipeline模式耗時:{}ms", (System.currentTimeMillis() - begin));//關(guān)閉redisson.shutdown();}
}
核心方法分析:
1.建Redisson客戶端RedissonClient redisson = redisson.create(config), 該方法最終會調(diào)用Reddison的構(gòu)造方法Redisson(Config config)。
protected Redisson(Config config) {this.config = config;Config configCopy = new Config(config);connectionManager = ConfigSupport.createConnectionManager(configCopy);RedissonObjectBuilder objectBuilder = null;if (config.isReferenceEnabled()) {objectBuilder = new RedissonObjectBuilder(this);}//新建異步命令執(zhí)行器commandExecutor = new CommandSyncService(connectionManager, objectBuilder);//執(zhí)行刪除超時任務(wù)的定時器evictionScheduler = new EvictionScheduler(commandExecutor);writeBehindService = new WriteBehindService(commandExecutor);
}
該構(gòu)造方法中會新建異步命名執(zhí)行器CommandAsyncExecutor commandExecutor和用戶刪除超時任務(wù)的EvictionScheduler evictionScheduler。
2.創(chuàng)建RBatch實例RBatch redisBatch = redisson.createBatch(), 該方法會使用到步驟1中的commandExecutor和evictionScheduler實例對象。
@Override
public RBatch createBatch(BatchOptions options) {return new RedissonBatch(evictionScheduler, commandExecutor, options);
}public RedissonBatch(EvictionScheduler evictionScheduler, CommandAsyncExecutor executor, BatchOptions options) {this.executorService = new CommandBatchService(executor, options);this.evictionScheduler = evictionScheduler;
}
其中的options對象會影響后面批量執(zhí)行命令的流程。
3. 異步給set集合添加元素的操作addAsync,這里會具體調(diào)用RedissonSet的addAsync方法
@Override
public RFuture<Boolean> addAsync(V e) {String name = getRawName(e);return commandExecutor.writeAsync(name, codec, RedisCommands.SADD_SINGLE, name, encode(e));
}
(1)接著調(diào)用CommandAsyncExecutor的異步寫入方法writeAsync。
@Override
public <T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {RPromise<R> mainPromise = createPromise();NodeSource source = getNodeSource(key);async(false, source, codec, command, params, mainPromise, false);return mainPromise;
}
(2)?接著調(diào)用批量命令執(zhí)行器CommandBatchService的異步發(fā)送命令。
@Override
public <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, boolean ignoreRedirect) {if (isRedisBasedQueue()) {boolean isReadOnly = options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC;RedisExecutor<V, R> executor = new RedisQueuedBatchExecutor<>(isReadOnly, nodeSource, codec, command, params, mainPromise,false, connectionManager, objectBuilder, commands, connections, options, index, executed, latch, referenceType);executor.execute();} else {//執(zhí)行分支RedisExecutor<V, R> executor = new RedisBatchExecutor<>(readOnlyMode, nodeSource, codec, command, params, mainPromise,false, connectionManager, objectBuilder, commands, options, index, executed, referenceType);executor.execute();}}
(3)?接著調(diào)用了RedisBatchExecutor.execute方法和BaseRedisBatchExecutor.addBatchCommandData方法。
@Override
public void execute() {addBatchCommandData(params);
}protected final void addBatchCommandData(Object[] batchParams) {MasterSlaveEntry msEntry = getEntry(source);Entry entry = commands.get(msEntry);if (entry == null) {entry = new Entry();Entry oldEntry = commands.putIfAbsent(msEntry, entry);if (oldEntry != null) {entry = oldEntry;}}if (!readOnlyMode) {entry.setReadOnlyMode(false);}Codec codecToUse = getCodec(codec);BatchCommandData<V, R> commandData = new BatchCommandData<V, R>(mainPromise, codecToUse, command, batchParams, index.incrementAndGet());entry.getCommands().add(commandData);
}
這里的commands以主節(jié)點為KEY,以待發(fā)送命令隊列列表為VALUE(Entry),保存一個MAP.然后會把命令都添加到entry的commands命令隊列中, Entry結(jié)構(gòu)如下面代碼所示。
public static class Entry {Deque<BatchCommandData<?, ?>> commands = new LinkedBlockingDeque<>();volatile boolean readOnlyMode = true;public Deque<BatchCommandData<?, ?>> getCommands() {return commands;}public void setReadOnlyMode(boolean readOnlyMode) {this.readOnlyMode = readOnlyMode;}public boolean isReadOnlyMode() {return readOnlyMode;}public void clearErrors() {for (BatchCommandData<?, ?> commandEntry : commands) {commandEntry.clearError();}}}
4. 批量執(zhí)行命令redisBatch.execute(),這里會最終調(diào)用CommandBatchService的executeAsync方法,該方法完整代碼如下,我們下面來逐一進(jìn)行拆解。
public RFuture<BatchResult<?>> executeAsync() {......RPromise<BatchResult<?>> promise = new RedissonPromise<>();RPromise<Void> voidPromise = new RedissonPromise<Void>();if (this.options.isSkipResult()&& this.options.getSyncSlaves() == 0) {......} else {//這里是對異步執(zhí)行結(jié)果進(jìn)行處理,可以先忽略, 后面會詳細(xì)講,先關(guān)注批量執(zhí)行命令的邏輯voidPromise.onComplete((res, ex) -> {......});}AtomicInteger slots = new AtomicInteger(commands.size());......//真正執(zhí)行的代碼入口,批量執(zhí)行命令for (Map.Entry<MasterSlaveEntry, Entry> e : commands.entrySet()) {RedisCommonBatchExecutor executor = new RedisCommonBatchExecutor(new NodeSource(e.getKey()), voidPromise,connectionManager, this.options, e.getValue(), slots, referenceType);executor.execute();}return promise;}
里面會用到我們在3.3步驟所生成的commands實例。
(1)接著調(diào)用了基類RedisExecutor的execute方法
public void execute() {......connectionFuture.onComplete((connection, e) -> {if (connectionFuture.isCancelled()) {connectionManager.getShutdownLatch().release();return;}if (!connectionFuture.isSuccess()) {connectionManager.getShutdownLatch().release();exception = convertException(connectionFuture);return;}//調(diào)用RedisCommonBatchExecutor的sendCommand方法, 里面會將多個命令放到一個List<CommandData<?, ?>> list列表里面sendCommand(attemptPromise, connection);writeFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {checkWriteFuture(writeFuture, attemptPromise, connection);}});});......}
(2)接著調(diào)用RedisCommonBatchExecutor的sendCommand方法,里面會將多個命令放到一個List<commanddata> list列表里面。
@Overrideprotected void sendCommand(RPromise<Void> attemptPromise, RedisConnection connection) {boolean isAtomic = options.getExecutionMode() != ExecutionMode.IN_MEMORY;boolean isQueued = options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC|| options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC;//將多個命令放到一個List<CommandData<?, ?>> list列表里面List<CommandData<?, ?>> list = new ArrayList<>(entry.getCommands().size());if (source.getRedirect() == Redirect.ASK) {RPromise<Void> promise = new RedissonPromise<Void>();list.add(new CommandData<Void, Void>(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {}));}for (CommandData<?, ?> c : entry.getCommands()) {if ((c.getPromise().isCancelled() || c.getPromise().isSuccess())&& !isWaitCommand(c)&& !isAtomic) {// skip commandcontinue;}list.add(c);}......//調(diào)用RedisConnection的send方法,將命令一次性發(fā)到Redis服務(wù)器端writeFuture = connection.send(new CommandsData(attemptPromise, list, options.isSkipResult(), isAtomic, isQueued, options.getSyncSlaves() > 0));}
(3)接著調(diào)用RedisConnection的send方法,通過Netty通信發(fā)送命令到Redis服務(wù)器端執(zhí)行,這里也驗證了Redisson客戶端底層是采用Netty進(jìn)行通信的。
public ChannelFuture send(CommandsData data) {return channel.writeAndFlush(data);
}
5. 接收返回結(jié)果,這里主要是監(jiān)聽事件是否完成,然后組裝返回結(jié)果, 核心方法是步驟4提到的CommandBatchService的executeAsync方法,里面會對返回結(jié)果進(jìn)行監(jiān)聽和處理, 核心代碼如下:
public RFuture<BatchResult<?>> executeAsync() {......RPromise<BatchResult<?>> promise = new RedissonPromise<>();RPromise<Void> voidPromise = new RedissonPromise<Void>();if (this.options.isSkipResult()&& this.options.getSyncSlaves() == 0) {......} else {voidPromise.onComplete((res, ex) -> {//對返回結(jié)果的處理executed.set(true);......List<Object> responses = new ArrayList<Object>(entries.size());int syncedSlaves = 0;for (BatchCommandData<?, ?> commandEntry : entries) {if (isWaitCommand(commandEntry)) {syncedSlaves = (Integer) commandEntry.getPromise().getNow();} else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName())&& !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())&& !this.options.isSkipResult()) {......//獲取單個命令的執(zhí)行結(jié)果Object entryResult = commandEntry.getPromise().getNow();......//將單個命令執(zhí)行結(jié)果放到List中responses.add(entryResult);}}BatchResult<Object> result = new BatchResult<Object>(responses, syncedSlaves);promise.trySuccess(result);......});}......return promise;
}
這里會把單個命令的執(zhí)行結(jié)果放到responses里面,最終返回RPromise<batchresult>promise。
從上面的分析來看,Redisson客戶端對Redis Pipeline的支持也是從多個命令在一次網(wǎng)絡(luò)通信中執(zhí)行和異步處理來實現(xiàn)的。
七、總結(jié)
Redis提供了Pipelining進(jìn)行批量操作的高級特性,極大地提高了部分?jǐn)?shù)據(jù)類型沒有批量執(zhí)行命令導(dǎo)致的執(zhí)行耗時而引起的性能問題,但是我們在使用的過程中需要考慮Pipeline操作中單個命令執(zhí)行的耗時問題,否則帶來的效果可能適得其反。最后擴展分析了Redisson客戶端對Redis Pipeline特性的支持原理,可以與Lettuce客戶端對Redis Pipeline支持原理進(jìn)行比較,加深Pipeline在不同Redis客戶端實現(xiàn)方式的理解。
參考資料:
-
Redis Pipelining??
-
RedisTemplate使用Pipeline管道命令?
-
如何使用好Redis Pipeline
-
Redisson 管道批量發(fā)送命令流程分析?