Replace all usage of boto2 with boto3 (#4868)
* Take out boto2 from awsProvisioner.py

* Add mypy stub file for s3

* Lazy import aws to avoid dependency if extra is not installed yet

* Also lazy import in tests

* Separate out wdl kubernetes test to avoid missing dependency

* Add unittest main

* Fix wdl CI to run separated tests

* Fix typo in lookup

* Update moto and remove leftover line in node.py

* Remove all instances of boto

* Fix issues with boto return types and grab attributes before deleting

* Remove some unnecessary abstraction

* Fix improper types in ec2.py

* Ensure UUID is a string for boto3

* No more boto

* Remove comments

* Move attribute initialization

* Properly delete all attributes of the item

* Move out pager and use pager for select to get around output limits

* Turn getter into method

* Remove comment in setup.py

* Remove commented dead import

* Remove stray boto import

* Apply suggestions from code review

Co-authored-by: Adam Novak <[email protected]>

* Rename, rearrange some code

* Revert not passing Value's to attributes when deleting attributes in SDB

* Fix missed changed var names

* Change argument ordering of JobStoreExistsException to fix improper output on exception

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Adam Novak <[email protected]>
3 people authored Apr 29, 2024
1 parent 435d9c1 commit d681fe6
Showing 17 changed files with 428 additions and 550 deletions.
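One of the commit-message items above, "Move out pager and use pager for select to get around output limits", refers to paging SimpleDB SELECT results instead of relying on a single response. A minimal sketch of that idea, assuming boto3's SimpleDB `select` paginator is available; the domain name and region are placeholders, and the actual Toil helper is in files whose diffs are not shown here:

```python
import boto3

# Hedged sketch: drive SimpleDB SELECT through a boto3 paginator so result
# sets larger than one response page are returned in full.
sdb = boto3.client("sdb", region_name="us-west-2")
paginator = sdb.get_paginator("select")
for page in paginator.paginate(
    SelectExpression="select * from `toil-registry`",  # hypothetical domain
    ConsistentRead=True,
):
    for item in page.get("Items", []):
        print(item["Name"], item["Attributes"])
```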
Makefile: 2 changes (1 addition & 1 deletion)
@@ -135,7 +135,7 @@ clean_sdist:
# Setting SET_OWNER_TAG will tag cloud resources so that UCSC's cloud murder bot won't kill them.
test: check_venv check_build_reqs
TOIL_OWNER_TAG="shared" \
python -m pytest --durations=0 --strict-markers --log-level DEBUG --log-cli-level INFO -r s $(cov) -n $(threads) --dist loadscope $(tests) -m "$(marker)"
python -m pytest --durations=0 --strict-markers --log-level DEBUG --log-cli-level INFO -r s $(cov) -n $(threads) --dist loadscope $(tests) -m "$(marker)" --color=yes

test_debug: check_venv check_build_reqs
TOIL_OWNER_TAG="$(whoami)" \
requirements-aws.txt: 1 change (0 additions & 1 deletion)
@@ -1,4 +1,3 @@
boto>=2.48.0, <3
boto3-stubs[s3,sdb,iam,sts,boto3,ec2,autoscaling]>=1.28.3.post2, <2
mypy-boto3-iam>=1.28.3.post2, <2 # Need to force .post1 to be replaced
mypy-boto3-s3>=1.28.3.post2, <2
setup.py: 11 changes (0 additions & 11 deletions)
@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# import imp
import os
import types
from importlib.machinery import SourceFileLoader
@@ -142,16 +141,6 @@ def import_version():
}))
os.rename(f.name, 'src/toil/version.py')

# Unfortunately, we can't use a straight import here because that would also load the stuff
# defined in "src/toil/__init__.py" which imports modules from external dependencies that may
# yet to be installed when setup.py is invoked.
#
# This is also the reason we cannot switch from the "deprecated" imp library
# and use:
# from importlib.machinery import SourceFileLoader
# return SourceFileLoader('toil.version', path='src/toil/version.py').load_module()
#
# Because SourceFileLoader will error and load "src/toil/__init__.py" .
loader = SourceFileLoader('toil.version', 'src/toil/version.py')
mod = types.ModuleType(loader.name)
loader.exec_module(mod)
src/toil/__init__.py: 231 changes (0 additions & 231 deletions)
@@ -440,7 +440,6 @@ def logProcessContext(config: "Config") -> None:


try:
from boto import provider
from botocore.credentials import (JSONFileCache,
RefreshableCredentials,
create_credential_resolver)
@@ -476,235 +475,5 @@ def str_to_datetime(s):
datetime.datetime(1970, 1, 1, 0, 0)
"""
return datetime.strptime(s, datetime_format)


class BotoCredentialAdapter(provider.Provider):
"""
Boto 2 Adapter to use AWS credentials obtained via Boto 3's credential finding logic.
This allows for automatic role assumption
respecting the Boto 3 config files, even when parts of the app still use
Boto 2.
This class also handles caching credentials in multi-process environments
to avoid loads of processes swamping the EC2 metadata service.
"""

# TODO: We take kwargs because new boto2 versions have an 'anon'
# argument and we want to be future proof

def __init__(self, name, access_key=None, secret_key=None,
security_token=None, profile_name=None, **kwargs):
"""Create a new BotoCredentialAdapter."""
# TODO: We take kwargs because new boto2 versions have an 'anon'
# argument and we want to be future proof

if (name == 'aws' or name is None) and access_key is None and not kwargs.get('anon', False):
# We are on AWS and we don't have credentials passed along and we aren't anonymous.
# We will backend into a boto3 resolver for getting credentials.
# Make sure to enable boto3's own caching, so we can share that
# cache with pure boto3 code elsewhere in Toil.
# Keep synced with toil.lib.aws.session.establish_boto3_session
self._boto3_resolver = create_credential_resolver(Session(profile=profile_name), cache=JSONFileCache())
else:
# We will use the normal flow
self._boto3_resolver = None

# Pass along all the arguments
super().__init__(name, access_key=access_key,
secret_key=secret_key, security_token=security_token,
profile_name=profile_name, **kwargs)

def get_credentials(self, access_key=None, secret_key=None, security_token=None, profile_name=None):
"""
Make sure our credential fields are populated.
Called by the base class constructor.
"""
if self._boto3_resolver is not None:
# Go get the credentials from the cache, or from boto3 if not cached.
# We need to be eager here; having the default None
# _credential_expiry_time makes the accessors never try to refresh.
self._obtain_credentials_from_cache_or_boto3()
else:
# We're not on AWS, or they passed a key, or we're anonymous.
# Use the normal route; our credentials shouldn't expire.
super().get_credentials(access_key=access_key,
secret_key=secret_key, security_token=security_token,
profile_name=profile_name)

def _populate_keys_from_metadata_server(self):
"""
Hack to catch _credential_expiry_time being too soon and refresh the credentials.
This override is misnamed; it's actually the only hook we have to catch
_credential_expiry_time being too soon and refresh the credentials. We
actually just go back and poke the cache to see if it feels like
getting us new credentials.
Boto 2 hardcodes a refresh within 5 minutes of expiry:
https://github.com/boto/boto/blob/591911db1029f2fbb8ba1842bfcc514159b37b32/boto/provider.py#L247
Boto 3 wants to refresh 15 or 10 minutes before expiry:
https://github.com/boto/botocore/blob/8d3ea0e61473fba43774eb3c74e1b22995ee7370/botocore/credentials.py#L279
So if we ever want to refresh, Boto 3 wants to refresh too.
"""
# This should only happen if we have expiring credentials, which we should only get from boto3
if self._boto3_resolver is None:
raise RuntimeError("The Boto3 resolver should not be None.")

self._obtain_credentials_from_cache_or_boto3()

@retry()
def _obtain_credentials_from_boto3(self):
"""
Fill our credential fields from Boto 3.
We know the current cached credentials are not good, and that we
need to get them from Boto 3. Fill in our credential fields
(_access_key, _secret_key, _security_token,
_credential_expiry_time) from Boto 3.
"""
# We get a Credentials object
# <https://github.com/boto/botocore/blob/8d3ea0e61473fba43774eb3c74e1b22995ee7370/botocore/credentials.py#L227>
# or a RefreshableCredentials, or None on failure.
creds = self._boto3_resolver.load_credentials()

if creds is None:
try:
resolvers = str(self._boto3_resolver.providers)
except:
resolvers = "(Resolvers unavailable)"
raise RuntimeError("Could not obtain AWS credentials from Boto3. Resolvers tried: " + resolvers)

# Make sure the credentials actually has some credentials if it is lazy
creds.get_frozen_credentials()

# Get when the credentials will expire, if ever
if isinstance(creds, RefreshableCredentials):
# Credentials may expire.
# Get a naive UTC datetime like boto 2 uses from the boto 3 time.
self._credential_expiry_time = creds._expiry_time.astimezone(timezone('UTC')).replace(tzinfo=None)
else:
# Credentials never expire
self._credential_expiry_time = None

# Then, atomically get all the credentials bits. They may be newer than we think they are, but never older.
frozen = creds.get_frozen_credentials()

# Copy them into us
self._access_key = frozen.access_key
self._secret_key = frozen.secret_key
self._security_token = frozen.token

def _obtain_credentials_from_cache_or_boto3(self):
"""
Get the cached credentials.
Or retrieve them from Boto 3 and cache them
(or wait for another cooperating process to do so) if they are missing
or not fresh enough.
"""
cache_path = '~/.cache/aws/cached_temporary_credentials'
path = os.path.expanduser(cache_path)
tmp_path = path + '.tmp'
while True:
log.debug('Attempting to read cached credentials from %s.', path)
try:
with open(path) as f:
content = f.read()
if content:
record = content.split('\n')
if len(record) != 4:
raise RuntimeError("Number of cached credentials is not 4.")
self._access_key = record[0]
self._secret_key = record[1]
self._security_token = record[2]
self._credential_expiry_time = str_to_datetime(record[3])
else:
log.debug('%s is empty. Credentials are not temporary.', path)
self._obtain_credentials_from_boto3()
return
except OSError as e:
if e.errno == errno.ENOENT:
log.debug('Cached credentials are missing.')
dir_path = os.path.dirname(path)
if not os.path.exists(dir_path):
log.debug('Creating parent directory %s', dir_path)
try:
# A race would be ok at this point
os.makedirs(dir_path, exist_ok=True)
except OSError as e2:
if e2.errno == errno.EROFS:
# Sometimes we don't actually have write access to ~.
# We may be running in a non-writable Toil container.
# We should just go get our own credentials
log.debug('Cannot use the credentials cache because we are working on a read-only filesystem.')
self._obtain_credentials_from_boto3()
else:
raise
else:
raise
else:
if self._credentials_need_refresh():
log.debug('Cached credentials are expired.')
else:
log.debug('Cached credentials exist and are still fresh.')
return
# We get here if credentials are missing or expired
log.debug('Racing to create %s.', tmp_path)
# Only one process, the winner, will succeed
try:
fd = os.open(tmp_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600)
except OSError as e:
if e.errno == errno.EEXIST:
log.debug('Lost the race to create %s. Waiting on winner to remove it.', tmp_path)
while os.path.exists(tmp_path):
time.sleep(0.1)
log.debug('Winner removed %s. Trying from the top.', tmp_path)
else:
raise
else:
try:
log.debug('Won the race to create %s. Requesting credentials from backend.', tmp_path)
self._obtain_credentials_from_boto3()
except:
os.close(fd)
fd = None
log.debug('Failed to obtain credentials, removing %s.', tmp_path)
# This unblocks the losers.
os.unlink(tmp_path)
# Bail out. It's too likely to happen repeatedly
raise
else:
if self._credential_expiry_time is None:
os.close(fd)
fd = None
log.debug('Credentials are not temporary. Leaving %s empty and renaming it to %s.',
tmp_path, path)
# No need to actually cache permanent credentials,
# because we know we aren't getting them from the
# metadata server or by assuming a role. Those both
# give temporary credentials.
else:
log.debug('Writing credentials to %s.', tmp_path)
with os.fdopen(fd, 'w') as fh:
fd = None
fh.write('\n'.join([
self._access_key,
self._secret_key,
self._security_token,
datetime_to_str(self._credential_expiry_time)]))
log.debug('Wrote credentials to %s. Renaming to %s.', tmp_path, path)
os.rename(tmp_path, path)
return
finally:
if fd is not None:
os.close(fd)


provider.Provider = BotoCredentialAdapter

except ImportError:
pass
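The adapter deleted above existed only so Boto 2 code could consume credentials that Boto 3 resolved and cached on disk. With Boto 2 gone, the same multi-process cache sharing can come straight from botocore. A minimal sketch under stated assumptions: the helper name below is ours, not Toil's, and the commit's real replacement lives in `toil.lib.aws.session.establish_boto3_session`, whose implementation is not part of the visible diff:

```python
import boto3
import botocore.session
from botocore.credentials import JSONFileCache

def make_cached_boto3_session(profile_name=None):
    # Hypothetical helper. Attach botocore's on-disk JSON credential cache to
    # the assume-role provider so many Toil processes reuse one cached
    # credential set instead of each swamping STS or the EC2 metadata service.
    bc_session = botocore.session.Session(profile=profile_name)
    resolver = bc_session.get_component("credential_provider")
    resolver.get_provider("assume-role").cache = JSONFileCache()
    return boto3.Session(botocore_session=bc_session)

# Usage: session = make_cached_boto3_session(); s3 = session.client("s3")
```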
src/toil/batchSystems/awsBatch.py: 10 changes (5 additions & 5 deletions)
@@ -36,7 +36,7 @@
from argparse import ArgumentParser, _ArgumentGroup
from typing import Any, Dict, Iterator, List, Optional, Set, Union

from boto.exception import BotoServerError
from botocore.exceptions import ClientError

from toil import applianceSelf
from toil.batchSystems.abstractBatchSystem import (EXIT_STATUS_UNAVAILABLE_VALUE,
@@ -376,7 +376,7 @@ def shutdown(self) -> None:
# Get rid of the job definition we are using if we can.
self._destroy_job_definition()

@retry(errors=[BotoServerError])
@retry(errors=[ClientError])
def _try_terminate(self, aws_id: str) -> None:
"""
Internal function. Should not be called outside this class.
@@ -392,7 +392,7 @@ def _try_terminate(self, aws_id: str) -> None:
# Kill the AWS Batch job
self.client.terminate_job(jobId=aws_id, reason='Killed by Toil')

@retry(errors=[BotoServerError])
@retry(errors=[ClientError])
def _wait_until_stopped(self, aws_id: str) -> None:
"""
Internal function. Should not be called outside this class.
@@ -418,7 +418,7 @@ def _wait_until_stopped(self, aws_id: str) -> None:
logger.info('Waiting for killed job %s to stop', self.aws_id_to_bs_id.get(aws_id, aws_id))
time.sleep(2)

@retry(errors=[BotoServerError])
@retry(errors=[ClientError])
def _get_or_create_job_definition(self) -> str:
"""
Internal function. Should not be called outside this class.
@@ -482,7 +482,7 @@ def _get_or_create_job_definition(self) -> str:

return self.job_definition

@retry(errors=[BotoServerError])
@retry(errors=[ClientError])
def _destroy_job_definition(self) -> None:
"""
Internal function. Should not be called outside this class.
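Boto 2 raised service failures as `BotoServerError` subclasses; Boto 3 funnels them through a single `botocore.exceptions.ClientError` and puts the service-specific error code in the response body, which is why the retry decorators above now list `ClientError`. A short hedged sketch of the pattern; the client construction, job ID, and retryable code list are illustrative placeholders, not taken from this diff:

```python
import boto3
from botocore.exceptions import ClientError

batch = boto3.client("batch")  # placeholder client; Toil builds its own

def terminate_quietly(aws_id: str) -> None:
    try:
        batch.terminate_job(jobId=aws_id, reason="Killed by Toil")
    except ClientError as e:
        # Boto 3 exposes the error code inside the response dict instead of
        # as a dedicated exception class.
        code = e.response.get("Error", {}).get("Code", "")
        if code in ("ThrottlingException", "TooManyRequestsException"):
            return  # a decorator such as @retry(errors=[ClientError]) would re-invoke
        raise
```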
src/toil/jobStores/abstractJobStore.py: 2 changes (1 addition & 1 deletion)
@@ -155,7 +155,7 @@ def __init__(self, locator: str, prefix: str):

class JobStoreExistsException(LocatorException):
"""Indicates that the specified job store already exists."""
def __init__(self, prefix: str, locator: str):
def __init__(self, locator: str, prefix: str):
"""
:param str locator: The location of the job store
"""
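The one-line swap above matters because the subclass forwards its arguments positionally to a base class whose signature is `(locator, prefix)`; with the old `(prefix, locator)` order the two values traded places in the rendered message. An illustrative sketch only, with assumed class bodies and message format rather than the actual Toil code:

```python
class LocatorException(Exception):
    # Assumed base: builds the message from (locator, prefix), in that order.
    def __init__(self, locator: str, prefix: str):
        super().__init__(f"The job store '{prefix}:{locator}' already exists.")


class JobStoreExistsException(LocatorException):
    # With the corrected (locator, prefix) order, positional forwarding lines
    # up with the base class, so JobStoreExistsException("/tmp/store", "file")
    # now renders "The job store 'file:/tmp/store' already exists." instead of
    # swapping the prefix and the path.
    def __init__(self, locator: str, prefix: str):
        super().__init__(locator, prefix)
```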
(Diffs for the remaining changed files are not shown.)
