Skip to content

Commit

Permalink
Support DROP TABLE, STREAM, and MATERIALIZED VIEW (#1266)
Browse files Browse the repository at this point in the history
  • Loading branch information
akrambek authored Oct 24, 2024
1 parent b23e3f0 commit fd013ef
Show file tree
Hide file tree
Showing 29 changed files with 1,544 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public PgsqlParser()
this.lexer = new PostgreSqlLexer(null);
this.tokens = new CommonTokenStream(lexer);
this.parser = new PostgreSqlParser(tokens);
this.commandListener = new SqlCommandListener();
this.commandListener = new SqlCommandListener(tokens);
this.createTableListener = new SqlCreateTableTopicListener(tokens);
this.createStreamListener = new SqlCreateStreamListener(tokens);
this.createFunctionListener = new SqlCreateFunctionListener(tokens);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,23 @@
*/
package io.aklivity.zilla.runtime.binding.pgsql.parser.listener;

import org.antlr.v4.runtime.TokenStream;

import io.aklivity.zilla.runtime.binding.pgsql.parser.PostgreSqlParser;
import io.aklivity.zilla.runtime.binding.pgsql.parser.PostgreSqlParserBaseListener;

public class SqlCommandListener extends PostgreSqlParserBaseListener
{
private String command = "";

private final TokenStream tokens;

public SqlCommandListener(
TokenStream tokens)
{
this.tokens = tokens;
}

public String command()
{
return command;
Expand Down Expand Up @@ -70,6 +80,6 @@ public void enterCreatefunctionstmt(
public void enterDropstmt(
PostgreSqlParser.DropstmtContext ctx)
{
command = "DROP %s".formatted(ctx.object_type_any_name().getText());
command = "DROP %s".formatted(tokens.getText(ctx.object_type_any_name()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,5 @@ bindings:
when:
- commands:
- "CREATE TOPIC"
- "DROP TOPIC"
exit: app1
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@
"items":
{
"type": "string",
"enum": [ "CREATE TOPIC" ]
"enum": [ "CREATE TOPIC", "DROP TOPIC" ]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#
# Copyright 2021-2023 Aklivity Inc
#
# Licensed under the Aklivity Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# https://www.aklivity.io/aklivity-community-license/
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#

connect "zilla://streams/app1"
option zilla:window 8192
option zilla:transmission "duplex"

write zilla:begin.ext ${pgsql:beginEx()
.typeId(zilla:id("pgsql"))
.parameter("user", "root")
.parameter("database", "dev")
.parameter("application_name", "psql")
.parameter("client_encoding", "UTF8")
.build()}

connected

write zilla:data.ext ${pgsql:dataEx()
.typeId(zilla:id("pgsql"))
.query()
.build()
.build()}
write "DROP MATERIALIZED VIEW weather;"
[0x00]
write flush

read advised zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.completion()
.tag("DROP_MATERIALIZED_VIEW")
.build()
.build()}

read advised zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.ready()
.status("IDLE")
.build()
.build()}

write zilla:data.ext ${pgsql:dataEx()
.typeId(zilla:id("pgsql"))
.query()
.build()
.build()}
write "DROP SINK weather_sink;"
[0x00]
write flush

read advised zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.completion()
.tag("DROP_SINK")
.build()
.build()}

read advised zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.ready()
.status("IDLE")
.build()
.build()}

connect "zilla://streams/app2"
option zilla:window 8192
option zilla:transmission "duplex"

write zilla:begin.ext ${pgsql:beginEx()
.typeId(zilla:id("pgsql"))
.parameter("user", "root")
.parameter("database", "dev")
.parameter("application_name", "psql")
.parameter("client_encoding", "UTF8")
.build()}

connected

write zilla:data.ext ${pgsql:dataEx()
.typeId(zilla:id("pgsql"))
.query()
.build()
.build()}
write "DROP TOPIC weather;"
[0x00]
write flush

read advised zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.completion()
.tag("DROP_TOPIC")
.build()
.build()}

read advised zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.ready()
.status("IDLE")
.build()
.build()}

Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#
# Copyright 2021-2023 Aklivity Inc
#
# Licensed under the Aklivity Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# https://www.aklivity.io/aklivity-community-license/
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#

accept "zilla://streams/app1"
option zilla:window 8192
option zilla:transmission "duplex"

accepted

read zilla:begin.ext ${pgsql:beginEx()
.typeId(zilla:id("pgsql"))
.parameter("user", "root")
.parameter("database", "dev")
.parameter("application_name", "psql")
.parameter("client_encoding", "UTF8")
.build()}

connected

read zilla:data.ext ${pgsql:dataEx()
.typeId(zilla:id("pgsql"))
.query()
.build()
.build()}
read "DROP MATERIALIZED VIEW weather;"
[0x00]

write advise zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.completion()
.tag("DROP_MATERIALIZED_VIEW")
.build()
.build()}

write advise zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.ready()
.status("IDLE")
.build()
.build()}

read zilla:data.ext ${pgsql:dataEx()
.typeId(zilla:id("pgsql"))
.query()
.build()
.build()}
read "DROP SINK weather_sink;"
[0x00]

write advise zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.completion()
.tag("DROP_SINK")
.build()
.build()}

write advise zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.ready()
.status("IDLE")
.build()
.build()}

accept "zilla://streams/app2"
option zilla:window 8192
option zilla:transmission "duplex"

accepted

read zilla:begin.ext ${pgsql:beginEx()
.typeId(zilla:id("pgsql"))
.parameter("user", "root")
.parameter("database", "dev")
.parameter("application_name", "psql")
.parameter("client_encoding", "UTF8")
.build()}

connected

read zilla:data.ext ${pgsql:dataEx()
.typeId(zilla:id("pgsql"))
.query()
.build()
.build()}
read "DROP TOPIC weather;"
[0x00]

write advise zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.completion()
.tag("DROP_TOPIC")
.build()
.build()}


write advise zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.ready()
.status("IDLE")
.build()
.build()}

Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#
# Copyright 2021-2023 Aklivity Inc
#
# Licensed under the Aklivity Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# https://www.aklivity.io/aklivity-community-license/
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#

connect "zilla://streams/app1"
option zilla:window 8192
option zilla:transmission "duplex"

write zilla:begin.ext ${pgsql:beginEx()
.typeId(zilla:id("pgsql"))
.parameter("user", "root")
.parameter("database", "dev")
.parameter("application_name", "psql")
.parameter("client_encoding", "UTF8")
.build()}

connected

write zilla:data.ext ${pgsql:dataEx()
.typeId(zilla:id("pgsql"))
.query()
.build()
.build()}
write "DROP SOURCE weather;"
[0x00]
write flush

read advised zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.completion()
.tag("DROP_SOURCE")
.build()
.build()}

read advised zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.ready()
.status("IDLE")
.build()
.build()}

connect "zilla://streams/app2"
option zilla:window 8192
option zilla:transmission "duplex"

write zilla:begin.ext ${pgsql:beginEx()
.typeId(zilla:id("pgsql"))
.parameter("user", "root")
.parameter("database", "dev")
.parameter("application_name", "psql")
.parameter("client_encoding", "UTF8")
.build()}

connected

write zilla:data.ext ${pgsql:dataEx()
.typeId(zilla:id("pgsql"))
.query()
.build()
.build()}
write "DROP TOPIC weather;"
[0x00]

write flush

read advised zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.completion()
.tag("DROP_TOPIC")
.build()
.build()}

read advised zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.ready()
.status("IDLE")
.build()
.build()}

Loading

0 comments on commit fd013ef

Please sign in to comment.