kafka producer interceptor拦截器(五)

摘要:
当发送数据时,生产者将经过拦截器和序列化,最终到达相应的分区。˃Arg0){}#获取代理@Overridepublicvoid close()的配置信息{}#在生产者关闭时调用此方法@Overridepublic void onAcknowledgement{}}#将数据写入代理时,无论回调@OverridepublicProducerRecord<String是否成功,String˃onSend{}#阻止的信息生产者:publicclassProducerDemo{privatestaticfinalLoggerLOG=LoggerFactory.getLogger;publicstaticvoidmaintrowsInterruptedException,ExecutionException{//1。加载配置信息Propertiesprop=loadProperties()//2.创建producer KafkaProducerproducer=newKafkaProduction//3.发送内容StringsendContent=“hello_kafka”;IntStream.range.forEach;producer.close();//回调拦截器中的close方法}//配置文件publicstaticPropertiesloadProperties()的设置{Propertiesprop=newProperties();prop.put;prop.pput;prop.pit;prop.pot;prop.port;prop.ppt;//发送到所有ISR队列returnprop;}}拦截器1:publicclassProductInterceptorDemomplementsProducerInterceptor<String,String˃{privatestaticfinalLoggerLOG=LoggerFactory.getLogger;privatevolatielongsuccNum=0;privateVolatielongfallNum=0;@Overridepublicvoidconfigure(Map˂String,?

  producer在发送数据时,会经过拦截器和序列化,最后到达相应的分区。在经过拦截器时,我们可以对发送的数据做进步的处理。

  要正确的使用拦截器需要以下步骤:

    1.实现拦截器ProducerInterceptor的方法

    2.在producer的prop中配置 

      prop.put("interceptor.classes", "com.xxx.interceptor.xxxInterceptor")

     如果是拦截器链的话,在后面追加即可

      prop.put("interceptor.classes", ""com.xxx.interceptor.xxxInterceptor1,com.xxx.interceptor.xxxInterceptor2");

生产者的拦截器需要实现ProducerInterceptor接口中的方法来实现

  @Override
  public
void configure(Map<String, ?> arg0) {}  #获取broker的配置信息
@Override
public void close() {}       #在producer关闭时调用此方法         
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {} #数据在写到broker时,无论是否成功的回调 @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {}  #拦截的信息

生产者:

public class ProducerDemo {
    
    private static final Logger LOG = LoggerFactory.getLogger(ProducerDemo.class);
    
    public static void main(String[] args) throws InterruptedException, ExecutionException {
      
        //1.加载配置信息
        Properties prop = loadProperties();
        
        //2.创建生产者
        KafkaProducer<String,String> producer = new KafkaProducer<>(prop);
        
        //3.发送内容
        String sendContent = "hello_kafka";
        
        IntStream.range(0, 10).forEach(i ->{
            try {
                ProducerRecord<String,String> record = new ProducerRecord<>("test1",sendContent+"_"+i);
                Future<RecordMetadata> future = producer.send(record);
                RecordMetadata recordMetadata = future.get();
                LOG.info("发送的数据是 :{},offset:是{},partition是:{}",sendContent,recordMetadata.offset(),recordMetadata.partition());
            } catch (Exception e) {
                e.printStackTrace();
            }
            
        });
                
        producer.close();    //回调拦截器中的close方法
        
    }
        
    //配置文件的设置
    public static Properties loadProperties() {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "192.168.100.144:9092,192.168.100.145:9092,192.168.100.146:9092");
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("interceptor.classes", "com.zpb.interceptor.ProducerInterceptorDemo,com.zpb.interceptor.ProducerInterceptorDemo2");
        prop.put("acks", "all");    //发送到所有的ISR队列中
        return prop;
    }
}

拦截器一:

public class ProducerInterceptorDemo implements ProducerInterceptor<String, String>{
    
    private static final Logger LOG = LoggerFactory.getLogger(ProducerInterceptorDemo.class);

    private volatile long succNum = 0;
    
    private volatile long failNum = 0;
    
    @Override
    public void configure(Map<String, ?> arg0) {
        LOG.info("configure ==>"+arg0);
    }

    @Override
    public void close() {
        double succRatio = succNum/succNum+failNum;
        LOG.info("成功率是:"+succRatio*100);
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        
        if(null == e){
            succNum++;
        }else{
            failNum++;
        }
        
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        
        String prefixValue = producerRecord.value()+"prefix_1";
        
        return new ProducerRecord<String, String>(producerRecord.topic(),prefixValue);
    }
}

拦截器二:

public class ProducerInterceptorDemo2 implements ProducerInterceptor<String, String>{

    @Override
    public void configure(Map<String, ?> configs) {
        
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        String prefixValue = record.value()+"prefix_2";
        return new ProducerRecord<String, String>(record.topic(), prefixValue);
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        
    }

    @Override
    public void close() {
        
    }
}

免责声明:文章转载自《kafka producer interceptor拦截器(五)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇RPA-UiPath学习之启程js 获取地址第1个斜杠后的内容或地址前一部分下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Java学习:JDBC各类详解

 JDBC各个类详解 代码实现: //1.导入驱动jar包 //2.注册驱动 Class.forName("com.mysql.jdbc.Driver"); //3.获取数据库连对象 Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/db3","root",...

postman 发送post请求,参数为json

mvc 控制器接收post请求,参数为json PostMan设置 Headers设置key和Value值 key:Content-Type,Value:application/json。 参数设置: 选中Body并进行参数设置,选择raw,格式为json。就酱 控制器代码: //post 请求测试 [HttpPost] //请求方法,...

stream流 list转map

package com.mayikt.stream; import com.mayikt.entity.UserEntity; import java.util.ArrayList; import java.util.Map; import java.util.function.BiConsumer; import java.util.functio...

MyBatis映射文件(编写SQL语句;可有可无(无的时候,使用注解编程))

 一、映射文件  1.简单的增删改(需要commit)---查 MyBatis允许增删改直接定义以下类型返回值   Integer、Long、Boolean、void 我们需要手动提交数据。   sqlSessionFactory.openSession();===>需要手动提交   sqlSessionFactory.openSession(tr...

获取DataTable中一列的数据

#region 获取合同号DataRow[] arrRow = new DataRow[ds.Rows.Count];int w = 0;foreach (DataRow row in ds.Rows){arrRow[w] = row;w++;}string[] ary = Array.ConvertAll(arrRow, r => r["cdon...

C# Pkcs8 1024位 加密 解密 签名 解签

部分代码来至https://www.cnblogs.com/dj258/p/6049786.html usingSystem; usingSystem.Collections.Generic; usingSystem.Linq; usingSystem.Text; usingOrg.BouncyCastle.Asn1.Pkcs; usingOrg...