Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ztable support and convert transformation logic to state machine #1352

Merged
merged 84 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
84 commits
Select commit Hold shift + click to select a range
7f4b2da
Update antlr grammar
akrambek Nov 12, 2024
84eacb3
Refactor scripts for zstream
akrambek Nov 14, 2024
a5f3121
Remove redundant files
akrambek Nov 14, 2024
cfe8ade
Remove redundant files
akrambek Nov 14, 2024
519279b
Fix typo
akrambek Nov 14, 2024
b60ee1e
Replace weather with commands
akrambek Nov 14, 2024
912bd26
Remove redundant exclude
akrambek Nov 14, 2024
1085600
WIP
akrambek Nov 25, 2024
7bc5c5d
Merge branch 'develop' into feature/z-catalog
akrambek Nov 25, 2024
4039029
Revert back the change
akrambek Nov 25, 2024
8a5ace9
Checkpoint zview
akrambek Nov 26, 2024
92930f8
Stable state
akrambek Nov 26, 2024
463d426
WIP
akrambek Nov 26, 2024
bed9c1c
WIP
akrambek Nov 26, 2024
586ea67
WIP
akrambek Nov 26, 2024
ac7758d
WIP
akrambek Nov 26, 2024
8372f5c
WIP
akrambek Nov 26, 2024
3dc8c3a
WIP
akrambek Nov 26, 2024
478be8f
WIP
akrambek Nov 27, 2024
d1b247c
WIP
akrambek Nov 27, 2024
546473c
Checkpoint
akrambek Nov 27, 2024
99b7216
Remove unnecessary files
akrambek Nov 27, 2024
f6435a3
WIP
akrambek Nov 27, 2024
4bbc4aa
WIP
akrambek Nov 28, 2024
8971b87
WIP
akrambek Nov 28, 2024
9dee812
WIP
akrambek Nov 28, 2024
cc6c80e
WIP
akrambek Nov 28, 2024
212872d
Change user
akrambek Nov 28, 2024
21e6963
Checkpoint
akrambek Nov 28, 2024
e4377a0
Update the script
akrambek Nov 28, 2024
b052c58
WIP
akrambek Nov 29, 2024
5401488
WIP
akrambek Nov 29, 2024
7fc64be
WIP
akrambek Nov 30, 2024
5a286cc
WIP
akrambek Dec 1, 2024
38f7371
WIP
akrambek Dec 1, 2024
d1270a1
WIP
akrambek Dec 1, 2024
eb0bdbb
WIP
akrambek Dec 2, 2024
1fea51d
WIP
akrambek Dec 2, 2024
e8f5e7e
Checkpoint
akrambek Dec 2, 2024
48cfbf7
Merge branch 'feature/z-catalog' into feature/ztable
akrambek Dec 2, 2024
13fcdb6
WIP
akrambek Dec 2, 2024
7d8c68d
WIP
akrambek Dec 2, 2024
8e199ba
WIP
akrambek Dec 3, 2024
1042c68
WIP
akrambek Dec 4, 2024
2e53b82
WIP
akrambek Dec 4, 2024
7614312
Checkpoint
akrambek Dec 4, 2024
b16a5e8
Checkpoint
akrambek Dec 5, 2024
97314ac
WIP
akrambek Dec 5, 2024
7c5db96
Fix casting
akrambek Dec 5, 2024
ef9077a
WIP
akrambek Dec 6, 2024
d47d74f
Ignore case sensitivity
akrambek Dec 6, 2024
a45c87c
WIP
akrambek Dec 9, 2024
8a9305b
WIP
akrambek Dec 9, 2024
cd63668
Fix remaining issues
akrambek Dec 9, 2024
93126b7
WIP
akrambek Dec 10, 2024
17fdc27
Fix stylecheck errors
akrambek Dec 10, 2024
0bbab15
Merge branch 'feature/z-catalog' into feature/ztable
akrambek Dec 10, 2024
9f606b4
WIP
akrambek Dec 11, 2024
34f93bf
WIP
akrambek Dec 13, 2024
2a4815f
Merge branch 'develop' into feature/ztable
akrambek Dec 13, 2024
49a7546
WIP
akrambek Dec 14, 2024
e32253c
WIP
akrambek Dec 14, 2024
da9a880
WIP all macros
akrambek Dec 16, 2024
e8324c7
WIP remove statements
akrambek Dec 16, 2024
e5532e5
WIP
akrambek Dec 16, 2024
5546db9
WIP
akrambek Dec 16, 2024
c4ad8f5
WIP
akrambek Dec 16, 2024
57fb1d5
WIP
akrambek Dec 16, 2024
e87bc76
WIP
akrambek Dec 16, 2024
2cff032
WIP
akrambek Dec 17, 2024
6f9409b
WIP
akrambek Dec 17, 2024
9928d02
WIP
akrambek Dec 17, 2024
bd9e9c1
Checkpoint
akrambek Dec 17, 2024
e923e86
Merge branch 'develop' into feature/ztable
akrambek Dec 17, 2024
f26c2cc
WIP
akrambek Dec 17, 2024
fa2197a
WIP
akrambek Dec 17, 2024
11f3e09
WIP
akrambek Dec 17, 2024
661a4bf
Catch exception
akrambek Dec 17, 2024
bf4ee6c
Fix tests
akrambek Dec 17, 2024
56e4d34
Fix checkstyle
akrambek Dec 17, 2024
ec564d1
WIP
akrambek Dec 18, 2024
5da2130
WIP
akrambek Dec 18, 2024
469afdd
Terminate transition on error
akrambek Dec 18, 2024
b63aef5
Terminate state on error
akrambek Dec 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,6 @@ node_modules/
*.pdf
*.zshrc
hs_err_pid*.log
*.interp
*.tokens
gen/
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#
# Copyright 2021-2024 Aklivity Inc
#
# Licensed under the Aklivity Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# https://www.aklivity.io/aklivity-community-license/
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#

---
# Test configuration: a pgsql-kafka proxy binding backed by a test catalog
# that pre-registers the AVRO schema for the "cities" topic.
name: test
catalogs:
  catalog0:
    type: test
    options:
      url: http://localhost:8081
      id: 1
      # AVRO record schema for public.cities; the owner_id and created_at
      # fields carry zillaType markers (ZILLA_IDENTITY / ZILLA_TIMESTAMP)
      # corresponding to GENERATED ALWAYS AS columns in the SQL command.
      schema: |-
        {
          "schema": "{\"fields\":[{\"name\":\"id\",\"type\":[\"null\",{\"type\":\"string\"}]},{\"name\":\"city\",\"type\":[\"null\",{\"type\":\"string\"}]},{\"name\":\"owner_id\",\"type\":[\"null\",{\"type\":\"string\"}],\"zillaType\":\"ZILLA_IDENTITY\"},{\"name\":\"created_at\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"zillaType\":\"ZILLA_TIMESTAMP\"}],\"name\":\"cities\",\"namespace\":\"public\",\"type\":\"record\"}",
          "schemaType": "AVRO"
        }
bindings:
  app0:
    type: pgsql-kafka
    kind: proxy
    catalog:
      catalog0:
        # resolve schema subjects by topic name
        - strategy: topic
    exit: app1
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ connect "zilla://streams/app0"
option zilla:window 8192
option zilla:transmission "duplex"


write zilla:begin.ext ${pgsql:beginEx()
.typeId(zilla:id("pgsql"))
.parameter("user", "app")
Expand All @@ -32,18 +33,19 @@ write zilla:data.ext ${pgsql:dataEx()
.query()
.build()
.build()}
write "CREATE TABLE IF NOT EXISTS cities "
"(id VARCHAR, name VARCHAR, description VARCHAR,"
" zilla_correlation_id VARCHAR, zilla_identity VARCHAR, zilla_timestamp TIMESTAMP,"
" PRIMARY KEY (id));"
write "CREATE TOPIC IF NOT EXISTS cities ("
"id VARCHAR, "
"city VARCHAR, "
"owner_id VARCHAR GENERATED ALWAYS AS IDENTITY, "
"created_at TIMESTAMP GENERATED ALWAYS AS NOW, "
"PRIMARY KEY (id, city));"
[0x00]

write flush

read advised zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.completion()
.tag("CREATE_TABLE")
.tag("CREATE_TOPIC")
.build()
.build()}

Expand All @@ -54,3 +56,5 @@ read advised zilla:flush ${pgsql:flushEx()
.build()
.build()}

write close
read closed
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#
# Copyright 2021-2024 Aklivity Inc
#
# Licensed under the Aklivity Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# https://www.aklivity.io/aklivity-community-license/
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#

# Server-side k3po script: accepts a pgsql application stream and verifies a
# CREATE TOPIC command whose columns use zilla GENERATED ALWAYS AS markers.
accept "zilla://streams/app0"
option zilla:window 8192
option zilla:transmission "duplex"

accepted

# Expect the pgsql session parameters sent at stream begin.
read zilla:begin.ext ${pgsql:beginEx()
.typeId(zilla:id("pgsql"))
.parameter("user", "root")
.parameter("database", "dev")
.parameter("application_name", "psql")
.parameter("client_encoding", "UTF8")
.build()}

connected

# Expect the query data frame carrying the CREATE TOPIC statement,
# NUL-terminated ([0x00]) as in the PostgreSQL wire protocol.
read zilla:data.ext ${pgsql:dataEx()
.typeId(zilla:id("pgsql"))
.query()
.build()
.build()}
read "CREATE TOPIC IF NOT EXISTS cities ("
"id VARCHAR, "
"city VARCHAR, "
"owner_id VARCHAR GENERATED ALWAYS AS IDENTITY, "
"created_at TIMESTAMP GENERATED ALWAYS AS NOW, "
"PRIMARY KEY (id, city));"
[0x00]

# Advise completion of the command with its tag.
write advise zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.completion()
.tag("CREATE_TOPIC")
.build()
.build()}

# Signal other scripts that the topic creation round-trip finished.
write notify CREATE_TOPIC_COMPLETED

# Report the backend is ready for the next command (transaction status IDLE).
write advise zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.ready()
.status("IDLE")
.build()
.build()}
read closed
write close
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

/**
 * One field of a generated Avro record schema.
 * <p>
 * {@code type} is either a plain Avro type (e.g. a type name or a type
 * object) or, for nullable columns, a union such as
 * {@code new Object[]{"null", avroType}}. {@code zillaType} is an optional
 * marker (e.g. "ZILLA_IDENTITY", "ZILLA_TIMESTAMP") attached when the SQL
 * column declared a matching GENERATED ALWAYS AS constraint; {@code null}
 * when no marker applies.
 */
public record AvroField(
    String name,
    Object type,
    Object zillaType)
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.schema;

import java.util.List;
import java.util.Map;

import jakarta.json.Json;
import jakarta.json.JsonArray;
Expand All @@ -25,13 +26,25 @@
import jakarta.json.bind.JsonbBuilder;
import jakarta.json.bind.JsonbConfig;

import org.agrona.collections.Object2ObjectHashMap;

import io.aklivity.zilla.runtime.binding.pgsql.parser.model.AlterExpression;
import io.aklivity.zilla.runtime.binding.pgsql.parser.model.Operation;

public abstract class PgsqlKafkaAvroSchemaTemplate
{
protected static final String DATABASE_PLACEHOLDER = "{database}";

protected static final String ZILLA_IDENTITY = "GENERATED ALWAYS AS IDENTITY";
protected static final String ZILLA_TIMESTAMP = "GENERATED ALWAYS AS NOW";

protected static final Map<String, String> ZILLA_MAPPINGS = new Object2ObjectHashMap<>();
static
{
ZILLA_MAPPINGS.put(ZILLA_IDENTITY, "ZILLA_IDENTITY");
ZILLA_MAPPINGS.put(ZILLA_TIMESTAMP, "ZILLA_TIMESTAMP");
}

protected Jsonb jsonbFormatted = JsonbBuilder.create(new JsonbConfig().withFormatting(true));
protected Jsonb jsonb = JsonbBuilder.create();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public String generate(
final String newNamespace = namespace.replace(DATABASE_PLACEHOLDER, database);

List<AvroField> fields = command.columns().stream()
.map(column -> new AvroField(column.name(), mapSqlTypeToAvroType(column.type())))
.map(column -> new AvroField(column.name(), mapSqlTypeToAvroType(column.type()), null))
.collect(Collectors.toList());

AvroSchema schema = new AvroSchema("record", command.name(), newNamespace, fields);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,19 @@ public String generate(
String columnName = column.name();
String sqlType = column.type();
Object avroType = mapSqlTypeToAvroType(sqlType);
List<String> constraints = column.constraints();

boolean isNullable = !column.constraints().contains("NOT NULL");
boolean isNullable = !constraints.contains("NOT NULL");

String zillaType = ZILLA_MAPPINGS.entrySet().stream()
.filter(e -> constraints.contains(e.getKey()))
.map(Map.Entry::getValue)
.findFirst()
.orElse(null);

return isNullable
? new AvroField(columnName, new Object[]{"null", avroType})
: new AvroField(columnName, avroType);
? new AvroField(columnName, new Object[]{"null", avroType}, zillaType)
: new AvroField(columnName, avroType, zillaType);
})
.collect(Collectors.toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ public void shouldCreateTopic() throws Exception
k3po.finish();
}

    /**
     * Verifies that a CREATE TOPIC statement containing
     * GENERATED ALWAYS AS column markers completes against the
     * pgsql-kafka proxy; behavior is driven by the referenced
     * k3po specification scripts.
     */
    @Test
    @Configuration("proxy.generated.as.yaml")
    @Specification({
        "${pgsql}/create.topic.with.generated.as/client",
        "${kafka}/create.topic/server"
    })
    public void shouldCreateTopicWithGeneratedAs() throws Exception
    {
        k3po.finish();
    }

@Test
@Configuration("proxy.alter.yaml")
@Specification({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.aklivity.zilla.specs.binding.pgsql.internal.types.stream.PgsqlFlushExFW;
import io.aklivity.zilla.specs.binding.pgsql.internal.types.stream.PgsqlFormat;
import io.aklivity.zilla.specs.binding.pgsql.internal.types.stream.PgsqlMessageType;
import io.aklivity.zilla.specs.binding.pgsql.internal.types.stream.PgsqlNoticeFlushExFW;
import io.aklivity.zilla.specs.binding.pgsql.internal.types.stream.PgsqlQueryDataExFW;
import io.aklivity.zilla.specs.binding.pgsql.internal.types.stream.PgsqlReadyFlushExFW;
import io.aklivity.zilla.specs.binding.pgsql.internal.types.stream.PgsqlRowDataExFW;
Expand Down Expand Up @@ -217,6 +218,13 @@ public PgsqlCompletedFlushExBuilder completion()
return new PgsqlCompletedFlushExBuilder();
}

    /**
     * Marks this flush extension as a NOTICE frame and returns a builder
     * for its severity/code/message fields.
     */
    public PgsqlNoticeFlushExBuilder notice()
    {
        flushExRW.kind(PgsqlMessageType.NOTICE.value());

        return new PgsqlNoticeFlushExBuilder();
    }

public PgsqlErrorFlushExBuilder error()
{
flushExRW.kind(PgsqlMessageType.ERROR.value());
Expand Down Expand Up @@ -405,6 +413,44 @@ public PgsqlFlushExBuilder build()
}
}

    /**
     * Builder for the NOTICE variant of the pgsql flush extension.
     * <p>
     * Each field value is encoded in PostgreSQL message-field form: a
     * one-character field code ('S' severity, 'C' code, 'M' message)
     * followed by the value and a NUL terminator, as the format strings
     * below show.
     */
    public final class PgsqlNoticeFlushExBuilder
    {
        private final PgsqlNoticeFlushExFW.Builder pgsqlNoticeFlushExRW = new PgsqlNoticeFlushExFW.Builder();

        private PgsqlNoticeFlushExBuilder()
        {
            // begin encoding immediately after the flush-ex type field
            pgsqlNoticeFlushExRW.wrap(writeBuffer, PgsqlFlushExFW.FIELD_OFFSET_TYPE, writeBuffer.capacity());
        }

        // Sets the notice severity; encoded as "S<severity>\0".
        public PgsqlNoticeFlushExBuilder severity(
            String severity)
        {
            pgsqlNoticeFlushExRW.severity(String.format("S%s\u0000", severity));
            return this;
        }

        // Sets the SQLSTATE-style code; encoded as "C<code>\0".
        public PgsqlNoticeFlushExBuilder code(
            String code)
        {
            pgsqlNoticeFlushExRW.code(String.format("C%s\u0000", code));
            return this;
        }

        // Sets the human-readable message; encoded as "M<message>\0".
        public PgsqlNoticeFlushExBuilder message(
            String message)
        {
            pgsqlNoticeFlushExRW.message(String.format("M%s\u0000", message));
            return this;
        }

        // Finalizes the notice extension and re-wraps the read view over it.
        public PgsqlFlushExBuilder build()
        {
            final PgsqlNoticeFlushExFW pgsqlNoticeFlushEx = pgsqlNoticeFlushExRW.build();
            flushExRO.wrap(writeBuffer, 0, pgsqlNoticeFlushEx.limit());
            return PgsqlFlushExBuilder.this;
        }
    }

public final class PgsqlCancelRequestFlushExBuilder
{
private final PgsqlCancelRequestFlushExFW.Builder pgsqlCancelREquestFlushExRW =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ scope pgsql
ROW (68),
TYPE (84),
COMPLETION (67),
NOTICE (78),
ERROR (69),
CANCEL_REQUEST (255),
READY (90)
Expand Down Expand Up @@ -60,6 +61,7 @@ scope pgsql
case 84: pgsql::stream::PgsqlTypeFlushEx type;
case 67: pgsql::stream::PgsqlCompletedFlushEx completion;
case 69: pgsql::stream::PgsqlErrorFlushEx error;
case 78: pgsql::stream::PgsqlNoticeFlushEx notice;
case 90: pgsql::stream::PgsqlReadyFlushEx ready;
}

Expand Down Expand Up @@ -103,6 +105,13 @@ scope pgsql
string16 message;
}

struct PgsqlNoticeFlushEx
{
string16 severity;
string16 code;
string16 message;
}

enum PgsqlStatus (uint8)
{
IDLE(73),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#
# Copyright 2021-2024 Aklivity Inc
#
# Licensed under the Aklivity Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# https://www.aklivity.io/aklivity-community-license/
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#

# Client-side k3po script: sends a CREATE VIEW command and expects a NOTICE
# flush, then the completion and ready (IDLE) flushes.
connect "zilla://streams/app0"
option zilla:window 8192
option zilla:transmission "duplex"

# Send pgsql session parameters at stream begin.
write zilla:begin.ext ${pgsql:beginEx()
.typeId(zilla:id("pgsql"))
.parameter("user", "root")
.parameter("database", "dev")
.parameter("application_name", "psql")
.parameter("client_encoding", "UTF8")
.build()}

connected

# Send the CREATE VIEW statement as a query data frame,
# NUL-terminated ([0x00]) as in the PostgreSQL wire protocol.
write zilla:data.ext ${pgsql:dataEx()
.typeId(zilla:id("pgsql"))
.query()
.build()
.build()}
write "CREATE VIEW IF NOT EXISTS cities_view AS SELECT name, created_at::timestamp FROM cities;"
[0x00]
write flush

# Expect an informational NOTICE before completion.
read advised zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.notice()
.severity("NOTICE")
.code("00000")
.message("Your session timezone is UTC.")
.build()
.build()}

# Expect the command completion tag.
read advised zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.completion()
.tag("CREATE_VIEW")
.build()
.build()}

# Expect ready-for-query with transaction status IDLE.
read advised zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.ready()
.status("IDLE")
.build()
.build()}

read closed
write close
Loading
Loading