Skip to content

MINOR: testDelayedTxnOffsetCommitWithBumpedEpochIsRejected #20024

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: trunk
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
*/
package kafka.server

import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import kafka.utils.TestUtils
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.message.OffsetFetchRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.JoinGroupRequest
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import org.apache.kafka.common.utils.ProducerIdAndEpoch
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
Expand Down Expand Up @@ -51,6 +51,16 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) extends GroupCoordinat
testTxnOffsetCommit(false)
}

@ClusterTest
def testDelayedTxnOffsetCommitWithBumpedEpochIsRejectedWithOldConsumerGroupProtocol(): Unit = {
testDelayedTxnOffsetCommitWithBumpedEpochIsRejected(true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the naming, it should pass false instead of true, shouldn't it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

}

@ClusterTest
def testDelayedTxnOffsetCommitWithBumpedEpochIsRejectedWithNewConsumerGroupProtocol(): Unit = {
testDelayedTxnOffsetCommitWithBumpedEpochIsRejected(false)
}

private def testTxnOffsetCommit(useNewProtocol: Boolean): Unit = {
val topic = "topic"
val partition = 0
Expand Down Expand Up @@ -234,4 +244,91 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) extends GroupCoordinat
val partitionRecord = topicRecord.partitions.asScala.find(_.partitionIndex == partition).head
partitionRecord.committedOffset
}

private def testDelayedTxnOffsetCommitWithBumpedEpochIsRejected(useNewProtocol: Boolean): Unit = {
val topic = "topic"
val partition = 0
val transactionalId = "txn"
val groupId = "group"
val offset = 100L

// Creates the __consumer_offsets and __transaction_state topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
createOffsetsTopic()
createTransactionStateTopic()

// Join the consumer group. Note that we don't heartbeat here so we must use
// a session long enough for the duration of the test.
val (memberId: String, memberEpoch: Int) = joinConsumerGroup(groupId, useNewProtocol)
assertTrue(memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    assertNotEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, memberId)
    assertNotEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, memberEpoch)

assertNotEquals could show more readable messages.

assertTrue(memberEpoch != JoinGroupRequest.UNKNOWN_GENERATION_ID)

createTopic(topic, 1)

for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) {
val useTV2 = version >= 5
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about using EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 instead of magic number 5?


// Initialize producer. Wait until the coordinator finishes loading.
var producerIdAndEpoch: ProducerIdAndEpoch = null
TestUtils.waitUntilTrue(() =>
try {
producerIdAndEpoch = initProducerId(
transactionalId = transactionalId,
producerIdAndEpoch = ProducerIdAndEpoch.NONE,
expectedError = Errors.NONE
)
true
} catch {
case _: Throwable => false
}, "initProducerId request failed"
)

addOffsetsToTxn(
groupId = groupId,
producerId = producerIdAndEpoch.producerId,
producerEpoch = producerIdAndEpoch.epoch,
transactionalId = transactionalId
)

// Complete the transaction.
endTxn(
producerId = producerIdAndEpoch.producerId,
producerEpoch = producerIdAndEpoch.epoch,
transactionalId = transactionalId,
isTransactionV2Enabled = useTV2,
committed = true,
expectedError = Errors.NONE
)

// Start a new transaction. Wait for the previous transaction to complete.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we complete this new transaction? otherwise, the next version will have a ongoing transaction.

TestUtils.waitUntilTrue(() =>
try {
addOffsetsToTxn(
groupId = groupId,
producerId = producerIdAndEpoch.producerId,
producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort else producerIdAndEpoch.epoch,
transactionalId = transactionalId
)
true
} catch {
case _: Throwable => false
}, "addOffsetsToTxn request failed"
)

// Committing offset with old epoch succeeds for TV1 and fails for TV2.
commitTxnOffset(
groupId = groupId,
memberId = if (version >= 3) memberId else JoinGroupRequest.UNKNOWN_MEMBER_ID,
generationId = if (version >= 3) 1 else JoinGroupRequest.UNKNOWN_GENERATION_ID,
producerId = producerIdAndEpoch.producerId,
producerEpoch = producerIdAndEpoch.epoch,
transactionalId = transactionalId,
topic = topic,
partition = partition,
offset = offset,
expectedError = if (useTV2) Errors.INVALID_PRODUCER_EPOCH else Errors.NONE,
version = version.toShort
)
}
}
}