Skip to content

Commit

Permalink
Update PolarisEclipseLinkStore to work with more databases (#247)
Browse files Browse the repository at this point in the history
  • Loading branch information
eric-maynard authored Sep 16, 2024
1 parent a8a1f6c commit 537eee6
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ public PolarisEclipseLinkMetaStoreSessionImpl(

// init store
this.store = store;
try (EntityManager session = emf.createEntityManager()) {
this.store.initialize(session);
}
this.storageIntegrationProvider = storageIntegrationProvider;
}

Expand Down Expand Up @@ -281,8 +284,8 @@ public void runActionInTransaction(
LOGGER.debug("transaction committed");
}
} catch (Exception e) {
LOGGER.debug("Rolling back transaction due to an error", e);
tr.rollback();
LOGGER.debug("transaction rolled back", e);

if (e instanceof OptimisticLockException
|| e.getCause() instanceof OptimisticLockException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import jakarta.persistence.TypedQuery;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.entity.PolarisBaseEntity;
Expand Down Expand Up @@ -53,6 +54,9 @@ public class PolarisEclipseLinkStore {
// diagnostic services
private final PolarisDiagnostics diagnosticServices;

// Used to track when the store is initialized
private final AtomicBoolean initialized = new AtomicBoolean(false);

/**
* Constructor, allocate everything at once
*
Expand All @@ -62,14 +66,22 @@ public PolarisEclipseLinkStore(@NotNull PolarisDiagnostics diagnostics) {
this.diagnosticServices = diagnostics;
}

/** Initialize the store. This should be called before other methods. */
public void initialize(EntityManager session) {
PolarisSequenceUtil.initialize(session);
initialized.set(true);
}

long getNextSequence(EntityManager session) {
diagnosticServices.check(session != null, "session_is_null");
// implement with a sequence table POLARIS_SEQUENCE
return (long) session.createNativeQuery("SELECT NEXTVAL('POLARIS_SEQ')").getSingleResult();
checkInitialized();

return PolarisSequenceUtil.getNewId(session);
}

void writeToEntities(EntityManager session, PolarisBaseEntity entity) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

ModelEntity model = lookupEntity(session, entity.getCatalogId(), entity.getId());
if (model != null) {
Expand All @@ -84,6 +96,7 @@ void writeToEntities(EntityManager session, PolarisBaseEntity entity) {

void writeToEntitiesActive(EntityManager session, PolarisBaseEntity entity) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

ModelEntityActive model = lookupEntityActive(session, new PolarisEntitiesActiveKey(entity));
if (model == null) {
Expand All @@ -93,6 +106,7 @@ void writeToEntitiesActive(EntityManager session, PolarisBaseEntity entity) {

void writeToEntitiesDropped(EntityManager session, PolarisBaseEntity entity) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

ModelEntityDropped entityDropped =
lookupEntityDropped(session, entity.getCatalogId(), entity.getId());
Expand All @@ -103,6 +117,7 @@ void writeToEntitiesDropped(EntityManager session, PolarisBaseEntity entity) {

void writeToEntitiesChangeTracking(EntityManager session, PolarisBaseEntity entity) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

// Update the existing change tracking if a record with the same ids exists; otherwise, persist
// a new one
Expand All @@ -119,12 +134,14 @@ void writeToEntitiesChangeTracking(EntityManager session, PolarisBaseEntity enti

void writeToGrantRecords(EntityManager session, PolarisGrantRecord grantRec) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

session.persist(ModelGrantRecord.fromGrantRecord(grantRec));
}

void deleteFromEntities(EntityManager session, long catalogId, long entityId) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

ModelEntity model = lookupEntity(session, catalogId, entityId);
diagnosticServices.check(model != null, "entity_not_found");
Expand All @@ -134,6 +151,7 @@ void deleteFromEntities(EntityManager session, long catalogId, long entityId) {

void deleteFromEntitiesActive(EntityManager session, PolarisEntitiesActiveKey key) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

ModelEntityActive entity = lookupEntityActive(session, key);
diagnosticServices.check(entity != null, "active_entity_not_found");
Expand All @@ -142,6 +160,7 @@ void deleteFromEntitiesActive(EntityManager session, PolarisEntitiesActiveKey ke

void deleteFromEntitiesDropped(EntityManager session, long catalogId, long entityId) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

ModelEntityDropped entity = lookupEntityDropped(session, catalogId, entityId);
diagnosticServices.check(entity != null, "dropped_entity_not_found");
Expand All @@ -151,6 +170,7 @@ void deleteFromEntitiesDropped(EntityManager session, long catalogId, long entit

void deleteFromEntitiesChangeTracking(EntityManager session, PolarisEntityCore entity) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

ModelEntityChangeTracking entityChangeTracking =
lookupEntityChangeTracking(session, entity.getCatalogId(), entity.getId());
Expand All @@ -161,6 +181,7 @@ void deleteFromEntitiesChangeTracking(EntityManager session, PolarisEntityCore e

void deleteFromGrantRecords(EntityManager session, PolarisGrantRecord grantRec) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

ModelGrantRecord lookupGrantRecord =
lookupGrantRecord(
Expand All @@ -178,6 +199,7 @@ void deleteFromGrantRecords(EntityManager session, PolarisGrantRecord grantRec)

void deleteAllEntityGrantRecords(EntityManager session, PolarisEntityCore entity) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

// Delete grant records from grantRecords tables
lookupAllGrantRecordsOnSecurable(session, entity.getCatalogId(), entity.getId())
Expand All @@ -190,6 +212,7 @@ void deleteAllEntityGrantRecords(EntityManager session, PolarisEntityCore entity

void deleteAll(EntityManager session) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

session.createQuery("DELETE from ModelEntity").executeUpdate();
session.createQuery("DELETE from ModelEntityActive").executeUpdate();
Expand All @@ -203,6 +226,7 @@ void deleteAll(EntityManager session) {

ModelEntity lookupEntity(EntityManager session, long catalogId, long entityId) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

return session
.createQuery(
Expand All @@ -218,6 +242,7 @@ ModelEntity lookupEntity(EntityManager session, long catalogId, long entityId) {
@SuppressWarnings("unchecked")
List<ModelEntity> lookupEntities(EntityManager session, List<PolarisEntityId> entityIds) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

if (entityIds == null || entityIds.isEmpty()) return new ArrayList<>();

Expand All @@ -234,6 +259,7 @@ List<ModelEntity> lookupEntities(EntityManager session, List<PolarisEntityId> en
ModelEntityActive lookupEntityActive(
EntityManager session, PolarisEntitiesActiveKey entityActiveKey) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

return session
.createQuery(
Expand All @@ -254,6 +280,7 @@ long countActiveChildEntities(
long parentId,
@Nullable PolarisEntityType entityType) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

String hql =
"SELECT COUNT(m) from ModelEntityActive m where m.catalogId=:catalogId and m.parentId=:parentId";
Expand All @@ -276,6 +303,7 @@ long countActiveChildEntities(
List<ModelEntity> lookupFullEntitiesActive(
EntityManager session, long catalogId, long parentId, @NotNull PolarisEntityType entityType) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

// Currently check against ENTITIES not joining with ENTITIES_ACTIVE
String hql =
Expand All @@ -293,6 +321,7 @@ List<ModelEntity> lookupFullEntitiesActive(

ModelEntityDropped lookupEntityDropped(EntityManager session, long catalogId, long entityId) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

return session
.createQuery(
Expand All @@ -308,6 +337,7 @@ ModelEntityDropped lookupEntityDropped(EntityManager session, long catalogId, lo
ModelEntityChangeTracking lookupEntityChangeTracking(
EntityManager session, long catalogId, long entityId) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

return session
.createQuery(
Expand All @@ -328,6 +358,7 @@ ModelGrantRecord lookupGrantRecord(
long granteeId,
int privilegeCode) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

return session
.createQuery(
Expand All @@ -350,6 +381,7 @@ ModelGrantRecord lookupGrantRecord(
List<ModelGrantRecord> lookupAllGrantRecordsOnSecurable(
EntityManager session, long securableCatalogId, long securableId) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

return session
.createQuery(
Expand All @@ -365,6 +397,7 @@ List<ModelGrantRecord> lookupAllGrantRecordsOnSecurable(
List<ModelGrantRecord> lookupGrantRecordsOnGrantee(
EntityManager session, long granteeCatalogId, long granteeId) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

return session
.createQuery(
Expand All @@ -379,6 +412,7 @@ List<ModelGrantRecord> lookupGrantRecordsOnGrantee(

ModelPrincipalSecrets lookupPrincipalSecrets(EntityManager session, String clientId) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

return session
.createQuery(
Expand All @@ -392,6 +426,7 @@ ModelPrincipalSecrets lookupPrincipalSecrets(EntityManager session, String clien

void writePrincipalSecrets(EntityManager session, PolarisPrincipalSecrets principalSecrets) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

ModelPrincipalSecrets modelPrincipalSecrets =
lookupPrincipalSecrets(session, principalSecrets.getPrincipalClientId());
Expand All @@ -406,10 +441,15 @@ void writePrincipalSecrets(EntityManager session, PolarisPrincipalSecrets princi

void deletePrincipalSecrets(EntityManager session, String clientId) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

ModelPrincipalSecrets modelPrincipalSecrets = lookupPrincipalSecrets(session, clientId);
diagnosticServices.check(modelPrincipalSecrets != null, "principal_secretes_not_found");

session.remove(modelPrincipalSecrets);
}

private void checkInitialized() {
diagnosticServices.check(this.initialized.get(), "store_not_initialized");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.extension.persistence.impl.eclipselink;

import jakarta.persistence.*;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.polaris.core.persistence.models.ModelSequenceId;
import org.eclipse.persistence.internal.jpa.EntityManagerImpl;
import org.eclipse.persistence.platform.database.DatabasePlatform;
import org.eclipse.persistence.platform.database.PostgreSQLPlatform;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Used to generate sequence IDs for Polaris entities. If the legacy `POLARIS_SEQ` generator is
* available it will be used then cleaned up. In all other cases the `POLARIS_SEQUENCE` table is
* used directly.
*/
class PolarisSequenceUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(PolarisSequenceUtil.class);

private static final AtomicBoolean initialized = new AtomicBoolean(false);

private PolarisSequenceUtil() {}

/* If `initialize` was never called, throw an exception */
private static void throwIfNotInitialized() {
if (!initialized.get()) {
throw new IllegalStateException("Sequence util has not been initialized");
}
}

/* Get the database platform associated with the `EntityManager` */
private static DatabasePlatform getDatabasePlatform(EntityManager session) {
EntityManagerImpl entityManagerImpl = session.unwrap(EntityManagerImpl.class);
return entityManagerImpl.getDatabaseSession().getPlatform();
}

private static void removeSequence(EntityManager session) {
LOGGER.info("Renaming legacy sequence `POLARIS_SEQ` to `POLARIS_SEQ_UNUSED`");
String renameSequenceQuery = "ALTER SEQUENCE POLARIS_SEQ RENAME TO POLARIS_SEQ_UNUSED";
session.createNativeQuery(renameSequenceQuery).executeUpdate();
}

/**
* Prepare the `PolarisSequenceUtil` to generate IDs. This may run a failing query, so it should
* be called for the first time outside the context of a transaction. This method should be called
* before any other methods. TODO: after a sufficient this can eventually be removed or altered
*/
public static void initialize(EntityManager session) {
// Trigger cleanup of the POLARIS_SEQ if it is present
DatabasePlatform databasePlatform = getDatabasePlatform(session);
if (!initialized.get()) {
if (databasePlatform instanceof PostgreSQLPlatform) {
Optional<Long> result = Optional.empty();
LOGGER.info("Checking if the sequence POLARIS_SEQ exists");
String checkSequenceQuery =
"SELECT COUNT(*) FROM information_schema.sequences WHERE sequence_name IN "
+ "('polaris_seq', 'POLARIS_SEQ')";
int sequenceExists =
((Number) session.createNativeQuery(checkSequenceQuery).getSingleResult()).intValue();

if (sequenceExists > 0) {
LOGGER.info("POLARIS_SEQ exists, calling NEXTVAL");
long queryResult =
(long) session.createNativeQuery("SELECT NEXTVAL('POLARIS_SEQ')").getSingleResult();
result = Optional.of(queryResult);
} else {
LOGGER.info("POLARIS_SEQ does not exist, skipping NEXTVAL");
}
result.ifPresent(
r -> {
ModelSequenceId modelSequenceId = new ModelSequenceId();
modelSequenceId.setId(r);

// Persist the new ID:
session.persist(modelSequenceId);
session.flush();

// Clean the sequence:
removeSequence(session);
});
}
initialized.set(true);
}
}

/**
* Generates a new ID from `POLARIS_SEQUENCE` or `POLARIS_SEQ` depending on availability.
* `initialize` should be called before this method.
*/
public static Long getNewId(EntityManager session) {
throwIfNotInitialized();

ModelSequenceId modelSequenceId = new ModelSequenceId();

// Persist the new ID:
session.persist(modelSequenceId);
session.flush();

return modelSequenceId.getId();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ public class ModelEntity {
private long lastUpdateTimestamp;

// properties, serialized as a JSON string
@Column(length = 65535)
@Column(columnDefinition = "TEXT")
private String properties;

// internal properties, serialized as a JSON string
@Column(length = 65535)
@Column(columnDefinition = "TEXT")
private String internalProperties;

// current version for that entity, will be monotonically incremented
Expand Down
Loading

0 comments on commit 537eee6

Please sign in to comment.