信息机

消息队列之kafka高级应用

发布时间:2023/8/6 16:58:35   
白癜风丸说明书 http://pf.39.net/bdfyy/zjdy/140223/4342728.html

1、幂等与事务

所谓的幂等,简单地说就是对接口的多次调用所产生的结果和调用一次是一致的。生产者在进行重试的时候有可能会重复写入消息,而使用Kafka的幕等性功能之后可以避免这种情况。

开启幕等性功能的方式很简单,只需要显式地将生产者客户端参数enable.idempotence设置为true即可(这个参数的默认值为false)。

为了实现生产者的幕等性,Kafka为此引入了producerid(PID)和序列号(sequencenumber)这两个概念。每个新的生产者实例在初始化的时候都会被分配一个PID,这个PID对用户而言是完全透明的。对于每个PID,消息发送到的每一个分区都有对应的序列号,这些序列号从0开始单调递增。生产者每发送一条消息就会将PID,分区对应的序列号的值加1。

broker端会在内存中为每一对PID,分区维护一个序列号。对于收到的每一条消息,只有当它的序列号的值(SN_new)比broker端中维护的对应的序列号的值(SN_old)大1(即SN_new=SN_old+1)时,broker才会接收它。如果SN_newSN_old+1,那么说明消息被重复写入,broker可以直接将其丢弃。如果SN_newSN_old+1,那么说明中间有数据尚未写入,出现了乱序,暗示可能有消息丢失,对应的生产者会抛出异常。

引入序列号来实现幕等也只是针对每一对PID,分区而言的,也就是说,Kafka的幂等只能保证单个生产者会话中单分区的幂等。

幂等性并不能跨多个分区运作,而事务可以弥补这个缺陷。事务可以保证对多个分区写入操作的原子性。操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功、部分失败的可能。

对流式应用而言,一个典型的应用模式为“consume-transform-produce”。在这种模式下消费和生产并存:应用程序从某个主题中消费消息,然后经过一系列转换后写入另一个主题,消费者可能在提交消费位移的过程中出现问题而导致重复消费,也有可能生产者重复生产消息。Kafka中的事务可以使应用程序将消费消息、生产消息、提交消费位移当作原子操作来处理,同时成功或失败,即使该生产或消费会跨多个分区。

为了实现事务,应用程序必须提供唯一的transactionalId,这个transactionalld通过客户端参数transactional.id来显式设置。事务要求生产者开启幕等特性,因此通过将transactional.id参数设置为非空从而开启事务特性的同时需要将enable.idempotence设置为true。

transactionalld与PID一一对应,两者之间所不同的是transactionalld由用户显式设置,而PID是由Kafka内部分配的。另外,为了保证新的生产者启动后具有相同transactionalld的旧生产者能够立即失效,每个生产者通过transactionalld获取PID的同时,还会获取一个单调递增的producerepoch。

从生产者的角度分析,通过事务Kafka可以保证跨生产者会话的消息幕等发送,以及跨生产者会话的事务恢复。前者表示具有相同transactionalld的新生产者实例被创建且工作的时候,旧的且拥有相同transactionalld的生产者实例将不再工作。后者指当某个生产者实例宕机后,新的生产者实例可以保证任何未完成的旧事务要么被提交,要么被中止,如此可以使新的生产者实例从一个正常的状态开始工作。

一个典型的事务消息发送的操作如下:

Propertiesproperties=newProperties();properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);properties.put(ProducerConfiq.TRANSACTIONAL_ID_CONFIG,transactionid);KafkaProducerString,Stringproducer=newKafkaProducer(properties);producer.initTransactions();producer.beginTransaction();try{//处理业务逻辑并创建ProducerRecordProducerRecordString,Stringrecordl=newProducerRecord(topic,"msgl");producer.send(recordl);ProducerRecordString,Stringrecord2=newProducerRecord(topic,"msg2");producer.send(record2);ProducerRecordString,Stringrecord3=newProducerRecord(topic,"msg3");producer.send(record3);//处理一些其他逻辑producer.

转载请注明:http://www.aideyishus.com/lkyy/5598.html

------分隔线----------------------------