Skip to content

Commit

Permalink
feat: support configuring additional mapper types internally (#14202)
Browse files Browse the repository at this point in the history
Co-authored-by: Chandler Prall <[email protected]>
  • Loading branch information
pedroslopez and chandlerprall committed Oct 7, 2024
1 parent f8d158e commit 48143f2
Show file tree
Hide file tree
Showing 15 changed files with 182 additions and 25 deletions.
4 changes: 4 additions & 0 deletions airbyte-api/server-api/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ val genApiServer =
"StreamJsonSchema" to "com.fasterxml.jackson.databind.JsonNode",
"StateBlob" to "com.fasterxml.jackson.databind.JsonNode",
"FieldSchema" to "com.fasterxml.jackson.databind.JsonNode",
"MapperConfiguration" to "com.fasterxml.jackson.databind.JsonNode",
"DeclarativeManifest" to "com.fasterxml.jackson.databind.JsonNode",
"SecretPersistenceConfigurationJson" to "com.fasterxml.jackson.databind.JsonNode",
"ConnectorBuilderProjectTestingValues" to "com.fasterxml.jackson.databind.JsonNode",
Expand Down Expand Up @@ -142,6 +143,7 @@ val genApiServer2 =
"StreamJsonSchema" to "com.fasterxml.jackson.databind.JsonNode",
"StateBlob" to "com.fasterxml.jackson.databind.JsonNode",
"FieldSchema" to "com.fasterxml.jackson.databind.JsonNode",
"MapperConfiguration" to "com.fasterxml.jackson.databind.JsonNode",
"DeclarativeManifest" to "com.fasterxml.jackson.databind.JsonNode",
"SecretPersistenceConfigurationJson" to "com.fasterxml.jackson.databind.JsonNode",
"ConnectorBuilderProjectTestingValues" to "com.fasterxml.jackson.databind.JsonNode",
Expand Down Expand Up @@ -174,6 +176,7 @@ val genApiClient =
"StreamJsonSchema" to "com.fasterxml.jackson.databind.JsonNode",
"StateBlob" to "com.fasterxml.jackson.databind.JsonNode",
"FieldSchema" to "com.fasterxml.jackson.databind.JsonNode",
"MapperConfiguration" to "com.fasterxml.jackson.databind.JsonNode",
"DeclarativeManifest" to "com.fasterxml.jackson.databind.JsonNode",
"SecretPersistenceConfigurationJson" to "com.fasterxml.jackson.databind.JsonNode",
"ConnectorBuilderProjectTestingValues" to "com.fasterxml.jackson.databind.JsonNode",
Expand Down Expand Up @@ -220,6 +223,7 @@ val genApiDocs =
"StreamJsonSchema" to "com.fasterxml.jackson.databind.JsonNode",
"StateBlob" to "com.fasterxml.jackson.databind.JsonNode",
"FieldSchema" to "com.fasterxml.jackson.databind.JsonNode",
"MapperConfiguration" to "com.fasterxml.jackson.databind.JsonNode",
"ConnectorBuilderProjectTestingValues" to "com.fasterxml.jackson.databind.JsonNode",
"BillingEvent" to "com.fasterxml.jackson.databind.JsonNode",
)
Expand Down
25 changes: 24 additions & 1 deletion airbyte-api/server-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11597,9 +11597,15 @@ components:
selectedFields:
description: This must be set if `fieldSelectedEnabled` is set. An empty list indicates that no properties will be included.
$ref: "#/components/schemas/SelectedFields"
# TODO(pedro): remove `hashedFields` once the UI is updated to use `mappers`.
hashedFields:
description: Fields that should be hashed before being written to the destination.
description: Fields that should be hashed before being written to the destination. Deprecated, use `mappers` instead.
$ref: "#/components/schemas/SelectedFields"
mappers:
description: Mappers that should be applied to the stream before writing to the destination.
type: array
items:
$ref: "#/components/schemas/ConfiguredStreamMapper"
minimumGenerationId:
type: integer
format: int64
Expand All @@ -11622,6 +11628,23 @@ components:
items:
type: string
x-sdk-component: true
# Names the kind of mapper a ConfiguredStreamMapper refers to.
# `hashing` is the only value declared in this enum.
StreamMapperType:
type: string
enum:
- hashing
# One mapper entry for a stream: the mapper type plus that mapper's configuration.
# Both properties are required.
ConfiguredStreamMapper:
type: object
required:
- type
- mapperConfiguration
properties:
type:
$ref: "#/components/schemas/StreamMapperType"
mapperConfiguration:
$ref: "#/components/schemas/MapperConfiguration"
# Object carrying the mapper's settings; no properties are declared on the schema itself.
MapperConfiguration:
type: object
description: The values required to configure the mapper.
SelectedFields:
description: Paths to the fields that will be included in the configured catalog.
type: array
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.api.client.model.generated.AirbyteStreamConfiguration;
import io.airbyte.api.client.model.generated.ConfiguredStreamMapper;
import io.airbyte.api.client.model.generated.DestinationSyncMode;
import io.airbyte.api.client.model.generated.SelectedFieldInfo;
import io.airbyte.api.client.model.generated.SyncMode;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.text.Names;
import io.airbyte.config.ConfiguredAirbyteStream;
import io.airbyte.config.ConfiguredMapper;
import io.airbyte.config.helpers.FieldGenerator;
import io.airbyte.mappers.helpers.MapperHelperKt;
import io.airbyte.validation.json.JsonValidationException;
import jakarta.annotation.Nullable;
import java.util.Collections;
Expand Down Expand Up @@ -196,16 +196,13 @@ private static io.airbyte.config.AirbyteStream toStreamInternal(final io.airbyte
.withIsResumable(stream.isResumable());
}

private static List<ConfiguredMapper> toConfiguredHashingMappers(final @Nullable List<SelectedFieldInfo> hashedFields) {
if (hashedFields == null) {
private static List<ConfiguredMapper> toConfiguredMappers(final @Nullable List<ConfiguredStreamMapper> mappers) {
if (mappers == null) {
return Collections.emptyList();
}

// FIXME(pedro): See https://github.com/airbytehq/airbyte-internal-issues/issues/9718
// We shouldn't have to rebuild these here, and can potentially lead to losing configuration that's
// actually stored in the db.
return hashedFields.stream().map(f -> MapperHelperKt.createHashingMapper(f.getFieldPath().getFirst()) // We don't support nested fields for now.
).toList();
return mappers.stream()
.map(mapper -> new ConfiguredMapper(mapper.getType().getValue(), Jsons.deserializeToStringMap(mapper.getMapperConfiguration())))
.collect(Collectors.toList());
}

private static ConfiguredAirbyteStream toConfiguredStreamInternal(final io.airbyte.api.client.model.generated.AirbyteStream stream,
Expand All @@ -220,11 +217,9 @@ private static ConfiguredAirbyteStream toConfiguredStreamInternal(final io.airby
.cursorField(config.getCursorField())
.generationId(config.getGenerationId())
.minimumGenerationId(config.getMinimumGenerationId())
.syncId(config.getSyncId());

builder
.syncId(config.getSyncId())
.fields(fieldGenerator.getFieldsFromSchema(convertedStream.getJsonSchema()))
.mappers(toConfiguredHashingMappers(config.getHashedFields()));
.mappers(toConfiguredMappers(config.getMappers()));

return builder.build();
}
Expand Down Expand Up @@ -259,6 +254,7 @@ private static io.airbyte.api.client.model.generated.AirbyteStreamConfiguration
null,
null,
null,
null,
null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@

import com.google.common.collect.Lists;
import io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration;
import io.airbyte.api.client.model.generated.SelectedFieldInfo;
import io.airbyte.api.client.model.generated.ConfiguredStreamMapper;
import io.airbyte.api.client.model.generated.StreamMapperType;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.text.Names;
import io.airbyte.config.ConfiguredAirbyteCatalog;
import io.airbyte.config.ConfiguredMapper;
import io.airbyte.config.helpers.FieldGenerator;
import io.airbyte.mappers.helpers.MapperHelperKt;
import io.airbyte.protocol.models.AirbyteCatalog;
Expand Down Expand Up @@ -65,6 +68,7 @@ class CatalogClientConvertersTest {
null,
null,
null,
null,
null);

private static final AirbyteCatalog BASIC_MODEL_CATALOG = new AirbyteCatalog().withStreams(
Expand Down Expand Up @@ -92,6 +96,9 @@ void testConvertToProtocol() {
@Test
void testConvertInternalWithMapping() {
reset(fieldGenerator);

final ConfiguredMapper hashingMapper = MapperHelperKt.createHashingMapper(ID_FIELD_NAME);

final var streamConfig = new io.airbyte.api.client.model.generated.AirbyteStreamConfiguration(
io.airbyte.api.client.model.generated.SyncMode.FULL_REFRESH,
io.airbyte.api.client.model.generated.DestinationSyncMode.APPEND,
Expand All @@ -102,7 +109,8 @@ void testConvertInternalWithMapping() {
null,
null,
null,
List.of(new SelectedFieldInfo(List.of(ID_FIELD_NAME))),
null,
List.of(new ConfiguredStreamMapper(StreamMapperType.HASHING, Jsons.jsonNode(hashingMapper.getConfig()))),
null,
null,
null);
Expand Down Expand Up @@ -162,6 +170,7 @@ void testIsResumableExport() {
null,
null,
null,
null,
null);
final var streamAndConf = new AirbyteStreamAndConfiguration(stream, conf);
final List<AirbyteStreamAndConfiguration> streams = List.of(streamAndConf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -909,6 +909,7 @@ private boolean haveConfigChange(final AirbyteStreamConfiguration oldConfig, fin
final Set<List<String>> convertedNewPrimaryKey = new HashSet<>(newConfig.getPrimaryKey());
final boolean hasPrimaryKeyChanged = !(convertedOldPrimaryKey.equals(convertedNewPrimaryKey));

// TODO(pedro): This should be checked by generating the destination catalog to support all mappers
final List<SelectedFieldInfo> oldHashedFields =
oldConfig.getHashedFields() == null ? new ArrayList() : new ArrayList(oldConfig.getHashedFields());
final List<SelectedFieldInfo> newHashedFields =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public WebBackendConnectionsHandler(final ActorDefinitionVersionHandler actorDef
final OperationsHandler operationsHandler,
final EventRunner eventRunner,
final ConfigRepository configRepositoryDoNotUse,
CatalogService catalogService,
final CatalogService catalogService,
final ConnectionService connectionService,
final ActorDefinitionVersionHelper actorDefinitionVersionHelper,
final FieldGenerator fieldGenerator,
Expand Down Expand Up @@ -545,7 +545,9 @@ protected AirbyteCatalog updateSchemaWithRefreshedDiscoveredCatalog(final Airbyt
outputStreamConfig.setSelected(originalConfiguredStream.getConfig().getSelected());
outputStreamConfig.setSuggested(originalConfiguredStream.getConfig().getSuggested());
outputStreamConfig.setFieldSelectionEnabled(originalStreamConfig.getFieldSelectionEnabled());
outputStreamConfig.setMappers(originalStreamConfig.getMappers());

// TODO(pedro): Handle other mappers that are no longer valid
// Add hashed field configs that are still present in the schema
if (originalStreamConfig.getHashedFields() != null && !originalStreamConfig.getHashedFields().isEmpty()) {
final List<String> discoveredFields =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
import io.airbyte.api.model.generated.AirbyteStream;
import io.airbyte.api.model.generated.AirbyteStreamAndConfiguration;
import io.airbyte.api.model.generated.AirbyteStreamConfiguration;
import io.airbyte.api.model.generated.ConfiguredStreamMapper;
import io.airbyte.api.model.generated.DestinationSyncMode;
import io.airbyte.api.model.generated.SelectedFieldInfo;
import io.airbyte.api.model.generated.StreamDescriptor;
import io.airbyte.api.model.generated.StreamMapperType;
import io.airbyte.api.model.generated.SyncMode;
import io.airbyte.commons.converters.ApiConverters;
import io.airbyte.commons.enums.Enums;
Expand Down Expand Up @@ -61,6 +63,13 @@ private static io.airbyte.api.model.generated.AirbyteStream toApi(final io.airby
.isResumable(stream.getIsResumable());
}

/**
 * Convert an internal configured mapper to its api model.
 *
 * @param mapper internal mapper whose name and config are copied onto the api model
 * @return api mapper with the resolved {@link StreamMapperType} and the config rendered as json
 * @throws IllegalArgumentException if the mapper name does not resolve to a {@link StreamMapperType}
 */
public static ConfiguredStreamMapper toApi(final ConfiguredMapper mapper) {
  final ConfiguredStreamMapper apiMapper = new ConfiguredStreamMapper();

  // Resolve the mapper name to the api enum; an unknown name is an error rather than being dropped.
  final StreamMapperType mapperType = Enums.toEnum(mapper.getName(), StreamMapperType.class)
      .orElseThrow(() -> new IllegalArgumentException("Unexpected mapper name: " + mapper.getName()));

  return apiMapper
      .type(mapperType)
      .mapperConfiguration(Jsons.jsonNode(mapper.getConfig()));
}

/**
* Convert an internal catalog and field selection mask to an api catalog model.
*
Expand Down Expand Up @@ -88,9 +97,11 @@ public static io.airbyte.api.model.generated.AirbyteCatalog toApi(final Configur
.suggested(false)
.fieldSelectionEnabled(getStreamHasFieldSelectionEnabled(fieldSelectionData, streamDescriptor))
.selectedFields(List.of())
// TODO(pedro): `hashedFields` should be removed once the UI is updated to use `mappers`.
.hashedFields(configuredStream.getMappers().stream().filter(mapper -> MapperOperationName.HASHING.equals(mapper.getName()))
.map(CatalogConverter::toApiFieldInfo)
.collect(Collectors.toList()))
.mappers(configuredStream.getMappers().stream().map(CatalogConverter::toApi).collect(Collectors.toList()))
.generationId(configuredStream.getGenerationId())
.minimumGenerationId(configuredStream.getMinimumGenerationId())
.syncId(configuredStream.getSyncId());
Expand Down Expand Up @@ -212,6 +223,15 @@ private static List<ConfiguredMapper> toConfiguredHashingMappers(@Nullable final
).toList();
}

/**
 * Convert api mapper models to internal configured mappers.
 *
 * @param mappers api mappers to convert; may be null
 * @return one internal mapper per api mapper, or an empty list when {@code mappers} is null
 */
private static List<ConfiguredMapper> toConfiguredMappers(final @Nullable List<io.airbyte.api.model.generated.ConfiguredStreamMapper> mappers) {
  if (mappers != null) {
    return mappers.stream()
        .map(apiMapper -> {
          // The internal mapper is keyed by the string form of the api mapper type.
          final var mapperName = apiMapper.getType().toString();
          final var mapperConfig = Jsons.deserializeToStringMap(apiMapper.getMapperConfiguration());
          return new ConfiguredMapper(mapperName, mapperConfig);
        })
        .collect(Collectors.toList());
  }
  return Collections.emptyList();
}

private static SelectedFieldInfo toApiFieldInfo(final ConfiguredMapper configuredHashingMapper) {
Preconditions.checkArgument(MapperOperationName.HASHING.equals(configuredHashingMapper.getName()), "Expected hashing mapper");
return new SelectedFieldInfo()
Expand Down Expand Up @@ -242,11 +262,17 @@ public static ConfiguredAirbyteCatalog toConfiguredInternal(final io.airbyte.api
.syncMode(Enums.convertTo(s.getConfig().getSyncMode(), io.airbyte.config.SyncMode.class))
.destinationSyncMode(Enums.convertTo(s.getConfig().getDestinationSyncMode(), io.airbyte.config.DestinationSyncMode.class))
.cursorField(s.getConfig().getCursorField())
.primaryKey(Optional.ofNullable(s.getConfig().getPrimaryKey()).orElse(Collections.emptyList()));

builder
.fields(fieldGenerator.getFieldsFromSchema(convertedStream.getJsonSchema()))
.mappers(toConfiguredHashingMappers(s.getConfig().getHashedFields()));
.primaryKey(Optional.ofNullable(s.getConfig().getPrimaryKey()).orElse(Collections.emptyList()))
.fields(fieldGenerator.getFieldsFromSchema(convertedStream.getJsonSchema()));

if (s.getConfig().getMappers() != null && !s.getConfig().getMappers().isEmpty()) {
builder
.mappers(toConfiguredMappers(s.getConfig().getMappers()));
} else {
// TODO(pedro): `hashedFields` support should be removed once the UI is updated to use `mappers`.
builder
.mappers(toConfiguredHashingMappers(s.getConfig().getHashedFields()));
}

return builder.build();
} catch (final JsonValidationException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.api.model.generated.ConfiguredStreamMapper;
import io.airbyte.api.model.generated.DestinationSyncMode;
import io.airbyte.api.model.generated.SelectedFieldInfo;
import io.airbyte.api.model.generated.StreamMapperType;
import io.airbyte.api.model.generated.SyncMode;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.server.handlers.helpers.CatalogConverter;
import io.airbyte.commons.server.helpers.ConnectionHelpers;
import io.airbyte.config.ConfiguredMapper;
Expand Down Expand Up @@ -52,6 +55,28 @@ void testEnumConversion() {

@Test
void testConvertInternal() throws JsonValidationException {
  // The expected internal mapper also supplies the configuration sent through the api model.
  final ConfiguredMapper expectedMapper = MapperHelperKt.createHashingMapper(SECOND_FIELD_NAME);

  final var catalog = ConnectionHelpers.generateApiCatalogWithTwoFields();
  final var firstApiStream = catalog.getStreams().getFirst();
  final ConfiguredStreamMapper apiMapper = new ConfiguredStreamMapper()
      .type(StreamMapperType.HASHING)
      .mapperConfiguration(Jsons.jsonNode(expectedMapper.getConfig()));
  firstApiStream.getConfig().setMappers(List.of(apiMapper));

  final var converted = CatalogConverter.toConfiguredInternal(catalog);
  assertEquals(1, converted.getStreams().size());

  final var convertedStream = converted.getStreams().getFirst();
  final var convertedMappers = convertedStream.getMappers();
  assertEquals(1, convertedMappers.size());

  final var convertedFields = convertedStream.getFields();
  assertEquals(2, convertedFields.size());

  // The api mapper must round-trip to the same internal mapper it was built from.
  assertEquals(expectedMapper, convertedMappers.getFirst());
}

@Test
void testConvertInternalWithHashedFields() throws JsonValidationException {
final var apiCatalog = ConnectionHelpers.generateApiCatalogWithTwoFields();
final var apiStream = apiCatalog.getStreams().getFirst();
apiStream.getConfig().setHashedFields(List.of(new SelectedFieldInfo().fieldPath(List.of(SECOND_FIELD_NAME))));
Expand All @@ -67,7 +92,6 @@ void testConvertInternal() throws JsonValidationException {

final ConfiguredMapper expectedMapper = MapperHelperKt.createHashingMapper(SECOND_FIELD_NAME);
assertEquals(expectedMapper, mappers.getFirst());

}

@Test
Expand Down
Loading

0 comments on commit 48143f2

Please sign in to comment.