
Init migration to CE3
pomadchin committed May 4, 2021
1 parent cdcac99 commit b7a1ea7
Showing 11 changed files with 67 additions and 62 deletions.
@@ -119,7 +119,8 @@ class SocketWriteStrategy(
kvPairs.foreachPartition { partition =>
if(partition.nonEmpty) {
implicit val ec = executionContext
implicit val cs = IO.contextShift(ec)
// TODO: runtime should be configured
import cats.effect.unsafe.implicits.global

val writer = instance.connector.createBatchWriter(table, kwConfig.value)

@@ -129,10 +130,10 @@ class SocketWriteStrategy(
val mutation = new Mutation(key.getRow)
mutation.put(key.getColumnFamily, key.getColumnQualifier, System.currentTimeMillis(), value)
mutation
}
}, 1
)

val write = { mutation: Mutation => fs2.Stream eval IO.shift(ec) *> IO { writer.addMutation(mutation) } }
val write = { mutation: Mutation => fs2.Stream eval IO { writer.addMutation(mutation) } }

(mutations map write)
.parJoinUnbounded
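
The hunk above swaps the per-partition IO.contextShift(ec) for the global CE3 runtime, and the new TODO notes that the runtime should eventually be configured. A minimal sketch of what that configuration could look like, assuming a hand-built IORuntime; the runtimeFromEC helper and pool choices below are illustrative, not part of this commit:

import cats.effect.IO
import cats.effect.unsafe.{IORuntime, IORuntimeConfig, Scheduler}

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

object ConfiguredRuntimeSketch {
  // Hypothetical helper: build an IORuntime around the ExecutionContext the write
  // strategy already carries, instead of importing cats.effect.unsafe.implicits.global.
  def runtimeFromEC(compute: ExecutionContext): IORuntime = {
    val (scheduler, shutdownScheduler) = Scheduler.createDefaultScheduler()
    IORuntime(
      compute,                                                          // compute pool
      ExecutionContext.fromExecutor(Executors.newCachedThreadPool()),   // blocking pool
      scheduler,
      shutdownScheduler,                                                // shuts down only the scheduler in this sketch
      IORuntimeConfig()
    )
  }

  def main(args: Array[String]): Unit = {
    implicit val runtime: IORuntime = runtimeFromEC(ExecutionContext.global)
    IO(println("running on a configured runtime")).unsafeRunSync()
    runtime.shutdown()
  }
}
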
@@ -51,11 +51,12 @@ object AccumuloCollectionReader {
val ranges = queryKeyBounds.flatMap(decomposeBounds).iterator

implicit val ec = executionContext
implicit val cs = IO.contextShift(ec)
// TODO: runtime should be configured
import cats.effect.unsafe.implicits.global

val range: fs2.Stream[IO, AccumuloRange] = fs2.Stream.fromIterator[IO](ranges)
val range: fs2.Stream[IO, AccumuloRange] = fs2.Stream.fromIterator[IO](ranges, 1)

val read = { range: AccumuloRange => fs2.Stream eval IO.shift(ec) *> IO {
val read = { range: AccumuloRange => fs2.Stream eval IO {
val scanner = instance.connector.createScanner(table, new Authorizations())
scanner.setRange(range)
scanner.fetchColumnFamily(columnFamily)
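
The other change in this reader is the extra argument to fs2.Stream.fromIterator: fs2 3.x requires an explicit chunk size, and the commit passes 1 to keep the previous element-at-a-time behaviour. A self-contained sketch of the new signature, with made-up iterator contents:

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

object FromIteratorSketch {
  def main(args: Array[String]): Unit = {
    val ranges = Iterator.tabulate(5)(i => s"range-$i")

    // fs2 3.x: fromIterator takes the iterator plus a chunk size; 1 mirrors the
    // old fs2 2.x behaviour of pulling one element at a time.
    val stream: Stream[IO, String] = Stream.fromIterator[IO](ranges, 1)

    println(stream.compile.toList.unsafeRunSync())
  }
}
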
@@ -110,14 +110,15 @@ object CassandraRDDWriter {

val rows: fs2.Stream[IO, (BigInt, Vector[(K,V)])] =
fs2.Stream.fromIterator[IO](
partition.map { case (key, value) => (key, value.toVector) }
partition.map { case (key, value) => (key, value.toVector) }, 1
)

implicit val ec = executionContext
implicit val cs = IO.contextShift(ec)
// TODO: runtime should be configured
import cats.effect.unsafe.implicits.global

def elaborateRow(row: (BigInt, Vector[(K,V)])): fs2.Stream[IO, (BigInt, Vector[(K,V)])] = {
fs2.Stream eval IO.shift(ec) *> IO ({
fs2.Stream eval IO {
val (key, current) = row
val updated = LayerWriter.updateRecords(mergeFunc, current, existing = {
val oldRow = session.execute(readStatement.bind(key: BigInteger))
@@ -129,22 +130,22 @@ object CassandraRDDWriter {
})

(key, updated)
})
}
}

def rowToBytes(row: (BigInt, Vector[(K,V)])): fs2.Stream[IO, (BigInt, ByteBuffer)] = {
fs2.Stream eval IO.shift(ec) *> IO ({
fs2.Stream eval IO {
val (key, kvs) = row
val bytes = ByteBuffer.wrap(AvroEncoder.toBinary(kvs)(codec))
(key, bytes)
})
}
}

def retire(row: (BigInt, ByteBuffer)): fs2.Stream[IO, ResultSet] = {
val (id, value) = row
fs2.Stream eval IO.shift(ec) *> IO ({
fs2.Stream eval IO {
session.execute(writeStatement.bind(id: BigInteger, value))
})
}
}

val results = rows
@@ -153,7 +154,7 @@ object CassandraRDDWriter {
.map(retire)
.parJoinUnbounded
.onComplete {
fs2.Stream eval IO.shift(ec) *> IO {
fs2.Stream eval IO {
session.closeAsync()
session.getCluster.closeAsync()
}
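
Each stage above drops the IO.shift(ec) *> prefix, since CE3 removed IO.shift and leaves scheduling to the runtime. If the intent of the old shift was to keep blocking driver calls off the compute pool, IO.blocking is the closest CE3 counterpart; a sketch under that assumption, where execute is a stand-in for the Cassandra session call, not the real driver API:

import cats.effect.IO
import cats.effect.unsafe.implicits.global

object BlockingSketch {
  // Stand-in for a blocking driver call such as session.execute(...)
  def execute(statement: String): String = { Thread.sleep(50); s"applied: $statement" }

  def main(args: Array[String]): Unit = {
    // CE2 wrote: IO.shift(ec) *> IO { execute(...) }
    // CE3: the commit uses plain IO { ... }; IO.blocking additionally tells the
    // runtime to run the effect on its built-in blocking pool.
    val write: IO[String] = IO.blocking(execute("INSERT INTO tiles ..."))

    println(write.unsafeRunSync())
  }
}
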
@@ -28,7 +28,7 @@ import geotrellis.spark.store.hadoop._
import geotrellis.spark.testkit._
import geotrellis.store.hadoop._

import cats.effect.{ContextShift, IO}
import cats.effect.IO
import cats.implicits._
import spire.syntax.cfor._
import org.apache.spark.rdd.RDD
@@ -42,6 +42,8 @@ import scala.concurrent.ExecutionContext

class GDALRasterSourceRDDSpec extends AnyFunSpec with TestEnvironment with BeforeAndAfterAll {
import geotrellis.GDALTestUtils._
// TODO: runtime should be configured
import cats.effect.unsafe.implicits.global

val uri = gdalGeoTiffPath("vlm/aspect-tiled.tif")
def filePathByIndex(i: Int): String = sparkGeoTiffPath(s"vlm/aspect-tiled-$i.tif")
@@ -209,7 +211,7 @@ class GDALRasterSourceRDDSpec extends AnyFunSpec with TestEnvironment with Befor
assertRDDLayersEqual(reprojectedExpectedRDDGDAL, reprojectedSourceRDD, true)
}

def parallelSpec(n: Int = 1000)(implicit cs: ContextShift[IO]): List[RasterSource] = {
def parallelSpec(n: Int = 1000): List[RasterSource] = {
println(java.lang.Thread.activeCount())

/** Functions to trigger Datasets computation */
@@ -277,7 +279,6 @@ class GDALRasterSourceRDDSpec extends AnyFunSpec with TestEnvironment with Befor

it("should not fail on parallelization with a fork join pool") {
val i = 1000
implicit val cs = IO.contextShift(ExecutionContext.global)

parallelSpec(i)
}
@@ -287,7 +288,6 @@ class GDALRasterSourceRDDSpec extends AnyFunSpec with TestEnvironment with Befor
val n = 100
val pool = Executors.newFixedThreadPool(n)
val ec = ExecutionContext.fromExecutor(pool)
implicit val cs = IO.contextShift(ec)

parallelSpec(i)

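
With CE3 the spec no longer threads a ContextShift[IO] into parallelSpec; Parallel[IO] is available as soon as a runtime is in scope. A trimmed-down sketch of that shape, where the body is a placeholder rather than the GDAL logic of the real test:

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import cats.syntax.parallel._

object ParallelSpecSketch {
  // CE2 signature: def parallelSpec(n: Int)(implicit cs: ContextShift[IO]): List[...]
  // CE3 signature: no ContextShift parameter is needed.
  def parallelSpec(n: Int = 1000): List[Int] =
    (1 to n).toList
      .parTraverse(i => IO(i * 2))
      .unsafeRunSync()

  def main(args: Array[String]): Unit =
    println(parallelSpec(10))
}
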
@@ -33,6 +33,9 @@ import org.slf4j.LoggerFactory
object IngestGeoTiff {
private val logger = LoggerFactory.getLogger(this.getClass())

// TODO: runtime should be configured
import cats.effect.unsafe.implicits.global

def apply(params: IngestParameters[TilingBounds]): List[WriteResults] =
sync[IO](params).unsafeRunSync()

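
IngestGeoTiff.apply ends in unsafeRunSync(), which under cats-effect 3 requires an implicit cats.effect.unsafe.IORuntime in scope; the object-level import above supplies the global one. A minimal illustration of that requirement:

import cats.effect.IO

object UnsafeRunSketch {
  // Without this import (or some other implicit IORuntime), the unsafeRunSync()
  // below does not compile under cats-effect 3.
  import cats.effect.unsafe.implicits.global

  def apply(): Int = IO(21 * 2).unsafeRunSync()

  def main(args: Array[String]): Unit = println(apply())
}
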
8 changes: 4 additions & 4 deletions project/Dependencies.scala
@@ -47,8 +47,8 @@ object Dependencies {

def cats(module: String) = Def.setting {
module match {
case "effect" => "org.typelevel" %% s"cats-$module" % "2.3.3"
case _ => "org.typelevel" %% s"cats-$module" % "2.4.2"
case "effect" => "org.typelevel" %% s"cats-$module" % "3.1.0"
case _ => "org.typelevel" %% s"cats-$module" % "2.6.0"
}
}

@@ -60,7 +60,7 @@
}

def fs2(module: String) = Def.setting {
"co.fs2" %% s"fs2-$module" % "2.5.3"
"co.fs2" %% s"fs2-$module" % "3.0.2"
}

def apacheSpark(module: String) = Def.setting {
@@ -69,7 +69,7 @@

def scalaReflect(version: String) = "org.scala-lang" % "scala-reflect" % version

val pureconfig = "com.github.pureconfig" %% "pureconfig" % "0.14.0"
val pureconfig = "com.github.pureconfig" %% "pureconfig" % "0.15.0"
val log4s = "org.log4s" %% "log4s" % "1.9.0"
val scalatest = "org.scalatest" %% "scalatest" % "3.2.5"
val scalacheck = "org.scalacheck" %% "scalacheck" % "1.15.2"
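
The version bumps move cats-effect to the 3.x line and fs2 to 3.0.2, which is built against cats-effect 3, so the two have to move together. An illustrative build.sbt fragment with the resulting coordinates; it only restates the versions above and is not part of this commit:

// Illustrative build.sbt fragment, not part of this commit.
libraryDependencies ++= Seq(
  "org.typelevel" %% "cats-core"   % "2.6.0",
  "org.typelevel" %% "cats-effect" % "3.1.0",
  "co.fs2"        %% "fs2-core"    % "3.0.2" // fs2 3.x requires cats-effect 3.x
)
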
@@ -80,16 +80,15 @@ class S3RDDWriter(
val schema = kwWriterSchema.value.getOrElse(_recordCodec.schema)

implicit val ec = excutionContext
implicit val timer = IO.timer(ec)
implicit val cs = IO.contextShift(ec)

// TODO: runtime should be configured
import cats.effect.unsafe.implicits.global

val rows: fs2.Stream[IO, (String, Vector[(K, V)])] =
fs2.Stream.fromIterator[IO](
partition.map { case (key, value) => (key, value.toVector) }
)
fs2.Stream.fromIterator[IO](partition.map { case (key, value) => (key, value.toVector) }, 1)

def elaborateRow(row: (String, Vector[(K,V)])): fs2.Stream[IO, (String, Vector[(K,V)])] = {
fs2.Stream eval IO.shift(ec) *> IO ({
fs2.Stream eval IO {
val (key, current) = row
val updated = LayerWriter.updateRecords(mergeFunc, current, existing = {
try {
@@ -106,11 +105,11 @@ class S3RDDWriter(
}
})
(key, updated)
})
}
}

def rowToRequest(row: (String, Vector[(K,V)])): fs2.Stream[IO, (PutObjectRequest, RequestBody)] = {
fs2.Stream eval IO.shift(ec) *> IO ({
fs2.Stream eval IO {
val (key, kvs) = row
val contentBytes = AvroEncoder.toBinary(kvs)(_codec)
val request = PutObjectRequest.builder()
Expand All @@ -121,11 +120,11 @@ class S3RDDWriter(
val requestBody = RequestBody.fromBytes(contentBytes)

(putObjectModifier(request), requestBody)
})
}
}

def retire(request: PutObjectRequest, requestBody: RequestBody): fs2.Stream[IO, PutObjectResponse] =
fs2.Stream eval IO.shift(ec) *> IO { s3Client.putObject(request, requestBody) }
fs2.Stream eval IO { s3Client.putObject(request, requestBody) }

rows
.flatMap(elaborateRow)
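
After the migration the S3 writer keeps its pipeline shape: rows come from fromIterator, each stage returns an inner stream, and parJoinUnbounded runs the writes concurrently on the runtime where CE2 used to interleave IO.shift(ec). A runnable sketch of that shape, with println stand-ins instead of the real S3 requests:

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

object WriterPipelineSketch {
  def main(args: Array[String]): Unit = {
    val rows: Stream[IO, (String, Vector[Int])] =
      Stream.fromIterator[IO](Iterator("a" -> Vector(1), "b" -> Vector(2, 3)), 1)

    // Stand-ins for elaborateRow / rowToRequest / retire above.
    def elaborateRow(row: (String, Vector[Int])): Stream[IO, (String, Vector[Int])] =
      Stream.eval(IO { val (k, vs) = row; (k, vs :+ 0) })

    def rowToRequest(row: (String, Vector[Int])): Stream[IO, String] =
      Stream.eval(IO(s"PUT ${row._1} (${row._2.size} values)"))

    def retire(request: String): Stream[IO, Unit] =
      Stream.eval(IO(println(request)))

    rows
      .flatMap(elaborateRow)
      .flatMap(rowToRequest)
      .map(retire)          // Stream[IO, Stream[IO, Unit]]
      .parJoinUnbounded     // concurrent "writes", scheduled by the runtime
      .compile
      .drain
      .unsafeRunSync()
  }
}
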
@@ -86,17 +86,18 @@ object SaveToS3 {
val requestBody = RequestBody.fromBytes(bytes)

(putObjectModifier(request), requestBody)
}
}, 1
)

implicit val ec = executionContext
implicit val timer = IO.timer(ec)
implicit val cs = IO.contextShift(ec)

// TODO: runtime should be configured
import cats.effect.unsafe.implicits.global

import geotrellis.store.util.IOUtils._
val write: (PutObjectRequest, RequestBody) => fs2.Stream[IO, PutObjectResponse] =
(request, requestBody) => {
fs2.Stream eval IO.shift(ec) *> IO {
fs2.Stream eval IO {
//request.getInputStream.reset() // reset in case of retransmission to avoid 400 error
s3client.putObject(request, requestBody)
}.retryEBO {
@@ -47,7 +47,9 @@ import scala.reflect.ClassTag
val strategy: OverviewStrategy

implicit val ec: ExecutionContext
implicit val cs = IO.contextShift(ec)

// TODO: runtime should be configured
import cats.effect.unsafe.implicits.global

@experimental def read[V <: CellGrid[Int]: GeoTiffReader: ClassTag]
(layerId: LayerId)
@@ -64,10 +66,10 @@ import scala.reflect.ClassTag
val keyExtent: Extent = mapTransform(SpatialKey(x, y))

val index: fs2.Stream[IO, GeoTiffMetadata] =
fs2.Stream.fromIterator[IO](attributeStore.query(layerId.name, ProjectedExtent(keyExtent, layoutScheme.crs)).toIterator)
fs2.Stream.fromIterator[IO](attributeStore.query(layerId.name, ProjectedExtent(keyExtent, layoutScheme.crs)).toIterator, 1)

val readRecord: GeoTiffMetadata => fs2.Stream[IO, Option[Raster[V]]] = { md =>
fs2.Stream eval IO.shift(ec) *> IO {
fs2.Stream eval IO {
val tiff = GeoTiffReader[V].read(RangeReader(md.uri), streaming = true)
val reprojectedKeyExtent = keyExtent.reproject(layoutScheme.crs, tiff.crs)

@@ -104,10 +106,10 @@ import scala.reflect.ClassTag
.layout

val index: fs2.Stream[IO, GeoTiffMetadata] =
fs2.Stream.fromIterator[IO](attributeStore.query(layerId.name).toIterator)
fs2.Stream.fromIterator[IO](attributeStore.query(layerId.name).toIterator, 1)

val readRecord: GeoTiffMetadata => fs2.Stream[IO, Raster[V]] = { md =>
fs2.Stream eval IO.shift(ec) *> IO {
fs2.Stream eval IO {
val tiff = GeoTiffReader[V].read(RangeReader(md.uri), streaming = true)
tiff
.crop(tiff.extent, layout.cellSize)
15 changes: 5 additions & 10 deletions store/src/main/scala/geotrellis/store/AsyncWriter.scala
@@ -40,13 +40,7 @@ abstract class AsyncWriter[Client, V, E](executionContext: => ExecutionContext =
retryFunc: Option[Throwable => Boolean]
): Unit = {
if (partition.isEmpty) return

// TODO: remove the implicit on ec and consider moving the implicit timer to method signature
implicit val ec = executionContext
implicit val timer = IO.timer(ec)
implicit val cs = IO.contextShift(ec)

val rows: fs2.Stream[IO, (String, V)] = fs2.Stream.fromIterator[IO](partition)
val rows: fs2.Stream[IO, (String, V)] = fs2.Stream.fromIterator[IO](partition, 1)

def elaborateRow(row: (String, V)): fs2.Stream[IO, (String, V)] = {
val foldUpdate: ((String, V)) => (String, V) = { case newRecord @ (key, newValue) =>
@@ -61,21 +55,22 @@ abstract class AsyncWriter[Client, V, E](executionContext: => ExecutionContext =
}
}

fs2.Stream eval IO.shift(ec) *> IO(foldUpdate(row))
fs2.Stream eval /*IO.shift(ec) *>*/ IO(foldUpdate(row))
}

def encode(row: (String, V)): fs2.Stream[IO, (String, E)] = {
val (key, value) = row
val encodeTask = IO((key, encodeRecord(key, value)))
fs2.Stream eval IO.shift(ec) *> encodeTask
fs2.Stream eval /*IO.shift(ec) *>*/ encodeTask
}

def retire(row: (String, E)): fs2.Stream[IO, Try[Long]] = {
val writeTask = IO(writeRecord(client, row._1, row._2))
import geotrellis.store.util.IOUtils._
fs2.Stream eval IO.shift(ec) *> retryFunc.fold(writeTask)(writeTask.retryEBO(_))
fs2.Stream eval /*IO.shift(ec) *>*/ retryFunc.fold(writeTask)(writeTask.retryEBO(_))
}

import cats.effect.unsafe.implicits.global
rows
.flatMap(elaborateRow)
.flatMap(encode)
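
In AsyncWriter the IO.shift(ec) calls are only commented out, and the global runtime import sits directly before .compile. If the whole write pipeline should still run on the writer's own ExecutionContext, one CE3 option is to evalOn the compiled effect; a sketch under that assumption, with a made-up pool size and data:

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

object AsyncWriterEvalOnSketch {
  def main(args: Array[String]): Unit = {
    val pool       = Executors.newFixedThreadPool(2)
    val writerPool = ExecutionContext.fromExecutor(pool)

    val rows: Stream[IO, (String, Int)] =
      Stream.fromIterator[IO](Iterator("a" -> 1, "b" -> 2, "c" -> 3), 1)

    // CE2 shifted each stage with IO.shift(ec); in CE3 the closest whole-pipeline
    // equivalent is to pin the compiled IO to the pool with evalOn.
    val pipeline: IO[Unit] =
      rows
        .evalMap { case (k, v) => IO(println(s"write $k -> $v on ${Thread.currentThread().getName}")) }
        .compile
        .drain
        .evalOn(writerPool)

    pipeline.unsafeRunSync()
    pool.shutdown()
  }
}
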
22 changes: 12 additions & 10 deletions store/src/main/scala/geotrellis/store/util/IOUtils.scala
@@ -17,7 +17,10 @@
package geotrellis.store.util

import cats.effect._
import cats.syntax.all._
import cats.syntax.apply._
import cats.syntax.either._
import cats.syntax.applicativeError._
import cats.{ApplicativeError}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
@@ -27,7 +30,7 @@ object IOUtils {
/**
* Implement non-blocking Exponential Backoff on a Task.
*/
implicit class IOBackoff[A, F[_]: Effect: Timer: Sync](ioa: F[A]) {
implicit class IOBackoff[A, F[_]: ApplicativeError[*[_], Throwable]: Temporal: Async](ioa: F[A]) {
/**
* @param p returns true for exceptions that trigger a backoff and retry
* @return
@@ -38,9 +41,9 @@
val timeout = base * Random.nextInt(math.pow(2, count).toInt) // .nextInt is [0, n), implying at most n - 1
val actualDelay = FiniteDuration(timeout.toMillis, MILLISECONDS)

ioa.handleErrorWith { error =>
if(p(error)) implicitly[Timer[F]].sleep(actualDelay) *> help(count + 1)
else implicitly[Sync[F]].raiseError(error)
ioa.handleErrorWith { error: Throwable =>
if(p(error)) Temporal[F].sleep(actualDelay) *> help(count + 1)
else Async[F].raiseError(error)
}
}
help(0)
@@ -56,19 +59,18 @@
(readFunc: BigInt => Vector[(K, V)])
(backOffPredicate: Throwable => Boolean)
(implicit ec: ExecutionContext): Vector[(K, V)] = {
implicit val timer = IO.timer(ec)
implicit val cs = IO.contextShift(ec)

val indices: Iterator[BigInt] = ranges.flatMap { case (start, end) =>
(start to end).iterator
}

val index: fs2.Stream[IO, BigInt] = fs2.Stream.fromIterator[IO](indices)
val index: fs2.Stream[IO, BigInt] = fs2.Stream.fromIterator[IO](indices, 1)

val readRecord: BigInt => fs2.Stream[IO, Vector[(K, V)]] = { index =>
fs2.Stream eval IO.shift(ec) *> IO { readFunc(index) }.retryEBO { backOffPredicate }
fs2.Stream eval IO { readFunc(index) }.retryEBO { backOffPredicate }
}

// TODO: runtime should be configured
import cats.effect.unsafe.implicits.global
index
.map(readRecord)
.parJoinUnbounded
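
retryEBO now asks for Temporal/Async constraints instead of CE2's Effect/Timer, and sleeps through the runtime rather than an implicit Timer[F]. A trimmed-down, self-contained version of the same exponential backoff written only against Temporal[F]; the flaky effect is fabricated for the demo and the constant 52 ms base mirrors nothing in the real code:

import cats.effect.{IO, Temporal}
import cats.effect.unsafe.implicits.global
import cats.syntax.all._

import scala.concurrent.duration._
import scala.util.Random

object BackoffSketch {
  // Exponential backoff: retry fa while p(error) holds, doubling the delay window each attempt.
  def retryEBO[F[_], A](fa: F[A])(p: Throwable => Boolean)(implicit F: Temporal[F]): F[A] = {
    val base = 52.milliseconds
    def help(count: Int): F[A] = {
      val delay = base * (Random.nextInt(math.pow(2, count).toInt) + 1)
      fa.handleErrorWith { error =>
        if (p(error)) F.sleep(delay) *> help(count + 1)
        else F.raiseError(error)
      }
    }
    help(0)
  }

  def main(args: Array[String]): Unit = {
    var attempts = 0
    val flaky = IO { attempts += 1; if (attempts < 3) sys.error("transient") else attempts }

    // Succeeds on the third attempt, sleeping between retries via Temporal[IO].
    println(retryEBO(flaky)(_ => true).unsafeRunSync())
  }
}
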
