欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

目錄

柚子快報(bào)邀請(qǐng)碼778899分享:Zookeeper

柚子快報(bào)邀請(qǐng)碼778899分享:Zookeeper

http://yzkb.51969.com/

基本介紹

框架特征

Zookeeper 是 Apache Hadoop 項(xiàng)目子項(xiàng)目,為分布式框架提供協(xié)調(diào)服務(wù),是一個(gè)樹(shù)形目錄服務(wù) Zookeeper 是基于觀(guān)察者模式設(shè)計(jì)的分布式服務(wù)管理框架,負(fù)責(zé)存儲(chǔ)和管理共享數(shù)據(jù),接受觀(guān)察者的注冊(cè)監(jiān)控,一旦這些數(shù)據(jù)的狀態(tài)發(fā)生變化,Zookeeper 會(huì)通知觀(guān)察者

Zookeeper 是一個(gè)領(lǐng)導(dǎo)者(Leader),多個(gè)跟隨者(Follower)組成的集群集群中只要有半數(shù)以上節(jié)點(diǎn)存活就能正常服務(wù),所以 Zookeeper 適合部署奇數(shù)臺(tái)服務(wù)器全局?jǐn)?shù)據(jù)一致,每個(gè) Server 保存一份相同的數(shù)據(jù)副本,Client 無(wú)論連接到哪個(gè) Server,數(shù)據(jù)都是一致更新的請(qǐng)求順序執(zhí)行,來(lái)自同一個(gè) Client 的請(qǐng)求按其發(fā)送順序依次執(zhí)行數(shù)據(jù)更新原子性,一次數(shù)據(jù)更新要么成功,要么失敗實(shí)時(shí)性,在一定的時(shí)間范圍內(nèi),Client 能讀到最新數(shù)據(jù)心跳檢測(cè),會(huì)定時(shí)向各個(gè)服務(wù)提供者發(fā)送一個(gè)請(qǐng)求(實(shí)際上建立的是一個(gè) Socket 長(zhǎng)連接) 參考視頻:【尚硅谷】大數(shù)據(jù)技術(shù)之Zookeeper 3.5.7版本教程_嗶哩嗶哩_bilibili

應(yīng)用場(chǎng)景

Zookeeper 提供的主要功能包括:統(tǒng)一命名服務(wù)、統(tǒng)一配置管理、統(tǒng)一集群管理、服務(wù)器節(jié)點(diǎn)動(dòng)態(tài)上下線(xiàn)、軟負(fù)載均衡、分布式鎖等

在分布式環(huán)境中,經(jīng)常對(duì)應(yīng)用/服務(wù)進(jìn)行統(tǒng)一命名,便于識(shí)別,例如域名相對(duì)于 IP 地址更容易被接收

/service/www.baidu.com # 節(jié)點(diǎn)路徑

192.168.1.1 192.168.1.2 # 節(jié)點(diǎn)值

如果在節(jié)點(diǎn)中記錄每臺(tái)服務(wù)器的訪(fǎng)問(wèn)數(shù),讓訪(fǎng)問(wèn)數(shù)最少的服務(wù)器去處理最新的客戶(hù)端請(qǐng)求,可以實(shí)現(xiàn)負(fù)載均衡

192.168.1.1 10 # 次數(shù)

192.168.1.1 15

配置文件同步可以通過(guò) Zookeeper 實(shí)現(xiàn),將配置信息寫(xiě)入某個(gè) ZNode,其他客戶(hù)端監(jiān)視該節(jié)點(diǎn),當(dāng)節(jié)點(diǎn)數(shù)據(jù)被修改,通知各個(gè)客戶(hù)端服務(wù)器集群環(huán)境中,需要實(shí)時(shí)掌握每個(gè)集群節(jié)點(diǎn)的狀態(tài),可以將這些信息放入 ZNode,通過(guò)監(jiān)控通知的機(jī)制實(shí)現(xiàn)實(shí)現(xiàn)客戶(hù)端實(shí)時(shí)觀(guān)察服務(wù)器上下線(xiàn)的變化,通過(guò)心跳檢測(cè)實(shí)現(xiàn)

基本操作

安裝搭建

安裝步驟:

安裝 JDK拷貝 apache-zookeeper-3.5.7-bin.tar.gz 安裝包到 Linux 系統(tǒng)下,并解壓到指定目錄conf 目錄下的配置文件重命名:

mv zoo_sample.cfg zoo.cfg

修改配置文件:

vim zoo.cfg

# 修改內(nèi)容

dataDir=/home/seazean/SoftWare/zookeeper-3.5.7/zkData

在對(duì)應(yīng)目錄創(chuàng)建 zkData 文件夾:

mkdir zkData

Zookeeper 中的配置文件 zoo.cfg 中參數(shù)含義解讀:

tickTime = 2000:通信心跳時(shí)間,Zookeeper 服務(wù)器與客戶(hù)端心跳時(shí)間,單位毫秒initLimit = 10:Leader 與 Follower 初始通信時(shí)限,初始連接時(shí)能容忍的最多心跳次數(shù)syncLimit = 5:Leader 與 Follower 同步通信時(shí)限,LF 通信時(shí)間超過(guò) syncLimit * tickTime,Leader 認(rèn)為 Follwer 下線(xiàn)dataDir:保存 Zookeeper 中的數(shù)據(jù)目錄,默認(rèn)是 tmp目錄,容易被 Linux 系統(tǒng)定期刪除,所以建議修改clientPort = 2181:客戶(hù)端連接端口,通常不做修改

操作命令

服務(wù)端

Linux 命令:

啟動(dòng) ZooKeeper 服務(wù):./zkServer.sh start查看 ZooKeeper 服務(wù):./zkServer.sh status停止 ZooKeeper 服務(wù):./zkServer.sh stop重啟 ZooKeeper 服務(wù):./zkServer.sh restart查看進(jìn)程是否啟動(dòng):jps

客戶(hù)端

Linux 命令:

連接 ZooKeeper 服務(wù)端:

./zkCli.sh # 直接啟動(dòng)

./zkCli.sh –server ip:port # 指定 host 啟動(dòng)

客戶(hù)端命令:

基礎(chǔ)操作:

quit # 停止連接

help # 查看命令幫助

創(chuàng)建命令:/ 代表根目錄

create /path value # 創(chuàng)建節(jié)點(diǎn),value 可選

create -e /path value # 創(chuàng)建臨時(shí)節(jié)點(diǎn)

create -s /path value # 創(chuàng)建順序節(jié)點(diǎn)

create -es /path value # 創(chuàng)建臨時(shí)順序節(jié)點(diǎn),比如node10000012 刪除12后也會(huì)繼續(xù)從13開(kāi)始,只會(huì)增加

查詢(xún)命令:

ls /path # 顯示指定目錄下子節(jié)點(diǎn)

ls –s /path # 查詢(xún)節(jié)點(diǎn)詳細(xì)信息

ls –w /path # 監(jiān)聽(tīng)子節(jié)點(diǎn)數(shù)量的變化

stat /path # 查看節(jié)點(diǎn)狀態(tài)

get –s /path # 查詢(xún)節(jié)點(diǎn)詳細(xì)信息

get –w /path # 監(jiān)聽(tīng)節(jié)點(diǎn)數(shù)據(jù)的變化

# 屬性,分為當(dāng)前節(jié)點(diǎn)的屬性和子節(jié)點(diǎn)屬性

czxid: 節(jié)點(diǎn)被創(chuàng)建的事務(wù)ID, 是ZooKeeper中所有修改總的次序,每次修改都有唯一的 zxid,誰(shuí)小誰(shuí)先發(fā)生

ctime: 被創(chuàng)建的時(shí)間戳

mzxid: 最后一次被更新的事務(wù)ID

mtime: 最后修改的時(shí)間戳

pzxid: 子節(jié)點(diǎn)列表最后一次被更新的事務(wù)ID

cversion: 子節(jié)點(diǎn)的變化號(hào),修改次數(shù)

dataversion: 節(jié)點(diǎn)的數(shù)據(jù)變化號(hào),數(shù)據(jù)的變化次數(shù)

aclversion: 節(jié)點(diǎn)的訪(fǎng)問(wèn)控制列表變化號(hào)

ephemeralOwner: 用于臨時(shí)節(jié)點(diǎn),代表節(jié)點(diǎn)擁有者的 session id,如果為持久節(jié)點(diǎn)則為0

dataLength: 節(jié)點(diǎn)存儲(chǔ)的數(shù)據(jù)的長(zhǎng)度

numChildren: 當(dāng)前節(jié)點(diǎn)的子節(jié)點(diǎn)數(shù)量

刪除命令:

delete /path # 刪除節(jié)點(diǎn)

deleteall /path # 遞歸刪除節(jié)點(diǎn)

數(shù)據(jù)結(jié)構(gòu)

ZooKeeper 是一個(gè)樹(shù)形目錄服務(wù),類(lèi)似 Unix 的文件系統(tǒng),每一個(gè)節(jié)點(diǎn)都被稱(chēng)為 ZNode,每個(gè) ZNode 默認(rèn)存儲(chǔ) 1MB 的數(shù)據(jù),節(jié)點(diǎn)上會(huì)保存數(shù)據(jù)和節(jié)點(diǎn)信息,每個(gè) ZNode 都可以通過(guò)其路徑唯一標(biāo)識(shí) 節(jié)點(diǎn)可以分為四大類(lèi):

PERSISTENT:持久化節(jié)點(diǎn)EPHEMERAL:臨時(shí)節(jié)點(diǎn),客戶(hù)端和服務(wù)器端斷開(kāi)連接后,創(chuàng)建的節(jié)點(diǎn)刪除PERSISTENT_SEQUENTIAL:持久化順序節(jié)點(diǎn),創(chuàng)建 znode 時(shí)設(shè)置順序標(biāo)識(shí),節(jié)點(diǎn)名稱(chēng)后會(huì)附加一個(gè)值,順序號(hào)是一個(gè)單調(diào)遞增的計(jì)數(shù)器,由父節(jié)點(diǎn)維護(hù)EPHEMERAL_SEQUENTIAL:臨時(shí)順序節(jié)點(diǎn) 注意:在分布式系統(tǒng)中,順序號(hào)可以被用于為所有的事件進(jìn)行全局排序,這樣客戶(hù)端可以通過(guò)順序號(hào)推斷事件的順序

代碼實(shí)現(xiàn)

添加 Maven 依賴(lài):

org.apache.zookeeper

zookeeper

3.5.7

實(shí)現(xiàn)代碼:

public static void main(String[] args) {

// 參數(shù)一:連接地址

// 參數(shù)二:會(huì)話(huà)超時(shí)時(shí)間

// 參數(shù)三:監(jiān)聽(tīng)器

ZooKeeper zkClient = new ZooKeeper("192.168.3.128:2181", 20000, new Watcher() {

@Override

public void process(WatchedEvent event) {

System.out.println("監(jiān)聽(tīng)處理函數(shù)");

}

});

}

集群介紹

相關(guān)概念

Zookeepe 集群三個(gè)角色:

Leader 領(lǐng)導(dǎo)者:處理客戶(hù)端事務(wù)請(qǐng)求,負(fù)責(zé)集群內(nèi)部各服務(wù)器的調(diào)度Follower 跟隨者:處理客戶(hù)端非事務(wù)請(qǐng)求,轉(zhuǎn)發(fā)事務(wù)請(qǐng)求給 Leader 服務(wù)器,參與 Leader 選舉投票O(jiān)bserver 觀(guān)察者:觀(guān)察集群的最新?tīng)顟B(tài)的變化,并將這些狀態(tài)進(jìn)行同步;處理非事務(wù)性請(qǐng)求,事務(wù)性請(qǐng)求會(huì)轉(zhuǎn)發(fā)給 Leader 服務(wù)器進(jìn)行處理;不會(huì)參與任何形式的投票。只提供非事務(wù)性的服務(wù),通常用于在不影響集群事務(wù)處理能力的前提下,提升集群的非事務(wù)處理能力(提高集群讀的能力,但是也降低了集群選主的復(fù)雜程度) 相關(guān)屬性:SID:服務(wù)器 ID,用來(lái)唯一標(biāo)識(shí)一臺(tái)集群中的機(jī)器,和 myid 一致ZXID:事務(wù) ID,用來(lái)標(biāo)識(shí)一次服務(wù)器狀態(tài)的變更,在某一時(shí)刻集群中每臺(tái)機(jī)器的 ZXID 值不一定完全一致,這和 ZooKeeper 服務(wù)器對(duì)于客戶(hù)端更新請(qǐng)求的處理邏輯有關(guān)Epoch:每個(gè) Leader 任期的代號(hào),同一輪選舉投票過(guò)程中的該值是相同的,投完一次票就增加 選舉機(jī)制:半數(shù)機(jī)制,超過(guò)半數(shù)的投票就通過(guò)第一次啟動(dòng)選舉規(guī)則:投票過(guò)半數(shù)時(shí),服務(wù)器 ID 大的勝出第二次啟動(dòng)選舉規(guī)則:

EPOCH 大的直接勝出EPOCH 相同,事務(wù) ID 大的勝出(事務(wù) ID 越大,數(shù)據(jù)越新)事務(wù) ID 相同,服務(wù)器 ID 大的勝出

初次選舉

選舉過(guò)程:

服務(wù)器 1 啟動(dòng),發(fā)起一次選舉,服務(wù)器 1 投自己一票,票數(shù)不超過(guò)半數(shù),選舉無(wú)法完成,服務(wù)器 1 狀態(tài)保持為 LOOKING服務(wù)器 2 啟動(dòng),再發(fā)起一次選舉,服務(wù)器 1 和 2 分別投自己一票并交換選票信息,此時(shí)服務(wù)器 1 會(huì)發(fā)現(xiàn)服務(wù)器 2 的 SID 比自己投票推舉的(服務(wù)器 1)大,更改選票為推舉服務(wù)器 2。投票結(jié)果為服務(wù)器 1 票數(shù) 0 票,服務(wù)器 2 票數(shù) 2 票,票數(shù)不超過(guò)半數(shù),選舉無(wú)法完成,服務(wù)器 1、2 狀態(tài)保持 LOOKING服務(wù)器 3 啟動(dòng),發(fā)起一次選舉,此時(shí)服務(wù)器 1 和 2 都會(huì)更改選票為服務(wù)器 3,投票結(jié)果為服務(wù)器 3 票數(shù) 3 票,此時(shí)服務(wù)器 3 的票數(shù)已經(jīng)超過(guò)半數(shù),服務(wù)器 3 當(dāng)選 Leader,服務(wù)器 1、2 更改狀態(tài)為 FOLLOWING,服務(wù)器 3 更改狀態(tài)為 LEADING服務(wù)器 4 啟動(dòng),發(fā)起一次選舉,此時(shí)服務(wù)器 1、2、3 已經(jīng)不是 LOOKING 狀態(tài),不會(huì)更改選票信息,交換選票信息結(jié)果后服務(wù)器 3 為 3 票,服務(wù)器 4 為 1 票,此時(shí)服務(wù)器 4 更改選票信息為服務(wù)器 3,并更改狀態(tài)為 FOLLOWING服務(wù)器 5 啟動(dòng),同 4 一樣

再次選舉

ZooKeeper 集群中的一臺(tái)服務(wù)器出現(xiàn)以下情況之一時(shí),就會(huì)開(kāi)始進(jìn)入 Leader 選舉:

服務(wù)器初始化啟動(dòng)服務(wù)器運(yùn)行期間無(wú)法和 Leader 保持連接 當(dāng)一臺(tái)服務(wù)器進(jìn)入 Leader 選舉流程時(shí),當(dāng)前集群可能會(huì)處于以下兩種狀態(tài):集群中本來(lái)就已經(jīng)存在一個(gè) Leader,服務(wù)器試圖去選舉 Leader 時(shí)會(huì)被告知當(dāng)前服務(wù)器的 Leader 信息,對(duì)于該服務(wù)器來(lái)說(shuō),只需要和 Leader 服務(wù)器建立連接,并進(jìn)行狀態(tài)同步即可集群中確實(shí)不存在 Leader,假設(shè)服務(wù)器 3 和 5 出現(xiàn)故障,開(kāi)始進(jìn)行 Leader 選舉,SID 為 1、2、4 的機(jī)器投票情況

(EPOCH,ZXID,SID): (1, 8, 1), (1, 8, 2), (1, 7, 4)

根據(jù)選舉規(guī)則,服務(wù)器 2 勝出

數(shù)據(jù)寫(xiě)入

寫(xiě)操作就是事務(wù)請(qǐng)求,寫(xiě)入請(qǐng)求直接發(fā)送給 Leader 節(jié)點(diǎn):Leader 會(huì)先將數(shù)據(jù)寫(xiě)入自身,同時(shí)通知其他 Follower 寫(xiě)入,當(dāng)集群中有半數(shù)以上節(jié)點(diǎn)寫(xiě)入完成,Leader 節(jié)點(diǎn)就會(huì)響應(yīng)客戶(hù)端數(shù)據(jù)寫(xiě)入完成

寫(xiě)入請(qǐng)求直接發(fā)送給 Follower 節(jié)點(diǎn):Follower 沒(méi)有寫(xiě)入權(quán)限,會(huì)將寫(xiě)請(qǐng)求轉(zhuǎn)發(fā)給 Leader,Leader 將數(shù)據(jù)寫(xiě)入自身,通知其他 Follower 寫(xiě)入,當(dāng)集群中有半數(shù)以上節(jié)點(diǎn)寫(xiě)入完成,Leader 會(huì)通知 Follower 寫(xiě)入完成,由 Follower 響應(yīng)客戶(hù)端數(shù)據(jù)寫(xiě)入完成

底層協(xié)議

Paxos

Paxos 算法:基于消息傳遞且具有高度容錯(cuò)特性的一致性算法 優(yōu)點(diǎn):快速正確的在一個(gè)分布式系統(tǒng)中對(duì)某個(gè)數(shù)據(jù)值達(dá)成一致,并且保證不論發(fā)生任何異常,都不會(huì)破壞整個(gè)系統(tǒng)的一致性 缺陷:在網(wǎng)絡(luò)復(fù)雜的情況下,可能很久無(wú)法收斂,甚至陷入活鎖的情況

ZAB

算法介紹

ZAB 協(xié)議借鑒了 Paxos 算法,是為 Zookeeper 設(shè)計(jì)的支持崩潰恢復(fù)的原子廣播協(xié)議,基于該協(xié)議 Zookeeper 設(shè)計(jì)為只有一臺(tái)客戶(hù)端(Leader)負(fù)責(zé)處理外部的寫(xiě)事務(wù)請(qǐng)求,然后 Leader 將數(shù)據(jù)同步到其他 Follower 節(jié)點(diǎn) Zab 協(xié)議包括兩種基本的模式:消息廣播、崩潰恢復(fù)

消息廣播

ZAB 協(xié)議針對(duì)事務(wù)請(qǐng)求的處理過(guò)程類(lèi)似于一個(gè)兩階段提交過(guò)程:廣播事務(wù)階段、廣播提交操作

客戶(hù)端發(fā)起寫(xiě)操作請(qǐng)求,Leader 服務(wù)器將請(qǐng)求轉(zhuǎn)化為事務(wù) Proposal 提案,同時(shí)為 Proposal 分配一個(gè)全局的 ID,即 ZXIDLeader 服務(wù)器為每個(gè) Follower 分配一個(gè)單獨(dú)的隊(duì)列,將廣播的 Proposal 依次放到隊(duì)列中去,根據(jù) FIFO 策略進(jìn)行消息發(fā)送Follower 接收到 Proposal 后,將其以事務(wù)日志的方式寫(xiě)入本地磁盤(pán)中,寫(xiě)入成功后向 Leader 反饋一個(gè) ACK 響應(yīng)消息Leader 接收到超過(guò)半數(shù)以上 Follower 的 ACK 響應(yīng)消息后,即認(rèn)為消息發(fā)送成功,可以發(fā)送 Commit 消息Leader 向所有 Follower 廣播 commit 消息,同時(shí)自身也會(huì)完成事務(wù)提交,F(xiàn)ollower 接收到 Commit 后,將上一條事務(wù)提交 兩階段提交模型可能因?yàn)?Leader 宕機(jī)帶來(lái)數(shù)據(jù)不一致:Leader 發(fā)起一個(gè)事務(wù) Proposal 后就宕機(jī),F(xiàn)ollower 都沒(méi)有 ProposalLeader 收到半數(shù) ACK 宕機(jī),沒(méi)來(lái)得及向 Follower 發(fā)送 Commit

崩潰恢復(fù)

Leader 服務(wù)器出現(xiàn)崩潰或者由于網(wǎng)絡(luò)原因?qū)е?Leader 服務(wù)器失去了與過(guò)半 Follower的聯(lián)系,那么就會(huì)進(jìn)入崩潰恢復(fù)模式,崩潰恢復(fù)主要包括兩部分:Leader 選舉和數(shù)據(jù)恢復(fù) Zab 協(xié)議崩潰恢復(fù)要求滿(mǎn)足以下兩個(gè)要求:

已經(jīng)被 Leader 提交的提案 Proposal,必須最終被所有的 Follower 服務(wù)器正確提交丟棄已經(jīng)被 Leader 提出的,但是沒(méi)有被提交的 Proposal Zab 協(xié)議需要保證選舉出來(lái)的 Leader 需要滿(mǎn)足以下條件:新選舉的 Leader 不能包含未提交的 Proposal,即新 Leader 必須都是已經(jīng)提交了 Proposal 的 Follower 節(jié)點(diǎn)新選舉的 Leader 節(jié)點(diǎn)含有最大的 ZXID,可以避免 Leader 服務(wù)器檢查 Proposal 的提交和丟棄工作 數(shù)據(jù)恢復(fù)階段:完成 Leader 選舉后,在正式開(kāi)始工作之前(接收事務(wù)請(qǐng)求提出新的 Proposal),Leader 服務(wù)器會(huì)首先確認(rèn)事務(wù)日志中的所有 Proposal 是否已經(jīng)被集群中過(guò)半的服務(wù)器 CommitLeader 服務(wù)器需要確保所有的 Follower 服務(wù)器能夠接收到每一條事務(wù)的 Proposal,并且能將所有已經(jīng)提交的事務(wù) Proposal 應(yīng)用到內(nèi)存數(shù)據(jù)中,所以只有當(dāng) Follower 將所有尚未同步的事務(wù) Proposal 都從 Leader 服務(wù)器上同步,并且應(yīng)用到內(nèi)存數(shù)據(jù)后,Leader 才會(huì)把該 Follower 加入到真正可用的 Follower 列表中

異常處理

Zab 的事務(wù)編號(hào) zxid 設(shè)計(jì):

zxid 是一個(gè) 64 位的數(shù)字,低 32 位是一個(gè)簡(jiǎn)單的單增計(jì)數(shù)器,針對(duì)客戶(hù)端每一個(gè)事務(wù)請(qǐng)求,Leader 在產(chǎn)生新的 Proposal 事務(wù)時(shí),都會(huì)對(duì)該計(jì)數(shù)器加 1,而高 32 位則代表了 Leader 周期的 epoch 編號(hào)epoch 為當(dāng)前集群所處的代或者周期,每次 Leader 變更后都會(huì)在 epoch 的基礎(chǔ)上加 1,F(xiàn)ollower 只服從 epoch 最高的 Leader 命令,所以舊的 Leader 崩潰恢復(fù)之后,其他 Follower 就不會(huì)繼續(xù)追隨每次選舉產(chǎn)生一個(gè)新的 Leader,就會(huì)從新 Leader 服務(wù)器上取出本地事務(wù)日志中最大編號(hào) Proposal 的 zxid,從 zxid 中解析得到對(duì)應(yīng)的 epoch 編號(hào),然后再對(duì)其加 1 后作為新的 epoch 值,并將低 32 位數(shù)字歸零,由 0 開(kāi)始重新生成 zxid Zab 協(xié)議通過(guò) epoch 編號(hào)來(lái)區(qū)分 Leader 變化周期,能夠有效避免不同的 Leader 錯(cuò)誤的使用了相同的 zxid 編號(hào)提出了不一樣的 Proposal 的異常情況 Zab 數(shù)據(jù)同步過(guò)程:數(shù)據(jù)同步階段要以 Leader 服務(wù)器為準(zhǔn)一個(gè)包含了上個(gè) Leader 周期中尚未提交過(guò)的事務(wù) Proposal 的服務(wù)器啟動(dòng)時(shí),這臺(tái)機(jī)器加入集群中會(huì)以 Follower 角色連上 LeaderLeader 會(huì)根據(jù)自己服務(wù)器上最后提交的 Proposal 和 Follower 服務(wù)器的 Proposal 進(jìn)行比對(duì),讓 Follower 進(jìn)行一個(gè)回退或者前進(jìn)操作,到一個(gè)已經(jīng)被集群中過(guò)半機(jī)器 Commit 的最新 Proposal(源碼解析部分詳解)

CAP

CAP 理論指的是在一個(gè)分布式系統(tǒng)中,Consistency(一致性)、Availability(可用性)、Partition Tolerance(分區(qū)容錯(cuò)性)不能同時(shí)成立,ZooKeeper 保證的是 CP

ZooKeeper 不能保證每次服務(wù)請(qǐng)求的可用性,在極端環(huán)境下可能會(huì)丟棄一些請(qǐng)求,消費(fèi)者程序需要重新請(qǐng)求才能獲得結(jié)果進(jìn)行 Leader 選舉時(shí)集群都是不可用 CAP 三個(gè)基本需求,因?yàn)?P 是必須的,因此分布式系統(tǒng)選擇就在 CP 或者 AP 中:一致性:指數(shù)據(jù)在多個(gè)副本之間是否能夠保持?jǐn)?shù)據(jù)一致的特性,當(dāng)一個(gè)系統(tǒng)在數(shù)據(jù)一致的狀態(tài)下執(zhí)行更新操作后,也能保證系統(tǒng)的數(shù)據(jù)仍然處于一致的狀態(tài)可用性:指系統(tǒng)提供的服務(wù)必須一直處于可用的狀態(tài),即使集群中一部分節(jié)點(diǎn)故障,對(duì)于用戶(hù)的每一個(gè)操作請(qǐng)求總是能夠在有限的時(shí)間內(nèi)返回結(jié)果分區(qū)容錯(cuò)性:分布式系統(tǒng)在遇到任何網(wǎng)絡(luò)分區(qū)故障時(shí),仍然能夠保證對(duì)外提供服務(wù),不會(huì)宕機(jī),除非是整個(gè)網(wǎng)絡(luò)環(huán)境都發(fā)生了故障

監(jiān)聽(tīng)機(jī)制

實(shí)現(xiàn)原理

ZooKeeper 中引入了 Watcher 機(jī)制來(lái)實(shí)現(xiàn)了發(fā)布/訂閱功能,客戶(hù)端注冊(cè)監(jiān)聽(tīng)目錄節(jié)點(diǎn),在特定事件觸發(fā)時(shí),ZooKeeper 會(huì)通知所有關(guān)注該事件的客戶(hù)端,保證 ZooKeeper 保存的任何的數(shù)據(jù)的任何改變都能快速的響應(yīng)到監(jiān)聽(tīng)?wèi)?yīng)用程序 監(jiān)聽(tīng)命令:只能生效一次,接收一次通知,再次監(jiān)聽(tīng)需要重新注冊(cè)

ls –w /path # 監(jiān)聽(tīng)【子節(jié)點(diǎn)數(shù)量】的變化

get –w /path # 監(jiān)聽(tīng)【節(jié)點(diǎn)數(shù)據(jù)】的變化

工作流程:

在主線(xiàn)程中創(chuàng)建 Zookeeper 客戶(hù)端,這時(shí)就會(huì)創(chuàng)建兩個(gè)線(xiàn)程,一個(gè)負(fù)責(zé)網(wǎng)絡(luò)連接通信(connet),一個(gè)負(fù)責(zé)監(jiān)聽(tīng)(listener)通過(guò) connect 線(xiàn)程將注冊(cè)的監(jiān)聽(tīng)事件發(fā)送給 Zookeeper在 Zookeeper 的注冊(cè)監(jiān)聽(tīng)器列表中將注冊(cè)的監(jiān)聽(tīng)事件添加到列表中Zookeeper 監(jiān)聽(tīng)到有數(shù)據(jù)或路徑變化,將消息發(fā)送給 listener 線(xiàn)程listener 線(xiàn)程內(nèi)部調(diào)用 process() 方法 Curator 框架引入了 Cache 來(lái)實(shí)現(xiàn)對(duì) ZooKeeper 服務(wù)端事件的監(jiān)聽(tīng),三種 Watcher:NodeCache:只是監(jiān)聽(tīng)某一個(gè)特定的節(jié)點(diǎn)PathChildrenCache:監(jiān)控一個(gè) ZNode 的子節(jié)點(diǎn)TreeCache:可以監(jiān)控整個(gè)樹(shù)上的所有節(jié)點(diǎn),類(lèi)似于 PathChildrenCache 和 NodeCache 的組合

監(jiān)聽(tīng)案例

整體架構(gòu)

客戶(hù)端實(shí)時(shí)監(jiān)聽(tīng)服務(wù)器動(dòng)態(tài)上下線(xiàn)

代碼實(shí)現(xiàn)

客戶(hù)端:先啟動(dòng)客戶(hù)端進(jìn)行監(jiān)聽(tīng)

public class DistributeClient {

private String connectString = "192.168.3.128:2181";

private int sessionTimeout = 20000;

private ZooKeeper zk;

public static void main(String[] args) throws Exception {

DistributeClient client = new DistributeClient();

// 1 獲取zk連接

client.getConnect();

// 2 監(jiān)聽(tīng)/servers下面子節(jié)點(diǎn)的增加和刪除

client.getServerList();

// 3 業(yè)務(wù)邏輯

client.business();

}

private void business() throws InterruptedException {

Thread.sleep(Long.MAX_VALUE);

}

private void getServerList() throws KeeperException, InterruptedException {

ArrayList servers = new ArrayList<>();

// 獲取所有子節(jié)點(diǎn),true 代表觸發(fā)監(jiān)聽(tīng)操作

List children = zk.getChildren("/servers", true);

for (String child : children) {

// 獲取子節(jié)點(diǎn)的數(shù)據(jù)

byte[] data = zk.getData("/servers/" + child, false, null);

servers.add(new String(data));

}

System.out.println(servers);

}

private void getConnect() throws IOException {

zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

@Override

public void process(WatchedEvent event) {

getServerList();

}

});

}

}

服務(wù)端:?jiǎn)?dòng)時(shí)需要 Program arguments

public class DistributeServer {

private String connectString = "192.168.3.128:2181";

private int sessionTimeout = 20000;

private ZooKeeper zk;

public static void main(String[] args) throws Exception {

DistributeServer server = new DistributeServer();

// 1 獲取 zookeeper 連接

server.getConnect();

// 2 注冊(cè)服務(wù)器到 zk 集群,注意參數(shù)

server.register(args[0]);

// 3 啟動(dòng)業(yè)務(wù)邏輯

server.business();

}

private void business() throws InterruptedException {

Thread.sleep(Long.MAX_VALUE);

}

private void register(String hostname) throws KeeperException, InterruptedException {

// OPEN_ACL_UNSAFE: ACL 開(kāi)放

// EPHEMERAL_SEQUENTIAL: 臨時(shí)順序節(jié)點(diǎn)

String create = zk.create("/servers/" + hostname, hostname.getBytes(),

ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

System.out.println(hostname + " is online");

}

private void getConnect() throws IOException {

zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

@Override

public void process(WatchedEvent event) {

}

});

}

}

分布式鎖

實(shí)現(xiàn)原理

分布式鎖可以實(shí)現(xiàn)在分布式系統(tǒng)中多個(gè)進(jìn)程有序的訪(fǎng)問(wèn)該臨界資源,多個(gè)進(jìn)程之間不會(huì)相互干擾 核心思想:當(dāng)客戶(hù)端要獲取鎖,則創(chuàng)建節(jié)點(diǎn),使用完鎖,則刪除該節(jié)點(diǎn)

客戶(hù)端獲取鎖時(shí),在 /locks 節(jié)點(diǎn)下創(chuàng)建臨時(shí)順序節(jié)點(diǎn)

使用臨時(shí)節(jié)點(diǎn)是為了防止當(dāng)服務(wù)器或客戶(hù)端宕機(jī)以后節(jié)點(diǎn)無(wú)法刪除(持久節(jié)點(diǎn)),導(dǎo)致鎖無(wú)法釋放使用順序節(jié)點(diǎn)是為了系統(tǒng)自動(dòng)編號(hào)排序,找最小的節(jié)點(diǎn),防止客戶(hù)端饑餓現(xiàn)象,保證公平

獲取 /locks 目錄的所有子節(jié)點(diǎn),判斷自己的子節(jié)點(diǎn)序號(hào)是否最小,成立則客戶(hù)端獲取到鎖,使用完鎖后將該節(jié)點(diǎn)刪除反之客戶(hù)端需要找到比自己小的節(jié)點(diǎn),對(duì)其注冊(cè)事件監(jiān)聽(tīng)器,監(jiān)聽(tīng)刪除事件客戶(hù)端的 Watcher 收到刪除事件通知,就會(huì)重新判斷當(dāng)前節(jié)點(diǎn)是否是子節(jié)點(diǎn)中序號(hào)最小,如果是則獲取到了鎖, 如果不是則重復(fù)以上步驟繼續(xù)獲取到比自己小的一個(gè)節(jié)點(diǎn)并注冊(cè)監(jiān)聽(tīng)

Curator

Curator 實(shí)現(xiàn)分布式鎖 API,在 Curator 中有五種鎖方案:

InterProcessSemaphoreMutex:分布式排它鎖(非可重入鎖)InterProcessMutex:分布式可重入排它鎖InterProcessReadWriteLock:分布式讀寫(xiě)鎖InterProcessMultiLock:將多個(gè)鎖作為單個(gè)實(shí)體管理的容器InterProcessSemaphoreV2:共享信號(hào)量

public class CuratorLock {

public static CuratorFramework getCuratorFramework() {

// 重試策略對(duì)象

ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);

// 構(gòu)建客戶(hù)端

CuratorFramework client = CuratorFrameworkFactory.builder()

.connectString("192.168.3.128:2181")

.connectionTimeoutMs(2000) // 連接超時(shí)時(shí)間

.sessionTimeoutMs(20000) // 會(huì)話(huà)超時(shí)時(shí)間 單位ms

.retryPolicy(policy) // 重試策略

.build();

// 啟動(dòng)客戶(hù)端

client.start();

System.out.println("zookeeper 啟動(dòng)成功");

return client;

}

public static void main(String[] args) {

// 創(chuàng)建分布式鎖1

InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");

// 創(chuàng)建分布式鎖2

InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");

new Thread(new Runnable() {

@Override

public void run() {

lock1.acquire();

System.out.println("線(xiàn)程1 獲取到鎖");

Thread.sleep(5 * 1000);

lock1.release();

System.out.println("線(xiàn)程1 釋放鎖");

}

}).start();

new Thread(new Runnable() {

@Override

public void run() {

lock2.acquire();

System.out.println("線(xiàn)程2 獲取到鎖");

Thread.sleep(5 * 1000);

lock2.release();

System.out.println("線(xiàn)程2 釋放鎖");

}

}).start();

}

}

org.apache.curator

curator-framework

4.3.0

org.apache.curator

curator-recipes

4.3.0

org.apache.curator

curator-client

4.3.0

源碼解析

服務(wù)端

服務(wù)端程序的入口 QuorumPeerMain

public static void main(String[] args) {

QuorumPeerMain main = new QuorumPeerMain();

main.initializeAndRun(args);

}

initializeAndRun 的工作:

解析啟動(dòng)參數(shù)提交周期任務(wù),定時(shí)刪除過(guò)期的快照初始化通信模型,默認(rèn)是 NIO 通信

// QuorumPeerMain#runFromConfig

public void runFromConfig(QuorumPeerConfig config) {

// 通信信組件初始化,默認(rèn)是 NIO 通信

ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();

// 初始化NIO 服務(wù)端socket,綁定2181 端口,可以接收客戶(hù)端請(qǐng)求

cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);

// 啟動(dòng) zk

quorumPeer.start();

}

啟動(dòng) zookeeper

// QuorumPeer#start

public synchronized void start() {

if (!getView().containsKey(myid)) {

throw new RuntimeException("My id " + myid + " not in the peer list");

}

// 冷啟動(dòng)數(shù)據(jù)恢復(fù),將快照中數(shù)據(jù)恢復(fù)到 DataTree

loadDataBase();

// 啟動(dòng)通信工廠(chǎng)實(shí)例對(duì)象

startServerCnxnFactory();

try {

adminServer.start();

} catch (AdminServerException e) {

LOG.warn("Problem starting AdminServer", e);

System.out.println(e);

}

// 準(zhǔn)備選舉環(huán)境

startLeaderElection();

// 執(zhí)行選舉

super.start();

}

選舉機(jī)制

環(huán)境準(zhǔn)備

QuorumPeer#startLeaderElection 初始化選舉環(huán)境:

synchronized public void startLeaderElection() {

try {

// Looking 狀態(tài),需要選舉

if (getPeerState() == ServerState.LOOKING) {

// 選票組件: myid (serverid), zxid, epoch

// 開(kāi)始選票時(shí),serverid 是自己,【先投自己】

currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());

}

}

if (electionType == 0) {

try {

udpSocket = new DatagramSocket(getQuorumAddress().getPort());

// 響應(yīng)投票結(jié)果線(xiàn)程

responder = new ResponderThread();

responder.start();

} catch (SocketException e) {

throw new RuntimeException(e);

}

}

// 創(chuàng)建選舉算法實(shí)例

this.electionAlg = createElectionAlgorithm(electionType);

}

// zk總的發(fā)送和接收隊(duì)列準(zhǔn)備好

protected Election createElectionAlgorithm(int electionAlgorithm){

// 負(fù)責(zé)選舉過(guò)程中的所有網(wǎng)絡(luò)通信,創(chuàng)建各種隊(duì)列和集合

QuorumCnxManager qcm = createCnxnManager();

QuorumCnxManager.Listener listener = qcm.listener;

if(listener != null){

// 啟動(dòng)監(jiān)聽(tīng)線(xiàn)程, 調(diào)用 client = ss.accept()阻塞,等待處理請(qǐng)求

listener.start();

// 準(zhǔn)備好發(fā)送和接收隊(duì)列準(zhǔn)備

FastLeaderElection fle = new FastLeaderElection(this, qcm);

// 啟動(dòng)選舉線(xiàn)程,【W(wǎng)orkerSender 和 WorkerReceiver】

fle.start();

le = fle;

}

}

選舉源碼

當(dāng) Zookeeper 啟動(dòng)后,首先都是 Looking 狀態(tài),通過(guò)選舉讓其中一臺(tái)服務(wù)器成為 Leader 執(zhí)行 super.start() 相當(dāng)于執(zhí)行 QuorumPeer#run() 方法

public void run() {

case LOOKING:

// 進(jìn)行選舉,選舉結(jié)束返回最終成為 Leader 勝選的那張選票

setCurrentVote(makeLEStrategy().lookForLeader());

}

FastLeaderElection 類(lèi):

lookForLeader:選舉

public Vote lookForLeader() {

// 正常啟動(dòng)中其他服務(wù)器都會(huì)向我發(fā)送一個(gè)投票,保存每個(gè)服務(wù)器的最新合法有效的投票

HashMap recvset = new HashMap();

// 存儲(chǔ)合法選舉之外的投票結(jié)果

HashMap outofelection = new HashMap();

// 一次選舉的最大等待時(shí)間,默認(rèn)值是0.2s

int notTimeout = finalizeWait;

// 每發(fā)起一輪選舉,logicalclock++,在沒(méi)有合法的epoch 數(shù)據(jù)之前,都使用邏輯時(shí)鐘代替

synchronized(this){

// 更新邏輯時(shí)鐘,每進(jìn)行一次選舉,都需要更新邏輯時(shí)鐘

logicalclock.incrementAndGet();

// 更新選票(serverid, zxid, epoch)

updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());

}

// 廣播選票,把自己的選票發(fā)給其他服務(wù)器

sendNotifications();

// 一輪一輪的選舉直到選舉成功

while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){ }

}

sendNotifications:廣播選票

private void sendNotifications() {

// 遍歷投票參與者,給每臺(tái)服務(wù)器發(fā)送選票

for (long sid : self.getCurrentAndNextConfigVoters()) {

// 創(chuàng)建發(fā)送選票

ToSend notmsg = new ToSend(...);

// 把發(fā)送選票放入發(fā)送隊(duì)列

sendqueue.offer(notmsg);

}

}

FastLeaderElection 中有 WorkerSender 線(xiàn)程:

ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS):阻塞獲取要發(fā)送的選票process(m):處理要發(fā)送的選票 manager.toSend(m.sid, requestBuffer):發(fā)送選票

if (this.mySid == sid):如果消息的接收者 sid 是自己,直接進(jìn)入自己的 RecvQueue(自己投自己)else:如果接收者是其他服務(wù)器,創(chuàng)建對(duì)應(yīng)的發(fā)送隊(duì)列或者復(fù)用已經(jīng)存在的發(fā)送隊(duì)列,把消息放入該隊(duì)列connectOne(sid):建立連接

sock.connect(electionAddr, cnxTO):建立與 sid 服務(wù)器的連接initiateConnection(sock, sid):初始化連接 startConnection(sock, sid):創(chuàng)建并啟動(dòng)發(fā)送器線(xiàn)程和接收器線(xiàn)程

dout = new DataOutputStream(buf):獲取 Socket 輸出流,向服務(wù)器發(fā)送數(shù)據(jù)din = new DataInputStream(new BIS(sock.getInputStream()))):通過(guò)輸入流讀取對(duì)方發(fā)送過(guò)來(lái)的選票if (sid > self.getId()):接收者 sid 比我的大,沒(méi)有資格給對(duì)方發(fā)送連接請(qǐng)求的,直接關(guān)閉自己的客戶(hù)端SendWorker sw:初始化發(fā)送器,并啟動(dòng)發(fā)送器線(xiàn)程,線(xiàn)程 run 方法

while (running && !shutdown && sock != null):連接沒(méi)有斷開(kāi)就一直運(yùn)行ByteBuffer b = pollSendQueue():從發(fā)送隊(duì)列 SendQueue 中獲取發(fā)送消息lastMessageSent.put(sid, b):更新對(duì)于 sid 這臺(tái)服務(wù)器的最近一條消息send(b):執(zhí)行發(fā)送

RecvWorker rw:初始化接收器,并啟動(dòng)接收器線(xiàn)程

din.readFully(msgArray, 0, length):輸入流接收消息addToRecvQueue(new Message(messagg, sid)):將消息放入接收消息 recvQueue 隊(duì)列 FastLeaderElection 中有 WorkerReceiver 線(xiàn)程

response = manager.pollRecvQueue():從 RecvQueue 中阻塞獲取出選舉投票消息(其他服務(wù)器發(fā)送過(guò)來(lái)的)

狀態(tài)同步

選舉結(jié)束后,每個(gè)節(jié)點(diǎn)都需要根據(jù)角色更新自己的狀態(tài),Leader 更新?tīng)顟B(tài)為 Leader,其他節(jié)點(diǎn)更新?tīng)顟B(tài)為 Follower,整體流程:

Follower 需要讓 Leader 知道自己的狀態(tài) (sid, epoch, zxid)Leader 接收到信息,根據(jù)信息構(gòu)建新的 epoch,要返回對(duì)應(yīng)的信息給 Follower,F(xiàn)ollower 更新自己的 epochLeader 需要根據(jù) Follower 的狀態(tài),確定何種方式的數(shù)據(jù)同步 DIFF、TRUNC、SNAP,就是要以 Leader 服務(wù)器數(shù)據(jù)為準(zhǔn)

DIFF:Leader 提交的 zxid 比 Follower 的 zxid 大,發(fā)送 Proposal 給 Follower 提交執(zhí)行TRUNC:Follower 的 zxid 比leader 的 zxid 大,F(xiàn)ollower 要進(jìn)行回滾SNAP:Follower 沒(méi)有任何數(shù)據(jù),直接全量同步

執(zhí)行數(shù)據(jù)同步,當(dāng) Leader 接收到超過(guò)半數(shù) Follower 的 Ack 之后,進(jìn)入正常工作狀態(tài),集群?jiǎn)?dòng)完成 核心函數(shù)解析:Leader 更新?tīng)顟B(tài)入口:Leader.lead()

zk.loadData():恢復(fù)數(shù)據(jù)到內(nèi)存cnxAcceptor = new LearnerCnxAcceptor():?jiǎn)?dòng)通信組件

s = ss.accept():等待其他 Follower 節(jié)點(diǎn)向 Leader 節(jié)點(diǎn)發(fā)送同步狀態(tài)LearnerHandler fh:接收到 Follower 的請(qǐng)求,就創(chuàng)建 LearnerHandler 對(duì)象fh.start():?jiǎn)?dòng)線(xiàn)程,通過(guò) switch-case 語(yǔ)法判斷接收的命令,執(zhí)行相應(yīng)的操作

Follower 更新?tīng)顟B(tài)入口:Follower.followerLeader()

QuorumServer leaderServer = findLeader():查找 LeaderconnectToLeader(addr, hostname):與 Leader 建立連接long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO):向 Leader 注冊(cè)

主從工作

Leader:主服務(wù)的工作流程

Follower:從服務(wù)的工作流程,核心函數(shù)為 Follower#followLeader()

readPacket(qp):讀取信息processPacket(qp):處理信息

protected void processPacket(QuorumPacket qp) throws Exception{

switch (qp.getType()) {

case Leader.PING:

break;

case Leader.PROPOSAL:

break;

case Leader.COMMIT:

break;

case Leader.COMMITANDACTIVATE:

break;

case Leader.UPTODATE:

break;

case Leader.REVALIDATE:

break;

case Leader.SYNC:

break;

default:

break;

}

}

客戶(hù)端

柚子快報(bào)邀請(qǐng)碼778899分享:Zookeeper

http://yzkb.51969.com/

推薦鏈接

評(píng)論可見(jiàn),查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀(guān)點(diǎn)和立場(chǎng)。

轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://m.gantiao.com.cn/post/19192041.html

發(fā)布評(píng)論

您暫未設(shè)置收款碼

請(qǐng)?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪(fǎng)問(wèn)

文章目錄