SparkWriteToHFile

摘要:
Yarn.log-aggregation-enable,Yarn的executor日志是一个聚合日志,在任务完成后通过聚合机器的日志生成。8.可串行化异常java。io.NotSerializableException:org.apache。哈哈。hbase。io.InterableBytesWrite指定序列化类,问题解决:valsc=newSparkContext有另一个异常:java。io.io例外:com.ethicsoftware。克里奥。KryoException:java。lang.IndexOutOfBoundsException:索引:108,大小:10Serializationtrace:familyMap。处理方法应尽量确保代码是在表面上处理的,并且此类中的函数不应在映射中引用。

1. HFile的LoadIncrement卡住

  原来是因为权限,我一直以为,load函数之后是要删除文件的,但是hdfs://slave1:8020/test/info文件夹所有的是只读权限,而且考出来附加到HFile的时候可能也需要改文件,但是权限不够,所以导致卡在了那个地方。
2.df.rdd明明有值,为什么没有执行到map呢?
  没有触发,map只是transformation,还需要一个action,比如count;
3.spark分区
  每个分区对应一个CPU的核
4. Put方式插入到内存中和Increment.bulkLoad的区别
  Put方式无疑是最简单的,而且在小数量下,其实二者差别不大,但是bulkLoad在开始之初需要对数据进行变形为三元组的形式,这就导致了一定的开销,在实际测试中,当数据大小达到了48M之后,就需要手动来指定--executor-memory,否则就是发生Outofmemory异常;当然这是因为当时测试环境机器数量比较少,但是这种方式无疑是比较消耗内存的;
5. hbase写入,无法删除写入的表

  发生了问题点其实是在truncate table上面,truncate的本质其实是删除表后重建,删除的表内容其实是放在了/hbase/.tmp/data/default/下面;

 

  权限问题是这样的,如果开始使用hdfs权限创建的HFile,拷贝到了hbase的目录下(/hbase/data/default/下面),此时文件权限是"rwxr-xr-x hdfs:hbase",hbase组只有读取和执行权限;在删除HFile的时候,会把HFile文件拷贝到/hbase/.tmp文件夹下,归档(Archieve)就会面临权限问题,因为删除的用户是hbase(在hbase组中),文件权限是hdfs,所以会有问题。

 

如何泛泛的遍历6.row获取值和列名

 

  

 

开始的处理方式:

 

    var data = new ListBuffer[(String, String, String)]
    val rddSchema = df.schema.fieldNames
   
    df.rdd.foreach(row => {
      var rowkey: String = row.getAs[String]("CUSTID")
      for (fieldName: String <- row.schema.fieldNames) {
        val value: String = row.getAs[String](fieldName)
        data.append((rowkey, fieldName, row.getAs[String](fieldName)))
      }
  }

这种方式碰到了一个异常:碰到了个异常:java.math.BigDecimal cannot be cast to java.lang.String。这是因为如果非String类型的,getAs[String]将会报错

改变了处理方式:

    var data = new ListBuffer[(String, String, String)]
    val rddSchema = df.schema.fieldNames
    // TODO rowkey need read from config
    // TODO best can get data by map but not foreach
    df.rdd.foreach(row => {
      var rowkey: String = row.getAs[String]("CUSTID")
      for (i <- 1 until row.length) {
        val fieldName = rddSchema(i).toString
        val value = row(i).toString()
        data.append((rowkey, fieldName, value))
      }
    }

row(i).toString没有类型转换问题。

7. 查看日志

  对于提交到YARN上面的任务,想要看代码中println的内容,可以在Resource Web UI上面,点击最有一列Trace UI,在跳转的页面中点击上面的"Executors",在Executors列表中,你将会看到Logs一列,里面的stdout链接,在stdout最下面即为用户程序输出日志。注意这个是executor的日志;driver的日志可以通过控制台直接看到。另外Standalone模式,对应的日志输出在NodeManager节点的 spark安装目录/work/。
  yarn.log-aggregation-enable,yarn的executor日志是聚合日志,是在任务完成后,汇聚个台机器的日志而成
8. 序列化异常
java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
  指定序列化类,问题解决:

val sc = new SparkContext(new 
SparkConf().setAppName("aaa").setMaster("local[*]").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))

  又爆异常:
java.io.IOException: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 108, Size: 10
Serialization trace:familyMap (org.apache.hadoop.hbase.client.Put)
  这类问题,处理方式尽量保证你的代码处理上面,map内部不要引用本类中函数。

8. sparkContext必须要指定名称

   val sc = new SparkContext(new SparkConf().setMaster("local[*]"))报错:
  An application name must be set in your configuration

  指定appName之后问题解决,因为这个name是要在YARN的管理页面做现实(见上面的“查看日志”),用于跟踪所执行任务执行的情况。
   val sc = new SparkContext(new SparkConf().setAppName("aaa").setMaster("local[*]")) 
9. .size导致指针到迭代器尾部

putValues.repartitionAndSortWithinPartitions(regionSplitPartitioner).foreachPartition(part => {
      try {
        println("*******************get in the partition part size: " + part.size + "***************")

       while (part.hasNext) {... ...}
part.size就会导致一次遍历到尾部,导致part.haseNext为false

10. 内存溢出
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
...
Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
...
  这些异常的原因是分配给yarn执行程序的内存空间不够,所以需要手动设定”--driver-memory 2G”,这样,异常消失。但是伴随着数据量的增加,这个手动的设置值也要跟着增加。发生这种异常多半是因为在内存中做了flatMap等消耗比较大的操作,如果是Map,因为都是一行一行从物理文件中读取,所以不会出现此问题。
11. Task not serializable
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: com.cmiot.bulkload.HBaseBulkLoader
Serialization stack:
        - object not serializable (class: com.cmiot.bulkload.HBaseBulkLoader, value: com.cmiot.bulkload.HBaseBulkLoader@38588dea)
        - field (class: com.cmiot.bulkload.HBaseBulkLoader$$anonfun$bulkLoad$2, name: $outer, type: class com.cmiot.bulkload.HBaseBulkLoader)

这个问题的异常是因为map中的代码中引用了不可序列化的内容;我的代码爆了这个错误是因为用了几个org.apache.haddop.conf.Configuration等类;解决方法就是把这些内容统统放在map的匿名函数中进行处理,不要再外部搞。
12. zookeeper的获取HBase配置异常: Connection refused
17/10/11 14:50:32 INFO zookeeper.ClientCnxn: Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error)
17/10/11 14:50:32 WARN zookeeper.ClientCnxn: Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
这个是因为通过zookeeper获取HBase信息的时候发生异常,地址不对;这是因为上面的那个异常导致了我把所有的配置都放到了map匿名函数中,但是没有做初始化。增加了初始化内容后,获取了正确的配置信息即可:
val hbaseConfig = ConfigFactory.load("hbase.properties")
val prod = hbaseConfig.getString("hbase.zookeeper.property.clientPort")
val parent = hbaseConfig.getString("zookeeper.znode.parent")
val quorum = hbaseConfig.getString("hbase.zookeeper.quorum")

    rdd.flatMap(r => flatMap(r)).repartitionAndSortWithinPartitions(regionSplitPartitioner).foreachPartition { part =>
      val config = HBaseConfiguration.create()
      config.set("hbase.zookeeper.property.clientPort", prod)
      config.set("zookeeper.znode.parent", parent)
      config.set("hbase.zookeeper.quorum", quorum)
      val fs = HFileSystem.get(conf)
  粗体倾斜部分之前就是一句话:val config = new Configuration();另外注意ConfigFactory.load不能放在map/foreahPartition里面,因为需要加载本地文件。
  但是后来发现这样写也不行,因为编译之后map函数里面的prod等变量都变成了BulkLoader.this.prod的形式,BulkLoader还是会出现;于是我让这个类继承了Serializable,因为构造参数中还有一个Confguration,添加了@transient,进行了屏蔽。
   class HBaseBulkLoader(@transient conf: Configuration) extends Serializable{... ... } 
粗体倾斜部分之前就是一句话:val config = new Configuration();另外注意ConfigFactory.load不能放在map/foreahPartition里面,因为需要加载本地文件。
但是后来发现这样写也不行,因为编译之后map函数里面的prod等变量都变成了BulkLoader.this.prod的形式,BulkLoader还是会出现;于是我让这个类继承了Serializable,因为构造参数中还有一个Confguration,添加了@transient,进行了屏蔽。
   class HBaseBulkLoader(@transient conf: Configuration) extends Serializable{... ... } 
13. spark写入HBase异常
  spark将hive数据写入到HBase。
  原始问题:
  Caused by: java.lang.NullPointerException
    at org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher.getMetaReplicaNodes(ZooKeeperWatcher.java:269)
  网调发现主要说的是插入HBase的值为NULL导致的,但是我在定义Put对象的时候,发现即使全部写成固定值,仍然报同样的错误。经历了各种尝试之后,我想要验证一下Put的值是否正确,于是我决定换一种方式来实现,直接使用HTable的方式来提交数据。
  spark-submit之后发现了类似的错误,不过似乎信息更加明确:
  17/10/10 14:50:32 INFO client.ZooKeeperRegistry: ClusterId read in ZooKeeper is null
  Exception in thread "main" java.lang.NullPointerException
  基于新发现的报错,继续网调,解决:
  HBase里面的配置中,zookeeper.znode.parent字段被配置为“/hbase-unsecurity",但是CDH中zookeeper中的根节点配置为/hbae;保持一致即可
 
  网调很多时候比较具有迷惑性,它在缩小你的调查范围的同时,可能也在把正确的点排除在你调查范围内。尝试多种方式来实现,来缩小问题的范围,是一种解决思路。可能会有不同效果。
 

免责声明:文章转载自《SparkWriteToHFile》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇vscode折叠代码后,没有显示结束大括号,只显示省略号怎么解决将.csv文件用Excel 2016打开下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

java中对list集合中的数据按照某一个属性进行分组

import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; p...

Springboot + Mybatis 多数据源配置

1、src/main/resources/application.properties 中配置好多个数据源   spring.datasource.initialize=false  #接口请求端口号、路径  server.port=9090  servcer.context-path=/  #mybatis配置  #mybatis.config-loca...

从零搭建企业大数据分析和机器学习平台-技术栈介绍(三)

数据传输和采集 Sqoop数据传输工具实际项目开发中,往往很多业务数据是存放在关系型数据库中,如 MySQL数据库。我们需要将这些数据集中到数据仓库中进行管理,便于使用计算模型进行统计、挖掘这类操作。 Sqoop是Apache软件基金会的⼀一款顶级开源数据传输工具,用于在 Hadoop与关系型数据库(如MySQL、Oracle、PostgreSQL等)之间...

关于c++正则表达式的用法

本人最近在做一个项目,这个项目里面有一个功能是这样的,要求这个项目中提供搜索功能,简单的说,如果里面输入1-10 11,15,27,39这个字符串,那么你就要从中找到1,2,3,4,5,6,7,8,9,10和11,15,27,39等等这些数字。我考虑了很久,决定使用正则表达式来做,采用的原因有两点:其一,因为考虑到范围的问题(比如说位数不能超过三位)这样的...

在内网服务器中获得真正的客户端ip的方法

如下代码: /**//// <summary>    /// RealIP 的摘要说明:    /// 获得用户的真实ip,由于squidserver的原因直接取到的ip是内网ip    /// </summary>    abstract public class RealIP    {        const string H...

C#反射动态调用dll中的方法,并返回结果[转]

最近在看工厂开发模式,发现用到了反射,之前只听说过也没怎么用过;所以花了点时间重新温习了一遍; 反射的作用是动态的加载某个dll(程序集),并执行该程序集中的某个方法,并返回结果;当然也可以给该方法传递参数 namespace assembly_name { public class assembly_class {...