partitionPaths =
+ PartitionPathUtils.generatePartitionPaths(
+ getPartitions(partitionStrings), fileStoreTable.store().partitionType());
+
+ markDone(partitionPaths, actions);
+
+ IOUtils.closeAllQuietly(actions);
+
+ return new String[] {"Success"};
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
new file mode 100644
index 0000000000000..acda2afd2e697
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.MergeIntoAction;
+
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * Merge Into procedure. Usage:
+ *
+ *
+ * -- NOTE: use '' as placeholder for optional arguments
+ *
+ * -- when matched then upsert
+ * CALL sys.merge_into(
+ * 'targetTableId',
+ * 'targetAlias',
+ * 'sourceSqls', -- separate with ';'
+ * 'sourceTable',
+ * 'mergeCondition',
+ * 'matchedUpsertCondition',
+ * 'matchedUpsertSetting'
+ * )
+ *
+ * -- when matched then upsert + when not matched then insert
+ * CALL sys.merge_into(
+ * 'targetTableId'
+ * 'targetAlias',
+ * 'sourceSqls',
+ * 'sourceTable',
+ * 'mergeCondition',
+ * 'matchedUpsertCondition',
+ * 'matchedUpsertSetting',
+ * 'notMatchedInsertCondition',
+ * 'notMatchedInsertValues'
+ * )
+ *
+ * -- above + when matched then delete
+ * -- IMPORTANT: Use 'TRUE' if you want to delete data without filter condition.
+ * -- If matchedDeleteCondition='', it will ignore matched-delete action!
+ * CALL sys.merge_into(
+ * 'targetTableId',
+ * 'targetAlias',
+ * 'sourceSqls',
+ * 'sourceTable',
+ * 'mergeCondition',
+ * 'matchedUpsertCondition',
+ * 'matchedUpsertSetting',
+ * 'notMatchedInsertCondition',
+ * 'notMatchedInsertValues',
+ * 'matchedDeleteCondition'
+ * )
+ *
+ * -- when matched then delete (short form)
+ * CALL sys.merge_into(
+ * 'targetTableId'
+ * 'targetAlias',
+ * 'sourceSqls',
+ * 'sourceTable',
+ * 'mergeCondition',
+ * 'matchedDeleteCondition'
+ * )
+ *
+ *
+ * This procedure will be forced to use batch environments. Compared to {@link MergeIntoAction},
+ * this procedure doesn't provide arguments to control not-matched-by-source behavior because they
+ * are not commonly used and will make the methods too complex to use.
+ */
+public class MergeIntoProcedure extends ProcedureBase {
+
+ public static final String IDENTIFIER = "merge_into";
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String targetTableId,
+ String targetAlias,
+ String sourceSqls,
+ String sourceTable,
+ String mergeCondition,
+ String matchedUpsertCondition,
+ String matchedUpsertSetting) {
+ return call(
+ procedureContext,
+ targetTableId,
+ targetAlias,
+ sourceSqls,
+ sourceTable,
+ mergeCondition,
+ matchedUpsertCondition,
+ matchedUpsertSetting,
+ "",
+ "",
+ "");
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String targetTableId,
+ String targetAlias,
+ String sourceSqls,
+ String sourceTable,
+ String mergeCondition,
+ String matchedUpsertCondition,
+ String matchedUpsertSetting,
+ String notMatchedInsertCondition,
+ String notMatchedInsertValues) {
+ return call(
+ procedureContext,
+ targetTableId,
+ targetAlias,
+ sourceSqls,
+ sourceTable,
+ mergeCondition,
+ matchedUpsertCondition,
+ matchedUpsertSetting,
+ notMatchedInsertCondition,
+ notMatchedInsertValues,
+ "");
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String targetTableId,
+ String targetAlias,
+ String sourceSqls,
+ String sourceTable,
+ String mergeCondition,
+ String matchedDeleteCondition) {
+ return call(
+ procedureContext,
+ targetTableId,
+ targetAlias,
+ sourceSqls,
+ sourceTable,
+ mergeCondition,
+ "",
+ "",
+ "",
+ "",
+ matchedDeleteCondition);
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String targetTableId,
+ String targetAlias,
+ String sourceSqls,
+ String sourceTable,
+ String mergeCondition,
+ String matchedUpsertCondition,
+ String matchedUpsertSetting,
+ String notMatchedInsertCondition,
+ String notMatchedInsertValues,
+ String matchedDeleteCondition) {
+ String warehouse = catalog.warehouse();
+ Map catalogOptions = catalog.options();
+ Identifier identifier = Identifier.fromString(targetTableId);
+ MergeIntoAction action =
+ new MergeIntoAction(
+ warehouse,
+ identifier.getDatabaseName(),
+ identifier.getObjectName(),
+ catalogOptions);
+ action.withTargetAlias(nullable(targetAlias));
+
+ if (!sourceSqls.isEmpty()) {
+ action.withSourceSqls(sourceSqls.split(";"));
+ }
+
+ checkArgument(!sourceTable.isEmpty(), "Must specify source table.");
+ action.withSourceTable(sourceTable);
+
+ checkArgument(!mergeCondition.isEmpty(), "Must specify merge condition.");
+ action.withMergeCondition(mergeCondition);
+
+ if (!matchedUpsertCondition.isEmpty() || !matchedUpsertSetting.isEmpty()) {
+ String condition = nullable(matchedUpsertCondition);
+ String setting = nullable(matchedUpsertSetting);
+ checkNotNull(setting, "matched-upsert must set the 'matchedUpsertSetting' argument");
+ action.withMatchedUpsert(condition, setting);
+ }
+
+ if (!notMatchedInsertCondition.isEmpty() || !notMatchedInsertValues.isEmpty()) {
+ String condition = nullable(notMatchedInsertCondition);
+ String values = nullable(notMatchedInsertValues);
+ checkNotNull(
+ values, "not-matched-insert must set the 'notMatchedInsertValues' argument");
+ action.withNotMatchedInsert(condition, values);
+ }
+
+ if (!matchedDeleteCondition.isEmpty()) {
+ action.withMatchedDelete(matchedDeleteCondition);
+ }
+
+ action.withStreamExecutionEnvironment(procedureContext.getExecutionEnvironment());
+ action.validate();
+
+ DataStream dataStream = action.buildDataStream();
+ TableResult tableResult = action.batchSink(dataStream);
+ JobClient jobClient = tableResult.getJobClient().get();
+
+ return execute(procedureContext, jobClient);
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
new file mode 100644
index 0000000000000..128875a8b8627
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.flink.utils.TableMigrationUtils;
+import org.apache.paimon.migrate.Migrator;
+import org.apache.paimon.utils.ParameterUtils;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.List;
+
+/** Migrate procedure to migrate all hive tables in database to paimon table. */
+public class MigrateDatabaseProcedure extends ProcedureBase {
+
+ @Override
+ public String identifier() {
+ return "migrate_database";
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext, String connector, String sourceDatabasePath)
+ throws Exception {
+ return call(procedureContext, connector, sourceDatabasePath, "");
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String connector,
+ String sourceDatabasePath,
+ String properties)
+ throws Exception {
+ List migrators =
+ TableMigrationUtils.getImporters(
+ connector,
+ catalog,
+ sourceDatabasePath,
+ ParameterUtils.parseCommaSeparatedKeyValues(properties));
+
+ for (Migrator migrator : migrators) {
+ migrator.executeMigrate();
+ migrator.renameTable(false);
+ }
+
+ return new String[] {"Success"};
+ }
+}
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
new file mode 100644
index 0000000000000..110b4e25fc003
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.utils.TableMigrationUtils;
+import org.apache.paimon.migrate.Migrator;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.Collections;
+
+/** Add file procedure to add file from hive to paimon. */
+public class MigrateFileProcedure extends ProcedureBase {
+
+ @Override
+ public String identifier() {
+ return "migrate_file";
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String connector,
+ String sourceTablePath,
+ String targetPaimonTablePath)
+ throws Exception {
+ call(procedureContext, connector, sourceTablePath, targetPaimonTablePath, true);
+ return new String[] {"Success"};
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String connector,
+ String sourceTablePath,
+ String targetPaimonTablePath,
+ boolean deleteOrigin)
+ throws Exception {
+ migrateHandle(connector, sourceTablePath, targetPaimonTablePath, deleteOrigin);
+ return new String[] {"Success"};
+ }
+
+ public void migrateHandle(
+ String connector,
+ String sourceTablePath,
+ String targetPaimonTablePath,
+ boolean deleteOrigin)
+ throws Exception {
+ Identifier sourceTableId = Identifier.fromString(sourceTablePath);
+ Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
+
+ if (!(catalog.tableExists(targetTableId))) {
+ throw new IllegalArgumentException(
+ "Target paimon table does not exist: " + targetPaimonTablePath);
+ }
+
+ Migrator importer =
+ TableMigrationUtils.getImporter(
+ connector,
+ catalog,
+ sourceTableId.getDatabaseName(),
+ sourceTableId.getObjectName(),
+ targetTableId.getDatabaseName(),
+ targetTableId.getObjectName(),
+ Collections.emptyMap());
+ importer.deleteOriginTable(deleteOrigin);
+ importer.executeMigrate();
+ }
+}
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
new file mode 100644
index 0000000000000..39e6092d84960
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.utils.TableMigrationUtils;
+import org.apache.paimon.utils.ParameterUtils;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Migrate procedure to migrate hive table to paimon table. */
+public class MigrateTableProcedure extends ProcedureBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MigrateTableProcedure.class);
+
+ private static final String PAIMON_SUFFIX = "_paimon_";
+
+ @Override
+ public String identifier() {
+ return "migrate_table";
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext, String connector, String sourceTablePath)
+ throws Exception {
+ return call(procedureContext, connector, sourceTablePath, "");
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String connector,
+ String sourceTablePath,
+ String properties)
+ throws Exception {
+ String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
+
+ Identifier sourceTableId = Identifier.fromString(sourceTablePath);
+ Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
+
+ TableMigrationUtils.getImporter(
+ connector,
+ catalog,
+ sourceTableId.getDatabaseName(),
+ sourceTableId.getObjectName(),
+ targetTableId.getDatabaseName(),
+ targetTableId.getObjectName(),
+ ParameterUtils.parseCommaSeparatedKeyValues(properties))
+ .executeMigrate();
+
+ LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId);
+ catalog.renameTable(targetTableId, sourceTableId, false);
+ return new String[] {"Success"};
+ }
+}
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
new file mode 100644
index 0000000000000..d43056f9779e4
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.operation.OrphanFilesClean;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.List;
+
+import static org.apache.paimon.operation.OrphanFilesClean.executeOrphanFilesClean;
+
+/**
+ * Remove orphan files procedure. Usage:
+ *
+ *
+ * -- use the default file delete interval
+ * CALL sys.remove_orphan_files('tableId')
+ *
+ * -- use custom file delete interval
+ * CALL sys.remove_orphan_files('tableId', '2023-12-31 23:59:59')
+ *
+ * -- remove all tables' orphan files in db
+ * CALL sys.remove_orphan_files('databaseName.*', '2023-12-31 23:59:59')
+ *
+ */
+public class RemoveOrphanFilesProcedure extends ProcedureBase {
+
+ public static final String IDENTIFIER = "remove_orphan_files";
+
+ public String[] call(ProcedureContext procedureContext, String tableId) throws Exception {
+ return call(procedureContext, tableId, "");
+ }
+
+ public String[] call(ProcedureContext procedureContext, String tableId, String olderThan)
+ throws Exception {
+ return call(procedureContext, tableId, olderThan, false);
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext, String tableId, String olderThan, boolean dryRun)
+ throws Exception {
+ Identifier identifier = Identifier.fromString(tableId);
+ String databaseName = identifier.getDatabaseName();
+ String tableName = identifier.getObjectName();
+
+ List tableCleans =
+ OrphanFilesClean.createOrphanFilesCleans(catalog, databaseName, tableName);
+
+ if (!StringUtils.isBlank(olderThan)) {
+ tableCleans.forEach(clean -> clean.olderThan(olderThan));
+ }
+
+ if (dryRun) {
+ tableCleans.forEach(clean -> clean.fileCleaner(path -> {}));
+ }
+
+ return executeOrphanFilesClean(tableCleans);
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
new file mode 100644
index 0000000000000..0355d6dc1cab5
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.consumer.Consumer;
+import org.apache.paimon.consumer.ConsumerManager;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Reset consumer procedure. Usage:
+ *
+ *
+ * -- reset the new next snapshot id in the consumer
+ * CALL sys.reset_consumer('tableId', 'consumerId', nextSnapshotId)
+ *
+ * -- delete consumer
+ * CALL sys.reset_consumer('tableId', 'consumerId')
+ *
+ */
+public class ResetConsumerProcedure extends ProcedureBase {
+
+ public static final String IDENTIFIER = "reset_consumer";
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String tableId,
+ String consumerId,
+ long nextSnapshotId)
+ throws Catalog.TableNotExistException {
+ FileStoreTable fileStoreTable =
+ (FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
+ ConsumerManager consumerManager =
+ new ConsumerManager(
+ fileStoreTable.fileIO(),
+ fileStoreTable.location(),
+ fileStoreTable.snapshotManager().branch());
+ consumerManager.resetConsumer(consumerId, new Consumer(nextSnapshotId));
+
+ return new String[] {"Success"};
+ }
+
+ public String[] call(ProcedureContext procedureContext, String tableId, String consumerId)
+ throws Catalog.TableNotExistException {
+ FileStoreTable fileStoreTable =
+ (FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
+ ConsumerManager consumerManager =
+ new ConsumerManager(
+ fileStoreTable.fileIO(),
+ fileStoreTable.location(),
+ fileStoreTable.snapshotManager().branch());
+ consumerManager.deleteConsumer(consumerId);
+
+ return new String[] {"Success"};
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedure.java
new file mode 100644
index 0000000000000..29ae1b25b57a1
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedure.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.sink.NoneCopyVersionedSerializerTypeSerializerProxy;
+import org.apache.paimon.flink.sink.RewriteFileIndexSink;
+import org.apache.paimon.flink.source.RewriteFileIndexSource;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestEntrySerializer;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.ParameterUtils.getPartitions;
+
+/** Rewrite file index procedure to re-generated all file index. */
+public class RewriteFileIndexProcedure extends ProcedureBase {
+
+ @Override
+ public String identifier() {
+ return "rewrite_file_index";
+ }
+
+ public String[] call(ProcedureContext procedureContext, String sourceTablePath)
+ throws Exception {
+ return call(procedureContext, sourceTablePath, "");
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext, String sourceTablePath, String partitions)
+ throws Exception {
+
+ StreamExecutionEnvironment env = procedureContext.getExecutionEnvironment();
+ Table table = catalog.getTable(Identifier.fromString(sourceTablePath));
+
+ List