diff --git a/api/src/main/java/org/apache/gravitino/model/ModelCatalog.java b/api/src/main/java/org/apache/gravitino/model/ModelCatalog.java index d7429fd02fc..cea2e94e3c7 100644 --- a/api/src/main/java/org/apache/gravitino/model/ModelCatalog.java +++ b/api/src/main/java/org/apache/gravitino/model/ModelCatalog.java @@ -79,12 +79,11 @@ default boolean modelExists(NameIdentifier ident) { * @param properties The properties of the model. The properties are optional and can be null or * empty. * @return The registered model object. + * @throws NoSuchSchemaException If the schema does not exist. * @throws ModelAlreadyExistsException If the model already registered. */ - default Model registerModel(NameIdentifier ident, String comment, Map properties) - throws ModelAlreadyExistsException { - return registerModel(ident, null, new String[0], comment, properties); - } + Model registerModel(NameIdentifier ident, String comment, Map properties) + throws NoSuchSchemaException, ModelAlreadyExistsException; /** * Register a model in the catalog if the model is not existed, otherwise the {@link @@ -99,16 +98,22 @@ default Model registerModel(NameIdentifier ident, String comment, Map properties) - throws ModelAlreadyExistsException, ModelVersionAliasesAlreadyExistException; + throws NoSuchSchemaException, ModelAlreadyExistsException, + ModelVersionAliasesAlreadyExistException { + Model model = registerModel(ident, comment, properties); + linkModelVersion(ident, uri, aliases, comment, properties); + return model; + } /** * Delete the model from the catalog. If the model does not exist, return false. Otherwise, return @@ -197,11 +202,10 @@ default boolean modelVersionExists(NameIdentifier ident, String alias) { * @param comment The comment of the model version. The comment is optional and can be null. * @param properties The properties of the model version. The properties are optional and can be * null or empty. - * @return The model version object. * @throws NoSuchModelException If the model does not exist. * @throws ModelVersionAliasesAlreadyExistException If the aliases already exist in the model. */ - ModelVersion linkModelVersion( + void linkModelVersion( NameIdentifier ident, String uri, String[] aliases, diff --git a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java index 21775038b38..36177bea37f 100644 --- a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java +++ b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java @@ -44,12 +44,12 @@ import org.apache.gravitino.audit.CallerContext; import org.apache.gravitino.audit.FilesetAuditConstants; import org.apache.gravitino.audit.FilesetDataOperation; +import org.apache.gravitino.catalog.ManagedSchemaOperations; import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider; import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils; import org.apache.gravitino.connector.CatalogInfo; import org.apache.gravitino.connector.CatalogOperations; import org.apache.gravitino.connector.HasPropertyMetadata; -import org.apache.gravitino.connector.SupportsSchemas; import org.apache.gravitino.exceptions.AlreadyExistsException; import org.apache.gravitino.exceptions.FilesetAlreadyExistsException; import org.apache.gravitino.exceptions.GravitinoRuntimeException; @@ -74,7 +74,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class HadoopCatalogOperations implements CatalogOperations, SupportsSchemas, FilesetCatalog { +public class HadoopCatalogOperations extends ManagedSchemaOperations + implements CatalogOperations, FilesetCatalog { private static final String SCHEMA_DOES_NOT_EXIST_MSG = "Schema %s does not exist"; private static final String FILESET_DOES_NOT_EXIST_MSG = "Fileset %s does not exist"; private static final String SLASH = "/"; @@ -104,7 +105,8 @@ public HadoopCatalogOperations() { this(GravitinoEnv.getInstance().entityStore()); } - public EntityStore getStore() { + @Override + public EntityStore store() { return store; } @@ -451,19 +453,6 @@ public String getFileLocation(NameIdentifier ident, String subPath) return fileLocation; } - @Override - public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException { - try { - List schemas = - store.list(namespace, SchemaEntity.class, Entity.EntityType.SCHEMA); - return schemas.stream() - .map(s -> NameIdentifier.of(namespace, s.name())) - .toArray(NameIdentifier[]::new); - } catch (IOException e) { - throw new RuntimeException("Failed to list schemas under namespace " + namespace, e); - } - } - @Override public Schema createSchema(NameIdentifier ident, String comment, Map properties) throws NoSuchCatalogException, SchemaAlreadyExistsException { @@ -496,53 +485,7 @@ public Schema createSchema(NameIdentifier ident, String comment, Map updateSchemaEntity(ident, schemaEntity, changes)); - - return HadoopSchema.builder() - .withName(ident.name()) - .withComment(entity.comment()) - .withProperties(entity.properties()) - .withAuditInfo(entity.auditInfo()) - .build(); - - } catch (IOException ioe) { - throw new RuntimeException("Failed to update schema " + ident, ioe); - } catch (NoSuchEntityException nsee) { - throw new NoSuchSchemaException(nsee, SCHEMA_DOES_NOT_EXIST_MSG, ident); - } catch (AlreadyExistsException aee) { - throw new RuntimeException( - "Schema with the same name " - + ident.name() - + " already exists, this is unexpected because schema doesn't support rename", - aee); - } + return super.alterSchema(ident, changes); } @Override @@ -600,6 +518,16 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty throw new NonEmptySchemaException("Schema %s is not empty", ident); } + SchemaEntity schemaEntity = store.get(ident, Entity.EntityType.SCHEMA, SchemaEntity.class); + Map properties = + Optional.ofNullable(schemaEntity.properties()).orElse(Collections.emptyMap()); + Path schemaPath = getSchemaPath(ident.name(), properties); + + boolean dropped = super.dropSchema(ident, cascade); + if (!dropped) { + return false; + } + // Delete all the managed filesets no matter whether the storage location is under the // schema path or not. // The reason why we delete the managed fileset's storage location one by one is because we @@ -635,30 +563,21 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty } }); - SchemaEntity schemaEntity = store.get(ident, Entity.EntityType.SCHEMA, SchemaEntity.class); - Map properties = - Optional.ofNullable(schemaEntity.properties()).orElse(Collections.emptyMap()); - // Delete the schema path if it exists and is empty. - Path schemaPath = getSchemaPath(ident.name(), properties); - // Nothing to delete if the schema path is not set. - if (schemaPath == null) { - return false; - } - - FileSystem fs = getFileSystem(schemaPath, conf); - // Nothing to delete if the schema path does not exist. - if (!fs.exists(schemaPath)) { - return false; - } - - FileStatus[] statuses = fs.listStatus(schemaPath); - if (statuses.length == 0) { - if (fs.delete(schemaPath, true)) { - LOG.info("Deleted schema {} location {}", ident, schemaPath); - } else { - LOG.warn("Failed to delete schema {} location {}", ident, schemaPath); - return false; + if (schemaPath != null) { + FileSystem fs = getFileSystem(schemaPath, conf); + if (fs.exists(schemaPath)) { + FileStatus[] statuses = fs.listStatus(schemaPath); + if (statuses.length == 0) { + if (fs.delete(schemaPath, true)) { + LOG.info("Deleted schema {} location {}", ident, schemaPath); + } else { + LOG.warn( + "Failed to delete schema {} because it has files/folders under location {}", + ident, + schemaPath); + } + } } } diff --git a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java index 9d21a1782a3..2180e45d423 100644 --- a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java +++ b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java @@ -107,7 +107,7 @@ public boolean dropFileset(NameIdentifier ident) { try { filesetEntity = hadoopCatalogOperations - .getStore() + .store() .get(ident, Entity.EntityType.FILESET, FilesetEntity.class); } catch (NoSuchEntityException e) { LOG.warn("Fileset {} does not exist", ident); @@ -143,9 +143,7 @@ public Schema createSchema(NameIdentifier ident, String comment, Map properties = Optional.ofNullable(schemaEntity.properties()).orElse(Collections.emptyMap()); diff --git a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java index 9e4881432df..1a3e49b5499 100644 --- a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java +++ b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java @@ -446,6 +446,7 @@ public void testDropSchema() throws IOException { Assertions.assertFalse(fs.exists(schemaPath)); // Test drop non-empty schema with cascade = false + createSchema(name, comment, catalogPath, null); Fileset fs1 = createFileset("fs1", name, "comment", Fileset.Type.MANAGED, catalogPath, null); Path fs1Path = new Path(fs1.storageLocation()); @@ -459,6 +460,7 @@ public void testDropSchema() throws IOException { Assertions.assertFalse(fs.exists(fs1Path)); // Test drop both managed and external filesets + createSchema(name, comment, catalogPath, null); Fileset fs2 = createFileset("fs2", name, "comment", Fileset.Type.MANAGED, catalogPath, null); Path fs2Path = new Path(fs2.storageLocation()); @@ -472,6 +474,7 @@ public void testDropSchema() throws IOException { Assertions.assertTrue(fs.exists(fs3Path)); // Test drop schema with different storage location + createSchema(name, comment, catalogPath, null); Path fs4Path = new Path(TEST_ROOT_PATH + "/fs4"); createFileset("fs4", name, "comment", Fileset.Type.MANAGED, catalogPath, fs4Path.toString()); ops.dropSchema(id, true); diff --git a/catalogs/catalog-model/build.gradle.kts b/catalogs/catalog-model/build.gradle.kts index 33f8413a3b4..95af305fcae 100644 --- a/catalogs/catalog-model/build.gradle.kts +++ b/catalogs/catalog-model/build.gradle.kts @@ -40,11 +40,17 @@ dependencies { exclude(group = "*") } - compileOnly(libs.guava) - + implementation(libs.guava) implementation(libs.slf4j.api) + testImplementation(project(":clients:client-java")) + testImplementation(project(":integration-test-common", "testArtifacts")) + testImplementation(project(":server")) + testImplementation(project(":server-common")) + testImplementation(libs.bundles.log4j) + testImplementation(libs.commons.io) + testImplementation(libs.commons.lang3) testImplementation(libs.mockito.core) testImplementation(libs.mockito.inline) testImplementation(libs.junit.jupiter.api) diff --git a/catalogs/catalog-model/src/main/java/org/apache/gravitino/catalog/model/ModelCatalog.java b/catalogs/catalog-model/src/main/java/org/apache/gravitino/catalog/model/ModelCatalogImpl.java similarity index 89% rename from catalogs/catalog-model/src/main/java/org/apache/gravitino/catalog/model/ModelCatalog.java rename to catalogs/catalog-model/src/main/java/org/apache/gravitino/catalog/model/ModelCatalogImpl.java index 51951d44076..5b90eab7265 100644 --- a/catalogs/catalog-model/src/main/java/org/apache/gravitino/catalog/model/ModelCatalog.java +++ b/catalogs/catalog-model/src/main/java/org/apache/gravitino/catalog/model/ModelCatalogImpl.java @@ -20,12 +20,14 @@ import java.util.Map; import org.apache.gravitino.CatalogProvider; +import org.apache.gravitino.EntityStore; +import org.apache.gravitino.GravitinoEnv; import org.apache.gravitino.connector.BaseCatalog; import org.apache.gravitino.connector.CatalogOperations; import org.apache.gravitino.connector.PropertiesMetadata; import org.apache.gravitino.connector.capability.Capability; -public class ModelCatalog extends BaseCatalog { +public class ModelCatalogImpl extends BaseCatalog { private static final ModelCatalogPropertiesMetadata CATALOG_PROPERTIES_META = new ModelCatalogPropertiesMetadata(); @@ -43,7 +45,8 @@ public String shortName() { @Override protected CatalogOperations newOps(Map config) { - return null; + EntityStore store = GravitinoEnv.getInstance().entityStore(); + return new ModelCatalogOperations(store); } @Override diff --git a/catalogs/catalog-model/src/main/java/org/apache/gravitino/catalog/model/ModelCatalogOperations.java b/catalogs/catalog-model/src/main/java/org/apache/gravitino/catalog/model/ModelCatalogOperations.java new file mode 100644 index 00000000000..7683180f784 --- /dev/null +++ b/catalogs/catalog-model/src/main/java/org/apache/gravitino/catalog/model/ModelCatalogOperations.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.model; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.Entity; +import org.apache.gravitino.EntityAlreadyExistsException; +import org.apache.gravitino.EntityStore; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.StringIdentifier; +import org.apache.gravitino.catalog.ManagedSchemaOperations; +import org.apache.gravitino.connector.CatalogInfo; +import org.apache.gravitino.connector.CatalogOperations; +import org.apache.gravitino.connector.HasPropertyMetadata; +import org.apache.gravitino.exceptions.ModelAlreadyExistsException; +import org.apache.gravitino.exceptions.ModelVersionAliasesAlreadyExistException; +import org.apache.gravitino.exceptions.NoSuchEntityException; +import org.apache.gravitino.exceptions.NoSuchModelException; +import org.apache.gravitino.exceptions.NoSuchModelVersionException; +import org.apache.gravitino.exceptions.NoSuchSchemaException; +import org.apache.gravitino.meta.AuditInfo; +import org.apache.gravitino.meta.ModelEntity; +import org.apache.gravitino.meta.ModelVersionEntity; +import org.apache.gravitino.model.Model; +import org.apache.gravitino.model.ModelCatalog; +import org.apache.gravitino.model.ModelVersion; +import org.apache.gravitino.utils.NameIdentifierUtil; +import org.apache.gravitino.utils.NamespaceUtil; +import org.apache.gravitino.utils.PrincipalUtils; + +public class ModelCatalogOperations extends ManagedSchemaOperations + implements CatalogOperations, ModelCatalog { + + private static final int INIT_VERSION = 0; + + private final EntityStore store; + + public ModelCatalogOperations(EntityStore store) { + this.store = store; + } + + @Override + public void initialize( + Map config, CatalogInfo info, HasPropertyMetadata propertiesMetadata) + throws RuntimeException {} + + @Override + public void close() throws IOException {} + + @Override + public void testConnection( + NameIdentifier catalogIdent, + Catalog.Type type, + String provider, + String comment, + Map properties) { + // No-op for model catalog. + } + + @Override + protected EntityStore store() { + return store; + } + + @Override + public NameIdentifier[] listModels(Namespace namespace) throws NoSuchSchemaException { + NamespaceUtil.checkModel(namespace); + + try { + List models = store.list(namespace, ModelEntity.class, Entity.EntityType.MODEL); + return models.stream() + .map(m -> NameIdentifier.of(namespace, m.name())) + .toArray(NameIdentifier[]::new); + + } catch (NoSuchEntityException e) { + throw new NoSuchSchemaException(e, "Schema %s does not exist", namespace); + } catch (IOException ioe) { + throw new RuntimeException("Failed to list models under namespace " + namespace, ioe); + } + } + + @Override + public Model getModel(NameIdentifier ident) throws NoSuchModelException { + NameIdentifierUtil.checkModel(ident); + + try { + ModelEntity model = store.get(ident, Entity.EntityType.MODEL, ModelEntity.class); + return toModelImpl(model); + + } catch (NoSuchEntityException e) { + throw new NoSuchModelException(e, "Model %s does not exist", ident); + } catch (IOException ioe) { + throw new RuntimeException("Failed to get model " + ident, ioe); + } + } + + @Override + public Model registerModel(NameIdentifier ident, String comment, Map properties) + throws ModelAlreadyExistsException { + NameIdentifierUtil.checkModel(ident); + + StringIdentifier stringId = StringIdentifier.fromProperties(properties); + Preconditions.checkArgument(stringId != null, "Property string identifier should not be null"); + + ModelEntity model = + ModelEntity.builder() + .withId(stringId.id()) + .withName(ident.name()) + .withNamespace(ident.namespace()) + .withComment(comment) + .withProperties(properties) + .withLatestVersion(INIT_VERSION) + .withAuditInfo( + AuditInfo.builder() + .withCreator(PrincipalUtils.getCurrentPrincipal().getName()) + .withCreateTime(Instant.now()) + .build()) + .build(); + + try { + store.put(model, false /* overwrite */); + } catch (IOException e) { + throw new RuntimeException("Failed to register model " + ident, e); + } catch (EntityAlreadyExistsException e) { + throw new ModelAlreadyExistsException(e, "Model %s already exists", ident); + } catch (NoSuchEntityException e) { + throw new NoSuchSchemaException(e, "Schema %s does not exist", ident.namespace()); + } + + return toModelImpl(model); + } + + @Override + public boolean deleteModel(NameIdentifier ident) { + NameIdentifierUtil.checkModel(ident); + + try { + return store.delete(ident, Entity.EntityType.MODEL); + } catch (IOException ioe) { + throw new RuntimeException("Failed to delete model " + ident, ioe); + } + } + + @Override + public int[] listModelVersions(NameIdentifier ident) throws NoSuchModelException { + NameIdentifierUtil.checkModel(ident); + Namespace modelVersionNs = NamespaceUtil.toModelVersionNs(ident); + + try { + List versions = + store.list(modelVersionNs, ModelVersionEntity.class, Entity.EntityType.MODEL_VERSION); + return versions.stream().mapToInt(ModelVersionEntity::version).toArray(); + + } catch (NoSuchEntityException e) { + throw new NoSuchModelException(e, "Model %s does not exist", ident); + } catch (IOException ioe) { + throw new RuntimeException("Failed to list model versions for model " + ident, ioe); + } + } + + @Override + public ModelVersion getModelVersion(NameIdentifier ident, int version) + throws NoSuchModelVersionException { + NameIdentifierUtil.checkModel(ident); + NameIdentifier modelVersionIdent = NameIdentifierUtil.toModelVersionIdentifier(ident, version); + + return internalGetModelVersion(modelVersionIdent); + } + + @Override + public ModelVersion getModelVersion(NameIdentifier ident, String alias) + throws NoSuchModelVersionException { + NameIdentifierUtil.checkModel(ident); + NameIdentifier modelVersionIdent = NameIdentifierUtil.toModelVersionIdentifier(ident, alias); + + return internalGetModelVersion(modelVersionIdent); + } + + @Override + public void linkModelVersion( + NameIdentifier ident, + String uri, + String[] aliases, + String comment, + Map properties) + throws NoSuchModelException, ModelVersionAliasesAlreadyExistException { + NameIdentifierUtil.checkModel(ident); + + StringIdentifier stringId = StringIdentifier.fromProperties(properties); + Preconditions.checkArgument(stringId != null, "Property string identifier should not be null"); + + List aliasList = aliases == null ? Lists.newArrayList() : Lists.newArrayList(aliases); + ModelVersionEntity modelVersion = + ModelVersionEntity.builder() + .withModelIdentifier(ident) + // This version is just a placeholder, it will not be used in the actual model version + // insert operation, the version will be updated to the latest version of the model when + // executing the insert operation. + .withVersion(INIT_VERSION) + .withAliases(aliasList) + .withUri(uri) + .withComment(comment) + .withProperties(properties) + .withAuditInfo( + AuditInfo.builder() + .withCreator(PrincipalUtils.getCurrentPrincipal().getName()) + .withCreateTime(Instant.now()) + .build()) + .build(); + + try { + store.put(modelVersion, false /* overwrite */); + } catch (IOException e) { + throw new RuntimeException("Failed to link model version " + ident, e); + } catch (EntityAlreadyExistsException e) { + throw new ModelVersionAliasesAlreadyExistException( + e, "Model version aliases %s already exist", ident); + } catch (NoSuchEntityException e) { + throw new NoSuchModelException(e, "Model %s does not exist", ident); + } + } + + @Override + public boolean deleteModelVersion(NameIdentifier ident, int version) { + NameIdentifierUtil.checkModel(ident); + NameIdentifier modelVersionIdent = NameIdentifierUtil.toModelVersionIdentifier(ident, version); + + return internalDeleteModelVersion(modelVersionIdent); + } + + @Override + public boolean deleteModelVersion(NameIdentifier ident, String alias) { + NameIdentifierUtil.checkModel(ident); + NameIdentifier modelVersionIdent = NameIdentifierUtil.toModelVersionIdentifier(ident, alias); + + return internalDeleteModelVersion(modelVersionIdent); + } + + private ModelImpl toModelImpl(ModelEntity model) { + return ModelImpl.builder() + .withName(model.name()) + .withComment(model.comment()) + .withProperties(model.properties()) + .withLatestVersion(model.latestVersion()) + .withAuditInfo(model.auditInfo()) + .build(); + } + + private ModelVersionImpl toModelVersionImpl(ModelVersionEntity modelVersion) { + return ModelVersionImpl.builder() + .withVersion(modelVersion.version()) + .withAliases(modelVersion.aliases().toArray(new String[0])) + .withUri(modelVersion.uri()) + .withComment(modelVersion.comment()) + .withProperties(modelVersion.properties()) + .withAuditInfo(modelVersion.auditInfo()) + .build(); + } + + private ModelVersion internalGetModelVersion(NameIdentifier ident) { + try { + ModelVersionEntity modelVersion = + store.get(ident, Entity.EntityType.MODEL_VERSION, ModelVersionEntity.class); + return toModelVersionImpl(modelVersion); + + } catch (NoSuchEntityException e) { + throw new NoSuchModelVersionException(e, "Model version %s does not exist", ident); + } catch (IOException ioe) { + throw new RuntimeException("Failed to get model version " + ident, ioe); + } + } + + private boolean internalDeleteModelVersion(NameIdentifier ident) { + try { + return store.delete(ident, Entity.EntityType.MODEL_VERSION); + } catch (IOException ioe) { + throw new RuntimeException("Failed to delete model version " + ident, ioe); + } + } +} diff --git a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopSchema.java b/catalogs/catalog-model/src/main/java/org/apache/gravitino/catalog/model/ModelImpl.java similarity index 60% rename from catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopSchema.java rename to catalogs/catalog-model/src/main/java/org/apache/gravitino/catalog/model/ModelImpl.java index 65f0b607f28..51ef09edb78 100644 --- a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopSchema.java +++ b/catalogs/catalog-model/src/main/java/org/apache/gravitino/catalog/model/ModelImpl.java @@ -16,32 +16,28 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.gravitino.catalog.hadoop; +package org.apache.gravitino.catalog.model; -import org.apache.gravitino.connector.BaseSchema; +import org.apache.gravitino.connector.BaseModel; -public class HadoopSchema extends BaseSchema { +public class ModelImpl extends BaseModel { + + public static class Builder extends BaseModelBuilder { - public static class Builder extends BaseSchemaBuilder { - /** Creates a new instance of {@link Builder}. */ private Builder() {} @Override - protected HadoopSchema internalBuild() { - HadoopSchema schema = new HadoopSchema(); - schema.name = name; - schema.comment = comment; - schema.properties = properties; - schema.auditInfo = auditInfo; - return schema; + protected ModelImpl internalBuild() { + ModelImpl model = new ModelImpl(); + model.name = name; + model.comment = comment; + model.properties = properties; + model.latestVersion = latestVersion; + model.auditInfo = auditInfo; + return model; } } - /** - * Creates a new instance of {@link Builder}. - * - * @return The new instance. - */ public static Builder builder() { return new Builder(); } diff --git a/catalogs/catalog-model/src/main/java/org/apache/gravitino/catalog/model/ModelVersionImpl.java b/catalogs/catalog-model/src/main/java/org/apache/gravitino/catalog/model/ModelVersionImpl.java new file mode 100644 index 00000000000..cff72c06ec1 --- /dev/null +++ b/catalogs/catalog-model/src/main/java/org/apache/gravitino/catalog/model/ModelVersionImpl.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.model; + +import org.apache.gravitino.connector.BaseModelVersion; + +public class ModelVersionImpl extends BaseModelVersion { + + public static class Builder extends BaseModelVersionBuilder { + + private Builder() {} + + @Override + protected ModelVersionImpl internalBuild() { + ModelVersionImpl modelVersion = new ModelVersionImpl(); + modelVersion.version = version; + modelVersion.comment = comment; + modelVersion.aliases = aliases; + modelVersion.uri = uri; + modelVersion.properties = properties; + modelVersion.auditInfo = auditInfo; + return modelVersion; + } + } + + public static Builder builder() { + return new Builder(); + } +} diff --git a/catalogs/catalog-model/src/test/java/org/apache/gravtitino/catalog/model/TestModelCatalogOperations.java b/catalogs/catalog-model/src/test/java/org/apache/gravtitino/catalog/model/TestModelCatalogOperations.java new file mode 100644 index 00000000000..acbaeb30a46 --- /dev/null +++ b/catalogs/catalog-model/src/test/java/org/apache/gravtitino/catalog/model/TestModelCatalogOperations.java @@ -0,0 +1,667 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravtitino.catalog.model; + +import static org.apache.gravitino.Configs.DEFAULT_ENTITY_RELATIONAL_STORE; +import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER; +import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD; +import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_PATH; +import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL; +import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_USER; +import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_STORE; +import static org.apache.gravitino.Configs.ENTITY_STORE; +import static org.apache.gravitino.Configs.RELATIONAL_ENTITY_STORE; +import static org.apache.gravitino.Configs.STORE_DELETE_AFTER_TIME; +import static org.apache.gravitino.Configs.STORE_TRANSACTION_MAX_SKEW_TIME; +import static org.apache.gravitino.Configs.VERSION_RETENTION_COUNT; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.io.IOException; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.commons.io.FileUtils; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.Config; +import org.apache.gravitino.EntityStore; +import org.apache.gravitino.EntityStoreFactory; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.Schema; +import org.apache.gravitino.StringIdentifier; +import org.apache.gravitino.catalog.model.ModelCatalogOperations; +import org.apache.gravitino.connector.CatalogInfo; +import org.apache.gravitino.connector.HasPropertyMetadata; +import org.apache.gravitino.exceptions.ModelAlreadyExistsException; +import org.apache.gravitino.exceptions.ModelVersionAliasesAlreadyExistException; +import org.apache.gravitino.exceptions.NoSuchCatalogException; +import org.apache.gravitino.exceptions.NoSuchModelException; +import org.apache.gravitino.exceptions.NoSuchModelVersionException; +import org.apache.gravitino.exceptions.NoSuchSchemaException; +import org.apache.gravitino.exceptions.SchemaAlreadyExistsException; +import org.apache.gravitino.meta.AuditInfo; +import org.apache.gravitino.meta.BaseMetalake; +import org.apache.gravitino.meta.CatalogEntity; +import org.apache.gravitino.meta.SchemaVersion; +import org.apache.gravitino.model.Model; +import org.apache.gravitino.model.ModelVersion; +import org.apache.gravitino.storage.IdGenerator; +import org.apache.gravitino.storage.RandomIdGenerator; +import org.apache.gravitino.utils.NameIdentifierUtil; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class TestModelCatalogOperations { + + private static final String STORE_PATH = + "/tmp/gravitino_test_entityStore_" + UUID.randomUUID().toString().replace("-", ""); + + private static final String METALAKE_NAME = "metalake_for_model_meta_test"; + + private static final String CATALOG_NAME = "catalog_for_model_meta_test"; + + private static EntityStore store; + + private static IdGenerator idGenerator; + + private static ModelCatalogOperations ops; + + @BeforeAll + public static void setUp() throws IOException { + Config config = Mockito.mock(Config.class); + when(config.get(ENTITY_STORE)).thenReturn(RELATIONAL_ENTITY_STORE); + when(config.get(ENTITY_RELATIONAL_STORE)).thenReturn(DEFAULT_ENTITY_RELATIONAL_STORE); + when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_PATH)).thenReturn(STORE_PATH); + + // The following properties are used to create the JDBC connection; they are just for test, in + // the real world, + // they will be set automatically by the configuration file if you set ENTITY_RELATIONAL_STORE + // as EMBEDDED_ENTITY_RELATIONAL_STORE. + when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_URL)) + .thenReturn(String.format("jdbc:h2:%s;DB_CLOSE_DELAY=-1;MODE=MYSQL", STORE_PATH)); + when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_USER)).thenReturn("gravitino"); + when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD)).thenReturn("gravitino"); + when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER)).thenReturn("org.h2.Driver"); + + when(config.get(VERSION_RETENTION_COUNT)).thenReturn(1L); + when(config.get(STORE_TRANSACTION_MAX_SKEW_TIME)).thenReturn(1000L); + when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L); + + store = EntityStoreFactory.createEntityStore(config); + store.initialize(config); + idGenerator = new RandomIdGenerator(); + + // Create the metalake and catalog + AuditInfo auditInfo = + AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build(); + BaseMetalake metalake = + BaseMetalake.builder() + .withId(idGenerator.nextId()) + .withName(METALAKE_NAME) + .withVersion(SchemaVersion.V_0_1) + .withAuditInfo(auditInfo) + .withName(METALAKE_NAME) + .build(); + store.put(metalake, false); + + CatalogEntity catalog = + CatalogEntity.builder() + .withId(idGenerator.nextId()) + .withName(CATALOG_NAME) + .withNamespace(Namespace.of(METALAKE_NAME)) + .withProvider("model") + .withType(Catalog.Type.MODEL) + .withAuditInfo(auditInfo) + .build(); + store.put(catalog, false); + + ops = new ModelCatalogOperations(store); + ops.initialize( + Collections.emptyMap(), + Mockito.mock(CatalogInfo.class), + Mockito.mock(HasPropertyMetadata.class)); + } + + @AfterAll + public static void tearDown() throws IOException { + ops.close(); + store.close(); + FileUtils.deleteDirectory(new File(STORE_PATH)); + } + + @Test + public void testSchemaOperations() { + String schemaName = randomSchemaName(); + NameIdentifier schemaIdent = + NameIdentifierUtil.ofSchema(METALAKE_NAME, CATALOG_NAME, schemaName); + StringIdentifier stringId = StringIdentifier.fromId(idGenerator.nextId()); + Map properties = StringIdentifier.newPropertiesWithId(stringId, null); + + ops.createSchema(schemaIdent, "schema comment", properties); + Schema loadedSchema = ops.loadSchema(schemaIdent); + + Assertions.assertEquals(schemaName, loadedSchema.name()); + Assertions.assertEquals("schema comment", loadedSchema.comment()); + Assertions.assertEquals(properties, loadedSchema.properties()); + + // Test create schema with the same name + Assertions.assertThrows( + SchemaAlreadyExistsException.class, + () -> ops.createSchema(schemaIdent, "schema comment", properties)); + + // Test create schema in a non-existent catalog + Assertions.assertThrows( + NoSuchCatalogException.class, + () -> + ops.createSchema( + NameIdentifierUtil.ofSchema(METALAKE_NAME, "non-existent-catalog", schemaName), + "schema comment", + properties)); + + // Test load a non-existent schema + Assertions.assertThrows( + NoSuchSchemaException.class, + () -> + ops.loadSchema( + NameIdentifierUtil.ofSchema(METALAKE_NAME, CATALOG_NAME, "non-existent-schema"))); + + // Test load a non-existent schema in a non-existent catalog + Assertions.assertThrows( + NoSuchSchemaException.class, + () -> + ops.loadSchema( + NameIdentifierUtil.ofSchema( + METALAKE_NAME, "non-existent-catalog", "non-existent-schema"))); + + // Create another schema + String schemaName2 = randomSchemaName(); + NameIdentifier schemaIdent2 = + NameIdentifierUtil.ofSchema(METALAKE_NAME, CATALOG_NAME, schemaName2); + StringIdentifier stringId2 = StringIdentifier.fromId(idGenerator.nextId()); + Map properties2 = StringIdentifier.newPropertiesWithId(stringId2, null); + + ops.createSchema(schemaIdent2, "schema comment 2", properties2); + + // Test list schemas + NameIdentifier[] idents = ops.listSchemas(Namespace.of(METALAKE_NAME, CATALOG_NAME)); + + Set resultSet = Arrays.stream(idents).collect(Collectors.toSet()); + Assertions.assertTrue(resultSet.contains(schemaIdent)); + Assertions.assertTrue(resultSet.contains(schemaIdent2)); + + // Test list schemas in a non-existent catalog + Assertions.assertThrows( + NoSuchCatalogException.class, + () -> ops.listSchemas(Namespace.of(METALAKE_NAME, "non-existent-catalog"))); + + // Test drop schema + Assertions.assertTrue(ops.dropSchema(schemaIdent, false)); + Assertions.assertFalse(ops.dropSchema(schemaIdent, false)); + Assertions.assertTrue(ops.dropSchema(schemaIdent2, false)); + Assertions.assertFalse(ops.dropSchema(schemaIdent2, false)); + + // Test drop non-existent schema + Assertions.assertFalse( + ops.dropSchema( + NameIdentifierUtil.ofSchema(METALAKE_NAME, CATALOG_NAME, "non-existent-schema"), + false)); + + // Test drop schema in a non-existent catalog + Assertions.assertFalse( + ops.dropSchema( + NameIdentifierUtil.ofSchema(METALAKE_NAME, "non-existent-catalog", schemaName2), + false)); + } + + @Test + public void testRegisterAndGetModel() { + String schemaName = randomSchemaName(); + createSchema(schemaName); + + String modelName = "model1"; + NameIdentifier modelIdent = + NameIdentifierUtil.ofModel(METALAKE_NAME, CATALOG_NAME, schemaName, modelName); + StringIdentifier stringId = StringIdentifier.fromId(idGenerator.nextId()); + Map properties = StringIdentifier.newPropertiesWithId(stringId, null); + + Model registeredModel = ops.registerModel(modelIdent, "model comment", properties); + Assertions.assertEquals(modelName, registeredModel.name()); + Assertions.assertEquals("model comment", registeredModel.comment()); + Assertions.assertEquals(properties, registeredModel.properties()); + Assertions.assertEquals(0, registeredModel.latestVersion()); + + Model loadedModel = ops.getModel(modelIdent); + Assertions.assertEquals(modelName, loadedModel.name()); + Assertions.assertEquals("model comment", loadedModel.comment()); + Assertions.assertEquals(properties, loadedModel.properties()); + Assertions.assertEquals(0, loadedModel.latestVersion()); + + // Test register model with the same name + Assertions.assertThrows( + ModelAlreadyExistsException.class, + () -> ops.registerModel(modelIdent, "model comment", properties)); + + // Test register model in a non-existent schema + Assertions.assertThrows( + RuntimeException.class, + () -> + ops.registerModel( + NameIdentifierUtil.ofModel( + METALAKE_NAME, CATALOG_NAME, "non-existent-schema", modelName), + "model comment", + properties)); + + // Test get a non-existent model + Assertions.assertThrows( + NoSuchModelException.class, + () -> + ops.getModel( + NameIdentifierUtil.ofModel( + METALAKE_NAME, CATALOG_NAME, schemaName, "non-existent-model"))); + + // Test get a model in a non-existent schema + Assertions.assertThrows( + NoSuchModelException.class, + () -> + ops.getModel( + NameIdentifierUtil.ofModel( + METALAKE_NAME, CATALOG_NAME, "non-existent-schema", modelName))); + } + + @Test + public void testRegisterAndListModel() { + String schemaName = randomSchemaName(); + createSchema(schemaName); + + String modelName1 = "model1"; + NameIdentifier modelIdent1 = + NameIdentifierUtil.ofModel(METALAKE_NAME, CATALOG_NAME, schemaName, modelName1); + StringIdentifier stringId1 = StringIdentifier.fromId(idGenerator.nextId()); + Map properties1 = StringIdentifier.newPropertiesWithId(stringId1, null); + + ops.registerModel(modelIdent1, "model1 comment", properties1); + + NameIdentifier[] modelIdents = + ops.listModels(Namespace.of(METALAKE_NAME, CATALOG_NAME, schemaName)); + Assertions.assertEquals(1, modelIdents.length); + Assertions.assertEquals(modelIdent1, modelIdents[0]); + + String modelName2 = "model2"; + NameIdentifier modelIdent2 = + NameIdentifierUtil.ofModel(METALAKE_NAME, CATALOG_NAME, schemaName, modelName2); + StringIdentifier stringId2 = StringIdentifier.fromId(idGenerator.nextId()); + Map properties2 = StringIdentifier.newPropertiesWithId(stringId2, null); + + ops.registerModel(modelIdent2, "model2 comment", properties2); + + NameIdentifier[] modelIdents2 = + ops.listModels(Namespace.of(METALAKE_NAME, CATALOG_NAME, schemaName)); + Assertions.assertEquals(2, modelIdents2.length); + + Set resultSet = Arrays.stream(modelIdents2).collect(Collectors.toSet()); + Assertions.assertTrue(resultSet.contains(modelIdent1)); + Assertions.assertTrue(resultSet.contains(modelIdent2)); + + // Test list models in a non-existent schema + Assertions.assertThrows( + NoSuchSchemaException.class, + () -> ops.listModels(Namespace.of(METALAKE_NAME, CATALOG_NAME, "non-existent-schema"))); + } + + @Test + public void testRegisterAndDeleteModel() { + String schemaName = randomSchemaName(); + createSchema(schemaName); + + String modelName = "model1"; + NameIdentifier modelIdent = + NameIdentifierUtil.ofModel(METALAKE_NAME, CATALOG_NAME, schemaName, modelName); + StringIdentifier stringId = StringIdentifier.fromId(idGenerator.nextId()); + Map properties = StringIdentifier.newPropertiesWithId(stringId, null); + + ops.registerModel(modelIdent, "model1 comment", properties); + + Assertions.assertTrue(ops.deleteModel(modelIdent)); + Assertions.assertFalse(ops.deleteModel(modelIdent)); + + // Test get a deleted model + Assertions.assertThrows(NoSuchModelException.class, () -> ops.getModel(modelIdent)); + + // Test list models after deletion + Assertions.assertEquals( + 0, ops.listModels(Namespace.of(METALAKE_NAME, CATALOG_NAME, schemaName)).length); + + // Test delete non-existent model + Assertions.assertFalse( + ops.deleteModel( + NameIdentifierUtil.ofModel( + METALAKE_NAME, CATALOG_NAME, schemaName, "non-existent-model"))); + + // Test delete model in a non-existent schema + Assertions.assertFalse( + ops.deleteModel( + NameIdentifierUtil.ofModel( + METALAKE_NAME, CATALOG_NAME, "non-existent-schema", modelName))); + } + + @Test + public void testLinkAndGetModelVersion() { + // Create schema and model + String schemaName = randomSchemaName(); + createSchema(schemaName); + + String modelName = "model1"; + NameIdentifier modelIdent = + NameIdentifierUtil.ofModel(METALAKE_NAME, CATALOG_NAME, schemaName, modelName); + StringIdentifier stringId = StringIdentifier.fromId(idGenerator.nextId()); + Map properties = StringIdentifier.newPropertiesWithId(stringId, null); + + ops.registerModel(modelIdent, "model1 comment", properties); + + // Link a model version to the registered model + StringIdentifier versionId = StringIdentifier.fromId(idGenerator.nextId()); + Map versionProperties = StringIdentifier.newPropertiesWithId(versionId, null); + + String[] aliases = new String[] {"alias1", "alias2"}; + ops.linkModelVersion( + modelIdent, "model_version_path", aliases, "version1 comment", versionProperties); + + Model loadedModel = ops.getModel(modelIdent); + Assertions.assertEquals(1, loadedModel.latestVersion()); + + ModelVersion loadedVersion = ops.getModelVersion(modelIdent, 0); + Assertions.assertEquals(0, loadedVersion.version()); + Assertions.assertEquals("version1 comment", loadedVersion.comment()); + Assertions.assertEquals("model_version_path", loadedVersion.uri()); + Assertions.assertEquals(versionProperties, loadedVersion.properties()); + + // Test get a model version using alias + ModelVersion loadedVersionByAlias = ops.getModelVersion(modelIdent, "alias1"); + Assertions.assertEquals(0, loadedVersionByAlias.version()); + + ModelVersion loadedVersionByAlias2 = ops.getModelVersion(modelIdent, "alias2"); + Assertions.assertEquals(0, loadedVersionByAlias2.version()); + + // Test link model version to a non-existent model + Assertions.assertThrows( + NoSuchModelException.class, + () -> + ops.linkModelVersion( + NameIdentifierUtil.ofModel( + METALAKE_NAME, CATALOG_NAME, schemaName, "non-existent-model"), + "model_version_path", + aliases, + "version1 comment", + versionProperties)); + + // Test link model version to a non-existent schema + Assertions.assertThrows( + NoSuchModelException.class, + () -> + ops.linkModelVersion( + NameIdentifierUtil.ofModel( + METALAKE_NAME, CATALOG_NAME, "non-existent-schema", modelName), + "model_version_path", + aliases, + "version1 comment", + versionProperties)); + + // Test link model version with existent aliases + Assertions.assertThrows( + ModelVersionAliasesAlreadyExistException.class, + () -> + ops.linkModelVersion( + modelIdent, + "model_version_path", + new String[] {"alias1"}, + "version1 comment", + versionProperties)); + + Assertions.assertThrows( + ModelVersionAliasesAlreadyExistException.class, + () -> + ops.linkModelVersion( + modelIdent, + "model_version_path", + new String[] {"alias2"}, + "version1 comment", + versionProperties)); + + // Test get a model version from non-existent model + Assertions.assertThrows( + NoSuchModelVersionException.class, + () -> + ops.getModelVersion( + NameIdentifierUtil.ofModel( + METALAKE_NAME, CATALOG_NAME, schemaName, "non-existent-model"), + 0)); + + // Test get a non-existent model version + Assertions.assertThrows( + NoSuchModelVersionException.class, () -> ops.getModelVersion(modelIdent, 1)); + + // Test get a non-existent model version using alias + Assertions.assertThrows( + NoSuchModelVersionException.class, + () -> ops.getModelVersion(modelIdent, "non-existent-alias")); + + // Test create a model version with null alias, comment and properties + StringIdentifier versionId2 = StringIdentifier.fromId(idGenerator.nextId()); + Map versionProperties2 = StringIdentifier.newPropertiesWithId(versionId2, null); + + ops.linkModelVersion(modelIdent, "model_version_path2", null, null, versionProperties2); + + // Test get a model version with null alias, comment and properties + ModelVersion loadedVersion2 = ops.getModelVersion(modelIdent, 1); + Assertions.assertEquals(1, loadedVersion2.version()); + Assertions.assertNull(loadedVersion2.comment()); + Assertions.assertEquals(versionProperties2, loadedVersion2.properties()); + Assertions.assertEquals(0, loadedVersion2.aliases().length); + + // Test get a model version with alias + Assertions.assertThrows( + NoSuchModelVersionException.class, () -> ops.getModelVersion(modelIdent, "alias3")); + } + + @Test + public void testLinkAndListModelVersions() { + // Create schema and model + String schemaName = randomSchemaName(); + createSchema(schemaName); + + String modelName = "model1"; + NameIdentifier modelIdent = + NameIdentifierUtil.ofModel(METALAKE_NAME, CATALOG_NAME, schemaName, modelName); + StringIdentifier stringId = StringIdentifier.fromId(idGenerator.nextId()); + Map properties = StringIdentifier.newPropertiesWithId(stringId, null); + + ops.registerModel(modelIdent, "model1 comment", properties); + + // Create a model version + StringIdentifier versionId = StringIdentifier.fromId(idGenerator.nextId()); + Map versionProperties = StringIdentifier.newPropertiesWithId(versionId, null); + + String[] aliases = new String[] {"alias1", "alias2"}; + ops.linkModelVersion( + modelIdent, "model_version_path", aliases, "version1 comment", versionProperties); + + // List linked model versions + int[] versions = ops.listModelVersions(modelIdent); + Assertions.assertEquals(1, versions.length); + Assertions.assertEquals(0, versions[0]); + + ModelVersion loadedVersion = ops.getModelVersion(modelIdent, versions[0]); + Assertions.assertEquals(0, loadedVersion.version()); + Assertions.assertEquals("version1 comment", loadedVersion.comment()); + Assertions.assertEquals("model_version_path", loadedVersion.uri()); + Assertions.assertEquals(versionProperties, loadedVersion.properties()); + + // Create another model version + StringIdentifier versionId2 = StringIdentifier.fromId(idGenerator.nextId()); + Map versionProperties2 = StringIdentifier.newPropertiesWithId(versionId2, null); + + String[] aliases2 = new String[] {"alias3", "alias4"}; + ops.linkModelVersion( + modelIdent, "model_version_path2", aliases2, "version2 comment", versionProperties2); + + // List linked model versions + int[] versions2 = ops.listModelVersions(modelIdent); + Assertions.assertEquals(2, versions2.length); + + Set resultSet = Arrays.stream(versions2).boxed().collect(Collectors.toSet()); + Assertions.assertTrue(resultSet.contains(0)); + Assertions.assertTrue(resultSet.contains(1)); + + // Test list model versions in a non-existent model + Assertions.assertThrows( + NoSuchModelException.class, + () -> + ops.listModelVersions( + NameIdentifierUtil.ofModel( + METALAKE_NAME, CATALOG_NAME, schemaName, "non-existent-model"))); + + // Test list model versions in a non-existent schema + Assertions.assertThrows( + NoSuchModelException.class, + () -> + ops.listModelVersions( + NameIdentifierUtil.ofModel( + METALAKE_NAME, CATALOG_NAME, "non-existent-schema", modelName))); + } + + @Test + public void testDeleteModelVersion() { + // Create schema and model + String schemaName = randomSchemaName(); + createSchema(schemaName); + + String modelName = "model1"; + NameIdentifier modelIdent = + NameIdentifierUtil.ofModel(METALAKE_NAME, CATALOG_NAME, schemaName, modelName); + StringIdentifier stringId = StringIdentifier.fromId(idGenerator.nextId()); + Map properties = StringIdentifier.newPropertiesWithId(stringId, null); + + ops.registerModel(modelIdent, "model1 comment", properties); + + // Create a model version + StringIdentifier versionId = StringIdentifier.fromId(idGenerator.nextId()); + Map versionProperties = StringIdentifier.newPropertiesWithId(versionId, null); + + String[] aliases = new String[] {"alias1", "alias2"}; + ops.linkModelVersion( + modelIdent, "model_version_path", aliases, "version1 comment", versionProperties); + + // Delete the model version + Assertions.assertTrue(ops.deleteModelVersion(modelIdent, 0)); + Assertions.assertFalse(ops.deleteModelVersion(modelIdent, 0)); + + // Test get a deleted model version + Assertions.assertThrows( + NoSuchModelVersionException.class, () -> ops.getModelVersion(modelIdent, 0)); + + // Test delete model version in a non-existent model + Assertions.assertFalse( + ops.deleteModelVersion( + NameIdentifierUtil.ofModel( + METALAKE_NAME, CATALOG_NAME, schemaName, "non-existent-model"), + 0)); + + // Test delete model version in a non-existent schema + Assertions.assertFalse( + ops.deleteModelVersion( + NameIdentifierUtil.ofModel( + METALAKE_NAME, CATALOG_NAME, "non-existent-schema", modelName), + 0)); + + // Test delete model version using alias + ops.linkModelVersion( + modelIdent, "model_version_path", aliases, "version1 comment", versionProperties); + + Assertions.assertTrue(ops.deleteModelVersion(modelIdent, "alias1")); + Assertions.assertFalse(ops.deleteModelVersion(modelIdent, "alias1")); + Assertions.assertFalse(ops.deleteModelVersion(modelIdent, "alias2")); + + // Test list model versions after deletion + Assertions.assertEquals(0, ops.listModelVersions(modelIdent).length); + + // Test get the latest version after deletion + Model loadedModel = ops.getModel(modelIdent); + Assertions.assertEquals(2, loadedModel.latestVersion()); + + ops.linkModelVersion( + modelIdent, "model_version_path", aliases, "version1 comment", versionProperties); + int[] versions = ops.listModelVersions(modelIdent); + Assertions.assertEquals(1, versions.length); + Assertions.assertEquals(2, versions[0]); + } + + @Test + public void testDeleteModelWithVersions() { + // Create schema and model + String schemaName = randomSchemaName(); + createSchema(schemaName); + + String modelName = "model1"; + NameIdentifier modelIdent = + NameIdentifierUtil.ofModel(METALAKE_NAME, CATALOG_NAME, schemaName, modelName); + StringIdentifier stringId = StringIdentifier.fromId(idGenerator.nextId()); + Map properties = StringIdentifier.newPropertiesWithId(stringId, null); + + ops.registerModel(modelIdent, "model1 comment", properties); + + // Create a model version + StringIdentifier versionId = StringIdentifier.fromId(idGenerator.nextId()); + Map versionProperties = StringIdentifier.newPropertiesWithId(versionId, null); + + String[] aliases = new String[] {"alias1", "alias2"}; + ops.linkModelVersion( + modelIdent, "model_version_path", aliases, "version1 comment", versionProperties); + + // Delete the model + Assertions.assertTrue(ops.deleteModel(modelIdent)); + Assertions.assertFalse(ops.deleteModel(modelIdent)); + + // Test get a deleted model + Assertions.assertThrows(NoSuchModelException.class, () -> ops.getModel(modelIdent)); + + // Test list model versions after deletion + Assertions.assertThrows(NoSuchModelException.class, () -> ops.listModelVersions(modelIdent)); + } + + private String randomSchemaName() { + return "schema_" + UUID.randomUUID().toString().replace("-", ""); + } + + private void createSchema(String schemaName) { + NameIdentifier schemaIdent = + NameIdentifierUtil.ofSchema(METALAKE_NAME, CATALOG_NAME, schemaName); + StringIdentifier stringId = StringIdentifier.fromId(idGenerator.nextId()); + Map properties = StringIdentifier.newPropertiesWithId(stringId, null); + + ops.createSchema(schemaIdent, "schema comment", properties); + } +} diff --git a/core/src/main/java/org/apache/gravitino/catalog/ManagedSchemaOperations.java b/core/src/main/java/org/apache/gravitino/catalog/ManagedSchemaOperations.java new file mode 100644 index 00000000000..fec07baceaa --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/catalog/ManagedSchemaOperations.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import java.io.IOException; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import org.apache.gravitino.Entity; +import org.apache.gravitino.EntityStore; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.Schema; +import org.apache.gravitino.SchemaChange; +import org.apache.gravitino.StringIdentifier; +import org.apache.gravitino.connector.BaseSchema; +import org.apache.gravitino.connector.SupportsSchemas; +import org.apache.gravitino.exceptions.AlreadyExistsException; +import org.apache.gravitino.exceptions.NoSuchCatalogException; +import org.apache.gravitino.exceptions.NoSuchEntityException; +import org.apache.gravitino.exceptions.NoSuchSchemaException; +import org.apache.gravitino.exceptions.NonEmptyEntityException; +import org.apache.gravitino.exceptions.NonEmptySchemaException; +import org.apache.gravitino.exceptions.SchemaAlreadyExistsException; +import org.apache.gravitino.meta.AuditInfo; +import org.apache.gravitino.meta.SchemaEntity; +import org.apache.gravitino.utils.PrincipalUtils; + +public abstract class ManagedSchemaOperations implements SupportsSchemas { + + private static final String SCHEMA_DOES_NOT_EXIST_MSG = "Schema %s does not exist"; + + public static class ManagedSchema extends BaseSchema { + + public static Builder builder() { + return new Builder(); + } + + public static class Builder extends BaseSchemaBuilder { + + private Builder() {} + + @Override + protected ManagedSchema internalBuild() { + ManagedSchema schema = new ManagedSchema(); + schema.name = name; + schema.comment = comment; + schema.properties = properties; + schema.auditInfo = auditInfo; + return schema; + } + } + } + + protected abstract EntityStore store(); + + @Override + public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException { + try { + List schemas = + store().list(namespace, SchemaEntity.class, Entity.EntityType.SCHEMA); + return schemas.stream() + .map(s -> NameIdentifier.of(namespace, s.name())) + .toArray(NameIdentifier[]::new); + + } catch (NoSuchEntityException e) { + throw new NoSuchCatalogException(e, "Catalog %s does not exist", namespace); + } catch (IOException ioe) { + throw new RuntimeException("Failed to list schemas under namespace " + namespace, ioe); + } + } + + @Override + public Schema createSchema(NameIdentifier ident, String comment, Map properties) + throws NoSuchCatalogException, SchemaAlreadyExistsException { + try { + if (store().exists(ident, Entity.EntityType.SCHEMA)) { + throw new SchemaAlreadyExistsException("Schema %s already exists", ident); + } + } catch (IOException ioe) { + throw new RuntimeException("Failed to check if schema " + ident + " exists", ioe); + } + + StringIdentifier stringId = StringIdentifier.fromProperties(properties); + Preconditions.checkNotNull(stringId, "Property String identifier should not be null"); + + SchemaEntity schemaEntity = + SchemaEntity.builder() + .withName(ident.name()) + .withId(stringId.id()) + .withNamespace(ident.namespace()) + .withComment(comment) + .withProperties(properties) + .withAuditInfo( + AuditInfo.builder() + .withCreator(PrincipalUtils.getCurrentPrincipal().getName()) + .withCreateTime(Instant.now()) + .build()) + .build(); + try { + store().put(schemaEntity, true /* overwrite */); + } catch (IOException ioe) { + throw new RuntimeException("Failed to create schema " + ident, ioe); + } catch (NoSuchEntityException e) { + throw new NoSuchCatalogException(e, "Catalog %s does not exist", ident.namespace()); + } + + return ManagedSchema.builder() + .withName(ident.name()) + .withComment(comment) + .withProperties(schemaEntity.properties()) + .withAuditInfo(schemaEntity.auditInfo()) + .build(); + } + + @Override + public Schema loadSchema(NameIdentifier ident) throws NoSuchSchemaException { + try { + SchemaEntity schemaEntity = store().get(ident, Entity.EntityType.SCHEMA, SchemaEntity.class); + + return ManagedSchema.builder() + .withName(ident.name()) + .withComment(schemaEntity.comment()) + .withProperties(schemaEntity.properties()) + .withAuditInfo(schemaEntity.auditInfo()) + .build(); + + } catch (NoSuchEntityException exception) { + throw new NoSuchSchemaException(exception, SCHEMA_DOES_NOT_EXIST_MSG, ident); + } catch (IOException ioe) { + throw new RuntimeException("Failed to load schema " + ident, ioe); + } + } + + @Override + public Schema alterSchema(NameIdentifier ident, SchemaChange... changes) + throws NoSuchSchemaException { + try { + SchemaEntity entity = + store() + .update( + ident, + SchemaEntity.class, + Entity.EntityType.SCHEMA, + schemaEntity -> updateSchemaEntity(ident, schemaEntity, changes)); + + return ManagedSchema.builder() + .withName(ident.name()) + .withComment(entity.comment()) + .withProperties(entity.properties()) + .withAuditInfo(entity.auditInfo()) + .build(); + + } catch (IOException ioe) { + throw new RuntimeException("Failed to update schema " + ident, ioe); + } catch (NoSuchEntityException nsee) { + throw new NoSuchSchemaException(nsee, SCHEMA_DOES_NOT_EXIST_MSG, ident); + } catch (AlreadyExistsException aee) { + throw new RuntimeException( + "Schema with the same name " + + ident.name() + + " already exists, this is unexpected because schema doesn't support rename", + aee); + } + } + + @Override + public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException { + try { + return store().delete(ident, Entity.EntityType.SCHEMA, cascade); + + } catch (IOException ioe) { + throw new RuntimeException("Failed to delete schema " + ident, ioe); + } catch (NonEmptyEntityException neee) { + throw new NonEmptySchemaException(neee, "Schema %s is not empty", ident); + } catch (NoSuchEntityException nsee) { + return false; + } + } + + private SchemaEntity updateSchemaEntity( + NameIdentifier ident, SchemaEntity schemaEntity, SchemaChange... changes) { + Map props = + schemaEntity.properties() == null + ? Maps.newHashMap() + : Maps.newHashMap(schemaEntity.properties()); + + for (SchemaChange change : changes) { + if (change instanceof SchemaChange.SetProperty) { + SchemaChange.SetProperty setProperty = (SchemaChange.SetProperty) change; + props.put(setProperty.getProperty(), setProperty.getValue()); + } else if (change instanceof SchemaChange.RemoveProperty) { + SchemaChange.RemoveProperty removeProperty = (SchemaChange.RemoveProperty) change; + props.remove(removeProperty.getProperty()); + } else { + throw new IllegalArgumentException( + "Unsupported schema change: " + change.getClass().getSimpleName()); + } + } + + return SchemaEntity.builder() + .withName(schemaEntity.name()) + .withNamespace(ident.namespace()) + .withId(schemaEntity.id()) + .withComment(schemaEntity.comment()) + .withProperties(props) + .withAuditInfo( + AuditInfo.builder() + .withCreator(schemaEntity.auditInfo().creator()) + .withCreateTime(schemaEntity.auditInfo().createTime()) + .withLastModifier(PrincipalUtils.getCurrentPrincipal().getName()) + .withLastModifiedTime(Instant.now()) + .build()) + .build(); + } +} diff --git a/core/src/main/java/org/apache/gravitino/catalog/ModelNormalizeDispatcher.java b/core/src/main/java/org/apache/gravitino/catalog/ModelNormalizeDispatcher.java index ea4933c3c2c..10683f6e9ce 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/ModelNormalizeDispatcher.java +++ b/core/src/main/java/org/apache/gravitino/catalog/ModelNormalizeDispatcher.java @@ -69,22 +69,10 @@ public boolean modelExists(NameIdentifier ident) { @Override public Model registerModel(NameIdentifier ident, String comment, Map properties) - throws ModelAlreadyExistsException { + throws NoSuchSchemaException, ModelAlreadyExistsException { return dispatcher.registerModel(normalizeNameIdentifier(ident), comment, properties); } - @Override - public Model registerModel( - NameIdentifier ident, - String uri, - String[] aliases, - String comment, - Map properties) - throws ModelAlreadyExistsException, ModelVersionAliasesAlreadyExistException { - return dispatcher.registerModel( - normalizeNameIdentifier(ident), uri, aliases, comment, properties); - } - @Override public boolean deleteModel(NameIdentifier ident) { // The constraints of the name spec may be more strict than underlying catalog, @@ -120,15 +108,14 @@ public boolean modelVersionExists(NameIdentifier ident, String alias) { } @Override - public ModelVersion linkModelVersion( + public void linkModelVersion( NameIdentifier ident, String uri, String[] aliases, String comment, Map properties) throws NoSuchModelException, ModelVersionAliasesAlreadyExistException { - return dispatcher.linkModelVersion( - normalizeCaseSensitive(ident), uri, aliases, comment, properties); + dispatcher.linkModelVersion(normalizeCaseSensitive(ident), uri, aliases, comment, properties); } @Override diff --git a/core/src/main/java/org/apache/gravitino/catalog/ModelOperationDispatcher.java b/core/src/main/java/org/apache/gravitino/catalog/ModelOperationDispatcher.java index e8739673d63..eb1f17c96da 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/ModelOperationDispatcher.java +++ b/core/src/main/java/org/apache/gravitino/catalog/ModelOperationDispatcher.java @@ -50,18 +50,7 @@ public Model getModel(NameIdentifier ident) throws NoSuchModelException { @Override public Model registerModel(NameIdentifier ident, String comment, Map properties) - throws ModelAlreadyExistsException { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public Model registerModel( - NameIdentifier ident, - String uri, - String[] aliases, - String comment, - Map properties) - throws ModelAlreadyExistsException, ModelVersionAliasesAlreadyExistException { + throws NoSuchModelException, ModelAlreadyExistsException { throw new UnsupportedOperationException("Not implemented"); } @@ -88,7 +77,7 @@ public ModelVersion getModelVersion(NameIdentifier ident, String alias) } @Override - public ModelVersion linkModelVersion( + public void linkModelVersion( NameIdentifier ident, String uri, String[] aliases, diff --git a/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java b/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java index c6ec025ab93..ce870523a14 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java +++ b/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java @@ -284,6 +284,12 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty NonEmptySchemaException.class, RuntimeException.class); + // For managed schema, we don't need to drop the schema from the store again. + boolean isManagedSchema = isManagedEntity(catalogIdent, Capability.Scope.SCHEMA); + if (isManagedSchema) { + return droppedFromCatalog; + } + // For unmanaged schema, it could happen that the schema: // 1. Is not found in the catalog (dropped directly from underlying sources) // 2. Is found in the catalog but not in the store (not managed by Gravitino) @@ -292,20 +298,15 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty // In all situations, we try to delete the schema from the store, but we don't take the // return value of the store operation into account. We only take the return value of the // catalog into account. - // - // For managed schema, we should take the return value of the store operation into account. - boolean droppedFromStore = false; try { - droppedFromStore = store.delete(ident, SCHEMA, cascade); + store.delete(ident, SCHEMA, cascade); } catch (NoSuchEntityException e) { LOG.warn("The schema to be dropped does not exist in the store: {}", ident, e); } catch (Exception e) { throw new RuntimeException(e); } - return isManagedEntity(catalogIdent, Capability.Scope.SCHEMA) - ? droppedFromStore - : droppedFromCatalog; + return droppedFromCatalog; } private void importSchema(NameIdentifier identifier) { diff --git a/core/src/main/java/org/apache/gravitino/connector/BaseModel.java b/core/src/main/java/org/apache/gravitino/connector/BaseModel.java new file mode 100644 index 00000000000..4777af2561d --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/connector/BaseModel.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.connector; + +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.gravitino.annotation.Evolving; +import org.apache.gravitino.meta.AuditInfo; +import org.apache.gravitino.model.Model; + +/** An abstract class representing a base model. */ +@Evolving +public abstract class BaseModel implements Model { + + protected String name; + + @Nullable protected String comment; + + @Nullable protected Map properties; + + protected int latestVersion; + + protected AuditInfo auditInfo; + + /** @return The name of the model. */ + @Override + public String name() { + return name; + } + + /** @return The comment of the model. */ + @Nullable + @Override + public String comment() { + return comment; + } + + /** @return The properties of the model. */ + @Nullable + @Override + public Map properties() { + return properties; + } + + /** @return The latest version of the model. */ + @Override + public int latestVersion() { + return latestVersion; + } + + /** @return The audit information of the model. */ + @Override + public AuditInfo auditInfo() { + return auditInfo; + } + + interface Builder, T extends BaseModel> { + + SELF withName(String name); + + SELF withComment(@Nullable String comment); + + SELF withProperties(@Nullable Map properties); + + SELF withLatestVersion(int latestVersion); + + SELF withAuditInfo(AuditInfo auditInfo); + + T build(); + } + + public abstract static class BaseModelBuilder, T extends BaseModel> + implements Builder { + + protected String name; + + @Nullable protected String comment; + + @Nullable protected Map properties; + + protected int latestVersion; + + protected AuditInfo auditInfo; + + /** + * Sets the name of the model. + * + * @param name The name of the model. + * @return This builder instance. + */ + @Override + public SELF withName(String name) { + this.name = name; + return self(); + } + + /** + * Sets the comment of the model. + * + * @param comment The comment of the model. + * @return This builder instance. + */ + @Override + public SELF withComment(@Nullable String comment) { + this.comment = comment; + return self(); + } + + /** + * Sets the properties of the model. + * + * @param properties The properties of the model. + * @return This builder instance. + */ + @Override + public SELF withProperties(@Nullable Map properties) { + this.properties = properties; + return self(); + } + + /** + * Sets the latest version of the model. + * + * @param latestVersion The latest version of the model. + * @return This builder instance. + */ + @Override + public SELF withLatestVersion(int latestVersion) { + this.latestVersion = latestVersion; + return self(); + } + + /** + * Sets the audit information of the model. + * + * @param auditInfo The audit information of the model. + * @return This builder instance. + */ + @Override + public SELF withAuditInfo(AuditInfo auditInfo) { + this.auditInfo = auditInfo; + return self(); + } + + /** + * Builds the model object. + * + * @return The model object. + */ + @Override + public T build() { + return internalBuild(); + } + + /** + * Builds the concrete model object with the provided attributes. + * + * @return The concrete model object. + */ + protected abstract T internalBuild(); + + private SELF self() { + return (SELF) this; + } + } +} diff --git a/core/src/main/java/org/apache/gravitino/connector/BaseModelVersion.java b/core/src/main/java/org/apache/gravitino/connector/BaseModelVersion.java new file mode 100644 index 00000000000..e58a90d1f7c --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/connector/BaseModelVersion.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.connector; + +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.gravitino.annotation.Evolving; +import org.apache.gravitino.meta.AuditInfo; +import org.apache.gravitino.model.ModelVersion; + +/** An abstract class representing a base model version. */ +@Evolving +public abstract class BaseModelVersion implements ModelVersion { + + protected int version; + + protected String[] aliases; + + @Nullable protected String comment; + + protected String uri; + + protected Map properties; + + protected AuditInfo auditInfo; + + /** @return the version of the model object. */ + @Override + public int version() { + return version; + } + + /** @return the aliases of the model version. */ + @Override + public String[] aliases() { + return aliases; + } + + /** @return the comment of the model version. */ + @Override + public String comment() { + return comment; + } + + /** @return the URI of the model artifact. */ + @Override + public String uri() { + return uri; + } + + /** @return the properties of the model version. */ + @Override + public Map properties() { + return properties; + } + + /** @return the audit details of the model version. */ + @Override + public AuditInfo auditInfo() { + return auditInfo; + } + + interface Builder, T extends BaseModelVersion> { + + SELF withVersion(int version); + + SELF withAliases(String[] aliases); + + SELF withComment(String comment); + + SELF withUri(String uri); + + SELF withProperties(Map properties); + + SELF withAuditInfo(AuditInfo auditInfo); + + T build(); + } + + public abstract static class BaseModelVersionBuilder< + SELF extends Builder, T extends BaseModelVersion> + implements Builder { + + protected int version; + + protected String[] aliases; + + protected String comment; + + protected String uri; + + protected Map properties; + + protected AuditInfo auditInfo; + + /** + * Sets the version of the model object. + * + * @param version The version of the model object. + * @return The builder instance. + */ + @Override + public SELF withVersion(int version) { + this.version = version; + return self(); + } + + /** + * Sets the aliases of the model version. + * + * @param aliases The aliases of the model version. + * @return The builder instance. + */ + @Override + public SELF withAliases(String[] aliases) { + this.aliases = aliases; + return self(); + } + + /** + * Sets the comment of the model version. + * + * @param comment The comment of the model version. + * @return The builder instance. + */ + @Override + public SELF withComment(String comment) { + this.comment = comment; + return self(); + } + + /** + * Sets the URI of the model artifact. + * + * @param uri The URI of the model artifact. + * @return The builder instance. + */ + @Override + public SELF withUri(String uri) { + this.uri = uri; + return self(); + } + + /** + * Sets the properties of the model version. + * + * @param properties The properties of the model version. + * @return The builder instance. + */ + @Override + public SELF withProperties(Map properties) { + this.properties = properties; + return self(); + } + + /** + * Sets the audit details of the model version. + * + * @param auditInfo The audit details of the model version. + * @return The builder instance. + */ + @Override + public SELF withAuditInfo(AuditInfo auditInfo) { + this.auditInfo = auditInfo; + return self(); + } + + /** + * Builds the model version object. + * + * @return The model version object. + */ + @Override + public T build() { + return internalBuild(); + } + + /** + * Builds the model version object. + * + * @return The model version object. + */ + protected abstract T internalBuild(); + + private SELF self() { + return (SELF) this; + } + } +} diff --git a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java index a1cb3ead63c..b656bfa95da 100644 --- a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java +++ b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java @@ -154,6 +154,26 @@ public static NameIdentifier ofModel( return NameIdentifier.of(metalake, catalog, schema, model); } + /** + * Create the model {@link NameIdentifier} from the give model version's namespace. + * + * @param modelVersionNs The model version's namespace + * @return The created model {@link NameIdentifier} + */ + public static NameIdentifier toModelIdentifier(Namespace modelVersionNs) { + return NameIdentifier.of(modelVersionNs.levels()); + } + + /** + * Create the model {@link NameIdentifier} from the give model version's name identifier. + * + * @param modelIdent The model version's name identifier + * @return The created model {@link NameIdentifier} + */ + public static NameIdentifier toModelIdentifier(NameIdentifier modelIdent) { + return NameIdentifier.of(modelIdent.namespace().levels()); + } + /** * Create the model version {@link NameIdentifier} with the given metalake, catalog, schema, model * and version. @@ -170,6 +190,54 @@ public static NameIdentifier ofModelVersion( return NameIdentifier.of(metalake, catalog, schema, model, String.valueOf(version)); } + /** + * Create the model version {@link NameIdentifier} with the given metalake, catalog, schema, model + * and alias. + * + * @param metalake The metalake name + * @param catalog The catalog name + * @param schema The schema name + * @param model The model name + * @param alias The model version alias + * @return The created model version {@link NameIdentifier} + */ + public static NameIdentifier ofModelVersion( + String metalake, String catalog, String schema, String model, String alias) { + return NameIdentifier.of(metalake, catalog, schema, model, alias); + } + + /** + * Create the model version {@link NameIdentifier} with the given model identifier and version. + * + * @param modelIdent The model identifier + * @param version The model version + * @return The created model version {@link NameIdentifier} + */ + public static NameIdentifier toModelVersionIdentifier(NameIdentifier modelIdent, int version) { + return ofModelVersion( + modelIdent.namespace().level(0), + modelIdent.namespace().level(1), + modelIdent.namespace().level(2), + modelIdent.name(), + version); + } + + /** + * Create the model version {@link NameIdentifier} with the given model identifier and alias. + * + * @param modelIdent The model identifier + * @param alias The model version alias + * @return The created model version {@link NameIdentifier} + */ + public static NameIdentifier toModelVersionIdentifier(NameIdentifier modelIdent, String alias) { + return ofModelVersion( + modelIdent.namespace().level(0), + modelIdent.namespace().level(1), + modelIdent.namespace().level(2), + modelIdent.name(), + alias); + } + /** * Try to get the catalog {@link NameIdentifier} from the given {@link NameIdentifier}. * diff --git a/core/src/main/java/org/apache/gravitino/utils/NamespaceUtil.java b/core/src/main/java/org/apache/gravitino/utils/NamespaceUtil.java index 03ad8dc2eab..d0e473c5010 100644 --- a/core/src/main/java/org/apache/gravitino/utils/NamespaceUtil.java +++ b/core/src/main/java/org/apache/gravitino/utils/NamespaceUtil.java @@ -20,6 +20,7 @@ import com.google.errorprone.annotations.FormatMethod; import com.google.errorprone.annotations.FormatString; +import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.Namespace; import org.apache.gravitino.exceptions.IllegalNamespaceException; @@ -133,6 +134,20 @@ public static Namespace ofModelVersion( return Namespace.of(metalake, catalog, schema, model); } + /** + * Convert a model name identifier to a model version namespace. + * + * @param modelIdent The model name identifier + * @return A model version namespace + */ + public static Namespace toModelVersionNs(NameIdentifier modelIdent) { + return ofModelVersion( + modelIdent.namespace().level(0), + modelIdent.namespace().level(1), + modelIdent.namespace().level(2), + modelIdent.name()); + } + /** * Check if the given metalake namespace is legal, throw an {@link IllegalNamespaceException} if * it's illegal. diff --git a/scripts/h2/schema-0.8.0-h2.sql b/scripts/h2/schema-0.8.0-h2.sql index 541c60da958..8a6b2f43431 100644 --- a/scripts/h2/schema-0.8.0-h2.sql +++ b/scripts/h2/schema-0.8.0-h2.sql @@ -333,6 +333,6 @@ CREATE TABLE IF NOT EXISTS `model_version_alias_rel` ( `model_version_alias` VARCHAR(128) NOT NULL COMMENT 'model version alias', `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'model version alias deleted at', PRIMARY KEY (`id`), - UNIQUE KEY `uk_mi_mv_mva_del` (`model_id`, `model_version`, `model_version_alias`, `deleted_at`), + UNIQUE KEY `uk_mi_mva_del` (`model_id`, `model_version_alias`, `deleted_at`), KEY `idx_mva` (`model_version_alias`) ) ENGINE=InnoDB; diff --git a/scripts/h2/upgrade-0.7.0-to-0.8.0-h2.sql b/scripts/h2/upgrade-0.7.0-to-0.8.0-h2.sql index 60c89a86eac..5cf0dfbf6ec 100644 --- a/scripts/h2/upgrade-0.7.0-to-0.8.0-h2.sql +++ b/scripts/h2/upgrade-0.7.0-to-0.8.0-h2.sql @@ -62,6 +62,6 @@ CREATE TABLE IF NOT EXISTS `model_version_alias_rel` ( `model_version_alias` VARCHAR(128) NOT NULL COMMENT 'model version alias', `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'model version alias deleted at', PRIMARY KEY (`id`), - UNIQUE KEY `uk_mi_mv_mva_del` (`model_id`, `model_version`, `model_version_alias`, `deleted_at`), + UNIQUE KEY `uk_mi_mva_del` (`model_id`, `model_version_alias`, `deleted_at`), KEY `idx_mva` (`model_version_alias`) ) ENGINE=InnoDB; diff --git a/scripts/mysql/schema-0.8.0-mysql.sql b/scripts/mysql/schema-0.8.0-mysql.sql index 07b8e146caa..fbf8fd9c44d 100644 --- a/scripts/mysql/schema-0.8.0-mysql.sql +++ b/scripts/mysql/schema-0.8.0-mysql.sql @@ -324,6 +324,6 @@ CREATE TABLE IF NOT EXISTS `model_version_alias_rel` ( `model_version_alias` VARCHAR(128) NOT NULL COMMENT 'model version alias', `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'model version alias deleted at', PRIMARY KEY (`id`), - UNIQUE KEY `uk_mi_mv_mva_del` (`model_id`, `model_version`, `model_version_alias`, `deleted_at`), + UNIQUE KEY `uk_mi_mva_del` (`model_id`, `model_version_alias`, `deleted_at`), KEY `idx_mva` (`model_version_alias`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'model_version_alias_rel'; diff --git a/scripts/mysql/upgrade-0.7.0-to-0.8.0-mysql.sql b/scripts/mysql/upgrade-0.7.0-to-0.8.0-mysql.sql index 7858237c793..adca86cb539 100644 --- a/scripts/mysql/upgrade-0.7.0-to-0.8.0-mysql.sql +++ b/scripts/mysql/upgrade-0.7.0-to-0.8.0-mysql.sql @@ -62,6 +62,6 @@ CREATE TABLE IF NOT EXISTS `model_version_alias_rel` ( `model_version_alias` VARCHAR(128) NOT NULL COMMENT 'model version alias', `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'model version alias deleted at', PRIMARY KEY (`id`), - UNIQUE KEY `uk_mi_mv_mva_del` (`model_id`, `model_version`, `model_version_alias`, `deleted_at`), + UNIQUE KEY `uk_mi_mva_del` (`model_id`, `model_version_alias`, `deleted_at`), KEY `idx_mva` (`model_version_alias`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'model_version_alias_rel'; diff --git a/scripts/postgresql/schema-0.8.0-postgresql.sql b/scripts/postgresql/schema-0.8.0-postgresql.sql index fea0458438d..d964c1f80eb 100644 --- a/scripts/postgresql/schema-0.8.0-postgresql.sql +++ b/scripts/postgresql/schema-0.8.0-postgresql.sql @@ -573,7 +573,7 @@ CREATE TABLE IF NOT EXISTS model_version_alias_rel ( model_version_alias VARCHAR(128) NOT NULL, deleted_at BIGINT NOT NULL DEFAULT 0, PRIMARY KEY (id), - UNIQUE (model_id, model_version, model_version_alias, deleted_at) + UNIQUE (model_id, model_version_alias, deleted_at) ); CREATE INDEX IF NOT EXISTS idx_model_version_alias on model_version_alias_rel (model_version_alias); diff --git a/scripts/postgresql/upgrade-0.7.0-to-0.8.0-postgresql.sql b/scripts/postgresql/upgrade-0.7.0-to-0.8.0-postgresql.sql index a94c4ab2204..0d9ca934342 100644 --- a/scripts/postgresql/upgrade-0.7.0-to-0.8.0-postgresql.sql +++ b/scripts/postgresql/upgrade-0.7.0-to-0.8.0-postgresql.sql @@ -91,7 +91,7 @@ CREATE TABLE IF NOT EXISTS model_version_alias_rel ( model_version_alias VARCHAR(128) NOT NULL, deleted_at BIGINT NOT NULL DEFAULT 0, PRIMARY KEY (id), - UNIQUE (model_id, model_version, model_version_alias, deleted_at) + UNIQUE (model_id, model_version_alias, deleted_at) ); CREATE INDEX IF NOT EXISTS idx_model_version_alias on model_version_alias_rel (model_version_alias);