Skip to content

Commit

Permalink
Set Tags on EMR cluster creation
Browse files Browse the repository at this point in the history
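This ensures that tags are set at the moment of cluster creation (they are passed
to run_job_flow instead of being added by a separate add_tags call afterwards), which matters when permissions require tags to be present on the cluster.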

Closes Yelp#2207
  • Loading branch information
mgmarino committed Mar 31, 2021
1 parent 091572e commit 9e86591
Showing 1 changed file with 9 additions and 19 deletions.
28 changes: 9 additions & 19 deletions mrjob/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1121,13 +1121,21 @@ def _create_cluster(self, persistent=False):
emr_client = self.make_emr_client()

kwargs = self._cluster_kwargs(persistent)
kwargs["Tags"] = self._build_tags()
log.debug('Calling run_job_flow(%s)' % (
', '.join('%s=%r' % (k, v)
for k, v in sorted(kwargs.items()))))
cluster_id = emr_client.run_job_flow(**kwargs)['JobFlowId']

log.info('Created new cluster %s' % cluster_id)

return cluster_id

def _build_tags(self):
"""
Build tags to be set on cluster creation
"""

# set EMR tags for the cluster
tags = dict(self._opts['tags'])

Expand All @@ -1143,25 +1151,7 @@ def _create_cluster(self, persistent=False):
tags['__mrjob_pool_hash'] = self._pool_hash()
tags['__mrjob_pool_name'] = self._opts['pool_name']

self._add_tags(tags, cluster_id)

return cluster_id

def _add_tags(self, tags, cluster_id):
    """Add tags in the dict *tags* to cluster *cluster_id*. Do nothing
    if *tags* is empty or ``None``.

    Keys and values are converted to strings before being sent; a value
    of ``None`` is sent as the empty string. Tags are sent (and logged)
    in sorted order.

    :param tags: mapping of tag name to tag value, or ``None``
    :param cluster_id: ID of the cluster to tag
    :return: ``None``
    """
    if not tags:
        return

    # The EMR API takes string tag keys and values, so normalize here
    # rather than letting a ``None`` or numeric value from configuration
    # reach the client call unconverted.
    tag_items = sorted(
        (str(key), '' if value is None else str(value))
        for key, value in tags.items())

    self.make_emr_client().add_tags(
        ResourceId=cluster_id,
        Tags=[dict(Key=key, Value=value) for key, value in tag_items])

    # log exactly what was sent, in the same order
    log.info('Added EMR tags to cluster %s: %s' % (
        cluster_id,
        ', '.join('%s=%s' % (key, value) for key, value in tag_items)))
return [dict(Key=str(k), Value='' if v is None else str(v)) for k, v in tags.items()]

# TODO: could break this into sub-methods for clarity
def _cluster_kwargs(self, persistent=False):
Expand Down

0 comments on commit 9e86591

Please sign in to comment.