
Records Duplication in BigQuery when Streaming Data from Kafka Topic via kafka-connect-bigquery #369

Open
spd123 opened this issue Dec 10, 2023 · 8 comments


spd123 commented Dec 10, 2023

Issue Description:
When streaming data from a Kafka topic to BigQuery using the kafka-connect-bigquery connector, I have observed instances of records being duplicated in the BigQuery table. This behavior appears to be inconsistent but recurring.

Steps to Reproduce:

  1. Set up a Kafka Connect BigQuery connector to stream data from a Kafka topic to BigQuery.
  2. Observe the BigQuery table for any occurrences of duplicated records.

Expected Behavior:
Records should be ingested into BigQuery without any duplication.

Actual Behavior:
Some records are duplicated in the BigQuery table during the streaming process.

Dependencies:
Kafka Connect BigQuery Connector Version: 1.6.6

@OneCricketeer

Are you sure you don't have equivalent (not duplicate) data in the topic? For example, is your producer sending the same record more than once, but at different offsets? Maybe you can use the InsertField transform to include offset/partition information.
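
For reference, a minimal sketch of what that could look like in the sink connector's JSON config (the transform alias and the target field names here are just examples):

    "transforms": "insertKafkaCoords",
    "transforms.insertKafkaCoords.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.insertKafkaCoords.topic.field": "kafka_topic",
    "transforms.insertKafkaCoords.partition.field": "kafka_partition",
    "transforms.insertKafkaCoords.offset.field": "kafka_offset"

Once those fields land in the BigQuery table, duplicate rows that share the same partition/offset point at re-delivery by the connector, while duplicates at different offsets point at the producer.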


spd123 commented Dec 15, 2023

Yes, initially I had the same concern that records might be duplicated on the producer's side, but upon further investigation I found that this is not the case. Each affected record appears exactly twice in BigQuery (duplicated once, never more), which points to the connector as the cause.

@OneCricketeer

Does the console consumer show the same? Can you get offset information for each record via Connect transforms?


spd123 commented Dec 16, 2023

Do you mean in the Kafka Connect logs? How do we get that?

@OneCricketeer

  1. kafka-console-consumer - just read the topic (see the example command below). Are you sure the connector is causing duplication vs. duplicate data already in the topic?

  2. If you insert the offset field, is it duplicated, or always incremental?
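
For item 1, something along these lines is enough to spot-check the topic (bootstrap server and topic name are placeholders; print.partition/print.offset need a reasonably recent console consumer, so drop those properties on older client versions):

    kafka-console-consumer --bootstrap-server localhost:9092 \
      --topic my-topic \
      --from-beginning \
      --property print.key=true \
      --property print.partition=true \
      --property print.offset=true

If every record shows up only once there but twice in BigQuery, that narrows it down to the sink side.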


spd123 commented Dec 16, 2023

Yes, no duplicates exist in the records produced to the topic. I have verified this by using Kafdrop and by writing a separate consumer that logs record_id (the same set of record_ids that was duplicated in BigQuery) and offset values. Both methods showed no duplicates. When the producer creates records, each record receives a unique timestamp and record_id. However, I have noticed that the duplicated records in BigQuery are exactly identical, including the timestamp. It seems like the same record is being inserted into BigQuery twice.
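
For reference, duplicates like this can be confirmed on the BigQuery side with a grouping query along these lines (project/dataset/table names are placeholders for the actual table):

    SELECT record_id, COUNT(*) AS copies
    FROM `my_project.my_dataset.my_table`
    GROUP BY record_id
    HAVING COUNT(*) > 1
    ORDER BY copies DESC;

In my case, each duplicated record_id comes back with exactly two copies.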


b-goyal commented Dec 20, 2023

@spd123 The BigQuery sink is an at-least-once connector, which means it guarantees that records are ingested into the destination table at least once. If you have verified that the topic contains unique data and there is only one BigQuery connector/writer consuming the topic and writing into the table, then duplication can happen when record ingestion is re-attempted: the same set of records is sent again because it was not committed on the first attempt. Could you check whether there are warn/error logs that indicate commit failures?

@james-johnston-thumbtack

We also saw a similar situation here several months back. It's extremely rare, but it did happen, and as a workaround a wrapping BigQuery view was unfortunately created that keeps only the newest version of each row, using the PostgreSQL LSN to pick the latest.
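
That view is essentially the standard "latest row wins" pattern, roughly like the sketch below (table and column names are illustrative, and it assumes the Debezium source LSN is available as a column on the BigQuery table):

    CREATE OR REPLACE VIEW `my_project.my_dataset.transactions_latest` AS
    SELECT * EXCEPT (row_rank)
    FROM (
      SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY transaction_pk ORDER BY source_lsn DESC) AS row_rank
      FROM `my_project.my_dataset.transactions`
    )
    WHERE row_rank = 1;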

This was for sinking a Kafka topic originating from Debezium PostgreSQL connector, and stored in the simple UPSERT topic format (that is, Kafka key == table primary key, Kafka value == new row value, or tombstone if deleting). The Debezium connector snapshots the PG table into Kafka, and then follows the PG write-ahead log to stream subsequent updates. Thus we can safely assume that the most recent Kafka message with a particular key represents the latest row version in the underlying PG table.

The BigQuery sink configuration at the time of the record duplication was as follows:

  • Installed on self-managed Kafka Connect with confluent-hub install --no-prompt wepay/kafka-connect-bigquery:2.4.0
    "connector.class"                      = "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
    "name"                                 = var.connector_name
    "tasks.max"                            = "1"
    "topics"                               = <exactly one Kafka topic>
    "project"                              = var.gcp_project
    "defaultDataset"                       = var.gcp_dataset
    "schemaRetriever"                      = "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever"
    "keyfile"                              = var.keyfile_location
    "autoCreateTables"                     = "true"
    "sanitizeTopics"                       = "false"
    "allBQFieldsNullable"                  = "true"
    "allowSchemaUnionization"              = "true"
    "allowBigQueryRequiredFieldRelaxation" = "true"
    "allowNewBigQueryFields"               = "true"
    "upsertEnabled"                        = "true"
    "deleteEnabled"                        = "true"
    "kafkaKeyFieldName"                    = "kafkakey"
    "mergeIntervalMs"                      = "60000"
    "mergeRecordsThreshold"                = "-1"
    "bigQueryRetry"                        = "10"
    "topic2TableMap"                       = <mapping for exactly one Kafka topic to one BigQuery table>

This was before exactly-once support in Debezium, so obviously some duplication can be expected there. But I think that is not a problem, because the duplicates would all have the same Kafka key, and only the most recent message with a given Kafka key should count. @b-goyal also correctly points out that BigQuery streaming inserts are normally at-least-once. Thus the initial BigQuery inserts could also be subject to even more duplication.

However, the key here is that the connector was configured with both upsertEnabled and deleteEnabled. Thus, any duplicate incoming records should be treated as UPDATEs: while the intermediate temporary table might have duplicates, the final output table generated with the MERGE BigQuery SQL statement should not have any. Here is the relevant MERGE statement from connector version 2.4.0 when we saw this issue: https://github.com/confluentinc/kafka-connect-bigquery/blob/v2.4.0/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/MergeQueries.java#L216-L242

Notice the key USING block when SELECTing from the temporary intermediate table, which seems (to me) to clearly select at most one row per key from the source: the newest row for that key. The remainder of the MERGE statement seems to clearly use it to update any existing row in the target table rather than insert a new one.

        + "USING ("
          + "SELECT * FROM ("
            + "SELECT ARRAY_AGG("
              + "x ORDER BY " + i + " DESC LIMIT 1"
            + ")[OFFSET(0)] src "
            + "FROM " + table(intermediateTable) + " x "
            + "WHERE " + batch + "=" + batchNumber + " "
            + "GROUP BY " + String.join(", ", keyFields)
          + ")"
        + ") "

Thus, even though the intermediate table might have duplicates due to the multiple issues mentioned above, they shouldn't make it into the final target table due to the de-duplication that the MERGE statement does.

Yet unfortunately it did happen:

[Screenshot 2023-06-26 at 6:37 PM: query result showing two rows in the target table with the same transaction_pk]

In this example, transaction_pk is the primary key for the table and is the kafkakey used by the MERGE statement for deduplication. Yet the output target table had two rows for the same primary key, as shown in the screenshot above. The row was first created with a transaction status of transaction_created; approximately 3 minutes later, the same row was updated with a new transaction status of transaction_success. The primary key is the same, so the later version should have superseded the earlier row when the MERGE statement ran. Yet it did not.

This happened some time ago, so I can't look up the exact logs any more. But the engineer on call did note at the time that the BigQuery sink was being rate-limited for an extended duration around the same time, with a bunch of errors like: com.google.cloud.bigquery.BigQueryException: Exceeded rate limits: too many concurrent queries for this project_and_region. For more information, see http://<snip>. If the connector or any of its tasks got stuck in a failure state, it would have been auto-restarted by an in-house watchdog service. (This was because the errors around BQ write serialization were not yet retried; that fix wasn't in v2.4.0.) Thus, generally speaking, I believe that whatever edge cases / race conditions exist in the BigQuery sink connector and/or BigQuery itself would have had a higher chance of surfacing under this scenario, with a high error rate, potential connector restarts, and ongoing rate limiting.
