-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTestErrorOnNextIterator.kt
98 lines (83 loc) · 3.77 KB
/
TestErrorOnNextIterator.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import com.logan.accumulo.example.ErrorOnNextIterator
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.accumulo.core.client.BatchWriterConfig
import org.apache.accumulo.core.client.IteratorSetting
import org.apache.accumulo.core.client.admin.CompactionConfig
import org.apache.accumulo.core.data.Mutation
import org.apache.accumulo.core.data.Value
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.fail
import org.slf4j.LoggerFactory
/**
 * Integration test that attaches [ErrorOnNextIterator] to a table and checks how
 * compactions behave once that iterator is in place.
 *
 * The test drives a cluster through [MacUtils] and observes FATE transactions
 * through [FateHelper]; both are project helpers defined outside this file.
 */
class TestErrorOnNextIterator {
    private val logger = LoggerFactory.getLogger(TestErrorOnNextIterator::class.java)

    /**
     * Scenario:
     * 1. Create a table, write a single row and run a normal compaction, which must complete.
     * 2. Attach [ErrorOnNextIterator] and compact again; this compaction is expected to
     *    never complete, leaving exactly one running transaction.
     * 3. Cancel the compaction and verify the stuck transaction goes away.
     *
     * NOTE(review): the two [pause] calls each block for the default 5 minutes so a human
     * can inspect ZooKeeper; they make this test slow by design.
     */
    @Test
    fun testIteratorFailures() {
        MacUtils.startCluster()
        // Stop the cluster even when an assertion fails or an exception escapes,
        // otherwise a failed run leaves the cluster processes behind.
        try {
            val tableName = "error_on_next_table"
            val conn = MacUtils.connector
            val fateHelper = FateHelper(conn)

            logger.info("Creating $tableName")
            val tableOps = conn.tableOperations()
            tableOps.create(tableName)

            logger.info("Writing data to $tableName...")
            val m = Mutation("row1")
            m.put("cf", "cq", Value("value1"))
            val writerConfig = BatchWriterConfig()
                .setMaxMemory(1_000_000)
                .setMaxLatency(1_000, TimeUnit.MILLISECONDS)
                .setMaxWriteThreads(10)
            val writer = conn.createBatchWriter(tableName, writerConfig)
            // Explicit try/finally instead of `use { }` so this does not depend on
            // BatchWriter implementing AutoCloseable in the Accumulo version in use.
            try {
                writer.addMutation(m)
                writer.flush()
            } finally {
                writer.close()
            }

            // Whole-table compaction (no row bounds); `wait = false` makes compact() return
            // immediately so completion is observed through FateHelper instead.
            val compactConfig = CompactionConfig().apply {
                startRow = null
                endRow = null
                flush = true
                wait = false
            }

            logger.info("Running a normal compaction on $tableName")
            tableOps.compact(tableName, compactConfig)
            if (!fateHelper.waitForAllTransactionsToComplete().first) {
                fail("Initial compaction on $tableName never completed")
            }
            logger.info("Compaction completed successfully as expected")

            // Add the ErrorOnNextIterator to the table.
            logger.info("Attaching the ErrorOnNextIterator to $tableName")
            val nextIteratorSetting = IteratorSetting(10, ErrorOnNextIterator::class.java)
            tableOps.attachIterator(tableName, nextIteratorSetting)
            if (!fateHelper.waitForAllTransactionsToComplete().first) {
                fail("Failed to attached iterator to $tableName")
            }
            logger.info("Iterator is attached!")

            logger.info("Running the compaction again on $tableName")
            tableOps.compact(tableName, compactConfig)
            val (allCompleted, completedTxIds, runningTxIds) = fateHelper.waitForAllTransactionsToComplete()
            if (allCompleted) {
                fail("Somehow the compaction with the iterator that errors completed.")
            }
            // Exactly one transaction (the failing compaction) should still be running.
            assertEquals(0, completedTxIds.size)
            assertEquals(1, runningTxIds.size)
            logger.info("Compaction appears to be in crash loop txid: ${runningTxIds.first()}")

            // Optional, if you want to see what's happening in ZooKeeper at this point
            pause()

            logger.info("Canceling compactions on $tableName")
            tableOps.cancelCompaction(tableName)
            val (compactionCompleted, _, _) = fateHelper.waitForTransactionsToComplete(runningTxIds)
            assertTrue(compactionCompleted, "Compaction was not cleaned up after cancel")

            // Optional, if you want to see what's happening in ZooKeeper at this point
            pause()
        } finally {
            MacUtils.stopCluster()
        }
    }

    /**
     * Blocks the calling thread for [numMinutes] minutes so the cluster state can be
     * inspected by hand.
     *
     * @param numMinutes how long to wait, in minutes; defaults to 5.
     */
    private fun pause(numMinutes: Int = 5) {
        logger.info("Pausing for $numMinutes minutes. Have a look in ZooKeeper")
        // Convert in Long arithmetic: multiplying as Int would overflow for large values.
        val numMillis = TimeUnit.MINUTES.toMillis(numMinutes.toLong())
        runBlocking {
            delay(numMillis)
        }
    }
}