feat: Add Kafka Connect Cloud Bigtable sink connector #2466
base: main
Conversation
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). View this failed invocation of the CLA check for more information. For the most up to date status, view the checks section at the bottom of the pull request.
Warning: This pull request is touching the following templated files:
Force-pushed from e2f0361 to 40ae074.
This is shaping up nicely! All my comments are pretty minor. I did not review the Integration tests yet. I'll get back to you soon on how to handle logical types.
Resolved review threads on:
- .../src/main/java/com/google/cloud/kafka/connect/bigtable/autocreate/BigtableSchemaManager.java
- ...-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/mapping/ValueMapper.java
- ...onnect-sink/src/test/java/com/google/cloud/kafka/connect/bigtable/mapping/KeyMapperTest.java
@brandtnewton I also have a question: how do you want to review further commits (mainly logical types support, some more integration tests, and some minor tweaks throughout the codebase)? In this PR? In a new one? Or maybe do you want to create a new repository and have the PR(s) sent there? Please let me know.

@prawilny you can just add those commits to this PR. GitHub makes it easy to see only the changes since my last review.
Thanks for all the changes! Feel free to add more changes here or in another PR, whatever is easier for you. See comments:
…oting configuration keys for consistency with Kafka convention
…move code from BaseIT to BaseKafkaConnectIT; get rid of protected and use public/private in base classes
@brandtnewton, please take a look.
I think I'm all caught up now, including the tests. This is getting super close! I'd like to run the IT tests locally, can you add a quick doc on how to do that? Also I don't have permission to resolve comments so feel free to resolve them yourself.
```java
// b) modify the record as if it didn't contain the deletion of columns that don't exist so that
//    other operations within the row have a chance to execute successfully.
// I think the option a) is clearer to reason about, so we mark the column family as required.
requiredColumnFamilies.add(columnFamily);
```
Does this mean we will create and then delete a column family if it doesn't exist and this method is called?
As per the design doc:
"""
The SinkRecord value has a top level field set to null
Example: {foo: null}. This will result in all columns in the "foo" column family being deleted for this row.
"""
We create a column family in the table's schema and then delete all the cells within the row.
Yes, the "global" (within a table) creation is required only so that the Bigtable modification request succeeds, but that modification is not reverted by a column family deletion within a row.
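To illustrate the semantics quoted from the design doc, here is a minimal, self-contained sketch of mapping a record value with a top-level null field to mutations. The `toMutations` helper and the string mutation names are hypothetical stand-ins; the real connector builds Cloud Bigtable mutations instead:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NullFieldDeletionSketch {
    // Hypothetical stand-in: renders each top-level field of the record value as a
    // named mutation string instead of a real Bigtable mutation.
    static List<String> toMutations(Map<String, Object> recordValue) {
        List<String> mutations = new ArrayList<>();
        for (Map.Entry<String, Object> field : recordValue.entrySet()) {
            if (field.getValue() == null) {
                // {foo: null} deletes every column of the "foo" family for this row;
                // the family must first exist in the table's schema for the request
                // to succeed, hence the auto-creation discussed above.
                mutations.add("deleteFamily(" + field.getKey() + ")");
            } else {
                mutations.add("setCell(" + field.getKey() + ")");
            }
        }
        return mutations;
    }

    public static void main(String[] args) {
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("foo", null);
        value.put("bar", "v");
        System.out.println(toMutations(value)); // [deleteFamily(foo), setCell(bar)]
    }
}
```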
```java
 */
@VisibleForTesting
// It is generic so that we can test it with naturally ordered values easily.
static <K, V> Map<K, V> orderMap(Map<K, V> map, Collection<K> order) {
```
Can prepareRecords() do this? It could create a LinkedHashMap instead of a HashMap
It could, but then we'd need to also return a LinkedHashMap from autoCreateTablesAndHandleErrors() and autoCreateColumnFamiliesAndHandleErrors(). I thought that it'd couple these functions too tightly. What do you think?
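For illustration, a minimal sketch of what such an order-restoring helper could look like, based only on the `orderMap` signature quoted above (the class name and method body are assumptions, not the PR's actual implementation):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OrderMapSketch {
    // Returns a copy of `map` whose iteration order follows `order`; keys in `order`
    // that are absent from `map` are skipped. LinkedHashMap preserves insertion order.
    static <K, V> Map<K, V> orderMap(Map<K, V> map, Collection<K> order) {
        Map<K, V> ordered = new LinkedHashMap<>();
        for (K key : order) {
            if (map.containsKey(key)) {
                ordered.put(key, map.get(key));
            }
        }
        return ordered;
    }

    public static void main(String[] args) {
        Map<String, Integer> m = new HashMap<>();
        m.put("b", 2);
        m.put("a", 1);
        m.put("c", 3);
        Map<String, Integer> o = orderMap(m, List.of("a", "b", "c"));
        System.out.println(new ArrayList<>(o.keySet())); // [a, b, c]
    }
}
```

Keeping this as a standalone helper, as the PR does, avoids forcing the auto-creation methods to commit to a concrete map type.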
```diff
@@ -461,7 +468,6 @@ static Optional<StatusCode> maybeExtractBigtableStatusCode(Throwable t) {
   static boolean isStatusCodeCausedByInputError(StatusCode.Code code) {
     switch (code) {
       case INVALID_ARGUMENT:
-      case FAILED_PRECONDITION:
```
Why remove this error code?
I see the setRetryableCodes code in BigtableSinkConfig.java - I assume that's why this got removed. Couldn't a user input error still cause a failed precondition error?
The docs state that it is a "user error", but also that "Depending on the circumstance, you might want to retry. For example, you can encounter this error if you attempt to perform operations on a table while it is still being created.".
I encountered this status code when creating different column families concurrently. The error message directly stated that it was due to concurrent table schema modification. That is why it needs to be retried in our case.
I also think that the other example error given by the docs, "attempt to perform operations on a table while it is still being created", should also be retried if it were encountered (due to either a bug in our resource auto creation or the user's too late manual creation of the table if the sink were used with the resource auto creation disabled).
Apart from that, I don't really know if a user input error could cause a failed precondition error - I didn't manage to find a direct answer in the docs. I only know that the invalid operations I test (such as invalid table name) cause INVALID_ARGUMENT and at least some of the FAILED_PRECONDITION errors should be retried.
Do you think we should read error messages and decide whether the exception is caused by user error depending on that? If so, do you have any pointers where to find a more or less complete list of the error messages?
Also, I think we should err on the side of leniency when branding something a user error, since in that case (if the user configured a DLQ) we send the offending record to the DLQ with InvalidBigtableSchemaModificationException, which we describe in the docs as:
```java
/**
 * An {@link Exception} that signifies that input {@link org.apache.kafka.connect.sink.SinkRecord
 * SinkRecord(s)} cause attempt of invalid Cloud Bigtable schema modification and thus is invalid
 * and should not be retried.
 */
```
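To illustrate the leniency policy described above, here is a minimal sketch with a local stand-in for Kafka Connect's errant-record reporter. All names, the `Reporter` interface, and the `contains("invalid")` trigger are hypothetical; the real connector classifies errors by status code as discussed in this thread:

```java
import java.util.ArrayList;
import java.util.List;

public class DlqRoutingSketch {
    // Local stand-in for Kafka Connect's ErrantRecordReporter (simplified).
    interface Reporter {
        void report(String record, Exception error);
    }

    // Only errors positively known to be caused by the input record are branded
    // user errors and sent to the DLQ; everything else would propagate so the
    // framework retries the batch.
    static List<String> process(List<String> records, Reporter dlq) {
        List<String> written = new ArrayList<>();
        for (String record : records) {
            try {
                if (record.contains("invalid")) {
                    // Hypothetical trigger standing in for
                    // InvalidBigtableSchemaModificationException.
                    throw new IllegalArgumentException("invalid schema modification");
                }
                written.add(record);
            } catch (IllegalArgumentException userError) {
                dlq.report(record, userError); // non-retriable: offload to DLQ
            }
        }
        return written;
    }

    public static void main(String[] args) {
        List<String> dlq = new ArrayList<>();
        List<String> ok = process(List.of("good", "invalid-row"), (r, e) -> dlq.add(r));
        System.out.println(ok);  // [good]
        System.out.println(dlq); // [invalid-row]
    }
}
```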
```java
.modifyColumnFamiliesSettings()
.setRetrySettings(adminApiWriteRetrySettings)
// Retry createTable() for status codes other admin operations retry by default as
// seen in BigtableTableAdminStubSettings and for FAILED_PRECONDITION which is
```
Won't FAILED_PRECONDITION continue to fail if the column family has already been added by another request?
No, then the caller receives an ALREADY_EXISTS error, which is not retriable. awaitResourceCreationAndHandleInvalidInputErrors() logs that information and swallows the exception, then refreshTableColumnFamiliesCache() fetches the table's schema, which is used to check whether the column family creation succeeded.
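As a hedged sketch of the classification policy discussed in this thread, the enum and method below are simplified local stand-ins, not the connector's actual code (the real code switches over com.google.api.gax.rpc.StatusCode.Code):

```java
public class RetryClassificationSketch {
    // Simplified local stand-in for StatusCode.Code (assumption: subset only).
    enum Code { INVALID_ARGUMENT, FAILED_PRECONDITION, ALREADY_EXISTS, UNAVAILABLE }

    // INVALID_ARGUMENT is treated as a user input error (not retried);
    // FAILED_PRECONDITION is retried because it is observed during concurrent
    // schema modification; ALREADY_EXISTS is swallowed because a subsequent
    // cache refresh confirms whether the column family now exists.
    static String classify(Code code) {
        switch (code) {
            case INVALID_ARGUMENT:
                return "input-error";
            case FAILED_PRECONDITION:
            case UNAVAILABLE:
                return "retry";
            case ALREADY_EXISTS:
                return "swallow-and-refresh-cache";
            default:
                return "fail";
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(Code.FAILED_PRECONDITION)); // retry
        System.out.println(classify(Code.ALREADY_EXISTS));      // swallow-and-refresh-cache
    }
}
```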
```diff
  * @param fields Fields that need to be accessed before the target value is reached.
  * @return Extracted nested field.
  */
-private Object extractField(Object value, Iterator<String> fields) {
+private SchemaAndValue extractField(
```
nit: would it be cleaner to just pass a SchemaAndValue instead of a value and schema?
Done.
```java
props.put(BigtableSinkConfig.AUTO_CREATE_TABLES_CONFIG, "true");
props.put(BigtableSinkConfig.AUTO_CREATE_COLUMN_FAMILIES_CONFIG, "true");
props.put(BigtableSinkConfig.INSERT_MODE_CONFIG, InsertMode.INSERT.name());
props.put(BigtableSinkConfig.ERROR_MODE_CONFIG, BigtableErrorMode.IGNORE.name());
```
Can we test that an error occurs in some way?
Done.
```java
Map<ByteString, Row> rows = readAllRows(bigtableData, testId);
Row row1 = rows.get(KEY1_BYTES);
Row row2 = rows.get(KEY2_BYTES);
assertEquals(1, row1.getCells().size());
```
can we test that VALUE1 is set and not VALUE2?
Done.
```java
public static final ByteString NESTED_NULL_STRUCT_FIELD_NAME_BYTES =
    ByteString.copyFrom(NESTED_NULL_STRUCT_FIELD_NAME.getBytes(StandardCharsets.UTF_8));

public static Struct getStructhWithNullOnNthNestingLevel(int n) {
```
nit: Structh typo
Done.
```java
public void testCreationOfInvalidTable() throws InterruptedException {
  String dlqTopic = createDlq();
  Map<String, String> props = baseConnectorProps();
  String invalidTableName = "T".repeat(100);
```
nit: MAX_BIGTABLE_TABLE_NAME_LENGTH + 1 instead of 100, to make it clearer why this is invalid
Done.
```java
}

@Test
public void testDeletionFailsWhenAutoCreationDisabled() throws InterruptedException {
```
is this a desired behavior?
It's a test documenting the consequences of design choices described in comments in MutationDataBuilder (in deleteFamily() and deleteCells()), so I think it is desired behavior. The alternative is described in those comments and is, in my opinion, worse. I added a comment to make that connection explicit.
…stEnumCaseInsensitivity
…tionOfInvalidTable()
@brandtnewton, please take another look. Beware, the integration tests are a bit flaky right now.
Great doc. Thanks for adding it!
…family auto creation
```shell
curl "https://strimzi.io/install/latest?namespace=kafka" | yq -P 'sort_keys(..)' | sponge strimzi-cluster-operator-0.45.0.yaml
```
@brandtnewton, please take another look.
This PR adds a Kafka Connect sink connector for Cloud Bigtable.
The code is to land in a different repository, but that repository hasn't been created yet, so we bring the code here for early review.
The fact that it targets another repo is the reason for the following:
Things yet to be done (in future PRs):