柚子快報激活碼778899分享:分布式 ZooKeeper
柚子快報激活碼778899分享:分布式 ZooKeeper
命令操作
啟動、停止、查看狀態
./zkServer.sh start
./zkServer.sh stop
./zkServer.sh status
連接客戶端
./zkCli.sh -server localhost:2181
節點命令
javaAPI操作
引入依賴
API操作
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
public class CuratorTest {
private static RetryPolicy retry ;
private static CuratorFramework client;
/**
* 創(chuàng)建
*/
@Test
void create() throws Exception {
String path = client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/xyx4/sd1", "china".getBytes());
System.out.println("path = " + path);
}
/**
* 查詢
*/
@Test
void query() throws Exception {
// 查詢數(shù)據(jù) get
byte[] bytes = client.getData().forPath("/xyx4/sd");
System.out.println(new String(bytes));
// 查詢子節(jié)點 ls
List
// 查詢子節(jié)點狀態(tài)信息 ls -s
Stat status = new Stat(); //節(jié)點信息會放在這個對象里
client.getData().storingStatIn(status).forPath("/xyx");
}
/**
* 修改數(shù)據(jù)
*/
@Test
void set() throws Exception {
Stat status = new Stat();
client.getData().storingStatIn(status).forPath("/xyx4/sd");
int version = status.getVersion();
client.setData().withVersion(version).forPath("/xyx4/sd", "zhouyu3".getBytes());
}
/**
* 刪除
*/
@Test
void delete() throws Exception{
// 1.刪除單個節(jié)點
client.delete().forPath("/xyx4/sd1");
// 2.刪除帶有子節(jié)點的節(jié)點
client.delete().deletingChildrenIfNeeded().forPath("/xyx4/sd");
// 3.必須成功的刪除
client.delete().guaranteed().forPath("/xyx3");
// 4.回調(diào)
client.delete().guaranteed().inBackground((q1,q2) -> {
// 回調(diào)函數(shù) 執(zhí)行刪除后自動執(zhí)行
}).forPath("/xyx3");
}
@BeforeEach
void setUp() {
retry = new ExponentialBackoffRetry(3000, 10);
client = CuratorFrameworkFactory.builder().connectString("82.157.174.50:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retry)
.namespace("zyw")
.build();
client.start();
}
@AfterEach
void tearDown() {
if (client != null) {
client.close();
}
}
}
Watch監聽
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/**
 * Demonstrates the three Curator cache recipes used to watch ZooKeeper nodes:
 * {@link NodeCache}, {@link PathChildrenCache} and {@link TreeCache}.
 *
 * <p>Each test registers a listener and then blocks so that change events can be
 * observed; the tests therefore do not finish on their own. The client is built
 * in {@link #setUp()} with the namespace {@code zyw} and closed in
 * {@link #tearDown()}.
 */
public class CuratorTest1 {
    /** Client used by the tests; re-created before each test method. */
    private CuratorFramework client;

    /** Builds and starts a new client before each test. */
    @BeforeEach
    void setUp() {
        // Constructor arguments: base sleep time of 3000 ms, at most 10 retries.
        RetryPolicy retry = new ExponentialBackoffRetry(3000, 10);
        client = CuratorFrameworkFactory.builder()
                .connectString("82.157.174.50:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retry)
                .namespace("zyw")
                .build();
        client.start();
    }

    /** Closes the client after each test, if one was created. */
    @AfterEach
    void tearDown() {
        if (client != null) {
            client.close();
        }
    }

    /**
     * Registers a listener on one specific node and prints its data on every change.
     */
    @Test
    void testNode() throws Exception {
        // 1. Create the NodeCache; try-with-resources closes it if the wait is interrupted.
        try (NodeCache nodeCache = new NodeCache(client, "/")) {
            // 2. Register the listener.
            nodeCache.getListenable().addListener(() -> {
                // getCurrentData() is null when the node does not exist (e.g. after a
                // delete), so guard against it instead of failing inside the listener.
                if (nodeCache.getCurrentData() == null) {
                    return;
                }
                byte[] data = nodeCache.getCurrentData().getData();
                System.out.println(new String(data));
            });
            // 3. Start watching; passing true loads the cached data when starting.
            nodeCache.start(true);
            blockForever();
        }
    }

    /**
     * Watches all children of a node; changes to the node itself are not reported.
     */
    @Test
    void testChildren() throws Exception {
        try (PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/", true)) {
            pathChildrenCache.getListenable().addListener((curator, event) -> {
                System.out.println("子節(jié)點變化了");
                // Inspect the event type.
                PathChildrenCacheEvent.Type type = event.getType();
                if (type == PathChildrenCacheEvent.Type.CHILD_UPDATED) {
                    // Data of the child after the update.
                    byte[] data = event.getData().getData();
                    System.out.println(new String(data));
                }
            });
            pathChildrenCache.start();
            blockForever();
        }
    }

    /**
     * Watches a node itself together with all of its descendants.
     */
    @Test
    void testTreeCache() throws Exception {
        try (TreeCache treeCache = new TreeCache(client, "/")) {
            treeCache.getListenable().addListener((curator, event) -> {
                // TODO
            });
            treeCache.start();
            blockForever();
        }
    }

    /**
     * Keeps the calling test thread parked so listeners can keep receiving events.
     * Sleeping replaces the former empty {@code while (true);} loop, which kept a
     * CPU core busy; an interrupt ends the wait and propagates to the caller.
     */
    private static void blockForever() throws InterruptedException {
        while (true) {
            Thread.sleep(Long.MAX_VALUE);
        }
    }
}
ZooKeeper分布式鎖
原理
Curator的五種鎖方案
InterProcessMutex 演示
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.TimeUnit;
/**
 * Ticket-selling demo that serialises access to a shared ticket counter with a
 * Curator {@link InterProcessMutex} on the ZooKeeper path {@code /lock}.
 *
 * <p>Each instance creates and starts its own Curator client in the constructor.
 * NOTE(review): that client is never closed by this class; consider exposing a
 * close method if instances are created repeatedly.
 */
public class TicketsSell implements Runnable {
    /** Remaining tickets; only modified while the distributed lock is held. */
    private static int tickets = 50;

    /** Distributed lock guarding {@link #tickets}. */
    private final InterProcessMutex lock;

    /** Connects to ZooKeeper and prepares the lock on {@code /lock}. */
    public TicketsSell() {
        // Constructor arguments: base sleep time of 3000 ms, at most 10 retries.
        RetryPolicy retry = new ExponentialBackoffRetry(3000, 10);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("82.157.174.50:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retry)
                .build();
        client.start();
        lock = new InterProcessMutex(client, "/lock");
    }

    /**
     * Sells tickets one at a time under the lock until none are left, then returns.
     * An interrupt while waiting for the lock also stops the worker.
     */
    @Override
    public void run() {
        while (true) {
            // Tracks whether this iteration actually owns the lock, so that
            // release() is never called on a lock that was not acquired.
            boolean acquired = false;
            try {
                // Acquire the lock, waiting up to 300 seconds.
                acquired = lock.acquire(300, TimeUnit.SECONDS);
                if (!acquired) {
                    // Timed out without getting the lock; try again.
                    continue;
                }
                if (tickets <= 0) {
                    // Sold out: stop instead of acquiring and releasing forever.
                    return;
                }
                System.out.println(Thread.currentThread().getName() + ": " + tickets);
                tickets--;
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop this worker.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (acquired) {
                    try {
                        // Release the lock.
                        lock.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
ZooKeeper集群
介紹
例如集群有 5 臺 ZooKeeper 服務, 按順序啟動: 3 號啟動時得票已超過半數, 所以 3 號就是 Leader; 之後 4 號和 5 號啟動也不會變成 Leader。
3 個服務的集群, 如果 2 個 follower 掛了, leader 雖然沒有掛, 但存活節點數未超過半數, 因此也無法對外提供服務。
搭建
集群搭建指南
柚子快報激活碼778899分享:分布式 ZooKeeper
參考文章
本文內容根據網絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉載請注明,如有侵權,聯系刪除。