Skip to content

Commit

Permalink
fix: report failures
Browse files Browse the repository at this point in the history
  • Loading branch information
Tamir David authored and Tamir David committed Aug 12, 2024
1 parent a48d423 commit 67207d9
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 40 deletions.
69 changes: 37 additions & 32 deletions agents/python/configurator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,40 +20,45 @@ def _configure(self, **kwargs):


def _initialize_components():
trace_exporters, metric_exporters, log_exporters = sdk_config._import_exporters(
sdk_config._get_exporter_names("traces"),
sdk_config._get_exporter_names("metrics"),
sdk_config._get_exporter_names("logs"),
)

auto_resource = {
"telemetry.distro.name": "odigos",
"telemetry.distro.version": VERSION,
}

resource_attributes_event = threading.Event()
client = start_opamp_client(resource_attributes_event)
resource_attributes_event.wait(timeout=30) # Wait for the resource attributes to be received for 30 seconds

received_value = client.resource_attributes

if received_value:
auto_resource.update(received_value)

resource = Resource.create(auto_resource) \
.merge(OTELResourceDetector().detect()) \
.merge(ProcessResourceDetector().detect())

initialize_traces_if_enabled(trace_exporters, resource)
initialize_metrics_if_enabled(metric_exporters, resource)
initialize_logging_if_enabled(log_exporters, resource)

try:

client = start_opamp_client(resource_attributes_event)
resource_attributes_event.wait(timeout=30) # Wait for the resource attributes to be received for 30 seconds

# Reorder the python sys.path to ensure that the user application's dependencies take precedence over the agent's dependencies.
# This is necessary because the user application's dependencies may be incompatible with those used by the agent.
reorder_python_path()
# Reload distro modules to ensure the new path is used.
reload_distro_modules()
received_value = client.resource_attributes

if received_value:
trace_exporters, metric_exporters, log_exporters = sdk_config._import_exporters(
sdk_config._get_exporter_names("traces"),
sdk_config._get_exporter_names("metrics"),
sdk_config._get_exporter_names("logs"),
)

auto_resource = {
"telemetry.distro.name": "odigos",
"telemetry.distro.version": VERSION,
}

auto_resource.update(received_value)

resource = Resource.create(auto_resource) \
.merge(OTELResourceDetector().detect()) \
.merge(ProcessResourceDetector().detect())

initialize_traces_if_enabled(trace_exporters, resource)
initialize_metrics_if_enabled(metric_exporters, resource)
initialize_logging_if_enabled(log_exporters, resource)

# Reorder the python sys.path to ensure that the user application's dependencies take precedence over the agent's dependencies.
# This is necessary because the user application's dependencies may be incompatible with those used by the agent.
reorder_python_path()
# Reload distro modules to ensure the new path is used.
reload_distro_modules()

except Exception as e:
client.shutdown(custom_failure_message=str(e))


def initialize_traces_if_enabled(trace_exporters, resource):
traces_enabled = os.getenv(sdk_config.OTEL_TRACES_EXPORTER, "none").strip().lower()
Expand Down
23 changes: 15 additions & 8 deletions agents/python/opamp/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,20 @@ def run(self):
return

self.send_first_message_with_retry()

self.event.set()

self.worker()

except Exception as e:
opamp_logger.error(f"Error running OpAMP client: {e}")
self.send_agent_failure_disconnect_message(error_message=str(e))
failure_message = self.get_agent_failure_disconnect_message(error_message=str(e))
self.send_agent_to_server_message(failure_message)

# Exiting the opamp thread and set the event to notify the main thread
self.event.set()

def send_agent_failure_disconnect_message(self, error_message: str) -> None:
sys.exit()

def get_agent_failure_disconnect_message(self, error_message: str) -> None:
agent_failure_message = opamp_pb2.AgentToServer()

agent_disconnect = self.get_agent_disconnect()
Expand All @@ -76,7 +80,7 @@ def send_agent_failure_disconnect_message(self, error_message: str) -> None:
agent_health = self.get_agent_health(component_health=False, last_error=error_message, status=AgentHealthStatus.AGENT_FAILURE.value)
agent_failure_message.health.CopyFrom(agent_health)

self.send_agent_to_server_message(agent_failure_message)
return agent_failure_message

def send_unsupported_version_disconnect_message(self, error_message: str) -> None:
first_disconnect_message = opamp_pb2.AgentToServer()
Expand Down Expand Up @@ -236,11 +240,14 @@ def mandatory_env_vars_set(self):

return True

def shutdown(self):
def shutdown(self, custom_failure_message: str = None):
self.running = False
opamp_logger.info("Sending agent disconnect message to OpAMP server...")
agent_health = self.get_agent_health(component_health=False, last_error="Python runtime is exiting", status=AgentHealthStatus.TERMINATED.value)
disconnect_message = opamp_pb2.AgentToServer(agent_disconnect=opamp_pb2.AgentDisconnect(), health=agent_health)
if custom_failure_message:
disconnect_message = self.get_agent_failure_disconnect_message(error_message=custom_failure_message)
else:
agent_health = self.get_agent_health(component_health=False, last_error="Python runtime is exiting", status=AgentHealthStatus.TERMINATED.value)
disconnect_message = opamp_pb2.AgentToServer(agent_disconnect=opamp_pb2.AgentDisconnect(), health=agent_health)

with self.condition:
self.condition.notify_all()
Expand Down
3 changes: 3 additions & 0 deletions common/agent_health_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ const (
// The termination can be due to normal shutdown (e.g. event loop run out of work)
// due to explicit termination (e.g. code calls exit(), or OS signal), or due to an error (e.g. unhandled exception)
AgentHealthProcessTerminated AgentHealthStatus = "ProcessTerminated"

// AgentHealthStatusAgentFailure is when the opamp agent encountered an error during runtime.
AgentFailure AgentHealthStatus = "AgentFailure"
)

0 comments on commit 67207d9

Please sign in to comment.