Skip to content

Commit c4aa272

Browse files
authored
Merge pull request #178 from rolfverberg/mpi
Mpi
2 parents fb67df3 + cf69833 commit c4aa272

File tree

4 files changed

+67
-40
lines changed

4 files changed

+67
-40
lines changed

CHAP/common/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
NXdataToDataPointsProcessor,
3434
XarrayToNexusProcessor,
3535
XarrayToNumpyProcessor,
36-
SumProcessor,
36+
# SumProcessor,
3737
)
3838
from CHAP.common.reader import (
3939
BinaryFileReader,

CHAP/common/processor.py

Lines changed: 62 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1315,6 +1315,7 @@ def process(
13151315
except:
13161316
self.logger.warning('Unable to load mpi4py, running serially')
13171317
num_proc = 1
1318+
self.logger.debug(f'Number of processors: {num_proc}')
13181319

13191320
# Create the sub-pipeline configuration for each processor
13201321
# FIX: catered to EDD with one spec scan
@@ -1343,7 +1344,7 @@ def process(
13431344
num = scans_per_proc
13441345
if n_proc < num_scan - scans_per_proc*num_proc:
13451346
num += 1
1346-
config = map_config.model_dump_json()
1347+
config = map_config.model_dump()
13471348
config['spec_scans'][0]['scan_numbers'] = \
13481349
scan_numbers[n_scan:n_scan+num]
13491350
pipeline_config.append(
@@ -1427,12 +1428,10 @@ def process(
14271428
os.remove(tmp_name)
14281429

14291430
# Construct the NeXus NXroot object
1430-
nxroot = self._get_nxroot(
1431+
return self._get_nxroot(
14311432
map_config, detector_config, data, independent_dimensions,
14321433
all_scalar_data, placeholder_data)
14331434

1434-
return nxroot
1435-
14361435
def _get_nxroot(
14371436
self, map_config, detector_config, data, independent_dimensions,
14381437
all_scalar_data, placeholder_data):
@@ -1913,20 +1912,32 @@ def process(self, data, comm, root_as_worker=True):
19131912
data = [comm.recv(source=n_worker)]
19141913
else:
19151914
data.append(comm.recv(source=n_worker))
1915+
#FIX RV TODO Merge the list of data items in some generic fashion
19161916
return data
19171917

19181918

19191919
class MPIMapProcessor(Processor):
19201920
"""A Processor that applies a parallel generic sub-pipeline to
19211921
a map configuration.
19221922
"""
1923-
def process(self, data, sub_pipeline=None):
1923+
def process(self, data, sub_pipeline=None, inputdir='.', outputdir='.',
1924+
interactive=False, log_level='INFO'):
19241925
"""Run a parallel generic sub-pipeline.
19251926
19261927
:param data: Input data.
19271928
:type data: list[PipelineData]
19281929
:param sub_pipeline: The sub-pipeline.
19291930
:type sub_pipeline: Pipeline, optional
1931+
:param inputdir: Input directory, used only if files in the
1932+
input configuration are not absolute paths,
1933+
defaults to `'.'`.
1934+
:type inputdir: str, optional
1935+
:param outputdir: Directory to which any output figures will
1936+
be saved, defaults to `'.'`.
1937+
:type outputdir: str, optional
1938+
:param interactive: Allows for user interactions, defaults to
1939+
`False`.
1940+
:type interactive: bool, optional
19301941
:return: The `data` field of the first item in the returned
19311942
list of sub-pipeline items.
19321943
"""
@@ -1946,7 +1957,7 @@ def process(self, data, sub_pipeline=None):
19461957

19471958
# Get the map configuration from data
19481959
map_config = self.get_config(
1949-
data, 'common.models.map.MapConfig')
1960+
data, 'common.models.map.MapConfig', inputdir=inputdir)
19501961

19511962
# Create the spec reader configuration for each processor
19521963
spec_scans = map_config.spec_scans[0]
@@ -1970,7 +1981,10 @@ def process(self, data, sub_pipeline=None):
19701981
# Get the run configuration to use for the sub-pipeline
19711982
if sub_pipeline is None:
19721983
sub_pipeline = {}
1973-
run_config = RunConfig(sub_pipeline.get('config'), comm)
1984+
run_config = {'inputdir': inputdir, 'outputdir': outputdir,
1985+
'interactive': interactive, 'log_level': log_level}
1986+
run_config.update(sub_pipeline.get('config'))
1987+
run_config = RunConfig(run_config, comm)
19741988
pipeline_config = []
19751989
for item in sub_pipeline['pipeline']:
19761990
if isinstance(item, dict):
@@ -1987,8 +2001,8 @@ def process(self, data, sub_pipeline=None):
19872001
# Run the sub-pipeline on each processor
19882002
return run(
19892003
pipeline_config, inputdir=run_config.inputdir,
1990-
outputdir=run_config.outputdir,
1991-
interactive=run_config.interactive, comm=comm)
2004+
outputdir=run_config.outputdir, interactive=run_config.interactive,
2005+
logger=self.logger, comm=comm)
19922006

19932007

19942008
class MPISpawnMapProcessor(Processor):
@@ -1997,7 +2011,8 @@ class MPISpawnMapProcessor(Processor):
19972011
"""
19982012
def process(
19992013
self, data, num_proc=1, root_as_worker=True, collect_on_root=True,
2000-
sub_pipeline=None):
2014+
sub_pipeline=None, inputdir='.', outputdir='.', interactive=False,
2015+
log_level='INFO'):
20012016
"""Spawn workers running a parallel generic sub-pipeline.
20022017
20032018
:param data: Input data.
@@ -2012,6 +2027,16 @@ def process(
20122027
:type collect_on_root: bool, optional
20132028
:param sub_pipeline: The sub-pipeline.
20142029
:type sub_pipeline: Pipeline, optional
2030+
:param inputdir: Input directory, used only if files in the
2031+
input configuration are not absolute paths,
2032+
defaults to `'.'`.
2033+
:type inputdir: str, optional
2034+
:param outputdir: Directory to which any output figures will
2035+
be saved, defaults to `'.'`.
2036+
:type outputdir: str, optional
2037+
:param interactive: Allows for user interactions, defaults to
2038+
`False`.
2039+
:type interactive: bool, optional
20152040
:return: The `data` field of the first item in the returned
20162041
list of sub-pipeline items.
20172042
"""
@@ -2034,12 +2059,15 @@ def process(
20342059

20352060
# Get the map configuration from data
20362061
map_config = self.get_config(
2037-
data, 'common.models.map.MapConfig')
2062+
data, 'common.models.map.MapConfig', inputdir=inputdir)
20382063

20392064
# Get the run configuration to use for the sub-pipeline
20402065
if sub_pipeline is None:
20412066
sub_pipeline = {}
2042-
run_config = RunConfig(config=sub_pipeline.get('config'))
2067+
run_config = {'inputdir': inputdir, 'outputdir': outputdir,
2068+
'interactive': interactive, 'log_level': log_level}
2069+
run_config.update(sub_pipeline.get('config'))
2070+
run_config = RunConfig(run_config)
20432071

20442072
# Create the sub-pipeline configuration for each processor
20452073
spec_scans = map_config.spec_scans[0]
@@ -3355,28 +3383,28 @@ def process(self, data):
33553383
return self.unwrap_pipelinedata(data)[-1].data
33563384

33573385

3358-
class SumProcessor(Processor):
3359-
"""A Processor to sum the data in a NeXus NXobject, given a set of
3360-
nxpaths.
3361-
"""
3362-
def process(self, data):
3363-
"""Return the summed data array
3364-
3365-
:param data:
3366-
:type data:
3367-
:return: The summed data.
3368-
:rtype: numpy.ndarray
3369-
"""
3370-
nxentry, nxpaths = self.unwrap_pipelinedata(data)[-1]
3371-
if len(nxpaths) == 1:
3372-
return nxentry[nxpaths[0]]
3373-
sum_data = deepcopy(nxentry[nxpaths[0]])
3374-
for nxpath in nxpaths[1:]:
3375-
nxdata = nxentry[nxpath]
3376-
for entry in nxdata.entries:
3377-
sum_data[entry] += nxdata[entry]
3378-
3379-
return sum_data
3386+
#class SumProcessor(Processor):
3387+
# """A Processor to sum the data in a NeXus NXobject, given a set of
3388+
# nxpaths.
3389+
# """
3390+
# def process(self, data):
3391+
# """Return the summed data array
3392+
#
3393+
# :param data:
3394+
# :type data:
3395+
# :return: The summed data.
3396+
# :rtype: numpy.ndarray
3397+
# """
3398+
# nxentry, nxpaths = self.unwrap_pipelinedata(data)[-1]
3399+
# if len(nxpaths) == 1:
3400+
# return nxentry[nxpaths[0]]
3401+
# sum_data = deepcopy(nxentry[nxpaths[0]])
3402+
# for nxpath in nxpaths[1:]:
3403+
# nxdata = nxentry[nxpath]
3404+
# for entry in nxdata.entries:
3405+
# sum_data[entry] += nxdata[entry]
3406+
#
3407+
# return sum_data
33803408

33813409

33823410
if __name__ == '__main__':

CHAP/processor.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,6 @@ def main(opt_parser=OptionParser):
7777
processor.logger.addHandler(log_handler)
7878
data = processor.process(opts.data)
7979

80-
print(f'Processor {processor} operates on data {data}')
81-
8280

8381
if __name__ == '__main__':
8482
main()

CHAP/runner.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,11 @@ def main():
120120
if run_config.spawn > 0:
121121
with open(f'{configfile}_{common_comm.Get_rank()}') as file:
122122
config = safe_load(file)
123-
run_config = RunConfig(config.get('config'), common_comm)
123+
run_config = RunConfig(config.pop('config'), common_comm)
124124
else:
125125
with open(f'{configfile}_{sub_comm.Get_rank()}') as file:
126126
config = safe_load(file)
127-
run_config = RunConfig(config.get('config'), comm)
127+
run_config = RunConfig(config.pop('config'), comm)
128128
else:
129129
common_comm = comm
130130

@@ -174,7 +174,7 @@ def runner(run_config, pipeline_config, comm=None):
174174
# System modules
175175
from time import time
176176

177-
# logging setup
177+
# Logging setup
178178
logger, log_handler = set_logger(run_config.log_level)
179179
logger.info(f'Input pipeline configuration: {pipeline_config}\n')
180180

@@ -184,6 +184,7 @@ def runner(run_config, pipeline_config, comm=None):
184184
run_config.inputdir, run_config.outputdir, run_config.interactive,
185185
logger, run_config.log_level, log_handler, comm)
186186
logger.info(f'Executed "run" in {time()-t0:.3f} seconds')
187+
187188
return data
188189

189190
def set_logger(log_level='INFO'):

0 commit comments

Comments
 (0)