Skip to content

Commit d0444eb

Browse files
committed
Netflow writer: write data to TimescaleDB table
1 parent b1c074c commit d0444eb

File tree

3 files changed

+39
-32
lines changed

3 files changed

+39
-32
lines changed

dbutils.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,4 +164,29 @@ def migration_step_5():
164164
""" We want to save IPv6 addresses too """
165165
with get_db_cursor() as c:
166166
c.execute(f"ALTER TABLE {DB_PREFIX}flows RENAME COLUMN ipv4_dst_addr TO ipvX_dst_addr;")
167-
c.execute(f"ALTER TABLE {DB_PREFIX}flows RENAME COLUMN ipv4_src_addr TO ipvX_src_addr;")
167+
c.execute(f"ALTER TABLE {DB_PREFIX}flows RENAME COLUMN ipv4_src_addr TO ipvX_src_addr;")
168+
169+
def migration_step_6():
170+
""" Create a new table for flows using TimescaleDB.
171+
- we use a different name to avoid converting old data; the conversion is too slow to be done on existing
172+
installations without manual work. For those we know about we can perform this operation manually.
173+
- table is *not* unlogged (it is using WAL). If insert performance becomes an issue, we can change this later
174+
using: "ALTER TABLE netflow_flows2 SET UNLOGGED;" (but at the moment we don't know that yet)
175+
"""
176+
with get_db_cursor() as c:
177+
c.execute(f"""
178+
CREATE TABLE {DB_PREFIX}flows2 (
179+
ts TIMESTAMP NOT NULL,
180+
client_ip INET NOT NULL,
181+
in_bytes BIGINT NOT NULL,
182+
protocol SMALLINT NOT NULL,
183+
direction SMALLINT NOT NULL,
184+
l4_dst_port INTEGER NOT NULL,
185+
l4_src_port INTEGER NOT NULL,
186+
input_snmp SMALLINT NOT NULL,
187+
output_snmp SMALLINT NOT NULL,
188+
ipvX_dst_addr INET NOT NULL,
189+
ipvX_src_addr INET NOT NULL
190+
);
191+
""")
192+
c.execute(f"SELECT create_hypertable('{DB_PREFIX}flows2', 'ts');")

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ services:
4343

4444

4545
db:
46-
image: postgres:12.1-alpine
46+
image: timescale/timescaledb:latest-pg12
4747
container_name: grafolean-netflow-db
4848
volumes:
4949
# You should always save DB data to a host directory unless you are prepared to lose it. By default

netflowwriter.py

Lines changed: 12 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from colors import color
1919

2020
from lookup import PROTOCOLS
21-
from dbutils import migrate_if_needed, get_db_cursor, DB_PREFIX, S_PER_PARTITION
21+
from dbutils import migrate_if_needed, get_db_cursor, DB_PREFIX
2222
from lookup import DIRECTION_INGRESS
2323

2424

@@ -41,13 +41,18 @@
4141
# 11-byte signature (constructed in this way to detect possible mangled bytes), flags, header extension
4242
# https://www.postgresql.org/docs/9.0/sql-copy.html#AEN59377
4343
PG_COPYFROM_INIT = struct.pack('!11sII', b'PGCOPY\n\377\r\n\0', 0, 0)
44+
# "To determine the appropriate binary format for the actual tuple data you should consult the PostgreSQL
45+
# source, in particular the *send and *recv functions for each column's data type (typically these functions
46+
# are found in the src/backend/utils/adt/ directory of the source distribution)."
4447
# 4-byte INETv4/v6 prefix: family, netmask, is_cidr, n bytes
4548
# https://doxygen.postgresql.org/network_8c_source.html#l00193
4649
IPV4_ADDRESS_PREFIX = struct.pack('!BBBB', socket.AF_INET, 32, 0, 4)
4750
# Gotcha: IPv6 address family in Postgres is *not* socket.AF_INET6 (10),
4851
# instead it is defined as socket.AF_INET + 1 (2 + 1 == 3)
4952
# https://doxygen.postgresql.org/utils_2inet_8h_source.html#l00040
5053
IPV6_ADDRESS_PREFIX = struct.pack('!BBBB', socket.AF_INET + 1, 128, 0, 16)
54+
# Timestamp is encoded as signed number of microseconds from PG epoch
55+
PG_EPOCH_TIMESTAMP = 946684800  # 2000-01-01T00:00:00Z (PostgreSQL epoch; 946684800 is year 2000, not 2020)
5156

5257

5358
def _pgwriter_init():
@@ -57,9 +62,9 @@ def _pgwriter_init():
5762

5863

5964
def _pgwriter_write(pgwriter, ts, client_ip, IN_BYTES, PROTOCOL, DIRECTION, L4_DST_PORT, L4_SRC_PORT, INPUT_SNMP, OUTPUT_SNMP, address_family, IPVx_DST_ADDR, IPVx_SRC_ADDR):
60-
buf = struct.pack('!HiIi4s4siQiHiHiIiIiHiH',
65+
buf = struct.pack('!Hiqi4s4siQiHiHiIiIiHiH',
6166
11, # number of columns
62-
4, int(ts), # integer - beware of Y2038 problem! :)
67+
8, int(1000000 * (ts - PG_EPOCH_TIMESTAMP)), # https://doxygen.postgresql.org/backend_2utils_2adt_2timestamp_8c_source.html#l00228
6368
8, IPV4_ADDRESS_PREFIX, socket.inet_aton(client_ip), # 4 bytes prefix + 4 bytes IP
6469
8, IN_BYTES, # bigint
6570
2, PROTOCOL,
@@ -86,7 +91,7 @@ def _pgwriter_finish(pgwriter):
8691
with get_db_cursor() as c:
8792
pgwriter.write(struct.pack('!h', -1))
8893
pgwriter.seek(0)
89-
c.copy_expert(f"COPY {DB_PREFIX}flows FROM STDIN WITH BINARY", pgwriter)
94+
c.copy_expert(f"COPY {DB_PREFIX}flows2 FROM STDIN WITH BINARY", pgwriter)
9095

9196

9297
def process_named_pipe(named_pipe_filename):
@@ -98,7 +103,6 @@ def process_named_pipe(named_pipe_filename):
98103

99104
templates = {}
100105
last_record_seqs = {}
101-
last_partition_no = None
102106
buffer = [] # we merge together writes to DB
103107
known_exporters = set()
104108
MAX_BUFFER_SIZE = 5
@@ -122,14 +126,6 @@ def process_named_pipe(named_pipe_filename):
122126
known_exporters.add(client_ip)
123127
log.warning(f"[{client_ip}] New exporter!")
124128

125-
# sequence number of the (24h) day from UNIX epoch helps us determine the
126-
# DB partition we are working with:
127-
partition_no = int(ts // S_PER_PARTITION)
128-
if partition_no != last_partition_no:
129-
write_buffer(buffer, last_partition_no)
130-
ensure_flow_table_partition_exists(partition_no)
131-
last_partition_no = partition_no
132-
133129
try:
134130
export = parse_packet(data, templates)
135131
log.debug(f"[{client_ip}] Received record [{export.header.sequence}]: {datetime.utcfromtimestamp(ts)}")
@@ -145,7 +141,7 @@ def process_named_pipe(named_pipe_filename):
145141
# append the record to a buffer and write to DB when buffer is full enough:
146142
buffer.append((ts, client_ip, export,))
147143
if len(buffer) > MAX_BUFFER_SIZE:
148-
write_buffer(buffer, partition_no)
144+
write_buffer(buffer)
149145
buffer = []
150146
except UnknownNetFlowVersion:
151147
log.warning("Unknown NetFlow version")
@@ -158,26 +154,12 @@ def process_named_pipe(named_pipe_filename):
158154
log.exception("Error writing line, skipping...")
159155

160156

161-
# Based on timestamp, make sure that the partition exists:
162-
def ensure_flow_table_partition_exists(partition_no):
163-
ts_start = partition_no * S_PER_PARTITION
164-
ts_end = ts_start + S_PER_PARTITION
165-
with get_db_cursor() as c:
166-
# "When creating a range partition, the lower bound specified with FROM is an inclusive bound, whereas
167-
# the upper bound specified with TO is an exclusive bound."
168-
# PARTITION OF: "Any indexes, constraints and user-defined row-level triggers that exist in the parent
169-
# table are cloned on the new partition."
170-
# https://www.postgresql.org/docs/12/sql-createtable.html
171-
c.execute(f"CREATE UNLOGGED TABLE IF NOT EXISTS {DB_PREFIX}flows_{partition_no} PARTITION OF {DB_PREFIX}flows FOR VALUES FROM ({ts_start}) TO ({ts_end})")
172-
return partition_no
173-
174-
175157
def ensure_exporter(client_ip):
176158
with get_db_cursor() as c:
177159
c.execute(f"INSERT INTO {DB_PREFIX}exporters (ip) VALUES (%s) ON CONFLICT DO NOTHING;", (client_ip,))
178160

179161

180-
def write_buffer(buffer, partition_no):
162+
def write_buffer(buffer):
181163
# {
182164
# "DST_AS": 0,
183165
# "SRC_AS": 0,
@@ -204,7 +186,7 @@ def write_buffer(buffer, partition_no):
204186
# https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html#wp9001622
205187

206188

207-
log.debug(f"Writing {len(buffer)} records to DB, partition {partition_no}")
189+
log.debug(f"Writing {len(buffer)} records to DB")
208190
# save each of the flows within the record, but use execute_values() to perform bulk insert:
209191
def _get_data(buffer):
210192
for ts, client_ip, export in buffer:

0 commit comments

Comments
 (0)