flink连接hbase方法及遇到的问题

摘要:
1.继承RichSinkFunction类mvn配置:<相关性>&书信电报;组ID>org.apache.frink</组ID>&书信电报;artifactId>退缩_ 2.12</artifactId>&书信电报;版本>1.7.2</版本></从属关系&

1、继承 RichSinkFunction 类

  mvn配置:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hbase_2.12</artifactId>
            <version>1.7.2</version>
        </dependency>
  <dependency>

<groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
<exclusions>
<exclusion>
<artifactId>xml-apis</artifactId>
<groupId>xml-apis</groupId>
</exclusion>
</exclusions>
</dependency>

  config配置:

  flink连接hbase方法及遇到的问题第1张

  flink连接hbase方法及遇到的问题第2张

  flink接入config代码:

  

    public static void main(String[] args) throws Exception {
        /*
        Env and Config
         */
        if (args.length > 0) {
            configEnv = args[0];
        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String confName = String.format("xxx.%s.properties", configEnv);
        InputStream in = MidasCtr.class.getClassLoader().getResourceAsStream(confName);

        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(in);
        env.getConfig().setGlobalJobParameters(parameterTool);
}

  

  代码:  

package midas.knowbox;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class WriteHbaseRich extends RichSinkFunction<AdDot> {
    private Connection conn = null;
    private Table table = null;

    private static String zkServer;
    private static String zkPort;
    private static TableName tableName;

    private static final String click = "click";
    BufferedMutatorParams params;
    BufferedMutator mutator;

    @Override
    public void open(Configuration parameters) throws Exception {
        ParameterTool para = (ParameterTool)
                getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        zkServer = para.getRequired("hbase.zkServer");
        zkPort = para.getRequired("hbase.zkPort");
        String tName = para.getRequired("hbase.tableName");
        tableName = TableName.valueOf(tName);


        org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();

        config.set("hbase.zookeeper.quorum", zkServer);
        config.set("hbase.zookeeper.property.clientPort", zkPort);

        conn = ConnectionFactory.createConnection(config);
        Admin admin = conn.getAdmin();
        admin.listTableNames();
        if (!admin.tableExists(tableName)) {
            HTableDescriptor tableDes = new HTableDescriptor(tableName);

            tableDes.addFamily(new HColumnDescriptor(click).setMaxVersions(3));

            System.out.println("create table");
            admin.flush(tableName);
        }
        // 连接表
        table = conn.getTable(tableName);

        // 设置缓存
        params = new BufferedMutatorParams(tableName);
        params.writeBufferSize(1024);
        mutator = conn.getBufferedMutator(params);
    }

    @Override
    public void invoke(AdDot record, Context context) throws Exception {
        Put put = new Put(Bytes.toBytes(String.valueOf(record.userID)));
        System.out.println("hbase write");

        System.out.println(record.recent10Data);
        put.addColumn(Bytes.toBytes(click),Bytes.toBytes("recent_click"),Bytes.toBytes(String.valueOf(record.toJson())));


        mutator.mutate(put);
        System.out.println("hbase write");
    }

    @Override
    public void close() throws Exception {
        mutator.flush();
        conn.close();
    }
}

   调用:

dataStream.addSink(new WriteHbaseRich());

2、实现接口OutputFormat(不知道如何使用flink的配置文件)

  

package midas.knowbox;

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;

public class WriteHbase implements OutputFormat<AdDot> {

    private Connection conn = null;
    private Table table = null;

    private static String zkServer = "";
    private static String port = "2181";
    private static TableName tableName = TableName.valueOf("test");

    private static final String userCf = "user";
    private static final String adCf = "ad";

    @Override
    public void configure(Configuration parameters) {
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();

        config.set("hbase.zookeeper.quorum", zkServer);
        config.set("hbase.zookeeper.property.clientPort", port);

        conn = ConnectionFactory.createConnection(config);
        Admin admin = conn.getAdmin();
        admin.listTableNames();
        if (!admin.tableExists(tableName)) {

            // 添加表描述
            HTableDescriptor tableDes = new HTableDescriptor(tableName);

            // 添加列族
            tableDes.addFamily(new HColumnDescriptor(userCf));
            tableDes.addFamily(new HColumnDescriptor(adCf));

            // 创建表
            admin.createTable(tableDes);
        }
        table = conn.getTable(tableName);
    }

    @Override
    public void writeRecord(AdDot record) throws IOException {
        Put put = new Put(Bytes.toBytes(record.userID + "_" + record.adID + "_" + record.actionTime)); // 指定行
        // 参数分别:列族、列、值
        put.addColumn(Bytes.toBytes("user"), Bytes.toBytes("uerid"), Bytes.toBytes(record.userID));
        put.addColumn(Bytes.toBytes("ad"), Bytes.toBytes("ad_id"), Bytes.toBytes(record.adID));

        table.put(put);
    }

    @Override
    public void close() throws IOException {
          conn.close()
    }
}    

3、遇到的问题

  写入hbase的时候出现包引用错误 剔除 xml-apis 就好了

<dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
            <exclusions>
                <exclusion>
                    <artifactId>xml-apis</artifactId>
                    <groupId>xml-apis</groupId>
                </exclusion>
            </exclusions>
 </dependency>

  flink连接hbase方法及遇到的问题第3张

免责声明:文章转载自《flink连接hbase方法及遇到的问题》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Oracle触发器详细 和 Oracle 创建序列号如何打包和部署air应用程序下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

ASP.NET Core SignalR (七):考虑设计向后兼容的SignalR API

此为系列文章,对MSDN ASP.NET Core SignalR 的官方文档进行系统学习与翻译。其中或许会添加本人对 ASP.NET Core 的浅显理解。 使用自定义参数对象来确保向后兼容         向SignalR 中心 方法添加参数(要么是服务端方法,要么是客户端方法)是一个重大的变化。这就意味着老的 服务端/客户端在不带有预期个数的参数进行...

FastJson简单实现@JsonInclude效果,使得非空字段不返回

引言:记录最近一次做项目过程中碰到的一个FastJson序列化的问题,本次项目基于spring boot实现,在接口返回数据的时候,实体类的序列化是由FastJson完成的,但是由于功能需要,我需要将某个实体类中的些为空的字段则不返回,但是不能改动FastJson作为序列化的大逻辑,也就是说不能将序列化由FastJson替换为JackSon,但是要实现Ja...

mybatis教程:入门&amp;gt;&amp;gt;精通&amp;gt;&amp;gt;实战

以前曾经用过ibatis,这是mybatis的前身,当时在做项目时,感觉很不错,比hibernate灵活。性能也比hibernate好。而且也比较轻量级,因为当时在项目中,没来的及做很很多笔记。后来项目结束了,我也没写总结文档。已经过去好久了。但最近突然又对这个ORM 工具感兴趣。因为接下来自己的项目中很有可能采用这个ORM工具。所以在此重新温习了一下 m...

JAVA对文件类型的校验

通常,在WEB系统中,上传文件时都需要做文件的类型校验,大致有如下几种方法: 1. 通过后缀名,如exe,jpg,bmp,rar,zip等等。 2. 通过读取文件,获取文件的Content-type来判断。 3. 通过读取文件流,根据文件流中特定的一些字节标识来区分不同类型的文件。 4. 若是图片,则通过缩放来判断,可以缩放的为图片,不可以的则不是。 然而...

dynamic-insert和dynamic-update属性

dynamic-insert 作用:设置对象中没有值的字段 insert并不会对其进行插入. 实体类映射配置如下 <!DOCTYPE hibernate-mapping PUBLIC "-//Hibernate/Hibernate Mapping DTD 3.0//EN" "http://www.hibernate.o...

帝国CMS 复制word里面带图文的文章,图片可以直接显示

1.4.2之后官方并没有做功能的改动,1.4.2在word复制这块没有bug,其他版本会出现手动无法转存的情况 本文使用的后台是Java。前端为Jsp(前端都一样,后台如果语言不通得自己做 Base64编码解码) 因为公司业务需要支持IE8 ,网上其实有很多富文本框,效果都很好。 例如www.wangEditor.com  但试了一圈都不支持IE8 。 所...