/*
* Copyright (C) 2015-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: AGPL-3.0-or-later
 */

#pragma once

#include "dht/ring_position.hh"
#include "dht/token-sharding.hh"
#include "interval.hh"

#include <vector>

namespace dht {

// Utilities for sharding ring partition_range:s
// A ring_position range's data is divided into sub-ranges, where each sub-range's data
// is owned by a single shard. Note that multiple non-overlapping sub-ranges may map to a
// single shard, and some shards may not receive any sub-range.
//
// This module provides utilities for determining the sub-range-to-shard mapping. The utilities
// generate optimal mappings: each range you get is the largest possible, so you get the minimum
// number of ranges possible. That number can still be large, so operate on the ranges one (or a
// few) at a time, rather than accumulating them.

// A mapping between a partition_range and a shard. All positions within `ring_range` are
// owned by `shard`.
//
// The classes that return ring_position_range_and_shard make `ring_range` as large as
// possible (maximizing the number of tokens), so the total number of such ranges is minimized.
// Successive ranges therefore always have a different `shard` than the previous return.
// (classes that return ring_position_range_and_shard_and_element can have the same `shard`
// in successive returns, if `element` is different).
struct ring_position_range_and_shard {
dht::partition_range ring_range;
unsigned shard;
};

// Incrementally divides a `partition_range` into sub-ranges wholly owned by a single shard.
// During tablet migration, this uses the shard routing for reads.
class ring_position_range_sharder {
const sharder& _sharder;
dht::partition_range _range;
bool _done = false;
public:
// Initializes the ring_position_range_sharder with a given range to subdivide.
ring_position_range_sharder(const sharder& sharder, interval<ring_position> rrp)
: _sharder(sharder), _range(std::move(rrp)) {}
// Fetches the next range-shard mapping. When the input range is exhausted, std::nullopt is
// returned. The returned ranges are contiguous and non-overlapping, and together span the
// entire input range.
std::optional<ring_position_range_and_shard> next(const schema& s);
};
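
// Example usage (illustrative sketch, not part of the API; assumes the caller already has a
// `const dht::sharder& shd`, a `const schema& s`, and a `dht::partition_range pr` covering the
// data of interest; `process_subrange` is a hypothetical per-shard handler):
//
//   dht::ring_position_range_sharder rprs(shd, std::move(pr));
//   while (auto m = rprs.next(s)) {
//       // m->ring_range is wholly owned by shard m->shard.
//       process_subrange(m->ring_range, m->shard);
//   }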

// A mapping between a partition_range and a shard (like ring_position_range_and_shard), extended
// with the index of the input range it was generated from. See ring_position_range_vector_sharder
// for its use.
//
// The classes that return ring_position_range_and_shard_and_element also make `ring_range` as large
// as possible (maximizing the number of tokens), so the total number of such ranges is minimized.
// Within a single `element`, successive returns always have a different `shard`; across different
// `element`s, the same `shard` may repeat.
struct ring_position_range_and_shard_and_element : ring_position_range_and_shard {
ring_position_range_and_shard_and_element(ring_position_range_and_shard&& rpras, unsigned element)
: ring_position_range_and_shard(std::move(rpras)), element(element) {
}
unsigned element;
};

// Incrementally divides several non-overlapping `partition_range`:s into sub-ranges wholly owned by
// a single shard.
//
// Similar to ring_position_range_sharder, but instead of stopping when the input range is exhausted,
// moves on to the next input range (input ranges are supplied in a vector).
//
// This has two use cases:
// 1. vnodes. A vnode cannot be described by a single range, since
// one vnode wraps around from the largest token back to the smallest token. Hence it must be
// described as a vector of two ranges, (largest_token, +inf) and (-inf, smallest_token].
// 2. sstable shard mappings. An sstable has metadata describing which ranges it owns, and this is
// used to see what shards these ranges map to (and therefore to see if the sstable is shared or
// not, and which shards share it).
//
// During tablet migration, this uses the shard routing for reads.
class ring_position_range_vector_sharder {
using vec_type = dht::partition_range_vector;
vec_type _ranges;
const sharder& _sharder;
vec_type::iterator _current_range;
std::optional<ring_position_range_sharder> _current_sharder;
private:
void next_range() {
if (_current_range != _ranges.end()) {
_current_sharder.emplace(_sharder, std::move(*_current_range++));
}
}
public:
// Initializes the `ring_position_range_vector_sharder` with the ranges to be processed.
// Input ranges should be non-overlapping (although nothing bad will happen if they do
// overlap).
ring_position_range_vector_sharder(const sharder& sharder, dht::partition_range_vector ranges);
// Fetches the next range-shard mapping. When the input ranges are exhausted, std::nullopt is
// returned. Within an input range, results are contiguous and non-overlapping (but since the input
// ranges are usually discontiguous, the overall results are not contiguous). Together, the results
// span the input ranges.
//
// The result is augmented with an `element` field which indicates the index from the input vector
// that the result belongs to.
//
// Results are returned ordered by the input range's index in the vector first, then by position
// within each input range.
std::optional<ring_position_range_and_shard_and_element> next(const schema& s);
};
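
// Example usage (illustrative sketch, not part of the API; assumes `shd` and `s` as above, and a
// `dht::partition_range_vector ranges`, e.g. the two non-wrapping ranges describing a wrapping
// vnode; `handle_subrange` is a hypothetical handler):
//
//   dht::ring_position_range_vector_sharder rpvs(shd, std::move(ranges));
//   while (auto m = rpvs.next(s)) {
//       // m->element is the index of the input range this mapping was generated from.
//       handle_subrange(m->element, m->ring_range, m->shard);
//   }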

// Incrementally divides a `token_range` into sub-ranges wholly owned by a single shard.
// Unlike ring_position_range_sharder, it only returns results for the single shard provided by
// the caller.
// During topology changes, it reflects the shard assignment used for reads.
class selective_token_range_sharder {
const sharder& _sharder;
dht::token_range _range;
shard_id _shard;
bool _done = false;
shard_id _next_shard;
dht::token _start_token;
std::optional<interval_bound<dht::token>> _start_boundary;
public:
// Initializes the selective_token_range_sharder with a token range and shard_id of interest.
selective_token_range_sharder(const sharder& sharder, dht::token_range range, shard_id shard)
: _sharder(sharder)
, _range(std::move(range))
, _shard(shard)
, _next_shard(_shard + 1 == _sharder.shard_count() ? 0 : _shard + 1)
, _start_token(_range.start() ? _range.start()->value() : minimum_token())
, _start_boundary(_sharder.shard_for_reads(_start_token) == shard ?
_range.start() : interval_bound<dht::token>(_sharder.token_for_next_shard_for_reads(_start_token, shard))) {
}
// Returns the next token_range that is both wholly contained within the input range and also
// wholly owned by the input shard_id. When the input range is exhausted, std::nullopt is returned.
// Note that if the range does not intersect the shard at all, std::nullopt is returned immediately.
std::optional<dht::token_range> next();
};
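
// Example usage (illustrative sketch, not part of the API; assumes `shd` as above, a
// `dht::token_range tr`, and the shard of interest `shard_id my_shard`; `scan_local_tokens` is a
// hypothetical handler):
//
//   dht::selective_token_range_sharder sel(shd, std::move(tr), my_shard);
//   while (auto r = sel.next()) {
//       // *r is a token_range wholly contained in `tr` and wholly owned by `my_shard`.
//       scan_local_tokens(*r);
//   }
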
} // dht