Add support for parallel syncing #85
base: main
[pre-commit.ci] auto fixes from pre-commit.com hooks (for more information, see https://pre-commit.ci)
Fix pre-commit issues
Mypy issues
time.sleep(max(0.0, (time.time() - start_time) - self.wait))
if "PYTEST_CURRENT_TEST" in os.environ:
break
except Empty:
This seems to be for the case where we haven't encountered any new synchronization requests. So this should come right after target_queue.get and simply be ignored (with a small timeout), right? It's nothing bad.
The "try again later" code below is for when self.sync(target) times out.
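A minimal sketch of the separation suggested above, not the PR's actual code. It assumes a hypothetical standalone `worker_loop` function, a `sync` callable that returns `False` when syncing fails or times out, and a `max_idle_polls` limit added only so the example terminates. It uses `queue.Queue` for simplicity; `multiprocessing.Queue.get` raises the same `queue.Empty` exception on timeout.

```python
from queue import Empty, Queue
from typing import Callable


def worker_loop(
    target_queue: Queue,
    sync: Callable[[str], bool],  # assumption: returns False on failure/timeout
    poll_timeout: float = 0.05,
    max_idle_polls: int = 3,  # hypothetical stop condition for this sketch
) -> list[str]:
    """Drain ``target_queue`` and return the targets that need a retry."""
    retry_later: list[str] = []
    idle_polls = 0
    while idle_polls < max_idle_polls:
        try:
            target = target_queue.get(timeout=poll_timeout)
        except Empty:
            # No new sync request arrived: harmless, just poll again.
            idle_polls += 1
            continue
        idle_polls = 0
        if not sync(target):
            # Only a failed or timed-out sync is a "try again later" case.
            retry_later.append(target)
    return retry_later
```

Keeping the `except Empty` branch directly around the `get` call makes it clear that an empty queue is not an error, while the retry logic stays tied to the outcome of the sync itself.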
@@ -35,6 +38,25 @@ def __init__(
self.wait = wait
self.wandb_options = wandb_options
self._timeout = timeout
self.num_workers = num_workers
Let's mark these three attributes private, and the new methods as well.
self.target_queue: Queue = Queue()
self.workers: list[Process] = []

def start(self) -> None:
Even though the new name makes more sense, this would break backward compatibility, so it's probably best to keep this as "loop".
TriggerWandbSyncHook(self.command_dir)(target)
time.sleep(0.25)
for cf in command_files:
self.target_queue.put((command_file, target))
In the previous version, we made sure that the same synchronization target never appeared more than once, in order to avoid one process crowding out the others (as you reported in #83, that wasn't enough, though). I think we should still do this, i.e., we check whether the target is already in the queue (if it is, we simply remove the command_file right here and don't add anything to the queue).
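The deduplication described above could look roughly like this. It is a sketch under assumptions, not the project's implementation: `enqueue_unique` and the `pending` set are hypothetical names, and targets are assumed to be hashable strings.

```python
from pathlib import Path
from queue import Queue


def enqueue_unique(
    target_queue: Queue,
    pending: set[str],  # hypothetical bookkeeping of targets already queued
    command_file: Path,
    target: str,
) -> bool:
    """Queue ``target`` unless it is already pending; return True if queued."""
    if target in pending:
        # Duplicate request: drop the command file, add nothing to the queue.
        command_file.unlink(missing_ok=True)
        return False
    pending.add(target)
    target_queue.put((command_file, target))
    return True
```

A target would also have to be removed from `pending` once a worker has finished syncing it. With worker processes rather than threads, that bookkeeping would need to live in the parent process or in shared state, which this sketch leaves out.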
Thanks a lot for the implementation, I really appreciate it ❤️ I can also help fix the remaining issues (if you give me permission). Edit: I saw that I already have permission to do that.
Closes #21
Closes #83