Add log archiving family #12

Merged: 9 commits, Jul 12, 2024
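For context, a minimal usage sketch of the new ArchivedRepeatFamily, modelled on the test added in this PR; the backup locations below are illustrative placeholders, not values from the changes:

import pyflow as pf

from wellies.log_archiving import ArchivedRepeatFamily

repeat = {
    "name": "YMD",
    "type": "RepeatDate",
    "begin": "2020-01-01",
    "end": "2020-01-03",
}

with pf.Suite("s") as suite:
    # With backup_root and ecfs_backup set, the family gains an exit hook
    # that copies job output to backup_root, plus archive_logs/loop_logs tasks.
    with ArchivedRepeatFamily(
        "main",
        repeat,
        backup_root="/path/to/log/backups",   # placeholder
        ecfs_backup="ec:/path/to/ecfs/logs",  # placeholder
    ):
        pf.Task("t1")

suite.generate_node()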
30 changes: 30 additions & 0 deletions tests/test_log_archiving.py
@@ -0,0 +1,30 @@
import pyflow as pf
import pytest

from wellies.log_archiving import ArchivedRepeatFamily


# The family should only gain the archive_logs/loop_logs tasks and an exit
# hook when a backup location is configured.
@pytest.mark.parametrize(
    "backup, hook, num_tasks",
    [
        ["backup", True, 4],
        [None, False, 2],
    ],
)
def test_log_archive(backup, hook, num_tasks):
    repeat = {
        "name": "YMD",
        "type": "RepeatDate",
        "begin": "2020-01-01",
        "end": "2020-01-03",
    }
    with pf.Suite("s") as suite:
        with ArchivedRepeatFamily("main", repeat, backup, backup) as main:
            pf.Task("t1")
            with pf.Family("f1"):
                pf.Task("t2")

    assert bool(main.exit_hook()) == hook
    suite.generate_node()
    tasks = suite.all_tasks
    assert len(tasks) == num_tasks
134 changes: 134 additions & 0 deletions wellies/log_archiving.py
@@ -0,0 +1,134 @@
import textwrap

import pyflow as pf


def repeat_factory(options):
    if options["type"] == "RepeatDate":
        return pf.RepeatDate(
            options["name"],
            options["begin"],
            options["end"],
            options.get("step", 1),
        )
    else:
        raise ValueError(f"Unknown repeat type: {options['type']}")


class ArchivedRepeatFamily(pf.Family):

    def __init__(
        self,
        name: str,
        repeat: dict,
        backup_root: str = None,
        ecfs_backup: str = None,
        **kwargs,
    ):
        self.backup_root = backup_root or None
        self._added_log_tasks = False
        variables = kwargs.pop("variables", {})
        if self.backup_root:
            if not ecfs_backup:
                raise ValueError(
                    "ecfs_backup must be provided if backup_root is provided"
                )
            # Exposed to the archiving tasks' scripts and to the exit hook.
            variables["LOGS_BACKUP_ROOT"] = self.backup_root
            variables["ECFS_BACKUP"] = ecfs_backup
        exit_hooks = kwargs.pop("exit_hook", [])
        exit_hooks.append(self.exit_hook())
        super().__init__(
            name=name,
            exit_hook=exit_hooks,
            variables=variables,
            **kwargs,
        )

        with self:
            self.repeat_attr = repeat_factory(repeat)

    def exit_hook(self):
        if not self.backup_root:
            return None
        # The hook remaps the job script path from the ECF_HOME tree to the
        # ECF_OUT tree, takes the directory holding the job output and copies
        # both files to the matching directory under LOGS_BACKUP_ROOT.  The
        # doubled %% below is a literal % once ecFlow has preprocessed the
        # script.
        return textwrap.dedent(
            """
            JOBOUT=%ECF_JOBOUT%
            JOB=%ECF_JOB%
            ECF_OUT=%ECF_OUT%
            ECF_HOME=%ECF_HOME%
            BACKUP_ROOT=%LOGS_BACKUP_ROOT%

            JOB=$(echo $JOB | sed -e "s:$ECF_HOME:$ECF_OUT:")
            JOBDIR=$(echo ${JOBOUT%%/*})
            BACKUP_DIR=$(echo $JOBDIR | sed -e s:$ECF_OUT:$BACKUP_ROOT:)
            if [[ $BACKUP_DIR != "" ]] && [[ $BACKUP_DIR != $JOBDIR ]]
            then
                mkdir -p $BACKUP_DIR
                cp $JOBOUT $BACKUP_DIR/.
                cp $JOB $BACKUP_DIR/.
            fi
            """
        )

    def _loop_task(self):
        # Moves the current log directory aside, suffixed with the current
        # repeat value, so the next iteration starts with a fresh directory.
        script = textwrap.dedent(
            f"""
            dir=$LOGS_BACKUP_ROOT/$SUITE/$FAMILY
            dir_old=${{dir}}.${self.repeat_attr.name}
            [[ -d $dir ]] && mv $dir $dir_old
            """
        )
        return pf.Task(
            name="loop_logs",
            script=[script],
        )

    def _archive_task(self):
        # Tars every per-repeat log directory older than the current repeat
        # value, copies the archive to ECFS with ecp and removes the originals.
        script = textwrap.dedent(
            f"""
            dir=$LOGS_BACKUP_ROOT/$SUITE/$FAMILY
            dir_tar=$LOGS_BACKUP_ROOT/$SUITE

            if [[ -d $dir_tar ]]; then
                cd $dir_tar

                for log in $(ls -d ${{FAMILY}}.*); do
                    REPEAT_TO_TAR=$(echo $log | awk -F'.' '{{print $NF}}')
                    if [[ $REPEAT_TO_TAR -lt ${self.repeat_attr.name} ]]; then
                        TAR_FILE=${{FAMILY}}_${{REPEAT_TO_TAR}}.tar.gz
                        tar -czvf $TAR_FILE $log
                        chmod 644 $TAR_FILE
                        ecp -p $TAR_FILE ${{ECFS_BACKUP}}/$TAR_FILE

                        rm -rf $log
                        rm -rf $TAR_FILE
                    fi
                done
            fi
            """
        )
        return pf.Task(
            name="archive_logs",
            script=[script],
        )

    def generate_node(self):
        """
        Before generating the node, we need to add the log archiving
        tasks.
        """
        if not self._added_log_tasks:
            if self.backup_root:
                # loop_logs must run last: it triggers on the completion of
                # every pre-existing child of the family and of archive_logs.
                trigger = None
                for child in self.executable_children:
                    if trigger is None:
                        trigger = child.complete
                    else:
                        trigger = trigger & child.complete
                with self:
                    archive = self._archive_task()
                    loop = self._loop_task()
                    if trigger is None:
                        loop.triggers = archive.complete
                    else:
                        loop.triggers = trigger & archive.complete
            self._added_log_tasks = True

        return super().generate_node()
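For reference, a rough picture of the family this class produces when a backup root is configured, using the tasks from the test above; this is an illustrative sketch, not generated output:

# main  (RepeatDate YMD)
# ├── t1
# ├── f1 / t2
# ├── archive_logs   no trigger; tars log directories from earlier repeat
# │                  values and ships them to ECFS
# └── loop_logs      triggered by completion of t1, f1 and archive_logs;
#                    moves the current log directory aside for the next cycle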