diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index 9a4166d3d1..c075a45806 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2023 the original author or authors. + * Copyright 2015-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -657,9 +657,14 @@ public T executeInTransaction(OperationsCallback callback) { catch (SkipAbortException e) { // NOSONAR - exception flow control throw ((RuntimeException) e.getCause()); // NOSONAR - lost stack trace } - catch (Exception e) { - producer.abortTransaction(); - throw e; + catch (Exception ex) { + try { + producer.abortTransaction(); + } + catch (Exception abortException) { + ex.addSuppressed(abortException); + } + throw ex; } finally { this.producers.remove(currentThread); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java index b28fae476d..7155145826 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2023 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -506,6 +506,31 @@ public void testAbort() { verify(producer, never()).commitTransaction(); } + @Test + public void abortFiledOriginalExceptionRethrown() { + MockProducer producer = spy(new MockProducer<>()); + producer.initTransactions(); + producer.abortTransactionException = new RuntimeException("abort failed"); + + ProducerFactory pf = new MockProducerFactory<>((tx, id) -> producer, null); + + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setDefaultTopic(STRING_KEY_TOPIC); + + assertThatExceptionOfType(RuntimeException.class) + .isThrownBy(() -> + template.executeInTransaction(t -> { + throw new RuntimeException("intentional"); + })) + .withMessage("intentional") + .withStackTraceContaining("abort failed"); + + assertThat(producer.transactionCommitted()).isFalse(); + assertThat(producer.transactionAborted()).isFalse(); + assertThat(producer.closed()).isTrue(); + verify(producer, never()).commitTransaction(); + } + @Test public void testExecuteInTransactionNewInnerTx() { @SuppressWarnings("unchecked")