
schemaless ( JSON events ) to bq fail to create table #272

Open · raphaelauv opened this issue Jan 30, 2023 · 2 comments

Comments


raphaelauv commented Jan 30, 2023

I want to archive the topic _schemas in BigQuery (it contains only events with a JSON key and a JSON value, but not serialized with a JSON schema).

So I want to use the schemaless functionality of the connector, but it fails with:

com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Failed to unionize schemas of records for the table GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=schema_metadata, tableId=_schemas}}
Caused by: Could not convert to BigQuery schema with a batch of tombstone records.
    at com.wepay.kafka.connect.bigquery.SchemaManager.getTableInfo(SchemaManager.java:297)
    at com.wepay.kafka.connect.bigquery.SchemaManager.createTable(SchemaManager.java:240)
    at com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter.attemptTableCreate(AdaptiveBigQueryWriter.java:161)
    at com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter.performWriteRequest(AdaptiveBigQueryWriter.java:102)
    at com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter.writeRows(BigQueryWriter.java:112)

Has anyone found a way to send Kafka events without a schema to BigQuery?

thanks

Member

b-goyal commented Sep 5, 2023

@raphaelauv, the connector does not support table creation and schema updates for JSON input. If you create the table manually with an appropriate schema and send a payload of type Map, data ingestion should not fail.
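A minimal sketch of what that advice could look like in practice, assuming the target table already exists in BigQuery with a matching schema (the project and dataset names below are placeholders, not from this thread): using `org.apache.kafka.connect.json.JsonConverter` with `schemas.enable=false` makes Connect deserialize each JSON value into a Map instead of a raw string. Since the original error mentions tombstone records, this sketch also filters null-value records with the stock `Filter` transform and `RecordIsTombstone` predicate (available in Apache Kafka 2.6+).

```json
{
  "name": "BigQuerySinkConnector-json-map",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "tasks.max": "1",
    "topics": "_schemas",
    "project": "my-project",
    "defaultDataset": "schema_metadata",
    "keyfile": "...",
    "keySource": "JSON",
    "autoCreateTables": "false",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "transforms": "dropTombstones",
    "transforms.dropTombstones.type": "org.apache.kafka.connect.transforms.Filter",
    "transforms.dropTombstones.predicate": "isTombstone",
    "predicates": "isTombstone",
    "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
  }
}
```

With `autoCreateTables` off, the connector never hits the failing schema-unionization path; it only writes Maps into the pre-created table, so the field names in each JSON value must line up with the BigQuery column names.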


himashan commented Feb 21, 2024

Has anyone found a way to solve this?
The only thing I could get working is using a StringConverter together with the HoistField SMT to wrap the whole value in a single field, but the resulting table then has just one column containing the entire payload as a string. I would love a way to have the schema inferred from the JSON and flattened.

{
  "name": "BigQuerySinkConnector-metadata-5",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "tasks.max": "1",
    "topics.regex": ".*\\.json$",
    "project": "montu-bi-playground",
    "defaultDataset": "gh_himashan",
    "keyfile": "...",
    "keySource": "JSON",
    "autoCreateTables": "true",
    "autoUpdateSchemas": "true",
    "allowNewBigQueryFields": "true",
    "allowBigQueryRequiredFieldRelaxation": "true",
    "sanitizeTopics": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schemas.enable": "false",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "bqSinkTopicDlq",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.deadletterqueue.context.headers.enable": "true",
    "transforms": "WrapAsStruct",
    "transforms.WrapAsStruct.type": "org.apache.kafka.connect.transforms.HoistField$Value",
    "transforms.WrapAsStruct.field": "message_value"
  }
}
