-
Notifications
You must be signed in to change notification settings - Fork 696
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Inner suspended transaction doesn't roll back when outer transaction throws #1586
Comments
It's somewhere in the documentation. The reason is when creating a suspendedTransaction, it starts a transaction as a separate new one. So it really doesn't do anything if you start a txn inside another txn. It always will be a separate one. So in this case, the child's txn will still be complete while the parent gets rollback.
Edit: As @AlexeySoshin it's not an issue. Still, you can use JDBC with coroutines, but the downside is it blocks the thread till the SQL server returns the result. I did some digging and posted about that in the comments down below. You can have a look if you are interested. Sorry for the confusion. |
@pablo-silverback Could you try the following code, please, and tell me if this is the behavior you were expecting? newSuspendedTransaction(Dispatchers.Default, db) {
assertNoRows()
ExampleTable.add("outer")
suspendedTransaction {
ExampleTable.add("inner")
}
throw Exception("outer transaction throws")
} |
@AlexeySoshin thanks! that fixes the test case. The problem is that our code is not that simple and it looks more like this: newSuspendedTransaction(Dispatchers.Default, db) {
assertNoRows()
ExampleTable.add("outer")
insertInner()
throw Exception("outer transaction throws")
}
private fun insertInner() {
suspendedTransaction { // -> obviously doesn't compile
ExampleTable.add("inner")
}
} That's not an issue with |
@AlexeySoshin Also, if it's a known issue like @thamidu points out, I'm happy to work on a fix for it (with some pointers of where to look). We rely heavily on exposed and would love to help improve it 🤗 (instead of work around it) |
Thank you for the proposal, and contributions would be more than welcome, but I disagree with @thamidu that it's an issue at all. The reason the above code doesn't compile is that:
The following code compiles and works as expected: private suspend fun Transaction.insertInner() {
suspendedTransaction { // -> compiles
ExampleTable.add("inner")
}
} Hope that helps. |
@AlexeySoshin hi 👋 suspend fun A() {
newSuspendedTransaction(Dispatchers.Default, db) {
assertNoRows()
ExampleTable.add("outer")
insertInner()
}
}
private suspend fun Transaction.insertInner() {
suspendedTransaction {
ExampleTable.add("inner")
}
} And then let's say you have somewhere, in another class/module/package/etc: suspend fun B() {
newSuspendedTransaction(Dispatchers.Default, db) {
A()
... // more code
throw Exception("outer outer transaction throws")
}
} In this case I think there are ways of structuring the code to try to work-around this but wouldn't it be nice for Exposed to notice that if the transaction is inside another one it has to treat it as the extension case? |
@AlexeySoshin We have scenarios where My question then would be, what's the recommended way of dealing with this scenario (suspended tx calls that may or may not be nested)? |
I had this scenario and created an utility function which to solve it: suspend fun <T> inTransaction(
transaction: Transaction? = null,
context: CoroutineDispatcher = Dispatchers.IO,
db: Database? = null,
transactionIsolation: Int? = null,
statement: suspend (Transaction) -> T
): T = transaction
?.suspendedTransaction(
context = context,
statement = { statement(this) }
)
?: newSuspendedTransaction(
context = context,
db = db,
transactionIsolation = transactionIsolation,
statement = { statement(this) }
) |
@leoneparise thanks! using that definition of try {
inTransaction(context = Dispatchers.Default, db = db) {
assertNoRows()
ExampleTable.add("outer")
inTransaction(context = Dispatchers.Default, db = db) {
ExampleTable.add("inner")
}
throw Exception("outer transaction throws")
}
} catch (e: Exception) {
transaction(db) {
Assertions.assertEquals("inner", ExampleTable.getAll().single())
}
} I assume the second call it's expected to be Note this is not required with the ordinary |
I did some dig up on how those things works. So I recreated all of the above codes and compared them with the SQL query log of my MariaDB Server. Those are my findings. Specs:MariaDB - 10.2.11
Case 01 (Initial question)Code - suspend fun testNested(): Nothing {
newSuspendedTransaction {
Students.selectAll().count()
newSuspendedTransaction {
Students.update({Students.id eq 1}) { st -> st[Students.status] = 0 }
}
throw Exception()
}
} MariaDB Log -
Conclusion - As I mentioned in my previous comment, when you call (create) Case 2 (@AlexeySoshin Solution)Code - suspend fun testNested(): Nothing {
newSuspendedTransaction {
Students.selectAll().count()
suspendedTransaction {
Students.update({Students.id eq 1}) { st -> st[Students.status] = 0 }
}
throw Exception()
}
} MariaDB Log -
Conclusion - This is the way. When you declare a Case 3 (@leoneparise solution)This also output the same as Case 2, but with an exception. You need to pass the parent's txn to the child.
My implementationI also have this same scenario just like yours. In my project, I also have some functions which calls directly and inside another function. So here is my implementation which also fixes the issue you raised and does the same function as Case 2 and 3 suspend fun <T> susTxn(
db: Database? = null,
statement: suspend (Transaction) -> T
): T = withContext(Dispatchers.IO){
TransactionManager.currentOrNull()
?.let { it.suspendedTransaction { statement(this) } }
?: newSuspendedTransaction(db = db) { statement(this) }
} In this case, you don't have to pass parent's transaction or whats or ever. You can use your functions as a standalone or inside another transaction. Please note that this implementation has not been tested. So please, use it with caution. I am not sure about how it will behave under many requests concurrently. But currently, we use this method in our development builds (not released to production yet) to fetch/persist data. No issue so far. Test Case (For the solution above with parrarel requests)Code - // KTor Route
get("/test") {
testNested()
call.respond(HttpStatusCode.OK)
}
suspend fun testNested() = susTxn {
Students.selectAll().count()
innerTxn()
throw Exception("Exception occurred")
null
}
// Here, the innerTxn can be called separately too.
suspend fun innerTxn() = susTxn {
Students.update({Students.id eq 1}) { st -> st[Students.status] = 0 }
} # Will send 10 concurrent requests
xargs -I % -P 10 curl -X GET -v http://localhost:8080/test < <(printf '%s\n' {1..10}) MariaDB Log -
MariaDB Log (sorted by thread ID) -
Conclusion - As you can see, all of those 10 requests got processed and got rolledbacked. But I'm little bit skeptical about some threads (aka DB Connections) executed multiple transactions. Im not sure, whether Hikari reused those connections once a transaction is processed / or queries got mixed up. But my bet is Hikari reused those connections. Hope this helps you to get a basic understanding and solves your issue. Please let me know any corrections to be made. |
That's a great analysis, @thamidu, thanks for that! |
@thamidu thanks for the deep analysis, unfortunately that approach relies on ThreadLocal, which is not AFAIK safe between kotlin coroutines. For something like this to be safe I believe exposed should take into consideration something like the context pattern (also used in Go's goroutines). //cc @AlexeySoshin |
Hi! Could anyone clarify, why nested suspending transactions code (as recommended by @AlexeySoshin) newSuspendedTransaction {
exec("SELECT 1")
suspendedTransaction {
exec("SELECT 2")
}
} has different behavior in comparison with regular transactions? transaction {
exec("SELECT 1")
transaction {
exec("SELECT 2")
}
} With suspended transactions only one transaction is actually created and reused for both statements. But with regular transactions two transactions are created, one having another as outerTransaction. |
@pablo-silverback Kotlin coroutines support context. You can see some examples there: |
I wanted to have a similar behavior using regular suspendedTransaction {
suspendedTransaction {
insert(1)
}
suspendedTransaction {
insert(2)
}
throw Error("Both should roll back")
} rolls back both My actual use code are composition of use cases so if a later one fails it also rolls back the changes of previous ones. This is the utility function I am using using the public APIs, it seems to work well with the clear limitation that there is still only one transaction at the root, so changing the DB or the isolation level is not possible currently. import kotlinx.coroutines.Dispatchers
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.Transaction
import org.jetbrains.exposed.sql.exposedLogger
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.jetbrains.exposed.sql.transactions.experimental.suspendedTransactionAsync
import org.jetbrains.exposed.sql.transactions.experimental.withSuspendTransaction
import kotlin.coroutines.CoroutineContext
suspend fun <T> suspendedTransaction(
context: CoroutineContext? = null,
db: Database? = null,
transactionIsolation: Int? = null,
statement: suspend Transaction.() -> T
) {
val existing = TransactionManager.currentOrNull()
if (existing == null) {
// Create a new transaction, it will propagate the actual tx reference
// using coroutine ThreadContextElements into the TransactionManager if this method is nested
suspendedTransactionAsync(
context = context,
db = db,
transactionIsolation = transactionIsolation,
statement = statement
).await()
} else {
if (transactionIsolation != null || db != null) {
exposedLogger.warn(
"transactionIsolation=${transactionIsolation}, db=${db} " +
"was provided but will " +
"be ignored since this transaction was propagated."
)
}
existing.withSuspendTransaction(context = context, statement = statement)
}
} |
@pablo-silverback regarding this statement:
I am not sure you are entirely correct, "TransactionManager.currentOrNull()" relies on thread local, but Expose synchronizes the ThreadLocal state (take a look at Line 36 in 123bba9
|
In a context of suspended parent/child transactions, when there's an error after the child block ended the transaction doesn't roll back.
Pseudocode:
In this scenario,
B
is inserted on the database (transaction doesn't roll back).Note that replacing
newSuspendedTransaction
withtransaction
(not suspended) works as expected.Here's a complete working test that illustrates the issue:
The text was updated successfully, but these errors were encountered: