-
Notifications
You must be signed in to change notification settings - Fork 74
/
Copy pathmemtablerep.h
544 lines (466 loc) · 21.5 KB
/
memtablerep.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
// Copyright (C) 2023 Speedb Ltd. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// This file contains the interface that must be implemented by any collection
// to be used as the backing store for a MemTable. Such a collection must
// satisfy the following properties:
// (1) It does not store duplicate items.
// (2) It uses MemTableRep::KeyComparator to compare items for iteration and
// equality.
// (3) It can be accessed concurrently by multiple readers and can support
// writes during reads. However, it needn't support multiple concurrent writes.
// (4) Items are never deleted.
// The liberal use of assertions is encouraged to enforce (1).
//
// The factory will be passed a MemTableAllocator object when a new MemTableRep
// is requested.
//
// Users can implement their own memtable representations. We include three
// types built in:
// - SkipListRep: This is the default; it is backed by a skip list.
// - HashSkipListRep: The memtable rep that is best used for keys that are
// structured like "prefix:suffix" where iteration within a prefix is
// common and iteration across different prefixes is rare. It is backed by
// a hash map where each bucket is a skip list.
// - VectorRep: This is backed by an unordered std::vector. On iteration, the
// vector is sorted. It is intelligent about sorting; once MarkReadOnly()
// has been called, the vector will only be sorted once. It is optimized for
// random-write-heavy workloads.
//
// The last two implementations are designed for situations in which
// iteration over the entire collection is rare since doing so requires all the
// keys to be copied into a sorted data structure.
#pragma once
#include <stdint.h>
#include <stdlib.h>

#include <atomic>
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <unordered_set>

#include "rocksdb/customizable.h"
#include "rocksdb/port_defs.h"
#include "rocksdb/slice.h"
namespace ROCKSDB_NAMESPACE {
// Forward declarations of types that are defined elsewhere.
class Allocator;
class Arena;
class Logger;
class LookupKey;
class SliceTransform;
struct DBOptions;

// Opaque handle to an entry; MemTableRep::Allocate() returns one and the
// MemTableRep::Insert*() family consumes it.
using KeyHandle = void*;

// Defined elsewhere. MemTableRep::KeyComparator::decode_key() uses it to turn
// a memtable key into a Slice.
Slice GetLengthPrefixedSlice(const char* data);
// Abstract interface that every memtable backing collection implements.
// Entries are handed over as KeyHandle values obtained from Allocate(), are
// compared through a KeyComparator, and are read back through the nested
// Iterator interface.
class MemTableRep {
public:
// KeyComparator provides a means to compare keys, which are internal keys
// concatenated with values.
class KeyComparator {
public:
// Type returned by decode_key().
using DecodedType = ROCKSDB_NAMESPACE::Slice;
// Converts a memtable key into a DecodedType by reading its length prefix.
virtual DecodedType decode_key(const char* key) const {
// The format of key is frozen and can be treated as a part of the API
// contract. Refer to MemTable::Add for details.
return GetLengthPrefixedSlice(key);
}
// Compare a and b. Return a negative value if a is less than b, 0 if they
// are equal, and a positive value if a is greater than b
virtual int operator()(const char* prefix_len_key1,
const char* prefix_len_key2) const = 0;
// Same return convention as above, comparing a length-prefixed key against
// a key that is given as a Slice.
virtual int operator()(const char* prefix_len_key,
const Slice& key) const = 0;
virtual ~KeyComparator() {}
};
// Stores the allocator pointer in allocator_; it is not dereferenced here.
explicit MemTableRep(Allocator* allocator) : allocator_(allocator) {}
// Allocate a buf of len size for storing key. The idea is that a
// specific memtable representation knows its underlying data structure
// better. By allowing it to allocate memory, it can possibly put
// correlated stuff in consecutive memory area to make processor
// prefetching more efficient.
// Declared here; the default implementation is defined elsewhere.
virtual KeyHandle Allocate(const size_t len, char** buf);
// Insert key into the collection. (The caller will pack key and value into a
// single buffer and pass that in as the parameter to Insert).
// REQUIRES: nothing that compares equal to key is currently in the
// collection, and no concurrent modifications to the table in progress
virtual void Insert(KeyHandle handle) = 0;
// Same as ::Insert
// Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and
// the <key, seq> already exists.
// The default implementation forwards to Insert() and always returns true.
virtual bool InsertKey(KeyHandle handle) {
Insert(handle);
return true;
}
// Same as Insert(), but additionally passes a hint for the insert location
// of the key. If hint points to nullptr, a new hint will be populated;
// otherwise the hint will be updated to reflect the last insert location.
//
// Currently only skip-list based memtables implement the interface. Other
// implementations will fall back to Insert() by default.
virtual void InsertWithHint(KeyHandle handle, void** /*hint*/) {
// Ignore the hint by default.
Insert(handle);
}
// Same as ::InsertWithHint
// Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and
// the <key, seq> already exists.
// The default implementation forwards to InsertWithHint() and always returns
// true.
virtual bool InsertKeyWithHint(KeyHandle handle, void** hint) {
InsertWithHint(handle, hint);
return true;
}
// Same as ::InsertWithHint, but allow concurrent write
//
// If hint points to nullptr, a new hint will be allocated on heap, otherwise
// the hint will be updated to reflect the last insert location. The hint is
// owned by the caller and it is the caller's responsibility to delete the
// hint later.
//
// Currently only skip-list based memtables implement the interface. Other
// implementations will fall back to InsertConcurrently() by default.
virtual void InsertWithHintConcurrently(KeyHandle handle, void** /*hint*/) {
// Ignore the hint by default.
InsertConcurrently(handle);
}
// Same as ::InsertWithHintConcurrently
// Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and
// the <key, seq> already exists.
// The default implementation forwards to InsertWithHintConcurrently() and
// always returns true.
virtual bool InsertKeyWithHintConcurrently(KeyHandle handle, void** hint) {
InsertWithHintConcurrently(handle, hint);
return true;
}
// Like Insert(handle), but may be called concurrent with other calls
// to InsertConcurrently for other handles.
//
// This overload returns nothing; duplicate detection is reported only by
// InsertKeyConcurrently() below. The default implementation is defined
// elsewhere.
virtual void InsertConcurrently(KeyHandle handle);
// Same as ::InsertConcurrently
// Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and
// the <key, seq> already exists.
// The default implementation forwards to InsertConcurrently() and always
// returns true.
virtual bool InsertKeyConcurrently(KeyHandle handle) {
InsertConcurrently(handle);
return true;
}
// Returns true iff an entry that compares equal to key is in the collection.
virtual bool Contains(const char* key) const = 0;
// Notify this table rep that it will no longer be added to. By default,
// does nothing. After MarkReadOnly() is called, this table rep will
// not be written to (ie No more calls to Allocate(), Insert(),
// or any writes done directly to entries accessed through the iterator.)
virtual void MarkReadOnly() {}
// Notify this table rep that it has been flushed to stable storage.
// By default, does nothing.
//
// Invariant: MarkReadOnly() is called, before MarkFlushed().
// Note that this method if overridden, should not run for an extended period
// of time. Otherwise, RocksDB may be blocked.
virtual void MarkFlushed() {}
// Look up a key in the mem table: starting from the first entry whose
// user_key matches the one given in k, call callback_func() with
// callback_args forwarded as the first parameter and the mem table key as
// the second parameter. If the callback returns false, the lookup
// terminates; otherwise it continues with the next key.
//
// It's safe for Get() to terminate after having finished all the potential
// key for the k.user_key(), or not.
//
// Default:
// The default Get() dynamically constructs an iterator, seeks, and calls the
// callback function. It is defined elsewhere.
virtual void Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry));
// Estimate of the number of entries between the two given keys. The default
// implementation ignores both arguments and returns 0.
virtual uint64_t ApproximateNumEntries(const Slice& /*start_ikey*/,
const Slice& /*end_key*/) {
return 0;
}
// Fills *entries with unique random memtable entries of approximate
// size 'target_sample_size' (this size is not strictly enforced).
// The default implementation ignores its arguments and asserts, so it is
// presumably only meant to be called on representations that override it.
virtual void UniqueRandomSample(const uint64_t num_entries,
const uint64_t target_sample_size,
std::unordered_set<const char*>* entries) {
// The casts only silence unused-parameter warnings.
(void)num_entries;
(void)target_sample_size;
(void)entries;
assert(false);
}
// Report an approximation of how much memory has been used other than memory
// that was allocated through the allocator. Safe to call from any thread.
virtual size_t ApproximateMemoryUsage() = 0;
virtual ~MemTableRep() {}
// Iteration over the contents of the collection
class Iterator {
public:
// Initialize an iterator over the specified collection.
// The returned iterator is not valid.
// explicit Iterator(const MemTableRep* collection);
virtual ~Iterator() {}
// Returns true iff the iterator is positioned at a valid node.
virtual bool Valid() const = 0;
// The default implementation returns false.
virtual bool IsEmpty() { return false; }
// Returns the key at the current position.
// REQUIRES: Valid()
virtual const char* key() const = 0;
// Advances to the next position.
// REQUIRES: Valid()
virtual void Next() = 0;
// Advances to the previous position.
// REQUIRES: Valid()
virtual void Prev() = 0;
// Advance to the first entry with a key >= target
virtual void Seek(const Slice& internal_key, const char* memtable_key) = 0;
// Retreat to the first entry with a key <= target
virtual void SeekForPrev(const Slice& internal_key,
const char* memtable_key) = 0;
// The default implementation does nothing.
virtual void RandomSeek() {}
// Position at the first entry in collection.
// Final state of iterator is Valid() iff collection is not empty.
virtual void SeekToFirst() = 0;
// Position at the last entry in collection.
// Final state of iterator is Valid() iff collection is not empty.
virtual void SeekToLast() = 0;
};
// Return an iterator over the keys in this representation.
// arena: If not null, the arena needs to be used to allocate the Iterator.
// When destroying the iterator, the caller will not call "delete"
// but Iterator::~Iterator() directly. The destructor needs to destroy
// all the states but those allocated in arena.
// part_of_flush: not documented in this header; presumably tells the
// implementation that the iterator is created for a flush - TODO confirm.
virtual Iterator* GetIterator(Arena* arena = nullptr,
bool part_of_flush = false) = 0;
// Return an iterator that has a special Seek semantics. The result of
// a Seek might only include keys with the same prefix as the target key.
// arena: If not null, the arena is used to allocate the Iterator.
// When destroying the iterator, the caller will not call "delete"
// but Iterator::~Iterator() directly. The destructor needs to destroy
// all the states but those allocated in arena.
// The default implementation returns GetIterator(arena).
virtual Iterator* GetDynamicPrefixIterator(Arena* arena = nullptr) {
return GetIterator(arena);
}
// Return true if the current MemTableRep supports merge operator.
// Default: true
virtual bool IsMergeOperatorSupported() const { return true; }
// Return true if the current MemTableRep supports snapshot
// Default: true
virtual bool IsSnapshotSupported() const { return true; }
protected:
// When *key is an internal key concatenated with the value, returns the
// user key.
virtual Slice UserKey(const char* key) const;
// Allocator passed to the constructor; this class does not delete it.
Allocator* allocator_;
};
// This is the base class for all factories that are used by RocksDB to create
// new MemTableRep objects
class MemTableRepFactory : public Customizable {
public:
MemTableRepFactory() {}
~MemTableRepFactory() override {
if (enable_switch_memtable_) {
{
std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
terminate_switch_memtable_.store(true);
}
switch_memtable_thread_cv_.notify_one();
switch_memtable_thread_.join();
const MemTableRep* memtable = switch_mem_.exchange(nullptr);
if (memtable != nullptr) {
delete memtable;
}
}
}
void Init() {
switch_memtable_thread_ =
port::Thread(&MemTableRepFactory::PrepareSwitchMemTable, this);
// need to verify the thread was executed
{
std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
while (!switch_memtable_thread_init_.load()) {
switch_memtable_thread_cv_.wait(lck);
}
}
enable_switch_memtable_ = true;
}
static const char* Type() { return "MemTableRepFactory"; }
static Status CreateFromString(const ConfigOptions& config_options,
const std::string& id,
std::unique_ptr<MemTableRepFactory>* factory);
static Status CreateFromString(const ConfigOptions& config_options,
const std::string& id,
std::shared_ptr<MemTableRepFactory>* factory);
virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&,
Allocator*, const SliceTransform*,
Logger* logger) = 0;
virtual MemTableRep* CreateMemTableRep(
const MemTableRep::KeyComparator& key_cmp, Allocator* allocator,
const SliceTransform* slice_transform, Logger* logger,
uint32_t /* column_family_id */) {
if (enable_switch_memtable_) {
return GetSwitchMemtable(key_cmp, allocator, slice_transform, logger);
} else {
return CreateMemTableRep(key_cmp, allocator, slice_transform, logger);
}
}
const char* Name() const override = 0;
// Return true if the current MemTableRep supports concurrent inserts
// Default: false
virtual bool IsInsertConcurrentlySupported() const { return false; }
// Return true if the current MemTableRep supports detecting duplicate
// <key,seq> at insertion time. If true, then MemTableRep::Insert* returns
// false when if the <key,seq> already exists.
// Default: false
virtual bool CanHandleDuplicatedKey() const { return false; }
virtual bool IsRefreshIterSupported() const { return true; }
virtual MemTableRep* PreCreateMemTableRep() { return nullptr; }
virtual void PostCreateMemTableRep(
MemTableRep* /*switch_mem*/,
const MemTableRep::KeyComparator& /*key_cmp*/, Allocator* /*allocator*/,
const SliceTransform* /*slice_transform*/, Logger* /*logger*/) {}
void PrepareSwitchMemTable() {
{
std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
switch_memtable_thread_init_.store(true);
}
switch_memtable_thread_cv_.notify_one();
for (;;) {
{
std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
while (switch_mem_.load(std::memory_order_acquire) != nullptr) {
if (terminate_switch_memtable_.load()) {
return;
}
switch_memtable_thread_cv_.wait(lck);
}
}
// Construct new memtable only for the heavy object initilized proposed
switch_mem_.store(PreCreateMemTableRep(), std::memory_order_release);
}
}
MemTableRep* GetSwitchMemtable(const MemTableRep::KeyComparator& key_cmp,
Allocator* allocator,
const SliceTransform* slice_transform,
Logger* logger) {
MemTableRep* switch_mem = nullptr;
{
std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
switch_mem = switch_mem_.exchange(nullptr, std::memory_order_release);
}
switch_memtable_thread_cv_.notify_one();
if (switch_mem == nullptr) {
// No point in suspending, just construct the memtable here
switch_mem =
CreateMemTableRep(key_cmp, allocator, slice_transform, logger);
} else {
PostCreateMemTableRep(switch_mem, key_cmp, allocator, slice_transform,
logger);
}
return switch_mem;
}
public:
// true if the current MemTableRep supports prepare memtable creation
// note that if it does the memtable contruction MUST NOT use any arena
// allocation!!! Default: false
bool enable_switch_memtable_ = false;
private:
port::Thread switch_memtable_thread_;
std::mutex switch_memtable_thread_mutex_;
std::condition_variable switch_memtable_thread_cv_;
std::atomic<bool> terminate_switch_memtable_ = false;
std::atomic<bool> switch_memtable_thread_init_ = false;
std::atomic<MemTableRep*> switch_mem_ = nullptr;
};
// This uses a skip list to store keys. It is the default.
//
// Parameters:
// lookahead: If non-zero, each iterator's seek operation will start the
// search from the previously visited record (doing at most 'lookahead'
// steps). This is an optimization for the access pattern including many
// seeks with consecutive keys.
class SkipListFactory : public MemTableRepFactory {
public:
explicit SkipListFactory(size_t lookahead = 0);
// Methods for Configurable/Customizable class overrides
static const char* kClassName() { return "SkipListFactory"; }
static const char* kNickName() { return "skip_list"; }
virtual const char* Name() const override { return kClassName(); }
virtual const char* NickName() const override { return kNickName(); }
std::string GetId() const override;
// Methods for MemTableRepFactory class overrides
using MemTableRepFactory::CreateMemTableRep;
virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&,
Allocator*, const SliceTransform*,
Logger* logger) override;
bool IsInsertConcurrentlySupported() const override { return true; }
bool CanHandleDuplicatedKey() const override { return true; }
private:
size_t lookahead_;
};
// This creates MemTableReps that are backed by an std::vector. On iteration,
// the vector is sorted. This is useful for workloads where iteration is very
// rare and writes are generally not issued after reads begin.
//
// Parameters:
// count: Passed to the constructor of the underlying std::vector of each
// VectorRep. On initialization, the underlying array will be at least count
// bytes reserved for usage.
class VectorRepFactory : public MemTableRepFactory {
 public:
  // count: see the parameter description above this class.
  explicit VectorRepFactory(size_t count = 0);

  // Configurable/Customizable identification.
  static const char* kClassName() { return "VectorRepFactory"; }
  static const char* kNickName() { return "vector"; }
  const char* Name() const override { return kClassName(); }
  const char* NickName() const override { return kNickName(); }

  // Unlike the base class default, this factory reports no support for
  // refreshing iterators.
  bool IsRefreshIterSupported() const override { return false; }

  // MemTableRepFactory overrides. The using-declaration keeps the base class
  // overloads of CreateMemTableRep visible next to the one overridden here.
  using MemTableRepFactory::CreateMemTableRep;
  MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&,
                                 Allocator*, const SliceTransform*,
                                 Logger* logger) override;

 private:
  size_t count_;
};
// Creates a factory for memtables that contain a fixed array of buckets, each
// pointing to a skiplist (null if the bucket is empty).
//   bucket_count: number of fixed array buckets
//   skiplist_height: the max height of the skiplist
//   skiplist_branching_factor: probabilistic size ratio between adjacent
//                              link lists in the skiplist
MemTableRepFactory* NewHashSkipListRepFactory(
    size_t bucket_count = 1000000,
    int32_t skiplist_height = 4,
    int32_t skiplist_branching_factor = 4);
// Creates a factory for memtables based on a hash table: a fixed array of
// buckets, each pointing to either a linked list or, once the number of
// entries inside the bucket exceeds threshold_use_skiplist, a skip list.
//   bucket_count: number of fixed array buckets
//   huge_page_tlb_size: if <=0, allocate the hash table bytes from malloc.
//                       Otherwise from huge page TLB. The user needs to
//                       reserve huge pages for it to be allocated, like:
//                           sysctl -w vm.nr_hugepages=20
//                       See linux doc Documentation/vm/hugetlbpage.txt
//   bucket_entries_logging_threshold: if number of entries in one bucket
//                                     exceeds this number, log about it.
//   if_log_bucket_dist_when_flash: if true, log distribution of number of
//                                  entries when flushing.
//   threshold_use_skiplist: a bucket switches to skip list if number of
//                           entries exceed this parameter.
MemTableRepFactory* NewHashLinkListRepFactory(
    size_t bucket_count = 50000,
    size_t huge_page_tlb_size = 0,
    int bucket_entries_logging_threshold = 4096,
    bool if_log_bucket_dist_when_flash = true,
    uint32_t threshold_use_skiplist = 256);
// Creates a factory for memtables based on a sorted hash table (spdb hash).
// The meaning of bucket_count and use_merge is determined by the
// implementation, which is not part of this header.
MemTableRepFactory* NewHashSpdbRepFactory(
    size_t bucket_count = 1000000,
    bool use_merge = true);
} // namespace ROCKSDB_NAMESPACE