From 397d191d7442e96aa239759bf8ed21db7d8838f5 Mon Sep 17 00:00:00 2001 From: Grigory Pomadchin Date: Sun, 26 Jun 2022 20:22:42 -0400 Subject: [PATCH] Properly pass IORuntime everywhere instead of the old ExecutionContext --- .../accumulo/AccumuloWriteStrategy.scala | 27 +++---- .../accumulo/AccumuloCollectionReader.scala | 14 ++-- build.sbt | 4 +- .../store/cassandra/CassandraRDDReader.scala | 9 +-- .../store/cassandra/CassandraRDDWriter.scala | 24 +++--- .../cassandra/CassandraCollectionReader.scala | 11 ++- .../spark/gdal/GDALRasterSourceRDDSpec.scala | 51 +++++++----- project/Settings.scala | 2 +- .../spark/store/s3/S3LayerReader.scala | 6 +- .../spark/store/s3/S3LayerWriter.scala | 24 +++--- .../spark/store/s3/S3RDDReader.scala | 9 +-- .../spark/store/s3/S3RDDWriter.scala | 21 ++--- .../geotrellis/spark/store/s3/SaveToS3.scala | 21 ++--- .../spark/store/s3/SaveToS3Methods.scala | 11 ++- .../spark/store/s3/cog/S3COGLayerReader.scala | 8 +- .../spark/store/s3/cog/S3COGLayerWriter.scala | 12 +-- .../s3/geotiff/S3GeoTiffLayerReader.scala | 11 ++- .../store/s3/S3CollectionReader.scala | 11 ++- .../s3/cog/S3COGCollectionLayerReader.scala | 6 +- .../spark/store/cog/COGLayerReader.scala | 4 +- .../spark/store/file/FileRDDReader.scala | 9 +-- .../store/file/cog/FileCOGLayerReader.scala | 6 +- .../file/geotiff/FileGeoTiffLayerReader.scala | 13 ++- .../hadoop/cog/HadoopCOGLayerReader.scala | 8 +- .../hadoop/geotiff/GeoTiffLayerReader.scala | 13 +-- .../geotiff/HadoopGeoTiffLayerReader.scala | 13 ++- store/src/main/resources/reference.conf | 2 - .../scala/geotrellis/store/AsyncWriter.scala | 79 ++++++++++--------- .../store/cog/COGCollectionLayerReader.scala | 6 +- .../file/FileCollectionLayerReader.scala | 6 +- .../store/file/FileCollectionReader.scala | 5 +- .../cog/FileCOGCollectionLayerReader.scala | 7 +- .../hadoop/HadoopCollectionLayerReader.scala | 8 +- .../store/hadoop/HadoopCollectionReader.scala | 13 +-- .../cog/HadoopCOGCollectionLayerReader.scala | 7 +- .../store/util/BlockingThreadPool.scala | 49 ------------ .../store/util/IORuntimeTransient.scala | 24 ++++++ .../scala/geotrellis/store/util/IOUtils.scala | 18 ++--- 38 files changed, 254 insertions(+), 318 deletions(-) delete mode 100644 store/src/main/scala/geotrellis/store/util/BlockingThreadPool.scala create mode 100644 store/src/main/scala/geotrellis/store/util/IORuntimeTransient.scala diff --git a/accumulo-spark/src/main/scala/geotrellis/spark/store/accumulo/AccumuloWriteStrategy.scala b/accumulo-spark/src/main/scala/geotrellis/spark/store/accumulo/AccumuloWriteStrategy.scala index e379cabb57..7e39582fd1 100644 --- a/accumulo-spark/src/main/scala/geotrellis/spark/store/accumulo/AccumuloWriteStrategy.scala +++ b/accumulo-spark/src/main/scala/geotrellis/spark/store/accumulo/AccumuloWriteStrategy.scala @@ -19,7 +19,7 @@ package geotrellis.spark.store.accumulo import geotrellis.store.accumulo._ import geotrellis.store.hadoop.util._ import geotrellis.spark.util._ -import geotrellis.store.util.BlockingThreadPool +import geotrellis.store.util.IORuntimeTransient import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.fs.Path @@ -28,14 +28,11 @@ import org.apache.accumulo.core.data.{Key, Mutation, Value} import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat import org.apache.accumulo.core.client.BatchWriterConfig -import cats.effect.IO -import cats.syntax.apply._ +import cats.effect._ import cats.syntax.either._ import java.util.UUID -import scala.concurrent.ExecutionContext - object AccumuloWriteStrategy { def 
DEFAULT = HdfsWriteStrategy("/geotrellis-ingest") } @@ -110,30 +107,28 @@ object HdfsWriteStrategy { * @param config Configuration for the BatchWriters */ class SocketWriteStrategy( - @transient config: BatchWriterConfig = new BatchWriterConfig().setMaxMemory(128*1024*1024).setMaxWriteThreads(BlockingThreadPool.threads), - executionContext: => ExecutionContext = BlockingThreadPool.executionContext + @transient config: BatchWriterConfig = new BatchWriterConfig().setMaxMemory(128*1024*1024).setMaxWriteThreads(IORuntimeTransient.ThreadsNumber), + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime ) extends AccumuloWriteStrategy { val kwConfig = KryoWrapper(config) // BatchWriterConfig is not java serializable def write(kvPairs: RDD[(Key, Value)], instance: AccumuloInstance, table: String): Unit = { kvPairs.foreachPartition { partition => if(partition.nonEmpty) { - implicit val ec = executionContext - // TODO: runime should be configured - import cats.effect.unsafe.implicits.global + implicit val ioRuntime: unsafe.IORuntime = runtime val writer = instance.connector.createBatchWriter(table, kwConfig.value) try { - val mutations: fs2.Stream[IO, Mutation] = fs2.Stream.fromIterator[IO]( + val mutations: fs2.Stream[IO, Mutation] = fs2.Stream.fromBlockingIterator[IO]( partition.map { case (key, value) => val mutation = new Mutation(key.getRow) mutation.put(key.getColumnFamily, key.getColumnQualifier, System.currentTimeMillis(), value) mutation - }, 1 + }, chunkSize = 1 ) - val write = { mutation: Mutation => fs2.Stream eval IO { writer.addMutation(mutation) } } + val write = { mutation: Mutation => fs2.Stream eval IO.blocking { writer.addMutation(mutation) } } (mutations map write) .parJoinUnbounded @@ -150,7 +145,7 @@ class SocketWriteStrategy( object SocketWriteStrategy { def apply( - config: BatchWriterConfig = new BatchWriterConfig().setMaxMemory(128*1024*1024).setMaxWriteThreads(BlockingThreadPool.threads), - executionContext: => ExecutionContext = BlockingThreadPool.executionContext - ): SocketWriteStrategy = new SocketWriteStrategy(config, executionContext) + config: BatchWriterConfig = new BatchWriterConfig().setMaxMemory(128*1024*1024).setMaxWriteThreads(IORuntimeTransient.ThreadsNumber), + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime + ): SocketWriteStrategy = new SocketWriteStrategy(config, runtime) } diff --git a/accumulo/src/main/scala/geotrellis/store/accumulo/AccumuloCollectionReader.scala b/accumulo/src/main/scala/geotrellis/store/accumulo/AccumuloCollectionReader.scala index f7d0338c40..d6e6ac320d 100644 --- a/accumulo/src/main/scala/geotrellis/store/accumulo/AccumuloCollectionReader.scala +++ b/accumulo/src/main/scala/geotrellis/store/accumulo/AccumuloCollectionReader.scala @@ -19,18 +19,16 @@ package geotrellis.store.accumulo import geotrellis.layer._ import geotrellis.store.avro.codecs.KeyValueRecordCodec import geotrellis.store.avro.{AvroEncoder, AvroRecordCodec} -import geotrellis.store.util.BlockingThreadPool +import geotrellis.store.util.IORuntimeTransient import org.apache.accumulo.core.data.{Range => AccumuloRange} import org.apache.accumulo.core.security.Authorizations import org.apache.avro.Schema import org.apache.hadoop.io.Text import cats.effect._ -import cats.syntax.apply._ import cats.syntax.either._ import scala.collection.JavaConverters._ -import scala.concurrent.ExecutionContext import scala.reflect.ClassTag object AccumuloCollectionReader { @@ -41,7 +39,7 @@ object AccumuloCollectionReader { decomposeBounds: KeyBounds[K] => 
Seq[AccumuloRange], filterIndexOnly: Boolean, writerSchema: Option[Schema] = None, - executionContext: => ExecutionContext = BlockingThreadPool.executionContext + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime )(implicit instance: AccumuloInstance): Seq[(K, V)] = { if(queryKeyBounds.isEmpty) return Seq.empty[(K, V)] @@ -50,13 +48,11 @@ object AccumuloCollectionReader { val ranges = queryKeyBounds.flatMap(decomposeBounds).iterator - implicit val ec = executionContext - // TODO: runime should be configured - import cats.effect.unsafe.implicits.global + implicit val ioRuntime: unsafe.IORuntime = runtime - val range: fs2.Stream[IO, AccumuloRange] = fs2.Stream.fromIterator[IO](ranges, 1) + val range: fs2.Stream[IO, AccumuloRange] = fs2.Stream.fromIterator[IO](ranges, chunkSize = 1) - val read = { range: AccumuloRange => fs2.Stream eval IO { + val read = { range: AccumuloRange => fs2.Stream eval IO.blocking { val scanner = instance.connector.createScanner(table, new Authorizations()) scanner.setRange(range) scanner.fetchColumnFamily(columnFamily) diff --git a/build.sbt b/build.sbt index c3d992ed5e..6525199a26 100644 --- a/build.sbt +++ b/build.sbt @@ -1,9 +1,9 @@ import sbt.Keys._ ThisBuild / versionScheme := Some("semver-spec") -ThisBuild / scalaVersion := "2.12.15" +ThisBuild / scalaVersion := "2.12.16" ThisBuild / organization := "org.locationtech.geotrellis" -ThisBuild / crossScalaVersions := List("2.12.15", "2.13.8") +ThisBuild / crossScalaVersions := List("2.12.16", "2.13.8") lazy val root = Project("geotrellis", file(".")) .aggregate( diff --git a/cassandra-spark/src/main/scala/geotrellis/spark/store/cassandra/CassandraRDDReader.scala b/cassandra-spark/src/main/scala/geotrellis/spark/store/cassandra/CassandraRDDReader.scala index 5f45a83355..55196a0f1d 100644 --- a/cassandra-spark/src/main/scala/geotrellis/spark/store/cassandra/CassandraRDDReader.scala +++ b/cassandra-spark/src/main/scala/geotrellis/spark/store/cassandra/CassandraRDDReader.scala @@ -22,9 +22,10 @@ import geotrellis.store.cassandra._ import geotrellis.store.avro.codecs.KeyValueRecordCodec import geotrellis.store.avro.{AvroEncoder, AvroRecordCodec} import geotrellis.store.index.{IndexRanges, MergeQueue} -import geotrellis.store.util.{BlockingThreadPool, IOUtils} +import geotrellis.store.util.{IORuntimeTransient, IOUtils} import geotrellis.spark.util.KryoWrapper +import cats.effect._ import com.datastax.driver.core.querybuilder.QueryBuilder import com.datastax.driver.core.querybuilder.QueryBuilder.{eq => eqs} import org.apache.avro.Schema @@ -35,8 +36,6 @@ import scala.collection.JavaConverters._ import scala.reflect.ClassTag import java.math.BigInteger -import scala.concurrent.ExecutionContext - object CassandraRDDReader { def read[K: Boundable : AvroRecordCodec : ClassTag, V: AvroRecordCodec : ClassTag]( instance: CassandraInstance, @@ -48,7 +47,7 @@ object CassandraRDDReader { filterIndexOnly: Boolean, writerSchema: Option[Schema] = None, numPartitions: Option[Int] = None, - executionContext: => ExecutionContext = BlockingThreadPool.executionContext + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime )(implicit sc: SparkContext): RDD[(K, V)] = { if (queryKeyBounds.isEmpty) return sc.emptyRDD[(K, V)] @@ -73,7 +72,7 @@ object CassandraRDDReader { sc.parallelize(bins, bins.size) .mapPartitions { partition: Iterator[Seq[(BigInt, BigInt)]] => instance.withSession { session => - implicit val ec = executionContext + implicit val ioRuntime: unsafe.IORuntime = runtime val statement = session.prepare(query) 
val result = partition map { seq => diff --git a/cassandra-spark/src/main/scala/geotrellis/spark/store/cassandra/CassandraRDDWriter.scala b/cassandra-spark/src/main/scala/geotrellis/spark/store/cassandra/CassandraRDDWriter.scala index cfc6c4efa7..a995c773be 100644 --- a/cassandra-spark/src/main/scala/geotrellis/spark/store/cassandra/CassandraRDDWriter.scala +++ b/cassandra-spark/src/main/scala/geotrellis/spark/store/cassandra/CassandraRDDWriter.scala @@ -22,15 +22,14 @@ import geotrellis.store.avro.codecs._ import geotrellis.store.cassandra._ import geotrellis.spark.store._ import geotrellis.spark.util.KryoWrapper -import geotrellis.store.util.BlockingThreadPool +import geotrellis.store.util.IORuntimeTransient import com.datastax.driver.core.DataType._ import com.datastax.driver.core.querybuilder.QueryBuilder import com.datastax.driver.core.querybuilder.QueryBuilder.{eq => eqs} import com.datastax.driver.core.ResultSet import com.datastax.driver.core.schemabuilder.SchemaBuilder -import cats.effect.IO -import cats.syntax.apply._ +import cats.effect._ import cats.syntax.either._ import org.apache.avro.Schema import org.apache.spark.rdd.RDD @@ -39,7 +38,6 @@ import java.nio.ByteBuffer import java.math.BigInteger import scala.collection.JavaConverters._ -import scala.concurrent.ExecutionContext object CassandraRDDWriter { def write[K: AvroRecordCodec, V: AvroRecordCodec]( @@ -49,8 +47,8 @@ object CassandraRDDWriter { decomposeKey: K => BigInt, keyspace: String, table: String, - executionContext: => ExecutionContext = BlockingThreadPool.executionContext - ): Unit = update(rdd, instance, layerId, decomposeKey, keyspace, table, None, None, executionContext) + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime + ): Unit = update(rdd, instance, layerId, decomposeKey, keyspace, table, None, None, runtime) private[cassandra] def update[K: AvroRecordCodec, V: AvroRecordCodec]( raster: RDD[(K, V)], @@ -61,7 +59,7 @@ object CassandraRDDWriter { table: String, writerSchema: Option[Schema], mergeFunc: Option[(V,V) => V], - executionContext: => ExecutionContext = BlockingThreadPool.executionContext + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime ): Unit = { implicit val sc = raster.sparkContext @@ -110,15 +108,13 @@ object CassandraRDDWriter { val rows: fs2.Stream[IO, (BigInt, Vector[(K,V)])] = fs2.Stream.fromIterator[IO]( - partition.map { case (key, value) => (key, value.toVector) }, 1 + partition.map { case (key, value) => (key, value.toVector) }, chunkSize = 1 ) - implicit val ec = executionContext - // TODO: runime should be configured - import cats.effect.unsafe.implicits.global + implicit val ioRuntime: unsafe.IORuntime = runtime def elaborateRow(row: (BigInt, Vector[(K,V)])): fs2.Stream[IO, (BigInt, Vector[(K,V)])] = { - fs2.Stream eval IO { + fs2.Stream eval IO.blocking { val (key, current) = row val updated = LayerWriter.updateRecords(mergeFunc, current, existing = { val oldRow = session.execute(readStatement.bind(key: BigInteger)) @@ -143,7 +139,7 @@ object CassandraRDDWriter { def retire(row: (BigInt, ByteBuffer)): fs2.Stream[IO, ResultSet] = { val (id, value) = row - fs2.Stream eval IO { + fs2.Stream eval IO.blocking { session.execute(writeStatement.bind(id: BigInteger, value)) } } @@ -154,7 +150,7 @@ object CassandraRDDWriter { .map(retire) .parJoinUnbounded .onComplete { - fs2.Stream eval IO { + fs2.Stream eval IO.blocking { session.closeAsync() session.getCluster.closeAsync() } diff --git 
a/cassandra/src/main/scala/geotrellis/store/cassandra/CassandraCollectionReader.scala b/cassandra/src/main/scala/geotrellis/store/cassandra/CassandraCollectionReader.scala index 0fe4ac424e..a724b4f3ca 100644 --- a/cassandra/src/main/scala/geotrellis/store/cassandra/CassandraCollectionReader.scala +++ b/cassandra/src/main/scala/geotrellis/store/cassandra/CassandraCollectionReader.scala @@ -22,8 +22,9 @@ import geotrellis.store.avro.codecs.KeyValueRecordCodec import geotrellis.store.avro.{AvroEncoder, AvroRecordCodec} import geotrellis.store.index.MergeQueue import geotrellis.store.LayerId -import geotrellis.store.util.BlockingThreadPool +import geotrellis.store.util.IORuntimeTransient +import cats.effect._ import org.apache.avro.Schema import com.datastax.driver.core.querybuilder.QueryBuilder import com.datastax.driver.core.querybuilder.QueryBuilder.{eq => eqs} @@ -32,8 +33,6 @@ import scala.collection.JavaConverters._ import scala.reflect.ClassTag import java.math.BigInteger -import scala.concurrent.ExecutionContext - object CassandraCollectionReader { def read[K: Boundable : AvroRecordCodec : ClassTag, V: AvroRecordCodec : ClassTag]( instance: CassandraInstance, @@ -44,7 +43,7 @@ object CassandraCollectionReader { decomposeBounds: KeyBounds[K] => Seq[(BigInt, BigInt)], filterIndexOnly: Boolean, writerSchema: Option[Schema] = None, - executionContext: ExecutionContext = BlockingThreadPool.executionContext + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime ): Seq[(K, V)] = { if (queryKeyBounds.isEmpty) return Seq.empty[(K, V)] @@ -56,7 +55,7 @@ object CassandraCollectionReader { else queryKeyBounds.flatMap(decomposeBounds) - implicit val ec = executionContext + implicit val ioRuntime: unsafe.IORuntime = runtime val query = QueryBuilder.select("value") .from(keyspace, table) @@ -68,7 +67,7 @@ object CassandraCollectionReader { instance.withSessionDo { session => val statement = session.prepare(query) - IOUtils.parJoin[K, V](ranges.iterator){ index: BigInt => + IOUtils.parJoin[K, V](ranges.iterator) { index: BigInt => val row = session.execute(statement.bind(index: BigInteger)) if (row.asScala.nonEmpty) { val bytes = row.one().getBytes("value").array() diff --git a/gdal-spark/src/test/scala/geotrellis/spark/gdal/GDALRasterSourceRDDSpec.scala b/gdal-spark/src/test/scala/geotrellis/spark/gdal/GDALRasterSourceRDDSpec.scala index 1027b0ebdb..b1d10f9ac5 100644 --- a/gdal-spark/src/test/scala/geotrellis/spark/gdal/GDALRasterSourceRDDSpec.scala +++ b/gdal-spark/src/test/scala/geotrellis/spark/gdal/GDALRasterSourceRDDSpec.scala @@ -27,23 +27,42 @@ import geotrellis.spark._ import geotrellis.spark.store.hadoop._ import geotrellis.spark.testkit._ import geotrellis.store.hadoop._ - -import cats.effect.IO -import cats.implicits._ +import cats.effect._ +import cats.syntax.parallel._ import spire.syntax.cfor._ import org.apache.spark.rdd.RDD - import org.scalatest.Inspectors._ import org.scalatest.BeforeAndAfterAll import org.scalatest.funspec.AnyFunSpec -import java.util.concurrent.Executors +import java.util.concurrent.{Executors, ScheduledThreadPoolExecutor} import scala.concurrent.ExecutionContext class GDALRasterSourceRDDSpec extends AnyFunSpec with TestEnvironment with BeforeAndAfterAll { import geotrellis.GDALTestUtils._ - // TODO: runime should be configured - import cats.effect.unsafe.implicits.global + + def createRuntime(compute: ExecutionContext, blocking: ExecutionContext, shutdown: () => Unit): unsafe.IORuntime = { + val scheduler = new ScheduledThreadPoolExecutor( + 1, + { r: 
Runnable => + val t = new Thread(r) + t.setName("io-scheduler") + t.setDaemon(true) + t.setPriority(Thread.MAX_PRIORITY) + t + }) + scheduler.setRemoveOnCancelPolicy(true) + val (s, sh) = (unsafe.Scheduler.fromScheduledExecutor(scheduler), { () => scheduler.shutdown() }) + + unsafe.IORuntime(compute, blocking, s, () => { sh(); shutdown(); }, unsafe.IORuntimeConfig()) + } + + implicit val runtime: unsafe.IORuntime = { + val n = 100 + val pool = Executors.newFixedThreadPool(n) + val ec = ExecutionContext.fromExecutor(pool) + createRuntime(ExecutionContext.global, ec, () => pool.shutdown()) + } val uri = gdalGeoTiffPath("vlm/aspect-tiled.tif") def filePathByIndex(i: Int): String = sparkGeoTiffPath(s"vlm/aspect-tiled-$i.tif") @@ -211,8 +230,9 @@ class GDALRasterSourceRDDSpec extends AnyFunSpec with TestEnvironment with Befor assertRDDLayersEqual(reprojectedExpectedRDDGDAL, reprojectedSourceRDD, true) } - def parallelSpec(n: Int = 1000): List[RasterSource] = { + def parallelSpec(blocking: Boolean = false, n: Int = 1000)(implicit runtime: unsafe.IORuntime): List[RasterSource] = { println(java.lang.Thread.activeCount()) + def toIO[T](value: => T): IO[T] = if(blocking) IO.blocking(value) else IO(value) /** Functions to trigger Datasets computation */ def ltsWithDatasetsTriggered(lts: LayoutTileSource[SpatialKey]): LayoutTileSource[SpatialKey] = { rsWithDatasetsTriggered(lts.source); lts } @@ -244,7 +264,7 @@ class GDALRasterSourceRDDSpec extends AnyFunSpec with TestEnvironment with Befor val res = (1 to n).toList.flatMap { _ => (0 to 4).flatMap { i => - List(IO { + List(toIO { // println(Thread.currentThread().getName()) // Thread.sleep((Math.random() * 100).toLong) val lts = reprojRS(i) @@ -252,7 +272,7 @@ class GDALRasterSourceRDDSpec extends AnyFunSpec with TestEnvironment with Befor reprojRS(i).source.resolutions dirtyCalls(reprojRS(i).source) - }, IO { + }, toIO { // println(Thread.currentThread().getName()) // Thread.sleep((Math.random() * 100).toLong) val lts = reprojRS(i) @@ -260,7 +280,7 @@ class GDALRasterSourceRDDSpec extends AnyFunSpec with TestEnvironment with Befor reprojRS(i).source.resolutions dirtyCalls(reprojRS(i).source) - }, IO { + }, toIO { // println(Thread.currentThread().getName()) // Thread.sleep((Math.random() * 100).toLong) val lts = reprojRS(i) @@ -280,18 +300,13 @@ class GDALRasterSourceRDDSpec extends AnyFunSpec with TestEnvironment with Befor it("should not fail on parallelization with a fork join pool") { val i = 1000 - parallelSpec(i) + parallelSpec(blocking = false, i) } it("should not fail on parallelization with a fixed thread pool") { val i = 1000 - val n = 100 - val pool = Executors.newFixedThreadPool(n) - val ec = ExecutionContext.fromExecutor(pool) - - parallelSpec(i) - pool.shutdown() + parallelSpec(blocking = true, i) } } } diff --git a/project/Settings.scala b/project/Settings.scala index 1c142c4c0c..aabf2c1dd3 100644 --- a/project/Settings.scala +++ b/project/Settings.scala @@ -100,7 +100,7 @@ object Settings { ).filter(_.asFile.canRead).map(Credentials(_)), addCompilerPlugin("org.typelevel" % "kind-projector" % "0.13.2" cross CrossVersion.full), - addCompilerPlugin("org.scalameta" % "semanticdb-scalac" % "4.5.1" cross CrossVersion.full), + addCompilerPlugin("org.scalameta" % "semanticdb-scalac" % "4.5.9" cross CrossVersion.full), libraryDependencies ++= (CrossVersion.partialVersion(scalaVersion.value) match { case Some((2, 13)) => Nil diff --git a/s3-spark/src/main/scala/geotrellis/spark/store/s3/S3LayerReader.scala 
b/s3-spark/src/main/scala/geotrellis/spark/store/s3/S3LayerReader.scala index 2f053fc9bf..5ecf2dec4b 100644 --- a/s3-spark/src/main/scala/geotrellis/spark/store/s3/S3LayerReader.scala +++ b/s3-spark/src/main/scala/geotrellis/spark/store/s3/S3LayerReader.scala @@ -30,8 +30,8 @@ import org.apache.spark.SparkContext import software.amazon.awssdk.services.s3.S3Client import software.amazon.awssdk.services.s3.model._ import io.circe._ +import cats.effect._ -import scala.concurrent.ExecutionContext import scala.reflect.ClassTag /** @@ -45,13 +45,13 @@ import scala.reflect.ClassTag class S3LayerReader( val attributeStore: AttributeStore, s3Client: => S3Client = S3ClientProducer.get(), - executionContext: => ExecutionContext = BlockingThreadPool.executionContext + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime )(implicit sc: SparkContext) extends FilteringLayerReader[LayerId] { val defaultNumPartitions = sc.defaultParallelism - def rddReader: S3RDDReader = new S3RDDReader(s3Client, executionContext) + def rddReader: S3RDDReader = new S3RDDReader(s3Client, runtime) def read[ K: AvroRecordCodec: Boundable: Decoder: ClassTag, diff --git a/s3-spark/src/main/scala/geotrellis/spark/store/s3/S3LayerWriter.scala b/s3-spark/src/main/scala/geotrellis/spark/store/s3/S3LayerWriter.scala index b749bbdb80..53cffccd61 100644 --- a/s3-spark/src/main/scala/geotrellis/spark/store/s3/S3LayerWriter.scala +++ b/s3-spark/src/main/scala/geotrellis/spark/store/s3/S3LayerWriter.scala @@ -32,8 +32,8 @@ import org.apache.spark.rdd.RDD import org.log4s._ import io.circe._ import cats.Semigroup +import cats.effect._ -import scala.concurrent.ExecutionContext import scala.reflect._ /** @@ -43,8 +43,8 @@ import scala.reflect._ * @param keyPrefix S3 prefix to write the raster to * @param attributeStore AttributeStore to be used for storing raster metadata * @param putObjectModifier Function that will be applied ot S3 PutObjectRequests, so that they can be modified (e.g. 
to change the ACL settings) - * @param s3Client A function which returns an S3 Client (real or mock) into-which to save the data - * @param executionContext A function to get execution context + * @param s3Client A function which returns an S3 Client (real or mock) into-which to save the data + * @param runtime A function to get IORuntime */ class S3LayerWriter( val attributeStore: AttributeStore, @@ -52,11 +52,11 @@ class S3LayerWriter( keyPrefix: String, putObjectModifier: PutObjectRequest => PutObjectRequest = identity, s3Client: => S3Client = S3ClientProducer.get(), - executionContext: => ExecutionContext = BlockingThreadPool.executionContext + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime ) extends LayerWriter[LayerId] { @transient private[this] lazy val logger = getLogger - def rddWriter: S3RDDWriter = new S3RDDWriter(s3Client, executionContext) + def rddWriter: S3RDDWriter = new S3RDDWriter(s3Client, runtime) // Layer Updating def overwrite[ @@ -142,7 +142,7 @@ object S3LayerWriter { putObjectModifier: PutObjectRequest => PutObjectRequest, getClient: => S3Client = S3ClientProducer.get() ): S3LayerWriter = - new S3LayerWriter(attributeStore, bucket, prefix, putObjectModifier) + new S3LayerWriter(attributeStore, bucket, prefix, putObjectModifier, getClient) def apply(attributeStore: AttributeStore, bucket: String, prefix: String, s3Client: => S3Client): S3LayerWriter = new S3LayerWriter(attributeStore, bucket, prefix, identity, s3Client) @@ -153,19 +153,15 @@ object S3LayerWriter { def apply(attributeStore: S3AttributeStore, putObjectModifier: PutObjectRequest => PutObjectRequest): S3LayerWriter = apply(attributeStore, attributeStore.bucket, attributeStore.prefix, putObjectModifier, attributeStore.client) - def apply(bucket: String, prefix: String, s3Client: => S3Client): S3LayerWriter = { - val attStore = S3AttributeStore(bucket, prefix, s3Client) - apply(attStore) - } + def apply(bucket: String, prefix: String, s3Client: => S3Client): S3LayerWriter = + apply(S3AttributeStore(bucket, prefix, s3Client)) def apply( bucket: String, prefix: String, putObjectModifier: PutObjectRequest => PutObjectRequest, s3Client: => S3Client - ): S3LayerWriter = { - val attStore = S3AttributeStore(bucket, prefix, s3Client) - apply(attStore, putObjectModifier) - } + ): S3LayerWriter = + apply(S3AttributeStore(bucket, prefix, s3Client), bucket, prefix, putObjectModifier, s3Client) } diff --git a/s3-spark/src/main/scala/geotrellis/spark/store/s3/S3RDDReader.scala b/s3-spark/src/main/scala/geotrellis/spark/store/s3/S3RDDReader.scala index d315517fab..e86744aa1a 100644 --- a/s3-spark/src/main/scala/geotrellis/spark/store/s3/S3RDDReader.scala +++ b/s3-spark/src/main/scala/geotrellis/spark/store/s3/S3RDDReader.scala @@ -23,7 +23,7 @@ import geotrellis.store.index.{IndexRanges, MergeQueue} import geotrellis.store.util.{IOUtils => GTIOUtils} import geotrellis.store.s3.S3ClientProducer import geotrellis.spark.util.KryoWrapper -import geotrellis.store.util.BlockingThreadPool +import geotrellis.store.util.IORuntimeTransient import software.amazon.awssdk.services.s3.model.{GetObjectRequest, S3Exception} import software.amazon.awssdk.services.s3.S3Client @@ -31,12 +31,11 @@ import org.apache.avro.Schema import org.apache.commons.io.IOUtils import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD - -import scala.concurrent.ExecutionContext +import cats.effect._ class S3RDDReader( s3Client: => S3Client = S3ClientProducer.get(), - executionContext: => ExecutionContext = 
BlockingThreadPool.executionContext + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime ) extends Serializable { def read[ @@ -66,7 +65,7 @@ class S3RDDReader( sc.parallelize(bins, bins.size) .mapPartitions { partition: Iterator[Seq[(BigInt, BigInt)]] => - implicit val ec = executionContext + implicit val r = runtime val s3Client = this.s3Client val writerSchema = kwWriterSchema.value.getOrElse(_recordCodec.schema) partition flatMap { seq => diff --git a/s3-spark/src/main/scala/geotrellis/spark/store/s3/S3RDDWriter.scala b/s3-spark/src/main/scala/geotrellis/spark/store/s3/S3RDDWriter.scala index 9f53aa2a56..30054f6eab 100644 --- a/s3-spark/src/main/scala/geotrellis/spark/store/s3/S3RDDWriter.scala +++ b/s3-spark/src/main/scala/geotrellis/spark/store/s3/S3RDDWriter.scala @@ -21,10 +21,9 @@ import geotrellis.store.avro._ import geotrellis.store.avro.codecs.KeyValueRecordCodec import geotrellis.store.s3._ import geotrellis.spark.util.KryoWrapper -import geotrellis.store.util.BlockingThreadPool +import geotrellis.store.util.IORuntimeTransient -import cats.effect.IO -import cats.syntax.apply._ +import cats.effect._ import cats.syntax.either._ import software.amazon.awssdk.services.s3.model.{S3Exception, PutObjectRequest, PutObjectResponse, GetObjectRequest} import software.amazon.awssdk.services.s3.S3Client @@ -33,12 +32,11 @@ import org.apache.avro.Schema import org.apache.commons.io.IOUtils import org.apache.spark.rdd.RDD -import scala.concurrent.ExecutionContext import scala.reflect._ class S3RDDWriter( s3Client: => S3Client = S3ClientProducer.get(), - excutionContext: => ExecutionContext = BlockingThreadPool.executionContext + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime ) extends Serializable { def write[K: AvroRecordCodec: ClassTag, V: AvroRecordCodec: ClassTag]( @@ -79,16 +77,13 @@ class S3RDDWriter( val s3Client = this.s3Client val schema = kwWriterSchema.value.getOrElse(_recordCodec.schema) - implicit val ec = excutionContext - - // TODO: runime should be configured - import cats.effect.unsafe.implicits.global + implicit val r = runtime val rows: fs2.Stream[IO, (String, Vector[(K, V)])] = - fs2.Stream.fromIterator[IO](partition.map { case (key, value) => (key, value.toVector) }, 1) + fs2.Stream.fromIterator[IO](partition.map { case (key, value) => (key, value.toVector) }, chunkSize = 1) def elaborateRow(row: (String, Vector[(K,V)])): fs2.Stream[IO, (String, Vector[(K,V)])] = { - fs2.Stream eval IO { + fs2.Stream eval IO.blocking { val (key, current) = row val updated = LayerWriter.updateRecords(mergeFunc, current, existing = { try { @@ -109,7 +104,7 @@ class S3RDDWriter( } def rowToRequest(row: (String, Vector[(K,V)])): fs2.Stream[IO, (PutObjectRequest, RequestBody)] = { - fs2.Stream eval IO { + fs2.Stream eval IO.blocking { val (key, kvs) = row val contentBytes = AvroEncoder.toBinary(kvs)(_codec) val request = PutObjectRequest.builder() @@ -124,7 +119,7 @@ class S3RDDWriter( } def retire(request: PutObjectRequest, requestBody: RequestBody): fs2.Stream[IO, PutObjectResponse] = - fs2.Stream eval IO { s3Client.putObject(request, requestBody) } + fs2.Stream eval IO.blocking { s3Client.putObject(request, requestBody) } rows .flatMap(elaborateRow) diff --git a/s3-spark/src/main/scala/geotrellis/spark/store/s3/SaveToS3.scala b/s3-spark/src/main/scala/geotrellis/spark/store/s3/SaveToS3.scala index dbf46f9368..6e1259c5db 100644 --- a/s3-spark/src/main/scala/geotrellis/spark/store/s3/SaveToS3.scala +++ b/s3-spark/src/main/scala/geotrellis/spark/store/s3/SaveToS3.scala 
@@ -19,18 +19,16 @@ package geotrellis.spark.store.s3 import geotrellis.layer.SpatialKey import geotrellis.store.LayerId import geotrellis.store.s3._ -import geotrellis.store.util.BlockingThreadPool +import geotrellis.store.util.IORuntimeTransient import software.amazon.awssdk.services.s3.model.{PutObjectRequest, PutObjectResponse, S3Exception} import software.amazon.awssdk.services.s3.S3Client import software.amazon.awssdk.core.sync.RequestBody import org.apache.spark.rdd.RDD -import cats.effect.IO -import cats.syntax.apply._ +import cats.effect._ import cats.syntax.either._ import java.net.URI -import scala.concurrent.ExecutionContext object SaveToS3 { /** @@ -54,14 +52,14 @@ object SaveToS3 { * @param rdd An RDD of K, Byte-Array pairs (where the byte-arrays contains image data) to send to S3 * @param putObjectModifier Function that will be applied ot S3 PutObjectRequests, so that they can be modified (e.g. to change the ACL settings) * @param s3Client A function which returns an S3 Client (real or mock) into-which to save the data - * @param executionContext A function to get execution context + * @param runtime A function to get IORuntime */ def apply[K, V]( rdd: RDD[(K, V)], keyToUri: K => String, putObjectModifier: PutObjectRequest => PutObjectRequest = { p => p }, s3Client: => S3Client = S3ClientProducer.get(), - executionContext: => ExecutionContext = BlockingThreadPool.executionContext + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime )(implicit ev: V => Array[Byte]): Unit = { val keyToPrefix: K => (String, String) = key => { val uri = new URI(keyToUri(key)) @@ -74,7 +72,7 @@ object SaveToS3 { rdd.foreachPartition { partition => val s3client = s3Client val requests: fs2.Stream[IO, (PutObjectRequest, RequestBody)] = - fs2.Stream.fromIterator[IO]( + fs2.Stream.fromBlockingIterator[IO]( partition.map { case (key, data) => val bytes = ev(data) val (bucket, path) = keyToPrefix(key) @@ -86,18 +84,15 @@ object SaveToS3 { val requestBody = RequestBody.fromBytes(bytes) (putObjectModifier(request), requestBody) - }, 1 + }, chunkSize = 1 ) - implicit val ec = executionContext - - // TODO: runime should be configured - import cats.effect.unsafe.implicits.global + implicit val r = runtime import geotrellis.store.util.IOUtils._ val write: (PutObjectRequest, RequestBody) => fs2.Stream[IO, PutObjectResponse] = (request, requestBody) => { - fs2.Stream eval IO { + fs2.Stream eval IO.blocking { //request.getInputStream.reset() // reset in case of retransmission to avoid 400 error s3client.putObject(request, requestBody) }.retryEBO { diff --git a/s3-spark/src/main/scala/geotrellis/spark/store/s3/SaveToS3Methods.scala b/s3-spark/src/main/scala/geotrellis/spark/store/s3/SaveToS3Methods.scala index caeff014b8..665773606e 100644 --- a/s3-spark/src/main/scala/geotrellis/spark/store/s3/SaveToS3Methods.scala +++ b/s3-spark/src/main/scala/geotrellis/spark/store/s3/SaveToS3Methods.scala @@ -16,14 +16,13 @@ package geotrellis.spark.store.s3 -import geotrellis.store.util.BlockingThreadPool +import geotrellis.store.util.IORuntimeTransient import geotrellis.util.MethodExtensions +import cats.effect._ import software.amazon.awssdk.services.s3.model.PutObjectRequest import org.apache.spark.rdd.RDD -import scala.concurrent.ExecutionContext - class SaveToS3Methods[K, V](val self: RDD[(K, V)]) extends MethodExtensions[RDD[(K, V)]] { /** @@ -31,9 +30,9 @@ class SaveToS3Methods[K, V](val self: RDD[(K, V)]) extends MethodExtensions[RDD[ * * @param keyToUri A function from K (a key) to an S3 URI * @param 
putObjectModifier Function that will be applied ot S3 PutObjectRequests, so that they can be modified (e.g. to change the ACL settings) - * @param executionContext A function to get execution context + * @param runtime A function to get IORuntime */ - def saveToS3(keyToUri: K => String, putObjectModifier: PutObjectRequest => PutObjectRequest = { p => p }, executionContext: => ExecutionContext = BlockingThreadPool.executionContext) + def saveToS3(keyToUri: K => String, putObjectModifier: PutObjectRequest => PutObjectRequest = { p => p }, runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime) (implicit ev: V => Array[Byte]): Unit = - SaveToS3(self, keyToUri, putObjectModifier, executionContext = executionContext) + SaveToS3(self, keyToUri, putObjectModifier, runtime = runtime) } diff --git a/s3-spark/src/main/scala/geotrellis/spark/store/s3/cog/S3COGLayerReader.scala b/s3-spark/src/main/scala/geotrellis/spark/store/s3/cog/S3COGLayerReader.scala index c0ef2255e6..4b3c59916a 100644 --- a/s3-spark/src/main/scala/geotrellis/spark/store/s3/cog/S3COGLayerReader.scala +++ b/s3-spark/src/main/scala/geotrellis/spark/store/s3/cog/S3COGLayerReader.scala @@ -30,8 +30,8 @@ import org.apache.spark.SparkContext import software.amazon.awssdk.services.s3._ import software.amazon.awssdk.services.s3.model._ import io.circe._ +import cats.effect._ -import scala.concurrent.ExecutionContext import scala.reflect.ClassTag import java.net.URI @@ -43,9 +43,9 @@ import java.net.URI class S3COGLayerReader( val attributeStore: AttributeStore, s3Client: => S3Client = S3ClientProducer.get(), - executionContext: => ExecutionContext = BlockingThreadPool.executionContext + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime )(@transient implicit val sc: SparkContext) extends COGLayerReader[LayerId] { - @transient implicit lazy val ec: ExecutionContext = executionContext + @transient implicit lazy val ioRuntime: unsafe.IORuntime = runtime val defaultNumPartitions: Int = sc.defaultParallelism @@ -98,6 +98,6 @@ object S3COGLayerReader { new S3COGLayerReader( attributeStore, attributeStore.client, - BlockingThreadPool.executionContext + IORuntimeTransient.IORuntime ) } diff --git a/s3-spark/src/main/scala/geotrellis/spark/store/s3/cog/S3COGLayerWriter.scala b/s3-spark/src/main/scala/geotrellis/spark/store/s3/cog/S3COGLayerWriter.scala index ba909d0c67..4b6acd2b68 100644 --- a/s3-spark/src/main/scala/geotrellis/spark/store/s3/cog/S3COGLayerWriter.scala +++ b/s3-spark/src/main/scala/geotrellis/spark/store/s3/cog/S3COGLayerWriter.scala @@ -28,15 +28,15 @@ import geotrellis.store.cog.vrt.VRT.IndexedSimpleSource import geotrellis.store.index.{Index, KeyIndex} import geotrellis.store.s3._ import geotrellis.spark.store.cog._ -import geotrellis.store.util.BlockingThreadPool +import geotrellis.store.util.IORuntimeTransient import software.amazon.awssdk.services.s3.model.{GetObjectRequest, PutObjectRequest, S3Exception} import software.amazon.awssdk.services.s3._ import software.amazon.awssdk.core.sync.RequestBody import org.apache.commons.io.IOUtils import _root_.io.circe._ +import cats.effect._ -import scala.concurrent.ExecutionContext import scala.util.Try import scala.reflect.{ClassTag, classTag} @@ -45,7 +45,7 @@ class S3COGLayerWriter( bucket: String, keyPrefix: String, s3Client: => S3Client = S3ClientProducer.get(), - executionContext: => ExecutionContext = BlockingThreadPool.executionContext + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime ) extends COGLayerWriter { def writeCOGLayer[ @@ -74,7 +74,7 @@ 
class S3COGLayerWriter( attributeStore.writeCOGLayerAttributes(layerId0, header, storageMetadata) // Make S3COGAsyncWriter - val asyncWriter = new S3COGAsyncWriter[V](bucket, p => p, executionContext) + val asyncWriter = new S3COGAsyncWriter[V](bucket, p => p, runtime) val retryCheck: Throwable => Boolean = { case e: S3Exception if e.statusCode == 503 => true @@ -135,8 +135,8 @@ object S3COGLayerWriter { class S3COGAsyncWriter[V <: CellGrid[Int]: GeoTiffReader]( bucket: String, putObjectModifier: PutObjectRequest => PutObjectRequest, - executionContext: => ExecutionContext = BlockingThreadPool.executionContext -) extends AsyncWriter[S3Client, GeoTiff[V], (PutObjectRequest, RequestBody)](executionContext) { + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime +) extends AsyncWriter[S3Client, GeoTiff[V], (PutObjectRequest, RequestBody)](runtime) { def readRecord( client: S3Client, diff --git a/s3-spark/src/main/scala/geotrellis/spark/store/s3/geotiff/S3GeoTiffLayerReader.scala b/s3-spark/src/main/scala/geotrellis/spark/store/s3/geotiff/S3GeoTiffLayerReader.scala index 79c7d08932..b3ca397889 100644 --- a/s3-spark/src/main/scala/geotrellis/spark/store/s3/geotiff/S3GeoTiffLayerReader.scala +++ b/s3-spark/src/main/scala/geotrellis/spark/store/s3/geotiff/S3GeoTiffLayerReader.scala @@ -24,10 +24,9 @@ import geotrellis.store.s3.S3ClientProducer import geotrellis.spark.store.hadoop.geotiff.{AttributeStore, GeoTiffLayerReader, GeoTiffMetadata} import geotrellis.util.annotations.experimental +import cats.effect._ import software.amazon.awssdk.services.s3.S3Client -import scala.concurrent.ExecutionContext - /** * @define experimental EXPERIMENTAL@experimental */ @@ -37,9 +36,9 @@ import scala.concurrent.ExecutionContext val resampleMethod: ResampleMethod = NearestNeighbor, val strategy: OverviewStrategy = OverviewStrategy.DEFAULT, s3Client: => S3Client = S3ClientProducer.get(), - executionContext: ExecutionContext = BlockingThreadPool.executionContext + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime ) extends GeoTiffLayerReader[M] { - implicit lazy val ec: ExecutionContext = executionContext + implicit lazy val ioRuntime: unsafe.IORuntime = runtime } @experimental object S3GeoTiffLayerReader { @@ -49,13 +48,13 @@ import scala.concurrent.ExecutionContext resampleMethod: ResampleMethod = NearestNeighbor, strategy: OverviewStrategy = OverviewStrategy.DEFAULT, s3Client: => S3Client = S3ClientProducer.get(), - executionContext: => ExecutionContext = BlockingThreadPool.executionContext + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime ): S3GeoTiffLayerReader[M] = new S3GeoTiffLayerReader[M]( attributeStore, layoutScheme, resampleMethod, strategy, s3Client, - executionContext + runtime ) } diff --git a/s3/src/main/scala/geotrellis/store/s3/S3CollectionReader.scala b/s3/src/main/scala/geotrellis/store/s3/S3CollectionReader.scala index ffa813a899..69b2b3500e 100644 --- a/s3/src/main/scala/geotrellis/store/s3/S3CollectionReader.scala +++ b/s3/src/main/scala/geotrellis/store/s3/S3CollectionReader.scala @@ -20,18 +20,17 @@ import geotrellis.layer._ import geotrellis.store.avro.codecs.KeyValueRecordCodec import geotrellis.store.index.MergeQueue import geotrellis.store.avro.{AvroEncoder, AvroRecordCodec} -import geotrellis.store.util.{BlockingThreadPool, IOUtils => GTIOUtils} +import geotrellis.store.util.{IORuntimeTransient, IOUtils => GTIOUtils} +import cats.effect._ import software.amazon.awssdk.services.s3.model._ import software.amazon.awssdk.services.s3.S3Client import 
org.apache.avro.Schema import org.apache.commons.io.IOUtils -import scala.concurrent.ExecutionContext - class S3CollectionReader( s3Client: => S3Client = S3ClientProducer.get(), - executionContext: => ExecutionContext = BlockingThreadPool.executionContext + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime ) extends Serializable { def read[ @@ -53,9 +52,9 @@ class S3CollectionReader( queryKeyBounds.flatMap(decomposeBounds) val recordCodec = KeyValueRecordCodec[K, V] - implicit val ec = executionContext + implicit val r = runtime - GTIOUtils.parJoin[K, V](ranges.iterator){ index: BigInt => + GTIOUtils.parJoin[K, V](ranges.iterator) { index: BigInt => try { val getRequest = GetObjectRequest.builder() .bucket(bucket) diff --git a/s3/src/main/scala/geotrellis/store/s3/cog/S3COGCollectionLayerReader.scala b/s3/src/main/scala/geotrellis/store/s3/cog/S3COGCollectionLayerReader.scala index dfaa17bdbb..0138b5b50a 100644 --- a/s3/src/main/scala/geotrellis/store/s3/cog/S3COGCollectionLayerReader.scala +++ b/s3/src/main/scala/geotrellis/store/s3/cog/S3COGCollectionLayerReader.scala @@ -27,12 +27,12 @@ import geotrellis.store.cog._ import geotrellis.store.index._ import geotrellis.store.s3._ +import cats.effect._ import software.amazon.awssdk.services.s3.S3Client import software.amazon.awssdk.services.s3.model._ import java.net.URI -import scala.concurrent.ExecutionContext import scala.reflect.ClassTag /** @@ -43,10 +43,10 @@ import scala.reflect.ClassTag class S3COGCollectionLayerReader( val attributeStore: AttributeStore, s3Client: => S3Client = S3ClientProducer.get(), - executionContext: => ExecutionContext = BlockingThreadPool.executionContext + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime ) extends COGCollectionLayerReader[LayerId] { - @transient implicit lazy val ec = executionContext + @transient implicit lazy val ioRuntime: unsafe.IORuntime = runtime def read[ K: SpatialComponent: Boundable: Decoder: ClassTag, diff --git a/spark/src/main/scala/geotrellis/spark/store/cog/COGLayerReader.scala b/spark/src/main/scala/geotrellis/spark/store/cog/COGLayerReader.scala index 71cfaf508f..8a92ad576f 100644 --- a/spark/src/main/scala/geotrellis/spark/store/cog/COGLayerReader.scala +++ b/spark/src/main/scala/geotrellis/spark/store/cog/COGLayerReader.scala @@ -34,16 +34,16 @@ import org.apache.spark.SparkContext import io.circe._ import io.circe.parser._ import cats.syntax.either._ +import cats.effect._ import java.net.URI import java.util.ServiceLoader -import scala.concurrent.ExecutionContext import scala.reflect._ abstract class COGLayerReader[ID] extends Serializable { - implicit val ec: ExecutionContext + implicit val ioRuntime: unsafe.IORuntime val attributeStore: AttributeStore diff --git a/spark/src/main/scala/geotrellis/spark/store/file/FileRDDReader.scala b/spark/src/main/scala/geotrellis/spark/store/file/FileRDDReader.scala index 4086964beb..4da4f02631 100644 --- a/spark/src/main/scala/geotrellis/spark/store/file/FileRDDReader.scala +++ b/spark/src/main/scala/geotrellis/spark/store/file/FileRDDReader.scala @@ -20,17 +20,16 @@ import geotrellis.layer.{Boundable, KeyBounds} import geotrellis.store.avro.codecs.KeyValueRecordCodec import geotrellis.store.index.{IndexRanges, MergeQueue} import geotrellis.store.avro.{AvroEncoder, AvroRecordCodec} -import geotrellis.store.util.{BlockingThreadPool, IOUtils} +import geotrellis.store.util.{IORuntimeTransient, IOUtils} import geotrellis.spark.util.KryoWrapper import geotrellis.util.Filesystem +import cats.effect._ import 
org.apache.avro.Schema import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import java.io.File -import scala.concurrent.ExecutionContext - object FileRDDReader { def read[K: AvroRecordCodec: Boundable, V: AvroRecordCodec]( keyPath: BigInt => String, @@ -39,7 +38,7 @@ object FileRDDReader { filterIndexOnly: Boolean, writerSchema: Option[Schema] = None, numPartitions: Option[Int] = None, - executionContext: => ExecutionContext = BlockingThreadPool.executionContext + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime )(implicit sc: SparkContext): RDD[(K, V)] = { if(queryKeyBounds.isEmpty) return sc.emptyRDD[(K, V)] @@ -57,7 +56,7 @@ object FileRDDReader { sc.parallelize(bins, bins.size) .mapPartitions { partition: Iterator[Seq[(BigInt, BigInt)]] => - implicit val ec: ExecutionContext = executionContext + implicit val ioRuntime: unsafe.IORuntime = runtime partition flatMap { seq => IOUtils.parJoin[K, V](seq.iterator) { index: BigInt => diff --git a/spark/src/main/scala/geotrellis/spark/store/file/cog/FileCOGLayerReader.scala b/spark/src/main/scala/geotrellis/spark/store/file/cog/FileCOGLayerReader.scala index 7df402f959..cd0a586b82 100644 --- a/spark/src/main/scala/geotrellis/spark/store/file/cog/FileCOGLayerReader.scala +++ b/spark/src/main/scala/geotrellis/spark/store/file/cog/FileCOGLayerReader.scala @@ -25,13 +25,13 @@ import geotrellis.store.cog.{Extension, ZoomRange} import geotrellis.store.file.{FileAttributeStore, FileLayerHeader, KeyPathGenerator} import geotrellis.spark.store.cog._ +import cats.effect._ import org.apache.spark.SparkContext import _root_.io.circe._ import java.net.URI import java.io.File -import scala.concurrent.ExecutionContext import scala.reflect.ClassTag /** @@ -42,10 +42,10 @@ import scala.reflect.ClassTag class FileCOGLayerReader( val attributeStore: AttributeStore, val catalogPath: String, - executionContext: => ExecutionContext = BlockingThreadPool.executionContext + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime )(@transient implicit val sc: SparkContext) extends COGLayerReader[LayerId] { - @transient implicit lazy val ec: ExecutionContext = executionContext + @transient implicit lazy val ioRuntime: unsafe.IORuntime = runtime val defaultNumPartitions: Int = sc.defaultParallelism diff --git a/spark/src/main/scala/geotrellis/spark/store/file/geotiff/FileGeoTiffLayerReader.scala b/spark/src/main/scala/geotrellis/spark/store/file/geotiff/FileGeoTiffLayerReader.scala index 9e7deea062..323f29a94b 100644 --- a/spark/src/main/scala/geotrellis/spark/store/file/geotiff/FileGeoTiffLayerReader.scala +++ b/spark/src/main/scala/geotrellis/spark/store/file/geotiff/FileGeoTiffLayerReader.scala @@ -20,11 +20,10 @@ import geotrellis.layer.ZoomedLayoutScheme import geotrellis.raster.resample.{NearestNeighbor, ResampleMethod} import geotrellis.raster.io.geotiff.OverviewStrategy import geotrellis.spark.store.hadoop.geotiff._ -import geotrellis.store.util.BlockingThreadPool +import geotrellis.store.util.IORuntimeTransient import geotrellis.util.annotations.experimental - -import scala.concurrent.ExecutionContext +import cats.effect._ /** * @define experimental EXPERIMENTAL@experimental @@ -34,9 +33,9 @@ import scala.concurrent.ExecutionContext val layoutScheme: ZoomedLayoutScheme, val resampleMethod: ResampleMethod = NearestNeighbor, val strategy: OverviewStrategy = OverviewStrategy.DEFAULT, - executionContext: => ExecutionContext = BlockingThreadPool.executionContext + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime ) extends 
GeoTiffLayerReader[M] { - implicit lazy val ec: ExecutionContext = executionContext + implicit lazy val ioRuntime: unsafe.IORuntime = runtime } @experimental object FileGeoTiffLayerReader { @@ -45,7 +44,7 @@ import scala.concurrent.ExecutionContext layoutScheme: ZoomedLayoutScheme, resampleMethod: ResampleMethod = NearestNeighbor, strategy: OverviewStrategy = OverviewStrategy.DEFAULT, - executionContext: => ExecutionContext = BlockingThreadPool.executionContext + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime ): FileGeoTiffLayerReader[M] = - new FileGeoTiffLayerReader(attributeStore, layoutScheme, resampleMethod, strategy, executionContext) + new FileGeoTiffLayerReader(attributeStore, layoutScheme, resampleMethod, strategy, runtime) } diff --git a/spark/src/main/scala/geotrellis/spark/store/hadoop/cog/HadoopCOGLayerReader.scala b/spark/src/main/scala/geotrellis/spark/store/hadoop/cog/HadoopCOGLayerReader.scala index 27cc20d3df..22c1911c77 100644 --- a/spark/src/main/scala/geotrellis/spark/store/hadoop/cog/HadoopCOGLayerReader.scala +++ b/spark/src/main/scala/geotrellis/spark/store/hadoop/cog/HadoopCOGLayerReader.scala @@ -28,13 +28,13 @@ import geotrellis.store.hadoop.util._ import geotrellis.store.index.Index import geotrellis.spark.store.cog._ import geotrellis.spark.store.hadoop._ -import geotrellis.store.util.BlockingThreadPool +import geotrellis.store.util.IORuntimeTransient +import cats.effect._ import org.apache.hadoop.fs.Path import org.apache.spark.SparkContext import java.net.URI -import scala.concurrent.ExecutionContext import scala.reflect.ClassTag /** @@ -44,10 +44,10 @@ import scala.reflect.ClassTag */ class HadoopCOGLayerReader( val attributeStore: AttributeStore, - executionContext: => ExecutionContext = BlockingThreadPool.executionContext + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime )(@transient implicit val sc: SparkContext) extends COGLayerReader[LayerId] { - @transient implicit lazy val ec: ExecutionContext = executionContext + @transient implicit lazy val ioRuntime: unsafe.IORuntime = runtime val hadoopConfiguration = SerializableConfiguration(sc.hadoopConfiguration) diff --git a/spark/src/main/scala/geotrellis/spark/store/hadoop/geotiff/GeoTiffLayerReader.scala b/spark/src/main/scala/geotrellis/spark/store/hadoop/geotiff/GeoTiffLayerReader.scala index df2ad0b809..45d4e82484 100644 --- a/spark/src/main/scala/geotrellis/spark/store/hadoop/geotiff/GeoTiffLayerReader.scala +++ b/spark/src/main/scala/geotrellis/spark/store/hadoop/geotiff/GeoTiffLayerReader.scala @@ -30,11 +30,9 @@ import geotrellis.util.RangeReader import geotrellis.util.annotations.experimental import geotrellis.store.LayerId -import cats.effect.IO -import cats.syntax.apply._ +import cats.effect._ import cats.syntax.either._ -import scala.concurrent.ExecutionContext import scala.reflect.ClassTag /** @@ -46,10 +44,7 @@ import scala.reflect.ClassTag val resampleMethod: ResampleMethod val strategy: OverviewStrategy - implicit val ec: ExecutionContext - - // TODO: runime should be configured - import cats.effect.unsafe.implicits.global + implicit val ioRuntime: unsafe.IORuntime @experimental def read[V <: CellGrid[Int]: GeoTiffReader: ClassTag] (layerId: LayerId) @@ -66,10 +61,10 @@ import scala.reflect.ClassTag val keyExtent: Extent = mapTransform(SpatialKey(x, y)) val index: fs2.Stream[IO, GeoTiffMetadata] = - fs2.Stream.fromIterator[IO](attributeStore.query(layerId.name, ProjectedExtent(keyExtent, layoutScheme.crs)).toIterator, 1) + 
fs2.Stream.fromBlockingIterator[IO](attributeStore.query(layerId.name, ProjectedExtent(keyExtent, layoutScheme.crs)).toIterator, chunkSize = 1) val readRecord: GeoTiffMetadata => fs2.Stream[IO, Option[Raster[V]]] = { md => - fs2.Stream eval IO { + fs2.Stream eval IO.blocking { val tiff = GeoTiffReader[V].read(RangeReader(md.uri), streaming = true) val reprojectedKeyExtent = keyExtent.reproject(layoutScheme.crs, tiff.crs) diff --git a/spark/src/main/scala/geotrellis/spark/store/hadoop/geotiff/HadoopGeoTiffLayerReader.scala b/spark/src/main/scala/geotrellis/spark/store/hadoop/geotiff/HadoopGeoTiffLayerReader.scala index 856d8ce49c..5ce68fa172 100644 --- a/spark/src/main/scala/geotrellis/spark/store/hadoop/geotiff/HadoopGeoTiffLayerReader.scala +++ b/spark/src/main/scala/geotrellis/spark/store/hadoop/geotiff/HadoopGeoTiffLayerReader.scala @@ -19,13 +19,12 @@ package geotrellis.spark.store.hadoop.geotiff import geotrellis.layer.ZoomedLayoutScheme import geotrellis.raster.resample.{NearestNeighbor, ResampleMethod} import geotrellis.raster.io.geotiff.OverviewStrategy -import geotrellis.store.util.BlockingThreadPool +import geotrellis.store.util.IORuntimeTransient import geotrellis.util.annotations.experimental +import cats.effect._ import org.apache.hadoop.conf.Configuration -import scala.concurrent.ExecutionContext - /** * @define experimental EXPERIMENTAL@experimental */ @@ -35,9 +34,9 @@ import scala.concurrent.ExecutionContext val resampleMethod: ResampleMethod = NearestNeighbor, val strategy: OverviewStrategy = OverviewStrategy.DEFAULT, val conf: Configuration = new Configuration, - executionContext: => ExecutionContext = BlockingThreadPool.executionContext + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime ) extends GeoTiffLayerReader[M] { - implicit val ec: ExecutionContext = executionContext + implicit val ioRuntime: unsafe.IORuntime = runtime } @experimental object HadoopGeoTiffLayerReader { @@ -47,7 +46,7 @@ import scala.concurrent.ExecutionContext resampleMethod: ResampleMethod = NearestNeighbor, strategy: OverviewStrategy = OverviewStrategy.DEFAULT, conf: Configuration = new Configuration, - executionContext: => ExecutionContext = BlockingThreadPool.executionContext + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime ): HadoopGeoTiffLayerReader[M] = - new HadoopGeoTiffLayerReader(attributeStore, layoutScheme, resampleMethod, strategy, conf, executionContext) + new HadoopGeoTiffLayerReader(attributeStore, layoutScheme, resampleMethod, strategy, conf, runtime) } diff --git a/store/src/main/resources/reference.conf b/store/src/main/resources/reference.conf index 90c129dd99..90aa5c3d03 100644 --- a/store/src/main/resources/reference.conf +++ b/store/src/main/resources/reference.conf @@ -18,6 +18,4 @@ geotrellis { max-size = 1000 enabled = true } - - blocking-thread-pool.threads = default } diff --git a/store/src/main/scala/geotrellis/store/AsyncWriter.scala b/store/src/main/scala/geotrellis/store/AsyncWriter.scala index 51f1cdaefa..2164001948 100644 --- a/store/src/main/scala/geotrellis/store/AsyncWriter.scala +++ b/store/src/main/scala/geotrellis/store/AsyncWriter.scala @@ -16,16 +16,15 @@ package geotrellis.store -import geotrellis.store.util.BlockingThreadPool +import geotrellis.store.util.IORuntimeTransient +import geotrellis.store.util.IOUtils._ -import cats.effect.IO -import cats.syntax.apply._ +import cats.effect._ import cats.syntax.either._ -import scala.concurrent.ExecutionContext import scala.util.{Failure, Success, Try} -abstract class AsyncWriter[Client, 
V, E](executionContext: => ExecutionContext = BlockingThreadPool.executionContext) extends Serializable { +abstract class AsyncWriter[Client, V, E](runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime) extends Serializable { def readRecord(client: Client, key: String): Try[V] @@ -39,47 +38,49 @@ abstract class AsyncWriter[Client, V, E](executionContext: => ExecutionContext = mergeFunc: Option[(V, V) => V] = None, retryFunc: Option[Throwable => Boolean] ): Unit = { - if (partition.isEmpty) return - val rows: fs2.Stream[IO, (String, V)] = fs2.Stream.fromIterator[IO](partition, 1) + if (partition.nonEmpty) { + implicit val ioRuntime = runtime - def elaborateRow(row: (String, V)): fs2.Stream[IO, (String, V)] = { - val foldUpdate: ((String, V)) => (String, V) = { case newRecord @ (key, newValue) => - mergeFunc match { - case Some(fn) => - // TODO: match on this failure to retry reads - readRecord(client, key) match { - case Success(oldValue) => (key, fn(newValue, oldValue)) - case Failure(_) => newRecord + val rows: fs2.Stream[IO, (String, V)] = fs2.Stream.fromIterator[IO](partition, chunkSize = 1) + + def elaborateRow(row: (String, V)): fs2.Stream[IO, (String, V)] = { + val foldUpdate: ((String, V)) => (String, V) = { + case newRecord@(key, newValue) => + mergeFunc match { + case Some(fn) => + // TODO: match on this failure to retry reads + readRecord(client, key) match { + case Success(oldValue) => (key, fn(newValue, oldValue)) + case Failure(_) => newRecord + } + case None => newRecord } - case None => newRecord } + + fs2.Stream eval IO.blocking(foldUpdate(row)) } - fs2.Stream eval /*IO.shift(ec) *>*/ IO(foldUpdate(row)) - } + def encode(row: (String, V)): fs2.Stream[IO, (String, E)] = { + val (key, value) = row + val encodeTask = IO.blocking((key, encodeRecord(key, value))) + fs2.Stream eval encodeTask + } - def encode(row: (String, V)): fs2.Stream[IO, (String, E)] = { - val (key, value) = row - val encodeTask = IO((key, encodeRecord(key, value))) - fs2.Stream eval /*IO.shift(ec) *>*/ encodeTask - } + def retire(row: (String, E)): fs2.Stream[IO, Try[Long]] = { + val writeTask = IO.blocking(writeRecord(client, row._1, row._2)) + fs2.Stream eval retryFunc.fold(writeTask)(writeTask.retryEBO(_)) + } - def retire(row: (String, E)): fs2.Stream[IO, Try[Long]] = { - val writeTask = IO(writeRecord(client, row._1, row._2)) - import geotrellis.store.util.IOUtils._ - fs2.Stream eval /*IO.shift(ec) *>*/ retryFunc.fold(writeTask)(writeTask.retryEBO(_)) + rows + .flatMap(elaborateRow) + .flatMap(encode) + .map(retire) + .parJoinUnbounded + .compile + .toVector + .attempt + .unsafeRunSync() + .valueOr(throw _) } - - import cats.effect.unsafe.implicits.global - rows - .flatMap(elaborateRow) - .flatMap(encode) - .map(retire) - .parJoinUnbounded - .compile - .toVector - .attempt - .unsafeRunSync() - .valueOr(throw _) } } diff --git a/store/src/main/scala/geotrellis/store/cog/COGCollectionLayerReader.scala b/store/src/main/scala/geotrellis/store/cog/COGCollectionLayerReader.scala index 5a1dcf09e3..f18655a2bf 100644 --- a/store/src/main/scala/geotrellis/store/cog/COGCollectionLayerReader.scala +++ b/store/src/main/scala/geotrellis/store/cog/COGCollectionLayerReader.scala @@ -28,14 +28,14 @@ import geotrellis.util._ import io.circe._ import io.circe.parser._ import cats.syntax.either._ +import cats.effect._ -import scala.concurrent.ExecutionContext import scala.reflect._ import java.net.URI import java.util.ServiceLoader abstract class COGCollectionLayerReader[ID] { self => - implicit val ec: 
ExecutionContext + implicit val ioRuntime: unsafe.IORuntime val attributeStore: AttributeStore def read[ @@ -208,7 +208,7 @@ object COGCollectionLayerReader { decomposeBounds: KeyBounds[K] => Seq[(BigInt, BigInt)], readDefinitions: Map[SpatialKey, Seq[(SpatialKey, Int, TileBounds, Seq[(TileBounds, SpatialKey)])]], numPartitions: Option[Int] = None - )(implicit ec: ExecutionContext): Seq[(K, V)] = { + )(implicit runtime: unsafe.IORuntime): Seq[(K, V)] = { if (baseQueryKeyBounds.isEmpty) return Seq.empty[(K, V)] val ranges = if (baseQueryKeyBounds.length > 1) diff --git a/store/src/main/scala/geotrellis/store/file/FileCollectionLayerReader.scala b/store/src/main/scala/geotrellis/store/file/FileCollectionLayerReader.scala index f0e428a45f..21acdccfd2 100644 --- a/store/src/main/scala/geotrellis/store/file/FileCollectionLayerReader.scala +++ b/store/src/main/scala/geotrellis/store/file/FileCollectionLayerReader.scala @@ -24,8 +24,8 @@ import geotrellis.store.index.Index import geotrellis.util._ import io.circe.Decoder +import cats.effect._ -import scala.concurrent.ExecutionContext import scala.reflect.ClassTag /** @@ -39,10 +39,10 @@ import scala.reflect.ClassTag class FileCollectionLayerReader( val attributeStore: AttributeStore, catalogPath: String, - executionContext: => ExecutionContext = BlockingThreadPool.executionContext + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime ) extends CollectionLayerReader[LayerId] { - @transient implicit lazy val ec = executionContext + @transient implicit lazy val ioRuntime = runtime def read[ K: AvroRecordCodec: Boundable: Decoder: ClassTag, diff --git a/store/src/main/scala/geotrellis/store/file/FileCollectionReader.scala b/store/src/main/scala/geotrellis/store/file/FileCollectionReader.scala index bf3ce1d2f6..4bf9868c58 100644 --- a/store/src/main/scala/geotrellis/store/file/FileCollectionReader.scala +++ b/store/src/main/scala/geotrellis/store/file/FileCollectionReader.scala @@ -23,11 +23,10 @@ import geotrellis.store.index.MergeQueue import geotrellis.store.util.IOUtils import geotrellis.util.Filesystem +import cats.effect._ import org.apache.avro.Schema import java.io.File -import scala.concurrent.ExecutionContext - object FileCollectionReader { def read[K: AvroRecordCodec : Boundable, V: AvroRecordCodec]( keyPath: BigInt => String, @@ -35,7 +34,7 @@ object FileCollectionReader { decomposeBounds: KeyBounds[K] => Seq[(BigInt, BigInt)], filterIndexOnly: Boolean, writerSchema: Option[Schema] = None - )(implicit ec: ExecutionContext): Seq[(K, V)] = { + )(implicit runtime: unsafe.IORuntime): Seq[(K, V)] = { if (queryKeyBounds.isEmpty) return Seq.empty[(K, V)] val ranges = if (queryKeyBounds.length > 1) diff --git a/store/src/main/scala/geotrellis/store/file/cog/FileCOGCollectionLayerReader.scala b/store/src/main/scala/geotrellis/store/file/cog/FileCOGCollectionLayerReader.scala index 7ddd93e390..20a783198b 100644 --- a/store/src/main/scala/geotrellis/store/file/cog/FileCOGCollectionLayerReader.scala +++ b/store/src/main/scala/geotrellis/store/file/cog/FileCOGCollectionLayerReader.scala @@ -23,10 +23,9 @@ import geotrellis.store._ import geotrellis.store.util._ import geotrellis.store.cog.{COGCollectionLayerReader, Extension, ZoomRange} import geotrellis.store.file.{FileAttributeStore, KeyPathGenerator} - +import cats.effect._ import _root_.io.circe._ -import scala.concurrent.ExecutionContext import scala.reflect.ClassTag import java.net.URI import java.io.File @@ -39,10 +38,10 @@ import java.io.File class FileCOGCollectionLayerReader( val 
attributeStore: AttributeStore, val catalogPath: String, - executionContext: => ExecutionContext = BlockingThreadPool.executionContext + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime ) extends COGCollectionLayerReader[LayerId] { - @transient implicit lazy val ec: ExecutionContext = executionContext + @transient implicit lazy val ioRuntime: unsafe.IORuntime = runtime def read[ K: SpatialComponent: Boundable: Decoder: ClassTag, diff --git a/store/src/main/scala/geotrellis/store/hadoop/HadoopCollectionLayerReader.scala b/store/src/main/scala/geotrellis/store/hadoop/HadoopCollectionLayerReader.scala index 6f18196400..d1d95aca12 100644 --- a/store/src/main/scala/geotrellis/store/hadoop/HadoopCollectionLayerReader.scala +++ b/store/src/main/scala/geotrellis/store/hadoop/HadoopCollectionLayerReader.scala @@ -19,15 +19,15 @@ package geotrellis.store.hadoop import geotrellis.layer._ import geotrellis.layer.{ContextCollection, Metadata} import geotrellis.store._ -import geotrellis.store.util.BlockingThreadPool +import geotrellis.store.util.IORuntimeTransient import geotrellis.store.avro._ import geotrellis.util._ +import cats.effect._ import io.circe._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import scala.concurrent.ExecutionContext import scala.reflect.ClassTag /** @@ -39,10 +39,10 @@ class HadoopCollectionLayerReader( val attributeStore: AttributeStore, conf: Configuration, maxOpenFiles: Int = 16, - executionContext: => ExecutionContext = BlockingThreadPool.executionContext + runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime ) extends CollectionLayerReader[LayerId] { - @transient implicit lazy val ec: ExecutionContext = executionContext + @transient implicit lazy val ioRuntime: unsafe.IORuntime = runtime def read[ K: AvroRecordCodec: Boundable: Decoder: ClassTag, diff --git a/store/src/main/scala/geotrellis/store/hadoop/HadoopCollectionReader.scala b/store/src/main/scala/geotrellis/store/hadoop/HadoopCollectionReader.scala index 9ef7b0edae..651c51da35 100644 --- a/store/src/main/scala/geotrellis/store/hadoop/HadoopCollectionReader.scala +++ b/store/src/main/scala/geotrellis/store/hadoop/HadoopCollectionReader.scala @@ -21,20 +21,15 @@ import geotrellis.store.avro._ import geotrellis.store.avro.codecs._ import geotrellis.store.hadoop.formats.FilterMapFileInputFormat import geotrellis.store.util.IOUtils -import geotrellis.store.util.BlockingThreadPool +import cats.effect._ import com.github.blemale.scaffeine.{Cache, Scaffeine} import org.apache.avro.Schema import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io._ -import scala.concurrent.ExecutionContext - -class HadoopCollectionReader( - maxOpenFiles: Int, - executionContext: => ExecutionContext = BlockingThreadPool.executionContext -) extends Serializable { +class HadoopCollectionReader(maxOpenFiles: Int) extends Serializable { val readers: Cache[Path, MapFile.Reader] = Scaffeine() @@ -43,8 +38,6 @@ class HadoopCollectionReader( .removalListener[Path, MapFile.Reader] { case (_, v, _) => v.close() } .build[Path, MapFile.Reader]() - implicit val ec: ExecutionContext = executionContext - private def predicate(row: (Path, BigInt, BigInt), index: BigInt): Boolean = (index >= row._2) && ((index <= row._3) || (row._3 == -1)) @@ -57,7 +50,7 @@ class HadoopCollectionReader( decomposeBounds: KeyBounds[K] => Seq[(BigInt, BigInt)], indexFilterOnly: Boolean, writerSchema: Option[Schema] = None - )(implicit ec: ExecutionContext): Seq[(K, V)] = { + 
)(implicit runtime: unsafe.IORuntime): Seq[(K, V)] = {
     if (queryKeyBounds.isEmpty) return Seq.empty[(K, V)]
 
     val includeKey = (key: K) => KeyBounds.includeKey(queryKeyBounds, key)
diff --git a/store/src/main/scala/geotrellis/store/hadoop/cog/HadoopCOGCollectionLayerReader.scala b/store/src/main/scala/geotrellis/store/hadoop/cog/HadoopCOGCollectionLayerReader.scala
index 657ecdfdf2..d93bf2d24c 100644
--- a/store/src/main/scala/geotrellis/store/hadoop/cog/HadoopCOGCollectionLayerReader.scala
+++ b/store/src/main/scala/geotrellis/store/hadoop/cog/HadoopCOGCollectionLayerReader.scala
@@ -26,6 +26,7 @@ import geotrellis.store.hadoop.{HadoopAttributeStore, SerializableConfiguration}
 import geotrellis.store.hadoop.util._
 import geotrellis.store.index.Index
 
+import cats.effect._
 import _root_.io.circe._
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -33,8 +34,6 @@ import org.apache.hadoop.fs.Path
 import scala.reflect.ClassTag
 import java.net.URI
 
-import scala.concurrent.ExecutionContext
-
 /**
  * Handles reading raster RDDs and their metadata from HDFS.
  *
@@ -44,12 +43,12 @@ class HadoopCOGCollectionLayerReader(
   val attributeStore: AttributeStore,
   val catalogPath: String,
   val conf: Configuration = new Configuration,
-  executionContext: => ExecutionContext = BlockingThreadPool.executionContext
+  runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime
 ) extends COGCollectionLayerReader[LayerId] {
 
   val serConf: SerializableConfiguration = SerializableConfiguration(conf)
 
-  @transient implicit lazy val ec: ExecutionContext = executionContext
+  @transient implicit lazy val ioRuntime: unsafe.IORuntime = runtime
 
   def read[
     K: SpatialComponent: Boundable: Decoder: ClassTag,
diff --git a/store/src/main/scala/geotrellis/store/util/BlockingThreadPool.scala b/store/src/main/scala/geotrellis/store/util/BlockingThreadPool.scala
deleted file mode 100644
index db5cac89e1..0000000000
--- a/store/src/main/scala/geotrellis/store/util/BlockingThreadPool.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2019 Azavea
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package geotrellis.store.util - -import org.apache.commons.lang3.concurrent.BasicThreadFactory -import pureconfig._ - -import java.util.concurrent.{ExecutorService, Executors} -import scala.concurrent.ExecutionContext -import scala.util.{Failure, Success, Try} - -object BlockingThreadPool extends Serializable { - case class Config(threads: Int = Runtime.getRuntime.availableProcessors) - - implicit val configReader: ConfigReader[Config] = ConfigReader.fromCursor[Config] { cur => - cur.fluent.at("threads").asString match { - case Right("default") => Right(Config()) - case Right(th) => Try(th.toInt) match { - case Success(threads) => Right(Config(threads)) - case Failure(_) => Right(Config()) - } - case Left(_) => Right(Config()) - } - } - - lazy val conf: Config = ConfigSource.default.at("geotrellis.blocking-thread-pool").loadOrThrow[Config] - implicit def blockingThreadPoolToConf(obj: BlockingThreadPool.type): Config = conf - - @transient lazy val pool: ExecutorService = - Executors.newFixedThreadPool( - conf.threads, - new BasicThreadFactory.Builder().namingPattern("geotrellis-default-io-%d").build() - ) - @transient lazy val executionContext: ExecutionContext = ExecutionContext.fromExecutor(pool) -} diff --git a/store/src/main/scala/geotrellis/store/util/IORuntimeTransient.scala b/store/src/main/scala/geotrellis/store/util/IORuntimeTransient.scala new file mode 100644 index 0000000000..de847b5d37 --- /dev/null +++ b/store/src/main/scala/geotrellis/store/util/IORuntimeTransient.scala @@ -0,0 +1,24 @@ +/* + * Copyright 2019 Azavea + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package geotrellis.store.util
+
+import cats.effect._
+
+object IORuntimeTransient extends Serializable {
+  val ThreadsNumber: Int = Runtime.getRuntime.availableProcessors
+  @transient lazy val IORuntime: unsafe.IORuntime = unsafe.IORuntime.global
+}
diff --git a/store/src/main/scala/geotrellis/store/util/IOUtils.scala b/store/src/main/scala/geotrellis/store/util/IOUtils.scala
index 5c51327fab..79df3f6419 100644
--- a/store/src/main/scala/geotrellis/store/util/IOUtils.scala
+++ b/store/src/main/scala/geotrellis/store/util/IOUtils.scala
@@ -17,12 +17,12 @@
 package geotrellis.store.util
 
 import cats.effect._
+import cats.effect.syntax.temporal._
 import cats.syntax.apply._
 import cats.syntax.either._
 import cats.syntax.applicativeError._
-import cats.{ApplicativeError}
+import cats.ApplicativeError
 
-import scala.concurrent.ExecutionContext
 import scala.concurrent.duration._
 import scala.util.Random
 
@@ -42,8 +42,8 @@ object IOUtils {
         val actualDelay = FiniteDuration(timeout.toMillis, MILLISECONDS)
 
         ioa.handleErrorWith { error: Throwable =>
-          if(p(error)) Temporal[F].sleep(actualDelay) *> help(count + 1)
-          else Async[F].raiseError(error)
+          if(p(error)) help(count + 1).delayBy(actualDelay)
+          else error.raiseError
         }
       }
       help(0)
@@ -52,25 +52,23 @@ object IOUtils {
 
   def parJoin[K, V](ranges: Iterator[(BigInt, BigInt)])
                    (readFunc: BigInt => Vector[(K, V)])
-                   (implicit ec: ExecutionContext): Vector[(K, V)] =
+                   (implicit runtime: unsafe.IORuntime): Vector[(K, V)] =
     parJoinEBO[K, V](ranges)(readFunc)(_ => false)
 
   private[geotrellis] def parJoinEBO[K, V](ranges: Iterator[(BigInt, BigInt)])
                       (readFunc: BigInt => Vector[(K, V)])
                       (backOffPredicate: Throwable => Boolean)
-                      (implicit ec: ExecutionContext): Vector[(K, V)] = {
+                      (implicit runtime: unsafe.IORuntime): Vector[(K, V)] = {
     val indices: Iterator[BigInt] = ranges.flatMap { case (start, end) => (start to end).iterator }
-    val index: fs2.Stream[IO, BigInt] = fs2.Stream.fromIterator[IO](indices, 1)
+    val index: fs2.Stream[IO, BigInt] = fs2.Stream.fromIterator[IO](indices, chunkSize = 1)
 
     val readRecord: BigInt => fs2.Stream[IO, Vector[(K, V)]] = { index =>
-      fs2.Stream eval IO { readFunc(index) }.retryEBO { backOffPredicate }
+      fs2.Stream eval IO.blocking { readFunc(index) }.retryEBO { backOffPredicate }
     }
 
-    // TODO: runime should be configured
-    import cats.effect.unsafe.implicits.global
     index
       .map(readRecord)
       .parJoinUnbounded