From 82ccc7576bac44a1377c0e8a28d979017c4d7cc3 Mon Sep 17 00:00:00 2001
From: David Bold
Date: Mon, 19 Sep 2022 15:10:07 +0200
Subject: [PATCH 1/5] Try to give better error

---
 xbout/load.py | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/xbout/load.py b/xbout/load.py
index 39c77d6f..35a6ef56 100644
--- a/xbout/load.py
+++ b/xbout/load.py
@@ -776,6 +776,19 @@ def _arrange_for_concatenation(filepaths, nxpe=1, nype=1):
 
     nprocs = nxpe * nype
     n_runs = int(len(filepaths) / nprocs)
+    if len(filepaths) < nprocs:
+        if len(filepaths) == 1:
+            raise ValueError(
+                "A parallel simulation was loaded, but only a single "
+                "file was loaded. Please ensure to pass in all files "
+                "by specifing e.g. `BOUT.dmp.*.nc` rather than "
+                "`BOUT.dmp.0.nc`."
+            )
+        raise ValueError(
+            f"A parallel simulation was loaded, but only a {len(filepathts)} "
+            "files were loaded. Please ensure to pass in all files "
+            "by specifing e.g. `BOUT.dmp.*.nc`"
+        )
     if len(filepaths) % nprocs != 0:
         raise ValueError(
             "Each run directory does not contain an equal number "

From 5b5e42934ff24d9f3121cc61ecb7f97bdd5baaeb Mon Sep 17 00:00:00 2001
From: David Bold
Date: Mon, 19 Sep 2022 15:10:36 +0200
Subject: [PATCH 2/5] Use run_id to sort files

---
 xbout/load.py | 109 +++++++++++++++++++++++++++++++++-----------------
 1 file changed, 72 insertions(+), 37 deletions(-)

diff --git a/xbout/load.py b/xbout/load.py
index 35a6ef56..69534b15 100644
--- a/xbout/load.py
+++ b/xbout/load.py
@@ -776,46 +776,81 @@ def _arrange_for_concatenation(filepaths, nxpe=1, nype=1):
 
     nprocs = nxpe * nype
     n_runs = int(len(filepaths) / nprocs)
-    if len(filepaths) < nprocs:
-        if len(filepaths) == 1:
+    runids = []
+    for fp in filepaths:
+        with xr.open_dataset(fp) as tmp:
+            if "run_id" not in tmp:
+                runids = None
+                break
+            runids.append(tmp["run_id"])
+    if not runids:
+        if len(filepaths) < nprocs:
+            if len(filepaths) == 1:
+                raise ValueError(
+                    "A parallel simulation was loaded, but only a single "
+                    "file was loaded. Please ensure to pass in all files "
+                    "by specifing e.g. `BOUT.dmp.*.nc` rather than "
+                    "`BOUT.dmp.0.nc`."
+                )
             raise ValueError(
-                "A parallel simulation was loaded, but only a single "
-                "file was loaded. Please ensure to pass in all files "
-                "by specifing e.g. `BOUT.dmp.*.nc` rather than "
-                "`BOUT.dmp.0.nc`."
+                f"A parallel simulation was loaded, but only a {len(filepathts)} "
+                "files were loaded. Please ensure to pass in all files "
+                "by specifing e.g. `BOUT.dmp.*.nc`"
             )
-        raise ValueError(
-            f"A parallel simulation was loaded, but only a {len(filepathts)} "
-            "files were loaded. Please ensure to pass in all files "
-            "by specifing e.g. `BOUT.dmp.*.nc`"
-        )
-    if len(filepaths) % nprocs != 0:
-        raise ValueError(
-            "Each run directory does not contain an equal number "
-            "of output files. If the parallelization scheme of "
-            "your simulation changed partway-through, then please "
-            "load each directory separately and concatenate them "
-            "along the time dimension with xarray.concat()."
-        )
+        if len(filepaths) % nprocs != 0:
+            raise ValueError(
+                "Each run directory does not contain an equal number "
+                "of output files. If the parallelization scheme of "
+                "your simulation changed partway-through, then please "
+                "load each directory separately and concatenate them "
+                "along the time dimension with xarray.concat()."
+            )
+        # Create list of lists of filepaths, so that xarray knows how they should
+        # be concatenated by xarray.open_mfdataset()
+        paths = iter(filepaths)
+        paths_grid = [
+            [[next(paths) for x in range(nxpe)] for y in range(nype)]
+            for t in range(n_runs)
+        ]
+
+    else:
+        paths_sorted = []
+        lastid = None
+        for path, gid in zip(filepaths, runids):
+            if lastid != gid:
+                lastid = gid
+                paths_sorted.append([])
+            paths_sorted[-1].append(path)
+        paths_grid = []
+        for paths in paths_sorted:
+            if len(paths) != nprocs:
+                with xr.open_dataset(paths[0]) as tmp:
+                    if tmp["PE_XIND"] != 0 or tmp["PE_YIND"] != 0:
+                        # The first file is missing.
+                        warn(
+                            f"Ignoring {len(paths)} files as the first seems to be missing: {paths}"
+                        )
+                        continue
+                    assert tmp["NXPE"] == nxpe
+                    assert tmp["NYPE"] == nype
+                    raise ValueError(
+                        f"Something is wrong. We expected {nprocs} files but found {len(paths)} files."
+                    )
+            paths = iter(paths)
+
+            paths_grid.append([[next(paths) for x in range(nxpe)] for y in range(nype)])
+
+        # Dimensions along which no concatenation is needed are still present as
+        # single-element lists, so need to concatenation along dim=None for those
+        concat_dims = [None, None, None]
+        if len(filepaths) > nprocs:
+            concat_dims[0] = "t"
+        if nype > 1:
+            concat_dims[1] = "y"
+        if nxpe > 1:
+            concat_dims[2] = "x"
 
-    # Create list of lists of filepaths, so that xarray knows how they should
-    # be concatenated by xarray.open_mfdataset()
-    paths = iter(filepaths)
-    paths_grid = [
-        [[next(paths) for x in range(nxpe)] for y in range(nype)] for t in range(n_runs)
-    ]
-
-    # Dimensions along which no concatenation is needed are still present as
-    # single-element lists, so need to concatenation along dim=None for those
-    concat_dims = [None, None, None]
-    if len(filepaths) > nprocs:
-        concat_dims[0] = "t"
-    if nype > 1:
-        concat_dims[1] = "y"
-    if nxpe > 1:
-        concat_dims[2] = "x"
-
-    return paths_grid, concat_dims
+        return paths_grid, concat_dims
 
 
 def _trim(ds, *, guards, keep_boundaries, nxpe, nype, is_restart):

From 1833b78f0607cca3e7283e578fcf1a474d2d0ce3 Mon Sep 17 00:00:00 2001
From: David Bold
Date: Fri, 30 Sep 2022 11:56:43 +0200
Subject: [PATCH 3/5] fix typo

Co-authored-by: John Omotani
---
 xbout/load.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/xbout/load.py b/xbout/load.py
index 69534b15..0cd7f98e 100644
--- a/xbout/load.py
+++ b/xbout/load.py
@@ -793,7 +793,7 @@ def _arrange_for_concatenation(filepaths, nxpe=1, nype=1):
                     "`BOUT.dmp.0.nc`."
                 )
             raise ValueError(
-                f"A parallel simulation was loaded, but only a {len(filepathts)} "
+                f"A parallel simulation was loaded, but only {len(filepathts)} "
                 "files were loaded. Please ensure to pass in all files "
                 "by specifing e.g. `BOUT.dmp.*.nc`"
             )

From 0d83068f26383ee11ed31e9ae096a26dab7cbf21 Mon Sep 17 00:00:00 2001
From: David Bold
Date: Fri, 30 Sep 2022 11:58:40 +0200
Subject: [PATCH 4/5] Fix indention

xarray does not like merging if the data is not square. So unfortunately
we cannot easily support the case where some MPI options changed during
the simulations

Co-authored-by: John Omotani
---
 xbout/load.py | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/xbout/load.py b/xbout/load.py
index 0cd7f98e..52c8c6b0 100644
--- a/xbout/load.py
+++ b/xbout/load.py
@@ -840,17 +840,17 @@ def _arrange_for_concatenation(filepaths, nxpe=1, nype=1):
 
             paths_grid.append([[next(paths) for x in range(nxpe)] for y in range(nype)])
 
-        # Dimensions along which no concatenation is needed are still present as
-        # single-element lists, so need to concatenation along dim=None for those
-        concat_dims = [None, None, None]
-        if len(filepaths) > nprocs:
-            concat_dims[0] = "t"
-        if nype > 1:
-            concat_dims[1] = "y"
-        if nxpe > 1:
-            concat_dims[2] = "x"
-
-        return paths_grid, concat_dims
+    # Dimensions along which no concatenation is needed are still present as
+    # single-element lists, so need to concatenation along dim=None for those
+    concat_dims = [None, None, None]
+    if len(filepaths) > nprocs:
+        concat_dims[0] = "t"
+    if nype > 1:
+        concat_dims[1] = "y"
+    if nxpe > 1:
+        concat_dims[2] = "x"
+
+    return paths_grid, concat_dims
 
 
 def _trim(ds, *, guards, keep_boundaries, nxpe, nype, is_restart):

From ff65dfa087515b5fb0bce7abd838028e009a7b17 Mon Sep 17 00:00:00 2001
From: David Bold
Date: Fri, 30 Sep 2022 12:45:19 +0200
Subject: [PATCH 5/5] generalise getting run_id

* The filepaths may actually be datasets
* The files may not exist, as they may for testing only
---
 xbout/load.py | 20 +++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git a/xbout/load.py b/xbout/load.py
index 52c8c6b0..1331b5e2 100644
--- a/xbout/load.py
+++ b/xbout/load.py
@@ -777,12 +777,22 @@ def _arrange_for_concatenation(filepaths, nxpe=1, nype=1):
     nprocs = nxpe * nype
     n_runs = int(len(filepaths) / nprocs)
     runids = []
+
+    def getrunid(fp):
+        if _is_path(fp):
+            try:
+                with xr.open_dataset(fp) as tmp:
+                    return tmp.get("run_id", None)
+            except FileNotFoundError:
+                return None
+        return fp.get("run_id", None)
+
     for fp in filepaths:
-        with xr.open_dataset(fp) as tmp:
-            if "run_id" not in tmp:
-                runids = None
-                break
-            runids.append(tmp["run_id"])
+        thisrunid = getrunid(fp)
+        if thisrunid is None:
+            runids = None
+            break
+        runids.append(thisrunid)
     if not runids:
         if len(filepaths) < nprocs:
            if len(filepaths) == 1:
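The sorting introduced in [PATCH 2/5] and generalised in [PATCH 5/5] boils down to: read each file's run_id, group consecutive files that share one, and reshape every complete group into an (nype, nxpe) grid for xarray.open_mfdataset. Below is a rough standalone sketch of that grouping; read_run_id and group_by_run are illustrative stand-ins, not functions from xbout itself.

    # Minimal sketch of run_id-based grouping, assuming the files are ordered so
    # that each run's processor files are contiguous (as in the patch above).
    from itertools import groupby

    import xarray as xr

    def read_run_id(path):
        """Return the file's run_id, or None if it is absent or the file is missing."""
        try:
            with xr.open_dataset(path) as ds:
                return str(ds["run_id"].values) if "run_id" in ds else None
        except FileNotFoundError:
            return None

    def group_by_run(filepaths, nxpe, nype):
        """Group files into runs by run_id, arranging each run as a [nype][nxpe] grid."""
        nprocs = nxpe * nype
        grids = []
        for run_id, group in groupby(filepaths, key=read_run_id):
            paths = list(group)
            if len(paths) != nprocs:
                # Skip incomplete runs: xarray cannot merge a ragged grid.
                continue
            it = iter(paths)
            grids.append([[next(it) for _ in range(nxpe)] for _ in range(nype)])
        return grids

Note that itertools.groupby only merges consecutive equal keys, which mirrors the lastid logic in the patch: files belonging to the same run are assumed to sit next to each other in the sorted file list.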