diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala b/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala index b715b7a0be8..b0e949fc189 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala @@ -467,7 +467,7 @@ private[delta] class ConflictChecker( winningDomainMetadataMap.get(domainMetadataFromCurrentTransaction.domain)) match { // No-conflict case. case (domain, None) => domain - case (domain, _) if RowTrackingMetadataDomain.isRowTrackingDomain(domain) => domain + case (domain, _) if RowTrackingMetadataDomain.isSameDomain(domain) => domain case (_, Some(_)) => // Any conflict not specifically handled by a previous case must fail the transaction. throw new io.delta.exceptions.ConcurrentTransactionException( @@ -541,7 +541,7 @@ private[delta] class ConflictChecker( } Some(a.copy(baseRowId = Some(newBaseRowId))) // The row ID high water mark will be replaced if it exists. - case d: DomainMetadata if RowTrackingMetadataDomain.isRowTrackingDomain(d) => None + case d: DomainMetadata if RowTrackingMetadataDomain.isSameDomain(d) => None case a => Some(a) } currentTransactionInfo = currentTransactionInfo.copy( diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/JsonMetadataDomain.scala b/spark/src/main/scala/org/apache/spark/sql/delta/JsonMetadataDomain.scala index 2dd783dd649..48d4c8b176f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/JsonMetadataDomain.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/JsonMetadataDomain.scala @@ -49,5 +49,7 @@ abstract class JsonMetadataDomainUtils[T: Manifest] { protected def fromJsonConfiguration(domain: DomainMetadata): T = JsonUtils.fromJson[T](domain.configuration) + + def isSameDomain(d: DomainMetadata): Boolean = d.domain == domainName } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index 205dcf1635d..2ec0279a2d5 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -1599,7 +1599,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite dataChanged = true } // Row tracking is able to resolve write conflicts regardless of isolation level. - case d: DomainMetadata if RowTrackingMetadataDomain.isRowTrackingDomain(d) => + case d: DomainMetadata if RowTrackingMetadataDomain.isSameDomain(d) => // Do nothing case _ => hasIncompatibleActions = true diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/RowId.scala b/spark/src/main/scala/org/apache/spark/sql/delta/RowId.scala index 14e7a442c39..5490cb6eb9a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/RowId.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/RowId.scala @@ -38,8 +38,6 @@ object RowId { case d: DomainMetadata if d.domain == domainName => Some(fromJsonConfiguration(d)) case _ => None } - - def isRowTrackingDomain(d: DomainMetadata): Boolean = d.domain == domainName } val MISSING_HIGH_WATER_MARK: Long = -1L @@ -87,7 +85,7 @@ object RowId { throw DeltaErrors.rowIdAssignmentWithoutStats } a.copy(baseRowId = Some(baseRowId)) - case d: DomainMetadata if RowTrackingMetadataDomain.isRowTrackingDomain(d) => + case d: DomainMetadata if RowTrackingMetadataDomain.isSameDomain(d) => throw new IllegalStateException( "Manually setting the Row ID high water mark is not allowed") case other => other