diff --git a/.github/workflows/pr-test.yml b/.github/workflows/pr-impl-test.yml similarity index 82% rename from .github/workflows/pr-test.yml rename to .github/workflows/pr-impl-test.yml index 0b09553752..95981e8280 100644 --- a/.github/workflows/pr-test.yml +++ b/.github/workflows/pr-impl-test.yml @@ -1,4 +1,4 @@ -name: kop mvn build check and ut +name: kop mvn build check and kafka-impl test on: pull_request: @@ -33,11 +33,8 @@ jobs: - name: Spotbugs check run: mvn spotbugs:check - - name: KafkaIntegrationTest - run: mvn test '-Dtest=KafkaIntegration*Test' -pl tests - - - name: test after build - run: mvn test -DfailIfNoTests=false '-Dtest=!KafkaIntegration*Test,!DistributedClusterTest' + - name: kafka-impl test after build + run: mvn test -DfailIfNoTests=false -pl kafka-impl # The DistributedClusterTest is hard to pass in CI tests environment, we disable it first # the track issue: https://github.com/streamnative/kop/issues/184 diff --git a/.github/workflows/pr-integration-tests.yml b/.github/workflows/pr-integration-tests.yml new file mode 100644 index 0000000000..6d9c9728aa --- /dev/null +++ b/.github/workflows/pr-integration-tests.yml @@ -0,0 +1,43 @@ +name: kop integration tests + +on: + pull_request: + branches: + - master + push: + branches: + - master + - branch-* + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v1 + - name: Set up JDK 1.8 + uses: actions/setup-java@v1 + with: + java-version: 1.8 + + - name: Build with Maven skipTests + run: mvn clean install -DskipTests + + - name: kop integration test + run: mvn test '-Dtest=KafkaIntegration*Test' -pl tests + + - name: package surefire artifacts + if: failure() + run: | + rm -rf artifacts + mkdir artifacts + find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \; + zip -r artifacts.zip artifacts + + - uses: actions/upload-artifact@master + name: upload surefire-artifacts + if: failure() + with: + name: surefire-artifacts + path: artifacts.zip diff --git a/.github/workflows/pr-tests.yml b/.github/workflows/pr-tests.yml new file mode 100644 index 0000000000..1ad98e03e0 --- /dev/null +++ b/.github/workflows/pr-tests.yml @@ -0,0 +1,43 @@ +name: kop tests + +on: + pull_request: + branches: + - master + push: + branches: + - master + - branch-* + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v1 + - name: Set up JDK 1.8 + uses: actions/setup-java@v1 + with: + java-version: 1.8 + + - name: Build with Maven skipTests + run: mvn clean install -DskipTests + + - name: tests module + run: mvn test -DfailIfNoTests=false '-Dtest=!KafkaIntegration*Test,!DistributedClusterTest' -pl tests + + - name: package surefire artifacts + if: failure() + run: | + rm -rf artifacts + mkdir artifacts + find . 
-type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \; + zip -r artifacts.zip artifacts + + - uses: actions/upload-artifact@master + name: upload surefire-artifacts + if: failure() + with: + name: surefire-artifacts + path: artifacts.zip diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java index 9f443242a5..b4f6a49797 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java @@ -21,6 +21,7 @@ import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.ssl.SslHandler; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator; +import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator; import io.streamnative.pulsar.handlers.kop.utils.ssl.SSLUtils; import lombok.Getter; import org.apache.pulsar.broker.PulsarService; @@ -40,6 +41,8 @@ public class KafkaChannelInitializer extends ChannelInitializer { @Getter private final GroupCoordinator groupCoordinator; @Getter + private final TransactionCoordinator transactionCoordinator; + @Getter private final boolean enableTls; @Getter private final EndPoint advertisedEndPoint; @@ -49,12 +52,14 @@ public class KafkaChannelInitializer extends ChannelInitializer { public KafkaChannelInitializer(PulsarService pulsarService, KafkaServiceConfiguration kafkaConfig, GroupCoordinator groupCoordinator, + TransactionCoordinator transactionCoordinator, boolean enableTLS, EndPoint advertisedEndPoint) { super(); this.pulsarService = pulsarService; this.kafkaConfig = kafkaConfig; this.groupCoordinator = groupCoordinator; + this.transactionCoordinator = transactionCoordinator; this.enableTls = enableTLS; this.advertisedEndPoint = advertisedEndPoint; @@ -74,7 +79,8 @@ protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4)); ch.pipeline().addLast("handler", - new KafkaRequestHandler(pulsarService, kafkaConfig, groupCoordinator, enableTls, advertisedEndPoint)); + new KafkaRequestHandler(pulsarService, kafkaConfig, + groupCoordinator, transactionCoordinator, enableTls, advertisedEndPoint)); } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index b9abf5df6f..1bdcda495d 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -232,6 +232,24 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception case CREATE_TOPICS: handleCreateTopics(kafkaHeaderAndRequest, responseFuture); break; + case INIT_PRODUCER_ID: + handleInitProducerId(kafkaHeaderAndRequest, responseFuture); + break; + case ADD_PARTITIONS_TO_TXN: + handleAddPartitionsToTxn(kafkaHeaderAndRequest, responseFuture); + break; + case ADD_OFFSETS_TO_TXN: + handleAddOffsetsToTxn(kafkaHeaderAndRequest, responseFuture); + break; + case TXN_OFFSET_COMMIT: + handleTxnOffsetCommit(kafkaHeaderAndRequest, responseFuture); + break; + case END_TXN: + handleEndTxn(kafkaHeaderAndRequest, responseFuture); + break; + case WRITE_TXN_MARKERS: + 
handleWriteTxnMarkers(kafkaHeaderAndRequest, responseFuture); + break; case DESCRIBE_CONFIGS: handleDescribeConfigs(kafkaHeaderAndRequest, responseFuture); break; @@ -372,6 +390,24 @@ protected void writeAndFlushWhenInactiveChannel(Channel channel) { protected abstract void handleDescribeConfigs(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); + protected abstract void + handleInitProducerId(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); + + protected abstract void + handleAddPartitionsToTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); + + protected abstract void + handleAddOffsetsToTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); + + protected abstract void + handleTxnOffsetCommit(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); + + protected abstract void + handleEndTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); + + protected abstract void + handleWriteTxnMarkers(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); + static class KafkaHeaderAndRequest implements Closeable { private static final String DEFAULT_CLIENT_HOST = ""; diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java index b88323919e..500805f41a 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java @@ -23,6 +23,8 @@ import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupConfig; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator; import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetConfig; +import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionConfig; +import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator; import io.streamnative.pulsar.handlers.kop.utils.ConfigurationUtils; import io.streamnative.pulsar.handlers.kop.utils.KopTopic; import io.streamnative.pulsar.handlers.kop.utils.MetadataUtils; @@ -189,6 +191,8 @@ public enum ListenerType { @Getter private GroupCoordinator groupCoordinator; @Getter + private TransactionCoordinator transactionCoordinator; + @Getter private String bindAddress; @@ -252,6 +256,13 @@ public void start(BrokerService service) { log.error("initGroupCoordinator failed with", e); } } + if (kafkaConfig.isEnableTransactionCoordinator()) { + try { + initTransactionCoordinator(); + } catch (Exception e) { + log.error("Initializing the transaction coordinator failed.", e); + } + } } // this is called after initialize, and with kafkaConfig, brokerService all set. 
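Note: the start() hunk above only activates the coordinator when enableTransactionCoordinator is set. For orientation, a minimal sketch of the client-side flow these new request paths serve, assuming a KoP listener on localhost:9092 and the standard kafka-clients producer API (the transactional id and topic name are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProduceExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed KoP listener
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // Setting transactional.id drives FIND_COORDINATOR(TRANSACTION) and
        // INIT_PRODUCER_ID, the request paths added by this patch.
        props.put("transactional.id", "example-txn-id");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("example-topic", "k", "v"));
            producer.commitTransaction(); // drives END_TXN, then WRITE_TXN_MARKERS
        }
    }
}
```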
@@ -279,12 +290,12 @@ public Map> newChannelIniti case PLAINTEXT: case SASL_PLAINTEXT: builder.put(endPoint.getInetAddress(), new KafkaChannelInitializer(brokerService.getPulsar(), - kafkaConfig, groupCoordinator, false, advertisedEndPoint)); + kafkaConfig, groupCoordinator, transactionCoordinator, false, advertisedEndPoint)); break; case SSL: case SASL_SSL: builder.put(endPoint.getInetAddress(), new KafkaChannelInitializer(brokerService.getPulsar(), - kafkaConfig, groupCoordinator, true, advertisedEndPoint)); + kafkaConfig, groupCoordinator, transactionCoordinator, true, advertisedEndPoint)); break; } }); @@ -349,6 +360,11 @@ public void startGroupCoordinator() throws Exception { } } + public void initTransactionCoordinator() throws Exception { + TransactionConfig transactionConfig = TransactionConfig.builder().build(); + this.transactionCoordinator = TransactionCoordinator.of(transactionConfig); + } + /** * This method discovers ownership of offset topic partitions and attempts to load offset topics * assigned to this broker. diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 57ca22e13d..60f8fcaa29 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -22,10 +22,14 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupOverview; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupSummary; +import io.streamnative.pulsar.handlers.kop.coordinator.transaction.AbortedIndexEntry; +import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator; import io.streamnative.pulsar.handlers.kop.format.EntryFormatter; import io.streamnative.pulsar.handlers.kop.format.EntryFormatterFactory; import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata; @@ -68,11 +72,15 @@ import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.Records; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.AddOffsetsToTxnRequest; +import org.apache.kafka.common.requests.AddPartitionsToTxnRequest; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.CreateTopicsRequest; @@ -85,11 +93,13 @@ import org.apache.kafka.common.requests.DescribeGroupsResponse; import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMember; import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMetadata; +import org.apache.kafka.common.requests.EndTxnRequest; import 
org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.HeartbeatRequest; import org.apache.kafka.common.requests.HeartbeatResponse; +import org.apache.kafka.common.requests.InitProducerIdRequest; import org.apache.kafka.common.requests.JoinGroupRequest; import org.apache.kafka.common.requests.JoinGroupResponse; import org.apache.kafka.common.requests.LeaveGroupRequest; @@ -114,15 +124,24 @@ import org.apache.kafka.common.requests.SaslHandshakeResponse; import org.apache.kafka.common.requests.SyncGroupRequest; import org.apache.kafka.common.requests.SyncGroupResponse; +import org.apache.kafka.common.requests.TransactionResult; +import org.apache.kafka.common.requests.TxnOffsetCommitRequest; +import org.apache.kafka.common.requests.TxnOffsetCommitResponse; +import org.apache.kafka.common.requests.WriteTxnMarkersRequest; +import org.apache.kafka.common.requests.WriteTxnMarkersResponse; +import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.api.proto.PulsarMarkers; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.Murmur3_32Hash; @@ -142,6 +161,7 @@ public class KafkaRequestHandler extends KafkaCommandDecoder { private final KafkaServiceConfiguration kafkaConfig; private final KafkaTopicManager topicManager; private final GroupCoordinator groupCoordinator; + private final TransactionCoordinator transactionCoordinator; private final String clusterName; private final ScheduledExecutorService executor; @@ -162,12 +182,14 @@ public class KafkaRequestHandler extends KafkaCommandDecoder { public KafkaRequestHandler(PulsarService pulsarService, KafkaServiceConfiguration kafkaConfig, GroupCoordinator groupCoordinator, + TransactionCoordinator transactionCoordinator, Boolean tlsEnabled, EndPoint advertisedEndPoint) throws Exception { super(); this.pulsarService = pulsarService; this.kafkaConfig = kafkaConfig; this.groupCoordinator = groupCoordinator; + this.transactionCoordinator = transactionCoordinator; this.clusterName = kafkaConfig.getClusterName(); this.executor = pulsarService.getExecutor(); this.admin = pulsarService.getAdminClient(); @@ -570,11 +592,7 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar, checkArgument(produceHar.getRequest() instanceof ProduceRequest); ProduceRequest produceRequest = (ProduceRequest) produceHar.getRequest(); if (produceRequest.transactionalId() != null) { - log.warn("[{}] Transactions not supported", ctx.channel()); - - resultFuture.complete( - failedResponse(produceHar, new UnsupportedOperationException("No transaction support"))); - return; + // TODO auth check } Map> responsesFutures = new HashMap<>(); @@ -601,7 +619,7 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar, 
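The produce path above now forwards transactional batches to PendingProduce (continued in the next hunk) instead of rejecting them. A self-contained sketch of how transactional batches can be recognized from MemoryRecords, mirroring the first-batch inspection PendingProduce performs (TxnBatchInspector is an illustrative name, not part of the patch):

```java
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;

final class TxnBatchInspector {
    static void inspect(MemoryRecords records) {
        // PendingProduce reads only the first batch; iterating shows the same fields.
        for (RecordBatch batch : records.batches()) {
            if (batch.isTransactional()) {
                // producerId/producerEpoch identify the transaction a batch belongs to
                System.out.printf("txn batch: pid=%d, epoch=%d, baseOffset=%d%n",
                        batch.producerId(), batch.producerEpoch(), batch.baseOffset());
            }
        }
    }
}
```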
MemoryRecords records = (MemoryRecords) entry.getValue(); String fullPartitionName = KopTopic.toString(topicPartition); PendingProduce pendingProduce = new PendingProduce(partitionResponse, topicManager, fullPartitionName, - entryFormatter, records, executor); + entryFormatter, records, executor, transactionCoordinator); PendingProduceQueue queue = pendingProduceQueueMap.computeIfAbsent(topicPartition, ignored -> new PendingProduceQueue()); queue.add(pendingProduce); @@ -631,37 +649,43 @@ protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinator, checkArgument(findCoordinator.getRequest() instanceof FindCoordinatorRequest); FindCoordinatorRequest request = (FindCoordinatorRequest) findCoordinator.getRequest(); - if (request.coordinatorType() == FindCoordinatorRequest.CoordinatorType.GROUP) { - int partition = groupCoordinator.partitionFor(request.coordinatorKey()); - String pulsarTopicName = groupCoordinator.getTopicPartitionName(partition); + String pulsarTopicName; + int partition; + + if (request.coordinatorType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION) { + partition = transactionCoordinator.partitionFor(request.coordinatorKey()); + pulsarTopicName = transactionCoordinator.getTopicPartitionName(partition); + } else if (request.coordinatorType() == FindCoordinatorRequest.CoordinatorType.GROUP) { + partition = groupCoordinator.partitionFor(request.coordinatorKey()); + pulsarTopicName = groupCoordinator.getTopicPartitionName(partition); + } else { + throw new NotImplementedException("FindCoordinatorRequest unsupported coordinator type " + + request.coordinatorType()); + } - findBroker(TopicName.get(pulsarTopicName)) + findBroker(TopicName.get(pulsarTopicName)) .whenComplete((node, t) -> { if (t != null || node == null){ log.error("[{}] Request {}: Error while find coordinator, .", - ctx.channel(), findCoordinator.getHeader(), t); + ctx.channel(), findCoordinator.getHeader(), t); AbstractResponse response = new FindCoordinatorResponse( - Errors.LEADER_NOT_AVAILABLE, - Node.noNode()); + Errors.LEADER_NOT_AVAILABLE, + Node.noNode()); resultFuture.complete(response); return; } if (log.isDebugEnabled()) { log.debug("[{}] Found node {} as coordinator for key {} partition {}.", - ctx.channel(), node.leader(), request.coordinatorKey(), partition); + ctx.channel(), node.leader(), request.coordinatorKey(), partition); } AbstractResponse response = new FindCoordinatorResponse( - Errors.NONE, - node.leader()); + Errors.NONE, + node.leader()); resultFuture.complete(response); }); - } else { - throw new NotImplementedException("FindCoordinatorRequest not support TRANSACTION type " - + request.coordinatorType()); - } } protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch, @@ -1039,7 +1063,7 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch, } MessageFetchContext fetchContext = MessageFetchContext.get(this); - fetchContext.handleFetch(resultFuture, fetch); + fetchContext.handleFetch(resultFuture, fetch, transactionCoordinator); } protected void handleJoinGroupRequest(KafkaHeaderAndRequest joinGroup, @@ -1283,6 +1307,153 @@ protected void handleDescribeConfigs(KafkaHeaderAndRequest describeConfigs, }); } + @Override + protected void handleInitProducerId(KafkaHeaderAndRequest kafkaHeaderAndRequest, + CompletableFuture response) { + InitProducerIdRequest request = (InitProducerIdRequest) kafkaHeaderAndRequest.getRequest(); + transactionCoordinator.handleInitProducerId( + request.transactionalId(), request.transactionTimeoutMs(),
response); + } + + @Override + protected void handleAddPartitionsToTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest, + CompletableFuture response) { + AddPartitionsToTxnRequest request = (AddPartitionsToTxnRequest) kafkaHeaderAndRequest.getRequest(); + transactionCoordinator.handleAddPartitionsToTransaction(request.transactionalId(), + request.producerId(), request.producerEpoch(), request.partitions(), response); + } + + @Override + protected void handleAddOffsetsToTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest, + CompletableFuture response) { + AddOffsetsToTxnRequest request = (AddOffsetsToTxnRequest) kafkaHeaderAndRequest.getRequest(); + int partition = groupCoordinator.partitionFor(request.consumerGroupId()); + String offsetTopicName = groupCoordinator.getGroupManager().getOffsetConfig().offsetsTopicName(); + transactionCoordinator.handleAddPartitionsToTransaction( + request.transactionalId(), + request.producerId(), + request.producerEpoch(), + Collections.singletonList(new TopicPartition(offsetTopicName, partition)), response); + } + + @Override + protected void handleTxnOffsetCommit(KafkaHeaderAndRequest kafkaHeaderAndRequest, + CompletableFuture response) { + TxnOffsetCommitRequest request = (TxnOffsetCommitRequest) kafkaHeaderAndRequest.getRequest(); + log.info("handleTxnOffsetCommit transactionalId: {}", request.transactionalId()); + response.complete(new TxnOffsetCommitResponse(0, Collections.EMPTY_MAP)); + } + + @Override + protected void handleEndTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest, + CompletableFuture response) { + EndTxnRequest request = (EndTxnRequest) kafkaHeaderAndRequest.getRequest(); + transactionCoordinator.handleEndTransaction( + request.transactionalId(), + request.producerId(), + request.producerEpoch(), + request.command(), + this, + response); + } + + @Override + protected void handleWriteTxnMarkers(KafkaHeaderAndRequest kafkaHeaderAndRequest, + CompletableFuture response) { + WriteTxnMarkersRequest request = (WriteTxnMarkersRequest) kafkaHeaderAndRequest.getRequest(); + Map> resultMap = new HashMap<>(); + List> offsetFutureList = new ArrayList<>(); + for (WriteTxnMarkersRequest.TxnMarkerEntry txnMarkerEntry : request.markers()) { + resultMap.compute(txnMarkerEntry.producerId(), (key, value) -> { + if (value == null) { + value = new HashMap<>(); + } + for (TopicPartition topicPartition : txnMarkerEntry.partitions()) { + CompletableFuture completableFuture = writeTxnMarker( + topicPartition, + txnMarkerEntry.transactionResult(), + txnMarkerEntry.producerId(), + txnMarkerEntry.producerEpoch()); + completableFuture.whenComplete((offset, throwable) -> { + String fullPartitionName = KopTopic.toString(topicPartition); + + TopicName topicName = TopicName.get(fullPartitionName); + long firstOffset = transactionCoordinator.removeActivePidOffset( + topicName, txnMarkerEntry.producerId()); + long lastStableOffset = transactionCoordinator.getLastStableOffset(topicName); + + if (txnMarkerEntry.transactionResult().equals(TransactionResult.ABORT)) { + transactionCoordinator.addAbortedIndex(AbortedIndexEntry.builder() + .version(request.version()) + .pid(txnMarkerEntry.producerId()) + .firstOffset(firstOffset) + .lastOffset(offset) + .lastStableOffset(lastStableOffset) + .build()); + } + }); + offsetFutureList.add(completableFuture); + value.put(topicPartition, Errors.NONE); + } + return value; + }); + } + FutureUtil.waitForAll(offsetFutureList).whenComplete((ignored, throwable) -> { + response.complete(new WriteTxnMarkersResponse(resultMap)); + }); + } + + /** + * 
Write the txn marker to the topic partition. + * + * @param topicPartition + */ + private CompletableFuture writeTxnMarker(TopicPartition topicPartition, + TransactionResult transactionResult, + long producerId, + short producerEpoch) { + CompletableFuture offsetFuture = new CompletableFuture<>(); + String fullPartitionName = KopTopic.toString(topicPartition); + TopicName topicName = TopicName.get(fullPartitionName); + topicManager.getTopic(topicName.toString()) + .thenAccept(persistentTopic -> { + persistentTopic.publishMessage( + generateTxnMarker(transactionResult, producerId, producerEpoch), + MessagePublishContext.get( + offsetFuture, + persistentTopic, + persistentTopic.getManagedLedger(), + 1, + SystemTime.SYSTEM.milliseconds())); + }); + return offsetFuture; + } + + private ByteBuf generateTxnMarker(TransactionResult transactionResult, long producerId, short producerEpoch) { + ControlRecordType controlRecordType; + PulsarMarkers.MarkerType markerType; + if (transactionResult.equals(TransactionResult.COMMIT)) { + markerType = PulsarMarkers.MarkerType.TXN_COMMIT; + controlRecordType = ControlRecordType.COMMIT; + } else { + markerType = PulsarMarkers.MarkerType.TXN_ABORT; + controlRecordType = ControlRecordType.ABORT; + } + EndTransactionMarker marker = new EndTransactionMarker(controlRecordType, 0); + MemoryRecords memoryRecords = MemoryRecords.withEndTransactionMarker(producerId, producerEpoch, marker); + + ByteBuf byteBuf = Unpooled.wrappedBuffer(memoryRecords.buffer()); + PulsarApi.MessageMetadata messageMetadata = PulsarApi.MessageMetadata.newBuilder() + .setTxnidMostBits(producerId) + .setTxnidLeastBits(producerEpoch) + .setMarkerType(markerType.getNumber()) + .setPublishTime(SystemTime.SYSTEM.milliseconds()) + .setProducerName("") + .setSequenceId(0L) + .build(); + return Commands.serializeMetadataAndPayload(Commands.ChecksumType.None, messageMetadata, byteBuf); + } + private SaslHandshakeResponse checkSaslMechanism(String mechanism) { if (getKafkaConfig().getSaslAllowedMechanisms().contains(mechanism)) { return new SaslHandshakeResponse(Errors.NONE, getKafkaConfig().getSaslAllowedMechanisms()); @@ -1395,7 +1566,15 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { return returnFuture; } - private CompletableFuture findBroker(TopicName topic) { + private boolean isOffsetTopic(String topic) { + String offsetsTopic = kafkaConfig.getKafkaMetadataTenant() + "/" + + kafkaConfig.getKafkaMetadataNamespace() + + "/" + Topic.GROUP_METADATA_TOPIC_NAME; + + return topic.contains(offsetsTopic); + } + + public CompletableFuture findBroker(TopicName topic) { if (log.isDebugEnabled()) { log.debug("[{}] Handle Lookup for {}", ctx.channel(), topic); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java index 6014a7068a..72ca201c3c 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java @@ -45,6 +45,9 @@ public class KafkaServiceConfiguration extends ServiceConfiguration { @Category private static final String CATEGORY_KOP_SSL = "Kafka on Pulsar SSL configuration"; + + @Category + private static final String CATEGORY_KOP_TRANSACTION = "Kafka on Pulsar transaction"; // // --- Kafka on Pulsar Broker configuration --- // @@ -284,4 +287,10 @@ public String getListeners() { doc = "The 
format of an entry. Default: pulsar. Optional: [pulsar, kafka]" ) private String entryFormat = "pulsar"; + @FieldContext( + category = CATEGORY_KOP_TRANSACTION, + doc = "Flag to enable transaction coordinator" + ) + private boolean enableTransactionCoordinator = false; + } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java index deabbacd6b..6c8dd9b90b 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java @@ -19,8 +19,10 @@ import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder.KafkaHeaderAndRequest; +import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator; import io.streamnative.pulsar.handlers.kop.utils.KopTopic; import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils; + import java.util.Date; import java.util.LinkedHashMap; import java.util.List; @@ -50,9 +52,11 @@ import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.requests.FetchResponse.PartitionData; +import org.apache.kafka.common.requests.IsolationLevel; +import org.apache.pulsar.common.naming.TopicName; /** - * MessageFetchContext handling FetchRequest . + * MessageFetchContext handling FetchRequest. */ @Slf4j public final class MessageFetchContext { @@ -87,7 +91,8 @@ public void recycle() { // handle request public CompletableFuture handleFetch( CompletableFuture fetchResponse, - KafkaHeaderAndRequest fetchRequest) { + KafkaHeaderAndRequest fetchRequest, + TransactionCoordinator transactionCoordinator) { LinkedHashMap> responseData = new LinkedHashMap<>(); // Map of partition and related tcm. @@ -167,7 +172,7 @@ public CompletableFuture handleFetch( .filter(x -> x != null) .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - readMessages(fetchRequest, partitionCursor, fetchResponse, responseData); + readMessages(fetchRequest, partitionCursor, fetchResponse, responseData, transactionCoordinator); }); return fetchResponse; @@ -179,11 +184,12 @@ public CompletableFuture handleFetch( private void readMessages(KafkaHeaderAndRequest fetch, Map> cursors, CompletableFuture resultFuture, - LinkedHashMap> responseData) { + LinkedHashMap> responseData, + TransactionCoordinator tc) { AtomicInteger bytesRead = new AtomicInteger(0); Map> entryValues = new ConcurrentHashMap<>(); - readMessagesInternal(fetch, cursors, bytesRead, entryValues, resultFuture, responseData); + readMessagesInternal(fetch, cursors, bytesRead, entryValues, resultFuture, responseData, tc); } private void readMessagesInternal(KafkaHeaderAndRequest fetch, @@ -191,12 +197,18 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch, AtomicInteger bytesRead, Map> responseValues, CompletableFuture resultFuture, - LinkedHashMap> responseData) { + LinkedHashMap> responseData, + TransactionCoordinator tc) { + AtomicInteger entriesRead = new AtomicInteger(0); // here do the real read, and in read callback put cursor back to KafkaTopicConsumerManager. 
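For reference, the isolation-level filtering in the hunks that follow is driven entirely by the consumer's configuration; a minimal client-side counterpart that triggers the READ_COMMITTED branch below (broker address, topic, and group id are placeholders):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed KoP listener
        props.put("group.id", "read-committed-demo");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        // read_committed makes the fetch carry IsolationLevel.READ_COMMITTED, so
        // the broker-side code caps returned entries at the last stable offset.
        props.put("isolation.level", "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(r -> System.out.println(r.offset() + " -> " + r.value()));
        }
    }
}
```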
Map>> readFutures = readAllCursorOnce(cursors); CompletableFuture.allOf(readFutures.values().stream().toArray(CompletableFuture[]::new)) .whenComplete((ignore, ex) -> { + + FetchRequest request = (FetchRequest) fetch.getRequest(); + IsolationLevel isolationLevel = request.isolationLevel(); + // keep entries since all read completed. readFutures.entrySet().parallelStream().forEach(kafkaTopicReadEntry -> { TopicPartition kafkaTopic = kafkaTopicReadEntry.getKey(); @@ -206,13 +218,28 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch, List entryList = responseValues.computeIfAbsent(kafkaTopic, l -> Lists.newArrayList()); if (entries != null && !entries.isEmpty()) { - entryList.addAll(entries); - entriesRead.addAndGet(entries.size()); - bytesRead.addAndGet(entryList.stream().parallel().map(e -> + if (isolationLevel.equals(IsolationLevel.READ_UNCOMMITTED)) { + entryList.addAll(entries); + entriesRead.addAndGet(entries.size()); + bytesRead.addAndGet(entryList.stream().parallel().map(e -> e.getLength()).reduce(0, Integer::sum)); + } else { + TopicName topicName = TopicName.get(KopTopic.toString(kafkaTopic)); + long lso = tc.getLastStableOffset(topicName); + for (Entry entry : entries) { + if (lso >= MessageIdUtils.peekBaseOffsetFromEntry(entry)) { + entryList.add(entry); + entriesRead.incrementAndGet(); + bytesRead.addAndGet(entry.getLength()); + } else { + break; + } + } + } + if (log.isDebugEnabled()) { log.debug("Request {}: For topic {}, entries in list: {}.", - fetch.getHeader(), kafkaTopic.toString(), entryList.size()); + fetch.getHeader(), kafkaTopic.toString(), entryList.size()); } } } catch (Exception e) { @@ -250,7 +277,6 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch, } }); - FetchRequest request = (FetchRequest) fetch.getRequest(); int maxBytes = request.maxBytes(); int minBytes = request.minBytes(); int waitTime = request.maxWait(); // in ms @@ -324,12 +350,19 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch, } final MemoryRecords records = requestHandler.getEntryFormatter().decode(entries, magic); + List abortedTransactions; + if (IsolationLevel.READ_UNCOMMITTED.equals(isolationLevel)) { + abortedTransactions = null; + } else { + abortedTransactions = tc.getAbortedIndexList( + request.fetchData().get(kafkaPartition).fetchOffset); + } partitionData = new FetchResponse.PartitionData( Errors.NONE, highWatermark, highWatermark, highWatermark, - null, + abortedTransactions, records); } responseData.put(kafkaPartition, partitionData); @@ -364,7 +397,7 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch, fetch.getHeader()); } // need do another round read - readMessagesInternal(fetch, cursors, bytesRead, responseValues, resultFuture, responseData); + readMessagesInternal(fetch, cursors, bytesRead, responseValues, resultFuture, responseData, tc); } }); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java index d92e3493f1..f0a3248027 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java @@ -14,6 +14,7 @@ package io.streamnative.pulsar.handlers.kop; import io.netty.buffer.ByteBuf; +import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator; import io.streamnative.pulsar.handlers.kop.format.EntryFormatter; import java.util.concurrent.CompletableFuture; 
import java.util.concurrent.ExecutionException; @@ -21,9 +22,11 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse; import org.apache.pulsar.broker.service.Producer; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.common.naming.TopicName; /** * Pending context related to the produce task. @@ -38,13 +41,17 @@ public class PendingProduce { private final CompletableFuture topicFuture; private final CompletableFuture byteBufFuture; private CompletableFuture offsetFuture; + private final TransactionCoordinator transactionCoordinator; + private long pid; + private boolean isTransactional; public PendingProduce(CompletableFuture responseFuture, KafkaTopicManager topicManager, String partitionName, EntryFormatter entryFormatter, MemoryRecords memoryRecords, - ExecutorService executor) { + ExecutorService executor, + TransactionCoordinator transactionCoordinator) { this.responseFuture = responseFuture; this.topicManager = topicManager; this.partitionName = partitionName; @@ -61,6 +68,11 @@ public PendingProduce(CompletableFuture responseFuture, }); executor.execute(() -> byteBufFuture.complete(entryFormatter.encode(memoryRecords, numMessages))); this.offsetFuture = new CompletableFuture<>(); + + RecordBatch batch = memoryRecords.batchIterator().next(); + this.transactionCoordinator = transactionCoordinator; + this.pid = batch.producerId(); + this.isTransactional = batch.isTransactional(); } public boolean ready() { @@ -114,6 +126,9 @@ public void publishMessages() { persistentTopic.getManagedLedger(), numMessages, System.nanoTime())); offsetFuture.whenComplete((offset, e) -> { if (e == null) { + if (this.isTransactional) { + transactionCoordinator.addActivePidOffset(TopicName.get(partitionName), pid, offset); + } responseFuture.complete(new PartitionResponse(Errors.NONE, offset, -1L, -1L)); } else { log.error("publishMessages for topic partition: {} failed when write.", partitionName, e); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupMetadataManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupMetadataManager.java index 964d4cc20b..5c5a694e78 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupMetadataManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupMetadataManager.java @@ -57,6 +57,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.Data; +import lombok.Getter; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.concurrent.FutureUtils; @@ -163,6 +164,7 @@ public String toString() { private final byte magicValue = RecordBatch.CURRENT_MAGIC_VALUE; private final CompressionType compressionType; + @Getter private final OffsetConfig offsetConfig; private final ConcurrentMap groupMetadataCache; /* lock protecting access to loading and owned partition sets */ diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/AbortedIndexEntry.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/AbortedIndexEntry.java new file mode 100644 index 0000000000..8c8c2eae5b --- /dev/null +++ 
b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/AbortedIndexEntry.java @@ -0,0 +1,32 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.coordinator.transaction; + +import lombok.Builder; +import lombok.Data; + +/** + * This class represents an aborted transaction index entry. + */ +@Builder +@Data +public class AbortedIndexEntry { + + private short version; + private long pid; + private long firstOffset; + private long lastOffset; + private long lastStableOffset; + +} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/ProducerIdManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/ProducerIdManager.java new file mode 100644 index 0000000000..d9f37bba3a --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/ProducerIdManager.java @@ -0,0 +1,30 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.coordinator.transaction; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * This class is used to manage producer ids. + */ +public class ProducerIdManager { + + private final AtomicLong producerId = new AtomicLong(0); + + public long generateProducerId() { + // TODO generate unique producer id + return producerId.incrementAndGet(); + } + +} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionConfig.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionConfig.java new file mode 100644 index 0000000000..547d8cdf8a --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionConfig.java @@ -0,0 +1,38 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.streamnative.pulsar.handlers.kop.coordinator.transaction; + +import lombok.Builder; +import lombok.Builder.Default; +import lombok.Data; + +/** + * Transaction config. + */ +@Builder +@Data +public class TransactionConfig { + + public static final String DefaultTransactionMetadataTopicName = "public/default/__transaction_state"; + public static final int DefaultTransactionMetadataTopicPartition = 1; + public static final int DefaultTransactionMaxTimeoutMs = 1000 * 60 * 60 * 24; + + @Default + private String transactionMetadataTopicName = DefaultTransactionMetadataTopicName; + @Default + private int transactionMetadataTopicPartition = DefaultTransactionMetadataTopicPartition; + @Default + private int transactionMaxTimeoutMs = DefaultTransactionMaxTimeoutMs; + +} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java new file mode 100644 index 0000000000..e369b06e61 --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java @@ -0,0 +1,353 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.coordinator.transaction; + +import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX; + +import com.google.common.collect.Lists; +import io.streamnative.pulsar.handlers.kop.KafkaRequestHandler; +import io.streamnative.pulsar.handlers.kop.utils.KopTopic; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import lombok.Builder; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.common.util.MathUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.AddPartitionsToTxnResponse; +import org.apache.kafka.common.requests.EndTxnResponse; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.InitProducerIdResponse; +import org.apache.kafka.common.requests.TransactionResult; +import org.apache.kafka.common.requests.WriteTxnMarkersRequest; +import org.apache.kafka.common.requests.WriteTxnMarkersResponse; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.FutureUtil; + +/** + * Transaction coordinator. 
+ */ +@Slf4j +public class TransactionCoordinator { + + private final TransactionConfig transactionConfig; + private final ProducerIdManager producerIdManager; + private final TransactionStateManager txnStateManager; + private final TransactionMarkerChannelManager transactionMarkerChannelManager; + + // map from topic to the map from initial offset to producerId + private final Map> activeOffsetPidMap = new HashMap<>(); + private final Map> activePidOffsetMap = new HashMap<>(); + private final List abortedIndexList = new ArrayList<>(); + + private TransactionCoordinator(TransactionConfig transactionConfig) { + this.transactionConfig = transactionConfig; + this.txnStateManager = new TransactionStateManager(transactionConfig); + this.producerIdManager = new ProducerIdManager(); + this.transactionMarkerChannelManager = new TransactionMarkerChannelManager(null, null, false); + } + + public static TransactionCoordinator of(TransactionConfig transactionConfig) { + return new TransactionCoordinator(transactionConfig); + } + + public int partitionFor(String transactionalId) { + return MathUtils.signSafeMod( + transactionalId.hashCode(), + transactionConfig.getTransactionMetadataTopicPartition() + ); + } + + public String getTopicPartitionName() { + return transactionConfig.getTransactionMetadataTopicName(); + } + + public String getTopicPartitionName(int partitionId) { + return getTopicPartitionName() + PARTITIONED_TOPIC_SUFFIX + partitionId; + } + + public void handleInitProducerId(String transactionalId, int transactionTimeoutMs, + CompletableFuture response) { + if (transactionalId == null) { + // if the transactional id is null, then always blindly accept the request + // and return a new producerId from the producerId manager + long producerId = producerIdManager.generateProducerId(); + short producerEpoch = 0; + response.complete(new InitProducerIdResponse(0, Errors.NONE, producerId, producerEpoch)); + } else if (StringUtils.isEmpty(transactionalId)) { + // if transactional id is empty then return error as invalid request. 
This is + // to make TransactionCoordinator's behavior consistent with producer client + response.complete(new InitProducerIdResponse(0, Errors.INVALID_REQUEST)); + } else if (!txnStateManager.validateTransactionTimeoutMs(transactionTimeoutMs)){ + // check transactionTimeoutMs is not larger than the broker configured maximum allowed value + response.complete(new InitProducerIdResponse(0, Errors.INVALID_TRANSACTION_TIMEOUT)); + } else { + TransactionMetadata metadata = txnStateManager.getTransactionState(transactionalId); + if (metadata == null) { + long producerId = producerIdManager.generateProducerId(); + short producerEpoch = 0; + metadata = TransactionMetadata.builder() + .transactionalId(transactionalId) + .producerId(producerId) + .producerEpoch(producerEpoch) + .state(TransactionState.EMPTY) + .topicPartitions(new HashSet<>()) + .build(); + txnStateManager.putTransactionStateIfNotExists(transactionalId, metadata); + response.complete(new InitProducerIdResponse( + 0, Errors.NONE, metadata.getProducerId(), metadata.getProducerEpoch())); + } else { + // TODO generate monotonically increasing epoch + // TODO conflict resolve + response.complete(new InitProducerIdResponse( + 0, Errors.NONE, metadata.getProducerId(), metadata.getProducerEpoch())); + } + } + } + + public void handleAddPartitionsToTransaction(String transactionalId, + long producerId, + short producerEpoch, + List partitionList, + CompletableFuture response) { + TransactionMetadata metadata = txnStateManager.getTransactionState(transactionalId); + TransactionMetadata.TxnTransitMetadata newMetadata = + metadata.prepareAddPartitions(new HashSet<>(partitionList), SystemTime.SYSTEM.milliseconds()); + txnStateManager.appendTransactionToLog( + transactionalId, 0, newMetadata, + new TransactionStateManager.ResponseCallback() { + @Override + public void complete() { + Map data = new HashMap<>(); + for (TopicPartition topicPartition : newMetadata.getTopicPartitions()) { + data.put(topicPartition, Errors.NONE); + } + response.complete(new AddPartitionsToTxnResponse(0, data)); + } + + @Override + public void fail(Exception e) { + + } + }); + } + + public void handleEndTransaction(String transactionalId, + long producerId, + short producerEpoch, + TransactionResult transactionResult, + KafkaRequestHandler requestHandler, + CompletableFuture response) { + + TransactionMetadata metadata = txnStateManager.getTransactionState(transactionalId); + switch (metadata.getState()) { + case ONGOING: + TransactionState nextState; + if (transactionResult == TransactionResult.COMMIT) { + nextState = TransactionState.PREPARE_COMMIT; + } else { + nextState = TransactionState.PREPARE_ABORT; + } + + if (nextState == TransactionState.PREPARE_ABORT && metadata.getPendingState() != null + && metadata.getPendingState().equals(TransactionState.PREPARE_EPOCH_FENCE)) { + // We should clear the pending state to make way for the transition to PrepareAbort and also bump + // the epoch in the transaction metadata we are about to append. 
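The completion logic below fans WRITE_TXN_MARKERS requests out to the brokers that own each partition in the transaction. A standalone sketch of the marker request it builds, using the same TxnMarkerEntry arguments as the MarkerHandler further down (the class name and the partition are illustrative only):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.WriteTxnMarkersRequest;

final class MarkerRequestSketch {
    static WriteTxnMarkersRequest build(long producerId, short producerEpoch) {
        WriteTxnMarkersRequest.TxnMarkerEntry entry = new WriteTxnMarkersRequest.TxnMarkerEntry(
                producerId,
                producerEpoch,
                1, // coordinator epoch; this patch passes a fixed 1
                TransactionResult.COMMIT,
                Collections.singletonList(new TopicPartition("example-topic", 0)));
        List<WriteTxnMarkersRequest.TxnMarkerEntry> entries = new ArrayList<>();
        entries.add(entry);
        return new WriteTxnMarkersRequest.Builder(entries).build();
    }
}
```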
+ metadata.setPendingState(null); + metadata.setProducerEpoch(producerEpoch); + } + + TransactionMetadata.TxnTransitMetadata newMetadata = + metadata.prepareAbortOrCommit(nextState, SystemTime.SYSTEM.milliseconds()); + txnStateManager.appendTransactionToLog(transactionalId, 0, newMetadata, + new TransactionStateManager.ResponseCallback() { + @Override + public void complete() { + + } + + @Override + public void fail(Exception e) { + + } + }); + break; + case COMPLETE_COMMIT: + break; + case COMPLETE_ABORT: + break; + case PREPARE_COMMIT: + break; + case PREPARE_ABORT: + break; + case EMPTY: + break; + case DEAD: + case PREPARE_EPOCH_FENCE: + break; + } + + final Map markerHandlerMap = new HashMap<>(); + for (TopicPartition topicPartition : metadata.getTopicPartitions()) { + String pulsarTopic = new KopTopic(topicPartition.topic()).getPartitionName(topicPartition.partition()); + requestHandler.findBroker(TopicName.get(pulsarTopic)) + .thenAccept(partitionMetadata -> { + InetSocketAddress socketAddress = new InetSocketAddress( + partitionMetadata.leader().host(), partitionMetadata.leader().port()); + CompletableFuture handlerFuture = + transactionMarkerChannelManager.getChannel(socketAddress); + markerHandlerMap.compute(socketAddress, (key, value) -> { + if (value == null) { + List topicPartitionList = new ArrayList<>(); + topicPartitionList.add(topicPartition); + return MarkerHandler.builder() + .topicPartitionList(topicPartitionList) + .handlerFuture(handlerFuture) + .build(); + } else { + value.topicPartitionList.add(topicPartition); + return value; + } + }); + }); + } + + List> completableFutureList = new ArrayList<>(); + for (MarkerHandler markerHandler : markerHandlerMap.values()) { + completableFutureList.add( + markerHandler.writeTxnMarker(producerId, producerEpoch, transactionResult)); + } + + FutureUtil.waitForAll(completableFutureList).whenComplete((ignored, throwable) -> { + TransactionMetadata.TxnTransitMetadata newMetadata = + metadata.prepareComplete(SystemTime.SYSTEM.milliseconds()); + txnStateManager.appendTransactionToLog(transactionalId, 0, newMetadata, + new TransactionStateManager.ResponseCallback() { + @Override + public void complete() { + response.complete(new EndTxnResponse(0, Errors.NONE)); + } + + @Override + public void fail(Exception e) { + + } + }); + }); + } + + public void addActivePidOffset(TopicName topicName, long pid, long offset) { + ConcurrentHashMap pidOffsetMap = + activePidOffsetMap.computeIfAbsent(topicName, topicKey -> new ConcurrentHashMap<>()); + + NavigableMap offsetPidMap = + activeOffsetPidMap.computeIfAbsent(topicName, topicKey -> new ConcurrentSkipListMap<>()); + + pidOffsetMap.computeIfAbsent(pid, pidKey -> { + offsetPidMap.computeIfAbsent(offset, offsetKey -> { + return pid; + }); + return offset; + }); + } + + public long removeActivePidOffset(TopicName topicName, long pid) { + ConcurrentHashMap pidOffsetMap = activePidOffsetMap.getOrDefault(topicName, null); + if (pidOffsetMap == null) { + return -1; + } + + NavigableMap offsetPidMap = activeOffsetPidMap.getOrDefault(topicName, null); + if (offsetPidMap == null) { + log.warn("[removeActivePidOffset] offsetPidMap is null"); + return -1; + } + + Long offset = pidOffsetMap.remove(pid); + if (offset == null) { + log.warn("[removeActivePidOffset] pidOffsetMap does not contain pid {}.", pid); + return -1; + } + + if (offsetPidMap.containsKey(offset)) { + offsetPidMap.remove(offset); + } + return offset; + } + + public long getLastStableOffset(TopicName topicName) { + NavigableMap map = 
activeOffsetPidMap.getOrDefault(topicName, null); + if (map == null) { + log.warn("[getLastStableOffset] activeOffsetPidMap size: {}, no entry for topic {}; returning Long.MAX_VALUE", + activeOffsetPidMap.size(), topicName); + return Long.MAX_VALUE; + } + if (map.size() == 0) { + return Long.MAX_VALUE; + } + return map.firstKey(); + } + + public void addAbortedIndex(AbortedIndexEntry abortedIndexEntry) { + abortedIndexList.add(abortedIndexEntry); + } + + public List getAbortedIndexList(long fetchOffset) { + List abortedTransactions = new ArrayList<>(abortedIndexList.size()); + for (AbortedIndexEntry abortedIndexEntry : abortedIndexList) { + if (abortedIndexEntry.getLastOffset() >= fetchOffset) { + abortedTransactions.add( + new FetchResponse.AbortedTransaction( + abortedIndexEntry.getPid(), + abortedIndexEntry.getFirstOffset())); + } + } + return abortedTransactions; + } + + @Builder + @Data + private static class MarkerHandler { + + private CompletableFuture handlerFuture; + private List topicPartitionList; + + public CompletableFuture writeTxnMarker(long producerId, + short producerEpoch, + TransactionResult transactionResult) { + WriteTxnMarkersRequest.TxnMarkerEntry txnMarkerEntry = new WriteTxnMarkersRequest.TxnMarkerEntry( + producerId, + producerEpoch, + 1, + transactionResult, + topicPartitionList); + WriteTxnMarkersRequest txnMarkersRequest = + new WriteTxnMarkersRequest.Builder(Lists.newArrayList(txnMarkerEntry)).build(); + return handlerFuture.thenCompose(handler -> handler.enqueueRequest(txnMarkersRequest)); + } + } + +} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionLog.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionLog.java new file mode 100644 index 0000000000..cdff5109f7 --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionLog.java @@ -0,0 +1,23 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.coordinator.transaction; + +/** + * Transaction log. + */ +public class TransactionLog { + + + +} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java new file mode 100644 index 0000000000..cd6a59b006 --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java @@ -0,0 +1,156 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.coordinator.transaction; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.requests.RequestUtils; +import org.apache.kafka.common.requests.WriteTxnMarkersRequest; +import org.apache.kafka.common.requests.WriteTxnMarkersResponse; + +/** + * Transaction marker channel handler. + */ +@Slf4j +public class TransactionMarkerChannelHandler extends ChannelInboundHandlerAdapter { + + private ChannelHandlerContext cnx; + + private final Queue requestQueue = new LinkedBlockingQueue<>(); + private final Queue requestResponseQueue = new LinkedBlockingQueue<>(); + + public CompletableFuture enqueueRequest(WriteTxnMarkersRequest request) { + log.info("enqueueRequest"); + TxnMarkerRequestResponse txnMarkerRequestResponse = new TxnMarkerRequestResponse(request); + if (requestQueue.offer(txnMarkerRequestResponse)) { + pollRequest(); + return txnMarkerRequestResponse.responseFuture; + } else { + txnMarkerRequestResponse.responseFuture.completeExceptionally( + new Exception("The transaction markers queue is full")); + return txnMarkerRequestResponse.responseFuture; + } + } + + public void pollRequest() { + log.info("poll request queue: {}", requestQueue.size()); + TxnMarkerRequestResponse request = requestQueue.poll(); + while (request != null) { + requestResponseQueue.offer(request); + ByteBuf byteBuf = request.getRequestData(); + log.info("byteBuf {}", byteBuf); + cnx.writeAndFlush(byteBuf); + log.info("poll request write and flush"); + request = requestQueue.poll(); + } + } + + private static class TxnMarkerRequestResponse { + private final WriteTxnMarkersRequest request; + private final CompletableFuture responseFuture = new CompletableFuture<>(); + + public TxnMarkerRequestResponse(WriteTxnMarkersRequest request) { + this.request = request; + } + + public ByteBuf getRequestData() { + RequestHeader requestHeader = new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, request.version(), "", -1); + return RequestUtils.serializeRequest(request.version(), requestHeader, request); + } + + public void onComplete(ByteBuf byteBuf) { + WriteTxnMarkersResponse response = WriteTxnMarkersResponse + .parse(byteBuf.skipBytes(4).nioBuffer(), ApiKeys.WRITE_TXN_MARKERS.latestVersion()); + responseFuture.complete(response); + } + } + + @Override + public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception { + log.info("[TransactionMarkerChannelHandler] channelRegistered"); + super.channelRegistered(channelHandlerContext); + } + + @Override + public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception { + log.info("[TransactionMarkerChannelHandler] channelUnregistered"); + super.channelUnregistered(channelHandlerContext); 
+
+    @Override
+    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
+        log.info("[TransactionMarkerChannelHandler] channelActive");
+        this.cnx = channelHandlerContext;
+        super.channelActive(channelHandlerContext);
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
+        log.info("[TransactionMarkerChannelHandler] channelInactive");
+        super.channelInactive(channelHandlerContext);
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
+        log.info("[TransactionMarkerChannelHandler] channelRead");
+        // responses are matched to pending requests purely by arrival (FIFO) order
+        TxnMarkerRequestResponse requestResponse = requestResponseQueue.poll();
+        ByteBuf buffer = (ByteBuf) o;
+        try {
+            if (requestResponse != null) {
+                requestResponse.onComplete(buffer);
+            }
+        } finally {
+            // release the inbound buffer to avoid a reference-count leak
+            buffer.release();
+        }
+    }
+
+    @Override
+    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
+        log.info("[TransactionMarkerChannelHandler] channelReadComplete");
+        super.channelReadComplete(channelHandlerContext);
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
+        log.info("[TransactionMarkerChannelHandler] userEventTriggered");
+        super.userEventTriggered(channelHandlerContext, o);
+    }
+
+    @Override
+    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
+        log.info("[TransactionMarkerChannelHandler] channelWritabilityChanged");
+        super.channelWritabilityChanged(channelHandlerContext);
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {
+        log.error("[TransactionMarkerChannelHandler] exceptionCaught", throwable);
+        super.exceptionCaught(channelHandlerContext, throwable);
+    }
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
+        log.info("[TransactionMarkerChannelHandler] handlerAdded");
+        super.handlerAdded(channelHandlerContext);
+    }
+
+    @Override
+    public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {
+        log.info("[TransactionMarkerChannelHandler] handlerRemoved");
+        super.handlerRemoved(channelHandlerContext);
+    }
+
+}
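The handler above correlates requests with responses purely by FIFO order on a single connection, so it is only safe while both are processed in order on the same channel. A minimal usage sketch (hedged: `handler`, `producerId`, `producerEpoch` and `coordinatorEpoch` are hypothetical placeholders; the request and response classes are the standard Kafka ones used by this patch):

import com.google.common.collect.Lists;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.WriteTxnMarkersRequest;

// build a COMMIT marker for one producer and enqueue it; the returned future
// completes once the FIFO-matched WriteTxnMarkersResponse is read back
WriteTxnMarkersRequest.TxnMarkerEntry entry = new WriteTxnMarkersRequest.TxnMarkerEntry(
        producerId, producerEpoch, coordinatorEpoch, TransactionResult.COMMIT,
        Lists.newArrayList(new TopicPartition("my-topic", 0)));
WriteTxnMarkersRequest request =
        new WriteTxnMarkersRequest.Builder(Lists.newArrayList(entry)).build();
handler.enqueueRequest(request)
        .thenAccept(response -> log.info("marker errors: {}", response.errors(producerId)));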
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelInitializer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelInitializer.java
new file mode 100644
index 0000000000..a2a1911a39
--- /dev/null
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelInitializer.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.handlers.kop.coordinator.transaction;
+
+import static io.streamnative.pulsar.handlers.kop.KafkaChannelInitializer.MAX_FRAME_LENGTH;
+import static io.streamnative.pulsar.handlers.kop.KafkaProtocolHandler.TLS_HANDLER;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+import io.netty.handler.ssl.SslHandler;
+import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
+import io.streamnative.pulsar.handlers.kop.utils.ssl.SSLUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+
+/**
+ * Transaction marker channel initializer.
+ */
+public class TransactionMarkerChannelInitializer extends ChannelInitializer<SocketChannel> {
+
+    private final KafkaServiceConfiguration kafkaConfig;
+    private final PulsarService pulsarService;
+    private final boolean enableTls;
+    private final SslContextFactory sslContextFactory;
+
+    public TransactionMarkerChannelInitializer(KafkaServiceConfiguration kafkaConfig,
+                                               PulsarService pulsarService,
+                                               boolean enableTls) {
+        this.kafkaConfig = kafkaConfig;
+        this.pulsarService = pulsarService;
+        this.enableTls = enableTls;
+        if (enableTls) {
+            sslContextFactory = SSLUtils.createSslContextFactory(kafkaConfig);
+        } else {
+            sslContextFactory = null;
+        }
+    }
+
+    @Override
+    protected void initChannel(SocketChannel ch) throws Exception {
+        if (this.enableTls) {
+            ch.pipeline().addLast(TLS_HANDLER, new SslHandler(SSLUtils.createSslEngine(sslContextFactory)));
+        }
+        ch.pipeline().addLast(new LengthFieldPrepender(4));
+        ch.pipeline().addLast("frameDecoder",
+                new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
+        ch.pipeline().addLast("handler", new TransactionMarkerChannelHandler());
+    }
+}
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManager.java
new file mode 100644
index 0000000000..1822ddd79c
--- /dev/null
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManager.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.handlers.kop.coordinator.transaction;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
+import io.streamnative.pulsar.handlers.kop.utils.ssl.SSLUtils;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.common.util.netty.ChannelFutures;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+
+/**
+ * Transaction marker channel manager.
+ */
+@Slf4j
+public class TransactionMarkerChannelManager {
+
+    private final KafkaServiceConfiguration kafkaConfig;
+    private final PulsarService pulsarService;
+    private final EventLoopGroup eventLoopGroup;
+    private final boolean enableTls;
+    private final SslContextFactory sslContextFactory;
+
+    private final Bootstrap bootstrap;
+
+    // a concurrent map: getChannel() may be called from multiple request threads
+    private final Map<InetSocketAddress, CompletableFuture<TransactionMarkerChannelHandler>> handlerMap =
+            new ConcurrentHashMap<>();
+
+    public TransactionMarkerChannelManager(KafkaServiceConfiguration kafkaConfig,
+                                           PulsarService pulsarService,
+                                           boolean enableTls) {
+        this.kafkaConfig = kafkaConfig;
+        this.pulsarService = pulsarService;
+        this.enableTls = enableTls;
+        if (this.enableTls) {
+            sslContextFactory = SSLUtils.createSslContextFactory(kafkaConfig);
+        } else {
+            sslContextFactory = null;
+        }
+        eventLoopGroup = new NioEventLoopGroup();
+        bootstrap = new Bootstrap();
+        bootstrap.group(eventLoopGroup);
+        bootstrap.channel(NioSocketChannel.class);
+        bootstrap.handler(new TransactionMarkerChannelInitializer(kafkaConfig, pulsarService, enableTls));
+    }
+
+    public CompletableFuture<TransactionMarkerChannelHandler> getChannel(InetSocketAddress socketAddress) {
+        return handlerMap.computeIfAbsent(socketAddress, address -> {
+            CompletableFuture<TransactionMarkerChannelHandler> handlerFuture = new CompletableFuture<>();
+            ChannelFutures.toCompletableFuture(bootstrap.connect(socketAddress))
+                    .thenAccept(channel -> {
+                        handlerFuture.complete(
+                                (TransactionMarkerChannelHandler) channel.pipeline().get("handler"));
+                    });
+            return handlerFuture;
+        });
+    }
+
+}
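One caveat: `handlerMap` caches the connect future per broker address but never invalidates it, so a dropped connection would keep being handed out. A possible hardening sketch (an assumption, not something this patch does) that evicts the entry when Netty reports the channel closed:

ChannelFutures.toCompletableFuture(bootstrap.connect(socketAddress))
        .thenAccept(channel -> {
            // drop the cached handler when the connection dies, so the next
            // getChannel() call reconnects instead of reusing a dead channel
            channel.closeFuture().addListener(ignored -> handlerMap.remove(address));
            handlerFuture.complete(
                    (TransactionMarkerChannelHandler) channel.pipeline().get("handler"));
        });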
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMetadata.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMetadata.java
new file mode 100644
index 0000000000..40a7e35b99
--- /dev/null
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMetadata.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.handlers.kop.coordinator.transaction;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Transaction metadata.
+ */
+@Slf4j
+@Builder
+@Data
+public class TransactionMetadata {
+
+    private static final int DEFAULT_TXN_TIMEOUT_MS = 1000 * 60;
+
+    // transactional id
+    private String transactionalId;
+
+    // producer id
+    private long producerId;
+
+    // current epoch of the producer
+    private short producerEpoch;
+
+    // timeout to be used to abort long running transactions
+    @Builder.Default
+    private int txnTimeoutMs = DEFAULT_TXN_TIMEOUT_MS;
+
+    // current state of the transaction
+    private TransactionState state;
+
+    // current set of partitions that are part of this transaction
+    private Set<TopicPartition> topicPartitions;
+
+    // time the transaction was started, i.e., when first partition is added
+    private long txnStartTimestamp;
+
+    // updated when any operation updates the TransactionMetadata. To be used for expiration
+    private long txnLastUpdateTimestamp;
+
+    // pending state is used to indicate the state that this transaction is going to
+    // transit to, and for blocking future attempts to transit it again if it is not legal;
+    // initialized as the same as the current state
+    private TransactionState pendingState;
+
+    private static Map<TransactionState, Set<TransactionState>> validPreviousStates;
+
+    static {
+        validPreviousStates = Maps.newHashMap();
+        validPreviousStates.put(TransactionState.EMPTY, Sets.immutableEnumSet(
+                TransactionState.EMPTY,
+                TransactionState.COMPLETE_COMMIT,
+                TransactionState.COMPLETE_ABORT));
+
+        validPreviousStates.put(TransactionState.ONGOING, Sets.immutableEnumSet(
+                TransactionState.ONGOING,
+                TransactionState.EMPTY,
+                TransactionState.COMPLETE_COMMIT,
+                TransactionState.COMPLETE_ABORT));
+
+        validPreviousStates.put(TransactionState.PREPARE_COMMIT, Sets.immutableEnumSet(
+                TransactionState.ONGOING));
+
+        validPreviousStates.put(TransactionState.PREPARE_ABORT, Sets.immutableEnumSet(
+                TransactionState.ONGOING,
+                TransactionState.PREPARE_EPOCH_FENCE));
+
+        validPreviousStates.put(TransactionState.COMPLETE_COMMIT, Sets.immutableEnumSet(
+                TransactionState.PREPARE_COMMIT));
+
+        validPreviousStates.put(TransactionState.COMPLETE_ABORT, Sets.immutableEnumSet(
+                TransactionState.PREPARE_ABORT));
+
+        validPreviousStates.put(TransactionState.DEAD, Sets.immutableEnumSet(
+                TransactionState.EMPTY,
+                TransactionState.COMPLETE_COMMIT,
+                TransactionState.COMPLETE_ABORT));
+
+        validPreviousStates.put(TransactionState.PREPARE_EPOCH_FENCE, Sets.immutableEnumSet(
+                TransactionState.ONGOING));
+    }
+
+    public TxnTransitMetadata prepareTransitionTo(TransactionState newState,
+                                                  long newProducerId,
+                                                  short newEpoch,
+                                                  int newTxnTimeoutMs,
+                                                  Set<TopicPartition> newTopicPartitions,
+                                                  long newTxnStartTimestamp,
+                                                  long updateTimestamp) {
+        if (pendingState != null) {
+            throw new IllegalStateException("Preparing transaction state transition to " + newState
+                    + " while it already has a pending state " + pendingState);
+        }
+
+        if (newProducerId < 0) {
+            throw new IllegalArgumentException("Illegal new producer id " + newProducerId);
+        }
+
+        if (newEpoch < 0) {
+            throw new IllegalArgumentException("Illegal new producer epoch " + newEpoch);
+        }
+
+        // check that the new state transition is valid and update the pending state if necessary
+        if (validPreviousStates.get(newState).contains(state)) {
+            pendingState = newState;
+            return new TxnTransitMetadata(newProducerId, newEpoch, newTxnTimeoutMs, newState,
+                    newTopicPartitions, newTxnStartTimestamp, updateTimestamp);
+        } else {
+            throw new IllegalStateException("Preparing transaction state transition to " + newState
+                    + " failed since the target state " + newState
+                    + " is not a valid previous state of the current state " + state);
+        }
+    }
+
+    /**
+     * Transaction transit metadata.
+     */
+    @AllArgsConstructor
+    @Data
+    public static class TxnTransitMetadata {
+        private long producerId;
+        private short producerEpoch;
+        private int txnTimeoutMs;
+        private TransactionState txnState;
+        private Set<TopicPartition> topicPartitions;
+        private long txnStartTimestamp;
+        private long txnLastUpdateTimestamp;
+    }
+
+    public void completeTransitionTo(TxnTransitMetadata transitMetadata) throws Exception {
+        // metadata transition is valid only if all the following conditions are met:
+        //
+        // 1. the new state is already indicated in the pending state.
+        // 2. the epoch should be either the same value, the old value + 1, or 0 if we have a new producerId.
+        // 3. the last update time is no smaller than the old value.
+        // 4. the old partitions set is a subset of the new partitions set.
+        //
+        // plus, we should only try to update the metadata after the corresponding log entry has been successfully
+        // written and replicated (see TransactionStateManager#appendTransactionToLog)
+        //
+        // if valid, transition is done via overwriting the whole object to ensure synchronization
+
+        TransactionState toState = pendingState;
+
+        if (toState != transitMetadata.txnState) {
+            throwStateTransitionFailure(transitMetadata);
+        } else {
+            switch (toState) {
+                case EMPTY: // from initPid
+                    // the partition set must be empty and the start timestamp unset when coming from initPid
+                    if ((producerEpoch != transitMetadata.producerEpoch && !validProducerEpochBump(transitMetadata))
+                            || !transitMetadata.topicPartitions.isEmpty()
+                            || transitMetadata.txnStartTimestamp != -1) {
+
+                        throwStateTransitionFailure(transitMetadata);
+                    } else {
+                        txnTimeoutMs = transitMetadata.txnTimeoutMs;
+                        producerEpoch = transitMetadata.producerEpoch;
+                        producerId = transitMetadata.producerId;
+                    }
+                    break;
+                case ONGOING: // from addPartitions
+                    if (!validProducerEpoch(transitMetadata)
+                            // || !transitMetadata.topicPartitions.containsAll(topicPartitions)
+                            || txnTimeoutMs != transitMetadata.txnTimeoutMs
+                            || txnStartTimestamp > transitMetadata.txnStartTimestamp) {
+
+                        throwStateTransitionFailure(transitMetadata);
+                    } else {
+                        txnStartTimestamp = transitMetadata.txnStartTimestamp;
+                        topicPartitions.addAll(transitMetadata.topicPartitions);
+                    }
+                    break;
+                case PREPARE_ABORT: // from endTxn
+                case PREPARE_COMMIT: // from endTxn
+                    if (!validProducerEpoch(transitMetadata)
+                            || !topicPartitions.equals(transitMetadata.topicPartitions)
+                            || txnTimeoutMs != transitMetadata.txnTimeoutMs
+                            || txnStartTimestamp != transitMetadata.txnStartTimestamp) {
+
+                        throwStateTransitionFailure(transitMetadata);
+                    }
+                    break;
+                case COMPLETE_ABORT: // from write markers
+                case COMPLETE_COMMIT: // from write markers
+                    if (!validProducerEpoch(transitMetadata)
+                            || txnTimeoutMs != transitMetadata.txnTimeoutMs
+                            || transitMetadata.txnStartTimestamp == -1) {
+
+                        throwStateTransitionFailure(transitMetadata);
+                    } else {
+                        this.txnStartTimestamp = transitMetadata.txnStartTimestamp;
+                        this.topicPartitions.clear();
+                    }
+                    break;
+                case PREPARE_EPOCH_FENCE:
+                    // We should never get here, since once we prepare to fence the epoch,
+                    // we immediately set the pending state to PrepareAbort,
+                    // and then consequently to CompleteAbort after the markers are written.
+                    // So we should never ever try to complete a transition to PrepareEpochFence,
+                    // as it is not a valid previous state for any other state,
+                    // and hence can never be transitioned out of.
+                    throwStateTransitionFailure(transitMetadata);
+                    break;
+                case DEAD:
+                    // The transactionalId was being expired. The completion of the operation should result in
+                    // removal of the metadata from the cache,
+                    // so we should never realistically transition to the dead state.
+                    throw new IllegalStateException("TransactionalId " + transactionalId
+                            + " is trying to complete a transition to "
+                            + toState + ". This means that the transactionalId was being expired, and the only "
+                            + "acceptable completion of this operation is to remove the transaction metadata "
+                            + "from the cache, not to persist the " + toState + " in the log.");
+            }
+
+            log.info("TransactionalId {} completed transition from {} to {}", transactionalId, state,
+                    transitMetadata);
+            this.txnLastUpdateTimestamp = transitMetadata.txnLastUpdateTimestamp;
+            this.pendingState = null;
+            this.state = toState;
+        }
+    }
+
+    private boolean validProducerEpoch(TxnTransitMetadata transitMetadata) {
+        short transitEpoch = transitMetadata.producerEpoch;
+        long transitProducerId = transitMetadata.producerId;
+        return transitEpoch == producerEpoch && transitProducerId == producerId;
+    }
+
+    private boolean validProducerEpochBump(TxnTransitMetadata transitMetadata) {
+        short transitEpoch = transitMetadata.producerEpoch;
+        long transitProducerId = transitMetadata.producerId;
+        return transitEpoch == producerEpoch + 1 || (transitEpoch == 0 && transitProducerId != producerId);
+    }
+
+    public TxnTransitMetadata prepareAddPartitions(Set<TopicPartition> addedTopicPartitions, Long updateTimestamp) {
+        long newTxnStartTimestamp;
+        switch (state) {
+            case EMPTY:
+            case COMPLETE_ABORT:
+            case COMPLETE_COMMIT:
+                newTxnStartTimestamp = updateTimestamp;
+                break;
+            default:
+                newTxnStartTimestamp = txnStartTimestamp;
+        }
+        Set<TopicPartition> newPartitionSet = new HashSet<>();
+        if (topicPartitions != null) {
+            newPartitionSet.addAll(topicPartitions);
+        }
+        newPartitionSet.addAll(addedTopicPartitions);
+        return prepareTransitionTo(TransactionState.ONGOING, producerId, producerEpoch,
+                txnTimeoutMs, newPartitionSet, newTxnStartTimestamp, updateTimestamp);
+    }
+
+    public TxnTransitMetadata prepareAbortOrCommit(TransactionState newState, Long updateTimestamp) {
+        return prepareTransitionTo(newState, producerId, producerEpoch,
+                txnTimeoutMs, topicPartitions, txnStartTimestamp, updateTimestamp);
+    }
+
+    public TxnTransitMetadata prepareComplete(Long updateTimestamp) {
+        TransactionState newState;
+        if (state == TransactionState.PREPARE_COMMIT) {
+            newState = TransactionState.COMPLETE_COMMIT;
+        } else {
+            newState = TransactionState.COMPLETE_ABORT;
+        }
+        return prepareTransitionTo(newState, producerId, producerEpoch,
+                txnTimeoutMs, Collections.emptySet(), txnStartTimestamp, updateTimestamp);
+    }
+
+    private void throwStateTransitionFailure(TxnTransitMetadata txnTransitMetadata) throws IllegalStateException {
+        log.error("{} transition to {} failed: this should not happen.", this, txnTransitMetadata);
+        throw new IllegalStateException("TransactionalId " + transactionalId + " failed transition to state "
+                + txnTransitMetadata + " due to unexpected metadata");
+    }
+
+}
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionState.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionState.java
new file mode 100644
index 0000000000..63e7afddf4
--- /dev/null
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionState.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.handlers.kop.coordinator.transaction;
+
+/**
+ * Transaction state.
+ */
+public enum TransactionState {
+
+    /**
+     * Transaction does not exist yet.
+     * transition: received AddPartitionsToTxnRequest => Ongoing
+     *             received AddOffsetsToTxnRequest => Ongoing
+     */
+    EMPTY,
+
+    /**
+     * Transaction has started and is ongoing.
+     * transition: received EndTxnRequest with commit => PrepareCommit
+     *             received EndTxnRequest with abort => PrepareAbort
+     *             received AddPartitionsToTxnRequest => Ongoing
+     *             received AddOffsetsToTxnRequest => Ongoing
+     */
+    ONGOING,
+
+    /**
+     * Transaction is preparing to commit.
+     * transition: received acks from all partitions => CompleteCommit
+     */
+    PREPARE_COMMIT,
+
+    /**
+     * Transaction is preparing to abort.
+     * transition: received acks from all partitions => CompleteAbort
+     */
+    PREPARE_ABORT,
+
+    /**
+     * Transaction has completed commit.
+     * Will soon be removed from the ongoing transaction cache.
+     */
+    COMPLETE_COMMIT,
+
+    /**
+     * Transaction has completed abort.
+     * Will soon be removed from the ongoing transaction cache.
+     */
+    COMPLETE_ABORT,
+
+    /**
+     * TransactionalId has expired and is about to be removed from the transaction cache.
+     */
+    DEAD,
+
+    /**
+     * We are in the middle of bumping the epoch and fencing out older producers.
+     */
+    PREPARE_EPOCH_FENCE;
+
+}
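To make the state machine concrete, here is a rough happy-path walk through the two classes above (a sketch only: it assumes guava's Sets, an explicitly set txnTimeoutMs, and a surrounding method declaring throws Exception, since completeTransitionTo may throw):

TransactionMetadata meta = TransactionMetadata.builder()
        .transactionalId("txn-1")
        .producerId(1L)
        .producerEpoch((short) 0)
        .txnTimeoutMs(60_000)
        .state(TransactionState.EMPTY)
        .topicPartitions(new java.util.HashSet<>())
        .build();
long now = System.currentTimeMillis();

// EMPTY -> ONGOING, driven by AddPartitionsToTxn
meta.completeTransitionTo(
        meta.prepareAddPartitions(Sets.newHashSet(new TopicPartition("t", 0)), now));
// ONGOING -> PREPARE_COMMIT, driven by EndTxn(commit)
meta.completeTransitionTo(
        meta.prepareAbortOrCommit(TransactionState.PREPARE_COMMIT, now));
// PREPARE_COMMIT -> COMPLETE_COMMIT, after the markers are written
meta.completeTransitionTo(meta.prepareComplete(now));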
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java
new file mode 100644
index 0000000000..e9b87e1c08
--- /dev/null
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.handlers.kop.coordinator.transaction;
+
+import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.RequestUtils;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
+
+/**
+ * Transaction state manager.
+ */
+@Slf4j
+public class TransactionStateManager {
+
+    private final TransactionConfig transactionConfig;
+    private final Map<String, TransactionMetadata> transactionStateMap;
+    private ReentrantReadWriteLock stateLock = new ReentrantReadWriteLock();
+
+    public TransactionStateManager(TransactionConfig transactionConfig) {
+        this.transactionConfig = transactionConfig;
+        this.transactionStateMap = new ConcurrentHashMap<>();
+    }
+
+    public void putTransactionStateIfNotExists(String transactionalId, TransactionMetadata metadata) {
+        // putIfAbsent so that an existing transaction's metadata is not overwritten
+        transactionStateMap.putIfAbsent(transactionalId, metadata);
+    }
+
+    public void appendTransactionToLog(String transactionalId,
+                                       int coordinatorEpoch,
+                                       TransactionMetadata.TxnTransitMetadata newMetadata,
+                                       ResponseCallback responseCallback) {
+        try {
+            // TODO: persist the transaction log before completing the transition
+            TransactionMetadata metadata = getTransactionState(transactionalId);
+            metadata.completeTransitionTo(newMetadata);
+            responseCallback.complete();
+        } catch (Exception e) {
+            // TODO: proper exception handling
+            log.error("Failed to append transaction to log", e);
+            responseCallback.fail(e);
+        }
+    }
+
+    /**
+     * Response callback interface.
+     */
+    public interface ResponseCallback {
+        void complete();
+        void fail(Exception e);
+    }
+
+    public ByteBuf getWriteMarker(String transactionalId) {
+        TransactionMetadata metadata = transactionStateMap.get(transactionalId);
+        // TODO: the coordinator epoch (1) and the COMMIT result are currently hardcoded
+        WriteTxnMarkersRequest.TxnMarkerEntry txnMarkerEntry = new WriteTxnMarkersRequest.TxnMarkerEntry(
+                metadata.getProducerId(),
+                metadata.getProducerEpoch(),
+                1,
+                TransactionResult.COMMIT,
+                new ArrayList<>(metadata.getTopicPartitions()));
+        WriteTxnMarkersRequest txnMarkersRequest = new WriteTxnMarkersRequest.Builder(
+                Lists.newArrayList(txnMarkerEntry)).build();
+        RequestHeader requestHeader =
+                new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, txnMarkersRequest.version(), "", -1);
+        return RequestUtils.serializeRequest(txnMarkersRequest.version(), requestHeader, txnMarkersRequest);
+    }
+
+    public TransactionMetadata getTransactionState(String transactionalId) {
+        return transactionStateMap.get(transactionalId);
+    }
+
+    /**
+     * Validate the given transaction timeout value.
+     */
+    public boolean validateTransactionTimeoutMs(int txnTimeoutMs) {
+        return txnTimeoutMs <= transactionConfig.getTransactionMaxTimeoutMs() && txnTimeoutMs > 0;
+    }
+
+}
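For orientation, this is roughly how a coordinator caller drives the manager above when ending a transaction (a sketch; the real EndTxn plumbing lives in TransactionCoordinator, which is outside this excerpt, and `coordinatorEpoch` is a placeholder):

TransactionMetadata metadata = stateManager.getTransactionState(transactionalId);
TransactionMetadata.TxnTransitMetadata transit =
        metadata.prepareAbortOrCommit(TransactionState.PREPARE_COMMIT, System.currentTimeMillis());
stateManager.appendTransactionToLog(transactionalId, coordinatorEpoch, transit,
        new TransactionStateManager.ResponseCallback() {
            @Override
            public void complete() {
                // the transition was applied (and, once implemented, persisted):
                // answer the EndTxn request and start writing markers
            }

            @Override
            public void fail(Exception e) {
                // map the failure to a Kafka error code for the client
            }
        });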
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/package-info.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/package-info.java
new file mode 100644
index 0000000000..118fcc2b0b
--- /dev/null
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Transaction coordinator related classes.
+ *
+ * <p>The classes under this package are ported from Kafka.
+ */
+package io.streamnative.pulsar.handlers.kop.coordinator.transaction;
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java
index c11fb7a4f6..d5b648a0c9 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java
@@ -51,6 +51,8 @@ public MemoryRecords decode(List<Entry> entries, byte magic) {
         for (Entry entry : entries) {
             size += entry.getLength();
         }
+        // TODO: the MemoryRecords should maintain multiple batches, one batch per entry;
+        // this is necessary because one entry belongs to at most one transaction.
         final MemoryRecordsBuilder builder = MemoryRecords.builder(
                 ByteBuffer.allocate(size),
                 magic,
@@ -83,4 +85,5 @@ private static PulsarApi.MessageMetadata getMessageMetadataWithNumberMessages(in
         builder.setNumMessagesInBatch(numMessages);
         return builder.build();
     }
+
 }
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java
index 103839cf86..917d8e6c29 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java
@@ -30,12 +30,13 @@
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
@@ -44,6 +45,7 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata;
+import org.apache.pulsar.common.api.proto.PulsarMarkers;
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.protocol.Commands;
@@ -75,12 +77,18 @@ public ByteBuf encode(final MemoryRecords records, final int numMessages) {
         List<MessageImpl<byte[]>> messages = Lists.newArrayListWithExpectedSize(numMessages);
         MessageMetadata.Builder messageMetaBuilder = MessageMetadata.newBuilder();
-        StreamSupport.stream(records.records().spliterator(), true).forEachOrdered(record -> {
-            MessageImpl<byte[]> message = recordToEntry(record);
-            messages.add(message);
-            if (messageMetaBuilder.getPublishTime() <= 0) {
-                messageMetaBuilder.setPublishTime(message.getPublishTime());
-            }
+        records.batches().forEach(recordBatch -> {
+            StreamSupport.stream(recordBatch.spliterator(), true).forEachOrdered(record -> {
+                MessageImpl<byte[]> message = recordToEntry(record);
+                messages.add(message);
+                if
(messageMetaBuilder.getPublishTime() <= 0) { + messageMetaBuilder.setPublishTime(message.getPublishTime()); + } + if (recordBatch.isTransactional()) { + messageMetaBuilder.setTxnidMostBits(recordBatch.producerId()); + messageMetaBuilder.setTxnidLeastBits(recordBatch.producerEpoch()); + } + }); }); for (MessageImpl message : messages) { @@ -123,105 +131,124 @@ public ByteBuf encode(final MemoryRecords records, final int numMessages) { @Override public MemoryRecords decode(final List entries, final byte magic) { - try (ByteBufferOutputStream outputStream = new ByteBufferOutputStream(DEFAULT_FETCH_BUFFER_SIZE)) { - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(outputStream, magic, + ByteBuffer byteBuffer = ByteBuffer.allocate(DEFAULT_FETCH_BUFFER_SIZE); + + entries.parallelStream().forEachOrdered(entry -> { + long baseOffset = MessageIdUtils.peekBaseOffsetFromEntry(entry); + // each entry is a batched message + ByteBuf metadataAndPayload = entry.getDataBuffer(); + + // Uncompress the payload if necessary + MessageMetadata msgMetadata = Commands.parseMessageMetadata(metadataAndPayload); + + if (msgMetadata.getMarkerType() == PulsarMarkers.MarkerType.TXN_COMMIT_VALUE + || msgMetadata.getMarkerType() == PulsarMarkers.MarkerType.TXN_ABORT_VALUE) { + MemoryRecords memoryRecords = MemoryRecords.withEndTransactionMarker( + baseOffset, + msgMetadata.getPublishTime(), + 0, + msgMetadata.getTxnidMostBits(), + (short) msgMetadata.getTxnidLeastBits(), + new EndTransactionMarker( + msgMetadata.getMarkerType() == PulsarMarkers.MarkerType.TXN_COMMIT_VALUE + ? ControlRecordType.COMMIT : ControlRecordType.ABORT, 0)); + byteBuffer.put(memoryRecords.buffer()); + return; + } + + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(byteBuffer, magic, org.apache.kafka.common.record.CompressionType.NONE, TimestampType.CREATE_TIME, // using the first entry, index 0 as base offset - MessageIdUtils.peekBaseOffsetFromEntry(entries.get(0)), - RecordBatch.NO_TIMESTAMP, + baseOffset, + msgMetadata.getPublishTime(), RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, - false, false, + msgMetadata.hasTxnidMostBits() && msgMetadata.hasTxnidLeastBits(), + false, RecordBatch.NO_PARTITION_LEADER_EPOCH, MAX_RECORDS_BUFFER_SIZE); - entries.parallelStream().forEachOrdered(entry -> { - // each entry is a batched message - ByteBuf metadataAndPayload = entry.getDataBuffer(); - - long entryOffset = MessageIdUtils.peekOffsetFromEntry(entry); - // Uncompress the payload if necessary - MessageMetadata msgMetadata = Commands.parseMessageMetadata(metadataAndPayload); - CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(msgMetadata.getCompression()); - int uncompressedSize = msgMetadata.getUncompressedSize(); - ByteBuf payload; - try { - payload = codec.decode(metadataAndPayload, uncompressedSize); - } catch (IOException ioe) { - log.error("Meet IOException: {}", ioe); - throw new UncheckedIOException(ioe); - } - int numMessages = msgMetadata.getNumMessagesInBatch(); - boolean notBatchMessage = (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()); - - if (log.isDebugEnabled()) { - log.debug("entriesToRecords. NumMessagesInBatch: {}, isBatchMessage: {}, entries in list: {}." 
- + " new entryId {}:{}, readerIndex: {}, writerIndex: {}", - numMessages, !notBatchMessage, entries.size(), entry.getLedgerId(), - entry.getEntryId(), payload.readerIndex(), payload.writerIndex()); - } + CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(msgMetadata.getCompression()); + int uncompressedSize = msgMetadata.getUncompressedSize(); + ByteBuf payload; + try { + payload = codec.decode(metadataAndPayload, uncompressedSize); + } catch (IOException ioe) { + log.error("Meet IOException: {}", ioe); + throw new UncheckedIOException(ioe); + } + int numMessages = msgMetadata.getNumMessagesInBatch(); + boolean notBatchMessage = (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()); - // need handle encryption - checkState(msgMetadata.getEncryptionKeysCount() == 0); - - if (!notBatchMessage) { - long startOffset = entryOffset - numMessages + 1; - IntStream.range(0, numMessages).parallel().forEachOrdered(i -> { - if (log.isDebugEnabled()) { - log.debug(" processing message num - {} in batch", i); - } - try { - SingleMessageMetadata.Builder singleMessageMetadataBuilder = SingleMessageMetadata - .newBuilder(); - ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(payload, - singleMessageMetadataBuilder, i, numMessages); - - SingleMessageMetadata singleMessageMetadata = singleMessageMetadataBuilder.build(); - Header[] headers = getHeadersFromMetadata(singleMessageMetadata.getPropertiesList()); - - - final ByteBuffer value = (singleMessageMetadata.getNullValue()) - ? null - : ByteBufUtils.getNioBuffer(singleMessagePayload); - builder.appendWithOffset( - startOffset + i, - msgMetadata.getEventTime() > 0 - ? msgMetadata.getEventTime() : msgMetadata.getPublishTime(), - ByteBufUtils.getKeyByteBuffer(singleMessageMetadata), - value, - headers); - singleMessagePayload.release(); - singleMessageMetadataBuilder.recycle(); - } catch (IOException e) { - log.error("Meet IOException: {}", e); - throw new UncheckedIOException(e); - } - }); - } else { - Header[] headers = getHeadersFromMetadata(msgMetadata.getPropertiesList()); - - builder.appendWithOffset( - entryOffset, - msgMetadata.getEventTime() > 0 ? msgMetadata.getEventTime() : msgMetadata.getPublishTime(), - ByteBufUtils.getKeyByteBuffer(msgMetadata), - ByteBufUtils.getNioBuffer(payload), - headers); - } + if (log.isDebugEnabled()) { + log.debug("entriesToRecords. NumMessagesInBatch: {}, isBatchMessage: {}, entries in list: {}." 
+ + " new entryId {}:{}, readerIndex: {}, writerIndex: {}", + numMessages, !notBatchMessage, entries.size(), entry.getLedgerId(), + entry.getEntryId(), payload.readerIndex(), payload.writerIndex()); + } - msgMetadata.recycle(); - payload.release(); - entry.release(); - }); - return builder.build(); - } catch (IOException ioe){ - log.error("Meet IOException: {}", ioe); - throw new UncheckedIOException(ioe); - } catch (Exception e){ - log.error("Meet exception: {}", e); - throw e; - } + // need handle encryption + checkState(msgMetadata.getEncryptionKeysCount() == 0); + + if (msgMetadata.hasTxnidMostBits()) { + builder.setProducerState( + msgMetadata.getTxnidMostBits(), + (short) msgMetadata.getTxnidLeastBits(), 0, true); + } + + if (!notBatchMessage) { + IntStream.range(0, numMessages).parallel().forEachOrdered(i -> { + if (log.isDebugEnabled()) { + log.debug(" processing message num - {} in batch", i); + } + try { + SingleMessageMetadata.Builder singleMessageMetadataBuilder = SingleMessageMetadata + .newBuilder(); + ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(payload, + singleMessageMetadataBuilder, i, numMessages); + + SingleMessageMetadata singleMessageMetadata = singleMessageMetadataBuilder.build(); + Header[] headers = getHeadersFromMetadata(singleMessageMetadata.getPropertiesList()); + + final ByteBuffer value = (singleMessageMetadata.getNullValue()) + ? null + : ByteBufUtils.getNioBuffer(singleMessagePayload); + builder.appendWithOffset( + baseOffset + i, + msgMetadata.getEventTime() > 0 + ? msgMetadata.getEventTime() : msgMetadata.getPublishTime(), + ByteBufUtils.getKeyByteBuffer(singleMessageMetadata), + value, + headers); + singleMessagePayload.release(); + singleMessageMetadataBuilder.recycle(); + } catch (IOException e) { + log.error("Meet IOException: {}", e); + throw new UncheckedIOException(e); + } + }); + } else { + Header[] headers = getHeadersFromMetadata(msgMetadata.getPropertiesList()); + + builder.appendWithOffset( + baseOffset, + msgMetadata.getEventTime() > 0 ? msgMetadata.getEventTime() : msgMetadata.getPublishTime(), + ByteBufUtils.getKeyByteBuffer(msgMetadata), + ByteBufUtils.getNioBuffer(payload), + headers); + } + + msgMetadata.recycle(); + payload.release(); + entry.release(); + + builder.close(); + }); + + byteBuffer.flip(); + return MemoryRecords.readableRecords(byteBuffer); } // convert kafka Record to Pulsar Message. diff --git a/kafka-impl/src/main/java/org/apache/kafka/common/requests/RequestUtils.java b/kafka-impl/src/main/java/org/apache/kafka/common/requests/RequestUtils.java new file mode 100644 index 0000000000..ce5f582385 --- /dev/null +++ b/kafka-impl/src/main/java/org/apache/kafka/common/requests/RequestUtils.java @@ -0,0 +1,55 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.kafka.common.requests;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.nio.ByteBuffer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.protocol.types.Struct;
+
+/**
+ * Utility methods to access serialization internals that are protected in Kafka request structures.
+ */
+@Slf4j
+public class RequestUtils {
+
+    /**
+     * Serialize a Kafka request into a byte buf.
+     *
+     * @param version the API version of the request
+     * @param requestHeader the header to prepend to the serialized request
+     * @param request the request to serialize
+     * @return a ByteBuf holding the serialized header and body
+     */
+    public static ByteBuf serializeRequest(short version,
+                                           RequestHeader requestHeader,
+                                           AbstractRequest request) {
+        return serialize(
+                requestHeader.toStruct(),
+                request.toStruct()
+        );
+    }
+
+    public static ByteBuf serialize(Struct headerStruct, Struct bodyStruct) {
+        int size = headerStruct.sizeOf() + bodyStruct.sizeOf();
+        ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(size, size);
+        buf.writerIndex(buf.readerIndex() + size);
+        ByteBuffer buffer = buf.nioBuffer();
+        headerStruct.writeTo(buffer);
+        bodyStruct.writeTo(buffer);
+        buffer.rewind();
+        return buf;
+    }
+
+}
diff --git a/pom.xml b/pom.xml
index 33c6307f86..3c937d1d58 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,7 +47,7 @@
     <log4j2.version>2.13.3</log4j2.version>
     <lombok.version>1.18.4</lombok.version>
     <mockito.version>2.22.0</mockito.version>
-    <pulsar.version>2.8.0-rc-202101042232</pulsar.version>
+    <pulsar.version>2.8.0-rc-202101052233</pulsar.version>
     <slf4j.version>1.7.25</slf4j.version>
     <spotbugs-annotations.version>3.1.8</spotbugs-annotations.version>
     <testcontainers.version>1.12.5</testcontainers.version>
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/EntryPublishTimeTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/EntryPublishTimeTest.java
index ead4542131..c24a9b822e 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/EntryPublishTimeTest.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/EntryPublishTimeTest.java
@@ -20,6 +20,7 @@
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator;
+import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import org.apache.pulsar.broker.protocol.ProtocolHandler;
@@ -81,11 +82,13 @@ protected void setup() throws Exception {
 
         ProtocolHandler handler = pulsar.getProtocolHandlers().protocol("kafka");
         GroupCoordinator groupCoordinator = ((KafkaProtocolHandler) handler).getGroupCoordinator();
+        TransactionCoordinator transactionCoordinator = ((KafkaProtocolHandler) handler).getTransactionCoordinator();
 
         kafkaRequestHandler = new KafkaRequestHandler(
             pulsar,
             (KafkaServiceConfiguration) conf,
             groupCoordinator,
+            transactionCoordinator,
             false,
             getPlainEndPoint());
         ChannelHandlerContext mockCtx = mock(ChannelHandlerContext.class);
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java
index 0e3384d829..59b684fd30 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java
@@ -29,6 +29,7 @@
 import io.netty.channel.ChannelHandlerContext;
 import io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder.KafkaHeaderAndRequest;
 import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator;
+import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
@@ -132,11 +133,13 @@ protected void setup() throws Exception {
         ProtocolHandler handler =
pulsar.getProtocolHandlers().protocol("kafka"); GroupCoordinator groupCoordinator = ((KafkaProtocolHandler) handler).getGroupCoordinator(); + TransactionCoordinator transactionCoordinator = ((KafkaProtocolHandler) handler).getTransactionCoordinator(); kafkaRequestHandler = new KafkaRequestHandler( pulsar, (KafkaServiceConfiguration) conf, groupCoordinator, + transactionCoordinator, false, getPlainEndPoint()); ChannelHandlerContext mockCtx = mock(ChannelHandlerContext.class); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaIntegrationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaIntegrationTest.java index 5875a1156f..6ac5b4b5c7 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaIntegrationTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaIntegrationTest.java @@ -156,8 +156,8 @@ private static void checkForErrorsInLogs(final String logs) { @BeforeClass @Override protected void setup() throws Exception { - super.resetConfig(); + this.conf.setEnableTransactionCoordinator(true); // in order to access PulsarBroker when using Docker for Mac, we need to adjust things: // - set pulsar advertized address to host IP // - use the `host.testcontainers.internal` address exposed by testcontainers diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java index d348cdf9a1..6dd06aad8e 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java @@ -31,6 +31,7 @@ import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager; +import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator; import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -141,11 +142,13 @@ protected void setup() throws Exception { ProtocolHandler handler1 = pulsar.getProtocolHandlers().protocol("kafka"); GroupCoordinator groupCoordinator = ((KafkaProtocolHandler) handler1).getGroupCoordinator(); + TransactionCoordinator transactionCoordinator = ((KafkaProtocolHandler) handler1).getTransactionCoordinator(); handler = new KafkaRequestHandler( pulsar, (KafkaServiceConfiguration) conf, groupCoordinator, + transactionCoordinator, false, getPlainEndPoint()); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java index 46897c5c18..eaf37c3961 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java @@ -22,6 +22,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator; +import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Properties; @@ -60,11 +61,13 @@ protected void setup() throws Exception { ProtocolHandler 
handler = pulsar.getProtocolHandlers().protocol("kafka"); GroupCoordinator groupCoordinator = ((KafkaProtocolHandler) handler).getGroupCoordinator(); + TransactionCoordinator transactionCoordinator = ((KafkaProtocolHandler) handler).getTransactionCoordinator(); kafkaRequestHandler = new KafkaRequestHandler( pulsar, (KafkaServiceConfiguration) conf, groupCoordinator, + transactionCoordinator, false, getPlainEndPoint()); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java new file mode 100644 index 0000000000..54234fab09 --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java @@ -0,0 +1,201 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop; + +import com.google.common.collect.Sets; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * Transaction test. 
+ */
+@Slf4j
+public class TransactionTest extends KopProtocolHandlerTestBase {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        this.conf.setEnableTransactionCoordinator(true);
+        super.internalSetup();
+        log.info("internal setup finished");
+
+        if (!admin.clusters().getClusters().contains(configClusterName)) {
+            // so that clients can test short names
+            admin.clusters().createCluster(configClusterName,
+                    new ClusterData("http://127.0.0.1:" + brokerWebservicePort));
+        } else {
+            admin.clusters().updateCluster(configClusterName,
+                    new ClusterData("http://127.0.0.1:" + brokerWebservicePort));
+        }
+
+        if (!admin.tenants().getTenants().contains("public")) {
+            admin.tenants().createTenant("public",
+                    new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+        } else {
+            admin.tenants().updateTenant("public",
+                    new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+        }
+        if (!admin.namespaces().getNamespaces("public").contains("public/default")) {
+            admin.namespaces().createNamespace("public/default");
+            admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test"));
+            admin.namespaces().setRetention("public/default",
+                    new RetentionPolicies(60, 1000));
+        }
+        if (!admin.namespaces().getNamespaces("public").contains("public/__kafka")) {
+            admin.namespaces().createNamespace("public/__kafka");
+            admin.namespaces().setNamespaceReplicationClusters("public/__kafka", Sets.newHashSet("test"));
+            admin.namespaces().setRetention("public/__kafka",
+                    new RetentionPolicies(-1, -1));
+        }
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test(timeOut = 1000 * 20)
+    public void readCommittedTest() throws Exception {
+        basicProduceAndConsumeTest("read-committed-test", "txn-11", "read_committed");
+    }
+
+    @Test(timeOut = 1000 * 20)
+    public void readUncommittedTest() throws Exception {
+        basicProduceAndConsumeTest("read-uncommitted-test", "txn-12", "read_uncommitted");
+    }
+
+    public void basicProduceAndConsumeTest(String topicName,
+                                           String transactionalId,
+                                           String isolation) throws Exception {
+        String kafkaServer = "localhost:" + getKafkaBrokerPort();
+
+        Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000 * 10);
+        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+
+        @Cleanup
+        KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
+
+        producer.initTransactions();
+
+        int totalTxnCount = 10;
+        int messageCountPerTxn = 10;
+
+        String lastMessage = "";
+        for (int txnIndex = 0; txnIndex < totalTxnCount; txnIndex++) {
+            producer.beginTransaction();
+
+            String contentBase;
+            if (txnIndex % 2 != 0) {
+                contentBase = "commit msg txnIndex %s messageIndex %s";
+            } else {
+                contentBase = "abort msg txnIndex %s messageIndex %s";
+            }
+
+            for (int messageIndex = 0; messageIndex < messageCountPerTxn; messageIndex++) {
+                String msgContent = String.format(contentBase, txnIndex, messageIndex);
+                log.info("send txn message {}", msgContent);
+                lastMessage = msgContent;
+                producer.send(new ProducerRecord<>(topicName, messageIndex, msgContent)).get();
+            }
+
+            if (txnIndex % 2 != 0) {
+                producer.commitTransaction();
+            } else {
+                producer.abortTransaction();
+            }
+        }
+
+        consumeTxnMessage(topicName, totalTxnCount * messageCountPerTxn, lastMessage, isolation);
+    }
+
+    private void consumeTxnMessage(String topicName,
+                                   int totalMessageCount,
+                                   String lastMessage,
+                                   String isolation) throws InterruptedException {
+        String kafkaServer = "localhost:" + getKafkaBrokerPort();
+
+        Properties consumerProps = new Properties();
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProps.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000 * 10);
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-test");
+        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolation);
+
+        @Cleanup
+        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
+        consumer.subscribe(Collections.singleton(topicName));
+
+        log.info("the last message is: {}", lastMessage);
+        AtomicInteger receiveCount = new AtomicInteger(0);
+        while (true) {
+            ConsumerRecords<Integer, String> consumerRecords =
+                    consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
+
+            boolean readFinish = false;
+            for (ConsumerRecord<Integer, String> record : consumerRecords) {
+                log.info("received record: offset {}, key {}, value {}",
+                        record.offset(), record.key(), record.value());
+                receiveCount.incrementAndGet();
+                if (lastMessage.equalsIgnoreCase(record.value())) {
+                    log.info("received the last message");
+                    readFinish = true;
+                }
+            }
+
+            if (readFinish) {
+                log.info("read finished");
+                break;
+            }
+        }
+        log.info("finished receiving messages. isolation: {}, receive count: {}", isolation, receiveCount.get());
+
+        if (isolation.equals("read_committed")) {
+            Assert.assertEquals(receiveCount.get(), totalMessageCount / 2);
+        } else {
+            Assert.assertEquals(receiveCount.get(), totalMessageCount);
+        }
+        log.info("finished consuming messages. isolation: {}", isolation);
+    }
+
+}
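A final note on the test above: it only exercises cleanly committed or cleanly aborted transactions. The conventional client-side pattern (standard Kafka producer API; `producer` and `topicName` as in the test) also aborts on failure so a transaction is never left open:

producer.beginTransaction();
try {
    producer.send(new ProducerRecord<>(topicName, 0, "value")).get();
    producer.commitTransaction();
} catch (Exception e) {
    // marks everything sent in this transaction as aborted;
    // read_committed consumers will skip those records
    producer.abortTransaction();
}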