better readyz, event-driven, no timers or retries
sholdee committed Jun 30, 2024
1 parent 33db49e commit 0e8116a
Showing 2 changed files with 60 additions and 105 deletions.
1 change: 0 additions & 1 deletion Dockerfile
@@ -21,7 +21,6 @@ EXPOSE 8000
# Set environment variables (can be overridden at runtime)
ENV LOG_FILE_PATH=/opt/adguardhome/work/data/querylog.json
ENV METRICS_PORT=8000
ENV UPDATE_INTERVAL=10

# Run the exporter
CMD ["python", "./adguard_exporter.py"]
164 changes: 60 additions & 104 deletions adguard_exporter.py
@@ -15,34 +15,24 @@

# Set up logging
log_format = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=log_format)
logging.basicConfig(level=logging.INFO, format=log_format,
                    handlers=[logging.StreamHandler(sys.stdout)])
logger = logging.getLogger(__name__)

# Explicitly add a stream handler for stdout
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
formatter = logging.Formatter(log_format)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.propagate = False

# Configuration
config = configparser.ConfigParser()
config.read_dict({
'DEFAULT': {
'LOG_FILE_PATH': '/opt/adguardhome/work/data/querylog.json',
'METRICS_PORT': '8000',
'UPDATE_INTERVAL': '10'
'METRICS_PORT': '8000'
}
})

log_file_path = os.environ.get('LOG_FILE_PATH', config['DEFAULT']['LOG_FILE_PATH'])
metrics_port = int(os.environ.get('METRICS_PORT', config['DEFAULT']['METRICS_PORT']))
update_interval = int(os.environ.get('UPDATE_INTERVAL', config['DEFAULT']['UPDATE_INTERVAL']))

logger.info(f"Using log file path: {log_file_path}")
logger.info(f"Using metrics port: {metrics_port}")
logger.info(f"Using update interval: {update_interval}")

# Define Prometheus metrics
dns_queries = Counter('agh_dns_queries', 'Total number of DNS queries')
@@ -81,7 +71,7 @@ def __init__(self):
self.top_blocked_hosts = TopHosts(max_size=100)
self.response_times = []
self.upstream_response_times = defaultdict(list)
self.window_size = 300 # 5 minutes in seconds
self.window_size = 300 # 5 minute window for response time averages
self.lock = threading.Lock()

def update_metrics(self, data):
@@ -107,15 +97,15 @@ def update_metrics(self, data):
with self.lock:
self.upstream_response_times[upstream].append((current_time, elapsed_ms))

if is_blocked and result_reason == '3':
blocked_queries.inc()
self.top_blocked_hosts.add(host)
if is_blocked and result_reason == '7':
safe_search_enforced_hosts.labels(host).inc()
elif is_blocked:
blocked_queries.inc()
self.top_blocked_hosts.add(host)

self.update_prometheus_metrics()
self.process_metrics()

def update_prometheus_metrics(self):
def process_metrics(self):
current_time = time.time()
cutoff_time = current_time - self.window_size
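
The pruning and gauge updates that use cutoff_time are collapsed in this hunk, so only the window setup is visible. As a rough sketch of the sliding-window averaging this implies (a standalone illustration under assumed names, not the committed code):

# Illustrative only: a 5-minute sliding-window average of response times.
# The exporter's actual pruning and gauge updates are collapsed in this diff,
# so the details below are assumptions about the pattern, not the real code.
import time
from collections import deque

WINDOW_SIZE = 300  # seconds, matching self.window_size above
samples = deque()  # (timestamp, elapsed_ms) pairs, oldest first

def add_sample(elapsed_ms):
    samples.append((time.time(), elapsed_ms))

def windowed_average():
    cutoff = time.time() - WINDOW_SIZE
    while samples and samples[0][0] < cutoff:
        samples.popleft()  # drop samples that fell out of the window
    return sum(ms for _, ms in samples) / len(samples) if samples else 0.0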

@@ -150,107 +140,67 @@ def __init__(self, log_file_path, metrics_collector):
self.log_file_path = log_file_path
self.metrics_collector = metrics_collector
self.last_position = 0
self.is_initialized = False
self.start_time = time.time()
self.last_inode = None
self.health_status = True
self.max_wait_time = 300 # Maximum wait time in seconds

def wait_for_log_file(self):
wait_interval = 5 # Interval between checks in seconds
start_time = time.time()
while not os.path.exists(self.log_file_path):
elapsed_time = time.time() - start_time
if elapsed_time >= self.max_wait_time:
logger.error(f"Log file did not appear within {self.max_wait_time} seconds.")
return False
logger.info(f"Waiting for log file to appear... ({int(elapsed_time)} seconds elapsed)")
time.sleep(wait_interval)
logger.info(f"Log file found: {self.log_file_path}")
return True

def get_inode(self):
return os.stat(self.log_file_path).st_ino if os.path.exists(self.log_file_path) else None
self.file_exists = os.path.exists(log_file_path)
self.initial_load()

def initial_load(self):
logger.info(f"Performing initial load of log file: {self.log_file_path}")
self.last_inode = self.get_inode()
if self.last_inode and os.path.exists(self.log_file_path):
with open(self.log_file_path, 'r') as log_file:
for line in log_file:
if line.strip():
try:
data = orjson.loads(line)
self.metrics_collector.update_metrics(data)
except orjson.JSONDecodeError:
logger.error(f"Error decoding JSON: {line}")
self.last_position = log_file.tell()
self.is_initialized = True
logger.info(f"Initial load complete. Processed up to position {self.last_position}")
if self.file_exists:
logger.info(f"Loading existing log file: {self.log_file_path}")
self.process_new_lines()
else:
logger.warning(f"Log file does not exist: {self.log_file_path}")
logger.info(f"Waiting for log file: {self.log_file_path}")

def on_created(self, event):
if event.src_path == self.log_file_path:
logger.info(f"Log file created: {self.log_file_path}")
self.file_exists = True
self.last_position = 0
self.process_new_lines()

def on_deleted(self, event):
if event.src_path == self.log_file_path:
logger.info(f"Log file deleted: {self.log_file_path}")
self.file_exists = False
self.last_position = 0

def on_modified(self, event):
if event.src_path == self.log_file_path:
logger.debug(f"Log file modified: {self.log_file_path}")
self.process_new_lines()

def process_new_lines(self):
max_retries = 5
retry_delay = 1 # second

for attempt in range(max_retries):
try:
current_inode = self.get_inode()
if current_inode != self.last_inode:
logger.info(f"Log file rotated. New inode detected: {current_inode}")
self.last_position = 0 # Reset position to start of the new file
self.last_inode = current_inode

if not os.path.exists(self.log_file_path):
logger.warning(f"Log file does not exist: {self.log_file_path}")
raise FileNotFoundError(f"Log file not found: {self.log_file_path}")

with open(self.log_file_path, 'r') as log_file:
log_file.seek(self.last_position)
lines = log_file.readlines()
logger.debug(f"Processing {len(lines)} new lines")
for line in lines:
if line.strip():
try:
data = orjson.loads(line)
self.metrics_collector.update_metrics(data)
except orjson.JSONDecodeError:
logger.error(f"Error decoding JSON: {line}")
self.last_position = log_file.tell()
self.health_status = True
break # Successfully processed, exit the retry loop
except FileNotFoundError:
if attempt < max_retries - 1:
logger.warning(f"Log file not found. Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
else:
logger.error("Max retries reached. Unable to process log file.")
self.health_status = False
except Exception as e:
logger.error(f"Error processing log file: {e}")
self.health_status = False
break # Exit the retry loop for other types of exceptions

def is_ready(self):
return self.is_initialized or time.time() - self.start_time < self.max_wait_time
try:
with open(self.log_file_path, 'r') as log_file:
log_file.seek(self.last_position)
lines = log_file.readlines()
logger.debug(f"Processing {len(lines)} new lines")
for line in lines:
if line.strip():
try:
data = orjson.loads(line)
self.metrics_collector.update_metrics(data)
except orjson.JSONDecodeError:
logger.error(f"Error decoding JSON: {line}")
self.last_position = log_file.tell()
self.health_status = True
except FileNotFoundError:
logger.error(f"Log file not found: {self.log_file_path}")
self.file_exists = False
self.health_status = False
except Exception as e:
logger.error(f"Error processing log file: {e}")
self.health_status = False

def is_healthy(self):
return self.health_status
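
The on_created, on_deleted, and on_modified callbacks above are what make the exporter event-driven: watchdog's Observer delivers filesystem events, so there is no polling loop and no retry timer. A minimal, self-contained sketch of the same tailing pattern (the path and handler names here are illustrative, not the exporter's own):

# Minimal sketch of event-driven log tailing with watchdog; the path and
# handler names are illustrative only.
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class TailHandler(FileSystemEventHandler):
    def __init__(self, path):
        self.path = path
        self.position = 0

    def on_modified(self, event):
        if event.src_path != self.path:
            return
        with open(self.path, 'r') as f:
            f.seek(self.position)      # resume where the previous read stopped
            for line in f:
                print(line, end='')    # real code would parse JSON and update metrics
            self.position = f.tell()

observer = Observer()
# Watch the parent directory so create/delete events for the file are seen too.
observer.schedule(TailHandler('/tmp/example.log'), path='/tmp', recursive=False)
observer.start()
try:
    time.sleep(5)
finally:
    observer.stop()
    observer.join()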

class HealthServer:
def __init__(self, log_handler):
self.log_handler = log_handler
self.server_ready = False

def set_ready(self):
self.server_ready = True

def livez(self, environ, start_response):
status = '200 OK' if self.log_handler.is_healthy() else '503 Service Unavailable'
@@ -259,7 +209,7 @@ def livez(self, environ, start_response):
return [b"Alive" if status == '200 OK' else b"Unhealthy"]

def readyz(self, environ, start_response):
status = '200 OK' if self.log_handler.is_ready() else '503 Service Unavailable'
status = '200 OK' if self.server_ready else '503 Service Unavailable'
headers = [('Content-type', 'text/plain; charset=utf-8')]
start_response(status, headers)
return [b"Ready" if status == '200 OK' else b"Not Ready"]
@@ -289,20 +239,26 @@ def graceful_shutdown(signum, frame):
log_handler = LogHandler(log_file_path, metrics_collector)
health_server = HealthServer(log_handler)

combined_server = start_combined_server(metrics_port, health_server)
try:
combined_server = start_combined_server(metrics_port, health_server)

if log_handler.wait_for_log_file():
log_handler.initial_load()
observer = Observer()
observer.schedule(log_handler, path=log_file_path, recursive=False)
observer.schedule(log_handler, path=os.path.dirname(log_file_path), recursive=False)
observer.start()

health_server.set_ready()
except Exception as e:
logger.error(f"Failed to start exporter: {e}")
sys.exit(1)

signal.signal(signal.SIGTERM, graceful_shutdown)
signal.signal(signal.SIGINT, graceful_shutdown)

logger.info(f"Exporter is ready. Monitoring log file: {log_file_path}")

try:
while True:
time.sleep(update_interval)
time.sleep(1)
except KeyboardInterrupt:
pass
finally:
