欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報(bào)邀請碼778899分享:筆記 Kafka初學(xué)

柚子快報(bào)邀請碼778899分享:筆記 Kafka初學(xué)

http://yzkb.51969.com/

Kafka3.x 學(xué)習(xí)

? 鍥而舍之,朽木不折;鍥而不舍,金石可鏤。 —— 荀況

觀看b站千鋒教育Kafka教學(xué)視頻:https://www.bilibili.com/video/BV1Xy4y1G7zA?p=1&vd_source=f5ed15a716a0d2394ab18fcc53eac495

3.20

1、消息隊(duì)列的兩種模式

1)點(diǎn)對點(diǎn)模式

2)發(fā)布/訂閱模式

2、Kafka 基礎(chǔ)架構(gòu)

3、Kafka 安裝

3.1 安裝部署

3.1.1 集群規(guī)劃

hadoop102hadoop103hadoop103zkzkzkkafkakafkakafka

2、集群部署

Kafka下載

3.21

1、單播消息

啟動 Kafka 服務(wù)器

bin/kafka-server-start.sh config/kraft/server.properties

創(chuàng)建 topic=test

bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092

生產(chǎn)者

bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092

消費(fèi)者組 group.id=testGroup1

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup1 --topic test

同一組同一主題的消費(fèi)者只有一個(gè)能收到消息

2、多播消息

消費(fèi)者組 group.id=testGroup1

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup1 --topic test

消費(fèi)者組 group.id=testGroup2

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup2 --topic test

不同組同一主題的消費(fèi)者都能接受消息

3、查看消費(fèi)組及信息

# 查看當(dāng)前主題下有哪些消費(fèi)者

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

#查看消費(fèi)組中的具體信息:比如當(dāng)前偏移量、最后一條消息的偏移量、堆積的消息數(shù)量

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup1

4、主題和分區(qū)的概念

4.1 主題 topic

主題 topic 可以理解成一個(gè)類別的名稱

4.2 分區(qū) partition

創(chuàng)建分區(qū)

bin/kafka-topic.sh --create --bootstrap-server localhost:9092 --partitions 2 --topic test

查看 topic 的分區(qū)信息

bin/kafka-topic.sh --describe --bootstrap-server localhost:9092 --topic test

3.22

1、搭建 kafka 集群(三個(gè) broker)

# 進(jìn)入bin目錄啟動三臺服務(wù)器

./kafka-server-start.sh -daemon ../config/server.properties

./kafka-server-start.sh -daemon ../config/server1.properties

./kafka-server-start.sh -daemon ../config/server2.properties

基于 kraft 搭建集群,修改 kraft 下的 server.properities

配置 server1,server2 其余兩個(gè)節(jié)點(diǎn)配置除了 node.id 不同,其他配置都相同。

去 zookeeper 檢查是否啟動成功或者直接查詢進(jìn)程

# 啟動zookeeper客戶端

./zkCli.sh

# 查看brokers

ls /brokers/ids

ps -aux | grep server.properties

ps -aux | grep server1.properties

ps -aux | grep server2.properties

2、副本的概念

副本是對分區(qū)的備份。在集群中,不同的副本會被部署在不同的 broker 上。下面例子:創(chuàng)建 1 個(gè)主題, 2 個(gè)分區(qū)、 3 個(gè)副本。

# 查看所有topic

./kafka-topics.sh --bootstrap-server 192.168.163.130:9092 --list

# 創(chuàng)建 1個(gè)主題, 2 個(gè)分區(qū)、 3 個(gè)副本

./kafka-topics.sh --create --bootstrap-server 192.168.163.130:9092 --replication-factor 3 --partitions 2 --topic my-replicated-topic

#查看特定topic

# 查看topic情況

./kafka-topics.sh --describe --bootstrap-server 192.168.163.130:9092 --topic my-replicated-topic

isr: 可以同步的 broker 節(jié)點(diǎn)和已同步的 broker 節(jié)點(diǎn),存放在 isr 集合中。

3.broker、主題、分區(qū)、副本

kafka 集群中由多個(gè) broker 組成一個(gè) broker 中存放一個(gè) topic 的不同 partition——副本

4.kafka 集群消息的發(fā)送

./kafka-console-producer.sh --broker-list 192.168.163.130:9092,192.168.163.130:9093,192.168.163.130:9094 --topic my-replicated-topic

5.kafka 集群消息的消費(fèi)

./kafka-console-consumer.sh --bootstrap-server 192.168.163.130:9092,192.168.163.130:9093,192.168.163.130:9094 --from-beginning --topic my-replicated-topic

6.關(guān)于分區(qū)消費(fèi)組消費(fèi)者的細(xì)節(jié)

消費(fèi)組中消費(fèi)者的數(shù)量不能比一個(gè) topic 中的 partition 數(shù)量多,否則多出來的消費(fèi)者消費(fèi)不到消息。

3.25

1、Kafka 的 Java 客戶端-生產(chǎn)者

1.1.引入依賴

org.apache.kafka

kafka-clients

2.4.1

2.生產(chǎn)者發(fā)送消息的基本實(shí)現(xiàn)

public class MySimpleProducer {

private final static String _TOPIC_NAME _= "my-replicated-topic";

public static void main(String[] args) throws ExecutionException, InterruptedException {

Properties props = new Properties();

props.put(ProducerConfig._BOOTSTRAP_SERVERS_CONFIG_, "192.168.163.130:9092,192.168.163.130:9093,192.168.163.130:9094");

//把發(fā)送的key從字符串序列化為字節(jié)數(shù)組

props.put(ProducerConfig._KEY_SERIALIZER_CLASS_CONFIG_, StringSerializer.class.getName());

//把發(fā)送消息value從字符串序列化為字節(jié)數(shù)組

props.put(ProducerConfig._VALUE_SERIALIZER_CLASS_CONFIG_, StringSerializer.class.getName());

Producer producer = new KafkaProducer(props);

ProducerRecord producerRecord = new ProducerRecord(_TOPIC_NAME_, "mykey", "hellokafka");

//發(fā)送消息

RecordMetadata metadata = producer.send(producerRecord).get();

System._out_.println("同步方式發(fā)送消息結(jié)果:" + "topic-" + metadata.topic() + "|partition-"

+ metadata.partition() + "|offset-" + metadata.offset());

}

}

3.發(fā)送消息到指定分區(qū)上

ProducerRecord producerRecord = new ProducerRecord(_TOPIC_NAME_, 0, "mykey", "hellokafka");

4.未指定分區(qū),則會通過業(yè)務(wù) key 的 hash 運(yùn)算,算出消息往哪個(gè)分區(qū)上發(fā)

ProducerRecord producerRecord = new ProducerRecord(_TOPIC_NAME_, "mykey", "hellokafka");

5.同步發(fā)送

生產(chǎn)者同步發(fā)消息,在收到 kafka 的 ack 告知發(fā)送成功之前一直處于阻塞狀態(tài)

//等待消息發(fā)送成功的同步阻塞方法

RecordMetadata metadata = producer.send(producerRecord).get();

System.out.println("同步方式發(fā)送消息結(jié)果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" +metadata.offset());

6.異步發(fā)送

生產(chǎn)者發(fā)消息,發(fā)送完后不用等待 broker 給回復(fù),直接執(zhí)行下面的業(yè)務(wù)邏輯??梢蕴峁?callback,讓 broker 異步的調(diào)用 callback,告知生產(chǎn)者,消息發(fā)送的結(jié)果

//異步發(fā)送消息

producer.send(producerRecord, new Callback(){

@Override

public void onCompletion(RecordMetadata metadata, Exception e) {

if(e != null){

System._out_.println("發(fā)送消息失敗" + e.getStackTrace());

}

if (metadata != null) {

System._out_.println("異步方式發(fā)送消息結(jié)果:" + "topic-" + metadata.topic() + "|partition-"

+ metadata.partition() + "|offset-" + metadata.offset());

}

}

});

7.關(guān)于生產(chǎn)者的 ack 參數(shù)配置

在同步發(fā)消息的場景下:生產(chǎn)者發(fā)動 broker 上后,ack 會有 3 種不同的選擇:

acks=0: 表示 producer 不需要等待任何 broker 確認(rèn)收到消息的回復(fù),就可以繼續(xù)發(fā)送下一條消息。性能最高,但是最容易丟消息。acks=1: 至少要等待 leader 已經(jīng)成功將數(shù)據(jù)寫入本地 log,但是不需要等待所有 follower 是否成功寫入。就可以繼續(xù)發(fā)送下一條消息。這種情況下,如果 follower 沒有成功備份數(shù)據(jù),而此時(shí) leader 又掛掉,則消息會丟失。acks=-1 或 all: 需要等待 min.insync.replicas(默認(rèn)為 1 ,推薦配置大于等于 2) 這個(gè)參數(shù)配置的副本個(gè)數(shù)都成功寫入日志,這種策略會保證只要有一個(gè)備份存活就不會丟失數(shù)據(jù)。這是最強(qiáng)的數(shù)據(jù)保證。一般除非是金融級別,或跟錢打交道的場景才會使用這種配置。

下面是關(guān)于 ack 重試的配置

// 發(fā)送失敗會重試,默認(rèn)重試間隔100ms,重試能保證消息發(fā)送的可靠性,但是也可能造成消息重復(fù)發(fā)送,

// 接收者那邊做好接收消息的冪等性處理

props.put(ProducerConfig._RETRIES_CONFIG_, 3);

// 重試間隔設(shè)置

props.put(ProducerConfig._RETRY_BACKOFF_MS_CONFIG_, 300);

8.發(fā)送區(qū)消息的緩沖機(jī)制

kafka 默認(rèn)會創(chuàng)建一個(gè)消息緩沖區(qū),用來存放要發(fā)送的消息,緩沖區(qū)是 32M

props.put(ProducerConfig._BUFFER_MEMORY_CONFIG_, 33554432);

kafka 本地線程會去緩沖區(qū)一次拉 16K 的數(shù)據(jù),發(fā)送到 broker

props.put(ProducerConfig._BATCH_SIZE_CONFIG_, 16384);

如果線程拉不到 16K 的數(shù)據(jù),間隔 10ms 會將已拉到的數(shù)據(jù)發(fā)到 broker

props.put(ProducerConfig._LINGER_MS_CONFIG_, 10);

CountDownLatch 計(jì)數(shù),異步調(diào)用發(fā)完所有消息后主線程才結(jié)束

CountDownLatch countDownLatch = new CountDownLatch(msgNum);

//異步發(fā)送消息

countDownLatch.countDown();

countDownLatch.await(5, TimeUnit._SECONDS_);

3.26

1.消費(fèi)者消費(fèi)消息的基本實(shí)現(xiàn)

public class MyConsumer {

private final static String TOPIC_NAME = "my-replicated-topic";

private final static String CONSUMER_GROUP_NAME = "testGroup";

public static void main(String[] args) {

Properties props = new Properties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.31.167.10:9092,10.31.167.10:9093,10.31.167.10:9094");

// 消費(fèi)分組名

props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());

//創(chuàng)建一個(gè)消費(fèi)者的客戶端

KafkaConsumer consumer = new KafkaConsumer(props);

// 消費(fèi)者訂閱主題列表

consumer.subscribe(Arrays.asList(TOPIC_NAME));

while (true) {

/*

* poll() API 是拉取消息的?輪詢

*/

ConsumerRecords records =consumer.poll(Duration.ofMillis( 1000 ));

for (ConsumerRecord record : records) {

System.out.printf("收到消息:partition = %d,offset = %d, key =%s, value = %s%n", record.partition(),record.offset(), record.key(), record.value());

}

}

}

}

2.自動提交 offset

3.手動提交 offset

設(shè)置手動提交參數(shù)

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

在消費(fèi)完消息后進(jìn)行手動提交

手動同步提交

if (records.count() > 0 ) {//有消息

// 手動同步提交offset,當(dāng)前線程會阻塞直到offset提交成功

// 一般使用同步提交,因?yàn)樘峤恢笠话阋矝]有什么邏輯代碼了

consumer.commitSync();//會阻塞,直到broker返回ack

}

手動異步提交

if (records.count() > 0 ) {

// 手動異步提交offset,當(dāng)前線程提交offset不會阻塞,可以繼續(xù)處理后面的程序邏輯

consumer.commitAsync(new OffsetCommitCallback() {

@Override

public void onComplete(Mapoffsets, Exception exception) {

if (exception != null) {

System.err.println("Commit failed for " + offsets);

System.err.println("Commit failed exception: " + exception.getStackTrace());

}

}

});

}

4、消費(fèi)者 poll 消息的細(xì)節(jié)

消費(fèi)者建立了與 broker 之間的?連接,開始 poll 消息。默認(rèn)一次 poll 500 條消息

props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500 );

如果每隔 1s 內(nèi)沒有 poll 到任何消息,則繼續(xù)去 poll 消息,循環(huán)往復(fù),直到 poll 到消息。如果超出了 1s,則此次?輪詢結(jié)束。

ConsumerRecords records =consumer.poll(Duration.ofMillis(1000));

如果一次 poll 到 500 條,就直接執(zhí)行 for如果一次沒有 poll 到 500 條,且時(shí)間在 1s 內(nèi),那么長輪詢繼續(xù) poll,直到 500 條或者 1s如果多次 poll 沒到 500 條,但 1s 時(shí)間到了,則執(zhí)行 for但是 AI 這么說:在 Apache Kafka 中,當(dāng)消費(fèi)者調(diào)用 poll() 方法時(shí),它會嘗試從 Kafka 服務(wù)器拉取最多 ConsumerConfig.MAX_POLL_RECORDS_CONFIG 設(shè)置數(shù)量的消息。如果在一次 poll() 調(diào)用期間,服務(wù)端可用的消息條數(shù)少于配置的最大值,消費(fèi)者并不會等待更多消息到來,而是立刻返回當(dāng)前可用的消息。

如果兩次 poll 的時(shí)間如果超出了 30s 的時(shí)間間隔,kafka 會認(rèn)為其消費(fèi)能力過弱,將其踢出消費(fèi)組。將分區(qū)分配給其他消費(fèi)者??梢酝ㄟ^這個(gè)值進(jìn)行設(shè)置:

props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000 );

3.29

1、消費(fèi)者健康狀態(tài)檢查

消費(fèi)者發(fā)送心跳的時(shí)間間隔

props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000 );

kafka 如果超過 10 秒沒有收到消費(fèi)者的心跳,則會把消費(fèi)者踢出消費(fèi)組,進(jìn)行 rebalance,把分區(qū)分配給其他消費(fèi)者。

props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000 );

2、指定分區(qū)消費(fèi)

consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0 )));

3、消息回溯消費(fèi)

consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0 )));

consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME,0 )));

4.指定 offset 消費(fèi)

consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0 )));

consumer.seek(new TopicPartition(TOPIC_NAME, 0 ), 10 );

5.從指定時(shí)間點(diǎn)消費(fèi)

List topicPartitions =consumer.partitionsFor(TOPIC_NAME);

//從 1 小時(shí)前開始消費(fèi)

long fetchDataTime = new Date().getTime() - 1000 * 60 * 60 ;

Map map = new HashMap<>();

for (PartitionInfo par : topicPartitions) {

map.put(new TopicPartition(TOPIC_NAME, par.partition()),fetchDataTime);

}

Map parMap =consumer.offsetsForTimes(map);

for (Map.Entry entry :parMap.entrySet()) {

TopicPartition key = entry.getKey();

OffsetAndTimestamp value = entry.getValue();

if (key == null || value == null) continue;

Long offset = value.offset();

System.out.println("partition-" + key.partition() +"|offset-" + offset);

System.out.println();

//根據(jù)消費(fèi)里的timestamp確定offset

if (value != null) {

consumer.assign(Arrays.asList(key));

consumer.seek(key, offset);

}

}

6.新消費(fèi)組的消費(fèi)偏移量

latest(默認(rèn)) :只消費(fèi)自己啟動之后發(fā)送到主題的消息earliest:第一次從頭開始消費(fèi),以后按照消費(fèi) offset 記錄繼續(xù)消費(fèi),這個(gè)需要區(qū)別于 consumer.seekToBeginning(每次都從頭開始消費(fèi))

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

十、Kafka 線上問題優(yōu)化

1.如何防止消息丟失

發(fā)送方: ack 是 1 或者-1/all 可以防止消息丟失,如果要做到 99.9999%,ack 設(shè)成 all,把 min.insync.replicas 配置成分區(qū)備份數(shù)消費(fèi)方:把自動提交改為手動提交。

因網(wǎng)絡(luò)抖動,生產(chǎn)者未收到 ack

一條消息被消費(fèi)者消費(fèi)多次。如果為了消息的不重復(fù)消費(fèi),而把生產(chǎn)端的重試機(jī)制關(guān)閉、消費(fèi)端的手動提交改成自動提交,這樣反而會出現(xiàn)消息丟失,那么可以直接在防治消息丟失的手段上再加上消費(fèi)消息時(shí)的冪等性保證,就能解決消息的重復(fù)消費(fèi)問題。

3.30

Springboot 中使用 Kafka

1、引入依賴

org.springframework.kafka

spring-kafka

2、配置文件

server:

port: 8080

spring:

kafka:

bootstrap-servers: 172.16.253.21: 9093

producer: # 生產(chǎn)者

retries: 3 # 設(shè)置大于 0 的值,則客戶端會將發(fā)送失敗的記錄重新發(fā)送

batch-size: 16384

buffer-memory: 33554432

acks: 1

# 指定消息key和消息體的編解碼方式

key-serializer: org.apache.kafka.common.serialization.StringSerializer

value-serializer: org.apache.kafka.common.serialization.StringSerializer

consumer:

group-id: default-group

enable-auto-commit: false

auto-offset-reset: earliest

key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

max-poll-records: 500

listener:

# 當(dāng)每一條記錄被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后提交

# RECORD

# 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后提交

# BATCH

# 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后,距離上次提交時(shí)間大于TIME時(shí)提交

# TIME

# 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后,被處理record數(shù)量大于等于COUNT時(shí)提交

# COUNT

# TIME | COUNT 有一個(gè)條件滿足時(shí)提交

# COUNT_TIME

# 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后, 手動調(diào)用Acknowledgment.acknowledge()后提交

# MANUAL

# 手動調(diào)用Acknowledgment.acknowledge()后立即提交,一般使用這種

# MANUAL_IMMEDIATE

ack-mode: MANUAL_IMMEDIATE

3、消息生產(chǎn)者

@RestController

public class KafkaController {

private final static String TOPIC_NAME = "my-replicated-topic";

@Resource

private KafkaTemplate kafkaTemplate;

@RequestMapping("/send")

public void send() {

kafkaTemplate.send(TOPIC_NAME, 0 , "key", "this is a msg");

}

}

4、消息消費(fèi)者

@KafkaListener(topics = "my-replicated-topic",groupId = "MyGroup1")

public void listenGroup(ConsumerRecord record,Acknowledgment ack) {

String value = record.value();

System.out.println(value);

System.out.println(record);

//手動提交offset

ack.acknowledge();

}

柚子快報(bào)邀請碼778899分享:筆記 Kafka初學(xué)

http://yzkb.51969.com/

參考文章

評論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。

轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://m.gantiao.com.cn/post/19072292.html

發(fā)布評論

您暫未設(shè)置收款碼

請?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問

文章目錄