diff --git a/utilities/secondary_index/secondary_index_mixin.h b/utilities/secondary_index/secondary_index_mixin.h index 6f5f85384a3..02c5d4c14a9 100644 --- a/utilities/secondary_index/secondary_index_mixin.h +++ b/utilities/secondary_index/secondary_index_mixin.h @@ -35,15 +35,23 @@ class SecondaryIndexMixin : public Txn { } using Txn::Put; - Status Put(ColumnFamilyHandle* /* column_family */, const Slice& /* key */, - const Slice& /* value */, - const bool /* assume_tracked */ = false) override { - return Status::NotSupported("Put with secondary indices not yet supported"); + Status Put(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value, const bool assume_tracked = false) override { + return PerformWithSavePoint([&]() { + const bool do_validate = !assume_tracked; + return PutWithSecondaryIndices(column_family, key, value, do_validate); + }); } - Status Put(ColumnFamilyHandle* /* column_family */, - const SliceParts& /* key */, const SliceParts& /* value */, - const bool /* assume_tracked */ = false) override { - return Status::NotSupported("Put with secondary indices not yet supported"); + Status Put(ColumnFamilyHandle* column_family, const SliceParts& key, + const SliceParts& value, + const bool assume_tracked = false) override { + std::string key_str; + const Slice key_slice(key, &key_str); + + std::string value_str; + const Slice value_slice(value, &value_str); + + return Put(column_family, key_slice, value_slice, assume_tracked); } Status PutEntity(ColumnFamilyHandle* column_family, const Slice& key, @@ -92,17 +100,22 @@ class SecondaryIndexMixin : public Txn { } using Txn::PutUntracked; - Status PutUntracked(ColumnFamilyHandle* /* column_family */, - const Slice& /* key */, - const Slice& /* value */) override { - return Status::NotSupported( - "PutUntracked with secondary indices not yet supported"); + Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override { + return PerformWithSavePoint([&]() { + constexpr bool do_validate = false; + return PutWithSecondaryIndices(column_family, key, value, do_validate); + }); } - Status PutUntracked(ColumnFamilyHandle* /* column_family */, - const SliceParts& /* key */, - const SliceParts& /* value */) override { - return Status::NotSupported( - "PutUntracked with secondary indices not yet supported"); + Status PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key, + const SliceParts& value) override { + std::string key_str; + const Slice key_slice(key, &key_str); + + std::string value_str; + const Slice value_slice(value, &value_str); + + return PutUntracked(column_family, key_slice, value_slice); } Status PutEntityUntracked(ColumnFamilyHandle* column_family, const Slice& key, @@ -179,21 +192,6 @@ class SecondaryIndexMixin : public Txn { var); } - template - static Iterator FindPrimaryColumn(const SecondaryIndex* secondary_index, - ColumnFamilyHandle* column_family, - Iterator begin, Iterator end) { - assert(secondary_index); - assert(column_family); - - if (column_family != secondary_index->GetPrimaryColumnFamily()) { - return end; - } - - return WideColumnsHelper::Find(begin, end, - secondary_index->GetPrimaryColumnName()); - } - template Status PerformWithSavePoint(Operation&& operation) { Txn::SetSavePoint(); @@ -247,6 +245,15 @@ class SecondaryIndexMixin : public Txn { secondary_key); } + Status AddPrimaryEntry(ColumnFamilyHandle* column_family, + const Slice& primary_key, const Slice& primary_value) { + assert(column_family); + + constexpr bool assume_tracked = true; + + return Txn::Put(column_family, primary_key, primary_value, assume_tracked); + } + Status AddPrimaryEntry(ColumnFamilyHandle* column_family, const Slice& primary_key, const WideColumns& primary_columns) { @@ -317,9 +324,15 @@ class SecondaryIndexMixin : public Txn { const auto& existing_columns = existing_primary_columns.columns(); for (const auto& secondary_index : *secondary_indices_) { - const auto it = - FindPrimaryColumn(secondary_index.get(), column_family, - existing_columns.cbegin(), existing_columns.cend()); + assert(secondary_index); + + if (secondary_index->GetPrimaryColumnFamily() != column_family) { + continue; + } + + const auto it = WideColumnsHelper::Find( + existing_columns.cbegin(), existing_columns.cend(), + secondary_index->GetPrimaryColumnName()); if (it == existing_columns.cend()) { continue; } @@ -334,18 +347,67 @@ class SecondaryIndexMixin : public Txn { return Status::OK(); } + Status UpdatePrimaryColumnValues(ColumnFamilyHandle* column_family, + const Slice& primary_key, + Slice& primary_value, + autovector& applicable_indices) { + assert(column_family); + assert(applicable_indices.empty()); + + applicable_indices.reserve(secondary_indices_->size()); + + for (const auto& secondary_index : *secondary_indices_) { + assert(secondary_index); + + if (secondary_index->GetPrimaryColumnFamily() != column_family) { + continue; + } + + if (secondary_index->GetPrimaryColumnName() != kDefaultWideColumnName) { + continue; + } + + applicable_indices.emplace_back( + IndexData(secondary_index.get(), primary_value)); + + auto& index_data = applicable_indices.back(); + + const Status s = secondary_index->UpdatePrimaryColumnValue( + primary_key, index_data.previous_column_value(), + &index_data.updated_column_value()); + if (!s.ok()) { + return s; + } + + primary_value = index_data.primary_column_value(); + } + + return Status::OK(); + } + Status UpdatePrimaryColumnValues(ColumnFamilyHandle* column_family, const Slice& primary_key, WideColumns& primary_columns, autovector& applicable_indices) { + assert(column_family); assert(applicable_indices.empty()); + // TODO: as an optimization, we can avoid calling SortColumns a second time + // in WriteBatchInternal::PutEntity + WideColumnsHelper::SortColumns(primary_columns); + applicable_indices.reserve(secondary_indices_->size()); for (const auto& secondary_index : *secondary_indices_) { - const auto it = - FindPrimaryColumn(secondary_index.get(), column_family, - primary_columns.begin(), primary_columns.end()); + assert(secondary_index); + + if (secondary_index->GetPrimaryColumnFamily() != column_family) { + continue; + } + + const auto it = WideColumnsHelper::Find( + primary_columns.begin(), primary_columns.end(), + secondary_index->GetPrimaryColumnName()); if (it == primary_columns.end()) { continue; } @@ -382,10 +444,11 @@ class SecondaryIndexMixin : public Txn { return Status::OK(); } - Status PutEntityWithSecondaryIndices(ColumnFamilyHandle* column_family, - const Slice& key, - const WideColumns& columns, - bool do_validate) { + template + Status PutWithSecondaryIndicesImpl(ColumnFamilyHandle* column_family, + const Slice& key, + const Value& value_or_columns, + bool do_validate) { // TODO: we could avoid removing and recreating secondary entries for // which neither the secondary key prefix nor the value has changed @@ -403,14 +466,13 @@ class SecondaryIndexMixin : public Txn { } } + auto primary_value_or_columns = value_or_columns; autovector applicable_indices; - WideColumns primary_columns(columns); - WideColumnsHelper::SortColumns(primary_columns); - { - const Status s = UpdatePrimaryColumnValues( - column_family, primary_key, primary_columns, applicable_indices); + const Status s = UpdatePrimaryColumnValues(column_family, primary_key, + primary_value_or_columns, + applicable_indices); if (!s.ok()) { return s; } @@ -418,7 +480,7 @@ class SecondaryIndexMixin : public Txn { { const Status s = - AddPrimaryEntry(column_family, primary_key, primary_columns); + AddPrimaryEntry(column_family, primary_key, primary_value_or_columns); if (!s.ok()) { return s; } @@ -434,6 +496,20 @@ class SecondaryIndexMixin : public Txn { return Status::OK(); } + Status PutWithSecondaryIndices(ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value, + bool do_validate) { + return PutWithSecondaryIndicesImpl(column_family, key, value, do_validate); + } + + Status PutEntityWithSecondaryIndices(ColumnFamilyHandle* column_family, + const Slice& key, + const WideColumns& columns, + bool do_validate) { + return PutWithSecondaryIndicesImpl(column_family, key, columns, + do_validate); + } + const std::vector>* secondary_indices_; }; diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 7ac4255bc34..92dcf1a86a3 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -8008,7 +8008,324 @@ TEST_P(TransactionTest, AttributeGroupIteratorSanityChecks) { } } -TEST_P(TransactionTest, SecondaryIndex) { +TEST_P(TransactionTest, SecondaryIndexPut) { + const TxnDBWritePolicy write_policy = std::get<2>(GetParam()); + if (write_policy != TxnDBWritePolicy::WRITE_COMMITTED) { + ROCKSDB_GTEST_BYPASS("Test only WriteCommitted for now"); + return; + } + + // A basic secondary index that indexes the default column. + class DefaultSecondaryIndex : public SecondaryIndex { + public: + void SetPrimaryColumnFamily(ColumnFamilyHandle* cfh) override { + primary_cfh_ = cfh; + } + + void SetSecondaryColumnFamily(ColumnFamilyHandle* cfh) override { + secondary_cfh_ = cfh; + } + + ColumnFamilyHandle* GetPrimaryColumnFamily() const override { + return primary_cfh_; + } + + ColumnFamilyHandle* GetSecondaryColumnFamily() const override { + return secondary_cfh_; + } + + Slice GetPrimaryColumnName() const override { + return kDefaultWideColumnName; + } + + Status UpdatePrimaryColumnValue( + const Slice& /* primary_key */, const Slice& /* primary_column_value */, + std::optional< + std::variant>* /* updated_column_value */) + const override { + return Status::OK(); + } + + Status GetSecondaryKeyPrefix( + const Slice& primary_column_value, + std::variant* secondary_key_prefix) const override { + assert(secondary_key_prefix); + + std::string prefix; + PutLengthPrefixedSlice(&prefix, primary_column_value); + + *secondary_key_prefix = std::move(prefix); + + return Status::OK(); + } + + Status GetSecondaryValue(const Slice& /* primary_column_value */, + const Slice& /* previous_column_value */, + std::optional>* + /* secondary_value */) const override { + return Status::OK(); + } + + std::unique_ptr NewIterator( + const SecondaryIndexReadOptions& /* read_options */, + std::unique_ptr&& underlying_it) const override { + return std::make_unique(this, + std::move(underlying_it)); + } + + private: + ColumnFamilyHandle* primary_cfh_{}; + ColumnFamilyHandle* secondary_cfh_{}; + }; + + txn_db_options.secondary_indices.emplace_back( + std::make_shared()); + + ASSERT_OK(ReOpen()); + + ColumnFamilyOptions cf1_opts; + ColumnFamilyHandle* cfh1 = nullptr; + ASSERT_OK(db->CreateColumnFamily(cf1_opts, "cf1", &cfh1)); + std::unique_ptr cfh1_guard(cfh1); + + ColumnFamilyOptions cf2_opts; + ColumnFamilyHandle* cfh2 = nullptr; + ASSERT_OK(db->CreateColumnFamily(cf2_opts, "cf2", &cfh2)); + std::unique_ptr cfh2_guard(cfh2); + + auto& index = txn_db_options.secondary_indices.back(); + index->SetPrimaryColumnFamily(cfh1); + index->SetSecondaryColumnFamily(cfh2); + + { + std::unique_ptr txn(db->BeginTransaction(WriteOptions())); + + // Default CF => OK but not indexed + ASSERT_OK(txn->Put(db->DefaultColumnFamily(), "key0", "foo")); + + // CF1 but no default column => OK but not indexed + ASSERT_OK(txn->PutEntity(cfh1, "key1", {{"hello", "world"}})); + + // CF1, "bar" in the default column => OK and indexed + ASSERT_OK(txn->Put(cfh1, "key2", "bar")); + + // CF1, "baz" in the default column => OK and indexed + ASSERT_OK(txn->Put(cfh1, "key3", "baz")); + + ASSERT_OK(txn->Commit()); + } + + // Expected keys: "key0" in the default CF; "key1", "key2", "key3" in CF1; + // secondary index entries for "key2" and "key3" in CF2 + { + std::unique_ptr it( + db->NewIterator(ReadOptions(), db->DefaultColumnFamily())); + + it->SeekToFirst(); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ(it->key(), "key0"); + ASSERT_EQ(it->value(), "foo"); + + it->Next(); + ASSERT_FALSE(it->Valid()); + ASSERT_OK(it->status()); + } + + { + std::unique_ptr it(db->NewIterator(ReadOptions(), cfh1)); + + it->SeekToFirst(); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ(it->key(), "key1"); + WideColumns expected1{{"hello", "world"}}; + ASSERT_EQ(it->columns(), expected1); + + it->Next(); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ(it->key(), "key2"); + ASSERT_EQ(it->value(), "bar"); + + it->Next(); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ(it->key(), "key3"); + ASSERT_EQ(it->value(), "baz"); + + it->Next(); + ASSERT_FALSE(it->Valid()); + ASSERT_OK(it->status()); + } + + { + // Read the raw secondary index entries from CF2 + std::unique_ptr it(db->NewIterator(ReadOptions(), cfh2)); + + it->SeekToFirst(); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ(it->key(), "\3barkey2"); + ASSERT_TRUE(it->value().empty()); + + it->Next(); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ(it->key(), "\3bazkey3"); + ASSERT_TRUE(it->value().empty()); + + it->Next(); + ASSERT_FALSE(it->Valid()); + ASSERT_OK(it->status()); + } + + { + // Query the secondary index + std::unique_ptr underlying_it( + db->NewIterator(ReadOptions(), cfh2)); + std::unique_ptr it(index->NewIterator(SecondaryIndexReadOptions(), + std::move(underlying_it))); + + it->SeekToFirst(); + ASSERT_FALSE(it->Valid()); + ASSERT_TRUE(it->status().IsNotSupported()); + + it->SeekToLast(); + ASSERT_FALSE(it->Valid()); + ASSERT_TRUE(it->status().IsNotSupported()); + + it->SeekForPrev("bar"); + ASSERT_FALSE(it->Valid()); + ASSERT_TRUE(it->status().IsNotSupported()); + + it->Seek("bar"); + ASSERT_TRUE(it->Valid()); + ASSERT_OK(it->status()); + ASSERT_EQ(it->key(), "key2"); + ASSERT_TRUE(it->value().empty()); + + it->Next(); + ASSERT_FALSE(it->Valid()); + ASSERT_OK(it->status()); + + it->Seek("baz"); + ASSERT_TRUE(it->Valid()); + ASSERT_OK(it->status()); + ASSERT_EQ(it->key(), "key3"); + ASSERT_TRUE(it->value().empty()); + + it->Next(); + ASSERT_FALSE(it->Valid()); + ASSERT_OK(it->status()); + } + + // Make some updates to the key-values indexed above through the database + // interface (i.e. using implicit transactions) + + // Add a default column to "key1" which previously had none + ASSERT_OK( + db->PutEntity(WriteOptions(), cfh1, "key1", + {{"hello", "world"}, {kDefaultWideColumnName, "quux"}})); + + // Change the value of the default column in "key2" + ASSERT_OK(db->Put(WriteOptions(), cfh1, "key2", "quux")); + + // Remove the default column from "key3" + ASSERT_OK(db->PutEntity(WriteOptions(), cfh1, "key3", {{"1", "2"}})); + + // Expected keys: "key0" in the default CF; "key1", "key2", "key3" in CF1; + // secondary index entries for "key1" and "key2" in CF2 + { + std::unique_ptr it( + db->NewIterator(ReadOptions(), db->DefaultColumnFamily())); + + it->SeekToFirst(); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ(it->key(), "key0"); + ASSERT_EQ(it->value(), "foo"); + + it->Next(); + ASSERT_FALSE(it->Valid()); + ASSERT_OK(it->status()); + } + + { + std::unique_ptr it(db->NewIterator(ReadOptions(), cfh1)); + + it->SeekToFirst(); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ(it->key(), "key1"); + WideColumns expected1{{kDefaultWideColumnName, "quux"}, {"hello", "world"}}; + ASSERT_EQ(it->columns(), expected1); + + it->Next(); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ(it->key(), "key2"); + ASSERT_EQ(it->value(), "quux"); + + it->Next(); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ(it->key(), "key3"); + WideColumns expected3{{"1", "2"}}; + ASSERT_EQ(it->columns(), expected3); + + it->Next(); + ASSERT_FALSE(it->Valid()); + ASSERT_OK(it->status()); + } + + { + // Read the raw secondary index entries from CF2 + std::unique_ptr it(db->NewIterator(ReadOptions(), cfh2)); + + it->SeekToFirst(); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ(it->key(), "\4quuxkey1"); + ASSERT_TRUE(it->value().empty()); + + it->Next(); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ(it->key(), "\4quuxkey2"); + ASSERT_TRUE(it->value().empty()); + + it->Next(); + ASSERT_FALSE(it->Valid()); + ASSERT_OK(it->status()); + } + + { + // Query the secondary index + std::unique_ptr underlying_it( + db->NewIterator(ReadOptions(), cfh2)); + std::unique_ptr it(index->NewIterator(SecondaryIndexReadOptions(), + std::move(underlying_it))); + + it->SeekToFirst(); + ASSERT_FALSE(it->Valid()); + ASSERT_TRUE(it->status().IsNotSupported()); + + it->SeekToLast(); + ASSERT_FALSE(it->Valid()); + ASSERT_TRUE(it->status().IsNotSupported()); + + it->SeekForPrev("quux"); + ASSERT_FALSE(it->Valid()); + ASSERT_TRUE(it->status().IsNotSupported()); + + it->Seek("quux"); + ASSERT_TRUE(it->Valid()); + ASSERT_OK(it->status()); + ASSERT_EQ(it->key(), "key1"); + ASSERT_TRUE(it->value().empty()); + + it->Next(); + ASSERT_TRUE(it->Valid()); + ASSERT_OK(it->status()); + ASSERT_EQ(it->key(), "key2"); + ASSERT_TRUE(it->value().empty()); + + it->Next(); + ASSERT_FALSE(it->Valid()); + ASSERT_OK(it->status()); + } +} + +TEST_P(TransactionTest, SecondaryIndexPutEntity) { const TxnDBWritePolicy write_policy = std::get<2>(GetParam()); if (write_policy != TxnDBWritePolicy::WRITE_COMMITTED) { ROCKSDB_GTEST_BYPASS("Test only WriteCommitted for now");