如何在Java应用中提交Spark任务?

摘要:
我丈夫是一个用户定义的ID,作为参数传递给Spark应用程序;Spark初始化后,可以通过SparkContext_ ID和URL通过驱动程序连接到数据库,新版本关联关系的插入归因于互联网时代的信息爆炸。我看到了群友的聊天,了解了SparkLauncher。经过调查,我发现它可以基于Java代码自动提交Spark任务。因为SparkLauncher的类引用了SparkL

最近看到有几个Github友关注了Streaming的监控工程——Teddy,所以思来想去还是优化下代码,不能让别人看笑话啊。于是就想改一下之前觉得最丑陋的一个地方——任务提交。

本博客内容基于Spark2.2版本~在阅读文章并想实际操作前,请确保你有:

  1. 一台配置好Spark和yarn的服务器
  2. 支持正常spark-submit --master yarn xxxx 的任务提交

老版本

老版本任务提交是基于 ** 启动本地进程,执行脚本spark-submit xxx ** 的方式做的。其中一个关键的问题就是获得提交Spark任务的Application-id,因为这个id是跟任务状态的跟踪有关系的。如果你的资源管理框架用的是yarn,应该知道每个运行的任务都有一个applicaiton_id,这个id的生成规则是:

appplication_时间戳_数字

老版本的spark通过修改SparkConf参数spark.app.id就可以手动指定id,新版本的代码是直接读取的taskBackend中的applicationId()方法,这个方法具体的实现是根据实现类来定的。在yarn中,是通过Yarn的YarnClusterSchedulerBackend实现的,具体的实现逻辑可以参考对应的链接。

感兴趣的同学可以看一下,生成applicaiton_id的逻辑在hadoop-yarn工程的ContainerId中定义。

总结一句话就是,想要自定义id,甭想了!!!!

于是当时脑袋瓜不灵光的我,就想到那就等应用创建好了之后,直接写到数据库里面呗。怎么写呢?

  1. 我事先生成一个自定义的id,当做参数传递到spark应用里面;
  2. 等spark初始化后,就可以通过sparkContext取得对应的application_id以及url
  3. 然后再driver连接数据库,插入一条关联关系

如何在Java应用中提交Spark任务?第1张

新版本

还是归结于互联网时代的信息大爆炸,我看到群友的聊天,知道了SparkLauncer这个东西,调查后发现他可以基于Java代码自动提交Spark任务。SparkLauncher支持两种模式:

  1. new SparkLauncher().launch() 直接启动一个Process,效果跟以前一样
  2. new SparkLauncher().startApplicaiton(监听器) 返回一个SparkAppHandler,并(可选)传入一个监听器

当然是更倾向于第二种啦,因为好处很多:

  1. 自带输出重定向(Output,Error都有,支持写到文件里面),超级爽的功能
  2. 可以自定义监听器,当信息或者状态变更时,都能进行操作(对我没啥用)
  3. 返回的SparkAppHandler支持 暂停、停止、断连、获得AppId、获得State等多种功能,我就想要这个!!!!

如何在Java应用中提交Spark任务?第2张

一步一步,代码展示

首先创建一个最基本的Spark程序:

import org.apache.spark.sql.SparkSession;
import java.util.ArrayList;
import java.util.List;

public class HelloWorld {
    public static void main(String[] args) throws InterruptedException {
        SparkSession spark = SparkSession
                .builder()
                //.master("yarn")
                //.appName("hello-wrold")
                //.config("spark.some.config.option", "some-value")
                .getOrCreate();

        List<Person> persons = new ArrayList<>();

        persons.add(new Person("zhangsan", 22, "male"));
        persons.add(new Person("lisi", 25, "male"));
        persons.add(new Person("wangwu", 23, "female"));


        spark.createDataFrame(persons, Person.class).show(false);

        spark.close();

    }
}

然后创建SparkLauncher类:

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;

public class Launcher {
    public static void main(String[] args) throws IOException {
        SparkAppHandle handler = new SparkLauncher()
                .setAppName("hello-world")
                .setSparkHome(args[0])
                .setMaster(args[1])
                .setConf("spark.driver.memory", "2g")
                .setConf("spark.executor.memory", "1g")
                .setConf("spark.executor.cores", "3")
                .setAppResource("/home/xinghailong/launcher/launcher_test.jar")
                .setMainClass("HelloWorld")
                .addAppArgs("I come from Launcher")
                .setDeployMode("cluster")
                .startApplication(new SparkAppHandle.Listener(){
                    @Override
                    public void stateChanged(SparkAppHandle handle) {
                        System.out.println("**********  state  changed  **********");
                    }

                    @Override
                    public void infoChanged(SparkAppHandle handle) {
                        System.out.println("**********  info  changed  **********");
                    }
                });


        while(!"FINISHED".equalsIgnoreCase(handler.getState().toString()) && !"FAILED".equalsIgnoreCase(handler.getState().toString())){
            System.out.println("id    "+handler.getAppId());
            System.out.println("state "+handler.getState());

            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

然后打包工程,打包过程可以参考之前的博客:
http://www.cnblogs.com/xing901022/p/7891867.html

打包完成后上传到部署Spark的服务器上。由于SparkLauncher所在的类引用了SparkLauncher,所以还需要把这个jar也上传到服务器上。

[xinghailong@hnode10 launcher]$ ls
launcher_test.jar  spark-launcher_2.11-2.2.0.jar
[xinghailong@hnode10 launcher]$ pwd
/home/xinghailong/launcher

由于SparkLauncher需要指定SPARK_HOME,因此如果你的机器可以执行spark-submit,那么就看一下spark-submit里面,SPARK_HOME是在哪

[xinghailong@hnode10 launcher]$ which spark2-submit
/var/lib/hadoop-hdfs/bin/spark2-submit

最后几行就能看到:

export SPARK2_HOME=/var/lib/hadoop-hdfs/app/spark

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK2_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

综上,我们需要的是:

  1. 一个自定义的Jar,里面包含spark应用和SparkLauncher类
  2. 一个SparkLauncher的jar,spark-launcher_2.11-2.2.0.jar 版本根据你自己的来就行
  3. 一个当前目录的路径
  4. 一个SARK_HOME环境变量指定的目录

然后执行命令启动测试:

java -Djava.ext.dirs=/home/xinghailong/launcher -cp launcher_test.jar Launcher /var/lib/hadoop-hdfs/app/spark yarn

说明:

  1. -Djava.ext.dirs 设置当前目录为java类加载的目录
  2. 传入两个参数,一个是SPARK_HOME;一个是启动模式

观察删除发现成功启动运行了:

id    null
state UNKNOWN
Mar 10, 2018 12:00:52 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:00:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
**********  state  changed  **********
...省略一大堆拷贝jar的日志
**********  info  changed  **********
**********  state  changed  **********
Mar 10, 2018 12:00:55 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:00:55 INFO yarn.Client: Application report for application_1518263195995_37615 (state: ACCEPTED)
... 省略一堆重定向的日志
application_1518263195995_37615 (state: ACCEPTED)
id    application_1518263195995_37615
state SUBMITTED
Mar 10, 2018 12:01:00 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:01:00 INFO yarn.Client: Application report for application_1518263195995_37615 (state: RUNNING)
**********  state  changed  **********
... 省略一堆重定向的日志
INFO: 	 user: hdfs
**********  state  changed  **********
Mar 10, 2018 12:01:08 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:01:08 INFO util.ShutdownHookManager: Shutdown hook called
Mar 10, 2018 12:01:08 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:01:08 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-f07e0213-61fa-4710-90f5-2fd2030e0701

总结

这样就实现了基于Java应用提交Spark任务,并获得其Appliation_id和状态进行定位跟踪的需求了。

免责声明:文章转载自《如何在Java应用中提交Spark任务?》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Jboss前端er们如何最快开发h5移动端页面?下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

sparkSQL以JDBC为数据源

一、环境准备 安装oracle后,创建测试表、数据: createtabletest( usernamevarchar2(32)primarykey, passwordvarchar2(32) ); insertintotestvalues('John','1234'); insertintotestvalues('Mike'...

Hadoop2.7.3集群安装scala-2.12.8 和spark2.7

Apache Spark™是用于大规模数据处理的统一分析引擎。 从右侧最后一条新闻看,Spark也用于AI人工智能 spark是一个实现快速通用的集群计算平台。它是由加州大学伯克利分校AMP实验室 开发的通用内存并行计算框架,用来构建大型的、低延迟的数据分析应用程序。它扩展了广泛使用的MapReduce计算 模型。高效的支撑更多计算模式,包括交互式查询和...

launcher- 第三方应用图标替换

有时候我们感觉第三方应用的icon不美观,或者跟我们主题风格不一致,这时候我们希望换成我们想要的icon,那我们可以这么做(以更换QQ应用icon为例): 1.首先我们当然要根据自己的需要做一张替换icon了(图片我们不妨命名为qq) 2.接下来我们需要得到第三方应用的信息,可以通过GetDftlayoutXml.apk 工具获得 具体步骤如下 1)网上...

Flink实战(103):配置(二)参数配置和常见参数调优

来源: 1 Flink 1.1 Flink参数配置 jobmanger.rpc.address jm的地址。 jobmanager.rpc.port jm的端口号。 jobmanager.heap.mb jm的堆内存大小。不建议配的太大,1-2G足够。 taskmanager.heap.mb tm的堆内存大小。大小视任务量而定。需要存储任务的中间值,网络...

Spark History Server配置使用

Spark history Server产生背景 以standalone运行模式为例,在运行Spark Application的时候,Spark会提供一个WEBUI列出应用程序的运行时信息;但该WEBUI随着Application的完成(成功/失败)而关闭,也就是说,Spark Application运行完(成功/失败)后,将无法查看Application...

大数据技术-spark+hive+hbase研究

大数据 spark 研究(0基础入门)一 背景 1 基础 Scala 语言基础:Scala详细总结(精辟版++) spark 介绍    :  spark介绍     二 环境 1 部署spark   <![if !supportLists]>1、<![endif]>环境准备(1)配套软件版本要求: Java 6+  Python...