|
14 | 14 | import requests
|
15 | 15 |
|
16 | 16 | from grafoleancollector import Collector, send_results_to_grafolean
|
17 |
| -from dbutils import get_db_cursor, DB_PREFIX, S_PER_PARTITION, LEAVE_N_PAST_PARTITIONS |
| 17 | +from dbutils import get_db_cursor, DB_PREFIX, LEAVE_N_PAST_DAYS |
18 | 18 | from lookup import PROTOCOLS, DIRECTION_INGRESS, DIRECTION_EGRESS
|
19 | 19 |
|
20 | 20 | logging.basicConfig(format='%(asctime)s.%(msecs)03d | %(levelname)s | %(message)s',
|
@@ -76,26 +76,11 @@ def _save_current_max_ts(job_id, max_ts):
|
76 | 76 | c.execute(f"INSERT INTO {DB_PREFIX}bot_jobs (job_id, last_used_ts) VALUES (%s, %s) ON CONFLICT (job_id) DO UPDATE SET last_used_ts = %s;", (job_id, max_ts, max_ts))
|
77 | 77 |
|
78 | 78 |
|
79 |
| -def job_maint_remove_old_partitions(*args, **kwargs): |
| 79 | +def job_maint_remove_old_data(*args, **kwargs): |
| 80 | + log.info("MAINT: Maintenance started - removing old data") |
80 | 81 | with get_db_cursor() as c:
|
81 |
| - log.info("MAINT: Maintenance started - removing old partitions") |
82 |
| - today_seq = int(time.time() // S_PER_PARTITION) |
83 |
| - c.execute(f"SELECT tablename FROM pg_tables WHERE schemaname = 'public' AND tablename LIKE '{DB_PREFIX}flows_%';") |
84 |
| - for tablename, in c.fetchall(): |
85 |
| - m = re.match(f'^{DB_PREFIX}flows_([0-9]+)$', tablename) |
86 |
| - if not m: |
87 |
| - log.warning(f"MAINT: Table {tablename} does not match regex, skipping") |
88 |
| - continue |
89 |
| - day_seq = int(m.group(1)) |
90 |
| - if day_seq > today_seq: |
91 |
| - log.warning(f"MAINT: CAREFUL! Table {tablename} marks a future day (today is {today_seq}); this should never happen! Skipping.") |
92 |
| - continue |
93 |
| - if day_seq < today_seq - LEAVE_N_PAST_PARTITIONS: |
94 |
| - log.info(f"MAINT: Removing old data: {tablename} (today is {today_seq})") |
95 |
| - c.execute(f"DROP TABLE {tablename};") |
96 |
| - else: |
97 |
| - log.info(f"MAINT: Leaving {tablename} (today is {today_seq})") |
98 |
| - log.info("MAINT: Maintenance finished (removing old partitions).") |
| 82 | + c.execute(f"SELECT drop_chunks(INTERVAL '{LEAVE_N_PAST_DAYS} days', '{DB_PREFIX}flows2');") |
| 83 | + log.info("MAINT: Maintenance finished (removing old data).") |
99 | 84 |
|
100 | 85 |
|
101 | 86 | def job_maint_suggest_entities(*args, **job_params):
|
@@ -149,9 +134,9 @@ def job_maint_suggest_entities(*args, **job_params):
|
149 | 134 | class NetFlowBot(Collector):
|
150 | 135 |
|
151 | 136 | def jobs(self):
|
152 |
| - # remove old partitions: |
| 137 | + # remove old data: |
153 | 138 | job_id = 'maint/remove_old_data'
|
154 |
| - yield job_id, [3600], job_maint_remove_old_partitions, {}, 50 |
| 139 | + yield job_id, [3600], job_maint_remove_old_data, {}, 50 |
155 | 140 |
|
156 | 141 | # suggest new netflow exporters / entities:
|
157 | 142 | job_id = f'maint/suggest_entities'
|
|
0 commit comments