Hbase介绍及操作

摘要:
HBase可以提供对海量结构化数据的快速随机访问。值:Bytearray2.Hbase的架构2.1 Hbase Hbase的架构组成使用主/从架构来构建集群。它属于Hadoop生态系统,由以下类型的节点组成:HMaster节点、HRegionServer节点和ZooKeeper集群。在底部,它在HDFS中存储数据,因此涉及HDFS的NameNode、DataNode等。总体结构如下:每个组件的描述:Client使用HBaseRPC机制与HMaster和HRegionServer进行通信;客户与HMaster沟通管理操作;客户端和HRegionServer执行数据读写操作;HMasterHMaster没有单点问题。HBase中可以启动多个HMaster。动物园管理员确保总是有一个主人在运行。
1. Hbase概述

1.1 Hbase是什么

  • HBase是建立在HDFS之上的分布式面向列的数据库;属于KV结构数据,原生不支持标准SQL。它是一个Apache的开源项目,是横向扩展的。
  • HBase可以提供快速随机访问海量结构化数据。它利用了Hadoop的文件系统(HDFS)提供的容错能力。
  • HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库,是HBase基于列的而不是基于行的模式。

1.2 Hbase数据单元

RowKey:是Byte array,是表中每条记录的“主键”,按照字典顺序排序,方便快速查找,Rowkey的设计非常重要;
Column Family:列族,拥有一个名称(string),包含一个或者多个相关列;
Column:属于某一个columnfamily,familyName:columnName,每条记录可动态添加;
Version Number:类型为Long,默认值是系统时间戳Timestamp,可由用户自定义;用于标记同一份数据的不同版本。
Value(Cell):Byte array;
Hbase介绍及操作第1张

2. Hbase的架构

2.1 Hbase的架构组成

HBase采用Master/Slave架构搭建集群,它隶属于Hadoop生态系统,由以下类型节点组成:HMaster节点、HRegionServer节点、ZooKeeper集群,而在底层,它将数据存储于HDFS中,因而涉及到HDFS的NameNode、DataNode等,总体结构如下:
Hbase介绍及操作第2张

各组件说明:
Client

  • 使用HBase RPC机制与HMaster和HRegionServer进行通信;
  • Client与HMaster进行通信进行管理类操作;
  • Client与HRegionServer进行数据读写类操作;

HMaster

  • HMaster没有单点问题,HBase中可以启动多个HMaster,通过Zookeeper保证总有一个Master在运行。

HMaster主要负责Table和Region的管理工作

  • 管理用户对表的增删改查操作;
  • 管理HRegionServer的负载均衡,调整Region分布;
  • Region Split后,负责新Region的分布;
  • 在HRegionServer停机后,负责失效HRegionServer上Region 的迁移;

HRegionServer
HBase中最核心的模块;

  • 维护region,处理对这些region的IO请求;
  • Regionserver负责切分在运行过程中变得过大的region;

HRegion

  • HBase使用RowKey将表水平切割成多个HRegion,从HMaster的角度,每个HRegion都纪录了它的StartKey和EndKey(第一个HRegion的StartKey为空,最后一个HRegion的EndKey为空),由于RowKey是排序的,因而Client可以通过HMaster快速的定位每个RowKey在哪个HRegion中。

2.2查看hbase的web界面

http://master:16010/

3.hbase shell操作

3.1 DDL操作

#开启hbase shell
hbase shell
#查看hbase状态
status
#查看hbase版本
version
#创建命名空间
create_namespace '命名空间名'
#显示所有命名空间
list_namespace
#删除命名空间, 在删除一个命名空间时,该命名空间不能包含任何的表,否则会报错
drop_namespace '命名空间名'
#创建默认命名空间的表
create '表名称', '列族名称1','列族名称2','列族名称N'
#创建带有命名空间的表
create '命名空间:表名称', '列族名称1','列族名称2','列族名称N'
#列出所有表
list
#获得表的描述
describe '表名'
#删除table 表的 列族名称1 列族
alter 'table',{NAME=>'列族名称1',METHOD=>'delete'}
#删除多个列族
alter 'table', {NAME => '列族名称1', METHOD => 'delete'},{NAME => '列族名称2', METHOD => 'delete'}
#先把表下线
disable '表名'
#再drop表
drop '表名'

3.2 DML操作

#添加数据
# 语法:put <table>,<rowkey>,<family:column>,<value>,[<timestamp>]
#如果不写timestamp,则系统默认
put 'table','id01', 'c_f1:name','111'

#获取数据
#get: 获取表中一行数据,不能扫描全表
# 语法:get <table>,<rowkey>,[<family:column>,....]
get 'table','id01'

#更新数据
#语法:重新put,put时会覆盖原来的数据
put 'table','id01', 'c_f1:name','222'

#scan扫描
# 语法:scan <table> ,{COLUMNS => [ <family:column>,.... ], LIMIT => num}
#扫描全表,大表操作不可取
scan 'table'
#获取表中前两行
scan 'table', {LIMIT => 2}
#扫描表中指定列族数据
scan 'table', {COLUMNS => 'c_f1'}
#扫描表中执行列族中列的数据
scan 'table', {COLUMNS => 'c_f2:cert_no'}
#扫描表中值=222 的数据
scan 'table', FILTER=>"ValueFilter(=,'name:222')"
# 筛选行,按照rowkey的范围[STARTROW,STOPROW) 
scan 'table', {STARTROW =>'id01' , STOPROW => 'id03'}

#删除行中某列数据
# 语法:delete <table>, <rowkey>, <family:column>
# 必须指定列名
# 会删除执行列的所有版本数据
delete 'table', 'id04',  'c_f2:name'

#删除整行
# 语法:deleteall <table>, <rowkey>
deleteall 'table', 'id05'

#清空表数据
# 语法: truncate <table>
truncate 'table'

#查询表中有多少行
# 语法:count <table>, {INTERVAL => intervalNum, CACHE => cacheNum}
# INTERVAL设置多少行显示一次及对应的rowkey,默认1000;
# CACHE每次去取的缓存区大小,默认是10,调整该参数可提高查询速度
#查询表中数据行数
count 'table'
#按照2行显示一次,查询
count 'table', {INTERVAL => 2} 
4.hbase整合springboot

4.1pom

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.6</version>
        </dependency>

4.2 application.properties

hbase.conf.confMaps.'hbase.zookeeper.quorum'=master,slave1,slave2

4.3 HbaseConfig 自定义配置类

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

/**
 * Hbase-Conf配置
 *
 * @Author: suyuan
 */
@Configuration
@ConfigurationProperties(prefix = HbaseConfig.CONF_PREFIX)
public class HbaseConfig {

    public static final String CONF_PREFIX = "hbase.conf";

    private Map<String,String> confMaps;

    public Map<String, String> getconfMaps() {
        return confMaps;
    }
    public void setconfMaps(Map<String, String> confMaps) {
        this.confMaps = confMaps;
    }
}

4.4 HBaseUtils工具类

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * Spring的ApplicationContext的持有者,可以用静态方法的方式获取spring容器中的bean
 */
@Component
public class SpringContextHolder implements ApplicationContextAware {

    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringContextHolder.applicationContext = applicationContext;
    }

    public static ApplicationContext getApplicationContext() {
        assertApplicationContext();
        return applicationContext;
    }

    @SuppressWarnings("unchecked")
    public static <T> T getBean(String beanName) {
        assertApplicationContext();
        return (T) applicationContext.getBean(beanName);
    }

    public static <T> T getBean(Class<T> requiredType) {
        assertApplicationContext();
        return applicationContext.getBean(requiredType);
    }

    private static void assertApplicationContext() {
        if (SpringContextHolder.applicationContext == null) {
            throw new RuntimeException("applicaitonContext属性为null,请检查是否注入了SpringContextHolder!");
        }
    }

}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@DependsOn("springContextHolder")        //控制依赖顺序,保证springContextHolder类在之前已经加载
@Component
public class HBaseUtils {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    //手动获取hbaseConfig配置类对象
    private static HbaseConfig hbaseConfig = SpringContextHolder.getBean("hbaseConfig");

    private static Configuration conf = HBaseConfiguration.create();
    private static ExecutorService pool = Executors.newScheduledThreadPool(20);    //设置连接池
    private static Connection connection = null;
    private static HBaseUtils instance = null;
    private static Admin admin = null;

    private HBaseUtils(){
        if(connection == null){
            try {
                //将hbase配置类中定义的配置加载到连接池中每个连接里
                Map<String, String> confMap = hbaseConfig.getconfMaps();
                for (Map.Entry<String,String> confEntry : confMap.entrySet()) {
                    conf.set(confEntry.getKey(), confEntry.getValue());
                }
                connection = ConnectionFactory.createConnection(conf, pool);
                admin = connection.getAdmin();
            } catch (IOException e) {
                logger.error("HbaseUtils实例初始化失败!错误信息为:" + e.getMessage(), e);
            }
        }
    }

    //简单单例方法,如果autowired自动注入就不需要此方法
    public static synchronized HBaseUtils getInstance(){
        if(instance == null){
            instance = new HBaseUtils();
        }
        return instance;
    }


    /**
     * 创建表
     *
     * @param tableName         表名
     * @param columnFamily      列族(数组)
     */
    public void createTable(String tableName, String[] columnFamily) throws IOException{
        TableName name = TableName.valueOf(tableName);
        //如果存在则删除
        if (admin.tableExists(name)) {
            admin.disableTable(name);
            admin.deleteTable(name);
            logger.error("create htable error! this table {} already exists!", name);
        } else {
            HTableDescriptor desc = new HTableDescriptor(name);
            for (String cf : columnFamily) {
                desc.addFamily(new HColumnDescriptor(cf));
            }
            admin.createTable(desc);
        }
    }

    /**
     * 插入记录(单行单列族-多列多值)
     *
     * @param tableName         表名
     * @param row               行名
     * @param columnFamilys     列族名
     * @param columns           列名(数组)
     * @param values            值(数组)(且需要和列一一对应)
     */
    public void insertRecords(String tableName, String row, String columnFamilys, String[] columns, String[] values) throws IOException {
        TableName name = TableName.valueOf(tableName);
        Table table = connection.getTable(name);
        Put put = new Put(Bytes.toBytes(row));
        for (int i = 0; i < columns.length; i++) {
            put.addColumn(Bytes.toBytes(columnFamilys), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
            table.put(put);
        }
    }

    /**
     * 插入记录(单行单列族-单列单值)
     *
     * @param tableName         表名
     * @param row               行名
     * @param columnFamily      列族名
     * @param column            列名
     * @param value             值
     */
    public void insertOneRecord(String tableName, String row, String columnFamily, String column, String value) throws IOException {
        TableName name = TableName.valueOf(tableName);
        Table table = connection.getTable(name);
        Put put = new Put(Bytes.toBytes(row));
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
        table.put(put);
    }

    /**
     * 删除一行记录
     *
     * @param tablename         表名
     * @param rowkey            行名
     */
    public void deleteRow(String tablename, String rowkey) throws IOException {
        TableName name = TableName.valueOf(tablename);
        Table table = connection.getTable(name);
        Delete d = new Delete(rowkey.getBytes());
        table.delete(d);
    }

    /**
     * 删除单行单列族记录
     * @param tablename         表名
     * @param rowkey            行名
     * @param columnFamily      列族名
     */
    public void deleteColumnFamily(String tablename, String rowkey, String columnFamily) throws IOException {
        TableName name = TableName.valueOf(tablename);
        Table table = connection.getTable(name);
        Delete d = new Delete(rowkey.getBytes()).deleteFamily(Bytes.toBytes(columnFamily));
        table.delete(d);
    }

    /**
     * 删除单行单列族单列记录
     *
     * @param tablename         表名
     * @param rowkey            行名
     * @param columnFamily      列族名
     * @param column            列名
     */
    public void deleteColumn(String tablename, String rowkey, String columnFamily, String column) throws IOException {
        TableName name = TableName.valueOf(tablename);
        Table table = connection.getTable(name);
        Delete d = new Delete(rowkey.getBytes()).deleteColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
        table.delete(d);
    }


    /**
     * 查找一行记录
     *
     * @param tablename         表名
     * @param rowKey            行名
     */
    public static String selectRow(String tablename, String rowKey) throws IOException {
        String record = "";
        TableName name=TableName.valueOf(tablename);
        Table table = connection.getTable(name);
        Get g = new Get(rowKey.getBytes());
        Result rs = table.get(g);
        NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map = rs.getMap();
        for (Cell cell : rs.rawCells()) {
            StringBuffer stringBuffer = new StringBuffer().append(Bytes.toString(cell.getRow())).append("	")
                    .append(Bytes.toString(cell.getFamily())).append("	")
                    .append(Bytes.toString(cell.getQualifier())).append("	")
                    .append(Bytes.toString(cell.getValue())).append("
");
            String str = stringBuffer.toString();
            record += str;
        }
        return record;
    }

    /**
     * 查找单行单列族单列记录
     *
     * @param tablename         表名
     * @param rowKey            行名
     * @param columnFamily      列族名
     * @param column            列名
     * @return
     */
    public static String selectValue(String tablename, String rowKey, String columnFamily, String column) throws IOException {
        TableName name=TableName.valueOf(tablename);
        Table table = connection.getTable(name);
        Get g = new Get(rowKey.getBytes());
        g.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
        Result rs = table.get(g);
        return Bytes.toString(rs.value());
    }

    /**
     * 查询表中所有行(Scan方式)
     *
     * @param tablename
     * @return
     */
    public String scanAllRecord(String tablename) throws IOException {
        String record = "";
        TableName name=TableName.valueOf(tablename);
        Table table = connection.getTable(name);
        Scan scan = new Scan();
        ResultScanner scanner = table.getScanner(scan);
        try {
            for(Result result : scanner){
                for (Cell cell : result.rawCells()) {
                    StringBuffer stringBuffer = new StringBuffer().append(Bytes.toString(cell.getRow())).append("	")
                            .append(Bytes.toString(cell.getFamily())).append("	")
                            .append(Bytes.toString(cell.getQualifier())).append("	")
                            .append(Bytes.toString(cell.getValue())).append("
");
                    String str = stringBuffer.toString();
                    record += str;
                }
            }
        } finally {
            if (scanner != null) {
                scanner.close();
            }
        }

        return record;
    }

    /**
     * 根据rowkey关键字查询报告记录
     *
     * @param tablename
     * @param rowKeyword
     * @return
     */
    public List scanReportDataByRowKeyword(String tablename, String rowKeyword) throws IOException {
        ArrayList<Object> list = new ArrayList<>();

        Table table = connection.getTable(TableName.valueOf(tablename));
        Scan scan = new Scan();

        //添加行键过滤器,根据关键字匹配
        RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(rowKeyword));
        scan.setFilter(rowFilter);

        ResultScanner scanner = table.getScanner(scan);
        try {
            for (Result result : scanner) {
                //TODO 此处根据业务来自定义实现
                list.add(null);
            }
        } finally {
            if (scanner != null) {
                scanner.close();
            }
        }

        return list;
    }

    /**
     * 根据rowkey关键字和时间戳范围查询报告记录
     *
     * @param tablename
     * @param rowKeyword
     * @return
     */
    public List scanReportDataByRowKeywordTimestamp(String tablename, String rowKeyword, Long minStamp, Long maxStamp) throws IOException {
        ArrayList<Object> list = new ArrayList<>();

        Table table = connection.getTable(TableName.valueOf(tablename));
        Scan scan = new Scan();
        //添加scan的时间范围
        scan.setTimeRange(minStamp, maxStamp);

        RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(rowKeyword));
        scan.setFilter(rowFilter);

        ResultScanner scanner = table.getScanner(scan);
        try {
            for (Result result : scanner) {
                //TODO 此处根据业务来自定义实现
                list.add(null);
            }
        } finally {
            if (scanner != null) {
                scanner.close();
            }
        }

        return list;
    }


    /**
     * 删除表操作
     *
     * @param tablename
     */
    public void deleteTable(String tablename) throws IOException {
        TableName name=TableName.valueOf(tablename);
        if(admin.tableExists(name)) {
            admin.disableTable(name);
            admin.deleteTable(name);
        }
    }

    /**
     * 利用协处理器进行全表count统计
     *
     * @param tablename
     */
    public Long countRowsWithCoprocessor(String tablename) throws Throwable {
        TableName name=TableName.valueOf(tablename);
        HTableDescriptor descriptor = admin.getTableDescriptor(name);

        String coprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
        if (! descriptor.hasCoprocessor(coprocessorClass)) {
            admin.disableTable(name);
            descriptor.addCoprocessor(coprocessorClass);
            admin.modifyTable(name, descriptor);
            admin.enableTable(name);
        }

        //计时
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();

        Scan scan = new Scan();
        AggregationClient aggregationClient = new AggregationClient(conf);

        Long count = aggregationClient.rowCount(name, new LongColumnInterpreter(), scan);

        stopWatch.stop();
        System.out.println("RowCount:" + count +  ",全表count统计耗时:" + stopWatch.getTotalTimeMillis());

        return count;
    }

}

4.5 测试用例

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/hbase")
public class HbaseController {

    @Autowired
    private HBaseUtils hBaseUtils;

    /**
     * 创建表
     * @author suyuan
     * @date 2021/6/22 17:17 
     */
    @RequestMapping("/createTable")
    public String createTable() throws Exception {
        String[] arr = {"name","age"};
        hBaseUtils.createTable("sy:syy",arr);
        return "ok";
    }

    /**
     * 删除表
     * @author suyuan
     * @date 2021/6/22 17:18 
     */
    @RequestMapping("/deleteTable")
    public String deleteTable() throws Exception {
        hBaseUtils.deleteTable("syy");
        return "ok";
    }

    /**
     * 插入记录(插入同列族数据就是更新)
     * @author suyuan
     * @date 2021/6/22 17:18 
     */
    @RequestMapping("/insertRecords")
    public String insertRecords() throws Exception {
        hBaseUtils.insertRecords("sy:syy","20210622","name", new String[]{"name1","name2"}, new String[]{"111","222"});
        return "ok";
    }

    /**
     * 查找一行记录
     * @author suyuan
     * @date 2021/6/22 17:18 
     */
    @RequestMapping("/selectRow")
    public String selectRow() throws Exception {
        String s = hBaseUtils.selectRow("sy:syy", "20210622");
        System.out.println(s);
        return s;
    }

}
本地编写代码需配置:

host地址

192.168.10.160  master
192.168.10.161  slave1
192.168.10.162  slave2

集群需要启动

zookeeper、hadoop、hbase
Hbase介绍及操作第3张

免责声明:文章转载自《Hbase介绍及操作》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇三种自定义圆形按钮的方法eNSP——Hybrid接口的应用下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Flink写入kafka时,只写入kafka的部分Partitioner,无法写所有的Partitioner问题

1. 写在前面 在利用flink实时计算的时候,往往会从kafka读取数据写入数据到kafka,但会发现当kafka多个Partitioner时,特别在P量级数据为了kafka的性能kafka的节点有十几个时,一个topic的Partitioner可能有几十个甚至更多,发现flink写入kafka的时候没有全部写Partitioner,而是写了部分的Par...

使用JDBC进行简单的增删改查

JDBC为java的基础。用jdbc实现对数据库的增删改查的功能是程序员的基本要求。本例以mysql为例,首先要使用本例需要添加mysql-connector-java-5.1.7-bin.jar包。专门用来加载jdbc的驱动。如果数据库为oracle,相应的jar包换为ojdbc6.jar。 通过下面的代码可以练习一下,掌握jdbc的使用方法,自己可以对...

Hive 安装配置

实验简介 本次课程学习了如何安装配置 Hive。 一、实验环境说明 1. 环境登录 无需密码自动登录,系统用户名shiyanlou,密码shiyanlou 2. 环境介绍 本实验环境采用带桌面的Ubuntu Linux环境,实验中会用到桌面上的程序: XfceTerminal: Linux命令行终端,打开后会进入Bash环境,可以使用Linux命令;...

关于将桌面扩展到监视器的问题 extended my windows desktop onto this monitor

说下思路吧 下面是网上找的Use the EnumDisplayDevices() API call to enumerate the display devices on the system and look for those that don't have the DISPLAY_DEVICE_ATTACHED_TO_DESKTOP flag se...

Ant Design Pro V5 从服务器请求菜单(typescript版)

【前言】 找了很多Admin模板,最后还是看中了AntDesignPro(下文简写antd pro)这个阿里巴巴开源的Admin框架,长这样(还行吧,目前挺主流的): 官网地址:https://pro.ant.design/index-cn 该套模板是使用了React开发框架作为基础,AntDesign(蚂蚁金服开源UI组件库)作为UI库,集成了Dva,...

.Net Task&amp;lt;T&amp;gt;的一种比较神奇的卡死情况(Wait/Result卡死, await能得到结果)

出现的环境.Net4.0 + WebApi1(4.0.30506.0) +Microsoft.Bcl.Async.1.0.168 自己死活看不出原因, 分享出来给大家看看,希望有人能找到问题的关键 出现错误的是下面这两个模块 下面的CorsMessageHandler,抄的http://www.cnblogs.com/artech/p/cors-4-asp...