scala之 spark连接SQL和HIVE/IDEA操作HDFS

摘要:
1、 连接SQL方法1:packagecom.njbdqn.linkSqlimportjava.util.Propertiesimportororg.apache.spark.SparkSessionimportororg.apache.shark。sql_objectLinkSql{defmain(args:Array[String]):单元={valspark=SparkSession.b

一、连接SQL

方法一、

package com.njbdqn.linkSql

import java.util.Properties

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._

object LinkSql {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("apptest").master("local[2]").getOrCreate()
    // 1.properties
    val prop = new Properties()
    prop.setProperty("driver","com.mysql.jdbc.Driver")
    prop.setProperty("user","root")
    prop.setProperty("password","root")
    // 2.jdbcDF show
    val jdbcDF = spark.read.jdbc("jdbc:mysql://192.168.56.111:3306/test","studentInfo",prop)
    jdbcDF.show(false)
    // 3.添加一行
    import spark.implicits._
    val df = spark.createDataFrame(spark.sparkContext.parallelize(Seq((90, "抖抖抖", "男", 23, "sdf", "sdfg@dfg"),(8, "抖33", "男", 23, "s444f", "sdfg@dfg"))))
      .toDF("sid","sname","sgender","sage","saddress","semail")
  //  df.show(false)
    df.write.mode("append").jdbc("jdbc:mysql://192.168.56.111:3306/test","studentInfo",prop)

  }
}

方法二、

package com.njbdqn

import org.apache.spark.sql.{DataFrame, SparkSession}

object KMeansTest {
  def readMySQL(spark:SparkSession):DataFrame ={
    val map:Map[String,String]=Map[String,String](
      elems="url"->"jdbc:mysql://192.168.56.111:3306/myshops",
      "driver" -> "com.mysql.jdbc.Driver",
      "user" ->"root",
      "password"->"root",
      "dbtable"->"customs"
    )
    spark.read.format("jdbc").options(map).load()
  }
  def main(args: Array[String]): Unit = {
    val spark=SparkSession.builder().appName("db").master("local[*]").getOrCreate()
    readMySQL(spark).select("cust_id","company","province_id","city_id","district_id","membership_level","create_at","last_login_time","idno","biz_point","sex","marital_status","education_id","login_count","vocation","post")
      .show(20)
    spark.stop()

  }
}

 方法三、读取Resource上写的.properties配置:

https://www.cnblogs.com/sabertobih/p/13874061.html

二、连接HIVE

(一)8 9月写的,没有理解,写的不好

1.添加resources

scala之 spark连接SQL和HIVE/IDEA操作HDFS第1张

 2.代码

package com.njbdqn.linkSql

import org.apache.spark.sql.SparkSession

object LinkHive {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("apptest").master("local[2]")
      .enableHiveSupport()
      .getOrCreate()
    spark
       // .sql("show databases")
      .sql("select * from storetest.testhive")
      .show(false)
  }
}

 注意!如果XML配置中配置的是集群,  val df = spark.read.format("csv").load("file:///D:/idea/ideaProjects/spark_projects/myspark8/src/main/scala/com/njbdqn/DSDF/orders.csv") 就失败了,因为

>>> spark可以读取本地数据文件,但是需要在所有的节点都有这个数据文件(亲测,在有三个节点的集群中,只在master中有这个数据文件时执行textFile方法一直报找不到文件,

在另外两个work中复制这个文件之后,就可以读取文件了)

>>> 解决:删除配置(本地)/上传到hdfs(集群)


 (二)12月25日写的

pom文件:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>2.3.4</version>
    </dependency>

代码中增加配置:hive.metastore.uris 

开启metastore元数据共享,

修改方案详见:https://www.pianshen.com/article/8993307375/

为什么这样修改?原理见:https://www.cnblogs.com/sabertobih/p/13772933.html

import org.apache.spark.sql.SparkSession

object EventTrans {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]")
      .config("hive.metastore.uris","thrift://192.168.56.115:9083") # 配置metastore server的访问地址,该server必须开启服务
      .appName("test")
      .enableHiveSupport().getOrCreate()
    spark.sql("select * from dm_events.dm_final limit 3")
      .show(false)

    spark.close()

  }
}

1)192.168.56.115 需要开启metastore服务

hive --service metastore

如果不启动服务,在启动Spark thriftServer服务的时候会报如下错误:

org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

2)192.168.56.115 需要配置直连mysql

验证:

scala之 spark连接SQL和HIVE/IDEA操作HDFS第2张

三、操作HDFS 之 删除

    val spark = SparkSession.builder().master("local[*]").appName("app").getOrCreate();
    /**
     *  删除checkpoint留下的过程数据
     */
    val path = new Path(HDFSConnection.paramMap("hadoop_url")+"/checkpoint"); //声明要操作(删除)的hdfs 文件路径
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    val hdfs = org.apache.hadoop.fs.FileSystem.get(new URI(HDFSConnection.paramMap("hadoop_url")+"/checkpoint"),hadoopConf)
    if(hdfs.exists(path)) {
      //需要递归删除设置true,不需要则设置false
      hdfs.delete(path, true) //这里因为是过程数据,可以递归删除
    }

出现的问题:

Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: hdfs://h1:9000/out, expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:381)
at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:55)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:393)
at org.apache.hadoop.fs.ChecksumFileSystem.delete(ChecksumFileSystem.java:452)
at mapreduce.WordCountApp.main(WordCountApp.java:36)

解决方法:

  val hdfs = org.apache.hadoop.fs.FileSystem.get(new URI(HDFSConnection.paramMap("hadoop_url")+"/checkpoint"),hadoopConf) 

免责声明:文章转载自《scala之 spark连接SQL和HIVE/IDEA操作HDFS》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Spark SQL 编程NVelocity系列 → NVelocity配置详解下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Hive on Spark 和Hive on mr执行结果不一致原因剖析

一、Hive 执行引擎概述 目前hive执行引擎支持三种,包括mr、spark和Tz;mr是hive最早支持的数据类型,执行速度最慢,但是性能上也是最为稳定的;spark和Tz是后续支持的执行引擎,也是将hive的SQL语句转换为spark可以识别的sparksql语句进行执行。 二、当有空表出现时,spark和mr执行引擎结果不一样问题排查 示例:sel...

SpringBootSecurity学习(22)前后端分离版之OAuth2.0自定义授权码

使用JDBC维护授权码 前面的代码中,测试流程第一步都是获取授权码,然后再携带授权码去申请令牌,授权码示例如下: 产生的授权码默认是 6 位的,产生以后并没有做任何管理,可以说是一个临时性的授权码,oauth2也提供了将授权码使用jdbc进行管理的功能,首先在数据库中创建表 oauth_code : code:存储服务端系统生成的code的值(未加密...

scala lift环境搭建

 Intellij IDEA + scala插件 工欲善其事,必先利其器! 学习scala已经有一段时间了,对scala这门语言爱不释手,但同时也为scala糟糕的IDE工具支持感到懊恼(我是一个100%的IDE支持者)。由于社区关注度还不是很高,scala缺乏像java那样强大的ide支持。scala官方网站上列出了三种主流的IDE插件(eclips...

nutch+hadoop 配置使用

nutch+hadoop 配置使用 配置nutch+hadoop 1,下载nutch。如果不需要特别开发hadoop,则不需要下载hadoop。因为nutch里面带了hadoop core包以及相关配置 2,建立目录(根据自己喜好) /nutch /search       (nutch installation goes here) nutch安装到这里...

scala对复杂json的处理

本次代码主要侧重为flink stream流解析cannal-json,经过多次实验,发现还是阿里的fastjson较为好用,故在此做记录 将依赖引入 <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId&g...

sqoop迁移

3.1 概述 sqoop是apache旗下一款“Hadoop和关系数据库服务器之间传送数据”的工具。 导入数据:MySQL,Oracle导入数据到Hadoop的HDFS、HIVE、HBASE等数据存储系统; 导出数据:从Hadoop的文件系统中导出数据到关系数据库 3.2 工作机制 将导入或导出命令翻译成mapreduce程序来实现 在翻译出的mapre...