Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PHEE-708] Events batching in Importer RDBMS. #2

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ dependencies {
api 'org.postgresql:postgresql:42.5.1'
api 'com.zaxxer:HikariCP:5.0.1'
api 'org.apache.commons:commons-text:1.10.0'
api 'com.google.code.gson:gson:2.9.0'
implementation 'com.amazonaws:aws-java-sdk:1.11.486'
testImplementation 'org.junit.jupiter:junit-jupiter:5.9.1'
implementation 'org.projectlombok:lombok:1.18.22'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ KafkaStreamsConfiguration kStreamsConfig() {
props.put(BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(COMMIT_INTERVAL_MS_CONFIG, 1000);
props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return new KafkaStreamsConfiguration(props);
}

Expand Down
36 changes: 36 additions & 0 deletions src/main/java/hu/dpc/phee/operator/streams/JsonArraySerde.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package hu.dpc.phee.operator.streams;

import com.google.gson.JsonArray;
import com.google.gson.JsonParser;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.StandardCharsets;

public class JsonArraySerde implements Serde<JsonArray> {

@Override
public Serializer<JsonArray> serializer() {
return new JsonArraySerializer();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we save this instance and re-use it instead of allocating a new one on every call?

}

@Override
public Deserializer<JsonArray> deserializer() {
return new JsonArrayDeserializer();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we save this instance and re-use it instead of allocating a new one on every call?

}

public static class JsonArraySerializer implements Serializer<JsonArray> {
@Override
public byte[] serialize(String topic, JsonArray data) {
return data.toString().getBytes(StandardCharsets.UTF_8);
}
}

public static class JsonArrayDeserializer implements Deserializer<JsonArray> {
@Override
public JsonArray deserialize(String topic, byte[] data) {
return JsonParser.parseString(new String(data, StandardCharsets.UTF_8)).getAsJsonArray();
}
}
}
138 changes: 89 additions & 49 deletions src/main/java/hu/dpc/phee/operator/streams/RecordParser.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package hu.dpc.phee.operator.streams;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import hu.dpc.phee.operator.config.TransferTransformerConfig;
Expand All @@ -18,6 +22,7 @@
import hu.dpc.phee.operator.entity.transfer.TransferRepository;
import hu.dpc.phee.operator.entity.transfer.TransferStatus;
import hu.dpc.phee.operator.entity.variable.Variable;
import hu.dpc.phee.operator.importer.JsonPathReader;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.logging.log4j.util.Strings;
Expand All @@ -34,6 +39,7 @@
import javax.xml.xpath.XPathFactory;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
Expand Down Expand Up @@ -72,6 +78,9 @@ public class RecordParser {
@Autowired
TransferTransformerConfig transferTransformerConfig;

@Autowired
ObjectMapper objectMapper;

private final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();

private final XPathFactory xPathFactory = XPathFactory.newInstance();
Expand Down Expand Up @@ -99,7 +108,7 @@ public List<Object> processWorkflowInstance(DocumentContext recordDocument, Stri
if ("TRANSFER".equalsIgnoreCase(flowType)) {
logger.debug("Processing flow of type TRANSFER");
Transfer transfer = inFlightTransferManager.retrieveOrCreateTransfer(bpmn, sample, "PROCESS_INSTANCE");
if ("EVENT".equals(recordType) && "START_EVENT".equals(bpmnElementType) && "ELEMENT_ACTIVATED".equals(intent)) {
if ("EVENT".equals(recordType) && "START_EVENT".equals(bpmnElementType) && "ELEMENT_ACTIVATING".equals(intent)) {
transfer.setStartedAt(new Date(timestamp));
transfer.setDirection(config.get().getDirection());
logger.debug("found {} constant transformers for flow start {}", constantTransformers.size(), bpmn);
Expand Down Expand Up @@ -180,79 +189,110 @@ public void updateRtpTransaction(TransactionRequest transactionRequest, String v
}
}

public List<Object> processVariable(DocumentContext recordDocument, String bpmn, Long workflowInstanceKey, Long workflowKey, Long timestamp, String flowType, DocumentContext sample)throws JsonProcessingException {
logger.debug("Processing variable instance");
String variableName = recordDocument.read("$.value.name", String.class);
String variableValue = recordDocument.read("$.value.value", String.class);
String value = variableValue.startsWith("\"") && variableValue.endsWith("\"") ? StringEscapeUtils.unescapeJson(variableValue.substring(1, variableValue.length() - 1)) : variableValue;
public List<Object> processVariables(JsonArray records, String bpmn, Long workflowInstanceKey, Long workflowKey, String flowType, DocumentContext sample)throws JsonMappingException {

Object object;

if ("TRANSFER".equalsIgnoreCase(flowType)) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you use a switch here? And use an enum for the flow type instead of raw strings.

object = new Transfer();
} else if ("TRANSACTION-REQUEST".equalsIgnoreCase(flowType)) {
object = new TransactionRequest();
} else if ("BATCH".equalsIgnoreCase(flowType)) {
object = new Batch();
} else if ("OUTBOUND_MESSAGES".equalsIgnoreCase(flowType)) {
object = new OutboudMessages();
} else {
object=null;
logger.error("No matching flow types for the given request");
}

List<Object> result= new ArrayList<Object>();
for (JsonElement record : records) {
try {
DocumentContext recordDocument = JsonPathReader.parse(String.valueOf(record));

String variableName = recordDocument.read("$.value.name", String.class);
String variableValue = recordDocument.read("$.value.value", String.class);
String value = variableValue.startsWith("\"") && variableValue.endsWith("\"") ? StringEscapeUtils.unescapeJson(variableValue.substring(1, variableValue.length() - 1)) : variableValue;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code has enough context to be moved into a method

Long timestamp = recordDocument.read("$.timestamp");

List<Object> results = List.of(
new Variable()
result.add( new Variable()
.withWorkflowInstanceKey(workflowInstanceKey)
.withName(variableName)
.withWorkflowKey(workflowKey)
.withTimestamp(timestamp)
.withValue(value));

if(variableName.equals("subBatchDetails")) {
parseSubBatchDetails(variableValue);
}
if(variableName.equals("subBatchDetails")) {
parseSubBatchDetails(variableValue);
}

if (variableName.equals("failedTransactionFile")) {
// insert the transaction into transfer table
logger.debug("Name {} and value: {}");
inflightBatchManager.updateTransferTableWithFailedTransaction(workflowInstanceKey, value);
}

if (variableName.equals("batchId")) {
logger.debug("store batchid {} in tempDocStore for instance {}", strip(value), workflowInstanceKey);
inflightBatchManager.storeBatchId(workflowInstanceKey, value);
}

logger.debug("finding transformers for bpmn: {} and variable: {}", bpmn, variableName);
List<TransferTransformerConfig.Transformer> matchingTransformers = transferTransformerConfig.getFlows().stream()
.filter(it -> bpmn.equalsIgnoreCase(it.getName()))
.flatMap(it -> it.getTransformers().stream()) // todo - can break here
.filter(it -> variableName.equalsIgnoreCase(it.getVariableName()))
.toList();

matchingTransformers.forEach(transformer -> applyTransformer(object, variableName, value, transformer));

if ("TRANSFER".equalsIgnoreCase(flowType)) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again a switch May be.

} else if ("TRANSACTION-REQUEST".equalsIgnoreCase(flowType)) {
if(variableName.equals("state")){
updateRtpTransaction((TransactionRequest) object, value);
}
} else if ("BATCH".equalsIgnoreCase(flowType)) {
//if (!config.get().getName().equalsIgnoreCase("bulk_processor")) {
logger.debug("Inside if condition {}", variableName);
if (variableName.equals("filename")) {
logger.debug("store filename {} in tempDocStore for instance {}", strip(value), workflowInstanceKey);
inflightBatchManager.storeBatchFileName(workflowInstanceKey, value);
}
if (variableName.equals("batchId")) {
logger.debug("store batchid {} in tempDocStore for instance {}", strip(value), workflowInstanceKey);
inflightBatchManager.storeBatchId(workflowInstanceKey, value);
}
//}
}

if (variableName.equals("failedTransactionFile")) {
// insert the transaction into transfer table
logger.debug("Name {} and value: {}");
inflightBatchManager.updateTransferTableWithFailedTransaction(workflowInstanceKey, value);
}
} catch (Exception ex) {

if (variableName.equals("batchId")) {
logger.debug("store batchid {} in tempDocStore for instance {}", strip(value), workflowInstanceKey);
inflightBatchManager.storeBatchId(workflowInstanceKey, value);
}
}

logger.debug("finding transformers for bpmn: {} and variable: {}", bpmn, variableName);
List<TransferTransformerConfig.Transformer> matchingTransformers = transferTransformerConfig.getFlows().stream()
.filter(it -> bpmn.equalsIgnoreCase(it.getName()))
.flatMap(it -> it.getTransformers().stream())
.filter(it -> variableName.equalsIgnoreCase(it.getVariableName()))
.toList();

matchTransformerForFlowType(flowType, bpmn, sample, matchingTransformers, variableName, value, workflowInstanceKey);

return results;
matchTransformerForFlowType(flowType, bpmn, sample, object);
return result;
}

@Transactional
private void matchTransformerForFlowType(String flowType, String bpmn, DocumentContext sample, List<TransferTransformerConfig.Transformer> matchingTransformers, String variableName, String value, Long workflowInstanceKey) {
private void matchTransformerForFlowType(String flowType, String bpmn, DocumentContext sample, Object object)throws JsonMappingException {
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
Optional<TransferTransformerConfig.Flow> config = transferTransformerConfig.findFlow(bpmn);
if ("TRANSFER".equalsIgnoreCase(flowType)) {
Transfer transfer = inFlightTransferManager.retrieveOrCreateTransfer(bpmn, sample, "VARIABLE");
matchingTransformers.forEach(transformer -> applyTransformer(transfer, variableName, value, transformer));
objectMapper.updateValue(transfer, (Transfer) object);
transferRepository.save(transfer);
} else if ("TRANSACTION-REQUEST".equalsIgnoreCase(flowType)) {
TransactionRequest transactionRequest = inflightTransactionRequestManager.retrieveOrCreateTransaction(bpmn, sample);
if(variableName.equals("state")){
updateRtpTransaction(transactionRequest, value);
}
matchingTransformers.forEach(transformer -> applyTransformer(transactionRequest, variableName, value, transformer));
objectMapper.updateValue(transactionRequest, (TransactionRequest) object);
transactionRequestRepository.save(transactionRequest);
} else if ("BATCH".equalsIgnoreCase(flowType)) {
Batch batch = inflightBatchManager.retrieveOrCreateBatch(bpmn, sample);
matchingTransformers.forEach(transformer -> applyTransformer(batch, variableName, value, transformer));
objectMapper.updateValue(batch, (Batch) object);
batchRepository.save(batch);
//if (!config.get().getName().equalsIgnoreCase("bulk_processor")) {
logger.debug("Inside if condition {}", variableName);
if (variableName.equals("filename")) {
logger.debug("store filename {} in tempDocStore for instance {}", strip(value), workflowInstanceKey);
inflightBatchManager.storeBatchFileName(workflowInstanceKey, value);
}
if (variableName.equals("batchId")) {
logger.debug("store batchid {} in tempDocStore for instance {}", strip(value), workflowInstanceKey);
inflightBatchManager.storeBatchId(workflowInstanceKey, value);
}
//}
} else if ("OUTBOUND_MESSAGES".equalsIgnoreCase(flowType)) {
OutboudMessages outboudMessages = inflightOutboundMessageManager.retrieveOrCreateOutboundMessage(bpmn, sample);
matchingTransformers.forEach(transformer -> applyTransformer(outboudMessages, variableName, value, transformer));
objectMapper.updateValue(outboudMessages, (OutboudMessages) object);
outboundMessagesRepository.save(outboudMessages);
} else {
logger.error("No matching flow types for the given request");
Expand Down
Loading