網(wǎng)頁設(shè)計(jì)和網(wǎng)站開發(fā)有什么區(qū)別百度競價(jià)是什么工作
文章目錄
- ZooKeeper 實(shí)戰(zhàn)(四) Curator Watch事件監(jiān)聽
- 0.前言
- 1.Watch 事件監(jiān)聽概念
- 2.NodeCache
- 2.1.全參構(gòu)造器參數(shù)
- 2.2.代碼DEMO
- 2.3.日志輸出
- 3.PathChildrenCache
- 3.1.全參構(gòu)造器參數(shù)
- 3.2.子節(jié)點(diǎn)監(jiān)聽時(shí)間類型
- 3.2.代碼DEMO
- 4.TreeCache
- 4.1.構(gòu)造器參數(shù)
- 4.2.代碼DEMO
- 4.3.日志輸出
ZooKeeper 實(shí)戰(zhàn)(四) Curator Watch事件監(jiān)聽
0.前言
上一篇博客只介紹了有關(guān)Curator中對ZNode的CRUD操作,從本篇起開始逐步介紹更加高級的API操作。
1.Watch 事件監(jiān)聽概念
ZooKeeper 中引入了Watcher機(jī)制來實(shí)現(xiàn)了發(fā)布/訂閱功能能,能夠讓多個(gè)訂閱者同時(shí)監(jiān)聽某一個(gè)對象,當(dāng)一個(gè)對象自身狀態(tài)變化時(shí),會通知所有訂閱者。雖然ZooKeeper原生支持通過注冊Watcher來進(jìn)行事件監(jiān)聽,但是其使用并不是特別方便,需要開發(fā)人員反復(fù)注冊Watcher,比較繁瑣。
而 Curator 引入了Cache 來實(shí)現(xiàn)對 ZooKeeper 服務(wù)端事件的監(jiān)聽。
Curator 中提供了三種 Cache(Watcher)來監(jiān)聽不同節(jié)點(diǎn)變化類型:
- NodeCache:監(jiān)聽指定的節(jié)點(diǎn)。
- PathChildrenCache:監(jiān)聽指定節(jié)點(diǎn)的子節(jié)點(diǎn)。
- TreeCache:監(jiān)聽指定節(jié)點(diǎn)及其子孫節(jié)點(diǎn)。
2.NodeCache
監(jiān)聽指定的節(jié)點(diǎn),增刪改都會監(jiān)聽。
2.1.全參構(gòu)造器參數(shù)
/*** @param: client 注冊監(jiān)聽的客戶端* @param: path 節(jié)點(diǎn)路徑* @param: dataIsCompressed 是否開啟數(shù)據(jù)壓縮。傳遞的數(shù)據(jù)會進(jìn)行壓縮,傳遞速度快,但取數(shù)據(jù)時(shí)需要把壓縮的數(shù)據(jù)進(jìn)行轉(zhuǎn)換,默認(rèn)為false*/
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed);
2.2.代碼DEMO
@Overridepublic void run(ApplicationArguments args) throws Exception {log.info("。。。。。。。。。。。。。容器初始化完畢。。。。。。。。。。。。。。");String path = "/ahao/watcher";TimeUnit.SECONDS.sleep(3);// 創(chuàng)建NodeCache對象NodeCache nodeCache = new NodeCache(client,path);// 添加監(jiān)聽器nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {ChildData currentData = nodeCache.getCurrentData();if (currentData != null){String s = new String(currentData.getData(),StandardCharsets.UTF_8);log.info("監(jiān)聽{}節(jié)點(diǎn)發(fā)生變化,數(shù)據(jù)內(nèi)容:{}",path,s);}else {log.info("監(jiān)聽{}節(jié)點(diǎn)被刪除了",path);}}});// 開啟監(jiān)聽nodeCache.start();TimeUnit.SECONDS.sleep(2);// 創(chuàng)建節(jié)點(diǎn)client.create().creatingParentsIfNeeded().forPath(path,"第一次新增".getBytes(StandardCharsets.UTF_8));TimeUnit.SECONDS.sleep(2);// 更新節(jié)點(diǎn)client.setData().forPath(path,"數(shù)據(jù)修改了".getBytes(StandardCharsets.UTF_8));TimeUnit.SECONDS.sleep(2);// 刪除節(jié)點(diǎn)client.delete().deletingChildrenIfNeeded().forPath(path);}
2.3.日志輸出
3.PathChildrenCache
監(jiān)聽指定節(jié)點(diǎn)的子節(jié)點(diǎn)。當(dāng)一個(gè)子節(jié)點(diǎn)增刪改時(shí), PathChildrenCache會包含最新的子節(jié)點(diǎn)的數(shù)據(jù)和狀態(tài)。
3.1.全參構(gòu)造器參數(shù)
/*** @param: client 注冊監(jiān)聽的客戶端* @param: path 節(jié)點(diǎn)路徑* @param: cacheData 是否緩存節(jié)點(diǎn)內(nèi)容(包含節(jié)點(diǎn)狀態(tài))* @param: dataIsCompressed 是否開啟數(shù)據(jù)壓縮。傳遞的數(shù)據(jù)會進(jìn)行壓縮,傳遞速度快,但取數(shù)據(jù)時(shí)需要把壓縮的數(shù)據(jù)進(jìn)行轉(zhuǎn)換,默認(rèn)為false* @param: executorService 用于PathChildrenCache的后臺線程的線程池。該線程池應(yīng)該是單線程的,否則緩存可能會看到不一致的結(jié)果*/
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)
3.2.子節(jié)點(diǎn)監(jiān)聽時(shí)間類型
public enum Type
{// 子節(jié)點(diǎn)添加CHILD_ADDED,// 子節(jié)點(diǎn)的數(shù)據(jù)變更CHILD_UPDATED,// 子節(jié)點(diǎn)被刪除CHILD_REMOVED,// 以下三個(gè)事件類型表示:當(dāng)連接斷開時(shí),PathChildrenCache將繼續(xù)保持其斷開連接之前的狀態(tài),并且在連接恢復(fù)后,PathChildrenCache將為斷開連接期間發(fā)生的所有添加、刪除和更新發(fā)出正常的子事件。// 當(dāng)連接狀態(tài)處于ConnectionState.SUSPENDED。CONNECTION_SUSPENDED,// 當(dāng)連接狀態(tài)處于ConnectionState.RECONNECTEDCONNECTION_RECONNECTED,// 當(dāng)連接狀態(tài)處于ConnectionState.LOSTCONNECTION_LOST,// 當(dāng)通過PathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT)啟動監(jiān)聽時(shí),該事件表示PathChildrenCache初始化完成This event signals that the initial cache has been populated.INITIALIZED
}
3.2.代碼DEMO
@Overridepublic void run(ApplicationArguments args) throws Exception {log.info("。。。。。。。。。。。。。容器初始化完畢。。。。。。。。。。。。。。");String path = "/ahao/watcher";TimeUnit.SECONDS.sleep(3);// 創(chuàng)建PathChildrenCache對象// 此處的cacheData參數(shù)一定要設(shè)置為true,不然Curator不會緩存數(shù)據(jù)當(dāng)本地,// 那么后續(xù)pathChildrenCache.getCurrentData()得到的數(shù)據(jù)都為nullPathChildrenCache pathChildrenCache = new PathChildrenCache(client,path,true);// 添加監(jiān)聽器pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {if (event.getType() == PathChildrenCacheEvent.Type.INITIALIZED){log.info("PathChildrenCache初始化完,事件類型:{}", event.getType());}else {ChildData currentData = event.getData();log.info("事件類型:{},監(jiān)聽到的子節(jié)點(diǎn)發(fā)生變化:{}",event.getType(),currentData.getPath());}}});// 開啟監(jiān)聽pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);// 創(chuàng)建子節(jié)點(diǎn)TimeUnit.SECONDS.sleep(2);client.create().creatingParentsIfNeeded().forPath(path+"/c1");client.create().creatingParentsIfNeeded().forPath(path+"/c2");client.create().creatingParentsIfNeeded().forPath(path+"/c3/age");// 修改子節(jié)點(diǎn)TimeUnit.SECONDS.sleep(2);client.setData().forPath(path+"/c1","c1更新了".getBytes(StandardCharsets.UTF_8));client.setData().forPath(path+"/c2","c2更新了".getBytes(StandardCharsets.UTF_8));// 刪除子節(jié)點(diǎn)TimeUnit.SECONDS.sleep(2);client.delete().deletingChildrenIfNeeded().forPath(path+"/c3");}
3.3.日志輸出
可以看出,PathChildrenCache只會監(jiān)聽直屬子節(jié)點(diǎn)的變化,其非直屬子節(jié)點(diǎn)的后代節(jié)點(diǎn)如/c3/age,沒有發(fā)布通知。
4.TreeCache
監(jiān)聽指定節(jié)點(diǎn)及其子孫節(jié)點(diǎn)。
4.1.構(gòu)造器參數(shù)
/*** @param: client 注冊監(jiān)聽的客戶端* @param: path 節(jié)點(diǎn)路徑*/
public TreeCache(CuratorFramework client, String path)/*** @param: client 注冊監(jiān)聽的客戶端* @param: path 節(jié)點(diǎn)路徑* @param: cacheData 是否緩存節(jié)點(diǎn)內(nèi)容(包含節(jié)點(diǎn)狀態(tài))* @param: dataIsCompressed 是否開啟數(shù)據(jù)壓縮。傳遞的數(shù)據(jù)會進(jìn)行壓縮,傳遞速度快,但取數(shù)據(jù)時(shí)需要把壓縮的數(shù)據(jù)進(jìn)行轉(zhuǎn)換,默認(rèn)為false* @param: maxDepth 最大深度。最深的那個(gè)后代節(jié)點(diǎn)到path所需要經(jīng)過的節(jié)點(diǎn)數(shù)* @param: executorService 用于PathChildrenCache的后臺線程的線程池。該線程池應(yīng)該是單線程的,否則緩存可能會看到不一致的結(jié)果* @param: createParentNodes 是否需要創(chuàng)建父節(jié)點(diǎn)。如果父節(jié)點(diǎn)不存在澤創(chuàng)建父節(jié)點(diǎn)(容器節(jié)點(diǎn))* @param: TreeCacheSelector TreeCache選擇器。根據(jù)指定的策略和條件,選擇適合的緩存樹來創(chuàng)建和維護(hù)TreeCache*/
TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, boolean createParentNodes, TreeCacheSelector selector)
4.2.代碼DEMO
@Overridepublic void run(ApplicationArguments args) throws Exception {log.info("。。。。。。。。。。。。。容器初始化完畢。。。。。。。。。。。。。。");String path = "/ahao/watcher/tree";TimeUnit.SECONDS.sleep(3);// 創(chuàng)建TreeCache對象,也可通過TreeCache.newBuilder()創(chuàng)建TreeCache treeCache = new TreeCache(client,path);treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {if (event.getType() == TreeCacheEvent.Type.INITIALIZED){log.info("TreeCache初始化完,事件類型:{}", event.getType());}else {ChildData currentData = event.getData();log.info("事件類型:{},監(jiān)聽到的子節(jié)點(diǎn)發(fā)生變化:{}",event.getType(),currentData.getPath());}}});// 開啟監(jiān)聽treeCache.start();// 創(chuàng)建節(jié)點(diǎn)TimeUnit.SECONDS.sleep(2);client.create().creatingParentsIfNeeded().forPath(path);client.create().creatingParentsIfNeeded().forPath(path +"/t1");client.create().creatingParentsIfNeeded().forPath(path +"/t2/ccc");// 修改子節(jié)點(diǎn)TimeUnit.SECONDS.sleep(2);client.setData().forPath(path,"根節(jié)點(diǎn)更新了".getBytes(StandardCharsets.UTF_8));client.setData().forPath(path +"/t2/ccc","/t2/ccc更新了".getBytes(StandardCharsets.UTF_8));// 刪除子節(jié)點(diǎn)TimeUnit.SECONDS.sleep(2);client.delete().deletingChildrenIfNeeded().forPath(path +"/t2");}
4.3.日志輸出
可以看出TreeCache會監(jiān)聽當(dāng)前節(jié)點(diǎn)和后代節(jié)點(diǎn)的變化。