c 做網站教程百度seo教程視頻
目錄
為什么有l(wèi)istpack?
listpack結構
listpack的節(jié)點entry
長度length
encoding編碼方式
listpack的API
1.創(chuàng)建listpack?
2.遍歷操作
正向遍歷
反向遍歷
3.查找元素
4.插入/替換/刪除元素
總結
為什么有l(wèi)istpack?
ziplist是存儲在連續(xù)內存空間,節(jié)省內存空間。quicklist是個節(jié)點為ziplist的雙向鏈表,其通過控制 quicklistNode 結構里的壓縮列表的大小或者元素個數,來減少連鎖更新帶來的性能影響,但這并沒有完全解決連鎖更新的問題。這是因為壓縮列表連鎖更新的問題來源于它的結構設計。
從Redis 5.0版本開始,設計了一個新的數據結構叫做listpack ,目的是替代原來的壓縮列表。在每個listpack節(jié)點中,不再保存前一個節(jié)點的長度,所以也就不存在出現連鎖更新的情況了。
Redis7.0?才將 listpack 完整替代 ziplist。這里講解的代碼版本是7.0.4。
listpack結構
?該結構沒有使用結構體來表示,通過lpNew函數創(chuàng)建listpack可以看出其結構組成。
- num of entry為listpack中的元素個數,即Entry的個數,占用2個字節(jié)。注意:這并不意味著listpack最多只能存放65535個Entry,當Entry個數大于等于65535時,num of entry設置為65535,此時如果需要獲取元素個數,需要遍歷整個listpack(這個和ziplist一樣的)。
- Entry為每個具體的節(jié)點。
- End為listpack結束標志,占用1個字節(jié),內容為0xFF。
listpack的節(jié)點entry
entry就是listpack的節(jié)點,entry的data就是存儲節(jié)點的元素值。
而為了避免ziplist引起的連鎖更新問題,listpack的entry不再像ziplist中保存前一個entry的長度,它只包含三個內容:當前元素的編碼類型(encoding)、元素數據(data)、編碼類型和元數據這兩部分的長度(length)。
其中 encoding和?length一定有值;有時 data 可能會缺失,因為對于一些小的元素可以直接將data放在encoding字段中。
長度length
- entry的length字段記錄了該entry的長度(encoding+data),注意:并不包括?length字段自身的長度,其占用的字節(jié)數小于等于5。
- length所占用的每個字節(jié)的第一個bit(最高位)用于標識:0代表結束,1代表尚未結束,每個字節(jié)只有7bit有效。
- length主要用于從后向前遍歷。當需要找到當前元素的前一個元素時,我們可以從后往前依次查找每個字節(jié),找到上一個entry的length字段的結束標識,從而可以計算出上個元素的長度。
關于length的函數有l(wèi)pDecodeBacklen和lpEncodeBacklen。
編碼長度的函數lpEncodeBacklen
length字段的 每個字節(jié)的低 7 位,記錄了實際的長度信息。這里你需要注意的是,length字段 每個字節(jié)的低 7 位采用了大端模式存儲,也就是說,length字段 的低位字節(jié)保存在內存高地址上。
//返回存儲l所需的字節(jié)數,并把l存儲在buf中
static inline unsigned long lpEncodeBacklen(unsigned char *buf, uint64_t l) {if (l <= 127) {if (buf) buf[0] = l;return 1;} else if (l < 16383) {if (buf) {buf[0] = l>>7;buf[1] = (l&127)|128;}return 2;} else if (l < 2097151) {if (buf) {buf[0] = l>>14;buf[1] = ((l>>7)&127)|128;buf[2] = (l&127)|128;}return 3;} else if (l < 268435455) {if (buf) {buf[0] = l>>21;buf[1] = ((l>>14)&127)|128;buf[2] = ((l>>7)&127)|128;buf[3] = (l&127)|128;}return 4;} else {if (buf) {buf[0] = l>>28;buf[1] = ((l>>21)&127)|128;buf[2] = ((l>>14)&127)|128;buf[3] = ((l>>7)&127)|128;buf[4] = (l&127)|128;}return 5;}
}
解碼長度的函數lpDecodeBacklen
其結果返回節(jié)點的字段length值。這個時候p是一直往左移動,等到完全獲取了length值,p也移動到了length字段的首地址。之后再通過length值就可以移動到上一節(jié)點的尾地址。
/* Decode the backlen and returns it. If the encoding looks invalid (more than* 5 bytes are used), UINT64_MAX is returned to report the problem. */
static inline uint64_t lpDecodeBacklen(unsigned char *p) {uint64_t val = 0;uint64_t shift = 0;//127的二進制是01111 1111, 128的二進制是1000 0000do {val |= (uint64_t)(p[0] & 127) << shift;if (!(p[0] & 128)) break; //表示該字節(jié)的最高位是0,結束標識shift += 7;p--;if (shift > 28) return UINT64_MAX;} while(1);return val;
}
encoding編碼方式
?為了節(jié)省內存空間,listpack針對不同的數據做了不同的編碼,其數據內容是整數和字符串。
encoding由兩部分組成:區(qū)分整數和字符串的標識和data長度。
所以通過encoding是可以得出data的長度。?
整數編碼
#define LP_ENCODING_7BIT_UINT 0
#define LP_ENCODING_13BIT_INT 0xC0
#define LP_ENCODING_16BIT_INT 0xF1
#define LP_ENCODING_24BIT_INT 0xF2
#define LP_ENCODING_32BIT_INT 0xF3
#define LP_ENCODING_64BIT_INT 0xF4
?整數編碼對應的占用字節(jié)數和可以存儲的值如下圖。
整數編碼的,通過encoding的首字節(jié)的幾bit或者整個字節(jié),可以知道其儲存的整數值的長度。
?該圖片來自https://segmentfault.com/a/1190000041670843
?字符串編碼
3種字符串編碼,分別根據不同的字符串長度,采取不同的編碼方式。?
#define LP_ENCODING_6BIT_STR 0x80
#define LP_ENCODING_12BIT_STR 0xE0
#define LP_ENCODING_32BIT_STR 0xF0
?通過encoding字段的首字節(jié)就可以獲取字符串的data長度。紫色部分的就是存儲data的長度。
??該圖片來自https://segmentfault.com/a/1190000041670843
?所以,entry的encoding字段可以獲取到entry的data的長度,而length字段可以獲取到(encoding+data)的長度。
listpack有個有結構體listpackEntry。應該是在使用listpack的結構(比如zset)中會創(chuàng)建調用其。目前看該文章可以不用了解listpackEntry。
/* Each entry in the listpack is either a string or an integer. */
typedef struct {/* When string is used, it is provided with the length (slen). */unsigned char *sval;uint32_t slen;/* When integer is used, 'sval' is NULL, and lval holds the value. */long long lval;
} listpackEntry;void lpRandomPairs(unsigned char *lp, unsigned int count, listpackEntry *keys, listpackEntry *vals);
unsigned int lpRandomPairsUnique(unsigned char *lp, unsigned int count, listpackEntry *keys, listpackEntry *vals);//t_zset.c
//代碼片段,關于listpackEntry 的使用listpackEntry *keys, *vals = NULL;keys = zmalloc(sizeof(listpackEntry)*count);if (withscores)vals = zmalloc(sizeof(listpackEntry)*count);
listpack的API
1.創(chuàng)建listpack?
#define LP_HDR_SIZE 6 /* 32 bit total len + 16 bit number of elements. */unsigned char *lpNew(size_t capacity) {unsigned char *lp = lp_malloc(capacity > LP_HDR_SIZE+1 ? capacity : LP_HDR_SIZE+1);if (lp == NULL) return NULL;lpSetTotalBytes(lp,LP_HDR_SIZE+1);lpSetNumElements(lp,0); //設置元素個數lp[LP_HDR_SIZE] = LP_EOF;return lp; //lp[6]即第7個字節(jié),尾部,設置為0XFF
}
?要注意的一點:不管分配的capacity多大,其創(chuàng)建listpack時候,初始化使用的長度就是只用了7字節(jié)而已,剩余的內存空間,是暫時不用。
2.遍歷操作
在listpack?中,因為每個節(jié)點只記錄了自己的長度,而不會像ziplist那樣記錄前一節(jié)點的長度。所以,在對listpack進行新增和修改元素時候,實際上只會涉及到對自己節(jié)點的操作,而不會影響后續(xù)節(jié)點的長度變化,這樣就避免了連鎖更新。
那沒有了prelen,那listpack還支持正向或反向遍歷嗎?這當然支持的。
?提供了正向遍歷和反向遍歷,參數lp是listpack的首地址,p是要要查找的entry。
unsigned char *lpFirst(unsigned char *lp);
unsigned char *lpLast(unsigned char *lp);
unsigned char *lpNext(unsigned char *lp, unsigned char *p);
unsigned char *lpPrev(unsigned char *lp, unsigned char *p);
正向遍歷
- 根據當前節(jié)點的第1個字節(jié)獲取到當前節(jié)點的編碼類型(是整數還是字符串),并根據編碼類型計算當前entry的存儲encoding所用的字節(jié)數和data的長度
- 再通過lpEncodeBacklen函數獲取存儲length所占用的字節(jié)數,這樣就知道整個entry的長度了。
//返回值是下一節(jié)點的首地址,參數p是當前節(jié)點的首地址,
unsigned char *lpNext(unsigned char *lp, unsigned char *p) {assert(p);p = lpSkip(p);if (p[0] == LP_EOF) return NULL;//這個就是判斷p是否還在listpack的內存范圍內,不然就報錯lpAssertValidEntry(lp, lpBytes(lp), p);return p;
}unsigned char *lpSkip(unsigned char *p) {//該函數是返回entry的length字段的值,不是返回存儲length字段所占用的字節(jié)數unsigned long entrylen = lpCurrentEncodedSizeUnsafe(p);entrylen += lpEncodeBacklen(NULL,entrylen); //返回存儲length字段所占用的字節(jié)數,講解length字段時候有講解了p += entrylen; //p+當前entry的整個entry的長度,即到達下一entryreturn p;
}
需要講解下lpCurrentEncodedSizeUnsafe和lpCurrentEncodedSizeBytes,這兩個函數很相似,但需要弄清楚各自的作用。
- lpCurrentEncodedSizeUnsafe是返回entry的length字段的值。
- lpCurrentEncodedSizeBytes是返回存儲entry的encoding字段所需的字節(jié)數。
static inline uint32_t lpCurrentEncodedSizeUnsafe(unsigned char *p) {if (LP_ENCODING_IS_7BIT_UINT(p[0])) return 1;if (LP_ENCODING_IS_6BIT_STR(p[0])) return 1+LP_ENCODING_6BIT_STR_LEN(p);if (LP_ENCODING_IS_13BIT_INT(p[0])) return 2;if (LP_ENCODING_IS_16BIT_INT(p[0])) return 3;if (LP_ENCODING_IS_24BIT_INT(p[0])) return 4;if (LP_ENCODING_IS_32BIT_INT(p[0])) return 5;if (LP_ENCODING_IS_64BIT_INT(p[0])) return 9;if (LP_ENCODING_IS_12BIT_STR(p[0])) return 2+LP_ENCODING_12BIT_STR_LEN(p);if (LP_ENCODING_IS_32BIT_STR(p[0])) return 5+LP_ENCODING_32BIT_STR_LEN(p);if (p[0] == LP_EOF) return 1;return 0;
}static inline uint32_t lpCurrentEncodedSizeBytes(unsigned char *p) {if (LP_ENCODING_IS_7BIT_UINT(p[0])) return 1;if (LP_ENCODING_IS_6BIT_STR(p[0])) return 1;if (LP_ENCODING_IS_13BIT_INT(p[0])) return 1;if (LP_ENCODING_IS_16BIT_INT(p[0])) return 1;if (LP_ENCODING_IS_24BIT_INT(p[0])) return 1;if (LP_ENCODING_IS_32BIT_INT(p[0])) return 1;if (LP_ENCODING_IS_64BIT_INT(p[0])) return 1;if (LP_ENCODING_IS_12BIT_STR(p[0])) return 2;if (LP_ENCODING_IS_32BIT_STR(p[0])) return 5;if (p[0] == LP_EOF) return 1;return 0;
}
反向遍歷
//參數p是當前entry的首地址
unsigned char *lpPrev(unsigned char *lp, unsigned char *p) {assert(p);//若是前面沒有節(jié)點了,就返回NULLif (p-lp == LP_HDR_SIZE) return NULL;p--; /* Seek the first backlen byte of the last element. *///p--是重點,要結合前面講解的lpDecodeBacklen函數的配圖來看uint64_t prevlen = lpDecodeBacklen(p); //獲取當前entry的encoding的占用字節(jié)數和data的長度prevlen += lpEncodeBacklen(NULL,prevlen); //獲取length字段所占用的字節(jié)數//這是prevlen就是當前entry的總長度了,p-prevlen就到達上一節(jié)點的尾地址//這里的-1,即是p+1,是因為前面p--,所以需要+1回去p -= prevlen-1; /* Seek the first byte of the previous entry. */lpAssertValidEntry(lp, lpBytes(lp), p); //進行校驗return p;
}
3.查找元素
其是在一個while循環(huán)中查找。?我們可以先跳過參數skip和函數內變量skipcnt那部分,這樣我們就可以很好理解主體了。
while循環(huán)中,調用lpGetWithSize(p, &ll, NULL, &entry_size)來獲取p位置的元素,之后進行比較,若符合條件就返回結果。(其中有很多校驗的代碼)
//從位置p開始查找元素s,s的長度是slen
unsigned char *lpFind(unsigned char *lp, unsigned char *p, unsigned char *s, uint32_t slen, unsigned int skip) {int skipcnt = 0;unsigned char vencoding = 0;unsigned char *value;int64_t ll, vll;uint64_t entry_size = 123456789; /* initialized to avoid warning. */uint32_t lp_bytes = lpBytes(lp); //獲取listpack的總長度assert(p);while (p) {if (skipcnt == 0) {//該函數是返回p位置的元素值,ll是該元素的長度,entry_size是整個entry的長度value = lpGetWithSize(p, &ll, NULL, &entry_size);if (value) {/* check the value doesn't reach outside the listpack before accessing it */assert(p >= lp + LP_HDR_SIZE && p + entry_size < lp + lp_bytes);if (slen == ll && memcmp(value, s, slen) == 0) {return p;}} else {/* Find out if the searched field can be encoded. Note that* we do it only the first time, once done vencoding is set* to non-zero and vll is set to the integer value. */if (vencoding == 0) {/* If the entry can be encoded as integer we set it to* 1, else set it to UCHAR_MAX, so that we don't retry* again the next time. */if (slen >= 32 || slen == 0 || !lpStringToInt64((const char*)s, slen, &vll)) {vencoding = UCHAR_MAX;} else {vencoding = 1;}}/* Compare current entry with specified entry, do it only* if vencoding != UCHAR_MAX because if there is no encoding* possible for the field it can't be a valid integer. */if (vencoding != UCHAR_MAX && ll == vll) {return p;}}/* Reset skip count */skipcnt = skip;p += entry_size;} else {/* Skip entry */skipcnt--;p = lpSkip(p);}/* The next call to lpGetWithSize could read at most 8 bytes past `p`* We use the slower validation call only when necessary. */if (p + 8 >= lp + lp_bytes)lpAssertValidEntry(lp, lp_bytes, p);elseassert(p >= lp + LP_HDR_SIZE && p < lp + lp_bytes);if (p[0] == LP_EOF) break;}return NULL;
}
lpGetWithSize函數
根據當前節(jié)點的第1個字節(jié)獲取到當前節(jié)點的編碼類型(是整數還是字符串),并根據編碼類型計算當前entry的存儲encoding所用的字節(jié)數和data的長度,若是字符串就直接返回字符串。
//count是p指向的entry的字段data的長度,entry_size是整個entry的長度
static inline unsigned char *lpGetWithSize(unsigned char *p, int64_t *count, unsigned char *intbuf, uint64_t *entry_size) {int64_t val;uint64_t uval, negstart, negmax;//uval用來存儲元素,當元素是整數時候assert(p); /* assertion for valgrind (avoid NPD) */if (LP_ENCODING_IS_7BIT_UINT(p[0])) { //表明是整數negstart = UINT64_MAX; /* 7 bit ints are always positive. */negmax = 0;uval = p[0] & 0x7f;if (entry_size) *entry_size = LP_ENCODING_7BIT_UINT_ENTRY_SIZE;} else if (LP_ENCODING_IS_6BIT_STR(p[0])) { //字符串*count = LP_ENCODING_6BIT_STR_LEN(p);if (entry_size) *entry_size = 1 + *count + lpEncodeBacklen(NULL, *count + 1);return p+1;} else if (LP_ENCODING_IS_13BIT_INT(p[0])) { //表明是整數uval = ((p[0]&0x1f)<<8) | p[1];negstart = (uint64_t)1<<12;negmax = 8191;if (entry_size) *entry_size = LP_ENCODING_13BIT_INT_ENTRY_SIZE;} else if (LP_ENCODING_IS_16BIT_INT(p[0])) { //表明是整數uval = (uint64_t)p[1] |(uint64_t)p[2]<<8;negstart = (uint64_t)1<<15;negmax = UINT16_MAX;if (entry_size) *entry_size = LP_ENCODING_16BIT_INT_ENTRY_SIZE;} else if (LP_ENCODING_IS_24BIT_INT(p[0])) { //表明是整數uval = (uint64_t)p[1] |(uint64_t)p[2]<<8 |(uint64_t)p[3]<<16;negstart = (uint64_t)1<<23;negmax = UINT32_MAX>>8;if (entry_size) *entry_size = LP_ENCODING_24BIT_INT_ENTRY_SIZE;} else if (LP_ENCODING_IS_32BIT_INT(p[0])) { //表明是整數uval = (uint64_t)p[1] |(uint64_t)p[2]<<8 |(uint64_t)p[3]<<16 |(uint64_t)p[4]<<24;negstart = (uint64_t)1<<31;negmax = UINT32_MAX;if (entry_size) *entry_size = LP_ENCODING_32BIT_INT_ENTRY_SIZE;} else if (LP_ENCODING_IS_64BIT_INT(p[0])) { //表明是整數uval = (uint64_t)p[1] |(uint64_t)p[2]<<8 |(uint64_t)p[3]<<16 |(uint64_t)p[4]<<24 |(uint64_t)p[5]<<32 |(uint64_t)p[6]<<40 |(uint64_t)p[7]<<48 |(uint64_t)p[8]<<56;negstart = (uint64_t)1<<63;negmax = UINT64_MAX;if (entry_size) *entry_size = LP_ENCODING_64BIT_INT_ENTRY_SIZE;} else if (LP_ENCODING_IS_12BIT_STR(p[0])) { //字符串*count = LP_ENCODING_12BIT_STR_LEN(p);if (entry_size) *entry_size = 2 + *count + lpEncodeBacklen(NULL, *count + 2);return p+2;} else if (LP_ENCODING_IS_32BIT_STR(p[0])) { //字符串*count = LP_ENCODING_32BIT_STR_LEN(p);if (entry_size) *entry_size = 5 + *count + lpEncodeBacklen(NULL, *count + 5);return p+5;} else {uval = 12345678900000000ULL + p[0];negstart = UINT64_MAX;negmax = 0;}//存儲的是負數的一些操作/* We reach this code path only for integer encodings.* Convert the unsigned value to the signed one using two's complement* rule. */if (uval >= negstart) {/* This three steps conversion should avoid undefined behaviors* in the unsigned -> signed conversion. */uval = negmax-uval;val = uval;val = -val-1;} else {val = uval;}/* Return the string representation of the integer or the value itself* depending on intbuf being NULL or not. */if (intbuf) {*count = ll2string((char*)intbuf,LP_INTBUF_SIZE,(long long)val);return intbuf;} else {*count = val;return NULL;}
}
4.插入/替換/刪除元素
刪除,替換,插入都是該函數lpInsert
unsigned char *lpInsertString(unsigned char *lp, unsigned char *s, uint32_t slen,unsigned char *p, int where, unsigned char **newp)
{return lpInsert(lp, s, NULL, slen, p, where, newp);
}unsigned char *lpInsertInteger(unsigned char *lp, long long lval, unsigned char *p, int where, unsigned char **newp) {uint64_t enclen; /* The length of the encoded element. */unsigned char intenc[LP_MAX_INT_ENCODING_LEN];lpEncodeIntegerGetType(lval, intenc, &enclen);return lpInsert(lp, NULL, intenc, enclen, p, where, newp);
}unsigned char *lpReplace(unsigned char *lp, unsigned char **p, unsigned char *s, uint32_t slen) {return lpInsert(lp, s, NULL, slen, *p, LP_REPLACE, p);
}unsigned char *lpDelete(unsigned char *lp, unsigned char *p, unsigned char **newp) {return lpInsert(lp,NULL,NULL,0,p,LP_REPLACE,newp);
}
主要步驟:
1.先通過where判斷插入的位置是p的前面還是后面,要是后插,就使用lpSkip函數把p移動下一接點,插入的就可以統(tǒng)一是前插;要是刪除就使用零長度元素替換。所以實際上只處理兩個情況:前插和替換。
2.判斷插入的元素是字符串還是整數,若字符串中存儲的是整數,嘗試用整數保存。獲取待插入entry的length字段的值和存儲length占用的字節(jié)數。還有替換的情況。
3.根據前面計算的,算出新listpack的總長度,進行內存分配,之后根據是前插還是替換,使用memmove函數把內存空間挪移到合適的位置
//源碼注釋的翻譯
/*listpack和其他數據結構非常不一樣的地方就在于,無論是增還是刪還是改,都用這同一個函數!
我們要操作的位置在元素p處,操作的對象是一個大小為size的字符串elestr或者整數eleint,
其中元素p的位置可以通過lpFirst(),lpLast(), lpNext(), lpPrev() or lpSeek() 找到。
通過where參數,元素會被插入到p指向的元素前面、后面或者替換該元素。并且如果elestr和elein都為空,
那么函數會刪除p指向的元素。
如果eleint不為空,size就為eleint的長度,函數會在p元素處插入或者替換一個64位的整數,
而如果elestr不為空,size則表示elestr的長度,函數會再p元素處插入或者替換一個字符串。
*/
unsigned char *lpInsert(unsigned char *lp, unsigned char *elestr, unsigned char *eleint,uint32_t size, unsigned char *p, int where, unsigned char **newp)
{unsigned char intenc[LP_MAX_INT_ENCODING_LEN];unsigned char backlen[LP_MAX_BACKLEN_SIZE];//1uint64_t enclen; /* The length of the encoded element. */int delete = (elestr == NULL && eleint == NULL); //如果elestr和eleint都是空,那就是刪除p處的元素if (delete) where = LP_REPLACE; //當刪除時,它在概念上用零長度元素替換元素if (where == LP_AFTER) { //如果是后插入,就讓p移動到下一節(jié)點,這樣就變成前插。所以函數實際上只處理兩個情況:LP_BEFORE和LP_REPLACE。p = lpSkip(p);where = LP_BEFORE;ASSERT_INTEGRITY(lp, p);}unsigned long poff = p-lp; //獲取偏移量//2int enctype;if (elestr) {enctype = lpEncodeGetType(elestr,size,intenc,&enclen); //返回元素的類型(字符串或整數),把encoding字段存儲在intenc中,enclen是length字段的值if (enctype == LP_ENCODING_INT) eleint = intenc;} else if (eleint) {enctype = LP_ENCODING_INT;enclen = size; /* 'size' is the length of the encoded integer element. */} else {enctype = -1;enclen = 0;}unsigned long backlen_size = (!delete) ? lpEncodeBacklen(backlen,enclen) : 0;//返回存儲length字段所占用的字節(jié)數uint64_t old_listpack_bytes = lpGetTotalBytes(lp);uint32_t replaced_len = 0;if (where == LP_REPLACE) { //計算出替換所需的大小 replaced_len = lpCurrentEncodedSizeUnsafe(p); //獲取p位置節(jié)點的(encoding+data)的長度replaced_len += lpEncodeBacklen(NULL,replaced_len); //獲取存儲replaced_len所占用的字節(jié)數ASSERT_INTEGRITY_LEN(lp, p, replaced_len); //校驗用的//若是替換的話,這時,replaced_len就是p位置整個節(jié)點的長度}//3//設置新listpack的總長度uint64_t new_listpack_bytes = old_listpack_bytes + enclen + backlen_size- replaced_len;if (new_listpack_bytes > UINT32_MAX) return NULL;//通過偏移量,找到原來操作元素p的位置unsigned char *dst = lp + poff; /* May be updated after reallocation. *///如果需要分配更多的空間,那就分配這個空間,如果分配失敗直接返回NULL/* Realloc before: we need more room. */if (new_listpack_bytes > old_listpack_bytes &&new_listpack_bytes > lp_malloc_size(lp)) {if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;dst = lp + poff;}/* Setup the listpack relocating the elements to make the exact room* we need to store the new one. */if (where == LP_BEFORE) { //如果是在這個位置之前插入,就調用memmove函數,把內存空間挨個的向后移動要插入的這個元素的空間memmove(dst+enclen+backlen_size,dst,old_listpack_bytes-poff);} else { /* LP_REPLACE. */ /*如果是替換,還是先移動出原來元素和新元素的內存差值 */long lendiff = (enclen+backlen_size)-replaced_len;memmove(dst+replaced_len+lendiff,dst+replaced_len,old_listpack_bytes-poff-replaced_len);}/* Realloc after: we need to free space. */// 如果新插入元素后listpack要用的字節(jié)數比原來的字節(jié)數少 if (new_listpack_bytes < old_listpack_bytes) {if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;dst = lp + poff;}//未完待續(xù)......................
}
4.把新listpack中原來p節(jié)點位置的節(jié)點賦值給newp
5.更新listpack的頭部,即是更新總長度和元素個數
unsigned char *lpInsert(unsigned char *lp, unsigned char *elestr, unsigned char *eleint,uint32_t size, unsigned char *p, int where, unsigned char **newp)
{//該函數太長了,就分成兩部分。接著上面的講解/* Store the entry. *///把新listpack中原來p節(jié)點位置的節(jié)點賦值給newpif (newp) {*newp = dst;/* In case of deletion, set 'newp' to NULL if the next element is* the EOF element. */if (delete && dst[0] == LP_EOF) *newp = NULL;}//這個才是進行拷貝插入的數據if (!delete) {if (enctype == LP_ENCODING_INT) {memcpy(dst,eleint,enclen);} else {lpEncodeString(dst,elestr,size);}dst += enclen;memcpy(dst,backlen,backlen_size);dst += backlen_size;}/* Update header. *///更新listpack頭部,即是總長度和entry個數if (where != LP_REPLACE || delete) {uint32_t num_elements = lpGetNumElements(lp);if (num_elements != LP_HDR_NUMELE_UNKNOWN) {if (!delete)lpSetNumElements(lp,num_elements+1);elselpSetNumElements(lp,num_elements-1);}}lpSetTotalBytes(lp,new_listpack_bytes);return lp;
}
總結
從 ziplist 到 quicklist,再到 listpack,Redis中設計實現是不斷迭代優(yōu)化的。
ziplist中元素個數多了,其查找效率就降低。若是在ziplist里新增或修改數據,ziplist占用的內存空間還需要重新分配;更糟糕的是,ziplist新增或修改元素時候,可能會導致后續(xù)元素的previous_entry_length占用空間都發(fā)生變化,從而引起連鎖更新,導致每個元素的空間都需要重新分配,更加導致其訪問性能下降。
為了應對這些問題,Redis先是在3.0版本實現了quicklist結構。其是在ziplist基礎上,使用鏈表將多個ziplist串聯(lián)起來,即鏈表的每個元素是一個ziplist。這樣可以減少數據插入時內存空間的重新分配及內存數據的拷貝。而且,quicklist也限制了每個節(jié)點上ziplist的大小,要是某個ziplist的元素個數多了,會采用新增節(jié)點的方法。
但是因為quicklist使用節(jié)點結構指向了每個ziplist,這又增加了內存開銷。而為了減少內存開銷,并進一步避免ziplist連鎖更新的問題,所以就有了listpack結構。
listpack是沿用了ziplist緊湊型的內存布局。而listpack中每個節(jié)點不再包含前一節(jié)點的長度,所以當某個節(jié)點中的數據發(fā)生變化時候,導致節(jié)點的長度變化也不會影響到其他節(jié)點,這就可以避免連鎖更新的問題了。