Skip to content

Commit f87cca3

Browse files
committed
refactor: First pass at making runner.RunConfig a Pydantic class
Only tested on common.processor.MapProcessor with and without MPI
1 parent c4aa272 commit f87cca3

File tree

4 files changed

+198
-146
lines changed

4 files changed

+198
-146
lines changed

CHAP/common/processor.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1358,8 +1358,7 @@ def process(
13581358

13591359
# Spawn the workers to run the sub-pipeline
13601360
run_config = RunConfig(
1361-
config={'log_level': logging.getLevelName(self.logger.level),
1362-
'spawn': 1})
1361+
log_level=logging.getLevelName(self.logger.level), spawn=1)
13631362
tmp_names = []
13641363
with NamedTemporaryFile(delete=False) as fp:
13651364
fp_name = fp.name
@@ -1371,14 +1370,14 @@ def process(
13711370
tmp_names.append(f_name)
13721371
with open(f_name, 'w') as f:
13731372
yaml.dump(
1374-
{'config': run_config.__dict__,
1373+
{'config': run_config.model_dump(),
13751374
'pipeline': pipeline_config[n_proc-1]},
13761375
f, sort_keys=False)
13771376
sub_comm = MPI.COMM_SELF.Spawn(
13781377
'CHAP', args=[fp_name], maxprocs=num_proc-1)
13791378
common_comm = sub_comm.Merge(False)
13801379
# Align with the barrier in RunConfig() on common_comm
1381-
# called from the spawned main()
1380+
# called from the spawned main() in common_comm
13821381
common_comm.barrier()
13831382
# Align with the barrier in run() on common_comm
13841383
# called from the spawned main()
@@ -1421,13 +1420,15 @@ def process(
14211420
if num_proc > 1:
14221421
# Reset the scan_numbers to the original full set
14231422
spec_scans.scan_numbers = scan_numbers
1424-
# Disconnect spawned workers and cleanup temporary files
1423+
# Align with the barrier in main() on common_comm
1424+
# when disconnecting the spawned worker
14251425
common_comm.barrier()
1426+
# Disconnect spawned workers and cleanup temporary files
14261427
sub_comm.Disconnect()
14271428
for tmp_name in tmp_names:
14281429
os.remove(tmp_name)
14291430

1430-
# Construct the NeXus NXroot object
1431+
# Construct and return the NeXus NXroot object
14311432
return self._get_nxroot(
14321433
map_config, detector_config, data, independent_dimensions,
14331434
all_scalar_data, placeholder_data)
@@ -1984,7 +1985,7 @@ def process(self, data, sub_pipeline=None, inputdir='.', outputdir='.',
19841985
run_config = {'inputdir': inputdir, 'outputdir': outputdir,
19851986
'interactive': interactive, 'log_level': log_level}
19861987
run_config.update(sub_pipeline.get('config'))
1987-
run_config = RunConfig(run_config, comm)
1988+
run_config = RunConfig(**run_config, comm=comm, logger=self.logger)
19881989
pipeline_config = []
19891990
for item in sub_pipeline['pipeline']:
19901991
if isinstance(item, dict):
@@ -2067,7 +2068,7 @@ def process(
20672068
run_config = {'inputdir': inputdir, 'outputdir': outputdir,
20682069
'interactive': interactive, 'log_level': log_level}
20692070
run_config.update(sub_pipeline.get('config'))
2070-
run_config = RunConfig(run_config)
2071+
run_config = RunConfig(**run_config, logger=self.logger)
20712072

20722073
# Create the sub-pipeline configuration for each processor
20732074
spec_scans = map_config.spec_scans[0]
@@ -2128,7 +2129,7 @@ def process(
21282129
tmp_names.append(f_name)
21292130
with open(f_name, 'w') as f:
21302131
yaml.dump(
2131-
{'config': run_config.__dict__,
2132+
{'config': run_config.model_dump(),
21322133
'pipeline': pipeline_config[n_proc]},
21332134
f, sort_keys=False)
21342135
sub_comm = MPI.COMM_SELF.Spawn(

CHAP/edd/processor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,7 @@ def process(
535535
:type inputdir: str, optional
536536
:param outputdir: Directory to which any output figures will
537537
be saved, defaults to `'.'`.
538-
:type outputdir: str, optional
538+
:type outputdir: str, optional
539539
:param interactive: Allows for user interactions, defaults to
540540
`False`.
541541
:type interactive: bool, optional

CHAP/pipeline.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@ class Pipeline():
1919
def __init__(self, items=None, kwds=None):
2020
"""Pipeline class constructor.
2121
22-
:param items: List of objects.
23-
:param kwds: List of method args for individual objects.
22+
:param items: List of objects, optional.
23+
:type items: list
24+
:param kwds: List of method keyword argugents for the objects,
25+
optional.
26+
:type kwds: list
2427
"""
2528
self.__name__ = self.__class__.__name__
2629

@@ -262,7 +265,7 @@ def execute(self, schema=None, **kwargs):
262265
kwargs['filename'] = os.path.normpath(os.path.realpath(
263266
os.path.join(outputdir, kwargs['filename'])))
264267
else:
265-
self.logger.error('No implementation of read, write, or process')
268+
self.logger.error('No implementation of read, process, or write')
266269
return None
267270

268271
method = getattr(self, method_name)
@@ -274,11 +277,12 @@ def execute(self, schema=None, **kwargs):
274277
args[k] = v
275278

276279
t0 = time()
277-
self.logger.debug(f'Executing "{method_name}" with {args}')
280+
self.logger.debug(
281+
f'Executing "{method_name}" with schema "{schema}" and {args}')
278282
self.logger.info(f'Executing "{method_name}"')
279283
data = method(**args)
280-
self.logger.info(f'Finished "{method_name}" in '
281-
+ f'{time()-t0:.0f} seconds\n')
284+
self.logger.info(
285+
f'Finished "{method_name}" in {time()-t0:.0f} seconds\n')
282286

283287
return [PipelineData(name=self.__name__,
284288
data=data,

0 commit comments

Comments
 (0)