Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation for hook to define phases of playback during git commands #122

Merged
merged 10 commits into from
Jan 31, 2024
94 changes: 82 additions & 12 deletions perfact/zodbsync/subcommand.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os

import filelock
import json

from .helpers import Namespace

Expand Down Expand Up @@ -150,12 +151,80 @@ def gitexec(func):
- play back changed objects (diff between old and new HEAD)
- unstash
"""
def _playback_paths(self, paths):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this embedded function? Could we pull this out of def gitexec?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That shouldn't be a problem. I think this was due to the iterative development: it basically started inside the wrapper, then I soon moved it out but kept it contained inside of gitexec.

paths = self.sync.prepare_paths(paths)
dryrun = self.args.dry_run

playback_hook = self.config.get('playback_hook', None)
if playback_hook and os.path.isfile(playback_hook):
proc = subprocess.Popen(
playback_hook, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
universal_newlines=True)
out, _ = proc.communicate(json.dumps({'paths': paths}))
returncode = proc.returncode
if returncode:
raise AssertionError(
"Error calling playback hook, returncode "
"{}, [[{}]] on {}".format(
returncode, playback_hook, out
)
)
phases = json.loads(out)
else:
phases = [{'name': 'playback', 'paths': paths}]
if self.config.get('run_after_playback', None):
phases[0]['cmd'] = self.config['run_after_playback']

for ix, phase in enumerate(phases):
phase_name = phase.get('name') or str(ix)
phase_cmd = phase.get('cmd')

self.sync.playback_paths(
paths=phase['paths'],
recurse=False,
override=True,
skip_errors=self.args.skip_errors,
dryrun=dryrun,
)

if not dryrun and phase_cmd and os.path.isfile(phase_cmd):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please turn this around by using a negated condition and `continue`.

self.logger.info(
'Calling phase %s, command: %s', phase_name, phase_cmd
)
proc = subprocess.Popen(
phase_cmd, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
universal_newlines=True)
out, _ = proc.communicate(json.dumps(
{'paths': phase['paths']}
))
returncode = proc.returncode

if returncode:
self.logger.error(
"Error during phase command %s, %s",
returncode, out
)
if sys.stdin.isatty():
print("Enter 'y' to continue, other to rollback")
res = input()
if res == 'y':
continue

raise AssertionError(
"Unrecoverable error in phase command"
)
else:
self.logger.info(out)

@SubCommand.with_lock
def wrapper(self, *args, **kwargs):
# Check for unstaged changes
self.check_repo()

try:
self.paths = []
func(self, *args, **kwargs)

# Fail and roll back for any of the markers of an interrupted
Expand All @@ -173,20 +242,10 @@ def wrapper(self, *args, **kwargs):
conflicts = files & set(self.unstaged_changes)
assert not conflicts, "Change in unstaged files, aborting"

# Strip site name from the start
files = [fname[len(self.sync.site):] for fname in files]
# Strip filename to get the object path
dirs = [fname.rsplit('/', 1)[0] for fname in files]
# Make unique and sort
paths = sorted(set(dirs))
self.paths = sorted(set(files))

self.sync.playback_paths(
paths=paths,
recurse=False,
override=True,
skip_errors=self.args.skip_errors,
dryrun=self.args.dry_run,
)
_playback_paths(self, self.paths)

if self.args.dry_run:
self.abort()
Expand Down Expand Up @@ -223,6 +282,17 @@ def wrapper(self, *args, **kwargs):
self.logger.exception("Unable to show diff")

self.abort()
# if we are not in dryrun we can't be sure we haven't already
viktordick marked this conversation as resolved.
Show resolved Hide resolved
# committed some stuff to the data-fs so playback all paths
# abort
if not self.args.dry_run and self.paths:
self.sync.playback_paths(
paths=self.paths,
recurse=False,
override=True,
skip_errors=True,
dryrun=False,
)
raise

return wrapper
Expand Down
104 changes: 103 additions & 1 deletion perfact/zodbsync/tests/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ def prepare_pick(self, name='TestFolder', msg='Second commit'):
initialized repository. Returns the commit ID.
'''
# Add a folder, commit it
self.add_folder('TestFolder', 'Second commit')
self.add_folder(name, msg)
commit = self.get_head_id()

# Reset the commit
Expand Down Expand Up @@ -1541,3 +1541,105 @@ def test_playback_postprocess(self):
with open(self.config.path, 'w') as f:
f.write(orig_config)
del self.runner

def test_playback_hook(self):
    """
    Configure a playback hook that reports only one of two changed
    folders and verify that just this folder is played back and that
    the command attached to its phase has been run.
    """
    def install_script(target, content):
        # Write a script file and make it executable for the owner only
        with open(target, 'w') as handle:
            handle.write(content)
        os.chmod(target, 0o700)

    # Create two commits, remember the tip and rewind the repository
    self.add_folder('NewFolder', 'First Folder')
    self.add_folder('NewFolder2', 'Second Folder')
    commit = self.get_head_id()
    self.gitrun('reset', '--hard', 'HEAD~2')

    # Phase command: stores whatever arrives on stdin in a marker file
    playback_cmd = "{}/playback_cmd".format(self.zeo.path)
    marker = '{}.out'.format(playback_cmd)
    install_script(
        playback_cmd,
        '\n'.join(["#!/bin/bash", "cat > {}"]).format(marker),
    )

    # Hook script: prints a single phase that only covers /NewFolder
    fname = "{}/playback_hook".format(self.zeo.path)
    phases = [{"paths": ["/NewFolder"], "cmd": playback_cmd}]
    install_script(
        fname,
        '\n'.join([
            "#!/bin/bash",
            "echo '{}'".format(json.dumps(phases)),
        ]),
    )

    # Keep the pristine configuration, then register the hook in it
    with open(self.config.path) as cfg:
        orig_config = cfg.read()
    with open(self.config.path, 'a') as cfg:
        cfg.write('\nplayback_hook = "{}"\n'.format(fname))

    # Avoid error regarding reusing runner with changed config
    del self.runner
    self.run('pick', 'HEAD..{}'.format(commit))

    assert 'NewFolder' in self.app.objectIds()
    assert 'NewFolder2' not in self.app.objectIds()
    assert os.path.isfile(marker)

    # Put the original configuration back and drop the runner again
    with open(self.config.path, 'w') as cfg:
        cfg.write(orig_config)
    del self.runner

def test_playback_hook_failed(self):
    """
    Configure a playback hook whose first phase carries a command that
    exits with a non-zero status and verify that the pick raises and
    that neither folder is present in the application afterwards.
    """
    def install_script(target, content):
        # Write a script file and make it executable for the owner only
        with open(target, 'w') as handle:
            handle.write(content)
        os.chmod(target, 0o700)

    # Create two commits, remember the tip and rewind the repository
    self.add_folder('NewFolder', 'First Folder')
    self.add_folder('NewFolder2', 'Second Folder')
    commit = self.get_head_id()
    self.gitrun('reset', '--hard', 'HEAD~2')

    # Phase command that always fails
    playback_cmd = "{}/playback_cmd".format(self.zeo.path)
    install_script(playback_cmd, '\n'.join(["#!/bin/bash", "exit 42"]))

    # Hook script: two phases, only the first one has a command
    fname = "{}/playback_hook".format(self.zeo.path)
    phases = [
        {"paths": ["/NewFolder"], "cmd": playback_cmd},
        {"paths": ["/NewFolder2"]},
    ]
    install_script(
        fname,
        '\n'.join([
            "#!/bin/bash",
            "echo '{}'".format(json.dumps(phases)),
        ]),
    )

    # Keep the pristine configuration, then register the hook in it
    with open(self.config.path) as cfg:
        orig_config = cfg.read()
    with open(self.config.path, 'a') as cfg:
        cfg.write('\nplayback_hook = "{}"\n'.format(fname))

    # Avoid error regarding reusing runner with changed config
    del self.runner
    with pytest.raises(AssertionError):
        self.run('pick', 'HEAD..{}'.format(commit))

    assert 'NewFolder' not in self.app.objectIds()
    assert 'NewFolder2' not in self.app.objectIds()

    # Put the original configuration back and drop the runner again
    with open(self.config.path, 'w') as cfg:
        cfg.write(orig_config)
    del self.runner
24 changes: 10 additions & 14 deletions perfact/zodbsync/zodbsync.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import shutil
import time # for periodic output
import sys
import json
import subprocess

# for using an explicit transaction manager
import transaction
Expand Down Expand Up @@ -660,11 +658,7 @@ def _playback_fixorder(self, path):
object_handlers[fs_data['type']].fix_order(obj, fs_data)
del self.fs_data[path]

def playback_paths(self, paths, recurse=True, override=False,
skip_errors=False, dryrun=False):
self.recurse = recurse
self.override = override
self.skip_errors = skip_errors
def prepare_paths(self, paths):
# normalize paths - cut off filenames and the site name
paths = {
path.rsplit('/', 1)[0] if (
Expand All @@ -679,9 +673,17 @@ def playback_paths(self, paths, recurse=True, override=False,
})

if not len(paths):
return
return []

paths = [path.rstrip('/') + '/' for path in paths]
return paths

def playback_paths(self, paths, recurse=True, override=False,
skip_errors=False, dryrun=False):
self.recurse = recurse
self.override = override
self.skip_errors = skip_errors
paths = self.prepare_paths(paths)

if recurse:
paths = remove_redundant_paths(paths)
Expand Down Expand Up @@ -744,12 +746,6 @@ def playback_paths(self, paths, recurse=True, override=False,
txn_mgr.abort()
else:
txn_mgr.commit()
postproc = self.config.get('run_after_playback', None)
if postproc and os.path.isfile(postproc):
self.logger.info('Calling postprocessing script ' + postproc)
proc = subprocess.Popen(postproc, stdin=subprocess.PIPE,
universal_newlines=True)
proc.communicate(json.dumps({'paths': paths}))

def recent_changes(self, since_secs=None, txnid=None, limit=50,
search_limit=100):
Expand Down