diff --git a/build.gradle b/build.gradle
index e65d1007..51e5b738 100644
--- a/build.gradle
+++ b/build.gradle
@@ -42,6 +42,7 @@ dependencies {
     api 'org.postgresql:postgresql:42.5.1'
     api 'com.zaxxer:HikariCP:5.0.1'
     api 'org.apache.commons:commons-text:1.10.0'
+    api 'com.google.code.gson:gson:2.9.0'
     implementation 'com.amazonaws:aws-java-sdk:1.11.486'
     testImplementation 'org.junit.jupiter:junit-jupiter:5.9.1'
     implementation 'org.projectlombok:lombok:1.18.22'
diff --git a/src/main/java/hu/dpc/phee/operator/config/KafkaConfiguration.java b/src/main/java/hu/dpc/phee/operator/config/KafkaConfiguration.java
index b1187eda..2fac7e0a 100644
--- a/src/main/java/hu/dpc/phee/operator/config/KafkaConfiguration.java
+++ b/src/main/java/hu/dpc/phee/operator/config/KafkaConfiguration.java
@@ -70,6 +70,7 @@ KafkaStreamsConfiguration kStreamsConfig() {
         props.put(BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
         props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         props.put(COMMIT_INTERVAL_MS_CONFIG, 1000);
+        props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         return new KafkaStreamsConfiguration(props);
     }
 
diff --git a/src/main/java/hu/dpc/phee/operator/streams/JsonArraySerde.java b/src/main/java/hu/dpc/phee/operator/streams/JsonArraySerde.java
new file mode 100644
index 00000000..76a6eeb3
--- /dev/null
+++ b/src/main/java/hu/dpc/phee/operator/streams/JsonArraySerde.java
@@ -0,0 +1,36 @@
+package hu.dpc.phee.operator.streams;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonParser;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.nio.charset.StandardCharsets;
+
+public class JsonArraySerde implements Serde<JsonArray> {
+
+    @Override
+    public Serializer<JsonArray> serializer() {
+        return new JsonArraySerializer();
+    }
+
+    @Override
+    public Deserializer<JsonArray> deserializer() {
+        return new JsonArrayDeserializer();
+    }
+
+    public static class JsonArraySerializer implements Serializer<JsonArray> {
+        @Override
+        public byte[] serialize(String topic, JsonArray data) {
+            return data.toString().getBytes(StandardCharsets.UTF_8);
+        }
+    }
+
+    public static class JsonArrayDeserializer implements Deserializer<JsonArray> {
+        @Override
+        public JsonArray deserialize(String topic, byte[] data) {
+            return JsonParser.parseString(new String(data, StandardCharsets.UTF_8)).getAsJsonArray();
+        }
+    }
+}
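Note: a minimal usage sketch of the serde added above (topic name and payload are invented for illustration); it simply round-trips a JsonArray through the UTF-8 JSON text that the windowed aggregation's state store will hold:

import com.google.gson.JsonArray;
import com.google.gson.JsonParser;

public class JsonArraySerdeSketch {
    public static void main(String[] args) {
        JsonArray aggregate = new JsonArray();
        aggregate.add(JsonParser.parseString("{\"valueType\":\"VARIABLE\",\"value\":{\"name\":\"batchId\"}}"));

        JsonArraySerde serde = new JsonArraySerde();
        // serialize: JsonArray -> UTF-8 bytes of its JSON text
        byte[] bytes = serde.serializer().serialize("zeebe-export", aggregate);
        // deserialize: bytes -> parsed back into a JsonArray
        JsonArray restored = serde.deserializer().deserialize("zeebe-export", bytes);
        System.out.println(restored.equals(aggregate)); // true
    }
}

As written, the deserializer does not guard against a null byte array (which Kafka deserializers conventionally map to a null value), so a missing value would surface as a NullPointerException rather than null.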
diff --git a/src/main/java/hu/dpc/phee/operator/streams/RecordParser.java b/src/main/java/hu/dpc/phee/operator/streams/RecordParser.java
index 84922d4b..f9b5d0b2 100644
--- a/src/main/java/hu/dpc/phee/operator/streams/RecordParser.java
+++ b/src/main/java/hu/dpc/phee/operator/streams/RecordParser.java
@@ -1,8 +1,12 @@
 package hu.dpc.phee.operator.streams;
 
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
 import com.jayway.jsonpath.DocumentContext;
 import com.jayway.jsonpath.JsonPath;
 import hu.dpc.phee.operator.config.TransferTransformerConfig;
@@ -18,6 +22,7 @@
 import hu.dpc.phee.operator.entity.transfer.TransferRepository;
 import hu.dpc.phee.operator.entity.transfer.TransferStatus;
 import hu.dpc.phee.operator.entity.variable.Variable;
+import hu.dpc.phee.operator.importer.JsonPathReader;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.text.StringEscapeUtils;
 import org.apache.logging.log4j.util.Strings;
@@ -34,6 +39,7 @@
 import javax.xml.xpath.XPathFactory;
 import java.io.IOException;
 import java.io.StringReader;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
@@ -72,6 +78,9 @@ public class RecordParser {
     @Autowired
     TransferTransformerConfig transferTransformerConfig;
 
+    @Autowired
+    ObjectMapper objectMapper;
+
     private final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
     private final XPathFactory xPathFactory = XPathFactory.newInstance();
 
@@ -99,7 +108,7 @@ public List processWorkflowInstance(DocumentContext recordDocument, Stri
         if ("TRANSFER".equalsIgnoreCase(flowType)) {
             logger.debug("Processing flow of type TRANSFER");
             Transfer transfer = inFlightTransferManager.retrieveOrCreateTransfer(bpmn, sample, "PROCESS_INSTANCE");
-            if ("EVENT".equals(recordType) && "START_EVENT".equals(bpmnElementType) && "ELEMENT_ACTIVATED".equals(intent)) {
+            if ("EVENT".equals(recordType) && "START_EVENT".equals(bpmnElementType) && "ELEMENT_ACTIVATING".equals(intent)) {
                 transfer.setStartedAt(new Date(timestamp));
                 transfer.setDirection(config.get().getDirection());
                 logger.debug("found {} constant transformers for flow start {}", constantTransformers.size(), bpmn);
@@ -180,79 +189,110 @@ public void updateRtpTransaction(TransactionRequest transactionRequest, String v
         }
     }
 
-    public List<Object> processVariable(DocumentContext recordDocument, String bpmn, Long workflowInstanceKey, Long workflowKey, Long timestamp, String flowType, DocumentContext sample)throws JsonProcessingException {
-        logger.debug("Processing variable instance");
-        String variableName = recordDocument.read("$.value.name", String.class);
-        String variableValue = recordDocument.read("$.value.value", String.class);
-        String value = variableValue.startsWith("\"") && variableValue.endsWith("\"") ? StringEscapeUtils.unescapeJson(variableValue.substring(1, variableValue.length() - 1)) : variableValue;
+    public List<Object> processVariables(JsonArray records, String bpmn, Long workflowInstanceKey, Long workflowKey, String flowType, DocumentContext sample)throws JsonMappingException {
+
+        Object object;
+
+        if ("TRANSFER".equalsIgnoreCase(flowType)) {
+            object = new Transfer();
+        } else if ("TRANSACTION-REQUEST".equalsIgnoreCase(flowType)) {
+            object = new TransactionRequest();
+        } else if ("BATCH".equalsIgnoreCase(flowType)) {
+            object = new Batch();
+        } else if ("OUTBOUND_MESSAGES".equalsIgnoreCase(flowType)) {
+            object = new OutboudMessages();
+        } else {
+            object=null;
+            logger.error("No matching flow types for the given request");
+        }
+
+        List<Object> result= new ArrayList<>();
+        for (JsonElement record : records) {
+            try {
+                DocumentContext recordDocument = JsonPathReader.parse(String.valueOf(record));
+
+                String variableName = recordDocument.read("$.value.name", String.class);
+                String variableValue = recordDocument.read("$.value.value", String.class);
+                String value = variableValue.startsWith("\"") && variableValue.endsWith("\"") ? StringEscapeUtils.unescapeJson(variableValue.substring(1, variableValue.length() - 1)) : variableValue;
+                Long timestamp = recordDocument.read("$.timestamp");
 
-        List<Object> results = List.of(
-                new Variable()
+                result.add( new Variable()
                         .withWorkflowInstanceKey(workflowInstanceKey)
                         .withName(variableName)
                         .withWorkflowKey(workflowKey)
                         .withTimestamp(timestamp)
                         .withValue(value));
 
-        if(variableName.equals("subBatchDetails")) {
-            parseSubBatchDetails(variableValue);
-        }
+                if(variableName.equals("subBatchDetails")) {
+                    parseSubBatchDetails(variableValue);
+                }
+
+                if (variableName.equals("failedTransactionFile")) {
+                    // insert the transaction into transfer table
+                    logger.debug("Name {} and value: {}");
+                    inflightBatchManager.updateTransferTableWithFailedTransaction(workflowInstanceKey, value);
+                }
+
+                if (variableName.equals("batchId")) {
+                    logger.debug("store batchid {} in tempDocStore for instance {}", strip(value), workflowInstanceKey);
+                    inflightBatchManager.storeBatchId(workflowInstanceKey, value);
+                }
+
+                logger.debug("finding transformers for bpmn: {} and variable: {}", bpmn, variableName);
+                List<TransferTransformerConfig.Transformer> matchingTransformers = transferTransformerConfig.getFlows().stream()
+                        .filter(it -> bpmn.equalsIgnoreCase(it.getName()))
+                        .flatMap(it -> it.getTransformers().stream()) // todo - can break here
+                        .filter(it -> variableName.equalsIgnoreCase(it.getVariableName()))
+                        .toList();
+
+                matchingTransformers.forEach(transformer -> applyTransformer(object, variableName, value, transformer));
+
+                if ("TRANSFER".equalsIgnoreCase(flowType)) {
+                } else if ("TRANSACTION-REQUEST".equalsIgnoreCase(flowType)) {
+                    if(variableName.equals("state")){
+                        updateRtpTransaction((TransactionRequest) object, value);
+                    }
+                } else if ("BATCH".equalsIgnoreCase(flowType)) {
+                    //if (!config.get().getName().equalsIgnoreCase("bulk_processor")) {
+                    logger.debug("Inside if condition {}", variableName);
+                    if (variableName.equals("filename")) {
+                        logger.debug("store filename {} in tempDocStore for instance {}", strip(value), workflowInstanceKey);
+                        inflightBatchManager.storeBatchFileName(workflowInstanceKey, value);
+                    }
+                    if (variableName.equals("batchId")) {
+                        logger.debug("store batchid {} in tempDocStore for instance {}", strip(value), workflowInstanceKey);
+                        inflightBatchManager.storeBatchId(workflowInstanceKey, value);
+                    }
+                    //}
+                }
 
-        if (variableName.equals("failedTransactionFile")) {
-            // insert the transaction into transfer table
-            logger.debug("Name {} and value: {}");
-            inflightBatchManager.updateTransferTableWithFailedTransaction(workflowInstanceKey, value);
-        }
+            } catch (Exception ex) {
 
-        if (variableName.equals("batchId")) {
-            logger.debug("store batchid {} in tempDocStore for instance {}", strip(value), workflowInstanceKey);
-            inflightBatchManager.storeBatchId(workflowInstanceKey, value);
+            }
         }
-
-        logger.debug("finding transformers for bpmn: {} and variable: {}", bpmn, variableName);
-        List<TransferTransformerConfig.Transformer> matchingTransformers = transferTransformerConfig.getFlows().stream()
-                .filter(it -> bpmn.equalsIgnoreCase(it.getName()))
-                .flatMap(it -> it.getTransformers().stream())
-                .filter(it -> variableName.equalsIgnoreCase(it.getVariableName()))
-                .toList();
-
-        matchTransformerForFlowType(flowType, bpmn, sample, matchingTransformers, variableName, value, workflowInstanceKey);
-
-        return results;
+        matchTransformerForFlowType(flowType, bpmn, sample, object);
+        return result;
     }
 
     @Transactional
-    private void matchTransformerForFlowType(String flowType, String bpmn, DocumentContext sample, List<TransferTransformerConfig.Transformer> matchingTransformers, String variableName, String value, Long workflowInstanceKey) {
+    private void matchTransformerForFlowType(String flowType, String bpmn, DocumentContext sample, Object object)throws JsonMappingException {
+        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
         Optional<TransferTransformerConfig.Flow> config = transferTransformerConfig.findFlow(bpmn);
         if ("TRANSFER".equalsIgnoreCase(flowType)) {
             Transfer transfer = inFlightTransferManager.retrieveOrCreateTransfer(bpmn, sample, "VARIABLE");
-            matchingTransformers.forEach(transformer -> applyTransformer(transfer, variableName, value, transformer));
+            objectMapper.updateValue(transfer, (Transfer) object);
             transferRepository.save(transfer);
         } else if ("TRANSACTION-REQUEST".equalsIgnoreCase(flowType)) {
             TransactionRequest transactionRequest = inflightTransactionRequestManager.retrieveOrCreateTransaction(bpmn, sample);
-            if(variableName.equals("state")){
-                updateRtpTransaction(transactionRequest, value);
-            }
-            matchingTransformers.forEach(transformer -> applyTransformer(transactionRequest, variableName, value, transformer));
+            objectMapper.updateValue(transactionRequest, (TransactionRequest) object);
             transactionRequestRepository.save(transactionRequest);
         } else if ("BATCH".equalsIgnoreCase(flowType)) {
             Batch batch = inflightBatchManager.retrieveOrCreateBatch(bpmn, sample);
-            matchingTransformers.forEach(transformer -> applyTransformer(batch, variableName, value, transformer));
+            objectMapper.updateValue(batch, (Batch) object);
             batchRepository.save(batch);
-            //if (!config.get().getName().equalsIgnoreCase("bulk_processor")) {
-            logger.debug("Inside if condition {}", variableName);
-            if (variableName.equals("filename")) {
-                logger.debug("store filename {} in tempDocStore for instance {}", strip(value), workflowInstanceKey);
-                inflightBatchManager.storeBatchFileName(workflowInstanceKey, value);
-            }
-            if (variableName.equals("batchId")) {
-                logger.debug("store batchid {} in tempDocStore for instance {}", strip(value), workflowInstanceKey);
-                inflightBatchManager.storeBatchId(workflowInstanceKey, value);
-            }
-            //}
         } else if ("OUTBOUND_MESSAGES".equalsIgnoreCase(flowType)) {
             OutboudMessages outboudMessages = inflightOutboundMessageManager.retrieveOrCreateOutboundMessage(bpmn, sample);
-            matchingTransformers.forEach(transformer -> applyTransformer(outboudMessages, variableName, value, transformer));
+            objectMapper.updateValue(outboudMessages, (OutboudMessages) object);
             outboundMessagesRepository.save(outboudMessages);
         } else {
             logger.error("No matching flow types for the given request");
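Note: processVariables() above accumulates all transformed variables of a window onto one in-flight object and then merges it into the persisted entity with ObjectMapper.updateValue(). A minimal, self-contained sketch of that merge semantics follows (the Entity POJO is hypothetical, standing in for Transfer / TransactionRequest / Batch / OutboudMessages): because the mapper is set to JsonInclude.Include.NON_NULL, fields the accumulator never set are omitted when it is applied, so they do not wipe values already present on the entity returned by retrieveOrCreate*().

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;

public class UpdateValueSketch {
    // Hypothetical entity, NOT a class from this repository.
    public static class Entity {
        public String status;
        public String batchId;
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);

        Entity persisted = new Entity();     // what retrieveOrCreate*() would return
        persisted.status = "IN_PROGRESS";

        Entity accumulated = new Entity();   // what the variable loop filled in
        accumulated.batchId = "b-123";       // status stays null

        objectMapper.updateValue(persisted, accumulated);

        // prints "IN_PROGRESS / b-123": the null status did not overwrite the stored one
        System.out.println(persisted.status + " / " + persisted.batchId);
    }
}

One side note: setSerializationInclusion() mutates the shared, autowired mapper, so the NON_NULL behaviour applies to every later use of that ObjectMapper, not just to this merge.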
diff --git a/src/main/java/hu/dpc/phee/operator/streams/StreamsSetup.java b/src/main/java/hu/dpc/phee/operator/streams/StreamsSetup.java
index 8e6ba7c1..f8e755cd 100644
--- a/src/main/java/hu/dpc/phee/operator/streams/StreamsSetup.java
+++ b/src/main/java/hu/dpc/phee/operator/streams/StreamsSetup.java
@@ -1,5 +1,8 @@
 package hu.dpc.phee.operator.streams;
 
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
 import com.jayway.jsonpath.DocumentContext;
 import hu.dpc.phee.operator.config.AnalyticsConfig;
 import hu.dpc.phee.operator.config.TransferTransformerConfig;
@@ -24,8 +27,7 @@
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Merger;
-import org.apache.kafka.streams.kstream.SessionWindows;
-import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -37,7 +39,6 @@
 import javax.sql.DataSource;
 import java.text.SimpleDateFormat;
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
 import java.util.List;
@@ -45,8 +46,6 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.kafka.common.serialization.Serdes.ListSerde;
-
 @Service
 public class StreamsSetup {
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -100,36 +99,43 @@ public class StreamsSetup {
     @PostConstruct
     public void setup() {
         logger.debug("## setting up kafka streams on topic `{}`, aggregating every {} seconds", kafkaTopic, aggregationWindowSeconds);
-        Aggregator<String, String, List<String>> aggregator = (key, value, aggregate) -> {
-            aggregate.add(value);
-            return aggregate;
-        };
-        Merger<String, List<String>> merger = (key, first, second) -> Stream.of(first, second)
-                .flatMap(Collection::stream)
-                .collect(Collectors.toList());
 
         streamsBuilder.stream(kafkaTopic, Consumed.with(STRING_SERDE, STRING_SERDE))
-                .groupByKey()
-                .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofSeconds(aggregationWindowSeconds), Duration.ZERO))
-                .aggregate(ArrayList::new, aggregator, merger, Materialized.with(STRING_SERDE, ListSerde(ArrayList.class, STRING_SERDE)))
+                .filter((key, value) -> !shouldFilterOut(value))
+                .groupBy((key, value) -> extractCompositeKey(value))
+                .windowedBy(TimeWindows.of(Duration.ofMillis(300)).grace(Duration.ofMillis(100)))
+                .aggregate(
+                        JsonArray::new,
+                        (key, value, aggregate) -> {
+                            aggregate.add(JsonParser.parseString(value));
+                            return aggregate;
+                        },
+                        Materialized.with(Serdes.String(), new JsonArraySerde())
+                )
                 .toStream()
-                .foreach(this::process);
+                .foreach((windowedKey, batch) -> {
+                    String compositeKey = windowedKey.key();
+                    process(compositeKey, batch);
+                });
 
         // TODO: at the end, write the entity list to Kafka rather than the DB, so it stays consistent and replayable !!
     }
 
     public void process(Object _key, Object _value) {
-        Windowed<String> key = (Windowed<String>) _key;
-        List<String> records = (List<String>) _value;
+        String key = (String) _key;
+        JsonArray records = (JsonArray) _value;
 
         if (records == null || records.size() == 0) {
             logger.warn("skipping processing, null records for key: {}", key);
             return;
         }
 
+        logger.debug(key);
+        logger.debug(String.valueOf(records.size()));
+
        String bpmn;
        String tenantName;
-        String first = records.get(0);
+        String first = String.valueOf(records.get(0));
        DocumentContext sample = JsonPathReader.parse(first);
 
        try {
@@ -154,21 +160,20 @@ public void process(Object _key, Object _value) {
             String flowType = getTypeForFlow(config);
 
             logger.debug("processing key: {}, records: {}", key, records);
+            Long workflowInstanceKey = sample.read("$.value.processInstanceKey");
+            String valueType = sample.read("$.valueType", String.class);
+            logger.debug("processing {} events", valueType);
 
             transactionTemplate.executeWithoutResult(status -> {
-                for (String record : records) {
+                for (JsonElement record : records) {
                     try {
-                        DocumentContext recordDocument = JsonPathReader.parse(record);
+                        DocumentContext recordDocument = JsonPathReader.parse(String.valueOf(record));
                         if (analyticsConfig.enableEventsTimestampsDump.equals("true")) {
                             logToTimestampsTable(recordDocument);
                         }
                         logger.debug("from kafka: {}", recordDocument.jsonString());
 
-                        String valueType = recordDocument.read("$.valueType", String.class);
-                        logger.debug("processing {} event", valueType);
-                        Long workflowKey = recordDocument.read("$.value.processDefinitionKey");
-                        Long workflowInstanceKey = recordDocument.read("$.value.processInstanceKey");
                         Long timestamp = recordDocument.read("$.timestamp");
                         String bpmnElementType = recordDocument.read("$.value.bpmnElementType");
                         String elementId = recordDocument.read("$.value.elementId");
@@ -185,7 +190,7 @@ public void process(Object _key, Object _value) {
                            }
 
                            case "VARIABLE" -> {
-                                yield recordParser.processVariable(recordDocument, bpmn, workflowInstanceKey, workflowKey, timestamp, flowType, sample);
+                                yield recordParser.processVariables(records, bpmn, workflowInstanceKey, workflowKey, flowType, sample);
                            }
 
                            case "INCIDENT" -> {
@@ -206,6 +211,10 @@ public void process(Object _key, Object _value) {
                                }
                            });
                        }
+
+                        if (valueType.equals("VARIABLE")) {
+                            break;
+                        }
                    } catch (Exception e) {
                        logger.error("failed to parse record: {}", record, e);
                    }
@@ -259,4 +268,20 @@ public void logToTimestampsTable(DocumentContext incomingRecord) {
             logger.debug(e.getMessage().toString() + " Error parsing record");
         }
     }
+
+    private String extractCompositeKey(String value) {
+        DocumentContext documentContext = JsonPathReader.parse(value);
+        String workflowInstanceKey = documentContext.read("value.processInstanceKey").toString();
+        String recordType = documentContext.read("valueType").toString();
+        return workflowInstanceKey + "|" + recordType;
+    }
+
+    private boolean shouldFilterOut(String value) {
+        DocumentContext documentContext = JsonPathReader.parse(value);
+        String valueType = documentContext.read("valueType").toString();
+        String intent = documentContext.read("$.intent", String.class);
+
+        // Add the condition to filter out specific value types
+        return "PROCESS_INSTANCE".equals(valueType) && !("START_EVENT".equals(intent) || "END_EVENT".equals(intent)); // Replace "specificValueType" with the actual value type you want to filter out
+    }
 }
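Note: with this change the grouping no longer follows the Kafka record key and the configurable session gap (aggregationWindowSeconds); records are re-keyed per process instance and record type and collected in fixed 300 ms windows with a 100 ms grace period, so one window's VARIABLE records for a workflow instance reach process() as a single JsonArray. A small, self-contained sketch of that behaviour (plain Java plus Gson only; the Zeebe-export-style payloads are invented and trimmed to the fields the key needs):

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import java.util.LinkedHashMap;
import java.util.Map;

public class CompositeKeyGroupingSketch {
    // Mirrors extractCompositeKey(): processInstanceKey + "|" + valueType
    static String compositeKey(String record) {
        JsonObject doc = JsonParser.parseString(record).getAsJsonObject();
        return doc.getAsJsonObject("value").get("processInstanceKey").getAsString()
                + "|" + doc.get("valueType").getAsString();
    }

    public static void main(String[] args) {
        String[] exported = {
            "{\"valueType\":\"VARIABLE\",\"value\":{\"processInstanceKey\":1001,\"name\":\"batchId\"}}",
            "{\"valueType\":\"VARIABLE\",\"value\":{\"processInstanceKey\":1001,\"name\":\"filename\"}}",
            "{\"valueType\":\"PROCESS_INSTANCE\",\"value\":{\"processInstanceKey\":1001}}"
        };

        // Within one 300 ms window the aggregate step builds one JsonArray per composite key.
        Map<String, JsonArray> window = new LinkedHashMap<>();
        for (String record : exported) {
            window.computeIfAbsent(compositeKey(record), k -> new JsonArray())
                  .add(JsonParser.parseString(record));
        }

        // "1001|VARIABLE" -> 2 records, "1001|PROCESS_INSTANCE" -> 1 record,
        // matching the (key, JsonArray) pairs handed to process().
        window.forEach((key, batch) -> System.out.println(key + " -> " + batch.size() + " record(s)"));
    }
}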