Skip to content

Commit

Permalink
Merge pull request #70 from WikiWatershed/tt/collections-api-futures
Browse files Browse the repository at this point in the history
Collections API: Use Futures

Connects #67
  • Loading branch information
rajadain authored Sep 1, 2017
2 parents f3241af + c37560f commit 821d107
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 18 deletions.
38 changes: 24 additions & 14 deletions api/src/main/scala/Geoprocessing.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.wikiwatershed.mmw.geoprocessing

import java.util.concurrent.atomic.{LongAdder, DoubleAdder}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import collection.concurrent.TrieMap

Expand All @@ -17,10 +19,13 @@ trait Geoprocessing extends Utils {
* @param input The InputData
* @return A histogram of results
*/
def getRasterGroupedCount(input: InputData): ResultInt = {
def getRasterGroupedCount(input: InputData): Future[ResultInt] = {
val aoi = createAOIFromInput(input)
val rasterLayers = cropRastersToAOI(input.rasters, input.zoom, aoi)
ResultInt(rasterGroupedCount(rasterLayers, aoi))
val futureLayers = cropRastersToAOI(input.rasters, input.zoom, aoi)

futureLayers.map { layers =>
ResultInt(rasterGroupedCount(layers, aoi))
}
}

/**
Expand All @@ -30,21 +35,23 @@ trait Geoprocessing extends Utils {
* @param input The InputData
* @return A histogram of results
*/
def getRasterGroupedAverage(input: InputData): ResultDouble = {
def getRasterGroupedAverage(input: InputData): Future[ResultDouble] = {
val aoi = createAOIFromInput(input)
val rasterLayers = cropRastersToAOI(input.rasters, input.zoom, aoi)
val futureLayers = cropRastersToAOI(input.rasters, input.zoom, aoi)
val targetLayer = input.targetRaster match {
case Some(targetRaster) =>
cropSingleRasterToAOI(targetRaster, input.zoom, aoi)
case None =>
throw new Exception("Request data missing required 'targetRaster'.")
}

val average =
if (rasterLayers.isEmpty) rasterAverage(targetLayer, aoi)
else rasterGroupedAverage(rasterLayers, targetLayer, aoi)
futureLayers.map { rasterLayers =>
val average =
if (rasterLayers.isEmpty) rasterAverage(targetLayer, aoi)
else rasterGroupedAverage(rasterLayers, targetLayer, aoi)

ResultDouble(average)
ResultDouble(average)
}
}

/**
Expand All @@ -53,9 +60,9 @@ trait Geoprocessing extends Utils {
* @param input The InputData
* @return A histogram of results
*/
def getRasterLinesJoin(input: InputData): ResultInt = {
def getRasterLinesJoin(input: InputData): Future[ResultInt] = {
val aoi = createAOIFromInput(input)
val rasterLayers = cropRastersToAOI(input.rasters, input.zoom, aoi)
val futureLayers = cropRastersToAOI(input.rasters, input.zoom, aoi)
val lines = input.vector match {
case Some(vector) =>
input.vectorCRS match {
Expand All @@ -67,7 +74,10 @@ trait Geoprocessing extends Utils {
case None =>
throw new Exception("Request data missing required 'vector'.")
}
ResultInt(rasterLinesJoin(rasterLayers, lines))

futureLayers.map { rasterLayers =>
ResultInt(rasterLinesJoin(rasterLayers, lines))
}
}

private case class TilePixel(key: SpatialKey, col: Int, row: Int)
Expand All @@ -85,7 +95,7 @@ trait Geoprocessing extends Utils {
lines: Seq[MultiLine]
): Map[String, Int] = {
val metadata = rasterLayers.head.metadata
var pixelGroups: TrieMap[(List[Int], TilePixel), Int] = TrieMap.empty
val pixelGroups: TrieMap[(List[Int], TilePixel), Int] = TrieMap.empty

joinCollectionLayers(rasterLayers).par
.foreach({ case (key, tiles) =>
Expand Down Expand Up @@ -213,7 +223,7 @@ trait Geoprocessing extends Utils {
// assume all the layouts are the same
val metadata = rasterLayers.head.metadata

var pixelGroups: TrieMap[List[Int], LongAdder] = TrieMap.empty
val pixelGroups: TrieMap[List[Int], LongAdder] = TrieMap.empty

joinCollectionLayers(rasterLayers).par
.foreach({ case (key, tiles) =>
Expand Down
11 changes: 7 additions & 4 deletions api/src/main/scala/Utils.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package org.wikiwatershed.mmw.geoprocessing

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import spray.json._
import spray.json.DefaultJsonProtocol._

import geotrellis.proj4.{CRS, ConusAlbers, LatLng, WebMercator}

import geotrellis.raster._
import geotrellis.raster.rasterize._
import geotrellis.vector._
import geotrellis.vector.io._
import geotrellis.spark._
Expand All @@ -32,8 +33,10 @@ trait Utils {
rasterIds: List[String],
zoom: Int,
aoi: MultiPolygon
): Seq[TileLayerCollection[SpatialKey]] =
rasterIds.map { str => cropSingleRasterToAOI(str, zoom, aoi) }
): Future[Seq[TileLayerCollection[SpatialKey]]] =
Future.sequence {
rasterIds.map { str => Future(cropSingleRasterToAOI(str, zoom, aoi))}
}

/**
* Given a zoom level & area of interest, transform a raster filename into a
Expand Down

0 comments on commit 821d107

Please sign in to comment.