前言

之前项目是基于springboot整合spark,在standalone上运行,曾经写过一篇博客,链接:

https://blog.csdn.net/qq_41587243/article/details/112918052?spm=1001.2014.3001.5501

现在使用同样的方案,不过是在生产环境yarn集群上提交spark,并且需进行kerbores验证,如下。

背景

公司项目需求,通过手机信令位置数据,做一个分析性平台。基于目前线上环境spark+hadoop+yarn做分析。数据量10亿用户。

spark on yarn 问题总结

首先在开发过程中,前提保证版本的一致性,否则问题更多!!

一、standalone和yarn区别

  1. 在standalone模式下运行,这个只需要指定好setMaster的url为spark://172.31.13.100:7077,启动方式多样,java -jar、tomcat、spark-submit都可以启动的。
  2. on yarn集群上,所以只能用spark-submit去启动,由Spark-Yarn-Client托管应用的jar,否则,应用的jar中缺少向Yarn资源管理器申请资源的模块,无法正常启动。
    如下:
./spark-submit 
--conf spark.yarn.jars="/xxxx/spark/jars/*,hdfs://ns1:8020/xxxxx/lib/*" 
--driver-java-options "-Dorg.springframework.boot.logging.LoggingSystem=none -Dspring.profiles.active=test -Dspark.yarn.dist.files=/yarn-site.xml" --
master yarn
--deploy-mode client  
--executor-cores 3 
--num-executors 80 
--executor-memory 12g 
--driver-memory 12g 
--name xxxxx_analysis 
--queue xxxxxxx 
--class org.springframework.boot.loader.JarLauncher 
./com.xxx-1.0-SNAPSHOT.jar 
--principal xxxx 
--keytab xxx.keytab >> xxx.log 2>&1 &
参数解释:
--conf spark.yarn.jars 指定的是运行在项目需要的一些jar包,先传到hdfs对应目录下;
                  Spark客户端上面需要的一些jar包,springboot依赖的jar包
--driver-java-options  指定应用的运行环境参数.
--class 必须指定springboot的main所在的class
 JarLuncher 实际上是同进程内调用 Start-Class 类的 main(String[] args) 方法,并在启动前准备好了 Class Path
--master   --deploy-mode  指定yarn的client端
--queue  指定hdaoop中yarn队列
--principal  指定kerberos的用户
--keytab   指定kerberos的票据

二、日志冲突

在这里插入图片描述
解决:
日志冲突,spark-submit内部使用log4j作为日志模块,springboot采用的是logbak作为日志模块。两种方案,一是springbooe中exclusion掉这个logback-classic这个jar包,二是直接在启动运行环境时,将-Dorg.springframework.boot.logging.LoggingSystem=none。

三、GSON版本冲突

Spark自带的GSON版本可能与SpringBoot依赖的版本冲突,引起如下异常:
在这里插入图片描述
很明显的问题,jar包冲突,在两个位置gson包,版本不同,直接移除掉spark的jars中的gson版本即可。

四、guava和validation-api包 冲突

可以在springboot中直接把所有涉及到的东西都exclusion掉,其中涉及到的有spark和javaee-api。
在这里插入图片描述

五、序列化和反序列问题

在这里插入图片描述
刚看到错误就反应过来,哦!序列化,java使用JavaSerializer序列化方式,spark使用kyro方式,然后修改spark代码,去extend继承javaSerializer,然而并不管用,百度查不到,困扰好几天,最后解决,把springboot打包后的target文件夹下面的com.asia-1.0-SNAPSHOT.jar.original这个文件,重命名为com.asia-api.jar,这个就是我们自己开发程序的jar包,将这个jar上传到我们提前指定的hdfs的项目依赖的路径下即可,解决!!

六、 运行时找不到应用中的类

1)将应用的jar解压,将lib目录下所有jar上传HDFS目录"hdfs://xxx:8020/lib"
2)"hdfs://xxx:8020//lib"追加到命令行“spark.yarn.jars”参数中
3)删除其中可能与集群环境冲突的包
未指定“spark.yarn.jars”参数,导致executor节点缺少依赖库,命令行指定“spark.yarn.jars”参数,将必要的库目录都加上,并将springboot上的依赖包中排除hadoop*、spark*、scala*的包,也是防止冲突并加上了–calss

七、 冲突…在这里插入图片描述

这个问题在本地测试的时候就出现过,是缺少这个包,所以在子pom中添加了
在这里插入图片描述
现在提交到集群上,和spark中的又是冲突了,所以直接在自己的项目中排除掉,相应的hdfs上依赖的jar中去除就可以了。
排查第二个依赖包,janino-3.0.8.jar,也会和spark中jars有版本的冲突,删除即可。

        <!--java.lang.NoClassDefFoundError: org/codehaus/janino/ClassBodyEvaluator-->
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>janino</artifactId>
            <version>3.0.8</version>
        </dependency>

yarn 资源释放

为了节省yarn的资源,打算让每个application运行完成后,自动释放资源。有两种方案:
第一种:
每个程序运行完成后,直接释放掉sparkSession.close(),他在释放的过程中,也关闭了sparkContext,所以不在占用yarn资源,在下一次任务启动后,又会自动启动spark任务,申请资源。 SparkSession不直接加入springboot启动项,在每次运行任务前去加载初始化sparkSession。
第二种:
直接不进行释放sparkSession,设置动态资源,最大使用executors和最小使用。

1.将spark.dynamicAllocation.enabled设置为true。意思就是启动动态资源功能;
2.此外关于动态资源分配还有以下相关参数
3.spark.dynamicAllocation.initialExecutors:
初始executor数量,如果--num-executors设置的值比这个值大,那么将使用--num-executors设置的值作为初始executor数量。
4.spark.dynamicAllocation.maxExecutors:
executor数量的上限,默认是无限制的。
5.spark.dynamicAllocation.minExecutors:
executor数量的下限,默认是0个
6.spark.dynamicAllocation.cachedExecutorIdleTimeout:
如果executor内有缓存数据(cache data),并且空闲了N秒。则remove该executor。默认值无限制。也就是如果有缓存数据,则不会remove该executor
为什么?比如在写shuffle数据时候,executor可能会写到磁盘也可能会保存在内存中,如果保存在内存中,该executor又remove掉了,那么数据也就丢失了。
--executor-memory 20g 
--executor-cores 5 
--driver-memory 10g 
--driver-cores 5 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.initialExecutors=20 \
--conf spark.dynamicAllocation.minExecutors=20 \
--conf spark.dynamicAllocation.maxExecutors=400 \
--conf spark.dynamicAllocation.executorIdleTimeout=300s \
--conf spark.dynamicAllocation.schedulerBacklogTimeout=10s \

hadoop token超时问题

HDFS_DELEGATION_TOKEN token 13619910 for zh2_xxxx) can’t be found in cache
在这里插入图片描述
首先,由此问题想出,keytab有过期时间,24小时过期,所以不能再springboot直接引入一次,定时刷新票据,或者在使用前刷新一次票据;
其次,这个问题就是连接hadoop判断文件是否存在的问题,因为代码是在启动hadoop的时候就加载了hadoop的票据,以及建立连接的,这个明显是超时的问题;
解决办法:不去定时刷新票据,因为现在是sparkSession每次都会关闭,所以在每次程序开始之前,刷新一次票据。
delegation token其实就是hadoop里一种轻量级认证方法,作为kerberos认证的一种补充。
详解:https://blog.csdn.net/qq_41587243/article/details/122255689

POM文件依赖

<properties>
        <java.version>1.8</java.version>
        <druid.version>1.1.9</druid.version>
        <mybatis.version>1.2.0</mybatis.version>
        <spark.version>2.2.0</spark.version>
        <scala.version>2.11.8</scala.version>
        <hadoop.version>2.8.2</hadoop.version>
        <spring.boot.version>2.1.6.RELEASE</spring.boot.version>
        <swagger.version>2.9.2</swagger.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <!-- 去除对默认日志的依赖 -->
<!--            <exclusions>-->
<!--                <exclusion>-->
<!--                    <groupId>org.springframework.boot</groupId>-->
<!--                    <artifactId>spring-boot-starter-logging</artifactId>-->
<!--                </exclusion>-->
<!--            </exclusions>-->
            <exclusions>
                <exclusion>
                    <groupId>javax.validation</groupId>
                    <artifactId>validation-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- 引入Druid依赖,阿里巴巴所提供的数据源 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid.version}</version>
        </dependency>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>${mybatis.version}</version>
        </dependency>

        <!-- 使用外置tomcat -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
            <version>${spring.boot.version}</version>
            <!-- provided 表明该包只在编译和测试的时候使用,去除默认的tomcat -->
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
<!--            <exclusions>-->
<!--                <exclusion>-->
<!--                    <groupId>io.netty</groupId>-->
<!--                    <artifactId>netty-all</artifactId>-->
<!--                </exclusion>-->
<!--            </exclusions>-->
            <exclusions>
                <exclusion>
                    <groupId>javax.validation</groupId>
                    <artifactId>validation-api</artifactId>
                </exclusion>
                    <exclusion>
                        <groupId>com.google.guava</groupId>
                        <artifactId>guava</artifactId>
                    </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.0.43.Final</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.codehaus.janino</groupId>
                    <artifactId>janino</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.codehaus.janino</groupId>
                    <artifactId>commons-compiler</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-yarn_2.11</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava</artifactId>
                </exclusion>
            </exclusions>
        </dependency>


        <!-- 因为我们需要使用Scala代码,所以我们还需要加入scala-library依赖 -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
            <version>${spring.boot.version}</version>
        </dependency>

        <!-- swagger -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>${swagger.version}</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>${swagger.version}</version>
        </dependency>

        <dependency>
            <groupId>javax</groupId>
            <artifactId>javaee-api</artifactId>
            <version>8.0</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.validation</groupId>
                    <artifactId>validation-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>commons-net</groupId>
                    <artifactId>commons-net</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>net.sf.json-lib</groupId>
            <artifactId>json-lib</artifactId>
            <version>2.4</version>
            <classifier>jdk15</classifier>
        </dependency>
        <!--Data 注解依赖-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.10</version>
            <scope>provided</scope>
        </dependency>

        <!-- postgresql -->
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.2.18</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <recompileMode>incremental</recompileMode>
                    <scalaVersion>${scala.version}</scalaVersion>
                    <launchers>
                        <launcher>
                            <id>app</id>
                            <mainClass>com.asia.xxxApplication</mainClass>
                            <args>
                                <arg>-deprecation</arg>
                            </args>
                            <jvmArgs>
                                <jvmArg>-Xms2056m</jvmArg>
                                <jvmArg>-Xmx4096m</jvmArg>
                            </jvmArgs>
                        </launcher>
                    </launchers>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <!--下面为打包不加依赖,只包含代码-->
                <configuration>
                    <includes>
                        <include>
                            <groupId>com.asia</groupId>
                            <artifactId>business</artifactId>
                        </include>
                    </includes>
                </configuration>
            </plugin>
        </plugins>
    </build>

这就是项目上springboot和spark整合中的一些坑,中间可能略过了一些很小的问题,如有问题可沟通交流哦! 欢迎留言
后面也尝试了利用多线程线程池,提交多个job的案例,也是可以缩短分析时间,更充分利用资源的,有时间再分享!!

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐