diff --git a/acme-data/acme-data-scheduling/build.gradle.kts b/acme-data/acme-data-scheduling/build.gradle.kts index e9c2c8f..029dc0a 100644 --- a/acme-data/acme-data-scheduling/build.gradle.kts +++ b/acme-data/acme-data-scheduling/build.gradle.kts @@ -14,8 +14,10 @@ dependencies { testImplementation(project(":acme-lib:acme-lib-liquibase")) testImplementation(libs.com.zaxxer.hikariCP) + testImplementation(libs.io.kotest.kotest.runner.junit5) testImplementation(libs.org.postgresql) - testImplementation(libs.org.slf4j.slf4j.api) + testImplementation(libs.org.postgresql.r2dbc) + testImplementation(libs.org.slf4j.slf4j.simple) testImplementation(libs.org.testcontainers.postgresql) - testImplementation(libs.io.kotest.kotest.runner.junit5) + testImplementation(libs.org.testcontainers.r2dbc) } diff --git a/acme-data/acme-data-scheduling/src/main/kotlin/com/acme/scheduling/data/JooqAppointmentAggregateRepository.kt b/acme-data/acme-data-scheduling/src/main/kotlin/com/acme/scheduling/data/JooqAppointmentAggregateRepository.kt index 9d261b8..b43b4dd 100644 --- a/acme-data/acme-data-scheduling/src/main/kotlin/com/acme/scheduling/data/JooqAppointmentAggregateRepository.kt +++ b/acme-data/acme-data-scheduling/src/main/kotlin/com/acme/scheduling/data/JooqAppointmentAggregateRepository.kt @@ -6,6 +6,8 @@ import com.acme.core.PersistenceMetaData import com.acme.jooq.asExcluded import com.acme.scheduling.Appointment import com.acme.sql.scheduling.tables.references.APPOINTMENTS +import kotlinx.coroutines.reactive.awaitFirst +import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.serialization.encodeToString import kotlinx.serialization.json.Json import org.jooq.DSLContext @@ -20,10 +22,10 @@ class JooqAppointmentAggregateRepository( private val clock: Clock = Clock.systemUTC() ) : AggregateRepository { - override fun find(id: Appointment.Id): PersistedAggregate? = + override suspend fun find(id: Appointment.Id): PersistedAggregate? = dsl.selectFrom(APPOINTMENTS) .where(APPOINTMENTS.ID.eq(id.value)) - .fetchOne { + .awaitFirstOrNull()?.let { PersistedAggregate( aggregate = Json.decodeFromString(it.aggregate!!.data()), metaData = PersistenceMetaData( @@ -34,15 +36,16 @@ class JooqAppointmentAggregateRepository( ) } - override fun get(id: Appointment.Id): PersistedAggregate = getOrThrow(id) { NoSuchElementException() } + override suspend fun get(id: Appointment.Id): PersistedAggregate = + getOrThrow(id) { NoSuchElementException() } - override fun getOrThrow(id: Appointment.Id, block: () -> Throwable): PersistedAggregate = + override suspend fun getOrThrow(id: Appointment.Id, block: () -> Throwable): PersistedAggregate = find(id) ?: throw block() - override fun exists(id: Appointment.Id): Boolean = - dsl.fetchExists(APPOINTMENTS, APPOINTMENTS.ID.eq(id.value)) + override suspend fun exists(id: Appointment.Id): Boolean = + dsl.selectOne().from(APPOINTMENTS).where(APPOINTMENTS.ID.eq(id.value)).awaitFirstOrNull() != null - override fun save(aggregate: Appointment) { + override suspend fun save(aggregate: Appointment) { val now = LocalDateTime.ofInstant(Instant.now(clock), ZoneOffset.UTC) val json = JSONB.valueOf(Json.encodeToString(aggregate)) @@ -65,6 +68,7 @@ class JooqAppointmentAggregateRepository( .set(APPOINTMENTS.AGGREGATE, APPOINTMENTS.AGGREGATE.asExcluded()) .set(APPOINTMENTS.REVISION, APPOINTMENTS.REVISION.add(1)) .set(APPOINTMENTS.UPDATED_AT, now) - .execute() + .returning() + .awaitFirst() } } diff --git a/acme-data/acme-data-scheduling/src/main/kotlin/com/acme/scheduling/data/JooqClientAggregateRepository.kt b/acme-data/acme-data-scheduling/src/main/kotlin/com/acme/scheduling/data/JooqClientAggregateRepository.kt index de2f514..fe4a8a3 100644 --- a/acme-data/acme-data-scheduling/src/main/kotlin/com/acme/scheduling/data/JooqClientAggregateRepository.kt +++ b/acme-data/acme-data-scheduling/src/main/kotlin/com/acme/scheduling/data/JooqClientAggregateRepository.kt @@ -6,6 +6,8 @@ import com.acme.core.PersistenceMetaData import com.acme.jooq.asExcluded import com.acme.scheduling.Client import com.acme.sql.scheduling.tables.references.CLIENTS +import kotlinx.coroutines.reactive.awaitFirst +import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.serialization.encodeToString import kotlinx.serialization.json.Json import org.jooq.DSLContext @@ -18,10 +20,10 @@ class JooqClientAggregateRepository( private val clock: Clock = Clock.systemUTC() ) : AggregateRepository { - override fun find(id: Client.Id): PersistedAggregate? = + override suspend fun find(id: Client.Id): PersistedAggregate? = dsl.selectFrom(CLIENTS) .where(CLIENTS.ID.eq(id.value)) - .fetchOne { + .awaitFirstOrNull()?.let { PersistedAggregate( aggregate = Json.decodeFromString(it.aggregate!!.data()), metaData = PersistenceMetaData( @@ -32,15 +34,15 @@ class JooqClientAggregateRepository( ) } - override fun get(id: Client.Id): PersistedAggregate = getOrThrow(id) { NoSuchElementException() } + override suspend fun get(id: Client.Id): PersistedAggregate = getOrThrow(id) { NoSuchElementException() } - override fun getOrThrow(id: Client.Id, block: () -> Throwable): PersistedAggregate = + override suspend fun getOrThrow(id: Client.Id, block: () -> Throwable): PersistedAggregate = find(id) ?: throw block() - override fun exists(id: Client.Id): Boolean = - dsl.fetchExists(CLIENTS, CLIENTS.ID.eq(id.value)) + override suspend fun exists(id: Client.Id): Boolean = + dsl.selectOne().from(CLIENTS).where(CLIENTS.ID.eq(id.value)).awaitFirstOrNull() != null - override fun save(aggregate: Client) { + override suspend fun save(aggregate: Client) { val now = LocalDateTime.now(clock) val json = JSONB.valueOf(Json.encodeToString(aggregate)) @@ -63,6 +65,7 @@ class JooqClientAggregateRepository( .set(CLIENTS.AGGREGATE, CLIENTS.AGGREGATE.asExcluded()) .set(CLIENTS.REVISION, CLIENTS.REVISION.add(1)) .set(CLIENTS.UPDATED_AT, now) - .execute() + .returning() + .awaitFirst() } } diff --git a/acme-data/acme-data-scheduling/src/main/kotlin/com/acme/scheduling/data/JooqPracticeAggregateRepository.kt b/acme-data/acme-data-scheduling/src/main/kotlin/com/acme/scheduling/data/JooqPracticeAggregateRepository.kt index 5c45b82..90a3f47 100644 --- a/acme-data/acme-data-scheduling/src/main/kotlin/com/acme/scheduling/data/JooqPracticeAggregateRepository.kt +++ b/acme-data/acme-data-scheduling/src/main/kotlin/com/acme/scheduling/data/JooqPracticeAggregateRepository.kt @@ -6,6 +6,8 @@ import com.acme.core.PersistenceMetaData import com.acme.jooq.asExcluded import com.acme.scheduling.Practice import com.acme.sql.scheduling.tables.Practices.Companion.PRACTICES +import kotlinx.coroutines.reactive.awaitFirst +import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.serialization.encodeToString import kotlinx.serialization.json.Json import org.jooq.DSLContext @@ -18,10 +20,10 @@ class JooqPracticeAggregateRepository( private val clock: Clock = Clock.systemUTC() ) : AggregateRepository { - override fun find(id: Practice.Id): PersistedAggregate? = + override suspend fun find(id: Practice.Id): PersistedAggregate? = dsl.selectFrom(PRACTICES) .where(PRACTICES.ID.eq(id.value)) - .fetchOne { + .awaitFirstOrNull()?.let { PersistedAggregate( aggregate = Json.decodeFromString(it.aggregate!!.data()), metaData = PersistenceMetaData( @@ -32,15 +34,15 @@ class JooqPracticeAggregateRepository( ) } - override fun get(id: Practice.Id): PersistedAggregate = getOrThrow(id) { NoSuchElementException() } + override suspend fun get(id: Practice.Id): PersistedAggregate = getOrThrow(id) { NoSuchElementException() } - override fun getOrThrow(id: Practice.Id, block: () -> Throwable): PersistedAggregate = + override suspend fun getOrThrow(id: Practice.Id, block: () -> Throwable): PersistedAggregate = find(id) ?: throw block() - override fun exists(id: Practice.Id): Boolean = - dsl.fetchExists(PRACTICES, PRACTICES.ID.eq(id.value)) + override suspend fun exists(id: Practice.Id): Boolean = + dsl.selectOne().from(PRACTICES).where(PRACTICES.ID.eq(id.value)).awaitFirstOrNull() != null - override fun save(aggregate: Practice) { + override suspend fun save(aggregate: Practice) { val now = LocalDateTime.now(clock) val json = JSONB.valueOf(Json.encodeToString(aggregate)) @@ -63,6 +65,7 @@ class JooqPracticeAggregateRepository( .set(PRACTICES.AGGREGATE, PRACTICES.AGGREGATE.asExcluded()) .set(PRACTICES.REVISION, PRACTICES.REVISION.add(1)) .set(PRACTICES.UPDATED_AT, now) - .execute() + .returning() + .awaitFirst() } } diff --git a/acme-data/acme-data-scheduling/src/main/kotlin/com/acme/scheduling/data/JooqPractitionerAggregateRepository.kt b/acme-data/acme-data-scheduling/src/main/kotlin/com/acme/scheduling/data/JooqPractitionerAggregateRepository.kt index ac12ec2..fc22a7f 100644 --- a/acme-data/acme-data-scheduling/src/main/kotlin/com/acme/scheduling/data/JooqPractitionerAggregateRepository.kt +++ b/acme-data/acme-data-scheduling/src/main/kotlin/com/acme/scheduling/data/JooqPractitionerAggregateRepository.kt @@ -6,6 +6,8 @@ import com.acme.core.PersistenceMetaData import com.acme.jooq.asExcluded import com.acme.scheduling.Practitioner import com.acme.sql.scheduling.tables.Practitioners.Companion.PRACTITIONERS +import kotlinx.coroutines.reactive.awaitFirst +import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.serialization.encodeToString import kotlinx.serialization.json.Json import org.jooq.DSLContext @@ -18,10 +20,10 @@ class JooqPractitionerAggregateRepository( private val clock: Clock = Clock.systemUTC() ) : AggregateRepository { - override fun find(id: Practitioner.Id): PersistedAggregate? = + override suspend fun find(id: Practitioner.Id): PersistedAggregate? = dsl.selectFrom(PRACTITIONERS) .where(PRACTITIONERS.ID.eq(id.value)) - .fetchOne { + .awaitFirstOrNull()?.let { PersistedAggregate( aggregate = Json.decodeFromString(it.aggregate!!.data()), metaData = PersistenceMetaData( @@ -32,15 +34,16 @@ class JooqPractitionerAggregateRepository( ) } - override fun get(id: Practitioner.Id): PersistedAggregate = getOrThrow(id) { NoSuchElementException() } + override suspend fun get(id: Practitioner.Id): PersistedAggregate = + getOrThrow(id) { NoSuchElementException() } - override fun getOrThrow(id: Practitioner.Id, block: () -> Throwable): PersistedAggregate = + override suspend fun getOrThrow(id: Practitioner.Id, block: () -> Throwable): PersistedAggregate = find(id) ?: throw block() - override fun exists(id: Practitioner.Id): Boolean = - dsl.fetchExists(PRACTITIONERS, PRACTITIONERS.ID.eq(id.value)) + override suspend fun exists(id: Practitioner.Id): Boolean = + dsl.selectOne().from(PRACTITIONERS).where(PRACTITIONERS.ID.eq(id.value)).awaitFirstOrNull() != null - override fun save(aggregate: Practitioner) { + override suspend fun save(aggregate: Practitioner) { val now = LocalDateTime.now(clock) val json = JSONB.valueOf(Json.encodeToString(aggregate)) @@ -63,6 +66,7 @@ class JooqPractitionerAggregateRepository( .set(PRACTITIONERS.AGGREGATE, PRACTITIONERS.AGGREGATE.asExcluded()) .set(PRACTITIONERS.REVISION, PRACTITIONERS.REVISION.add(1)) .set(PRACTITIONERS.UPDATED_AT, now) - .execute() + .returning() + .awaitFirst() } } diff --git a/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/JooqAppointmentAggregateRepositoryTest.kt b/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/JooqAppointmentAggregateRepositoryTest.kt index 7f87878..e92c853 100644 --- a/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/JooqAppointmentAggregateRepositoryTest.kt +++ b/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/JooqAppointmentAggregateRepositoryTest.kt @@ -14,7 +14,6 @@ import java.time.LocalDateTime class JooqAppointmentAggregateRepositoryTest : ShouldSpec({ - val jooq = listener(TestDatabaseListener()) val now = LocalDateTime.now() val appointment = Appointment( @@ -30,7 +29,7 @@ class JooqAppointmentAggregateRepositoryTest : ShouldSpec({ ) should("save new aggregate") { - jooq.testTransaction { + testTransaction { val time = timeFixtureFactory() val repo = JooqAppointmentAggregateRepository(it.dsl(), time.clock) repo.save(appointment) @@ -45,7 +44,7 @@ class JooqAppointmentAggregateRepositoryTest : ShouldSpec({ } should("update existing aggregate and increment revision") { - jooq.testTransaction { + testTransaction { val createTime = timeFixtureFactory() val createRepo = JooqAppointmentAggregateRepository(it.dsl(), createTime.clock) createRepo.save(appointment) @@ -69,7 +68,7 @@ class JooqAppointmentAggregateRepositoryTest : ShouldSpec({ } should("throw NoSuchElementException") { - jooq.testTransaction { + testTransaction { val repo = JooqAppointmentAggregateRepository(it.dsl()) shouldThrow { repo.get(appointment.id) @@ -78,7 +77,7 @@ class JooqAppointmentAggregateRepositoryTest : ShouldSpec({ } should("throw user supplied exception") { - jooq.testTransaction { + testTransaction { val repo = JooqAppointmentAggregateRepository(it.dsl()) shouldThrow { repo.getOrThrow(appointment.id) { diff --git a/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/JooqClientAggregateRepositoryTest.kt b/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/JooqClientAggregateRepositoryTest.kt index a6a7dfa..337cd6c 100644 --- a/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/JooqClientAggregateRepositoryTest.kt +++ b/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/JooqClientAggregateRepositoryTest.kt @@ -12,8 +12,6 @@ import io.kotest.matchers.shouldBe class JooqClientAggregateRepositoryTest : ShouldSpec({ - val jooq = listener(TestDatabaseListener()) - val client = Client( id = Client.Id("Client123"), names = setOf( @@ -29,7 +27,7 @@ class JooqClientAggregateRepositoryTest : ShouldSpec({ ) should("should save new aggregate") { - jooq.testTransaction { + testTransaction { val time: TimeFixture = timeFixtureFactory() val repo = JooqClientAggregateRepository(it.dsl(), time.clock) repo.save(client) @@ -44,7 +42,7 @@ class JooqClientAggregateRepositoryTest : ShouldSpec({ } should("update existing aggregate and increment revision") { - jooq.testTransaction { + testTransaction { val createTime: TimeFixture = timeFixtureFactory() val createRepo = JooqClientAggregateRepository(it.dsl(), createTime.clock) createRepo.save(client) @@ -63,7 +61,7 @@ class JooqClientAggregateRepositoryTest : ShouldSpec({ } should("should throw NoSuchElementException") { - jooq.testTransaction { + testTransaction { val repo = JooqClientAggregateRepository(it.dsl()) shouldThrow { repo.get(client.id) @@ -72,7 +70,7 @@ class JooqClientAggregateRepositoryTest : ShouldSpec({ } should("should throw user supplied exception") { - jooq.testTransaction { + testTransaction { val repo = JooqClientAggregateRepository(it.dsl()) shouldThrow { repo.getOrThrow(client.id) { diff --git a/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/JooqPracticeAggregateRepositoryTest.kt b/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/JooqPracticeAggregateRepositoryTest.kt index 838152c..8be60f0 100644 --- a/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/JooqPracticeAggregateRepositoryTest.kt +++ b/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/JooqPracticeAggregateRepositoryTest.kt @@ -10,8 +10,6 @@ import io.kotest.matchers.shouldBe class JooqPracticeAggregateRepositoryTest : ShouldSpec({ - val jooq = listener(TestDatabaseListener()) - val practice = Practice( id = Practice.Id("PracticeID"), name = Practice.Name("Practice & Associates"), @@ -28,7 +26,7 @@ class JooqPracticeAggregateRepositoryTest : ShouldSpec({ ) should("save new aggregate") { - jooq.testTransaction { + testTransaction { val time: TimeFixture = timeFixtureFactory() val repo = JooqPracticeAggregateRepository(it.dsl(), time.clock) repo.save(practice) @@ -43,7 +41,7 @@ class JooqPracticeAggregateRepositoryTest : ShouldSpec({ } should("update an existing aggregate and increment revision") { - jooq.testTransaction { + testTransaction { val createTime = timeFixtureFactory() val createRepo = JooqPracticeAggregateRepository(it.dsl(), createTime.clock) createRepo.save(practice) @@ -64,7 +62,7 @@ class JooqPracticeAggregateRepositoryTest : ShouldSpec({ } should("throw NoSuchElementException") { - jooq.testTransaction { + testTransaction { val repo = JooqPracticeAggregateRepository(it.dsl()) shouldThrow { repo.get(practice.id) @@ -73,7 +71,7 @@ class JooqPracticeAggregateRepositoryTest : ShouldSpec({ } should("throw user supplied exception") { - jooq.testTransaction { + testTransaction { val repo = JooqPracticeAggregateRepository(it.dsl()) shouldThrow { repo.getOrThrow(practice.id) { diff --git a/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/JooqPractitionerAggregateRepositoryTest.kt b/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/JooqPractitionerAggregateRepositoryTest.kt index 6716171..9ab5701 100644 --- a/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/JooqPractitionerAggregateRepositoryTest.kt +++ b/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/JooqPractitionerAggregateRepositoryTest.kt @@ -13,8 +13,6 @@ import io.kotest.matchers.shouldBe class JooqPractitionerAggregateRepositoryTest : ShouldSpec({ - val jooq = listener(TestDatabaseListener()) - val practitioner = Practitioner( id = Practitioner.Id("Practitioner123"), user = UserId("User123"), @@ -31,7 +29,7 @@ class JooqPractitionerAggregateRepositoryTest : ShouldSpec({ ) should("save new aggregate") { - jooq.testTransaction { + testTransaction { val time: TimeFixture = timeFixtureFactory() val repo = JooqPractitionerAggregateRepository(it.dsl(), time.clock) repo.save(practitioner) @@ -46,7 +44,7 @@ class JooqPractitionerAggregateRepositoryTest : ShouldSpec({ } should("update existing aggregate and increment revision") { - jooq.testTransaction { + testTransaction { val createTime = timeFixtureFactory() val createRepo = JooqPractitionerAggregateRepository(it.dsl(), createTime.clock) createRepo.save(practitioner) @@ -65,7 +63,7 @@ class JooqPractitionerAggregateRepositoryTest : ShouldSpec({ } should("throw NoSuchElementException") { - jooq.testTransaction { + testTransaction { val repo = JooqPractitionerAggregateRepository(it.dsl()) shouldThrow { repo.get(practitioner.id) @@ -74,7 +72,7 @@ class JooqPractitionerAggregateRepositoryTest : ShouldSpec({ } should("throw user supplied exception") { - jooq.testTransaction { + testTransaction { val repo = JooqPractitionerAggregateRepository(it.dsl()) shouldThrow { repo.getOrThrow(practitioner.id) { diff --git a/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/JooqSchedulingUnitOfWorkTest.kt b/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/JooqSchedulingUnitOfWorkTest.kt index 89a85a2..fd72e5c 100644 --- a/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/JooqSchedulingUnitOfWorkTest.kt +++ b/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/JooqSchedulingUnitOfWorkTest.kt @@ -10,85 +10,80 @@ import com.acme.scheduling.Period import com.acme.scheduling.Practice import com.acme.scheduling.Practitioner import com.acme.scheduling.UserId -import io.kotest.common.runBlocking import io.kotest.core.spec.style.ShouldSpec import io.kotest.matchers.booleans.shouldBeTrue import java.time.LocalDateTime class JooqSchedulingUnitOfWorkTest : ShouldSpec({ - val jooq = listener(TestDatabaseListener()) - should("commit work") { - jooq.testTransaction { - val uow = SchedulingJooqUnitOfWork(it) - runBlocking { - uow.transaction { - with(uow.repositories) { - practices.save( - Practice( - id = Practice.Id("Practice123"), - name = Practice.Name("Hello & Associates"), - contactPoints = emptySet(), - owner = Practitioner.Id("Practitioner123") - ) + testTransaction { + val uow = SchedulingJooqUnitOfWork(it.configuration()) + uow.transaction { + with(uow.repositories) { + practices.save( + Practice( + id = Practice.Id("Practice123"), + name = Practice.Name("Hello & Associates"), + contactPoints = emptySet(), + owner = Practitioner.Id("Practitioner123") ) + ) - appointments.save( - Appointment( - id = Appointment.Id("Appointment123"), - practitioner = Practitioner.Id("Practitioner123"), - client = Client.Id("Client123"), - practice = Practice.Id("Practice123"), - period = Period.Bounded(LocalDateTime.now(), LocalDateTime.now().plusSeconds(100)), - state = AppointmentState.SCHEDULED - ) + appointments.save( + Appointment( + id = Appointment.Id("Appointment123"), + practitioner = Practitioner.Id("Practitioner123"), + client = Client.Id("Client123"), + practice = Practice.Id("Practice123"), + period = Period.Bounded(LocalDateTime.now(), LocalDateTime.now().plusSeconds(100)), + state = AppointmentState.SCHEDULED ) + ) - clients.save( - Client( - id = Client.Id("Client123"), - names = setOf( - HumanName( - family = Name.Family("World"), - given = Name.Given("Hello"), - prefix = Name.Prefix(""), - suffix = Name.Suffix(""), - period = Period.Unknown - ) - ), - gender = Gender.UNKNOWN, - contactPoints = emptySet() - ) + clients.save( + Client( + id = Client.Id("Client123"), + names = setOf( + HumanName( + family = Name.Family("World"), + given = Name.Given("Hello"), + prefix = Name.Prefix(""), + suffix = Name.Suffix(""), + period = Period.Unknown + ) + ), + gender = Gender.UNKNOWN, + contactPoints = emptySet() ) + ) - practitioners.save( - Practitioner( - id = Practitioner.Id("Practitioner123"), - user = UserId("User123"), - names = setOf( - HumanName( - family = Name.Family("World"), - given = Name.Given("Hello"), - prefix = Name.Prefix(""), - suffix = Name.Suffix(""), - period = Period.Unknown - ) - ), - gender = Gender.UNKNOWN, - contactPoints = emptySet() - ) + practitioners.save( + Practitioner( + id = Practitioner.Id("Practitioner123"), + user = UserId("User123"), + names = setOf( + HumanName( + family = Name.Family("World"), + given = Name.Given("Hello"), + prefix = Name.Prefix(""), + suffix = Name.Suffix(""), + period = Period.Unknown + ) + ), + gender = Gender.UNKNOWN, + contactPoints = emptySet() ) - } + ) } - val uow2 = SchedulingJooqUnitOfWork(it) - uow2.transaction { - with(uow2.repositories) { - appointments.exists(Appointment.Id("Appointment123")).shouldBeTrue() - clients.exists(Client.Id("Client123")).shouldBeTrue() - practices.exists(Practice.Id("Practice123")).shouldBeTrue() - practitioners.exists(Practitioner.Id("Practitioner123")).shouldBeTrue() - } + } + val uow2 = SchedulingJooqUnitOfWork(it.configuration()) + uow2.transaction { + with(uow2.repositories) { + appointments.exists(Appointment.Id("Appointment123")).shouldBeTrue() + clients.exists(Client.Id("Client123")).shouldBeTrue() + practices.exists(Practice.Id("Practice123")).shouldBeTrue() + practitioners.exists(Practitioner.Id("Practitioner123")).shouldBeTrue() } } } diff --git a/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/TestDatabaseListener.kt b/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/TestDatabaseListener.kt deleted file mode 100644 index c393a05..0000000 --- a/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/TestDatabaseListener.kt +++ /dev/null @@ -1,57 +0,0 @@ -package com.acme.scheduling.data - -import com.zaxxer.hikari.HikariConfig -import com.zaxxer.hikari.HikariDataSource -import io.github.oshai.kotlinlogging.KotlinLogging -import io.kotest.core.listeners.TestListener -import io.kotest.core.spec.Spec -import org.jooq.Configuration -import org.jooq.SQLDialect -import org.jooq.exception.DataAccessException -import org.jooq.impl.DataSourceConnectionProvider -import org.jooq.impl.DefaultConfiguration -import org.jooq.impl.DefaultTransactionProvider - -class TestDatabaseListener : TestListener { - - private val logger = KotlinLogging.logger {} - - private var ds: HikariDataSource? = null - - fun testTransaction(block: (config: Configuration) -> Unit) { - val config = DefaultConfiguration().apply { - set(ds) - set(DefaultTransactionProvider(DataSourceConnectionProvider(ds), true)) - set(SQLDialect.POSTGRES) - } - - try { - with(config.dsl()) { - transaction { config -> - block(config) - throw TestTransactionException() - } - } - } catch (e: TestTransactionException) { - logger.debug { e.message } - } - } - - override suspend fun beforeSpec(spec: Spec) { - ds = HikariDataSource( - HikariConfig().apply { - jdbcUrl = - "jdbc:tc:postgresql:11.5:///test?TC_INITFUNCTION=com.acme.liquibase.LiquibaseTestContainerInitializerKt::update" - username = "test" - password = "test" - isAutoCommit = false - } - ) - } - - override suspend fun afterSpec(spec: Spec) { - ds?.close() - } - - class TestTransactionException : DataAccessException("Rollback caused by test transaction") -} diff --git a/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/database.kt b/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/database.kt new file mode 100644 index 0000000..df0671f --- /dev/null +++ b/acme-data/acme-data-scheduling/src/test/kotlin/com/acme/scheduling/data/database.kt @@ -0,0 +1,51 @@ +package com.acme.scheduling.data + +import com.acme.liquibase.update +import com.zaxxer.hikari.HikariConfig +import com.zaxxer.hikari.HikariDataSource +import io.r2dbc.spi.ConnectionFactories +import org.jooq.DSLContext +import org.jooq.exception.DataAccessException +import org.jooq.impl.DSL +import org.jooq.kotlin.coroutines.transactionCoroutine +import org.testcontainers.containers.PostgreSQLContainer +import org.testcontainers.containers.PostgreSQLR2DBCDatabaseContainer + +class TestDatabase { + private val container = PostgreSQLContainer("postgres:15.5").apply { + start() + } + + val dsl = DSL.using( + ConnectionFactories.get( + PostgreSQLR2DBCDatabaseContainer.getOptions(container) + ) + ) + + init { + val ds = HikariDataSource( + HikariConfig().apply { + jdbcUrl = container.getJdbcUrl() + username = container.username + password = container.password + isAutoCommit = false + } + ) + update(ds.connection) + ds.close() + } +} + +val database = TestDatabase() + +suspend fun testTransaction(block: suspend (dsl: DSLContext) -> Unit) = try { + database.dsl.transactionCoroutine { + block(it.dsl()) + throw TestTransactionException() + } +} catch (_: TestTransactionException) { + // Do nothing and let Jooq rollback +} + +class TestTransactionException : DataAccessException("Rollback caused by test transaction") + diff --git a/acme-data/acme-data-sql/build.gradle.kts b/acme-data/acme-data-sql/build.gradle.kts index 2f381ad..1807775 100644 --- a/acme-data/acme-data-sql/build.gradle.kts +++ b/acme-data/acme-data-sql/build.gradle.kts @@ -18,7 +18,10 @@ dependencies { api(project(":acme-lib:acme-lib-jooq")) testImplementation(project(":acme-lib:acme-lib-liquibase")) + testImplementation(libs.com.zaxxer.hikariCP) testImplementation(libs.io.kotest.kotest.runner.junit5) + testImplementation(libs.org.postgresql) + testImplementation(libs.org.postgresql.r2dbc) testImplementation(libs.org.slf4j.slf4j.simple) testImplementation(libs.org.testcontainers.postgresql) testImplementation(libs.org.testcontainers.r2dbc) diff --git a/acme-data/acme-data-sql/src/test/kotlin/com/acme/sql/scheduling/R2DCBTest.kt b/acme-data/acme-data-sql/src/test/kotlin/com/acme/sql/scheduling/R2DCBTest.kt deleted file mode 100644 index 274a0a8..0000000 --- a/acme-data/acme-data-sql/src/test/kotlin/com/acme/sql/scheduling/R2DCBTest.kt +++ /dev/null @@ -1,17 +0,0 @@ -package com.acme.sql.scheduling - -import com.acme.sql.scheduling.tables.references.APPOINTMENTS -import io.kotest.core.spec.style.ShouldSpec -import io.kotest.matchers.nulls.shouldBeNull -import kotlinx.coroutines.reactive.awaitFirstOrNull - -class R2DCBTest : ShouldSpec({ - - val db = listener(TestDatabaseListener()) - - should("not block or hang?") { - db.dsl.selectFrom(APPOINTMENTS) - .where(APPOINTMENTS.ID.eq("123")) - .awaitFirstOrNull().shouldBeNull() - } -}) diff --git a/acme-data/acme-data-sql/src/test/kotlin/com/acme/sql/scheduling/SmokeTest.kt b/acme-data/acme-data-sql/src/test/kotlin/com/acme/sql/scheduling/SmokeTest.kt new file mode 100644 index 0000000..8cb6bda --- /dev/null +++ b/acme-data/acme-data-sql/src/test/kotlin/com/acme/sql/scheduling/SmokeTest.kt @@ -0,0 +1,61 @@ +package com.acme.sql.scheduling + +import com.acme.jooq.asExcluded +import com.acme.sql.scheduling.tables.references.APPOINTMENTS +import io.kotest.core.spec.style.ShouldSpec +import io.kotest.matchers.nulls.shouldBeNull +import io.kotest.matchers.nulls.shouldNotBeNull +import kotlinx.coroutines.reactive.awaitFirst +import kotlinx.coroutines.reactive.awaitFirstOrNull +import org.jooq.JSONB +import java.time.Clock +import java.time.Instant +import java.time.LocalDateTime +import java.time.ZoneOffset + +class SmokeTest : ShouldSpec({ + val clock = Clock.systemUTC() + + should("be able to insert and query against the appointments table") { + testTransaction { dsl -> + val now = LocalDateTime.ofInstant(Instant.now(clock), ZoneOffset.UTC) + val json = JSONB.valueOf( + """ + { + "hello": "world" + } + """.trimIndent() + ) + + dsl.insertInto( + APPOINTMENTS, + APPOINTMENTS.ID, + APPOINTMENTS.REVISION, + APPOINTMENTS.AGGREGATE, + APPOINTMENTS.CREATED_AT, + APPOINTMENTS.UPDATED_AT + ).values( + "123", + 1, + json, + now, + now, + ) + .onConflict(APPOINTMENTS.ID) + .doUpdate() + .set(APPOINTMENTS.AGGREGATE, APPOINTMENTS.AGGREGATE.asExcluded()) + .set(APPOINTMENTS.REVISION, APPOINTMENTS.REVISION.add(1)) + .set(APPOINTMENTS.UPDATED_AT, now) + .returning() + .awaitFirst() + + dsl.selectFrom(APPOINTMENTS) + .where(APPOINTMENTS.ID.eq("123")) + .awaitFirstOrNull().shouldNotBeNull() + + dsl.selectFrom(APPOINTMENTS) + .where(APPOINTMENTS.ID.eq("345")) + .awaitFirstOrNull().shouldBeNull() + } + } +}) diff --git a/acme-data/acme-data-sql/src/test/kotlin/com/acme/sql/scheduling/TestDatabaseListener.kt b/acme-data/acme-data-sql/src/test/kotlin/com/acme/sql/scheduling/TestDatabaseListener.kt deleted file mode 100644 index 85e46da..0000000 --- a/acme-data/acme-data-sql/src/test/kotlin/com/acme/sql/scheduling/TestDatabaseListener.kt +++ /dev/null @@ -1,28 +0,0 @@ -package com.acme.sql.scheduling - -import io.kotest.core.listeners.TestListener -import io.kotest.core.spec.Spec -import io.r2dbc.spi.ConnectionFactories -import io.r2dbc.spi.ConnectionFactoryOptions -import org.jooq.SQLDialect -import org.jooq.impl.DSL - -class TestDatabaseListener : TestListener { - - val dsl = DSL.using( - ConnectionFactories.get( - ConnectionFactoryOptions - .parse("r2dbc:tc:postgresql:///test?TC_IMAGE_TAG=15.5") - .mutate() - .option(ConnectionFactoryOptions.USER, "test") - .option(ConnectionFactoryOptions.PASSWORD, "test") - .build() - ), SQLDialect.POSTGRES - ) - - override suspend fun beforeSpec(spec: Spec) { - } - - override suspend fun afterSpec(spec: Spec) { - } -} diff --git a/acme-data/acme-data-sql/src/test/kotlin/com/acme/sql/scheduling/database.kt b/acme-data/acme-data-sql/src/test/kotlin/com/acme/sql/scheduling/database.kt new file mode 100644 index 0000000..0d9bc07 --- /dev/null +++ b/acme-data/acme-data-sql/src/test/kotlin/com/acme/sql/scheduling/database.kt @@ -0,0 +1,50 @@ +package com.acme.sql.scheduling + +import com.acme.liquibase.update +import com.zaxxer.hikari.HikariConfig +import com.zaxxer.hikari.HikariDataSource +import io.r2dbc.spi.ConnectionFactories +import org.jooq.DSLContext +import org.jooq.exception.DataAccessException +import org.jooq.impl.DSL +import org.jooq.kotlin.coroutines.transactionCoroutine +import org.testcontainers.containers.PostgreSQLContainer +import org.testcontainers.containers.PostgreSQLR2DBCDatabaseContainer + +class TestDatabase { + private val container = PostgreSQLContainer("postgres:15.5").apply { + start() + } + + val dsl = DSL.using( + ConnectionFactories.get( + PostgreSQLR2DBCDatabaseContainer.getOptions(container) + ) + ) + + init { + val ds = HikariDataSource( + HikariConfig().apply { + jdbcUrl = container.getJdbcUrl() + username = container.username + password = container.password + isAutoCommit = false + } + ) + update(ds.connection) + ds.close() + } +} + +val database = TestDatabase() + +suspend fun testTransaction(block: suspend (dsl: DSLContext) -> Unit) = try { + database.dsl.transactionCoroutine { + block(it.dsl()) + throw TestTransactionException() + } +} catch (_: TestTransactionException) { + // Do nothing and let Jooq rollback +} + +class TestTransactionException : DataAccessException("Rollback caused by test transaction") diff --git a/acme-domain/acme-domain-core/src/main/kotlin/com/acme/core/AggregateRepository.kt b/acme-domain/acme-domain-core/src/main/kotlin/com/acme/core/AggregateRepository.kt index 7b8177c..f1f8af4 100644 --- a/acme-domain/acme-domain-core/src/main/kotlin/com/acme/core/AggregateRepository.kt +++ b/acme-domain/acme-domain-core/src/main/kotlin/com/acme/core/AggregateRepository.kt @@ -1,9 +1,9 @@ package com.acme.core interface AggregateRepository, I> { - fun find(id: I): PersistedAggregate? - fun get(id: I): PersistedAggregate - fun getOrThrow(id: I, block: () -> Throwable): PersistedAggregate - fun exists(id: I): Boolean - fun save(aggregate: T) + suspend fun find(id: I): PersistedAggregate? + suspend fun get(id: I): PersistedAggregate + suspend fun getOrThrow(id: I, block: () -> Throwable): PersistedAggregate + suspend fun exists(id: I): Boolean + suspend fun save(aggregate: T) } diff --git a/acme-domain/acme-domain-core/src/main/kotlin/com/acme/core/DefaultMessageBus.kt b/acme-domain/acme-domain-core/src/main/kotlin/com/acme/core/DefaultMessageBus.kt index 115979b..2dfc2d9 100644 --- a/acme-domain/acme-domain-core/src/main/kotlin/com/acme/core/DefaultMessageBus.kt +++ b/acme-domain/acme-domain-core/src/main/kotlin/com/acme/core/DefaultMessageBus.kt @@ -2,10 +2,11 @@ package com.acme.core import io.github.oshai.kotlinlogging.KotlinLogging import kotlin.reflect.KClass +import kotlin.reflect.KSuspendFunction2 class DefaultMessageBus( - private val commandHandlers: MutableMap, (Command, UnitOfWork) -> Unit> = mutableMapOf(), - private val eventHandlers: MutableMap, List<(Event, UnitOfWork) -> Unit>> = mutableMapOf() + private val commandHandlers: MutableMap, KSuspendFunction2> = mutableMapOf(), + private val eventHandlers: MutableMap, List>> = mutableMapOf() ) : MessageBus { private val logger = KotlinLogging.logger {} @@ -13,26 +14,35 @@ class DefaultMessageBus( override fun copy() = DefaultMessageBus(commandHandlers, eventHandlers) @Suppress("UNCHECKED_CAST") + override fun addEventHandler(eventClass: KClass<*>, handler: Any): DefaultMessageBus { + val handlers = eventHandlers.getOrDefault(eventClass, emptyList()) + if (handlers.contains(handler)) throw RuntimeException("Event handler has already been registered") + eventHandlers[eventClass] = handlers.plus(handler as KSuspendFunction2) + return this + } + override fun addEventHandler(vararg pairs: Pair, Any>): DefaultMessageBus { pairs.forEach { - val handler = it.second as (Event, UnitOfWork) -> Unit - val handlers = eventHandlers.getOrDefault(it.first, emptyList()) - if (handlers.contains(it.second)) throw RuntimeException("${it.second} has already been registered") - eventHandlers[it.first] = handlers.plus(handler) + addEventHandler(it.first, it.second) } return this } @Suppress("UNCHECKED_CAST") + override fun addCommandHandler(commandClass: KClass<*>, handler: Any): DefaultMessageBus { + if (commandHandlers.containsKey(commandClass)) throw RuntimeException("Command handler already exists") + commandHandlers[commandClass] = handler as KSuspendFunction2 + return this + } + override fun addCommandHandler(vararg pairs: Pair, Any>): DefaultMessageBus { pairs.forEach { - if (commandHandlers.containsKey(it.first)) throw RuntimeException("${it.first} handler already exists") - commandHandlers[it.first] = it.second as (Command, UnitOfWork) -> Unit + addCommandHandler(it.first, it.second) } return this } - override fun handle(message: Message, unitOfWork: UnitOfWork) { + override suspend fun handle(message: Message, unitOfWork: UnitOfWork) { val queue = mutableListOf(message) while (queue.size > 0) { when (val msg = queue.removeAt(0)) { @@ -43,18 +53,18 @@ class DefaultMessageBus( } } - private fun handleEvent(event: Event, queue: MutableList, unitOfWork: UnitOfWork) = + private suspend fun handleEvent(event: Event, queue: MutableList, unitOfWork: UnitOfWork) = eventHandlers.getOrDefault(event::class, emptyList()).forEach { try { it(event, unitOfWork) queue.addAll(unitOfWork.events) } catch (e: Exception) { - logger.error(e) {"Unable to handle event" } + logger.error(e) { "Unable to handle event" } throw e } } - private fun handleCommand(command: Command, queue: MutableList, unitOfWork: UnitOfWork) = + private suspend fun handleCommand(command: Command, queue: MutableList, unitOfWork: UnitOfWork) = commandHandlers[command::class]?.also { it(command, unitOfWork) queue.addAll(unitOfWork.events) diff --git a/acme-domain/acme-domain-core/src/main/kotlin/com/acme/core/InMemoryAggregateRepository.kt b/acme-domain/acme-domain-core/src/main/kotlin/com/acme/core/InMemoryAggregateRepository.kt index 9ef1d85..130d330 100644 --- a/acme-domain/acme-domain-core/src/main/kotlin/com/acme/core/InMemoryAggregateRepository.kt +++ b/acme-domain/acme-domain-core/src/main/kotlin/com/acme/core/InMemoryAggregateRepository.kt @@ -4,28 +4,22 @@ import java.time.Clock import java.time.LocalDateTime open class InMemoryAggregateRepository, I>( - objects: Set = emptySet(), private val clock: Clock = Clock.systemUTC() -) : - AggregateRepository { +) : AggregateRepository { private val objects: MutableMap> = mutableMapOf() - init { - objects.forEach(::save) - } - - override fun find(id: I): PersistedAggregate? = objects[id] + override suspend fun find(id: I): PersistedAggregate? = objects[id] - override fun get(id: I): PersistedAggregate = + override suspend fun get(id: I): PersistedAggregate = getOrThrow(id) { NoSuchElementException() } - override fun getOrThrow(id: I, block: () -> Throwable): PersistedAggregate = + override suspend fun getOrThrow(id: I, block: () -> Throwable): PersistedAggregate = find(id) ?: throw block() - override fun exists(id: I): Boolean = objects.containsKey(id) + override suspend fun exists(id: I): Boolean = objects.containsKey(id) - override fun save(aggregate: T) { + override suspend fun save(aggregate: T) { val now = LocalDateTime.now(clock) objects[aggregate.id] = objects[aggregate.id]?.let { diff --git a/acme-domain/acme-domain-core/src/main/kotlin/com/acme/core/MessageBus.kt b/acme-domain/acme-domain-core/src/main/kotlin/com/acme/core/MessageBus.kt index 08d496f..49f1595 100644 --- a/acme-domain/acme-domain-core/src/main/kotlin/com/acme/core/MessageBus.kt +++ b/acme-domain/acme-domain-core/src/main/kotlin/com/acme/core/MessageBus.kt @@ -4,7 +4,9 @@ import kotlin.reflect.KClass interface MessageBus { fun copy(): DefaultMessageBus + fun addEventHandler(eventClass: KClass<*>, handler: Any): DefaultMessageBus fun addEventHandler(vararg pairs: Pair, Any>): DefaultMessageBus + fun addCommandHandler(commandClass: KClass<*>, handler: Any): DefaultMessageBus fun addCommandHandler(vararg pairs: Pair, Any>): DefaultMessageBus - fun handle(message: Message, unitOfWork: UnitOfWork) + suspend fun handle(message: Message, unitOfWork: UnitOfWork) } diff --git a/acme-domain/acme-domain-core/src/test/kotlin/com/acme/core/InMemoryAggregateRepositoryTest.kt b/acme-domain/acme-domain-core/src/test/kotlin/com/acme/core/InMemoryAggregateRepositoryTest.kt index 65a288e..99727f3 100644 --- a/acme-domain/acme-domain-core/src/test/kotlin/com/acme/core/InMemoryAggregateRepositoryTest.kt +++ b/acme-domain/acme-domain-core/src/test/kotlin/com/acme/core/InMemoryAggregateRepositoryTest.kt @@ -15,7 +15,7 @@ class InMemoryAggregateRepositoryTest : ShouldSpec({ should("contain aggregates provided through constructor") { val aggregate = FakeAggregate("id123") - val repo = InMemoryAggregateRepository(setOf(aggregate)) + val repo = InMemoryAggregateRepository().apply { save(aggregate) } repo.exists(aggregate.id).shouldBeTrue() val persistedAggregate = repo.get(aggregate.id) persistedAggregate.aggregate.shouldBe(aggregate) diff --git a/acme-domain/acme-domain-core/src/test/kotlin/com/acme/core/MessageBusTest.kt b/acme-domain/acme-domain-core/src/test/kotlin/com/acme/core/MessageBusTest.kt index ee1c912..820d018 100644 --- a/acme-domain/acme-domain-core/src/test/kotlin/com/acme/core/MessageBusTest.kt +++ b/acme-domain/acme-domain-core/src/test/kotlin/com/acme/core/MessageBusTest.kt @@ -36,17 +36,19 @@ class MessageBusTest : ShouldSpec({ var commandCalls = 0 var eventCalls = 0 - val onCommand = { _: FakeCommand, uow: FakeUnitOfWork -> + suspend fun onCommand(cmd: FakeCommand, uow: FakeUnitOfWork) { commandCalls++ uow.addEvent(FakeEvent("World")) } - val onEvent = { _: FakeEvent, _: FakeUnitOfWork -> + suspend fun onEvent(event: FakeEvent, uow: FakeUnitOfWork) { eventCalls++ } - mb.addCommandHandler(FakeCommand::class to onCommand) - mb.addEventHandler(FakeEvent::class to onEvent) + val e = ::onEvent + + mb.addCommandHandler(FakeCommand::class to ::onCommand) + mb.addEventHandler(FakeEvent::class, ::onEvent) mb.handle(FakeCommand("Hello"), FakeUnitOfWork()) @@ -65,11 +67,11 @@ class MessageBusTest : ShouldSpec({ should("not swallow command handler exceptions") { val mb = DefaultMessageBus() - val onCommand = { _: FakeCommand, _: FakeUnitOfWork -> + suspend fun onCommand(command: FakeCommand, uow: FakeUnitOfWork) { throw FakeException("command") } - mb.addCommandHandler(FakeCommand::class to onCommand) + mb.addCommandHandler(FakeCommand::class to ::onCommand) val exc = shouldThrow { mb.handle(FakeCommand("Hello"), FakeUnitOfWork()) @@ -81,16 +83,16 @@ class MessageBusTest : ShouldSpec({ should("not swallow event handler exceptions") { val mb = DefaultMessageBus() - val onCommand = { _: FakeCommand, uow: FakeUnitOfWork -> + suspend fun onCommand(command: FakeCommand, uow: FakeUnitOfWork) { uow.addEvent(FakeEvent("World")) } - val onEvent = { _: FakeEvent, _: FakeUnitOfWork -> + suspend fun onEvent(event: FakeEvent, uow: FakeUnitOfWork) { throw FakeException("event") } - mb.addCommandHandler(FakeCommand::class to onCommand) - mb.addEventHandler(FakeEvent::class to onEvent) + mb.addCommandHandler(FakeCommand::class to ::onCommand) + mb.addEventHandler(FakeEvent::class to ::onEvent) val exc = shouldThrow { mb.handle(FakeCommand("Hello"), FakeUnitOfWork()) @@ -102,25 +104,22 @@ class MessageBusTest : ShouldSpec({ should("throw exception when repeated handlers are added") { val mb = DefaultMessageBus() - val onCommand = { _: FakeCommand, uow: FakeUnitOfWork -> + suspend fun onCommand(command: FakeCommand, uow: FakeUnitOfWork) { uow.addEvent(FakeEvent("World")) } - val onEvent = { _: FakeEvent, _: FakeUnitOfWork -> + suspend fun onEvent(event: FakeEvent, uow: FakeUnitOfWork) { throw FakeException("event") } - mb.addCommandHandler(FakeCommand::class to onCommand) - mb.addEventHandler(FakeEvent::class to onEvent) - val exc = shouldThrow { - mb.addCommandHandler(FakeCommand::class to onCommand) + mb.addCommandHandler(FakeCommand::class to ::onCommand, FakeCommand::class to ::onCommand) } - exc.message.shouldBe("${FakeCommand::class} handler already exists") + exc.message.shouldBe("Command handler already exists") val exc2 = shouldThrow { - mb.addEventHandler(FakeEvent::class to onEvent) + mb.addEventHandler(FakeEvent::class to ::onEvent, FakeEvent::class to ::onEvent) } - exc2.message.shouldBe("$onEvent has already been registered") + exc2.message.shouldBe("Event handler has already been registered") } }) diff --git a/acme-domain/acme-domain-scheduling/build.gradle.kts b/acme-domain/acme-domain-scheduling/build.gradle.kts index bce9e1d..ee490ac 100644 --- a/acme-domain/acme-domain-scheduling/build.gradle.kts +++ b/acme-domain/acme-domain-scheduling/build.gradle.kts @@ -4,6 +4,7 @@ plugins { } dependencies { + implementation(kotlin("reflect")) implementation(project(":acme-lib:acme-lib-serialization")) implementation(libs.net.sizovs.pipelinr) api(project(":acme-domain:acme-domain-core")) diff --git a/acme-domain/acme-domain-scheduling/src/main/kotlin/com/acme/scheduling/handlers.kt b/acme-domain/acme-domain-scheduling/src/main/kotlin/com/acme/scheduling/handlers.kt index 406dce9..9c6e68a 100644 --- a/acme-domain/acme-domain-scheduling/src/main/kotlin/com/acme/scheduling/handlers.kt +++ b/acme-domain/acme-domain-scheduling/src/main/kotlin/com/acme/scheduling/handlers.kt @@ -3,45 +3,47 @@ package com.acme.scheduling import com.acme.core.CommandValidationException import com.acme.core.DefaultMessageBus -val createPractice = { command: CreatePracticeCommand, uow: SchedulingUnitOfWork -> +suspend fun createPractice(command: CreatePracticeCommand, uow: SchedulingUnitOfWork) { Practice( id = command.id, owner = command.owner, name = command.name, contactPoints = command.contactPoints ) - .also(uow.repositories.practices::save) + .also { + uow.repositories.practices.save(it) + } .also { uow.addEvent(PracticeCreatedEvent(it)) } } -val createClient = { command: CreateClientCommand, uow: SchedulingUnitOfWork -> +suspend fun createClient(command: CreateClientCommand, uow: SchedulingUnitOfWork) { Client( id = command.id, names = setOf(command.name), gender = command.gender, contactPoints = command.contactPoints - ).also(uow.repositories.clients::save) + ).also { uow.repositories.clients.save(it) } .also { uow.addEvent(ClientCreatedEvent(it)) } } -val createPractitioner = { command: CreatePractitionerCommand, uow: SchedulingUnitOfWork -> +suspend fun createPractitioner(command: CreatePractitionerCommand, uow: SchedulingUnitOfWork) { Practitioner( id = command.id, user = command.user, gender = command.gender, names = setOf(command.name), contactPoints = command.contactPoints - ).also(uow.repositories.practitioners::save) + ).also { uow.repositories.practitioners.save(it) } .also { uow.addEvent(PractitionerCreatedEvent(it)) } } -val createAppointment = { command: CreateAppointmentCommand, uow: SchedulingUnitOfWork -> +suspend fun createAppointment(command: CreateAppointmentCommand, uow: SchedulingUnitOfWork) { val errors = mutableSetOf() if (!uow.repositories.practices.exists(command.practice)) { errors.add( @@ -85,7 +87,7 @@ val createAppointment = { command: CreateAppointmentCommand, uow: SchedulingUnit state = command.state, period = command.period, ) - .also(uow.repositories.appointments::save) + .also { uow.repositories.appointments.save(it) } .also { uow.addEvent( AppointmentCreatedEvent( @@ -100,7 +102,7 @@ val createAppointment = { command: CreateAppointmentCommand, uow: SchedulingUnit } } -val markAppointmentAttended = { command: MarkAppointmentAttendedCommand, uow: SchedulingUnitOfWork -> +suspend fun markAppointmentAttended(command: MarkAppointmentAttendedCommand, uow: SchedulingUnitOfWork) { uow.repositories.appointments.getOrThrow(command.appointment) { CommandValidationException( command, @@ -110,16 +112,15 @@ val markAppointmentAttended = { command: MarkAppointmentAttendedCommand, uow: Sc "Invalid appointment" ) ) - } - .let { it.aggregate } - .markAttended() - .also(uow.repositories.appointments::save) - .also { - uow.addEvent(AppointmentAttendedEvent(it.id)) - } + }.aggregate + .markAttended() + .also { uow.repositories.appointments.save(it) } + .also { + uow.addEvent(AppointmentAttendedEvent(it.id)) + } } -val markAppointmentUnattended = { command: MarkAppointmentUnattendedCommand, uow: SchedulingUnitOfWork -> +suspend fun markAppointmentUnattended(command: MarkAppointmentUnattendedCommand, uow: SchedulingUnitOfWork) { uow.repositories.appointments.getOrThrow(command.appointment) { CommandValidationException( command, @@ -129,16 +130,14 @@ val markAppointmentUnattended = { command: MarkAppointmentUnattendedCommand, uow "Invalid appointment" ) ) - }.let { - it.aggregate - }.markUnattended() - .also(uow.repositories.appointments::save) + }.aggregate.markUnattended() + .also { uow.repositories.appointments.save(it) } .also { uow.addEvent(AppointmentUnattendedEvent(it.id)) } } -val cancelAppointment = { command: CancelAppointmentCommand, uow: SchedulingUnitOfWork -> +suspend fun cancelAppointment(command: CancelAppointmentCommand, uow: SchedulingUnitOfWork) { uow.repositories.appointments.getOrThrow(command.appointment) { CommandValidationException( command, @@ -148,8 +147,8 @@ val cancelAppointment = { command: CancelAppointmentCommand, uow: SchedulingUnit "Invalid appointment" ) ) - }.let { it.aggregate }.cancel() - .also(uow.repositories.appointments::save) + }.aggregate.cancel() + .also { uow.repositories.appointments.save(it) } .also { uow.addEvent(AppointmentCanceledEvent(it.id)) } @@ -157,11 +156,11 @@ val cancelAppointment = { command: CancelAppointmentCommand, uow: SchedulingUnit val schedulingMessageBus = DefaultMessageBus() .addCommandHandler( - CreatePracticeCommand::class to createPractice, - CreatePractitionerCommand::class to createPractitioner, - CreateClientCommand::class to createClient, - CreateAppointmentCommand::class to createAppointment, - MarkAppointmentAttendedCommand::class to markAppointmentAttended, - MarkAppointmentUnattendedCommand::class to markAppointmentUnattended, - CancelAppointmentCommand::class to cancelAppointment, + CreatePracticeCommand::class to ::createPractice, + CreatePractitionerCommand::class to ::createPractitioner, + CreateClientCommand::class to ::createClient, + CreateAppointmentCommand::class to ::createAppointment, + MarkAppointmentAttendedCommand::class to ::markAppointmentAttended, + MarkAppointmentUnattendedCommand::class to ::markAppointmentUnattended, + CancelAppointmentCommand::class to ::cancelAppointment, ) diff --git a/acme-domain/acme-domain-scheduling/src/test/kotlin/com/acme/scheduling/CommandHandlerTest.kt b/acme-domain/acme-domain-scheduling/src/test/kotlin/com/acme/scheduling/CommandHandlerTest.kt index ed478fb..b7b1689 100644 --- a/acme-domain/acme-domain-scheduling/src/test/kotlin/com/acme/scheduling/CommandHandlerTest.kt +++ b/acme-domain/acme-domain-scheduling/src/test/kotlin/com/acme/scheduling/CommandHandlerTest.kt @@ -185,11 +185,13 @@ class CommandHandlerTest : ShouldSpec({ period = Period.Bounded(LocalDateTime.MIN, LocalDateTime.MAX), ) - val uow = InMemorySchedulingUnitOfWork( - practices = setOf(practice), - practitioners = setOf(practitioner), - clients = setOf(client), - ) + val uow = InMemorySchedulingUnitOfWork().apply { + with(repositories) { + practices.save(practice) + practitioners.save(practitioner) + clients.save(client) + } + } createAppointment(cmd, uow) @@ -222,7 +224,8 @@ class CommandHandlerTest : ShouldSpec({ ) val cmd = MarkAppointmentAttendedCommand(Appointment.Id("Appointment123")) - val uow = InMemorySchedulingUnitOfWork(appointments = setOf(appointment)) + val uow = InMemorySchedulingUnitOfWork() + uow.repositories.appointments.save(appointment) markAppointmentAttended(cmd, uow) @@ -264,7 +267,8 @@ class CommandHandlerTest : ShouldSpec({ ) val cmd = MarkAppointmentUnattendedCommand(Appointment.Id("Appointment123")) - val uow = InMemorySchedulingUnitOfWork(appointments = setOf(appointment)) + val uow = InMemorySchedulingUnitOfWork() + uow.repositories.appointments.save(appointment) markAppointmentUnattended(cmd, uow) @@ -306,7 +310,8 @@ class CommandHandlerTest : ShouldSpec({ ) val cmd = CancelAppointmentCommand(Appointment.Id("Appointment123")) - val uow = InMemorySchedulingUnitOfWork(appointments = setOf(appointment)) + val uow = InMemorySchedulingUnitOfWork() + uow.repositories.appointments.save(appointment) cancelAppointment(cmd, uow) diff --git a/acme-domain/acme-domain-scheduling/src/test/kotlin/com/acme/scheduling/InMemorySchedulingUnitOfWork.kt b/acme-domain/acme-domain-scheduling/src/test/kotlin/com/acme/scheduling/InMemorySchedulingUnitOfWork.kt index dc7e601..910bc1e 100644 --- a/acme-domain/acme-domain-scheduling/src/test/kotlin/com/acme/scheduling/InMemorySchedulingUnitOfWork.kt +++ b/acme-domain/acme-domain-scheduling/src/test/kotlin/com/acme/scheduling/InMemorySchedulingUnitOfWork.kt @@ -4,18 +4,13 @@ import com.acme.core.AbstractUnitOfWork import com.acme.core.Event import com.acme.core.InMemoryAggregateRepository -class InMemorySchedulingUnitOfWork( - appointments: Set = emptySet(), - clients: Set = emptySet(), - practitioners: Set = emptySet(), - practices: Set = emptySet(), -) : AbstractUnitOfWork(), SchedulingUnitOfWork { +class InMemorySchedulingUnitOfWork : AbstractUnitOfWork(), SchedulingUnitOfWork { override val repositories = object : SchedulingPersistenceModule { - override val appointments = InMemoryAggregateRepository(appointments) - override val clients = InMemoryAggregateRepository(clients) - override val practitioners = InMemoryAggregateRepository(practitioners) - override val practices = InMemoryAggregateRepository(practices) + override val appointments = InMemoryAggregateRepository() + override val clients = InMemoryAggregateRepository() + override val practitioners = InMemoryAggregateRepository() + override val practices = InMemoryAggregateRepository() } private var _events: MutableList = mutableListOf() diff --git a/acme-web/acme-web-api-test/build.gradle.kts b/acme-web/acme-web-api-test/build.gradle.kts index 8e4c78a..b7799ee 100644 --- a/acme-web/acme-web-api-test/build.gradle.kts +++ b/acme-web/acme-web-api-test/build.gradle.kts @@ -11,6 +11,7 @@ dependencies { testImplementation(project(":acme-lib:acme-lib-liquibase")) testImplementation(project(":acme-lib:acme-lib-serialization")) testImplementation(libs.com.github.java.json.tools.json.schema.validator) + testImplementation(libs.com.zaxxer.hikariCP) testImplementation(libs.io.kotest.extensions.kotest.assertions.ktor) testImplementation(libs.io.kotest.kotest.assertions.json) testImplementation(libs.io.kotest.kotest.runner.junit5) @@ -19,4 +20,5 @@ dependencies { testImplementation(libs.io.ktor.ktor.client.logging) testImplementation(libs.io.ktor.ktor.server.netty) testImplementation(libs.org.testcontainers.postgresql) + testImplementation(libs.org.testcontainers.r2dbc) } diff --git a/acme-web/acme-web-api-test/src/test/kotlin/com/acme/web/api/test/StandaloneAcmeWebServerExtension.kt b/acme-web/acme-web-api-test/src/test/kotlin/com/acme/web/api/test/StandaloneAcmeWebServerExtension.kt index 847d9e6..fdf1d4a 100644 --- a/acme-web/acme-web-api-test/src/test/kotlin/com/acme/web/api/test/StandaloneAcmeWebServerExtension.kt +++ b/acme-web/acme-web-api-test/src/test/kotlin/com/acme/web/api/test/StandaloneAcmeWebServerExtension.kt @@ -1,11 +1,14 @@ package com.acme.web.api.test +import com.acme.liquibase.update import com.acme.web.api.core.ktor.AuthenticationConfiguration import com.acme.web.api.core.ktor.DataSourceConfiguration import com.acme.web.api.core.ktor.KetoConfiguration import com.acme.web.api.core.ktor.MainConfiguration import com.acme.web.api.core.ktor.main import com.acme.web.api.security.ktor.HeaderAuthConfiguration +import com.zaxxer.hikari.HikariConfig +import com.zaxxer.hikari.HikariDataSource import io.kotest.core.listeners.ProjectListener import io.kotest.matchers.shouldBe import io.kotest.matchers.shouldNotBe @@ -25,6 +28,7 @@ import io.ktor.server.engine.embeddedServer import io.ktor.server.netty.NettyApplicationEngine import org.testcontainers.containers.BindMode import org.testcontainers.containers.GenericContainer +import org.testcontainers.containers.PostgreSQLContainer import java.io.IOException import java.net.ServerSocket @@ -48,6 +52,8 @@ class StandaloneAcmeWebServerExtension( withExposedPorts(4466, 4467) } + private val postgres = PostgreSQLContainer("postgres:15.5") + val http = HttpClient { expectSuccess = false @@ -69,8 +75,23 @@ class StandaloneAcmeWebServerExtension( } } + private fun runMigrations() { + val ds = HikariDataSource( + HikariConfig().apply { + jdbcUrl = postgres.getJdbcUrl() + username = postgres.username + password = postgres.password + isAutoCommit = false + } + ) + update(ds.connection) + ds.close() + } + override suspend fun beforeProject() { keto.start() + postgres.start() + runMigrations() server = embeddedServer( io.ktor.server.netty.Netty, @@ -79,9 +100,9 @@ class StandaloneAcmeWebServerExtension( main( MainConfiguration( datasource = DataSourceConfiguration( - jdbcUrl = "jdbc:tc:postgresql:11.5:///test?TC_INITFUNCTION=com.acme.liquibase.LiquibaseTestContainerInitializerKt::update", - username = "acme", - password = "password" + r2dbcUrl = postgres.getJdbcUrl().replace("jdbc:", "r2dbc:"), + username = postgres.username, + password = postgres.password, ), authentication = AuthenticationConfiguration( headers = HeaderAuthConfiguration( @@ -109,6 +130,7 @@ class StandaloneAcmeWebServerExtension( override suspend fun afterProject() { server?.stop(3000, 5000) keto.stop() + postgres.stop() } companion object { diff --git a/acme-web/acme-web-api/application.conf b/acme-web/acme-web-api/application.conf deleted file mode 100644 index b41f201..0000000 --- a/acme-web/acme-web-api/application.conf +++ /dev/null @@ -1,3 +0,0 @@ -datasource { - jdbcUrl = "jdbc:tc:postgresql:11.5:///acme?TC_INITFUNCTION=com.acme.liquibase.LiquibaseTestContainerInitializerKt::update" -} diff --git a/acme-web/acme-web-api/build.gradle.kts b/acme-web/acme-web-api/build.gradle.kts index 6391f63..68f057d 100644 --- a/acme-web/acme-web-api/build.gradle.kts +++ b/acme-web/acme-web-api/build.gradle.kts @@ -25,8 +25,6 @@ dependencies { implementation(project(":acme-lib:acme-lib-validation")) implementation(libs.am.ik.timeflake.timeflake4j) implementation(libs.ch.qos.logback.logback.classic) - implementation(libs.com.michael.bull.kotlin.coroutines.jdbc) - implementation(libs.com.zaxxer.hikariCP) implementation(libs.io.github.oshai.kotlin.logging.jvm) implementation(libs.io.ktor.ktor.client.content.negotiation) implementation(libs.io.ktor.ktor.client.java) @@ -46,16 +44,17 @@ dependencies { implementation(libs.io.opentelemetry.opentelemetry.sdk) implementation(libs.org.jetbrains.kotlinx.kotlinx.coroutines.core) implementation(libs.org.postgresql) + implementation(libs.org.postgresql.r2dbc) implementation(libs.org.slf4j.slf4j.api) runtimeOnly(libs.org.glassfish.jakarta.el) testImplementation(project(":acme-lib:acme-lib-liquibase")) testImplementation(libs.com.mattbertolini.liquibase.slf4j) + testImplementation(libs.com.zaxxer.hikariCP) testImplementation(libs.io.kotest.kotest.runner.junit5) + testImplementation(libs.org.slf4j.slf4j.simple) testImplementation(libs.org.testcontainers.postgresql) - - localRuntimeOnly(project(":acme-lib:acme-lib-liquibase")) - localRuntimeOnly(libs.org.testcontainers.postgresql) + testImplementation(libs.org.testcontainers.r2dbc) } swaggerSources { @@ -87,11 +86,6 @@ jib { } tasks { - getByName("run") { - classpath += localRuntimeOnly - systemProperty("logback.configurationFile", projectDir.resolve("logback-dev.xml")) - } - getByName("processResources") { dependsOn(generateSwaggerUI) } diff --git a/acme-web/acme-web-api/src/main/kotlin/com/acme/web/api/core/ktor/DataSourceConfiguration.kt b/acme-web/acme-web-api/src/main/kotlin/com/acme/web/api/core/ktor/DataSourceConfiguration.kt index 8c1457a..4e7600a 100644 --- a/acme-web/acme-web-api/src/main/kotlin/com/acme/web/api/core/ktor/DataSourceConfiguration.kt +++ b/acme-web/acme-web-api/src/main/kotlin/com/acme/web/api/core/ktor/DataSourceConfiguration.kt @@ -3,14 +3,14 @@ package com.acme.web.api.core.ktor import io.ktor.server.config.ApplicationConfig data class DataSourceConfiguration( - val jdbcUrl: String, + val r2dbcUrl: String, val username: String, val password: String ) { companion object { fun fromConfig(config: ApplicationConfig) = DataSourceConfiguration( - config.property("jdbcUrl").getString(), + config.property("r2dbcUrl").getString(), config.property("username").getString(), config.property("password").getString() ) diff --git a/acme-web/acme-web-api/src/main/kotlin/com/acme/web/api/core/ktor/factory.kt b/acme-web/acme-web-api/src/main/kotlin/com/acme/web/api/core/ktor/factory.kt index bacca73..934e077 100644 --- a/acme-web/acme-web-api/src/main/kotlin/com/acme/web/api/core/ktor/factory.kt +++ b/acme-web/acme-web-api/src/main/kotlin/com/acme/web/api/core/ktor/factory.kt @@ -3,8 +3,6 @@ package com.acme.web.api.core.ktor import com.acme.ktor.server.logging.logger import com.acme.web.api.core.defaultJson import com.acme.web.api.scheduling.ktor.scheduling -import com.zaxxer.hikari.HikariConfig -import com.zaxxer.hikari.HikariDataSource import io.ktor.http.ContentType import io.ktor.server.application.Application import io.ktor.server.application.call @@ -13,29 +11,22 @@ import io.ktor.server.response.respondText import io.ktor.server.routing.get import io.ktor.server.routing.route import io.ktor.server.routing.routing +import io.r2dbc.spi.ConnectionFactories +import io.r2dbc.spi.ConnectionFactory +import io.r2dbc.spi.ConnectionFactoryOptions import kotlinx.serialization.json.Json -import org.jooq.SQLDialect -import org.jooq.impl.DataSourceConnectionProvider -import org.jooq.impl.DefaultConfiguration -import org.jooq.impl.DefaultTransactionProvider -import javax.sql.DataSource +import org.jooq.impl.DSL -fun dataSourceFactory(config: DataSourceConfiguration) = HikariDataSource( - HikariConfig().apply { - jdbcUrl = config.jdbcUrl - username = config.username - password = config.password - isAutoCommit = false - } +fun connectionFactory(config: DataSourceConfiguration): ConnectionFactory = ConnectionFactories.get( + ConnectionFactoryOptions.parse(config.r2dbcUrl).mutate() + .option(ConnectionFactoryOptions.USER, config.username) + .option(ConnectionFactoryOptions.PASSWORD, config.password) + .build() ) -fun jooqConfigFactory(dataSource: DataSource) = DefaultConfiguration().apply { - set(dataSource) - set(DefaultTransactionProvider(DataSourceConnectionProvider(dataSource), true)) - set(SQLDialect.POSTGRES) -} +fun jooqConfigFactory(connectionFactory: ConnectionFactory) = DSL.using(connectionFactory) -fun dataConfigFactory(config: DataSourceConfiguration) = dataSourceFactory(config).let { +fun dataConfigFactory(config: DataSourceConfiguration) = connectionFactory(config).let { it to jooqConfigFactory(it) } @@ -46,11 +37,11 @@ fun Application.main(config: MainConfiguration, json: Json = defaultJson) { config.authentication.apply(this) } - val (_, jooqConfig) = dataConfigFactory(config.datasource) + val (_, jooq) = dataConfigFactory(config.datasource) routing { route("/scheduling") { - scheduling(jooqConfig, config.keto, "/scheduling") + scheduling(jooq.configuration(), config.keto, "/scheduling") } route("/health/check") { get { diff --git a/acme-web/acme-web-api/src/main/kotlin/com/acme/web/api/scheduling/data/JooqSchedulingWebViews.kt b/acme-web/acme-web-api/src/main/kotlin/com/acme/web/api/scheduling/data/JooqSchedulingWebViews.kt index 6306b65..3ed0195 100644 --- a/acme-web/acme-web-api/src/main/kotlin/com/acme/web/api/scheduling/data/JooqSchedulingWebViews.kt +++ b/acme-web/acme-web-api/src/main/kotlin/com/acme/web/api/scheduling/data/JooqSchedulingWebViews.kt @@ -9,6 +9,7 @@ import com.acme.sql.web_server.tables.references.PRACTICE_CONTACT_POINTS import com.acme.sql.web_server.tables.references.PRACTITIONERS import com.acme.sql.web_server.tables.references.PRACTITIONER_CONTACT_POINTS import com.acme.sql.web_server.tables.references.PRACTITIONER_NAMES +import kotlinx.coroutines.reactive.awaitFirstOrNull import org.jooq.Configuration import org.jooq.DSLContext import org.jooq.Records.mapping @@ -41,7 +42,7 @@ class JooqSchedulingWebViews(private val dsl: DSLContext) : SchedulingWebViews { constructor(configuration: Configuration) : this(configuration.dsl()) - override fun findPractice(id: String): PracticeRecord? = + override suspend fun findPractice(id: String): PracticeRecord? = dsl.select( PRACTICES.ID, PRACTICES.NAME, multiset( @@ -65,22 +66,15 @@ class JooqSchedulingWebViews(private val dsl: DSLContext) : SchedulingWebViews { ) .from(PRACTICES) .where(PRACTICES.ID.eq(id)) - .fetch() - .let { result -> - if (result.isEmpty()) { - null - } else { - result.first().let { - PracticeRecord( - id = it[PRACTICES.ID]!!, - name = it[PRACTICES.NAME]!!, - contactPoints = (it["contactPoints"] as List) - ) - } - } + .awaitFirstOrNull()?.let { + PracticeRecord( + id = it[PRACTICES.ID]!!, + name = it[PRACTICES.NAME]!!, + contactPoints = (it["contactPoints"] as List) + ) } - override fun findClient(id: String): ClientRecord? = + override suspend fun findClient(id: String): ClientRecord? = dsl.select( CLIENTS.ID, CLIENTS.GENDER, multiset( @@ -123,23 +117,16 @@ class JooqSchedulingWebViews(private val dsl: DSLContext) : SchedulingWebViews { ) .from(CLIENTS) .where(CLIENTS.ID.eq(id)) - .fetch() - .let { result -> - if (result.isEmpty()) { - null - } else { - result.first().let { - ClientRecord( - id = it[CLIENTS.ID]!!, - gender = it[CLIENTS.GENDER]!!, - names = (it["names"] as List), - contactPoints = (it["contactPoints"] as List) - ) - } - } + .awaitFirstOrNull()?.let { + ClientRecord( + id = it[CLIENTS.ID]!!, + gender = it[CLIENTS.GENDER]!!, + names = (it["names"] as List), + contactPoints = (it["contactPoints"] as List) + ) } - override fun findPractitioner(id: String): PractitionerRecord? = + override suspend fun findPractitioner(id: String): PractitionerRecord? = dsl.select( PRACTITIONERS.ID, PRACTITIONERS.GENDER, multiset( @@ -182,19 +169,16 @@ class JooqSchedulingWebViews(private val dsl: DSLContext) : SchedulingWebViews { ) .from(PRACTITIONERS) .where(PRACTITIONERS.ID.eq(id)) - .fetch() - .let { result -> - result.first().let { - PractitionerRecord( - id = it[PRACTITIONERS.ID]!!, - gender = it[PRACTITIONERS.GENDER]!!, - names = it["names"] as List, - contactPoints = (it["contactPoints"] as List) - ) - } + .awaitFirstOrNull()?.let { + PractitionerRecord( + id = it[PRACTITIONERS.ID]!!, + gender = it[PRACTITIONERS.GENDER]!!, + names = it["names"] as List, + contactPoints = (it["contactPoints"] as List) + ) } - override fun findAppointment(id: String): AppointmentRecord? = + override suspend fun findAppointment(id: String): AppointmentRecord? = dsl.selectFrom( APPOINTMENTS .leftJoin(PRACTITIONERS) @@ -205,7 +189,7 @@ class JooqSchedulingWebViews(private val dsl: DSLContext) : SchedulingWebViews { .on(CLIENTS.ID.eq(APPOINTMENTS.CLIENT_ID)) ) .where(APPOINTMENTS.ID.eq(id)) - .fetchOne()?.let { + .awaitFirstOrNull()?.let { AppointmentRecord( id = it[APPOINTMENTS.ID]!!, practiceId = it[PRACTICES.ID]!!, diff --git a/acme-web/acme-web-api/src/main/kotlin/com/acme/web/api/scheduling/data/SchedulingWebViews.kt b/acme-web/acme-web-api/src/main/kotlin/com/acme/web/api/scheduling/data/SchedulingWebViews.kt index d2db13b..e4b8b94 100644 --- a/acme-web/acme-web-api/src/main/kotlin/com/acme/web/api/scheduling/data/SchedulingWebViews.kt +++ b/acme-web/acme-web-api/src/main/kotlin/com/acme/web/api/scheduling/data/SchedulingWebViews.kt @@ -1,26 +1,26 @@ package com.acme.web.api.scheduling.data interface SchedulingWebViews { - fun findPractice(id: String): PracticeRecord? - fun findPracticeOrThrow( + suspend fun findPractice(id: String): PracticeRecord? + suspend fun findPracticeOrThrow( id: String, block: () -> Throwable = ::NoSuchElementException ): PracticeRecord = findPractice(id) ?: throw block() - fun findClient(id: String): ClientRecord? - fun findClientOrThrow( + suspend fun findClient(id: String): ClientRecord? + suspend fun findClientOrThrow( id: String, block: () -> Throwable = ::NoSuchElementException ): ClientRecord = findClient(id) ?: throw block() - fun findPractitioner(id: String): PractitionerRecord? - fun findPractitionerOrThrow( + suspend fun findPractitioner(id: String): PractitionerRecord? + suspend fun findPractitionerOrThrow( id: String, block: () -> Throwable = ::NoSuchElementException ): PractitionerRecord = findPractitioner(id) ?: throw block() - fun findAppointment(id: String): AppointmentRecord? - fun findAppointmentOrThrow( + suspend fun findAppointment(id: String): AppointmentRecord? + suspend fun findAppointmentOrThrow( id: String, block: () -> Throwable = ::NoSuchElementException ): AppointmentRecord = findAppointment(id) ?: throw block() diff --git a/acme-web/acme-web-api/src/main/kotlin/com/acme/web/api/scheduling/data/events.kt b/acme-web/acme-web-api/src/main/kotlin/com/acme/web/api/scheduling/data/events.kt index 014b00f..b869d4b 100644 --- a/acme-web/acme-web-api/src/main/kotlin/com/acme/web/api/scheduling/data/events.kt +++ b/acme-web/acme-web-api/src/main/kotlin/com/acme/web/api/scheduling/data/events.kt @@ -21,9 +21,10 @@ import com.acme.sql.web_server.tables.references.PRACTICE_CONTACT_POINTS import com.acme.sql.web_server.tables.references.PRACTITIONERS import com.acme.sql.web_server.tables.references.PRACTITIONER_CONTACT_POINTS import com.acme.sql.web_server.tables.references.PRACTITIONER_NAMES +import kotlinx.coroutines.reactive.awaitFirst import java.time.LocalDateTime -val onPractitionerCreated: (PractitionerCreatedEvent, SchedulingJooqUnitOfWork) -> Unit = { event, uow -> +suspend fun onPractitionerCreated(event: PractitionerCreatedEvent, uow: SchedulingJooqUnitOfWork) { with(uow.dsl) { val practitioner = event.practitioner @@ -34,7 +35,7 @@ val onPractitionerCreated: (PractitionerCreatedEvent, SchedulingJooqUnitOfWork) ).values( practitioner.id.value, practitioner.gender.toString() - ).execute() + ).returningResult(PRACTITIONERS.ID).awaitFirst() practitioner.names.map { val (start, end) = it.period.getTimeValues() @@ -57,7 +58,9 @@ val onPractitionerCreated: (PractitionerCreatedEvent, SchedulingJooqUnitOfWork) start, end, ) - }.forEach(::execute) + }.forEach { + it.returning().awaitFirst() + } practitioner.contactPoints.map { insertInto( @@ -72,11 +75,13 @@ val onPractitionerCreated: (PractitionerCreatedEvent, SchedulingJooqUnitOfWork) it.toSystemString(), it.getVerifiedAtValue() ) - }.forEach(::execute) + }.forEach { + it.returning().awaitFirst() + } } } -val onPracticeCreated: (PracticeCreatedEvent, SchedulingJooqUnitOfWork) -> Unit = { event, uow -> +suspend fun onPracticeCreated(event: PracticeCreatedEvent, uow: SchedulingJooqUnitOfWork) { with(uow.dsl) { val practice = event.practice @@ -87,7 +92,7 @@ val onPracticeCreated: (PracticeCreatedEvent, SchedulingJooqUnitOfWork) -> Unit ).values( practice.id.value, practice.name.value - ).execute() + ).returning().awaitFirst() practice.contactPoints.map { insertInto( @@ -102,11 +107,13 @@ val onPracticeCreated: (PracticeCreatedEvent, SchedulingJooqUnitOfWork) -> Unit it.toSystemString(), it.getVerifiedAtValue(), ) - }.forEach(::execute) + }.forEach { + it.returning().awaitFirst() + } } } -val onClientCreated: (ClientCreatedEvent, uow: SchedulingJooqUnitOfWork) -> Unit = { event, uow -> +suspend fun onClientCreated(event: ClientCreatedEvent, uow: SchedulingJooqUnitOfWork) { with(uow.dsl) { val client = event.client @@ -117,7 +124,7 @@ val onClientCreated: (ClientCreatedEvent, uow: SchedulingJooqUnitOfWork) -> Unit ).values( client.id.value, client.gender.toString() - ).execute() + ).returning().awaitFirst() client.names.map { val (start, end) = it.period.getTimeValues() @@ -140,7 +147,9 @@ val onClientCreated: (ClientCreatedEvent, uow: SchedulingJooqUnitOfWork) -> Unit start, end, ) - }.forEach(::execute) + }.forEach { + it.returning().awaitFirst() + } client.contactPoints.map { insertInto( @@ -155,11 +164,13 @@ val onClientCreated: (ClientCreatedEvent, uow: SchedulingJooqUnitOfWork) -> Unit it.toSystemString(), it.getVerifiedAtValue(), ) - }.forEach(::execute) + }.forEach { + it.returning().awaitFirst() + } } } -val onAppointmentCreated: (AppointmentCreatedEvent, SchedulingJooqUnitOfWork) -> Unit = { event, uow -> +suspend fun onAppointmentCreated(event: AppointmentCreatedEvent, uow: SchedulingJooqUnitOfWork) { val (start, end) = event.period.getTimeValues() uow.dsl.insertInto( @@ -179,25 +190,29 @@ val onAppointmentCreated: (AppointmentCreatedEvent, SchedulingJooqUnitOfWork) -> event.state.toString(), start, end - ).execute() + ).returning().awaitFirst() } -private fun updateAppointmentState(appointmentId: Appointment.Id, state: String, uow: SchedulingJooqUnitOfWork) { +private suspend fun updateAppointmentState( + appointmentId: Appointment.Id, + state: String, + uow: SchedulingJooqUnitOfWork +) { with(uow.dsl) { update(APPOINTMENTS).set(APPOINTMENTS.STATE, state) - .where(APPOINTMENTS.ID.eq(appointmentId.value)).execute() + .where(APPOINTMENTS.ID.eq(appointmentId.value)).returning().awaitFirst() } } -val onAppointmentUnattended: (AppointmentUnattendedEvent, SchedulingJooqUnitOfWork) -> Unit = { event, uow -> +suspend fun onAppointmentUnattended(event: AppointmentUnattendedEvent, uow: SchedulingJooqUnitOfWork) { updateAppointmentState(event.appointmentId, AppointmentState.UNATTENDED.toString(), uow) } -val onAppointmentAttended: (AppointmentAttendedEvent, SchedulingJooqUnitOfWork) -> Unit = { event, uow -> +suspend fun onAppointmentAttended(event: AppointmentAttendedEvent, uow: SchedulingJooqUnitOfWork) { updateAppointmentState(event.appointmentId, AppointmentState.ATTENDED.toString(), uow) } -val onAppointmentCanceled: (AppointmentCanceledEvent, SchedulingJooqUnitOfWork) -> Unit = { event, uow -> +suspend fun onAppointmentCanceled(event: AppointmentCanceledEvent, uow: SchedulingJooqUnitOfWork) { updateAppointmentState(event.appointmentId, AppointmentState.CANCELED.toString(), uow) } diff --git a/acme-web/acme-web-api/src/main/kotlin/com/acme/web/api/scheduling/messageBus.kt b/acme-web/acme-web-api/src/main/kotlin/com/acme/web/api/scheduling/messageBus.kt index e135700..298bbaa 100644 --- a/acme-web/acme-web-api/src/main/kotlin/com/acme/web/api/scheduling/messageBus.kt +++ b/acme-web/acme-web-api/src/main/kotlin/com/acme/web/api/scheduling/messageBus.kt @@ -18,12 +18,12 @@ import com.acme.scheduling.schedulingMessageBus as baseSchedulingMessageBus val schedulingMessageBus = baseSchedulingMessageBus.copy().apply { addEventHandler( - PracticeCreatedEvent::class to onPracticeCreated, - PractitionerCreatedEvent::class to onPractitionerCreated, - ClientCreatedEvent::class to onClientCreated, - AppointmentCreatedEvent::class to onAppointmentCreated, - AppointmentAttendedEvent::class to onAppointmentAttended, - AppointmentUnattendedEvent::class to onAppointmentUnattended, - AppointmentCanceledEvent::class to onAppointmentCanceled, + PracticeCreatedEvent::class to ::onPracticeCreated, + PractitionerCreatedEvent::class to ::onPractitionerCreated, + ClientCreatedEvent::class to ::onClientCreated, + AppointmentCreatedEvent::class to ::onAppointmentCreated, + AppointmentAttendedEvent::class to ::onAppointmentAttended, + AppointmentUnattendedEvent::class to ::onAppointmentUnattended, + AppointmentCanceledEvent::class to ::onAppointmentCanceled, ) } diff --git a/acme-web/acme-web-api/src/main/resources/application.conf b/acme-web/acme-web-api/src/main/resources/application.conf index 402cbb7..0140c4a 100644 --- a/acme-web/acme-web-api/src/main/resources/application.conf +++ b/acme-web/acme-web-api/src/main/resources/application.conf @@ -9,8 +9,8 @@ ktor { } datasource { - jdbcUrl = "jdbc:postgresql://acme-postgresql:5432/acme" - jdbcUrl = ${?DATASOURCE__JDBC_URL} + r2dbcUrl = "r2dbc:postgresql://acme-postgresql:5432/acme" + r2dbcUrl = ${?DATASOURCE__R2DBC_URL} username = "acme" username = ${?DATASOURCE__USERNAME} password = "password" @@ -25,12 +25,10 @@ authentication { } keto { - baseUrl = "http://keto" - baseUrl = ${?KETO__BASE_URL} - readPort = 4466 - readPort = ${?KETO__READ_PORT} - writePort = 4467 - writePort = ${?KETO__WRITE_PORT} + readUrl = "http://keto:4466" + readUrl = ${?KETO__READ_URL} + writeUrl = "http://leto:4467" + writeUrl = ${?KETO__WRITE_URL} } include file("/etc/acme/application.conf") diff --git a/acme-web/acme-web-api/src/test/kotlin/com/acme/web/api/scheduling/MessagesTest.kt b/acme-web/acme-web-api/src/test/kotlin/com/acme/web/api/scheduling/MessagesTest.kt index a784109..e8ee043 100644 --- a/acme-web/acme-web-api/src/test/kotlin/com/acme/web/api/scheduling/MessagesTest.kt +++ b/acme-web/acme-web-api/src/test/kotlin/com/acme/web/api/scheduling/MessagesTest.kt @@ -29,7 +29,6 @@ import com.acme.web.api.scheduling.data.PracticeRecord import com.acme.web.api.scheduling.data.PractitionerRecord import io.kotest.core.spec.style.ShouldSpec import io.kotest.matchers.shouldBe -import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.assertThrows import java.time.Instant import java.time.LocalDateTime @@ -44,45 +43,43 @@ class MessagesTest : ShouldSpec({ context("CreatePracticeCommand") { should("result in new PracticeEntity") { testTransaction { - val uow = SchedulingJooqUnitOfWork(it) - - runBlocking { - uow.transaction { - // When - schedulingMessageBus.handle( - CreatePracticeCommand( - id = Practice.Id("Practice123"), - name = Practice.Name("Hello & Associates"), - owner = Practitioner.Id("Practitioner123"), - contactPoints = setOf( - ContactPoint.Phone.Unverified("917-555-5555"), - ContactPoint.Email.Unverified("hello@associates.com") - ) - ), - uow - ) - - // Then - with(JooqSchedulingWebViews(it.dsl())) { - findPracticeOrThrow("Practice123").shouldBe( - PracticeRecord( - id = "Practice123", - name = "Hello & Associates", - contactPoints = listOf( - ContactPointRecord( - system = "Email", - value = "hello@associates.com", - verifiedAt = null - ), - ContactPointRecord( - system = "Phone", - value = "917-555-5555", - verifiedAt = null - ) + val uow = SchedulingJooqUnitOfWork(it.configuration()) + + uow.transaction { + // When + schedulingMessageBus.handle( + CreatePracticeCommand( + id = Practice.Id("Practice123"), + name = Practice.Name("Hello & Associates"), + owner = Practitioner.Id("Practitioner123"), + contactPoints = setOf( + ContactPoint.Phone.Unverified("917-555-5555"), + ContactPoint.Email.Unverified("hello@associates.com") + ) + ), + uow + ) + + // Then + with(JooqSchedulingWebViews(it.dsl())) { + findPracticeOrThrow("Practice123").shouldBe( + PracticeRecord( + id = "Practice123", + name = "Hello & Associates", + contactPoints = listOf( + ContactPointRecord( + system = "Email", + value = "hello@associates.com", + verifiedAt = null + ), + ContactPointRecord( + system = "Phone", + value = "917-555-5555", + verifiedAt = null ) ) ) - } + ) } } } @@ -92,47 +89,45 @@ class MessagesTest : ShouldSpec({ context("CreatePractitionerCommand") { should("result in new PractitionerEntity") { testTransaction { - val uow = SchedulingJooqUnitOfWork(it) - runBlocking { - uow.transaction { - // When - schedulingMessageBus.handle( - CreatePractitionerCommand( - id = Practitioner.Id("Practitioner123"), - user = UserId("User123"), - name = HumanName( - given = Name.Given("First"), - family = Name.Family("Last"), - suffix = Name.Suffix(""), - prefix = Name.Prefix(""), - period = Period.Unknown - ), - gender = Gender.UNKNOWN, - contactPoints = emptySet() + val uow = SchedulingJooqUnitOfWork(it.configuration()) + uow.transaction { + // When + schedulingMessageBus.handle( + CreatePractitionerCommand( + id = Practitioner.Id("Practitioner123"), + user = UserId("User123"), + name = HumanName( + given = Name.Given("First"), + family = Name.Family("Last"), + suffix = Name.Suffix(""), + prefix = Name.Prefix(""), + period = Period.Unknown ), - uow - ) - - // Then - with(JooqSchedulingWebViews(it.dsl())) { - findPractitionerOrThrow("Practitioner123").shouldBe( - PractitionerRecord( - id = "Practitioner123", - names = listOf( - HumanNameRecord( - given = "First", - family = "Last", - suffix = "", - prefix = "", - periodStart = null, - periodEnd = null - ) - ), - gender = "UNKNOWN", - contactPoints = emptyList() - ) + gender = Gender.UNKNOWN, + contactPoints = emptySet() + ), + uow + ) + + // Then + with(JooqSchedulingWebViews(it.dsl())) { + findPractitionerOrThrow("Practitioner123").shouldBe( + PractitionerRecord( + id = "Practitioner123", + names = listOf( + HumanNameRecord( + given = "First", + family = "Last", + suffix = "", + prefix = "", + periodStart = null, + periodEnd = null + ) + ), + gender = "UNKNOWN", + contactPoints = emptyList() ) - } + ) } } } @@ -142,60 +137,58 @@ class MessagesTest : ShouldSpec({ context("CreateClientCommand") { should("result in new ClientEntity") { testTransaction { - val uow = SchedulingJooqUnitOfWork(it) - runBlocking { - uow.transaction { - // When - schedulingMessageBus.handle( - CreateClientCommand( - id = Client.Id("Client123"), - name = HumanName( - family = Name.Family("Last"), - given = Name.Given("First"), - prefix = Name.Prefix(""), - suffix = Name.Suffix(""), - period = Period.Unknown - ), - gender = Gender.UNKNOWN, - contactPoints = setOf( - ContactPoint.Phone.Unverified("917-555-5555"), - ContactPoint.Email.Unverified("hello@world.com") - ) + val uow = SchedulingJooqUnitOfWork(it.configuration()) + uow.transaction { + // When + schedulingMessageBus.handle( + CreateClientCommand( + id = Client.Id("Client123"), + name = HumanName( + family = Name.Family("Last"), + given = Name.Given("First"), + prefix = Name.Prefix(""), + suffix = Name.Suffix(""), + period = Period.Unknown ), - uow - ) - - // Then - with(JooqSchedulingWebViews(it.dsl())) { - findClientOrThrow("Client123").shouldBe( - ClientRecord( - id = "Client123", - names = listOf( - HumanNameRecord( - family = "Last", - given = "First", - prefix = "", - suffix = "", - periodStart = null, - periodEnd = null, - ) + gender = Gender.UNKNOWN, + contactPoints = setOf( + ContactPoint.Phone.Unverified("917-555-5555"), + ContactPoint.Email.Unverified("hello@world.com") + ) + ), + uow + ) + + // Then + with(JooqSchedulingWebViews(it.dsl())) { + findClientOrThrow("Client123").shouldBe( + ClientRecord( + id = "Client123", + names = listOf( + HumanNameRecord( + family = "Last", + given = "First", + prefix = "", + suffix = "", + periodStart = null, + periodEnd = null, + ) + ), + gender = "UNKNOWN", + contactPoints = listOf( + ContactPointRecord( + system = "Email", + value = "hello@world.com", + verifiedAt = null ), - gender = "UNKNOWN", - contactPoints = listOf( - ContactPointRecord( - system = "Email", - value = "hello@world.com", - verifiedAt = null - ), - ContactPointRecord( - system = "Phone", - value = "917-555-5555", - verifiedAt = null - ) + ContactPointRecord( + system = "Phone", + value = "917-555-5555", + verifiedAt = null ) ) ) - } + ) } } } @@ -205,15 +198,115 @@ class MessagesTest : ShouldSpec({ context("CreateAppointmentCommand") { should("throw exception with invalid references") { testTransaction { - val uow = SchedulingJooqUnitOfWork(it) + val uow = SchedulingJooqUnitOfWork(it.configuration()) + + uow.transaction { + // Given + val start = Instant.now().truncatedTo(ChronoUnit.MICROS) + val end = start.plus(1, ChronoUnit.HOURS) + + val command = CreateAppointmentCommand( + id = Appointment.Id("Appointment123"), + client = Client.Id("Client123"), + practitioner = Practitioner.Id("Practitioner123"), + practice = Practice.Id("Practice123"), + state = AppointmentState.SCHEDULED, + period = Period.Bounded( + start.toLocalDateTimeUTC(), + end.toLocalDateTimeUTC() + ) + ) - runBlocking { - uow.transaction { - // Given - val start = Instant.now().truncatedTo(ChronoUnit.MICROS) - val end = start.plus(1, ChronoUnit.HOURS) + // When + val exc = assertThrows { + schedulingMessageBus.handle(command, uow) + } - val command = CreateAppointmentCommand( + // Then + exc.command.shouldBe(command) + exc.errors.shouldBe( + setOf( + CommandValidationException.InvalidAggregateReferenceError( + fieldName = "client", + value = "Client123", + message = "Invalid client" + ), + CommandValidationException.InvalidAggregateReferenceError( + fieldName = "practitioner", + value = "Practitioner123", + message = "Invalid practitioner" + ), + CommandValidationException.InvalidAggregateReferenceError( + fieldName = "practice", + value = "Practice123", + message = "Invalid practice" + ) + ) + ) + } + } + } + + should("result in new AppointmentEntity") { + testTransaction { + val uow = SchedulingJooqUnitOfWork(it.configuration()) + uow.transaction { + // Given + schedulingMessageBus.handle( + CreatePracticeCommand( + id = Practice.Id("Practice123"), + name = Practice.Name("Hello & Associates"), + owner = Practitioner.Id("Practitioner123"), + contactPoints = setOf( + ContactPoint.Phone.Unverified("917-555-5555"), + ContactPoint.Email.Unverified("hello@associates.com") + ) + ), + uow + ) + + schedulingMessageBus.handle( + CreatePractitionerCommand( + id = Practitioner.Id("Practitioner123"), + user = UserId("User123"), + name = HumanName( + given = Name.Given("First"), + family = Name.Family("Last"), + suffix = Name.Suffix(""), + prefix = Name.Prefix(""), + period = Period.Unknown + ), + gender = Gender.UNKNOWN, + contactPoints = emptySet() + ), + uow + ) + + schedulingMessageBus.handle( + CreateClientCommand( + id = Client.Id("Client123"), + name = HumanName( + family = Name.Family("Last"), + given = Name.Given("First"), + prefix = Name.Prefix(""), + suffix = Name.Suffix(""), + period = Period.Unknown + ), + gender = Gender.UNKNOWN, + contactPoints = setOf( + ContactPoint.Phone.Unverified("917-555-5555"), + ContactPoint.Email.Unverified("hello@world.com") + ) + ), + uow + ) + + val start = Instant.now().truncatedTo(ChronoUnit.MICROS) + val end = start.plus(1, ChronoUnit.HOURS) + + // When + schedulingMessageBus.handle( + CreateAppointmentCommand( id = Appointment.Id("Appointment123"), client = Client.Id("Client123"), practitioner = Practitioner.Id("Practitioner123"), @@ -223,32 +316,21 @@ class MessagesTest : ShouldSpec({ start.toLocalDateTimeUTC(), end.toLocalDateTimeUTC() ) - ) - - // When - val exc = assertThrows { - schedulingMessageBus.handle(command, uow) - } - - // Then - exc.command.shouldBe(command) - exc.errors.shouldBe( - setOf( - CommandValidationException.InvalidAggregateReferenceError( - fieldName = "client", - value = "Client123", - message = "Invalid client" - ), - CommandValidationException.InvalidAggregateReferenceError( - fieldName = "practitioner", - value = "Practitioner123", - message = "Invalid practitioner" - ), - CommandValidationException.InvalidAggregateReferenceError( - fieldName = "practice", - value = "Practice123", - message = "Invalid practice" - ) + ), + uow + ) + + // Then + with(JooqSchedulingWebViews(it.dsl())) { + findAppointmentOrThrow("Appointment123").shouldBe( + AppointmentRecord( + id = "Appointment123", + clientId = "Client123", + practiceId = "Practice123", + practitionerId = "Practitioner123", + state = "SCHEDULED", + periodStart = start, + periodEnd = end, ) ) } @@ -256,10 +338,11 @@ class MessagesTest : ShouldSpec({ } } - should("result in new AppointmentEntity") { - testTransaction { - val uow = SchedulingJooqUnitOfWork(it) - runBlocking { + context("MarkAppointmentAttendedCommand") { + should("result in updated AppointmentEntity") { + testTransaction { + val uow = SchedulingJooqUnitOfWork(it.configuration()) + uow.transaction { // Given schedulingMessageBus.handle( @@ -314,7 +397,6 @@ class MessagesTest : ShouldSpec({ val start = Instant.now().truncatedTo(ChronoUnit.MICROS) val end = start.plus(1, ChronoUnit.HOURS) - // When schedulingMessageBus.handle( CreateAppointmentCommand( id = Appointment.Id("Appointment123"), @@ -330,6 +412,14 @@ class MessagesTest : ShouldSpec({ uow ) + // When + schedulingMessageBus.handle( + MarkAppointmentAttendedCommand( + appointment = Appointment.Id("Appointment123") + ), + uow + ) + // Then with(JooqSchedulingWebViews(it.dsl())) { findAppointmentOrThrow("Appointment123").shouldBe( @@ -338,7 +428,7 @@ class MessagesTest : ShouldSpec({ clientId = "Client123", practiceId = "Practice123", practitionerId = "Practitioner123", - state = "SCHEDULED", + state = "ATTENDED", periodStart = start, periodEnd = end, ) @@ -349,206 +439,101 @@ class MessagesTest : ShouldSpec({ } } - context("MarkAppointmentAttendedCommand") { + context("MarkAppointmentUnattendedCommand") { should("result in updated AppointmentEntity") { testTransaction { - val uow = SchedulingJooqUnitOfWork(it) - - runBlocking { - uow.transaction { - // Given - schedulingMessageBus.handle( - CreatePracticeCommand( - id = Practice.Id("Practice123"), - name = Practice.Name("Hello & Associates"), - owner = Practitioner.Id("Practitioner123"), - contactPoints = setOf( - ContactPoint.Phone.Unverified("917-555-5555"), - ContactPoint.Email.Unverified("hello@associates.com") - ) - ), - uow - ) + val uow = SchedulingJooqUnitOfWork(it.configuration()) - schedulingMessageBus.handle( - CreatePractitionerCommand( - id = Practitioner.Id("Practitioner123"), - user = UserId("User123"), - name = HumanName( - given = Name.Given("First"), - family = Name.Family("Last"), - suffix = Name.Suffix(""), - prefix = Name.Prefix(""), - period = Period.Unknown - ), - gender = Gender.UNKNOWN, - contactPoints = emptySet() - ), - uow - ) - - schedulingMessageBus.handle( - CreateClientCommand( - id = Client.Id("Client123"), - name = HumanName( - family = Name.Family("Last"), - given = Name.Given("First"), - prefix = Name.Prefix(""), - suffix = Name.Suffix(""), - period = Period.Unknown - ), - gender = Gender.UNKNOWN, - contactPoints = setOf( - ContactPoint.Phone.Unverified("917-555-5555"), - ContactPoint.Email.Unverified("hello@world.com") - ) - ), - uow - ) - - val start = Instant.now().truncatedTo(ChronoUnit.MICROS) - val end = start.plus(1, ChronoUnit.HOURS) - - schedulingMessageBus.handle( - CreateAppointmentCommand( - id = Appointment.Id("Appointment123"), - client = Client.Id("Client123"), - practitioner = Practitioner.Id("Practitioner123"), - practice = Practice.Id("Practice123"), - state = AppointmentState.SCHEDULED, - period = Period.Bounded( - start.toLocalDateTimeUTC(), - end.toLocalDateTimeUTC() - ) - ), - uow - ) - - // When - schedulingMessageBus.handle( - MarkAppointmentAttendedCommand( - appointment = Appointment.Id("Appointment123") - ), - uow - ) - - // Then - with(JooqSchedulingWebViews(it.dsl())) { - findAppointmentOrThrow("Appointment123").shouldBe( - AppointmentRecord( - id = "Appointment123", - clientId = "Client123", - practiceId = "Practice123", - practitionerId = "Practitioner123", - state = "ATTENDED", - periodStart = start, - periodEnd = end, - ) + uow.transaction { + // Given + schedulingMessageBus.handle( + CreatePracticeCommand( + id = Practice.Id("Practice123"), + name = Practice.Name("Hello & Associates"), + owner = Practitioner.Id("Practitioner123"), + contactPoints = setOf( + ContactPoint.Phone.Unverified("917-555-5555"), + ContactPoint.Email.Unverified("hello@associates.com") ) - } - } - } - } - } - } + ), + uow + ) - context("MarkAppointmentUnattendedCommand") { - should("result in updated AppointmentEntity") { - testTransaction { - val uow = SchedulingJooqUnitOfWork(it) - - runBlocking { - uow.transaction { - // Given - schedulingMessageBus.handle( - CreatePracticeCommand( - id = Practice.Id("Practice123"), - name = Practice.Name("Hello & Associates"), - owner = Practitioner.Id("Practitioner123"), - contactPoints = setOf( - ContactPoint.Phone.Unverified("917-555-5555"), - ContactPoint.Email.Unverified("hello@associates.com") - ) + schedulingMessageBus.handle( + CreatePractitionerCommand( + id = Practitioner.Id("Practitioner123"), + user = UserId("User123"), + name = HumanName( + given = Name.Given("First"), + family = Name.Family("Last"), + suffix = Name.Suffix(""), + prefix = Name.Prefix(""), + period = Period.Unknown ), - uow - ) + gender = Gender.UNKNOWN, + contactPoints = emptySet() + ), + uow + ) - schedulingMessageBus.handle( - CreatePractitionerCommand( - id = Practitioner.Id("Practitioner123"), - user = UserId("User123"), - name = HumanName( - given = Name.Given("First"), - family = Name.Family("Last"), - suffix = Name.Suffix(""), - prefix = Name.Prefix(""), - period = Period.Unknown - ), - gender = Gender.UNKNOWN, - contactPoints = emptySet() + schedulingMessageBus.handle( + CreateClientCommand( + id = Client.Id("Client123"), + name = HumanName( + family = Name.Family("Last"), + given = Name.Given("First"), + prefix = Name.Prefix(""), + suffix = Name.Suffix(""), + period = Period.Unknown ), - uow - ) + gender = Gender.UNKNOWN, + contactPoints = setOf( + ContactPoint.Phone.Unverified("917-555-5555"), + ContactPoint.Email.Unverified("hello@world.com") + ) + ), + uow + ) - schedulingMessageBus.handle( - CreateClientCommand( - id = Client.Id("Client123"), - name = HumanName( - family = Name.Family("Last"), - given = Name.Given("First"), - prefix = Name.Prefix(""), - suffix = Name.Suffix(""), - period = Period.Unknown - ), - gender = Gender.UNKNOWN, - contactPoints = setOf( - ContactPoint.Phone.Unverified("917-555-5555"), - ContactPoint.Email.Unverified("hello@world.com") - ) - ), - uow - ) + val start = Instant.now().truncatedTo(ChronoUnit.MICROS) + val end = start.plus(1, ChronoUnit.HOURS) - val start = Instant.now().truncatedTo(ChronoUnit.MICROS) - val end = start.plus(1, ChronoUnit.HOURS) - - schedulingMessageBus.handle( - CreateAppointmentCommand( - id = Appointment.Id("Appointment123"), - client = Client.Id("Client123"), - practitioner = Practitioner.Id("Practitioner123"), - practice = Practice.Id("Practice123"), - state = AppointmentState.SCHEDULED, - period = Period.Bounded( - start.toLocalDateTimeUTC(), - end.toLocalDateTimeUTC() - ) - ), - uow - ) + schedulingMessageBus.handle( + CreateAppointmentCommand( + id = Appointment.Id("Appointment123"), + client = Client.Id("Client123"), + practitioner = Practitioner.Id("Practitioner123"), + practice = Practice.Id("Practice123"), + state = AppointmentState.SCHEDULED, + period = Period.Bounded( + start.toLocalDateTimeUTC(), + end.toLocalDateTimeUTC() + ) + ), + uow + ) - // When - schedulingMessageBus.handle( - MarkAppointmentUnattendedCommand( - appointment = Appointment.Id("Appointment123") - ), - uow - ) + // When + schedulingMessageBus.handle( + MarkAppointmentUnattendedCommand( + appointment = Appointment.Id("Appointment123") + ), + uow + ) - // Then - with(JooqSchedulingWebViews(it.dsl())) { - findAppointmentOrThrow("Appointment123").shouldBe( - AppointmentRecord( - id = "Appointment123", - clientId = "Client123", - practiceId = "Practice123", - practitionerId = "Practitioner123", - state = "UNATTENDED", - periodStart = start, - periodEnd = end, - ) + // Then + with(JooqSchedulingWebViews(it.dsl())) { + findAppointmentOrThrow("Appointment123").shouldBe( + AppointmentRecord( + id = "Appointment123", + clientId = "Client123", + practiceId = "Practice123", + practitionerId = "Practitioner123", + state = "UNATTENDED", + periodStart = start, + periodEnd = end, ) - } + ) } } } @@ -558,100 +543,98 @@ class MessagesTest : ShouldSpec({ context("CancelAppointmentCommand") { should(" result in updated AppointmentEntity") { testTransaction { - val uow = SchedulingJooqUnitOfWork(it) - - runBlocking { - uow.transaction { - // Given - schedulingMessageBus.handle( - CreatePracticeCommand( - id = Practice.Id("Practice123"), - name = Practice.Name("Hello & Associates"), - owner = Practitioner.Id("Practitioner123"), - contactPoints = setOf( - ContactPoint.Phone.Unverified("917-555-5555"), - ContactPoint.Email.Unverified("hello@associates.com") - ) - ), - uow - ) + val uow = SchedulingJooqUnitOfWork(it.configuration()) - schedulingMessageBus.handle( - CreatePractitionerCommand( - id = Practitioner.Id("Practitioner123"), - user = UserId("User123"), - name = HumanName( - given = Name.Given("First"), - family = Name.Family("Last"), - suffix = Name.Suffix(""), - prefix = Name.Prefix(""), - period = Period.Unknown - ), - gender = Gender.UNKNOWN, - contactPoints = emptySet() - ), - uow - ) + uow.transaction { + // Given + schedulingMessageBus.handle( + CreatePracticeCommand( + id = Practice.Id("Practice123"), + name = Practice.Name("Hello & Associates"), + owner = Practitioner.Id("Practitioner123"), + contactPoints = setOf( + ContactPoint.Phone.Unverified("917-555-5555"), + ContactPoint.Email.Unverified("hello@associates.com") + ) + ), + uow + ) - schedulingMessageBus.handle( - CreateClientCommand( - id = Client.Id("Client123"), - name = HumanName( - family = Name.Family("Last"), - given = Name.Given("First"), - prefix = Name.Prefix(""), - suffix = Name.Suffix(""), - period = Period.Unknown - ), - gender = Gender.UNKNOWN, - contactPoints = setOf( - ContactPoint.Phone.Unverified("917-555-5555"), - ContactPoint.Email.Unverified("hello@world.com") - ) + schedulingMessageBus.handle( + CreatePractitionerCommand( + id = Practitioner.Id("Practitioner123"), + user = UserId("User123"), + name = HumanName( + given = Name.Given("First"), + family = Name.Family("Last"), + suffix = Name.Suffix(""), + prefix = Name.Prefix(""), + period = Period.Unknown ), - uow - ) + gender = Gender.UNKNOWN, + contactPoints = emptySet() + ), + uow + ) - val start = Instant.now().truncatedTo(ChronoUnit.MICROS) - val end = start.plus(1, ChronoUnit.HOURS) - - schedulingMessageBus.handle( - CreateAppointmentCommand( - id = Appointment.Id("Appointment123"), - client = Client.Id("Client123"), - practitioner = Practitioner.Id("Practitioner123"), - practice = Practice.Id("Practice123"), - state = AppointmentState.SCHEDULED, - period = Period.Bounded( - start.toLocalDateTimeUTC(), - end.toLocalDateTimeUTC() - ) + schedulingMessageBus.handle( + CreateClientCommand( + id = Client.Id("Client123"), + name = HumanName( + family = Name.Family("Last"), + given = Name.Given("First"), + prefix = Name.Prefix(""), + suffix = Name.Suffix(""), + period = Period.Unknown ), - uow - ) + gender = Gender.UNKNOWN, + contactPoints = setOf( + ContactPoint.Phone.Unverified("917-555-5555"), + ContactPoint.Email.Unverified("hello@world.com") + ) + ), + uow + ) - // When - schedulingMessageBus.handle( - CancelAppointmentCommand( - appointment = Appointment.Id("Appointment123") - ), - uow - ) + val start = Instant.now().truncatedTo(ChronoUnit.MICROS) + val end = start.plus(1, ChronoUnit.HOURS) - // Then - with(JooqSchedulingWebViews(it.dsl())) { - findAppointmentOrThrow("Appointment123").shouldBe( - AppointmentRecord( - id = "Appointment123", - clientId = "Client123", - practiceId = "Practice123", - practitionerId = "Practitioner123", - state = "CANCELED", - periodStart = start, - periodEnd = end, - ) + schedulingMessageBus.handle( + CreateAppointmentCommand( + id = Appointment.Id("Appointment123"), + client = Client.Id("Client123"), + practitioner = Practitioner.Id("Practitioner123"), + practice = Practice.Id("Practice123"), + state = AppointmentState.SCHEDULED, + period = Period.Bounded( + start.toLocalDateTimeUTC(), + end.toLocalDateTimeUTC() ) - } + ), + uow + ) + + // When + schedulingMessageBus.handle( + CancelAppointmentCommand( + appointment = Appointment.Id("Appointment123") + ), + uow + ) + + // Then + with(JooqSchedulingWebViews(it.dsl())) { + findAppointmentOrThrow("Appointment123").shouldBe( + AppointmentRecord( + id = "Appointment123", + clientId = "Client123", + practiceId = "Practice123", + practitionerId = "Practitioner123", + state = "CANCELED", + periodStart = start, + periodEnd = end, + ) + ) } } } diff --git a/acme-web/acme-web-api/src/test/kotlin/com/acme/web/api/scheduling/database.kt b/acme-web/acme-web-api/src/test/kotlin/com/acme/web/api/scheduling/database.kt new file mode 100644 index 0000000..ce53db1 --- /dev/null +++ b/acme-web/acme-web-api/src/test/kotlin/com/acme/web/api/scheduling/database.kt @@ -0,0 +1,50 @@ +package com.acme.web.api.scheduling + +import com.acme.liquibase.update +import com.zaxxer.hikari.HikariConfig +import com.zaxxer.hikari.HikariDataSource +import io.r2dbc.spi.ConnectionFactories +import org.jooq.DSLContext +import org.jooq.exception.DataAccessException +import org.jooq.impl.DSL +import org.jooq.kotlin.coroutines.transactionCoroutine +import org.testcontainers.containers.PostgreSQLContainer +import org.testcontainers.containers.PostgreSQLR2DBCDatabaseContainer + +class TestDatabase { + private val container = PostgreSQLContainer("postgres:15.5").apply { + start() + } + + val dsl = DSL.using( + ConnectionFactories.get( + PostgreSQLR2DBCDatabaseContainer.getOptions(container) + ) + ) + + init { + val ds = HikariDataSource( + HikariConfig().apply { + jdbcUrl = container.getJdbcUrl() + username = container.username + password = container.password + isAutoCommit = false + } + ) + update(ds.connection) + ds.close() + } +} + +val database = TestDatabase() + +suspend fun testTransaction(block: suspend (dsl: DSLContext) -> Unit) = try { + database.dsl.transactionCoroutine { + block(it.dsl()) + throw TestTransactionException() + } +} catch (_: TestTransactionException) { + // Do nothing and let Jooq rollback +} + +class TestTransactionException : DataAccessException("Rollback caused by test transaction") diff --git a/acme-web/acme-web-api/src/test/kotlin/com/acme/web/api/scheduling/etc.kt b/acme-web/acme-web-api/src/test/kotlin/com/acme/web/api/scheduling/etc.kt index 5d0180a..bc1f7bd 100644 --- a/acme-web/acme-web-api/src/test/kotlin/com/acme/web/api/scheduling/etc.kt +++ b/acme-web/acme-web-api/src/test/kotlin/com/acme/web/api/scheduling/etc.kt @@ -1,58 +1,10 @@ package com.acme.web.api.scheduling import com.acme.web.api.core.toJsonPointer -import com.zaxxer.hikari.HikariConfig -import com.zaxxer.hikari.HikariDataSource -import io.github.oshai.kotlinlogging.KotlinLogging import io.kotest.core.Tag import jakarta.validation.ConstraintViolation import jakarta.validation.Validation import jakarta.validation.Validator -import org.jooq.Configuration -import org.jooq.SQLDialect -import org.jooq.exception.DataAccessException -import org.jooq.impl.DataSourceConnectionProvider -import org.jooq.impl.DefaultConfiguration -import org.jooq.impl.DefaultTransactionProvider - -internal val logger = KotlinLogging.logger {} - -const val DB_URL = - "jdbc:tc:postgresql:11.5:///test?TC_INITFUNCTION=com.acme.liquibase.LiquibaseTestContainerInitializerKt::update" - -val ds by lazy { - HikariDataSource( - HikariConfig().apply { - jdbcUrl = DB_URL - username = "test" - password = "test" - isAutoCommit = false - } - ) -} - -val jooq by lazy { - DefaultConfiguration().apply { - set(ds) - set(DefaultTransactionProvider(DataSourceConnectionProvider(ds), true)) - set(SQLDialect.POSTGRES) - } -} - -class TestTransactionException : DataAccessException("Rollback caused by test transaction") - -fun testTransaction(block: (config: Configuration) -> Unit) { - try { - with(jooq.dsl()) { - transaction { config -> - block(config) - throw TestTransactionException() - } - } - } catch (e: TestTransactionException) { - logger.debug { e.message } - } -} object SchedulingTest : Tag() diff --git a/acme-web/acme-web-app/build.gradle.kts b/acme-web/acme-web-app/build.gradle.kts index e3df0ac..c2f3378 100644 --- a/acme-web/acme-web-app/build.gradle.kts +++ b/acme-web/acme-web-app/build.gradle.kts @@ -24,7 +24,6 @@ dependencies { implementation(libs.am.ik.timeflake.timeflake4j) implementation(libs.ch.qos.logback.logback.classic) implementation(libs.com.michael.bull.kotlin.coroutines.jdbc) - implementation(libs.com.zaxxer.hikariCP) implementation(libs.io.github.oshai.kotlin.logging.jvm) implementation(libs.io.ktor.ktor.client.content.negotiation) implementation(libs.io.ktor.ktor.client.java) diff --git a/acme-web/acme-web-app/src/main/kotlin/com/acme/web/app/config/DataSourceConfiguration.kt b/acme-web/acme-web-app/src/main/kotlin/com/acme/web/app/config/DataSourceConfiguration.kt index e8a6539..6256c58 100644 --- a/acme-web/acme-web-app/src/main/kotlin/com/acme/web/app/config/DataSourceConfiguration.kt +++ b/acme-web/acme-web-app/src/main/kotlin/com/acme/web/app/config/DataSourceConfiguration.kt @@ -3,14 +3,14 @@ package com.acme.web.app.config import io.ktor.server.config.ApplicationConfig data class DataSourceConfiguration( - val jdbcUrl: String, + val r2dbcUrl: String, val username: String, val password: String ) { companion object { fun fromConfig(config: ApplicationConfig) = DataSourceConfiguration( - config.property("jdbcUrl").getString(), + config.property("r2dbcUrl").getString(), config.property("username").getString(), config.property("password").getString() ) diff --git a/acme-web/acme-web-app/src/main/kotlin/com/acme/web/app/factory.kt b/acme-web/acme-web-app/src/main/kotlin/com/acme/web/app/factory.kt index cfdb1c2..8a9ed8c 100644 --- a/acme-web/acme-web-app/src/main/kotlin/com/acme/web/app/factory.kt +++ b/acme-web/acme-web-app/src/main/kotlin/com/acme/web/app/factory.kt @@ -8,52 +8,37 @@ import com.acme.web.app.views.appointments import com.acme.web.app.views.health import com.acme.web.app.views.root import com.acme.web.app.views.staticAssets -import com.zaxxer.hikari.HikariConfig -import com.zaxxer.hikari.HikariDataSource import io.ktor.server.application.Application import io.ktor.server.auth.authenticate import io.ktor.server.routing.routing +import io.r2dbc.spi.ConnectionFactories import io.r2dbc.spi.ConnectionFactory +import io.r2dbc.spi.ConnectionFactoryOptions import kotlinx.serialization.ExperimentalSerializationApi import kotlinx.serialization.json.Json -import org.jooq.SQLDialect -import org.jooq.impl.DataSourceConnectionProvider -import org.jooq.impl.DefaultConfiguration -import org.jooq.impl.DefaultTransactionProvider -import javax.sql.DataSource +import org.jooq.impl.DSL val defaultJson = Json { prettyPrint = true prettyPrintIndent = " " } -fun dataSourceFactory(config: DataSourceConfiguration) = HikariDataSource( - HikariConfig().apply { - jdbcUrl = config.jdbcUrl - username = config.username - password = config.password - isAutoCommit = false - } +fun connectionFactory(config: DataSourceConfiguration): ConnectionFactory = ConnectionFactories.get( + ConnectionFactoryOptions.parse(config.r2dbcUrl).mutate() + .option(ConnectionFactoryOptions.USER, config.username) + .option(ConnectionFactoryOptions.PASSWORD, config.password) + .build() ) -fun jooqConfigFactory(connectionFactory: ConnectionFactory) = DefaultConfiguration().apply { - set(connectionFactory) - set(SQLDialect.POSTGRES) -} - -fun jooqConfigFactory(dataSource: DataSource) = DefaultConfiguration().apply { - set(dataSource) - set(DefaultTransactionProvider(DataSourceConnectionProvider(dataSource), true)) - set(SQLDialect.POSTGRES) -} +fun jooqConfigFactory(connectionFactory: ConnectionFactory) = DSL.using(connectionFactory) -fun dataConfigFactory(config: DataSourceConfiguration) = dataSourceFactory(config).let { +fun dataConfigFactory(config: DataSourceConfiguration) = connectionFactory(config).let { it to jooqConfigFactory(it) } fun Application.main(config: MainConfiguration, json: Json = defaultJson) { common(config, json) - // val (_, jooqConfig) = dataConfigFactory(config.datasource) + val (_, jooq) = dataConfigFactory(config.datasource) routing { authenticate { diff --git a/acme-web/acme-web-app/src/main/resources/application.conf b/acme-web/acme-web-app/src/main/resources/application.conf index 333dab0..590a092 100644 --- a/acme-web/acme-web-app/src/main/resources/application.conf +++ b/acme-web/acme-web-app/src/main/resources/application.conf @@ -9,8 +9,8 @@ ktor { } datasource { - jdbcUrl = "jdbc:postgresql://acme-postgresql:5432/acme" - jdbcUrl = ${?DATASOURCE__JDBC_URL} + r2dbcUrl = "r2dbc:postgresql://acme-postgresql:5432/acme" + r2dbcUrl = ${?DATASOURCE__R2DBC_URL} username = "acme" username = ${?DATASOURCE__USERNAME} password = "password"