Skip to content

Commit

Permalink
add golang transaction demo
Browse files Browse the repository at this point in the history
  • Loading branch information
ninjazhou committed Jul 31, 2024
1 parent e41fdc0 commit 52adf5a
Show file tree
Hide file tree
Showing 5 changed files with 650 additions and 5 deletions.
131 changes: 130 additions & 1 deletion docs/txn-use.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ To use Pulsar transaction API, complete the following steps.
Transaction coordinator metadata setup success
```

3. Create a Pulsar client and enable transactions.
3. Create a Pulsar client and enable transactions. Since client need to know transaction coordinator from system topic, please make sure your client role has system namespace `pulsar/system` produce/consume permissions.

4. Create producers and consumers.

Expand All @@ -80,6 +80,13 @@ To use Pulsar transaction API, complete the following steps.

**Input**

````mdx-code-block
<Tabs groupId="api-choice"
defaultValue="Java"
values={[{"label":"Java","value":"Java"},{"label":"Go","value":"Go"}]}>

<TabItem value="Java">

```java
PulsarClient client = PulsarClient.builder()
// Step 3: create a Pulsar client and enable transactions.
Expand Down Expand Up @@ -162,6 +169,128 @@ To use Pulsar transaction API, complete the following steps.
}
```
</TabItem>
<TabItem value="Go">
```go
// Step 3: create a Pulsar client and enable transactions.
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "<serviceUrl>",
EnableTransaction: true,
})
if err != nil {
log.Fatalf("create client fail, err = %v", err)
}
defer client.Close()
// Step 4: create three producers to produce messages to input and output topics.
inputTopic := "inputTopic"
outputTopicOne := "outputTopicOne"
outputTopicTwo := "outputTopicTwo"
subscriptionName := "your-subscription-name"
inputProducer, _ := client.CreateProducer(pulsar.ProducerOptions{
Topic: inputTopic,
SendTimeout: 0,
})
defer inputProducer.Close()
outputProducerOne, _ := client.CreateProducer(pulsar.ProducerOptions{
Topic: outputTopicOne,
SendTimeout: 0,
})
defer outputProducerOne.Close()
outputProducerTwo, _ := client.CreateProducer(pulsar.ProducerOptions{
Topic: outputTopicTwo,
SendTimeout: 0,
})
defer outputProducerTwo.Close()
// Step 4: create three consumers to consume messages from input and output topics.
inputConsumer, _ := client.Subscribe(pulsar.ConsumerOptions{
Topic: inputTopic,
SubscriptionName: subscriptionName,
})
defer inputConsumer.Close()
outputConsumerOne, _ := client.Subscribe(pulsar.ConsumerOptions{
Topic: outputTopicOne,
SubscriptionName: subscriptionName,
})
defer outputConsumerOne.Close()
outputConsumerTwo, _ := client.Subscribe(pulsar.ConsumerOptions{
Topic: outputTopicTwo,
SubscriptionName: subscriptionName,
})
defer outputConsumerTwo.Close()
// Step 5: produce messages to input topics.
ctx := context.Background()
count := 2
for i := 0; i < count; i++ {
inputProducer.Send(ctx, &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d", i)),
})
}
// Step 5: consume messages and produce them to output topics with transactions.
for i := 0; i < count; i++ {
// Step 5: the consumer successfully receives messages.
message, err := inputConsumer.Receive(ctx)
if err != nil {
log.Printf("receive message from %s fail, err = %v", inputTopic, err)
continue
}
// Step 6: create transactions.
// The transaction timeout is specified as 10 seconds.
// If the transaction is not committed within 10 seconds, the transaction is automatically aborted.
txn, err := client.NewTransaction(10 * time.Second)
if err != nil {
log.Printf("create txn fail, err = %v", err)
continue
}
// Step 6: you can process the received message with your use case and business logic.
// processMessage(message)
// Step 7: the producers produce messages to output topics with transactions
_, err = outputProducerOne.Send(context.Background(), &pulsar.ProducerMessage{
Transaction: txn,
Payload: []byte(fmt.Sprintf("Hello Pulsar! outputTopicOne count : %d", i)),
})
if err != nil {
log.Printf("send to producerOne fail %v", err)
txn.Abort(ctx)
}
_, err = outputProducerTwo.Send(context.Background(), &pulsar.ProducerMessage{
Transaction: txn,
Payload: []byte(fmt.Sprintf("Hello Pulsar! outputTopicTwo count : %d", i)),
})
if err != nil {
log.Printf("send to producerTwo fail %v", err)
txn.Abort(ctx)
}
// Step 7: the consumers acknowledge the input message with the transactions *individually*.
err = inputConsumer.AckWithTxn(message, txn)
if err != nil {
log.Printf("ack message fail %v", err)
txn.Abort(ctx)
}
// Step 8: commit transactions.
err = txn.Commit(ctx)
if err != nil {
log.Printf("commit txn fail %v", err)
}
}
// Final result: consume messages from output topics and print them.
for i := 0; i < count; i++ {
message, _ := outputConsumerOne.Receive(ctx)
log.Printf("Receive transaction message: %s", string(message.Payload()))
}
for i := 0; i < count; i++ {
message, _ := outputConsumerTwo.Receive(ctx)
log.Printf("Receive transaction message: %s", string(message.Payload()))
}
```
</TabItem>
</Tabs>
````
**Output**
```java
Expand Down
131 changes: 130 additions & 1 deletion versioned_docs/version-3.0.x/txn-use.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ Currently, [Pulsar transaction API](/api/admin/) is available in **Pulsar 2.8.0
Transaction coordinator metadata setup success
```

3. Create a Pulsar client and enable transactions.
3. Create a Pulsar client and enable transactions. Since client need to know transaction coordinator from system topic, please make sure your client role has system namespace `pulsar/system` produce/consume permissions.

4. Create producers and consumers.

Expand All @@ -75,6 +75,13 @@ Currently, [Pulsar transaction API](/api/admin/) is available in **Pulsar 2.8.0

**Input**

````mdx-code-block
<Tabs groupId="api-choice"
defaultValue="Java"
values={[{"label":"Java","value":"Java"},{"label":"Go","value":"Go"}]}>

<TabItem value="Java">

```java
PulsarClient client = PulsarClient.builder()
// Step 3: create a Pulsar client and enable transactions.
Expand Down Expand Up @@ -157,6 +164,128 @@ Currently, [Pulsar transaction API](/api/admin/) is available in **Pulsar 2.8.0
}
```
</TabItem>
<TabItem value="Go">
```go
// Step 3: create a Pulsar client and enable transactions.
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "<serviceUrl>",
EnableTransaction: true,
})
if err != nil {
log.Fatalf("create client fail, err = %v", err)
}
defer client.Close()
// Step 4: create three producers to produce messages to input and output topics.
inputTopic := "inputTopic"
outputTopicOne := "outputTopicOne"
outputTopicTwo := "outputTopicTwo"
subscriptionName := "your-subscription-name"
inputProducer, _ := client.CreateProducer(pulsar.ProducerOptions{
Topic: inputTopic,
SendTimeout: 0,
})
defer inputProducer.Close()
outputProducerOne, _ := client.CreateProducer(pulsar.ProducerOptions{
Topic: outputTopicOne,
SendTimeout: 0,
})
defer outputProducerOne.Close()
outputProducerTwo, _ := client.CreateProducer(pulsar.ProducerOptions{
Topic: outputTopicTwo,
SendTimeout: 0,
})
defer outputProducerTwo.Close()
// Step 4: create three consumers to consume messages from input and output topics.
inputConsumer, _ := client.Subscribe(pulsar.ConsumerOptions{
Topic: inputTopic,
SubscriptionName: subscriptionName,
})
defer inputConsumer.Close()
outputConsumerOne, _ := client.Subscribe(pulsar.ConsumerOptions{
Topic: outputTopicOne,
SubscriptionName: subscriptionName,
})
defer outputConsumerOne.Close()
outputConsumerTwo, _ := client.Subscribe(pulsar.ConsumerOptions{
Topic: outputTopicTwo,
SubscriptionName: subscriptionName,
})
defer outputConsumerTwo.Close()
// Step 5: produce messages to input topics.
ctx := context.Background()
count := 2
for i := 0; i < count; i++ {
inputProducer.Send(ctx, &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d", i)),
})
}
// Step 5: consume messages and produce them to output topics with transactions.
for i := 0; i < count; i++ {
// Step 5: the consumer successfully receives messages.
message, err := inputConsumer.Receive(ctx)
if err != nil {
log.Printf("receive message from %s fail, err = %v", inputTopic, err)
continue
}
// Step 6: create transactions.
// The transaction timeout is specified as 10 seconds.
// If the transaction is not committed within 10 seconds, the transaction is automatically aborted.
txn, err := client.NewTransaction(10 * time.Second)
if err != nil {
log.Printf("create txn fail, err = %v", err)
continue
}
// Step 6: you can process the received message with your use case and business logic.
// processMessage(message)
// Step 7: the producers produce messages to output topics with transactions
_, err = outputProducerOne.Send(context.Background(), &pulsar.ProducerMessage{
Transaction: txn,
Payload: []byte(fmt.Sprintf("Hello Pulsar! outputTopicOne count : %d", i)),
})
if err != nil {
log.Printf("send to producerOne fail %v", err)
txn.Abort(ctx)
}
_, err = outputProducerTwo.Send(context.Background(), &pulsar.ProducerMessage{
Transaction: txn,
Payload: []byte(fmt.Sprintf("Hello Pulsar! outputTopicTwo count : %d", i)),
})
if err != nil {
log.Printf("send to producerTwo fail %v", err)
txn.Abort(ctx)
}
// Step 7: the consumers acknowledge the input message with the transactions *individually*.
err = inputConsumer.AckWithTxn(message, txn)
if err != nil {
log.Printf("ack message fail %v", err)
txn.Abort(ctx)
}
// Step 8: commit transactions.
err = txn.Commit(ctx)
if err != nil {
log.Printf("commit txn fail %v", err)
}
}
// Final result: consume messages from output topics and print them.
for i := 0; i < count; i++ {
message, _ := outputConsumerOne.Receive(ctx)
log.Printf("Receive transaction message: %s", string(message.Payload()))
}
for i := 0; i < count; i++ {
message, _ := outputConsumerTwo.Receive(ctx)
log.Printf("Receive transaction message: %s", string(message.Payload()))
}
```
</TabItem>
</Tabs>
````
**Output**
```java
Expand Down
Loading

0 comments on commit 52adf5a

Please sign in to comment.