Skip to content

Commit

Permalink
set fill_sequence_number_gaps in ConsumerEventInit (#131)
Browse files Browse the repository at this point in the history
* set fill_sequence_number_gaps in ConsumerEventInit

* added to the proto in akka/akka-projection#1076
* means that there is no need for a config change on the jvm side
* ReplicaInfo was also added, but that is not used here because
  that is for replicated event sourcing

* one more

* unused
  • Loading branch information
patriknw authored Dec 5, 2023
1 parent 1eefc1f commit 7763e3d
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,13 @@ message ConsumerEventInit {
string origin_id = 1;
// the stream id of the type of entity the producer wants to push
string stream_id = 2;
// if gaps in sequence numbers may exist and should be filled in
bool fill_sequence_number_gaps = 3;
}

message ConsumerEventStart {
repeated FilterCriteria filter = 1;
ReplicaInfo replica_info = 2;
}

message KeepAlive { }
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ message InitReq {
Offset offset = 4;
// consumer defined event filters
repeated FilterCriteria filter = 5;
ReplicaInfo replica_info = 6;
}

// Add filter criteria to exclude and include events for matching entities.
Expand Down Expand Up @@ -191,8 +192,10 @@ message RemoveIncludeTopics {

message Offset {
google.protobuf.Timestamp timestamp = 1;
// events with these sequence numbers for this timestamp have already been
// processed and doesn't have to be emitted again
// Events with these sequence numbers for this timestamp have already been
// processed and doesn't have to be emitted again.
// If empty it is assumed to be the persistence_id -> seq_nr of enclosing Event
// or FilteredEvent.
repeated PersistenceIdSeqNr seen = 2;
}

Expand All @@ -201,6 +204,23 @@ message PersistenceIdSeqNr {
int64 seq_nr = 2;
}

// Used for Replicated Event Sourcing to filter events based on origin.
// For edge topologies, like star topologies, an edge replica is not connected
// to all other replicas, but should be able to receive events indirectly via
// the replica that it is consuming from.
//
// Events originating from other replicas that the consumer is connected to are excluded
// and emitted as FilteredEvent from the producer side, because the consumer will receive
// them directly from the other replica.
// Events originating from the consumer replica itself are excluded (break the cycle).
// Events originating from the producer replica are always included.
message ReplicaInfo {
// The replica id of the consumer
string replica_id = 1;
// Other replicas that the consumer is connected to.
repeated string other_replica_ids = 2;
}


message StreamOut {
oneof message {
Expand Down Expand Up @@ -259,6 +279,7 @@ message LoadEventRequest {
string stream_id = 1;
string persistence_id = 2;
int64 seq_nr = 3;
ReplicaInfo replica_info = 4;
}

// Response to `LoadEventRequest`.
Expand Down
2 changes: 2 additions & 0 deletions akka-projection-rs-grpc/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ where
.collect()
},
),
replica_info: None,
})),
}])
.chain(consumer_filters),
Expand Down Expand Up @@ -257,6 +258,7 @@ where
stream_id: self.stream_id.to_string(),
persistence_id: persistence_id.to_string(),
seq_nr: seq_nr as i64,
replica_info: None,
})
.await
{
Expand Down
8 changes: 6 additions & 2 deletions akka-projection-rs-grpc/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ where
proto::ConsumerEventInit {
origin_id,
stream_id,
fill_sequence_number_gaps: true,
},
)),
};
Expand Down Expand Up @@ -311,7 +312,8 @@ where
stream_out = stream_outs.next() => match stream_out {
Some(Ok(proto::ConsumeEventOut { message })) =>
match message {
Some(proto::consume_event_out::Message::Start(proto::ConsumerEventStart { filter })) => {
Some(proto::consume_event_out::Message::Start(proto::ConsumerEventStart {
filter, replica_info: None})) => {
debug!("Starting the protocol");
let _ = consumer_filters.send(
filter
Expand Down Expand Up @@ -455,6 +457,7 @@ mod tests {
Some(proto::consume_event_in::Message::Init(proto::ConsumerEventInit {
origin_id,
stream_id,
fill_sequence_number_gaps: _,
})),
})) = consume_events_in.next().await
{
Expand All @@ -465,7 +468,8 @@ mod tests {
proto::ConsumerEventStart {
filter: vec![proto::FilterCriteria {
message: Some(proto::filter_criteria::Message::ExcludeEntityIds(proto::ExcludeEntityIds { entity_ids: vec![] })),
}]
}],
replica_info: None,
},
)),
});
Expand Down

0 comments on commit 7763e3d

Please sign in to comment.