Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions presto-clp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-spi</artifactId>
Expand Down Expand Up @@ -154,4 +159,14 @@
<scope>test</scope>
</dependency>
</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>2.1</version> <!-- upper bound -->
</dependency>
</dependencies>
</dependencyManagement>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class ClpConfig
private long metadataRefreshInterval = 60;
private long metadataExpireInterval = 600;

private String metadataYamlPath;

private String splitFilterConfig;
private SplitFilterProviderType splitFilterProviderType = SplitFilterProviderType.MYSQL;
private SplitProviderType splitProviderType = SplitProviderType.MYSQL;
Expand Down Expand Up @@ -151,6 +153,18 @@ public ClpConfig setMetadataExpireInterval(long metadataExpireInterval)
return this;
}

public String getMetadataYamlPath()
{
return metadataYamlPath;
}

@Config("clp.metadata-yaml-path")
public ClpConfig setMetadataYamlPath(String metadataYamlPath)
{
this.metadataYamlPath = metadataYamlPath;
return this;
}

public String getSplitFilterConfig()
{
return splitFilterConfig;
Expand Down Expand Up @@ -189,7 +203,8 @@ public ClpConfig setSplitProviderType(SplitProviderType splitProviderType)

public enum MetadataProviderType
{
MYSQL
MYSQL,
YAML
}

public enum SplitFilterProviderType
Expand All @@ -199,6 +214,7 @@ public enum SplitFilterProviderType

public enum SplitProviderType
{
MYSQL
MYSQL,
PINOT
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public enum ClpErrorCode
CLP_UNSUPPORTED_SPLIT_SOURCE(2, EXTERNAL),
CLP_UNSUPPORTED_TYPE(3, EXTERNAL),
CLP_UNSUPPORTED_CONFIG_OPTION(4, EXTERNAL),
CLP_UNSUPPORTED_TABLE_SCHEMA_YAML(5, EXTERNAL),

CLP_SPLIT_FILTER_CONFIG_NOT_FOUND(10, USER_ERROR),
CLP_MANDATORY_SPLIT_FILTER_NOT_VALID(11, USER_ERROR),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import com.facebook.airlift.configuration.AbstractConfigurationAwareModule;
import com.facebook.presto.plugin.clp.metadata.ClpMetadataProvider;
import com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider;
import com.facebook.presto.plugin.clp.metadata.ClpYamlMetadataProvider;
import com.facebook.presto.plugin.clp.split.ClpMySqlSplitProvider;
import com.facebook.presto.plugin.clp.split.ClpPinotSplitProvider;
import com.facebook.presto.plugin.clp.split.ClpSplitProvider;
import com.facebook.presto.plugin.clp.split.filter.ClpMySqlSplitFilterProvider;
import com.facebook.presto.plugin.clp.split.filter.ClpSplitFilterProvider;
Expand Down Expand Up @@ -56,13 +58,19 @@ protected void setup(Binder binder)
if (config.getMetadataProviderType() == MetadataProviderType.MYSQL) {
binder.bind(ClpMetadataProvider.class).to(ClpMySqlMetadataProvider.class).in(Scopes.SINGLETON);
}
else if (config.getMetadataProviderType() == MetadataProviderType.YAML) {
binder.bind(ClpMetadataProvider.class).to(ClpYamlMetadataProvider.class).in(Scopes.SINGLETON);
}
else {
throw new PrestoException(CLP_UNSUPPORTED_METADATA_SOURCE, "Unsupported metadata provider type: " + config.getMetadataProviderType());
}

if (config.getSplitProviderType() == SplitProviderType.MYSQL) {
binder.bind(ClpSplitProvider.class).to(ClpMySqlSplitProvider.class).in(Scopes.SINGLETON);
}
else if (config.getSplitProviderType() == SplitProviderType.PINOT) {
binder.bind(ClpSplitProvider.class).to(ClpPinotSplitProvider.class).in(Scopes.SINGLETON);
}
else {
throw new PrestoException(CLP_UNSUPPORTED_SPLIT_SOURCE, "Unsupported split provider type: " + config.getSplitProviderType());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.clp.metadata;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.plugin.clp.ClpColumnHandle;
import com.facebook.presto.plugin.clp.ClpConfig;
import com.facebook.presto.plugin.clp.ClpTableHandle;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import javax.inject.Inject;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.facebook.presto.plugin.clp.ClpConnectorFactory.CONNECTOR_NAME;
import static com.facebook.presto.plugin.clp.ClpErrorCode.CLP_UNSUPPORTED_TABLE_SCHEMA_YAML;
import static java.lang.String.format;

public class ClpYamlMetadataProvider
implements ClpMetadataProvider
{
private static final Logger log = Logger.get(ClpYamlMetadataProvider.class);
private final ClpConfig config;
private Map<SchemaTableName, String> tableSchemaYamlMap;

@Inject
public ClpYamlMetadataProvider(ClpConfig config)
{
this.config = config;
}

@Override
public List<ClpColumnHandle> listColumnHandles(SchemaTableName schemaTableName)
{
Path tableSchemaPath = Paths.get(tableSchemaYamlMap.get(schemaTableName));
ClpSchemaTree schemaTree = new ClpSchemaTree(config.isPolymorphicTypeEnabled());
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());

try {
Map<String, Object> root = mapper.readValue(
new File(tableSchemaPath.toString()),
new TypeReference<HashMap<String, Object>>() {});
ImmutableList.Builder<String> namesBuilder = ImmutableList.builder();
ImmutableList.Builder<Byte> typesBuilder = ImmutableList.builder();
collectTypes(root, "", namesBuilder, typesBuilder);
ImmutableList<String> names = namesBuilder.build();
ImmutableList<Byte> types = typesBuilder.build();
// The names and types should have same sizes
for (int i = 0; i < names.size(); i++) {
schemaTree.addColumn(names.get(i), types.get(i));
}
return schemaTree.collectColumnHandles();
}
catch (IOException e) {
log.error(format("Failed to parse table schema file %s, error: %s", tableSchemaPath, e.getMessage()), e);
}
return Collections.emptyList();
}

@Override
public List<ClpTableHandle> listTableHandles(String schemaName)
{
Path tablesSchemaPath = Paths.get(config.getMetadataYamlPath());
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());

try {
Map<String, Object> root = mapper.readValue(new File(tablesSchemaPath.toString()),
new TypeReference<HashMap<String, Object>>() {});

Object catalogObj = root.get(CONNECTOR_NAME);
if (!(catalogObj instanceof Map)) {
throw new PrestoException(CLP_UNSUPPORTED_TABLE_SCHEMA_YAML, format("The table schema does not contain field: %s", CONNECTOR_NAME));
}
Object schemaObj = ((Map<String, Object>) catalogObj).get(schemaName);
ImmutableList.Builder<ClpTableHandle> tableHandlesBuilder = new ImmutableList.Builder<>();
ImmutableMap.Builder<SchemaTableName, String> tableSchemaYamlMapBuilder = new ImmutableMap.Builder<>();
for (Map.Entry<String, Object> schemaEntry : ((Map<String, Object>) schemaObj).entrySet()) {
String tableName = schemaEntry.getKey();
String tableSchemaYamlPath = schemaEntry.getValue().toString();
// The splits' absolute paths will be stored in Pinot metadata database
SchemaTableName schemaTableName = new SchemaTableName(schemaName, tableName);
tableHandlesBuilder.add(new ClpTableHandle(schemaTableName, ""));
tableSchemaYamlMapBuilder.put(schemaTableName, tableSchemaYamlPath);
}
this.tableSchemaYamlMap = tableSchemaYamlMapBuilder.build();
return tableHandlesBuilder.build();
}
catch (IOException e) {
log.error(format("Failed to parse metadata file: %s, error: %s", config.getMetadataYamlPath(), e.getMessage()), e);
}
return Collections.emptyList();
}

private void collectTypes(Object node, String prefix, ImmutableList.Builder<String> namesBuilder, ImmutableList.Builder<Byte> typesBuilder)
{
if (node instanceof Number) {
namesBuilder.add(prefix);
typesBuilder.add(((Number) node).byteValue());
return;
}
if (node instanceof List) {
for (Number type : (List<Number>) node) {
namesBuilder.add(prefix);
typesBuilder.add(type.byteValue());
}
return;
}
for (Map.Entry<String, Object> entry : ((Map<String, Object>) node).entrySet()) {
if (!prefix.isEmpty()) {
collectTypes(entry.getValue(), format("%s.%s", prefix, entry.getKey()), namesBuilder, typesBuilder);
continue;
}
collectTypes(entry.getValue(), entry.getKey(), namesBuilder, typesBuilder);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
import static com.facebook.presto.common.type.TimestampType.TIMESTAMP_MICROSECONDS;
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static com.facebook.presto.plugin.clp.ClpErrorCode.CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION;
import static com.facebook.presto.spi.relation.SpecialFormExpression.Form.AND;
Expand Down Expand Up @@ -914,6 +916,8 @@ public static boolean isClpCompatibleNumericType(Type type)
|| type.equals(TINYINT)
|| type.equals(DOUBLE)
|| type.equals(REAL)
|| type.equals(TIMESTAMP)
|| type.equals(TIMESTAMP_MICROSECONDS)
|| type instanceof DecimalType;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.clp.split;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.plugin.clp.ClpConfig;
import com.facebook.presto.plugin.clp.ClpSplit;
import com.facebook.presto.plugin.clp.ClpTableHandle;
import com.facebook.presto.plugin.clp.ClpTableLayoutHandle;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;

import javax.inject.Inject;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import static com.facebook.presto.plugin.clp.ClpSplit.SplitType;
import static com.facebook.presto.plugin.clp.ClpSplit.SplitType.ARCHIVE;
import static com.facebook.presto.plugin.clp.ClpSplit.SplitType.IR;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;

public class ClpPinotSplitProvider
implements ClpSplitProvider
{
private static final Logger log = Logger.get(ClpPinotSplitProvider.class);
private static final String SQL_SELECT_SPLITS_TEMPLATE = "SELECT tpath FROM %s WHERE 1 = 1 AND (%s) LIMIT 999999";
private final ClpConfig config;

@Inject
public ClpPinotSplitProvider(ClpConfig config)
{
this.config = config;
}

@Override
public List<ClpSplit> listSplits(ClpTableLayoutHandle clpTableLayoutHandle)
{
ImmutableList.Builder<ClpSplit> splits = new ImmutableList.Builder<>();
ClpTableHandle clpTableHandle = clpTableLayoutHandle.getTable();
String tableName = clpTableHandle.getSchemaTableName().getTableName();
try {
URL url = new URL(config.getMetadataDbUrl() + "/query/sql");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
conn.setRequestProperty("Content-Type", "application/json");
conn.setRequestProperty("Accept", "application/json");
conn.setDoOutput(true);
conn.setConnectTimeout((int) SECONDS.toMillis(5));
conn.setReadTimeout((int) SECONDS.toMillis(30));

String query = format(SQL_SELECT_SPLITS_TEMPLATE, tableName, clpTableLayoutHandle.getMetadataSql().orElse("1 = 1"));
log.info("Pinot query: %s", query);
ObjectMapper mapper = new ObjectMapper();
String body = format("{\"sql\": %s }", mapper.writeValueAsString(query));
try (OutputStream os = conn.getOutputStream()) {
os.write(body.getBytes(StandardCharsets.UTF_8));
}

int code = conn.getResponseCode();
InputStream is = (code >= 200 && code < 300) ? conn.getInputStream() : conn.getErrorStream();
if (is == null) {
throw new IOException("Pinot HTTP " + code + " with empty body");
}

JsonNode root;
try (InputStream in = is) {
root = mapper.readTree(in);
}
JsonNode resultTable = root.get("resultTable");
if (resultTable == null) {
throw new RuntimeException("No \"resultTable\" field found");
}
JsonNode rows = resultTable.get("rows");
if (rows == null) {
throw new RuntimeException("No \"rows\" field found");
}
for (Iterator<JsonNode> it = rows.elements(); it.hasNext(); ) {
JsonNode row = it.next();
String splitPath = row.elements().next().asText();
SplitType splitType = splitPath.endsWith(".clp.zst") ? IR : ARCHIVE;
splits.add(new ClpSplit(splitPath, splitType, clpTableLayoutHandle.getKqlQuery()));
}
List<ClpSplit> filteredSplits = splits.build();
log.debug("Number of filtered splits: %s", filteredSplits.size());
return filteredSplits;
}
catch (Exception e) {
log.error(e);
}

return Collections.emptyList();
}
}
Loading
Loading