
Schema registry USER_INFO authentication does not seem to work #31

Closed
wahyd4 opened this issue Sep 10, 2020 · 4 comments

Comments


wahyd4 commented Sep 10, 2020

Hi there, I am currently trying to use this transformer to migrate multiple Kafka clusters and schema registries to one cluster on Confluent Cloud.

With the distributed Replicator configuration below, I keep getting a 401 Unauthorized error when the transformer tries to register the schema on the dest schema registry. Fetching the schema from the src cluster works fine, as no authentication is required for that cluster. By the way, the dest schema registry is the Confluent Cloud hosted version.

My connector config details are:

{
  "name": "us-replication",
  "config": {
    "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
    "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
    "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
    "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "src.kafka.bootstrap.servers": "consumer-kafka:9094",
    "src.kafka.security.protocol": "SASL_SSL",
    "src.kafka.ssl.truststore.type": "jks",
    "src.kafka.ssl.endpoint.identification.algorithm": "https",
    "src.kafka.ssl.truststore.location": "/xxxx.jks",
    "src.kafka.ssl.truststore.password": "xxxx",
    "src.kafka.sasl.mechanism": "SCRAM-SHA-256",
    "src.kafka.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxx\" password=\"xxx\";",
    "src.kafka.request.timeout.ms": "30000",
    "tasks.max": "1",
    "dest.kafka.bootstrap.servers": "dest-kafka:9092",
    "dest.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";",
    "dest.kafka.security.protocol": "SASL_SSL",
    "dest.kafka.ssl.endpoint.identification.algorithm": "https",
    "dest.kafka.sasl.mechanism": "PLAIN",
    "dest.topic.replication.factor": "3",
    "dest.kafka.request.timeout.ms": "30000",
    "dest.kafka.retry.backoff.ms": "500",
    "topic.rename.format": "${topic}",
    "topic.regex": "^(qa.).*$",
    "offset.topic.commit": false,
    "offset.translator.tasks.max": 1,
    "offset.timestamps.commit": false,
    "transforms": "AvroSchemaTransfer",
    "transforms.AvroSchemaTransfer.type": "cricket.jmoore.kafka.connect.transforms.SchemaRegistryTransfer",
    "transforms.AvroSchemaTransfer.transfer.message.keys": "false",
    "transforms.AvroSchemaTransfer.src.schema.registry.url": "http://consumer-registry",
    "transforms.AvroSchemaTransfer.dest.schema.registry.url": "https://producer-registry",
    "transforms.AvroSchemaTransfer.dest.basic.auth.credentials.source": "USER_INFO",
    "transforms.AvroSchemaTransfer.dest.basic.auth.user.info": "user:pass"    
  }
}
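
For reference, this is roughly the client configuration I understand the dest.* transform settings above should translate into. A minimal sketch, using the unshaded io.confluent client and the constructor that accepts a config map; the shaded cricket.jmoore build may differ, and the values are placeholders:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

import java.util.HashMap;
import java.util.Map;

public class DestClientSketch {
    public static void main(String[] args) {
        // Basic-auth settings for the destination registry (placeholder values).
        Map<String, String> configs = new HashMap<>();
        configs.put("basic.auth.credentials.source", "USER_INFO");
        // Depending on the client version, either of these two keys may be the one read;
        // supplying both should be harmless.
        configs.put("basic.auth.user.info", "user:pass");
        configs.put("schema.registry.basic.auth.user.info", "user:pass");

        SchemaRegistryClient dest =
                new CachedSchemaRegistryClient("https://producer-registry", 100, configs);
        // With USER_INFO configured, register() calls are expected to carry an
        // "Authorization: Basic ..." header.
    }
}

If the registration request goes out without an Authorization header despite settings like these, the credentials are presumably being lost somewhere between the SMT config and the client.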

The errors are:

[2020-09-10 04:13:23,086] ERROR Unable to register source schema id 41 to destination registry. (cricket.jmoore.kafka.connect.transforms.SchemaRegistryTransfer)
cricket.jmoore.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
	at cricket.jmoore.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:209)
	at cricket.jmoore.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:235)
	at cricket.jmoore.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:326)
	at cricket.jmoore.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:318)
	at cricket.jmoore.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:313)
	at cricket.jmoore.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:119)
	at cricket.jmoore.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:156)
	at cricket.jmoore.kafka.connect.transforms.SchemaRegistryTransfer.copySchema(SchemaRegistryTransfer.java:223)
	at cricket.jmoore.kafka.connect.transforms.SchemaRegistryTransfer.apply(SchemaRegistryTransfer.java:176)
	at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
	at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:320)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:245)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[2020-09-10 04:13:23,106] ERROR Error encountered in task us-replication-0. Executing stage 'TRANSFORMATION' with class 'cricket.jmoore.kafka.connect.transforms.SchemaRegistryTransfer', where source record is = SourceRecord{sourcePartition={topic=us-qa.dwh.paylater.payment.schedule.payment-scheduled.v1, partition=2}, sourceOffset={offset=932}} ConnectRecord{topic='us-qa.dwh.paylater.payment.schedule.payment-scheduled.v1', kafkaPartition=2, key=[B@27ecfdae, keySchema=Schema{BYTES}, value=[B@6bfd8474, valueSchema=Schema{BYTES}, timestamp=1599185438017, headers=ConnectHeaders(headers=[ConnectHeader(key=id, value=[B@1a081c44, schema=Schema{BYTES})])}. (org.apache.kafka.connect.runtime.errors.LogReporter)
org.apache.kafka.connect.errors.ConnectException: Transform failed. Unable to update record schema id. (isKey=false)
	at cricket.jmoore.kafka.connect.transforms.SchemaRegistryTransfer.lambda$apply$1(SchemaRegistryTransfer.java:178)
	at java.util.Optional.orElseThrow(Optional.java:290)
	at cricket.jmoore.kafka.connect.transforms.SchemaRegistryTransfer.apply(SchemaRegistryTransfer.java:177)
	at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
	at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:320)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:245)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[2020-09-10 04:13:23,107] INFO WorkerSourceTask{id=us-replication-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-09-10 04:13:23,107] INFO WorkerSourceTask{id=us-replication-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-09-10 04:13:23,107] DEBUG WorkerSourceTask{id=us-replication-0} Finished offset commitOffsets successfully in 0 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-09-10 04:13:23,107] ERROR WorkerSourceTask{id=us-replication-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
	at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:320)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:245)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Transform failed. Unable to update record schema id. (isKey=false)
	at cricket.jmoore.kafka.connect.transforms.SchemaRegistryTransfer.lambda$apply$1(SchemaRegistryTransfer.java:178)
	at java.util.Optional.orElseThrow(Optional.java:290)
	at cricket.jmoore.kafka.connect.transforms.SchemaRegistryTransfer.apply(SchemaRegistryTransfer.java:177)
	at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
	... 11 more
[2020-09-10 04:13:23,107] ERROR WorkerSourceTask{id=us-replication-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)

I also inspected the HTTP request for registering a schema, but didn't see any authentication header inside; it looks like the following:

POST /subjects/SomeEvent/versions HTTP/1.1
Accept: text/html, image/gif, image/jpeg, *; q=.2, */*; q=.2
Cache-Control: no-cache
Connection: keep-alive
Content-Length: 2463
Content-Type: application/vnd.schemaregistry.v1+json
Pragma: no-cache
User-Agent: Java/1.8.0_212

{"schema":"xxxx"}

Furthermore, I've tested both 0.2.0 and the current master branch, and neither works in my case.

Could you help take a look at this? Thanks.

@OneCricketeer
Owner

I haven't used Replicator with this, but you can see #25 for MirrorMaker2.


wahyd4 commented Sep 11, 2020

I got a custom-built jar working by upgrading the Confluent version to 5.4.x, from the current value in pom.xml:

<confluent.version>5.1.0</confluent.version>

I saw there was an authentication change in this repo in #19 last September, but the schema registry client version in pom.xml wasn't changed accordingly.

So this issue should be solved as long as we upgrade the Confluent version to one released after this pull request: confluentinc/schema-registry#1204

@jheinnic
Contributor

I've got two cents to offer here.

PR #19 was a workaround for the fact that, as of 5.1.0, Confluent maintained a global/static cache of BasicAuthenticationProviders in its BasicAuthenticationProviderFactory, keyed by the value each provider's getAlias() method returned (e.g. USER_INFO or URL for basic authentication).

Without being able to change the implementation of that cache, or the SchemaRegistryClient in a way that would remove it, the workaround implemented in #19 was to register two identical copies of the existing UserInfoBasicAuthenticationProvider, but with slightly modified getAlias() methods, so that one calls itself "SRC_USER_INFO" and the other "DST_USER_INFO". The SMT was also modified to add a "SRC_" or "DST_" prefix when telling the client which alias to apply the source or destination credentials to, as shown here.

In 5.4.1, Confluent updated its implementation of BasicAuthenticationProviderFactory by dropping the static cache: each call to the factory returns a new instance of the provider with a matching alias instead of retrieving a JVM-wide singleton from the global hash. This change makes PR #19 unnecessary, but it should also not change anything if PR #19 is still in use. Confluent has not changed the provider classes themselves, so it should not matter whether we get two unique instances by assigning unique aliases to two singleton providers or by constructing two instances of one non-singleton provider. I can't argue with results if bumping the client dependency from 5.1.0 to 5.4.1 resolved your issue, but I would not have expected it to change anything either.
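
To make the difference concrete, here is a minimal, self-contained model of the two behaviours described above (all class and method names are illustrative, not the actual Confluent classes):

import java.util.HashMap;
import java.util.Map;

public class AliasCacheDemo {

    // Stand-in for a basic-auth provider identified by an alias.
    interface AuthProvider {
        String getAlias();
    }

    static class UserInfoProvider implements AuthProvider {
        private final String alias;
        UserInfoProvider(String alias) { this.alias = alias; }
        public String getAlias() { return alias; }
    }

    // 5.1.0-style behaviour: a JVM-wide cache keyed by alias, so two lookups of the
    // same alias share a single provider instance. PR #19 sidesteps this by
    // registering the same logic under two distinct aliases.
    static class CachingFactory {
        private static final Map<String, AuthProvider> CACHE = new HashMap<>();
        static AuthProvider get(String alias) {
            return CACHE.computeIfAbsent(alias, UserInfoProvider::new);
        }
    }

    // 5.4.1-style behaviour: no static cache, a fresh provider per call, so the
    // alias-prefix workaround is no longer needed (but also does no harm).
    static class NonCachingFactory {
        static AuthProvider get(String alias) {
            return new UserInfoProvider(alias);
        }
    }

    public static void main(String[] args) {
        AuthProvider src = CachingFactory.get("SRC_USER_INFO");
        AuthProvider dst = CachingFactory.get("DST_USER_INFO");
        System.out.println(src == dst);                     // false: distinct aliases, distinct instances
        System.out.println(CachingFactory.get("USER_INFO")
                == CachingFactory.get("USER_INFO"));        // true: one alias, one shared cached instance
        System.out.println(NonCachingFactory.get("USER_INFO")
                == NonCachingFactory.get("USER_INFO"));     // false: every call builds a new provider
    }
}

Either way, two distinct provider instances end up holding the source and destination credentials, which is why I would not expect the version bump alone to change the outcome.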

There is a test case on line #296 here that checks whether the client gets configured correctly to send basic authentication credentials when only the destination USER_INFO strategy is configured, and that test case still seems to be passing, so I'm surprised the request you captured shows no such header.

It is possible that something changed between 5.1.0 and 5.4.1 in how Confluent uses the provider to attach basic authentication credentials, but I've no reason to believe that's the case. Your case is somewhat unusual insofar as it uses SASL security to protect the Kafka client, but I don't see anything in the source that would suggest those configuration properties are in any way exposed to the Schema Registry client, so I would not expect them to play a part in this behavior.

I am curious as to how you managed to inspect the HTTP request sent, given that the communication channel is configured for https? If you are relaying what appeared in the log files on the registry, I think it is possible that Confluent is stripping any authentication credentials from the log file to prevent the log from being a security liability. If that's so, then you should be able to send a valid registry request from any other client and also not find the authentication header in the log.

If the previous paragraph's assumption holds and the sample request came from a log file where the authentication header would have been stripped anyway as a security measure, I think the only way to observe what is really being sent from the SMT to the registry server would be to set up a man-in-the-middle TLS proxy configured to log each decoded request it receives before initiating a new TLS session with the actual registry and forwarding the original request unmodified.

If that's not how the sample request was collected and you're interested in running such an experiment, I'd start by checking out the Quick Start guide at https://mitmproxy.org/.


wahyd4 commented Sep 22, 2020

Hi @jheinnic ,

Thanks for your reply.

It is possible that something changed between 5.1.0 and 5.4.1 in how Confluent uses the provider to attach basic authentication credentials, but I've no reason to believe that's the case. Your case is somewhat unusual insofar as it uses SASL security to protect the Kafka client, but I don't see anything in the source that would suggest those configuration properties are in any way exposed to the Schema Registry client, so I would not expect them to play a part in this behavior.

I was having the issue with 5.1.0, not any later release. I was surprised by the result as well, because I saw that #19 was meant to solve this kind of problem.

I am curious as to how you managed to inspect the HTTP request sent, given that the communication channel is configured for https? If you are relaying what appeared in the log files on the registry, I think it is possible that Confluent is stripping any authentication credentials from the log file to prevent the log from being a security liability. If that's so, then you should be able to send a valid registry request from any other client and also not find the authentication header in the log.

As for how I captured the HTTP request, I pointed the target schema registry URL in the Connect configuration at a local dummy HTTP server, httplab, which prints out every request it receives on any endpoint.
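
Something equivalent can be done with a few lines of plain JDK code. A throwaway sketch, not what I actually ran; the port and the fake response body are made up, and you would point transforms.AvroSchemaTransfer.dest.schema.registry.url at http://localhost:8081 while it runs:

import com.sun.net.httpserver.HttpServer;

import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

public class HeaderDumpServer {
    public static void main(String[] args) throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress(8081), 0);
        server.createContext("/", exchange -> {
            // Print the request line and every header, so a missing Authorization
            // header is immediately visible.
            System.out.println(exchange.getRequestMethod() + " " + exchange.getRequestURI());
            exchange.getRequestHeaders().forEach((name, values) ->
                    System.out.println(name + ": " + String.join(", ", values)));

            // Reply with a fake "schema registered" response so the caller does not hang.
            byte[] body = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().add("Content-Type", "application/vnd.schemaregistry.v1+json");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        });
        server.start();
        System.out.println("Listening on http://localhost:8081");
    }
}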

By the way, I think this issue can be solved once #29 is merged.

wahyd4 closed this as completed Oct 3, 2020