-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Correlate events with metadata #656
Conversation
Problem: KJ can be in inconsistent state when (after purging) meta from `metajournal` was deleted while events from `journal` left behind. By itself its not an issue, but after the journal used again and new meta inserted - then old events will be awailable (probably with duplicated `seqNr`). Solution: create correlation between meta and events that belong to it, i. e. were created while the meta was present. Then, on reading journal, only events that correlate with meta must be used. Implementation: add CorrelationId to `journal.headers` and `metajournal.properties`. In appending events CorrelationId from meta will be used, if meta not present yet - new CorrelationId generated. On reading compare CorrelationId from meta and event ignoring events with another value. CorrelationId is optional value thus validation takes place only if its present in meta.
value | ||
.map { | ||
case CorrelationId(value) => | ||
data.setMap("properties", Map(key -> value).asJava, classOf[String], classOf[String]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wouldn't this discard existing entries in properties
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, that's tricky part of the implementation. I see two options: reimplement way of binding values so that amending maps/sets will be possible OR just add new columns into journal
and metajournal
.
AFAIK properties
are not used now, but all-in-all you're right - its tricky to set whole map for one value deep into CorrelationId
codec
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMHO we should work on way to update, not reset the extra values...
on related note: I was thinking about full data consistency checks, probably for that we will require some way to "mark" accessible data, like some GCs mark accessible objects with colored flags and remove all, which lack expected color. Maybe introducing extra column(s) is safer and easier way how to approach this problem, though adding columns for every feature is not a nice solution either...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've reimplemented logic of binding value on the statement: now its adding key-value to the map instead of overriding whole map. As for now the branch depends on locally build scassandra
, PR still in review: evolution-gaming/scassandra#263
...rc/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/ReplicatedCassandra.scala
Show resolved
Hide resolved
into Map instead of replacing whole Map
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
overall LGTM so far
@@ -145,7 +146,13 @@ object EventualCassandra { | |||
* The implementation itself is abstracted from the calls to Cassandra which | |||
* should be passed as part of [[Statements]] parameter. | |||
*/ | |||
@deprecated("Use apply1 instead", "3.6.0") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe?
@deprecated("Use apply1 instead", "3.6.0") | |
@deprecated("Use apply2 instead", "3.6.0") |
.../src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandra.scala
Outdated
Show resolved
Hide resolved
journal/src/main/scala/com/evolutiongaming/kafka/journal/CorrelationId.scala
Show resolved
Hide resolved
} else { | ||
Stream | ||
.lift { | ||
Log[F].error(s"Data integrity violated: event $event belong to purged meta, key $key") |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Closed in favor of #665 |
Problem
KJ can be in inconsistent state when (after purging) meta from
metajournal
was deleted while events fromjournal
left behind. By itself its not an issue, but after the journal used again and new meta inserted - then old events will be awailable (probably with duplicatedseqNr
).Solution
Create correlation between meta and events that belong to it, i. e. were created while the meta was present. Then, on reading journal, only events that correlate with meta must be used.
Implementation
Add
CorrelationId
tojournal.headers
andmetajournal.properties
. In appending eventsCorrelationId
from meta will be used, if meta not present yet - newCorrelationId
generated. On reading compareCorrelationId
from meta and event ignoring events with another value.CorrelationId
is optional value thus validation takes place only if its present in meta.