Skip to content
This repository has been archived by the owner on Nov 19, 2020. It is now read-only.

tcp input and kafka output processors #87

Open
wants to merge 30 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
9a8e02f
Add tcp input processor
Sep 25, 2018
184cc9d
add stub files for kafka output
Sep 25, 2018
1811d54
message handling thread was broken. fixed it.
awillis Sep 26, 2018
ec2ce92
make tcpinput import statement consistent
awillis Sep 26, 2018
9a4131f
add bitfanDoc output
awillis Sep 26, 2018
0debe87
more bitfan doc stuff
awillis Sep 26, 2018
4322a5c
truncate old docs with new
awillis Sep 26, 2018
e9a699a
First version of kafka driver, needs testing
awillis Sep 27, 2018
d5e98e1
Merge branch 'tcp_input' of github.com:awillis/bitfan into tcpin_kafk…
awillis Sep 27, 2018
2e8336d
minor readme issues
awillis Sep 27, 2018
2bbe163
add support for dns bootstrap server
Sep 27, 2018
e3e0c3d
- set bootstrap_servers option like logstash does
Sep 28, 2018
2180a3e
updated documentation
Sep 28, 2018
a69e86b
add dependencies to CI build
Sep 28, 2018
1f76c27
go fmt docs and add tests for Configure() methods
awillis Sep 29, 2018
bb26dd3
slight improvement to sincedb test for codecov
awillis Sep 29, 2018
935fcd5
improve sincedb test coverage
awillis Sep 29, 2018
ed69462
sending two messages and waiting for two messages works better when y…
awillis Sep 29, 2018
1fb2e23
improve mongo tests to make codecov happy
awillis Sep 29, 2018
bec2d92
only send one message. sending two messages is too error prone for ci…
awillis Sep 29, 2018
3bf510c
improve output rabbitmq tests in an attempt to appease the codecov gods
awillis Sep 29, 2018
2a8364e
sleep for a bit
awillis Sep 29, 2018
a0907c1
add test for bootstrap_servers lookup
awillis Sep 29, 2018
c882057
remove sleep, as this is not a real server
awillis Sep 29, 2018
02159b3
ok. add the sleep back in. because that seemed to make travis happy.
awillis Sep 29, 2018
4481a81
- use stdlib instead of miekg dns library
awillis Oct 17, 2018
0d79f56
fixup build libraries
awillis Oct 17, 2018
29e4e2c
add support for compression
awillis Oct 17, 2018
e8b36f5
revert gopkg.in paths, so as not to import twice with gzip library, a…
awillis Oct 17, 2018
2d8b848
add missing error check, actually populate broker list
awillis Oct 17, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ install:
- go get -v golang.org/x/tools/cmd/cover
- go get -v github.com/stretchr/testify/assert
- go get -v github.com/smartystreets/goconvey
- go get -v github.com/segmentio/kafka-go
- go get -v github.com/golang/snappy
- go get -v github.com/pierrec/lz4
script:
- ./go.test.sh
after_success:
Expand Down
4 changes: 4 additions & 0 deletions cmd/bitfan/commands/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
inputstdout "github.com/vjeantet/bitfan/processors/input-stdout"
sysloginput "github.com/vjeantet/bitfan/processors/input-syslog"
tail "github.com/vjeantet/bitfan/processors/input-tail"
tcpinput "github.com/vjeantet/bitfan/processors/input-tcp"
twitter "github.com/vjeantet/bitfan/processors/input-twitter"
udpinput "github.com/vjeantet/bitfan/processors/input-udp"
unixinput "github.com/vjeantet/bitfan/processors/input-unix"
Expand All @@ -39,6 +40,7 @@ import (
fileoutput "github.com/vjeantet/bitfan/processors/output-file"
glusterfsoutput "github.com/vjeantet/bitfan/processors/output-glusterfs"
httpoutput "github.com/vjeantet/bitfan/processors/output-http"
kafkaoutput "github.com/vjeantet/bitfan/processors/output-kafka"
tcpoutput "github.com/vjeantet/bitfan/processors/output-tcp"
mongodb "github.com/vjeantet/bitfan/processors/output-mongodb"
null "github.com/vjeantet/bitfan/processors/output-null"
Expand Down Expand Up @@ -73,6 +75,7 @@ func init() {
initPlugin("input", "beats", beatsinput.New)
initPlugin("input", "rabbitmq", rabbitmqinput.New)
initPlugin("input", "udp", udpinput.New)
initPlugin("input", "tcp", tcpinput.New)
initPlugin("input", "syslog", sysloginput.New)
initPlugin("input", "unix", unixinput.New)
initPlugin("input", "readfile", file.New)
Expand Down Expand Up @@ -128,6 +131,7 @@ func init() {
initPlugin("output", "rabbitmq", rabbitmqoutput.New)
initPlugin("output", "email", email.New)
initPlugin("output", "http", httpoutput.New)
initPlugin("output", "kafka", kafkaoutput.New)
initPlugin("output", "tcp", tcpoutput.New)
initPlugin("output", "sql", sqlprocessor.New)
initPlugin("output", "template", templateprocessor.New)
Expand Down
133 changes: 133 additions & 0 deletions docs/data/processors/kafkaoutput.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
{
"Behavior": "",
"Doc": "",
"DocShort": "",
"ImportPath": "github.com/vjeantet/bitfan/processors/output-kafka",
"Name": "kafkaoutput",
"Options": {
"Doc": "",
"Options": [
{
"Alias": ",squash",
"DefaultValue": null,
"Doc": "",
"ExampleLS": "",
"Name": "processors.CommonOptions",
"PossibleValues": null,
"Required": false,
"Type": "processors.CommonOptions"
},
{
"Alias": "bootstrap_servers",
"DefaultValue": null,
"Doc": "Bootstrap Servers ( \"host:port\" )",
"ExampleLS": "",
"Name": "BootstrapServers",
"PossibleValues": null,
"Required": false,
"Type": "string"
},
{
"Alias": "brokers",
"DefaultValue": null,
"Doc": "Broker list",
"ExampleLS": "",
"Name": "Brokers",
"PossibleValues": null,
"Required": false,
"Type": "array"
},
{
"Alias": "topic_id",
"DefaultValue": null,
"Doc": "Kafka topic",
"ExampleLS": "",
"Name": "TopicID",
"PossibleValues": null,
"Required": true,
"Type": "string"
},
{
"Alias": "client_id",
"DefaultValue": null,
"Doc": "Kafka client id",
"ExampleLS": "",
"Name": "ClientID",
"PossibleValues": null,
"Required": false,
"Type": "string"
},
{
"Alias": "balancer",
"DefaultValue": null,
"Doc": "Balancer ( roundrobin, hash or leastbytes )",
"ExampleLS": "",
"Name": "Balancer",
"PossibleValues": null,
"Required": false,
"Type": "string"
},
{
"Alias": "max_attempts",
"DefaultValue": null,
"Doc": "Max Attempts",
"ExampleLS": "",
"Name": "MaxAttempts",
"PossibleValues": null,
"Required": false,
"Type": "int"
},
{
"Alias": "queue_size",
"DefaultValue": null,
"Doc": "Queue Size",
"ExampleLS": "",
"Name": "QueueSize",
"PossibleValues": null,
"Required": false,
"Type": "int"
},
{
"Alias": "batch_size",
"DefaultValue": null,
"Doc": "Batch Size",
"ExampleLS": "",
"Name": "BatchSize",
"PossibleValues": null,
"Required": false,
"Type": "int"
},
{
"Alias": "keepalive",
"DefaultValue": null,
"Doc": "Keep Alive ( in seconds )",
"ExampleLS": "",
"Name": "KeepAlive",
"PossibleValues": null,
"Required": false,
"Type": "int"
},
{
"Alias": "io_timeout",
"DefaultValue": null,
"Doc": "IO Timeout ( in seconds )",
"ExampleLS": "",
"Name": "IOTimeout",
"PossibleValues": null,
"Required": false,
"Type": "int"
},
{
"Alias": "acks",
"DefaultValue": null,
"Doc": "Required Acks ( number of replicas that must acknowledge write. -1 for all replicas )",
"ExampleLS": "",
"Name": "RequiredAcks",
"PossibleValues": null,
"Required": false,
"Type": "int"
}
]
},
"Ports": []
}
43 changes: 43 additions & 0 deletions docs/data/processors/tcpinput.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{
"Behavior": "",
"Doc": "",
"DocShort": "",
"ImportPath": "github.com/vjeantet/bitfan/processors/input-tcp",
"Name": "tcpinput",
"Options": {
"Doc": "",
"Options": [
{
"Alias": ",squash",
"DefaultValue": null,
"Doc": "",
"ExampleLS": "",
"Name": "processors.CommonOptions",
"PossibleValues": null,
"Required": false,
"Type": "processors.CommonOptions"
},
{
"Alias": "port",
"DefaultValue": null,
"Doc": "TCP port number to listen on",
"ExampleLS": "",
"Name": "Port",
"PossibleValues": null,
"Required": false,
"Type": "int"
},
{
"Alias": "read_buffer_size",
"DefaultValue": null,
"Doc": "Message buffer size",
"ExampleLS": "",
"Name": "ReadBufferSize",
"PossibleValues": null,
"Required": false,
"Type": "int"
}
]
},
"Ports": []
}
36 changes: 36 additions & 0 deletions processors/input-tcp/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# TCPINPUT


## Synopsys


| SETTING | TYPE | REQUIRED | DEFAULT VALUE |
|------------------|------|----------|---------------|
| port | int | false | 0 |
| read_buffer_size | int | false | 0 |


## Details

### port
* Value type is int
* Default value is `0`

TCP port number to listen on

### read_buffer_size
* Value type is int
* Default value is `0`

Message buffer size



## Configuration blueprint

```
tcp {
port => 123
read_buffer_size => 123
}
```
50 changes: 50 additions & 0 deletions processors/input-tcp/docdoc.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions processors/input-tcp/tcp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package tcpinput

import (
"github.com/stretchr/testify/assert"
"github.com/vjeantet/bitfan/processors/doc"
"github.com/vjeantet/bitfan/processors/testutils"
"testing"
)

func TestNew(t *testing.T) {
p := New()
_, ok := p.(*processor)
assert.Equal(t, ok, true, "New() should return a processor")
}
func TestDoc(t *testing.T) {
assert.IsType(t, &doc.Processor{}, New().(*processor).Doc())
}
func TestConfigure(t *testing.T) {
conf := map[string]interface{}{}
ctx := testutils.NewProcessorContext()
p := New()
err := p.Configure(ctx, conf)
assert.Nil(t, err, "Configure() processor without error")
}
Loading