Skip to content

Commit 3fa30d1

Browse files
authored
Merge pull request #181 from rolfverberg/dev_rolf
Dev rolf
2 parents 2b509e9 + b741340 commit 3fa30d1

File tree

4 files changed

+237
-165
lines changed

4 files changed

+237
-165
lines changed

CHAP/common/processor.py

Lines changed: 49 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1358,8 +1358,7 @@ def process(
13581358

13591359
# Spawn the workers to run the sub-pipeline
13601360
run_config = RunConfig(
1361-
config={'log_level': logging.getLevelName(self.logger.level),
1362-
'spawn': 1})
1361+
log_level=logging.getLevelName(self.logger.level), spawn=1)
13631362
tmp_names = []
13641363
with NamedTemporaryFile(delete=False) as fp:
13651364
fp_name = fp.name
@@ -1371,14 +1370,14 @@ def process(
13711370
tmp_names.append(f_name)
13721371
with open(f_name, 'w') as f:
13731372
yaml.dump(
1374-
{'config': run_config.__dict__,
1373+
{'config': run_config.model_dump(),
13751374
'pipeline': pipeline_config[n_proc-1]},
13761375
f, sort_keys=False)
13771376
sub_comm = MPI.COMM_SELF.Spawn(
13781377
'CHAP', args=[fp_name], maxprocs=num_proc-1)
13791378
common_comm = sub_comm.Merge(False)
13801379
# Align with the barrier in RunConfig() on common_comm
1381-
# called from the spawned main()
1380+
# called from the spawned main() in common_comm
13821381
common_comm.barrier()
13831382
# Align with the barrier in run() on common_comm
13841383
# called from the spawned main()
@@ -1421,13 +1420,15 @@ def process(
14211420
if num_proc > 1:
14221421
# Reset the scan_numbers to the original full set
14231422
spec_scans.scan_numbers = scan_numbers
1424-
# Disconnect spawned workers and cleanup temporary files
1423+
# Align with the barrier in main() on common_comm
1424+
# when disconnecting the spawned worker
14251425
common_comm.barrier()
1426+
# Disconnect spawned workers and cleanup temporary files
14261427
sub_comm.Disconnect()
14271428
for tmp_name in tmp_names:
14281429
os.remove(tmp_name)
14291430

1430-
# Construct the NeXus NXroot object
1431+
# Construct and return the NeXus NXroot object
14311432
return self._get_nxroot(
14321433
map_config, detector_config, data, independent_dimensions,
14331434
all_scalar_data, placeholder_data)
@@ -1920,24 +1921,28 @@ class MPIMapProcessor(Processor):
19201921
"""A Processor that applies a parallel generic sub-pipeline to
19211922
a map configuration.
19221923
"""
1923-
def process(self, data, sub_pipeline=None, inputdir='.', outputdir='.',
1924-
interactive=False, log_level='INFO'):
1924+
def process(self, data, config=None, sub_pipeline=None, inputdir=None,
1925+
outputdir=None, interactive=None, log_level=None):
19251926
"""Run a parallel generic sub-pipeline.
19261927
19271928
:param data: Input data.
19281929
:type data: list[PipelineData]
1930+
:param config: Initialization parameters for an instance of
1931+
common.models.map.MapConfig.
1932+
:type config: dict, optional
19291933
:param sub_pipeline: The sub-pipeline.
19301934
:type sub_pipeline: Pipeline, optional
19311935
:param inputdir: Input directory, used only if files in the
1932-
input configuration are not absolute paths,
1933-
defaults to `'.'`.
1936+
input configuration are not absolute paths.
19341937
:type inputdir: str, optional
19351938
:param outputdir: Directory to which any output figures will
1936-
be saved, defaults to `'.'`.
1939+
be saved.
19371940
:type outputdir: str, optional
1938-
:param interactive: Allows for user interactions, defaults to
1939-
`False`.
1941+
:param interactive: Allows for user interactions.
19401942
:type interactive: bool, optional
1943+
:param log_level: Logger level (not case sensitive).
1944+
:type log_level: Literal[
1945+
'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], optional
19411946
:return: The `data` field of the first item in the returned
19421947
list of sub-pipeline items.
19431948
"""
@@ -1955,11 +1960,24 @@ def process(self, data, sub_pipeline=None, inputdir='.', outputdir='.',
19551960
num_proc = comm.Get_size()
19561961
rank = comm.Get_rank()
19571962

1958-
# Get the map configuration from data
1959-
map_config = self.get_config(
1960-
data, 'common.models.map.MapConfig', inputdir=inputdir)
1963+
# Get the validated map configuration
1964+
try:
1965+
map_config = self.get_config(
1966+
data, 'common.models.map.MapConfig', inputdir=inputdir)
1967+
except:
1968+
self.logger.info('No valid Map configuration in input pipeline '
1969+
'data, using config parameter instead.')
1970+
try:
1971+
# Local modules
1972+
from CHAP.common.models.map import MapConfig
1973+
1974+
map_config = MapConfig(**config, inputdir=inputdir)
1975+
except Exception as exc:
1976+
raise RuntimeError from exc
19611977

19621978
# Create the spec reader configuration for each processor
1979+
# FIX: catered to EDD with one spec scan
1980+
assert len(map_config.spec_scans) == 1
19631981
spec_scans = map_config.spec_scans[0]
19641982
scan_numbers = spec_scans.scan_numbers
19651983
num_scan = len(scan_numbers)
@@ -1984,7 +2002,7 @@ def process(self, data, sub_pipeline=None, inputdir='.', outputdir='.',
19842002
run_config = {'inputdir': inputdir, 'outputdir': outputdir,
19852003
'interactive': interactive, 'log_level': log_level}
19862004
run_config.update(sub_pipeline.get('config'))
1987-
run_config = RunConfig(run_config, comm)
2005+
run_config = RunConfig(**run_config, comm=comm)
19882006
pipeline_config = []
19892007
for item in sub_pipeline['pipeline']:
19902008
if isinstance(item, dict):
@@ -1999,20 +2017,17 @@ def process(self, data, sub_pipeline=None, inputdir='.', outputdir='.',
19992017
pipeline_config.append(item)
20002018

20012019
# Run the sub-pipeline on each processor
2002-
return run(
2003-
pipeline_config, inputdir=run_config.inputdir,
2004-
outputdir=run_config.outputdir, interactive=run_config.interactive,
2005-
logger=self.logger, comm=comm)
2020+
return run(run_config, pipeline_config, logger=self.logger, comm=comm)
20062021

20072022

20082023
class MPISpawnMapProcessor(Processor):
20092024
"""A Processor that applies a parallel generic sub-pipeline to
20102025
a map configuration by spawning workers processes.
20112026
"""
20122027
def process(
2013-
self, data, num_proc=1, root_as_worker=True, collect_on_root=True,
2014-
sub_pipeline=None, inputdir='.', outputdir='.', interactive=False,
2015-
log_level='INFO'):
2028+
self, data, num_proc=1, root_as_worker=True, collect_on_root=False,
2029+
sub_pipeline=None, inputdir=None, outputdir=None, interactive=None,
2030+
log_level=None):
20162031
"""Spawn workers running a parallel generic sub-pipeline.
20172032
20182033
:param data: Input data.
@@ -2023,7 +2038,7 @@ def process(
20232038
defaults to `True`.
20242039
:type root_as_worker: bool, optional
20252040
:param collect_on_root: Collect the result of the spawned
2026-
workers on the root node, defaults to `True`.
2041+
workers on the root node, defaults to `False`.
20272042
:type collect_on_root: bool, optional
20282043
:param sub_pipeline: The sub-pipeline.
20292044
:type sub_pipeline: Pipeline, optional
@@ -2037,6 +2052,9 @@ def process(
20372052
:param interactive: Allows for user interactions, defaults to
20382053
`False`.
20392054
:type interactive: bool, optional
2055+
:param log_level: Logger level (not case sensitive).
2056+
:type log_level: Literal[
2057+
'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], optional
20402058
:return: The `data` field of the first item in the returned
20412059
list of sub-pipeline items.
20422060
"""
@@ -2067,7 +2085,7 @@ def process(
20672085
run_config = {'inputdir': inputdir, 'outputdir': outputdir,
20682086
'interactive': interactive, 'log_level': log_level}
20692087
run_config.update(sub_pipeline.get('config'))
2070-
run_config = RunConfig(run_config)
2088+
run_config = RunConfig(**run_config, logger=self.logger)
20712089

20722090
# Create the sub-pipeline configuration for each processor
20732091
spec_scans = map_config.spec_scans[0]
@@ -2128,7 +2146,7 @@ def process(
21282146
tmp_names.append(f_name)
21292147
with open(f_name, 'w') as f:
21302148
yaml.dump(
2131-
{'config': run_config.__dict__,
2149+
{'config': run_config.model_dump(),
21322150
'pipeline': pipeline_config[n_proc]},
21332151
f, sort_keys=False)
21342152
sub_comm = MPI.COMM_SELF.Spawn(
@@ -2143,7 +2161,7 @@ def process(
21432161

21442162
# Run the sub-pipeline on the root node
21452163
if root_as_worker:
2146-
data = runner(run_config, pipeline_config[0], common_comm)
2164+
data = runner(run_config, pipeline_config[0], comm=common_comm)
21472165
elif collect_on_root:
21482166
run_config.spawn = 0
21492167
pipeline_config = [{'common.MPICollectProcessor': {
@@ -2157,7 +2175,10 @@ def process(
21572175

21582176
# Disconnect spawned workers and cleanup temporary files
21592177
if num_proc > first_proc:
2178+
# Align with the barrier in main() on common_comm
2179+
# when disconnecting the spawned worker
21602180
common_comm.barrier()
2181+
# Disconnect spawned workers and cleanup temporary files
21612182
sub_comm.Disconnect()
21622183
for tmp_name in tmp_names:
21632184
os.remove(tmp_name)

CHAP/edd/processor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,7 @@ def process(
535535
:type inputdir: str, optional
536536
:param outputdir: Directory to which any output figures will
537537
be saved, defaults to `'.'`.
538-
:type outputdir: str, optional
538+
:type outputdir: str, optional
539539
:param interactive: Allows for user interactions, defaults to
540540
`False`.
541541
:type interactive: bool, optional

CHAP/pipeline.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@ class Pipeline():
1919
def __init__(self, items=None, kwds=None):
2020
"""Pipeline class constructor.
2121
22-
:param items: List of objects.
23-
:param kwds: List of method args for individual objects.
22+
:param items: List of objects, optional.
23+
:type items: list
24+
:param kwds: List of method keyword arguments for the objects,
25+
optional.
26+
:type kwds: list
2427
"""
2528
self.__name__ = self.__class__.__name__
2629

@@ -262,7 +265,7 @@ def execute(self, schema=None, **kwargs):
262265
kwargs['filename'] = os.path.normpath(os.path.realpath(
263266
os.path.join(outputdir, kwargs['filename'])))
264267
else:
265-
self.logger.error('No implementation of read, write, or process')
268+
self.logger.error('No implementation of read, process, or write')
266269
return None
267270

268271
method = getattr(self, method_name)
@@ -274,11 +277,12 @@ def execute(self, schema=None, **kwargs):
274277
args[k] = v
275278

276279
t0 = time()
277-
self.logger.debug(f'Executing "{method_name}" with {args}')
280+
self.logger.debug(
281+
f'Executing "{method_name}" with schema "{schema}" and {args}')
278282
self.logger.info(f'Executing "{method_name}"')
279283
data = method(**args)
280-
self.logger.info(f'Finished "{method_name}" in '
281-
+ f'{time()-t0:.0f} seconds\n')
284+
self.logger.info(
285+
f'Finished "{method_name}" in {time()-t0:.0f} seconds\n')
282286

283287
return [PipelineData(name=self.__name__,
284288
data=data,

0 commit comments

Comments
 (0)