大数据技术-spark+hive+hbase研究

摘要:
大数据Spark研究1背景1 Scala语言基础:Scala详细摘要Spark简介:Spark简介2环境1部署SparkProperties-˃ScalaCompiler1,选中UseProjectSettings2,在ScalaInstallation下拉框中选择可用的Scala版本,然后单击Apply以解决3DD到DataFramevalconf=newSparkConf()conf.setApp.conf的转换。setMastervalsc=newSparkContextvalsq=newSQLContextimportsq。implicits_//使用这句话,我们可以隐式转换valpeople=sc。text文件。地图地图toDF()人。registerTempTablevalresult=sq.sql4RDD操作分为转换和操作。在查询数据时,无论之前使用什么转换,都是转换,直到检索到结果才会执行!五个基本概念HDFS管理Namenode节点中的数千个数据节点。Namenode相当于资源管理器,Datanodes相当于数据资源。文件以块的形式存储在不同的数据节点中,每个块
大数据技术-spark+hive+hbase研究第1张大数据 spark 研究0基础入门)

一 背景

基础

Scala 语言基础:Scala详细总结(精辟版++)

spark 介绍    :  spark介绍

 

 

二 环境

部署spark

 

<![if !supportLists]>1、<![endif]>环境准备
1)配套软件版本要求:

Java 6+ 

Python 2.6+. 

Scala version (2.10.x).


2)安装好linuxjdkpython, 一般linux均会自带安装好jdkpython但注意jdk默认为openjdk,建议重新安装oracle jdk


3IP10.171.29.191  hostnamemaster


2、安装scala


1)下载scala
wget http://downloads.typesafe.com/scala/2.10.5/scala-2.10.5.tgz

2)解压文件
tar -zxvf scala-2.10.5.tgz

3)配置环境变量
#vi/etc/profile
#SCALA VARIABLES START
export SCALA_HOME=/home/jediael/setupfile/scala-2.10.5
export PATH=$PATH:$SCALA_HOME/bin
#SCALA VARIABLES END

$ source /etc/profile
$ scala -version
Scala code runner version 2.10.5 -- Copyright 2002-2013, LAMP/EPFL

4)验证scala
$ scala
Welcome to Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_51).
Type in expressions to have them evaluated.
Type :help for more information.

scala> 9*9
res0: Int = 81

3、安装spark
1)下载spark
wget http://mirror.bit.edu.cn/apache/spark/spark-1.3.1/spark-1.3.1-bin-hadoop2.6.tgz

2)解压spark
tar -zxvf http://mirror.bit.edu.cn/apache/spark/spark-1.3.1/spark-1.3.1-bin-hadoop2.6.tgz

3)配置环境变量
#vi/etc/profile
#SPARK VARIABLES START 
export SPARK_HOME=/mnt/jediael/spark-1.3.1-bin-hadoop2.6
export PATH=$PATH:$SPARK_HOME/bin 
#SPARK VARIABLES END

$ source /etc/profile

4)配置spark
 $ pwd
/mnt/jediael/spark-1.3.1-bin-hadoop2.6/conf

$ mv spark-env.sh.template spark-env.sh
$vi spark-env.sh
export SCALA_HOME=/home/jediael/setupfile/scala-2.10.5
export JAVA_HOME=/usr/java/jdk1.7.0_51
export SPARK_MASTER_IP=10.171.29.191
export SPARK_WORKER_MEMORY=512m 
export master=spark://10.171.29.191:7070

$vi slaves
localhost

5)启动spark
pwd
/mnt/jediael/spark-1.3.1-bin-hadoop2.6/sbin
$ ./start-all.sh 
注意,hadoop也有start-all.sh脚本,因此必须进入具体目录执行脚本

$ jps
30302 Worker
30859 Jps
30172 Master

4、验证安装情况
1)运行自带示例
$ bin/run-example  org.apache.spark.examples.SparkPi

2)查看集群环境
http://master:8080/

3)进入spark-shell
$spark-shell

4)查看jobs等信息
http://master:4040/jobs/

 

 

部署开发环境

 

  下载安装ScalaI IDEScala IDE

  

三 示例入门

  1 建议查看借鉴 spark安装目录地下的examples目录

 

四 爬过的坑

 

1 开启spark服务时,报错 

#localhost port 22: Connection refused

  

解决:是因为没有安装openssh-server,输入命令 sudo apt-get install openssh-server安装之后,即可解决

 

2 在eclipse上建立的spark项目,无法运行,报错:错误: 找不到或无法加载主类 

  问题出现条件,当在项目中添加spark的jar包时,就会出现项目报错。

  

   解决:右键工程 ----> Properties --->Scala Compiler

  1 勾选 Use Project Settings

  2 在Scala Installation 下拉框选择一个能用的Scala版本,点击应用即可解决

 

 

 

 

 

 

 

 

 

3 RDD 转换成为DataFrame

 

  

     val conf = new SparkConf()

    conf.setAppName("SparkSQL")

    conf.setMaster("local")

    

    val sc = new SparkContext(conf)

    val sq = new SQLContext(sc)

    import sq.implicits._ // 加上这句话,才能隐式的转换

    

    val people = sc.textFile("/wgx-linux/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

    people.registerTempTable("people")

    val result = sq.sql("select * from people")

 

   

4 RDD 操作分为transformation与action

 

在查询数据时,无论先前是何种变换,都是transformation直到去取结果时,才带有执行!若在没取结果前,计算效率时间,是错误的!

五 基础概念

HDFS

 

 

 

大数据技术-spark+hive+hbase研究第2张 

 

 

 

  一个Namenode节点管理成千上万个DatanodesNamenode相当于资源管理器,Datanodes相当于数据资源。

一个文件分块存储到不同的Datanodes,每个块都会有副本。

 

MapReduce

 

  假设有这么一个任务,需要计算出一个大文件存储的最大的数据,下图给出了mapReduce的计算过程。

   大数据技术-spark+hive+hbase研究第3张

 

RDD

 

大数据技术-spark+hive+hbase研究第4张 

 

   RDD将操作分为两类:transformation与action。无论执行了多少次transformation操作,RDD都不会真正执行运算,只有当action操作被执行时,运算才会触发。而在RDD的内部实现机制中,底层接口则是基于迭代器的,从而使得数据访问变得更高效,也避免了大量中间结果对内存的消耗。

 

  例如map操作会返回MappedRDD,而flatMap则返回FlatMappedRDD。当我们执行map或flatMap操作时,不过是将当前RDD对象传递给对应的RDD对象而已transformation

 

 

DDL

 

大数据技术-spark+hive+hbase研究第5张 

 

  数据库模式定义语言DDL(Data Definition Language),是用于描述数据库中要存储的现实世界实体的语言。一个数据库模式包含该数据库中所有实体的描述定义。

 

 

六  spark +hbase

 

 

 1 在Spark-evn.sh里添加hbase的库(否则会报错误)

SPARK_CLASSPATH=/home/victor/software/hbase/lib/*

 

2 hbase数据结构

  大数据技术-spark+hive+hbase研究第6张

 

3 连接数据库

    val sqconf = HBaseConfiguration.create()  

    sqconf.set("hbase.zookeeper.property.clientPort""2181")

    sqconf.set("hbase.zookeeper.quorum""localhost")

    val admin = new HBaseAdmin(sqconf)

 

 

4 增删改查

val sqconf = HBaseConfiguration.create()  

     sqconf.set("hbase.zookeeper.property.clientPort""2181")

     sqconf.set("hbase.zookeeper.quorum""localhost")

     val admin = new HBaseAdmin(sqconf)

    

    

    // 建表

     if (!admin.isTableAvailable("test-user")) {//检查表是否存在  

      print("Table Not Exists! Create Table")  

      val tableDesc = new HTableDescriptor("test-user")  //表名

      tableDesc.addFamily(new HColumnDescriptor("name".getBytes()))//添加列簇

      tableDesc.addFamily(new HColumnDescriptor("id".getBytes()))//添加列簇

      admin.createTable(tableDesc)//建表

      

      println("create table test-user")

      

      

     }

    

    // 增

    val table = new HTable(sqconf"test-user");  

    for (i <- 1 to 10) {  

      var put = new Put(Bytes.toBytes("row"+i)) 

      put.add(Bytes.toBytes("name"), Bytes.toBytes("name"), Bytes.toBytes("value " + i))//往列簇basic添加字段name值为value

      put.add(Bytes.toBytes("name"), Bytes.toBytes("xing"), Bytes.toBytesi))

      put.add(Bytes.toBytes("id"), Bytes.toBytes("id"), Bytes.toBytesi))

      table.put(put

    }  

    table.flushCommits()

    

    // 删

    val delete = new Delete(Bytes.toBytes("row0"))//删除row1数据

    table.delete(delete)

    table.flushCommits()

    // 改

    var put = new Put(Bytes.toBytes("row1")) 

    put.add(Bytes.toBytes("id"), Bytes.toBytes("id"), Bytes.toBytes"10001100"))

    table.put(put

    table.flushCommits()

    

    // 查

     val row1 =  new Get(Bytes.toBytes("row1"))

    val HBaseRow = table.get(row1)//获取row为scutshuxue的数据

    if(HBaseRow != null && !HBaseRow.isEmpty){

      var result:AnyRef = null

      result = Bytes.toString(HBaseRow.getValue(Bytes.toBytes("id"), Bytes.toBytes("id")))//得到列簇为address属性city的值

      println("result="+result)

}

 

 

5 带条件的查询

   import CompareFilter._

    // 带条件的查询

    // 查询列族为id,列为id.值为3的数据行

    val filter  = new SingleColumnValueFilter(Bytes.toBytes("id"), Bytes.toBytes("id"),  

                    CompareOp.EQUAL, Bytes.toBytes("10001100")); 

    

    val scan = new Scan()

    scan.setFilter(filter)

    

    val scanner = table.getScanner(scan)

    var rsu = scanner.next()

    while(rsu != null){

      

      println("rowkey = " + rsu.getRow()); 

      println("value = " + rsu.getValueAsByteBuffer(Bytes.toBytes("id"), Bytes.toBytes("id"))); 

      rsu = scanner.next()

}

 

 

  

 

6 其他操作(group order 等)

  需要一个转换,将hbase 转换成RDD, 在转化成DataFrame,注册数据表,执行sql

sqconf.set(TableInputFormat.INPUT_TABLE"test-user")

    val usersRDD = sc.newAPIHadoopRDD(sqconfclassOf[TableInputFormat],

      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],

      classOf[org.apache.hadoop.hbase.client.Result])

      

      val personRDD = usersRDD.map(p => Peoson(p._2.getValue(Bytes.toBytes("name"),Bytes.toBytes("name")) +"",

          p._2.getValue(Bytes.toBytes("xing"),Bytes.toBytes("name"))+"",p._2.getValue(Bytes.toBytes("id"),Bytes.toBytes("id"))+""))

    

     if(personRDD != null){

       println("suc")

       println(personRDD.count())

     }

     else

       println("ERROR")

       

    println("to DataFrame suc")

    

    import sq.implicits._ 

    val people = personRDD.toDF()

    people.registerTempTable("people")

    

    val result = sq.sql("select * from people")

    

    result.map(t => "Name: " + t(0)).collect().foreach(println)

 

 

七 spark +hive 

   安装spark之后,自带有hive,所以不需要另外部署hive。

 

1 特点

 

  Hive不支持常规的SQL更新语句,如:数据插入,更新,删除。

  Hive 数据查询用时以数分钟甚至数小时来进行计算即非即时性

  Hive 支持类sql的语法,即hql

 

2 hive导入数据

 

  def addDataToTable(sq:HiveContext){

    sq.sql("load data local inpath '/usr/wgx/test_user.txt' into table test_user1")

    sq.sql("load data local inpath '/usr/wgx/test_user_info.txt' into table test_user_info1")

  }

 

 

   

 

八 spark +mysql

 

九 调研分析

 

结果分析

注:1 order by 为随机数排序

 

                                       (单位:秒)

方式

order by

 group

sum group

left join

数量

spark+hbase

46.727

15.490

13.320

45.943

100

spark+hive 

11.209

4.490

2.814

6.247

100

spark +mysql

 

 

 

 

100

 

续                                     (单位:秒)

方式

=

<>

<

>

数量

spark+hbase

11.682

29.947

10.484

10.849

100

spark+hive 

0.694

0.826

0.900

0.941

100

spark +mysql

 

 

 

 

100

 

   

                                      (单位:秒)

方式

order by

 group

sum group

left join

数量

spark+hbase

72.114

25.347

23.190

66.321

200

spark+hive 

12.009

5.026

3.385

8.620

200

spark +mysql

 

 

 

 

200

 

续                                   (单位:秒)

方式

=

<>

<

>

数量

spark+hbase

20.073

20.134

19.883

20.679

200

spark+hive 

1.33

1.564

1.359

1.355

200

spark +mysql

 

 

 

 

200

 

 

               

线性分析

 

 

          线性分析(spark +hbase

      (单位:秒)

 

100

200

600

order by

46.727

72.114

138.561

 group

15.490

25.347

51.306

sum group

13.320

23.190

50.565

left join

45.943

66.321

281.226 

[join1700万】

=

11.682

20.073

46.582

<>

29.947

20.134

51.231

<

10.484

19.883

45.515

>

10.849

20.679

47.663

 

          线性分析(spark +hive

               (单位:秒)

 

100

200

600

order by

11.209

12.009

 

 group

4.490

5.026

 

sum group

2.814

3.385

 

left join

6.247

8.620

 

=

0.694

1.331

 

<>

0.826

1.564

 

<

0.900

1.359

 

>

0.941

1.355

 

 

 

 

十 hive hbase整合

 

整合方式一 sparkSQL 通过hive查询hbase数据

 

原理:

 1 读取hbase表,转换成RDD

 2 将RDD转换成对象模型RDD

 3 对象模型RDD注册成虚拟临时表

 4 从第三步的虚拟临时表的数据导入hive表

 5 读取hive表为RDD

 6 再将第五步的RDD注册临时表

 7 查询

 

转换效率

 

注:从hbase转换成hive (单位:秒)

数量

200

400

600

时间

97.513

144.007

226.221

 

查询效率线性分析(单位:秒)

 

200

400

600

order by

12.748

26.016

56.231

 group

5.114

7.871

21.625

sum group

3.765

5.869

9.379

left join

10.935

34.467

31.471

=

2.041

7.298

5.727

<>

2.662

5.534

8.502

<

1.907

4.115

5.499

>

2.120

4.049

5.644

 

 

2 整合方式二  以hive sql方式查询hbase

 

原理:

spark 将hbase表转换成RDD (模型转换,并没执行)

RDD 通过hive Context 注册为临时表

hive 执行查询

 

整合效率

 

200

400

600

order by

 

 

148.701

 group

 

 

78.277

sum group

 

 

53.201

left join

 

 

314.468 [1]

=

 

 

46.615

<>

 

 

53.453

<

 

 

46.097

>

 

 

46.845

 

:600万的表与1700万数据表的join

 

整合方式三 hive表外部关联hbase

 

  原理:

 

  在创建hive表时,在创建表的sql上加上对hbase表的关联

  

sql("CREATE EXTERNAL TABLE user_t_info(id string,userId string,name string,phone string)"+

 

       "STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'"+

 

"WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,id:id,info:name,info:phone ")"+

 

"TBLPROPERTIES("hbase.table.name" = "user_t_info")")

 

   

整合效率

 

 

200

400

600

order by

 

 

240.608

 group

 

 

88.908

sum group

 

 

86.667

left join

 

 

 

=

 

 

79.768

<>

 

 

80.462

<

 

 

80.645

>

 

 

79.237

 

 

十一 结论分析

1 spark +hive 比spqrk+hbase 效率高

 

2 随着数据量的增加,spark+hive 没有成线性增加,spark+hbase大致成线性关系增加,总体上,spark+hive的增加幅度较小

 

3 shpark +hbase +hive 

  从hbase转换成hive,数据量和时间大致成线性关系,比纯线性关系好一点

  查询效率来讲,随着数据量的增加,虽然时间有所增加,但幅度不大

 

十二 附录

测试源码

 

大数据技术-spark+hive+hbase研究第7张

 

大数据技术-spark+hive+hbase研究第8张 

极点科技

诚信 专注 创新

免责声明:文章转载自《大数据技术-spark+hive+hbase研究》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇pyqt5学习之QCommandLinkButtonlhgdialogv3.13 使用点滴下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Scala初入

何为Scala物   Scala为基于JVM虚拟机中的面向对象与函数式编程思想并且完全兼容Java的混合编程语言,可以是Scala与Java是同根同源的,既然Scala与JAVA都是基于JVM之上的编程语言那么Scala的特色又在哪呢 Scala最大的特色就是他是支持函数式编程的,函数式编程风格使得Scala语法远比Java简洁、优雅、易读懂得多,又因为S...

QOS限速

XX涉及的QOS限速主要有两种: 第一种是针对一个端口下双向IP互访; 第二种是针对多个端口下双向IP互访;(聚合car) 聚合car:是指能够对多个业务使用同一个car进行流量监控,即如果多个端口应用同一个聚合CAR,则这多个端口的流量之和必须在此聚合CAR设定的流量监管范围之内。 总部与分部之间为20M专线。现要求:分支10.2.2.0/24和总部1...

es-09-spark集成

es和spark的集成比较简单, 直接使用内部封装的一些方法即可 版本设置说明: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/requirements.html maven依赖说明: https://www.elastic.co/guide/en/elasticsearch/ha...

hive函数之~集合统计函数

1、个数统计函数: count *** 语法: count(*), count(expr), count(DISTINCT expr[, expr_.])返回值: int说明: count(*)统计检索出的行的个数,包括NULL值的行;count(expr)返回指定字段的非空值的个数;count(DISTINCT expr[, expr_.])返回指定字段...

hiveql函数笔记(二)

1、数据查询 //提高聚合的性能 SET hive.map.aggr=true; SELECT count(*),avg(salary) FROM employees; //木匾不允许在一个查询语句中使用多于一个的函数(DISTINCT。。。)表达式 SELECT count(DISTINCT symbol) FROM stocks; 表生成函数: exp...

Scala从入门到放弃(三)Scala的数组、映射、元组和集合

1.数组 1.1定长数组和变长数组 object ArrayDemo { def main(args: Array[String]): Unit = { //初始化一个长度为8的定长数组,其数组元素均为0 val arr1 = new Array[Int](8) //直接打印定长数组,内容为数组的hashcode值 pr...