From 0a786c66e69dbc31bf213ef6f3697b99b8d5705d Mon Sep 17 00:00:00 2001 From: Marcus Aspin Date: Tue, 9 Jul 2024 17:49:08 +0100 Subject: [PATCH] PI-2342 Improve Telemetry for message publishing (#4006) --- .../hmpps/listener/AwsNotificationListener.kt | 31 ++--- .../digital/hmpps/publisher/QueuePublisher.kt | 8 +- .../digital/hmpps/publisher/TopicPublisher.kt | 8 +- .../telemetry/TelemetryMessagingExtensions.kt | 18 ++- .../hmpps/PublishingIntegrationTest.kt | 4 +- .../digital/hmpps/scheduling/Scheduler.kt | 20 +-- .../hmpps/service/DomainEventService.kt | 30 +++-- .../digital/hmpps/scheduling/SchedulerTest.kt | 23 +--- .../hmpps/service/DomainEventServiceTest.kt | 57 +++------ .../justice/digital/hmpps/IntegrationTest.kt | 8 +- .../delius/OffenderDeltaPoller.kt | 31 ++--- .../delius/offender/OffenderDeltaService.kt | 31 +++-- .../delius/OffenderDeltaPollerTest.kt | 117 ------------------ 13 files changed, 115 insertions(+), 271 deletions(-) delete mode 100644 projects/offender-events-and-delius/src/test/kotlin/uk/gov/justice/digital/hmpps/integrations/delius/OffenderDeltaPollerTest.kt diff --git a/libs/messaging/src/main/kotlin/uk/gov/justice/digital/hmpps/listener/AwsNotificationListener.kt b/libs/messaging/src/main/kotlin/uk/gov/justice/digital/hmpps/listener/AwsNotificationListener.kt index 80409c099e..e832cb1f34 100644 --- a/libs/messaging/src/main/kotlin/uk/gov/justice/digital/hmpps/listener/AwsNotificationListener.kt +++ b/libs/messaging/src/main/kotlin/uk/gov/justice/digital/hmpps/listener/AwsNotificationListener.kt @@ -6,12 +6,8 @@ import io.awspring.cloud.sqs.annotation.SqsListener import io.awspring.cloud.sqs.listener.AsyncAdapterBlockingExecutionFailedException import io.awspring.cloud.sqs.listener.ListenerExecutionFailedException import io.opentelemetry.api.trace.SpanKind -import io.opentelemetry.instrumentation.annotations.SpanAttribute -import io.opentelemetry.instrumentation.annotations.WithSpan import io.sentry.Sentry import io.sentry.spring.jakarta.tracing.SentryTransaction -import org.slf4j.Logger -import org.slf4j.LoggerFactory import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression import org.springframework.context.annotation.Conditional import org.springframework.dao.CannotAcquireLockException @@ -25,8 +21,8 @@ import uk.gov.justice.digital.hmpps.config.AwsCondition import uk.gov.justice.digital.hmpps.message.Notification import uk.gov.justice.digital.hmpps.messaging.NotificationHandler import uk.gov.justice.digital.hmpps.retry.retry -import uk.gov.justice.digital.hmpps.telemetry.TelemetryMessagingExtensions.extractSpanContext -import uk.gov.justice.digital.hmpps.telemetry.TelemetryMessagingExtensions.startSpan +import uk.gov.justice.digital.hmpps.telemetry.TelemetryMessagingExtensions.extractTelemetryContext +import uk.gov.justice.digital.hmpps.telemetry.TelemetryMessagingExtensions.withSpan import java.util.concurrent.CompletionException @Component @@ -36,21 +32,19 @@ class AwsNotificationListener( private val handler: NotificationHandler<*>, private val objectMapper: ObjectMapper ) { - @WithSpan(kind = SpanKind.CONSUMER) @SentryTransaction(operation = "messaging") @SqsListener("\${messaging.consumer.queue}") - fun receive(@SpanAttribute message: String) { - val attributes = objectMapper.readValue(message, jacksonTypeRef>()).attributes - val span = attributes.extractSpanContext().startSpan(this::class.java.name, "receive", SpanKind.CONSUMER) - span.makeCurrent().use { - try { - retry(3, RETRYABLE_EXCEPTIONS) { handler.handle(message) } - } catch (e: Throwable) { - Sentry.captureException(unwrapSqsExceptions(e)) - throw e + fun receive(message: String) { + objectMapper.readValue(message, jacksonTypeRef>()).attributes + .extractTelemetryContext() + .withSpan(this::class.java.simpleName, "receive", SpanKind.CONSUMER) { + try { + retry(3, RETRYABLE_EXCEPTIONS) { handler.handle(message) } + } catch (e: Throwable) { + Sentry.captureException(unwrapSqsExceptions(e)) + throw e + } } - } - span.end() } fun unwrapSqsExceptions(e: Throwable): Throwable { @@ -69,7 +63,6 @@ class AwsNotificationListener( } companion object { - private val log: Logger = LoggerFactory.getLogger(this::class.java) val RETRYABLE_EXCEPTIONS = listOf( RestClientException::class, CannotAcquireLockException::class, diff --git a/libs/messaging/src/main/kotlin/uk/gov/justice/digital/hmpps/publisher/QueuePublisher.kt b/libs/messaging/src/main/kotlin/uk/gov/justice/digital/hmpps/publisher/QueuePublisher.kt index d741f8a4f4..efd39a8022 100644 --- a/libs/messaging/src/main/kotlin/uk/gov/justice/digital/hmpps/publisher/QueuePublisher.kt +++ b/libs/messaging/src/main/kotlin/uk/gov/justice/digital/hmpps/publisher/QueuePublisher.kt @@ -3,6 +3,7 @@ package uk.gov.justice.digital.hmpps.publisher import com.fasterxml.jackson.databind.ObjectMapper import io.awspring.cloud.sqs.operations.SqsTemplate import io.opentelemetry.api.trace.SpanKind +import io.opentelemetry.instrumentation.annotations.SpanAttribute import io.opentelemetry.instrumentation.annotations.WithSpan import org.springframework.beans.factory.annotation.Value import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty @@ -12,7 +13,7 @@ import org.springframework.messaging.support.MessageBuilder import org.springframework.stereotype.Component import uk.gov.justice.digital.hmpps.config.AwsCondition import uk.gov.justice.digital.hmpps.message.Notification -import uk.gov.justice.digital.hmpps.telemetry.TelemetryMessagingExtensions.withSpanContext +import uk.gov.justice.digital.hmpps.telemetry.TelemetryMessagingExtensions.withTelemetryContext import java.util.concurrent.Semaphore @Component @@ -28,7 +29,7 @@ class QueuePublisher( private val permit = Semaphore(limit, true) @WithSpan(kind = SpanKind.PRODUCER) - override fun publish(notification: Notification<*>) { + override fun publish(@SpanAttribute notification: Notification<*>) { notification.message?.also { _ -> permit.acquire() try { @@ -41,6 +42,7 @@ class QueuePublisher( private fun Notification<*>.asMessage() = MessageBuilder.createMessage( objectMapper.writeValueAsString(Notification(objectMapper.writeValueAsString(message), attributes)), - MessageHeaders(attributes.map { it.key to it.value.value }.toMap()).withSpanContext() + MessageHeaders(attributes.map { it.key to it.value.value }.toMap()) + .withTelemetryContext() ) } diff --git a/libs/messaging/src/main/kotlin/uk/gov/justice/digital/hmpps/publisher/TopicPublisher.kt b/libs/messaging/src/main/kotlin/uk/gov/justice/digital/hmpps/publisher/TopicPublisher.kt index f5a64a8843..62c4c55f44 100644 --- a/libs/messaging/src/main/kotlin/uk/gov/justice/digital/hmpps/publisher/TopicPublisher.kt +++ b/libs/messaging/src/main/kotlin/uk/gov/justice/digital/hmpps/publisher/TopicPublisher.kt @@ -2,6 +2,7 @@ package uk.gov.justice.digital.hmpps.publisher import io.awspring.cloud.sns.core.SnsTemplate import io.opentelemetry.api.trace.SpanKind +import io.opentelemetry.instrumentation.annotations.SpanAttribute import io.opentelemetry.instrumentation.annotations.WithSpan import org.springframework.beans.factory.annotation.Value import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty @@ -12,7 +13,7 @@ import org.springframework.messaging.support.MessageBuilder import org.springframework.stereotype.Component import uk.gov.justice.digital.hmpps.config.AwsCondition import uk.gov.justice.digital.hmpps.message.Notification -import uk.gov.justice.digital.hmpps.telemetry.TelemetryMessagingExtensions.withSpanContext +import uk.gov.justice.digital.hmpps.telemetry.TelemetryMessagingExtensions.withTelemetryContext @Primary @Component @@ -23,12 +24,13 @@ class TopicPublisher( @Value("\${messaging.producer.topic}") private val topic: String ) : NotificationPublisher { @WithSpan(kind = SpanKind.PRODUCER) - override fun publish(notification: Notification<*>) { + override fun publish(@SpanAttribute notification: Notification<*>) { notification.message?.let { message -> notificationTemplate.convertAndSend(topic, message) { msg -> MessageBuilder.createMessage( msg.payload, - MessageHeaders(notification.attributes.map { it.key to it.value.value }.toMap()).withSpanContext(), + MessageHeaders(notification.attributes.map { it.key to it.value.value }.toMap()) + .withTelemetryContext(), ) } } diff --git a/libs/messaging/src/main/kotlin/uk/gov/justice/digital/hmpps/telemetry/TelemetryMessagingExtensions.kt b/libs/messaging/src/main/kotlin/uk/gov/justice/digital/hmpps/telemetry/TelemetryMessagingExtensions.kt index 3d4e7c0f34..4b9c4c2608 100644 --- a/libs/messaging/src/main/kotlin/uk/gov/justice/digital/hmpps/telemetry/TelemetryMessagingExtensions.kt +++ b/libs/messaging/src/main/kotlin/uk/gov/justice/digital/hmpps/telemetry/TelemetryMessagingExtensions.kt @@ -11,7 +11,7 @@ import uk.gov.justice.digital.hmpps.message.MessageAttributes import uk.gov.justice.digital.hmpps.message.Notification object TelemetryMessagingExtensions { - fun MessageHeaders.withSpanContext(): MessageHeaders { + fun MessageHeaders.withTelemetryContext(): MessageHeaders { val map = this.toMutableMap() val context = Context.current().with(Span.current()) GlobalOpenTelemetry.getPropagators().textMapPropagator @@ -19,7 +19,7 @@ object TelemetryMessagingExtensions { return MessageHeaders(map) } - fun MessageAttributes.extractSpanContext(): Context { + fun MessageAttributes.extractTelemetryContext(): Context { val getter = object : TextMapGetter { override fun keys(carrier: MessageAttributes) = carrier.keys override fun get(carrier: MessageAttributes?, key: String) = carrier?.get(key)?.value @@ -27,9 +27,19 @@ object TelemetryMessagingExtensions { return GlobalOpenTelemetry.getPropagators().textMapPropagator.extract(Context.current(), this, getter) } - fun Context.startSpan(scopeName: String, spanName: String, spanKind: SpanKind = SpanKind.INTERNAL): Span { + fun Context.withSpan( + scopeName: String, + spanName: String, + spanKind: SpanKind = SpanKind.INTERNAL, + block: () -> T + ): T { val tracer = GlobalOpenTelemetry.getTracer(scopeName) - return tracer.spanBuilder(spanName).setParent(this).setSpanKind(spanKind).startSpan() + val span = tracer.spanBuilder("$scopeName.$spanName").setParent(this).setSpanKind(spanKind).startSpan() + try { + return span.makeCurrent().use { block() } + } finally { + span.end() + } } fun TelemetryService.hmppsEventReceived(hmppsEvent: HmppsDomainEvent) { diff --git a/projects/domain-events-and-delius/src/integrationTest/kotlin/uk/gov/justice/digital/hmpps/PublishingIntegrationTest.kt b/projects/domain-events-and-delius/src/integrationTest/kotlin/uk/gov/justice/digital/hmpps/PublishingIntegrationTest.kt index 71eb2d4505..3797a7b85a 100644 --- a/projects/domain-events-and-delius/src/integrationTest/kotlin/uk/gov/justice/digital/hmpps/PublishingIntegrationTest.kt +++ b/projects/domain-events-and-delius/src/integrationTest/kotlin/uk/gov/justice/digital/hmpps/PublishingIntegrationTest.kt @@ -7,7 +7,7 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.mockito.kotlin.any import org.mockito.kotlin.eq -import org.mockito.kotlin.timeout +import org.mockito.kotlin.times import org.mockito.kotlin.verify import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Value @@ -69,7 +69,7 @@ internal class PublishingIntegrationTest { equalTo("http://localhost:${wireMockServer.port()}/probation-case.engagement.created/X789654") ) - verify(telemetryService, timeout(30000)).trackEvent(eq("DomainEventsProcessed"), any(), any()) + verify(telemetryService, times(3)).trackEvent(eq("DomainEventPublished"), any(), any()) assertEquals(0, domainEventRepository.count()) } } diff --git a/projects/domain-events-and-delius/src/main/kotlin/uk/gov/justice/digital/hmpps/scheduling/Scheduler.kt b/projects/domain-events-and-delius/src/main/kotlin/uk/gov/justice/digital/hmpps/scheduling/Scheduler.kt index 326be36e07..d530ee2a3b 100644 --- a/projects/domain-events-and-delius/src/main/kotlin/uk/gov/justice/digital/hmpps/scheduling/Scheduler.kt +++ b/projects/domain-events-and-delius/src/main/kotlin/uk/gov/justice/digital/hmpps/scheduling/Scheduler.kt @@ -1,25 +1,17 @@ package uk.gov.justice.digital.hmpps.scheduling -import io.sentry.Sentry import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Service +import org.springframework.transaction.annotation.Transactional import uk.gov.justice.digital.hmpps.service.DomainEventService -import uk.gov.justice.digital.hmpps.telemetry.TelemetryService @Service -class Scheduler( - private val domainEventService: DomainEventService, - private val telemetryService: TelemetryService -) { +class Scheduler(private val service: DomainEventService) { + @Transactional @Scheduled(fixedDelayString = "\${poller.fixed-delay:100}") fun poll() { - try { - val count = domainEventService.publishBatch() - if (count > 0) telemetryService.trackEvent("DomainEventsProcessed", mapOf("EventsSent" to count.toString())) - } catch (e: Exception) { - telemetryService.trackException(e) - Sentry.captureException(e) - throw e - } + service.getDeltas() + .onEach { service.notify(it) } + .also(service::deleteAll) } } diff --git a/projects/domain-events-and-delius/src/main/kotlin/uk/gov/justice/digital/hmpps/service/DomainEventService.kt b/projects/domain-events-and-delius/src/main/kotlin/uk/gov/justice/digital/hmpps/service/DomainEventService.kt index be982e9dd2..1919cdedfb 100644 --- a/projects/domain-events-and-delius/src/main/kotlin/uk/gov/justice/digital/hmpps/service/DomainEventService.kt +++ b/projects/domain-events-and-delius/src/main/kotlin/uk/gov/justice/digital/hmpps/service/DomainEventService.kt @@ -2,16 +2,18 @@ package uk.gov.justice.digital.hmpps.service import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.readValue +import io.opentelemetry.instrumentation.annotations.WithSpan import org.springframework.beans.factory.annotation.Value import org.springframework.data.domain.Pageable import org.springframework.stereotype.Service -import org.springframework.transaction.annotation.Transactional import uk.gov.justice.digital.hmpps.integrations.delius.DomainEvent import uk.gov.justice.digital.hmpps.integrations.delius.DomainEventRepository import uk.gov.justice.digital.hmpps.message.HmppsDomainEvent import uk.gov.justice.digital.hmpps.message.Notification import uk.gov.justice.digital.hmpps.publisher.NotificationPublisher import uk.gov.justice.digital.hmpps.service.enhancement.NotificationEnhancer +import uk.gov.justice.digital.hmpps.telemetry.TelemetryService +import java.time.format.DateTimeFormatter.ISO_ZONED_DATE_TIME @Service class DomainEventService( @@ -20,15 +22,25 @@ class DomainEventService( private val objectMapper: ObjectMapper, private val domainEventRepository: DomainEventRepository, private val notificationPublisher: NotificationPublisher, - private val notificationEnhancer: NotificationEnhancer + private val notificationEnhancer: NotificationEnhancer, + private val telemetryService: TelemetryService ) { - @Transactional - fun publishBatch(): Int { - val deltas = domainEventRepository.findAll(Pageable.ofSize(batchSize)).content - val notifications = deltas.map { notificationEnhancer.enhance(it.asNotification()) } - notifications.forEach { notificationPublisher.publish(it) } - domainEventRepository.deleteAllByIdInBatch(deltas.map { it.id }) - return deltas.size + fun getDeltas(): List = domainEventRepository.findAll(Pageable.ofSize(batchSize)).content + + fun deleteAll(deltas: List) = domainEventRepository.deleteAllByIdInBatch(deltas.map { it.id }) + + @WithSpan + fun notify(delta: DomainEvent) { + val notification = notificationEnhancer.enhance(delta.asNotification()) + notificationPublisher.publish(notification) + telemetryService.trackEvent( + "DomainEventPublished", + mapOf( + "crn" to notification.message.personReference.findCrn().toString(), + "eventType" to notification.eventType.toString(), + "occurredAt" to ISO_ZONED_DATE_TIME.format(notification.message.occurredAt) + ) + ) } fun DomainEvent.asNotification() = Notification( diff --git a/projects/domain-events-and-delius/src/test/kotlin/uk/gov/justice/digital/hmpps/scheduling/SchedulerTest.kt b/projects/domain-events-and-delius/src/test/kotlin/uk/gov/justice/digital/hmpps/scheduling/SchedulerTest.kt index 237adeca3b..164c7254ac 100644 --- a/projects/domain-events-and-delius/src/test/kotlin/uk/gov/justice/digital/hmpps/scheduling/SchedulerTest.kt +++ b/projects/domain-events-and-delius/src/test/kotlin/uk/gov/justice/digital/hmpps/scheduling/SchedulerTest.kt @@ -6,41 +6,20 @@ import org.junit.jupiter.api.extension.ExtendWith import org.mockito.InjectMocks import org.mockito.Mock import org.mockito.junit.jupiter.MockitoExtension -import org.mockito.kotlin.any -import org.mockito.kotlin.never -import org.mockito.kotlin.verify import org.mockito.kotlin.whenever import uk.gov.justice.digital.hmpps.service.DomainEventService -import uk.gov.justice.digital.hmpps.telemetry.TelemetryService @ExtendWith(MockitoExtension::class) class SchedulerTest { @Mock private lateinit var domainEventService: DomainEventService - @Mock - private lateinit var telemetryService: TelemetryService - @InjectMocks private lateinit var scheduler: Scheduler @Test fun `failure to publish is thrown`() { - whenever(domainEventService.publishBatch()).thenThrow(RuntimeException("event not processed")) + whenever(domainEventService.getDeltas()).thenThrow(RuntimeException("event not processed")) assertThrows { scheduler.poll() } } - - @Test - fun `poller receives no events and doesn't call telemetry`() { - whenever(domainEventService.publishBatch()).thenReturn(0) - scheduler.poll() - verify(telemetryService, never()).trackEvent(any(), any(), any()) - } - - @Test - fun `poller calls telemetry on success`() { - whenever(domainEventService.publishBatch()).thenReturn(100) - scheduler.poll() - verify(telemetryService).trackEvent("DomainEventsProcessed", mapOf("EventsSent" to "100"), mapOf()) - } } diff --git a/projects/domain-events-and-delius/src/test/kotlin/uk/gov/justice/digital/hmpps/service/DomainEventServiceTest.kt b/projects/domain-events-and-delius/src/test/kotlin/uk/gov/justice/digital/hmpps/service/DomainEventServiceTest.kt index 8ec9db32bf..0eae404ad4 100644 --- a/projects/domain-events-and-delius/src/test/kotlin/uk/gov/justice/digital/hmpps/service/DomainEventServiceTest.kt +++ b/projects/domain-events-and-delius/src/test/kotlin/uk/gov/justice/digital/hmpps/service/DomainEventServiceTest.kt @@ -2,26 +2,18 @@ package uk.gov.justice.digital.hmpps.service import com.fasterxml.jackson.core.JsonProcessingException import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import org.hamcrest.MatcherAssert.assertThat -import org.hamcrest.Matchers.equalTo import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.junit.jupiter.api.extension.ExtendWith import org.mockito.Mock import org.mockito.junit.jupiter.MockitoExtension -import org.mockito.kotlin.any -import org.mockito.kotlin.argThat -import org.mockito.kotlin.never -import org.mockito.kotlin.times -import org.mockito.kotlin.verify -import org.mockito.kotlin.whenever -import org.springframework.data.domain.PageImpl -import org.springframework.data.domain.Pageable +import org.mockito.kotlin.* import uk.gov.justice.digital.hmpps.data.generator.DomainEventGenerator import uk.gov.justice.digital.hmpps.integrations.delius.DomainEventRepository import uk.gov.justice.digital.hmpps.publisher.NotificationPublisher import uk.gov.justice.digital.hmpps.service.enhancement.NotificationEnhancer +import uk.gov.justice.digital.hmpps.telemetry.TelemetryService @ExtendWith(MockitoExtension::class) class DomainEventServiceTest { @@ -35,6 +27,9 @@ class DomainEventServiceTest { @Mock private lateinit var notificationEnhancer: NotificationEnhancer + @Mock + private lateinit var telemetryService: TelemetryService + private lateinit var service: DomainEventService @BeforeEach @@ -44,46 +39,30 @@ class DomainEventServiceTest { objectMapper = jacksonObjectMapper().findAndRegisterModules(), domainEventRepository = domainEventRepository, notificationPublisher = notificationPublisher, - notificationEnhancer = notificationEnhancer + notificationEnhancer = notificationEnhancer, + telemetryService = telemetryService, ) - whenever(notificationEnhancer.enhance(any())).thenAnswer { it.getArgument(0) } - } - - @Test - fun `messages can be read and published`() { - val entities = listOf(DomainEventGenerator.generate("registration-added")) - whenever(domainEventRepository.findAll(any())).thenReturn(PageImpl(entities)) - - val count = service.publishBatch() - - verify(notificationPublisher).publish(argThat { eventType == "probation-case.registration.added" }) - assertThat(count, equalTo(1)) } @Test fun `multiple messages can be published`() { - val entities = listOf( - DomainEventGenerator.generate("manual-ogrs"), - DomainEventGenerator.generate("registration-added") - ) - whenever(domainEventRepository.findAll(any())).thenReturn(PageImpl(entities)) - - val count = service.publishBatch() + whenever(notificationEnhancer.enhance(any())).thenAnswer { it.getArgument(0) } + service.notify(DomainEventGenerator.generate("manual-ogrs")) + service.notify(DomainEventGenerator.generate("registration-added")) verify(notificationPublisher, times(2)).publish(any()) - assertThat(count, equalTo(2)) } @Test fun `nothing is published if any entity is invalid`() { - val entities = listOf( - DomainEventGenerator.generate("manual-ogrs"), - DomainEventGenerator.generate("registration-added"), - DomainEventGenerator.generate("{\"invalid-json\"}", "{\"invalid-json\"}") - ) - whenever(domainEventRepository.findAll(any())).thenReturn(PageImpl(entities)) - - assertThrows { service.publishBatch() } + assertThrows { + service.notify( + DomainEventGenerator.generate( + "{\"invalid-json\"}", + "{\"invalid-json\"}" + ) + ) + } verify(notificationPublisher, never()).publish(any()) } diff --git a/projects/offender-events-and-delius/src/integrationTest/kotlin/uk/gov/justice/digital/hmpps/IntegrationTest.kt b/projects/offender-events-and-delius/src/integrationTest/kotlin/uk/gov/justice/digital/hmpps/IntegrationTest.kt index 88e4291a77..82caf7b12e 100644 --- a/projects/offender-events-and-delius/src/integrationTest/kotlin/uk/gov/justice/digital/hmpps/IntegrationTest.kt +++ b/projects/offender-events-and-delius/src/integrationTest/kotlin/uk/gov/justice/digital/hmpps/IntegrationTest.kt @@ -4,11 +4,7 @@ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.MethodSource import org.mockito.Mockito.timeout -import org.mockito.kotlin.after -import org.mockito.kotlin.any -import org.mockito.kotlin.eq -import org.mockito.kotlin.never -import org.mockito.kotlin.verify +import org.mockito.kotlin.* import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Value import org.springframework.boot.test.context.SpringBootTest @@ -45,7 +41,7 @@ internal class IntegrationTest { fun `offender delta test`(delta: OffenderDelta, expected: List>) { offenderDeltaRepository.save(delta) - verify(offenderDeltaService, after(250).atLeastOnce()).checkAndSendEvents() + verify(offenderDeltaService, after(250).atLeastOnce()).notify(any()) generateSequence { channelManager.getChannel(topicName).receive()?.eventType }.toList() if (expected.isNotEmpty()) { diff --git a/projects/offender-events-and-delius/src/main/kotlin/uk/gov/justice/digital/hmpps/integrations/delius/OffenderDeltaPoller.kt b/projects/offender-events-and-delius/src/main/kotlin/uk/gov/justice/digital/hmpps/integrations/delius/OffenderDeltaPoller.kt index e2976f794d..6946d5cad2 100644 --- a/projects/offender-events-and-delius/src/main/kotlin/uk/gov/justice/digital/hmpps/integrations/delius/OffenderDeltaPoller.kt +++ b/projects/offender-events-and-delius/src/main/kotlin/uk/gov/justice/digital/hmpps/integrations/delius/OffenderDeltaPoller.kt @@ -1,34 +1,17 @@ package uk.gov.justice.digital.hmpps.integrations.delius -import io.sentry.Sentry import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Service +import org.springframework.transaction.annotation.Transactional import uk.gov.justice.digital.hmpps.integrations.delius.offender.OffenderDeltaService -import uk.gov.justice.digital.hmpps.telemetry.TelemetryService -import java.time.format.DateTimeFormatter.ISO_ZONED_DATE_TIME @Service -class OffenderDeltaPoller(private val service: OffenderDeltaService, private val telemetryService: TelemetryService) { - +class OffenderDeltaPoller(private val service: OffenderDeltaService) { + @Transactional @Scheduled(fixedDelayString = "\${offender-events.fixed-delay:100}") - fun checkAndSendEvents() { - try { - service.checkAndSendEvents().forEach { - telemetryService.trackEvent( - "OffenderEventPublished", - mapOf( - "crn" to it.message.crn, - "eventType" to it.eventType!!, - "occurredAt" to ISO_ZONED_DATE_TIME.format(it.message.eventDatetime) - ) - ) - } - } catch (e: Exception) { - telemetryService.trackEvent( - "OffenderEventsProcessingFailed", - mapOf("Exception" to (e.message ?: "Unknown Exception")) - ) - Sentry.captureException(e) - } + fun poll() { + service.getDeltas() + .onEach { service.notify(it) } + .also(service::deleteAll) } } diff --git a/projects/offender-events-and-delius/src/main/kotlin/uk/gov/justice/digital/hmpps/integrations/delius/offender/OffenderDeltaService.kt b/projects/offender-events-and-delius/src/main/kotlin/uk/gov/justice/digital/hmpps/integrations/delius/offender/OffenderDeltaService.kt index 7da61837f4..b016cfdd85 100644 --- a/projects/offender-events-and-delius/src/main/kotlin/uk/gov/justice/digital/hmpps/integrations/delius/offender/OffenderDeltaService.kt +++ b/projects/offender-events-and-delius/src/main/kotlin/uk/gov/justice/digital/hmpps/integrations/delius/offender/OffenderDeltaService.kt @@ -1,12 +1,14 @@ package uk.gov.justice.digital.hmpps.integrations.delius.offender +import io.opentelemetry.instrumentation.annotations.WithSpan import org.springframework.beans.factory.annotation.Value import org.springframework.data.domain.Pageable import org.springframework.stereotype.Service -import org.springframework.transaction.annotation.Transactional import uk.gov.justice.digital.hmpps.message.MessageAttributes import uk.gov.justice.digital.hmpps.message.Notification import uk.gov.justice.digital.hmpps.publisher.NotificationPublisher +import uk.gov.justice.digital.hmpps.telemetry.TelemetryService +import java.time.format.DateTimeFormatter.ISO_ZONED_DATE_TIME import java.time.temporal.ChronoUnit @Service @@ -14,15 +16,26 @@ class OffenderDeltaService( @Value("\${offender-events.batch-size:50}") private val batchSize: Int, private val repository: OffenderDeltaRepository, - private val notificationPublisher: NotificationPublisher + private val notificationPublisher: NotificationPublisher, + private val telemetryService: TelemetryService ) { - @Transactional - fun checkAndSendEvents(): List> { - val deltas = repository.findAll(Pageable.ofSize(batchSize)).content - val events = deltas.flatMap { it.asNotifications() } - events.forEach { notificationPublisher.publish(it) } - repository.deleteAllByIdInBatch(deltas.map { it.id }) - return events + fun getDeltas(): List = repository.findAll(Pageable.ofSize(batchSize)).content + + fun deleteAll(deltas: List) = repository.deleteAllByIdInBatch(deltas.map { it.id }) + + @WithSpan + fun notify(delta: OffenderDelta) { + delta.asNotifications().forEach { + notificationPublisher.publish(it) + telemetryService.trackEvent( + "OffenderEventPublished", + mapOf( + "crn" to it.message.crn, + "eventType" to it.eventType!!, + "occurredAt" to ISO_ZONED_DATE_TIME.format(it.message.eventDatetime) + ) + ) + } } fun OffenderDelta.asNotifications(): List> { diff --git a/projects/offender-events-and-delius/src/test/kotlin/uk/gov/justice/digital/hmpps/integrations/delius/OffenderDeltaPollerTest.kt b/projects/offender-events-and-delius/src/test/kotlin/uk/gov/justice/digital/hmpps/integrations/delius/OffenderDeltaPollerTest.kt deleted file mode 100644 index d2c49fb557..0000000000 --- a/projects/offender-events-and-delius/src/test/kotlin/uk/gov/justice/digital/hmpps/integrations/delius/OffenderDeltaPollerTest.kt +++ /dev/null @@ -1,117 +0,0 @@ -package uk.gov.justice.digital.hmpps.integrations.delius - -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.extension.ExtendWith -import org.mockito.Mock -import org.mockito.junit.jupiter.MockitoExtension -import org.mockito.kotlin.any -import org.mockito.kotlin.never -import org.mockito.kotlin.verify -import org.mockito.kotlin.whenever -import uk.gov.justice.digital.hmpps.integrations.delius.offender.OffenderDeltaService -import uk.gov.justice.digital.hmpps.integrations.delius.offender.OffenderEvent -import uk.gov.justice.digital.hmpps.message.MessageAttributes -import uk.gov.justice.digital.hmpps.message.Notification -import uk.gov.justice.digital.hmpps.telemetry.TelemetryService -import java.time.ZonedDateTime -import java.time.format.DateTimeFormatter - -@ExtendWith(MockitoExtension::class) -class OffenderDeltaPollerTest { - - @Mock - private lateinit var service: OffenderDeltaService - - @Mock - private lateinit var telemetryService: TelemetryService - - private lateinit var poller: OffenderDeltaPoller - - @BeforeEach - fun setup() { - poller = OffenderDeltaPoller(service, telemetryService) - } - - @Test - fun `unsuccessful event creation notify app insights`() { - whenever(service.checkAndSendEvents()).thenThrow(RuntimeException("event not processed")) - poller.checkAndSendEvents() - - verify(telemetryService).trackEvent( - "OffenderEventsProcessingFailed", - mapOf("Exception" to "event not processed"), - mapOf() - ) - } - - @Test - fun `poller receives no events and doesn't call telemetry`() { - whenever(service.checkAndSendEvents()).thenReturn(listOf()) - poller.checkAndSendEvents() - verify(telemetryService, never()).trackEvent(any(), any(), any()) - } - - @Test - fun `poller calls telemetry if either events or not found`() { - val dateTime = ZonedDateTime.now() - whenever(service.checkAndSendEvents()).thenReturn( - listOf( - Notification( - OffenderEvent(123456, "X123456", null, 67891, dateTime), - MessageAttributes("OFFENDER_CHANGED") - ), - Notification( - OffenderEvent(123456, "X123456", null, 67891, dateTime), - MessageAttributes("OFFENDER_MANAGER_CHANGED") - ), - Notification( - OffenderEvent(223456, "X223456", null, 67890, dateTime), - MessageAttributes("ORDER_MANAGER_CHANGED") - ), - Notification( - OffenderEvent(223456, "X223456", null, 67890, dateTime), - MessageAttributes("CONVICTION_CHANGED") - ) - ) - ) - - poller.checkAndSendEvents() - verify(telemetryService).trackEvent( - "OffenderEventPublished", - mapOf( - "crn" to "X123456", - "eventType" to "OFFENDER_CHANGED", - "occurredAt" to DateTimeFormatter.ISO_ZONED_DATE_TIME.format(dateTime) - ), - mapOf() - ) - verify(telemetryService).trackEvent( - "OffenderEventPublished", - mapOf( - "crn" to "X123456", - "eventType" to "OFFENDER_MANAGER_CHANGED", - "occurredAt" to DateTimeFormatter.ISO_ZONED_DATE_TIME.format(dateTime) - ), - mapOf() - ) - verify(telemetryService).trackEvent( - "OffenderEventPublished", - mapOf( - "crn" to "X223456", - "eventType" to "ORDER_MANAGER_CHANGED", - "occurredAt" to DateTimeFormatter.ISO_ZONED_DATE_TIME.format(dateTime) - ), - mapOf() - ) - verify(telemetryService).trackEvent( - "OffenderEventPublished", - mapOf( - "crn" to "X223456", - "eventType" to "CONVICTION_CHANGED", - "occurredAt" to DateTimeFormatter.ISO_ZONED_DATE_TIME.format(dateTime) - ), - mapOf() - ) - } -}