Throttling to limit QPS from HbaseIndex
kaushikd49 authored and vinothchandar committed Aug 22, 2018
1 parent 3746ace commit e624480
Showing 6 changed files with 436 additions and 50 deletions.
HoodieHBaseIndexConfig.java (new file)
@@ -0,0 +1,175 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/

package com.uber.hoodie.config;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {

public static final String HBASE_ZKQUORUM_PROP = "hoodie.index.hbase.zkquorum";
public static final String HBASE_ZKPORT_PROP = "hoodie.index.hbase.zkport";
public static final String HBASE_TABLENAME_PROP = "hoodie.index.hbase.table";
public static final String HBASE_GET_BATCH_SIZE_PROP = "hoodie.index.hbase.get.batch.size";
/**
* Note that if HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP is set to true, this batch size will not
* be honored for HBase Puts
*/
public static final String HBASE_PUT_BATCH_SIZE_PROP = "hoodie.index.hbase.put.batch.size";
/**
* Property to enable auto computation of the put batch size.
*/
public static final String HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP = "hoodie.index.hbase.put.batch.size.autocompute";
public static final String DEFAULT_HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE = "false";
/**
* Property to set the fraction of the global share of QPS that should be allocated to this job.
* For example, if three jobs require x, 2x and 3x rows respectively for HBase indexing, their
* fractions would be 1/6 (~0.17), 2/6 (~0.33) and 3/6 (0.5) respectively.
*/
public static final String HBASE_QPS_FRACTION_PROP = "hoodie.index.hbase.qps.fraction";
/**
* Property to set the maximum QPS allowed per Region Server. This should be the same across
* various jobs. It is intended to limit the aggregate QPS generated across various jobs to an
* HBase Region Server. It is recommended to set this value based on global indexing throughput
* needs and, most importantly, on how much the HBase installation in use can tolerate without
* Region Servers going down.
*/
public static final String HBASE_MAX_QPS_PER_REGION_SERVER_PROP = "hoodie.index.hbase.max.qps.per.region.server";
/**
* Default batch size; used as-is for Gets, while the batch size for Puts may be auto-computed.
*/
public static final int DEFAULT_HBASE_BATCH_SIZE = 100;
/**
* A conservatively low default for the maximum QPS allowed per Region Server.
*/
public static final int DEFAULT_HBASE_MAX_QPS_PER_REGION_SERVER = 1000;
/**
* The default is 50%, which means a total of two jobs can run concurrently using the HBase
* index without overwhelming Region Servers.
*/
public static final float DEFAULT_HBASE_QPS_FRACTION = 0.5f;
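// For illustration only: with the defaults above, a job granted DEFAULT_HBASE_QPS_FRACTION (0.5)
// of the budget would be limited to roughly DEFAULT_HBASE_MAX_QPS_PER_REGION_SERVER * 0.5 = 500
// requests per second against each Region Server.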

public HoodieHBaseIndexConfig(final Properties props) {
super(props);
}

public static HoodieHBaseIndexConfig.Builder newBuilder() {
return new HoodieHBaseIndexConfig.Builder();
}

public static class Builder {

private final Properties props = new Properties();

public HoodieHBaseIndexConfig.Builder fromFile(File propertiesFile) throws IOException {
try (FileReader reader = new FileReader(propertiesFile)) {
this.props.load(reader);
return this;
}
}

public HoodieHBaseIndexConfig.Builder fromProperties(Properties props) {
this.props.putAll(props);
return this;
}

public HoodieHBaseIndexConfig.Builder hbaseZkQuorum(String zkString) {
props.setProperty(HBASE_ZKQUORUM_PROP, zkString);
return this;
}

public HoodieHBaseIndexConfig.Builder hbaseZkPort(int port) {
props.setProperty(HBASE_ZKPORT_PROP, String.valueOf(port));
return this;
}

public HoodieHBaseIndexConfig.Builder hbaseTableName(String tableName) {
props.setProperty(HBASE_TABLENAME_PROP, tableName);
return this;
}

public HoodieHBaseIndexConfig.Builder hbaseIndexGetBatchSize(int getBatchSize) {
props.setProperty(HBASE_GET_BATCH_SIZE_PROP, String.valueOf(getBatchSize));
return this;
}

public HoodieHBaseIndexConfig.Builder hbaseIndexPutBatchSize(int putBatchSize) {
props.setProperty(HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(putBatchSize));
return this;
}

public HoodieHBaseIndexConfig.Builder hbaseIndexPutBatchSizeAutoCompute(
boolean putBatchSizeAutoCompute) {
props.setProperty(HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP,
String.valueOf(putBatchSizeAutoCompute));
return this;
}

public HoodieHBaseIndexConfig.Builder hbaseIndexQPSFraction(float qpsFraction) {
props.setProperty(HoodieHBaseIndexConfig.HBASE_QPS_FRACTION_PROP,
String.valueOf(qpsFraction));
return this;
}

/**
* <p>
* Method to set the maximum QPS allowed per Region Server. This should be the same across
* various jobs. It is intended to limit the aggregate QPS generated across various jobs to an
* HBase Region Server.
* </p>
* <p>
* It is recommended to set this value based on your global indexing throughput needs and,
* most importantly, on how much your HBase installation can tolerate without Region Servers
* going down.
* </p>
*/
public HoodieHBaseIndexConfig.Builder hbaseIndexMaxQPSPerRegionServer(
int maxQPSPerRegionServer) {
// This should be the same across various jobs
props.setProperty(HoodieHBaseIndexConfig.HBASE_MAX_QPS_PER_REGION_SERVER_PROP,
String.valueOf(maxQPSPerRegionServer));
return this;
}

public HoodieHBaseIndexConfig build() {
HoodieHBaseIndexConfig config = new HoodieHBaseIndexConfig(props);
setDefaultOnCondition(props, !props.containsKey(HBASE_GET_BATCH_SIZE_PROP),
HBASE_GET_BATCH_SIZE_PROP, String.valueOf(DEFAULT_HBASE_BATCH_SIZE));
setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_PROP),
HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(DEFAULT_HBASE_BATCH_SIZE));
setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP),
HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP,
String.valueOf(DEFAULT_HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE));
setDefaultOnCondition(props, !props.containsKey(HBASE_QPS_FRACTION_PROP),
HBASE_QPS_FRACTION_PROP, String.valueOf(DEFAULT_HBASE_QPS_FRACTION));
setDefaultOnCondition(props,
!props.containsKey(HBASE_MAX_QPS_PER_REGION_SERVER_PROP),
HBASE_MAX_QPS_PER_REGION_SERVER_PROP, String.valueOf(
DEFAULT_HBASE_MAX_QPS_PER_REGION_SERVER));
return config;
}

}
}
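An illustrative sketch of how these throttling knobs could be wired together, assuming the usual HoodieIndexConfig.newBuilder() entry point and the HBASE index type; the ZooKeeper quorum, port, table name and QPS values below are placeholders:

HoodieHBaseIndexConfig hbaseIndexConfig = HoodieHBaseIndexConfig.newBuilder()
    .hbaseZkQuorum("zk1,zk2,zk3")             // placeholder ZooKeeper quorum
    .hbaseZkPort(2181)
    .hbaseTableName("hoodie_index")           // placeholder index table
    .hbaseIndexGetBatchSize(100)
    .hbaseIndexPutBatchSizeAutoCompute(true)  // auto-compute put batch size (overrides any explicit put batch size)
    .hbaseIndexQPSFraction(0.5f)              // this job's share of the global QPS budget
    .hbaseIndexMaxQPSPerRegionServer(1000)    // cap shared by all jobs hitting the same cluster
    .build();

HoodieIndexConfig indexConfig = HoodieIndexConfig.newBuilder()
    .withIndexType(HoodieIndex.IndexType.HBASE)
    .withHBaseIndexConfig(hbaseIndexConfig)
    .build();

The withHBaseIndexConfig(...) call corresponds to the builder method added to HoodieIndexConfig below; it simply merges the HBase properties into the index config's property set.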
HoodieIndexConfig.java
@@ -49,14 +49,6 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
"hoodie.bloom.index.input.storage" + ".level";
public static final String DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";

- // ***** HBase Index Configs *****
- public static final String HBASE_ZKQUORUM_PROP = "hoodie.index.hbase.zkquorum";
- public static final String HBASE_ZKPORT_PROP = "hoodie.index.hbase.zkport";
- public static final String HBASE_TABLENAME_PROP = "hoodie.index.hbase.table";
- public static final String HBASE_GET_BATCH_SIZE_PROP = "hoodie.index.hbase.get.batch.size";
- public static final String HBASE_PUT_BATCH_SIZE_PROP = "hoodie.index.hbase.put.batch.size";
- public static final String DEFAULT_HBASE_BATCH_SIZE = "100";

// ***** Bucketed Index Configs *****
public static final String BUCKETED_INDEX_NUM_BUCKETS_PROP = "hoodie.index.bucketed.numbuckets";

@@ -92,6 +84,11 @@ public Builder withIndexType(HoodieIndex.IndexType indexType) {
return this;
}

+ public Builder withHBaseIndexConfig(HoodieHBaseIndexConfig hBaseIndexConfig) {
+ props.putAll(hBaseIndexConfig.getProps());
+ return this;
+ }

public Builder bloomFilterNumEntries(int numEntries) {
props.setProperty(BLOOM_FILTER_NUM_ENTRIES, String.valueOf(numEntries));
return this;
@@ -102,21 +99,6 @@ public Builder bloomFilterFPP(double fpp) {
return this;
}

- public Builder hbaseZkQuorum(String zkString) {
- props.setProperty(HBASE_ZKQUORUM_PROP, zkString);
- return this;
- }
-
- public Builder hbaseZkPort(int port) {
- props.setProperty(HBASE_ZKPORT_PROP, String.valueOf(port));
- return this;
- }
-
- public Builder hbaseTableName(String tableName) {
- props.setProperty(HBASE_TABLENAME_PROP, tableName);
- return this;
- }

public Builder bloomIndexParallelism(int parallelism) {
props.setProperty(BLOOM_INDEX_PARALLELISM_PROP, String.valueOf(parallelism));
return this;
@@ -137,15 +119,6 @@ public Builder numBucketsPerPartition(int numBuckets) {
return this;
}

- public Builder hbaseIndexGetBatchSize(int getBatchSize) {
- props.setProperty(HBASE_GET_BATCH_SIZE_PROP, String.valueOf(getBatchSize));
- return this;
- }
-
- public Builder hbaseIndexPutBatchSize(int putBatchSize) {
- props.setProperty(HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(putBatchSize));
- return this;
- }

public Builder withBloomIndexInputStorageLevel(String level) {
props.setProperty(BLOOM_INDEX_INPUT_STORAGE_LEVEL, level);
@@ -166,10 +139,6 @@ public HoodieIndexConfig build() {
BLOOM_INDEX_PRUNE_BY_RANGES_PROP, DEFAULT_BLOOM_INDEX_PRUNE_BY_RANGES);
setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_USE_CACHING_PROP),
BLOOM_INDEX_USE_CACHING_PROP, DEFAULT_BLOOM_INDEX_USE_CACHING);
- setDefaultOnCondition(props, !props.containsKey(HBASE_GET_BATCH_SIZE_PROP),
- HBASE_GET_BATCH_SIZE_PROP, String.valueOf(DEFAULT_HBASE_BATCH_SIZE));
- setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_PROP),
- HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(DEFAULT_HBASE_BATCH_SIZE));
setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_INPUT_STORAGE_LEVEL),
BLOOM_INDEX_INPUT_STORAGE_LEVEL, DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL);
// Throws IllegalArgumentException if the value set is not a known Hoodie Index Type
HoodieWriteConfig.java
@@ -258,23 +258,45 @@ public double getBloomFilterFPP() {
}

public String getHbaseZkQuorum() {
- return props.getProperty(HoodieIndexConfig.HBASE_ZKQUORUM_PROP);
+ return props.getProperty(HoodieHBaseIndexConfig.HBASE_ZKQUORUM_PROP);
}

public int getHbaseZkPort() {
- return Integer.parseInt(props.getProperty(HoodieIndexConfig.HBASE_ZKPORT_PROP));
+ return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HBASE_ZKPORT_PROP));
}

public String getHbaseTableName() {
- return props.getProperty(HoodieIndexConfig.HBASE_TABLENAME_PROP);
+ return props.getProperty(HoodieHBaseIndexConfig.HBASE_TABLENAME_PROP);
}

public int getHbaseIndexGetBatchSize() {
- return Integer.valueOf(props.getProperty(HoodieIndexConfig.HBASE_GET_BATCH_SIZE_PROP));
+ return Integer.valueOf(props.getProperty(HoodieHBaseIndexConfig.HBASE_GET_BATCH_SIZE_PROP));
}

public int getHbaseIndexPutBatchSize() {
- return Integer.valueOf(props.getProperty(HoodieIndexConfig.HBASE_PUT_BATCH_SIZE_PROP));
+ return Integer.valueOf(props.getProperty(HoodieHBaseIndexConfig.HBASE_PUT_BATCH_SIZE_PROP));
}

+ public Boolean getHbaseIndexPutBatchSizeAutoCompute() {
+ return Boolean.valueOf(props.getProperty(HoodieHBaseIndexConfig.HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP));
+ }
+
+ /**
+ * Fraction of the global share of QPS that should be allocated to this job. For example, if
+ * three jobs require x, 2x and 3x rows respectively for HBase indexing, their fractions would
+ * be 1/6 (~0.17), 2/6 (~0.33) and 3/6 (0.5) respectively.
+ */
+ public float getHbaseIndexQPSFraction() {
+ return Float.parseFloat(props.getProperty(HoodieHBaseIndexConfig.HBASE_QPS_FRACTION_PROP));
+ }
+
+ /**
+ * This should be the same across various jobs. It is intended to limit the aggregate QPS
+ * generated across various Hoodie jobs to an HBase Region Server.
+ */
+ public int getHbaseIndexMaxQPSPerRegionServer() {
+ return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HBASE_MAX_QPS_PER_REGION_SERVER_PROP));
+ }

public int getBloomIndexParallelism() {