HBase海量数据存储

摘要:
HBaseHBase是一个基于HDFS的非关系型数据库HBase的特点1.海量数据存储,HBase中的表可以容纳上百亿行x上百万列的数据。
HBase

HBase是一个基于HDFS的非关系型数据库(海量数据存储)

HBase的特点

1.海量数据存储,HBase中的表可以容纳上百亿行x上百万列的数据。

2.列式存储,HBase中的数据是基于列进行存储的,能够动态的增加和删除列。

3.准实时查询,HBase在海量的数据量下能够接近准实时的查询(百毫秒以内)

4.多版本,HBase中每一列的数据都有多个版本。

5.高可靠性,HBase中的数据存储于HDFS中且依赖于Zookeeper进行Master和RegionServer的协调管理。

1.HBase的表结构

HBase海量数据存储第1张

HBase中的表由RowKey、ColumnFamily、Column、Timestamp组成。

RowKey

记录的唯一标识,相当于关系型数据库中的主键。

RowKey的最大长度为64KB且按字典顺序进行排序存储。

HBase会自动为RowKey加上索引,当按RowKey查询时速度会很快。

ColumnFamily

列簇是列的命名空间,是逻辑上类型相同的一组列,每个列簇下可以有任意数量的列,HBase在创建表时只需要指定表名以及列簇(表中的列簇最好不超过5个)

列簇下的列有着相同的前缀,使用冒号来对列簇和列名进行分隔。

Column

当创建了一张HBase的表时,默认只有列簇,列只有在插入数据后才存在(列在列簇下是有序的),为空的列不占用存储空间。

Timestamp

每个列的value都有多个版本,每个版本对应一个时间戳,在进行插入时由HBase自动进行赋值。

2.HBase的物理模型

2.1 Region

HBase海量数据存储第2张

HBase表中的数据存储在Region当中,每个Region都包含MemoryStore和StoreFile,MemoryStore位于内存当中,每个列簇都对应一个MemoryStore,而StoreFile存储在HDFS当中,每当MemoryStore中的数据达到128M时将会生成一个StoreFile并写入到HDFS中。

由Master将新的Region分配到相应的RegionServer中,实现负载均衡。

2.2 RegionServer

HBase海量数据存储第3张

RegionServer负责管理Region,会将Region所产生的StoreFile写入到HDFS当中,同时当Reigon中的StoreFile超过一定的数量时,会对StoreFile进行合并,当StoreFile的文件大小达到指定的阀值时,会对Region进行切分。

RegionServer负责对表中的数据进行操作(插入、删除、查询、更新)

2.3 Master

HBase海量数据存储第4张

Master负责对表进行操作,同时当RegionServer对Region进行切分后,Master会将新的Region分配给合适RegionServer进行管理(负载均衡),当RegionServer宕机后负责RegionServer上的Region迁移(通过WAL日志)

如果Master失效了仅会导致mete数据和表无法被修改,表中的数据仍然可以进行读取和写入。

2.4 关于Zookeeper在HBase中的作用

当HBase集群启动后,Master和RegionServer会分别向Zookeeper进行注册,并且在Zookeeper中保存meta表数据、Region与RegionServer的关系,以及RegionServer的访问地址等信息。

meta表中维护着TableName与RowKey、RowKey与Region的关系。

1.存放HBase的meta表数据、Region与RegionServer的关系、以及RegionServer的访问地址等信息。

2.保证Master的高可用,当状态为Active的Master无法对外提供服务时,会将状态为StandBy的Master切换为Active状态。

3.实时监控RegionServer,当某个RegionServer节点无法提供服务时将会通知Master,由Master进行RegionServer上的Region迁移。

2.5 HBase处理读请求的流程

1.HBase Client连接Zookeeper,根据TableName和RowKey从Zookeeper中查询这些记录对应存放的Region以及所关联的RegionServe。

2.HBase Client请求这些RegionServer并找到对应的Region。

3.如果Region的MemoryStore中存在目标的RowKey则直接从MemoryStore中进行查询,否则从StoreFile中进行查询。

2.6 HBase处理写请求的流程

1.HBase Client连接Zookeeper,根据TableName找到其Region列表及其关联的RegionServer。

2.然后通过一定的算法计算出数据要写入的Region,并依次连接要写入的Region其对应的RegionServer。

3.然后把数据写入到HLog和Region的MemoryStore中。

4.每当MemoryStore中的大小达到128M时,会生成一个StoreFile,并写入到HDFS中。

5.当StoreFile的数量超过一定时,会进行StoreFile的合并,将多个StoreFile文件合并成一个StoreFile,当StoreFile的文件大小超过指定的阈值时,会进行Region的切分,由Master将新的Region分配给合适的RegionServer进行管理(负载均衡)

HBase Client会在第一次读取或写入时才需要连接Zookeeper,会将Zookeeper中的相关数据缓存到本地,往后直接从本地进行查询,当Zookeeper中的信息发生改变时,将会通过通知机制去通知HBase Client进行更新。

2.7 HBase在HDFS中的目录

HBase海量数据存储第5张

tmp目录:当创建和删除HBase的表时,会将表移动到该目录中进行操作。
MasterProcWALs目录:预写日志目录,主要用于存储Master的操作日志。
WALs目录:预写日志目录,主要用于存储RegionServer的操作日志。
data目录:存放Region中的StoreFile。
hbase.id文件:HBase集群的唯一标识。
hbase.version文件:HBase集群的版本号。
oldWALs目录:当WALs目录下的日志文件超过一定时间后,会将其移动到oldWALs目录中,Master会定期进行清理。

3.搭建HBase集群

3.1 安装JDK

由于HBase是通过JAVA语言编写的,因此需要安装JDK并配置好JAVA_HOME环境变量。

HBase海量数据存储第6张

HBase海量数据存储第7张

3.2 启动HDFS集群

由于HBase是基于HDFS的,因此需要安装Hadoop并启动HDFS集群。

HBase海量数据存储第8张

3.3 启动Zookeeper集群

由于HBase需要在Zookeeper中保存meta信息以及Region与RegionServer的关系,同时需要依赖Zookeeper保证Master节点的高可用,因此需要搭建Zookeeper集群(HDFS的NameNode也需要通过Zookeeper保证高可用)

HBase海量数据存储第9张

HBase海量数据存储第10张

HBase海量数据存储第11张

3.4安装HBase

1.从CDH中下载HBase并进行解压 。

HBase海量数据存储第12张

3.5 修改配置

1.修改hbase-env.sh配置文件

#设置JDK的安装目录
export JAVA_HOME=/usr/jdk8/jdk1.8.0_161
​
#true则使用hbase自带的zk服务,false则使用外部的zk服务.
export HBASE_MANAGES_ZK=flase

2.修改hbase-site.xml配置文件

<configuration>
    <!--指定HBase日志的存放目录 -->
    <property>
        <name>hbase.tmp.dir</name>
        <value>/usr/hbase/hbase-1.2.8/logs</value>
    </property>
    <!--指定HBase中的数据存储在HDFS中的目录 -->
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://nameservice:8020/hbase</value>
    </property>
    <!--设置是否是分布式 -->
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <!--指定HBase使用的ZK地址 -->
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>192.168.1.80:2181,192.168.1.81:2181,192.168.1.82:2181</value>
    </property>
</configuration> 

4.修改regionservers文件,配置充当RegionServer的节点

HBase海量数据存储第13张

值可以是主机名或者IP地址。

如果Hadoop配置了HDFS高可用集群,那么就会有两个NameNode和一个NameService,此时就需要将HDFS的core-site.xml和hdfs-site.xml配置文件复制到HBase的conf目录下,且hbase-site.xml配置文件中的hbase.rootdir配置项的HDFS地址要指向NameService的名称。

5.NTP时间同步

NTP是一个时间服务器,作用是使集群中的各个节点的时间都保持一致。

由于在HBase集群中,Zookeeper与HBase对时间的要求较高,如果两个节点之间的时间相差过大,那么整个集群就会崩溃,因此需要使各个节点的时间都保持一致。

#查看是否安装了NTP服务
rpm -qa|grep ntp
​
#安装NTP服务
yum install ntp -y
​
#从NTP服务器中获取时间并同步本地
ntpdate 192.168.1.80

在实际的应用场景中,可以自己搭建NTP服务器,也可以使用第三方开源的NTP服务器,如阿里等。

使用 “ntpdate NTP服务器地址” 命令从NTP服务器中获取时间并同步本地,一般配合Linux的crontab使用,每隔5分钟进行一次时间的同步。

3.6 启动集群

bin/start-hbase.sh

HBase海量数据存储第14张

当执行start-hbase.sh命令后,会在本节点中启动一个Master和一个RegionServer进程,并通过SSH访问其它节点启动RegionServer进程。

HBase海量数据存储第15张

HBase海量数据存储第16张

3.7 配置Master节点的高可用

由于HBase中Master节点的高可用是通过Zookeeper进行协调的,需要在其他节点中手动启动Master,当状态为Active的Master无法对外提供服务时,会将处于StandBy的Master切换为Active状态,对外提供服务。

HBase海量数据存储第17张

3.8 HBase的可视化管理界面

当HBase集群启动后,可以访问http://localhost:16030,进入HBase的可视化管理界面。

HBase海量数据存储第18张

4.使用Shell操作HBase

#进入HBase的可执行命名窗口
bin/hbase shell
#创建表 create 'tableName' , 'columnFamily' , 'columnFamily...' ​ #添加和更新记录 put 'tableName' , 'rowkey' , 'columnFamily:column' , 'value' ​ #查询记录 get 'tableName' , 'rowkey' ​ #查看表中的所有记录 scan 'tableName' ​ #查看表中指定列的所有记录 scan 'tableName' , {COLUMNS=>'columnFamily:column'} ​ #统计表的记录数 count 'tableName' ​ #删除整条记录 deleteall 'tableName' , 'rowkey' ​ #删除记录中的某一列 delete 'tableName' , 'rowkey' ,'columnFamily:column' ​ #禁用表 disable 'tableName' ​ #启动表 enable 'tableName' ​ #查看表是否被禁用 is_disabled 'tableName' ​ #删除表 drop 'tableName' ​ #检查表是否存在 exists 'tableName' ​ #查看当前HBase中的表 list

在删除表时需要禁用表,否则无法删除。

当使用put命令时,如果RowKey不存在则插入一条记录,如果Column已存在则更新value,否则插入Column和value。

5.JAVA中操作HBase

5.1 导入相关依赖

<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>2.2.5</version>
</dependency>

5.2 初始化配置并获取连接

static{
    //使用Configuration对象封装连接信息,比如hbase连接的ZK地址以及端口等。
    //默认会加载classpath下的hbase-site.xml配置文件,如果classpath下存在hbase-site.xml配置文件则不需要通过Configuration实例进行连接的配置。
    Configuration configuration =HBaseConfiguration.create();
    configuration.set("hbase.zookeeper.quorum", ZK_CLUSTER_HOSTS);
    configuration.set("hbase.zookeeper.property.clientPort", ZK_CLUSTER_PORT);
    try{
        connection =ConnectionFactory.createConnection(configuration);
    } catch(Exception e) {
        System.out.println("初始化HBase连接失败" +e);
    }
}

5.3 对表进行管理

使用Admin对象对表进行管理,通过Connection.getAdmin()方法获取一个Admin实例。

//判断表是否存在
booleantableExists(TableName);
​
//获取HBase中表的描述器(表信息)
TableDescriptor [] listTableDescriptors(Pattern);
​
//获取HBase中的表名称
TableName [] listTableNames();
​
//创建表
voidcreateTable(TableDescriptor);
​
//删除表
voiddeleteTable(TableName);
​
//启用表
voidenableTable(TableName);
​
//禁用表
voiddisableTable(TableName);
​
//判断表是否是启用状态
booleanisTableEnabled(TableName);
​
//判断表是否是禁用状态
booleanisTableDisabled(TableName);
​
//为表添加列簇
voidaddColumnFamily(TableName,ColumnDescriptor);
​
//删除表中的列簇
void deleteColumnFamily(TableName,byte);
​
//修改表中的列簇
void modifyColumnFamily(TableName,ColumnDescriptor);

5.4 对表中的数据进行增删改查操作

使用Table对象对表中的数据进行操作,通过Connection.getTable(byte [] tableName)方法获取一个Table实例。

//判断指定的RowKey是否存在
booleanexists(Get get);
​
//添加或更新记录
voidput(Put);
​
//批量添加或更新记录
void put(List<Put>);
​
//根据RowKey获取记录
Result get(Get get);
​
//根据多个RowKey获取记录
Result [] get(List<Get>);
​
//根据指定的条件扫描表
ResultScanner getScanner(Scan);
​
//根据RowKey删除记录
voiddelete(Delete);
​
//批量根据RowKey删除记录
void delete(List<Delete>)

1.使用Put、Delete、Get实例分别用来封装新增/更新、删除、查询操作的参数,通过其构造方法传入RowKey。

2.put()方法可以用来新增和更新记录,当RowKey不存在时则创建记录,否则如果Column相同则更新,否则新增。

3.在进行查询操作时,会返回Result实例,Result实例包含了一条记录的完整信息,同时一个Cell对象对应一个ColumnFamily、Column、Value,可以通过CellUtil工具类来获取Cell实例中对应的ColumnFamily、Column、Value等信息。

4.ResultScanner接口继承Iterable接口,其泛型是Result,也就是Result的集合,可以直接进行遍历。

由于HBase是基于列进行存储的,因此在查询某些列的数据时效率会很高。

由于大部分API都需要传入字节数组类型,可以使用HBase提供了Bytes工具类来进行字符串和字节数组之间的转换。

完整的HBaseUtils

/*** 使用HRecord表示HBase中的一条记录
 */@Data
@Builder(toBuilder = true)
public classHRecord {
​
    privateString rowKey;
​
    private List<HColumn>columnList;
​
    @Data
    @Builder
    public static classHColumn {
​
        privateString columnFamily;
​
        privateString column;
​
        privateString value;
    }
​
}
/*** @author: Zhuang HaoTang
 * @create: 2020-06-25 19:58
 * @description:
 */
public classHBaseUtils {
​
    /*** ZK集群地址
     */
    private static final String ZK_CLUSTER_HOSTS = "localhost";
​
    /*** ZK端口
     */
    private static final String ZK_CLUSTER_PORT = "2181";
​
    /*** HBase全局连接
     */
    private staticConnection connection;
​
    static{
        //使用Configuration对象封装连接信息,比如hbase连接的ZK地址以及端口等。
        //默认会加载classpath下的hbase-site.xml配置文件,如果classpath下存在hbase-site.xml配置文件则不需要通过Configuration实例进行连接的配置。
        Configuration configuration =HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", ZK_CLUSTER_HOSTS);
        configuration.set("hbase.zookeeper.property.clientPort", ZK_CLUSTER_PORT);
        try{
            connection =ConnectionFactory.createConnection(configuration);
        } catch(Exception e) {
            System.out.println("初始化HBase连接失败" +e);
        }
    }
​
    /*---------------------------------------对表进行管理---------------------------------------*//*** 创建表
     */
    public static void createTable(String tableName, String... columnFamilies) throwsException {
        Admin admin =connection.getAdmin();
        //表的描述器
        TableDescriptorBuilder tableDescriptorBuilder =TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
        //存储columnFamily描述器集合
        Collection<ColumnFamilyDescriptor> columnFamilyDescriptorsCollection =Lists.newArrayList();
        for(String columnFamily : columnFamilies) {
            //ColumnFamily描述器
            ColumnFamilyDescriptor columnFamilyDescriptor =ColumnFamilyDescriptorBuilder.of(columnFamily);
            columnFamilyDescriptorsCollection.add(columnFamilyDescriptor);
        }
        tableDescriptorBuilder.setColumnFamilies(columnFamilyDescriptorsCollection);
​
        admin.createTable(tableDescriptorBuilder.build());
    }
​
    /*** 为表添加列簇
     */
    public static void addColumnFamily(String tableName, String columnFamily) throwsException {
        Admin admin =connection.getAdmin();
        admin.addColumnFamily(TableName.valueOf(tableName), ColumnFamilyDescriptorBuilder.of(columnFamily));
    }
​
    /*** 删除表中的列簇
     */
    public static void deleteColumnFamily(String tableName, String columnFamily) throwsException {
        Admin admin =connection.getAdmin();
        admin.deleteColumnFamily(TableName.valueOf(tableName), Bytes.toBytes(columnFamily));
    }
​
    /*** 获取表的描述器(表信息)
     */
    public static List<TableDescriptor> getTableDescriptor(String tableName) throwsException {
        Admin admin =connection.getAdmin();
        returnadmin.listTableDescriptors(Pattern.compile(tableName));
    }
​
    /*** 删除表
     */
    public static void deleteTable(String tableName) throwsException {
        Admin admin =connection.getAdmin();
        if(admin.tableExists(TableName.valueOf(tableName))) {
            //先禁用再删除
admin.disableTable(TableName.valueOf(tableName));
            admin.deleteTable(TableName.valueOf(tableName));
        }
    }
​
    /*---------------------------------------对表中的数据进行CRUD---------------------------------------*//*** 插入或更新记录
     */
    public static void insertOrUpdate(String tableName, HRecord record) throwsException {
        Table table =connection.getTable(TableName.valueOf(tableName));
        Put put = newPut(Bytes.toBytes(record.getRowKey()));
        record.getColumnList().forEach(column ->put.addColumn(Bytes.toBytes(column.getColumnFamily()), Bytes.toBytes(column.getColumn()), Bytes.toBytes(column.getValue())));
        table.put(put);
    }
​
    /*** 批量插入或更新记录
     */
    public static void batchInsertOrUpdate(String tableName, Collection<HRecord> records) throwsException {
        Table table =connection.getTable(TableName.valueOf(tableName));
        List<Put> puts = records.stream().map(record ->{
            Put put = newPut(Bytes.toBytes(record.getRowKey()));
            record.getColumnList().forEach(column ->put.addColumn(Bytes.toBytes(column.getColumnFamily()), Bytes.toBytes(column.getColumn()), Bytes.toBytes(column.getValue())));
            returnput;
        }).collect(Collectors.toList());
        table.put(puts);
    }
​
    /*** 扫描全表
     */
    public static ResultScanner scan(String tableName) throwsException {
        Table table =connection.getTable(TableName.valueOf(tableName));
        return table.getScanner(newScan());
    }
​
    /*** 扫描表中的某些列
     * @paramcolumns key:columnFamily value:Set<column>
     */
    public static ResultScanner scanByColumns(String tableName, Map<String, Set<String>> columns) throwsIOException {
        Table table =connection.getTable(TableName.valueOf(tableName));
        Scan scan = newScan();
        for (Map.Entry<String, Set<String>>entry : columns.entrySet()) {
            String columnFamily =entry.getKey();
            entry.getValue().forEach(column ->scan.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column)));
        }
        returntable.getScanner(scan);
    }
​
    /*** 根据RowKey查询记录
     */
    public static Result getByRowKey(String tableName, String rowKey) throwsIOException {
        Table table =connection.getTable(TableName.valueOf(tableName));
        Get get = newGet(Bytes.toBytes(rowKey));
        returntable.get(get);
    }
​
    /*** 根据RowKey删除记录
     */
    public static void deleteByRowKey(String tableName, String rowKey) throwsIOException {
        Table table =connection.getTable(TableName.valueOf(tableName));
        Delete delete = newDelete(Bytes.toBytes(rowKey));
        table.delete(delete);
    }
​
    /*** 批量根据RowKey删除记录
     */
    public static void batchDeleteByRowKey(String tableName, Collection<String> rowKeys) throwsIOException {
        Table table =connection.getTable(TableName.valueOf(tableName));
        List<Delete> deletes = rowKeys.stream().map(rowKey -> newDelete(Bytes.toBytes(rowKey))).collect(Collectors.toList());
        table.delete(deletes);
    }
​
    /*** 删除记录中的某列
     */
    public static void deleteColumnOfRow(String tableName, String rowKey, String columnFamily, String column) throwsIOException {
        Table table =connection.getTable(TableName.valueOf(tableName));
        Delete delete = newDelete(Bytes.toBytes(rowKey));
        delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
        table.delete(delete);
    }
​
}

测试类

/*** @author: Zhuang HaoTang
 * @create: 2020-06-26 10:12
 * @description:
 */
public classHBaseUtilsTest {
​
    private static final String TABLE_NAME = "blacklist_customer_info";
​
    @Test
    public voidtestCreateTable() {
        try{
            HBaseUtils.createTable(TABLE_NAME, "cf1", "cf2");
        } catch(Exception e) {
            e.printStackTrace();
        }
    }
​
    @Test
    public voidtestAddColumnFamily() {
        try{
            HBaseUtils.addColumnFamily(TABLE_NAME, "cf3");
        } catch(Exception e) {
            e.printStackTrace();
        }
    }
​
    @Test
    public voidtestDeleteColumnFamily() {
        try{
            HBaseUtils.deleteColumnFamily(TABLE_NAME, "cf3");
        } catch(Exception e) {
            e.printStackTrace();
        }
    }
​
    @Test
    public void testGetTableDescriptor() throwsException {
        TableDescriptor tableDescriptor =Iterables.getOnlyElement(HBaseUtils.getTableDescriptor(TABLE_NAME));
        System.out.println("表名:" +tableDescriptor.getTableName());
        tableDescriptor.getColumnFamilyNames().forEach(columnFamilyByte -> System.out.println("列簇:" +Bytes.toString(columnFamilyByte)));
    }
​
    @Test
    public voidtestDeleteTable() {
        try{
            HBaseUtils.deleteTable(TABLE_NAME);
        } catch(Exception e) {
            e.printStackTrace();
        }
    }
​
    @Test
    public voidtestInsert() {
        HRecord record = HRecord.builder().rowKey(UUID.randomUUID().toString().replace("-", "")).build();
        List<HRecord.HColumn> columnList =Lists.newArrayList();
        columnList.add(HRecord.HColumn.builder().columnFamily("cf1").column("name").value("庄浩棠").build());
        columnList.add(HRecord.HColumn.builder().columnFamily("cf1").column("idcard").value("440112************").build());
        columnList.add(HRecord.HColumn.builder().columnFamily("cf1").column("phone").value("156****8105").build());
        columnList.add(HRecord.HColumn.builder().columnFamily("cf1").column("risk_status").value("2").build());
        columnList.add(HRecord.HColumn.builder().columnFamily("cf2").column("idcard_md5").value(MD5Hash.getMD5AsHex(Bytes.toBytes("440112************"))).build());
        columnList.add(HRecord.HColumn.builder().columnFamily("cf2").column("phone_md5").value(MD5Hash.getMD5AsHex(Bytes.toBytes("156****8105"))).build());
        record =record.toBuilder().columnList(columnList).build();
        try{
            HBaseUtils.insertOrUpdate(TABLE_NAME, record);
        } catch(Exception e) {
            e.printStackTrace();
        }
    }
​
    @Test
    public voidtestBatchInsert() {
        HRecord record1 = HRecord.builder().rowKey(UUID.randomUUID().toString().replace("-", "")).build();
        List<HRecord.HColumn> columnList1 =Lists.newArrayList();
        columnList1.add(HRecord.HColumn.builder().columnFamily("cf1").column("name").value("梁红英").build());
        columnList1.add(HRecord.HColumn.builder().columnFamily("cf1").column("idcard").value("130404************").build());
        columnList1.add(HRecord.HColumn.builder().columnFamily("cf1").column("phone").value("186****6666").build());
        columnList1.add(HRecord.HColumn.builder().columnFamily("cf1").column("risk_status").value("2").build());
        columnList1.add(HRecord.HColumn.builder().columnFamily("cf2").column("idcard_md5").value(MD5Hash.getMD5AsHex(Bytes.toBytes("130404************"))).build());
        columnList1.add(HRecord.HColumn.builder().columnFamily("cf2").column("phone_md5").value(MD5Hash.getMD5AsHex(Bytes.toBytes("186****6666"))).build());
        record1 =record1.toBuilder().columnList(columnList1).build();
​
        HRecord record2 = HRecord.builder().rowKey(UUID.randomUUID().toString().replace("-", "")).build();
        List<HRecord.HColumn> columnList2 =Lists.newArrayList();
        columnList2.add(HRecord.HColumn.builder().columnFamily("cf1").column("name").value("夕涵山").build());
        columnList2.add(HRecord.HColumn.builder().columnFamily("cf1").column("idcard").value("360731************").build());
        columnList2.add(HRecord.HColumn.builder().columnFamily("cf1").column("phone").value("145****8637").build());
        columnList2.add(HRecord.HColumn.builder().columnFamily("cf1").column("risk_status").value("2").build());
        columnList2.add(HRecord.HColumn.builder().columnFamily("cf2").column("idcard_md5").value(MD5Hash.getMD5AsHex(Bytes.toBytes("360731************"))).build());
        columnList2.add(HRecord.HColumn.builder().columnFamily("cf2").column("phone_md5").value(MD5Hash.getMD5AsHex(Bytes.toBytes("145****8637"))).build());
        record2 =record2.toBuilder().columnList(columnList2).build();
​
        try{
            HBaseUtils.batchInsertOrUpdate(TABLE_NAME, Lists.newArrayList(record1, record2));
        } catch(Exception e) {
            e.printStackTrace();
        }
    }
​
    @Test
    public voidtestUpdate() {
        String rowKey = "10e21b2a08614daca6de0d3a4c2d0fd0";
        HRecord record =HRecord.builder().rowKey(rowKey).build();
        List<HRecord.HColumn> columnList =Lists.newArrayList();
        columnList.add(HRecord.HColumn.builder().columnFamily("cf1").column("name").value("zhuanght").build());
        columnList.add(HRecord.HColumn.builder().columnFamily("cf1").column("idcard").value("440112************").build());
        record =record.toBuilder().columnList(columnList).build();
        try{
            HBaseUtils.insertOrUpdate(TABLE_NAME, record);
        } catch(Exception e) {
            e.printStackTrace();
        }
    }
​
    @Test
    public voidtestBatchUpdate() {
        String rowKey1 = "10e21b2a08614daca6de0d3a4c2d0fd0";
        HRecord record1 =HRecord.builder().rowKey(rowKey1).build();
        List<HRecord.HColumn> columnList1 =Lists.newArrayList();
        columnList1.add(HRecord.HColumn.builder().columnFamily("cf1").column("name").value("庄浩棠").build());
        columnList1.add(HRecord.HColumn.builder().columnFamily("cf1").column("address").value("广州市").build());
        columnList1.add(HRecord.HColumn.builder().columnFamily("cf1").column("phone_md5").value(MD5Hash.getMD5AsHex(Bytes.toBytes("136****1502"))).build());
        record1 = record1.toBuilder().columnList(columnList1).build();//6
​
        String rowKey2 = "f0f0492c36fb4792be266e922f042685";
        HRecord record2 =HRecord.builder().rowKey(rowKey2).build();
        List<HRecord.HColumn> columnList2 =Lists.newArrayList();
        columnList2.add(HRecord.HColumn.builder().columnFamily("cf1").column("name").value("夕涵山").build());
        columnList2.add(HRecord.HColumn.builder().columnFamily("cf1").column("idcard").value("360731************").build());
        columnList2.add(HRecord.HColumn.builder().columnFamily("cf2").column("idcard_md5").value(MD5Hash.getMD5AsHex(Bytes.toBytes("360731************"))).build());
        columnList2.add(HRecord.HColumn.builder().columnFamily("cf2").column("phone_md5").value(MD5Hash.getMD5AsHex(Bytes.toBytes("145****8637"))).build());
        record2 =record2.toBuilder().columnList(columnList2).build();
​
        try{
            HBaseUtils.batchInsertOrUpdate(TABLE_NAME, Lists.newArrayList(record1, record2));
        } catch(Exception e) {
            e.printStackTrace();
        }
    }
​
    @Test
    public voidtestScanner() {
        try{
            ResultScanner resultScanner = HBaseUtils.scan("blacklist_customer_info");
            for(Result result : resultScanner) {
                StringBuilder recordStr = new StringBuilder(String.format("rowKey:%s", Bytes.toString(result.getRow())));
                result.listCells().forEach(cell -> recordStr.append(String.format("
%s:%s--》%s", Bytes.toString(CellUtil.cloneFamily(cell)), Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)))));
​
                System.out.println("====================================================================");
                System.out.println(recordStr);
            }
        } catch(Exception e) {
            e.printStackTrace();
        }
    }
​
    @Test
    public voidtestScanByColumns() {
        Map<String, Set<String>> columns =Maps.newHashMap();
        columns.put("cf1", Sets.newHashSet("name", "idcard"));
        columns.put("cf2", Sets.newHashSet("idcard_md5"));
        try{
            ResultScanner resultScanner =HBaseUtils.scanByColumns(TABLE_NAME, columns);
            for(Result result : resultScanner) {
                StringBuilder recordStr = new StringBuilder(String.format("rowKey:%s", Bytes.toString(result.getRow())));
                result.listCells().forEach(cell -> recordStr.append(String.format("
%s:%s--》%s", Bytes.toString(CellUtil.cloneFamily(cell)), Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)))));
​
                System.out.println("====================================================================");
                System.out.println(recordStr);
            }
        } catch(IOException e) {
            e.printStackTrace();
        }
    }
​
    @Test
    public voidtestGetByRowKey() {
        String tableName = "blacklist_customer_info";
        String rowKey = "7d86f464b4ba459cac828cdd3772b2ba";
        try{
            Result result =HBaseUtils.getByRowKey(tableName, rowKey);
            StringBuilder recordStr = new StringBuilder(String.format("rowKey:%s", Bytes.toString(result.getRow())));
            result.listCells().forEach(cell -> recordStr.append(String.format("
%s:%s--》%s", Bytes.toString(CellUtil.cloneFamily(cell)), Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)))));
            System.out.println(recordStr);
        } catch(IOException e) {
            e.printStackTrace();
        }
    }
​
    @Test
    public voidtestDeleteByRowKey() {
        String tableName = "blacklist_customer_info";
        String rowKey = "7d86f464b4ba459cac828cdd3772b2ba";
        try{
            HBaseUtils.deleteByRowKey(tableName, rowKey);
        } catch(IOException e) {
            e.printStackTrace();
        }
    }
​
    @Test
    public voidtestBatchDeleteByRowKey() {
        String tableName = "blacklist_customer_info";
        List<String> rowKeys = Lists.newArrayList("423e209fd0d84985a161bbc5a808d67b", "ce6ee0f16b7944f6abbba7762ab177cb");
        try{
            HBaseUtils.batchDeleteByRowKey(tableName, rowKeys);
        } catch(IOException e) {
            e.printStackTrace();
        }
    }
​
    @Test
    public voidtestDeleteColumnOfRow() {
        String tableName = "blacklist_customer_info";
        String rowKey = "10e21b2a08614daca6de0d3a4c2d0fd0";
        try{
            HBaseUtils.deleteColumnOfRow(tableName, rowKey, "cf1", "idcard");
        } catch(IOException e) {
            e.printStackTrace();
        }
    }
​
}

免责声明:文章转载自《HBase海量数据存储》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Data URIJava8 stream处理List,Map总结下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

MyBatis(四)映射文件 之 参数获取详解#{} 与 ${}

一、#{} 与${} 的取值 相同点: #{}:可以获取map中的值或者pojo对象属性的值; ${}:可以获取map中的值或者pojo对象属性的值; 区别: #{}:是以预编译的形式,将参数设置到sql语句中;PreparedStatement;防止sql注入; ${}:取出的值直接拼装在sql语句中;会有安全问题; 大多情况下,我们去参数的值都应该去使...

lua中table的遍历,以及删除

Lua 内table遍历 在lua中有4种方式遍历一个table,当然,从本质上来说其实都一样,只是形式不同,这四种方式分别是: 1. ipairs for index, value in ipairs(table) do end 注:这种方式的遍历只会从key为1的地方开始,一直以key递增1的顺序来遍历,若找到一个递增不是1的key就结束遍历,无论后面...

C# Dictionary.Keys用法及代码示例

此属性用于获取包含Dictionary中的键的集合。 用法: public System.Collections.Generic.Dictionary<TKey, TValue>.KeyCollection Keys { get; } 返回值:它返回一个包含Dictionary中关键字的集合。 以下示例程序旨在说明上面讨论的属性的使用:...

Hive的基本操作

一、创建数据库   hive>create database mydb ; //hql和mysql语法极其相似   hive>show databases;//查看所有数据库 二、创建表   格式:     create [external] table <表名>(列的定义,....)  [row format delimited...

Golang 对MongoDB的操作简单封装

使用MongoDB的Go驱动库mgo,对MongoDB的操作做一下简单封装 初始化 操作没有用户权限的MongoDB var globalS *mgo.Session func init() { s, err := mgo.Dial(dialInfo) if err != nil { log.Fatalf("Create...

不知道如何实现服务的动态发现?快来看看 Dubbo 是如何做到的

上篇文章如果有人问你 Dubbo 中注册中心工作原理,就把这篇文章给他大致了解了注册中心作用以及 Dubbo Registry 模块源码,这篇文章将深入 Dubbo ZooKeeper 模块,去了解如何实现服务动态的发现。 ps: 以下将 ZooKeeper 缩写为 zk。 一、dubbo zk 数据结构 在 ZooKeeper 基本概念分享一文讲道,Z...