python讀取hbase

import happybase
import pandas as pd

def get_hbase_pool(host, size=5, **connection_kwargs):
    """Create a happybase connection pool for an HBase host.

    :param host: HBase host (IP address or hostname)
    :param size: number of connections managed by the pool; coerced with
        ``int()`` because callers from R may pass a non-int value
    :param connection_kwargs: extra keyword arguments forwarded to
        ``happybase.ConnectionPool`` (which passes them on to each
        ``happybase.Connection``), e.g. ``port`` or ``timeout``
    :return: the ``happybase.ConnectionPool``
    """
    return happybase.ConnectionPool(host=host, size=int(size), **connection_kwargs)


def read_hbase_data(pool, table_name, col_filter, col, row_prefix=None, family='f'):
    """Scan an HBase table through a connection pool into a DataFrame.

    :param pool: happybase connection pool (e.g. from ``get_hbase_pool``)
    :param table_name: name of the HBase table to scan
    :param col_filter: HBase filter string passed to ``Table.scan``
        (``None`` for no filter)
    :param col: column qualifiers to read from ``family``; also used, in the
        same order, as the DataFrame column names
    :param row_prefix: optional row-key prefix (``str``); ``None`` scans the
        whole table
    :param family: column family that the qualifiers in ``col`` belong to
    :return: DataFrame with one row per scanned HBase row and columns ``col``.
        Cell values are decoded as UTF-8 strings; cells absent from a row are
        ``None``. If the scan fails, the error is printed and the rows read
        so far are returned.
    """
    # Fully qualified column names as UTF-8 bytes, e.g. b'f:id'.
    read_col = ['{}:{}'.format(family, name).encode() for name in col]
    # Encode only when given: the default None has no .encode().
    prefix = row_prefix.encode() if row_prefix is not None else None

    rows = []
    try:
        # Errors are caught outside the ``with`` so that ``pool.connection()``
        # sees them (happybase replaces broken pooled connections there), and
        # the connection is returned to the pool instead of being closed.
        with pool.connection() as connection:
            table = connection.table(table_name)
            scanner = table.scan(row_prefix=prefix, columns=read_col, filter=col_filter)
            # Plain loop (not a comprehension) so rows read before an error
            # are kept, as before.
            for _, data in scanner:
                # Look cells up by name: the order of keys in ``data`` need
                # not match the order of ``col``.
                cells = [data.get(qualified) for qualified in read_col]
                rows.append([None if cell is None else str(cell, 'utf-8') for cell in cells])
    except Exception as e:
        print('Error:', e)

    # Build the frame once instead of appending row by row (quadratic, and
    # DataFrame.append no longer exists in pandas 2.x).
    return pd.DataFrame(rows, columns=col)


def read_hbase_data_nopool(host, table_name, col_filter, col, row_prefix=None, family='f'):
    """Scan an HBase table over a dedicated (non-pooled) connection into a DataFrame.

    :param host: HBase host (IP address or hostname)
    :param table_name: name of the HBase table to scan
    :param col_filter: HBase filter string passed to ``Table.scan``
        (``None`` for no filter)
    :param col: column qualifiers to read from ``family``; also used, in the
        same order, as the DataFrame column names
    :param row_prefix: optional row-key prefix (``str``); ``None`` scans the
        whole table
    :param family: column family that the qualifiers in ``col`` belong to
    :return: DataFrame with one row per scanned HBase row and columns ``col``.
        Cell values are decoded as UTF-8 strings; cells absent from a row are
        ``None``. If the scan fails, the error is printed and the rows read
        so far are returned. A failure to open the connection is raised.
    """
    # Fully qualified column names as UTF-8 bytes, e.g. b'f:id'.
    read_col = ['{}:{}'.format(family, name).encode() for name in col]
    # Encode only when given: the default None has no .encode().
    prefix = row_prefix.encode() if row_prefix is not None else None

    rows = []
    connection = happybase.Connection(host, autoconnect=False)
    connection.open()  # outside the try: connection failures propagate to the caller
    try:
        table = connection.table(table_name)
        scanner = table.scan(row_prefix=prefix, columns=read_col, filter=col_filter)
        # Plain loop (not a comprehension) so rows read before an error are kept.
        for _, data in scanner:
            # Look cells up by name: the order of keys in ``data`` need not
            # match the order of ``col``.
            cells = [data.get(qualified) for qualified in read_col]
            rows.append([None if cell is None else str(cell, 'utf-8') for cell in cells])
    except Exception as e:
        print('Error:', e)
    finally:
        # Always release the dedicated connection, whatever happened.
        connection.close()

    # Build the frame once instead of appending row by row (quadratic, and
    # DataFrame.append no longer exists in pandas 2.x).
    return pd.DataFrame(rows, columns=col)


if __name__ == "__main__":
    # Example run. 'ip' and '表名' are placeholders for the HBase host and
    # the table to scan.
    columns = ['id', 'x', 'y']  # qualifiers under family 'f'; also the DataFrame column names
    scan_filter = "SingleColumnValueFilter('f', 'x', =, 'binary:a')"  # filter on f:x == 'a'
    hbase_pool = get_hbase_pool('ip', size=5)
    result = read_hbase_data(
        pool=hbase_pool,
        table_name='表名',
        col_filter=scan_filter,
        col=columns,
        row_prefix=None,  # no row-key prefix: scan every row
    )

©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容