Three Ways to Submit Spark Jobs
While learning Spark, the reference material describes three main ways to submit a Spark job:
1. Submitting with Spark's own spark-submit tool
Submit the job from the command line using the spark-submit tool that ships with Spark. The official site and most references submit jobs this way. An example command:
./spark-submit --class com.learn.spark.SimpleApp --master yarn --deploy-mode client --driver-memory 2g --executor-memory 2g --executor-cores 3 ../spark-demo.jar
The individual parameters are not explained here; please refer to the official documentation.
2. Submitting programmatically through the Java API (two variants)
This approach submits the job through the Java API instead of the command line: you can simply click Run in IDEA on the Main class that contains the job. Spark provides an API whose single entry point is SparkLauncher. It is very convenient (imagine a job that has to run repeatedly but you cannot write a Linux shell script: submitting via the Java API solves that, and it can also be integrated with Spring so the application runs inside Tomcat). Official example: http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.html
Following the official example, there are two ways to submit through the Java API:
2.1 Variant one: new SparkLauncher().startApplication(listener) returns a SparkAppHandle and optionally takes a listener
Call startApplication on a SparkLauncher instance. Even when everything is configured correctly, a naive call can still fail: startApplication uses LauncherServer to start a process that talks to the cluster, and this appears to be asynchronous, so the main thread may finish before that process has even come up, making the run fail. The workaround is to let the main thread sleep for a while after calling new SparkLauncher().startApplication(), or to use the example below:
package com.xxx.utils;
/**
 * @author yyz
 * @class LanuncherAppV
 * @date 2021/04/22 15:27
 * Variant one: call startApplication on a SparkLauncher instance. Even with correct configuration a naive call can fail,
 * because startApplication uses LauncherServer to start a process that talks to the cluster, and this appears to be
 * asynchronous, so the main thread may finish before that process has come up, making the run fail. The workaround is to
 * let the main thread sleep for a while after calling new SparkLauncher().startApplication(), or to use the example below.
 * Note: in cluster deploy mode anything written to stdout will not be visible; write results to HDFS instead. In client mode the output is visible.
 **/
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
public class LanuncherAppV {
private static Log log = LogFactory.getLog(LanuncherAppV.class);
public static void main(String[] args) throws IOException, InterruptedException {
HashMap<String, String> env = new HashMap<>();
//These two properties must be set
env.put("HADOOP_CONF_DIR", "/opt/soft/client/hadoop/xxx/etc/hadoop");
env.put("JAVA_HOME", "/opt/soft/jdk");
//Optional
//env.put("YARN_CONF_DIR","");
log.info("init spark env complete");
//A single count is enough: countDown() is called once the application reaches a final state
CountDownLatch countDownLatch = new CountDownLatch(1);
//Even after calling setJavaHome(), the 'JAVA_HOME is not set' error still occurred
SparkAppHandle handler = new SparkLauncher(env)
.setSparkHome("/opt/soft/client/spark_install_home")
.setAppResource("/opt/soft/client/spark/xjprc-hadoop-spark2.3/spark_install_home/examples/jars/spark-examples_xxxxx.jar")
.setMainClass("org.apache.spark.examples.SparkPi")
.setMaster("local")
.setAppName("LanuncherAppV_yyz")
// .setMaster("yarn")
// .setDeployMode("cluster")
// .setConf("spark.app.id", "")
// .setConf("spark.driver.memory", "2g")
// .setConf("spark.akka.frameSize", "")
// .setConf("spark.executor.memory", "1g")
// .setConf("spark.executor.instances", "")
// .setConf("spark.executor.cores", "")
// .setConf("spark.default.parallelism", "")
// .setConf("spark.driver.allowMultipleContexts", "true")
.setVerbose(true).startApplication(new SparkAppHandle.Listener() {
//Listen to the task state here; once the task ends (for whatever reason) isFinal() returns true, otherwise false
@Override
public void stateChanged(SparkAppHandle sparkAppHandle) {
if (sparkAppHandle.getState().isFinal()) {
countDownLatch.countDown();
}
System.out.println("state:" + sparkAppHandle.getState().toString());
System.out.println("AppId " + sparkAppHandle.getAppId());
}
@Override
public void infoChanged(SparkAppHandle sparkAppHandle) {
System.out.println("Info:" + sparkAppHandle.getState().toString());
System.out.println("AppId " + sparkAppHandle.getAppId());
}
});
log.info("start spark SparkLauncher ……");
System.out.println("The task is executing, please wait ....");
//Block the main thread until the task finishes
countDownLatch.await();
System.out.println("The task is finished!");
log.info("finish spark SparkLauncher task");
}
}
Note: in cluster deploy mode anything the job writes to stdout will not be visible; write results to HDFS instead. In client mode the output is visible.
The command to run it:
[work@hadoop01 testSparkLanuncher]$ java -Djava.ext.dirs=/home/work/xxx/project/testSparkLanuncher -cp TestJavaSpark-1.0-SNAPSHOT-jar-with-dependencies.jar:spark-launcher_2.11-2.3.4.jar com.xxx.utils.LanuncherAppV
or
[work@hadoop01 testSparkLanuncher]$ java -classpath /home/work/xxx/project/testSparkLanuncher/TestJavaSpark-1.0-SNAPSHOT-jar-with-dependencies.jar:/home/work/xxx/project/testSparkLanuncher/spark-launcher_2.11-2.3.4.jar com.xxx.utils.LanuncherAppV
2.2 Variant two: new SparkLauncher().launch() starts a Process directly
SparkLauncher.launch() returns a Process; call process.waitFor() to wait for it to finish. With this approach you have to handle the process output yourself, which is more work, but the benefit is that everything is under your control: the output you capture is as detailed as when submitting from the command line. Implementation:
package com.xxx.utils;
/**
 * @author yyz
 * @class LauncherApp
 * @date 2021/04/23 10:30
 * SparkLauncher.launch() returns a Process; call process.waitFor() to wait for the result. You have to handle the process
 * output yourself (a custom InputStreamReaderRunnable class is used for that), which is more work, but the benefit is full
 * control: the captured output is as detailed as when submitting from the command line.
 **/
import org.apache.spark.launcher.SparkLauncher;
import java.io.IOException;
import java.util.HashMap;
public class LauncherApp {
public static void main(String[] args) throws IOException, InterruptedException {
HashMap<String, String> env = new HashMap<>();
//These two properties must be set
env.put("HADOOP_CONF_DIR", "/opt/soft/client/hadoop/xxx/etc/hadoop");
env.put("JAVA_HOME", "/opt/soft/jdk");
//env.put("YARN_CONF_DIR","");
SparkLauncher handle = new SparkLauncher(env)
.setSparkHome("/opt/soft/client/spark_install_home")
.setAppResource("/opt/soft/client/spark/xjprc-hadoop-spark2.3/spark_install_home/examples/jars/spark-examples_xxxxx.jar")
.setMainClass("org.apache.spark.examples.SparkPi")
.setMaster("local")
.setAppName("LauncherApp_yyz")
// .setMaster("yarn")
// .setDeployMode("cluster")
// .setConf("spark.app.id", "")
// .setConf("spark.driver.memory", "2g")
// .setConf("spark.akka.frameSize", "")
// .setConf("spark.executor.memory", "1g")
// .setConf("spark.executor.instances", "")
// .setConf("spark.executor.cores", "")
// .setConf("spark.default.parallelism", "")
// .setConf("spark.driver.allowMultipleContexts","true")
.setVerbose(true);
Process process = handle.launch();
InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(process.getInputStream(), "input");
Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
inputThread.start();
InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(process.getErrorStream(), "error");
Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
errorThread.start();
System.out.println("Waiting for finish...");
int exitCode = process.waitFor();
System.out.println("Finished! Exit code:" + exitCode);
}
}
The custom InputStreamReaderRunnable class used above is implemented as follows:
package com.xxx.utils;
/**
 * @author yyz
 * @class InputStreamReaderRunnable
 * @date 2021/04/23 10:31
 * Reads an InputStream line by line on its own thread and prints each line to stdout.
 **/
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
public class InputStreamReaderRunnable implements Runnable {
private BufferedReader reader;
private String name;
public InputStreamReaderRunnable(InputStream is, String name) {
this.reader = new BufferedReader(new InputStreamReader(is));
this.name = name;
}
public void run() {
System.out.println("InputStream " + name + ":");
try {
String line = reader.readLine();
while (line != null) {
System.out.println(line);
line = reader.readLine();
}
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
To run it:
[work@hadoop01 testSparkLanuncher]$ java -Djava.ext.dirs=/home/work/xxx/project/testSparkLanuncher -cp TestJavaSpark-1.0-SNAPSHOT.jar:spark-launcher_2.11-2.3.4.jar com.xxx.utils.LauncherApp
or
[work@hadoop01 testSparkLanuncher]$ java -classpath /home/work/xxx/project/testSparkLanuncher/TestJavaSpark-1.0-SNAPSHOT-jar-with-dependencies.jar:/home/work/xxx/project/testSparkLanuncher/spark-launcher_2.11-2.3.4.jar com.xxx.utils.LauncherApp
2.3 Summary
The old approach
Previously, job submission worked by starting a local process that ran the spark-submit xxx script. One key problem was obtaining the application id of the submitted Spark job, because that id is what job status tracking hangs on. If your resource manager is YARN, you know that every running application has an application_id, generated as:
application_<timestamp>_<number>
In older Spark versions you could set the id manually through the SparkConf parameter spark.app.id; in newer versions the code reads it directly from the applicationId() method of the task scheduler backend, whose behaviour depends on the concrete implementation class. On YARN it is provided by YarnClusterSchedulerBackend.
If you are curious, the logic that generates the application id is defined in the ContainerId class of the hadoop-yarn project.
In one sentence: forget about customizing the id.
So my not-so-bright idea at the time was: once the application has been created, just write the id into a database. How?
- generate a custom id up front and pass it into the Spark application as a parameter;
- once Spark has initialized, obtain the application_id and the UI url from the SparkContext;
- then have the driver connect to the database and insert the mapping (see the sketch after this list).
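A minimal, hypothetical sketch of that workaround; the JDBC URL, table name and RegisterAppId class name below are placeholders, not part of the original project:
package com.xxx.utils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.spark.sql.SparkSession;
import scala.Option;
public class RegisterAppId {
    public static void main(String[] args) throws Exception {
        // The custom id generated before submission, passed in as a program argument
        String customId = args[0];
        SparkSession spark = SparkSession.builder().appName("RegisterAppId").getOrCreate();
        // The real id assigned by the cluster manager, e.g. application_<timestamp>_<number> on YARN
        String applicationId = spark.sparkContext().applicationId();
        Option<String> ui = spark.sparkContext().uiWebUrl();
        String trackingUrl = ui.isDefined() ? ui.get() : "";
        // Placeholder JDBC URL and table; replace with your own metadata store
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://dbhost:3306/meta", "user", "pass");
             PreparedStatement ps = conn.prepareStatement(
                     "INSERT INTO job_registry(custom_id, application_id, tracking_url) VALUES (?, ?, ?)")) {
            ps.setString(1, customId);
            ps.setString(2, applicationId);
            ps.setString(3, trackingUrl);
            ps.executeUpdate();
        }
        // ... run the actual job here ...
        spark.stop();
    }
}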
The new approach
Thanks once again to the information explosion of the internet age, I spotted SparkLauncher in a group chat; after some digging I found it can submit Spark jobs automatically from Java code. SparkLauncher supports two modes:
- new SparkLauncher().launch() starts a Process directly, with the same effect as before
- new SparkLauncher().startApplication(listener) returns a SparkAppHandle and optionally takes a listener
Naturally I prefer the second one, because it brings a lot:
- built-in output redirection (both stdout and stderr, and they can be written to a file), a really handy feature
- custom listeners that fire whenever info or state changes (not that useful to me)
- the returned SparkAppHandle supports stop, kill, disconnect, getAppId, getState and more, which is exactly what I wanted (see the sketch after this list)
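A minimal sketch of those handle features, reusing the demo jar and Spark home paths from the examples above; the redirect file paths and the HandleDemo class name are placeholders:
package com.xxx.utils;
import java.io.File;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
public class HandleDemo {
    public static void main(String[] args) throws Exception {
        SparkAppHandle handle = new SparkLauncher()
                .setSparkHome("/opt/soft/client/spark_install_home")
                .setAppResource("/home/work/xxx/project/testSparkLanuncher/TestJavaSpark-1.0-SNAPSHOT.jar")
                .setMainClass("com.xxx.utils.HelloWorld")
                .setMaster("local")
                // Built-in redirection: the child process stdout/stderr go to these files (placeholder paths)
                .redirectOutput(new File("/tmp/launcher-demo.out"))
                .redirectError(new File("/tmp/launcher-demo.err"))
                .startApplication();
        // The handle exposes the app id and state, and lets you intervene at any time
        while (!handle.getState().isFinal()) {
            System.out.println("appId=" + handle.getAppId() + ", state=" + handle.getState());
            Thread.sleep(2000);
            // handle.stop();        // ask the application to stop gracefully
            // handle.kill();        // force-kill it
            // handle.disconnect();  // stop tracking without affecting the job
        }
        System.out.println("final state: " + handle.getState());
    }
}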
2.4 My code example:
package com.xxx.utils;
/**
* @author yyz
* @class Person
* @date 2021/04/23 17:52
**/
public class Person{
private String name;
private int age;
private String sex;
public Person(String name, int age, String sex) {
this.name = name;
this.age = age;
this.sex = sex;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getSex() {
return sex;
}
public void setSex(String sex) {
this.sex = sex;
}
}
package com.xxx.utils;
/**
* @author yyz
* @class HelloWorld
* @date 2021/04/23 17:07
**/
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import java.util.ArrayList;
import java.util.List;
public class HelloWorld {
private static Log log = LogFactory.getLog(HelloWorld.class);
public static void main(String[] args) throws InterruptedException {
SparkSession spark = SparkSession.builder().master("local[2]")
.appName("HelloWorld_from_yyz")
.config("spark.sql.warehouse.dir", "/tmp")
.enableHiveSupport()
.getOrCreate();
List<Person> persons = new ArrayList<>();
persons.add(new Person("zhangsan", 22, "male"));
persons.add(new Person("lisi", 25, "male"));
persons.add(new Person("wangwu", 23, "female"));
Dataset ds= spark.createDataFrame(persons, Person.class);
ds.show(false);
log.info("数据总条数为:"+ds.count());
spark.close();
}
}
package com.xxx.utils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
import java.io.IOException;
/**
* @author yyz
* @class Launcher
* @date 2021/04/23 17:13
**/
public class Launcher {
private static Log log = LogFactory.getLog(Launcher.class);
public static void main(String[] args) throws IOException {
SparkAppHandle handler = new SparkLauncher()
.setAppName("hello-world")
// .setSparkHome(args[0])
.setSparkHome("/opt/soft/client/spark_install_home")
.setMaster(args[0])
// .setDeployMode("client")
.setConf("spark.yarn.job.owners",args[1])
.setConf("spark.driver.memory", "2g")
.setConf("spark.executor.memory", "1g")
.setConf("spark.executor.cores", "3")
.setAppResource("/home/work/xxx/project/testSparkLanuncher/TestJavaSpark-1.0-SNAPSHOT.jar")
//Use the fully qualified name of the main class here
.setMainClass("com.xxx.utils.HelloWorld")
.addAppArgs("I come from Launcher")
.startApplication(new SparkAppHandle.Listener(){
@Override
public void stateChanged(SparkAppHandle handle) {
System.out.println(handle.getAppId()+": ********** state changed **********: "+handle.getState().toString());
log.info(handle.getAppId()+": ********** state changed **********: "+handle.getState().toString());
}
@Override
public void infoChanged(SparkAppHandle handle) {
System.out.println(handle.getAppId()+": ********** info changed **********: "+handle.getState().toString());
log.info(handle.getAppId()+": ********** info changed **********: "+handle.getState().toString());
}
});
while(!"FINISHED".equalsIgnoreCase(handler.getState().toString()) && !"FAILED".equalsIgnoreCase(handler.getState().toString())){
System.out.println("id "+handler.getAppId());
System.out.println("state "+handler.getState());
System.out.println(handler.getAppId()+": ********** info changed **********: "+handler.getState().toString());
log.info(handler.getAppId()+": ********** info changed **********: "+handler.getState().toString());
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
After packaging, upload the jar to the server where Spark is deployed. Because the launcher class references SparkLauncher, the spark-launcher jar has to be uploaded to the server as well.
To sum up, we need:
- a custom jar containing the Spark application and the launcher class
- the SparkLauncher jar, e.g. spark-launcher_2.11-2.2.0.jar (match the version to your own Spark)
- the path of the current directory
- the directory pointed to by the SPARK_HOME environment variable
Then run the command to start the test:
[work@hadoop01 testSparkLanuncher]$ java -Djava.ext.dirs=/home/work/xxx/project/testSparkLanuncher -cp TestJavaSpark-1.0-SNAPSHOT.jar:spark-launcher_2.11-2.3.4.jar com.xxx.utils.Launcher local test_onwer
or
[work@hadoop01 testSparkLanuncher]$ java -classpath /home/work/xxx/project/testSparkLanuncher/TestJavaSpark-1.0-SNAPSHOT-jar-with-dependencies.jar:/home/work/xxx/project/testSparkLanuncher/spark-launcher_2.11-2.3.4.jar com.xxx.utils.Launcher local test_onwer
Notes:
- -Djava.ext.dirs sets the current directory as a directory the JVM loads classes from
- two arguments are passed: the launch mode and the program owner
Watching the output shows it started and ran successfully:
[work@hadoop01 testSparkLanuncher]$ java -Djava.ext.dirs=/home/work/xxx/project/testSparkLanuncher -cp TestJavaSpark-1.0-SNAPSHOT.jar:spark-launcher_2.11-2.3.4.jar com.xxx.utils.Launcher local test_onwer
id null
state UNKNOWN
null: ********** info changed **********: UNKNOWN
2021-04-25 15:18:41,927 INFO Launcher:51 - null: ********** info changed **********: UNKNOWN
Apr 25, 2021 3:18:42 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0
Apr 25, 2021 3:18:42 PM org.apache.spark.launcher.OutputRedirector redirect
……
INFO: 2021-04-25 15:18:43,834 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
……
INFO: 2021-04-25 15:18:43,943 INFO scheduler.FIFOSchedulableBuilder: Adding pool poolName:system_reserve maxSize:0 schedulingMode:FIFO maxConcurrency:2147483647
Apr 25, 2021 3:18:43 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 2021-04-25 15:18:43,944 INFO scheduler.FIFOSchedulableBuilder: Adding pool poolName:user maxSize:2147483645 schedulingMode:FIFO maxConcurrency:2147483647
null: ********** state changed **********: CONNECTED
2021-04-25 15:18:43,945 INFO Launcher:35 - null: ********** state changed **********: CONNECTED
Apr 25, 2021 3:18:43 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 2021-04-25 15:18:43,956 INFO executor.Executor: Starting executor ID driver on host localhost
local-1619335123924: ********** info changed **********: CONNECTED
2021-04-25 15:18:43,993 INFO Launcher:41 - local-1619335123924: ********** info changed **********: CONNECTED
local-1619335123924: ********** state changed **********: RUNNING
2021-04-25 15:18:43,995 INFO Launcher:35 - local-1619335123924: ********** state changed **********: RUNNING
……
Apr 25, 2021 3:19:00 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: +---+--------+------+
INFO: |age|name |sex |
INFO: +---+--------+------+
INFO: |22 |zhangsan|male |
INFO: |25 |lisi |male |
INFO: |23 |wangwu |female|
INFO: +---+--------+------+
INFO:Apr 25, 2021 3:19:00 PM org.apache.spark.launcher.OutputRedirector redirect
……
Apr 25, 2021 3:19:01 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 2021-04-25 15:19:01,116 INFO utils.HelloWorld: 数据总条数为:3
INFO: 2021-04-25 15:19:01,122 INFO status.AppStatusListener: Write local-1619335123924 with attempts: success...
Apr 25, 2021 3:19:01 PM org.apache.spark.launcher.OutputRedirector redirect
……
local-1619335123924: ********** state changed **********: FINISHED
2021-04-25 15:19:01,160 INFO Launcher:35 - local-1619335123924: ********** state changed **********: FINISHED
Apr 25, 2021 3:19:01 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 2021-04-25 15:19:01,164 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
Apr 25, 2021 3:19:01 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 2021-04-25 15:19:01,171 INFO memory.MemoryStore: MemoryStore cleared
Apr 25, 2021 3:19:01 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 2021-04-25 15:19:01,171 INFO storage.BlockManager: BlockManager stopped
Apr 25, 2021 3:19:01 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 2021-04-25 15:19:01,172 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
Apr 25, 2021 3:19:01 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 2021-04-25 15:19:01,174 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
Apr 25, 2021 3:19:01 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 2021-04-25 15:19:01,177 INFO spark.SparkContext: Successfully stopped SparkContext
Apr 25, 2021 3:19:01 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 2021-04-25 15:19:01,179 INFO util.ShutdownHookManager: Shutdown hook called
Apr 25, 2021 3:19:01 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 2021-04-25 15:19:01,180 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-be25cfa7-9b93-4214-b6a4-ad81d3d4122b
Apr 25, 2021 3:19:01 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 2021-04-25 15:19:01,180 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-4c8ab12b-5835-4484-a63a-3d010a0e2559
This satisfies the requirement: submit Spark jobs from a Java application and track them by application_id and state.
3. Submitting through the YARN REST API
The third way is to submit through YARN's REST API (not very common, but covered here as well):
Example POST request: http://<rm http address:port>/ws/v1/cluster/apps
Parameters carried in the request body:
Item | Data Type | Description |
---|---|---|
application-id | string | The application id |
application-name | string | The application name |
queue | string | The name of the queue to which the application should be submitted |
priority | int | The priority of the application |
am-container-spec | object | The application master container launch context, described below |
unmanaged-AM | boolean | Is the application using an unmanaged application master |
max-app-attempts | int | The max number of attempts for this application |
resource | object | The resources the application master requires, described below |
application-type | string | The application type(MapReduce, Pig, Hive, etc) |
keep-containers-across-application-attempts | boolean | Should YARN keep the containers used by this application instead of destroying them |
application-tags | object | List of application tags, please see the request examples on how to specify the tags |
log-aggregation-context | object | Represents all of the information needed by the NodeManager to handle the logs for this application |
attempt-failures-validity-interval | long | Attempt failures which happen outside the validity interval will not be counted towards the failure count |
reservation-id | string | Represent the unique id of the corresponding reserved resource allocation in the scheduler |
am-black-listing-requests | object | Contains blacklisting information such as “enable/disable AM blacklisting” and “disable failure threshold” |
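As a rough illustration only, the sketch below submits through this endpoint using plain JDK HTTP. The ResourceManager host, application id, queue and jar path are placeholders, and a real submission also needs local-resources, environment variables and credentials inside am-container-spec; you would first POST to /ws/v1/cluster/apps/new-application to obtain the application-id.
package com.xxx.utils;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
public class YarnRestSubmit {
    public static void main(String[] args) throws Exception {
        // Heavily simplified request body; all values are placeholders
        String body = "{"
                + "\"application-id\":\"application_1619335123924_0001\","
                + "\"application-name\":\"rest-api-demo\","
                + "\"application-type\":\"SPARK\","
                + "\"queue\":\"default\","
                + "\"am-container-spec\":{\"commands\":{\"command\":"
                + "\"{{SPARK_HOME}}/bin/spark-submit --class com.xxx.utils.HelloWorld --master yarn --deploy-mode cluster hdfs:///apps/TestJavaSpark-1.0-SNAPSHOT.jar\"}},"
                + "\"resource\":{\"memory\":2048,\"vCores\":1}"
                + "}";
        URL url = new URL("http://rm-host:8088/ws/v1/cluster/apps");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream os = conn.getOutputStream()) {
            os.write(body.getBytes(StandardCharsets.UTF_8));
        }
        // The ResourceManager answers 202 Accepted when it takes the submission
        System.out.println("HTTP response code: " + conn.getResponseCode());
        conn.disconnect();
    }
}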
References: https://www.cnblogs.com/itboys/p/9998666.html
https://www.cnblogs.com/itboys/p/9958933.html