曹工杂谈:我们的应用,启动就要去其他服务拉数据,那其他服务挂了,我们就起不来了?

摘要:
前言在大家的项目中,想必都有那种,启动时候要去其他服务拉一些数据的情况,如果我们启动时,其他服务没启动,按岂不是就起不来了吗,如果这段拉数据的代码,并不是核心业务,那你这就有点说不过去了:不能因为对方没启动,我们也不能启动吧?
曹工杂谈:我们的应用,启动就要去其他服务拉数据,那其他服务挂了,我们就起不来了? 前言

在大家的项目中,想必都有那种,启动时候要去其他服务拉一些数据的情况,如果我们启动时,其他服务没启动,按岂不是就起不来了吗,如果这段拉数据的代码,并不是核心业务,那你这就有点说不过去了:不能因为对方没启动,我们也不能启动吧?

经过一些思考后,我觉得可以这样,启动的时候:

  • 启动一个定时的线程池,让它去执行拉数据的任务,如果任务执行失败,会过一段时间后再次执行
  • 我们希望,一旦某一次执行任务,成功后,就不要再去拉数据了,浪费网络流量和cpu

我这边可以大概就大家演示下。

示例代码

服务端

随便写了个spring boot服务端,监听本机8082端口。模拟第三方服务

@RestController
@Slf4j
public class BusinessController {


    @GetMapping("/")
    public String test() {
        return "success";
    }
}

@SpringBootApplication
@Slf4j
public class WebDemoApplicationServer {

	public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(WebDemoApplicationServer.class, args);
    }

}

客户端

客户端程序,依赖第三方服务,启动时,要去上面的服务端拉数据。

代码和上面差不多,唯一是在启动时,会执行以下逻辑:

@Component
public class InitRunner implements  CommandLineRunner{
   private static final Logger log = LoggerFactory.getLogger(InitRunner.class);

    @Autowired
    private RestTemplate restTemplate;

    @Override
    public void run(String... args) throws Exception {
        ResponseEntity<String> entity = restTemplate.getForEntity("http://localhost:8082", String.class);
        String s = entity.toString();
        log.info("get data:{}",s);
    }
}

在上面的服务没启动的时候,这个客户端是起不来的。

曹工杂谈:我们的应用,启动就要去其他服务拉数据,那其他服务挂了,我们就起不来了?第1张

怎么解决呢,很简单。

方案1
public class InitRunnerV2 implements CommandLineRunner {

    @Autowired
    private RestTemplate restTemplate;
	
  	// 1
    ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
            new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("init-data-from-third-sys"));

    @Override
    public void run(String... args)  {
      	//2 
        TestTask task = new TestTask(restTemplate);
      	//3 
        ScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(task,
                        0, 10, TimeUnit.SECONDS);
      	// 4
        task.setScheduledFuture(scheduledFuture);
    }


}
  • 1处,new了一个线程池,ScheduledThreadPoolExecutor类型,可周期执行某个任务

  • 2处,new了一个任务,这个任务会执行我们的拉数据逻辑。

    这个任务的代码如下:

    @Slf4j
    public class TestTask implements Runnable{
        private RestTemplate restTemplate;
    
        private volatile ScheduledFuture<?> scheduledFuture;
    
        public TestTask(RestTemplate restTemplate) {
            this.restTemplate = restTemplate;
        }
    
        ...
    
        public void setScheduledFuture(ScheduledFuture<?> scheduledFuture) {
            this.scheduledFuture = scheduledFuture;
        }
    }
    

    其实很简单,就是定义了2个字段,一个是RestTemplate,请求数据时要用;另一个是ScheduledFuture<?>类型,这个字段在上面的InitRunnerV2代码的第三处被赋值。

  • 3处,让这个任务循环执行,每10s一次。

  • 4处,给task的 ScheduledFuture 赋值,注意的是,在task中,这个字段我们定义为volatile,保证线程可见。

下面是任务代码的剖析:

@Override
    public void run() {
        try {
            ResponseEntity<String> entity = restTemplate.getForEntity("http://localhost:8082", String.class);
            String s = entity.toString();
            log.info("get data:{}",s);
        } catch (Exception e) {
//            log.error("e:{}",e);
            log.error("error");
            return;
        }

        /**
         * 1 有可能任务执行太快,future还没被赋值
         */
        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }

    }

唯一有什么要说的,就是1处,如果成功了,我们就会调用scheduledFuture.cancel(true);,这样,这个scheduled 任务就不会继续执行了,也就达到了我们的目的,经济实惠。

到此,代码基本就这样了,详细代码见:

https://gitee.com/ckl111/all-simple-demo-in-work/tree/master/spring-boot-scheduler-future-demo-parent

不成熟方案2

因为上面的方案挺简单实用,但感觉没啥干货,于是我想着是否可以自己来实现一个定制的线程池,把这些事情给自动化了。

希望实现的最终效果如下,给future增加一个回调,需要在任务执行成功时,该回调自动被调用:

public class InitRunnerV3 implements CommandLineRunner {

    @Autowired
    private RestTemplate restTemplate;

    CustomScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
            new CustomScheduledThreadPoolExecutor(1, new NamedThreadFactory("init-data-from-third-sys"));

    @Override
    public void run(String... args)  {
        // 1
        TestTaskV3 task = new TestTaskV3(restTemplate);
        // 2
        CustomScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(task,
                        0, 10, TimeUnit.SECONDS);
        // 3
        scheduledFuture.setCustomFutureCallBack(new CustomFutureCallBack() {

            @Override
            public void onSuccess(CustomScheduledFuture customScheduledFuture) {
                log.info("onSuccess");
                // 4
                customScheduledFuture.cancel(true);
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("e:{}",throwable);
            }
        });
    }
  • 1处,执行任务,任务内部如下,去除了设置future的逻辑,和取消的逻辑

    
    @Slf4j
    public class TestTaskV3 implements Runnable{
        private RestTemplate restTemplate;
    
        public TestTaskV3(RestTemplate restTemplate) {
            this.restTemplate = restTemplate;
        }
      
        @Override
        public void run() {
            try {
                ResponseEntity<String> entity = restTemplate.getForEntity("http://localhost:8082", String.class);
                String s = entity.toString();
                log.info("get data:{}",s);
            } catch (Exception e) {
    //            log.error("e:{}",e);
                log.error("error");
                throw e;
            }
      
        }
    
    }
    
  • 2处,循环执行任务,这里的scheduled线程池,是我们自定义的,回头再说;获取其返回的future

  • 3处,给future增加回调,在回调中,如果成功,则取消该任务。

                @Override
                public void onSuccess(CustomScheduledFuture customScheduledFuture) {
                    log.info("onSuccess");
                    // 4
                    customScheduledFuture.cancel(true);
                }
    

寻找扩展点

曹工杂谈:我们的应用,启动就要去其他服务拉数据,那其他服务挂了,我们就起不来了?第2张

这里,afterExecute是个空实现,就是留给子线程池扩展用的:

    protected void afterExecute(Runnable r, Throwable t) { }

那我们可以考虑下,要怎么才能实现我们的目标呢,我们要在这个方法内,通过传进来的Runnable r,获取到下面这个future才能实现目的:

        CustomScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(task,
                        0, 10, TimeUnit.SECONDS);

获取到future,就能拿到在future上设置的callback对象,就能调用callback,所以,现在问题是,要在传进来的Runnable中,获取到scheduledFuture

所以,我们就得包装一下,传进来的runnable,我们定义了如下的Runnable:

@Data
public class CustomDecoratedRunnable implements Runnable {
    Runnable runnable;

    CustomScheduledFuture customScheduledFuture;

    public CustomDecoratedRunnable(Runnable runnable,CustomScheduledFuture customScheduledFuture) {
        this.runnable = runnable;
        this.customScheduledFuture = customScheduledFuture;
    }

    @Override
    public void run() {
        this.runnable.run();
    }


}

定制线程池

我们具体看看,我们定制的线程池对象,我们的线程池,直接继承了ScheduledThreadPoolExecutor

public class CustomScheduledThreadPoolExecutor<V> extends ScheduledThreadPoolExecutor {

    public CustomScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
        super(corePoolSize, threadFactory);
    }
  
  	...
}

scheduleAtFixedRate方法,我们进行了重写:

@Override
    public CustomScheduledFuture<V> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        /**
         * 1 
         */
        CustomScheduledFuture customScheduledFuture = new CustomScheduledFuture();
		// 2 将future设置到task中
        CustomDecoratedRunnable customDecoratedRunnable = new CustomDecoratedRunnable(command,customScheduledFuture);
       // 3
        ScheduledFuture<?> scheduledFuture = super.scheduleAtFixedRate(customDecoratedRunnable,
                initialDelay, period, unit);

        /**
         * 4 将返回的future,设置到我们包装过的future
         */
        customScheduledFuture.setScheduledFuture((RunnableScheduledFuture) scheduledFuture);

        return customScheduledFuture;
    }
  • 1处,新建一个自定义的future

  • 2处,将自定义的future,设置到上面说的task中

  • 3处,把包装过的task,丢给线程池

  • 4处,返回一个定制的future,这个future,包装了原有的future,同时,支持设置callback

    public class CustomScheduledFuture<V> implements RunnableScheduledFuture<V> {
        /**
         * 其实是下面这种类型:
         * {@link java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask
         *
         */
        RunnableScheduledFuture<V> scheduledFuture;
      
    	// 设置callback时,赋值
        CustomFutureCallBack customFutureCallBack;
    
        Runnable runnable;
    }
    

丢给定制线程池的task

本来,我以为,丢给线程池什么Runnable对象,在afterExecute就能拿到什么样的Runnable对象,结果:

曹工杂谈:我们的应用,启动就要去其他服务拉数据,那其他服务挂了,我们就起不来了?第3张

发现,传进来的,已经被包装过了,应该是为了支持周期执行。

所以,没办法,看起来路被堵死了,通过这个传进来的Runnable,也拿不到我们原始的Runnable。

后边找了半天,找到下面这个点:

#java.util.concurrent.ScheduledThreadPoolExecutor#scheduleAtFixedRate

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        // 1
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }
  • 1处,会调用decorateTask来包装task,默认实现,就是如下:

        protected <V> RunnableScheduledFuture<V> decorateTask(
            Runnable runnable, RunnableScheduledFuture<V> task) {
            return task;
        }
    

    这里的task,就是前面那个代码里的 ScheduledFutureTask<Void> sft:

            ScheduledFutureTask<Void> sft =
                new ScheduledFutureTask<Void>(command,
                                              null,
                                              triggerTime(initialDelay, unit),
                                              unit.toNanos(period));
            // 1
            RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    

所以,我们得想办法重载这个方法:

    @Override
    protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
        CustomScheduledFuture<V> future = new CustomScheduledFuture<>();
        future.setRunnable(runnable);
        future.setScheduledFuture(task);
        return future;
    }

这里,利用CustomScheduledFuture,封装了task和runnable两个对象。

同时,我们自定义的这个CustomScheduledFuture,也是实现了这个方法的返回值,指定的接口:

@Data
public class CustomScheduledFuture<V> implements RunnableScheduledFuture<V> 


目前为止,经过包装后,在afterExecute处,拿到的Runnable如下:

曹工杂谈:我们的应用,启动就要去其他服务拉数据,那其他服务挂了,我们就起不来了?第4张

afterExecute的逻辑,调用回调

 @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        CustomScheduledFuture future;
        CustomDecoratedRunnable runnable = null;
        if (r instanceof CustomScheduledFuture) {
            future = (CustomScheduledFuture) r;
            // 1
            runnable = (CustomDecoratedRunnable) future.getRunnable();
        }
        // 2
        CustomScheduledFuture customScheduledFuture = runnable.getCustomScheduledFuture();
        // 3
        CustomFutureCallBack customFutureCallBack = customScheduledFuture.getCustomFutureCallBack();
        if (customFutureCallBack != null) {
            if (t != null) {
                customFutureCallBack.onException(t);
            } else {
                // 4
                customFutureCallBack.onSuccess(customScheduledFuture);
            }
        }

    }
  • 1处,获取runnable
  • 2处,根据runnable,获取我们的future
  • 3处,通过future,获取回调
  • 4处,调用回调

效果展示

2020-04-10 09:45:28.068  INFO 14456 --- [           main] No active profile set, falling back to default profiles: default
2020-04-10 09:45:28.822  INFO 14456 --- [           main] Started WebDemoApplication in 1.153 seconds (JVM running for 1.805)
2020-04-10 09:45:36.933 ERROR 14456 --- [init-data-from-third-sys-1-thread-1] error
2020-04-10 09:48:48.975  INFO 14456 --- [init-data-from-third-sys-1-thread-1] onSuccess

可以看到,任务执行失败了,但为啥会调用onSuccess呢;另外,大家可以看到,都是在线程池的线程中执行的。

为啥会error了,还执行success呢,我发现,即使我在task中抛出了异常,但是上层没捕获。

曹工杂谈:我们的应用,启动就要去其他服务拉数据,那其他服务挂了,我们就起不来了?第5张

我猜测,是因为:

public interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();
}

这里没有抛出异常,所以,即使实现的runnable中抛了,上层也不管。

具体还要验证。

注意点

另一个点是,执行失败了,等了10s,并没有再次执行,猜测是我的定制task,导致了周期执行的问题。这个待验证和解决。

但,一个简单的回调,我们已经实现了。

总结

大家使用方案1 就可以了;后面的方案,是折腾着玩的。希望对大家有帮助。
全部代码都在:

https://gitee.com/ckl111/all-simple-demo-in-work/tree/master/spring-boot-scheduler-future-demo-parent

免责声明:文章转载自《曹工杂谈:我们的应用,启动就要去其他服务拉数据,那其他服务挂了,我们就起不来了?》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Go语言开发Windows应用洛谷 P1022 计算器的改良下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

一种面向高维数据的集成聚类算法

聚类集成已经成为机器学习的研究热点,它对原始数据集的多个聚类结果进行学习和集成,得到一个能较好地反映数据集内在结构的数据划分。很多学者的研究证明聚类集成能有效地提高聚类结果的准确性、鲁棒性和稳定性。本文提出了一种面向高维数据的聚类集成算法。该方法针对高维数据的特点,先用分层抽样的方法结合信息增益对每个特征簇选择合适数量比较重要的特征的生成新的具代表意义的...

1012.idea关联数据库

(需要idea ulitmate版本) 在代码编辑窗口右侧有 database工具  图标 1: 同步当前的数据库连接。 这个是最重要的操作。配置好连接以后或通过其他工具 操作数据库以后,需要及时同步。  图标 2: 配置当前的连接。  图标 3: 断开当前的连接。  图标 4:显示相应数据库对象的数据  图标 5:编辑修改当前数据库对象 1....

软件架构自学笔记----分享“去哪儿 Hadoop 集群 Federation 数据拷贝优化”

去哪儿 Hadoop 集群 Federation 数据拷贝优化 背景 去哪儿 Hadoop 集群随着去哪儿网的发展一直在优化改进,基本保证了业务数据存储量和计算量爆发式增长下的存储服务质量。然而,随着集群规模的发展,单组 NameNode 组成的集群也到达了新的瓶颈:因为 NameNode 内存使用和元数据量正相关,在 180GB 堆内存配置下,元数据量...

将xml文件数据导入到sql中[原]

        设计数据库的时候为了操作的方便,我们可以选择用xml格式文件来保存我们的数据表结构及其数据。这样的好处是多方面的,设计简单,操作方便,自己可以开发一个设计数据库表的应用程序直接对xml文件进行操作。其实另外一个好处是在最终使用的时候很容易将xml数据导入到任何我们想要的数据库中,不过注明的是这中设计的方法只在于简便,不可能达到其他专门工具的...

postgresql优化数据的批量插入

原文:http://www.cnblogs.com/mchina/archive/2012/08/11/2537393.html 有以下几种方法用于优化数据的批量插入。 1. 关闭自动提交:在批量插入数据时,如果每条数据都被自动提交,当中途出现系统故障时,不仅不能保障本次批量插入的数据一致性,而且由于有多次提交操作的发生,整个插入效率也会受到很大的打击。解...

十步法原则解决数据质量问题

  一、相关概念 1.1 数据质量 数据的一组固有属性满足数据消费者要求的程度。 1)数据固有属性 真实性:即数据是客观世界的真实反映 及时性:即数据是随着变化及时更新的 相关性:即数据是数据消费者关注和需要的 2)高质量数据满足要求(消费者角度) 可得的,当数据消费者需要时能够获取到; 及时的,当需要时,数据获得且是及时更新的; 完整的,数据是完整...