diff --git a/src/main/kotlin/it/pagopa/wallet/cdc/PaymentWalletsLogEventsStream.kt b/src/main/kotlin/it/pagopa/wallet/cdc/PaymentWalletsLogEventsStream.kt index 22b17b1..e7a25c9 100644 --- a/src/main/kotlin/it/pagopa/wallet/cdc/PaymentWalletsLogEventsStream.kt +++ b/src/main/kotlin/it/pagopa/wallet/cdc/PaymentWalletsLogEventsStream.kt @@ -91,7 +91,7 @@ class PaymentWalletsLogEventsStream( private fun processEvent(event: Document?): Mono { return Mono.defer { - cdcLockService.acquireJobLock(event?.getString("_id").toString()).flatMap { + cdcLockService.acquireEventLock(event?.getString("_id").toString()).flatMap { walletPaymentCDCEventDispatcherService.dispatchEvent(event) } } diff --git a/src/main/kotlin/it/pagopa/wallet/config/properties/RedisJobLockPolicyConfig.kt b/src/main/kotlin/it/pagopa/wallet/config/properties/RedisJobLockPolicyConfig.kt index 55a4799..dabccc7 100644 --- a/src/main/kotlin/it/pagopa/wallet/config/properties/RedisJobLockPolicyConfig.kt +++ b/src/main/kotlin/it/pagopa/wallet/config/properties/RedisJobLockPolicyConfig.kt @@ -8,5 +8,5 @@ data class RedisJobLockPolicyConfig( val ttlInMs: Long, val waitTimeInMs: Long ) { - fun getLockNameByJob(eventId: String): String = "%s:%s:%s".format(keyspace, "lock", eventId) + fun getLockNameByEventId(eventId: String): String = "%s:%s:%s".format(keyspace, "lock", eventId) } diff --git a/src/main/kotlin/it/pagopa/wallet/services/CdcLockService.kt b/src/main/kotlin/it/pagopa/wallet/services/CdcLockService.kt index 13af596..fe5a6d7 100644 --- a/src/main/kotlin/it/pagopa/wallet/services/CdcLockService.kt +++ b/src/main/kotlin/it/pagopa/wallet/services/CdcLockService.kt @@ -17,10 +17,10 @@ class CdcLockService( ) { private val logger: Logger = LoggerFactory.getLogger(javaClass) - fun acquireJobLock(eventId: String): Mono { + fun acquireEventLock(eventId: String): Mono { logger.debug("Trying to acquire lock for event: {}", eventId) return redissonClient - .getLock(redisJobLockPolicyConfig.getLockNameByJob(eventId)) + .getLock(redisJobLockPolicyConfig.getLockNameByEventId(eventId)) .tryLock( redisJobLockPolicyConfig.waitTimeInMs, redisJobLockPolicyConfig.ttlInMs, diff --git a/src/test/kotlin/it/pagopa/wallet/cdc/PaymentWalletsLogEventsStreamTest.kt b/src/test/kotlin/it/pagopa/wallet/cdc/PaymentWalletsLogEventsStreamTest.kt index caff3ad..d5b1584 100644 --- a/src/test/kotlin/it/pagopa/wallet/cdc/PaymentWalletsLogEventsStreamTest.kt +++ b/src/test/kotlin/it/pagopa/wallet/cdc/PaymentWalletsLogEventsStreamTest.kt @@ -75,7 +75,7 @@ class PaymentWalletsLogEventsStreamTest { given { resumePolicyService.getResumeTimestamp() }.willReturn(Instant.now()) - given { cdcLockService.acquireJobLock(any()) }.willReturn(Mono.just(Unit)) + given { cdcLockService.acquireEventLock(any()) }.willReturn(Mono.just(Unit)) doNothing().`when`(resumePolicyService).saveResumeTimestamp(anyOrNull()) @@ -86,7 +86,7 @@ class PaymentWalletsLogEventsStreamTest { .expectNext(expectedDocument) .verifyComplete() - verify(cdcLockService, Times(1)).acquireJobLock(expectedDocument.getString("_id")) + verify(cdcLockService, Times(1)).acquireEventLock(expectedDocument.getString("_id")) } @Test @@ -115,7 +115,7 @@ class PaymentWalletsLogEventsStreamTest { } .willReturn(bsonDocumentFlux) - given { cdcLockService.acquireJobLock(any()) }.willReturn(Mono.just(Unit)) + given { cdcLockService.acquireEventLock(any()) }.willReturn(Mono.just(Unit)) given { resumePolicyService.getResumeTimestamp() }.willReturn(Instant.now()) @@ -128,7 +128,7 @@ class PaymentWalletsLogEventsStreamTest { .verifyComplete() verify(walletPaymentCDCEventDispatcherService, times(3)).dispatchEvent(anyOrNull()) - verify(cdcLockService, Times(3)).acquireJobLock(expectedDocument.getString("_id")) + verify(cdcLockService, Times(3)).acquireEventLock(expectedDocument.getString("_id")) } @Test @@ -157,7 +157,7 @@ class PaymentWalletsLogEventsStreamTest { } .willReturn(bsonDocumentFlux) - given { cdcLockService.acquireJobLock(any()) }.willReturn(Mono.just(Unit)) + given { cdcLockService.acquireEventLock(any()) }.willReturn(Mono.just(Unit)) given { resumePolicyService.getResumeTimestamp() }.willReturn(Instant.now()) @@ -171,7 +171,7 @@ class PaymentWalletsLogEventsStreamTest { .verifyComplete() verify(walletPaymentCDCEventDispatcherService, times(3)).dispatchEvent(anyOrNull()) - verify(cdcLockService, Times(3)).acquireJobLock(expectedDocument.getString("_id")) + verify(cdcLockService, Times(3)).acquireEventLock(expectedDocument.getString("_id")) } @Test @@ -200,7 +200,7 @@ class PaymentWalletsLogEventsStreamTest { } .willReturn(bsonDocumentFlux) - given { cdcLockService.acquireJobLock(any()) }.willReturn(Mono.just(Unit)) + given { cdcLockService.acquireEventLock(any()) }.willReturn(Mono.just(Unit)) given { resumePolicyService.getResumeTimestamp() }.willReturn(Instant.now()) @@ -215,7 +215,7 @@ class PaymentWalletsLogEventsStreamTest { verify(walletPaymentCDCEventDispatcherService, times(3)).dispatchEvent(anyOrNull()) verify(resumePolicyService, times(3)).saveResumeTimestamp(anyOrNull()) - verify(cdcLockService, Times(3)).acquireJobLock(expectedDocument.getString("_id")) + verify(cdcLockService, Times(3)).acquireEventLock(expectedDocument.getString("_id")) } @Test @@ -250,7 +250,7 @@ class PaymentWalletsLogEventsStreamTest { given { resumePolicyService.getResumeTimestamp() }.willReturn(Instant.now()) - given { cdcLockService.acquireJobLock(any()) }.willReturn(Mono.just(Unit)) + given { cdcLockService.acquireEventLock(any()) }.willReturn(Mono.just(Unit)) doNothing().`when`(resumePolicyService).saveResumeTimestamp(anyOrNull()) @@ -265,7 +265,7 @@ class PaymentWalletsLogEventsStreamTest { verify(walletPaymentCDCEventDispatcherService, times(3)).dispatchEvent(anyOrNull()) verify(resumePolicyService, times(3)).saveResumeTimestamp(anyOrNull()) - verify(cdcLockService, Times(3)).acquireJobLock(any()) + verify(cdcLockService, Times(3)).acquireEventLock(any()) } @Test @@ -281,7 +281,7 @@ class PaymentWalletsLogEventsStreamTest { given { resumePolicyService.getResumeTimestamp() }.willReturn(Instant.now()) - given { cdcLockService.acquireJobLock(any()) }.willReturn(Mono.just(Unit)) + given { cdcLockService.acquireEventLock(any()) }.willReturn(Mono.just(Unit)) doNothing().`when`(resumePolicyService).saveResumeTimestamp(anyOrNull()) @@ -293,7 +293,7 @@ class PaymentWalletsLogEventsStreamTest { verify(reactiveMongoTemplate, times(3)) .changeStream(anyOrNull(), anyOrNull(), eq(BsonDocument::class.java)) - verify(cdcLockService, Times(0)).acquireJobLock(any()) + verify(cdcLockService, Times(0)).acquireEventLock(any()) } @Test @@ -319,7 +319,7 @@ class PaymentWalletsLogEventsStreamTest { given { resumePolicyService.getResumeTimestamp() }.willReturn(Instant.now()) - given { cdcLockService.acquireJobLock(any()) } + given { cdcLockService.acquireEventLock(any()) } .willThrow(LockNotAcquiredException("Test error")) doNothing().`when`(resumePolicyService).saveResumeTimestamp(anyOrNull()) @@ -330,7 +330,7 @@ class PaymentWalletsLogEventsStreamTest { StepVerifier.create(paymentWalletsLogEventsStream.streamPaymentWalletsLogEvents()) .verifyComplete() - verify(cdcLockService, Times(1)).acquireJobLock(expectedDocument.getString("_id")) + verify(cdcLockService, Times(1)).acquireEventLock(expectedDocument.getString("_id")) verify(walletPaymentCDCEventDispatcherService, Times(0)).dispatchEvent(any()) verify(resumePolicyService, Times(0)).saveResumeTimestamp(any()) } diff --git a/src/test/kotlin/it/pagopa/wallet/services/CdcLockServiceTest.kt b/src/test/kotlin/it/pagopa/wallet/services/CdcLockServiceTest.kt index 2bc8955..93768e5 100644 --- a/src/test/kotlin/it/pagopa/wallet/services/CdcLockServiceTest.kt +++ b/src/test/kotlin/it/pagopa/wallet/services/CdcLockServiceTest.kt @@ -29,7 +29,7 @@ class CdcLockServiceTest { given(rLockReactive.tryLock(any(), any(), any())).willReturn(mono { true }) // Test - cdcLockService.acquireJobLock(eventId).test().expectNext(Unit).verifyComplete() + cdcLockService.acquireEventLock(eventId).test().expectNext(Unit).verifyComplete() // verifications verify(redissonClient, times(1)).getLock("lockkeyspace:lock:$eventId") @@ -45,7 +45,7 @@ class CdcLockServiceTest { // Test cdcLockService - .acquireJobLock(eventId) + .acquireEventLock(eventId) .test() .expectError(LockNotAcquiredException::class.java) .verify()