diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..849ddff --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +dist/ diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index bf7a03f..0000000 --- a/Dockerfile +++ /dev/null @@ -1,19 +0,0 @@ -FROM golang:alpine as builder -ARG LDFLAGS="" - -RUN apk --update --no-cache add git build-base gcc - -COPY . /build -WORKDIR /build - -RUN go build -ldflags "${LDFLAGS}" -o goflow cmd/goflow/goflow.go - -FROM alpine:latest -ARG src_dir - -RUN apk update --no-cache && \ - adduser -S -D -H -h / flow -USER flow -COPY --from=builder /build/goflow / - -ENTRYPOINT ["./goflow"] diff --git a/Dockerfile.prod b/Dockerfile.prod deleted file mode 100644 index cbf760b..0000000 --- a/Dockerfile.prod +++ /dev/null @@ -1,17 +0,0 @@ -ARG src_uri=github.com/cloudflare/goflow - -FROM golang:alpine as builder -ARG src_uri - -RUN apk --update --no-cache add git && \ - go get -u $src_uri - -FROM alpine:latest -ARG src_uri - -RUN apk update --no-cache && \ - adduser -S -D -H -h / flow -USER flow -COPY --from=builder /go/bin/goflow / - -ENTRYPOINT ["./goflow"] diff --git a/LICENSE.txt b/LICENSE.txt deleted file mode 100644 index 7d190cc..0000000 --- a/LICENSE.txt +++ /dev/null @@ -1,11 +0,0 @@ -Copyright (c) 2018, Cloudflare. All rights reserved. - -Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: - -1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. - -2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. - -3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
diff --git a/Makefile b/Makefile deleted file mode 100644 index 5166b99..0000000 --- a/Makefile +++ /dev/null @@ -1,92 +0,0 @@ -EXTENSION ?= -DIST_DIR ?= dist/ -GOOS ?= linux -ARCH ?= $(shell uname -m) -BUILDINFOSDET ?= - -DOCKER_REPO := cloudflare/ -GOFLOW_NAME := goflow -GOFLOW_VERSION := $(shell git describe --tags $(git rev-list --tags --max-count=1)) -VERSION_PKG := $(shell echo $(GOFLOW_VERSION) | sed 's/^v//g') -ARCH := x86_64 -LICENSE := BSD-3 -URL := https://github.com/cloudflare/goflow -DESCRIPTION := GoFlow: an sFlow/IPFIX/NetFlow v9/v5 collector to Kafka -BUILDINFOS := ($(shell date +%FT%T%z)$(BUILDINFOSDET)) -LDFLAGS := '-X main.version=$(GOFLOW_VERSION) -X main.buildinfos=$(BUILDINFOS)' - -OUTPUT_GOFLOW := $(DIST_DIR)goflow-$(GOFLOW_VERSION)-$(GOOS)-$(ARCH)$(EXTENSION) - -OUTPUT_GOFLOW_LIGHT_SFLOW := $(DIST_DIR)goflow-sflow-$(GOFLOW_VERSION)-$(GOOS)-$(ARCH)$(EXTENSION) -OUTPUT_GOFLOW_LIGHT_NF := $(DIST_DIR)goflow-netflow-$(GOFLOW_VERSION)-$(GOOS)-$(ARCH)$(EXTENSION) -OUTPUT_GOFLOW_LIGHT_NFV5 := $(DIST_DIR)goflow-nflegacy-$(GOFLOW_VERSION)-$(GOOS)-$(ARCH)$(EXTENSION) - -.PHONY: all -all: test-race vet test - -.PHONY: proto -proto: - @echo generating protobuf - protoc --go_out=. --plugin=$(PROTOCPATH)protoc-gen-go pb/*.proto - -.PHONY: test -test: - @echo testing code - go test ./... - -.PHONY: vet -vet: - @echo checking code is vetted - go vet $(shell go list ./...) - -.PHONY: test-race -test-race: - @echo testing code for races - go test -race ./... - -.PHONY: prepare -prepare: - mkdir -p $(DIST_DIR) - -.PHONY: clean -clean: - rm -rf $(DIST_DIR) - -.PHONY: build-goflow -build-goflow: prepare - go build -ldflags $(LDFLAGS) -o $(OUTPUT_GOFLOW) cmd/goflow/goflow.go - -.PHONY: build-goflow-light -build-goflow-light: prepare - go build -ldflags $(LDFLAGS) -o $(OUTPUT_GOFLOW_LIGHT_SFLOW) cmd/csflow/csflow.go - go build -ldflags $(LDFLAGS) -o $(OUTPUT_GOFLOW_LIGHT_NF) cmd/cnetflow/cnetflow.go - go build -ldflags $(LDFLAGS) -o $(OUTPUT_GOFLOW_LIGHT_NFV5) cmd/cnflegacy/cnflegacy.go - -.PHONY: docker-goflow -docker-goflow: - docker build -t $(DOCKER_REPO)$(GOFLOW_NAME):$(GOFLOW_VERSION) --build-arg LDFLAGS=$(LDFLAGS) -f Dockerfile . - -.PHONY: package-deb-goflow -package-deb-goflow: prepare - fpm -s dir -t deb -n $(GOFLOW_NAME) -v $(VERSION_PKG) \ --description "$(DESCRIPTION)" \ --url "$(URL)" \ --architecture $(ARCH) \ --license "$(LICENSE)" \ --deb-no-default-config-files \ --package $(DIST_DIR) \ $(OUTPUT_GOFLOW)=/usr/bin/goflow \ package/goflow.service=/lib/systemd/system/goflow.service \ package/goflow.env=/etc/default/goflow - -.PHONY: package-rpm-goflow -package-rpm-goflow: prepare - fpm -s dir -t rpm -n $(GOFLOW_NAME) -v $(VERSION_PKG) \ --description "$(DESCRIPTION)" \ --url "$(URL)" \ --architecture $(ARCH) \ --license "$(LICENSE) "\ --package $(DIST_DIR) \ $(OUTPUT_GOFLOW)=/usr/bin/goflow \ package/goflow.service=/lib/systemd/system/goflow.service \ package/goflow.env=/etc/default/goflow diff --git a/README.md b/README.md index 00cbb7d..6abedb4 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,24 @@ -# GoFlow +# GoFlow ClickHouse -This application is a NetFlow/IPFIX/sFlow collector in Go. +This is a fork of Cloudflare's GoFlow, a NetFlow/IPFIX/sFlow collector in Go. It gathers network information (IP, interfaces, routers) from different flow protocols, -serializes it in a protobuf format and sends the messages to Kafka using Sarama's library
+serializes it in a protobuf format and ~~sends the messages to Kafka using Sarama's library~~ stores the indexed data into [ClickHouse](https://clickhouse.tech/), +a FOSS, blazing-fast, column-oriented database that is great for persistent storage of repetitive data. + +Just to put Java out of the loop :) + +If ClickHouse runs out of RAM during a search at any point, simply put `2` in its config file. + +(You will need to set up ClickHouse separately) + +## TL;DR / quick start + +To quickly get started, simply run `make build-goflow` and grab the binary from the `dist/` folder. + +## Limitations of the ClickHouse fork + +Addresses are stored as IPv4 (for IPv6 flows only the first four bytes are kept, see `transport/clickhouse.go`), and MPLS data is not recorded. However, it would be easy to change the code to fit your needs: simply modify the schema and the publish functions. ## Why @@ -21,8 +36,7 @@ which contains the fields a network engineer is interested in. The flow packets usually contains multiples samples This acts as an abstraction of a sample. -The `transport` provides different way of processing the protobuf. Either sending it via Kafka or -print it on the console. +The `transport` provides ClickHouse storage. Finally, `utils` provide functions that are directly used by the CLI utils. GoFlow is a wrapper of all the functions and chains thems into producing bytes into Kafka. @@ -60,7 +74,6 @@ Collection: Production: * Convert to protobuf -* Sends to Kafka producer * Prints to the console Monitoring: @@ -77,138 +90,10 @@ Download the latest release and just run the following command: ``` ./goflow -h ``` - -Enable or disable a protocol using `-nf=false` or `-sflow=false`. -Define the port and addresses of the protocols using `-nf.addr`, `-nf.port` for NetFlow and `-sflow.addr`, `-slow.port` for sFlow. - -Set the brokers or the Kafka brokers SRV record using: `-kafka.brokers 127.0.0.1:9092,[::1]:9092` or `-kafka.srv`. -Disable Kafka sending `-kafka=false`. -You can hash the protobuf by key when you send it to Kafka. - -You can collect NetFlow/IPFIX, NetFlow v5 and sFlow using the same collector -or use the single-protocol collectors. - -You can define the number of workers per protocol using `-workers` . - -## Docker - -We also provide a all-in-one Docker container. To run it in debug mode without sending into Kafka: - -``` -$ sudo docker run --net=host -ti cloudflare/goflow:latest -kafka=false -``` - -## Environment - -To get an example of pipeline, check out [flow-pipeline](https://github.com/cloudflare/flow-pipeline) - -### How is it used at Cloudflare - -The samples flowing into Kafka are **processed** and special fields are inserted using other databases: -* User plan -* Country -* ASN and BGP information - -The extended protobuf has the same base of the one in this repo. The **compatibility** with other software -is preserved when adding new fields (thus the fields will be lost if re-serialized). - -Once the updated flows are back into Kafka, they are **consumed** by **database inserters** (Clickhouse, Amazon Redshift, Google BigTable...) -to allow for static analysis. Other teams access the network data just like any other log (SQL query).
- -### Output format - -If you want to develop applications, build `pb/flow.proto` into the language you want: - -Example in Go: -``` -PROTOCPATH=$HOME/go/bin/ make proto -``` - -Example in Java: - ``` -export SRC_DIR="path/to/goflow-pb" -export DST_DIR="path/to/java/app/src/main/java" -protoc -I=$SRC_DIR --java_out=$DST_DIR $SRC_DIR/flow.proto +./goflow -ch.database="homestat" -ch.table="nflowVPN" -ch.addr="127.0.0.1" -ch.username="default" -ch.password="" -loglevel="debug" -sflow="false" -nfl="false" -metrics.addr="0.0.0.0:8057" -nf.port="2057" -nf.addr="xxx.xxx.xxx.xxx" ``` -The fields are listed in the following table. - -You can find information on how they are populated from the original source: -* For [sFlow](https://sflow.org/developers/specifications.php) -* For [NetFlow v5](https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html) -* For [NetFlow v9](https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html) -* For [IPFIX](https://www.iana.org/assignments/ipfix/ipfix.xhtml) - -| Field | Description | NetFlow v5 | sFlow | NetFlow v9 | IPFIX | -| - | - | - | - | - | - | -|Type|Type of flow message|NETFLOW_V5|SFLOW_5|NETFLOW_V9|IPFIX| -|TimeReceived|Timestamp of when the message was received|Included|Included|Included|Included| -|SequenceNum|Sequence number of the flow packet|Included|Included|Included|Included| -|SamplingRate|Sampling rate of the flow|Included|Included|Included|Included| -|FlowDirection|Direction of the flow| | |DIRECTION (61)|flowDirection (61)| -|SamplerAddress|Address of the device that generated the packet|IP source of packet|Agent IP|IP source of packet|IP source of packet| -|TimeFlowStart|Time the flow started|System uptime and first|=TimeReceived|System uptime and FIRST_SWITCHED (22)|flowStartXXX (150, 152, 154, 156)| -|TimeFlowEnd|Time the flow ended|System uptime and last|=TimeReceived|System uptime and LAST_SWITCHED (23)|flowEndXXX (151, 153, 155, 157)| -|Bytes|Number of bytes in flow|dOctets|Length of sample|IN_BYTES (1) OUT_BYTES (23)|octetDeltaCount (1) postOctetDeltaCount (23)| -|Packets|Number of packets in flow|dPkts|=1|IN_PKTS (2) OUT_PKTS (24)|packetDeltaCount (1) postPacketDeltaCount (24)| -|SrcAddr|Source address (IP)|srcaddr (IPv4 only)|Included|Included|IPV4_SRC_ADDR (8) IPV6_SRC_ADDR (27)|sourceIPv4Address/sourceIPv6Address (8/27)| -|DstAddr|Destination address (IP)|dstaddr (IPv4 only)|Included|Included|IPV4_DST_ADDR (12) IPV6_DST_ADDR (28)|destinationIPv4Address (12)destinationIPv6Address (28)| -|Etype|Ethernet type (0x86dd for IPv6...)|IPv4|Included|Included|Included| -|Proto|Protocol (UDP, TCP, ICMP...)|prot|Included|PROTOCOL (4)|protocolIdentifier (4)| -|SrcPort|Source port (when UDP/TCP/SCTP)|srcport|Included|L4_DST_PORT (11)|destinationTransportPort (11)| -|DstPort|Destination port (when UDP/TCP/SCTP)|dstport|Included|L4_SRC_PORT (7)|sourceTransportPort (7)| -|InIf|Input interface|input|Included|INPUT_SNMP (10)|ingressInterface (10)| -|OutIf|Output interface|output|Included|OUTPUT_SNMP (14)|egressInterface (14)| -|SrcMac|Source mac address| |Included|IN_SRC_MAC (56)|sourceMacAddress (56)| -|DstMac|Destination mac address| |Included|OUT_DST_MAC (57)|postDestinationMacAddress (57)| -|SrcVlan|Source VLAN ID| |From ExtendedSwitch|SRC_VLAN (59)|vlanId (58)| -|DstVlan|Destination VLAN ID| |From ExtendedSwitch|DST_VLAN (59)|postVlanId (59)| -|VlanId|802.11q VLAN ID| |Included|SRC_VLAN (59)|postVlanId (59)| -|IngressVrfID|VRF ID| | | |ingressVRFID (234)| 
-|EgressVrfID|VRF ID| | | |egressVRFID (235)| -|IPTos|IP Type of Service|tos|Included|SRC_TOS (5)|ipClassOfService (5)| -|ForwardingStatus|Forwarding status| | |FORWARDING_STATUS (89)|forwardingStatus (89)| -|IPTTL|IP Time to Live| |Included|IPTTL (52)|minimumTTL (52| -|TCPFlags|TCP flags|tcp_flags|Included|TCP_FLAGS (6)|tcpControlBits (6)| -|IcmpType|ICMP Type| |Included|ICMP_TYPE (32)|icmpTypeXXX (176, 178) icmpTypeCodeXXX (32, 139)| -|IcmpCode|ICMP Code| |Included|ICMP_TYPE (32)|icmpCodeXXX (177, 179) icmpTypeCodeXXX (32, 139)| -|IPv6FlowLabel|IPv6 Flow Label| |Included|IPV6_FLOW_LABEL (31)|flowLabelIPv6 (31)| -|FragmentId|IP Fragment ID| |Included|IPV4_IDENT (54)|fragmentIdentification (54)| -|FragmentOffset|IP Fragment Offset| |Included|FRAGMENT_OFFSET (88)|fragmentOffset (88) and fragmentFlags (197)| -|BiFlowDirection|BiFlow Identification| | | |biflowDirection (239)| -|SrcAS|Source AS number|src_as|From ExtendedGateway|SRC_AS (16)|bgpSourceAsNumber (16)| -|DstAS|Destination AS number|dst_as|From ExtendedGateway|DST_AS (17)|bgpDestinationAsNumber (17)| -|NextHop|Nexthop address|nexthop|From ExtendedGateway|IPV4_NEXT_HOP (15) BGP_IPV4_NEXT_HOP (18) IPV6_NEXT_HOP (62) BGP_IPV6_NEXT_HOP (63)|ipNextHopIPv4Address (15) bgpNextHopIPv4Address (18) ipNextHopIPv6Address (62) bgpNextHopIPv6Address (63)| -|NextHopAS|Nexthop AS number| |From ExtendedGateway| | | -|SrcNet|Source address mask|src_mask|From ExtendedRouter|SRC_MASK (9) IPV6_SRC_MASK (29)|sourceIPv4PrefixLength (9) sourceIPv6PrefixLength (29)| -|DstNet|Destination address mask|dst_mask|From ExtendedRouter|DST_MASK (13) IPV6_DST_MASK (30)|destinationIPv4PrefixLength (13) destinationIPv6PrefixLength (30)| -|HasEncap|Indicates if has GRE encapsulation||Included||| -|xxxEncap fields|Same as field but inside GRE||Included||| -|HasMPLS|Indicates the presence of MPLS header||Included||| -|MPLSCount|Count of MPLS layers||Included||| -|MPLSxTTL|TTL of the MPLS label||Included||| -|MPLSxLabel|MPLS label||Included||| - -If you are implementing flow processors to add more data to the protobuf, -we suggest you use field IDs ≥ 1000. - -### Implementation notes - -The pipeline at Cloudflare is connecting collectors with flow processors -that will add more information: with IP address, add country, ASN, etc. - -For aggregation, we are using Materialized tables in Clickhouse. -Dictionaries help correlating flows with country and ASNs. -A few collectors can treat hundred of thousands of samples. - -We also experimented successfully flow aggregation with Flink using a -[Keyed Session Window](https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#session-windows): -this sums the `Bytes x SamplingRate` and `Packets x SamplingRate` received during a 5 minutes **window** while allowing 2 more minutes -in the case where some flows were delayed before closing the **session**. - -The BGP information provided by routers can be unreliable (if the router does not have a BGP full-table or it is a static route). -You can use Maxmind [prefix to ASN](https://dev.maxmind.com/geoip/geoip2/geolite2/) in order to solve this issue. 
## License diff --git a/cmd/cnetflow/cnetflow.go b/cmd/cnetflow/cnetflow.go deleted file mode 100644 index 1eae6de..0000000 --- a/cmd/cnetflow/cnetflow.go +++ /dev/null @@ -1,95 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "net/http" - "os" - "runtime" - - "github.com/cloudflare/goflow/v3/transport" - "github.com/cloudflare/goflow/v3/utils" - "github.com/prometheus/client_golang/prometheus/promhttp" - log "github.com/sirupsen/logrus" -) - -var ( - version = "" - buildinfos = "" - AppVersion = "GoFlow NetFlow " + version + " " + buildinfos - - Addr = flag.String("addr", "", "NetFlow/IPFIX listening address") - Port = flag.Int("port", 2055, "NetFlow/IPFIX listening port") - Reuse = flag.Bool("reuse", false, "Enable so_reuseport for NetFlow/IPFIX listening port") - - Workers = flag.Int("workers", 1, "Number of NetFlow workers") - LogLevel = flag.String("loglevel", "info", "Log level") - LogFmt = flag.String("logfmt", "normal", "Log formatter") - - EnableKafka = flag.Bool("kafka", true, "Enable Kafka") - FixedLength = flag.Bool("proto.fixedlen", false, "Enable fixed length protobuf") - MetricsAddr = flag.String("metrics.addr", ":8080", "Metrics address") - MetricsPath = flag.String("metrics.path", "/metrics", "Metrics path") - TemplatePath = flag.String("templates.path", "/templates", "NetFlow/IPFIX templates list") - - Version = flag.Bool("v", false, "Print version") -) - -func init() { - transport.RegisterFlags() -} - -func httpServer(state *utils.StateNetFlow) { - http.Handle(*MetricsPath, promhttp.Handler()) - http.HandleFunc(*TemplatePath, state.ServeHTTPTemplates) - log.Fatal(http.ListenAndServe(*MetricsAddr, nil)) -} - -func main() { - flag.Parse() - - if *Version { - fmt.Println(AppVersion) - os.Exit(0) - } - - lvl, _ := log.ParseLevel(*LogLevel) - log.SetLevel(lvl) - - var defaultTransport utils.Transport - defaultTransport = &utils.DefaultLogTransport{} - - switch *LogFmt { - case "json": - log.SetFormatter(&log.JSONFormatter{}) - defaultTransport = &utils.DefaultJSONTransport{} - } - - runtime.GOMAXPROCS(runtime.NumCPU()) - - log.Info("Starting GoFlow") - - s := &utils.StateNetFlow{ - Transport: defaultTransport, - Logger: log.StandardLogger(), - } - - go httpServer(s) - - if *EnableKafka { - kafkaState, err := transport.StartKafkaProducerFromArgs(log.StandardLogger()) - if err != nil { - log.Fatal(err) - } - kafkaState.FixedLengthProto = *FixedLength - s.Transport = kafkaState - } - log.WithFields(log.Fields{ - "Type": "NetFlow"}). 
- Infof("Listening on UDP %v:%v", *Addr, *Port) - - err := s.FlowRoutine(*Workers, *Addr, *Port, *Reuse) - if err != nil { - log.Fatalf("Fatal error: could not listen to UDP (%v)", err) - } -} diff --git a/cmd/cnflegacy/cnflegacy.go b/cmd/cnflegacy/cnflegacy.go deleted file mode 100644 index 30c8078..0000000 --- a/cmd/cnflegacy/cnflegacy.go +++ /dev/null @@ -1,93 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "net/http" - "os" - "runtime" - - "github.com/cloudflare/goflow/v3/transport" - "github.com/cloudflare/goflow/v3/utils" - "github.com/prometheus/client_golang/prometheus/promhttp" - log "github.com/sirupsen/logrus" -) - -var ( - version = "" - buildinfos = "" - AppVersion = "GoFlow NetFlowV5 " + version + " " + buildinfos - - Addr = flag.String("addr", "", "NetFlow v5 listening address") - Port = flag.Int("port", 2055, "NetFlow v5 listening port") - Reuse = flag.Bool("reuse", false, "Enable so_reuseport for NetFlow v5 listening port") - - Workers = flag.Int("workers", 1, "Number of NetFlow v5 workers") - LogLevel = flag.String("loglevel", "info", "Log level") - LogFmt = flag.String("logfmt", "normal", "Log formatter") - - EnableKafka = flag.Bool("kafka", true, "Enable Kafka") - FixedLength = flag.Bool("proto.fixedlen", false, "Enable fixed length protobuf") - MetricsAddr = flag.String("metrics.addr", ":8080", "Metrics address") - MetricsPath = flag.String("metrics.path", "/metrics", "Metrics path") - - Version = flag.Bool("v", false, "Print version") -) - -func init() { - transport.RegisterFlags() -} - -func httpServer() { - http.Handle(*MetricsPath, promhttp.Handler()) - log.Fatal(http.ListenAndServe(*MetricsAddr, nil)) -} - -func main() { - flag.Parse() - - if *Version { - fmt.Println(AppVersion) - os.Exit(0) - } - - lvl, _ := log.ParseLevel(*LogLevel) - log.SetLevel(lvl) - - var defaultTransport utils.Transport - defaultTransport = &utils.DefaultLogTransport{} - - switch *LogFmt { - case "json": - log.SetFormatter(&log.JSONFormatter{}) - defaultTransport = &utils.DefaultJSONTransport{} - } - - runtime.GOMAXPROCS(runtime.NumCPU()) - - log.Info("Starting GoFlow") - - s := &utils.StateNFLegacy{ - Transport: defaultTransport, - Logger: log.StandardLogger(), - } - - go httpServer() - - if *EnableKafka { - kafkaState, err := transport.StartKafkaProducerFromArgs(log.StandardLogger()) - if err != nil { - log.Fatal(err) - } - kafkaState.FixedLengthProto = *FixedLength - s.Transport = kafkaState - } - log.WithFields(log.Fields{ - "Type": "NetFlowLegacy"}). 
- Infof("Listening on UDP %v:%v", *Addr, *Port) - - err := s.FlowRoutine(*Workers, *Addr, *Port, *Reuse) - if err != nil { - log.Fatalf("Fatal error: could not listen to UDP (%v)", err) - } -} diff --git a/cmd/csflow/csflow.go b/cmd/csflow/csflow.go deleted file mode 100644 index c326fe0..0000000 --- a/cmd/csflow/csflow.go +++ /dev/null @@ -1,93 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "net/http" - "os" - "runtime" - - "github.com/cloudflare/goflow/v3/transport" - "github.com/cloudflare/goflow/v3/utils" - "github.com/prometheus/client_golang/prometheus/promhttp" - log "github.com/sirupsen/logrus" -) - -var ( - version = "" - buildinfos = "" - AppVersion = "GoFlow sFlow " + version + " " + buildinfos - - Addr = flag.String("addr", "", "sFlow listening address") - Port = flag.Int("port", 6343, "sFlow listening port") - Reuse = flag.Bool("reuse", false, "Enable so_reuseport for sFlow listening port") - - Workers = flag.Int("workers", 1, "Number of sFlow workers") - LogLevel = flag.String("loglevel", "info", "Log level") - LogFmt = flag.String("logfmt", "normal", "Log formatter") - - EnableKafka = flag.Bool("kafka", true, "Enable Kafka") - FixedLength = flag.Bool("proto.fixedlen", false, "Enable fixed length protobuf") - MetricsAddr = flag.String("metrics.addr", ":8080", "Metrics address") - MetricsPath = flag.String("metrics.path", "/metrics", "Metrics path") - - Version = flag.Bool("v", false, "Print version") -) - -func init() { - transport.RegisterFlags() -} - -func httpServer() { - http.Handle(*MetricsPath, promhttp.Handler()) - log.Fatal(http.ListenAndServe(*MetricsAddr, nil)) -} - -func main() { - flag.Parse() - - if *Version { - fmt.Println(AppVersion) - os.Exit(0) - } - - lvl, _ := log.ParseLevel(*LogLevel) - log.SetLevel(lvl) - - var defaultTransport utils.Transport - defaultTransport = &utils.DefaultLogTransport{} - - switch *LogFmt { - case "json": - log.SetFormatter(&log.JSONFormatter{}) - defaultTransport = &utils.DefaultJSONTransport{} - } - - runtime.GOMAXPROCS(runtime.NumCPU()) - - log.Info("Starting GoFlow") - - s := &utils.StateSFlow{ - Transport: defaultTransport, - Logger: log.StandardLogger(), - } - - go httpServer() - - if *EnableKafka { - kafkaState, err := transport.StartKafkaProducerFromArgs(log.StandardLogger()) - if err != nil { - log.Fatal(err) - } - kafkaState.FixedLengthProto = *FixedLength - s.Transport = kafkaState - } - log.WithFields(log.Fields{ - "Type": "sFlow"}). 
- Infof("Listening on UDP %v:%v", *Addr, *Port) - - err := s.FlowRoutine(*Workers, *Addr, *Port, *Reuse) - if err != nil { - log.Fatalf("Fatal error: could not listen to UDP (%v)", err) - } -} diff --git a/conf/conf.go b/conf/conf.go new file mode 100644 index 0000000..5acd2e0 --- /dev/null +++ b/conf/conf.go @@ -0,0 +1,43 @@ +package conf + +import "flag" + +var ( + ClickHouseAddr = flag.String("ch.addr", "127.0.0.1", "ClickHouse DB Host") + ClickHousePort = flag.Int("ch.port", 9000, "ClickHouse DB port") + ClickHouseUser = flag.String("ch.username", "default", "ClickHouse username") + ClickHousePassword = flag.String("ch.password", "default", "ClickHouse password") + ClickHouseDatabase = flag.String("ch.database", "default", "ClickHouse database") + ClickHouseTable = flag.String("ch.table", "netflow", "ClickHouse table") + + SFlowEnable = flag.Bool("sflow", true, "Enable sFlow") + SFlowAddr = flag.String("sflow.addr", "", "sFlow listening address") + SFlowPort = flag.Int("sflow.port", 6343, "sFlow listening port") + SFlowReuse = flag.Bool("sflow.reuserport", false, "Enable so_reuseport for sFlow") + + NFLEnable = flag.Bool("nfl", true, "Enable NetFlow v5") + NFLAddr = flag.String("nfl.addr", "", "NetFlow v5 listening address") + NFLPort = flag.Int("nfl.port", 2056, "NetFlow v5 listening port") + NFLReuse = flag.Bool("nfl.reuserport", false, "Enable so_reuseport for NetFlow v5") + + NFEnable = flag.Bool("nf", true, "Enable NetFlow/IPFIX") + NFAddr = flag.String("nf.addr", "", "NetFlow/IPFIX listening address") + NFPort = flag.Int("nf.port", 2055, "NetFlow/IPFIX listening port") + NFReuse = flag.Bool("nf.reuserport", false, "Enable so_reuseport for NetFlow/IPFIX") + + Workers = flag.Int("workers", 1, "Number of workers per collector") + LogLevel = flag.String("loglevel", "warning", "Log level") + LogFmt = flag.String("logfmt", "normal", "Log formatter") + + EnableKafka = flag.Bool("kafka", false, "Enable Kafka (NOT SUPPORTED IN THIS VERSION)") + + EnableClickHouse = flag.Bool("ch", true, "Enable ClickHouse DB Integration") + + FixedLength = flag.Bool("proto.fixedlen", false, "Enable fixed length protobuf") + MetricsAddr = flag.String("metrics.addr", ":8080", "Metrics address") + MetricsPath = flag.String("metrics.path", "/metrics", "Metrics path") + + TemplatePath = flag.String("templates.path", "/templates", "NetFlow/IPFIX templates list") + + Version = flag.Bool("v", false, "Print version") +) diff --git a/docker-compose-pkg.yml b/docker-compose-pkg.yml deleted file mode 100644 index 7737ac5..0000000 --- a/docker-compose-pkg.yml +++ /dev/null @@ -1,11 +0,0 @@ -version: '3' -services: - packager: - build: package - entrypoint: make - command: - - build-goflow - - package-deb-goflow - - package-rpm-goflow - volumes: - - ./:/work/ diff --git a/go.mod b/go.mod index cae068e..ebb85e0 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/cloudflare/goflow/v3 go 1.12 require ( + github.com/ClickHouse/clickhouse-go v1.4.3 github.com/Shopify/sarama v1.22.0 github.com/golang/protobuf v1.3.1 github.com/libp2p/go-reuseport v0.0.1 diff --git a/go.sum b/go.sum index c9f5615..19db381 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/ClickHouse/clickhouse-go v1.4.3 h1:iAFMa2UrQdR5bHJ2/yaSLffZkxpcOYQMCUuKeNXGdqc= +github.com/ClickHouse/clickhouse-go v1.4.3/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI= github.com/DataDog/zstd v1.3.5 h1:DtpNbljikUepEPD16hD4LvIcmhnhdLTiW/5pHgbmp14= github.com/DataDog/zstd v1.3.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= 
github.com/Shopify/sarama v1.22.0 h1:rtiODsvY4jW6nUV6n3K+0gx/8WlAwVt+Ixt6RIvpYyo= @@ -6,6 +8,8 @@ github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWso github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4= +github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -15,19 +19,25 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8 github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/libp2p/go-reuseport v0.0.1 h1:7PhkfH73VXfPJYKQ6JwS5I/eVcoyYi9IMNGc6FWpFLw= github.com/libp2p/go-reuseport v0.0.1/go.mod h1:jn6RmB1ufnQwl0Q1f+YxAj8isJgDCQzaaxIFYDhcYEA= +github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41 h1:GeinFsrjWz97fAxVUEd748aV0cYL+I6k44gFJTCVvpU= github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= +github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I= +github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= diff --git a/goflow b/goflow new file mode 100755 index 0000000..4c78691 Binary files /dev/null and b/goflow differ diff --git 
a/cmd/goflow/goflow.go b/goflow.go similarity index 63% rename from cmd/goflow/goflow.go rename to goflow.go index 4221d1f..9e8ecc2 100644 --- a/cmd/goflow/goflow.go +++ b/goflow.go @@ -8,6 +8,7 @@ import ( "runtime" "sync" + . "github.com/cloudflare/goflow/v3/conf" "github.com/cloudflare/goflow/v3/transport" "github.com/cloudflare/goflow/v3/utils" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -18,40 +19,8 @@ var ( version = "" buildinfos = "" AppVersion = "GoFlow " + version + " " + buildinfos - - SFlowEnable = flag.Bool("sflow", true, "Enable sFlow") - SFlowAddr = flag.String("sflow.addr", "", "sFlow listening address") - SFlowPort = flag.Int("sflow.port", 6343, "sFlow listening port") - SFlowReuse = flag.Bool("sflow.reuserport", false, "Enable so_reuseport for sFlow") - - NFLEnable = flag.Bool("nfl", true, "Enable NetFlow v5") - NFLAddr = flag.String("nfl.addr", "", "NetFlow v5 listening address") - NFLPort = flag.Int("nfl.port", 2056, "NetFlow v5 listening port") - NFLReuse = flag.Bool("nfl.reuserport", false, "Enable so_reuseport for NetFlow v5") - - NFEnable = flag.Bool("nf", true, "Enable NetFlow/IPFIX") - NFAddr = flag.String("nf.addr", "", "NetFlow/IPFIX listening address") - NFPort = flag.Int("nf.port", 2055, "NetFlow/IPFIX listening port") - NFReuse = flag.Bool("nf.reuserport", false, "Enable so_reuseport for NetFlow/IPFIX") - - Workers = flag.Int("workers", 1, "Number of workers per collector") - LogLevel = flag.String("loglevel", "info", "Log level") - LogFmt = flag.String("logfmt", "normal", "Log formatter") - - EnableKafka = flag.Bool("kafka", true, "Enable Kafka") - FixedLength = flag.Bool("proto.fixedlen", false, "Enable fixed length protobuf") - MetricsAddr = flag.String("metrics.addr", ":8080", "Metrics address") - MetricsPath = flag.String("metrics.path", "/metrics", "Metrics path") - - TemplatePath = flag.String("templates.path", "/templates", "NetFlow/IPFIX templates list") - - Version = flag.Bool("v", false, "Print version") ) -func init() { - transport.RegisterFlags() -} - func httpServer(state *utils.StateNetFlow) { http.Handle(*MetricsPath, promhttp.Handler()) http.HandleFunc(*TemplatePath, state.ServeHTTPTemplates) @@ -97,6 +66,7 @@ func main() { go httpServer(sNF) + // ht: insert new transport here, start it in a similar way as kafka if *EnableKafka { kafkaState, err := transport.StartKafkaProducerFromArgs(log.StandardLogger()) if err != nil { @@ -109,6 +79,18 @@ func main() { sNF.Transport = kafkaState } + if *EnableClickHouse { + clickHouseState, err := transport.StartClickHouseConnection(log.StandardLogger()) + if err != nil { + log.Fatal(err) + } + clickHouseState.FixedLengthProto = *FixedLength + + sSFlow.Transport = clickHouseState + sNFL.Transport = clickHouseState + sNF.Transport = clickHouseState + } + wg := &sync.WaitGroup{} if *SFlowEnable { wg.Add(1) diff --git a/transport/clickhouse.go b/transport/clickhouse.go new file mode 100644 index 0000000..d2375b1 --- /dev/null +++ b/transport/clickhouse.go @@ -0,0 +1,211 @@ +package transport + +import ( + // "errors" + + "fmt" + "net" + "sync" + + // "os" + // "reflect" + // "strings" + + . 
"github.com/cloudflare/goflow/v3/conf" + flowmessage "github.com/cloudflare/goflow/v3/pb" + "github.com/cloudflare/goflow/v3/utils" + + "database/sql" + + "github.com/ClickHouse/clickhouse-go" + // proto "github.com/golang/protobuf/proto" +) + +var ( + count uint64 + tx *sql.Tx + + dbConn *sql.DB +) + +type ClickHouseState struct { + FixedLengthProto bool +} + +func StartClickHouseConnection(logger utils.Logger) (*ClickHouseState, error) { + + count = 0 + chDebug := "debug=false" + + if ClickHouseAddr == nil { + temp := "" // *string cannot be initialized + ClickHouseAddr = &temp // in one statement + } + + if *LogLevel == "debug" { + chDebug = "debug=true" + } + + connStr := fmt.Sprintf("tcp://%s:%d?username=%s&password=%s&database=%s&table=%s&%s", + *ClickHouseAddr, *ClickHousePort, *ClickHouseUser, *ClickHousePassword, *ClickHouseDatabase, *ClickHouseTable, chDebug) + + // open DB dbConnion stuff + connect, err := sql.Open("clickhouse", connStr) + dbConn = connect + if err != nil { + logger.Fatalf("couldn't dbConn to db (%v)", err) + } else { + fmt.Printf("NetFlow-clickhouse collector\nConnected to clickhouse\n server on %v:%v\n database:%s\n table: %s \n debug: %s", + *ClickHouseAddr, *ClickHousePort, *ClickHouseDatabase, *ClickHouseTable, chDebug) + } + if err := dbConn.Ping(); err != nil { + if exception, ok := err.(*clickhouse.Exception); ok { + fmt.Printf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace) + } else { + fmt.Println(err) + } + // return + } + + // create DB schema, if not exist + _, err = dbConn.Exec(fmt.Sprintf(` + CREATE DATABASE IF NOT EXISTS %s + `, *ClickHouseDatabase)) + if err != nil { + logger.Fatalf("couldn't create database '%s' (%v)", *ClickHouseDatabase, err) + } + + // use MergeTree engine to optimize storage + //https://clickhouse.tech/docs/en/engines/table-engines/mergetree-family/mergetree/ + _, err = dbConn.Exec(fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s.%s ( + TimeReceived DateTime CODEC(Delta, ZSTD), + TimeFlowStart DateTime CODEC(Delta, ZSTD), + TimeFlowEnd DateTime CODEC(Delta, ZSTD), + Bytes UInt64 CODEC(ZSTD(1)), + Etype UInt32, + Packets UInt32, + SrcAddr IPv4 CODEC(ZSTD), + DstAddr IPv4 CODEC(ZSTD), + SrcPort UInt32, + DstPort UInt32, + Proto UInt32, + SrcMac UInt64 CODEC(ZSTD), + DstMac UInt64 CODEC(ZSTD), + SrcVlan UInt32, + DstVlan UInt32, + VlanId UInt32, + FlowType UInt8 + +) ENGINE = MergeTree() +ORDER BY (TimeReceived, SrcAddr, SrcPort, DstAddr, DstPort) +TTL TimeReceived + interval 18 week +PARTITION BY toYYYYMMDD(TimeReceived) + `, *ClickHouseDatabase, *ClickHouseTable)) + + if err != nil { + logger.Fatalf("couldn't create table (%v)", err) + } + + // start transaction prep + + // defer stmt.Close() + state := ClickHouseState{FixedLengthProto: true} + + return &state, nil + +} + +func ipv4BytesToUint32(b []byte) uint32 { + return uint32(b[0])<<24 + uint32(b[1])<<16 + uint32(b[2])<<8 + uint32(b[3]) +} + +func ClickHouseInsert(fm *flowmessage.FlowMessage, stmt *sql.Stmt, wg *sync.WaitGroup) { + // extract fields out of the flow message + + // assume and encode as IPv4 (even if its v6) + //srcAddr := ipv4BytesToUint32(fm.GetSrcAddr()[:4]) + //dstAddr := ipv4BytesToUint32(fm.GetDstAddr()[:4]) + srcAddr := net.IP(fm.GetSrcAddr()[:4]).To4().String() + dstAddr := net.IP(fm.GetDstAddr()[:4]).To4().String() + + count += 1 + // fmt.Printf("stmt: %v\n", stmt) + if _, err := stmt.Exec( + fm.GetTimeReceived(), + fm.GetTimeFlowStart(), + fm.GetTimeFlowEnd(), + fm.GetBytes(), + fm.GetEtype(), + fm.GetPackets(), + srcAddr, 
+ dstAddr, + fm.GetSrcPort(), + fm.GetDstPort(), + fm.GetProto(), + fm.GetSrcMac(), + fm.GetDstMac(), + fm.GetSrcVlan(), + fm.GetDstVlan(), + fm.GetVlanId(), + uint8(fm.GetType()), + ); err != nil { + fmt.Printf("error inserting record (%v)\n", err) + } + + // ----------------------------------------------- + + if *LogLevel == "debug" { + fmt.Printf("src (%v) %v:%v\ndst (%v) %v:%v\nbytes:%v\ncount:%v\n------\n", + srcAddr, + fm.GetSrcAddr(), + fm.GetSrcPort(), + dstAddr, + fm.GetDstAddr(), + fm.GetDstPort(), + fm.GetBytes(), + n) + } + +} + +func (s ClickHouseState) Publish(msgs []*flowmessage.FlowMessage) { + // ht: this is all you need to implement for the transport interface + // inserts run concurrently; the WaitGroup tracks the in-flight goroutines + + var wg sync.WaitGroup + + var err error + tx, err = dbConn.Begin() + if err != nil { + fmt.Printf("Couldn't begin transaction (%v)\n", err) + return + } + + stmt, err := tx.Prepare(fmt.Sprintf(`INSERT INTO %s.%s(TimeReceived, + TimeFlowStart,TimeFlowEnd,Bytes,Etype,Packets,SrcAddr,DstAddr,SrcPort, + DstPort,Proto,SrcMac,DstMac,SrcVlan,DstVlan,VlanId,FlowType) + VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`, *ClickHouseDatabase, *ClickHouseTable)) + + if err != nil { + fmt.Printf("Couldn't prepare statement (%v)\n", err) + return + } + defer stmt.Close() + + for _, msg := range msgs { + wg.Add(1) + go ClickHouseInsert(msg, stmt, &wg) + } + + wg.Wait() + + // commit after all of the inserts are done + if err := tx.Commit(); err != nil { + fmt.Printf("Couldn't commit transaction (%v)\n", err) + } + +} diff --git a/transport/kafka.go b/transport/kafka.go index ec3dae4..885b26e 100644 --- a/transport/kafka.go +++ b/transport/kafka.go @@ -4,7 +4,7 @@ import ( "crypto/tls" "crypto/x509" "errors" - "flag" + // "flag" "fmt" "os" "reflect" @@ -51,19 +51,19 @@ func ParseKafkaVersion(versionString string) (sarama.KafkaVersion, error) { return sarama.ParseKafkaVersion(versionString) } -func RegisterFlags() { - KafkaTLS = flag.Bool("kafka.tls", false, "Use TLS to connect to Kafka") - KafkaSASL = flag.Bool("kafka.sasl", false, "Use SASL/PLAIN data to connect to Kafka (TLS is recommended and the environment variables KAFKA_SASL_USER and KAFKA_SASL_PASS need to be set)") - KafkaTopic = flag.String("kafka.topic", "flow-messages", "Kafka topic to produce to") - KafkaSrv = flag.String("kafka.srv", "", "SRV record containing a list of Kafka brokers (or use kafka.out.brokers)") - KafkaBrk = flag.String("kafka.brokers", "127.0.0.1:9092,[::1]:9092", "Kafka brokers list separated by commas") +// func RegisterFlags() { +// KafkaTLS = flag.Bool("kafka.tls", false, "Use TLS to connect to Kafka") +// KafkaSASL = flag.Bool("kafka.sasl", false, "Use SASL/PLAIN data to connect to Kafka (TLS is recommended and the environment variables KAFKA_SASL_USER and KAFKA_SASL_PASS need to be set)") +// KafkaTopic = flag.String("kafka.topic", "flow-messages", "Kafka topic to produce to") +// KafkaSrv = flag.String("kafka.srv", "", "SRV record containing a list of Kafka brokers (or use kafka.out.brokers)") +// KafkaBrk = flag.String("kafka.brokers", "127.0.0.1:9092,[::1]:9092", "Kafka brokers list separated by commas") - KafkaLogErrors = flag.Bool("kafka.log.err", false, "Log Kafka errors") +// KafkaLogErrors = flag.Bool("kafka.log.err", false, "Log Kafka errors") - KafkaHashing = flag.Bool("kafka.hashing", false, "Enable partitioning by hash instead of random") - KafkaKeying = flag.String("kafka.key", "SamplerAddress,DstAS", "Kafka list of fields to do hashing
on (partition) separated by commas") - KafkaVersion = flag.String("kafka.version", "0.11.0.0", "Log message version (must be a version that parses per sarama.ParseKafkaVersion)") -} +// KafkaHashing = flag.Bool("kafka.hashing", false, "Enable partitioning by hash instead of random") +// KafkaKeying = flag.String("kafka.key", "SamplerAddress,DstAS", "Kafka list of fields to do hashing on (partition) separated by commas") +// KafkaVersion = flag.String("kafka.version", "0.11.0.0", "Log message version (must be a version that parses per sarama.ParseKafkaVersion)") +// } func StartKafkaProducerFromArgs(log utils.Logger) (*KafkaState, error) { kVersion, err := ParseKafkaVersion(*KafkaVersion)
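
Note on the transport plug-in point: the `// ht:` comments in `goflow.go` ("insert new transport here, start it in a similar way as kafka") and in `transport/clickhouse.go` ("this is all you need to implement for the transport interface") mark everything a new backend needs: a state type with a `Publish([]*flowmessage.FlowMessage)` method, assigned to each collector's `Transport` field. Below is a minimal sketch of an alternative backend under that assumption; the `StdoutState` type and its wiring are hypothetical and not part of this patch.

```go
package transport

import (
	"fmt"

	flowmessage "github.com/cloudflare/goflow/v3/pb"
)

// StdoutState is a hypothetical minimal transport backend. As with
// ClickHouseState above, Publish is the only method the collectors
// call, so implementing it is enough to be swapped in.
type StdoutState struct{}

// Publish receives a batch of decoded flow messages from a collector
// worker and simply prints a few fields per flow.
func (s *StdoutState) Publish(msgs []*flowmessage.FlowMessage) {
	for _, fm := range msgs {
		fmt.Printf("flow %v -> %v bytes=%d packets=%d\n",
			fm.GetSrcAddr(), fm.GetDstAddr(), fm.GetBytes(), fm.GetPackets())
	}
}
```

Wiring it in `goflow.go` would mirror the ClickHouse block: `sNF.Transport = &StdoutState{}`, and likewise for `sSFlow` and `sNFL`.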