diff --git a/components/ledger/ledger-persistence/src/integrationTest/kotlin/net/corda/ledger/persistence/utxo/tests/UtxoPersistenceServiceImplTest.kt b/components/ledger/ledger-persistence/src/integrationTest/kotlin/net/corda/ledger/persistence/utxo/tests/UtxoPersistenceServiceImplTest.kt index 9c26cd6904b..9a8c8d3139f 100644 --- a/components/ledger/ledger-persistence/src/integrationTest/kotlin/net/corda/ledger/persistence/utxo/tests/UtxoPersistenceServiceImplTest.kt +++ b/components/ledger/ledger-persistence/src/integrationTest/kotlin/net/corda/ledger/persistence/utxo/tests/UtxoPersistenceServiceImplTest.kt @@ -77,6 +77,8 @@ import net.corda.v5.ledger.utxo.observer.UtxoTokenPoolKey import net.corda.v5.ledger.utxo.transaction.UtxoLedgerTransaction import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy +import org.hibernate.Session +import org.hibernate.internal.SessionImpl import org.junit.jupiter.api.AfterAll import org.junit.jupiter.api.Assertions.assertNotNull import org.junit.jupiter.api.BeforeAll @@ -98,12 +100,14 @@ import java.security.KeyPairGenerator import java.security.MessageDigest import java.security.PublicKey import java.security.spec.ECGenParameterSpec +import java.sql.Connection import java.time.Duration import java.time.Instant import java.time.temporal.ChronoUnit import java.util.Random import java.util.UUID import java.util.concurrent.atomic.AtomicInteger +import javax.persistence.EntityManager import javax.persistence.EntityManagerFactory @ExtendWith(ServiceExtension::class, BundleContextExtension::class) @@ -180,7 +184,7 @@ class UtxoPersistenceServiceImplTest { filteredTransactionFactory = ctx.getSandboxSingletonService() persistenceService = UtxoPersistenceServiceImpl( - entityManagerFactory, + { getConnection(entityManagerFactory.createEntityManager()) }, repository, serializationService, digestService, @@ -293,13 +297,17 @@ class UtxoPersistenceServiceImplTest { assertThat(retval).isEqualTo(expectedRetval) } + private fun getConnection(em: EntityManager): Connection { + return (em.unwrap(Session::class.java) as SessionImpl).connection() + } + @Test fun `find unconsumed visible transaction states`() { val entityFactory = UtxoEntityFactory(entityManagerFactory) val transaction1 = createSignedTransaction() val transaction2 = createSignedTransaction() - entityManagerFactory.transaction { em -> - + entityManagerFactory.createEntityManager().transaction { em -> + val conn = getConnection(em) em.createNativeQuery("DELETE FROM {h-schema}utxo_visible_transaction_output").executeUpdate() createTransactionEntity(entityFactory, transaction1, status = VERIFIED).also { em.persist(it) } @@ -334,9 +342,9 @@ class UtxoPersistenceServiceImplTest { ) ) - repository.persistVisibleTransactionOutputs(em, transaction1.id.toString(), Instant.now(), outputs) - repository.persistVisibleTransactionOutputs(em, transaction2.id.toString(), Instant.now(), outputs2) - repository.markTransactionVisibleStatesConsumed(em, listOf(StateRef(transaction2.id, 1)), Instant.now()) + repository.persistVisibleTransactionOutputs(conn, transaction1.id.toString(), Instant.now(), outputs) + repository.persistVisibleTransactionOutputs(conn, transaction2.id.toString(), Instant.now(), outputs2) + repository.markTransactionVisibleStatesConsumed(conn, listOf(StateRef(transaction2.id, 1)), Instant.now()) } val stateClass = TestContractState2::class.java @@ -1302,7 +1310,11 @@ class UtxoPersistenceServiceImplTest { val visibleOutputIndexes = listOf(0) // prove that the output of filtered tx with index 0 is used as an input - val indexes = repository.findConsumedTransactionSourcesForTransaction(em, transactionIdString, visibleOutputIndexes) + val indexes = repository.findConsumedTransactionSourcesForTransaction( + getConnection(em), + transactionIdString, + visibleOutputIndexes + ) assertThat(indexes).contains(0) } @@ -1404,7 +1416,7 @@ class UtxoPersistenceServiceImplTest { val visibleOutputIndexes = listOf(0, 1) // prove that the output of unverified tx is not consumed in other tx - val indexes = repository.findConsumedTransactionSourcesForTransaction(em, txAId, visibleOutputIndexes) + val indexes = repository.findConsumedTransactionSourcesForTransaction(getConnection(em), txAId, visibleOutputIndexes) assertThat(indexes).isEmpty() } @@ -1435,7 +1447,11 @@ class UtxoPersistenceServiceImplTest { val visibleOutputIndexes = listOf(0) // prove that the output of filtered tx with index 0 is used as an input - val indexes = repository.findConsumedTransactionSourcesForTransaction(em, transactionIdString, visibleOutputIndexes) + val indexes = repository.findConsumedTransactionSourcesForTransaction( + getConnection(em), + transactionIdString, + visibleOutputIndexes + ) assertThat(indexes).contains(0) } diff --git a/components/ledger/ledger-persistence/src/main/kotlin/net/corda/ledger/persistence/utxo/impl/UtxoRequestHandlerSelectorImpl.kt b/components/ledger/ledger-persistence/src/main/kotlin/net/corda/ledger/persistence/utxo/impl/UtxoRequestHandlerSelectorImpl.kt index 8dd8aaf97a2..b1502d02f84 100644 --- a/components/ledger/ledger-persistence/src/main/kotlin/net/corda/ledger/persistence/utxo/impl/UtxoRequestHandlerSelectorImpl.kt +++ b/components/ledger/ledger-persistence/src/main/kotlin/net/corda/ledger/persistence/utxo/impl/UtxoRequestHandlerSelectorImpl.kt @@ -47,6 +47,8 @@ import net.corda.persistence.common.getSerializationService import net.corda.sandboxgroupcontext.SandboxGroupContext import net.corda.sandboxgroupcontext.getSandboxSingletonService import net.corda.utilities.time.UTCClock +import org.hibernate.Session +import org.hibernate.internal.SessionImpl import org.osgi.service.component.annotations.Activate import org.osgi.service.component.annotations.Component import org.osgi.service.component.annotations.Reference @@ -63,7 +65,10 @@ class UtxoRequestHandlerSelectorImpl @Activate constructor( @Suppress("LongMethod") override fun selectHandler(sandbox: SandboxGroupContext, request: LedgerPersistenceRequest): RequestHandler { val persistenceService = UtxoPersistenceServiceImpl( - entityManagerFactory = sandbox.getEntityManagerFactory(), + connectionFactory = { + val emf = sandbox.getEntityManagerFactory() + (emf.createEntityManager().unwrap(Session::class.java) as SessionImpl).connection() + }, repository = sandbox.getSandboxSingletonService(), serializationService = sandbox.getSerializationService(), sandboxDigestService = sandbox.getSandboxSingletonService(), diff --git a/components/ledger/ledger-persistence/src/test/kotlin/net/corda/ledger/persistence/utxo/UtxoComponentGroupMapperTest.kt b/components/ledger/ledger-persistence/src/test/kotlin/net/corda/ledger/persistence/utxo/UtxoComponentGroupMapperTest.kt index 7af2e7441a6..908b7bcf2c7 100644 --- a/components/ledger/ledger-persistence/src/test/kotlin/net/corda/ledger/persistence/utxo/UtxoComponentGroupMapperTest.kt +++ b/components/ledger/ledger-persistence/src/test/kotlin/net/corda/ledger/persistence/utxo/UtxoComponentGroupMapperTest.kt @@ -19,6 +19,7 @@ class UtxoComponentGroupMapperTest { private fun mockTuple(values: List) = mock().apply { whenever(this.get(anyInt())).thenAnswer { invocation -> values[invocation.arguments[0] as Int] } + whenever(this.toArray()).thenAnswer { values.toTypedArray() } } @Test diff --git a/components/ledger/ledger-persistence/src/test/kotlin/net/corda/ledger/persistence/utxo/impl/UtxoPersistenceServiceImplTest.kt b/components/ledger/ledger-persistence/src/test/kotlin/net/corda/ledger/persistence/utxo/impl/UtxoPersistenceServiceImplTest.kt index d7f1413cc6f..ae0fa0b2e53 100644 --- a/components/ledger/ledger-persistence/src/test/kotlin/net/corda/ledger/persistence/utxo/impl/UtxoPersistenceServiceImplTest.kt +++ b/components/ledger/ledger-persistence/src/test/kotlin/net/corda/ledger/persistence/utxo/impl/UtxoPersistenceServiceImplTest.kt @@ -28,6 +28,8 @@ import net.corda.v5.ledger.utxo.StateRef import net.corda.v5.ledger.utxo.TransactionState import net.corda.v5.ledger.utxo.query.json.ContractStateVaultJsonFactory import org.assertj.core.api.Assertions.assertThat +import org.hibernate.Session +import org.hibernate.internal.SessionImpl import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test @@ -37,8 +39,8 @@ import org.mockito.kotlin.doAnswer import org.mockito.kotlin.doReturn import org.mockito.kotlin.mock import org.mockito.kotlin.whenever -import java.lang.IllegalArgumentException import java.security.PublicKey +import java.sql.Connection import javax.persistence.EntityManager import javax.persistence.EntityManagerFactory @@ -73,8 +75,16 @@ class UtxoPersistenceServiceImplTest { registerJsonFactory(InvalidStateJsonFactory() as ContractStateVaultJsonFactory) } + private val connectionMock = mock { + } + + private val mockSession = mock { + on { connection() } doReturn mock() + } + private val mockEm = mock { on { transaction } doReturn mock() + on { unwrap(Session::class.java) } doReturn mockSession } private val mockEmFactory = mock { @@ -82,7 +92,7 @@ class UtxoPersistenceServiceImplTest { } private val persistenceService = UtxoPersistenceServiceImpl( - mockEmFactory, + { connectionMock }, mockRepository, mock(), mockDigestService, @@ -140,7 +150,7 @@ class UtxoPersistenceServiceImplTest { whenever(emptyDefaultContractStateVaultJsonFactory.create(any(), any())).thenReturn("") val singlePersistenceService = UtxoPersistenceServiceImpl( - mockEmFactory, + { connectionMock }, mockRepository, mock(), mockDigestService, @@ -236,7 +246,7 @@ class UtxoPersistenceServiceImplTest { @Test fun `Persisting a transaction while zero JSON factories are registered will result still store the default state json`() { val emptyPersistenceService = UtxoPersistenceServiceImpl( - mockEmFactory, + { connectionMock }, mockRepository, mock(), mockDigestService, @@ -281,7 +291,7 @@ class UtxoPersistenceServiceImplTest { } val persistenceService = UtxoPersistenceServiceImpl( - mockEmFactory, + { connectionMock }, mockRepository, mock(), mockDigestService, diff --git a/components/ledger/ledger-utxo-flow/src/main/kotlin/net/corda/ledger/utxo/flow/impl/flows/backchain/TransactionBackchainVerifierImpl.kt b/components/ledger/ledger-utxo-flow/src/main/kotlin/net/corda/ledger/utxo/flow/impl/flows/backchain/TransactionBackchainVerifierImpl.kt index da7cd4005a0..4d99e1b9f95 100644 --- a/components/ledger/ledger-utxo-flow/src/main/kotlin/net/corda/ledger/utxo/flow/impl/flows/backchain/TransactionBackchainVerifierImpl.kt +++ b/components/ledger/ledger-utxo-flow/src/main/kotlin/net/corda/ledger/utxo/flow/impl/flows/backchain/TransactionBackchainVerifierImpl.kt @@ -46,7 +46,7 @@ class TransactionBackchainVerifierImpl @Activate constructor( val (transaction, status) = utxoLedgerPersistenceService.findSignedLedgerTransactionWithStatus( transactionId, UNVERIFIED - ) ?: throw CordaRuntimeException("Transaction does not exist locally") + ) ?: throw CordaRuntimeException("Transaction ($UNVERIFIED) with id: $transactionId does not exist locally") when (status) { INVALID -> { log.warn( diff --git a/libs/db/db-core/src/main/kotlin/net/corda/db/core/utils/ConnectionUtils.kt b/libs/db/db-core/src/main/kotlin/net/corda/db/core/utils/ConnectionUtils.kt index 6a2d5248115..b354e8d759e 100644 --- a/libs/db/db-core/src/main/kotlin/net/corda/db/core/utils/ConnectionUtils.kt +++ b/libs/db/db-core/src/main/kotlin/net/corda/db/core/utils/ConnectionUtils.kt @@ -1,11 +1,14 @@ package net.corda.db.core.utils +import org.slf4j.LoggerFactory import java.sql.Connection +private val log = LoggerFactory.getLogger("ConnectionUtils") + /** * Executes [block] in a transaction using the [Connection]. * - * Commits transaction if no exceptions were thrown by [block]. Otherwise rolls back the transaction. + * Commits transaction if no exceptions were thrown by [block]. Otherwise, rolls back the transaction. * * Finally closes the connection after committing or rolling back the changes. * @@ -14,12 +17,21 @@ import java.sql.Connection * * @return The result of executing [block]. */ -inline fun Connection.transaction(block: (Connection) -> R): R { +fun Connection.transaction(block: (Connection) -> R): R { + return transactionWithLogging(null, block) +} + +fun Connection.transactionWithLogging(name: String?, block: (Connection) -> R): R { + if(null != name && log.isTraceEnabled) log.trace("Start transaction $name") autoCommit = false return try { - block(this).also { commit() } + block(this).also { + commit() + if(null != name && log.isTraceEnabled) log.trace("Transaction $name committed") + } } catch (e: Exception) { rollback() + if(null != name && log.isWarnEnabled) log.error("Transaction $name rolled back") throw e } finally { close() diff --git a/libs/ledger-lib-persistence/build.gradle b/libs/ledger-lib-persistence/build.gradle index 2bef539e35a..1028c3f5f43 100644 --- a/libs/ledger-lib-persistence/build.gradle +++ b/libs/ledger-lib-persistence/build.gradle @@ -17,7 +17,6 @@ dependencies { implementation project(':libs:ledger:ledger-common-data') implementation project(':libs:ledger:ledger-utxo-data') implementation project(":libs:db:db-core") - implementation project(":libs:db:db-orm") implementation project(":libs:utilities") implementation project(":libs:serialization:json-validator-lib") @@ -26,7 +25,6 @@ dependencies { implementation 'net.corda:corda-ledger-common' implementation 'net.corda:corda-ledger-utxo' implementation libs.slf4j.api - implementation "org.hibernate:hibernate-core:$hibernateVersion" implementation 'org.jetbrains.kotlin:kotlin-stdlib' implementation libs.jackson.core } \ No newline at end of file diff --git a/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/common/ComponentGroupMapper.kt b/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/common/ComponentGroupMapper.kt index 6c4ed50c9cb..050366a36d1 100644 --- a/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/common/ComponentGroupMapper.kt +++ b/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/common/ComponentGroupMapper.kt @@ -8,4 +8,10 @@ interface ComponentGroupMapper { fun map(tuples: List): Map> } +interface ComponentGroupArrayMapper { + @Throws(SQLException::class) + fun mapColumns(tuples: List>): Map> +} + fun List.mapToComponentGroups(mapper: ComponentGroupMapper): Map> = mapper.map(this) +fun List>.mapToComponentGroups(mapper: ComponentGroupArrayMapper): Map> = mapper.mapColumns(this) diff --git a/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/util/NamedParamQuery.kt b/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/util/NamedParamQuery.kt new file mode 100644 index 00000000000..1ea68d6185b --- /dev/null +++ b/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/util/NamedParamQuery.kt @@ -0,0 +1,48 @@ +package net.corda.ledger.libs.persistence.util + +interface NamedParamQuery { + companion object { + fun from(sql: String): NamedParamQuery { + val plainSql = StringBuilder() + val fields = mutableMapOf() + var marker: StringBuilder? = null + var markerIndex = 0 + for (i in sql.indices) { + val c = sql[i] + if (c == ':' && (i < sql.length - 1 && sql[i + 1].isValidTokenChar())) { + marker = StringBuilder() + markerIndex++ + plainSql.append('?') + continue + } + + if (null != marker) { + if (!c.isValidTokenChar()) { + fields[marker.toString()] = markerIndex + marker = null + } else { + marker.append(c) + continue + } + } + plainSql.append(c) + } + + if (null != marker) { + fields[marker.toString()] = markerIndex + } + + return NamedParamQueryImpl(plainSql.toString(), fields) + } + + private fun Char.isValidTokenChar(): Boolean = this.isLetterOrDigit() || this == '_' || this == '-' + } + + val sql: String + val fields: Map + + private class NamedParamQueryImpl( + override val sql: String, + override val fields: Map, + ) : NamedParamQuery +} diff --git a/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/util/NamedParamStatement.kt b/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/util/NamedParamStatement.kt new file mode 100644 index 00000000000..f180ec3b213 --- /dev/null +++ b/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/util/NamedParamStatement.kt @@ -0,0 +1,113 @@ +package net.corda.ledger.libs.persistence.util + +import net.corda.crypto.core.InvalidParamsException +import net.corda.ledger.libs.persistence.utxo.impl.UtxoPersistenceServiceImpl +import net.corda.utilities.debug +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import java.sql.Connection +import java.sql.PreparedStatement +import java.sql.ResultSet +import java.sql.Timestamp +import java.time.Instant +import java.util.Calendar +import java.util.TimeZone + +class NamedParamStatement( + private val namedParamQuery: NamedParamQuery, + private val conn: Connection, +) : AutoCloseable { + private companion object { + val tzUTC: Calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")) + val log: Logger = LoggerFactory.getLogger(UtxoPersistenceServiceImpl::class.java) + } + private val statement: PreparedStatement = conn.prepareStatement(namedParamQuery.sql) + + override fun close() { + statement.close() + } + + fun executeQuery(): ResultSet { + log.debug { "Execute Query: $statement" } + return statement.executeQuery() + } + + fun executeQueryAsList(mapper: (ResultSet) -> T): List { + val result = mutableListOf() + executeQuery().use { rs -> + while (rs.next()) { + result.add(mapper(rs)) + } + } + return result + } + + fun executeQueryAsListOfColumns(): List> { + val result = mutableListOf>() + executeQuery().use { rs -> + val colCount = rs.metaData.columnCount + while (rs.next()) { + val cols = arrayOfNulls(colCount) + for (i in 0 until colCount) { + cols[i] = rs.getObject(i + 1) + } + result.add(cols) + } + } + return result + } + + fun executeUpdate(): Int { + log.debug { "Execute Update: $statement" } + + return statement.executeUpdate() + } + + fun setInt(name: String, value: Int) { + statement.setInt( + getFieldIndex(name), + value + ) + } + + fun setStrings(name: String, values: List) { + val stringsArray = conn.createArrayOf("varchar", values.toTypedArray()) + statement.setArray( + getFieldIndex(name), + stringsArray + ) + } + + fun setInts(name: String, values: List) { + val intsArray = conn.createArrayOf("int", values.toTypedArray()) + statement.setArray( + getFieldIndex(name), + intsArray + ) + } + + fun setString(name: String, value: String) { + statement.setString( + getFieldIndex(name), + value + ) + } + + fun setInstant(name: String, value: Instant) { + statement.setTimestamp( + getFieldIndex(name), + Timestamp.from(value), + tzUTC + ) + } + + fun setBytes(name: String, value: ByteArray) { + statement.setBytes( + getFieldIndex(name), + value + ) + } + + private fun getFieldIndex(name: String) = + namedParamQuery.fields[name] ?: throw InvalidParamsException("Field $name not found") +} diff --git a/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/utxo/UtxoRepository.kt b/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/utxo/UtxoRepository.kt index 1f7dfc93928..bbfcfd519fd 100644 --- a/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/utxo/UtxoRepository.kt +++ b/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/utxo/UtxoRepository.kt @@ -10,56 +10,56 @@ import net.corda.v5.application.crypto.DigitalSignatureAndMetadata import net.corda.v5.crypto.SecureHash import net.corda.v5.ledger.utxo.StateRef import net.corda.v5.ledger.utxo.observer.UtxoToken +import java.sql.Connection import java.time.Instant -import javax.persistence.EntityManager @Suppress("TooManyFunctions") interface UtxoRepository { /** Retrieves transaction IDs and their statuses if its ID is included in the [transactionIds] list. */ fun findSignedTransactionIdsAndStatuses( - entityManager: EntityManager, + connection: Connection, transactionIds: List ): Map /** Retrieves transaction by [id] */ fun findTransaction( - entityManager: EntityManager, + connection: Connection, id: String ): SignedTransactionContainer? /** Retrieves transaction component leafs except metadata which is stored separately */ fun findTransactionComponentLeafs( - entityManager: EntityManager, + connection: Connection, transactionId: String ): Map> /** Retrieves transaction component leaves related to visible unspent states and subclass states.*/ fun findUnconsumedVisibleStatesByType( - entityManager: EntityManager + connection: Connection ): List /** Retrieves transaction component leafs related to specific StateRefs */ fun resolveStateRefs( - entityManager: EntityManager, + connection: Connection, stateRefs: List ): List /** Retrieves transaction signatures */ fun findTransactionSignatures( - entityManager: EntityManager, + connection: Connection, transactionId: String ): List /** Retrieves a transaction's status */ fun findSignedTransactionStatus( - entityManager: EntityManager, + connection: Connection, id: String, ): String? /** Marks visible states of transactions consumed */ fun markTransactionVisibleStatesConsumed( - entityManager: EntityManager, + connection: Connection, stateRefs: List, timestamp: Instant ) @@ -67,7 +67,7 @@ interface UtxoRepository { /** Persists transaction (operation is idempotent) */ @Suppress("LongParameterList") fun persistTransaction( - entityManager: EntityManager, + connection: Connection, id: String, privacySalt: ByteArray, account: String, @@ -79,7 +79,7 @@ interface UtxoRepository { /** Persists unverified transaction (operation is idempotent) */ @Suppress("LongParameterList") fun persistUnverifiedTransaction( - entityManager: EntityManager, + connection: Connection, id: String, privacySalt: ByteArray, account: String, @@ -90,17 +90,21 @@ interface UtxoRepository { /** Persists unverified transaction (operation is idempotent) */ @Suppress("LongParameterList") fun persistFilteredTransactions( - entityManager: EntityManager, + connection: Connection, filteredTransactions: List, timestamp: Instant, ) /** Updates an existing verified transaction */ - fun updateTransactionToVerified(entityManager: EntityManager, id: String, timestamp: Instant) + fun updateTransactionToVerified( + connection: Connection, + id: String, + timestamp: Instant + ) /** Persists transaction metadata (operation is idempotent) */ fun persistTransactionMetadata( - entityManager: EntityManager, + connection: Connection, hash: String, metadataBytes: ByteArray, groupParametersHash: String, @@ -110,7 +114,7 @@ interface UtxoRepository { /** Persists transaction source (operation is idempotent) */ @Suppress("LongParameterList") fun persistTransactionSources( - entityManager: EntityManager, + connection: Connection, transactionId: String, transactionSources: List ) @@ -123,21 +127,21 @@ interface UtxoRepository { ) fun persistTransactionComponents( - entityManager: EntityManager, + connection: Connection, transactionId: String, components: List>, hash: (ByteArray) -> String ) fun persistTransactionComponents( - entityManager: EntityManager, + connection: Connection, components: List, hash: (ByteArray) -> String ) /** Persists transaction output (operation is idempotent) */ fun persistVisibleTransactionOutputs( - entityManager: EntityManager, + connection: Connection, transactionId: String, timestamp: Instant, visibleTransactionOutputs: List @@ -145,7 +149,7 @@ interface UtxoRepository { /** Persists transaction [signature] (operation is idempotent) */ fun persistTransactionSignatures( - entityManager: EntityManager, + connection: Connection, signatures: List, timestamp: Instant ) @@ -158,7 +162,7 @@ interface UtxoRepository { * - INVALID -> INVALID */ fun updateTransactionStatus( - entityManager: EntityManager, + connection: Connection, transactionId: String, transactionStatus: TransactionStatus, timestamp: Instant @@ -166,52 +170,56 @@ interface UtxoRepository { /** Retrieves signed group parameters */ fun findSignedGroupParameters( - entityManager: EntityManager, + connection: Connection, hash: String ): SignedGroupParameters? /** Persists signed group parameters */ fun persistSignedGroupParameters( - entityManager: EntityManager, + connection: Connection, hash: String, signedGroupParameters: SignedGroupParameters, timestamp: Instant ) /** Persists a merkle proof and returns its ID */ - fun persistMerkleProofs(entityManager: EntityManager, merkleProofs: List) + fun persistMerkleProofs(connection: Connection, merkleProofs: List) /** Persist a leaf index that belongs to a given merkle proof with ID [merkleProofId] */ fun persistMerkleProofLeaves( - entityManager: EntityManager, + connection: Connection, leaves: List ) /** Find all the merkle proofs for a given list of transaction IDs */ fun findMerkleProofs( - entityManager: EntityManager, + connection: Connection, transactionIds: List ): Map> /** Find filtered transactions with the given [ids] */ fun findFilteredTransactions( - entityManager: EntityManager, + connection: Connection, ids: List ): Map - fun findConsumedTransactionSourcesForTransaction(entityManager: EntityManager, transactionId: String, indexes: List): List + fun findConsumedTransactionSourcesForTransaction( + connection: Connection, + transactionId: String, + indexes: List + ): List fun findTransactionsWithStatusCreatedBetweenTime( - entityManager: EntityManager, + connection: Connection, status: TransactionStatus, from: Instant, until: Instant, limit: Int, ): List - fun incrementTransactionRepairAttemptCount(entityManager: EntityManager, id: String) + fun incrementTransactionRepairAttemptCount(connection: Connection, id: String) - fun stateRefsExist(entityManager: EntityManager, stateRefs: List): List> + fun stateRefsExist(connection: Connection, stateRefs: List): List> data class TransactionComponent(val transactionId: String, val groupIndex: Int, val leafIndex: Int, val leafData: ByteArray) diff --git a/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/utxo/impl/AbstractUtxoQueryProvider.kt b/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/utxo/impl/AbstractUtxoQueryProvider.kt index bb1d65153f1..98b6e4deaa0 100644 --- a/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/utxo/impl/AbstractUtxoQueryProvider.kt +++ b/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/utxo/impl/AbstractUtxoQueryProvider.kt @@ -15,11 +15,13 @@ abstract class AbstractUtxoQueryProvider : UtxoQueryProvider { val VERIFIED = TransactionStatus.VERIFIED.value } + abstract fun wrapInList(placeHolder: String): String + override val findSignedTransactionIdsAndStatuses: String get() = """ SELECT id, status - FROM {h-schema}utxo_transaction - WHERE id IN (:transactionIds) + FROM utxo_transaction + WHERE id ${wrapInList(":transactionIds")} AND NOT (status = 'V' AND is_filtered = true) """.trimIndent() @@ -29,16 +31,16 @@ abstract class AbstractUtxoQueryProvider : UtxoQueryProvider { id, privacy_salt, utm.canonical_data - FROM {h-schema}utxo_transaction AS ut - JOIN {h-schema}utxo_transaction_metadata AS utm + FROM utxo_transaction AS ut + JOIN utxo_transaction_metadata AS utm ON ut.metadata_hash = utm.hash - WHERE id IN (:transactionIds)""" + WHERE id ${wrapInList(":transactionIds")}""" .trimIndent() override val findTransactionComponentLeafs: String get() = """ SELECT group_idx, leaf_idx, data - FROM {h-schema}utxo_transaction_component + FROM utxo_transaction_component WHERE transaction_id = :transactionId ORDER BY group_idx, leaf_idx""" .trimIndent() @@ -49,16 +51,16 @@ abstract class AbstractUtxoQueryProvider : UtxoQueryProvider { tc_output.leaf_idx, tc_output_info.data as output_info_data, tc_output.data AS output_data - FROM {h-schema}utxo_visible_transaction_output AS vto - JOIN {h-schema}utxo_transaction_component AS tc_output_info + FROM utxo_visible_transaction_output AS vto + JOIN utxo_transaction_component AS tc_output_info ON tc_output_info.transaction_id = vto.transaction_id AND tc_output_info.leaf_idx = vto.leaf_idx AND tc_output_info.group_idx = ${UtxoComponentGroup.OUTPUTS_INFO.ordinal} - JOIN {h-schema}utxo_transaction_component AS tc_output + JOIN utxo_transaction_component AS tc_output ON tc_output.transaction_id = tc_output_info.transaction_id AND tc_output.leaf_idx = tc_output_info.leaf_idx AND tc_output.group_idx = ${UtxoComponentGroup.OUTPUTS.ordinal} - JOIN {h-schema}utxo_transaction AS tx + JOIN utxo_transaction AS tx ON tx.id = tc_output.transaction_id AND vto.consumed IS NULL AND tx.status = :verified @@ -68,24 +70,24 @@ abstract class AbstractUtxoQueryProvider : UtxoQueryProvider { override val findTransactionSignatures: String get() = """ SELECT signature - FROM {h-schema}utxo_transaction_signature + FROM utxo_transaction_signature WHERE transaction_id = :transactionId""" .trimIndent() override val findSignedTransactionStatus: String get() = """ SELECT status - FROM {h-schema}utxo_transaction + FROM utxo_transaction WHERE id = :transactionId AND NOT (status = 'V' AND is_filtered = true)""" .trimIndent() override val markTransactionVisibleStatesConsumed: String get() = """ - UPDATE {h-schema}utxo_visible_transaction_output + UPDATE utxo_visible_transaction_output SET consumed = :consumed - WHERE transaction_id in (:transactionIds) - AND (transaction_id || ':' || leaf_idx) IN (:stateRefs)""" + WHERE transaction_id ${wrapInList(":transactionIds")} + AND (transaction_id || ':' || leaf_idx) ${wrapInList(":stateRefs")}""" .trimIndent() override val findSignedGroupParameters: String @@ -95,7 +97,7 @@ abstract class AbstractUtxoQueryProvider : UtxoQueryProvider { signature_public_key, signature_content, signature_spec - FROM {h-schema}utxo_group_parameters + FROM utxo_group_parameters WHERE hash = :hash""" .trimIndent() @@ -105,15 +107,15 @@ abstract class AbstractUtxoQueryProvider : UtxoQueryProvider { tc_output.leaf_idx, tc_output_info.data as output_info_data, tc_output.data AS output_data - FROM {h-schema}utxo_transaction_component AS tc_output_info - JOIN {h-schema}utxo_transaction_component AS tc_output + FROM utxo_transaction_component AS tc_output_info + JOIN utxo_transaction_component AS tc_output ON tc_output.transaction_id = tc_output_info.transaction_id AND tc_output.leaf_idx = tc_output_info.leaf_idx AND tc_output.group_idx = ${UtxoComponentGroup.OUTPUTS.ordinal} - JOIN {h-schema}utxo_transaction AS tx + JOIN utxo_transaction AS tx ON tx.id = tc_output.transaction_id - AND tc_output.transaction_id in (:transactionIds) - AND (tc_output.transaction_id||':'|| tc_output.leaf_idx) in (:stateRefs) + AND tc_output.transaction_id ${wrapInList(":transactionIds")} + AND (tc_output.transaction_id||':'|| tc_output.leaf_idx) ${wrapInList(":stateRefs")} AND tx.status = :verified AND tc_output_info.group_idx = ${UtxoComponentGroup.OUTPUTS_INFO.ordinal} ORDER BY tx.created, tc_output.transaction_id, tc_output.leaf_idx""" @@ -121,9 +123,9 @@ abstract class AbstractUtxoQueryProvider : UtxoQueryProvider { override val updateTransactionStatus: String get() = """ - UPDATE {h-schema}utxo_transaction SET status = :newStatus, updated = :updatedAt + UPDATE utxo_transaction SET status = :newStatus, updated = :updatedAt WHERE id = :transactionId - AND (status = :newStatus OR status = '$UNVERIFIED')""" + AND (status = :existingStatus OR status = '$UNVERIFIED')""" .trimIndent() /** @@ -192,20 +194,20 @@ abstract class AbstractUtxoQueryProvider : UtxoQueryProvider { FROM utxo_transaction_merkle_proof_leaves utmpl WHERE utmpl.merkle_proof_id = utmp.merkle_proof_id ) - WHERE utmp.transaction_id IN (:transactionIds)""" + WHERE utmp.transaction_id ${wrapInList(":transactionIds")}""" .trimIndent() override val findConsumedTransactionSourcesForTransaction: String get() = """ - SELECT utxo_transaction_sources.source_state_idx from {h-schema}utxo_transaction_sources + SELECT utxo_transaction_sources.source_state_idx from utxo_transaction_sources WHERE source_state_transaction_id = :transactionId AND group_idx = ${UtxoComponentGroup.INPUTS.ordinal} - AND source_state_idx in :inputStateIndexes + AND source_state_idx ${wrapInList(":inputStateIndexes")} """.trimIndent() override val updateTransactionToVerified: String get() = """ - UPDATE {h-schema}utxo_transaction + UPDATE utxo_transaction SET status = '$VERIFIED', updated = :updatedAt, is_filtered = FALSE WHERE id = :transactionId AND status in ('$UNVERIFIED', '$DRAFT') @@ -215,16 +217,17 @@ abstract class AbstractUtxoQueryProvider : UtxoQueryProvider { override val findTransactionsWithStatusCreatedBetweenTime: String get() = """ SELECT id - FROM {h-schema}utxo_transaction + FROM utxo_transaction WHERE status = :status AND created >= :from AND created < :until ORDER BY repair_attempt_count ASC, created ASC + LIMIT :limit """.trimIndent() override val incrementRepairAttemptCount: String get() = """ - UPDATE {h-schema}utxo_transaction + UPDATE utxo_transaction SET repair_attempt_count = repair_attempt_count + 1 WHERE id = :transactionId """.trimIndent() diff --git a/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/utxo/impl/PostgresUtxoQueryProvider.kt b/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/utxo/impl/PostgresUtxoQueryProvider.kt index 64352c51c0d..c125560df70 100644 --- a/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/utxo/impl/PostgresUtxoQueryProvider.kt +++ b/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/utxo/impl/PostgresUtxoQueryProvider.kt @@ -3,9 +3,13 @@ package net.corda.ledger.libs.persistence.utxo.impl import net.corda.ledger.utxo.data.transaction.UtxoComponentGroup class PostgresUtxoQueryProvider : AbstractUtxoQueryProvider() { + override fun wrapInList(placeHolder: String): String { + return "= ANY($placeHolder)" + } + override val persistTransaction: String get() = """ - INSERT INTO {h-schema}utxo_transaction(id, privacy_salt, account_id, created, status, updated, metadata_hash, is_filtered, repair_attempt_count) + INSERT INTO utxo_transaction(id, privacy_salt, account_id, created, status, updated, metadata_hash, is_filtered, repair_attempt_count) VALUES (:id, :privacySalt, :accountId, :createdAt, :status, :updatedAt, :metadataHash, FALSE, 0) ON CONFLICT(id) DO UPDATE SET status = EXCLUDED.status, updated = EXCLUDED.updated, is_filtered = FALSE @@ -15,7 +19,7 @@ class PostgresUtxoQueryProvider : AbstractUtxoQueryProvider() { override val persistUnverifiedTransaction: String get() = """ - INSERT INTO {h-schema}utxo_transaction(id, privacy_salt, account_id, created, status, updated, metadata_hash, is_filtered, repair_attempt_count) + INSERT INTO utxo_transaction(id, privacy_salt, account_id, created, status, updated, metadata_hash, is_filtered, repair_attempt_count) VALUES (:id, :privacySalt, :accountId, :createdAt, '$UNVERIFIED', :updatedAt, :metadataHash, FALSE, 0) ON CONFLICT(id) DO UPDATE SET status = EXCLUDED.status, updated = EXCLUDED.updated @@ -37,7 +41,7 @@ class PostgresUtxoQueryProvider : AbstractUtxoQueryProvider() { override val persistTransactionMetadata: String get() = """ - INSERT INTO {h-schema}utxo_transaction_metadata(hash, canonical_data, group_parameters_hash, cpi_file_checksum) + INSERT INTO utxo_transaction_metadata(hash, canonical_data, group_parameters_hash, cpi_file_checksum) VALUES (:hash, :canonicalData, :groupParametersHash, :cpiFileChecksum) ON CONFLICT DO NOTHING""" .trimIndent() @@ -83,7 +87,7 @@ class PostgresUtxoQueryProvider : AbstractUtxoQueryProvider() { override val persistSignedGroupParameters: String get() = """ - INSERT INTO {h-schema}utxo_group_parameters( + INSERT INTO utxo_group_parameters( hash, parameters, signature_public_key, signature_content, signature_spec, created) VALUES ( :hash, :parameters, :signature_public_key, :signature_content, :signature_spec, :createdAt) diff --git a/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/utxo/impl/UtxoComponentGroupMapper.kt b/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/utxo/impl/UtxoComponentGroupMapper.kt index 8d40326d983..e3c842a374d 100644 --- a/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/utxo/impl/UtxoComponentGroupMapper.kt +++ b/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/utxo/impl/UtxoComponentGroupMapper.kt @@ -1,13 +1,18 @@ package net.corda.ledger.libs.persistence.utxo.impl +import net.corda.ledger.libs.persistence.common.ComponentGroupArrayMapper import net.corda.ledger.libs.persistence.common.ComponentGroupMapper import javax.persistence.Tuple /** * Used by [UtxoRepositoryImpl.findTransactionSignatures] to map DB rows to transaction's components group lists */ -class UtxoComponentGroupMapper(private val transactionId: String) : ComponentGroupMapper { +class UtxoComponentGroupMapper(private val transactionId: String) : ComponentGroupMapper, ComponentGroupArrayMapper { override fun map(tuples: List): Map> { + return mapColumns(tuples.map { it.toArray() }) + } + + override fun mapColumns(tuples: List>): Map> { val componentGroupLists: MutableMap> = mutableMapOf() tuples.forEach { columns -> val groupIdx = (columns[0] as Number).toInt() diff --git a/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/utxo/impl/UtxoPersistenceServiceImpl.kt b/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/utxo/impl/UtxoPersistenceServiceImpl.kt index 567835159b1..3d57fe8bbcf 100644 --- a/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/utxo/impl/UtxoPersistenceServiceImpl.kt +++ b/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/utxo/impl/UtxoPersistenceServiceImpl.kt @@ -7,6 +7,8 @@ import net.corda.crypto.cipher.suite.merkle.MerkleTreeProvider import net.corda.crypto.core.SecureHashImpl import net.corda.crypto.core.bytes import net.corda.crypto.core.parseSecureHash +import net.corda.db.core.utils.transaction +import net.corda.db.core.utils.transactionWithLogging import net.corda.ledger.common.data.transaction.SignedTransactionContainer import net.corda.ledger.common.data.transaction.TransactionMetadataInternal import net.corda.ledger.common.data.transaction.TransactionMetadataUtils.parseMetadata @@ -33,7 +35,7 @@ import net.corda.ledger.utxo.data.transaction.UtxoVisibleTransactionOutputDto import net.corda.ledger.utxo.data.transaction.WrappedUtxoWireTransaction import net.corda.ledger.utxo.data.transaction.toMerkleProof import net.corda.libs.json.validator.JsonValidator -import net.corda.orm.utils.transaction +import net.corda.utilities.debug import net.corda.utilities.serialization.deserialize import net.corda.utilities.time.Clock import net.corda.v5.application.crypto.DigestService @@ -54,13 +56,12 @@ import net.corda.v5.ledger.utxo.query.json.ContractStateVaultJsonFactory import org.slf4j.Logger import org.slf4j.LoggerFactory import java.security.MessageDigest +import java.sql.Connection import java.time.Instant -import javax.persistence.EntityManager -import javax.persistence.EntityManagerFactory @Suppress("LongParameterList", "TooManyFunctions", "LargeClass") class UtxoPersistenceServiceImpl( - private val entityManagerFactory: EntityManagerFactory, + private val connectionFactory: () -> Connection, private val repository: UtxoRepository, private val serializationService: SerializationService, private val sandboxDigestService: DigestService, @@ -72,7 +73,7 @@ class UtxoPersistenceServiceImpl( private val merkleTreeProvider: MerkleTreeProvider, private val filteredTransactionFactory: FilteredTransactionFactory, private val digestService: DigestService, - private val utcClock: Clock + private val utcClock: Clock, ) : UtxoPersistenceService { private companion object { @@ -83,21 +84,21 @@ class UtxoPersistenceServiceImpl( override fun findSignedTransaction( id: String, - transactionStatus: TransactionStatus + transactionStatus: TransactionStatus, ): Pair { - return entityManagerFactory.transaction { em -> - findSignedTransaction(id, transactionStatus, em) + return connectionFactory().use { conn -> + findSignedTransaction(id, transactionStatus, conn) } } private fun findSignedTransaction( id: String, transactionStatus: TransactionStatus, - em: EntityManager, + connection: Connection, ): Pair { - val status = repository.findSignedTransactionStatus(em, id) ?: return null to null + val status = repository.findSignedTransactionStatus(connection, id) ?: return null to null return if (status == transactionStatus.value) { - repository.findTransaction(em, id) + repository.findTransaction(connection, id) ?: throw InconsistentLedgerStateException("Transaction $id in status $transactionStatus has disappeared from the database") } else { null @@ -112,13 +113,15 @@ class UtxoPersistenceServiceImpl( // create payload map and make values null by default, if the value of certain filtered tx is null at the end, // it means it's not found. The error will be handled in flow side. - val txIdToFilteredTxAndSignature: MutableMap>> = stateRefs - .groupBy { it.transactionId } - .mapValues { (_, _) -> null to emptyList() }.toMutableMap() + val txIdToFilteredTxAndSignature: MutableMap>> = + stateRefs + .groupBy { it.transactionId } + .mapValues { (_, _) -> null to emptyList() }.toMutableMap() - return entityManagerFactory.transaction { em -> + return connectionFactory().use { conn -> txIdToIndexesMap.keys.forEach { transactionId -> - val signedTransactionContainer = findSignedTransaction(transactionId.toString(), TransactionStatus.VERIFIED, em).first + val signedTransactionContainer = + findSignedTransaction(transactionId.toString(), TransactionStatus.VERIFIED, conn).first val wireTransaction = signedTransactionContainer?.wireTransaction val signatures = signedTransactionContainer?.signatures ?: emptyList() val indexesOfTxId = txIdToIndexesMap[transactionId]!! @@ -165,7 +168,7 @@ class UtxoPersistenceServiceImpl( // find from filtered transaction table if there are still unfounded filtered txs val txIdToFilteredTxSignaturePairFromMerkleTable = if (transactionIdsToFind.isNotEmpty()) { - findFilteredTransactions(transactionIdsToFind, em) + findFilteredTransactions(transactionIdsToFind, conn) } else { emptyMap() } @@ -174,21 +177,21 @@ class UtxoPersistenceServiceImpl( } override fun findSignedTransactionIdsAndStatuses( - transactionIds: List + transactionIds: List, ): Map { - return entityManagerFactory.transaction { em -> - repository.findSignedTransactionIdsAndStatuses(em, transactionIds) + connectionFactory().use { conn -> + return repository.findSignedTransactionIdsAndStatuses(conn, transactionIds) } } override fun findSignedLedgerTransaction( id: String, - transactionStatus: TransactionStatus + transactionStatus: TransactionStatus, ): Pair { - return entityManagerFactory.transaction { em -> - val status = repository.findSignedTransactionStatus(em, id) ?: return null to null + return connectionFactory().use { conn -> + val status = repository.findSignedTransactionStatus(conn, id) ?: return null to null if (status == transactionStatus.value) { - val (transaction, signatures) = repository.findTransaction(em, id) + val (transaction, signatures) = repository.findTransaction(conn, id) ?.let { WrappedUtxoWireTransaction(it.wireTransaction, serializationService) to it.signatures } ?: throw InconsistentLedgerStateException("Transaction $id in status $status has disappeared from the database") @@ -196,7 +199,7 @@ class UtxoPersistenceServiceImpl( // Note: calling the `resolveStateRefs` function would result in a new connection being established, // so we call the repository directly instead - val stateRefsToStateAndRefs = repository.resolveStateRefs(em, allStateRefs) + val stateRefsToStateAndRefs = repository.resolveStateRefs(conn, allStateRefs) .associateBy { StateRef(parseSecureHash(it.transactionId), it.leafIndex) } val inputStateAndRefs = transaction.inputStateRefs.map { @@ -208,7 +211,12 @@ class UtxoPersistenceServiceImpl( ?: throw CordaRuntimeException("Could not find reference StateRef $it when finding transaction $id") } - SignedLedgerTransactionContainer(transaction.wireTransaction, inputStateAndRefs, referenceStateAndRefs, signatures) + SignedLedgerTransactionContainer( + transaction.wireTransaction, + inputStateAndRefs, + referenceStateAndRefs, + signatures + ) } else { null } to status @@ -216,8 +224,8 @@ class UtxoPersistenceServiceImpl( } override fun findUnconsumedVisibleStatesByType(stateClass: Class): List { - return entityManagerFactory.transaction { em -> - repository.findUnconsumedVisibleStatesByType(em) + return connectionFactory().use { conn -> + repository.findUnconsumedVisibleStatesByType(conn) }.filter { val contractState = serializationService.deserialize(it.data) stateClass.isInstance(contractState) @@ -225,34 +233,42 @@ class UtxoPersistenceServiceImpl( } override fun resolveStateRefs(stateRefs: List): List { - return entityManagerFactory.transaction { em -> - repository.resolveStateRefs(em, stateRefs) + return connectionFactory().use { conn -> + repository.resolveStateRefs(conn, stateRefs) } } private fun hash(data: ByteArray) = sandboxDigestService.hash(data, DigestAlgorithmName.SHA2_256).toString() - override fun persistTransaction(transaction: UtxoTransactionReader, utxoTokenMap: Map): Instant { + override fun persistTransaction( + transaction: UtxoTransactionReader, + utxoTokenMap: Map, + ): Instant { return persistTransaction(transaction, utxoTokenMap) { block -> - entityManagerFactory.transaction { em -> block(em) } + connectionFactory().transactionWithLogging("persistTransaction") { conn -> + block(conn) + } } } override fun persistTransactionIfDoesNotExist(transaction: UtxoTransactionReader): String { - entityManagerFactory.transaction { em -> - val transactionIdString = transaction.id.toString() - val status = repository.findSignedTransactionStatus(em, transactionIdString) ?: run { - persistTransaction(transaction, emptyMap()) { block -> block(em) } - return "" + val transactionIdString = transaction.id.toString() + log.debug { "Persist transaction $transactionIdString if not exist." } + return connectionFactory().transactionWithLogging("persistTransactionIfDoesNotExist") { conn -> + val status = repository.findSignedTransactionStatus(conn, transactionIdString) ?: let { + persistTransaction(transaction, emptyMap()) { block -> block(conn) } + "" } - return status + log.debug { "Non-existing transaction $transactionIdString persisted ($status)." } + status } } + @Suppress("LongMethod") private inline fun persistTransaction( transaction: UtxoTransactionReader, utxoTokenMap: Map, - optionalTransactionBlock: ((EntityManager) -> Unit) -> Unit + optionalTransactionBlock: ((Connection) -> Unit) -> Unit, ): Instant { val nowUtc = utcClock.instant() val transactionIdString = transaction.id.toString() @@ -280,10 +296,10 @@ class UtxoPersistenceServiceImpl( ) } - optionalTransactionBlock { em -> - + optionalTransactionBlock { conn -> + log.debug { "Persist metadata $metadataHash for transaction $transactionIdString" } repository.persistTransactionMetadata( - em, + conn, metadataHash, metadataBytes, requireNotNull(metadata.getMembershipGroupParametersHash()) { "Metadata without membership group parameters hash" }, @@ -292,6 +308,7 @@ class UtxoPersistenceServiceImpl( val inserted = if (transaction.status != TransactionStatus.UNVERIFIED) { + log.debug { "Persist transaction $transactionIdString" } /** * Insert the Transaction * The record will only be inserted when: @@ -300,7 +317,7 @@ class UtxoPersistenceServiceImpl( * - there is record which is DRAFT stx * */ repository.persistTransaction( - em, + conn, transactionIdString, transaction.privacySalt.bytes, transaction.account, @@ -309,9 +326,10 @@ class UtxoPersistenceServiceImpl( metadataHash, ) } else { + log.debug { "Persist unverified transaction $transactionIdString" } // ignore if the incoming transaction is an unverified stx repository.persistUnverifiedTransaction( - em, + conn, transactionIdString, transaction.privacySalt.bytes, transaction.account, @@ -320,23 +338,41 @@ class UtxoPersistenceServiceImpl( ) null } + log.debug { "Transaction $transactionIdString persisted (inserted=$inserted)" } + log.debug { "Persist transaction components for $transactionIdString" } repository.persistTransactionComponents( - em, + conn, transactionIdString, transaction.rawGroupLists, this::hash ) val consumedTransactionSources = transaction.getConsumedStateRefs().mapIndexed { index, input -> - UtxoRepository.TransactionSource(UtxoComponentGroup.INPUTS, index, input.transactionId.toString(), input.index) + UtxoRepository.TransactionSource( + UtxoComponentGroup.INPUTS, + index, + input.transactionId.toString(), + input.index + ) } val referenceTransactionSources = transaction.getReferenceStateRefs().mapIndexed { index, input -> - UtxoRepository.TransactionSource(UtxoComponentGroup.REFERENCES, index, input.transactionId.toString(), input.index) + UtxoRepository.TransactionSource( + UtxoComponentGroup.REFERENCES, + index, + input.transactionId.toString(), + input.index + ) } - repository.persistTransactionSources(em, transactionIdString, consumedTransactionSources + referenceTransactionSources) + val sources = consumedTransactionSources + referenceTransactionSources + log.debug { "Persist transaction sources for $transactionIdString - $sources" } + repository.persistTransactionSources( + conn, + transactionIdString, + sources + ) // rectify data from U -> V if (inserted != null && !inserted) { @@ -344,31 +380,35 @@ class UtxoPersistenceServiceImpl( // tx not being inserted implies that tx of the same ID exists in the table, and need to be rectified val indexes = repository.findConsumedTransactionSourcesForTransaction( - em, + conn, transactionIdString, visibleTransactionOutputs.map { output -> output.stateIndex } ) // insert outputs to be able to mark spent outputs as consumed + log.debug { "Persist visible transaction outputs for $transactionIdString" } repository.persistVisibleTransactionOutputs( - em, + conn, transactionIdString, nowUtc, visibleTransactionOutputs ) if (indexes.isNotEmpty()) { + log.debug { "Persist transaction visible states consumed for $transactionIdString" } repository.markTransactionVisibleStatesConsumed( - em, + conn, indexes.map { index -> StateRef(transaction.id, index) }, nowUtc ) } - repository.updateTransactionToVerified(em, transactionIdString, nowUtc) + log.debug { "Update transaction $transactionIdString to verified" } + repository.updateTransactionToVerified(conn, transactionIdString, nowUtc) } else { // outputs of stx UNVERIFIED would be empty + log.debug { "Persist visible transaction outputs for $transactionIdString" } repository.persistVisibleTransactionOutputs( - em, + conn, transactionIdString, nowUtc, visibleTransactionOutputs @@ -379,8 +419,9 @@ class UtxoPersistenceServiceImpl( if (transaction.status == TransactionStatus.VERIFIED) { val inputStateRefs = transaction.getConsumedStateRefs() if (inputStateRefs.isNotEmpty()) { + log.debug { "Mark transaction visible states consumed for $transactionIdString" } repository.markTransactionVisibleStatesConsumed( - em, + conn, inputStateRefs, nowUtc ) @@ -388,9 +429,12 @@ class UtxoPersistenceServiceImpl( } // Insert the Transactions signatures - repository.persistTransactionSignatures(em, transactionSignatures, nowUtc) + log.debug { "Persist visible transaction signatures for $transactionIdString" } + repository.persistTransactionSignatures(conn, transactionSignatures, nowUtc) } + log.debug { "Persist transaction $transactionIdString COMPLETED $nowUtc" } + return nowUtc } @@ -404,14 +448,14 @@ class UtxoPersistenceServiceImpl( ) } - entityManagerFactory.transaction { em -> - repository.persistTransactionSignatures(em, transactionSignatures, utcClock.instant()) + connectionFactory().use { conn -> + repository.persistTransactionSignatures(conn, transactionSignatures, utcClock.instant()) } } override fun updateStatus(id: String, transactionStatus: TransactionStatus) { - entityManagerFactory.transaction { em -> - repository.updateTransactionStatus(em, id, transactionStatus, utcClock.instant()) + connectionFactory().transaction { conn -> + repository.updateTransactionStatus(conn, id, transactionStatus, utcClock.instant()) } } @@ -447,7 +491,10 @@ class UtxoPersistenceServiceImpl( Any::class.java ) } catch (e: Exception) { - log.warn("Error while processing factory for class: ${ContractState::class.java.name}. Defaulting to empty JSON.", e) + log.warn( + "Error while processing factory for class: ${ContractState::class.java.name}. Defaulting to empty JSON.", + e + ) jsonMarshallingService.parse("{}", Any::class.java) } @@ -461,17 +508,17 @@ class UtxoPersistenceServiceImpl( } override fun findSignedGroupParameters(hash: String): SignedGroupParameters? { - return entityManagerFactory.transaction { em -> - repository.findSignedGroupParameters(em, hash) + return connectionFactory().use { conn -> + repository.findSignedGroupParameters(conn, hash) } } override fun persistSignedGroupParametersIfDoNotExist(signedGroupParameters: SignedGroupParameters) { val hash = signedGroupParameters.groupParameters.hash(DigestAlgorithmName.SHA2_256).toString() if (findSignedGroupParameters(hash) == null) { - entityManagerFactory.transaction { em -> + connectionFactory().transaction { conn -> repository.persistSignedGroupParameters( - em, + conn, hash, signedGroupParameters, utcClock.instant() @@ -485,17 +532,18 @@ class UtxoPersistenceServiceImpl( filteredTransactionsAndSignatures: Map>, inputStateRefs: List, referenceStateRefs: List, - account: String + account: String, ) { - entityManagerFactory.transaction { em -> - + connectionFactory().transaction { conn -> val seenMetadata = mutableSetOf() - val transactionsToSkip = findFilteredTransactionsToSkipPersisting(em, inputStateRefs, referenceStateRefs) + val transactionsToSkip = + findFilteredTransactionsToSkipPersisting(conn, inputStateRefs, referenceStateRefs) val filteredTransactionsToPersist = filteredTransactionsAndSignatures .filterNot { (filteredTransaction, _) -> filteredTransaction.id in transactionsToSkip } .map { (filteredTransaction, signatures) -> - val metadataHash = persistTransactionMetadataIfNotAlreadySeen(em, filteredTransaction, seenMetadata) + val metadataHash = + persistTransactionMetadataIfNotAlreadySeen(conn, filteredTransaction, seenMetadata) createFilteredTransactionToPersist(filteredTransaction, signatures, account, metadataHash) } @@ -503,12 +551,16 @@ class UtxoPersistenceServiceImpl( // If the same id of UNVERIFIED or DRAFT stx exists, leaving the status as it is. repository.persistFilteredTransactions( - em, + conn, filteredTransactionsToPersist.map { it.filteredTransaction }, nowUtc ) - repository.persistTransactionSignatures(em, filteredTransactionsToPersist.flatMap { it.signatures }, nowUtc) + repository.persistTransactionSignatures( + conn, + filteredTransactionsToPersist.flatMap { it.signatures }, + nowUtc + ) // No need to persist the leaf data for the top level merkle proof as we can reconstruct that val topLevelProofs = filteredTransactionsToPersist.map(FilteredTransactionToPersist::topLevelProof) @@ -516,17 +568,17 @@ class UtxoPersistenceServiceImpl( .flatMap(FilteredTransactionToPersist::componentGroupProofs) .map(TransactionMerkleProofToPersist::proof) - repository.persistMerkleProofs(em, topLevelProofs + componentGroupProofs) + repository.persistMerkleProofs(conn, topLevelProofs + componentGroupProofs) repository.persistMerkleProofLeaves( - em, + conn, filteredTransactionsToPersist .flatMap(FilteredTransactionToPersist::componentGroupProofs) .flatMap(TransactionMerkleProofToPersist::leaves) ) repository.persistTransactionComponents( - em, + conn, filteredTransactionsToPersist .flatMap(FilteredTransactionToPersist::componentGroupProofs) .flatMap(TransactionMerkleProofToPersist::components), @@ -536,14 +588,15 @@ class UtxoPersistenceServiceImpl( } private fun findFilteredTransactionsToSkipPersisting( - em: EntityManager, + connection: Connection, inputStateRefs: List, - referenceStateRefs: List + referenceStateRefs: List, ): Set { return if (referenceStateRefs.isNotEmpty()) { - val existingReferenceStates = repository.stateRefsExist(em, referenceStateRefs).map { (transactionId, index) -> - StateRef(digestService.parseSecureHash(transactionId), index) - } + val existingReferenceStates = + repository.stateRefsExist(connection, referenceStateRefs).map { (transactionId, index) -> + StateRef(digestService.parseSecureHash(transactionId), index) + } val missingReferenceStates = referenceStateRefs - existingReferenceStates.toSet() val missingReferenceStateTransactions = missingReferenceStates.distinctTransactionIds() val existingReferenceStateTransactions = existingReferenceStates.distinctTransactionIds() @@ -556,9 +609,9 @@ class UtxoPersistenceServiceImpl( } private fun persistTransactionMetadataIfNotAlreadySeen( - em: EntityManager, + connection: Connection, filteredTransaction: FilteredTransaction, - seenMetadata: MutableSet + seenMetadata: MutableSet, ): SecureHash { // 1. Get the metadata bytes from the 0th component group merkle proof and create the hash val metadataBytes = filteredTransaction.filteredComponentGroups[0] @@ -575,8 +628,9 @@ class UtxoPersistenceServiceImpl( val metadataHash = sandboxDigestService.hash(metadataBytes, DigestAlgorithmName.SHA2_256) if (seenMetadata.add(metadataHash)) { + log.debug { "Persist transaction metadata for ${filteredTransaction.id}" } repository.persistTransactionMetadata( - em, + connection, metadataHash.toString(), metadataBytes, requireNotNull(metadata.getMembershipGroupParametersHash()) { @@ -594,7 +648,7 @@ class UtxoPersistenceServiceImpl( filteredTransaction: FilteredTransaction, signatures: List, account: String, - metadataHash: SecureHash + metadataHash: SecureHash, ): FilteredTransactionToPersist { val filteredTransactionId = filteredTransaction.id.toString() val transactionSignatures = signatures.map { signature -> @@ -613,30 +667,31 @@ class UtxoPersistenceServiceImpl( filteredTransaction.topLevelMerkleProof.hashes.map { it.toString() } ) - val componentGroupTransactionMerkleProofs = filteredTransaction.filteredComponentGroups.map { (groupIndex, groupData) -> - val proof = createTransactionMerkleProof( - filteredTransactionId, - groupIndex, - groupData.merkleProof.treeSize, - groupData.merkleProof.leaves.map { it.index }, - groupData.merkleProof.hashes.map { it.toString() } - ) - val leaves = groupData.merkleProof.leaves.map { leaf -> - UtxoRepository.TransactionMerkleProofLeaf( - proof.merkleProofId, - leaf.index - ) - } - val components = groupData.merkleProof.leaves.map { leaf -> - UtxoRepository.TransactionComponent( + val componentGroupTransactionMerkleProofs = + filteredTransaction.filteredComponentGroups.map { (groupIndex, groupData) -> + val proof = createTransactionMerkleProof( filteredTransactionId, groupIndex, - leaf.index, - leaf.leafData + groupData.merkleProof.treeSize, + groupData.merkleProof.leaves.map { it.index }, + groupData.merkleProof.hashes.map { it.toString() } ) + val leaves = groupData.merkleProof.leaves.map { leaf -> + UtxoRepository.TransactionMerkleProofLeaf( + proof.merkleProofId, + leaf.index + ) + } + val components = groupData.merkleProof.leaves.map { leaf -> + UtxoRepository.TransactionComponent( + filteredTransactionId, + groupIndex, + leaf.index, + leaf.leafData + ) + } + TransactionMerkleProofToPersist(proof, leaves, components) } - TransactionMerkleProofToPersist(proof, leaves, components) - } return FilteredTransactionToPersist( UtxoRepository.FilteredTransaction( @@ -653,16 +708,16 @@ class UtxoPersistenceServiceImpl( @VisibleForTesting fun findFilteredTransactions( - transactionIds: List + transactionIds: List, ): Map>> { - return entityManagerFactory.transaction { em -> findFilteredTransactions(transactionIds, em) } + return connectionFactory().use { conn -> findFilteredTransactions(transactionIds, conn) } } private fun findFilteredTransactions( transactionIds: List, - em: EntityManager + connection: Connection, ): Map>> { - return repository.findFilteredTransactions(em, transactionIds) + return repository.findFilteredTransactions(connection, transactionIds) .map { (transactionId, ftxDto) -> // Map through each found transaction @@ -696,35 +751,37 @@ class UtxoPersistenceServiceImpl( // 2. Merge the Merkle proofs for each component group val componentDigestProviders = mutableMapOf() - val mergedMerkleProofs = ftxDto.componentMerkleProofMap.mapValues { (componentGroupIndex, merkleProofDtoList) -> - val componentGroupHashDigestProvider = filteredTransactionMetadata.getComponentGroupMerkleTreeDigestProvider( - ftxDto.privacySalt!!, - componentGroupIndex, - merkleTreeProvider, - digestService - ) - componentDigestProviders[componentGroupIndex] = componentGroupHashDigestProvider - merkleProofDtoList.map { merkleProofDto -> - // Transform the MerkleProofDto objects to MerkleProof objects - // If the merkle proof is metadata, we need to add the bytes because it's not part of the component table - val merkleProofDtoOverride = if (merkleProofDto.groupIndex == 0) { - merkleProofDto.copy(leavesWithData = mapOf(0 to ftxDto.metadataBytes!!)) - } else { - merkleProofDto - } + val mergedMerkleProofs = + ftxDto.componentMerkleProofMap.mapValues { (componentGroupIndex, merkleProofDtoList) -> + val componentGroupHashDigestProvider = + filteredTransactionMetadata.getComponentGroupMerkleTreeDigestProvider( + ftxDto.privacySalt!!, + componentGroupIndex, + merkleTreeProvider, + digestService + ) + componentDigestProviders[componentGroupIndex] = componentGroupHashDigestProvider + merkleProofDtoList.map { merkleProofDto -> + // Transform the MerkleProofDto objects to MerkleProof objects + // If the merkle proof is metadata, we need to add the bytes because it's not part of the component table + val merkleProofDtoOverride = if (merkleProofDto.groupIndex == 0) { + merkleProofDto.copy(leavesWithData = mapOf(0 to ftxDto.metadataBytes!!)) + } else { + merkleProofDto + } - merkleProofDtoOverride.toMerkleProof( - merkleProofFactory, - componentGroupHashDigestProvider - ) - }.reduce { accumulator, merkleProof -> - // Then keep merging the elements into each other - (accumulator as MerkleProofInternal).merge( - merkleProof, - componentGroupHashDigestProvider - ) + merkleProofDtoOverride.toMerkleProof( + merkleProofFactory, + componentGroupHashDigestProvider + ) + }.reduce { accumulator, merkleProof -> + // Then keep merging the elements into each other + (accumulator as MerkleProofInternal).merge( + merkleProof, + componentGroupHashDigestProvider + ) + } } - } // 3. Calculate the root hash of each component group merkle proof val calculatedComponentGroupRootsHashes = mergedMerkleProofs.map { (componentGroupIndex, merkleProof) -> @@ -748,7 +805,8 @@ class UtxoPersistenceServiceImpl( it.visibleLeaves.associateWith { componentGroupIndex -> // Use the already calculated component group root - val componentGroupRootBytes = calculatedComponentGroupRootsHashes[componentGroupIndex]?.bytes + val componentGroupRootBytes = + calculatedComponentGroupRootsHashes[componentGroupIndex]?.bytes // At this point we should have this available requireNotNull(componentGroupRootBytes) { @@ -789,15 +847,15 @@ class UtxoPersistenceServiceImpl( until: Instant, limit: Int, ): List { - return entityManagerFactory.transaction { em -> - repository.findTransactionsWithStatusCreatedBetweenTime(em, status, from, until, limit) + return connectionFactory().use { conn -> + repository.findTransactionsWithStatusCreatedBetweenTime(conn, status, from, until, limit) .map { id -> digestService.parseSecureHash(id) } } } override fun incrementTransactionRepairAttemptCount(id: String) { - entityManagerFactory.transaction { em -> - repository.incrementTransactionRepairAttemptCount(em, id) + connectionFactory().transaction { conn -> + repository.incrementTransactionRepairAttemptCount(conn, id) } } @@ -806,7 +864,7 @@ class UtxoPersistenceServiceImpl( groupIndex: Int, treeSize: Int, leafIndexes: List, - leafHashes: List + leafHashes: List, ): UtxoRepository.TransactionMerkleProof { return UtxoRepository.TransactionMerkleProof( digestService.hash( @@ -829,13 +887,13 @@ class UtxoPersistenceServiceImpl( val filteredTransaction: UtxoRepository.FilteredTransaction, val topLevelProof: UtxoRepository.TransactionMerkleProof, val componentGroupProofs: List, - val signatures: List + val signatures: List, ) private data class TransactionMerkleProofToPersist( val proof: UtxoRepository.TransactionMerkleProof, val leaves: List, - val components: List + val components: List, ) /** diff --git a/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/utxo/impl/UtxoRepositoryImpl.kt b/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/utxo/impl/UtxoRepositoryImpl.kt index cf926bad19d..a2e29efa7b5 100644 --- a/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/utxo/impl/UtxoRepositoryImpl.kt +++ b/libs/ledger-lib-persistence/src/main/kotlin/net/corda/ledger/libs/persistence/utxo/impl/UtxoRepositoryImpl.kt @@ -7,6 +7,8 @@ import net.corda.ledger.common.data.transaction.SignedTransactionContainer import net.corda.ledger.common.data.transaction.TransactionStatus import net.corda.ledger.common.data.transaction.factory.WireTransactionFactory import net.corda.ledger.libs.persistence.common.mapToComponentGroups +import net.corda.ledger.libs.persistence.util.NamedParamQuery +import net.corda.ledger.libs.persistence.util.NamedParamStatement import net.corda.ledger.libs.persistence.utxo.SignatureSpec import net.corda.ledger.libs.persistence.utxo.SignatureWithKey import net.corda.ledger.libs.persistence.utxo.SignedGroupParameters @@ -21,7 +23,6 @@ import net.corda.v5.application.crypto.DigitalSignatureAndMetadata import net.corda.v5.application.serialization.SerializationService import net.corda.v5.crypto.SecureHash import net.corda.v5.ledger.utxo.StateRef -import org.hibernate.Session import org.slf4j.LoggerFactory import java.sql.Connection import java.sql.Timestamp @@ -29,7 +30,6 @@ import java.sql.Types import java.time.Instant import java.util.Calendar import java.util.TimeZone -import javax.persistence.EntityManager import javax.persistence.Query import javax.persistence.Tuple @@ -38,7 +38,7 @@ class UtxoRepositoryImpl( private val batchPersistenceService: BatchPersistenceService, private val serializationService: SerializationService, private val wireTransactionFactory: WireTransactionFactory, - private val queryProvider: UtxoQueryProvider + private val queryProvider: UtxoQueryProvider, ) : UtxoRepository { private companion object { private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass) @@ -47,117 +47,166 @@ class UtxoRepositoryImpl( private val utcCalendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")) } + private val findSignedTransactionIdsAndStatusesSql = + NamedParamQuery.from(queryProvider.findSignedTransactionIdsAndStatuses) + private val findTransactionsPrivacySaltAndMetadataSql = + NamedParamQuery.from(queryProvider.findTransactionsPrivacySaltAndMetadata) + private val findTransactionComponentLeafsSql = + NamedParamQuery.from(queryProvider.findTransactionComponentLeafs) + private val findTransactionSignaturesSql = + NamedParamQuery.from(queryProvider.findTransactionSignatures) + private val findSignedTransactionStatusSql = + NamedParamQuery.from(queryProvider.findSignedTransactionStatus) + private val findMerkleProofsSql = + NamedParamQuery.from(queryProvider.findMerkleProofs) + private val findUnconsumedVisibleStatesByTypeSql = + NamedParamQuery.from(queryProvider.findUnconsumedVisibleStatesByType) + private val resolveStateRefsSql = + NamedParamQuery.from(queryProvider.resolveStateRefs) + private val markTransactionVisibleStatesConsumedSql = + NamedParamQuery.from(queryProvider.markTransactionVisibleStatesConsumed) + private val persistTransactionMetadataSql = + NamedParamQuery.from(queryProvider.persistTransactionMetadata) + private val persistTransactionSql = + NamedParamQuery.from(queryProvider.persistTransaction) + private val persistUnverifiedTransactionSql = + NamedParamQuery.from(queryProvider.persistUnverifiedTransaction) + private val findConsumedTransactionSourcesForTransactionSql = + NamedParamQuery.from(queryProvider.findConsumedTransactionSourcesForTransaction) + private val updateTransactionToVerifiedSql = + NamedParamQuery.from(queryProvider.updateTransactionToVerified) + private val updateTransactionStatusSql = + NamedParamQuery.from(queryProvider.updateTransactionStatus) + private val persistSignedGroupParametersSql = + NamedParamQuery.from(queryProvider.persistSignedGroupParameters) + private val findSignedGroupParametersSql = + NamedParamQuery.from(queryProvider.findSignedGroupParameters) + private val findTransactionsWithStatusCreatedBetweenTimeSql = + NamedParamQuery.from(queryProvider.findTransactionsWithStatusCreatedBetweenTime) + private val incrementRepairAttemptCountSql = + NamedParamQuery.from(queryProvider.incrementRepairAttemptCount) + override fun findTransaction( - entityManager: EntityManager, - id: String + connection: Connection, + id: String, ): SignedTransactionContainer? { - val (privacySalt, metadataBytes) = findTransactionsPrivacySaltAndMetadata(entityManager, listOf(id))[id] + val (privacySalt, metadataBytes) = findTransactionsPrivacySaltAndMetadata(connection, listOf(id))[id] ?: return null val wireTransaction = wireTransactionFactory.create( - mapOf(0 to listOf(metadataBytes)) + findTransactionComponentLeafs(entityManager, id), + mapOf(0 to listOf(metadataBytes)) + findTransactionComponentLeafs(connection, id), privacySalt ) return SignedTransactionContainer( wireTransaction, - findTransactionSignatures(entityManager, id) + findTransactionSignatures(connection, id) ) } override fun findSignedTransactionIdsAndStatuses( - entityManager: EntityManager, - transactionIds: List + connection: Connection, + transactionIds: List, ): Map { - return entityManager.createNativeQuery(queryProvider.findSignedTransactionIdsAndStatuses, Tuple::class.java) - .setParameter("transactionIds", transactionIds) - .resultListAsTuples() - .associate { r -> parseSecureHash(r.get(0) as String) to r.get(1) as String } + NamedParamStatement(findSignedTransactionIdsAndStatusesSql, connection).use { stmt -> + stmt.setStrings("transactionIds", transactionIds) + return stmt.executeQueryAsList { + parseSecureHash(it.getString(1)) to it.getString(2) + }.toMap() + } } private fun findTransactionsPrivacySaltAndMetadata( - entityManager: EntityManager, - transactionIds: List + connection: Connection, + transactionIds: List, ): Map?> { - return entityManager.createNativeQuery(queryProvider.findTransactionsPrivacySaltAndMetadata, Tuple::class.java) - .setParameter("transactionIds", transactionIds) - .resultListAsTuples().associate { r -> - r.get(0) as String to Pair(PrivacySaltImpl(r.get(1) as ByteArray), r.get(2) as ByteArray) - } + NamedParamStatement(findTransactionsPrivacySaltAndMetadataSql, connection).use { stmt -> + stmt.setStrings("transactionIds", transactionIds) + return stmt.executeQueryAsList { + it.getString(1) to Pair(PrivacySaltImpl(it.getBytes(2)), it.getBytes(3)) + }.toMap() + } } override fun findTransactionComponentLeafs( - entityManager: EntityManager, - transactionId: String + connection: Connection, + transactionId: String, ): Map> { - return entityManager.createNativeQuery(queryProvider.findTransactionComponentLeafs, Tuple::class.java) - .setParameter("transactionId", transactionId) - .resultListAsTuples() - .mapToComponentGroups(UtxoComponentGroupMapper(transactionId)) - } - - private fun findUnconsumedVisibleStates( - entityManager: EntityManager, - query: String, - stateClassType: String? - ): List { - val queryObj = entityManager.createNativeQuery(query, Tuple::class.java) - .setParameter("verified", TransactionStatus.VERIFIED.value) - - if (stateClassType != null) { - queryObj.setParameter("type", stateClassType) + NamedParamStatement(findTransactionComponentLeafsSql, connection).use { stmt -> + stmt.setString("transactionId", transactionId) + return stmt.executeQueryAsListOfColumns() + .mapToComponentGroups(UtxoComponentGroupMapper(transactionId)) } - - return queryObj.mapToUtxoVisibleTransactionOutputDto() } override fun findUnconsumedVisibleStatesByType( - entityManager: EntityManager + connection: Connection, ): List { - return findUnconsumedVisibleStates(entityManager, queryProvider.findUnconsumedVisibleStatesByType, null) + NamedParamStatement(findUnconsumedVisibleStatesByTypeSql, connection).use { stmt -> + stmt.setString("verified", TransactionStatus.VERIFIED.value) + return stmt.executeQueryAsList { + UtxoVisibleTransactionOutputDto( + it.getString(1), // transactionId + it.getInt(2), // leaf ID + it.getBytes(3), // outputs info data + it.getBytes(4) // outputs data + ) + } + } } override fun resolveStateRefs( - entityManager: EntityManager, - stateRefs: List + connection: Connection, + stateRefs: List, ): List { - return entityManager.createNativeQuery(queryProvider.resolveStateRefs, Tuple::class.java) - .setParameter("transactionIds", stateRefs.map { it.transactionId.toString() }) - .setParameter("stateRefs", stateRefs.map { it.toString() }) - .setParameter("verified", TransactionStatus.VERIFIED.value) - .mapToUtxoVisibleTransactionOutputDto() + NamedParamStatement(resolveStateRefsSql, connection).use { stmt -> + stmt.setStrings("transactionIds", stateRefs.map { it.transactionId.toString() }) + stmt.setStrings("stateRefs", stateRefs.map { it.toString() }) + stmt.setString("verified", TransactionStatus.VERIFIED.value) + return stmt.executeQueryAsList { + UtxoVisibleTransactionOutputDto( + it.getString(1), // transactionId + it.getInt(2), // leaf ID + it.getBytes(3), // outputs info data + it.getBytes(4) // outputs data + ) + } + } } override fun findTransactionSignatures( - entityManager: EntityManager, - transactionId: String + connection: Connection, + transactionId: String, ): List { - return entityManager.createNativeQuery(queryProvider.findTransactionSignatures, Tuple::class.java) - .setParameter("transactionId", transactionId) - .resultListAsTuples() - .map { r -> serializationService.deserialize(r.get(0) as ByteArray) } + NamedParamStatement(findTransactionSignaturesSql, connection).use { stmt -> + stmt.setString("transactionId", transactionId) + return stmt.executeQueryAsList { + serializationService.deserialize(it.getBytes(1)) + } + } } - override fun findSignedTransactionStatus(entityManager: EntityManager, id: String): String? { - return entityManager.createNativeQuery(queryProvider.findSignedTransactionStatus, Tuple::class.java) - .setParameter("transactionId", id) - .resultListAsTuples() - .map { r -> r.get(0) as String } - .singleOrNull() + override fun findSignedTransactionStatus(connection: Connection, id: String): String? { + NamedParamStatement(findSignedTransactionStatusSql, connection).use { stmt -> + stmt.setString("transactionId", id) + return stmt.executeQueryAsList { it.getString(1) }.singleOrNull() + } } override fun markTransactionVisibleStatesConsumed( - entityManager: EntityManager, + connection: Connection, stateRefs: List, - timestamp: Instant + timestamp: Instant, ) { - entityManager.createNativeQuery(queryProvider.markTransactionVisibleStatesConsumed) - .setParameter("consumed", timestamp) - .setParameter("transactionIds", stateRefs.map { it.transactionId.toString() }) - .setParameter("stateRefs", stateRefs.map(StateRef::toString)) - .executeUpdate() + NamedParamStatement(markTransactionVisibleStatesConsumedSql, connection).use { stmt -> + stmt.setInstant("consumed", timestamp) + stmt.setStrings("transactionIds", stateRefs.map { it.transactionId.toString() }) + stmt.setStrings("stateRefs", stateRefs.map(StateRef::toString)) + + stmt.executeUpdate() + } } override fun persistTransaction( - entityManager: EntityManager, + connection: Connection, id: String, privacySalt: ByteArray, account: String, @@ -165,105 +214,106 @@ class UtxoRepositoryImpl( status: TransactionStatus, metadataHash: String, ): Boolean { - return entityManager.createNativeQuery(queryProvider.persistTransaction) - .setParameter("id", id) - .setParameter("privacySalt", privacySalt) - .setParameter("accountId", account) - .setParameter("createdAt", timestamp) - .setParameter("status", status.value) - .setParameter("updatedAt", timestamp) - .setParameter("metadataHash", metadataHash) - .executeUpdate() != 0 + NamedParamStatement(persistTransactionSql, connection).use { stmt -> + stmt.setString("id", id) + stmt.setBytes("privacySalt", privacySalt) + stmt.setString("accountId", account) + stmt.setInstant("createdAt", timestamp) + stmt.setString("status", status.value) + stmt.setInstant("updatedAt", timestamp) + stmt.setString("metadataHash", metadataHash) + + return stmt.executeUpdate() != 0 + } } override fun persistUnverifiedTransaction( - entityManager: EntityManager, + connection: Connection, id: String, privacySalt: ByteArray, account: String, timestamp: Instant, metadataHash: String, ) { - entityManager.createNativeQuery(queryProvider.persistUnverifiedTransaction) - .setParameter("id", id) - .setParameter("privacySalt", privacySalt) - .setParameter("accountId", account) - .setParameter("createdAt", timestamp) - .setParameter("updatedAt", timestamp) - .setParameter("metadataHash", metadataHash) - .executeUpdate() - .logResult("transaction [$id]") + NamedParamStatement(persistUnverifiedTransactionSql, connection).use { stmt -> + stmt.setString("id", id) + stmt.setBytes("privacySalt", privacySalt) + stmt.setString("accountId", account) + stmt.setInstant("createdAt", timestamp) + stmt.setInstant("updatedAt", timestamp) + stmt.setString("metadataHash", metadataHash) + + stmt.executeUpdate().logResult("transaction [$id]") + } } override fun persistFilteredTransactions( - entityManager: EntityManager, + connection: Connection, filteredTransactions: List, timestamp: Instant, ) { - entityManager.connection { connection -> - batchPersistenceService.persistBatch( - connection, - queryProvider.persistFilteredTransaction, - filteredTransactions - ) { statement, parameterIndex, filteredTransaction -> - statement.setString(parameterIndex.next(), filteredTransaction.transactionId) - statement.setBytes(parameterIndex.next(), filteredTransaction.privacySalt) - statement.setString(parameterIndex.next(), filteredTransaction.account) - statement.setTimestamp(parameterIndex.next(), Timestamp.from(timestamp), utcCalendar) - statement.setTimestamp(parameterIndex.next(), Timestamp.from(timestamp), utcCalendar) - statement.setString(parameterIndex.next(), filteredTransaction.metadataHash) - } + batchPersistenceService.persistBatch( + connection, + queryProvider.persistFilteredTransaction, + filteredTransactions + ) { statement, parameterIndex, filteredTransaction -> + statement.setString(parameterIndex.next(), filteredTransaction.transactionId) + statement.setBytes(parameterIndex.next(), filteredTransaction.privacySalt) + statement.setString(parameterIndex.next(), filteredTransaction.account) + statement.setTimestamp(parameterIndex.next(), Timestamp.from(timestamp), utcCalendar) + statement.setTimestamp(parameterIndex.next(), Timestamp.from(timestamp), utcCalendar) + statement.setString(parameterIndex.next(), filteredTransaction.metadataHash) } } - override fun updateTransactionToVerified(entityManager: EntityManager, id: String, timestamp: Instant) { - entityManager.createNativeQuery(queryProvider.updateTransactionToVerified) - .setParameter("transactionId", id) - .setParameter("updatedAt", timestamp) - .executeUpdate() + override fun updateTransactionToVerified(connection: Connection, id: String, timestamp: Instant) { + NamedParamStatement(updateTransactionToVerifiedSql, connection).use { stmt -> + stmt.setString("transactionId", id) + stmt.setInstant("updatedAt", timestamp) + + stmt.executeUpdate() + } } override fun persistTransactionMetadata( - entityManager: EntityManager, + connection: Connection, hash: String, metadataBytes: ByteArray, groupParametersHash: String, - cpiFileChecksum: String + cpiFileChecksum: String, ) { - entityManager.createNativeQuery(queryProvider.persistTransactionMetadata) - .setParameter("hash", hash) - .setParameter("canonicalData", metadataBytes) - .setParameter("groupParametersHash", groupParametersHash) - .setParameter("cpiFileChecksum", cpiFileChecksum) - .executeUpdate() - .logResult("transaction metadata [$hash]") + NamedParamStatement(persistTransactionMetadataSql, connection).use { stmt -> + stmt.setString("hash", hash) + stmt.setBytes("canonicalData", metadataBytes) + stmt.setString("groupParametersHash", groupParametersHash) + stmt.setString("cpiFileChecksum", cpiFileChecksum) + stmt.executeUpdate().logResult("transaction metadata [$hash]") + } } override fun persistTransactionSources( - entityManager: EntityManager, + connection: Connection, transactionId: String, - transactionSources: List + transactionSources: List, ) { - entityManager.connection { connection -> - batchPersistenceService.persistBatch( - connection, - queryProvider.persistTransactionSources, - transactionSources - ) { statement, parameterIndex, transactionSource -> - statement.setString(parameterIndex.next(), transactionId) // new transaction id - statement.setInt(parameterIndex.next(), transactionSource.group.ordinal) // refs or inputs - statement.setInt(parameterIndex.next(), transactionSource.index) // index in refs or inputs - statement.setString(parameterIndex.next(), transactionSource.sourceTransactionId) // tx state came from - statement.setInt(parameterIndex.next(), transactionSource.sourceIndex) // index from tx it came from - } + batchPersistenceService.persistBatch( + connection, + queryProvider.persistTransactionSources, + transactionSources + ) { statement, parameterIndex, transactionSource -> + statement.setString(parameterIndex.next(), transactionId) // new transaction id + statement.setInt(parameterIndex.next(), transactionSource.group.ordinal) // refs or inputs + statement.setInt(parameterIndex.next(), transactionSource.index) // index in refs or inputs + statement.setString(parameterIndex.next(), transactionSource.sourceTransactionId) // tx state came from + statement.setInt(parameterIndex.next(), transactionSource.sourceIndex) // index from tx it came from } } override fun persistTransactionComponents( - entityManager: EntityManager, + connection: Connection, transactionId: String, components: List>, - hash: (ByteArray) -> String + hash: (ByteArray) -> String, ) { fun isMetadata(groupIndex: Int, leafIndex: Int) = groupIndex == 0 && leafIndex == 0 @@ -276,25 +326,23 @@ class UtxoRepositoryImpl( } } }.flatten() - entityManager.connection { connection -> - batchPersistenceService.persistBatch( - connection, - queryProvider.persistTransactionComponents, - flattenedComponentList - ) { statement, parameterIndex, component -> - statement.setString(parameterIndex.next(), transactionId) - statement.setInt(parameterIndex.next(), component.first) - statement.setInt(parameterIndex.next(), component.second) - statement.setBytes(parameterIndex.next(), component.third) - statement.setString(parameterIndex.next(), hash(component.third)) - } + batchPersistenceService.persistBatch( + connection, + queryProvider.persistTransactionComponents, + flattenedComponentList + ) { statement, parameterIndex, component -> + statement.setString(parameterIndex.next(), transactionId) + statement.setInt(parameterIndex.next(), component.first) + statement.setInt(parameterIndex.next(), component.second) + statement.setBytes(parameterIndex.next(), component.third) + statement.setString(parameterIndex.next(), hash(component.third)) } } override fun persistTransactionComponents( - entityManager: EntityManager, + connection: Connection, components: List, - hash: (ByteArray) -> String + hash: (ByteArray) -> String, ) { fun isMetadata(groupIndex: Int, leafIndex: Int) = groupIndex == 0 && leafIndex == 0 @@ -306,220 +354,215 @@ class UtxoRepositoryImpl( component } } - entityManager.connection { connection -> - batchPersistenceService.persistBatch( - connection, - queryProvider.persistTransactionComponents, - componentsWithMetadataRemoved - ) { statement, parameterIndex, component -> - statement.setString(parameterIndex.next(), component.transactionId) - statement.setInt(parameterIndex.next(), component.groupIndex) - statement.setInt(parameterIndex.next(), component.leafIndex) - statement.setBytes(parameterIndex.next(), component.leafData) - statement.setString(parameterIndex.next(), hash(component.leafData)) - } + batchPersistenceService.persistBatch( + connection, + queryProvider.persistTransactionComponents, + componentsWithMetadataRemoved + ) { statement, parameterIndex, component -> + statement.setString(parameterIndex.next(), component.transactionId) + statement.setInt(parameterIndex.next(), component.groupIndex) + statement.setInt(parameterIndex.next(), component.leafIndex) + statement.setBytes(parameterIndex.next(), component.leafData) + statement.setString(parameterIndex.next(), hash(component.leafData)) } } override fun persistVisibleTransactionOutputs( - entityManager: EntityManager, + connection: Connection, transactionId: String, timestamp: Instant, - visibleTransactionOutputs: List + visibleTransactionOutputs: List, ) { - entityManager.connection { connection -> - batchPersistenceService.persistBatch( - connection, - queryProvider.persistVisibleTransactionOutputs, - visibleTransactionOutputs - ) { statement, parameterIndex, visibleTransactionOutput -> - statement.setString(parameterIndex.next(), transactionId) - statement.setInt(parameterIndex.next(), UtxoComponentGroup.OUTPUTS.ordinal) - statement.setInt(parameterIndex.next(), visibleTransactionOutput.stateIndex) - statement.setString(parameterIndex.next(), visibleTransactionOutput.className) - statement.setString(parameterIndex.next(), visibleTransactionOutput.token?.poolKey?.tokenType) - statement.setString(parameterIndex.next(), visibleTransactionOutput.token?.poolKey?.issuerHash?.toString()) - statement.setString(parameterIndex.next(), visibleTransactionOutput.notaryName) - statement.setString(parameterIndex.next(), visibleTransactionOutput.token?.poolKey?.symbol) - statement.setString(parameterIndex.next(), visibleTransactionOutput.token?.filterFields?.tag) - statement.setString(parameterIndex.next(), visibleTransactionOutput.token?.filterFields?.ownerHash?.toString()) - if (visibleTransactionOutput.token != null) { - statement.setBigDecimal(parameterIndex.next(), visibleTransactionOutput.token.amount) - } else { - statement.setNull(parameterIndex.next(), Types.NUMERIC) - } - - if (visibleTransactionOutput.token?.priority != null) { - statement.setLong(parameterIndex.next(), visibleTransactionOutput.token.priority!!) - } else { - statement.setNull(parameterIndex.next(), Types.BIGINT) - } + batchPersistenceService.persistBatch( + connection, + queryProvider.persistVisibleTransactionOutputs, + visibleTransactionOutputs + ) { statement, parameterIndex, visibleTransactionOutput -> + statement.setString(parameterIndex.next(), transactionId) + statement.setInt(parameterIndex.next(), UtxoComponentGroup.OUTPUTS.ordinal) + statement.setInt(parameterIndex.next(), visibleTransactionOutput.stateIndex) + statement.setString(parameterIndex.next(), visibleTransactionOutput.className) + statement.setString(parameterIndex.next(), visibleTransactionOutput.token?.poolKey?.tokenType) + statement.setString(parameterIndex.next(), visibleTransactionOutput.token?.poolKey?.issuerHash?.toString()) + statement.setString(parameterIndex.next(), visibleTransactionOutput.notaryName) + statement.setString(parameterIndex.next(), visibleTransactionOutput.token?.poolKey?.symbol) + statement.setString(parameterIndex.next(), visibleTransactionOutput.token?.filterFields?.tag) + statement.setString(parameterIndex.next(), visibleTransactionOutput.token?.filterFields?.ownerHash?.toString()) + if (visibleTransactionOutput.token != null) { + statement.setBigDecimal(parameterIndex.next(), visibleTransactionOutput.token.amount) + } else { + statement.setNull(parameterIndex.next(), Types.NUMERIC) + } - statement.setTimestamp(parameterIndex.next(), Timestamp.from(timestamp), utcCalendar) - statement.setNull(parameterIndex.next(), Types.TIMESTAMP) - statement.setString(parameterIndex.next(), visibleTransactionOutput.customRepresentation.json) + if (visibleTransactionOutput.token?.priority != null) { + statement.setLong(parameterIndex.next(), visibleTransactionOutput.token.priority!!) + } else { + statement.setNull(parameterIndex.next(), Types.BIGINT) } + + statement.setTimestamp(parameterIndex.next(), Timestamp.from(timestamp), utcCalendar) + statement.setNull(parameterIndex.next(), Types.TIMESTAMP) + statement.setString(parameterIndex.next(), visibleTransactionOutput.customRepresentation.json) } } override fun persistTransactionSignatures( - entityManager: EntityManager, + connection: Connection, signatures: List, - timestamp: Instant + timestamp: Instant, ) { - entityManager.connection { connection -> - batchPersistenceService.persistBatch( - connection, - queryProvider.persistTransactionSignatures, - signatures - ) { statement, parameterIndex, signature -> - statement.setString(parameterIndex.next(), signature.transactionId) - statement.setString(parameterIndex.next(), signature.publicKeyHash.toString()) - statement.setBytes(parameterIndex.next(), signature.signatureBytes) - statement.setTimestamp(parameterIndex.next(), Timestamp.from(timestamp), utcCalendar) - } + batchPersistenceService.persistBatch( + connection, + queryProvider.persistTransactionSignatures, + signatures + ) { statement, parameterIndex, signature -> + statement.setString(parameterIndex.next(), signature.transactionId) + statement.setString(parameterIndex.next(), signature.publicKeyHash.toString()) + statement.setBytes(parameterIndex.next(), signature.signatureBytes) + statement.setTimestamp(parameterIndex.next(), Timestamp.from(timestamp), utcCalendar) } } override fun updateTransactionStatus( - entityManager: EntityManager, + connection: Connection, transactionId: String, transactionStatus: TransactionStatus, - timestamp: Instant + timestamp: Instant, ) { - // Update status. Update ignored unless: UNVERIFIED -> * | VERIFIED -> VERIFIED | INVALID -> INVALID - val rowsUpdated = entityManager.createNativeQuery(queryProvider.updateTransactionStatus) - .setParameter("transactionId", transactionId) - .setParameter("newStatus", transactionStatus.value) - .setParameter("updatedAt", timestamp) - .executeUpdate() - - check(rowsUpdated == 1 || transactionStatus == TransactionStatus.UNVERIFIED) { - // VERIFIED -> INVALID or INVALID -> VERIFIED is a system error as verify should always be consistent and deterministic - "Existing status for transaction with ID $transactionId can't be updated to $transactionStatus" + NamedParamStatement(updateTransactionStatusSql, connection).use { stmt -> + stmt.setString("transactionId", transactionId) + stmt.setString("newStatus", transactionStatus.value) + stmt.setString("existingStatus", transactionStatus.value) + stmt.setInstant("updatedAt", timestamp) + + // Update status. Update ignored unless: UNVERIFIED -> * | VERIFIED -> VERIFIED | INVALID -> INVALID + val rowsUpdated = stmt.executeUpdate() + check(rowsUpdated == 1 || transactionStatus == TransactionStatus.UNVERIFIED) { + // VERIFIED -> INVALID or INVALID -> VERIFIED is a system error as verify should always be consistent and deterministic + "Existing status for transaction with ID $transactionId can't be updated to $transactionStatus" + } } } - override fun findSignedGroupParameters(entityManager: EntityManager, hash: String): SignedGroupParameters? { - return entityManager.createNativeQuery(queryProvider.findSignedGroupParameters, Tuple::class.java) - .setParameter("hash", hash) - .resultListAsTuples() - .map { r -> - SignedGroupParameters( - r.get(0) as ByteArray, - SignatureWithKey( - r.get(1) as ByteArray, - r.get(2) as ByteArray - ), - SignatureSpec((r.get(3) as String), null, null) - ) - } - .singleOrNull() + override fun findSignedGroupParameters(connection: Connection, hash: String): SignedGroupParameters? { + NamedParamStatement(findSignedGroupParametersSql, connection).use { stmt -> + stmt.setString("hash", hash) + + return stmt + .executeQueryAsList { r -> + SignedGroupParameters( + r.getBytes(1), + SignatureWithKey( + r.getBytes(2), + r.getBytes(3) + ), + SignatureSpec(r.getString(4), null, null) + ) + } + .singleOrNull() + } } override fun persistSignedGroupParameters( - entityManager: EntityManager, + connection: Connection, hash: String, signedGroupParameters: SignedGroupParameters, - timestamp: Instant + timestamp: Instant, ) { - entityManager.createNativeQuery(queryProvider.persistSignedGroupParameters) - .setParameter("hash", hash) - .setParameter("parameters", signedGroupParameters.groupParameters) - .setParameter("signature_public_key", signedGroupParameters.mgmSignature.publicKey) - .setParameter("signature_content", signedGroupParameters.mgmSignature.bytes) - .setParameter("signature_spec", signedGroupParameters.mgmSignatureSpec.signatureName) - .setParameter("createdAt", timestamp) - .executeUpdate() - .logResult("signed group parameters [$hash]") - } - - override fun persistMerkleProofs(entityManager: EntityManager, merkleProofs: List) { - entityManager.connection { connection -> - batchPersistenceService.persistBatch( - connection, - queryProvider.persistMerkleProofs, - merkleProofs - ) { statement, parameterIndex, merkleProof -> - statement.setString(parameterIndex.next(), merkleProof.merkleProofId) - statement.setString(parameterIndex.next(), merkleProof.transactionId) - statement.setInt(parameterIndex.next(), merkleProof.groupIndex) - statement.setInt(parameterIndex.next(), merkleProof.treeSize) - statement.setString(parameterIndex.next(), merkleProof.leafIndexes.joinToString(",")) - statement.setString(parameterIndex.next(), merkleProof.leafHashes.joinToString(",")) - } + NamedParamStatement(persistSignedGroupParametersSql, connection).use { stmt -> + stmt.setString("hash", hash) + stmt.setBytes("parameters", signedGroupParameters.groupParameters) + stmt.setBytes("signature_public_key", signedGroupParameters.mgmSignature.publicKey) + stmt.setBytes("signature_content", signedGroupParameters.mgmSignature.bytes) + stmt.setString("signature_spec", signedGroupParameters.mgmSignatureSpec.signatureName) + stmt.setInstant("createdAt", timestamp) + stmt.executeUpdate().logResult("signed group parameters [$hash]") } } - override fun persistMerkleProofLeaves(entityManager: EntityManager, leaves: List) { - entityManager.connection { connection -> - batchPersistenceService.persistBatch( - connection, - queryProvider.persistMerkleProofLeaves, - leaves - ) { statement, parameterIndex, leaf -> - statement.setString(parameterIndex.next(), leaf.merkleProofId) - statement.setInt(parameterIndex.next(), leaf.leafIndex) - } + override fun persistMerkleProofs(connection: Connection, merkleProofs: List) { + batchPersistenceService.persistBatch( + connection, + queryProvider.persistMerkleProofs, + merkleProofs + ) { statement, parameterIndex, merkleProof -> + statement.setString(parameterIndex.next(), merkleProof.merkleProofId) + statement.setString(parameterIndex.next(), merkleProof.transactionId) + statement.setInt(parameterIndex.next(), merkleProof.groupIndex) + statement.setInt(parameterIndex.next(), merkleProof.treeSize) + statement.setString(parameterIndex.next(), merkleProof.leafIndexes.joinToString(",")) + statement.setString(parameterIndex.next(), merkleProof.leafHashes.joinToString(",")) + } + } + + override fun persistMerkleProofLeaves(connection: Connection, leaves: List) { + batchPersistenceService.persistBatch( + connection, + queryProvider.persistMerkleProofLeaves, + leaves + ) { statement, parameterIndex, leaf -> + statement.setString(parameterIndex.next(), leaf.merkleProofId) + statement.setInt(parameterIndex.next(), leaf.leafIndex) } } override fun findMerkleProofs( - entityManager: EntityManager, - transactionIds: List + connection: Connection, + transactionIds: List, ): Map> { - return entityManager.createNativeQuery(queryProvider.findMerkleProofs, Tuple::class.java) - .setParameter("transactionIds", transactionIds) - .resultListAsTuples() - .groupBy { tuple -> - // We'll have multiple rows for the same Merkle proof if it revealed more than one leaf - // We group the rows by the Merkle proof ID to see which are the ones that belong together - tuple.get(0) as String // Merkle Proof ID - }.map { (_, rows) -> - // We can retrieve most of the properties from the first row because they will be the same for each row - val firstRow = rows.first() - - MerkleProofDto( - firstRow.get(1) as String, // Transaction ID - firstRow.get(2) as Int, // Group index - firstRow.get(3) as Int, // Tree size - - // We store the hashes as a comma separated string, so we need to split it and parse into SecureHash - // we filter out the blank ones just in case - (firstRow.get(5) as String).split(",") - .filter { it.isNotBlank() }.map { parseSecureHash(it) }, - - firstRow.get(6) as ByteArray, // Privacy salt - - // Each leaf will have its own row, so we need to go through each row that belongs to the Merkle proof - rows.mapNotNull { - // Map the leaf index to the data we fetched from the component table - val leafIndex = it.get(7) as? Int - val leafData = it.get(8) as? ByteArray - - if (leafIndex != null && leafData != null) { - leafIndex to leafData - } else { - null - } - }.toMap(), - // We store the leaf indexes as a comma separated string, so we need to split it - (firstRow.get(4) as String).split(",").map { it.toInt() }, - - ) - }.groupBy { - // Group by transaction ID - it.transactionId - } + NamedParamStatement(findMerkleProofsSql, connection).use { stmt -> + stmt.setStrings("transactionIds", transactionIds) + return stmt.executeQueryAsListOfColumns() + .groupBy { tuple -> + // We'll have multiple rows for the same Merkle proof if it revealed more than one leaf + // We group the rows by the Merkle proof ID to see which are the ones that belong together + tuple[0] as String // Merkle Proof ID + }.map { (_, rows) -> + // We can retrieve most of the properties from the first row because they will be the same for each row + val firstRow = rows.first() + + MerkleProofDto( + firstRow[1] as String, // Transaction ID + firstRow[2] as Int, // Group index + firstRow[3] as Int, // Tree size + + // We store the hashes as a comma separated string, so we need to split it and parse into SecureHash + // we filter out the blank ones just in case + (firstRow[5] as String).split(",") + .filter { it.isNotBlank() }.map { parseSecureHash(it) }, + + firstRow[6] as ByteArray, // Privacy salt + + // Each leaf will have its own row, so we need to go through each row that belongs to the Merkle proof + rows.mapNotNull { + // Map the leaf index to the data we fetched from the component table + val leafIndex = it[7] as? Int + val leafData = it[8] as? ByteArray + + if (leafIndex != null && leafData != null) { + leafIndex to leafData + } else { + null + } + }.toMap(), + // We store the leaf indexes as a comma separated string, so we need to split it + (firstRow[4] as String).split(",").map { it.toInt() }, + + ) + }.groupBy { + // Group by transaction ID + it.transactionId + } + } } override fun findFilteredTransactions( - entityManager: EntityManager, - ids: List + connection: Connection, + ids: List, ): Map { - val privacySaltAndMetadataMap = findTransactionsPrivacySaltAndMetadata(entityManager, ids) - val merkleProofs = findMerkleProofs(entityManager, ids) - val signaturesMap = ids.associateWith { findTransactionSignatures(entityManager, it) } + val privacySaltAndMetadataMap = findTransactionsPrivacySaltAndMetadata(connection, ids) + val merkleProofs = findMerkleProofs(connection, ids) + val signaturesMap = ids.associateWith { findTransactionSignatures(connection, it) } return ids.associateWith { transactionId -> @@ -556,69 +599,67 @@ class UtxoRepositoryImpl( // select from transaction sources where input states of "previous" transaction are seen as sourceTransactionIds + indexes override fun findConsumedTransactionSourcesForTransaction( - entityManager: EntityManager, + connection: Connection, transactionId: String, - indexes: List + indexes: List, ): List { - return entityManager.createNativeQuery(queryProvider.findConsumedTransactionSourcesForTransaction) - .setParameter("transactionId", transactionId) - .setParameter("inputStateIndexes", indexes) - .resultList - .map { it as Int } + NamedParamStatement(findConsumedTransactionSourcesForTransactionSql, connection).use { stmt -> + stmt.setString("transactionId", transactionId) + stmt.setInts("inputStateIndexes", indexes) + + return stmt.executeQueryAsList { + it.getInt(1) + } + } } override fun findTransactionsWithStatusCreatedBetweenTime( - entityManager: EntityManager, + connection: Connection, status: TransactionStatus, from: Instant, until: Instant, limit: Int, ): List { - @Suppress("UNCHECKED_CAST") - return entityManager.createNativeQuery(queryProvider.findTransactionsWithStatusCreatedBetweenTime) - .setParameter("status", status.value) - .setParameter("from", from) - .setParameter("until", until) - .setMaxResults(limit) - .resultList as List + NamedParamStatement(findTransactionsWithStatusCreatedBetweenTimeSql, connection).use { stmt -> + stmt.setString("status", status.value) + stmt.setInstant("from", from) + stmt.setInstant("until", until) + stmt.setInt("limit", limit) + + return stmt.executeQueryAsList { + it.getString(1) + } + } } override fun incrementTransactionRepairAttemptCount( - entityManager: EntityManager, - id: String + connection: Connection, + id: String, ) { - entityManager.createNativeQuery(queryProvider.incrementRepairAttemptCount) - .setParameter("transactionId", id) - .executeUpdate() + NamedParamStatement(incrementRepairAttemptCountSql, connection).use { stmt -> + stmt.setString("transactionId", id) + stmt.executeUpdate() + } } - override fun stateRefsExist(entityManager: EntityManager, stateRefs: List): List> { + override fun stateRefsExist(connection: Connection, stateRefs: List): List> { val results = mutableListOf>() if (stateRefs.isNotEmpty()) { - entityManager.connection { connection -> - connection.prepareStatement(queryProvider.stateRefsExist(stateRefs.size)).use { statement -> - val parameterIndex = generateSequence(1) { it + 1 }.iterator() - for (stateRef in stateRefs) { - statement.setString(parameterIndex.next(), stateRef.transactionId.toString()) - statement.setInt(parameterIndex.next(), stateRef.index) - } - val resultSet = statement.executeQuery() - while (resultSet.next()) { - results += resultSet.getString(1) to resultSet.getInt(2) - } + connection.prepareStatement(queryProvider.stateRefsExist(stateRefs.size)).use { statement -> + val parameterIndex = generateSequence(1) { it + 1 }.iterator() + for (stateRef in stateRefs) { + statement.setString(parameterIndex.next(), stateRef.transactionId.toString()) + statement.setInt(parameterIndex.next(), stateRef.index) + } + val resultSet = statement.executeQuery() + while (resultSet.next()) { + results += resultSet.getString(1) to resultSet.getInt(2) } } } return results } - private fun EntityManager.connection(block: (connection: Connection) -> T) { - val hibernateSession = unwrap(Session::class.java) - hibernateSession.doWork { connection -> - block(connection) - } - } - private fun Int.logResult(entity: String): Int { if (this == 0) { logger.debug { @@ -630,16 +671,4 @@ class UtxoRepositoryImpl( @Suppress("UNCHECKED_CAST") private fun Query.resultListAsTuples() = resultList as List - - private fun Query.mapToUtxoVisibleTransactionOutputDto(): List { - return resultListAsTuples() - .map { t -> - UtxoVisibleTransactionOutputDto( - t[0] as String, // transactionId - t[1] as Int, // leaf ID - t[2] as ByteArray, // outputs info data - t[3] as ByteArray // outputs data - ) - } - } } diff --git a/libs/ledger-lib-persistence/src/test/kotlin/net/corda/ledger/libs/persistence/util/NamedParamQueryTest.kt b/libs/ledger-lib-persistence/src/test/kotlin/net/corda/ledger/libs/persistence/util/NamedParamQueryTest.kt new file mode 100644 index 00000000000..2b219b62ff8 --- /dev/null +++ b/libs/ledger-lib-persistence/src/test/kotlin/net/corda/ledger/libs/persistence/util/NamedParamQueryTest.kt @@ -0,0 +1,77 @@ +package net.corda.ledger.libs.persistence.util + +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.SoftAssertions.assertSoftly +import org.junit.jupiter.api.Test + +class NamedParamQueryTest { + @Test + fun `when no placeholders return`() { + val sql = "SELECT * FROM foo" + + val query = NamedParamQuery.from(sql) + + assertThat(query.sql).isEqualTo(sql) + } + + @Test + fun `when multiple with space placeholders return`() { + val sql = """ + UPDATE foo + SET bar= :bar WHERE id = :id) + RETURNING * + """.trimIndent() + + val query = NamedParamQuery.from(sql) + + assertSoftly { + it.assertThat(query.sql).isEqualTo( + """ + UPDATE foo + SET bar= ? WHERE id = ?) + RETURNING * + """.trimIndent() + ) + it.assertThat(query.fields.size).isEqualTo(2) + it.assertThat(query.fields["bar"]).isEqualTo(1) + it.assertThat(query.fields["id"]).isEqualTo(2) + } + } + + @Test + fun `when multiple with comma placeholders return`() { + val sql = """ + INSERT INTO foo(id, bar) + VALUES (:id, :bar) + RETURNING * + """.trimIndent() + + val query = NamedParamQuery.from(sql) + + assertSoftly { + it.assertThat(query.sql).isEqualTo( + """ + INSERT INTO foo(id, bar) + VALUES (?, ?) + RETURNING * + """.trimIndent() + ) + it.assertThat(query.fields.size).isEqualTo(2) + it.assertThat(query.fields["id"]).isEqualTo(1) + it.assertThat(query.fields["bar"]).isEqualTo(2) + } + } + + @Test + fun `when multiple with placeholder last return`() { + val sql = "SELECT * FROM foo WHERE id = :id" + + val query = NamedParamQuery.from(sql) + + assertSoftly { + it.assertThat(query.sql).isEqualTo("SELECT * FROM foo WHERE id = ?") + it.assertThat(query.fields.size).isEqualTo(1) + it.assertThat(query.fields["id"]).isEqualTo(1) + } + } +} diff --git a/testing/ledger/ledger-hsqldb/src/main/kotlin/net/corda/testing/ledger/utxo/HsqldbUtxoQueryProvider.kt b/testing/ledger/ledger-hsqldb/src/main/kotlin/net/corda/testing/ledger/utxo/HsqldbUtxoQueryProvider.kt index 0644423dd5d..9bd51eb9723 100644 --- a/testing/ledger/ledger-hsqldb/src/main/kotlin/net/corda/testing/ledger/utxo/HsqldbUtxoQueryProvider.kt +++ b/testing/ledger/ledger-hsqldb/src/main/kotlin/net/corda/testing/ledger/utxo/HsqldbUtxoQueryProvider.kt @@ -21,9 +21,14 @@ class HsqldbUtxoQueryProvider @Activate constructor( LoggerFactory.getLogger(this::class.java).debug { "Activated for ${databaseTypeProvider.databaseType}" } } + override fun wrapInList(placeHolder: String): String { + // HSQL needs to UNNEST the array + return "IN (UNNEST($placeHolder))" + } + override val persistTransaction: String get() = """ - MERGE INTO {h-schema}utxo_transaction AS ut + MERGE INTO utxo_transaction AS ut USING (VALUES :id, CAST(:privacySalt AS VARBINARY(64)), :accountId, CAST(:createdAt AS TIMESTAMP), :status, CAST(:updatedAt AS TIMESTAMP), :metadataHash, FALSE) AS x(id, privacy_salt, account_id, created, status, updated, metadata_hash, is_filtered) ON x.id = ut.id @@ -36,7 +41,7 @@ class HsqldbUtxoQueryProvider @Activate constructor( override val persistUnverifiedTransaction: String get() = """ - MERGE INTO {h-schema}utxo_transaction AS ut + MERGE INTO utxo_transaction AS ut USING (VALUES :id, CAST(:privacySalt AS VARBINARY(64)), :accountId, CAST(:createdAt AS TIMESTAMP), '$UNVERIFIED', CAST(:updatedAt AS TIMESTAMP), :metadataHash, FALSE) AS x(id, privacy_salt, account_id, created, status, updated, metadata_hash, is_filtered) ON x.id = ut.id @@ -68,7 +73,7 @@ class HsqldbUtxoQueryProvider @Activate constructor( override val persistTransactionMetadata: String get() = """ - MERGE INTO {h-schema}utxo_transaction_metadata AS m + MERGE INTO utxo_transaction_metadata AS m USING (VALUES :hash, CAST(:canonicalData AS VARBINARY(1048576)), :groupParametersHash, :cpiFileChecksum) AS x(hash, canonical_data, group_parameters_hash, cpi_file_checksum) ON x.hash = m.hash @@ -150,7 +155,7 @@ class HsqldbUtxoQueryProvider @Activate constructor( override val persistSignedGroupParameters: String get() = """ - MERGE INTO {h-schema}utxo_group_parameters AS ugp + MERGE INTO utxo_group_parameters AS ugp USING (VALUES :hash, CAST(:parameters AS VARBINARY(1048576)), CAST(:signature_public_key AS VARBINARY(1048576)), CAST(:signature_content AS VARBINARY(1048576)), :signature_spec, CAST(:createdAt AS TIMESTAMP))