有人会问,为啥要用这个叫啥Kudu的,Kudu是啥?
就像官网所说,Kudu是一个针对Apache hadoop 平台而开发的列式存储管理器,在本菜鸟看来,它是一种介于hdfs与hbase的一种存储。它的优势在于:
1、OLAP工作的快速处理,也就是针对于查询,很快,很牛逼。
2、针对同时运行顺序和随机工作负载的情况性能很好。
3、高可用,Table server和master使用Raft Consensus Algorithm节点来保证高可用,什么是Raft Consunsus Algorith?参考:https://www.cnblogs.com/mindwind/p/5231986.html),只要有一半以上的副本可用,该tablet便可用于读写。
4、结构化数据模型(可以理解为带schema)。
该图显示了一个具有三个master和多个tabletserver的Kudu集群,每个服务器都支持多个tablet。它说明了如何使用Raft共识来允许master和tabletserver的leader和 follow。此外,tablet server可以成为某些tablet的leader,也可以是其他tablet的follower。leader以金色显示,而follower则显示为蓝色。
下面是一些基本概念:
Table(表)
一张talbe是数据存储在Kudu的位置。表具有schema和全局有序的primarykey(主键)。table被分成称为tablets的segments。
Tablet
一个tablet是一张table连续的segment,与其它数据存储引擎或关系型数据库中的partition(分区)相似。给定的tablet冗余到多个tablet服务器上,并且在任何给定的时间点,其中一个副本被认为是leader tablet。任何副本都可以对读取进行服务,并且写入时需要在为tablet服务的一组tablet server之间达成一致性。
TabletServer
一个tabletserver存储tablet和为tablet向client提供服务。对于给定的tablet,一个tabletserver充当 leader,其他tablet server充当该tablet的follower副本。只有leader服务写请求,然而leader或followers为每个服务提供读请求。leader使用Raft Consunsus Algorithm来进行选举。一个tabletserver可以服务多个tablets,并且一个tablet可以被多个tabletservers服务着。
具体我还没有那么深入,写了些api调用玩了一把,下面慢慢讲述,Kudu的API比较恶心的哈。。
kudu的sql语法与传统的sql语法比较相似,但也不尽相同,直接解析时,具体sql语法请参考官网,下面以类似hive metastore表结构的方式封装了下。以下列sql为例:
create table combined_t6 (x int64, s string, s2 string, primary key (x, s))
partition by hash (x) partitions 10, range (x)
(
partition 0 <= values <= 49, partition 50 <= values <= 100
)REPLICAS 1
publicBoolean create(Table table,String operator) { LOGGER.info("kudu Table properties:" +table.getKvInfos().toString()); List<ColumnSchema> columns = newArrayList(table.getTableColumnList().size());
KuduTableGenerateUtil.generateKuduColumn(table.getTableColumnList(),columns); Schema schema = newSchema(columns); KuduPartitionSchema kuduPartitionSchema =KuduTableGenerateUtil.parserPartition(table); CreateTableOptions tableOptions =KuduTableGenerateUtil.generateKuduTableOptions(table,schema,kuduPartitionSchema); try{ getKuduClient(table).createTable(table.getTableName(), schema,tableOptions); } catch(KuduException e) { throw new MetadataInvalidObjectException(e, " create kudu storage table error!!"); } return true; }
kudu的column属性中,包含有primarfyKey、encoding、compression algorithm、null table 、default value 、block size等属性,所以从上述代码中需要先将kuduColumn进行封装,构造ColumnSchema对象:
newColumnSchema.ColumnSchemaBuilder(tableColumn.getColumnName(), getKuduColumnType(tableColumn.getDataType())) .key(checkBoolKey(columnCondition.get(MetadataConfigKey.COLUMN_KUDU_PRIMARY_KEY))) .nullable(checkBoolKey(columnCondition.get(MetadataConfigKey.COLUMN_KUDU_SCHEMA_IS_NULLTABLE))) .defaultValue(defaultValue) .desiredBlockSize(getDesiredBlockSize(columnCondition.get(MetadataConfigKey.COLUMN_KUDU_SCHEMA_DESIRED_BLOCKSIZE))) .encoding(getColumnEncoding(columnCondition.get(MetadataConfigKey.COLUMN_KUDU_SCHEMA_ENCODING))) .compressionAlgorithm(getCompressionType(columnCondition.get(MetadataConfigKey.COLUMN_KUDU_SCHEMA_COMPRESSION_ALGORITHM))) .build();
对于column的数据类型,有很多种,如下:
private staticType getKuduColumnType(String dataType) { switch(dataType.toUpperCase()) { case "INT8": returnType.INT8; case "INT16": returnType.INT16; case "INT32": returnType.INT32; case "INT64": returnType.INT64; case "BINARY": returnType.BINARY; case "STRING": returnType.STRING; case "BOOL": returnType.BOOL; case "FLOAT": returnType.FLOAT; case "DOUBLE": returnType.DOUBLE; case "UNIXTIME_MICROS": returnType.UNIXTIME_MICROS; default: returnType.STRING; } }
压缩方式包括:
public staticCompressionAlgorithm getCompressionType(String compressionType) { if(StringUtils.isNotBlank(compressionType)) { switch(compressionType.toUpperCase()) { case "UNKNOWN": returnCompressionAlgorithm.UNKNOWN; case "DEFAULT_COMPRESSION": returnCompressionAlgorithm.DEFAULT_COMPRESSION; case "NO_COMPRESSION": returnCompressionAlgorithm.NO_COMPRESSION; case "SNAPPY": returnCompressionAlgorithm.SNAPPY; case "LZ4": returnCompressionAlgorithm.LZ4; case "ZLIB": returnCompressionAlgorithm.UNKNOWN.ZLIB; default: return null; } } return null; }
随之我们要构造,Kudu Partition,Kudu Partition包含两种类型,一种是hashPartition,一种是rangePartition,其实从字面意思应该也能够想到,一种是用于对某个字段进行hash散列,一种是进行分区区间的设置,从而在查询时达到优化的效果,这里通过将sql解析后的转换的KuduPartitionSchema对象分别进行range与hash partition的组装,也就是将sql中 Partition表达式partition 0 <= values <= 49, partition 50 <= values <= 100 封装:
public static void generateHashPartition(CreateTableOptions tableOptions, List<HashPartitionSchema>hashPartitionSchemas) { if (null != hashPartitionSchemas && hashPartitionSchemas.size() != 0) {
hashPartitionSchemas.forEach(hashPartitionSchema ->{
tableOptions.addHashPartitions(hashPartitionSchema.getColumns(), hashPartitionSchema.getBucket());
});
}
}
public static voidgenerateRangePartition(Schema schema, CreateTableOptions tableOptions, RangePartitionSchema rangePartitionSchema) { tableOptions.setRangePartitionColumns(rangePartitionSchema.getColumns()); List<RangeSplit> ranges =rangePartitionSchema.getRanges(); ranges.forEach(range -> { tableOptions.addRangePartition( getPartialRow( range.getLower(), schema, rangePartitionSchema.getColumns()), getPartialRow( range.getUpper(), schema, rangePartitionSchema.getColumns()), getRangePartitionBound( range.getLowerBoundType()), getRangePartitionBound( range.getUpperBoundType()) ); }); }
public staticRangePartitionBound getRangePartitionBound(String boundType) { if(StringUtils.isNotBlank(boundType)) { switch(boundType) { case "EXCLUSIVE_BOUND": returnRangePartitionBound.EXCLUSIVE_BOUND; case "INCLUSIVE_BOUND": returnRangePartitionBound.INCLUSIVE_BOUND; default: return null; } } return null; }
最后构造,CreateTableOptions对象:
public staticCreateTableOptions generateKuduTableOptions(Table table, Schema schema, KuduPartitionSchema kuduPartitionSchema) { CreateTableOptions tableOptions = newCreateTableOptions(); String numReplicas =table.getKvInfos().get(MetadataConfigKey.TABLE_KUDU_REPLICAS); if(StringUtils.isNotBlank(numReplicas)) { tableOptions.setNumReplicas(Integer.valueOf(numReplicas)); } if (kuduPartitionSchema.getHashPartitionSchemaList() != null && kuduPartitionSchema.getHashPartitionSchemaList().size() != 0) { generateHashPartition(tableOptions, kuduPartitionSchema.getHashPartitionSchemaList()); } if (kuduPartitionSchema.getRangePartitionSchema() != null) { generateRangePartition(schema, tableOptions, kuduPartitionSchema.getRangePartitionSchema()); } returntableOptions; }
没有hbase编程便捷。。不过对于kudu的连接而言,只需要配置kudu master的地址,便可创建连接。
publicKuduClient getKuduClient(Table table){ if(null ==kuduClient){ try{ String kuduMaster =table.getStorageClusterKvs().get(MetadataConfigKey.CLUSTER_KUDU_MASTER); kuduClient = newKuduClient.KuduClientBuilder(kuduMaster).build(); }catch(Exception e){ throw new MetadataRuntimeException(e, " create kuduClient error!!"); } } returnkuduClient; }
活儿干不完啊~改天再深入完 哈哈~