This is a generic extension of Spark for efficiently scanning, joining and mutating HBase tables from a Spark environment. The master setup is for HBase API 1.1.0.1, Scala 2.10 and Spark 1.4.1, but it is possible to create branches for older APIs simply by changing the version properties in the pom.xml (the DataFrames API is not necessary for the basic use case, so practically any Spark version > 0.92 should work, but for the old HBase API a small refactor will be required around the HBase API calls).
This library can be used in 3 major ways:
- Basic: In the most basic case it can be used to simply map existing HBase tables to an HBaseRDD, which results in a simple pair RDD[(Array[Byte], hbase.client.Result)]. These can be filtered, transformed, etc. like any other RDD, and the result can, for example, be given to HBaseTable as a mutation to execute on the same underlying table.
- Standard: In the more typical case, HBaseTable is extended to provide a mapping of the raw key bytes and HBase result to more meaningful types. You can learn about this method in the Concepts section below and by studying the example source of the demo-simple application.
- Specialised/Experimental: Using the keyspace package on top of the basic HBaseRDD and HBaseTable. This package cannot be used on existing tables because it uses a predefined key structure which aims to get the most out of both the Spark and HBase perspectives. You can learn more about this method by studying the demo-graph application (the demo is currently broken, see the TODOs below).
The main concept is HBaseTable, which behaves similarly to the Spark DataFrame API but handles the push-down logic slightly differently. Any instance of HBaseTable is mapped to the underlying HBase table, and its method .rdd() gives an instance of HBaseRDD which inherits all transformation methods from RDD[(K,V)] and has some special transformations available via implicit conversions:
- myTable.rdd.filter(Consistency) - server-side scan filter for different levels of consistency required
- myTable.rdd.filter(minStamp, maxStamp) - server-side scan filter for HBase timestamp ranges
- myTable.rdd.select(columnOrFamily1, columnOrFamily2, ...) - server-side scan filter for selected columns or column families
- myTable.rdd.join(other: RDD) - uses an HBaseJoin abstract function which is implemented in 2 versions, both resulting in a single-stage join regardless of the partitioners used. One is for situations where the right table is a very large portion of the left HBase table - HBaseJoinRangeScan - and the other is for situations where the right table is a small fraction of the left table - HBaseJoinMultiGet. (The mechanism for choosing between the types of join is not implemented yet, i.e. at the moment all joins are multi-get, see TODO below.)
- myTable.rdd.rightOuterJoin(other: RDD) - uses the same optimized implementation as join but with a rightOuterJoin result
- myTable.rdd.fill(range: RDD) - an additional functionality, similar to join, except that the argument RDD is treated as something to be 'updated' or 'filled in' where the value of the Option is None - this is for highly iterative algorithms which start from a subset of an HBase table and expand it in later iterations.
Because these methods are available implicitly for any HBaseRDD or its extensions, they can be wrapped in additional layers via HBaseRDDFiltered that are composed only when the compute method is invoked by a Spark action, adding filters, ranges, etc. to the single scan for each region/partition.
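For illustration, a minimal sketch of this lazy composition, assuming a SparkContext sc and a made-up table 'events' with a column 'F:value':
val events = HBaseRDD.create(sc, "events")
// each call below only wraps the RDD in another HBaseRDDFiltered layer
val recent = events
  .select("F:value")                                              // server-side column selection
  .filter(HConstants.OLDEST_TIMESTAMP, System.currentTimeMillis)  // server-side timestamp range
// nothing has been scanned yet; the single scan per region/partition,
// with all filters applied, happens only when an action is invoked:
val numRows = recent.count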
Besides HBaseTable and HBaseRDD, another important concept is Transformation, which is a bi-directional mapper that can map a basic HBase result value into a type V and, inversely, a type V into a Mutation, and which also declares the columns or column families it requires. This gives the HBaseTable extensions a very rich high-level interface and at the same time optimizes scans, which can be filtered under the hood to read only the data necessary for a given transformation. Transformations can be used with the following HBaseTable methods:
- myTable.select(Transformation1, Transformation2, ...) - selects only the fields required by the given Transformations and returns HBaseRDD[K, (T1, T2, ...)]
- myTable.rdd.filter(Transformation1[V] ==|<|>|!=|contains <V>) - server-side filter applied on the transformation
- myTable.update(Transformation1, RDD[(K, T1)]) - transforms the input RDD, generates region-aware mutations and executes the multi-put mutation on the underlying table
- myTable.bulkUpdate(Transformation1, RDD[(K, T1)]) - same as update but instead of using the HBase client API it generates HFiles and submits them to the HBase Master (see the notes below about bulk operations)
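As a hedged sketch of how these fit together - TName and TScore here are hypothetical transformations (say Transformation[String] and Transformation[Long]) declared on myTable, not part of the library:
val namesAndScores = myTable.select(TName, TScore)        // HBaseRDD[K, (String, Long)]
val highScores = myTable.rdd.filter(TScore > 1000L)       // predicate pushed down to the region servers
// read the scores, increment them and write them back as region-aware multi-puts
myTable.update(TScore, myTable.select(TScore).mapValues(_ + 1L))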
bulkUpdate, bulkLoad and bulkDelete are available to generate HFiles directly for large mutations.
NOTE: Due to the way HBase handles bulk file submission, the spark shell or job needs to be started as the hbase user in order to be able to use bulk operations.
First thing you'll need is a default-spark-env; there's a template you can copy and then modify to match your environment.
cp scripts/default-spark-env.template scripts/default-spark-env
On the YARN nodes as well as on the driver, the following should be present:
- /usr/lib/hbase/lib needs to contain all HBase Java libraries required by the HBase client
- /usr/lib/hbase/lib/native needs to contain all required native libraries for compression algorithms etc.
Further, on the driver you'll need the distributions of Spark and Hadoop as defined in the pom.xml, on the paths defined by $SPARK_HOME and $HADOOP_HOME in the default-spark-env respectively.
NOTE: the scripts provided here for spark-shell and spark-submit set the Spark master to yarn-client, so the driver is the computer from which you are building the demo app.
If you don't have your Spark assembly jar ready on the driver or available in HDFS for the executors, you'll first need to build it, put it on the driver and upload it into HDFS.
Mapping an existing table to an instance of HBaseRDD[(Array[Byte], hbase.client.Result)]
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

val sc: SparkContext = ...
val minStamp = HConstants.OLDEST_TIMESTAMP
val maxStamp = HConstants.LATEST_TIMESTAMP
val rdd1 = HBaseRDD.create(sc, "my-hbase-table")
val rdd2 = rdd1.select("CF1:col1int", "CF1:col2double")
val rdd3 = rdd2.filter(minStamp, maxStamp)
// all of the RDDs above have default type HBaseRDD[Array[Byte], Result]
// so we need to do the transformation by hand:
val cf1 = Bytes.toBytes("CF1")
val qual1 = Bytes.toBytes("col1int")
val qual2 = Bytes.toBytes("col2double")
val rdd4: RDD[(String, (Int, Double))] = rdd3.map { case (rowKey: Array[Byte], cells: Result) => {
val keyAsString = Bytes.toString(rowKey)
val cell1 = cells.getColumnLatestCell(cf1, qual1)
val cell2 = cells.getColumnLatestCell(cf1, qual2)
val value1 = Bytes.toInt(cell1.getValueArray, cell1.getValueOffset)
val value2 = Bytes.toDouble(cell2.getValueArray, cell2.getValueOffset)
(keyAsString, (value1, value2))
}}
Extending the HBaseTable class to provide richer semantics. The example is implemented as a demo application.
Run the following script which will package the demo application in org.apache.spark.hbase.examples.demo.simple
./scripts/build demo-simple
You can then run the demo application as a shell:
./scripts/demo-simple-shell
Consider the following example of a document table in which each row represents a document, keyed by UUID, and which has one column family 'A' where each column represents an Attribute of the document, and a column family 'C' which always has one column 'body' containing the content of the document:
val documents = new HBaseTable[UUID](sc, "my_documents_table") {
override def keyToBytes = (key: UUID) => ByteUtils.UUIDToBytes(key)
override def bytesToKey = (bytes: Array[Byte]) => ByteUtils.bytesToUUID(bytes)
val C = Bytes.toBytes("C")
val body = Bytes.toBytes("body")
val A = Bytes.toBytes("A")
val spelling = Bytes.toBytes("spelling")
val Content = new Transformation[String]("C:body") {
override def apply(result: Result): String = {
val cell = result.getColumnLatestCell(C, body)
Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
}
override def applyInverse(value: String, mutation: Put) {
mutation.addColumn(C, body, Bytes.toBytes(value))
}
}
val Spelling = new Transformation[String]("A:spelling") {
override def apply(result: Result): String = {
val cell = result.getColumnLatestCell(A, spelling)
Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
}
override def applyInverse(value: String, mutation: Put) {
mutation.addColumn(A, spelling, Bytes.toBytes(value))
}
}
}
Above we have created (and implemented) a fully working HBaseTable instance with 2 transformations available that can be used to read and write the data as typed RDDs. The transformations in the above class are implemented from scratch here, but there are some standard transformations and key serdes available in the helpers package, so the shorter version could look like this:
val documents = new HBaseTable[UUID](sc, "my_documents_table") with SerdeUUID {
val Content = TString("C:body")
val Spelling = TString("A:spelling")
}
We can now, for example, do the following:
val dictionary: RDD[String] = ... // contains a dictionary of known English words
val docWords: RDD[(UUID, Seq[String])] = documents.select(documents.Content).mapValues(_.split("\\s+"))
val words = docWords.flatMap { case (uuid, ws) => ws.map(word => (word, uuid)) }
val unknown = words.leftOuterJoin(dictionary.map(word => (word, "known"))).filter(_._2._2.isEmpty).mapValues(_._1)
val docWordsUnknown = unknown.map{ case (word, uuid) => (uuid, word) }.groupByKey
documents.update(documents.Spelling, docWordsUnknown.mapValues(_.mkString(",")))
The code above will find all misspelled words in each document's content and update the 'spelling' attribute with a comma-separated list of them, in a fully distributed way of course.
If we wanted to access and update the whole column family 'A' as a Map[String,String] of Attribute-Value pairs, we could add another transformation to the table. This time we use the slightly higher-level FamilyTransformation, an extension of the abstract Transformation that maps an entire column family to a Map[K,V], in this case a Map[String,String] of the document's name-value attributes:
...
val Attributes = new FamilyTransformation[String, String]("A") {
override def applyCell(kv: Cell): (String, String) = {
val attr = Bytes.toString(kv.getQualifierArray, kv.getQualifierOffset, kv.getQualifierLength)
val value = Bytes.toString(kv.getValueArray, kv.getValueOffset, kv.getValueLength)
(attr, value)
}
override def applyCellInverse(attr: String, value: String): (Array[Byte], Array[Byte]) = {
(Bytes.toBytes(attr), Bytes.toBytes(value))
}
}
...
There are also some standard FamilyTransformation helpers like TStringDouble, TStringLong and TStringString which map the whole column family to a Map[String,Double], Map[String,Long] and Map[String,String] respectively, so the Attributes above can also be declared as:
...
val Attributes = TStringString("A")
...
And since TStringString is a case class Transformation, it can in fact be used directly in select, filter and update statements without declaring it as a val of the table class:
documents.select(TStringString("A"))
//is same as
documents.select(documents.Attributes)
//both statements above return an HBaseRDD[UUID, Map[String,String]] or RDD[(UUID, Map[String,String])]
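Because the transformation is bi-directional, Attributes can also be used to write the whole family back. A small sketch, where the 'reviewed' attribute is a made-up example:
val attrs = documents.select(documents.Attributes)        // RDD[(UUID, Map[String, String])]
// add or overwrite one attribute in every document and write the family back
documents.update(documents.Attributes, attrs.mapValues(_ + ("reviewed" -> "true")))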
Using Transformations in filter statements is very clean and efficient because the filters are pushed down to the region servers, however at the moment only a few predicates are implemented:
documents.select(documents.Content).filter(TStringString("A") contains "misspelled")
TODO section about bulk-loading the documents from some hdfs data and large-scale join and transformation all the way from launching the spark job as hbase user.
This example makes use of the org.apache.spark.hbase.keyspace package, which provides a specialised Key implementation that addresses several scalability issues and the general integration between Spark and HBase. It also provides the specialised HBaseRDDKS and HBaseTableKS, which all share the concept of a KeySpace. The main idea here is to be able to mix different key types while preserving an even distribution of records across HBase regions. By using the form
[4-byte-hash] [2-byte-keyspace-symbol] [n-byte-key-value]
it is also possible to do HBase server-side fuzzy filtering for a specific 2-byte symbol by ignoring the first 4 bytes and matching the 5th and 6th. Each implementation of KeySpace must provide serde methods that write the hash and the key value into a byte array pre-allocated by the KeySpace abstraction. More information is in the comments of the demo application and the package classes.
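For illustration only, a hedged sketch of the byte arithmetic behind this layout - this is not the library's actual KeySpace API, just the [hash][symbol][value] form described above:
import java.util.Arrays
import org.apache.hadoop.hbase.util.Bytes

// layout: [4-byte hash][2-byte keyspace symbol][n-byte key value]
def encodeKey(symbol: Short, value: Array[Byte]): Array[Byte] = {
  val key = new Array[Byte](4 + 2 + value.length)
  val hash = Arrays.hashCode(value)                 // placeholder hash; a real KeySpace defines its own
  Bytes.putInt(key, 0, hash)                        // bytes 0-3: spread rows evenly across regions
  Bytes.putShort(key, 4, symbol)                    // bytes 4-5: keyspace symbol, matched by fuzzy filters
  System.arraycopy(value, 0, key, 6, value.length)  // bytes 6+: the actual key value
  key
}
A real KeySpace implementation also provides the inverse serde and its own hash function; the sketch only shows why the first 4 bytes can be ignored while the 5th and 6th are matched during server-side fuzzy filtering.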
Run the following script which will package the demo application in org.apache.spark.hbase.examples.demo.graph
./scripts/build demo-graph
You can then run the demo application as a shell:
./scripts/demo-graph-shell
- TTL on a per-mutation basis
- fixme: update on HBaseTable task size is 250k - something unnecessary is getting serialised
- fixme: warning: there were 1 deprecation warning(s); re-run with -deprecation for details
- spark-submit script and add an example into the simple demo
- figure out a work-around for the bulk operations requiring the process to run under hbase user
- mechanism for choosing the HBaseJoin implementation must be decided per operation, not by a simple configuration variable, because the type of join depends on the relative volume of the RDDs, not on location or resources - but ideally it should be done by estimation, not requiring any control argument
- add implicit Ordering[K] for all HBaseRDD representations since HBase is ordered by definition
- investigate table.rdd.mapValues(Tags).collect.foreach(println) => WARN ClosureCleaner: Expected a closure; got org.apache.spark.hbase.examples.simple.HBaseTableSimple$$anon$2, while table.select(Tags) works fine
- multiple transformations over the same result could be optimised as long as they all use the cell scanner - but some transformations access columns directly so maybe too awkward to abstract