Store files as output #585
Conversation
This looks great so far @gpetretto, I would be happy to help test/implement this. The only missing piece that I can see (and why many people want to be able to define files as workflow outputs) is the ability to also use them as workflow inputs in some standardized/transparent way (i.e., calling the script you provide for retrieval during the job set-up step). Do you think that could be included here (or in a future PR)?
If the option to use the files as input for the workflows is needed, I believe it would be better to add it directly here. Otherwise the risk is that it will not be compatible with the choices made. Do you have some ideas of how you expect this to work? As hinted in the initial message, one can imagine defining something like:

```python
@job
def add(a, b):
    with open("output_file", "wt") as f:
        f.write(f"The result is {a + b}")
    return Response(a + b, output_files={"data_files_gfs": "output_file"})


@job
def read_file(file_path):
    with open(file_path, "rt") as f:
        content = f.read()
    return content


add_job = add(1, 5)
read_job = read_file(add_job.output_files["name"]["output_file"])
```

In this case I already see some potential shortcomings with this design. For example you will not be able to decide precisely where to copy the file (name, path). Also, how do you refer to the file? Here I tried to use a dictionary-like access where I specified that the file should be selected by name. The reason why I am sticking to a reference-like access is that it would be needed to properly support the feature in jobflow-remote. There we cannot rely on the …
Agreed with @ml-evs, this looks fantastic and a very nice implementation. I was also wondering about re-using files. Other workflow managers such as covalent have strategies to do this (e.g., see here), although I am not sure if any of that is relevant here since it looks like you need to know the file paths in advance. I have a few ideas of how this could be implemented:

1. Specify the destination in the decorator:

```python
@job(wavecar_file=FileDestination("WAVECAR.previous"))
def my_func(arg1, wavecar_file):
    pass


my_func("abc", wavecar_file=my_func.output_files.wavecar)
```

One limitation is that this wouldn't support dynamic numbers of files.

2. Pass the files through a dedicated `files` argument:

```python
@job
def my_func(arg1):
    pass


my_func(arg1, files=[my_func.output_files.wavecar])
```

The `files` argument could potentially also accept a dictionary/some specialised object that can specify the output destination:

```python
my_func(arg1, files=[{"src": my_func.output_files.wavecar, "dst": "WAVECAR_previous"}])
```

The downside to this approach is that now the caller has to specify the expected output destination, rather than this being set by the function itself.
Thanks @gpetretto for this (well, we already discussed it and I already thanked you, but not officially ;) )

Regarding re-using files, indeed if there is a need for it, it should be added. Now, to be honest, I am not completely sure this is the most important thing. Consider two use cases:

2/ Let's suppose you make an MLFF force field with Vasp and then you want to use it for 10000 other calculations. Then this file will need to be copied 10000 times to the cluster (so 10000 transfers and 10000 copies on the cluster).

So I understand the idea that it is nice, but I just want to make sure there is a real use case where we will actually not be constrained in the future by network transfers or storage. Imagine you have an MD trajectory of 20 GB and you want to reuse that trajectory for 10 new calculations (for some reason, ... no idea why someone would do that but anyway): it will be copied once to e.g. the Azure FileStore and then copied back 10 times (200 GB of transfer, 200 GB of storage on the cluster) for these 10 calculations. (Also, consider that when using jobflow-remote in the split mode, the files are transferred twice each time: from the cluster to the runner and from the runner to the Azure cloud when storing, and from the Azure cloud to the runner and then from the runner to the cluster when reusing the file.)

So maybe the question is: what exactly do we want to tackle here with this possibility to transfer files? And I think we should first answer this question.
I think this emphasizes exactly why I would want this to be handled at the jobflow level, so that jobflow-remote and other managers have a substrate on which they can something more sensible (e.g., copy the file once to each cluster that needs it and make sure hashes match on future runs), otherwise users and wf developers will invent many other ways of using them. |
I fully agree this must be at the jobflow level indeed, but even so, the same applies. Consider a run_locally where you have defined the store (with a docs store, additional stores AND file stores): the exact same reasoning as above applies. The first job will copy files to a remote place, and then the second job will copy them back locally. Currently, this is done by just passing the previous directory (with, as mentioned above, the constraint that it may not run on a different worker unless the FileClient is properly set up/working?).

This possibility also opens up the question for the developer as to which one he should use. I agree it is probably more convenient to have this file transfer somehow directly integrated (as in some other workflow managers such as covalent), but is it the most efficient in terms of network? For small files, it is probably not a huge problem (unless there are many of them, or many jobs copying the same file), but for larger files this may slow things down quite a bit (and even pose problems: e.g. with a "simple" run_locally, if the remote file store, e.g. on Azure, is not accessible, what happens? Probably the only thing it can do is just fail with an error).

Now understand me well, I'm not at all against the idea of the possibility to reuse files, and it can (and probably should) be implemented in addition to the current implementation (whose aim is "just" to store/archive files somehow)! I'm just raising a point as to when this should be used for reusing files. And I have not yet found a use case in which I'm convinced it would be better than the current approach (with a previous directory).
About having the copy happen only once, how would you see that? Is that the idea of the "context" you were mentioning at some point?
Yeah, I agree with the file transfer problem. As someone who copies WAVECARs in workflows, I would like to ask for an option to just pass a directory path. 😅
@JaGeo @davidwaroquiers I agree that this shouldn't replace the current approach of passing the previous directory. However, I think it is nice to have an additional option in jobflow in the case that this isn't possible. One immediate application is when jobs will be run on different compute resources and so copying from a folder won't work. E.g., let's say you have access to a GPU cluster for training ML models, and a different CPU cluster for generating training data. It seems like this approach would facilitate file transfers between the two.
Definitely! But it should also be easy to configure and change for the user, as it will really depend on the cluster setup.
I like @utf's proposals. An additional downside of the second one is that … For multiple files, something like a FolderDestination could be used instead:

```python
@job(multiple_files=FolderDestination("."))
def my_func(arg1, multiple_files):
    pass


my_func("abc", multiple_files=[my_func.output_files.wavecar, my_func.output_files.chgcar])
```

This does not give full freedom on naming the files, but there should be a limit at some point to what can be done with a simple decorator. Note that we will probably need to resort to a notation like the one I used in the initial example …
I see the point. Indeed this should be something that could be handled by jobflow-remote. However, I have two major concerns: …

I suppose you are not aiming at storing the WAVECAR in an output … In all of these cases it is important to note that there is the constraint that the …
Very nice contribution @gpetretto! The only point I would like to better understand is how the …
I was the original author of the existing `FileStore` in maggma …
Hi @rkingsbury, thanks for your comments and mentioning the maggma `FileStore`. An open point is related to the fact that right now the reference to retrieve the stored files is only available in the main collection of the `JobStore`.

Python code:

```python
from __future__ import annotations

import io
from abc import ABCMeta, abstractmethod
from typing import Dict, Iterator, List, Optional, Union

from maggma.core import Sort, Store


class FileStore(Store, metaclass=ABCMeta):
    """Abstract class for a file store."""

    def __init__(self, index: Store):
        self.index = index

    def query(
        self,
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterator[Dict]:
        return self.index.query(
            criteria=criteria, properties=properties, sort=sort, skip=skip, limit=limit
        )

    def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
        return self.index.update(docs=docs, key=key)

    @abstractmethod
    def _put(self, src: str | io.IOBase, dest: str) -> str:
        pass

    def put(self, src: str | io.IOBase, dest: str, metadata: dict) -> str:
        reference = self._put(src, dest)
        metadata = dict(metadata)
        metadata["reference"] = reference
        self.update(metadata, key="reference")
        return reference

    @abstractmethod
    def _get(self, reference: str, dest: str | io.IOBase):
        pass

    def get(self, criteria: Dict, dest: str | io.IOBase):
        doc = self.query_one(criteria)
        if not doc:
            raise ValueError(f"No stored file matching criteria: {criteria}")
        return self._get(reference=doc["reference"], dest=dest)
```

Even though I am not sure if this is really advantageous.
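To make the intent of the drafted `put`/`get` API more concrete, here is a small, self-contained toy illustration. This is not the code proposed in the PR and not a maggma store: the class name, the directory-backed storage and the in-memory dict index are all assumptions made purely for illustration.

```python
import io
import shutil
from pathlib import Path
from typing import Dict, Union


class ToyFileStore:
    """Toy stand-in mirroring the drafted put/get API, backed by a local directory
    and an in-memory index (illustration only, not the code proposed in this PR)."""

    def __init__(self, root: str):
        self.root = Path(root)
        self.root.mkdir(parents=True, exist_ok=True)
        self.index: Dict[str, dict] = {}

    def put(self, src: Union[str, io.IOBase], dest: str, metadata: dict) -> str:
        target = self.root / dest
        target.parent.mkdir(parents=True, exist_ok=True)
        if isinstance(src, str):
            shutil.copyfile(src, target)
        else:
            # Binary file-like object assumed.
            target.write_bytes(src.read())
        # As in the drafted _put, the destination path doubles as the reference.
        self.index[dest] = {**metadata, "reference": dest}
        return dest

    def get(self, criteria: dict, dest: Union[str, io.IOBase]) -> None:
        # Return the first indexed file whose metadata matches all criteria.
        for doc in self.index.values():
            if all(doc.get(k) == v for k, v in criteria.items()):
                data = (self.root / doc["reference"]).read_bytes()
                if isinstance(dest, str):
                    Path(dest).write_bytes(data)
                else:
                    dest.write(data)
                return
        raise ValueError(f"No stored file matching {criteria}")


if __name__ == "__main__":
    store = ToyFileStore("/tmp/toy_file_store")
    Path("output_file").write_text("The result is 6")
    store.put("output_file", dest="calc_1/output_file", metadata={"job_uuid": "abc"})
    store.get({"job_uuid": "abc"}, dest="retrieved_output_file")
    print(Path("retrieved_output_file").read_text())
```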
Thanks @rkingsbury and @gpetretto for the discussion on Maggma stores. Maybe the naming could be different to make it clear. Somehow the file store here is indeed not a Maggma Store. We could call these FileStorage instead, or FileBucket or FileArchive or something like that. Would that clarify things better?
To keep things going I have tried to add an implementation of using the output files as inputs of other jobs. For example:

```python
from jobflow import Flow, Response, job, run_locally


@job
def add(a, b):
    with open("output_file", "wt") as f:
        f.write(f"The result is {a + b}")
    # return Response(a + b, output_files={"data_files_gfs": "output_file"})
    return Response(a + b, output_files={"data_files": "output_file"})


@job
def read_file(file_path):
    with open(file_path, "rt") as f:
        content = f.read()
    return content


add_job = add(1, 5)
read_job = read_file(add_job.output_files.name["output_file"])
flow = Flow([add_job, read_job])
print(run_locally(flow, create_folders=True))
```

Or modifying the target path with a `FileDestination`:

```python
@job(file_path=FileDestination("test_dir/test_out_file"))
def read_file(file_path):
    with open(file_path, "rt") as f:
        content = f.read()
    return content
```

@ml-evs, would this fit your use case?

At some point I thought of taking advantage of the default value of the argument to also allow passing a file from the local file system:

```python
@job(file_path=FileDestination("test_dir/test_out_file"))
def read_file(file_path="/path/to/some/file"):
    ...
```

or maybe even from different sources:

```python
@job(file_path=FileDestination("test_dir/test_out_file"))
def read_file(file_path=FileSource(path="/path/to/some/file", host="some.host")):
    ...
```

but I was afraid that things would get out of hand with too many options to deal with (multiple files, link vs copy, ...). So maybe this could be added at a later time if it seems interesting, as it is not strictly related to the addition of output files.
Hi David, sorry for the delay (very busy couple of weeks here at the end of the semester!) but thank you for your thoughtful and thorough reply. I still want to think more and give a longer reply later, but for quick feedback - I really like your suggestion here. If the …
This is a good point, but let me pose a question. Do you think it would be possible to modify the existing maggma `FileStore` …? I took a baby step in this direction in the existing … Just food for thought. If you think there is potential in having a subclass of …
The proposal of allowing files to be stored directly as outputs in jobflow has been raised several times, for example in this discussion: materialsproject/atomate2#515.
The idea is that, while it would in principle be possible to store a file using a maggma Store (e.g. a GridFSStore or an S3Store), this requires respecting the Store API, which definitely has limitations, for example the need to load the whole file in memory.
This is an initial draft where jobs are allowed to store files directly in a new kind of store. In any case there are definitely several points that should be considered carefully, and room for improvement. I consider this the basis to start a discussion.
Based on previous interactions and discussions on the topic I suppose that @utf, @rkingsbury, @ml-evs, @computron may be interested.
Changes
The current implementation introduces the following changes:

- A `FileStore`, different from the one in maggma. This exposes methods to directly interact with the files (`put`, `get`). A couple of actual stores are implemented, one based on the file system and one on GridFS. Those are here for testing purposes, but if the concept is acceptable these can be moved to maggma (or some other package?).
- A `files_stores` option, mimicking the `additional_stores`, to define the stores where the files will be saved.
- `Response` has an additional attribute `output_files`. This is a dictionary where the key is the name of the store where the files should be saved. The value is a string or a list of strings with relative paths to the files that should be uploaded. This could be extended to be a dictionary to handle more cases (e.g. store all the produced files, or decide what to do if a file is missing).
- `JobStoreDocument` has an additional attribute `files`, that contains a list of `FileData` (a sketch of such a model is shown after this list). This contains the name of the store and the reference to retrieve the file, along with the name and path of the file.
- The `Job.run` method, after the Job is completed, checks if the Response contains `output_files` and stores the files before storing the `JobStoreDocument`. The `files` attribute of the `JobStoreDocument` is filled in using the references to the stored files, and this is then added to the JobStore.

All these changes should be backward-compatible with existing jobflow workflows.
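For illustration only, a minimal sketch of what the `FileData` model described above could look like; the field names are assumptions derived from the description, not necessarily the PR's exact definition:

```python
from pydantic import BaseModel


class FileData(BaseModel):
    """Per-file record kept in JobStoreDocument.files (field names are assumptions)."""

    store: str      # name of the file store the file was saved to
    reference: str  # reference returned by the FileStore, used to retrieve the file
    name: str       # name of the stored file
    path: str       # path of the file relative to the job's run directory
```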
Example
Here is a simple example to demonstrate the concept. The configuration, including the new JobStore:
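(The configuration snippet itself is not reproduced in this excerpt. Below is a minimal sketch of what it might look like, assuming the `files_stores` argument described above; the `FileSystemFileStore` class name, its import path and its arguments are assumptions, not necessarily the names used in the PR.)

```python
from maggma.stores import MemoryStore

from jobflow import JobStore

# Hypothetical file-system-backed FileStore from this PR (name and path assumed).
from jobflow.core.file_store import FileSystemFileStore

docs_store = MemoryStore()
file_store = FileSystemFileStore(path="/tmp/jobflow_files", index=MemoryStore())  # assumed signature

# `files_stores` mirrors `additional_stores`: keys are the store names used in
# Response(output_files={...}).
store = JobStore(docs_store=docs_store, files_stores={"data_files": file_store})
```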
A Job that creates some files and stores them in the output:
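(The original snippet is likewise not shown here; the following mirrors the examples given earlier in this conversation and assumes the new `output_files` attribute of `Response`.)

```python
from jobflow import Response, job


@job
def add(a, b):
    # Write a file in the job's run directory.
    with open("output_file", "wt") as f:
        f.write(f"The result is {a + b}")
    # Ask jobflow to upload "output_file" to the file store named "data_files".
    return Response(a + b, output_files={"data_files": "output_file"})
```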
A script to retrieve the file after completion:
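(Sketch only, based on the drafted `FileStore.get(criteria, dest)` API; it assumes the `store` and `file_store` objects from the configuration sketch above, and that the `files` entries expose the reference as described in the Changes section.)

```python
# Retrieve the file produced by the "add" job after the flow has completed.
# `store` and `file_store` are the objects from the configuration sketch above.
doc = store.query_one({"name": "add"})

# The JobStoreDocument's `files` attribute records, for each stored file, the
# store name and the reference needed to fetch it back (dict access assumed).
file_info = doc["files"][0]
file_store.get({"reference": file_info["reference"]}, dest="retrieved_output_file")
```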
Discussion
Considering this as a good starting point, I think there are several points that may be worth discussing. I will list some here.
- `FileStore`: is a minimal API like the one drafted here enough, or should it be more involved?
- The `@job` decorator: here I am proposing to use `Response` to decide which files to save. This is to avoid clashes with the `kwargs` arguments of the `additional_stores` and to allow finer control for the Job about which files to store, but other solutions could be found to define elsewhere the files to be stored.
- Is the `JobStoreDocument` the best place for the file information, or should it be in the `output`? (I think that adding it to the `output` part will complicate how to decide which files should actually be copied, but maybe someone has a good idea of how to handle that.)
- The `additional_stores` are not cleaned if the main `JobStoreDocument` is updated. In this context the same would apply to the stored files. Deciding if and how to update the files in the store may be tricky.
- The information stored about the files in the `JobStoreDocument` (here in the `FileData`): is there more information that should be stored?
- Using `job.output_files` as a reference, one could specify that a file should be copied before the job starts. However, it is not clear to me what the typical use case for this functionality would be, or how to control exactly where the file should be copied (e.g. filename, subfolders).