RejectedExecutionException when executing container tests #6299

Open
haraldott opened this issue Aug 23, 2024 · 13 comments
Labels
Waiting on feedback Issues that require feedback from User/Other community members

Comments

@haraldott

Describe the bug

I have a container test suite that runs against a k8s cluster containing a Kafka + Strimzi cluster. Using the fabric8 kubernetes-client, I create and delete some KafkaUser/KafkaTopic resources.
There are 18 tests. Sometimes they all pass, but very often I get the error pasted below. I'm not closing the client between tests. The failure looks completely random: sometimes it happens, sometimes it doesn't.
Any advice?

Fabric8 Kubernetes Client version

other (please specify in additional context)

Steps to reproduce

I'm using the KubernetesClientBuilder like this to obtain a client:

public KubernetesClient getKubernetesClient() {
  return new KubernetesClientBuilder().build();
}

Expected behavior

No crashes

Runtime

Kubernetes (vanilla)

Kubernetes API Server version

1.25.3@latest

Environment

macOS

Fabric8 Kubernetes Client Logs

java.util.concurrent.RejectedExecutionException: Task io.fabric8.kubernetes.client.utils.internal.SerialExecutor$$Lambda/0x000007fe018136a0@55e48584 rejected from java.util.concurrent.ThreadPoolExecutor@21fef032[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]

	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1376)
	at io.fabric8.kubernetes.client.utils.internal.SerialExecutor.scheduleNext(SerialExecutor.java:75)
	at io.fabric8.kubernetes.client.utils.internal.SerialExecutor.lambda$execute$0(SerialExecutor.java:65)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1570)
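The frames suggest that SerialExecutor hands its next task to a ThreadPoolExecutor from inside the task that just finished (scheduleNext called from lambda$execute$0), and that the pool was already "Shutting down" at that moment. The JDK-only sketch below reproduces that shape under stated assumptions: SimpleSerialExecutor is a hypothetical simplification, not fabric8's code, and pool.shutdown() merely stands in for whatever shut the real pool down (for example, the owning client being closed).

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class RejectedAfterShutdownDemo {

    /** Hypothetical, simplified serial executor: runs submitted tasks one at a time on a delegate pool. */
    static final class SimpleSerialExecutor {
        private final Queue<Runnable> tasks = new ArrayDeque<>();
        private final ThreadPoolExecutor delegate;
        private Runnable active;

        SimpleSerialExecutor(ThreadPoolExecutor delegate) {
            this.delegate = delegate;
        }

        synchronized void execute(Runnable r) {
            tasks.offer(() -> {
                try {
                    r.run();
                } finally {
                    scheduleNext(); // runs on the pool thread once the current task ends
                }
            });
            if (active == null) {
                scheduleNext();
            }
        }

        synchronized void scheduleNext() {
            if ((active = tasks.poll()) != null) {
                // Throws RejectedExecutionException (default AbortPolicy) once the pool is shut down
                delegate.execute(active);
            }
        }
    }

    /** Shuts the pool down while a task is running and returns what the pool thread then throws. */
    public static Throwable reproduce() {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        CountDownLatch failed = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), runnable -> {
                    Thread t = new Thread(runnable);
                    // Capture the exception instead of letting it be printed as an uncaught stack trace
                    t.setUncaughtExceptionHandler((thread, e) -> {
                        failure.set(e);
                        failed.countDown();
                    });
                    return t;
                });
        SimpleSerialExecutor serial = new SimpleSerialExecutor(pool);
        CountDownLatch release = new CountDownLatch(1);

        serial.execute(() -> {
            try {
                release.await(); // stands in for a long-running event handler
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        serial.execute(() -> { }); // waits in the serial queue, not in the pool's queue
        pool.shutdown();           // stands in for the owning client being closed
        release.countDown();       // first task ends; its hand-off to the pool is rejected
        try {
            failed.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failure.get();
    }

    public static void main(String[] args) {
        System.out.println(reproduce());
    }
}
```

Because the rejection happens while the worker is still inside the finished task, the rejected pool should report the same state as the trace above: "Shutting down" with one active thread and no queued tasks. Note that the exception surfaces on the pool thread, not in the caller's code, which would also explain why the trace never points at a line in the test.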

Additional context

fabric8 kubernetes client version: 6.13.1

@shawkins
Contributor

To clarify: do the tests actually fail, or is the exception just logged unexpectedly? Either way, can you provide more of the stack trace? Two other issues were addressed recently where an exception like this was being logged; it is now suppressed.

@rohanKanojia rohanKanojia added the Waiting on feedback Issues that require feedback from User/Other community members label Aug 27, 2024
@haraldott
Author

This is the complete stack trace. It doesn't even tell me which call in my code causes it, but it must be some call to the k8s client library (client.resources(KafkaUser.class).inNamespace(this.KAFKA_NAMESPACE).load(yamlFile).create()); I'm creating some Kafka resources with it.
It causes the current test to fail outright; it's not just logging. The run then continues with the next test.
I've also just updated to the latest version, 6.13.3, and the problem is still there.

@manusa
Member

manusa commented Aug 27, 2024

Is it possible that your tests are closing the client? (e.g., your first test uses try-with-resources on the client, and then the subsequent tests fail.)

@haraldott
Author

Some of my client calls were inside try-with-resources blocks, so I changed this:

try (final InputStream yamlFile = this.getClass().getResourceAsStream(resourcePath)) {
  assertNotNull(yamlFile, "File not found: " + resourcePath);
  this.client.resources(KafkaUser.class).inNamespace(this.KAFKA_NAMESPACE).load(yamlFile)
      .create();
}

into this:

final InputStream yamlFile = this.getClass().getResourceAsStream(resourcePath);
assertNotNull(yamlFile, "File not found: " + resourcePath);
this.client.resources(KafkaUser.class).inNamespace(this.KAFKA_NAMESPACE).load(yamlFile)
    .create();

I still get the same errors, always with the same stack trace I posted, which never shows where exactly in my code the problem happened.
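For reference, try-with-resources only closes the resources declared in its header, so the original version closed the InputStream and never touched this.client. The JDK-only sketch below illustrates that scoping with hypothetical TrackingClient and TrackingStream stand-ins (not fabric8 types).

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class TryWithResourcesScope {

    /** Hypothetical stand-in for a long-lived client; it only records whether close() was called. */
    static final class TrackingClient implements AutoCloseable {
        boolean closed;

        void create(InputStream yaml) throws IOException {
            yaml.readAllBytes(); // pretend to parse the YAML and create the resource
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Stream that records whether it was closed. */
    static final class TrackingStream extends ByteArrayInputStream {
        boolean closed;

        TrackingStream(byte[] data) {
            super(data);
        }

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    /** Returns {streamClosed, clientClosed} after the try-with-resources block has exited. */
    public static boolean[] run() {
        TrackingClient client = new TrackingClient();
        TrackingStream yamlFile = new TrackingStream("kind: KafkaUser".getBytes(StandardCharsets.UTF_8));
        // Only the resource declared in the try header is closed when the block exits
        try (InputStream in = yamlFile) {
            client.create(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new boolean[] {yamlFile.closed, client.closed};
    }
}
```

Given that, keeping the try-with-resources around the stream is harmless for the client, while removing it leaves the stream open.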

It may be easiest if I give you a complete example of a test.
It's not always the same test that fails, but it has happened to this one, and it is the least complex:

  void shouldCreateKafkaUser() {
    this.loadKafkaUserYaml("/kafka-user-yamls/kafka-user.yml");
    this.loadTopicsYaml("/kafka-user-yamls/kafka-topics.yml");

    waitUntilAsserted(() -> {
      final LinkedHashSet<OIDCUser> users = this.getUsersFromConfigMap();
      final boolean userCreated = users.stream()
          .anyMatch(user -> SSO_TESTUSER_NAME.equals(user.getUsername()));
      assertTrue(userCreated);
      final List<KafkaTopic> topics = this.client.resources(KafkaTopic.class).inNamespace(this.KAFKA_NAMESPACE).list().getItems();
      assertEquals(3, topics.size());
    });
  }

  private void loadKafkaUserYaml(final String resourcePath) {
    final InputStream yamlFile = this.getClass().getResourceAsStream(resourcePath);
    assertNotNull(yamlFile, "File not found: " + resourcePath);
    this.client.resources(KafkaUser.class).inNamespace(this.KAFKA_NAMESPACE).load(yamlFile)
        .create();
  }

  private void loadTopicsYaml(final String resourcePath) {
    final InputStream yamlFile = this.getClass().getResourceAsStream(resourcePath);
    assertNotNull(yamlFile, "File not found: " + resourcePath);
    final List<HasMetadata> _topicResult = this.client.load(yamlFile).items();
    this.client.resourceList(_topicResult).inNamespace(this.KAFKA_NAMESPACE).create();
  }

  public static void waitUntilAsserted(final ThrowingRunnable x) {
    await().atMost(TIMEOUT, SECONDS).untilAsserted(x);
  }

The waitUntilAsserted function uses await()...untilAsserted() from org.awaitility.Awaitility to retry the assertions until the resources have been created, since that can take some time.
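Conceptually, untilAsserted re-runs the assertion block until it stops throwing an AssertionError or the timeout expires. The JDK-only sketch below shows that idea with a hypothetical pollUntilAsserted helper; it is not Awaitility's implementation, which is more configurable. One difference worth keeping in mind for this issue: as far as I know, Awaitility by default evaluates the condition on a separate polling thread (pollInSameThread() changes that), whereas this sketch runs it on the caller's thread.

```java
import java.time.Duration;
import java.time.Instant;

public class PollUntilAsserted {

    /**
     * Hypothetical helper: re-runs the assertion until it passes or the timeout elapses.
     * Returns the number of attempts; rethrows the last AssertionError on timeout.
     */
    public static int pollUntilAsserted(Runnable assertion, Duration timeout, Duration interval) {
        Instant deadline = Instant.now().plus(timeout);
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                assertion.run();
                return attempts; // assertion passed
            } catch (AssertionError e) {
                if (Instant.now().isAfter(deadline)) {
                    throw e; // give up and surface the last failure
                }
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while polling", ie);
            }
        }
    }
}
```

JUnit 5 assertion failures extend AssertionError, so assertTrue/assertEquals inside the block are retried by this loop; any other exception (for example a RejectedExecutionException from a closed client) propagates immediately.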

@manusa
Member

manusa commented Aug 28, 2024

Could you show where this.client is instantiated (this.client = ...) and, if you do this at all, where this.client.close() is called?

@haraldott
Author

@Component
public class KubernetesClientFactoryImp {
  public KubernetesClient getKubernetesClient() {
    return new KubernetesClientBuilder().build();
  }
}

// -----------------------

@Autowired
private KubernetesClientFactoryImp kubernetesClientFactory;

this.client = this.kubernetesClientFactory.getKubernetesClient();

.close() is never called
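Note that new KubernetesClientBuilder().build() returns a new client instance on every call, so every caller of getKubernetesClient() holds its own client. If a single long-lived client is intended instead, a memoizing factory is one option. The sketch below is hypothetical and uses a FakeClient stand-in for KubernetesClient so that it runs on the JDK alone.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ClientFactories {

    /** Hypothetical stand-in for KubernetesClient that counts how many instances were built. */
    public static final class FakeClient implements AutoCloseable {
        public static final AtomicInteger CREATED = new AtomicInteger();

        public FakeClient() {
            CREATED.incrementAndGet();
        }

        @Override
        public void close() {
            // a real client would release its resources here
        }
    }

    /** Same shape as the factory in this thread: a brand-new client on every call. */
    public static final class PerCallFactory {
        public FakeClient getClient() {
            return new FakeClient();
        }
    }

    /** Hypothetical alternative: build the client once and hand the same instance to every caller. */
    public static final class SharedFactory {
        private final Supplier<FakeClient> builder;
        private volatile FakeClient instance;

        public SharedFactory(Supplier<FakeClient> builder) {
            this.builder = builder;
        }

        public FakeClient getClient() {
            FakeClient local = instance;
            if (local == null) {
                synchronized (this) {
                    local = instance;
                    if (local == null) {
                        local = builder.get(); // built lazily, at most once
                        instance = local;
                    }
                }
            }
            return local;
        }
    }
}
```

In Spring, exposing the client as a @Bean (singleton scope by default) gives the same single instance. The trade-off is that any code that closes the shared client, for example with try-with-resources, breaks it for every other user.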

@manusa
Member

manusa commented Aug 28, 2024

Have you noticed if the stacktrace comes after a test failure?
Are the tests run in parallel or with some sort of threading mechanism?

The only thing I can see here is the possibility that the await gets called in a thread where the client was already closed. However, the snippets you show seem to be blocking (unless the tests are orchestrated externally).

@haraldott
Author

The only orchestration of the tests are the @Test and @Order(1) ... @Order(n) from org.junit.jupiter.api and @SpringBootTest.

The exception itself is the test failure; nothing else is executed after it.
The tests run sequentially, but other threads are started in the background.

We use io.fabric8.kubernetes.client.informers.ResourceEventHandler to react to changes to KafkaUser resources. The informers run in the background, and every time an informer fires, the onUpdate / onAdd callbacks run in their own thread.

For example, in one test I modify a resource, as seen here:

2024-08-28T16:15:38.879+02:00  INFO 29075 --- [           main] c.m.e.s.t.UserAkhqServiceContainerTest   : Start test groupsShouldContainAdditionalRolesAndPatternsAfterUpdatingKafkaUser
2024-08-28T16:15:38.888+02:00  INFO 29075 --- [pool-5-thread-2] c.m.e.c.w.AbstractResourceEventHandler   : K8s event received: 'MODIFIED' for KafkaUser: 'test-kafka-user'
2024-08-28T16:15:38.888+02:00  INFO 29075 --- [pool-5-thread-2] my.project.AkhqService    : Updating ACLs for user: TESTUSER
2024-08-28T16:15:38.888+02:00  INFO 29075 --- [pool-5-thread-2] my.project.AkhqService    : Saving AKHQ configuration.
2024-08-28T16:15:38.965+02:00  INFO 29075 --- [pool-5-thread-2] my.project.AkhqService    : Restarting AKHQ service to use the new configuration.

Then the next test starts, but the log output shows that onUpdate() is triggered once more and executed in a new thread.
Lines 3-5 from the top come from the previous test.

2024-08-28T16:15:39.001+02:00  INFO 29075 --- [           main] c.m.e.s.t.UserAkhqServiceContainerTest   : Start test Given_kafkaUserContainsDeputy_When_deputyIsRemoved_Then_kafkaUserGroupShouldBeDeletedFromUser
2024-08-28T16:15:39.010+02:00  INFO 29075 --- [pool-5-thread-2] c.m.e.c.w.KafkaUserResourceEventHandler  : Members or deputies have not changed in annotations.
2024-08-28T16:15:39.010+02:00  INFO 29075 --- [pool-5-thread-3] c.m.e.c.w.AbstractResourceEventHandler   : K8s event received: 'MODIFIED' for KafkaUser: 'test-kafka-user'
2024-08-28T16:15:39.010+02:00  INFO 29075 --- [pool-5-thread-3] c.m.e.c.w.KafkaUserResourceEventHandler  : KafkaUser: 'test-kafka-user' was modified, but no ACLs have changed. Ignoring event.
2024-08-28T16:15:39.010+02:00  INFO 29075 --- [pool-5-thread-3] c.m.e.c.w.KafkaUserResourceEventHandler  : Members or deputies have not changed in annotations.
2024-08-28T16:15:44.021+02:00  INFO 29075 --- [pool-5-thread-2] c.m.e.c.w.AbstractResourceEventHandler   : K8s event received: 'ADDED' for KafkaUser: 'test-kafka-user-deputy'
2024-08-28T16:15:44.106+02:00  INFO 29075 --- [pool-5-thread-2] c.m.e.service.AkhqConfigMapGenerator     : Adding AKHQ security config for user: test-kafka-user-deputy
2024-08-28T16:15:44.106+02:00  INFO 29075 --- [pool-5-thread-2] c.m.e.service.AkhqConfigMapGenerator     : Setting AKHQ security config for topic: test-topic.group and Kafka User test-kafka-user-deputy
2024-08-28T16:15:44.106+02:00  INFO 29075 --- [pool-5-thread-2] c.m.e.service.AkhqConfigMapGenerator     : Setting AKHQ security config for topic: test-topic.group and Kafka User test-kafka-user-deputy
2024-08-28T16:15:44.106+02:00  INFO 29075 --- [pool-5-thread-2] c.m.e.service.AkhqConfigMapGenerator     : Setting AKHQ security config for topic: test-topic-read-only-1 and Kafka User test-kafka-user-deputy
2024-08-28T16:15:44.106+02:00  INFO 29075 --- [pool-5-thread-2] c.m.e.service.AkhqConfigMapGenerator     : Setting AKHQ security config for topic: test-topic-read-only-2 and Kafka User test-kafka-user-deputy
2024-08-28T16:15:44.106+02:00  INFO 29075 --- [pool-5-thread-2] c.m.e.service.AkhqConfigMapGenerator     : Setting AKHQ security config for topic: test-topic-2 and Kafka User test-kafka-user-deputy
2024-08-28T16:15:44.106+02:00  INFO 29075 --- [pool-5-thread-2] my.project.AkhqService    : adding OIDC users to AKHQ config as SSO Users from KafkaUser: [MYDEPUTYUSER]
2024-08-28T16:15:44.106+02:00  INFO 29075 --- [pool-5-thread-2] my.project.AkhqService    : Saving AKHQ configuration.

java.util.concurrent.RejectedExecutionException: Task io.fabric8.kubernetes.client.utils.internal.SerialExecutor$$Lambda$1269/0x0000000801487430@d53b1a2 rejected from java.util.concurrent.ThreadPoolExecutor@4e47c938[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]

	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2070)
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
	at io.fabric8.kubernetes.client.utils.internal.SerialExecutor.scheduleNext(SerialExecutor.java:75)
	at io.fabric8.kubernetes.client.utils.internal.SerialExecutor.lambda$execute$0(SerialExecutor.java:65)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)


@shawkins
Contributor

Can you increase the fabric8 logging? There is a debug message when the client is closed.

@haraldott
Author

Now it happened again in the first test (the one I already posted here); no other tests ran before it.

I think only the last part of the output is relevant: we can see that, for some reason, close() is called in io.fabric8.kubernetes.client.impl.BaseClient.
The "Socket closed" exception happens quite often but doesn't seem to do any harm; other tests where it happens do not fail.
This is the complete debug output:

2024-08-28T17:17:38.912+02:00  WARN 36554 --- [       Thread-6] i.f.k.c.dsl.internal.VersionUsageUtils   : The client is using resource type 'kafkausers' with unstable version 'v1beta2'
2024-08-28T17:17:38.943+02:00 DEBUG 36554 --- [       Thread-6] i.f.kubernetes.client.impl.BaseClient    : The client and associated httpclient io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl have been closed, the usage of this or any client using the httpclient will not work after this
2024-08-28T17:17:38.945+02:00 DEBUG 36554 --- [       Thread-6] io.fabric8.kubernetes.client.Config      : Trying to configure client from Kubernetes config...
2024-08-28T17:17:38.946+02:00 DEBUG 36554 --- [       Thread-6] io.fabric8.kubernetes.client.Config      : Found for Kubernetes config at: [./kubeconfig].
2024-08-28T17:17:38.947+02:00 DEBUG 36554 --- [       Thread-6] i.f.k.client.utils.HttpClientUtils       : Using httpclient io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory factory
2024-08-28T17:17:38.973+02:00  WARN 36554 --- [       Thread-6] i.f.k.c.dsl.internal.VersionUsageUtils   : The client is using resource type 'kafkatopics' with unstable version 'v1beta2'
2024-08-28T17:17:38.986+02:00 DEBUG 36554 --- [       Thread-6] i.f.kubernetes.client.impl.BaseClient    : The client and associated httpclient io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl have been closed, the usage of this or any client using the httpclient will not work after this
2024-08-28T17:17:38.987+02:00 DEBUG 36554 --- [       Thread-6] io.fabric8.kubernetes.client.Config      : Trying to configure client from Kubernetes config...
2024-08-28T17:17:38.987+02:00 DEBUG 36554 --- [       Thread-6] io.fabric8.kubernetes.client.Config      : Found for Kubernetes config at: [./kubeconfig].
2024-08-28T17:17:38.988+02:00 DEBUG 36554 --- [       Thread-6] i.f.k.client.utils.HttpClientUtils       : Using httpclient io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory factory
2024-08-28T17:17:39.026+02:00 DEBUG 36554 --- [       Thread-6] i.f.kubernetes.client.impl.BaseClient    : The client and associated httpclient io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl have been closed, the usage of this or any client using the httpclient will not work after this
2024-08-28T17:17:39.026+02:00  INFO 36554 --- [       Thread-6] my.project.AkhqService    : Saving AKHQ configuration.
2024-08-28T17:17:39.031+02:00 DEBUG 36554 --- [       Thread-6] io.fabric8.kubernetes.client.Config      : Trying to configure client from Kubernetes config...
2024-08-28T17:17:39.031+02:00 DEBUG 36554 --- [       Thread-6] io.fabric8.kubernetes.client.Config      : Found for Kubernetes config at: [./kubeconfig].
2024-08-28T17:17:39.032+02:00 DEBUG 36554 --- [       Thread-6] i.f.k.client.utils.HttpClientUtils       : Using httpclient io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory factory
2024-08-28T17:17:39.065+02:00 DEBUG 36554 --- [       Thread-6] i.f.kubernetes.client.impl.BaseClient    : The client and associated httpclient io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl have been closed, the usage of this or any client using the httpclient will not work after this
2024-08-28T17:17:39.073+02:00 DEBUG 36554 --- [       Thread-6] io.fabric8.kubernetes.client.Config      : Trying to configure client from Kubernetes config...
2024-08-28T17:17:39.074+02:00 DEBUG 36554 --- [       Thread-6] io.fabric8.kubernetes.client.Config      : Found for Kubernetes config at: [./kubeconfig].
2024-08-28T17:17:39.075+02:00 DEBUG 36554 --- [       Thread-6] i.f.k.client.utils.HttpClientUtils       : Using httpclient io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory factory
OpenJDK 64-Bit Server VM warning: Sharing is only supported for boot loader classes because bootstrap classpath has been appended
2024-08-28T17:17:39.112+02:00 DEBUG 36554 --- [       Thread-6] i.f.kubernetes.client.impl.BaseClient    : The client and associated httpclient io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl have been closed, the usage of this or any client using the httpclient will not work after this
2024-08-28T17:17:39.112+02:00  INFO 36554 --- [       Thread-6] my.project.AkhqService    : Restarting AKHQ service to use the new configuration.
2024-08-28T17:17:39.113+02:00 DEBUG 36554 --- [       Thread-6] io.fabric8.kubernetes.client.Config      : Trying to configure client from Kubernetes config...
2024-08-28T17:17:39.113+02:00 DEBUG 36554 --- [       Thread-6] io.fabric8.kubernetes.client.Config      : Found for Kubernetes config at: [./kubeconfig].
2024-08-28T17:17:39.114+02:00 DEBUG 36554 --- [       Thread-6] i.f.k.client.utils.HttpClientUtils       : Using httpclient io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory factory
2024-08-28T17:17:39.173+02:00 DEBUG 36554 --- [       Thread-6] i.f.kubernetes.client.impl.BaseClient    : The client and associated httpclient io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl have been closed, the usage of this or any client using the httpclient will not work after this
2024-08-28T17:17:39.176+02:00 DEBUG 36554 --- [       Thread-6] i.f.k.c.i.i.DefaultSharedIndexInformer   : Ready to run resync and reflector for kafka.strimzi.io/v1beta2/namespaces/kafka-container-test/kafkausers with resync 3600000
2024-08-28T17:17:39.178+02:00 DEBUG 36554 --- [       Thread-6] i.f.k.c.i.i.DefaultSharedIndexInformer   : Ready to run resync and reflector for kafka.strimzi.io/v1beta2/namespaces/kafka-container-test/kafkatopics with resync 3600000
2024-08-28T17:17:39.179+02:00 DEBUG 36554 --- [       Thread-6] i.f.k.c.i.i.DefaultSharedIndexInformer   : Ready to run resync and reflector for v1/namespaces/kafka-container-test/secrets with resync 3600000
2024-08-28T17:17:39.179+02:00  INFO 36554 --- [       Thread-6] my.project.UserAkHQApplication    : Waiting for Kubernetes events...
2024-08-28T17:17:39.191+02:00 DEBUG 36554 --- [Http Dispatcher] i.f.k.c.informers.impl.cache.Reflector   : Listing items (0) for kafka.strimzi.io/v1beta2/namespaces/kafka-container-test/kafkausers at v18344
2024-08-28T17:17:39.192+02:00 DEBUG 36554 --- [Http Dispatcher] i.f.k.c.informers.impl.cache.Reflector   : Listing items (0) for kafka.strimzi.io/v1beta2/namespaces/kafka-container-test/kafkatopics at v18344
2024-08-28T17:17:39.193+02:00 DEBUG 36554 --- [Http Dispatcher] i.f.k.c.informers.impl.cache.Reflector   : Starting watcher for kafka.strimzi.io/v1beta2/namespaces/kafka-container-test/kafkatopics at v18344
2024-08-28T17:17:39.193+02:00 DEBUG 36554 --- [Http Dispatcher] i.f.k.c.informers.impl.cache.Reflector   : Starting watcher for kafka.strimzi.io/v1beta2/namespaces/kafka-container-test/kafkausers at v18344
2024-08-28T17:17:39.193+02:00 DEBUG 36554 --- [Http Dispatcher] i.f.k.c.d.internal.AbstractWatchManager  : Watching https://localhost:54383/apis/kafka.strimzi.io/v1beta2/namespaces/kafka-container-test/kafkatopics?allowWatchBookmarks=true&resourceVersion=18344&timeoutSeconds=600&watch=true...
2024-08-28T17:17:39.193+02:00 DEBUG 36554 --- [Http Dispatcher] i.f.k.c.d.internal.AbstractWatchManager  : Watching https://localhost:54383/apis/kafka.strimzi.io/v1beta2/namespaces/kafka-container-test/kafkausers?allowWatchBookmarks=true&resourceVersion=18344&timeoutSeconds=600&watch=true...
2024-08-28T17:17:39.196+02:00 DEBUG 36554 --- [Http Dispatcher] i.f.k.c.informers.impl.cache.Reflector   : Listing items (9) for v1/namespaces/kafka-container-test/secrets at v18345
2024-08-28T17:17:39.196+02:00 DEBUG 36554 --- [Http Dispatcher] i.f.k.c.informers.impl.cache.Reflector   : Starting watcher for v1/namespaces/kafka-container-test/secrets at v18345
2024-08-28T17:17:39.196+02:00 DEBUG 36554 --- [Http Dispatcher] i.f.k.c.d.internal.AbstractWatchManager  : Watching https://localhost:54383/api/v1/namespaces/kafka-container-test/secrets?allowWatchBookmarks=true&resourceVersion=18345&timeoutSeconds=600&watch=true...
2024-08-28T17:17:39.208+02:00 DEBUG 36554 --- [lhost:54383/...] i.f.k.c.d.i.WatcherWebSocketListener     : WebSocket successfully opened
2024-08-28T17:17:39.208+02:00 DEBUG 36554 --- [lhost:54383/...] i.f.k.c.d.i.WatcherWebSocketListener     : WebSocket successfully opened
2024-08-28T17:17:39.208+02:00 DEBUG 36554 --- [lhost:54383/...] i.f.k.c.d.i.WatcherWebSocketListener     : WebSocket successfully opened
2024-08-28T17:17:39.209+02:00 DEBUG 36554 --- [lhost:54383/...] i.f.k.c.informers.impl.cache.Reflector   : Watch started for kafka.strimzi.io/v1beta2/namespaces/kafka-container-test/kafkatopics
2024-08-28T17:17:39.209+02:00 DEBUG 36554 --- [lhost:54383/...] i.f.k.c.informers.impl.cache.Reflector   : Watch started for v1/namespaces/kafka-container-test/secrets
2024-08-28T17:17:39.209+02:00 DEBUG 36554 --- [lhost:54383/...] i.f.k.c.informers.impl.cache.Reflector   : Watch started for kafka.strimzi.io/v1beta2/namespaces/kafka-container-test/kafkausers
2024-08-28T17:17:39.246+02:00 DEBUG 36554 --- [           main] i.f.kubernetes.client.CustomResource     : Calling CustomResource#setApiVersion doesn't do anything because the API version is computed and shouldn't be changed
2024-08-28T17:17:39.246+02:00 DEBUG 36554 --- [           main] i.f.kubernetes.client.CustomResource     : Calling CustomResource#setKind doesn't do anything because the Kind is computed and shouldn't be changed
2024-08-28T17:17:39.250+02:00 DEBUG 36554 --- [           main] i.f.kubernetes.client.CustomResource     : Calling CustomResource#setApiVersion doesn't do anything because the API version is computed and shouldn't be changed
2024-08-28T17:17:39.264+02:00 DEBUG 36554 --- [Http Dispatcher] i.f.kubernetes.client.CustomResource     : Calling CustomResource#setApiVersion doesn't do anything because the API version is computed and shouldn't be changed
2024-08-28T17:17:39.264+02:00 DEBUG 36554 --- [Http Dispatcher] i.f.kubernetes.client.CustomResource     : Calling CustomResource#setKind doesn't do anything because the Kind is computed and shouldn't be changed
2024-08-28T17:17:39.272+02:00 DEBUG 36554 --- [lhost:54383/...] i.f.kubernetes.client.CustomResource     : Calling CustomResource#setApiVersion doesn't do anything because the API version is computed and shouldn't be changed
2024-08-28T17:17:39.272+02:00 DEBUG 36554 --- [lhost:54383/...] i.f.kubernetes.client.CustomResource     : Calling CustomResource#setKind doesn't do anything because the Kind is computed and shouldn't be changed
2024-08-28T17:17:39.273+02:00 DEBUG 36554 --- [pool-5-thread-1] i.f.k.c.informers.impl.cache.Reflector   : Event received ADDED KafkaUser resourceVersion v18365 for kafka.strimzi.io/v1beta2/namespaces/kafka-container-test/kafkausers
2024-08-28T17:17:39.274+02:00  INFO 36554 --- [pool-5-thread-2] c.m.e.c.w.AbstractResourceEventHandler   : K8s event received: 'ADDED' for KafkaUser: 'test-kafka-user'
2024-08-28T17:17:39.274+02:00 DEBUG 36554 --- [pool-5-thread-2] io.fabric8.kubernetes.client.Config      : Trying to configure client from Kubernetes config...
2024-08-28T17:17:39.274+02:00 DEBUG 36554 --- [pool-5-thread-2] io.fabric8.kubernetes.client.Config      : Found for Kubernetes config at: [./kubeconfig].
2024-08-28T17:17:39.275+02:00 DEBUG 36554 --- [pool-5-thread-2] i.f.k.client.utils.HttpClientUtils       : Using httpclient io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory factory
2024-08-28T17:17:39.299+02:00 DEBUG 36554 --- [pool-5-thread-2] i.f.k.c.i.i.DefaultSharedIndexInformer   : Ready to run resync and reflector for v1/namespaces/kafka-container-test/secrets with resync 0
2024-08-28T17:17:39.299+02:00 DEBUG 36554 --- [pool-5-thread-2] i.f.k.c.i.i.DefaultSharedIndexInformer   : Resync skipped due to 0 full resync period for v1/namespaces/kafka-container-test/secrets
2024-08-28T17:17:39.309+02:00 DEBUG 36554 --- [Http Dispatcher] i.f.k.c.informers.impl.cache.Reflector   : Listing items (0) for v1/namespaces/kafka-container-test/secrets at v18365
2024-08-28T17:17:39.310+02:00 DEBUG 36554 --- [Http Dispatcher] i.f.k.c.informers.impl.cache.Reflector   : Starting watcher for v1/namespaces/kafka-container-test/secrets at v18365
2024-08-28T17:17:39.310+02:00 DEBUG 36554 --- [Http Dispatcher] i.f.k.c.d.internal.AbstractWatchManager  : Watching https://localhost:54383/api/v1/namespaces/kafka-container-test/secrets?allowWatchBookmarks=true&fieldSelector=metadata.name%3Dtest-kafka-user&resourceVersion=18365&timeoutSeconds=600&watch=true...
2024-08-28T17:17:39.314+02:00 DEBUG 36554 --- [lhost:54383/...] i.f.k.c.d.i.WatcherWebSocketListener     : WebSocket successfully opened
2024-08-28T17:17:39.315+02:00 DEBUG 36554 --- [lhost:54383/...] i.f.k.c.informers.impl.cache.Reflector   : Watch started for v1/namespaces/kafka-container-test/secrets
2024-08-28T17:17:39.333+02:00 DEBUG 36554 --- [pool-5-thread-1] i.f.k.c.informers.impl.cache.Reflector   : Event received ADDED Secret resourceVersion v18366 for v1/namespaces/kafka-container-test/secrets
2024-08-28T17:17:39.333+02:00 DEBUG 36554 --- [ool-13-thread-1] i.f.k.c.informers.impl.cache.Reflector   : Event received ADDED Secret resourceVersion v18366 for v1/namespaces/kafka-container-test/secrets
2024-08-28T17:17:39.333+02:00 DEBUG 36554 --- [pool-5-thread-2] i.f.kubernetes.client.impl.BaseClient    : The client and associated httpclient io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl have been closed, the usage of this or any client using the httpclient will not work after this
2024-08-28T17:17:39.334+02:00 DEBUG 36554 --- [ool-13-thread-1] i.f.k.c.informers.impl.cache.Reflector   : Stopping watcher for v1/namespaces/kafka-container-test/secrets at v18365
2024-08-28T17:17:39.334+02:00 DEBUG 36554 --- [ool-13-thread-1] i.f.k.c.d.internal.AbstractWatchManager  : Force closing the watch
2024-08-28T17:17:39.334+02:00 DEBUG 36554 --- [ool-13-thread-1] i.f.k.c.d.internal.AbstractWatchManager  : Closing the current watch
2024-08-28T17:17:39.334+02:00 DEBUG 36554 --- [lhost:54383/...] i.f.k.c.d.internal.AbstractWatchManager  : Watch connection error, will reconnect if possible

java.net.SocketException: Socket closed
	at java.base/sun.nio.ch.NioSocketImpl.endRead(NioSocketImpl.java:245) ~[na:na]
	at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:324) ~[na:na]
	at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:347) ~[na:na]
	at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:800) ~[na:na]
	at java.base/java.net.Socket$SocketInputStream.read(Socket.java:966) ~[na:na]
	at java.base/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:478) ~[na:na]
	at java.base/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:472) ~[na:na]
	at java.base/sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:70) ~[na:na]
	at java.base/sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1460) ~[na:na]
	at java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:1064) ~[na:na]
	at okio.InputStreamSource.read(JvmOkio.kt:93) ~[okio-jvm-3.6.0.jar:na]
	at okio.AsyncTimeout$source$1.read(AsyncTimeout.kt:128) ~[okio-jvm-3.6.0.jar:na]
	at okio.RealBufferedSource.request(RealBufferedSource.kt:209) ~[okio-jvm-3.6.0.jar:na]
	at okio.RealBufferedSource.require(RealBufferedSource.kt:202) ~[okio-jvm-3.6.0.jar:na]
	at okio.RealBufferedSource.readByte(RealBufferedSource.kt:212) ~[okio-jvm-3.6.0.jar:na]
	at okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.kt:119) ~[okhttp-4.12.0.jar:na]
	at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.kt:102) ~[okhttp-4.12.0.jar:na]
	at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.kt:293) ~[okhttp-4.12.0.jar:na]
	at okhttp3.internal.ws.RealWebSocket$connect$1.onResponse(RealWebSocket.kt:195) ~[okhttp-4.12.0.jar:na]
	at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:519) ~[okhttp-4.12.0.jar:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

2024-08-28T17:17:39.336+02:00 DEBUG 36554 --- [lhost:54383/...] i.f.k.c.d.internal.AbstractWatchManager  : Ignoring already closed/closing connection
2024-08-28T17:17:39.336+02:00  INFO 36554 --- [pool-5-thread-2] c.m.e.service.AkhqConfigMapGenerator     : Adding AKHQ security config for user: test-kafka-user
2024-08-28T17:17:39.337+02:00  INFO 36554 --- [pool-5-thread-2] c.m.e.service.AkhqConfigMapGenerator     : Setting AKHQ security config for topic: test-topic.group and Kafka User test-kafka-user
2024-08-28T17:17:39.337+02:00  INFO 36554 --- [pool-5-thread-2] c.m.e.service.AkhqConfigMapGenerator     : Setting AKHQ security config for topic: test-topic.group and Kafka User test-kafka-user
2024-08-28T17:17:39.337+02:00  INFO 36554 --- [pool-5-thread-2] c.m.e.service.AkhqConfigMapGenerator     : Setting AKHQ security config for topic: test-topic-read-only-1 and Kafka User test-kafka-user
2024-08-28T17:17:39.337+02:00  INFO 36554 --- [pool-5-thread-2] c.m.e.service.AkhqConfigMapGenerator     : Setting AKHQ security config for topic: test-topic-read-only-2 and Kafka User test-kafka-user
2024-08-28T17:17:39.337+02:00  INFO 36554 --- [pool-5-thread-2] c.m.e.service.AkhqConfigMapGenerator     : Setting AKHQ security config for topic: test-topic-2 and Kafka User test-kafka-user
2024-08-28T17:17:39.337+02:00  INFO 36554 --- [pool-5-thread-2] my.project.AkhqService    : adding OIDC users to AKHQ config as SSO Users from KafkaUser: [TESTUSER]
2024-08-28T17:17:39.338+02:00  INFO 36554 --- [pool-5-thread-2] my.project.AkhqService    : Saving AKHQ configuration.
2024-08-28T17:17:39.340+02:00 DEBUG 36554 --- [pool-5-thread-2] io.fabric8.kubernetes.client.Config      : Trying to configure client from Kubernetes config...
2024-08-28T17:17:39.340+02:00 DEBUG 36554 --- [pool-5-thread-2] io.fabric8.kubernetes.client.Config      : Found for Kubernetes config at: [./kubeconfig].
2024-08-28T17:17:39.341+02:00 DEBUG 36554 --- [pool-5-thread-2] i.f.k.client.utils.HttpClientUtils       : Using httpclient io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory factory
2024-08-28T17:17:39.372+02:00 DEBUG 36554 --- [pool-5-thread-2] i.f.kubernetes.client.impl.BaseClient    : The client and associated httpclient io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl have been closed, the usage of this or any client using the httpclient will not work after this
2024-08-28T17:17:39.377+02:00 DEBUG 36554 --- [pool-5-thread-2] io.fabric8.kubernetes.client.Config      : Trying to configure client from Kubernetes config...
2024-08-28T17:17:39.377+02:00 DEBUG 36554 --- [pool-5-thread-2] io.fabric8.kubernetes.client.Config      : Found for Kubernetes config at: [./kubeconfig].
2024-08-28T17:17:39.378+02:00 DEBUG 36554 --- [pool-5-thread-2] i.f.k.client.utils.HttpClientUtils       : Using httpclient io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory factory

java.util.concurrent.RejectedExecutionException: Task io.fabric8.kubernetes.client.utils.internal.SerialExecutor$$Lambda$1272/0x00000008014837d0@2fd89ee4 rejected from java.util.concurrent.ThreadPoolExecutor@cd3897f[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]

	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2070)
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
	at io.fabric8.kubernetes.client.utils.internal.SerialExecutor.scheduleNext(SerialExecutor.java:75)
	at io.fabric8.kubernetes.client.utils.internal.SerialExecutor.lambda$execute$0(SerialExecutor.java:65)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)


@shawkins
Contributor

shawkins commented Aug 28, 2024

I think only the last part of the output is relevant: we can see that, for some reason, close() is called in io.fabric8.kubernetes.client.impl.BaseClient.

Fabric8 won't proactively close the client; that has to come from something using the client. We also see one client being closed while another one is being created - is that expected? The thread for test execution seems to be "Thread-6", while the close / creation is happening on "pool-5-thread-2", so it does look like that could be happening asynchronously from the main test logic. You could look at when / why AkhqConfigMapGenerator is being called to figure out what is closing the client, since that is happening on the same thread.
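One way to follow this suggestion and find out which code path calls close() is to wrap every client handed out by the factory in a tracing proxy. The sketch below is JDK-only and hypothetical (`CloseTracer` is not part of fabric8): it uses `java.lang.reflect.Proxy` to forward every call unchanged and, for `close()`, records the calling thread's name and prints a stack trace of the call site.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical debugging helper: records which thread calls close() on a wrapped object. */
final class CloseTracer {

  /** Names of the threads that called close() through a traced proxy, in call order. */
  static final List<String> CLOSING_THREADS = new CopyOnWriteArrayList<>();

  private CloseTracer() {}

  /**
   * Wraps target in a dynamic proxy implementing iface. Every call is forwarded unchanged;
   * close() is additionally logged with the calling thread and a stack trace of the call site.
   */
  @SuppressWarnings("unchecked")
  static <T extends AutoCloseable> T trace(Class<T> iface, T target) {
    // JDK interfaces report a null (bootstrap) loader; fall back to this class's loader then.
    ClassLoader loader =
        iface.getClassLoader() != null ? iface.getClassLoader() : CloseTracer.class.getClassLoader();
    return (T) Proxy.newProxyInstance(
        loader,
        new Class<?>[] {iface},
        (proxy, method, args) -> {
          if (method.getName().equals("close") && method.getParameterCount() == 0) {
            String thread = Thread.currentThread().getName();
            CLOSING_THREADS.add(thread);
            new Throwable("close() called on thread " + thread).printStackTrace();
          }
          try {
            return method.invoke(target, args);
          } catch (InvocationTargetException e) {
            throw e.getCause(); // surface the target's own exception, not the reflective wrapper
          }
        });
  }
}
```

Assuming `KubernetesClient` can be used as an `AutoCloseable` (the try-with-resources usage later in this thread relies on that), a factory method like `getKubernetesClient()` could return `CloseTracer.trace(KubernetesClient.class, new KubernetesClientBuilder().build())`. Only close() calls made through the proxy are captured; code that reaches the underlying client some other way would bypass it.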

The Socket closed exception happens quite often but doesn't seem to do any harm - other tests where this happens do not fail.

Watches are tolerant of various exceptions; since this one is logged at debug level, it is expected and handled internally.

@haraldott
Author

haraldott commented Sep 17, 2024

Hi,
I finally had time to look into this issue again.

We also see one client being closed, while another one is being created - is that expected?

There are multiple things happening in parallel here. As soon as a KafkaUser resource is updated, our ResourceEventHandler gets triggered, which spawns a new thread. The program logic then does a couple of things, also using the client. The client is always closed correctly after use everywhere in the code, and it is never used after it has been closed. I always use it inside a try-with-resources block, for example:

    try (final KubernetesClient kc = this.kubernetesClientFactory.getKubernetesClient()) {
      log.info("Using kubernetes client in ConfigmapRepository replace");
      kc.configMaps().resource(configMap).update();
    }

So there's no way I'm accidentally using the client after it has been closed.

The thread for test execution seems to be "Thread-6" while the close / creation is happening in "pool-5-thread-2" so it does seem like that could be happening asynchronously from the main test logic

That's coming from the ResourceEventHandler, as described above. The main thread runs the application itself, while additional threads are spawned every time a ResourceEventHandler is triggered by the creation or update of a watched resource.

It seems like you could look at when / why AkhqConfigMapGenerator is being called to figure out what is closing the client as that is happening on the same thread.

That's not the case. AkhqConfigMapGenerator does not use the client.

Like I said, the tests themselves are not doing anything spectacular either:

this.loadKafkaUserYaml("/kafka-user-yamls/kafka-user-with-multiple-sso-users.yaml");
this.assertBasicAuthUserCreated(TEST_KAFKA_USER);

// some more assertions

this.loadKafkaUserYaml("/kafka-user-yamls/kafka-user-with-multiple-sso-users3.yaml");
this.assertBasicAuthUserCreated(NEW_KAFKA_USER);

// some more assertions

  private void assertBasicAuthUserCreated(final String username) {
    waitUntilAsserted(() -> {
      final Set<Auth> basicAuthUsers = this.getBasicAuthUsers();
      final boolean userCreated = basicAuthUsers.stream()
          .anyMatch(user -> username.equals(user.getUsername()));
      assertTrue(userCreated);
    });
  }

  private LinkedHashSet<OIDCUser> getUsersFromConfigMap() throws JsonProcessingException {
    return this.getRootAkhqConfigYaml().getYamlSectionAkhq().getSecurityConfigYaml().getOidc().getProviders().get("sso").getUsers();
  }

  private RootAkhqConfigYaml getRootAkhqConfigYaml() throws JsonProcessingException {
    final String akhqConfigMap = this.client.configMaps().inNamespace(this.AKHQ_NAMESPACE).withName(CONFIGMAP_NAME)
        .get().getData().get(AHQL_CONFIG_FILENAME);
    final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
    return mapper.readValue(akhqConfigMap, RootAkhqConfigYaml.class);
  }
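`waitUntilAsserted` itself is not shown in the snippet. A minimal plain-Java polling helper with that behaviour might look like the following sketch; the class name `Polling`, the extra timeout and poll-interval parameters, and the `ThrowingRunnable` interface are assumptions, not the project's actual code.

```java
import java.time.Duration;

/** Hypothetical polling helper; not the project's actual waitUntilAsserted implementation. */
final class Polling {

  /** Like Runnable, but allowed to throw checked exceptions (e.g. JsonProcessingException). */
  @FunctionalInterface
  interface ThrowingRunnable {
    void run() throws Exception;
  }

  private Polling() {}

  /**
   * Re-runs assertion until it completes without throwing or timeout elapses.
   * Once time is up, the last failure is rethrown wrapped in an AssertionError.
   */
  static void waitUntilAsserted(ThrowingRunnable assertion, Duration timeout, Duration pollInterval)
      throws InterruptedException {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (true) {
      try {
        assertion.run();
        return; // assertion passed
      } catch (InterruptedException e) {
        throw e; // do not swallow interruption
      } catch (AssertionError | Exception e) {
        if (System.nanoTime() - deadline >= 0) {
          throw new AssertionError("Condition not met within " + timeout, e);
        }
      }
      Thread.sleep(pollInterval.toMillis());
    }
  }
}
```

Libraries such as Awaitility provide the same pattern (`await().untilAsserted(...)`) if a hand-written loop is not wanted.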

@shawkins
Contributor

So there's no way I'm accidentally using the client after it has been closed.

Perhaps not directly, but references to the client taken inside the try block can certainly live on after it has been closed.
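To illustrate how a reference can outlive a try-with-resources block, here is a JDK-only sketch. `PooledResource` is a hypothetical stand-in for a client that owns a thread pool and shuts it down in `close()`; it is not fabric8 code. A lambda created inside the try block captures the resource and is run after the block has exited, at which point the pool rejects the task.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

/** Hypothetical stand-in for a client that owns a thread pool and shuts it down on close(). */
final class PooledResource implements AutoCloseable {
  private final ExecutorService pool = Executors.newSingleThreadExecutor();

  void submit(Runnable task) {
    pool.execute(task); // throws RejectedExecutionException once the pool is shut down
  }

  @Override
  public void close() {
    pool.shutdown();
  }
}

final class EscapingReferenceDemo {
  /** Returns true if using the escaped reference after the try block was rejected. */
  static boolean escapedUseIsRejected() {
    Runnable callback;
    try (PooledResource resource = new PooledResource()) {
      // The lambda captures `resource`, so the reference outlives the try block.
      callback = () -> resource.submit(() -> {});
    }
    try {
      callback.run(); // resource.close() has already run at this point
      return false;
    } catch (RejectedExecutionException expected) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(escapedUseIsRejected()); // true
  }
}
```

The try block itself never touches the closed resource; the problem only shows up later, on whichever thread eventually runs the captured callback.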

That's not the case. AkhqConfigMapGenerator does not use the client.

I was talking about the thread - whatever thread is exercising AkhqConfigMapGenerator seems to be the same thread that is closing / opening clients.

Just to make sure, what thread is the rejected execution exception happening on in the log you are showing?

Like I said, also the tests are not doing anything spectacular:

The fabric8 code is pretty straightforward as well. The main executor doesn't shut itself down; that only happens when the client is closed. We know that is likely what is happening here because the rejected execution exception you are seeing does not originate from a serial executor; it comes from an actual thread pool executor.
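The sequence described here can be reproduced with plain JDK classes. `SimpleSerialExecutor` below follows the serial-executor example from the `java.util.concurrent.Executor` Javadoc and is only an approximation of fabric8's internal `SerialExecutor`; `ShutdownRaceDemo` is hypothetical. A first task runs on a one-thread pool while a second waits in the serial queue. The pool is shut down mid-task (standing in for the client being closed), and when the first task finishes, `scheduleNext()` hands the second task to the pool, which rejects it on the worker thread, much like the `SerialExecutor.scheduleNext` / `ThreadPoolExecutor.execute` frames in the stack trace above.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Serial executor modeled on the example in the java.util.concurrent.Executor Javadoc. */
final class SimpleSerialExecutor implements Executor {
  private final Queue<Runnable> tasks = new ArrayDeque<>();
  private final Executor delegate;
  private Runnable active;

  SimpleSerialExecutor(Executor delegate) {
    this.delegate = delegate;
  }

  @Override
  public synchronized void execute(Runnable r) {
    tasks.add(() -> {
      try {
        r.run();
      } finally {
        scheduleNext(); // hands the next queued task to the delegate pool
      }
    });
    if (active == null) {
      scheduleNext();
    }
  }

  private synchronized void scheduleNext() {
    if ((active = tasks.poll()) != null) {
      delegate.execute(active); // rejected if the delegate pool has been shut down
    }
  }
}

final class ShutdownRaceDemo {
  /** Returns the exception that escaped the pool's worker thread, or null if none did. */
  static Throwable run() throws InterruptedException {
    AtomicReference<Throwable> escaped = new AtomicReference<>();
    CountDownLatch workerDied = new CountDownLatch(1);
    ExecutorService pool = Executors.newFixedThreadPool(1, runnable -> {
      Thread t = new Thread(runnable);
      t.setUncaughtExceptionHandler((thread, e) -> {
        escaped.set(e);
        workerDied.countDown();
      });
      return t;
    });
    SimpleSerialExecutor serial = new SimpleSerialExecutor(pool);

    CountDownLatch firstTaskStarted = new CountDownLatch(1);
    CountDownLatch releaseFirstTask = new CountDownLatch(1);
    serial.execute(() -> {
      firstTaskStarted.countDown();
      try {
        releaseFirstTask.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    serial.execute(() -> {}); // queued inside the serial executor, not yet in the pool

    firstTaskStarted.await();
    pool.shutdown();              // stands in for the owning client being closed mid-task
    releaseFirstTask.countDown(); // first task finishes, then scheduleNext() runs

    workerDied.await(5, TimeUnit.SECONDS);
    return escaped.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(run() instanceof RejectedExecutionException); // true
  }
}
```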

Would it be easy for you to provide a reproducer?

Projects
None yet
Development

No branches or pull requests

4 participants