柚子快報(bào)邀請(qǐng)碼778899分享:分布式 kafka事務(wù)的詳解
柚子快報(bào)邀請(qǐng)碼778899分享:分布式 kafka事務(wù)的詳解
一 kafka事務(wù)的機(jī)制
1.1 冪等性
Producer 的冪等性指的是當(dāng)發(fā)送同一條消息時(shí),數(shù)據(jù)在 Server 端只會(huì)被持久化一次,數(shù)據(jù)不丟不重,Kafka為了實(shí)現(xiàn)冪等性,底層設(shè)計(jì)架構(gòu)中引入了ProducerID和SequenceNumbe。
當(dāng)Producer發(fā)送消息(x2,y2)給Broker時(shí),Broker接收到消息并將其追加到消息流中。此時(shí),Broker返回Ack信號(hào)給Producer時(shí),發(fā)生異常導(dǎo)致Producer接收Ack信號(hào)失敗。對(duì)于Producer來(lái)說(shuō),會(huì)觸發(fā)重試機(jī)制,將消息(x2,y2)再次發(fā)送,但是,由于引入了冪等性,在每條消息中附帶了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber發(fā)送給Broker,而之前Broker緩存過(guò)之前發(fā)送的相同的消息,那么在消息流中的消息就只有一條(x2,y2),不會(huì)出現(xiàn)重復(fù)發(fā)送的情況。
缺點(diǎn):Kafka 的 Exactly Once 冪等性只能保證單次會(huì)話內(nèi)的精準(zhǔn)一次性,不能解決跨會(huì)話和跨分區(qū)的問(wèn)題;
1.2?kafka的事務(wù)機(jī)制
1.2.1 事務(wù)的作用
kafka的事務(wù)機(jī)制,是kafka實(shí)現(xiàn)端到端有且僅有一次語(yǔ)義的基礎(chǔ)。Kafka 的 Exactly Once 冪等性只能保證單次會(huì)話內(nèi)的精準(zhǔn)一次性,不能解決跨會(huì)話和跨分區(qū)的問(wèn)題;
Kafka的事務(wù)特性本質(zhì)上是支持了Kafka跨分區(qū)和Topic的原子寫操作。通過(guò)事務(wù)機(jī)制,KAFKA 可以實(shí)現(xiàn)對(duì)多個(gè) topic 的多個(gè) partition 的原子性的寫入,即處于同一個(gè)事務(wù)內(nèi)的所有消息,不管最終需要落地到哪個(gè) topic 的哪個(gè) partition, 最終結(jié)果都是要么全部寫成功,要么全部寫失?。ˋtomic multi-partition writes);開啟事務(wù),必須開啟冪等性,KAFKA的事務(wù)機(jī)制,在底層依賴于冪等生產(chǎn)者。
Kafka的事務(wù)特性就是要確??绶謪^(qū)的多個(gè)寫操作的原子性。
具體的場(chǎng)景包括:Producer多次發(fā)送消息可以封裝成一個(gè)原子性操作,即同時(shí)成功,或者同時(shí)失??;(可以是跨多分區(qū)的寫入)Consumer-Transform-Producer模式下,因?yàn)橄M(fèi)者提交偏移量出現(xiàn)問(wèn)題,導(dǎo)致在重復(fù)消費(fèi)消息時(shí),生產(chǎn)者重復(fù)生產(chǎn)消息。需要將這個(gè)模式下消費(fèi)者提交偏移量操作和生成者一系列生成消息的操作封裝成一個(gè)原子操作。
為支持事務(wù)機(jī)制,KAFKA 引入了兩個(gè)新的組件:Transaction Coordinator 和 Transaction Log,其中 transaction coordinator 是運(yùn)行在每個(gè) kafka broker 上的一個(gè)模塊,是 kafka broker 進(jìn)程承載的新功能之一(不是一個(gè)獨(dú)立的新的進(jìn)程);而 transaction log 是 kakafa 的一個(gè)內(nèi)部 topic;
1.2.2 事務(wù)的原子性
事務(wù)原子性是指 Producer 將多條消息作為一個(gè)事務(wù)批量發(fā)送,要么全部成功要么全部失敗。 引入了一個(gè)服務(wù)器端的模塊,名為Transaction Coordinator,用于管理 Producer 發(fā)送的消息的事務(wù)性。
該Transaction Coordinator維護(hù)Transaction Log,該 log 存于一個(gè)內(nèi)部的 Topic 內(nèi)。由于 Topic 數(shù)據(jù)具有持久性,因此事務(wù)的狀態(tài)也具有持久性。
Producer 并不直接讀寫Transaction Log,它與Transaction Coordinator通信,然后由Transaction Coordinator將該事務(wù)的狀態(tài)插入相應(yīng)的Transaction Log。
Transaction Log的設(shè)計(jì)與Offset Log用于保存 Consumer 的 Offset 類似。
Kafka事務(wù)的回滾,并不是刪除已寫入的數(shù)據(jù),而是將寫入數(shù)據(jù)的事務(wù)標(biāo)記為 Rollback/Abort 從而在讀數(shù)據(jù)時(shí)過(guò)濾該數(shù)據(jù)。
1.2.3?拒絕僵尸實(shí)例
在分布式系統(tǒng)中,一個(gè)instance的宕機(jī)或失聯(lián),集群往往會(huì)自動(dòng)啟動(dòng)一個(gè)新的實(shí)例來(lái)代替它的工作。此時(shí)若原實(shí)例恢復(fù)了,那么集群中就產(chǎn)生了兩個(gè)具有相同職責(zé)的實(shí)例,此時(shí)前一個(gè)instance就被稱為“僵尸實(shí)例(Zombie Instance)”。在Kafka中,兩個(gè)相同的producer同時(shí)處理消息并生產(chǎn)出重復(fù)的消息(read-process-write模式),這樣就嚴(yán)重違反了Exactly Once Processing的語(yǔ)義。這就是僵尸實(shí)例問(wèn)題。
解決辦法:
kafka事務(wù)特性通過(guò)transaction-id屬性來(lái)解決僵尸實(shí)例問(wèn)題。所有具有相同transaction-id的Producer都會(huì)被分配相同的pid,同時(shí)每一個(gè)Producer還會(huì)被分配一個(gè)遞增的epoch。Kafka收到事務(wù)提交請(qǐng)求時(shí),如果檢查當(dāng)前事務(wù)提交者的epoch不是最新的,那么就會(huì)拒絕該P(yáng)roducer的請(qǐng)求。從而達(dá)成拒絕僵尸實(shí)例的目標(biāo)。
1.2.4 開啟事務(wù)的生產(chǎn)者和消費(fèi)者
1)生產(chǎn)者:開啟了事務(wù)的生產(chǎn)者,生產(chǎn)的消息最終還是正常寫到目標(biāo) topic 中,但同時(shí)也會(huì)通過(guò) transaction coordinator 使用兩階段提交協(xié)議,將事務(wù)狀態(tài)標(biāo)記 transaction marker,也就是控制消息 controlBatch,寫到目標(biāo) topic 中,控制消息共有兩種類型 commit 和 abort,分別用來(lái)表征事務(wù)已經(jīng)成功提交或已經(jīng)被成功終止;
2)消費(fèi)者:開啟了事務(wù)的消費(fèi)者,如果配置讀隔離級(jí)別為 read-committed, 在內(nèi)部會(huì)使用存儲(chǔ)在目標(biāo) topic-partition 中的事務(wù)控制消息,來(lái)過(guò)濾掉沒(méi)有提交的消息,包括回滾的消息和尚未提交的消息,從而確保只讀到已提交的事務(wù)的 message;
開啟了事務(wù)的消費(fèi)者,過(guò)濾消息時(shí),KAFKA consumer 不需要跟 transactional coordinator 進(jìn)行 rpc 交互,因?yàn)?topic 中存儲(chǔ)的消息,包括正常的數(shù)據(jù)消息和控制消息,包含了足夠的元數(shù)據(jù)信息來(lái)支持消息過(guò)濾;
3)總結(jié):當(dāng)然 kakfa 的 producer 和 consumer 是解耦的,你也可以使用非 transactional consumer 來(lái)消費(fèi) transactional producer 生產(chǎn)的消息,此時(shí)目標(biāo) topic-partition 中的所有消息都會(huì)被返回,不會(huì)進(jìn)行過(guò)濾,此時(shí)也就丟失了事務(wù) ACID 的支持;
1.3 事務(wù)的api
對(duì)于Producer,需要設(shè)置transactional.id屬性,這個(gè)屬性的作用下文會(huì)提到。設(shè)置了transactional.id屬性后,enable.idempotence屬性會(huì)自動(dòng)設(shè)置為true。
對(duì)于Consumer,需要設(shè)置isolation.level = read_committed,這樣Consumer只會(huì)讀取已經(jīng)提交了事務(wù)的消息。另外,需要設(shè)置enable.auto.commit = false來(lái)關(guān)閉自動(dòng)提交Offset功能。
1.生產(chǎn)者
/**
* 初始化事務(wù)
*/
public void initTransactions();
/**
* 開啟事務(wù)
*/
public void beginTransaction() throws ProducerFencedException ;
/**
* 在事務(wù)內(nèi)提交已經(jīng)消費(fèi)的偏移量
*/
public void sendOffsetsToTransaction(Map
String consumerGroupId) throws ProducerFencedException ;
/**
* 提交事務(wù)
*/
public void commitTransaction() throws ProducerFencedException;
/**
* 丟棄事務(wù)
*/
public void abortTransaction() throws ProducerFencedException ;
2.Write-process-wirte
KafkaProducer producer = createKafkaProducer(
"bootstrap.servers", "localhost:9092",
"transactional.id”, “my-transactional-id");
producer.initTransactions();
producer.beginTransaction();
producer.send("outputTopic", "message1");
producer.send("outputTopic", "message2");
producer.commitTransaction();
3.Read-process-Write
KafkaProducer producer = createKafkaProducer(
"bootstrap.servers", "localhost:9092",
"transactional.id", "my-transactional-id");
KafkaConsumer consumer = createKafkaConsumer(
"bootstrap.servers", "localhost:9092",
"group.id", "my-group-id",
"isolation.level", "read_committed");
consumer.subscribe(singleton("inputTopic"));
producer.initTransactions();
while (true) {
ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
producer.beginTransaction();
for (ConsumerRecord record : records)
producer.send(producerRecord(“outputTopic”, record));
producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
producer.commitTransaction();
}
1.4 事務(wù)的原理
圖中的 Transaction Coordinator 運(yùn)行在 Kafka 服務(wù)端,下面簡(jiǎn)稱 TC 服務(wù)。 __transaction_state 是 TC 服務(wù)持久化事務(wù)信息的 topic 名稱,下面簡(jiǎn)稱事務(wù) topic。 Producer 向 TC 服務(wù)發(fā)送的 commit 消息,下面簡(jiǎn)稱事務(wù)提交消息。 TC 服務(wù)向分區(qū)發(fā)送的消息,下面簡(jiǎn)稱事務(wù)結(jié)果消息。
一文讀懂 kafka 的事務(wù)機(jī)制 - 知乎
kafka之事務(wù)_kafka 事務(wù)_你的boy_Z的博客-CSDN博客
Kafka事務(wù)解析 - 知乎
https://www.cnblogs.com/xijiu/p/16917741.html
柚子快報(bào)邀請(qǐng)碼778899分享:分布式 kafka事務(wù)的詳解
推薦閱讀
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。