diff --git a/airbyte-integrations/connectors/source-dynamodb/metadata.yaml b/airbyte-integrations/connectors/source-dynamodb/metadata.yaml index 1a6cf19cd0be..b94512975d7e 100644 --- a/airbyte-integrations/connectors/source-dynamodb/metadata.yaml +++ b/airbyte-integrations/connectors/source-dynamodb/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: api connectorType: source definitionId: 50401137-8871-4c5a-abb7-1f5fda35545a - dockerImageTag: 0.3.1 + dockerImageTag: 0.3.2 dockerRepository: airbyte/source-dynamodb documentationUrl: https://docs.airbyte.com/integrations/sources/dynamodb githubIssueLabel: source-dynamodb diff --git a/airbyte-integrations/connectors/source-dynamodb/src/main/java/io/airbyte/integrations/source/dynamodb/DynamodbConfig.java b/airbyte-integrations/connectors/source-dynamodb/src/main/java/io/airbyte/integrations/source/dynamodb/DynamodbConfig.java index 2fd2c10c787f..4854994b9b09 100644 --- a/airbyte-integrations/connectors/source-dynamodb/src/main/java/io/airbyte/integrations/source/dynamodb/DynamodbConfig.java +++ b/airbyte-integrations/connectors/source-dynamodb/src/main/java/io/airbyte/integrations/source/dynamodb/DynamodbConfig.java @@ -20,7 +20,9 @@ public record DynamodbConfig( String secretKey, - List reservedAttributeNames + List reservedAttributeNames, + + boolean ignoreMissingPermissions ) { @@ -32,12 +34,14 @@ public static DynamodbConfig createDynamodbConfig(JsonNode jsonNode) { JsonNode endpoint = jsonNode.get("endpoint"); JsonNode region = jsonNode.get("region"); JsonNode attributeNames = jsonNode.get("reserved_attribute_names"); + JsonNode missingPermissions = jsonNode.get("ignore_missing_read_permissions_tables"); return new DynamodbConfig( endpoint != null && !endpoint.asText().isBlank() ? URI.create(endpoint.asText()) : null, region != null && !region.asText().isBlank() ? Region.of(region.asText()) : null, accessKeyId != null && !accessKeyId.asText().isBlank() ? accessKeyId.asText() : null, secretAccessKey != null && !secretAccessKey.asText().isBlank() ? secretAccessKey.asText() : null, - attributeNames != null ? Arrays.asList(attributeNames.asText().split("\\s*,\\s*")) : List.of()); + attributeNames != null ? Arrays.asList(attributeNames.asText().split("\\s*,\\s*")) : List.of(), + missingPermissions != null ? missingPermissions.asBoolean() : false); } } diff --git a/airbyte-integrations/connectors/source-dynamodb/src/main/java/io/airbyte/integrations/source/dynamodb/DynamodbSource.java b/airbyte-integrations/connectors/source-dynamodb/src/main/java/io/airbyte/integrations/source/dynamodb/DynamodbSource.java index f3ffab950c66..9374aaf742c1 100644 --- a/airbyte-integrations/connectors/source-dynamodb/src/main/java/io/airbyte/integrations/source/dynamodb/DynamodbSource.java +++ b/airbyte-integrations/connectors/source-dynamodb/src/main/java/io/airbyte/integrations/source/dynamodb/DynamodbSource.java @@ -28,6 +28,7 @@ import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.v0.SyncMode; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -35,6 +36,7 @@ import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.dynamodb.model.DynamoDbException; public class DynamodbSource extends BaseConnector implements Source { @@ -70,23 +72,39 @@ public AirbyteConnectionStatus check(final JsonNode config) { public AirbyteCatalog discover(final JsonNode config) { final var dynamodbConfig = DynamodbConfig.createDynamodbConfig(config); + List airbyteStreams = new ArrayList<>(); try (final var dynamodbOperations = new DynamodbOperations(dynamodbConfig)) { - final var airbyteStreams = dynamodbOperations.listTables().stream() - .map(tb -> new AirbyteStream() - .withName(tb) - .withJsonSchema(Jsons.jsonNode(ImmutableMap.builder() - .put("type", "object") - .put("properties", dynamodbOperations.inferSchema(tb, 1000)) - .build())) - .withSourceDefinedPrimaryKey(Collections.singletonList(dynamodbOperations.primaryKey(tb))) - .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))) - .toList(); - - return new AirbyteCatalog().withStreams(airbyteStreams); + dynamodbOperations.listTables().forEach(table -> { + try { + airbyteStreams.add( + new AirbyteStream() + .withName(table) + .withJsonSchema(Jsons.jsonNode(ImmutableMap.builder() + .put("type", "object") + // will throw DynamoDbException if it can't scan the table from missing read permissions + .put("properties", dynamodbOperations.inferSchema(table, 1000)) + .build())) + .withSourceDefinedPrimaryKey(Collections.singletonList(dynamodbOperations.primaryKey(table))) + .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))); + } catch (DynamoDbException e) { + if (dynamodbConfig.ignoreMissingPermissions()) { + // fragile way to check for missing read access but there is no dedicated exception for missing + // permissions. + if (e.getMessage().contains("not authorized")) { + LOGGER.warn("Connector doesn't have READ access for the table {}", table); + } else { + throw e; + } + } else { + throw e; + } + } + }); } + return new AirbyteCatalog().withStreams(airbyteStreams); } @Override diff --git a/airbyte-integrations/connectors/source-dynamodb/src/main/resources/spec.json b/airbyte-integrations/connectors/source-dynamodb/src/main/resources/spec.json index 6b84ae48a5eb..a4d956aab30c 100644 --- a/airbyte-integrations/connectors/source-dynamodb/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-dynamodb/src/main/resources/spec.json @@ -110,6 +110,12 @@ "description": "Comma separated reserved attribute names present in your tables", "airbyte_secret": true, "examples": ["name, field_name, field-name"] + }, + "ignore_missing_read_permissions_tables": { + "title": "Ignore missing read permissions tables", + "type": "boolean", + "description": "Ignore tables with missing scan/read permissions", + "default": false } } } diff --git a/airbyte-integrations/connectors/source-dynamodb/src/test/java/io/airbyte/integrations/source/dynamodb/DynamodbConfigTest.java b/airbyte-integrations/connectors/source-dynamodb/src/test/java/io/airbyte/integrations/source/dynamodb/DynamodbConfigTest.java index ff3e181df629..b5d29e7a29fd 100644 --- a/airbyte-integrations/connectors/source-dynamodb/src/test/java/io/airbyte/integrations/source/dynamodb/DynamodbConfigTest.java +++ b/airbyte-integrations/connectors/source-dynamodb/src/test/java/io/airbyte/integrations/source/dynamodb/DynamodbConfigTest.java @@ -8,6 +8,7 @@ import io.airbyte.commons.json.Jsons; import java.net.URI; +import java.util.Collections; import java.util.Map; import org.junit.jupiter.api.Test; import software.amazon.awssdk.regions.Region; @@ -29,7 +30,9 @@ void testUserBasedDynamodbConfig() { .hasFieldOrPropertyWithValue("endpoint", URI.create("http://localhost:8080")) .hasFieldOrPropertyWithValue("region", Region.of("us-east-1")) .hasFieldOrPropertyWithValue("accessKey", "A012345678910EXAMPLE") - .hasFieldOrPropertyWithValue("secretKey", "a012345678910ABCDEFGH/AbCdEfGhLEKEY"); + .hasFieldOrPropertyWithValue("secretKey", "a012345678910ABCDEFGH/AbCdEfGhLEKEY") + .hasFieldOrPropertyWithValue("reservedAttributeNames", Collections.emptyList()) + .hasFieldOrPropertyWithValue("ignoreMissingPermissions", false); } diff --git a/docs/integrations/sources/dynamodb.md b/docs/integrations/sources/dynamodb.md index a39ca606400e..f81340759c53 100644 --- a/docs/integrations/sources/dynamodb.md +++ b/docs/integrations/sources/dynamodb.md @@ -75,6 +75,7 @@ the underlying role executing the container workload in AWS. | Version | Date | Pull Request | Subject | |:--------| :--------- | :-------------------------------------------------------- |:-----------------------------------------------------------------------| +| 0.3.2 | 2024-05-01 | [27045](https://github.com/airbytehq/airbyte/pull/27045) | Fix missing scan permissions | | 0.3.1 | 2024-05-01 | [31935](https://github.com/airbytehq/airbyte/pull/31935) | Fix list more than 100 tables | | 0.3.0 | 2024-04-24 | [37530](https://github.com/airbytehq/airbyte/pull/37530) | Allow role based access | | 0.2.3 | 2024-02-13 | [35232](https://github.com/airbytehq/airbyte/pull/35232) | Adopt CDK 0.20.4 |