From a27002f03f38d937ff2f20f3dd5467a5a5c3760e Mon Sep 17 00:00:00 2001 From: Dries Samyn Date: Tue, 29 Oct 2024 18:20:24 +0100 Subject: [PATCH] Remove the need for hibernate and javax.persistence in the uniqueness lib. --- .../impl/JPABackingStoreOsgiImplBenchmark.kt | 2 +- ...JPABackingStoreOsgiImplIntegrationTests.kt | 92 ++++-- .../impl/osgi/JPABackingStoreOsgiImpl.kt | 17 +- .../impl/JPABackingStoreOsgiImplTest.kt | 209 +++++------- ...UniquenessCheckerImplDBIntegrationTests.kt | 17 +- libs/ledger-lib-uniqueness/build.gradle | 9 + .../PersistenceExceptionDbIntegrationTest.kt | 162 ++++++++++ .../SqlSessionDbIntegrationTest.kt | 301 ++++++++++++++++++ .../TransactionOpsDbIntegrationTest.kt | 292 +++++++++++++++++ .../libs/uniqueness/backingstore/Utils.kt | 54 ++++ .../impl/DefaultSqlQueryProvider.kt | 87 +++++ .../backingstore/impl/SqlQueryProvider.kt | 11 + .../uniqueness/backingstore/BackingStore.kt | 2 + .../backingstore/impl/SqlBackingStoreImpl.kt | 43 +++ .../SqlPersistenceExceptionCategorizerImpl.kt | 84 +++++ .../backingstore/impl/SqlSessionImpl.kt | 253 +++++++++++++++ .../impl/SqlTransactionOpsImpl.kt | 106 ++++++ .../libs/uniqueness/SqlSessionImplTest.kt | 104 ++++++ .../UniquenessCheckRequestInternal.kt | 2 +- 19 files changed, 1664 insertions(+), 183 deletions(-) create mode 100644 libs/ledger-lib-uniqueness/src/integrationTest/kotlin/net/corda/ledger/libs/uniqueness/backingstore/PersistenceExceptionDbIntegrationTest.kt create mode 100644 libs/ledger-lib-uniqueness/src/integrationTest/kotlin/net/corda/ledger/libs/uniqueness/backingstore/SqlSessionDbIntegrationTest.kt create mode 100644 libs/ledger-lib-uniqueness/src/integrationTest/kotlin/net/corda/ledger/libs/uniqueness/backingstore/TransactionOpsDbIntegrationTest.kt create mode 100644 libs/ledger-lib-uniqueness/src/integrationTest/kotlin/net/corda/ledger/libs/uniqueness/backingstore/Utils.kt create mode 100644 libs/ledger-lib-uniqueness/src/main/java/net/corda/ledger/libs/uniqueness/backingstore/impl/DefaultSqlQueryProvider.kt create mode 100644 libs/ledger-lib-uniqueness/src/main/java/net/corda/ledger/libs/uniqueness/backingstore/impl/SqlQueryProvider.kt create mode 100644 libs/ledger-lib-uniqueness/src/main/kotlin/net/corda/ledger/libs/uniqueness/backingstore/impl/SqlBackingStoreImpl.kt create mode 100644 libs/ledger-lib-uniqueness/src/main/kotlin/net/corda/ledger/libs/uniqueness/backingstore/impl/SqlPersistenceExceptionCategorizerImpl.kt create mode 100644 libs/ledger-lib-uniqueness/src/main/kotlin/net/corda/ledger/libs/uniqueness/backingstore/impl/SqlSessionImpl.kt create mode 100644 libs/ledger-lib-uniqueness/src/main/kotlin/net/corda/ledger/libs/uniqueness/backingstore/impl/SqlTransactionOpsImpl.kt create mode 100644 libs/ledger-lib-uniqueness/src/test/kotlin/net/corda/ledger/libs/uniqueness/SqlSessionImplTest.kt diff --git a/components/uniqueness/backing-store-impl/src/backingStoreBenchmark/kotlin/net/corda/uniqueness/backingstore/impl/JPABackingStoreOsgiImplBenchmark.kt b/components/uniqueness/backing-store-impl/src/backingStoreBenchmark/kotlin/net/corda/uniqueness/backingstore/impl/JPABackingStoreOsgiImplBenchmark.kt index d7f14370097..8c8c7d25fae 100644 --- a/components/uniqueness/backing-store-impl/src/backingStoreBenchmark/kotlin/net/corda/uniqueness/backingstore/impl/JPABackingStoreOsgiImplBenchmark.kt +++ b/components/uniqueness/backing-store-impl/src/backingStoreBenchmark/kotlin/net/corda/uniqueness/backingstore/impl/JPABackingStoreOsgiImplBenchmark.kt @@ -155,7 +155,7 @@ class JPABackingStoreOsgiImplBenchmark { mock(), jpaEntitiesRegistry, dbConnectionManager, - JPABackingStoreOsgiImpl(jpaEntitiesRegistry, dbConnectionManager, mock(), virtualNodeInfoReadService, metrics, secureHashFactory) + JPABackingStoreOsgiImpl(jpaEntitiesRegistry, dbConnectionManager, virtualNodeInfoReadService, metrics, secureHashFactory) ).apply { eventHandler(RegistrationStatusChangeEvent(mock(), LifecycleStatus.UP), mock()) } diff --git a/components/uniqueness/backing-store-impl/src/integrationTest/kotlin/net/corda/uniqueness/backingstore/impl/JPABackingStoreOsgiImplIntegrationTests.kt b/components/uniqueness/backing-store-impl/src/integrationTest/kotlin/net/corda/uniqueness/backingstore/impl/JPABackingStoreOsgiImplIntegrationTests.kt index 3ea38a11233..24becec9d6b 100644 --- a/components/uniqueness/backing-store-impl/src/integrationTest/kotlin/net/corda/uniqueness/backingstore/impl/JPABackingStoreOsgiImplIntegrationTests.kt +++ b/components/uniqueness/backing-store-impl/src/integrationTest/kotlin/net/corda/uniqueness/backingstore/impl/JPABackingStoreOsgiImplIntegrationTests.kt @@ -8,13 +8,12 @@ import net.corda.db.testkit.DatabaseInstaller import net.corda.db.testkit.DbUtils import net.corda.db.testkit.TestDbInfo import net.corda.ledger.libs.uniqueness.backingstore.impl.JPABackingStoreEntities -import net.corda.ledger.libs.uniqueness.backingstore.impl.UniquenessTransactionDetailEntity import net.corda.ledger.libs.uniqueness.backingstore.impl.jpaBackingStoreObjectMapper import net.corda.ledger.libs.uniqueness.data.UniquenessHoldingIdentity import net.corda.libs.packaging.core.CpiIdentifier +import net.corda.orm.EntityManagerConfiguration import net.corda.orm.impl.EntityManagerFactoryFactoryImpl import net.corda.orm.impl.JpaEntitiesRegistryImpl -import net.corda.orm.impl.PersistenceExceptionCategorizerImpl import net.corda.test.util.identity.createTestHoldingIdentity import net.corda.test.util.time.AutoTickTestClock import net.corda.uniqueness.backingstore.impl.osgi.JPABackingStoreOsgiImpl @@ -43,6 +42,7 @@ import net.corda.virtualnode.VirtualNodeInfo import net.corda.virtualnode.read.VirtualNodeInfoReadService import org.assertj.core.api.Assertions.assertThat import org.hibernate.Session +import org.hibernate.internal.SessionImpl import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Disabled @@ -53,8 +53,6 @@ import org.junit.jupiter.api.TestInstance import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.ValueSource import org.mockito.Mockito import org.mockito.kotlin.any import org.mockito.kotlin.doReturn @@ -64,6 +62,9 @@ import org.mockito.kotlin.mock import org.mockito.kotlin.never import org.mockito.kotlin.times import org.mockito.kotlin.whenever +import java.sql.Connection +import java.sql.PreparedStatement +import java.sql.SQLException import java.time.Duration import java.time.Instant import java.time.LocalDate @@ -73,21 +74,12 @@ import java.util.UUID import java.util.concurrent.Callable import java.util.concurrent.ExecutionException import java.util.concurrent.Executors -import javax.persistence.EntityExistsException import javax.persistence.EntityManagerFactory -import javax.persistence.OptimisticLockException import javax.persistence.RollbackException -import kotlin.reflect.full.createInstance /** - * Note: To run tests against PostgreSQL, follow the steps in the link below. - * https://github.com/corda/corda-runtime-os/wiki/Debugging-integration-tests#debugging-integration-tests-with-postgres - * - * Also, in order to run the Intellij Code Coverage feature, you may need to exclude the following types to avoid - * Hibernate related errors. - * - org.hibernate.hql.internal.antlr.HqlTokenTypes - * - org.hibernate.hql.internal.antlr.SqlTokenTypes - * - org.hibernate.sql.ordering.antlr.GeneratedOrderByFragmentRendererTokenTypes + * Note: To run tests against PostgreSQL, run: + * `gradle :components:uniqueness:backing-store-impl:integrationTest -PdatabaseType=POSTGRES` */ @Suppress("FunctionName") @TestInstance(TestInstance.Lifecycle.PER_CLASS) @@ -103,12 +95,19 @@ class JPABackingStoreOsgiImplIntegrationTests { .let { UniquenessHoldingIdentity(it.x500Name, it.groupId, it.shortHash, it.hash) } private val notaryVNodeIdentityDbName = VirtualNodeDbType.UNIQUENESS.getSchemaName(notaryVNodeIdentity.shortHash) private val notaryVNodeIdentityDbId = UUID.randomUUID() - private val dbConfig = DbUtils.getEntityManagerConfiguration(notaryVNodeIdentityDbName) + private val dbConfig: EntityManagerConfiguration private val databaseInstaller = DatabaseInstaller( EntityManagerFactoryFactoryImpl(), LiquibaseSchemaMigratorImpl(), JpaEntitiesRegistryImpl() ) + + init { + // uncomment this to run the test against local Postgres + System.setProperty("databaseType", "POSTGRES") + dbConfig = DbUtils.getEntityManagerConfiguration(notaryVNodeIdentityDbName) + } + private val notaryVNodeEmFactory: EntityManagerFactory = databaseInstaller.setupDatabase( TestDbInfo(name = "unique_test_default", schemaName = notaryVNodeIdentityDbName, rewriteBatchedInserts = true), "vnode-uniqueness", @@ -161,7 +160,7 @@ class JPABackingStoreOsgiImplIntegrationTests { return JPABackingStoreOsgiImpl( JpaEntitiesRegistryImpl(), dbConnectionManager, - PersistenceExceptionCategorizerImpl(), +// PersistenceExceptionCategorizerImpl(), virtualNodeInfoReadService, JPABackingStoreOsgiMetricsFactory(), secureHashFactory @@ -409,6 +408,10 @@ class JPABackingStoreOsgiImplIntegrationTests { .writeValueAsBytes(UniquenessCheckErrorMalformedRequestImpl("")).size // Available characters that need filling is the hard-coded limit minus fixed size + // NOTE: this is incorrect. + // The type of VARBINARY(1024) as set in Liquibase, does not exist in Postgesql, and instead a BYTEA is used + // which fits 1Gb of space. + // For now, this limitation is put into place to replicate the existing behaviour, but we should consider relaxing this. val maxErrMsgLength = UniquenessConstants.REJECTED_TRANSACTION_ERROR_DETAILS_LENGTH - baseObjectSize @@ -657,17 +660,24 @@ class JPABackingStoreOsgiImplIntegrationTests { fun `Exceptions thrown while querying triggers retry`() { val spyEmFactory = Mockito.spy(createEntityManagerFactory()) val spyEm = Mockito.spy(spyEmFactory.createEntityManager()) + val statementMock = mock { + on { executeQuery() } doThrow SQLException("Connection is closed") + } + val connectionMock = mock { + on { prepareStatement(any()) } doReturn statementMock + } + val mockSession = mock { + on { connection() } doReturn connectionMock + } Mockito.doReturn(spyEm).whenever(spyEmFactory).createEntityManager() + Mockito.doReturn(mockSession).whenever(spyEm).unwrap(eq(Session::class.java)) - val mockSession = mock { - on { byMultipleIds(eq(UniquenessTransactionDetailEntity::class.java)) } doThrow EntityExistsException() - } Mockito.doReturn(spyEm).whenever(spyEmFactory).createEntityManager() Mockito.doReturn(mockSession).whenever(spyEm).unwrap(eq(Session::class.java)) val storeImpl = createBackingStoreImpl(spyEmFactory) - assertThrows { + assertThrows { storeImpl.session(notaryVNodeUniquenessHoldingIdentity) { session -> val txIds = listOf(randomSecureHash()) session.getTransactionDetails(txIds) @@ -676,13 +686,22 @@ class JPABackingStoreOsgiImplIntegrationTests { } // Review with CORE-4983 for different types of exceptions such as PersistenceException. - @ParameterizedTest - @ValueSource(classes = [EntityExistsException::class, RollbackException::class, OptimisticLockException::class]) - fun `Persistence errors raised while persisting triggers retry`(e: Class) { + @Test + fun `Persistence errors raised while persisting triggers retry`() { val spyEmFactory = Mockito.spy(createEntityManagerFactory()) val spyEm = Mockito.spy(spyEmFactory.createEntityManager()) - Mockito.doThrow(e.kotlin.createInstance()).whenever(spyEm).persist(any()) + + val statementMock = mock { + on { executeBatch() } doThrow SQLException("Connection is closed") + } + val connectionMock = mock { + on { prepareStatement(any()) } doReturn statementMock + } + val mockSession = mock { + on { connection() } doReturn connectionMock + } Mockito.doReturn(spyEm).whenever(spyEmFactory).createEntityManager() + Mockito.doReturn(mockSession).whenever(spyEm).unwrap(eq(Session::class.java)) val storeImpl = createBackingStoreImpl(spyEmFactory) @@ -693,16 +712,26 @@ class JPABackingStoreOsgiImplIntegrationTests { session.executeTransaction { _, txnOps -> txnOps.createUnconsumedStates(stateRefs) } } } - Mockito.verify(spyEm, times(MAX_ATTEMPTS)).persist(any()) + Mockito.verify(connectionMock, times(MAX_ATTEMPTS)).rollback() + Mockito.verify(statementMock, times(MAX_ATTEMPTS)).executeBatch() } @Test fun `Transaction rollback gets triggered if transaction is active for an unexpected exception type`() { val spyEmFactory = Mockito.spy(createEntityManagerFactory()) val spyEm = Mockito.spy(spyEmFactory.createEntityManager()) - val spyEmTransaction = Mockito.spy(spyEm.transaction) + + val statementMock = mock { + on { executeBatch() } doThrow SQLException("Connection is closed") + } + val connectionMock = mock { + on { prepareStatement(any()) } doReturn statementMock + } + val mockSession = mock { + on { connection() } doReturn connectionMock + } Mockito.doReturn(spyEm).whenever(spyEmFactory).createEntityManager() - Mockito.doReturn(spyEmTransaction).whenever(spyEm).transaction + Mockito.doReturn(mockSession).whenever(spyEm).unwrap(eq(Session::class.java)) val storeImpl = createBackingStoreImpl(spyEmFactory) @@ -711,10 +740,9 @@ class JPABackingStoreOsgiImplIntegrationTests { session.executeTransaction { _, _ -> throw DummyException("dummy exception") } } } - // Note that unlike for expected exceptions, no retry should happen. - Mockito.verify(spyEmTransaction, times(1)).begin() - Mockito.verify(spyEmTransaction, never()).commit() - Mockito.verify(spyEmTransaction, times(1)).rollback() + + Mockito.verify(connectionMock, never()).commit() + Mockito.verify(connectionMock, times(1)).rollback() } } diff --git a/components/uniqueness/backing-store-impl/src/main/kotlin/net/corda/uniqueness/backingstore/impl/osgi/JPABackingStoreOsgiImpl.kt b/components/uniqueness/backing-store-impl/src/main/kotlin/net/corda/uniqueness/backingstore/impl/osgi/JPABackingStoreOsgiImpl.kt index 7ff58125d91..a2a14f32fba 100644 --- a/components/uniqueness/backing-store-impl/src/main/kotlin/net/corda/uniqueness/backingstore/impl/osgi/JPABackingStoreOsgiImpl.kt +++ b/components/uniqueness/backing-store-impl/src/main/kotlin/net/corda/uniqueness/backingstore/impl/osgi/JPABackingStoreOsgiImpl.kt @@ -6,12 +6,13 @@ import net.corda.ledger.libs.uniqueness.UniquenessSecureHashFactory import net.corda.ledger.libs.uniqueness.backingstore.BackingStore import net.corda.ledger.libs.uniqueness.backingstore.BackingStoreMetricsFactory import net.corda.ledger.libs.uniqueness.backingstore.impl.JPABackingStoreEntities -import net.corda.ledger.libs.uniqueness.backingstore.impl.JPABackingStoreImpl +import net.corda.ledger.libs.uniqueness.backingstore.impl.SqlBackingStoreImpl import net.corda.ledger.libs.uniqueness.data.UniquenessHoldingIdentity import net.corda.libs.virtualnode.common.exception.VirtualNodeNotFoundException import net.corda.orm.JpaEntitiesRegistry -import net.corda.orm.PersistenceExceptionCategorizer import net.corda.virtualnode.read.VirtualNodeInfoReadService +import org.hibernate.Session +import org.hibernate.internal.SessionImpl import org.osgi.service.component.annotations.Activate import org.osgi.service.component.annotations.Component import org.osgi.service.component.annotations.Reference @@ -26,19 +27,19 @@ class JPABackingStoreOsgiImpl(delegate: BackingStore, jpaEntitiesRegistry: JpaEn jpaEntitiesRegistry: JpaEntitiesRegistry, @Reference(service = DbConnectionManager::class) dbConnectionManager: DbConnectionManager, - @Reference(service = PersistenceExceptionCategorizer::class) - persistenceExceptionCategorizer: PersistenceExceptionCategorizer, @Reference(service = VirtualNodeInfoReadService::class) virtualNodeInfoReadService: VirtualNodeInfoReadService, @Reference(service = BackingStoreMetricsFactory::class) backingStoreMetricsFactory: BackingStoreMetricsFactory, @Reference(service = UniquenessSecureHashFactory::class) - uniquenessSecureHashFactory: UniquenessSecureHashFactory + uniquenessSecureHashFactory: UniquenessSecureHashFactory, ) : this( - JPABackingStoreImpl( - getEntityManagerFactory = { getEntityManagerFactory(virtualNodeInfoReadService, dbConnectionManager, jpaEntitiesRegistry, it) }, + SqlBackingStoreImpl( + connectionFactory = { + val emf = getEntityManagerFactory(virtualNodeInfoReadService, dbConnectionManager, jpaEntitiesRegistry, it) + (emf.createEntityManager().unwrap(Session::class.java) as SessionImpl).connection() + }, backingStoreMetricsFactory = backingStoreMetricsFactory, - persistenceExceptionCategorizer = persistenceExceptionCategorizer, uniquenessSecureHashFactory = uniquenessSecureHashFactory ), jpaEntitiesRegistry diff --git a/components/uniqueness/backing-store-impl/src/test/kotlin/net/corda/uniqueness/backingstore/impl/JPABackingStoreOsgiImplTest.kt b/components/uniqueness/backing-store-impl/src/test/kotlin/net/corda/uniqueness/backingstore/impl/JPABackingStoreOsgiImplTest.kt index 595064637dd..bb643ad651f 100644 --- a/components/uniqueness/backing-store-impl/src/test/kotlin/net/corda/uniqueness/backingstore/impl/JPABackingStoreOsgiImplTest.kt +++ b/components/uniqueness/backing-store-impl/src/test/kotlin/net/corda/uniqueness/backingstore/impl/JPABackingStoreOsgiImplTest.kt @@ -5,6 +5,7 @@ import net.corda.crypto.testkit.SecureHashUtils.randomSecureHash import net.corda.db.connection.manager.DbConnectionManager import net.corda.db.core.CloseableDataSource import net.corda.ledger.libs.uniqueness.backingstore.BackingStoreMetricsFactory +import net.corda.ledger.libs.uniqueness.backingstore.impl.DefaultSqlQueryProvider import net.corda.ledger.libs.uniqueness.backingstore.impl.UniquenessRejectedTransactionEntity import net.corda.ledger.libs.uniqueness.backingstore.impl.UniquenessStateDetailEntity import net.corda.ledger.libs.uniqueness.backingstore.impl.UniquenessTransactionDetailEntity @@ -15,12 +16,9 @@ import net.corda.ledger.libs.uniqueness.data.UniquenessHoldingIdentity import net.corda.libs.packaging.core.CpiIdentifier import net.corda.orm.JpaEntitiesRegistry import net.corda.orm.JpaEntitiesSet -import net.corda.orm.PersistenceExceptionCategorizer -import net.corda.orm.PersistenceExceptionType import net.corda.test.util.identity.createTestHoldingIdentity import net.corda.uniqueness.backingstore.impl.osgi.JPABackingStoreOsgiImpl import net.corda.uniqueness.backingstore.impl.osgi.UniquenessSecureHashFactoryOsgiImpl -import net.corda.uniqueness.datamodel.common.UniquenessConstants import net.corda.uniqueness.datamodel.impl.UniquenessCheckErrorMalformedRequestImpl import net.corda.v5.application.uniqueness.model.UniquenessCheckErrorMalformedRequest import net.corda.v5.application.uniqueness.model.UniquenessCheckResultFailure @@ -29,9 +27,9 @@ import net.corda.virtualnode.read.VirtualNodeInfoReadService import org.assertj.core.api.Assertions.assertThat import org.hibernate.MultiIdentifierLoadAccess import org.hibernate.Session +import org.hibernate.internal.SessionImpl import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows import org.mockito.Mockito.verify import org.mockito.kotlin.any @@ -42,29 +40,24 @@ import org.mockito.kotlin.mock import org.mockito.kotlin.times import org.mockito.kotlin.whenever import java.sql.Connection +import java.sql.PreparedStatement +import java.sql.ResultSet +import java.sql.Timestamp import java.time.Instant -import java.time.LocalDate -import java.time.ZoneOffset +import java.util.Calendar import java.util.UUID import javax.persistence.EntityManager import javax.persistence.EntityManagerFactory import javax.persistence.EntityTransaction -import javax.persistence.OptimisticLockException import javax.persistence.TypedQuery class JPABackingStoreOsgiImplTest { - - private companion object { - private const val MAX_ATTEMPTS = 10 - } - private val entityManager = mock() private val entityTransaction = mock() private val entityManagerFactory = mock() private val dummyDataSource = mock() private val jpaEntitiesRegistry = mock() private val dbConnectionManager = mock() - private val persistenceExceptionCategorizer = mock() private val virtualNodeInfoReadService = mock() private val metricsFactory = mock() @@ -75,7 +68,6 @@ class JPABackingStoreOsgiImplTest { private val backingStore = JPABackingStoreOsgiImpl( jpaEntitiesRegistry, dbConnectionManager, - persistenceExceptionCategorizer, virtualNodeInfoReadService, metricsFactory, secureHashFactory @@ -90,8 +82,7 @@ class JPABackingStoreOsgiImplTest { private val notaryRepIdentity = createTestHoldingIdentity("C=GB, L=London, O=NotaryRep1", groupId).let { UniquenessHoldingIdentity(it.x500Name, it.groupId, it.shortHash, it.hash) } - - private val originatorX500Name = "C=GB, L=London, O=Alice" + private val mockConnection = mock() @Suppress("ComplexMethod") @BeforeEach @@ -111,12 +102,11 @@ class JPABackingStoreOsgiImplTest { whenever(resultList) doReturn errorEntities } - val dummySession = mock().apply { - // No need do anything here as this will have no effect in a unit test - whenever(setJdbcBatchSize(any())).thenAnswer { } - + val dummySession = mock().apply { whenever(byMultipleIds(UniquenessStateDetailEntity::class.java)) doReturn stateMultiLoad whenever(byMultipleIds(UniquenessTransactionDetailEntity::class.java)) doReturn txMultiLoad + + whenever(connection()) doReturn mockConnection } whenever(entityManager.transaction) doReturn entityTransaction @@ -168,7 +158,7 @@ class JPABackingStoreOsgiImplTest { @Test fun `Session always closes entity manager after use`() { backingStore.session(notaryRepIdentity) { } - verify(entityManager, times(1)).close() + verify(mockConnection, times(1)).close() } @Test @@ -177,7 +167,7 @@ class JPABackingStoreOsgiImplTest { assertThrows { backingStore.session(notaryRepIdentity) { throw RuntimeException("test exception") } } - verify(entityManager, times(1)).close() + verify(mockConnection, times(1)).close() } @Test @@ -186,24 +176,40 @@ class JPABackingStoreOsgiImplTest { session.executeTransaction { _, _ -> } } - verify(entityTransaction, times(1)).begin() - verify(entityTransaction, times(1)).commit() - verify(entityManager, times(1)).close() + verify(mockConnection, times(1)).commit() + verify(mockConnection, times(1)).close() } @Test fun `Throw if no error detail is available for a failed transaction`() { - // Prepare a rejected transaction - txnDetails.add( - UniquenessTransactionDetailEntity( - "SHA-256", - "0xA1".toByteArray(), - originatorX500Name, - LocalDate.parse("2099-12-12").atStartOfDay().toInstant(ZoneOffset.UTC), - LocalDate.now().atStartOfDay().toInstant(ZoneOffset.UTC), - UniquenessConstants.RESULT_REJECTED_REPRESENTATION - ) - ) + val txId = randomSecureHash() + + // NOTE: this isn't really a good unit test as it's testing the side effects of the class's dependency + // so ths mocking gets messy. + // it would be better to just verify the expected queries are called, + // then unit test the functions that do the parsing + val sqlQueryProvider = DefaultSqlQueryProvider() + val mockResultSet = mock { + on { next() }.thenReturn(true).thenReturn(false) + on { getString(1) } doReturn (txId.algorithm) + on { getBytes(2) } doReturn (txId.bytes) + on { getString(3) } doReturn ("R") + on { getTimestamp(eq(4), any())} doReturn (Timestamp.from(Instant.now())) + } + val mockPreparedStatement = mock { + on { executeQuery() } doReturn mockResultSet + } + whenever(mockConnection.prepareStatement(sqlQueryProvider.findTransactionDetailByKeyQuery())) + .doReturn(mockPreparedStatement) + + val mockErrorResultSet = mock { + on { next() } doReturn false + } + val mockErrorPreparedStatement = mock { + on { executeQuery() } doReturn mockErrorResultSet + } + whenever(mockConnection.prepareStatement(sqlQueryProvider.findRejectedTransactionQuery())) + .doReturn(mockErrorPreparedStatement) // Expect an exception because no error details is available from the mock. assertThrows { @@ -216,113 +222,46 @@ class JPABackingStoreOsgiImplTest { @Test fun `Retrieve correct failed status without exceptions when both tx details and rejection details are present`() { val txId = randomSecureHash() - // Prepare a rejected transaction - txnDetails.add( - UniquenessTransactionDetailEntity( - "SHA-256", - txId.bytes, - originatorX500Name, - LocalDate.parse("2099-12-12").atStartOfDay().toInstant(ZoneOffset.UTC), - LocalDate.now().atStartOfDay().toInstant(ZoneOffset.UTC), - UniquenessConstants.RESULT_REJECTED_REPRESENTATION - ) - ) - - errorEntities.add( - UniquenessRejectedTransactionEntity( - "SHA-256", - txId.bytes, - jpaBackingStoreObjectMapper(secureHashFactory).writeValueAsBytes( - UniquenessCheckErrorMalformedRequestImpl("Error") - ) - ) - ) - - backingStore.session(notaryRepIdentity) { session -> - val txResult = session.getTransactionDetails(listOf(txId))[txId]?.result!! - assertThat(txResult).isInstanceOf(UniquenessCheckResultFailure::class.java) - assertThat((txResult as UniquenessCheckResultFailure).error) - .isInstanceOf(UniquenessCheckErrorMalformedRequest::class.java) + // NOTE: this isn't really a good unit test as it's testing the side effects of the class's dependency + // so ths mocking gets messy. + // it would be better to just verify the expected queries are called, + // then unit test the functions that do the parsing + val sqlQueryProvider = DefaultSqlQueryProvider() + val mockResultSet = mock { + on { next() }.thenReturn(true).thenReturn(false) + on { getString(1) } doReturn (txId.algorithm) + on { getBytes(2) } doReturn (txId.bytes) + on { getString(3) } doReturn ("R") + on { getTimestamp(eq(4), any())} doReturn (Timestamp.from(Instant.now())) } - } - - @Test - fun `Executing transaction does not retry upon fatal exception`() { - whenever(persistenceExceptionCategorizer.categorize(any())).thenReturn(PersistenceExceptionType.FATAL) - var execCounter = 0 - assertThrows { - backingStore.session(notaryRepIdentity) { session -> - session.executeTransaction { _, _ -> - execCounter++ - throw DummyException() - } - } + val mockPreparedStatement = mock { + on { executeQuery() } doReturn mockResultSet } - assertThat(execCounter).isEqualTo(1) - } + whenever(mockConnection.prepareStatement(sqlQueryProvider.findTransactionDetailByKeyQuery())) + .doReturn(mockPreparedStatement) - @Test - fun `Executing transaction retries upon data_related exception`() { - whenever(persistenceExceptionCategorizer.categorize(any())).thenReturn(PersistenceExceptionType.DATA_RELATED) - var execCounter = 0 - assertThrows { - backingStore.session(notaryRepIdentity) { session -> - session.executeTransaction { _, _ -> - execCounter++ - throw DummyException() - } - } - } - assertThat(execCounter).isEqualTo(MAX_ATTEMPTS) - } - @Test - fun `Executing transaction retries upon transient exception`() { - whenever(persistenceExceptionCategorizer.categorize(any())).thenReturn(PersistenceExceptionType.TRANSIENT) - var execCounter = 0 - assertThrows { - backingStore.session(notaryRepIdentity) { session -> - session.executeTransaction { _, _ -> - execCounter++ - throw DummyException() - } - } + val error = jpaBackingStoreObjectMapper(secureHashFactory).writeValueAsBytes( + UniquenessCheckErrorMalformedRequestImpl("Error") + ) + val mockErrorResultSet = mock { + on { next() }.thenReturn(true).thenReturn(false) + on { getBytes(1) } doReturn error } - assertThat(execCounter).isEqualTo(MAX_ATTEMPTS) - } - - @Test - fun `Executing transaction does not retry upon uncategorized exception`() { - whenever(persistenceExceptionCategorizer.categorize(any())).thenReturn(PersistenceExceptionType.UNCATEGORIZED) - var execCounter = 0 - assertThrows { - backingStore.session(notaryRepIdentity) { session -> - session.executeTransaction { _, _ -> - execCounter++ - throw DummyException() - } - } + val mockErrorPreparedStatement = mock { + on { executeQuery() } doReturn mockErrorResultSet } - assertThat(execCounter).isEqualTo(1) - } + whenever(mockConnection.prepareStatement(sqlQueryProvider.findRejectedTransactionQuery())) + .doReturn(mockErrorPreparedStatement) + whenever(mockErrorResultSet.getBytes(1)).doReturn(error) - @Test - fun `Executing transaction succeeds after transient failures`() { - whenever(persistenceExceptionCategorizer.categorize(any())).thenReturn(PersistenceExceptionType.TRANSIENT) - val retryCnt = 3 - var execCounter = 0 - assertDoesNotThrow { - backingStore.session(notaryRepIdentity) { session -> - session.executeTransaction { _, _ -> - execCounter++ - if (execCounter < retryCnt) - throw OptimisticLockException() - } - } + backingStore.session(notaryRepIdentity) { session -> + val txResult = session.getTransactionDetails(listOf(txId))[txId]?.result + + assertThat(txResult).isInstanceOf(UniquenessCheckResultFailure::class.java) + assertThat((txResult as UniquenessCheckResultFailure).error) + .isInstanceOf(UniquenessCheckErrorMalformedRequest::class.java) } - assertThat(execCounter).isEqualTo(retryCnt) } - - class DummyException(message: String = "") : Exception(message) } \ No newline at end of file diff --git a/components/uniqueness/uniqueness-checker-impl/src/integrationTest/kotlin/net/corda/uniqueness/checker/impl/UniquenessCheckerImplDBIntegrationTests.kt b/components/uniqueness/uniqueness-checker-impl/src/integrationTest/kotlin/net/corda/uniqueness/checker/impl/UniquenessCheckerImplDBIntegrationTests.kt index eef7bbec8ce..f5a1e527846 100644 --- a/components/uniqueness/uniqueness-checker-impl/src/integrationTest/kotlin/net/corda/uniqueness/checker/impl/UniquenessCheckerImplDBIntegrationTests.kt +++ b/components/uniqueness/uniqueness-checker-impl/src/integrationTest/kotlin/net/corda/uniqueness/checker/impl/UniquenessCheckerImplDBIntegrationTests.kt @@ -15,9 +15,9 @@ import net.corda.ledger.libs.uniqueness.data.UniquenessCheckRequest import net.corda.ledger.libs.uniqueness.data.UniquenessCheckResponse import net.corda.ledger.libs.uniqueness.data.UniquenessHoldingIdentity import net.corda.libs.packaging.core.CpiIdentifier +import net.corda.orm.EntityManagerConfiguration import net.corda.orm.impl.EntityManagerFactoryFactoryImpl import net.corda.orm.impl.JpaEntitiesRegistryImpl -import net.corda.orm.impl.PersistenceExceptionCategorizerImpl import net.corda.test.util.identity.createTestHoldingIdentity import net.corda.test.util.time.AutoTickTestClock import net.corda.uniqueness.backingstore.impl.JPABackingStoreTestUtilities @@ -71,11 +71,17 @@ import kotlin.test.assertEquals // TODO - Find an elegant way to avoid duplication of unit tests @TestInstance(TestInstance.Lifecycle.PER_CLASS) class UniquenessCheckerImplDBIntegrationTests { + private val clusterDbConfig: EntityManagerConfiguration - private val clusterDbConfig = DbUtils.getEntityManagerConfiguration( - inMemoryDbName = "clusterdb", - showSql = false - ) + init { + // uncomment this to run the test against local Postgres +// System.setProperty("databaseType", "POSTGRES") + + clusterDbConfig = DbUtils.getEntityManagerConfiguration( + inMemoryDbName = "clusterdb", + showSql = false + ) + } private val baseTime: Instant = Instant.EPOCH @@ -282,7 +288,6 @@ class UniquenessCheckerImplDBIntegrationTests { eq(noDbHoldingIdentityDbId), any(), any())) doThrow DBConfigurationException("") whenever(getClusterDataSource()) doReturn clusterDbConfig.dataSource }, - PersistenceExceptionCategorizerImpl(), mock().apply { whenever(getByHoldingIdentityShortHash(eq(defaultHoldingIdentity.shortHash))).thenReturn( VirtualNodeInfo( diff --git a/libs/ledger-lib-uniqueness/build.gradle b/libs/ledger-lib-uniqueness/build.gradle index f05483a3496..fcf998a8a54 100644 --- a/libs/ledger-lib-uniqueness/build.gradle +++ b/libs/ledger-lib-uniqueness/build.gradle @@ -34,6 +34,15 @@ dependencies { testImplementation project(":testing:test-utilities") testImplementation project(":testing:uniqueness:backing-store-fake") testImplementation project(":testing:uniqueness:uniqueness-utilities") + + integrationTestImplementation 'net.corda:corda-db-schema' + integrationTestImplementation project(':libs:db:db-core') + integrationTestImplementation project(':libs:db:db-admin') + integrationTestImplementation project(':libs:db:db-admin-impl') + integrationTestImplementation project(':testing:db-testkit') + + integrationTestRuntimeOnly "org.hsqldb:hsqldb:$hsqldbVersion" + integrationTestRuntimeOnly libs.postgresql.jdbc } tasks.named('jar', Jar) { diff --git a/libs/ledger-lib-uniqueness/src/integrationTest/kotlin/net/corda/ledger/libs/uniqueness/backingstore/PersistenceExceptionDbIntegrationTest.kt b/libs/ledger-lib-uniqueness/src/integrationTest/kotlin/net/corda/ledger/libs/uniqueness/backingstore/PersistenceExceptionDbIntegrationTest.kt new file mode 100644 index 00000000000..9ede5219418 --- /dev/null +++ b/libs/ledger-lib-uniqueness/src/integrationTest/kotlin/net/corda/ledger/libs/uniqueness/backingstore/PersistenceExceptionDbIntegrationTest.kt @@ -0,0 +1,162 @@ +package net.corda.ledger.libs.uniqueness.backingstore + +import net.corda.db.admin.impl.ClassloaderChangeLog +import net.corda.db.admin.impl.LiquibaseSchemaMigratorImpl +import net.corda.db.schema.DbSchema +import net.corda.db.testkit.DbUtils +import net.corda.ledger.libs.uniqueness.backingstore.impl.SqlPersistenceExceptionCategorizerImpl +import net.corda.orm.EntityManagerConfiguration +import net.corda.orm.PersistenceExceptionType +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Assumptions +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.TestInstance +import org.junit.jupiter.api.assertThrows +import java.sql.SQLException +import java.util.UUID + +// These tests validate the exception types (returned from a real DB) used in SqlPersistenceExceptionCategorizerImpl +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class PersistenceExceptionDbIntegrationTest { + private val dbConfig: EntityManagerConfiguration + + init { + // uncomment this to run the test against local Postgres +// System.setProperty("databaseType", "POSTGRES") + + dbConfig = DbUtils.getEntityManagerConfiguration("uniqueness_session") + + val dbChange = ClassloaderChangeLog( + linkedSetOf( + ClassloaderChangeLog.ChangeLogResourceFiles( + DbSchema::class.java.packageName, + listOf("net/corda/db/schema/vnode-uniqueness/db.changelog-master.xml"), + DbSchema::class.java.classLoader + ) + ) + ) + dbConfig.dataSource.connection.use { connection -> + LiquibaseSchemaMigratorImpl().updateDb(connection, dbChange) + } + + dbConfig.dataSource.connection.use { connection -> + connection.createStatement().use { statement -> + statement.execute( + """ + CREATE TABLE IF NOT EXISTS superman( + name VARCHAR(100) NOT NULL PRIMARY KEY, + age INT NOT NULL, + phone_number VARCHAR(50) NOT NULL, + UNIQUE(phone_number) + ); + """.trimIndent() + ) + connection.commit() + } + } + } + + private val exceptionCategorizer = SqlPersistenceExceptionCategorizerImpl() + + @Test + fun `primary key constraint violation`() { + val e = assertThrows { + dbConfig.dataSource.connection.use { connection -> + val id = UUID.randomUUID() + connection.createStatement().use { statement -> + statement.execute("INSERT INTO superman(name, age, phone_number) VALUES('$id', 10, 'p${UUID.randomUUID()}');") + statement.execute("INSERT INTO superman(name, age, phone_number) VALUES('$id', 11, 'p${UUID.randomUUID()}');") + connection.commit() + } + } + } + + assertThat(exceptionCategorizer.categorize(e)).isEqualTo(PersistenceExceptionType.DATA_RELATED) + } + + @Test + fun `unique key constraint violation`() { + val e = assertThrows { + val phone = "p-${UUID.randomUUID()}" + dbConfig.dataSource.connection.use { connection -> + connection.createStatement().use { statement -> + statement.execute("INSERT INTO superman(name, age, phone_number) VALUES('${UUID.randomUUID()}', 10, '$phone');") + statement.execute("INSERT INTO superman(name, age, phone_number) VALUES('${UUID.randomUUID()}', 11, '$phone');") + connection.commit() + } + } + } + + assertThat(exceptionCategorizer.categorize(e)).isEqualTo(PersistenceExceptionType.DATA_RELATED) + } + + @Test + fun `null constraint violation`() { + val e = assertThrows { + dbConfig.dataSource.connection.use { connection -> + connection.createStatement().use { statement -> + statement.execute( + "INSERT INTO superman(name, age, phone_number) VALUES('${UUID.randomUUID()}', NULL, '${UUID.randomUUID()}');" + ) + connection.commit() + } + } + } + + assertThat(exceptionCategorizer.categorize(e)).isEqualTo(PersistenceExceptionType.DATA_RELATED) + } + + @Test + fun `invalid SQL`() { + Assumptions.assumeFalse(DbUtils.isInMemory, "Skipping this test when run against in-memory DB.") + + val e = assertThrows { + dbConfig.dataSource.connection.use { connection -> + connection.createStatement().use { statement -> + statement.execute( + "INSERPP INTO superman(name, age, phone_number) VALUES('${UUID.randomUUID()}', NULL, '${UUID.randomUUID()}');" + ) + connection.commit() + } + } + } + + assertThat(exceptionCategorizer.categorize(e)).isEqualTo(PersistenceExceptionType.FATAL) + } + + @Test + fun `invalid field`() { + Assumptions.assumeFalse(DbUtils.isInMemory, "Skipping this test when run against in-memory DB.") + + val e = assertThrows { + dbConfig.dataSource.connection.use { connection -> + connection.createStatement().use { statement -> + statement.execute( + "INSERT INTO superman(namedddd, age, phone_number) VALUES('${UUID.randomUUID()}', NULL, '${UUID.randomUUID()}');" + ) + connection.commit() + } + } + } + + assertThat(exceptionCategorizer.categorize(e)).isEqualTo(PersistenceExceptionType.FATAL) + } + + @Test + fun `invalid table`() { + Assumptions.assumeFalse(DbUtils.isInMemory, "Skipping this test when run against in-memory DB.") + + val e = assertThrows { + dbConfig.dataSource.connection.use { connection -> + connection.createStatement().use { statement -> + statement.execute( + "INSERT INTO supermanssss(name, age, phone_number) VALUES('${UUID.randomUUID()}', NULL, '${UUID.randomUUID()}');" + ) + connection.commit() + } + } + } + + assertThat(exceptionCategorizer.categorize(e)).isEqualTo(PersistenceExceptionType.FATAL) + } +} diff --git a/libs/ledger-lib-uniqueness/src/integrationTest/kotlin/net/corda/ledger/libs/uniqueness/backingstore/SqlSessionDbIntegrationTest.kt b/libs/ledger-lib-uniqueness/src/integrationTest/kotlin/net/corda/ledger/libs/uniqueness/backingstore/SqlSessionDbIntegrationTest.kt new file mode 100644 index 00000000000..e272dec8a5c --- /dev/null +++ b/libs/ledger-lib-uniqueness/src/integrationTest/kotlin/net/corda/ledger/libs/uniqueness/backingstore/SqlSessionDbIntegrationTest.kt @@ -0,0 +1,301 @@ +package net.corda.ledger.libs.uniqueness.backingstore + +import net.corda.crypto.core.SecureHashImpl +import net.corda.crypto.core.bytes +import net.corda.db.admin.impl.ClassloaderChangeLog +import net.corda.db.admin.impl.LiquibaseSchemaMigratorImpl +import net.corda.db.schema.DbSchema +import net.corda.db.testkit.DbUtils +import net.corda.ledger.libs.uniqueness.UniquenessSecureHashFactory +import net.corda.ledger.libs.uniqueness.UniquenessSecureHashFactoryTestImpl +import net.corda.ledger.libs.uniqueness.backingstore.impl.SqlSessionImpl +import net.corda.ledger.libs.uniqueness.backingstore.impl.jpaBackingStoreObjectMapper +import net.corda.ledger.libs.uniqueness.data.UniquenessHoldingIdentity +import net.corda.orm.EntityManagerConfiguration +import net.corda.orm.PersistenceExceptionCategorizer +import net.corda.test.util.time.toSafeWindowsPrecision +import net.corda.uniqueness.datamodel.common.toCharacterRepresentation +import net.corda.uniqueness.datamodel.impl.UniquenessCheckErrorMalformedRequestImpl +import net.corda.v5.application.uniqueness.model.UniquenessCheckStateDetails +import net.corda.v5.base.util.ByteArrays +import net.corda.v5.crypto.SecureHash +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.SoftAssertions.assertSoftly +import org.junit.jupiter.api.Assumptions +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.TestInstance +import org.mockito.Mockito.mock +import java.sql.Connection +import java.sql.Timestamp +import java.time.Instant +import java.util.Calendar +import java.util.TimeZone +import java.util.UUID + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class SqlSessionDbIntegrationTest { + companion object { + val tzUTC: Calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")) + } + + private val dbConfig: EntityManagerConfiguration + + init { + // uncomment this to run the test against local Postgres + // System.setProperty("databaseType", "POSTGRES") + + dbConfig = DbUtils.getEntityManagerConfiguration("uniqueness_session") + + val dbChange = ClassloaderChangeLog( + linkedSetOf( + ClassloaderChangeLog.ChangeLogResourceFiles( + DbSchema::class.java.packageName, + listOf("net/corda/db/schema/vnode-uniqueness/db.changelog-master.xml"), + DbSchema::class.java.classLoader + ) + ) + ) + dbConfig.dataSource.connection.use { connection -> + LiquibaseSchemaMigratorImpl().updateDb(connection, dbChange) + } + } + + private val holdingIdentity = mock() + private val metricsFactory = mock() + private val exceptionCategorizer = mock() + + @Test + fun getTransactionDetailsTest() { + Assumptions.assumeFalse(DbUtils.isInMemory, "Skipping this test when run against in-memory DB.") + + // creating more than needed to make sure where clause works. + val retrieveTxDetails = createTxDetails(3).take(2) + val rejectedDetails = createRejectedTxDetails(2).take(1) + val combined = retrieveTxDetails + rejectedDetails + dbConfig.dataSource.connection.use { connection -> + val session = createSession(connection) + val found = session.getTransactionDetails(combined.map { SecureHashImpl(it.txIdAlgo, it.txId) }) + + assertSoftly { softly -> + softly.assertThat(found.count()).isEqualTo(combined.count()) + combined.forEach { + softly.assertThat(found[SecureHashImpl(it.txIdAlgo, it.txId)]?.result?.resultTimestamp) + .isEqualTo(it.commitTimestamp) + } + rejectedDetails.forEach { + softly.assertThat(found[SecureHashImpl(it.txIdAlgo, it.txId)]?.result?.toCharacterRepresentation()) + .isEqualTo('R') + } + } + } + } + + @Test + fun getStateDetailsTest() { + Assumptions.assumeFalse(DbUtils.isInMemory, "Skipping this test when run against in-memory DB.") + + val retrieveDetails = createStateDetails(3).take(2) + dbConfig.dataSource.connection.use { connection -> + val session = createSession(connection) + val found = session.getStateDetails(retrieveDetails.map { it.stateRef }) + + assertSoftly { softly -> + softly.assertThat(found.count()).isEqualTo(retrieveDetails.count()) + retrieveDetails.forEach { + val row = found.entries.single { r -> r.key.txHash == it.stateRef.txHash && r.key.stateIndex == r.key.stateIndex }.value + softly.assertThat(row.consumingTxId) + .isEqualTo(it.consumingTxId) + } + } + } + } + + @Test + fun `when executeTransaction commit`() { + val hash = SecureHashImpl("algoo", randomBytes()) + + // persist something + dbConfig.dataSource.connection.use { connection -> + val session = createSession(connection) + session.executeTransaction { _, _ -> + connection.prepareStatement( + """ + INSERT INTO uniqueness_rejected_txs( + tx_id_algo, + tx_id, + error_details + ) VALUES (?,?,?) + """.trimIndent() + ).use { statement -> + statement.setString(1, hash.algorithm) + statement.setBytes(2, hash.bytes) + statement.setBytes( + 3, + jpaBackingStoreObjectMapper(UniquenessSecureHashFactoryTestImpl()).writeValueAsBytes( + UniquenessCheckErrorMalformedRequestImpl("error ${UUID.randomUUID()}") + ) + ) + println(statement) + statement.executeUpdate() + } + } + } + + // check it was committed + dbConfig.dataSource.connection.use { connection -> + val session = createSession(connection) + session.executeTransaction { _, _ -> + connection.prepareStatement( + """ + SELECT 1 FROM uniqueness_rejected_txs WHERE tx_id = ? + """.trimIndent() + ).use { statement -> + statement.setBytes(1, hash.bytes) + println(statement) + val rs = statement.executeQuery() + assertThat(rs.next()).isTrue() + } + } + } + } + + private fun createSession(connection: Connection): BackingStore.Session { + return SqlSessionImpl( + holdingIdentity, + connection, + metricsFactory, + exceptionCategorizer, + object : UniquenessSecureHashFactory { + override fun createSecureHash(algorithm: String, bytes: ByteArray): SecureHash { + return SecureHashImpl(algorithm, bytes) + } + + override fun getBytes(hash: SecureHash): ByteArray { + return hash.bytes + } + + override fun parseSecureHash(hashString: String): SecureHash { + hashString.split('|').let { + return SecureHashImpl(it[0], ByteArrays.parseAsHex(it[2])) + } + } + } + ) + } + + private fun createStateDetails(n: Int): List { + dbConfig.dataSource.connection.use { connection -> + connection + .prepareStatement( + """ + INSERT INTO uniqueness_state_details(issue_tx_id_algo, issue_tx_id, issue_tx_output_idx, consuming_tx_id_algo, consuming_tx_id) + VALUES (?,?,?,?,?) + """.trimIndent() + ) + .use { statement -> + val results = (1..n).map { + val details = StateDetails( + StateRef(SecureHashImpl("algo-$it", randomBytes()), it), + SecureHashImpl("c-algo-$it", randomBytes()) + ) + + statement.setString(1, details.stateRef.txHash.algorithm) + statement.setBytes(2, details.stateRef.txHash.bytes) + statement.setInt(3, details.stateRef.stateIndex) + statement.setString(4, details.consumingId!!.algorithm) + statement.setBytes(5, details.consumingId.bytes) + + statement.addBatch() + println(statement) + details + } + + assertThat(statement.executeBatch().sum()).isEqualTo(n) + connection.commit() + + return results + } + } + } + + private fun createRejectedTxDetails(n: Int): List { + val details = createTxDetails(n, 'R') + + dbConfig.dataSource.connection.use { connection -> + connection.autoCommit = true + connection.prepareStatement( + """ + INSERT INTO uniqueness_rejected_txs( + tx_id_algo, + tx_id, + error_details + ) VALUES (?,?,?) + """.trimIndent() + ).use { statement -> + details.forEach { + statement.setString(1, it.txIdAlgo) + statement.setBytes(2, it.txId) + statement.setBytes( + 3, + jpaBackingStoreObjectMapper(UniquenessSecureHashFactoryTestImpl()).writeValueAsBytes( + UniquenessCheckErrorMalformedRequestImpl("error ${UUID.randomUUID()}") + ) + ) + statement.addBatch() + } + println(statement) + statement.executeBatch() + } + } + + return details + } + + private fun createTxDetails(n: Int, result: Char = 'A'): List { + dbConfig.dataSource.connection.use { connection -> + connection + .prepareStatement( + """ + INSERT INTO + uniqueness_tx_details(tx_id_algo, tx_id, originator_x500_name, commit_timestamp, expiry_datetime, result) + VALUES (?,?,?,?,?,?) + """.trimIndent() + ) + .use { statement -> + val results = (1..n).map { + val txDetails = TransactionDetails( + "algo$it", + randomBytes(), + "X500 name$it", + Instant.now().toSafeWindowsPrecision(), + result, + ) + + statement.setString(1, txDetails.txIdAlgo) + statement.setBytes(2, txDetails.txId) + statement.setString(3, txDetails.originatorX500Name) + statement.setTimestamp( + 4, + Timestamp.from(txDetails.commitTimestamp), + tzUTC + ) + statement.setTimestamp( + 5, + Timestamp.from(txDetails.commitTimestamp.plusSeconds(100)), + tzUTC + ) + statement.setString(6, txDetails.result.toString()) + + statement.addBatch() + println(statement) + txDetails + } + + assertThat(statement.executeBatch().sum()).isEqualTo(n) + connection.commit() + + return results + } + } + } +} diff --git a/libs/ledger-lib-uniqueness/src/integrationTest/kotlin/net/corda/ledger/libs/uniqueness/backingstore/TransactionOpsDbIntegrationTest.kt b/libs/ledger-lib-uniqueness/src/integrationTest/kotlin/net/corda/ledger/libs/uniqueness/backingstore/TransactionOpsDbIntegrationTest.kt new file mode 100644 index 00000000000..f33d8e262f6 --- /dev/null +++ b/libs/ledger-lib-uniqueness/src/integrationTest/kotlin/net/corda/ledger/libs/uniqueness/backingstore/TransactionOpsDbIntegrationTest.kt @@ -0,0 +1,292 @@ +package net.corda.ledger.libs.uniqueness.backingstore + +import net.corda.crypto.core.SecureHashImpl +import net.corda.crypto.core.bytes +import net.corda.db.admin.impl.ClassloaderChangeLog +import net.corda.db.admin.impl.LiquibaseSchemaMigratorImpl +import net.corda.db.schema.DbSchema +import net.corda.db.testkit.DbUtils +import net.corda.ledger.libs.uniqueness.UniquenessSecureHashFactory +import net.corda.ledger.libs.uniqueness.backingstore.impl.DefaultSqlQueryProvider +import net.corda.ledger.libs.uniqueness.backingstore.impl.SqlTransactionOpsImpl +import net.corda.ledger.libs.uniqueness.data.UniquenessHoldingIdentity +import net.corda.orm.EntityManagerConfiguration +import net.corda.test.util.time.toSafeWindowsPrecision +import net.corda.uniqueness.datamodel.common.toCharacterRepresentation +import net.corda.uniqueness.datamodel.impl.UniquenessCheckResultFailureImpl +import net.corda.uniqueness.datamodel.impl.UniquenessCheckResultSuccessImpl +import net.corda.uniqueness.datamodel.internal.UniquenessCheckRequestInternal +import net.corda.v5.application.uniqueness.model.UniquenessCheckError +import net.corda.v5.base.util.ByteArrays +import net.corda.v5.crypto.SecureHash +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.SoftAssertions.assertSoftly +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.TestInstance +import org.mockito.Mockito.mock +import java.sql.Connection +import java.time.Instant +import java.util.UUID + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class TransactionOpsDbIntegrationTest { + private val dbConfig: EntityManagerConfiguration + private val holdingIdentity = mock() + private val metricsFactory = mock() + + init { + // uncomment this to run the test against local Postgres +// System.setProperty("databaseType", "POSTGRES") + + dbConfig = DbUtils.getEntityManagerConfiguration("uniqueness_tx_ops") + + val dbChange = ClassloaderChangeLog( + linkedSetOf( + ClassloaderChangeLog.ChangeLogResourceFiles( + DbSchema::class.java.packageName, + listOf("net/corda/db/schema/vnode-uniqueness/db.changelog-master.xml"), + DbSchema::class.java.classLoader + ) + ) + ) + dbConfig.dataSource.connection.use { connection -> + LiquibaseSchemaMigratorImpl().updateDb(connection, dbChange) + } + } + + @Test + fun createUnconsumedStatesTest() { + val stateRefs = createStateRefs(3) + + dbConfig.dataSource.connection.use { connection -> + connection.autoCommit = true + val txOps = createTxOps(connection) + txOps.createUnconsumedStates(stateRefs) + } + + val foundIds = mutableListOf() + + dbConfig.dataSource.connection.use { connection -> + connection.createStatement().use { statement -> + val rs = statement.executeQuery( + """ + SELECT + issue_tx_id_algo, + issue_tx_id, + issue_tx_output_idx, + consuming_tx_id_algo, + consuming_tx_id + FROM uniqueness_state_details + """.trimIndent() + ) + + assertSoftly { softly -> + while (rs.next()) { + val txId = rs.getBytes("issue_tx_id") + stateRefs.singleOrNull { it.hash.bytes.contentEquals(txId) }?.let { + foundIds.add(txId) + softly.assertThat(rs.getString("issue_tx_id_algo")).isEqualTo(it.hash.algorithm) + softly.assertThat(rs.getInt("issue_tx_output_idx")).isEqualTo(it.index) + softly.assertThat(rs.getString("consuming_tx_id_algo")).isNull() + softly.assertThat(rs.getBytes("consuming_tx_id")).isNull() + } + } + } + assertThat(foundIds).hasSize(3) + } + } + } + + @Test + fun consumeStatesTest() { + val stateRefs = createStateRefs(3) + + dbConfig.dataSource.connection.use { connection -> + connection.autoCommit = true + createUnConsumedStates(connection, stateRefs) + } + + val consumingState = stateRefs.first() + val consumedStates = stateRefs.drop(1) + dbConfig.dataSource.connection.use { connection -> + connection.autoCommit = true + val txOps = createTxOps(connection) + txOps.consumeStates(consumingState.txHash, consumedStates) + } + + dbConfig.dataSource.connection.use { connection -> + connection.prepareStatement( + """ + SELECT + issue_tx_id, + issue_tx_id_algo + FROM uniqueness_state_details + WHERE + consuming_tx_id_algo = ? AND + consuming_tx_id = ? + """.trimIndent() + ).use { statement -> + statement.setString(1, consumingState.hash.algorithm) + statement.setBytes(2, consumingState.hash.bytes) + + val rs = statement.executeQuery() + val found = mutableListOf() + while (rs.next()) { + found.add(SecureHashImpl(rs.getString("issue_tx_id_algo"), rs.getBytes("issue_tx_id"))) + } + @Suppress("SpreadOperator") + assertThat(found) + .containsExactlyInAnyOrder(*consumedStates.map { it.hash }.toTypedArray()) + } + } + } + + @Test + fun commitTransactionsTest() { + val requests = dbConfig.dataSource.connection.use { connection -> + connection.autoCommit = true + createRequests(connection, 3) + } + + val success = listOf( + Pair(requests[0], UniquenessCheckResultSuccessImpl(Instant.now().toSafeWindowsPrecision())), + Pair(requests[1], UniquenessCheckResultSuccessImpl(Instant.now().toSafeWindowsPrecision())), + ) + val rejected = listOf( + Pair(requests[2], UniquenessCheckResultFailureImpl(Instant.now().toSafeWindowsPrecision(), object : UniquenessCheckError {})), + ) + + dbConfig.dataSource.connection.use { connection -> + connection.autoCommit = true + val txOps = createTxOps(connection) + txOps.commitTransactions(success + rejected) + } + + dbConfig.dataSource.connection.use { connection -> + connection.prepareStatement( + """ + SELECT + tx_id_algo, + tx_id, + originator_x500_name, + expiry_datetime, + commit_timestamp, + result + FROM uniqueness_tx_details + WHERE + originator_x500_name = ? OR originator_x500_name = ? OR originator_x500_name = ? + ORDER BY originator_x500_name + """.trimIndent() + ).use { statement -> + statement.setString(1, success.first().first.originatorX500Name) + statement.setString(2, success.drop(1).first().first.originatorX500Name) + statement.setString(3, rejected.first().first.originatorX500Name) + + val rs = statement.executeQuery() + val compare = ArrayDeque((success + rejected).sortedBy { it.first.originatorX500Name }) + assertSoftly { softly -> + while (rs.next()) { + val expected = compare.removeFirst() + softly.assertThat(rs.getString("tx_id_algo")).isEqualTo(expected.first.txId.algorithm) + softly.assertThat(rs.getBytes("tx_id")).isEqualTo(expected.first.txId.bytes) + softly.assertThat(rs.getTimestamp("expiry_datetime").toInstant()) + .isEqualTo(expected.first.timeWindowUpperBound) + softly.assertThat(rs.getTimestamp("commit_timestamp").toInstant()) + .isEqualTo(expected.second.resultTimestamp) + softly.assertThat(rs.getString("result")).isEqualTo(expected.second.toCharacterRepresentation().toString()) + } + } + } + + // rejected + connection.prepareStatement( + """ + SELECT + error_details + FROM uniqueness_rejected_txs + WHERE + tx_id_algo = ? AND tx_id = ? + """.trimIndent() + ).use { statement -> + statement.setString(1, rejected.first().first.txId.algorithm) + statement.setBytes(2, rejected.first().first.txId.bytes) + + val rs = statement.executeQuery() + assertThat(rs.next()).isTrue() + assertThat(rs.getString("error_details")).isNotEmpty() + } + } + } + + private fun createRequests(connection: Connection, n: Int): List { + val inputStates = ArrayDeque(createStateRefs(n * 2)) + createUnConsumedStates(connection, inputStates) + + return (1..n).map { + UniquenessCheckRequestInternal( + SecureHashImpl("algo-$it", randomBytes()), + "TX-${UUID.randomUUID()}-$it", + "X500-$it-${UUID.randomUUID()}", + listOf(inputStates.removeFirst()), + listOf(inputStates.removeFirst()), + 1, + Instant.now().minusSeconds(10).toSafeWindowsPrecision(), + Instant.now().plusSeconds(1000).toSafeWindowsPrecision(), + ) + } + } + + private fun createUnConsumedStates(connection: Connection, stateRefs: List) { + connection + .prepareStatement( + """ + INSERT INTO uniqueness_state_details( + issue_tx_id_algo, + issue_tx_id, + issue_tx_output_idx) + VALUES (?,?,?) + """.trimIndent() + ) + .use { statement -> + stateRefs.forEach { stateRef -> + statement.setString(1, stateRef.txHash.algorithm) + statement.setBytes(2, stateRef.txHash.bytes) + statement.setInt(3, stateRef.stateIndex) + + statement.addBatch() + println(statement) + } + statement.executeBatch() + } + } + + private fun createStateRefs(n: Int): List { + return (1..n).map { + StateRef(SecureHashImpl("algo-$it", randomBytes()), it) + } + } + + private fun createTxOps(connection: Connection): BackingStore.Session.TransactionOps { + return SqlTransactionOpsImpl( + connection, + DefaultSqlQueryProvider(), + object : UniquenessSecureHashFactory { + override fun createSecureHash(algorithm: String, bytes: ByteArray): SecureHash { + return SecureHashImpl(algorithm, bytes) + } + + override fun getBytes(hash: SecureHash): ByteArray { + return hash.bytes + } + + override fun parseSecureHash(hashString: String): SecureHash { + hashString.split('|').let { + return SecureHashImpl(it[0], ByteArrays.parseAsHex(it[2])) + } + } + }, + metricsFactory, + holdingIdentity, + ) + } +} diff --git a/libs/ledger-lib-uniqueness/src/integrationTest/kotlin/net/corda/ledger/libs/uniqueness/backingstore/Utils.kt b/libs/ledger-lib-uniqueness/src/integrationTest/kotlin/net/corda/ledger/libs/uniqueness/backingstore/Utils.kt new file mode 100644 index 00000000000..faa8201b0b4 --- /dev/null +++ b/libs/ledger-lib-uniqueness/src/integrationTest/kotlin/net/corda/ledger/libs/uniqueness/backingstore/Utils.kt @@ -0,0 +1,54 @@ +package net.corda.ledger.libs.uniqueness.backingstore + +import net.corda.v5.application.uniqueness.model.UniquenessCheckStateDetails +import net.corda.v5.application.uniqueness.model.UniquenessCheckStateRef +import net.corda.v5.crypto.SecureHash +import java.time.Instant + +fun randomBytes(): ByteArray { + return (1..16).map { ('0'..'9').random() }.joinToString("").toByteArray() +} + +data class StateDetails(val sRef: UniquenessCheckStateRef, val consumingId: SecureHash?) : + UniquenessCheckStateDetails { + override fun getStateRef() = sRef + override fun getConsumingTxId(): SecureHash? = consumingId +} + +data class StateRef(val hash: SecureHash, val index: Int) : + UniquenessCheckStateRef { + override fun getTxHash(): SecureHash = hash + override fun getStateIndex(): Int = index +} + +data class TransactionDetails( + val txIdAlgo: String, + val txId: ByteArray, + val originatorX500Name: String, + val commitTimestamp: Instant, + val result: Char +) { + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (javaClass != other?.javaClass) return false + + other as TransactionDetails + + if (txIdAlgo != other.txIdAlgo) return false + if (!txId.contentEquals(other.txId)) return false + if (originatorX500Name != other.originatorX500Name) return false + if (commitTimestamp != other.commitTimestamp) return false + if (result != other.result) return false + + return true + } + + override fun hashCode(): Int { + var result1 = txIdAlgo.hashCode() + result1 = 31 * result1 + txId.contentHashCode() + result1 = 31 * result1 + originatorX500Name.hashCode() + result1 = 31 * result1 + commitTimestamp.hashCode() + result1 = 31 * result1 + result.hashCode() + return result1 + } +} diff --git a/libs/ledger-lib-uniqueness/src/main/java/net/corda/ledger/libs/uniqueness/backingstore/impl/DefaultSqlQueryProvider.kt b/libs/ledger-lib-uniqueness/src/main/java/net/corda/ledger/libs/uniqueness/backingstore/impl/DefaultSqlQueryProvider.kt new file mode 100644 index 00000000000..4987df3fa66 --- /dev/null +++ b/libs/ledger-lib-uniqueness/src/main/java/net/corda/ledger/libs/uniqueness/backingstore/impl/DefaultSqlQueryProvider.kt @@ -0,0 +1,87 @@ +package net.corda.ledger.libs.uniqueness.backingstore.impl + +class DefaultSqlQueryProvider : SqlQueryProvider { + override fun findStatesByKeyQuery(): String { + return """ + SELECT issue_tx_id_algo, issue_tx_id, issue_tx_output_idx, consuming_tx_id_algo, consuming_tx_id + FROM uniqueness_state_details + WHERE (issue_tx_id_algo, issue_tx_id, issue_tx_output_idx) = + ANY( + SELECT + UNNEST(? :: text[]), + UNNEST(? :: bytea[]), + UNNEST(? :: int[]) + ) + """.trimIndent() + } + + override fun findTransactionDetailByKeyQuery(): String { + return """ + SELECT tx_id_algo, tx_id, result, commit_timestamp + FROM uniqueness_tx_details + WHERE (tx_id_algo, tx_id) = + ANY( + SELECT + UNNEST(? :: text[]), + UNNEST(? :: bytea[]) + ) + """.trimIndent() + } + + override fun findRejectedTransactionQuery(): String { + return """ + SELECT error_details FROM uniqueness_rejected_txs + WHERE tx_id_algo = ? AND tx_id = ? + """.trimIndent() + } + + override fun insertUnconsumedStatesQuery(): String { + return """ + INSERT INTO uniqueness_state_details( + issue_tx_id_algo, + issue_tx_id, + issue_tx_output_idx, + consuming_tx_id_algo, + consuming_tx_id + ) + VALUES (?,?,?,NULL,NULL) + """.trimIndent() + } + + override fun consumeStatesQuery(): String { + return """ + UPDATE uniqueness_state_details SET + consuming_tx_id_algo = ?, + consuming_tx_id = ? + WHERE + issue_tx_id_algo = ? AND + issue_tx_id = ? AND + issue_tx_output_idx = ? AND + consuming_tx_id IS NULL + """.trimIndent() + } + + override fun insertTransactionDetailsQuery(): String { + return """ + INSERT INTO uniqueness_tx_details( + tx_id_algo, + tx_id, + originator_x500_name, + expiry_datetime, + commit_timestamp, + result + ) VALUES (?,?,?,?,?,?) + """.trimIndent() + } + + override fun insertRejectedTransactionQuery(): String { + return """ + INSERT INTO uniqueness_rejected_txs( + tx_id_algo, + tx_id, + error_details + ) VALUES (?,?,?) + """.trimIndent() + } + +} diff --git a/libs/ledger-lib-uniqueness/src/main/java/net/corda/ledger/libs/uniqueness/backingstore/impl/SqlQueryProvider.kt b/libs/ledger-lib-uniqueness/src/main/java/net/corda/ledger/libs/uniqueness/backingstore/impl/SqlQueryProvider.kt new file mode 100644 index 00000000000..0774db18be8 --- /dev/null +++ b/libs/ledger-lib-uniqueness/src/main/java/net/corda/ledger/libs/uniqueness/backingstore/impl/SqlQueryProvider.kt @@ -0,0 +1,11 @@ +package net.corda.ledger.libs.uniqueness.backingstore.impl + +interface SqlQueryProvider { + fun findStatesByKeyQuery(): String + fun findTransactionDetailByKeyQuery(): String + fun findRejectedTransactionQuery(): String + fun insertUnconsumedStatesQuery(): String + fun consumeStatesQuery(): String + fun insertTransactionDetailsQuery(): String + fun insertRejectedTransactionQuery(): String +} diff --git a/libs/ledger-lib-uniqueness/src/main/kotlin/net/corda/ledger/libs/uniqueness/backingstore/BackingStore.kt b/libs/ledger-lib-uniqueness/src/main/kotlin/net/corda/ledger/libs/uniqueness/backingstore/BackingStore.kt index a5de5ac3d6c..71dbe8d722c 100644 --- a/libs/ledger-lib-uniqueness/src/main/kotlin/net/corda/ledger/libs/uniqueness/backingstore/BackingStore.kt +++ b/libs/ledger-lib-uniqueness/src/main/kotlin/net/corda/ledger/libs/uniqueness/backingstore/BackingStore.kt @@ -127,3 +127,5 @@ interface BackingStore { } } } + +class ConsumeStateFailedException(message: String) : IllegalStateException(message) diff --git a/libs/ledger-lib-uniqueness/src/main/kotlin/net/corda/ledger/libs/uniqueness/backingstore/impl/SqlBackingStoreImpl.kt b/libs/ledger-lib-uniqueness/src/main/kotlin/net/corda/ledger/libs/uniqueness/backingstore/impl/SqlBackingStoreImpl.kt new file mode 100644 index 00000000000..467e9082e4b --- /dev/null +++ b/libs/ledger-lib-uniqueness/src/main/kotlin/net/corda/ledger/libs/uniqueness/backingstore/impl/SqlBackingStoreImpl.kt @@ -0,0 +1,43 @@ +package net.corda.ledger.libs.uniqueness.backingstore.impl + +import net.corda.ledger.libs.uniqueness.UniquenessSecureHashFactory +import net.corda.ledger.libs.uniqueness.backingstore.BackingStore +import net.corda.ledger.libs.uniqueness.backingstore.BackingStoreMetricsFactory +import net.corda.ledger.libs.uniqueness.data.UniquenessHoldingIdentity +import net.corda.orm.PersistenceExceptionCategorizer +import java.sql.Connection +import java.time.Duration + +/** + * Backing store using "plain" SQL. + * This could be further specialised for different types of DBs if necessary. + */ +class SqlBackingStoreImpl( + private val connectionFactory: (holdingIdentity: UniquenessHoldingIdentity) -> Connection, + private val backingStoreMetricsFactory: BackingStoreMetricsFactory, + private val uniquenessSecureHashFactory: UniquenessSecureHashFactory, + private val persistenceExceptionCategorizer: PersistenceExceptionCategorizer = SqlPersistenceExceptionCategorizerImpl() +) : BackingStore { + override fun session(holdingIdentity: UniquenessHoldingIdentity, block: (BackingStore.Session) -> Unit) { + val sessionStartTime = System.nanoTime() + @Suppress("TooGenericExceptionCaught") + try { + connectionFactory(holdingIdentity).use { connection -> + block( + SqlSessionImpl( + holdingIdentity, + connection, + backingStoreMetricsFactory, + persistenceExceptionCategorizer, + uniquenessSecureHashFactory + ) + ) + } + } finally { + backingStoreMetricsFactory.recordSessionExecutionTime( + Duration.ofNanos(System.nanoTime() - sessionStartTime), + holdingIdentity + ) + } + } +} diff --git a/libs/ledger-lib-uniqueness/src/main/kotlin/net/corda/ledger/libs/uniqueness/backingstore/impl/SqlPersistenceExceptionCategorizerImpl.kt b/libs/ledger-lib-uniqueness/src/main/kotlin/net/corda/ledger/libs/uniqueness/backingstore/impl/SqlPersistenceExceptionCategorizerImpl.kt new file mode 100644 index 00000000000..4ba30d7c0ba --- /dev/null +++ b/libs/ledger-lib-uniqueness/src/main/kotlin/net/corda/ledger/libs/uniqueness/backingstore/impl/SqlPersistenceExceptionCategorizerImpl.kt @@ -0,0 +1,84 @@ +package net.corda.ledger.libs.uniqueness.backingstore.impl + +import net.corda.ledger.libs.uniqueness.backingstore.ConsumeStateFailedException +import net.corda.orm.PersistenceExceptionCategorizer +import net.corda.orm.PersistenceExceptionType +import net.corda.utilities.criteria +import org.slf4j.LoggerFactory +import java.net.SocketException +import java.sql.SQLException +import java.sql.SQLTransientConnectionException + +class SqlPersistenceExceptionCategorizerImpl : PersistenceExceptionCategorizer { + + companion object { + internal const val CONNECTION_CLOSED_MESSAGE = "Connection is closed" + private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass) + } + + override fun categorize(exception: Exception): PersistenceExceptionType { + return when { + isFatal(exception) -> PersistenceExceptionType.FATAL + isDataRelated(exception) -> PersistenceExceptionType.DATA_RELATED + isTransient(exception) -> PersistenceExceptionType.TRANSIENT + else -> PersistenceExceptionType.UNCATEGORIZED + }.also { + logger.warn("Categorized exception as $it: $exception", exception) + } + } + + // list of sqlSate codes: https://github.com/spring-projects/spring-framework/blob/main/spring-jdbc/src/main/resources/org/springframework/jdbc/support/sql-error-codes.xml + private fun isFatal(exception: Exception): Boolean { + val checks = listOf( + criteria { + it.sqlState in setOf( + // badSqlGrammarCodes + "03000", "42000", "42601", "42602", "42622", "42804", "42P01", + // incorrect field + "42703" + ) + }, + ) + return checks.any { it.meetsCriteria(exception) } + } + + private fun isDataRelated(exception: Exception): Boolean { + val checks = listOf( + criteria(), + criteria { + it.sqlState in setOf( + // duplicateKeyCodes + "21000", "23505", + // dataIntegrityViolationCodes + "23000", "23502", "23503", "23514", + ) + }, + ) + return checks.any { it.meetsCriteria(exception) } + } + + private fun isTransient(exception: Exception): Boolean { + val checks = listOf( + criteria { + exception.message?.lowercase()?.contains("connection is not available") == true + }, + criteria { + it.sqlState in setOf( + // dataAccessResourceFailureCodes + "53000", "53100", "53200", "53300", + // cannotAcquireLockCodes + "55P03", + // deadlockLoserCodes + "40P01", + // unsure when these happen (source unknown) + "08001", "08003", "08004", "08006", "08007", "58030", + ) + }, + criteria { + it.message?.contains(CONNECTION_CLOSED_MESSAGE) ?: false + }, + criteria() + ) + return checks.any { it.meetsCriteria(exception) } + } +} diff --git a/libs/ledger-lib-uniqueness/src/main/kotlin/net/corda/ledger/libs/uniqueness/backingstore/impl/SqlSessionImpl.kt b/libs/ledger-lib-uniqueness/src/main/kotlin/net/corda/ledger/libs/uniqueness/backingstore/impl/SqlSessionImpl.kt new file mode 100644 index 00000000000..c8f240b47bc --- /dev/null +++ b/libs/ledger-lib-uniqueness/src/main/kotlin/net/corda/ledger/libs/uniqueness/backingstore/impl/SqlSessionImpl.kt @@ -0,0 +1,253 @@ +package net.corda.ledger.libs.uniqueness.backingstore.impl + +import net.corda.ledger.libs.uniqueness.UniquenessSecureHashFactory +import net.corda.ledger.libs.uniqueness.backingstore.BackingStore +import net.corda.ledger.libs.uniqueness.backingstore.BackingStoreMetricsFactory +import net.corda.ledger.libs.uniqueness.data.UniquenessHoldingIdentity +import net.corda.orm.PersistenceExceptionCategorizer +import net.corda.orm.PersistenceExceptionType +import net.corda.uniqueness.datamodel.common.UniquenessConstants.RESULT_ACCEPTED_REPRESENTATION +import net.corda.uniqueness.datamodel.common.UniquenessConstants.RESULT_REJECTED_REPRESENTATION +import net.corda.uniqueness.datamodel.impl.UniquenessCheckResultFailureImpl +import net.corda.uniqueness.datamodel.impl.UniquenessCheckResultSuccessImpl +import net.corda.uniqueness.datamodel.impl.UniquenessCheckStateDetailsImpl +import net.corda.uniqueness.datamodel.impl.UniquenessCheckStateRefImpl +import net.corda.uniqueness.datamodel.internal.UniquenessCheckTransactionDetailsInternal +import net.corda.v5.application.uniqueness.model.UniquenessCheckError +import net.corda.v5.application.uniqueness.model.UniquenessCheckStateDetails +import net.corda.v5.application.uniqueness.model.UniquenessCheckStateRef +import net.corda.v5.crypto.SecureHash +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import java.sql.Connection +import java.time.Duration +import java.util.Calendar +import java.util.TimeZone + +@Suppress("LongParameterList") +class SqlSessionImpl( + private val holdingIdentity: UniquenessHoldingIdentity, + private val connection: Connection, + private val backingStoreMetricsFactory: BackingStoreMetricsFactory, + private val persistenceExceptionCategorizer: PersistenceExceptionCategorizer, + private val uniquenessSecureHashFactory: UniquenessSecureHashFactory, + private val sqlQueryProvider: SqlQueryProvider = DefaultSqlQueryProvider() +) : BackingStore.Session { + private companion object { + private val log: Logger = LoggerFactory.getLogger(this::class.java.enclosingClass) + const val MAX_ATTEMPTS = 10 + private val tzUTC: Calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")) + } + + init { + connection.autoCommit = false + } + + private val transactionOps = + SqlTransactionOpsImpl( + connection, + sqlQueryProvider, + uniquenessSecureHashFactory, + backingStoreMetricsFactory, + holdingIdentity + ) + + @Suppress("NestedBlockDepth") + override fun executeTransaction(block: (BackingStore.Session, BackingStore.Session.TransactionOps) -> Unit) { + val transactionStartTime = System.nanoTime() + + try { + for (attemptNumber in 1..MAX_ATTEMPTS) { + try { + block(this, transactionOps) + connection.commit() + + backingStoreMetricsFactory.recordTransactionAttempts( + attemptNumber, + holdingIdentity + ) + return + } catch (e: Exception) { + when (persistenceExceptionCategorizer.categorize(e)) { + PersistenceExceptionType.DATA_RELATED, + PersistenceExceptionType.TRANSIENT -> { + // [ConsumeStateFailedException] Occurs when another worker committed a + // request with conflicting input states. Retry (by not re-throwing the + // exception), because the requests with conflicts are removed from the + // batch by the code passed in as `block`. + + // TODO This is needed because some of the exceptions + // we retry do not roll the transaction back. Once + // we improve our error handling in CORE-4983 this + // won't be necessary + if (!connection.isClosed && !connection.autoCommit) { + connection.rollback() + log.warn("Rolled back transaction") + } + backingStoreMetricsFactory.incrementTransactionErrorCount(e, holdingIdentity) + + if (attemptNumber < MAX_ATTEMPTS) { + log.warn( + "Retrying DB operation. The request might have been " + + "handled by a different notary worker or a DB error " + + "occurred when attempting to commit. Message: ${e.message}." + ) + } else { + throw IllegalStateException( + "Failed to execute transaction after the maximum number of " + + "attempts (${MAX_ATTEMPTS}). Message: ${e.message}." + ) + } + } + PersistenceExceptionType.UNCATEGORIZED, PersistenceExceptionType.FATAL -> { + log.warn("Unexpected error occurred. Message: ${e.message}") + // We potentially leak a database connection, if we don't rollback. When + // the HSM signing operation throws an exception this code path is + // triggered. + if (!connection.isClosed && !connection.autoCommit) { + connection.rollback() + log.warn("Rolled back transaction") + } + backingStoreMetricsFactory.incrementTransactionErrorCount(e, holdingIdentity) + + throw e + } + } + } + } + } finally { + backingStoreMetricsFactory.recordTransactionExecutionTime( + Duration.ofNanos(System.nanoTime() - transactionStartTime), + holdingIdentity + ) + } + } + + @Suppress("NestedBlockDepth") + override fun getStateDetails(states: Collection): Map { + val queryStartTime = System.nanoTime() + + val results = HashMap< + UniquenessCheckStateRef, + UniquenessCheckStateDetails + >() + + val statePks = states.map { + UniquenessTxAlgoStateRefKey(it.txHash.algorithm, uniquenessSecureHashFactory.getBytes(it.txHash), it.stateIndex) + } + + connection.prepareStatement(sqlQueryProvider.findStatesByKeyQuery()).use { stmt -> + stmt.setObject(1, statePks.map { it.issueTxIdAlgo }.toTypedArray()) + stmt.setObject(2, statePks.map { it.issueTxId }.toTypedArray()) + stmt.setObject(3, statePks.map { it.issueTxOutputIndex }.toTypedArray()) + stmt.executeQuery().use { rs -> + while (rs.next()) { + val selectIssueTxIdAlgo = rs.getString(1) + val selectIssueTxId = rs.getBytes(2) + val selectConsumingTxId = rs.getObject(5) + val consumingTxId = + if (rs.wasNull()) { + null + } else { + val selectConsumingTxIdAlgo = rs.getString(4) + uniquenessSecureHashFactory.createSecureHash(selectConsumingTxIdAlgo!!, selectConsumingTxId as ByteArray) + } + + val selectIssueTxOutputIndex = rs.getInt(3) + + val returnedState = UniquenessCheckStateRefImpl( + uniquenessSecureHashFactory.createSecureHash(selectIssueTxIdAlgo, selectIssueTxId), + selectIssueTxOutputIndex + ) + results[returnedState] = UniquenessCheckStateDetailsImpl(returnedState, consumingTxId) + } + } + } + + backingStoreMetricsFactory.recordDatabaseReadTime( + Duration.ofNanos(System.nanoTime() - queryStartTime), + holdingIdentity + ) + return results + } + + @Suppress("NestedBlockDepth") + override fun getTransactionDetails(txIds: Collection): Map { + val queryStartTime = System.nanoTime() + + val txPks = txIds.map { + UniquenessTxAlgoIdKey(it.algorithm, uniquenessSecureHashFactory.getBytes(it)) + } + + val results = mutableMapOf() + + connection.prepareStatement(sqlQueryProvider.findTransactionDetailByKeyQuery()).use { stmt -> + stmt.setObject(1, txPks.map { it.txIdAlgo }.toTypedArray()) + stmt.setObject(2, txPks.map { it.txId }.toTypedArray()) + stmt.executeQuery().use { rs -> + while (rs.next()) { + val txIdAlgo = rs.getString(1) + val txId = rs.getBytes(2) + val txResult = rs.getString(3).first() + val txCommitTimestamp = rs.getTimestamp(4, tzUTC).toInstant() + + val result = when (txResult) { + RESULT_ACCEPTED_REPRESENTATION -> { + UniquenessCheckResultSuccessImpl(txCommitTimestamp) + } + RESULT_REJECTED_REPRESENTATION -> { + // If the transaction is rejected we need to make sure it is also + // stored in the rejected tx table + UniquenessCheckResultFailureImpl( + txCommitTimestamp, + getTransactionError(txIdAlgo, txId) ?: throw IllegalStateException( + "Transaction with id $txId was rejected but no records were " + + "found in the rejected transactions table" + ) + ) + } + else -> throw IllegalStateException( + "Transaction result can only be " + + "'$RESULT_ACCEPTED_REPRESENTATION' or '$RESULT_REJECTED_REPRESENTATION'" + ) + } + val txHash = uniquenessSecureHashFactory.createSecureHash(txIdAlgo, txId) + results[txHash] = UniquenessCheckTransactionDetailsInternal(txHash, result) + } + } + } + + backingStoreMetricsFactory.recordDatabaseReadTime( + Duration.ofNanos(System.nanoTime() - queryStartTime), + holdingIdentity + ) + return results + } + + private fun getTransactionError( + txIdAlgo: String, + txId: ByteArray, + ): UniquenessCheckError? { + val queryStartTime = System.nanoTime() + + return connection.prepareStatement(sqlQueryProvider.findRejectedTransactionQuery()).use { stmt -> + stmt.setString(1, txIdAlgo) + stmt.setBytes(2, txId) + stmt.executeQuery().use { rs -> + if (rs.next()) { + jpaBackingStoreObjectMapper(uniquenessSecureHashFactory).readValue( + rs.getBytes(1), + UniquenessCheckError::class.java + ) + } else { + null + } + }.also { + backingStoreMetricsFactory.recordDatabaseReadTime( + Duration.ofNanos(System.nanoTime() - queryStartTime), + holdingIdentity + ) + } + } + } +} diff --git a/libs/ledger-lib-uniqueness/src/main/kotlin/net/corda/ledger/libs/uniqueness/backingstore/impl/SqlTransactionOpsImpl.kt b/libs/ledger-lib-uniqueness/src/main/kotlin/net/corda/ledger/libs/uniqueness/backingstore/impl/SqlTransactionOpsImpl.kt new file mode 100644 index 00000000000..35e9dba2f38 --- /dev/null +++ b/libs/ledger-lib-uniqueness/src/main/kotlin/net/corda/ledger/libs/uniqueness/backingstore/impl/SqlTransactionOpsImpl.kt @@ -0,0 +1,106 @@ +package net.corda.ledger.libs.uniqueness.backingstore.impl + +import net.corda.crypto.core.bytes +import net.corda.ledger.libs.uniqueness.UniquenessSecureHashFactory +import net.corda.ledger.libs.uniqueness.backingstore.BackingStore +import net.corda.ledger.libs.uniqueness.backingstore.BackingStoreMetricsFactory +import net.corda.ledger.libs.uniqueness.backingstore.ConsumeStateFailedException +import net.corda.ledger.libs.uniqueness.data.UniquenessHoldingIdentity +import net.corda.uniqueness.datamodel.common.UniquenessConstants.REJECTED_TRANSACTION_ERROR_DETAILS_LENGTH +import net.corda.uniqueness.datamodel.common.toCharacterRepresentation +import net.corda.uniqueness.datamodel.internal.UniquenessCheckRequestInternal +import net.corda.v5.application.uniqueness.model.UniquenessCheckResult +import net.corda.v5.application.uniqueness.model.UniquenessCheckResultFailure +import net.corda.v5.application.uniqueness.model.UniquenessCheckStateRef +import net.corda.v5.crypto.SecureHash +import java.sql.Connection +import java.sql.Timestamp +import java.time.Duration +import java.util.Calendar +import java.util.TimeZone + +// TODO - integration test these queries in isolation +class SqlTransactionOpsImpl( + private val connection: Connection, + private val sqlQueryProvider: SqlQueryProvider, + private val uniquenessSecureHashFactory: UniquenessSecureHashFactory, + private val backingStoreMetricsFactory: BackingStoreMetricsFactory, + private val holdingIdentity: UniquenessHoldingIdentity, +) : BackingStore.Session.TransactionOps { + companion object { + val tzUTC: Calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")) + } + override fun createUnconsumedStates(stateRefs: Collection) { + connection.prepareStatement(sqlQueryProvider.insertUnconsumedStatesQuery()).use { stmt -> + stateRefs.forEach { stateRef -> + stmt.setString(1, stateRef.txHash.algorithm) + stmt.setBytes(2, uniquenessSecureHashFactory.getBytes(stateRef.txHash)) + stmt.setInt(3, stateRef.stateIndex) + stmt.addBatch() + } + stmt.executeBatch() + } + } + + override fun consumeStates(consumingTxId: SecureHash, stateRefs: Collection) { + connection.prepareStatement(sqlQueryProvider.consumeStatesQuery()).use { stmt -> + stateRefs.forEach { stateRef -> + stmt.setString(1, consumingTxId.algorithm) + stmt.setBytes(2, consumingTxId.bytes) + stmt.setString(3, stateRef.txHash.algorithm) + stmt.setBytes(4, stateRef.txHash.bytes) + stmt.setInt(5, stateRef.stateIndex) + + // Not using batch insert so we can check each query has done an update. + // this replicates existing behaviour, but could be further optimised! + val updatedRowCount = stmt.executeUpdate() + if (updatedRowCount == 0) { + throw ConsumeStateFailedException( + "No states were consumed, this might be an in-flight double spend" + ) + } + } + } + } + + @Suppress("NestedBlockDepth") + override fun commitTransactions(transactionDetails: Collection>) { + val commitStartTime = System.nanoTime() + + connection.prepareStatement(sqlQueryProvider.insertTransactionDetailsQuery()).use { stmt -> + connection.prepareStatement(sqlQueryProvider.insertRejectedTransactionQuery()).use { rejectStmt -> + transactionDetails.forEach { (request, result) -> + stmt.setString(1, request.txId.algorithm) + stmt.setBytes(2, request.txId.bytes) + stmt.setString(3, request.originatorX500Name) + stmt.setTimestamp(4, Timestamp.from(request.timeWindowUpperBound), tzUTC) + stmt.setTimestamp(5, Timestamp.from(result.resultTimestamp), tzUTC) + stmt.setString(6, result.toCharacterRepresentation().toString()) + stmt.addBatch() + + if (result is UniquenessCheckResultFailure) { + val errorDetails = jpaBackingStoreObjectMapper(uniquenessSecureHashFactory).writeValueAsBytes(result.error) + // NOTE: this limitation is put in to replicate the existing behaviour, but this is un-necessary. + // The type of VARBINARY(1024) as set in Liquibase, does not exist in PostgeSQL, and instead a BYTEA is used + // which fits 1Gb of space. + if (errorDetails.size > REJECTED_TRANSACTION_ERROR_DETAILS_LENGTH) { + throw IllegalArgumentException( + "The maximum size of the error_details field is $REJECTED_TRANSACTION_ERROR_DETAILS_LENGTH" + ) + } + rejectStmt.setString(1, request.txId.algorithm) + rejectStmt.setBytes(2, request.txId.bytes) + rejectStmt.setBytes(3, errorDetails) + rejectStmt.addBatch() + } + } + stmt.executeBatch() + rejectStmt.executeBatch() + } + } + backingStoreMetricsFactory.recordDatabaseCommitTime( + Duration.ofNanos(System.nanoTime() - commitStartTime), + holdingIdentity + ) + } +} diff --git a/libs/ledger-lib-uniqueness/src/test/kotlin/net/corda/ledger/libs/uniqueness/SqlSessionImplTest.kt b/libs/ledger-lib-uniqueness/src/test/kotlin/net/corda/ledger/libs/uniqueness/SqlSessionImplTest.kt new file mode 100644 index 00000000000..633389d35a3 --- /dev/null +++ b/libs/ledger-lib-uniqueness/src/test/kotlin/net/corda/ledger/libs/uniqueness/SqlSessionImplTest.kt @@ -0,0 +1,104 @@ +package net.corda.ledger.libs.uniqueness + +import net.corda.ledger.libs.uniqueness.backingstore.impl.SqlSessionImpl +import net.corda.orm.PersistenceExceptionCategorizer +import net.corda.orm.PersistenceExceptionType +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.junit.jupiter.api.assertThrows +import org.mockito.Mockito.mock +import org.mockito.kotlin.any +import org.mockito.kotlin.whenever +import javax.persistence.OptimisticLockException + +class SqlSessionImplTest { + private companion object { + private const val MAX_ATTEMPTS = 10 + } + + val persistenceExceptionCategorizer = mock() + + val session = SqlSessionImpl( + mock(), + mock(), + mock(), + persistenceExceptionCategorizer, + mock(), + ) + + @Test + fun `Executing transaction retries upon data_related exception`() { + whenever(persistenceExceptionCategorizer.categorize(any())) + .thenReturn(PersistenceExceptionType.DATA_RELATED) + var execCounter = 0 + assertThrows { + session.executeTransaction { _, _ -> + execCounter++ + throw DummyException() + } + } + assertThat(execCounter).isEqualTo(MAX_ATTEMPTS) + } + + @Test + fun `Executing transaction retries upon transient exception`() { + whenever(persistenceExceptionCategorizer.categorize(any())) + .thenReturn(PersistenceExceptionType.TRANSIENT) + var execCounter = 0 + assertThrows { + session.executeTransaction { _, _ -> + execCounter++ + throw DummyException() + } + } + assertThat(execCounter).isEqualTo(MAX_ATTEMPTS) + } + + @Test + fun `Executing transaction does not retry upon uncategorized exception`() { + whenever(persistenceExceptionCategorizer.categorize(any())) + .thenReturn(PersistenceExceptionType.UNCATEGORIZED) + var execCounter = 0 + assertThrows { + session.executeTransaction { _, _ -> + execCounter++ + throw DummyException() + } + } + assertThat(execCounter).isEqualTo(1) + } + + @Test + fun `Executing transaction succeeds after transient failures`() { + whenever(persistenceExceptionCategorizer.categorize(any())) + .thenReturn(PersistenceExceptionType.TRANSIENT) + val retryCnt = 3 + var execCounter = 0 + assertDoesNotThrow { + session.executeTransaction { _, _ -> + execCounter++ + if (execCounter < retryCnt) { + throw OptimisticLockException() + } + } + } + assertThat(execCounter).isEqualTo(retryCnt) + } + + @Test + fun `Executing transaction does not retry upon fatal exception`() { + whenever(persistenceExceptionCategorizer.categorize(any())) + .thenReturn(PersistenceExceptionType.FATAL) + var execCounter = 0 + assertThrows { + session.executeTransaction { _, _ -> + execCounter++ + throw DummyException() + } + } + assertThat(execCounter).isEqualTo(1) + } + + class DummyException(message: String = "") : Exception(message) +} diff --git a/libs/uniqueness/common/src/main/kotlin/net/corda/uniqueness/datamodel/internal/UniquenessCheckRequestInternal.kt b/libs/uniqueness/common/src/main/kotlin/net/corda/uniqueness/datamodel/internal/UniquenessCheckRequestInternal.kt index 4824801f1ac..583dc2d4313 100644 --- a/libs/uniqueness/common/src/main/kotlin/net/corda/uniqueness/datamodel/internal/UniquenessCheckRequestInternal.kt +++ b/libs/uniqueness/common/src/main/kotlin/net/corda/uniqueness/datamodel/internal/UniquenessCheckRequestInternal.kt @@ -9,7 +9,7 @@ import java.time.Instant * backing store only. This simply wraps the external message bus request, converting data that * is represented as primitive types into the internal types used within the uniqueness checker. */ -data class UniquenessCheckRequestInternal constructor( +data class UniquenessCheckRequestInternal( val txId: SecureHash, val rawTxId: String, val originatorX500Name: String,