-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[PHEE-708] Events batching in Importer RDBMS. #2
base: develop
Are you sure you want to change the base?
Conversation
4447c2f
to
a275727
Compare
b8a5df2
to
84edbec
Compare
.windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofSeconds(aggregationWindowSeconds), Duration.ZERO)) | ||
.aggregate(ArrayList::new, aggregator, merger, Materialized.with(STRING_SERDE, ListSerde(ArrayList.class, STRING_SERDE))) | ||
.groupBy((key, value) -> extractCompositeKey(value)) | ||
.windowedBy(TimeWindows.of(Duration.ofMillis(300)).grace(Duration.ofMillis(100))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make this time configurable so that this can be fine tuned externally
private String extractCompositeKey(String value) { | ||
DocumentContext documentContext = JsonPathReader.parse(value); | ||
String workflowInstanceKey = documentContext.read("value.processInstanceKey").toString(); | ||
String recordType = documentContext.read("valueType").toString(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Home these fields won't create NPE
@@ -206,6 +217,10 @@ public void process(Object _key, Object _value) { | |||
} | |||
}); | |||
} | |||
|
|||
if (valueType.equals("VARIABLE")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not clear with this 'break' here. Are we searching for variable?
|
||
@Override | ||
public Serializer<JsonArray> serializer() { | ||
return new JsonArraySerializer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we save this instance and re-use?
|
||
@Override | ||
public Deserializer<JsonArray> deserializer() { | ||
return new JsonArrayDeserializer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we save this instance and re-use?
|
||
Object object; | ||
|
||
if ("TRANSFER".equalsIgnoreCase(flowType)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you use switch here? And use an ENUM for the flow type
|
||
String variableName = recordDocument.read("$.value.name", String.class); | ||
String variableValue = recordDocument.read("$.value.value", String.class); | ||
String value = variableValue.startsWith("\"") && variableValue.endsWith("\"") ? StringEscapeUtils.unescapeJson(variableValue.substring(1, variableValue.length() - 1)) : variableValue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code has enough context to be moved into a method
|
||
matchingTransformers.forEach(transformer -> applyTransformer(object, variableName, value, transformer)); | ||
|
||
if ("TRANSFER".equalsIgnoreCase(flowType)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again a switch May be.
PHEE-708 Events batching in Importer RDBMS.
Checklist
Please make sure these boxes are checked before submitting your pull request - thanks!
Design related bullet points or design document link related to this PR added in the description above.
Updated corresponding Postman Collection or Api documentation for the changes in this PR.
Create/update unit or integration tests for verifying the changes made.
Add required Swagger annotation and update API documentation with details of any API changes if applicable
Followed the naming conventions as given in https://docs.google.com/document/d/1Q4vaMSzrTxxh9TS0RILuNkSkYCxotuYk1Xe0CMIkkCU/edit?usp=sharing