[Bug]: Comma-separated list of topics is not WAI in kafka-to-bigquery #2038

Open
atognolag opened this issue Nov 27, 2024 · 0 comments
Labels: bug (Something isn't working), needs triage, p2

Related Template(s)

kafka_to_bigquery_flex

Template Version

2024-11-06-01_rc00

What happened?

The string of comma-separated Kafka topics is not parsed into a list; it is passed as a single string to KafkaIO, which expects a List of topic names. Kafka then treats the entire value (e.g. 'cdc,cdc2') as one topic name and rejects it, as shown in the sketch and logs below.
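
For illustration, a minimal sketch of the expected handling. The helper and parameter names here are hypothetical, not the template's actual accessors; the point is that Beam's KafkaIO.Read exposes withTopics(List<String>), so the raw parameter has to be split before it reaches the connector:

```java
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class TopicListSketch {

  // Hypothetical helper; names are illustrative, not the template's own.
  static KafkaIO.Read<byte[], byte[]> buildRead(String bootstrapServers, String rawTopics) {
    // Split the template parameter, e.g. "cdc,cdc2" -> ["cdc", "cdc2"].
    List<String> topics = Arrays.asList(rawTopics.split(","));
    return KafkaIO.<byte[], byte[]>read()
        .withBootstrapServers(bootstrapServers)
        // withTopics expects a List<String>; passing the unsplit string as a
        // single topic is what triggers the InvalidTopicException below.
        .withTopics(topics)
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class);
  }
}
```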

Relevant log output

{"message":"org.apache.beam.sdk.extensions.gcp.options.GcpOptions$DefaultProjectFactory - Inferred default GCP project \u0027sdg-af-goog\u0027 from gcloud. If this is the incorrect project, please cancel this Pipeline and specify the command-line argument --project.","severity":"INFO"}
{"message":"org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: \n\tallow.auto.create.topics \u003d true\n\tauto.commit.interval.ms \u003d 5000\n\tauto.include.jmx.reporter \u003d true\n\tauto.offset.reset \u003d earliest\n\tbootstrap.servers \u003d [34.175.68.53:9092]\n\tcheck.crcs \u003d true\n\tclient.dns.lookup \u003d use_all_dns_ips\n\tclient.id \u003d consumer-null-1\n\tclient.rack \u003d \n\tconnections.max.idle.ms \u003d 540000\n\tdefault.api.timeout.ms \u003d 60000\n\tenable.auto.commit \u003d false\n\tenable.metrics.push \u003d true\n\texclude.internal.topics \u003d true\n\tfetch.max.bytes \u003d 52428800\n\tfetch.max.wait.ms \u003d 500\n\tfetch.min.bytes \u003d 1\n\tgroup.id \u003d null\n\tgroup.instance.id \u003d null\n\tgroup.protocol \u003d classic\n\tgroup.remote.assignor \u003d null\n\theartbeat.interval.ms \u003d 3000\n\tinterceptor.classes \u003d []\n\tinternal.leave.group.on.close \u003d true\n\tinternal.throw.on.fetch.stable.offset.unsupported \u003d false\n\tisolation.level \u003d read_uncommitted\n\tkey.deserializer \u003d class org.apache.kafka.common.serialization.ByteArrayDeserializer\n\tmax.partition.fetch.bytes \u003d 1048576\n\tmax.poll.interval.ms \u003d 300000\n\tmax.poll.records \u003d 500\n\tmetadata.max.age.ms \u003d 300000\n\tmetric.reporters \u003d []\n\tmetrics.num.samples \u003d 2\n\tmetrics.recording.level \u003d INFO\n\tmetrics.sample.window.ms \u003d 30000\n\tpartition.assignment.strategy \u003d [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]\n\treceive.buffer.bytes \u003d 524288\n\treconnect.backoff.max.ms \u003d 1000\n\treconnect.backoff.ms \u003d 50\n\trequest.timeout.ms \u003d 30000\n\tretry.backoff.max.ms \u003d 1000\n\tretry.backoff.ms \u003d 100\n\tsasl.client.callback.handler.class \u003d null\n\tsasl.jaas.config \u003d null\n\tsasl.kerberos.kinit.cmd \u003d /usr/bin/kinit\n\tsasl.kerberos.min.time.before.relogin \u003d 60000\n\tsasl.kerberos.service.name \u003d null\n\tsasl.kerberos.ticket.renew.jitter \u003d 0.05\n\tsasl.kerberos.ticket.renew.window.factor \u003d 0.8\n\tsasl.login.callback.handler.class \u003d null\n\tsasl.login.class \u003d null\n\tsasl.login.connect.timeout.ms \u003d null\n\tsasl.login.read.timeout.ms \u003d null\n\tsasl.login.refresh.buffer.seconds \u003d 300\n\tsasl.login.refresh.min.period.seconds \u003d 60\n\tsasl.login.refresh.window.factor \u003d 0.8\n\tsasl.login.refresh.window.jitter \u003d 0.05\n\tsasl.login.retry.backoff.max.ms \u003d 10000\n\tsasl.login.retry.backoff.ms \u003d 100\n\tsasl.mechanism \u003d GSSAPI\n\tsasl.oauthbearer.clock.skew.seconds \u003d 30\n\tsasl.oauthbearer.expected.audience \u003d null\n\tsasl.oauthbearer.expected.issuer \u003d null\n\tsasl.oauthbearer.jwks.endpoint.refresh.ms \u003d 3600000\n\tsasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms \u003d 10000\n\tsasl.oauthbearer.jwks.endpoint.retry.backoff.ms \u003d 100\n\tsasl.oauthbearer.jwks.endpoint.url \u003d null\n\tsasl.oauthbearer.scope.claim.name \u003d scope\n\tsasl.oauthbearer.sub.claim.name \u003d sub\n\tsasl.oauthbearer.token.endpoint.url \u003d null\n\tsecurity.protocol \u003d PLAINTEXT\n\tsecurity.providers \u003d null\n\tsend.buffer.bytes \u003d 131072\n\tsession.timeout.ms \u003d 45000\n\tsocket.connection.setup.timeout.max.ms \u003d 30000\n\tsocket.connection.setup.timeout.ms \u003d 10000\n\tssl.cipher.suites \u003d null\n\tssl.enabled.protocols \u003d [TLSv1.2, TLSv1.3]\n\tssl.endpoint.identification.algorithm \u003d 
https\n\tssl.engine.factory.class \u003d null\n\tssl.key.password \u003d null\n\tssl.keymanager.algorithm \u003d SunX509\n\tssl.keystore.certificate.chain \u003d null\n\tssl.keystore.key \u003d null\n\tssl.keystore.location \u003d null\n\tssl.keystore.password \u003d null\n\tssl.keystore.type \u003d JKS\n\tssl.protocol \u003d TLSv1.3\n\tssl.provider \u003d null\n\tssl.secure.random.implementation \u003d null\n\tssl.trustmanager.algorithm \u003d PKIX\n\tssl.truststore.certificates \u003d null\n\tssl.truststore.location \u003d null\n\tssl.truststore.password \u003d null\n\tssl.truststore.type \u003d JKS\n\tvalue.deserializer \u003d class org.apache.kafka.common.serialization.ByteArrayDeserializer\n","severity":"INFO"}
{"message":"org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector - initializing Kafka metrics collector","severity":"INFO"}
{"message":"org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0","severity":"INFO"}
{"message":"org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 2ae524ed625438c5","severity":"INFO"}
{"message":"org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1732712593883","severity":"INFO"}
{"message":"org.apache.kafka.clients.Metadata - [Consumer clientId\u003dconsumer-null-1, groupId\u003dnull] Cluster ID: MkU3OEVBNTcwNTJENDM2Qk","severity":"INFO"}
{"message":"org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed","severity":"INFO"}
{"message":"org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter","severity":"INFO"}
{"message":"org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter","severity":"INFO"}
{"message":"org.apache.kafka.common.metrics.Metrics - Metrics reporters closed","severity":"INFO"}
{"message":"org.apache.kafka.common.utils.AppInfoParser - App info kafka.consumer for consumer-null-1 unregistered","severity":"INFO"}
{"message":"com.google.cloud.teleport.v2.common.UncaughtExceptionLogger - The template launch failed.\norg.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.kafka.common.errors.InvalidTopicException: Topic \u0027cdc,cdc2\u0027 is invalid\n\tat org.apache.beam.sdk.Pipeline.run(Pipeline.java:331)\n\tat org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)\n\tat com.google.cloud.teleport.v2.templates.KafkaToBigQueryFlex.run(KafkaToBigQueryFlex.java:296)\n\tat com.google.cloud.teleport.v2.templates.KafkaToBigQueryFlex.main(KafkaToBigQueryFlex.java:146)\nCaused by: org.apache.kafka.common.errors.InvalidTopicException: Topic \u0027cdc,cdc2\u0027 is invalid\n","severity":"ERROR"}
org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.kafka.common.errors.InvalidTopicException: Topic 'cdc,cdc2' is invalid
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:331)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
	at com.google.cloud.teleport.v2.templates.KafkaToBigQueryFlex.run(KafkaToBigQueryFlex.java:296)
	at com.google.cloud.teleport.v2.templates.KafkaToBigQueryFlex.main(KafkaToBigQueryFlex.java:146)
Caused by: org.apache.kafka.common.errors.InvalidTopicException: Topic 'cdc,cdc2' is invalid
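
For context, the InvalidTopicException is expected once the unsplit value reaches the client: legal Kafka topic names are limited to letters, digits, '.', '_', and '-', so a comma can never appear in a valid name. A quick standalone check (the regex mirrors Kafka's documented character set; this is an illustration, not the client's actual validation code):

```java
import java.util.regex.Pattern;

public class TopicNameCheck {
  // Characters Kafka permits in topic names: letters, digits, '.', '_', '-'.
  private static final Pattern LEGAL_TOPIC = Pattern.compile("[a-zA-Z0-9._-]+");

  public static void main(String[] args) {
    System.out.println(LEGAL_TOPIC.matcher("cdc").matches());      // true
    System.out.println(LEGAL_TOPIC.matcher("cdc,cdc2").matches()); // false
  }
}
```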
atognolag added the bug (Something isn't working), needs triage, and p2 labels on Nov 27, 2024