Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add support for external engines #84

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions generate-models.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ docker run \
-u "$(pwd)/hoptimator-k8s/src/main/resources/tabletemplates.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/views.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/subscriptions.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/engines.crd.yaml" \
&& echo "done."
13 changes: 13 additions & 0 deletions hoptimator-api/src/main/java/com/linkedin/hoptimator/Engine.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.linkedin.hoptimator;

import javax.sql.DataSource;

/** An execution engine. */
public interface Engine {

String engineName();

DataSource dataSource();

SqlDialect dialect();
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

import com.linkedin.hoptimator.k8s.models.V1alpha1Database;
import com.linkedin.hoptimator.k8s.models.V1alpha1DatabaseList;
import com.linkedin.hoptimator.k8s.models.V1alpha1Engine;
import com.linkedin.hoptimator.k8s.models.V1alpha1EngineList;
import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplate;
import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplateList;
import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline;
Expand All @@ -34,6 +36,9 @@ public final class K8sApiEndpoints {
public static final K8sApiEndpoint<V1alpha1Database, V1alpha1DatabaseList> DATABASES =
new K8sApiEndpoint<>("Database", "hoptimator.linkedin.com", "v1alpha1", "databases", false,
V1alpha1Database.class, V1alpha1DatabaseList.class);
public static final K8sApiEndpoint<V1alpha1Engine, V1alpha1EngineList> ENGINES =
new K8sApiEndpoint<>("Engine", "hoptimator.linkedin.com", "v1alpha1", "engines", false,
V1alpha1Engine.class, V1alpha1EngineList.class);
public static final K8sApiEndpoint<V1alpha1Pipeline, V1alpha1PipelineList> PIPELINES =
new K8sApiEndpoint<>("Pipeline", "hoptimator.linkedin.com", "v1alpha1", "pipelines", false,
V1alpha1Pipeline.class, V1alpha1PipelineList.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,17 @@ public Row(String name, String url, String schema, String dialect, String driver
}
// CHECKSTYLE:ON

public K8sDatabaseTable(K8sContext context) {
private final K8sEngineTable engines;

public K8sDatabaseTable(K8sContext context, K8sEngineTable engines) {
super(context, K8sApiEndpoints.DATABASES, Row.class);
this.engines = engines;
}

public void addDatabases(SchemaPlus parentSchema) {
for (Row row : rows()) {
parentSchema.add(schemaName(row),
HoptimatorJdbcSchema.create(row.NAME, row.SCHEMA, dataSource(row), parentSchema, dialect(row)));
HoptimatorJdbcSchema.create(row.NAME, row.SCHEMA, dataSource(row), parentSchema, dialect(row), engines.forDatabase(row.NAME)));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.linkedin.hoptimator.k8s;

import javax.sql.DataSource;

import com.linkedin.hoptimator.Engine;
import com.linkedin.hoptimator.SqlDialect;

import org.apache.calcite.adapter.jdbc.JdbcSchema;


public class K8sEngine implements Engine {

private final String name;
private final String url;
private final SqlDialect dialect;
private final String driver;

public K8sEngine(String name, String url, SqlDialect dialect, String driver) {
this.name = name;
this.url = url;
this.dialect = dialect;
this.driver = driver;
}

@Override
public String engineName() {
return name;
}

@Override
public DataSource dataSource() {
// TODO support username, password via Secrets
return JdbcSchema.dataSource(url, driver, null, null);
}

@Override
public SqlDialect dialect() {
return dialect;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package com.linkedin.hoptimator.k8s;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.schema.Schema;

import io.kubernetes.client.openapi.models.V1ObjectMeta;

import com.linkedin.hoptimator.Engine;
import com.linkedin.hoptimator.SqlDialect;
import com.linkedin.hoptimator.k8s.models.V1alpha1Engine;
import com.linkedin.hoptimator.k8s.models.V1alpha1EngineList;
import com.linkedin.hoptimator.k8s.models.V1alpha1EngineSpec;
import com.linkedin.hoptimator.util.HoptimatorJdbcSchema;


public class K8sEngineTable extends K8sTable<V1alpha1Engine, V1alpha1EngineList, K8sEngineTable.Row> {

// CHECKSTYLE:OFF
public static class Row {
public String NAME;
public String URL;
public String DIALECT;
public String DRIVER;
public String[] DATABASES;
Comment on lines +26 to +30
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should prob be lowercase so we don't need to disable checkstyle

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a quirk of Calcite. These fields are referenced in the generated code, and things go sideways if they aren't all caps.


public Row(String name, String url, String dialect, String driver, String[] databases) {
this.NAME = name;
this.URL = url;
this.DIALECT = dialect;
this.DRIVER = driver;
this.DATABASES = databases;
}
}
// CHECKSTYLE:ON

public K8sEngineTable(K8sContext context) {
super(context, K8sApiEndpoints.ENGINES, Row.class);
}

/** Engines supporting a given database. */
public List<Engine> forDatabase(String database) {
return rows().stream().filter(x -> x.DATABASES == null
|| x.DATABASES.length == 0
|| Arrays.asList(x.DATABASES).contains(database))
.map(x -> new K8sEngine(x.NAME, x.URL, dialect(x), x.DRIVER))
.collect(Collectors.toList());
}

@Override
public Row toRow(V1alpha1Engine obj) {
return new Row(obj.getMetadata().getName(), obj.getSpec().getUrl(),
Optional.ofNullable(obj.getSpec().getDialect()).map(x -> x.toString()).orElseGet(() -> null),
obj.getSpec().getDriver(), obj.getSpec().getDatabases() != null ?
obj.getSpec().getDatabases().toArray(new String[0]) : null);
}

@Override
public V1alpha1Engine fromRow(Row row) {
K8sUtils.checkK8sName(row.NAME);
return new V1alpha1Engine().kind(K8sApiEndpoints.ENGINES.kind())
.apiVersion(K8sApiEndpoints.ENGINES.apiVersion())
.metadata(new V1ObjectMeta().name(row.NAME))
.spec(new V1alpha1EngineSpec().url(row.URL)
.dialect(V1alpha1EngineSpec.DialectEnum.fromValue(row.DIALECT))
.driver(row.DRIVER).databases(Arrays.asList(row.DATABASES)));
}

private static SqlDialect dialect(Row row) {
if (row.DIALECT == null) {
return SqlDialect.ANSI;
}
switch (row.DIALECT) {
case "Flink":
return SqlDialect.FLINK;
default:
return SqlDialect.valueOf(row.DIALECT);
}
}

@Override
public Schema.TableType getJdbcTableType() {
return Schema.TableType.SYSTEM_TABLE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,26 @@ public class K8sMetadata extends AbstractSchema {

private final Map<String, Table> tableMap = new HashMap<>();
private final K8sDatabaseTable databaseTable;
private final K8sEngineTable engineTable;
private final K8sViewTable viewTable;

public K8sMetadata(K8sContext context) {
this.databaseTable = new K8sDatabaseTable(context);
this.engineTable = new K8sEngineTable(context);
this.databaseTable = new K8sDatabaseTable(context, engineTable);
this.viewTable = new K8sViewTable(context);
tableMap.put("DATABASES", databaseTable);
tableMap.put("ENGINES", engineTable);
tableMap.put("VIEWS", viewTable);
}

public K8sDatabaseTable databaseTable() {
return databaseTable;
}

public K8sEngineTable engineTable() {
return engineTable;
}

public K8sViewTable viewTable() {
return viewTable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* Database metadata.
*/
@ApiModel(description = "Database metadata.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-09T16:52:51.758Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-14T23:39:16.570Z[Etc/UTC]")
public class V1alpha1Database implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* DatabaseList is a list of Database
*/
@ApiModel(description = "DatabaseList is a list of Database")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-09T16:52:51.758Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-14T23:39:16.570Z[Etc/UTC]")
public class V1alpha1DatabaseList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Database spec.
*/
@ApiModel(description = "Database spec.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-12-09T16:52:51.758Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-01-14T23:39:16.570Z[Etc/UTC]")
public class V1alpha1DatabaseSpec {
/**
* SQL dialect the driver expects.
Expand Down
Loading
Loading