当前观点:一碰就头疼的 Kafka 消息重复问题,立马解决!
文章来源:https://juejin.cn/post/7172897190627508237
一、前言
(资料图片仅供参考)
数据重复这个问题其实也是挺正常,全链路都有可能会导致数据重复。
通常,消息消费时候都会设置一定重试次数来避免网络波动造成的影响,同时带来副作用是可能出现消息重复。
整理下消息重复的几个场景:
生产端:遇到异常,基本解决措施都是 重试。
场景一:leader
分区不可用了,抛 LeaderNotAvailableException
异常,等待选出新 leader
分区。 场景二: Controller
所在 Broker
挂了,抛 NotControllerException
异常,等待 Controller
重新选举。 场景三:网络异常、断网、网络分区、丢包等,抛 NetworkException
异常,等待网络恢复。 消费端:poll
一批数据,处理完毕还没提交 offset
,机子宕机重启了,又会 poll
上批数据,再度消费就造成了消息重复。
怎么解决?
先来了解下消息的三种投递语义:
最多一次(at most once
):消息只发一次,消息可能会丢失,但绝不会被重复发送。例如: mqtt
中 QoS = 0
。 至少一次(at least once
):消息至少发一次,消息不会丢失,但有可能被重复发送。例如: mqtt
中 QoS = 1
精确一次(exactly once
):消息精确发一次,消息不会丢失,也不会被重复发送。例如: mqtt
中 QoS = 2
。 了解了这三种语义,再来看如何解决消息重复,即如何实现精准一次,可分为三种方法:
Kafka
幂等性 Producer
:保证生产端发送消息幂等。局限性,是只能保证单分区且单会话(重启后就算新会话) Kafka
事务:保证生产端发送消息幂等。解决幂等 Producer
的局限性。 消费端幂等:保证消费端接收消息幂等。蔸底方案。 1)Kafka
幂等性 Producer
幂等性指:无论执行多少次同样的运算,结果都是相同的。即一条命令,任意多次执行所产生的影响均与一次执行的影响相同。
幂等性使用示例:在生产端添加对应配置即可
Properties props = newProperties();props.put(\"enable.idempotence\", ture); // 1. 设置幂等props.put(\"acks\", \"all\"); // 2. 当 enable.idempotence 为 true,这里默认为 allprops.put(\"max.in.flight.requests.per.connection\", 5); // 3. 注意
设置幂等,启动幂等。 配置 acks
,注意:一定要设置 acks=all
,否则会抛异常。 配置 max.in.flight.requests.per.connection
需要 <= 5
,否则会抛异常 OutOfOrderSequenceException
。
0.11 >= Kafka < 1.1
, max.in.flight.request.per.connection = 1
Kafka >= 1.1
, max.in.flight.request.per.connection <= 5
为了更好理解,需要了解下 Kafka 幂等机制:
Producer
每次启动后,会向 Broker
申请一个全局唯一的 pid
。(重启后 pid
会变化,这也是弊端之一) Sequence Numbe
:针对每个
都对应一个从0开始单调递增的 Sequence
,同时 Broker
端会缓存这个 seq num
判断是否重复:拿
去 Broker
里对应的队列 ProducerStateEntry.Queue
(默认队列长度为 5)查询是否存在
如果 nextSeq == lastSeq + 1
,即 服务端seq + 1 == 生产传入seq
,则接收。
如果 nextSeq == 0 && lastSeq == Int.MaxValue
,即刚初始化,也接收。
反之,要么重复,要么丢消息,均拒绝。
这种设计针对解决了两个问题:
消息重复:场景Broker
保存消息后还没发送 ack
就宕机了,这时候 Producer
就会重试,这就造成消息重复。 消息乱序:避免场景,前一条消息发送失败而其后一条发送成功,前一条消息重试后成功,造成的消息乱序。 那什么时候该使用幂等:
如果已经使用acks=all
,使用幂等也可以。 如果已经使用 acks=0
或者 acks=1
,说明你的系统追求高性能,对数据一致性要求不高。不要使用幂等。
2)Kafka
事务
使用
Kafka
事务解决幂等的弊端:单会话且单分区幂等。
Tips
:这块篇幅较长,这先稍微提及下使用,之后另起一篇。
事务使用示例:分为生产端 和 消费端
Properties props = newProperties();props.put(\"enable.idempotence\", ture); // 1. 设置幂等props.put(\"acks\", \"all\"); // 2. 当 enable.idempotence 为 true,这里默认为 allprops.put(\"max.in.flight.requests.per.connection\", 5); // 3. 最大等待数props.put(\"transactional.id\", \"my-transactional-id\"); // 4. 设定事务 idProducer producer = newKafkaProducer(props);// 初始化事务producer.initTransactions();try{// 开始事务producer.beginTransaction();// 发送数据producer.send(newProducerRecord(\"Topic\", \"Key\", \"Value\"));// 数据发送及 Offset 发送均成功的情况下,提交事务producer.commitTransaction();} catch(ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {// 数据发送或者 Offset 发送出现异常时,终止事务producer.abortTransaction();} finally{// 关闭 Producer 和 Consumerproducer.close();consumer.close();}
这里消费端 Consumer
需要设置下配置:isolation.level
参数
read_uncommitted
:这是默认值,表明 Consumer
能够读取到 Kafka
写入的任何消息,不论事务型 Producer
提交事务还是终止事务,其写入的消息都可以读取。如果你用了事务型 Producer
,那么对应的 Consumer
就不要使用这个值。
read_committed
:表明 Consumer
只会读取事务型 Producer
成功提交事务写入的消息。当然了,它也能看到非事务型 Producer
写入的所有消息。 3)消费端幂等
“如何解决消息重复?” 这个问题,其实换一种说法:就是如何解决消费端幂等性问题。
只要消费端具备了幂等性,那么重复消费消息的问题也就解决了。
典型的方案是使用:消息表,来去重:
上述栗子中,消费端拉取到一条消息后,开启事务,将消息Id
新增到本地消息表中,同时更新订单信息。
如果消息重复,则新增操作 insert
会异常,同时触发事务回滚。
二、案例:
Kafka 幂等性 Producer 使用
环境搭建可参考:https://developer.confluent.io/tutorials/message-ordering/kafka.html#view-all-records-in-the-topic
准备工作如下:
1、Zookeeper
:本地使用 Docker
启动 $docker run -d --name zookeeper -p 2181:2181 zookeepera86dff3689b68f6af7eb3da5a21c2dba06e9623f3c961154a8bbbe3e9991dea4
2、Kafka
:版本 2.7.1
,源码编译启动(看上文源码搭建启动) 3、启动生产者: Kafka
源码中 exmaple
中 4、启动消息者:可以用 Kafka
提供的脚本
#举个栗子:topic 需要自己去修改$cd./kafka-2.7.1-src/bin$./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic
创建 topic
:1副本,2 分区
$./kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --create --replication-factor 1 --partitions 2#查看$./kafka-topics.sh --bootstrap-server broker:9092 --topic myTopic --describe
生产者代码:
publicclassKafkaProducerApplication {privatefinal Producer producer;final StringoutTopic;publicKafkaProducerApplication(final Producer producer,final Stringtopic) {this.producer = producer;outTopic = topic;}publicvoidproduce(final Stringmessage) {final String[] parts = message.split(\"-\");final Stringkey, value;if(parts.length > 1) {key = parts[0];value = parts[1];} else{key = null;value = parts[0];}final ProducerRecord producerRecord = newProducerRecord<>(outTopic, key, value);producer.send(producerRecord,(recordMetadata, e) -> {if(e != null) {e.printStackTrace();} else{System.out.println(\"key/value \"+ key + \"/\"+ value + \"\twritten to topic[partition] \"+ recordMetadata.topic() + \"[\"+ recordMetadata.partition() + \"] at offset \"+ recordMetadata.offset());}});}publicvoidshutdown() {producer.close();}publicstaticvoidmain(String[] args) {final Properties props = newProperties();props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, \"true\");props.put(ProducerConfig.ACKS_CONFIG, \"all\");props.put(ProducerConfig.CLIENT_ID_CONFIG, \"myApp\");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);final Stringtopic = \"myTopic\";final Producer producer = newKafkaProducer<>(props);final KafkaProducerApplication producerApp = newKafkaProducerApplication(producer, topic);StringfilePath = \"/home/donald/Documents/Code/Source/kafka-2.7.1-src/examples/src/main/java/kafka/examples/input.txt\";try{List linesToProduce = Files.readAllLines(Paths.get(filePath));linesToProduce.stream().filter(l -> !l.trim().isEmpty()).forEach(producerApp::produce);System.out.println(\"Offsets and timestamps committed in batch from \"+ filePath);} catch(IOException e) {System.err.printf(\"Error reading file %s due to %s %n\", filePath, e);} finally{producerApp.shutdown();}}}
启动生产者后,控制台输出如下:
启动消费者:
$ ./kafka-console-consumer.sh--bootstrap-server localhost:9092--topic myTopic
修改配置 acks
启用幂等的情况下,调整 acks
配置,生产者启动后结果是怎样的:
修改配置 acks = 1
acks = 0
会直接报错:
Exception in thread \"main\" org.apache.kafka.common.config.ConfigException: Must setacks toall inordertousethe idempotent producer. Otherwise we cannot guaranteeidempotence.
修改配置 max.in.flight.requests.per.connection
启用幂等的情况下,调整此配置,结果是怎样的:
将 max.in.flight.requests.per.connection > 5
会怎样?
当然会报错:
Causedby: org.apache.kafka.common.config.ConfigException: Mustsetmax.in.flight.requests.per.connectiontoatmost5 tousetheidempotentproducer.
责任编辑:宋璟
-
当前观点:一碰就头疼的 Kafka 消息重复问题,立马解决!
-
焦点速读:*** 次数:9999999 已用完请联系开发者*** 汉初三杰分别是谁
-
时隔三年,三星堆新年大典重磅回归!
-
世界视讯!海浪和风暴潮蓝色警报!自然资源部北海区海洋预报台发布警报
-
今亮点!母乳的储存方法 母乳保存的方法
-
城关镇开展春节前安全生产大检查行动|环球微资讯
-
新春佳节到,拜年要赶早!话境君祝大家新年快乐、大展宏兔! 每日看点
-
工厂营销承包合同范本(共41篇)-每日快讯
-
热门中概股周五普涨 途牛涨超23% 玖富涨超19%
-
刘国梁委以肖战重任,负责混双兼顾王楚钦,国乒巴黎奥运金牌稳了
-
23厦门银行CD005今日发布发行公告 全球通讯
-
全球热资讯!智学网考试查看排名教程
-
刘奕和陈永亮被查之后,李璇称足协有五个当务之急,选帅迫在眉睫|天天实时
-
焦点消息!亚图快捷需要多少天才能销户?
-
单晶硅切片成云南2022年出口值最大单品
-
教育部:2023年高考全国统考将于6月7日、8日举行
-
2023港珠澳大桥珠海公路口岸通关香港交通攻略 全球观察
-
超视力老花镜眼镜是真的吗
-
我市省级研究生培养基地实现零突破
-
全球热点!美媒披露美司法部未让FBI介入拜登机密文件搜查
-
【快播报】利时集团控股(00526.HK)1月18日收盘跌3.33%
-
福建罗源:一“石”激起文旅热浪
-
人民网1月18日盘中跌幅达5%_今日热议
-
全球热文:孩子王:融资净偿还647.81万元,融资余额2.18亿元(01-17)
-
牛奶密度对照表大全_牛奶密度-今日看点
-
34岁知名女星患癌去世,她的葬礼,来了半个娱乐圈
-
天天报道:高新发展(000628)1月17日主力资金净买入3941.19万元
-
福彩3D第2023017期牛魔王中奖诗
-
指南针(300803)1月16日主力资金净买入1.09亿元_每日速看
-
热资讯!李自健美术馆春节开放吗?
-
PyTorch统治学术论文,TensorFlow只占4%,LeCun:还能为啥?
-
2023重庆医科大学附属口腔医院招聘信息
-
每日消息!新春走基层|虞城淮海医院走进心连心敬老院慰问义诊
-
(新春走基层)国网哈供电:穿梭于地下17米电缆隧道的光明守护者|全球热头条
-
土地使用权的转移的方式有哪些?_世界热点评