Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Flink] Bump Flink to 1.19.1 #3335

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ connectServer / sparkVersion := getSparkVersion()

// Dependent library versions
val defaultSparkVersion = LATEST_RELEASED_SPARK_VERSION
val flinkVersion = "1.16.1"
val flinkVersion = "1.19.1"
val hadoopVersion = "3.3.4"
val scalaTestVersion = "3.2.15"
val scalaTestVersionForConnectors = "3.0.8"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.delta.flink.internal.table.DeltaCatalogTableHelper.DeltaMetastoreTable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.api.Schema;
Expand Down Expand Up @@ -218,8 +217,8 @@ public void createTable(DeltaCatalogBaseTable catalogTable, boolean ignoreIfExis
}

// Add table to metastore
DeltaMetastoreTable metastoreTable =
DeltaCatalogTableHelper.prepareMetastoreTable(table, deltaTablePath);
ResolvedCatalogTable metastoreTable =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs to be a ResolvedCatalogTable since apache/flink@3d35048

DeltaCatalogTableHelper.prepareResolvedMetastoreTable(table, deltaTablePath);
this.decoratedCatalog.createTable(tableCatalogPath, metastoreTable, ignoreIfExists);
} else {
// Table does not exist on filesystem, we have to create a new _delta_log
Expand All @@ -237,8 +236,8 @@ public void createTable(DeltaCatalogBaseTable catalogTable, boolean ignoreIfExis
Operation.Name.CREATE_TABLE
);

DeltaMetastoreTable metastoreTable =
DeltaCatalogTableHelper.prepareMetastoreTable(table, deltaTablePath);
ResolvedCatalogTable metastoreTable =
DeltaCatalogTableHelper.prepareResolvedMetastoreTable(table, deltaTablePath);

// add table to metastore
this.decoratedCatalog.createTable(tableCatalogPath, metastoreTable, ignoreIfExists);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.table.catalog.Column.PhysicalColumn;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
Expand Down Expand Up @@ -227,7 +228,10 @@ public static Map<String, String> filterMetastoreDdlOptions(Map<String, String>
/**
* Prepare catalog table to store in metastore. This table will have only selected
* options from DDL and an empty schema.
*
* @deprecated Use {@link #prepareResolvedMetastoreTable(CatalogBaseTable, String)} instead.
*/
@Deprecated
public static DeltaMetastoreTable prepareMetastoreTable(
CatalogBaseTable table,
String deltaTablePath) {
Expand Down Expand Up @@ -258,6 +262,42 @@ public static DeltaMetastoreTable prepareMetastoreTable(
);
}

/**
* Prepare catalog table to store in metastore. This table will have only selected
* options from DDL and an empty schema.
*/
/**
 * Prepare catalog table to store in metastore. This table will have only selected
 * options from DDL and an empty schema.
 *
 * <p>Only the connector identifier and the Delta table path are persisted as table
 * options; the schema is deliberately left empty because the Delta log, not the
 * metastore, is the source of truth for it. The result is wrapped in a
 * {@link ResolvedCatalogTable} because newer Flink catalog APIs expect a resolved
 * table when registering it (NOTE(review): per the PR discussion this follows an
 * upstream Flink change — confirm against the targeted Flink version).
 *
 * @param table          the catalog table coming from the DDL statement; only its
 *                       comment is carried over.
 * @param deltaTablePath filesystem path of the Delta table.
 * @return a resolved catalog table with an empty schema and minimal options.
 */
public static ResolvedCatalogTable prepareResolvedMetastoreTable(
    CatalogBaseTable table,
    String deltaTablePath) {

    // Persist only the connector type and the table path in the metastore.
    // Computed and metadata columns are not supported.
    Map<String, String> metastoreOptions = new HashMap<>();
    metastoreOptions.put(
        FactoryUtil.CONNECTOR.key(),
        DeltaDynamicTableFactory.DELTA_CONNECTOR_IDENTIFIER);
    metastoreOptions.put(
        DeltaTableConnectorOptions.TABLE_PATH.key(),
        deltaTablePath);

    // By design the schema is not stored in the metastore. Watermarks and primary
    // keys are likewise omitted and, for now, not supported by the Delta connector
    // SQL integration.
    CatalogTable catalogTable = CatalogTable.of(
        Schema.newBuilder().build(),
        table.getComment(),
        Collections.emptyList(),
        metastoreOptions
    );
    return new ResolvedCatalogTable(catalogTable, ResolvedSchema.of());
}


/**
* Validates DDL options against existing delta table properties. If there is any mismatch (i.e.
* same key, different value) and `allowOverride` is set to false throws an exception. Else,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import io.delta.flink.source.internal.DeltaSourceOptions;
import io.delta.flink.source.internal.builder.DeltaSourceBuilderBase;
import org.apache.hadoop.conf.Configuration;
import org.codehaus.janino.util.Producer;
import org.codehaus.commons.compiler.util.Producer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
Expand Down
Loading