What is HBase?
The HBase website describes it as "Apache HBase™ is the Hadoop database, a distributed, scalable, big data store". Put plainly, HBase is a database built on Hadoop that is distributed, scalable, and designed for big data storage. If you want to know more, the official HBase site is the place to look.
What is Phoenix?
The Phoenix website describes it as "Phoenix is an open source SQL skin for HBase. You use the standard JDBC APIs instead of the regular HBase client APIs to create tables, insert data, and query your HBase data." In other words, Phoenix is a layer built on top of HBase that lets you create tables, insert data, and query data in HBase through the standard JDBC interface instead of the HBase API. If you are interested, the official site has a full introduction.
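Where the problem came from
Recently this newbie joined a data storage project in our department that uses Oracle/MySQL + Redis + HBase as the underlying storage. The rest of the team was not very familiar with Hadoop or HBase, and Phoenix supports secondary indexes, among other reasons, so we decided not to use the HBase API directly and instead to run SQL through Phoenix over the standard JDBC interface.
As the project moved along we hit a requirement: "import the historical data". The historical data here is more than ten years of meteorological data (bulletins, radar data, satellite data, and so on). That is a lot of data, and loading it through JDBC or HBase's own API was too slow. While going through books and blog posts I came across BulkLoad. In essence, BulkLoad uses MapReduce to generate HBase's data files (HFiles) directly, and HBase then loads them into its own data directory. Because BulkLoad writes the files without going through Phoenix, the values have to be encoded the way Phoenix expects. During table design some columns had been given data types that Phoenix supports but plain HBase does not, so we had to face a very practical problem: data type conversion.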
Solving the problem
The data types Phoenix supports are listed on its official site; for HBase, the reference is the Bytes utility class. Here is a simple HBase insert as an example:
// rowKey, columnFamily, qualifier, value and tableName are all Strings
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier), Bytes.toBytes(value));
Table table = conn.getTable(TableName.valueOf(tableName));
table.put(put);
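Even this short snippet shows that everything data-related in the HBase API is a byte[], not an int, float, or double. The types HBase "supports" are really just whatever Bytes.toBytes accepts as a parameter. Because HBase stores byte arrays underneath, you can store any type you like as long as you write two methods: one that turns the value into a byte array and one that turns the byte array back. Phoenix's data types are built exactly this way. Here is Phoenix's definition of the TIMESTAMP type along with its key code: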
public class PTimestamp extends PDataType<Timestamp> {
    public static final int MAX_NANOS_VALUE_EXCLUSIVE = 1000000;
    public static final PTimestamp INSTANCE = new PTimestamp();

    private PTimestamp() {
        super("TIMESTAMP", Types.TIMESTAMP, java.sql.Timestamp.class,
            new PDate.DateCodec(), 9);
    }

    @Override
    public byte[] toBytes(Object object) {
        byte[] bytes = new byte[getByteSize()];
        toBytes(object, bytes, 0);
        return bytes;
    }

    @Override
    public int toBytes(Object object, byte[] bytes, int offset) {
        if (object == null) {
            // Create the byte[] of size MAX_TIMESTAMP_BYTES
            if(bytes.length != getByteSize()) {
                bytes = Bytes.padTail(bytes, (getByteSize() - bytes.length));
            }
            PDate.INSTANCE.getCodec().encodeLong(0l, bytes, offset);
            Bytes.putInt(bytes, offset + Bytes.SIZEOF_LONG, 0);
            return getByteSize();
        }
        java.sql.Timestamp value = (java.sql.Timestamp) object;
        // For Timestamp, the getTime() method includes milliseconds that may
        // be stored in the nanos part as well.
        PDate.INSTANCE.getCodec().encodeLong(value.getTime(), bytes, offset);
        /*
         * By not getting the stuff that got spilled over from the millis part,
         * it leaves the timestamp's byte representation saner - 8 bytes of millis | 4 bytes of nanos.
         * Also, it enables timestamp bytes to be directly compared with date/time bytes.
         */
        Bytes.putInt(bytes, offset + Bytes.SIZEOF_LONG, value.getNanos() % MAX_NANOS_VALUE_EXCLUSIVE);
        return getByteSize();
    }

    @Override
    public Object toObject(Object object, PDataType actualType) {
        if (object == null) {
            return null;
        }
        if (equalsAny(actualType, PDate.INSTANCE, PUnsignedDate.INSTANCE, PTime.INSTANCE,
                PUnsignedTime.INSTANCE)) {
            return new java.sql.Timestamp(((java.util.Date) object).getTime());
        } else if (equalsAny(actualType, PTimestamp.INSTANCE, PUnsignedTimestamp.INSTANCE)) {
            return object;
        } else if (equalsAny(actualType, PLong.INSTANCE, PUnsignedLong.INSTANCE)) {
            return new java.sql.Timestamp((Long) object);
        } else if (actualType == PDecimal.INSTANCE) {
            BigDecimal bd = (BigDecimal) object;
            long ms = bd.longValue();
            int nanos =
                (bd.remainder(BigDecimal.ONE).multiply(QueryConstants.BD_MILLIS_NANOS_CONVERSION))
                    .intValue();
            return DateUtil.getTimestamp(ms, nanos);
        } else if (actualType == PVarchar.INSTANCE) {
            return DateUtil.parseTimestamp((String) object);
        }
        return throwConstraintViolationException(actualType, this);
    }

    @Override
    public java.sql.Timestamp toObject(byte[] b, int o, int l, PDataType actualType,
            SortOrder sortOrder, Integer maxLength, Integer scale) {
        if (actualType == null || l == 0) {
            return null;
        }
        java.sql.Timestamp v;
        if (equalsAny(actualType, PTimestamp.INSTANCE, PUnsignedTimestamp.INSTANCE)) {
            long millisDeserialized =
                (actualType == PTimestamp.INSTANCE ? PDate.INSTANCE : PUnsignedDate.INSTANCE).getCodec()
                    .decodeLong(b, o, sortOrder);
            v = new java.sql.Timestamp(millisDeserialized);
            int nanosDeserialized =
                PUnsignedInt.INSTANCE.getCodec().decodeInt(b, o + Bytes.SIZEOF_LONG, sortOrder);
            /*
             * There was a bug in serialization of timestamps which was causing the sub-second millis part
             * of time stamp to be present both in the LONG and INT bytes. Having the <100000 check
             * makes this serialization fix backward compatible.
             */
            v.setNanos(
                nanosDeserialized < MAX_NANOS_VALUE_EXCLUSIVE ? v.getNanos() + nanosDeserialized : nanosDeserialized);
            return v;
        } else if (equalsAny(actualType, PDate.INSTANCE, PUnsignedDate.INSTANCE, PTime.INSTANCE,
                PUnsignedTime.INSTANCE, PLong.INSTANCE, PUnsignedLong.INSTANCE)) {
            return new java.sql.Timestamp(actualType.getCodec().decodeLong(b, o, sortOrder));
        } else if (actualType == PDecimal.INSTANCE) {
            BigDecimal bd = (BigDecimal) actualType.toObject(b, o, l, actualType, sortOrder);
            long ms = bd.longValue();
            int nanos = (bd.remainder(BigDecimal.ONE).multiply(QueryConstants.BD_MILLIS_NANOS_CONVERSION))
                .intValue();
            v = DateUtil.getTimestamp(ms, nanos);
            return v;
        }
        throwConstraintViolationException(actualType, this);
        return null;
    }
    // ...... remaining code omitted
}
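This code makes it easy to see how Phoenix supports the timestamp type. Phoenix simply splits the timestamp into two parts: the milliseconds are stored as a long (8 bytes), and the nanoseconds left over below one millisecond are stored as an int (4 bytes).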
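To make the "8 bytes of millis | 4 bytes of nanos" layout concrete, here is a minimal sketch in plain Java with no Phoenix or HBase dependency. The class name TimestampLayoutSketch and its methods are made up for illustration. The sign-bit flip on the long is my reading of how Phoenix's long codec keeps values sortable as raw bytes, so treat that detail as an assumption; for real data, use PTimestamp.INSTANCE itself rather than this sketch.

```java
import java.nio.ByteBuffer;
import java.sql.Timestamp;

// Hypothetical illustration class, not part of Phoenix.
final class TimestampLayoutSketch {
    // Same value as MAX_NANOS_VALUE_EXCLUSIVE in the quoted source: nanoseconds per millisecond.
    static final int NANOS_PER_MILLI = 1_000_000;

    /** Packs a timestamp into 12 bytes: 8 bytes of millis followed by 4 bytes of nanos. */
    static byte[] encode(Timestamp ts) {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES); // big-endian by default
        // Assumption: flip the sign bit so negative values sort before positive ones as raw bytes.
        buf.putLong(ts.getTime() ^ Long.MIN_VALUE);
        // Only the remainder below one millisecond goes into the int part.
        buf.putInt(ts.getNanos() % NANOS_PER_MILLI);
        return buf.array();
    }

    /** Reverses encode(): reads the millis, then adds the sub-millisecond nanos back. */
    static Timestamp decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long millis = buf.getLong() ^ Long.MIN_VALUE;
        int subMilliNanos = buf.getInt();
        Timestamp ts = new Timestamp(millis); // nanos now hold only the millisecond fraction
        ts.setNanos(ts.getNanos() + subMilliNanos);
        return ts;
    }

    public static void main(String[] args) {
        Timestamp original = Timestamp.valueOf("2020-09-09 11:11:11.111222333");
        byte[] bytes = encode(original);
        System.out.println(bytes.length + " bytes, round trip equal: "
                + original.equals(decode(bytes)));
    }
}
```

The millisecond fraction (111 in the example) lives only in the long, and the int holds just the remaining 222333 nanoseconds, which is what the `% MAX_NANOS_VALUE_EXCLUSIVE` in the quoted toBytes does.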
The remaining data types all work in much the same way. Browse the org.apache.phoenix.schema.types package in the Phoenix source and it becomes clear.
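The "two methods" idea from earlier also covers types that neither Bytes nor Phoenix knows about. Below is a minimal sketch in plain Java, assuming a hypothetical StationPoint value (a station's latitude and longitude) that we want to keep in a single cell. The type, the codec class, and the 16-byte layout are all invented for this example and do not correspond to any Phoenix data type.

```java
import java.nio.ByteBuffer;

// Hypothetical codec: one method to produce bytes, one to read them back.
final class StationPointCodec {
    /** Hypothetical value type: the coordinates of a weather station. */
    static final class StationPoint {
        final double latitude;
        final double longitude;

        StationPoint(double latitude, double longitude) {
            this.latitude = latitude;
            this.longitude = longitude;
        }
    }

    /** Value -> bytes: latitude then longitude, 8 bytes each, big-endian. */
    static byte[] toBytes(StationPoint p) {
        return ByteBuffer.allocate(2 * Double.BYTES)
                .putDouble(p.latitude)
                .putDouble(p.longitude)
                .array();
    }

    /** Bytes -> value: reads the fields in the same order they were written. */
    static StationPoint fromBytes(byte[] bytes) {
        if (bytes.length != 2 * Double.BYTES) {
            throw new IllegalArgumentException("expected 16 bytes, got " + bytes.length);
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        double latitude = buf.getDouble();
        double longitude = buf.getDouble();
        return new StationPoint(latitude, longitude);
    }

    public static void main(String[] args) {
        StationPoint p = fromBytes(toBytes(new StationPoint(39.9, 116.4)));
        System.out.println(p.latitude + ", " + p.longitude);
    }
}
```

The byte[] returned by toBytes could be passed to Put.addColumn like any other value. The catch is that only code that knows this layout can read it back, which is why matching Phoenix's own encoding matters when the table is queried through Phoenix.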
To finish, here is one more small example:
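Put put = new Put(Bytes.toBytes("111"));
// Timestamp.valueOf keeps the .111 milliseconds; the pattern "yyyy-MM-dd HH:mm:ss" would drop them
Timestamp ts = Timestamp.valueOf("2020-09-09 11:11:11.111");
// toBytes is an instance method, so call it through PTimestamp.INSTANCE
put.addColumn(Bytes.toBytes("0"), Bytes.toBytes("TRANSACTION_TIME"), PTimestamp.INSTANCE.toBytes(ts));
Table table = conn.getTable(TableName.valueOf("EXCHANGE"));
table.put(put);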
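Wrapping up
That is everything for this time. This is the first article this newbie has ever seriously sat down to write, and I hope I can keep it up! If you have any questions, feel free to leave a message on the official account “喜歡打游戲的摸魚怪”.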