
Update pyramid tile image #685

Open
mingnet opened this issue Sep 29, 2018 · 3 comments
mingnet commented Sep 29, 2018

I am trying to ingest a batch of large TIFF images, but my Spark cluster doesn't have much memory or many resources, so I am trying to ingest the images in several batches.

My plan is to generate a pyramid from the first TIFF image and write it to disk, then generate a pyramid from the second TIFF image and update the same layer directory with it.

To do that, I added update code:

./geopyspark-backend/geotrellis/src/main/scala/geopyspark/geotrellis/io/LayerWriterWrapper.scala

  def update(
    layerName: String,
    spatialRDD: TiledRasterLayer[SpatialKey]
  ): Unit = {
    val id =
      spatialRDD.zoomLevel match {
        case Some(zoom) => LayerId(layerName, zoom)
        case None => LayerId(layerName, 0)
      }
    layerWriter.update(id, spatialRDD.rdd)
  }

  def overwrite(
    layerName: String,
    spatialRDD: TiledRasterLayer[SpatialKey]
  ): Unit = {
    val id =
      spatialRDD.zoomLevel match {
        case Some(zoom) => LayerId(layerName, zoom)
        case None => LayerId(layerName, 0)
      }
    layerWriter.overwrite(id, spatialRDD.rdd)
  }

./geopyspark/geotrellis/catalog.py

def update(uri,
           layer_name,
           tiled_raster_layer,
           store=None):
    if tiled_raster_layer.zoom_level is None:
        Log.warn(tiled_raster_layer.pysc, "The given layer does not have a zoom_level. Writing to zoom 0.")

    if store:
        store = AttributeStore.build(store)
    else:
        store = AttributeStore.cached(uri)

    pysc = tiled_raster_layer.pysc

    writer = pysc._gateway.jvm.geopyspark.geotrellis.io.LayerWriterWrapper(
        store.wrapper.attributeStore(), uri)

    writer.update(layer_name, tiled_raster_layer.srdd)

def overwrite(uri,
              layer_name,
              tiled_raster_layer,
              store=None):
    if tiled_raster_layer.zoom_level is None:
        Log.warn(tiled_raster_layer.pysc, "The given layer does not have a zoom_level. Writing to zoom 0.")

    if store:
        store = AttributeStore.build(store)
    else:
        store = AttributeStore.cached(uri)

    pysc = tiled_raster_layer.pysc

    writer = pysc._gateway.jvm.geopyspark.geotrellis.io.LayerWriterWrapper(
        store.wrapper.attributeStore(), uri)

    writer.overwrite(layer_name, tiled_raster_layer.srdd)

Then I ingest the data like this.

def writefile(srcpath, key, destpath):
    raster_layer = gps.geotiff.get(layer_type=gps.LayerType.SPATIAL, uri=srcpath, num_partitions=100)
    raster_layer.persist(pyspark.StorageLevel.MEMORY_AND_DISK_SER)
    tiled_layer = raster_layer.tile_to_layout(layout=gps.GlobalLayout(), target_crs=3857)
    pyramid = tiled_layer.pyramid()
    for layer in pyramid.levels.values():
        gps.write(destpath, key, layer, time_unit=gps.TimeUnit.DAYS)
    raster_layer.unpersist()  # release the cached RDD once the pyramid is written

def updatefile(srcpath, key, destpath):
    raster_layer = gps.geotiff.get(layer_type=gps.LayerType.SPATIAL, uri=srcpath, num_partitions=100)
    raster_layer.persist(pyspark.StorageLevel.MEMORY_AND_DISK_SER)
    tiled_layer = raster_layer.tile_to_layout(layout=gps.GlobalLayout(), target_crs=3857)
    pyramid = tiled_layer.pyramid()
    for layer in pyramid.levels.values():
        gps.update(destpath, key, layer)
    raster_layer.unpersist()  # release the cached RDD once the update is done

tiflist = [os.path.join(basepath, b) for b in filelist]
writefile(tiflist[0], key, destpath)
for t in tiflist[1:]:
    updatefile(t, key, destpath)

When I run it, I get this error:

py4j.protocol.Py4JJavaError: An error occurred while calling o131.update.
: geotrellis.spark.io.package$LayerOutOfKeyBoundsError: Updating rdd is out of the key index space for Layer(name = "201706", zoom = 14): KeyBounds(SpatialKey(13341,6446),SpatialKey(13460,6569)). You must reindex this layer with large enough key bounds for this update.
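The error is a containment check: an update is rejected when the incoming RDD's KeyBounds fall outside the key index space the layer was originally written with. A minimal pure-Python sketch of that check, using the bounds from the error message above (the SpatialKey/KeyBounds names mirror GeoTrellis but this is an illustration, not the library's API):

```python
from collections import namedtuple

# Illustrative stand-ins for GeoTrellis' SpatialKey and KeyBounds (not the real API).
SpatialKey = namedtuple("SpatialKey", ["col", "row"])
KeyBounds = namedtuple("KeyBounds", ["min_key", "max_key"])

def contains(outer, inner):
    """True if `inner` lies entirely within `outer` (bounds are inclusive)."""
    return (outer.min_key.col <= inner.min_key.col
            and outer.min_key.row <= inner.min_key.row
            and inner.max_key.col <= outer.max_key.col
            and inner.max_key.row <= outer.max_key.row)

# Index space the first image was written with (from the error message):
written = KeyBounds(SpatialKey(13341, 6446), SpatialKey(13460, 6569))
# A second image covering a different area (hypothetical values) will
# generally fall outside it, which is what triggers LayerOutOfKeyBoundsError:
update = KeyBounds(SpatialKey(13100, 6300), SpatialKey(13250, 6400))

print(contains(written, update))  # False
```

Since each TIFF covers a different area, each batch produces different KeyBounds, so the second batch fails this check against the index space written by the first.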

What should I do? Do you have any advice?

@jbouffard
Collaborator

Hey, @mingnet! As I mentioned in your other issue, I'm really sorry about responding to your issue so late 🙁

The reason you're getting that error is that LayerWriter.update fails when trying to update a saved catalog with a layer whose KeyBounds fall outside of the saved layer's (see here). Unfortunately, this means your implementation won't work as-is, since each layer will have different KeyBounds.

The most straightforward way around this would be to read all of your files at once and then save them together as one layer. I know you said that you're working with a small cluster, but if you can show me the script you're using and give me some info about your cluster, I may be able to point out places where you could improve performance. I think we should try this first before going to the next alternative (which is more involved/complicated).
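For reference, geopyspark's gps.geotiff.get accepts a list of URIs for uri, so the single-pass ingest could be sketched roughly like this (untested sketch based on the writefile snippet above; write_all is a hypothetical name):

```python
import geopyspark as gps

def write_all(tiflist, key, destpath):
    # Read every GeoTiff into one RasterLayer in a single pass,
    # so the layer is written once with KeyBounds covering all inputs.
    raster_layer = gps.geotiff.get(layer_type=gps.LayerType.SPATIAL,
                                   uri=tiflist,  # a list of paths, not one path
                                   num_partitions=100)
    tiled_layer = raster_layer.tile_to_layout(layout=gps.GlobalLayout(),
                                              target_crs=3857)
    pyramid = tiled_layer.pyramid()
    for layer in pyramid.levels.values():
        gps.write(destpath, key, layer)
```

Because everything lands in one write, no later update has to fit inside a previously written index space.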


mingnet commented Oct 23, 2018

I have tried another approach, but I still have some problems; maybe you'd be interested to know. I tried to generate global KeyBounds at the start, by adding another function to the file (./geopyspark-backend/geotrellis/src/main/scala/geopyspark/geotrellis/io/LayerWriterWrapper.scala):

  def writeSpatialGlobal(
    layerName: String,
    spatialRDD: TiledRasterLayer[SpatialKey],
    indexStrategy: String
  ): Unit = {
    val id =
      spatialRDD.zoomLevel match {
        case Some(zoom) => LayerId(layerName, zoom)
        case None => LayerId(layerName, 0)
      }
    val indexKeyBounds = KeyBounds[SpatialKey](SpatialKey(0, 0), SpatialKey(spatialRDD.rdd.metadata.layout.layoutCols, spatialRDD.rdd.metadata.layout.layoutRows))
    val indexMethod = getSpatialIndexMethod(indexStrategy)
    val keyIndex = indexMethod.createIndex(indexKeyBounds)
    layerWriter.write(id, spatialRDD.rdd, keyIndex)
  }

My plan is to call this function for the first batch, so the layer is written with global KeyBounds, and then update it with the data from the other batches. But this function executes very, very slowly; as a result I could hardly finish even the first batch. Because I don't know GeoTrellis well enough, I don't understand why. It only generates a different index, so I would expect it to be about as fast as the writeSpatial function.
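Whether this fully explains the slowdown depends on the backend, but the size difference between the global index space and the data's actual bounds is easy to quantify. At zoom 14 a global layout spans 2^14 × 2^14 keys, while the data from the earlier error message occupies only a small rectangle (plain arithmetic, not geopyspark code):

```python
# At zoom z a global web-mercator layout has 2**z columns and 2**z rows.
zoom = 14
layout_cols = layout_rows = 2 ** zoom

# Key space covered by the global KeyBounds built in writeSpatialGlobal:
global_keys = layout_cols * layout_rows

# Key space actually occupied by the data (bounds from the earlier error,
# KeyBounds(SpatialKey(13341,6446), SpatialKey(13460,6569)), inclusive):
data_cols = 13460 - 13341 + 1
data_rows = 6569 - 6446 + 1
data_keys = data_cols * data_rows

print(global_keys)               # 268435456
print(data_keys)                 # 14880
print(global_keys // data_keys)  # 18040, i.e. ~18,000x more index space
```

So the space-filling-curve index created from the global bounds addresses a key space roughly 18,000 times larger than the data itself, which is the kind of asymmetry worth checking when a write suddenly becomes slow.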

@jbouffard
Collaborator

@mingnet I see. Based on the work you showed, it looks like everything should work okay. What backend are you trying to write to? There can be a lot of I/O involved for some of them, which could greatly increase the running time. Other than what I just mentioned, there could be other causes for slowdown, but I won't be able to say for sure without seeing your Python code.
