Skip to content

Commit

Permalink
Merge branch 'release/0.9.110'
Browse files Browse the repository at this point in the history
  • Loading branch information
jfallows committed Dec 13, 2024
2 parents 413f4c8 + 0942cc0 commit 2916401
Show file tree
Hide file tree
Showing 259 changed files with 2,679 additions and 741 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# Changelog

## [Unreleased](https://github.com/aklivity/zilla/tree/HEAD)

[Full Changelog](https://github.com/aklivity/zilla/compare/0.9.109...HEAD)

**Merged pull requests:**

- Support ZVIEW command [\#1329](https://github.com/aklivity/zilla/pull/1329) ([akrambek](https://github.com/akrambek))

## [0.9.109](https://github.com/aklivity/zilla/tree/0.9.109) (2024-12-13)

[Full Changelog](https://github.com/aklivity/zilla/compare/0.9.108...0.9.109)

## [0.9.108](https://github.com/aklivity/zilla/tree/0.9.108) (2024-12-12)

[Full Changelog](https://github.com/aklivity/zilla/compare/0.9.107...0.9.108)
Expand Down
2 changes: 1 addition & 1 deletion build/flyweight-maven-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>build</artifactId>
<version>0.9.109</version>
<version>0.9.110</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion build/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>zilla</artifactId>
<version>0.9.109</version>
<version>0.9.110</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
8 changes: 7 additions & 1 deletion cloud/docker-image/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>cloud</artifactId>
<version>0.9.109</version>
<version>0.9.110</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down Expand Up @@ -411,6 +411,9 @@
<platform>linux/amd64</platform>
<platform>linux/arm64</platform>
</platforms>
<attestations>
<provenance>false</provenance>
</attestations>
</buildx>
<assemblies>
<assembly>
Expand Down Expand Up @@ -462,6 +465,9 @@
<platforms>
<platform>linux/amd64</platform>
</platforms>
<attestations>
<provenance>false</provenance>
</attestations>
</buildx>
<assemblies>
<assembly>
Expand Down
2 changes: 1 addition & 1 deletion cloud/helm-chart/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>cloud</artifactId>
<version>0.9.109</version>
<version>0.9.110</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion cloud/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>zilla</artifactId>
<version>0.9.109</version>
<version>0.9.110</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion conf/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>zilla</artifactId>
<version>0.9.109</version>
<version>0.9.110</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion incubator/binding-amqp.spec/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>incubator</artifactId>
<version>0.9.109</version>
<version>0.9.110</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion incubator/binding-amqp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>incubator</artifactId>
<version>0.9.109</version>
<version>0.9.110</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion incubator/binding-pgsql-kafka.spec/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>incubator</artifactId>
<version>0.9.109</version>
<version>0.9.110</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ catalogs:
id: 1
schema: |-
{
"schema": "{\"fields\":[{\"name\":\"id\",\"type\":[\"null\",{\"type\":\"string\"}]},{\"name\":\"city\",\"type\":[\"null\",{\"type\":\"string\"}]}],\"name\":\"cities\",\"namespace\":\"dev\",\"type\":\"record\"}",
"schema": "{\"fields\":[{\"name\":\"id\",\"type\":[\"null\",{\"type\":\"string\"}]},{\"name\":\"city\",\"type\":[\"null\",{\"type\":\"string\"}]}],\"name\":\"cities\",\"namespace\":\"public\",\"type\":\"record\"}",
"schemaType": "AVRO"
}
bindings:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ write zilla:begin.ext ${kafka:beginEx()
.request()
.createTopics()
.topic()
.name("dev.cities")
.name("public.cities")
.partitionCount(1)
.replicas(1)
.config("cleanup.policy", "delete")
Expand All @@ -40,7 +40,7 @@ read zilla:begin.ext ${kafka:matchBeginEx()
.createTopics()
.throttle(0)
.topic()
.name("dev.cities")
.name("public.cities")
.error(0)
.build()
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ read zilla:begin.ext ${kafka:matchBeginEx()
.request()
.createTopics()
.topic()
.name("dev.cities")
.name("public.cities")
.partitionCount(1)
.replicas(1)
.config("cleanup.policy", "delete")
Expand All @@ -44,7 +44,7 @@ write zilla:begin.ext ${kafka:beginEx()
.createTopics()
.throttle(0)
.topic()
.name("dev.cities")
.name("public.cities")
.error(0)
.build()
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ write zilla:begin.ext ${kafka:beginEx()
.typeId(zilla:id("kafka"))
.request()
.deleteTopics()
.topic("dev.cities")
.topic("public.cities")
.timeout(30000)
.build()
.build()}
Expand All @@ -34,7 +34,7 @@ read zilla:begin.ext ${kafka:matchBeginEx()
.deleteTopics()
.throttle(0)
.topic()
.name("dev.cities")
.name("public.cities")
.error(0)
.build()
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ read zilla:begin.ext ${kafka:matchBeginEx()
.typeId(zilla:id("kafka"))
.request()
.deleteTopics()
.topic("dev.cities")
.topic("public.cities")
.timeout(30000)
.build()
.build()}
Expand All @@ -38,7 +38,7 @@ write zilla:begin.ext ${kafka:beginEx()
.deleteTopics()
.throttle(0)
.topic()
.name("dev.cities")
.name("public.cities")
.error(0)
.build()
.build()
Expand Down
2 changes: 1 addition & 1 deletion incubator/binding-pgsql-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>incubator</artifactId>
<version>0.9.109</version>
<version>0.9.110</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.util.List;
import java.util.stream.Collectors;

import io.aklivity.zilla.runtime.binding.pgsql.parser.model.Table;
import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateTable;

public class PgsqlKafkaKeyAvroSchemaTemplate extends PgsqlKafkaAvroSchemaTemplate
{
Expand All @@ -31,15 +31,15 @@ public PgsqlKafkaKeyAvroSchemaTemplate(

public String generate(
String database,
Table table)
CreateTable command)
{
final String newNamespace = namespace.replace(DATABASE_PLACEHOLDER, database);

List<AvroField> fields = table.columns().stream()
List<AvroField> fields = command.columns().stream()
.map(column -> new AvroField(column.name(), mapSqlTypeToAvroType(column.type())))
.collect(Collectors.toList());

AvroSchema schema = new AvroSchema("record", table.name(), newNamespace, fields);
AvroSchema schema = new AvroSchema("record", command.name(), newNamespace, fields);
AvroPayload payload = new AvroPayload("AVRO", jsonb.toJson(schema));

return jsonbFormatted.toJson(payload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import jakarta.json.JsonValue;

import io.aklivity.zilla.runtime.binding.pgsql.parser.model.Alter;
import io.aklivity.zilla.runtime.binding.pgsql.parser.model.Table;
import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateTable;

public class PgsqlKafkaValueAvroSchemaTemplate extends PgsqlKafkaAvroSchemaTemplate
{
Expand All @@ -39,12 +39,11 @@ public PgsqlKafkaValueAvroSchemaTemplate(
}

public String generate(
String database,
Table table)
CreateTable command)
{
final String newNamespace = namespace.replace(DATABASE_PLACEHOLDER, database);
final String newNamespace = namespace.replace(DATABASE_PLACEHOLDER, command.schema());

List<AvroField> fields = table.columns().stream()
List<AvroField> fields = command.columns().stream()
.map(column ->
{
String columnName = column.name();
Expand All @@ -59,7 +58,7 @@ public String generate(
})
.collect(Collectors.toList());

AvroSchema schema = new AvroSchema("record", table.name(), newNamespace, fields);
AvroSchema schema = new AvroSchema("record", command.name(), newNamespace, fields);
AvroPayload payload = new AvroPayload("AVRO", jsonb.toJson(schema));

return jsonbFormatted.toJson(payload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@
import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.types.stream.WindowFW;
import io.aklivity.zilla.runtime.binding.pgsql.parser.PgsqlParser;
import io.aklivity.zilla.runtime.binding.pgsql.parser.model.Alter;
import io.aklivity.zilla.runtime.binding.pgsql.parser.model.Table;
import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateTable;
import io.aklivity.zilla.runtime.binding.pgsql.parser.model.Drop;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.binding.BindingHandler;
import io.aklivity.zilla.runtime.engine.binding.function.MessageConsumer;
Expand Down Expand Up @@ -1335,11 +1336,12 @@ private void decodeCreateTopicCommand(
}
else if (server.commandsProcessed == 0)
{
final Table createTopic = parser.parseCreateTable(statement);
final CreateTable createTopic = parser.parseCreateTable(statement);
final String schema = createTopic.schema();
final String topic = createTopic.name();

topics.clear();
topics.add(String.format("%s.%s", server.database, topic));
topics.add(String.format("%s.%s", schema, topic));

final PgsqlKafkaBindingConfig binding = server.binding;

Expand All @@ -1348,7 +1350,7 @@ else if (server.commandsProcessed == 0)
if (!primaryKeys.isEmpty())
{
//TODO: assign versionId to avoid test failure
final String subjectKey = String.format("%s.%s-key", server.database, topic);
final String subjectKey = String.format("%s.%s-key", schema, topic);

String keySchema = primaryKeys.size() > 1
? binding.avroKeySchema.generate(server.database, createTopic)
Expand All @@ -1358,8 +1360,8 @@ else if (server.commandsProcessed == 0)

if (versionId != NO_VERSION_ID)
{
final String subjectValue = String.format("%s.%s-value", server.database, topic);
final String schemaValue = binding.avroValueSchema.generate(server.database, createTopic);
final String subjectValue = String.format("%s.%s-value", schema, topic);
final String schemaValue = binding.avroValueSchema.generate(createTopic);
versionId = binding.catalog.register(subjectValue, schemaValue);
}

Expand All @@ -1386,15 +1388,16 @@ private void decodeAlterTopicCommand(
String statement)
{
final Alter alter = parser.parseAlterTable(statement);
final String schema = alter.schema();
final String topic = alter.name();

topics.clear();
topics.add(String.format("%s.%s", server.database, topic));
topics.add(String.format("%s.%s", schema, topic));

final PgsqlKafkaBindingConfig binding = server.binding;
final CatalogHandler catalog = binding.catalog;

final String subjectValue = String.format("%s.%s-value", server.database, topic);
final String subjectValue = String.format("%s.%s-value", schema, topic);
final int schemaId = catalog.resolve(subjectValue, "latest");
final String existingSchemaJson = catalog.resolve(schemaId);
final String schemaValue = binding.avroValueSchema.generate(existingSchemaJson, alter);
Expand Down Expand Up @@ -1428,20 +1431,21 @@ private void decodeDropTopicCommand(
}
else if (server.commandsProcessed == 0)
{
List<String> drops = parser.parseDrop(statement);
List<Drop> drops = parser.parseDrop(statement);
drops.stream()
.findFirst()
.ifPresent(d ->
{
final PgsqlKafkaBindingConfig binding = server.binding;
final String subjectKey = String.format("%s.%s-key", server.database, d);
final String subjectValue = String.format("%s.%s-value", server.database, d);
final String subjectKey = String.format("%s.%s-key", d.schema(), d.name());
final String subjectValue = String.format("%s.%s-value", d.schema(), d.name());

binding.catalog.unregister(subjectKey);
binding.catalog.unregister(subjectValue);

final KafkaDeleteTopicsProxy deleteTopicsProxy = server.deleteTopicsProxy;
deleteTopicsProxy.doKafkaBegin(traceId, authorization, List.of("%s.%s".formatted(server.database, d)));
deleteTopicsProxy.doKafkaBegin(
traceId, authorization, List.of("%s.%s".formatted(d.schema(), d.name())));
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion incubator/binding-pgsql.spec/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>incubator</artifactId>
<version>0.9.109</version>
<version>0.9.110</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ read zilla:data.ext ${pgsql:dataEx()
.build()
.build()}
read [0x00 0x03] # Field Count
[0x00 0x00 0x00 0x04] # Length
[0x31 0x39 0x36 0x34] # Data
[0x00 0x00 0x00 0x0a] # Length
[0x31 0x37 0x32 0x33 0x35 0x39 0x33 0x31 0x31 0x33] # Data
[0x00 0x00 0x00 0x04] # Length
[0x5c 0x78 0x33 0x31] # Data
[0x00 0x00 0x00 0x04] # Length
[0x31 0x39 0x36 0x34] # Data
[0x00 0x00 0x00 0x0a] # Length
[0x31 0x37 0x32 0x33 0x35 0x39 0x33 0x31 0x31 0x33] # Data
[0x00 0x00 0x00 0x04] # Length
[0x5c 0x78 0x33 0x31] # Data


read advised zilla:flush ${pgsql:flushEx()
Expand Down
Loading

0 comments on commit 2916401

Please sign in to comment.