Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
[Transaction] KoP transaction stage0 (#295)
Browse files Browse the repository at this point in the history
Task issue #39

# Motivation

KoP transaction stage0.

This PR initializes some classes for KoP transaction support.
It makes the basic publish-and-dispatch flow for transaction messages pass.
  • Loading branch information
gaoran10 authored Jan 20, 2021
1 parent a3cb3fb commit bb66ff9
Show file tree
Hide file tree
Showing 33 changed files with 2,075 additions and 142 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: kop mvn build check and ut
name: kop mvn build check and kafka-impl test

on:
pull_request:
Expand Down Expand Up @@ -33,11 +33,8 @@ jobs:
- name: Spotbugs check
run: mvn spotbugs:check

- name: KafkaIntegrationTest
run: mvn test '-Dtest=KafkaIntegration*Test' -pl tests

- name: test after build
run: mvn test -DfailIfNoTests=false '-Dtest=!KafkaIntegration*Test,!DistributedClusterTest'
- name: kafka-impl test after build
run: mvn test -DfailIfNoTests=false -pl kafka-impl

# DistributedClusterTest is hard to pass in the CI test environment, so we disable it for now.
# Tracking issue: https://github.com/streamnative/kop/issues/184
Expand Down
43 changes: 43 additions & 0 deletions .github/workflows/pr-integration-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Runs the KoP Kafka integration tests (KafkaIntegration*Test in the
# `tests` module) for pull requests and pushes to release branches.
name: kop integration tests

on:
  pull_request:
    branches:
      - master
  push:
    branches:
      - master
      - branch-*

jobs:
  build:

    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v1
      - name: Set up JDK 1.8
        uses: actions/setup-java@v1
        with:
          java-version: 1.8

      # Install all modules locally first so the `tests` module can resolve
      # its sibling-module dependencies.
      - name: Build with Maven skipTests
        run: mvn clean install -DskipTests

      - name: kop integration test
        run: mvn test '-Dtest=KafkaIntegration*Test' -pl tests

      # On failure, bundle every surefire report directory into a single
      # zip and publish it as a build artifact for inspection.
      - name: package surefire artifacts
        if: failure()
        run: |
          rm -rf artifacts
          mkdir artifacts
          find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
          zip -r artifacts.zip artifacts
      - uses: actions/upload-artifact@master
        name: upload surefire-artifacts
        if: failure()
        with:
          name: surefire-artifacts
          path: artifacts.zip
43 changes: 43 additions & 0 deletions .github/workflows/pr-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Runs the KoP `tests` module test suite, excluding the integration tests
# (run by a separate workflow) and DistributedClusterTest.
name: kop tests

on:
  pull_request:
    branches:
      - master
  push:
    branches:
      - master
      - branch-*

jobs:
  build:

    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v1
      - name: Set up JDK 1.8
        uses: actions/setup-java@v1
        with:
          java-version: 1.8

      # Install all modules locally first so the `tests` module can resolve
      # its sibling-module dependencies.
      - name: Build with Maven skipTests
        run: mvn clean install -DskipTests

      - name: tests module
        run: mvn test -DfailIfNoTests=false '-Dtest=!KafkaIntegration*Test,!DistributedClusterTest' -pl tests

      # On failure, bundle every surefire report directory into a single
      # zip and publish it as a build artifact for inspection.
      - name: package surefire artifacts
        if: failure()
        run: |
          rm -rf artifacts
          mkdir artifacts
          find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
          zip -r artifacts.zip artifacts
      - uses: actions/upload-artifact@master
        name: upload surefire-artifacts
        if: failure()
        with:
          name: surefire-artifacts
          path: artifacts.zip
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.ssl.SslHandler;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator;
import io.streamnative.pulsar.handlers.kop.utils.ssl.SSLUtils;
import lombok.Getter;
import org.apache.pulsar.broker.PulsarService;
Expand All @@ -40,6 +41,8 @@ public class KafkaChannelInitializer extends ChannelInitializer<SocketChannel> {
@Getter
private final GroupCoordinator groupCoordinator;
@Getter
private final TransactionCoordinator transactionCoordinator;
@Getter
private final boolean enableTls;
@Getter
private final EndPoint advertisedEndPoint;
Expand All @@ -49,12 +52,14 @@ public class KafkaChannelInitializer extends ChannelInitializer<SocketChannel> {
public KafkaChannelInitializer(PulsarService pulsarService,
KafkaServiceConfiguration kafkaConfig,
GroupCoordinator groupCoordinator,
TransactionCoordinator transactionCoordinator,
boolean enableTLS,
EndPoint advertisedEndPoint) {
super();
this.pulsarService = pulsarService;
this.kafkaConfig = kafkaConfig;
this.groupCoordinator = groupCoordinator;
this.transactionCoordinator = transactionCoordinator;
this.enableTls = enableTLS;
this.advertisedEndPoint = advertisedEndPoint;

Expand All @@ -74,7 +79,8 @@ protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("frameDecoder",
new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
ch.pipeline().addLast("handler",
new KafkaRequestHandler(pulsarService, kafkaConfig, groupCoordinator, enableTls, advertisedEndPoint));
new KafkaRequestHandler(pulsarService, kafkaConfig,
groupCoordinator, transactionCoordinator, enableTls, advertisedEndPoint));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,24 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
case CREATE_TOPICS:
handleCreateTopics(kafkaHeaderAndRequest, responseFuture);
break;
case INIT_PRODUCER_ID:
handleInitProducerId(kafkaHeaderAndRequest, responseFuture);
break;
case ADD_PARTITIONS_TO_TXN:
handleAddPartitionsToTxn(kafkaHeaderAndRequest, responseFuture);
break;
case ADD_OFFSETS_TO_TXN:
handleAddOffsetsToTxn(kafkaHeaderAndRequest, responseFuture);
break;
case TXN_OFFSET_COMMIT:
handleTxnOffsetCommit(kafkaHeaderAndRequest, responseFuture);
break;
case END_TXN:
handleEndTxn(kafkaHeaderAndRequest, responseFuture);
break;
case WRITE_TXN_MARKERS:
handleWriteTxnMarkers(kafkaHeaderAndRequest, responseFuture);
break;
case DESCRIBE_CONFIGS:
handleDescribeConfigs(kafkaHeaderAndRequest, responseFuture);
break;
Expand Down Expand Up @@ -372,6 +390,24 @@ protected void writeAndFlushWhenInactiveChannel(Channel channel) {
/**
 * Handles a {@code DESCRIBE_CONFIGS} request dispatched from
 * {@code channelRead}; the implementation completes {@code response}.
 */
protected abstract void
handleDescribeConfigs(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);

// The six handlers below are dispatched from the request switch in
// channelRead for the Kafka transaction APIs; each implementation must
// complete the supplied response future.

/**
 * Handles an {@code INIT_PRODUCER_ID} request.
 */
protected abstract void
handleInitProducerId(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);

/**
 * Handles an {@code ADD_PARTITIONS_TO_TXN} request.
 */
protected abstract void
handleAddPartitionsToTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);

/**
 * Handles an {@code ADD_OFFSETS_TO_TXN} request.
 */
protected abstract void
handleAddOffsetsToTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);

/**
 * Handles a {@code TXN_OFFSET_COMMIT} request.
 */
protected abstract void
handleTxnOffsetCommit(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);

/**
 * Handles an {@code END_TXN} request.
 */
protected abstract void
handleEndTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);

/**
 * Handles a {@code WRITE_TXN_MARKERS} request.
 */
protected abstract void
handleWriteTxnMarkers(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);

static class KafkaHeaderAndRequest implements Closeable {

private static final String DEFAULT_CLIENT_HOST = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupConfig;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator;
import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetConfig;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionConfig;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator;
import io.streamnative.pulsar.handlers.kop.utils.ConfigurationUtils;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.MetadataUtils;
Expand Down Expand Up @@ -189,6 +191,8 @@ public enum ListenerType {
@Getter
private GroupCoordinator groupCoordinator;
@Getter
private TransactionCoordinator transactionCoordinator;
@Getter
private String bindAddress;


Expand Down Expand Up @@ -252,6 +256,13 @@ public void start(BrokerService service) {
log.error("initGroupCoordinator failed with", e);
}
}
if (kafkaConfig.isEnableTransactionCoordinator()) {
try {
initTransactionCoordinator();
} catch (Exception e) {
log.error("Initialized transaction coordinator failed.", e);
}
}
}

// this is called after initialize, and with kafkaConfig, brokerService all set.
Expand Down Expand Up @@ -279,12 +290,12 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti
case PLAINTEXT:
case SASL_PLAINTEXT:
builder.put(endPoint.getInetAddress(), new KafkaChannelInitializer(brokerService.getPulsar(),
kafkaConfig, groupCoordinator, false, advertisedEndPoint));
kafkaConfig, groupCoordinator, transactionCoordinator, false, advertisedEndPoint));
break;
case SSL:
case SASL_SSL:
builder.put(endPoint.getInetAddress(), new KafkaChannelInitializer(brokerService.getPulsar(),
kafkaConfig, groupCoordinator, true, advertisedEndPoint));
kafkaConfig, groupCoordinator, transactionCoordinator, true, advertisedEndPoint));
break;
}
});
Expand Down Expand Up @@ -349,6 +360,11 @@ public void startGroupCoordinator() throws Exception {
}
}

/**
 * Creates the transaction coordinator from an all-defaults
 * {@link TransactionConfig} and stores it on this handler.
 *
 * @throws Exception if the coordinator cannot be created
 */
public void initTransactionCoordinator() throws Exception {
    // NOTE(review): the configuration uses builder defaults only — presumably
    // stage0 scope; confirm whether broker settings should be applied here.
    this.transactionCoordinator = TransactionCoordinator.of(TransactionConfig.builder().build());
}

/**
* This method discovers ownership of offset topic partitions and attempts to load offset topics
* assigned to this broker.
Expand Down
Loading

0 comments on commit bb66ff9

Please sign in to comment.