Skip to content

Commit

Permalink
Cassandra role consistency level (#287)
Browse files Browse the repository at this point in the history
* Added consistency_level parameter for use in cql queries in cassandra_role, cassandra_keyspace and cassandra_table modules. See the project's README.md for details about individual level support.
---------

Co-authored-by: Демьяненко Александр Сергеевич <[email protected]>
  • Loading branch information
rhysmeister and Демьяненко Александр Сергеевич authored Oct 13, 2024
1 parent e029d38 commit de87974
Show file tree
Hide file tree
Showing 15 changed files with 529 additions and 75 deletions.
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,26 @@ These roles prepare servers with Debian-based and RHEL-based distributions to ru
- `cassandra_upgradesstables`- Upgrade SSTables which are not on the current Cassandra version.
- `cassandra_verify`- Checks the data checksum for one or more tables.

## Module support for Consistency Level

The pure-python modules, currently cassandra_role, cassandra_keyspace & cassandra_table all have a consistency_level parameter, through which the consistency level can be changed. Not all consistency levels are supported by read and write. The table below summarizes this.

| **Consistency Level** | **Read** | **Write** |
|-------------------------|----------|-----------|
| **ANY** | No | Yes |
| **ONE** | Yes | Yes |
| **TWO** | Yes | Yes |
| **THREE** | Yes | Yes |
| **QUORUM** | Yes | Yes |
| **ALL** | Yes | Yes |
| **LOCAL_ONE** | Yes | Yes |
| **LOCAL_QUORUM** | Yes | Yes |
| **EACH_QUORUM** | No | Yes |
| **SERIAL** | Yes | No |
| **LOCAL_SERIAL** | Yes | No |

If the chosen consistency level is not supported, by either read or write, then the default *LOCAL_ONE* is used.

## Supported Cassandra Versions

* 4.0.X
Expand Down
86 changes: 76 additions & 10 deletions plugins/modules/cassandra_keyspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,26 @@
type: dict
aliases:
- data_centers
consistency_level:
description:
- Consistency level to perform cassandra queries with.
- Not all consistency levels are supported by read or write connections.\
When a level is not supported then LOCAL_ONE, the default is used.
- Consult the README.md on GitHub for further details.
type: str
default: "LOCAL_ONE"
choices:
- ANY
- ONE
- TWO
- THREE
- QUORUM
- ALL
- LOCAL_QUORUM
- EACH_QUORUM
- SERIAL
- LOCAL_SERIAL
- LOCAL_ONE
requirements:
- cassandra-driver
Expand Down Expand Up @@ -147,7 +167,10 @@

try:
from cassandra.cluster import Cluster, AuthenticationFailed
from cassandra.cluster import EXEC_PROFILE_DEFAULT
from cassandra.cluster import ExecutionProfile
from cassandra.auth import PlainTextAuthProvider
from cassandra import ConsistencyLevel
HAS_CASSANDRA_DRIVER = True
except Exception:
HAS_CASSANDRA_DRIVER = False
Expand Down Expand Up @@ -251,6 +274,38 @@ def keyspace_is_changed(module, cluster, keyspace, replication_factor,
module.fail_json("Unknown Replication strategy: {0}".format(cfg['class']))
return keyspace_definition_changed


def get_read_and_write_sessions(login_host,
login_port,
auth_provider,
ssl_context,
consistency_level):
profile = ExecutionProfile(
consistency_level=ConsistencyLevel.name_to_value[consistency_level])
if consistency_level in ["ANY", "EACH_QUORUM"]: # Not supported for reads
cluster_r = Cluster(login_host,
port=login_port,
auth_provider=auth_provider,
ssl_context=ssl_context) # Will be LOCAL_ONE
else:
cluster_r = Cluster(login_host,
port=login_port,
auth_provider=auth_provider,
ssl_context=ssl_context,
execution_profiles={EXEC_PROFILE_DEFAULT: profile})
if consistency_level in ["SERIAL", "LOCAL_SERIAL"]: # Not supported for writes
cluster_w = Cluster(login_host,
port=login_port,
auth_provider=auth_provider,
ssl_context=ssl_context) # Will be LOCAL_ONE
else:
cluster_w = Cluster(login_host,
port=login_port,
auth_provider=auth_provider,
ssl_context=ssl_context,
execution_profiles={EXEC_PROFILE_DEFAULT: profile})
return (cluster_r, cluster_w) # Return a tuple of sessions for C* (read, write)

############################################


Expand All @@ -273,7 +328,11 @@ def main():
state=dict(type='str', required=True, choices=['present', 'absent']),
replication_factor=dict(type='int', default=1),
durable_writes=dict(type='bool', default=True),
data_centres=dict(type='dict', aliases=['data_centers'])),
data_centres=dict(type='dict', aliases=['data_centers']),
consistency_level=dict(type='str',
required=False,
default="LOCAL_ONE",
choices=ConsistencyLevel.name_to_value.keys())),
supports_check_mode=True
)

Expand Down Expand Up @@ -303,6 +362,7 @@ def main():
replication_factor = module.params['replication_factor']
durable_writes = module.params['durable_writes']
data_centres = module.params['data_centres']
consistency_level = module.params['consistency_level']

if HAS_SSL_LIBRARY is False and ssl is True:
msg = ("This module requires the SSL python"
Expand Down Expand Up @@ -343,18 +403,24 @@ def main():
ssl_context.verify_mode = getattr(ssl_lib, module.params['ssl_cert_reqs'])
if ssl_cert_reqs in ('CERT_REQUIRED', 'CERT_OPTIONAL'):
ssl_context.load_verify_locations(module.params['ssl_ca_certs'])
cluster = Cluster(login_host,
port=login_port,
auth_provider=auth_provider,
ssl_context=ssl_context)
session = cluster.connect()

sessions = get_read_and_write_sessions(login_host,
login_port,
auth_provider,
ssl_context,
consistency_level)

cluster = sessions[1] # maintain cluster object for comptbility
session_r = sessions[0].connect()
session_w = sessions[1].connect()

except AuthenticationFailed as excep:
module.fail_json(msg="Authentication failed: {0}".format(excep))
except Exception as excep:
module.fail_json(msg="Error connecting to cluster: {0}".format(excep))

try:
if keyspace_exists(session, keyspace):
if keyspace_exists(session_r, keyspace):
if module.check_mode:
if state == "present":
if keyspace_is_changed(module,
Expand All @@ -379,7 +445,7 @@ def main():
data_centres):

cql = create_alter_keyspace(module,
session,
session_w,
keyspace,
replication_factor,
durable_writes,
Expand All @@ -390,7 +456,7 @@ def main():
else:
result['changed'] = False
elif state == "absent":
drop_keyspace(session, keyspace)
drop_keyspace(session_w, keyspace)
result['changed'] = True
else:
if module.check_mode:
Expand All @@ -401,7 +467,7 @@ def main():
else:
if state == "present":
cql = create_alter_keyspace(module,
session,
session_w,
keyspace,
replication_factor,
durable_writes,
Expand Down
Loading

0 comments on commit de87974

Please sign in to comment.