KSQL can be used to process and enrich security events in real time, providing extra context for security analysts during investigations. Sysmon is a free tool for Windows endpoints that allows the collection of valuable security events that have a direct relationship with each other via a property called the ProcessGUID
. Join statements can be performed via KSQL queries in real time.
In this recipe, we will show you how to join two specific Sysmon events in order to expedite the analysis of data and provide more context to the data collected while hunting for specific adversarial techniques.
-
Docker
-
If running on Mac/Windows, allocate at least 4 GB to Docker:
docker system info | grep Memory
It should return a value greater than 8 GB - if not, the Apache Kafka<sup>®</sup> stack will probably not work.
Minimum version: Confluent Platform 5.0
-
Clone this repository:
git clone https://github.com/confluentinc/ksql-recipes-try-it-at-home.git
-
Launch:
cd ksql-recipes-try-it-at-home/sysmon-event-processing docker-compose up -d
-
Run KSQL CLI:
docker-compose exec ksql-cli ksql http://ksql-server:8088
-
Configure KSQL to process all data that exists in a topic (and not just from the current point onwards, which is the default):
ksql> SET 'auto.offset.reset' = 'earliest';
-
Register the Kafka topic that contains Sysmon event logs as a stream and name it
WINLOGBEAT_STREAM
. In this case, the available Kafka topic was namedwinlogbeat
. Remember that we are performing a join between Sysmon events 1 and 3, and the data is in JSON format. Therefore, we only need to specify the column names of the two Sysmon events (1 and 3) and define nested fields via the STRUCT data type.ksql> CREATE STREAM WINLOGBEAT_STREAM \ (source_name VARCHAR, \ type VARCHAR, \ task VARCHAR, \ log_name VARCHAR, \ computer_name VARCHAR, \ event_data STRUCT< \ UtcTime VARCHAR, \ ProcessGuid VARCHAR, \ ProcessId INTEGER, \ Image VARCHAR, \ FileVersion VARCHAR, \ Description VARCHAR, \ Product VARCHAR, \ Company VARCHAR, \ CommandLine VARCHAR, \ CurrentDirectory VARCHAR, \ User VARCHAR, \ LogonGuid VARCHAR, \ LogonId VARCHAR, \ TerminalSessionId INTEGER, \ IntegrityLevel VARCHAR, \ Hashes VARCHAR, \ ParentProcessGuid VARCHAR, \ ParentProcessId INTEGER, \ ParentImage VARCHAR, \ ParentCommandLine VARCHAR, \ Protocol VARCHAR, \ Initiated VARCHAR, \ SourceIsIpv6 VARCHAR, \ SourceIp VARCHAR, \ SourceHostname VARCHAR, \ SourcePort INTEGER, \ SourcePortName VARCHAR, \ DestinationIsIpv6 VARCHAR, \ DestinationIp VARCHAR, \ DestinationHostname VARCHAR, \ DestinationPort INTEGER, \ DestinationPortName VARCHAR>, \ event_id INTEGER) \ WITH (KAFKA_TOPIC='winlogbeat', VALUE_FORMAT='JSON');
Optionally, query a sample of the data:
ksql> SELECT SOURCE_NAME , COMPUTER_NAME , EVENT_DATA->PARENTIMAGE, EVENT_DATA->PRODUCT \ FROM WINLOGBEAT_STREAM \ WHERE EVENT_ID =1 OR EVENT_ID = 3 \ LIMIT 2; Microsoft-Windows-Sysmon | MSEDGEWIN10 | C:\Windows\System32\svchost.exe | Microsoft® Windows® Operating System Microsoft-Windows-Sysmon | MSEDGEWIN10 | C:\Windows\System32\wuauclt.exe | Microsoft Malware Protection
-
Rekey the
WINLOGBEAT_STREAM
stream so that the key is correct for subsequent join operations. The rekey is on theprocess_guid
field since that’s the common property with a unique value between Sysmon events 1 and 3. It’s done using thePARTITION BY
command.Note that this operation also flattens the nested
event_data
field.ksql> CREATE STREAM WINLOGBEAT_STREAM_REKEY \ WITH (VALUE_FORMAT='JSON', \ PARTITIONS=1, \ TIMESTAMP='event_date_creation') \ AS SELECT \ STRINGTOTIMESTAMP(event_data->UtcTime, 'yyyy-MM-dd HH:mm:ss.SSS') AS event_date_creation, \ event_data->ProcessGuid AS process_guid, \ event_data->ProcessId AS process_id, \ event_data->Image AS process_path, \ event_data->FileVersion AS file_version, \ event_data->Description AS file_description, \ event_data->Company AS file_company, \ event_data->CommandLine AS process_command_line, \ event_data->CurrentDirectory AS process_current_directory, \ event_data->User AS user_account, \ event_data->LogonGuid AS user_logon_guid, \ event_data->LogonId AS user_logon_id, \ event_data->TerminalSessionId AS user_session_id, \ event_data->IntegrityLevel AS process_integrity_level, \ event_data->Hashes AS hashes, \ event_data->ParentProcessGuid AS parent_process_guid, \ event_data->ParentProcessId AS parent_process_id, \ event_data->ParentImage AS parent_process_path, \ event_data->ParentCommandLine AS parent_process_command_line, \ event_data->Protocol AS network_protocol, \ event_data->Initiated AS network_connection_initiated, \ event_data->SourceIsIpv6 AS src_is_ipv6, \ event_data->SourceIp AS src_ip_addr, \ event_data->SourceHostname AS src_host_name, \ event_data->SourcePort AS src_port, \ event_data->SourcePortName AS src_port_name, \ event_data->DestinationIsIpv6 AS dst_is_ipv6, \ event_data->DestinationIp AS dst_ip_addr, \ event_data->DestinationHostname AS dst_host_name, \ event_data->DestinationPort AS dst_port, \ event_data->DestinationPortName AS dst_port_name, \ event_id, \ source_name, \ log_name \ FROM WINLOGBEAT_STREAM \ WHERE source_name='Microsoft-Windows-Sysmon' \ PARTITION BY process_guid;
-
Using the rekeyed stream, create a stream with just the Sysmon
ProcessCreate
events (event_id=1
):ksql> CREATE STREAM SYSMON_PROCESS_CREATE \ WITH (VALUE_FORMAT='JSON', \ PARTITIONS=1, \ TIMESTAMP='event_date_creation') \ AS SELECT event_date_creation, \ process_guid, \ process_id, \ process_path, \ file_version, \ file_description, \ file_company, \ process_command_line, \ process_current_directory, \ user_account, \ user_logon_guid, \ user_logon_id, \ user_session_id, \ process_integrity_level, \ hashes, \ parent_process_guid, \ parent_process_id, \ parent_process_path, \ parent_process_command_line, \ event_id, \ source_name, \ log_name \ FROM WINLOGBEAT_STREAM_REKEY \ WHERE event_id=1;
Optionally, query a sample of this data. Note that it is just
ProcessCreate
events.ksql> SELECT PROCESS_PATH, FILE_DESCRIPTION, USER_ACCOUNT \ FROM SYSMON_PROCESS_CREATE \ LIMIT 2; C:\Windows\System32\wuauclt.exe | Windows Update | NT AUTHORITY\SYSTEM C:\Windows\SoftwareDistribution\Download\Install\AM_Engine_Patch_1.1.15500.2.exe | AntiMalware Definition Update | NT AUTHORITY\SYSTEM
-
From this derived Sysmon
ProcessCreate
stream (and underlying topic), now declare a KSQL table. We define the SysmonProcessCreate
events as a table because for each key (process_guid
), we want to know its current values (process_name, process_command_line, hashes, etc.) for when we subsequently join them withNetworkCreate
events that have the sameprocess_guid
value.ksql> CREATE TABLE SYSMON_PROCESS_CREATE_TABLE \ (event_date_creation VARCHAR, \ process_guid VARCHAR, \ process_id INTEGER, \ process_path VARCHAR, \ file_version VARCHAR, \ file_description VARCHAR, \ file_company VARCHAR, \ process_command_line VARCHAR, \ process_current_directory VARCHAR, \ user_account VARCHAR, \ user_logon_guid VARCHAR, \ user_logon_id VARCHAR, \ user_session_id INTEGER, \ process_integrity_level VARCHAR, \ hashes VARCHAR, \ parent_process_guid VARCHAR, \ parent_process_id INTEGER, \ parent_process_path VARCHAR, \ parent_process_command_line VARCHAR, \ event_id INTEGER, \ source_name VARCHAR, \ log_name VARCHAR) \ WITH (KAFKA_TOPIC='SYSMON_PROCESS_CREATE', \ VALUE_FORMAT='JSON', \ KEY='process_guid');
-
Using the previously rekeyed stream, create a stream with just the Sysmon
NetworkConnect
events (event_id=3
):ksql> CREATE STREAM SYSMON_NETWORK_CONNECT \ WITH (VALUE_FORMAT='JSON', \ PARTITIONS=1, \ TIMESTAMP='event_date_creation') \ AS SELECT event_date_creation, \ process_guid, \ process_id, \ process_path, \ user_account, \ network_protocol, \ network_connection_initiated, \ src_is_ipv6, \ src_ip_addr, \ src_host_name, \ src_port, \ src_port_name, \ dst_is_ipv6, \ dst_ip_addr, \ dst_host_name, \ dst_port, \ dst_port_name, \ event_id, \ source_name, \ log_name \ FROM WINLOGBEAT_STREAM_REKEY \ WHERE event_id=3;
Optionally, query a sample of this data. Note that it is just
NetworkConnect
events.ksql> SELECT PROCESS_PATH , SRC_HOST_NAME , DST_IP_ADDR \ FROM SYSMON_NETWORK_CONNECT \ LIMIT 2; C:\Windows\System32\svchost.exe | MSEDGEWIN10.moffatt.me | 40.77.229.141 C:\Windows\System32\svchost.exe | MSEDGEWIN10.moffatt.me | 104.103.114.93
-
Join the
NetworkConnect
event stream with the lookups against theProcessCreate
. This enriched stream is persisted to a new Kafka topic calledSYSMON_JOIN
.ksql> CREATE STREAM SYSMON_JOIN WITH (PARTITIONS=1) AS \ SELECT N.EVENT_DATE_CREATION, \ N.PROCESS_GUID, \ N.PROCESS_ID, \ N.PROCESS_PATH, \ N.USER_ACCOUNT, \ N.NETWORK_PROTOCOL, \ N.NETWORK_CONNECTION_INITIATED, \ N.SRC_IS_IPV6, \ N.SRC_IP_ADDR,\ N.SRC_HOST_NAME, \ N.SRC_PORT, \ N.SRC_PORT_NAME, \ N.DST_IS_IPV6, \ N.DST_IP_ADDR, \ N.DST_HOST_NAME, \ N.DST_PORT, \ N.DST_PORT_NAME, \ N.SOURCE_NAME, \ N.LOG_NAME,\ P.PROCESS_COMMAND_LINE, \ P.HASHES, \ P.PARENT_PROCESS_PATH, \ P.PARENT_PROCESS_COMMAND_LINE, \ P.USER_LOGON_GUID, \ P.USER_LOGON_ID, \ P.USER_SESSION_ID, \ P.PROCESS_CURRENT_DIRECTORY, \ P.PROCESS_INTEGRITY_LEVEL, \ P.PARENT_PROCESS_GUID, \ P.PARENT_PROCESS_ID \ FROM SYSMON_NETWORK_CONNECT N \ INNER JOIN SYSMON_PROCESS_CREATE_TABLE P \ ON N.PROCESS_GUID = P.PROCESS_GUID;
Optionally, query a sample of the joined data. Note that for each network event you have information about the process responsible:
ksql> SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss'), SRC_HOST_NAME , DST_IP_ADDR , \ PROCESS_COMMAND_LINE , PARENT_PROCESS_COMMAND_LINE , N_USER_ACCOUNT \ FROM SYSMON_JOIN; 2019-02-13 10:58:30 | MSEDGEWIN10.moffatt.me | 40.83.74.46 | c:\windows\system32\svchost.exe -k netsvcs -p -s PushToInstall | C:\Windows\system32\services.exe | NT AUTHORITY\SYSTEM 2019-02-13 10:59:54 | MSEDGEWIN10.moffatt.me | 2.21.186.132 | c:\windows\system32\svchost.exe -k unistacksvcgroup -s WpnUserService | C:\Windows\system32\services.exe | MSEDGEWIN10\IEUser 2019-02-13 11:00:10 | MSEDGEWIN10.moffatt.me | 204.79.197.200 | "C:\Windows\system32\backgroundTaskHost.exe" -ServerName:CortanaUI.AppXy7vb4pc2dr3kc93kfc509b1d0arkfb2x.mca | C:\Windows\system32\svchost.exe 2019-02-13 11:00:10 | MSEDGEWIN10.moffatt.me | 204.79.197.200 | "C:\Windows\SystemApps\Microsoft.Windows.Cortana_cw5n1h2txyewy\SearchUI.exe" -ServerName:CortanaUI.AppXa50dqqa5gqv4a428c9y1jjw7m3btvepj.mca | 2019-02-13 11:00:27 | MSEDGEWIN10.moffatt.me | 13.107.3.128 | "C:\Program Files\WindowsApps\Microsoft.MicrosoftOfficeHub_17.10314.31700.1000_x64__8wekyb3d8bbwe\Office16\OfficeHubTaskHost.exe" -ServerName:M 2019-02-13 11:00:42 | MSEDGEWIN10.moffatt.me | 2.21.186.81 | "C:\Users\IEUser\AppData\Local\Microsoft\OneDrive\OneDrive.exe" /background | C:\Windows\Explorer.EXE | MSEDGEWIN10\IEUser