Skip to content

Commit

Permalink
KOGITO-9611 - Track user identity in Kogito event
Browse files Browse the repository at this point in the history
  • Loading branch information
cristianonicolai committed Jul 21, 2023
1 parent 5acc68c commit 959321e
Show file tree
Hide file tree
Showing 11 changed files with 54 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public static ProcessInstanceDataEvent getProcessCloudEvent(String processId, St
.build() : null)
.build();

return new ProcessInstanceDataEvent(URI.create("http://localhost:8080/" + processId).toString(), "jobs-management,prometheus-monitoring,process-management", body.metaData(), body);
return new ProcessInstanceDataEvent(URI.create("http://localhost:8080/" + processId).toString(), "jobs-management,prometheus-monitoring,process-management", null, body.metaData(), body);
}

public static ProcessInstance getProcessInstance(String processId, String processInstanceId, Integer status, String rootProcessInstanceId, String rootProcessId) {
Expand Down Expand Up @@ -229,7 +229,7 @@ public static UserTaskInstanceDataEvent getUserTaskCloudEvent(String taskId, Str
.outputs(emptyMap())
.build();

return new UserTaskInstanceDataEvent(URI.create("http://localhost:8080/" + processId).toString(), null, body.metaData(), body);
return new UserTaskInstanceDataEvent(URI.create("http://localhost:8080/" + processId).toString(), null, null, body.metaData(), body);
}

public static AttachmentEventBody getTaskAttachment(String id, String user, String name, String content) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public TestingDataEvent(String type,
String kogitoProcessId,
String kogitoRootProcessId) {
super(type, source, data, kogitoProcessInstanceId, kogitoRootProcessInstanceId, kogitoProcessId,
kogitoRootProcessId, null);
kogitoRootProcessId, null, null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,16 @@ public class JobDataEvent extends AbstractDataEvent<ScheduledJob> {

public static final String JOB_EVENT_TYPE = "JobEvent";

public JobDataEvent(String source, ScheduledJob data) {
public JobDataEvent(String source, String identity, ScheduledJob data) {
super(JOB_EVENT_TYPE,
source,
data,
data.getProcessInstanceId(),
data.getRootProcessInstanceId(),
data.getProcessId(),
data.getRootProcessId(),
null);
null,
identity);
}

@JsonIgnore
Expand All @@ -49,19 +50,25 @@ public static class JobDataEventBuilder {

private String source;
private ScheduledJob data;
private String identity;

public JobDataEventBuilder source(String source) {
this.source = source;
return this;
}

public JobDataEventBuilder identity(String identity) {
this.identity = identity;
return this;
}

public JobDataEventBuilder data(ScheduledJob data) {
this.data = data;
return this;
}

public JobDataEvent build() {
return new JobDataEvent(source, data);
return new JobDataEvent(source, identity, data);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;

import io.quarkus.security.identity.SecurityIdentity;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;

public abstract class AbstractJobStreams {
Expand All @@ -44,14 +45,17 @@ public abstract class AbstractJobStreams {

protected String url;

protected SecurityIdentity identity;

protected AbstractJobStreams() {
}

protected AbstractJobStreams(ObjectMapper objectMapper, boolean enabled, Emitter<String> emitter, String url) {
protected AbstractJobStreams(ObjectMapper objectMapper, boolean enabled, Emitter<String> emitter, String url, SecurityIdentity identity) {
this.objectMapper = objectMapper;
this.enabled = enabled;
this.emitter = emitter;
this.url = url;
this.identity = identity;
}

protected void jobStatusChange(JobDetails job) {
Expand All @@ -60,6 +64,7 @@ protected void jobStatusChange(JobDetails job) {
JobDataEvent event = JobDataEvent
.builder()
.source(url + RestApiConstants.JOBS_PATH)
.identity(identity.isAnonymous() ? null : identity.getPrincipal().getName())
.data(ScheduledJobAdapter.of(job))//this should support jobs crated with V1 and V2
.build();
String json = objectMapper.writeValueAsString(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

import com.fasterxml.jackson.databind.ObjectMapper;

import io.quarkus.security.identity.SecurityIdentity;

@ApplicationScoped
public class KafkaJobStreams extends AbstractJobStreams {

Expand All @@ -39,8 +41,9 @@ public class KafkaJobStreams extends AbstractJobStreams {
public KafkaJobStreams(ObjectMapper objectMapper,
@ConfigProperty(name = PUBLISH_EVENTS_CONFIG_KEY) Optional<String> config,
@Channel(AvailableStreams.JOB_STATUS_CHANGE_EVENTS_TOPIC) @OnOverflow(value = OnOverflow.Strategy.LATEST) Emitter<String> emitter,
@ConfigProperty(name = "kogito.service.url", defaultValue = "http://localhost:8080") String url) {
super(objectMapper, config.map(Boolean::valueOf).filter(Boolean.TRUE::equals).orElse(false), emitter, url);
@ConfigProperty(name = "kogito.service.url", defaultValue = "http://localhost:8080") String url,
SecurityIdentity identity) {
super(objectMapper, config.map(Boolean::valueOf).filter(Boolean.TRUE::equals).orElse(false), emitter, url, identity);
}

@Incoming(AvailableStreams.JOB_STATUS_CHANGE_EVENTS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import io.cloudevents.jackson.JsonFormat;
import io.quarkus.reactivemessaging.http.runtime.OutgoingHttpMetadata;
import io.quarkus.security.identity.SecurityIdentity;

@ApplicationScoped
public class KnativeJobStreams extends AbstractJobStreams {
Expand All @@ -54,8 +55,9 @@ public class KnativeJobStreams extends AbstractJobStreams {
public KnativeJobStreams(ObjectMapper objectMapper,
@ConfigProperty(name = PUBLISH_EVENTS_CONFIG_KEY) Optional<String> config,
@Channel(JOB_STATUS_CHANGE_EVENTS_KNATIVE) @OnOverflow(value = OnOverflow.Strategy.LATEST) Emitter<String> emitter,
@ConfigProperty(name = "kogito.service.url", defaultValue = "http://localhost:8080") String url) {
super(objectMapper, config.map(Boolean::valueOf).filter(Boolean.TRUE::equals).orElse(false), emitter, url);
@ConfigProperty(name = "kogito.service.url", defaultValue = "http://localhost:8080") String url,
SecurityIdentity identity) {
super(objectMapper, config.map(Boolean::valueOf).filter(Boolean.TRUE::equals).orElse(false), emitter, url, identity);
}

@Incoming(AvailableStreams.JOB_STATUS_CHANGE_EVENTS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,16 @@

import com.fasterxml.jackson.databind.ObjectMapper;

import io.quarkus.security.identity.SecurityIdentity;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
abstract class AbstractJobStreamsTest<T extends AbstractJobStreams> {
Expand All @@ -71,6 +74,9 @@ abstract class AbstractJobStreamsTest<T extends AbstractJobStreams> {
@Mock
ObjectMapper objectMapper;

@Mock
SecurityIdentity identity;

@Captor
ArgumentCaptor<Object> eventCaptor;

Expand All @@ -82,6 +88,7 @@ abstract class AbstractJobStreamsTest<T extends AbstractJobStreams> {
@BeforeEach
void setUp() {
jobStreams = spy(createJobStreams());
when(identity.isAnonymous()).thenReturn(true);
}

protected abstract T createJobStreams();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ class KafkaJobStreamsTest extends AbstractJobStreamsTest<KafkaJobStreams> {

@Override
protected KafkaJobStreams createJobStreams() {
return new KafkaJobStreams(objectMapper, Optional.of("true"), emitter, URL);
return new KafkaJobStreams(objectMapper, Optional.of("true"), emitter, URL, identity);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class KnativeJobStreamsTest extends AbstractJobStreamsTest<KnativeJobStreams> {

@Override
protected KnativeJobStreams createJobStreams() {
return new KnativeJobStreams(objectMapper, Optional.of("true"), emitter, URL);
return new KnativeJobStreams(objectMapper, Optional.of("true"), emitter, URL, identity);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;

import io.quarkus.security.identity.SecurityIdentity;
import io.smallrye.reactive.messaging.annotations.Blocking;

import static org.kie.kogito.jobs.service.events.JobDataEvent.JOB_EVENT_TYPE;
Expand All @@ -57,16 +58,20 @@ public class EventPublisherJobStreams {

private final ObjectMapper objectMapper;

private final SecurityIdentity identity;

@Inject
public EventPublisherJobStreams(@ConfigProperty(name = "kogito.service.url", defaultValue = "http://localhost:8080") String url,
Instance<EventPublisher> eventPublishers,
ObjectMapper objectMapper) {
ObjectMapper objectMapper,
SecurityIdentity identity) {
this.url = url;
eventPublisher = eventPublishers.stream()
.filter(publisher -> publisher.getClass().getName().startsWith(DATA_INDEX_EVENT_PUBLISHER))
.findFirst()
.orElse(null);
this.objectMapper = objectMapper;
this.identity = identity;
}

@Incoming(AvailableStreams.JOB_STATUS_CHANGE_EVENTS)
Expand All @@ -87,7 +92,8 @@ public void onJobStatusChange(JobDetails jobDetails) {
scheduledJob.getProcessInstanceId(),
scheduledJob.getRootProcessInstanceId(),
scheduledJob.getProcessId(),
scheduledJob.getRootProcessId());
scheduledJob.getRootProcessId(),
identity.isAnonymous() ? null : identity.getPrincipal().getName());
try {
eventPublisher.publish(event);
} catch (Exception e) {
Expand All @@ -103,9 +109,10 @@ public EventPublisherJobDataEvent(String type,
String kogitoProcessInstanceId,
String kogitoRootProcessInstanceId,
String kogitoProcessId,
String kogitoRootProcessId) {
String kogitoRootProcessId,
String kogitoIdentity) {
super(type, source, data, kogitoProcessInstanceId, kogitoRootProcessInstanceId, kogitoProcessId,
kogitoRootProcessId, null);
kogitoRootProcessId, null, kogitoIdentity);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,16 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import io.quarkus.security.identity.SecurityIdentity;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.anyCollection;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

class EventPublisherJobStreamsTest {

Expand Down Expand Up @@ -94,7 +97,9 @@ void onJobStatusChange() throws Exception {
.registerModule(new JavaTimeModule())
.disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);

EventPublisherJobStreams eventPublisherJobStreams = new EventPublisherJobStreams(URL, eventPublisherInstance, objectMapper);
SecurityIdentity identity = mock(SecurityIdentity.class);
when(identity.isAnonymous()).thenReturn(true);
EventPublisherJobStreams eventPublisherJobStreams = new EventPublisherJobStreams(URL, eventPublisherInstance, objectMapper, identity);

JobDetails jobDetails = buildJobDetails();
eventPublisherJobStreams.onJobStatusChange(jobDetails);
Expand Down

0 comments on commit 959321e

Please sign in to comment.