kafka时间轮简易实现(二)

摘要:
概述上一篇主要介绍了kafka时间轮源码和原理,这篇主要介绍一下kafka时间轮简单实现和使用kafka时间轮。如果要实现一个时间轮,就要了解他的数据结构和运行原理,上一篇随笔介绍了不同种类的数据结构kafka时间轮的原理(一)。时间轮的启动和停止肯定也是一个单独的线程来保证时间轮中的任务正确执行和取消等。powerOf2){11thrownewRuntimeException;12}13this.bufferSize=bufferSize;14this.ringBuffer=newObject[bufferSize];15}时间轮的启动和停止下面就是初始化,时间轮初始化只需要一个线程实现就行。
概述

上一篇主要介绍了kafka时间轮源码和原理,这篇主要介绍一下kafka时间轮简单实现和使用kafka时间轮。如果要实现一个时间轮,就要了解他的数据结构和运行原理,上一篇随笔介绍了不同种类的数据结构kafka时间轮的原理(一)。大体上也就是需要使用数组或者链表组成一个环形的结构,数组或者链表的节点保存任务,这个任务肯定需要使用抽象一些的线程类来实现它以便于后期的任务执行等。时间轮的启动和停止肯定也是一个单独的线程来保证时间轮中的任务正确执行和取消等。现在详细说一下实现过程,其实知道原理实现起来就很方便了。

时间轮的功能

本文说明的时间轮具有的功能:

1,可以添加指定时间的延时任务,每个任务都是task抽象的父类,每个任务都放在环形object类型数组中,在这个任务中可以实现自己的业务逻辑。

2,有一个触发任务,实际上是一个线程,主要作用是相当于按时遍历时间轮每个节点,查看是否到时间执行,就相当于表针运行状态触发执行任务,这里就是TriggerJob。

3,停止运行(包含强制停止和所有任务完成后停止)。

4,查看待执行任务数量。

时间轮的数据结构

本文时间轮是一个object数组,每个数组元素这里定义是个set集合,set集合可以有多个任务,时间轮的大小规则是2的指数,这样设计的目的是可以通过左移达到取模的目的,这里使用线程池ExecutorService,因为多个任务肯定从线程池快速申请,见代码:

1     //时间轮默认大小2的五次方
2     private static final int STATIC_RING_SIZE=64;
3     //数组作为时间轮
4     privateObject[] ringBuffer;
5     private intbufferSize;
6     //线程池
7     privateExecutorService executorService;
8     
9     //时间轮中总任务个数
10     private volatile int size =0;
11     
12     //主要确定是否继续执行触发轮询时间轮的任务,相当关闭轮询时间轮的任务
13     private volatile boolean stop=false;
14     //使用原子类,初始化只需要一个线程执行,确定只一次初始化启动。
15     private volatile AtomicBoolean start= new AtomicBoolean(false);
16     //触发任务中的表针,tick 顾名思义
17     private AtomicInteger tick = newAtomicInteger();
18     
19     //条件锁,用于stop
20     private Lock lock = newReentrantLock();
21     private Condition condition =lock.newCondition();
22     
23     //每一个任务有一个任务id
24     private AtomicInteger taskId= newAtomicInteger();
25     
26     //用于按照taskId查找任务取消
27     private Map<Integer,Task>  taskMap= new HashMap<Integer,Task>();

时间轮的构造函数,有2个,一个默认大小。一个用户自定义大小:

1     publicRhettBufferWheel(ExecutorService  executorService){
2         this.executorService=executorService;
3         this.bufferSize=STATIC_RING_SIZE;
4         this.ringBuffer= newObject[bufferSize];
5 }
6     
7     public RhettBufferWheel(ExecutorService executorService, intbufferSize) {
8         this(executorService);
9         //判断bufferSize是否是2的指数
10         if(!powerOf2(bufferSize)){
11             throw new RuntimeException("bufferSize=[" + bufferSize + "] must be a power of 2");
12 } 
13         this.bufferSize =bufferSize;
14         this.ringBuffer = newObject[bufferSize];
15     }
时间轮的启动和停止

下面就是初始化,时间轮初始化只需要一个线程实现就行。

1     public voidstart() {
2         if (!start.get()) {
3             if (start.compareAndSet(start.get(), true)) {
4                 logger.info("delay task is starting");
5                 Thread job = new Thread(newTriggerJob());
6                 job.setName("consumer RingBuffer thread");
7 job.start();
8                 start.set(true);
9 }
10 
11 }
12     }

有启动就有停止,停止有2中情况,一是强制停止所有任务,二是使用条件队列锁挂起所有任务,关闭addTask,直到任务执行完毕后被唤醒。

1     public void stop(booleanforce) {
2         if(force) {
3             logger.info("delay task is forced stop");
4             stop = true;
5 executorService.shutdownNow();
6         } else{
7             logger.info("delay task is stopping");
8             if (taskSize() > 0) {
9                 try{
10 lock.lock();
11 condition.await();
12                     stop = true;
13                 } catch(InterruptedException e) {
14                     logger.error("InterruptedException", e);
15                 } finally{
16 lock.unlock();
17 }
18 }
19 executorService.shutdown();
20 }
21 
22     }
时间轮的任务模块实现

上一节说明了时间轮的整体思路和实现。现在讲解时间轮的任务管理,先说明抽象任务类,这个类只是抽象了任务最基本的属性,任务的在时间轮的具体位置,以及时间轮的延时时间:

1 public abstract static class Task extendsThread{
2         //时间轮的索引位置
3         private intindex;
4         //时间轮的圈数
5         private intcycleNum;
6         //时间轮延时时间,到期执行时间
7         private intkey;
8         
9 @Override
10         public voidrun(){
11             
12 }
13         public intgetIndex() {
14             returnindex;
15 }
16         public void setIndex(intindex) {
17             this.index =index;
18 }
19         public intgetCycleNum() {
20             returncycleNum;
21 }
22         public void setCycleNum(intcycleNum) {
23             this.cycleNum =cycleNum;
24 }
25         public intgetKey() {
26             returnkey;
27 }
28         public void setKey(intkey) {
29             this.key =key;
30 }
31         
32         
33     }

知道了任务实现,下面我再和你们说一下任务的增删改查,任务的增加,增加是一个原子操作,所以这里实现了锁ReentrantLock。

1     public intaddTask(Task task){
2         int key=task.getKey();
3         intid;
4         try{
5 lock.lock();
6             //通过key到期时间计算出index位置也就是数组位置
7             int index =mod(key, bufferSize);
8             logger.info("task's key = {},task's index ={}",key,index);
9 task.setIndex(index);
10             //查看这个数组集合之前是否有数据,因为每个数组对应一个set集合所以这里要区分
11             Set<Task> tasks =get(index);
12 
13             if (tasks != null) {
14                 int cycleNum =cycleNum(key, bufferSize);
15 task.setCycleNum(cycleNum);
16 tasks.add(task);
17             } else{
18                 int cycleNum =cycleNum(key, bufferSize);
19 task.setIndex(index);
20 task.setCycleNum(cycleNum);
21                 //如果需要重新建立set集合就要重新增加task外,还要set对应正确的数组位置。
22                 Set<Task> sets = new HashSet<>();
23 sets.add(task);
24 put(key, sets);
25 }
26             //每个任务的唯一id,统一放到hashmap中,为了查找方便,指定取消任务
27             id =taskId.incrementAndGet();
28 taskMap.put(id, task);
29             size++;
30         } finally{
31 lock.unlock();
32 }
33         //启动时间轮
34 start();
35 
36         returnid;
37     }

增加有一个地方需要知道一下,就是按照与运算取模。

1     private int mod(int target, intmod) {
2         //equals target % mod
3         target = target +tick.get();
4         return target & (mod - 1);
5 }
6 
7     private int cycleNum(int target, intmod) {
8         //equals target/mod
9         return target >> Integer.bitCount(mod - 1);
10     }

首先是根据延时时间 (key) 计算出所在的位置,其实就和HashMap一样的取模运算,只不过这里使用了位运算替代了取模,同时效率会高上不少。这样也解释了为什么数组长度一定得是2∧n。其中的cycleNum()自然是用于计算该任务所处的圈数,也是考虑到效率问题,使用位运算替代了除法

任务的取消,任务取消就是用到了hashmap,按照key找到task,然后取消,取消相当于在集合中删除任务,也是需要加锁的,

1     /**
2 * Cancel task by taskId
3 * @paramid unique id through {@link#addTask(Task)}
4 * @return
5      */
6     public boolean cancel(intid) {
7 
8         boolean flag = false;
9         Set<Task> tempTask = new HashSet<>();
10 
11         try{
12 lock.lock();
13             Task task =taskMap.get(id);
14             if (task == null) {
15                 return false;
16 }
17 
18             Set<Task> tasks =get(task.getIndex());
19             for(Task tk : tasks) {
20                 if (tk.getKey() == task.getKey() && tk.getCycleNum() ==task.getCycleNum()) {
21                     size--;
22                     flag = true;
23                 } else{
24 tempTask.add(tk);
25 }
26 
27 }
28             //update origin data
29             ringBuffer[task.getIndex()] =tempTask;
30         } finally{
31 lock.unlock();
32 }
33 
34         returnflag;
35     }
时间轮的指针触发任务实现

触发任务是一个单独的线程,这个是时间轮的指针,是时间轮的核心。

1     private  class TriggerJob implementsRunnable{
2 @Override
3         public voidrun(){
4             int index=0;
5             while(!stop){
6                 try{
7                     //取出指定位置的集合,
8                     Set<Task> tasks=remove(index);
9                     for(Task task:tasks){
10                         //这个就是真正执行定时任务了
11 executorService.submit(task);
12 }
13                     //一个轮询
14                     if(++index>bufferSize-1){
15                         index=0;
16 }
17                     //Total tick number of records
18 tick.incrementAndGet();
19                     TimeUnit.SECONDS.sleep(1);
20                 }catch(Exception e){
21                     logger.error("Exception", e);
22 }
23 }
24             logger.info("delay task is stopped");
25 }
26     }

这里的remove方法需要注意,这个就是按照索引取出指定数组位置的set集合。

1     private Set<Task> remove(intkey) {
2         Set<Task> tempTask = new HashSet<>();
3         Set<Task> result = new HashSet<>();
4 
5         Set<Task> tasks = (Set<Task>) ringBuffer[key];
6         if (tasks == null) {
7             returnresult;
8 }
9 
10         for(Task task : tasks) {
11             if (task.getCycleNum() == 0) {
12 result.add(task);
13 
14 size2Notify();
15             } else{
16                 //decrement 1 cycle number and update origin data
17                 task.setCycleNum(task.getCycleNum() - 1);
18 tempTask.add(task);
19 }
20 }
21 
22         //update origin data
23         ringBuffer[key] =tempTask;
24 
25         returnresult;
26     }

其中的size2Notify()倒是值得说一下,他是用于在停止任务时,主线程等待所有延时任务执行完毕的唤醒条件。这类用法几乎是所有线程间通信的常规套路,值得收入技能包。

1     private voidsize2Notify() {
2         try{
3 lock.lock();
4             size--;
5             if (size == 0) {
6 condition.signal();
7 }
8         } finally{
9 lock.unlock();
10 }
11     }

简单说来就是,上文的stop时间轮中条件队列锁阻塞,这里就是唤醒所有的线程,真正的stop,因为没有任务了。

总结

看了kafka的时间轮,高大尚无比,这次也是按照晚上不错的时间轮自己实现,觉得对自己的代码和开发思路多有补益,感谢网络大神的帮助:

github源码:https://github.com/Rhett-wang-888/Java-Algorithm/blob/master/src/main/java/util/RhettBufferWheel.java

https://crossoverjie.top/2019/09/27/algorithm/time%20wheel/#more

免责声明:文章转载自《kafka时间轮简易实现(二)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇go安装goctlRZ、NRZ、NRZI、曼彻斯特编码下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

kafka学习指南(总结版)

版本介绍   目前最新版本为2.3(20190808更新)。   从使用上来看,以0.9为分界线,0.9开始不再区分高级(相当于mysql binlog的GTID,只需要跟topic打交道,服务器自动管理偏移量和负载均衡)/低级消费者API(相当于mysql binlog的文件+position,直接和分区以及偏移量打交道)。   从兼容性上来看,以0.8...

RocketMQ学习分享

消息队列的流派   什么是 MQ Message Queue(MQ),消息队列中间件。很多人都说:MQ 通过将消息的发送和接收分离来实现应用程序的异步和解偶,这个给人的直觉是——MQ 是异步的,用来解耦的,但是这个只是 MQ 的效果而不是目的。MQ 真正的目的是为了通讯,屏蔽底层复杂的通讯协议,定义了一套应用层的、更加简单的通讯协议。一个分布式系统中两个模...

kafka性能测试

一.硬件配置 3台服务器配置如下: CPU: 2物理CPU,12核/CPU ,  48 processor Intel(R) Xeon(R) Silver 4116 CPU @ 2.10GHz 内存: 128GB 硬盘: 480GB*1 SSD盘(OS)+6TB*7 SAS盘 Broker节点数: 3个 网络:10GE 二.测试方案 2.1 测试...

kafka消息的分发与消费

关于 Topic 和 Partition:   Topic: 在 kafka 中,topic 是一个存储消息的逻辑概念,可以认为是一个消息集合。每条消息发送到 kafka 集群的消息都有一个类别。物理上来说,不同的 topic 的消息是分开存储的,每个 topic 可以有多个生产者向它发送消息,也可以有多个消费者去消费其中的消息。   Partition...

spring kafka消费者配置介绍----ackMode

当 auto.commit.enable 设置为false时,表示kafak的offset由customer手动维护,spring-kafka提供了通过ackMode的值表示不同的手动提交方式; ackMode有以下7种值: public enum AckMode { // 当每一条记录被消费者监听器(ListenerConsumer)处理...

【干货】Kafka 事务特性分析

特性背景 消息事务是指一系列的生产、消费操作可以要么都完成,要么都失败,类似数据库的事务。这个特性在0.10.2的版本是不支持的,从0.11版本开始才支持。华为云DMS率先提供Kafka 1.1.0的专享版服务,支持消息事务特性。       支持事务消息有什么作用?消息事务是实现分布式事务的一种方案,可以确保分布式场景下的数据最终一致性。例如最常用的转账...