Skip to content

Commit

Permalink
Merge pull request #60 from desihub/speedup-nightwatch
Browse files Browse the repository at this point in the history
Potentially speed up Tucson nightwatch mirror
  • Loading branch information
weaverba137 authored Jun 21, 2024
2 parents 2d8b4f3 + ecba2e4 commit ca8c238
Show file tree
Hide file tree
Showing 12 changed files with 143 additions and 92 deletions.
1 change: 1 addition & 0 deletions bin/desi_tucson_transfer_catchup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ for d in engineering/focalplane engineering/focalplane/hwtables \
engineering/focalplane) priority='nice'; exclude='--exclude archive --exclude hwtables --exclude *.ipynb --exclude .ipynb_checkpoints' ;;
engineering/focalplane/hwtables) priority='nice'; exclude='--include *.csv --exclude *' ;;
spectro/data) priority=''; exclude='--exclude 2018* --exclude 2019* --exclude 2020* --exclude 2021* --exclude 2022* --exclude 2023*' ;;
spectro/nightwatch/kpno) priority='nice'; exclude='--exclude 2021* --exclude 2022* --exclude 2023*' ;;
spectro/redux/daily) priority=''; exclude='--exclude *.tmp --exclude attic --exclude exposures --exclude preproc --exclude temp --exclude tiles' ;;
spectro/redux/daily/exposures) priority=''; exclude='--exclude *.tmp' ;;
spectro/redux/daily/preproc) priority=''; exclude='--exclude *.tmp --exclude preproc-*.fits --exclude preproc-*.fits.gz' ;;
Expand Down
27 changes: 22 additions & 5 deletions py/desitransfer/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import os
import re
import stat
import time
import pytz

MST = pytz.timezone('America/Phoenix')
Expand All @@ -34,7 +35,7 @@ def empty_rsync(out):
``True`` if there are no files to transfer.
"""
rr = re.compile(r'(receiving|sent [0-9]+ bytes|total size)')
return all([rr.match(l) is not None for l in out.split('\n') if l])
return all([rr.match(out_line) is not None for out_line in out.split('\n') if out_line])


def new_exposures(out):
Expand All @@ -52,8 +53,8 @@ def new_exposures(out):
"""
e = set()
e_re = re.compile(r'([0-9]{8})/?')
for l in out.split('\n'):
m = e_re.match(l)
for out_line in out.split('\n'):
m = e_re.match(out_line)
if m is not None:
e.add(m.groups()[0])
return e
Expand Down Expand Up @@ -125,7 +126,7 @@ def ensure_scratch(directories):
"""
for d in directories:
try:
l = os.listdir(d)
dir_list = os.listdir(d)
except FileNotFoundError:
continue
return d
Expand All @@ -143,7 +144,7 @@ def today():
This formulation, with the offset ``7/24+0.5``, is inherited from previous
nightwatch transfer scripts.
"""
return (dt.datetime.utcnow() - dt.timedelta(7/24+0.5)).strftime('%Y%m%d')
return (dt.datetime.utcnow() - dt.timedelta(7 / 24 + 0.5)).strftime('%Y%m%d')


def idle_time(start=8, end=12, tz=None):
Expand Down Expand Up @@ -174,3 +175,19 @@ def idle_time(start=8, end=12, tz=None):
return (i - s) // dt.timedelta(seconds=1)
e = dt.datetime(i.year, i.month, i.day, end, 0, 0, tzinfo=tz)
return (e - i) // dt.timedelta(seconds=1)


def exclude_years(start_year):
"""Generate rsync ``--exclude`` statements of the form ``--exclude 2020*``.
Parameters
----------
start_year : :class:`int`
First year to exclude.
Returns
-------
:class:`list`
A list suitable for appending to a command.
"""
return (' '.join([f'--exclude {y:d}*' for y in range(start_year, time.localtime().tm_year)])).split()
27 changes: 13 additions & 14 deletions py/desitransfer/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,10 @@ def __init__(self, options):
self._ini = options.configuration
self.test = options.test
self.tape = options.backup
getlist = lambda x: x.split(',')
getdict = lambda x: dict([tuple(i.split(':')) for i in x.split(',')])
self.conf = ConfigParser(defaults=os.environ, strict=True,
interpolation=ExtendedInterpolation(),
converters={'list': getlist, 'dict': getdict})
converters={'list': lambda x: x.split(','),
'dict': lambda x: dict([tuple(i.split(':')) for i in x.split(',')])})
files = self.conf.read(self._ini)
# assert files[0] == self._ini
self.sections = [s for s in self.conf.sections()
Expand Down Expand Up @@ -183,11 +182,11 @@ def directory(self, d):
_, out, err = _popen(cmd)
links = sorted([x for x in out.split('\n') if x])
if links:
for l in links:
if self._link_re.search(l) is None:
log.warning("Malformed symlink detected: %s. Skipping.", l)
for link in links:
if self._link_re.search(link) is None:
log.warning("Malformed symlink detected: %s. Skipping.", link)
else:
self.exposure(d, l, status)
self.exposure(d, link, status)
else:
log.warning('No links found, check connection.')
#
Expand Down Expand Up @@ -440,9 +439,9 @@ def backup(self, d, night, status):
if self.tape:
log.debug(' '.join(cmd))
_, out, err = _popen(cmd)
with open(ls_file) as l:
data = l.read()
backup_files = [l.split()[-1] for l in data.split('\n') if l]
with open(ls_file) as ls_fileobj:
data = ls_fileobj.read()
backup_files = [ls_out.split()[-1] for ls_out in data.split('\n') if ls_out]
else:
backup_files = []
backup_file = hpss_file + '_' + night + '.tar'
Expand Down Expand Up @@ -544,9 +543,9 @@ def verify_checksum(checksum_file):
checksum_file, n_lines)
errors += "{0:d} file(s) listed but not downloaded.\n".format(n_lines)
if n_lines < 0:
log.error("%d files are not listed in %s!", -1*n_lines, checksum_file)
errors += "{0:d} file(s) downloaded but not listed.\n".format(-1*n_lines)
digest = dict([(l.split()[1], l.split()[0]) for l in lines if l])
log.error("%d files are not listed in %s!", -1 * n_lines, checksum_file)
errors += "{0:d} file(s) downloaded but not listed.\n".format(-1 * n_lines)
digest = dict([(cl.split()[1], cl.split()[0]) for cl in lines if cl])
for f in files:
ff = os.path.join(d, f)
if ff != checksum_file:
Expand Down Expand Up @@ -668,5 +667,5 @@ def main():
options.kill)
return 0
transfer.transfer()
time.sleep(sleep*60)
time.sleep(sleep * 60)
return 0
28 changes: 14 additions & 14 deletions py/desitransfer/daily.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ def transfer(self, permission=True):
if self.extra:
for i, e in enumerate(self.extra):
cmd.insert(cmd.index('--omit-dir-times') + 1 + i, e)
with open(self.log, 'ab') as l:
l.write(("DEBUG: desi_daily_transfer %s\n" % dtVersion).encode('utf-8'))
l.write(("DEBUG: %s\n" % ' '.join(cmd)).encode('utf-8'))
l.write(("DEBUG: Transfer start: %s\n" % stamp()).encode('utf-8'))
l.flush()
p = sub.Popen(cmd, stdout=l, stderr=sub.STDOUT)
with open(self.log, 'ab') as logfile:
logfile.write(("DEBUG: desi_daily_transfer %s\n" % dtVersion).encode('utf-8'))
logfile.write(("DEBUG: %s\n" % ' '.join(cmd)).encode('utf-8'))
logfile.write(("DEBUG: Transfer start: %s\n" % stamp()).encode('utf-8'))
logfile.flush()
p = sub.Popen(cmd, stdout=logfile, stderr=sub.STDOUT)
status = p.wait()
l.write(("DEBUG: Transfer complete: %s\n" % stamp()).encode('utf-8'))
logfile.write(("DEBUG: Transfer complete: %s\n" % stamp()).encode('utf-8'))
if status == 0:
self.lock()
if permission:
Expand All @@ -80,8 +80,8 @@ def lock(self):
fpath = os.path.join(dirpath, f)
if stat.S_IMODE(os.stat(fpath).st_mode) != file_perm:
os.chmod(fpath, file_perm)
with open(self.log, 'ab') as l:
l.write(("DEBUG: Lock complete: %s\n" % stamp()).encode('utf-8'))
with open(self.log, 'ab') as logfile:
logfile.write(("DEBUG: Lock complete: %s\n" % stamp()).encode('utf-8'))

def permission(self):
"""Set permissions for DESI collaboration access.
Expand All @@ -95,12 +95,12 @@ def permission(self):
The status returned by :command:`fix_permissions.sh`.
"""
cmd = ['fix_permissions.sh', self.destination]
with open(self.log, 'ab') as l:
l.write(("DEBUG: %s\n" % ' '.join(cmd)).encode('utf-8'))
l.flush()
p = sub.Popen(cmd, stdout=l, stderr=sub.STDOUT)
with open(self.log, 'ab') as logfile:
logfile.write(("DEBUG: %s\n" % ' '.join(cmd)).encode('utf-8'))
logfile.flush()
p = sub.Popen(cmd, stdout=logfile, stderr=sub.STDOUT)
status = p.wait()
l.write(("DEBUG: Permission reset complete: %s\n" % stamp()).encode('utf-8'))
logfile.write(("DEBUG: Permission reset complete: %s\n" % stamp()).encode('utf-8'))
return status


Expand Down
5 changes: 3 additions & 2 deletions py/desitransfer/nightwatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
Catchup on a specific night::
NIGHT=20200124 && rsync -rlvt --exclude-from ${DESITRANSFER}/py/desitransfer/data/desi_nightwatch_transfer_exclude.txt dts:/exposures/nightwatch/${NIGHT}/ /global/cfs/cdirs/desi/spectro/nightwatch/kpno/${NIGHT}/
NIGHT=20200124 && rsync -rlvt --exclude-from ${DESITRANSFER}/py/desitransfer/data/desi_nightwatch_transfer_exclude.txt \
dts:/exposures/nightwatch/${NIGHT}/ /global/cfs/cdirs/desi/spectro/nightwatch/kpno/${NIGHT}/
By-hand startup sequence (bash shell)::
Expand Down Expand Up @@ -114,7 +115,7 @@ def main():
options = _options()
_configure_log(options.debug)
errcount = 0
wait = options.sleep*60
wait = options.sleep * 60
source = '/exposures/nightwatch'
basedir = os.path.join(os.environ['DESI_ROOT'], 'spectro', 'nightwatch')
kpnodir = os.path.join(basedir, 'kpno')
Expand Down
3 changes: 2 additions & 1 deletion py/desitransfer/spacewatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ def download_jpg(files, destination, overwrite=False, test=False):
r = requests.get(jpg)
if r.status_code == 200:
downloaded += 1
timestamp = int(datetime.datetime.strptime(r.headers['Last-Modified'], '%a, %d %b %Y %H:%M:%S %Z').replace(tzinfo=utc).timestamp())
timestamp = int(datetime.datetime.strptime(r.headers['Last-Modified'],
'%a, %d %b %Y %H:%M:%S %Z').replace(tzinfo=utc).timestamp())
with open(dst_jpg, 'wb') as j:
j.write(r.content)
os.utime(dst_jpg, (timestamp, timestamp))
Expand Down
12 changes: 10 additions & 2 deletions py/desitransfer/test/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from unittest.mock import patch
from tempfile import TemporaryDirectory
from ..common import (dt, MST, dir_perm, file_perm, empty_rsync, new_exposures, rsync,
stamp, ensure_scratch, yesterday, today, idle_time)
stamp, ensure_scratch, yesterday, today, idle_time, exclude_years)


class FakeDateTime(datetime):
Expand Down Expand Up @@ -135,7 +135,7 @@ def test_today(self, mock_dt):
"""Test today's date.
"""
mock_dt.datetime.utcnow.return_value = datetime(2019, 7, 3, 5, 0, 0)
mock_dt.timedelta.return_value = timedelta(7/24+0.5)
mock_dt.timedelta.return_value = timedelta(7 / 24 + 0.5)
y = today()
self.assertEqual(y, '20190702')

Expand Down Expand Up @@ -174,3 +174,11 @@ def test_idle_time_alt_time_zone(self):
# mock_datetime.return_value = datetime(2021, 7, 3, 13, 0, 0, tzinfo=MST)
i = idle_time(tz='US/Pacific')
self.assertEqual(i, -3600)

def test_exclude_years(self):
"""Test exclude statements for a range of years.
"""
last_year = datetime.now().year - 1
ex = exclude_years(2018)
self.assertEqual(ex[1], '2018*')
self.assertEqual(ex[-1], f'{last_year:d}*')
Loading

0 comments on commit ca8c238

Please sign in to comment.