forked from aws-samples/aws-data-mesh-utils
-
Notifications
You must be signed in to change notification settings - Fork 0
/
4_finalize_subscription.py
49 lines (36 loc) · 1.67 KB
/
4_finalize_subscription.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import argparse
import logging
import warnings
import sys
import os
import inspect
# Compute the directory one level above the directory that holds this script
# and put it at the front of sys.path; presumably this is what lets the
# data_mesh_util imports below resolve when the script is run directly.
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))))
sys.path.insert(0, parent_dir)
import data_mesh_util.lib.utils as utils
# NOTE(review): the wildcard import presumably supplies the MESH and CONSUMER
# names used further down in this file - confirm against the constants module.
from data_mesh_util.lib.constants import *
from data_mesh_util import DataMeshConsumer as dmc
# Ignore ResourceWarning messages that begin with "unclosed".
warnings.filterwarnings(action="ignore", message="unclosed", category=ResourceWarning)
class Step4():
    '''
    Consumer functionality to finalize a subscription request.

    The client configuration is loaded and the consumer manager is built
    once, when the class body is executed, and both are shared by every
    instance through class attributes.
    '''
    # Region, clients, account ids and credentials as returned by the
    # project's client-info loader.
    _region, _clients, _account_ids, _creds = utils.load_client_info_from_file()
    # Consumer-side manager, built with the mesh account id and the consumer
    # credentials taken from the loaded configuration.
    _mgr = dmc.DataMeshConsumer(data_mesh_account_id=_account_ids.get(MESH),
                                log_level=logging.DEBUG,
                                region_name=_region,
                                use_credentials=_creds.get(CONSUMER))

    def setUp(self) -> None:
        '''
        Install a filter that ignores every ResourceWarning.
        '''
        warnings.filterwarnings("ignore", category=ResourceWarning)

    def finalize_subscription(self, subscription_id):
        '''
        Finalize a subscription and confirm that it is reported as Active.

        :param subscription_id: identifier of the subscription request to
            finalize.
        :return: the subscription record fetched after finalization, so the
            caller has something meaningful to display.
        :raises Exception: if no subscription record comes back, or if its
            'Status' is anything other than 'Active'.
        '''
        # finalize the subscription
        self._mgr.finalize_subscription(subscription_id=subscription_id)

        # confirm that the consumer can see that its status is now Active
        subscription = self._mgr.get_subscription(request_id=subscription_id)

        # Guard against a missing record so the failure is explicit rather
        # than an AttributeError raised by calling .get on None.
        if subscription is None:
            raise Exception(f"Subscription {subscription_id} could not be retrieved")

        if subscription.get('Status') != 'Active':
            raise Exception(f"Subscription {subscription_id} is not Active")

        return subscription
if __name__ == "__main__":
    # Run the project's sys.argv helper before argparse reads the arguments.
    utils.purify_sysargs()

    # The only option is the mandatory id of the subscription to finalize.
    cli = argparse.ArgumentParser()
    cli.add_argument('--subscription_id', dest='subscription_id', required=True)
    parsed = cli.parse_args()

    # Finalize the subscription and print whatever the step returns.
    step = Step4()
    outcome = step.finalize_subscription(subscription_id=parsed.subscription_id)
    print(outcome)