kafka实现无新闻丢失与正确一次语义(exactly once)处置

在许多的流处置框架的先容中,都市说kafka是一个可靠的数据源,而且推荐使用Kafka看成数据源来举行使用。这是由于与其他新闻引擎系统相比,kafka提供了可靠的数据保留及备份机制。而且通过消费者位移这一观点,可以让消费者在因某些缘故原由宕机而重启后,可以轻易得回到宕机前的位置。

但实在kafka的可靠性也只能说是相对的,在整条数据链条中,总有可以让数据泛起丢失的情形,今天就来讨论若何制止kafka数据丢失,以及实现正确一致处置的语义。

kafka无新闻丢失处置

在讨论若何实现kafka无新闻丢失的时刻,首先要先清晰大部分情形下新闻丢失是在什么情形下发生的。为什么是大部分,由于总有一些异常特殊的情形会被人忽略,而我们只需要关注普遍的情形就足够了。接下来我们来讨论若何较为普遍的数据丢失情形。

1.1 生产者丢失

前面先容Kafka分区和副本的时刻,有提到过一个producer客户端有一个acks的设置,这个设置为0的时刻,producer是发送之后不管的,这个时刻就很有可能由于网络等缘故原由造成数据丢失,以是应该只管制止。然则将ack设置为1就没问题了吗,那也不一定,由于有可能在leader副本吸收到数据,但还没同步给其他副本的时刻就挂掉了,这时刻数据也是丢失了。而且这种时刻是客户端以为新闻发送乐成,但kafka丢失了数据。

要到达最严酷的无新闻丢失设置,应该是要将acks的参数设置为-1(也就是all),而且将min.insync.replicas设置项调高到大于1,这部分内容在上一篇副本机制有先容详细剖析kafka之kafka分区和副本

同时还需要使用带有回调的producer api,来发送数据。注重这里讨论的都是异步发送新闻,同步发送不在讨论局限。

public class send{
    ......
    public static void main(){
        ...
        /*
        *  第一个参数是 ProducerRecord 类型的工具,封装了目的 Topic,新闻的 kv
        *  第二个参数是一个 CallBack 工具,当生产者吸收到 Kafka 发来的 ACK 确认新闻的时刻,
        *  会挪用此 CallBack 工具的 onCompletion() 方式,实现回调功效
        */
         producer.send(new ProducerRecord<>(topic, messageNo, messageStr),
                        new DemoCallBack(startTime, messageNo, messageStr));
        ...
    }
    ......
}

class DemoCallBack implements Callback {
    /* 最先发送新闻的时间戳 */
    private final long startTime;
    private final int key;
    private final String message;

    public DemoCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }

    /**
     * 生产者乐成发送新闻,收到 Kafka 服务端发来的 ACK 确认新闻后,会挪用此回调函数
     * @param metadata 生产者发送的新闻的元数据,若是发送过程中泛起异常,此参数为 null
     * @param exception 发送过程中泛起的异常,若是发送乐成为 null
     */
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.printf("message: (%d, %s) send to partition %d, offset: %d, in %d\n",
                    key, message, metadata.partition(), metadata.offset(), elapsedTime);
        } else {
            exception.printStackTrace();
        }
    }
}

更详细的代码可以参考这里:Kafka生产者剖析——KafkaProducer

我们之前提到过,producer发送到kafka broker的时刻,是有多种可能会失败的,而回调函数能准确告诉你是否确认发送乐成,固然这依托于acks和min.insync.replicas的设置。而当数据发送丢失的时刻,就可以举行手动重发或其他操作,从而确保生产者发送乐成。

1.2 kafka内部丢失

有些时刻,kafka内部由于一些不大好的设置,可能会泛起一些极为隐藏的数据丢失情形,那么我们划分讨论下大致都有哪几种情形。

首先是replication.factor设置参数,这个设置决议了副本的数目,默认是1。注重这个参数不能超过broker的数目。说这个参数实在是由于若是使用默认的1,或者不在建立topic的时刻指定副本数目(也就是副本数为1),那么当一台机械泛起磁盘损坏等情形,那么数据也就从kafka内里丢失了。以是replication.factor这个参数最好是设置大于1,好比说3

接下来要说的照样和副本相关的,也是上一篇副本中提到的unclean.leader.election.enable 参数,这个参数是在主副本挂掉,然后在ISR聚集中没有副本可以成为leader的时刻,要不要让进度比较慢的副本成为leader的。不用多说,让进度比较慢的副本成为leader,肯定是要丢数据的。虽然可能会提高一些可用性,但若是你的营业场景丢失数据加倍不能忍受,那照样将unclean.leader.election.enable设置为false吧

1.3 消费者丢失

消费者丢失的情形,实在跟消费者位移处置欠妥有关。消费者位移提交有一个参数,enable.auto.commit,默认是true,决议是否要让消费者自动提交位移。若是开启,那么consumer每次都是先提交位移,再举行消费,好比先跟broker说这5个数据我消费好了,然后才最先逐步消费这5个数据。

这样处置的话,利益是简朴,坏处就是漏消费数据,好比你说要消费5个数据,消费了2个自己就挂了。那下次该consumer重启后,在broker的纪录中这个consumer是已经消费了5个的。

以是最好的做法就是将enable.auto.commit设置为false,改为手动提交位移,在每次消费完之后再手动提交位移信息。固然这样又有可能会重复消费数据,究竟exactly once处置一直是一个问题呀(/摊手)。遗憾的是kafka现在没有保证consumer幂等消费的措施,若是确实需要保证consumer的幂等,可以对每条新闻维持一个全局的id,每次消费举行去重,固然花费这么多的资源来实现exactly once的消费到底值不值,那就得看详细营业了。

1.4 无新闻丢失小结

那么到这里先来总结下无新闻丢失的主要设置吧:

  • producer的acks设置位-1,同时min.insync.replicas设置大于1。而且使用带有回调的producer api发生新闻。
  • 默认副本数replication.factor设置为大于1,或者建立topic的时刻指定大于1的副本数。
  • unclean.leader.election.enable 设置为false,防止定期副本leader重选举
  • 消费者端,自动提交位移enable.auto.commit设置为false。在消费完后手动提交位移。

那么接下来就来说说kafka实现正确一次(exactly once)处置的方式吧。

实现正确一次(exactly once)处置

在分布式环境下,要实现新闻一致与正确一次(exactly once)语义处置是很难的。正确一次处置意味着一个新闻只处置一次,造成一次的效果,不能多也不能少。

设计模式 –面试高频之享元模式

那么kafka若何能够实现这样的效果呢?在先容之前,我们先来先容其他两个语义,至多一次(at most once)和至少一次(at least once)。

最多一次和至少一次

最多一次就是保证一条新闻只发送一次,这个实在最简朴,异步发送一次然后不管就可以,瑕玷是容易丢数据,以是一样平常不接纳。

至少一次语义是kafka默认提供的语义,它保证每条新闻都能至少吸收并处置一次,瑕玷是可能有重复数据。

前面有先容过acks机制,当设置producer客户端的acks是1的时刻,broker吸收到新闻就会跟producer确认。但producer发送一条新闻后,可能由于网络缘故原由新闻超时未达,这时刻producer客户端会选择重发,broker回应吸收到新闻,但很可能最最先发送的新闻延迟到达,就会造成新闻重复吸收。

那么针对这些情形,要若何实现正确一次处置的语义呢?

幂等的producer

要先容幂等的producer之前,得先领会一下幂等这个词是什么意思。幂等这个词最早起源于函数式编程,意思是一个函数无论执行多少次都市返回一样的效果。好比说让一个数加1就不是幂等的,而让一个数取整就是幂等的。由于这个特征以是幂等的函数适用于并发的场景下。

但幂等在分布式系统中寄义又做了进一步的延申,好比在kafka中,幂等性意味着一个新闻无论重复多少次,都市被看成一个新闻来持久化处置。

kafka的producer默认是支持最少一次语义,也就是说不是幂等的,这样在一些好比支付等要求正确数据的场景会泛起问题,在0.11.0后,kafka提供了让producer支持幂等的设置操作。即:

props.put(“enable.idempotence”, ture)

在建立producer客户端的时刻,添加这一行设置,producer就酿成幂等的了。注重开启幂等性的时刻,acks就自动是“all”了,若是这时刻手动将ackss设置为0,那么会报错。

而底层实现实在也很简朴,就是对每条新闻天生一个id值,broker会凭据这个id值举行去重,从而实现幂等,这样一来就能够实现正确一次的语义了。

然则!幂等的producery也并非万能。有两个主要是缺陷:

  • 幂等性的producer仅做到单分区上的幂等性,即单分区新闻不重复,多分区无法保证幂等性。
  • 只能保持单会话的幂等性,无法实现跨会话的幂等性,也就是说若是producer挂掉再重启,无法保证两个会话间的幂等(新会话可能会重发)。由于broker端无法获取之前的状态信息,以是无法实现跨会话的幂等。

事务的producer

当遇到上述幂等性的缺陷无法解决的时刻,可以思量使用事务了。事务可以支持多分区的数据完整性,原子性。而且支持跨会话的exactly once处置语义,也就是说若是producer宕机重启,依旧能保证数据只处置一次。

开启事务也很简朴,首先需要开启幂等性,即设置enable.idempotence为true。然后对producer发送代码做一些小小的修改。

//初始化事务
producer.initTransactions();
try {
    //开启一个事务
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    //提交
    producer.commitTransaction();
} catch (KafkaException e) {
    //泛起异常的时刻,终止事务
    producer.abortTransaction();
}

但无论开启幂等照样事务的特征,都市对性能有一定影响,这是一定的。以是kafka默认也并没有开启这两个特征,而是交由开发者凭据自身营业特点举行处置。

以上~

推荐阅读:
分布式系统一致性问题与Raft算法(上)
Scala函数式编程(五) 函数式的错误处置
大数据存储的进化史 –从 RAID 到 Hadoop Hdfs

原创文章,作者:admin,如若转载,请注明出处:https://www.2lxm.com/archives/1319.html