// errNotConvertibleToInt is returned by anyToInt for values that cannot be
// represented as an int without loss.
var errNotConvertibleToInt = errors.New("value is not convertable to int")

// anyToInt converts a dynamically typed value to int.
//
// Accepted dynamic types are int, float64 and json.Number. A float64 is
// accepted only when it holds an integral value that fits into int, which
// matches the json.Number path where non-integers are rejected by Int64.
// Any other type, NaN, infinities, fractional or out-of-range numbers yield
// an error.
func anyToInt(v any) (int, error) {
	const (
		maxInt = int(^uint(0) >> 1)
		minInt = -maxInt - 1
	)

	switch vNum := v.(type) {
	case int:
		return vNum, nil
	case float64:
		// The range is checked first so that int(vNum) is only evaluated for
		// values whose conversion is well defined; NaN fails both comparisons.
		inRange := vNum >= float64(minInt) && vNum < -float64(minInt)
		if !inRange || vNum != float64(int(vNum)) {
			return 0, errNotConvertibleToInt
		}
		return int(vNum), nil
	case json.Number:
		vInt64, err := vNum.Int64()
		if err != nil {
			return 0, err
		}
		// Guard the int64 -> int narrowing on platforms where int is 32 bits.
		if int64(int(vInt64)) != vInt64 {
			return 0, errNotConvertibleToInt
		}
		return int(vInt64), nil
	default:
		return 0, errNotConvertibleToInt
	}
}

// atoi is allocation free ASCII number to integer conversion.
//
// It accepts only a non-empty run of ASCII digits (no sign, no spaces) and
// reports false for anything else, including values that do not fit into int.
func atoi(b []byte) (int, bool) {
	const maxInt = int(^uint(0) >> 1)

	if len(b) == 0 {
		return 0, false
	}
	x := 0
	for _, c := range b {
		if !isDigit(c) {
			return 0, false
		}
		digit := int(c - '0')
		// x*10 + digit would exceed maxInt: reject instead of wrapping around.
		if x > (maxInt-digit)/10 {
			return 0, false
		}
		x = x*10 + digit
	}
	return x, true
}

// isDigit reports whether c is an ASCII decimal digit.
func isDigit(c byte) bool {
	return c >= '0' && c <= '9'
}

// checkNumber reports whether num is a valid ASCII number (see atoi) whose
// value lies within [minimum, maximum].
func checkNumber(num []byte, minimum, maximum int) bool {
	x, ok := atoi(num)
	return ok && x >= minimum && x <= maximum
}
// // Args: // - root [*insaneJSON.Root] - required @@ -141,23 +140,10 @@ func extractJsonParams(params map[string]any) (jsonParams, error) { return jsonParams{}, fmt.Errorf("%q must be map", jsonMaxFieldsSizeParam) } for k, v := range maxFieldsSizeMap { - var vInt int - - switch vNum := v.(type) { - case int: - vInt = vNum - case float64: - vInt = int(vNum) - case json.Number: - vInt64, err := vNum.Int64() - if err != nil { - return jsonParams{}, fmt.Errorf("each value in %q must be int", jsonMaxFieldsSizeParam) - } - vInt = int(vInt64) - default: + vInt, err := anyToInt(v) + if err != nil { return jsonParams{}, fmt.Errorf("each value in %q must be int", jsonMaxFieldsSizeParam) } - maxFieldsSize[k] = vInt } } diff --git a/decoder/nginx.go b/decoder/nginx.go index 78b89c298..24d8e415b 100644 --- a/decoder/nginx.go +++ b/decoder/nginx.go @@ -46,7 +46,7 @@ func (d *nginxErrorDecoder) Type() Type { return NGINX_ERROR } -// DecodeToJson decodes nginx error formatted log and merges result with root. +// DecodeToJson decodes nginx-error formatted log and merges result with root. // // From: // @@ -86,7 +86,7 @@ func (d *nginxErrorDecoder) DecodeToJson(root *insaneJSON.Root, data []byte) err return nil } -// Decode decodes nginx error formated log to [NginxErrorRow]. +// Decode decodes nginx-error formatted log to [NginxErrorRow]. // // Example of format: // diff --git a/decoder/postgres.go b/decoder/postgres.go index ae25f8823..85179c125 100644 --- a/decoder/postgres.go +++ b/decoder/postgres.go @@ -25,7 +25,7 @@ type PostgresRow struct { Log []byte } -// DecodePostgresToJson decodes postgres formatted log and merges result with event. +// DecodePostgresToJson decodes postgres formatted log and merges result with root. 
// // From: // @@ -42,24 +42,24 @@ type PostgresRow struct { // "user": "test_user", // "log": "listening on Unix socket \"/var/run/postgresql/.s.PGSQL.5432\"" // } -func DecodePostgresToJson(event *insaneJSON.Root, data []byte) error { +func DecodePostgresToJson(root *insaneJSON.Root, data []byte) error { row, err := DecodePostgres(data) if err != nil { return err } - event.AddFieldNoAlloc(event, "time").MutateToBytesCopy(event, row.Time) - event.AddFieldNoAlloc(event, "pid").MutateToBytesCopy(event, row.PID) - event.AddFieldNoAlloc(event, "pid_message_number").MutateToBytesCopy(event, row.PIDMessageNumber) - event.AddFieldNoAlloc(event, "client").MutateToBytesCopy(event, row.Client) - event.AddFieldNoAlloc(event, "db").MutateToBytesCopy(event, row.DB) - event.AddFieldNoAlloc(event, "user").MutateToBytesCopy(event, row.User) - event.AddFieldNoAlloc(event, "log").MutateToBytesCopy(event, row.Log) + root.AddFieldNoAlloc(root, "time").MutateToBytesCopy(root, row.Time) + root.AddFieldNoAlloc(root, "pid").MutateToBytesCopy(root, row.PID) + root.AddFieldNoAlloc(root, "pid_message_number").MutateToBytesCopy(root, row.PIDMessageNumber) + root.AddFieldNoAlloc(root, "client").MutateToBytesCopy(root, row.Client) + root.AddFieldNoAlloc(root, "db").MutateToBytesCopy(root, row.DB) + root.AddFieldNoAlloc(root, "user").MutateToBytesCopy(root, row.User) + root.AddFieldNoAlloc(root, "log").MutateToBytesCopy(root, row.Log) return nil } -// DecodePostgres decodes postgres formated log to [PostgresRow]. +// DecodePostgres decodes postgres formatted log to [PostgresRow]. // // Example of format: // diff --git a/decoder/readme.md b/decoder/readme.md index 7f9b4af2f..e3ed78df9 100644 --- a/decoder/readme.md +++ b/decoder/readme.md @@ -12,7 +12,9 @@ Available values for `decoder` param: + cri -- parses cri format from log into event (e.g. `2016-10-06T00:17:09.669794203Z stderr F log content`) + postgres -- parses postgres format from log into event (e.g. 
`2021-06-22 16:24:27 GMT [7291] => [3-1] client=test_client,db=test_db,user=test_user LOG: listening on Unix socket \"/var/run/postgresql/.s.PGSQL.5432\"\n`) + nginx_error -- parses nginx error log format from log into event (e.g. `2022/08/17 10:49:27 [error] 2725122#2725122: *792412315 lua udp socket read timed out, context: ngx.timer`) -+ protobuf -- parses protobuf message into event ++ protobuf -- parses protobuf message into event ++ syslog_rfc3164 -- parses syslog-RFC3164 format from log into event (see [RFC3164](https://datatracker.ietf.org/doc/html/rfc3164)) ++ syslog_rfc5424 -- parses syslog-RFC5424 format from log into event (see [RFC5424](https://datatracker.ietf.org/doc/html/rfc5424)) > Currently `auto` is available only for usage with k8s input plugin. @@ -71,7 +73,14 @@ To: } ``` -## Nginx decoder +## Nginx-error decoder +The resulting event may contain any of the following fields: +* `time` *string* +* `level` *string* +* `pid` *string* +* `tid` *string* +* `cid` *string* +* `message` *string* ### Params * `nginx_with_custom_fields` - if set, custom fields will be extracted. @@ -86,7 +95,7 @@ pipelines: ``` From: -`"2022/08/17 10:49:27 [error] 2725122#2725122: *792412315 lua udp socket read timed out, context: ngx.timer"` +`2022/08/17 10:49:27 [error] 2725122#2725122: *792412315 lua udp socket read timed out, context: ngx.timer` To: ```json @@ -212,4 +221,158 @@ pipelines: proto_import_paths: - path/to/proto_dir1 - path/to/proto_dir2 +``` + +## Syslog-RFC3164 decoder +The resulting event may contain any of the following fields: +* `priority` *string* +* `facility` *string* +* `severity` *string* +* `timestamp` *string* (`Stamp` format) +* `hostname` *string* +* `app_name` *string* +* `process_id` *string* +* `message` *string* + +### Params +* `syslog_facility_format` - facility format, must be one of `number|string` (`number` by default). +* `syslog_severity_format` - severity format, must be one of `number|string` (`number` by default). 
+ +### Examples +Default decoder: +```yml +pipelines: + example: + settings: + decoder: 'syslog_rfc3164' +``` +From: + +`<34>Oct 5 22:14:15 mymachine.example.com myproc[10]: 'myproc' failed on /dev/pts/8` + +To: +```json +{ + "priority": "34", + "facility": "4", + "severity": "2", + "timestamp": "Oct 5 22:14:15", + "hostname": "mymachine.example.com", + "app_name": "myproc", + "process_id": "10", + "message": "'myproc' failed on /dev/pts/8" +} +``` +--- +Decoder with `syslog_*_format` params: +```yaml +pipelines: + example: + settings: + decoder: 'syslog_rfc3164' + decoder_params: + syslog_facility_format: 'string' + syslog_severity_format: 'string' +``` +From: + +`<34>Oct 11 22:14:15 mymachine.example.com myproc: 'myproc' failed on /dev/pts/8` + +To: +```json +{ + "priority": "34", + "facility": "AUTH", + "severity": "CRIT", + "timestamp": "Oct 11 22:14:15", + "hostname": "mymachine.example.com", + "app_name": "myproc", + "message": "'myproc' failed on /dev/pts/8" +} +``` + +## Syslog-RFC5424 decoder +The resulting event may contain any of the following fields: +* `priority` *string* +* `facility` *string* +* `severity` *string* +* `proto_version` *string* +* `timestamp` *string* (`RFC3339`/`RFC3339Nano` format) +* `hostname` *string* +* `app_name` *string* +* `process_id` *string* +* `message_id` *string* +* `message` *string* +* `SD_1` *object* +* ... +* `SD_N` *object* + +### Params +* `syslog_facility_format` - facility format, must be one of `number|string` (`number` by default). +* `syslog_severity_format` - severity format, must be one of `number|string` (`number` by default). 
+ +### Examples +Default decoder: +```yml +pipelines: + example: + settings: + decoder: 'syslog_rfc5424' +``` +From: + +`<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10 ID47 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"] An application event log` + +To: +```json +{ + "priority": "165", + "facility": "20", + "severity": "5", + "proto_version": "1", + "timestamp": "2003-10-11T22:14:15.003Z", + "hostname": "mymachine.example.com", + "app_name": "myproc", + "process_id": "10", + "message_id": "ID47", + "message": "An application event log", + "exampleSDID@32473": { + "iut": "3", + "eventSource": "Application", + "eventID": "1011" + } +} +``` +--- +Decoder with `syslog_*_format` params: +```yaml +pipelines: + example: + settings: + decoder: 'syslog_rfc5424' + decoder_params: + syslog_facility_format: 'string' + syslog_severity_format: 'string' +``` +From: + +`<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc - ID47 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"]` + +To: +```json +{ + "priority": "165", + "facility": "LOCAL4", + "severity": "NOTICE", + "proto_version": "1", + "timestamp": "2003-10-11T22:14:15.003Z", + "hostname": "mymachine.example.com", + "app_name": "myproc", + "message_id": "ID47", + "exampleSDID@32473": { + "iut": "3", + "eventSource": "Application", + "eventID": "1011" + } +} ``` \ No newline at end of file diff --git a/decoder/syslog.go b/decoder/syslog.go new file mode 100644 index 000000000..e88a5b3d2 --- /dev/null +++ b/decoder/syslog.go @@ -0,0 +1,227 @@ +package decoder + +import ( + "bytes" + "errors" + "fmt" + "strconv" + + insaneJSON "github.com/ozontech/insane-json" +) + +var ( + errSyslogInvalidFormat = errors.New("log doesn't conform the format") + errSyslogInvalidPriority = errors.New("PRI header not a valid priority") + errSyslogInvalidTimestamp = errors.New("timestamp doesn't conform the format") + errSyslogInvalidVersion = errors.New("version doesn't conform the 
format") + errSyslogInvalidSD = errors.New("structured data doesn't conform the format") +) + +const ( + syslogFacilityFormatParam = "syslog_facility_format" + syslogSeverityFormatParam = "syslog_severity_format" + + // priority = facility * 8 + severity. + // max facility = 23, max severity = 7. + // 23 * 8 + 7 = 191. + syslogMaxPriority = 191 +) + +var bom = []byte{0xEF, 0xBB, 0xBF} + +type syslogParams struct { + facilityFormat string // optional + severityFormat string // optional +} + +func extractSyslogParams(params map[string]any) (syslogParams, error) { + facilityFormat := spfNumber + if facilityFormatRaw, ok := params[syslogFacilityFormatParam]; ok { + facilityFormat, ok = facilityFormatRaw.(string) + if !ok { + return syslogParams{}, fmt.Errorf("%q must be string", syslogFacilityFormatParam) + } + if err := syslogPriorityFormatValidate(syslogFacilityFormatParam, facilityFormat); err != nil { + return syslogParams{}, err + } + } + + severityFormat := spfNumber + if severityFormatRaw, ok := params[syslogSeverityFormatParam]; ok { + severityFormat, ok = severityFormatRaw.(string) + if !ok { + return syslogParams{}, fmt.Errorf("%q must be string", syslogSeverityFormatParam) + } + if err := syslogPriorityFormatValidate(syslogSeverityFormatParam, severityFormat); err != nil { + return syslogParams{}, err + } + } + + return syslogParams{ + facilityFormat: facilityFormat, + severityFormat: severityFormat, + }, nil +} + +const ( + spfNumber = "number" + spfString = "string" +) + +func syslogPriorityFormatValidate(param, format string) error { + switch format { + case spfNumber, spfString: + return nil + default: + return fmt.Errorf("invalid %q format, must be one of [number|string]", param) + } +} + +func syslogParsePriority(data []byte) (int, int, error) { + if len(data) < 3 || data[0] != '<' { + return 0, 0, errSyslogInvalidFormat + } + offset := bytes.IndexByte(data, '>') + if offset < 2 || 4 < offset { + return 0, 0, errSyslogInvalidFormat + } + p, ok := 
atoi(data[1:offset]) + if !ok || p > syslogMaxPriority { + return 0, 0, errSyslogInvalidPriority + } + return p, offset, nil +} + +func syslogFacilityFromPriority(p int, format string) string { + f := p / 8 + if format == spfNumber { + return strconv.Itoa(f) + } + return syslogFacilityString(f) +} + +func syslogSeverityFromPriority(p int, format string) string { + s := p % 8 + if format == spfNumber { + return strconv.Itoa(s) + } + return syslogSeverityString(s) +} + +func syslogFacilityString(f int) string { + switch f { + case 0: + return "KERN" + case 1: + return "USER" + case 2: + return "MAIL" + case 3: + return "DAEMON" + case 4: + return "AUTH" + case 5: + return "SYSLOG" + case 6: + return "LPR" + case 7: + return "NEWS" + case 8: + return "UUCP" + case 9: + return "CRON" + case 10: + return "AUTHPRIV" + case 11: + return "FTP" + case 12: + return "NTP" + case 13: + return "SECURITY" + case 14: + return "CONSOLE" + case 15: + return "SOLARISCRON" + case 16: + return "LOCAL0" + case 17: + return "LOCAL1" + case 18: + return "LOCAL2" + case 19: + return "LOCAL3" + case 20: + return "LOCAL4" + case 21: + return "LOCAL5" + case 22: + return "LOCAL6" + case 23: + return "LOCAL7" + default: + return "UNKNOWN" + } +} + +func syslogSeverityString(s int) string { + switch s { + case 0: + return "EMERG" + case 1: + return "ALERT" + case 2: + return "CRIT" + case 3: + return "ERROR" + case 4: + return "WARN" + case 5: + return "NOTICE" + case 6: + return "INFO" + case 7: + return "DEBUG" + default: + return "UNKNOWN" + } +} + +type SyslogSDParams map[string][]byte +type SyslogSD map[string]SyslogSDParams + +func syslogDecodeToJson(root *insaneJSON.Root, row SyslogRFC5424Row) { // nolint: gocritic // hugeParam is ok + root.AddFieldNoAlloc(root, "priority").MutateToBytesCopy(root, row.Priority) + root.AddFieldNoAlloc(root, "facility").MutateToString(row.Facility) + root.AddFieldNoAlloc(root, "severity").MutateToString(row.Severity) + if len(row.ProtoVersion) > 0 { + 
root.AddFieldNoAlloc(root, "proto_version").MutateToBytesCopy(root, row.ProtoVersion) + } + if len(row.Timestamp) > 0 { + root.AddFieldNoAlloc(root, "timestamp").MutateToBytesCopy(root, row.Timestamp) + } + if len(row.Hostname) > 0 { + root.AddFieldNoAlloc(root, "hostname").MutateToBytesCopy(root, row.Hostname) + } + if len(row.AppName) > 0 { + root.AddFieldNoAlloc(root, "app_name").MutateToBytesCopy(root, row.AppName) + } + if len(row.ProcID) > 0 { + root.AddFieldNoAlloc(root, "process_id").MutateToBytesCopy(root, row.ProcID) + } + if len(row.MsgID) > 0 { + root.AddFieldNoAlloc(root, "message_id").MutateToBytesCopy(root, row.MsgID) + } + if len(row.Message) > 0 { + root.AddFieldNoAlloc(root, "message").MutateToBytesCopy(root, row.Message) + } + + for id, params := range row.StructuredData { + if len(params) == 0 { + continue + } + obj := root.AddFieldNoAlloc(root, id).MutateToObject() + for k, v := range params { + obj.AddFieldNoAlloc(root, k).MutateToBytesCopy(root, v) + } + } +} diff --git a/decoder/syslog_rfc3164.go b/decoder/syslog_rfc3164.go new file mode 100644 index 000000000..6389cd566 --- /dev/null +++ b/decoder/syslog_rfc3164.go @@ -0,0 +1,164 @@ +package decoder + +import ( + "bytes" + "fmt" + "time" + + insaneJSON "github.com/ozontech/insane-json" +) + +type SyslogRFC3164Row struct { + Priority []byte + Facility string + Severity string + Timestamp []byte + Hostname []byte + AppName []byte + ProcID []byte + Message []byte +} + +type syslogRFC3164Decoder struct { + params syslogParams +} + +func NewSyslogRFC3164Decoder(params map[string]any) (Decoder, error) { + p, err := extractSyslogParams(params) + if err != nil { + return nil, fmt.Errorf("can't extract params: %w", err) + } + + return &syslogRFC3164Decoder{ + params: p, + }, nil +} + +func (d *syslogRFC3164Decoder) Type() Type { + return SYSLOG_RFC3164 +} + +// DecodeToJson decodes syslog-RFC3164 formatted log and merges result with root. 
+// +// From: +// +// "<34>Oct 5 22:14:15 mymachine.example.com myproc[10]: 'myproc' failed on /dev/pts/8" +// +// To: +// +// { +// "priority": "34", +// "facility": "4", +// "severity": "2", +// "timestamp": "Oct 5 22:14:15", +// "hostname": "mymachine.example.com", +// "app_name": "myproc", +// "process_id": "10", +// "message": "'myproc' failed on /dev/pts/8" +// } +func (d *syslogRFC3164Decoder) DecodeToJson(root *insaneJSON.Root, data []byte) error { + rowRaw, err := d.Decode(data) + if err != nil { + return err + } + row := rowRaw.(SyslogRFC3164Row) + + syslogDecodeToJson(root, SyslogRFC5424Row{SyslogRFC3164Row: row}) + return nil +} + +// Decode decodes syslog-RFC3164 formatted log to [SyslogRFC3164Row]. +// +// Example of format: +// +// "<34>Oct 11 22:14:15 mymachine.example.com myproc[10]: 'myproc' failed on /dev/pts/8" +func (d *syslogRFC3164Decoder) Decode(data []byte, _ ...any) (any, error) { + var row SyslogRFC3164Row + + data = bytes.TrimSuffix(data, []byte("\n")) + if len(data) == 0 { + return row, errSyslogInvalidFormat + } + + // priority + pri, offset, err := syslogParsePriority(data) + if err != nil { + return row, fmt.Errorf("failed to parse priority: %w", err) + } + // offset points to '>' + row.Priority = data[1:offset] + data = data[offset+1:] + + // facility & severity from priority + row.Facility = syslogFacilityFromPriority(pri, d.params.facilityFormat) + row.Severity = syslogSeverityFromPriority(pri, d.params.severityFormat) + + // timestamp + if !d.validateTimestamp(data) { + return row, fmt.Errorf("failed to parse timestamp: %w", errSyslogInvalidTimestamp) + } + row.Timestamp = data[:len(time.Stamp)] + data = data[len(time.Stamp)+1:] + + // hostname + offset = bytes.IndexByte(data, ' ') + if offset < 0 { + return row, fmt.Errorf("failed to parse hostname: %w", errSyslogInvalidFormat) + } + row.Hostname = data[:offset] + data = data[offset+1:] + + // appname + offset = bytes.IndexAny(data, "[: ") + if offset < 0 { + return row, 
fmt.Errorf("failed to parse appname: %w", errSyslogInvalidFormat) + } + row.AppName = data[:offset] + data = data[offset:] + + // optional procid + if data[0] == '[' { + offset = bytes.IndexByte(data, ']') + if offset < 0 || data[offset+1] != ':' { + return row, fmt.Errorf("failed to parse ProcID: %w", errSyslogInvalidFormat) + } + row.ProcID = data[1:offset] + data = data[offset+2:] + } else { + data = data[1:] + } + + // message + if len(data) > 0 && data[0] == ' ' { + data = data[1:] + } + row.Message = data + + return row, nil +} + +// validateTimestamp validates [time.Stamp] format and trailing space (Jan _2 15:04:05 ) +func (d *syslogRFC3164Decoder) validateTimestamp(ts []byte) bool { + if len(ts) < len(time.Stamp)+1 { + return false + } + if !(ts[3] == ' ' && ts[6] == ' ' && ts[9] == ':' && ts[12] == ':' && ts[15] == ' ') { + return false + } + // Mmm + if ts[0] < 'A' || 'Z' < ts[0] || + ts[1] < 'a' || ts[1] > 'z' || + ts[2] < 'a' || ts[2] > 'z' { + return false + } + // dd + if !((ts[4] == ' ' || isDigit(ts[4])) && isDigit(ts[5])) { + return false + } + // time + if !(checkNumber(ts[7:9], 0, 23) && checkNumber(ts[10:12], 0, 59) && checkNumber(ts[13:15], 0, 59)) { + return false + } + + return true +} diff --git a/decoder/syslog_rfc3164_test.go b/decoder/syslog_rfc3164_test.go new file mode 100644 index 000000000..ba9d29b4f --- /dev/null +++ b/decoder/syslog_rfc3164_test.go @@ -0,0 +1,166 @@ +package decoder + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSyslogRFC3164(t *testing.T) { + tests := []struct { + name string + + input string + params map[string]any + + want SyslogRFC3164Row + wantCreateErr bool + wantDecodeErr bool + }{ + { + name: "valid_full", + input: "<34>Oct 11 22:14:15 mymachine.example.com myproc[10]: 'myproc' failed on /dev/pts/8\n", + want: SyslogRFC3164Row{ + Priority: []byte("34"), + Facility: "4", + Severity: "2", + Timestamp: []byte("Oct 11 22:14:15"), + Hostname: []byte("mymachine.example.com"), + 
AppName: []byte("myproc"), + ProcID: []byte("10"), + Message: []byte("'myproc' failed on /dev/pts/8"), + }, + }, + { + name: "valid_no_pid", + input: "<4>Oct 5 22:14:15 mymachine.example.com myproc: 'myproc' failed on /dev/pts/8", + want: SyslogRFC3164Row{ + Priority: []byte("4"), + Facility: "0", + Severity: "4", + Timestamp: []byte("Oct 5 22:14:15"), + Hostname: []byte("mymachine.example.com"), + AppName: []byte("myproc"), + Message: []byte("'myproc' failed on /dev/pts/8"), + }, + }, + { + name: "valid_priority_format", + input: "<34>Oct 11 22:14:15 mymachine.example.com myproc[10]: 'myproc' failed on /dev/pts/8\n", + params: map[string]any{ + syslogFacilityFormatParam: spfString, + syslogSeverityFormatParam: spfString, + }, + want: SyslogRFC3164Row{ + Priority: []byte("34"), + Facility: "AUTH", + Severity: "CRIT", + Timestamp: []byte("Oct 11 22:14:15"), + Hostname: []byte("mymachine.example.com"), + AppName: []byte("myproc"), + ProcID: []byte("10"), + Message: []byte("'myproc' failed on /dev/pts/8"), + }, + }, + { + name: "invalid_create_1", + params: map[string]any{ + syslogFacilityFormatParam: spfString, + syslogSeverityFormatParam: 123, + }, + wantCreateErr: true, + }, + { + name: "invalid_create_2", + params: map[string]any{ + syslogFacilityFormatParam: "test", + }, + wantCreateErr: true, + }, + { + name: "invalid_decode_timestamp_1", + input: "<34> Oct 11 22:14:15", + wantDecodeErr: true, + }, + { + name: "invalid_decode_timestamp_2", + input: "<34>2006-01-02 15:04:05 ", + wantDecodeErr: true, + }, + { + name: "invalid_decode_timestamp_3", + input: "<34>Oct 2 22:14:15 ", + wantDecodeErr: true, + }, + { + name: "invalid_decode_timestamp_4", + input: "<34>Oct 11 22:14:15test", + wantDecodeErr: true, + }, + { + name: "invalid_decode_timestamp_5", + input: "<34>Oct 11 aa:bb:cc ", + wantDecodeErr: true, + }, + { + name: "invalid_decode_timestamp_6", + input: "<34>oct 11 22:14:15 ", + wantDecodeErr: true, + }, + { + name: "invalid_decode_timestamp_7", + input: 
"<34>Oct 11 27:14:15 ", + wantDecodeErr: true, + }, + { + name: "invalid_decode_timestamp_8", + input: "<34>Oct 11 22:72:15 ", + wantDecodeErr: true, + }, + { + name: "invalid_decode_timestamp_9", + input: "<34>Oct 11 22:14:99 ", + wantDecodeErr: true, + }, + { + name: "invalid_decode_hostname", + input: "<34>Oct 11 22:14:15 mymachine.example.com", + wantDecodeErr: true, + }, + { + name: "invalid_decode_appname", + input: "<34>Oct 11 22:14:15 mymachine.example.com myproc", + wantDecodeErr: true, + }, + { + name: "invalid_decode_procid_1", + input: "<34>Oct 11 22:14:15 mymachine.example.com myproc[10: 'myproc' failed on /dev/pts/8", + wantDecodeErr: true, + }, + { + name: "invalid_decode_procid_2", + input: "<34>Oct 11 22:14:15 mymachine.example.com myproc[10] 'myproc' failed on /dev/pts/8", + wantDecodeErr: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + d, err := NewSyslogRFC3164Decoder(tt.params) + assert.Equal(t, tt.wantCreateErr, err != nil) + if tt.wantCreateErr { + return + } + + row, err := d.Decode([]byte(tt.input)) + assert.Equal(t, tt.wantDecodeErr, err != nil) + if tt.wantDecodeErr { + return + } + + assert.Equal(t, tt.want, row.(SyslogRFC3164Row)) + }) + } +} diff --git a/decoder/syslog_rfc5424.go b/decoder/syslog_rfc5424.go new file mode 100644 index 000000000..2c47649a8 --- /dev/null +++ b/decoder/syslog_rfc5424.go @@ -0,0 +1,352 @@ +package decoder + +import ( + "bytes" + "fmt" + + insaneJSON "github.com/ozontech/insane-json" +) + +type SyslogRFC5424Row struct { + SyslogRFC3164Row + + ProtoVersion []byte + MsgID []byte + StructuredData SyslogSD +} + +type syslogRFC5424Decoder struct { + params syslogParams +} + +func NewSyslogRFC5424Decoder(params map[string]any) (Decoder, error) { + p, err := extractSyslogParams(params) + if err != nil { + return nil, fmt.Errorf("can't extract params: %w", err) + } + + return &syslogRFC5424Decoder{ + params: p, + }, nil +} + +func (d 
*syslogRFC5424Decoder) Type() Type { + return SYSLOG_RFC5424 +} + +// DecodeToJson decodes syslog-RFC5424 formatted log and merges result with root. +// +// From: +// +// `<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10 ID47 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"] An application event log` +// +// To: +// +// { +// "priority": "165", +// "facility": "20", +// "severity": "5", +// "proto_version": "1", +// "timestamp": "2003-10-11T22:14:15.003Z", +// "hostname": "mymachine.example.com", +// "app_name": "myproc", +// "process_id": "10", +// "message_id": "ID47", +// "message": "An application event log", +// "exampleSDID@32473": { +// "iut": "3", +// "eventSource": "Application", +// "eventID": "1011" +// } +// } +func (d *syslogRFC5424Decoder) DecodeToJson(root *insaneJSON.Root, data []byte) error { + rowRaw, err := d.Decode(data) + if err != nil { + return err + } + row := rowRaw.(SyslogRFC5424Row) + + syslogDecodeToJson(root, row) + return nil +} + +// Decode decodes syslog-RFC5424 formatted log to [SyslogRFC5424Row]. 
+// +// Example of format: +// +// "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10 ID47 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"] An application event log" +func (d *syslogRFC5424Decoder) Decode(data []byte, _ ...any) (any, error) { + var ( + row SyslogRFC5424Row + offset int + ok bool + err error + ) + + data = bytes.TrimSuffix(data, []byte("\n")) + if len(data) == 0 { + return row, errSyslogInvalidFormat + } + + // priority + pri, offset, err := syslogParsePriority(data) + if err != nil { + return row, fmt.Errorf("failed to parse priority: %w", err) + } + // offset points to '>' + row.Priority = data[1:offset] + data = data[offset+1:] + + // facility & severity from priority + row.Facility = syslogFacilityFromPriority(pri, d.params.facilityFormat) + row.Severity = syslogSeverityFromPriority(pri, d.params.severityFormat) + + // proto version + offset = bytes.IndexByte(data, ' ') + if offset <= 0 { + return row, fmt.Errorf("failed to parse version: %w", errSyslogInvalidFormat) + } + row.ProtoVersion = data[:offset] + if _, ok = atoi(row.ProtoVersion); !ok { + return row, fmt.Errorf("failed to parse version: %w", errSyslogInvalidVersion) + } + data = data[offset+1:] + + // timestamp + offset, ok = d.readUntilSpaceOrNilValue(data) + if !ok { + return row, fmt.Errorf("failed to parse timestamp: %w", errSyslogInvalidVersion) + } + if offset == 0 { + data = data[2:] + } else { + row.Timestamp = data[:offset] + if !d.validateTimestamp(row.Timestamp) { + return row, fmt.Errorf("failed to parse timestamp: %w", errSyslogInvalidTimestamp) + } + data = data[offset+1:] + } + + // hostname + offset, ok = d.readUntilSpaceOrNilValue(data) + if !ok { + return row, fmt.Errorf("failed to parse hostname: %w", errSyslogInvalidFormat) + } + if offset == 0 { + data = data[2:] + } else { + row.Hostname = data[:offset] + data = data[offset+1:] + } + + // appname + offset, ok = d.readUntilSpaceOrNilValue(data) + if !ok { + return row, 
fmt.Errorf("failed to parse appname: %w", errSyslogInvalidFormat) + } + if offset == 0 { + data = data[2:] + } else { + row.AppName = data[:offset] + data = data[offset+1:] + } + + // procid + offset, ok = d.readUntilSpaceOrNilValue(data) + if !ok { + return row, fmt.Errorf("failed to parse ProcID: %w", errSyslogInvalidFormat) + } + if offset == 0 { + data = data[2:] + } else { + row.ProcID = data[:offset] + data = data[offset+1:] + } + + // msgid + offset, ok = d.readUntilSpaceOrNilValue(data) + if !ok { + return row, fmt.Errorf("failed to parse MsgID: %w", errSyslogInvalidFormat) + } + if offset == 0 { + data = data[2:] + } else { + row.MsgID = data[:offset] + data = data[offset+1:] + } + + // structured data + row.StructuredData, offset, ok = d.parseStructuredData(data) + if !ok { + return row, fmt.Errorf("failed to parse structured data: %w", errSyslogInvalidSD) + } + + // no message + if offset >= len(data) { + return row, nil + } + data = data[offset+1:] + + // message + if len(data) > 0 && data[0] == ' ' { + data = data[1:] + } + // BOM + if len(data) > 2 && bytes.Equal(data[:3], bom) { + data = data[3:] + } + row.Message = data + + return row, nil +} + +// validateTimestamp validates [time.RFC3339] / [time.RFC3339Nano] formats +func (d *syslogRFC5424Decoder) validateTimestamp(ts []byte) bool { + if len(ts) < len("2006-01-02T15:04:05Z") { + return false + } + // format + if !(ts[4] == '-' && ts[7] == '-' && ts[10] == 'T' && ts[13] == ':' && ts[16] == ':') { + return false + } + // date + if !(checkNumber(ts[:4], 0, 9999) && checkNumber(ts[5:7], 1, 12) && checkNumber(ts[8:10], 1, 31)) { + return false + } + // time + if !(checkNumber(ts[11:13], 0, 23) && checkNumber(ts[14:16], 0, 59) && checkNumber(ts[17:19], 0, 59)) { + return false + } + // length of processed: "2006-01-02T15:04:05" + ts = ts[19:] + + // nanoseconds + if len(ts) >= 2 && ts[0] == '.' 
&& isDigit(ts[1]) { + i := 2 + for ; i < len(ts) && isDigit(ts[i]); i++ { + } + if i > 7 { + return false + } + ts = ts[i:] + } + + // timezone + if len(ts) > 0 && ts[0] == 'Z' { + return true + } + if len(ts) < len("-07:00") { + return false + } + if !((ts[0] == '+' || ts[0] == '-') && ts[3] == ':') { + return false + } + if !(checkNumber(ts[1:3], 0, 23) && checkNumber(ts[4:6], 0, 59)) { + return false + } + + return true +} + +func (d *syslogRFC5424Decoder) parseStructuredData(data []byte) (SyslogSD, int, bool) { + if len(data) > 0 && data[0] == '-' { + return nil, 0, len(data) == 1 || data[1] == ' ' + } + + sd := SyslogSD{} + offset := 0 + r := bytes.NewReader(nil) + var ( + sdID, paramID string + idx, startParamID, startParamValue int + wasOpen, wasClose, insideParamValue bool + ) + + shiftData := func(count int) { + data = data[count:] + offset += count + } + + resetState := func() { + r.Reset(data) + idx = 0 + startParamID = 0 + startParamValue = 0 + insideParamValue = false + paramID = "" + } + + for len(data) > 0 { + if data[0] != '[' { + break + } + wasOpen = true + shiftData(1) + + idx = bytes.IndexByte(data, ' ') + if idx < 2 { + return nil, 0, false + } + sdID = string(data[:idx]) + sd[sdID] = SyslogSDParams{} + shiftData(idx + 1) + + resetState() + wasClose = false + paramsLoop: + for { + b, err := r.ReadByte() + if err != nil { + break + } + + switch { + case b == ']': + if data[idx-1] != '"' { + return nil, 0, false + } + wasClose = true + break paramsLoop + case b == ' ' && !insideParamValue: + startParamID = idx + 1 + case b == '=' && !insideParamValue: + if idx+1 < len(data) && data[idx+1] != '"' { + return nil, 0, false + } + paramID = string(data[startParamID:idx]) + case b == '"': + if data[idx-1] == '\\' { + break + } + if insideParamValue { + sd[sdID][paramID] = data[startParamValue:idx] + } else { + startParamValue = idx + 1 + } + insideParamValue = !insideParamValue + } + idx++ + } + if !wasClose { + return nil, 0, false + } + shiftData(idx 
+ 1) + } + + if !wasOpen { + return nil, 0, false + } + return sd, offset, true +} + +// readUntilSpaceOrNilValue reads bytes until SP (' ') or NILVALUE ('-') +func (d *syslogRFC5424Decoder) readUntilSpaceOrNilValue(data []byte) (int, bool) { + if len(data) < 2 { + return -1, false + } + if data[0] == '-' && data[1] == ' ' { + return 0, true + } + offset := bytes.IndexByte(data, ' ') + return offset, offset > 0 +} diff --git a/decoder/syslog_rfc5424_test.go b/decoder/syslog_rfc5424_test.go new file mode 100644 index 000000000..96c68d0fe --- /dev/null +++ b/decoder/syslog_rfc5424_test.go @@ -0,0 +1,484 @@ +package decoder + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSyslogRFC5424(t *testing.T) { + tests := []struct { + name string + + input string + params map[string]any + + want SyslogRFC5424Row + wantCreateErr bool + wantDecodeErr bool + }{ + { + name: "valid_full", + input: "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10 ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"My \\\"Application\\\"\" eventID=\"1011\"] An application event log\n", + want: SyslogRFC5424Row{ + SyslogRFC3164Row: SyslogRFC3164Row{ + Priority: []byte("165"), + Facility: "20", + Severity: "5", + Timestamp: []byte("2003-10-11T22:14:15.003Z"), + Hostname: []byte("mymachine.example.com"), + AppName: []byte("myproc"), + ProcID: []byte("10"), + Message: []byte("An application event log"), + }, + ProtoVersion: []byte("1"), + MsgID: []byte("ID47"), + StructuredData: SyslogSD{ + "exampleSDID@32473": SyslogSDParams{ + "iut": []byte("3"), + "eventSource": []byte("My \\\"Application\\\""), + "eventID": []byte("1011"), + }, + }, + }, + }, + { + name: "valid_full_bom", + input: fmt.Sprintf("<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10 ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"] %sAn application event log", bom), + want: SyslogRFC5424Row{ + SyslogRFC3164Row: SyslogRFC3164Row{ + Priority: 
[]byte("165"), + Facility: "20", + Severity: "5", + Timestamp: []byte("2003-10-11T22:14:15.003Z"), + Hostname: []byte("mymachine.example.com"), + AppName: []byte("myproc"), + ProcID: []byte("10"), + Message: []byte("An application event log"), + }, + ProtoVersion: []byte("1"), + MsgID: []byte("ID47"), + StructuredData: SyslogSD{ + "exampleSDID@32473": SyslogSDParams{ + "iut": []byte("3"), + "eventSource": []byte("Application"), + "eventID": []byte("1011"), + }, + }, + }, + }, + { + name: "valid_full_priority_format", + input: "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10 ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"] An application event log", + params: map[string]any{ + syslogFacilityFormatParam: spfString, + syslogSeverityFormatParam: spfString, + }, + want: SyslogRFC5424Row{ + SyslogRFC3164Row: SyslogRFC3164Row{ + Priority: []byte("165"), + Facility: "LOCAL4", + Severity: "NOTICE", + Timestamp: []byte("2003-10-11T22:14:15.003Z"), + Hostname: []byte("mymachine.example.com"), + AppName: []byte("myproc"), + ProcID: []byte("10"), + Message: []byte("An application event log"), + }, + ProtoVersion: []byte("1"), + MsgID: []byte("ID47"), + StructuredData: SyslogSD{ + "exampleSDID@32473": SyslogSDParams{ + "iut": []byte("3"), + "eventSource": []byte("Application"), + "eventID": []byte("1011"), + }, + }, + }, + }, + { + name: "valid_no_timestamp", + input: "<165>1 - mymachine.example.com myproc 10 ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"] An application event log", + want: SyslogRFC5424Row{ + SyslogRFC3164Row: SyslogRFC3164Row{ + Priority: []byte("165"), + Facility: "20", + Severity: "5", + Hostname: []byte("mymachine.example.com"), + AppName: []byte("myproc"), + ProcID: []byte("10"), + Message: []byte("An application event log"), + }, + ProtoVersion: []byte("1"), + MsgID: []byte("ID47"), + StructuredData: SyslogSD{ + "exampleSDID@32473": SyslogSDParams{ + "iut": []byte("3"), + 
"eventSource": []byte("Application"), + "eventID": []byte("1011"), + }, + }, + }, + }, + { + name: "valid_no_hostname", + input: "<165>1 2003-10-11T22:14:15.003Z - myproc 10 ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"] An application event log", + want: SyslogRFC5424Row{ + SyslogRFC3164Row: SyslogRFC3164Row{ + Priority: []byte("165"), + Facility: "20", + Severity: "5", + Timestamp: []byte("2003-10-11T22:14:15.003Z"), + AppName: []byte("myproc"), + ProcID: []byte("10"), + Message: []byte("An application event log"), + }, + ProtoVersion: []byte("1"), + MsgID: []byte("ID47"), + StructuredData: SyslogSD{ + "exampleSDID@32473": SyslogSDParams{ + "iut": []byte("3"), + "eventSource": []byte("Application"), + "eventID": []byte("1011"), + }, + }, + }, + }, + { + name: "valid_no_appname", + input: "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com - 10 ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"] An application event log", + want: SyslogRFC5424Row{ + SyslogRFC3164Row: SyslogRFC3164Row{ + Priority: []byte("165"), + Facility: "20", + Severity: "5", + Timestamp: []byte("2003-10-11T22:14:15.003Z"), + Hostname: []byte("mymachine.example.com"), + ProcID: []byte("10"), + Message: []byte("An application event log"), + }, + ProtoVersion: []byte("1"), + MsgID: []byte("ID47"), + StructuredData: SyslogSD{ + "exampleSDID@32473": SyslogSDParams{ + "iut": []byte("3"), + "eventSource": []byte("Application"), + "eventID": []byte("1011"), + }, + }, + }, + }, + { + name: "valid_no_procid", + input: "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc - ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"] An application event log", + want: SyslogRFC5424Row{ + SyslogRFC3164Row: SyslogRFC3164Row{ + Priority: []byte("165"), + Facility: "20", + Severity: "5", + Timestamp: []byte("2003-10-11T22:14:15.003Z"), + Hostname: []byte("mymachine.example.com"), + AppName: []byte("myproc"), + Message: 
[]byte("An application event log"), + }, + ProtoVersion: []byte("1"), + MsgID: []byte("ID47"), + StructuredData: SyslogSD{ + "exampleSDID@32473": SyslogSDParams{ + "iut": []byte("3"), + "eventSource": []byte("Application"), + "eventID": []byte("1011"), + }, + }, + }, + }, + { + name: "valid_no_msgid", + input: "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10 - [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"] An application event log", + want: SyslogRFC5424Row{ + SyslogRFC3164Row: SyslogRFC3164Row{ + Priority: []byte("165"), + Facility: "20", + Severity: "5", + Timestamp: []byte("2003-10-11T22:14:15.003Z"), + Hostname: []byte("mymachine.example.com"), + AppName: []byte("myproc"), + ProcID: []byte("10"), + Message: []byte("An application event log"), + }, + ProtoVersion: []byte("1"), + StructuredData: SyslogSD{ + "exampleSDID@32473": SyslogSDParams{ + "iut": []byte("3"), + "eventSource": []byte("Application"), + "eventID": []byte("1011"), + }, + }, + }, + }, + { + name: "valid_no_sd", + input: "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10 ID47 - An application event log", + want: SyslogRFC5424Row{ + SyslogRFC3164Row: SyslogRFC3164Row{ + Priority: []byte("165"), + Facility: "20", + Severity: "5", + Timestamp: []byte("2003-10-11T22:14:15.003Z"), + Hostname: []byte("mymachine.example.com"), + AppName: []byte("myproc"), + ProcID: []byte("10"), + Message: []byte("An application event log"), + }, + ProtoVersion: []byte("1"), + MsgID: []byte("ID47"), + }, + }, + { + name: "valid_no_msg", + input: "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10 ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"]", + want: SyslogRFC5424Row{ + SyslogRFC3164Row: SyslogRFC3164Row{ + Priority: []byte("165"), + Facility: "20", + Severity: "5", + Timestamp: []byte("2003-10-11T22:14:15.003Z"), + Hostname: []byte("mymachine.example.com"), + AppName: []byte("myproc"), + ProcID: []byte("10"), + }, 
+ ProtoVersion: []byte("1"), + MsgID: []byte("ID47"), + StructuredData: SyslogSD{ + "exampleSDID@32473": SyslogSDParams{ + "iut": []byte("3"), + "eventSource": []byte("Application"), + "eventID": []byte("1011"), + }, + }, + }, + }, + { + name: "valid_only_required", + input: "<165>1 - - - - - - An application event log", + want: SyslogRFC5424Row{ + SyslogRFC3164Row: SyslogRFC3164Row{ + Priority: []byte("165"), + Facility: "20", + Severity: "5", + Message: []byte("An application event log"), + }, + ProtoVersion: []byte("1"), + }, + }, + { + name: "valid_multi_sd", + input: "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10 ID47 [example1@123 param1=\"1\" param2=\"two\"][example2@123 param1=\"\" param2=\"twotwo\"] An application event log", + want: SyslogRFC5424Row{ + SyslogRFC3164Row: SyslogRFC3164Row{ + Priority: []byte("165"), + Facility: "20", + Severity: "5", + Timestamp: []byte("2003-10-11T22:14:15.003Z"), + Hostname: []byte("mymachine.example.com"), + AppName: []byte("myproc"), + ProcID: []byte("10"), + Message: []byte("An application event log"), + }, + ProtoVersion: []byte("1"), + MsgID: []byte("ID47"), + StructuredData: SyslogSD{ + "example1@123": SyslogSDParams{ + "param1": []byte("1"), + "param2": []byte("two"), + }, + "example2@123": SyslogSDParams{ + "param1": []byte(""), + "param2": []byte("twotwo"), + }, + }, + }, + }, + { + name: "invalid_create_1", + params: map[string]any{ + syslogFacilityFormatParam: spfString, + syslogSeverityFormatParam: 123, + }, + wantCreateErr: true, + }, + { + name: "invalid_create_2", + params: map[string]any{ + syslogFacilityFormatParam: "test", + }, + wantCreateErr: true, + }, + { + name: "invalid_decode_timestamp_1", + input: "<165>1 2003-10-11T22:14:15", + wantDecodeErr: true, + }, + { + name: "invalid_decode_timestamp_2", + input: "<165>1 2003 10 11T22:14:15Z", + wantDecodeErr: true, + }, + { + name: "invalid_decode_timestamp_3", + input: "<165>1 2003-10-11T22-14-15Z", + wantDecodeErr: true, + }, + { + 
name: "invalid_decode_timestamp_4", + input: "<165>1 2003-13-11T22:14:15Z", + wantDecodeErr: true, + }, + { + name: "invalid_decode_timestamp_5", + input: "<165>1 2003-12-32T22:14.15Z", + wantDecodeErr: true, + }, + { + name: "invalid_decode_timestamp_6", + input: "<165>1 2003-12-31T25:14.15Z", + wantDecodeErr: true, + }, + { + name: "invalid_decode_timestamp_7", + input: "<165>1 2003-12-31T22:62.15Z", + wantDecodeErr: true, + }, + { + name: "invalid_decode_timestamp_8", + input: "<165>1 2003-12-31T22:14.99Z", + wantDecodeErr: true, + }, + { + name: "invalid_decode_timestamp_9", + input: "<165>1 2003-12-31T22:14.15.0000003Z", + wantDecodeErr: true, + }, + { + name: "invalid_decode_timestamp_10", + input: "<165>1 2003-12-31T22:14.15X", + wantDecodeErr: true, + }, + { + name: "invalid_decode_timestamp_11", + input: "<165>1 2003-12-31T22:14.15-07", + wantDecodeErr: true, + }, + { + name: "invalid_decode_timestamp_12", + input: "<165>1 2003-12-31T22:14.15@07:00", + wantDecodeErr: true, + }, + { + name: "invalid_decode_timestamp_13", + input: "<165>1 2003-12-31T22:14.15+07@00", + wantDecodeErr: true, + }, + { + name: "invalid_decode_timestamp_14", + input: "<165>1 2003-12-31T22:14.15+25:00", + wantDecodeErr: true, + }, + { + name: "invalid_decode_timestamp_15", + input: "<165>1 2003-12-31T22:14.15+07:65", + wantDecodeErr: true, + }, + { + name: "invalid_decode_hostname", + input: "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com", + wantDecodeErr: true, + }, + { + name: "invalid_decode_appname", + input: "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc", + wantDecodeErr: true, + }, + { + name: "invalid_decode_procid", + input: "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10", + wantDecodeErr: true, + }, + { + name: "invalid_decode_msgid", + input: "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10 ID47", + wantDecodeErr: true, + }, + { + name: "invalid_decode_sd_1", + input: "<165>1 2003-10-11T22:14:15.003Z 
mymachine.example.com myproc 10 ID47 exampleSDID@32473", + wantDecodeErr: true, + }, + { + name: "invalid_decode_sd_2", + input: "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10 ID47 [exampleSDID@32473", + wantDecodeErr: true, + }, + { + name: "invalid_decode_sd_3", + input: "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10 ID47 [exampleSDID@32473]", + wantDecodeErr: true, + }, + { + name: "invalid_decode_sd_4", + input: "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10 ID47 [exampleSDID@32473 ", + wantDecodeErr: true, + }, + { + name: "invalid_decode_sd_5", + input: "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10 ID47 [exampleSDID@32473 =]", + wantDecodeErr: true, + }, + { + name: "invalid_decode_sd_6", + input: "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10 ID47 [exampleSDID@32473 iut=3\"]", + wantDecodeErr: true, + }, + { + name: "invalid_decode_sd_7", + input: "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10 ID47 [exampleSDID@32473 iut=\"3]", + wantDecodeErr: true, + }, + { + name: "invalid_decode_sd_8", + input: "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10 ID47 [exampleSDID@32473 iut=\"3\" ", + wantDecodeErr: true, + }, + { + name: "invalid_decode_sd_9", + input: "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10 ID47 [exampleSDID@32473 iut=\"3\" ]", + wantDecodeErr: true, + }, + { + name: "invalid_decode_sd_10", + input: "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10 ID47 [ exampleSDID@32473 iut=\"3\"]", + wantDecodeErr: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + d, err := NewSyslogRFC5424Decoder(tt.params) + assert.Equal(t, tt.wantCreateErr, err != nil) + if tt.wantCreateErr { + return + } + + row, err := d.Decode([]byte(tt.input)) + assert.Equal(t, tt.wantDecodeErr, err != nil) + if tt.wantDecodeErr { + return + } + + assert.Equal(t, tt.want, 
row.(SyslogRFC5424Row)) + }) + } +} diff --git a/decoder/syslog_test.go b/decoder/syslog_test.go new file mode 100644 index 000000000..439ad83be --- /dev/null +++ b/decoder/syslog_test.go @@ -0,0 +1,83 @@ +package decoder + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSyslogParsePriority(t *testing.T) { + tests := []struct { + name string + + input string + + want int + wantOffset int + wantErr bool + }{ + { + name: "valid_1", + input: "<1>", + want: 1, + wantOffset: 2, + }, + { + name: "valid_2", + input: "<12>", + want: 12, + wantOffset: 3, + }, + { + name: "valid_3", + input: "<123>", + want: 123, + wantOffset: 4, + }, + { + name: "invalid_1", + input: "", + wantErr: true, + }, + { + name: "invalid_2", + input: "<", + wantErr: true, + }, + { + name: "invalid_3", + input: "<>", + wantErr: true, + }, + { + name: "invalid_4", + input: "", + wantErr: true, + }, + { + name: "invalid_5", + input: "<100000>", + wantErr: true, + }, + { + name: "invalid_6", + input: "<192>", + wantErr: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + p, offset, err := syslogParsePriority([]byte(tt.input)) + assert.Equal(t, tt.wantErr, err != nil) + if tt.wantErr { + return + } + + assert.Equal(t, tt.want, p) + assert.Equal(t, tt.wantOffset, offset) + }) + } +} diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 9741f9681..36d58ae79 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -231,11 +231,7 @@ func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeli switch settings.Decoder { case "json": pipeline.decoderType = decoder.JSON - pipeline.decoder, err = decoder.NewJsonDecoder(pipeline.settings.DecoderParams) - if err != nil { - pipeline.logger.Fatal("can't create json decoder", zap.Error(err)) - } case "raw": pipeline.decoderType = decoder.RAW case "cri": @@ -244,23 +240,24 @@ func New(name string, settings *Settings, registry 
*prometheus.Registry) *Pipeli pipeline.decoderType = decoder.POSTGRES case "nginx_error": pipeline.decoderType = decoder.NGINX_ERROR - pipeline.decoder, err = decoder.NewNginxErrorDecoder(pipeline.settings.DecoderParams) - if err != nil { - pipeline.logger.Fatal("can't create nginx_error decoder", zap.Error(err)) - } case "protobuf": pipeline.decoderType = decoder.PROTOBUF - pipeline.decoder, err = decoder.NewProtobufDecoder(pipeline.settings.DecoderParams) - if err != nil { - pipeline.logger.Fatal("can't create protobuf decoder", zap.Error(err)) - } + case "syslog_rfc3164": + pipeline.decoderType = decoder.SYSLOG_RFC3164 + pipeline.decoder, err = decoder.NewSyslogRFC3164Decoder(pipeline.settings.DecoderParams) + case "syslog_rfc5424": + pipeline.decoderType = decoder.SYSLOG_RFC5424 + pipeline.decoder, err = decoder.NewSyslogRFC5424Decoder(pipeline.settings.DecoderParams) case "auto": pipeline.decoderType = decoder.AUTO default: pipeline.logger.Fatal("unknown decoder", zap.String("decoder", settings.Decoder)) } + if err != nil { + pipeline.logger.Fatal("can't create decoder", zap.String("decoder", settings.Decoder), zap.Error(err)) + } return pipeline } @@ -509,7 +506,8 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offsets Offsets, byt _ = event.Root.DecodeString("{}") } switch dec { - case decoder.JSON: + case decoder.JSON, decoder.NGINX_ERROR, decoder.PROTOBUF, + decoder.SYSLOG_RFC3164, decoder.SYSLOG_RFC5424: err = p.decoder.DecodeToJson(event.Root, bytes) case decoder.RAW: event.Root.AddFieldNoAlloc(event.Root, "message").MutateToBytesCopy(event.Root, bytes[:len(bytes)-1]) @@ -519,10 +517,6 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offsets Offsets, byt event.Root.AddFieldNoAlloc(event.Root, "stream").MutateToBytesCopy(event.Root, row.Stream) case decoder.POSTGRES: err = decoder.DecodePostgresToJson(event.Root, bytes) - case decoder.NGINX_ERROR: - err = p.decoder.DecodeToJson(event.Root, bytes) - case decoder.PROTOBUF: - err 
= p.decoder.DecodeToJson(event.Root, bytes) default: p.logger.Panic("unknown decoder", zap.Int("decoder", int(dec))) } diff --git a/plugin/action/decode/README.md b/plugin/action/decode/README.md index 27502ad9f..53b3f06bd 100755 --- a/plugin/action/decode/README.md +++ b/plugin/action/decode/README.md @@ -106,7 +106,15 @@ The resulting event: } ``` -### NginxError decoder +### Nginx-error decoder +The event root may contain any of the following fields: +* `time` *string* +* `level` *string* +* `pid` *string* +* `tid` *string* +* `cid` *string* +* `message` *string* + You can specify `nginx_with_custom_fields: true` in `params` to decode custom fields. Default decoder: @@ -277,6 +285,188 @@ The resulting event: } ``` +### Syslog-RFC3164 decoder +The event root may contain any of the following fields: +* `priority` *string* +* `facility` *string* +* `severity` *string* +* `timestamp` *string* (`Stamp` format) +* `hostname` *string* +* `app_name` *string* +* `process_id` *string* +* `message` *string* + +You can specify `syslog_facility_format` and `syslog_severity_format` in `params` +for preferred `facility` and `severity` fields format. + +Default decoder: +```yaml +pipelines: + example_pipeline: + ... + actions: + - type: decode + field: log + decoder: syslog_rfc3164 + ... +``` +The original event: +```json +{ + "log": "<34>Oct 5 22:14:15 mymachine.example.com myproc[10]: 'myproc' failed on /dev/pts/8", + "service": "test" +} +``` +The resulting event: +```json +{ + "service": "test", + "priority": "34", + "facility": "4", + "severity": "2", + "timestamp": "Oct 5 22:14:15", + "hostname": "mymachine.example.com", + "app_name": "myproc", + "process_id": "10", + "message": "'myproc' failed on /dev/pts/8" +} +``` +--- +Decoder with `syslog_*_format` params: +```yaml +pipelines: + example_pipeline: + ... + actions: + - type: decode + field: log + decoder: syslog_rfc3164 + params: + syslog_facility_format: 'string' + syslog_severity_format: 'string' + ... 
+``` +The original event: +```json +{ + "log": "<34>Oct 11 22:14:15 mymachine.example.com myproc: 'myproc' failed on /dev/pts/8", + "service": "test" +} +``` +The resulting event: +```json +{ + "service": "test", + "priority": "34", + "facility": "AUTH", + "severity": "CRIT", + "timestamp": "Oct 11 22:14:15", + "hostname": "mymachine.example.com", + "app_name": "myproc", + "message": "'myproc' failed on /dev/pts/8" +} +``` + +### Syslog-RFC5424 decoder +The event root may contain any of the following fields: +* `priority` *string* +* `facility` *string* +* `severity` *string* +* `proto_version` *string* +* `timestamp` *string* (`RFC3339`/`RFC3339Nano` format) +* `hostname` *string* +* `app_name` *string* +* `process_id` *string* +* `message_id` *string* +* `message` *string* +* `SD_1` *object* +* ... +* `SD_N` *object* + +You can specify `syslog_facility_format` and `syslog_severity_format` in `params` +for preferred `facility` and `severity` fields format. + +Default decoder: +```yaml +pipelines: + example_pipeline: + ... + actions: + - type: decode + field: log + decoder: syslog_rfc5424 + ... +``` +The original event: +```json +{ + "log": "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10 ID47 [exampleSDID iut=\"3\" eventSource=\"Application\" eventID=\"1011\"] An application event log", + "service": "test" +} +``` +The resulting event: +```json +{ + "service": "test", + "priority": "165", + "facility": "20", + "severity": "5", + "proto_version": "1", + "timestamp": "2003-10-11T22:14:15.003Z", + "hostname": "mymachine.example.com", + "app_name": "myproc", + "process_id": "10", + "message_id": "ID47", + "message": "An application event log", + "exampleSDID": { + "iut": "3", + "eventSource": "Application", + "eventID": "1011" + } +} +``` +--- +Decoder with `syslog_*_format` params: +```yaml +pipelines: + example_pipeline: + ... 
+ actions: + - type: decode + field: log + decoder: syslog_rfc5424 + params: + syslog_facility_format: 'string' + syslog_severity_format: 'string' + ... +``` +The original event: +```json +{ + "log": "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc - ID47 [exampleSDID iut=\"3\" eventSource=\"Application\" eventID=\"1011\"]", + "service": "test" +} +``` +The resulting event: +```json +{ + "service": "test", + "priority": "165", + "facility": "LOCAL4", + "severity": "NOTICE", + "proto_version": "1", + "timestamp": "2003-10-11T22:14:15.003Z", + "hostname": "mymachine.example.com", + "app_name": "myproc", + "message_id": "ID47", + "exampleSDID": { + "iut": "3", + "eventSource": "Application", + "eventID": "1011" + } +} +``` + ### Keep origin ```yaml pipelines: @@ -317,7 +507,7 @@ The event field to decode. Must be a string.
-**`decoder`** *`string`* *`default=json`* *`options=json|postgres|nginx_error|protobuf`* +**`decoder`** *`string`* *`default=json`* *`options=json|postgres|nginx_error|protobuf|syslog_rfc3164|syslog_rfc5424`* Decoder type. @@ -332,7 +522,7 @@ Decoding params. If set, the fields will be cut to the specified limit. > It works only with string values. If the field doesn't exist or isn't a string, it will be skipped. -**NginxError decoder params**: +**Nginx-error decoder params**: * `nginx_with_custom_fields` - if set, custom fields will be extracted. **Protobuf decoder params**: @@ -357,6 +547,10 @@ If present and not empty, then all file paths to find are assumed to be relative > * google/protobuf/type.proto > * google/protobuf/wrappers.proto +**Syslog-RFC3164 & Syslog-RFC5424 decoder params**: +* `syslog_facility_format` - facility format, must be one of `number|string` (`number` by default). +* `syslog_severity_format` - severity format, must be one of `number|string` (`number` by default). +
**`prefix`** *`string`* diff --git a/plugin/action/decode/decode.go b/plugin/action/decode/decode.go index b50e709a6..ac9125626 100644 --- a/plugin/action/decode/decode.go +++ b/plugin/action/decode/decode.go @@ -120,7 +120,15 @@ The resulting event: } ``` -### NginxError decoder +### Nginx-error decoder +The event root may contain any of the following fields: +* `time` *string* +* `level` *string* +* `pid` *string* +* `tid` *string* +* `cid` *string* +* `message` *string* + You can specify `nginx_with_custom_fields: true` in `params` to decode custom fields. Default decoder: @@ -291,6 +299,188 @@ The resulting event: } ``` +### Syslog-RFC3164 decoder +The event root may contain any of the following fields: +* `priority` *string* +* `facility` *string* +* `severity` *string* +* `timestamp` *string* (`Stamp` format) +* `hostname` *string* +* `app_name` *string* +* `process_id` *string* +* `message` *string* + +You can specify `syslog_facility_format` and `syslog_severity_format` in `params` +for preferred `facility` and `severity` fields format. + +Default decoder: +```yaml +pipelines: + example_pipeline: + ... + actions: + - type: decode + field: log + decoder: syslog_rfc3164 + ... +``` +The original event: +```json +{ + "log": "<34>Oct 5 22:14:15 mymachine.example.com myproc[10]: 'myproc' failed on /dev/pts/8", + "service": "test" +} +``` +The resulting event: +```json +{ + "service": "test", + "priority": "34", + "facility": "4", + "severity": "2", + "timestamp": "Oct 5 22:14:15", + "hostname": "mymachine.example.com", + "app_name": "myproc", + "process_id": "10", + "message": "'myproc' failed on /dev/pts/8" +} +``` +--- +Decoder with `syslog_*_format` params: +```yaml +pipelines: + example_pipeline: + ... + actions: + - type: decode + field: log + decoder: syslog_rfc3164 + params: + syslog_facility_format: 'string' + syslog_severity_format: 'string' + ... 
+``` +The original event: +```json +{ + "log": "<34>Oct 11 22:14:15 mymachine.example.com myproc: 'myproc' failed on /dev/pts/8", + "service": "test" +} +``` +The resulting event: +```json +{ + "service": "test", + "priority": "34", + "facility": "AUTH", + "severity": "CRIT", + "timestamp": "Oct 11 22:14:15", + "hostname": "mymachine.example.com", + "app_name": "myproc", + "message": "'myproc' failed on /dev/pts/8" +} +``` + +### Syslog-RFC5424 decoder +The event root may contain any of the following fields: +* `priority` *string* +* `facility` *string* +* `severity` *string* +* `proto_version` *string* +* `timestamp` *string* (`RFC3339`/`RFC3339Nano` format) +* `hostname` *string* +* `app_name` *string* +* `process_id` *string* +* `message_id` *string* +* `message` *string* +* `SD_1` *object* +* ... +* `SD_N` *object* + +You can specify `syslog_facility_format` and `syslog_severity_format` in `params` +for preferred `facility` and `severity` fields format. + +Default decoder: +```yaml +pipelines: + example_pipeline: + ... + actions: + - type: decode + field: log + decoder: syslog_rfc5424 + ... +``` +The original event: +```json +{ + "log": "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10 ID47 [exampleSDID iut=\"3\" eventSource=\"Application\" eventID=\"1011\"] An application event log", + "service": "test" +} +``` +The resulting event: +```json +{ + "service": "test", + "priority": "165", + "facility": "20", + "severity": "5", + "proto_version": "1", + "timestamp": "2003-10-11T22:14:15.003Z", + "hostname": "mymachine.example.com", + "app_name": "myproc", + "process_id": "10", + "message_id": "ID47", + "message": "An application event log", + "exampleSDID": { + "iut": "3", + "eventSource": "Application", + "eventID": "1011" + } +} +``` +--- +Decoder with `syslog_*_format` params: +```yaml +pipelines: + example_pipeline: + ... 
+ actions: + - type: decode + field: log + decoder: syslog_rfc5424 + params: + syslog_facility_format: 'string' + syslog_severity_format: 'string' + ... +``` +The original event: +```json +{ + "log": "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc - ID47 [exampleSDID iut=\"3\" eventSource=\"Application\" eventID=\"1011\"]", + "service": "test" +} +``` +The resulting event: +```json +{ + "service": "test", + "priority": "165", + "facility": "LOCAL4", + "severity": "NOTICE", + "proto_version": "1", + "timestamp": "2003-10-11T22:14:15.003Z", + "hostname": "mymachine.example.com", + "app_name": "myproc", + "message_id": "ID47", + "exampleSDID": { + "iut": "3", + "eventSource": "Application", + "eventID": "1011" + } +} +``` + ### Keep origin ```yaml pipelines: @@ -332,6 +522,8 @@ const ( decPostgres decNginxError decProtobuf + decSyslogRFC3164 + decSyslogRFC5424 ) type logDecodeErrorMode int @@ -361,7 +553,7 @@ type Config struct { // > @3@4@5@6 // > // > Decoder type. - Decoder string `json:"decoder" default:"json" options:"json|postgres|nginx_error|protobuf"` // * + Decoder string `json:"decoder" default:"json" options:"json|postgres|nginx_error|protobuf|syslog_rfc3164|syslog_rfc5424"` // * Decoder_ decoderType // > @3@4@5@6 @@ -373,7 +565,7 @@ type Config struct { // > If set, the fields will be cut to the specified limit. // > > It works only with string values. If the field doesn't exist or isn't a string, it will be skipped. // > - // > **NginxError decoder params**: + // > **Nginx-error decoder params**: // > * `nginx_with_custom_fields` - if set, custom fields will be extracted. // > // > **Protobuf decoder params**: @@ -397,6 +589,10 @@ type Config struct { // >> * google/protobuf/timestamp.proto // >> * google/protobuf/type.proto // >> * google/protobuf/wrappers.proto + // > + // > **Syslog-RFC3164 & Syslog-RFC5424 decoder params**: + // > * `syslog_facility_format` - facility format, must be one of `number|string` (`number` by default). 
+ // > * `syslog_severity_format` - severity format, must be one of `number|string` (`number` by default). Params map[string]any `json:"params"` // * // > @3@4@5@6 @@ -442,6 +638,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginP p.decoder, err = decoder.NewNginxErrorDecoder(p.config.Params) case decProtobuf: p.decoder, err = decoder.NewProtobufDecoder(p.config.Params) + case decSyslogRFC3164: + p.decoder, err = decoder.NewSyslogRFC3164Decoder(p.config.Params) + case decSyslogRFC5424: + p.decoder, err = decoder.NewSyslogRFC5424Decoder(p.config.Params) } if err != nil { p.logger.Fatal(fmt.Sprintf("can't create %s decoder", p.config.Decoder), zap.Error(err)) @@ -465,6 +665,10 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { p.decodeNginxError(event.Root, fieldNode) case decProtobuf: p.decodeProtobuf(event.Root, fieldNode, event.Buf) + case decSyslogRFC3164: + p.decodeSyslogRFC3164(event.Root, fieldNode) + case decSyslogRFC5424: + p.decodeSyslogRFC5424(event.Root, fieldNode) } return pipeline.ActionPass @@ -569,8 +773,81 @@ func (p *Plugin) decodeProtobuf(root *insaneJSON.Root, node *insaneJSON.Node, bu root.MergeWith(t) } -func (p *Plugin) addFieldPrefix(root *insaneJSON.Root, key string, val []byte) { - root.AddFieldNoAlloc(root, p.config.Prefix+key).MutateToBytesCopy(root, val) +func (p *Plugin) decodeSyslogRFC3164(root *insaneJSON.Root, node *insaneJSON.Node) { + rowRaw, err := p.decoder.Decode(node.AsBytes()) + if p.checkError(err, node) { + return + } + row := rowRaw.(decoder.SyslogRFC3164Row) + + if !p.config.KeepOrigin { + node.Suicide() + } + + p.decodeSyslog(root, decoder.SyslogRFC5424Row{SyslogRFC3164Row: row}) +} + +func (p *Plugin) decodeSyslogRFC5424(root *insaneJSON.Root, node *insaneJSON.Node) { + rowRaw, err := p.decoder.Decode(node.AsBytes()) + if p.checkError(err, node) { + return + } + row := rowRaw.(decoder.SyslogRFC5424Row) + + if !p.config.KeepOrigin { + node.Suicide() + } + + 
p.decodeSyslog(root, row) +} + +func (p *Plugin) decodeSyslog(root *insaneJSON.Root, row decoder.SyslogRFC5424Row) { // nolint: gocritic // hugeParam is ok + p.addFieldPrefix(root, "priority", row.Priority) + p.addFieldPrefix(root, "facility", row.Facility) + p.addFieldPrefix(root, "severity", row.Severity) + if len(row.ProtoVersion) > 0 { + p.addFieldPrefix(root, "proto_version", row.ProtoVersion) + } + if len(row.Timestamp) > 0 { + p.addFieldPrefix(root, "timestamp", row.Timestamp) + } + if len(row.Hostname) > 0 { + p.addFieldPrefix(root, "hostname", row.Hostname) + } + if len(row.AppName) > 0 { + p.addFieldPrefix(root, "app_name", row.AppName) + } + if len(row.ProcID) > 0 { + p.addFieldPrefix(root, "process_id", row.ProcID) + } + if len(row.MsgID) > 0 { + p.addFieldPrefix(root, "message_id", row.MsgID) + } + if len(row.Message) > 0 { + p.addFieldPrefix(root, "message", row.Message) + } + + for id, params := range row.StructuredData { + if len(params) == 0 { + continue + } + obj := root.AddFieldNoAlloc(root, id).MutateToObject() + for k, v := range params { + obj.AddFieldNoAlloc(root, k).MutateToBytesCopy(root, v) + } + } +} + +func (p *Plugin) addFieldPrefix(root *insaneJSON.Root, key string, val any) { + f := root.AddFieldNoAlloc(root, p.config.Prefix+key) + switch v := val.(type) { + case []byte: + f.MutateToBytesCopy(root, v) + case string: + f.MutateToString(v) + default: + panic("") + } } func (p *Plugin) checkError(err error, node *insaneJSON.Node) bool { diff --git a/plugin/action/decode/decode_test.go b/plugin/action/decode/decode_test.go index e5e530359..6440b23d5 100644 --- a/plugin/action/decode/decode_test.go +++ b/plugin/action/decode/decode_test.go @@ -234,6 +234,143 @@ func TestDecode(t *testing.T) { "p_version": "10", }, }, + { + name: "syslog_rfc3164", + config: &Config{ + Field: "log", + Decoder: "syslog_rfc3164", + }, + input: []byte(`{"service":"test","log":"<34>Oct 5 22:14:15 mymachine.example.com myproc[10]: 'myproc' failed on 
/dev/pts/8"}`), + want: map[string]string{ + "service": "test", + "priority": "34", + "facility": "4", + "severity": "2", + "timestamp": "Oct 5 22:14:15", + "hostname": "mymachine.example.com", + "app_name": "myproc", + "process_id": "10", + "message": "'myproc' failed on /dev/pts/8", + }, + }, + { + name: "syslog_rfc3164_prefix", + config: &Config{ + Field: "log", + Decoder: "syslog_rfc3164", + Prefix: "p_", + }, + input: []byte(`{"service":"test","log":"<34>Oct 5 22:14:15 mymachine.example.com myproc[10]: 'myproc' failed on /dev/pts/8"}`), + want: map[string]string{ + "service": "test", + "p_priority": "34", + "p_facility": "4", + "p_severity": "2", + "p_timestamp": "Oct 5 22:14:15", + "p_hostname": "mymachine.example.com", + "p_app_name": "myproc", + "p_process_id": "10", + "p_message": "'myproc' failed on /dev/pts/8", + }, + }, + { + name: "syslog_rfc3164_priority_format", + config: &Config{ + Field: "log", + Decoder: "syslog_rfc3164", + Params: map[string]any{ + "syslog_facility_format": "string", + "syslog_severity_format": "string", + }, + }, + input: []byte(`{"service":"test","log":"<34>Oct 5 22:14:15 mymachine.example.com myproc[10]: 'myproc' failed on /dev/pts/8"}`), + want: map[string]string{ + "service": "test", + "priority": "34", + "facility": "AUTH", + "severity": "CRIT", + "timestamp": "Oct 5 22:14:15", + "hostname": "mymachine.example.com", + "app_name": "myproc", + "process_id": "10", + "message": "'myproc' failed on /dev/pts/8", + }, + }, + { + name: "syslog_rfc5424", + config: &Config{ + Field: "log", + Decoder: "syslog_rfc5424", + }, + input: []byte(`{"service":"test","log":"<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10 ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"] An application event log"}`), + want: map[string]string{ + "service": "test", + "priority": "165", + "facility": "20", + "severity": "5", + "proto_version": "1", + "timestamp": "2003-10-11T22:14:15.003Z", + "hostname": 
"mymachine.example.com", + "app_name": "myproc", + "process_id": "10", + "message_id": "ID47", + "message": "An application event log", + "exampleSDID@32473.iut": "3", + "exampleSDID@32473.eventSource": "Application", + "exampleSDID@32473.eventID": "1011", + }, + }, + { + name: "syslog_rfc5424_prefix", + config: &Config{ + Field: "log", + Decoder: "syslog_rfc5424", + Prefix: "p_", + }, + input: []byte(`{"service":"test","log":"<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10 ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"] An application event log"}`), + want: map[string]string{ + "service": "test", + "p_priority": "165", + "p_facility": "20", + "p_severity": "5", + "p_proto_version": "1", + "p_timestamp": "2003-10-11T22:14:15.003Z", + "p_hostname": "mymachine.example.com", + "p_app_name": "myproc", + "p_process_id": "10", + "p_message_id": "ID47", + "p_message": "An application event log", + "exampleSDID@32473.iut": "3", + "exampleSDID@32473.eventSource": "Application", + "exampleSDID@32473.eventID": "1011", + }, + }, + { + name: "syslog_rfc5424_priority_format", + config: &Config{ + Field: "log", + Decoder: "syslog_rfc5424", + Params: map[string]any{ + "syslog_facility_format": "string", + "syslog_severity_format": "string", + }, + }, + input: []byte(`{"service":"test","log":"<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc - ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"]"}`), + want: map[string]string{ + "service": "test", + "priority": "165", + "facility": "LOCAL4", + "severity": "NOTICE", + "proto_version": "1", + "timestamp": "2003-10-11T22:14:15.003Z", + "hostname": "mymachine.example.com", + "app_name": "myproc", + "message_id": "ID47", + "exampleSDID@32473.iut": "3", + "exampleSDID@32473.eventSource": "Application", + "exampleSDID@32473.eventID": "1011", + }, + }, { name: "keep_origin", config: &Config{