diff --git a/reana_workflow_controller/consumer.py b/reana_workflow_controller/consumer.py
index 7f59b6ba..a4f3ccb7 100644
--- a/reana_workflow_controller/consumer.py
+++ b/reana_workflow_controller/consumer.py
@@ -22,8 +22,6 @@
 from reana_commons.k8s.api_client import (
     current_k8s_batchv1_api_client,
     current_k8s_corev1_api_client,
-    current_k8s_custom_objects_api_client,
-    current_k8s_networking_api_client,
 )
 from reana_commons.k8s.secrets import UserSecretsStore
 from reana_commons.utils import (
@@ -45,10 +43,8 @@
     REANA_JOB_STATUS_CONSUMER_PREFETCH_COUNT,
 )
 from reana_workflow_controller.errors import REANAWorkflowControllerError
-from reana_workflow_controller.k8s import delete_dask_dashboard_ingress
 
-from reana_workflow_controller.config import DASK_AUTOSCALER_ENABLED
-from reana_workflow_controller.dask import requires_dask
+from reana_workflow_controller.dask import requires_dask, delete_dask_cluster
 
 try:
     from urllib import parse as urlparse
@@ -168,7 +164,7 @@ def _update_workflow_status(workflow, status, logs):
                 workflow.logs += "Workflow engine logs could not be retrieved.\n"
 
             if requires_dask(workflow):
-                _delete_dask_cluster(workflow)
+                delete_dask_cluster(workflow.id_)
 
             if RunStatus.should_cleanup_job(status):
                 try:
@@ -314,36 +310,3 @@ def _get_workflow_engine_pod_logs(workflow: Workflow) -> str:
     # There might not be any pod returned by `list_namespaced_pod`, for example
     # when a workflow fails to be scheduled
     return ""
-
-
-def _delete_dask_cluster(workflow: Workflow) -> None:
-    """Delete the Dask cluster resources."""
-    current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
-        group="kubernetes.dask.org",
-        version="v1",
-        plural="daskclusters",
-        namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
-        name=f"reana-run-dask-{workflow.id_}",
-    )
-
-    if DASK_AUTOSCALER_ENABLED:
-        current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
-            group="kubernetes.dask.org",
-            version="v1",
-            plural="daskautoscalers",
-            namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
-            name=f"dask-autoscaler-reana-run-dask-{workflow.id_}",
-        )
-
-    delete_dask_dashboard_ingress(
-        f"dask-dashboard-ingress-reana-run-dask-{workflow.id_}", workflow.id_
-    )
-
-    dask_service = (
-        Session.query(Service)
-        .filter_by(name=f"dask-dashboard-{workflow.id_}")
-        .one_or_none()
-    )
-    workflow.services.remove(dask_service)
-    Session.delete(dask_service)
-    Session.object_session(workflow).commit()
diff --git a/reana_workflow_controller/dask.py b/reana_workflow_controller/dask.py
index 0d393707..b0996784 100644
--- a/reana_workflow_controller/dask.py
+++ b/reana_workflow_controller/dask.py
@@ -11,6 +11,12 @@
 
 from flask import current_app
 
+from kubernetes import client
+from kubernetes.client.exceptions import ApiException
+
+from reana_db.database import Session
+from reana_db.models import Service
+from reana_db.utils import _get_workflow_with_uuid_or_name
 from reana_commons.config import (
     K8S_CERN_EOS_AVAILABLE,
     K8S_CERN_EOS_MOUNT_CONFIGURATION,
@@ -20,6 +26,7 @@
     REANA_RUNTIME_KUBERNETES_NAMESPACE,
 )
 from reana_commons.k8s.api_client import (
+    current_k8s_networking_api_client,
     current_k8s_custom_objects_api_client,
 )
 from reana_commons.k8s.kerberos import get_kerberos_k8s_config
@@ -28,10 +35,13 @@
     get_workspace_volume,
     get_reana_shared_volume,
 )
-from reana_commons.job_utils import kubernetes_memory_to_bytes
 
-from reana_workflow_controller.config import DASK_AUTOSCALER_ENABLED
-from reana_workflow_controller.k8s import create_dask_dashboard_ingress
+from reana_workflow_controller.config import (
+    DASK_AUTOSCALER_ENABLED,
+    REANA_INGRESS_HOST,
+    REANA_INGRESS_CLASS_NAME,
+    REANA_INGRESS_ANNOTATIONS,
+)
 
 
 class DaskResourceManager:
@@ -108,14 +118,21 @@ def _load_dask_autoscaler_template(self):
 
     def create_dask_resources(self):
         """Create necessary Dask resources for the workflow."""
-        self._prepare_cluster()
-        self._create_dask_cluster()
+        try:
+            self._prepare_cluster()
+            self._create_dask_cluster()
 
-        if DASK_AUTOSCALER_ENABLED:
-            self._prepare_autoscaler()
-            self._create_dask_autoscaler()
+            if DASK_AUTOSCALER_ENABLED:
+                self._prepare_autoscaler()
+                self._create_dask_autoscaler()
+
+            create_dask_dashboard_ingress(self.cluster_name, self.workflow_id)
 
-        create_dask_dashboard_ingress(self.cluster_name, self.workflow_id)
+        except Exception as e:
+            logging.error(
+                f"An error occured while trying to create dask cluster, now deleting the cluster... Error message:\n{e}"
+            )
+            delete_dask_cluster(self.workflow_id)
 
     def _prepare_cluster(self):
         """Prepare Dask cluster body by adding necessary image-pull secrets, volumes, volume mounts, init containers and sidecar containers."""
@@ -488,11 +505,11 @@ def _create_dask_cluster(self):
                 namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
                 body=self.cluster_body,
             )
-        except Exception:
+        except Exception as e:
             logging.exception(
                 "An error occurred while trying to create a Dask cluster."
             )
-            raise
+            raise e
 
     def _create_dask_autoscaler(self):
         """Create Dask autoscaler resource."""
@@ -516,3 +533,167 @@ def requires_dask(workflow):
     return bool(
         workflow.reana_specification["workflow"].get("resources", {}).get("dask", False)
     )
+
+
+def delete_dask_cluster(workflow_id) -> None:
+    """Delete the Dask cluster resources."""
+    errors = []  # Collect errors during deletion attempts
+
+    try:
+        current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
+            group="kubernetes.dask.org",
+            version="v1",
+            plural="daskclusters",
+            namespace="default",
+            name=f"reana-run-dask-{workflow_id}",
+        )
+        logging.info(f"Dask cluster for workflow {workflow_id} deleted successfully.")
+    except Exception as e:
+        errors.append(f"Error deleting Dask cluster for workflow {workflow_id}: {e}")
+
+    if DASK_AUTOSCALER_ENABLED:
+        try:
+            current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
+                group="kubernetes.dask.org",
+                version="v1",
+                plural="daskautoscalers",
+                namespace="default",
+                name=f"dask-autoscaler-reana-run-dask-{workflow_id}",
+            )
+            logging.info(
+                f"Dask autoscaler for workflow {workflow_id} deleted successfully."
+            )
+        except Exception as e:
+            errors.append(
+                f"Error deleting Dask autoscaler for workflow {workflow_id}: {e}"
+            )
+
+    try:
+        delete_dask_dashboard_ingress(
+            f"dask-dashboard-ingress-reana-run-dask-{workflow_id}", workflow_id
+        )
+        logging.info(
+            f"Dask dashboard ingress for workflow {workflow_id} deleted successfully."
+        )
+    except Exception as e:
+        errors.append(
+            f"Error deleting Dask dashboard ingress for workflow {workflow_id}: {e}"
+        )
+
+    try:
+        dask_service = (
+            Session.query(Service)
+            .filter_by(name=f"dask-dashboard-{workflow_id}")
+            .one_or_none()
+        )
+        if dask_service:
+            workflow = _get_workflow_with_uuid_or_name(workflow_id)
+            workflow.services.remove(dask_service)
+            Session.delete(dask_service)
+            Session.object_session(workflow).commit()
+
+    except Exception as e:
+        errors.append(
+            f"Error deleting Dask Service model from database of the workflow: {workflow_id}: {e}"
+        )
+
+    # Raise collected errors if any
+    if errors:
+        logging.error("Errors occurred during resource deletion:\n" + "\n".join(errors))
+        raise RuntimeError(
+            "Errors occurred during resource deletion:\n" + "\n".join(errors)
+        )
+
+
+def create_dask_dashboard_ingress(cluster_name, workflow_id):
+    """Create K8S Ingress object for Dask dashboard."""
+    # Define the middleware spec
+    middleware_spec = {
+        "apiVersion": "traefik.io/v1alpha1",
+        "kind": "Middleware",
+        "metadata": {"name": f"replacepath-{workflow_id}", "namespace": "default"},
+        "spec": {
+            "replacePathRegex": {
+                "regex": f"/{workflow_id}/dashboard/*",
+                "replacement": "/$1",
+            }
+        },
+    }
+
+    ingress = client.V1Ingress(
+        api_version="networking.k8s.io/v1",
+        kind="Ingress",
+        metadata=client.V1ObjectMeta(
+            name=f"dask-dashboard-ingress-{cluster_name}",
+            annotations={
+                **REANA_INGRESS_ANNOTATIONS,
+                "traefik.ingress.kubernetes.io/router.middlewares": f"default-replacepath-{workflow_id}@kubernetescrd",
+            },
+        ),
+        spec=client.V1IngressSpec(
+            rules=[
+                client.V1IngressRule(
+                    host=REANA_INGRESS_HOST,
+                    http=client.V1HTTPIngressRuleValue(
+                        paths=[
+                            client.V1HTTPIngressPath(
+                                path=f"/{workflow_id}/dashboard",
+                                path_type="Prefix",
+                                backend=client.V1IngressBackend(
+                                    service=client.V1IngressServiceBackend(
+                                        name=f"{cluster_name}-scheduler",
+                                        port=client.V1ServiceBackendPort(number=8787),
+                                    )
+                                ),
+                            )
+                        ]
+                    ),
+                )
+            ]
+        ),
+    )
+    if REANA_INGRESS_CLASS_NAME:
+        ingress.spec.ingress_class_name = REANA_INGRESS_CLASS_NAME
+
+    # Create middleware for ingress
+    current_k8s_custom_objects_api_client.create_namespaced_custom_object(
+        group="traefik.io",
+        version="v1alpha1",
+        namespace="default",
+        plural="middlewares",
+        body=middleware_spec,
+    )
+    # Create the ingress resource
+    current_k8s_networking_api_client.create_namespaced_ingress(
+        namespace="default", body=ingress
+    )
+
+
+def delete_dask_dashboard_ingress(cluster_name, workflow_id):
+    """Delete K8S Ingress Object for Dask dashboard."""
+    errors = []  # Collect errors during deletion attempts
+    try:
+        current_k8s_networking_api_client.delete_namespaced_ingress(
+            name=cluster_name, namespace="default", body=client.V1DeleteOptions()
+        )
+    except Exception as e:
+        errors.append(
+            f"Error deleting Dask dashboard ingress for workflow {workflow_id}: {e}"
+        )
+
+    try:
+        current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
+            group="traefik.io",
+            version="v1alpha1",
+            namespace="default",
+            plural="middlewares",
+            name=f"replacepath-{workflow_id}",
+        )
+    except Exception as e:
+        errors.append(
+            f"Error deleting Dask dashboard ingress middleware for workflow {workflow_id}: {e}"
+        )
+
+    # Raise collected errors if any
+    if errors:
+        raise RuntimeError("\n".join(errors))
diff --git a/reana_workflow_controller/k8s.py b/reana_workflow_controller/k8s.py
index aab726ff..e9d84078 100644
--- a/reana_workflow_controller/k8s.py
+++ b/reana_workflow_controller/k8s.py
@@ -17,7 +17,6 @@
     current_k8s_appsv1_api_client,
     current_k8s_corev1_api_client,
     current_k8s_networking_api_client,
-    current_k8s_custom_objects_api_client,
 )
 from reana_commons.k8s.secrets import UserSecretsStore
 from reana_commons.k8s.volumes import (
@@ -404,89 +403,6 @@ def delete_k8s_ingress_object(ingress_name, namespace):
         )
 
 
-def create_dask_dashboard_ingress(cluster_name, workflow_id):
-    """Create K8S Ingress object for Dask dashboard."""
-    # Define the middleware spec
-    middleware_spec = {
-        "apiVersion": "traefik.io/v1alpha1",
-        "kind": "Middleware",
-        "metadata": {
-            "name": f"replacepath-{workflow_id}",
-            "namespace": REANA_RUNTIME_KUBERNETES_NAMESPACE,
-        },
-        "spec": {
-            "replacePathRegex": {
-                "regex": f"/{workflow_id}/dashboard/*",
-                "replacement": "/$1",
-            }
-        },
-    }
-
-    ingress = client.V1Ingress(
-        api_version="networking.k8s.io/v1",
-        kind="Ingress",
-        metadata=client.V1ObjectMeta(
-            name=f"dask-dashboard-ingress-{cluster_name}",
-            annotations={
-                **REANA_INGRESS_ANNOTATIONS,
-                "traefik.ingress.kubernetes.io/router.middlewares": f"{REANA_RUNTIME_KUBERNETES_NAMESPACE}-replacepath-{workflow_id}@kubernetescrd",
-            },
-        ),
-        spec=client.V1IngressSpec(
-            rules=[
-                client.V1IngressRule(
-                    host=REANA_INGRESS_HOST,
-                    http=client.V1HTTPIngressRuleValue(
-                        paths=[
-                            client.V1HTTPIngressPath(
-                                path=f"/{workflow_id}/dashboard",
-                                path_type="Prefix",
-                                backend=client.V1IngressBackend(
-                                    service=client.V1IngressServiceBackend(
-                                        name=f"{cluster_name}-scheduler",
-                                        port=client.V1ServiceBackendPort(number=8787),
-                                    )
-                                ),
-                            )
-                        ]
-                    ),
-                )
-            ]
-        ),
-    )
-    if REANA_INGRESS_CLASS_NAME:
-        ingress.spec.ingress_class_name = REANA_INGRESS_CLASS_NAME
-
-    # Create middleware for ingress
-    current_k8s_custom_objects_api_client.create_namespaced_custom_object(
-        group="traefik.io",
-        version="v1alpha1",
-        namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
-        plural="middlewares",
-        body=middleware_spec,
-    )
-    # Create the ingress resource
-    current_k8s_networking_api_client.create_namespaced_ingress(
-        namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, body=ingress
-    )
-
-
-def delete_dask_dashboard_ingress(cluster_name, workflow_id):
-    """Delete K8S Ingress Object for Dask dashboard."""
-    current_k8s_networking_api_client.delete_namespaced_ingress(
-        name=cluster_name,
-        namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
-        body=client.V1DeleteOptions(),
-    )
-    current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
-        group="traefik.io",
-        version="v1alpha1",
-        namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
-        plural="middlewares",
-        name=f"replacepath-{workflow_id}",
-    )
-
-
 def check_pod_status_by_prefix(
     pod_name_prefix, namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE
 ):