Skip to content

Commit

Permalink
[FLINK-36484] [table-api] Remove deprecated methods StreamTableEnviro…
Browse files Browse the repository at this point in the history
…nment#registerFunction (#25529)

* feat: remove deprecated function

* fix: remove-deprecated-methods

* fix: remove-deprecated-methods

---------

Co-authored-by: Ammu Parvathy <[email protected]>
  • Loading branch information
ammu20-dev and Ammu Parvathy authored Jan 3, 2025
1 parent fafd86c commit 4263ad3
Show file tree
Hide file tree
Showing 4 changed files with 0 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
Expand Down Expand Up @@ -122,56 +118,6 @@ static StreamTableEnvironment create(
return StreamTableEnvironmentImpl.create(executionEnvironment, settings);
}

/**
* Registers a {@link TableFunction} under a unique name in the TableEnvironment's catalog.
* Registered functions can be referenced in Table API and SQL queries.
*
* @param name The name under which the function is registered.
* @param tableFunction The TableFunction to register.
* @param <T> The type of the output row.
* @deprecated Use {@link #createTemporarySystemFunction(String, UserDefinedFunction)} instead.
* Please note that the new method also uses the new type system and reflective extraction
* logic. It might be necessary to update the function implementation as well. See the
* documentation of {@link TableFunction} for more information on the new function design.
*/
@Deprecated
<T> void registerFunction(String name, TableFunction<T> tableFunction);

/**
* Registers an {@link AggregateFunction} under a unique name in the TableEnvironment's catalog.
* Registered functions can be referenced in Table API and SQL queries.
*
* @param name The name under which the function is registered.
* @param aggregateFunction The AggregateFunction to register.
* @param <T> The type of the output value.
* @param <ACC> The type of aggregate accumulator.
* @deprecated Use {@link #createTemporarySystemFunction(String, UserDefinedFunction)} instead.
* Please note that the new method also uses the new type system and reflective extraction
* logic. It might be necessary to update the function implementation as well. See the
* documentation of {@link AggregateFunction} for more information on the new function
* design.
*/
@Deprecated
<T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction);

/**
* Registers an {@link TableAggregateFunction} under a unique name in the TableEnvironment's
* catalog. Registered functions can only be referenced in Table API.
*
* @param name The name under which the function is registered.
* @param tableAggregateFunction The TableAggregateFunction to register.
* @param <T> The type of the output value.
* @param <ACC> The type of aggregate accumulator.
* @deprecated Use {@link #createTemporarySystemFunction(String, UserDefinedFunction)} instead.
* Please note that the new method also uses the new type system and reflective extraction
* logic. It might be necessary to update the function implementation as well. See the
* documentation of {@link TableAggregateFunction} for more information on the new function
* design.
*/
@Deprecated
<T, ACC> void registerFunction(
String name, TableAggregateFunction<T, ACC> tableAggregateFunction);

/**
* Converts the given {@link DataStream} into a {@link Table}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@
import org.apache.flink.table.factories.CatalogStoreFactory;
import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.factories.TableFactoryUtil;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.ExternalQueryOperation;
import org.apache.flink.table.operations.OutputConversionModifyOperation;
Expand Down Expand Up @@ -173,39 +169,6 @@ public static StreamTableEnvironment create(
settings.isStreamingMode());
}

@Override
public <T> void registerFunction(String name, TableFunction<T> tableFunction) {
TypeInformation<T> typeInfo =
UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction);

functionCatalog.registerTempSystemTableFunction(name, tableFunction, typeInfo);
}

@Override
public <T, ACC> void registerFunction(
String name, AggregateFunction<T, ACC> aggregateFunction) {
TypeInformation<T> typeInfo =
UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(aggregateFunction);
TypeInformation<ACC> accTypeInfo =
UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(aggregateFunction);

functionCatalog.registerTempSystemAggregateFunction(
name, aggregateFunction, typeInfo, accTypeInfo);
}

@Override
public <T, ACC> void registerFunction(
String name, TableAggregateFunction<T, ACC> tableAggregateFunction) {
TypeInformation<T> typeInfo =
UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(tableAggregateFunction);
TypeInformation<ACC> accTypeInfo =
UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(
tableAggregateFunction);

functionCatalog.registerTempSystemAggregateFunction(
name, tableAggregateFunction, typeInfo, accTypeInfo);
}

@Override
public <T> Table fromDataStream(DataStream<T> dataStream) {
return fromStreamInternal(dataStream, null, null, ChangelogMode.insertOnly());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,72 +60,6 @@ import org.apache.flink.types.{Row, RowKind}
@PublicEvolving
trait StreamTableEnvironment extends TableEnvironment {

/**
* Registers a [[TableFunction]] under a unique name in the TableEnvironment's catalog. Registered
* functions can be referenced in SQL queries.
*
* @param name
* The name under which the function is registered.
* @param tf
* The TableFunction to register
*
* @deprecated
* Use [[createTemporarySystemFunction(String, UserDefinedFunction)]] instead. Please note that
* the new method also uses the new type system and reflective extraction logic. It might be
* necessary to update the function implementation as well. See the documentation of
* [[TableFunction]] for more information on the new function design.
*/
@deprecated
def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit

/**
* Registers an [[AggregateFunction]] under a unique name in the TableEnvironment's catalog.
* Registered functions can be referenced in Table API and SQL queries.
*
* @param name
* The name under which the function is registered.
* @param f
* The AggregateFunction to register.
* @tparam T
* The type of the output value.
* @tparam ACC
* The type of aggregate accumulator.
*
* @deprecated
* Use [[createTemporarySystemFunction(String, UserDefinedFunction)]] instead. Please note that
* the new method also uses the new type system and reflective extraction logic. It might be
* necessary to update the function implementation as well. See the documentation of
* [[AggregateFunction]] for more information on the new function design.
*/
@deprecated
def registerFunction[T: TypeInformation, ACC: TypeInformation](
name: String,
f: AggregateFunction[T, ACC]): Unit

/**
* Registers an [[TableAggregateFunction]] under a unique name in the TableEnvironment's catalog.
* Registered functions can only be referenced in Table API.
*
* @param name
* The name under which the function is registered.
* @param f
* The TableAggregateFunction to register.
* @tparam T
* The type of the output value.
* @tparam ACC
* The type of aggregate accumulator.
*
* @deprecated
* Use [[createTemporarySystemFunction(String, UserDefinedFunction)]] instead. Please note that
* the new method also uses the new type system and reflective extraction logic. It might be
* necessary to update the function implementation as well. See the documentation of
* [[TableAggregateFunction]] for more information on the new function design.
*/
@deprecated
def registerFunction[T: TypeInformation, ACC: TypeInformation](
name: String,
f: TableAggregateFunction[T, ACC]): Unit

/**
* Converts the given [[DataStream]] into a [[Table]].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,46 +224,6 @@ class StreamTableEnvironmentImpl(
toStreamInternal(table, modifyOperation)
}

override def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = {
val typeInfo = UserDefinedFunctionHelper
.getReturnTypeOfTableFunction(tf, implicitly[TypeInformation[T]])
functionCatalog.registerTempSystemTableFunction(
name,
tf,
typeInfo
)
}

override def registerFunction[T: TypeInformation, ACC: TypeInformation](
name: String,
f: AggregateFunction[T, ACC]): Unit = {
val typeInfo = UserDefinedFunctionHelper
.getReturnTypeOfAggregateFunction(f, implicitly[TypeInformation[T]])
val accTypeInfo = UserDefinedFunctionHelper
.getAccumulatorTypeOfAggregateFunction(f, implicitly[TypeInformation[ACC]])
functionCatalog.registerTempSystemAggregateFunction(
name,
f,
typeInfo,
accTypeInfo
)
}

override def registerFunction[T: TypeInformation, ACC: TypeInformation](
name: String,
f: TableAggregateFunction[T, ACC]): Unit = {
val typeInfo = UserDefinedFunctionHelper
.getReturnTypeOfAggregateFunction(f, implicitly[TypeInformation[T]])
val accTypeInfo = UserDefinedFunctionHelper
.getAccumulatorTypeOfAggregateFunction(f, implicitly[TypeInformation[ACC]])
functionCatalog.registerTempSystemAggregateFunction(
name,
f,
typeInfo,
accTypeInfo
)
}

override def createTemporaryView[T](
path: String,
dataStream: DataStream[T],
Expand Down

0 comments on commit 4263ad3

Please sign in to comment.