For a project requirement, I needed to call the Spark API via HTTP requests and run jobs in cluster mode. (I lost a few hairs to the pitfalls along the way, so I'm writing this down...)
I. Test Environment
1. JDK 1.8
2. Spark 2.3.1
3. IDEA, Win10, CentOS 7
I run the project from IDEA on Win10, with three CentOS 7 virtual machines as the cluster.
II. Create and Configure the SpringBoot Project
1. Create a SpringBoot project (omitted).
2. Add the pom dependencies; everything I use is listed in Appendix 1 at the end of this article.
3. Add the Spark settings to application.yml:
spark:
  app:
    name: yl
  home: 127.0.0.1
  master:
    uri: spark://host03:7077
  driver:
    memory: 2g
  worker:
    memory: 2g
  executor:
    memory: 1g
  rpc:
    message:
      maxSize: 1024
4. Add a Spark configuration class and hand the SparkSession over to the Spring container:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;

@Configuration
public class SparkConfig {

    @Value("${spark.app.name}")
    private String appName;

    @Value("${spark.home}")
    private String sparkHome;

    @Value("${spark.master.uri}")
    private String sparkMasterUri;

    @Value("${spark.driver.memory}")
    private String sparkDriverMemory;

    @Value("${spark.worker.memory}")
    private String sparkWorkerMemory;

    @Value("${spark.executor.memory}")
    private String sparkExecutorMemory;

    @Value("${spark.rpc.message.maxSize}")
    private String sparkRpcMessageMaxSize;

    @Bean
    public SparkConf sparkConf() {
        SparkConf sparkConf = new SparkConf()
                .setAppName(appName)
                .setMaster(sparkMasterUri)
                .set("spark.driver.memory", sparkDriverMemory)
                .set("spark.worker.memory", sparkWorkerMemory) // "26g"
                .set("spark.executor.memory", sparkExecutorMemory)
                .set("spark.rpc.message.maxSize", sparkRpcMessageMaxSize);
                // .set("spark.shuffle.memoryFraction", "0") // default is 0.2
                // .setMaster("local[*]"); // uncomment for local debugging
        return sparkConf;
    }

    @Bean
    @ConditionalOnMissingBean(JavaSparkContext.class)
    public JavaSparkContext javaSparkContext() {
        return new JavaSparkContext(sparkConf());
    }

    @Bean
    public SparkSession sparkSession() {
        return SparkSession
                .builder()
                .sparkContext(javaSparkContext().sc())
                .appName(appName)
                .getOrCreate();
    }

    // Must be static so that @Value placeholders are resolved early enough
    @Bean
    public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }
}
5. After that, the SparkSession can be injected wherever it is needed:
@Autowired
private SparkSession sparkSession;
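For instance, here is a quick smoke test in a Spring service (the class and method names are my own, purely for illustration):

import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class SparkSmokeTest {

    @Autowired
    private SparkSession sparkSession;

    // Runs a trivial distributed count to verify the session and cluster wiring
    public long probe() {
        return sparkSession.range(100).count();
    }
}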
6. Since the project mixes Java and Scala, create a scala directory and mark it as a source root in IDEA (Mark Directory as → Sources Root). The screenshot below shows my directory structure:
7. Create a Scala object; here I read data from MySQL:
import org.apache.spark.sql.SparkSession

object AnalysisADR {
  def analysis(sparkSession: SparkSession): Unit = {
    val spark = sparkSession
    val url = "jdbc:mysql://localhost:3306/yl"
    val drugTable = "drug"
    val reactionTable = "reaction"
    val prop = new java.util.Properties()
    prop.put("driver", "com.mysql.jdbc.Driver")
    prop.put("user", "root")
    prop.put("password", "root")
    // Load the drug table over JDBC and print the first rows
    val drug = spark.read.jdbc(url, drugTable, prop)
    drug.show()
  }
}
8. Then call the Scala method from the Java side, passing in the SparkSession:
AnalysisADR.analysis(sparkSession);
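Since the whole point is to trigger the job over HTTP, this call typically lives in a controller. A minimal sketch (the endpoint path and class name are my own invention; adjust the package and imports to your project):

import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class AnalysisController {

    @Autowired
    private SparkSession sparkSession;

    // Hitting this endpoint submits the Spark job to the cluster;
    // show() is an action, so the request blocks until the job completes
    @GetMapping("/analysis/adr")
    public String runAnalysis() {
        AnalysisADR.analysis(sparkSession);
        return "spark job finished";
    }
}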
III. Configure Spark on the Virtual Machines
1. After extracting the Spark tarball, go into the conf directory.
2. Edit spark-env.sh:
SPARK_LOCAL_DIRS=/home/software/spark/tmp
SPARK_MASTER_HOST=host01
SPARK_MASTER_PORT=7077
export JAVA_HOME=/home/software/jdk1.8
3. Edit spark-defaults.conf:
spark.master spark://host01:7077
4. Edit slaves:
host01
host02
host03
5. Copy the configuration files to host02 and host03, then start Spark from host01: go into the spark/sbin directory and run sh start-all.sh.
After that, visit http://host01:8080 to check the cluster status.
IV. Start the Project
Give it a try... no errors, hahaha~
But the Spark program did not actually run. Checking the Spark UI, the executors kept starting and then exiting. The executor logs showed that a hostname could not be resolved: the VMs had no mapping for my Win10 machine (the driver). So add your host machine's IP and hostname to /etc/hosts on each VM, and turn off the Windows firewall.
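For example, append a mapping like this to /etc/hosts on each VM (the IP and hostname below are placeholders; use your Win10 machine's actual values):

192.168.1.100   my-win10-hostname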
Run it again, and it works!!!
Below is a screenshot of the project deployed to Tomcat on a VM, with the Spark task executing successfully:
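Incidentally, deploying to an external Tomcat requires war packaging, the provided-scope spring-boot-starter-tomcat dependency from Appendix 1, and a servlet initializer. A minimal sketch, assuming the SpringBoot main class is named Application:

import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;

// Required for deployment to an external Tomcat
// (together with <packaging>war</packaging> in the pom)
public class ServletInitializer extends SpringBootServletInitializer {

    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        // "Application" is assumed to be this project's SpringBoot main class
        return application.sources(Application.class);
    }
}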
V. Build the Project
Since this is a mixed Java and Scala build, Scala has to be compiled before Java. Run:
mvn clean scala:compile compile package
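As an alternative sketch (I have not verified this with the exact plugin version below), you can bind the Scala goals to an early Maven lifecycle phase so that a plain mvn package compiles Scala first:

<plugin>
    <groupId>org.scala-tools</groupId>
    <artifactId>maven-scala-plugin</artifactId>
    <version>2.15.2</version>
    <executions>
        <execution>
            <id>scala-compile-first</id>
            <phase>process-resources</phase>
            <goals>
                <goal>compile</goal>
            </goals>
        </execution>
    </executions>
</plugin>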
Appendix 1: pom dependencies and build configuration
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.6.RELEASE</version>
    <relativePath/>
</parent>

<properties>
    <java.version>1.8</java.version>
    <druid.version>1.1.9</druid.version>
    <mybatis.version>1.2.0</mybatis.version>
    <spark.version>2.3.1</spark.version>
    <spring.boot.version>2.1.6.RELEASE</spring.boot.version>
    <swagger.version>2.9.2</swagger.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
        <version>${druid.version}</version>
    </dependency>
    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>${mybatis.version}</version>
    </dependency>
    <!-- Use an external Tomcat -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-tomcat</artifactId>
        <version>${spring.boot.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Since the project contains Scala code, we also need the scala-library dependency -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.8</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.databricks/spark-xml -->
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-xml_2.11</artifactId>
        <version>0.11.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <version>${spring.boot.version}</version>
        <optional>true</optional>
    </dependency>
    <!-- swagger -->
    <dependency>
        <groupId>io.springfox</groupId>
        <artifactId>springfox-swagger2</artifactId>
        <version>${swagger.version}</version>
    </dependency>
    <dependency>
        <groupId>io.springfox</groupId>
        <artifactId>springfox-swagger-ui</artifactId>
        <version>${swagger.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.7</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.7.7</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
        <!-- Scala compiler plugin -->
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <version>2.15.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- Java compiler plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
    </plugins>
</build>