refactor and mypy up ibm datasource #5509

Merged
merged 7 commits on Aug 6, 2024
Changes from 1 commit
103 changes: 59 additions & 44 deletions cloudinit/sources/DataSourceIBMCloud.py
@@ -96,6 +96,7 @@
import json
import logging
import os
from typing import Any, Callable, Dict, Optional, Tuple

from cloudinit import atomic_helper, sources, subp, util
from cloudinit.sources.helpers import openstack
@@ -176,7 +177,7 @@ def network_config(self):
# environment handles networking configuration. Not cloud-init.
return {"config": "disabled", "version": 1}
if self._network_config is None:
if self.network_json is not None:
if self.network_json not in (sources.UNSET, None):
LOG.debug("network config provided via network_json")
self._network_config = openstack.convert_net_json(
self.network_json, known_macs=None
@@ -186,22 +187,30 @@ def network_config(self):
return self._network_config
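
The changed check above treats both None and cloud-init's UNSET sentinel as "no network_data available", which presumably avoids handing the unpopulated placeholder to openstack.convert_net_json. A standalone sketch of that sentinel pattern, not part of the diff (UNSET below is a local stand-in rather than the real cloudinit.sources.UNSET constant):

# Standalone sketch, not code from this PR.
UNSET = "_unset_"  # local stand-in for the module-level sentinel

def have_network_json(network_json) -> bool:
    # Both the sentinel ("never populated") and None ("explicitly absent")
    # mean there is no network_data.json payload to convert.
    return network_json not in (UNSET, None)

assert have_network_json({"links": [], "networks": [], "services": []})
assert not have_network_json(UNSET)
assert not have_network_json(None)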


def _read_system_uuid():
def _read_system_uuid() -> Optional[str]:
"""
Read the system uuid.

@return: the system uuid or None if not available.
"""
uuid_path = "/sys/hypervisor/uuid"
if not os.path.isfile(uuid_path):
return None
return util.load_text_file(uuid_path).strip().lower()


def _is_xen():
"""
Return boolean indicating if this is a xen hypervisor.
"""
return os.path.exists("/proc/xen")


def _is_ibm_provisioning(
prov_cfg="/root/provisioningConfiguration.cfg",
inst_log="/root/swinstall.log",
boot_ref="/proc/1/environ",
):
) -> bool:
"""Return boolean indicating if this boot is ibm provisioning boot."""
if os.path.exists(prov_cfg):
msg = "config '%s' exists." % prov_cfg
@@ -229,7 +238,7 @@ def _is_ibm_provisioning(
return result


def get_ibm_platform():
def get_ibm_platform() -> Tuple[Optional[str], Optional[str]]:
"""Return a tuple (Platform, path)

If this is Not IBM cloud, then the return value is (None, None).
@@ -242,7 +251,7 @@ def get_ibm_platform():
return not_found

# fslabels contains only the first entry with a given label.
fslabels = {}
fslabels: Dict[str, Dict] = {}
try:
devs = util.blkid()
except subp.ProcessExecutionError as e:
@@ -289,7 +298,7 @@ def get_ibm_platform():
return not_found


def read_md():
def read_md() -> Optional[Dict[str, Any]]:
"""Read data from IBM Cloud.

@return: None if not running on IBM Cloud.
@@ -325,27 +334,39 @@
return ret
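
With read_md now annotated as returning Optional[Dict[str, Any]], type-checked callers have to narrow away the None ("not running on IBM Cloud") case before using the dict. A minimal hypothetical caller, not taken from the PR:

from typing import Any, Dict, Optional

def describe_ibm_md(md: Optional[Dict[str, Any]]) -> str:
    # mypy flags attribute access on an Optional value; checking for None
    # first narrows the type to Dict[str, Any].
    if md is None:
        return "not running on IBM Cloud"
    # Key names here are illustrative; .get() keeps the sketch safe either way.
    return "platform=%s source=%s" % (md.get("platform"), md.get("source"))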


def metadata_from_dir(source_dir):
def metadata_from_dir(source_dir: str) -> Dict[str, Any]:
"""Walk source_dir extracting standardized metadata.

Certain metadata keys are renamed to present a standardized set of metadata
keys.

This function has a lot in common with ConfigDriveReader.read_v2 but
there are a number of inconsistencies, such key renames and as only
presenting a 'latest' version which make it an unlikely candidate to share
there are a number of inconsistencies, such as key renames and only
presenting a 'latest' version, which make it an unlikely candidate to share
code.

@return: Dict containing translated metadata, userdata, vendordata,
networkdata as present.
"""

def opath(fname):
def opath(fname: str) -> str:
return os.path.join("openstack", "latest", fname)

def load_json_bytes(blob):
def load_json_bytes(blob: bytes) -> Dict[str, Any]:
return json.loads(blob.decode("utf-8"))

def load_file(
path: str, translator: Optional[Callable[[bytes], Any]] = None
) -> Any:
try:
raw = util.load_binary_file(path)
return translator(raw) if translator else raw
except IOError as e:
LOG.debug("Failed reading path '%s': %s", path, e)
return None
except Exception as e:
raise sources.BrokenMetadata(f"Failed decoding {path}: {e}")

files = [
# tuples of (results_name, path, translator)
("metadata_raw", opath("meta_data.json"), load_json_bytes),
@@ -354,54 +375,48 @@ def load_json_bytes(blob):
("networkdata", opath("network_data.json"), load_json_bytes),
]

results = {}
for (name, path, transl) in files:
fpath = os.path.join(source_dir, path)
raw = None
try:
raw = util.load_binary_file(fpath)
except IOError as e:
LOG.debug("Failed reading path '%s': %s", fpath, e)
raw_results: Dict[str, Optional[bytes]] = {}
translated_results: Dict[str, Any] = {}

if raw is None or transl is None:
data = raw
for name, path, transl in files:
fpath = os.path.join(source_dir, path)
raw_results[name] = load_file(fpath)
if raw_results[name] is not None and transl is not None:
translated_results[name] = load_file(fpath, transl)
else:
try:
data = transl(raw)
except Exception as e:
raise sources.BrokenMetadata(
"Failed decoding %s: %s" % (path, e)
)
translated_results[name] = raw_results[name]

results[name] = data

if results.get("metadata_raw") is None:
if raw_results["metadata_raw"] is None:
raise sources.BrokenMetadata(
"%s missing required file 'meta_data.json'" % source_dir
"%s missing required file 'meta_data.json'",
source_dir,
)

results["metadata"] = {}
translated_results["metadata"] = {}

md_raw = translated_results["metadata_raw"]
md = translated_results["metadata"]

md_raw = results["metadata_raw"]
md = results["metadata"]
if "random_seed" in md_raw:
try:
md["random_seed"] = base64.b64decode(md_raw["random_seed"])
except (ValueError, TypeError) as e:
raise sources.BrokenMetadata(
"Badly formatted metadata random_seed entry: %s" % e
"Badly formatted metadata random_seed entry: %s",
e,
)

renames = (
("public_keys", "public-keys"),
("hostname", "local-hostname"),
("uuid", "instance-id"),
)
for mdname, newname in renames:
if mdname in md_raw:
md[newname] = md_raw[mdname]
renames = {
"public_keys": "public-keys",
"hostname": "local-hostname",
"uuid": "instance-id",
}

for old_key, new_key in renames.items():
if old_key in md_raw:
md[new_key] = md_raw[old_key]

return results
return translated_results
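
For reference, a self-contained sketch of the key-renaming step this function performs, with hypothetical metadata values (standard library only, not the PR's code):

# Hypothetical decoded meta_data.json content.
md_raw = {
    "uuid": "1234-abcd",
    "hostname": "node1",
    "public_keys": {"0": "ssh-rsa AAAA..."},
}

renames = {
    "public_keys": "public-keys",
    "hostname": "local-hostname",
    "uuid": "instance-id",
}

md: dict = {}
for old_key, new_key in renames.items():
    # Keys not listed in renames are left out of md by this step;
    # other fields (e.g. random_seed) are handled separately.
    if old_key in md_raw:
        md[new_key] = md_raw[old_key]

assert md == {
    "instance-id": "1234-abcd",
    "local-hostname": "node1",
    "public-keys": {"0": "ssh-rsa AAAA..."},
}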


# Used to match classes to dependencies
2 changes: 1 addition & 1 deletion cloudinit/sources/__init__.py
@@ -325,7 +325,7 @@ def __init__(self, sys_cfg, distro: Distro, paths: Paths, ud_proc=None):
self.vendordata_raw = None
self.vendordata2_raw = None
self.metadata_address = None
self.network_json = UNSET
self.network_json: Optional[str] = UNSET
self.ec2_metadata = UNSET

self.ds_cfg = util.get_cfg_by_path(
1 change: 0 additions & 1 deletion pyproject.toml
@@ -95,7 +95,6 @@ module = [
"cloudinit.sources.DataSourceExoscale",
"cloudinit.sources.DataSourceGCE",
"cloudinit.sources.DataSourceHetzner",
"cloudinit.sources.DataSourceIBMCloud",
"cloudinit.sources.DataSourceMAAS",
"cloudinit.sources.DataSourceNoCloud",
"cloudinit.sources.DataSourceOVF",