Skip to content

Commit

Permalink
Restructure project to avoid passing around env_config
Browse files Browse the repository at this point in the history
dormant-user committed Jan 4, 2025

Unverified

This commit is not signed, but one or more authors requires that any commit attributed to them is signed.
1 parent f90090c commit 43e8979
Showing 9 changed files with 112 additions and 163 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -36,7 +36,8 @@ pip install PyUdisk
import pyudisk

if __name__ == '__main__':
pyudisk.monitor()
for metric in pyudisk.smart_metrics():
print(metric)
```

**CLI**
2 changes: 1 addition & 1 deletion pyudisk/__init__.py
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@

import click

from .main import EnvConfig, generate_report, monitor, smart_metrics # noqa: F401
from .main import generate_report, monitor, smart_metrics # noqa: F401

version = "1.1.0"

7 changes: 5 additions & 2 deletions pyudisk/config.py
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@
from pydantic import BaseModel, DirectoryPath, Field, FilePath, HttpUrl, field_validator
from pydantic_settings import BaseSettings

from .models import udisk
from . import models

OPERATING_SYSTEM = platform.system()
SMARTCTL_LIB = shutil.which("smartctl")
@@ -150,7 +150,7 @@ def validate_metrics(cls, value: Metric | List[Metric]) -> List[Metric]:
for dtype in dtypes.get("anyOf")
if dtype.get("type", "") != "null"
]
for name, dtypes in udisk.Attributes.model_json_schema()
for name, dtypes in models.udisk.Attributes.model_json_schema()
.get("properties")
.items()
}
@@ -207,3 +207,6 @@ class Config:

env_file = os.environ.get("env_file") or os.environ.get("ENV_FILE") or ".env"
extra = "ignore"


env: EnvConfig = EnvConfig # noqa: TypeCheck
17 changes: 7 additions & 10 deletions pyudisk/disk_data.py
Original file line number Diff line number Diff line change
@@ -5,10 +5,8 @@
import pyarchitecture
from psutil._common import sdiskpart

from .config import EnvConfig
from . import config, models, util
from .logger import LOGGER
from .models import SystemPartitions
from .util import size_converter


def get_partitions() -> Generator[sdiskpart]:
@@ -18,7 +16,7 @@ def get_partitions() -> Generator[sdiskpart]:
sdiskpart:
Yields the partition datastructure.
"""
system_partitions = SystemPartitions()
system_partitions = models.SystemPartitions()
for partition in psutil.disk_partitions():
is_not_system_mount = all(
not partition.mountpoint.startswith(mnt)
@@ -53,7 +51,7 @@ def get_disk_io() -> Generator[Dict[str, str | List[str]]]:
# If there is only one physical disk, then set the mountpoint to root (/)
if len(physical_disks) == 1:
yield dict(
size=size_converter(psutil.disk_usage("/").total),
size=util.size_converter(psutil.disk_usage("/").total),
device_id=physical_disks[0],
node=f"/dev/{physical_disks[0]}",
mountpoints=["/"],
@@ -62,7 +60,7 @@ def get_disk_io() -> Generator[Dict[str, str | List[str]]]:
# If there are multiple physical disks, then set the mountpoint to disk path itself
for physical_disk in physical_disks:
yield dict(
size=size_converter(psutil.disk_usage("/").total),
size=util.size_converter(psutil.disk_usage("/").total),
device_id=physical_disk,
node=f"/dev/{physical_disk}",
mountpoints=[f"/dev/{physical_disk}"],
@@ -78,18 +76,17 @@ def partitions() -> Generator[Dict[str, str | List[str]]]:
"""
for partition in get_partitions():
yield dict(
size=size_converter(psutil.disk_usage(partition.mountpoint).total),
size=util.size_converter(psutil.disk_usage(partition.mountpoint).total),
device_id=partition.device.lstrip("/dev/"),
node=partition.device,
mountpoints=[partition.device],
)


def get_disk_data(env: EnvConfig, posix: bool) -> List[Dict[str, str | List[str]]]:
def get_disk_data(posix: bool) -> List[Dict[str, str | List[str]]]:
"""Get disk information for macOS and Windows machines.
Args:
env: Environment variables configuration.
posix: If the operating system is POSIX compliant.
Returns:
@@ -98,7 +95,7 @@ def get_disk_data(env: EnvConfig, posix: bool) -> List[Dict[str, str | List[str]
"""
if posix:
# 1: Attempt to extract physical disks from PyArchitecture
if pyarch := pyarchitecture.disks.get_all_disks(env.disk_lib):
if pyarch := pyarchitecture.disks.get_all_disks(config.env.disk_lib):
return pyarch
# 2: Assume disks with non-zero write count as physical disks
# disk_io_counters fn will fetch disks rather than partitions (similar to output from 'diskutil list')
94 changes: 41 additions & 53 deletions pyudisk/main.py
Original file line number Diff line number Diff line change
@@ -6,39 +6,34 @@

from pydantic import NewPath, ValidationError

from .config import OPERATING_SYSTEM, EnvConfig, OperationSystem
from .disk_data import get_disk_data
from . import config, disk_data, metrics, models, notification, parsers, util
from .logger import LOGGER
from .metrics import get_smart_metrics, get_udisk_metrics
from .models import smartctl, udisk
from .notification import notification_service, send_report
from .parsers import parse_block_devices, parse_drives
from .util import standard


def smart_metrics(env: EnvConfig) -> Generator[udisk.Disk | smartctl.Disk]:
def smart_metrics(**kwargs) -> Generator[models.udisk.Disk | models.smartctl.Disk]:
"""Gathers smart metrics using udisksctl dump, and constructs a Disk object.
Args:
env: Environment variables configuration.
Yields:
Disk:
Yields the Disk object from the generated Dataframe.
"""
if OPERATING_SYSTEM in (OperationSystem.darwin, OperationSystem.windows):
for device_info in get_disk_data(
env, OPERATING_SYSTEM != OperationSystem.windows
config.env = config.EnvConfig(**kwargs)
if config.OPERATING_SYSTEM in (
config.OperationSystem.darwin,
config.OperationSystem.windows,
):
for device_info in disk_data.get_disk_data(
config.OPERATING_SYSTEM != config.OperationSystem.windows
):
if metrics := get_smart_metrics(env.smart_lib, device_info):
if retrieved_metrics := metrics.get_smart_metrics(device_info):
try:
yield smartctl.Disk(**metrics)
yield models.smartctl.Disk(**retrieved_metrics)
except ValidationError as error:
LOGGER.error(error.errors())
return
smart_dump = get_udisk_metrics(env)
block_devices = parse_block_devices(smart_dump)
drives = {k: v for k, v in sorted(parse_drives(smart_dump).items())}
smart_dump = metrics.get_udisk_metrics()
block_devices = parsers.parse_block_devices(smart_dump)
drives = {k: v for k, v in sorted(parsers.parse_drives(smart_dump).items())}
diff = set()
# Enable mount warning by default (log warning messages if disk is not mounted)
mount_warning = os.environ.get("MOUNT_WARNING", "1") == "1"
@@ -57,18 +52,18 @@ def smart_metrics(env: EnvConfig) -> Generator[udisk.Disk | smartctl.Disk]:
LOGGER.warning("UNmounted drive(s) found - '%s'", ", ".join(diff))
optional_fields = [
k
for k, v in udisk.Disk.model_json_schema().get("properties").items()
for k, v in models.udisk.Disk.model_json_schema().get("properties").items()
if v.get("anyOf", [{}])[-1].get("type", "") == "null"
]
# S.M.A.R.T metrics can be null, but the keys are mandatory
# UDisk metrics can be null, but the keys are mandatory
for drive in drives.values():
for key in optional_fields:
if key not in drive.keys():
drive[key] = None
for drive, data in drives.items():
if block_data := block_devices.get(drive):
data["Partition"] = block_data
yield udisk.Disk(
yield models.udisk.Disk(
id=drive, model=data.get("Info", {}).get("Model", ""), **data
)
elif drive not in diff:
@@ -92,11 +87,11 @@ def generate_html(
try:
import jinja2
except ModuleNotFoundError:
standard()
util.standard()

template_dir = os.path.join(pathlib.Path(__file__).parent, "templates")
env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
template = env.get_template(f"{OPERATING_SYSTEM}.html")
jinja_env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
template = jinja_env.get_template(f"{config.OPERATING_SYSTEM}.html")
now = datetime.now()
html_output = template.render(
data=data, last_updated=f"{now.strftime('%c')} {now.astimezone().tzinfo}"
@@ -118,9 +113,9 @@ def generate_report(**kwargs) -> str:
str:
Returns the report filepath.
"""
env = EnvConfig(**kwargs)
config.env = config.EnvConfig(**kwargs)
if kwargs.get("raw"):
return generate_html([disk.model_dump() for disk in smart_metrics(env)])
return generate_html([disk.model_dump() for disk in smart_metrics()])
if report_file := kwargs.get("filepath"):
assert report_file.endswith(
".html"
@@ -129,35 +124,33 @@ def generate_report(**kwargs) -> str:
os.makedirs(report_dir, exist_ok=True)
else:
if directory := kwargs.get("directory"):
env.report_dir = directory
os.makedirs(env.report_dir, exist_ok=True)
config.env.report_dir = directory
os.makedirs(config.env.report_dir, exist_ok=True)
report_file = datetime.now().strftime(
os.path.join(env.report_dir, env.report_file)
os.path.join(config.env.report_dir, config.env.report_file)
)
LOGGER.info("Generating disk report")
disk_report = [disk.model_dump() for disk in smart_metrics(env)]
disk_report = [disk.model_dump() for disk in smart_metrics()]
generate_html(disk_report, report_file)
LOGGER.info("Report has been stored in %s", report_file)
return report_file


def monitor_disk(env: EnvConfig) -> Generator[udisk.Disk]:
def monitor_disk(**kwargs) -> Generator[models.udisk.Disk]:
"""Monitors disk attributes based on the configuration.
Args:
env: Environment variables configuration.
Yields:
Disk:
Data structure parsed as a Disk object.
"""
assert (
OPERATING_SYSTEM == OperationSystem.linux
config.OPERATING_SYSTEM == config.OperationSystem.linux
), "Monitoring feature is available only for Linux machines!!"
config.env = config.EnvConfig(**kwargs)
message = ""
for disk in smart_metrics(env):
for disk in smart_metrics():
if disk.Attributes:
for metric in env.metrics:
for metric in config.env.metrics:
attribute = disk.Attributes.model_dump().get(metric.attribute)
if metric.max_threshold and attribute >= metric.max_threshold:
msg = f"{metric.attribute!r} for {disk.id!r} is >= {metric.max_threshold} at {attribute}"
@@ -175,9 +168,7 @@ def monitor_disk(env: EnvConfig) -> Generator[udisk.Disk]:
LOGGER.warning("No attributes were loaded for %s", disk.model)
yield disk
if message:
notification_service(
title="Disk Monitor Alert!!", message=message, env_config=env
)
notification.notification_service(title="Disk Monitor Alert!!", message=message)


def monitor(**kwargs) -> None:
@@ -187,27 +178,24 @@ def monitor(**kwargs) -> None:
**kwargs: Arbitrary keyword arguments.
"""
assert (
OPERATING_SYSTEM == OperationSystem.linux
config.OPERATING_SYSTEM == config.OperationSystem.linux
), "Monitoring feature is available only for Linux machines!!"
env = EnvConfig(**kwargs)
disk_report = [disk.model_dump() for disk in monitor_disk(env)]
config.env = config.EnvConfig(**kwargs)
disk_report = [disk.model_dump() for disk in monitor_disk()]
if disk_report:
LOGGER.info(
"Disk monitor report has been generated for %d disks", len(disk_report)
)
if env.disk_report:
os.makedirs(env.report_dir, exist_ok=True)
if config.env.disk_report:
os.makedirs(config.env.report_dir, exist_ok=True)
report_file = datetime.now().strftime(
os.path.join(env.report_dir, env.report_file)
os.path.join(config.env.report_dir, config.env.report_file)
)
report_data = generate_html(disk_report, report_file)
if env.gmail_user and env.gmail_pass and env.recipient:
LOGGER.info("Sending an email disk report to %s", env.recipient)
send_report(
if config.env.gmail_user and config.env.gmail_pass and config.env.recipient:
LOGGER.info("Sending an email disk report to %s", config.env.recipient)
notification.send_report(
title=f"Disk Report - {datetime.now().strftime('%c')}",
user=env.gmail_user,
password=env.gmail_pass,
recipient=env.recipient,
content=report_data,
)
else:
22 changes: 7 additions & 15 deletions pyudisk/metrics.py
Original file line number Diff line number Diff line change
@@ -2,20 +2,14 @@
import subprocess
from typing import Any, Dict, List

from pydantic import FilePath

from .config import EnvConfig
from . import config, models
from .logger import LOGGER
from .models import smartctl


def get_smart_metrics(
smart_lib: FilePath, device_info: Dict[str, str | List[str]]
) -> Dict[str, Any]:
def get_smart_metrics(device_info: Dict[str, str | List[str]]) -> Dict[str, Any]:
"""Gathers disk information using the 'smartctl' command.
Args:
smart_lib: Library path to 'smartctl' command.
device_info: Device information retrieved.
Returns:
@@ -26,7 +20,7 @@ def get_smart_metrics(
mountpoints = device_info["mountpoints"]
try:
result = subprocess.run(
[smart_lib, "-a", device_id, "--json"],
[config.env.smart_lib, "-a", device_id, "--json"],
capture_output=True,
text=True,
check=False,
@@ -46,25 +40,23 @@ def get_smart_metrics(
LOGGER.error("[%d]: %s", error.returncode, result)
output = {}
output["device"] = output.get(
"device", smartctl.Device(name=device_id, info_name=device_id).model_dump()
"device",
models.smartctl.Device(name=device_id, info_name=device_id).model_dump(),
)
output["model_name"] = output.get("model_name", device_info.get("name"))
output["mountpoints"] = mountpoints
return output


def get_udisk_metrics(env: EnvConfig) -> str:
def get_udisk_metrics() -> str:
"""Gathers disk information using the dump from 'udisksctl' command.
Args:
env: Environment variables configuration.
Returns:
str:
Returns the output from disk util dump.
"""
try:
output = subprocess.check_output(f"{env.smart_lib} dump", shell=True)
output = subprocess.check_output(f"{config.env.smart_lib} dump", shell=True)
except subprocess.CalledProcessError as error:
result = error.output.decode(encoding="UTF-8").strip()
LOGGER.error(f"[{error.returncode}]: {result}\n")
2 changes: 2 additions & 0 deletions pyudisk/models/__init__.py
Original file line number Diff line number Diff line change
@@ -2,6 +2,8 @@

from pydantic import BaseModel, Field

from . import smartctl, udisk # noqa: F401


class SystemPartitions(BaseModel):
"""System partitions' mountpoints and fstypes."""
94 changes: 29 additions & 65 deletions pyudisk/notification.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from threading import Thread
from typing import List

from .config import EnvConfig
from . import config, util
from .logger import LOGGER
from .util import standard


def urljoin(*args) -> str:
@@ -16,50 +15,39 @@ def urljoin(*args) -> str:
return "/".join(map(lambda x: str(x).rstrip("/").lstrip("/"), args))


def notification_service(title: str, message: str, env_config: EnvConfig) -> None:
def notification_service(title: str, message: str) -> None:
"""Sends notifications using Ntfy and Telegram services.
Args:
title: Notification title.
message: Body of the notification.
env_config: Environment variables' configuration.
"""
threads: List[Thread] = []
if env_config.ntfy_url and env_config.ntfy_topic:
if config.env.ntfy_url and config.env.ntfy_topic:
thread = Thread(
target=ntfy_fn,
kwargs=dict(
title=title,
message=message,
url=env_config.ntfy_url,
topic=env_config.ntfy_topic,
username=env_config.ntfy_username,
password=env_config.ntfy_password,
),
)
threads.append(thread)
if env_config.telegram_bot_token and env_config.telegram_chat_id:
if config.env.telegram_bot_token and config.env.telegram_chat_id:
thread = Thread(
target=telegram_fn,
kwargs=dict(
title=title,
message=message,
bot_token=env_config.telegram_bot_token,
chat_id=env_config.telegram_chat_id,
message_thread_id=env_config.telegram_thread_id,
disable_notification=False,
),
)
threads.append(thread)
if env_config.gmail_user and env_config.gmail_pass and env_config.phone:
if config.env.gmail_user and config.env.gmail_pass and config.env.phone:
thread = Thread(
target=sms_fn,
kwargs=dict(
title=title,
message=message,
user=env_config.gmail_user,
password=env_config.gmail_pass,
phone=env_config.phone,
),
)
threads.append(thread)
@@ -69,26 +57,23 @@ def notification_service(title: str, message: str, env_config: EnvConfig) -> Non
thread.join()


def send_report(
title: str, user: str, password: str, recipient: str, content: str
) -> None:
def send_report(title: str, content: str) -> None:
"""Sends an email notification using Gmail's SMTP protocol.
Args:
title: Notification title.
user: Gmail username.
password: Gmail password.
recipient: Email recipient.
content: HTML body to attach to the email.
"""
try:
import gmailconnector as gc
except ModuleNotFoundError:
standard()
util.standard()

email_obj = gc.SendEmail(gmail_user=user, gmail_pass=password)
email_obj = gc.SendEmail(
gmail_user=config.env.gmail_user, gmail_pass=config.env.gmail_pass
)
response = email_obj.send_email(
subject=title, recipient=recipient, html_body=content
subject=title, recipient=config.env.recipient, html_body=content
)
if response.ok:
LOGGER.info("Report sent successfully")
@@ -97,59 +82,47 @@ def send_report(
LOGGER.error(response.json())


def sms_fn(title: str, message: str, user: str, password: str, phone: str) -> None:
def sms_fn(title: str, message: str) -> None:
"""Sends an SMS notification using Gmail's SMTP protocol.
Args:
title: Notification title.
message: Body of the notification.
user: Gmail username.
password: Gmail password.
phone: Phone number to send the SMS.
"""
try:
import gmailconnector as gc
except ModuleNotFoundError:
standard()
util.standard()

sms_obj = gc.SendSMS(gmail_user=user, gmail_pass=password)
response = sms_obj.send_sms(phone=phone, message=message, subject=title)
sms_obj = gc.SendSMS(
gmail_user=config.env.gmail_user, gmail_pass=config.env.gmail_pass
)
response = sms_obj.send_sms(phone=config.env.phone, message=message, subject=title)
if response.ok:
LOGGER.info("SMS notification sent successfully")
else:
LOGGER.error("Failed to SMS notification")
LOGGER.error(response.json())


def ntfy_fn(
title: str,
message: str,
url: str,
topic: str,
username: str = None,
password: str = None,
) -> None:
def ntfy_fn(title: str, message: str) -> None:
"""Sends a notification using Ntfy service.
Args:
title: Notification title.
message: Body of the notification.
url: Ntfy service url.
topic: Ntfy service topic.
username: Ntfy service username.
password: Ntfy service password.
"""
try:
import requests
except ModuleNotFoundError:
standard()
util.standard()

session = requests.Session()
if username and password:
session.auth = (username, password)
if config.env.ntfy_username and config.env.ntfy_password:
session.auth = (config.env.ntfy_username, config.env.ntfy_password)
try:
response = session.post(
url=urljoin(url, topic),
url=urljoin(config.env.ntfy_url, config.env.ntfy_topic),
headers={
"X-Title": title,
"Content-Type": "application/x-www-form-urlencoded",
@@ -163,41 +136,32 @@ def ntfy_fn(
LOGGER.error(error)


def telegram_fn(
title: str,
message: str,
bot_token: str,
chat_id: int,
message_thread_id: int = None,
disable_notification: bool = False,
) -> None:
def telegram_fn(title: str, message: str, disable_notification: bool = False) -> None:
"""Sends a notification using Telegram.
Args:
title: Notification title.
message: Body of the notification.
bot_token: Telegram bot token.
chat_id: Telegram chat id.
message_thread_id: Message thread id.
disable_notification: Boolean flag to disable notification.
"""
try:
import requests
except ModuleNotFoundError:
standard()
util.standard()

text = f"*{title}*\n{message}"
payload = {
"chat_id": chat_id,
"chat_id": config.env.telegram_chat_id,
"text": text,
"parse_mode": "markdown",
"disable_notification": disable_notification,
}
if message_thread_id:
payload["message_thread_id"] = message_thread_id
if config.env.telegram_thread_id:
payload["message_thread_id"] = config.env.telegram_thread_id
try:
response = requests.post(
f"https://api.org/bot{bot_token}/sendMessage", json=payload
f"https://api.org/bot{config.env.telegram_bot_token}/sendMessage",
json=payload,
)
response.raise_for_status()
LOGGER.info("Telegram notification sent successfully")
34 changes: 18 additions & 16 deletions pyudisk/parsers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import Any, Dict, List

from .disk_data import get_partitions
from .models import udisk
from . import disk_data, models


def parse_drives(input_data: str) -> Dict[str, Any]:
@@ -18,13 +17,16 @@ def parse_drives(input_data: str) -> Dict[str, Any]:
head = None
category = None
for line in input_data.splitlines():
if line.startswith(udisk.Drives.head):
head = line.replace(udisk.Drives.head, "").rstrip(":").strip()
if line.startswith(models.udisk.Drives.head):
head = line.replace(models.udisk.Drives.head, "").rstrip(":").strip()
formatted[head] = {}
elif line.strip() in (udisk.Drives.category1, udisk.Drives.category2):
elif line.strip() in (
models.udisk.Drives.category1,
models.udisk.Drives.category2,
):
category = (
line.replace(udisk.Drives.category1, "Info")
.replace(udisk.Drives.category2, "Attributes")
line.replace(models.udisk.Drives.category1, "Info")
.replace(models.udisk.Drives.category2, "Attributes")
.strip()
)
formatted[head][category] = {}
@@ -56,8 +58,8 @@ def parse_block_devices(input_data: str) -> Dict[str, List[Dict[str, str]]]:
block = None
category = None
block_partitions = [
f"{udisk.BlockDevices.head}{block_device.device.split('/')[-1]}:"
for block_device in get_partitions()
f"{models.udisk.BlockDevices.head}{block_device.device.split('/')[-1]}:"
for block_device in disk_data.get_partitions()
]
for line in input_data.splitlines():
if line in block_partitions:
@@ -67,14 +69,14 @@ def parse_block_devices(input_data: str) -> Dict[str, List[Dict[str, str]]]:
block = line
block_devices[block] = {}
elif block and line.strip() in (
udisk.BlockDevices.category1,
udisk.BlockDevices.category2,
udisk.BlockDevices.category3,
models.udisk.BlockDevices.category1,
models.udisk.BlockDevices.category2,
models.udisk.BlockDevices.category3,
):
category = (
line.replace(udisk.BlockDevices.category1, "Block")
.replace(udisk.BlockDevices.category2, "Filesystem")
.replace(udisk.BlockDevices.category3, "Partition")
line.replace(models.udisk.BlockDevices.category1, "Block")
.replace(models.udisk.BlockDevices.category2, "Filesystem")
.replace(models.udisk.BlockDevices.category3, "Partition")
.strip()
)
elif block and category:
@@ -83,7 +85,7 @@ def parse_block_devices(input_data: str) -> Dict[str, List[Dict[str, str]]]:
key = key.strip()
val = val.strip()
if key == "Drive":
val = eval(val).replace(udisk.Drives.head, "")
val = eval(val).replace(models.udisk.Drives.head, "")
if key == "Symlinks":
block_devices[block][key] = [val]
except ValueError as error:

0 comments on commit 43e8979

Please sign in to comment.