diff --git a/data-index/data-index-common/src/test/java/org/kie/kogito/index/TestUtils.java b/data-index/data-index-common/src/test/java/org/kie/kogito/index/TestUtils.java index 103b0568fa..b98819762e 100644 --- a/data-index/data-index-common/src/test/java/org/kie/kogito/index/TestUtils.java +++ b/data-index/data-index-common/src/test/java/org/kie/kogito/index/TestUtils.java @@ -132,7 +132,7 @@ public static ProcessInstanceDataEvent getProcessCloudEvent(String processId, St .build() : null) .build(); - return new ProcessInstanceDataEvent(URI.create("http://localhost:8080/" + processId).toString(), "jobs-management,prometheus-monitoring,process-management", body.metaData(), body); + return new ProcessInstanceDataEvent(URI.create("http://localhost:8080/" + processId).toString(), "jobs-management,prometheus-monitoring,process-management", null, body.metaData(), body); } public static ProcessInstance getProcessInstance(String processId, String processInstanceId, Integer status, String rootProcessInstanceId, String rootProcessId) { @@ -229,7 +229,7 @@ public static UserTaskInstanceDataEvent getUserTaskCloudEvent(String taskId, Str .outputs(emptyMap()) .build(); - return new UserTaskInstanceDataEvent(URI.create("http://localhost:8080/" + processId).toString(), null, body.metaData(), body); + return new UserTaskInstanceDataEvent(URI.create("http://localhost:8080/" + processId).toString(), null, null, body.metaData(), body); } public static AttachmentEventBody getTaskAttachment(String id, String user, String name, String content) { diff --git a/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/test/java/org/kie/kogito/index/addon/DataIndexEventPublisherTest.java b/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/test/java/org/kie/kogito/index/addon/DataIndexEventPublisherTest.java index 7afb99e1f7..c5061a1db9 100644 --- a/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/test/java/org/kie/kogito/index/addon/DataIndexEventPublisherTest.java +++ b/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/test/java/org/kie/kogito/index/addon/DataIndexEventPublisherTest.java @@ -130,7 +130,7 @@ public TestingDataEvent(String type, String kogitoProcessId, String kogitoRootProcessId) { super(type, source, data, kogitoProcessInstanceId, kogitoRootProcessInstanceId, kogitoProcessId, - kogitoRootProcessId, null); + kogitoRootProcessId, null, null); } } diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/events/JobDataEvent.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/events/JobDataEvent.java index 3cf1fcbdcb..5000e29f21 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/events/JobDataEvent.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/events/JobDataEvent.java @@ -28,7 +28,7 @@ public class JobDataEvent extends AbstractDataEvent { public static final String JOB_EVENT_TYPE = "JobEvent"; - public JobDataEvent(String source, ScheduledJob data) { + public JobDataEvent(String source, String identity, ScheduledJob data) { super(JOB_EVENT_TYPE, source, data, @@ -36,7 +36,8 @@ public JobDataEvent(String source, ScheduledJob data) { data.getRootProcessInstanceId(), data.getProcessId(), data.getRootProcessId(), - null); + null, + identity); } @JsonIgnore @@ -49,19 +50,25 @@ public static class JobDataEventBuilder { private String source; private ScheduledJob data; + private String identity; public JobDataEventBuilder source(String source) { this.source = source; return this; } + public JobDataEventBuilder identity(String identity) { + this.identity = identity; + return this; + } + public JobDataEventBuilder data(ScheduledJob data) { this.data = data; return this; } public JobDataEvent build() { - return new JobDataEvent(source, data); + return new JobDataEvent(source, identity, data); } } } diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/AbstractJobStreams.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/AbstractJobStreams.java index 9c364fa7bc..87709e9c20 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/AbstractJobStreams.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/AbstractJobStreams.java @@ -30,6 +30,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; +import io.quarkus.security.identity.SecurityIdentity; import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage; public abstract class AbstractJobStreams { @@ -44,14 +45,17 @@ public abstract class AbstractJobStreams { protected String url; + protected SecurityIdentity identity; + protected AbstractJobStreams() { } - protected AbstractJobStreams(ObjectMapper objectMapper, boolean enabled, Emitter emitter, String url) { + protected AbstractJobStreams(ObjectMapper objectMapper, boolean enabled, Emitter emitter, String url, SecurityIdentity identity) { this.objectMapper = objectMapper; this.enabled = enabled; this.emitter = emitter; this.url = url; + this.identity = identity; } protected void jobStatusChange(JobDetails job) { @@ -60,6 +64,7 @@ protected void jobStatusChange(JobDetails job) { JobDataEvent event = JobDataEvent .builder() .source(url + RestApiConstants.JOBS_PATH) + .identity(identity.isAnonymous() ? null : identity.getPrincipal().getName()) .data(ScheduledJobAdapter.of(job))//this should support jobs crated with V1 and V2 .build(); String json = objectMapper.writeValueAsString(event); diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/KafkaJobStreams.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/KafkaJobStreams.java index 96fcb8d5b1..8e871fed27 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/KafkaJobStreams.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/KafkaJobStreams.java @@ -30,6 +30,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; +import io.quarkus.security.identity.SecurityIdentity; + @ApplicationScoped public class KafkaJobStreams extends AbstractJobStreams { @@ -39,8 +41,9 @@ public class KafkaJobStreams extends AbstractJobStreams { public KafkaJobStreams(ObjectMapper objectMapper, @ConfigProperty(name = PUBLISH_EVENTS_CONFIG_KEY) Optional config, @Channel(AvailableStreams.JOB_STATUS_CHANGE_EVENTS_TOPIC) @OnOverflow(value = OnOverflow.Strategy.LATEST) Emitter emitter, - @ConfigProperty(name = "kogito.service.url", defaultValue = "http://localhost:8080") String url) { - super(objectMapper, config.map(Boolean::valueOf).filter(Boolean.TRUE::equals).orElse(false), emitter, url); + @ConfigProperty(name = "kogito.service.url", defaultValue = "http://localhost:8080") String url, + SecurityIdentity identity) { + super(objectMapper, config.map(Boolean::valueOf).filter(Boolean.TRUE::equals).orElse(false), emitter, url, identity); } @Incoming(AvailableStreams.JOB_STATUS_CHANGE_EVENTS) diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/KnativeJobStreams.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/KnativeJobStreams.java index 6e28819e2e..5839ec0e73 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/KnativeJobStreams.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/KnativeJobStreams.java @@ -36,6 +36,7 @@ import io.cloudevents.jackson.JsonFormat; import io.quarkus.reactivemessaging.http.runtime.OutgoingHttpMetadata; +import io.quarkus.security.identity.SecurityIdentity; @ApplicationScoped public class KnativeJobStreams extends AbstractJobStreams { @@ -54,8 +55,9 @@ public class KnativeJobStreams extends AbstractJobStreams { public KnativeJobStreams(ObjectMapper objectMapper, @ConfigProperty(name = PUBLISH_EVENTS_CONFIG_KEY) Optional config, @Channel(JOB_STATUS_CHANGE_EVENTS_KNATIVE) @OnOverflow(value = OnOverflow.Strategy.LATEST) Emitter emitter, - @ConfigProperty(name = "kogito.service.url", defaultValue = "http://localhost:8080") String url) { - super(objectMapper, config.map(Boolean::valueOf).filter(Boolean.TRUE::equals).orElse(false), emitter, url); + @ConfigProperty(name = "kogito.service.url", defaultValue = "http://localhost:8080") String url, + SecurityIdentity identity) { + super(objectMapper, config.map(Boolean::valueOf).filter(Boolean.TRUE::equals).orElse(false), emitter, url, identity); } @Incoming(AvailableStreams.JOB_STATUS_CHANGE_EVENTS) diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/AbstractJobStreamsTest.java b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/AbstractJobStreamsTest.java index 81ffa59705..18f88b54eb 100644 --- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/AbstractJobStreamsTest.java +++ b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/AbstractJobStreamsTest.java @@ -40,6 +40,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; +import io.quarkus.security.identity.SecurityIdentity; + import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; @@ -47,6 +49,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) abstract class AbstractJobStreamsTest { @@ -71,6 +74,9 @@ abstract class AbstractJobStreamsTest { @Mock ObjectMapper objectMapper; + @Mock + SecurityIdentity identity; + @Captor ArgumentCaptor eventCaptor; @@ -82,6 +88,7 @@ abstract class AbstractJobStreamsTest { @BeforeEach void setUp() { jobStreams = spy(createJobStreams()); + when(identity.isAnonymous()).thenReturn(true); } protected abstract T createJobStreams(); diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/KafkaJobStreamsTest.java b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/KafkaJobStreamsTest.java index d073a15b25..bee3b33e20 100644 --- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/KafkaJobStreamsTest.java +++ b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/KafkaJobStreamsTest.java @@ -22,6 +22,6 @@ class KafkaJobStreamsTest extends AbstractJobStreamsTest { @Override protected KafkaJobStreams createJobStreams() { - return new KafkaJobStreams(objectMapper, Optional.of("true"), emitter, URL); + return new KafkaJobStreams(objectMapper, Optional.of("true"), emitter, URL, identity); } } diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/KnativeJobStreamsTest.java b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/KnativeJobStreamsTest.java index 796f87bf26..6dde05f01e 100644 --- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/KnativeJobStreamsTest.java +++ b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/KnativeJobStreamsTest.java @@ -31,7 +31,7 @@ class KnativeJobStreamsTest extends AbstractJobStreamsTest { @Override protected KnativeJobStreams createJobStreams() { - return new KnativeJobStreams(objectMapper, Optional.of("true"), emitter, URL); + return new KnativeJobStreams(objectMapper, Optional.of("true"), emitter, URL, identity); } @Override diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreams.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreams.java index dc848c0fd6..14a9201c1e 100644 --- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreams.java +++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreams.java @@ -36,6 +36,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; +import io.quarkus.security.identity.SecurityIdentity; import io.smallrye.reactive.messaging.annotations.Blocking; import static org.kie.kogito.jobs.service.events.JobDataEvent.JOB_EVENT_TYPE; @@ -57,16 +58,20 @@ public class EventPublisherJobStreams { private final ObjectMapper objectMapper; + private final SecurityIdentity identity; + @Inject public EventPublisherJobStreams(@ConfigProperty(name = "kogito.service.url", defaultValue = "http://localhost:8080") String url, Instance eventPublishers, - ObjectMapper objectMapper) { + ObjectMapper objectMapper, + SecurityIdentity identity) { this.url = url; eventPublisher = eventPublishers.stream() .filter(publisher -> publisher.getClass().getName().startsWith(DATA_INDEX_EVENT_PUBLISHER)) .findFirst() .orElse(null); this.objectMapper = objectMapper; + this.identity = identity; } @Incoming(AvailableStreams.JOB_STATUS_CHANGE_EVENTS) @@ -87,7 +92,8 @@ public void onJobStatusChange(JobDetails jobDetails) { scheduledJob.getProcessInstanceId(), scheduledJob.getRootProcessInstanceId(), scheduledJob.getProcessId(), - scheduledJob.getRootProcessId()); + scheduledJob.getRootProcessId(), + identity.isAnonymous() ? null : identity.getPrincipal().getName()); try { eventPublisher.publish(event); } catch (Exception e) { @@ -103,9 +109,10 @@ public EventPublisherJobDataEvent(String type, String kogitoProcessInstanceId, String kogitoRootProcessInstanceId, String kogitoProcessId, - String kogitoRootProcessId) { + String kogitoRootProcessId, + String kogitoIdentity) { super(type, source, data, kogitoProcessInstanceId, kogitoRootProcessInstanceId, kogitoProcessId, - kogitoRootProcessId, null); + kogitoRootProcessId, null, kogitoIdentity); } } } diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/test/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreamsTest.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/test/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreamsTest.java index 361aa9932e..d172f4f8f3 100644 --- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/test/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreamsTest.java +++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/test/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreamsTest.java @@ -40,6 +40,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import io.quarkus.security.identity.SecurityIdentity; + import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.anyCollection; import static org.mockito.Mockito.doReturn; @@ -47,6 +49,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; class EventPublisherJobStreamsTest { @@ -94,7 +97,9 @@ void onJobStatusChange() throws Exception { .registerModule(new JavaTimeModule()) .disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); - EventPublisherJobStreams eventPublisherJobStreams = new EventPublisherJobStreams(URL, eventPublisherInstance, objectMapper); + SecurityIdentity identity = mock(SecurityIdentity.class); + when(identity.isAnonymous()).thenReturn(true); + EventPublisherJobStreams eventPublisherJobStreams = new EventPublisherJobStreams(URL, eventPublisherInstance, objectMapper, identity); JobDetails jobDetails = buildJobDetails(); eventPublisherJobStreams.onJobStatusChange(jobDetails);