Skip to content

Commit

Permalink
Use domain name as database key
Browse files Browse the repository at this point in the history
  • Loading branch information
congwang09 committed Oct 29, 2024
1 parent ddbdc44 commit ac13cbe
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 25 deletions.
13 changes: 3 additions & 10 deletions sdx_controller/handlers/lc_message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ def process_lc_json_msg(
latest_topo,
domain_list,
):
num_domain_topos = 0
logger.info("MQ received message:" + str(msg))
msg_json = json.loads(msg)
msg_id = msg_json["id"]
Expand All @@ -37,10 +36,10 @@ def process_lc_json_msg(

# Update existing topology
if domain_name in domain_list:
num_domain_topos = len(domain_list)
logger.info("Updating topo")
logger.debug(msg_json)
self.te_manager.update_topology(msg_json)
logger.info("Updating topology in TE manager")
failed_links = self.te_manager.get_failed_links()
if failed_links:
logger.info("Processing link failure.")
Expand All @@ -53,18 +52,12 @@ def process_lc_json_msg(
self.db_instance.add_key_value_pair_to_db(
"domains", "domain_list", domain_list
)

logger.info("Adding topology to TE manager")
self.te_manager.add_topology(msg_json)
num_domain_topos = len(domain_list)

logger.info(f"Adding topology {domain_name} to db.")
self.db_instance.add_key_value_pair_to_db(
"topologies", "num_domain_topos", num_domain_topos
)
db_key = "LC-" + str(num_domain_topos)
logger.info(f"Adding topology {db_key} to db.")
self.db_instance.add_key_value_pair_to_db(
"topologies", db_key, json.dumps(msg_json)
"topologies", domain_name, json.dumps(msg_json)
)

# TODO: use TEManager API directly; but TEManager does not
Expand Down
21 changes: 6 additions & 15 deletions sdx_controller/messaging/rpc_queue_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,45 +85,36 @@ def start_sdx_consumer(self, thread_queue, db_instance):

latest_topo = {}
domain_list = []
num_domain_topos = 0

# This part reads from DB when SDX controller initially starts.
# It looks for domain_list, and num_domain_topos, if they are already in DB,
# It looks for domain_list, if already in DB,
# Then use the existing ones from DB.
domain_list_from_db = db_instance.read_from_db("domains", "domain_list")
latest_topo_from_db = db_instance.read_from_db("topologies", "latest_topo")
num_domain_topos_from_db = db_instance.read_from_db(
"topologies", "num_domain_topos"
)

if domain_list_from_db:
domain_list = domain_list_from_db["domain_list"]
logger.debug("Domain list already exists in db: ")
logger.debug(domain_list)
num_domain_topos = len(domain_list)

if latest_topo_from_db:
latest_topo = latest_topo_from_db["latest_topo"]
logger.debug("Topology already exists in db: ")
logger.debug(latest_topo)

# If topologies already saved in db, use them to initialize te_manager
if num_domain_topos_from_db:
num_domain_topos = num_domain_topos_from_db["num_domain_topos"]
logger.debug("Read num_domain_topos from db: ")
logger.debug(num_domain_topos)
for topo in range(1, num_domain_topos + 1):
db_key = f"LC-{topo}"
topology = db_instance.read_from_db("topologies", db_key)
if domain_list:
for domain in domain_list:
topology = db_instance.read_from_db("topologies", domain)

if not topology:
continue

# Get the actual thing minus the Mongo ObjectID.
topology = topology[db_key]
topology = topology[domain]
topo_json = json.loads(topology)
self.te_manager.add_topology(topo_json)
logger.debug(f"Read {db_key}: {topology}")
logger.debug(f"Read {domain}: {topology}")

while not self._exit_event.is_set():
# Queue.get() will block until there's an item in the queue.
Expand Down

0 comments on commit ac13cbe

Please sign in to comment.