Skip to content

Commit

Permalink
feat(inbound)!: message acknowledgement config (#2333)
Browse files Browse the repository at this point in the history
* feat(inbound): message acknowledgement config

* rework the error handling strategy in correlation response

* apply review suggestions

* resolve conflicts
  • Loading branch information
chillleader authored May 21, 2024
1 parent e279160 commit 61b48fe
Show file tree
Hide file tree
Showing 43 changed files with 734 additions and 346 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ public class Keywords {
*/
public static final String ACTIVATION_CONDITION_KEYWORD = "activationCondition";

/**
* The keyword that defines whether connector should reject or consume events that did not lead to
* a successful activation due to unmatched activation condition.
*
* <p>This value only exists for inbound Connectors and comes from the extension properties of a
* BPMN element.
*/
public static final String CONSUME_UNMATCHED_EVENTS_KEYWORD = "consumeUnmatchedEvents";

@Deprecated
public static final String DEPRECATED_ACTIVATION_CONDITION_KEYWORD =
"inbound.activationCondition";
Expand Down Expand Up @@ -118,19 +127,35 @@ public enum DeduplicationMode {

public static final String DEDUPLICATION_ID_KEYWORD = "deduplicationId";

public static final Set<String> ALL_KEYWORDS =
/**
* Properties that are handled by the connector runtime and should not be passed to the inbound
* connector along with the properties defined by the connector.
*/
public static final Set<String> INBOUND_RUNTIME_PROPERTIES =
Set.of(
RESULT_VARIABLE_KEYWORD,
RESULT_EXPRESSION_KEYWORD,
ERROR_EXPRESSION_KEYWORD,
INBOUND_TYPE_KEYWORD,
DEDUPLICATION_MODE_KEYWORD,
DEDUPLICATION_ID_KEYWORD,
MESSAGE_ID_EXPRESSION,
CORRELATION_KEY_EXPRESSION_KEYWORD,
DEPRECATED_ACTIVATION_CONDITION_KEYWORD,
ACTIVATION_CONDITION_KEYWORD,
CONSUME_UNMATCHED_EVENTS_KEYWORD,
MESSAGE_TTL);

/**
* Subset of {@link #INBOUND_RUNTIME_PROPERTIES} that should not be used for connector
* deduplication
*/
public static final Set<String> PROPERTIES_EXCLUDED_FROM_DEDUPLICATION =
Set.of(
INBOUND_TYPE_KEYWORD,
RETRY_BACKOFF_KEYWORD,
DEDUPLICATION_MODE_KEYWORD,
DEDUPLICATION_ID_KEYWORD,
MESSAGE_ID_EXPRESSION,
CORRELATION_KEY_EXPRESSION_KEYWORD,
DEPRECATED_ACTIVATION_CONDITION_KEYWORD,
ACTIVATION_CONDITION_KEYWORD,
MESSAGE_TTL,
CORRELATION_REQUIRED_KEYWORD,
DEDUPLICATION_MODE_MANUAL_FLAG_KEYWORD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,8 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.EvictingQueue;
import io.camunda.connector.api.error.ConnectorException;
import io.camunda.connector.api.error.ConnectorInputException;
import io.camunda.connector.api.inbound.Activity;
import io.camunda.connector.api.inbound.CorrelationResult;
import io.camunda.connector.api.inbound.CorrelationResult.Failure.ActivationConditionNotMet;
import io.camunda.connector.api.inbound.CorrelationResult.Failure.Other;
import io.camunda.connector.api.inbound.CorrelationResult.Success;
import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.InboundConnectorDefinition;
Expand Down Expand Up @@ -80,34 +75,6 @@ public InboundConnectorContextImpl(
this.logs = logs;
}

@Override
public void correlate(Object variables) {
var result = correlateWithResult(variables);
if (result == null) {
log(
Activity.level(Severity.ERROR)
.tag("error")
.message("Failed to correlate inbound event, result is null"));
throw new ConnectorException("Failed to correlate inbound event, result is null");
}
if (result instanceof ActivationConditionNotMet || result instanceof Success) {
return;
}
if (result instanceof CorrelationResult.Failure.InvalidInput invalidInput) {
log(Activity.level(Severity.ERROR).tag("error").message(invalidInput.message()));
throw new ConnectorInputException(invalidInput.message(), invalidInput.error());
}
if (result instanceof Other exception) {
log(Activity.level(Severity.ERROR).tag("error").message(exception.error().getMessage()));
throw new ConnectorException(exception.error());
}
log(
Activity.level(Severity.ERROR)
.tag("error")
.message("Failed to correlate inbound event, details: " + result));
throw new ConnectorException("Failed to correlate inbound event, details: " + result);
}

@Override
public CorrelationResult correlateWithResult(Object variables) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ public String activationCondition() {
.orElseGet(() -> rawProperties.get(Keywords.DEPRECATED_ACTIVATION_CONDITION_KEYWORD));
}

public boolean consumeUnmatchedEvents() {
return Optional.ofNullable(rawProperties.get(Keywords.CONSUME_UNMATCHED_EVENTS_KEYWORD))
.map(Boolean::parseBoolean)
.orElse(false);
}

public String deduplicationId(List<String> deduplicationProperties) {
LOG.debug("Computing deduplicationId for element {}", element.elementId());
var deduplicationMode = rawProperties.get(Keywords.DEDUPLICATION_MODE_KEYWORD);
Expand Down Expand Up @@ -98,7 +104,7 @@ private String computeDeduplicationId(List<String> deduplicationProperties) {
.filter(Objects::nonNull)
.toList();
} else {
propsToHash = rawPropertiesWithoutKeywords().values().stream().toList();
propsToHash = propertiesForDeduplication().values().stream().toList();
}
if (propsToHash.isEmpty()) {
throw new InvalidInboundConnectorDefinitionException(
Expand All @@ -107,9 +113,15 @@ private String computeDeduplicationId(List<String> deduplicationProperties) {
return tenantId() + "-" + element.bpmnProcessId() + "-" + Objects.hash(propsToHash);
}

public Map<String, String> rawPropertiesWithoutKeywords() {
public Map<String, String> connectorLevelProperties() {
return rawProperties.entrySet().stream()
.filter(e -> !Keywords.INBOUND_RUNTIME_PROPERTIES.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

public Map<String, String> propertiesForDeduplication() {
return rawProperties.entrySet().stream()
.filter(e -> !Keywords.ALL_KEYWORDS.contains(e.getKey()))
.filter(e -> !Keywords.PROPERTIES_EXCLUDED_FROM_DEDUPLICATION.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,6 @@ private ProcessInstanceContext createProcessInstanceContext(FlowNodeInstance nod
this, node, validationProvider, correlationHandler, objectMapper, variableSupplier);
}

@Override
@Deprecated
public void correlate(final Object variables) {
inboundContext.correlate(variables);
}

@Override
public CorrelationResult correlateWithResult(Object variables) {
return inboundContext.correlateWithResult(variables);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ public CorrelationResult correlate(
}

if (matchingElements.isEmpty()) {
return ActivationConditionNotMet.INSTANCE;
var discardUnmatchedEvents =
elements.stream()
.map(InboundConnectorElement::consumeUnmatchedEvents)
.anyMatch(e -> e.equals(Boolean.TRUE));
return new ActivationConditionNotMet(discardUnmatchedEvents);
}
if (matchingElements.size() > 1) {
return new Failure.InvalidInput("Multiple connectors are activated for the same input", null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ private static Map<String, String> extractRawProperties(List<InboundConnectorEle

var distinctPropertySets =
elements.stream()
.collect(Collectors.groupingBy(InboundConnectorElement::rawPropertiesWithoutKeywords));
.collect(Collectors.groupingBy(InboundConnectorElement::propertiesForDeduplication));
if (distinctPropertySets.size() > 1) {
Set<String> divergingProperties = getDivergingProperties(distinctPropertySets);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,10 @@ void activationCondition_deprecated() {
}

@Test
void rawPropertiesWithoutKeywords() {
void connectorLevelProperties() {
// given
var keywordProps =
Keywords.ALL_KEYWORDS.stream()
Keywords.INBOUND_RUNTIME_PROPERTIES.stream()
.map(k -> Map.entry(k, "value"))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
var withCustomProps = new HashMap<>(keywordProps);
Expand All @@ -298,7 +298,7 @@ void rawPropertiesWithoutKeywords() {
new ProcessElement("myProcess", 0, 0, "element1", "<default>"));

// when
var result = testObj.rawPropertiesWithoutKeywords();
var result = testObj.connectorLevelProperties();

// then
assertThat(result).containsOnlyKeys("property");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.camunda.connector.api.inbound.CorrelationFailureHandlingStrategy;
import io.camunda.connector.api.inbound.CorrelationResult.Failure;
import io.camunda.connector.api.inbound.CorrelationResult.Success;
import io.camunda.connector.feel.FeelEngineWrapper;
Expand Down Expand Up @@ -323,17 +324,37 @@ void multipleElements_multipleMatches_errorRaised() {
class ActivationCondition {

@Test
void activationConditionFalse_shouldNotCorrelate() {
void activationConditionFalse_strategyForwardErrorToUpstream() {
// given
var element = mock(InboundConnectorElement.class);
when(element.activationCondition()).thenReturn("=testKey=\"otherValue\"");
when(element.consumeUnmatchedEvents()).thenReturn(false);

Map<String, Object> variables = Map.of("testKey", "testValue");

// when & then
var result = assertDoesNotThrow(() -> handler.correlate(List.of(element), variables));
verifyNoMoreInteractions(zeebeClient);
assertThat(result).isInstanceOf(Failure.ActivationConditionNotMet.class);
assertThat(((Failure.ActivationConditionNotMet) result).handlingStrategy())
.isInstanceOf(CorrelationFailureHandlingStrategy.ForwardErrorToUpstream.class);
}

@Test
void activationConditionFalse_strategyIgnore() {
// given
var element = mock(InboundConnectorElement.class);
when(element.activationCondition()).thenReturn("=testKey=\"otherValue\"");
when(element.consumeUnmatchedEvents()).thenReturn(true);

Map<String, Object> variables = Map.of("testKey", "testValue");

// when & then
var result = assertDoesNotThrow(() -> handler.correlate(List.of(element), variables));
verifyNoMoreInteractions(zeebeClient);
assertThat(result).isInstanceOf(Failure.ActivationConditionNotMet.class);
assertThat(((Failure) result).handlingStrategy())
.isInstanceOf(CorrelationFailureHandlingStrategy.Ignore.class);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private Map<String, Object> getData(ActiveExecutableResponse connector) {
if (executableClass != null
&& WebhookConnectorExecutable.class.isAssignableFrom(executableClass)) {
try {
var properties = connector.elements().getFirst().rawPropertiesWithoutKeywords();
var properties = connector.elements().getFirst().connectorLevelProperties();
var contextPath = properties.get("inbound.context");
data = Map.of("path", contextPath);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public void testSuccessfulProcessingWithFailedActivation() throws Exception {

var correlationHandlerMock = mock(InboundCorrelationHandler.class);
when(correlationHandlerMock.correlate(any(), any()))
.thenReturn(new CorrelationResult.Failure.ActivationConditionNotMet());
.thenReturn(new CorrelationResult.Failure.ActivationConditionNotMet(false));

var webhookDef = webhookDefinition("nonExistingProcess", 1, "myPath");
var webhookContext =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. Camunda licenses this file to you under the Apache License,
* Version 2.0; you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.camunda.connector.api.inbound;

/**
* Strategy to handle correlation failures. The Connector runtime provides this strategy to the
* connector implementation as a hint on how to handle the failure. It's strongly recommended to
* follow the strategy provided by the runtime.
*
* <p>The Connector runtime may provide this information based on:
*
* <ul>
* <li>the type of the failure
* <li>the configuration of the connector provided by the user
* <li>the configuration of the Connector runtime
* </ul>
*/
public sealed interface CorrelationFailureHandlingStrategy {

/**
* When the connector implementation receives this strategy from the runtime, it should forward
* the error to the upstream system. The exact way of how to do this is up to the connector and
* the upstream system, however, it is expected that the upstream system receives the error and
* can handle it.
*
* <p>For example, the connector that consumes messages from a message queue may redeliver the
* message to the queue, so the upstream system can retry processing the message. Depending on the
* value of {@code isRetryable}, the upstream system may retry the operation at a later point in
* time. A message queue connector may set the corresponding flag in the message metadata to
* indicate whether the message should be redelivered or not.
*
* @param isRetryable whether the error is retryable or not. If the error is retryable, the
* upstream system may retry the operation at a later point in time (e.g. by redelivering the
* message).
*/
record ForwardErrorToUpstream(boolean isRetryable) implements CorrelationFailureHandlingStrategy {

public static final ForwardErrorToUpstream RETRYABLE = new ForwardErrorToUpstream(true);
public static final ForwardErrorToUpstream NON_RETRYABLE = new ForwardErrorToUpstream(false);
}

/**
* When the connector implementation receives this strategy from the runtime, it should ignore the
* error and continue processing. This strategy is useful when the error is an expected outcome,
* and the event can be safely discarded. The exact way of how to do this is up to the connector
* and the upstream system, however, it is expected that the upstream system doesn't receive the
* error.
*/
final class Ignore implements CorrelationFailureHandlingStrategy {

public static final Ignore INSTANCE = new Ignore();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package io.camunda.connector.api.inbound;

import com.fasterxml.jackson.annotation.JsonIgnore;
import io.camunda.connector.api.inbound.CorrelationFailureHandlingStrategy.ForwardErrorToUpstream;
import io.camunda.connector.api.inbound.CorrelationFailureHandlingStrategy.Ignore;

public sealed interface CorrelationResult {

Expand All @@ -40,27 +42,44 @@ record MessageAlreadyCorrelated(@JsonIgnore ProcessElementContext activatedEleme

sealed interface Failure extends CorrelationResult {

default boolean isRetryable() {
return false;
String message();

default CorrelationFailureHandlingStrategy handlingStrategy() {
return ForwardErrorToUpstream.RETRYABLE;
}

record InvalidInput(String message, Throwable error) implements Failure {}
record InvalidInput(String message, Throwable error) implements Failure {

record ActivationConditionNotMet() implements Failure {
public static final ActivationConditionNotMet INSTANCE = new ActivationConditionNotMet();
@Override
public CorrelationFailureHandlingStrategy handlingStrategy() {
return ForwardErrorToUpstream.NON_RETRYABLE;
}
}

record ZeebeClientStatus(String status, String message) implements Failure {
record ActivationConditionNotMet(boolean consumeUnmatched) implements Failure {

@Override
public boolean isRetryable() {
return true;
public String message() {
return "Activation condition not met";
}

@Override
public CorrelationFailureHandlingStrategy handlingStrategy() {
if (consumeUnmatched) {
return Ignore.INSTANCE;
} else {
return ForwardErrorToUpstream.NON_RETRYABLE;
}
}
}

record ZeebeClientStatus(String status, String message) implements Failure {}

record Other(Throwable error) implements Failure {

@Override
public boolean isRetryable() {
return true;
public String message() {
return error.getMessage();
}
}
}
Expand Down
Loading

0 comments on commit 61b48fe

Please sign in to comment.