-
Notifications
You must be signed in to change notification settings - Fork 0
/
hbaseDelete.scala
66 lines (52 loc) · 2 KB
/
hbaseDelete.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// Delete HBase rows that have cells within a given timestamp range, using Scala.
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Delete, Scan, Result, BufferedMutator}
import org.apache.hadoop.hbase.client.ResultScanner
import org.apache.hadoop.hbase.filter.KeyOnlyFilter
import java.io.File
import java.io.IOException
import scala.collection.JavaConversions._
/**
 * Command-line job that scans an HBase table for rows having cells inside a
 * timestamp range and submits a `Delete` for each row returned by the scan.
 *
 * Usage: `truncateHbase <hbaseTableName> <startTime> <endTime>`
 *
 * NOTE(review): `new Delete(rowKey)` is built without column or timestamp
 * qualifiers, which per the HBase client documentation targets the whole row,
 * not only the cells inside the time range - confirm this is intended.
 */
object truncateHbase {

  /** HBase client configuration file added on top of the default configuration. */
  private val HbaseConfPath: String = "/etc/hbase/conf/hbase-site.xml"

  /**
   * Entry point.
   *
   * Exits with status 1 when the argument count is not exactly three, when the
   * time range is not valid, or when the deletion fails with an `IOException`.
   *
   * @param args `hbaseTableName`, `startTime`, `endTime` (timestamps as long values)
   */
  def main(args: Array[String]): Unit = args match {
    case Array(hbaseTableName, startTime, endTime) =>
      // mkString prints the argument values; concatenating the array itself
      // would only print its JVM identity (e.g. [Ljava.lang.String;@1b2c3d).
      println("Arguments Passed: " + args.mkString(", "))
      parseTimeRange(startTime, endTime) match {
        case Some((start, end)) =>
          runDeletion(hbaseTableName, start, end)
        case None =>
          println(
            s"Invalid time range: startTime='$startTime', endTime='$endTime' " +
              "(expected non-negative long values with startTime <= endTime)"
          )
          System.exit(1)
      }
    case _ =>
      println("Invalid Invocation")
      println("Usage: truncateHbase <hbaseTableName> <startTime> <endTime>")
      System.exit(1)
  }

  /**
   * Parses and validates the two timestamp arguments before any connection is opened.
   *
   * @return `Some((start, end))` when both values are non-negative longs and
   *         `start <= end`; `None` otherwise
   */
  private def parseTimeRange(startTime: String, endTime: String): Option[(Long, Long)] =
    for {
      start <- scala.util.Try(startTime.toLong).toOption
      end   <- scala.util.Try(endTime.toLong).toOption
      if start >= 0L && start <= end
    } yield (start, end)

  /**
   * Creates the Spark session, runs the deletion and reports failure through
   * the process exit status instead of silently swallowing the error.
   */
  private def runDeletion(hbaseTableName: String, start: Long, end: Long): Unit = {
    // Create SparkSession for cluster mode. The session is not used by the
    // deletion below. NOTE(review): presumably kept so that a spark-submit
    // launch sees an initialized Spark application - confirm before removing.
    SparkSession.builder.getOrCreate()

    val succeeded =
      try {
        val submitted = deleteRowsInTimeRange(hbaseTableName, start, end)
        println(s"Delete mutations submitted: $submitted")
        true
      } catch {
        case e: IOException =>
          e.printStackTrace()
          false
      }
    // A failed deletion must not look like a success to the caller/scheduler.
    if (!succeeded) System.exit(1)
  }

  /**
   * Scans `hbaseTableName` restricted to the given time range and submits one
   * `Delete` per scan result through a buffered mutator.
   *
   * Every HBase resource opened here (connection, table, mutator, scanner) is
   * closed on both the success and the failure path.
   *
   * @return the number of `Delete` mutations submitted. This counts scan
   *         results; because the scan sets a batch size, a row may appear in
   *         more than one result (NOTE(review): confirm against the HBase
   *         `Scan.setBatch` documentation).
   * @throws IOException if the HBase client reports an I/O failure
   */
  private def deleteRowsInTimeRange(hbaseTableName: String, start: Long, end: Long): Long = {
    // Create hbase connection
    val conf = HBaseConfiguration.create()
    conf.addResource(new File(HbaseConfPath).toURI.toURL)
    val tableName = TableName.valueOf(hbaseTableName)

    withResource(ConnectionFactory.createConnection(conf)) { connection =>
      println("Hbase Config Passed")
      withResource(connection.getTable(tableName)) { table =>
        withResource(connection.getBufferedMutator(tableName)) { mutator =>
          val scan = new Scan()
          scan.setTimeRange(start, end)
          println("Parms setup Completed")
          // Only row keys are needed to build the deletes, so cell values are stripped.
          scan.setFilter(new KeyOnlyFilter())
          scan.setCaching(1000)
          scan.setBatch(1000)
          scan.setCacheBlocks(false)

          withResource(table.getScanner(scan)) { scanner =>
            // Iterate through the Java iterator directly instead of relying on
            // the deprecated implicit JavaConversions.
            val results = scanner.iterator()
            var submitted = 0L // local counter, confined to this loop
            while (results.hasNext) {
              val result: Result = results.next()
              mutator.mutate(new Delete(result.getRow))
              submitted += 1
            }
            // Push any buffered deletes before the mutator is closed.
            mutator.flush()
            submitted
          }
        }
      }
    }
  }

  /**
   * Loan pattern: runs `body` with `resource` and always closes the resource
   * afterwards, whether `body` returns normally or throws.
   */
  private def withResource[R <: java.io.Closeable, A](resource: R)(body: R => A): A =
    try body(resource)
    finally resource.close()
}