
Merge pull request #164 from zalando-nakadi/154-key-extractor-bean-experiment

support compacted event types (#154) via spring beans
ePaul authored Dec 21, 2022
2 parents ae433d1 + aff88fb commit 1e284c8
Showing 17 changed files with 590 additions and 159 deletions.
56 changes: 49 additions & 7 deletions README.md
@@ -7,7 +7,7 @@

[Nakadi](https://github.com/zalando/nakadi) is a distributed event bus that implements a RESTful API abstraction instead of Kafka-like queues.

The goal of this Spring Boot starter is to simplify the reliable integration between event producer and Nakadi. When we send events from a transactional application, a few recurring challenges appear:
The goal of **this** Spring Boot starter is to simplify the reliable integration between event producer and Nakadi. When we send events from a transactional application, a few recurring challenges appear:
- we have to make sure that events from a transaction get sent, when the transaction has been committed,
- we have to make sure that events from a transaction do not get sent, when the transaction has been rolled back,
- we have to make sure that events get sent, even if an error occurred while sending the event,
@@ -24,7 +24,8 @@ This project is mature, used in production in some services at Zalando, and in a

Be aware that this library **does neither guarantee that events are sent exactly once, nor that they are sent in the order they have been persisted**. This is not a bug but a design decision that allows us to skip and retry sending events later in case of temporary failures. So make sure that your events are designed to be processed out of order (See [Rule 203 in Zalando's API guidelines](https://opensource.zalando.com/restful-api-guidelines/#203)). To help you in this matter, the library generates a *strictly monotonically increasing event id* (field `metadata/eid` in Nakadi's event object) that can be used to reconstruct the message order.

Unfortunately this approach is not compatible with Nakadi's compacted event types – it can happen that the last event submitted (and thus the one which will stay after compaction) is not the last event which was actually been fired. For this reason, the library currently also doesn't provide any access to Nakadi's [`partition_compaction_key`](https://nakadi.io/manual.html#definition_EventMetadata*partition_compaction_key) feature.
Unfortunately this approach is fundamentally incompatible with Nakadi's compacted event types – it can happen that the last event submitted (and thus the one which will stay after compaction) is not the last event which was actually fired.
We still provide means to set the compaction key, see [compacted event types](#compacted-event-types) below.

## Versioning

@@ -107,7 +108,7 @@ token. The easiest way to do so is to include the [Zalando Tokens library](https
</dependency>
```

This starter will detect and auto configure it.
This starter will detect and autoconfigure it.

If your application is running in Zalando's Kubernetes environment, you have to configure the credential rotation:
```yaml
@@ -158,16 +159,18 @@ nakadi-producer:
```

#### Implement Nakadi authentication yourself
If you do not use the STUPS Tokens library, you can implement token retrieval yourself by defining a Spring bean of type `org.zalando.nakadiproducer.AccessTokenProvider`. The starter will detect it and call it once for each request to retrieve the token.
If you do not use the STUPS Tokens library, you can implement token retrieval yourself by defining a Spring bean of
type [`AccessTokenProvider`](nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/AccessTokenProvider.java).
The starter will detect it and call it once for each request to retrieve the token.
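
For illustration, a minimal sketch of such a bean (assuming `AccessTokenProvider` exposes a single method returning the token as a `String` – check the linked source for the exact signature; `MyTokenClient` is a made-up placeholder for your own token-fetching component):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.zalando.nakadiproducer.AccessTokenProvider;

@Configuration
public class NakadiAuthConfiguration {

    // MyTokenClient stands in for however your application obtains OAuth2 tokens.
    @Bean
    public AccessTokenProvider nakadiAccessTokenProvider(MyTokenClient tokenClient) {
        // Called once per request to Nakadi, so the client should cache and refresh internally.
        return () -> tokenClient.currentToken("nakadi");
    }
}
```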

### Creating events

The typical use case for this library is to publish events about the creation or update of some objects.

In order to store events you can autowire the [`EventLogWriter`](src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java)
In order to store events you can autowire the [`EventLogWriter`](nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java)
service and use its methods: `fireCreateEvent`, `fireUpdateEvent`, `fireDeleteEvent`, `fireSnapshotEvent` or `fireBusinessEvent`.

To store events in bulk the methods `fireCreateEvents`, `fireUpdateEvents`, `fireDeleteEvents`, `fireSnapshotEvents` or `fireBusinessEvents` can be used.
To store several events of the same type in bulk, the methods `fireCreateEvents`, `fireUpdateEvents`, `fireDeleteEvents`, `fireSnapshotEvents` or `fireBusinessEvents` can be used.

You normally don't need to call `fireSnapshotEvent` directly, see below for [snapshot creation](#event-snapshots-optional).
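
As an illustrative sketch (the service, repository, `Warehouse` class and the event type / data type names are made up; the single-event methods follow the event type / data type / payload pattern, and the bulk variants are assumed to accept a collection of payloads):

```java
import java.util.List;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.zalando.nakadiproducer.eventlog.EventLogWriter;

@Service
public class WarehouseService {
    private final EventLogWriter eventLogWriter;
    private final WarehouseRepository repository; // made-up repository

    public WarehouseService(EventLogWriter eventLogWriter, WarehouseRepository repository) {
        this.eventLogWriter = eventLogWriter;
        this.repository = repository;
    }

    @Transactional
    public void createWarehouse(Warehouse warehouse) {
        repository.save(warehouse);
        // Stored in the same transaction as the entity; transmitted to Nakadi asynchronously later.
        eventLogWriter.fireCreateEvent("wholesale.warehouse-change-event", "wholesale:warehouse", warehouse);
    }

    @Transactional
    public void importWarehouses(List<Warehouse> warehouses) {
        repository.saveAll(warehouses);
        // Bulk variant for several events of the same type.
        eventLogWriter.fireCreateEvents("wholesale.warehouse-change-event", "wholesale:warehouse", warehouses);
    }
}
```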

@@ -222,14 +225,50 @@ For business events, you have just two parameters, the **eventType** and the eve
You should usually fire those in the same transaction in which you store the results of the
process step the event is reporting.
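
A minimal sketch (the event type name, payload class and repository are invented; `fireBusinessEvent` takes the event type and the payload, as described above):

```java
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.zalando.nakadiproducer.eventlog.EventLogWriter;

@Service
public class PaymentProcess {
    private final EventLogWriter eventLogWriter;
    private final PaymentRepository repository; // made-up repository

    public PaymentProcess(EventLogWriter eventLogWriter, PaymentRepository repository) {
        this.eventLogWriter = eventLogWriter;
        this.repository = repository;
    }

    @Transactional
    public void confirmPayment(PaymentConfirmed confirmation) {
        repository.save(confirmation);
        // Fired in the same transaction as the process-step result it reports.
        eventLogWriter.fireBusinessEvent("payment.payment-confirmed", confirmation);
    }
}
```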

#### Compacted event types

Nakadi offers a "log-compaction" feature, where each event (on an event type) has a
[`partition_compaction_key`](https://nakadi.io/manual.html#definition_EventMetadata*partition_compaction_key), and
Nakadi will (after delivering to live subscribers) clean up events, but leave the latest event for each
compaction key available long-term.

This library (by design) doesn't guarantee the submission order of events – especially when there are problems
on the Nakadi side and some events fail (and are retried later), earlier-produced events (for the same entity)
can be submitted after later ones. For log-compacted event types this means that an outdated event may remain
in the topic for future subscribers to read.
It is therefore generally **not recommended** to use this library (or any solution which doesn't guarantee the order)
for sending events to a compacted event type.

In some cases, for example when there are usually large time gaps between producing events for the same compaction key,
the risk of getting events for the same key out of order is small.
For these cases, you can just define a bean of type [`CompactionKeyExtractor`](nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/CompactionKeyExtractor.java),
and then all events of that event type will be sent with a compaction key.

```java
@Configuration
public class NakadiProducerConfiguration {
    @Bean
    public CompactionKeyExtractor extractorForWarehouseEvents() {
        return CompactionKeyExtractor.of("wholesale.warehouse-change-event",
                Warehouse.class, Warehouse::getCode);
    }
}
```
The service class sending the event looks exactly the same as above.
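
For instance, continuing the (made-up) warehouse example from the section on creating events, the firing code stays untouched – the extractor bean adds the key transparently:

```java
// Unchanged service method: because of the CompactionKeyExtractor bean above, every event
// fired for "wholesale.warehouse-change-event" with a Warehouse payload will carry
// metadata/partition_compaction_key = warehouse.getCode().
@Transactional
public void updateWarehouse(Warehouse warehouse) {
    repository.save(warehouse);
    eventLogWriter.fireUpdateEvent("wholesale.warehouse-change-event", "wholesale:warehouse", warehouse);
}
```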

For corner cases: you can have multiple such extractors for the same event type; any one whose class object
matches the payload object will be used (in undefined order).
There are also some more factory methods with different signatures for more special cases (see the sketch below), and you can also write
your own implementation (but for the usual cases, the one shown here should be enough).
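
As a sketch of one of those alternative factory methods, to be placed in a `@Configuration` class like the one above (the payload classes are invented; `ofNullable` matches the factory shown in the new `CompactionKeyExtractor` interface later in this diff):

```java
@Bean
public CompactionKeyExtractor extractorForMixedPayloads() {
    // ofNullable: return null for payloads this extractor does not understand,
    // so no compaction key (or another extractor's key) is used for them.
    return CompactionKeyExtractor.ofNullable("wholesale.warehouse-change-event", payload -> {
        if (payload instanceof Warehouse) {
            return ((Warehouse) payload).getCode();
        }
        if (payload instanceof WarehouseDeletion) {
            return ((WarehouseDeletion) payload).getWarehouseCode();
        }
        return null; // unknown payload: no key from this extractor
    });
}
```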

### Event snapshots (optional)

A Snapshot event is a special type of data change event (data operation) defined by Nakadi.
It does not represent a change of the state of a resource, but a current snapshot of its state. It can be useful to
bootstrap a new consumer or to recover from inconsistencies between sender and consumer after an incident.

You can create snapshot events programmatically (using EventLogWriter.fireSnapshotEvent), but usually snapshot event
creation is a irregular, manually triggered maintenance task.
creation is an irregular, manually triggered maintenance task.

This library provides a Spring Boot Actuator endpoint named `snapshot_event_creation` that can be used to trigger a Snapshot for a given event type. Assuming your management port is set to `7979`,

@@ -260,6 +299,9 @@ your `application.properties` includes
management.endpoints.web.exposure.include=snapshot-event-creation,your-other-endpoints,...
```
and if one or more Spring Beans implement the `org.zalando.nakadiproducer.snapshots.SnapshotEventGenerator` interface.
(Note that this will automatically work together with the compaction key feature mentioned above,
if you have registered a compaction key extractor matching the type of the data objects in your snapshots.)
The optional filter specifier of the trigger request will be passed as a string parameter to the
SnapshotEventGenerator's `generateSnapshots` method and may be null, if none is given.
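
A rough sketch of such a bean – hedged, since the exact `SnapshotEventGenerator`, `SimpleSnapshotEventGenerator` and `Snapshot` signatures should be verified against the library sources; the repository, its paging method and the stream mapping (imports omitted) are invented for illustration:

```java
@Bean
public SnapshotEventGenerator warehouseSnapshotEventGenerator(WarehouseRepository repository) {
    // Assumption: SimpleSnapshotEventGenerator takes the event type plus a
    // (lastProcessedId, filter) -> List<Snapshot> function, and Snapshot carries
    // an id, the data type and the payload. Check the actual API before using this.
    return new SimpleSnapshotEventGenerator("wholesale.warehouse-change-event", (lastProcessedId, filter) ->
            repository.findNextBatch(lastProcessedId, filter).stream()
                    .map(w -> new Snapshot(w.getId(), "wholesale:warehouse", w))
                    .collect(Collectors.toList()));
}
```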
@@ -23,10 +23,10 @@
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.zalando.fahrschein.NakadiClient;
import org.zalando.fahrschein.NakadiClientBuilder;
import org.zalando.fahrschein.http.api.ContentEncoding;
import org.zalando.fahrschein.http.api.RequestFactory;
import org.zalando.fahrschein.http.simple.SimpleRequestFactory;
import org.zalando.nakadiproducer.eventlog.CompactionKeyExtractor;
import org.zalando.nakadiproducer.eventlog.EventLogWriter;
import org.zalando.nakadiproducer.eventlog.impl.EventLogRepository;
import org.zalando.nakadiproducer.eventlog.impl.EventLogRepositoryImpl;
@@ -139,8 +139,8 @@ public SnapshotCreationService snapshotCreationService(

@Bean
public EventLogWriter eventLogWriter(EventLogRepository eventLogRepository, ObjectMapper objectMapper,
FlowIdComponent flowIdComponent) {
return new EventLogWriterImpl(eventLogRepository, objectMapper, flowIdComponent);
FlowIdComponent flowIdComponent, List<CompactionKeyExtractor> extractorList) {
return new EventLogWriterImpl(eventLogRepository, objectMapper, flowIdComponent, extractorList);
}

@Bean
@@ -87,16 +87,17 @@ public void persist(Collection<EventLog> eventLogs) {
namedParameterMap.addValue("lastModified", now);
namedParameterMap.addValue("lockedBy", eventLog.getLockedBy());
namedParameterMap.addValue("lockedUntil", eventLog.getLockedUntil());
namedParameterMap.addValue("compactionKey", eventLog.getCompactionKey());
return namedParameterMap;
})
.toArray(MapSqlParameterSource[]::new);

jdbcTemplate.batchUpdate(
"INSERT INTO " +
" nakadi_events.event_log " +
" (event_type, event_body_data, flow_id, created, last_modified, locked_by, locked_until) " +
" (event_type, event_body_data, flow_id, created, last_modified, locked_by, locked_until, compaction_key)" +
"VALUES " +
" (:eventType, :eventBodyData, :flowId, :created, :lastModified, :lockedBy, :lockedUntil)",
" (:eventType, :eventBodyData, :flowId, :created, :lastModified, :lockedBy, :lockedUntil, :compactionKey)",
namedParameterMaps
);
}
@@ -0,0 +1,3 @@
ALTER TABLE nakadi_events.event_log
ADD COLUMN compaction_key TEXT NULL
;
@@ -3,8 +3,7 @@
import static com.jayway.jsonpath.Criteria.where;
import static com.jayway.jsonpath.JsonPath.read;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;

import java.io.IOException;
import java.util.List;
@@ -13,17 +12,23 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.test.context.ContextConfiguration;
import org.zalando.nakadiproducer.eventlog.CompactionKeyExtractor;
import org.zalando.nakadiproducer.eventlog.EventLogWriter;
import org.zalando.nakadiproducer.transmission.MockNakadiPublishingClient;
import org.zalando.nakadiproducer.transmission.impl.EventTransmitter;
import org.zalando.nakadiproducer.util.Fixture;
import org.zalando.nakadiproducer.util.MockPayload;

@ContextConfiguration(classes = EndToEndTestIT.Config.class)
public class EndToEndTestIT extends BaseMockedExternalCommunicationIT {
    private static final String MY_DATA_CHANGE_EVENT_TYPE = "myDataChangeEventType";
    private static final String SECOND_DATA_CHANGE_EVENT_TYPE = "secondDataChangeEventType";
    private static final String MY_BUSINESS_EVENT_TYPE = "myBusinessEventType";
    public static final String PUBLISHER_DATA_TYPE = "nakadi:some-publisher";
    private static final String CODE = "code123";
    public static final String COMPACTION_KEY = "Hello World";

    @Autowired
    private EventLogWriter eventLogWriter;
@@ -55,6 +60,36 @@ public void dataEventsShouldBeSubmittedToNakadi() throws IOException {
        assertThat(read(value.get(0), "$.data.code"), is(CODE));
    }

    @Test
    public void compactionKeyIsPreserved() throws IOException {
        MockPayload payload = Fixture.mockPayload(1, CODE);
        eventLogWriter.fireDeleteEvent(SECOND_DATA_CHANGE_EVENT_TYPE, PUBLISHER_DATA_TYPE, payload);
        eventLogWriter.fireBusinessEvent(MY_BUSINESS_EVENT_TYPE, payload);

        eventTransmitter.sendEvents();

        List<String> dataEvents = nakadiClient.getSentEvents(SECOND_DATA_CHANGE_EVENT_TYPE);
        assertThat(dataEvents.size(), is(1));
        assertThat(read(dataEvents.get(0), "$.metadata.partition_compaction_key"), is(COMPACTION_KEY));

        List<String> businessEvents = nakadiClient.getSentEvents(MY_BUSINESS_EVENT_TYPE);
        assertThat(businessEvents.size(), is(1));
        assertThat(read(businessEvents.get(0), "$.metadata.partition_compaction_key"), is(CODE));
    }

    @Test
    public void compactionKeyIsNotInvented() throws IOException {
        MockPayload payload = Fixture.mockPayload(1, CODE);
        eventLogWriter.fireDeleteEvent(MY_DATA_CHANGE_EVENT_TYPE, PUBLISHER_DATA_TYPE, payload);

        eventTransmitter.sendEvents();
        List<String> value = nakadiClient.getSentEvents(MY_DATA_CHANGE_EVENT_TYPE);

        assertThat(value.size(), is(1));
        assertThat(read(value.get(0), "$.metadata[?]", where("partition_compaction_key").exists(true)),
                is(empty()));
    }

    @Test
    public void businessEventsShouldBeSubmittedToNakadi() throws IOException {
        MockPayload payload = Fixture.mockPayload(1, CODE);
@@ -75,4 +110,16 @@ public void businessEventsShouldBeSubmittedToNakadi() throws IOException {
        assertThat(read(value.get(0), "$[?]", where("data_type").exists(true)), is(empty()));
        assertThat(read(value.get(0), "$[?]", where("data").exists(true)), is(empty()));
    }

    public static class Config {
        @Bean
        public CompactionKeyExtractor compactionKeyExtractorForSecondDataEventType() {
            return CompactionKeyExtractor.of(SECOND_DATA_CHANGE_EVENT_TYPE, MockPayload.class, m -> COMPACTION_KEY);
        }

        @Bean
        public CompactionKeyExtractor keyExtractorForBusinessEventType() {
            return CompactionKeyExtractor.of(MY_BUSINESS_EVENT_TYPE, MockPayload.class, MockPayload::getCode);
        }
    }
}
@@ -38,18 +38,22 @@ public class EventLogRepositoryIT extends BaseMockedExternalCommunicationIT {

private final String WAREHOUSE_EVENT_TYPE = "wholesale.warehouse-change-event";

public static final String COMPACTION_KEY = "COMPACTED";

@BeforeEach
public void setUp() {
eventLogRepository.deleteAll();

final EventLog eventLog = EventLog.builder().eventBodyData(WAREHOUSE_EVENT_BODY_DATA)
final EventLog eventLog = EventLog.builder()
.eventBodyData(WAREHOUSE_EVENT_BODY_DATA)
.eventType(WAREHOUSE_EVENT_TYPE)
.compactionKey(COMPACTION_KEY)
.flowId("FLOW_ID").build();
eventLogRepository.persist(eventLog);
}

@Test
public void findEventRepositoryId() {
public void testFindEventInRepositoryById() {
Integer id = jdbcTemplate.queryForObject(
"SELECT id FROM nakadi_events.event_log WHERE flow_id = 'FLOW_ID'",
Integer.class);
@@ -60,6 +64,7 @@ private void compareWithPersistedEvent(final EventLog eventLog) {
private void compareWithPersistedEvent(final EventLog eventLog) {
assertThat(eventLog.getEventBodyData(), is(WAREHOUSE_EVENT_BODY_DATA));
assertThat(eventLog.getEventType(), is(WAREHOUSE_EVENT_TYPE));
assertThat(eventLog.getCompactionKey(), is(COMPACTION_KEY));
}

}
@@ -0,0 +1,76 @@
package org.zalando.nakadiproducer.eventlog;

import org.zalando.nakadiproducer.eventlog.CompactionKeyExtractors.SimpleCompactionKeyExtractor;
import org.zalando.nakadiproducer.eventlog.CompactionKeyExtractors.TypedCompactionKeyExtractor;

import java.util.Optional;
import java.util.function.Function;

/**
 * This interface defines a way of extracting a compaction key from an object which
 * is sent as a payload in a compacted event type.
 * In most cases, exactly one such extractor will be made known to the producer for each compacted event type, and
 * you can define it using {@link #of(String, Class, Function)}, passing a method reference or a lambda.
 * For special cases (e.g. where objects of different classes are used as payloads for the same event type),
 * multiple extractors for the same event type are also supported – in this case any one which returns a
 * non-empty Optional will be used.
 */
public interface CompactionKeyExtractor {

    default String getKeyOrNull(Object payload) {
        return tryGetKeyFor(payload).orElse(null);
    }

    Optional<String> tryGetKeyFor(Object o);

    String getEventType();

    /**
     * A type-safe compaction key extractor. This will be the one to be used by most applications.
     *
     * @param eventType Indicates the event type. Only events sent to this event type will be considered.
     * @param type A Java type for payload objects. Only payload objects where {@code type.isInstance(payload)}
     *             holds will be considered at all.
     * @param extractorFunction A function extracting a compaction key from a payload object.
     *                          This will commonly be given as a method reference or lambda.
     * @return A compaction key extractor, to be defined as a Spring bean (if using the spring-boot starter)
     *         or passed manually to the event log writer implementation (if using nakadi-producer directly).
     *         (This should not return null.)
     * @param <X> the type of {@code type} and input type of {@code extractorFunction}.
     */
    static <X> CompactionKeyExtractor of(String eventType, Class<X> type, Function<X, String> extractorFunction) {
        return new TypedCompactionKeyExtractor<>(eventType, type, extractorFunction);
    }

    /**
     * Non-type safe key extractor, returning an Optional.
     * @param eventType The event type for which this extractor is intended.
     * @param extractor The extractor function. It is supposed to return {@link Optional#empty()} if this extractor
     *                  can't handle the input object, otherwise the actual key.
     * @return a key extractor object.
     */
    static CompactionKeyExtractor ofOptional(String eventType, Function<Object, Optional<String>> extractor) {
        return new SimpleCompactionKeyExtractor(eventType, extractor);
    }

    /**
     * Non-type safe key extractor, returning null for unknown objects.
     * @param eventType The event type for which this extractor is intended.
     * @param extractor The extractor function. It is supposed to return {@code null} if this extractor
     *                  can't handle the input object, otherwise the actual key.
     * @return a key extractor object.
     */
    static CompactionKeyExtractor ofNullable(String eventType, Function<Object, String> extractor) {
        return new SimpleCompactionKeyExtractor(eventType, extractor.andThen(Optional::ofNullable));
    }

    /**
     * A universal key extractor, capable of handling all objects.
     * @param eventType The event type for which this extractor is intended.
     * @param extractor The extractor function. It is not allowed to return {@code null}.
     * @return a key extractor object.
     */
    static CompactionKeyExtractor of(String eventType, Function<Object, String> extractor) {
        return new SimpleCompactionKeyExtractor(eventType, extractor.andThen(Optional::of));
    }
}