柚子快報(bào)邀請碼778899分享:《Kafka權(quán)威指南》讀書筆記
柚子快報(bào)邀請碼778899分享:《Kafka權(quán)威指南》讀書筆記
《Kafka權(quán)威指南》第一、三、四、六章,是重點(diǎn)。可以多看看。
一、 Kafka的組成
kafka是一個(gè)發(fā)布與訂閱消息系統(tǒng)消息:kafka的數(shù)據(jù)單元稱為"消息"??梢园严⒖闯墒菙?shù)據(jù)庫中的一個(gè)"數(shù)據(jù)行"。
消息的key:為key生成一個(gè)一致性散列值(HashCode),然后使用散列值對主題分區(qū)數(shù)進(jìn)行取模,為消息選取分區(qū)。
消息被分批次寫入kafka。
批次:就是一組消息,這些消息屬于同一個(gè)主題和分區(qū)。
主題(topic):kafka的消息,是通過topic來分類的。topic,好比數(shù)據(jù)庫里的表,或者文件系統(tǒng)里的文件夾。
topic,可分為若干個(gè)分區(qū)(partition)。
由于一個(gè)topic一般分為幾個(gè)partition,因此整個(gè)Topic范圍內(nèi)無法保證消息的順序,但可以保證消息在單個(gè)Partition內(nèi)的順序。
生產(chǎn)者(producer):創(chuàng)建消息。 消費(fèi)者(consumer):訂閱讀取消息。
consumer可以訂閱一個(gè)或多個(gè)topic,并按照消息生成的順序去讀取它們。消費(fèi)者通過檢查消息偏移量來區(qū)分已經(jīng)讀取過的消息。
偏移量(offset):是一個(gè)不斷遞增的整數(shù)值,在創(chuàng)建消息時(shí),Kafka會把它添加到消息里。在給定的分區(qū)里,每個(gè)消息的偏移量都是唯一的。 消費(fèi)者群組: 消費(fèi)者群組中,會有一個(gè)或多個(gè)消費(fèi)者讀取同一個(gè)topic。 另外,群組能保證每個(gè)分區(qū)只能被一個(gè)消費(fèi)者使用。 服務(wù)器(broker):Kafka的服務(wù)器broker,接收生產(chǎn)者的消息,為消息設(shè)置偏移量,并提交消息到磁盤保存。broker為消費(fèi)者提供服務(wù),對請求作出響應(yīng),返回已經(jīng)提交到磁盤上的消息。broker是集群的重要組成部分。
為什么選擇kafka?
多個(gè)生產(chǎn)者.多個(gè)消費(fèi)者.
多個(gè)消費(fèi)者,能夠提高消息的處理效率。
基于磁盤的數(shù)據(jù)存儲.伸縮性。
kafka可以靈活地配置broker的個(gè)數(shù),根據(jù)生產(chǎn)環(huán)境的需要進(jìn)行調(diào)整。
高性能.
二、Kafka配置
broker配置
broker.id:服務(wù)器idport:端口號zookeeper.connect:用于保存broker元數(shù)據(jù)的zookeeper地址是通過此配置來指定的。log.dirs:磁盤存放日志的路徑。auto.create.topics.enable:是否自動創(chuàng)建topic。
topic配置
num.partitions:指定topic包含多少個(gè)分區(qū)。log.retention.ms:指定日志保留的時(shí)間。log.retention.bytes:通過保留的消息字節(jié)數(shù)來判斷消息是否過期。message.max.bytes:限制單個(gè)消息的大小。
三、kafka生產(chǎn)者–向Kafka寫入數(shù)據(jù)
kafka生產(chǎn)者組件圖:
圖片/kafka分區(qū)與消費(fèi)者的關(guān)系
kafka生產(chǎn)者有3個(gè)必選屬性:
bootstrap.servers:指定broker的地址清單。建議提供兩個(gè)以上的broker信息,一旦其中一個(gè)宕機(jī),生產(chǎn)者仍然能夠連接上集群。
key.serializer:broker的消息的key和value都是字節(jié)數(shù)組,因此需要序列化。生產(chǎn)者得知道如何把java對象轉(zhuǎn)換成字節(jié)數(shù)組。key.serializer必須一個(gè)實(shí)現(xiàn)了Serializer接口的類,生產(chǎn)者會使用這個(gè)類把鍵對象序列化為字節(jié)數(shù)組。
value.serializer:同上。
生產(chǎn)者發(fā)送消息的三種方式:
發(fā)送并忘記(fire-and forge):生產(chǎn)者把消息發(fā)送給服務(wù)器,但并不關(guān)心它是否正常到達(dá)。生產(chǎn)者會自動嘗試重發(fā),不過使用這種方式有時(shí)候也會丟失一些消息。 同步發(fā)送:send()方法發(fā)送消息,會返回一個(gè)Future對象,調(diào)用get()方法進(jìn)行等待,就可以知道消息是否發(fā)送成功。 異步發(fā)送:調(diào)用send()方法,并指定一個(gè)回調(diào)函數(shù),服務(wù)器在返回響應(yīng)時(shí)調(diào)用該函數(shù)。
生產(chǎn)者使用多線程:
生產(chǎn)者是可以使用多線程來發(fā)送消息的。如果需要更高的吞吐量,可以在生產(chǎn)者數(shù)量不變的前提下增加線程數(shù)量。如果這樣做還不夠,可以增加生產(chǎn)者數(shù)量。
生產(chǎn)者配置:
key.serializer 用于 key 鍵的序列化,它實(shí)現(xiàn)了 org.apache.kafka.common.serialization.Serializer 接口 value.serializer 用于 value 值的序列化,實(shí)現(xiàn)了 org.apache.kafka.common.serialization.Serializer 接口 acks acks 參數(shù)指定了要有多少個(gè)分區(qū)副本接收消息,生產(chǎn)者才認(rèn)為消息是寫入成功的。此參數(shù)對消息丟失的影響較大
如果 acks = 0,就表示生產(chǎn)者也不知道自己產(chǎn)生的消息是否被服務(wù)器接收了,它才知道它寫成功了。如果發(fā)送的途中產(chǎn)生了錯(cuò)誤,生產(chǎn)者也不知道,它也比較懵逼,因?yàn)闆]有返回任何消息。這就類似于 UDP 的運(yùn)輸層協(xié)議,只管發(fā),服務(wù)器接受不接受它也不關(guān)心。
如果 acks = 1,只要集群的 Leader 接收到消息,就會給生產(chǎn)者返回一條消息,告訴它寫入成功。如果發(fā)送途中造成了網(wǎng)絡(luò)異常或者 Leader 還沒選舉出來等其他情況導(dǎo)致消息寫入失敗,生產(chǎn)者會受到錯(cuò)誤消息,這時(shí)候生產(chǎn)者往往會再次重發(fā)數(shù)據(jù)。因?yàn)橄⒌陌l(fā)送也分為 同步 和 異步,Kafka 為了保證消息的高效傳輸會決定是同步發(fā)送還是異步發(fā)送。如果讓客戶端等待服務(wù)器的響應(yīng)(通過調(diào)用 Future 中的 get() 方法),顯然會增加延遲,如果客戶端使用回調(diào),就會解決這個(gè)問題。
如果 acks = all,這種情況下是只有當(dāng)所有參與復(fù)制的節(jié)點(diǎn)都收到消息時(shí),生產(chǎn)者才會接收到一個(gè)來自服務(wù)器的消息。不過,它的延遲比 acks =1 時(shí)更高,因?yàn)槲覀円却恢灰粋€(gè)服務(wù)器節(jié)點(diǎn)接收消息。
buffer.memory 此參數(shù)用來設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小,生產(chǎn)者用它緩沖要發(fā)送到服務(wù)器的消息。如果應(yīng)用程序發(fā)送消息的速度超過發(fā)送到服務(wù)器的速度,會導(dǎo)致生產(chǎn)者空間不足。這個(gè)時(shí)候,send() 方法調(diào)用要么被阻塞,要么拋出異常,具體取決于 block.on.buffer.null 參數(shù)的設(shè)置。 compression.type 此參數(shù)來表示生產(chǎn)者啟用何種壓縮算法,默認(rèn)情況下,消息發(fā)送時(shí)不會被壓縮。該參數(shù)可以設(shè)置為 snappy、gzip 和 lz4,它指定了消息發(fā)送給 broker 之前使用哪一種壓縮算法進(jìn)行壓縮。下面是各壓縮算法的對比 retries 生產(chǎn)者從服務(wù)器收到的錯(cuò)誤有可能是臨時(shí)性的錯(cuò)誤(比如分區(qū)找不到首領(lǐng)),在這種情況下,reteis 參數(shù)的值決定了生產(chǎn)者可以重發(fā)的消息次數(shù),如果達(dá)到這個(gè)次數(shù),生產(chǎn)者會放棄重試并返回錯(cuò)誤。默認(rèn)情況下,生產(chǎn)者在每次重試之間等待 100ms,這個(gè)等待參數(shù)可以通過 retry.backoff.ms 進(jìn)行修改。 batch.size 當(dāng)有多個(gè)消息需要被發(fā)送到同一個(gè)分區(qū)時(shí),生產(chǎn)者會把它們放在同一個(gè)批次里。該參數(shù)指定了一個(gè)批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計(jì)算。當(dāng)批次被填滿,批次里的所有消息會被發(fā)送出去。不過生產(chǎn)者井不一定都會等到批次被填滿才發(fā)送,任意條數(shù)的消息都可能被發(fā)送。 client.id 此參數(shù)可以是任意的字符串,服務(wù)器會用它來識別消息的來源,一般配置在日志里。 max.in.flight.requests.per.connection 此參數(shù)指定了生產(chǎn)者在收到服務(wù)器響應(yīng)之前可以發(fā)送多少消息,它的值越高,就會占用越多的內(nèi)存,不過也會提高吞吐量。把它設(shè)為1 可以保證消息是按照發(fā)送的順序?qū)懭敕?wù)器。 timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms
request.timeout.ms 指定了生產(chǎn)者在發(fā)送數(shù)據(jù)時(shí)等待服務(wù)器返回的響應(yīng)時(shí)間,metadata.fetch.timeout.ms 指定了生產(chǎn)者在獲取元數(shù)據(jù)(比如目標(biāo)分區(qū)的首領(lǐng)是誰)時(shí)等待服務(wù)器返回響應(yīng)的時(shí)間。如果等待時(shí)間超時(shí),生產(chǎn)者要么重試發(fā)送數(shù)據(jù),要么返回一個(gè)錯(cuò)誤。timeout.ms 指定了 broker 等待同步副本返回消息確認(rèn)的時(shí)間,與 asks 的配置相匹配----如果在指定時(shí)間內(nèi)沒有收到同步副本的確認(rèn),那么 broker 就會返回一個(gè)錯(cuò)誤。
max.block.ms 此參數(shù)指定了在調(diào)用 send() 方法或使用 partitionFor() 方法獲取元數(shù)據(jù)時(shí)生產(chǎn)者的阻塞時(shí)間當(dāng)生產(chǎn)者的發(fā)送緩沖區(qū)已捕,或者沒有可用的元數(shù)據(jù)時(shí),這些方法就會阻塞。在阻塞時(shí)間達(dá)到 max.block.ms 時(shí),生產(chǎn)者會拋出超時(shí)異常。 max.request.size 該參數(shù)用于控制生產(chǎn)者發(fā)送的請求大小。它可以指能發(fā)送的單個(gè)消息的最大值,也可以指單個(gè)請求里所有消息的總大小。 receive.buffer.bytes 和 send.buffer.bytes Kafka 是基于 TCP實(shí)現(xiàn)的,為了保證可靠的消息傳輸,這兩個(gè)參數(shù)分別指定了 TCP Socket 接收和發(fā)送數(shù)據(jù)包的緩沖區(qū)的大小。如果它們被設(shè)置為 -1,就使用操作系統(tǒng)的默認(rèn)值。如果生產(chǎn)者或消費(fèi)者與 broker 處于不同的數(shù)據(jù)中心,那么可以適當(dāng)增大這些值。
生產(chǎn)者發(fā)送消息的順序保證
kafka可以保證同一個(gè)分區(qū)里的消息是有序的。也就是說生產(chǎn)者按照一定的順序發(fā)送消息,broker就會按照這個(gè)順序把它們寫入分區(qū),消費(fèi)者也會按照順序讀取它們。 如果把retries設(shè)為非零整數(shù),同時(shí)把max.in.flight.requests.per.connection設(shè)為比1大的值,那么如果第一個(gè)批次消息寫入失敗,而第二個(gè)批次寫入成功,broker會重試寫入第一個(gè)批次。如果此時(shí)第一個(gè)批次也寫入成功,那么兩個(gè)批次的順序就反過來了。 可以把max.in.flight.requests.per.connection設(shè)為1,這樣在生產(chǎn)者嘗試發(fā)送第一批消息時(shí),就不會有其他的消息發(fā)送給broker。對消息的順序有嚴(yán)格要求的情況下才能這么做,不然會影響吞吐量。
四、Kafka消費(fèi)者–從Kafka讀取消息
分區(qū)和消費(fèi)者的關(guān)系
為什么kafka每個(gè)Partition,只能被消費(fèi)者群組中的一個(gè)消費(fèi)者消費(fèi)?
假設(shè)群組內(nèi)的多個(gè)消費(fèi)者負(fù)責(zé)同一個(gè)分區(qū),那么會有什么問題呢?
我們知道,Kafka它在設(shè)計(jì)的時(shí)候就是要保證分區(qū)下消息的順序,也就是說消息在一個(gè)分區(qū)中的順序是怎樣的,那么消費(fèi)者在消費(fèi)的時(shí)候看到的就是什么樣的順序,那么要做到這一點(diǎn)就首先要保證消息是由消費(fèi)者主動拉取的(pull),其次還要保證一個(gè)分區(qū)只能由一個(gè)消費(fèi)者負(fù)責(zé)。倘若,兩個(gè)消費(fèi)者負(fù)責(zé)同一個(gè)分區(qū),那么就意味著兩個(gè)消費(fèi)者同時(shí)讀取分區(qū)的消息,由于消費(fèi)者自己可以控制讀取消息的offset,就有可能C1才讀到2,而C1讀到1,C1還沒處理完,C2已經(jīng)讀到3了,則會造成很多浪費(fèi),因?yàn)檫@就相當(dāng)于多線程讀取同一個(gè)消息,會造成消息處理的重復(fù),且不能保證消息的順序,這就跟主動推送(push)無異。
假如兩個(gè)不同群組的消費(fèi)者,分別去消費(fèi)同一個(gè)分區(qū),那么分區(qū)消息上的偏移量會怎么變化?這兩個(gè)消費(fèi)者,最終能讀取到相同的消息么?
不同的消費(fèi)者群組可以從同一主題獲取所有的消息,消費(fèi)者群組G1和消費(fèi)者群組G2之間互不影響。
多個(gè)應(yīng)用程序可以從同一主題獲取到所有的消息。
只要保證每個(gè)訂閱的應(yīng)用程序都有自己的不同的消費(fèi)者群組,每個(gè)訂閱的應(yīng)用程序都可以從同一主題獲取所有的消息,而不只是其中的一部分。
輪詢
輪詢是消費(fèi)者api的核心。通過一個(gè)簡單的輪詢向服務(wù)器請求數(shù)據(jù),一旦消費(fèi)者訂閱了主題,輪詢就會處理群組協(xié)調(diào)、分區(qū)再均衡、發(fā)送心跳、獲取數(shù)據(jù)。
其他
內(nèi)存映射
概括:用戶空間的一段內(nèi)存區(qū)域映射到內(nèi)核空間,這樣,無論是內(nèi)核空間或用戶空間對這段內(nèi)存區(qū)域的修改,都可以直接映射到另一個(gè)區(qū)域。 優(yōu)勢:如果內(nèi)核態(tài)和用戶態(tài)存在大量的數(shù)據(jù)傳輸,效率是非常高的。 為什么會提高效率:概括來講,傳統(tǒng)方式為read()系統(tǒng)調(diào)用,進(jìn)行了兩次數(shù)據(jù)拷貝;內(nèi)存映射方式為mmap()系統(tǒng)調(diào)用,只進(jìn)行一次數(shù)據(jù)拷貝
零拷貝方式: 如何讓數(shù)據(jù)不經(jīng)過用戶空間?零拷貝省略了拷貝到用戶緩沖的步驟,通過文件描述符,直接從內(nèi)核空間將數(shù)據(jù)復(fù)制到網(wǎng)卡接口。
柚子快報(bào)邀請碼778899分享:《Kafka權(quán)威指南》讀書筆記
精彩內(nèi)容
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。