Skip to content

Commit

Permalink
feat: storage now have a namespace parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Nov 21, 2024
1 parent 0b4037c commit 0b11d2a
Show file tree
Hide file tree
Showing 15 changed files with 50 additions and 53 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version=0.20.0-SNAPSHOT
kestraVersion=[0.18,)
kestraVersion=[0.20,)
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ void updateBatch() throws Exception {
);
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

BulkInsert taskUpdate = BulkInsert.builder()
.from(uri.toString())
Expand Down Expand Up @@ -161,7 +161,7 @@ public void noSqlForInsert() throws Exception {
));
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

BulkInsert task = BulkInsert.builder()
.url(getUrl())
Expand Down Expand Up @@ -189,7 +189,7 @@ public void noSqlWithNamedColumnsForInsert() throws Exception {
));
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

BulkInsert task = BulkInsert.builder()
.url(getUrl())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.IdUtils;
import io.kestra.plugin.jdbc.AbstractJdbcQueries;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import jakarta.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.Test;
Expand All @@ -18,13 +16,8 @@

import java.io.File;
import java.io.FileInputStream;
import java.math.BigDecimal;
import java.net.URI;
import java.net.URL;
import java.nio.file.Path;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -58,10 +51,10 @@ void multiSelect() throws Exception {
.sql("""
CREATE TABLE employee (id INTEGER PRIMARY KEY, name VARCHAR, age INTEGER);
CREATE TABLE laptop (id INTEGER PRIMARY KEY, brand VARCHAR, model VARCHAR);
INSERT INTO employee(id, name, age) VALUES (1, 'John', 25), (2, 'Bryan', 35);
INSERT INTO laptop(id, brand, model) VALUES (1, 'Apple', 'MacBook M3 16'), (2, 'LG', 'Gram');
SELECT * FROM employee where age > :age;
SELECT * FROM laptop;
""")
Expand Down Expand Up @@ -90,10 +83,10 @@ void multiSelectFromExistingFileInUrl() throws Exception {
.sql("""
CREATE TABLE employee (id INTEGER PRIMARY KEY, name VARCHAR, age INTEGER);
CREATE TABLE laptop (id INTEGER PRIMARY KEY, brand VARCHAR, model VARCHAR);
INSERT INTO employee(id, name, age) VALUES (1, 'John', 25), (2, 'Bryan', 35);
INSERT INTO laptop(id, brand, model) VALUES (1, 'Apple', 'MacBook M3 16'), (2, 'LG', 'Gram');
SELECT * FROM employee where age > :age;
SELECT * FROM laptop;
""")
Expand All @@ -120,6 +113,7 @@ static Stream<String> nullOrFilledDuckDbUrl() {
@MethodSource("nullOrFilledDuckDbUrl") // six numbers
void inputOutputFiles(String url) throws Exception {
URI source = storageInterface.put(
null,
null,
new URI("/" + IdUtils.create()),
new FileInputStream(new File(Objects.requireNonNull(DuckDbQueriesTest.class.getClassLoader()
Expand Down Expand Up @@ -156,7 +150,7 @@ void inputOutputFiles(String url) throws Exception {
assertThat("Query name", runOutput.getOutputs().getLast().getRow().get("name"), is("Ailane"));

assertThat(
IOUtils.toString(storageInterface.get(null, runOutput.getOutputFiles().get("out")), Charsets.UTF_8),
IOUtils.toString(storageInterface.get(null, null, runOutput.getOutputFiles().get("out")), Charsets.UTF_8),
is( "id,name\n" +
"4814976,Viva\n" +
"1010871,Voomm\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ static Stream<String> nullOrFilledDuckDbUrl() {
@MethodSource("nullOrFilledDuckDbUrl") // six numbers
void inputOutputFiles(String url) throws Exception {
URI source = storageInterface.put(
null,
null,
new URI("/" + IdUtils.create()),
new FileInputStream(new File(Objects.requireNonNull(DuckDbTest.class.getClassLoader()
Expand Down Expand Up @@ -286,7 +287,7 @@ void inputOutputFiles(String url) throws Exception {
Query.Output runOutput = task.run(runContext);

assertThat(
IOUtils.toString(storageInterface.get(null, runOutput.getOutputFiles().get("out")), Charsets.UTF_8),
IOUtils.toString(storageInterface.get(null, null, runOutput.getOutputFiles().get("out")), Charsets.UTF_8),
is( "id,name\n" +
"4814976,Viva\n" +
"1010871,Voomm\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void insert() throws Exception {
));
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

Batch task = Batch.builder()
.url(getUrl())
Expand Down Expand Up @@ -92,7 +92,7 @@ INSERT INTO mysql_types (
year_type,
json_type,
blob_type
) VALUES (
) VALUES (
?,
?,
?,
Expand All @@ -113,7 +113,7 @@ INSERT INTO mysql_types (
?,
?,
?,
?
?
);
"""
)
Expand All @@ -140,7 +140,7 @@ public void namedInsert() throws Exception {
);
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

Batch task = Batch.builder()
.url(getUrl())
Expand Down Expand Up @@ -171,7 +171,7 @@ public void namedColumnsInsert() throws Exception {
);
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

Batch task = Batch.builder()
.url(getUrl())
Expand Down Expand Up @@ -220,7 +220,7 @@ public void noSqlForInsert() throws Exception {
));
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

Batch task = Batch.builder()
.url(getUrl())
Expand Down Expand Up @@ -248,7 +248,7 @@ public void noSqlWithNamedColumnsForInsert() throws Exception {
));
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

Batch task = Batch.builder()
.url(getUrl())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ void load() throws Exception {
URL resource = LoadTest.class.getClassLoader().getResource("load.csv");

URI put = storageInterface.put(
null,
null,
new URI("/file/storage/get.yml"),
new FileInputStream(Objects.requireNonNull(resource).getFile())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ void insert() throws Exception {
));
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

Batch task = Batch.builder()
.url(getUrl())
Expand Down Expand Up @@ -92,7 +92,7 @@ public void namedInsert() throws Exception {

}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

Batch task = Batch.builder()
.url(getUrl())
Expand Down Expand Up @@ -122,7 +122,7 @@ public void namedColumnsInsert() throws Exception {
);
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

ArrayList<String> columns = new ArrayList<>();
columns.add("t_id");
Expand Down Expand Up @@ -175,7 +175,7 @@ public void noSqlForInsert() throws Exception {
));
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

Batch task = Batch.builder()
.url(getUrl())
Expand Down Expand Up @@ -203,7 +203,7 @@ public void noSqlWithNamedColumnsForInsert() throws Exception {
));
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

Batch task = Batch.builder()
.url(getUrl())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ void insert() throws Exception {
));
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

Batch task = Batch.builder()
.url(getUrl())
Expand Down Expand Up @@ -155,7 +155,7 @@ public void namedInsert() throws Exception {
);
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

Batch task = Batch.builder()
.url(TestUtils.url())
Expand Down Expand Up @@ -192,7 +192,7 @@ public void namedColumnsInsert() throws Exception {
);
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

Batch task = Batch.builder()
.url(TestUtils.url())
Expand Down Expand Up @@ -248,7 +248,7 @@ public void noSqlForInsert() throws Exception {
));
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

Batch task = Batch.builder()
.url(getUrl())
Expand Down Expand Up @@ -282,7 +282,7 @@ public void noSqlWithNamedColumnsForInsert() throws Exception {
));
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

Batch task = Batch.builder()
.url(getUrl())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ void selectAndFetchToFile() throws Exception {
AbstractJdbcQuery.Output runOutput = task.run(runContext);
assertThat(runOutput.getUri(), notNullValue());

BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(this.storageInterface.get(null, runOutput.getUri())));
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(this.storageInterface.get(null, null, runOutput.getUri())));
int lines = 0;
while (bufferedReader.readLine() != null) {
lines++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ void success() throws Exception {
URL resource = UploadDownloadTest.class.getClassLoader().getResource("scripts/snowflake.sql");

URI put = storageInterface.put(
null,
null,
new URI("/file/storage/snowflake.sql"),
new FileInputStream(Objects.requireNonNull(resource).getFile())
Expand Down Expand Up @@ -83,8 +84,8 @@ void success() throws Exception {
assertThat(downloadRun.getUri(), notNullValue());

assertThat(
IOUtils.toString(this.storageInterface.get(null, downloadRun.getUri()), Charsets.UTF_8),
is(IOUtils.toString(this.storageInterface.get(null, put), Charsets.UTF_8))
IOUtils.toString(this.storageInterface.get(null, null, downloadRun.getUri()), Charsets.UTF_8),
is(IOUtils.toString(this.storageInterface.get(null, null, put), Charsets.UTF_8))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ void selectFromExistingDatabase() throws Exception {
URL resource = SqliteTest.class.getClassLoader().getResource("db/Chinook_Sqlite.sqlite");

URI input = storageInterface.put(
null,
null,
new URI("/file/storage/get.yml"),
new FileInputStream(Objects.requireNonNull(resource).getFile())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void insert() throws Exception {
);
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

Batch task = Batch.builder()
.url(getUrl())
Expand Down Expand Up @@ -94,7 +94,7 @@ public void namedInsert() throws Exception {
);
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

Batch task = Batch.builder()
.url(getUrl())
Expand Down Expand Up @@ -125,7 +125,7 @@ public void namedColumnsInsert() throws Exception {
}


URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

ArrayList<String> columns = new ArrayList<>();
columns.add("t_id");
Expand Down Expand Up @@ -183,7 +183,7 @@ public void noSqlForInsert() throws Exception {
);
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

Batch task = Batch.builder()
.url(getUrl())
Expand Down Expand Up @@ -211,7 +211,7 @@ public void noSqlWithNamedColumnsForInsert() throws Exception {
));
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

Batch task = Batch.builder()
.url(getUrl())
Expand Down
Loading

0 comments on commit 0b11d2a

Please sign in to comment.