kafka可插拔增强若何实现?

kafka可插拔增强若何实现?

导弹阻挡,精准防御。

靠山

阻挡器:在不修改应用程序营业逻辑的情况下,一组基于事宜的可插拔的逻辑处置链;
类比springMVC的阻挡器:

kafka可插拔增强若何实现?

kafka可插拔增强若何实现?

这些都是通过设置阻挡器,插入到应用程序中,实现可插拔的修改营业逻辑;

kafka在0.10.0.0版本中最先引入阻挡器。分为生产者阻挡器和消费者阻挡器,类似责任链的方式编排多个阻挡器为一个大阻挡器。

设置方式:设置参数


Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 阻挡器1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 阻挡器2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
……

kafka可插拔增强若何实现?

注重: 设置阻挡器需要制订阻挡器的全限命名,而且保证生产者或者消费者客户端能够准确加载到设置的阻挡器;

kafka可插拔增强若何实现?

通过阻挡器实现,强制让所有的生产者,消费者设置该阻挡器,实现新闻审计的功效; |

生产者阻挡器

阻挡器需要实现org.apache.kafka.clients.producer.ProducerInterceptor

一站式解决使用枚举的种种痛点

kafka可插拔增强若何实现?

消费者阻挡器

org.apache.kafka.clients.consumer.ConsumerInterceptor

kafka可插拔增强若何实现?

实操

实现端到端的性能监控:

处置历程:

kafka可插拔增强若何实现?

生产者代码:

public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {


    private Jedis jedis; // 省略Jedis初始化


    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        jedis.incr("totalSentMessage");
        return record;
    }


    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }


    @Override
    public void close() {
    }


    @Override
    public void configure(Map<java.lang.String, ?> configs) {
    }

消费者代码:

public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {


    private Jedis jedis; //省略Jedis初始化


    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        long lantency = 0L;
        for (ConsumerRecord<String, String> record : records) {
            lantency += (System.currentTimeMillis() - record.timestamp());
        }
        jedis.incrBy("totalLatency", lantency);
        long totalLatency = Long.parseLong(jedis.get("totalLatency"));
        long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
        jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
        return records;
    }


    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }


    @Override
    public void close() {
    }


    @Override
    public void configure(Map<String, ?> configs) 

设置到阻挡器到对应的生产者和消费者工具,即简朴的实现了平均新闻延时的端到端性能统计。

小结

类比AOP是Spring提供的焦点功效,即面向切面编程,可以把跟营业逻辑无关的平安,审计,性能相关功效放到切面增强中实现。
对Kafka举行一些可插拔的功效增强可以通过阻挡器实现。

本篇先容了kafka的阻挡器的使用方式,以及通过实例展示了详细的用法,希望对团队使用的kafka做一些增强功效的时刻可以行使这个点去扩展。

kafka可插拔增强若何实现?

原创不易,关注诚可贵,转发价更高!转载请注明出处,让我们互通有无,共同进步,迎接沟通交流。
我会连续分享Java软件编程知识和程序员生长职业之路,迎接关注,我整理了这些年编程学习的种种资源,关注民众号‘李福春连续输出’,发送’学习资料’分享给你!
kafka可插拔增强若何实现?

原创文章,作者:admin,如若转载,请注明出处:https://www.2lxm.com/archives/7882.html