wordpress下載代碼刷關(guān)鍵詞排名seo軟件軟件
分布式系統(tǒng)中某些節(jié)點(diǎn)任務(wù)當(dāng)滿足某個(gè)條件時(shí)才允許繼續(xù)運(yùn)行,如果不滿足則當(dāng)前節(jié)點(diǎn)需要等待。這個(gè)時(shí)候就需要一個(gè)屏障來阻止節(jié)點(diǎn)的處理。ZooKeeper Barrier是ZooKeeper提供的一種用于分布式環(huán)境中實(shí)現(xiàn)同步和協(xié)調(diào)的機(jī)制。具體邏輯就是:
1、檢測某個(gè)barrier node是否存在
2、如果屏障節(jié)點(diǎn)不存在,則屏障不存在可以繼續(xù)執(zhí)行
3、如果屏障存在,則需要watcher屏障節(jié)點(diǎn)的刪除事件,當(dāng)屏障節(jié)點(diǎn)刪除當(dāng)前程序才可繼續(xù),刪除之前當(dāng)前程序一直阻塞等待。
這里使用Curator框架API寫一個(gè)簡單的樣例程序:
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
client.start();
String path = "/barrier";
//創(chuàng)建屏障節(jié)點(diǎn)
client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
//另一個(gè)業(yè)務(wù)執(zhí)行邏輯,完成后刪除屏障
new Thread(()->{try {Thread.sleep(2000);client.delete().forPath(path);} catch (Exception e) {e.printStackTrace();}
}).start();
Object obj = new Object();
//主線程wathcer方式監(jiān)聽屏障刪除事件
Stat stat = client.checkExists().usingWatcher(new Watcher() {@Overridepublic void process(WatchedEvent event) {if (event.getType() == Event.EventType.NodeDeleted) {System.out.println("節(jié)點(diǎn)刪除");//節(jié)點(diǎn)刪除 通知主線程synchronized (obj){obj.notifyAll();}}}
}).forPath(path);
//主線程同步等待屏障刪除
synchronized (obj){obj.wait();
}System.out.println("barrier delete");
這里最后主線程會等待另一個(gè)線程執(zhí)行完成才繼續(xù)。這里在同一個(gè)程序里模擬,同時(shí)運(yùn)行兩個(gè)程序可能更直觀。
另外Curator的recipes也提供的對barrier的封裝:DistributedBarrier類。
使用例子:
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryOneTime(1));
try {client.start();final DistributedBarrier barrier = new DistributedBarrier(client, "/barrier");//添加屏障barrier.setBarrier();new Thread(()->{try {Thread.sleep(2000);//屏障移除barrier.removeBarrier();}catch (Exception e) {e.printStackTrace();}}).start();//阻塞等待屏障移除barrier.waitOnBarrier(10, TimeUnit.SECONDS);System.out.println("end");
} catch (Exception e) {e.printStackTrace();
} finally {client.close();
}
其內(nèi)部阻塞原理還是使用的基礎(chǔ)的wait,notify機(jī)制。這里封裝方法可以設(shè)置等待時(shí)間。
Double Barriers
double barriers即多屏障。在某些情況下需要多個(gè)條件同時(shí)滿足程序才可以繼續(xù)。比如批量任務(wù),并行分成5個(gè)線程任務(wù)去做同一階段工作。5個(gè)線程都執(zhí)行完成才可進(jìn)入下一階段。
還是使用Curator封裝的DistributedDoubleBarrier來演示:
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryOneTime(1));
client.start();
ExecutorService executor = Executors.newFixedThreadPool(5);
int quantity = 5;//屏障數(shù)量
String barrierPath = "/barrier1";//屏障節(jié)點(diǎn)路徑
for (int i = 0; i < 5; i++) {executor.execute(()->{DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client,barrierPath,quantity);try {int time = 1000*new Random().nextInt(10);System.out.println(time+ " sleeps for enter,"+Thread.currentThread().getName());Thread.sleep(time);/**當(dāng)前參與者執(zhí)行完前置邏輯進(jìn)入屏障等待其它參與者到達(dá)(阻塞等待)當(dāng)所有參與者都到達(dá)屏障點(diǎn)后,屏障會通知所有參與者繼續(xù)執(zhí)行(解除阻塞)*/barrier.enter(10,TimeUnit.SECONDS);System.out.println(System.currentTimeMillis()+ " do sth,"+Thread.currentThread().getName());time = 1000*new Random().nextInt(10);System.out.println(time+ " sleeps for leave,"+Thread.currentThread().getName());Thread.sleep(time);/**完成同步操作后,調(diào)用leave()方法告知屏障該參與者已經(jīng)離開屏障點(diǎn),并等待其他參與者也離開當(dāng)所有參與者都離開屏障點(diǎn)后,屏障會通知所有參與者繼續(xù)執(zhí)行后續(xù)操作*/barrier.leave();//刪除屏障System.out.println(System.currentTimeMillis()+" leave,"+Thread.currentThread().getName());} catch (Exception e) {e.printStackTrace();}});
}executor.shutdown();
這里定義了5個(gè)屏障,所有的程序會在barrier.enter()處阻塞等待,直到所有的線程都執(zhí)行到該方法才會繼續(xù)執(zhí)行。這時(shí)候查看/barrier1節(jié)點(diǎn),會發(fā)現(xiàn)其下有5個(gè)uuid類型的子節(jié)點(diǎn)和一個(gè)ready節(jié)點(diǎn)。每個(gè)enter的線程創(chuàng)建了一個(gè)子節(jié)點(diǎn),barrier判斷達(dá)到屏障數(shù)量時(shí)自動創(chuàng)建一個(gè)ready節(jié)點(diǎn)。
[zk: localhost:2181(CONNECTED) 20] ls /barrier1
[0ffbe0f0-0bf6-4098-a494-912ce57d8f5f, 10e54092-9909-4a0a-a764-774e55584b1d, 4b72c6bb-bff4-4bf4-9762-dbb56daaaf87, 8d8ecafa-31a8-4799-8b49-35f61098a05f, cbe75b72-8fdb-48dd-9620-2d34b59d411e, ready]
后面leave()方法離開屏障點(diǎn)又是一個(gè)阻塞點(diǎn),屏障會等到所有的參與者都調(diào)用leave方法后才會通知所有參與者繼續(xù)執(zhí)行。最后 enter()->leave()方法之間是多個(gè)參與者同步操作業(yè)務(wù)邏輯。