Skip to content

Commit bccc76e

Browse files
committed
#1837: Fixed TSAN issues in C++ driver in PartitionAwarePolicy.
Summary: Reworked how partitions are updated in the metadata object for PartitionAwarePolicy:
- The partition-refresh timer (PeriodicTask) was moved from PartitionAwarePolicy into the Session object, so it now runs on the Session's event loop.
- Instead of the internal `ControlMultipleRequestCallback::execute_query`, the refresh now uses the async `Session::execute` API.

Test Plan: ybd tsan --cxx-test integration-tests_cassandra_cpp_driver-test --gtest_filter CppCassandraDriverTest.BatchWriteDuringSoftMemoryLimit

Reviewers: mikhail

Reviewed By: mikhail

Subscribers: yql

Differential Revision: https://phabricator.dev.yugabyte.com/D9241
1 parent 32416b8 commit bccc76e

File tree

8 files changed

+110
-52
lines changed

8 files changed

+110
-52
lines changed

src/config.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,10 @@ class Config {
298298

299299
bool partition_aware_routing() const { return partition_aware_routing_; }
300300

301+
unsigned partition_refresh_frequency_secs() const {
302+
return partition_refresh_frequency_secs_;
303+
}
304+
301305
void set_partition_aware_routing(bool is_partition_aware,
302306
unsigned refresh_frequency_secs = CASS_DEFAULT_METADATA_REFRESH_FREQUENCY_SECS) {
303307
partition_aware_routing_ = is_partition_aware;

src/control_connection.cpp

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -509,7 +509,8 @@ void ControlConnection::on_query_hosts(ControlConnection* control_connection,
509509
session->purge_hosts(is_initial_connection);
510510

511511
if (control_connection->use_schema_ ||
512-
control_connection->token_aware_routing_) {
512+
control_connection->token_aware_routing_ ||
513+
control_connection->partition_aware_routing_) {
513514
control_connection->query_meta_schema();
514515
} else if (is_initial_connection) {
515516
control_connection->state_ = CONTROL_STATE_READY;
@@ -520,11 +521,8 @@ void ControlConnection::on_query_hosts(ControlConnection* control_connection,
520521
}
521522
}
522523

523-
void ControlConnection::refresh_partitions() {
524-
SharedRefPtr<ControlMultipleRequestCallback<UnusedData> > callback(
525-
new ControlMultipleRequestCallback<UnusedData>(this, ControlConnection::on_query_meta_schema, UnusedData()));
526-
527-
callback->execute_query("partitions", YB_SELECT_PARTITIONS);
524+
std::string ControlConnection::get_yb_select_partitions_statement() {
525+
return std::string(YB_SELECT_PARTITIONS);
528526
}
529527

530528
//TODO: query and callbacks should be in Metadata

src/control_connection.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ class ControlConnection : public Connection::Listener {
4646
const Value* rpc_value,
4747
Address* output);
4848

49+
static std::string get_yb_select_partitions_statement();
50+
4951
enum State {
5052
CONTROL_STATE_NEW,
5153
CONTROL_STATE_READY,
@@ -73,8 +75,6 @@ class ControlConnection : public Connection::Listener {
7375
void on_up(const Address& address);
7476
void on_down(const Address& address);
7577

76-
void refresh_partitions();
77-
7878
private:
7979
template<class T>
8080
class ControlMultipleRequestCallback : public MultipleRequestCallback {

src/metadata.cpp

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -890,14 +890,9 @@ void Metadata::update_aggregates(int protocol_version, const VersionNumber& cass
890890
}
891891

892892
void Metadata::update_partitions(int protocol_version, const VersionNumber& cassandra_version, ResultResponse* result) {
893+
ScopedMutex l(&mutex_);
893894
schema_snapshot_version_++;
894-
895-
if (is_front_buffer()) {
896-
ScopedMutex l(&mutex_);
897-
updating_->update_partitions(protocol_version, cassandra_version, cache_, result);
898-
} else {
899-
updating_->update_partitions(protocol_version, cassandra_version, cache_, result);
900-
}
895+
updating_->update_partitions(protocol_version, cassandra_version, cache_, result);
901896
}
902897

903898
void Metadata::drop_keyspace(const std::string& keyspace_name) {

src/partition_aware_policy.cpp

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -179,25 +179,6 @@ void PartitionAwarePolicy::init(const Host::Ptr& connected_host,
179179
ChainedLoadBalancingPolicy::init(connected_host, hosts, random);
180180
}
181181

182-
void PartitionAwarePolicy::register_handles(uv_loop_t* loop) {
183-
ChainedLoadBalancingPolicy::register_handles(loop);
184-
185-
refresh_metadata_task_ = PeriodicTask::start(loop,
186-
refresh_frequency_secs_*1000,
187-
this,
188-
PartitionAwarePolicy::on_work,
189-
PartitionAwarePolicy::on_after_work);
190-
}
191-
192-
void PartitionAwarePolicy::close_handles() {
193-
if (refresh_metadata_task_) {
194-
PeriodicTask::stop(refresh_metadata_task_);
195-
refresh_metadata_task_.reset();
196-
}
197-
198-
ChainedLoadBalancingPolicy::close_handles();
199-
}
200-
201182
QueryPlan* PartitionAwarePolicy::new_query_plan(const std::string& keyspace,
202183
RequestHandler* request_handler) {
203184
QueryPlan* const child_plan = child_policy_->new_query_plan(keyspace, request_handler);
@@ -315,15 +296,4 @@ Host::Ptr PartitionAwarePolicy::PartitionAwareQueryPlan::compute_next() {
315296
return Host::Ptr();
316297
}
317298

318-
void PartitionAwarePolicy::on_work(PeriodicTask* task) {
319-
PartitionAwarePolicy* const policy = static_cast<PartitionAwarePolicy*>(task->data());
320-
if (policy->control_connection() != NULL) {
321-
policy->control_connection()->refresh_partitions();
322-
}
323-
}
324-
325-
void PartitionAwarePolicy::on_after_work(PeriodicTask*) {
326-
// No-op.
327-
}
328-
329299
} // namespace cass

src/partition_aware_policy.hpp

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,6 @@ class CASS_EXPORT PartitionAwarePolicy: public ChainedLoadBalancingPolicy {
2929

3030
virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random);
3131

32-
virtual void register_handles(uv_loop_t* loop);
33-
virtual void close_handles();
34-
3532
virtual QueryPlan* new_query_plan(const std::string& keyspace,
3633
RequestHandler* request_handler);
3734

@@ -74,10 +71,6 @@ class CASS_EXPORT PartitionAwarePolicy: public ChainedLoadBalancingPolicy {
7471
size_t remaining_;
7572
};
7673

77-
static void on_work(PeriodicTask* task);
78-
static void on_after_work(PeriodicTask*);
79-
80-
PeriodicTask::Ptr refresh_metadata_task_;
8174
CopyOnWriteHostVec hosts_;
8275
int index_;
8376
unsigned refresh_frequency_secs_;

src/session.cpp

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
#include "cluster.hpp"
2121
#include "config.hpp"
2222
#include "constants.hpp"
23+
#include "error_response.hpp"
2324
#include "execute_request.hpp"
2425
#include "logger.hpp"
2526
#include "prepare_request.hpp"
27+
#include "query_request.hpp"
2628
#include "scoped_lock.hpp"
2729
#include "statement.hpp"
2830
#include "timer.hpp"
@@ -158,13 +160,15 @@ Session::Session()
158160
uv_mutex_init(&state_mutex_);
159161
uv_mutex_init(&hosts_mutex_);
160162
uv_mutex_init(&keyspace_mutex_);
163+
uv_mutex_init(&refresh_metadata_future_mutex_);
161164
}
162165

163166
Session::~Session() {
164167
join();
165168
uv_mutex_destroy(&state_mutex_);
166169
uv_mutex_destroy(&hosts_mutex_);
167170
uv_mutex_destroy(&keyspace_mutex_);
171+
uv_mutex_destroy(&refresh_metadata_future_mutex_);
168172
}
169173

170174
void Session::clear(const Config& config) {
@@ -173,6 +177,10 @@ void Session::clear(const Config& config) {
173177
metrics_.reset(new Metrics(config_.thread_count_io() + 1));
174178
connect_future_.reset();
175179
close_future_.reset();
180+
{
181+
ScopedMutex lock_future(&refresh_metadata_future_mutex_);
182+
refresh_metadata_future_.reset();
183+
}
176184
{ // Lock hosts
177185
ScopedMutex l(&hosts_mutex_);
178186
hosts_.clear();
@@ -416,6 +424,76 @@ void Session::notify_connected() {
416424
connect_future_->set();
417425
connect_future_.reset();
418426
}
427+
{
428+
ScopedMutex lock_future(&refresh_metadata_future_mutex_);
429+
if (refresh_metadata_future_) {
430+
refresh_metadata_future_->set();
431+
refresh_metadata_future_.reset();
432+
}
433+
}
434+
435+
if (config().partition_aware_routing()) {
436+
refresh_metadata_task_ = PeriodicTask::start(loop(),
437+
config().partition_refresh_frequency_secs()*1000,
438+
this,
439+
Session::on_refresh_metadata,
440+
Session::on_after_refresh_metadata);
441+
}
442+
}
443+
444+
void Session::on_refresh_metadata(PeriodicTask* task) {
445+
Session* const session = static_cast<Session*>(task->data());
446+
{
447+
ScopedMutex l(&session->state_mutex_);
448+
if (session->state_.load(MEMORY_ORDER_RELAXED) != SESSION_STATE_CONNECTED) {
449+
return; // The session is finished.
450+
}
451+
}
452+
453+
ScopedMutex lock_future(&session->refresh_metadata_future_mutex_);
454+
if (!session->refresh_metadata_future_) {
455+
session->refresh_metadata_future_.reset(new ResponseFuture());
456+
session->refresh_metadata_future_->set_callback(&Session::refresh_metadata_callback, session);
457+
458+
cass::QueryRequest* const query_request =
459+
new cass::QueryRequest(ControlConnection::get_yb_select_partitions_statement(), 0);
460+
RequestHandler::Ptr request_handler(new RequestHandler(
461+
QueryRequest::ConstPtr(query_request), session->refresh_metadata_future_, session));
462+
session->execute(request_handler);
463+
}
464+
}
465+
466+
void Session::on_after_refresh_metadata(PeriodicTask* task) {
467+
// No-op.
468+
}
469+
470+
void Session::refresh_metadata_callback(CassFuture* future, void* data) {
471+
ResponseFuture::Ptr rf(static_cast<cass::ResponseFuture*>(future->from()));
472+
Session* const session = static_cast<Session*>(data);
473+
474+
ScopedMutex lock_future(&session->refresh_metadata_future_mutex_);
475+
session->refresh_metadata_future_.reset();
476+
477+
{
478+
ScopedMutex l(&session->state_mutex_);
479+
if (session->state_.load(MEMORY_ORDER_RELAXED) != SESSION_STATE_CONNECTED) {
480+
return; // The session is finished.
481+
}
482+
}
483+
484+
if (!rf->ready()) {
485+
return; // ResponseFuture is not ready.
486+
}
487+
488+
cass::Response::Ptr response(rf->response());
489+
if (!response || check_error_or_invalid_response("Session", CQL_OPCODE_RESULT, response.get())) {
490+
return; // Error or null response.
491+
}
492+
493+
const int protocol_version = session->control_connection_.protocol_version();
494+
const VersionNumber& cassandra_version = session->control_connection_.cassandra_version();
495+
ResultResponse* const partitions_result = static_cast<ResultResponse*>(response.get());
496+
session->metadata().update_partitions(protocol_version, cassandra_version, partitions_result);
419497
}
420498

421499
void Session::notify_connect_error(CassError code, const std::string& message) {
@@ -445,12 +523,23 @@ void Session::notify_closed() {
445523
close_future_->set();
446524
close_future_.reset();
447525
}
526+
527+
ScopedMutex lock_future(&refresh_metadata_future_mutex_);
528+
if (refresh_metadata_future_) {
529+
refresh_metadata_future_->set();
530+
refresh_metadata_future_.reset();
531+
}
448532
}
449533

450534
void Session::close_handles() {
451535
EventThread<SessionEvent>::close_handles();
452536
request_queue_->close_handles();
453537
config_.load_balancing_policy()->close_handles();
538+
539+
if (refresh_metadata_task_) {
540+
PeriodicTask::stop(refresh_metadata_task_);
541+
refresh_metadata_task_.reset();
542+
}
454543
}
455544

456545
void Session::on_run() {

src/session.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,11 @@ class Session : public EventThread<SessionEvent>, public RequestListener {
216216
const std::string& result_metadata_id,
217217
const ResultResponse::ConstPtr& result_response);
218218

219+
static void on_refresh_metadata(PeriodicTask* task);
220+
static void on_after_refresh_metadata(PeriodicTask* task);
221+
222+
static void refresh_metadata_callback(CassFuture* future, void* data);
223+
219224
private:
220225
typedef std::vector<IOWorker::Ptr > IOWorkerVec;
221226

@@ -229,6 +234,10 @@ class Session : public EventThread<SessionEvent>, public RequestListener {
229234
Future::Ptr connect_future_;
230235
Future::Ptr close_future_;
231236

237+
PeriodicTask::Ptr refresh_metadata_task_;
238+
uv_mutex_t refresh_metadata_future_mutex_;
239+
ResponseFuture::Ptr refresh_metadata_future_;
240+
232241
HostMap hosts_;
233242
uv_mutex_t hosts_mutex_;
234243

0 commit comments

Comments
 (0)