From b4c14704f2f1fe9a6b60f2fe36cc5d8d4f092f17 Mon Sep 17 00:00:00 2001 From: Batyrkhan Koshenov Date: Mon, 31 Oct 2022 13:24:38 +0600 Subject: [PATCH] producer, utils: refactor old funcs for new FlowSets field type Signed-off-by: Batyrkhan Koshenov --- producer/producer_nf.go | 50 +----- producer/producer_nflegacy.go | 2 +- producer/producer_sf.go | 11 +- producer/producer_test.go | 18 ++- utils/netflow.go | 295 +++++++++++++++++----------------- 5 files changed, 174 insertions(+), 202 deletions(-) diff --git a/producer/producer_nf.go b/producer/producer_nf.go index 53da126..4369bbe 100644 --- a/producer/producer_nf.go +++ b/producer/producer_nf.go @@ -34,7 +34,7 @@ func CreateSamplingSystem() SamplingRateSystem { func (s *basicSamplingRateSystem) AddSamplingRate(version uint16, obsDomainId uint32, samplingRate uint32) { s.samplinglock.Lock() _, exists := s.sampling[version] - if exists != true { + if !exists { s.sampling[version] = make(map[uint32]uint32) } s.sampling[version][obsDomainId] = samplingRate @@ -119,7 +119,7 @@ func DecodeUNumber(b []byte, out interface{}) error { iter++ } } else { - return errors.New(fmt.Sprintf("Non-regular number of bytes for a number: %v", l)) + return fmt.Errorf("non-regular number of bytes for a number: %v", l) } } switch t := out.(type) { @@ -132,7 +132,7 @@ func DecodeUNumber(b []byte, out interface{}) error { case *uint64: *t = o default: - return errors.New("The parameter is not a pointer to a byte/uint16/uint32/uint64 structure") + return errors.New("the parameter is not a pointer to a byte/uint16/uint32/uint64 structure") } return nil } @@ -150,13 +150,9 @@ func ConvertNetFlowDataSet(version uint16, baseTime uint32, uptime uint32, recor for i := range record { df := record[i] - v, ok := df.Value.([]byte) - if !ok { - continue - } + v := df.Value switch df.Type { - // Statistics case netflow.NFV9_FIELD_IN_BYTES: DecodeUNumber(v, &(flowMessage.Bytes)) @@ -382,43 +378,11 @@ func SearchNetFlowOptionDataSets(dataFlowSet []netflow.OptionsDataFlowSet) (uint } func SplitNetFlowSets(packetNFv9 netflow.NFv9Packet) ([]netflow.DataFlowSet, []netflow.TemplateFlowSet, []netflow.NFv9OptionsTemplateFlowSet, []netflow.OptionsDataFlowSet) { - dataFlowSet := make([]netflow.DataFlowSet, 0) - templatesFlowSet := make([]netflow.TemplateFlowSet, 0) - optionsTemplatesFlowSet := make([]netflow.NFv9OptionsTemplateFlowSet, 0) - optionsDataFlowSet := make([]netflow.OptionsDataFlowSet, 0) - for _, flowSet := range packetNFv9.FlowSets { - switch flowSet.(type) { - case netflow.TemplateFlowSet: - templatesFlowSet = append(templatesFlowSet, flowSet.(netflow.TemplateFlowSet)) - case netflow.NFv9OptionsTemplateFlowSet: - optionsTemplatesFlowSet = append(optionsTemplatesFlowSet, flowSet.(netflow.NFv9OptionsTemplateFlowSet)) - case netflow.DataFlowSet: - dataFlowSet = append(dataFlowSet, flowSet.(netflow.DataFlowSet)) - case netflow.OptionsDataFlowSet: - optionsDataFlowSet = append(optionsDataFlowSet, flowSet.(netflow.OptionsDataFlowSet)) - } - } - return dataFlowSet, templatesFlowSet, optionsTemplatesFlowSet, optionsDataFlowSet + return packetNFv9.DataFS, packetNFv9.TemplateFS, packetNFv9.NFv9OptionsTemplateFS, packetNFv9.OptionsDataFS } func SplitIPFIXSets(packetIPFIX netflow.IPFIXPacket) ([]netflow.DataFlowSet, []netflow.TemplateFlowSet, []netflow.IPFIXOptionsTemplateFlowSet, []netflow.OptionsDataFlowSet) { - dataFlowSet := make([]netflow.DataFlowSet, 0) - templatesFlowSet := make([]netflow.TemplateFlowSet, 0) - optionsTemplatesFlowSet := make([]netflow.IPFIXOptionsTemplateFlowSet, 0) - optionsDataFlowSet := make([]netflow.OptionsDataFlowSet, 0) - for _, flowSet := range packetIPFIX.FlowSets { - switch flowSet.(type) { - case netflow.TemplateFlowSet: - templatesFlowSet = append(templatesFlowSet, flowSet.(netflow.TemplateFlowSet)) - case netflow.IPFIXOptionsTemplateFlowSet: - optionsTemplatesFlowSet = append(optionsTemplatesFlowSet, flowSet.(netflow.IPFIXOptionsTemplateFlowSet)) - case netflow.DataFlowSet: - dataFlowSet = append(dataFlowSet, flowSet.(netflow.DataFlowSet)) - case netflow.OptionsDataFlowSet: - optionsDataFlowSet = append(optionsDataFlowSet, flowSet.(netflow.OptionsDataFlowSet)) - } - } - return dataFlowSet, templatesFlowSet, optionsTemplatesFlowSet, optionsDataFlowSet + return packetIPFIX.DataFS, packetIPFIX.TemplateFS, packetIPFIX.IPFIXOptionsTemplateFS, packetIPFIX.OptionsDataFS } // Convert a NetFlow datastructure to a FlowMessage protobuf @@ -474,7 +438,7 @@ func ProcessMessageNetFlow(msgDec interface{}, samplingRateSys SamplingRateSyste fmsg.SamplingRate = uint64(samplingRate) } default: - return flowMessageSet, errors.New("Bad NetFlow/IPFIX version") + return flowMessageSet, errors.New("bad NetFlow/IPFIX version") } return flowMessageSet, nil diff --git a/producer/producer_nflegacy.go b/producer/producer_nflegacy.go index 990da0e..3dd1a94 100644 --- a/producer/producer_nflegacy.go +++ b/producer/producer_nflegacy.go @@ -74,6 +74,6 @@ func ProcessMessageNetFlowLegacy(msgDec interface{}) ([]*flowmessage.FlowMessage return flowMessageSet, nil default: - return []*flowmessage.FlowMessage{}, errors.New("Bad NetFlow v5 version") + return []*flowmessage.FlowMessage{}, errors.New("bad NetFlow v5 version") } } diff --git a/producer/producer_sf.go b/producer/producer_sf.go index 9217c67..47cfb33 100644 --- a/producer/producer_sf.go +++ b/producer/producer_sf.go @@ -319,9 +319,11 @@ func ParseSampledHeaderConfig(flowMessage *flowmessage.FlowMessage, sampledHeade return nil } +/* func SearchSFlowSamples(samples []interface{}) []*flowmessage.FlowMessage { return SearchSFlowSamples(samples) } +*/ func SearchSFlowSamplesConfig(samples []interface{}, config *SFlowProducerConfig, agent net.IP) []*flowmessage.FlowMessage { flowMessageSet := make([]*flowmessage.FlowMessage, 0) @@ -345,9 +347,10 @@ func SearchSFlowSamplesConfig(samples []interface{}, config *SFlowProducerConfig flowMessage.OutIf = flowSample.OutputIfValue } - ipNh := net.IP{} - ipSrc := net.IP{} - ipDst := net.IP{} + var ( + ipNh, ipSrc, ipDst net.IP + ) + flowMessage.Packets = 1 for _, record := range records { switch recordData := record.Data.(type) { @@ -425,6 +428,6 @@ func ProcessMessageSFlowConfig(msgDec interface{}, config *SFlowProducerConfig) return flowMessageSet, nil default: - return []*flowmessage.FlowMessage{}, errors.New("Bad sFlow version") + return []*flowmessage.FlowMessage{}, errors.New("bad sFlow version") } } diff --git a/producer/producer_test.go b/producer/producer_test.go index fcefb3e..c2269f0 100644 --- a/producer/producer_test.go +++ b/producer/producer_test.go @@ -10,30 +10,32 @@ import ( func TestProcessMessageNetFlow(t *testing.T) { records := []netflow.DataRecord{ - netflow.DataRecord{ + { Values: []netflow.DataField{ - netflow.DataField{ + { Type: netflow.NFV9_FIELD_IPV4_SRC_ADDR, Value: []byte{10, 0, 0, 1}, }, }, }, } - dfs := []interface{}{ - netflow.DataFlowSet{ + + dfs := []netflow.DataFlowSet{ + { Records: records, }, } pktnf9 := netflow.NFv9Packet{ - FlowSets: dfs, + FlowSets: netflow.FlowSets{DataFS: dfs}, } + testsr := &SingleSamplingRateSystem{1} _, err := ProcessMessageNetFlow(pktnf9, testsr) assert.Nil(t, err) pktipfix := netflow.IPFIXPacket{ - FlowSets: dfs, + FlowSets: netflow.FlowSets{DataFS: dfs}, } _, err = ProcessMessageNetFlow(pktipfix, testsr) assert.Nil(t, err) @@ -58,7 +60,7 @@ func TestProcessMessageSFlow(t *testing.T) { sflow.FlowSample{ SamplingRate: 1, Records: []sflow.FlowRecord{ - sflow.FlowRecord{ + { Data: sh, }, }, @@ -66,7 +68,7 @@ func TestProcessMessageSFlow(t *testing.T) { sflow.ExpandedFlowSample{ SamplingRate: 1, Records: []sflow.FlowRecord{ - sflow.FlowRecord{ + { Data: sh, }, }, diff --git a/utils/netflow.go b/utils/netflow.go index e1c2b11..9a98fb1 100644 --- a/utils/netflow.go +++ b/utils/netflow.go @@ -96,7 +96,10 @@ func (s *StateNetFlow) DecodeFlow(msg interface{}) error { } timeTrackStart := time.Now() - msgDec, err := netflow.DecodeMessage(buf, templates) + + fm := netflow.FlowMessage{} + err := fm.Decode(buf, templates) + if err != nil { switch err.(type) { case *netflow.ErrorVersion: @@ -133,8 +136,7 @@ func (s *StateNetFlow) DecodeFlow(msg interface{}) error { flowMessageSet := make([]*flowmessage.FlowMessage, 0) - switch msgDecConv := msgDec.(type) { - case netflow.NFv9Packet: + if fm.Version == 9 { NetFlowStats.With( prometheus.Labels{ "router": key, @@ -142,77 +144,78 @@ func (s *StateNetFlow) DecodeFlow(msg interface{}) error { }). Inc() - for _, fs := range msgDecConv.FlowSets { - switch fsConv := fs.(type) { - case netflow.TemplateFlowSet: - NetFlowSetStatsSum.With( - prometheus.Labels{ - "router": key, - "version": "9", - "type": "TemplateFlowSet", - }). - Inc() - - NetFlowSetRecordsStatsSum.With( - prometheus.Labels{ - "router": key, - "version": "9", - "type": "OptionsTemplateFlowSet", - }). - Add(float64(len(fsConv.Records))) - - case netflow.NFv9OptionsTemplateFlowSet: - NetFlowSetStatsSum.With( - prometheus.Labels{ - "router": key, - "version": "9", - "type": "OptionsTemplateFlowSet", - }). - Inc() - - NetFlowSetRecordsStatsSum.With( - prometheus.Labels{ - "router": key, - "version": "9", - "type": "OptionsTemplateFlowSet", - }). - Add(float64(len(fsConv.Records))) - - case netflow.OptionsDataFlowSet: - NetFlowSetStatsSum.With( - prometheus.Labels{ - "router": key, - "version": "9", - "type": "OptionsDataFlowSet", - }). - Inc() - - NetFlowSetRecordsStatsSum.With( - prometheus.Labels{ - "router": key, - "version": "9", - "type": "OptionsDataFlowSet", - }). - Add(float64(len(fsConv.Records))) - case netflow.DataFlowSet: - NetFlowSetStatsSum.With( - prometheus.Labels{ - "router": key, - "version": "9", - "type": "DataFlowSet", - }). - Inc() - - NetFlowSetRecordsStatsSum.With( - prometheus.Labels{ - "router": key, - "version": "9", - "type": "DataFlowSet", - }). - Add(float64(len(fsConv.Records))) - } + for _, fs := range fm.PacketNFv9.TemplateFS { + NetFlowSetStatsSum.With( + prometheus.Labels{ + "router": key, + "version": "9", + "type": "TemplateFlowSet", + }). + Inc() + + NetFlowSetRecordsStatsSum.With( + prometheus.Labels{ + "router": key, + "version": "9", + "type": "OptionsTemplateFlowSet", + }). + Add(float64(len(fs.Records))) + } + + for _, fs := range fm.PacketNFv9.NFv9OptionsTemplateFS { + NetFlowSetStatsSum.With( + prometheus.Labels{ + "router": key, + "version": "9", + "type": "OptionsTemplateFlowSet", + }). + Inc() + + NetFlowSetRecordsStatsSum.With( + prometheus.Labels{ + "router": key, + "version": "9", + "type": "OptionsTemplateFlowSet", + }). + Add(float64(len(fs.Records))) + } + + for _, fs := range fm.PacketNFv9.OptionsDataFS { + NetFlowSetStatsSum.With( + prometheus.Labels{ + "router": key, + "version": "9", + "type": "OptionsDataFlowSet", + }). + Inc() + + NetFlowSetRecordsStatsSum.With( + prometheus.Labels{ + "router": key, + "version": "9", + "type": "OptionsDataFlowSet", + }). + Add(float64(len(fs.Records))) + } + + for _, fs := range fm.PacketNFv9.DataFS { + NetFlowSetStatsSum.With( + prometheus.Labels{ + "router": key, + "version": "9", + "type": "DataFlowSet", + }). + Inc() + + NetFlowSetRecordsStatsSum.With( + prometheus.Labels{ + "router": key, + "version": "9", + "type": "DataFlowSet", + }). + Add(float64(len(fs.Records))) } - flowMessageSet, err = producer.ProcessMessageNetFlow(msgDecConv, sampling) + flowMessageSet, err = producer.ProcessMessageNetFlow(fm.PacketNFv9, sampling) for _, fmsg := range flowMessageSet { fmsg.TimeReceived = ts @@ -225,7 +228,7 @@ func (s *StateNetFlow) DecodeFlow(msg interface{}) error { }). Observe(float64(timeDiff)) } - case netflow.IPFIXPacket: + } else if fm.Version == 10 { NetFlowStats.With( prometheus.Labels{ "router": key, @@ -233,79 +236,79 @@ func (s *StateNetFlow) DecodeFlow(msg interface{}) error { }). Inc() - for _, fs := range msgDecConv.FlowSets { - switch fsConv := fs.(type) { - case netflow.TemplateFlowSet: - NetFlowSetStatsSum.With( - prometheus.Labels{ - "router": key, - "version": "10", - "type": "TemplateFlowSet", - }). - Inc() - - NetFlowSetRecordsStatsSum.With( - prometheus.Labels{ - "router": key, - "version": "10", - "type": "TemplateFlowSet", - }). - Add(float64(len(fsConv.Records))) - - case netflow.IPFIXOptionsTemplateFlowSet: - NetFlowSetStatsSum.With( - prometheus.Labels{ - "router": key, - "version": "10", - "type": "OptionsTemplateFlowSet", - }). - Inc() - - NetFlowSetRecordsStatsSum.With( - prometheus.Labels{ - "router": key, - "version": "10", - "type": "OptionsTemplateFlowSet", - }). - Add(float64(len(fsConv.Records))) - - case netflow.OptionsDataFlowSet: - - NetFlowSetStatsSum.With( - prometheus.Labels{ - "router": key, - "version": "10", - "type": "OptionsDataFlowSet", - }). - Inc() - - NetFlowSetRecordsStatsSum.With( - prometheus.Labels{ - "router": key, - "version": "10", - "type": "OptionsDataFlowSet", - }). - Add(float64(len(fsConv.Records))) - - case netflow.DataFlowSet: - NetFlowSetStatsSum.With( - prometheus.Labels{ - "router": key, - "version": "10", - "type": "DataFlowSet", - }). - Inc() - - NetFlowSetRecordsStatsSum.With( - prometheus.Labels{ - "router": key, - "version": "10", - "type": "DataFlowSet", - }). - Add(float64(len(fsConv.Records))) - } + for _, fs := range fm.PacketIPFIX.TemplateFS { + NetFlowSetStatsSum.With( + prometheus.Labels{ + "router": key, + "version": "10", + "type": "TemplateFlowSet", + }). + Inc() + + NetFlowSetRecordsStatsSum.With( + prometheus.Labels{ + "router": key, + "version": "10", + "type": "TemplateFlowSet", + }). + Add(float64(len(fs.Records))) + } + + for _, fs := range fm.PacketIPFIX.IPFIXOptionsTemplateFS { + NetFlowSetStatsSum.With( + prometheus.Labels{ + "router": key, + "version": "10", + "type": "OptionsTemplateFlowSet", + }). + Inc() + + NetFlowSetRecordsStatsSum.With( + prometheus.Labels{ + "router": key, + "version": "10", + "type": "OptionsTemplateFlowSet", + }). + Add(float64(len(fs.Records))) } - flowMessageSet, err = producer.ProcessMessageNetFlow(msgDecConv, sampling) + + for _, fs := range fm.PacketIPFIX.OptionsDataFS { + NetFlowSetStatsSum.With( + prometheus.Labels{ + "router": key, + "version": "10", + "type": "OptionsDataFlowSet", + }). + Inc() + + NetFlowSetRecordsStatsSum.With( + prometheus.Labels{ + "router": key, + "version": "10", + "type": "OptionsDataFlowSet", + }). + Add(float64(len(fs.Records))) + } + + for _, fs := range fm.PacketIPFIX.DataFS { + NetFlowSetStatsSum.With( + prometheus.Labels{ + "router": key, + "version": "10", + "type": "DataFlowSet", + }). + Inc() + + NetFlowSetRecordsStatsSum.With( + prometheus.Labels{ + "router": key, + "version": "10", + "type": "DataFlowSet", + }). + Add(float64(len(fs.Records))) + } + + flowMessageSet, err = producer.ProcessMessageNetFlow(fm.PacketIPFIX, sampling) for _, fmsg := range flowMessageSet { fmsg.TimeReceived = ts