diff --git a/README.md b/README.md
index 5f3ffc8..a01d6a7 100644
--- a/README.md
+++ b/README.md
@@ -10,12 +10,11 @@ Deployed with Ansible.
 
 If you find new miners or other malicious stuff, please add those signatures to
 our [`intergalactic-most-wanted-list`](https://github.com/usegalaxy-eu/intergalactic-most-wanted-list).
 
-## Prerequisites
-This role expect several requirements.
-1. [galaxy_jwd.py](https://github.com/usegalaxy-eu/infrastructure-playbook/blob/master/roles/usegalaxy-eu.bashrc/files/galaxy_jwd.py) must exist in the directory of `walle_script_location`
-2. Python 3
-2. the python packages imported in `walle.py` and `galaxy_jwd.py` must be present
-3. Following environment vars must be set:
+## Host machine requirements
+
+1. Python 3 on the host machine
+1. The Python dependencies imported in `walle.py` and `galaxy_jwd.py` must be available (perhaps set `walle_virtualenv` for this)
+1. The following environment variables must be set:
   - `GALAXY_CONFIG_FILE`: Path to the galaxy.yml file
   - `PGDATABASE`: Name of the Galaxy database
   - `PGUSER`: Galaxy database user
@@ -30,11 +29,24 @@ This role expect several requirements.
   - `WALLE_USER_DELETION_SUBJECT`: The message's subject line.
 
 [^1]: You should always run 'dangerous' jobs in embedded Pulsar.
+
 ## Ansible
-For ansible details consult `defaults/main.yml`, it should be pretty much self-explanatory.
+
+1. Consult `defaults/main.yml` for available walle variables
+1. You can overwrite or append to `walle_env_vars` by defining `walle_extra_env_vars`
+   in your playbook:
+   ```yml
+   # These values will replace the defaults
+   walle_extra_env_vars:
+     - key: GALAXY_PULSAR_APP_CONF
+       value: "{{ galaxy_config_dir }}/my_pulsar_app.yml"
+     - key: GXADMIN_PATH
+       value: /usr/bin/gxadmin
+   ```
+
 ## Usage
-From the tools help command:
+From the tool's help command:
 ~~~
 usage: WALL·E [-h] [--chunksize CHUNKSIZE] [--min-size MIN_SIZE_MB] [--max-size MAX_SIZE_MB] [--since SINCE] [--tool TOOL] [-v] [-i] [--delete-user MIN_SEVERITY]
@@ -86,6 +98,7 @@ optional arguments:
   -h, --help            show this help message and exit
   --chunksize CHUNKSIZE
                         Chunksize in MiB for hashing the files in JWDs, defaults to 100 MiB
+  --kill                Kill malicious jobs with gxadmin.
   --min-size MIN_SIZE_MB
                         Minimum filesize in MB to limit the files to scan.
                        The check will be skipped if value is 0 (default)
  --max-size MAX_SIZE_MB
diff --git a/defaults/main.yml b/defaults/main.yml
index 71232fd..afff269 100644
--- a/defaults/main.yml
+++ b/defaults/main.yml
@@ -3,6 +3,7 @@
 walle_malware_database_location: /etc/walle
 walle_malware_database_version: main
 walle_malware_repo: https://github.com/usegalaxy-eu/intergalactic-most-wanted-list.git
+walle_malware_database_force_update: false # Local changes will be overwritten
 
 walle_pgpass_file: "/home/{{ walle_user_name }}/.pgpass"
 walle_bashrc: /opt/galaxy/.bashrc
@@ -10,6 +11,7 @@ walle_python: /usr/bin/python
 walle_database_file: checksums.yml
 walle_log_dir: /var/log/walle
 walle_script_location: /usr/local/bin/walle.py
+#walle_virtualenv:
 
 # Script args
 walle_filesize_min: 0
@@ -20,16 +22,26 @@ walle_envs_database:
     value: "{{ walle_malware_database_location }}/{{ walle_database_file }}"
   - key: PGPASSFILE
     value: "{{ walle_pgpass_file }}"
+  - key: GALAXY_CONFIG_FILE
+    value: "{{ galaxy_config_dir }}/galaxy.yml"
+  - key: GALAXY_LOG_DIR
+    value: "{{ galaxy_log_dir }}"
   - key: PGHOST
     value: 127.0.0.1
   - key: PGUSER
     value: galaxy
   - key: PGDATABASE
     value: galaxy
-  - key: GALAXY_CONFIG_FILE
-    value: "{{ galaxy_config_dir }}/galaxy.yml"
   - key: GALAXY_PULSAR_APP_CONF
     value: "{{ galaxy_config_dir }}/pulsar_app.yml"
+  - key: GXADMIN_PATH
+    value: /usr/local/bin/gxadmin
+
+# These will be added to the default env vars - you can override
+# walle_envs_database by defining the same key in walle_extra_env_vars
+walle_extra_env_vars: []
+
+walle_env_vars: "{{ walle_envs_database + walle_extra_env_vars }}"
 
 # delete users when malware was found and malware severity reached walle_delete_threshold
 walle_delete_users: false
@@ -47,6 +59,7 @@ walle_envs_user_deletion:
 # walle_galaxy_url: #galaxy_hostname, no leading slash
 # walle_tool:
 walle_verbose: false
+walle_kill: false
 
 # Cron
 walle_cron_day: "*"
diff --git a/files/galaxy_jwd.py b/files/galaxy_jwd.py
new file mode 100644
index 0000000..9214e8a
--- /dev/null
+++ b/files/galaxy_jwd.py
@@ -0,0 +1,524 @@
+#!/usr/bin/env python
+"""Galaxy job's job working directory (JWD) script.
+
+Can get you the path of a JWD and can delete JWDs of failed jobs older than X
+days.
+"""
+
+import argparse
+import os
+import shutil
+import sys
+import textwrap
+from argparse import RawDescriptionHelpFormatter
+from datetime import datetime
+from pathlib import Path
+from typing import Optional, Tuple, Union
+from xml.dom.minidom import parse
+
+import psycopg2
+import yaml
+
+
+class SubcommandHelpFormatter(RawDescriptionHelpFormatter):
+    """Custom help formatter to hide argparse metavars."""
+
+    def _format_action(self, action):
+        """Removes the first line from subparsers."""
+        parts = super(RawDescriptionHelpFormatter, self)._format_action(action)
+        if action.nargs == argparse.PARSER:
+            parts = "\n".join(parts.split("\n")[1:])
+        return parts
+
+
+def main():
+    """Main function of the JWD script.
+
+    1. Can get you the path of a JWD
+    2. Can delete JWDs of failed jobs older than X days
+    """
+    parser = argparse.ArgumentParser(
+        prog="galaxy_jwd",
+        description=textwrap.dedent(
+            """
+            Get the JWD path of a given Galaxy job id or clean the JWDs of old failed jobs.
+
+            The following ENVs (same as gxadmin's) should be set:
+                GALAXY_CONFIG_FILE: Path to the galaxy.yml file
+                GALAXY_PULSAR_APP_CONF: Path to the pulsar_app.yml file
+                GALAXY_LOG_DIR: Path to the Galaxy log directory
+                PGDATABASE: Name of the Galaxy database
+                PGUSER: Galaxy database user
+                PGHOST: Galaxy database host
+
+            We also need a ~/.pgpass file (same as gxadmin's) in format:
+            <pg_host>:5432:*:<pg_user>:<pg_password>
+            """  # noqa: E501
+        ),
+        formatter_class=SubcommandHelpFormatter,
+    )
+    subparsers = parser.add_subparsers(
+        dest="operation",
+        title="operations",
+        help=None,
+        metavar="",
+    )
+
+    # Parser for the get subcommand
+    get_parser = subparsers.add_parser(
+        "get",
+        help="Get the JWD path of a given Galaxy job id",
+        epilog=textwrap.dedent(
+            """
+            example:
+            python galaxy_jwd.py get 12345678
+            """  # noqa: E501
+        ),
+        formatter_class=RawDescriptionHelpFormatter,
+    )
+    get_parser.add_argument(
+        "job_id",
+        help="Galaxy job id",
+    )
+
+    # Parser for the clean subcommand
+    clean_parser = subparsers.add_parser(
+        "clean",
+        help="Clean JWDs of failed jobs older than X days",
+        epilog=textwrap.dedent(
+            """
+            example (dry-run):
+            python galaxy_jwd.py clean --dry_run --days 5
+
+            example (no dry-run):
+            python galaxy_jwd.py clean --no_dry_run --days 5
+            """  # noqa: E501
+        ),
+        formatter_class=RawDescriptionHelpFormatter,
+    )
+    dry_run_group = clean_parser.add_mutually_exclusive_group(required=True)
+    dry_run_group.add_argument(
+        "--dry_run",
+        help="Dry run (prints the JWDs that would be deleted)",
+        action="store_true",
+    )
+    dry_run_group.add_argument(
+        "--no_dry_run",
+        help="No dry run (deletes the JWDs)",
+        action="store_true",
+    )
+    clean_parser.add_argument(
+        "--days",
+        help=(
+            "Minimum age of jobs (in days) to be considered for deletion "
+            "(default: 5)"
+        ),
+        default=5,
+    )
+
+    args = parser.parse_args(args=None if sys.argv[1:] else ["--help"])
+
+    # Check if environment variables are set
+    if not os.environ.get("GALAXY_CONFIG_FILE"):
+        raise ValueError("Please set ENV GALAXY_CONFIG_FILE")
+    galaxy_config_file = os.environ.get("GALAXY_CONFIG_FILE").strip()
+
+    # Check if the given galaxy.yml file exists
+    if not os.path.isfile(galaxy_config_file):
+        raise ValueError(
+            f"The given galaxy.yml file {galaxy_config_file} does not exist"
+        )
+
+    # pulsar_app.yml file path for pulsar_embedded runner
+    if not os.environ.get("GALAXY_PULSAR_APP_CONF"):
+        raise ValueError("Please set ENV GALAXY_PULSAR_APP_CONF")
+    galaxy_pulsar_app_conf = os.environ.get("GALAXY_PULSAR_APP_CONF").strip()
+
+    if not os.environ.get("GALAXY_LOG_DIR"):
+        raise ValueError("Please set ENV GALAXY_LOG_DIR")
+    galaxy_log_dir = os.environ.get("GALAXY_LOG_DIR").strip()
+
+    if not os.environ.get("PGDATABASE"):
+        raise ValueError("Please set ENV PGDATABASE")
+    db_name = os.environ.get("PGDATABASE").strip()
+
+    if not os.environ.get("PGUSER"):
+        raise ValueError("Please set ENV PGUSER")
+    db_user = os.environ.get("PGUSER").strip()
+
+    if not os.environ.get("PGHOST"):
+        raise ValueError("Please set ENV PGHOST")
+    db_host = os.environ.get("PGHOST").strip()
+
+    # Check if ~/.pgpass file exists and is not empty
+    if (
+        not os.path.isfile(os.path.expanduser("~/.pgpass"))
+        or os.stat(os.path.expanduser("~/.pgpass")).st_size == 0
+    ):
+        raise ValueError(
+            "Please create a ~/.pgpass file in format: "
+            "<pg_host>:5432:*:<pg_user>:<pg_password>"
+        )
+    db_password = extract_password_from_pgpass(
+        pgpass_file=os.path.expanduser("~/.pgpass")
+    )
+
+    object_store_conf = get_object_store_conf_path(galaxy_config_file)
+    backends = parse_object_store(object_store_conf)
+
+    # Add pulsar staging directory (runner: pulsar_embedded) to backends
+    backends["pulsar_embedded"] = get_pulsar_staging_dir(galaxy_pulsar_app_conf)
+
+    # Connect to Galaxy database
+    db = Database(
+        dbname=db_name,
+        dbuser=db_user,
+        dbhost=db_host,
+        dbpassword=db_password,
+    )
+
+    # For the get subcommand
+    if args.operation == "get":
+        job_id = args.job_id
+        object_store_id, job_runner_name = db.get_job_info(job_id)
+        jwd_path = decode_path(job_id, [object_store_id], backends, job_runner_name)
+
+        # Check
+        if jwd_path:
+            print(jwd_path)
+        else:
+            print(
+                f"ERROR: Job working directory (of {job_id}) does not exist",
+                file=sys.stderr,
+            )
+            sys.exit(1)
+
+    # For the clean subcommand
+    if args.operation == "clean":
+        # Check if the given Galaxy log directory exists
+        if not os.path.isdir(galaxy_log_dir):
+            raise ValueError(
+                f"The given Galaxy log directory {galaxy_log_dir} does not exist"
+            )
+
+        # Set variables
+        dry_run = args.dry_run
+        days = args.days
+        jwd_cleanup_log = (
+            f"{galaxy_log_dir}/"
+            f"jwd_cleanup_{datetime.now().strftime('%d_%m_%Y-%I_%M_%S')}.log"
+        )
+        failed_jobs = db.get_failed_jobs(days=days)
+
+        # Delete JWD folders if dry_run is False
+        # Log the folders that will be deleted
+        if not dry_run:
+            with open(jwd_cleanup_log, "w") as jwd_log:
+                jwd_log.write(
+                    "The following job working directories (JWDs) belonging "
+                    "to the failed jobs are deleted\nJob id: JWD path\n"
+                )
+                for job_id, metadata in failed_jobs.items():
+                    # Delete JWD folders older than X days
+                    jwd_path = decode_path(job_id, metadata, backends)
+                    if jwd_path:
+                        delete_jwd(jwd_path)
+                        jwd_log.write(f"{job_id}: {jwd_path}\n")
+        else:
+            # Print folders of JWDs of failed jobs older than X days
+            for job_id, metadata in failed_jobs.items():
+                jwd_path = decode_path(job_id, metadata, backends)
+                if jwd_path:
+                    print(f"{job_id}: {jwd_path}")
+
+
+def extract_password_from_pgpass(pgpass_file: Union[str, Path]) -> Union[str, None]:
+    """Extract the password from the ~/.pgpass file.
+
+    The ~/.pgpass file should have the following format:
+    <pg_host>:5432:*:<pg_user>:<pg_password>
+
+    Args:
+        pgpass_file: Path to the ~/.pgpass file.
+
+    Returns:
+        Password for the given pg_host.
+
+    Raises:
+        ValueError: The ~/.pgpass file cannot be parsed.
+    """
+    pgpass_format = "<pg_host>:5432:*:<pg_user>:<pg_password>"
+    with open(pgpass_file, "r") as pgpass:
+        for line in pgpass:
+            if line.startswith(os.environ.get("PGHOST")):
+                return line.split(":")[4].strip()
+        else:
+            raise ValueError(
+                f"Please add the password for '{os.environ.get('PGHOST')}' "
+                f"to the ~/.pgpass file in format: {pgpass_format}"
+            )
+
+
+def get_object_store_conf_path(galaxy_config_file: Union[str, Path]) -> Union[str, None]:
+    """Get the path to the object_store_conf.xml file.
+
+    Args:
+        galaxy_config_file: Path to the galaxy.yml file.
+
+    Returns:
+        Path to the object_store_conf.xml file.
+
+    Raises:
+        ValueError: The object store configuration file specified in the
+            Galaxy configuration does not exist.
+    """
+    object_store_conf = ""
+    with open(galaxy_config_file, "r") as config:
+        for line in config:
+            if line.strip().startswith("object_store_config_file"):
+                object_store_conf = line.split(":")[1].strip()
+
+    # Check if the object_store_conf.xml file exists
+    if not os.path.isfile(object_store_conf):
+        raise ValueError(f"{object_store_conf} does not exist")
+
+    return object_store_conf
+
+
+def parse_object_store(object_store_conf: str) -> dict:
+    """Get the path of type 'job_work' from the extra_dir's for each backend.
+
+    Args:
+        object_store_conf: Path to the object_store_conf.xml/yml file.
+
+    Returns:
+        Dictionary of backend id and path of type 'job_work'.
+    """
+    if object_store_conf.endswith(".xml"):
+        return parse_object_store_xml(object_store_conf)
+    if object_store_conf.split(".")[-1] in ("yml", "yaml"):
+        return parse_object_store_yaml(object_store_conf)
+    raise ValueError("Invalid object store configuration file extension")
+
+
+def parse_object_store_xml(object_store_conf: str) -> dict:
+    dom = parse(object_store_conf)
+    backends = {}
+    for backend in dom.getElementsByTagName("backend"):
+        backend_id = backend.getAttribute("id")
+        backends[backend_id] = {}
+        # Get the extra_dir's path for each backend if type is "job_work"
+        for extra_dir in backend.getElementsByTagName("extra_dir"):
+            if extra_dir.getAttribute("type") == "job_work":
+                backends[backend_id] = extra_dir.getAttribute("path")
+    return backends
+
+
+def parse_object_store_yaml(object_store_conf: str) -> dict:
+    with open(object_store_conf, "r") as f:
+        data = yaml.safe_load(f)
+    backends = {}
+    for backend in data["backends"]:
+        backend_id = backend["id"]
+        backends[backend_id] = {}
+        # Get the extra_dir's path for each backend if type is "job_work"
+        if "extra_dirs" in backend:
+            for extra_dir in backend["extra_dirs"]:
+                if extra_dir.get("type") == "job_work":
+                    backends[backend_id] = extra_dir["path"]
+    return backends
+
+
+def get_pulsar_staging_dir(galaxy_pulsar_app_conf: Union[str, Path]) -> str:
+    """Get the path to the pulsar staging directory.
+
+    Args:
+        galaxy_pulsar_app_conf: Path to the pulsar_app.yml file.
+
+    Returns:
+        Path to the pulsar staging directory.
+
+    Raises:
+        ValueError: The Pulsar staging directory does not exist.
+    """
+    pulsar_staging_dir = ""
+    with open(galaxy_pulsar_app_conf, "r") as config:
+        yaml_config = yaml.safe_load(config)
+        pulsar_staging_dir = yaml_config["staging_directory"]
+
+    # Check if the pulsar staging directory exists
+    if not os.path.isdir(pulsar_staging_dir):
+        raise ValueError(
+            f"Pulsar staging directory '{pulsar_staging_dir}' does not exist"
+        )
+
+    return pulsar_staging_dir
+
+
+def decode_path(
+    job_id: Union[int, str],
+    metadata: list,
+    backends_dict: dict,
+    job_runner_name: Optional[str] = None,
+) -> Union[str, None]:
+    """Decode the path of JWDs and check if the path exists.
+
+    Args:
+        job_id: Job id.
+        metadata: List of object_store_id and update_time.
+        backends_dict: Dictionary of backend id and path of type 'job_work'.
+        job_runner_name: Name of the job runner. Defaults to None.
+
+    Returns:
+        Path to the JWD.
+    """
+    job_id = str(job_id)
+
+    # Check if object_store_id exists in our object store config
+    if metadata[0] not in backends_dict.keys():
+        raise ValueError(
+            f"Object store id '{metadata[0]}' does not exist in the "
+            f"object_store_conf.xml file."
+        )
+
+    # Pulsar embedded jobs use the staging directory, and this has a different
+    # path structure
+    if (job_runner_name or "").startswith("pulsar_embedded"):
+        jwd_path = f"{backends_dict['pulsar_embedded']}/{job_id}"
+    else:
+        jwd_path = (
+            f"{backends_dict[metadata[0]]}/" f"0{job_id[0:2]}/{job_id[2:5]}/{job_id}"
+        )
+
+    # Validate that the path is a JWD
+    # It is a JWD if the following conditions are true:
+    # 1. Check if tool_script.sh exists
+    # 2. Check if directories 'inputs', and 'outputs' exist
+    # 3. Additionally, we can also try and find the file
+    # '__instrument_core_epoch_end' and compare the timestamp in that with the
+    # 'update_time' (metadata[1]) of the job.
+    if (
+        os.path.exists(jwd_path)
+        and os.path.exists(f"{jwd_path}/tool_script.sh")
+        and os.path.exists(f"{jwd_path}/inputs")
+        and os.path.exists(f"{jwd_path}/outputs")
+    ):
+        return jwd_path
+    else:
+        return None
+
+
+def delete_jwd(jwd_path: str) -> None:
+    """Delete JWD folder and all its contents.
+
+    Args:
+        jwd_path: Path to the JWD folder.
+    """
+    try:
+        print(f"Deleting JWD: {jwd_path}")
+        shutil.rmtree(jwd_path)
+    except OSError as e:
+        print(f"Error deleting JWD: {jwd_path} : {e.strerror}")
+
+
+class Database:
+    """Class to connect to the database and query DB."""
+
+    def __init__(
+        self,
+        dbname: str,
+        dbuser: str,
+        dbhost: str,
+        dbpassword: str,
+    ) -> None:
+        """Create a connection to the Galaxy database.
+
+        Args:
+            dbname: Name of the database.
+            dbuser: Name of the database user.
+            dbhost: Hostname of the database.
+            dbpassword: Password of the database user.
+        """
+        try:
+            self.conn = psycopg2.connect(
+                dbname=dbname, user=dbuser, host=dbhost, password=dbpassword
+            )
+        except psycopg2.OperationalError as e:
+            print(f"Unable to connect to database: {e}")
+
+    def get_failed_jobs(self, days: int) -> dict:
+        """Get failed jobs from DB.
+
+        Args:
+            days: Minimum age of failed jobs (in days).
+
+        Returns:
+            Dictionary with job_id as key and a list of
+            [object_store_id, update_time] as value.
+        """
+        cur = self.conn.cursor()
+        cur.execute(
+            f"""
+            SELECT id, object_store_id, update_time
+            FROM job
+            WHERE state = 'error'
+                AND update_time IS NOT NULL
+                AND object_store_id IS NOT NULL
+                AND update_time <= NOW() - INTERVAL '{days} days'
+            """
+        )
+        failed_jobs = cur.fetchall()
+        cur.close()
+        self.conn.close()
+
+        # Create a dictionary with job_id as key and object_store_id, and
+        # update_time as values
+        failed_jobs_dict = {}
+        for job_id, object_store_id, update_time in failed_jobs:
+            failed_jobs_dict[job_id] = [object_store_id, update_time]
+
+        if not failed_jobs_dict:
+            print(
+                f"No failed jobs older than {days} days found.",
+                file=sys.stderr,
+            )
+            sys.exit(1)
+
+        return failed_jobs_dict
+
+    def get_job_info(self, job_id: int) -> Tuple[str, str]:
+        """Get object_store_id and job_runner_name for a given job id.
+
+        Args:
+            job_id: Job id.
+
+        Returns:
+            object_store_id: Object store id.
+            job_runner_name: Job runner name.
+        """
+        cur = self.conn.cursor()
+        cur.execute(
+            f"""
+            SELECT object_store_id, job_runner_name
+            FROM job
+            WHERE id = '{job_id}' AND object_store_id IS NOT NULL
+                AND job_runner_name IS NOT NULL
+            """
+        )
+        object_store_id, job_runner_name = cur.fetchone()
+        cur.close()
+        self.conn.close()
+
+        if not object_store_id:
+            print(
+                f"Object store id and/or the job runner name for the job "
+                f"'{job_id}' was not found in the database",
+                file=sys.stderr,
+            )
+            sys.exit(1)
+
+        return object_store_id, job_runner_name
+
+
+if __name__ == "__main__":
+    main()
diff --git a/templates/walle.py b/files/walle.py
similarity index 89%
rename from templates/walle.py
rename to files/walle.py
index 4fd981a..9f5433f 100644
--- a/templates/walle.py
+++ b/files/walle.py
@@ -1,14 +1,22 @@
 #!/usr/bin/env python
+"""Keep your system clean!
+
+A command line script that iterates over the currently running jobs, stops
+them, and logs the user when a file in the JWD matches a list of known
+hashes.
+""" + import argparse import hashlib +import logging import os import pathlib +import subprocess import sys import time import zlib -import logging -from typing import Dict +from typing import Dict, List, Union import galaxy_jwd import requests @@ -36,6 +44,7 @@ datefmt="%Y-%m-%d %H:%M", ) logger = logging.getLogger(__name__) +GXADMIN_PATH = os.getenv("GXADMIN_PATH", "/usr/local/bin/gxadmin") def convert_arg_to_byte(mb: str) -> int: @@ -151,6 +160,12 @@ def make_parser() -> argparse.ArgumentParser: default=100, ) + my_parser.add_argument( + "--kill", + action="store_true", + help="Kill malicious jobs with gxadmin.", + ) + my_parser.add_argument( "--min-size", metavar="MIN_SIZE_MB", @@ -296,7 +311,7 @@ def check_if_jwd_exists_and_get_files(self, args: argparse.Namespace) -> bool: return False def collect_files_in_a_directory( - self, args: argparse.Namespace, dirpath: str, filenames: list[str] + self, args: argparse.Namespace, dirpath: str, filenames: List[str] ): for filename in filenames: file = pathlib.Path(os.path.join(dirpath, filename)) @@ -404,23 +419,29 @@ def load_malware_lib_from_env(malware_file: pathlib.Path) -> dict: def digest_file_crc32(chunksize: int, path: pathlib.Path) -> int: crc32 = 0 - with open(path, "rb") as specimen: - while chunk := specimen.read(chunksize): - crc32 = zlib.crc32(chunk, crc32) + try: + with open(path, "rb") as specimen: + while chunk := specimen.read(chunksize): + crc32 = zlib.crc32(chunk, crc32) + except PermissionError: + logger.warning(f"Permission denied for file: {path}") return crc32 def digest_file_sha1(chunksize: int, path: pathlib.Path) -> str: sha1 = hashlib.sha1() - with open(path, "rb") as specimen: - while chunk := specimen.read(chunksize): - sha1.update(chunk) + try: + with open(path, "rb") as specimen: + while chunk := specimen.read(chunksize): + sha1.update(chunk) + except PermissionError: + logger.warning(f"Permission denied for file: {path}") return sha1.hexdigest() def scan_file_for_malware( - chunksize: int, file: pathlib.Path, lib: list[Malware] -) -> list[Malware]: + chunksize: int, file: pathlib.Path, lib: List[Malware] +) -> List[Malware]: """ Returning a list of Malware, because it could potentially happen (even if it should not), @@ -436,13 +457,22 @@ def scan_file_for_malware( """ matches = [] crc32 = digest_file_crc32(chunksize, file) + logger.debug(f"File {file} calculated CRC32: {crc32}") sha1 = None for malware in lib: if malware.crc32 == crc32: + logger.debug( + f"File {file} CRC32 matches {malware.program} {malware.version}" + ) if sha1 is None: sha1 = digest_file_sha1(chunksize, file) + logger.debug(f"File {file} calculated SHA-1: {sha1}") if malware.sha1 == sha1: matches.append(malware) + else: + logger.debug( + f"File {file} SHA1 does not match {malware.program} {malware.version}" + ) return matches @@ -507,7 +537,13 @@ def get_jwd_path(self, job: Job) -> str: class RunningJobDatabase(galaxy_jwd.Database): - def __init__(self, db_name: str, db_host=None, db_user=None, db_password=None): + def __init__( + self, + db_name: str, + db_host: Union[str, None] = None, + db_user: Union[str, None] = None, + db_password: Union[str, None] = None, + ): super().__init__( db_name, db_user, @@ -515,7 +551,7 @@ def __init__(self, db_name: str, db_host=None, db_user=None, db_password=None): db_password, ) - def get_running_jobs(self, tool: str) -> list[Job]: + def get_running_jobs(self, tool: str) -> List[Job]: query = """ SELECT j.user_id, u.username, u.email, j.tool_id, j.id, j.job_runner_external_id, j.job_runner_name, 
                j.object_store_id
@@ -535,8 +571,10 @@ def get_running_jobs(self, tool: str) -> list[Job]:
         self.conn.close()
         # Create a dictionary with job_id as key and object_store_id, and
         # update_time as values
-        if not running_jobs:
-            logger.warning("No running jobs with tool_id like '%s' found.", tool)
+        if running_jobs:
+            logger.debug(f"Found {len(running_jobs)} running jobs matching '{tool}'")
+        else:
+            logger.debug(f"No running jobs with tool_id like '{tool}' found.")
             sys.exit(0)
         running_jobs_list = []
         for (
@@ -564,6 +602,36 @@ def get_running_jobs(self, tool: str) -> list[Job]:
         return running_jobs_list
 
 
+def kill_job(job: Job):
+    """Attempt to kill a job by its galaxy_id using gxadmin."""
+    logger.info(f"Failing malicious job: {job.galaxy_id}")
+    serial_args = [
+        [
+            GXADMIN_PATH,
+            "mutate",
+            "fail-job",
+            str(job.galaxy_id),
+            "--commit",
+        ],
+        [
+            GXADMIN_PATH,
+            "mutate",
+            "fail-terminal-datasets",
+            "--commit",
+        ],
+    ]
+    for args in serial_args:
+        logger.debug(f"COMMAND: {' '.join(args)}")
+        try:
+            result = subprocess.run(args, check=True, capture_output=True, text=True)
+            if result.stdout:
+                logger.debug(f"COMMAND STDOUT:\n{result.stdout}")
+            if result.stderr:
+                logger.debug(f"COMMAND STDERR:\n{result.stderr}")
+        except subprocess.CalledProcessError as e:
+            logger.error(f"Error failing job {job.galaxy_id}:\n{e}")
+
+
 def evaluate_match_for_deletion(
     job: Job,
     match: Malware,
@@ -571,7 +639,8 @@
     severity: Severity,
 ) -> UserIdMail:
     """
-    If in verbose mode, print detailed information for every match. No updates on 'reported' needed.
+    If in verbose mode, print detailed information for every match. No updates
+    on 'reported' needed.
""" if job.user_id not in delete_users and (severity <= match.severity): return {job.user_id: job.user_mail} @@ -750,6 +821,8 @@ def main(): reported_users.update(case.report_according_to_verbosity()) if args.delete_user: delete_users.update(case.mark_user_for_deletion(args.delete_user)) + if matching_malware and args.kill: + kill_job(job) # Deletes users at the end, to report all malicious jobs of a user if args.delete_user: api = GalaxyAPI( diff --git a/tasks/main.yml b/tasks/main.yml index 42837fe..abcd2d0 100644 --- a/tasks/main.yml +++ b/tasks/main.yml @@ -11,7 +11,7 @@ - "{{ walle_malware_database_location }}" - name: Deploy (WallE) - ansible.builtin.template: + ansible.builtin.copy: src: walle.py dest: "{{ walle_script_location }}" owner: root @@ -23,14 +23,20 @@ repo: "{{ walle_malware_repo }}" dest: "{{ walle_malware_database_location }}" version: "{{ walle_malware_database_version }}" + force: "{{ walle_malware_database_force_update }}" + ignore_errors: "{{ walle_malware_database_force_update }}" +- name: Touch walle .bashrc file + ansible.builtin.file: + path: "{{ walle_bashrc }}" + state: touch -- name: Add env variable for database (WallE) +- name: Add env variables to WallE .bashrc ansible.builtin.lineinfile: path: "{{ walle_bashrc }}" regexp: "^export {{ item.key }}=" line: 'export {{ item.key }}="{{ item.value }}"' - with_items: "{{ walle_envs_database }}" + with_items: "{{ walle_env_vars }}" - name: Add env variables for user deletion (WallE) ansible.builtin.lineinfile: @@ -40,6 +46,14 @@ with_items: "{{ walle_envs_user_deletion }}" when: walle_delete_users +- name: Copy galaxy_jwd.py script to walle_script_location + ansible.builtin.copy: + src: "galaxy_jwd.py" + dest: "{{ walle_script_location|dirname }}/galaxy_jwd.py" + mode: 0755 + owner: "{{ walle_user_name }}" + group: "{{ walle_user_group }}" + - name: Create logfile (WallE) ansible.builtin.file: state: touch @@ -74,7 +88,7 @@ minute: "{{ walle_cron_minute }}" user: "{{ walle_user_name }}" job: > - source {{ walle_bashrc }}; + BASH_ENV={{ walle_bashrc }} bash -c " {% if walle_virtualenv %} source {{ walle_virtualenv }}/bin/activate; {{ walle_virtualenv }}/bin/python {% else %} {{ walle_python }} {% endif %} @@ -84,7 +98,8 @@ {{ walle_filesize_min }} {% endif %} {% if walle_filesize_max %} --max-size {{ walle_filesize_max }} {% endif %} {% if walle_since_hours %} --since {{ walle_since_hours }} {% endif %} - {% if walle_verbose %} -v {% endif %} + {% if walle_verbose %} --verbose {% endif %} {% if walle_delete_users %} --delete-user {{ walle_delete_threshold }} {% endif %} - >> {{ walle_log_dir }}/walle.log 2>&1 + {% if walle_kill %} --kill {% endif %} + >> {{ walle_log_dir }}/walle.log 2>&1"