Configure Ceph pools in Rook #66

Merged · 8 commits · Sep 3, 2024
33 changes: 25 additions & 8 deletions src/rookify/modules/ceph.py
@@ -19,12 +19,8 @@ def __init__(self, config: Dict[str, Any]):
    def __getattr__(self, name: str) -> Any:
        return getattr(self.__ceph, name)

    def mon_command(self, command: str, **kwargs: str) -> Dict[str, Any] | List[Any]:
        cmd = {"prefix": command, "format": "json"}
        cmd.update(**kwargs)

        result = self.__ceph.mon_command(json.dumps(cmd), b"")

    def _json_command(self, handler: Any, *args: Any) -> Dict[str, Any] | List[Any]:
        result = handler(*args)
        if result[0] != 0:
            raise ModuleException(f"Ceph did return an error: {result}")

@@ -36,14 +32,35 @@ def mon_command(self, command: str, **kwargs: str) -> Dict[str, Any] | List[Any]

        return data

    def get_osd_pool_configurations_from_osd_dump(
        self, dump_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        osd_pools = {osd_pool["pool_name"]: osd_pool for osd_pool in dump_data["pools"]}

        erasure_code_profiles = dump_data["erasure_code_profiles"]

        for osd_pool_name in osd_pools:
            osd_pool = osd_pools[osd_pool_name]

            osd_pool["erasure_code_configuration"] = erasure_code_profiles.get(
                osd_pool["erasure_code_profile"], erasure_code_profiles["default"]
            )

        return osd_pools

    def mon_command(self, command: str, **kwargs: str) -> Dict[str, Any] | List[Any]:
        cmd = {"prefix": command, "format": "json"}
        cmd.update(**kwargs)
        return self._json_command(self.__ceph.mon_command, json.dumps(cmd), b"")  # type: ignore
        return self._json_command(self.__ceph.mon_command, json.dumps(cmd), b"")

    def mgr_command(self, command: str, **kwargs: str) -> Dict[str, Any] | List[Any]:
        cmd = {"prefix": command, "format": "json"}
        cmd.update(**kwargs)
        return self._json_command(self.__ceph.mgr_command, json.dumps(cmd), b"")

    def osd_command(
        self, osd_id: int, command: str, **kwargs: str
    ) -> Dict[str, Any] | List[Any]:
        cmd = {"prefix": command, "format": "json"}
        cmd.update(**kwargs)
        return self._json_command(self.__ceph.osd_command, osd_id, json.dumps(cmd), b"")  # type: ignore
        return self._json_command(self.__ceph.osd_command, osd_id, json.dumps(cmd), b"")
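
For orientation, here is a minimal usage sketch of the refactored wrapper, assuming an already-constructed instance of the Ceph class above: mon_command("osd dump") fetches the raw dump as JSON, and the new get_osd_pool_configurations_from_osd_dump() merges each pool with its erasure-code profile. The helper function below and its return shape are illustrative only, not part of this diff.

# Illustrative sketch, not part of this PR; assumes `ceph` is an instance of the
# Ceph wrapper shown above.
from typing import Any, Dict


def list_pool_sizes(ceph: Any) -> Dict[str, int]:
    # "ceph osd dump" in JSON format contains the "pools" and
    # "erasure_code_profiles" keys consumed by the new helper.
    dump_data = ceph.mon_command("osd dump")

    # Merge every pool with its erasure-code profile configuration.
    osd_pools = ceph.get_osd_pool_configurations_from_osd_dump(dump_data)

    # Map pool name -> replica size, the values the migration handlers read later.
    return {name: pool["size"] for name, pool in osd_pools.items()}
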
3 changes: 2 additions & 1 deletion src/rookify/modules/machine.py
@@ -78,7 +78,8 @@ def _get_state_tags_data(self, name: str) -> Dict[str, Any]:

        if len(state.tags) > 0:
            for tag in state.tags:
                data[tag] = getattr(state, tag)
                if hasattr(state, tag):
                    data[tag] = getattr(state, tag)

        return data
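
The added hasattr() guard means a registered tag that was never actually set on a state object is skipped instead of raising AttributeError. A tiny self-contained illustration; the fake state class below is made up for demonstration:

# Illustrative only: shows the effect of the hasattr() guard on unset tags.
class _FakeState:
    tags = ["pools", "migrated_pools"]
    pools = {"myfs": {}}  # set
    # "migrated_pools" intentionally never assigned


data = {}
state = _FakeState()
for tag in state.tags:
    if hasattr(state, tag):  # unset tags are skipped instead of raising AttributeError
        data[tag] = getattr(state, tag)

assert data == {"pools": {"myfs": {}}}
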

3 changes: 3 additions & 0 deletions src/rookify/modules/migrate_mds_pools/__init__.py
@@ -0,0 +1,3 @@
# -*- coding: utf-8 -*-

from .main import MigrateMdsPoolsHandler as ModuleHandler # noqa
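
The re-export under the ModuleHandler alias presumably lets the module loader pick up the new package by name alone; a hedged sketch of that lookup follows (the loader details are an assumption, not shown in this PR).

# Assumption for illustration: resolving the handler class from the package name.
import importlib

module = importlib.import_module("rookify.modules.migrate_mds_pools")
handler_class = module.ModuleHandler  # -> MigrateMdsPoolsHandler
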
156 changes: 156 additions & 0 deletions src/rookify/modules/migrate_mds_pools/main.py
@@ -0,0 +1,156 @@
# -*- coding: utf-8 -*-

from typing import Any, Dict
from ..machine import Machine
from ..module import ModuleHandler


class MigrateMdsPoolsHandler(ModuleHandler):
    REQUIRES = ["analyze_ceph", "migrate_mds"]

    def preflight(self) -> None:
        state_data = self.machine.get_preflight_state("AnalyzeCephHandler").data

        pools = getattr(
            self.machine.get_preflight_state("MigrateMdsPoolsHandler"), "pools", {}
        )

        osd_pools = self.ceph.get_osd_pool_configurations_from_osd_dump(
            state_data["osd"]["dump"]
        )

        for mds_fs_data in state_data["fs"]["ls"]:
            if not mds_fs_data["metadata_pool"].endswith("-metadata"):
                self.logger.warn(
                    "MDS filesystem '{0}' uses an incompatible Ceph pool metadata name '{1}' and can not be migrated to Rook automatically".format(
                        mds_fs_data["name"], mds_fs_data["metadata_pool"]
                    )
                )

                # Store pools for incompatible MDS filesystem as migrated ones
                migrated_pools = getattr(
                    self.machine.get_execution_state("MigrateMdsPoolsHandler"),
                    "migrated_pools",
                    [],
                )

                if mds_fs_data["metadata_pool"] not in migrated_pools:
                    migrated_pools.append(mds_fs_data["metadata_pool"])

                for pool_data_osd_name in mds_fs_data["data_pools"]:
                    if pool_data_osd_name not in migrated_pools:
                        migrated_pools.append(pool_data_osd_name)

                self.machine.get_execution_state(
                    "MigrateMdsPoolsHandler"
                ).migrated_pools = migrated_pools

                continue

            pool = {
                "name": mds_fs_data["name"],
                "metadata": mds_fs_data["metadata_pool"],
                "data": [pool for pool in mds_fs_data["data_pools"]],
                "osd_pool_configurations": {},
            }

            pool["osd_pool_configurations"][mds_fs_data["metadata_pool"]] = osd_pools[
                mds_fs_data["metadata_pool"]
            ]

            for mds_ods_pool_name in mds_fs_data["data_pools"]:
                pool["osd_pool_configurations"][mds_ods_pool_name] = osd_pools[
                    mds_ods_pool_name
                ]

            pools[mds_fs_data["name"]] = pool

        self.machine.get_preflight_state("MigrateMdsPoolsHandler").pools = pools

    def execute(self) -> None:
        pools = self.machine.get_preflight_state("MigrateMdsPoolsHandler").pools

        for pool in pools.values():
            self._migrate_pool(pool)

    def _migrate_pool(self, pool: Dict[str, Any]) -> None:
        migrated_mds_pools = getattr(
            self.machine.get_execution_state("MigrateMdsPoolsHandler"),
            "migrated_mds_pools",
            [],
        )

        if pool["name"] in migrated_mds_pools:
            return

        migrated_pools = getattr(
            self.machine.get_execution_state("MigrateMdsPoolsHandler"),
            "migrated_pools",
            [],
        )

        self.logger.debug("Migrating Ceph MDS pool '{0}'".format(pool["name"]))
        osd_pool_configurations = pool["osd_pool_configurations"]

        pool_metadata_osd_configuration = osd_pool_configurations[pool["metadata"]]

        filesystem_definition_values = {
            "cluster_namespace": self._config["rook"]["cluster"]["namespace"],
            "name": pool["name"],
            "mds_size": pool_metadata_osd_configuration["size"],
        }

        filesystem_definition_values["data_pools"] = []

        for pool_data_osd_name in pool["data"]:
            osd_configuration = osd_pool_configurations[pool_data_osd_name]

            definition_data_pool = {
                "name": osd_configuration["pool_name"],
                "size": osd_configuration["size"],
            }

            filesystem_definition_values["data_pools"].append(definition_data_pool)

        # Render cluster config from template
        pool_definition = self.load_template(
            "filesystem.yaml.j2", **filesystem_definition_values
        )

        self.k8s.crd_api_apply(pool_definition.yaml)

        if pool["name"] not in migrated_mds_pools:
            migrated_mds_pools.append(pool["name"])

        self.machine.get_execution_state(
            "MigrateMdsPoolsHandler"
        ).migrated_mds_pools = migrated_mds_pools

        if pool["metadata"] not in migrated_pools:
            migrated_pools.append(pool["metadata"])

        for pool_data_osd_name in pool["data"]:
            if pool_data_osd_name not in migrated_pools:
                migrated_pools.append(pool_data_osd_name)

        self.machine.get_execution_state(
            "MigrateMdsPoolsHandler"
        ).migrated_pools = migrated_pools

        self.logger.info("Migrated Ceph MDS pool '{0}'".format(pool["name"]))

    @staticmethod
    def register_execution_state(
        machine: Machine, state_name: str, handler: ModuleHandler, **kwargs: Any
    ) -> None:
        ModuleHandler.register_execution_state(
            machine, state_name, handler, tags=["migrated_pools", "migrated_mds_pools"]
        )

    @staticmethod
    def register_preflight_state(
        machine: Machine, state_name: str, handler: ModuleHandler, **kwargs: Any
    ) -> None:
        ModuleHandler.register_preflight_state(
            machine, state_name, handler, tags=["pools"]
        )
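
To make the data flow easier to follow, here is the approximate shape of one entry in the preflight "pools" state and the template values _migrate_pool() derives from it. The keys mirror the handler above; the concrete values are invented examples.

# Invented example of one preflight "pools" entry for a filesystem "myfs".
example_pool = {
    "name": "myfs",
    "metadata": "myfs-metadata",
    "data": ["myfs-data0"],
    "osd_pool_configurations": {
        # trimmed "osd dump" pool entries, merged with their erasure-code profile
        "myfs-metadata": {"pool_name": "myfs-metadata", "size": 3},
        "myfs-data0": {"pool_name": "myfs-data0", "size": 3},
    },
}

# Values _migrate_pool() would pass into filesystem.yaml.j2 for this entry.
filesystem_definition_values = {
    "cluster_namespace": "rook-ceph",  # example; taken from config rook.cluster.namespace
    "name": "myfs",
    "mds_size": 3,  # metadata pool replica size
    "data_pools": [{"name": "myfs-data0", "size": 3}],
}
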
140 changes: 140 additions & 0 deletions src/rookify/modules/migrate_mds_pools/templates/filesystem.yaml.j2
@@ -0,0 +1,140 @@
---
#################################################################################################################
# Create a filesystem with replication enabled for a production environment.
# A minimum of 3 OSDs on different nodes is required in this example.
# If one mds daemon per node is too restrictive, see the podAntiAffinity below.
#  kubectl create -f filesystem.yaml
#################################################################################################################

apiVersion: ceph.rook.io/v1
kind: CephFilesystem
metadata:
  name: {{ name }}
  namespace: {{ cluster_namespace }}
spec:
  # The metadata pool spec. Must use replication.
  metadataPool:
    replicated:
      size: {{ mds_size }}
      requireSafeReplicaSize: true
    parameters:
      # Inline compression mode for the data pool
      # Further reference: https://docs.ceph.com/docs/master/rados/configuration/bluestore-config-ref/#inline-compression
      compression_mode:
        none
      # gives a hint (%) to Ceph in terms of expected consumption of the total cluster capacity of a given pool
      # for more info: https://docs.ceph.com/docs/master/rados/operations/placement-groups/#specifying-expected-pool-size
      #target_size_ratio: ".5"
  # The list of data pool specs. Can use replication or erasure coding.
  dataPools:
{% for pool in data_pools %}
    - name: {{ pool.name }}
      failureDomain: host
      replicated:
        size: {{ pool.size }}
        # Disallow setting a pool with replica size 1, as this could lead to data loss without recovery.
        # Make sure you're *ABSOLUTELY CERTAIN* that is what you want.
        requireSafeReplicaSize: true
      parameters:
        # Inline compression mode for the data pool
        # Further reference: https://docs.ceph.com/docs/master/rados/configuration/bluestore-config-ref/#inline-compression
        compression_mode:
          none
        # gives a hint (%) to Ceph in terms of expected consumption of the total cluster capacity of a given pool
        # for more info: https://docs.ceph.com/docs/master/rados/operations/placement-groups/#specifying-expected-pool-size
        #target_size_ratio: ".5"
{% endfor %}
  # Whether to preserve the filesystem after CephFilesystem CRD deletion
  preserveFilesystemOnDelete: true
  # The metadata service (mds) configuration
  metadataServer:
    # The number of active MDS instances
    activeCount: 1
    # Whether each active MDS instance will have an active standby with a warm metadata cache for faster failover.
    # If false, standbys will be available, but will not have a warm cache.
    activeStandby: true
    # The affinity rules to apply to the mds deployment
    placement:
      #  nodeAffinity:
      #    requiredDuringSchedulingIgnoredDuringExecution:
      #      nodeSelectorTerms:
      #      - matchExpressions:
      #        - key: role
      #          operator: In
      #          values:
      #          - mds-node
      #  topologySpreadConstraints:
      #  tolerations:
      #  - key: mds-node
      #    operator: Exists
      #  podAffinity:
      podAntiAffinity:
        requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchExpressions:
                - key: app
                  operator: In
                  values:
                    - rook-ceph-mds
              ## Add this if you want to allow mds daemons for different filesystems to run on one
              ## node. The value in "values" must match .metadata.name.
              # - key: rook_file_system
              #   operator: In
              #   values:
              #     - myfs
            # topologyKey: kubernetes.io/hostname will place MDS across different hosts
            topologyKey: kubernetes.io/hostname
        preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 100
            podAffinityTerm:
              labelSelector:
                matchExpressions:
                  - key: app
                    operator: In
                    values:
                      - rook-ceph-mds
              # topologyKey: */zone can be used to spread MDS across different AZs
              # Use topologyKey: failure-domain.beta.kubernetes.io/zone if your Kubernetes cluster is v1.16 or lower
              # Use topologyKey: topology.kubernetes.io/zone if your Kubernetes cluster is v1.17 or higher
              topologyKey: topology.kubernetes.io/zone
    # A key/value list of annotations
    # annotations:
    #   key: value
    # A key/value list of labels
    # labels:
    #   key: value
    # resources:
    # The requests and limits set here allow the filesystem MDS Pod(s) to use half of one CPU core and 1 gigabyte of memory
    #   limits:
    #     memory: "1024Mi"
    #   requests:
    #     cpu: "500m"
    #     memory: "1024Mi"
    priorityClassName: system-cluster-critical
    livenessProbe:
      disabled: false
    startupProbe:
      disabled: false
  # Filesystem mirroring settings
  # mirroring:
  #   enabled: true
  #   # list of Kubernetes Secrets containing the peer token
  #   # for more details see: https://docs.ceph.com/en/latest/dev/cephfs-mirroring/#bootstrap-peers
  #   # Add the secret name if it already exists, else specify the empty list here.
  #   peers:
  #     secretNames:
  #       - secondary-cluster-peer
  #   # specify the schedule(s) on which snapshots should be taken
  #   # see the official syntax here https://docs.ceph.com/en/latest/cephfs/snap-schedule/#add-and-remove-schedules
  #   snapshotSchedules:
  #     - path: /
  #       interval: 24h # daily snapshots
  #       # The startTime should be mentioned in the format YYYY-MM-DDTHH:MM:SS
  #       # If startTime is not specified, then by default the start time is considered as midnight UTC.
  #       # see usage here https://docs.ceph.com/en/latest/cephfs/snap-schedule/#usage
  #       # startTime: 2022-07-15T11:55:00
  #   # manage retention policies
  #   # see syntax duration here https://docs.ceph.com/en/latest/cephfs/snap-schedule/#add-and-remove-retention-policies
  #   snapshotRetention:
  #     - path: /
  #       duration: "h 24"
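
To inspect the manifest this template produces outside of Rookify, it can be rendered directly with Jinja2; the handler itself goes through self.load_template(), so the snippet below is only a manual sketch with example values.

# Manual rendering sketch (assumes the jinja2 package is installed).
import jinja2

with open("src/rookify/modules/migrate_mds_pools/templates/filesystem.yaml.j2") as f:
    template = jinja2.Template(f.read())

manifest = template.render(
    name="myfs",  # example values matching filesystem_definition_values
    cluster_namespace="rook-ceph",
    mds_size=3,
    data_pools=[{"name": "myfs-data0", "size": 3}],
)
print(manifest)  # CephFilesystem manifest, ready for kubectl apply / crd_api_apply
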
3 changes: 3 additions & 0 deletions src/rookify/modules/migrate_osd_pools/__init__.py
@@ -0,0 +1,3 @@
# -*- coding: utf-8 -*-

from .main import MigrateOSDPoolsHandler as ModuleHandler # noqa