Skip to content

Commit

Permalink
feat: change JVM API for pubsub (#3549)
Browse files Browse the repository at this point in the history
The main changes are:

- No generated annotations, instead `ConsumeableTopic` classes will be
generated
- Subscriptions for local and remote topics look the same
- This results in a more consistent and user friendly API
  • Loading branch information
stuartwdouglas authored Nov 28, 2024
1 parent 0df2353 commit 0b09218
Show file tree
Hide file tree
Showing 26 changed files with 320 additions and 282 deletions.
5 changes: 3 additions & 2 deletions backend/controller/pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,18 @@ import (
"testing"
"time"

"github.com/alecthomas/assert/v2"

"github.com/TBD54566975/ftl/backend/controller/async"
in "github.com/TBD54566975/ftl/internal/integration"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/alecthomas/assert/v2"
)

func TestPubSub(t *testing.T) {
calls := 20
events := calls * 10
in.Run(t,
in.WithLanguages("java", "go"),
in.WithLanguages("java", "go", "kotlin"),
in.CopyModule("publisher"),
in.CopyModule("subscriber"),
in.Deploy("publisher"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,26 @@
import xyz.block.ftl.Export;
import xyz.block.ftl.FromOffset;
import xyz.block.ftl.Subscription;
import xyz.block.ftl.SubscriptionOptions;
import xyz.block.ftl.Topic;
import xyz.block.ftl.TopicDefinition;
import xyz.block.ftl.Verb;
import xyz.block.ftl.WriteableTopic;

public class Publisher {

@Export
@TopicDefinition("testTopic")
interface TestTopic extends Topic<PubSubEvent> {
@Topic("testTopic")
interface TestTopic extends WriteableTopic<PubSubEvent> {

}

@TopicDefinition("localTopic")
interface LocalTopic extends Topic<PubSubEvent> {
@Topic("localTopic")
interface LocalTopic extends WriteableTopic<PubSubEvent> {

}

@Export
@TopicDefinition("topic2")
interface Topic2 extends Topic<PubSubEvent> {
@Topic("topic2")
interface Topic2 extends WriteableTopic<PubSubEvent> {

}

Expand All @@ -51,8 +50,7 @@ void publishOneToTopic2(Topic2 topic2) throws Exception {
topic2.publish(new PubSubEvent().setTime(t));
}

@Subscription(topicClass = LocalTopic.class)
@SubscriptionOptions(from = FromOffset.LATEST)
@Subscription(topic = LocalTopic.class, from = FromOffset.LATEST)
public void local(TestTopic testTopic, PubSubEvent event) {
testTopic.publish(event);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,31 @@

import ftl.builtin.CatchRequest;
import ftl.publisher.PubSubEvent;
import ftl.publisher.TestTopicSubscription;
import ftl.publisher.TestTopicTopic;
import ftl.publisher.Topic2Topic;
import io.quarkus.logging.Log;
import xyz.block.ftl.FromOffset;
import xyz.block.ftl.Retry;
import xyz.block.ftl.Subscription;
import xyz.block.ftl.SubscriptionOptions;
import xyz.block.ftl.Verb;
import xyz.block.ftl.VerbName;

public class Subscriber {

private static final AtomicInteger catchCount = new AtomicInteger();

@TestTopicSubscription
@SubscriptionOptions(from = FromOffset.BEGINNING)
@Subscription(topic = TestTopicTopic.class, from = FromOffset.BEGINNING)
void consume(PubSubEvent event) throws Exception {
Log.infof("Subscriber is consuming %s", event.getTime());
}

@Subscription(topic = "topic2", module = "publisher")
@SubscriptionOptions(from = FromOffset.BEGINNING)
@Subscription(topic = Topic2Topic.class, from = FromOffset.BEGINNING)
@Retry(count = 2, minBackoff = "1s", maxBackoff = "1s", catchVerb = "catch")
public void consumeButFailAndRetry(PubSubEvent event) {
throw new RuntimeException("always error: event " + event.getTime());
}

@Subscription(topic = "topic2", module = "publisher")
@SubscriptionOptions(from = FromOffset.BEGINNING)
@Subscription(topic = Topic2Topic.class, from = FromOffset.BEGINNING)
@Retry(count = 1, minBackoff = "1s", maxBackoff = "1s", catchVerb = "catchAny")
public void consumeButFailAndCatchAny(PubSubEvent event) {
throw new RuntimeException("always error: event " + event.getTime());
Expand Down
2 changes: 2 additions & 0 deletions backend/controller/pubsub/testdata/kotlin/publisher/ftl.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
module = "publisher"
language = "java"
14 changes: 14 additions & 0 deletions backend/controller/pubsub/testdata/kotlin/publisher/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>xyz.block.ftl.examples</groupId>
<artifactId>publisher</artifactId>
<version>1.0-SNAPSHOT</version>

<parent>
<groupId>xyz.block.ftl</groupId>
<artifactId>ftl-build-parent-kotlin</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package xyz.block.ftl.java.test.publisher

import java.time.ZonedDateTime
class PubSubEvent(var time: ZonedDateTime) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package xyz.block.ftl.java.test.publisher

import io.quarkus.logging.Log
import xyz.block.ftl.*
import java.time.ZonedDateTime

class Publisher {
@Export
@Topic("testTopic")
interface TestTopic : WriteableTopic<PubSubEvent?>

@Topic("localTopic")
interface LocalTopic : WriteableTopic<PubSubEvent?>

@Export
@Topic("topic2")
interface Topic2 : WriteableTopic<PubSubEvent?>

@Verb
@Throws(Exception::class)
fun publishTen(testTopic: LocalTopic) {
for (i in 0..9) {
val t = ZonedDateTime.now()
Log.infof("Publishing %s", t)
testTopic.publish(PubSubEvent(t))
}
}

@Verb
@Throws(Exception::class)
fun publishOne(testTopic: TestTopic) {
val t = ZonedDateTime.now()
Log.infof("Publishing %s", t)
testTopic.publish(PubSubEvent(t))
}

@Verb
@Throws(Exception::class)
fun publishOneToTopic2(topic2: Topic2) {
val t = ZonedDateTime.now()
Log.infof("Publishing %s", t)
topic2.publish(PubSubEvent(t))
}

@Subscription(topic = LocalTopic::class, from = FromOffset.LATEST)
fun local(testTopic: TestTopic, event: PubSubEvent) {
testTopic.publish(event)
}
}
2 changes: 2 additions & 0 deletions backend/controller/pubsub/testdata/kotlin/subscriber/ftl.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
module = "subscriber"
language = "java"
14 changes: 14 additions & 0 deletions backend/controller/pubsub/testdata/kotlin/subscriber/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>xyz.block.ftl.examples</groupId>
<artifactId>subscriber</artifactId>
<version>1.0-SNAPSHOT</version>

<parent>
<groupId>xyz.block.ftl</groupId>
<artifactId>ftl-build-parent-kotlin</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package xyz.block.ftl.java.test.subscriber

import ftl.builtin.CatchRequest
import ftl.publisher.PubSubEvent
import ftl.publisher.TestTopicTopic
import ftl.publisher.Topic2Topic
import io.quarkus.logging.Log
import xyz.block.ftl.*
import java.util.concurrent.atomic.AtomicInteger

class Subscriber {
@Subscription(topic = TestTopicTopic::class, from = FromOffset.BEGINNING)
@Throws(
Exception::class
)
fun consume(event: PubSubEvent) {
Log.infof("Subscriber is consuming %s", event.time)
}

@Subscription(topic = Topic2Topic::class, from = FromOffset.BEGINNING)
@Retry(count = 2, minBackoff = "1s", maxBackoff = "1s", catchVerb = "catch")
fun consumeButFailAndRetry(event: PubSubEvent) {
throw RuntimeException("always error: event " + event.time)
}

@Subscription(topic = Topic2Topic::class, from = FromOffset.BEGINNING)
@Retry(count = 1, minBackoff = "1s", maxBackoff = "1s", catchVerb = "catchAny")
fun consumeButFailAndCatchAny(event: PubSubEvent) {
throw RuntimeException("always error: event " + event.time)
}

@Verb
@VerbName("catch")
fun catchVerb(req: CatchRequest<PubSubEvent?>) {
if (!req.error.contains("always error: event")) {
throw RuntimeException("unexpected error: " + req.error)
}
if (catchCount.incrementAndGet() == 1) {
throw RuntimeException("catching error")
}
}

@Verb
fun catchAny(req: CatchRequest<Any>) {
require("subscriber" == req.verb.module) { String.format("unexpected verb module: %s", req.verb.module) }
require("consumeButFailAndCatchAny" == req.verb.name) {
String.format(
"unexpected verb name: %s",
req.verb.name
)
}
require("publisher.PubSubEvent" == req.requestType) {
String.format(
"unexpected request type: %s",
req.requestType
)
}
require(req.request is Map<*, *>) {
String.format(
"expected request to be a Map: %s",
req.request.javaClass.name
)
}
val request = req.request as Map<*, *>
val time = request["time"]
requireNotNull(time) { "expected request to have a time key" }
require(time is String) { "expected request to have a time value of type string" }
}

companion object {
private val catchCount = AtomicInteger()
}
}
66 changes: 16 additions & 50 deletions docs/content/docs/reference/pubsub.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ First, declare a new topic :

```kotlin
@Export
@TopicDefinition("invoices")
internal interface InvoiceTopic : Topic<Invoice>
@Topic("invoices")
internal interface InvoiceTopic : WriteableTopic<Invoice>
```

Events can be published to a topic by injecting it into an `@Verb` method:
Expand All @@ -60,46 +60,31 @@ fun publishInvoice(request: InvoiceRequest, topic: InvoiceTopic) {
}
```

There are two ways to subscribe to a topic. The first is to declare a method with the `@Subscription` annotation, this is generally used when
subscribing to a topic inside the same module:
To subscribe to a topic use the `@Subscription` annotation, referencing the topic class and providing a method to consume the event:

```kotlin
@Subscription(topic = "invoices", name = "invoicesSubscription")
@SubscriptionOptions(from = FromOffset.LATEST)
@Subscription(topic = InvoiceTopic::class, from = FromOffset.LATEST)
fun consumeInvoice(event: Invoice) {
// ...
}
```

This is ok, but it requires the use of string constants for the topic name, which can be error-prone. If you are subscribing to a topic from
another module, FTL will generate a type-safe subscription meta annotation you can use to subscribe to the topic:
If you are subscribing to a topic from another module, FTL will generate a topic class for you so you can subscribe to it. This generated
topic cannot be published to, only subscribed to:

```kotlin
@Subscription(topic = "invoices", module = "publisher", name = "invoicesSubscription")
@SubscriptionOptions(from = FromOffset.LATEST)
annotation class InvoicesSubscription
@Topic(name="invoices", module="publisher")
internal interface InvoiceTopic : ConsumableTopic<Invoice>
```

This annotation can then be used to subscribe to the topic:

```kotlin
@InvoicesSubscription
@SubscriptionOptions(from = FromOffset.LATEST)
fun consumeInvoice(event: Invoice) {
// ...
}
```

Note that if you want multiple subscriptions or control over the subscription name you will need to use the `@Subscription` annotation.

<!-- java -->

First, declare a new topic:

```java
@Export
@TopicDefinition("invoices")
interface InvoiceTopic extends Topic<Invoice> {}
@Topic("invoices")
interface InvoiceTopic extends WriteableTopc<Invoice> {}
```

Events can be published to a topic by injecting it into an `@Verb` method:
Expand All @@ -111,42 +96,23 @@ void publishInvoice(InvoiceRequest request, InvoiceTopic topic) throws Exception
}
```

There are two ways to subscribe to a topic. The first is to declare a method with the `@Subscription` annotation, this is generally used when
subscribing to a topic inside the same module:
To subscribe to a topic use the `@Subscription` annotation, referencing the topic class and providing a method to consume the event:

```java
@Subscription(topicClass = InvoiceTopic.class, name = "invoicesSubscription")
@SubscriptionOptions(from = FromOffset.LATEST)
@Subscription(topic = InvoiceTopic.class, from = FromOffset.LATEST)
public void consumeInvoice(Invoice event) {
// ...
}
```

If you are subscribing to a topic from another module, FTL will generate a type-safe subscription meta annotation you can use to subscribe to the topic:
If you are subscribing to a topic from another module, FTL will generate a topic class for you so you can subscribe to it. This generated
topic cannot be published to, only subscribed to:

```java
@Retention(java.lang.annotation.RetentionPolicy.RUNTIME)
@Subscription(
topic = "invoices",
module = "publisher",
name = "invoicesSubscription"
)
public @interface InvoicesSubscription {
}
@Topic(name="invoices", module="publisher")
interface InvoiceTopic extends ConsumableTopic<Invoice> {}
```

This annotation can then be used to subscribe to the topic:

```java
@InvoicesSubscription
@SubscriptionOptions(from = FromOffset.LATEST)
public void consumeInvoice(Invoice event) {
// ...
}
```

Note that if you want multiple subscriptions or control over the subscription name you will need to use the `@Subscription` annotation.

{% end %}
> **NOTE!**
> PubSub topics cannot be published to from outside the module that declared them, they can only be subscribed to. That is, if a topic is declared in module `A`, module `B` cannot publish to it.
Loading

0 comments on commit 0b09218

Please sign in to comment.