通过hive自定义函数直接回写数据到数据库

摘要:
配置单元通常用于执行离线统计分析相关功能,然后将执行结果导入数据库表,以可视化显示前端报表进行查询。有许多方法可以导入回数据库,例如sqoop、hivejdbc、mrjdbc等,但所有这些方法都有一个辅助处理阶段。这次我们将介绍另一种处理方法,它将数据库操作直接集成到udf中,这样我们就可以直接编写一个hql查询语句。

hive一般用来执行离线统计分析相关的功能,然后将执行的结果导入到数据库的表中供前端报表可视化展现来查询。

导回数据库的方式有许多,sqoop、hive jdbc、mr jdbc等等,但是这几种方式都会有一个二次处理环节(数据需要人工)。

这次介绍另外一种处理方式,直接将对数据库的操作集成在udf中,这样直接写一个hql查询语句就可以了。

代码如下:

package com.taisenki.tools;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;

import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.IntWritable;

/**
 * 在hive0.13版本之上才能注册永久函数,否则只能注册临时函数
 * @author taisenki
 *
 */
@Description(name = "batch_import",  value = "_FUNC_(sql, args1[, args2,...][, config_path]) - Return ret ")  
public class SqlBatchImportUDF extends GenericUDF {

    public static final String DEFAULT_CONFIG_ROOT_PATH = "/user/hive/udf/sjk/"; 
    public static final String DEFAULT_CONFIG_FILE_NAME = "sjk.properties";  
    public static final String DEFAULT_CONFIG_FILE_SUFFIX = "properties";  
    private IntObjectInspector retValInspector; 
    private String sql;   
    private PrimitiveObjectInspector[] paramsInspectors;  
    private int insert = 0;
    private Connection conn;
    private PreparedStatement psi;
    private int count = 0;
    
    @Override
    public void close() throws IOException {
        // TODO Auto-generated method stub
        try {if (insert > 0) {
                psi.executeBatch();
                conn.commit();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }finally{
            try {
                if(conn != null)
                    conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        super.close();
    }

    @Override
    public Object evaluate(DeferredObject[] arg0) throws HiveException {
        // TODO Auto-generated method stub
        try {
            for (int i = 0; i < count; i++) {  
                Object param = paramsInspectors[i].getPrimitiveJavaObject(arg0[i + 1].get());  
                psi.setObject(i + 1, param);  
            }  
            psi.addBatch();
            insert++;
            if(insert>1000){
                psi.executeBatch();
                conn.commit();
                insert = 0;
            }  
            IntWritable iw = new IntWritable(insert);  
            return retValInspector.getPrimitiveWritableObject(iw);  

        } catch (SQLException e) {
            e.printStackTrace();
            throw new HiveException(e);
        }
    }

    @Override
    public String getDisplayString(String[] arg0) {
        // TODO Auto-generated method stub
        return "batch_import(sql, args1[, args2,...][, config_path])"; 
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] arg0)
            throws UDFArgumentException {
        // TODO Auto-generated method stub
        if (arg0.length < 2) {  
            throw new UDFArgumentException(" Expecting  at least two arguments ");  
        }  
        insert = 0;
        //第一个参数校验,必须是一个非空的sql语句  
        if (arg0[0].getCategory() == Category.PRIMITIVE  
                && ((PrimitiveObjectInspector) arg0[0]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {  
            if (!(arg0[0] instanceof ConstantObjectInspector)) {  
                throw new UDFArgumentException("the frist arg   must be a sql string constant");  
            }  
            ConstantObjectInspector sqlInsp = (ConstantObjectInspector) arg0[0];  
            this.sql = sqlInsp.getWritableConstantValue().toString();  
            int i = -1;
            count = 0;
            while (true) {
                i = sql.indexOf("?", i + 1);
                if (i == -1) {
                    break;
                }
                count++;
            }
            if (this.sql == null || this.sql.trim().length() == 0) {  
                throw new UDFArgumentException("the frist arg   must be a sql string constant and not nullable");  
            }  
        }  
        
        if (count+1 > arg0.length){
            throw new UDFArgumentException("arguments not enough with this sql["+(arg0.length-1)/count+"]");  
        }
        
        //默认情况
        String fileName1 = SqlBatchImportUDF.DEFAULT_CONFIG_ROOT_PATH + SqlBatchImportUDF.DEFAULT_CONFIG_FILE_NAME;
        //判断是否存在指定的配置文件路径
        if (count+1 < arg0.length){
            //第一个参数校验  
            if (arg0[count+1].getCategory() == Category.PRIMITIVE  
                    && ((PrimitiveObjectInspector) arg0[count+1]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {  
                if (!(arg0[count+1] instanceof ConstantObjectInspector)) {  
                    throw new UDFArgumentException("mysql connection pool config path  must be constant");  
                }  
                ConstantObjectInspector propertiesPath = (ConstantObjectInspector) arg0[count+1];  
      
                fileName1 = propertiesPath.getWritableConstantValue().toString();  
                Path path1 = new Path(fileName1);  
                if (path1.toUri().getScheme() == null) {  
                    if (!"".equals(FilenameUtils.getExtension(fileName1)) && !DEFAULT_CONFIG_FILE_SUFFIX.equals(FilenameUtils.getExtension(fileName1))) {  
                        throw new UDFArgumentException("不支持的文件扩展名,目前只支持properties文件!");  
                    }  
                    //如果是相对路径,补齐根路径  
                    if (!fileName1.startsWith("/")) {  
                        fileName1 = SqlBatchImportUDF.DEFAULT_CONFIG_ROOT_PATH + fileName1;  
                    }  
                }  
                //如果只写了文件前缀的话,补上后缀  
                if (!FilenameUtils.isExtension(fileName1, DEFAULT_CONFIG_FILE_SUFFIX)) {  
                    fileName1 = fileName1 + FilenameUtils.EXTENSION_SEPARATOR_STR + DEFAULT_CONFIG_FILE_SUFFIX;  
                }  
            } 
        }

        Properties properties = new Properties();  
        Configuration conf = new Configuration();  
        Path path2 = new Path(fileName1);  

        try (FileSystem fs = FileSystem.newInstance(path2.toUri(), conf); //这里不能用FileSystem.get(path2.toUri(), conf),必须得重新newInstance,get出来的是共享的连接,这边关闭的话,会导致后面执行完之后可能出现FileSystem is closed的异常  
             InputStream in = fs.open(path2)) {  
            properties.load(in);
        } catch (FileNotFoundException ex) {  
            throw new UDFArgumentException("在文件系统中或者是HDFS上没有找到对应的配置文件");  
        } catch (Exception e) {  
            e.printStackTrace();  
            throw new UDFArgumentException(e);  
        }  
        
        try {
            Class.forName(properties.getProperty("driverClassName"));
            System.out.println(properties.getProperty("driverClassName"));
            System.out.println(properties.getProperty("url"));
            conn = DriverManager.getConnection(properties.getProperty("url"), properties);
            psi = conn.prepareStatement(sql);
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            throw new UDFArgumentException(e);  
        } catch (ClassNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            throw new UDFArgumentException(e);  
        }  
        //中间为参数
        paramsInspectors = new PrimitiveObjectInspector[count];  
        for (int i = 0; i < count; i++) {  
            paramsInspectors[i] = (PrimitiveObjectInspector) arg0[i+1];  
        }  
        retValInspector = PrimitiveObjectInspectorFactory.writableIntObjectInspector; 
        return retValInspector;
    }

}

然后上传jar包,注册udf,注意此处需把对应数据库的驱动包一同进行注册操作:

如回写oracle: create function default.oracleSave as 'com.taisenki.tools.SqlBatchImportUDF' using jar 'hdfs://cdh5/data/lib/test.jar', jar 'hdfs://cdh5/data/lib/ojdbc6.jar';

然后写一个HQL测试一下:

select oracleSave('insert into test111 values (?)',b.id) from (select 2 id from dual) b;

UDF第一个参数是静态参数,是对应数据库的sql语句,描述入库方式,然后后面的参数就不固定了,一一对应sql语句中的占位符,比如我上面有1个占位符,然后我后面就跟了1个参数。

若传入的参数恰好比占位符多1个的时候,最后一个参数则为指定数据库配置文件名,里面配置了如何开启连接池连接哪个数据库什么的。

附上一个默认的sjk.properties:

driverClassName=oracle.jdbc.driver.OracleDriver
url=jdbc:oracle:thin:@host:port:inst
user=test
password=test

此处注意,如果是hive 0.13以下的版本,是不支持注册永久function的,请使用
create temporaryfunction来进行,而且只支持session级别的,断开后自动消失……

免责声明:文章转载自《通过hive自定义函数直接回写数据到数据库》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇深入理解php底层:php生命周期关于ZFS、GPT、4K、Geom Label的一些说明下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Hive学习小记-(14)如何写SQL求出中位数平均数和众数(count 之外的方法)

平均数中位数众数 平均数、中位数、众数都是度量一组数据集中趋势的统计量。所谓集中趋势是指一组数据向某一中心值靠拢的倾向,测度集中趋势就是寻找数据一般水平的代表值或中心值。而这三个特征数又各有特点,能够从不同的角度提供信息。 平均数 特点:计算用到所有的数据,它能够充分利用数据提供的信息,它具有优秀的数学性质,因此在实际应用中较为广泛。但它受极端值的影响...

Hive实现自增列的两种方法

多维数据仓库中的维度表和事实表一般都需要有一个代理键,作为这些表的主键,代理键一般由单列的自增数字序列构成。Hive没有关系数据库中的自增列,但它也有一些对自增序列的支持,通常有两种方法生成代理键:使用row_number()窗口函数或者使用一个名为UDFRowSequence的用户自定义函数(UDF)。 用row_number()函数生成代理键 INS...

二、给openTCS4.12做汉化

openTCS目前支持英语和德语 我们今天讲解如何汉化 1.修改配置文件 (1)修改openTCS-Kernel的配置文件(将语言改成chinese) (2) 修改openTCS-KernelControlCenter的配置文件 (3)修改openTCS-PlantOverview的配置文件 2.新增多语言资源文件 将项目中所有的资源文件都复制两份(下图只...

ASP.NET Core中添加MIME 类型

目录 #事故现场 #解决方法 #事故现场 在asp.net core 中使用pdf.js插件,然后遇到一个问题,发现pdf的工具条的文字都是英文的;打开浏览器的控制台,发现有一个报错:http://localhost:2076/lib/pdf.js/web/locale/zh-CN/viewer.properties这个文件404了。而且还报了很...

CDH| Cloudera Manager的安装

一.  cloudera manager的概念 简单来说,Cloudera Manager是一个拥有集群自动化安装、中心化管理、集群监控、报警功能的一个工具(软件),使得安装集群从几天的时间缩短在几个小时内,运维人员从数十人降低到几人以内,极大的提高集群管理的效率。 功能: 1)   管理:对集群进行管理,如添加、删除节点等操作。 2)   监控:监控集群...

码云Android项目构建注意事项(转载)

1、ant项目 build.xml必须位于项目根目录。 2、maven项目 pom.xml必须位于项目根目录。 3、gradle项目 由于gradle的配置灵活,我们做了一些规范,并且增加了一下机制来处理签名问题。 二、码云Android项目构建示例 android-app 项目目录结构(以osc/android-app为例) gradle 需要的配置和...