6 changes: 3 additions & 3 deletions open_instruct/finetune.py
@@ -956,7 +956,7 @@ def main(args: FlatArguments, tc: TokenizerConfig):
os.path.join(get_last_checkpoint_path(args, incomplete=True), "COMPLETED"), "w"
Collaborator

I agree with gemini-bot here; we should probably put this COMPLETED write under the main-process check too.

) as f:
f.write("COMPLETED") # annoyingly, empty files arent uploaded by beaker.
- if accelerator.is_local_main_process: # TODO: in mason local model this is gonna error out if using something like output/test; because mason used the same shared file ssytem.
+ if accelerator.is_main_process: # TODO: in mason local model this is gonna error out if using something like output/test; because mason used the same shared file ssytem.
Contributor

critical

While this change to is_main_process is correct for handling checkpoint cleanup in a multi-node environment, a related race condition still exists. The preceding lines (955-958) that write the COMPLETED file are executed by all processes, not just the main one. This can cause file system contention and is inefficient.

To fully resolve the race conditions during checkpointing, this file-writing logic should also be restricted to the main process. Additionally, the TODO comment is now obsolete with this fix and can be removed.

I recommend the following structure:

                        if accelerator.is_main_process:
                            # use this to mark the checkpoint as completely saved, to avoid restoring from garbled checkpoints
                            with open(
                                os.path.join(get_last_checkpoint_path(args, incomplete=True), "COMPLETED"), "w"
                            ) as f:
                                f.write("COMPLETED")  # annoyingly, empty files arent uploaded by beaker.
                            clean_last_n_checkpoints(args.output_dir, args.keep_last_n_checkpoints)

This ensures both operations are performed by a single process, in order: the marker is written first, then old checkpoints are cleaned up.
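
For readers unfamiliar with the two flags, the following is a minimal sketch of why the change matters on a shared file system. It does not use Accelerate; RankInfo and make_ranks are hypothetical names that only mimic the meaning of the flags, assuming is_local_main_process is true for the first process on each node and is_main_process is true for exactly one process in the whole job.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class RankInfo:
    """Hypothetical stand-in for the two Accelerate flags discussed above."""

    global_rank: int  # rank across all nodes in the job
    local_rank: int   # rank within a single node

    @property
    def is_main_process(self) -> bool:
        # True for exactly one process in the whole job.
        return self.global_rank == 0

    @property
    def is_local_main_process(self) -> bool:
        # True for one process on every node.
        return self.local_rank == 0


def make_ranks(num_nodes: int, procs_per_node: int) -> List[RankInfo]:
    """Enumerate every process in a simulated multi-node job."""
    return [
        RankInfo(global_rank=node * procs_per_node + local, local_rank=local)
        for node in range(num_nodes)
        for local in range(procs_per_node)
    ]


ranks = make_ranks(num_nodes=2, procs_per_node=4)
local_mains = [r.global_rank for r in ranks if r.is_local_main_process]
mains = [r.global_rank for r in ranks if r.is_main_process]
print(local_mains)  # [0, 4]
print(mains)        # [0]
```

With two nodes of four processes each, two processes pass the local check and would both run clean_last_n_checkpoints against the same shared directory, while only one passes the global check.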

clean_last_n_checkpoints(args.output_dir, args.keep_last_n_checkpoints)
accelerator.wait_for_everyone()

@@ -971,7 +971,7 @@ def main(args: FlatArguments, tc: TokenizerConfig):
# use this to mark the checkpoint as completely saved, to avoid restoring from garbled checkpoints
with open(os.path.join(get_last_checkpoint_path(args, incomplete=True), "COMPLETED"), "w") as f:
Collaborator

I agree with gemini-bot here; we should probably put this COMPLETED write under the main-process check too.

f.write("COMPLETED") # annoyingly, empty files arent uploaded by beaker.
- if accelerator.is_local_main_process:
+ if accelerator.is_main_process:
Contributor

critical

Similar to my other comment, there's a race condition here. The code that writes the COMPLETED file (lines 972-973) is executed by all processes, which should be avoided.

To prevent potential file system issues, this logic should be moved inside the if accelerator.is_main_process: block.

Here is the recommended change:

            if accelerator.is_main_process:
                # use this to mark the checkpoint as completely saved, to avoid restoring from garbled checkpoints
                with open(os.path.join(get_last_checkpoint_path(args, incomplete=True), "COMPLETED"), "w") as f:
                    f.write("COMPLETED")  # annoyingly, empty files arent uploaded by beaker.
                clean_last_n_checkpoints(args.output_dir, args.keep_last_n_checkpoints)
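
A small sketch of the guarded marker write, runnable without a distributed launch. mark_checkpoint_complete is a hypothetical helper, not a function in open_instruct; its is_main_process argument stands in for accelerator.is_main_process, and cleanup is left out because the body of clean_last_n_checkpoints is not shown in this diff.

```python
import os
import tempfile


def mark_checkpoint_complete(checkpoint_dir: str, is_main_process: bool) -> bool:
    """Write the COMPLETED marker only when called on the main process.

    Returns True if this call wrote the marker, False if it was skipped.
    """
    if not is_main_process:
        return False
    with open(os.path.join(checkpoint_dir, "COMPLETED"), "w") as f:
        # Non-empty on purpose: the diff notes that empty files are not uploaded.
        f.write("COMPLETED")
    return True


with tempfile.TemporaryDirectory() as ckpt:
    # A non-main process skips the write entirely.
    wrote_worker = mark_checkpoint_complete(ckpt, is_main_process=False)
    exists_after_worker = os.path.exists(os.path.join(ckpt, "COMPLETED"))

    # The main process writes the marker exactly once.
    wrote_main = mark_checkpoint_complete(ckpt, is_main_process=True)
    with open(os.path.join(ckpt, "COMPLETED")) as f:
        marker_text = f.read()
```

In the training loop, the call would still be followed by accelerator.wait_for_everyone(), as in the diff, so the other processes do not move on before the marker exists.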

clean_last_n_checkpoints(args.output_dir, args.keep_last_n_checkpoints)
accelerator.wait_for_everyone()

@@ -981,7 +981,7 @@ def main(args: FlatArguments, tc: TokenizerConfig):
)

# remove all checkpoints to save space
- if args.clean_checkpoints_at_end and accelerator.is_local_main_process:
+ if args.clean_checkpoints_at_end and accelerator.is_main_process:
clean_last_n_checkpoints(args.output_dir, keep_last_n_checkpoints=0)

if (