hbase observer實現value自動累加求和

hbase有兩種Coprocessor:endpoint和observer。endpoint類似于存儲過程,可以在hbase上實現一個類似于mapReduce的過程;observer實現起來比較簡單,類似于觸發器。具體架構和理論在這里就不再多說,直接上代碼。下面代碼實現了一個數據自增的功能:相同key的數據,每插入一條,后面的counts列就自動累加本次插入的值(例如每次插入1,counts就自動加1)。

package com.open01.hbase.coprocessor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.List;

/**
 * Region observer that turns every write to one configured column into a
 * running sum: the value carried by the incoming {@link Put} is added to the
 * value currently stored for the same row, and the total is written back.
 *
 * <p>The target column is taken from the coprocessor configuration keys
 * {@code family} and {@code col}. Values are stored as decimal strings
 * (for example {@code "42"}), not as binary integers.
 *
 * <p>NOTE(review): the read of the stored value and the write of the new sum
 * are not performed atomically by this class; concurrent puts to the same row
 * may lose updates unless the caller serializes them.
 *
 * Created by caolch on 2017/4/7.
 */
public class PrePutSumObserver extends BaseRegionObserver {

    private byte[] family;
    private byte[] col;

    /**
     * Reads the target column family and qualifier from the coprocessor
     * configuration.
     *
     * @param e coprocessor environment supplying the configuration
     * @throws IOException if {@code family} or {@code col} is not configured
     */
    @Override
    public void start(CoprocessorEnvironment e) throws IOException {
        Configuration conf = e.getConfiguration();
        family = Bytes.toBytes(requireSetting(conf, "family"));
        col = Bytes.toBytes(requireSetting(conf, "col"));
    }

    @Override
    public void stop(CoprocessorEnvironment e) throws IOException {
        super.stop(e);
    }

    /**
     * Adds the stored counter value to the value carried by {@code put} and
     * appends the total to the same {@code put}. Puts that do not touch the
     * configured column are left untouched.
     *
     * @param e          observer context giving access to the hosting region
     * @param put        the mutation about to be applied; modified in place
     * @param edit       WAL edit for this mutation (unused)
     * @param durability durability of this mutation (unused)
     * @throws IOException if the region read fails, if either value is not a
     *                     decimal integer, or if the sum does not fit in an int
     */
    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e,
                       Put put, WALEdit edit, Durability durability) throws IOException {
        if (!put.has(family, col)) {
            return;
        }

        // Only the counter column is needed, so do not fetch the whole row.
        Get get = new Get(put.getRow());
        get.addColumn(family, col);
        Result rs = e.getEnvironment().getRegion().get(get);

        int oriCounts = 0;
        for (Cell cell : rs.rawCells()) {
            if (CellUtil.matchingColumn(cell, family, col)) {
                oriCounts = parseCount(CellUtil.cloneValue(cell), "stored");
            }
        }

        // If the put carries several cells for the column, the last one wins.
        int incrCounts = 0;
        for (Cell cell : put.get(family, col)) {
            if (CellUtil.matchingColumn(cell, family, col)) {
                incrCounts = parseCount(CellUtil.cloneValue(cell), "incoming");
            }
        }

        // Sum in long so an overflow is detected instead of silently wrapping.
        long sum = (long) oriCounts + (long) incrCounts;
        if (sum > Integer.MAX_VALUE || sum < Integer.MIN_VALUE) {
            throw new IOException("Counter overflow: " + oriCounts + " + " + incrCounts
                    + " does not fit in an int");
        }

        // NOTE(review): this appends a second cell for the column next to the
        // client's original cell; the result relies on the later cell taking
        // effect. Confirm against the HBase version in use.
        put.addColumn(family, col, Bytes.toBytes(String.valueOf((int) sum)));
    }

    /** Returns the configured value for {@code key}, failing clearly when it is absent. */
    private static String requireSetting(Configuration conf, String key) throws IOException {
        String value = conf.get(key);
        if (value == null) {
            throw new IOException("Missing coprocessor argument '" + key + "'");
        }
        return value;
    }

    /**
     * Parses a cell value holding a decimal-string integer. A checked
     * IOException is raised for malformed data so that the failure is reported
     * as a failed request rather than escaping as an unchecked exception.
     */
    private static int parseCount(byte[] raw, String origin) throws IOException {
        String text = Bytes.toString(raw);
        try {
            return Integer.parseInt(text);
        } catch (NumberFormatException nfe) {
            throw new IOException("Cannot parse " + origin + " counter value '" + text
                    + "' as an integer", nfe);
        }
    }
}

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the observer jar that is uploaded to HDFS. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Coordinates; artifactId and version give the jar name hbase-observer-1.0-SNAPSHOT.jar. -->
    <groupId>hbase-observer</groupId>
    <artifactId>hbase-observer</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <!-- Both HBase artifacts are pinned to the same version (1.1.9). -->
    <!-- NOTE(review): this should presumably match the HBase version running on the cluster; confirm. -->
    <dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.1.9</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>1.1.9</version>
    </dependency>
    </dependencies>
</project>

打好包,把jar包上傳到hdfs目錄,然后在hbase執行alter命令,修改要添加observer的table:

#!/bin/bash

# Attaches the PrePutSumObserver coprocessor to the table 'observer_test'.
# The commands below are fed to the HBase shell through a here-document:
# the table is disabled, its 'Coprocessor' table attribute is set, and the
# table is enabled again.
#
# The attribute value is a '|'-separated spec: the jar location on HDFS, the
# fully qualified observer class, the number 1001 (the priority field of
# HBase's coprocessor attribute format), and the key=value arguments
# family=f1,col=counts. Those two arguments name the column the observer
# accumulates into.
hbase shell <<EOF
disable 'observer_test'
alter 'observer_test', METHOD => 'table_att', 'Coprocessor'=>'hdfs://open009:9000/user/root/observerJar/hbase-observer-1.0-SNAPSHOT.jar|com.open01.hbase.coprocessor.PrePutSumObserver|1001|family=f1,col=counts'
enable 'observer_test'
EOF

最后編輯于
©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容