Skip to content

Commit

Permalink
Fix failure caused by duplicate create call to Kubernetes API server (#…
Browse files Browse the repository at this point in the history
…641)

* Additional logging for ep and bouncer

* Handle kube create same object
  • Loading branch information
phudtran authored Mar 4, 2022
1 parent e00378e commit 397924e
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 21 deletions.
22 changes: 14 additions & 8 deletions mizar/common/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,20 @@ def kube_create_obj(obj):
body['spec']['createtime'] = datetime.datetime.now().isoformat()
logger.info("Init {} at {}".format(
obj.get_name(), body['spec']['createtime']))

obj.obj_api.create_namespaced_custom_object(
group="mizar.com",
version="v1",
namespace="default",
plural=obj.get_plural(),
body=body,
)
try:
obj.obj_api.create_namespaced_custom_object(
group="mizar.com",
version="v1",
namespace="default",
plural=obj.get_plural(),
body=body,
)
except ApiException as e:
if e.status == CONSTANTS.HTTP_CONFLICT_ERROR:
logger.info("Object {} already exists! Skipping".format(obj.get_name()))
return
else:
logger.info("Unkown: Object {} failed to create error {}".format(obj.get_name(), e))
logger.debug("Created {} {}".format(obj.get_kind(), obj.get_name()))
obj.store_update_obj()

Expand Down
1 change: 1 addition & 0 deletions mizar/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class CONSTANTS:
RPC_ERROR_CODE = "error"
RPC_FATAL_CODE = "fatal"
RPC_FAILED_CODE = "failed"
HTTP_CONFLICT_ERROR = 409
GRPC_DEVICE_BUSY_ERROR = "Device or resource busy"
GRPC_FILE_EXISTS_ERROR = "File exists"
GRPC_UNAVAILABLE = "failed to connect to all addresses"
Expand Down
14 changes: 7 additions & 7 deletions mizar/common/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def update_substrate_ep(self, ip, mac, task):
CONSTANTS.RPC_FAILED_CODE in text or
CONSTANTS.RPC_FATAL_CODE in text):
task.raise_temporary_error(
"update_substrate_ep returned ERROR! Retrying as agent may have not yet been loaded.")
"update_substrate_ep returned ERROR! IP {} Retrying as agent may have not yet been loaded.".format(ip))

def update_agent_substrate_ep(self, ep, ip, mac, task):
itf = ep.get_veth_peer()
Expand All @@ -123,7 +123,7 @@ def update_agent_substrate_ep(self, ep, ip, mac, task):
CONSTANTS.RPC_FAILED_CODE in text or
CONSTANTS.RPC_FATAL_CODE in text):
task.raise_temporary_error(
"update_agent_substrate_ep returned ERROR! Retrying as agent may have not yet been loaded.")
"update_agent_substrate_ep returned ERROR! Endpoint {} Retrying as agent may have not yet been loaded.".format(ep.name))

def delete_agent_substrate_ep(self, ep, ip):
itf = ep.get_veth_peer()
Expand Down Expand Up @@ -178,7 +178,7 @@ def update_ep(self, ep, task):
CONSTANTS.RPC_FAILED_CODE in text or
CONSTANTS.RPC_FATAL_CODE in text):
task.raise_temporary_error(
"update_ep returned ERROR! Retrying as agent may have not yet been loaded.")
"update_ep returned ERROR! Endpoint {} Retrying as transit may have not yet been loaded.".format(ep.name))
remote_ports = ep.get_remote_ports()
frontend_ports = ep.get_frontend_ports()
protocols = ep.get_port_protocols()
Expand All @@ -204,7 +204,7 @@ def update_port(self, tunid, ip, port, target_port, protocol, task):
CONSTANTS.RPC_FAILED_CODE in text or
CONSTANTS.RPC_FATAL_CODE in text):
task.raise_temporary_error(
"update_port returned ERROR! Retrying as agent may have not yet been loaded.")
"update_port returned ERROR! IP {} Retrying as agent may have not yet been loaded.".format(ip))

def update_agent_metadata(self, ep, task):
itf = ep.get_veth_peer()
Expand Down Expand Up @@ -240,7 +240,7 @@ def update_agent_metadata(self, ep, task):
CONSTANTS.RPC_FAILED_CODE in text or
CONSTANTS.RPC_FATAL_CODE in text):
task.raise_temporary_error(
"update_agent_metadata returned ERROR! Retrying as agent may have not yet been loaded.")
"update_agent_metadata returned ERROR! Endpoint {} Retrying as agent may have not yet been loaded.".format(ep.name))

def load_transit_agent_xdp(self, veth_peer):
itf = veth_peer
Expand Down Expand Up @@ -335,7 +335,7 @@ def update_vpc(self, bouncer, task):
CONSTANTS.RPC_FAILED_CODE in text or
CONSTANTS.RPC_FATAL_CODE in text):
task.raise_temporary_error(
"update_vpc returned ERROR! Retrying as agent may have not yet been loaded.")
"update_vpc returned ERROR! VNI {} Retrying as agent may have not yet been loaded.".format(bouncer.vni))

def delete_vpc(self, bouncer):
jsonconf = {
Expand Down Expand Up @@ -367,7 +367,7 @@ def update_net(self, net, task):
CONSTANTS.RPC_FAILED_CODE in text or
CONSTANTS.RPC_FATAL_CODE in text):
task.raise_temporary_error(
"update_net returned ERROR! Retrying as agent may have not yet been loaded.")
"update_net returned ERROR! VNI {} Retrying as agent may have not yet been loaded.".format(net.vni))

def delete_net(self, net):
jsonconf = {
Expand Down
13 changes: 8 additions & 5 deletions mizar/dp/mizar/operators/endpoints/endpoints_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ def update_bouncer_with_endpoints(self, bouncer, task):
def update_endpoints_with_bouncers(self, bouncer, task):
eps = list(self.store.get_eps_in_net(bouncer.net).values())
for ep in eps:
logger.info("EP {} update agent with bouncer {}".format(
bouncer.name, ep.name))
if ep.type == OBJ_DEFAULTS.ep_type_simple or ep.type == OBJ_DEFAULTS.ep_type_host:
if not ep.droplet_obj:
task.raise_temporary_error("Task: {} Endpoint: {} Droplet Object not ready.".format(
self.__class__.__name__, ep.name))
logger.info("update_endpoints_with_bouncers: ep {} update agent with bouncer {}".format(
ep.name, bouncer.name))
ep.update_bouncers({bouncer.name: bouncer}, task)

def create_scaled_endpoint(self, name, ep_name, spec, net, extra, namespace="default"):
Expand Down Expand Up @@ -321,22 +321,25 @@ def produce_simple_endpoint_interface(self, ep, task):
interfaces_list[0].status = InterfaceStatus.consumed
interfaces = InterfaceServiceClient(
ep.droplet_obj.main_ip).ActivateHostInterface(interfaces_list[0], task)
return interfaces
else:
logger.info(
"Producing interface for simple endpoint {}".format(ep.name))
interfaces = InterfaceServiceClient(ep.droplet_obj.main_ip).ProduceInterfaces(
InterfacesList(interfaces=interfaces_list), task)
logger.info("Produced {}".format(interfaces))
return interfaces

def create_simple_endpoints(self, interfaces, spec):
"""
Create a simple endpoint object (calling the API operator)
"""
for interface, net_info in zip(interfaces.interfaces, spec['interfaces']):
logger.info("Create simple endpoint {}".format(interface))
name = get_itf_name(interface.interface_id)
if self.store.get_ep(name):
logger.info("EP already exists!")
logger.info("EP {} already exists!".format(name))
return
logger.info("Create simple endpoint ep {} {}".format(
name, interface))
ep = Endpoint(name, self.obj_api, self.store)
ep.set_pod(spec["name"])
ep.set_type(OBJ_DEFAULTS.ep_type_simple)
Expand Down
4 changes: 3 additions & 1 deletion mizar/dp/mizar/workflows/endpoints/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ def run(self):
logger.info("Activate host interface for vpc {} on droplet {}".format(
ep.vpc, ep.droplet_obj.ip))
droplets_opr.store_update_vpc_to_droplet(vpc, ep.droplet_obj)
endpoints_opr.produce_simple_endpoint_interface(ep, self)
itf = endpoints_opr.produce_simple_endpoint_interface(ep, self)
logger.info(
"Endpoint Create: Endpoint {} produced interface {}".format(ep.name, itf))
endpoints_opr.set_endpoint_provisioned(ep)
self.finalize()
4 changes: 4 additions & 0 deletions mizar/obj/bouncer.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,19 @@ def load_transit_xdp_pipeline_stage(self, task):
def update_eps(self, eps, task):
for ep in eps:
self.eps[ep.name] = ep
logger.info("bouncer update_ep updating: ep {}".format(ep.name))
if ep.type in OBJ_DEFAULTS.droplet_eps:
if not ep.droplet_obj:
task.raise_temporary_error("Task: {} Endpoint: {} Droplet Object not ready.".format(
self.__class__.__name__, ep.name))
self._update_simple_ep(ep, task)
logger.info("bouncer update_ep updated: ep {}".format(ep.name))
elif ep.type == OBJ_DEFAULTS.ep_type_scaled:
self._update_scaled_ep(ep, task)
logger.info("bouncer update_ep updated: ep {}".format(ep.name))
elif ep.type == OBJ_DEFAULTS.ep_type_gateway:
self.update_gw_ep(ep, task)
logger.info("bouncer update_ep updated: ep {}".format(ep.name))
else:
task.raise_permenant_error(
"Unknown Endpoint! {}".formatr(ep.type))
Expand Down

0 comments on commit 397924e

Please sign in to comment.