diff --git a/ocs_ci/ocs/amq.py b/ocs_ci/ocs/amq.py index 2d44f04f9c7..70d75eb3346 100644 --- a/ocs_ci/ocs/amq.py +++ b/ocs_ci/ocs/amq.py @@ -167,9 +167,10 @@ def setup_amq_cluster_operator(self, namespace=constants.AMQ_NAMESPACE): log.warn( "Some amq leftovers are present, please cleanup the cluster" ) - pytest.skip( - "AMQ leftovers are present needs to cleanup the cluster" - ) + # TODO: Check why this skip was here in the first place + # pytest.skip( + # "AMQ leftovers are present needs to cleanup the cluster" + # ) time.sleep(30) # Check strimzi-cluster-operator pod created if self.is_amq_pod_running(pod_pattern="cluster-operator", expected_pods=1): diff --git a/ocs_ci/ocs/constants.py b/ocs_ci/ocs/constants.py index 02ba43e07b1..9d05cd5c2bd 100644 --- a/ocs_ci/ocs/constants.py +++ b/ocs_ci/ocs/constants.py @@ -417,6 +417,7 @@ RGW_KAFKA_NOTIFY = "https://github.com/shonpaz123/notify/" OCS_WORKLOADS = "https://github.com/red-hat-storage/ocs-workloads" CODESPEED_URL = "http://10.0.78.167:8000/" +KAFKA_PODS_LABEL = "strimzi.io/pool-name=kafka" # ODF monitoring tool ODF_MONITORING_TOOL_REPO = "https://github.com/red-hat-storage/odf-monitoring-tools.git" @@ -439,6 +440,7 @@ DEFAULT_NOOBAA_BACKINGSTORE = "noobaa-default-backing-store" DEFAULT_NOOBAA_BUCKETCLASS = "noobaa-default-bucket-class" DEFAULT_MCG_BUCKET_LOGS_PVC = "noobaa-bucket-logging-pvc" +DEFAULT_MCG_BUCKET_NOTIFS_PVC = "noobaa-bucket-notifications-pvc" CUSTOM_MCG_LABEL = "custom=mcg-label" NOOBAA_RESOURCE_NAME = "noobaa" NOOBAA_DB_PVC_NAME = "db-noobaa-db-pg-0" @@ -620,7 +622,9 @@ NOOBAA_OPERATOR_DEPLOYMENT = "noobaa-operator" NOOBAA_ENDPOINT_DEPLOYMENT = "noobaa-endpoint" NOOBAA_DB_STATEFULSET = "noobaa-db-pg" +NOOBAA_DB_POD = "noobaa-db-pg-0" NOOBAA_CORE_STATEFULSET = "noobaa-core" +NOOBAA_CORE_POD = "noobaa-core-0" # Noobaa db secret NOOBAA_DB_SECRET = "noobaa-db" diff --git a/ocs_ci/ocs/resources/bucket_notifications_manager.py b/ocs_ci/ocs/resources/bucket_notifications_manager.py new file mode 100644 index 00000000000..80075b2a7a2 --- /dev/null +++ b/ocs_ci/ocs/resources/bucket_notifications_manager.py @@ -0,0 +1,356 @@ +import json +import logging +import os +import tempfile +import time + +from ocs_ci.framework import config +from ocs_ci.helpers.helpers import ( + craft_s3_command, + create_unique_resource_name, + default_storage_class, +) +from ocs_ci.ocs import constants +from ocs_ci.ocs.cluster import CephCluster +from ocs_ci.ocs.ocp import OCP +from ocs_ci.ocs.amq import AMQ +from ocs_ci.ocs.exceptions import CommandFailed +from ocs_ci.ocs.resources.pod import ( + Pod, + get_noobaa_endpoint_pods, + get_pods_having_label, + wait_for_pods_to_be_running, +) + +logger = logging.getLogger(__name__) + +NOTIFS_YAML_PATH_NB_CR = "/spec/bucketNotifications" + + +class BucketNotificationsManager: + """ + A class to manage the MCG bucket notifications feature + """ + + @property + def nb_config_resource(self): + """ + Return the NooBaa configuration resource + Note that this might change in the future. + + Returns: + ocs_ci.ocs.ocp.OCP: OCP instance of the NooBaa configuration resource + """ + return OCP( + kind="noobaa", + namespace=config.ENV_DATA["cluster_namespace"], + resource_name="noobaa", + ) + + def __init__(self): + self.amq = AMQ() + self.kafka_topics = [] + self.conn_secrets = [] + self.cur_logs_pvc = constants.DEFAULT_MCG_BUCKET_NOTIFS_PVC + + def setup_kafka(self): + """ + Deploy an AMQ cluster and set up Kafka + """ + # Get sc + sc = default_storage_class(interface_type=constants.CEPHBLOCKPOOL) + + # Deploy amq cluster + self.amq.setup_amq_cluster(sc.name) + + def enable_bucket_notifs_on_cr(self, notifs_pvc=None): + """ + Set the bucket notifications feature on the NooBaa CR + + Args: + notifs_pvc(str|optional): Name of a provided PVC for MCG to use for + intermediate logging of the events. + Note: + If not provided, a PVC will be automatically be created + by MCG when first enabling the feature. + """ + logger.info("Enabling bucket notifications on the NooBaa CR") + + # Build a patch command to enable guaranteed bucket logs + bucket_notifs_dict = {"connections": [], "enabled": True} + + # Add the bucketLoggingPVC field if provided + if notifs_pvc: + bucket_notifs_dict["pvc"] = notifs_pvc + + patch_params = [ + { + "op": "add", + "path": NOTIFS_YAML_PATH_NB_CR, + "value": bucket_notifs_dict, + } + ] + + # Try patching via add, and if it fails - replace instead + try: + self.nb_config_resource.patch( + params=json.dumps(patch_params), + format_type="json", + ) + except CommandFailed as e: + if "already exists" in str(e).lower(): + patch_params[0]["op"] = "replace" + self.nb_config_resource.patch( + params=json.dumps(patch_params), + format_type="json", + ) + else: + logger.error(f"Failed to enable bucket notifications: {e}") + raise e + + self.cur_logs_pvc = ( + notifs_pvc if notifs_pvc else constants.DEFAULT_MCG_BUCKET_NOTIFS_PVC + ) + + wait_for_pods_to_be_running( + pod_names=[constants.NOOBAA_CORE_POD], + timeout=60, + sleep=10, + ) + + logger.info("Guaranteed bucket logs have been enabled") + + def disable_bucket_logging_on_cr(self): + """ + Unset the bucket notifications feature on the NooBaa CR + """ + logger.info("Disabling bucket notifications on the NooBaa CR") + + try: + patch_params = [ + { + "op": "replace", + "path": NOTIFS_YAML_PATH_NB_CR, + "value": None, + }, + ] + self.nb_config_resource.patch( + params=json.dumps(patch_params), + format_type="json", + ) + + except CommandFailed as e: + if "not found" in str(e): + logger.info("The bucketNotifications field was not found") + else: + logger.error(f"Failed to disable bucket notifications: {e}") + raise e + + wait_for_pods_to_be_running( + pod_names=[constants.NOOBAA_CORE_POD], + timeout=60, + sleep=10, + ) + + logger.info("Bucket notifications have been disabled") + + def create_kafka_topic(self, topic_name=""): + """ + Create a Kafka topic + + Args: + name(str|optional): Name of the Kafka topic + + Returns: + str: Name of the created Kafka topic + """ + topic_name = topic_name or create_unique_resource_name( + resource_description="nb-notif", resource_type="kafka-topic" + ) + topic = self.amq.create_kafka_topic(topic_name) + self.kafka_topics.append(topic) + return topic_name + + def create_kafka_conn_secret(self, topic): + """ + Create secret from a JSON file that + defines the Kafka connection configuration + + Args: + topic(str): Name of the Kafka topic + + Returns: + secret_ocp_obj: OCP instance of the created secret + conn_file_name: Name of the JSON file + """ + namespace = config.ENV_DATA["cluster_namespace"] + conn_name = create_unique_resource_name( + resource_description="nb-notif", resource_type="kafka-conn" + ) + secret_name = conn_name + "-secret" + conn_file_name = "" + + kafka_conn_config = { + "metadata.broker.list": "my-cluster-kafka-bootstrap.myproject.svc.cluster.local:9092", + "notification_protocol": "kafka", + "topic": topic, + "name": conn_name, + } + + with tempfile.NamedTemporaryFile( + mode="w+", prefix="kafka_conn_", suffix=".json", delete=True + ) as conn_file: + conn_file_name = os.path.basename(conn_file.name) + conn_file.write(json.dumps(kafka_conn_config)) + conn_file.flush() # Ensure that the data is written + + OCP().exec_oc_cmd( + f"create secret generic {secret_name} --from-file={conn_file.name} -n {namespace}" + ) + + secret_ocp_obj = OCP( + kind="secret", + namespace=namespace, + resource_name=secret_name, + ) + self.conn_secrets.append(secret_ocp_obj) + + return secret_ocp_obj, conn_file_name + + def add_notif_conn_to_noobaa_cr(self, secret): + """ + Add a connection secret to list of bucket notifications + connections in the NooBaa CR. + + Args: + secret(OCP): OCP instance of the secret to add + """ + nb_ocp_obj = OCP( + kind="noobaa", + namespace=config.ENV_DATA["cluster_namespace"], + resource_name="noobaa", + ) + conn_data = { + "name": secret.resource_name, + "namespace": secret.namespace, + } + patch_path = "/spec/bucketNotifications/connections" + add_op = [{"op": "add", "path": f"{patch_path}/-", "value": conn_data}] + nb_ocp_obj.patch( + resource_name=constants.NOOBAA_RESOURCE_NAME, + params=json.dumps(add_op), + format_type="json", + ) + + # Wait for noobaa to process the change + nb_pods = [pod.name for pod in get_noobaa_endpoint_pods()] + nb_pods += [constants.NOOBAA_CORE_POD] + wait_for_pods_to_be_running( + namespace=config.ENV_DATA["cluster_namespace"], + pod_names=nb_pods, + timeout=60, + sleep=10, + ) + CephCluster().wait_for_noobaa_health_ok() + + def put_bucket_notification(self, awscli_pod, mcg_obj, bucket, events, conn_file): + """ + Configure bucket notifications on a bucket using the AWS CLI + + Args: + awscli_pod(Pod): Pod instance of the AWS CLI pod + mcg_obj(MCG): MCG object + bucket(str): Name of the bucket + events(list): List of events to trigger notifications + conn_file(str): Name of the file that NooBaa uses to connect to Kafka + """ + rand_id = create_unique_resource_name( + resource_description="notif", resource_type="id" + ) + + notif_config = { + "TopicConfiguration": { + "Id": rand_id, + "Events": events, + "Topic": conn_file, # See https://issues.redhat.com/browse/DFBUGS-947 + } + } + # Serialize JSON with escaped quotes + notif_config_json = json.dumps(notif_config).replace('"', '\\"') + awscli_pod.exec_cmd_on_pod( + command=craft_s3_command( + f"put-bucket-notification --bucket {bucket} --notification-configuration '{notif_config_json}'", + mcg_obj=mcg_obj, + api=True, + ) + ) + logger.info("Waiting for put-bucket-notification to propogate") + time.sleep(60) + + def get_bucket_notification(self, awscli_pod, mcg_obj, bucket): + """ + Get the bucket notification configuration of a bucket + + Args: + awscli_pod(Pod): Pod instance of the AWS CLI pod + mcg_obj(MCG): MCG object + bucket(str): Name of the bucket + + Returns: + dict: Bucket notification configuration + """ + return awscli_pod.exec_cmd_on_pod( + command=craft_s3_command( + f"get-bucket-notification --bucket {bucket}", + mcg_obj=mcg_obj, + api=True, + ) + ) + + def get_events(self, topic, timeout_in_ms=5000): + """ + Query a Kafka topic for events + + Args: + topic(str): Name of the Kafka topic + timeout_in_ms(int): How long to wait for events + + Returns: + list: List of event dictionaries + """ + # Query the Kafka topic via the Kafka consumer script on any of the Kafka pods + kafka_pod = Pod( + **get_pods_having_label( + namespace=constants.AMQ_NAMESPACE, label=constants.KAFKA_PODS_LABEL + )[0] + ) + cmd = ( + f"bin/kafka-console-consumer.sh --bootstrap-server {constants.KAFKA_ENDPOINT} " + f"--topic {topic} --from-beginning --timeout-ms {timeout_in_ms}" + ) + raw_resp = kafka_pod.exec_cmd_on_pod(command=cmd, out_yaml_format=False) + + # Parse the raw response into a list of event dictionaries + events = [] + for line in raw_resp.split("\n"): + if line: + # Every event is nested in a single-element list + event_dict = json.loads(line)["Records"][0] + events.append(event_dict) + + return events + + def cleanup(self): + """ + Clean up the resources created by the BucketNotificationsManager + 1. Disable bucket notifications on the NooBaa CR + 2. Delete connection secrets + 3. Delete Kafka topics + 4. Clean up the AMQ cluster + """ + self.disable_bucket_logging_on_cr() + for secret in self.conn_secrets: + secret.delete(resource_name=secret.resource_name) + for topic in self.kafka_topics: + topic.delete() + self.amq.cleanup() diff --git a/tests/functional/object/mcg/test_bucket_notifications.py b/tests/functional/object/mcg/test_bucket_notifications.py new file mode 100644 index 00000000000..4b5dc117bfa --- /dev/null +++ b/tests/functional/object/mcg/test_bucket_notifications.py @@ -0,0 +1,139 @@ +import pytest +import logging + + +from ocs_ci.framework import config +from ocs_ci.framework.testlib import ( + MCGTest, + mcg, + polarion_id, + bugzilla, + red_squad, + skipif_mcg_only, + tier1, +) +from ocs_ci.ocs import constants +from ocs_ci.ocs.bucket_utils import write_random_test_objects_to_bucket +from ocs_ci.ocs.exceptions import TimeoutExpiredError +from ocs_ci.ocs.ocp import OCP +from ocs_ci.ocs.resources.bucket_notifications_manager import BucketNotificationsManager +from ocs_ci.utility.utils import TimeoutSampler + +logger = logging.getLogger(__name__) + + +@mcg +@red_squad +class TestBucketNotifications(MCGTest): + """ + Test the MCG bucket notifications feature + """ + + @pytest.fixture(autouse=True, scope="class") + def notif_manager(self, request): + """ + Set up Kafka and the BucketNotificationsManager + + Returns: + BucketNotificationsManager: An instance of the BucketNotificationsManager class + """ + notif_manager = BucketNotificationsManager() + request.addfinalizer(notif_manager.cleanup) + + notif_manager.setup_kafka() + return notif_manager + + @tier1 + @pytest.mark.parametrize( + argnames=["use_provided_notifs_pvc"], + argvalues=[ + pytest.param(False, marks=[polarion_id("OCS-6242"), bugzilla("2302842")]), + pytest.param( + True, + marks=[polarion_id("OCS-6243"), skipif_mcg_only], + ), + ], + ids=[ + "default-logs-pvc", + "provided-logs-pvc", + ], + ) + def test_bucket_notifications( + self, + mcg_obj, + awscli_pod, + bucket_factory, + test_directory_setup, + notif_manager, + pvc_factory, + use_provided_notifs_pvc, + ): + """ + Test the MCG bucket notifications feature + 1. Enable bucket notifications on the NooBaa CR + 2. Create a Kafka topic and add a Kafka notification connection to the NooBaa CR + 3. Create a bucket and configure bucket notificiations + on it using the new connection + 4. Verify that the bucket notification configuration was set correctly + 5. Write some objects to the bucket + 6. Verify that the expected events were received by Kafka + """ + # 1. Enable bucket notifications on the NooBaa CR + provided_notifs_pvc = None + if use_provided_notifs_pvc: + clstr_proj_obj = OCP(namespace=config.ENV_DATA["cluster_namespace"]) + provided_notifs_pvc = pvc_factory( + interface=constants.CEPHFILESYSTEM, + project=clstr_proj_obj, + size=20, + access_mode=constants.ACCESS_MODE_RWX, + ).name + notif_manager.enable_bucket_notifs_on_cr(notifs_pvc=provided_notifs_pvc) + else: + notif_manager.enable_bucket_notifs_on_cr() + + # 2. Create a Kafka topic connection to the NooBaa CR + topic = notif_manager.create_kafka_topic() + secret, conn_file_name = notif_manager.create_kafka_conn_secret(topic) + notif_manager.add_notif_conn_to_noobaa_cr(secret) + + # 3. Create a bucket and configure bucket notifs on it using the new connection + bucket = bucket_factory()[0].name + notif_manager.put_bucket_notification( + awscli_pod=awscli_pod, + mcg_obj=mcg_obj, + bucket=bucket, + events=["s3:ObjectCreated:*"], + conn_file=conn_file_name, + ) + + # # 4. Verify the bucket notification configuration was set correctly + resp = notif_manager.get_bucket_notification(awscli_pod, mcg_obj, bucket) + # See https://issues.redhat.com/browse/DFBUGS-947 + assert resp["TopicConfiguration"]["Topic"] == conn_file_name + + # 5. Write some objects to the bucket + obj_keys = write_random_test_objects_to_bucket( + io_pod=awscli_pod, + bucket_to_write=bucket, + file_dir=test_directory_setup.origin_dir, + amount=20, + mcg_obj=mcg_obj, + ) + obj_keys_set = set(obj_keys) + + # 6. Verify that the expected events were received by Kafka + try: + for events in TimeoutSampler( + timeout=120, + sleep=5, + func=notif_manager.get_events, + topic=topic, + ): + keys_in_notifs = set(event["s3"]["object"]["key"] for event in events) + if obj_keys_set.issubset(keys_in_notifs): + logger.info("All expected events were received by Kafka") + break + except TimeoutExpiredError: + logger.error("Not all expected events were received by Kafka") + raise