柚子快報(bào)邀請碼778899分享:筆記 Kafka初學(xué)
柚子快報(bào)邀請碼778899分享:筆記 Kafka初學(xué)
Kafka3.x 學(xué)習(xí)
? 鍥而舍之,朽木不折;鍥而不舍,金石可鏤。 —— 荀況
觀看b站千鋒教育Kafka教學(xué)視頻:https://www.bilibili.com/video/BV1Xy4y1G7zA?p=1&vd_source=f5ed15a716a0d2394ab18fcc53eac495
3.20
1、消息隊(duì)列的兩種模式
1)點(diǎn)對點(diǎn)模式
2)發(fā)布/訂閱模式
2、Kafka 基礎(chǔ)架構(gòu)
3、Kafka 安裝
3.1 安裝部署
3.1.1 集群規(guī)劃
hadoop102hadoop103hadoop103zkzkzkkafkakafkakafka
2、集群部署
Kafka下載
3.21
1、單播消息
啟動 Kafka 服務(wù)器
bin/kafka-server-start.sh config/kraft/server.properties
創(chuàng)建 topic=test
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
生產(chǎn)者
bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
消費(fèi)者組 group.id=testGroup1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup1 --topic test
同一組同一主題的消費(fèi)者只有一個(gè)能收到消息
2、多播消息
消費(fèi)者組 group.id=testGroup1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup1 --topic test
消費(fèi)者組 group.id=testGroup2
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup2 --topic test
不同組同一主題的消費(fèi)者都能接受消息
3、查看消費(fèi)組及信息
# 查看當(dāng)前主題下有哪些消費(fèi)者
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
#查看消費(fèi)組中的具體信息:比如當(dāng)前偏移量、最后一條消息的偏移量、堆積的消息數(shù)量
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup1
4、主題和分區(qū)的概念
4.1 主題 topic
主題 topic 可以理解成一個(gè)類別的名稱
4.2 分區(qū) partition
創(chuàng)建分區(qū)
bin/kafka-topic.sh --create --bootstrap-server localhost:9092 --partitions 2 --topic test
查看 topic 的分區(qū)信息
bin/kafka-topic.sh --describe --bootstrap-server localhost:9092 --topic test
3.22
1、搭建 kafka 集群(三個(gè) broker)
# 進(jìn)入bin目錄啟動三臺服務(wù)器
./kafka-server-start.sh -daemon ../config/server.properties
./kafka-server-start.sh -daemon ../config/server1.properties
./kafka-server-start.sh -daemon ../config/server2.properties
基于 kraft 搭建集群,修改 kraft 下的 server.properities
配置 server1,server2 其余兩個(gè)節(jié)點(diǎn)配置除了 node.id 不同,其他配置都相同。
去 zookeeper 檢查是否啟動成功或者直接查詢進(jìn)程
# 啟動zookeeper客戶端
./zkCli.sh
# 查看brokers
ls /brokers/ids
ps -aux | grep server.properties
ps -aux | grep server1.properties
ps -aux | grep server2.properties
2、副本的概念
副本是對分區(qū)的備份。在集群中,不同的副本會被部署在不同的 broker 上。下面例子:創(chuàng)建 1 個(gè)主題, 2 個(gè)分區(qū)、 3 個(gè)副本。
# 查看所有topic
./kafka-topics.sh --bootstrap-server 192.168.163.130:9092 --list
# 創(chuàng)建 1個(gè)主題, 2 個(gè)分區(qū)、 3 個(gè)副本
./kafka-topics.sh --create --bootstrap-server 192.168.163.130:9092 --replication-factor 3 --partitions 2 --topic my-replicated-topic
#查看特定topic
# 查看topic情況
./kafka-topics.sh --describe --bootstrap-server 192.168.163.130:9092 --topic my-replicated-topic
isr: 可以同步的 broker 節(jié)點(diǎn)和已同步的 broker 節(jié)點(diǎn),存放在 isr 集合中。
3.broker、主題、分區(qū)、副本
kafka 集群中由多個(gè) broker 組成一個(gè) broker 中存放一個(gè) topic 的不同 partition——副本
4.kafka 集群消息的發(fā)送
./kafka-console-producer.sh --broker-list 192.168.163.130:9092,192.168.163.130:9093,192.168.163.130:9094 --topic my-replicated-topic
5.kafka 集群消息的消費(fèi)
./kafka-console-consumer.sh --bootstrap-server 192.168.163.130:9092,192.168.163.130:9093,192.168.163.130:9094 --from-beginning --topic my-replicated-topic
6.關(guān)于分區(qū)消費(fèi)組消費(fèi)者的細(xì)節(jié)
消費(fèi)組中消費(fèi)者的數(shù)量不能比一個(gè) topic 中的 partition 數(shù)量多,否則多出來的消費(fèi)者消費(fèi)不到消息。
3.25
1、Kafka 的 Java 客戶端-生產(chǎn)者
1.1.引入依賴
2.生產(chǎn)者發(fā)送消息的基本實(shí)現(xiàn)
public class MySimpleProducer {
private final static String _TOPIC_NAME _= "my-replicated-topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig._BOOTSTRAP_SERVERS_CONFIG_, "192.168.163.130:9092,192.168.163.130:9093,192.168.163.130:9094");
//把發(fā)送的key從字符串序列化為字節(jié)數(shù)組
props.put(ProducerConfig._KEY_SERIALIZER_CLASS_CONFIG_, StringSerializer.class.getName());
//把發(fā)送消息value從字符串序列化為字節(jié)數(shù)組
props.put(ProducerConfig._VALUE_SERIALIZER_CLASS_CONFIG_, StringSerializer.class.getName());
Producer
ProducerRecord
//發(fā)送消息
RecordMetadata metadata = producer.send(producerRecord).get();
System._out_.println("同步方式發(fā)送消息結(jié)果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());
}
}
3.發(fā)送消息到指定分區(qū)上
ProducerRecord
4.未指定分區(qū),則會通過業(yè)務(wù) key 的 hash 運(yùn)算,算出消息往哪個(gè)分區(qū)上發(fā)
ProducerRecord
5.同步發(fā)送
生產(chǎn)者同步發(fā)消息,在收到 kafka 的 ack 告知發(fā)送成功之前一直處于阻塞狀態(tài)
//等待消息發(fā)送成功的同步阻塞方法
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步方式發(fā)送消息結(jié)果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" +metadata.offset());
6.異步發(fā)送
生產(chǎn)者發(fā)消息,發(fā)送完后不用等待 broker 給回復(fù),直接執(zhí)行下面的業(yè)務(wù)邏輯??梢蕴峁?callback,讓 broker 異步的調(diào)用 callback,告知生產(chǎn)者,消息發(fā)送的結(jié)果
//異步發(fā)送消息
producer.send(producerRecord, new Callback(){
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null){
System._out_.println("發(fā)送消息失敗" + e.getStackTrace());
}
if (metadata != null) {
System._out_.println("異步方式發(fā)送消息結(jié)果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());
}
}
});
7.關(guān)于生產(chǎn)者的 ack 參數(shù)配置
在同步發(fā)消息的場景下:生產(chǎn)者發(fā)動 broker 上后,ack 會有 3 種不同的選擇:
acks=0: 表示 producer 不需要等待任何 broker 確認(rèn)收到消息的回復(fù),就可以繼續(xù)發(fā)送下一條消息。性能最高,但是最容易丟消息。acks=1: 至少要等待 leader 已經(jīng)成功將數(shù)據(jù)寫入本地 log,但是不需要等待所有 follower 是否成功寫入。就可以繼續(xù)發(fā)送下一條消息。這種情況下,如果 follower 沒有成功備份數(shù)據(jù),而此時(shí) leader 又掛掉,則消息會丟失。acks=-1 或 all: 需要等待 min.insync.replicas(默認(rèn)為 1 ,推薦配置大于等于 2) 這個(gè)參數(shù)配置的副本個(gè)數(shù)都成功寫入日志,這種策略會保證只要有一個(gè)備份存活就不會丟失數(shù)據(jù)。這是最強(qiáng)的數(shù)據(jù)保證。一般除非是金融級別,或跟錢打交道的場景才會使用這種配置。
下面是關(guān)于 ack 重試的配置
// 發(fā)送失敗會重試,默認(rèn)重試間隔100ms,重試能保證消息發(fā)送的可靠性,但是也可能造成消息重復(fù)發(fā)送,
// 接收者那邊做好接收消息的冪等性處理
props.put(ProducerConfig._RETRIES_CONFIG_, 3);
// 重試間隔設(shè)置
props.put(ProducerConfig._RETRY_BACKOFF_MS_CONFIG_, 300);
8.發(fā)送區(qū)消息的緩沖機(jī)制
kafka 默認(rèn)會創(chuàng)建一個(gè)消息緩沖區(qū),用來存放要發(fā)送的消息,緩沖區(qū)是 32M
props.put(ProducerConfig._BUFFER_MEMORY_CONFIG_, 33554432);
kafka 本地線程會去緩沖區(qū)一次拉 16K 的數(shù)據(jù),發(fā)送到 broker
props.put(ProducerConfig._BATCH_SIZE_CONFIG_, 16384);
如果線程拉不到 16K 的數(shù)據(jù),間隔 10ms 會將已拉到的數(shù)據(jù)發(fā)到 broker
props.put(ProducerConfig._LINGER_MS_CONFIG_, 10);
CountDownLatch 計(jì)數(shù),異步調(diào)用發(fā)完所有消息后主線程才結(jié)束
CountDownLatch countDownLatch = new CountDownLatch(msgNum);
//異步發(fā)送消息
countDownLatch.countDown();
countDownLatch.await(5, TimeUnit._SECONDS_);
3.26
1.消費(fèi)者消費(fèi)消息的基本實(shí)現(xiàn)
public class MyConsumer {
private final static String TOPIC_NAME = "my-replicated-topic";
private final static String CONSUMER_GROUP_NAME = "testGroup";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.31.167.10:9092,10.31.167.10:9093,10.31.167.10:9094");
// 消費(fèi)分組名
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
//創(chuàng)建一個(gè)消費(fèi)者的客戶端
KafkaConsumer
// 消費(fèi)者訂閱主題列表
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
/*
* poll() API 是拉取消息的?輪詢
*/
ConsumerRecords
for (ConsumerRecord
System.out.printf("收到消息:partition = %d,offset = %d, key =%s, value = %s%n", record.partition(),record.offset(), record.key(), record.value());
}
}
}
}
2.自動提交 offset
3.手動提交 offset
設(shè)置手動提交參數(shù)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
在消費(fèi)完消息后進(jìn)行手動提交
手動同步提交
if (records.count() > 0 ) {//有消息
// 手動同步提交offset,當(dāng)前線程會阻塞直到offset提交成功
// 一般使用同步提交,因?yàn)樘峤恢笠话阋矝]有什么邏輯代碼了
consumer.commitSync();//會阻塞,直到broker返回ack
}
手動異步提交
if (records.count() > 0 ) {
// 手動異步提交offset,當(dāng)前線程提交offset不會阻塞,可以繼續(xù)處理后面的程序邏輯
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map
if (exception != null) {
System.err.println("Commit failed for " + offsets);
System.err.println("Commit failed exception: " + exception.getStackTrace());
}
}
});
}
4、消費(fèi)者 poll 消息的細(xì)節(jié)
消費(fèi)者建立了與 broker 之間的?連接,開始 poll 消息。默認(rèn)一次 poll 500 條消息
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500 );
如果每隔 1s 內(nèi)沒有 poll 到任何消息,則繼續(xù)去 poll 消息,循環(huán)往復(fù),直到 poll 到消息。如果超出了 1s,則此次?輪詢結(jié)束。
ConsumerRecords
如果一次 poll 到 500 條,就直接執(zhí)行 for如果一次沒有 poll 到 500 條,且時(shí)間在 1s 內(nèi),那么長輪詢繼續(xù) poll,直到 500 條或者 1s如果多次 poll 沒到 500 條,但 1s 時(shí)間到了,則執(zhí)行 for但是 AI 這么說:在 Apache Kafka 中,當(dāng)消費(fèi)者調(diào)用 poll() 方法時(shí),它會嘗試從 Kafka 服務(wù)器拉取最多 ConsumerConfig.MAX_POLL_RECORDS_CONFIG 設(shè)置數(shù)量的消息。如果在一次 poll() 調(diào)用期間,服務(wù)端可用的消息條數(shù)少于配置的最大值,消費(fèi)者并不會等待更多消息到來,而是立刻返回當(dāng)前可用的消息。
如果兩次 poll 的時(shí)間如果超出了 30s 的時(shí)間間隔,kafka 會認(rèn)為其消費(fèi)能力過弱,將其踢出消費(fèi)組。將分區(qū)分配給其他消費(fèi)者??梢酝ㄟ^這個(gè)值進(jìn)行設(shè)置:
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000 );
3.29
1、消費(fèi)者健康狀態(tài)檢查
消費(fèi)者發(fā)送心跳的時(shí)間間隔
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000 );
kafka 如果超過 10 秒沒有收到消費(fèi)者的心跳,則會把消費(fèi)者踢出消費(fèi)組,進(jìn)行 rebalance,把分區(qū)分配給其他消費(fèi)者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000 );
2、指定分區(qū)消費(fèi)
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0 )));
3、消息回溯消費(fèi)
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0 )));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME,0 )));
4.指定 offset 消費(fèi)
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0 )));
consumer.seek(new TopicPartition(TOPIC_NAME, 0 ), 10 );
5.從指定時(shí)間點(diǎn)消費(fèi)
List
//從 1 小時(shí)前開始消費(fèi)
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60 ;
Map
for (PartitionInfo par : topicPartitions) {
map.put(new TopicPartition(TOPIC_NAME, par.partition()),fetchDataTime);
}
Map
for (Map.Entry
TopicPartition key = entry.getKey();
OffsetAndTimestamp value = entry.getValue();
if (key == null || value == null) continue;
Long offset = value.offset();
System.out.println("partition-" + key.partition() +"|offset-" + offset);
System.out.println();
//根據(jù)消費(fèi)里的timestamp確定offset
if (value != null) {
consumer.assign(Arrays.asList(key));
consumer.seek(key, offset);
}
}
6.新消費(fèi)組的消費(fèi)偏移量
latest(默認(rèn)) :只消費(fèi)自己啟動之后發(fā)送到主題的消息earliest:第一次從頭開始消費(fèi),以后按照消費(fèi) offset 記錄繼續(xù)消費(fèi),這個(gè)需要區(qū)別于 consumer.seekToBeginning(每次都從頭開始消費(fèi))
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
十、Kafka 線上問題優(yōu)化
1.如何防止消息丟失
發(fā)送方: ack 是 1 或者-1/all 可以防止消息丟失,如果要做到 99.9999%,ack 設(shè)成 all,把 min.insync.replicas 配置成分區(qū)備份數(shù)消費(fèi)方:把自動提交改為手動提交。
因網(wǎng)絡(luò)抖動,生產(chǎn)者未收到 ack
一條消息被消費(fèi)者消費(fèi)多次。如果為了消息的不重復(fù)消費(fèi),而把生產(chǎn)端的重試機(jī)制關(guān)閉、消費(fèi)端的手動提交改成自動提交,這樣反而會出現(xiàn)消息丟失,那么可以直接在防治消息丟失的手段上再加上消費(fèi)消息時(shí)的冪等性保證,就能解決消息的重復(fù)消費(fèi)問題。
3.30
Springboot 中使用 Kafka
1、引入依賴
2、配置文件
server:
port: 8080
spring:
kafka:
bootstrap-servers: 172.16.253.21: 9093
producer: # 生產(chǎn)者
retries: 3 # 設(shè)置大于 0 的值,則客戶端會將發(fā)送失敗的記錄重新發(fā)送
batch-size: 16384
buffer-memory: 33554432
acks: 1
# 指定消息key和消息體的編解碼方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default-group
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 500
listener:
# 當(dāng)每一條記錄被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后提交
# RECORD
# 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后提交
# BATCH
# 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后,距離上次提交時(shí)間大于TIME時(shí)提交
# TIME
# 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后,被處理record數(shù)量大于等于COUNT時(shí)提交
# COUNT
# TIME | COUNT 有一個(gè)條件滿足時(shí)提交
# COUNT_TIME
# 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后, 手動調(diào)用Acknowledgment.acknowledge()后提交
# MANUAL
# 手動調(diào)用Acknowledgment.acknowledge()后立即提交,一般使用這種
# MANUAL_IMMEDIATE
ack-mode: MANUAL_IMMEDIATE
3、消息生產(chǎn)者
@RestController
public class KafkaController {
private final static String TOPIC_NAME = "my-replicated-topic";
@Resource
private KafkaTemplate
@RequestMapping("/send")
public void send() {
kafkaTemplate.send(TOPIC_NAME, 0 , "key", "this is a msg");
}
}
4、消息消費(fèi)者
@KafkaListener(topics = "my-replicated-topic",groupId = "MyGroup1")
public void listenGroup(ConsumerRecord
String value = record.value();
System.out.println(value);
System.out.println(record);
//手動提交offset
ack.acknowledge();
}
柚子快報(bào)邀請碼778899分享:筆記 Kafka初學(xué)
參考文章
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。