PCAP data is low-level data about network activity. PCAP data can be created from Wireshark and its command line counterpart tshark.
Using PCAP data, you can identify and monitor the behavior of applications and users on devices. This recipe is an example of how to filter PCAP data by MAC address. The MAC address on which to filter is derived from tables of employees and their computers. Employees with a "watch" flag against them will have their associated computer’s MAC address used in the filter of the PCAP data.
The resulting filtered stream of network activity can be used for monitoring and aggregated reports.
-
Docker
-
If running on Mac/Windows, at least 4 GB allocated to Docker:
docker system info | grep Memory
It should return a value greater than 8 GB—if not, the Kafka stack will probably not work.
Minimum version is Confluent Platform 5.0.1
-
Clone this repository:
git clone https://github.com/confluentinc/ksql-recipes-try-it-at-home.git
-
Launch:
cd ksql-recipes-try-it-at-home/monitoring-network-activity docker-compose up -d
-
Run KSQL CLI:
docker-compose exec ksql-cli ksql http://ksql-server:8088
-
Create a table on the
employees
Kafka topic:CREATE TABLE employee_t \ (key VARCHAR, \ name VARCHAR, \ watch_flag VARCHAR) \ WITH (KAFKA_TOPIC ='employees', \ VALUE_FORMAT ='JSON', \ KEY='key');
-
Create a table on the
computers
Kafka topic:CREATE TABLE computer_t \ (comp_id INT, \ empkey VARCHAR, \ macaddr VARCHAR) \ WITH (KAFKA_TOPIC='computers', \ VALUE_FORMAT='JSON', \ KEY='empkey');
-
Change a couple of system settings:
-
Any processing is to be done on the existing contents of a Kafka topic, not just new records
-
When creating target topic, use one partition instead of the default four
SET 'auto.offset.reset' = 'earliest'; SET 'ksql.sink.partitions'='1';
-
-
Create a new table showing the computer details for each employee who has been put on a "watch list."
Note that table-table joins are currently 1:1, not 1:N.
CREATE TABLE COMP_WATCH_BY_EMP_ID_T \ WITH (VALUE_FORMAT='AVRO') AS \ SELECT c.empkey AS EMP_ID, \ e.NAME AS EMP_NAME, \ e.key AS EMP_KEY, \ c.macaddr AS MACADDR \ FROM computer_t c \ INNER JOIN employee_t e \ ON c.empkey = e.key \ WHERE e.watch_flag='YES';
Verify that data is flowing to the new
COMP_WATCH_BY_EMP_ID_T
table:SELECT * FROM COMP_WATCH_BY_EMP_ID_T LIMIT 1;
-
We will be joining the
COMP_WATCH_T
table to a stream of events using the common key ofMACADDR
. The table currently is keyed on the key that was being used in the join—employee ID. We need to re-key the data onMACADDR
instead.CREATE STREAM COMP_WATCH_BY_EMP_ID_S WITH (VALUE_FORMAT='AVRO',KAFKA_TOPIC='COMP_WATCH_BY_EMP_ID_T'); CREATE STREAM COMP_WATCH_BY_MACADDR_S AS \ SELECT * FROM COMP_WATCH_BY_EMP_ID_S \ PARTITION BY MACADDR;
Verify that data is flowing to the new
COMP_WATCH_BY_MACADDR_S
stream:SELECT * FROM COMP_WATCH_BY_MACADDR_S LIMIT 1;
CREATE TABLE COMP_WATCH_BY_MACADDR_T WITH (VALUE_FORMAT='AVRO', \ KEY='MACADDR',\ KAFKA_TOPIC='COMP_WATCH_BY_MACADDR_S');
-
Register a KSQL stream on the
pcap
Kafka topic. The schema has many columns; only a subset are declared here. This is a valid approach to take with KSQL, and it will just ignore columns that are not declared.CREATE STREAM PCAP (_index VARCHAR, \ _type VARCHAR, \ _score VARCHAR, \ _source STRUCT< \ layers STRUCT< \ frame STRUCT< \ encap_type VARCHAR, \ time VARCHAR, \ offset_shift VARCHAR, \ time_epoch VARCHAR, \ time_delta VARCHAR, \ time_delta_displayed VARCHAR, \ time_relative VARCHAR, \ number VARCHAR, \ len VARCHAR, \ cap_len VARCHAR, \ marked VARCHAR, \ ignored VARCHAR, \ protocols VARCHAR>, \ eth STRUCT< \ dst VARCHAR, \ dst_tree STRUCT< \ dst_resolved VARCHAR, \ addr VARCHAR, \ addr_resolved VARCHAR, \ lg VARCHAR, \ ig VARCHAR>, \ src VARCHAR, \ src_tree STRUCT< \ src_resolved VARCHAR, \ addr VARCHAR, \ addr_resolved VARCHAR, \ lg VARCHAR, \ ig VARCHAR>, \ type VARCHAR>, \ ip STRUCT< \ version VARCHAR, \ hdr_len VARCHAR, \ dsfield VARCHAR, \ dsfield_tree STRUCT< \ dscp VARCHAR, \ ecn VARCHAR>, \ len VARCHAR, \ id VARCHAR, \ flags VARCHAR, \ flags_tree STRUCT< \ rb VARCHAR, \ df VARCHAR, \ mf VARCHAR, \ frag_offset VARCHAR>, \ ttl VARCHAR, \ proto VARCHAR, \ checksum VARCHAR, \ status VARCHAR, \ src VARCHAR, \ addr VARCHAR, \ src_host VARCHAR, \ host VARCHAR, \ dst VARCHAR, \ dst_host VARCHAR>, \ udp STRUCT< \ srcport VARCHAR, \ dstport VARCHAR, \ port VARCHAR, \ length VARCHAR, \ checksum VARCHAR, \ status VARCHAR, \ stream_ VARCHAR>, \ bootp STRUCT< \ bootp_type VARCHAR, \ bootp_len VARCHAR, \ bootp_hops VARCHAR, \ bootp_id VARCHAR, \ bootp_secs VARCHAR, \ bootp_flags VARCHAR, \ bootp_flags_tree STRUCT< \ bootp_bc VARCHAR, \ bootp_reserved VARCHAR>, \ bootp_client VARCHAR, \ bootp_your VARCHAR, \ bootp_server VARCHAR, \ bootp_relay VARCHAR, \ bootp_mac_addr VARCHAR, \ bootp_addr_padding VARCHAR, \ bootp_file VARCHAR, \ bootp_dhcp VARCHAR, \ bootp_cookie VARCHAR, \ bootp_option_type VARCHAR, \ bootp_option_type_tree STRUCT< \ bootp_option_end VARCHAR>, \ bootp_option_padding VARCHAR> \ > \ > ) \ WITH (KAFKA_TOPIC='pcap', \ VALUE_FORMAT='JSON');
-
Flatten the PCAP data, using just the particular fields of interest:
CREATE STREAM PCAP_FLAT WITH (VALUE_FORMAT='AVRO') AS \ SELECT _SOURCE -> LAYERS -> FRAME -> TIME AS FRAME_TIME, \ _SOURCE -> LAYERS -> FRAME -> LEN AS FRAME_LEN, \ _SOURCE -> LAYERS -> FRAME -> PROTOCOLS AS FRAME_PROTOCOLS, \ _SOURCE -> LAYERS -> ETH -> SRC AS ETH_SRC, \ _SOURCE -> LAYERS -> ETH -> DST AS ETH_DST, \ _SOURCE -> LAYERS -> IP -> SRC AS IP_SRC, \ _SOURCE -> LAYERS -> IP -> DST AS IP_DST, \ _SOURCE -> LAYERS -> IP -> SRC_HOST AS IP_SRC_HOST,\ _SOURCE -> LAYERS -> IP -> DST_HOST AS IP_DST_HOST,\ _SOURCE -> LAYERS -> IP -> PROTO AS IP_PROTO \ FROM PCAP;
-
Join the stream of PCAP data to the table of employees on the
watch list
:CREATE STREAM WATCHED_EMP_NETWORK_TRAFFIC AS \ SELECT * \ FROM PCAP_FLAT P \ INNER JOIN COMP_WATCH_BY_MACADDR_T C \ ON P.ETH_SRC = C.MACADDR;
-
View the joined stream of data:
ksql> SELECT C_EMP_NAME, P_FRAME_PROTOCOLS, P_IP_DST FROM WATCHED_EMP_NETWORK_TRAFFIC; Tom Jones | eth:ethertype:ip:udp:bootp | 255.255.255.255 Tom Jones | eth:ethertype:ip:udp:bootp | 255.255.255.255 Tom Jones | eth:ethertype:arp | null Tom Jones | eth:ethertype:ip:tcp | 86.66.0.227 Tom Jones | eth:ethertype:ip:icmp:data | 86.64.145.29 Tom Jones | eth:ethertype:ip:tcp | 86.66.0.227 Tom Jones | eth:ethertype:ip:tcp:http | 86.66.0.227 Tom Jones | eth:ethertype:ip:tcp | 86.66.0.227
-
Optionally, filter the data further:
ksql> SELECT C_EMP_NAME, P_FRAME_PROTOCOLS, P_IP_DST FROM WATCHED_EMP_NETWORK_TRAFFIC WHERE P_FRAME_PROTOCOLS LIKE '%http'; Tom Jones | eth:ethertype:ip:tcp:http | 86.66.0.227 Tom Jones | eth:ethertype:ip:tcp:http | 17.252.60.23 Tom Jones | eth:ethertype:ip:tcp:http | 10.5.60.53
Press Ctrl-C to cancel the
SELECT
statement.