Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
akrambek committed Dec 17, 2024
1 parent f26c2cc commit fa2197a
Show file tree
Hide file tree
Showing 12 changed files with 68 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,9 @@ public RisingwaveAlterStreamMacro(
this.fieldBuilder = new StringBuilder();
}

public RisingwaveMacroState start(
long traceId,
long authorization)
public RisingwaveMacroState start()
{
AlterTopicState state = new AlterTopicState();
state.onStarted(traceId, authorization);

return state;
return new AlterTopicState();
}

private final class AlterTopicState implements RisingwaveMacroState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,9 @@ public RisingwaveAlterZtableMacro(
this.fieldBuilder = new StringBuilder();
}

public RisingwaveMacroState start(
long traceId,
long authorization)
public RisingwaveMacroState start()
{
AlterTopicState state = new AlterTopicState();
state.onStarted(traceId, authorization);

return state;
return new AlterTopicState();
}

private final class AlterTopicState implements RisingwaveMacroState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,9 @@ else if (udf.language.equalsIgnoreCase("python"))
}


public RisingwaveMacroState start(
long traceId,
long authorization)
public RisingwaveMacroState start()
{
CreateFunctionState state = new CreateFunctionState();
state.onStarted(traceId, authorization);

return state;
return new CreateFunctionState();
}

private final class CreateFunctionState implements RisingwaveMacroState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,9 @@ public RisingwaveCreateStreamMacro(
this.schemaRegistry = schemaRegistry;
}

public RisingwaveMacroState start(
long traceId,
long authorization)
public RisingwaveMacroState start()
{
CreateTopicState state = new CreateTopicState();
state.onStarted(traceId, authorization);

return state;
return new CreateTopicState();
}

private final class CreateTopicState implements RisingwaveMacroState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ public void onStarted(

String newSql = sql.replace(ZTABLE_NAME, TABLE_NAME)
.replace("\u0000", "");
newSql = newSql.replaceAll("'", "''");
String sqlQuery = String.format(sqlFormat, systemSchema, ZTABLE_NAME, name, newSql);

handler.doExecuteSystemClient(traceId, authorization, sqlQuery);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,9 @@ public RisingwaveCreateZviewMacro(
this.columns = new Object2ObjectHashMap<>();
}

public RisingwaveMacroState start(
long traceId,
long authorization)
public RisingwaveMacroState start()
{
CreateMaterializedViewState state = new CreateMaterializedViewState();
state.onStarted(traceId, authorization);

return state;
return new CreateMaterializedViewState();
}

private final class CreateMaterializedViewState implements RisingwaveMacroState
Expand Down Expand Up @@ -401,6 +396,7 @@ public void onStarted(

String newSql = sql.replace(ZVIEW_NAME, MATERIALIZED_VIEW_NAME)
.replace("\u0000", "");
newSql = newSql.replaceAll("'", "''");
String sqlQuery = String.format(sqlFormat, systemSchema, ZVIEW_NAME, name, newSql);

handler.doExecuteSystemClient(traceId, authorization, sqlQuery);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,9 @@ public RisingwaveDropStreamMacro(
this.handler = handler;
}

public RisingwaveMacroState start(
long traceId,
long authorization)
public RisingwaveMacroState start()
{
DropTopicState state = new DropTopicState();
state.onStarted(traceId, authorization);

return state;
return new DropTopicState();
}

private final class DropTopicState implements RisingwaveMacroState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,9 @@ public RisingwaveDropZtableMacro(
this.handler = handler;
}

public RisingwaveMacroState start(
long traceId,
long authorization)
public RisingwaveMacroState start()
{
DropTopicState state = new DropTopicState();
state.onStarted(traceId, authorization);

return state;
return new DropTopicState();
}

private final class DropTopicState implements RisingwaveMacroState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,9 @@ public RisingwaveDropZviewMacro(
this.handler = handler;
}

public RisingwaveMacroState start(
long traceId,
long authorization)
public RisingwaveMacroState start()
{
DropTopicState state = new DropTopicState();
state.onStarted(traceId, authorization);

return state;
return new DropTopicState();
}

private final class DropTopicState implements RisingwaveMacroState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,9 @@ public RisingwaveShowCommandMacro(
}


public RisingwaveMacroState start(
long traceId,
long authorization)
public RisingwaveMacroState start()
{
ShowCommandState state = new ShowCommandState();
state.onStarted(traceId, authorization);

return state;
return new ShowCommandState();
}

private final class ShowCommandState implements RisingwaveMacroState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,9 @@ public RisingwaveUnknownMacro(
}


public RisingwaveMacroState start(
long traceId,
long authorization)
public RisingwaveMacroState start()
{
UnknownState state = new UnknownState();
state.onStarted(traceId, authorization);

return state;
return new UnknownState();
}

private final class UnknownState implements RisingwaveMacroState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1689,8 +1689,10 @@ private void decodeCreateStreamCommand(
statement,
command,
server.macroHandler);
server.macroState = machine.start(traceId, authorization);
server.macroState = machine.start();
}

server.macroState.onStarted(traceId, authorization);
}

private void decodeCreateZviewCommand(
Expand All @@ -1713,8 +1715,10 @@ private void decodeCreateZviewCommand(
statement,
command,
server.macroHandler);
server.macroState = machine.start(traceId, authorization);
server.macroState = machine.start();
}

server.macroState.onStarted(traceId, authorization);
}

private void decodeCreateFunctionCommand(
Expand All @@ -1736,8 +1740,10 @@ private void decodeCreateFunctionCommand(
statement,
command,
server.macroHandler);
server.macroState = machine.start(traceId, authorization);
server.macroState = machine.start();
}

server.macroState.onStarted(traceId, authorization);
}

private void decodeAlterZtableCommand(
Expand All @@ -1756,13 +1762,18 @@ private void decodeAlterZtableCommand(
decodeUnsupportedCommand(server, traceId, authorization, RisingwaveCompletionCommand.ALTER_ZTABLE_COMMAND,
statement, "ALTER ZTABLE only supports ADD");
}
else if (server.macroState == null)
else
{
RisingwaveAlterZtableMacro machine = new RisingwaveAlterZtableMacro(
statement,
command,
server.macroHandler);
server.macroState = machine.start(traceId, authorization);
if (server.macroState == null)
{
RisingwaveAlterZtableMacro machine = new RisingwaveAlterZtableMacro(
statement,
command,
server.macroHandler);
server.macroState = machine.start();
}

server.macroState.onStarted(traceId, authorization);
}
}

Expand All @@ -1782,13 +1793,18 @@ private void decodeAlterStreamCommand(
decodeUnsupportedCommand(server, traceId, authorization, RisingwaveCompletionCommand.ALTER_STREAM_COMMAND,
statement, "ALTER STREAM only supports ADD");
}
else if (server.macroState == null)
else
{
RisingwaveAlterStreamMacro machine = new RisingwaveAlterStreamMacro(
statement,
command,
server.macroHandler);
server.macroState = machine.start(traceId, authorization);
if (server.macroState == null)
{
RisingwaveAlterStreamMacro machine = new RisingwaveAlterStreamMacro(
statement,
command,
server.macroHandler);
server.macroState = machine.start();
}

server.macroState.onStarted(traceId, authorization);
}
}

Expand Down Expand Up @@ -1823,8 +1839,10 @@ private void decodeDropZtableCommand(
statement,
command,
server.macroHandler);
server.macroState = machine.start(traceId, authorization);
server.macroState = machine.start();
}

server.macroState.onStarted(traceId, authorization);
}

private void decodeDropStreamCommand(
Expand All @@ -1843,8 +1861,10 @@ private void decodeDropStreamCommand(
statement,
command,
server.macroHandler);
server.macroState = machine.start(traceId, authorization);
server.macroState = machine.start();
}

server.macroState.onStarted(traceId, authorization);
}

private void decodeDropZviewCommand(
Expand All @@ -1863,8 +1883,10 @@ private void decodeDropZviewCommand(
statement,
command,
server.macroHandler);
server.macroState = machine.start(traceId, authorization);
server.macroState = machine.start();
}

server.macroState.onStarted(traceId, authorization);
}

private void decodeShowCommand(
Expand All @@ -1881,8 +1903,10 @@ private void decodeShowCommand(
statement,
command,
server.macroHandler);
server.macroState = machine.start(traceId, authorization);
server.macroState = machine.start();
}

server.macroState.onStarted(traceId, authorization);
}

private void decodeUnknownCommand(
Expand All @@ -1896,8 +1920,10 @@ private void decodeUnknownCommand(
RisingwaveUnknownMacro machine = new RisingwaveUnknownMacro(
statement,
server.macroHandler);
server.macroState = machine.start(traceId, authorization);
server.macroState = machine.start();
}

server.macroState.onStarted(traceId, authorization);
}

public List<String> splitStatements(
Expand Down

0 comments on commit fa2197a

Please sign in to comment.