diff --git a/cmd/enricher/main.go b/cmd/enricher/main.go index 28b90db3..23ef5db7 100644 --- a/cmd/enricher/main.go +++ b/cmd/enricher/main.go @@ -19,7 +19,8 @@ import ( // import various formatters "github.com/netsampler/goflow2/format" - "github.com/netsampler/goflow2/format/json" + "github.com/netsampler/goflow2/format/common" + _ "github.com/netsampler/goflow2/format/json" _ "github.com/netsampler/goflow2/format/protobuf" // import various transports @@ -85,8 +86,8 @@ func MapFlow(dbAsn, dbCountry *geoip2.Reader, msg *flowmessage.FlowMessageExt) { } func init() { - json.AddJSONField("SrcCountry", json.FORMAT_TYPE_STRING) - json.AddJSONField("DstCountry", json.FORMAT_TYPE_STRING) + common.AddTextField("SrcCountry", common.FORMAT_TYPE_STRING) + common.AddTextField("DstCountry", common.FORMAT_TYPE_STRING) } func main() { diff --git a/cmd/goflow2/main.go b/cmd/goflow2/main.go index 0bca1e92..905b3a34 100644 --- a/cmd/goflow2/main.go +++ b/cmd/goflow2/main.go @@ -15,6 +15,7 @@ import ( "github.com/netsampler/goflow2/format" _ "github.com/netsampler/goflow2/format/json" _ "github.com/netsampler/goflow2/format/protobuf" + _ "github.com/netsampler/goflow2/format/text" // import various transports "github.com/netsampler/goflow2/transport" @@ -40,7 +41,6 @@ var ( Format = flag.String("format", "json", fmt.Sprintf("Choose the format (available: %s)", strings.Join(format.GetFormats(), ", "))) Transport = flag.String("transport", "file", fmt.Sprintf("Choose the transport (available: %s)", strings.Join(transport.GetTransports(), ", "))) - //FixedLength = flag.Bool("proto.fixedlen", false, "Enable fixed length protobuf") MetricsAddr = flag.String("metrics.addr", ":8080", "Metrics address") MetricsPath = flag.String("metrics.path", "/metrics", "Metrics path") diff --git a/format/common/hash.go b/format/common/hash.go new file mode 100644 index 00000000..1d901860 --- /dev/null +++ b/format/common/hash.go @@ -0,0 +1,56 @@ +package common + +import ( + "flag" + "fmt" + "reflect" + "strings" + "sync" +) + +var ( + fieldsVar string + fields []string // Hashing fields + + hashDeclared bool + hashDeclaredLock = &sync.Mutex{} +) + +func HashFlag() { + hashDeclaredLock.Lock() + defer hashDeclaredLock.Unlock() + + if hashDeclared { + return + } + hashDeclared = true + flag.StringVar(&fieldsVar, "format.hash", "SamplerAddress", "List of fields to do hashing, separated by commas") + +} + +func ManualHashInit() error { + fields = strings.Split(fieldsVar, ",") + return nil +} + +func HashProtoLocal(msg interface{}) string { + return HashProto(fields, msg) +} + +func HashProto(fields []string, msg interface{}) string { + var keyStr string + + if msg != nil { + vfm := reflect.ValueOf(msg) + vfm = reflect.Indirect(vfm) + + for _, kf := range fields { + fieldValue := vfm.FieldByName(kf) + if fieldValue.IsValid() { + keyStr += fmt.Sprintf("%v-", fieldValue) + } + } + } + + return keyStr +} diff --git a/format/common/selector.go b/format/common/selector.go new file mode 100644 index 00000000..5c607998 --- /dev/null +++ b/format/common/selector.go @@ -0,0 +1,38 @@ +package common + +import ( + "flag" + "strings" + "sync" +) + +var ( + selectorVar string + selector []string // Hashing fields + selectorMap = make(map[string]bool) + + selectorDeclared bool + selectorDeclaredLock = &sync.Mutex{} +) + +func SelectorFlag() { + selectorDeclaredLock.Lock() + defer selectorDeclaredLock.Unlock() + + if selectorDeclared { + return + } + selectorDeclared = true + flag.StringVar(&selectorVar, "format.selector", "", "List of fields to do keep in output") +} + +func ManualSelectorInit() error { + if selectorVar == "" { + return nil + } + selector = strings.Split(selectorVar, ",") + for _, v := range selector { + selectorMap[v] = true + } + return nil +} diff --git a/format/common/text.go b/format/common/text.go new file mode 100644 index 00000000..23f7cf04 --- /dev/null +++ b/format/common/text.go @@ -0,0 +1,263 @@ +package common + +import ( + "encoding/binary" + "fmt" + "github.com/golang/protobuf/proto" + "net" + "reflect" + "strings" +) + +const ( + FORMAT_TYPE_UNKNOWN = iota + FORMAT_TYPE_STRING_FUNC + FORMAT_TYPE_STRING + FORMAT_TYPE_INTEGER + FORMAT_TYPE_IP + FORMAT_TYPE_MAC +) + +var ( + EtypeName = map[uint32]string{ + 0x806: "ARP", + 0x800: "IPv4", + 0x86dd: "IPv6", + } + ProtoName = map[uint32]string{ + 1: "ICMP", + 6: "TCP", + 17: "UDP", + 58: "ICMPv6", + } + IcmpTypeName = map[uint32]string{ + 0: "EchoReply", + 3: "DestinationUnreachable", + 8: "Echo", + 9: "RouterAdvertisement", + 10: "RouterSolicitation", + 11: "TimeExceeded", + } + Icmp6TypeName = map[uint32]string{ + 1: "DestinationUnreachable", + 2: "PacketTooBig", + 3: "TimeExceeded", + 128: "EchoRequest", + 129: "EchoReply", + 133: "RouterSolicitation", + 134: "RouterAdvertisement", + } + + TextFields = []string{ + "Type", + "TimeReceived", + "SequenceNum", + "SamplingRate", + "SamplerAddress", + "TimeFlowStart", + "TimeFlowEnd", + "Bytes", + "Packets", + "SrcAddr", + "DstAddr", + "Etype", + "Proto", + "SrcPort", + "DstPort", + "InIf", + "OutIf", + "SrcMac", + "DstMac", + "SrcVlan", + "DstVlan", + "VlanId", + "IngressVrfID", + "EgressVrfID", + "IPTos", + "ForwardingStatus", + "IPTTL", + "TCPFlags", + "IcmpType", + "IcmpCode", + "IPv6FlowLabel", + "FragmentId", + "FragmentOffset", + "BiFlowDirection", + "SrcAS", + "DstAS", + "NextHop", + "NextHopAS", + "SrcNet", + "DstNet", + } + TextFieldsTypes = []int{ + FORMAT_TYPE_STRING_FUNC, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_IP, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_IP, + FORMAT_TYPE_IP, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_MAC, + FORMAT_TYPE_MAC, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_IP, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + FORMAT_TYPE_INTEGER, + } + RenderExtras = []string{ + "EtypeName", + "ProtoName", + "IcmpName", + } + RenderExtraCall = []RenderExtraFunction{ + RenderExtraFunctionEtypeName, + RenderExtraFunctionProtoName, + RenderExtraFunctionIcmpName, + } +) + +func AddTextField(name string, jtype int) { + TextFields = append(TextFields, name) + TextFieldsTypes = append(TextFieldsTypes, jtype) +} + +type RenderExtraFunction func(proto.Message) string + +func RenderExtraFetchNumbers(msg proto.Message, fields []string) []uint64 { + vfm := reflect.ValueOf(msg) + vfm = reflect.Indirect(vfm) + + values := make([]uint64, len(fields)) + for i, kf := range fields { + fieldValue := vfm.FieldByName(kf) + if fieldValue.IsValid() { + values[i] = fieldValue.Uint() + } + } + + return values +} + +func RenderExtraFunctionEtypeName(msg proto.Message) string { + num := RenderExtraFetchNumbers(msg, []string{"Etype"}) + return EtypeName[uint32(num[0])] +} + +func RenderExtraFunctionProtoName(msg proto.Message) string { + num := RenderExtraFetchNumbers(msg, []string{"Proto"}) + return ProtoName[uint32(num[0])] +} +func RenderExtraFunctionIcmpName(msg proto.Message) string { + num := RenderExtraFetchNumbers(msg, []string{"Proto", "IcmpCode", "IcmpType"}) + return IcmpCodeType(uint32(num[0]), uint32(num[1]), uint32(num[2])) +} + +func IcmpCodeType(proto, icmpCode, icmpType uint32) string { + if proto == 1 { + return IcmpTypeName[icmpType] + } else if proto == 58 { + return Icmp6TypeName[icmpType] + } + return "" +} + +func RenderIP(addr []byte) string { + if addr == nil || (len(addr) != 4 && len(addr) != 16) { + return "" + } + + return net.IP(addr).String() +} + +func FormatMessageReflectText(msg proto.Message, ext string) string { + return FormatMessageReflectCustom(msg, ext, "", " ", "=", false) +} + +func FormatMessageReflectJSON(msg proto.Message, ext string) string { + return fmt.Sprintf("{%s}", FormatMessageReflectCustom(msg, ext, "\"", ",", ":", true)) +} + +func FormatMessageReflectCustom(msg proto.Message, ext, quotes, sep, sign string, null bool) string { + fstr := make([]string, len(TextFields)+len(RenderExtras)) + + vfm := reflect.ValueOf(msg) + vfm = reflect.Indirect(vfm) + + var i int + for j, kf := range TextFields { + fieldValue := vfm.FieldByName(kf) + if fieldValue.IsValid() { + + switch TextFieldsTypes[j] { + case FORMAT_TYPE_STRING_FUNC: + strMethod := fieldValue.MethodByName("String").Call([]reflect.Value{}) + fstr[i] = fmt.Sprintf("%s%s%s%s%q", quotes, kf, quotes, sign, strMethod[0].String()) + case FORMAT_TYPE_STRING: + fstr[i] = fmt.Sprintf("%s%s%s%s%q", quotes, kf, quotes, sign, fieldValue.String()) + case FORMAT_TYPE_INTEGER: + fstr[i] = fmt.Sprintf("%s%s%s%s%d", quotes, kf, quotes, sign, fieldValue.Uint()) + case FORMAT_TYPE_IP: + ip := fieldValue.Bytes() + fstr[i] = fmt.Sprintf("%s%s%s%s%q", quotes, kf, quotes, sign, RenderIP(ip)) + case FORMAT_TYPE_MAC: + mac := make([]byte, 8) + binary.BigEndian.PutUint64(mac, fieldValue.Uint()) + fstr[i] = fmt.Sprintf("%s%s%s%s%q", quotes, kf, quotes, sign, net.HardwareAddr(mac[2:]).String()) + default: + if null { + fstr[i] = fmt.Sprintf("%s%s%s%snull", quotes, kf, quotes, sign) + } + } + + } else { + if null { + fstr[i] = fmt.Sprintf("%s%s%s%snull", quotes, kf, quotes, sign) + } + } + if len(selectorMap) == 0 || selectorMap[kf] { + i++ + } + + } + + for j, e := range RenderExtras { + fstr[i] = fmt.Sprintf("%s%s%s%s%q", quotes, e, quotes, sign, RenderExtraCall[j](msg)) + if len(selectorMap) == 0 || selectorMap[e] { + i++ + } + } + + if len(selectorMap) > 0 { + fstr = fstr[0:i] + } + + return strings.Join(fstr, sep) +} diff --git a/format/json/json.go b/format/json/json.go index 6e3205a2..f1113c52 100644 --- a/format/json/json.go +++ b/format/json/json.go @@ -2,215 +2,27 @@ package json import ( "context" - "encoding/binary" "fmt" "github.com/golang/protobuf/proto" "github.com/netsampler/goflow2/format" - "github.com/netsampler/goflow2/format/protobuf" - flowmessage "github.com/netsampler/goflow2/pb" - "net" - "reflect" - "strings" + "github.com/netsampler/goflow2/format/common" ) -const ( - FORMAT_TYPE_UNKNOWN = iota - FORMAT_TYPE_STRING_FUNC - FORMAT_TYPE_STRING - FORMAT_TYPE_INTEGER - FORMAT_TYPE_IP - FORMAT_TYPE_MAC -) - -var ( - EtypeName = map[uint32]string{ - 0x806: "ARP", - 0x800: "IPv4", - 0x86dd: "IPv6", - } - ProtoName = map[uint32]string{ - 1: "ICMP", - 6: "TCP", - 17: "UDP", - 58: "ICMPv6", - } - IcmpTypeName = map[uint32]string{ - 0: "EchoReply", - 3: "DestinationUnreachable", - 8: "Echo", - 9: "RouterAdvertisement", - 10: "RouterSolicitation", - 11: "TimeExceeded", - } - Icmp6TypeName = map[uint32]string{ - 1: "DestinationUnreachable", - 2: "PacketTooBig", - 3: "TimeExceeded", - 128: "EchoRequest", - 129: "EchoReply", - 133: "RouterSolicitation", - 134: "RouterAdvertisement", - } - - JsonFields = []string{ - "Type", - "TimeReceived", - "SequenceNum", - "SamplingRate", - "SamplerAddress", - "TimeFlowStart", - "TimeFlowEnd", - "Bytes", - "Packets", - "SrcAddr", - "DstAddr", - "Etype", - "Proto", - "SrcPort", - "DstPort", - "InIf", - "OutIf", - "SrcMac", - "DstMac", - "SrcVlan", - "DstVlan", - "VlanId", - "IngressVrfID", - "EgressVrfID", - "IPTos", - "ForwardingStatus", - "IPTTL", - "TCPFlags", - "IcmpType", - "IcmpCode", - "IPv6FlowLabel", - "FragmentId", - "FragmentOffset", - "BiFlowDirection", - "SrcAS", - "DstAS", - "NextHop", - "NextHopAS", - "SrcNet", - "DstNet", - } - JsonFieldsTypes = []int{ - FORMAT_TYPE_STRING_FUNC, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_IP, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_IP, - FORMAT_TYPE_IP, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_MAC, - FORMAT_TYPE_MAC, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_IP, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - FORMAT_TYPE_INTEGER, - } - JsonExtras = []string{ - "EtypeName", - "ProtoName", - "IcmpName", - } - JsonExtraCall = []JsonExtraFunction{ - JsonExtraFunctionEtypeName, - JsonExtraFunctionProtoName, - JsonExtraFunctionIcmpName, - } -) - -func AddJSONField(name string, jtype int) { - JsonFields = append(JsonFields, name) - JsonFieldsTypes = append(JsonFieldsTypes, jtype) -} - -type JsonExtraFunction func(proto.Message) string - -func JsonExtraFetchNumbers(msg proto.Message, fields []string) []uint64 { - vfm := reflect.ValueOf(msg) - vfm = reflect.Indirect(vfm) - - values := make([]uint64, len(fields)) - for i, kf := range fields { - fieldValue := vfm.FieldByName(kf) - if fieldValue.IsValid() { - values[i] = fieldValue.Uint() - } - } - - return values -} - -func JsonExtraFunctionEtypeName(msg proto.Message) string { - num := JsonExtraFetchNumbers(msg, []string{"Etype"}) - return EtypeName[uint32(num[0])] -} -func JsonExtraFunctionProtoName(msg proto.Message) string { - num := JsonExtraFetchNumbers(msg, []string{"Proto"}) - return ProtoName[uint32(num[0])] -} -func JsonExtraFunctionIcmpName(msg proto.Message) string { - num := JsonExtraFetchNumbers(msg, []string{"Proto", "IcmpCode", "IcmpType"}) - return IcmpCodeType(uint32(num[0]), uint32(num[1]), uint32(num[2])) -} - -func IcmpCodeType(proto, icmpCode, icmpType uint32) string { - if proto == 1 { - return IcmpTypeName[icmpType] - } else if proto == 58 { - return Icmp6TypeName[icmpType] - } - return "" -} - type JsonDriver struct { - fieldsVar string - fields []string // Hashing fields -} - -func RenderIP(addr []byte) string { - if addr == nil || (len(addr) != 4 && len(addr) != 16) { - return "" - } - - return net.IP(addr).String() } func (d *JsonDriver) Prepare() error { + common.HashFlag() + common.SelectorFlag() return nil } func (d *JsonDriver) Init(context.Context) error { - return protobuf.ManualInit() + err := common.ManualHashInit() + if err != nil { + return err + } + return common.ManualSelectorInit() } func (d *JsonDriver) Format(data interface{}) ([]byte, []byte, error) { @@ -219,151 +31,8 @@ func (d *JsonDriver) Format(data interface{}) ([]byte, []byte, error) { return nil, nil, fmt.Errorf("message is not protobuf") } - key := protobuf.HashProtoLocal(msg) - return []byte(key), []byte(FormatMessageReflect(msg, "")), nil -} - -func FormatMessageReflect(msg proto.Message, ext string) string { - fstr := make([]string, len(JsonFields)+len(JsonExtras)) - - vfm := reflect.ValueOf(msg) - vfm = reflect.Indirect(vfm) - - for i, kf := range JsonFields { - fieldValue := vfm.FieldByName(kf) - if fieldValue.IsValid() { - - switch JsonFieldsTypes[i] { - case FORMAT_TYPE_STRING_FUNC: - strMethod := fieldValue.MethodByName("String").Call([]reflect.Value{}) - fstr[i] = fmt.Sprintf("\"%s\":\"%s\"", kf, strMethod[0].String()) - case FORMAT_TYPE_STRING: - fstr[i] = fmt.Sprintf("\"%s\":\"%s\"", kf, fieldValue.String()) - case FORMAT_TYPE_INTEGER: - fstr[i] = fmt.Sprintf("\"%s\":%d", kf, fieldValue.Uint()) - case FORMAT_TYPE_IP: - ip := fieldValue.Bytes() - fstr[i] = fmt.Sprintf("\"%s\":\"%s\"", kf, RenderIP(ip)) - case FORMAT_TYPE_MAC: - mac := make([]byte, 8) - binary.BigEndian.PutUint64(mac, fieldValue.Uint()) - fstr[i] = fmt.Sprintf("\"%s\":\"%s\"", kf, net.HardwareAddr(mac[2:]).String()) - default: - fstr[i] = fmt.Sprintf("\"%s\":null", kf) - } - - } else { - fstr[i] = fmt.Sprintf("\"%s\":null", kf) - } - } - - for i, e := range JsonExtras { - fstr[i+len(JsonFields)] = fmt.Sprintf("\"%s\":\"%s\"", e, JsonExtraCall[i](msg)) - } - - return fmt.Sprintf("{%s}", strings.Join(fstr, ",")) -} - -func FormatMessage(msg *flowmessage.FlowMessage, ext string) string { - srcmac := make([]byte, 8) - dstmac := make([]byte, 8) - binary.BigEndian.PutUint64(srcmac, msg.SrcMac) - binary.BigEndian.PutUint64(dstmac, msg.DstMac) - srcmac = srcmac[2:8] - dstmac = dstmac[2:8] - - b := fmt.Sprintf( - "{"+ - "\"Type\":\"%v\","+ - "\"TimeReceived\":%d,"+ - "\"SequenceNum\":%d,"+ - "\"SamplingRate\":%d,"+ - "\"SamplerAddress\":\"%v\","+ - "\"TimeFlowStart\":%d,"+ - "\"TimeFlowEnd\":%d,"+ - "\"Bytes\":%d,"+ - "\"Packets\":%d,"+ - "\"SrcAddr\":\"%v\","+ - "\"DstAddr\":\"%v\","+ - "\"Etype\":%d,"+ - "\"EtypeName\":\"%s\","+ - "\"Proto\":%d,"+ - "\"ProtoName\":\"%s\","+ - "\"SrcPort\":%d,"+ - "\"DstPort\":%d,"+ - "\"InIf\":%d,"+ - "\"OutIf\":%d,"+ - "\"SrcMac\":\"%v\","+ - "\"DstMac\":\"%v\","+ - "\"SrcVlan\":%d,"+ - "\"DstVlan\":%d,"+ - "\"VlanId\":%d,"+ - "\"IngressVrfID\":%d,"+ - "\"EgressVrfID\":%d,"+ - "\"IPTos\":%d,"+ - "\"ForwardingStatus\":%d,"+ - "\"IPTTL\":%d,"+ - "\"TCPFlags\":%d,"+ - "\"IcmpType\":%d,"+ - "\"IcmpCode\":%d,"+ - "\"IcmpName\":\"%s\","+ - "\"IPv6FlowLabel\":%d,"+ - "\"FragmentId\":%d,"+ - "\"FragmentOffset\":%d,"+ - "\"BiFlowDirection\":\"%v\","+ - "\"SrcAS\":%d,"+ - "\"DstAS\":%d,"+ - "\"NextHop\":\"%v\","+ - "\"NextHopAS\":%d,"+ - "\"SrcNet\":%d,"+ - "\"DstNet\":%d"+ - "%s}", - msg.Type.String(), - msg.TimeReceived, - msg.SequenceNum, - msg.SamplingRate, - RenderIP(msg.SamplerAddress), - msg.TimeFlowStart, - msg.TimeFlowEnd, - msg.Bytes, - msg.Packets, - RenderIP(msg.SrcAddr), - RenderIP(msg.DstAddr), - msg.Etype, - EtypeName[msg.Etype], - msg.Proto, - ProtoName[msg.Proto], - msg.SrcPort, - msg.DstPort, - msg.InIf, - msg.OutIf, - net.HardwareAddr(srcmac).String(), - net.HardwareAddr(dstmac).String(), - msg.SrcVlan, - msg.DstVlan, - msg.VlanId, - msg.IngressVrfID, - msg.EgressVrfID, - msg.IPTos, - msg.ForwardingStatus, - msg.IPTTL, - msg.TCPFlags, - msg.IcmpType, - msg.IcmpCode, - IcmpCodeType(msg.Proto, msg.IcmpCode, msg.IcmpType), - msg.IPv6FlowLabel, - msg.FragmentId, - msg.FragmentOffset, - msg.BiFlowDirection, - msg.SrcAS, - msg.DstAS, - RenderIP(msg.NextHop), - msg.NextHopAS, - msg.SrcNet, - msg.DstNet, - ext) - - return b + key := common.HashProtoLocal(msg) + return []byte(key), []byte(common.FormatMessageReflectJSON(msg, "")), nil } func init() { diff --git a/format/protobuf/protobuf.go b/format/protobuf/protobuf.go index f8cc91f6..a59d1434 100644 --- a/format/protobuf/protobuf.go +++ b/format/protobuf/protobuf.go @@ -6,13 +6,7 @@ import ( "fmt" "github.com/golang/protobuf/proto" "github.com/netsampler/goflow2/format" - "reflect" - "strings" -) - -var ( - fieldsVar string - fields []string // Hashing fields + "github.com/netsampler/goflow2/format/common" ) type ProtobufDriver struct { @@ -20,18 +14,13 @@ type ProtobufDriver struct { } func (d *ProtobufDriver) Prepare() error { - flag.StringVar(&fieldsVar, "format.hash", "SamplerAddress", "List of fields to do hashing, separated by commas") + common.HashFlag() flag.BoolVar(&d.fixedLen, "format.protobuf.fixedlen", false, "Prefix the protobuf with message length") return nil } -func ManualInit() error { - fields = strings.Split(fieldsVar, ",") - return nil -} - func (d *ProtobufDriver) Init(context.Context) error { - return ManualInit() + return common.ManualHashInit() } func (d *ProtobufDriver) Format(data interface{}) ([]byte, []byte, error) { @@ -39,7 +28,7 @@ func (d *ProtobufDriver) Format(data interface{}) ([]byte, []byte, error) { if !ok { return nil, nil, fmt.Errorf("message is not protobuf") } - key := HashProtoLocal(msg) + key := common.HashProtoLocal(msg) if !d.fixedLen { b, err := proto.Marshal(msg) @@ -55,25 +44,3 @@ func init() { d := &ProtobufDriver{} format.RegisterFormatDriver("pb", d) } - -func HashProtoLocal(msg interface{}) string { - return HashProto(fields, msg) -} - -func HashProto(fields []string, msg interface{}) string { - var keyStr string - - if msg != nil { - vfm := reflect.ValueOf(msg) - vfm = reflect.Indirect(vfm) - - for _, kf := range fields { - fieldValue := vfm.FieldByName(kf) - if fieldValue.IsValid() { - keyStr += fmt.Sprintf("%v-", fieldValue) - } - } - } - - return keyStr -} diff --git a/format/text/text.go b/format/text/text.go new file mode 100644 index 00000000..dfb108e6 --- /dev/null +++ b/format/text/text.go @@ -0,0 +1,41 @@ +package text + +import ( + "context" + "fmt" + "github.com/golang/protobuf/proto" + "github.com/netsampler/goflow2/format" + "github.com/netsampler/goflow2/format/common" +) + +type TextDriver struct { +} + +func (d *TextDriver) Prepare() error { + common.HashFlag() + common.SelectorFlag() + return nil +} + +func (d *TextDriver) Init(context.Context) error { + err := common.ManualHashInit() + if err != nil { + return err + } + return common.ManualSelectorInit() +} + +func (d *TextDriver) Format(data interface{}) ([]byte, []byte, error) { + msg, ok := data.(proto.Message) + if !ok { + return nil, nil, fmt.Errorf("message is not protobuf") + } + + key := common.HashProtoLocal(msg) + return []byte(key), []byte(common.FormatMessageReflectText(msg, "")), nil +} + +func init() { + d := &TextDriver{} + format.RegisterFormatDriver("text", d) +}