Skip to content

Commit

Permalink
Merge branch 'release/3.1.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
rajadain committed Jan 2, 2018
2 parents 30d36f5 + 0b428eb commit 9808a7c
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 3 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 3.1.0

- Add RasterSummary operation that, given a shape and a list
of rasters, returns the min, avg, and max values of each
raster within that shape.

## 3.0.0

- Crop lines to area of interest in RasterLinesJoin
Expand Down
83 changes: 82 additions & 1 deletion api/src/main/scala/Geoprocessing.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.wikiwatershed.mmw.geoprocessing

import java.util.concurrent.atomic.{LongAdder, DoubleAdder}
import java.util.concurrent.atomic.{LongAdder, DoubleAdder, DoubleAccumulator}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

Expand Down Expand Up @@ -83,6 +83,24 @@ trait Geoprocessing extends Utils {
}
}


/**
 * Summarizes every raster named in an InputData object over its area of
 * interest.
 *
 * The AOI and the rasterizer options are both derived from the input, the
 * rasters are cropped to the AOI, and the cropped layers are then handed to
 * rasterSummary.
 *
 * @param   input  The InputData
 * @return  A Future of a ResultSummary wrapping a Seq of maps of min, avg,
 *          and max values, in the order of the input rasters
 */
def getRasterSummary(input: InputData): Future[ResultSummary] = {
  val areaOfInterest = createAOIFromInput(input)
  val croppedLayers = cropRastersToAOI(input.rasters, input.zoom, areaOfInterest)
  val rasterizerOptions = getRasterizerOptions(input.pixelIsArea)

  for {
    layers <- croppedLayers
  } yield ResultSummary(rasterSummary(layers, areaOfInterest, rasterizerOptions))
}

/** Bundles a tile's SpatialKey with a column and a row index. */
private case class TilePixel(key: SpatialKey, col: Int, row: Int)

/**
Expand Down Expand Up @@ -249,4 +267,67 @@ trait Geoprocessing extends Utils {
.map { case (k, v) => k.toString -> v}
.toMap
}

/** Running accumulators for one raster, in order: min, sum, max, and count. */
type RasterSummary = (DoubleAccumulator, DoubleAdder, DoubleAccumulator, LongAdder)

/**
 * From a list of rasters and a shape, return a list of maps containing min,
 * avg, and max values of those rasters.
 *
 * Each map has the keys "min", "avg" and "max". Maps are ordered by the
 * position of their raster in `rasterLayers`. Pixels whose value is NaN
 * (the value GeoTrellis uses for double NODATA) are ignored entirely, so
 * they affect neither min/max nor the sum and count behind "avg". A raster
 * whose every visited pixel is NaN reports NaN for "avg".
 *
 * @param   rasterLayers  A sequence of TileLayerCollections
 * @param   multiPolygon  The AOI as a MultiPolygon
 * @param   opts          Rasterizer options used when visiting the AOI cells
 * @return  A Seq of Map of min, avg, and max values; empty when
 *          `rasterLayers` is empty
 */
private def rasterSummary(
  rasterLayers: Seq[TileLayerCollection[SpatialKey]],
  multiPolygon: MultiPolygon,
  opts: Rasterizer.Options
): Seq[Map[String, Double]] = {
  // Folds a single pixel value into one raster's accumulators.
  val update = (newValue: Double, summary: RasterSummary) => {
    // Skip NaN (NODATA) pixels: adding them to sum/count would turn the
    // average into NaN even when the raster has valid pixels.
    if (!newValue.isNaN) {
      summary match {
        case (min, sum, max, count) =>
          min.accumulate(newValue)
          sum.add(newValue)
          max.accumulate(newValue)
          count.increment()
      }
    }
  }

  // Fresh accumulators; min starts at the largest Double and max at the
  // most negative Double so the first valid pixel always replaces them.
  val init = () => (
    new DoubleAccumulator(new MinWithoutNoData, Double.MaxValue),
    new DoubleAdder,
    new DoubleAccumulator(new MaxWithoutNoData, Double.MinValue),
    new LongAdder
  )

  // Keyed by the raster's index in rasterLayers; concurrent because tiles
  // are processed in parallel below.
  val layerSummaries: TrieMap[Int, RasterSummary] = TrieMap.empty

  // headOption instead of head: an empty raster list yields an empty
  // result rather than a NoSuchElementException.
  rasterLayers.headOption.foreach { firstLayer =>
    // assume all layouts are the same
    val metadata = firstLayer.metadata

    joinCollectionLayers(rasterLayers).par
      .foreach({ case (key, tiles) =>
        val extent = metadata.mapTransform(key)
        val re = RasterExtent(extent, metadata.tileLayout.tileCols, metadata.tileLayout.tileRows)

        Rasterizer.foreachCellByMultiPolygon(multiPolygon, re, opts) { case (col, row) =>
          val pixels: List[Double] = tiles.map(_.getDouble(col, row)).toList
          pixels.zipWithIndex.foreach { case (pixel, index) =>
            val summary = layerSummaries.getOrElseUpdate(index, init())
            update(pixel, summary)
          }
        }
      })
  }

  layerSummaries
    .toSeq
    .sortBy(_._1)
    .map { case (_, (min, sum, max, count)) =>
      Map(
        "min" -> min.get(),
        "avg" -> sum.sum() / count.sum(),
        "max" -> max.get()
      )
    }
}
}
22 changes: 22 additions & 0 deletions api/src/main/scala/Utils.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.wikiwatershed.mmw.geoprocessing

import java.util.function.DoubleBinaryOperator

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

Expand Down Expand Up @@ -205,4 +207,24 @@ trait Utils {
.query[SpatialKey, Tile, TileLayerMetadata[SpatialKey]](layerId)
.where(Intersects(shape))
.result

/**
 * Binary operator that keeps the smaller of two values while skipping
 * NODATA operands; used as the accumulator function for a running minimum.
 *
 * NODATA is detected with `isNaN` as well as by equality with
 * `doubleNODATA`. Matching on the `doubleNODATA` identifier alone compares
 * with `==`, which is never true for NaN, so a NaN NODATA value would fall
 * through to `math.min` and make the result NaN.
 *
 * Returns Double.MaxValue when both operands are NODATA.
 */
class MinWithoutNoData extends DoubleBinaryOperator {
  override def applyAsDouble(left: Double, right: Double): Double = {
    val leftIsNoData = left.isNaN || left == doubleNODATA
    val rightIsNoData = right.isNaN || right == doubleNODATA

    if (leftIsNoData && rightIsNoData) Double.MaxValue
    else if (leftIsNoData) right
    else if (rightIsNoData) left
    else math.min(left, right)
  }
}

/**
 * Binary operator that keeps the larger of two values while skipping
 * NODATA operands; used as the accumulator function for a running maximum.
 *
 * NODATA is detected with `isNaN` as well as by equality with
 * `doubleNODATA`. Matching on the `doubleNODATA` identifier alone compares
 * with `==`, which is never true for NaN, so a NaN NODATA value would fall
 * through to `math.max` and make the result NaN.
 *
 * Returns Double.MinValue when both operands are NODATA.
 */
class MaxWithoutNoData extends DoubleBinaryOperator {
  override def applyAsDouble(left: Double, right: Double): Double = {
    val leftIsNoData = left.isNaN || left == doubleNODATA
    val rightIsNoData = right.isNaN || right == doubleNODATA

    if (leftIsNoData && rightIsNoData) Double.MinValue
    else if (leftIsNoData) right
    else if (rightIsNoData) left
    else math.max(left, right)
  }
}
}
4 changes: 4 additions & 0 deletions api/src/main/scala/WebServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ case class InputData(
// Request body: wraps the InputData payload under the "input" field.
case class PostRequest(input: InputData)
// Response wrapper for a map of String keys to Int values.
case class ResultInt(result: Map[String, Int])
// Response wrapper for a map of String keys to Double values.
case class ResultDouble(result: Map[String, Double])
// Response wrapper for a sequence of String-to-Double maps; this is the
// type returned (inside a Future) by getRasterSummary.
case class ResultSummary(result: Seq[Map[String, Double]])

// Implicit JSON formats for the request and result case classes.
// NOTE(review): DefaultJsonProtocol and the jsonFormatN helpers are
// presumably spray-json's; confirm against the file's imports.
object PostRequestProtocol extends DefaultJsonProtocol {
// Declared first because postFormat below is derived for a class that
// contains an InputData field.
implicit val inputFormat = jsonFormat10(InputData)
implicit val postFormat = jsonFormat1(PostRequest)
implicit val resultFormat = jsonFormat1(ResultInt)
implicit val resultDoubleFormat = jsonFormat1(ResultDouble)
// Format for the ResultSummary response wrapper.
implicit val resultSummaryFormat = jsonFormat1(ResultSummary)
}

object WebServer extends HttpApp with App with LazyLogging with Geoprocessing with ErrorHandler {
Expand All @@ -53,6 +55,8 @@ object WebServer extends HttpApp with App with LazyLogging with Geoprocessing wi
complete(getRasterGroupedAverage(data.input))
case "RasterLinesJoin" =>
complete(getRasterLinesJoin(data.input))
case "RasterSummary" =>
complete(getRasterSummary(data.input))
case _ => {
val message = s"Unknown operationType: ${data.input.operationType}"
throw new InvalidOperationException(message)
Expand Down
15 changes: 15 additions & 0 deletions examples/MapshedJob_RasterSummary.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"input": {
"polygon": [
"{\"type\":\"MultiPolygon\",\"coordinates\":[[[[-75.1626205444336,39.95580659996906],[-75.25531768798828,39.94514735903112],[-75.22785186767578,39.89446035777916],[-75.1461410522461,39.88761144548104],[-75.09309768676758,39.91078961774283],[-75.09464263916016,39.93817189499188],[-75.12039184570312,39.94435771955196],[-75.1626205444336,39.95580659996906]]]]}"
],
"rasters": [
"ned-nhdplus-30m-epsg5070",
"us-percent-slope-30m-epsg5070-512"
],
"rasterCRS": "ConusAlbers",
"polygonCRS": "LatLng",
"operationType": "RasterSummary",
"zoom": 0
}
}
2 changes: 1 addition & 1 deletion project/build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ object Geoprocessing extends Build {
super.settings ++
Seq(
shellPrompt := { s => Project.extract(s).currentProject.id + " > " },
version := "3.0.0",
version := "3.1.0",
scalaVersion := Version.scala,
organization := "org.wikiwatershed.mmw.geoprocessing",
name := "mmw-geoprocessing",
Expand Down
12 changes: 11 additions & 1 deletion scripts/benchmark
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ huc10_file = 'RasterGroupedCount_LowerSchuylkillRiver_HUC10.json'
huc12_file = 'RasterGroupedCount_LittleNeshaminy_HUC12.json'
rga_file = 'RasterGroupedAverage.json'
rlj_file = 'RasterLinesJoin_Schuylkill_HUC08.json'
rs_file = 'RasterSummary_Schuylkill_HUC08.json'

def make_rga_request(input_file, operation_name):
print("Timing {} ->\n".format(operation_name))
Expand Down Expand Up @@ -64,6 +65,9 @@ def time_raster_grouped_average():
def time_raster_lines_join():
    """Run the request in rlj_file through make_rga_request, labelled 'HUC8 RasterLinesJoin'."""
    make_rga_request(rlj_file, 'HUC8 RasterLinesJoin')

def time_raster_summary():
    """Run the request in rs_file through make_rga_request, labelled 'HUC8 RasterSummary'."""
    make_rga_request(rs_file, 'HUC8 RasterSummary')

def time_huc12():
    """Run the request in huc12_file through make_rgc_request, labelled 'HUC12 RasterGroupedCount'."""
    make_rgc_request(huc12_file, 'HUC12 RasterGroupedCount')

Expand All @@ -74,7 +78,9 @@ def time_huc8():
make_rgc_request(huc8_file, 'HUC8 RasterGroupedCount')

parser = argparse.ArgumentParser(description='Test Geoprocessing service \
response times for RasterGroupedCount and RasterGroupedAverage operations.')
response times for RasterGroupedCount, RasterGroupedAverage, and \
RasterSummary operations.')

parser.add_argument('--huc8', help='Time a HUC8 with 1-3 layers',
action='store_true')
parser.add_argument('--huc10', help='Time a HUC10 with 1-3 layers',
Expand All @@ -85,12 +91,15 @@ parser.add_argument('--rga', help='Time a RasterGroupedAverage operation',
action='store_true')
parser.add_argument('--rlj', help='Time a RasterLinesJoin operation',
action='store_true')
parser.add_argument('--rs', help='Time a RasterSummary operation',
action='store_true')

args = vars(parser.parse_args(sys.argv[1:]))

if True not in args.values():
time_raster_lines_join()
time_raster_grouped_average()
time_raster_summary()
time_huc12()
time_huc10()
time_huc8()
Expand All @@ -100,3 +109,4 @@ else:
if args['huc12']: time_huc12()
if args['rga']: time_raster_grouped_average()
if args['rlj']: time_raster_lines_join()
if args['rs']: time_raster_summary()

Large diffs are not rendered by default.

0 comments on commit 9808a7c

Please sign in to comment.