From 6938762bba13d89ee2d02b59e41b2356cc2ec86c Mon Sep 17 00:00:00 2001 From: Keith Wansbrough Date: Tue, 10 Apr 2018 16:50:28 +0100 Subject: [PATCH 1/8] CPP-499 Add new API and tests. --- gtests/src/integration/objects/cluster.hpp | 13 ++++ .../tests/test_control_connection.cpp | 61 +++++++++++++++++++ include/cassandra.h | 33 ++++++++++ src/cluster.cpp | 12 ++++ src/config.hpp | 8 +++ 5 files changed, 127 insertions(+) diff --git a/gtests/src/integration/objects/cluster.hpp b/gtests/src/integration/objects/cluster.hpp index 029cd4c6a..3f77b275e 100644 --- a/gtests/src/integration/objects/cluster.hpp +++ b/gtests/src/integration/objects/cluster.hpp @@ -134,6 +134,19 @@ class Cluster : public Object { return *this; } + /** + * Assign the local address to bind; passing an empty string will clear + * the local address. + * + * @param name An IP address or hostname + * @return Cluster object + */ + Cluster& with_local_address(const std::string& name) { + EXPECT_EQ(CASS_OK, cass_cluster_set_local_address(get(), + name.c_str())); + return *this; + } + /** * Assign the number of connections made to each node/server for each * connections thread diff --git a/gtests/src/integration/tests/test_control_connection.cpp b/gtests/src/integration/tests/test_control_connection.cpp index 06b4587b0..897f39cdf 100644 --- a/gtests/src/integration/tests/test_control_connection.cpp +++ b/gtests/src/integration/tests/test_control_connection.cpp @@ -186,6 +186,67 @@ CASSANDRA_INTEGRATION_TEST_F(ControlConnectionTests, ConnectUsingInvalidPort) { } } +/** + * Perform session connection using invalid local IP address + * + * This test will attempt to perform a connection using an invalid local IP + * address and ensure the control connection is not established against a + * single node cluster. + * + * @test_category control_connection + * @since core:1.0.0 + * @expected_result Control connection will not be established + */ +CASSANDRA_INTEGRATION_TEST_F(ControlConnectionTests, + ConnectUsingInvalidLocalIpAddress) { + CHECK_FAILURE; + + // Attempt to connect to the server using an invalid local IP address + logger_.add_critera("Unable to establish a control connection to host " \ + "1.1.1.1 because of the following error: Connection " \ + "timeout"); + Cluster cluster = default_cluster().with_local_address("1.1.1.1"); + try { + cluster.connect(); + FAIL() << "Connection was established using invalid local IP address"; + } catch (Session::Exception& se) { + ASSERT_EQ(CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, se.error_code()); + ASSERT_GE(logger_.count(), 1u); + } +} + +/** + * Perform session connection using valid local IP address but invalid + * remote address + * + * This test will attempt to perform a connection using a valid local IP + * address and invalid remote address and ensure the control connection is + * not established against a single node cluster. + * + * @test_category control_connection + * @since core:1.0.0 + * @expected_result Control connection will not be established + */ +CASSANDRA_INTEGRATION_TEST_F(ControlConnectionTests, + ConnectUsingValidLocalIpAddressButInvalidRemote) { + CHECK_FAILURE; + + // Attempt to connect to the server using an valid local IP address + // but invalid remote address + logger_.add_critera("Unable to establish a control connection to host " \ + "1.1.1.1 because of the following error: Connection " \ + "timeout"); + Cluster cluster = Cluster::build().with_contact_points("1.1.1.1") + .with_local_address("127.0.0.1"); + try { + cluster.connect(); + FAIL() << "Connection was established using invalid IP address"; + } catch (Session::Exception& se) { + ASSERT_EQ(CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, se.error_code()); + ASSERT_GE(logger_.count(), 1u); + } +} + /** * Perform session connection while forcing a control connection reconnect * diff --git a/include/cassandra.h b/include/cassandra.h index 86e16b43d..3d5fbb0a7 100644 --- a/include/cassandra.h +++ b/include/cassandra.h @@ -946,6 +946,39 @@ CASS_EXPORT CassError cass_cluster_set_port(CassCluster* cluster, int port); +/** + * Same as cass_cluster_set_local_address(), but with lengths for string + * parameters. + * + * @public @memberof CassCluster + * + * @param[in] cluster + * @param[in] name + * @param[in] name_length + * @return same as cass_cluster_set_local_address() + * + * @see cass_cluster_set_local_address() + */ +CASS_EXPORT CassError +cass_cluster_set_local_address_n(CassCluster* cluster, + const char* name, + size_t name_length); + +/** + * Sets the local address to bind when connecting to the cluster, + * if desired. + * + * @public @memberof CassCluster + * + * @param[in] cluster + * @param[in] name IP address or hostname to bind, or empty string + * for no binding. + * @return CASS_OK if successful, otherwise an error occurred. + */ +CASS_EXPORT CassError +cass_cluster_set_local_address(CassCluster* cluster, + const char* name); + /** * Sets the SSL context and enables SSL. * diff --git a/src/cluster.cpp b/src/cluster.cpp index fe2b6a398..b8c636668 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -465,6 +465,18 @@ CassError cass_cluster_set_prepare_on_up_or_add_host(CassCluster* cluster, return CASS_OK; } +CassError cass_cluster_set_local_address(CassCluster* cluster, + const char* name) { + return cass_cluster_set_local_address_n(cluster, name, SAFE_STRLEN(name)); +} + +CassError cass_cluster_set_local_address_n(CassCluster* cluster, + const char* name, + size_t name_length) { + cluster->config().set_local_address(std::string(name, name_length)); + return CASS_OK; +} + CassError cass_cluster_set_no_compact(CassCluster* cluster, cass_bool_t enabled) { cluster->config().set_no_compact(enabled == cass_true); diff --git a/src/config.hpp b/src/config.hpp index 257c33a06..ac33fcfdb 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -81,6 +81,7 @@ class Config { , use_randomized_contact_points_(true) , prepare_on_all_hosts_(true) , prepare_on_up_or_add_host_(true) + , local_address_("") , no_compact_(false) { } Config new_instance() const { @@ -395,6 +396,12 @@ class Config { prepare_on_up_or_add_host_ = enabled; } + const std::string& local_address() const { return local_address_; } + + void set_local_address(const std::string& name) { + local_address_ = name; + } + bool no_compact() const { return no_compact_; } void set_no_compact(bool enabled) { @@ -448,6 +455,7 @@ class Config { bool use_randomized_contact_points_; bool prepare_on_all_hosts_; bool prepare_on_up_or_add_host_; + std::string local_address_; bool no_compact_; }; From 7fd6eb0f02758ce4b6ebdb176cad69b53a56f4a9 Mon Sep 17 00:00:00 2001 From: Keith Wansbrough Date: Wed, 11 Apr 2018 14:37:41 +0100 Subject: [PATCH 2/8] CPP-499 Improve tests. - Don't let it connect until we've set the cluster config! - Use better test names and fix up log messages. --- .../tests/test_control_connection.cpp | 53 +++++++++++++++---- 1 file changed, 42 insertions(+), 11 deletions(-) diff --git a/gtests/src/integration/tests/test_control_connection.cpp b/gtests/src/integration/tests/test_control_connection.cpp index 897f39cdf..353a592e3 100644 --- a/gtests/src/integration/tests/test_control_connection.cpp +++ b/gtests/src/integration/tests/test_control_connection.cpp @@ -22,6 +22,17 @@ * Control connection integration tests; single node cluster */ class ControlConnectionTests : public Integration { + +public: + + void SetUp() { + // Call the parent setup function (don't automatically start session, + // because we don't want any connections established until we have + // set up the cluster). + is_session_requested_ = false; + Integration::SetUp(); + } + protected: /** * Execute multiple requests and ensure the expected nodes are used during @@ -187,9 +198,30 @@ CASSANDRA_INTEGRATION_TEST_F(ControlConnectionTests, ConnectUsingInvalidPort) { } /** - * Perform session connection using invalid local IP address + * Perform session connection using unresolvable local IP address + * + * This test will attempt to perform a connection using an unresolvable local + * IP address and ensure the control connection is not established against a + * single node cluster. + * + * @test_category control_connection + * @since core:1.0.0 + * @expected_result Control connection will not be established + */ +CASSANDRA_INTEGRATION_TEST_F(ControlConnectionTests, + ConnectUsingUnresolvableLocalIpAddress) { + CHECK_FAILURE; + + // Attempt to connect to the server using an unresolvable local IP address + Cluster cluster = default_cluster(); + EXPECT_EQ(CASS_ERROR_LIB_HOST_RESOLUTION, + cass_cluster_set_local_address(cluster.get(), "unknown.invalid")); +} + +/** + * Perform session connection using unbindable local IP address * - * This test will attempt to perform a connection using an invalid local IP + * This test will attempt to perform a connection using an unbindable local IP * address and ensure the control connection is not established against a * single node cluster. * @@ -198,17 +230,15 @@ CASSANDRA_INTEGRATION_TEST_F(ControlConnectionTests, ConnectUsingInvalidPort) { * @expected_result Control connection will not be established */ CASSANDRA_INTEGRATION_TEST_F(ControlConnectionTests, - ConnectUsingInvalidLocalIpAddress) { + ConnectUsingUnbindableLocalIpAddress) { CHECK_FAILURE; - // Attempt to connect to the server using an invalid local IP address - logger_.add_critera("Unable to establish a control connection to host " \ - "1.1.1.1 because of the following error: Connection " \ - "timeout"); + // Attempt to connect to the server using an unbindable local IP address + logger_.add_critera("Unable to bind local address"); Cluster cluster = default_cluster().with_local_address("1.1.1.1"); try { cluster.connect(); - FAIL() << "Connection was established using invalid local IP address"; + FAIL() << "Connection was established using unbindable local IP address"; } catch (Session::Exception& se) { ASSERT_EQ(CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, se.error_code()); ASSERT_GE(logger_.count(), 1u); @@ -232,10 +262,11 @@ CASSANDRA_INTEGRATION_TEST_F(ControlConnectionTests, CHECK_FAILURE; // Attempt to connect to the server using an valid local IP address - // but invalid remote address + // but invalid remote address. The specified remote is not routable + // from the specified local. logger_.add_critera("Unable to establish a control connection to host " \ - "1.1.1.1 because of the following error: Connection " \ - "timeout"); + "1.1.1.1 because of the following error: " \ + "Connect error 'operation not permitted'"); Cluster cluster = Cluster::build().with_contact_points("1.1.1.1") .with_local_address("127.0.0.1"); try { From 23a2f7d14951ab9553f132319662b1def05bf712 Mon Sep 17 00:00:00 2001 From: Keith Wansbrough Date: Wed, 11 Apr 2018 14:39:07 +0100 Subject: [PATCH 3/8] CPP-499 Implement local address binding. - Look up address at config time (but don't use resolver, because that might be slow). - Apply it at connection time. --- src/cluster.cpp | 8 +++++++- src/config.hpp | 10 +++++----- src/connection.cpp | 20 ++++++++++++++++---- 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/src/cluster.cpp b/src/cluster.cpp index b8c636668..00558080a 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -473,7 +473,13 @@ CassError cass_cluster_set_local_address(CassCluster* cluster, CassError cass_cluster_set_local_address_n(CassCluster* cluster, const char* name, size_t name_length) { - cluster->config().set_local_address(std::string(name, name_length)); + cass::Address address; + if (name_length == 0 || + cass::Address::from_string(std::string(name, name_length), 0, &address)) { + cluster->config().set_local_address(address); + } else { + return CASS_ERROR_LIB_HOST_RESOLUTION; + } return CASS_OK; } diff --git a/src/config.hpp b/src/config.hpp index ac33fcfdb..4f86c9612 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -81,7 +81,6 @@ class Config { , use_randomized_contact_points_(true) , prepare_on_all_hosts_(true) , prepare_on_up_or_add_host_(true) - , local_address_("") , no_compact_(false) { } Config new_instance() const { @@ -396,10 +395,11 @@ class Config { prepare_on_up_or_add_host_ = enabled; } - const std::string& local_address() const { return local_address_; } + const Address* local_address() const { + return local_address_.is_valid() ? &local_address_ : NULL; } - void set_local_address(const std::string& name) { - local_address_ = name; + void set_local_address(const Address& address) { + local_address_ = address; } bool no_compact() const { return no_compact_; } @@ -455,7 +455,7 @@ class Config { bool use_randomized_contact_points_; bool prepare_on_all_hosts_; bool prepare_on_up_or_add_host_; - std::string local_address_; + Address local_address_; bool no_compact_; }; diff --git a/src/connection.cpp b/src/connection.cpp index ec9f2d36b..ba1ecf159 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -232,6 +232,7 @@ Connection::Connection(uv_loop_t* loop, , heartbeat_outstanding_(false) { socket_.data = this; uv_tcp_init(loop_, &socket_); + bool ok = true; if (uv_tcp_nodelay(&socket_, config.tcp_nodelay_enable() ? 1 : 0) != 0) { @@ -244,9 +245,19 @@ Connection::Connection(uv_loop_t* loop, LOG_WARN("Unable to set tcp keepalive"); } - SslContext* ssl_context = config_.ssl_context(); - if (ssl_context != NULL) { - ssl_session_.reset(ssl_context->create_session(host)); + const Address* local_address = config_.local_address(); + if (local_address) { + if (uv_tcp_bind(&socket_, local_address->addr(), 0)) { + ok = false; + notify_error("Unable to bind local address"); + } + } + + if (ok) { + SslContext* ssl_context = config_.ssl_context(); + if (ssl_context != NULL) { + ssl_session_.reset(ssl_context->create_session(host)); + } } } @@ -378,7 +389,8 @@ void Connection::set_state(ConnectionState new_state) { switch (state_) { case CONNECTION_STATE_NEW: - assert(new_state == CONNECTION_STATE_CONNECTING && + assert((new_state == CONNECTION_STATE_CONNECTING || + new_state == CONNECTION_STATE_CLOSE_DEFUNCT) && "Invalid connection state after new"); state_ = new_state; break; From 62da406bd406208478e1981ba710bde955487a14 Mon Sep 17 00:00:00 2001 From: Keith Wansbrough Date: Wed, 11 Apr 2018 14:41:03 +0100 Subject: [PATCH 4/8] CPP-499 Documentation for cass_cluster_set_local_address. --- include/cassandra.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/cassandra.h b/include/cassandra.h index 3d5fbb0a7..d524f8090 100644 --- a/include/cassandra.h +++ b/include/cassandra.h @@ -971,8 +971,8 @@ cass_cluster_set_local_address_n(CassCluster* cluster, * @public @memberof CassCluster * * @param[in] cluster - * @param[in] name IP address or hostname to bind, or empty string - * for no binding. + * @param[in] name IP address to bind, or empty string for no binding. + * Only numeric addresses are supported; no resolution is done. * @return CASS_OK if successful, otherwise an error occurred. */ CASS_EXPORT CassError From 3a460f66cc86f59ef1c26576b08d27d1c35b030d Mon Sep 17 00:00:00 2001 From: Keith Wansbrough Date: Mon, 16 Apr 2018 12:18:23 +0100 Subject: [PATCH 5/8] CPP-499 Review markups: error reporting, NULL handling. --- gtests/src/integration/tests/test_control_connection.cpp | 2 +- src/cluster.cpp | 3 ++- src/connection.cpp | 5 +++-- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/gtests/src/integration/tests/test_control_connection.cpp b/gtests/src/integration/tests/test_control_connection.cpp index 353a592e3..747f4e7ac 100644 --- a/gtests/src/integration/tests/test_control_connection.cpp +++ b/gtests/src/integration/tests/test_control_connection.cpp @@ -234,7 +234,7 @@ CASSANDRA_INTEGRATION_TEST_F(ControlConnectionTests, CHECK_FAILURE; // Attempt to connect to the server using an unbindable local IP address - logger_.add_critera("Unable to bind local address"); + logger_.add_critera("Unable to bind local address: address not available"); Cluster cluster = default_cluster().with_local_address("1.1.1.1"); try { cluster.connect(); diff --git a/src/cluster.cpp b/src/cluster.cpp index 00558080a..87ea51fc1 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -473,8 +473,9 @@ CassError cass_cluster_set_local_address(CassCluster* cluster, CassError cass_cluster_set_local_address_n(CassCluster* cluster, const char* name, size_t name_length) { - cass::Address address; + cass::Address address; // default to AF_UNSPEC if (name_length == 0 || + name == NULL || cass::Address::from_string(std::string(name, name_length), 0, &address)) { cluster->config().set_local_address(address); } else { diff --git a/src/connection.cpp b/src/connection.cpp index ba1ecf159..1280fb81e 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -247,9 +247,10 @@ Connection::Connection(uv_loop_t* loop, const Address* local_address = config_.local_address(); if (local_address) { - if (uv_tcp_bind(&socket_, local_address->addr(), 0)) { + int rc = uv_tcp_bind(&socket_, local_address->addr(), 0); + if (rc) { ok = false; - notify_error("Unable to bind local address"); + notify_error("Unable to bind local address: " + std::string(UV_ERRSTR(rc, loop_))); } } From 256b0a2ff15c34b64478649fed7165a7a46d151f Mon Sep 17 00:00:00 2001 From: Keith Wansbrough Date: Fri, 20 Apr 2018 14:22:22 +0100 Subject: [PATCH 6/8] CPP-499: Further review markups. --- .../tests/test_control_connection.cpp | 1 + include/cassandra.h | 34 +++++++++---------- src/connection.cpp | 11 +++--- 3 files changed, 22 insertions(+), 24 deletions(-) diff --git a/gtests/src/integration/tests/test_control_connection.cpp b/gtests/src/integration/tests/test_control_connection.cpp index 747f4e7ac..3bf7445f3 100644 --- a/gtests/src/integration/tests/test_control_connection.cpp +++ b/gtests/src/integration/tests/test_control_connection.cpp @@ -587,6 +587,7 @@ CASSANDRA_INTEGRATION_TEST_F(ControlConnectionTests, CHECK_FAILURE; // Stop the cluster and attempt to perform a request + connect(); ccm_->stop_cluster(); Result result = session_.execute(SELECT_ALL_SYSTEM_LOCAL_CQL, CASS_CONSISTENCY_ONE, false, false); diff --git a/include/cassandra.h b/include/cassandra.h index d524f8090..8d6b66996 100644 --- a/include/cassandra.h +++ b/include/cassandra.h @@ -947,37 +947,37 @@ cass_cluster_set_port(CassCluster* cluster, int port); /** - * Same as cass_cluster_set_local_address(), but with lengths for string - * parameters. + * Sets the local address to bind when connecting to the cluster, + * if desired. * * @public @memberof CassCluster * * @param[in] cluster - * @param[in] name - * @param[in] name_length - * @return same as cass_cluster_set_local_address() - * - * @see cass_cluster_set_local_address() + * @param[in] name IP address to bind, or empty string for no binding. + * Only numeric addresses are supported; no resolution is done. + * @return CASS_OK if successful, otherwise an error occurred. */ CASS_EXPORT CassError -cass_cluster_set_local_address_n(CassCluster* cluster, - const char* name, - size_t name_length); +cass_cluster_set_local_address(CassCluster* cluster, + const char* name); /** - * Sets the local address to bind when connecting to the cluster, - * if desired. + * Same as cass_cluster_set_local_address(), but with lengths for string + * parameters. * * @public @memberof CassCluster * * @param[in] cluster - * @param[in] name IP address to bind, or empty string for no binding. - * Only numeric addresses are supported; no resolution is done. - * @return CASS_OK if successful, otherwise an error occurred. + * @param[in] name + * @param[in] name_length + * @return same as cass_cluster_set_local_address() + * + * @see cass_cluster_set_local_address() */ CASS_EXPORT CassError -cass_cluster_set_local_address(CassCluster* cluster, - const char* name); +cass_cluster_set_local_address_n(CassCluster* cluster, + const char* name, + size_t name_length); /** * Sets the SSL context and enables SSL. diff --git a/src/connection.cpp b/src/connection.cpp index 1280fb81e..ce8628cad 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -232,7 +232,6 @@ Connection::Connection(uv_loop_t* loop, , heartbeat_outstanding_(false) { socket_.data = this; uv_tcp_init(loop_, &socket_); - bool ok = true; if (uv_tcp_nodelay(&socket_, config.tcp_nodelay_enable() ? 1 : 0) != 0) { @@ -249,16 +248,14 @@ Connection::Connection(uv_loop_t* loop, if (local_address) { int rc = uv_tcp_bind(&socket_, local_address->addr(), 0); if (rc) { - ok = false; notify_error("Unable to bind local address: " + std::string(UV_ERRSTR(rc, loop_))); + return; } } - if (ok) { - SslContext* ssl_context = config_.ssl_context(); - if (ssl_context != NULL) { - ssl_session_.reset(ssl_context->create_session(host)); - } + SslContext* ssl_context = config_.ssl_context(); + if (ssl_context != NULL) { + ssl_session_.reset(ssl_context->create_session(host)); } } From 330245ba0d30d3298f84cb0f7fb47a23d73e1baa Mon Sep 17 00:00:00 2001 From: Ayush Sengupta Date: Fri, 22 Jun 2018 13:56:01 -0700 Subject: [PATCH 7/8] Partition Aware Policy --- src/jenkins_hash.cpp | 97 +++++++++++++++++++++++++++++ src/jenkins_hash.hpp | 142 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 239 insertions(+) create mode 100644 src/jenkins_hash.cpp create mode 100644 src/jenkins_hash.hpp diff --git a/src/jenkins_hash.cpp b/src/jenkins_hash.cpp new file mode 100644 index 000000000..f4eafb4ac --- /dev/null +++ b/src/jenkins_hash.cpp @@ -0,0 +1,97 @@ +// Copyright 2011 Google Inc. All Rights Reserved. +// +// The following only applies to changes made to this file as part of YugaByte development. +// +// Portions Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// +// Contains the legacy Bob Jenkins Lookup2-based hashing routines. These need to +// always return the same results as their values have been recorded in various +// places and cannot easily be updated. Original author: Sanjay Ghemawat + +#include "jenkins_hash.hpp" + +namespace cass { + +uint64_t Hash64StringWithSeed(const char *s, uint32_t len, uint64_t c) { + uint64_t a, b; + uint32_t keylen; + + a = b = JENKINS_ULONGLONG(0xe08c1d668b756f82); // the golden ratio; an arbitrary value + + for (keylen = len; keylen >= 3 * sizeof(a); + keylen -= 3 * static_cast(sizeof(a)), s += 3 * sizeof(a)) { + a += Word64At(s); + b += Word64At(s + sizeof(a)); + c += Word64At(s + sizeof(a) * 2); + mix(a, b, c); + } + + c += len; + switch (keylen) { // deal with rest. Cases fall through + case 23: + c += char2unsigned64(s[22]) << 56; + case 22: + c += char2unsigned64(s[21]) << 48; + case 21: + c += char2unsigned64(s[20]) << 40; + case 20: + c += char2unsigned64(s[19]) << 32; + case 19: + c += char2unsigned64(s[18]) << 24; + case 18: + c += char2unsigned64(s[17]) << 16; + case 17: + c += char2unsigned64(s[16]) << 8; + // the first byte of c is reserved for the length + case 16: + b += Word64At(s + 8); + a += Word64At(s); + break; + case 15: + b += char2unsigned64(s[14]) << 48; + case 14: + b += char2unsigned64(s[13]) << 40; + case 13: + b += char2unsigned64(s[12]) << 32; + case 12: + b += char2unsigned64(s[11]) << 24; + case 11: + b += char2unsigned64(s[10]) << 16; + case 10: + b += char2unsigned64(s[9]) << 8; + case 9: + b += char2unsigned64(s[8]); + case 8: + a += Word64At(s); + break; + case 7: + a += char2unsigned64(s[6]) << 48; + case 6: + a += char2unsigned64(s[5]) << 40; + case 5: + a += char2unsigned64(s[4]) << 32; + case 4: + a += char2unsigned64(s[3]) << 24; + case 3: + a += char2unsigned64(s[2]) << 16; + case 2: + a += char2unsigned64(s[1]) << 8; + case 1: + a += char2unsigned64(s[0]); + // case 0: nothing left to add + } + mix(a, b, c); + return c; +} + +} // namespace cass diff --git a/src/jenkins_hash.hpp b/src/jenkins_hash.hpp new file mode 100644 index 000000000..56ce9310b --- /dev/null +++ b/src/jenkins_hash.hpp @@ -0,0 +1,142 @@ +// Copyright 2011 Google Inc. All Rights Reserved. +// +// The following only applies to changes made to this file as part of YugaByte development. +// +// Portions Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +/** + * A hash function that returns a 64-bit hash value. + * + * This is the implementation of + * Bob Jenkin's hash function (http://burtleburtle.net/bob/hash/evahash.html). + * + * The hash function takes a string as a input and a start seed. Hashing the same + * string with two different seeds should give two independent hash values. The + * functions just do a single mix, in order to convert the key into something + * *random*. This hash function is guarenteed to have no funnels, i.e there is no + * theoretical limit on the number of bits in the internal state a subset of bits + * of the input can change. Hence the chances of collisions are low even if the + * input differs only by a few bits. + * + */ + +#ifndef __CASS_JENKINS_HASH_HPP_INCLUDED__ +#define __CASS_JENKINS_HASH_HPP_INCLUDED__ + + +#include + +namespace cass { +/* + * Detecting the *endianness* of the machine. + * Default detection implemented with reference to + * http://www.boost.org/doc/libs/1_42_0/boost/detail/endian.hpp +*/ + +#if defined (__GLIBC__) +# include +# if (__BYTE_ORDER == __LITTLE_ENDIAN) +# define JENKINS_LITTLE_ENDIAN +# elif (__BYTE_ORDER == __BIG_ENDIAN) +# define JENKINS_BIG_ENDIAN +# elif (__BYTE_ORDER == __PDP_ENDIAN) +# define JENKINS_PDP_ENDIAN +# else +# error Unknown machine endianness detected. +# endif +# define JENKINS_BYTE_ORDER __BYTE_ORDER +#elif defined(_BIG_ENDIAN) && !defined(_LITTLE_ENDIAN) +# define JENKINS_BIG_ENDIAN +# define JENKINS_BYTE_ORDER 4321 +#elif defined(_LITTLE_ENDIAN) && !defined(_BIG_ENDIAN) +# define JENKINS_LITTLE_ENDIAN +# define JENKINS_BYTE_ORDER 1234 +#elif defined(__sparc) || defined(__sparc__) \ + || defined(_POWER) || defined(__powerpc__) \ + || defined(__ppc__) || defined(__hpux) || defined(__hppa) \ + || defined(_MIPSEB) || defined(_POWER) \ + || defined(__s390__) +# define JENKINS_BIG_ENDIAN +# define JENKINS_BYTE_ORDER 4321 +#elif defined(__i386__) || defined(__alpha__) \ + || defined(__ia64) || defined(__ia64__) \ + || defined(_M_IX86) || defined(_M_IA64) \ + || defined(_M_ALPHA) || defined(__amd64) \ + || defined(__amd64__) || defined(_M_AMD64) \ + || defined(__x86_64) || defined(__x86_64__) \ + || defined(_M_X64) || defined(__bfin__) + +# define JENKINS_LITTLE_ENDIAN +# define JENKINS_BYTE_ORDER 1234 +#else +# error The file cassandra-cpp-driver/src/jenkins_hash.hpp needs to be set up for your CPU type. +#endif + +#define JENKINS_ULONGLONG(x) x##ULL +// ---------------------------------------------------------------------- +// mix() +// The mix function is due to Bob Jenkins (see +// http://burtleburtle.net/bob/hash/index.html). +// Each mix takes 36 instructions, in 18 cycles if you're lucky. +// +// On x86 architectures, this requires 45 instructions in 27 cycles, +// if you're lucky. +// ---------------------------------------------------------------------- + +static inline void mix(uint64_t &a, uint64_t &b, uint64_t &c) { // 64bit version + a -= b; a -= c; a ^= (c>>43); + b -= c; b -= a; b ^= (a<<9); + c -= a; c -= b; c ^= (b>>8); + a -= b; a -= c; a ^= (c>>38); + b -= c; b -= a; b ^= (a<<23); + c -= a; c -= b; c ^= (b>>5); + a -= b; a -= c; a ^= (c>>35); + b -= c; b -= a; b ^= (a<<49); + c -= a; c -= b; c ^= (b>>11); + a -= b; a -= c; a ^= (c>>12); + b -= c; b -= a; b ^= (a<<18); + c -= a; c -= b; c ^= (b>>22); +} + +#define UNALIGNED_LOAD64(_p) (*reinterpret_cast(_p)) + +#if defined(JENKINS_LITTLE_ENDIAN) && !defined(NEED_ALIGNED_LOADS) +static inline uint64_t Word64At(const char *ptr) { + return UNALIGNED_LOAD64(ptr); +} + +#else + +// Note this code is unlikely to be used. +static inline uint64_t Word64At(const char *ptr) { + return (static_cast(ptr[0]) + + (static_cast(ptr[1]) << 8) + + (static_cast(ptr[2]) << 16) + + (static_cast(ptr[3]) << 24) + + (static_cast(ptr[4]) << 32) + + (static_cast(ptr[5]) << 40) + + (static_cast(ptr[6]) << 48) + + (static_cast(ptr[7]) << 56)); +} +#endif + +static inline uint64_t char2unsigned64(char c) { + return static_cast(static_cast(c)); +} + +uint64_t Hash64StringWithSeed(const char *s, uint32_t len, uint64_t c); + +} // namespace cass + +#endif // __CASS_JENKINS_HASH_HPP_INCLUDED__ \ No newline at end of file From 4f5868439654cc11e13af1f1a4415c1f58b403f3 Mon Sep 17 00:00:00 2001 From: Ayush Sengupta Date: Thu, 28 Jun 2018 13:58:38 -0700 Subject: [PATCH 8/8] Partition Aware Policy --- gtests/src/unit/tests/test_jenkins.cpp | 68 +++++++++++++++++++ gtests/src/unit/tests/test_md5.cpp | 34 +++++----- include/cassandra.h | 15 +++++ src/list_policy.hpp | 2 +- src/metadata.hpp | 6 ++ src/partition_aware_policy.cpp | 42 ++++++++++++ src/partition_aware_policy.hpp | 90 ++++++++++++++++++++++++++ src/partition_metadata.cpp | 35 ++++++++++ src/partition_metadata.hpp | 61 +++++++++++++++++ 9 files changed, 337 insertions(+), 16 deletions(-) create mode 100644 gtests/src/unit/tests/test_jenkins.cpp create mode 100644 src/partition_aware_policy.cpp create mode 100644 src/partition_aware_policy.hpp create mode 100644 src/partition_metadata.cpp create mode 100644 src/partition_metadata.hpp diff --git a/gtests/src/unit/tests/test_jenkins.cpp b/gtests/src/unit/tests/test_jenkins.cpp new file mode 100644 index 000000000..851ec2a0d --- /dev/null +++ b/gtests/src/unit/tests/test_jenkins.cpp @@ -0,0 +1,68 @@ +// Copyright 2011 Google Inc. All Rights Reserved. +// +// The following only applies to changes made to this file as part of YugaByte development. +// +// Portions Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. + +#include +#include + +#include "jenkins_hash.hpp" + +inline char* to_char_ptr (uint8_t* uptr) { + return reinterpret_cast(uptr); +} + +inline const char* to_char_ptr (const uint8_t* uptr) { + return reinterpret_cast(uptr); +} + +TEST(JenkinsUnitTest, TestHash64) { + + const uint64_t seed = 97; + + const uint8_t b1[] = { + 0xc7, 0x25, 0x1d, 0x5d, 0x75, 0x3a, 0x4e, 0x46, 0x22, 0x29, 0x4d, 0x6c, 0x67, 0x7a, 0xa8, 0x25, + 0x71 + }; + + const uint8_t b2[] = { + 0x83, 0x8e, 0x7e, 0xf0, 0x71, 0xef, 0x9b, 0x3e, 0x4a, 0xe6, 0x12, 0x60, 0xc0, 0xa1, 0xf9, 0x94, + 0x5a, 0x85, 0x9b, 0xb1, 0xf6, 0x86, 0x97, 0xe1, 0xab, 0x87, 0xc8, 0xab, 0xc1, 0x28, 0xd1, 0x72, + 0x73, 0x0b, 0xda, 0x50, 0xe3, 0xe6, 0xf9, 0x42 + }; + + const uint8_t b3[] = { + 0xad, 0xe3, 0xaa, 0xb7, 0xd2, 0xbc, 0x3a, 0xe6, 0x60, 0xe4, 0xc6, 0xc1, 0x02, 0x0a, 0x3a, 0x50, + 0x66, 0xb2, 0x26, 0x6c, 0x1d, 0x1b, 0x16, 0xb1, 0x1b, 0x51, 0x74, 0x9c, 0xa7, 0xbb, 0xad, 0x46, + 0x25, 0x54, 0xca, 0x30, 0x3a, 0x31, 0xd0, 0x34, 0x56, 0xac, 0xb1, 0xca, 0xaf, 0x7f, 0x5c, 0xf3, + 0x9e, 0x16, 0x94, 0x78, 0x84, 0xca, 0x60, 0x66, 0x27, 0x59, 0xe1, 0x99, 0xb4, 0xc4, 0xbd, 0x50, + 0x48, 0x50, 0xcb, 0xa6, 0x0b, 0xe1, 0x71, 0x31, 0x49, 0x27, 0x11, 0x9e, 0xcc, 0xcd, 0xd8, 0x19, + 0x09, 0xc6, 0xdf, 0x15, 0x64, 0x0d, 0xf7, 0x25, 0x5c, 0x48, 0x19, 0xc7, 0x6b, 0x10, 0x02, 0x7e, + 0x31, 0x54, 0x2a, 0xd8, 0x92, 0xe5, 0xc5, 0xab, 0xe9, 0x3d, 0x57, 0x99, 0x9a, 0x93, 0x4f, 0x48, + 0x3f, 0xfa, 0x73, 0x36, 0x03, 0xe1, 0xbd, 0x27, 0xe5, 0x06, 0x8a, 0x21, 0x33, 0xff, 0x91, 0x80, + 0x36, 0x4d, 0x2d, 0x04, 0xc7, 0x11, 0xcc, 0x2a, 0xc0, 0xa9, 0x17, 0x18, 0x73, 0xff, 0xd5, 0x0e, + 0x0d, 0x8b, 0x6f, 0x8b, 0xba, 0x8c, 0x37, 0x49, 0xb1, 0x31, 0x5b, 0xf4, 0x4d, 0xd7, 0x19, 0x10, + 0x40, 0x6e, 0x61, 0x41, 0xf1, 0x55, 0xaa, 0x44, 0x79, 0x13, 0x57, 0x3b, 0x72, 0xac, 0xfe, 0xce, + 0xf8, 0xd7, 0x07, 0x82, 0x05, 0xef, 0x0f, 0x53, 0x6c, 0xfe, 0x7d, 0x94, 0x48, 0xa5, 0x48, 0x42, + 0x47, 0x70, 0x29, 0xe7, 0x7e, 0x53, 0xca, 0x88, 0x89, 0x8a, 0xec, 0xe5, 0x01, 0x44, 0xf5, 0xc5, + 0xc9, 0x89, 0x6d, 0x6a, 0xf1, 0x26, 0x61, 0xae, 0x30, 0x50, 0x61, 0x68, 0x41, 0xac, 0x82, 0x40, + 0xdb, 0x12, 0x00, 0x68, 0xad, 0x34, 0x52, 0xb2, 0xbb, 0xc5, 0x74, 0xf1, 0x3e, 0x00, 0x98, 0x6e, + 0x1d, 0xc2, 0xd7, 0x7d, 0xc6, 0xc7, 0x10, 0xb2, 0xac, 0xcf, 0x8b, 0x25, 0xd9, 0x7d, 0xd5, 0x20 + }; + + EXPECT_TRUE(cass::Hash64StringWithSeed(to_char_ptr(b1), sizeof(b1), seed) == 1789751740810280356ul); + EXPECT_TRUE(cass::Hash64StringWithSeed(to_char_ptr(b2), sizeof(b2), seed) == 4001818822847464429ul); + EXPECT_TRUE(cass::Hash64StringWithSeed(to_char_ptr(b3), sizeof(b3), seed) == 15240025333683105143ul); + +} diff --git a/gtests/src/unit/tests/test_md5.cpp b/gtests/src/unit/tests/test_md5.cpp index 71c1a3f45..36268a469 100644 --- a/gtests/src/unit/tests/test_md5.cpp +++ b/gtests/src/unit/tests/test_md5.cpp @@ -1,18 +1,22 @@ -/* - Copyright (c) DataStax, Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ +// Copyright 2011 Google Inc. All Rights Reserved. +// +// The following only applies to changes made to this file as part of YugaByte development. +// +// Portions Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// +// Contains the legacy Bob Jenkins Lookup2-based hashing routines. These need to +// always return the same results as their values have been recorded in various +// places and cannot easily be updated. Original author: Sanjay Ghemawat #include diff --git a/include/cassandra.h b/include/cassandra.h index f8c6e3907..12ee2256e 100644 --- a/include/cassandra.h +++ b/include/cassandra.h @@ -430,6 +430,21 @@ typedef struct CassRetryPolicy_ CassRetryPolicy; */ typedef struct CassCustomPayload_ CassCustomPayload; +/** + * Contains the set of Cass Objects used to control a session and execute queries + */ +typedef struct SessionObjects { + CassFuture* cass_future; + CassSession* cass_session; + CassCluster* cass_cluster; + operator=(SessionObjects obj) { + cass_cluster = obj.cass_cluster; + cass_session = obj.cass_session; + cass_future = obj.cass_future; + } +}; + + /** * A snapshot of the session's performance/diagnostic metrics. * diff --git a/src/list_policy.hpp b/src/list_policy.hpp index 51dc67f25..167dc9a72 100644 --- a/src/list_policy.hpp +++ b/src/list_policy.hpp @@ -45,7 +45,7 @@ class ListPolicy : public ChainedLoadBalancingPolicy { virtual ListPolicy* new_instance() = 0; -private: +protected: virtual bool is_valid_host(const Host::Ptr& host) const = 0; }; diff --git a/src/metadata.hpp b/src/metadata.hpp index 0874bb492..d7552b532 100644 --- a/src/metadata.hpp +++ b/src/metadata.hpp @@ -459,6 +459,12 @@ inline bool operator==(const ViewMetadata::Ptr& a, const ViewMetadata::Ptr& b) { return a->name() == b->name(); } +class PartitionMetadata { + public: + int dummy; +}; + + class TableMetadata : public TableMetadataBase { public: typedef SharedRefPtr Ptr; diff --git a/src/partition_aware_policy.cpp b/src/partition_aware_policy.cpp new file mode 100644 index 000000000..59df59be5 --- /dev/null +++ b/src/partition_aware_policy.cpp @@ -0,0 +1,42 @@ +#include "partition_aware_policy.hpp" + +#include "logger.hpp" + +namespace cass { + +// Partition Aware Policy should also implement functions like: + +// getKey(for a statement), getKey(for a byte), YBToCqlHashCode, CQlToYBHashCode. +// After that the implementation of the iterator is extremely simple. +// You have also decided to create a query plan object and pass down the Session object +// to the query object. + +class PartitionAwareQueryPlan: public QueryPlan { + +}; + +class PartitionAwarePolicy: public ChainedLoadBalancingPolicy { + public: + void init(const Host::Ptr &connected_host, + const HostMap &hosts, + Random *random) { + HostMap valid_hosts; + for (HostMap::iterator i = hosts.begin(), + end = hosts.end(); i != end; ++i) { + const Host::Ptr &host = i->second; + if (is_valid_host(host)) { // might want to define valid hosts as the ones up. + valid_hosts.insert(HostPair(i->first, host)); + } + } + + if (valid_hosts.empty()) { + LOG_ERROR("No valid hosts available for list policy."); + } + } + + private: + + +}; + +} // namespace cass \ No newline at end of file diff --git a/src/partition_aware_policy.hpp b/src/partition_aware_policy.hpp new file mode 100644 index 000000000..1bd805d4a --- /dev/null +++ b/src/partition_aware_policy.hpp @@ -0,0 +1,90 @@ +// Copyright 2011 Google Inc. All Rights Reserved. +// +// The following only applies to changes made to this file as part of YugaByte development. +// +// Portions Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. + +#ifndef __CASS_PARTITION_AWARE_POLICY_HPP_INCLUDED__ +#define __CASS_PARTITION_AWARE_POLICY_HPP_INCLUDED__ + +#include "load_balancing.hpp" +#include "host.hpp" +#include "random.hpp" +#include "cassandra.h" +#include "metadata.hpp" +#include "list_policy.hpp" + +namespace cass { + +class PartitionAwareQueryPlan: public QueryPlan { + public: + PartitionAwareQueryPlan(const PartitionAwarePolicy *policy, + cass::PartitionMetadata *partition_metadata, + int refresh_frequency_in_seconds); + + Host::Ptr compute_next(); + + HostVec get_partition_aware_hosts(); // should take tablet metadata and key + + void recompute_partition_metadata(); // will force a recomputation of partition metadata + + private: + const PartitionAwarePolicy *policy_; + PartitionMetadata *partition_metadata_; + HostVec partition_aware_hosts_; + int refresh_frequency_in_seconds_; +}; + +class PartitionAwarePolicy: public ChainedLoadBalancingPolicy { + public: + PartitionAwarePolicy(LoadBalancingPolicy *child_policy, + int refresh_frequency_in_seconds, + SessionObjects session_objects) + : child_policy_(child_policy), + refresh_frequency_in_seconds_(refresh_frequency_in_seconds), + session_objects_(session_objects) {} + + ~PartitionAwarePolicy() {} + + void init(const Host::Ptr &connected_host, const HostMap &hosts, Random *random); + + CassHostDistance distance(const Host::Ptr &host) { child_policy_->distance(host); }; + + void on_add(const Host::Ptr &host) { child_policy_->on_add(host); } + + void on_remove(const Host::Ptr &host) { child_policy_->on_remove(host); } + + void on_up(const Host::Ptr &host) { child_policy_->on_up(host); } + + void on_down(const Host::Ptr &host) { child_policy_->on_down(host); } + + void QueryPlan* new_query_plan(const std::string& keyspace, + RequestHandler* request_handler, + const TokenMap* token_map); + + private: + PartitionAwareQueryPlan* partition_aware_query_plan_; + + PartitionAwarePolicy *new_instance() { + + } + + SessionObjects session_objects_; + + int refresh_frequency_in_seconds_; + + bool is_valid_host(const Host::Ptr& host) { return true; } + +}; + +} // namespace cass \ No newline at end of file diff --git a/src/partition_metadata.cpp b/src/partition_metadata.cpp new file mode 100644 index 000000000..45522a7f9 --- /dev/null +++ b/src/partition_metadata.cpp @@ -0,0 +1,35 @@ +#include "partition_metadata.hpp" +#include "metadata.hpp" +#incldue + +namespace cass { + + class PartitionMetadata { + public: + + + private: + void load(ResultResponse rs) { + unordered_map > tableMap = new unordered_map > (); + for (Row row;;) { // figure out the right way to iterate over rows. + string keyspace_name; + KeyspaceMetadata *keyspace = cluster_metadata_->get_keyspace(row.get_string_by_name("keyspace_name", &keyspace_name)); + if (!keyspace) { + continue; + } + + TableMetadata table = keyspace->get_table(row.get_string_by_name("table_name")); + if (!table) { + continue; + } + + // This is explanatory now, just implement. + + } + + } + + }; + + +} // namespace cass diff --git a/src/partition_metadata.hpp b/src/partition_metadata.hpp new file mode 100644 index 000000000..7bdfb59c0 --- /dev/null +++ b/src/partition_metadata.hpp @@ -0,0 +1,61 @@ +#include "cassandra.h" +#include "response.hpp" +#include "result_response.hpp" +#inlcude "future.hpp" +#include "session.hpp" +#include "cluster.hpp" + +#include +#include +#include + +namespace cass { + +class PartitionMetadata { + public: + PartitionMetadata(Cluster *cluster, Session *session, Future *future) { + cluster_ = cluster; + session_ = session; + future_ = future; + execute_query(); + } + + void update_result_set(); + + private: + // static bool isSystem(); + + // static int getKey(); + unordered_map > tableMap; + + static string partitions_query_; + + Cluster *cluster_; + + Session *session_; + + Future *close_future_; + + ResultResponse *result_response_; + + HostVec all_hosts_; + + HostVec up_hosts_; + + Metadata *cluster_metadata_; + + void execute_query(); + + void load(); + + // Not sure if this is absolutely needed. + void loadAsync(); + + int refresh_frequency_in_seconds_; + + string PARTITIONS_QUERY = + "select keyspace_name, table_name, replica_addresses from system.partitions;"; + +}; + +} // namespace cass \ No newline at end of file