
zstash is always non-blocking #290

Closed
forsyth2 opened this issue Aug 2, 2023 · 16 comments · Fixed by #355 · May be fixed by #354
Labels
semver: bug Bug fix (will increment patch version)

Comments

@forsyth2
Collaborator

forsyth2 commented Aug 2, 2023

zstash is always non-blocking. Reported by @golaz:

Running zstash create -v --hpss=globus://nersc/home/g/golaz/2023/E3SM/fme/${EXP} --maxsize 128 . results in non-blocking archiving. This is unexpected since the --non-blocking option isn't included. That option was introduced in #214, which was before the last release (zstash v1.3.0).

@forsyth2 forsyth2 added the semver: bug Bug fix (will increment patch version) label Aug 2, 2023
@forsyth2
Collaborator Author

forsyth2 commented Aug 4, 2023

Debugging

Possible definitions of "non-blocking":

  1. Zstash does not wait until the transfer completes to start creating a subsequent tarball (definition from Improve Globus transfer concurrency #171)
  2. Zstash submits for transfer as many tarballs as have been created at that moment (desired behavior from implementing Improve Globus transfer concurrency #171. It seems to me that point 1 is necessary but not sufficient for point 2; i.e., point 1 has to be true for point 2 to be true.)
$ git grep -n "submit_transfer"
globus.py:199:        task = transfer_client.submit_transfer(transfer_data)
globus.py:273:            last_task = transfer_client.submit_transfer(transfer_data)

So, there are two places in the code where Globus transfers are submitted.

The second instance:

  • Appears in globus_finalize.
  • Occurs if the global variable transfer_data evaluates as True.
  • If non_blocking evaluates as False (the default case), then globus_wait runs until the transfer finishes. That is, we don't proceed in the code until the transfer finishes, meaning the transfer is blocking, as expected.

The first instance:

  • Appears in globus_transfer.
  • It seems like this submit_transfer just always happens, since it's not in a conditional block.
  • However, it is run after checking the status of the last transfer, which indicates to me it would always be dependent on the previous transfer completing. That is, it would always be blocking, which is the opposite of this issue. So, I must be missing something here.
$ git grep -n "non_blocking"
create.py:93:    globus_finalize(non_blocking=args.non_blocking)
create.py:169:    if args.non_blocking:
globus.py:264:def globus_finalize(non_blocking: bool = False):
globus.py:289:    if not non_blocking:
update.py:48:    globus_finalize(non_blocking=args.non_blocking)

@forsyth2
Collaborator Author

forsyth2 commented Aug 4, 2023

@lukaszlacinski Do you have any insights on this? Thank you

@forsyth2
Collaborator Author

forsyth2 commented Aug 4, 2023

It may make sense to address this for the next release (rather than the one currently in-progress).

From @golaz: That would give us an opportunity to rethink the --keep / --non-blocking functionality in conjunction with the three destinations (disk, HPSS, Globus). --keep is the default for some destinations but not others, and similarly for --non-blocking.

@golaz
Collaborator

golaz commented Aug 4, 2023

I wonder whether we even need a blocking/non-blocking option for Globus. In the past, the main motivation was for instances where a user would be very close to their disk quota and tar files could be deleted after their transfer.

The best option might be a combination of non-blocking Globus transfers with the option of purging tar files after their successful transfer.
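
For concreteness, a minimal sketch of that purge-after-success idea (illustrative only; transfer_client is a globus_sdk TransferClient, and the submitted bookkeeping dict is an assumption, not actual zstash state):

import os

def purge_transferred_tarballs(transfer_client, submitted):
    # `submitted` maps a Globus task_id to the list of local tarball paths
    # included in that transfer (assumed bookkeeping, not actual zstash state).
    for task_id, tarball_paths in submitted.items():
        if transfer_client.get_task(task_id)["status"] == "SUCCEEDED":
            for path in tarball_paths:
                os.remove(path)  # reclaim local disk space once the data is at the destination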

@forsyth2
Collaborator Author

forsyth2 commented Aug 2, 2024

@TonyB9000 Do you have any input on this topic? Does "The best option might be a combination of non-blocking Globus transfers with the option of purging tar files after their successful transfer." in the comment above sound like your use case for publication?

@TonyB9000
Collaborator

@forsyth2 I really need to better understand the "blocking vs non-blocking" behaviors (never having exercised either of them).

By my understanding, in BOTH cases, the user resides on a "source_system" with "loose" files (not zstash-format), and wants to end up with zstash-archived files on a remote (HPSS-storage) system. The documentation is honestly unclear.

I can envision 3 modes of operation:

Mode 1: Create entire finished archive locally (big local volume), then Globus transfer begins.
Mode 2: Begin creating the local archive, irrespective of Globus status, with Globus transfer invoked as each file is completed.
Mode 3: Begin creating the local archive, but wait for each tar file to be Globus transferred before creating the next tar file.

Modes 1 and 2 can result in a large local footprint if Globus is slow or hangs.

I'm guessing that mode 2 is the "non-blocking", meaning that zstash tar-file creation does not block waiting for completed Globus transfers. I don't really see the value of mode 2. There may be fewer "Globus transfer jobs", but the files are generally huge so the overhead of multiple transfers seems minimal. Is the idea to save local CPU usage (all tar-files completed early, Globus catches up eventually)?

And if --keep is invoked, mode 3 is pointless (unless Globus balks when it has to wait for a file to transfer). Exactly how zstash invokes Globus is significant here.

A hybrid mode might support "block at 1 TB", where zstash tarfiles are produced up to a volume of 1 TB, and then blocked until those tar-files are transferred.
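
A rough sketch of what such a threshold policy might look like (purely illustrative; next_tarball(), submit_globus_transfer(), and wait_for() are assumed helpers, not actual zstash functions):

THRESHOLD_BYTES = 1024**4  # block once ~1 TB of untransferred tarballs has accumulated

pending = []          # tarballs created but not yet submitted
pending_bytes = 0
active_transfer = None

for tarball in next_tarball():               # yields each finished tarball
    pending.append(tarball)
    pending_bytes += tarball.size
    if pending_bytes >= THRESHOLD_BYTES:
        if active_transfer is not None:
            wait_for(active_transfer)        # block until the prior transfer finishes
        active_transfer = submit_globus_transfer(pending)
        pending, pending_bytes = [], 0

if pending:                                  # flush the remainder at the end
    if active_transfer is not None:
        wait_for(active_transfer)
    submit_globus_transfer(pending)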

Question: Globus transfer incurs its own "slowness", but then so does HPSS tape writing. Is the HPSS delay treated as part of the Globus delay? Or does Globus tell HPSS "store this" and return immediately to accept a new file transfer?

@forsyth2
Collaborator Author

forsyth2 commented Dec 2, 2024

@TonyB9000 here's a more complete explanation of the expected behavior. @golaz (or @chengzhuzhang) can you please confirm I have this right, in particular the pseudo-code below?

--non-blocking appears twice in the usage docs:

For zstash create:

Zstash will submit a Globus transfer and immediately create a subsequent tarball. That is, Zstash will not wait until the transfer completes to start creating a subsequent tarball. On machines where it takes more time to create a tarball than transfer it, each Globus transfer will have one file. On machines where it takes less time to create a tarball than transfer it, the first transfer will have one file, but the number of tarballs in subsequent transfers will grow, dynamically finding the optimal number of tarballs per transfer. NOTE: zstash is currently always non-blocking.

zstash update shares identical text.

And the default should be that --non-blocking is turned off. That is, we should block (wait) unless that flag is set.

Thus, the pseudo-code should be something like this:

current_tarball = new tarball
tarballs_ready_for_transfer = new list
most_recent_globus_transfer = None
for file in directory_to_archive:
    # Take care of getting the file archived into the tarball
    if current_tarball.size() >= MAXSIZE: # As determined by --maxsize parameter
        tarballs_ready_for_transfer.append(current_tarball)

        # Take care of getting the Globus transfers going
        if not non_blocking:
            # This is the default case. The --non-blocking flag has NOT been set.
            # The current issue is that zstash is always non-blocking =>
            # That means we're ALWAYS skipping this if-block.
            # It's as if the --non-blocking flag is always set.
            while (most_recent_globus_transfer is not None) and (most_recent_globus_transfer.is_active()):
                block # Wait for the globus transfer to finish
        if optimization_criterion: # Not really sure what this criterion is
            # There's a good number of tarballs to transfer now; let's transfer them.
            most_recent_globus_transfer = globus.send_to_destination(tarballs_ready_for_transfer)
            # We just started a transfer for all the finished tarballs, so let's start a new list
            tarballs_ready_for_transfer = new list

        current_tarball = new tarball
    current_tarball.compress_and_add(file)
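
For concreteness, the block step above could map onto the Globus SDK's task_wait roughly as follows (a sketch only; most_recent_task_id and logger are stand-ins, and transfer_client is the TransferClient that globus.py keeps as a global):

# Sketch: realizing the "block" step with globus_sdk.
# task_wait() returns True once the task reaches a terminal state,
# or False if `timeout` seconds elapse first.
if not non_blocking and most_recent_task_id is not None:
    while not transfer_client.task_wait(most_recent_task_id, timeout=60, polling_interval=20):
        logger.info("Still waiting on Globus task %s", most_recent_task_id)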

@forsyth2
Collaborator Author

forsyth2 commented Dec 2, 2024

As far as I can tell, the code snippets below highlight the current actual code. We just need to figure out how it diverges from the pseudo-code in my previous comment.

Note on notation below:

f(x)
    g(x) # f calls g
    if A: >>>
        h(x) # h runs if A is true

So, the simplified code is:

def create():
    hpss_put(hpss, get_db_filename(cache), cache, keep=True)
        hpss_transfer(hpss, file_path, "put", cache, keep)
            globus_transfer(endpoint, url_path, name, transfer_type)
                transfer_data.add_item(src_path, dst_path)
                if task_id: >>> 
                    task = transfer_client.get_task(task_id)
                    if task["status"] == "ACTIVE" >>> 
                        return!
                task = submit_transfer_with_checks(transfer_data)
                    task = transfer_client.submit_transfer(transfer_data)
                task_id = task.get("task_id")
                if transfer_type == "get" and task_id: >>> # We do NOT enter this block on create, because we are putting, not getting!
                    globus_wait(task_id)
    globus_finalize(non_blocking=args.non_blocking)
        if transfer_data: >>> 
            last_task = submit_transfer_with_checks(transfer_data)
            last_task_id = last_task.get("task_id")
        if not non_blocking: >>>
            if task_id: >>>
                globus_wait(task_id)
            if last_task_id: >>>
                globus_wait(last_task_id)

globus.py unfortunately uses global variables, which can make tracking state quite difficult. In particular:

transfer_client: TransferClient = None
transfer_data: TransferData = None
task_id = None

Inside globus_wait:

        while not transfer_client.task_wait(task_id, timeout=20, polling_interval=20):
            pass
        

@TonyB9000
Collaborator

@forsyth2 You wrote:
“NOTE: zstash is currently always non-blocking.”

I take that to be equivalent to “non-blocking=True”, effectively. The release version has no “blocking/non-blocking” adjustment.

Then, in the pseudocode, you write:

“if not non_blocking:
# This is the default case. The --non-blocking flag has NOT been set.”

This is equivalent to “non-blocking=False”. Are you now referring to the desired NEW behavior?

In any case, my main question here is: what is the motivation for “blocking” (non-blocking=False)? Why do we want to hold up tarfile creation while a globus transfer is running?

From my point of view, the only “Must Be Avoided” behavior would be attempting to transfer a tar-file BEFORE it is completed. This can be accomplished by having Globus transfer the tar-files in a “tarfiles-completed” list.

Is there an issue with launching too many Globus transfers in parallel? If so, do not release the next “tarfiles-completed” list until that last globus transfer returns. But I see no reason to hold up tarfile creation in the meanwhile.

What am I missing?

@forsyth2
Collaborator Author

forsyth2 commented Dec 2, 2024

This is equivalent to “non-blocking=False”. Are you now referring to the desired NEW behavior?

Yes, that is what I mean by the comment lines that follow that:

            # The current issue is that zstash is always non-blocking =>
            # That means we're ALWAYS skipping this if-block.
            # It's as if the --non-blocking flag is always set.

what is the motivation for “blocking”?

I think that's a question for @golaz.

@forsyth2
Collaborator Author

forsyth2 commented Dec 2, 2024

if optimization_criterion: # Not really sure what this criterion is

In the zstash code, we note a limit of 3 parallel Globus transfers. So once we have 3 running, we keep a running list of tarballs to add to the next allowed transfer.
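
A sketch of how that cap might be enforced (illustrative only; submit_transfer_batch() is an assumed helper, and active_tasks / ready_tarballs are assumed bookkeeping, not actual zstash code):

MAX_ACTIVE_TRANSFERS = 3

def maybe_submit(transfer_client, ready_tarballs, active_tasks):
    # Forget tasks that have reached a terminal state (SUCCEEDED / FAILED).
    active_tasks[:] = [
        t for t in active_tasks
        if transfer_client.get_task(t)["status"] == "ACTIVE"
    ]
    # Submit the accumulated tarballs only while under the concurrency cap.
    if ready_tarballs and len(active_tasks) < MAX_ACTIVE_TRANSFERS:
        task_id = submit_transfer_batch(transfer_client, ready_tarballs)
        active_tasks.append(task_id)
        ready_tarballs.clear()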

@TonyB9000
Collaborator

@forsyth2 @golaz Yesterday I began some instrumentation of the code with timestamped messaging.

I want to capture here my understanding that "blocking" intends to address a potential "low-disk-space" issue, where running ahead with too many tar files produced could exhaust the space. It would (thus) make sense that whenever blocking is in effect, we delete the transferred tar file(s) BEFORE creating new ones, in addition to waiting upon completion of transfers.

In fact, under the blocking scenario, even allowing (say) 3 parallel Globus transfers is an admission that one can tolerate 3 tarfiles (or sets of tar files) being accumulated. How can we know this?

What I find peculiar (now) is that in a (potential) low-disk-space scenario, why allow ANY parallel Globus transfers? Doing so already risks disk exhaustion with multiple sets of tar files maintained. Given that you cannot "split" a datafile across multiple tar-files, why not simply force a single-datafile/tarfile transfer at any one time? It may be inefficient in terms of transfers, but you are either "siding with disk-saving" or "siding with transfer rate", never with both. I see no reasonable mechanism for any intermediate "trade-offs" here. Either you risk disk-exhaustion, or you don't.

Put differently - if Globus is bothered by exceeding 3 parallel transfers (having nothing to do with disk-space) we should not worry in a low-disk scenario, as we should not be "storing up tar files" in the first place.

It seems to me, you have one of two operating modes:

Low-Disk: (blocking in effect). Here, only a single Globus transfer is ever active, and only on a single tar-file. You cannot have ANY parallel Globus transfers, because you cannot be sure you can have more than a single tar-file queued for transfer.

High-Disk: (non-blocking) You launch the first 3 Globus transfers AS SOON AS tar-files (initially, ONE) are available. Thereafter, you generate tarfiles with abandon, and anytime the number of parallel transfers drops below 3, you launch a new one with as many tar files as have been produced.

I see no criteria that allow for any "intermediate" position here, although you can still tweak the performance with "Minimum Tar Size" (at the risk that making it too large in a Low-Disk scenario could cause failure). In fact, under Low-Disk conditions, one could set "Minimum Tar Size = 1k", effectively assuring that each tar file contains only a single datafile (whose transfer must complete before producing another).

If there is a rationale for deviating from the above, I need to understand it.

It would be a different story if a control script driving zstash could detect a zstash "bad exit" due to disk space, know what had been successfully transferred, and restart zstash for the remaining material with auto-tweaked parameters (switch to blocking, reduce min tar size, etc.). Each subsequent "fail" would reduce the tar minimum until either success ensues, or there really is no disk space for operations.
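
A minimal sketch of such a driver (hypothetical; the command line, placeholder --hpss path, exit-code interpretation, and halving policy are all assumptions, not an existing tool):

import subprocess

maxsize_gb = 128
while maxsize_gb >= 1:
    # Re-run zstash with the current tarball size cap (resumption semantics left aside here).
    result = subprocess.run(
        ["zstash", "create", "--hpss=globus://<endpoint>/<path>", f"--maxsize={maxsize_gb}", "."]
    )
    if result.returncode == 0:
        break
    maxsize_gb //= 2  # assume the failure may be disk exhaustion; shrink tarballs and retry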

@forsyth2
Collaborator Author

forsyth2 commented Dec 3, 2024

@TonyB9000 Your analysis above makes a lot of sense to me

@TonyB9000
Collaborator

TonyB9000 commented Dec 4, 2024

@forsyth2 My read of
https://globus-sdk-python.readthedocs.io/en/1.x-line/clients/transfer.html#globus_sdk.TransferClient.task_wait
explains that if "timeout" is supplied, the wait returns after timeout elapses. If you want to wait indefinitely, you omit this option (or perhaps supply 0, since "1" is the minimum accepted value.)

Hence (by my understanding):

        while not transfer_client.task_wait(task_id, timeout=20, polling_interval=20):
            pass

is equivalent to just

        transfer_client.task_wait(task_id, polling_interval=20)

The loop is only useful if you want to issue a message (or take some other action) every "timeout" seconds. Also, I think it returns a transfer_status (transfer_response) on exit.

I am still working through the logic - hope to conduct some tests tomorrow.

@TonyB9000
Collaborator

@forsyth2 @golaz @chengzhuzhang
After poring over the code, I augmented the "simplified code". I was a little shocked to discover that the entire hpss_put/globus_transfer operation is invoked deep in "create_database", under the innocuous-sounding function "add_files". I think it should be called "produce_and_dispatch_tarfiles()".

[create.py]  create()
[create.py]      # calls globus_activate()
[create.py]      # creates cache directory to hold tar-files
[create.py]      create_database()
[hpss_utils.py]      add_files()
[hpss.py]                hpss_put(hpss, get_db_filename(cache), cache, keep=True)
[hpss.py]                    hpss_transfer(hpss, file_path, "put", cache, keep)
[globus.py]                      globus_transfer(endpoint, url_path, name, transfer_type)
[globus.py]                          transfer_data.add_item(src_path, dst_path)
[globus.py]                          if task_id: >>> 
[globus.py]                              task = transfer_client.get_task(task_id)
[globus.py]                              if task["status"] == "ACTIVE": >>> 
[globus.py]                                  return!
[globus.py]                          task = submit_transfer_with_checks(transfer_data)
[globus.py]                              task = transfer_client.submit_transfer(transfer_data)
[globus.py]                          task_id = task.get("task_id")
[create.py]      globus_finalize(non_blocking=args.non_blocking)
[globus.py]          if transfer_data: >>> 
[globus.py]              last_task = submit_transfer_with_checks(transfer_data)
[globus.py]              last_task_id = last_task.get("task_id")
[globus.py]          if not non_blocking: >>>
[globus.py]              if task_id: >>>
[globus.py]                  globus_wait(task_id)
[globus.py]              if last_task_id: >>>
[globus.py]                  globus_wait(last_task_id)

I have continued testing, and cannot recreate the previous behavior (where tarfile production races ahead of transfers). I may need to test with MUCH SMALLER files (and a correspondingly-reduced tar file "maxsize"), so that transfer overhead becomes large w.r.t. archive production.

I intend to experiment with globus "task_wait", immediately following "submit_transfer_with_checks()", UNLESS non-blocking is specified, and see how the logfile output sequence changes.

@TonyB9000
Collaborator

@forsyth2 @golaz @chengzhuzhang Success of sorts on blocking/non-blocking:

By selecting much smaller files to transfer, I was finally able to get distinct behavior between BLOCKING and NON-BLOCKING.

With BLOCKING, as we wanted, we wait until the globus transfer completes[*] BEFORE we return to create another tar file. There is ONE transfer per tar file. (I used 23 Kbyte files, and set MAXSIZE to 25 Kbyte).

When each “task_wait” expires, the next tar-file submit discovers the completion and reports “status = SUCCEEDED”, before dispatching the next transfer (obtaining a new task_id).

NOTE: There is (yet) no provision to delete the successfully-transferred file before creating the next tar-file, which is the ostensible reason for blocking…

[*] Actually, tarfile #2 MUST be created BEFORE we can test for successful transfer, because the code is structured to test for status=SUCCEEDED only upon submission of the next tar-file.

With NON-BLOCKING, the behavior is not quite what I’d expected.

• We create the first tar file (000000), and hpss_put/globus_submit that file for transfer and return.
• Thereafter, for each tar-file we create, we attempt to submit to globus, but that begins (in “globus.py”) with checking the status of the last submitted task. It ALWAYS reports “STILL ACTIVE” and we return.
• Yet: ALL TARFILES CREATED are transferred on the one submit (task_id is constant). Since this continues through ALL tarfiles created, we NEVER see a “status = SUCCEEDED” message, because we no longer have anything to submit.

The (NON-BLOCKING) behavior appears to be that globus keeps looking into the directory, and if new (closed) files appear, it keeps transferring them.

This is not my conception of “transfer directory”. I would take a snapshot of the directory, transfer ONLY those files present at the time of submission, and ignore any new files that arrive during the transfer operation. (If any of this is detailed in the globus_sdk docs, I never saw it).

NOTE: in my BLOCKING test, I used:

transfer_client.task_wait(task_id, polling_interval=20)

to effect the blocking. The documentation generally says: task_wait(task_id, timeout=10, polling_interval=10) (default values shown). Both timeout and polling_interval are listed as “optional”, but what is strange is that their example for “indefinite wait” uses:

>>> while not tc.task_wait(task_id, timeout=60):
>>>     print("Another minute went by without terminating"

I would have thought to use “polling_interval=60”, with “timeout” omitted.

I will conduct some more tests, but in each case I have instrumented the log-file output to reveal the behavior.

I WONDER if the “task_wait” expiration (prior to completion) is the way in which it wants you to control “another parallel transfer”.

I am thinking of creating a series of files, each one ~5 times larger than the previous (10K, 50K, 250K, 1M, 5M, 25M, 125M, …) up to maybe 5G, in NON-BLOCKING mode, just to see if they ALL get transferred under the first task_id, or if some other behavior erupts, and how “timeout” and “polling interval” affect this.
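
A quick sketch of generating such a test series (illustrative only; the file names and chunked zero-fill are my own choices):

size = 10 * 1024                     # start at 10 KB
index = 0
while size <= 5 * 1024**3:           # stop once the next size would exceed ~5 GB
    with open(f"testfile_{index:02d}.dat", "wb") as f:
        chunk = b"\0" * (1024 * 1024)            # write zeros in 1 MiB chunks
        remaining = size
        while remaining > 0:
            n = min(remaining, len(chunk))
            f.write(chunk[:n])
            remaining -= n
    size *= 5
    index += 1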

In any case, I am sure that we can now “block” or “non-block” at will.
