菜单 学习猿地 - LMONKEY

VIP

开通学习猿地VIP

尊享10项VIP特权 持续新增

知识通关挑战

打卡带练!告别无效练习

接私单赚外块

VIP优先接,累计金额超百万

学习猿地私房课免费学

大厂实战课仅对VIP开放

你的一对一导师

每月可免费咨询大牛30次

领取更多软件工程师实用特权

入驻
267
0

kafka如何保证不重复消费又不丢失数据_Kafka写入的数据如何保证不丢失?

原创
05/13 14:22
阅读数 88253

 

我们暂且不考虑写磁盘的具体过程,先大致看看下面的图,这代表了 Kafka 的核心架构原理。

33670f6ec88e654b48fc29e21eced6b8.png

Kafka 分布式存储架构

那么现在问题来了,如果每天产生几十 TB 的数据,难道都写一台机器的磁盘上吗?这明显是不靠谱的啊!所以说,这里就得考虑数据的分布式存储了,我们结合 Kafka 的具体情况来说说。在 Kafka 里面,有一个核心的概念叫做“Topic”,这个 Topic 你就姑且认为是一个数据集合吧。举个例子,如果你现在有一份网站的用户行为数据要写入 Kafka,你可以搞一个 Topic 叫做“user_access_log_topic”,这里写入的都是用户行为数据。然后如果你要把电商网站的订单数据的增删改变更记录写 Kafka,那可以搞一个 Topic 叫做“order_tb_topic”,这里写入的都是订单表的变更记录。然后假如说咱们举个例子,就说这个用户行为 Topic 吧,里面如果每天写入几十 TB 的数据,你觉得都放一台机器上靠谱吗?明显不太靠谱,所以 Kafka 有一个概念叫做 Partition,就是把一个 Topic 数据集合拆分为多个数据分区,你可以认为是多个数据分片,每个 Partition 可以在不同的机器上,储存部分数据。这样,不就可以把一个超大的数据集合分布式存储在多台机器上了吗?大家看下图,一起来体会一下。

209a1d56fe70700526296c48c0fc1ceb.png

Kafka 高可用架构

但是这个时候,我们又会遇到一个问题,就是万一某台机器宕机了,这台机器上的那个 Partition 管理的数据不就丢失了吗?

所以说,我们还得做多副本冗余,每个 Partition 都可以搞一个副本放在别的机器上,这样某台机器宕机,只不过是 Partition 其中一个副本丢失。如果某个 Partition 有多副本的话,Kafka 会选举其中一个 Parititon 副本作为 Leader,然后其他的 Partition 副本是 Follower。只有 Leader Partition 是对外提供读写操作的,Follower Partition 就是从 Leader Partition 同步数据。一旦 Leader Partition 宕机了,就会选举其他的 Follower Partition 作为新的 Leader Partition 对外提供读写服务,这不就实现了高可用架构了?

大家看下面的图,看看这个过程:

825cda3ef24652e3495e08a06abd7bcd.png

Kafka 写入数据丢失问题

现在我们来看看,什么情况下 Kafka 中写入数据会丢失呢?其实也很简单,大家都知道写入数据都是往某个 Partition 的 Leader 写入的,然后那个 Partition 的 Follower 会从 Leader 同步数据。但是万一 1 条数据刚写入 Leader Partition,还没来得及同步给 Follower,此时 Leader Partiton 所在机器突然就宕机了呢?

大家看下图:

5d37867b90b6b54a3987c979ba1ef002.png

如上图,这个时候有一条数据是没同步到 Partition0 的 Follower 上去的,然后 Partition0 的 Leader 所在机器宕机了。此时就会选举 Partition0 的 Follower 作为新的 Leader 对外提供服务,然后用户是不是就读不到刚才写入的那条数据了?因为 Partition0 的 Follower 上是没有同步到最新的一条数据的。这个时候就会造成数据丢失的问题。

Kafka 的 ISR 机制是什么?

现在我们先留着这个问题不说具体怎么解决,先回过头来看一个 Kafka 的核心机制,就是 ISR 机制。这个机制简单来说,就是会自动给每个 Partition 维护一个 ISR 列表,这个列表里一定会有 Leader,然后还会包含跟 Leader 保持同步的 Follower。也就是说,只要 Leader 的某个 Follower 一直跟他保持数据同步,那么就会存在于 ISR 列表里。但是如果 Follower 因为自身发生一些问题,导致不能及时的从 Leader 同步数据过去,那么这个 Follower 就会被认为是“out-of-sync”,被从 ISR 列表里踢出去。所以大家先得明白这个 ISR 是什么,说白了,就是 Kafka 自动维护和监控哪些 Follower 及时的跟上了 Leader 的数据同步。

Kafka 写入的数据如何保证不丢失?

所以如果要让写入 Kafka 的数据不丢失,你需要保证如下几点:

每个 Partition 都至少得有 1 个 Follower 在 ISR 列表里,跟上了 Leader 的数据同步。

每次写入数据的时候,都要求至少写入 Partition Leader 成功,同时还有至少一个 ISR 里的 Follower 也写入成功,才算这个写入是成功了。

如果不满足上述两个条件,那就一直写入失败,让生产系统不停的尝试重试,直到满足上述两个条件,然后才能认为写入成功。

按照上述思路去配置相应的参数,才能保证写入 Kafka 的数据不会丢失。

好!现在咱们来分析一下上面几点要求。

第一条,必须要求至少一个 Follower 在 ISR 列表里。

那必须的啊,要是 Leader 没有 Follower 了,或者是 Follower 都没法及时同步 Leader 数据,那么这个事儿肯定就没法弄下去了。

第二条,每次写入数据的时候,要求 Leader 写入成功以外,至少一个 ISR 里的 Follower 也写成功。

大家看下面的图,这个要求就是保证说,每次写数据,必须是 Leader 和 Follower 都写成功了,才能算是写成功,保证一条数据必须有两个以上的副本。这个时候万一 Leader 宕机,就可以切换到那个 Follower 上去,那么 Follower 上是有刚写入的数据的,此时数据就不会丢失了。

98c19af929cbe1f5240f6afa1941d8c0.png

如上图所示,假如现在 Leader 没有 Follower 了,或者是刚写入 Leader,Leader 立马就宕机,还没来得及同步给 Follower。在这种情况下,写入就会失败,然后你就让生产者不停的重试,直到 Kafka 恢复正常满足上述条件,才能继续写入。这样就可以让写入 Kafka 的数据不丢失。

总结

最后总结一下,其实 Kafka 的数据丢失问题,涉及到方方面面。譬如生产端的缓存问题,包括消费端的问题,同时 Kafka 自己内部的底层算法和机制也可能导致数据丢失。但是平时写入数据遇到比较大的一个问题,就是 Leader 切换时可能导致数据丢失。所以本文仅仅是针对这个问题说了一下生产环境解决这个问题的方案。

 

 

 

【消息队列】kafka是如何保证消息不被重复消费的

 

一、kafka自带的消费机制

  kafka有个offset的概念,当每个消息被写进去后,都有一个offset,代表他的序号,然后consumer消费该数据之后,隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了。下次我要是重启,就会继续从上次消费到的offset来继续消费。

  但是当我们直接kill进程了,再重启。这会导致consumer有些消息处理了,但是没来得及提交offset。等重启之后,少数消息就会再次消费一次。

  其他MQ也会有这种重复消费的问题,那么针对这种问题,我们需要从业务角度,考虑它的幂等性。

 

二、通过保证消息队列消费的幂等性来保证

  举个例子,当消费一条消息时就往数据库插入一条数据。如何保证重复消费也插入一条数据呢?

  那么我们就需要从幂等性角度考虑了。幂等性,我通俗点说,就一个数据,或者一个请求,无论来多次,对应的数据都不会改变的,不能出错。

 

怎么保证消息队列消费的幂等性?

我们需要结合业务来思考,比如下面的例子:

  1.比如某个数据要写库,你先根据主键查一下,如果数据有了,就别插入了,update一下好吧

  2.比如你是写redis,那没问题了,反正每次都是set,天然幂等性

  3.对于消息,我们可以建个表(专门存储消息消费记录)

    生产者,发送消息前判断库中是否有记录(有记录说明已发送),没有记录,先入库,状态为待消费,然后发送消息并把主键id带上。

    消费者,接收消息,通过主键ID查询记录表,判断消息状态是否已消费。若没消费过,则处理消息,处理完后,更新消息记录的状态为已消费。

 

今天我们聊一个话题,这个话题大家可能在面试过程中,或者是工作当中经常遇到 :point_right: 如何保证 Kafka 消息不重复消费? 我们在做开发的时候为了程序的健壮性,在使用 Kafka 的时候一般都会设置重试的次数,但是因为网络的一些原因,设置了重试就有可能导致有些消息重复发送了(当然导致消息重复也有可能是其他原因),那么怎么解决消息重复这个问题呢?

关于这个问题,我这儿提供了如下三种解决方案,供大家参考。

解决方案

方案一 / 保存并查询

给每个消息都设置一个独一无二的 key,消费的时候把 key 记录下来,然后每次消费新的消息的时候都查询一下,看当前消息的这个 key 是否消费过,如果没有消费过才进行消费。(这种方式好想,但是其实实现起来一点也不简单)

方案二 / 利用幂等

幂等(Idempotence) 在数学上是这样定义的,如果一个函数 f(x) 满足:f(f(x)) = f(x),则函数 f(x) 满足幂等性。

这个概念被拓展到计算机领域,被用来描述一个操作、方法或者服务。一个幂等操作的特点是, 其任意多次执行所产生的影响均与一次执行的影响相同 。一个幂等的方法,使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。 所 以,对于幂等的方法,不用担心重复执行会对系统造 成任何改变。

我们举个例子:chestnut: 来说明一下。在不考虑并发的情况下,“将 X 老 师的 账户余额设置为 100 万元”,执行一次后对系统的影响是,X 老师的账户余额变成了 100 万元。 只要提供的参数 100万元不变,那即使再执行多少次,X 老师的账户余额始终都是 100万元,不会变化,这个操作就是一个幂 等的操作。

再举一个例子:chestnut: ,“将 X 老师的余额加 100 万元”,这个操作它就不是幂等的,每执行一次,账户余额就会增加 100 万元,执行多次和执行一次对系统的影响(也就是账户的余额)是不一样的。

所以,通过这两个例子,我们可以想到如果系统消费消息的业务逻辑具备幂等性,那就不用担心消息重复的问题了,因为同一条消息,消费一次和消费多次对系统的影响是完全一样的。 也就可以认为,消费多次等于消费一次。

那么,如何实现幂等操作呢? 最好的方式就是,从业务逻辑设计上入手,将消费的业务逻辑设计成具备幂等性的操作 。 但是,不是所有的业务都能设计成天然幂等的,这里就需要一些方法和技巧来实现幂等。

下面我们介绍一种常用的方法: 利用数据库的唯一约束实现幂等 。

例如,我们刚刚提到的那个不具备幂等特性的转账的例子: 将 X 老师的账户余额加 100 万元。 在这个例子中,我们可以通过改造业务逻辑,让它具备幂等性。

首先,我们可以限定,对于每个转账单每个账户只可以执行一次变更操作,在分布式系统中,这个限制实现的方法非常多,最简单的是我们在数据库中建一张转账流水表,这个表有三个字段: 转账单 ID、账户 ID 和变更金额,然后给转账单 ID 和账户 ID 这两个字段联合起来创建一个唯一约束,这样对于相同的转账单 ID 和账户 ID,表里至多只能存在一条记录。

这样,我们消费消息的逻辑可以变为: “在转账流水表中增加一条转账记录,然后再根据转账记录,异步操作更新用户余额即可。 ”在转账流水表增加一条转账记录这个操作中,由于我们在这个表中预先定义了“账户 ID 转账单 ID”的唯一约束,对于同一个转账单同一个账户只能插入一条记录,后续重复的插入操作都会失败,这样就实现了一个幂等的操作。

方案三 / 设置前置条件

为更新的数据设置前置条件另外一种实现幂等的思路是, 给数据变更设置一个前置条件 ,如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。

这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断 的数据,不满足前置条件,则不会重复执行更新数据操作。

比如,刚刚我们说过,“将 X 老师的账户的余额增加 100 万元”这个操作并不满足幂等性,我们可以把这个操作加上一个前置条件,变为: “如果X老师的账户当前的余额为 500万元,将余额加 100万元”,这个操作就具备了幂等性。

对应到消息队列中的使用时,可以在发消息时在消息体中带上当前的余额,在消费的时候进行判断数据库中,当前余额是否与消息中的余额相等,只有相等才执行变更操作。

但是,如果我们要更新的数据不是数值,或者我们要做一个比较复杂的更新操作怎么办? 用什么作为前置判断条件呢? 更加通用的方法是,给你的数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致 ,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等。

每天都会有更新看过的朋友可以点波关注,Java学习路线和优质资源评论或后台回复“Java”获取。

一、kafka自带的消费机制

  kafka有个offset的概念,当每个消息被写进去后,都有一个offset,代表他的序号,然后consumer消费该数据之后,隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了。下次我要是重启,就会继续从上次消费到的offset来继续消费。

  但是当我们直接kill进程了,再重启。这会导致consumer有些消息处理了,但是没来得及提交offset。等重启之后,少数消息就会再次消费一次。

  其他MQ也会有这种重复消费的问题,那么针对这种问题,我们需要从业务角度,考虑它的幂等性。

 

二、通过保证消息队列消费的幂等性来保证

  举个例子,当消费一条消息时就往数据库插入一条数据。如何保证重复消费也插入一条数据呢?

  那么我们就需要从幂等性角度考虑了。幂等性,我通俗点说,就一个数据,或者一个请求,无论来多次,对应的数据都不会改变的,不能出错。

 

怎么保证消息队列消费的幂等性?

我们需要结合业务来思考,比如下面的例子:

  1.比如某个数据要写库,你先根据主键查一下,如果数据有了,就别插入了,update一下好吧

  2.比如你是写redis,那没问题了,反正每次都是set,天然幂等性

  3.对于消息,我们可以建个表(专门存储消息消费记录)

    生产者,发送消息前判断库中是否有记录(有记录说明已发送),没有记录,先入库,状态为待消费,然后发送消息并把主键id带上。

    消费者,接收消息,通过主键ID查询记录表,判断消息状态是否已消费。若没消费过,则处理消息,处理完后,更新消息记录的状态为已消费。

 

今天我们聊一个话题,这个话题大家可能在面试过程中,或者是工作当中经常遇到 :point_right: 如何保证 Kafka 消息不重复消费? 我们在做开发的时候为了程序的健壮性,在使用 Kafka 的时候一般都会设置重试的次数,但是因为网络的一些原因,设置了重试就有可能导致有些消息重复发送了(当然导致消息重复也有可能是其他原因),那么怎么解决消息重复这个问题呢?

关于这个问题,我这儿提供了如下三种解决方案,供大家参考。

解决方案

方案一 / 保存并查询

给每个消息都设置一个独一无二的 key,消费的时候把 key 记录下来,然后每次消费新的消息的时候都查询一下,看当前消息的这个 key 是否消费过,如果没有消费过才进行消费。(这种方式好想,但是其实实现起来一点也不简单)

方案二 / 利用幂等

幂等(Idempotence) 在数学上是这样定义的,如果一个函数 f(x) 满足:f(f(x)) = f(x),则函数 f(x) 满足幂等性。

这个概念被拓展到计算机领域,被用来描述一个操作、方法或者服务。一个幂等操作的特点是, 其任意多次执行所产生的影响均与一次执行的影响相同 。一个幂等的方法,使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。 所 以,对于幂等的方法,不用担心重复执行会对系统造 成任何改变。

我们举个例子:chestnut: 来说明一下。在不考虑并发的情况下,“将 X 老 师的 账户余额设置为 100 万元”,执行一次后对系统的影响是,X 老师的账户余额变成了 100 万元。 只要提供的参数 100万元不变,那即使再执行多少次,X 老师的账户余额始终都是 100万元,不会变化,这个操作就是一个幂 等的操作。

再举一个例子:chestnut: ,“将 X 老师的余额加 100 万元”,这个操作它就不是幂等的,每执行一次,账户余额就会增加 100 万元,执行多次和执行一次对系统的影响(也就是账户的余额)是不一样的。

所以,通过这两个例子,我们可以想到如果系统消费消息的业务逻辑具备幂等性,那就不用担心消息重复的问题了,因为同一条消息,消费一次和消费多次对系统的影响是完全一样的。 也就可以认为,消费多次等于消费一次。

那么,如何实现幂等操作呢? 最好的方式就是,从业务逻辑设计上入手,将消费的业务逻辑设计成具备幂等性的操作 。 但是,不是所有的业务都能设计成天然幂等的,这里就需要一些方法和技巧来实现幂等。

下面我们介绍一种常用的方法: 利用数据库的唯一约束实现幂等 。

例如,我们刚刚提到的那个不具备幂等特性的转账的例子: 将 X 老师的账户余额加 100 万元。 在这个例子中,我们可以通过改造业务逻辑,让它具备幂等性。

首先,我们可以限定,对于每个转账单每个账户只可以执行一次变更操作,在分布式系统中,这个限制实现的方法非常多,最简单的是我们在数据库中建一张转账流水表,这个表有三个字段: 转账单 ID、账户 ID 和变更金额,然后给转账单 ID 和账户 ID 这两个字段联合起来创建一个唯一约束,这样对于相同的转账单 ID 和账户 ID,表里至多只能存在一条记录。

这样,我们消费消息的逻辑可以变为: “在转账流水表中增加一条转账记录,然后再根据转账记录,异步操作更新用户余额即可。 ”在转账流水表增加一条转账记录这个操作中,由于我们在这个表中预先定义了“账户 ID 转账单 ID”的唯一约束,对于同一个转账单同一个账户只能插入一条记录,后续重复的插入操作都会失败,这样就实现了一个幂等的操作。

方案三 / 设置前置条件

为更新的数据设置前置条件另外一种实现幂等的思路是, 给数据变更设置一个前置条件 ,如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。

这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断 的数据,不满足前置条件,则不会重复执行更新数据操作。

比如,刚刚我们说过,“将 X 老师的账户的余额增加 100 万元”这个操作并不满足幂等性,我们可以把这个操作加上一个前置条件,变为: “如果X老师的账户当前的余额为 500万元,将余额加 100万元”,这个操作就具备了幂等性。

对应到消息队列中的使用时,可以在发消息时在消息体中带上当前的余额,在消费的时候进行判断数据库中,当前余额是否与消息中的余额相等,只有相等才执行变更操作。

但是,如果我们要更新的数据不是数值,或者我们要做一个比较复杂的更新操作怎么办? 用什么作为前置判断条件呢? 更加通用的方法是,给你的数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致 ,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等。

每天都会有更新看过的朋友可以点波关注,Java学习路线和优质资源评论或后台回复“Java”获取。

发表评论

0/200
267 点赞
0 评论
收藏
为你推荐 换一批