import happybase
import pandas as pd
def get_hbase_pool(host, size=5):
    """Build a happybase connection pool for one HBase host.

    :param host: HBase host IP address.
    :param size: number of pooled connections; passed through ``int()``
        because callers coming from R hand over non-integer numbers.
    :return: the ``happybase.ConnectionPool`` instance.
    """
    pool_size = int(size)  # R sends numerics such as 5.0, which must become an int
    return happybase.ConnectionPool(size=pool_size, host=host)
def read_hbase_data(pool, table_name, col_filter, col, row_prefix=None, family='f'):
    """Scan an HBase table through a connection pool and return a DataFrame.

    :param pool: connection pool; ``pool.connection()`` is used as a context
        manager yielding a connection with ``table(name)`` and ``close()``.
    :param table_name: name of the HBase table to read.
    :param col_filter: filter string passed to ``scan`` (e.g.
        ``"SingleColumnValueFilter('f', 'x', =, 'binary:a')"``), or None.
    :param col: column qualifiers to read; also used, in this order, as the
        DataFrame column names.
    :param row_prefix: only read rows whose key starts with this prefix
        (str or bytes); None (the default) applies no prefix restriction.
    :param family: column family the qualifiers in ``col`` belong to.
    :return: DataFrame with one row per scanned HBase row and columns ``col``.
        Cell values are decoded as UTF-8; cells absent from a row are None.
        If the scan fails, the error is printed and the rows read so far are
        returned (an empty DataFrame with columns ``col`` if none were read).
    """
    def _decode(cell):
        # A row may lack some requested columns; keep the gap as None.
        return None if cell is None else cell.decode('utf-8')

    # Fully qualified column names as bytes, e.g. b'f:id' (UTF-8, not base64).
    read_col = ['{}:{}'.format(family, name).encode() for name in col]
    if isinstance(row_prefix, str):
        row_prefix = row_prefix.encode()

    rows = []
    with pool.connection() as connection:
        try:
            table = connection.table(table_name)
            for _key, data in table.scan(row_prefix=row_prefix, columns=read_col,
                                         filter=col_filter):
                # Look cells up by column key: the returned dict's order is not
                # guaranteed to follow ``col``.
                rows.append([_decode(data.get(key)) for key in read_col])
        except Exception as e:
            # Close only on failure, so a possibly broken transport is not
            # reused. NOTE(review): this relies on the pool reopening the
            # connection on the next checkout (happybase's pool calls open()).
            connection.close()
            print('Error:', e)
    # Build the frame once (DataFrame.append was removed in pandas 2.0).
    return pd.DataFrame(rows, columns=col)
def read_hbase_data_nopool(host, table_name, col_filter, col, row_prefix=None, family='f'):
    """Scan an HBase table over a dedicated connection and return a DataFrame.

    A new ``happybase.Connection`` is opened for this call and is always
    closed before returning.

    :param host: HBase host IP address.
    :param table_name: name of the HBase table to read.
    :param col_filter: filter string passed to ``scan`` (e.g.
        ``"SingleColumnValueFilter('f', 'x', =, 'binary:a')"``), or None.
    :param col: column qualifiers to read; also used, in this order, as the
        DataFrame column names.
    :param row_prefix: only read rows whose key starts with this prefix
        (str or bytes); None (the default) applies no prefix restriction.
    :param family: column family the qualifiers in ``col`` belong to.
    :return: DataFrame with one row per scanned HBase row and columns ``col``.
        Cell values are decoded as UTF-8; cells absent from a row are None.
        If the scan fails, the error is printed and the rows read so far are
        returned. Errors while opening the connection propagate.
    """
    def _decode(cell):
        # A row may lack some requested columns; keep the gap as None.
        return None if cell is None else cell.decode('utf-8')

    # Fully qualified column names as bytes, e.g. b'f:id' (UTF-8, not base64).
    read_col = ['{}:{}'.format(family, name).encode() for name in col]
    if isinstance(row_prefix, str):
        row_prefix = row_prefix.encode()

    rows = []
    connection = happybase.Connection(host, autoconnect=False)
    connection.open()  # outside the try: connection failures reach the caller
    try:
        table = connection.table(table_name)
        for _key, data in table.scan(row_prefix=row_prefix, columns=read_col,
                                     filter=col_filter):
            # Look cells up by column key: the returned dict's order is not
            # guaranteed to follow ``col``.
            rows.append([_decode(data.get(key)) for key in read_col])
    except Exception as e:
        print('Error:', e)
    finally:
        connection.close()
    # Build the frame once (DataFrame.append was removed in pandas 2.0).
    return pd.DataFrame(rows, columns=col)
if __name__ == "__main__":
    # Example usage: replace the placeholder host ('ip') and table name before running.
    host = 'ip'
    table_name = '表名'
    row_prefix = None  # row-key prefix restricting the scan; None means no prefix
    col_filter = "SingleColumnValueFilter('f', 'x', =, 'binary:a')"  # keep rows whose f:x equals 'a'
    col = ['id', 'x', 'y']  # qualifiers to read, also the DataFrame column names
    pool = get_hbase_pool(host, size=5)
    result = read_hbase_data(pool=pool, table_name=table_name, col_filter=col_filter,
                             col=col, row_prefix=row_prefix)
    print(result)  # show what the example read instead of discarding it
# python讀取hbase
# ©著作權歸作者所有,轉載或內容合作請聯系作者
# 【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
# 平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。
# 【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
# 平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。
# 相關閱讀更多精彩內容
# - 1.Hbase安裝: 可以參考這篇文章,寫的很詳細:https://blog.csdn.net/wuruijie3...
# - 準備數據: 上傳到hdfs 編寫mapper: 編寫reducer: 編寫driver: 打包運行主類: yarn...
# - 一、版本信息和環境 1、版本信息(全是Apache版本): hadoop-2.6.0 hbase-1.2.6.1 ...
# - 1 通過 scan 讀取 hbase 表 應用場景: 讀取方法: 直到讀取數據的inputformat是 Tabl...