diff --git a/.travis.yml b/.travis.yml index 8b900f99..bcaf5688 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,6 +13,9 @@ install: - go get -v golang.org/x/tools/cmd/cover - go get -v github.com/stretchr/testify/assert - go get -v github.com/smartystreets/goconvey + - go get -v github.com/segmentio/kafka-go + - go get -v github.com/golang/snappy + - go get -v github.com/pierrec/lz4 script: - ./go.test.sh after_success: diff --git a/cmd/bitfan/commands/plugins.go b/cmd/bitfan/commands/plugins.go index 6b3adf90..cb0da175 100644 --- a/cmd/bitfan/commands/plugins.go +++ b/cmd/bitfan/commands/plugins.go @@ -29,6 +29,7 @@ import ( inputstdout "github.com/vjeantet/bitfan/processors/input-stdout" sysloginput "github.com/vjeantet/bitfan/processors/input-syslog" tail "github.com/vjeantet/bitfan/processors/input-tail" + tcpinput "github.com/vjeantet/bitfan/processors/input-tcp" twitter "github.com/vjeantet/bitfan/processors/input-twitter" udpinput "github.com/vjeantet/bitfan/processors/input-udp" unixinput "github.com/vjeantet/bitfan/processors/input-unix" @@ -39,6 +40,7 @@ import ( fileoutput "github.com/vjeantet/bitfan/processors/output-file" glusterfsoutput "github.com/vjeantet/bitfan/processors/output-glusterfs" httpoutput "github.com/vjeantet/bitfan/processors/output-http" + kafkaoutput "github.com/vjeantet/bitfan/processors/output-kafka" tcpoutput "github.com/vjeantet/bitfan/processors/output-tcp" mongodb "github.com/vjeantet/bitfan/processors/output-mongodb" null "github.com/vjeantet/bitfan/processors/output-null" @@ -73,6 +75,7 @@ func init() { initPlugin("input", "beats", beatsinput.New) initPlugin("input", "rabbitmq", rabbitmqinput.New) initPlugin("input", "udp", udpinput.New) + initPlugin("input", "tcp", tcpinput.New) initPlugin("input", "syslog", sysloginput.New) initPlugin("input", "unix", unixinput.New) initPlugin("input", "readfile", file.New) @@ -128,6 +131,7 @@ func init() { initPlugin("output", "rabbitmq", rabbitmqoutput.New) initPlugin("output", "email", email.New) initPlugin("output", "http", httpoutput.New) + initPlugin("output", "kafka", kafkaoutput.New) initPlugin("output", "tcp", tcpoutput.New) initPlugin("output", "sql", sqlprocessor.New) initPlugin("output", "template", templateprocessor.New) diff --git a/docs/data/processors/kafkaoutput.json b/docs/data/processors/kafkaoutput.json new file mode 100644 index 00000000..8f140ee5 --- /dev/null +++ b/docs/data/processors/kafkaoutput.json @@ -0,0 +1,133 @@ +{ + "Behavior": "", + "Doc": "", + "DocShort": "", + "ImportPath": "github.com/vjeantet/bitfan/processors/output-kafka", + "Name": "kafkaoutput", + "Options": { + "Doc": "", + "Options": [ + { + "Alias": ",squash", + "DefaultValue": null, + "Doc": "", + "ExampleLS": "", + "Name": "processors.CommonOptions", + "PossibleValues": null, + "Required": false, + "Type": "processors.CommonOptions" + }, + { + "Alias": "bootstrap_servers", + "DefaultValue": null, + "Doc": "Bootstrap Servers ( \"host:port\" )", + "ExampleLS": "", + "Name": "BootstrapServers", + "PossibleValues": null, + "Required": false, + "Type": "string" + }, + { + "Alias": "brokers", + "DefaultValue": null, + "Doc": "Broker list", + "ExampleLS": "", + "Name": "Brokers", + "PossibleValues": null, + "Required": false, + "Type": "array" + }, + { + "Alias": "topic_id", + "DefaultValue": null, + "Doc": "Kafka topic", + "ExampleLS": "", + "Name": "TopicID", + "PossibleValues": null, + "Required": true, + "Type": "string" + }, + { + "Alias": "client_id", + "DefaultValue": null, + "Doc": "Kafka client id", + "ExampleLS": 
"", + "Name": "ClientID", + "PossibleValues": null, + "Required": false, + "Type": "string" + }, + { + "Alias": "balancer", + "DefaultValue": null, + "Doc": "Balancer ( roundrobin, hash or leastbytes )", + "ExampleLS": "", + "Name": "Balancer", + "PossibleValues": null, + "Required": false, + "Type": "string" + }, + { + "Alias": "max_attempts", + "DefaultValue": null, + "Doc": "Max Attempts", + "ExampleLS": "", + "Name": "MaxAttempts", + "PossibleValues": null, + "Required": false, + "Type": "int" + }, + { + "Alias": "queue_size", + "DefaultValue": null, + "Doc": "Queue Size", + "ExampleLS": "", + "Name": "QueueSize", + "PossibleValues": null, + "Required": false, + "Type": "int" + }, + { + "Alias": "batch_size", + "DefaultValue": null, + "Doc": "Batch Size", + "ExampleLS": "", + "Name": "BatchSize", + "PossibleValues": null, + "Required": false, + "Type": "int" + }, + { + "Alias": "keepalive", + "DefaultValue": null, + "Doc": "Keep Alive ( in seconds )", + "ExampleLS": "", + "Name": "KeepAlive", + "PossibleValues": null, + "Required": false, + "Type": "int" + }, + { + "Alias": "io_timeout", + "DefaultValue": null, + "Doc": "IO Timeout ( in seconds )", + "ExampleLS": "", + "Name": "IOTimeout", + "PossibleValues": null, + "Required": false, + "Type": "int" + }, + { + "Alias": "acks", + "DefaultValue": null, + "Doc": "Required Acks ( number of replicas that must acknowledge write. -1 for all replicas )", + "ExampleLS": "", + "Name": "RequiredAcks", + "PossibleValues": null, + "Required": false, + "Type": "int" + } + ] + }, + "Ports": [] +} \ No newline at end of file diff --git a/docs/data/processors/tcpinput.json b/docs/data/processors/tcpinput.json new file mode 100644 index 00000000..36912cc0 --- /dev/null +++ b/docs/data/processors/tcpinput.json @@ -0,0 +1,43 @@ +{ + "Behavior": "", + "Doc": "", + "DocShort": "", + "ImportPath": "github.com/vjeantet/bitfan/processors/input-tcp", + "Name": "tcpinput", + "Options": { + "Doc": "", + "Options": [ + { + "Alias": ",squash", + "DefaultValue": null, + "Doc": "", + "ExampleLS": "", + "Name": "processors.CommonOptions", + "PossibleValues": null, + "Required": false, + "Type": "processors.CommonOptions" + }, + { + "Alias": "port", + "DefaultValue": null, + "Doc": "TCP port number to listen on", + "ExampleLS": "", + "Name": "Port", + "PossibleValues": null, + "Required": false, + "Type": "int" + }, + { + "Alias": "read_buffer_size", + "DefaultValue": null, + "Doc": "Message buffer size", + "ExampleLS": "", + "Name": "ReadBufferSize", + "PossibleValues": null, + "Required": false, + "Type": "int" + } + ] + }, + "Ports": [] +} \ No newline at end of file diff --git a/processors/input-tcp/README.md b/processors/input-tcp/README.md new file mode 100644 index 00000000..9bab3be1 --- /dev/null +++ b/processors/input-tcp/README.md @@ -0,0 +1,36 @@ +# TCPINPUT + + +## Synopsys + + +| SETTING | TYPE | REQUIRED | DEFAULT VALUE | +|------------------|------|----------|---------------| +| port | int | false | 0 | +| read_buffer_size | int | false | 0 | + + +## Details + +### port +* Value type is int +* Default value is `0` + +TCP port number to listen on + +### read_buffer_size +* Value type is int +* Default value is `0` + +Message buffer size + + + +## Configuration blueprint + +``` +tcp { + port => 123 + read_buffer_size => 123 +} +``` diff --git a/processors/input-tcp/docdoc.go b/processors/input-tcp/docdoc.go new file mode 100644 index 00000000..2bc72b50 --- /dev/null +++ b/processors/input-tcp/docdoc.go @@ -0,0 +1,50 @@ +// Code generated by "bitfanDoc "; 
DO NOT EDIT +package tcpinput + +import "github.com/vjeantet/bitfan/processors/doc" + +func (p *processor) Doc() *doc.Processor { + return &doc.Processor{ + Behavior: "", + Name: "tcpinput", + ImportPath: "github.com/vjeantet/bitfan/processors/input-tcp", + Doc: "", + DocShort: "", + Options: &doc.ProcessorOptions{ + Doc: "", + Options: []*doc.ProcessorOption{ + &doc.ProcessorOption{ + Name: "processors.CommonOptions", + Alias: ",squash", + Doc: "", + Required: false, + Type: "processors.CommonOptions", + DefaultValue: nil, + PossibleValues: []string{}, + ExampleLS: "", + }, + &doc.ProcessorOption{ + Name: "Port", + Alias: "port", + Doc: "TCP port number to listen on", + Required: false, + Type: "int", + DefaultValue: nil, + PossibleValues: []string{}, + ExampleLS: "", + }, + &doc.ProcessorOption{ + Name: "ReadBufferSize", + Alias: "read_buffer_size", + Doc: "Message buffer size", + Required: false, + Type: "int", + DefaultValue: nil, + PossibleValues: []string{}, + ExampleLS: "", + }, + }, + }, + Ports: []*doc.ProcessorPort{}, + } +} diff --git a/processors/input-tcp/tcp_test.go b/processors/input-tcp/tcp_test.go new file mode 100644 index 00000000..cbf59c98 --- /dev/null +++ b/processors/input-tcp/tcp_test.go @@ -0,0 +1,24 @@ +package tcpinput + +import ( + "github.com/stretchr/testify/assert" + "github.com/vjeantet/bitfan/processors/doc" + "github.com/vjeantet/bitfan/processors/testutils" + "testing" +) + +func TestNew(t *testing.T) { + p := New() + _, ok := p.(*processor) + assert.Equal(t, ok, true, "New() should return a processor") +} +func TestDoc(t *testing.T) { + assert.IsType(t, &doc.Processor{}, New().(*processor).Doc()) +} +func TestConfigure(t *testing.T) { + conf := map[string]interface{}{} + ctx := testutils.NewProcessorContext() + p := New() + err := p.Configure(ctx, conf) + assert.Nil(t, err, "Configure() processor without error") +} diff --git a/processors/input-tcp/tcpinput.go b/processors/input-tcp/tcpinput.go new file mode 100644 index 00000000..c1c3d228 --- /dev/null +++ b/processors/input-tcp/tcpinput.go @@ -0,0 +1,132 @@ +//go:generate bitfanDoc +package tcpinput + +import ( + "bufio" + "fmt" + "github.com/vjeantet/bitfan/processors" + "net" + "strings" + "time" +) + +func New() processors.Processor { + return &processor{ + opt: &options{}, + start: make(chan *net.TCPConn), + end: make(chan *net.TCPConn), + conntable: make(map[*net.TCPConn]bool), + } +} + +type options struct { + processors.CommonOptions `mapstructure:",squash"` + + // TCP port number to listen on + Port int `mapstructure:"port"` + // Message buffer size + ReadBufferSize int `mapstructure:"read_buffer_size"` +} + +type processor struct { + processors.Base + + opt *options + sock *net.TCPListener + start chan *net.TCPConn + end chan *net.TCPConn + conntable map[*net.TCPConn]bool +} + +func (p *processor) Configure(ctx processors.ProcessorContext, conf map[string]interface{}) error { + defaults := options{ + Port: 5151, + ReadBufferSize: 65536, + } + p.opt = &defaults + + return p.ConfigureAndValidate(ctx, conf, p.opt) +} + +func (p *processor) Start(e processors.IPacket) error { + + addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf(":%d", p.opt.Port)) + if err != nil { + p.Logger.Errorf("Could not resolve tcp socket address: %s", err.Error()) + return err + } + + p.sock, err = net.ListenTCP("tcp", addr) + if err != nil { + p.Logger.Errorf("Could not start TCP input: %s", err.Error()) + return err + } + + err = p.sock.SetDeadline(time.Now().Add(10 * time.Second)) + if err != nil { + 
p.Logger.Error(err) + } + + go func(p *processor) { + for { + conn, err := p.sock.AcceptTCP() + + if err != nil { + if strings.Contains(err.Error(), "accept tcp") { + p.sock.SetDeadline(time.Now().Add(3 * time.Second)) + } else { + p.Logger.Error(err) + } + continue + } + p.conntable[conn] = true + p.start <- conn + + } + }(p) + + go func(p *processor) { + for { + conn := <-p.end + delete(p.conntable, conn) + conn.Close() + } + }(p) + + go func() { + for { + select { + case conn := <-p.start: + go func(p *processor) { + + buf := bufio.NewReaderSize(conn, p.opt.ReadBufferSize) + scanner := bufio.NewScanner(buf) + + for scanner.Scan() { + ne := p.NewPacket(map[string]interface{}{ + "message": scanner.Text(), + "host": conn.LocalAddr().String(), + }) + p.opt.ProcessCommonOptions(ne.Fields()) + p.Send(ne) + } + p.end <- conn + }(p) + + } + } + }() + + return nil +} + +func (p *processor) Stop(e processors.IPacket) error { + + if p.sock != nil { + err := p.sock.Close() + if err != nil { + return err + } + } + return nil +} diff --git a/processors/output-kafka/README.md b/processors/output-kafka/README.md new file mode 100644 index 00000000..7b8a0de6 --- /dev/null +++ b/processors/output-kafka/README.md @@ -0,0 +1,109 @@ +# KAFKAOUTPUT + + +## Synopsys + + +| SETTING | TYPE | REQUIRED | DEFAULT VALUE | +|-------------------|--------|----------|---------------| +| bootstrap_servers | string | false | "" | +| brokers | array | false | [] | +| topic_id | string | true | "" | +| client_id | string | false | "" | +| balancer | string | false | "" | +| max_attempts | int | false | 0 | +| queue_size | int | false | 0 | +| batch_size | int | false | 0 | +| keepalive | int | false | 0 | +| io_timeout | int | false | 0 | +| acks | int | false | 0 | + + +## Details + +### bootstrap_servers +* Value type is string +* Default value is `""` + +Bootstrap Servers ( "host:port" ) + +### brokers +* Value type is array +* Default value is `[]` + +Broker list + +### topic_id +* This is a required setting. +* Value type is string +* Default value is `""` + +Kafka topic + +### client_id +* Value type is string +* Default value is `""` + +Kafka client id + +### balancer +* Value type is string +* Default value is `""` + +Balancer ( roundrobin, hash or leastbytes ) + +### max_attempts +* Value type is int +* Default value is `0` + +Max Attempts + +### queue_size +* Value type is int +* Default value is `0` + +Queue Size + +### batch_size +* Value type is int +* Default value is `0` + +Batch Size + +### keepalive +* Value type is int +* Default value is `0` + +Keep Alive ( in seconds ) + +### io_timeout +* Value type is int +* Default value is `0` + +IO Timeout ( in seconds ) + +### acks +* Value type is int +* Default value is `0` + +Required Acks ( number of replicas that must acknowledge write. 
-1 for all replicas ) + + + +## Configuration blueprint + +``` +kafkaoutput{ + bootstrap_servers => "" + brokers => [] + topic_id => "" + client_id => "" + balancer => "" + max_attempts => 123 + queue_size => 123 + batch_size => 123 + keepalive => 123 + io_timeout => 123 + acks => 123 +} +``` diff --git a/processors/output-kafka/docdoc.go b/processors/output-kafka/docdoc.go new file mode 100644 index 00000000..c62a6fc8 --- /dev/null +++ b/processors/output-kafka/docdoc.go @@ -0,0 +1,140 @@ +// Code generated by "bitfanDoc "; DO NOT EDIT +package kafkaoutput + +import "github.com/vjeantet/bitfan/processors/doc" + +func (p *processor) Doc() *doc.Processor { + return &doc.Processor{ + Behavior: "", + Name: "kafkaoutput", + ImportPath: "github.com/vjeantet/bitfan/processors/output-kafka", + Doc: "", + DocShort: "", + Options: &doc.ProcessorOptions{ + Doc: "", + Options: []*doc.ProcessorOption{ + &doc.ProcessorOption{ + Name: "processors.CommonOptions", + Alias: ",squash", + Doc: "", + Required: false, + Type: "processors.CommonOptions", + DefaultValue: nil, + PossibleValues: []string{}, + ExampleLS: "", + }, + &doc.ProcessorOption{ + Name: "BootstrapServers", + Alias: "bootstrap_servers", + Doc: "Bootstrap Servers ( \"host:port\" )", + Required: false, + Type: "string", + DefaultValue: nil, + PossibleValues: []string{}, + ExampleLS: "", + }, + &doc.ProcessorOption{ + Name: "Brokers", + Alias: "brokers", + Doc: "Broker list", + Required: false, + Type: "array", + DefaultValue: nil, + PossibleValues: []string{}, + ExampleLS: "", + }, + &doc.ProcessorOption{ + Name: "TopicID", + Alias: "topic_id", + Doc: "Kafka topic", + Required: true, + Type: "string", + DefaultValue: nil, + PossibleValues: []string{}, + ExampleLS: "", + }, + &doc.ProcessorOption{ + Name: "ClientID", + Alias: "client_id", + Doc: "Kafka client id", + Required: false, + Type: "string", + DefaultValue: nil, + PossibleValues: []string{}, + ExampleLS: "", + }, + &doc.ProcessorOption{ + Name: "Balancer", + Alias: "balancer", + Doc: "Balancer ( roundrobin, hash or leastbytes )", + Required: false, + Type: "string", + DefaultValue: nil, + PossibleValues: []string{}, + ExampleLS: "", + }, + &doc.ProcessorOption{ + Name: "MaxAttempts", + Alias: "max_attempts", + Doc: "Max Attempts", + Required: false, + Type: "int", + DefaultValue: nil, + PossibleValues: []string{}, + ExampleLS: "", + }, + &doc.ProcessorOption{ + Name: "QueueSize", + Alias: "queue_size", + Doc: "Queue Size", + Required: false, + Type: "int", + DefaultValue: nil, + PossibleValues: []string{}, + ExampleLS: "", + }, + &doc.ProcessorOption{ + Name: "BatchSize", + Alias: "batch_size", + Doc: "Batch Size", + Required: false, + Type: "int", + DefaultValue: nil, + PossibleValues: []string{}, + ExampleLS: "", + }, + &doc.ProcessorOption{ + Name: "KeepAlive", + Alias: "keepalive", + Doc: "Keep Alive ( in seconds )", + Required: false, + Type: "int", + DefaultValue: nil, + PossibleValues: []string{}, + ExampleLS: "", + }, + &doc.ProcessorOption{ + Name: "IOTimeout", + Alias: "io_timeout", + Doc: "IO Timeout ( in seconds )", + Required: false, + Type: "int", + DefaultValue: nil, + PossibleValues: []string{}, + ExampleLS: "", + }, + &doc.ProcessorOption{ + Name: "RequiredAcks", + Alias: "acks", + Doc: "Required Acks ( number of replicas that must acknowledge write. 
-1 for all replicas )", + Required: false, + Type: "int", + DefaultValue: nil, + PossibleValues: []string{}, + ExampleLS: "", + }, + }, + }, + Ports: []*doc.ProcessorPort{}, + } +} diff --git a/processors/output-kafka/kafka.go b/processors/output-kafka/kafka.go new file mode 100644 index 00000000..a5399d9a --- /dev/null +++ b/processors/output-kafka/kafka.go @@ -0,0 +1,177 @@ +//go:generate bitfanDoc +package kafkaoutput + +import ( + "context" + "net" + "strings" + "time" + + "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/gzip" + "github.com/segmentio/kafka-go/lz4" + "github.com/segmentio/kafka-go/snappy" + "github.com/vjeantet/bitfan/processors" +) + +func New() processors.Processor { + return &processor{opt: &options{}} +} + +type processor struct { + processors.Base + writer *kafka.Writer + opt *options +} + +type options struct { + processors.CommonOptions `mapstructure:",squash"` + + // Bootstrap Servers ( "host:port" ) + BootstrapServers string `mapstructure:"bootstrap_servers"` + // Broker list + Brokers []string `mapstructure:"brokers"` + // Kafka topic + TopicID string `mapstructure:"topic_id" validate:"required"` + // Kafka client id + ClientID string `mapstructure:"client_id"` + // Balancer ( roundrobin, hash or leastbytes ) + Balancer string `mapstructure:"balancer"` + // Compression algorithm ( 'gzip', 'snappy', or 'lz4' ) + Compression string `mapstructure:"compression"` + // Max Attempts + MaxAttempts int `mapstructure:"max_attempts"` + // Queue Size + QueueSize int `mapstructure:"queue_size"` + // Batch Size + BatchSize int `mapstructure:"batch_size"` + // Keep Alive ( in seconds ) + KeepAlive int `mapstructure:"keepalive"` + // IO Timeout ( in seconds ) + IOTimeout int `mapstructure:"io_timeout"` + // Required Acks ( number of replicas that must acknowledge write. 
-1 for all replicas ) + RequiredAcks int `mapstructure:"acks"` +} + +func (p *processor) Configure(ctx processors.ProcessorContext, conf map[string]interface{}) error { + + defaults := options{ + Brokers: []string{"localhost:9092"}, + ClientID: "bitfan", + MaxAttempts: 10, + QueueSize: 10e3, + BatchSize: 10e2, + KeepAlive: 180, + IOTimeout: 10, + RequiredAcks: -1, + } + p.opt = &defaults + return p.ConfigureAndValidate(ctx, conf, p.opt) +} + +func (p *processor) Start(e processors.IPacket) error { + + var err error + var balancer kafka.Balancer + var codec kafka.CompressionCodec + + // lookup bootstrap server + if p.opt.BootstrapServers != "" { + brokers, err := bootstrapLookup(p.opt.BootstrapServers) + if err != nil { + p.Logger.Errorf("error getting bootstrap servers: %v", err) + } else { + p.opt.Brokers = brokers + } + } + + switch p.opt.Balancer { + case "roundrobin": + balancer = &kafka.RoundRobin{} + case "hash": + balancer = &kafka.Hash{} + case "leastbytes": + balancer = &kafka.LeastBytes{} + default: + balancer = &kafka.RoundRobin{} + } + + switch p.opt.Compression { + case "gzip": + codec = gzip.NewCompressionCodec() + case "lz4": + codec = lz4.NewCompressionCodec() + case "snappy": + codec = snappy.NewCompressionCodec() + default: + codec = nil + } + + p.Logger.Infof("using kafka brokers %v", p.opt.Brokers) + + p.writer = kafka.NewWriter(kafka.WriterConfig{ + Brokers: p.opt.Brokers, + Topic: p.opt.TopicID, + Dialer: &kafka.Dialer{ + ClientID: p.opt.ClientID, + DualStack: true, // RFC-6555 compliance + KeepAlive: time.Second * time.Duration(p.opt.KeepAlive), + }, + Balancer: balancer, + CompressionCodec: codec, + MaxAttempts: p.opt.MaxAttempts, + QueueCapacity: p.opt.QueueSize, + BatchSize: p.opt.BatchSize, + ReadTimeout: time.Second * time.Duration(p.opt.IOTimeout), + WriteTimeout: time.Second * time.Duration(p.opt.IOTimeout), + RequiredAcks: p.opt.RequiredAcks, + Async: false, + }) + + return err +} + +func (p *processor) Receive(e processors.IPacket) error { + + var err error + + message, err := e.Fields().Json(true) + + if err != nil { + p.Logger.Errorf("json encoding error: %v", err) + } + + err = p.writer.WriteMessages(context.Background(), + kafka.Message{ + Value: message, + }) + + return err +} + +func (p *processor) Stop(e processors.IPacket) error { + return p.writer.Close() +} + +func bootstrapLookup(endpoint string) ([]string, error) { + + var err error + var brokers []string + + host, port, err := net.SplitHostPort(endpoint) + if err != nil { + return brokers, err + } + + addrs, err := net.LookupHost(host) + + if err != nil { + return brokers, err + } + + for _, ip := range addrs { + brokers = append(brokers, strings.Join([]string{ip, port}, ":")) + } + + return brokers, err +} diff --git a/processors/output-kafka/kafka_test.go b/processors/output-kafka/kafka_test.go new file mode 100644 index 00000000..d2be631b --- /dev/null +++ b/processors/output-kafka/kafka_test.go @@ -0,0 +1,29 @@ +package kafkaoutput + +import ( + "github.com/stretchr/testify/assert" + "github.com/vjeantet/bitfan/processors/doc" + "github.com/vjeantet/bitfan/processors/testutils" + "testing" +) + +func TestNew(t *testing.T) { + p := New() + _, ok := p.(*processor) + assert.Equal(t, ok, true, "New() should return a processor") +} +func TestDoc(t *testing.T) { + assert.IsType(t, &doc.Processor{}, New().(*processor).Doc()) +} +func TestConfigure(t *testing.T) { + conf := map[string]interface{}{"topic_id": "test_topic"} + ctx := testutils.NewProcessorContext() + p := New() + err := 
p.Configure(ctx, conf) + assert.Nil(t, err, "Configure() processor without error") +} +func TestBootstrapLookup(t *testing.T) { + brokers, err := bootstrapLookup("codecov.io:1000") + assert.Nil(t, err) + assert.True(t, len(brokers) > 0) +} diff --git a/processors/output-mongodb/mongodb_test.go b/processors/output-mongodb/mongodb_test.go index 797be4f8..bf822da4 100644 --- a/processors/output-mongodb/mongodb_test.go +++ b/processors/output-mongodb/mongodb_test.go @@ -1,10 +1,10 @@ package mongodb import ( - "testing" - "github.com/stretchr/testify/assert" "github.com/vjeantet/bitfan/processors/doc" + "github.com/vjeantet/bitfan/processors/testutils" + "testing" ) func TestNew(t *testing.T) { @@ -19,3 +19,10 @@ func TestMaxConcurent(t *testing.T) { max := New().(*processor).MaxConcurent() assert.Equal(t, 0, max, "this processor does support concurency") } +func TestConfigure(t *testing.T) { + conf := map[string]interface{}{} + ctx := testutils.NewProcessorContext() + p := New() + err := p.Configure(ctx, conf) + assert.Nil(t, err, "Configure() processor without error") +} diff --git a/processors/output-rabbitmq/rabbitmq_test.go b/processors/output-rabbitmq/rabbitmq_test.go index 89c71ffc..3b3cdc07 100644 --- a/processors/output-rabbitmq/rabbitmq_test.go +++ b/processors/output-rabbitmq/rabbitmq_test.go @@ -1,10 +1,10 @@ package rabbitmqoutput import ( - "testing" - "github.com/stretchr/testify/assert" "github.com/vjeantet/bitfan/processors/doc" + "github.com/vjeantet/bitfan/processors/testutils" + "testing" ) func TestNew(t *testing.T) { @@ -19,3 +19,13 @@ func TestMaxConcurent(t *testing.T) { max := New().(*processor).MaxConcurent() assert.Equal(t, 0, max, "this processor does support concurency") } +func TestConfigure(t *testing.T) { + conf := map[string]interface{}{ + "exchange": "exchange", + "exchange_type": "exchange_type", + } + ctx := testutils.NewProcessorContext() + p := New() + err := p.Configure(ctx, conf) + assert.Nil(t, err, "Configure() processor without error") +} diff --git a/processors/output-tcp/tcpoutput_test.go b/processors/output-tcp/tcpoutput_test.go index 6520ceb4..3e47ee16 100644 --- a/processors/output-tcp/tcpoutput_test.go +++ b/processors/output-tcp/tcpoutput_test.go @@ -2,13 +2,13 @@ package tcpoutput import ( "fmt" - "net" - "testing" - "github.com/stretchr/testify/assert" "github.com/vjeantet/bitfan/codecs" "github.com/vjeantet/bitfan/processors/doc" "github.com/vjeantet/bitfan/processors/testutils" + "net" + "testing" + "time" ) func TestNew(t *testing.T) { @@ -39,10 +39,9 @@ func TestLine(t *testing.T) { assert.NoError(t, p.Configure(ctx, conf), "configuration is correct, error should be nil") assert.NoError(t, p.Start(nil)) - assert.NoError(t, p.Receive(testutils.NewPacketOld("message1", map[string]interface{}{"abc": "def1", "n": 123}))) - assert.NoError(t, p.Receive(testutils.NewPacketOld("message2", map[string]interface{}{"abc": "def2", "n": 456}))) - assert.Equal(t, "message1\n", srv.GetMessage()) - assert.Equal(t, "message2\n", srv.GetMessage()) + assert.NoError(t, p.Receive(testutils.NewPacketOld("message", map[string]interface{}{"abc": "def1", "n": 123}))) + time.Sleep(time.Second * 1) + assert.Equal(t, "message\n", srv.GetMessage()) assert.NoError(t, p.Stop(nil)) srv.Stop() } diff --git a/processors/sincedb_test.go b/processors/sincedb_test.go index 64badf1f..1cde3110 100644 --- a/processors/sincedb_test.go +++ b/processors/sincedb_test.go @@ -1,16 +1,18 @@ package processors import ( - "testing" - "github.com/stretchr/testify/assert" + "testing" + 
"time" ) func TestNewSinceDB(t *testing.T) { sdboptions := &SinceDBOptions{ - Identifier: "sincedb.json", + WriteInterval: 1, + Identifier: "sincedb.json", } sdb := NewSinceDB(sdboptions) + time.Sleep(time.Second * 2) assert.IsType(t, (*SinceDB)(nil), sdb) assert.False(t, sdb.dryrun) } @@ -22,4 +24,6 @@ func TestNewSinceDBDryRun(t *testing.T) { sdb := NewSinceDB(sdboptions) assert.IsType(t, (*SinceDB)(nil), sdb) assert.True(t, sdb.dryrun) + err := sdb.Close() + assert.Nil(t, err, "successful close of db") }