diff --git a/xfel/command_line/striping.py b/xfel/command_line/striping.py index ea63a8d220..a6c0dc2c30 100644 --- a/xfel/command_line/striping.py +++ b/xfel/command_line/striping.py @@ -9,6 +9,7 @@ # dials.combine_experiments (optionally with clustering and selecting clusters). # from dials.util import show_mail_on_error +from dxtbx.model import ExperimentList from libtbx.phil import parse from libtbx.utils import Sorry from libtbx import easy_run @@ -22,7 +23,11 @@ multiprocessing_override_str = ''' mp { + method = local use_mpi = False + mpi_command = source + mpi_option = "" + local.include_mp_in_command = False } ''' @@ -47,7 +52,7 @@ .help = "Enable to select results evenly spaced across each rungroup" "(stripes) as opposed to contiguous chunks." chunk_size = 1000 - .type = float + .type = int(value_min=1) .help = "Maximum number of images per chunk or stripe." respect_rungroup_barriers = True .type = bool @@ -243,6 +248,46 @@ for interactive unit cell clustering, use combine_experiments.clustering.dendrogram=True """ + +def chunk_pairs(expt_paths, refl_paths, max_size=1000): + """Distribute matching expt-refl pairs into chunks with < `max_size` expts""" + expt_lengths = [len(ExperimentList.from_file(expt_path, check_format=False)) + for expt_path in expt_paths] + chunk_count = math.ceil(sum(expt_lengths) / max_size) + estimated_fill = sum(expt_lengths) / chunk_count + chunks_indices = [[] for _ in range(chunk_count)] + chunk_lengths = [0] * chunk_count + currently_filled_chunk = 0 + for len_index, len_ in enumerate(expt_lengths): + if len_ / 2 + chunk_lengths[currently_filled_chunk] > estimated_fill: + currently_filled_chunk = min(chunk_count - 1, currently_filled_chunk + 1) + chunks_indices[currently_filled_chunk].append(len_index) + chunk_lengths[currently_filled_chunk] += len_ + chunked_expts, chunked_refls = [], [] + for chunk_indices in chunks_indices: + chunked_expts.append([expt_paths[i] for i in chunk_indices]) + chunked_refls.append([refl_paths[i] for i in chunk_indices]) + return chunked_expts, chunked_refls, chunk_lengths + + +def stripe_pairs(expt_paths, refl_paths, max_size=1000): + """Distribute matching expt-refl pairs into stripes with <`max_size` expts""" + expt_lengths = [len(ExperimentList.from_file(expt_path, check_format=False)) + for expt_path in expt_paths] + stripe_count = math.ceil(sum(expt_lengths) / max_size) + stripe_indices = [[] for _ in range(stripe_count)] + stripe_lengths = [0, ] * stripe_count + for len_index, len_ in enumerate(expt_lengths): + currently_filled_stripe = stripe_lengths.index(min(stripe_lengths)) + stripe_indices[currently_filled_stripe].append(len_index) + stripe_lengths[currently_filled_stripe] += len_ + striped_expts, striped_refls = [], [] + for chunk_indices in stripe_indices: + striped_expts.append([expt_paths[i] for i in chunk_indices]) + striped_refls.append([refl_paths[i] for i in chunk_indices]) + return striped_expts, striped_refls, stripe_lengths + + def allocate_chunks(results_dir, trial_no, rgs_selected=None, @@ -273,7 +318,6 @@ def allocate_chunks(results_dir, rgs[rg] = [run] else: rgs[rg].append(run) - batch_chunk_nums_sizes = {} batch_contents = {} if respect_rungroup_barriers: batchable = {rg:{rg:runs} for rg, runs in six.iteritems(rgs)} @@ -312,35 +356,26 @@ def allocate_chunks(results_dir, print("no images found for %s" % batch) del batch_contents[batch] continue - n_chunks = int(math.ceil(n_img/max_size)) - chunk_size = int(math.ceil(n_img/n_chunks)) - batch_chunk_nums_sizes[batch] = (n_chunks, chunk_size) if len(batch_contents) == 0: raise Sorry("no DIALS integration results found.") refl_ending += extension batch_chunks = {} - for batch, num_size_tuple in six.iteritems(batch_chunk_nums_sizes): - num, size = num_size_tuple + for batch in batchable: batch_chunks[batch] = [] contents = batch_contents[batch] - expts = [c for c in contents if c.endswith(expt_ending)] - refls = [c for c in contents if c.endswith(refl_ending)] + expts = sorted([c for c in contents if c.endswith(expt_ending)]) + refls = sorted([c for c in contents if c.endswith(refl_ending)]) expts, refls = match_dials_files(expts, refls, expt_ending, refl_ending) - if stripe: - for i in range(num): - expts_stripe = expts[i::num] - refls_stripe = refls[i::num] - batch_chunks[batch].append((expts_stripe, refls_stripe)) - print("striped %d experiments in %s with %d experiments per stripe and %d stripes" % \ - (len(expts), batch, len(batch_chunks[batch][0][0]), len(batch_chunks[batch]))) - else: - for i in range(num): - expts_chunk = expts[i*size:(i+1)*size] - refls_chunk = refls[i*size:(i+1)*size] - batch_chunks[batch].append((expts_chunk, refls_chunk)) - print("chunked %d experiments in %s with %d experiments per chunk and %d chunks" % \ - (len(expts), batch, len(batch_chunks[batch][0][0]), len(batch_chunks[batch]))) - return batch_chunks + pack_func = stripe_pairs if stripe else chunk_pairs + expts_packs, refls_packs, pack_lengths = pack_func(expts, refls, max_size) + for expts_pack, refls_pack in zip(expts_packs, refls_packs): + batch_chunks[batch].append((expts_pack, refls_pack)) + r = '{} {} experiments from {} files in {} into {} {} with sizes = {}' + print(r.format("Striped" if stripe else "Chunked", sum(pack_lengths), + len(expts), batch, len(pack_lengths), + "stripes" if stripe else "chunks", pack_lengths)) + return batch_chunks + def parse_retaining_scope(args, phil_scope=phil_scope): if "-c" in args: