
Sparkifying MapAlgebra

Tim Tisler edited this page Sep 21, 2015 · 20 revisions

Proposal

Currently, the entire mapalgebra chain is backed by JAI (Java Advanced Imaging) rasters. With the move to Spark-based distributed processing, along with the deprecation of JAI by Oracle, the entire processing chain should be refactored.

Background

After reimplementing the opchain driver to use Spark, while still using the old JAI-based opchain, we started doing some scale testing on AWS. Using the aster-30m dataset, with 24,000 partitions, the operations crawled along: after 1 hour, only 400 partitions had been processed, and the median time for a single partition was 4 minutes. This is more than 10 times slower than the existing, Hadoop-based processing. Clearly something was amiss.

After investigating, it turned out the serialization/deserialization of the opchain was the culprit. Simply changing the serialization from closures to a Spark broadcast variable had minimal impact. Here's why -- JAI is implemented using two parts, a Descriptor and an Operation. A Descriptor, as the name suggests, describes how to construct an Operation. When serializing an opchain, the Operations are converted back to Descriptors; when deserializing, the opposite occurs, and the Descriptors create Operations. The creation of the Operations is what takes the time.
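The two-part pattern can be sketched as follows. The classes below are illustrative stand-ins, not the actual JAI API, with a counter in place of the real construction cost:

```java
// Sketch of the Descriptor/Operation pattern described above. Serialization
// keeps only the cheap Descriptor; every deserialization pays to rebuild the
// Operation. Class names are illustrative, not JAI's real API.
class Descriptor implements java.io.Serializable {
    final String opName;
    Descriptor(String opName) { this.opName = opName; }

    Operation create() {
        // In JAI, this construction step is the expensive part;
        // here it just bumps a counter so the cost is observable.
        Operation.constructed++;
        return new Operation(opName);
    }
}

class Operation {
    static int constructed = 0;  // how many times Operations were rebuilt
    final String opName;
    Operation(String opName) { this.opName = opName; }

    // Serializing an opchain converts each Operation back to its Descriptor.
    Descriptor toDescriptor() { return new Descriptor(opName); }
}
```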

In the original, Hadoop-based opchain driver, the deserialization was done in Map.setup(), so it happened once per map task -- 700 times in the test cluster. In Spark, this deserialization is done once per partition (24,000 times in the test case). Therein lies the difference.
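A quick back-of-envelope check using the figures above (700 map tasks versus 24,000 partitions) shows how much more often the naive Spark port pays the rebuild cost:

```java
// Back-of-envelope comparison of opchain rebuilds, using the test-cluster
// numbers from the text: once per map task in Hadoop, once per partition
// in the naive Spark port.
public class RebuildCount {
    public static void main(String[] args) {
        int hadoopMapTasks = 700;      // deserializations in the Hadoop driver
        int sparkPartitions = 24_000;  // deserializations in the Spark port
        System.out.println("Extra rebuild factor: "
                + (sparkPartitions / hadoopMapTasks) + "x");  // prints 34x
    }
}
```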

Solution

Since the platform is moving away from Hadoop map/reduce toward Spark, it makes sense to tailor the MapOps to Spark. To that end, we should refactor all the MapOps to take in and spit out Spark RDDs (Resilient Distributed Datasets).

Following the tile pattern we use for Rasters, we'll create a base TiledRDD, then a RasterRDD from that. This will be the basic type passed around as inputs and outputs for each MapOp.
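A minimal sketch of the proposed hierarchy, with java.util.List standing in for the underlying Spark RDD so the example is self-contained; the class and field names here are hypothetical, not the final design:

```java
import java.util.List;

// Illustrative sketch of the proposed RDD types. A real implementation would
// wrap a Spark RDD of (tile id, tile data) pairs; List stands in for the RDD
// here so the sketch runs on its own. All names are hypothetical.
class Tile<T> {
    final long tileId;  // tile index within the tiled image
    final T data;
    Tile(long tileId, T data) { this.tileId = tileId; this.data = data; }
}

// Base type: any tiled, distributed dataset keyed by tile id.
class TiledRDD<T> {
    protected final List<Tile<T>> tiles;  // stand-in for the underlying RDD
    TiledRDD(List<Tile<T>> tiles) { this.tiles = tiles; }
    int tileCount() { return tiles.size(); }
}

// Specialization for raster data: each tile carries pixel samples.
class RasterRDD extends TiledRDD<float[]> {
    RasterRDD(List<Tile<float[]>> tiles) { super(tiles); }
}
```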

Each MapOp will accept a group of one or more RDD inputs, perform its necessary processing, and spit out a single RDD. This follows the way MapOps work today, but substitutes RDDs for RasterOps (the JAI construct).
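The contract might look like the following sketch, again with List standing in for a RasterRDD (one array of samples per tile). Both the interface and the element-wise "plus" implementation are illustrative, not MrGeo's actual API:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the proposed MapOp contract: one or more RDD inputs in, exactly
// one RDD out. List<double[]> stands in for a RasterRDD (one double[] of
// samples per tile) so the example runs without Spark. Names are hypothetical.
interface MapOp {
    List<double[]> execute(List<List<double[]>> inputs);
}

// Example: an element-wise "plus", the RDD analogue of RasterMath '+'.
// A Spark version would zip the two RDDs and map over tile pairs.
class PlusMapOp implements MapOp {
    public List<double[]> execute(List<List<double[]>> inputs) {
        List<double[]> left = inputs.get(0), right = inputs.get(1);
        List<double[]> out = new ArrayList<>();
        for (int t = 0; t < left.size(); t++) {
            double[] l = left.get(t), r = right.get(t);
            double[] sum = new double[l.length];
            for (int p = 0; p < l.length; p++) sum[p] = l[p] + r[p];
            out.add(sum);  // one output tile per pair of input tiles
        }
        return out;
    }
}
```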

An additional benefit of this method is that Spark, with its scheduling and lazy evaluation, can optimize the various operations for much more efficient processing. We can also eliminate the need to store intermediate output for non-JAI-based operations.

For those operations we don't wish to move over to Spark, or ones we haven't yet moved over, we can store the output as an intermediate file, then create an RDD from it for use further downstream.
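That bridging step might look like the sketch below: run the legacy op, spill its output to an intermediate file, and read the file back as the dataset handed to downstream ops. The file format and all names here are invented for illustration; a real version would load the file through the Spark context rather than into a List:

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Sketch of the bridge for ops not yet converted to Spark: the legacy
// (JAI-based) op writes its result to an intermediate file, and the file is
// then re-loaded as the stand-in "RDD" for downstream ops. Illustrative only.
class LegacyOpBridge {
    // Stands in for running a legacy op and persisting its output tile.
    static Path spillLegacyOutput(double[] tile) throws IOException {
        Path tmp = Files.createTempFile("mapop-intermediate", ".bin");
        try (DataOutputStream out =
                     new DataOutputStream(Files.newOutputStream(tmp))) {
            out.writeInt(tile.length);
            for (double v : tile) out.writeDouble(v);
        }
        return tmp;
    }

    // Stands in for creating an RDD from the intermediate file.
    static List<double[]> loadAsRdd(Path intermediate) throws IOException {
        try (DataInputStream in =
                     new DataInputStream(Files.newInputStream(intermediate))) {
            double[] tile = new double[in.readInt()];
            for (int i = 0; i < tile.length; i++) tile[i] = in.readDouble();
            return List.of(tile);  // a real version: sparkContext.load(...)
        }
    }
}
```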

Another benefit is actual sharing of input data. Right now, if an optree shares inputs, unique operators are created for each input leaf in the tree. In the map step, the shared tile is then distributed to each operator. Thus, the operations look like they are sharing data, but the tree really isn't. By using RDDs, we will be actually sharing data, allowing more efficient memory usage as well as Spark optimizations and RDD caching.
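The difference can be modeled with a memoizing wrapper standing in for a cached RDD: however many leaves in the optree reference the same input, the expensive load runs only once. This is just an analogy for RDD caching, not Spark code:

```java
import java.util.function.Supplier;

// Analogy for a cached RDD shared by multiple optree leaves: the first
// consumer triggers the expensive load, every later consumer reuses it.
class CachedInput<T> {
    private final Supplier<T> loader;  // the expensive load (read tiles, etc.)
    private T cached;                  // populated on first use, like a cached RDD
    int loads = 0;                     // how many times the load actually ran

    CachedInput(Supplier<T> loader) { this.loader = loader; }

    T get() {
        if (cached == null) {
            cached = loader.get();
            loads++;
        }
        return cached;
    }
}
```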

Related Work

There is additional work to support vector-based functionality in map algebra documented here. The tasks associated with that work are included below so that progress can be seen in one place.

Timeline

The basic approach: copy the existing functionality into "old" methods; create a new MapOp built on RDDs; convert RasterMath to use the new MapOp; change the MapAlgebraExecutioner and the old MapOp to use RDDs; once that works, convert the remaining operations to the new MapOp.

| Task | Time |
| --- | --- |
| Move existing MapOp class to be "MapOpOld", and make sure existing functionality still works | 1d |
| Design and implement RasterRDD | 3d |
| Create the new MapOp | 6d |
| Convert RawBinaryMapOp to the new MapOp | 2d |
| Make changes to MapAlgebraExecutioner to support the new MapOp | 3d |
| Test the operation, including at scale, and make changes as needed | 5d |
| Convert another MapOp (RawConMapOp?) | 3d |
| Test the operation of a chain of new MapOps (more complicated MapAlgebra) | 2d |
| Create a shim to allow old MapOps to interact with the new | 3d |
| Test operation of old and new MapOps | 3d |
| Write Spark-based load/save methods for vector data | 1d |
| Create vector data provider for inline CSV | 3d |
| Create vector data provider for delimited text files | 5d |
| Create vector data provider for shapefiles | 3d |
| **Total estimate** | **43d** |

After verifying operations, we can then estimate and begin conversion of all the old style MapOps to the new.

## MapOp Conversion List

### RasterMath

  • Plus (+)
  • Minus (-)
  • Multiply (*)
  • Divide (/)
  • Equals (==, eq)
  • Not Equals (!=, ^=, <>, ne)
  • Greater Than (>, gt)
  • Greater Than or Equals (>=, ge)
  • Less Than (<, lt)
  • Less Than or Equals (<=, le)
  • And (&&, &, and)
  • Or (||, |, or)
  • Xor (exclusive or) (xor)
  • To the Power (pow)
  • Not (!)
  • Unary minus (-)
  • Absolute Value (abs)
  • Sine (sin)
  • Cosine (cos)
  • Tangent (tan)
  • isNodata (isNodata, isNull)
  • Conditional (con)
  • Logarithm (log)

### Terrain

  • Slope (slope)
  • Aspect (aspect)

### Image

  • BuildPyramid (buildpyramid)
  • Change Classification (changeClassification)
  • Crop (crop)
  • Fill (fill)
  • Kernel (kernel)
  • Mosaic (mosaic)
  • Save Raster (save)

### RasterizeVector

  • RasterizeVector (rasterizeVector)

### Vector

  • Boundary (boundary)
  • Convex Hull (convexhull)
  • PostgreSQL Query (pgQuery)
  • Randomize Vector (randomizeVector)
  • Random Sample (randomSample)
  • Split Vector (splitVector)
  • Vector Buffer (vectorBuffer)

### Pig

  • Pig (pig)
  • Pig Script (pigScript)

Vector Data Providers

  • Inline CSV (for specifying vector data directly in map algebra)
  • Delimited text file
  • Shapefile