Skip to content

Commit

Permalink
wip pep8 compliance
Browse files Browse the repository at this point in the history
  • Loading branch information
TomMaullin committed Jul 20, 2023
1 parent f38f0fc commit e4dfd72
Show file tree
Hide file tree
Showing 10 changed files with 854 additions and 754 deletions.
58 changes: 32 additions & 26 deletions blm/blm_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,36 +13,35 @@


def _main(argv=None):

# --------------------------------------------------------------------------------
# Check inputs
# --------------------------------------------------------------------------------
# Create the parser and add argument
parser = argparse.ArgumentParser(description="BLM cluster script")
parser.add_argument('inputs_yml', type=str, nargs='?', default=os.path.join(
os.path.dirname(os.path.realpath(__file__)),
'blm_config.yml'),
help='Path to inputs yaml file')
os.path.dirname(os.path.realpath(__file__)),
'blm_config.yml'),
help='Path to inputs yaml file')

# Parse the arguments
args = parser.parse_args()
# If the argument is just a filename without a directory,

# If the argument is just a filename without a directory,
# prepend the current working directory
if os.path.dirname(args.inputs_yml) == '':
args.inputs_yml = os.path.join(os.getcwd(), args.inputs_yml)
inputs_yml = args.inputs_yml

# Load the inputs yaml file
with open(inputs_yml, 'r') as stream:
inputs = yaml.load(stream,Loader=yaml.FullLoader)
inputs = yaml.load(stream, Loader=yaml.FullLoader)


# --------------------------------------------------------------------------------
# Read Output directory, work out number of batches
# --------------------------------------------------------------------------------
OutDir = inputs['outdir']

# Get number of nodes
numNodes = inputs['numNodes']

Expand Down Expand Up @@ -112,18 +111,19 @@ def _main(argv=None):

# Raise a value error if none of the above
else:
raise ValueError('The cluster type, ' + inputs['clusterType'] + ', is not recognized.')
raise ValueError('The cluster type, ' +
inputs['clusterType'] + ', is not recognized.')

else:
# Raise a value error if the cluster type was not specified
raise ValueError('Please specify "clusterType" in the inputs yaml.')

# --------------------------------------------------------------------------------
# Connect to client
# --------------------------------------------------------------------------------

# Connect to cluster
client = Client(cluster)
client = Client(cluster)

# --------------------------------------------------------------------------------
# Run Setup
Expand All @@ -140,7 +140,7 @@ def _main(argv=None):
# tries to rerun it every time you call the result function again, e.g. after each
# stage of the pipeline).
del future_s

# --------------------------------------------------------------------------------
# Run Batch Jobs
# --------------------------------------------------------------------------------
Expand All @@ -152,12 +152,13 @@ def _main(argv=None):
futures = []

# Submit jobs
for i in np.arange(1,nb+1):
for i in np.arange(1, nb+1):

# Run the jobNum^{th} job.
future_b = client.submit(compute_product_forms, i, inputs_yml, pure=False)
future_b = client.submit(
compute_product_forms, i, inputs_yml, pure=False)

# Append to list
# Append to list
futures.append(future_b)

# Completed jobs
Expand All @@ -181,7 +182,7 @@ def _main(argv=None):
fileGroups = np.array_split(np.arange(nb)+1, numNodes)

# Check for empty filegroups
fileGroups = [i for i in fileGroups if i.size!=0]
fileGroups = [i for i in fileGroups if i.size != 0]

# Number of file groups
numFileGroups = len(fileGroups)
Expand All @@ -190,19 +191,21 @@ def _main(argv=None):
futures = []

# Loop through nodes
for node in np.arange(1,numFileGroups + 1):
for node in np.arange(1, numFileGroups + 1):

# Run the jobNum^{th} job.
future_c = client.submit(combine_batch_designs, 'XtX', OutDir, fileGroups[node-1], pure=False)
future_c = client.submit(
combine_batch_designs, 'XtX', OutDir, fileGroups[node-1], pure=False)

# Append to list
# Append to list
futures.append(future_c)

# Loop through nodes
for node in np.arange(1,numNodes + 1):
for node in np.arange(1, numNodes + 1):

# Give the i^{th} node the i^{th} partition of the data
future_b = client.submit(combine_batch_masking, nb, node, numNodes, maskJob, inputs_yml, pure=False)
future_b = client.submit(
combine_batch_masking, nb, node, numNodes, maskJob, inputs_yml, pure=False)

# Append to list
futures.append(future_b)
Expand All @@ -220,7 +223,8 @@ def _main(argv=None):
maskJob = True

# The first job does the analysis mask (this is why the 3rd argument is set to true)
future_b_first = client.submit(combine_batch_masking, nb, numNodes + 1, numNodes, maskJob, inputs_yml, pure=False)
future_b_first = client.submit(
combine_batch_masking, nb, numNodes + 1, numNodes, maskJob, inputs_yml, pure=False)
res = future_b_first.result()

del future_b_first, res
Expand All @@ -239,7 +243,8 @@ def _main(argv=None):
for jobNum in np.arange(pnvb):

# Run the jobNum^{th} job.
future_c = client.submit(output_results, jobNum, pnvb, nb, inputs_yml, pure=False)
future_c = client.submit(
output_results, jobNum, pnvb, nb, inputs_yml, pure=False)

# Append to list
futures.append(future_c)
Expand Down Expand Up @@ -272,5 +277,6 @@ def _main(argv=None):
client.close()
client.shutdown()


if __name__ == "__main__":
_main(sys.argv[1:])
Loading

0 comments on commit e4dfd72

Please sign in to comment.