Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ruciax tsm fixes #119

Merged
merged 24 commits into from
Oct 24, 2017
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
9c57d56
Bug Fix in tape uploader
XeBoris Aug 28, 2017
ced1682
Add --name option for massive-tsm, better up- and download handling f…
XeBoris Aug 30, 2017
d03f445
Prepared *.pickles deletion
XeBoris Aug 30, 2017
6708144
random rse option, delete zero sized files in tsm uploader and ruciax…
XeBoris Sep 12, 2017
458693f
activate massive-ruciax by removing commands in script creation section
XeBoris Sep 12, 2017
ec21e6a
change requests PatrickdP
XeBoris Sep 15, 2017
39fde87
Removed lines which delete zero-sized files (see pax #607, #608)
XeBoris Sep 21, 2017
7be4ac3
applied flake8 checks and changes
XeBoris Sep 21, 2017
f5218d8
Merge branch 'master' into ruciax_tsm_fixes
XeBoris Oct 10, 2017
bd3a428
code check I
XeBoris Oct 18, 2017
cf030c7
remove standalone application
XeBoris Oct 18, 2017
fc33d5e
Bug Fix in tape uploader
XeBoris Aug 28, 2017
f3275a0
Add --name option for massive-tsm, better up- and download handling f…
XeBoris Aug 30, 2017
1949187
Prepared *.pickles deletion
XeBoris Aug 30, 2017
011b892
random rse option, delete zero sized files in tsm uploader and ruciax…
XeBoris Sep 12, 2017
b21fd0d
activate massive-ruciax by removing commands in script creation section
XeBoris Sep 12, 2017
a46397e
change requests PatrickdP
XeBoris Sep 15, 2017
d1c8c92
Removed lines which delete zero-sized files (see pax #607, #608)
XeBoris Sep 21, 2017
892001e
applied flake8 checks and changes
XeBoris Sep 21, 2017
4c30339
code check I
XeBoris Oct 18, 2017
1b5911e
remove standalone application
XeBoris Oct 18, 2017
dde07c0
Merge branch 'ruciax_tsm_fixes' of https://github.com/XENON1T/cax int…
XeBoris Oct 18, 2017
f66bb18
Merge branch 'master' into ruciax_tsm_fixes
XeBoris Oct 18, 2017
5e4e206
Update main.py
XeBoris Oct 23, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 79 additions & 54 deletions cax/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,9 +578,6 @@ def massiveruciax():

parser.add_argument('--once', action='store_true',
help="Run all tasks just one, then exits")
parser.add_argument('--on-disk-only', action='store_true',
dest='on_disk_only',
help="Select only runs which are stored at the data analysis facility anyways")
parser.add_argument('--config', action='store', type=str,
dest='config_file',
help="Load a custom .json config file into cax")
Expand All @@ -597,9 +594,6 @@ def massiveruciax():
parser.add_argument('--rucio-rule', type=str,
dest='config_rule',
help="Load the a rule file")
parser.add_argument('--skip-error', action='store_true',
dest='skip_error',
help="Skip all database entries with an error")


args = parser.parse_args()
Expand Down Expand Up @@ -674,8 +668,8 @@ def massiveruciax():
rucio_rule = "--rucio-rule {rulefile}".format( rulefile=abs_config_rule )
verfication_only = json.loads(open(abs_config_rule, 'r').read())[0]['verification_only']
else:
rucio_rule = ""
verfication_only = False
rucio_rule = ""


# Establish mongo connection
Expand Down Expand Up @@ -745,43 +739,45 @@ def massiveruciax():
#Double check that rucio uploads are only triggered when data exists at the host
host_data = False
rucio_data = False
rucio_data_upload = None
host_data_error = False

for idoc in doc['data']:
if idoc['host'] == config.get_hostname() and idoc['status'] == "transferred":
host_data = True
if idoc['host'] == config.get_hostname() and idoc['status'] == "error":
host_data_error == False
elif idoc['host'] == config.get_hostname() and idoc['status'] == "error":
host_data = True
host_data_error = True

if idoc['host'] == "rucio-catalogue":
rucio_data = True
rucio_data_upload = idoc['status']

if host_data == False and args.on_disk_only == True:
continue

if host_data == True and host_data_error == True and args.skip_error == True:
continue
if verfication_only == False:

#A rucio upload makes only sense if the data are stored at the host
#where ruciax runs right now.
if host_data != True:
continue
elif host_data == True and host_data_error == True:
continue
#Trigger rucio uploads:
#If rucio data exists which are in status transferring or error let us skip them.
#RSEreupload is still executed
if rucio_data == True and (rucio_data_upload == "transferring" or rucio_data_upload == "error" or rucio_data_upload == "transferred"):
continue

#massive-ruciax does not care about data which are already
#in the rucio catalogue for upload
if rucio_data == True and verfication_only == False:
continue
elif verfication_only == True:
if rucio_data == False or rucio_data_upload != "transferred":
continue

#Get the local time:
local_time = time.strftime("%Y%m%d_%H%M%S", time.localtime())

#Detector choice
run = ""
runlogfile = ""
if doc['detector'] == 'tpc':
run = "--run {number}".format(number=doc['number'])
runlogfile = "--log-file {log_path}/ruciax_log_{number}_{timestamp}.txt".format(
log_path=log_path[config.get_hostname()],
number=doc['number'],
timestamp=local_time)

elif doc['detector'] == 'muon_veto':
run = "--name {number}".format(number=doc['name'])
runlogfile = "--log-file {log_path}/ruciax_log_{number}_{timestamp}.txt".format(
#Prepare run name for upload and log file
run = "--name {name}".format(name=doc['name'])
runlogfile = "--log-file {log_path}/ruciax_log_{number}_{timestamp}.txt".format(
log_path=log_path[config.get_hostname()],
number=doc['number'],
timestamp=local_time)
Expand Down Expand Up @@ -830,7 +826,7 @@ def massiveruciax():
dd = divmod(diff.total_seconds(), 60)

#delete script:
qsub.delete_script( sc )
#qsub.delete_script( sc )
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you really want to keep the script?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

of course not! Thank you for checking.


logging.info("+--------------------------->>>")
logging.info("| Summary: massive-ruciax for run/name: %s/%s", doc['number'], doc['name'] )
Expand Down Expand Up @@ -884,7 +880,9 @@ def massive_tsmclient():
dest='config_file',
help="Load a custom .json config file into cax")
parser.add_argument('--run', type=int,
help="Select a single run")
help="Select a single raw data set by run number")
parser.add_argument('--name', type=str,
help="Select a single raw data set by run name")
parser.add_argument('--from-run', dest='from_run', type=int,
help="Choose: run number start")
parser.add_argument('--to-run', dest='to_run', type=int,
Expand All @@ -911,6 +909,11 @@ def massive_tsmclient():
run_window_lastruns = False
run_lastdays = False

#Prevent user from up/downloading via --name and --run at the same time
if args.run != None and args.name != None:
logging.info("Input to massive-tsm is --run {r} and --name {n} at the same time! <- Forbidden".format(r=args.run, n=args.name))
exit()

if args.from_run == None and args.to_run == None and args.last_days == None:
pass
elif (args.from_run != None and args.to_run == None) or (args.from_run == None and args.to_run != None):
Expand Down Expand Up @@ -944,14 +947,24 @@ def massive_tsmclient():
args.config_file)
config.set_json(args.config_file)
config_arg = os.path.abspath(args.config_file)


#extract if tsm-server up- or download is made:
tsm_task = None
tsm_file = json.loads(open(config_arg, 'r').read())
for i_host in tsm_file:
if i_host['name'] == config.get_hostname():
if "tsm-server" in i_host['upload_options']:
tsm_task = "upload"
elif "tsm-server" in i_host['download_options']:
tsm_task = "download"

# Setup logging
log_path = {"xe1t-datamanager": "/home/xe1ttransfer/tsm_log",
"midway-login1": "n/a",
"tegner-login-1": "/afs/pdc.kth.se/home/b/bobau/tsm_log"}

if log_path[config.get_hostname()] == "n/a":
print("Modify the log path in main.py")
logging.info("Modify the log path in main.py")
exit()

if not os.path.exists(log_path[config.get_hostname()]):
Expand Down Expand Up @@ -1024,11 +1037,15 @@ def massive_tsmclient():

for doc in docs:

#Select a single run for rucio upload (massive-ruciax -> ruciax)
#Select a single run up/download
if args.run:
if args.run != doc['number']:
continue

#print("Test A")
if args.run != doc['number']:
continue
if args.name:
if str(args.name) != str(doc['name']):
continue

#Double check if a 'data' field is defind in doc (runDB entry)
if 'data' not in doc:
continue
Expand All @@ -1039,31 +1056,39 @@ def massive_tsmclient():
for idoc in doc['data']:
if idoc['host'] == config.get_hostname() and idoc['status'] == "transferred":
host_data = True
if idoc['host'] == "tsm-server" and idoc['status'] == "transferred":
tsm_data = True

if host_data == False:
if idoc['host'] == "tsm-server" and (idoc['status'] == "transferred" or idoc['status'] == "transferring"):
#we can skip a tsm-server transfer try if the tsm-status is:
# - transferred [everything fine]
# - transferring [the file is marked for upload right now [suppose that everything is fine]
tsm_data = True

#Evaluate the double check
if host_data == False and tsm_task == "upload":
#Do not try upload data which are not registered in the runDB
continue
#make sure now that only "new data" are uploaded
if tsm_data == True:
if tsm_data == True and tsm_task == "upload":
continue


#Detector choice
local_time = time.strftime("%Y%m%d_%H%M%S", time.localtime())
if doc['detector'] == 'tpc':
job = "--config {conf} --run {number} --log-file {log_path}/tsm_log_{number}_{timestamp}.txt".format(
conf=config_arg,
number=doc['number'],
log_path=log_path[config.get_hostname()],
timestamp=local_time)

elif doc['detector'] == 'muon_veto':
job = "--config {conf} --name {number} --log-file {log_path}/tsm_log_{number}_{timestamp}.txt".format(

run_selection = None
if args.run != None and args.name == None:
#select a run by run number
run_selection = "--run {run}".format(run=args.run)
elif args.run == None and args.name != None:
run_selection = "--name {run}".format(run=args.name)
elif args.run == None and args.name == None:
run_selection = "--name {run}".format(run=doc['name'])

#The job defintin string:
job = "--config {conf} {run_selection} --log-file {log_path}/tsm_log_{number}_{timestamp}.txt".format(
conf=config_arg,
number=doc['name'],
run_selection=run_selection,
log_path=log_path[config.get_hostname()],
number=doc['number'],
timestamp=local_time)

#start the time for an upload:
Expand Down
18 changes: 14 additions & 4 deletions cax/tasks/data_mover.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ def copy_tsm(self, datum, destination, method, option_type):
logging.info("Path to tsm data: %s", raw_data_tsm)
logging.info("File/Folder for backup: %s", raw_data_filename)

#Do a simple pretest to analyse the directly what is going to be backuped
#Do a simple pretest to analyse the directory what is going to be backuped up
#continue only if there are files in the directory and no more folders
list_files = []
list_folders = []
Expand All @@ -592,10 +592,20 @@ def copy_tsm(self, datum, destination, method, option_type):
for name in dirs:
list_folders.append(name)

#Sanity check from zero sized files during the upload
# find *.pickles or *log files which are size of zere and delte them before upload
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment says you check if file is *.pickles or *log, but I don't see this check below. For example, a zero-sized .zip could indicate a problem, but would just get deleted here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is true. I put this in the comment section to make the purpose clear (*.pickle files and possibly an empty log file).
As for the deletion process later: I would say that a zero-sized zip file indicates a problem originating in pax, but we should also not upload such files. For rucio it makes no difference — it will complain about any zero-sized file.

BTW: I have never seen a zero-sized *.zip file.

for i_file in list_files:
file_size = os.path.getsize( os.path.join( raw_data_location, i_file) )
if file_size == 0:
logging.info("Delete file {file} with {size} bytes".format(file=i_file, size=file_size))
os.remove( i_file )
list_files.remove(i_file)

#Sanity check if raw data folder contains a subfolder (mostly important for old raw data sets)
if len(list_files) == 0 or len(list_folders) > 0:
logging.info("ERROR: There are %s files in %s", len(list_files), raw_data_path+raw_data_filename)
if len(list_folders) > 0:
looging.info("ERROR: These folders are found in %s:", raw_data_path+raw_data_filename)
logging.info("ERROR: These folders are found in %s:", raw_data_path+raw_data_filename)
for i_folders in list_folders:
logging.info(" <> %s", i_folders)
logging.info("Check the error(s) and start again")
Expand All @@ -613,8 +623,8 @@ def copy_tsm(self, datum, destination, method, option_type):
else:
logging.info("Pre-test of %s counts %s files for tape upload [succcessful]", raw_data_path+raw_data_filename, len(list_files))


#Do a checksum pre-test:
exit()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean to keep this here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No,... Thank you 👍

#Do a checksum pre-test for double counts:
checksum_pretest_list = []
for i_file in files:
f_path = os.path.join(raw_data_path, raw_data_filename, i_file)
Expand Down
24 changes: 24 additions & 0 deletions cax/tasks/rucio_mover.py
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,16 @@ def copyRucio(self, datum_original, datum_destination, option_type):
self.return_rucio['status'] = "error"
return

#Sanity check from zero sized files during the upload
# find *.pickles or *log files which are size of zere and delte them before upload
for i_file in files:
file_size = os.path.getsize( i_file )

if file_size == 0:
logging.info("Delete file {file} with {size} bytes".format(file=i_file, size=file_size))
os.remove( i_file )
files.remove(i_file)

#Create the data structure for upload:
#-------------------------------------

Expand Down Expand Up @@ -2520,6 +2530,19 @@ def rule_definition(self):
destination_livetime = t[0]['destination_livetime'] #need pre-definition
destination_condition = t[0]['destination_condition']

#catch the random statement in the destination_rse:
for i_rse in destination_rse:
if i_rse.find("random") >= 0:
key = i_rse
random_rse = i_rse.split(":")[1].replace(" ", "").split(",")
i_random_rse = random.choice(random_rse)
logging.info("(rule_definition) - A list of RSEs: {rse} is given.".format(rse=random_rse))
logging.info("(rule_definition) - Pick one random element: {e}".format(e=i_random_rse))

destination_rse.remove( key )
destination_rse.append( i_random_rse)
destination_livetime[i_random_rse] = destination_livetime.pop(key)

if t[0]['remove_rse'] == None:
remove_rse = []
else:
Expand Down Expand Up @@ -2709,6 +2732,7 @@ def set_possible_rules(self, data_type, dbinfo ):
#Get rule definition from json file
transfer_lifetime = {}
rule_def = self.rule_definition()

if rule_def != 0:
logging.info("A seperated rucio-rule file is loaded")
transfer_list, transfer_lifetime, self.delete_list = self.magic( actual_run, rule_def, all_rse )
Expand Down
27 changes: 15 additions & 12 deletions cax/tasks/tsm_mover.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""Handle copying data between sites.

This is one of the key tasks of 'cax' because it's responsible for moving
data between sites. At present, it just does scp.
tsm_mover.py contains the necessary classes
to upload and download from tape backup and
syncronize it with the runDB.

Author: Boris Bauermeister
Email: [email protected]
"""

import datetime
Expand Down Expand Up @@ -202,23 +205,23 @@ def upload(self, raw_data_location):
if i.find("Total number of objects inspected:") >= 0:
tno_dict['tno_inspected'] = int(i.split(":")[1].replace(",", ""))
elif i.find("Total number of objects backed up:") >= 0:
tno_dict['tno_backedup'] = int(i.split(":")[1])
tno_dict['tno_backedup'] = int(i.split(":")[1].replace(",", ""))
elif i.find("Total number of objects updated:") >= 0:
tno_dict['tno_updated'] = int(i.split(":")[1])
tno_dict['tno_updated'] = int(i.split(":")[1].replace(",", ""))
elif i.find("Total number of objects rebound:") >= 0:
tno_dict['tno_rebound'] = int(i.split(":")[1])
tno_dict['tno_rebound'] = int(i.split(":")[1].replace(",", ""))
elif i.find("Total number of objects deleted:") >= 0:
tno_dict['tno_deleted'] = int(i.split(":")[1])
tno_dict['tno_deleted'] = int(i.split(":")[1].replace(",", ""))
elif i.find("Total number of objects expired:") >= 0:
tno_dict['tno_expired'] = int(i.split(":")[1])
tno_dict['tno_expired'] = int(i.split(":")[1].replace(",", ""))
elif i.find("Total number of objects failed:") >= 0:
tno_dict['tno_failed'] = int(i.split(":")[1])
tno_dict['tno_failed'] = int(i.split(":")[1].replace(",", ""))
elif i.find("Total number of objects encrypted:") >= 0:
tno_dict['tno_encrypted'] = int(i.split(":")[1])
tno_dict['tno_encrypted'] = int(i.split(":")[1].replace(",", ""))
elif i.find("Total number of objects grew:") >= 0:
tno_dict['tno_grew'] = int(i.split(":")[1])
tno_dict['tno_grew'] = int(i.split(":")[1].replace(",", ""))
elif i.find("Total number of retries:") >= 0:
tno_dict['tno_retries'] = int(i.split(":")[1])
tno_dict['tno_retries'] = int(i.split(":")[1].replace(",", ""))
elif i.find("Total number of bytes inspected:") >= 0:
tno_dict['tno_bytes_inspected'] = i.split(":")[1].replace(" ", "")
elif i.find("Total number of bytes transferred:") >= 0:
Expand Down