柚子快報邀請碼778899分享:分布式 Kafka基礎 (上)
柚子快報邀請碼778899分享:分布式 Kafka基礎 (上)
前言
各位清明 快樂呀,近期博主也是學習了一下kafka,以下是博主的一些學習筆記,希望對你有所幫助
前置知識
線程中的數(shù)據(jù)交互以及進程中的數(shù)據(jù)交互
我們知道線程之間可以使用堆空間進行數(shù)據(jù)交互的
但是如果發(fā)送方和接收方處理數(shù)據(jù)的效率差距過大,這里就會造成消息積壓的問題,怎么處理呢?存入文件顯然是不可取的,因為這里文件的大小也是有上限的,所以我們加上一個中間件,也就是kafka
這里進程呢?
進程之間肯定是不可以使用共用內(nèi)存進行交互的,這里就采用網(wǎng)絡傳輸?shù)姆绞竭M行交互,因為他們的內(nèi)存都是獨立存在的,使用socket網(wǎng)絡傳輸即可
我們知道一個一般處理消息的不止一個消費者,這樣直接讓消費者和生產(chǎn)者進行交互耦合度也就太高了,我們也引入了消息中間件來降低消息的耦合度吧
JMS Java Message Service
JMS包含了p2p和消息訂閱發(fā)布模型,基本上很多mq都是遵循這個模型的
我們kafka沒有加上mq的的后綴,他其實不是完全遵循這個模型
下面我們介紹一下這個模型
p2p 點對點模型
這里指的是一條消息只能被消費者消費一次,然后消費者會給生產(chǎn)者一個反饋
sub/pub訂閱發(fā)布模型
生產(chǎn)者生產(chǎn)的消息會發(fā)送到對應的topic,訂閱了這個topic的消費者都可以消費數(shù)據(jù),同樣的數(shù)據(jù)可以被不同的消費者進行消費
注:本文基于的Windows的kafka進行演示學習,kafka一般部署在linux操作系統(tǒng)上
kafka的生產(chǎn)者消費者模型
kafka在底層大量的使用生產(chǎn)者消費者模型
并且為了保證數(shù)據(jù)的安全性,其還使用了日志文件進行了數(shù)據(jù)的保存
下面我們通過一個簡單的helloworld程序來感受一下
我們先啟動zookeeper 再啟動kafka即可
注意,先進行修改 兩個配置文件,存放對應的data
注:記得進入對應的文件夾,使用對應的bat腳本文件
先演示一下單機
開啟zookeeper腳本
call bin/windows/zookeeper-server-start.bat config/zookeeper.properties
開啟kafka腳本
call bin/windows/kafka-server-start.bat config/server.properties
創(chuàng)建主題
查看主題
執(zhí)行經(jīng)典helloworld
注:啟動完一定要先創(chuàng)建主題,主題是kafka一個基本的邏輯分類單位,先開啟zookeeper再開啟kafka
如果這里kafka客戶端一閃而過啟動失敗的情況,直接刪除data文件即可
maven項目簡單搭建
引入依賴
注:在kafka中提供服務的節(jié)點就稱之為broker
producer代碼
package kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
public class testProducer {
public static void main(String[] args) {
//TODO 創(chuàng)建配置對象
Map
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
//TODO 對生產(chǎn)的KV操作進序列化
configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//TODO 創(chuàng)建生產(chǎn)者對象
//生產(chǎn)者對象需要確定泛型,是kv類型的
KafkaProducer
//TODO 創(chuàng)建數(shù)據(jù)
//構建數(shù)據(jù)時,需要傳入三個參數(shù),主題,key,value
for (int i = 0; i < 10; i++) {
ProducerRecord
//TODO 通過生產(chǎn)者對象將數(shù)據(jù)發(fā)送給kafka
producer.send(record);
}
//TODO 關閉生產(chǎn)者對象
producer.close();
}
}
consumer代碼
package kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class testConsumer {
public static void main(String[] args) {
//TODO 創(chuàng)建消費者對象
//消費者也需要相應的配置
Map
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
//反序列化
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
//配置groupID
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
KafkaConsumer
//TODO 訂閱主題
consumer.subscribe(Collections.singletonList("test"));
//TODO 從kafka的主題中獲取數(shù)據(jù)
//消費者從kafka拉取數(shù)據(jù) 不是推送的概念
while(true) {
ConsumerRecords
for (ConsumerRecord
System.out.println(date);
}
}
//TODO 關閉消費者對象
//consumer.close();
}
}
先啟動consumer再啟動producer,讓我們在可視化工具上查看一下信息是否存在了
這里使用的是kafkatool
執(zhí)行完成就可以發(fā)現(xiàn)數(shù)據(jù)已經(jīng)存在了
?Kafka系統(tǒng)架構以及核心組件
我們都知道kafka肯定不是只有一個生產(chǎn)者和一個消費者呀
當這里的數(shù)據(jù)頻繁生產(chǎn)消費就可能造成IO熱點問題
最后這個節(jié)點可能就成為分布式系統(tǒng)的性能瓶頸,一旦節(jié)點掛了,就可能造成數(shù)據(jù)丟失
tips:這里的掛了可能只是網(wǎng)絡不穩(wěn)定,資源耗盡等問題導致的長時間連接不上
解決方案
橫向擴展和縱向擴展
橫向擴展:使用更快的網(wǎng)絡,更大的磁盤...無法根本解決問題
縱向擴展:使用集群的方式,也是kafka的解決方案
這還沒有結束,光增加節(jié)點是沒用的,生產(chǎn)和消費請求還是指向同一個節(jié)點,我們需要將這里的數(shù)據(jù)分散到各個節(jié)點,實現(xiàn)同一個主題在不同的broker中
區(qū)分不同的數(shù)據(jù),加上編號就稱之為分區(qū),這也是kafka物理上的存儲單位?
例如 partition-0
一個主題可以有多個分區(qū),分散在不同的broker中
但是每個消費者也不能只消費一部分數(shù)據(jù)呀,每個消費者向每個分區(qū)發(fā)送請求,這里效率也是很低的,所以又提出了消費組的概念
并且為了數(shù)據(jù)的安全考慮,也提出了將數(shù)據(jù)進行備份的方案,但是并不在自己的節(jié)點進行備份,因為在自己的節(jié)點進行備份的話,自己掛了,備份也沒了,所以這里是在其他節(jié)點保存?zhèn)浞菸募?稱之為foller副本,kafka中備份統(tǒng)稱為副本,主文件叫做leader副本,只有l(wèi)eader副本可以讀寫,foller副本只負責備份.
基礎組件
每一個kafka節(jié)點中都包含很多個組件,下面我們來介紹一下經(jīng)典的幾個組件
首先就是我們的Controller了,在多個節(jié)點中我們得選舉出一個管理者
這里管理者選舉的操作就交給我們的Zookeeper了
這里的選舉也很簡單粗暴,哪個節(jié)點先和他建立連接,他就是Controller
Controller的備份
1.采用備份的方式
2.升級,每個節(jié)點都能做備份
這里假設一個節(jié)點掛了,可以通過Zookeeper的一個選舉功能在選舉出新的Controller?
broker架構
為啥生產(chǎn)者和消費者指向同一個broker呢
因為數(shù)據(jù)是有主題的,主題是有分區(qū)的,分區(qū)是有副本的,一個叫l(wèi)eader,一個叫foller
指向同一個broker是因為他對應的分區(qū)是leader副本,分區(qū)管理器會將其同步到文件
集群部署
我們知道kafka一般是以集群方式出現(xiàn)
為了模擬,我們也部署一下集群
這時候解壓三個kafka到不同文件夾,修改data配置以及端口即可
可以設置為9091 9092 9093??
注意Zookeeper也得配置
可以寫批處理腳本,這樣運行起來更加方便
出現(xiàn)以下問題就將其放在根目錄下或者將文件夾名改短
可視化工具創(chuàng)建主題
注:這里副本數(shù)量超過節(jié)點數(shù)量不會創(chuàng)建成功,因為一個節(jié)點放多個副本是無意義的
Zookeeper的作用
我們簡述一下Zookeeper的作用
1.Controller的選舉
選舉規(guī)則就是比較隨意,第一個建立和zookeeper建立連接的broker節(jié)點就是 controller 然后其他的節(jié)點來建立連接的時候也想創(chuàng)建,但是controller已經(jīng)有了,之后的節(jié)點就是放一個監(jiān)聽器,假設現(xiàn)在的Controller掛了,這個監(jiān)聽器就起作用了,從其余的broker中選舉出新的broker
2.對節(jié)點的監(jiān)聽
Znode節(jié)點有個監(jiān)聽功能 可以使用kafka對節(jié)點進行監(jiān)聽到節(jié)點的變化 數(shù)據(jù)的變化 連接超時... 監(jiān)聽到以后馬上通知kafka進行對應的處理
Controller和Broker之間的通信
第一個broker啟動的流程 1.注冊broker節(jié)點 監(jiān)聽controller節(jié)點 2.注冊controller節(jié)點 選舉成為controller,監(jiān)聽/broker/ids節(jié)點 因為broker啟動就會創(chuàng)建ids,所以這里的監(jiān)聽主要就是看看ids的變化,是否有新的節(jié)點創(chuàng)建了 第二個節(jié)點加入之后監(jiān)聽器就知道了,會通知broker1集群的變化 然后在第二個broker進來之后還會和第一個節(jié)點連接 傳輸一些集群的信息等等 但是第三個節(jié)點連接上來之后,controller會給兩個broker都發(fā)送相關的集群信息
這也就是說,每當有節(jié)點連上了之后,controller就會向各個節(jié)點發(fā)送對應的集群信息
Broker組件
主要是包含日志組件? 網(wǎng)絡客戶端? 副本管理器 controller信息? kafka apis(負責處理數(shù)據(jù))? ? ? ? ?Zookeeper的客戶端等等
手動創(chuàng)建主題
我們之前的主題使用的都是默認參數(shù)自動創(chuàng)建的,我們?nèi)绻胄薷钠渲械膮?shù)就得手動創(chuàng)建對應的admin管理員對象 從而對他的副本信息分區(qū)信息進行設置
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
public class AdminTopicTest {
public static void main(String[] args) {
Map
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
//TODO 創(chuàng)建管理員
Admin admin = Admin.create(configs);
//TODO 創(chuàng)建主題
//第一個參數(shù)是主題名
//第二個參數(shù)是分區(qū)的數(shù)量 int
//第三個參數(shù)是副本的因子(本質(zhì)是數(shù)量) short
String topicName = "test1";
int partitionCount = 1;
short replicationCount = 1;
NewTopic topic = new NewTopic(topicName,partitionCount,replicationCount);
String topicName2 = "test2";
int partitionCount2 = 2;
short replicationCount2 = 2;
NewTopic topic2 = new NewTopic(topicName2,partitionCount2,replicationCount2);
CreateTopicsResult result = admin.createTopics(
Arrays.asList(topic, topic2)
);
//TODO 關閉管理者對象
admin.close();
}
}
?副本分配策略
主題只是邏輯上的分類,只有分區(qū)才能在物理文件上以及存儲中有所體現(xiàn)
我們知道我們是使用多個副本冗余來提高數(shù)據(jù)的可靠性的
那么副本在節(jié)點中又是如何分配的呢,咱們接下來慢慢說
先說理想的情況
我們知道副本也分為leader和follower
我們這里的均衡指的是leader的分布應該是均勻的
我們先說理想情況下
我們希望每個節(jié)點的leader數(shù)量都是相近的
實際上kafka并不是這樣的
因為副本的創(chuàng)建是有順序的,我們無法再一開始就預測浩好這里的副本分配
kafka是采用一個簡單的分配算法來進行的副本分配
例子
注:我們也可以自己手動分配
一個重要的名詞? ISR? in-sync-Replication? ?就是同步副本列表的意思
主題創(chuàng)建流程
大概就是先問一下controller在哪,通過controller來創(chuàng)建topic
但是底層有很多生產(chǎn)者消費者模型
這里的具體操作由apis接口來實現(xiàn)
生產(chǎn)數(shù)據(jù)
一般主題都是提前創(chuàng)建好的,如果使用自動創(chuàng)建的話很可能導致IO熱點問題
因為副本的leader都在同一個節(jié)點上
具體流程如下圖
生產(chǎn)者數(shù)據(jù)先通過攔截器的攔截,然后去元數(shù)據(jù)區(qū)獲取controller的信息,然后進行序列化(因為是通過網(wǎng)絡傳輸?shù)臄?shù)據(jù)),在通過分區(qū)器確定分區(qū),最后加入緩沖區(qū)等待發(fā)送
注:攔截器是對數(shù)據(jù)進行了一些規(guī)范化的處理,但是出現(xiàn)錯誤之后不會導致程序的停止,不影響數(shù)據(jù)的發(fā)送,捕捉到異常也不會進行處理
然后通過發(fā)送線程繼續(xù)發(fā)送數(shù)據(jù),這里的在途請求緩沖區(qū)的大小表示一個節(jié)點在同一時間最多處理的請求數(shù)量,默認是5,這是經(jīng)過壓力測試的,這樣性能最優(yōu)
分區(qū)器
我們剛剛看到數(shù)據(jù)會經(jīng)過分區(qū)器處理來知道放到哪個分區(qū),下面我們介紹一下分區(qū)器是怎么工作的,分區(qū)器是從元數(shù)據(jù)區(qū)獲取到主題信息再開始計算分區(qū)的,注意這里根據(jù)的主題信息直接指定分區(qū)的話是不會做校驗而是直接使用的
算法
分區(qū)算法,將key使用散列算法后和分區(qū)數(shù)進行一次取模運算,在寫入數(shù)據(jù)收集器的時候,就需要進行處理了,如果當前主題分區(qū)是未知分區(qū),就會根據(jù)當前主題分區(qū)的負載情況動態(tài)進行分區(qū)(粘性分區(qū)策略)如果當前發(fā)送的時候沒有分區(qū)負載情況,這時候就是隨機選擇的,選擇以后就盡可能向這里添加,超過閾值就會切換另外一個分區(qū),閾值默認是16k,后面不為空之后就會根據(jù)每個分區(qū)的負載情況生成一個隨機的權重,然后通過一個二分查找找到一個和這個值相近的,然后算出來分區(qū)編號
緩沖區(qū)
緩沖區(qū)中對數(shù)據(jù)的追加是只要批次大小足夠,沒到達閾值,直接向后追加即可
批次對象空間不足 將滿了的批次對象鎖定并關閉,等著sender線程來拿,然后重新開一個批次對象來追加這里的數(shù)據(jù) 數(shù)據(jù)是可以超過16k的,比如60k的數(shù)據(jù),直接裝,然后關閉準備發(fā)車,是不可以拆開的
sender發(fā)送線程
會將符合發(fā)送條件的數(shù)據(jù)重新進行整合,前面是因為相同主題的不同分區(qū)可能在不同的broker中,但是不同主題的分區(qū)可能在相同的broker中,用topic進行區(qū)分效率更高一點
批次對象到達大小或者是時間閾值之后就會被發(fā)送
應答機制 ACKS
本質(zhì)上是使用異步的方式
發(fā)送數(shù)據(jù)無需等待應答以后再繼續(xù)發(fā)送
這樣數(shù)據(jù)的發(fā)送效率高了,但是安全性無法保證
Kafka就面對不同的場景給出了三個ACKS處理等級
分別是 0 1? all
0就是優(yōu)先考慮效率
all就是優(yōu)先考慮數(shù)據(jù)的安全性
1就是兩者之間的折中考慮
ACKS = 0
表示只是將數(shù)據(jù)放到網(wǎng)絡中了,根本不關心其是否發(fā)送完成,直到放到網(wǎng)絡中就給main線程發(fā)送一個應答
ACKS等級為1的時候是需要數(shù)據(jù)在leader中進行保存到文件中之后才能應答
all等級是等待數(shù)據(jù)進行備份之后才進行對應的應答
retry重試機制
我們知道數(shù)據(jù)既然是在網(wǎng)絡中傳輸?shù)?那么數(shù)據(jù)丟包是很正常的,假設網(wǎng)絡不穩(wěn)定等等情況就很容易導致數(shù)據(jù)的丟包等等
我們這時候和tcp協(xié)議一樣定義了數(shù)據(jù)的重傳機制
只要主線程沒有收到acks,到達一定的超時時間,這時候就會將數(shù)據(jù)再次放回緩沖區(qū)重新進行一次發(fā)送
但是這也會導致一定的問題,比如數(shù)據(jù)重復多次或者是數(shù)據(jù)亂序問題
好處是讓數(shù)據(jù)更安全,但是也有壞處
數(shù)據(jù)重復:
假設這里leader寫入了磁盤了,但是傳ack的時候網(wǎng)絡不穩(wěn)定,沒發(fā)成,這里就會再傳一次 這里數(shù)據(jù)就在文件中放了兩次
數(shù)據(jù)亂序:
還有一個問題就是數(shù)據(jù)的順序問題,發(fā)生順序是 a b c 但是可能 a發(fā)送失敗重發(fā)了 結果就是b c a 的順序了
這在某種情況下不是我們想看到的,于是我們又引入了冪等性操作和事務的概念
冪等性
冪等性要求數(shù)據(jù)的ACKS等級一定是all或者-1? (這倆等級一個意思),并且必須開啟retry ,并且要求在途請求緩沖區(qū)的數(shù)量必須小于等于5
實現(xiàn)?
就是給數(shù)據(jù)標上生產(chǎn)者編號,標上數(shù)據(jù)序號,但是注意這里的冪等性不可以跨越客戶端或者是跨越分區(qū)來起作用
這里的冪等性只是在同一個分區(qū)內(nèi)的冪等
數(shù)據(jù)在發(fā)送給Kafka的時候,kafka會記錄生產(chǎn)者的狀態(tài)
重復是靠在加入在途數(shù)據(jù)緩沖區(qū)的時候判斷一下contains
有序是按照序號來的,下一個加入的數(shù)據(jù)必須大于當前的最后一個數(shù)據(jù)
缺陷:
只能保證一個分區(qū)的數(shù)據(jù)是有序且不重復的
但是如果這時候生產(chǎn)者重啟了,此時仍然會導致數(shù)據(jù)的重復
這就要通過下面的事務來完成對這個缺點的補充了
事務
這里的事務是基于冪等性的,和數(shù)據(jù)庫中的事務完全不是一個意思
基本原理是保證生產(chǎn)者id重啟前后不會改變
執(zhí)行順序如下
注:事務這里的發(fā)送數(shù)據(jù)不是通過send方法進行發(fā)送,而是commit才會發(fā)送,send只是將數(shù)據(jù)放到緩沖區(qū)
柚子快報邀請碼778899分享:分布式 Kafka基礎 (上)
參考閱讀
本文內(nèi)容根據(jù)網(wǎng)絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權,聯(lián)系刪除。