Add coral-beam for converting Calcite logical plan into Apache Beam API Java code #21

Open · wants to merge 3 commits into master
README.md (6 changes: 5 additions & 1 deletion)
@@ -6,7 +6,8 @@
**Coral** is a library for analyzing, processing, and rewriting views defined in the Hive Metastore, and sharing them
across multiple execution engines. It performs SQL translations to enable views expressed in HiveQL (and potentially
other languages) to be accessible in engines such as [Presto](https://prestosql.io/),
[Apache Spark](https://spark.apache.org/), and [Apache Pig](https://pig.apache.org/).
[Apache Spark](https://spark.apache.org/), [Apache Pig](https://pig.apache.org/),
or Java streaming pipelines with [Apache Beam](https://beam.apache.org/).
Coral not only translates view definitions between different SQL/non-SQL dialects, but also rewrites expressions to
produce semantically equivalent ones, taking into account the semantics of the target language or engine.
For example, it automatically composes new built-in expressions that are equivalent to each built-in expression in the
@@ -23,6 +24,8 @@ and implementing query rewrite algorithms for data governance and query optimization.
- Coral-Pig: Converts view logical plan to Pig-latin.
- Coral-Schema: Derives Avro schema of view using view logical plan and input Avro schemas of base tables.
- Coral-Spark-Plan: Converts Spark plan strings to equivalent logical plan (in progress).
- Coral-Beam: Converts view logical plan to Beam Java API streaming code.
- Coral-Beam-Runtime: Runtime module supporting the code generated by Coral-Beam.

## How to Build
Clone the repository:
@@ -40,3 +43,4 @@ Please see the [Contribution Agreement](CONTRIBUTING.md).

## Resources
- [Coral & Transport UDFs: Building Blocks of a Postmodern Data Warehouse](https://www.slideshare.net/walaa_eldin_moustafa/coral-transport-udfs-building-blocks-of-a-postmodern-data-warehouse-229545076), Tech-talk, Facebook HQ, 2/28/2020.
- [Bridging Offline and Nearline Computations with Apache Calcite](https://engineering.linkedin.com/blog/2019/01/bridging-offline-and-nearline-computations-with-apache-calcite), LinkedIn Engineering Blogs 01/29/2019.
build.gradle (4 changes: 4 additions & 0 deletions)
@@ -26,6 +26,10 @@ allprojects {
maven {
url 'https://linkedin.bintray.com/maven/'
}
maven {
url "https://packages.confluent.io/maven/"
content { includeGroup "io.confluent" }
}
}
}

coral-beam-runtime/build.gradle (14 changes: 14 additions & 0 deletions)
@@ -0,0 +1,14 @@
apply plugin: 'java'

dependencies {
compile deps.'beam'.'java-core'
compile deps.'beam'.'java-io-kafka'
compile deps.'pig'.'pig'
compile deps.'hadoop'.'hadoop-common'
compile 'org.apache.avro:avro:1.7.7'
compile 'log4j:log4j:1.2.17'
}

artifacts {
archives jar, javadocJar, sourcesJar
}
BeamAPIUtil.java (new file)
@@ -0,0 +1,52 @@
/**
* Copyright 2019 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.beam.execution;

import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.log4j.Logger;
import org.joda.time.Instant;


public class BeamAPIUtil {
private static final Logger LOG = Logger.getLogger(BeamAPIUtil.class.getName());

private BeamAPIUtil() {
}

  /**
   * Returns a timestamp function that reads the event time of a keyed Avro record from the
   * given (possibly nested) path of field names, falling back to the current processing
   * time when no path is given or the field is missing.
   *
   * @param timeField Path of field names leading to the time value, outermost field first
   * @return Function mapping an input record to its event-time {@link Instant}
   */
  public static SerializableFunction<KV<String, GenericRecord>, Instant> withTimeFunction(List<String> timeField) {
    return input -> {
      final Instant now = Instant.now();
      if (timeField == null || timeField.isEmpty()) {
        // No time field specified; return the current processing time
        return now;
      }
GenericRecord record = input.getValue();
for (int i = 0; i < timeField.size(); i++) {
Object value = record.get(timeField.get(i));
if (value == null) {
          LOG.warn("Cannot get event time for input record; using current processing time instead. Time field: "
              + timeField + ". Record: " + input.getValue());
return now;
}
if (i < timeField.size() - 1) {
if (!(value instanceof GenericRecord)) {
throw new RuntimeException(
"Invalid schema for time field. Time field: " + timeField + ". Record schema: " + input.getValue().getSchema());
}
record = (GenericRecord) value;
continue;
}
return new Instant(value);
}

      // Unreachable: the loop above always returns
      return null;
};
}
}
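
A minimal usage sketch (not part of this PR), showing how the timestamp helper above can be applied in a Beam pipeline via WithTimestamps. The package, class, and the field path ("header", "time") are hypothetical:

package com.example.sketch; // hypothetical package, for illustration only

import java.util.Arrays;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import com.linkedin.beam.execution.BeamAPIUtil;

public class EventTimeExample {
  // Assigns event time from the (hypothetical) nested field path header.time of each
  // record; withTimeFunction falls back to processing time when the field is missing.
  static PCollection<KV<String, GenericRecord>> withEventTime(
      PCollection<KV<String, GenericRecord>> records) {
    return records.apply("AssignEventTime",
        WithTimestamps.of(BeamAPIUtil.withTimeFunction(Arrays.asList("header", "time"))));
  }
}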
BeamArrayFlatten.java (new file)
@@ -0,0 +1,153 @@
/**
* Copyright 2019 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.beam.execution;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.values.KV;


public class BeamArrayFlatten {
private BeamArrayFlatten() {
}

  /**
   * Flattens a Beam KV pair of String and Avro record on a given list of array columns in the
   * Avro record. These columns must have ARRAY schema type, and the input values for those
   * columns must be either Java arrays or Collections.
   *
   * @param inputKV Input Beam KV pair of String and Avro record
   * @param flattenCols Columns to be flattened
   * @param outputSchema Schema of output records
   * @return Collection of output records
   */
public static Collection<KV<String, GenericRecord>> flatten(KV<String, GenericRecord> inputKV,
List<String> flattenCols, Schema outputSchema) {
final List<KV<String, GenericRecord>> results = new ArrayList<>();
final String key = inputKV.getKey();
final Collection<GenericRecord> resultRecords = flatten(inputKV.getValue(), flattenCols, outputSchema);
for (GenericRecord record : resultRecords) {
results.add(KV.of(key, record));
}
return results;
}

  /**
   * Flattens a record on a given list of array columns. These columns must have ARRAY schema
   * type, and the input values for those columns must be either Java arrays or Collections.
   *
   * @param inputRecord Input Avro record
   * @param flattenCols Columns to be flattened
   * @param outputSchema Schema of output records
   * @return Collection of output records
   */
public static Collection<GenericRecord> flatten(GenericRecord inputRecord, List<String> flattenCols,
Schema outputSchema) {
List<GenericRecord> results = new ArrayList<>();
if (inputRecord == null) {
return results;
}
Schema inputSchema = inputRecord.getSchema();
Map<String, Object> partialRecord = new HashMap<>();
for (Schema.Field field : inputSchema.getFields()) {
partialRecord.put(field.name(), inputRecord.get(field.name()));
}
List<Map<String, Object>> partialResults = new ArrayList<>();
partialResults.add(partialRecord);

for (String flattenCol : flattenCols) {
Schema flattenSchema = getFlattenElementSchema(inputSchema, flattenCol);
if (flattenSchema == null) {
throw new RuntimeException("Column " + flattenCol + " is not an array in records"
+ " with schema: " + inputSchema);
}
Object obj = inputRecord.get(flattenCol);
List<Object> arrayValue = null;
if (obj != null) {
if (obj.getClass().isArray()) {
arrayValue = Arrays.asList((Object[]) obj);
} else if (obj instanceof Collection) {
if (obj instanceof List) {
arrayValue = (List<Object>) obj;
} else {
arrayValue = new ArrayList<>((Collection<Object>) obj);
}
} else {
throw new RuntimeException("Invalid avro array value. Value type: " + obj.getClass().getName());
}
}
partialResults = extendFlattenRecords(partialResults, flattenCol, arrayValue, flattenSchema);
}

for (Map<String, Object> outputRec : partialResults) {
GenericRecord record = new GenericData.Record(outputSchema);
for (Schema.Field field : outputSchema.getFields()) {
record.put(field.name(), outputRec.get(field.name()));
}
results.add(record);
}
return results;
}

private static List<Map<String, Object>> extendFlattenRecords(List<Map<String, Object>> partialResults,
String flattenCol, List<Object> flattenValue, Schema flattenSchema) {
// Special optimization: no need to create new Map and List if array has only one value
if (flattenValue == null || flattenValue.size() <= 1) {
Object value = (flattenValue == null || flattenValue.isEmpty()) ? null : flattenValue.get(0);
for (Map<String, Object> partialRecord : partialResults) {
projectNestedFields(value, flattenCol, flattenSchema, partialRecord);
}
return partialResults;
}

List<Map<String, Object>> results = new ArrayList<>();
for (Map<String, Object> partialRecord : partialResults) {
for (Object value : flattenValue) {
Map<String, Object> newRecord = new HashMap<>(partialRecord);
projectNestedFields(value, flattenCol, flattenSchema, newRecord);
results.add(newRecord);
}
}
return results;
}

private static Schema getFlattenElementSchema(Schema recordSchema, String flattenCol) {
if (recordSchema.getField(flattenCol) == null) {
return null;
}
Schema colSchema = recordSchema.getField(flattenCol).schema();
if (colSchema.getType() == Schema.Type.ARRAY) {
return colSchema.getElementType();
}
if (colSchema.getType() == Schema.Type.UNION) {
for (Schema typeSchema : colSchema.getTypes()) {
if (typeSchema.getType() == Schema.Type.ARRAY) {
return typeSchema.getElementType();
}
}
}

return null;
}

private static void projectNestedFields(Object nestedValue, String flattenCol, Schema flattenSchema,
Map<String, Object> partialRecord) {
if (flattenSchema.getType() == Schema.Type.RECORD) {
for (Schema.Field field : flattenSchema.getFields()) {
Object fieldValue = (nestedValue != null) ? ((GenericRecord) nestedValue).get(field.name()) : null;
partialRecord.put(field.name(), fieldValue);
}
} else {
partialRecord.put(flattenCol, nestedValue);
}
}
}
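
A minimal usage sketch (not part of this PR), showing how the flatten helper above can be invoked from Beam's FlatMapElements. The package, class, and the array column name "tags" are hypothetical, and the output schema is passed as a JSON string (Avro's Schema is not serializable in the Avro 1.7 line, so the lambda captures a String instead):

package com.example.sketch; // hypothetical package, for illustration only

import java.util.Collections;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import com.linkedin.beam.execution.BeamArrayFlatten;

public class FlattenExample {
  // Emits one output record per element of the array column "tags", copying the
  // remaining fields of each input record into every output record.
  static PCollection<KV<String, GenericRecord>> flattenTags(
      PCollection<KV<String, GenericRecord>> records, String outputSchemaJson) {
    return records.apply("FlattenTags",
        FlatMapElements
            .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(GenericRecord.class)))
            .via((KV<String, GenericRecord> kv) -> BeamArrayFlatten.flatten(
                kv, Collections.singletonList("tags"), new Schema.Parser().parse(outputSchemaJson))));
  }
}

A production version would cache the parsed Schema rather than re-parsing it for every element.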