Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore updates from specified users / enable validation through HV / draft for handling automation events from downstream #57

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion src/main/java/org/hibernate/infra/replicate/jira/JiraConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ interface JiraProject {
* Allows enabling signature verification.
*/
WebHookSecurity security();

/**
* Allows enabling signature verification of downstream events.
*/
WebHookSecurity downstreamSecurity();
}

interface WebHookSecurity {
Expand All @@ -205,11 +210,20 @@ interface WebHookSecurity {
@WithDefault("false")
boolean enabled();

@WithDefault("SIGNATURE")
Type type();

/**
* The secret used to sing the web hook request body.
* Verification secret, e.g. the secret used to sing the web hook request body.
* Can also be just some token that we will compare. Depends on the security
* type.
*/
@WithDefault("not-a-secret")
String secret();

enum Type {
SIGNATURE, TOKEN
}
}

interface Instance {
Expand Down Expand Up @@ -366,6 +380,17 @@ interface UserValueMapping extends ValueMapping {
* {@link #mapping() mapping}.
*/
Optional<String> profileUrl();

/**
* It may be helpful in some cases to ignore webhook events triggered by some
* users. E.g. the sync user that applies updates upstream can be listed here to
* prevent infinite update loop.
*/
@WithDefault("not-a-user")
Set<String> ignoredUpstreamUsers();

@WithDefault("not-a-user")
Set<String> ignoredDownstreamUsers();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package org.hibernate.infra.replicate.jira.resource;

import org.hibernate.infra.replicate.jira.service.jira.JiraService;
import org.hibernate.infra.replicate.jira.service.jira.model.action.JiraActionEvent;
import org.hibernate.infra.replicate.jira.service.jira.model.hook.JiraWebHookEvent;
import org.hibernate.infra.replicate.jira.service.validation.ConfiguredProject;

import org.jboss.resteasy.reactive.RestPath;

Expand All @@ -11,6 +13,7 @@
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.MediaType;

@Path("/jira/webhooks")
Expand All @@ -22,10 +25,20 @@ public class JiraWebHookListenerResource {
@POST
@Path("/{project}")
@Consumes(MediaType.APPLICATION_JSON)
public String somethingHappened(@RestPath @NotNull /* @ConfiguredProject */ String project,
JiraWebHookEvent event) {
Log.infof("Received a notification about %s project: %.200s...", project, event);
jiraService.acknowledge(project, event);
public String somethingHappenedUpstream(@RestPath @NotNull @ConfiguredProject String project,
@QueryParam("triggeredByUser") String triggeredByUser, JiraWebHookEvent event) {
Log.tracef("Received a notification about %s project: %.200s...", project, event);
jiraService.acknowledge(project, event, triggeredByUser);
return "ack";
}

@POST
@Path("/mirror/{project}")
@Consumes(MediaType.APPLICATION_JSON)
public String somethingHappenedDownstream(@RestPath @NotNull @ConfiguredProject String project,
JiraActionEvent data) {
Log.tracef("Received a downstream notification about %s project: %s...", project, data);
jiraService.downstreamAcknowledge(project, data);
return "ack";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ public void startProcessingEvent() throws InterruptedException {
projectGroupContext.startProcessingEvent();
}

public void startProcessingDownstreamEvent() throws InterruptedException {
projectGroupContext.startProcessingDownstreamEvent();
}

public JiraUser notMappedAssignee() {
return notMappedAssignee;
}
Expand Down Expand Up @@ -229,6 +233,14 @@ public void submitTask(Runnable runnable) {
projectGroupContext.submitTask(runnable);
}

public int pendingDownstreamEventsInCurrentContext() {
return projectGroupContext.pendingDownstreamEventsInCurrentContext();
}

public void submitDownstreamTask(Runnable runnable) {
projectGroupContext.submitDownstreamTask(runnable);
}

public Optional<HandlerProjectContext> contextForProjectInSameGroup(String project) {
if (!projectGroup().projects().containsKey(project)) {
// different project group, don't bother
Expand Down Expand Up @@ -347,4 +359,20 @@ private static boolean versionNeedsUpdate(JiraVersion upstreamVersion, JiraVersi
|| upstreamVersion.released != downstreamVersion.released
|| !Objects.equals(upstreamVersion.releaseDate, downstreamVersion.releaseDate);
}

public boolean isUserIgnored(String triggeredByUser) {
return projectGroupContext.projectGroup().users().ignoredUpstreamUsers().contains(triggeredByUser);
}

public boolean isDownstreamUserIgnored(String triggeredByUser) {
return projectGroupContext.projectGroup().users().ignoredDownstreamUsers().contains(triggeredByUser);
}

public String upstreamUser(String mappedValue) {
return projectGroupContext.upstreamUser(mappedValue);
}

public String upstreamStatus(String mappedValue) {
return projectGroupContext.upstreamStatus(mappedValue);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.hibernate.infra.replicate.jira.service.jira;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
Expand All @@ -17,9 +20,15 @@ public final class HandlerProjectGroupContext implements AutoCloseable {

private final ExecutorService eventHandlingExecutor;
private final Supplier<Integer> workQueueSize;

private final ExecutorService downstreamEventHandlingExecutor;
private final Supplier<Integer> downstreamWorkQueueSize;
private final ScheduledExecutorService rateLimiterExecutor = Executors.newScheduledThreadPool(1);
private final Semaphore rateLimiter;
private final Semaphore downstreamRateLimiter;
private final JiraConfig.JiraProjectGroup projectGroup;
private final Map<String, String> invertedUsers;
private final Map<String, String> invertedStatuses;

public HandlerProjectGroupContext(JiraConfig.JiraProjectGroup projectGroup) {
this.projectGroup = projectGroup;
Expand All @@ -28,21 +37,44 @@ public HandlerProjectGroupContext(JiraConfig.JiraProjectGroup projectGroup) {

final int permits = processing.eventsPerTimeframe();
this.rateLimiter = new Semaphore(permits);
this.downstreamRateLimiter = new Semaphore(permits);
rateLimiterExecutor.scheduleAtFixedRate(() -> {
rateLimiter.drainPermits();
rateLimiter.release(permits);
downstreamRateLimiter.drainPermits();
downstreamRateLimiter.release(permits);
}, processing.timeframeInSeconds(), processing.timeframeInSeconds(), TimeUnit.SECONDS);

LinkedBlockingDeque<Runnable> workQueue = new LinkedBlockingDeque<>(processing.queueSize());
workQueueSize = workQueue::size;
eventHandlingExecutor = new ThreadPoolExecutor(processing.threads(), processing.threads(), 0L,
TimeUnit.MILLISECONDS, workQueue);

LinkedBlockingDeque<Runnable> downstreamWorkQueue = new LinkedBlockingDeque<>(processing.queueSize());
downstreamWorkQueueSize = downstreamWorkQueue::size;
downstreamEventHandlingExecutor = new ThreadPoolExecutor(processing.threads(), processing.threads(), 0L,
TimeUnit.MILLISECONDS, downstreamWorkQueue);

this.invertedUsers = invert(projectGroup.users().mapping());
this.invertedStatuses = invert(projectGroup.statuses().mapping());
}

private static Map<String, String> invert(Map<String, String> map) {
Map<String, String> result = new HashMap<>();
for (var entry : map.entrySet()) {
result.put(entry.getValue(), entry.getKey());
}
return Collections.unmodifiableMap(result);
}

public void startProcessingEvent() throws InterruptedException {
rateLimiter.acquire(1);
}

public void startProcessingDownstreamEvent() throws InterruptedException {
downstreamRateLimiter.acquire(1);
}

public JiraConfig.JiraProjectGroup projectGroup() {
return projectGroup;
}
Expand All @@ -51,26 +83,48 @@ public int pendingEventsInCurrentContext() {
return workQueueSize.get();
}

public int pendingDownstreamEventsInCurrentContext() {
return downstreamWorkQueueSize.get();
}

public void submitTask(Runnable task) {
eventHandlingExecutor.submit(task);
}

public void submitDownstreamTask(Runnable task) {
downstreamEventHandlingExecutor.submit(task);
}

@Override
public void close() {
// when requesting to close the context we aren't expecting to process any other
// events hence there's no point in continuing "releasing" more "permits":
if (!rateLimiterExecutor.isShutdown()) {
rateLimiterExecutor.shutdownNow();
}
if (!eventHandlingExecutor.isShutdown()) {
closeEventExecutor(eventHandlingExecutor);
closeEventExecutor(downstreamEventHandlingExecutor);
}

private static void closeEventExecutor(ExecutorService executor) {
if (!executor.isShutdown()) {
try {
eventHandlingExecutor.shutdown();
if (!eventHandlingExecutor.awaitTermination(2, TimeUnit.MINUTES)) {
executor.shutdown();
if (!executor.awaitTermination(2, TimeUnit.MINUTES)) {
Log.warnf("Not all events were processed before the shutdown");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

public String upstreamUser(String mappedValue) {
return invertedUsers.get(mappedValue);
}

public String upstreamStatus(String mappedValue) {
return invertedStatuses.get(mappedValue);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.hibernate.infra.replicate.jira.service.jira.handler.JiraIssueDeleteEventHandler;
import org.hibernate.infra.replicate.jira.service.jira.handler.JiraIssueSimpleUpsertEventHandler;
import org.hibernate.infra.replicate.jira.service.jira.handler.JiraIssueTransitionOnlyEventHandler;
import org.hibernate.infra.replicate.jira.service.jira.model.action.JiraActionEvent;
import org.hibernate.infra.replicate.jira.service.jira.model.hook.JiraWebHookEvent;
import org.hibernate.infra.replicate.jira.service.jira.model.hook.JiraWebHookIssue;
import org.hibernate.infra.replicate.jira.service.jira.model.hook.JiraWebHookIssueLink;
Expand Down Expand Up @@ -46,6 +47,8 @@
@ApplicationScoped
public class JiraService {

private static final String SYSTEM_USER = "94KJcxFzgxZlXyTss4oR0rDNqtjwjhIiZLzYNx0Mwuc=";

private final ReportingConfig reportingConfig;
private final Map<String, HandlerProjectContext> contextPerProject;
private final JiraConfig jiraConfig;
Expand Down Expand Up @@ -297,8 +300,10 @@ public void registerManagementRoutes(@Observes ManagementInterface mi) {
* {@link JiraConfig.JiraProjectGroup#projects()}
* @param event
* The body of the event posted by the webhook.
* @param triggeredByUser
* The ID of the Jira user that triggered the webhook event.
*/
public void acknowledge(String project, JiraWebHookEvent event) {
public void acknowledge(String project, JiraWebHookEvent event, String triggeredByUser) {
event.eventType().ifPresentOrElse(eventType -> {
var context = contextPerProject.get(project);
if (context == null) {
Expand All @@ -309,12 +314,40 @@ public void acknowledge(String project, JiraWebHookEvent event) {
throw new ConstraintViolationException("Project " + project + " is not configured.", Set.of());
}

if (context.isUserIgnored(triggeredByUser)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note this means we will buffer a request body and deserialize the event before we check whether we ignore the request entirely (because of the user).
A potential improvement for later, to reduce the stress on the app, would be to implement this check as a filter instead. This might avoid the deserialization -- though probably not the buffering of the request body.
I say for later, because I'm not sure we even need such an optimization without our current workload :)

Log.infof("Event was triggered by %s user that is in the ignore list: %.200s", triggeredByUser, event);
return;
}

for (Runnable handler : eventType.handlers(reportingConfig, event, context)) {
context.submitTask(handler);
}
}, () -> Log.infof("Event type %s is not supported and cannot be handled.", event.webhookEvent));
}

public void downstreamAcknowledge(String project, JiraActionEvent event) {
event.eventType().ifPresentOrElse(eventType -> {
var context = contextPerProject.get(project);
if (context == null) {
FailureCollector failureCollector = FailureCollector.collector(reportingConfig);
failureCollector.critical("Unable to determine handler context for project %s. Was it not configured ?"
.formatted(project));
failureCollector.close();
throw new ConstraintViolationException("Project " + project + " is not configured.", Set.of());
}

if (context.isDownstreamUserIgnored(event.triggeredByUser)) {
Log.infof("Event was triggered by %s user that is in the ignore list.", event.triggeredByUser);
return;
}

for (Runnable handler : eventType.handlers(reportingConfig, event, context)) {
context.submitDownstreamTask(handler);
}
}, () -> Log.infof("Event type %s is not supported and cannot be handled.", event.event));

}

public void syncLastUpdated(String projectGroup) {
try (FailureCollector failureCollector = FailureCollector.collector(reportingConfig)) {
Log.infof("Starting scheduled sync of issues for the project group %s", projectGroup);
Expand Down Expand Up @@ -381,7 +414,7 @@ private void triggerSyncEvent(JiraIssue jiraIssue, HandlerProjectContext context
event.issue = issue;

String projectKey = Objects.toString(jiraIssue.fields.project.properties().get("key"));
acknowledge(projectKey, event);
acknowledge(projectKey, event, SYSTEM_USER);

// now sync comments:
if (jiraIssue.fields.comment != null && jiraIssue.fields.comment.comments != null) {
Expand All @@ -401,7 +434,7 @@ private void triggerSyncEvent(JiraIssue jiraIssue, HandlerProjectContext context
event.issueLink = new JiraWebHookIssueLink();
event.issueLink.id = Long.parseLong(link.id);

acknowledge(projectKey, event);
acknowledge(projectKey, event, SYSTEM_USER);
}
}
}
Expand All @@ -413,7 +446,7 @@ private void triggerCommentSyncEvents(String projectKey, JiraWebHookIssue issue,
event.comment.id = Long.parseLong(comment.id);
event.issue = issue;
event.webhookEvent = JiraWebhookEventType.COMMENT_UPDATED.getName();
acknowledge(projectKey, event);
acknowledge(projectKey, event, SYSTEM_USER);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
* version (included), e.g.:
*/
// so that we do not spam with all notifications ...
@ClientQueryParam(name = "notifyUsers", value = "false")
// since `notifyUsers=false` does not apply to all requests and we've disabled notifications downstream
// this query param is not sent anymore to allow automation updating upstream issues to work with a non-admin user.
// @ClientQueryParam(name = "notifyUsers", value = "false")
public interface JiraRestClient {

@GET
Expand Down
Loading
Loading