-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsubscribe_instance.py
157 lines (126 loc) · 4.92 KB
/
subscribe_instance.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""This module is meant to be run on a VM instance to subscribe to a topic and listen.
The VM instance subscribes to the Pub/Sub topic whose name matches its own hostname,
using a subscription that is also named after that hostname.
"""
import argparse
import socket
from typing import Optional

from google.cloud import pubsub_v1

from crd_connect import connect_to_crd
from database import SpannerDatabase
from logging_utils import CloudAndConsoleLogger
def create_parser():
    """Build the command-line parser used when this module runs as a script.

    Returns:
        An ``argparse.ArgumentParser`` accepting ``--project_id``,
        ``--db_instance_id``, ``--db_id`` and ``--db_table_name``. Every
        option is an optional string with a default value.
    """
    # One (flag, default, help text) entry per option, in registration order.
    option_specs = (
        (
            "--project_id",
            "sandbox-408020",
            "The project ID of the Google Cloud project.",
        ),
        (
            "--db_instance_id",
            "liezl-test",
            "The ID of the Cloud Spanner instance.",
        ),
        (
            "--db_id",
            "test-db",
            "The ID of the Cloud Spanner database.",
        ),
        (
            "--db_table_name",
            "VirtualMachines",
            "The name of the table in the database (to be used in sql queries).",
        ),
    )

    cli = argparse.ArgumentParser(
        description="Subscribe to a topic and listen for messages."
    )
    for flag, fallback, help_text in option_specs:
        cli.add_argument(flag, default=fallback, type=str, help=help_text)
    return cli
# Set up logging: one module-level logger, created at import time and used by
# subscribe_and_listen() and its message callback.
# NOTE(review): presumably logs to both Cloud Logging and the console, going by
# the class name - confirm in logging_utils.
cnc_logger = CloudAndConsoleLogger(module_name=__name__)
def subscribe_and_listen(
    project_id: Optional[str] = None,
    db_instance_id: Optional[str] = None,
    db_id: Optional[str] = None,
    db_table_name: Optional[str] = None,
) -> None:
    """Subscribe to the topic named after this host and block on its messages.

    The topic and the subscription are both named after ``socket.gethostname()``.
    For every message received, the pin and command stored for this hostname
    are read from the database and passed to ``connect_to_crd``. The call
    blocks until the subscription is cancelled or fails.

    Args:
        project_id: The project ID of the Google Cloud project. Falls back to
            "sandbox-408020" when not given.
        db_instance_id: The ID of the Cloud Spanner instance. Falls back to
            "liezl-test" when not given.
        db_id: The ID of the Cloud Spanner database. Falls back to "test-db"
            when not given.
        db_table_name: The name of the table in the database (to be used in
            sql queries). Falls back to "VirtualMachines" when not given.
    """
    # TODO: This value could be entered via the startup script
    project_id = project_id or "sandbox-408020"
    cnc_logger.info(f"\tProject ID: {project_id}")

    # Use hostname to find topic and create a subscription
    hostname = socket.gethostname()
    topic_name = f"projects/{project_id}/topics/{hostname}"
    subscription_name = f"projects/{project_id}/subscriptions/{hostname}"
    cnc_logger.info(f"\tTopic Name: {topic_name}")
    cnc_logger.info(f"\tSubscription Name: {subscription_name}")

    # Set-up database client to use in callback
    db_instance_id = db_instance_id or "liezl-test"
    db_id = db_id or "test-db"
    db_table_name = db_table_name or "VirtualMachines"
    cnc_logger.info(
        f"\tDB Instance ID: {db_instance_id}\n"
        f"\tDB ID: {db_id}\n"
        f"\tDB Table Name: {db_table_name}"
    )
    spanner_db = SpannerDatabase.load_database(
        project_id, db_instance_id, db_id, db_table_name
    )

    def callback(message):
        """Get a pin and command from database and execute command to connect to CRD.

        Args:
            message: The message from the topic.
        """
        # TODO: We could do different actions based on this message.
        cnc_logger.info(f"Received message:\n\t{message.data}")
        try:
            # Connect to CRD using command from database
            cnc_logger.info(f"Getting pin and command from database for {hostname}...")
            pin, command = spanner_db.get_pin_and_crd(hostname=hostname)
            # The pin and the command are credentials for the remote-desktop
            # session, so their values are deliberately kept out of the logs.
            cnc_logger.info("Retrieved pin and command from database.")
            connect_to_crd(command=command, pin=pin, run=True)
            cnc_logger.info("Finished running connect_to_crd.")
        except Exception as e:
            # Log and carry on: one failed message must not stop the listener.
            cnc_logger.error(f"An error occurred: {e}")
        # Acknowledge whether or not handling succeeded.
        message.ack()

    # Create a subscription
    with pubsub_v1.SubscriberClient() as subscriber:
        cnc_logger.info(f"Creating subscription: {subscription_name}")
        try:
            subscriber.create_subscription(name=subscription_name, topic=topic_name)
        except Exception as e:
            # Best effort: the failure is only logged and subscribing is still
            # attempted (for example, the subscription may already exist).
            cnc_logger.error(
                f"Creating subscription {subscription_name} threw an Exception: {e}."
            )

        cnc_logger.info(f"Subscribing to {subscription_name}...")
        future = subscriber.subscribe(subscription_name, callback)

        # Block the main thread and wait for the subscription to be cancelled or fail
        try:
            cnc_logger.info(f"Listening for messages on {subscription_name}...")
            future.result()
        except Exception as e:
            cnc_logger.error(
                f"Listening for messages on {subscription_name} threw an Exception: {e}."
            )
        finally:
            # Trigger shutdown of the streaming pull on every exit path
            # (failure or interrupt) before the client itself is closed.
            future.cancel()
# Script entry point: start_up.sh runs this module on the VM instance.
if __name__ == "__main__":
    # Unrecognised command-line arguments are tolerated and ignored.
    cli_args, _unknown = create_parser().parse_known_args()
    subscribe_and_listen(
        project_id=cli_args.project_id,
        db_instance_id=cli_args.db_instance_id,
        db_id=cli_args.db_id,
        db_table_name=cli_args.db_table_name,
    )