diff --git a/.gitignore b/.gitignore index f02f5b77b68b..b114af0e0fb3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ .gradle .idea *.iml +*.swp build out .DS_Store diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/AirbyteMessageConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/AirbyteMessageConsumer.java index 9af7672edc9e..ebf332c2a4cf 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/AirbyteMessageConsumer.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/AirbyteMessageConsumer.java @@ -17,13 +17,14 @@ * to. * * Lifecycle: + * * We encourage implementing this interface using the {@link FailureTrackingAirbyteMessageConsumer} * class. */ diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteFileOffsetBackingStore.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteFileOffsetBackingStore.java index 4ed4472c5ef9..5625c2cc7e37 100644 --- a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteFileOffsetBackingStore.java +++ b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteFileOffsetBackingStore.java @@ -28,10 +28,10 @@ /** * This class handles reading and writing a debezium offset file. In many cases it is duplicating * logic in debezium because that logic is not exposed in the public API. We mostly treat the - * contents of this state file like a black box. We know it is a Map. We - * deserialize it to a Map so that the state file can be human readable. If we ever - * discover that any of the contents of these offset files is not string serializable we will likely - * have to drop the human readability support and just base64 encode it. 
+ * contents of this state file like a black box. We know it is a Map<ByteBuffer, ByteBuffer>. + * We deserialize it to a Map<String, String> so that the state file can be human readable. If + * we ever discover that any of the contents of these offset files is not string serializable we + * will likely have to drop the human readability support and just base64 encode it. */ public class AirbyteFileOffsetBackingStore { diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteSchemaHistoryStorage.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteSchemaHistoryStorage.java index ecb4773b6488..182c618da241 100644 --- a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteSchemaHistoryStorage.java +++ b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteSchemaHistoryStorage.java @@ -26,8 +26,8 @@ * The purpose of this class is : to , 1. Read the contents of the file {@link #path} which contains * the schema history at the end of the sync so that it can be saved in state for future syncs. * Check {@link #read()} 2. Write the saved content back to the file {@link #path} at the beginning - * of the sync so that debezium can function smoothly. Check {@link #persist(Optional)}. - * To understand more about file, please refer {@link FilteredFileDatabaseHistory} + * of the sync so that debezium can function smoothly. Check persist(Optional<JsonNode>). 
To + * understand more about file, please refer {@link FilteredFileDatabaseHistory} */ public class AirbyteSchemaHistoryStorage { diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/MySQLConverter.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/MySQLConverter.java index 24af44c5af1a..d642921b60cc 100644 --- a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/MySQLConverter.java +++ b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/MySQLConverter.java @@ -19,9 +19,9 @@ * https://debezium.io/documentation/reference/1.4/development/converters.html This is built from * reference with {@link io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter} If you * rename this class then remember to rename the datetime.type property value in - * {@link io.airbyte-integrations.source.mysql.MySqlCdcProperties#getDebeziumProperties()} (If you - * don't rename, a test would still fail but it might be tricky to figure out where to change the - * property name) + * io.airbyte-integrations.source.mysql.MySqlCdcProperties#getDebeziumProperties() (If you don't + * rename, a test would still fail but it might be tricky to figure out where to change the property + * name) */ public class MySQLConverter implements CustomConverter { diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/writer/BaseAzureBlobStorageWriter.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/writer/BaseAzureBlobStorageWriter.java index c31219e719a4..6fb54c80597f 100644 --- 
a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/writer/BaseAzureBlobStorageWriter.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/writer/BaseAzureBlobStorageWriter.java @@ -15,9 +15,11 @@ /** * The base implementation takes care of the following: + *
    *
  • Create shared instance variables.
  • *
  • Create the bucket and prepare the bucket path.
  • *
  • Log and close the write.
  • + *
*/ public abstract class BaseAzureBlobStorageWriter implements AzureBlobStorageWriter { diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java index 271c72d02f4b..301960eb2e94 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java @@ -76,7 +76,7 @@ protected JsonNode formatRecord(final Schema schema, final AirbyteRecordMessage } @Override - public void close(boolean hasFailed) { + public void close(final boolean hasFailed) { fieldsWithRefDefinition.clear(); super.close(hasFailed); } @@ -86,7 +86,7 @@ protected JsonNode formatData(final FieldList fields, final JsonNode root) { if (fields == null) { return root; } - List dateTimeFields = BigQueryUtils.getDateTimeFieldsFromSchema(fields); + final List dateTimeFields = BigQueryUtils.getDateTimeFieldsFromSchema(fields); if (!dateTimeFields.isEmpty()) { BigQueryUtils.transformJsonDateTimeToBigDataFormat(dateTimeFields, (ObjectNode) root); } diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java index 7c35ef7459a7..794d12d540f1 100644 --- 
a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java @@ -281,8 +281,8 @@ private Set extractJsonValues(final JsonNode node, final String attribut return resultSet; } - private JsonNode removeAirbyteMetadataFields(JsonNode record) { - for (String airbyteMetadataField : AIRBYTE_METADATA_FIELDS) { + private JsonNode removeAirbyteMetadataFields(final JsonNode record) { + for (final String airbyteMetadataField : AIRBYTE_METADATA_FIELDS) { ((ObjectNode) record).remove(airbyteMetadataField); } return record; @@ -311,7 +311,7 @@ private static Stream schemaAndDataProvider() { arguments(getSchema(), MESSAGE_USERS2)); } - private static AirbyteMessage createRecordMessage(String stream, JsonNode data) { + private static AirbyteMessage createRecordMessage(final String stream, final JsonNode data) { return new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) .withRecord(new AirbyteRecordMessage().withStream(stream) .withData(data) diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java index 3f58adbdc463..65276e07936d 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java @@ -26,15 +26,18 @@ * This implementation is similar to * {@link 
io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier}. The difference is that * this implementation creates Parquet staging files, instead of CSV ones. - *

+ *

+ *

* It does the following operations: + *
    *
  • 1. Parquet writer writes data stream into staging parquet file in - * s3:////.
  • + * s3://bucket-name/bucket-path/staging-folder. *
  • 2. Create a tmp delta table based on the staging parquet file.
  • *
  • 3. Create the destination delta table based on the tmp delta table schema in - * s3:///.
  • + * s3://bucket/stream-name. *
  • 4. Copy the staging parquet file into the destination delta table.
  • *
  • 5. Delete the tmp delta table, and the staging parquet file.
  • + *
*/ public class DatabricksStreamCopier implements StreamCopier { diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/BaseGcsWriter.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/BaseGcsWriter.java index c3be1fd83056..b5983f5eaa1f 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/BaseGcsWriter.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/BaseGcsWriter.java @@ -28,8 +28,10 @@ /** * The base implementation takes care of the following: + *
    *
  • Create shared instance variables.
  • *
  • Create the bucket and prepare the bucket path.
  • + *
*/ public abstract class BaseGcsWriter implements S3Writer { @@ -52,8 +54,10 @@ protected BaseGcsWriter(final GcsDestinationConfig config, } /** + *
    *
  • 1. Create bucket if necessary.
  • *
  • 2. Under OVERWRITE mode, delete all objects with the output prefix.
  • + *
*/ @Override public void initialize() { diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java index 7280b89cbc59..9460eeb152be 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java @@ -30,7 +30,8 @@ * The main function of this class is to convert a JsonSchema to Avro schema. It can also * standardize schema names, and keep track of a mapping from the original names to the standardized * ones, which is needed for unit tests. - *

+ *

+ *

* For limitations of this converter, see the README of this connector: * https://docs.airbyte.io/integrations/destinations/s3#avro */ diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/AvroRecordHelper.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/AvroRecordHelper.java index aeb2c7d3140e..ad2e42efc64b 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/AvroRecordHelper.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/AvroRecordHelper.java @@ -24,9 +24,11 @@ public static JsonFieldNameUpdater getFieldNameUpdater(final String streamName, /** * Convert an Airbyte JsonNode from Avro / Parquet Record to a plain one. + *
    *
  • Remove the airbyte id and emission timestamp fields.
  • - *
  • Remove null fields that must exist in Parquet but does not in original Json.
  • This - * function mutates the input Json. + *
  • Remove null fields that must exist in Parquet but does not in original Json. This function + * mutates the input Json.
  • + *
*/ public static JsonNode pruneAirbyteJson(final JsonNode input) { final ObjectNode output = (ObjectNode) input; diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/S3OutputPathHelper.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/S3OutputPathHelper.java index 68ff99f32738..883f712ff788 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/S3OutputPathHelper.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/S3OutputPathHelper.java @@ -17,8 +17,9 @@ public static String getOutputPrefix(final String bucketPath, final AirbyteStrea } /** - * Prefix: // + * Prefix: <bucket-path>/<source-namespace-if-present>/<stream-name> */ + // Prefix: // public static String getOutputPrefix(final String bucketPath, final String namespace, final String streamName) { final List paths = new LinkedList<>(); diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/BaseS3Writer.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/BaseS3Writer.java index 1c0c62ff283a..8ef3bf3aa3c8 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/BaseS3Writer.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/BaseS3Writer.java @@ -28,9 +28,11 @@ /** * The base implementation takes care of the following: + *
    *
  • Create shared instance variables.
  • *
  • Create the bucket and prepare the bucket path.
  • *
  • Log and close the write.
  • + *
*/ public abstract class BaseS3Writer implements S3Writer { @@ -57,8 +59,10 @@ public String getOutputPrefix() { } /** + *
    *
  • 1. Create bucket if necessary.
  • *
  • 2. Under OVERWRITE mode, delete all objects with the output prefix.
  • + *
*/ @Override public void initialize() { diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractRelationalDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractRelationalDbSource.java index 3c8fc22d3608..9edab761d41b 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractRelationalDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractRelationalDbSource.java @@ -19,8 +19,8 @@ * This class contains helper functions and boilerplate for implementing a source connector for a * relational DB source. * - * @see io.airbyte.integrations.source.jdbc.AbstractJdbcSource if you are implementing a relational - * DB which can be accessed via JDBC driver. + * see io.airbyte.integrations.source.jdbc.AbstractJdbcSource if you are implementing a relational + * DB which can be accessed via JDBC driver. */ public abstract class AbstractRelationalDbSource extends AbstractDbSource implements Source {