java-kafka安装以及使用案例

摘要:
2181/kafka配置zookeeper以管理kafka的路径192.168.155.56:9092使用zookeeper-eKAFKA_LISTENERS=PLAINTEXT:[{“channel”:“metric”:
docker 安装kafka

1:kafka需要zookeeper管理,所以需要先安装zookeeper。 下载docker pull wurstmeister/zookeeper:latest版本
1 安装docker zookeeper
docker pull wurstmeister/zookeeper
2. 启动镜像生成容器

docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper

1、查询kafaka镜像
docker search kafka
2、拉取镜像
docker pull wurstmeister/kafka

4:启动kafka镜像生成容器

docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.155.56:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.155.56:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka

-e KAFKA_BROKER_ID=0  在kafka集群中,每个kafka都有一个BROKER_ID来区分自己

-e KAFKA_ZOOKEEPER_CONNECT=192.168.155.56:2181/kafka 配置zookeeper管理kafka的路径192.168.155.56:2181/kafka

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.155.56:9092  把kafka的地址端口注册给zookeeper

-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口

-v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间

5:验证kafka是否可以使用

进入容器

docker exec -it kafka /bin/sh

进入路径:/opt/kafka_2.11-2.0.0/bin下

运行kafka生产者发送消息

./kafka-console-producer.sh --broker-list localhost:9092 --topic sun

发送消息

{"datas":[{"channel":"","metric":"temperature","producer":"ijinus","sn":"IJA0101-00002245","time":"1543207156000","value":"80"}],"ver":"1.0"}

重新打开一个终端 运行kafka消费者接收消息

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sun --from-beginning

查看zookeeper容器内,可以看到kafka注册信息

docker exec -it zookeeper /bin/sh

运行zkCli.sh进入zookeeper客户端

./zkCli.sh
ls / 可以查看zookeeper根节点下都挂了那些目录
java-kafka安装以及使用案例第1张

kafka实战应用&文章自动审核

今日目标

熟悉kafka的封装技巧

熟悉阿里审核图片和文本内容审核

完成自媒体文章审核代码

完成自媒体端发布文章发送消息

完成admin端接收消息并自动审核

1 kafka封装

1.1 功能需求

消息对于现代软件项目来说,占有很重要的地位;同时市场上也发展处ActiveMq、RabbitMQ、Kafka、RocketMQ、Pulsar等众多优秀的框架;这些优秀的框架都由自身的特点和擅长的业务领域,在大数据领域中Kafka目前是使用较多的框架,Pulsar是一个后起之秀,目前处于一个快速发展的状态,有望能够成为下一代中间件的黑马。在本案例中我们选择使用Kafka作为内部消息通知的框架,以适应项目中大数据量的高吞吐、实时流计算等功能实现。

1.2 定义

1.2.1 约束定义

(1)Topic命名约束

Topic分为单类和混合类消息,不同类的消息命名约束如下:

  • 单类:heima.topic.[自定义名称].sigle
  • 混合类:heima.topic.[自定义名称].bus

1.3 实现设计

java-kafka安装以及使用案例第2张

  • KafkaProducerConfig自动配置Kafka消费者

  • KafkaConsumerConfig自动配置Kafka消费者

  • RetryErrorHandler实现消费者处理消息失败后重新发送到消息队列

  • KafkaMessage实现对发送的消息包装,提供重试次数、分类等信息

  • KafkaSender实现消息的统一发送入口功能

  • KafkaTopicConfig自动装载topic名称信息

  • KafkaListener提供自动注册消息消费监听接口类

  • KafkaListenerFactory提供启动时自动注册实现了KafkaListener的消息消费者

1.4 开发实现

1.4.1 配置文件

Kafka功能有独立的配置文件,放置在srcmain esourceskafka.properties,相关的值在maven_*.properties中配置。

#kafka config
kafka.hosts=localhost:9092
kafka.group=heima.${profiles.name}.${spring.application.name}

# 单消息通道,需要以sigle结尾
kafka.topic.admin-test=${kafka.topic.admin-test}

1.4.2 KafkaMessage

创建类com.heima.common.kafka.KafkaMessage。KafkaMessage是一个抽象类包含记录当前消息重发处理的次数retry、消息类型type、第一次创建消息的时间time信息。

/**
 * Kafka消息
 */
public abstract class KafkaMessage<T> {

    // 尝试次数
    @Getter
    int retry;
    // 生成时间
    @Getter
    long time = System.currentTimeMillis();
    // 消息类型
    String type;
    // 消息实体数据
    @Setter
    @Getter
    T data;
    public KafkaMessage(){}
    public KafkaMessage(T data){
        this.data = data;
    }

    public void addRetry(){
        this.retry++;
    }
    // 获取消息类型
    protected abstract String getType();
}

1.4.3 KafkaListener

创建类com.heima.common.kafka.KafkaListener。KafkaListener是一个接口,继承ConsumerAwareMessageListener(提供Consumer信息和自动提交offsets功能)接口。

  • topic方法用于返回监听器监听的topic名称

  • factory方法用于指定监听器容器的创建工厂

  • group方法用于指定监听器的groupid,目前没用

/**
 * 消息监听实现接口
 */
public interface KafkaListener<K,V> extends ConsumerAwareMessageListener<K,V> {

    String topic();

    default String factory(){
        return "defaultKafkaListenerContainerFactory";
    }

    default  String group(){ return "default";}

}

1.4.4 KafkaTopicConfig

创建类:com.heima.common.kafka.KafkaTopicConfig。KafkaTopicConfig用于自动装入kafka.properties文件中的kafka.topic.*信息

@Data
@Configuration
@ConfigurationProperties(prefix="kafka.topic")
@PropertySource("classpath:kafka.properties")
public class KafkaTopicConfig {
    String userLogin;
    String userLogout;
    String userRefresh;
    String userRegister;
    String hotArticle;
}

1.4.5 KafkaProducerConfig

创建类com.heima.common.kafka.KafkaProducerConfig。KafkaProducerConfig类是自动化配置类,定义了默认的Producer工厂,以及KafkaTemplate,并约束了消息的类型为String,大小不超过16M。

@Data
@Configuration
@EnableKafka
@ConfigurationProperties(prefix="kafka")
@PropertySource("classpath:kafka.properties")
public class KafkaProducerConfig {
    private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
    private String hosts;

    @Autowired(required = false)
    private ProducerListener<String, String> producerListener;

    @Bean
    public DefaultKafkaProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.getHosts());
        props.put(ProducerConfig.RETRIES_CONFIG, 10);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 5_000);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,3*MAX_MESSAGE_SIZE);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,3*MAX_MESSAGE_SIZE);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 256 * 1024);
        return new DefaultKafkaProducerFactory<>( props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory producerFactory) {
        KafkaTemplate<String, String> t = new KafkaTemplate<>(producerFactory);
        if (this.producerListener != null) {
            t.setProducerListener(this.producerListener);
        }
        return t;
    }
}

1.4.6 KafkaSender

创建类com.heima.common.kafka.KafkaSender。KafkaSender类是所有发送消息的方法统一管理器,其实现通过kafkaTemplate发送。

@Component
public class KafkaSender {

    Logger logger = LoggerFactory.getLogger(KafkaSender.class);

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;
    @Autowired
    ObjectMapper mapper;
    @Autowired
    KafkaTopicConfig kafkaTopicConfig;

    /**
     * 发送一个消息
     * @param topic
     * @param key
     * @param message
     */
    public void sendMesssage(String topic,String key,KafkaMessage<?> message){
        try {
            this.kafkaTemplate.send(topic, key, mapper.writeValueAsString(message));
        }catch (Exception e){
            logger.error("send message to [{}] error:",topic,e);
        }
    }

    /**
     * 发送一个不包装的消息
     * 只能是内部使用,拒绝业务上使用
     * @param topic
     * @param key
     * @param message
     */
    public void sendMesssageNoWrap(String topic,String key,String message){
        try {
            this.kafkaTemplate.send(topic, key, message);
        }catch (Exception e){
            logger.error("send message to [{}] error:",topic,e);
        }
    }
}

1.4.7 RetryErrorHandler

创建类com.heima.common.kafka.RetryErrorHandler。RetryErrorHandler类用于在消费者解析消息出现错误时,重新放回消息到队列中,并设置超过一个小时或者超过10次处理错误的消息丢弃,避免消息无限滚动;然后这类消息可以通过日志搜索查找出数据补偿重试。

@Component
public class RetryErrorHandler extends LoggingErrorHandler {
    private static Logger logger = LoggerFactory.getLogger(RetryErrorHandler.class);
    private static  final  int RETRY_COUNT = 10;
    private static  final  int TIME_OUT = 3_600_000;//1个小时超时

    @Autowired
    KafkaSender sender;
    @Autowired
    ObjectMapper mapper;

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
        super.handle(thrownException, record);
        if (record != null) {
            try{
                KafkaMessage<?> message = mapper.readValue((String)record.value(),KafkaMessage.class);
                message.addRetry();
                long time = System.currentTimeMillis()-message.getTime();
                if(message.getRetry()>RETRY_COUNT||time>TIME_OUT){
                    logger.info("超时或者尝试{}次后,抛弃消息[topic:{}][{}]",RETRY_COUNT,record.topic(),record.value());
                }else{
                    this.sender.sendMesssage(record.topic(),(String)record.key(),message);
                    logger.info("处理失败重新回滚到队列[retry:{}][topic:{}][key:{}]",message.getRetry(),record.topic(),record.key());
                }
            }catch (Exception e){
                sender.sendMesssageNoWrap(record.topic(),(String) record.key(),(String) record.value());
            }

        }
    }

}

1.4.8 KafkaProducerConfig

创建类com.heima.common.kafka.KafkaProducerConfig。KafkaProducerConfig主要配置消费者监听器,配置重试器、错误处理器等信息,同时设置group消费者。

@Data
@Configuration
@EnableKafka
@ConfigurationProperties(prefix="kafka")
@PropertySource("classpath:kafka.properties")
public class KafkaConsumerConfig {
    private static final int CONCURRENCY = 8;
    public final static Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerConfig.class);

    String hosts;
    String group;


    @Bean("defaultKafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(RetryErrorHandler retryErrorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setRetryTemplate(this.buildRetryTemplate());
        factory.setErrorHandler(retryErrorHandler);
        factory.getContainerProperties().setAckOnError(false);
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(buildComsumerConfig()));
        factory.setConcurrency(KafkaConsumerConfig.CONCURRENCY);
        return factory;
    }

    protected Map<String, Object> buildComsumerConfig() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.getHosts());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, this.group);
        propsMap.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,8 * 1024 * 1024);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 90_000);
        return propsMap;
    }

    private RetryTemplate buildRetryTemplate() {
        RetryTemplate t = new RetryTemplate();
        ExponentialBackOffPolicy backOff = new ExponentialRandomBackOffPolicy();
        backOff.setInitialInterval(1000L);
        t.setBackOffPolicy(backOff);
        t.setRetryPolicy(new SimpleRetryPolicy(5));
        t.registerListener(new RetryListenerSupport() {
            @Override
            public <T, E extends Throwable> void onError(RetryContext context,
                                                         RetryCallback<T, E> callback, Throwable throwable) {
                KafkaConsumerConfig.LOGGER.warn("Retry processing Kafka message "
                        + context.getRetryCount() + " times", throwable);
            }
        });
        return t;
    }

}

1.4.9 KafkaListenerFactory

创建类com.heima.common.kafka.KafkaListenerFactory。KafkaListenerFactory类实现在构造之后扫描实现了的KafkaListener接口的Bean,并自动注册成消费者监听器。

@Component
public class KafkaListenerFactory implements InitializingBean {

    Logger logger = LoggerFactory.getLogger(KafkaListenerFactory.class);

    @Autowired
    DefaultListableBeanFactory defaultListableBeanFactory;

    @Override
    public void afterPropertiesSet() {
        Map<String,KafkaListener> map = defaultListableBeanFactory.getBeansOfType(KafkaListener.class);
        for (String key : map.keySet()) {
            KafkaListener k = map.get(key);
            AbstractKafkaListenerContainerFactory factory = (AbstractKafkaListenerContainerFactory)defaultListableBeanFactory.getBean(k.factory());
            AbstractMessageListenerContainer container = factory.createContainer(k.topic());
            container.setupMessageListener(k);
            String beanName = k.getClass().getSimpleName()+"AutoListener" ;
            defaultListableBeanFactory.registerSingleton(beanName,container);
            logger.info("add auto listener [{}]",beanName);
        }
    }
}

1.4.10 MessagesRegister

/**
 * 扫描所有的kafkamessage类
 */
@Log4j2
@Component
public class MessagesRegister implements InitializingBean {

    Map<String,Class> messages = Maps.newConcurrentMap();

    @Override
    public void afterPropertiesSet() throws Exception {
        Reflections reflections = new Reflections("com.heima");
        Set<Class<? extends KafkaMessage>> ms = reflections.getSubTypesOf(KafkaMessage.class);
        if(ms!=null){
            ms.forEach(cla->{
                try {
                    Constructor<?>[] cs = cla.getConstructors();
                    KafkaMessage mess = null;
                    if (cs != null && cs.length > 0) {
                        Class[] temp = cs[0].getParameterTypes();
                        Object[] parms = new Object[temp.length];
                        for (int i = 0; i < temp.length; i++) {
                            if(temp[i].isPrimitive()){
                                if(temp[i].getName().contains("boolean")){
                                    parms[i]=false;
                                }else {
                                    parms[i] = 0;
                                }
                            }else{
                                parms[i]=null;
                            }
                        }
                        mess = (KafkaMessage) cs[0].newInstance(parms);
                    } else {
                        mess = (KafkaMessage) cla.newInstance();
                    }
                    String type = mess.getType();
                    messages.put(type,cla);
                }catch (Exception e){
                    System.out.println(cla+"====================:"+cla.getConstructors()[0].getParameterCount());
                    e.printStackTrace();
                }
            });
        }
        log.info("=================================================");
        log.info("scan kafka message resultt[{}]",messages);
        log.info("=================================================");
    }

    /**
     * 通过消息的类型名称,查找对应的class定义
     * @param type
     * @return
     */
    public Class<? extends KafkaMessage> findClassByType(String type){
        return this.messages.get(type);
    }

}

1.5 消息生产者

@SpringBootTest
@RunWith(SpringRunner.class)
public class KafkaTest {

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    @Test
    public void test(){
        try {
            this.kafkaTemplate.send("topic.test", "123key","123value");
            System.out.println("=================================");
            Thread.sleep(500000);// 休眠等待消费者接收消息
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

1.6 消息消费者

@Component
public class TestKafkaListener implements KafkaListener<String,String> {
    @Override
    public String topic () {
        return "topic.test";
    }
    @Override
    public void onMessage (ConsumerRecord< String, String > data, Consumer< ?, ?> consumer){
        System.out.println("===========receive test message:" + data);
    }
}

免责声明:文章转载自《java-kafka安装以及使用案例》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇【转】oracle之错误处理Android JNI和NDK学习(08)JNI实例一 传递基本类型数据下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Winform异步解决窗体耗时操作(Action专门用于无返回值,Func专门用于有返回值)

http://blog.csdn.net/config_man/article/details/25578767 [csharp]view plaincopy #region 调用timer控件实时查询开关机时间   private void timer1_Tick(object sender, EventArgs e)   {       str...

写一个简易的java项目(五) websocket 弹幕 -1

目的:websocket做弹幕  用到的技术:springboot +websocket +uniapp (只写了后台) 菜鸟:https://www.runoob.com/html/html5-websocket.html 这是最后结果的展示: WebSocket是什么?为什么用它? 全双工通信的协议。允许服务端主动向客户端推送数据。 后台代码: 第一...

在Eclipse中调试Tomcat

Tomcat 1、下载源码,并解压文件,例如:E:\Programe\javaWorkShop\OA\Tomcat6 2、新建项目Java Project 3、从File System中导入 4、配置source 5、添加依赖包,我都是从eclipse插件中找到 主要包如下: 添加依赖后,基本上错误都消失 6、copy 文件:build.pro...

Kafka消费组(consumer group)

一、 误区澄清与概念明确 1 Kafka的版本 很多人在Kafka中国社区(替群主做个宣传,QQ号:162272557)提问时的开头经常是这样的:“我使用的kafka版本是2.10/2.11, 现在碰到一个奇怪的问题。。。。” 无意冒犯,但这里的2.10/2.11不是kafka的版本,而是编译kafka的Scala版本。Kafka的server端代码是由S...

微信公众账号支付商户接入指南

公众号支付商户接入指南----------------------------- 1 公众账号相关事宜说明1.1 申请微信支付的公众账号需符合的条件申请微信支付功能的公众号必须具备 2 个条件� 公众号为服务号� 公众号必须经过微信认证(如何转换服务号和申请微信认证在后面详述)1.2 申请公众账号如果商户还没有公众帐号,请按照以下流程指导申请微信公众账号。...

MetadataCache更新

MetadataCache什么时候更新 updateCache方法用来更新缓存的。 发起线程 controller-event-thread controller选举的时候 CLASS_NAME METHOD_NAME LINE_NUM kafka/controller/KafkaController sendUpdateMetadata...