This repository has been archived by the owner on Apr 19, 2021. It is now read-only.

Update scalatest to 3.1.4 #70

Open: wants to merge 9 commits into base: master
Changes from all commits
3 changes: 0 additions & 3 deletions .circleci/config.yml
@@ -1,9 +1,6 @@
 version: 2
 jobs:
   build:
-    branches:
-      ignore:
-        - master
     environment:
       - JAVA_OPTS: "-XX:ReservedCodeCacheSize=256M -Xms1g -Xmx3g -Xss2m"
       - SBT_TEST_TIME_FACTOR: "2"
1 change: 1 addition & 0 deletions .gitignore
@@ -28,3 +28,4 @@ data/
 .DS_Store
 .credentials
 .gpgCredentials
+/.bsp/sbt.json
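
Note: the new /.bsp/sbt.json entry goes hand in hand with the sbt 1.4.0 bump in project/build.properties below. sbt 1.4 introduced Build Server Protocol support and generates its connection file at .bsp/sbt.json in the project root, so the file should stay out of version control.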
2 changes: 1 addition & 1 deletion .scalafmt.conf
@@ -1,4 +1,4 @@
-version = 2.4.2
+version = 2.7.5
 style = defaultWithAlign
 danglingParentheses = true
 indentOperator = spray
16 changes: 8 additions & 8 deletions README.md
@@ -1,8 +1,8 @@
 # akka-persistence-kafka

-[![CircleCI](https://circleci.com/gh/j5ik2o/akka-persistence-kafka/tree/master.svg?style=shield&circle-token=c809688daf71f6ae582dd2d58cb5518401498373)](https://circleci.com/gh/j5ik2o/akka-persistence-kafka/tree/master)
-[![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.github.j5ik2o/akka-persistence-kafka_2.12/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.github.j5ik2o/akka-persistence-kafka_2.12)
-[![Scaladoc](http://javadoc-badge.appspot.com/com.github.j5ik2o/akka-persistence-kafka_2.12.svg?label=scaladoc)](http://javadoc-badge.appspot.com/com.github.j5ik2o/akka-persistence-kafka_2.12/com/github/j5ik2o/akka/persistence/kafka/index.html?javadocio=true)
+[![CircleCI](https://circleci.com/gh/pawelkaczor/akka-persistence-kafka/tree/master.svg?style=shield&circle-token=c809688daf71f6ae582dd2d58cb5518401498373)](https://circleci.com/gh/pawelkaczor/akka-persistence-kafka/tree/master)
+[![Maven Central](https://maven-badges.herokuapp.com/maven-central/pl.newicom/akka-persistence-kafka_2.12/badge.svg)](https://maven-badges.herokuapp.com/maven-central/pl.newicom/akka-persistence-kafka_2.12)
+[![Scaladoc](http://javadoc-badge.appspot.com/pl.newicom/akka-persistence-kafka_2.12.svg?label=scaladoc)](http://javadoc-badge.appspot.com/pl.newicom/akka-persistence-kafka_2.12/pl/newicom/akka/persistence/kafka/index.html?javadocio=true)
 [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)

 akka-persistence-kafka writes journal and snapshot entries to Kafka.
@@ -27,7 +27,7 @@ resolvers += "Sonatype OSS Release Repository" at "https://oss.sonatype.org/content/repositories/releases/"
 val version = "..."

 libraryDependencies += Seq(
-  "com.github.j5ik2o" %% "akka-persistence-kafka" % version
+  "pl.newicom" %% "akka-persistence-kafka" % version
 )
 ```

@@ -45,9 +45,9 @@ j5ik2o {
   kafka-journal {
     topic-prefix = "journal-"
     # if need customize, default is persistence-id
-    topic-resolver-class-name = "com.github.j5ik2o.akka.persistence.kafka.resolver.KafkaTopicResolver$PersistenceId"
+    topic-resolver-class-name = "pl.newicom.akka.persistence.kafka.resolver.KafkaTopicResolver$PersistenceId"
     # if need customize, default is partition 0
-    partition-resolver-class-name = "com.github.j5ik2o.akka.persistence.kafka.resolver.KafkaPartitionResolver$PartitionZero"
+    partition-resolver-class-name = "pl.newicom.akka.persistence.kafka.resolver.KafkaPartitionResolver$PartitionZero"

     producer {
       kafka-clients {
@@ -65,9 +65,9 @@ j5ik2o {
   kafka-snapshot-store {
     topic-prefix = "snapshot-"
     # if need customize, default is persistence-id
-    topic-resolver-class-name = "com.github.j5ik2o.akka.persistence.kafka.resolver.KafkaTopicResolver$PersistenceId"
+    topic-resolver-class-name = "pl.newicom.akka.persistence.kafka.resolver.KafkaTopicResolver$PersistenceId"
     # if need customize, default is partition 0
-    partition-resolver-class-name = "com.github.j5ik2o.akka.persistence.kafka.resolver.KafkaPartitionResolver$PartitionZero"
+    partition-resolver-class-name = "pl.newicom.akka.persistence.kafka.resolver.KafkaPartitionResolver$PartitionZero"

     producer {
       kafka-clients {
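
Because the configuration above wires resolvers in by fully qualified class name, a custom topic layout only needs a class on the classpath. Below is a minimal sketch of such a resolver; the resolve method name, the KafkaTopic wrapper type, and the Config constructor parameter are assumptions about the trait's shape, so check KafkaTopicResolver in the repository before copying:

```scala
import com.typesafe.config.Config

// Hypothetical custom resolver: route every persistence id to one shared
// topic instead of the default topic-per-persistence-id layout.
// Assumes KafkaTopicResolver, KafkaTopic and PersistenceId are in scope
// from the project's resolver package.
class SingleTopicResolver(config: Config) extends KafkaTopicResolver {
  override def resolve(persistenceId: PersistenceId): KafkaTopic =
    KafkaTopic("journal-all")
}
```

It would then be wired in with `topic-resolver-class-name = "com.example.SingleTopicResolver"`.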
82 changes: 43 additions & 39 deletions build.sbt
@@ -6,8 +6,8 @@ val kafkaVersion = "2.4.1.1"
 val alpakkaKafkaVersion = "2.0.2"

 val coreSettings = Seq(
-  sonatypeProfileName := "com.github.j5ik2o",
-  organization := "com.github.j5ik2o",
+  sonatypeProfileName := "pl.newicom",
+  organization := "pl.newicom",
   scalaVersion := scala213Version,
   crossScalaVersions ++= Seq(scala212Version, scala213Version),
   scalacOptions ++= {
@@ -42,6 +42,10 @@ val coreSettings = Seq(
         <id>j5ik2o</id>
         <name>Junichi Kato</name>
       </developer>
+      <developer>
+        <id>pawelkaczor</id>
+        <name>Pawel Kaczor</name>
+      </developer>
     </developers>
   },
   publishTo := sonatypePublishToBundle.value,
@@ -51,45 +55,45 @@ val coreSettings = Seq(
     Credentials(ivyCredentials) :: Credentials(gpgCredentials) :: Nil
   },
   resolvers ++= Seq(
-      "Sonatype OSS Snapshot Repository" at "https://oss.sonatype.org/content/repositories/snapshots/",
-      "Sonatype OSS Release Repository" at "https://oss.sonatype.org/content/repositories/releases/"
-    ),
+    "Sonatype OSS Snapshot Repository" at "https://oss.sonatype.org/content/repositories/snapshots/",
+    "Sonatype OSS Release Repository" at "https://oss.sonatype.org/content/repositories/releases/"
+  ),
   libraryDependencies ++= Seq(
-      "org.scala-lang" % "scala-reflect" % scalaVersion.value,
-      "com.iheart" %% "ficus" % "1.4.7",
-      "org.slf4j" % "slf4j-api" % "1.7.30",
-      "com.typesafe.akka" %% "akka-stream-kafka" % alpakkaKafkaVersion,
-      "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf",
-      "ch.qos.logback" % "logback-classic" % "1.2.3" % Test,
-      "io.github.embeddedkafka" %% "embedded-kafka" % kafkaVersion % Test
-    ) ++ {
-      CrossVersion.partialVersion(scalaVersion.value) match {
-        case Some((2L, scalaMajor)) if scalaMajor == 13 =>
-          Seq(
-            "com.typesafe.akka" %% "akka-slf4j" % akka26Version,
-            "com.typesafe.akka" %% "akka-stream" % akka26Version,
-            "com.typesafe.akka" %% "akka-persistence" % akka26Version,
-            "com.typesafe.akka" %% "akka-testkit" % akka26Version % Test,
-            "com.typesafe.akka" %% "akka-stream-testkit" % akka26Version % Test,
-            "com.typesafe.akka" %% "akka-persistence-tck" % akka26Version % Test,
-            "org.scalatest" %% "scalatest" % "3.1.1" % Test
-          )
-        case Some((2L, scalaMajor)) if scalaMajor == 12 =>
-          Seq(
-            "org.scala-lang.modules" %% "scala-collection-compat" % "2.1.6",
-            "com.typesafe.akka" %% "akka-slf4j" % akka26Version,
-            "com.typesafe.akka" %% "akka-stream" % akka26Version,
-            "com.typesafe.akka" %% "akka-persistence" % akka26Version,
-            "com.typesafe.akka" %% "akka-testkit" % akka26Version % Test,
-            "com.typesafe.akka" %% "akka-stream-testkit" % akka26Version % Test,
-            "com.typesafe.akka" %% "akka-persistence-tck" % akka26Version % Test,
-            "org.scalatest" %% "scalatest" % "3.1.1" % Test
-          )
-      }
-    },
+    "org.scala-lang" % "scala-reflect" % scalaVersion.value,
+    "com.iheart" %% "ficus" % "1.4.7",
+    "org.slf4j" % "slf4j-api" % "1.7.30",
+    "com.typesafe.akka" %% "akka-stream-kafka" % alpakkaKafkaVersion,
+    "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf",
+    "ch.qos.logback" % "logback-classic" % "1.2.3" % Test,
+    "io.github.embeddedkafka" %% "embedded-kafka" % kafkaVersion % Test
+  ) ++ {
+    CrossVersion.partialVersion(scalaVersion.value) match {
+      case Some((2L, scalaMajor)) if scalaMajor == 13 =>
+        Seq(
+          "com.typesafe.akka" %% "akka-slf4j" % akka26Version,
+          "com.typesafe.akka" %% "akka-stream" % akka26Version,
+          "com.typesafe.akka" %% "akka-persistence" % akka26Version,
+          "com.typesafe.akka" %% "akka-testkit" % akka26Version % Test,
+          "com.typesafe.akka" %% "akka-stream-testkit" % akka26Version % Test,
+          "com.typesafe.akka" %% "akka-persistence-tck" % akka26Version % Test,
+          "org.scalatest" %% "scalatest" % "3.1.4" % Test
+        )
+      case Some((2L, scalaMajor)) if scalaMajor == 12 =>
+        Seq(
+          "org.scala-lang.modules" %% "scala-collection-compat" % "2.1.6",
+          "com.typesafe.akka" %% "akka-slf4j" % akka26Version,
+          "com.typesafe.akka" %% "akka-stream" % akka26Version,
+          "com.typesafe.akka" %% "akka-persistence" % akka26Version,
+          "com.typesafe.akka" %% "akka-testkit" % akka26Version % Test,
+          "com.typesafe.akka" %% "akka-stream-testkit" % akka26Version % Test,
+          "com.typesafe.akka" %% "akka-persistence-tck" % akka26Version % Test,
+          "org.scalatest" %% "scalatest" % "3.1.4" % Test
+        )
+    }
+  },
   PB.targets in Compile := Seq(
-      scalapb.gen() -> (sourceManaged in Compile).value
-    ),
+    scalapb.gen() -> (sourceManaged in Compile).value
+  ),
   parallelExecution in Test := false
 )

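The substantive change in build.sbt is the scalatest bump from 3.1.1 to 3.1.4 in both cross-build branches; the rest of the churn is re-indentation from the scalafmt 2.7.5 upgrade. Since 3.1.4 is a patch release on the 3.1.x line, which already uses the style-specific packages introduced in 3.1.0, existing specs compile unchanged. A generic example of the 3.1.x style (not a test from this repository):

```scala
// scalatest 3.1.x: the style traits live in dedicated packages
// (org.scalatest.flatspec, org.scalatest.matchers.should) since 3.1.0.
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class SumSpec extends AnyFlatSpec with Matchers {
  "a patch-level scalatest bump" should "not require source changes" in {
    List(1, 2, 3).sum shouldBe 6
  }
}
```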
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
-sbt.version = 1.3.12
+sbt.version = 1.4.0
2 changes: 1 addition & 1 deletion project/plugins.sbt
@@ -15,6 +15,6 @@ addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.31")

 addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.13")

-addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.9.2")
+addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.9.4")

 addSbtPlugin("com.jsuereth" % "sbt-pgp" % "2.0.1")
@@ -14,13 +14,13 @@ class JournalSequence(
     journalPartitionResolver: KafkaPartitionResolver
 ) {

-  def readLowestSequenceNrAsync(persistenceId: PersistenceId, fromSequenceNr: Option[Long] = None)(
-      implicit ec: ExecutionContext
+  def readLowestSequenceNrAsync(persistenceId: PersistenceId, fromSequenceNr: Option[Long] = None)(implicit
+      ec: ExecutionContext
   ): Future[Long] =
     Future { readLowestSequenceNr(persistenceId, fromSequenceNr) }

-  def readHighestSequenceNrAsync(persistenceId: PersistenceId, fromSequenceNr: Option[Long] = None)(
-      implicit ec: ExecutionContext
+  def readHighestSequenceNrAsync(persistenceId: PersistenceId, fromSequenceNr: Option[Long] = None)(implicit
+      ec: ExecutionContext
   ): Future[Long] =
     Future { readHighestSequenceNr(persistenceId, fromSequenceNr) }

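This hunk (and the matching ones in the files below) is purely cosmetic: scalafmt 2.7.5 keeps the implicit keyword on the line that opens the parameter list instead of the continuation line. Call sites are unaffected; a hypothetical caller (journalSequence is an assumed instance of the class above) only needs an ExecutionContext in implicit scope:

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

// Hypothetical usage: read the highest stored sequence number for one id.
val highest: Future[Long] =
  journalSequence.readHighestSequenceNrAsync(PersistenceId("user-1"))
```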
@@ -162,15 +162,14 @@ class KafkaJournal(config: Config) extends AsyncWriteJournal with ActorLogging {
         )
       )
     } else
-      ProducerMessage.multi(rowsToWrite.map {
-        case (journal, byteArray) =>
-          new ProducerRecord(
-            resolveTopic(journal.persistenceId),
-            resolvePartition(journal.persistenceId),
-            journal.persistenceId.asString,
-            byteArray,
-            createHeaders(journal)
-          )
+      ProducerMessage.multi(rowsToWrite.map { case (journal, byteArray) =>
+        new ProducerRecord(
+          resolveTopic(journal.persistenceId),
+          resolvePartition(journal.persistenceId),
+          journal.persistenceId.asString,
+          byteArray,
+          createHeaders(journal)
+        )
       }.asJava) // asJava method, Must not modify for 2.12
val future: Future[immutable.Seq[Try[Unit]]] = Source
.single(messages)
@@ -207,10 +206,11 @@ class KafkaJournal(config: Config) extends AsyncWriteJournal with ActorLogging {
   override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = {
     log.debug(s"asyncDeleteMessagesTo($persistenceId, $toSequenceNr): start")
     val future = for {
-      to <- if (toSequenceNr == Long.MaxValue)
-        journalSequence.readHighestSequenceNrAsync(PersistenceId(persistenceId))
-      else
-        Future.successful(toSequenceNr)
+      to <-
+        if (toSequenceNr == Long.MaxValue)
+          journalSequence.readHighestSequenceNrAsync(PersistenceId(persistenceId))
+        else
+          Future.successful(toSequenceNr)
       _ <- Future {
         adminClient
           .deleteRecords(
@@ -279,23 +279,21 @@ class KafkaJournal(config: Config) extends AsyncWriteJournal with ActorLogging {
           .fromBinaryAsync(record.value(), classOf[JournalRow].getName)
           .map(journal => (record, journal.asInstanceOf[JournalRow]))
       }
-      .map {
-        case (record, journal) =>
-          log.debug(s"record = $record, journal = $journal")
-          (
-            record,
-            journal.persistentRepr
-          )
+      .map { case (record, journal) =>
+        log.debug(s"record = $record, journal = $journal")
+        (
+          record,
+          journal.persistentRepr
+        )
       }
-      .map {
-        case (record, persistentRepr) =>
-          log.debug(
-            s"record.offset = ${record.offset()}, persistentRepr.sequenceNr = ${persistentRepr.sequenceNr}, deletedTo = $deletedTo"
-          )
-          if (persistentRepr.sequenceNr <= deletedTo) {
-            log.debug("update: deleted = true")
-            persistentRepr.update(deleted = true)
-          } else persistentRepr
+      .map { case (record, persistentRepr) =>
+        log.debug(
+          s"record.offset = ${record.offset()}, persistentRepr.sequenceNr = ${persistentRepr.sequenceNr}, deletedTo = $deletedTo"
+        )
+        if (persistentRepr.sequenceNr <= deletedTo) {
+          log.debug("update: deleted = true")
+          persistentRepr.update(deleted = true)
+        } else persistentRepr
       }
       .runWith(Sink.foreach { persistentRepr =>
         if (adjustedFrom <= persistentRepr.sequenceNr && persistentRepr.sequenceNr <= adjustedTo) {
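For context on the deletion hunk above: Long.MaxValue is Akka Persistence's "delete everything" sentinel, so the journal first resolves it to the actual highest sequence number, then truncates the backing partition through Kafka's admin API. Stripped of the journal plumbing, the underlying call looks roughly like this (a sketch of the Kafka AdminClient API, not this project's exact code):

```scala
import org.apache.kafka.clients.admin.{ AdminClient, RecordsToDelete }
import org.apache.kafka.common.TopicPartition
import scala.jdk.CollectionConverters._

// deleteRecords truncates each partition up to (but not including) the offset.
def truncate(admin: AdminClient, topic: String, partition: Int, beforeOffset: Long): Unit =
  admin
    .deleteRecords(
      Map(new TopicPartition(topic, partition) -> RecordsToDelete.beforeOffset(beforeOffset)).asJava
    )
    .all()
    .get() // blocking only for the sketch; the journal wraps this in a Future
```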
@@ -21,12 +21,12 @@ trait EncryptionFilter {
 object EncryptionFilter {

   class NonEncryption extends EncryptionFilter {
-    override def encrypt(persistenceId: PersistenceId, bytes: Array[Byte], context: Map[String, AnyRef])(
-        implicit ec: ExecutionContext
+    override def encrypt(persistenceId: PersistenceId, bytes: Array[Byte], context: Map[String, AnyRef])(implicit
+        ec: ExecutionContext
     ): Future[Array[Byte]] = Future.successful(bytes)

-    override def decrypt(bytes: Array[Byte])(
-        implicit ec: ExecutionContext
+    override def decrypt(bytes: Array[Byte])(implicit
+        ec: ExecutionContext
     ): Future[Array[Byte]] = Future.successful(bytes)
   }

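The NonEncryption pass-through above doubles as documentation of the extension point: an EncryptionFilter simply maps bytes to bytes inside a Future. A minimal AES/GCM sketch against the same signatures; the in-memory key and the IV-prepended ciphertext layout are simplifying assumptions, not the project's scheme, and EncryptionFilter and PersistenceId are assumed to be in scope:

```scala
import java.security.SecureRandom
import javax.crypto.Cipher
import javax.crypto.spec.{ GCMParameterSpec, SecretKeySpec }
import scala.concurrent.{ ExecutionContext, Future }

// Sketch only: fixed in-memory key, 12-byte IV prepended to the ciphertext.
// A production filter would source the key from configuration or a KMS.
class AesGcmEncryptionFilter(key: Array[Byte]) extends EncryptionFilter {
  private val random = new SecureRandom()

  override def encrypt(persistenceId: PersistenceId, bytes: Array[Byte], context: Map[String, AnyRef])(implicit
      ec: ExecutionContext
  ): Future[Array[Byte]] = Future {
    val iv = new Array[Byte](12)
    random.nextBytes(iv)
    val cipher = Cipher.getInstance("AES/GCM/NoPadding")
    cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"), new GCMParameterSpec(128, iv))
    iv ++ cipher.doFinal(bytes)
  }

  override def decrypt(bytes: Array[Byte])(implicit
      ec: ExecutionContext
  ): Future[Array[Byte]] = Future {
    val (iv, payload) = bytes.splitAt(12)
    val cipher = Cipher.getInstance("AES/GCM/NoPadding")
    cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key, "AES"), new GCMParameterSpec(128, iv))
    cipher.doFinal(payload)
  }
}
```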
@@ -39,8 +39,8 @@ class PersistentReprSerializer(serialization: Serialization) {
     serializer.toBinaryAsync(journal).map((journal, _))
   }

-  def serialize(persistentRepr: PersistentRepr, index: Option[Int])(
-      implicit ec: ExecutionContext
+  def serialize(persistentRepr: PersistentRepr, index: Option[Int])(implicit
+      ec: ExecutionContext
   ): Future[JournalWithByteArray] = {
     persistentRepr.payload match {
       case Tagged(payload, tags) =>
@@ -54,8 +54,8 @@ class PersistentReprSerializer(serialization: Serialization) {
       atomicWrites: Seq[AtomicWrite]
   )(implicit ec: ExecutionContext): Seq[Future[Seq[JournalWithByteArray]]] = {
     atomicWrites.map { atomicWrite =>
-      val serialized = atomicWrite.payload.zipWithIndex.map {
-        case (v, index) => serialize(v, Some(index))
+      val serialized = atomicWrite.payload.zipWithIndex.map { case (v, index) =>
+        serialize(v, Some(index))
       }
       Future.sequence(serialized)
     }
@@ -135,14 +135,18 @@ class KafkaSnapshotStore(config: Config) extends SnapshotStore {
     val rangeDeletions = this.rangeDeletions

     for {
-      highest <- if (ignoreOrphan)
-        journalSequence.readHighestSequenceNrAsync(PersistenceId(persistenceId))
-      else
-        Future.successful(Long.MaxValue)
-      adjusted = if (ignoreOrphan &&
-          highest < criteria.maxSequenceNr &&
-          highest > 0L) criteria.copy(maxSequenceNr = highest)
-      else criteria
+      highest <-
+        if (ignoreOrphan)
+          journalSequence.readHighestSequenceNrAsync(PersistenceId(persistenceId))
+        else
+          Future.successful(Long.MaxValue)
+      adjusted =
+        if (
+          ignoreOrphan &&
+          highest < criteria.maxSequenceNr &&
+          highest > 0L
+        ) criteria.copy(maxSequenceNr = highest)
+        else criteria
       // highest <- Future.successful(Long.MaxValue)
       // adjusted = criteria
       snapshot <- {
@@ -8,9 +8,11 @@ import org.scalatest.BeforeAndAfterAll

 class KafkaJournalPerfSpec
     extends JournalPerfSpec(
-      ConfigFactory.parseString("""
+      ConfigFactory
+        .parseString("""
         |akka.test.single-expect-default = 60s
-      """.stripMargin).withFallback(ConfigFactory.load())
+      """.stripMargin)
+        .withFallback(ConfigFactory.load())
     )
     with BeforeAndAfterAll {