Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tidying up everything and adding slight QoL changes. #4

Merged
merged 1 commit into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/apt_update_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ variables:
- name: package_manager
value: "apt"
tasks:
- name: refresh the cache
- name: refresh the {{ package_manager }} cache
command: "{{ package_manager }} update"

- name: display available upgrades
- name: display available upgrades with {{ package_manager }}
command: "{{ package_manager }} list --upgradable"
180 changes: 123 additions & 57 deletions meshbook.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,45 @@
from base64 import b64encode
from configparser import ConfigParser
import json
import math
import meshctrl
import os
import yaml

grace_period = 3 # Grace period will last for x (by default 3) second(s).

'''
Script utilities are handled in the following section.
'''

class ScriptEndTrigger(Exception):
pass

class text_color:
black = "\033[30m"
red = "\033[31m"
green = "\033[32m"
yellow = "\033[33m"
blue = "\033[34m"
magenta = "\033[35m"
cyan = "\033[36m"
white = "\033[37m"
italic = "\x1B[3m"
reset = "\x1B[0m"

def console(message: str, required: bool=False):
'''
Helper function for terminal output, with a couple variables for the silent flag. Also clears terminal color each time.
'''
if required:
print(message)
print(message + text_color.reset)
elif not args.silent:
print(message)
print(message + text_color.reset)

async def load_config(segment: str = 'meshcentral-account') -> dict:
'''
Function that loads the segment from the meshcentral.conf (by default) file and returns the it in a dict.
'''

async def load_config(segment: str = 'meshcentral-account') -> ConfigParser:
conf_file = args.conf
if not os.path.exists(conf_file):
raise ScriptEndTrigger(f'Missing config file {conf_file}. Provide an alternative path.')
Expand All @@ -40,6 +60,10 @@ async def load_config(segment: str = 'meshcentral-account') -> ConfigParser:
return config[segment]

async def init_connection(credentials: dict) -> meshctrl.Session:
'''
Use the libmeshctrl library to initiate a Secure Websocket (wss) connection to the MeshCentral instance.
'''

session = meshctrl.Session(
credentials['websocket_url'],
user=credentials['username'],
Expand All @@ -48,37 +72,61 @@ async def init_connection(credentials: dict) -> meshctrl.Session:
await session.initialized.wait()
return session

async def translate_id_to_name(target_id: str, group_list: dict) -> str:
async def translate_nodeid_to_name(target_id: str, group_list: dict) -> str:
'''
Simple function that looks up nodeid to the human-readable name if existent - otherwise return None.
'''

for group in group_list:
for device in group_list[group]:
if device["device_id"] == target_id:
return device["device_name"]
return None

'''
Creation and compilation happends in the following section, where the yaml gets read in, and edited accordingly.
'''

async def compile_book(playbook_file: dict) -> dict:
playbook = open(playbook_file, 'r')
playbook = await replace_placeholders(yaml.safe_load(playbook))
return playbook
async def compile_book(meshbook_file: dict) -> dict:
'''
Simple function that opens the file and replaces placeholders through the next function. After that just return it.
'''

async def replace_placeholders(playbook: dict) -> dict:
variables = {var["name"]: var["value"] for var in playbook.get("variables", [])}
meshbook = open(meshbook_file, 'r')
meshbook = await replace_placeholders(yaml.safe_load(meshbook))
return meshbook

for task in playbook.get("tasks", []):
command = task.get("command", "")
async def replace_placeholders(meshbook: dict) -> dict:
'''
Replace the placeholders in both name and command fields of the tasks. According to the variables defined in the variables list.
'''

variables = {var["name"]: var["value"] for var in meshbook.get("variables", [])}

for task in meshbook.get("tasks", []):
task_name = task.get("name")
for var_name, var_value in variables.items():
placeholder = f"{{{{ {var_name} }}}}"
task_name = task_name.replace(placeholder, var_value)
task["name"] = task_name

command = task.get("command")
for var_name, var_value in variables.items():
placeholder = f"{{{{ {var_name} }}}}" # Create the placeholder string like "{{ host1 }}"
command = command.replace(placeholder, var_value)
task["command"] = command
return playbook

return meshbook

'''
Creation and compilation of the MeshCentral nodes list (list of all nodes available to the user in the configuration) is handled in the following section.
'''

async def compile_group_list(session: meshctrl.Session) -> dict:
'''
Function that retrieves the devices from MeshCentral and compiles it into a efficient list.
'''

devices_response = await session.list_devices(details=False, timeout=10)

local_device_list = {}
Expand All @@ -96,11 +144,16 @@ async def compile_group_list(session: meshctrl.Session) -> dict:
return local_device_list

async def filter_targets(devices: list[dict], os_categories: dict, target_os: str = None) -> list[str]:
"""Filters devices based on reachability and optional OS criteria, supporting nested OS categories."""
'''
Filters devices based on reachability and optional OS criteria, supporting nested OS categories.
'''

valid_devices = []

def get_os_variants(category: str, os_map: dict) -> set:
"""Extracts all OS names under a given category if it exists."""
'''
Extracts all OS names under a given category if it exists.
'''

for key, value in os_map.items():
if key == category:
Expand Down Expand Up @@ -135,14 +188,18 @@ def get_os_variants(category: str, os_map: dict) -> set:

return valid_devices

async def gather_targets(playbook: dict, group_list: dict[str, list[dict]], os_categories: dict) -> list[str]:
"""Finds target devices based on playbook criteria (device or group)."""
async def gather_targets(meshbook: dict, group_list: dict[str, list[dict]], os_categories: dict) -> list[str]:
'''
Finds target devices based on meshbook criteria (device or group).
'''

target_list = []
target_os = playbook.get("target_os")
target_os = meshbook.get("target_os")

async def process_device_or_group(pseudo_target, group_list, os_categories, target_os) -> list[str]:
"""Helper function to process devices or groups."""
'''
Helper function to process devices or groups.
'''

matched_devices = []
for group in group_list:
Expand All @@ -154,14 +211,14 @@ async def process_device_or_group(pseudo_target, group_list, os_categories, targ
return await filter_targets(matched_devices, os_categories, target_os)
return []

match playbook:
match meshbook:
case {"device": pseudo_target}: # Single device target
if isinstance(pseudo_target, str):
matched_devices = await process_device_or_group(pseudo_target, group_list, os_categories, target_os)
target_list.extend(matched_devices)

else:
console("\033[91mPlease use devices (Notice the 'S') for multiple devices.\x1B[0m", True)
console(text_color.yellow + "Please use devices (Notice the 'S') for multiple devices.", True)

case {"devices": pseudo_target}: # List of devices
if isinstance(pseudo_target, list):
Expand All @@ -170,15 +227,15 @@ async def process_device_or_group(pseudo_target, group_list, os_categories, targ
target_list.extend(matched_devices)

else:
console("\033[91mThe 'devices' method is being used, but only one string is given. Did you mean 'device'?\x1B[0m", True)
console(text_color.yellow + "The 'devices' method is being used, but only one string is given. Did you mean 'device'?", True)

case {"group": pseudo_target}: # Single group target
if isinstance(pseudo_target, str) and pseudo_target in group_list:
matched_devices = await filter_targets(group_list[pseudo_target], os_categories, target_os)
target_list.extend(matched_devices)

else:
console("\033[91mPlease use groups (Notice the 'S') for multiple groups.\x1B[0m", True)
console(text_color.yellow + "Please use groups (Notice the 'S') for multiple groups.", True)

case {"groups": pseudo_target}: # List of groups
if isinstance(pseudo_target, list):
Expand All @@ -187,24 +244,28 @@ async def process_device_or_group(pseudo_target, group_list, os_categories, targ
matched_devices = await filter_targets(group_list[sub_pseudo_target], os_categories, target_os)
target_list.extend(matched_devices)
else:
console("\033[91mThe 'groups' method is being used, but only one string is given. Did you mean 'group'?\x1B[0m", True)
console(text_color.yellow + "The 'groups' method is being used, but only one string is given. Did you mean 'group'?", True)

return target_list

async def execute_playbook(session: meshctrl.Session, targets: dict, playbook: dict, group_list: dict) -> None:
async def execute_meshbook(session: meshctrl.Session, targets: dict, meshbook: dict, group_list: dict) -> None:
'''
Actual function that handles meshbook execution, also responsible for formatting the resulting JSON.
'''

responses_list = {}
round = 1

for task in playbook["tasks"]:
console(("\033[1m\033[92m" + str(round) + ". Running: " + task["name"] + "\033[0m"))
for task in meshbook["tasks"]:
console(text_color.green + str(round) + ". Running: " + task["name"])
response = await session.run_command(nodeids=targets, command=task["command"],ignore_output=False,timeout=900)

task_batch = []
for device in response:
device_result = response[device]["result"]
response[device]["result"] = device_result.replace("Run commands completed.", "")
response[device]["device_id"] = device
response[device]["device_name"] = await translate_id_to_name(device, group_list)
response[device]["device_name"] = await translate_nodeid_to_name(device, group_list)
task_batch.append(response[device])

responses_list["Task " + str(round)] = {
Expand All @@ -221,12 +282,16 @@ async def execute_playbook(session: meshctrl.Session, targets: dict, playbook: d
console(json.dumps(responses_list), True)

async def main():
'''
Main function where the program starts. Place from which all comands originate (eventually).
'''

parser = argparse.ArgumentParser(description="Process command-line arguments")
parser.add_argument("-pb", "--playbook", type=str, help="Path to the playbook yaml file.", required=True)
parser.add_argument("-pb", "--meshbook", type=str, help="Path to the meshbook yaml file.", required=True)

parser.add_argument("-oc", "--oscategories", type=str, help="Path to the Operating System categories JSON file.", required=False, default="./os_categories.json")
parser.add_argument("--conf", type=str, help="Path for the API configuration file (default: ./meshcentral.conf).", required=False, default="./meshcentral.conf")
parser.add_argument("--nograce", action="store_true", help="Disable the grace 3 seconds before running the playbook.", required=False)
parser.add_argument("--nograce", action="store_true", help="Disable the grace 3 seconds before running the meshbook.", required=False)
parser.add_argument("-i", "--indent", action="store_true", help="Use an JSON indentation of 4 when this flag is passed.", required=False)
parser.add_argument("-s", "--silent", action="store_true", help="Suppress terminal output", required=False)

Expand All @@ -238,51 +303,52 @@ async def main():
with open(local_categories_file, "r") as file:
os_categories = json.load(file)

credentials, playbook = await asyncio.gather(
credentials, meshbook = await asyncio.gather(
(load_config()),
(compile_book(args.playbook))
(compile_book(args.meshbook))
)

'''
The following section mainly displays used variables and first steps of the program to the console.
'''

console(("-" * 40))
console("Playbook: " + args.playbook)
console("Operating System Categorisation file: " + args.oscategories)
console("Congiguration file: " + args.conf)
if "device" in playbook:
console("Target device: " + str(playbook["device"]))
console("meshbook: " + text_color.yellow + args.meshbook)
console("Operating System Categorisation file: " + text_color.yellow + args.oscategories)
console("Configuration file: " + text_color.yellow + args.conf)
console("Target Operating System category given: " + text_color.yellow + meshbook["target_os"])
if "device" in meshbook:
console("Target device: " + text_color.yellow + str(meshbook["device"]))

elif "group" in playbook:
console("Target group: " + str(playbook["group"]))
elif "group" in meshbook:
console("Target group: " + text_color.yellow + str(meshbook["group"]))

console("Grace: " + str((not args.nograce))) # Negation of bool for correct explanation
console("Silent: False") # Can be pre-defined because if silent flag was passed then none of this would be printed.
console("Grace: " + text_color.yellow + str((not args.nograce))) # Negation of bool for correct explanation
console("Silent: " + text_color.yellow + "False") # Can be pre-defined because if silent flag was passed then none of this would be printed.

session = await init_connection(credentials)
console(("-" * 40))
console(("\x1B[3mTrying to load the MeshCentral account credential file...\x1B[0m"))
console(("\x1B[3mTrying to load the Playbook yaml file and compile it into something workable...\x1B[0m"))
console(("\x1B[3mTrying to load the Operating System categorisation JSON file...\x1B[0m"))
console(("\x1B[3mConnecting to MeshCentral and establish a session using variables from previous credential file.\x1B[0m"))

console(text_color.italic + "Trying to load the MeshCentral account credential file...")
console(text_color.italic + "Trying to load the meshbook yaml file and compile it into something workable...")
console(text_color.italic + "Trying to load the Operating System categorisation JSON file...")
console(text_color.italic + "Connecting to MeshCentral and establish a session using variables from previous credential file.")
console(text_color.italic + "Generating group list with nodes and reference the targets from that.")

'''
End of the main information displaying section.
'''

console(("\x1B[3mGenerating group list with nodes and reference the targets from that.\x1B[0m"))
group_list = await compile_group_list(session)
targets_list = await gather_targets(playbook, group_list, os_categories)
targets_list = await gather_targets(meshbook, group_list, os_categories)

if len(targets_list) == 0:
console(("\033[91mNo targets found or targets unreachable, quitting.\x1B[0m"), True)
console(text_color.red + "No targets found or targets unreachable, quitting.", True)
console(("-" * 40), True)

else:
console(("-" * 40))

match playbook:
match meshbook:
case {"group": candidate_target_name}:
target_name = candidate_target_name

Expand All @@ -295,22 +361,22 @@ async def main():
case {"devices": candidate_target_name}:
target_name = str(candidate_target_name)

console(("\033[91mExecuting playbook on the target(s): " + target_name + ".\x1B[0m"))
console(text_color.yellow + "Executing meshbook on the target(s): " + target_name + ".")

if not args.nograce:
console(("\033[91mInitiating grace-period...\x1B[0m"))
console(text_color.yellow + "Initiating grace-period...")

for x in range(3):
console(("\033[91m{}...\x1B[0m".format(x+1)))
for x in range(grace_period):
console(text_color.yellow + "{}...".format(x+1)) # Countdown!
await asyncio.sleep(1)

console(("-" * 40))
await execute_playbook(session, targets_list, playbook, group_list)
await execute_meshbook(session, targets_list, meshbook, group_list)

await session.close()

except OSError as message:
console(message, True)
console(text_color.red + message, True)

if __name__ == "__main__":
asyncio.run(main())