diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 103d8a7cf7ea..f18aa3434b5f 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -526,6 +526,8 @@ public static class SSTableConfig
     public volatile int tombstone_warn_threshold = 1000;
     public volatile int tombstone_failure_threshold = 100000;
 
+    public TombstonesMetricGranularity purgeable_tombstones_metric_granularity = TombstonesMetricGranularity.row;
+
     public final ReplicaFilteringProtectionOptions replica_filtering_protection = new ReplicaFilteringProtectionOptions();
 
     @Replaces(oldName = "index_summary_capacity_in_mb", converter = Converters.MEBIBYTES_DATA_STORAGE_LONG, deprecated = true)
@@ -1312,6 +1314,25 @@ public enum BatchlogEndpointStrategy
         }
     }
 
+    public enum TombstonesMetricGranularity
+    {
+        /**
+         * do not collect the metric at all
+         */
+        disabled,
+        /**
+         * track only partition/range/row level tombstones,
+         * a good compromise between overhead and usability
+         */
+        row,
+        /**
+         * track partition/range/row/cell level tombstones,
+         * the most granular option, but it has some performance overhead
+         * due to iteration over cells
+         */
+        cell
+    }
+
     private static final Set<String> SENSITIVE_KEYS = new HashSet<String>() {{
         add("client_encryption_options");
         add("server_encryption_options");
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index bf44b53e2b0d..33142c581ef6 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -5194,4 +5194,15 @@ public static void setRejectOutOfTokenRangeRequests(boolean enabled)
     {
         conf.reject_out_of_token_range_requests = enabled;
     }
+
+    public static Config.TombstonesMetricGranularity getPurgeableTombstonesMetricGranularity()
+    {
+        return conf.purgeable_tombstones_metric_granularity;
+    }
+
+    @VisibleForTesting
+    public static void setPurgeableTombstonesMetricGranularity(Config.TombstonesMetricGranularity granularity)
+    {
+        conf.purgeable_tombstones_metric_granularity = granularity;
+    }
 }
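The new field is read from cassandra.yaml through the usual name-based mapping of yaml keys onto Config fields, so (assuming no dedicated converter is registered for it) the most detailed tracking would be enabled with a line such as "purgeable_tombstones_metric_granularity: cell"; the accepted values are disabled, row and cell, with row as the default.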
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index a61d0a037ef7..6bccc32b08a8 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -455,6 +455,8 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut
         iterator = withQuerySizeTracking(iterator);
         iterator = maybeSlowDownForTesting(iterator);
         iterator = withQueryCancellation(iterator);
+        if (DatabaseDescriptor.getPurgeableTombstonesMetricGranularity() != Config.TombstonesMetricGranularity.disabled)
+            iterator = withPurgeableTombstonesMetricRecording(iterator, cfs);
         iterator = RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs, executionController), Stage.PURGED, false);
         iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos);
 
@@ -867,6 +869,134 @@ protected LongPredicate getPurgeEvaluator()
         return Transformation.apply(iterator, new WithoutPurgeableTombstones());
     }
 
+    /**
+     * Wraps the provided iterator so that metrics on the number of purgeable tombstones are tracked and traced.
+     * It only counts tombstones with localDeletionTime < now - gc_grace_seconds.
+     * Other (non-purgeable) tombstones are tracked later by the regular metrics-recording logic.
+     */
+    private UnfilteredPartitionIterator withPurgeableTombstonesMetricRecording(UnfilteredPartitionIterator iter,
+                                                                               ColumnFamilyStore cfs)
+    {
+        class PurgeableTombstonesMetricRecording extends Transformation<UnfilteredRowIterator>
+        {
+            private int purgeableTombstones = 0;
+
+            @Override
+            public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
+            {
+                if (!iter.partitionLevelDeletion().isLive())
+                    purgeableTombstones++;
+                return Transformation.apply(iter, this);
+            }
+
+            @Override
+            public Row applyToStatic(Row row)
+            {
+                return applyToRow(row);
+            }
+
+            @Override
+            public Row applyToRow(Row row)
+            {
+                final long nowInSec = nowInSec();
+                boolean hasTombstones = false;
+
+                if (DatabaseDescriptor.getPurgeableTombstonesMetricGranularity() == Config.TombstonesMetricGranularity.cell)
+                    for (Cell<?> cell : row.cells())
+                    {
+                        if (!cell.isLive(nowInSec) && isPurgeable(cell.localDeletionTime(), nowInSec))
+                        {
+                            purgeableTombstones++;
+                            hasTombstones = true; // avoids counting an extra tombstone if the whole row has expired
+                        }
+                    }
+
+                // we replicate the logic used for the non-purgeable tombstones metric here
+                if (!row.primaryKeyLivenessInfo().isLive(nowInSec)
+                    && row.hasDeletion(nowInSec)
+                    && isPurgeable(row.deletion().time(), nowInSec)
+                    && !hasTombstones)
+                {
+                    // We're counting primary key deletions only here.
+                    purgeableTombstones++;
+                }
+
+                return row;
+            }
+
+            @Override
+            public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+            {
+                final long nowInSec = nowInSec();
+
+                // for boundary markers, increment the metric only if both the close and the open markers are purgeable
+                if (marker.isBoundary())
+                {
+                    countIfBothPurgeable(marker.closeDeletionTime(false),
+                                         marker.openDeletionTime(false),
+                                         nowInSec);
+                }
+                // for bound markers, increment if the marker is purgeable
+                else
+                {
+                    countIfPurgeable(((RangeTombstoneBoundMarker) marker).deletionTime(), nowInSec);
+                }
+
+                return marker;
+            }
+
+            @Override
+            public void onClose()
+            {
+                cfs.metric.purgeableTombstoneScannedHistogram.update(purgeableTombstones);
+                if (purgeableTombstones > 0)
+                    Tracing.trace("Read {} purgeable tombstone cells", purgeableTombstones);
+            }
+
+            /**
+             * Increments the counter if both the close and the open deletion times are less than (now - gc_grace_seconds)
+             */
+            private void countIfBothPurgeable(DeletionTime closeDeletionTime,
+                                              DeletionTime openDeletionTime,
+                                              long nowInSec)
+            {
+                if (isPurgeable(closeDeletionTime, nowInSec) && isPurgeable(openDeletionTime, nowInSec))
+                    purgeableTombstones++;
+            }
+
+            /**
+             * Increments the counter if the deletion time is less than (now - gc_grace_seconds)
+             */
+            private void countIfPurgeable(DeletionTime deletionTime,
+                                          long nowInSec)
+            {
+                if (isPurgeable(deletionTime, nowInSec))
+                    purgeableTombstones++;
+            }
+
+            /**
+             * Checks that the deletion time is less than (now - gc_grace_seconds)
+             */
+            private boolean isPurgeable(DeletionTime deletionTime,
+                                        long nowInSec)
+            {
+                return isPurgeable(deletionTime.localDeletionTime(), nowInSec);
+            }
+
+            /**
+             * Checks that the deletion time is less than (now - gc_grace_seconds)
+             */
+            private boolean isPurgeable(long localDeletionTime,
+                                        long nowInSec)
+            {
+                return localDeletionTime < cfs.gcBefore(nowInSec);
+            }
+        }
+
+        return Transformation.apply(iter, new PurgeableTombstonesMetricRecording());
+    }
+
     /**
      * Return the queried token(s) for logging
      */
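For reference, the isPurgeable helpers above defer to cfs.gcBefore(nowInSec), which for a table boils down to the current time minus its gc_grace_seconds. A minimal standalone sketch of the same check (gcGraceSeconds is a stand-in for the table's setting, since the real code asks the ColumnFamilyStore):

    // A tombstone counts as "purgeable" once its local deletion time has fallen behind
    // now - gc_grace_seconds, i.e. a compaction would already be allowed to drop it.
    static boolean isPurgeable(long localDeletionTime, long nowInSec, int gcGraceSeconds)
    {
        long gcBefore = nowInSec - gcGraceSeconds; // roughly what cfs.gcBefore(nowInSec) evaluates to
        return localDeletionTime < gcBefore;
    }

This is also why the recording wrapper is applied before withoutPurgeableTombstones() in executeLocally(): it has to observe the tombstones that the purging transformation is about to drop.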
diff --git a/src/java/org/apache/cassandra/db/virtual/TableMetricTables.java b/src/java/org/apache/cassandra/db/virtual/TableMetricTables.java
index 5528c92011cc..e1defe51e69e 100644
--- a/src/java/org/apache/cassandra/db/virtual/TableMetricTables.java
+++ b/src/java/org/apache/cassandra/db/virtual/TableMetricTables.java
@@ -75,6 +75,7 @@ public static Collection<VirtualTable> getAll(String name)
         new LatencyTableMetric(name, "local_write_latency", t -> t.writeLatency.latency),
         new LatencyTableMetric(name, "coordinator_write_latency", t -> t.coordinatorWriteLatency),
         new HistogramTableMetric(name, "tombstones_per_read", t -> t.tombstoneScannedHistogram.cf),
+        new HistogramTableMetric(name, "purgeable_tombstones_per_read", t -> t.purgeableTombstoneScannedHistogram.cf),
         new HistogramTableMetric(name, "rows_per_read", t -> t.liveScannedHistogram.cf),
         new StorageTableMetric(name, "disk_usage", (TableMetrics t) -> t.totalDiskSpaceUsed),
         new StorageTableMetric(name, "max_partition_size", (TableMetrics t) -> t.maxPartitionSize),
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index ae15bbf95a04..984b4aa70496 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -86,6 +86,8 @@ public class KeyspaceMetrics
     public final Histogram sstablesPerRangeReadHistogram;
     /** Tombstones scanned in queries on this Keyspace */
     public final Histogram tombstoneScannedHistogram;
+    /** Purgeable tombstones scanned in queries on this Keyspace */
+    public final Histogram purgeableTombstoneScannedHistogram;
     /** Live cells scanned in queries on this Keyspace */
     public final Histogram liveScannedHistogram;
     /** Column update time delta on this Keyspace */
@@ -238,6 +240,7 @@ public KeyspaceMetrics(final Keyspace ks)
         sstablesPerReadHistogram = createKeyspaceHistogram("SSTablesPerReadHistogram", true);
         sstablesPerRangeReadHistogram = createKeyspaceHistogram("SSTablesPerRangeReadHistogram", true);
         tombstoneScannedHistogram = createKeyspaceHistogram("TombstoneScannedHistogram", false);
+        purgeableTombstoneScannedHistogram = createKeyspaceHistogram("PurgeableTombstoneScannedHistogram", false);
         liveScannedHistogram = createKeyspaceHistogram("LiveScannedHistogram", false);
         colUpdateTimeDeltaHistogram = createKeyspaceHistogram("ColUpdateTimeDeltaHistogram", false);
         viewLockAcquireTime = createKeyspaceTimer("ViewLockAcquireTime");
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index b2a8e61fe4ca..87abb7a8dd55 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -151,6 +151,8 @@ public class TableMetrics
     public final Gauge<Long> compressionMetadataOffHeapMemoryUsed;
     /** Tombstones scanned in queries on this CF */
     public final TableHistogram tombstoneScannedHistogram;
+    /** Purgeable tombstones scanned in queries on this CF */
+    public final TableHistogram purgeableTombstoneScannedHistogram;
     /** Live rows scanned in queries on this CF */
     public final TableHistogram liveScannedHistogram;
     /** Column update time delta on this CF */
@@ -771,6 +773,7 @@ public Long getValue()
         additionalWriteLatencyNanos = createTableGauge("AdditionalWriteLatencyNanos", () -> MICROSECONDS.toNanos(cfs.additionalWriteLatencyMicros));
         tombstoneScannedHistogram = createTableHistogram("TombstoneScannedHistogram", cfs.keyspace.metric.tombstoneScannedHistogram, false);
+        purgeableTombstoneScannedHistogram = createTableHistogram("PurgeableTombstoneScannedHistogram", cfs.keyspace.metric.purgeableTombstoneScannedHistogram, true);
         liveScannedHistogram = createTableHistogram("LiveScannedHistogram", cfs.keyspace.metric.liveScannedHistogram, false);
         colUpdateTimeDeltaHistogram = createTableHistogram("ColUpdateTimeDeltaHistogram", cfs.keyspace.metric.colUpdateTimeDeltaHistogram, false);
         coordinatorReadLatency = createTableTimer("CoordinatorReadLatency");
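Once wired into TableMetrics, the new histogram is exported like every other table metric, so it should be visible both through the system_views virtual table added above (purgeable_tombstones_per_read) and over JMX under the standard per-table object-name pattern. A rough sketch of reading it over JMX (ks and tbl are placeholder keyspace/table names; the attribute names are the usual ones exposed by the Dropwizard JMX reporter):

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class PurgeableTombstoneHistogramReader
    {
        public static void main(String[] args) throws Exception
        {
            // assumes a locally reachable JMX endpoint on the default 7199 port
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url))
            {
                MBeanServerConnection mbs = connector.getMBeanServerConnection();
                ObjectName name = new ObjectName(
                    "org.apache.cassandra.metrics:type=Table,keyspace=ks,scope=tbl,name=PurgeableTombstoneScannedHistogram");
                System.out.println("count = " + mbs.getAttribute(name, "Count"));
                System.out.println("p99   = " + mbs.getAttribute(name, "99thPercentile"));
            }
        }
    }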
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index 3701ae419339..3b5ecbfa9830 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -35,6 +35,7 @@
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
@@ -93,6 +94,9 @@ public class ReadCommandTest
 {
+    private static final String CREATE = "1";
+    private static final String DELETE = "-1";
+
     private static final String KEYSPACE = "ReadCommandTest";
     private static final String CF1 = "Standard1";
     private static final String CF2 = "Standard2";
@@ -103,6 +107,12 @@ public class ReadCommandTest
     private static final String CF7 = "Counter7";
     private static final String CF8 = "Standard8";
     private static final String CF9 = "Standard9";
+    private static final String CF10 = "Standard10";
+    private static final String CF11 = "Standard11";
+    private static final String CF12 = "Standard12";
+    private static final String CF13 = "Standard13";
+    private static final String CF14 = "Standard14";
+
     private static final InetAddressAndPort REPAIR_COORDINATOR;
     static
     {
@@ -194,6 +204,61 @@ public static void defineSchema() throws ConfigurationException
                      .addClusteringColumn("col", ReversedType.getInstance(Int32Type.instance))
                      .addRegularColumn("a", AsciiType.instance);
 
+        TableMetadata.Builder metadata10 =
+            TableMetadata.builder(KEYSPACE, CF10)
+                         .addPartitionKeyColumn("key", BytesType.instance)
+                         .addClusteringColumn("col", AsciiType.instance)
+                         .addRegularColumn("a", AsciiType.instance)
+                         .addRegularColumn("b", AsciiType.instance)
+                         .addRegularColumn("c", AsciiType.instance)
+                         .addRegularColumn("d", AsciiType.instance)
+                         .addRegularColumn("e", AsciiType.instance)
+                         .addRegularColumn("f", AsciiType.instance);
+
+        TableMetadata.Builder metadata11 =
+            TableMetadata.builder(KEYSPACE, CF11)
+                         .addPartitionKeyColumn("key", BytesType.instance)
+                         .addClusteringColumn("col", AsciiType.instance)
+                         .addRegularColumn("a", AsciiType.instance)
+                         .addRegularColumn("b", AsciiType.instance)
+                         .addRegularColumn("c", AsciiType.instance)
+                         .addRegularColumn("d", AsciiType.instance)
+                         .addRegularColumn("e", AsciiType.instance)
+                         .addRegularColumn("f", AsciiType.instance);
+
+        TableMetadata.Builder metadata12 =
+            TableMetadata.builder(KEYSPACE, CF12)
+                         .addPartitionKeyColumn("key", BytesType.instance)
+                         .addClusteringColumn("col", AsciiType.instance)
+                         .addRegularColumn("a", AsciiType.instance)
+                         .addRegularColumn("b", AsciiType.instance)
+                         .addRegularColumn("c", AsciiType.instance)
+                         .addRegularColumn("d", AsciiType.instance)
+                         .addRegularColumn("e", AsciiType.instance)
+                         .addRegularColumn("f", AsciiType.instance);
+
+        TableMetadata.Builder metadata13 =
+            TableMetadata.builder(KEYSPACE, CF13)
+                         .addPartitionKeyColumn("key", BytesType.instance)
+                         .addClusteringColumn("col", AsciiType.instance)
+                         .addRegularColumn("a", AsciiType.instance)
+                         .addRegularColumn("b", AsciiType.instance)
+                         .addRegularColumn("c", AsciiType.instance)
+                         .addRegularColumn("d", AsciiType.instance)
+                         .addRegularColumn("e", AsciiType.instance)
+                         .addRegularColumn("f", AsciiType.instance);
+
.addRegularColumn("d", AsciiType.instance) + .addRegularColumn("e", AsciiType.instance) + .addRegularColumn("f", AsciiType.instance); + + TableMetadata.Builder metadata14 = + TableMetadata.builder(KEYSPACE, CF14) + .addPartitionKeyColumn("key", BytesType.instance) + .addClusteringColumn("col", AsciiType.instance) + .addRegularColumn("a", AsciiType.instance) + .addRegularColumn("b", AsciiType.instance) + .addRegularColumn("c", AsciiType.instance) + .addRegularColumn("d", AsciiType.instance) + .addRegularColumn("e", AsciiType.instance) + .addRegularColumn("f", AsciiType.instance); + SchemaLoader.prepareServer(); SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), @@ -205,7 +270,12 @@ public static void defineSchema() throws ConfigurationException metadata6, metadata7, metadata8, - metadata9); + metadata9, + metadata10, + metadata11, + metadata12, + metadata13, + metadata14); LocalSessionAccessor.startup(); } @@ -332,23 +402,23 @@ public void testSinglePartitionGroupMerge() throws Exception String[][][] groups = new String[][][] { new String[][] { - new String[] { "1", "key1", "aa", "a" }, // "1" indicates to create the data, "-1" to delete the row - new String[] { "1", "key2", "bb", "b" }, - new String[] { "1", "key3", "cc", "c" } + new String[] { CREATE, "key1", "aa", "a" }, + new String[] { CREATE, "key2", "bb", "b" }, + new String[] { CREATE, "key3", "cc", "c" } }, new String[][] { - new String[] { "1", "key3", "dd", "d" }, - new String[] { "1", "key2", "ee", "e" }, - new String[] { "1", "key1", "ff", "f" } + new String[] { CREATE, "key3", "dd", "d" }, + new String[] { CREATE, "key2", "ee", "e" }, + new String[] { CREATE, "key1", "ff", "f" } }, new String[][] { - new String[] { "1", "key6", "aa", "a" }, - new String[] { "1", "key5", "bb", "b" }, - new String[] { "1", "key4", "cc", "c" } + new String[] { CREATE, "key6", "aa", "a" }, + new String[] { CREATE, "key5", "bb", "b" }, + new String[] { CREATE, "key4", "cc", "c" } }, new String[][] { - new String[] { "-1", "key6", "aa", "a" }, - new String[] { "-1", "key2", "bb", "b" } + new String[] { DELETE, "key6", "aa", "a" }, + new String[] { DELETE, "key2", "bb", "b" } } }; @@ -371,7 +441,7 @@ public void testSinglePartitionGroupMerge() throws Exception for (String[] data : group) { - if (data[0].equals("1")) + if (data[0].equals(CREATE)) { new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes(data[1])) .clustering(data[2]) @@ -493,33 +563,32 @@ public void testCountDeletedRows() throws Exception String[][][] groups = new String[][][] { new String[][] { - new String[] { "1", "key1", "aa", "a" }, // "1" indicates to create the data, "-1" to delete the - // row - new String[] { "1", "key2", "bb", "b" }, - new String[] { "1", "key3", "cc", "c" } + new String[] { CREATE, "key1", "aa", "a" }, + new String[] { CREATE, "key2", "bb", "b" }, + new String[] { CREATE, "key3", "cc", "c" } }, new String[][] { - new String[] { "1", "key3", "dd", "d" }, - new String[] { "1", "key2", "ee", "e" }, - new String[] { "1", "key1", "ff", "f" } + new String[] { CREATE, "key3", "dd", "d" }, + new String[] { CREATE, "key2", "ee", "e" }, + new String[] { CREATE, "key1", "ff", "f" } }, new String[][] { - new String[] { "1", "key6", "aa", "a" }, - new String[] { "1", "key5", "bb", "b" }, - new String[] { "1", "key4", "cc", "c" } + new String[] { CREATE, "key6", "aa", "a" }, + new String[] { CREATE, "key5", "bb", "b" }, + new String[] { CREATE, "key4", "cc", "c" } }, new String[][] { - new String[] { "1", "key2", "aa", "a" }, - new String[] { "1", 
"key2", "cc", "c" }, - new String[] { "1", "key2", "dd", "d" } + new String[] { CREATE, "key2", "aa", "a" }, + new String[] { CREATE, "key2", "cc", "c" }, + new String[] { CREATE, "key2", "dd", "d" } }, new String[][] { - new String[] { "-1", "key6", "aa", "a" }, - new String[] { "-1", "key2", "bb", "b" }, - new String[] { "-1", "key2", "ee", "e" }, - new String[] { "-1", "key2", "aa", "a" }, - new String[] { "-1", "key2", "cc", "c" }, - new String[] { "-1", "key2", "dd", "d" } + new String[] { DELETE, "key6", "aa", "a" }, + new String[] { DELETE, "key2", "bb", "b" }, + new String[] { DELETE, "key2", "ee", "e" }, + new String[] { DELETE, "key2", "aa", "a" }, + new String[] { DELETE, "key2", "cc", "c" }, + new String[] { DELETE, "key2", "dd", "d" } } }; @@ -539,7 +608,7 @@ public void testCountDeletedRows() throws Exception for (String[] data : group) { - if (data[0].equals("1")) + if (data[0].equals(CREATE)) { new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes(data[1])) .clustering(data[2]) @@ -582,20 +651,19 @@ public void testCountWithNoDeletedRow() throws Exception String[][][] groups = new String[][][] { new String[][] { - new String[] { "1", "key1", "aa", "a" }, // "1" indicates to create the data, "-1" to delete the - // row - new String[] { "1", "key2", "bb", "b" }, - new String[] { "1", "key3", "cc", "c" } + new String[] { CREATE, "key1", "aa", "a" }, + new String[] { CREATE, "key2", "bb", "b" }, + new String[] { CREATE, "key3", "cc", "c" } }, new String[][] { - new String[] { "1", "key3", "dd", "d" }, - new String[] { "1", "key2", "ee", "e" }, - new String[] { "1", "key1", "ff", "f" } + new String[] { CREATE, "key3", "dd", "d" }, + new String[] { CREATE, "key2", "ee", "e" }, + new String[] { CREATE, "key1", "ff", "f" } }, new String[][] { - new String[] { "1", "key6", "aa", "a" }, - new String[] { "1", "key5", "bb", "b" }, - new String[] { "1", "key4", "cc", "c" } + new String[] { CREATE, "key6", "aa", "a" }, + new String[] { CREATE, "key5", "bb", "b" }, + new String[] { CREATE, "key4", "cc", "c" } } }; @@ -615,7 +683,7 @@ public void testCountWithNoDeletedRow() throws Exception for (String[] data : group) { - if (data[0].equals("1")) + if (data[0].equals(CREATE)) { new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes(data[1])) .clustering(data[2]) @@ -651,6 +719,319 @@ public void testCountWithNoDeletedRow() throws Exception assertEquals(1, cfs.metric.tombstoneScannedHistogram.cf.getSnapshot().getMax()); } + @Test + public void testCountPurgeableRowTombstones() throws Exception + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF10); + TestWriteOperation[] operations = new TestWriteOperation[] + { + TestWriteOperation.insert("key1", "aa", "a"), + TestWriteOperation.insert("key1", "ff", "f"), + + TestWriteOperation.insert("key2", "aa", "e"), + TestWriteOperation.deleteRow("key2", "aa", PURGEABLE_DELETION), + TestWriteOperation.deleteRow("key2", "bb", NEW_DELETION), + TestWriteOperation.deleteRow("key2", "cc", PURGEABLE_DELETION), + TestWriteOperation.deleteRow("key2", "dd", PURGEABLE_DELETION), + TestWriteOperation.deleteRow("key2", "ee", NEW_DELETION), + }; + + runTestWriteOperationsAndReadResults(cfs, operations); + + assertEquals(2, cfs.metric.purgeableTombstoneScannedHistogram.cf.getCount()); + assertEquals(0, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMin()); + assertEquals(3, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax()); + } + @Test + public void 
+    @Test
+    public void testCountPurgeablePartitionTombstones() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF11);
+        TestWriteOperation[] operations = new TestWriteOperation[]
+        {
+            TestWriteOperation.insert("key1", "aa", "a"),
+            TestWriteOperation.insert("key1", "ff", "f"),
+
+            TestWriteOperation.insert("key2", "aa", "a"),
+            TestWriteOperation.insert("key2", "cc", "c"),
+            TestWriteOperation.insert("key2", "dd", "d"),
+
+            TestWriteOperation.deletePartition("key2", PURGEABLE_DELETION),
+            TestWriteOperation.deletePartition("key3", NEW_DELETION)
+        };
+
+        runTestWriteOperationsAndReadResults(cfs, operations);
+
+        assertEquals(3, cfs.metric.purgeableTombstoneScannedHistogram.cf.getCount());
+        assertEquals(0, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMin());
+        assertEquals(1, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax());
+    }
+
+    @Test
+    public void testCountPurgeableCellTombstones() throws Exception
+    {
+        Config.TombstonesMetricGranularity original = DatabaseDescriptor.getPurgeableTombstonesMetricGranularity();
+        try
+        {
+            DatabaseDescriptor.setPurgeableTombstonesMetricGranularity(Config.TombstonesMetricGranularity.cell);
+            ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF12);
+            TestWriteOperation[] operations = new TestWriteOperation[]
+            {
+                TestWriteOperation.insert("key1", "aa", "a"),
+                TestWriteOperation.insert("key1", "ff", "f"),
+
+                TestWriteOperation.insert("key2", "aa", "a"),
+                TestWriteOperation.deleteCell("key2", "aa", "b", PURGEABLE_DELETION),
+                TestWriteOperation.deleteCell("key2", "aa", "f", NEW_DELETION),
+                TestWriteOperation.insert("key2", "cc", "c"),
+                TestWriteOperation.insert("key2", "dd", "d")
+            };
+
+            runTestWriteOperationsAndReadResults(cfs, operations);
+
+            assertEquals(2, cfs.metric.purgeableTombstoneScannedHistogram.cf.getCount());
+            assertEquals(0, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMin());
+            assertEquals(1, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax());
+        }
+        finally
+        {
+            DatabaseDescriptor.setPurgeableTombstonesMetricGranularity(original);
+        }
+    }
+
+    /**
+     * Test purgeable tombstones count for range tombstones with non-overlapping ranges,
+     * i.e. only Bound (not Boundary) markers will be created and counted
+     */
+    @Test
+    public void testCountPurgeableRangeTombstones_nonOverlappingRanges() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF13);
+        TestWriteOperation[] operations = new TestWriteOperation[]
+        {
+            TestWriteOperation.insert("key1", "aa", "a"),
+            TestWriteOperation.insert("key1", "ff", "f"),
+
+            TestWriteOperation.insert("key2", "aa", "a"),
+            TestWriteOperation.insert("key2", "cc", "c"),
+            TestWriteOperation.insert("key2", "dd", "d"),
+
+            TestWriteOperation.deleteRange("key2", "aa", "bb", NEW_DELETION),
+            TestWriteOperation.deleteRange("key2", "dd", "ee", PURGEABLE_DELETION),
+            TestWriteOperation.deleteRange("key2", "ff", "ff", PURGEABLE_DELETION)
+        };
+
+        runTestWriteOperationsAndReadResults(cfs, operations);
+
+        assertEquals(2, cfs.metric.purgeableTombstoneScannedHistogram.cf.getCount());
+        assertEquals(0, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMin());
+        assertEquals(4, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax());
+    }
+
+    /**
+     * Test purgeable tombstones count for range tombstones with overlapping ranges
+     */
+    @Test
+    public void testCountPurgeableRangeTombstones_overlappingRanges() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF14);
+
+        TestWriteOperation[] operations = new TestWriteOperation[]
+        {
+            TestWriteOperation.insert("key1", "aa", "a"),
+            TestWriteOperation.insert("key1", "ff", "f"),
+
+            TestWriteOperation.insert("key2", "aa", "a"),
+            TestWriteOperation.insert("key2", "bb", "b"),
+            TestWriteOperation.insert("key2", "cc", "c"),
+            TestWriteOperation.insert("key2", "dd", "d"),
+            TestWriteOperation.insert("key2", "ee", "e"),
+
+            // this range tombstone is non-purgeable and overlaps with the next one,
+            // so it creates one non-purgeable bound marker
+            // and one non-purgeable boundary marker, i.e. TWO NON-PURGEABLE tombstones
+            TestWriteOperation.deleteRange("key2", "aa", "bb", NEW_DELETION),
+
+            // this range tombstone is purgeable and overlaps with the previous and the next ones,
+            // so it creates one non-purgeable bound marker
+            // and one non-purgeable boundary marker, i.e. TWO non-purgeable tombstones are counted
+            TestWriteOperation.deleteRange("key2", "bb", "ee", PURGEABLE_DELETION),
+
+            // this range tombstone is purgeable and overlaps with the previous one;
+            // it has a different deletion time so that the two do not combine into a single range,
+            // so it creates one non-purgeable boundary marker (the same as the previous one)
+            // and one purgeable bound marker, incrementing the purgeable tombstone counter by one,
+            // so we expect TWO purgeable tombstones in total
+            TestWriteOperation.deleteRange("key2", "ee", "ff", PURGEABLE_DELETION - 1)
+        };
+
+        runTestWriteOperationsAndReadResults(cfs, operations);
+
+        assertEquals(0, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMin());
+        assertEquals(2, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax());
+    }
+
+    private static void runTestWriteOperationsAndReadResults(ColumnFamilyStore cfs, TestWriteOperation[] operations) throws IOException
+    {
+        try
+        {
+            Set<String> usedPartitionKeys = runWriteOperations(cfs, operations);
+            runPartitionReadCommands(cfs, usedPartitionKeys);
+        }
+        finally
+        {
+            cfs.truncateBlocking();
+        }
+    }
+
+    private static void runPartitionReadCommands(ColumnFamilyStore cfs, Set<String> partitionKeys) throws IOException
+    {
+        List<SinglePartitionReadCommand> commands = new ArrayList<>(partitionKeys.size());
+        for (String partitionKey : partitionKeys)
+        {
+            commands.add(getWholePartitionReadCommand(cfs, partitionKey));
+        }
+        executeReadCommands(commands);
+    }
+
+    private static Set<String> runWriteOperations(ColumnFamilyStore cfs, TestWriteOperation[] operations)
+    {
+        Set<String> usedPartitionKeys = new HashSet<>();
+        for (TestWriteOperation operation : operations)
+        {
+            if (operation.type == OperationType.CREATE)
+            {
+                new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes(operation.partitionKey))
+                .clustering(operation.clusteringKey)
+                .add(operation.columnName, ByteBufferUtil.bytes(operation.columnValue))
+                .build()
+                .apply();
+            }
+            else if (operation.type == OperationType.DELETE_PARTITION)
+            {
+                new Mutation(PartitionUpdate.simpleBuilder(cfs.metadata(), ByteBufferUtil.bytes(operation.partitionKey))
+                                            .nowInSec(operation.deletionTime)
+                                            .delete()
+                                            .build()).apply();
+            }
+            else if (operation.type == OperationType.DELETE_RANGE)
+            {
+                new RowUpdateBuilder(cfs.metadata(), operation.deletionTime, 0L, ByteBufferUtil.bytes(operation.partitionKey))
+                .addRangeTombstone(operation.clusteringRangeStart, operation.clusteringRangeEnd).build().apply();
+            }
+            else if (operation.type == OperationType.DELETE_ROW)
+            {
+                RowUpdateBuilder.deleteRowAt(cfs.metadata(), 0, operation.deletionTime,
+                                             ByteBufferUtil.bytes(operation.partitionKey), operation.clusteringKey
+                ).apply();
+            }
+            else if (operation.type == OperationType.DELETE_CELL)
+            {
+                new RowUpdateBuilder(cfs.metadata(), operation.deletionTime, 0L, ByteBufferUtil.bytes(operation.partitionKey))
+                .clustering(operation.clusteringKey)
+                .delete(operation.columnName)
+                .build().apply();
+            }
+
+            usedPartitionKeys.add(operation.partitionKey);
+        }
+        return usedPartitionKeys;
+    }
+
+    private static final long NEW_DELETION = FBUtilities.nowInSeconds();
+    private static final long PURGEABLE_DELETION = 42;
+
+    private enum OperationType
+    {
+        CREATE,
+        DELETE_PARTITION,
+        DELETE_RANGE,
+        DELETE_ROW,
+        DELETE_CELL
+    }
+
+    private static class TestWriteOperation
+    {
+        OperationType type;
+        String partitionKey;
+        String clusteringKey;
+
+        String clusteringRangeStart, clusteringRangeEnd;
+        String columnName;
+        String columnValue = "bla";
+
+        long deletionTime;
+
+        public TestWriteOperation(OperationType type, String partitionKey,
+                                  String clusteringKey, String clusteringRangeStart, String clusteringRangeEnd,
+                                  String columnName, long deletionTime)
+        {
+            this.type = type;
+            this.partitionKey = partitionKey;
+            this.clusteringKey = clusteringKey;
+            this.clusteringRangeStart = clusteringRangeStart;
+            this.clusteringRangeEnd = clusteringRangeEnd;
+            this.columnName = columnName;
+            this.deletionTime = deletionTime;
+        }
+
+        public static TestWriteOperation insert(String partitionKey, String clusteringKey,
+                                                String columnName)
+        {
+            return new TestWriteOperation(OperationType.CREATE, partitionKey, clusteringKey, null, null, columnName, 0);
+        }
+
+        public static TestWriteOperation deletePartition(String partitionKey, long deletionTime)
+        {
+            return new TestWriteOperation(OperationType.DELETE_PARTITION, partitionKey,
+                                          null, null, null, null, deletionTime);
+        }
+
+        public static TestWriteOperation deleteRange(String partitionKey, String clusteringRangeStart, String clusteringRangeEnd, long deletionTime)
+        {
+            return new TestWriteOperation(OperationType.DELETE_RANGE, partitionKey,
+                                          null, clusteringRangeStart, clusteringRangeEnd, null, deletionTime);
+        }
+
+        public static TestWriteOperation deleteRow(String partitionKey, String clusteringKey, long deletionTime)
+        {
+            return new TestWriteOperation(OperationType.DELETE_ROW, partitionKey, clusteringKey,
+                                          null, null, null, deletionTime);
+        }
+
+        public static TestWriteOperation deleteCell(String partitionKey, String clusteringKey, String columnName, long deletionTime)
+        {
+            return new TestWriteOperation(OperationType.DELETE_CELL, partitionKey, clusteringKey,
+                                          null, null, columnName, deletionTime);
+        }
+    }
+
+    private static void executeReadCommands(List<SinglePartitionReadCommand> commands) throws IOException
+    {
+        ReadQuery query = SinglePartitionReadCommand.Group.create(commands, DataLimits.NONE);
+
+        try (ReadExecutionController executionController = query.executionController();
+             UnfilteredPartitionIterator iter = query.executeLocally(executionController);
+             DataOutputBuffer buffer = new DataOutputBuffer())
+        {
+            UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter,
+                                                                            query.columnFilter(),
+                                                                            buffer,
+                                                                            MessagingService.current_version);
+        }
+    }
+
+    private static SinglePartitionReadCommand getWholePartitionReadCommand(ColumnFamilyStore cfs, String partitionKey)
+    {
+        long nowInSeconds = FBUtilities.nowInSeconds();
+        ColumnFilter columnFilter = ColumnFilter.allRegularColumnsBuilder(cfs.metadata(), false).build();
+        RowFilter rowFilter = RowFilter.create(true);
+        Slice slice = Slice.make(BufferClusteringBound.BOTTOM, BufferClusteringBound.TOP);
+        ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.with(cfs.metadata().comparator, slice), false);
+        return SinglePartitionReadCommand.create(cfs.metadata(), nowInSeconds,
+                                                 columnFilter, rowFilter,
+                                                 DataLimits.NONE, Util.dk(partitionKey), sliceFilter);
+    }
+
     @Test
     public void testSinglePartitionSliceRepairedDataTracking() throws Exception
     {