Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][doc] add golang transaction demo in txn-use.md #948

Merged
merged 1 commit into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 130 additions & 1 deletion docs/txn-use.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ To use Pulsar transaction API, complete the following steps.
Transaction coordinator metadata setup success
```

3. Create a Pulsar client and enable transactions.
3. Create a Pulsar client and enable transactions. Since client need to know transaction coordinator from system topic, please make sure your client role has system namespace `pulsar/system` produce/consume permissions.

4. Create producers and consumers.

Expand All @@ -80,6 +80,13 @@ To use Pulsar transaction API, complete the following steps.

**Input**

````mdx-code-block
<Tabs groupId="api-choice"
defaultValue="Java"
values={[{"label":"Java","value":"Java"},{"label":"Go","value":"Go"}]}>

<TabItem value="Java">

```java
PulsarClient client = PulsarClient.builder()
// Step 3: create a Pulsar client and enable transactions.
Expand Down Expand Up @@ -162,6 +169,128 @@ To use Pulsar transaction API, complete the following steps.
}
```

</TabItem>
<TabItem value="Go">

```go
// Step 3: create a Pulsar client and enable transactions.
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "<serviceUrl>",
EnableTransaction: true,
})
if err != nil {
log.Fatalf("create client fail, err = %v", err)
}
defer client.Close()
// Step 4: create three producers to produce messages to input and output topics.
inputTopic := "inputTopic"
outputTopicOne := "outputTopicOne"
outputTopicTwo := "outputTopicTwo"
subscriptionName := "your-subscription-name"
inputProducer, _ := client.CreateProducer(pulsar.ProducerOptions{
Topic: inputTopic,
SendTimeout: 0,
})
defer inputProducer.Close()
outputProducerOne, _ := client.CreateProducer(pulsar.ProducerOptions{
Topic: outputTopicOne,
SendTimeout: 0,
})
defer outputProducerOne.Close()
outputProducerTwo, _ := client.CreateProducer(pulsar.ProducerOptions{
Topic: outputTopicTwo,
SendTimeout: 0,
})
defer outputProducerTwo.Close()

// Step 4: create three consumers to consume messages from input and output topics.
inputConsumer, _ := client.Subscribe(pulsar.ConsumerOptions{
Topic: inputTopic,
SubscriptionName: subscriptionName,
})
defer inputConsumer.Close()
outputConsumerOne, _ := client.Subscribe(pulsar.ConsumerOptions{
Topic: outputTopicOne,
SubscriptionName: subscriptionName,
})
defer outputConsumerOne.Close()
outputConsumerTwo, _ := client.Subscribe(pulsar.ConsumerOptions{
Topic: outputTopicTwo,
SubscriptionName: subscriptionName,
})
defer outputConsumerTwo.Close()

// Step 5: produce messages to input topics.
ctx := context.Background()
count := 2
for i := 0; i < count; i++ {
inputProducer.Send(ctx, &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d", i)),
})
}
// Step 5: consume messages and produce them to output topics with transactions.
for i := 0; i < count; i++ {
// Step 5: the consumer successfully receives messages.
message, err := inputConsumer.Receive(ctx)
if err != nil {
log.Printf("receive message from %s fail, err = %v", inputTopic, err)
continue
}
// Step 6: create transactions.
// The transaction timeout is specified as 10 seconds.
// If the transaction is not committed within 10 seconds, the transaction is automatically aborted.
txn, err := client.NewTransaction(10 * time.Second)
if err != nil {
log.Printf("create txn fail, err = %v", err)
continue
}
// Step 6: you can process the received message with your use case and business logic.
// processMessage(message)
// Step 7: the producers produce messages to output topics with transactions
_, err = outputProducerOne.Send(context.Background(), &pulsar.ProducerMessage{
Transaction: txn,
Payload: []byte(fmt.Sprintf("Hello Pulsar! outputTopicOne count : %d", i)),
})
if err != nil {
log.Printf("send to producerOne fail %v", err)
txn.Abort(ctx)
}
_, err = outputProducerTwo.Send(context.Background(), &pulsar.ProducerMessage{
Transaction: txn,
Payload: []byte(fmt.Sprintf("Hello Pulsar! outputTopicTwo count : %d", i)),
})
if err != nil {
log.Printf("send to producerTwo fail %v", err)
txn.Abort(ctx)
}
// Step 7: the consumers acknowledge the input message with the transactions *individually*.
err = inputConsumer.AckWithTxn(message, txn)
if err != nil {
log.Printf("ack message fail %v", err)
txn.Abort(ctx)
}
// Step 8: commit transactions.
err = txn.Commit(ctx)
if err != nil {
log.Printf("commit txn fail %v", err)
}
}

// Final result: consume messages from output topics and print them.
for i := 0; i < count; i++ {
message, _ := outputConsumerOne.Receive(ctx)
log.Printf("Receive transaction message: %s", string(message.Payload()))
}
for i := 0; i < count; i++ {
message, _ := outputConsumerTwo.Receive(ctx)
log.Printf("Receive transaction message: %s", string(message.Payload()))
}
```

</TabItem>
</Tabs>
````

**Output**

```java
Expand Down
131 changes: 130 additions & 1 deletion versioned_docs/version-3.0.x/txn-use.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ Currently, [Pulsar transaction API](/api/admin/) is available in **Pulsar 2.8.0
Transaction coordinator metadata setup success
```

3. Create a Pulsar client and enable transactions.
3. Create a Pulsar client and enable transactions. Since client need to know transaction coordinator from system topic, please make sure your client role has system namespace `pulsar/system` produce/consume permissions.

4. Create producers and consumers.

Expand All @@ -75,6 +75,13 @@ Currently, [Pulsar transaction API](/api/admin/) is available in **Pulsar 2.8.0

**Input**

````mdx-code-block
<Tabs groupId="api-choice"
defaultValue="Java"
values={[{"label":"Java","value":"Java"},{"label":"Go","value":"Go"}]}>

<TabItem value="Java">

```java
PulsarClient client = PulsarClient.builder()
// Step 3: create a Pulsar client and enable transactions.
Expand Down Expand Up @@ -157,6 +164,128 @@ Currently, [Pulsar transaction API](/api/admin/) is available in **Pulsar 2.8.0
}
```

</TabItem>
<TabItem value="Go">

```go
// Step 3: create a Pulsar client and enable transactions.
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "<serviceUrl>",
EnableTransaction: true,
})
if err != nil {
log.Fatalf("create client fail, err = %v", err)
}
defer client.Close()
// Step 4: create three producers to produce messages to input and output topics.
inputTopic := "inputTopic"
outputTopicOne := "outputTopicOne"
outputTopicTwo := "outputTopicTwo"
subscriptionName := "your-subscription-name"
inputProducer, _ := client.CreateProducer(pulsar.ProducerOptions{
Topic: inputTopic,
SendTimeout: 0,
})
defer inputProducer.Close()
outputProducerOne, _ := client.CreateProducer(pulsar.ProducerOptions{
Topic: outputTopicOne,
SendTimeout: 0,
})
defer outputProducerOne.Close()
outputProducerTwo, _ := client.CreateProducer(pulsar.ProducerOptions{
Topic: outputTopicTwo,
SendTimeout: 0,
})
defer outputProducerTwo.Close()

// Step 4: create three consumers to consume messages from input and output topics.
inputConsumer, _ := client.Subscribe(pulsar.ConsumerOptions{
Topic: inputTopic,
SubscriptionName: subscriptionName,
})
defer inputConsumer.Close()
outputConsumerOne, _ := client.Subscribe(pulsar.ConsumerOptions{
Topic: outputTopicOne,
SubscriptionName: subscriptionName,
})
defer outputConsumerOne.Close()
outputConsumerTwo, _ := client.Subscribe(pulsar.ConsumerOptions{
Topic: outputTopicTwo,
SubscriptionName: subscriptionName,
})
defer outputConsumerTwo.Close()

// Step 5: produce messages to input topics.
ctx := context.Background()
count := 2
for i := 0; i < count; i++ {
inputProducer.Send(ctx, &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d", i)),
})
}
// Step 5: consume messages and produce them to output topics with transactions.
for i := 0; i < count; i++ {
// Step 5: the consumer successfully receives messages.
message, err := inputConsumer.Receive(ctx)
if err != nil {
log.Printf("receive message from %s fail, err = %v", inputTopic, err)
continue
}
// Step 6: create transactions.
// The transaction timeout is specified as 10 seconds.
// If the transaction is not committed within 10 seconds, the transaction is automatically aborted.
txn, err := client.NewTransaction(10 * time.Second)
if err != nil {
log.Printf("create txn fail, err = %v", err)
continue
}
// Step 6: you can process the received message with your use case and business logic.
// processMessage(message)
// Step 7: the producers produce messages to output topics with transactions
_, err = outputProducerOne.Send(context.Background(), &pulsar.ProducerMessage{
Transaction: txn,
Payload: []byte(fmt.Sprintf("Hello Pulsar! outputTopicOne count : %d", i)),
})
if err != nil {
log.Printf("send to producerOne fail %v", err)
txn.Abort(ctx)
}
_, err = outputProducerTwo.Send(context.Background(), &pulsar.ProducerMessage{
Transaction: txn,
Payload: []byte(fmt.Sprintf("Hello Pulsar! outputTopicTwo count : %d", i)),
})
if err != nil {
log.Printf("send to producerTwo fail %v", err)
txn.Abort(ctx)
}
// Step 7: the consumers acknowledge the input message with the transactions *individually*.
err = inputConsumer.AckWithTxn(message, txn)
if err != nil {
log.Printf("ack message fail %v", err)
txn.Abort(ctx)
}
// Step 8: commit transactions.
err = txn.Commit(ctx)
if err != nil {
log.Printf("commit txn fail %v", err)
}
}

// Final result: consume messages from output topics and print them.
for i := 0; i < count; i++ {
message, _ := outputConsumerOne.Receive(ctx)
log.Printf("Receive transaction message: %s", string(message.Payload()))
}
for i := 0; i < count; i++ {
message, _ := outputConsumerTwo.Receive(ctx)
log.Printf("Receive transaction message: %s", string(message.Payload()))
}
```

</TabItem>
</Tabs>
````

**Output**

```java
Expand Down
Loading