Skip to content

Commit

Permalink
feat: Add a new Queries task to run multiple SQL with multiple output…
Browse files Browse the repository at this point in the history
…s, parameter binding and transactions

extract common logic to utils and abstract classes

created test for queries

implemented queries for MySQL

#368
  • Loading branch information
mgabelle committed Oct 14, 2024
1 parent 0932f1b commit c85ef33
Show file tree
Hide file tree
Showing 17 changed files with 593 additions and 253 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcBatch;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.nio.file.Path;
import java.sql.DriverManager;
import java.sql.SQLException;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.kestra.plugin.jdbc.mysql;

import io.micronaut.http.uri.UriBuilder;

import java.net.URI;
import java.nio.file.Path;
import java.util.Properties;

public class MysqlUtils {

private MysqlUtils() {
throw new IllegalStateException("Utility class");
}

protected static Properties createMysqlProperties(Properties props, Path workingDirectory,
boolean isMultiQuery) {
URI url = URI.create((String) props.get("jdbc.url"));
url = URI.create(url.getSchemeSpecificPart());

UriBuilder builder = UriBuilder.of(url);

// allow local in file for current worker and prevent the global one
builder.queryParam("allowLoadLocalInfileInPath", workingDirectory.toAbsolutePath().toString());
builder.replaceQueryParam("allowLoadLocalInfile", false);

// see https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-implementation-notes.html
// By default, ResultSets are completely retrieved and stored in memory.
builder.replaceQueryParam("useCursorFetch", true);

builder.scheme("jdbc:mysql");

if(isMultiQuery) {
builder.queryParam("allowMultiQueries", true);
}

props.put("jdbc.url", builder.build().toString());

return props;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package io.kestra.plugin.jdbc.mysql;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.runners.PluginUtilsService;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQueries;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

import java.nio.file.Path;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.Map;
import java.util.Properties;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Query a MySQL database."
)
@Plugin(
examples = {
@Example(
title = "Send a SQL query to a MySQL Database and fetch a row as output.",
full = true,
code = """
"""
)
}
)
public class Queries extends AbstractJdbcQueries {

@Schema(
title = "Add input file to be loaded with `LOAD DATA LOCAL`.",
description = "The file must be from Kestra's internal storage"
)
@PluginProperty(dynamic = true)
protected String inputFile;

@Getter(AccessLevel.NONE)
protected transient Path workingDirectory;

@Override
protected AbstractCellConverter getCellConverter(ZoneId zoneId) {
return new MysqlCellConverter(zoneId);
}

@Override
public void registerDriver() throws SQLException {
DriverManager.registerDriver(new com.mysql.cj.jdbc.Driver());
}

@Override
public Properties connectionProperties(RunContext runContext) throws Exception {
return MysqlUtils.createMysqlProperties(super.connectionProperties(runContext), this.workingDirectory, true);
}

@Override
public Integer getFetchSize() {
// The combination of useCursorFetch=true and preparedStatement.setFetchSize(10); push to use cursor on MySql DB instance side.
// This leads to consuming DB instance disk memory when we try to fetch more than aware table size.
// It actually just disables client-side caching of the entire response and gives you responses as they arrive as a result it has no effect on the DB
return this.isStore() ? Integer.MIN_VALUE : this.fetchSize;
}

@Override
public AbstractJdbcQueries.MultiQueryOutput run(RunContext runContext) throws Exception {
this.workingDirectory = runContext.workingDir().path();

if (this.inputFile != null) {
PluginUtilsService.createInputFiles(
runContext,
workingDirectory,
Map.of("inputFile", this.inputFile),
additionalVars
);
}

additionalVars.put("inputFile", workingDirectory.toAbsolutePath().resolve("inputFile").toString());

return super.run(runContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.kestra.plugin.jdbc.AutoCommitInterface;
import io.micronaut.http.uri.UriBuilder;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

import java.net.URI;
import java.nio.file.Path;
import java.sql.DriverManager;
import java.sql.SQLException;
Expand Down Expand Up @@ -103,28 +105,10 @@ public void registerDriver() throws SQLException {

@Override
public Properties connectionProperties(RunContext runContext) throws Exception {
Properties props = super.connectionProperties(runContext);

URI url = URI.create((String) props.get("jdbc.url"));
url = URI.create(url.getSchemeSpecificPart());

UriBuilder builder = UriBuilder.of(url);

// allow local in file for current worker and prevent the global one
builder.queryParam("allowLoadLocalInfileInPath", this.workingDirectory.toAbsolutePath().toString());
builder.replaceQueryParam("allowLoadLocalInfile", false);

// see https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-implementation-notes.html
// By default, ResultSets are completely retrieved and stored in memory.
builder.replaceQueryParam("useCursorFetch", true);

builder.scheme("jdbc:mysql");

props.put("jdbc.url", builder.build().toString());

return props;
return MysqlUtils.createMysqlProperties(super.connectionProperties(runContext), this.workingDirectory, false);
}

@Override
public Integer getFetchSize() {
// The combination of useCursorFetch=true and preparedStatement.setFetchSize(10); push to use cursor on MySql DB instance side.
// This leads to consuming DB instance disk memory when we try to fetch more than aware table size.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package io.kestra.plugin.jdbc.mysql;

import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractJdbcQueries;
import io.kestra.plugin.jdbc.AbstractRdbmsTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URISyntaxException;
import java.sql.SQLException;
import java.util.Collections;

import static io.kestra.core.models.tasks.common.FetchType.FETCH;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

/**
* See : https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-type-conversions.html
*/
@KestraTest
public class QueriesMysqlTest extends AbstractRdbmsTest {

@Test
void testSelect() throws Exception {
RunContext runContext = runContextFactory.of(Collections.emptyMap());

Queries taskGet = Queries.builder()
.url(getUrl())
.username(getUsername())
.password(getPassword())
.fetchType(FETCH)
.timeZoneId("Europe/Paris")
.sql("""
SELECT firstName, lastName FROM employee;
SELECT brand FROM laptop;
""")
.build();

AbstractJdbcQueries.MultiQueryOutput runOutput = taskGet.run(runContext);
assertThat(runOutput.getOutputs().size(), is(2));
assertThat(runOutput.getOutputs().get(0), notNullValue());
assertThat(runOutput.getOutputs().get(1), notNullValue());
}

@Override
protected String getUrl() {
return "jdbc:mysql://127.0.0.1:64790/kestra";
}

@Override
protected String getUsername() {
return "root";
}

@Override
protected String getPassword() {
return "mysql_passwd";
}

@Override
protected void initDatabase() throws SQLException, FileNotFoundException, URISyntaxException {
executeSqlScript("scripts/mysql-queries.sql");
}

@Override
@BeforeEach
public void init() throws IOException, URISyntaxException, SQLException {
initDatabase();
}
}
24 changes: 24 additions & 0 deletions plugin-jdbc-mysql/src/test/resources/scripts/mysql-queries.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
USE kestra;
DROP TABLE IF EXISTS employee;

CREATE TABLE employee (employee_id SERIAL NOT NULL, firstName VARCHAR(30), lastName VARCHAR(30), PRIMARY KEY (employee_id));

INSERT INTO employee (employee_id, firstName, lastName)
VALUES
(1, 'John', 'Doe'),
(2, 'Bryan', 'Grant');

DROP TABLE IF EXISTS laptop;

CREATE TABLE laptop
(
laptop_id SERIAL NOT NULL,
brand VARCHAR(30),
model VARCHAR(30),
PRIMARY KEY (laptop_id)
);

INSERT INTO laptop (laptop_id, brand, model)
VALUES
(1, 'Apple', 'MacBookPro M1 16'),
(2, 'Lenovo', 'ThinkPad');
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

@SuperBuilder
@ToString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;

Expand Down
Loading

0 comments on commit c85ef33

Please sign in to comment.