From 59cd9362416e6d836ad729db5ada7b8b76e11059 Mon Sep 17 00:00:00 2001 From: Sowmya Ujjappa Banakar <43202851+sowmya695@users.noreply.github.com> Date: Tue, 26 Dec 2023 12:01:47 +0530 Subject: [PATCH] MOSIP-29935 Code changes (#1800) * MOSIP-29935 Code changes Signed-off-by: Sowmya Ujjappa Banakar * MOSIP-29935 intial changes after new approach Signed-off-by: Sowmya Ujjappa Banakar * MOSIP-29935 code fix Signed-off-by: Sowmya Ujjappa Banakar * MOSIP-29935 code fix Signed-off-by: Sowmya Ujjappa Banakar * MOSIp-29935 code fix after testing Signed-off-by: Sowmya Ujjappa Banakar --------- Signed-off-by: Sowmya Ujjappa Banakar Co-authored-by: Sowmya Ujjappa Banakar --- .../util/PlatformSuccessMessages.java | 431 +++++----- .../core/status/util/StatusUtil.java | 2 + .../verticle/ReprocessorVerticle.java | 772 ++++++++++-------- .../verticle/ReprocessorVerticleTest.java | 34 + 4 files changed, 678 insertions(+), 561 deletions(-) diff --git a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/exception/util/PlatformSuccessMessages.java b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/exception/util/PlatformSuccessMessages.java index c18347781b7..51956273957 100644 --- a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/exception/util/PlatformSuccessMessages.java +++ b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/exception/util/PlatformSuccessMessages.java @@ -1,214 +1,217 @@ -package io.mosip.registration.processor.core.exception.util; - -/** - * - * @author M1048399 Horteppa - * - */ -public enum PlatformSuccessMessages { - - // Packet Receiver Success messages. - - /** The rpr pkr packet receiver. */ - PACKET_RECEIVER_VALIDATION_SUCCESS(PlatformConstants.RPR_PACKET_RECEIVER_MODULE + "001", - "Packet receiver validation success"), - - /** The rpr pkr packet receiver. */ - RPR_PKR_PACKET_RECEIVER(PlatformConstants.RPR_PACKET_RECEIVER_MODULE + "000", - "Packet received and uploaded to landing zone"), - - // securezone notification success messages - /** The rpr sez notification. */ - RPR_SEZ_SECUREZONE_NOTIFICATION(PlatformConstants.RPR_SECUREZONE_NOTIFICATION_MODULE + "000", "Securezone Notification Success"), - - RPR_PUM_PACKET_UPLOADER(PlatformConstants.RPR_PACKET_UPLOADER_MODULE + "000", "Packet uploaded to file system"), - - RPR_PUM_PACKET_ARCHIVED(PlatformConstants.RPR_PACKET_UPLOADER_MODULE + "001", "Packet successfully archived"), - - RPR_PUM_PACKET_UPLOADER_ALREADY_UPLOADED(PlatformConstants.RPR_PACKET_UPLOADER_MODULE + "002", "Packet already present in object store"), - - // Packet Validator Success messages - /** The rpr pkr packet validate. */ - RPR_PKR_PACKET_VALIDATE(PlatformConstants.RPR_PACKET_VALIDATOR_MODULE + "000", "Packet Validation Success"), - /** The reverse data sync success. */ - REVERSE_DATA_SYNC_SUCCESS(PlatformConstants.RPR_PACKET_VALIDATOR_MODULE + "017", "Reverse Data Sync Success"), - // Packet Classifier Success messages - /** The rpr pkr packet classifier. */ - RPR_PKR_PACKET_CLASSIFIER(PlatformConstants.RPR_PACKET_CLASSIFIER_MODULE + "000", "Packet Classifier Success"), - - RPR_PKR_ADDITIONAL_INFO_DELETED(PlatformConstants.RPR_PACKET_VALIDATOR_MODULE + "000", "Deleted additionalInfo from RegistrationList"), - - RPR_PKR_OPERATOR_VALIDATE(PlatformConstants.RPR_OVM_VALIDATOR_MODULE + "000", "OPERATOR Validation Success"), - - RPR_PKR_SUPERVISOR_VALIDATE(PlatformConstants.RPR_SVM_VALIDATOR_MODULE + "000", "SUPERVISOR Validation Success"), - - RPR_PKR_INTRODUCER_VALIDATE(PlatformConstants.RPR_IVM_VALIDATOR_MODULE + "000", "INTRODUCER Validation Success"), - - // CMD validator Success messages - RPR_PKR_CMD_VALIDATE(PlatformConstants.RPR_CMD_VALIDATOR_MODULE + "000", "CMD Validation Success"), - - // DEMO-De-dupe Success Messages - /** The rpr pkr demode-dupe validate. */ - RPR_PKR_DEMO_DE_DUP(PlatformConstants.RPR_DEMO_DEDUPE_MODULE + "000", "Demo-de-dupe Success"), - - RPR_PKR_DEMO_DE_DUP_POTENTIAL_DUPLICATION_FOUND(PlatformConstants.RPR_DEMO_DEDUPE_MODULE + "001", - "Potential duplicate packet found for registration id : "), - - RPR_PKR_DEMO_DE_DUP_SKIP(PlatformConstants.RPR_DEMO_DEDUPE_MODULE + "002", "Demographic Deduplication Skipped"), - - // Biometric Authentication Success Messages - RPR_PKR_BIOMETRIC_AUTHENTICATION(PlatformConstants.RPR_BIOMETRIC_AUTHENTICATION_MODULE + "000", - "Biometric Authentication Success"), - - - - // Bio-De-dupe Success messages - /** The Constant PACKET_BIODEDUPE_SUCCESS. */ - RPR_BIO_DEDUPE_SUCCESS(PlatformConstants.RPR_BIO_DEDUPE_STAGE_MODULE + "000", "Packet biodedupe successful"), - - /** The Constant PACKET_BIOMETRIC_POTENTIAL_MATCH. */ - RPR_BIO_METRIC_POTENTIAL_MATCH(PlatformConstants.RPR_BIO_DEDUPE_STAGE_MODULE + "000", - "Potential match found while processing bio dedupe"), - - RPR_BIO_LOST_PACKET_UNIQUE_MATCH_FOUND(PlatformConstants.RPR_BIO_DEDUPE_STAGE_MODULE + "001", - "Unique Match was Found for the Biometrics Received"), - - RPR_RE_PROCESS_SUCCESS(PlatformConstants.RPR_REPROCESSOR_VERTICLE + "000", "Reprocessor Success"), - - RPR_RE_PROCESS_FAILED(PlatformConstants.RPR_REPROCESSOR_VERTICLE + "002", "Reprocessor FAILED"), - - RPR_SENT_TO_REPROCESS_SUCCESS(PlatformConstants.RPR_REPROCESSOR_VERTICLE + "001", "sent to reprocess Success"), - - RPR_WORKFLOW_INTERNAL_ACTION_SUCCESS(PlatformConstants.RPR_WORKFLOW_INTERNAL_ACTION + "000", - "Workflow internal action completed successfully"), - - RPR_WORKFLOW_ACTION_SERVICE_SUCCESS(PlatformConstants.RPR_WORKFLOW_ACTION_SERVICE + "000", - "Processed the workflow action - %s"), - - RPR_WORKFLOW_ACTION_API_SUCCESS(PlatformConstants.RPR_WORKFLOW_ACTION_API + "000", - "Process the workflow action success"), - RPR_WORKFLOW_SEARCH_API_SUCCESS(PlatformConstants.RPR_WORKFLOW_SEARCH_API + "000", - "Process the workflow search success"), - RPR_WORKFLOW_ACTION_JOB_SUCCESS(PlatformConstants.RPR_WORKFLOW_ACTION_JOB + "000", "Workflow action job success"), - - RPR_EXTERNAL_STAGE_SUCCESS(PlatformConstants.RPR_EXTERNAL_STAGE + "000", "External stage Success"), - - RPR_UIN_GENERATOR_STAGE_SUCCESS(PlatformConstants.RPR_UIN_GENERATOR_STAGE + "000", "UIN Generator Success"), - - RPR_BIOMETRIC_EXTRACTION_SUCCESS(PlatformConstants.RPR_BIOMETRIC_EXTRACTION_STAGE + "000", - "biometric extraction success"), - - RPR_FINALIZATION_SUCCESS(PlatformConstants.RPR_FINALIZATION_STAGE + "000", - "Finalization success"), - - RPR_UIN_DATA_UPDATION_SUCCESS(PlatformConstants.RPR_UIN_GENERATOR_STAGE + "001", "UIN Generator Success"), - - RPR_UIN_ACTIVATED_SUCCESS(PlatformConstants.RPR_UIN_GENERATOR_STAGE + "002", "UIN Generator Success"), - - RPR_UIN_DEACTIVATION_SUCCESS(PlatformConstants.RPR_UIN_GENERATOR_STAGE + "003", "UIN Generator Success"), - - RPR_LINK_RID_FOR_LOST_PACKET_SUCCESS(PlatformConstants.RPR_UIN_GENERATOR_STAGE + "004", "UIN Generator Success"), - - RPR_QUALITY_CHECK_SUCCESS(PlatformConstants.RPR_QUALITY_CHECKER_MODULE + "000", "Quality check Success"), - - RPR_PRINT_STAGE_REQUEST_SUCCESS(PlatformConstants.RPR_PRINTING_MODULE + "000", - "Print request submitted"), - - RPR_PRINT_STAGE_SUCCESS(PlatformConstants.RPR_PRINTING_MODULE + "001", "Print and Post Completed"), - - RPR_MESSAGE_SENDER_STAGE_SUCCESS(PlatformConstants.RPR_MESSAGE_SENDER_TEMPLATE + "001", - "Message Sender Stage success"), - - RPR_ABIS_HANDLER_STAGE_SUCCESS(PlatformConstants.RPR_ABIS_HANDLER + "000", "ABIS hanlder stage success"), - - RPR_ABIS_MIDDLEWARE_STAGE_SUCCESS(PlatformConstants.RPR_ABIS_MIDDLEWARE + "000", - "Abis insertRequests sucessfully sent to Queue"), - - RPR_MANUAL_VERIFICATION_APPROVED(PlatformConstants.RPR_MANUAL_ADJUDICATION_MODULE + "000", - "Manual verification approved"), - - RPR_MANUAL_VERIFICATION_RESEND(PlatformConstants.RPR_MANUAL_ADJUDICATION_MODULE + "002", - "Manual verification resend"), - - RPR_MANUAL_VERIFICATION_SENT(PlatformConstants.RPR_MANUAL_ADJUDICATION_MODULE + "001", - "Manual verification Sent to queue"), - - RPR_VERIFICATION_SENT(PlatformConstants.RPR_VERIFICATION_MODULE + "000", - "Sent for Verification"), - - RPR_VERIFICATION_SUCCESS(PlatformConstants.RPR_VERIFICATION_MODULE + "001", - "Verification successful"), - - RPR_DECRYPTION_SUCCESS(PlatformConstants.RPR_PACKET_DECRYPTION_MODULE + "000", "Decryption success"), - - RPR_ENCRYPTION_SUCCESS(PlatformConstants.RPR_PACKET_DECRYPTION_MODULE + "000", "Encryption success"), - - RPR_PRINT_SERVICE_SUCCESS(PlatformConstants.RPR_PRINTING_MODULE + "002", "Pdf generated and sent to print stage"), - - RPR_SYNC_REGISTRATION_SERVICE_SUCCESS(PlatformConstants.RPR_REGISTRATION_STATUS_MODULE + "000", "SYNC successfull"), - - RPR_REQUEST_HANDLER_LOST_PACKET_SUCCESS(PlatformConstants.RPR_PACKET_REQUEST_HANDLER_MODULE + "000", - "Lost packet id value fetched successfully"), - - PACKET_MARK_AS_PAUSED(PlatformConstants.RPR_CAMEL_BRIDGE_MODULE + "000", - "Packet paused because of pause settings match"), - PACKET_COMPLETE_AS_PROCESSED(PlatformConstants.RPR_CAMEL_BRIDGE_MODULE + "001", - "Packet processing completed with processed status"), - PACKET_COMPLETE_AS_REJECTED(PlatformConstants.RPR_CAMEL_BRIDGE_MODULE + "002", - "Packet processing completed with reject status"), - PACKET_COMPLETE_AS_FAILED(PlatformConstants.RPR_CAMEL_BRIDGE_MODULE + "003", - "Packet processing completed with failed status"), - PACKET_MARK_AS_REPROCESS(PlatformConstants.RPR_CAMEL_BRIDGE_MODULE + "004", "Packet marked for reprocessing"), - PACKET_PROCESSING_COMPLETED(PlatformConstants.RPR_CAMEL_BRIDGE_MODULE + "009", - "Packet processing completed with action code : "), - - PAUSE_AND_REQUEST_ADDITIONAL_INFO(PlatformConstants.RPR_CAMEL_BRIDGE_MODULE + "005", - "packet paused and request additional info"), - - PACKET_RESTART_PARENT_FLOW(PlatformConstants.RPR_CAMEL_BRIDGE_MODULE + "006", - "Packet parent flow restart initiated"), - - PACKET_COMPLETE_AS_REJECTED_WITHOUT_PARENT_FLOW(PlatformConstants.RPR_CAMEL_BRIDGE_MODULE + "007", - "Packet processing completed with reject status without Parent flow"), - - PACKET_ANONYMOUS_PROFILE(PlatformConstants.RPR_CAMEL_BRIDGE_MODULE + "008", - "Packet anonymous profile flow initiated"); - - /** The success message. */ - private final String successMessage; - - /** The success code. */ - private final String successCode; - - /** - * Instantiates a new platform success messages. - * - * @param errorCode - * the error code - * @param errorMsg - * the error msg - */ - private PlatformSuccessMessages(String errorCode, String errorMsg) { - this.successCode = errorCode; - this.successMessage = errorMsg; - } - - /** - * Gets the message. - * - * @return the message - */ - public String getMessage() { - return this.successMessage; - } - - /** - * Gets the code. - * - * @return the code - */ - public String getCode() { - return this.successCode; - } - -} +package io.mosip.registration.processor.core.exception.util; + +/** + * + * @author M1048399 Horteppa + * + */ +public enum PlatformSuccessMessages { + + // Packet Receiver Success messages. + + /** The rpr pkr packet receiver. */ + PACKET_RECEIVER_VALIDATION_SUCCESS(PlatformConstants.RPR_PACKET_RECEIVER_MODULE + "001", + "Packet receiver validation success"), + + /** The rpr pkr packet receiver. */ + RPR_PKR_PACKET_RECEIVER(PlatformConstants.RPR_PACKET_RECEIVER_MODULE + "000", + "Packet received and uploaded to landing zone"), + + // securezone notification success messages + /** The rpr sez notification. */ + RPR_SEZ_SECUREZONE_NOTIFICATION(PlatformConstants.RPR_SECUREZONE_NOTIFICATION_MODULE + "000", "Securezone Notification Success"), + + RPR_PUM_PACKET_UPLOADER(PlatformConstants.RPR_PACKET_UPLOADER_MODULE + "000", "Packet uploaded to file system"), + + RPR_PUM_PACKET_ARCHIVED(PlatformConstants.RPR_PACKET_UPLOADER_MODULE + "001", "Packet successfully archived"), + + RPR_PUM_PACKET_UPLOADER_ALREADY_UPLOADED(PlatformConstants.RPR_PACKET_UPLOADER_MODULE + "002", "Packet already present in object store"), + + // Packet Validator Success messages + /** The rpr pkr packet validate. */ + RPR_PKR_PACKET_VALIDATE(PlatformConstants.RPR_PACKET_VALIDATOR_MODULE + "000", "Packet Validation Success"), + /** The reverse data sync success. */ + REVERSE_DATA_SYNC_SUCCESS(PlatformConstants.RPR_PACKET_VALIDATOR_MODULE + "017", "Reverse Data Sync Success"), + // Packet Classifier Success messages + /** The rpr pkr packet classifier. */ + RPR_PKR_PACKET_CLASSIFIER(PlatformConstants.RPR_PACKET_CLASSIFIER_MODULE + "000", "Packet Classifier Success"), + + RPR_PKR_ADDITIONAL_INFO_DELETED(PlatformConstants.RPR_PACKET_VALIDATOR_MODULE + "000", "Deleted additionalInfo from RegistrationList"), + + RPR_PKR_OPERATOR_VALIDATE(PlatformConstants.RPR_OVM_VALIDATOR_MODULE + "000", "OPERATOR Validation Success"), + + RPR_PKR_SUPERVISOR_VALIDATE(PlatformConstants.RPR_SVM_VALIDATOR_MODULE + "000", "SUPERVISOR Validation Success"), + + RPR_PKR_INTRODUCER_VALIDATE(PlatformConstants.RPR_IVM_VALIDATOR_MODULE + "000", "INTRODUCER Validation Success"), + + // CMD validator Success messages + RPR_PKR_CMD_VALIDATE(PlatformConstants.RPR_CMD_VALIDATOR_MODULE + "000", "CMD Validation Success"), + + // DEMO-De-dupe Success Messages + /** The rpr pkr demode-dupe validate. */ + RPR_PKR_DEMO_DE_DUP(PlatformConstants.RPR_DEMO_DEDUPE_MODULE + "000", "Demo-de-dupe Success"), + + RPR_PKR_DEMO_DE_DUP_POTENTIAL_DUPLICATION_FOUND(PlatformConstants.RPR_DEMO_DEDUPE_MODULE + "001", + "Potential duplicate packet found for registration id : "), + + RPR_PKR_DEMO_DE_DUP_SKIP(PlatformConstants.RPR_DEMO_DEDUPE_MODULE + "002", "Demographic Deduplication Skipped"), + + // Biometric Authentication Success Messages + RPR_PKR_BIOMETRIC_AUTHENTICATION(PlatformConstants.RPR_BIOMETRIC_AUTHENTICATION_MODULE + "000", + "Biometric Authentication Success"), + + + + // Bio-De-dupe Success messages + /** The Constant PACKET_BIODEDUPE_SUCCESS. */ + RPR_BIO_DEDUPE_SUCCESS(PlatformConstants.RPR_BIO_DEDUPE_STAGE_MODULE + "000", "Packet biodedupe successful"), + + /** The Constant PACKET_BIOMETRIC_POTENTIAL_MATCH. */ + RPR_BIO_METRIC_POTENTIAL_MATCH(PlatformConstants.RPR_BIO_DEDUPE_STAGE_MODULE + "000", + "Potential match found while processing bio dedupe"), + + RPR_BIO_LOST_PACKET_UNIQUE_MATCH_FOUND(PlatformConstants.RPR_BIO_DEDUPE_STAGE_MODULE + "001", + "Unique Match was Found for the Biometrics Received"), + + RPR_RE_PROCESS_SUCCESS(PlatformConstants.RPR_REPROCESSOR_VERTICLE + "000", "Reprocessor Success"), + + RPR_RE_PROCESS_FAILED(PlatformConstants.RPR_REPROCESSOR_VERTICLE + "002", "Reprocessor FAILED"), + + RPR_SENT_TO_REPROCESS_SUCCESS(PlatformConstants.RPR_REPROCESSOR_VERTICLE + "001", "sent to reprocess Success"), + + RPR_SENT_TO_REPROCESS_RESTART_FROM_STAGE_SUCCESS(PlatformConstants.RPR_REPROCESSOR_VERTICLE + "002", + "sent to reprocess restart from stage Success"), + + RPR_WORKFLOW_INTERNAL_ACTION_SUCCESS(PlatformConstants.RPR_WORKFLOW_INTERNAL_ACTION + "000", + "Workflow internal action completed successfully"), + + RPR_WORKFLOW_ACTION_SERVICE_SUCCESS(PlatformConstants.RPR_WORKFLOW_ACTION_SERVICE + "000", + "Processed the workflow action - %s"), + + RPR_WORKFLOW_ACTION_API_SUCCESS(PlatformConstants.RPR_WORKFLOW_ACTION_API + "000", + "Process the workflow action success"), + RPR_WORKFLOW_SEARCH_API_SUCCESS(PlatformConstants.RPR_WORKFLOW_SEARCH_API + "000", + "Process the workflow search success"), + RPR_WORKFLOW_ACTION_JOB_SUCCESS(PlatformConstants.RPR_WORKFLOW_ACTION_JOB + "000", "Workflow action job success"), + + RPR_EXTERNAL_STAGE_SUCCESS(PlatformConstants.RPR_EXTERNAL_STAGE + "000", "External stage Success"), + + RPR_UIN_GENERATOR_STAGE_SUCCESS(PlatformConstants.RPR_UIN_GENERATOR_STAGE + "000", "UIN Generator Success"), + + RPR_BIOMETRIC_EXTRACTION_SUCCESS(PlatformConstants.RPR_BIOMETRIC_EXTRACTION_STAGE + "000", + "biometric extraction success"), + + RPR_FINALIZATION_SUCCESS(PlatformConstants.RPR_FINALIZATION_STAGE + "000", + "Finalization success"), + + RPR_UIN_DATA_UPDATION_SUCCESS(PlatformConstants.RPR_UIN_GENERATOR_STAGE + "001", "UIN Generator Success"), + + RPR_UIN_ACTIVATED_SUCCESS(PlatformConstants.RPR_UIN_GENERATOR_STAGE + "002", "UIN Generator Success"), + + RPR_UIN_DEACTIVATION_SUCCESS(PlatformConstants.RPR_UIN_GENERATOR_STAGE + "003", "UIN Generator Success"), + + RPR_LINK_RID_FOR_LOST_PACKET_SUCCESS(PlatformConstants.RPR_UIN_GENERATOR_STAGE + "004", "UIN Generator Success"), + + RPR_QUALITY_CHECK_SUCCESS(PlatformConstants.RPR_QUALITY_CHECKER_MODULE + "000", "Quality check Success"), + + RPR_PRINT_STAGE_REQUEST_SUCCESS(PlatformConstants.RPR_PRINTING_MODULE + "000", + "Print request submitted"), + + RPR_PRINT_STAGE_SUCCESS(PlatformConstants.RPR_PRINTING_MODULE + "001", "Print and Post Completed"), + + RPR_MESSAGE_SENDER_STAGE_SUCCESS(PlatformConstants.RPR_MESSAGE_SENDER_TEMPLATE + "001", + "Message Sender Stage success"), + + RPR_ABIS_HANDLER_STAGE_SUCCESS(PlatformConstants.RPR_ABIS_HANDLER + "000", "ABIS hanlder stage success"), + + RPR_ABIS_MIDDLEWARE_STAGE_SUCCESS(PlatformConstants.RPR_ABIS_MIDDLEWARE + "000", + "Abis insertRequests sucessfully sent to Queue"), + + RPR_MANUAL_VERIFICATION_APPROVED(PlatformConstants.RPR_MANUAL_ADJUDICATION_MODULE + "000", + "Manual verification approved"), + + RPR_MANUAL_VERIFICATION_RESEND(PlatformConstants.RPR_MANUAL_ADJUDICATION_MODULE + "002", + "Manual verification resend"), + + RPR_MANUAL_VERIFICATION_SENT(PlatformConstants.RPR_MANUAL_ADJUDICATION_MODULE + "001", + "Manual verification Sent to queue"), + + RPR_VERIFICATION_SENT(PlatformConstants.RPR_VERIFICATION_MODULE + "000", + "Sent for Verification"), + + RPR_VERIFICATION_SUCCESS(PlatformConstants.RPR_VERIFICATION_MODULE + "001", + "Verification successful"), + + RPR_DECRYPTION_SUCCESS(PlatformConstants.RPR_PACKET_DECRYPTION_MODULE + "000", "Decryption success"), + + RPR_ENCRYPTION_SUCCESS(PlatformConstants.RPR_PACKET_DECRYPTION_MODULE + "000", "Encryption success"), + + RPR_PRINT_SERVICE_SUCCESS(PlatformConstants.RPR_PRINTING_MODULE + "002", "Pdf generated and sent to print stage"), + + RPR_SYNC_REGISTRATION_SERVICE_SUCCESS(PlatformConstants.RPR_REGISTRATION_STATUS_MODULE + "000", "SYNC successfull"), + + RPR_REQUEST_HANDLER_LOST_PACKET_SUCCESS(PlatformConstants.RPR_PACKET_REQUEST_HANDLER_MODULE + "000", + "Lost packet id value fetched successfully"), + + PACKET_MARK_AS_PAUSED(PlatformConstants.RPR_CAMEL_BRIDGE_MODULE + "000", + "Packet paused because of pause settings match"), + PACKET_COMPLETE_AS_PROCESSED(PlatformConstants.RPR_CAMEL_BRIDGE_MODULE + "001", + "Packet processing completed with processed status"), + PACKET_COMPLETE_AS_REJECTED(PlatformConstants.RPR_CAMEL_BRIDGE_MODULE + "002", + "Packet processing completed with reject status"), + PACKET_COMPLETE_AS_FAILED(PlatformConstants.RPR_CAMEL_BRIDGE_MODULE + "003", + "Packet processing completed with failed status"), + PACKET_MARK_AS_REPROCESS(PlatformConstants.RPR_CAMEL_BRIDGE_MODULE + "004", "Packet marked for reprocessing"), + PACKET_PROCESSING_COMPLETED(PlatformConstants.RPR_CAMEL_BRIDGE_MODULE + "009", + "Packet processing completed with action code : "), + + PAUSE_AND_REQUEST_ADDITIONAL_INFO(PlatformConstants.RPR_CAMEL_BRIDGE_MODULE + "005", + "packet paused and request additional info"), + + PACKET_RESTART_PARENT_FLOW(PlatformConstants.RPR_CAMEL_BRIDGE_MODULE + "006", + "Packet parent flow restart initiated"), + + PACKET_COMPLETE_AS_REJECTED_WITHOUT_PARENT_FLOW(PlatformConstants.RPR_CAMEL_BRIDGE_MODULE + "007", + "Packet processing completed with reject status without Parent flow"), + + PACKET_ANONYMOUS_PROFILE(PlatformConstants.RPR_CAMEL_BRIDGE_MODULE + "008", + "Packet anonymous profile flow initiated"); + + /** The success message. */ + private final String successMessage; + + /** The success code. */ + private final String successCode; + + /** + * Instantiates a new platform success messages. + * + * @param errorCode + * the error code + * @param errorMsg + * the error msg + */ + private PlatformSuccessMessages(String errorCode, String errorMsg) { + this.successCode = errorCode; + this.successMessage = errorMsg; + } + + /** + * Gets the message. + * + * @return the message + */ + public String getMessage() { + return this.successMessage; + } + + /** + * Gets the code. + * + * @return the code + */ + public String getCode() { + return this.successCode; + } + +} diff --git a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/status/util/StatusUtil.java b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/status/util/StatusUtil.java index 8b860e167c3..7d3910b450f 100644 --- a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/status/util/StatusUtil.java +++ b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/status/util/StatusUtil.java @@ -314,6 +314,8 @@ public enum StatusUtil { RE_PROCESS_FAILED(StatusConstants.RE_PROCESS_MODULE_FAILED + "001", "Reprocess count has exceeded the configured attempts"), RE_PROCESS_COMPLETED(StatusConstants.RE_PROCESS_MODULE_SUCCESS + "001", "Reprocess Completed"), + RE_PROCESS_RESTART_FROM_STAGE(StatusConstants.RE_PROCESS_MODULE_SUCCESS + "002", + "Reprocess restart from stage Completed"), // Message sender stage NOTIFICATION_SUCESSFUL(StatusConstants.MESSAGE_SENDER_NOTIF_SUCCESS_CODE + "001", "Notification Sent Successfully"), diff --git a/registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java b/registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java index 8233e09837d..bebd730a69a 100644 --- a/registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java +++ b/registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java @@ -1,347 +1,425 @@ -package io.mosip.registration.processor.reprocessor.verticle; - -import java.util.ArrayList; -import java.util.List; - -import io.mosip.registration.processor.reprocessor.constants.ReprocessorConstants; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.core.env.Environment; -import org.springframework.stereotype.Component; -import org.springframework.util.CollectionUtils; - -import io.mosip.kernel.core.logger.spi.Logger; -import io.mosip.registration.processor.core.abstractverticle.MessageBusAddress; -import io.mosip.registration.processor.core.abstractverticle.MessageDTO; -import io.mosip.registration.processor.core.abstractverticle.MosipEventBus; -import io.mosip.registration.processor.core.abstractverticle.MosipRouter; -import io.mosip.registration.processor.core.abstractverticle.MosipVerticleAPIManager; -import io.mosip.registration.processor.core.code.EventId; -import io.mosip.registration.processor.core.code.EventName; -import io.mosip.registration.processor.core.code.EventType; -import io.mosip.registration.processor.core.code.ModuleName; -import io.mosip.registration.processor.core.code.RegistrationTransactionStatusCode; -import io.mosip.registration.processor.core.code.RegistrationTransactionTypeCode; -import io.mosip.registration.processor.core.constant.LoggerFileConstant; -import io.mosip.registration.processor.core.exception.util.PlatformErrorMessages; -import io.mosip.registration.processor.core.exception.util.PlatformSuccessMessages; -import io.mosip.registration.processor.core.logger.LogDescription; -import io.mosip.registration.processor.core.logger.RegProcessorLogger; -import io.mosip.registration.processor.core.status.util.StatusUtil; -import io.mosip.registration.processor.core.util.MessageBusUtil; -import io.mosip.registration.processor.rest.client.audit.builder.AuditLogRequestBuilder; -import io.mosip.registration.processor.status.code.RegistrationStatusCode; -import io.mosip.registration.processor.status.dto.InternalRegistrationStatusDto; -import io.mosip.registration.processor.status.dto.RegistrationStatusDto; -import io.mosip.registration.processor.status.exception.TablenotAccessibleException; -import io.mosip.registration.processor.status.service.RegistrationStatusService; -import io.vertx.core.AsyncResult; -import io.vertx.core.Vertx; -import io.vertx.core.eventbus.EventBus; -import io.vertx.core.json.JsonObject; - -/** - * The Reprocessor Verticle to deploy the scheduler and implement re-processing - * logic - * - * @author Alok Ranjan - * @author Sowmya - * @author Pranav Kumar - * - * @since 0.10.0 - * - */ -@Component -public class ReprocessorVerticle extends MosipVerticleAPIManager { - - private static final String VERTICLE_PROPERTY_PREFIX = "mosip.regproc.reprocessor."; - - private static Logger regProcLogger = RegProcessorLogger.getLogger(ReprocessorVerticle.class); - - /** The cluster manager url. */ - @Value("${vertx.cluster.configuration}") - private String clusterManagerUrl; - - /** The environment. */ - @Autowired - Environment environment; - - /** The mosip event bus. */ - MosipEventBus mosipEventBus = null; - - /** The fetch size. */ - @Value("${registration.processor.reprocess.fetchsize}") - private Integer fetchSize; - - /** The elapse time. */ - @Value("${registration.processor.reprocess.elapse.time}") - private long elapseTime; - - /** The reprocess count. */ - @Value("${registration.processor.reprocess.attempt.count}") - private Integer reprocessCount; - - /** Comman seperated stage names that should be excluded while reprocessing. */ - @Value("#{T(java.util.Arrays).asList('${mosip.registration.processor.reprocessor.exclude-stage-names:PacketReceiverStage}')}") - private List reprocessExcludeStageNames; - - /** The is transaction successful. */ - boolean isTransactionSuccessful; - - /** The registration status service. */ - @Autowired - RegistrationStatusService registrationStatusService; - - /** The core audit request builder. */ - @Autowired - AuditLogRequestBuilder auditLogRequestBuilder; - - /** Mosip router for APIs */ - @Autowired - MosipRouter router; - - /** The port. */ - @Value("${server.port}") - private String port; - - /** - * Deploy verticle. - */ - public void deployVerticle() { - mosipEventBus = this.getEventBus(this, clusterManagerUrl); - deployScheduler(getVertx()); - - } - - /** - * This method deploys the chime scheduler - * - * @param vertx - * the vertx - */ - private void deployScheduler(Vertx vertx) { - vertx.deployVerticle(ReprocessorConstants.CEYLON_SCHEDULER, this::schedulerResult); - } - - public void schedulerResult(AsyncResult res) { - if (res.succeeded()) { - regProcLogger.info("ReprocessorVerticle::schedular()::deployed"); - cronScheduling(vertx); - } else { - regProcLogger.error("ReprocessorVerticle::schedular()::deployment failure " + res.cause().getMessage()); - } - } - - /** - * This method does the cron scheduling by fetchin cron expression from config - * server - * - * @param vertx - * the vertx - */ - private void cronScheduling(Vertx vertx) { - - EventBus eventBus = vertx.eventBus(); - // listen the timer events - eventBus.consumer((ReprocessorConstants.TIMER_EVENT), message -> { - - process(new MessageDTO()); - }); - - // description of timers - JsonObject timer = (new JsonObject()) - .put(ReprocessorConstants.TYPE, environment.getProperty(ReprocessorConstants.TYPE_VALUE)) - .put(ReprocessorConstants.SECONDS, environment.getProperty(ReprocessorConstants.SECONDS_VALUE)) - .put(ReprocessorConstants.MINUTES, environment.getProperty(ReprocessorConstants.MINUTES_VALUE)) - .put(ReprocessorConstants.HOURS, environment.getProperty(ReprocessorConstants.HOURS_VALUE)) - .put(ReprocessorConstants.DAY_OF_MONTH, - environment.getProperty(ReprocessorConstants.DAY_OF_MONTH_VALUE)) - .put(ReprocessorConstants.MONTHS, environment.getProperty(ReprocessorConstants.MONTHS_VALUE)) - .put(ReprocessorConstants.DAYS_OF_WEEK, - environment.getProperty(ReprocessorConstants.DAYS_OF_WEEK_VALUE)); - - // create scheduler - eventBus.send(ReprocessorConstants.CHIME, - (new JsonObject()).put(ReprocessorConstants.OPERATION, ReprocessorConstants.OPERATION_VALUE) - .put(ReprocessorConstants.NAME, ReprocessorConstants.NAME_VALUE) - .put(ReprocessorConstants.DESCRIPTION, timer), - ar -> { - if (ar.succeeded()) { - regProcLogger.info(LoggerFileConstant.SESSIONID.toString(), - LoggerFileConstant.REGISTRATIONID.toString(), "", - "ReprocessorVerticle::schedular()::started"); - } else { - regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), - LoggerFileConstant.REGISTRATIONID.toString(), "", - "ReprocessorVerticle::schedular()::failed " + ar.cause()); - vertx.close(); - } - }); - - } - - /** - * Send message. - * - * @param message - * the message - * @param toAddress - * the to address - */ - public void sendMessage(MessageDTO message, MessageBusAddress toAddress) { - this.send(this.mosipEventBus, toAddress, message); - } - - @Override - public void start() { - router.setRoute(this.postUrl(getVertx(), null, null)); - this.createServer(router.getRouter(), Integer.parseInt(port)); - } - - /* - * (non-Javadoc) - * - * @see - * io.mosip.registration.processor.core.spi.eventbus.EventBusManager#process( - * java.lang.Object) - */ - @Override - public MessageDTO process(MessageDTO object) { - List reprocessorDtoList = null; - LogDescription description = new LogDescription(); - List statusList = new ArrayList<>(); - statusList.add(RegistrationTransactionStatusCode.SUCCESS.toString()); - statusList.add(RegistrationTransactionStatusCode.REPROCESS.toString()); - statusList.add(RegistrationTransactionStatusCode.IN_PROGRESS.toString()); - regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), "", - "ReprocessorVerticle::process()::entry"); - StringBuffer ridSb=new StringBuffer(); - try { - reprocessorDtoList = registrationStatusService.getResumablePackets(fetchSize); - if (!CollectionUtils.isEmpty(reprocessorDtoList)) { - if (reprocessorDtoList.size() < fetchSize) { - List reprocessorPacketList = registrationStatusService.getUnProcessedPackets(fetchSize - reprocessorDtoList.size(), elapseTime, - reprocessCount, statusList, reprocessExcludeStageNames); - if (!CollectionUtils.isEmpty(reprocessorPacketList)) { - reprocessorDtoList.addAll(reprocessorPacketList); - } - } - } else { - reprocessorDtoList = registrationStatusService.getUnProcessedPackets(fetchSize, elapseTime, - reprocessCount, statusList, reprocessExcludeStageNames); - } - - - if (!CollectionUtils.isEmpty(reprocessorDtoList)) { - reprocessorDtoList.forEach(dto -> { - String registrationId = dto.getRegistrationId(); - ridSb.append(registrationId); - ridSb.append(","); - MessageDTO messageDTO = new MessageDTO(); - messageDTO.setRid(registrationId); - messageDTO.setReg_type(dto.getRegistrationType()); - messageDTO.setSource(dto.getSource()); - messageDTO.setIteration(dto.getIteration()); - messageDTO.setWorkflowInstanceId(dto.getWorkflowInstanceId()); - if (reprocessCount.equals(dto.getReProcessRetryCount())) { - dto.setLatestTransactionStatusCode( - RegistrationTransactionStatusCode.REPROCESS_FAILED.toString()); - dto.setLatestTransactionTypeCode( - RegistrationTransactionTypeCode.PACKET_REPROCESS.toString()); - dto.setStatusComment(StatusUtil.RE_PROCESS_FAILED.getMessage()); - dto.setStatusCode(RegistrationStatusCode.REPROCESS_FAILED.toString()); - dto.setSubStatusCode(StatusUtil.RE_PROCESS_FAILED.getCode()); - messageDTO.setIsValid(false); - description.setMessage(PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getMessage()); - description.setCode(PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getCode()); - - } else { - messageDTO.setIsValid(true); - isTransactionSuccessful = true; - String stageName = MessageBusUtil.getMessageBusAdress(dto.getRegistrationStageName()); - if (RegistrationTransactionStatusCode.SUCCESS.name() - .equalsIgnoreCase(dto.getLatestTransactionStatusCode())) { - stageName = stageName.concat(ReprocessorConstants.BUS_OUT); - } else { - stageName = stageName.concat(ReprocessorConstants.BUS_IN); - } - MessageBusAddress address = new MessageBusAddress(stageName); - sendMessage(messageDTO, address); - dto.setUpdatedBy(ReprocessorConstants.USER); - Integer reprocessRetryCount = dto.getReProcessRetryCount() != null - ? dto.getReProcessRetryCount() + 1 - : 1; - dto.setReProcessRetryCount(reprocessRetryCount); - dto.setLatestTransactionStatusCode(RegistrationTransactionStatusCode.REPROCESS.toString()); - dto.setLatestTransactionTypeCode( - RegistrationTransactionTypeCode.PACKET_REPROCESS.toString()); - dto.setStatusComment(StatusUtil.RE_PROCESS_COMPLETED.getMessage()); - dto.setSubStatusCode(StatusUtil.RE_PROCESS_COMPLETED.getCode()); - description.setMessage(PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getMessage()); - description.setCode(PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode()); - } - regProcLogger.info(LoggerFileConstant.SESSIONID.toString(), - LoggerFileConstant.REGISTRATIONID.toString(), registrationId, description.getMessage()); - - /** Module-Id can be Both Success/Error code */ - String moduleId = PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode(); - String moduleName = ModuleName.RE_PROCESSOR.toString(); - registrationStatusService.updateRegistrationStatusForWorkflowEngine(dto, moduleId, moduleName); - String eventId = EventId.RPR_402.toString(); - String eventName = EventName.UPDATE.toString(); - String eventType = EventType.BUSINESS.toString(); - - if (!isTransactionSuccessful) - auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName, - eventType, moduleId, moduleName, registrationId); - }); - - } - } catch (TablenotAccessibleException e) { - isTransactionSuccessful = false; - object.setInternalError(Boolean.TRUE); - description.setMessage(PlatformErrorMessages.RPR_RGS_REGISTRATION_TABLE_NOT_ACCESSIBLE.getMessage()); - description.setCode(PlatformErrorMessages.RPR_RGS_REGISTRATION_TABLE_NOT_ACCESSIBLE.getCode()); - regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), - description.getCode() + " -- ", - PlatformErrorMessages.RPR_RGS_REGISTRATION_TABLE_NOT_ACCESSIBLE.getMessage(), e.toString()); - - }catch (Exception ex) { - isTransactionSuccessful = false; - description.setMessage(PlatformErrorMessages.REPROCESSOR_VERTICLE_FAILED.getMessage()); - description.setCode(PlatformErrorMessages.REPROCESSOR_VERTICLE_FAILED.getCode()); - regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), - description.getCode() + " -- ", - PlatformErrorMessages.REPROCESSOR_VERTICLE_FAILED.getMessage() + ex.getMessage() - + ExceptionUtils.getStackTrace(ex)); - object.setInternalError(Boolean.TRUE); - - } finally { - regProcLogger.info(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), - null, description.getMessage()); - if (isTransactionSuccessful) - description.setMessage(PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getMessage()); - - String eventId = isTransactionSuccessful ? EventId.RPR_402.toString() : EventId.RPR_405.toString(); - String eventName = isTransactionSuccessful ? EventName.UPDATE.toString() : EventName.EXCEPTION.toString(); - String eventType = isTransactionSuccessful ? EventType.BUSINESS.toString() : EventType.SYSTEM.toString(); - - /** Module-Id can be Both Success/Error code */ - String moduleId = isTransactionSuccessful ? PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getCode() - : description.getCode(); - String moduleName = ModuleName.RE_PROCESSOR.toString(); - auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName, eventType, - moduleId, moduleName, (ridSb.toString().length()>1?ridSb.substring(0,ridSb.length()-1):"")); - } - - return object; - } - - - - @Override - protected String getPropertyPrefix() { - return VERTICLE_PROPERTY_PREFIX; - } -} +package io.mosip.registration.processor.reprocessor.verticle; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.env.Environment; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import io.mosip.kernel.core.logger.spi.Logger; +import io.mosip.registration.processor.core.abstractverticle.MessageBusAddress; +import io.mosip.registration.processor.core.abstractverticle.MessageDTO; +import io.mosip.registration.processor.core.abstractverticle.MosipEventBus; +import io.mosip.registration.processor.core.abstractverticle.MosipRouter; +import io.mosip.registration.processor.core.abstractverticle.MosipVerticleAPIManager; +import io.mosip.registration.processor.core.code.EventId; +import io.mosip.registration.processor.core.code.EventName; +import io.mosip.registration.processor.core.code.EventType; +import io.mosip.registration.processor.core.code.ModuleName; +import io.mosip.registration.processor.core.code.RegistrationTransactionStatusCode; +import io.mosip.registration.processor.core.code.RegistrationTransactionTypeCode; +import io.mosip.registration.processor.core.constant.LoggerFileConstant; +import io.mosip.registration.processor.core.exception.util.PlatformErrorMessages; +import io.mosip.registration.processor.core.exception.util.PlatformSuccessMessages; +import io.mosip.registration.processor.core.logger.LogDescription; +import io.mosip.registration.processor.core.logger.RegProcessorLogger; +import io.mosip.registration.processor.core.status.util.StatusUtil; +import io.mosip.registration.processor.core.util.MessageBusUtil; +import io.mosip.registration.processor.reprocessor.constants.ReprocessorConstants; +import io.mosip.registration.processor.rest.client.audit.builder.AuditLogRequestBuilder; +import io.mosip.registration.processor.status.code.RegistrationStatusCode; +import io.mosip.registration.processor.status.dto.InternalRegistrationStatusDto; +import io.mosip.registration.processor.status.dto.RegistrationStatusDto; +import io.mosip.registration.processor.status.exception.TablenotAccessibleException; +import io.mosip.registration.processor.status.service.RegistrationStatusService; +import io.vertx.core.AsyncResult; +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.EventBus; +import io.vertx.core.json.JsonObject; + +/** + * The Reprocessor Verticle to deploy the scheduler and implement re-processing + * logic + * + * @author Alok Ranjan + * @author Sowmya + * @author Pranav Kumar + * + * @since 0.10.0 + * + */ +@Component +public class ReprocessorVerticle extends MosipVerticleAPIManager { + + private static final String VERTICLE_PROPERTY_PREFIX = "mosip.regproc.reprocessor."; + + private static Logger regProcLogger = RegProcessorLogger.getLogger(ReprocessorVerticle.class); + + /** The cluster manager url. */ + @Value("${vertx.cluster.configuration}") + private String clusterManagerUrl; + + /** The environment. */ + @Autowired + Environment environment; + + /** The mosip event bus. */ + MosipEventBus mosipEventBus = null; + + /** The fetch size. */ + @Value("${registration.processor.reprocess.fetchsize}") + private Integer fetchSize; + + /** The elapse time. */ + @Value("${registration.processor.reprocess.elapse.time}") + private long elapseTime; + + /** The reprocess count. */ + @Value("${registration.processor.reprocess.attempt.count}") + private Integer reprocessCount; + + /** Comman seperated stage names that should be excluded while reprocessing. */ + @Value("#{T(java.util.Arrays).asList('${mosip.registration.processor.reprocessor.exclude-stage-names:PacketReceiverStage}')}") + private List reprocessExcludeStageNames; + + @Value("${registration.processor.reprocess.restart-from-stage}") + private String reprocessRestartFromStage; + + @Value("#{'${registration.processor.reprocess.restart-trigger-filter}'.split(',')}") + private List reprocessRestartTriggerFilter; + + /** The is transaction successful. */ + boolean isTransactionSuccessful; + + /** The registration status service. */ + @Autowired + RegistrationStatusService registrationStatusService; + + /** The core audit request builder. */ + @Autowired + AuditLogRequestBuilder auditLogRequestBuilder; + + /** Mosip router for APIs */ + @Autowired + MosipRouter router; + + /** The port. */ + @Value("${server.port}") + private String port; + + /** + * Deploy verticle. + */ + public void deployVerticle() { + mosipEventBus = this.getEventBus(this, clusterManagerUrl); + deployScheduler(getVertx()); + + } + + /** + * This method deploys the chime scheduler + * + * @param vertx + * the vertx + */ + private void deployScheduler(Vertx vertx) { + vertx.deployVerticle(ReprocessorConstants.CEYLON_SCHEDULER, this::schedulerResult); + } + + public void schedulerResult(AsyncResult res) { + if (res.succeeded()) { + regProcLogger.info("ReprocessorVerticle::schedular()::deployed"); + cronScheduling(vertx); + } else { + regProcLogger.error("ReprocessorVerticle::schedular()::deployment failure " + res.cause().getMessage()); + } + } + + /** + * This method does the cron scheduling by fetchin cron expression from config + * server + * + * @param vertx + * the vertx + */ + private void cronScheduling(Vertx vertx) { + + EventBus eventBus = vertx.eventBus(); + // listen the timer events + eventBus.consumer((ReprocessorConstants.TIMER_EVENT), message -> { + + process(new MessageDTO()); + }); + + // description of timers + JsonObject timer = (new JsonObject()) + .put(ReprocessorConstants.TYPE, environment.getProperty(ReprocessorConstants.TYPE_VALUE)) + .put(ReprocessorConstants.SECONDS, environment.getProperty(ReprocessorConstants.SECONDS_VALUE)) + .put(ReprocessorConstants.MINUTES, environment.getProperty(ReprocessorConstants.MINUTES_VALUE)) + .put(ReprocessorConstants.HOURS, environment.getProperty(ReprocessorConstants.HOURS_VALUE)) + .put(ReprocessorConstants.DAY_OF_MONTH, + environment.getProperty(ReprocessorConstants.DAY_OF_MONTH_VALUE)) + .put(ReprocessorConstants.MONTHS, environment.getProperty(ReprocessorConstants.MONTHS_VALUE)) + .put(ReprocessorConstants.DAYS_OF_WEEK, + environment.getProperty(ReprocessorConstants.DAYS_OF_WEEK_VALUE)); + + // create scheduler + eventBus.send(ReprocessorConstants.CHIME, + (new JsonObject()).put(ReprocessorConstants.OPERATION, ReprocessorConstants.OPERATION_VALUE) + .put(ReprocessorConstants.NAME, ReprocessorConstants.NAME_VALUE) + .put(ReprocessorConstants.DESCRIPTION, timer), + ar -> { + if (ar.succeeded()) { + regProcLogger.info(LoggerFileConstant.SESSIONID.toString(), + LoggerFileConstant.REGISTRATIONID.toString(), "", + "ReprocessorVerticle::schedular()::started"); + } else { + regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), + LoggerFileConstant.REGISTRATIONID.toString(), "", + "ReprocessorVerticle::schedular()::failed " + ar.cause()); + vertx.close(); + } + }); + + } + + /** + * Send message. + * + * @param message + * the message + * @param toAddress + * the to address + */ + public void sendMessage(MessageDTO message, MessageBusAddress toAddress) { + this.send(this.mosipEventBus, toAddress, message); + } + + @Override + public void start() { + router.setRoute(this.postUrl(getVertx(), null, null)); + this.createServer(router.getRouter(), Integer.parseInt(port)); + } + + /* + * (non-Javadoc) + * + * @see + * io.mosip.registration.processor.core.spi.eventbus.EventBusManager#process( + * java.lang.Object) + */ + @Override + public MessageDTO process(MessageDTO object) { + List reprocessorDtoList = null; + LogDescription description = new LogDescription(); + List statusList = new ArrayList<>(); + statusList.add(RegistrationTransactionStatusCode.SUCCESS.toString()); + statusList.add(RegistrationTransactionStatusCode.REPROCESS.toString()); + statusList.add(RegistrationTransactionStatusCode.IN_PROGRESS.toString()); + regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), "", + "ReprocessorVerticle::process()::entry"); + StringBuffer ridSb=new StringBuffer(); + try { + Map> reprocessRestartTriggerMap = intializeReprocessRestartTriggerMapping(); + reprocessorDtoList = registrationStatusService.getResumablePackets(fetchSize); + if (!CollectionUtils.isEmpty(reprocessorDtoList)) { + if (reprocessorDtoList.size() < fetchSize) { + List reprocessorPacketList = registrationStatusService.getUnProcessedPackets(fetchSize - reprocessorDtoList.size(), elapseTime, + reprocessCount, statusList, reprocessExcludeStageNames); + if (!CollectionUtils.isEmpty(reprocessorPacketList)) { + reprocessorDtoList.addAll(reprocessorPacketList); + } + } + } else { + reprocessorDtoList = registrationStatusService.getUnProcessedPackets(fetchSize, elapseTime, + reprocessCount, statusList, reprocessExcludeStageNames); + } + + + if (!CollectionUtils.isEmpty(reprocessorDtoList)) { + reprocessorDtoList.forEach(dto -> { + String registrationId = dto.getRegistrationId(); + ridSb.append(registrationId); + ridSb.append(","); + MessageDTO messageDTO = new MessageDTO(); + messageDTO.setRid(registrationId); + messageDTO.setReg_type(dto.getRegistrationType()); + messageDTO.setSource(dto.getSource()); + messageDTO.setIteration(dto.getIteration()); + messageDTO.setWorkflowInstanceId(dto.getWorkflowInstanceId()); + if (reprocessCount.equals(dto.getReProcessRetryCount())) { + dto.setLatestTransactionStatusCode( + RegistrationTransactionStatusCode.REPROCESS_FAILED.toString()); + dto.setLatestTransactionTypeCode( + RegistrationTransactionTypeCode.PACKET_REPROCESS.toString()); + dto.setStatusComment(StatusUtil.RE_PROCESS_FAILED.getMessage()); + dto.setStatusCode(RegistrationStatusCode.REPROCESS_FAILED.toString()); + dto.setSubStatusCode(StatusUtil.RE_PROCESS_FAILED.getCode()); + messageDTO.setIsValid(false); + description.setMessage(PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getMessage()); + description.setCode(PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getCode()); + + } else { + messageDTO.setIsValid(true); + isTransactionSuccessful = true; + String stageName; + if (isRestartFromStageRequired(dto, reprocessRestartTriggerMap)) { + stageName = MessageBusUtil.getMessageBusAdress(reprocessRestartFromStage); + stageName = stageName.concat(ReprocessorConstants.BUS_IN); + sendAndSetStatus(dto, messageDTO, stageName); + dto.setStatusComment(StatusUtil.RE_PROCESS_RESTART_FROM_STAGE.getMessage()); + dto.setSubStatusCode(StatusUtil.RE_PROCESS_RESTART_FROM_STAGE.getCode()); + description + .setMessage( + PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_RESTART_FROM_STAGE_SUCCESS + .getMessage()); + description.setCode( + PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_RESTART_FROM_STAGE_SUCCESS + .getCode()); + + } else { + stageName = MessageBusUtil.getMessageBusAdress(dto.getRegistrationStageName()); + if (RegistrationTransactionStatusCode.SUCCESS.name() + .equalsIgnoreCase(dto.getLatestTransactionStatusCode())) { + stageName = stageName.concat(ReprocessorConstants.BUS_OUT); + } else { + stageName = stageName.concat(ReprocessorConstants.BUS_IN); + } + sendAndSetStatus(dto, messageDTO, stageName); + dto.setStatusComment(StatusUtil.RE_PROCESS_COMPLETED.getMessage()); + dto.setSubStatusCode(StatusUtil.RE_PROCESS_COMPLETED.getCode()); + description.setMessage(PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getMessage()); + description.setCode(PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode()); + } + } + regProcLogger.info(LoggerFileConstant.SESSIONID.toString(), + LoggerFileConstant.REGISTRATIONID.toString(), registrationId, description.getMessage()); + + /** Module-Id can be Both Success/Error code */ + String moduleId = PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode(); + String moduleName = ModuleName.RE_PROCESSOR.toString(); + registrationStatusService.updateRegistrationStatusForWorkflowEngine(dto, moduleId, moduleName); + String eventId = EventId.RPR_402.toString(); + String eventName = EventName.UPDATE.toString(); + String eventType = EventType.BUSINESS.toString(); + + if (!isTransactionSuccessful) + auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName, + eventType, moduleId, moduleName, registrationId); + }); + + } + } catch (TablenotAccessibleException e) { + isTransactionSuccessful = false; + object.setInternalError(Boolean.TRUE); + description.setMessage(PlatformErrorMessages.RPR_RGS_REGISTRATION_TABLE_NOT_ACCESSIBLE.getMessage()); + description.setCode(PlatformErrorMessages.RPR_RGS_REGISTRATION_TABLE_NOT_ACCESSIBLE.getCode()); + regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), + description.getCode() + " -- ", + PlatformErrorMessages.RPR_RGS_REGISTRATION_TABLE_NOT_ACCESSIBLE.getMessage(), e.toString()); + + }catch (Exception ex) { + isTransactionSuccessful = false; + description.setMessage(PlatformErrorMessages.REPROCESSOR_VERTICLE_FAILED.getMessage()); + description.setCode(PlatformErrorMessages.REPROCESSOR_VERTICLE_FAILED.getCode()); + regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), + description.getCode() + " -- ", + PlatformErrorMessages.REPROCESSOR_VERTICLE_FAILED.getMessage() + ex.getMessage() + + ExceptionUtils.getStackTrace(ex)); + object.setInternalError(Boolean.TRUE); + + } finally { + regProcLogger.info(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), + null, description.getMessage()); + if (isTransactionSuccessful) + description.setMessage(PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getMessage()); + + String eventId = isTransactionSuccessful ? EventId.RPR_402.toString() : EventId.RPR_405.toString(); + String eventName = isTransactionSuccessful ? EventName.UPDATE.toString() : EventName.EXCEPTION.toString(); + String eventType = isTransactionSuccessful ? EventType.BUSINESS.toString() : EventType.SYSTEM.toString(); + + /** Module-Id can be Both Success/Error code */ + String moduleId = isTransactionSuccessful ? PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getCode() + : description.getCode(); + String moduleName = ModuleName.RE_PROCESSOR.toString(); + auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName, eventType, + moduleId, moduleName, (ridSb.toString().length()>1?ridSb.substring(0,ridSb.length()-1):"")); + } + + return object; + } + + private Map> intializeReprocessRestartTriggerMapping() { + Map> reprocessRestartTriggerMap = new HashMap>(); + for (String filter : reprocessRestartTriggerFilter) { + String[] stageAndStatus = filter.split(":"); + String stageName = stageAndStatus[0]; + String latestTransactionStatusCode = stageAndStatus[1]; + Set latestTransactionStatusCodeSet; + if (reprocessRestartTriggerMap.containsKey(stageName)) { + latestTransactionStatusCodeSet = reprocessRestartTriggerMap.get(stageName); + if (latestTransactionStatusCodeSet.size() != 3) { + setReprocessRestartTriggerMap(reprocessRestartTriggerMap, stageName, latestTransactionStatusCode, + latestTransactionStatusCodeSet); + } + } else { + latestTransactionStatusCodeSet = new HashSet(); + setReprocessRestartTriggerMap(reprocessRestartTriggerMap, stageName, latestTransactionStatusCode, + latestTransactionStatusCodeSet); + } + } + return reprocessRestartTriggerMap; + + + } + + private void setReprocessRestartTriggerMap(Map> reprocessRestartTriggerMap, String stageName, + String latestTransactionStatusCode, Set latestTransactionStatusCodeSet) { + if (latestTransactionStatusCode.equalsIgnoreCase("*")) { + latestTransactionStatusCodeSet.add(RegistrationTransactionStatusCode.SUCCESS.toString()); + latestTransactionStatusCodeSet.add(RegistrationTransactionStatusCode.IN_PROGRESS.toString()); + latestTransactionStatusCodeSet.add(RegistrationTransactionStatusCode.REPROCESS.toString()); + } else { + latestTransactionStatusCodeSet.add(latestTransactionStatusCode.toUpperCase()); + } + reprocessRestartTriggerMap.put(stageName, latestTransactionStatusCodeSet); + } + + private boolean isRestartFromStageRequired(InternalRegistrationStatusDto dto, + Map> reprocessRestartTriggerMap) { + boolean isRestartFromStageRequired = false; + String stageName = dto.getRegistrationStageName(); + if (reprocessRestartTriggerMap.containsKey(stageName)) { + Set latestTransactionStatusCodes = reprocessRestartTriggerMap.get(stageName); + if (latestTransactionStatusCodes.contains(dto.getLatestTransactionStatusCode())) { + isRestartFromStageRequired = true; + } + } + return isRestartFromStageRequired; + } + + private void sendAndSetStatus(InternalRegistrationStatusDto dto, MessageDTO messageDTO, String stageName) { + MessageBusAddress address = new MessageBusAddress(stageName); + sendMessage(messageDTO, address); + dto.setUpdatedBy(ReprocessorConstants.USER); + Integer reprocessRetryCount = dto.getReProcessRetryCount() != null ? dto.getReProcessRetryCount() + 1 : 1; + dto.setReProcessRetryCount(reprocessRetryCount); + dto.setLatestTransactionStatusCode(RegistrationTransactionStatusCode.REPROCESS.toString()); + dto.setLatestTransactionTypeCode(RegistrationTransactionTypeCode.PACKET_REPROCESS.toString()); + } + + + + @Override + protected String getPropertyPrefix() { + return VERTICLE_PROPERTY_PREFIX; + } +} diff --git a/registration-processor/workflow-engine/registration-processor-reprocessor/src/test/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticleTest.java b/registration-processor/workflow-engine/registration-processor-reprocessor/src/test/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticleTest.java index cb182bf77db..b028d511454 100644 --- a/registration-processor/workflow-engine/registration-processor-reprocessor/src/test/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticleTest.java +++ b/registration-processor/workflow-engine/registration-processor-reprocessor/src/test/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticleTest.java @@ -38,6 +38,7 @@ import io.mosip.registration.processor.core.spi.restclient.RegistrationProcessorRestClientService; import io.mosip.registration.processor.rest.client.audit.builder.AuditLogRequestBuilder; import io.mosip.registration.processor.rest.client.audit.dto.AuditResponseDto; +import io.mosip.registration.processor.status.code.RegistrationStatusCode; import io.mosip.registration.processor.status.code.RegistrationType; import io.mosip.registration.processor.status.dto.InternalRegistrationStatusDto; import io.mosip.registration.processor.status.dto.RegistrationStatusDto; @@ -120,6 +121,16 @@ public void setup() throws Exception { ReflectionTestUtils.setField(reprocessorVerticle, "elapseTime", 21600); ReflectionTestUtils.setField(reprocessorVerticle, "reprocessCount", 3); ReflectionTestUtils.setField(reprocessorVerticle, "reprocessExcludeStageNames", new ArrayList<>()); + List reprocessRestartTriggerFilterList = new ArrayList<>(); + reprocessRestartTriggerFilterList.add("DemodedupStage:Success"); + reprocessRestartTriggerFilterList.add("BioDedupeStage:*"); + reprocessRestartTriggerFilterList.add("UinGeneratorStage:reprocess"); + reprocessRestartTriggerFilterList.add("BioDedupeStage:reprocess"); + + ReflectionTestUtils.setField(reprocessorVerticle, "reprocessRestartTriggerFilter", + reprocessRestartTriggerFilterList); + ReflectionTestUtils.setField(reprocessorVerticle, "reprocessRestartFromStage", + "SecurezoneNotificationStage"); Field auditLog = AuditLogRequestBuilder.class.getDeclaredField("registrationProcessorRestService"); auditLog.setAccessible(true); @SuppressWarnings("unchecked") @@ -259,4 +270,27 @@ public void testProcessValidWithResumablePackets() throws TablenotAccessibleExce reprocessorVerticle.process(dto); } + + @Test + public void testProcessWithRestartFromStage() throws TablenotAccessibleException, + PacketManagerException, + ApisResourceAccessException, WorkflowActionException { + + List dtolist = new ArrayList<>(); + InternalRegistrationStatusDto registrationStatusDto = new InternalRegistrationStatusDto(); + + registrationStatusDto.setRegistrationId("2018701130000410092018110735"); + registrationStatusDto.setRegistrationType(RegistrationType.NEW.toString()); + registrationStatusDto.setRegistrationStageName("BioDedupeStage"); + registrationStatusDto.setReProcessRetryCount(0); + registrationStatusDto.setStatusCode(RegistrationStatusCode.PROCESSING.toString()); + registrationStatusDto.setLatestTransactionStatusCode(RegistrationTransactionStatusCode.REPROCESS.toString()); + dtolist.add(registrationStatusDto); + Mockito.when( + registrationStatusService.getUnProcessedPackets(anyInt(), anyLong(), anyInt(), anyList(), anyList())) + .thenReturn(dtolist); + reprocessorVerticle.process(dto); + + } + }