Skip to content

Commit

Permalink
🐛 Source Dynamodb: Fix missing scan permissions (#27045)
Browse files Browse the repository at this point in the history
Co-authored-by: Marcos Marx <[email protected]>
Co-authored-by: marcosmarxm <[email protected]>
  • Loading branch information
3 people authored May 1, 2024
1 parent ccfb775 commit 960b5c9
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 50401137-8871-4c5a-abb7-1f5fda35545a
dockerImageTag: 0.3.1
dockerImageTag: 0.3.2
dockerRepository: airbyte/source-dynamodb
documentationUrl: https://docs.airbyte.com/integrations/sources/dynamodb
githubIssueLabel: source-dynamodb
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ public record DynamodbConfig(

String secretKey,

List<String> reservedAttributeNames
List<String> reservedAttributeNames,

boolean ignoreMissingPermissions

) {

Expand All @@ -32,12 +34,14 @@ public static DynamodbConfig createDynamodbConfig(JsonNode jsonNode) {
JsonNode endpoint = jsonNode.get("endpoint");
JsonNode region = jsonNode.get("region");
JsonNode attributeNames = jsonNode.get("reserved_attribute_names");
JsonNode missingPermissions = jsonNode.get("ignore_missing_read_permissions_tables");
return new DynamodbConfig(
endpoint != null && !endpoint.asText().isBlank() ? URI.create(endpoint.asText()) : null,
region != null && !region.asText().isBlank() ? Region.of(region.asText()) : null,
accessKeyId != null && !accessKeyId.asText().isBlank() ? accessKeyId.asText() : null,
secretAccessKey != null && !secretAccessKey.asText().isBlank() ? secretAccessKey.asText() : null,
attributeNames != null ? Arrays.asList(attributeNames.asText().split("\\s*,\\s*")) : List.of());
attributeNames != null ? Arrays.asList(attributeNames.asText().split("\\s*,\\s*")) : List.of(),
missingPermissions != null ? missingPermissions.asBoolean() : false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;

public class DynamodbSource extends BaseConnector implements Source {

Expand Down Expand Up @@ -70,23 +72,39 @@ public AirbyteConnectionStatus check(final JsonNode config) {
public AirbyteCatalog discover(final JsonNode config) {

final var dynamodbConfig = DynamodbConfig.createDynamodbConfig(config);
List<AirbyteStream> airbyteStreams = new ArrayList<>();

try (final var dynamodbOperations = new DynamodbOperations(dynamodbConfig)) {

final var airbyteStreams = dynamodbOperations.listTables().stream()
.map(tb -> new AirbyteStream()
.withName(tb)
.withJsonSchema(Jsons.jsonNode(ImmutableMap.builder()
.put("type", "object")
.put("properties", dynamodbOperations.inferSchema(tb, 1000))
.build()))
.withSourceDefinedPrimaryKey(Collections.singletonList(dynamodbOperations.primaryKey(tb)))
.withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))
.toList();

return new AirbyteCatalog().withStreams(airbyteStreams);
dynamodbOperations.listTables().forEach(table -> {
try {
airbyteStreams.add(
new AirbyteStream()
.withName(table)
.withJsonSchema(Jsons.jsonNode(ImmutableMap.builder()
.put("type", "object")
// will throw DynamoDbException if it can't scan the table from missing read permissions
.put("properties", dynamodbOperations.inferSchema(table, 1000))
.build()))
.withSourceDefinedPrimaryKey(Collections.singletonList(dynamodbOperations.primaryKey(table)))
.withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)));
} catch (DynamoDbException e) {
if (dynamodbConfig.ignoreMissingPermissions()) {
// fragile way to check for missing read access but there is no dedicated exception for missing
// permissions.
if (e.getMessage().contains("not authorized")) {
LOGGER.warn("Connector doesn't have READ access for the table {}", table);
} else {
throw e;
}
} else {
throw e;
}
}
});
}

return new AirbyteCatalog().withStreams(airbyteStreams);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@
"description": "Comma separated reserved attribute names present in your tables",
"airbyte_secret": true,
"examples": ["name, field_name, field-name"]
},
"ignore_missing_read_permissions_tables": {
"title": "Ignore missing read permissions tables",
"type": "boolean",
"description": "Ignore tables with missing scan/read permissions",
"default": false
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import io.airbyte.commons.json.Jsons;
import java.net.URI;
import java.util.Collections;
import java.util.Map;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.regions.Region;
Expand All @@ -29,7 +30,9 @@ void testUserBasedDynamodbConfig() {
.hasFieldOrPropertyWithValue("endpoint", URI.create("http://localhost:8080"))
.hasFieldOrPropertyWithValue("region", Region.of("us-east-1"))
.hasFieldOrPropertyWithValue("accessKey", "A012345678910EXAMPLE")
.hasFieldOrPropertyWithValue("secretKey", "a012345678910ABCDEFGH/AbCdEfGhLEKEY");
.hasFieldOrPropertyWithValue("secretKey", "a012345678910ABCDEFGH/AbCdEfGhLEKEY")
.hasFieldOrPropertyWithValue("reservedAttributeNames", Collections.emptyList())
.hasFieldOrPropertyWithValue("ignoreMissingPermissions", false);

}

Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/dynamodb.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ the underlying role executing the container workload in AWS.

| Version | Date | Pull Request | Subject |
|:--------| :--------- | :-------------------------------------------------------- |:-----------------------------------------------------------------------|
| 0.3.2 | 2024-05-01 | [27045](https://github.com/airbytehq/airbyte/pull/27045) | Fix missing scan permissions |
| 0.3.1 | 2024-05-01 | [31935](https://github.com/airbytehq/airbyte/pull/31935) | Fix list more than 100 tables |
| 0.3.0 | 2024-04-24 | [37530](https://github.com/airbytehq/airbyte/pull/37530) | Allow role based access |
| 0.2.3 | 2024-02-13 | [35232](https://github.com/airbytehq/airbyte/pull/35232) | Adopt CDK 0.20.4 |
Expand Down

0 comments on commit 960b5c9

Please sign in to comment.