Skip to content

Commit

Permalink
S3 Destination Uses Async Framework (#42405)
Browse files Browse the repository at this point in the history
Co-authored-by: Brian Lai <[email protected]>
Co-authored-by: Gireesh Sreepathi <[email protected]>
  • Loading branch information
3 people authored Aug 1, 2024
1 parent 970715d commit 2320f78
Show file tree
Hide file tree
Showing 36 changed files with 1,376 additions and 501 deletions.
559 changes: 280 additions & 279 deletions airbyte-cdk/java/airbyte-cdk/README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ interface BufferingStrategy : AutoCloseable {
message: AirbyteMessage
): Optional<BufferFlushType>

/** Flush buffered messages in a buffer from a particular stream */
@Throws(Exception::class)
fun flushSingleBuffer(stream: AirbyteStreamNameNamespacePair, buffer: SerializableBuffer)
/** Flush the buffered messages from a single stream */
@Throws(Exception::class) fun flushSingleStream(stream: AirbyteStreamNameNamespacePair)

/** Flush all buffers that were buffering message data so far. */
@Throws(Exception::class) fun flushAllBuffers()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,7 @@ class InMemoryRecordBufferingStrategy(
}

@Throws(Exception::class)
override fun flushSingleBuffer(
stream: AirbyteStreamNameNamespacePair,
buffer: SerializableBuffer
) {
override fun flushSingleStream(stream: AirbyteStreamNameNamespacePair) {
LOGGER.info {
"Flushing single stream ${stream.name}: ${streamBuffer[stream]!!.size} records"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class SerializedBufferingStrategy
@Throws(Exception::class)
override fun addRecord(
stream: AirbyteStreamNameNamespacePair,
message: AirbyteMessage
message: AirbyteMessage,
): Optional<BufferFlushType> {
var flushed: Optional<BufferFlushType> = Optional.empty()

Expand Down Expand Up @@ -102,8 +102,7 @@ class SerializedBufferingStrategy
}
}

@Throws(Exception::class)
override fun flushSingleBuffer(
private fun flushSingleBuffer(
stream: AirbyteStreamNameNamespacePair,
buffer: SerializableBuffer
) {
Expand All @@ -116,6 +115,11 @@ class SerializedBufferingStrategy
LOGGER.info { "Flushing completed for ${stream.name}" }
}

@Throws(Exception::class)
override fun flushSingleStream(stream: AirbyteStreamNameNamespacePair) {
allBuffers[stream]?.let { flushSingleBuffer(stream, it) }
}

@Throws(Exception::class)
override fun flushAllBuffers() {
LOGGER.info {
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.43.6
version=0.44.0

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
{"type": "RECORD", "record": {"stream": "object_array_test_1", "emitted_at": 1602637589100, "data": { "property_string" : "qqq", "property_array" : [ { "property_string": "foo bar", "property_date": "2021-01-23", "property_timestamp_with_timezone": "2022-11-22T01:23:45+00:00", "property_timestamp_without_timezone": "2022-11-22T01:23:45", "property_number": 56.78, "property_big_number": "100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000.1234", "property_integer": 42, "property_boolean": true } ] }}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "object_array_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
{"type": "RECORD", "record": {"stream": "array_test_1", "emitted_at": 1602637589100, "data": { "string_array" : ["foo bar", "some random special characters: ࠈൡሗ"], "array_date" : ["2021-01-23", "1504-02-29"], "array_timestamp_with_timezone" : ["2022-11-22T01:23:45+05:00", "9999-12-21T01:23:45-05:00"], "array_timestamp_without_timezone" : ["2022-11-22T01:23:45", "1504-02-29T01:23:45"], "array_number" : [56.78, 0, -12345.678], "array_big_number" : ["-12345.678", "100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000.1234"], "array_integer" : [42, 0, 12345], "array_boolean" : [true, false] }}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "array_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,12 @@
{"type": "RECORD", "record": {"stream": "integer_test_1", "emitted_at": 1602637589300, "data": { "data" : -12345 }}}
{"type": "RECORD", "record": {"stream": "boolean_test_1", "emitted_at": 1602637589100, "data": { "data" : true }}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "string_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "date_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "datetime_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "datetime_test_2"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "number_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "bignumber_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "integer_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "boolean_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}

Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
{"type": "RECORD", "record": {"stream": "object_test_1", "emitted_at": 1602637589100, "data": {"property_string": "foo bar", "property_date": "2021-01-23", "property_timestamp_with_timezone": "2022-11-22T01:23:45+00:00", "property_timestamp_without_timezone": "2022-11-22T01:23:45", "property_number": 56.78, "property_big_number": "100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000.1234", "property_integer": 42, "property_boolean": true }}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "object_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,15 @@
{"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589500, "data": { "some_id" : 203 }}}
{"type": "RECORD", "record": {"stream": "stream_with_binary_data", "emitted_at": 1602637589500, "data": { "some_id" : 303, "binary_field_name":"dGVzdA==" }}}
{"type": "STATE", "state": { "data": {"start_date": "2020-09-02"}}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "streamWithCamelCase"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_with_underscores"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_with_edge_case_field_names"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream-with:spécial:character_names"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "CapitalCase"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "reserved_keywords"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "groups"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "ProperCase"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_name"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_name_next"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_with_binary_data"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "STREAM_WITH_ALL_CAPS"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637889300, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31T00:00:00Z", "NZD": 1.14, "HKD": 7.99, "USD": 10.99}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637989400, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01T00:00:00Z", "NZD": 1.14, "HKD": 7.15, "USD": 10.16}}}
{"type": "STATE", "state": { "data": {"start_date": "2020-09-02"}}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "exchange_rate"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
{"type": "RECORD", "record": {"stream": "object_array_test_1", "emitted_at": 1602637589100, "data": { "property_string" : "qqq", "property_array" : [ { "property_string": "foo bar", "property_date": "2021-01-23", "property_timestamp_with_timezone": "2022-11-22T01:23:45+00:00", "property_timestamp_without_timezone": "2022-11-22T01:23:45", "property_number": "56.78", "property_integer": "42", "property_boolean": true, "property_binary_data" : "dGVzdA==" } ] }}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "object_array_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
{"type": "RECORD", "record": {"stream": "array_test_1", "emitted_at": 1602637589100, "data": { "string_array" : ["foo bar", "some random special characters: ࠈൡሗ"], "array_date" : ["2021-01-23", "1504-02-29"], "array_timestamp_with_timezone" : ["2022-11-22T01:23:45+05:00", "9999-12-21T01:23:45-05:00"], "array_timestamp_without_timezone" : ["2022-11-22T01:23:45", "1504-02-29T01:23:45"], "array_number" : ["56.78", "0", "-12345.678"], "array_integer" : ["42", "0", "12345"], "array_boolean" : [true, false], "array_binary_data" : ["dGVzdA=="] }}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "array_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,13 @@
{"type": "RECORD", "record": {"stream": "boolean_test_1", "emitted_at": 1602637589200, "data": { "data" : true }}}
{"type": "RECORD", "record": {"stream": "binary_test_1", "emitted_at": 1602637589300, "data": { "data" : "dGVzdA==" }}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "string_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "date_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "datetime_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "datetime_test_2"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "time_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "time_test_2"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "number_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "integer_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "boolean_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "binary_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
{"type": "RECORD", "record": {"stream": "object_test_1", "emitted_at": 1602637589100, "data": {"property_string": "foo bar", "property_date": "2021-01-23", "property_timestamp_with_timezone": "2022-11-22T01:23:45+00:00", "property_timestamp_without_timezone": "2022-11-22T01:23:45", "property_number": "56.78", "property_integer": "42", "property_boolean": true, "property_binary_data" : "dGVzdA==" }}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "object_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,15 @@
{"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589500, "data": { "some_id" : "203" }}}
{"type": "RECORD", "record": {"stream": "stream_with_binary_data", "emitted_at": 1602637589500, "data": { "some_id" : "303", "binary_field_name":"dGVzdA==" }}}
{"type": "STATE", "state": { "data": {"start_date": "2020-09-02"}}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "streamWithCamelCase"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_with_underscores"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_with_edge_case_field_names"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream-with:spécial:character_names"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "CapitalCase"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "reserved_keywords"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "groups"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "ProperCase"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_name"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_name_next"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_with_binary_data"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "STREAM_WITH_ALL_CAPS"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637889300, "data": { "id": "2", "currency": "EUR", "date": "2020-08-31T00:00:00Z", "NZD": "1.14", "HKD": "7.99", "USD": "10.99"}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637989400, "data": { "id": "2", "currency": "EUR", "date": "2020-09-01T00:00:00Z", "NZD": "1.14", "HKD": "7.15", "USD": "10.16"}}}
{"type": "STATE", "state": { "data": {"start_date": "2020-09-02"}}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "exchange_rate"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
{"type": "RECORD", "record": {"stream": "array_test_1", "emitted_at": 1602637589100, "data": { "array_number" : ["-12345.678", "100000000000000000.1234"],"array_float" : ["-12345.678", "0", "1000000000000000000000000000000000000000000000000000.1234"], "array_integer" : ["42", "0", "12345"]}}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "array_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589301}}
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@
{"type": "RECORD", "record": {"stream": "default_number_test", "emitted_at": 1602637589200, "data": { "data" : "0" }}}
{"type": "RECORD", "record": {"stream": "default_number_test", "emitted_at": 1602637589300, "data": { "data" : "-12345.678" }}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "int_test"}, "status": "COMPLETE"}, "emitted_at": 1602637589301}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "float_test"}, "status": "COMPLETE"}, "emitted_at": 1602637589301}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "default_number_test"}, "status": "COMPLETE"}, "emitted_at": 1602637589301}}
Loading

0 comments on commit 2320f78

Please sign in to comment.