For a project requirement, I needed to trigger Spark jobs through HTTP requests and run them in cluster mode. (I lost a few hairs to the pitfalls along the way, so I'm writing this down...)

I. Test Environment

1. JDK 1.8

2. Spark 2.3.1

3. IDEA, Windows 10, CentOS 7

I run the project from IDEA on Windows 10, plus three CentOS virtual machines for the Spark cluster.

II. Create and Configure the Spring Boot Project

1. Create a Spring Boot project (omitted).

2. Add the POM dependencies; all of the dependencies I use are listed in Appendix 1 at the end of this article.

3. Add the Spark configuration to application.yml.

spark:
  app:
    name: yl
  home: 127.0.0.1
  master:
    uri: spark://host03:7077
  driver:
    memory: 2g
  worker:
    memory: 2g
  executor:
    memory: 1g
  rpc:
    message:
      maxSize: 1024

4. Add a Spark configuration class so that the SparkSession is managed by the Spring container.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.core.env.Environment;

@Configuration
public class SparkConfig {

    @Autowired
    private Environment env;

    @Value("${spark.app.name}")
    private String appName;
    @Value("${spark.home}")
    private String sparkHome;
    @Value("${spark.master.uri}")
    private String sparkMasterUri;
    @Value("${spark.driver.memory}")
    private String sparkDriverMemory;
    @Value("${spark.worker.memory}")
    private String sparkWorkerMemory;
    @Value("${spark.executor.memory}")
    private String sparkExecutorMemory;
    @Value("${spark.rpc.message.maxSize}")
    private String sparkRpcMessageMaxSize;

    @Bean
    public SparkConf sparkConf() {
        SparkConf sparkConf = new SparkConf()
                .setAppName(appName)
                .setMaster(sparkMasterUri)
                .set("spark.driver.memory",sparkDriverMemory)
                .set("spark.worker.memory",sparkWorkerMemory) //"26g"
                .set("spark.executor.memory",sparkExecutorMemory)
                .set("spark.rpc.message.maxSize",sparkRpcMessageMaxSize);
//                .set("spark.shuffle.memoryFraction","0") // default is 0.2
//                .setMaster("local[*]");
        return sparkConf;
    }


    @Bean
    @ConditionalOnMissingBean(JavaSparkContext.class)
    public JavaSparkContext javaSparkContext(){
        return new JavaSparkContext(sparkConf());
    }

    @Bean
    public SparkSession sparkSession(){
        return SparkSession
                .builder()
                .sparkContext(javaSparkContext().sc())
                .appName(appName)
                .getOrCreate();
    }

    @Bean
    public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer(){
        return new PropertySourcesPlaceholderConfigurer();
    }
}

5. After that, the SparkSession can simply be injected wherever it is needed; a small usage sketch follows the snippet below.

@Autowired
private SparkSession sparkSession;
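For example, a minimal (hypothetical) service using the injected session might look like this; the class and method names are only illustrative:

import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

// Hypothetical service, only to illustrate using the injected SparkSession.
@Service
public class SparkSmokeTestService {

    @Autowired
    private SparkSession sparkSession;

    // Runs a trivial SQL statement through the session to verify the cluster connection.
    public void smokeTest() {
        sparkSession.sql("SELECT 1 AS ok").show();
    }
}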

6. Because the project mixes Java and Scala, create a scala source directory and mark it as a source root in IDEA (Mark Directory as -> Sources Root). My directory structure looked roughly like the sketch below:
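The original screenshot is not reproduced here; a typical layout for such a mixed Java/Scala module looks roughly like this (package and file names are only illustrative):

src
└── main
    ├── java
    │   └── ...                  (Spring Boot code: SparkConfig, controllers, services)
    ├── scala
    │   └── AnalysisADR.scala    (Spark analysis code)
    └── resources
        └── application.yml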

7. Create a Scala object; here I read data from MySQL.

import org.apache.spark.sql.SparkSession

object AnalysisADR {

  def analysis(sparkSession: SparkSession): Unit = {
    val spark = sparkSession

    val url = "jdbc:mysql://localhost:3306/yl"
    val drugTable = "drug"
    val reactionTable = "reaction"
    val prop = new java.util.Properties()
    prop.put("driver", "com.mysql.jdbc.Driver")
    prop.put("user", "root")
    prop.put("password", "root")

    // read the drug table over JDBC and print it
    val drug = spark.read.jdbc(url, drugTable, prop)
    drug.show()
  }
}

8. Then call the Scala method from the Java code and pass in the SparkSession; it can be exposed as an HTTP endpoint, as in the sketch after the call below.

AnalysisADR.analysis(sparkSession);
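For instance, a simple (hypothetical) REST controller can expose the analysis over HTTP; the class name and request path below are only illustrative, and AnalysisADR should be imported from whatever package the Scala object lives in:

import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical controller that triggers the Spark job over HTTP.
@RestController
public class AnalysisController {

    @Autowired
    private SparkSession sparkSession;

    // GET /analysis/adr runs the Scala analysis on the Spark cluster.
    @GetMapping("/analysis/adr")
    public String analyze() {
        AnalysisADR.analysis(sparkSession);
        return "ok";
    }
}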

III. Configure Spark on the Virtual Machines

1. After extracting the Spark distribution, go into the conf directory.

2. Edit spark-env.sh:

SPARK_LOCAL_DIRS=/home/software/spark/tmp
SPARK_MASTER_HOST=host01
SPARK_MASTER_PORT=7077
export JAVA_HOME=/home/software/jdk1.8

3. Edit spark-defaults.conf:

spark.master                     spark://host01:7077

4. Edit slaves:

host01
host02
host03

5. Copy the configuration files to host02 and host03, then start Spark on host01: go into the spark/sbin directory and run sh start-all.sh.

After that, open http://host01:8080 to check the Spark cluster status.

IV. Start the Project

Give it a try... no errors, hahahaha~

But the Spark job never actually ran. The Spark UI showed executors starting and then immediately shutting down, and the executor logs revealed why: the driver's hostname could not be resolved. The virtual machines had no mapping for my Windows 10 host, so I had to add my machine's IP and hostname to /etc/hosts on every node (an example entry is sketched below) and turn off the Windows firewall.
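A minimal sketch of the kind of entry needed on each CentOS node (the IP address and hostname below are placeholders; use your own Windows machine's values):

# /etc/hosts on every Spark node
192.168.1.100   DESKTOP-WIN10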

Ran it again, and it worked!!!

Below is a screenshot of the project deployed to Tomcat on a virtual machine, with the Spark job completing successfully:

V. Build the Project

Because the project mixes Java and Scala, Scala must be compiled before Java. Run:

mvn clean scala:compile compile package
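Alternatively, a sketch based on standard Maven phase binding (not what I ran, just an option): the Scala plugin execution already declared in the POM can be bound to an early phase so that a plain mvn clean package compiles Scala before Java:

<plugin>
    <groupId>org.scala-tools</groupId>
    <artifactId>maven-scala-plugin</artifactId>
    <version>2.15.2</version>
    <executions>
        <execution>
            <id>scala-compile-first</id>
            <!-- bind Scala compilation to a phase that runs before Java compilation -->
            <phase>process-resources</phase>
            <goals>
                <goal>compile</goal>
            </goals>
        </execution>
    </executions>
</plugin>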

 

Appendix 1:

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/>
</parent>

<properties>
        <java.version>1.8</java.version>
        <druid.version>1.1.9</druid.version>
        <mybatis.version>1.2.0</mybatis.version>
        <spark.version>2.3.1</spark.version>
        <spring.boot.version>2.1.6.RELEASE</spring.boot.version>
        <swagger.version>2.9.2</swagger.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid.version}</version>
        </dependency>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>${mybatis.version}</version>
        </dependency>

<!-- use an external Tomcat -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
            <version>${spring.boot.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

<!-- scala-library is needed because the project contains Scala code -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.databricks/spark-xml -->
        <dependency>
            <groupId>com.databricks</groupId>
            <artifactId>spark-xml_2.11</artifactId>
            <version>0.11.0</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
            <version>${spring.boot.version}</version>
        </dependency>

        <!-- swagger -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>${swagger.version}</version>
        </dependency>

        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>${swagger.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.7</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.7</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>

<!-- Scala compiler plugin -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

<!-- Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

 
