如何保证Kafka不丢失消息?技术学习视频教程⽹盘资源整理 https://pan.baidu.com/s/13dbR69NLIEyP1tQyRTl4xwkafka如何保证不丢消息ps:这篇⽂章⾃我感觉说的很⼤⽩话了 !希望你们看过了之后能有收获。⽣产者丢失消息的情况⽣产者(Producer) 调⽤send⽅法发送消息之后,消息可能因为⽹络问题并没有发送过去。所以,我们不能默认在调⽤send⽅法发送消息之后消息消息发送成功了。为了确定消息是发送成功,我们要判断消息发送的结果。但是要注意的是 Kafka ⽣产者(Producer) 使⽤ send ⽅法发送消息实际上是异步的操作,我们可以通过 get()⽅法获取调⽤结果,但是这样也让它变为了同步操作,⽰例代码如下:SendResult
sendResult = kafkaTemplate.send(topic, o).get();if (sendResult.getRecordMetadata() != null) {logger.info('⽣产者成功发送消息到' + sendResult.getProducerRecord().topic() + '-> ' + sendResult.getProducerRecord().value().toString());}但是⼀般不推荐这么做!可以采⽤为其添加回调函数的形式,⽰例代码如下:ListenableFuture> future = kafkaTemplate.send(topic, o);future.addCallback(result -> logger.info('⽣产者成功发送消息到topic:{} partition:{}的消息',result.getRecordMetadata().topic(),result.getRecordMetadata().partition()),ex -> logger.error('⽣产者发送消失败,原因:{}', ex.getMessage()));如果消息发送失败的话,我们检查失败的原因之后重新发送即可!另外这⾥推荐为 Producer 的retries(重试次数)设置⼀个⽐较合理的值,⼀般是 3 ,但是为了保证消息不丢失的话⼀般会设置⽐较⼤⼀点。设置完成之后,当出现⽹络问题之后能够⾃动重试消息发送,避免消息丢失。另外,建议还要设置重试间隔,因为间隔太⼩的话重试的效果就不明显了,⽹络波动⼀次你3次⼀下⼦就重试完了消费者丢失消息的情况我们知道消息在被追加到 Partition(分区)的时候都会分配⼀个特定的偏移量(offset)。偏移量(offset)表⽰ Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 通过偏移量(offset)可以保证消息在分区内的顺序性。kafka offset当消费者拉取到了分区的某个消息之后,消费者会⾃动提交了 offset。⾃动提交的话会有⼀个问题,试想⼀下,当消费者刚拿到这个消息准备进⾏真正消费的时候,突然挂掉了,消息实际上并没有被消费,但是 offset 却被⾃动提交了。解决办法也⽐较粗暴,我们⼿动关闭⾃动提交 offset,每次在真正消费完消息之后之后再⾃⼰⼿动提交 offset 。 但是,细⼼的朋友⼀定会发现,这样会带来消息被重新消费的问题。⽐如你刚刚消费完消息之后,还没提交 offset,结果⾃⼰挂掉了,那么这个消息理论上就会被消费两次。Kafka 弄丢了消息我们知道 Kafka 为分区(Partition)引⼊了多副本(Replica)机制。分区(Partition)中的多个副本之间会有⼀个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进⾏同步。⽣产者和消费者只与 leader 副本交互。你可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。试想⼀种情况:假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出⼀个 leader ,但是 leader 的数据还有⼀些没有被 follower 副本的同步的话,就会造成消息丢失。设置 acks = all解决办法就是我们设置 acks = all。acks 是 Kafka ⽣产者(Producer) 很重要的⼀个参数。acks 的默认值即为1,代表我们的消息被leader副本接收之后就算被成功发送。当我们配置 acks = all 代表则所有副本都要接收到该消息之后该消息才算真正成功被发送。设置 replication.factor >= 3为了保证 leader 副本能有 follower 副本能同步消息,我们⼀般会为 topic 设置 replication.factor >= 3。这样就可以保证每个 分区(partition) ⾄少有 3 个副本。虽然造成了数据冗余,但是带来了数据的安全性。设置 min.insync.replicas > 1⼀般情况下我们还需要设置 min.insync.replicas> 1 ,这样配置代表消息⾄少要被写⼊到 2 个副本才算是被成功发送。min.insync.replicas 的默认值为 1 ,在实际⽣产中应尽量避免默认值1。但是,为了保证整个 Kafka 服务的⾼可⽤性,你需要确保 replication.factor > min.insync.replicas 。为什么呢?设想⼀下加⼊两者相等的话,只要是有⼀个副本挂掉,整个分区就⽆法正常⼯作了。这明显违反⾼可⽤性!⼀般推荐设置成 replication.factor = min.insync.replicas + 1。设置 unclean.leader.election.enable = falseKafka 0.11.0.0版本开始 unclean.leader.election.enable 参数的默认值由原来的true 改为false我们最开始也说了我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进⾏同步。多个 follower 副本之间的消息同步情况不⼀样,当我们配置了unclean.leader.election.enable = false 的话,当 leader 副本发⽣故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性。