diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc
index 85d1c039bd3..aa66f4b8399 100644
--- a/db/compaction/compaction_iterator.cc
+++ b/db/compaction/compaction_iterator.cc
@@ -229,23 +229,25 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
   }
   if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex &&
-      ikey_.type != kTypeWideColumnEntity) {
+      ikey_.type != kTypeWideColumnEntity &&
+      ikey_.type != kTypeTitanBlobIndex) {
     return true;
   }
 
   CompactionFilter::Decision decision =
       CompactionFilter::Decision::kUndetermined;
-  CompactionFilter::ValueType value_type =
-      ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue
-      : ikey_.type == kTypeBlobIndex
-          ? CompactionFilter::ValueType::kBlobIndex
-          : CompactionFilter::ValueType::kWideColumnEntity;
+  CompactionFilter::ValueType value_type = CompactionFilter::ValueType::kValue;
+  if (ikey_.type == kTypeBlobIndex || ikey_.type == kTypeTitanBlobIndex) {
+    value_type = CompactionFilter::ValueType::kBlobIndex;
+  } else if (ikey_.type == kTypeWideColumnEntity) {
+    value_type = CompactionFilter::ValueType::kWideColumnEntity;
+  }
 
   // Hack: pass internal key to BlobIndexCompactionFilter since it needs
   // to get sequence number.
   assert(compaction_filter_);
   const Slice& filter_key =
-      (ikey_.type != kTypeBlobIndex ||
+      ((ikey_.type != kTypeBlobIndex && ikey_.type != kTypeTitanBlobIndex) ||
        !compaction_filter_->IsStackedBlobDbInternalCompactionFilter())
           ? ikey_.user_key
           : key_;
@@ -618,7 +620,8 @@ void CompactionIterator::NextFromInput() {
       // not compact out. We will keep this Put, but can drop it's data.
       // (See Optimization 3, below.)
       if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex &&
-          ikey_.type != kTypeWideColumnEntity) {
+          ikey_.type != kTypeWideColumnEntity &&
+          ikey_.type != kTypeTitanBlobIndex) {
         ROCKS_LOG_FATAL(info_log_, "Unexpected key %s for compaction output",
                         ikey_.DebugString(allow_data_in_errors_, true).c_str());
         assert(false);
@@ -632,7 +635,8 @@ void CompactionIterator::NextFromInput() {
         assert(false);
       }
 
-      if (ikey_.type == kTypeBlobIndex || ikey_.type == kTypeWideColumnEntity) {
+      if (ikey_.type == kTypeBlobIndex || ikey_.type == kTypeWideColumnEntity ||
+          ikey_.type == kTypeTitanBlobIndex) {
         ikey_.type = kTypeValue;
         current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
       }
@@ -798,7 +802,8 @@ void CompactionIterator::NextFromInput() {
         // happened
         if (next_ikey.type != kTypeValue &&
             next_ikey.type != kTypeBlobIndex &&
-            next_ikey.type != kTypeWideColumnEntity) {
+            next_ikey.type != kTypeWideColumnEntity &&
+            next_ikey.type != kTypeTitanBlobIndex) {
           ++iter_stats_.num_single_del_mismatch;
         }
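Note: the first hunk above replaces a nested ternary with an if/else chain so that both kTypeBlobIndex and kTypeTitanBlobIndex are presented to the compaction filter as the single kBlobIndex value type. A minimal, self-contained sketch of that mapping, using local stand-in enums rather than the real RocksDB types:

```cpp
#include <cassert>

// Illustration only: local stand-ins for the RocksDB enums.
enum class IkeyType { kValue, kBlobIndex, kTitanBlobIndex, kWideColumnEntity };
enum class FilterValueType { kValue, kBlobIndex, kWideColumnEntity };

// Mirrors the if/else chain in InvokeFilterIfNeeded: both the native and the
// Titan blob index map onto kBlobIndex; everything else keeps its own type.
FilterValueType ToFilterValueType(IkeyType t) {
  if (t == IkeyType::kBlobIndex || t == IkeyType::kTitanBlobIndex) {
    return FilterValueType::kBlobIndex;
  } else if (t == IkeyType::kWideColumnEntity) {
    return FilterValueType::kWideColumnEntity;
  }
  return FilterValueType::kValue;
}

int main() {
  assert(ToFilterValueType(IkeyType::kTitanBlobIndex) ==
         FilterValueType::kBlobIndex);
  assert(ToFilterValueType(IkeyType::kValue) == FilterValueType::kValue);
  return 0;
}
```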
diff --git a/db/db_iter.cc b/db/db_iter.cc
index e3b9e6c2948..723bba6636c 100644
--- a/db/db_iter.cc
+++ b/db/db_iter.cc
@@ -394,6 +394,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
         case kTypeValue:
         case kTypeBlobIndex:
         case kTypeWideColumnEntity:
+        case kTypeTitanBlobIndex:
           if (!PrepareValue()) {
             return false;
           }
@@ -405,7 +406,8 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
                           !iter_.iter()->IsKeyPinned() /* copy */);
           }
 
-          if (ikey_.type == kTypeBlobIndex) {
+          if (ikey_.type == kTypeBlobIndex ||
+              ikey_.type == kTypeTitanBlobIndex) {
             if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
               return false;
             }
@@ -593,7 +595,8 @@ bool DBIter::MergeValuesNewToOld() {
       merge_context_.PushOperand(
           iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
       PERF_COUNTER_ADD(internal_merge_count, 1);
-    } else if (kTypeBlobIndex == ikey.type) {
+    } else if (kTypeBlobIndex == ikey.type ||
+               kTypeTitanBlobIndex == ikey.type) {
       if (expose_blob_index_) {
         status_ =
             Status::NotSupported("BlobDB does not support merge operator.");
@@ -919,6 +922,7 @@ bool DBIter::FindValueForCurrentKey() {
       case kTypeValue:
       case kTypeBlobIndex:
       case kTypeWideColumnEntity:
+      case kTypeTitanBlobIndex:
         if (iter_.iter()->IsValuePinned()) {
           pinned_value_ = iter_.value();
         } else {
@@ -1004,7 +1008,8 @@ bool DBIter::FindValueForCurrentKey() {
          return false;
        }
        return true;
-      } else if (last_not_merge_type == kTypeBlobIndex) {
+      } else if (last_not_merge_type == kTypeBlobIndex ||
+                 last_not_merge_type == kTypeTitanBlobIndex) {
        if (expose_blob_index_) {
          status_ =
              Status::NotSupported("BlobDB does not support merge operator.");
@@ -1041,6 +1046,7 @@ bool DBIter::FindValueForCurrentKey() {
       SetValueAndColumnsFromPlain(pinned_value_);
       break;
 
+    case kTypeTitanBlobIndex:
     case kTypeBlobIndex:
       if (!SetBlobValueIfNeeded(saved_key_.GetUserKey(), pinned_value_)) {
         return false;
@@ -1143,10 +1149,10 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
     saved_timestamp_.assign(ts.data(), ts.size());
   }
   if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex ||
-      ikey.type == kTypeWideColumnEntity) {
+      ikey.type == kTypeWideColumnEntity || ikey.type == kTypeTitanBlobIndex) {
     assert(iter_.iter()->IsValuePinned());
     pinned_value_ = iter_.value();
-    if (ikey.type == kTypeBlobIndex) {
+    if (ikey.type == kTypeBlobIndex || ikey.type == kTypeTitanBlobIndex) {
       if (!SetBlobValueIfNeeded(ikey.user_key, pinned_value_)) {
         return false;
       }
@@ -1213,7 +1219,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
       merge_context_.PushOperand(
           iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
       PERF_COUNTER_ADD(internal_merge_count, 1);
-    } else if (ikey.type == kTypeBlobIndex) {
+    } else if (ikey.type == kTypeBlobIndex ||
+               ikey.type == kTypeTitanBlobIndex) {
       if (expose_blob_index_) {
         status_ =
             Status::NotSupported("BlobDB does not support merge operator.");
diff --git a/db/dbformat.h b/db/dbformat.h
index ab632a229e3..17e6974de7a 100644
--- a/db/dbformat.h
+++ b/db/dbformat.h
@@ -53,8 +53,15 @@ enum ValueType : unsigned char {
   kTypeNoop = 0xD,                       // WAL only.
   kTypeColumnFamilyRangeDeletion = 0xE,  // WAL only.
   kTypeRangeDeletion = 0xF,              // meta block
-  kTypeColumnFamilyBlobIndex = 0x10,     // Blob DB only
-  kTypeTitanBlobIndex = 0x11,            // Titan Blob DB only
+  // Titan has been using 0x10 and 0x11 (originally kTypeColumnFamilyBlobIndex
+  // and kTypeBlobIndex) for its own purposes. However, Titan and the RocksDB
+  // native Blob DB have diverged in their implementations, and RocksDB
+  // sometimes tries to parse kTypeBlobIndex regardless of whether it comes
+  // from the native Blob DB or a stacked implementation, which causes issues.
+  // To avoid such conflicts, these two values are reassigned to Titan-specific
+  // types so that Titan does not need to migrate existing data.
+  kTypeTitanColumnFamilyBlobIndex = 0x10,  // Titan Blob DB only
+  kTypeTitanBlobIndex = 0x11,              // Titan Blob DB only
   // When the prepared record is also persisted in db, we use a different
   // record. This is to ensure that the WAL that is generated by a WritePolicy
   // is not mistakenly read by another, which would result into data
@@ -68,7 +75,11 @@ enum ValueType : unsigned char {
   kTypeCommitXIDAndTimestamp = 0x15,  // WAL only
   kTypeWideColumnEntity = 0x16,
   kTypeColumnFamilyWideColumnEntity = 0x17,  // WAL only
-  kTypeBlobIndex = 0x18,                     // RocksDB native Blob DB only
+  // Moving kTypeColumnFamilyBlobIndex and kTypeBlobIndex (originally 0x10 and
+  // 0x11) to 0x18 and 0x19 respectively to avoid potential conflicts with
+  // Titan. See the comments above for more details.
+  kTypeColumnFamilyBlobIndex = 0x18,  // RocksDB native Blob DB only
+  kTypeBlobIndex = 0x19,              // RocksDB native Blob DB only
   kTypeMaxValid,  // Should be after the last valid type, only used for
                   // validation
   kMaxValue = 0x7F  // Not used for storing records.
@@ -82,7 +93,8 @@ extern const ValueType kValueTypeForSeekForPrev;
 // (i.e. a type used in memtable skiplist and sst file datablock).
 inline bool IsValueType(ValueType t) {
   return t <= kTypeMerge || kTypeSingleDeletion == t || kTypeBlobIndex == t ||
-         kTypeDeletionWithTimestamp == t || kTypeWideColumnEntity == t;
+         kTypeDeletionWithTimestamp == t || kTypeWideColumnEntity == t ||
+         kTypeTitanBlobIndex == t;
 }
 
 // Checks whether a type is from user operation
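Note: the net effect of the dbformat.h hunks is a pure renumbering. Titan keeps the tag bytes 0x10/0x11 it has always written to disk, while the native Blob DB tags move out of its way. A minimal sketch that pins down the invariant, using a local enum that mirrors the values in the diff above (not the real header):

```cpp
// Illustration only: mirrors the tag assignments from the dbformat.h hunks.
enum ValueTypeSketch : unsigned char {
  kTypeTitanColumnFamilyBlobIndex = 0x10,  // unchanged on disk for Titan
  kTypeTitanBlobIndex = 0x11,              // unchanged on disk for Titan
  kTypeColumnFamilyBlobIndex = 0x18,       // native Blob DB, moved from 0x10
  kTypeBlobIndex = 0x19,                   // native Blob DB, moved from 0x18
};

// The two families must never share a tag byte, so a reader can always tell
// a Titan-written blob index apart from a native one.
static_assert(kTypeTitanBlobIndex != kTypeBlobIndex, "tags must not collide");
static_assert(kTypeTitanColumnFamilyBlobIndex != kTypeColumnFamilyBlobIndex,
              "tags must not collide");
```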
diff --git a/db/memtable.cc b/db/memtable.cc
index 0b8786bc2ff..4c3275ae923 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -980,12 +980,14 @@ static bool SaveValue(void* arg, const char* entry) {
     }
 
     if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex ||
-         type == kTypeWideColumnEntity || type == kTypeDeletion ||
-         type == kTypeSingleDeletion || type == kTypeDeletionWithTimestamp) &&
+         type == kTypeTitanBlobIndex || type == kTypeWideColumnEntity ||
+         type == kTypeDeletion || type == kTypeSingleDeletion ||
+         type == kTypeDeletionWithTimestamp) &&
        max_covering_tombstone_seq > seq) {
      type = kTypeRangeDeletion;
    }
    switch (type) {
+      case kTypeTitanBlobIndex:
      case kTypeBlobIndex: {
        if (!s->do_merge) {
          *(s->status) = Status::NotSupported(
diff --git a/db/write_batch.cc b/db/write_batch.cc
index 0b55cb4aae5..1e2e33ff641 100644
--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -131,6 +131,11 @@ struct BatchContentClassifier : public WriteBatch::Handler {
     return Status::OK();
   }
 
+  Status PutTitanBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
+    content_flags |= ContentFlags::HAS_BLOB_INDEX;
+    return Status::OK();
+  }
+
   Status MarkBeginPrepare(bool unprepare) override {
     content_flags |= ContentFlags::HAS_BEGIN_PREPARE;
     if (unprepare) {
@@ -413,11 +418,13 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag,
       }
       break;
     case kTypeColumnFamilyBlobIndex:
+    case kTypeTitanColumnFamilyBlobIndex:
       if (!GetVarint32(input, column_family)) {
         return Status::Corruption("bad WriteBatch BlobIndex");
       }
       FALLTHROUGH_INTENDED;
     case kTypeBlobIndex:
+    case kTypeTitanBlobIndex:
       if (!GetLengthPrefixedSlice(input, key) ||
           !GetLengthPrefixedSlice(input, value)) {
         return Status::Corruption("bad WriteBatch BlobIndex");
@@ -595,6 +602,15 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb,
          found++;
        }
        break;
+      case kTypeTitanColumnFamilyBlobIndex:
+      case kTypeTitanBlobIndex:
+        assert(wb->content_flags_.load(std::memory_order_relaxed) &
+               (ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX));
+        s = handler->PutTitanBlobIndexCF(column_family, key, value);
+        if (LIKELY(s.ok())) {
+          found++;
+        }
+        break;
      case kTypeLogData:
        handler->LogData(blob);
        // A batch might have nothing but LogData. It is still a batch.
@@ -1616,6 +1632,34 @@ Status WriteBatchInternal::PutBlobIndex(WriteBatch* b,
   return save.commit();
 }
 
+Status WriteBatchInternal::PutTitanBlobIndex(WriteBatch* b,
+                                             uint32_t column_family_id,
+                                             const Slice& key,
+                                             const Slice& value) {
+  LocalSavePoint save(b);
+  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
+  if (column_family_id == 0) {
+    b->rep_.push_back(static_cast<char>(kTypeTitanBlobIndex));
+  } else {
+    b->rep_.push_back(static_cast<char>(kTypeTitanColumnFamilyBlobIndex));
+    PutVarint32(&b->rep_, column_family_id);
+  }
+  PutLengthPrefixedSlice(&b->rep_, key);
+  PutLengthPrefixedSlice(&b->rep_, value);
+  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
+                              ContentFlags::HAS_BLOB_INDEX,
+                          std::memory_order_relaxed);
+  if (b->prot_info_ != nullptr) {
+    // See comment in first `WriteBatchInternal::Put()` overload concerning the
+    // `ValueType` argument passed to `ProtectKVO()`.
+    b->prot_info_->entries_.emplace_back(
+        ProtectionInfo64()
+            .ProtectKVO(key, value, kTypeTitanBlobIndex)
+            .ProtectC(column_family_id));
+  }
+  return save.commit();
+}
+
 Status WriteBatch::PutLogData(const Slice& blob) {
   LocalSavePoint save(this);
   rep_.push_back(static_cast<char>(kTypeLogData));
@@ -1736,6 +1780,10 @@ Status WriteBatch::VerifyChecksum() const {
       case kTypeBlobIndex:
         tag = kTypeBlobIndex;
         break;
+      case kTypeTitanColumnFamilyBlobIndex:
+      case kTypeTitanBlobIndex:
+        tag = kTypeTitanBlobIndex;
+        break;
      case kTypeLogData:
      case kTypeBeginPrepareXID:
      case kTypeEndPrepareXID:
@@ -2654,6 +2702,27 @@ class MemTableInserter : public WriteBatch::Handler {
     return ret_status;
   }
 
+  Status PutTitanBlobIndexCF(uint32_t column_family_id, const Slice& key,
+                             const Slice& value) override {
+    const auto* kv_prot_info = NextProtectionInfo();
+    Status ret_status;
+    if (kv_prot_info != nullptr) {
+      // Memtable needs seqno, doesn't need CF ID
+      auto mem_kv_prot_info =
+          kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
+      // Same as PutCF except for value type.
+      ret_status = PutCFImpl(column_family_id, key, value, kTypeTitanBlobIndex,
+                             &mem_kv_prot_info);
+    } else {
+      ret_status = PutCFImpl(column_family_id, key, value, kTypeTitanBlobIndex,
+                             nullptr /* kv_prot_info */);
+    }
+    if (UNLIKELY(ret_status.IsTryAgain())) {
+      DecrementProtectionInfoIdxForTryAgain();
+    }
+    return ret_status;
+  }
+
   void CheckMemtableFull() {
     if (flush_scheduler_ != nullptr) {
       auto* cfd = cf_mems_->current();
@@ -3060,6 +3129,11 @@ class ProtectionInfoUpdater : public WriteBatch::Handler {
     return UpdateProtInfo(cf, key, val, kTypeBlobIndex);
   }
 
+  Status PutTitanBlobIndexCF(uint32_t cf, const Slice& key,
+                             const Slice& val) override {
+    return UpdateProtInfo(cf, key, val, kTypeTitanBlobIndex);
+  }
+
   Status MarkBeginPrepare(bool /* unprepare */) override {
     return Status::OK();
   }
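Note: as the PutTitanBlobIndex body above shows, a Titan record reuses the standard WriteBatch record layout: a one-byte tag, a varint32 column family ID only for the non-default-CF variant, then the length-prefixed key and value. A hedged sketch of that encoding with hand-rolled helpers (PutVarint32Sketch and PutLengthPrefixedSketch are local stand-ins, not RocksDB's coding functions):

```cpp
#include <cstdint>
#include <string>

// Illustration only: least-significant 7-bit groups first, high bit marks
// continuation, same wire format as RocksDB's PutVarint32.
void PutVarint32Sketch(std::string* dst, uint32_t v) {
  while (v >= 0x80) {
    dst->push_back(static_cast<char>(v | 0x80));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
}

void PutLengthPrefixedSketch(std::string* dst, const std::string& s) {
  PutVarint32Sketch(dst, static_cast<uint32_t>(s.size()));
  dst->append(s);
}

// Encodes a WriteBatch record body the same way PutTitanBlobIndex does.
std::string EncodeTitanBlobIndexRecord(uint32_t cf_id, const std::string& key,
                                       const std::string& value) {
  const char kTitanBlobIndexTag = 0x11;    // kTypeTitanBlobIndex
  const char kTitanCFBlobIndexTag = 0x10;  // kTypeTitanColumnFamilyBlobIndex
  std::string rep;
  if (cf_id == 0) {
    rep.push_back(kTitanBlobIndexTag);     // default CF: tag only
  } else {
    rep.push_back(kTitanCFBlobIndexTag);   // non-default CF: tag + varint id
    PutVarint32Sketch(&rep, cf_id);
  }
  PutLengthPrefixedSketch(&rep, key);
  PutLengthPrefixedSketch(&rep, value);
  return rep;
}
```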
diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h
index fcae19f0cb2..43fdc75d7e9 100644
--- a/db/write_batch_internal.h
+++ b/db/write_batch_internal.h
@@ -11,6 +11,7 @@
 #include
 #include
+#include "db/dbformat.h"
 #include "db/flush_scheduler.h"
 #include "db/kv_checksum.h"
 #include "db/trim_history_scheduler.h"
@@ -118,6 +119,9 @@ class WriteBatchInternal {
   static Status PutBlobIndex(WriteBatch* batch, uint32_t column_family_id,
                              const Slice& key, const Slice& value);
 
+  static Status PutTitanBlobIndex(WriteBatch* batch, uint32_t column_family_id,
+                                  const Slice& key, const Slice& value);
+
   static Status MarkEndPrepare(WriteBatch* batch, const Slice& xid,
                                const bool write_after_commit = true,
                                const bool unprepared_batch = false);
diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h
index 7740a65968b..3eaf9fe7920 100644
--- a/include/rocksdb/write_batch.h
+++ b/include/rocksdb/write_batch.h
@@ -318,6 +318,12 @@ class WriteBatch : public WriteBatchBase {
       return Status::InvalidArgument("PutBlobIndexCF not implemented");
     }
 
+    virtual Status PutTitanBlobIndexCF(uint32_t /*column_family_id*/,
+                                       const Slice& /*key*/,
+                                       const Slice& /*value*/) {
+      return Status::InvalidArgument("PutTitanBlobIndexCF not implemented");
+    }
+
     // The default implementation of LogData does nothing.
     virtual void LogData(const Slice& blob);
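Note: because the new PutTitanBlobIndexCF hook has a non-abstract default (returning InvalidArgument), existing Handler subclasses keep compiling unchanged; a handler that wants to observe Titan records simply overrides it. A sketch assuming this patch is applied (TitanBlobIndexCounter is a hypothetical subclass, not part of the patch):

```cpp
#include <cstddef>

#include "rocksdb/write_batch.h"

// Illustration only: counts Titan blob index records while iterating a batch.
class TitanBlobIndexCounter : public rocksdb::WriteBatch::Handler {
 public:
  rocksdb::Status PutCF(uint32_t /*cf*/, const rocksdb::Slice& /*key*/,
                        const rocksdb::Slice& /*value*/) override {
    return rocksdb::Status::OK();  // ignore plain puts
  }
  // New hook from this diff: invoked for kTypeTitanBlobIndex and
  // kTypeTitanColumnFamilyBlobIndex records during WriteBatch::Iterate.
  rocksdb::Status PutTitanBlobIndexCF(
      uint32_t /*cf*/, const rocksdb::Slice& /*key*/,
      const rocksdb::Slice& /*value*/) override {
    ++titan_blob_indexes_;
    return rocksdb::Status::OK();
  }

  size_t titan_blob_indexes_ = 0;
};
```

Driving it is the usual pattern: `batch.Iterate(&counter);`, after which `counter.titan_blob_indexes_` holds the number of Titan records seen.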
diff --git a/monitoring/statistics.cc b/monitoring/statistics.cc
index 95fc36ae12f..56197f85436 100644
--- a/monitoring/statistics.cc
+++ b/monitoring/statistics.cc
@@ -12,7 +12,6 @@
 #include "monitoring/statistics_impl.h"
 #include "rocksdb/convenience.h"
 #include "rocksdb/utilities/customizable_util.h"
-#include "rocksdb/utilities/options_type.h"
 #include "util/string_util.h"
 
 namespace ROCKSDB_NAMESPACE {
@@ -376,219 +375,4 @@ Status Statistics::CreateFromString(const ConfigOptions& config_options,
   return s;
 }
 
-static std::unordered_map<std::string, OptionTypeInfo> stats_type_info = {
-    {"inner", OptionTypeInfo::AsCustomSharedPtr<Statistics>(
-                  0, OptionVerificationType::kByNameAllowFromNull,
-                  OptionTypeFlags::kCompareNever)},
-};
-
-template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
-StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::StatisticsImpl(
-    std::shared_ptr<Statistics> stats)
-    : stats_(std::move(stats)) {
-  RegisterOptions("StatisticsOptions", &stats_, &stats_type_info);
-}
-
-template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
-StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::~StatisticsImpl() = default;
-
-template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
-uint64_t StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::getTickerCount(
-    uint32_t tickerType) const {
-  MutexLock lock(&aggregate_lock_);
-  return getTickerCountLocked(tickerType);
-}
-
-template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
-uint64_t StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::getTickerCountLocked(
-    uint32_t tickerType) const {
-  assert(tickerType < TICKER_MAX);
-  uint64_t res = 0;
-  for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
-    res += per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType];
-  }
-  return res;
-}
-
-template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
-void StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::histogramData(
-    uint32_t histogramType, HistogramData* const data) const {
-  MutexLock lock(&aggregate_lock_);
-  getHistogramImplLocked(histogramType)->Data(data);
-}
-
-template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
-std::unique_ptr<HistogramImpl>
-StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::getHistogramImplLocked(
-    uint32_t histogramType) const {
-  assert(histogramType < HISTOGRAM_MAX);
-  std::unique_ptr<HistogramImpl> res_hist(new HistogramImpl());
-  for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
-    res_hist->Merge(
-        per_core_stats_.AccessAtCore(core_idx)->histograms_[histogramType]);
-  }
-  return res_hist;
-}
-
-template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
-std::string StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::getHistogramString(
-    uint32_t histogramType) const {
-  MutexLock lock(&aggregate_lock_);
-  return getHistogramImplLocked(histogramType)->ToString();
-}
-
-template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
-void StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::setTickerCount(
-    uint32_t tickerType, uint64_t count) {
-  {
-    MutexLock lock(&aggregate_lock_);
-    setTickerCountLocked(tickerType, count);
-  }
-  if (stats_ && tickerType < TICKER_MAX) {
-    stats_->setTickerCount(tickerType, count);
-  }
-}
-
-template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
-void StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::setTickerCountLocked(
-    uint32_t tickerType, uint64_t count) {
-  assert(tickerType < TICKER_MAX);
-  for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
-    if (core_idx == 0) {
-      per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType] = count;
-    } else {
-      per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType] = 0;
-    }
-  }
-}
-
-template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
-uint64_t StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::getAndResetTickerCount(
-    uint32_t tickerType) {
-  uint64_t sum = 0;
-  {
-    MutexLock lock(&aggregate_lock_);
-    assert(tickerType < TICKER_MAX);
-    for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
-      sum +=
-          per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType].exchange(
-              0, std::memory_order_relaxed);
-    }
-  }
-  if (stats_ && tickerType < TICKER_MAX) {
-    stats_->setTickerCount(tickerType, 0);
-  }
-  return sum;
-}
-
-template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
-void StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::recordTick(uint32_t tickerType,
-                                                           uint64_t count) {
-  if (get_stats_level() <= StatsLevel::kExceptTickers) {
-    return;
-  }
-  if (tickerType < TICKER_MAX) {
-    per_core_stats_.Access()->tickers_[tickerType].fetch_add(
-        count, std::memory_order_relaxed);
-    if (stats_) {
-      stats_->recordTick(tickerType, count);
-    }
-  } else {
-    assert(false);
-  }
-}
-
-template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
-void StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::recordInHistogram(
-    uint32_t histogramType, uint64_t value) {
-  assert(histogramType < HISTOGRAM_MAX);
-  if (get_stats_level() <= StatsLevel::kExceptHistogramOrTimers) {
-    return;
-  }
-  per_core_stats_.Access()->histograms_[histogramType].Add(value);
-  if (stats_ && histogramType < HISTOGRAM_MAX) {
-    stats_->recordInHistogram(histogramType, value);
-  }
-}
-
-template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
-Status StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::Reset() {
-  MutexLock lock(&aggregate_lock_);
-  for (uint32_t i = 0; i < TICKER_MAX; ++i) {
-    setTickerCountLocked(i, 0);
-  }
-  for (uint32_t i = 0; i < HISTOGRAM_MAX; ++i) {
-    for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
-      per_core_stats_.AccessAtCore(core_idx)->histograms_[i].Clear();
-    }
-  }
-  return Status::OK();
-}
-
-namespace {
-
-// a buffer size used for temp string buffers
-const int kTmpStrBufferSize = 200;
-
-}  // namespace
-
-template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
-std::string StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::ToString() const {
-  MutexLock lock(&aggregate_lock_);
-  std::string res;
-  res.reserve(20000);
-  for (const auto& t : TickersNameMap) {
-    assert(t.first < TICKER_MAX);
-    char buffer[kTmpStrBufferSize];
-    snprintf(buffer, kTmpStrBufferSize, "%s COUNT : %" PRIu64 "\n",
-             t.second.c_str(), getTickerCountLocked(t.first));
-    res.append(buffer);
-  }
-  for (const auto& h : HistogramsNameMap) {
-    assert(h.first < HISTOGRAM_MAX);
-    char buffer[kTmpStrBufferSize];
-    HistogramData hData;
-    getHistogramImplLocked(h.first)->Data(&hData);
-    // don't handle failures - buffer should always be big enough and arguments
-    // should be provided correctly
-    int ret =
-        snprintf(buffer, kTmpStrBufferSize,
-                 "%s P50 : %f P95 : %f P99 : %f P100 : %f COUNT : %" PRIu64
-                 " SUM : %" PRIu64 "\n",
-                 h.second.c_str(), hData.median, hData.percentile95,
-                 hData.percentile99, hData.max, hData.count, hData.sum);
-    if (ret < 0 || ret >= kTmpStrBufferSize) {
-      assert(false);
-      continue;
-    }
-    res.append(buffer);
-  }
-  res.shrink_to_fit();
-  return res;
-}
-
-template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
-bool StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::getTickerMap(
-    std::map<std::string, uint64_t>* stats_map) const {
-  assert(stats_map);
-  if (!stats_map) {
-    return false;
-  }
-  stats_map->clear();
-  MutexLock lock(&aggregate_lock_);
-  for (const auto& t : TickersNameMap) {
-    assert(t.first < TICKER_MAX);
-    (*stats_map)[t.second.c_str()] = getTickerCountLocked(t.first);
-  }
-  return true;
-}
-
-template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
-bool StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::HistEnabledForType(
-    uint32_t type) const {
-  return type < HISTOGRAM_MAX;
-}
-
-template class StatisticsImpl<TICKER_ENUM_MAX, HISTOGRAM_ENUM_MAX>;
-
 } // namespace ROCKSDB_NAMESPACE
diff --git a/monitoring/statistics_impl.h b/monitoring/statistics_impl.h
index 754db76e927..7688a945119 100644
--- a/monitoring/statistics_impl.h
+++ b/monitoring/statistics_impl.h
@@ -5,6 +5,7 @@
 //
 #pragma once
 #include
+#include
 #include
 #include
 #include
@@ -13,6 +14,7 @@
 #include "port/likely.h"
 #include "port/port.h"
 #include "rocksdb/statistics.h"
+#include "rocksdb/utilities/options_type.h"
 #include "util/core_local.h"
 #include "util/mutexlock.h"
@@ -132,6 +134,219 @@ inline void SetTickerCount(Statistics* statistics, uint32_t ticker_type,
   }
 }
 
+static std::unordered_map<std::string, OptionTypeInfo> stats_type_info = {
+    {"inner", OptionTypeInfo::AsCustomSharedPtr<Statistics>(
+                  0, OptionVerificationType::kByNameAllowFromNull,
+                  OptionTypeFlags::kCompareNever)},
+};
+
+template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
+StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::StatisticsImpl(
+    std::shared_ptr<Statistics> stats)
+    : stats_(std::move(stats)) {
+  RegisterOptions("StatisticsOptions", &stats_, &stats_type_info);
+}
+
+template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
+StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::~StatisticsImpl() = default;
+
+template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
+uint64_t StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::getTickerCount(
+    uint32_t tickerType) const {
+  MutexLock lock(&aggregate_lock_);
+  return getTickerCountLocked(tickerType);
+}
+
+template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
+uint64_t StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::getTickerCountLocked(
+    uint32_t tickerType) const {
+  assert(tickerType < TICKER_MAX);
+  uint64_t res = 0;
+  for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
+    res += per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType];
+  }
+  return res;
+}
+
+template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
+void StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::histogramData(
+    uint32_t histogramType, HistogramData* const data) const {
+  MutexLock lock(&aggregate_lock_);
+  getHistogramImplLocked(histogramType)->Data(data);
+}
+
+template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
+std::unique_ptr<HistogramImpl>
+StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::getHistogramImplLocked(
+    uint32_t histogramType) const {
+  assert(histogramType < HISTOGRAM_MAX);
+  std::unique_ptr<HistogramImpl> res_hist(new HistogramImpl());
+  for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
+    res_hist->Merge(
+        per_core_stats_.AccessAtCore(core_idx)->histograms_[histogramType]);
+  }
+  return res_hist;
+}
+
+template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
+std::string StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::getHistogramString(
+    uint32_t histogramType) const {
+  MutexLock lock(&aggregate_lock_);
+  return getHistogramImplLocked(histogramType)->ToString();
+}
+
+template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
+void StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::setTickerCount(
+    uint32_t tickerType, uint64_t count) {
+  {
+    MutexLock lock(&aggregate_lock_);
+    setTickerCountLocked(tickerType, count);
+  }
+  if (stats_ && tickerType < TICKER_MAX) {
+    stats_->setTickerCount(tickerType, count);
+  }
+}
+
+template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
+void StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::setTickerCountLocked(
+    uint32_t tickerType, uint64_t count) {
+  assert(tickerType < TICKER_MAX);
+  for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
+    if (core_idx == 0) {
+      per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType] = count;
+    } else {
+      per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType] = 0;
+    }
+  }
+}
+
+template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
+uint64_t StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::getAndResetTickerCount(
+    uint32_t tickerType) {
+  uint64_t sum = 0;
+  {
+    MutexLock lock(&aggregate_lock_);
+    assert(tickerType < TICKER_MAX);
+    for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
+      sum +=
+          per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType].exchange(
+              0, std::memory_order_relaxed);
+    }
+  }
+  if (stats_ && tickerType < TICKER_MAX) {
+    stats_->setTickerCount(tickerType, 0);
+  }
+  return sum;
+}
+
+template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
+void StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::recordTick(uint32_t tickerType,
+                                                           uint64_t count) {
+  if (get_stats_level() <= StatsLevel::kExceptTickers) {
+    return;
+  }
+  if (tickerType < TICKER_MAX) {
+    per_core_stats_.Access()->tickers_[tickerType].fetch_add(
+        count, std::memory_order_relaxed);
+    if (stats_) {
+      stats_->recordTick(tickerType, count);
+    }
+  } else {
+    assert(false);
+  }
+}
+
+template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
+void StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::recordInHistogram(
+    uint32_t histogramType, uint64_t value) {
+  assert(histogramType < HISTOGRAM_MAX);
+  if (get_stats_level() <= StatsLevel::kExceptHistogramOrTimers) {
+    return;
+  }
+  per_core_stats_.Access()->histograms_[histogramType].Add(value);
+  if (stats_ && histogramType < HISTOGRAM_MAX) {
+    stats_->recordInHistogram(histogramType, value);
+  }
+}
+
+template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
+Status StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::Reset() {
+  MutexLock lock(&aggregate_lock_);
+  for (uint32_t i = 0; i < TICKER_MAX; ++i) {
+    setTickerCountLocked(i, 0);
+  }
+  for (uint32_t i = 0; i < HISTOGRAM_MAX; ++i) {
+    for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
+      per_core_stats_.AccessAtCore(core_idx)->histograms_[i].Clear();
+    }
+  }
+  return Status::OK();
+}
+
+namespace {
+
+// a buffer size used for temp string buffers
+const int kTmpStrBufferSize = 200;
+
+}  // namespace
+
+template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
+std::string StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::ToString() const {
+  MutexLock lock(&aggregate_lock_);
+  std::string res;
+  res.reserve(20000);
+  for (const auto& t : TickersNameMap) {
+    assert(t.first < TICKER_MAX);
+    char buffer[kTmpStrBufferSize];
+    snprintf(buffer, kTmpStrBufferSize, "%s COUNT : %" PRIu64 "\n",
+             t.second.c_str(), getTickerCountLocked(t.first));
+    res.append(buffer);
+  }
+  for (const auto& h : HistogramsNameMap) {
+    assert(h.first < HISTOGRAM_MAX);
+    char buffer[kTmpStrBufferSize];
+    HistogramData hData;
+    getHistogramImplLocked(h.first)->Data(&hData);
+    // don't handle failures - buffer should always be big enough and arguments
+    // should be provided correctly
+    int ret =
+        snprintf(buffer, kTmpStrBufferSize,
+                 "%s P50 : %f P95 : %f P99 : %f P100 : %f COUNT : %" PRIu64
+                 " SUM : %" PRIu64 "\n",
+                 h.second.c_str(), hData.median, hData.percentile95,
+                 hData.percentile99, hData.max, hData.count, hData.sum);
+    if (ret < 0 || ret >= kTmpStrBufferSize) {
+      assert(false);
+      continue;
+    }
+    res.append(buffer);
+  }
+  res.shrink_to_fit();
+  return res;
+}
+
+template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
+bool StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::getTickerMap(
+    std::map<std::string, uint64_t>* stats_map) const {
+  assert(stats_map);
+  if (!stats_map) {
+    return false;
+  }
+  stats_map->clear();
+  MutexLock lock(&aggregate_lock_);
+  for (const auto& t : TickersNameMap) {
+    assert(t.first < TICKER_MAX);
+    (*stats_map)[t.second.c_str()] = getTickerCountLocked(t.first);
+  }
+  return true;
+}
+
+template <uint32_t TICKER_MAX, uint32_t HISTOGRAM_MAX>
+bool StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>::HistEnabledForType(
+    uint32_t type) const {
+  return type < HISTOGRAM_MAX;
+}
+
 template <uint32_t TICKER_MAX = TICKER_ENUM_MAX, uint32_t HISTOGRAM_MAX = HISTOGRAM_ENUM_MAX>
 std::shared_ptr<Statistics> CreateDBStatistics() {
   return std::make_shared<StatisticsImpl<TICKER_MAX, HISTOGRAM_MAX>>(nullptr);
@@ -139,5 +354,5 @@ std::shared_ptr<Statistics> CreateDBStatistics() {
 }
 
 template std::shared_ptr<Statistics> CreateDBStatistics();
-
+template class StatisticsImpl<TICKER_ENUM_MAX, HISTOGRAM_ENUM_MAX>;
 } // namespace ROCKSDB_NAMESPACE
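Note: moving the StatisticsImpl member definitions out of statistics.cc and into statistics_impl.h makes the template instantiable outside that translation unit; presumably this is so a fork such as Titan can instantiate it over an extended ticker/histogram range. A heavily hedged sketch of what such an out-of-tree instantiation could look like (kTitanTickerMax, kTitanHistogramMax, and CreateTitanStatistics are hypothetical names, not part of this patch; the *_ENUM_MAX constants come from rocksdb/statistics.h):

```cpp
#include <memory>

#include "monitoring/statistics_impl.h"  // in-tree build assumed

namespace ROCKSDB_NAMESPACE {

// Hypothetical extended bounds a fork might define for its extra counters.
constexpr uint32_t kTitanTickerMax = TICKER_ENUM_MAX + 100;
constexpr uint32_t kTitanHistogramMax = HISTOGRAM_ENUM_MAX + 50;

// With the definitions in the header, this instantiation now links without
// any change to statistics.cc.
std::shared_ptr<Statistics> CreateTitanStatistics() {
  return std::make_shared<StatisticsImpl<kTitanTickerMax, kTitanHistogramMax>>(
      nullptr);
}

}  // namespace ROCKSDB_NAMESPACE
```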
diff --git a/table/block_based/block.cc b/table/block_based/block.cc
index bc18dd926da..662d16db6d5 100644
--- a/table/block_based/block.cc
+++ b/table/block_based/block.cc
@@ -452,7 +452,8 @@ bool DataBlockIter::SeekForGetImpl(const Slice& target) {
       value_type != ValueType::kTypeMerge &&
       value_type != ValueType::kTypeSingleDeletion &&
       value_type != ValueType::kTypeBlobIndex &&
-      value_type != ValueType::kTypeWideColumnEntity) {
+      value_type != ValueType::kTypeWideColumnEntity &&
+      value_type != ValueType::kTypeTitanBlobIndex) {
     SeekImpl(target);
   }
diff --git a/table/get_context.cc b/table/get_context.cc
index 7dafbd7d409..883faf661fc 100644
--- a/table/get_context.cc
+++ b/table/get_context.cc
@@ -278,8 +278,9 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
     auto type = parsed_key.type;
     // Key matches. Process it
     if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex ||
-         type == kTypeWideColumnEntity || type == kTypeDeletion ||
-         type == kTypeDeletionWithTimestamp || type == kTypeSingleDeletion) &&
+         type == kTypeTitanBlobIndex || type == kTypeWideColumnEntity ||
+         type == kTypeDeletion || type == kTypeDeletionWithTimestamp ||
+         type == kTypeSingleDeletion) &&
         max_covering_tombstone_seq_ != nullptr &&
         *max_covering_tombstone_seq_ > parsed_key.sequence) {
       // Note that deletion types are also considered, this is for the case
@@ -290,9 +291,10 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
     switch (type) {
       case kTypeValue:
       case kTypeBlobIndex:
+      case kTypeTitanBlobIndex:
       case kTypeWideColumnEntity:
         assert(state_ == kNotFound || state_ == kMerge);
-        if (type == kTypeBlobIndex) {
+        if (type == kTypeBlobIndex || type == kTypeTitanBlobIndex) {
          if (is_blob_index_ == nullptr) {
            // Blob value not supported. Stop.
            state_ = kUnexpectedBlobIndex;
@@ -301,7 +303,8 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
        }
 
        if (is_blob_index_ != nullptr) {
-          *is_blob_index_ = (type == kTypeBlobIndex);
+          *is_blob_index_ =
+              (type == kTypeBlobIndex || type == kTypeTitanBlobIndex);
        }
 
        if (kNotFound == state_) {
@@ -367,7 +370,7 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
              push_operand(value_of_default, value_pinner);
            }
          } else {
-            assert(type == kTypeValue);
+            assert(type == kTypeValue || type == kTypeTitanBlobIndex);
            push_operand(value, value_pinner);
          }
        }
@@ -410,7 +413,7 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
            push_operand(value_of_default, value_pinner);
          }
        } else {
-          assert(type == kTypeValue);
+          assert(type == kTypeValue || type == kTypeTitanBlobIndex);
          state_ = kFound;
          if (do_merge_) {
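Note: in GetContext::SaveValue above, a Titan blob index is surfaced exactly like a native one: the lookup stops with kUnexpectedBlobIndex unless the caller supplied an is_blob_index_ out-parameter, which is then set for either tag. A sketch of that caller-side contract with simplified signatures (not the real GetContext API):

```cpp
#include <cassert>
#include <string>

// Illustration only: local stand-in for the internal key's value type.
enum class Tag { kValue, kBlobIndex, kTitanBlobIndex };

// Mirrors SaveValue's handling: reject blob indexes when the caller cannot
// handle them, otherwise report whether the stored record is a blob index.
bool SaveValueSketch(Tag type, bool* is_blob_index, std::string* status) {
  if ((type == Tag::kBlobIndex || type == Tag::kTitanBlobIndex) &&
      is_blob_index == nullptr) {
    *status = "UnexpectedBlobIndex";  // blob value not supported; stop
    return false;
  }
  if (is_blob_index != nullptr) {
    *is_blob_index = (type == Tag::kBlobIndex || type == Tag::kTitanBlobIndex);
  }
  return true;
}

int main() {
  bool is_blob = false;
  std::string status;
  assert(SaveValueSketch(Tag::kTitanBlobIndex, &is_blob, &status) && is_blob);
  assert(!SaveValueSketch(Tag::kTitanBlobIndex, nullptr, &status));
  return 0;
}
```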
diff --git a/utilities/debug.cc b/utilities/debug.cc
index 0629846d302..979d478ffc0 100644
--- a/utilities/debug.cc
+++ b/utilities/debug.cc
@@ -39,6 +39,8 @@ static std::unordered_map<std::string, ValueType> value_type_string_map = {
     {"TypeColumnFamilyWideColumnEntity",
      ValueType::kTypeColumnFamilyWideColumnEntity},
     {"TypeTitanBlobIndex", ValueType::kTypeTitanBlobIndex},
+    {"TypeTitanColumnFamilyBlobIndex",
+     ValueType::kTypeTitanColumnFamilyBlobIndex},
 };
 
 std::string KeyVersion::GetTypeName() const {