Migration to CE3 and other major dependencies upgrade (#3389)
* Migration to CE3

* Bump dependency versions

* Bump Cassandra and Spark versions up

* Fix invalid vector tests

* Use Cassandra async execution to avoid using blocking API

* Revert graceful JTS fallback and lazy Circe encoders

* Bump SBT version up to 1.7.1

* Remove the unused CassandraAttributeStore.fetch method

* Make Cassandra toBigInteger cast less ambiguous

* CHANGELOG.md update
pomadchin authored Jul 16, 2022
1 parent 41678b0 commit 0276d78
Showing 71 changed files with 683 additions and 797 deletions.
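The change that recurs across most of these 71 files is the swap from cats-effect 2 primitives (an implicit `ExecutionContext` plus `ContextShift` and `IO.shift`) to cats-effect 3 (`unsafe.IORuntime`, `IO.blocking`) together with the fs2 3.x stream constructors. Below is a minimal sketch of that mapping, assuming cats-effect 3.x and fs2 3.x on the classpath; it uses the global runtime and illustrative names (`items`, `readBlocking`) rather than GeoTrellis's own `IORuntimeTransient`.

```scala
import cats.effect.{IO, unsafe}

object Ce3MigrationSketch {
  // CE3 has no ContextShift/Timer; an IORuntime (compute + blocking pools + scheduler)
  // is threaded through instead. The global runtime stands in for IORuntimeTransient.IORuntime.
  implicit val runtime: unsafe.IORuntime = unsafe.IORuntime.global

  // CE2: IO.shift(blockingEc) *> IO { blockingCall() }
  // CE3: IO.blocking runs the thunk on the runtime's blocking pool.
  def readBlocking(i: Int): IO[Int] = IO.blocking(i * 2 /* stand-in for a blocking call */)

  def run(items: Iterator[Int]): List[Int] =
    fs2.Stream
      .fromBlockingIterator[IO](items, chunkSize = 1) // fs2 3.x requires an explicit chunk size
      .evalMap(readBlocking)
      .compile
      .toList
      .unsafeRunSync()                                // unsafeRunSync now needs an implicit IORuntime
}
```

The file-by-file diffs that follow are, for the most part, instances of exactly this substitution.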
2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -57,7 +57,7 @@ workflows:
- scaladocs:
matrix:
parameters:
scala-version: ["2.12.15"]
scala-version: ["2.12.16"]
filters:
branches:
only: master
2 changes: 1 addition & 1 deletion .github/docker-compose.yml
@@ -15,7 +15,7 @@ services:
network_mode: host

cassandra:
image: cassandra:3.11.6
image: cassandra:4.0.4
environment:
- _JAVA_OPTIONS=-Xms1m -Xmx512m
- MAX_HEAP_SIZE=512m
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -14,7 +14,7 @@ jobs:
name: Build and Test
strategy:
matrix:
scala: ["2.12.15", "2.13.7"]
scala: ["2.12.16", "2.13.8"]
runs-on: ubuntu-latest

env:
7 changes: 6 additions & 1 deletion CHANGELOG.md
@@ -7,7 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- Add RasterSourceRDD.tiledLayerRDD within the geometry [#3474](https://github.com/locationtech/geotrellis/pull/3474)
- Add RasterSourceRDD.tiledLayerRDD within the geometry intersection [#3474](https://github.com/locationtech/geotrellis/pull/3474)

### Changed
- Migration to CE3 and other major dependencies upgrade [#3389](https://github.com/locationtech/geotrellis/pull/3389)
- Revert graceful JTS fall back and lazy Circe encoders [#3463](https://github.com/locationtech/geotrellis/issues/3463)
- Update Cassandra up to 4.x [#3382](https://github.com/locationtech/geotrellis/issues/3382)

## [3.6.3] - 2022-07-12

@@ -19,7 +19,7 @@ package geotrellis.spark.store.accumulo
import geotrellis.store.accumulo._
import geotrellis.store.hadoop.util._
import geotrellis.spark.util._
import geotrellis.store.util.BlockingThreadPool
import geotrellis.store.util.IORuntimeTransient

import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.fs.Path
@@ -28,14 +28,11 @@ import org.apache.accumulo.core.data.{Key, Mutation, Value}
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat
import org.apache.accumulo.core.client.BatchWriterConfig

import cats.effect.IO
import cats.syntax.apply._
import cats.effect._
import cats.syntax.either._

import java.util.UUID

import scala.concurrent.ExecutionContext

object AccumuloWriteStrategy {
def DEFAULT = HdfsWriteStrategy("/geotrellis-ingest")
}
@@ -110,29 +107,28 @@ object HdfsWriteStrategy {
* @param config Configuration for the BatchWriters
*/
class SocketWriteStrategy(
@transient config: BatchWriterConfig = new BatchWriterConfig().setMaxMemory(128*1024*1024).setMaxWriteThreads(BlockingThreadPool.threads),
executionContext: => ExecutionContext = BlockingThreadPool.executionContext
@transient config: BatchWriterConfig = new BatchWriterConfig().setMaxMemory(128*1024*1024).setMaxWriteThreads(IORuntimeTransient.ThreadsNumber),
runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime
) extends AccumuloWriteStrategy {
val kwConfig = KryoWrapper(config) // BatchWriterConfig is not java serializable

def write(kvPairs: RDD[(Key, Value)], instance: AccumuloInstance, table: String): Unit = {
kvPairs.foreachPartition { partition =>
if(partition.nonEmpty) {
implicit val ec = executionContext
implicit val cs = IO.contextShift(ec)
implicit val ioRuntime: unsafe.IORuntime = runtime

val writer = instance.connector.createBatchWriter(table, kwConfig.value)

try {
val mutations: fs2.Stream[IO, Mutation] = fs2.Stream.fromIterator[IO](
val mutations: fs2.Stream[IO, Mutation] = fs2.Stream.fromBlockingIterator[IO](
partition.map { case (key, value) =>
val mutation = new Mutation(key.getRow)
mutation.put(key.getColumnFamily, key.getColumnQualifier, System.currentTimeMillis(), value)
mutation
}
}, chunkSize = 1
)

val write = { mutation: Mutation => fs2.Stream eval IO.shift(ec) *> IO { writer.addMutation(mutation) } }
val write = { mutation: Mutation => fs2.Stream eval IO.blocking { writer.addMutation(mutation) } }

(mutations map write)
.parJoinUnbounded
@@ -149,7 +145,7 @@

object SocketWriteStrategy {
def apply(
config: BatchWriterConfig = new BatchWriterConfig().setMaxMemory(128*1024*1024).setMaxWriteThreads(BlockingThreadPool.threads),
executionContext: => ExecutionContext = BlockingThreadPool.executionContext
): SocketWriteStrategy = new SocketWriteStrategy(config, executionContext)
config: BatchWriterConfig = new BatchWriterConfig().setMaxMemory(128*1024*1024).setMaxWriteThreads(IORuntimeTransient.ThreadsNumber),
runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime
): SocketWriteStrategy = new SocketWriteStrategy(config, runtime)
}
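In `SocketWriteStrategy` the CE2 pair `IO.contextShift(ec)` / `IO.shift(ec) *> IO { ... }` becomes an implicit `unsafe.IORuntime` plus `IO.blocking`, and the partition iterator is consumed through `fromBlockingIterator`. A condensed, hedged sketch of that write loop, with the Accumulo specifics replaced by a stand-in `addMutation` side effect:

```scala
import cats.effect.{IO, unsafe}
import fs2.Stream

// Condensed sketch of the CE3 write path above; `addMutation` stands in for
// BatchWriter.addMutation and `partition` for the RDD partition iterator.
def writePartition[A](partition: Iterator[A])(addMutation: A => Unit)(
  implicit runtime: unsafe.IORuntime
): Unit = {
  // The iterator is pulled on the blocking pool; each write is wrapped in IO.blocking
  // instead of the CE2 IO.shift(ec) *> IO { ... }.
  val mutations: Stream[IO, A] = Stream.fromBlockingIterator[IO](partition, chunkSize = 1)
  val write = { a: A => Stream.eval(IO.blocking(addMutation(a))) }

  (mutations map write).parJoinUnbounded.compile.drain.unsafeRunSync()
}
```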
@@ -19,18 +19,16 @@ package geotrellis.store.accumulo
import geotrellis.layer._
import geotrellis.store.avro.codecs.KeyValueRecordCodec
import geotrellis.store.avro.{AvroEncoder, AvroRecordCodec}
import geotrellis.store.util.BlockingThreadPool
import geotrellis.store.util.IORuntimeTransient
import org.apache.accumulo.core.data.{Range => AccumuloRange}
import org.apache.accumulo.core.security.Authorizations
import org.apache.avro.Schema
import org.apache.hadoop.io.Text

import cats.effect._
import cats.syntax.apply._
import cats.syntax.either._

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext
import scala.reflect.ClassTag

object AccumuloCollectionReader {
@@ -41,7 +39,7 @@
decomposeBounds: KeyBounds[K] => Seq[AccumuloRange],
filterIndexOnly: Boolean,
writerSchema: Option[Schema] = None,
executionContext: => ExecutionContext = BlockingThreadPool.executionContext
runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime
)(implicit instance: AccumuloInstance): Seq[(K, V)] = {
if(queryKeyBounds.isEmpty) return Seq.empty[(K, V)]

@@ -50,12 +48,11 @@

val ranges = queryKeyBounds.flatMap(decomposeBounds).iterator

implicit val ec = executionContext
implicit val cs = IO.contextShift(ec)
implicit val ioRuntime: unsafe.IORuntime = runtime

val range: fs2.Stream[IO, AccumuloRange] = fs2.Stream.fromIterator[IO](ranges)
val range: fs2.Stream[IO, AccumuloRange] = fs2.Stream.fromIterator[IO](ranges, chunkSize = 1)

val read = { range: AccumuloRange => fs2.Stream eval IO.shift(ec) *> IO {
val read = { range: AccumuloRange => fs2.Stream eval IO.blocking {
val scanner = instance.connector.createScanner(table, new Authorizations())
scanner.setRange(range)
scanner.fetchColumnFamily(columnFamily)
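`AccumuloCollectionReader` now takes `runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime` instead of an `ExecutionContext`. The diff does not show `IORuntimeTransient` itself; below is a purely hypothetical sketch of what such a holder could look like, assuming cats-effect 3.3+ (where `IORuntimeBuilder` is available) and a fixed-size blocking pool — the actual GeoTrellis implementation may differ.

```scala
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import cats.effect.unsafe

// Hypothetical stand-in for IORuntimeTransient: exposes a thread count and builds
// a runtime whose blocking pool is bounded to it; compute pool and scheduler use defaults.
object TransientRuntimeSketch {
  val ThreadsNumber: Int = Runtime.getRuntime.availableProcessors()

  def IORuntime: unsafe.IORuntime = {
    val pool = Executors.newFixedThreadPool(ThreadsNumber)
    unsafe.IORuntimeBuilder()
      .setBlocking(ExecutionContext.fromExecutor(pool), () => pool.shutdown())
      .build()
  }
}
```

Callers that want a dedicated pool can pass a runtime built like this through the new `runtime` parameter instead of relying on the default.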
4 changes: 2 additions & 2 deletions build.sbt
@@ -1,9 +1,9 @@
import sbt.Keys._

ThisBuild / versionScheme := Some("semver-spec")
ThisBuild / scalaVersion := "2.12.15"
ThisBuild / scalaVersion := "2.12.16"
ThisBuild / organization := "org.locationtech.geotrellis"
ThisBuild / crossScalaVersions := List("2.12.15", "2.13.8")
ThisBuild / crossScalaVersions := List("2.12.16", "2.13.8")

lazy val root = Project("geotrellis", file("."))
.aggregate(
@@ -22,21 +22,19 @@ import geotrellis.store.cassandra._
import geotrellis.store.avro.codecs.KeyValueRecordCodec
import geotrellis.store.avro.{AvroEncoder, AvroRecordCodec}
import geotrellis.store.index.{IndexRanges, MergeQueue}
import geotrellis.store.util.{BlockingThreadPool, IOUtils}
import geotrellis.store.util.{IORuntimeTransient, IOUtils}
import geotrellis.spark.util.KryoWrapper

import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.querybuilder.QueryBuilder.{eq => eqs}
import cats.effect._
import com.datastax.oss.driver.api.querybuilder.QueryBuilder
import com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal
import org.apache.avro.Schema
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import java.math.BigInteger

import scala.concurrent.ExecutionContext

object CassandraRDDReader {
def read[K: Boundable : AvroRecordCodec : ClassTag, V: AvroRecordCodec : ClassTag](
instance: CassandraInstance,
@@ -48,7 +46,7 @@
filterIndexOnly: Boolean,
writerSchema: Option[Schema] = None,
numPartitions: Option[Int] = None,
executionContext: => ExecutionContext = BlockingThreadPool.executionContext
runtime: => unsafe.IORuntime = IORuntimeTransient.IORuntime
)(implicit sc: SparkContext): RDD[(K, V)] = {
if (queryKeyBounds.isEmpty) return sc.emptyRDD[(K, V)]

@@ -63,35 +61,35 @@

val bins = IndexRanges.bin(ranges, numPartitions.getOrElse(sc.defaultParallelism))

val query = QueryBuilder.select("value")
.from(keyspace, table)
.where(eqs("key", QueryBuilder.bindMarker()))
.and(eqs("name", layerId.name))
.and(eqs("zoom", layerId.zoom))
.toString
val query = QueryBuilder
.selectFrom(keyspace, table)
.column("value")
.whereColumn("key").isEqualTo(QueryBuilder.bindMarker())
.whereColumn("name").isEqualTo(literal(layerId.name))
.whereColumn("zoom").isEqualTo(literal(layerId.zoom))
.asCql()

sc.parallelize(bins, bins.size)
.mapPartitions { partition: Iterator[Seq[(BigInt, BigInt)]] =>
instance.withSession { session =>
implicit val ec = executionContext
implicit val ioRuntime: unsafe.IORuntime = runtime
val statement = session.prepare(query)

val result = partition map { seq =>
IOUtils.parJoin[K, V](seq.iterator) { index: BigInt =>
val row = session.execute(statement.bind(index: BigInteger))
if (row.asScala.nonEmpty) {
val bytes = row.one().getBytes("value").array()
val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec)
if (filterIndexOnly) recs
else recs.filter { row => includeKey(row._1) }
} else Vector.empty
IOUtils.parJoinIO[K, V](seq.iterator) { index: BigInt =>
session.executeF[IO](statement.bind(index.asJava)).map { row =>
if (row.nonEmpty) {
val bytes = row.one().getByteBuffer("value").array()
val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec)
if (filterIndexOnly) recs
else recs.filter { row => includeKey(row._1) }
} else Vector.empty
}
}
}

/** Close partition session */
(result ++ Iterator({
session.closeAsync(); session.getCluster.closeAsync(); Seq.empty[(K, V)]
})).flatten
(result ++ Iterator({ session.closeAsync(); Seq.empty[(K, V)] })).flatten
}
}
}
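On the Cassandra side, the 3.x driver's blocking `session.execute` is replaced by `session.executeF[IO](...)` combined with `IOUtils.parJoinIO`, so reads ride the driver's async API instead of tying up threads. The wrapper itself is not shown in this excerpt; a plausible sketch of how the 4.x async API can be adapted to `IO` (the actual GeoTrellis `executeF` helper may be richer), using only `executeAsync`, `CompletionStage.toCompletableFuture`, and CE3's `IO.fromCompletableFuture`:

```scala
import cats.effect.IO
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.cql.{AsyncResultSet, Statement}

// Adapt the driver's CompletionStage-based async execution to IO without blocking:
// the statement is submitted asynchronously and IO suspends until the result page arrives.
def executeF(session: CqlSession, statement: Statement[_]): IO[AsyncResultSet] =
  IO.fromCompletableFuture(IO(session.executeAsync(statement).toCompletableFuture))
```

The hunk also shows the smaller 4.x API shifts: `QueryBuilder.selectFrom(...).whereColumn(...).isEqualTo(literal(...))` replaces the 3.x `select/from/where(eqs(...))` chain, `getByteBuffer("value")` replaces `getBytes`, and there is no longer a `Cluster` to close alongside the session.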