forked from scylladb/scylladb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrange_streamer.hh
176 lines (160 loc) · 7.19 KB
/
range_streamer.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
/*
*
* Modified by ScyllaDB
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
*/
#pragma once
#include "locator/abstract_replication_strategy.hh"
#include "streaming/stream_plan.hh"
#include "streaming/stream_state.hh"
#include "streaming/stream_reason.hh"
#include "service/topology_guard.hh"
#include "gms/inet_address.hh"
#include <seastar/core/distributed.hh>
#include <seastar/core/abort_source.hh>
#include <unordered_map>
#include <memory>
// Forward declarations of types from other namespaces that this header refers to.
namespace replica {
class database;
}
namespace gms { class gossiper; }
namespace locator { class topology; }
namespace dht {
/**
* Assists in streaming ranges to a node.
*/
class range_streamer {
public:
// Shorthand aliases so the rest of the class can name these types without their namespace qualifiers.
using inet_address = gms::inet_address;
using token_metadata = locator::token_metadata;
using token_metadata_ptr = locator::token_metadata_ptr;
using stream_plan = streaming::stream_plan;
using stream_state = streaming::stream_state;
public:
/**
 * A filter applied to sources to stream from when constructing a fetch map.
 *
 * Implementations decide, for each candidate endpoint, whether it may be used as a source.
 */
class i_source_filter {
public:
    /**
     * Decides whether a candidate endpoint is acceptable as a streaming source.
     *
     * The first (unnamed) argument is the topology an implementation may consult.
     *
     * @param endpoint the candidate source endpoint
     * @return true if the endpoint should be kept as a source, false if it should be excluded
     */
    virtual bool should_include(const locator::topology&, inet_address endpoint) = 0;

    // Virtual because filters are held and destroyed through pointers to this base class.
    virtual ~i_source_filter() = default;
};
/**
 * Source filter that rejects every endpoint contained in the set of down nodes
 * handed to the constructor.
 *
 * The class itself only consults that set; the caller is expected to fill it with
 * the nodes that are not alive according to a failure detector.
 */
class failure_detector_source_filter : public i_source_filter {
    std::set<gms::inet_address> _down_nodes;
public:
    failure_detector_source_filter(std::set<gms::inet_address> down_nodes)
        : _down_nodes(std::move(down_nodes))
    {
    }

    // An endpoint is an acceptable source only when it is absent from the down-node set.
    // The topology argument is not used by this filter.
    virtual bool should_include(const locator::topology&, inet_address endpoint) override {
        const bool is_down = _down_nodes.find(endpoint) != _down_nodes.end();
        return !is_down;
    }
};
/**
 * Source filter that accepts only endpoints located in one particular data center.
 */
class single_datacenter_filter : public i_source_filter {
    sstring _source_dc;
public:
    single_datacenter_filter(const sstring& source_dc) : _source_dc(source_dc) {}

    // Accept the endpoint only if the topology reports it in the configured data center.
    virtual bool should_include(const locator::topology& topo, inet_address endpoint) override {
        const auto& endpoint_dc = topo.get_datacenter(endpoint);
        return endpoint_dc == _source_dc;
    }
};
/**
 * Constructs a range streamer with an explicit set of tokens.
 *
 * All arguments are stored in the corresponding members. `db`, `sm` and `abort_source` are kept
 * by reference, so they must outlive this object.
 *
 * `tmptr` is deliberately taken as a non-const value: a const by-value parameter would turn the
 * std::move() below into a copy. Top-level const on a by-value parameter is not part of the
 * function type, so callers are unaffected.
 *
 * @param tables optional list of table names; defaults to an empty list
 */
range_streamer(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, token_metadata_ptr tmptr, abort_source& abort_source, std::unordered_set<token> tokens,
        locator::host_id address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason,
        service::frozen_topology_guard topo_guard,
        std::vector<sstring> tables = {})
    : _db(db)
    , _stream_manager(sm)
    , _token_metadata_ptr(std::move(tmptr))
    , _abort_source(abort_source)
    , _tokens(std::move(tokens))
    , _address(address)
    , _dr(std::move(dr))
    , _description(std::move(description))
    , _reason(reason)
    , _tables(std::move(tables))
    , _topo_guard(topo_guard)
{
    // Fail fast: abort_source::check() throws if an abort has already been requested.
    _abort_source.check();
}
/**
 * Convenience overload without a tokens argument: delegates to the constructor that takes a
 * token set, passing an empty set.
 *
 * By-value arguments are moved into the delegated constructor. `tmptr` is a non-const value so
 * that std::move() really moves it, and `description` is moved rather than copied. Top-level
 * const on a by-value parameter is not part of the function type, so callers are unaffected.
 *
 * @param tables optional list of table names; defaults to an empty list
 */
range_streamer(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, token_metadata_ptr tmptr, abort_source& abort_source,
        locator::host_id address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason, service::frozen_topology_guard topo_guard, std::vector<sstring> tables = {})
    : range_streamer(db, sm, std::move(tmptr), abort_source, std::unordered_set<token>(), address, std::move(dr), std::move(description), reason, std::move(topo_guard), std::move(tables)) {
}
// Takes ownership of `filter` and stores it in the set of registered source filters.
void add_source_filter(std::unique_ptr<i_source_filter> filter) {
    _source_filters.insert(std::move(filter));
}
// The three add_* members below are only declared in this header.
// NOTE(review): judging by the signature, add_ranges() takes the ranges wanted for `keyspace_name` together with
// the effective replication map and the gossiper used to choose sources for them; `is_replacing` presumably
// marks a node-replace operation — confirm against the implementation.
future<> add_ranges(const sstring& keyspace_name, locator::vnode_effective_replication_map_ptr erm, dht::token_range_vector ranges, gms::gossiper& gossiper, bool is_replacing);
// NOTE(review): presumably records, per endpoint, the ranges of `keyspace_name` to transmit (tx) to that endpoint — confirm.
void add_tx_ranges(const sstring& keyspace_name, std::unordered_map<inet_address, dht::token_range_vector> ranges_per_endpoint);
// NOTE(review): presumably records, per endpoint, the ranges of `keyspace_name` to receive (rx) from that endpoint — confirm.
void add_rx_ranges(const sstring& keyspace_name, std::unordered_map<inet_address, dht::token_range_vector> ranges_per_endpoint);
private:
// NOTE(review): presumably decides whether sources for this keyspace must be chosen with
// get_all_ranges_with_strict_sources_for() rather than get_all_ranges_with_sources_for() —
// confirm against the implementation.
bool use_strict_sources_for_ranges(const sstring& keyspace_name, const locator::vnode_effective_replication_map_ptr& erm);
/**
 * Get a map of all ranges and their respective sources that are candidates for streaming the given ranges
 * to us. For each range, the list of sources is sorted by proximity.
 *
 * NOTE(review): this comment used to say "relative to the given destAddress", but the signature has no such
 * argument; presumably proximity is measured relative to this node — confirm against the implementation.
 *
 * @return a map from each token range to the list of its candidate source endpoints
 */
std::unordered_map<dht::token_range, std::vector<inet_address>>
get_all_ranges_with_sources_for(const sstring& keyspace_name, locator::vnode_effective_replication_map_ptr erm, dht::token_range_vector desired_ranges);
/**
 * Get a map of all ranges and the source that will be cleaned up once this bootstrapped node is added for the given ranges.
 * For each range, the list should only contain a single source. This allows us to consistently migrate data without violating
 * consistency.
 *
 * @return a map from each token range to its source list (expected to hold exactly one endpoint, as described above)
 */
std::unordered_map<dht::token_range, std::vector<inet_address>>
get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, locator::vnode_effective_replication_map_ptr erm, dht::token_range_vector desired_ranges, gms::gossiper& gossiper);
private:
/**
 * @param ranges_with_sources The ranges we want to fetch (key) and their potential sources (value)
 * @param source_filters A (possibly empty) collection of source filters to apply. In addition to any filters given
 *                       here, we always exclude ourselves.
 * @param keyspace NOTE(review): presumably the name of the keyspace the ranges belong to — confirm against the implementation
 * @return a map from source endpoint to token ranges (NOTE(review): presumably the ranges to fetch from that
 *         endpoint — confirm against the implementation)
 */
std::unordered_map<inet_address, dht::token_range_vector>
get_range_fetch_map(const std::unordered_map<dht::token_range, std::vector<inet_address>>& ranges_with_sources,
const std::unordered_set<std::unique_ptr<i_source_filter>>& source_filters,
const sstring& keyspace);
#if 0
// For testing purposes
Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch()
{
return toFetch;
}
#endif
// Returns the token metadata referenced by _token_metadata_ptr.
// Can be called only before stream_async().
// Const-qualified: it only dereferences the stored pointer and hands out a const reference.
const token_metadata& get_token_metadata() const {
    return *_token_metadata_ptr;
}
public:
// Declared here, defined elsewhere. Returns a future, so completion is signalled asynchronously.
// NOTE(review): presumably streams everything registered through the add_* methods — confirm against the implementation.
future<> stream_async();
// NOTE(review): presumably the number of ranges still waiting to be streamed — confirm against the implementation.
size_t nr_ranges_to_stream();
private:
// References to the sharded database and stream manager supplied to the constructor (not owned here).
distributed<replica::database>& _db;
sharded<streaming::stream_manager>& _stream_manager;
// Token metadata passed to the constructor; exposed through get_token_metadata().
token_metadata_ptr _token_metadata_ptr;
// Abort source held by reference; checked once in the constructor.
abort_source& _abort_source;
// Tokens passed to the constructor (empty when the overload without a tokens argument is used).
std::unordered_set<token> _tokens;
// Host id and DC/rack passed to the constructor.
// NOTE(review): presumably they identify the node the data is streamed for — confirm.
locator::host_id _address;
locator::endpoint_dc_rack _dr;
// Description and stream reason passed to the constructor.
sstring _description;
streaming::stream_reason _reason;
// Table names passed to the constructor; empty by default.
// NOTE(review): presumably an empty list means "all tables" — confirm against the implementation.
std::vector<sstring> _tables;
// Topology guard passed to the constructor.
service::frozen_topology_guard _topo_guard;
// String key -> (endpoint -> token ranges).
// NOTE(review): presumably keyed by keyspace name, filled by the add_* methods and consumed by stream_async() — confirm.
std::unordered_multimap<sstring, std::unordered_map<inet_address, dht::token_range_vector>> _to_stream;
// Filters registered through add_source_filter().
std::unordered_set<std::unique_ptr<i_source_filter>> _source_filters;
// Number of tx and rx ranges added
unsigned _nr_tx_added = 0;
unsigned _nr_rx_added = 0;
// Limit the number of nodes to stream in parallel to reduce memory pressure with large cluster.
// The semaphore starts with 16 units.
seastar::semaphore _limiter{16};
// Range counters, both starting at zero.
// NOTE(review): presumably used for progress tracking while streaming — confirm against the implementation.
size_t _nr_total_ranges = 0;
size_t _nr_ranges_remaining = 0;
};
} // dht