欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報邀請碼778899分享:分布式 Kafka基礎 (上)

柚子快報邀請碼778899分享:分布式 Kafka基礎 (上)

http://yzkb.51969.com/

前言

各位清明 快樂呀,近期博主也是學習了一下kafka,以下是博主的一些學習筆記,希望對你有所幫助

前置知識

線程中的數(shù)據(jù)交互以及進程中的數(shù)據(jù)交互

我們知道線程之間可以使用堆空間進行數(shù)據(jù)交互的

但是如果發(fā)送方和接收方處理數(shù)據(jù)的效率差距過大,這里就會造成消息積壓的問題,怎么處理呢?存入文件顯然是不可取的,因為這里文件的大小也是有上限的,所以我們加上一個中間件,也就是kafka

這里進程呢?

進程之間肯定是不可以使用共用內(nèi)存進行交互的,這里就采用網(wǎng)絡傳輸?shù)姆绞竭M行交互,因為他們的內(nèi)存都是獨立存在的,使用socket網(wǎng)絡傳輸即可

我們知道一個一般處理消息的不止一個消費者,這樣直接讓消費者和生產(chǎn)者進行交互耦合度也就太高了,我們也引入了消息中間件來降低消息的耦合度吧

JMS Java Message Service

JMS包含了p2p和消息訂閱發(fā)布模型,基本上很多mq都是遵循這個模型的

我們kafka沒有加上mq的的后綴,他其實不是完全遵循這個模型

下面我們介紹一下這個模型

p2p 點對點模型

這里指的是一條消息只能被消費者消費一次,然后消費者會給生產(chǎn)者一個反饋

sub/pub訂閱發(fā)布模型

生產(chǎn)者生產(chǎn)的消息會發(fā)送到對應的topic,訂閱了這個topic的消費者都可以消費數(shù)據(jù),同樣的數(shù)據(jù)可以被不同的消費者進行消費

注:本文基于的Windows的kafka進行演示學習,kafka一般部署在linux操作系統(tǒng)上

kafka的生產(chǎn)者消費者模型

kafka在底層大量的使用生產(chǎn)者消費者模型

并且為了保證數(shù)據(jù)的安全性,其還使用了日志文件進行了數(shù)據(jù)的保存

下面我們通過一個簡單的helloworld程序來感受一下

我們先啟動zookeeper 再啟動kafka即可

注意,先進行修改 兩個配置文件,存放對應的data

注:記得進入對應的文件夾,使用對應的bat腳本文件

先演示一下單機

開啟zookeeper腳本

call bin/windows/zookeeper-server-start.bat config/zookeeper.properties

開啟kafka腳本

call bin/windows/kafka-server-start.bat config/server.properties

創(chuàng)建主題

查看主題

執(zhí)行經(jīng)典helloworld

注:啟動完一定要先創(chuàng)建主題,主題是kafka一個基本的邏輯分類單位,先開啟zookeeper再開啟kafka

如果這里kafka客戶端一閃而過啟動失敗的情況,直接刪除data文件即可

maven項目簡單搭建

引入依賴

org.apache.kafka

kafka-clients

3.6.1

注:在kafka中提供服務的節(jié)點就稱之為broker

producer代碼

package kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;

import java.util.Map;

public class testProducer {

public static void main(String[] args) {

//TODO 創(chuàng)建配置對象

Map configMap = new HashMap<>();

configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

//TODO 對生產(chǎn)的KV操作進序列化

configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

//TODO 創(chuàng)建生產(chǎn)者對象

//生產(chǎn)者對象需要確定泛型,是kv類型的

KafkaProducer producer = new KafkaProducer<>(configMap);

//TODO 創(chuàng)建數(shù)據(jù)

//構建數(shù)據(jù)時,需要傳入三個參數(shù),主題,key,value

for (int i = 0; i < 10; i++) {

ProducerRecord record = new ProducerRecord<>("test", "key"+i,"hello kafka"+i);

//TODO 通過生產(chǎn)者對象將數(shù)據(jù)發(fā)送給kafka

producer.send(record);

}

//TODO 關閉生產(chǎn)者對象

producer.close();

}

}

consumer代碼

package kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;

import java.util.HashMap;

import java.util.Map;

public class testConsumer {

public static void main(String[] args) {

//TODO 創(chuàng)建消費者對象

//消費者也需要相應的配置

Map consumerConfig = new HashMap<>();

consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

//反序列化

consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());

//配置groupID

consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig);

//TODO 訂閱主題

consumer.subscribe(Collections.singletonList("test"));

//TODO 從kafka的主題中獲取數(shù)據(jù)

//消費者從kafka拉取數(shù)據(jù) 不是推送的概念

while(true) {

ConsumerRecords dates = consumer.poll(100);

for (ConsumerRecord date : dates) {

System.out.println(date);

}

}

//TODO 關閉消費者對象

//consumer.close();

}

}

先啟動consumer再啟動producer,讓我們在可視化工具上查看一下信息是否存在了

這里使用的是kafkatool

執(zhí)行完成就可以發(fā)現(xiàn)數(shù)據(jù)已經(jīng)存在了

?Kafka系統(tǒng)架構以及核心組件

我們都知道kafka肯定不是只有一個生產(chǎn)者和一個消費者呀

當這里的數(shù)據(jù)頻繁生產(chǎn)消費就可能造成IO熱點問題

最后這個節(jié)點可能就成為分布式系統(tǒng)的性能瓶頸,一旦節(jié)點掛了,就可能造成數(shù)據(jù)丟失

tips:這里的掛了可能只是網(wǎng)絡不穩(wěn)定,資源耗盡等問題導致的長時間連接不上

解決方案

橫向擴展和縱向擴展

橫向擴展:使用更快的網(wǎng)絡,更大的磁盤...無法根本解決問題

縱向擴展:使用集群的方式,也是kafka的解決方案

這還沒有結束,光增加節(jié)點是沒用的,生產(chǎn)和消費請求還是指向同一個節(jié)點,我們需要將這里的數(shù)據(jù)分散到各個節(jié)點,實現(xiàn)同一個主題在不同的broker中

區(qū)分不同的數(shù)據(jù),加上編號就稱之為分區(qū),這也是kafka物理上的存儲單位?

例如 partition-0

一個主題可以有多個分區(qū),分散在不同的broker中

但是每個消費者也不能只消費一部分數(shù)據(jù)呀,每個消費者向每個分區(qū)發(fā)送請求,這里效率也是很低的,所以又提出了消費組的概念

并且為了數(shù)據(jù)的安全考慮,也提出了將數(shù)據(jù)進行備份的方案,但是并不在自己的節(jié)點進行備份,因為在自己的節(jié)點進行備份的話,自己掛了,備份也沒了,所以這里是在其他節(jié)點保存?zhèn)浞菸募?稱之為foller副本,kafka中備份統(tǒng)稱為副本,主文件叫做leader副本,只有l(wèi)eader副本可以讀寫,foller副本只負責備份.

基礎組件

每一個kafka節(jié)點中都包含很多個組件,下面我們來介紹一下經(jīng)典的幾個組件

首先就是我們的Controller了,在多個節(jié)點中我們得選舉出一個管理者

這里管理者選舉的操作就交給我們的Zookeeper了

這里的選舉也很簡單粗暴,哪個節(jié)點先和他建立連接,他就是Controller

Controller的備份

1.采用備份的方式

2.升級,每個節(jié)點都能做備份

這里假設一個節(jié)點掛了,可以通過Zookeeper的一個選舉功能在選舉出新的Controller?

broker架構

為啥生產(chǎn)者和消費者指向同一個broker呢

因為數(shù)據(jù)是有主題的,主題是有分區(qū)的,分區(qū)是有副本的,一個叫l(wèi)eader,一個叫foller

指向同一個broker是因為他對應的分區(qū)是leader副本,分區(qū)管理器會將其同步到文件

集群部署

我們知道kafka一般是以集群方式出現(xiàn)

為了模擬,我們也部署一下集群

這時候解壓三個kafka到不同文件夾,修改data配置以及端口即可

可以設置為9091 9092 9093??

注意Zookeeper也得配置

可以寫批處理腳本,這樣運行起來更加方便

出現(xiàn)以下問題就將其放在根目錄下或者將文件夾名改短

可視化工具創(chuàng)建主題

注:這里副本數(shù)量超過節(jié)點數(shù)量不會創(chuàng)建成功,因為一個節(jié)點放多個副本是無意義的

Zookeeper的作用

我們簡述一下Zookeeper的作用

1.Controller的選舉

選舉規(guī)則就是比較隨意,第一個建立和zookeeper建立連接的broker節(jié)點就是 controller 然后其他的節(jié)點來建立連接的時候也想創(chuàng)建,但是controller已經(jīng)有了,之后的節(jié)點就是放一個監(jiān)聽器,假設現(xiàn)在的Controller掛了,這個監(jiān)聽器就起作用了,從其余的broker中選舉出新的broker

2.對節(jié)點的監(jiān)聽

Znode節(jié)點有個監(jiān)聽功能 可以使用kafka對節(jié)點進行監(jiān)聽到節(jié)點的變化 數(shù)據(jù)的變化 連接超時... 監(jiān)聽到以后馬上通知kafka進行對應的處理

Controller和Broker之間的通信

第一個broker啟動的流程 1.注冊broker節(jié)點 監(jiān)聽controller節(jié)點 2.注冊controller節(jié)點 選舉成為controller,監(jiān)聽/broker/ids節(jié)點 因為broker啟動就會創(chuàng)建ids,所以這里的監(jiān)聽主要就是看看ids的變化,是否有新的節(jié)點創(chuàng)建了 第二個節(jié)點加入之后監(jiān)聽器就知道了,會通知broker1集群的變化 然后在第二個broker進來之后還會和第一個節(jié)點連接 傳輸一些集群的信息等等 但是第三個節(jié)點連接上來之后,controller會給兩個broker都發(fā)送相關的集群信息

這也就是說,每當有節(jié)點連上了之后,controller就會向各個節(jié)點發(fā)送對應的集群信息

Broker組件

主要是包含日志組件? 網(wǎng)絡客戶端? 副本管理器 controller信息? kafka apis(負責處理數(shù)據(jù))? ? ? ? ?Zookeeper的客戶端等等

手動創(chuàng)建主題

我們之前的主題使用的都是默認參數(shù)自動創(chuàng)建的,我們?nèi)绻胄薷钠渲械膮?shù)就得手動創(chuàng)建對應的admin管理員對象 從而對他的副本信息分區(qū)信息進行設置

import org.apache.kafka.clients.admin.Admin;

import org.apache.kafka.clients.admin.AdminClientConfig;

import org.apache.kafka.clients.admin.CreateTopicsResult;

import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;

import java.util.HashMap;

import java.util.Map;

public class AdminTopicTest {

public static void main(String[] args) {

Map configs = new HashMap<>();

configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

//TODO 創(chuàng)建管理員

Admin admin = Admin.create(configs);

//TODO 創(chuàng)建主題

//第一個參數(shù)是主題名

//第二個參數(shù)是分區(qū)的數(shù)量 int

//第三個參數(shù)是副本的因子(本質(zhì)是數(shù)量) short

String topicName = "test1";

int partitionCount = 1;

short replicationCount = 1;

NewTopic topic = new NewTopic(topicName,partitionCount,replicationCount);

String topicName2 = "test2";

int partitionCount2 = 2;

short replicationCount2 = 2;

NewTopic topic2 = new NewTopic(topicName2,partitionCount2,replicationCount2);

CreateTopicsResult result = admin.createTopics(

Arrays.asList(topic, topic2)

);

//TODO 關閉管理者對象

admin.close();

}

}

?副本分配策略

主題只是邏輯上的分類,只有分區(qū)才能在物理文件上以及存儲中有所體現(xiàn)

我們知道我們是使用多個副本冗余來提高數(shù)據(jù)的可靠性的

那么副本在節(jié)點中又是如何分配的呢,咱們接下來慢慢說

先說理想的情況

我們知道副本也分為leader和follower

我們這里的均衡指的是leader的分布應該是均勻的

我們先說理想情況下

我們希望每個節(jié)點的leader數(shù)量都是相近的

實際上kafka并不是這樣的

因為副本的創(chuàng)建是有順序的,我們無法再一開始就預測浩好這里的副本分配

kafka是采用一個簡單的分配算法來進行的副本分配

例子

注:我們也可以自己手動分配

一個重要的名詞? ISR? in-sync-Replication? ?就是同步副本列表的意思

主題創(chuàng)建流程

大概就是先問一下controller在哪,通過controller來創(chuàng)建topic

但是底層有很多生產(chǎn)者消費者模型

這里的具體操作由apis接口來實現(xiàn)

生產(chǎn)數(shù)據(jù)

一般主題都是提前創(chuàng)建好的,如果使用自動創(chuàng)建的話很可能導致IO熱點問題

因為副本的leader都在同一個節(jié)點上

具體流程如下圖

生產(chǎn)者數(shù)據(jù)先通過攔截器的攔截,然后去元數(shù)據(jù)區(qū)獲取controller的信息,然后進行序列化(因為是通過網(wǎng)絡傳輸?shù)臄?shù)據(jù)),在通過分區(qū)器確定分區(qū),最后加入緩沖區(qū)等待發(fā)送

注:攔截器是對數(shù)據(jù)進行了一些規(guī)范化的處理,但是出現(xiàn)錯誤之后不會導致程序的停止,不影響數(shù)據(jù)的發(fā)送,捕捉到異常也不會進行處理

然后通過發(fā)送線程繼續(xù)發(fā)送數(shù)據(jù),這里的在途請求緩沖區(qū)的大小表示一個節(jié)點在同一時間最多處理的請求數(shù)量,默認是5,這是經(jīng)過壓力測試的,這樣性能最優(yōu)

分區(qū)器

我們剛剛看到數(shù)據(jù)會經(jīng)過分區(qū)器處理來知道放到哪個分區(qū),下面我們介紹一下分區(qū)器是怎么工作的,分區(qū)器是從元數(shù)據(jù)區(qū)獲取到主題信息再開始計算分區(qū)的,注意這里根據(jù)的主題信息直接指定分區(qū)的話是不會做校驗而是直接使用的

算法

分區(qū)算法,將key使用散列算法后和分區(qū)數(shù)進行一次取模運算,在寫入數(shù)據(jù)收集器的時候,就需要進行處理了,如果當前主題分區(qū)是未知分區(qū),就會根據(jù)當前主題分區(qū)的負載情況動態(tài)進行分區(qū)(粘性分區(qū)策略)如果當前發(fā)送的時候沒有分區(qū)負載情況,這時候就是隨機選擇的,選擇以后就盡可能向這里添加,超過閾值就會切換另外一個分區(qū),閾值默認是16k,后面不為空之后就會根據(jù)每個分區(qū)的負載情況生成一個隨機的權重,然后通過一個二分查找找到一個和這個值相近的,然后算出來分區(qū)編號

緩沖區(qū)

緩沖區(qū)中對數(shù)據(jù)的追加是只要批次大小足夠,沒到達閾值,直接向后追加即可

批次對象空間不足 將滿了的批次對象鎖定并關閉,等著sender線程來拿,然后重新開一個批次對象來追加這里的數(shù)據(jù) 數(shù)據(jù)是可以超過16k的,比如60k的數(shù)據(jù),直接裝,然后關閉準備發(fā)車,是不可以拆開的

sender發(fā)送線程

會將符合發(fā)送條件的數(shù)據(jù)重新進行整合,前面是因為相同主題的不同分區(qū)可能在不同的broker中,但是不同主題的分區(qū)可能在相同的broker中,用topic進行區(qū)分效率更高一點

批次對象到達大小或者是時間閾值之后就會被發(fā)送

應答機制 ACKS

本質(zhì)上是使用異步的方式

發(fā)送數(shù)據(jù)無需等待應答以后再繼續(xù)發(fā)送

這樣數(shù)據(jù)的發(fā)送效率高了,但是安全性無法保證

Kafka就面對不同的場景給出了三個ACKS處理等級

分別是 0 1? all

0就是優(yōu)先考慮效率

all就是優(yōu)先考慮數(shù)據(jù)的安全性

1就是兩者之間的折中考慮

ACKS = 0

表示只是將數(shù)據(jù)放到網(wǎng)絡中了,根本不關心其是否發(fā)送完成,直到放到網(wǎng)絡中就給main線程發(fā)送一個應答

ACKS等級為1的時候是需要數(shù)據(jù)在leader中進行保存到文件中之后才能應答

all等級是等待數(shù)據(jù)進行備份之后才進行對應的應答

retry重試機制

我們知道數(shù)據(jù)既然是在網(wǎng)絡中傳輸?shù)?那么數(shù)據(jù)丟包是很正常的,假設網(wǎng)絡不穩(wěn)定等等情況就很容易導致數(shù)據(jù)的丟包等等

我們這時候和tcp協(xié)議一樣定義了數(shù)據(jù)的重傳機制

只要主線程沒有收到acks,到達一定的超時時間,這時候就會將數(shù)據(jù)再次放回緩沖區(qū)重新進行一次發(fā)送

但是這也會導致一定的問題,比如數(shù)據(jù)重復多次或者是數(shù)據(jù)亂序問題

好處是讓數(shù)據(jù)更安全,但是也有壞處

數(shù)據(jù)重復:

假設這里leader寫入了磁盤了,但是傳ack的時候網(wǎng)絡不穩(wěn)定,沒發(fā)成,這里就會再傳一次 這里數(shù)據(jù)就在文件中放了兩次

數(shù)據(jù)亂序:

還有一個問題就是數(shù)據(jù)的順序問題,發(fā)生順序是 a b c 但是可能 a發(fā)送失敗重發(fā)了 結果就是b c a 的順序了

這在某種情況下不是我們想看到的,于是我們又引入了冪等性操作和事務的概念

冪等性

冪等性要求數(shù)據(jù)的ACKS等級一定是all或者-1? (這倆等級一個意思),并且必須開啟retry ,并且要求在途請求緩沖區(qū)的數(shù)量必須小于等于5

實現(xiàn)?

就是給數(shù)據(jù)標上生產(chǎn)者編號,標上數(shù)據(jù)序號,但是注意這里的冪等性不可以跨越客戶端或者是跨越分區(qū)來起作用

這里的冪等性只是在同一個分區(qū)內(nèi)的冪等

數(shù)據(jù)在發(fā)送給Kafka的時候,kafka會記錄生產(chǎn)者的狀態(tài)

重復是靠在加入在途數(shù)據(jù)緩沖區(qū)的時候判斷一下contains

有序是按照序號來的,下一個加入的數(shù)據(jù)必須大于當前的最后一個數(shù)據(jù)

缺陷:

只能保證一個分區(qū)的數(shù)據(jù)是有序且不重復的

但是如果這時候生產(chǎn)者重啟了,此時仍然會導致數(shù)據(jù)的重復

這就要通過下面的事務來完成對這個缺點的補充了

事務

這里的事務是基于冪等性的,和數(shù)據(jù)庫中的事務完全不是一個意思

基本原理是保證生產(chǎn)者id重啟前后不會改變

執(zhí)行順序如下

注:事務這里的發(fā)送數(shù)據(jù)不是通過send方法進行發(fā)送,而是commit才會發(fā)送,send只是將數(shù)據(jù)放到緩沖區(qū)

柚子快報邀請碼778899分享:分布式 Kafka基礎 (上)

http://yzkb.51969.com/

參考閱讀

評論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。

轉(zhuǎn)載請注明,如有侵權,聯(lián)系刪除。

本文鏈接:http://m.gantiao.com.cn/post/19240922.html

發(fā)布評論

您暫未設置收款碼

請在主題配置——文章設置里上傳

掃描二維碼手機訪問

文章目錄