
Commit 633a543
Fix pre-commit checks and run tests
forsyth2 committed Dec 27, 2024
1 parent d7bae50 commit 633a543
Showing 5 changed files with 68 additions and 29 deletions.
1 change: 1 addition & 0 deletions zstash/create.py
@@ -22,6 +22,7 @@
ts_utc,
)


def create():
cache: str
cache, args = setup_create()
63 changes: 43 additions & 20 deletions zstash/globus.py
@@ -11,9 +11,9 @@
from globus_sdk import TransferAPIError, TransferClient, TransferData
from globus_sdk.services.transfer.response.iterable import IterableTransferResponse
from six.moves.urllib.parse import urlparse
from .utils import ts_utc

from .settings import logger
from .utils import ts_utc

hpss_endpoint_map = {
"ALCF": "de463ec4-6d04-11e5-ba46-22000b92c6ec",
@@ -158,9 +158,10 @@ def file_exists(name: str) -> bool:
return False


def globus_transfer(
# C901 'globus_transfer' is too complex (20)
def globus_transfer( # noqa: C901
remote_ep: str, remote_path: str, name: str, transfer_type: str, non_blocking: bool
): # noqa: C901
):
global transfer_client
global local_endpoint
global remote_endpoint
@@ -223,10 +224,14 @@ def globus_transfer(
# NOTE: How we behave here depends upon whether we want to support multiple active transfers.
# Presently, we do not, except inadvertently (if status == PENDING)
if prev_task_status == "ACTIVE":
logger.info(f"{ts_utc()}: Previous task_id {task_id} Still Active. Returning.")
logger.info(
f"{ts_utc()}: Previous task_id {task_id} Still Active. Returning."
)
return "ACTIVE"
elif prev_task_status == "SUCCEEDED":
logger.info(f"{ts_utc()}: Previous task_id {task_id} status = SUCCEEDED. Continuing.")
logger.info(
f"{ts_utc()}: Previous task_id {task_id} status = SUCCEEDED. Continuing."
)
src_ep = task["source_endpoint_id"]
dst_ep = task["destination_endpoint_id"]
label = task["label"]
@@ -237,22 +242,26 @@
)
)
else:
logger.error(f"{ts_utc()}: Previous task_id {task_id} status = {prev_task_status}. Continuing.")
logger.error(
f"{ts_utc()}: Previous task_id {task_id} status = {prev_task_status}. Continuing."
)

# DEBUG: review accumulated items in TransferData
logger.info(f"{ts_utc()}: TransferData: accumulated items:")
attribs = transfer_data.__dict__
for item in attribs['data']['DATA']:
if item['DATA_TYPE'] == "transfer_item":
for item in attribs["data"]["DATA"]:
if item["DATA_TYPE"] == "transfer_item":
print(f" source item: {item['source_path']}")

# SUBMIT new transfer here
logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}")
task = submit_transfer_with_checks(transfer_data)
task_id = task.get("task_id")
# NOTE: This log message is misleading. If we have accumulated multiple tar files for transfer,
# the "lable" given here refers only to the LAST tarfile in the TransferData list.
logger.info(f"{ts_utc()}: SURFACE Submit Transfer returned new task_id = {task_id} for label {transfer_data['label']}")
logger.info(
f"{ts_utc()}: SURFACE Submit Transfer returned new task_id = {task_id} for label {transfer_data['label']}"
)

transfer_data = None
except TransferAPIError as e:
@@ -272,7 +281,9 @@ def globus_transfer(
# test for blocking on new task_id
task_status = "UNKNOWN"
if not non_blocking:
task_status = globus_block_wait(task_id=task_id, wait_timeout=7200, polling_interval=10, max_retries=5)
task_status = globus_block_wait(
task_id=task_id, wait_timeout=7200, polling_interval=10, max_retries=5
)
else:
logger.info(f"{ts_utc()}: NO BLOCKING (task_wait) for task_id {task_id}")

@@ -285,35 +296,47 @@
return task_status


def globus_block_wait(task_id: str, wait_timeout: int, polling_interval: int, max_retries: int):
def globus_block_wait(
task_id: str, wait_timeout: int, polling_interval: int, max_retries: int
):
global transfer_client

# poll every "polling_interval" seconds to speed up small transfers. Report every 2 hours, stop waiting after 5*2 = 10 hours
logger.info(f"{ts_utc()}: BLOCKING START: invoking task_wait for task_id = {task_id}")
logger.info(
f"{ts_utc()}: BLOCKING START: invoking task_wait for task_id = {task_id}"
)
task_status = "UNKNOWN"
retry_count = 0
while retry_count < max_retries:
try:
# Wait for the task to complete
transfer_client.task_wait(task_id, timeout=wait_timeout, polling_interval=10)
except GlobusHTTPError as e:
logger.error(f"Exception: {e}")
transfer_client.task_wait(
task_id, timeout=wait_timeout, polling_interval=10
)
# except GlobusHTTPError as e:
# logger.error(f"Exception: {e}")
except Exception as e:
logger.error(f"Unexpected Exception: {e}")
else:
curr_task = transfer_client.get_task(task_id)
task_status = curr_task['status']
task_status = curr_task["status"]
if task_status == "SUCCEEDED":
break
finally:
retry_count += 1
logger.info(f"{ts_utc()}: BLOCKING retry_count = {retry_count} of {max_retries} of timeout {wait_timeout} seconds")
logger.info(
f"{ts_utc()}: BLOCKING retry_count = {retry_count} of {max_retries} of timeout {wait_timeout} seconds"
)

if retry_count == max_retries:
logger.info(f"{ts_utc()}: BLOCKING EXHAUSTED {max_retries} of timeout {wait_timeout} seconds")
logger.info(
f"{ts_utc()}: BLOCKING EXHAUSTED {max_retries} of timeout {wait_timeout} seconds"
)
task_status = "EXHAUSTED_TIMEOUT_RETRIES"

logger.info(f"{ts_utc()}: BLOCKING ENDS: task_id {task_id} returned from task_wait with status {task_status}")
logger.info(
f"{ts_utc()}: BLOCKING ENDS: task_id {task_id} returned from task_wait with status {task_status}"
)

return task_status

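Note: the new globus_block_wait replaces one long task_wait call with up to max_retries bounded waits. A minimal sketch of the same pattern, assuming an already-authenticated TransferClient; the helper name, defaults, and task_id are illustrative, not part of this commit:

from globus_sdk import TransferClient

def wait_with_retries(tc: TransferClient, task_id: str,
                      wait_timeout: int = 7200, max_retries: int = 5) -> str:
    # With these defaults, wait at most 5 windows of 2 hours each
    # (10 hours total), polling every 10 seconds.
    for _attempt in range(max_retries):
        # task_wait returns True once the task reaches a terminal state
        # within this window, False if the window times out.
        if tc.task_wait(task_id, timeout=wait_timeout, polling_interval=10):
            return tc.get_task(task_id)["status"]  # e.g. "SUCCEEDED" or "FAILED"
    return "EXHAUSTED_TIMEOUT_RETRIES"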
19 changes: 15 additions & 4 deletions zstash/hpss.py
@@ -90,8 +90,12 @@ def hpss_transfer(
globus_status = "UNKNOWN"
# Transfer file using the Globus Transfer Service
logger.info(f"{ts_utc()}: DIVING: hpss calls globus_transfer(name={name})")
globus_status = globus_transfer(endpoint, url_path, name, transfer_type, non_blocking)
logger.info(f"{ts_utc()}: SURFACE hpss globus_transfer(name={name}) returns {globus_status}")
globus_status = globus_transfer(
endpoint, url_path, name, transfer_type, non_blocking
)
logger.info(
f"{ts_utc()}: SURFACE hpss globus_transfer(name={name}) returns {globus_status}"
)
# NOTE: Here, the status could be "EXHAUSTED_TIMEOUT_RETRIES", meaning a very long transfer
# or perhaps transfer is hanging. We should decide whether to ignore it, or cancel it, but
# we'd need the task_id to issue a cancellation. Perhaps we should have globus_transfer
@@ -107,15 +111,22 @@ def hpss_transfer(
os.chdir(cwd)

if transfer_type == "put":
if not keep and scheme == "globus" and globus_status == "SUCCEEDED" and not non_blocking:
if (
not keep
and scheme == "globus"
and globus_status == "SUCCEEDED"
and not non_blocking
):
# We should not keep the local file, so delete it now that it is on HPSS
os.remove(file_path)
if not keep and scheme != "globus":
# We should not keep the local file, so delete it now that it is on HPSS
os.remove(file_path)


def hpss_put(hpss: str, file_path: str, cache: str, keep: bool = True, non_blocking: bool = False):
def hpss_put(
hpss: str, file_path: str, cache: str, keep: bool = True, non_blocking: bool = False
):
"""
Put a file to the HPSS archive.
"""
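Note: the reformatted condition above encodes the cleanup rule for "put" transfers. The same logic as a standalone predicate, for illustration only (this helper does not exist in the commit):

def should_delete_local(keep: bool, scheme: str,
                        globus_status: str, non_blocking: bool) -> bool:
    # Never delete if the user asked to keep the local copy.
    if keep:
        return False
    if scheme == "globus":
        # For Globus puts, delete only once the transfer is confirmed
        # complete; a non-blocking put cannot confirm that yet.
        return globus_status == "SUCCEEDED" and not non_blocking
    # Other schemes delete as soon as the put returns.
    return True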
9 changes: 6 additions & 3 deletions zstash/hpss_utils.py
@@ -16,7 +16,6 @@
from .settings import BLOCK_SIZE, TupleFilesRowNoId, TupleTarsRowNoId, config, logger
from .utils import create_tars_table, tars_table_exists, ts_utc

import subprocess

# Minimum output file object
class HashIO(object):
@@ -165,9 +164,13 @@ def add_files(
# process = subprocess.run(["ls", "-l", "zstash"], capture_output=True, text=True)
# print(process.stdout)

logger.info(f"{ts_utc()}: DIVING: (add_files): Calling hpss_put to dispatch archive file {tfname}")
logger.info(
f"{ts_utc()}: DIVING: (add_files): Calling hpss_put to dispatch archive file {tfname}"
)
hpss_put(hpss, os.path.join(cache, tfname), cache, keep, non_blocking)
logger.info(f"{ts_utc()}: SURFACE (add_files): Called hpss_put to dispatch archive file {tfname}")
logger.info(
f"{ts_utc()}: SURFACE (add_files): Called hpss_put to dispatch archive file {tfname}"
)

# Update database with files that have been archived
# Add a row to the "files" table,
5 changes: 3 additions & 2 deletions zstash/utils.py
@@ -4,16 +4,17 @@
import shlex
import sqlite3
import subprocess
from datetime import datetime, timezone
from fnmatch import fnmatch
from typing import Any, List, Tuple
from datetime import datetime, timezone


from .settings import TupleTarsRow, config, logger


def ts_utc():
return datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f")


def filter_files(subset: str, files: List[str], include: bool) -> List[str]:

# Construct list of files to filter, based on
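Note: ts_utc, imported by the modules above for the DIVING/SURFACE log lines, produces a sortable UTC timestamp. A self-contained check of the format (the printed value is illustrative):

from datetime import datetime, timezone

def ts_utc() -> str:
    return datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f")

print(ts_utc())  # e.g. 20241227_153045_123456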
