-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refactor persistence metadata and revision updates
- Loading branch information
1 parent
db6968b
commit 5fe7e9a
Showing
27 changed files
with
380 additions
and
352 deletions.
There are no files selected for viewing
65 changes: 41 additions & 24 deletions
65
...scheduling/src/main/kotlin/com/acme/scheduling/data/JooqAppointmentAggregateRepository.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,54 +1,71 @@ | ||
package com.acme.scheduling.data | ||
|
||
import com.acme.core.AggregateRepository | ||
import com.acme.core.HasRevision | ||
import com.acme.core.PersistedAggregate | ||
import com.acme.core.PersistenceMetaData | ||
import com.acme.scheduling.Appointment | ||
import com.acme.sql.scheduling.tables.references.APPOINTMENTS | ||
import kotlinx.serialization.decodeFromString | ||
import kotlinx.serialization.encodeToString | ||
import kotlinx.serialization.json.Json | ||
import org.jooq.DSLContext | ||
import org.jooq.JSONB | ||
import org.jooq.impl.asExcluded | ||
import java.time.Clock | ||
import java.time.Instant | ||
import java.time.LocalDateTime | ||
import java.time.ZoneOffset | ||
|
||
class JooqAppointmentAggregateRepository( | ||
private val dsl: DSLContext, | ||
private val clock: Clock = Clock.systemUTC() | ||
) : AggregateRepository<Appointment, Appointment.Id> { | ||
|
||
override fun find(id: Appointment.Id): Appointment? = | ||
override fun find(id: Appointment.Id): PersistedAggregate<Appointment>? = | ||
dsl.selectFrom(APPOINTMENTS) | ||
.where(APPOINTMENTS.ID.eq(id.value)) | ||
.fetchOne { | ||
Json.decodeFromString<Appointment>(it.aggregate!!.data()) | ||
PersistedAggregate( | ||
aggregate = Json.decodeFromString<Appointment>(it.aggregate!!.data()), | ||
metaData = PersistenceMetaData( | ||
createdAt = it.createdAt!!, | ||
updatedAt = it.updatedAt!!, | ||
revision = it.revision!!, | ||
) | ||
) | ||
} | ||
|
||
override fun get(id: Appointment.Id): Appointment = getOrThrow(id) { NoSuchElementException() } | ||
override fun get(id: Appointment.Id): PersistedAggregate<Appointment> = getOrThrow(id) { NoSuchElementException() } | ||
|
||
override fun getOrThrow(id: Appointment.Id, block: () -> Throwable): Appointment = | ||
override fun getOrThrow(id: Appointment.Id, block: () -> Throwable): PersistedAggregate<Appointment> = | ||
find(id) ?: throw block() | ||
|
||
override fun exists(id: Appointment.Id): Boolean = | ||
dsl.fetchExists(APPOINTMENTS, APPOINTMENTS.ID.eq(id.value)) | ||
|
||
override fun save(aggregate: Appointment) { | ||
val now = LocalDateTime.ofInstant(Instant.now(clock), ZoneOffset.UTC) | ||
val json = JSONB.valueOf(Json.encodeToString(aggregate)) | ||
if (HasRevision.aggregateIsNew(aggregate)) { | ||
dsl.insertInto( | ||
APPOINTMENTS, | ||
APPOINTMENTS.ID, | ||
APPOINTMENTS.VERSION_NUMBER, | ||
APPOINTMENTS.AGGREGATE | ||
).values( | ||
aggregate.id.value, | ||
aggregate.revision, | ||
json | ||
).execute() | ||
} else { | ||
dsl.update(APPOINTMENTS) | ||
.set(APPOINTMENTS.AGGREGATE, json) | ||
.set(APPOINTMENTS.VERSION_NUMBER, aggregate.revision) | ||
.where(APPOINTMENTS.ID.eq(aggregate.id.value)) | ||
.and(APPOINTMENTS.VERSION_NUMBER.eq(aggregate.revision - 1)) | ||
.execute() | ||
} | ||
|
||
dsl.insertInto( | ||
APPOINTMENTS, | ||
APPOINTMENTS.ID, | ||
APPOINTMENTS.REVISION, | ||
APPOINTMENTS.AGGREGATE, | ||
APPOINTMENTS.CREATED_AT, | ||
APPOINTMENTS.UPDATED_AT | ||
).values( | ||
aggregate.id.value, | ||
1, | ||
json, | ||
now, | ||
now, | ||
) | ||
.onConflict(APPOINTMENTS.ID) | ||
.doUpdate() | ||
.set(APPOINTMENTS.AGGREGATE, APPOINTMENTS.AGGREGATE.asExcluded()) | ||
.set(APPOINTMENTS.REVISION, APPOINTMENTS.REVISION.add(1)) | ||
.set(APPOINTMENTS.UPDATED_AT, now) | ||
.execute() | ||
} | ||
} |
65 changes: 40 additions & 25 deletions
65
...data-scheduling/src/main/kotlin/com/acme/scheduling/data/JooqClientAggregateRepository.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,54 +1,69 @@ | ||
package com.acme.scheduling.data | ||
|
||
import com.acme.core.AggregateRepository | ||
import com.acme.core.HasRevision | ||
import com.acme.core.PersistedAggregate | ||
import com.acme.core.PersistenceMetaData | ||
import com.acme.scheduling.Client | ||
import com.acme.sql.scheduling.tables.references.CLIENTS | ||
import kotlinx.serialization.decodeFromString | ||
import kotlinx.serialization.encodeToString | ||
import kotlinx.serialization.json.Json | ||
import org.jooq.DSLContext | ||
import org.jooq.JSONB | ||
import org.jooq.impl.asExcluded | ||
import java.time.Clock | ||
import java.time.LocalDateTime | ||
|
||
class JooqClientAggregateRepository( | ||
private val dsl: DSLContext | ||
private val dsl: DSLContext, | ||
private val clock: Clock = Clock.systemUTC() | ||
) : AggregateRepository<Client, Client.Id> { | ||
|
||
override fun find(id: Client.Id): Client? = | ||
override fun find(id: Client.Id): PersistedAggregate<Client>? = | ||
dsl.selectFrom(CLIENTS) | ||
.where(CLIENTS.ID.eq(id.value)) | ||
.fetchOne { | ||
Json.decodeFromString<Client>(it.aggregate!!.data()) | ||
PersistedAggregate( | ||
aggregate = Json.decodeFromString<Client>(it.aggregate!!.data()), | ||
metaData = PersistenceMetaData( | ||
createdAt = it.createdAt!!, | ||
updatedAt = it.updatedAt!!, | ||
revision = it.revision!!, | ||
) | ||
) | ||
} | ||
|
||
override fun get(id: Client.Id): Client = getOrThrow(id) { NoSuchElementException() } | ||
override fun get(id: Client.Id): PersistedAggregate<Client> = getOrThrow(id) { NoSuchElementException() } | ||
|
||
override fun getOrThrow(id: Client.Id, block: () -> Throwable): Client = | ||
override fun getOrThrow(id: Client.Id, block: () -> Throwable): PersistedAggregate<Client> = | ||
find(id) ?: throw block() | ||
|
||
override fun exists(id: Client.Id): Boolean = | ||
dsl.fetchExists(CLIENTS, CLIENTS.ID.eq(id.value)) | ||
|
||
override fun save(aggregate: Client) { | ||
val now = LocalDateTime.now(clock) | ||
val json = JSONB.valueOf(Json.encodeToString(aggregate)) | ||
if (HasRevision.aggregateIsNew(aggregate)) { | ||
dsl.insertInto( | ||
CLIENTS, | ||
CLIENTS.ID, | ||
CLIENTS.VERSION_NUMBER, | ||
CLIENTS.AGGREGATE | ||
).values( | ||
aggregate.id.value, | ||
aggregate.revision, | ||
json | ||
).execute() | ||
} else { | ||
dsl.update(CLIENTS) | ||
.set(CLIENTS.AGGREGATE, json) | ||
.set(CLIENTS.VERSION_NUMBER, aggregate.revision) | ||
.where(CLIENTS.ID.eq(aggregate.id.value)) | ||
.and(CLIENTS.VERSION_NUMBER.eq(aggregate.revision - 1)) | ||
.execute() | ||
} | ||
|
||
dsl.insertInto( | ||
CLIENTS, | ||
CLIENTS.ID, | ||
CLIENTS.REVISION, | ||
CLIENTS.AGGREGATE, | ||
CLIENTS.CREATED_AT, | ||
CLIENTS.UPDATED_AT | ||
).values( | ||
aggregate.id.value, | ||
1, | ||
json, | ||
now, | ||
now | ||
) | ||
.onConflict(CLIENTS.ID) | ||
.doUpdate() | ||
.set(CLIENTS.AGGREGATE, CLIENTS.AGGREGATE.asExcluded()) | ||
.set(CLIENTS.REVISION, CLIENTS.REVISION.add(1)) | ||
.set(CLIENTS.UPDATED_AT, now) | ||
.execute() | ||
} | ||
} |
65 changes: 40 additions & 25 deletions
65
...ta-scheduling/src/main/kotlin/com/acme/scheduling/data/JooqPracticeAggregateRepository.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,54 +1,69 @@ | ||
package com.acme.scheduling.data | ||
|
||
import com.acme.core.AggregateRepository | ||
import com.acme.core.HasRevision | ||
import com.acme.core.PersistedAggregate | ||
import com.acme.core.PersistenceMetaData | ||
import com.acme.scheduling.Practice | ||
import com.acme.sql.scheduling.tables.Practices.Companion.PRACTICES | ||
import kotlinx.serialization.decodeFromString | ||
import kotlinx.serialization.encodeToString | ||
import kotlinx.serialization.json.Json | ||
import org.jooq.DSLContext | ||
import org.jooq.JSONB | ||
import org.jooq.impl.asExcluded | ||
import java.time.Clock | ||
import java.time.LocalDateTime | ||
|
||
class JooqPracticeAggregateRepository( | ||
private val dsl: DSLContext | ||
private val dsl: DSLContext, | ||
private val clock: Clock = Clock.systemUTC() | ||
) : AggregateRepository<Practice, Practice.Id> { | ||
|
||
override fun find(id: Practice.Id): Practice? = | ||
override fun find(id: Practice.Id): PersistedAggregate<Practice>? = | ||
dsl.selectFrom(PRACTICES) | ||
.where(PRACTICES.ID.eq(id.value)) | ||
.fetchOne { | ||
Json.decodeFromString<Practice>(it.aggregate!!.data()) | ||
PersistedAggregate( | ||
aggregate = Json.decodeFromString<Practice>(it.aggregate!!.data()), | ||
metaData = PersistenceMetaData( | ||
createdAt = it.createdAt!!, | ||
updatedAt = it.updatedAt!!, | ||
revision = it.revision!!, | ||
) | ||
) | ||
} | ||
|
||
override fun get(id: Practice.Id): Practice = getOrThrow(id) { NoSuchElementException() } | ||
override fun get(id: Practice.Id): PersistedAggregate<Practice> = getOrThrow(id) { NoSuchElementException() } | ||
|
||
override fun getOrThrow(id: Practice.Id, block: () -> Throwable): Practice = | ||
override fun getOrThrow(id: Practice.Id, block: () -> Throwable): PersistedAggregate<Practice> = | ||
find(id) ?: throw block() | ||
|
||
override fun exists(id: Practice.Id): Boolean = | ||
dsl.fetchExists(PRACTICES, PRACTICES.ID.eq(id.value)) | ||
|
||
override fun save(aggregate: Practice) { | ||
val now = LocalDateTime.now(clock) | ||
val json = JSONB.valueOf(Json.encodeToString(aggregate)) | ||
if (HasRevision.aggregateIsNew(aggregate)) { | ||
dsl.insertInto( | ||
PRACTICES, | ||
PRACTICES.ID, | ||
PRACTICES.VERSION_NUMBER, | ||
PRACTICES.AGGREGATE, | ||
).values( | ||
aggregate.id.value, | ||
aggregate.revision, | ||
json | ||
).execute() | ||
} else { | ||
dsl.update(PRACTICES) | ||
.set(PRACTICES.AGGREGATE, json) | ||
.set(PRACTICES.VERSION_NUMBER, aggregate.revision) | ||
.where(PRACTICES.ID.eq(aggregate.id.value)) | ||
.and(PRACTICES.VERSION_NUMBER.eq(aggregate.revision - 1)) | ||
.execute() | ||
} | ||
|
||
dsl.insertInto( | ||
PRACTICES, | ||
PRACTICES.ID, | ||
PRACTICES.REVISION, | ||
PRACTICES.AGGREGATE, | ||
PRACTICES.CREATED_AT, | ||
PRACTICES.UPDATED_AT, | ||
).values( | ||
aggregate.id.value, | ||
1, | ||
json, | ||
now, | ||
now | ||
) | ||
.onConflict(PRACTICES.ID) | ||
.doUpdate() | ||
.set(PRACTICES.AGGREGATE, PRACTICES.AGGREGATE.asExcluded()) | ||
.set(PRACTICES.REVISION, PRACTICES.REVISION.add(1)) | ||
.set(PRACTICES.UPDATED_AT, now) | ||
.execute() | ||
} | ||
} |
65 changes: 40 additions & 25 deletions
65
...cheduling/src/main/kotlin/com/acme/scheduling/data/JooqPractitionerAggregateRepository.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,54 +1,69 @@ | ||
package com.acme.scheduling.data | ||
|
||
import com.acme.core.AggregateRepository | ||
import com.acme.core.HasRevision | ||
import com.acme.core.PersistedAggregate | ||
import com.acme.core.PersistenceMetaData | ||
import com.acme.scheduling.Practitioner | ||
import com.acme.sql.scheduling.tables.Practitioners.Companion.PRACTITIONERS | ||
import kotlinx.serialization.decodeFromString | ||
import kotlinx.serialization.encodeToString | ||
import kotlinx.serialization.json.Json | ||
import org.jooq.DSLContext | ||
import org.jooq.JSONB | ||
import org.jooq.impl.asExcluded | ||
import java.time.Clock | ||
import java.time.LocalDateTime | ||
|
||
class JooqPractitionerAggregateRepository( | ||
private val dsl: DSLContext | ||
private val dsl: DSLContext, | ||
private val clock: Clock = Clock.systemUTC() | ||
) : AggregateRepository<Practitioner, Practitioner.Id> { | ||
|
||
override fun find(id: Practitioner.Id): Practitioner? = | ||
override fun find(id: Practitioner.Id): PersistedAggregate<Practitioner>? = | ||
dsl.selectFrom(PRACTITIONERS) | ||
.where(PRACTITIONERS.ID.eq(id.value)) | ||
.fetchOne { | ||
Json.decodeFromString<Practitioner>(it.aggregate!!.data()) | ||
PersistedAggregate( | ||
aggregate = Json.decodeFromString<Practitioner>(it.aggregate!!.data()), | ||
metaData = PersistenceMetaData( | ||
createdAt = it.createdAt!!, | ||
updatedAt = it.updatedAt!!, | ||
revision = it.revision!!, | ||
) | ||
) | ||
} | ||
|
||
override fun get(id: Practitioner.Id): Practitioner = getOrThrow(id) { NoSuchElementException() } | ||
override fun get(id: Practitioner.Id): PersistedAggregate<Practitioner> = getOrThrow(id) { NoSuchElementException() } | ||
|
||
override fun getOrThrow(id: Practitioner.Id, block: () -> Throwable): Practitioner = | ||
override fun getOrThrow(id: Practitioner.Id, block: () -> Throwable): PersistedAggregate<Practitioner> = | ||
find(id) ?: throw block() | ||
|
||
override fun exists(id: Practitioner.Id): Boolean = | ||
dsl.fetchExists(PRACTITIONERS, PRACTITIONERS.ID.eq(id.value)) | ||
|
||
override fun save(aggregate: Practitioner) { | ||
val now = LocalDateTime.now(clock) | ||
val json = JSONB.valueOf(Json.encodeToString(aggregate)) | ||
if (HasRevision.aggregateIsNew(aggregate)) { | ||
dsl.insertInto( | ||
PRACTITIONERS, | ||
PRACTITIONERS.ID, | ||
PRACTITIONERS.VERSION_NUMBER, | ||
PRACTITIONERS.AGGREGATE | ||
).values( | ||
aggregate.id.value, | ||
aggregate.revision, | ||
json | ||
).execute() | ||
} else { | ||
dsl.update(PRACTITIONERS) | ||
.set(PRACTITIONERS.AGGREGATE, json) | ||
.set(PRACTITIONERS.VERSION_NUMBER, aggregate.revision) | ||
.where(PRACTITIONERS.ID.eq(aggregate.id.value)) | ||
.and(PRACTITIONERS.VERSION_NUMBER.eq(aggregate.revision - 1)) | ||
.execute() | ||
} | ||
|
||
dsl.insertInto( | ||
PRACTITIONERS, | ||
PRACTITIONERS.ID, | ||
PRACTITIONERS.REVISION, | ||
PRACTITIONERS.AGGREGATE, | ||
PRACTITIONERS.CREATED_AT, | ||
PRACTITIONERS.UPDATED_AT | ||
).values( | ||
aggregate.id.value, | ||
1, | ||
json, | ||
now, | ||
now | ||
) | ||
.onConflict(PRACTITIONERS.ID) | ||
.doUpdate() | ||
.set(PRACTITIONERS.AGGREGATE, PRACTITIONERS.AGGREGATE.asExcluded()) | ||
.set(PRACTITIONERS.REVISION, PRACTITIONERS.REVISION.add(1)) | ||
.set(PRACTITIONERS.UPDATED_AT, now) | ||
.execute() | ||
} | ||
} |
Oops, something went wrong.