Skip to content

Commit

Permalink
feat(lock): rename methods
Browse files Browse the repository at this point in the history
  • Loading branch information
CianoDanilo committed Dec 12, 2024
1 parent 712898c commit c77c4e3
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class PaymentWalletsLogEventsStream(

private fun processEvent(event: Document?): Mono<Document> {
return Mono.defer {
cdcLockService.acquireJobLock(event?.getString("_id").toString()).flatMap {
cdcLockService.acquireEventLock(event?.getString("_id").toString()).flatMap {
walletPaymentCDCEventDispatcherService.dispatchEvent(event)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ data class RedisJobLockPolicyConfig(
val ttlInMs: Long,
val waitTimeInMs: Long
) {
fun getLockNameByJob(eventId: String): String = "%s:%s:%s".format(keyspace, "lock", eventId)
fun getLockNameByEventId(eventId: String): String = "%s:%s:%s".format(keyspace, "lock", eventId)
}
4 changes: 2 additions & 2 deletions src/main/kotlin/it/pagopa/wallet/services/CdcLockService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ class CdcLockService(
) {
private val logger: Logger = LoggerFactory.getLogger(javaClass)

fun acquireJobLock(eventId: String): Mono<Unit> {
fun acquireEventLock(eventId: String): Mono<Unit> {
logger.debug("Trying to acquire lock for event: {}", eventId)
return redissonClient
.getLock(redisJobLockPolicyConfig.getLockNameByJob(eventId))
.getLock(redisJobLockPolicyConfig.getLockNameByEventId(eventId))
.tryLock(
redisJobLockPolicyConfig.waitTimeInMs,
redisJobLockPolicyConfig.ttlInMs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class PaymentWalletsLogEventsStreamTest {

given { resumePolicyService.getResumeTimestamp() }.willReturn(Instant.now())

given { cdcLockService.acquireJobLock(any()) }.willReturn(Mono.just(Unit))
given { cdcLockService.acquireEventLock(any()) }.willReturn(Mono.just(Unit))

doNothing().`when`(resumePolicyService).saveResumeTimestamp(anyOrNull())

Expand All @@ -86,7 +86,7 @@ class PaymentWalletsLogEventsStreamTest {
.expectNext(expectedDocument)
.verifyComplete()

verify(cdcLockService, Times(1)).acquireJobLock(expectedDocument.getString("_id"))
verify(cdcLockService, Times(1)).acquireEventLock(expectedDocument.getString("_id"))
}

@Test
Expand Down Expand Up @@ -115,7 +115,7 @@ class PaymentWalletsLogEventsStreamTest {
}
.willReturn(bsonDocumentFlux)

given { cdcLockService.acquireJobLock(any()) }.willReturn(Mono.just(Unit))
given { cdcLockService.acquireEventLock(any()) }.willReturn(Mono.just(Unit))

given { resumePolicyService.getResumeTimestamp() }.willReturn(Instant.now())

Expand All @@ -128,7 +128,7 @@ class PaymentWalletsLogEventsStreamTest {
.verifyComplete()

verify(walletPaymentCDCEventDispatcherService, times(3)).dispatchEvent(anyOrNull())
verify(cdcLockService, Times(3)).acquireJobLock(expectedDocument.getString("_id"))
verify(cdcLockService, Times(3)).acquireEventLock(expectedDocument.getString("_id"))
}

@Test
Expand Down Expand Up @@ -157,7 +157,7 @@ class PaymentWalletsLogEventsStreamTest {
}
.willReturn(bsonDocumentFlux)

given { cdcLockService.acquireJobLock(any()) }.willReturn(Mono.just(Unit))
given { cdcLockService.acquireEventLock(any()) }.willReturn(Mono.just(Unit))

given { resumePolicyService.getResumeTimestamp() }.willReturn(Instant.now())

Expand All @@ -171,7 +171,7 @@ class PaymentWalletsLogEventsStreamTest {
.verifyComplete()

verify(walletPaymentCDCEventDispatcherService, times(3)).dispatchEvent(anyOrNull())
verify(cdcLockService, Times(3)).acquireJobLock(expectedDocument.getString("_id"))
verify(cdcLockService, Times(3)).acquireEventLock(expectedDocument.getString("_id"))
}

@Test
Expand Down Expand Up @@ -200,7 +200,7 @@ class PaymentWalletsLogEventsStreamTest {
}
.willReturn(bsonDocumentFlux)

given { cdcLockService.acquireJobLock(any()) }.willReturn(Mono.just(Unit))
given { cdcLockService.acquireEventLock(any()) }.willReturn(Mono.just(Unit))

given { resumePolicyService.getResumeTimestamp() }.willReturn(Instant.now())

Expand All @@ -215,7 +215,7 @@ class PaymentWalletsLogEventsStreamTest {

verify(walletPaymentCDCEventDispatcherService, times(3)).dispatchEvent(anyOrNull())
verify(resumePolicyService, times(3)).saveResumeTimestamp(anyOrNull())
verify(cdcLockService, Times(3)).acquireJobLock(expectedDocument.getString("_id"))
verify(cdcLockService, Times(3)).acquireEventLock(expectedDocument.getString("_id"))
}

@Test
Expand Down Expand Up @@ -250,7 +250,7 @@ class PaymentWalletsLogEventsStreamTest {

given { resumePolicyService.getResumeTimestamp() }.willReturn(Instant.now())

given { cdcLockService.acquireJobLock(any()) }.willReturn(Mono.just(Unit))
given { cdcLockService.acquireEventLock(any()) }.willReturn(Mono.just(Unit))

doNothing().`when`(resumePolicyService).saveResumeTimestamp(anyOrNull())

Expand All @@ -265,7 +265,7 @@ class PaymentWalletsLogEventsStreamTest {

verify(walletPaymentCDCEventDispatcherService, times(3)).dispatchEvent(anyOrNull())
verify(resumePolicyService, times(3)).saveResumeTimestamp(anyOrNull())
verify(cdcLockService, Times(3)).acquireJobLock(any())
verify(cdcLockService, Times(3)).acquireEventLock(any())
}

@Test
Expand All @@ -281,7 +281,7 @@ class PaymentWalletsLogEventsStreamTest {

given { resumePolicyService.getResumeTimestamp() }.willReturn(Instant.now())

given { cdcLockService.acquireJobLock(any()) }.willReturn(Mono.just(Unit))
given { cdcLockService.acquireEventLock(any()) }.willReturn(Mono.just(Unit))

doNothing().`when`(resumePolicyService).saveResumeTimestamp(anyOrNull())

Expand All @@ -293,7 +293,7 @@ class PaymentWalletsLogEventsStreamTest {

verify(reactiveMongoTemplate, times(3))
.changeStream(anyOrNull(), anyOrNull(), eq(BsonDocument::class.java))
verify(cdcLockService, Times(0)).acquireJobLock(any())
verify(cdcLockService, Times(0)).acquireEventLock(any())
}

@Test
Expand All @@ -319,7 +319,7 @@ class PaymentWalletsLogEventsStreamTest {

given { resumePolicyService.getResumeTimestamp() }.willReturn(Instant.now())

given { cdcLockService.acquireJobLock(any()) }
given { cdcLockService.acquireEventLock(any()) }
.willThrow(LockNotAcquiredException("Test error"))

doNothing().`when`(resumePolicyService).saveResumeTimestamp(anyOrNull())
Expand All @@ -330,7 +330,7 @@ class PaymentWalletsLogEventsStreamTest {
StepVerifier.create(paymentWalletsLogEventsStream.streamPaymentWalletsLogEvents())
.verifyComplete()

verify(cdcLockService, Times(1)).acquireJobLock(expectedDocument.getString("_id"))
verify(cdcLockService, Times(1)).acquireEventLock(expectedDocument.getString("_id"))
verify(walletPaymentCDCEventDispatcherService, Times(0)).dispatchEvent(any())
verify(resumePolicyService, Times(0)).saveResumeTimestamp(any())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class CdcLockServiceTest {
given(rLockReactive.tryLock(any(), any(), any())).willReturn(mono { true })

// Test
cdcLockService.acquireJobLock(eventId).test().expectNext(Unit).verifyComplete()
cdcLockService.acquireEventLock(eventId).test().expectNext(Unit).verifyComplete()

// verifications
verify(redissonClient, times(1)).getLock("lockkeyspace:lock:$eventId")
Expand All @@ -45,7 +45,7 @@ class CdcLockServiceTest {

// Test
cdcLockService
.acquireJobLock(eventId)
.acquireEventLock(eventId)
.test()
.expectError(LockNotAcquiredException::class.java)
.verify()
Expand Down

0 comments on commit c77c4e3

Please sign in to comment.