Skip to content

Commit

Permalink
chore: add activity logs for Inbound Connector (#2519)
Browse files Browse the repository at this point in the history
* chore: add activity logs for Inbound Connector

* chore: add activity logs when message received
  • Loading branch information
Oleksiivanov authored May 15, 2024
1 parent a98f3aa commit 321ef40
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
import com.amazonaws.services.sns.message.SnsSubscriptionConfirmation;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.camunda.connector.api.annotation.InboundConnector;
import io.camunda.connector.api.inbound.Activity;
import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.Severity;
import io.camunda.connector.api.inbound.webhook.MappedHttpRequest;
import io.camunda.connector.api.inbound.webhook.WebhookConnectorExecutable;
import io.camunda.connector.api.inbound.webhook.WebhookProcessingPayload;
Expand All @@ -32,6 +34,8 @@
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InboundConnector(name = "AWS SNS Inbound", type = "io.camunda:aws-sns-webhook:1")
@ElementTemplate(
Expand Down Expand Up @@ -67,6 +71,7 @@
templateNameOverride = "SNS HTTPS Boundary Event Connector")
})
public class SnsWebhookExecutable implements WebhookConnectorExecutable {
private static final Logger LOGGER = LoggerFactory.getLogger(SnsWebhookExecutable.class);

protected static final String TOPIC_ARN_HEADER = "x-amz-sns-topic-arn";

Expand All @@ -89,6 +94,16 @@ public SnsWebhookExecutable(
@Override
public WebhookResult triggerWebhook(WebhookProcessingPayload webhookProcessingPayload)
throws Exception {
LOGGER.trace(
"Triggered webhook with context {} and payload {}",
props.context(),
webhookProcessingPayload);

context.log(
Activity.level(Severity.INFO)
.tag(webhookProcessingPayload.method())
.message("Url: " + webhookProcessingPayload.requestURL()));

checkMessageAllowListed(webhookProcessingPayload);
Map bodyAsMap = objectMapper.readValue(webhookProcessingPayload.rawBody(), Map.class);
String region = extractRegionFromTopicArnHeader(webhookProcessingPayload.headers());
Expand All @@ -100,7 +115,13 @@ public WebhookResult triggerWebhook(WebhookProcessingPayload webhookProcessingPa
} else if (msg instanceof SnsNotification) {
return handleNotification(webhookProcessingPayload, bodyAsMap);
} else {
throw new IOException("Operation not supported: " + msg.getClass().getName());
String errorMessage = "Operation not supported: " + msg.getClass().getName();
LOGGER.warn(errorMessage);
context.log(
Activity.level(Severity.ERROR)
.tag(webhookProcessingPayload.method())
.message("Url: " + webhookProcessingPayload.requestURL() + ". " + errorMessage));
throw new IOException(errorMessage);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@

import com.amazonaws.services.sqs.AmazonSQS;
import io.camunda.connector.api.annotation.InboundConnector;
import io.camunda.connector.api.inbound.Activity;
import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.InboundConnectorExecutable;
import io.camunda.connector.api.inbound.Severity;
import io.camunda.connector.aws.AwsUtils;
import io.camunda.connector.aws.CredentialsProviderSupport;
import io.camunda.connector.common.suppliers.AmazonSQSClientSupplier;
Expand Down Expand Up @@ -89,10 +91,13 @@ public SqsExecutable(

@Override
public void activate(final InboundConnectorContext context) {
SqsInboundProperties properties = context.bindProperties(SqsInboundProperties.class);
LOGGER.info("Subscription activation requested by the Connector runtime: {}", properties);

this.context = context;
LOGGER.info("Subscription activation requested by the Connector runtime");
context.log(
Activity.level(Severity.INFO)
.tag("Subscription activation")
.message("Subscription activation requested"));
SqsInboundProperties properties = context.bindProperties(SqsInboundProperties.class);

var region =
AwsUtils.extractRegionOrDefault(
Expand All @@ -106,14 +111,21 @@ public void activate(final InboundConnectorContext context) {
}
executorService.execute(sqsQueueConsumer);
LOGGER.debug("SQS queue consumer started successfully");
context.log(
Activity.level(Severity.INFO)
.tag("Subscription activation")
.message("Activated subscription for queue: " + properties.getQueue().url()));
context.reportHealth(Health.up());
}

@Override
public void deactivate() {

sqsQueueConsumer.setQueueConsumerActive(false);
LOGGER.debug("Deactivating subscription");
context.log(
Activity.level(Severity.INFO)
.tag("Subscription activation")
.message("Deactivating subscription"));
context.reportHealth(Health.down());
if (executorService != null) {
LOGGER.debug("Shutting down executor service");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import io.camunda.connector.api.error.ConnectorInputException;
import io.camunda.connector.api.inbound.Activity;
import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.Severity;
import io.camunda.connector.inbound.model.SqsInboundProperties;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -50,15 +52,27 @@ public void run() {
receiveMessageResult = sqsClient.receiveMessage(receiveMessageRequest);
List<Message> messages = receiveMessageResult.getMessages();
for (Message message : messages) {
context.log(
Activity.level(Severity.INFO)
.tag("Message")
.message("Received SQS Message with ID " + message.getMessageId()));
try {
context.correlate(MessageMapper.toSqsInboundMessage(message));
sqsClient.deleteMessage(properties.getQueue().url(), message.getReceiptHandle());
} catch (ConnectorInputException e) {
LOGGER.warn("NACK - failed to parse SQS message body: {}", e.getMessage());
context.log(
Activity.level(Severity.WARNING)
.tag("Message")
.message("NACK - failed to parse SQS message body: " + e.getMessage()));
}
}
} catch (Exception e) {
LOGGER.debug("NACK - failed to correlate event", e);
context.log(
Activity.level(Severity.WARNING)
.tag("Message")
.message("NACK - failed to correlate event : " + e.getMessage()));
}
} while (queueConsumerActive.get());
LOGGER.info("Stopping SQS consumer for queue {}", properties.getQueue().url());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.module.scala.DefaultScalaModule$;
import io.camunda.connector.api.error.ConnectorInputException;
import io.camunda.connector.api.inbound.Activity;
import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.Severity;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -114,6 +116,10 @@ private void prepareConsumer() {
reportUp();
} catch (Exception ex) {
LOG.error("Failed to initialize connector: {}", ex.getMessage());
context.log(
Activity.level(Severity.ERROR)
.tag("Subscription")
.message("Failed to initialize connector: " + ex.getMessage()));
context.reportHealth(Health.down(ex));
throw ex;
}
Expand Down Expand Up @@ -172,6 +178,10 @@ private void pollAndPublish() {

private void handleMessage(ConsumerRecord<Object, Object> record) {
LOG.trace("Kafka message received: key = {}, value = {}", record.key(), record.value());
context.log(
Activity.level(Severity.INFO)
.tag("Message")
.message("Received message with key : " + record.key()));
var reader = avroObjectReader != null ? avroObjectReader : objectMapper.reader();
var mappedMessage = convertConsumerRecordToKafkaInboundMessage(record, reader);
this.context.correlate(mappedMessage);
Expand Down Expand Up @@ -205,6 +215,10 @@ private void reportUp() {

private void reportDown(Throwable error) {
var newStatus = Health.down(error);
context.log(
Activity.level(Severity.ERROR)
.tag("Kafka Consumer")
.message("Kafka Consumer status changed to DOWN: " + newStatus));
if (!newStatus.equals(consumerStatus)) {
consumerStatus = newStatus;
context.reportHealth(Health.down(error));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
package io.camunda.connector.kafka.inbound;

import io.camunda.connector.api.annotation.InboundConnector;
import io.camunda.connector.api.inbound.Activity;
import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.InboundConnectorExecutable;
import io.camunda.connector.api.inbound.Severity;
import io.camunda.connector.generator.dsl.BpmnType;
import io.camunda.connector.generator.java.annotation.ElementTemplate;
import java.util.Properties;
Expand Down Expand Up @@ -71,15 +73,30 @@ public KafkaExecutable() {
}

@Override
public void activate(InboundConnectorContext connectorContext) {
public void activate(InboundConnectorContext context) {
try {
LOG.info("Subscription activation requested by the Connector runtime");
context.log(
Activity.level(Severity.INFO)
.tag("Subscription activation")
.message("Subscription activation requested by the Connector runtime"));

KafkaConnectorProperties elementProps =
connectorContext.bindProperties(KafkaConnectorProperties.class);
context.bindProperties(KafkaConnectorProperties.class);
this.kafkaConnectorConsumer =
new KafkaConnectorConsumer(consumerCreatorFunction, connectorContext, elementProps);
new KafkaConnectorConsumer(consumerCreatorFunction, context, elementProps);
this.kafkaConnectorConsumer.startConsumer();
context.log(
Activity.level(Severity.INFO)
.tag("Subscription activation")
.message("Subscription activated successfully"));
} catch (Exception ex) {
connectorContext.reportHealth(Health.down(ex));
context.log(
Activity.level(Severity.ERROR)
.tag("Subscription activation")
.message("Subscription activation failed: " + ex.getMessage()));
context.reportHealth(Health.down(ex));
LOG.warn("Subscription activation failed: ", ex);
throw ex;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import io.camunda.connector.api.error.ConnectorInputException;
import io.camunda.connector.api.inbound.Activity;
import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.Severity;
import io.camunda.connector.rabbitmq.inbound.model.RabbitMqInboundResult;
import io.camunda.connector.rabbitmq.inbound.model.RabbitMqInboundResult.RabbitMqInboundMessage;
import io.camunda.connector.rabbitmq.supplier.ObjectMapperSupplier;
Expand All @@ -43,15 +45,27 @@ public void handleDelivery(
throws IOException {

LOGGER.debug("Received AMQP message with delivery tag {}", envelope.getDeliveryTag());
context.log(
Activity.level(Severity.INFO)
.tag("Message")
.message("Received AMQP message with delivery tag " + envelope.getDeliveryTag()));
try {
RabbitMqInboundResult variables = prepareVariables(consumerTag, properties, body);
context.correlate(variables);
getChannel().basicAck(envelope.getDeliveryTag(), false);
} catch (ConnectorInputException e) {
LOGGER.warn("NACK (no requeue) - failed to parse AMQP message body: {}", e.getMessage());
context.log(
Activity.level(Severity.WARNING)
.tag("Message")
.message("NACK (no requeue) - failed to parse AMQP message body: " + e.getMessage()));
getChannel().basicReject(envelope.getDeliveryTag(), false);
} catch (Exception e) {
LOGGER.debug("NACK (requeue) - failed to correlate event", e);
context.log(
Activity.level(Severity.DEBUG)
.tag("Message")
.message("NACK (requeue) - failed to correlate event"));
getChannel().basicReject(envelope.getDeliveryTag(), true);
}
}
Expand All @@ -60,6 +74,10 @@ public void handleDelivery(
public void handleCancel(String consumerTag) {
LOGGER.info("Consumer cancelled: {}", consumerTag);
try {
context.log(
Activity.level(Severity.INFO)
.tag("Subscription")
.message("Consumer cancelled: " + consumerTag));
context.cancel(null);
} catch (Exception e) {
context.reportHealth(Health.down(e));
Expand All @@ -70,6 +88,10 @@ public void handleCancel(String consumerTag) {
@Override
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
LOGGER.error("Consumer shutdown: {}", consumerTag, sig);
context.log(
Activity.level(Severity.INFO)
.tag("Subscription")
.message("Consumer shutdown: " + consumerTag + sig));
try {
context.cancel(sig);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import io.camunda.connector.api.annotation.InboundConnector;
import io.camunda.connector.api.inbound.Activity;
import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.InboundConnectorExecutable;
import io.camunda.connector.api.inbound.Severity;
import io.camunda.connector.generator.dsl.BpmnType;
import io.camunda.connector.generator.java.annotation.ElementTemplate;
import io.camunda.connector.rabbitmq.inbound.model.RabbitMqInboundProperties;
Expand Down Expand Up @@ -82,7 +84,11 @@ public void activate(InboundConnectorContext context) throws Exception {
RabbitMqInboundProperties properties = context.bindProperties(RabbitMqInboundProperties.class);

LOGGER.info("Subscription activation requested by the Connector runtime: {}", properties);

context.log(
Activity.level(Severity.INFO)
.tag("Subscription activation")
.message(
"Subscription activation requested for queue name :" + properties.getQueueName()));
connection = openConnection(properties);
channel = connection.createChannel();
Consumer consumer = new RabbitMqConsumer(channel, context);
Expand All @@ -96,6 +102,11 @@ public void activate(InboundConnectorContext context) throws Exception {

consumerTag = startConsumer(properties, consumer);
LOGGER.info("Started RabbitMQ consumer for queue {}", properties.getQueueName());
context.log(
Activity.level(Severity.INFO)
.tag("Subscription activation")
.message("Activated subscription for queue: " + properties.getQueueName()));
context.reportHealth(Health.up());
}

@Override
Expand Down

0 comments on commit 321ef40

Please sign in to comment.