柚子快報邀請碼778899分享:zookeeper之分布式隊列
柚子快報邀請碼778899分享:zookeeper之分布式隊列
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
/**
簡單的分布式隊列 @author jerome_s@qq.com @date 2016/8/30 20:19
*/
public class DistributedSimpleQueue {
protected final ZkClient zkClient;
/**
根節(jié)點路徑
*/
protected final String root;
/**
順序節(jié)點的前綴
*/
protected static final String Node_NAME = “qn_”;
public DistributedSimpleQueue(ZkClient zkClient, String root) {
this.zkClient = zkClient;
this.root = root;
}
/**
獲取隊列的大小 @return
*/
public int getQueueSize() {
return zkClient.getChildren(root).size();
}
/**
向隊列提交數(shù)據(jù) @param element 提交的數(shù)據(jù) @return @throws Exception
*/
public boolean offer(T element) throws Exception {
String nodeFullPath = root.concat(“/”).concat(Node_NAME);
try {
// 創(chuàng)建持久的順序節(jié)點
zkClient.createPersistentSequential(nodeFullPath, element);
} catch (ZkNoNodeException e) {
zkClient.createPersistent(root);
offer(element);
} catch (Exception e) {
throw ExceptionUtil.convertToRuntimeException(e);
}
return true;
}
public List element() {
List list = zkClient.getChildren(root);
// 排序隊列 根據(jù)名稱由小到大
Collections.sort(list, new Comparator() {
public int compare(String lhs, String rhs) {
return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));
}
});
List res = new ArrayList<>();
try {
for (String nodeName : list) {
String nodeFullPath = root.concat(“/”).concat(nodeName);
res.add((T)zkClient.readData(nodeFullPath));
}
} catch (ZkNoNodeException e) {
// 其他客戶端消費了 繼續(xù)循環(huán)
}
return res;
}
/**
從隊列獲取數(shù)據(jù) @return @throws Exception
*/
public T poll() throws Exception {
try {
List list = zkClient.getChildren(root);
if (list.size() == 0) {
return null;
}
// 排序隊列 根據(jù)名稱由小到大
Collections.sort(list, new Comparator() {
public int compare(String lhs, String rhs) {
return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));
}
});
for (String nodeName : list) {
String nodeFullPath = root.concat(“/”).concat(nodeName);
try {
T node = (T) zkClient.readData(nodeFullPath);
zkClient.delete(nodeFullPath);
return node;
} catch (ZkNoNodeException e) {
// 其他客戶端消費了 繼續(xù)循環(huán)
}
}
return null;
} catch (Exception e) {
throw ExceptionUtil.convertToRuntimeException(e);
}
}
private String getNodeNumber(String str, String nodeName) {
int index = str.lastIndexOf(nodeName);
if (index >= 0) {
index += Node_NAME.length();
return index <= str.length() ? str.substring(index) : “”;
}
return str;
}
}
測試用例:
package com.queue;
import com.queue.model.User;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import java.util.List;
/**
測試簡單的分布式隊列 @author jerome_s@qq.com @date 2016/8/30 19:48
*/
public class TestDistributedSimpleQueue {
public static void main(String[] args) {
ZkClient zkClient = new ZkClient(“192.168.40.138:2181”, 10000, 500000, new SerializableSerializer());
DistributedSimpleQueue queue = new DistributedSimpleQueue<>(zkClient, “/queue”);
User user1 = new User();
user1.setId(“1”);
user1.setName(“jerome1”);
User user2 = new User();
user2.setId(“2”);
user2.setName(“jerome2”);
try {
queue.offer(user1);
queue.offer(user2);
System.out.println(“queue.offer end!”);
List element = queue.element();
System.out.println(element.toString());
User u1 = queue.poll();
User u2 = queue.poll();
System.out.println("queue.poll() u1 = " + u1.toString());
System.out.println("queue.poll() u2 = " + u2.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
/* console:
queue.offer end!
queue.poll() u1 = User{name=‘jerome1’, id=‘1’}
queue.poll() u2 = User{name=‘jerome2’, id=‘2’}
*/
} 運行結(jié)果:
寫在最后
還有一份JAVA核心知識點整理(PDF):JVM,JAVA集合,JAVA多線程并發(fā),JAVA基礎(chǔ),Spring原理,微服務(wù),Netty與RPC,網(wǎng)絡(luò),日志,Zookeeper,Kafka,RabbitMQ,Hbase,MongoDB,Cassandra,設(shè)計模式,負載均衡,數(shù)據(jù)庫,一致性哈希,JAVA算法,數(shù)據(jù)結(jié)構(gòu),加密算法,分布式緩存,Hadoop,Spark,Storm,YARN,機器學(xué)習(xí),云計算…
加入社區(qū):https://bbs.csdn.net/forums/4304bb5a486d4c3ab8389e65ecb71ac0 ps://img-blog.csdnimg.cn/20200329203257407.png)
寫在最后
還有一份JAVA核心知識點整理(PDF):JVM,JAVA集合,JAVA多線程并發(fā),JAVA基礎(chǔ),Spring原理,微服務(wù),Netty與RPC,網(wǎng)絡(luò),日志,Zookeeper,Kafka,RabbitMQ,Hbase,MongoDB,Cassandra,設(shè)計模式,負載均衡,數(shù)據(jù)庫,一致性哈希,JAVA算法,數(shù)據(jù)結(jié)構(gòu),加密算法,分布式緩存,Hadoop,Spark,Storm,YARN,機器學(xué)習(xí),云計算…
[外鏈圖片轉(zhuǎn)存中…(img-Tq1KEQYl-1725749210915)]
加入社區(qū):https://bbs.csdn.net/forums/4304bb5a486d4c3ab8389e65ecb71ac0
柚子快報邀請碼778899分享:zookeeper之分布式隊列
精彩鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。