Skip to content

Commit

Permalink
[apache#4020] feat(core): Add Tag Manage core logic to support tag op…
Browse files Browse the repository at this point in the history
…erations (part-2) (apache#4109)

### What changes were proposed in this pull request?

This PR add the second part tag core logic to support associate tags
with metadata object, and query the associations between tags and
metadata objects.

### Why are the changes needed?

This is a part of work to support tag system.

Fix: apache#4020 

### Does this PR introduce _any_ user-facing change?

NO.

### How was this patch tested?

Add UTs to cover the logic.
  • Loading branch information
jerryshao authored Jul 16, 2024
1 parent 116e5ae commit d85a9b4
Show file tree
Hide file tree
Showing 26 changed files with 2,024 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.exceptions;

import com.google.errorprone.annotations.FormatMethod;

/** Exception thrown when a tag with specified name already associated to a metadata object. */
public class TagAlreadyAssociatedException extends AlreadyExistsException {

/**
* Constructs a new exception with the specified detail message.
*
* @param message the detail message.
* @param args the arguments to the message.
*/
@FormatMethod
public TagAlreadyAssociatedException(String message, Object... args) {
super(message, args);
}

/**
* Constructs a new exception with the specified detail message and cause.
*
* @param cause the cause.
* @param message the detail message.
* @param args the arguments to the message.
*/
@FormatMethod
public TagAlreadyAssociatedException(Throwable cause, String message, Object... args) {
super(cause, message, args);
}
}
11 changes: 11 additions & 0 deletions core/src/main/java/org/apache/gravitino/EntityStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.function.Function;
import org.apache.gravitino.Entity.EntityType;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.tag.SupportsTagOperations;
import org.apache.gravitino.utils.Executable;

public interface EntityStore extends Closeable {
Expand Down Expand Up @@ -183,4 +184,14 @@ default boolean delete(NameIdentifier ident, EntityType entityType) throws IOExc
*/
<R, E extends Exception> R executeInTransaction(Executable<R, E> executable)
throws E, IOException;

/**
* Get the extra tag operations that are supported by the entity store.
*
* @return the tag operations object that are supported by the entity store
* @throws UnsupportedOperationException if the extra operations are not supported
*/
default SupportsTagOperations tagOperations() {
throw new UnsupportedOperationException("tag operations are not supported");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.gravitino.Entity;
import org.apache.gravitino.EntityAlreadyExistsException;
import org.apache.gravitino.HasIdentifier;
import org.apache.gravitino.MetadataObject;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.UnsupportedEntityTypeException;
Expand Down Expand Up @@ -328,6 +329,37 @@ public void close() throws IOException {
}
}

@Override
public List<MetadataObject> listAssociatedMetadataObjectsForTag(NameIdentifier tagIdent)
throws IOException {
return TagMetaService.getInstance().listAssociatedMetadataObjectsForTag(tagIdent);
}

@Override
public List<TagEntity> listAssociatedTagsForMetadataObject(
NameIdentifier objectIdent, Entity.EntityType objectType)
throws NoSuchEntityException, IOException {
return TagMetaService.getInstance().listTagsForMetadataObject(objectIdent, objectType);
}

@Override
public TagEntity getTagForMetadataObject(
NameIdentifier objectIdent, Entity.EntityType objectType, NameIdentifier tagIdent)
throws NoSuchEntityException, IOException {
return TagMetaService.getInstance().getTagForMetadataObject(objectIdent, objectType, tagIdent);
}

@Override
public List<TagEntity> associateTagsWithMetadataObject(
NameIdentifier objectIdent,
Entity.EntityType objectType,
NameIdentifier[] tagsToAdd,
NameIdentifier[] tagsToRemove)
throws NoSuchEntityException, EntityAlreadyExistsException, IOException {
return TagMetaService.getInstance()
.associateTagsWithMetadataObject(objectIdent, objectType, tagsToAdd, tagsToRemove);
}

enum JDBCBackendType {
H2(true),
MYSQL(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.tag.SupportsTagOperations;

/** Interface defining the operations for a Relation Backend. */
public interface RelationalBackend extends Closeable {
public interface RelationalBackend extends Closeable, SupportsTagOperations {

/**
* Initializes the Relational Backend environment with the provided configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@
import org.apache.gravitino.EntitySerDe;
import org.apache.gravitino.EntityStore;
import org.apache.gravitino.HasIdentifier;
import org.apache.gravitino.MetadataObject;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.meta.TagEntity;
import org.apache.gravitino.tag.SupportsTagOperations;
import org.apache.gravitino.utils.Executable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -43,7 +46,7 @@
* MySQL, PostgreSQL, etc. If you want to use a different backend, you can implement the {@link
* RelationalBackend} interface
*/
public class RelationalEntityStore implements EntityStore {
public class RelationalEntityStore implements EntityStore, SupportsTagOperations {
private static final Logger LOGGER = LoggerFactory.getLogger(RelationalEntityStore.class);
public static final ImmutableMap<String, String> RELATIONAL_BACKENDS =
ImmutableMap.of(
Expand Down Expand Up @@ -132,4 +135,40 @@ public void close() throws IOException {
garbageCollector.close();
backend.close();
}

@Override
public SupportsTagOperations tagOperations() {
return this;
}

@Override
public List<MetadataObject> listAssociatedMetadataObjectsForTag(NameIdentifier tagIdent)
throws IOException {
return backend.listAssociatedMetadataObjectsForTag(tagIdent);
}

@Override
public List<TagEntity> listAssociatedTagsForMetadataObject(
NameIdentifier objectIdent, Entity.EntityType objectType)
throws NoSuchEntityException, IOException {
return backend.listAssociatedTagsForMetadataObject(objectIdent, objectType);
}

@Override
public TagEntity getTagForMetadataObject(
NameIdentifier objectIdent, Entity.EntityType objectType, NameIdentifier tagIdent)
throws NoSuchEntityException, IOException {
return backend.getTagForMetadataObject(objectIdent, objectType, tagIdent);
}

@Override
public List<TagEntity> associateTagsWithMetadataObject(
NameIdentifier objectIdent,
Entity.EntityType objectType,
NameIdentifier[] tagsToAdd,
NameIdentifier[] tagsToRemove)
throws NoSuchEntityException, EntityAlreadyExistsException, IOException {
return backend.associateTagsWithMetadataObject(
objectIdent, objectType, tagsToAdd, tagsToRemove);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface TagMetaMapper {
String TAG_TABLE_NAME = "tag_meta";

@Select(
"SELECT tm.tag_id as tagId, tag_name as tagName,"
"SELECT tm.tag_id as tagId, tm.tag_name as tagName,"
+ " tm.metalake_id as metalakeId,"
+ " tm.tag_comment as comment,"
+ " tm.properties as properties,"
Expand All @@ -43,16 +43,41 @@ public interface TagMetaMapper {
+ TAG_TABLE_NAME
+ " tm JOIN "
+ MetalakeMetaMapper.TABLE_NAME
+ " mm on tm.metalake_id = mm.metalake_id"
+ " mm ON tm.metalake_id = mm.metalake_id"
+ " WHERE mm.metalake_name = #{metalakeName} AND tm.deleted_at = 0 AND mm.deleted_at = 0")
List<TagPO> listTagPOsByMetalake(@Param("metalakeName") String metalakeName);

@Select(
"<script>"
+ "SELECT tm.tag_id as tagId, tm.tag_name as tagName,"
+ " tm.metalake_id as metalakeId,"
+ " tm.tag_comment as comment,"
+ " tm.properties as properties,"
+ " tm.audit_info as auditInfo,"
+ " tm.current_version as currentVersion,"
+ " tm.last_version as lastVersion,"
+ " tm.deleted_at as deletedAt"
+ " FROM "
+ TAG_TABLE_NAME
+ " tm JOIN "
+ MetalakeMetaMapper.TABLE_NAME
+ " mm ON tm.metalake_id = mm.metalake_id"
+ " WHERE mm.metalake_name = #{metalakeName} AND tm.tag_name IN "
+ " <foreach"
+ " item='tagName' index='index' collection='tagNames' open='(' separator=',' close=')'>"
+ " #{tagName}"
+ " </foreach>"
+ " AND tm.deleted_at = 0 AND mm.deleted_at = 0"
+ "</script>")
List<TagPO> listTagPOsByMetalakeAndTagNames(
@Param("metalakeName") String metalakeName, @Param("tagNames") List<String> tagNames);

@Select(
"SELECT tm.tag_id as tagId FROM "
+ TAG_TABLE_NAME
+ " tm JOIN "
+ MetalakeMetaMapper.TABLE_NAME
+ " mm on tm.metalake_id = mm.metalake_id"
+ " mm ON tm.metalake_id = mm.metalake_id"
+ " WHERE mm.metalake_name = #{metalakeName} AND tm.tag_name = #{tagName}"
+ " AND tm.deleted_at = 0 AND mm.deleted_at = 0")
Long selectTagIdByMetalakeAndName(
Expand All @@ -71,7 +96,7 @@ Long selectTagIdByMetalakeAndName(
+ TAG_TABLE_NAME
+ " tm JOIN "
+ MetalakeMetaMapper.TABLE_NAME
+ " mm on tm.metalake_id = mm.metalake_id"
+ " mm ON tm.metalake_id = mm.metalake_id"
+ " WHERE mm.metalake_name = #{metalakeName} AND tm.tag_name = #{tagName}"
+ " AND tm.deleted_at = 0 AND mm.deleted_at = 0")
TagPO selectTagMetaByMetalakeAndName(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,136 @@
*/
package org.apache.gravitino.storage.relational.mapper;

import java.util.List;
import org.apache.gravitino.storage.relational.po.TagMetadataObjectRelPO;
import org.apache.gravitino.storage.relational.po.TagPO;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;

public interface TagMetadataObjectRelMapper {
String TAG_METADATA_OBJECT_RELATION_TABLE_NAME = "tag_relation_meta";

@Select(
"SELECT tm.tag_id as tagId, tm.tag_name as tagName,"
+ " tm.metalake_id as metalakeId, tm.tag_comment as comment, tm.properties as properties,"
+ " tm.audit_info as auditInfo,"
+ " tm.current_version as currentVersion,"
+ " tm.last_version as lastVersion,"
+ " tm.deleted_at as deletedAt"
+ " FROM "
+ TagMetaMapper.TAG_TABLE_NAME
+ " tm JOIN "
+ TAG_METADATA_OBJECT_RELATION_TABLE_NAME
+ " te ON tm.tag_id = te.tag_id"
+ " WHERE te.metadata_object_id = #{metadataObjectId}"
+ " AND te.metadata_object_type = #{metadataObjectType} AND te.deleted_at = 0"
+ " AND tm.deleted_at = 0")
List<TagPO> listTagPOsByMetadataObjectIdAndType(
@Param("metadataObjectId") Long metadataObjectId,
@Param("metadataObjectType") String metadataObjectType);

@Select(
"SELECT tm.tag_id as tagId, tm.tag_name as tagName,"
+ " tm.metalake_id as metalakeId, tm.tag_comment as comment, tm.properties as properties,"
+ " tm.audit_info as auditInfo,"
+ " tm.current_version as currentVersion,"
+ " tm.last_version as lastVersion,"
+ " tm.deleted_at as deletedAt"
+ " FROM "
+ TagMetaMapper.TAG_TABLE_NAME
+ " tm JOIN "
+ TAG_METADATA_OBJECT_RELATION_TABLE_NAME
+ " te ON tm.tag_id = te.tag_id"
+ " WHERE te.metadata_object_id = #{metadataObjectId}"
+ " AND te.metadata_object_type = #{metadataObjectType} AND tm.tag_name = #{tagName}"
+ " AND te.deleted_at = 0 AND tm.deleted_at = 0")
TagPO getTagPOsByMetadataObjectAndTagName(
@Param("metadataObjectId") Long metadataObjectId,
@Param("metadataObjectType") String metadataObjectType,
@Param("tagName") String tagName);

@Select(
"SELECT te.tag_id as tagId, te.metadata_object_id as metadataObjectId,"
+ " te.metadata_object_type as metadataObjectType, te.audit_info as auditInfo,"
+ " te.current_version as currentVersion, te.last_version as lastVersion,"
+ " te.deleted_at as deletedAt"
+ " FROM "
+ TAG_METADATA_OBJECT_RELATION_TABLE_NAME
+ " te JOIN "
+ TagMetaMapper.TAG_TABLE_NAME
+ " tm JOIN "
+ MetalakeMetaMapper.TABLE_NAME
+ " mm ON te.tag_id = tm.tag_id AND tm.metalake_id = mm.metalake_id"
+ " WHERE mm.metalake_name = #{metalakeName} AND tm.tag_name = #{tagName}"
+ " AND te.deleted_at = 0 AND tm.deleted_at = 0 AND mm.deleted_at = 0")
List<TagMetadataObjectRelPO> listTagMetadataObjectRelsByMetalakeAndTagName(
@Param("metalakeName") String metalakeName, @Param("tagName") String tagName);

@Insert({
"<script>",
"INSERT INTO "
+ TAG_METADATA_OBJECT_RELATION_TABLE_NAME
+ "(tag_id, metadata_object_id, metadata_object_type, audit_info,"
+ " current_version, last_version, deleted_at)"
+ " VALUES ",
"<foreach collection='tagRels' item='item' separator=','>",
"(#{item.tagId},"
+ " #{item.metadataObjectId},"
+ " #{item.metadataObjectType},"
+ " #{item.auditInfo},"
+ " #{item.currentVersion},"
+ " #{item.lastVersion},"
+ " #{item.deletedAt})",
"</foreach>",
"</script>"
})
void batchInsertTagMetadataObjectRels(@Param("tagRels") List<TagMetadataObjectRelPO> tagRelPOs);

@Update({
"<script>",
"UPDATE "
+ TAG_METADATA_OBJECT_RELATION_TABLE_NAME
+ " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+ " WHERE tag_id IN ",
"<foreach item='tagId' collection='tagIds' open='(' separator=',' close=')'>",
"#{tagId}",
"</foreach>",
" And metadata_object_id = #{metadataObjectId}"
+ " AND metadata_object_type = #{metadataObjectType} AND deleted_at = 0",
"</script>"
})
void batchDeleteTagMetadataObjectRelsByTagIdsAndMetadataObject(
@Param("metadataObjectId") Long metadataObjectId,
@Param("metadataObjectType") String metadataObjectType,
@Param("tagIds") List<Long> tagIds);

@Update(
"UPDATE "
+ TAG_METADATA_OBJECT_RELATION_TABLE_NAME
+ " tmo SET tmo.deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ " te SET te.deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+ " WHERE tmo.tag_id IN (SELECT tm.tag_id FROM "
+ " WHERE te.tag_id IN (SELECT tm.tag_id FROM "
+ TagMetaMapper.TAG_TABLE_NAME
+ " tm WHERE tm.metalake_id IN (SELECT mm.metalake_id FROM "
+ MetalakeMetaMapper.TABLE_NAME
+ " mm WHERE mm.metalake_name = #{metalakeName} AND mm.deleted_at = 0)"
+ " AND tm.deleted_at = 0) AND tmo.deleted_at = 0")
+ " AND tm.deleted_at = 0) AND te.deleted_at = 0")
Integer softDeleteTagMetadataObjectRelsByMetalakeAndTagName(
@Param("metalakeName") String metalakeName, @Param("tagName") String tagName);

@Update(
"UPDATE "
+ TAG_METADATA_OBJECT_RELATION_TABLE_NAME
+ " tmo SET tmo.deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ " te SET te.deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+ " WHERE EXISTS (SELECT * FROM "
+ TagMetaMapper.TAG_TABLE_NAME
+ " tm WHERE tm.metalake_id = #{metalakeId} AND tm.tag_id = tmo.tag_id"
+ " AND tm.deleted_at = 0) AND tmo.deleted_at = 0")
+ " tm WHERE tm.metalake_id = #{metalakeId} AND tm.tag_id = te.tag_id"
+ " AND tm.deleted_at = 0) AND te.deleted_at = 0")
void softDeleteTagMetadataObjectRelsByMetalakeId(@Param("metalakeId") Long metalakeId);

@Delete(
Expand Down
Loading

0 comments on commit d85a9b4

Please sign in to comment.