hishop網(wǎng)站頁面排名優(yōu)化
目錄
- 秒殺優(yōu)化
- 一:異步秒殺
- 1:思路
- 2:實現(xiàn)
- 二:redis實現(xiàn)消息隊列
- 1:什么是消息隊列
- 2:基于list結(jié)構(gòu)實現(xiàn)消息隊列
- 3:基于pubsub實現(xiàn)消息隊列
- 4:基于stream實現(xiàn)消息隊列
- 5:stream的消費者組模式
- 三:基于redis的stream結(jié)構(gòu)實現(xiàn)消息隊列
秒殺優(yōu)化
一:異步秒殺
1:思路
原本我們每一個請求都是串行執(zhí)行,從頭到尾執(zhí)行完了才算一個請求處理成功,這樣過于耗時,我們看到執(zhí)行的操作中查詢優(yōu)惠券,查詢訂單,減庫存,創(chuàng)建訂單都是數(shù)據(jù)庫操作,而數(shù)據(jù)庫的性能又不是很好,我們可以將服務(wù)拆分成兩部分,將判斷優(yōu)惠券信息和校驗一人一單的操作提取出來,先執(zhí)行判斷優(yōu)惠券和校驗操作,然后直接返回訂單id,我們在陸續(xù)操作數(shù)據(jù)庫減庫存和創(chuàng)建訂單,這樣前端響應(yīng)的會非常快,并且我們可以將優(yōu)惠券和一人一單的操作放在redis中去執(zhí)行,這樣又能提高性能,然后我們將優(yōu)惠券信息,用戶信息,訂單信息,先保存在隊列里,先返回給前端數(shù)據(jù),在慢慢的根據(jù)隊列的信息去存入數(shù)據(jù)
我們之前說將查詢和校驗功能放在redis中實現(xiàn),那么用什么結(jié)構(gòu)呢,查詢訂單很簡單,只要查詢相應(yīng)的優(yōu)惠券的庫存是否大于0就行,我們就可以是否字符串結(jié)構(gòu),key存優(yōu)惠券信息,value存庫存;那么校驗?zāi)?#xff0c;因為是一人一單,所以我們可以使用set,這樣就能保證用戶的唯一性;
我們執(zhí)行的具體步驟是:先判斷庫存是否充足,不充足直接返回,充足判斷是否有資格購買,沒有返回,有就可以減庫存,然后將用戶加入集合中,在返回,因為我們執(zhí)行這些操作時要保證命令的原子性,所以這些操作我們都使用lua腳本來編寫;
具體的執(zhí)行流程就是,先執(zhí)行l(wèi)ua腳本,如果結(jié)果不是0那么直接返回,如果不是0,那么就將信息存入阻塞隊列然后返回訂單id;
2:實現(xiàn)
1:新增時添加到redis
stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString());
2:lua腳本編寫:
local stock =tonumber(redis.call('get', 'seckill:stock:' .. ARGV[1]))
if (stock<=0) thenreturn 1
end
local userId=ARGV[2]
local isok=tonumber(redis.call('sadd','seckill:order:'..ARGV[1],userId))
if isok==0 thenreturn 2
end
redis.call('incrby','seckill:stock:'..ARGV[1],-1)
return 0
然后就能改變之前的代碼,在redis中實現(xiàn)異步下單:
@Override
public Result seckilOrder(Long voucherId) throws InterruptedException {Long id = UserHolder.getUser().getId();Long res = (Long) stringRedisTemplate.execute(SECKIL_ORDER_LUA, Collections.emptyList(), voucherId.toString(), id.toString());if (res!=0){return Result.fail(res==1?"庫存不足":"一人只能購買一單");}long orderID = redisIDWork.nextId("order");return Result.ok(orderID);}
初始化lua腳本文件
@Resource
private RedissonClient redissonClient2;
public static final DefaultRedisScript SECKIL_ORDER_LUA;
static {//初始化SECKIL_ORDER_LUA=new DefaultRedisScript<>();//定位到lua腳本的位置SECKIL_ORDER_LUA.setLocation(new ClassPathResource("seckill.lua"));//設(shè)置lua腳本的返回值SECKIL_ORDER_LUA.setResultType(Long.class);
}
還剩一個阻塞隊列沒有實現(xiàn):
阻塞隊列的功能就是異步的將訂單信息存入數(shù)據(jù)庫;
阻塞隊列可以使用blockdeque
BlockingQueue<VoucherOrder> blockingQueue = new ArrayBlockingQueue<VoucherOrder>(1024*1024);
在類上直接初始化
然后使用的時候就是,將訂單添加到阻塞隊列,讓另一個線程去執(zhí)行,往數(shù)據(jù)庫中添加阻塞隊列中的訂單信息:
blockingQueue.add(voucherOrder);
然后就要開出一個線程,然后執(zhí)行往數(shù)據(jù)庫添加元素的任務(wù)了:
//創(chuàng)建一個線程private ExecutorService SECKILL_ORDER_EXECUTOR=Executors.newSingleThreadExecutor();//注解PostConstruct,添加這個注解的方法就是在類初始化完成之后就會執(zhí)行;@PostConstructprivate void init(){//提交任務(wù)SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandle());}//定義一個任務(wù)內(nèi)部類,實現(xiàn)Runnable,然后需要實現(xiàn)run方法,run方法中就是我們的任務(wù)private class VoucherOrderHandle implements Runnable {@Overridepublic void run() {try {//從阻塞隊列中取出訂單VoucherOrder voucherOrder = blockingQueue.take();//執(zhí)行方法handleVoucherOrder(voucherOrder);} catch (InterruptedException e) {log.info("下單業(yè)務(wù)異常",e);}}}
當類加載是就會一直提交任務(wù),只要阻塞隊列里有訂單,就會將訂單取出然后調(diào)用方法將訂單存入數(shù)據(jù)庫
調(diào)用的方法是嘗試獲取鎖的方法,而獲取鎖其實并不需要,因為我們自己開出來的線程只有一個是單線程,而且在lua腳本中已經(jīng)對一人一單還有超賣問題進行處理,這里只是為了更加保險
@Transactionalpublic void handleVoucherOrder(VoucherOrder voucherOrder) throws InterruptedException {
// SimpleRedisLock simpleRedisLock = new SimpleRedisLock("order" + id, stringRedisTemplate);Long userId = voucherOrder.getUserId();RLock simpleRedisLock = redissonClient2.getLock("lock:order" + userId);boolean trylock = simpleRedisLock.tryLock(1L, TimeUnit.SECONDS);if (!trylock){log.info("獲取鎖失敗");}try {orderService.createVoucherOrder(voucherOrder);} catch (IllegalStateException e) {throw new RuntimeException(e);}finally {simpleRedisLock.unlock();}}
然后獲取鎖成功后就會調(diào)用方法執(zhí)行數(shù)據(jù)庫操作,但是這個方法是帶有事務(wù)的,我們單獨開出來的子線程無法使事務(wù)生效,只能在方法的外部聲明一個代理對象,然后通過代理對象去調(diào)用方法使事務(wù)生效;
@Transactionalpublic void createVoucherOrder(VoucherOrder voucherOrder) {Integer count = query().eq("user_id", voucherOrder.getUserId()).eq("voucher_id", voucherOrder.getVoucherId()).count();if (count > 0) {log.info("一個用戶只能下一單");}//進行更新,庫存減一boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0).update();// where id = ? and stock > 0//扣減失敗,返回錯誤信息;if (!success) {log.info("扣減失敗");}save(voucherOrder);}
因為我們是開出來的子線程調(diào)用的方法,所以不能從線程中獲取值,只能從我們傳入的訂單對象獲取,然后就是減庫存和存入訂單的操作了;
總結(jié):
我們使用異步操作,將下單和存入訂單分開來執(zhí)行,大大提高了執(zhí)行的銷量,在redis中完成超賣和一人一單的問題;
然后使用阻塞隊列,開出一個子線程異步存入數(shù)據(jù)庫下單;
問題:
我們的阻塞隊列是在jvm中的,jvm中內(nèi)存是有上線的,超過上限就會有異常,還有就是我們的數(shù)據(jù)都是存放在內(nèi)存中,要是出現(xiàn)了一些事故會導致數(shù)據(jù)丟失
二:redis實現(xiàn)消息隊列
1:什么是消息隊列
消息隊列由三個角色構(gòu)成:
1:生產(chǎn)者:發(fā)送消息到消息隊列
2:消息隊列:存儲和管理消息隊列,也被稱為消息代理
3:消費者:從消息隊列中獲取消息并處理
好的消息隊列有這幾個特點:
1:有獨立的服務(wù),獨立的內(nèi)存;
2:可以做到數(shù)據(jù)的持久化
3:能夠發(fā)送消息給消費者并且確保消息處理完成
2:基于list結(jié)構(gòu)實現(xiàn)消息隊列
使用brpop可以實現(xiàn)阻塞獲取
3:基于pubsub實現(xiàn)消息隊列
4:基于stream實現(xiàn)消息隊列
stream發(fā)送消息的方式xadd key * msg
key是指消息隊列的名稱,* 是發(fā)送消息的名稱由redis來生成,后面的msg就是鍵值對,我們要發(fā)宋的消息
xread是讀取消息的命令:count指定讀取消息的數(shù)量,block指定阻塞時間,不指定就是不阻塞,指定0就是無限等待,sreams 是消息隊列的名稱,可以是多個,id是消息的id,0是從0開始讀,$是從最新的開始讀
但是有個問題就是,指定$是獲取最新的消息,但是只是獲取使用這個命令之后最新的消息,而如果一次性發(fā)多條,只會獲取最后一個,就會出現(xiàn)漏消息;
5:stream的消費者組模式
消費者組就是將消費者劃分到一個組中監(jiān)聽一個消息隊列:
有這些好處:
1:消息分流:消息發(fā)送到消費者組中,消費者會處于競爭關(guān)系,會爭奪消息來處理,這個發(fā)送多個消息就會實現(xiàn)分流,就會由不同的消費者來處理,加快了處理速度;
2:消息標識:在讀取消息后會記錄最后一個被處理的消息,這樣就不會出現(xiàn)消息漏讀的情況;
3:消息確認:消息發(fā)出去會,消息會處于pending狀態(tài),會等待消息處理完畢,這個時候會將消息存入pendinglist中,當處理完后才會從pending中移除;確保了消息的安全性,保證消息不會丟失,就算再消息發(fā)出去后,服務(wù)宕機了,也能知道該消息沒有被處理,這個功能的作用就是確保消息至少被消費一次;
三:基于redis的stream結(jié)構(gòu)實現(xiàn)消息隊列
首先再redis客戶端中輸入命令創(chuàng)建一個隊列和接受這個隊列消息的組
然后修改秒殺下單的lua腳本,直接在redis中通過消息隊列將消息發(fā)送給消費者:
local orderId=ARGV[3]
local stock =tonumber(redis.call('get', 'seckill:stock:' .. ARGV[1]))
if (stock<=0) thenreturn 1
end
local userId=ARGV[2]
local isok=tonumber(redis.call('sadd','seckill:order:'..ARGV[1],userId))
if isok==0 thenreturn 2
end
redis.call('incrby','seckill:stock:'..ARGV[1],-1)
--將消息發(fā)送給stream.orders隊列
redis.call('xadd','stream.orders','*','userId',userId,'id',orderId,'voucherId',ARGV[1])
return 0
這里發(fā)送的是優(yōu)惠券id,用戶id還有訂單id,正是我們存入數(shù)據(jù)庫中所需要的參數(shù)
然后就可以去修改前面秒殺下單的邏輯,不用去將消息放到阻塞隊列,我們直接從redis的隊列中取出就行;
@Override
public Result seckilOrder(Long voucherId) throws InterruptedException {long orderId = redisIDWork.nextId("order");Long userId = UserHolder.getUser().getId();Long res = (Long) stringRedisTemplate.execute(SECKIL_ORDER_LUA,Collections.emptyList(), voucherId.toString(),userId.toString(),String.valueOf(orderId));if (res != 0) {return Result.fail(res == 1 ? "庫存不足" : "一人只能購買一單");}orderService = (IVoucherOrderService) AopContext.currentProxy();return Result.ok(orderId);
}
這里我們需要將訂單id作為lua腳本的參數(shù)傳入進去,然后將訂單信息存入阻塞隊列的操作可以省略,因為我們已經(jīng)將訂單信息存入了redis中的消息隊列;
然后這里我們需要單獨開出一個線程去將隊列中的消息存入數(shù)據(jù)庫:
private class VoucherOrderHandle implements Runnable {String ququeName="stream.orders";@Overridepublic void run() {try {//從消息隊列中取出訂單while (true){//xreadgroup GROUP group consumer count(1) block(2000) streams key >List<MapRecord<String, Object, Object>> msg = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), StreamOffset.create(ququeName, ReadOffset.lastConsumed()));//如果消息為空就繼續(xù)等待接收if (msg==null||msg.isEmpty()){continue;}//因為每次讀取一個消息,所以我們獲取第一個消息MapRecord<String, Object, Object> entries = msg.get(0);//獲取消息的值,是一些我們傳入的鍵值對Map<Object, Object> value = entries.getValue();//將map轉(zhuǎn)成voucherorder對象VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value,new VoucherOrder(),false);//執(zhí)行方法handleVoucherOrder(voucherOrder);//確認消息已經(jīng)處理stringRedisTemplate.opsForStream().acknowledge(ququeName,"g1",entries.getId());}} catch (InterruptedException e) {log.info("下單業(yè)務(wù)異常",e);handleVoucherOrderError();}}
我們要做的就是接受消息,然后再將消息存入數(shù)據(jù)庫:
我們調(diào)用stream的方法,作為消費者從隊列中讀取消息,阻塞時間是2秒,每次讀取一個消息,從下一個未消費的消息讀取,如果讀取的消息為空那么就繼續(xù)循環(huán)讀取消息,如果有消息就將消息取出,然后將其轉(zhuǎn)成對象map,再將其轉(zhuǎn)成對象,然后再去做確認消息的處理,如果不確認消息,消息就會存在待處理的隊列中;如果出現(xiàn)的異常,那么我們?nèi)〕龅南⒖赡軟]有進行確認,沒有確認的會存入待處理隊列,我們就要從隊列里取出然后進行處理;
出錯只會執(zhí)行的方法:
private void handleVoucherOrderError() {try {//從消息隊列中取出訂單while (true){//xreadgroup GROUP group consumer count(1) streams key 0,表示從第一個未處理的消息開始讀取List<MapRecord<String, Object, Object>> msg = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1), StreamOffset.create(ququeName, ReadOffset.from("0")));//如果為空就說明沒有待處理的消息結(jié)束就行if (msg==null||msg.isEmpty()){break;}//因為每次讀取一個消息,所以我們獲取第一個消息MapRecord<String, Object, Object> entries = msg.get(0);//獲取消息的值,是一些我們傳入的鍵值對Map<Object, Object> value = entries.getValue();//將map轉(zhuǎn)成voucherorder對象VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value,new VoucherOrder(),false);//執(zhí)行方法handleVoucherOrder(voucherOrder);}} catch (InterruptedException e) {log.info("下單業(yè)務(wù)異常",e);}}
}
這里因為是再待處理中直接取出,所以不用阻塞處理,然后從待消費隊列中第一個消息開始讀,如果為空,那么就說明沒有待處理的消息,我們直接返回就行,如果不為空我們再處理
這樣使用redis中的消息隊列就實現(xiàn)了:1:獨立的服務(wù),足夠的內(nèi)存;2:有確認機制,避免消息漏讀;3:消息持久化
BeanUtil.fillBeanWithMap(value,new VoucherOrder(),false);
//執(zhí)行方法
handleVoucherOrder(voucherOrder);
}
} catch (InterruptedException e) {
log.info(“下單業(yè)務(wù)異?!?e);
}
}
}
> 這里因為是再待處理中直接取出,所以不用阻塞處理,然后從待消費隊列中第一個消息開始讀,如果為空,那么就說明沒有待處理的消息,我們直接返回就行,如果不為空我們再處理這樣使用redis中的消息隊列就實現(xiàn)了:1:獨立的服務(wù),足夠的內(nèi)存;2:有確認機制,避免消息漏讀;3:消息持久化