From 0358225f7b00f617ee30dbe4b69ff4508a94ba47 Mon Sep 17 00:00:00 2001 From: Dmitry Konstantinov Date: Sun, 8 Dec 2024 16:10:01 +0300 Subject: [PATCH 1/2] Add new table metric PurgeableTombstoneScannedHistogram and a tracing event for scanned purgeable tombstones Patch by Dmitry Konstantinov; reviewed by TBD for CASSANDRA-20132 --- .../org/apache/cassandra/db/ReadCommand.java | 125 ++++++ .../db/virtual/TableMetricTables.java | 1 + .../cassandra/metrics/KeyspaceMetrics.java | 3 + .../cassandra/metrics/TableMetrics.java | 3 + .../apache/cassandra/db/ReadCommandTest.java | 406 ++++++++++++++++-- 5 files changed, 494 insertions(+), 44 deletions(-) diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index a61d0a037ef7..d777e3a5202b 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -455,6 +455,7 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut iterator = withQuerySizeTracking(iterator); iterator = maybeSlowDownForTesting(iterator); iterator = withQueryCancellation(iterator); + iterator = withPurgeableTombstonesMetricRecording(iterator, cfs); iterator = RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs, executionController), Stage.PURGED, false); iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos); @@ -867,6 +868,130 @@ protected LongPredicate getPurgeEvaluator() return Transformation.apply(iterator, new WithoutPurgeableTombstones()); } + + /** + * Wraps the provided iterator so that metrics on count of purgeable tombstones are tracked and traced. + * It tracks only tombstones with localDeletionTime < now - gc_grace_period. + * Other (non-purgeable) tombstones will be tracked by regular Cassandra logic later. + */ + private UnfilteredPartitionIterator withPurgeableTombstonesMetricRecording(UnfilteredPartitionIterator iter, + ColumnFamilyStore cfs) + { + class PurgeableTombstonesMetricRecording extends Transformation + { + private int purgeableTombstones = 0; + + @Override + public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter) + { + return Transformation.apply(iter, this); + } + + @Override + public Row applyToStatic(Row row) + { + return applyToRow(row); + } + + @Override + public Row applyToRow(Row row) + { + final long nowInSec = nowInSec(); + boolean hasTombstones = false; + + for (Cell cell : row.cells()) + { + if (!cell.isLive(nowInSec) && isPurgeable(cell.localDeletionTime(), nowInSec)) + { + purgeableTombstones++; + hasTombstones = true; // allows to avoid counting an extra tombstone if the whole row expired + } + } + + if (!row.primaryKeyLivenessInfo().isLive(nowInSec) + && row.hasDeletion(nowInSec) + && isPurgeable(row.deletion().time(), nowInSec) + && !hasTombstones) + { + // We're counting primary key deletions only here. 
+ purgeableTombstones++; + } + + return row; + } + + @Override + public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) + { + final long nowInSec = nowInSec(); + + // for boundary markers - increment metric only if both - close and open - markers are purgeable, + if (marker.isBoundary()) + { + countIfBothPurgeable(marker.closeDeletionTime(false), + marker.openDeletionTime(false), + nowInSec); + } + // for bound markers - just increment if it is purgeable + else + { + countIfPurgeable(((RangeTombstoneBoundMarker) marker).deletionTime(), nowInSec); + } + + return marker; + } + + @Override + public void onClose() + { + cfs.metric.purgeableTombstoneScannedHistogram.update(purgeableTombstones); + if (purgeableTombstones > 0) + Tracing.trace("Read {} purgeable tombstone cells", purgeableTombstones); + } + + /** + * Increments if both - close and open - deletion times less than (now - gc_grace_period) + */ + private void countIfBothPurgeable(DeletionTime closeDeletionTime, + DeletionTime openDeletionTime, + long nowInSec) + { + if (isPurgeable(closeDeletionTime, nowInSec) && isPurgeable(openDeletionTime, nowInSec)) + purgeableTombstones++; + } + + /** + * Increments if deletion time less than (now - gc_grace_period) + */ + private void countIfPurgeable(DeletionTime deletionTime, + long nowInSec) + { + if (isPurgeable(deletionTime, nowInSec)) + purgeableTombstones++; + } + + /** + * Checks that deletion time < now - gc_grace_period + */ + private boolean isPurgeable(DeletionTime deletionTime, + long nowInSec) + { + return isPurgeable(deletionTime.localDeletionTime(), nowInSec); + } + + /** + * Checks that deletion time < now - gc_grace_period + */ + private boolean isPurgeable(long localDeletionTime, + long nowInSec) + { + return localDeletionTime < cfs.gcBefore(nowInSec); + } + } + + return Transformation.apply(iter, new PurgeableTombstonesMetricRecording()); + } + /** * Return the queried token(s) for logging */ diff --git a/src/java/org/apache/cassandra/db/virtual/TableMetricTables.java b/src/java/org/apache/cassandra/db/virtual/TableMetricTables.java index 5528c92011cc..e1defe51e69e 100644 --- a/src/java/org/apache/cassandra/db/virtual/TableMetricTables.java +++ b/src/java/org/apache/cassandra/db/virtual/TableMetricTables.java @@ -75,6 +75,7 @@ public static Collection getAll(String name) new LatencyTableMetric(name, "local_write_latency", t -> t.writeLatency.latency), new LatencyTableMetric(name, "coordinator_write_latency", t -> t.coordinatorWriteLatency), new HistogramTableMetric(name, "tombstones_per_read", t -> t.tombstoneScannedHistogram.cf), + new HistogramTableMetric(name, "purgeable_tombstones_per_read", t -> t.purgeableTombstoneScannedHistogram.cf), new HistogramTableMetric(name, "rows_per_read", t -> t.liveScannedHistogram.cf), new StorageTableMetric(name, "disk_usage", (TableMetrics t) -> t.totalDiskSpaceUsed), new StorageTableMetric(name, "max_partition_size", (TableMetrics t) -> t.maxPartitionSize), diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java index ae15bbf95a04..984b4aa70496 100644 --- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java +++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java @@ -86,6 +86,8 @@ public class KeyspaceMetrics public final Histogram sstablesPerRangeReadHistogram; /** Tombstones scanned in queries on this Keyspace */ public final Histogram tombstoneScannedHistogram; + /** Purgeable tombstones scanned in queries on this Keyspace */ + 
public final Histogram purgeableTombstoneScannedHistogram; /** Live cells scanned in queries on this Keyspace */ public final Histogram liveScannedHistogram; /** Column update time delta on this Keyspace */ @@ -238,6 +240,7 @@ public KeyspaceMetrics(final Keyspace ks) sstablesPerReadHistogram = createKeyspaceHistogram("SSTablesPerReadHistogram", true); sstablesPerRangeReadHistogram = createKeyspaceHistogram("SSTablesPerRangeReadHistogram", true); tombstoneScannedHistogram = createKeyspaceHistogram("TombstoneScannedHistogram", false); + purgeableTombstoneScannedHistogram = createKeyspaceHistogram("PurgeableTombstoneScannedHistogram", false); liveScannedHistogram = createKeyspaceHistogram("LiveScannedHistogram", false); colUpdateTimeDeltaHistogram = createKeyspaceHistogram("ColUpdateTimeDeltaHistogram", false); viewLockAcquireTime = createKeyspaceTimer("ViewLockAcquireTime"); diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java index b2a8e61fe4ca..36013f814cf2 100644 --- a/src/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java @@ -151,6 +151,8 @@ public class TableMetrics public final Gauge compressionMetadataOffHeapMemoryUsed; /** Tombstones scanned in queries on this CF */ public final TableHistogram tombstoneScannedHistogram; + /** Purgeable tombstones scanned in queries on this CF */ + public final TableHistogram purgeableTombstoneScannedHistogram; /** Live rows scanned in queries on this CF */ public final TableHistogram liveScannedHistogram; /** Column update time delta on this CF */ @@ -771,6 +773,7 @@ public Long getValue() additionalWriteLatencyNanos = createTableGauge("AdditionalWriteLatencyNanos", () -> MICROSECONDS.toNanos(cfs.additionalWriteLatencyMicros)); tombstoneScannedHistogram = createTableHistogram("TombstoneScannedHistogram", cfs.keyspace.metric.tombstoneScannedHistogram, false); + purgeableTombstoneScannedHistogram = createTableHistogram("PurgeableTombstoneScannedHistogram", cfs.keyspace.metric.purgeableTombstoneScannedHistogram, false); liveScannedHistogram = createTableHistogram("LiveScannedHistogram", cfs.keyspace.metric.liveScannedHistogram, false); colUpdateTimeDeltaHistogram = createTableHistogram("ColUpdateTimeDeltaHistogram", cfs.keyspace.metric.colUpdateTimeDeltaHistogram, false); coordinatorReadLatency = createTableTimer("CoordinatorReadLatency"); diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java index 3701ae419339..41d9d684a52c 100644 --- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java +++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java @@ -93,6 +93,9 @@ public class ReadCommandTest { + private static final String CREATE = "1"; + private static final String DELETE = "-1"; + private static final String KEYSPACE = "ReadCommandTest"; private static final String CF1 = "Standard1"; private static final String CF2 = "Standard2"; @@ -103,6 +106,9 @@ public class ReadCommandTest private static final String CF7 = "Counter7"; private static final String CF8 = "Standard8"; private static final String CF9 = "Standard9"; + private static final String CF10 = "Standard10"; + private static final String CF11 = "Standard11"; + private static final String CF12 = "Standard12"; private static final InetAddressAndPort REPAIR_COORDINATOR; static { @@ -194,6 +200,39 @@ public static void defineSchema() throws ConfigurationException 
.addClusteringColumn("col", ReversedType.getInstance(Int32Type.instance)) .addRegularColumn("a", AsciiType.instance); + TableMetadata.Builder metadata10 = + TableMetadata.builder(KEYSPACE, CF10) + .addPartitionKeyColumn("key", BytesType.instance) + .addClusteringColumn("col", AsciiType.instance) + .addRegularColumn("a", AsciiType.instance) + .addRegularColumn("b", AsciiType.instance) + .addRegularColumn("c", AsciiType.instance) + .addRegularColumn("d", AsciiType.instance) + .addRegularColumn("e", AsciiType.instance) + .addRegularColumn("f", AsciiType.instance); + + TableMetadata.Builder metadata11 = + TableMetadata.builder(KEYSPACE, CF11) + .addPartitionKeyColumn("key", BytesType.instance) + .addClusteringColumn("col", AsciiType.instance) + .addRegularColumn("a", AsciiType.instance) + .addRegularColumn("b", AsciiType.instance) + .addRegularColumn("c", AsciiType.instance) + .addRegularColumn("d", AsciiType.instance) + .addRegularColumn("e", AsciiType.instance) + .addRegularColumn("f", AsciiType.instance); + + TableMetadata.Builder metadata12 = + TableMetadata.builder(KEYSPACE, CF12) + .addPartitionKeyColumn("key", BytesType.instance) + .addClusteringColumn("col", AsciiType.instance) + .addRegularColumn("a", AsciiType.instance) + .addRegularColumn("b", AsciiType.instance) + .addRegularColumn("c", AsciiType.instance) + .addRegularColumn("d", AsciiType.instance) + .addRegularColumn("e", AsciiType.instance) + .addRegularColumn("f", AsciiType.instance); + SchemaLoader.prepareServer(); SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), @@ -205,7 +244,10 @@ public static void defineSchema() throws ConfigurationException metadata6, metadata7, metadata8, - metadata9); + metadata9, + metadata10, + metadata11, + metadata12); LocalSessionAccessor.startup(); } @@ -332,23 +374,23 @@ public void testSinglePartitionGroupMerge() throws Exception String[][][] groups = new String[][][] { new String[][] { - new String[] { "1", "key1", "aa", "a" }, // "1" indicates to create the data, "-1" to delete the row - new String[] { "1", "key2", "bb", "b" }, - new String[] { "1", "key3", "cc", "c" } + new String[] { CREATE, "key1", "aa", "a" }, + new String[] { CREATE, "key2", "bb", "b" }, + new String[] { CREATE, "key3", "cc", "c" } }, new String[][] { - new String[] { "1", "key3", "dd", "d" }, - new String[] { "1", "key2", "ee", "e" }, - new String[] { "1", "key1", "ff", "f" } + new String[] { CREATE, "key3", "dd", "d" }, + new String[] { CREATE, "key2", "ee", "e" }, + new String[] { CREATE, "key1", "ff", "f" } }, new String[][] { - new String[] { "1", "key6", "aa", "a" }, - new String[] { "1", "key5", "bb", "b" }, - new String[] { "1", "key4", "cc", "c" } + new String[] { CREATE, "key6", "aa", "a" }, + new String[] { CREATE, "key5", "bb", "b" }, + new String[] { CREATE, "key4", "cc", "c" } }, new String[][] { - new String[] { "-1", "key6", "aa", "a" }, - new String[] { "-1", "key2", "bb", "b" } + new String[] { DELETE, "key6", "aa", "a" }, + new String[] { DELETE, "key2", "bb", "b" } } }; @@ -371,7 +413,7 @@ public void testSinglePartitionGroupMerge() throws Exception for (String[] data : group) { - if (data[0].equals("1")) + if (data[0].equals(CREATE)) { new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes(data[1])) .clustering(data[2]) @@ -493,33 +535,32 @@ public void testCountDeletedRows() throws Exception String[][][] groups = new String[][][] { new String[][] { - new String[] { "1", "key1", "aa", "a" }, // "1" indicates to create the data, "-1" to delete the - // row - new String[] { "1", 
"key2", "bb", "b" }, - new String[] { "1", "key3", "cc", "c" } + new String[] { CREATE, "key1", "aa", "a" }, + new String[] { CREATE, "key2", "bb", "b" }, + new String[] { CREATE, "key3", "cc", "c" } }, new String[][] { - new String[] { "1", "key3", "dd", "d" }, - new String[] { "1", "key2", "ee", "e" }, - new String[] { "1", "key1", "ff", "f" } + new String[] { CREATE, "key3", "dd", "d" }, + new String[] { CREATE, "key2", "ee", "e" }, + new String[] { CREATE, "key1", "ff", "f" } }, new String[][] { - new String[] { "1", "key6", "aa", "a" }, - new String[] { "1", "key5", "bb", "b" }, - new String[] { "1", "key4", "cc", "c" } + new String[] { CREATE, "key6", "aa", "a" }, + new String[] { CREATE, "key5", "bb", "b" }, + new String[] { CREATE, "key4", "cc", "c" } }, new String[][] { - new String[] { "1", "key2", "aa", "a" }, - new String[] { "1", "key2", "cc", "c" }, - new String[] { "1", "key2", "dd", "d" } + new String[] { CREATE, "key2", "aa", "a" }, + new String[] { CREATE, "key2", "cc", "c" }, + new String[] { CREATE, "key2", "dd", "d" } }, new String[][] { - new String[] { "-1", "key6", "aa", "a" }, - new String[] { "-1", "key2", "bb", "b" }, - new String[] { "-1", "key2", "ee", "e" }, - new String[] { "-1", "key2", "aa", "a" }, - new String[] { "-1", "key2", "cc", "c" }, - new String[] { "-1", "key2", "dd", "d" } + new String[] { DELETE, "key6", "aa", "a" }, + new String[] { DELETE, "key2", "bb", "b" }, + new String[] { DELETE, "key2", "ee", "e" }, + new String[] { DELETE, "key2", "aa", "a" }, + new String[] { DELETE, "key2", "cc", "c" }, + new String[] { DELETE, "key2", "dd", "d" } } }; @@ -539,7 +580,7 @@ public void testCountDeletedRows() throws Exception for (String[] data : group) { - if (data[0].equals("1")) + if (data[0].equals(CREATE)) { new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes(data[1])) .clustering(data[2]) @@ -582,20 +623,19 @@ public void testCountWithNoDeletedRow() throws Exception String[][][] groups = new String[][][] { new String[][] { - new String[] { "1", "key1", "aa", "a" }, // "1" indicates to create the data, "-1" to delete the - // row - new String[] { "1", "key2", "bb", "b" }, - new String[] { "1", "key3", "cc", "c" } + new String[] { CREATE, "key1", "aa", "a" }, + new String[] { CREATE, "key2", "bb", "b" }, + new String[] { CREATE, "key3", "cc", "c" } }, new String[][] { - new String[] { "1", "key3", "dd", "d" }, - new String[] { "1", "key2", "ee", "e" }, - new String[] { "1", "key1", "ff", "f" } + new String[] { CREATE, "key3", "dd", "d" }, + new String[] { CREATE, "key2", "ee", "e" }, + new String[] { CREATE, "key1", "ff", "f" } }, new String[][] { - new String[] { "1", "key6", "aa", "a" }, - new String[] { "1", "key5", "bb", "b" }, - new String[] { "1", "key4", "cc", "c" } + new String[] { CREATE, "key6", "aa", "a" }, + new String[] { CREATE, "key5", "bb", "b" }, + new String[] { CREATE, "key4", "cc", "c" } } }; @@ -615,7 +655,7 @@ public void testCountWithNoDeletedRow() throws Exception for (String[] data : group) { - if (data[0].equals("1")) + if (data[0].equals(CREATE)) { new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes(data[1])) .clustering(data[2]) @@ -651,6 +691,284 @@ public void testCountWithNoDeletedRow() throws Exception assertEquals(1, cfs.metric.tombstoneScannedHistogram.cf.getSnapshot().getMax()); } + @Test + public void testCountPurgeableTombstones() throws Exception + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF10); + + String[][][] groups = new String[][][] { + new String[][] { + new 
String[] { CREATE, "key1", "aa", "a" }, + new String[] { CREATE, "key2", "bb", "b" }, + new String[] { CREATE, "key3", "cc", "c" } + }, + new String[][] { + new String[] { CREATE, "key3", "dd", "d" }, + new String[] { CREATE, "key2", "ee", "e" }, + new String[] { CREATE, "key1", "ff", "f" } + }, + new String[][] { + new String[] { CREATE, "key6", "aa", "a" }, + new String[] { CREATE, "key5", "bb", "b" }, + new String[] { CREATE, "key4", "cc", "c" } + }, + new String[][] { + new String[] { CREATE, "key2", "aa", "a" }, + new String[] { CREATE, "key2", "cc", "c" }, + new String[] { CREATE, "key2", "dd", "d" } + }, + new String[][] { + // last column for deletions contains localDeletionTime in seconds + new String[] { DELETE, "key6", "aa", String.valueOf(FBUtilities.nowInSeconds()) }, // <-- just to make sure counters separation for different partitions as in other tests + new String[] { DELETE, "key6", "bb", "42" }, // <-- just to make sure counters separation for different partitions as in other tests + new String[] { DELETE, "key2", "bb", String.valueOf(FBUtilities.nowInSeconds()) }, // <-- tombstone for this deletion is inside gc_grace_period range so isn't purgeable + new String[] { DELETE, "key2", "ee", String.valueOf(FBUtilities.nowInSeconds()) }, // <-- this one two, so TWO non-purgeable tombstones should be tracked at max + new String[] { DELETE, "key2", "aa", "42" }, // <-- this tombstone has very old localDeletionTime so should be treated as purgeable one + new String[] { DELETE, "key2", "cc", "42" }, // <-- this one too + new String[] { DELETE, "key2", "dd", "42" } // <-- this one too, so THREE purgeable tombstones should be tracked at max + } + }; + + List buffers = new ArrayList<>(groups.length); + long nowInSeconds = FBUtilities.nowInSeconds(); + ColumnFilter columnFilter = ColumnFilter.allRegularColumnsBuilder(cfs.metadata(), false).build(); + RowFilter rowFilter = RowFilter.create(true); + Slice slice = Slice.make(BufferClusteringBound.BOTTOM, BufferClusteringBound.TOP); + ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter( + Slices.with(cfs.metadata().comparator, slice), false); + + for (String[][] group : groups) + { + cfs.truncateBlocking(); + + List commands = new ArrayList<>(group.length); + + for (String[] data : group) + { + if (data[0].equals(CREATE)) + { + new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes(data[1])) + .clustering(data[2]) + .add(data[3], ByteBufferUtil.bytes("blah")) + .build() + .apply(); + } + else + { + RowUpdateBuilder.deleteRowAt(cfs.metadata(), + FBUtilities.timestampMicros(), Long.parseLong(data[3]), + ByteBufferUtil.bytes(data[1]), data[2]) + .apply(); + } + commands.add(SinglePartitionReadCommand.create(cfs.metadata(), nowInSeconds, columnFilter, rowFilter, + DataLimits.NONE, Util.dk(data[1]), sliceFilter)); + } + + Util.flush(cfs); + + ReadQuery query = SinglePartitionReadCommand.Group.create(commands, DataLimits.NONE); + + try (ReadExecutionController executionController = query.executionController(); + UnfilteredPartitionIterator iter = query.executeLocally(executionController); + DataOutputBuffer buffer = new DataOutputBuffer()) + { + UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter, + columnFilter, + buffer, + MessagingService.current_version); + buffers.add(buffer.buffer()); + } + } + + + assertEquals(2, cfs.metric.tombstoneScannedHistogram.cf.getSnapshot().getMax()); + assertEquals(3, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax()); + } + + /** + * Test 
purgeable tombstones count for range tombstones with non-overlapping ranges, + * i.e. only Bound (not Boundary) Markers will be created and counted + */ + @Test + public void testCountPurgeableRangeTombstones_nonOverlappingRanges() throws Exception + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF11); + + String[][][] groups = new String[][][] { + new String[][] { + new String[] { "key1", "aa", "a" }, + new String[] { "key2", "bb", "b" }, + new String[] { "key3", "cc", "c" } + }, + new String[][] { + new String[] { "key3", "dd", "d" }, + new String[] { "key2", "ee", "e" }, + new String[] { "key1", "ff", "f" } + }, + new String[][] { + new String[] { "key6", "aa", "a" }, + new String[] { "key5", "bb", "b" }, + new String[] { "key4", "cc", "c" } + }, + new String[][] { + new String[] { "key2", "aa", "a" }, + new String[] { "key2", "cc", "c" }, + new String[] { "key2", "dd", "d" } + }, + }; + + List buffers = new ArrayList<>(groups.length); + long nowInSeconds = FBUtilities.nowInSeconds(); + ColumnFilter columnFilter = ColumnFilter.allRegularColumnsBuilder(cfs.metadata(), false).build(); + RowFilter rowFilter = RowFilter.create(true); + Slice slice = Slice.make(BufferClusteringBound.BOTTOM, BufferClusteringBound.TOP); + ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter( + Slices.with(cfs.metadata().comparator, slice), false); + + for (String[][] group : groups) + { + cfs.truncateBlocking(); + + List commands = new ArrayList<>(group.length); + + for (String[] data : group) + { + new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes(data[0])) + .clustering(data[1]) + .add(data[2], ByteBufferUtil.bytes("blah")) + .build() + .apply(); + + commands.add(SinglePartitionReadCommand.create(cfs.metadata(), nowInSeconds, columnFilter, rowFilter, + DataLimits.NONE, Util.dk(data[0]), sliceFilter)); + } + + new RowUpdateBuilder(cfs.metadata(), FBUtilities.nowInSeconds(), 0L, ByteBufferUtil.bytes("key2")) + .addRangeTombstone("aa", "bb").build().apply(); // <-- this range tombstone is non-purgeable and doesn't overlap with other ranges + // so it will create two markers which will be counted as TWO non-purgeable tombstones + + new RowUpdateBuilder(cfs.metadata(), 42, 0L, ByteBufferUtil.bytes("key2")) + .addRangeTombstone("dd", "ee").build().apply(); // <-- this range tombstone is purgeable and doesn't overlap with other ranges + // so it will create two markers which will be counted as TWO purgeable tombstones + + new RowUpdateBuilder(cfs.metadata(), 42, 0L, ByteBufferUtil.bytes("key2")) + .addRangeTombstone("ff", "ff").build().apply(); // <-- this one too so there would be FOUR purgeable tombstones in total + + Util.flush(cfs); + + ReadQuery query = SinglePartitionReadCommand.Group.create(commands, DataLimits.NONE); + + try (ReadExecutionController executionController = query.executionController(); + UnfilteredPartitionIterator iter = query.executeLocally(executionController); + DataOutputBuffer buffer = new DataOutputBuffer()) + { + UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter, + columnFilter, + buffer, + MessagingService.current_version); + buffers.add(buffer.buffer()); + } + } + + assertEquals(2, cfs.metric.tombstoneScannedHistogram.cf.getSnapshot().getMax()); + assertEquals(4, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax()); + } + + /** + * Test purgeable tombstones count for range tombstones with overlapping ranges, + * i.e. 
both Bound and Boundary Markers will be created and counted + */ + @Test + public void testCountPurgeableRangeTombstones_overlappingRanges() throws Exception + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF12); + + String[][][] groups = new String[][][] { + new String[][] { + new String[] { "key1", "aa", "a" }, + new String[] { "key2", "bb", "b" }, + new String[] { "key3", "cc", "c" } + }, + new String[][] { + new String[] { "key3", "dd", "d" }, + new String[] { "key2", "ee", "e" }, + new String[] { "key1", "ff", "f" } + }, + new String[][] { + new String[] { "key6", "aa", "a" }, + new String[] { "key5", "bb", "b" }, + new String[] { "key4", "cc", "c" } + }, + new String[][] { + new String[] { "key2", "aa", "a" }, + new String[] { "key2", "cc", "c" }, + new String[] { "key2", "dd", "d" } + }, + }; + + List buffers = new ArrayList<>(groups.length); + long nowInSeconds = FBUtilities.nowInSeconds(); + ColumnFilter columnFilter = ColumnFilter.allRegularColumnsBuilder(cfs.metadata(), false).build(); + RowFilter rowFilter = RowFilter.create(true); + Slice slice = Slice.make(BufferClusteringBound.BOTTOM, BufferClusteringBound.TOP); + ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter( + Slices.with(cfs.metadata().comparator, slice), false); + + for (String[][] group : groups) + { + cfs.truncateBlocking(); + + List commands = new ArrayList<>(group.length); + + for (String[] data : group) + { + new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes(data[0])) + .clustering(data[1]) + .add(data[2], ByteBufferUtil.bytes("blah")) + .build() + .apply(); + + commands.add(SinglePartitionReadCommand.create(cfs.metadata(), nowInSeconds, columnFilter, rowFilter, + DataLimits.NONE, Util.dk(data[0]), sliceFilter)); + } + + new RowUpdateBuilder(cfs.metadata(), FBUtilities.nowInSeconds(), 0L, ByteBufferUtil.bytes("key2")) + .addRangeTombstone("aa", "bb").build().apply(); // <-- this range tombstone is non-purgeable and overlaps with the next one + // so it will create one non-purgeable bound marker + // and one non-purgeable boundary marker so TWO non-purgeable tombstones will be counted + + new RowUpdateBuilder(cfs.metadata(), 42, 0L, ByteBufferUtil.bytes("key2")) + .addRangeTombstone("bb", "ee").build().apply(); // <-- this range tombstone is purgeable and overlaps with previous and next ones + // so it will create one non-purgeable boundary marker (same as previous one) + // and one purgeable boundary marker, so it will increment purgeable tombstones counter on one + + new RowUpdateBuilder(cfs.metadata(), 52, 0L, ByteBufferUtil.bytes("key2")) + .addRangeTombstone("ee", "ff").build().apply(); // <-- this range tombstone is purgeable and overlaps with previous one + // so it will create one non-purgeable boundary marker (same as previous one) + // and one purgeable bound marker, so it will increment purgeable tombstones counter on one, + // so we expect TWO purgeable tombstones in total + + Util.flush(cfs); + + ReadQuery query = SinglePartitionReadCommand.Group.create(commands, DataLimits.NONE); + + try (ReadExecutionController executionController = query.executionController(); + UnfilteredPartitionIterator iter = query.executeLocally(executionController); + DataOutputBuffer buffer = new DataOutputBuffer()) + { + UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter, + columnFilter, + buffer, + MessagingService.current_version); + buffers.add(buffer.buffer()); + } + } + + + assertEquals(2, 
cfs.metric.tombstoneScannedHistogram.cf.getSnapshot().getMax()); + assertEquals(2, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax()); + } + @Test public void testSinglePartitionSliceRepairedDataTracking() throws Exception { From 1ea154e822c26f42d249f8b853ef29d029ecef5f Mon Sep 17 00:00:00 2001 From: Dmitry Konstantinov Date: Sun, 19 Jan 2025 14:59:48 +0000 Subject: [PATCH 2/2] Address review comments: enable considerZeros for the new metric, add partition tombstone counting Add purgeable_tombstones_metric_granularity configuration option to control the overhead of collecting the added metric; use row mode as the default to reduce overhead Make added unit tests more readable Patch by Dmitry Konstantinov; reviewed by Chris Lohfink for CASSANDRA-20132 --- .../org/apache/cassandra/config/Config.java | 21 + .../cassandra/config/DatabaseDescriptor.java | 11 + .../org/apache/cassandra/db/ReadCommand.java | 21 +- .../cassandra/metrics/TableMetrics.java | 2 +- .../apache/cassandra/db/ReadCommandTest.java | 505 ++++++++++-------- 5 files changed, 330 insertions(+), 230 deletions(-) diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 103d8a7cf7ea..f18aa3434b5f 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -526,6 +526,8 @@ public static class SSTableConfig public volatile int tombstone_warn_threshold = 1000; public volatile int tombstone_failure_threshold = 100000; + public TombstonesMetricGranularity purgeable_tombstones_metric_granularity = TombstonesMetricGranularity.row; + public final ReplicaFilteringProtectionOptions replica_filtering_protection = new ReplicaFilteringProtectionOptions(); @Replaces(oldName = "index_summary_capacity_in_mb", converter = Converters.MEBIBYTES_DATA_STORAGE_LONG, deprecated = true) @@ -1312,6 +1314,25 @@ public enum BatchlogEndpointStrategy } } + public enum TombstonesMetricGranularity + { + /** + * do not collect the metric at all + */ + disabled, + /** + * track only partition/range/row level tombstones, + * a good compromise between overhead and usability + */ + row, + /** + * track partition/range/row/cell level tombstones, + * the most granular option, but it has some performance overhead + * due to iteration over cells + */ + cell + } + private static final Set SENSITIVE_KEYS = new HashSet() {{ add("client_encryption_options"); add("server_encryption_options"); diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index bf44b53e2b0d..33142c581ef6 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -5194,4 +5194,15 @@ public static void setRejectOutOfTokenRangeRequests(boolean enabled) { conf.reject_out_of_token_range_requests = enabled; } + + public static Config.TombstonesMetricGranularity getPurgeableTombstonesMetricGranularity() + { + return conf.purgeable_tombstones_metric_granularity; + } + + @VisibleForTesting + public static void setPurgeableTombstonesMetricGranularity(Config.TombstonesMetricGranularity granularity) + { + conf.purgeable_tombstones_metric_granularity = granularity; + } } diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index d777e3a5202b..6bccc32b08a8 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ 
b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -455,7 +455,8 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut iterator = withQuerySizeTracking(iterator); iterator = maybeSlowDownForTesting(iterator); iterator = withQueryCancellation(iterator); - iterator = withPurgeableTombstonesMetricRecording(iterator, cfs); + if (DatabaseDescriptor.getPurgeableTombstonesMetricGranularity() != Config.TombstonesMetricGranularity.disabled) + iterator = withPurgeableTombstonesMetricRecording(iterator, cfs); iterator = RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs, executionController), Stage.PURGED, false); iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos); @@ -884,6 +885,8 @@ class PurgeableTombstonesMetricRecording extends Transformation cell : row.cells() - { - if (!cell.isLive(nowInSec) && isPurgeable(cell.localDeletionTime(), nowInSec)) + if (DatabaseDescriptor.getPurgeableTombstonesMetricGranularity() == Config.TombstonesMetricGranularity.cell) + for (Cell cell : row.cells()) { - purgeableTombstones++; - hasTombstones = true; // allows to avoid counting an extra tombstone if the whole row expired + if (!cell.isLive(nowInSec) && isPurgeable(cell.localDeletionTime(), nowInSec)) + { + purgeableTombstones++; + hasTombstones = true; // allows to avoid counting an extra tombstone if the whole row expired + } } - } + // we replicate the logic used for the non-purged tombstones metric here if (!row.primaryKeyLivenessInfo().isLive(nowInSec) && row.hasDeletion(nowInSec) && isPurgeable(row.deletion().time(), nowInSec) @@ -925,7 +930,7 @@ public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) { final long nowInSec = nowInSec(); - // for boundary markers - increment metric only if both - close and open - markers are purgeable, + // for boundary markers - increment metric only if both - close and open - markers are purgeable if (marker.isBoundary()) { countIfBothPurgeable(marker.closeDeletionTime(false), diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java index 36013f814cf2..87abb7a8dd55 100644 --- a/src/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java @@ -773,7 +773,7 @@ public Long getValue() additionalWriteLatencyNanos = createTableGauge("AdditionalWriteLatencyNanos", () -> MICROSECONDS.toNanos(cfs.additionalWriteLatencyMicros)); tombstoneScannedHistogram = createTableHistogram("TombstoneScannedHistogram", cfs.keyspace.metric.tombstoneScannedHistogram, false); - purgeableTombstoneScannedHistogram = createTableHistogram("PurgeableTombstoneScannedHistogram", cfs.keyspace.metric.purgeableTombstoneScannedHistogram, false); + purgeableTombstoneScannedHistogram = createTableHistogram("PurgeableTombstoneScannedHistogram", cfs.keyspace.metric.purgeableTombstoneScannedHistogram, true); liveScannedHistogram = createTableHistogram("LiveScannedHistogram", cfs.keyspace.metric.liveScannedHistogram, false); colUpdateTimeDeltaHistogram = createTableHistogram("ColUpdateTimeDeltaHistogram", cfs.keyspace.metric.colUpdateTimeDeltaHistogram, false); coordinatorReadLatency = createTableTimer("CoordinatorReadLatency"); diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java index 41d9d684a52c..3b5ecbfa9830 100644 --- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java +++ 
b/test/unit/org/apache/cassandra/db/ReadCommandTest.java @@ -35,6 +35,7 @@ import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter; import org.apache.cassandra.db.filter.ColumnFilter; @@ -109,6 +110,9 @@ public class ReadCommandTest private static final String CF10 = "Standard10"; private static final String CF11 = "Standard11"; private static final String CF12 = "Standard12"; + private static final String CF13 = "Standard13"; + private static final String CF14 = "Standard14"; + private static final InetAddressAndPort REPAIR_COORDINATOR; static { @@ -233,6 +237,28 @@ public static void defineSchema() throws ConfigurationException .addRegularColumn("e", AsciiType.instance) .addRegularColumn("f", AsciiType.instance); + TableMetadata.Builder metadata13 = + TableMetadata.builder(KEYSPACE, CF13) + .addPartitionKeyColumn("key", BytesType.instance) + .addClusteringColumn("col", AsciiType.instance) + .addRegularColumn("a", AsciiType.instance) + .addRegularColumn("b", AsciiType.instance) + .addRegularColumn("c", AsciiType.instance) + .addRegularColumn("d", AsciiType.instance) + .addRegularColumn("e", AsciiType.instance) + .addRegularColumn("f", AsciiType.instance); + + TableMetadata.Builder metadata14 = + TableMetadata.builder(KEYSPACE, CF14) + .addPartitionKeyColumn("key", BytesType.instance) + .addClusteringColumn("col", AsciiType.instance) + .addRegularColumn("a", AsciiType.instance) + .addRegularColumn("b", AsciiType.instance) + .addRegularColumn("c", AsciiType.instance) + .addRegularColumn("d", AsciiType.instance) + .addRegularColumn("e", AsciiType.instance) + .addRegularColumn("f", AsciiType.instance); + SchemaLoader.prepareServer(); SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), @@ -247,7 +273,9 @@ public static void defineSchema() throws ConfigurationException metadata9, metadata10, metadata11, - metadata12); + metadata12, + metadata13, + metadata14); LocalSessionAccessor.startup(); } @@ -692,97 +720,80 @@ public void testCountWithNoDeletedRow() throws Exception } @Test - public void testCountPurgeableTombstones() throws Exception + public void testCountPurgeableRowTombstones() throws Exception { ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF10); - - String[][][] groups = new String[][][] { - new String[][] { - new String[] { CREATE, "key1", "aa", "a" }, - new String[] { CREATE, "key2", "bb", "b" }, - new String[] { CREATE, "key3", "cc", "c" } - }, - new String[][] { - new String[] { CREATE, "key3", "dd", "d" }, - new String[] { CREATE, "key2", "ee", "e" }, - new String[] { CREATE, "key1", "ff", "f" } - }, - new String[][] { - new String[] { CREATE, "key6", "aa", "a" }, - new String[] { CREATE, "key5", "bb", "b" }, - new String[] { CREATE, "key4", "cc", "c" } - }, - new String[][] { - new String[] { CREATE, "key2", "aa", "a" }, - new String[] { CREATE, "key2", "cc", "c" }, - new String[] { CREATE, "key2", "dd", "d" } - }, - new String[][] { - // last column for deletions contains localDeletionTime in seconds - new String[] { DELETE, "key6", "aa", String.valueOf(FBUtilities.nowInSeconds()) }, // <-- just to make sure counters separation for different partitions as in other tests - new String[] { DELETE, "key6", "bb", "42" }, // <-- just to make sure counters separation for different partitions as in other tests - new String[] { DELETE, "key2", "bb", 
String.valueOf(FBUtilities.nowInSeconds()) }, // <-- tombstone for this deletion is inside gc_grace_period range so isn't purgeable - new String[] { DELETE, "key2", "ee", String.valueOf(FBUtilities.nowInSeconds()) }, // <-- this one two, so TWO non-purgeable tombstones should be tracked at max - new String[] { DELETE, "key2", "aa", "42" }, // <-- this tombstone has very old localDeletionTime so should be treated as purgeable one - new String[] { DELETE, "key2", "cc", "42" }, // <-- this one too - new String[] { DELETE, "key2", "dd", "42" } // <-- this one too, so THREE purgeable tombstones should be tracked at max - } + TestWriteOperation[] operations = new TestWriteOperation[] + { + TestWriteOperation.insert("key1", "aa", "a"), + TestWriteOperation.insert("key1", "ff", "f"), + + TestWriteOperation.insert("key2", "aa", "e"), + TestWriteOperation.deleteRow("key2", "aa", PURGEABLE_DELETION), + TestWriteOperation.deleteRow("key2", "bb", NEW_DELETION), + TestWriteOperation.deleteRow("key2", "cc", PURGEABLE_DELETION), + TestWriteOperation.deleteRow("key2", "dd", PURGEABLE_DELETION), + TestWriteOperation.deleteRow("key2", "ee", NEW_DELETION), }; - List buffers = new ArrayList<>(groups.length); - long nowInSeconds = FBUtilities.nowInSeconds(); - ColumnFilter columnFilter = ColumnFilter.allRegularColumnsBuilder(cfs.metadata(), false).build(); - RowFilter rowFilter = RowFilter.create(true); - Slice slice = Slice.make(BufferClusteringBound.BOTTOM, BufferClusteringBound.TOP); - ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter( - Slices.with(cfs.metadata().comparator, slice), false); + runTestWriteOperationsAndReadResults(cfs, operations); - for (String[][] group : groups) + assertEquals(2, cfs.metric.purgeableTombstoneScannedHistogram.cf.getCount()); + assertEquals(0, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMin()); + assertEquals(3, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax()); + } + @Test + public void testCountPurgeablePartitionTombstones() throws Exception + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF11); + TestWriteOperation[] operations = new TestWriteOperation[] { - cfs.truncateBlocking(); + TestWriteOperation.insert("key1", "aa", "a"), + TestWriteOperation.insert("key1", "ff", "f"), - List commands = new ArrayList<>(group.length); + TestWriteOperation.insert("key2", "aa", "a"), + TestWriteOperation.insert("key2", "cc", "c"), + TestWriteOperation.insert("key2", "dd", "d"), - for (String[] data : group) - { - if (data[0].equals(CREATE)) - { - new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes(data[1])) - .clustering(data[2]) - .add(data[3], ByteBufferUtil.bytes("blah")) - .build() - .apply(); - } - else - { - RowUpdateBuilder.deleteRowAt(cfs.metadata(), - FBUtilities.timestampMicros(), Long.parseLong(data[3]), - ByteBufferUtil.bytes(data[1]), data[2]) - .apply(); - } - commands.add(SinglePartitionReadCommand.create(cfs.metadata(), nowInSeconds, columnFilter, rowFilter, - DataLimits.NONE, Util.dk(data[1]), sliceFilter)); - } - - Util.flush(cfs); + TestWriteOperation.deletePartition("key2", PURGEABLE_DELETION), + TestWriteOperation.deletePartition("key3", NEW_DELETION) + }; + runTestWriteOperationsAndReadResults(cfs, operations); - ReadQuery query = SinglePartitionReadCommand.Group.create(commands, DataLimits.NONE); + assertEquals(3, cfs.metric.purgeableTombstoneScannedHistogram.cf.getCount()); + assertEquals(0, 
cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMin()); + assertEquals(1, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax()); + } - try (ReadExecutionController executionController = query.executionController(); - UnfilteredPartitionIterator iter = query.executeLocally(executionController); - DataOutputBuffer buffer = new DataOutputBuffer()) + @Test + public void testCountPurgeableCellTombstones() throws Exception + { + Config.TombstonesMetricGranularity original = DatabaseDescriptor.getPurgeableTombstonesMetricGranularity(); + try + { + DatabaseDescriptor.setPurgeableTombstonesMetricGranularity(Config.TombstonesMetricGranularity.cell); + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF12); + TestWriteOperation[] operations = new TestWriteOperation[] { - UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter, - columnFilter, - buffer, - MessagingService.current_version); - buffers.add(buffer.buffer()); - } - } - + TestWriteOperation.insert("key1", "aa", "a"), + TestWriteOperation.insert("key1", "ff", "f"), + + TestWriteOperation.insert("key2", "aa", "a"), + TestWriteOperation.deleteCell("key2", "aa", "b", PURGEABLE_DELETION), + TestWriteOperation.deleteCell("key2", "aa", "f", NEW_DELETION), + TestWriteOperation.insert("key2", "cc", "c"), + TestWriteOperation.insert("key2", "dd", "d") + }; + runTestWriteOperationsAndReadResults(cfs, operations); - assertEquals(2, cfs.metric.tombstoneScannedHistogram.cf.getSnapshot().getMax()); - assertEquals(3, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax()); + assertEquals(2, cfs.metric.purgeableTombstoneScannedHistogram.cf.getCount()); + assertEquals(0, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMin()); + assertEquals(1, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax()); + } + finally + { + DatabaseDescriptor.setPurgeableTombstonesMetricGranularity(original); + } } /** @@ -792,181 +803,233 @@ public void testCountPurgeableTombstones() throws Exception @Test public void testCountPurgeableRangeTombstones_nonOverlappingRanges() throws Exception { - ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF11); + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF13); + TestWriteOperation[] operations = new TestWriteOperation[] + { + TestWriteOperation.insert("key1", "aa", "a"), + TestWriteOperation.insert("key1", "ff", "f"), - String[][][] groups = new String[][][] { - new String[][] { - new String[] { "key1", "aa", "a" }, - new String[] { "key2", "bb", "b" }, - new String[] { "key3", "cc", "c" } - }, - new String[][] { - new String[] { "key3", "dd", "d" }, - new String[] { "key2", "ee", "e" }, - new String[] { "key1", "ff", "f" } - }, - new String[][] { - new String[] { "key6", "aa", "a" }, - new String[] { "key5", "bb", "b" }, - new String[] { "key4", "cc", "c" } - }, - new String[][] { - new String[] { "key2", "aa", "a" }, - new String[] { "key2", "cc", "c" }, - new String[] { "key2", "dd", "d" } - }, + TestWriteOperation.insert("key2", "aa", "a"), + TestWriteOperation.insert("key2", "cc", "c"), + TestWriteOperation.insert("key2", "dd", "d"), + + TestWriteOperation.deleteRange("key2", "aa", "bb", NEW_DELETION), + TestWriteOperation.deleteRange("key2", "dd", "ee", PURGEABLE_DELETION), + TestWriteOperation.deleteRange("key2", "ff", "ff", PURGEABLE_DELETION) }; + runTestWriteOperationsAndReadResults(cfs, operations); - List buffers = new ArrayList<>(groups.length); - long 
nowInSeconds = FBUtilities.nowInSeconds(); - ColumnFilter columnFilter = ColumnFilter.allRegularColumnsBuilder(cfs.metadata(), false).build(); - RowFilter rowFilter = RowFilter.create(true); - Slice slice = Slice.make(BufferClusteringBound.BOTTOM, BufferClusteringBound.TOP); - ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter( - Slices.with(cfs.metadata().comparator, slice), false); + assertEquals(2, cfs.metric.purgeableTombstoneScannedHistogram.cf.getCount()); + assertEquals(0, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMin()); + assertEquals(4, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax()); + } - for (String[][] group : groups) + /** + * Test purgeable tombstones count for range tombstones with overlapping ranges + */ + @Test + public void testCountPurgeableRangeTombstones_overlappingRanges() throws Exception + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF14); + + TestWriteOperation[] operations = new TestWriteOperation[] + { + TestWriteOperation.insert("key1", "aa", "a"), + TestWriteOperation.insert("key1", "ff", "f"), + + TestWriteOperation.insert("key2", "aa", "a"), + TestWriteOperation.insert("key2", "bb", "b"), + TestWriteOperation.insert("key2", "cc", "c"), + TestWriteOperation.insert("key2", "dd", "d"), + TestWriteOperation.insert("key2", "ee", "e"), + + // this range tombstone is non-purgeable and overlaps with the next one, + // so it will create one non-purgeable bound marker + // and one non-purgeable boundary marker so TWO NON-PURGEABLE tombstones + TestWriteOperation.deleteRange("key2", "aa", "bb", NEW_DELETION), + + // this range tombstone is purgeable and overlaps with previous and next ones, + // so it will create one non-purgeable bound marker + // and one non-purgeable boundary marker so TWO non-purgeable tombstones will be counted + TestWriteOperation.deleteRange("key2", "bb", "ee", PURGEABLE_DELETION), + + // this range tombstone is purgeable and overlaps with previous one, + // it has a different deletion time to not combine into a single range, + // so it will create one non-purgeable boundary marker (same as previous one) + // and one purgeable bound marker, so it will increment purgeable tombstones counter on one, + // we expect TWO purgeable tombstones in total + TestWriteOperation.deleteRange("key2", "ee", "ff", PURGEABLE_DELETION - 1) + }; + + runTestWriteOperationsAndReadResults(cfs, operations); + + assertEquals(0, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMin()); + assertEquals(2, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax()); + } + + + private static void runTestWriteOperationsAndReadResults(ColumnFamilyStore cfs, TestWriteOperation[] operations) throws IOException + { + try + { + Set usedPartitionKeys = runWriteOperations(cfs, operations); + runPartitionReadCommands(cfs, usedPartitionKeys); + } + finally { cfs.truncateBlocking(); + } + } - List commands = new ArrayList<>(group.length); + private static void runPartitionReadCommands(ColumnFamilyStore cfs, Set partitionKeys) throws IOException + { + List commands = new ArrayList<>(partitionKeys.size()); + for (String partitionKey : partitionKeys) + { + commands.add(getWholePartitionReadCommand(cfs, partitionKey)); + } + executeReadCommands(commands); + } - for (String[] data : group) + private static Set runWriteOperations(ColumnFamilyStore cfs, TestWriteOperation[] operations) + { + Set usedPartitionKeys = new HashSet<>(); + for (TestWriteOperation 
operation : operations) + { + if (operation.type == OperationType.CREATE) { - new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes(data[0])) - .clustering(data[1]) - .add(data[2], ByteBufferUtil.bytes("blah")) + new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes(operation.partitionKey)) + .clustering(operation.clusteringKey) + .add(operation.columnName, ByteBufferUtil.bytes(operation.columnValue)) .build() .apply(); - - commands.add(SinglePartitionReadCommand.create(cfs.metadata(), nowInSeconds, columnFilter, rowFilter, - DataLimits.NONE, Util.dk(data[0]), sliceFilter)); } - - new RowUpdateBuilder(cfs.metadata(), FBUtilities.nowInSeconds(), 0L, ByteBufferUtil.bytes("key2")) - .addRangeTombstone("aa", "bb").build().apply(); // <-- this range tombstone is non-purgeable and doesn't overlap with other ranges - // so it will create two markers which will be counted as TWO non-purgeable tombstones - - new RowUpdateBuilder(cfs.metadata(), 42, 0L, ByteBufferUtil.bytes("key2")) - .addRangeTombstone("dd", "ee").build().apply(); // <-- this range tombstone is purgeable and doesn't overlap with other ranges - // so it will create two markers which will be counted as TWO purgeable tombstones - - new RowUpdateBuilder(cfs.metadata(), 42, 0L, ByteBufferUtil.bytes("key2")) - .addRangeTombstone("ff", "ff").build().apply(); // <-- this one too so there would be FOUR purgeable tombstones in total - - Util.flush(cfs); - - ReadQuery query = SinglePartitionReadCommand.Group.create(commands, DataLimits.NONE); - - try (ReadExecutionController executionController = query.executionController(); - UnfilteredPartitionIterator iter = query.executeLocally(executionController); - DataOutputBuffer buffer = new DataOutputBuffer()) + else if (operation.type == OperationType.DELETE_PARTITION) { - UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter, - columnFilter, - buffer, - MessagingService.current_version); - buffers.add(buffer.buffer()); + new Mutation(PartitionUpdate.simpleBuilder(cfs.metadata(), ByteBufferUtil.bytes(operation.partitionKey)) + .nowInSec(operation.deletionTime) + .delete() + .build()).apply(); + } + else if (operation.type == OperationType.DELETE_RANGE) + { + new RowUpdateBuilder(cfs.metadata(), operation.deletionTime, 0L, ByteBufferUtil.bytes(operation.partitionKey)) + .addRangeTombstone(operation.clusteringRangeStart, operation.clusteringRangeEnd).build().apply(); + } + else if (operation.type == OperationType.DELETE_ROW) + { + RowUpdateBuilder.deleteRowAt(cfs.metadata(), 0, operation.deletionTime, + ByteBufferUtil.bytes(operation.partitionKey), operation.clusteringKey + ).apply(); + } + else if (operation.type == OperationType.DELETE_CELL) + { + new RowUpdateBuilder(cfs.metadata(), operation.deletionTime, 0L, ByteBufferUtil.bytes(operation.partitionKey)) + .clustering(operation.clusteringKey) + .delete(operation.columnName) + .build().apply(); } + + usedPartitionKeys.add(operation.partitionKey); } + return usedPartitionKeys; + } - assertEquals(2, cfs.metric.tombstoneScannedHistogram.cf.getSnapshot().getMax()); - assertEquals(4, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax()); + private static final long NEW_DELETION = FBUtilities.nowInSeconds(); + private static final long PURGEABLE_DELETION = 42; + + private enum OperationType + { + CREATE, + DELETE_PARTITION, + DELETE_RANGE, + DELETE_ROW, + DELETE_CELL } - /** - * Test purgeable tombstones count for range tombstones with overlapping ranges, - * i.e. 
both Bound and Boundary Markers will be created and counted - */ - @Test - public void testCountPurgeableRangeTombstones_overlappingRanges() throws Exception + private static class TestWriteOperation { - ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF12); + OperationType type; + String partitionKey; + String clusteringKey; - String[][][] groups = new String[][][] { - new String[][] { - new String[] { "key1", "aa", "a" }, - new String[] { "key2", "bb", "b" }, - new String[] { "key3", "cc", "c" } - }, - new String[][] { - new String[] { "key3", "dd", "d" }, - new String[] { "key2", "ee", "e" }, - new String[] { "key1", "ff", "f" } - }, - new String[][] { - new String[] { "key6", "aa", "a" }, - new String[] { "key5", "bb", "b" }, - new String[] { "key4", "cc", "c" } - }, - new String[][] { - new String[] { "key2", "aa", "a" }, - new String[] { "key2", "cc", "c" }, - new String[] { "key2", "dd", "d" } - }, - }; + String clusteringRangeStart, clusteringRangeEnd; + String columnName; + String columnValue = "bla"; - List buffers = new ArrayList<>(groups.length); - long nowInSeconds = FBUtilities.nowInSeconds(); - ColumnFilter columnFilter = ColumnFilter.allRegularColumnsBuilder(cfs.metadata(), false).build(); - RowFilter rowFilter = RowFilter.create(true); - Slice slice = Slice.make(BufferClusteringBound.BOTTOM, BufferClusteringBound.TOP); - ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter( - Slices.with(cfs.metadata().comparator, slice), false); + long deletionTime; - for (String[][] group : groups) + public TestWriteOperation(OperationType type, String partitionKey, + String clusteringKey, String clusteringRangeStart, String clusteringRangeEnd, + String columnName, long deletionTime) { - cfs.truncateBlocking(); + this.type = type; + this.partitionKey = partitionKey; + this.clusteringKey = clusteringKey; + this.clusteringRangeStart = clusteringRangeStart; + this.clusteringRangeEnd = clusteringRangeEnd; + this.columnName = columnName; + this.deletionTime = deletionTime; + } - List commands = new ArrayList<>(group.length); + public static TestWriteOperation insert(String partitionKey, String clusteringKey, + String columnName) + { + return new TestWriteOperation(OperationType.CREATE, partitionKey, clusteringKey, null, null, columnName, 0); + } - for (String[] data : group) - { - new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes(data[0])) - .clustering(data[1]) - .add(data[2], ByteBufferUtil.bytes("blah")) - .build() - .apply(); + public static TestWriteOperation deletePartition(String partitionKey, long deletionTime) + { + return new TestWriteOperation(OperationType.DELETE_PARTITION, partitionKey, + null, null, null, null, deletionTime); + } - commands.add(SinglePartitionReadCommand.create(cfs.metadata(), nowInSeconds, columnFilter, rowFilter, - DataLimits.NONE, Util.dk(data[0]), sliceFilter)); - } + public static TestWriteOperation deleteRange(String partitionKey, String clusteringRangeStart, String clusteringRangeEnd, long deletionTime) + { + return new TestWriteOperation(OperationType.DELETE_RANGE, partitionKey, + null, clusteringRangeStart, clusteringRangeEnd, null, deletionTime); + } - new RowUpdateBuilder(cfs.metadata(), FBUtilities.nowInSeconds(), 0L, ByteBufferUtil.bytes("key2")) - .addRangeTombstone("aa", "bb").build().apply(); // <-- this range tombstone is non-purgeable and overlaps with the next one - // so it will create one non-purgeable bound marker - // and one non-purgeable boundary marker so TWO non-purgeable tombstones 
will be counted + public static TestWriteOperation deleteRow(String partitionKey, String clusteringKey, long deletionTime) + { + return new TestWriteOperation(OperationType.DELETE_ROW, partitionKey, clusteringKey, + null, null, null, deletionTime); + } - new RowUpdateBuilder(cfs.metadata(), 42, 0L, ByteBufferUtil.bytes("key2")) - .addRangeTombstone("bb", "ee").build().apply(); // <-- this range tombstone is purgeable and overlaps with previous and next ones - // so it will create one non-purgeable boundary marker (same as previous one) - // and one purgeable boundary marker, so it will increment purgeable tombstones counter on one + public static TestWriteOperation deleteCell(String partitionKey, String clusteringKey, String columnName, long deletionTime) + { + return new TestWriteOperation(OperationType.DELETE_CELL, partitionKey, clusteringKey, + null, null, columnName, deletionTime); + } - new RowUpdateBuilder(cfs.metadata(), 52, 0L, ByteBufferUtil.bytes("key2")) - .addRangeTombstone("ee", "ff").build().apply(); // <-- this range tombstone is purgeable and overlaps with previous one - // so it will create one non-purgeable boundary marker (same as previous one) - // and one purgeable bound marker, so it will increment purgeable tombstones counter on one, - // so we expect TWO purgeable tombstones in total - Util.flush(cfs); + } - ReadQuery query = SinglePartitionReadCommand.Group.create(commands, DataLimits.NONE); + private static void executeReadCommands(List commands) throws IOException + { + ReadQuery query = SinglePartitionReadCommand.Group.create(commands, DataLimits.NONE); - try (ReadExecutionController executionController = query.executionController(); - UnfilteredPartitionIterator iter = query.executeLocally(executionController); - DataOutputBuffer buffer = new DataOutputBuffer()) - { - UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter, - columnFilter, - buffer, - MessagingService.current_version); - buffers.add(buffer.buffer()); - } + try (ReadExecutionController executionController = query.executionController(); + UnfilteredPartitionIterator iter = query.executeLocally(executionController); + DataOutputBuffer buffer = new DataOutputBuffer()) + { + UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter, + query.columnFilter(), + buffer, + MessagingService.current_version); } + } - - assertEquals(2, cfs.metric.tombstoneScannedHistogram.cf.getSnapshot().getMax()); - assertEquals(2, cfs.metric.purgeableTombstoneScannedHistogram.cf.getSnapshot().getMax()); + private static SinglePartitionReadCommand getWholePartitionReadCommand(ColumnFamilyStore cfs, String partitionKey) + { + long nowInSeconds = FBUtilities.nowInSeconds(); + ColumnFilter columnFilter = ColumnFilter.allRegularColumnsBuilder(cfs.metadata(), false).build(); + RowFilter rowFilter = RowFilter.create(true); + Slice slice = Slice.make(BufferClusteringBound.BOTTOM, BufferClusteringBound.TOP); + ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.with(cfs.metadata().comparator, slice), false); + return SinglePartitionReadCommand.create(cfs.metadata(), nowInSeconds, + columnFilter, rowFilter, + DataLimits.NONE, Util.dk(partitionKey), sliceFilter); } @Test