diff --git a/airbyte-integrations/connectors/destination-e2e-test/metadata.yaml b/airbyte-integrations/connectors/destination-e2e-test/metadata.yaml index 142ee7791d41..3619cfd1573b 100644 --- a/airbyte-integrations/connectors/destination-e2e-test/metadata.yaml +++ b/airbyte-integrations/connectors/destination-e2e-test/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: unknown connectorType: destination definitionId: 2eb65e87-983a-4fd7-b3e3-9d9dc6eb8537 - dockerImageTag: 0.3.3 + dockerImageTag: 0.3.4 dockerRepository: airbyte/destination-e2e-test githubIssueLabel: destination-e2e-test icon: airbyte.svg diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/LoggingConsumer.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/LoggingConsumer.java index 2700adcd96e0..f63c4699446f 100644 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/LoggingConsumer.java +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/LoggingConsumer.java @@ -52,24 +52,21 @@ public void accept(final AirbyteMessage message) { if (message.getType() == Type.STATE) { LOGGER.info("Emitting state: {}", message); outputRecordCollector.accept(message); - return; } else if (message.getType() == Type.TRACE) { LOGGER.info("Received a trace: {}", message); - } else if (message.getType() != Type.RECORD) { - return; - } + } else if (message.getType() == Type.RECORD) { + final AirbyteRecordMessage recordMessage = message.getRecord(); + final AirbyteStreamNameNamespacePair pair = AirbyteStreamNameNamespacePair.fromRecordMessage(recordMessage); - final AirbyteRecordMessage recordMessage = message.getRecord(); - final AirbyteStreamNameNamespacePair pair = AirbyteStreamNameNamespacePair.fromRecordMessage(recordMessage); + if (!loggers.containsKey(pair)) { + throw new IllegalArgumentException( + String.format( + "Message contained record from a stream that was not in the catalog.\n Catalog: %s\n Message: %s", + Jsons.serialize(configuredCatalog), Jsons.serialize(recordMessage))); + } - if (!loggers.containsKey(pair)) { - throw new IllegalArgumentException( - String.format( - "Message contained record from a stream that was not in the catalog.\n Catalog: %s\n Message: %s", - Jsons.serialize(configuredCatalog), Jsons.serialize(recordMessage))); + loggers.get(pair).log(recordMessage); } - - loggers.get(pair).log(recordMessage); } @Override diff --git a/docs/integrations/destinations/e2e-test.md b/docs/integrations/destinations/e2e-test.md index f1d17327fac0..15cdefaf88e1 100644 --- a/docs/integrations/destinations/e2e-test.md +++ b/docs/integrations/destinations/e2e-test.md @@ -46,6 +46,7 @@ The OSS and Cloud variants have the same version number starting from version `0 | Version | Date | Pull Request | Subject | |:--------|:-----------| :------------------------------------------------------- |:----------------------------------------------------------| +| 0.3.4 | 2024-04-16 | [37366](https://github.com/airbytehq/airbyte/pull/37366) | Fix NPE | | 0.3.3 | 2024-04-16 | [37366](https://github.com/airbytehq/airbyte/pull/37366) | Fix Log trace messages | | 0.3.2 | 2024-02-14 | [36812](https://github.com/airbytehq/airbyte/pull/36812) | Log trace messages | | 0.3.1 | 2024-02-14 | [35278](https://github.com/airbytehq/airbyte/pull/35278) | Adopt CDK 0.20.6 |