diff --git a/README.md b/README.md
index ff61a4a7..ba00059b 100644
--- a/README.md
+++ b/README.md
@@ -26,6 +26,7 @@ Tutorial, practical samples and other resources about Event Sourcing in JVM. See
   - [Prerequisites](#prerequisites)
   - [Tools used](#tools-used)
   - [Event Versioning](#event-versioning)
+  - [Uniqueness](#uniqueness)
   - [Articles](#articles)
 
 ## Event Sourcing
@@ -281,7 +282,15 @@
 Shows how to handle basic event schema versioning scenarios using event and stream transformations. Including:
 - [Events Transformations](./samples/events-versioning/#events-transformations)
 - [Stream Transformation](./samples/events-versioning/#stream-transformation)
 - [Summary](./samples/events-versioning/#summary)
-- 📝 [Simple patterns for events schema versioning](https://event-driven.io/en/simple_events_versioning_patterns/?utm_source=event_sourcing_jvm_)
+- 📝 [Simple patterns for events schema versioning](https://event-driven.io/en/simple_events_versioning_patterns/?utm_source=event_sourcing_jvm)
+
+### [Uniqueness](./samples/uniqueness/)
+Shows how to handle unique constraint checks in an event-sourced system. Explains various techniques like:
+- talking to the business,
+- stream id design,
+- reservation pattern.
+
+Read more in [How to ensure uniqueness in Event Sourcing](https://event-driven.io/en/uniqueness-in-event-sourcing/?utm_source=event_sourcing_jvm).
 
 ## Articles
diff --git a/samples/uniqueness/README.md b/samples/uniqueness/README.md
index 02785118..c76a97e1 100644
--- a/samples/uniqueness/README.md
+++ b/samples/uniqueness/README.md
@@ -1,563 +1,219 @@
-# Event Schema Versioning
-- [Event Schema Versioning](#event-schema-versioning)
-  - [Simple mapping](#simple-mapping)
-    - [New not required property](#new-not-required-property)
-    - [New required property](#new-required-property)
-    - [Renamed property](#renamed-property)
-  - [Upcasting](#upcasting)
-    - [Changed Structure](#changed-structure)
-    - [New required property](#new-required-property-1)
-  - [Downcasters](#downcasters)
-  - [Events Transformations](#events-transformations)
-  - [Stream Transformation](#stream-transformation)
-  - [Migrations](#migrations)
-  - [Summary](#summary)
+# Uniqueness
+- [Uniqueness](#uniqueness)
+  - [Enforcing unique constraint by stream id](#enforcing-unique-constraint-by-stream-id)
+  - [Revoking unique value by stream id](#revoking-unique-value-by-stream-id)
+  - [Reservation pattern](#reservation-pattern)
 
-As time flow, the events' definition may change. Our business is changing, and we need to add more information. Sometimes we have to fix a bug or modify the definition for a better developer experience.
+The uniqueness constraint is one of the most common requests but also (surprisingly) one of the most challenging. Typically it means that the business is bringing us a potential solution to their problem instead of explaining the root issue. We should always ask enough whys before diving into a technical solution. Read more in my article [Bring me problems, not solutions!](https://event-driven.io/en/uniqueness-in-event-sourcing/?utm_source=event_sourcing_jvm).
 
-Migrations are never easy, even in relational databases. You always have to think on:
-- what caused the change?
-- what are the possible solutions?
-- is the change breaking?
-- what to do with old data?
+Moreover, a strict uniqueness constraint is a rare thing in the real world. Typically, duplicates may appear, and then we do compensating operations or merge them. If that happens rarely, implementing such compensation may be much easier than a sophisticated technical solution that guarantees uniqueness.
-We should always try to perform the change in a non-breaking manner. I explained that in [Let's take care of ourselves! Thoughts on compatibility](https://event-driven.io/en/lets_take_care_of_ourselves_thoughts_about_comptibility/) article. +Yet, sometimes we need to cut discussions and just do the work. Some time ago, I wrote an article [How to ensure uniqueness in Event Sourcing](https://event-driven.io/en/uniqueness-in-event-sourcing/?utm_source=event_sourcing_jvm). It explains all the most popular techniques for dealing with this case. I suggest reading it before going to code. These samples are an appendix showing how to do it practically. -The same "issues" happens for event data model. Greg Young wrote a book about it. You can read it for free: https://leanpub.com/esversioning/read. I recommend you to read it. +Have a look also at the article [Tell, don't ask! Or, how to keep an eye on boiling milk](https://event-driven.io/en/tell_dont_ask_how_to_keep_an_eye_on_boiling_milk/?utm_source=event_sourcing_jvm). I explained why querying for the read model is never a bulletproof solution; it can be, at best, good enough. -This sample shows how to do basic Event Schema versioning. Those patterns can be applied to any event store. +## Enforcing unique constraint by stream id -You can watch the webinar on YouTube where I'm explaining the details of the implementation: +Event stores are types of [key-value stores](https://event-driven.io/pl/key-value-stores/?utm_source=event_sourcing_jvm). They guarantee the uniqueness of the key. We can use that to guarantee the uniqueness of our data. We can add unique data as part of the stream id. It works well for use cases where unique fields do not change once they're set up. If they change, we'd need to create a new stream, deleting the old one. For instance, we'd like to enforce only a single shopping cart for the client. We could do that by having the following stream id pattern: -Never Lose Data Again - Event Sourcing to the Rescue! - -or read blog article [Simple patterns for events schema versioning](https://event-driven.io/en/simple_events_versioning_patterns/). - -## Simple mapping - -There are some simple mappings that we could handle on the code structure or serialisation level. I'm using `Jackson` in samples, other serialisers may be smarter, but the patterns will be similar. - -### New not required property - -Having event defined as such: - -```java -public record ShoppingCartOpened( - UUID shoppingCartId, - UUID clientId - ) { } ``` - -If we'd like to add a new not required property, e.g. `intializedAt`, we can add it just as a new nullable property. The essential fact to decide if that's the right strategy is if we're good with not having it defined. It can be handled as: - -```java -public record ShoppingCartOpened( - UUID shoppingCartId, - UUID clientId, - // Adding new not required property - OffsetDateTime initializedAt - ) { } +var shoppingCartStreamId = "shopping_cart-%s".formatted(clientId); ``` -Then, most serialisers will put the null value by default and not fail unless we use strict mode. The new events will contain whole information, for the old ones we'll have to live with that. - -See full sample: [NewNotRequiredPropertyTests.java](./src/test/java/io/eventdriven/uniqueness/simplemappings/NewNotRequiredPropertyTests.java). - - -### New required property - -We must define a default value if we'd like to add a new required property and make it non-breaking. It's the same as you'd add a new column to the relational table. 
- -For instance, we decide that we'd like to add a validation step when the shopping cart is open (e.g. for fraud or spam detection), and our shopping cart can be opened with a pending state. We could solve that by adding the new property with the status information and setting it to `Opened`, assuming that all old events were appended using the older logic. +Then we can tell the event store that we expect the stream not to exist. For example, implementation using EventStoreDB will look as follows: ```java -public enum ShoppingCartStatus { - Pending, - Opened, - Confirmed, - Cancelled -} +var clientId = UUID.randomUUID(); +// We're assuming that there can be only a single shopping cart open for specific client. +// We can enforce uniqueness by putting client id into a stream id +var shoppingCartStreamId = "shopping_cart-%s".formatted(clientId); +var shoppingCartOpened = new ShoppingCartOpened(clientId, clientId); -public record ShoppingCartOpened( - UUID shoppingCartId, - UUID clientId, - // Adding new not required property as nullable - ShoppingCartStatus status -) { - public ShoppingCartOpened { - if (status == null) { - status = ShoppingCartStatus.Opened; - } - } -} -``` - -See full sample: [NewRequiredProperty.cs](./src/test/java/io/eventdriven/uniqueness/simplemappings/NewRequiredPropertyTests.java). - -### Renamed property - -Renaming property is also a breaking change. Still, we can do it in a non-breaking manner. We could keep the same name in the JSON but map it during (de) serialisation. +// This one should succeed as we don't have such stream yet +eventStore.appendToStream( + shoppingCartStreamId, + AppendToStreamOptions.get().expectedRevision(ExpectedRevision.NO_STREAM), + EventSerializer.serialize(shoppingCartOpened) +).get(); -Let's assume that we concluded that keeping `ShoppingCart` prefix in the `shoppingCartId` is redundant and decided to change it to `cartId`, as we see in the event name, what cart we have in mind. - -We could do it as: - -```java -record ShoppingCartOpened( - @JsonProperty("shoppingCartId") UUID cartId, - UUID clientId -) implements ShoppingCartEvent { +// This one will fail, as we're expecting that stream doesn't exist +try { + eventStore.appendToStream( + shoppingCartStreamId, + AppendToStreamOptions.get().expectedRevision(ExpectedRevision.NO_STREAM), + EventSerializer.serialize(shoppingCartOpened) + ).get(); +} catch (ExecutionException exception) { + assertInstanceOf(WrongExpectedVersionException.class, exception.getCause()); } ``` -The benefit is that both old and the new structure will be backward and forward compatible. The downside of this solution is that we're still keeping the old JSON structure, so all consumers need to be aware of that and do mapping if they want to use the new structure. - -See full sample: [NewRequiredProperty.cs](./src/test/java/io/eventdriven/uniqueness/simplemappings/NewRequiredPropertyTests.java). +See more in [ShoppingCartTests](./src/test/java/io/eventdriven/uniqueness/shoppingcarts/ShoppingCartTests.java). -## Upcasting - -Sometimes we want to make more significant changes or be more flexible in the event mapping. We'd like to use a new structure in our code, not polluted by the custom mappings. - -We can use an upcasting pattern for that. We can plug a middleware between the deserialisation and application logic. Having that, we can either grab raw JSON or deserialised object of the old structure and transform it to the new schema. 
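+
+In application code, we'd rather not surface the raw `WrongExpectedVersionException` to callers. Below is a minimal sketch (not part of the sample) of translating it into a domain-level answer; the `tryOpenShoppingCart` name is illustrative, and the append call is the same one used above, assuming `eventStore` is the EventStoreDB client:
+
+```java
+boolean tryOpenShoppingCart(EventStoreDBClient eventStore, UUID clientId)
+  throws ExecutionException, InterruptedException {
+  // single cart per client: the client id is part of the stream id
+  var shoppingCartStreamId = "shopping_cart-%s".formatted(clientId);
+  var shoppingCartOpened = new ShoppingCartOpened(clientId, clientId);
+
+  try {
+    eventStore.appendToStream(
+      shoppingCartStreamId,
+      AppendToStreamOptions.get().expectedRevision(ExpectedRevision.NO_STREAM),
+      EventSerializer.serialize(shoppingCartOpened)
+    ).get();
+    return true;
+  } catch (ExecutionException exception) {
+    if (exception.getCause() instanceof WrongExpectedVersionException)
+      // someone already opened a cart for this client; report it as a business outcome
+      return false;
+    throw exception;
+  }
+}
+```
+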
-### Changed Structure
-
-For instance, we decide to send also other information about the client, instead of just their id. We'd like to have a nested object instead of the flattened list of fields. We could model new event structure as:
+We could also use stream id to enforce the uniqueness of multiple keys. For instance, cinema ticket reservations should be unique for the specific screening and seat id. We could either create a composite stream id or use some [decent hash](./src/main/java/io/eventdriven/uniqueness/core/resourcereservation/Hash.java). We can combine all the values into a string and hash them, e.g.:
 
 ```java
-public record Client(
-  UUID id,
-  String name
-) {
-  public Client {
-    if (name == null) {
-      name = "Unknown";
-    }
-  }
-}
-
-public record ShoppingCartOpened(
-  UUID shoppingCartId,
-  Client client
-) { }
+var seatReservationId = "cinema_ticket-%s".formatted(
+  Hash.hash("%s_%s".formatted(screeningId, seatId)).toString()
+);
 ```
+See more in [CinemaTicketTests](./src/test/java/io/eventdriven/uniqueness/cinematickets/CinemaTicketTests.java).
 
-We can define upcaster as a function that'll later plug in the deserialisation process.
-
-We can define the transformation of the object of the old structure as:
+This technique could also be useful for GDPR data, like user email:
 
 ```java
-public static ShoppingCartOpened upcast(
-  ShoppingCartEvent.ShoppingCartOpened oldEvent
-  ) {
-  return new ShoppingCartOpened(
-    oldEvent.shoppingCartId(),
-    new Client(oldEvent.clientId(), null)
-  );
-}
+var userId = Hash.hash(email).toString();
 ```
 
-Or we can map it from JSON
+## Revoking unique value by stream id
 
-```java
-public static ShoppingCartOpened upcast(
-  byte[] oldEventJson
-) {
-  var oldEvent = Serializer.deserialize(oldEventJson);
-
-  return new ShoppingCartOpened(
-    UUID.fromString(oldEvent.at("/shoppingCartId").asText()),
-    new Client(
-      UUID.fromString(oldEvent.at("/clientId").asText()),
-      null
-    )
-  );
-}
-```
+Sometimes we need to revoke the unique value, e.g. when someone cancels a seat reservation. In EventStoreDB, we can use soft delete to mark the stream events as _to-be-deleted_ while still allowing the stream to be registered again. That lets us reuse the reservation stream for potentially multiple tickets. If we add new events to a soft-deleted stream, and the old events were not [scavenged](https://developers.eventstore.com/server/v21.10/operations.html#scavenging-events) yet, then they will reappear in the stream. That's why we should add a _tombstone event_ that marks where the previous reservation finished. We could subscribe to the stream and delete it once such an event is appended.
 
-See full sample: [ChangedStructure.cs](./src/test/java/io/eventdriven/uniqueness/upcasters/ChangedStructureTests.java).
+See more in [CinemaTicketTests](./src/test/java/io/eventdriven/uniqueness/cinematickets/CinemaTicketTests.java).
 
-### New required property
+## Reservation pattern
 
-We can also solve the same cases as simple mappings, but we have more handling options.
+For more advanced scenarios, the Reservation pattern comes to the rescue. When performing a business operation, we first request a resource reservation, e.g. for a unique email value. The reservation should be durable and respected by concurrent operations. Typically, it's recorded in some durable storage. For instance, for a key/value store like Redis, we may use the unique resource id (e.g. the user email) as a key.
 
-Let's say that we forget to add information about who initialised the shopping cart (user id).
We cannot retroactively guess what the user was, but if we were lucky enough to track such information in user metadata (e.g. for tracing), we could try to map it. +Most importantly, this storage should allow us to claim the resource with a unique constraint. The reservation can be synchronous or asynchronous (e.g. when it requires more business logic than just adding an entry in some database). We can continue our main business logic only after confirmation that the reservation was successful. -```java -public record EventMetadata( - UUID userId -) { } - -public record ShoppingCartOpened( - UUID shoppingCartId, - UUID clientId, - UUID initializedBy -) { } -``` +With a reserved resource (e.g. user email), we can run the rest of the business logic and store the results in our main data storage. -Upcaster from old object to the new one can look like: +Implementation of the reservation pattern in EventStoreDB could look like this: ```java -public static ShoppingCartOpened upcast( - ShoppingCartEvent.ShoppingCartOpened oldEvent, - EventMetadata eventMetadata -) { - return new ShoppingCartOpened( - oldEvent.shoppingCartId(), - oldEvent.clientId(), - eventMetadata.userId() - ); -} -``` - -From JSON to the object: +public class ESDBResourceReservationHandler implements ResourceReservationHandler { + private static final Logger logger = LoggerFactory.getLogger(ESDBResourceReservationHandler.class); + private final Duration reservationLockDuration; + private final EventStore eventStore; + private final RetryPolicy retryPolicy; -```java -public static ShoppingCartOpened upcast( - byte[] oldEventJson, - byte[] eventMetadataJson + public ESDBResourceReservationHandler( + Duration reservationLockDuration, + RetryPolicy retryPolicy, + EventStore eventStore ) { - var oldEvent = Serializer.deserialize(oldEventJson); - var eventMetadata = Serializer.deserialize(eventMetadataJson); - - return new ShoppingCartOpened( - UUID.fromString(oldEvent.at("/shoppingCartId").asText()), - UUID.fromString(oldEvent.at("/clientId").asText()), - UUID.fromString(eventMetadata.at("/userId").asText()) - ); -} -``` - -See full sample: [NewRequiredPropertyFromMetadata.cs](./src/test/java/io/eventdriven/uniqueness/upcasters/NewRequiredPropertyFromMetadataTests.java). - -## Downcasters - -In the same way, as described above, we can downcast the events from the new structure to the old one (if we have the old reader/listener or, for some reason, want to keep the old format). 
+ this.reservationLockDuration = reservationLockDuration; + this.eventStore = eventStore; + this.retryPolicy = retryPolicy; + } -From the new object to the old one: + @Override + public Boolean reserve(String resourceKey, HandlerWithAck onReserved) { + try { + final var reservationStreamId = streamName(resourceKey); -```java -public static ShoppingCartEvent.ShoppingCartOpened downcast( - ShoppingCartOpened newEvent -) { - return new ShoppingCartEvent.ShoppingCartOpened( - newEvent.shoppingCartId(), - newEvent.client().id() - ); -} -``` + var initiationResult = initiateReservation( + resourceKey, + reservationStreamId, + reservationLockDuration + ); -From new JSON format to the old object: + if (!(initiationResult instanceof EventStore.AppendResult.Success success)) { + logger.error("Failed to reserve '%s'".formatted(reservationStreamId)); + return false; + } -```java -public static ShoppingCartEvent.ShoppingCartOpened downcast( - byte[] newEventJson -) { - var newEvent = Serializer.deserialize(newEventJson); - - return new ShoppingCartEvent.ShoppingCartOpened( - UUID.fromString(newEvent.at("/shoppingCartId").asText()), - UUID.fromString(newEvent.at("/client/id").asText()) - ); -} -``` -See full sample: [ChangedStructure.cs](./src/test/java/io/eventdriven/uniqueness/upcasters/ChangedStructureTests.java). + var succeeded = run(onReserved).orElse(false); -## Events Transformations + if (!succeeded) { + markReservationAsReleased(resourceKey, reservationStreamId, success.nextExpectedRevision()); + return false; + } -At this point, you may be wondering, "That's nice, but how to connect that with real code?". Let's dive into that. + var confirmationReservation = confirmReservation( + resourceKey, + reservationStreamId, + success.nextExpectedRevision() + ); -We'll be plugging between the serialisation and application logic as explained initially. We'll define the class that will contain and process all defined transformations. - -```java -public class EventTransformations { - private final Map> jsonTransformations = new HashMap<>(); - - public Optional tryTransform(String eventTypeName, byte[] json) { - if (!jsonTransformations.containsKey(eventTypeName)) { - return Optional.empty(); + return confirmationReservation instanceof EventStore.AppendResult.Success; + } catch (Throwable e) { + logger.error("Error while reserving resource"); + return false; } - - var transformJson = jsonTransformations.get(eventTypeName); - - return Optional.of(transformJson.apply(json)); } - public EventTransformations register( - String eventTypeName, - Function transformJson - ) { - jsonTransformations.put( - eventTypeName, - json -> transformJson.apply(Serializer.deserialize(json)) - ); - return this; - } + @Override + public void release(String resourceKey) { + var result = eventStore.deleteStream(streamName(resourceKey)); - public EventTransformations register( - Class oldEventClass, - String eventTypeName, - Function transformEvent - ) { - jsonTransformations.put( - eventTypeName, - json -> transformEvent.apply(Serializer.deserialize(oldEventClass, json) - .orElseThrow(() -> new RuntimeException("Error deserializing"))) - ); - return this; - } -} -``` - -We have two `register` methods. Both of them has JSON and handler function as params. One is used to register the `JsonNode` raw transformation, the other to register an object to object one. 
Sample registrations: - -```java -var transformations = new EventTransformations() - .register(eventTypeV1Name, MultipleTransformationsWithDifferentEventTypesTests::upcastV1) - .register(ShoppingCartOpened.class, eventTypeV2Name, MultipleTransformationsWithDifferentEventTypesTests::upcastV2); -``` - -We also have `tryTransform` that either transforms JSON into the new object structure or returns `null`. We'll use it further on. - -Let's also define the type mapping class responsible for mapping event type name into the CLR type. - -```java -public class EventTypeMapping { - private final Map mappings = new HashMap<>(); - - public EventTypeMapping register(Class eventClass, String... typeNames) { - for (var typeName : typeNames) { - mappings.put(typeName, eventClass); + if (result instanceof EventStore.DeleteResult.UnexpectedFailure) { + throw new RuntimeException("Error while deleting stream: %s".formatted(result)); } - - return this; } - public Class map(String eventType) { - return mappings.get(eventType); - } -} -``` - -and use it as - -```java -final String eventTypeV1Name = "shopping_cart_opened_v1"; -final String eventTypeV2Name = "shopping_cart_opened_v2"; -final String eventTypeV3Name = "shopping_cart_opened_v3"; - -var mapping = new EventTypeMapping() - .register(ShoppingCartInitializedWithStatus.class, - eventTypeV1Name, - eventTypeV2Name, - eventTypeV3Name - ); -``` - -See details in [EventTypeMapping](./src/test/java/io/eventdriven/uniqueness/transformations/MultipleTransformationsWithDifferentEventTypesTests.java). - -It's the most straightforward wrapper that requires manual mapping for all the event types, but it benefits from being explicit and less error-prone. For the convention-based mapper, there is a risk that refactoring accidentally changes the event type name stored in the event store. Still, a viable option is a mixed solution. See more in [EventTypeMapper in the sample project](../../samples/event-sourcing-esdb-simple/src/main/java/io/eventdriven/ecommerce/core/events/EventTypeMapper.java) + private EventStore.AppendResult initiateReservation(String resourceKey, String reservationStreamId, Duration tentativeLockDuration) { + final var reservationInitiated = new ResourceReservationInitiated( + resourceKey, + OffsetDateTime.now(), + tentativeLockDuration + ); -Having those classes, we can define the final deserialisation logic. + return retryPolicy.run(ack -> { + var result = eventStore.append(reservationStreamId, reservationInitiated); -```java -public record EventSerializer( - EventTypeMapping mapping, - EventTransformations transformations -) { - public Optional deserialize(String eventTypeName, byte[] json) { - return transformations.tryTransform(eventTypeName, json) - .or(() -> Serializer.deserialize(mapping().map(eventTypeName), json)); + if(!(result instanceof EventStore.AppendResult.UnexpectedFailure)) + ack.accept(result); + }); } -} -``` - -The logic is simple. It'll either transform JSON through registered transformations (e.g. upcasters or downcasters) or run the regular deserialisation logic. - -See a full sample in [MultipleTransformationsWithDifferentEventTypes.cs](src/test/java/io/eventdriven/uniqueness/transformations/MultipleTransformationsWithDifferentEventTypesTests.java). - -## Stream Transformation - -You might want not only to transform a single event into another (1:1) but also a set of events into another one (N:M). - -Let's take as an example scenario where we can initialise not only empty shopping cart but also filled with products. 
For some time, we were doing that by publishing multiple events: `ShoppingCartOpened` and `ProductItemAddedToShoppingCart` for each added product item. We decided that we'd like to replace this with event containing list of product items: - -```java -public record ProductItem( - UUID productId, - int quantity -) {} - -public record PricedProductItem( - ProductItem productItem, - double unitPrice -) { } - -public record ShoppingCartOpened( - UUID shoppingCartId, - UUID clientId -) implements ShoppingCartEvent { } - -public record ProductItemAddedToShoppingCart( - UUID shoppingCartId, - PricedProductItem productItem -) implements ShoppingCartEvent { } - -public record ShoppingCartInitializedWithProducts( - UUID shoppingCartId, - UUID clientId, - List productItems -) { } -``` -We want to process our logic using a new event schema. However, that'd require zipping multiple stream events into a single one. We were lucky enough that we decided to store in metadata correlation id. It's an identifier of the command that initiated business logic. All of the events resulting from the command processing will share the same correlation id. - -![stream transformation](./assets/stream-transformation.png) - -Using it, we could decide if `ProductItemAddedToShoppingCart` was a part of the initialisation request or not. - -We need to take the stream events and transform them into another sequence of events. It could be modeled by such class: - -```java -public class StreamTransformations { - private final List, List>> jsonTransformations = new ArrayList<>(); - - public List transform(List events) { - if (jsonTransformations.isEmpty()) - return events; + private EventStore.AppendResult confirmReservation(String resourceKey, String reservationStreamId, StreamRevision expectedRevision) { + final var reservationConfirmed = new ResourceReservationConfirmed( + resourceKey, + OffsetDateTime.now() + ); - var result = events; + return retryPolicy.run(ack -> { + var result = eventStore.append( reservationStreamId, expectedRevision, reservationConfirmed); - for (var transform : jsonTransformations) { - result = transform.apply(result); - } - return result; + if(!(result instanceof EventStore.AppendResult.UnexpectedFailure)) + ack.accept(result); + }); } - public StreamTransformations register(Function, List> transformJson) { - jsonTransformations.add(transformJson); - return this; - } -} -``` - -We allow registering multiple transformations. Thanks to that, we can chain them using the `Aggregate` method, taking the previous transformation's result as a base for the next one. - -To connect it with the deserialisation process, we need to add it to the `EventSerializer` defined in the previous steps. + private void markReservationAsReleased(String resourceKey, String reservationStreamId, StreamRevision expectedRevision) { + // We're marking reservation as to be released instead of deleting stream. + // That's needed as if we'd delete stream here, then we wouldn't get event notification through subscriptions. + // Because of that we wouldn't be able to clear the lookup for timed out reservations. 
+ final var reservationReleased = new ResourceReservationReleaseInitiated( + resourceKey, + OffsetDateTime.now() + ); -```java -public record EventSerializer( - EventTypeMapping mapping, - StreamTransformations streamTransformations, - EventTransformations transformations -) { - public Optional deserialize(String eventTypeName, byte[] json) { - return transformations.tryTransform(eventTypeName, json) - .or(() -> Serializer.deserialize(mapping().map(eventTypeName), json)); - } + retryPolicy.run(ack -> { + var result = eventStore.append( + reservationStreamId, + AppendToStreamOptions.get().expectedRevision(expectedRevision), + EventSerializer.serialize(reservationReleased) + ); - public List> deserialize(List events) { - return streamTransformations.transform(events).stream() - .map(event -> deserialize(event.eventType, event.data)) - .toList(); + if(!(result instanceof EventStore.AppendResult.UnexpectedFailure)) + ack.accept(result); + }); } -} -``` - -We're injecting stream transformations into the deserialisation process. We're performing them first before running upcasters or regular deserialisation. - -We can implement function doing event grouping as described above as: - -```java -public static List flattenInitializedEventsWithProductItemsAdded(List events) { - var cartOpened = events.get(0); - var cartInitializedCorrelationId = - Serializer.deserialize(EventMetadata.class, cartOpened.metaData) - .orElseThrow(() -> new RuntimeException("Error deserializing metadata")) - .correlationId; - - var i = 1; - var productItemsAdded = new ArrayList(); - - while (i < events.size()) { - var eventData = events.get(i); - - if (!eventData.eventType().equals("product_item_added_v1")) - break; - - var correlationId = - Serializer.deserialize(EventMetadata.class, eventData.metaData) - .orElseThrow(() -> new RuntimeException("Error deserializing metadata")) - .correlationId; - if (!correlationId.equals(cartInitializedCorrelationId)) - break; - - productItemsAdded.add(eventData); - i++; + private static String streamName(String resourceKey){ + return "reservation-%s".formatted(resourceKey); } - - var mergedEvent = toShoppingCartInitializedWithProducts( - cartOpened, - productItemsAdded - ); - - return Stream.concat( - Stream.of(mergedEvent), - events.stream().skip(i) - ).toList(); -} - -private static EventData toShoppingCartInitializedWithProducts( - EventData shoppingCartInitialized, - List productItemsAdded -) { - var shoppingCartInitializedJson = - Serializer.deserialize(shoppingCartInitialized.data); - - var newEvent = new ShoppingCartInitializedWithProducts( - UUID.fromString(shoppingCartInitializedJson.at("/shoppingCartId").asText()), - UUID.fromString(shoppingCartInitializedJson.at("/clientId").asText()), - productItemsAdded.stream() - .map(pi -> { - var pricedProductItem = Serializer.deserialize(pi.data); - - return new PricedProductItem( - new ProductItem( - UUID.fromString(pricedProductItem.at("/productItem/productItem/productId").asText()), - pricedProductItem.at("/productItem/productItem/quantity").asInt() - ), - pricedProductItem.at("/productItem/unitPrice").asDouble() - ); - }).toList() - ); - - return new EventData( - "shopping_cart_opened_v2", - Serializer.serialize(newEvent), - shoppingCartInitialized.metaData - ); } ``` -See a full sample in: -- [StreamTransformations.cs](./src/test/java/io/eventdriven/uniqueness/transformations/StreamTransformationsTests.java). 
-- [esdb/StreamTransformations.cs](./src/test/java/io/eventdriven/uniqueness/transformations/esdb/StreamTransformationsTests.java) showing how to apply them in practice into EventStoreDB.
-
-## Migrations
-
-You can say that, well, those patterns are not migrations. Events will stay as they were, and you'll have to keep the old structure forever. That's quite true. Still, this is fine, as typically, you should not change the past. Having precise information, even including bugs, is a valid scenario. It allows you to get insights and see the precise history. However, pragmatically you may sometimes want to have a "clean" event log with only a new schema.
-
-It appears that composing the patterns described above can support such a case. For example, if you're running EventStoreDB, you can read/subscribe to the event stream, store events in the new stream, or even a new EventStoreDB cluster. Having that, you could even rewrite the whole log and switch databases once the new one caught up.
+See more in [ESDBResourceReservationHandler.java](./src/main/java/io/eventdriven/uniqueness/core/resourcereservation/esdb/ESDBResourceReservationHandler.java).
 
-I hope that those samples will show you that you can support many versioning scenarios with basic composition techniques.
+Yet, this is not enough. The logic may fail, or the process may die, and we could end up with partially processed information, primarily since EventStoreDB doesn't support transactions (only atomic appends of multiple events to the same stream). We need to ensure that such failures are compensated. To do that, we can store the current reservation data in external storage (e.g. a relational DB, Redis, etc.). It needs to be held in other storage, as ESDB cannot:
+- set a TTL for a single event (only for all stream events),
+- send scheduled events,
+- do filtered queries.
 
-![migrations](./assets/migration.png)
+To fill the generic reservation read model, we can use the event handler [ResourceReservationEventHandler.java](./src/main/java/io/eventdriven/uniqueness/core/resourcereservation/ResourceReservationEventHandler.java) together with a specific business event handler [UserEmailReservationEventHandler.java](./src/main/java/io/eventdriven/uniqueness/users/reservation/UserEmailReservationEventHandler.java). Those event handlers ensure that we have a read model with information about the reservation.
 
-## Summary
+When we have the read model updated, we can run a CRON job that cleans up timed-out reservations (see the sketch below). The scavenging logic can look as follows: [ResourceReservationScavenging.java](./src/main/java/io/eventdriven/uniqueness/core/resourcereservation/jpa/ResourceReservationScavenging.java).
 
-I hope that those samples will show you that you can support many versioning scenarios with basic composition techniques.
+See also tests showing the example usage:
+- fallback: [UserEmailRegistrationFallbackTests.java](./src/test/java/io/eventdriven/uniqueness/users/UserEmailRegistrationFallbackTests.java)
+- scavenging: [UserEmailRegistrationScavengingTests.java](./src/test/java/io/eventdriven/uniqueness/users/UserEmailRegistrationScavengingTests.java).
 
-Nevertheless, the best approach is to [not need to do versioning at all](https://event-driven.io/en/how_to_do_event_versioning/). If you're facing such a need, before using the strategies described above, make sure that your business scenario cannot be solved by talking to the business. It may appear that's some flaw in the business process modelling. We should not be trying to fix the issue, but the root cause.
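+
+The scavenging job itself can be a scheduled task that queries the read model for reservations whose lock has timed out and releases them. A rough sketch is below (assuming Spring scheduling and a JPA repository; the entity, repository and method names are illustrative, not the sample's actual API; the `deleteStream` call mirrors the handler's `release` method above):
+
+```java
+@Component
+class TimedOutReservationsScavenger {
+  private final ResourceReservationRepository repository; // reservation read model (hypothetical name)
+  private final EventStore eventStore;
+
+  TimedOutReservationsScavenger(ResourceReservationRepository repository, EventStore eventStore) {
+    this.repository = repository;
+    this.eventStore = eventStore;
+  }
+
+  // runs every 5 minutes and drops reservations that were initiated but never confirmed in time
+  @Scheduled(cron = "0 */5 * * * *")
+  void scavengeTimedOutReservations() {
+    var timedOut = repository.findTimedOutReservations(OffsetDateTime.now());
+
+    for (var reservation : timedOut) {
+      // deleting the reservation stream releases the claimed resource key
+      eventStore.deleteStream("reservation-%s".formatted(reservation.getResourceKey()));
+      repository.delete(reservation);
+    }
+  }
+}
+```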
+Still, the reservation pattern is a complicated process to operate and orchestrate. As I mentioned at the beginning, the best approach is to start by understanding the problem we're trying to fix, then either compensate or use the stream id for uniqueness. If that's not enough, use the Reservation pattern. It's also worth evaluating other storage options for coordinating the reservation process, as they may have capabilities (e.g. transactions) that simplify it.
\ No newline at end of file
diff --git a/samples/uniqueness/src/main/java/io/eventdriven/uniqueness/core/resourcereservation/Hash.java b/samples/uniqueness/src/main/java/io/eventdriven/uniqueness/core/resourcereservation/Hash.java
index 2fe72675..58c64347 100644
--- a/samples/uniqueness/src/main/java/io/eventdriven/uniqueness/core/resourcereservation/Hash.java
+++ b/samples/uniqueness/src/main/java/io/eventdriven/uniqueness/core/resourcereservation/Hash.java
@@ -3,7 +3,7 @@ public final class Hash {
   /**
    * Simple hash function for string with additional randomness logic.
-   * Source: https://stackoverflow.com/a/1660613/10966454
+   * Source
    * @param string input string
    * @return hashed long value
    */
diff --git a/samples/uniqueness/src/main/java/io/eventdriven/uniqueness/core/resourcereservation/esdb/ESDBResourceReservationHandler.java b/samples/uniqueness/src/main/java/io/eventdriven/uniqueness/core/resourcereservation/esdb/ESDBResourceReservationHandler.java
index b25e6e4e..5d54b5e3 100644
--- a/samples/uniqueness/src/main/java/io/eventdriven/uniqueness/core/resourcereservation/esdb/ESDBResourceReservationHandler.java
+++ b/samples/uniqueness/src/main/java/io/eventdriven/uniqueness/core/resourcereservation/esdb/ESDBResourceReservationHandler.java
@@ -88,7 +88,7 @@ private EventStore.AppendResult initiateReservation(String resourceKey, String r
     return retryPolicy.run(ack -> {
       var result = eventStore.append(reservationStreamId, reservationInitiated);
 
-      if(!(result instanceof EventStore.AppendResult.UnexpectedFailure))
+      if (!(result instanceof EventStore.AppendResult.UnexpectedFailure))
         ack.accept(result);
     });
   }
@@ -100,9 +100,9 @@ private EventStore.AppendResult confirmReservation(String resourceKey, String re
     );
 
     return retryPolicy.run(ack -> {
-      var result = eventStore.append( reservationStreamId, expectedRevision, reservationConfirmed);
+      var result = eventStore.append(reservationStreamId, expectedRevision, reservationConfirmed);
 
-      if(!(result instanceof EventStore.AppendResult.UnexpectedFailure))
+      if (!(result instanceof EventStore.AppendResult.UnexpectedFailure))
         ack.accept(result);
     });
   }
@@ -123,12 +123,12 @@ private void markReservationAsReleased(String resourceKey, String reservationStr
         EventSerializer.serialize(reservationReleased)
       );
 
-      if(!(result instanceof EventStore.AppendResult.UnexpectedFailure))
+      if (!(result instanceof EventStore.AppendResult.UnexpectedFailure))
         ack.accept(result);
     });
   }
 
-  private static String streamName(String resourceKey){
+  private static String streamName(String resourceKey) {
     return "reservation-%s".formatted(resourceKey);
   }
 }