Skip to content

Commit 23b809a

Browse files
authored
KAFKA-19301 Move and rewrite partition state classes to Java in org.apache.kafka.storage.internals.log (#20083)
move following interface/class to `org.apache.kafka.storage.internals.log` and rewrtie to java `PartitionListener‎` `AlterPartitionListener` `AssignmentState` `OngoingReassignmentState` `SimpleAssignmentState` `PartitionState` `PendingPartitionChange` `PendingExpandIsr` `PendingShrinkIsr` `CommittedPartitionState` Reviewers: TengYao Chi <[email protected]>, TaiJuWu <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 6b96273 commit 23b809a

File tree

25 files changed

+632
-300
lines changed

25 files changed

+632
-300
lines changed

core/src/main/java/kafka/server/share/SharePartitionManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*/
1717
package kafka.server.share;
1818

19-
import kafka.cluster.PartitionListener;
2019
import kafka.server.ReplicaManager;
2120

2221
import org.apache.kafka.common.TopicIdPartition;
@@ -59,6 +58,7 @@
5958
import org.apache.kafka.server.util.timer.SystemTimerReaper;
6059
import org.apache.kafka.server.util.timer.Timer;
6160
import org.apache.kafka.server.util.timer.TimerTask;
61+
import org.apache.kafka.storage.internals.log.PartitionListener;
6262
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
6363

6464
import org.slf4j.Logger;

core/src/main/scala/kafka/cluster/Partition.scala

Lines changed: 34 additions & 180 deletions
Large diffs are not rendered by default.

core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*/
1717
package kafka.coordinator.group
1818

19-
import kafka.cluster.PartitionListener
2019
import kafka.server.ReplicaManager
2120
import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
2221
import org.apache.kafka.common.protocol.Errors
@@ -25,7 +24,7 @@ import org.apache.kafka.coordinator.common.runtime.PartitionWriter
2524
import org.apache.kafka.server.ActionQueue
2625
import org.apache.kafka.server.common.RequestLocal
2726
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
28-
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard}
27+
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, PartitionListener, VerificationGuard}
2928

3029
import java.util.concurrent.CompletableFuture
3130
import scala.collection.Map

core/src/main/scala/kafka/server/ReplicaManager.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package kafka.server
1818

1919
import com.yammer.metrics.core.Meter
20-
import kafka.cluster.{Partition, PartitionListener}
20+
import kafka.cluster.{Partition}
2121
import kafka.log.LogManager
2222
import kafka.server.HostedPartition.Online
2323
import kafka.server.QuotaFactory.QuotaManagers
@@ -65,7 +65,7 @@ import org.apache.kafka.server.util.timer.{SystemTimer, TimerTask}
6565
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
6666
import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
6767
import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
68-
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchPartitionStatus, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogReadResult, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
68+
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchPartitionStatus, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogReadResult, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard,PartitionListener}
6969
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
7070

7171
import java.io.File

core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.apache.kafka.server.common.MetadataVersion
2727
import org.apache.kafka.server.config.ReplicationConfigs
2828
import org.apache.kafka.server.util.MockTime
2929
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
30-
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
30+
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig,AlterPartitionListener}
3131
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
3232
import org.junit.jupiter.api.{AfterEach, BeforeEach}
3333
import org.mockito.ArgumentMatchers

core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@
1616
*/
1717
package kafka.cluster
1818

19+
import org.apache.kafka.storage.internals.log.SimpleAssignmentState
1920
import org.apache.kafka.common.DirectoryId
2021
import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
2122
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
2223
import org.junit.jupiter.params.ParameterizedTest
2324
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
2425

2526
import java.util
26-
import scala.jdk.CollectionConverters._
2727

2828
object AssignmentStateTest {
2929
import AbstractPartitionTest._
@@ -88,7 +88,7 @@ class AssignmentStateTest extends AbstractPartitionTest {
8888
@MethodSource(Array("parameters"))
8989
def testPartitionAssignmentStatus(isr: Array[Int], replicas: Array[Int],
9090
adding: Array[Int], removing: Array[Int],
91-
original: util.List[Int], isUnderReplicated: Boolean): Unit = {
91+
original: util.List[Integer], isUnderReplicated: Boolean): Unit = {
9292
val partitionRegistration = new PartitionRegistration.Builder()
9393
.setLeader(brokerId)
9494
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
@@ -103,7 +103,7 @@ class AssignmentStateTest extends AbstractPartitionTest {
103103

104104
// set the original replicas as the URP calculation will need them
105105
if (!original.isEmpty)
106-
partition.assignmentState = SimpleAssignmentState(original.asScala)
106+
partition.assignmentState = new SimpleAssignmentState(original)
107107
// do the test
108108
partition.makeLeader(partitionRegistration, isNew = false, offsetCheckpoints, None)
109109
val isReassigning = !adding.isEmpty || !removing.isEmpty

core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
3737
import org.apache.kafka.server.util.MockTime
3838
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
3939
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
40-
import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetsListener, LogSegments, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
40+
import org.apache.kafka.storage.internals.log.{AlterPartitionListener, AppendOrigin, CleanerConfig, CommittedPartitionState, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetsListener, LogSegments, PendingShrinkIsr, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
4141
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
4242
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
4343
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}

0 commit comments

Comments
 (0)