Skip to content

Commit

Permalink
Add handling of downstream automation events
Browse files Browse the repository at this point in the history
  • Loading branch information
marko-bekhta committed Nov 29, 2024
1 parent 9a1ece3 commit 224ca18
Show file tree
Hide file tree
Showing 15 changed files with 388 additions and 38 deletions.
19 changes: 18 additions & 1 deletion src/main/java/org/hibernate/infra/replicate/jira/JiraConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ interface JiraProject {
* Allows enabling signature verification.
*/
WebHookSecurity security();

/**
* Allows enabling signature verification of downstream events.
*/
WebHookSecurity downstreamSecurity();
}

interface WebHookSecurity {
Expand All @@ -205,11 +210,20 @@ interface WebHookSecurity {
@WithDefault("false")
boolean enabled();

@WithDefault("SIGNATURE")
Type type();

/**
* The secret used to sing the web hook request body.
* Verification secret, e.g. the secret used to sing the web hook request body.
* Can also be just some token that we will compare. Depends on the security
* type.
*/
@WithDefault("not-a-secret")
String secret();

enum Type {
SIGNATURE, TOKEN
}
}

interface Instance {
Expand Down Expand Up @@ -374,6 +388,9 @@ interface UserValueMapping extends ValueMapping {
*/
@WithDefault("not-a-user")
Set<String> ignoredUpstreamUsers();

@WithDefault("not-a-user")
Set<String> ignoredDownstreamUsers();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.hibernate.infra.replicate.jira.resource;

import org.hibernate.infra.replicate.jira.service.jira.JiraService;
import org.hibernate.infra.replicate.jira.service.jira.model.action.JiraActionEvent;
import org.hibernate.infra.replicate.jira.service.jira.model.hook.JiraWebHookEvent;
import org.hibernate.infra.replicate.jira.service.validation.ConfiguredProject;

Expand All @@ -26,8 +27,18 @@ public class JiraWebHookListenerResource {
@Consumes(MediaType.APPLICATION_JSON)
public String somethingHappenedUpstream(@RestPath @NotNull @ConfiguredProject String project,
@QueryParam("triggeredByUser") String triggeredByUser, JiraWebHookEvent event) {
Log.infof("Received a notification about %s project: %.200s...", project, event);
Log.tracef("Received a notification about %s project: %.200s...", project, event);
jiraService.acknowledge(project, event, triggeredByUser);
return "ack";
}

@POST
@Path("/mirror/{project}")
@Consumes(MediaType.APPLICATION_JSON)
public String somethingHappenedDownstream(@RestPath @NotNull @ConfiguredProject(upstream = false) String project,
JiraActionEvent data) {
Log.tracef("Received a downstream notification about %s project: %s...", project, data);
jiraService.downstreamAcknowledge(project, data);
return "ack";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ public void startProcessingEvent() throws InterruptedException {
projectGroupContext.startProcessingEvent();
}

public void startProcessingDownstreamEvent() throws InterruptedException {
projectGroupContext.startProcessingDownstreamEvent();
}

public JiraUser notMappedAssignee() {
return notMappedAssignee;
}
Expand Down Expand Up @@ -229,6 +233,14 @@ public void submitTask(Runnable runnable) {
projectGroupContext.submitTask(runnable);
}

public int pendingDownstreamEventsInCurrentContext() {
return projectGroupContext.pendingDownstreamEventsInCurrentContext();
}

public void submitDownstreamTask(Runnable runnable) {
projectGroupContext.submitDownstreamTask(runnable);
}

public Optional<HandlerProjectContext> contextForProjectInSameGroup(String project) {
if (!projectGroup().projects().containsKey(project)) {
// different project group, don't bother
Expand Down Expand Up @@ -351,4 +363,12 @@ private static boolean versionNeedsUpdate(JiraVersion upstreamVersion, JiraVersi
public boolean isUserIgnored(String triggeredByUser) {
return projectGroupContext.projectGroup().users().ignoredUpstreamUsers().contains(triggeredByUser);
}

public boolean isDownstreamUserIgnored(String triggeredByUser) {
return projectGroupContext.projectGroup().users().ignoredDownstreamUsers().contains(triggeredByUser);
}

public String upstreamUser(String mappedValue) {
return projectGroupContext.upstreamUser(mappedValue);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.hibernate.infra.replicate.jira.service.jira;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
Expand All @@ -17,9 +20,14 @@ public final class HandlerProjectGroupContext implements AutoCloseable {

private final ExecutorService eventHandlingExecutor;
private final Supplier<Integer> workQueueSize;

private final ExecutorService downstreamEventHandlingExecutor;
private final Supplier<Integer> downstreamWorkQueueSize;
private final ScheduledExecutorService rateLimiterExecutor = Executors.newScheduledThreadPool(1);
private final Semaphore rateLimiter;
private final Semaphore downstreamRateLimiter;
private final JiraConfig.JiraProjectGroup projectGroup;
private final Map<String, String> invertedUsers;

public HandlerProjectGroupContext(JiraConfig.JiraProjectGroup projectGroup) {
this.projectGroup = projectGroup;
Expand All @@ -28,21 +36,39 @@ public HandlerProjectGroupContext(JiraConfig.JiraProjectGroup projectGroup) {

final int permits = processing.eventsPerTimeframe();
this.rateLimiter = new Semaphore(permits);
this.downstreamRateLimiter = new Semaphore(permits);
rateLimiterExecutor.scheduleAtFixedRate(() -> {
rateLimiter.drainPermits();
rateLimiter.release(permits);
downstreamRateLimiter.drainPermits();
downstreamRateLimiter.release(permits);
}, processing.timeframeInSeconds(), processing.timeframeInSeconds(), TimeUnit.SECONDS);

LinkedBlockingDeque<Runnable> workQueue = new LinkedBlockingDeque<>(processing.queueSize());
workQueueSize = workQueue::size;
eventHandlingExecutor = new ThreadPoolExecutor(processing.threads(), processing.threads(), 0L,
TimeUnit.MILLISECONDS, workQueue);

LinkedBlockingDeque<Runnable> downstreamWorkQueue = new LinkedBlockingDeque<>(processing.queueSize());
downstreamWorkQueueSize = downstreamWorkQueue::size;
downstreamEventHandlingExecutor = new ThreadPoolExecutor(processing.threads(), processing.threads(), 0L,
TimeUnit.MILLISECONDS, downstreamWorkQueue);

Map<String, String> invertedUsers = new HashMap<>();
for (var entry : projectGroup.users().mapping().entrySet()) {
invertedUsers.put(entry.getValue(), entry.getKey());
}
this.invertedUsers = Collections.unmodifiableMap(invertedUsers);
}

public void startProcessingEvent() throws InterruptedException {
rateLimiter.acquire(1);
}

public void startProcessingDownstreamEvent() throws InterruptedException {
downstreamRateLimiter.acquire(1);
}

public JiraConfig.JiraProjectGroup projectGroup() {
return projectGroup;
}
Expand All @@ -51,26 +77,43 @@ public int pendingEventsInCurrentContext() {
return workQueueSize.get();
}

public int pendingDownstreamEventsInCurrentContext() {
return downstreamWorkQueueSize.get();
}

public void submitTask(Runnable task) {
eventHandlingExecutor.submit(task);
}

public void submitDownstreamTask(Runnable task) {
downstreamEventHandlingExecutor.submit(task);
}

@Override
public void close() {
// when requesting to close the context we aren't expecting to process any other
// events hence there's no point in continuing "releasing" more "permits":
if (!rateLimiterExecutor.isShutdown()) {
rateLimiterExecutor.shutdownNow();
}
if (!eventHandlingExecutor.isShutdown()) {
closeEventExecutor(eventHandlingExecutor);
closeEventExecutor(downstreamEventHandlingExecutor);
}

private static void closeEventExecutor(ExecutorService executor) {
if (!executor.isShutdown()) {
try {
eventHandlingExecutor.shutdown();
if (!eventHandlingExecutor.awaitTermination(2, TimeUnit.MINUTES)) {
executor.shutdown();
if (!executor.awaitTermination(2, TimeUnit.MINUTES)) {
Log.warnf("Not all events were processed before the shutdown");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

public String upstreamUser(String mappedValue) {
return invertedUsers.get(mappedValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.hibernate.infra.replicate.jira.service.jira.handler.JiraIssueDeleteEventHandler;
import org.hibernate.infra.replicate.jira.service.jira.handler.JiraIssueSimpleUpsertEventHandler;
import org.hibernate.infra.replicate.jira.service.jira.handler.JiraIssueTransitionOnlyEventHandler;
import org.hibernate.infra.replicate.jira.service.jira.model.action.JiraActionEvent;
import org.hibernate.infra.replicate.jira.service.jira.model.hook.JiraWebHookEvent;
import org.hibernate.infra.replicate.jira.service.jira.model.hook.JiraWebHookIssue;
import org.hibernate.infra.replicate.jira.service.jira.model.hook.JiraWebHookIssueLink;
Expand Down Expand Up @@ -322,6 +323,29 @@ public void acknowledge(String project, JiraWebHookEvent event, String triggered
}, () -> Log.infof("Event type %s is not supported and cannot be handled.", event.webhookEvent));
}

public void downstreamAcknowledge(String project, JiraActionEvent event) {
event.eventType().ifPresentOrElse(eventType -> {
var context = contextPerProject.get(project);
if (context == null) {
FailureCollector failureCollector = FailureCollector.collector(reportingConfig);
failureCollector.critical("Unable to determine handler context for project %s. Was it not configured ?"
.formatted(project));
failureCollector.close();
throw new ConstraintViolationException("Project " + project + " is not configured.", Set.of());
}

if (context.isDownstreamUserIgnored(event.triggeredByUser)) {
Log.infof("Event was triggered by %s user that is in the ignore list.", event.triggeredByUser);
return;
}

for (Runnable handler : eventType.handlers(reportingConfig, event, context)) {
context.submitDownstreamTask(handler);
}
}, () -> Log.infof("Event type %s is not supported and cannot be handled.", event.event));

}

public void syncLastUpdated(String projectGroup) {
try (FailureCollector failureCollector = FailureCollector.collector(reportingConfig)) {
Log.infof("Starting scheduled sync of issues for the project group %s", projectGroup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public final void run() {
context.startProcessingEvent();
doRun();
} catch (RuntimeException e) {
failureCollector.critical("Failed to handled the event: %s".formatted(this), e);
failureCollector.critical("Failed to handle the event: %s".formatted(this), e);
} catch (InterruptedException e) {
failureCollector.critical("Interrupted while waiting in the queue", e);
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.hibernate.infra.replicate.jira.service.jira.handler.action;

import org.hibernate.infra.replicate.jira.service.jira.HandlerProjectContext;
import org.hibernate.infra.replicate.jira.service.jira.model.action.JiraActionEvent;
import org.hibernate.infra.replicate.jira.service.jira.model.rest.JiraIssue;
import org.hibernate.infra.replicate.jira.service.reporting.FailureCollector;
import org.hibernate.infra.replicate.jira.service.reporting.ReportingConfig;

import io.quarkus.logging.Log;

public abstract class JiraActionEventHandler implements Runnable {

protected final JiraActionEvent event;
protected final FailureCollector failureCollector;
protected final HandlerProjectContext context;

protected JiraActionEventHandler(ReportingConfig reportingConfig, HandlerProjectContext context,
JiraActionEvent event) {
this.event = event;
this.failureCollector = FailureCollector.collector(reportingConfig);
this.context = context;
}

@Override
public final void run() {
try {
context.startProcessingDownstreamEvent();
doRun();
} catch (RuntimeException e) {
failureCollector.critical("Failed to handle the event: %s".formatted(this), e);
} catch (InterruptedException e) {
failureCollector.critical("Interrupted while waiting in the queue", e);
Thread.currentThread().interrupt();
} finally {
failureCollector.close();
Log.infof("Finished processing %s. Pending events in %s to process: %s", this.toString(),
context.projectGroupName(), context.pendingDownstreamEventsInCurrentContext());
}
}

protected String toSourceKey(String key) {
return "%s-%d".formatted(context.project().originalProjectKey(), JiraIssue.keyToLong(key));
}

protected abstract void doRun();

public abstract String toString();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.hibernate.infra.replicate.jira.service.jira.handler.action;

import org.hibernate.infra.replicate.jira.service.jira.HandlerProjectContext;
import org.hibernate.infra.replicate.jira.service.jira.model.action.JiraActionEvent;
import org.hibernate.infra.replicate.jira.service.jira.model.rest.JiraFields;
import org.hibernate.infra.replicate.jira.service.jira.model.rest.JiraIssue;
import org.hibernate.infra.replicate.jira.service.jira.model.rest.JiraUser;
import org.hibernate.infra.replicate.jira.service.reporting.ReportingConfig;

public class JiraAssigneeActionEventHandler extends JiraActionEventHandler {

public JiraAssigneeActionEventHandler(ReportingConfig reportingConfig, HandlerProjectContext context,
JiraActionEvent event) {
super(reportingConfig, context, event);
}

@Override
protected void doRun() {
JiraIssue issue = context.destinationJiraClient().getIssue(event.key);

JiraIssue updated = new JiraIssue();
updated.fields = JiraFields.empty();
if (issue.fields.assignee != null) {
String accountId = context.upstreamUser(
issue.fields.assignee.mappedIdentifier(context.projectGroup().users().mappedPropertyName()));

if (accountId != null) {
updated.fields.assignee = new JiraUser(accountId);

}
} else {
updated.fields.assignee = new JiraUser("-1");
}
context.sourceJiraClient().update(toSourceKey(event.key), updated);
}

@Override
public String toString() {
return "JiraAssigneeActionEventHandler[" + "event=" + event + ", project=" + context.projectName() + ']';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.hibernate.infra.replicate.jira.service.jira.model.action;

import java.util.Optional;

import org.hibernate.infra.replicate.jira.service.jira.model.JiraBaseObject;
import org.hibernate.infra.replicate.jira.service.jira.model.hook.JiraActionEventType;

public class JiraActionEvent extends JiraBaseObject {
public String id;
public String key;
public String event;
public String assignee;
public String status;

public String triggeredByUser;

public Optional<JiraActionEventType> eventType() {
return JiraActionEventType.of(event);
}

@Override
public String toString() {
return "JiraActionEvent{" + "id='" + id + '\'' + ", key='" + key + '\'' + ", event='" + event + '\''
+ ", assignee='" + assignee + '\'' + ", status='" + status + '\'' + '}';
}
}
Loading

0 comments on commit 224ca18

Please sign in to comment.