# Sparkifying MapAlgebra
Currently, the entire map algebra chain is backed by JAI (Java Advanced Imaging) rasters. With the move to Spark-based distributed processing, and with Oracle's deprecation of JAI, the entire processing chain should be refactored.
After reimplementing the opchain driver to use Spark, still using the old JAI-based opchain, we started doing some scale testing on AWS. Using the aster-30m dataset, with 24,000 partitions, the operations just crawled along. After 1 hour, the job had completed only 400 partitions, and the median time for a single partition was 4 minutes. This is more than 10 times slower than the existing Hadoop-based processing. Clearly something was amiss.
After investigating, we found that the serialization/deserialization of the opchain was the culprit. Simply changing the serialization from closures to a Spark broadcast variable had minimal impact. Here's why: JAI operations are built from two parts, a Descriptor and an Operation. A Descriptor, as the name suggests, describes how to construct an Operation. When serializing an opchain, the Operations are converted back to Descriptors. When deserializing, the opposite occurs: the Descriptors create the Operations. The creation of the Operations is what takes the time.
In the original, Hadoop-based opchain driver, the deserialization was done in Map.setup(), so it happened once per map task (about 700 times on the test cluster). In Spark, this deserialization happens once per partition (24,000 times in the test case). Therein lies the difference.
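To make the cost mechanics concrete, here is a minimal Scala sketch (illustrative only, not MrGeo code): anything rebuilt inside the function passed to `mapPartitions` runs once for every partition, while state held in a lazily initialized singleton is built at most once per executor JVM. This only illustrates where the time goes; the actual plan is the RDD refactoring described below.

```scala
import org.apache.spark.rdd.RDD

object OpChainHolder {
  // Built lazily, at most once per executor JVM (singleton objects are not
  // shipped with the task closure; they are initialized locally on first use).
  lazy val cachedOps: Seq[Float => Float] = buildExpensiveOpChain()

  def buildExpensiveOpChain(): Seq[Float => Float] = {
    // Stand-in for the slow Descriptor -> Operation construction.
    Thread.sleep(1000)
    Seq(v => v + 1f, v => v * 2f)
  }
}

// Rebuilds the op chain inside the partition function: with 24,000 partitions,
// the expensive construction runs 24,000 times.
def perPartition(tiles: RDD[Float]): RDD[Float] =
  tiles.mapPartitions { iter =>
    val ops = OpChainHolder.buildExpensiveOpChain()
    iter.map(v => ops.foldLeft(v)((acc, f) => f(acc)))
  }

// Resolves the op chain through the lazily initialized singleton: the
// construction runs once per executor JVM, regardless of partition count.
def perJvm(tiles: RDD[Float]): RDD[Float] =
  tiles.mapPartitions { iter =>
    val ops = OpChainHolder.cachedOps
    iter.map(v => ops.foldLeft(v)((acc, f) => f(acc)))
  }
```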
Since the platform is moving away from Hadoop map/reduce toward Spark, it makes sense to tailor the MapOps to Spark. In this regard, we should refactor all the MapOps to take in and produce Spark RDDs (Resilient Distributed Datasets).
Following the tile pattern we use for Rasters, we'll create a base TiledRDD, then a RasterRDD from that. This will be the basic type passed around as inputs and outputs for each MapOp.
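Here is a minimal sketch of what that hierarchy could look like. The key and tile value types (`TileId`, `Array[Float]`) are placeholder assumptions, not the final MrGeo types.

```scala
import org.apache.spark.rdd.RDD

// Placeholder tile key; the real implementation would carry whatever
// addressing MrGeo's tile scheme needs.
case class TileId(tx: Long, ty: Long, zoom: Int)

// Base type for any tiled, distributed dataset.
class TiledRDD[V](val rdd: RDD[(TileId, V)])

// Raster specialization; Array[Float] stands in for real raster tile data.
class RasterRDD(rdd: RDD[(TileId, Array[Float])]) extends TiledRDD[Array[Float]](rdd)
```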
Each MapOp will accept a group of one or more RDD inputs, perform its necessary processing, and produce a single RDD. This follows the way MapOps work today, but substitutes RDDs for RasterOps (the JAI construct).
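Under the placeholder types above, the shape of such a MapOp might look like the following sketch; `RddMapOp` and `PlusMapOp` are illustrative names, and tile alignment and nodata handling are ignored.

```scala
// Every RDD-based MapOp takes one or more RDD inputs and produces exactly one RDD.
abstract class RddMapOp {
  def execute(inputs: Seq[RasterRDD]): RasterRDD
}

// Illustrative binary op: add corresponding tiles from two inputs.
class PlusMapOp extends RddMapOp {
  override def execute(inputs: Seq[RasterRDD]): RasterRDD = {
    require(inputs.size == 2, "plus expects exactly two raster inputs")
    // Join on the tile key so matching tiles from both inputs end up together.
    val joined = inputs(0).rdd.join(inputs(1).rdd)
    new RasterRDD(joined.map { case (id, (a, b)) =>
      (id, a.zip(b).map { case (x, y) => x + y })
    })
  }
}
```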
An additional benefit of this approach is that Spark, with its scheduling and lazy evaluation, can optimize the various operations for much more efficient processing. We can also eliminate the need to store intermediate output for non-JAI-based operations.
For those operations we don't wish to move over to Spark, or ones we haven't yet moved over, we can store the output as an intermediate file and then create an RDD from it for use further downstream.
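A hedged sketch of that bridge is below; `runLegacyOperation` and `loadTiles` are hypothetical stand-ins for the existing JAI path and a tile reader, not real MrGeo calls.

```scala
import org.apache.spark.SparkContext

// Stand-in for an operation still running on the old JAI-based path.
def runLegacyOperation(outputPath: String): Unit = ???

// Stand-in for a reader that wraps stored tiles in a RasterRDD.
def loadTiles(sc: SparkContext, path: String): RasterRDD = ???

def bridgeLegacyOp(sc: SparkContext, intermediatePath: String): RasterRDD = {
  // 1. Run the non-Spark operation and persist its result to an intermediate file.
  runLegacyOperation(intermediatePath)
  // 2. Wrap the stored result in an RDD so downstream Spark-based MapOps can use it.
  loadTiles(sc, intermediatePath)
}
```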
Another benefit is actual sharing of input data. Right now, if an optree shares inputs, a unique operator is created for each input leaf in the tree, and in the map step the shared tile is distributed to each of those operators. The operations look like they are sharing data, but the tree really isn't. By using RDDs, we will actually share data, allowing more efficient memory usage as well as Spark optimizations and RDD caching.
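A small sketch of that sharing, using the placeholder types above: one cached input feeds two branches of the op tree instead of being re-read for each leaf. The names and the two example branches are illustrative.

```scala
def shareInput(elevation: RasterRDD): (RasterRDD, RasterRDD) = {
  // Cache the shared input once; both downstream branches reuse the same
  // in-memory partitions instead of each leaf reloading the source.
  elevation.rdd.cache()

  val negated = new RasterRDD(elevation.rdd.map { case (id, tile) => (id, tile.map(v => -v)) })
  val doubled = new RasterRDD(elevation.rdd.map { case (id, tile) => (id, tile.map(_ * 2f)) })
  (negated, doubled)
}
```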
There is additional work, documented here, to support vector-based functionality in map algebra. The tasks associated with that work are included below so that progress can be seen in one place.
The basic approach will be to:

1. Copy the existing functionality to "old" methods.
2. Create a new MapOp that uses RDDs.
3. Convert RasterMath to use the new MapOp.
4. Change the MapAlgebraExecutioner and the old MapOp to use RDDs.
5. Once that works, convert the other operations to the new MapOp.

| Task | Time |
|---|---|
| | 1d |
| | 3d |
| | 6d |
| | 2d |
| | 3d |
| Test the operation, including at scale, and make changes as needed | 5d |
| | 3d |
| | 2d |
| | 3d |
| | 3d |
| | 1d |
| | 3d |
| | 5d |
| | 3d |
| Total estimate | 43d |
After verifying those operations, we can then estimate and begin converting all of the old-style MapOps to the new one.
## MapOp Conversion List
### RasterMath
- Plus (+)
- Minus (-)
- Multiply (*)
- Divide (/)
- Equals (==, eq)
- Not Equals (!=, ^=, <>, ne)
- Greater Than (>, gt)
- Greater Than or Equals (>=, ge)
- Less Than (<, lt)
- Less Than or Equals (<=, le)
- And (&&, &, and)
- Or (||, |, or)
- Xor (exclusive or) (xor)
- To the Power (pow)
- Not (!)
- Unary minus (-)
- Absolute Value (abs)
- Sine (sin)
- Cosine (cos)
- Tangent (tan)
- isNodata (isNodata, isNull)
- Conditional (con)
- Logarithm (log)
### Terrain
- Slope (slope)
- Aspect (aspect)
### Image
- BuildPyramid (buildpyramid)
- Change Classification (changeClassification)
- Crop (crop)
- Fill (fill)
- Kernel (kernel)
- Mosaic (mosaic)
- Save Raster (save)
### RasterizeVector
- RasterizeVector (rasterizeVector)
### Vector
- Boundary (boundary) - DELETED
- Convex Hull (convexhull) - DELETED
- PostGreSQL Query (pgQuery) - DELETED
- Randomize Vector (randomizeVector) - DELETED
- Random Sample (randomSample) - DELETED
- Split Vector (splitVector) - DELETED
- Vector Buffer (vectorBuffer) - DELETED
### Pig
- Pig (pig) - DELETED
- Pig Script (pigScript) - DELETED
- Inline CSV (for specifying vector data directly in map algebra)
- Delimited text file
- Shapefile