Allow specifying storage classes (#777)
---------

Co-authored-by: aburadeh <[email protected]>
Co-authored-by: mohammad-aburadeh <[email protected]>
Co-authored-by: Alexander Dejanovski <[email protected]>
4 people authored Jun 14, 2024
1 parent 3da2655 commit 6a56e53
Showing 14 changed files with 138 additions and 16 deletions.
12 changes: 12 additions & 0 deletions docs/Configuration.md
@@ -52,6 +52,18 @@ storage_provider = <Storage system used for backups>
; storage_provider should be either of "local", "google_storage" or "s3"
region = <Region hosting the storage>
; Storage class to use when uploading objects.
; Use a value specific to the chosen `storage_provider` that supports both reads and writes (e.g. S3's GLACIER and Azure's ARCHIVE won't work).
; If not specified, we default to the 'hottest' class (STANDARD, STANDARD, HOT for GCP, AWS, AZURE respectively).
; Supported values:
; AWS S3: STANDARD | REDUCED_REDUNDANCY | STANDARD_IA | ONEZONE_IA | INTELLIGENT_TIERING
; GCP: STANDARD (NEARLINE, COLDLINE and ARCHIVE are not supported yet)
; AZURE: HOT | COOL | COLD
; https://aws.amazon.com/s3/storage-classes/
; https://cloud.google.com/storage/docs/storage-classes
; https://learn.microsoft.com/en-us/azure/storage/blobs/access-tiers-overview
; storage_class = <Storage Class Name used to store backups>
; Name of the bucket used for storing backups
bucket_name = cassandra_backups
12 changes: 12 additions & 0 deletions medusa-example.ini
@@ -60,6 +60,18 @@ storage_provider = <Storage system used for backups>
; storage_provider should be either of "local", "google_storage", "azure_blobs" or the s3_* values from
; https://github.com/apache/libcloud/blob/trunk/libcloud/storage/types.py

; Storage class to use when uploading objects.
; Use a value specific to the chosen `storage_provider` that supports both reads and writes (e.g. S3's GLACIER and Azure's ARCHIVE won't work).
; If not specified, we default to the 'hottest' class (STANDARD, STANDARD, HOT for GCP, AWS, AZURE respectively).
; Supported values:
; AWS S3: STANDARD | REDUCED_REDUNDANCY | STANDARD_IA | ONEZONE_IA | INTELLIGENT_TIERING
; GCP: STANDARD (NEARLINE, COLDLINE and ARCHIVE are not supported yet)
; AZURE: HOT | COOL | COLD
; https://aws.amazon.com/s3/storage-classes/
; https://cloud.google.com/storage/docs/storage-classes
; https://learn.microsoft.com/en-us/azure/storage/blobs/access-tiers-overview
; storage_class = <Storage Class Name used to store backups>

; Name of the bucket used for storing backups
bucket_name = cassandra_backups

2 changes: 1 addition & 1 deletion medusa/config.py
@@ -28,7 +28,7 @@

StorageConfig = collections.namedtuple(
'StorageConfig',
['bucket_name', 'key_file', 'prefix', 'fqdn', 'host_file_separator', 'storage_provider',
['bucket_name', 'key_file', 'prefix', 'fqdn', 'host_file_separator', 'storage_provider', 'storage_class',
'base_path', 'max_backup_age', 'max_backup_count', 'api_profile', 'transfer_max_bandwidth',
'concurrent_transfers', 'multi_part_upload_threshold', 'host', 'region', 'port', 'secure', 'ssl_verify',
'aws_cli_path', 'kms_id', 'backup_grace_period_in_days', 'use_sudo_for_restore', 'k8s_mode', 'read_timeout']
8 changes: 7 additions & 1 deletion medusa/storage/abstract_storage.py
@@ -35,7 +35,7 @@
MAX_UP_DOWN_LOAD_RETRIES = 5


AbstractBlob = collections.namedtuple('AbstractBlob', ['name', 'size', 'hash', 'last_modified'])
AbstractBlob = collections.namedtuple('AbstractBlob', ['name', 'size', 'hash', 'last_modified', 'storage_class'])

AbstractBlobMetadata = collections.namedtuple('AbstractBlobMetadata', ['name', 'sse_enabled', 'sse_key_id'])

@@ -442,6 +442,12 @@ def additional_upload_headers(self):
"""
return {}

def get_storage_class(self):
if self.config.storage_class is not None:
return self.config.storage_class.upper()
else:
return None

@staticmethod
def human_readable_size(size, decimal_places=3):
for unit in ["B", "KiB", "MiB", "GiB", "TiB"]:
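The new AbstractStorage.get_storage_class() helper upper-cases whatever the operator configured, so later comparisons against provider-specific class names are case-insensitive, and it returns None when nothing is configured so each backend falls back to the provider default. A minimal stand-alone sketch of that behaviour, using a hypothetical stand-in for Medusa's StorageConfig:

import collections

# Hypothetical stand-in for Medusa's StorageConfig, reduced to the one field
# this sketch needs.
FakeConfig = collections.namedtuple('FakeConfig', ['storage_class'])

def get_storage_class(config):
    # Mirrors AbstractStorage.get_storage_class: normalise to upper case,
    # propagate None so the provider's default class applies.
    return config.storage_class.upper() if config.storage_class is not None else None

assert get_storage_class(FakeConfig(storage_class='standard_ia')) == 'STANDARD_IA'
assert get_storage_class(FakeConfig(storage_class=None)) is None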
12 changes: 9 additions & 3 deletions medusa/storage/azure_storage.py
@@ -25,7 +25,7 @@

from azure.core.credentials import AzureNamedKeyCredential
from azure.storage.blob.aio import BlobServiceClient
from azure.storage.blob import BlobProperties
from azure.storage.blob import BlobProperties, StandardBlobTier
from medusa.storage.abstract_storage import AbstractStorage, AbstractBlob, AbstractBlobMetadata, ObjectDoesNotExistError
from pathlib import Path
from retrying import retry
@@ -95,7 +95,8 @@ async def _list_blobs(self, prefix=None) -> t.List[AbstractBlob]:
b_props.name,
b_props.size,
self._get_blob_hash(b_props),
b_props.last_modified)
b_props.last_modified,
b_props.blob_tier)
)
return blobs

@@ -116,17 +117,20 @@ async def _upload_object(self, data: io.BytesIO, object_key: str, headers: t.Dic
self.config.bucket_name, object_key
)
)
storage_class = self.get_storage_class()
blob_client = await self.azure_container_client.upload_blob(
name=object_key,
data=data,
overwrite=True,
standard_blob_tier=StandardBlobTier(storage_class.capitalize()) if storage_class else None,
)
blob_properties = await blob_client.get_blob_properties()
return AbstractBlob(
blob_properties.name,
blob_properties.size,
self._get_blob_hash(blob_properties),
blob_properties.last_modified,
blob_properties.blob_tier
)

@retry(stop_max_attempt_number=MAX_UP_DOWN_LOAD_RETRIES, wait_fixed=5000)
@@ -173,6 +177,7 @@ async def _stat_blob(self, object_key: str) -> AbstractBlob:
blob_properties.size,
self._get_blob_hash(blob_properties),
blob_properties.last_modified,
blob_properties.blob_tier
)

@retry(stop_max_attempt_number=MAX_UP_DOWN_LOAD_RETRIES, wait_fixed=5000)
@@ -188,13 +193,14 @@ async def _upload_blob(self, src: str, dest: str) -> ManifestObject:
src, self.human_readable_size(file_size), self.config.bucket_name, object_key
)
)

storage_class = self.get_storage_class()
with open(src, "rb") as data:
blob_client = await self.azure_container_client.upload_blob(
name=object_key,
data=data,
overwrite=True,
max_concurrency=16,
standard_blob_tier=StandardBlobTier(storage_class.capitalize()) if storage_class else None,
)
blob_properties = await blob_client.get_blob_properties()
mo = ManifestObject(
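The Azure backend converts the upper-cased value into the SDK's StandardBlobTier enum, whose members are capitalised (Hot, Cool, Cold), which is why the calls above go through .capitalize() first. A small sketch of that conversion, assuming the azure-storage-blob package is installed:

from azure.storage.blob import StandardBlobTier

def to_blob_tier(storage_class):
    # "COOL" -> "Cool" -> StandardBlobTier.COOL; None is passed through so the
    # storage account's default access tier is used instead.
    return StandardBlobTier(storage_class.capitalize()) if storage_class else None

assert to_blob_tier("COOL") == StandardBlobTier.COOL
assert to_blob_tier(None) is None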
18 changes: 15 additions & 3 deletions medusa/storage/google_storage.py
@@ -82,7 +82,8 @@ async def _list_blobs(self, prefix=None) -> t.List[AbstractBlob]:
int(o['size']),
o['md5Hash'],
# datetime comes as a string like 2023-08-31T14:23:24.957Z
datetime.datetime.strptime(o['timeCreated'], '%Y-%m-%dT%H:%M:%S.%fZ')
datetime.datetime.strptime(o['timeCreated'], '%Y-%m-%dT%H:%M:%S.%fZ'),
o['storageClass']
)
async for o in objects
]
@@ -125,14 +126,20 @@ async def _upload_object(self, data: io.BytesIO, object_key: str, headers: t.Dic
self.config.bucket_name, object_key
)
)

storage_class = self.get_storage_class()
ex_header = {"storageClass": storage_class} if storage_class else {}
resp = await self.gcs_storage.upload(
bucket=self.bucket_name,
object_name=object_key,
file_data=data,
force_resumable_upload=True,
timeout=-1,
headers=ex_header,
)
return AbstractBlob(
resp['name'], int(resp['size']), resp['md5Hash'], resp['timeCreated'], storage_class.upper()
)
return AbstractBlob(resp['name'], int(resp['size']), resp['md5Hash'], resp['timeCreated'])

@retry(stop_max_attempt_number=MAX_UP_DOWN_LOAD_RETRIES, wait_fixed=5000)
async def _download_blob(self, src: str, dest: str):
@@ -181,7 +188,8 @@ async def _stat_blob(self, object_key: str) -> AbstractBlob:
int(blob['size']),
blob['md5Hash'],
# datetime comes as a string like 2023-08-31T14:23:24.957Z
datetime.datetime.strptime(blob['timeCreated'], '%Y-%m-%dT%H:%M:%S.%fZ')
datetime.datetime.strptime(blob['timeCreated'], '%Y-%m-%dT%H:%M:%S.%fZ'),
blob['storageClass']
)

@retry(stop_max_attempt_number=MAX_UP_DOWN_LOAD_RETRIES, wait_fixed=5000)
@@ -197,12 +205,16 @@ async def _upload_blob(self, src: str, dest: str) -> ManifestObject:
src, self.config.bucket_name, object_key
)
)

storage_class = self.get_storage_class()
ex_header = {"storageClass": storage_class} if storage_class else {}
resp = await self.gcs_storage.copy(
bucket=self.bucket_name,
object_name=f'{src}'.replace(f'gs://{self.bucket_name}/', ''),
destination_bucket=self.bucket_name,
new_name=object_key,
timeout=-1,
headers=ex_header,
)
resp = resp['resource']
else:
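On Google Cloud Storage the class travels as a storageClass entry in the upload metadata headers; when no class is configured the header dict stays empty and the bucket's default class applies. A trivial sketch of that header construction (the function name is illustrative, not part of the gcloud-aio-storage API). As the integration-test examples below record, only STANDARD is exercised for GCS at the moment because the library does not yet propagate the custom storage class headers for the other tiers.

def gcs_upload_headers(storage_class):
    # Mirrors the ex_header construction above: only send storageClass when one
    # was configured, otherwise let the bucket default apply.
    return {"storageClass": storage_class} if storage_class else {}

assert gcs_upload_headers("STANDARD") == {"storageClass": "STANDARD"}
assert gcs_upload_headers(None) == {}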
9 changes: 6 additions & 3 deletions medusa/storage/local_storage.py
@@ -60,7 +60,8 @@ async def _list_blobs(self, prefix=None):
str(p.relative_to(self.root_dir)),
os.stat(self.root_dir / p).st_size,
self._md5(self.root_dir / p),
datetime.datetime.fromtimestamp(os.stat(self.root_dir / p).st_mtime)
datetime.datetime.fromtimestamp(os.stat(self.root_dir / p).st_mtime),
None
)
for p in paths if not p.is_dir()
]
@@ -92,7 +93,8 @@ async def _upload_object(self, data: io.BytesIO, object_key: str, headers: t.Dic
object_key,
os.stat(object_path).st_size,
md5.hexdigest(),
datetime.datetime.fromtimestamp(os.stat(object_path).st_mtime)
datetime.datetime.fromtimestamp(os.stat(object_path).st_mtime),
None
)

async def _download_blob(self, src: str, dest: str):
@@ -160,7 +162,8 @@ async def _get_object(self, object_key: t.Union[Path, str]) -> AbstractBlob:
str(object_key),
os.stat(object_path).st_size,
self._md5(object_path),
datetime.datetime.fromtimestamp(os.stat(object_path).st_mtime)
datetime.datetime.fromtimestamp(os.stat(object_path).st_mtime),
None
)

async def _read_blob_as_bytes(self, blob: AbstractBlob) -> bytes:
14 changes: 11 additions & 3 deletions medusa/storage/s3_base_storage.py
@@ -252,7 +252,7 @@ async def _list_blobs(self, prefix=None) -> t.List[AbstractBlob]:

for o in response.get('Contents', []):
obj_hash = o['ETag'].replace('"', '')
blobs.append(AbstractBlob(o['Key'], o['Size'], obj_hash, o['LastModified']))
blobs.append(AbstractBlob(o['Key'], o['Size'], obj_hash, o['LastModified'], o['StorageClass']))

return blobs

@@ -264,6 +264,10 @@ async def _upload_object(self, data: io.BytesIO, object_key: str, headers: t.Dic
kms_args['ServerSideEncryption'] = 'aws:kms'
kms_args['SSEKMSKeyId'] = self.kms_id

storage_class = self.get_storage_class()
if storage_class is not None:
kms_args['StorageClass'] = storage_class

logging.debug(
'[S3 Storage] Uploading object from stream -> s3://{}/{}'.format(
self.bucket_name, object_key
@@ -326,7 +330,7 @@ async def _stat_blob(self, object_key: str) -> AbstractBlob:
try:
resp = self.s3_client.head_object(Bucket=self.bucket_name, Key=object_key)
item_hash = resp['ETag'].replace('"', '')
return AbstractBlob(object_key, int(resp['ContentLength']), item_hash, resp['LastModified'])
return AbstractBlob(object_key, int(resp['ContentLength']), item_hash, resp['LastModified'], None)
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey' or e.response['Error']['Code'] == '404':
logging.debug("[S3 Storage] Object {} not found".format(object_key))
@@ -339,7 +343,7 @@ async def _stat_blob(self, object_key: str) -> AbstractBlob:
def __stat_blob(self, key):
resp = self.s3_client.head_object(Bucket=self.bucket_name, Key=key)
item_hash = resp['ETag'].replace('"', '')
return AbstractBlob(key, int(resp['ContentLength']), item_hash, resp['LastModified'])
return AbstractBlob(key, int(resp['ContentLength']), item_hash, resp['LastModified'], None)

@retry(stop_max_attempt_number=MAX_UP_DOWN_LOAD_RETRIES, wait_fixed=5000)
async def _upload_blob(self, src: str, dest: str) -> ManifestObject:
@@ -353,6 +357,10 @@ async def _upload_blob(self, src: str, dest: str) -> ManifestObject:
kms_args['ServerSideEncryption'] = 'aws:kms'
kms_args['SSEKMSKeyId'] = self.kms_id

storage_class = self.get_storage_class()
if storage_class is not None:
kms_args['StorageClass'] = storage_class

file_size = os.stat(src).st_size
logging.debug(
'[S3 Storage] Uploading {} ({}) -> {}'.format(
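For S3 the class is attached as a StorageClass entry in the extra upload arguments, next to the optional KMS settings. As a rough stand-alone illustration with plain boto3 (bucket, key and file names here are placeholders; Medusa itself goes through the wrapper methods shown above):

import boto3

s3 = boto3.client("s3")

extra_args = {}
storage_class = "STANDARD_IA"  # value that would come from the storage_class setting
if storage_class is not None:
    # Omitting StorageClass leaves the object in the bucket's STANDARD default.
    extra_args["StorageClass"] = storage_class

# Placeholder bucket, key and local file names, for illustration only.
s3.upload_file("backup.db", "cassandra-backups", "node1/data/backup.db", ExtraArgs=extra_args)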
38 changes: 38 additions & 0 deletions tests/integration/features/integration_tests.feature
@@ -1146,3 +1146,41 @@ Feature: Integration tests
Examples: Local storage
| storage | client encryption |
| local | with_client_encryption |

@32
Scenario Outline: Perform a differential backup with explicit storage class, then verify it
Given I have a fresh ccm cluster "<client encryption>" running named "scenario32"
Given I will use "<storage class>" as storage class in the storage
Given I am using "<storage>" as storage provider in ccm cluster "<client encryption>" with gRPC server
When I create the "test" table with secondary index in keyspace "medusa"
When I load 100 rows in the "medusa.test" table
When I run a "ccm node1 nodetool -- -Dcom.sun.jndi.rmiURLParsing=legacy flush" command
When I perform a backup in "differential" mode of the node named "first_backup" with md5 checks "disabled"
Then I can see the backup named "first_backup" when I list the backups
Then I can verify the backup named "first_backup" with md5 checks "disabled" successfully
Then I can see 2 SSTables with "<storage class>" in the SSTable pool for the "test" table in keyspace "medusa"

@s3
Examples: S3 storage
| storage | client encryption | storage class |
| s3_us_west_oregon | without_client_encryption | STANDARD |
| s3_us_west_oregon | without_client_encryption | REDUCED_REDUNDANCY |
| s3_us_west_oregon | without_client_encryption | STANDARD_IA |
| s3_us_west_oregon | without_client_encryption | ONEZONE_IA |
| s3_us_west_oregon | without_client_encryption | INTELLIGENT_TIERING |

@gcs
Examples: Google Cloud Storage
| storage | client encryption | storage class |
| google_storage | without_client_encryption | STANDARD |
# this is buggy for now, the library does not propagate the custom storage class headers
# | google_storage | without_client_encryption | NEARLINE |
# | google_storage | without_client_encryption | COLDLINE |
# | google_storage | without_client_encryption | ARCHIVE |

@azure
Examples: Azure Blob Storage
| storage | client encryption | storage class |
| azure_blobs | without_client_encryption | HOT |
| azure_blobs | without_client_encryption | COOL |
| azure_blobs | without_client_encryption | COLD |
22 changes: 22 additions & 0 deletions tests/integration/features/steps/integration_steps.py
@@ -390,6 +390,11 @@ def _i_run_a_dse_command(context, command):
])


@given(r'I will use "{storage_class}" as storage class in the storage')
def i_will_use_storage_class(context, storage_class):
context.storage_class = storage_class


@given(r'I am using "{storage_provider}" as storage provider in ccm cluster "{client_encryption}"')
def i_am_using_storage_provider(context, storage_provider, client_encryption):
context.storage_provider = storage_provider
@@ -531,6 +536,9 @@ def get_args(context, storage_provider, client_encryption, cassandra_url, use_mg
stop_cmd = f'resources/dse/stop-dse.sh {context.dse_version}'

storage_args = {"prefix": storage_prefix}
if hasattr(context, "storage_class"):
storage_args.update({"storage_class": context.storage_class})

cassandra_args = {
"is_ccm": str(is_ccm),
"stop_cmd": stop_cmd,
@@ -1268,6 +1276,15 @@ def _the_backup_index_exists(context):
)
def _i_can_see_nb_sstables_in_the_sstable_pool(
context, nb_sstables, table_name, keyspace
):
_i_can_see_nb_sstables_with_storage_class_in_the_sstable_pool(context, nb_sstables, None, table_name, keyspace)


# Then I can see 2 SSTables with "<storage class>" in the SSTable pool for the "test" table in keyspace "medusa"
@then(r'I can see {nb_sstables} SSTables with "{storage_class}" in the SSTable pool '
r'for the "{table_name}" table in keyspace "{keyspace}"')
def _i_can_see_nb_sstables_with_storage_class_in_the_sstable_pool(
context, nb_sstables, storage_class, table_name, keyspace
):
with Storage(config=context.medusa_config.storage) as storage:
path = os.path.join(
@@ -1280,6 +1297,11 @@ def _i_can_see_nb_sstables_in_the_sstable_pool(
logging.error("Was expecting {} SSTables".format(nb_sstables))
assert len(sstables) == int(nb_sstables)

if storage_class is not None:
for sstable in sstables:
logging.info(f'{storage_class.upper()} vs {sstable.storage_class.upper()}')
assert storage_class.upper() == sstable.storage_class.upper()


@then(
r'backup named "{backup_name}" has {nb_files} files '
1 change: 1 addition & 0 deletions tests/resources/config/medusa-azure_blobs.ini
@@ -8,6 +8,7 @@ host_file_separator = ","
bucket_name = medusa-integration-tests
key_file = ~/medusa_azure_credentials.json
storage_provider = azure_blobs
storage_class = COOL
fqdn = 127.0.0.1
base_path = /tmp
prefix = storage_prefix
1 change: 1 addition & 0 deletions tests/resources/config/medusa-google_storage.ini
@@ -9,6 +9,7 @@ host_file_separator = ","
bucket_name = medusa-it
key_file = ~/medusa_credentials.json
storage_provider = google_storage
storage_class = STANDARD
fqdn = 127.0.0.1
base_path = /tmp
prefix = storage_prefix
1 change: 1 addition & 0 deletions tests/resources/config/medusa-s3_us_west_oregon.ini
@@ -8,6 +8,7 @@ host_file_separator = ","
bucket_name = medusa-it-gha
key_file = ~/.aws/medusa_credentials
storage_provider = s3_us_west_oregon
storage_class = STANDARD
fqdn = 127.0.0.1
prefix = storage_prefix
multi_part_upload_threshold = 1024