1.总体原则
1.Yarn配置
- Yarn部署在单台服务器128G内存,32个核
如果一个服务器是5个核, executor-cores=5,则理论上num-executor<=6个核。5*6<=32
/hadoop-xxx/yarn-site.xml
修改yarn的运行分配的最大最小内存
yarn.scheduler.minimum-allocation-mb
yarn.scheduler.maximum-allocation-mb
2.配置参数
-
driver-memory
driver端的内存消耗主要是以下内容:
1.创建小规模的分布式数据集:使用 parallelize、createDataFrame 等 API 创建数据集
2.收集计算结果:通过 take、show、collect 等算子把结果收集到 Driver 端
根据以上估算即可下面代码是预估driver端内存的执行计划
val df: DataFrame = _ df.cache.count val plan = df.queryExecution.logical val estimated: BigInt = spark .sessionState .executePlan(plan) .optimizedPlan .stats .sizeInBytes
-
executor-cores
1.每个executor的最大核数,一般在3-6之间比较合适.
2.Executor 中并行计算任务数的上限是 spark.executor.cores 除以 spark.task.cpus ,暂且记为 #Executor-tasks,整个集群的并行计算任务数自然就是 #Executor-tasks 乘以集群内 Executors 的数量
3.每个线程只能在同一时刻处理一个任务在一个分区上,因此,在运行时,线程、任务与分区是一一对应的关系。配置项 | 含义 ---|--- spark.cores.max| 集群范围内最大的CPU核数 spark.executore.cores | 单个Executor内CPU核数 spark.task.cpus | 单个任务消耗的CPU核数,默认为1 spark.default.parallelism|未指定分区数是的默认并行度 spark.sql.shuffle.partitions|数据关联、聚合操作中Reducer的并行度
-
–num-executors
一个job拥有多个少jvm进程。每个executor和kafka的一个parition对接 -
executor-memory
单个executor的内存<br- 如果yarn 内存配置不足,例如只有5个executor,100/5=20g则最大为20g
3.spark内存估算
1. spark内存模型如下图
内存区域划分 | 堆内内存 | 堆外内存 |
---|---|---|
内存空间总大小 | spark.executor.memory | spark.memory.offHeap.size |
Reserved Memory | 固定为300M | 无 |
User Memory | 1-spark.memory.fraction | 无 |
Storage Meomory | spark.memory.fraction*spark.memory.storageFracion | spark.memory.storageFraction |
Reserved Memory | fraction*(1-strageFraction) | 1-storeageFraction |
2. 不同内存区域解释
- Reserved Memory 用于存储 Spark 内部对象,300M
- User Memory 用于存储用户自定义的数据结构(图中黄色区域)
- Execution Memory 用于分布式任务执行
- Storage Memory 则用来容纳 RDD 缓存和广播变量
3.内存争抢协议
- 如果对方的内存空间有空闲,双方就都可以抢占;
- 对于 RDD 缓存任务抢占的执行内存,当执行任务有内存需要时,RDD 缓存任务必须立即归还抢占的内存,涉及的 RDD 缓存数据要么落盘、要么清除;
- 对于分布式计算任务抢占的 Storage Memory 内存空间,即便 RDD 缓存任务有收回内存的需要,也要等到任务执行完毕才能释放。
4.内存估算方法
针对单个executor而言
设定spark.executor.memory为900M
spark.storage.storageFtaction=0.3
spark.memory.fraction=0.6
1.执行内存总大小(即图中蓝色+绿色部分)
执行内存总大小(M)=(spark.executor.memory-System.reserved.memory)* spark.memory.fraction=(900M—300M)*0.6=360 M
存储内存(Story Memory)=(spark.executor.memory-System.reserved.memory)*spark.memory.fraction* spark.storage.storageFtaction=(900M—300M)*0.6*0.3=108
执行内存(Execution Memory)=(spark.executor.memory-System.reserved.memory)*spark.memory.fraction* spark.storage.storageFtaction=(900M—300M)*0.6*0.7=252
2. 其他(图中黄色部分)
other=(spark.executor.memory-System.reserved.memory)*(1-spark.memory.fraction)=240 M
5. 简单内存估算方法:
-
Storage Memory估算:将要缓存到内存的RDD/Dataset/Dataframe或广播变量进行cache,然后在Spark WEBUI的Storage标签页下直接查看所有的内存占用,大致就对应Storage Memory。
-
Execution Memory估算:有了Storage Memory,因为默认情况下Execution Memory和Storage Memory占用Spark Memory的比例是相同的,这里可以将Execution Memory和Storage Memory设置为相同。
-
User Memory:如果应用中没有使用太多自定义数据类型,保持默认值即可;如果使用了很多自定义数据类型,按老师说的方式进行估算即可。
6. 堆外内存存在的意义
- 在于堆外堆内的空间互不share,也就是说,你的task最开始用堆外,用着用着发现不够了,这个时候即使堆内还有空闲,task也没法用,所以照样会oom
7.估计内存的原则
- 执行内存总量为M ,Execution Memory+Storage Memory的剩余空间,最少为Execution Memory,最多为Storage Memory + Execution Memory
- 线程数为N, spark.execution.cores 为N
- 每个线程可以分配到的内存上下限,下限为M/N/2,上限为M/N,取到2是因为比例是默认比例spark.storage.storageFtaction为0.5
给定数据总量D,以及并行度P,P由spark.default.parallelism来控制。
主要是让数据分片的大小D/P坐落在(M/N/2,M/N)
8.CPU利用率观察
ganglia、Prometheus、Grafana、nmon、htop等等,这些工具,去观察你的CPU利用率,然后回过头来,平衡三者,然后再去观察CPU利用率是不是提升了,通过这种方式,来去调优
9. 解压文件
spark在加载parquet+snappy压缩文件时,它会考虑解压之后的文件大小吗?
Spark考虑的,不是数据在磁盘中的存储大小,是内存中的存储大小,所以恰恰是Parquet+Snappy解压后、在内存中的存储大小~
更多推荐