[Spark SQL]Spark SQL读取Kudu,写入Hive

摘要:
Kudu_To_Hive,简称KTVpackagecom。实例dao导入com。实例单位。SparkUnitimportorg。阿帕奇。公园sql。ParkSessionobjectKTV{defgetKuduTableDataFrame:Unit={//读取kudu//获取tb对象valkuduTb=ss.Read.format.option.option//提示:指定库的注意事项.load()//createviewkuduTb.createTempViewvalkudu_unit1_df=ss.sql/printkudu_unit1_df.printSchema()kudu_unit1_df.show()//loadofmemorykudu_ unit1_df.createOrReplaceTempView}definersertHive:Unit={//createtables.sqlss.sqlprintln(“创建表成功!

SparkUnit
Function:用于获取Spark Session

package com.example.unitl

import org.apache.spark.sql.SparkSession

object SparkUnit {
def getLocal(appName: String): SparkSession = {
SparkSession.builder().appName(appName).master("local[*]").getOrCreate()
}

def getLocal(appName: String, supportHive: Boolean): SparkSession = {
if (supportHive) getLocal(appName,"local[*]",true)
else getLocal(appName)
}

def getLocal(appName:String,master:String,supportHive:Boolean): SparkSession = {
if (supportHive) SparkSession.builder().appName(appName).master(master).enableHiveSupport().getOrCreate()
else SparkSession.builder().appName(appName).master(master).getOrCreate()
}

def stopSs(ss:SparkSession): Unit ={
if (ss != null) {
ss.stop()
}
}
}

 
log4j.properties
Function:设置控制台输出级别

# Set everything to be logged to the console
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
 
KTV
Function:读取kudu,写入hive。Kudu_To_Hive,简称KTV

package com.example.dao

import com.example.unitl.SparkUnit
import org.apache.spark.sql.SparkSession

object KTV {
def getKuduTableDataFrame(ss: SparkSession): Unit = {
// 读取kudu
// 获取tb对象
val kuduTb = ss.read.format("org.apache.kudu.spark.kudu")
.option("kudu.master", "10.168.1.12:7051")
.option("kudu.table", "impala::realtimedcs.wtr31") // Tips:注意指定库
.load()

// create view
kuduTb.createTempView("wtr31")

val kudu_unit1_df = ss.sql(
"""
|SELECT * FROM `wtr31`
|WHERE `splittime` = "2021-07-11"
|""".stripMargin)

// print
kudu_unit1_df.printSchema()
kudu_unit1_df.show()

// load of memory
kudu_unit1_df.createOrReplaceTempView("wtr31_bakup")
}

def insertHive(ss: SparkSession): Unit = {
// create table
ss.sql(
"""
|USE `realtimebakup`
|""".stripMargin)

ss.sql(
"""
| CREATE TABLE IF NOT EXISTS `dcs_wtr31_bakup`(
| `id` int,
| `packtimestr` string,
| `dcs_name` string,
| `dcs_type` string,
| `dcs_value` string,
| `dcs_as` string,
| `dcs_as2` string)
| PARTITIONED BY (
| `splittime` string)
|""".stripMargin)
println("创建表成功!")

// create view
ss.sql(
"""
|INSERT INTO `dcs_wtr31_bakup`
|SELECT * FROM wtr31_bakup
|""".stripMargin)
println("保存成功!")
}

def main(args: Array[String]): Unit = {
//get ss
val ss = SparkUnit.getLocal("KTV", true)
// 做动态分区, 所以要先设定partition参数
// default是false, 需要额外下指令打开这个开关
ss.sqlContext.setConf("hive.exec.dynamic.partition;","true");
ss.sqlContext.setConf("hive.exec.dynamic.partition.mode","nonstrict");

// 调用方法
getKuduTableDataFrame(ss)
insertHive(ss)

// 关闭连接
SparkUnit.stopSs(ss)
}

运行截图:
注意到那个红框了吗?运行时请将hive的配置文件 hive-site.xml文件,复制到项目resource下,否则必报错。

[Spark SQL]Spark SQL读取Kudu,写入Hive第1张


hue查看写入的数据:

 [Spark SQL]Spark SQL读取Kudu,写入Hive第2张

————————————————
版权声明:本文为CSDN博主「NBA首席形象大使阿坤」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_44491709/article/details/118656681

免责声明:文章转载自《[Spark SQL]Spark SQL读取Kudu,写入Hive》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Docker 常用命令与操作宜信数据采集平台DBus-allinone部署实战案例下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Android 演示 Android ListView 和 github XListView(1-3)

本文内容 环境 项目结构 演示 1:ListView 演示 2:简单 XListView 演示 3:音乐列表 XListView 演示 4:另一个音乐列表 XListView 本文四个演示,循序渐进。 演示 1 只是普通的 Android ListView 控件; 演示 2 是 GitHub 上的 XListView 控件,具备“下拉...

Java 查询数据后进行递归操作

java的递归方法记录: private List<Map<String, Object>> generateOrgMapToTree(List<Map<String, Object>>orgMaps, Integer pid) { if (null == orgMaps || orgMaps...

Json Schema简介

1. 引言 什么是Json Schema? 以一个例子来说明 假设有一个web api,接受一个json请求,返回某个用户在某个城市关系最近的若干个好友。一个请求的例子如下: { "city" : "chicago", "number": 20, "user" : { "name":"Alex",...

springboot备份mysql后发送邮件并删除备份文件,支持win和Linux

首先加入springboot的邮箱依赖 <!--邮箱依赖--> <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-mail --> <dependency> <groupId>...

判断session或者是cookies为空的时候出现“未将对象引用设置到对象的实例”的报错

在我们做asp.net网站的时候 经常会判断一个值是不是为空 比如session是否为空 或者是cookies是否为空  如果为空的时候 你就已经引用了 那么就会报错 错误信息为 未将对象引用设置到对象的实例   那么这2个的判断也是不一样的    错误的例子 1 if (!string.IsNullOrEmpty(Request.QueryStri...

Unity3D研究之支持中文与本地文件的读取写入(转)

前几天有个朋友问我为什么在IOS平台中可以正常的读写文件可是在Android平台中就无法正常的读写。当时因为在上班所以我没时间来帮他解决,晚上回家后我就拿起安卓手机真机调试很快就定位问题所在,原来是他文件的路径写错了。开发中往往一道很难的问题解开的时候发现原来真的非常的简单,哇咔咔。 刚好在MOMO的书中也有涉及到文件的读取与写入,那么本节我将书中的部分...