Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix non-blocking option #354

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions tests/scripts/test_non_blocking.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# First, we have to set up Globus according to https://github.com/E3SM-Project/zstash/discussions/329
# Log in to Globus
# Authenticate LCRC Improv DTN
# Authenticate NERSC Perlmutter
source /lcrc/soft/climate/e3sm-unified/load_latest_e3sm_unified_chrysalis.sh
cd /home/ac.forsyth2/ez
mkdir zstash_dirs
cd zstash_dirs/
mkdir zstash_demo; echo 'file0 stuff' > zstash_demo/file0.txt
zstash create --hpss=globus://15288284-7006-4041-ba1a-6b52501e49f1/~/manual_run zstash_demo
# globus_sdk.services.transfer.errors.TransferAPIError: ('POST', 'https://transfer.api.globus.org/v0.10/endpoint/15288284-7006-4041-ba1a-6b52501e49f1/autoactivate?if_expires_in=600', None, 400, 'ClientError.AuthenticationFailed', 'No credentials supplied', 'msYY54WXq')
rm ~/.globus-native-apps.cfg
zstash create --hpss=globus://15288284-7006-4041-ba1a-6b52501e49f1/~/manual_run zstash_demo
# Auth Code prompt appears twice


cd /lcrc/group/e3sm/ac.forsyth2/E3SMv2_test/zstash_extractions
du -sh v2.NARRM.historical_0151/
# That's 22 GB. Let's try to compress it with zstash.
cd v2.NARRM.historical_0151/tests

# From https://docs.e3sm.org/zstash/_build/html/main/usage.html:
# `--maxsize MAXSIZE`` specifies the maximum size (in GB) for tar files. The default is 256 GB. Zstash will create tar files that are smaller than MAXSIZE except when individual input files exceed MAXSIZE (as individual files are never split up between different tar files).
# `--non-blocking` Zstash will submit a Globus transfer and immediately create a subsequent tarball. That is, Zstash will not wait until the transfer completes to start creating a subsequent tarball. On machines where it takes more time to create a tarball than transfer it, each Globus transfer will have one file. On machines where it takes less time to create a tarball than transfer it, the first transfer will have one file, but the number of tarballs in subsequent transfers will grow finding dynamically the most optimal number of tarballs per transfer. NOTE: zstash is currently always non-blocking.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typically, you want transfer to go as fast possible, so --non-blocking make sense.

BUT there are situations where the disk is nearly full, so you want to block the transfers (and delete local cache) to avoid filling up the disk space.

=> So we want to hold up generating a new tarball until the previous tarball was transferred over (and its local copy is deleted)


# Make maxsize 1 GB. This will create a new tar after every 1 GB of data.
zstash create -v --hpss=globus://nersc/home/f/forsyth/test_290_v1 --maxsize 1 .

# DEBUG: Closing tar archive 000000.tar
# INFO: Creating new tar archive 000001.tar

# In a different window:
ls /lcrc/group/e3sm/ac.forsyth2/E3SMv2_test/zstash_extractions/v2.NARRM.historical_0151/tests/zstash
# 000000.tar 000001.tar 000002.tar 000003.tar 000004.tar 000005.tar index.db
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these tar files completed? empty?


# So, we can clearly see the tars are being created immediately.
# On the Globus website, test_290_v1 000000 transfer is complete.
# And test_290_v1 000001 transfer is in progress.

# This is the `--non-blocking` behavior, even though we did not specify it.

# Now, with changes in this PR:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes in this PR mainly involve propagating if not non_blocking: on the remaining globus_wait not wrapped in a conditional. But I don't think that really matters, given the summary table in #354 (comment)

conda activate zstash_dev_issue_290
cd /lcrc/group/e3sm/ac.forsyth2/E3SMv2_test/zstash_extractions/v2.NARRM.historical_0151/tests
rm -rf zstash
zstash create -v --hpss=globus://nersc/home/f/forsyth/test_290_v2 --maxsize 1 --non-blocking .
# DEBUG: Closing tar archive 000000.tar
# INFO: Creating new tar archive 000001.tar
# In a different window:
ls /lcrc/group/e3sm/ac.forsyth2/E3SMv2_test/zstash_extractions/v2.NARRM.historical_0151/tests/zstash
# 000000.tar 000001.tar index.db
# # On the Globus website, test_290_v1 000000 transfer is complete.
# And test_290_v1 000001 transfer is in progress.
ls /lcrc/group/e3sm/ac.forsyth2/E3SMv2_test/zstash_extractions/v2.NARRM.historical_0151/tests/zstash
# 000000.tar 000001.tar 000002.tar 000003.tar 000004.tar 000005.tar index.db
ls /lcrc/group/e3sm/ac.forsyth2/E3SMv2_test/zstash_extractions/v2.NARRM.historical_0151/tests/zstash
000000.tar 000002.tar 000004.tar 000006.tar 000008.tar 00000a.tar
000001.tar 000003.tar 000005.tar 000007.tar 000009.tar index.db
# Command completed on the command line
# But on Globus website:
# Completed -- test_290_v2 000000, test_290_v2 000001, test_290_v2 index
6 changes: 5 additions & 1 deletion zstash/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ def create():
failures: List[str] = create_database(cache, args)

# Transfer to HPSS. Always keep a local copy.
hpss_put(hpss, get_db_filename(cache), cache, keep=True)
hpss_put(
hpss, get_db_filename(cache), cache, keep=True, non_blocking=args.non_blocking
)

globus_finalize(non_blocking=args.non_blocking)

Expand Down Expand Up @@ -254,6 +256,7 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]:
args.keep,
args.follow_symlinks,
skip_tars_md5=args.no_tars_md5,
non_blocking=args.non_blocking,
)
except FileNotFoundError:
raise Exception("Archive creation failed due to broken symlink.")
Expand All @@ -268,6 +271,7 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]:
args.keep,
args.follow_symlinks,
skip_tars_md5=args.no_tars_md5,
non_blocking=args.non_blocking,
)

# Close database
Expand Down
9 changes: 7 additions & 2 deletions zstash/globus.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def file_exists(name: str) -> bool:


def globus_transfer(
remote_ep: str, remote_path: str, name: str, transfer_type: str
remote_ep: str, remote_path: str, name: str, transfer_type: str, non_blocking: bool
): # noqa: C901
global transfer_client
global local_endpoint
Expand Down Expand Up @@ -247,7 +247,10 @@ def globus_transfer(
sys.exit(1)

if transfer_type == "get" and task_id:
globus_wait(task_id)
# non_blocking => do not wait for the last transfer to finish before creating a new tar
# not non_blocking => blocking => wait for the last transfer to finish before creating a new tar
if not non_blocking:
globus_wait(task_id)


def globus_wait(task_id: str):
Expand Down Expand Up @@ -319,6 +322,8 @@ def globus_finalize(non_blocking: bool = False):
logger.error("Exception: {}".format(e))
sys.exit(1)

# non_blocking => do not wait for the last transfer to finish before creating a new tar
# not non_blocking => blocking => wait for the last transfer to finish before creating a new tar
if not non_blocking:
if task_id:
globus_wait(task_id)
Expand Down
9 changes: 6 additions & 3 deletions zstash/hpss.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def hpss_transfer(
transfer_type: str,
cache: str,
keep: bool = False,
non_blocking: bool = False,
):
if hpss == "none":
logger.info("{}: HPSS is unavailable".format(transfer_type))
Expand Down Expand Up @@ -87,7 +88,7 @@ def hpss_transfer(

if scheme == "globus":
# Transfer file using the Globus Transfer Service
globus_transfer(endpoint, url_path, name, transfer_type)
globus_transfer(endpoint, url_path, name, transfer_type, non_blocking)
else:
# Transfer file using `hsi`
command: str = 'hsi -q "cd {}; {} {}"'.format(hpss, transfer_command, name)
Expand All @@ -104,11 +105,13 @@ def hpss_transfer(
os.remove(file_path)


def hpss_put(hpss: str, file_path: str, cache: str, keep: bool = True):
def hpss_put(
hpss: str, file_path: str, cache: str, keep: bool = True, non_blocking: bool = False
):
"""
Put a file to the HPSS archive.
"""
hpss_transfer(hpss, file_path, "put", cache, keep)
hpss_transfer(hpss, file_path, "put", cache, keep, non_blocking)


def hpss_get(hpss: str, file_path: str, cache: str):
Expand Down
3 changes: 2 additions & 1 deletion zstash/hpss_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def add_files(
keep: bool,
follow_symlinks: bool,
skip_tars_md5: bool = False,
non_blocking: bool = False,
) -> List[str]:

# Now, perform the actual archiving
Expand Down Expand Up @@ -156,7 +157,7 @@ def add_files(
hpss: str = config.hpss
else:
raise TypeError("Invalid config.hpss={}".format(config.hpss))
hpss_put(hpss, os.path.join(cache, tfname), cache, keep)
hpss_put(hpss, os.path.join(cache, tfname), cache, keep, non_blocking)

# Update database with files that have been archived
# Add a row to the "files" table,
Expand Down
22 changes: 19 additions & 3 deletions zstash/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ def update():
hpss = config.hpss
else:
raise TypeError("Invalid config.hpss={}".format(config.hpss))
hpss_put(hpss, get_db_filename(cache), cache, keep=True)
hpss_put(
hpss, get_db_filename(cache), cache, keep=True, non_blocking=args.non_blocking
)

globus_finalize(non_blocking=args.non_blocking)

Expand Down Expand Up @@ -242,14 +244,28 @@ def update_database( # noqa: C901
try:
# Add files
failures = add_files(
cur, con, itar, newfiles, cache, keep, args.follow_symlinks
cur,
con,
itar,
newfiles,
cache,
keep,
args.follow_symlinks,
non_blocking=args.non_blocking,
)
except FileNotFoundError:
raise Exception("Archive update failed due to broken symlink.")
else:
# Add files
failures = add_files(
cur, con, itar, newfiles, cache, keep, args.follow_symlinks
cur,
con,
itar,
newfiles,
cache,
keep,
args.follow_symlinks,
non_blocking=args.non_blocking,
)

# Close database
Expand Down
Loading