Amazon's VPC Flow Logs are analogous to NetFlow and IPFIX logs, and can be used for security and performance analysis. Observable Networks uses VPC Flow Logs as an input to endpoint modeling for security monitoring.
This project contains a Python library that makes retrieving VPC Flow Logs from Amazon CloudWatch Logs a bit easier. The library provides:
- A data structure that parses the Flow Log records into easily-used Python objects.
- A utility that makes iterating over all the Flow Log records in a log group very simple.
The library builds on boto3 and should work on both Python 2.7 and 3.4+.
For information on VPC Flow Logs and how to enable them see this post at the AWS blog. You may use this library with the kinesis-logs-reader library when retrieving VPC flow logs from Amazon Kinesis.
You can get flowlogs_reader by using pip:
pip install flowlogs_reader
Or if you want to install from source and/or contribute you can clone from GitHub:
git clone
cd flowlogs-reader
python develop
This project provides a command line interface called flowlogs_reader that allows you to print VPC Flow Log records to your screen.
It assumes your AWS credentials are available through environment variables, a boto configuration file, or through IAM metadata.
Some example uses are below.
Printing flows
The default action is to print flows. You may also specify the ipset, findip, and aggregate actions:
flowlogs_reader flowlog_group
- print all flows in the past hour
flowlogs_reader flowlog_group print 10
- print the first 10 flows from the past hour
flowlogs_reader flowlog_group ipset
- print the unique IPs seen in the past hour
flowlogs_reader flowlog_group findip
- print all flows involving the given IP address (the address follows findip)
flowlogs_reader flowlog_group aggregate
- aggregate the flows by 5-tuple, then print them as a tab-separated stream (with a header)
You may combine the output of flowlogs_reader with other command line utilities:
flowlogs_reader flowlog_group | grep REJECT
- print all REJECTed Flow Log records
flowlogs_reader flowlog_group | awk '$6 == 443'
- print all traffic from port 443
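The same filtering can be done in Python when a shell pipeline is not convenient. The sketch below is only an illustration and is not part of flowlogs_reader: the function names and sample lines (including the documentation-range IP addresses) are assumptions, and it presumes each printed line uses the space-separated version 2 field order, with the source port in field 6 and the action in field 13.

```python
def rejected(lines):
    """Yield lines whose action field (the 13th) is REJECT.

    This is stricter than `grep REJECT`, which matches anywhere on the line.
    """
    for line in lines:
        fields = line.split()
        if len(fields) >= 13 and fields[12] == 'REJECT':
            yield line


def from_port(lines, port):
    """Yield lines whose source port field (the 6th) equals `port`."""
    for line in lines:
        fields = line.split()
        if len(fields) >= 6 and fields[5] == str(port):
            yield line


# Hypothetical sample lines; the addresses are documentation-range placeholders.
SAMPLE_LINES = [
    '2 123456789010 eni-102010ab 198.51.100.1 192.0.2.1 '
    '443 49152 6 10 840 1439387263 1439387264 ACCEPT OK',
    '2 123456789010 eni-102010ab 192.0.2.1 198.51.100.1 '
    '49152 22 6 1 40 1439387263 1439387264 REJECT OK',
]

for line in rejected(SAMPLE_LINES):
    print(line)
```

In practice the lines could come from sys.stdin instead of the hard-coded sample list.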
Time windows
The default time window is the last hour. You may also specify a --start-time and/or an --end-time. The -s and -e switches may also be used:
flowlogs_reader --start-time='2015-08-13 00:00:00' flowlog_group
flowlogs_reader --end-time='2015-08-14 00:00:00' flowlog_group
flowlogs_reader --start-time='2015-08-13 01:00:00' --end-time='2015-08-14 02:00:00' flowlog_group
Use the --time-format switch to control how start and end times are interpreted. The default is '%Y-%m-%d %H:%M:%S'. See the Python documentation for strptime for information on format strings.
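To check a format string before passing it on the command line, it can help to try it with strptime directly. This is a minimal sketch using only the standard library; parse_time and DEFAULT_TIME_FORMAT are hypothetical names chosen for illustration, not part of the tool.

```python
from datetime import datetime

# The default format described above.
DEFAULT_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'


def parse_time(text, time_format=DEFAULT_TIME_FORMAT):
    """Parse a timestamp string using a strptime-style format string."""
    return datetime.strptime(text, time_format)


# A value in the default format.
start = parse_time('2015-08-13 00:00:00')

# The same idea with a custom format, as --time-format would allow.
end = parse_time('08/14/2015 02:00', time_format='%m/%d/%Y %H:%M')

print(start, end)
```

A string that does not match the format raises ValueError, which is a quick way to catch a mismatch between the time arguments and the chosen format.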
AWS options
Other command line switches:
flowlogs_reader --region='us-west-2' flowlog_group
- connect to the given AWS region
flowlogs_reader --profile='dev_profile' flowlog_group
- use the profile from your local AWS configuration file to specify credentials and region
flowlogs_reader --filter-pattern='REJECT' flowlog_group
- use the given filter pattern to have the server limit the output
flowlogs_reader --role-arn='arn:aws:iam::12345678901:role/myrole' --external-id='0a1b2c3d' flowlog_group
- use the given role and external ID to connect to a 3rd party's account using sts assume-role
FlowRecord takes an event dictionary retrieved from a log stream. It parses the message in the event, taking a record like this:
2 123456789010 eni-102010ab 443 49152 6 10 840 1439387263 1439387264 ACCEPT OK
And turns it into a Python object like this:
>>> flow_record.srcaddr
>>> flow_record.dstaddr
>>> flow_record.srcport
>>> flow_record.to_dict()
{'account_id': '123456789010',
'action': 'ACCEPT',
'bytes': 840,
'dstaddr': '',
'dstport': 49152,
'end': datetime.datetime(2015, 8, 12, 13, 47, 44),
'interface_id': 'eni-102010ab',
'log_status': 'OK',
'packets': 10,
'protocol': 6,
'srcaddr': '',
'srcport': 443,
'start': datetime.datetime(2015, 8, 12, 13, 47, 43),
'version': 2}
You may use the FlowRecord.from_message(...) constructor if you have a line of log text instead of an event dictionary.
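To make the field mapping concrete, here is a rough standard-library sketch of how a version 2 record line can be split into typed fields. It is not the library's implementation: parse_message and the constants are hypothetical names, the sample addresses are documentation-range placeholders, and records with missing data (fields shown as "-") are not handled.

```python
from datetime import datetime, timezone

# Version 2 field order, matching the keys shown in the dictionary above.
FIELD_NAMES = (
    'version', 'account_id', 'interface_id', 'srcaddr', 'dstaddr',
    'srcport', 'dstport', 'protocol', 'packets', 'bytes',
    'start', 'end', 'action', 'log_status',
)
INT_FIELDS = {'version', 'srcport', 'dstport', 'protocol', 'packets', 'bytes'}
TIME_FIELDS = {'start', 'end'}


def parse_message(message):
    """Split one space-separated flow log line into a dict of typed fields."""
    values = message.split()
    if len(values) != len(FIELD_NAMES):
        raise ValueError(
            'expected {} fields, got {}'.format(len(FIELD_NAMES), len(values))
        )
    record = {}
    for name, value in zip(FIELD_NAMES, values):
        if name in INT_FIELDS:
            record[name] = int(value)
        elif name in TIME_FIELDS:
            # Epoch seconds converted to a naive UTC datetime.
            aware = datetime.fromtimestamp(int(value), tz=timezone.utc)
            record[name] = aware.replace(tzinfo=None)
        else:
            record[name] = value
    return record


# Hypothetical sample line; the two addresses are placeholders.
SAMPLE = (
    '2 123456789010 eni-102010ab 198.51.100.1 192.0.2.1 '
    '443 49152 6 10 840 1439387263 1439387264 ACCEPT OK'
)
record = parse_message(SAMPLE)
print(record['srcaddr'], record['srcport'], record['start'])
```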
FlowLogsReader takes the name of a log group and can then yield all the Flow Log records from that group.
>>> from flowlogs_reader import FlowLogsReader
>>> flow_log_reader = FlowLogsReader('flowlog_group')
>>> records = list(flow_log_reader)
>>> print(len(records))
By default it will retrieve records from log streams that were ingested in the last hour, and yield records from those log streams in that same time window.
You can control what's retrieved with these parameters:
- start_time and end_time are Python datetime.datetime objects.
- filter_pattern is a string like REJECT used to filter the logs. See the examples below.
- region_name is a string like 'us-east-1'. This will be used to create a boto3 Session object.
- profile_name is a string like 'my-profile'.
- boto_client_kwargs is a dictionary of parameters to pass when creating the boto3 client.
- boto_client is a boto3 client object. This overrides region_name, profile_name, and boto_client_kwargs.
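When a window other than the default last hour is needed, the two datetime values can be computed with the standard library. The helper below is a hypothetical sketch (trailing_window is not part of flowlogs_reader), and it assumes naive datetimes are interpreted as UTC, which is worth verifying against the library.

```python
from datetime import datetime, timedelta, timezone


def trailing_window(hours, now=None):
    """Return (start, end) datetimes covering the last `hours` hours.

    `now` defaults to the current time as a naive UTC datetime (assumption:
    the reader treats naive datetimes as UTC).
    """
    if now is None:
        now = datetime.now(timezone.utc).replace(tzinfo=None)
    return now - timedelta(hours=hours), now


# A fixed `now` keeps the example reproducible.
start_time, end_time = trailing_window(6, now=datetime(2015, 8, 14, 2, 0, 0))
print(start_time, end_time)
```

The resulting pair could then be passed to FlowLogsReader as its start and end time parameters.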
Start by importing FlowLogsReader:
from flowlogs_reader import FlowLogsReader
Find all of the IP addresses communicating inside the VPC:
ip_set = set()
for record in FlowLogsReader('flowlog_group'):
    ip_set.add(record.srcaddr)
    ip_set.add(record.dstaddr)
See all of the traffic for one IP address:
target_ip = ''
records = []
for record in FlowLogsReader('flowlog_group'):
    if (record.srcaddr == target_ip) or (record.dstaddr == target_ip):
        records.append(record)
Loop through a few preconfigured profiles and collect all of the IP addresses:
ip_set = set()
profile_names = ['profile1', 'profile2']
for profile_name in profile_names:
    for record in FlowLogsReader('flowlog_group', profile_name=profile_name):
        ip_set.add(record.srcaddr)
        ip_set.add(record.dstaddr)
Apply a filter for UDP traffic that was logged normally:
FILTER_PATTERN = (
    '[version="2", account_id, interface_id, srcaddr, dstaddr, '
    'srcport, dstport, protocol="17", packets, bytes, '
    'start, end, action, log_status="OK"]'
)
flow_log_reader = FlowLogsReader('flowlog_group', filter_pattern=FILTER_PATTERN)
records = list(flow_log_reader)
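Writing the bracketed pattern by hand is error-prone because every field must appear in order. One possible approach is a small helper that fills in constraints for selected fields; build_filter_pattern and FIELD_ORDER below are hypothetical names for illustration and are not provided by the library.

```python
# Version 2 field order, as used in the filter pattern above.
FIELD_ORDER = (
    'version', 'account_id', 'interface_id', 'srcaddr', 'dstaddr',
    'srcport', 'dstport', 'protocol', 'packets', 'bytes',
    'start', 'end', 'action', 'log_status',
)


def build_filter_pattern(**constraints):
    """Build a space-delimited filter pattern with equality constraints.

    Each keyword names a field and gives the string value it must equal;
    unconstrained fields appear by name only.
    """
    unknown = set(constraints) - set(FIELD_ORDER)
    if unknown:
        raise ValueError('unknown fields: {}'.format(sorted(unknown)))
    parts = []
    for name in FIELD_ORDER:
        if name in constraints:
            parts.append('{}="{}"'.format(name, constraints[name]))
        else:
            parts.append(name)
    return '[' + ', '.join(parts) + ']'


# Reproduces the UDP pattern shown above.
pattern = build_filter_pattern(version='2', protocol='17', log_status='OK')
print(pattern)
```

The sketch only covers equality tests on quoted string values; other comparison operators supported by the filter syntax would need their own handling.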
You may aggregate records with the aggregated_records function. Pass in a FlowLogsReader object and optionally a key_fields tuple. Python dict objects will be yielded representing the aggregated flow records. By default the typical ('srcaddr', 'dstaddr', 'srcport', 'dstport', 'protocol') will be used. The start, end, packets, and bytes items will be aggregated.
flow_log_reader = FlowLogsReader('flowlog_group')
key_fields = ('srcaddr', 'dstaddr')
records = list(aggregated_records(flow_log_reader, key_fields=key_fields))
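To illustrate what this kind of aggregation involves, here is a standalone sketch that groups plain dictionaries by key fields. It is not the library's code: aggregate_sketch is a hypothetical name, the sample data uses placeholder addresses, and the combining rules (earliest start, latest end, summed packets and bytes) are an assumption about how the four items are merged.

```python
from datetime import datetime

DEFAULT_KEY_FIELDS = ('srcaddr', 'dstaddr', 'srcport', 'dstport', 'protocol')


def aggregate_sketch(records, key_fields=DEFAULT_KEY_FIELDS):
    """Group dict records by key_fields and merge their counters."""
    groups = {}
    for record in records:
        key = tuple(record[field] for field in key_fields)
        group = groups.get(key)
        if group is None:
            # First record for this key: copy the key fields and counters.
            group = {field: record[field] for field in key_fields}
            for item in ('start', 'end', 'packets', 'bytes'):
                group[item] = record[item]
            groups[key] = group
        else:
            group['start'] = min(group['start'], record['start'])
            group['end'] = max(group['end'], record['end'])
            group['packets'] += record['packets']
            group['bytes'] += record['bytes']
    return list(groups.values())


# Hypothetical sample records with documentation-range addresses.
SAMPLE_RECORDS = [
    {'srcaddr': '198.51.100.1', 'dstaddr': '192.0.2.1', 'srcport': 443,
     'dstport': 49152, 'protocol': 6, 'packets': 10, 'bytes': 840,
     'start': datetime(2015, 8, 12, 13, 47, 43),
     'end': datetime(2015, 8, 12, 13, 47, 44)},
    {'srcaddr': '198.51.100.1', 'dstaddr': '192.0.2.1', 'srcport': 443,
     'dstport': 49153, 'protocol': 6, 'packets': 5, 'bytes': 420,
     'start': datetime(2015, 8, 12, 13, 48, 43),
     'end': datetime(2015, 8, 12, 13, 48, 44)},
    {'srcaddr': '192.0.2.1', 'dstaddr': '198.51.100.1', 'srcport': 49152,
     'dstport': 443, 'protocol': 6, 'packets': 3, 'bytes': 120,
     'start': datetime(2015, 8, 12, 13, 47, 50),
     'end': datetime(2015, 8, 12, 13, 47, 51)},
]

by_pair = aggregate_sketch(SAMPLE_RECORDS, key_fields=('srcaddr', 'dstaddr'))
for row in by_pair:
    print(row)
```

With the address pair as the key, the first two sample records collapse into one row, while the default 5-tuple key keeps them separate because their destination ports differ.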