Spark程序排错

摘要:
shufflewrite的分区数由上一阶段的RDD分区数控制,shuffleread的分区数则是由Spark提供的一些参数控制。结果导致JVMcrash,从而导致取shuffle数据失败,同时executor也丢失了,看到Failedtoconnecttohost的错误,也就是executorlost的意思。将不必要的数据在shuffle前进行过滤,比如原始数据有20个字段,只要选取需要的字段进行处理即可,将会减少一定的shuffle数据。SparkSQL和DataFrame的join,groupby等操作通过spark.sql.shuffle.partitions控制分区数,默认为200,根据shuffle的量以及计算的复杂度提高这个值。提高executor的内存通过spark.executor.memory适当提高executor的memory值。
1.shuffle相关

报错提示

org.apache.spark.shuffle.MetadataFetchFailedException: 
Missing an output location for shuffle 0
org.apache.spark.shuffle.FetchFailedException:
Failed to connect to hostname/192.168.xx.xxx:50268

java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:165)
at org.apache.spark.memory.UnifiedMemoryManager.acquireExecutionMemory(UnifiedMemoryManager.scala:80)

原理分析

shuffle分为shuffle writeshuffle read两部分。
shuffle write的分区数由上一阶段的RDD分区数控制,shuffle read的分区数则是由Spark提供的一些参数控制。

shuffle write可以简单理解为类似于saveAsLocalDiskFile的操作,将计算的中间结果按某种规则临时放到各个executor所在的本地磁盘上。

shuffle read的时候数据的分区数则是由spark提供的一些参数控制。可以想到的是,如果这个参数值设置的很小,同时shuffle read的量很大,那么将会导致一个task需要处理的数据非常大。结果导致JVM crash,从而导致取shuffle数据失败,同时executor也丢失了,看到Failed to connect to host的错误,也就是executor lost的意思。有时候即使不会导致JVM crash也会造成长时间的gc。

解决办法

知道原因后问题就好解决了,主要从shuffle的数据量和处理shuffle数据的分区数两个角度入手。

  1. 减少shuffle数据

    思考是否可以使用map side join或是broadcast join来规避shuffle的产生。

    将不必要的数据在shuffle前进行过滤,比如原始数据有20个字段,只要选取需要的字段进行处理即可,将会减少一定的shuffle数据。

  2. SparkSQL和DataFrame的join,group by等操作

    通过spark.sql.shuffle.partitions控制分区数,默认为200,根据shuffle的量以及计算的复杂度提高这个值。

  3. Rdd的join,groupBy,reduceByKey等操作

    通过spark.default.parallelism控制shuffle read与reduce处理的分区数,默认为运行任务的core的总数(mesos细粒度模式为8个,local模式为本地的core总数),官方建议为设置成运行任务的core的2-3倍。

  4. 提高executor的内存

    通过spark.executor.memory适当提高executor的memory值。

  5. 是否存在数据倾斜的问题

    空值是否已经过滤?异常数据(某个key数据特别大)是否可以单独处理?考虑改变数据分区规则。

参考

http://blog.csdn.net/lsshlsw/article/details/51213610

免责声明:文章转载自《Spark程序排错》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇JSP上传视频等大文件[MySQL] 常用SQL的优化--18.4下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Apache seaTunnel 数据集成平台

为什么我们需要 seatunnel Databricks 开源的 Apache Spark 对于分布式数据处理来说是一个伟大的进步。我们在使用 Spark 时发现了很多可圈可点之处,同时我们也发现了我们的机会 —— 通过我们的努力让Spark的使用更简单,更高效,并将业界和我们使用Spark的优质经验固化到seatunnel这个产品中,明显减少学习成本,加...

Spark(十六)DataSet

  Spark最吸引开发者的就是简单易用、跨语言(Scala, Java, Python, and R)的API。 本文主要讲解Apache Spark 2.0中RDD,DataFrame和Dataset三种API;它们各自适合的使用场景;它们的性能和优化;列举使用DataFrame和DataSet代替RDD的场景。本文聚焦DataFrame和Datase...

spark 2.X 疑难问题汇总

当前spark任务都是运行在yarn上,所以不用启动长进程worker,也没有master的HA问题,所以主要的问题在任务执行层面。 作业故障分类故障主要分为版本,内存和权限三方面。 - 各种版本不一致 - 各种内存溢出 - 其他问题 版本不一致1)java版本不一致报错:java.lang.UnsupportedClassVersionError: co...

spark性能调优06-数据倾斜处理

1、数据倾斜 1.1 数据倾斜的现象 现象一:大部分的task都能快速执行完,剩下几个task执行非常慢 现象二:大部分的task都能快速执行完,但总是执行到某个task时就会报OOM,JVM out of Memory,task faild,task lost,resubmitting task等错误 1.2 出现的原因 大部分task分配的数据很少(某...

Spark SQL概述

前言:一些逻辑用spark core 来写,会比较麻烦,如果用sql 来表达,那简直太方便了 一、Spark SQL 是什么 是专门处理结构化数据的 Spark 组件 Spark SQL 提供了两种操作数据的方法:   sql 查询   DataFrames/Datasets API Spark SQL = Schema + RDD 二、Spark SQL...

Mastering-Spark-SQL学习笔记02 SparkSession

SparkSession是在使用类型化数据集(或基于非类型化Row-基于DataFrame)数据抽象开发Spark SQL应用程序时创建的首批对象之一。 在Spark 2.0中,SparkSession将SQLContext和HiveContext合并到一个对象中。 使用SparkSession.builder方法来创建一个SparkSession实例,使...