政府網(wǎng)站信息建設工作軟文代寫兼職
提示:本文章非原創(chuàng),記錄一下優(yōu)秀的干貨。
[原創(chuàng)參考]:https://blog.csdn.net/qq_40378034/article/details/117014648
前言
ZooKeeper源碼的zookeeper-recipes目錄下提供了分布式隊列、分布式鎖和選舉的實現(xiàn)GitHub地址。
本文主要對這幾種實現(xiàn)做實現(xiàn)原理的解析和源碼剖析.
一、分布式隊列
使用路徑為/queue
的znode
下的節(jié)點表示隊列中的元素。
/queue
下的節(jié)點都是順序持久化znode, 這些znode名字的后綴數(shù)字表示了對應隊列元素在隊列中的位置。
znode名字后綴數(shù)字越小,對應隊列元素在隊列中的位置越靠前
- offer方法
offer方法在/queue下面創(chuàng)建一個順序znode。因為znode的后綴數(shù)字是/queue下面現(xiàn)有znode最大后綴數(shù)字加1,所以該znode對應的隊列元素處于隊尾
public class DistributedQueue {public boolean offer(byte[] data) throws KeeperException, InterruptedException {for (; ; ) {try {zookeeper.create(dir + "/" + prefix, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);return true;} catch (KeeperException.NoNodeException e) {zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);}}}
- element方法
public class DistributedQueue {public byte[] element() throws NoSuchElementException, KeeperException, InterruptedException {Map<Long, String> orderedChildren;while (true) {try {//獲取所有排好序的子節(jié)點orderedChildren = orderedChildren(null);} catch (KeeperException.NoNodeException e) {throw new NoSuchElementException();}if (orderedChildren.size() == 0) {throw new NoSuchElementException();}//返回隊頭節(jié)點的數(shù)據(jù)for (String headNode : orderedChildren.values()) {if (headNode != null) {try {return zookeeper.getData(dir + "/" + headNode, false, null);} catch (KeeperException.NoNodeException e) {//另一個客戶端已經(jīng)移除了隊頭節(jié)點,嘗試獲取下一個節(jié)點}}}}}private Map<Long, String> orderedChildren(Watcher watcher) throws KeeperException, InterruptedException {Map<Long, String> orderedChildren = new TreeMap<>();List<String> childNames;childNames = zookeeper.getChildren(dir, watcher);for (String childName : childNames) {try {if (!childName.regionMatches(0, prefix, 0, prefix.length())) {LOG.warn("Found child node with improper name: {}", childName);continue;}String suffix = childName.substring(prefix.length());Long childId = Long.parseLong(suffix);orderedChildren.put(childId, childName);} catch (NumberFormatException e) {LOG.warn("Found child node with improper format : {}", childName, e);}}return orderedChildren;}
- remove方法
public class DistributedQueue {public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException {Map<Long, String> orderedChildren;while (true) {try {//獲取所有排好序的子節(jié)點orderedChildren = orderedChildren(null);} catch (KeeperException.NoNodeException e) {throw new NoSuchElementException();}if (orderedChildren.size() == 0) {throw new NoSuchElementException();}//移除隊頭節(jié)點for (String headNode : orderedChildren.values()) {String path = dir + "/" + headNode;try {byte[] data = zookeeper.getData(path, false, null);zookeeper.delete(path, -1);return data;} catch (KeeperException.NoNodeException e) {//另一個客戶端已經(jīng)移除了隊頭節(jié)點,嘗試移除下一個節(jié)點}}}}
二、分布式鎖
1.排他鎖
排他鎖的核心是如何保證當前有且僅有一個事務獲取鎖,并且鎖被釋放后,所有正在等待獲取鎖的事務都能夠被通知到
定義鎖
通過在ZooKeeper上創(chuàng)建一個子節(jié)點來表示一個鎖,例如/exclusive_lock/lock
節(jié)點就可以被定義為一個鎖
獲取鎖
在需要獲取排他鎖時,所有的客戶端都會試圖通過調(diào)用create()
接口,在/exclusive_lock
節(jié)點下創(chuàng)建臨時子節(jié)點/exclusive_lock/lock
。ZooKeeper會保證在所有的客戶端中,最終只有一個客戶能夠創(chuàng)建成功,那么就可以認為該客戶端獲取了鎖。同時,所有沒有獲取到鎖的客戶端就需要到/exclusive_lock
節(jié)點上注冊一個子節(jié)點變更的watcher監(jiān)聽,以便實時監(jiān)聽到lock節(jié)點的變更情況
釋放鎖
/exclusive_lock/lock
是一個臨時節(jié)點,因此在以下兩種情況下,都有可能釋放鎖
- 當前獲取鎖的客戶端機器發(fā)生宕機,那么ZooKeeper上的這個臨時節(jié)點就會被移除
- 正常執(zhí)行完業(yè)務邏輯后,客戶端就會主動將自己創(chuàng)建的臨時節(jié)點刪除
無論在什么情況下移除了lock節(jié)點,ZooKeeper都會通知所有在/exclusive_lock
節(jié)點上注冊了子節(jié)點變更watcher監(jiān)聽的客戶端。
這些客戶端在接收到通知后,再次重新發(fā)起分布式鎖獲取,即重復獲取鎖過程。
羊群效應
上面的排他鎖的實現(xiàn)可能引發(fā)羊群效應:當一個特定的znode改變的時候ZooKeeper觸發(fā)了所有watcher的事件,由于通知的客戶端很多,所以通知操作會造成ZooKeeper性能突然下降,這樣會影響ZooKeeper的使用
改進后的分布式鎖實現(xiàn)
獲取鎖
首先,在Zookeeper當中創(chuàng)建一個持久節(jié)點ParentLock。當?shù)谝粋€客戶端想要獲得鎖時,需要在ParentLock這個節(jié)點下面創(chuàng)建一個臨時順序節(jié)點Lock1
之后,Client1查找ParentLock下面所有的臨時順序節(jié)點并排序,判斷自己所創(chuàng)建的節(jié)點Lock1是不是順序最靠前的一個。如果是第一個節(jié)點,則成功獲得鎖
這時候,如果再有一個客戶端Client2前來獲取鎖,則在ParentLock下再創(chuàng)建一個臨時順序節(jié)點Lock2
Client2查找ParentLock下面所有的臨時順序節(jié)點并排序,判斷自己所創(chuàng)建的節(jié)點Lock2是不是順序最靠前的一個,結果發(fā)現(xiàn)節(jié)點Lock2并不是最小的,于是,Client2向排序僅比它靠前的節(jié)點Lock1注冊watcher,用于監(jiān)聽Lock1節(jié)點是否存在。
這意味著Client2搶鎖失敗,進入了等待狀態(tài)
這時候,如果又有一個客戶端Client3前來獲取鎖,則在ParentLock下再創(chuàng)建一個臨時順序節(jié)點Lock3
Client3查找ParentLock下面所有的臨時順序節(jié)點并排序,判斷自己所創(chuàng)建的節(jié)點Lock3是不是順序最靠前的一個,結果同樣發(fā)現(xiàn)節(jié)點Lock3并不是最小的
于是,Client3向排序僅比它靠前的節(jié)點Lock2注冊watcher,用于監(jiān)聽Lock2節(jié)點是否存在。這意味著Client3同樣搶鎖失敗,進入了等待狀態(tài)
這樣一來,Client1得到了鎖,Client2監(jiān)聽了Lock1,Client3監(jiān)聽了Lock2。這恰恰形成了一個等待隊列,很像是Java當中ReentrantLock所依賴的AQS
釋放鎖
釋放鎖分為兩種情況:
1.任務完成,客戶端顯示釋放:當任務完成時,Client1會顯示調(diào)用刪除節(jié)點Lock1的指令
2.任務執(zhí)行過程中,客戶端崩潰
獲得鎖的Client1在任務執(zhí)行過程中,如果客戶端崩潰,則會斷開與Zookeeper服務端的連接。根據(jù)臨時節(jié)點的特性,相關聯(lián)的節(jié)點Lock1會隨之自動刪除
由于Client2一直監(jiān)聽著Lock1的存在狀態(tài),當Lock1節(jié)點被刪除,Client2會立刻收到通知。這時候Client2會再次查詢ParentLock下面的所有節(jié)點,確認自己創(chuàng)建的節(jié)點Lock2是不是目前最小的節(jié)點。如果是最小,則Client2獲得了鎖
同理,如果Client2也因為任務完成或者節(jié)點崩潰而刪除了節(jié)點Lock2,那么Client3就會接到通知
最終,Client3成功得到了鎖
2.共享鎖
共享鎖又稱為讀鎖,在同一時刻可以允許多個線程訪問,典型的就是ReentrantReadWriteLock里的讀鎖,它的讀鎖是可以被共享的,但是它的寫鎖確實每次只能被獨占
定義鎖
和排他鎖一樣,同樣是通過ZooKeeper上的數(shù)據(jù)節(jié)點來表示一個鎖,是一個類似于/shared_lock/[Hostname]-請求類型-序號
的臨時順序節(jié)點,例如/shared_lock/192.168.0.1-R-0000000001
,那么,這個節(jié)點就代表了一個共享鎖,如下圖所示:
獲取鎖
在需要獲取共享鎖時,所有客戶端都會到/shared_lock
這個節(jié)點下面創(chuàng)建一個臨時順序節(jié)點,如果當前是讀請求,那么就創(chuàng)建。例如/shared_lock/192.168.0.1-R-0000000001
的節(jié)點;如果是寫請求,那么就創(chuàng)建例如/shared_lock/192.168.0.1-W-0000000001
的節(jié)點
判斷讀寫順序
每個鎖競爭者,只需要關注/shared_lock
節(jié)點下序號比自己小的那個節(jié)點是否存在即可,具體實現(xiàn)如下:
1)客戶端調(diào)用create()
方法創(chuàng)建一個類似于/shared_lock/[Hostname]-請求類型-序號
的臨時順序節(jié)點
2)客戶端調(diào)用getChildren()
接口來獲取所有已經(jīng)創(chuàng)建的子節(jié)點列表
3)判斷是否可以獲取共享鎖:
讀請求:沒有比自己序號小的節(jié)點或者所有比自己序號小的節(jié)點都是讀請求
寫請求:序號是否最小
4)如果無法獲取共享鎖,那么就調(diào)用exist()
來對比自己小的那個節(jié)點注冊watcher
讀請求:向比自己序號小的最后一個寫請求節(jié)點注冊watcher監(jiān)聽
寫請求:向比自己序號小的最后一個節(jié)點注冊watcher監(jiān)聽
5)等待watcher通知,繼續(xù)進入步驟2
釋放鎖
釋放鎖的邏輯和排他鎖是一致的
整個共享鎖的獲取和釋放流程如下圖:
排他鎖源碼解析
1)加鎖過程
public class WriteLock extends ProtocolSupport {public synchronized boolean lock() throws KeeperException, InterruptedException {if (isClosed()) {return false;}//確認持久父節(jié)點是否存在ensurePathExists(dir);//真正獲取鎖的邏輯 調(diào)用ProtocolSupport的retryOperation()方法return (Boolean) retryOperation(zop);}}
class ProtocolSupport {protected Object retryOperation(ZooKeeperOperation operation)throws KeeperException, InterruptedException {KeeperException exception = null;for (int i = 0; i < RETRY_COUNT; i++) {try {//調(diào)用LockZooKeeperOperation的execute()方法return operation.execute();} catch (KeeperException.SessionExpiredException e) {LOG.warn("Session expired {}. Reconnecting...", zookeeper, e);throw e;} catch (KeeperException.ConnectionLossException e) {if (exception == null) {exception = e;}LOG.debug("Attempt {} failed with connection loss. Reconnecting...", i);retryDelay(i);}}throw exception;}
public class WriteLock extends ProtocolSupport {private class LockZooKeeperOperation implements ZooKeeperOperation {private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir)throws KeeperException, InterruptedException {List<String> names = zookeeper.getChildren(dir, false);for (String name : names) {if (name.startsWith(prefix)) {id = name;LOG.debug("Found id created last time: {}", id);break;}}if (id == null) {id = zookeeper.create(dir + "/" + prefix, data, getAcl(), EPHEMERAL_SEQUENTIAL);LOG.debug("Created id: {}", id);}}@SuppressFBWarnings(value = "NP_NULL_PARAM_DEREF_NONVIRTUAL",justification = "findPrefixInChildren will assign a value to this.id")public boolean execute() throws KeeperException, InterruptedException {do {if (id == null) {long sessionId = zookeeper.getSessionId();String prefix = "x-" + sessionId + "-";//創(chuàng)建臨時順序節(jié)點findPrefixInChildren(prefix, zookeeper, dir);idName = new ZNodeName(id);}//獲取所有子節(jié)點List<String> names = zookeeper.getChildren(dir, false);if (names.isEmpty()) {LOG.warn("No children in: {} when we've just created one! Lets recreate it...", dir);id = null;} else {//對所有子節(jié)點進行排序SortedSet<ZNodeName> sortedNames = new TreeSet<>();for (String name : names) {sortedNames.add(new ZNodeName(dir + "/" + name));}ownerId = sortedNames.first().getName();SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName);//是否存在序號比自己小的節(jié)點if (!lessThanMe.isEmpty()) {ZNodeName lastChildName = lessThanMe.last();lastChildId = lastChildName.getName();LOG.debug("Watching less than me node: {}", lastChildId);//有序號比自己小的節(jié)點,則調(diào)用exist()向前一個節(jié)點注冊watcherStat stat = zookeeper.exists(lastChildId, new LockWatcher());if (stat != null) {return Boolean.FALSE;} else {LOG.warn("Could not find the stats for less than me: {}", lastChildName.getName());}} //沒有序號比自己小的節(jié)點,則獲取鎖else {if (isOwner()) {LockListener lockListener = getLockListener();if (lockListener != null) {lockListener.lockAcquired();}return Boolean.TRUE;}}}}while (id == null);return Boolean.FALSE;}
2)解鎖過程
public class WriteLock extends ProtocolSupport {public synchronized void unlock() throws RuntimeException {if (!isClosed() && id != null) {try {//刪除當前節(jié)點,此時會觸發(fā)后一個節(jié)點的watcherZooKeeperOperation zopdel = () -> {zookeeper.delete(id, -1);return Boolean.TRUE;};zopdel.execute();} catch (InterruptedException e) {LOG.warn("Unexpected exception", e);Thread.currentThread().interrupt();} catch (KeeperException.NoNodeException e) {} catch (KeeperException e) {LOG.warn("Unexpected exception", e);throw new RuntimeException(e.getMessage(), e);} finally {LockListener lockListener = getLockListener();if (lockListener != null) {lockListener.lockReleased();}id = null;}}}
}
參考:漫畫:如何用Zookeeper實現(xiàn)分布式鎖?
3、選舉
使用臨時順序znode來表示選舉請求,創(chuàng)建最小后綴數(shù)字znode的選舉請求成功。在協(xié)同設計上和分布式鎖是一樣的,不同之處在于具體實現(xiàn)。不同于分布式鎖,選舉的具體實現(xiàn)對選舉的各個階段做了細致的監(jiān)控
public class LeaderElectionSupport implements Watcher { public synchronized void start() {state = State.START;dispatchEvent(EventType.START);LOG.info("Starting leader election support");if (zooKeeper == null) {throw new IllegalStateException("No instance of zookeeper provided. Hint: use setZooKeeper()");}if (hostName == null) {throw new IllegalStateException("No hostname provided. Hint: use setHostName()");}try {//發(fā)起選舉請求 創(chuàng)建臨時順序節(jié)點makeOffer();//選舉請求是否被滿足determineElectionStatus();} catch (KeeperException | InterruptedException e) {becomeFailed(e);}}private void makeOffer() throws KeeperException, InterruptedException {state = State.OFFER;dispatchEvent(EventType.OFFER_START);LeaderOffer newLeaderOffer = new LeaderOffer();byte[] hostnameBytes;synchronized (this) {newLeaderOffer.setHostName(hostName);hostnameBytes = hostName.getBytes();newLeaderOffer.setNodePath(zooKeeper.create(rootNodeName + "/" + "n_",hostnameBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL));leaderOffer = newLeaderOffer;}LOG.debug("Created leader offer {}", leaderOffer);dispatchEvent(EventType.OFFER_COMPLETE);}private void determineElectionStatus() throws KeeperException, InterruptedException {state = State.DETERMINE;dispatchEvent(EventType.DETERMINE_START);LeaderOffer currentLeaderOffer = getLeaderOffer();String[] components = currentLeaderOffer.getNodePath().split("/");currentLeaderOffer.setId(Integer.valueOf(components[components.length - 1].substring("n_".length())));//獲取所有子節(jié)點并排序List<LeaderOffer> leaderOffers = toLeaderOffers(zooKeeper.getChildren(rootNodeName, false));for (int i = 0; i < leaderOffers.size(); i++) {LeaderOffer leaderOffer = leaderOffers.get(i);if (leaderOffer.getId().equals(currentLeaderOffer.getId())) {LOG.debug("There are {} leader offers. I am {} in line.", leaderOffers.size(), i);dispatchEvent(EventType.DETERMINE_COMPLETE); //如果當前節(jié)點是第一個,則成為Leaderif (i == 0) {becomeLeader();} //如果有選舉請求在當前節(jié)點前面,則進行等待,調(diào)用exist()向前一個節(jié)點注冊watcherelse {becomeReady(leaderOffers.get(i - 1));}break;}}}
總結
提示:這里對文章進行總結:
例如:以上就是今天要講的內(nèi)容,本文僅僅簡單介紹了pandas的使用,而pandas提供了大量能使我們快速便捷地處理數(shù)據(jù)的函數(shù)和方法。