HBase觀察者協(xié)處理器示例

2018-09-15 14:58 更新

示例

HBase提供了Observer Coprocessor(觀察者協(xié)處理器)的示例。

下面給出更詳細(xì)的例子。

這些示例假設(shè)一個(gè)名為users的表,其中有兩個(gè)列族personalDet和salaryDet,包含個(gè)人和工資詳細(xì)信息。下面是users表格:

personalDet salaryDet

jverne

Jules

Verne

02/08/1828

12000

9000

3000

rowkey

name

lastname

dob

gross

net

allowances

admin

Admin

Admin

cdickens

Charles

Dickens

02/07/1812

10000

8000

2000

觀察者示例

以下Observer協(xié)處理器可防止用戶(hù)admin的詳細(xì)信息在users表Get或者Scan中返回。

  1. 編寫(xiě)一個(gè)實(shí)現(xiàn)RegionObserver類(lèi)的類(lèi)。
  2. 重寫(xiě)preGetOp()方法(不推薦使用該preGet()方法)以檢查客戶(hù)端是否已使用admin值查詢(xún)r(jià)owkey。如果是,則返回空結(jié)果。否則,正常處理請(qǐng)求。
  3. 將您的代碼和依賴(lài)項(xiàng)放在JAR文件中。
  4. 將JAR放在HDFS中,HBase可以在其中找到它。
  5. 加載協(xié)處理器。
  6. 寫(xiě)一個(gè)簡(jiǎn)單的程序來(lái)測(cè)試它。

以下是上述步驟的實(shí)現(xiàn):

public class RegionObserverExample implements RegionObserver {

    private static final byte[] ADMIN = Bytes.toBytes("admin");
    private static final byte[] COLUMN_FAMILY = Bytes.toBytes("details");
    private static final byte[] COLUMN = Bytes.toBytes("Admin_det");
    private static final byte[] VALUE = Bytes.toBytes("You can't see Admin details");

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get, final List<Cell> results)
    throws IOException {

        if (Bytes.equals(get.getRow(),ADMIN)) {
            Cell c = CellUtil.createCell(get.getRow(),COLUMN_FAMILY, COLUMN,
            System.currentTimeMillis(), (byte)4, VALUE);
            results.add(c);
            e.bypass();
        }
    }
}

重寫(xiě)preGetOp()僅適用于Get操作。您還需要重寫(xiě)該preScannerOpen()方法以從掃描結(jié)果中過(guò)濾admin行。

@Override
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, final Scan scan,
final RegionScanner s) throws IOException {

    Filter filter = new RowFilter(CompareOp.NOT_EQUAL, new BinaryComparator(ADMIN));
    scan.setFilter(filter);
    return s;
}

這種方法有效,但有副作用。

如果客戶(hù)端在其掃描中使用了過(guò)濾器,則該過(guò)濾器將替換該過(guò)濾器。相反,您可以顯式刪除掃描中的任何admin結(jié)果:

@Override
public boolean postScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e, final InternalScanner s,
final List<Result> results, final int limit, final boolean hasMore) throws IOException {
        Result result = null;
    Iterator<Result> iterator = results.iterator();
    while (iterator.hasNext()) {
    result = iterator.next();
        if (Bytes.equals(result.getRow(), ROWKEY)) {
            iterator.remove();
            break;
        }
    }
    return hasMore;
}

端點(diǎn)示例

仍然使用該users表,該示例使用端點(diǎn)協(xié)處理器實(shí)現(xiàn)協(xié)處理器以計(jì)算所有員工工資的總和。

  1. 創(chuàng)建一個(gè)定義服務(wù)的'.proto'文件。
    option java_package = "org.myname.hbase.coprocessor.autogenerated";
    option java_outer_classname = "Sum";
    option java_generic_services = true;
    option java_generate_equals_and_hash = true;
    option optimize_for = SPEED;
    message SumRequest {
        required string family = 1;
        required string column = 2;
    }
    
    message SumResponse {
      required int64 sum = 1 [default = 0];
    }
    
    service SumService {
      rpc getSum(SumRequest)
        returns (SumResponse); 
    }
  2. 執(zhí)行protoc命令以從上面的.proto文件生成Java代碼。
    $ mkdir src
    $ protoc --java_out=src ./sum.proto
    這將生成一個(gè)類(lèi)調(diào)用Sum.java。
  3. 編寫(xiě)一個(gè)擴(kuò)展生成的服務(wù)類(lèi)的類(lèi),實(shí)現(xiàn)Coprocessor和CoprocessorService類(lèi),并重寫(xiě)服務(wù)方法。
    注意:如果您從hbase-site.xml加載協(xié)處理器然后使用HBase Shell 再次加載同一個(gè)協(xié)處理器,它將再次加載。同一個(gè)類(lèi)將存在兩次,第二個(gè)實(shí)例將具有更高的ID(因此具有更低的優(yōu)先級(jí))。結(jié)果是有效地忽略了重復(fù)的協(xié)處理器。
    public class SumEndPoint extends Sum.SumService implements Coprocessor, CoprocessorService {
    
        private RegionCoprocessorEnvironment env;
    
        @Override
        public Service getService() {
            return this;
        }
    
        @Override
        public void start(CoprocessorEnvironment env) throws IOException {
            if (env instanceof RegionCoprocessorEnvironment) {
                this.env = (RegionCoprocessorEnvironment)env;
            } else {
                throw new CoprocessorException("Must be loaded on a table region!");
            }
        }
    
        @Override
        public void stop(CoprocessorEnvironment env) throws IOException {
            // do nothing
        }
    
        @Override
        public void getSum(RpcController controller, Sum.SumRequest request, RpcCallback<Sum.SumResponse> done) {
            Scan scan = new Scan();
            scan.addFamily(Bytes.toBytes(request.getFamily()));
            scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));
    
            Sum.SumResponse response = null;
            InternalScanner scanner = null;
    
            try {
                scanner = env.getRegion().getScanner(scan);
                List<Cell> results = new ArrayList<>();
                boolean hasMore = false;
                long sum = 0L;
    
                do {
                    hasMore = scanner.next(results);
                    for (Cell cell : results) {
                        sum = sum + Bytes.toLong(CellUtil.cloneValue(cell));
                    }
                    results.clear();
                } while (hasMore);
    
                response = Sum.SumResponse.newBuilder().setSum(sum).build();
            } catch (IOException ioe) {
                ResponseConverter.setControllerException(controller, ioe);
            } finally {
                if (scanner != null) {
                    try {
                        scanner.close();
                    } catch (IOException ignored) {}
                }
            }
    
            done.run(response);
        }
    }
    Configuration conf = HBaseConfiguration.create();
    Connection connection = ConnectionFactory.createConnection(conf);
    TableName tableName = TableName.valueOf("users");
    Table table = connection.getTable(tableName);
    
    final Sum.SumRequest request = Sum.SumRequest.newBuilder().setFamily("salaryDet").setColumn("gross").build();
    try {
        Map<byte[], Long> results = table.coprocessorService(
            Sum.SumService.class,
            null,  /* start key */
            null,  /* end   key */
            new Batch.Call<Sum.SumService, Long>() {
                @Override
                public Long call(Sum.SumService aggregate) throws IOException {
                    BlockingRpcCallback<Sum.SumResponse> rpcCallback = new BlockingRpcCallback<>();
                    aggregate.getSum(null, request, rpcCallback);
                    Sum.SumResponse response = rpcCallback.get();
    
                    return response.hasSum() ? response.getSum() : 0L;
                }
            }
        );
    
        for (Long sum : results.values()) {
            System.out.println("Sum = " + sum);
        }
    } catch (ServiceException e) {
        e.printStackTrace();
    } catch (Throwable e) {
        e.printStackTrace();
    }
  4. 加載協(xié)處理器。
  5. 編寫(xiě)客戶(hù)端代碼以調(diào)用協(xié)處理器。
以上內(nèi)容是否對(duì)您有幫助:
在線(xiàn)筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)