From d0d7d51e712c6a41541dcc090d7438bd41f3ee07 Mon Sep 17 00:00:00 2001 From: Warren Fernandes Date: Fri, 7 Nov 2014 15:09:28 -0700 Subject: [PATCH] Upgrade dropsonde [#81953496] Signed-off-by: Joseph Rodriguez --- Godeps/Godeps.json | 2 +- .../cloudfoundry/dropsonde/.travis.yml | 5 +- .../cloudfoundry/dropsonde/README.md | 39 ++--- .../dropsonde/autowire/autowire.go | 116 ------------- .../dropsonde/autowire/autowire_suite_test.go | 16 -- .../dropsonde/autowire/autowire_test.go | 86 ---------- .../dropsonde/control/controlmessage.pb.go | 107 ++++++++++++ .../dropsonde/control/heartbeatrequest.pb.go | 24 +++ .../cloudfoundry/dropsonde/control/uuid.pb.go | 40 +++++ .../cloudfoundry/dropsonde/dropsonde.go | 125 ++++++++++++++ .../cloudfoundry/dropsonde/dropsonde_test.go | 109 +++++++++++++ .../dropsonde_unmarshaller.go | 47 +++++- .../dropsonde_unmarshaller_test.go | 42 +++++ .../dropsonde/emitter/event_emitter_test.go | 2 +- .../dropsonde/emitter/event_formatter.go | 3 +- .../dropsonde/emitter/event_formatter_test.go | 11 +- .../dropsonde/emitter/heartbeat_emitter.go | 95 ----------- .../emitter/heartbeat_emitter_test.go | 120 -------------- .../dropsonde/emitter/heartbeat_responder.go | 77 +++++++++ .../emitter/heartbeat_responder_test.go | 154 ++++++++++++++++++ .../dropsonde/emitter/instrumented_emitter.go | 3 +- .../emitter/instrumented_emitter_test.go | 2 + .../emitter/responding_byte_emitter.go | 8 + .../dropsonde/emitter/udp_emitter.go | 24 +++ .../dropsonde/emitter/udp_emitter_test.go | 87 +++++++++- .../envelope_extensions.go | 19 ++- .../envelope_extensions_suite_test.go | 13 ++ .../envelope_extensions_test.go | 21 +-- .../dropsonde/events/envelope.pb.go | 15 ++ .../cloudfoundry/dropsonde/events/error.pb.go | 48 ++++++ .../dropsonde/events/events_suite_test.go | 13 -- .../dropsonde/events/heartbeat.pb.go | 17 +- .../cloudfoundry/dropsonde/events/http.pb.go | 29 +--- .../cloudfoundry/dropsonde/events/log.pb.go | 2 + .../dropsonde/events/metric.pb.go | 2 + .../cloudfoundry/dropsonde/events/uuid.pb.go | 40 +++++ .../dropsonde/factories/factories.go | 5 + .../instrumented_handler.go | 2 +- .../instrumented_handler_suite_test.go | 13 ++ .../instrumented_handler_test.go | 10 +- .../instrumented_round_tripper.go | 4 +- .../instrumented_round_tripper_suite_test.go | 13 ++ .../instrumented_round_tripper_test.go | 10 +- ...d_test.go => dropsonde_end_to_end_test.go} | 64 +++++--- .../dropsonde/{autowire => }/logs/logs.go | 29 ++-- .../{autowire => }/logs/logs_suite_test.go | 0 .../{autowire => }/logs/logs_test.go | 27 ++- .../{autowire => }/metrics/metrics.go | 26 +-- .../metrics/metrics_suite_test.go | 0 .../{autowire => }/metrics/metrics_test.go | 28 +++- .../file_and_loggregator_access_logger.go | 2 +- ...file_and_loggregator_access_logger_test.go | 4 +- config/config.go | 10 +- main.go | 3 + proxy/proxy.go | 4 +- proxy/proxy_test.go | 7 +- router/router.go | 4 +- router/router_test.go | 5 + scripts/test | 3 - test_util/helpers.go | 7 +- 60 files changed, 1233 insertions(+), 610 deletions(-) delete mode 100644 Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/autowire.go delete mode 100644 Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/autowire_suite_test.go delete mode 100644 Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/autowire_test.go create mode 100644 Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/control/controlmessage.pb.go create mode 100644 Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/control/heartbeatrequest.pb.go create mode 100644 Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/control/uuid.pb.go create mode 100644 Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/dropsonde.go create mode 100644 Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/dropsonde_test.go delete mode 100644 Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/heartbeat_emitter.go delete mode 100644 Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/heartbeat_emitter_test.go create mode 100644 Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/heartbeat_responder.go create mode 100644 Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/heartbeat_responder_test.go create mode 100644 Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/responding_byte_emitter.go rename Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/{events => envelope_extensions}/envelope_extensions.go (62%) create mode 100644 Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/envelope_extensions/envelope_extensions_suite_test.go rename Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/{events => envelope_extensions}/envelope_extensions_test.go (78%) create mode 100644 Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/error.pb.go delete mode 100644 Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/events_suite_test.go create mode 100644 Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/uuid.pb.go rename Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/{ => instrumented_handler}/instrumented_handler.go (98%) create mode 100644 Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_handler/instrumented_handler_suite_test.go rename Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/{ => instrumented_handler}/instrumented_handler_test.go (91%) rename Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/{ => instrumented_round_tripper}/instrumented_round_tripper.go (96%) create mode 100644 Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_round_tripper/instrumented_round_tripper_suite_test.go rename Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/{ => instrumented_round_tripper}/instrumented_round_tripper_test.go (90%) rename Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/integration_test/{autowire_end_to_end_test.go => dropsonde_end_to_end_test.go} (66%) rename Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/{autowire => }/logs/logs.go (76%) rename Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/{autowire => }/logs/logs_suite_test.go (100%) rename Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/{autowire => }/logs/logs_test.go (64%) rename Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/{autowire => }/metrics/metrics.go (73%) rename Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/{autowire => }/metrics/metrics_suite_test.go (100%) rename Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/{autowire => }/metrics/metrics_test.go (65%) diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index ec7225d4b..6d42f2187 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -20,7 +20,7 @@ }, { "ImportPath": "github.com/cloudfoundry/dropsonde", - "Rev": "510a77a445ec3319a67ad427320de767b0168509" + "Rev": "d8399b690bf4b541b1a530ca1ca7305b4f3444c2" }, { "ImportPath": "github.com/cloudfoundry/gosteno", diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/.travis.yml b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/.travis.yml index 2c43a1c25..46de47891 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/.travis.yml +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/.travis.yml @@ -19,8 +19,9 @@ install: script: PATH=$HOME/gopath/bin:$PATH ./test go: -- 1.2.1 -- 1.3.1 +- 1.2 +- 1.3 +- tip matrix: allow_failures: diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/README.md b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/README.md index 0179b9055..c25e5be62 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/README.md +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/README.md @@ -12,32 +12,28 @@ for the full specification of the dropsonde Protocol Buffer format. Use [this script](events/generate-events.sh) to generate Go handlers for the various protobuf messages. -## Autowire -The intended use of dropsonde is through the [autowire](autowire/autowire.go) -sub-package. Simply anonymously import the package +## Initialization and Configuration ```go import ( - _ "github.com/cloudfoundry/dropsonde/autowire" + _ "github.com/cloudfoundry/dropsonde" ) -``` -and it will automatically initialize, instrument the default HTTP handler for -outgoing requests, instrument itself (to count messages sent, etc.), and provide -basic [runtime stats](runtime_stats/runtime_stats.go). -Alternatively, import `github.com/cloudfoundry/dropsonde/autowire/metrics` to include the -ability to send custom metrics, via [`metrics.SendValue`](autowire/metrics/metrics.go#L44) -and [`metrics.IncrementCounter`](autowire/metrics/metrics.go#L51). (The same auto- -initialization will apply when importing `metrics`.) +func main() { + dropsonde.Initialize("localhost:3457", "router", "z1", "0") +} +``` +This initializes dropsonde, along with the logs and metrics packages. It also instruments +the default HTTP handler for outgoing requests, instrument itself (to count messages sent, etc.), +and provides basic [runtime stats](runtime_stats/runtime_stats.go). -### Configuration -Before running a program using `autowire`, you **must** set the `DROPSONDE_ORIGIN` -environment variable. This string is used by downstream portions of the dropsonde -system to track the source of metrics. Failing to set this variable will result -in the program running, but without any instrumentation. +The first argument is the destination for messages (typically metron). +The host and port is required. The remaining arguments form the origin. +This list is used by downstream portions of the dropsonde system to +track the source of metrics. -You may (optionally) set `DROPSONDE_DESTINATION` to configure the recipient of -event messages. If left unset, a [default](autowire/autowire.go#L37) will be -used. +Alternatively, import `github.com/cloudfoundry/dropsonde/metrics` to include the +ability to send custom metrics, via [`metrics.SendValue`](metrics/metrics.go#L44) +and [`metrics.IncrementCounter`](metrics/metrics.go#L51). ## Manual usage For details on manual usage of dropsonde, please refer to the @@ -47,8 +43,7 @@ types. ## Handling dropsonde events Programs wishing to emit events and metrics should use the package as described -above, or should use `dropsonde/autowire` (or `dropsonde/autowire/metrics`). For -programs that wish to process events, we provide the `dropsonde/unmarshaller` +above. For programs that wish to process events, we provide the `dropsonde/unmarshaller` and `dropsonde/marshaller` packages for decoding/reencoding raw Protocol Buffer messages. Use [`dropsonde/signature`](signature/signature_verifier.go) to sign and validate messages. diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/autowire.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/autowire.go deleted file mode 100644 index 4e4cfa902..000000000 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/autowire.go +++ /dev/null @@ -1,116 +0,0 @@ -// Package autowire provides sensible defaults for using dropsonde. -// -// The default HTTP transport is instrumented, as well as some basic stats about -// the Go runtime. Additionally, the default emitter is itself instrumented to -// periodically send "heartbeat" messages containing counts of received and sent -// events. The default emitter sends events over UDP. -// -// Use -// -// Set the DROPSONDE_ORIGIN and DROPSONDE_DESTINATION environment variables. -// (See Initialize below for details.) Anonymously import autowire: -// -// import ( -// _ "github.com/cloudfoundry/dropsonde/autowire" -// ) -// -// The package self-initializes and automatically adds instrumentation where it -// can. -package autowire - -import ( - "github.com/cloudfoundry/dropsonde" - "github.com/cloudfoundry/dropsonde/emitter" - "github.com/cloudfoundry/dropsonde/events" - "github.com/cloudfoundry/dropsonde/runtime_stats" - "log" - "net/http" - "os" - "time" -) - -var autowiredEmitter emitter.EventEmitter - -const runtimeStatsInterval = 10 * time.Second - -const DefaultDestination = "localhost:3457" - -func init() { - emitter, _ := CreateDefaultEmitter() - Initialize(emitter) -} - -// InstrumentedHandler returns a Handler pre-configured to emit HTTP server -// request metrics to autowire's Emitter. -func InstrumentedHandler(handler http.Handler) http.Handler { - return dropsonde.InstrumentedHandler(handler, autowiredEmitter) -} - -// InstrumentedRoundTripper returns a RoundTripper pre-configured to emit -// HTTP client request metrics to autowire's Emitter. -func InstrumentedRoundTripper(roundTripper http.RoundTripper) http.RoundTripper { - return dropsonde.InstrumentedRoundTripper(roundTripper, autowiredEmitter) -} - -// Initialize creates default emitters and instruments the default HTTP -// transport. -// -// The DROPSONDE_ORIGIN environment variable is required and specifies the -// source name for all metrics emitted by this process. If it is not set, the -// program will run normally but will not emit metrics. -// -// The DROPSONDE_DESTINATION environment variable sets the host and port to -// which metrics are sent. It is optional, and defaults to DefaultDestination. -func Initialize(emitter emitter.EventEmitter) { - http.DefaultTransport = &http.Transport{Proxy: http.ProxyFromEnvironment} - if emitter == nil { - autowiredEmitter = &nullEventEmitter{} - return - } - - autowiredEmitter = emitter - - go runtime_stats.NewRuntimeStats(autowiredEmitter, runtimeStatsInterval).Run(nil) - - http.DefaultTransport = InstrumentedRoundTripper(http.DefaultTransport) -} - -func CreateDefaultEmitter() (emitter.EventEmitter, string) { - origin := os.Getenv("DROPSONDE_ORIGIN") - if len(origin) == 0 { - log.Println("Failed to auto-initialize dropsonde: DROPSONDE_ORIGIN environment variable not set") - return nil, "" - } - - destination := os.Getenv("DROPSONDE_DESTINATION") - if len(destination) == 0 { - log.Println("DROPSONDE_DESTINATION not set. Using " + DefaultDestination) - destination = DefaultDestination - } - - udpEmitter, err := emitter.NewUdpEmitter(destination) - if err != nil { - log.Printf("Failed to auto-initialize dropsonde: %v\n", err) - return nil, destination - } - - hbEmitter, err := emitter.NewHeartbeatEmitter(udpEmitter, origin) - if err != nil { - log.Printf("Failed to auto-initialize dropsonde: %v\n", err) - return nil, destination - } - - return emitter.NewEventEmitter(hbEmitter, origin), destination -} - -func AutowiredEmitter() emitter.EventEmitter { - return autowiredEmitter -} - -type nullEventEmitter struct{} - -func (*nullEventEmitter) Emit(events.Event) error { - return nil -} - -func (*nullEventEmitter) Close() {} diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/autowire_suite_test.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/autowire_suite_test.go deleted file mode 100644 index 684310d26..000000000 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/autowire_suite_test.go +++ /dev/null @@ -1,16 +0,0 @@ -package autowire_test - -import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "io/ioutil" - "log" - "testing" -) - -func TestAutowire(t *testing.T) { - log.SetOutput(ioutil.Discard) - RegisterFailHandler(Fail) - RunSpecs(t, "Autowire Suite") -} diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/autowire_test.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/autowire_test.go deleted file mode 100644 index 939b4ca29..000000000 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/autowire_test.go +++ /dev/null @@ -1,86 +0,0 @@ -package autowire_test - -import ( - "github.com/cloudfoundry/dropsonde/autowire" - "github.com/cloudfoundry/dropsonde/emitter" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - "net/http" - "os" - "reflect" -) - -var _ = Describe("Autowire", func() { - var oldDestination string - var oldOrigin string - - BeforeEach(func() { - oldDestination = os.Getenv("DROPSONDE_DESTINATION") - oldOrigin = os.Getenv("DROPSONDE_ORIGIN") - }) - - AfterEach(func() { - os.Setenv("DROPSONDE_DESTINATION", oldDestination) - os.Setenv("DROPSONDE_ORIGIN", oldOrigin) - }) - - Describe("Initialize", func() { - Context("with a non-nil emitter", func() { - It("instruments the HTTP default transport", func() { - autowire.Initialize(emitter.NewEventEmitter(nil, "")) - Expect(reflect.TypeOf(http.DefaultTransport).Elem().Name()).ToNot(Equal("Transport")) - }) - }) - - Context("with a nil-emitter", func() { - It("resets the HTTP default transport to not be instrumented", func() { - autowire.Initialize(nil) - Expect(reflect.TypeOf(http.DefaultTransport).Elem().Name()).To(Equal("Transport")) - }) - }) - }) - - Describe("CreateDefaultEmitter", func() { - Context("with DROPSONDE_ORIGIN set", func() { - BeforeEach(func() { - os.Setenv("DROPSONDE_ORIGIN", "anything") - }) - - Context("with DROPSONDE_DESTINATION missing", func() { - It("defaults to localhost", func() { - os.Setenv("DROPSONDE_DESTINATION", "") - _, destination := autowire.CreateDefaultEmitter() - - Expect(destination).To(Equal("localhost:3457")) - }) - }) - - Context("with DROPSONDE_DESTINATION set", func() { - It("uses the configured destination", func() { - os.Setenv("DROPSONDE_DESTINATION", "test") - _, destination := autowire.CreateDefaultEmitter() - - Expect(destination).To(Equal("test")) - }) - }) - }) - - Context("with DROPSONDE_ORIGIN missing", func() { - It("returns a nil-emitter", func() { - os.Setenv("DROPSONDE_ORIGIN", "") - emitter, _ := autowire.CreateDefaultEmitter() - Expect(emitter).To(BeNil()) - }) - }) - }) -}) - -type FakeHandler struct{} - -func (fh FakeHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {} - -type FakeRoundTripper struct{} - -func (frt FakeRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { - return nil, nil -} diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/control/controlmessage.pb.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/control/controlmessage.pb.go new file mode 100644 index 000000000..4994e186e --- /dev/null +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/control/controlmessage.pb.go @@ -0,0 +1,107 @@ +// Code generated by protoc-gen-gogo. +// source: controlmessage.proto +// DO NOT EDIT! + +/* +Package control is a generated protocol buffer package. + +It is generated from these files: + controlmessage.proto + heartbeatrequest.proto + uuid.proto + +It has these top-level messages: + ControlMessage +*/ +package control + +import proto "code.google.com/p/gogoprotobuf/proto" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = math.Inf + +// / Type of the wrapped control. +type ControlMessage_ControlType int32 + +const ( + ControlMessage_HeartbeatRequest ControlMessage_ControlType = 1 +) + +var ControlMessage_ControlType_name = map[int32]string{ + 1: "HeartbeatRequest", +} +var ControlMessage_ControlType_value = map[string]int32{ + "HeartbeatRequest": 1, +} + +func (x ControlMessage_ControlType) Enum() *ControlMessage_ControlType { + p := new(ControlMessage_ControlType) + *p = x + return p +} +func (x ControlMessage_ControlType) String() string { + return proto.EnumName(ControlMessage_ControlType_name, int32(x)) +} +func (x *ControlMessage_ControlType) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(ControlMessage_ControlType_value, data, "ControlMessage_ControlType") + if err != nil { + return err + } + *x = ControlMessage_ControlType(value) + return nil +} + +// / ControlMessage wraps a control command and adds metadata. +type ControlMessage struct { + Origin *string `protobuf:"bytes,1,req,name=origin" json:"origin,omitempty"` + Identifier *UUID `protobuf:"bytes,2,req,name=identifier" json:"identifier,omitempty"` + Timestamp *int64 `protobuf:"varint,3,req,name=timestamp" json:"timestamp,omitempty"` + ControlType *ControlMessage_ControlType `protobuf:"varint,4,req,name=controlType,enum=control.ControlMessage_ControlType" json:"controlType,omitempty"` + HeartbeatRequest *HeartbeatRequest `protobuf:"bytes,5,opt,name=heartbeatRequest" json:"heartbeatRequest,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *ControlMessage) Reset() { *m = ControlMessage{} } +func (m *ControlMessage) String() string { return proto.CompactTextString(m) } +func (*ControlMessage) ProtoMessage() {} + +func (m *ControlMessage) GetOrigin() string { + if m != nil && m.Origin != nil { + return *m.Origin + } + return "" +} + +func (m *ControlMessage) GetIdentifier() *UUID { + if m != nil { + return m.Identifier + } + return nil +} + +func (m *ControlMessage) GetTimestamp() int64 { + if m != nil && m.Timestamp != nil { + return *m.Timestamp + } + return 0 +} + +func (m *ControlMessage) GetControlType() ControlMessage_ControlType { + if m != nil && m.ControlType != nil { + return *m.ControlType + } + return ControlMessage_HeartbeatRequest +} + +func (m *ControlMessage) GetHeartbeatRequest() *HeartbeatRequest { + if m != nil { + return m.HeartbeatRequest + } + return nil +} + +func init() { + proto.RegisterEnum("control.ControlMessage_ControlType", ControlMessage_ControlType_name, ControlMessage_ControlType_value) +} diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/control/heartbeatrequest.pb.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/control/heartbeatrequest.pb.go new file mode 100644 index 000000000..2bfd2aa1d --- /dev/null +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/control/heartbeatrequest.pb.go @@ -0,0 +1,24 @@ +// Code generated by protoc-gen-gogo. +// source: heartbeatrequest.proto +// DO NOT EDIT! + +package control + +import proto "code.google.com/p/gogoprotobuf/proto" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = math.Inf + +// / A HeartbeatRequest command elicits a heartbeat from a component or app +type HeartbeatRequest struct { + XXX_unrecognized []byte `json:"-"` +} + +func (m *HeartbeatRequest) Reset() { *m = HeartbeatRequest{} } +func (m *HeartbeatRequest) String() string { return proto.CompactTextString(m) } +func (*HeartbeatRequest) ProtoMessage() {} + +func init() { +} diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/control/uuid.pb.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/control/uuid.pb.go new file mode 100644 index 000000000..33c987680 --- /dev/null +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/control/uuid.pb.go @@ -0,0 +1,40 @@ +// Code generated by protoc-gen-gogo. +// source: uuid.proto +// DO NOT EDIT! + +package control + +import proto "code.google.com/p/gogoprotobuf/proto" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = math.Inf + +// / Type representing a 128-bit UUID. +type UUID struct { + Low *uint64 `protobuf:"varint,1,req,name=low" json:"low,omitempty"` + High *uint64 `protobuf:"varint,2,req,name=high" json:"high,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *UUID) Reset() { *m = UUID{} } +func (m *UUID) String() string { return proto.CompactTextString(m) } +func (*UUID) ProtoMessage() {} + +func (m *UUID) GetLow() uint64 { + if m != nil && m.Low != nil { + return *m.Low + } + return 0 +} + +func (m *UUID) GetHigh() uint64 { + if m != nil && m.High != nil { + return *m.High + } + return 0 +} + +func init() { +} diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/dropsonde.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/dropsonde.go new file mode 100644 index 000000000..2ec393182 --- /dev/null +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/dropsonde.go @@ -0,0 +1,125 @@ +// Package dropsonde provides sensible defaults for using dropsonde. +// +// The default HTTP transport is instrumented, as well as some basic stats about +// the Go runtime. Additionally, the default emitter is itself instrumented to +// periodically send "heartbeat" messages containing counts of received and sent +// events. The default emitter sends events over UDP. +// +// Use +// +// dropsonde.Initialize("localhost:3457", origins...) +// +// to initialize. See package metrics and logs for other usage. + +package dropsonde + +import ( + "errors" + "fmt" + "github.com/cloudfoundry/dropsonde/emitter" + "github.com/cloudfoundry/dropsonde/events" + "github.com/cloudfoundry/dropsonde/instrumented_handler" + "github.com/cloudfoundry/dropsonde/instrumented_round_tripper" + "github.com/cloudfoundry/dropsonde/log_sender" + "github.com/cloudfoundry/dropsonde/logs" + "github.com/cloudfoundry/dropsonde/metric_sender" + "github.com/cloudfoundry/dropsonde/metrics" + "github.com/cloudfoundry/dropsonde/runtime_stats" + "github.com/cloudfoundry/gosteno" + "net/http" + "strings" + "time" +) + +var autowiredEmitter emitter.EventEmitter + +const ( + runtimeStatsInterval = 10 * time.Second + originDelimiter = "/" +) + +func init(){ + autowiredEmitter = &NullEventEmitter{} +} + +// Initialize creates default emitters and instruments the default HTTP +// transport. +// +// The origin variable is required and specifies the +// source name for all metrics emitted by this process. If it is not set, the +// program will run normally but will not emit metrics. +// +// The destination variable sets the host and port to +// which metrics are sent. It is optional, and defaults to DefaultDestination. +func Initialize(destination string, origin ...string) error { + autowiredEmitter = nil + emitter, err := createDefaultEmitter(strings.Join(origin, originDelimiter), destination) + if err != nil { + return err + } + + autowiredEmitter = emitter + initialize() + + return nil +} + +func InitializeWithEmitter(emitter emitter.EventEmitter) { + autowiredEmitter = emitter + initialize() +} + +func AutowiredEmitter() emitter.EventEmitter { + return autowiredEmitter +} + +// InstrumentedHandler returns a Handler pre-configured to emit HTTP server +// request metrics to AutowiredEmitter. +func InstrumentedHandler(handler http.Handler) http.Handler { + return instrumented_handler.InstrumentedHandler(handler, autowiredEmitter) +} + +// InstrumentedRoundTripper returns a RoundTripper pre-configured to emit +// HTTP client request metrics to AutowiredEmitter. +func InstrumentedRoundTripper(roundTripper http.RoundTripper) http.RoundTripper { + return instrumented_round_tripper.InstrumentedRoundTripper(roundTripper, autowiredEmitter) +} + +func initialize() { + metrics.Initialize(metric_sender.NewMetricSender(AutowiredEmitter())) + logs.Initialize(log_sender.NewLogSender(AutowiredEmitter(), gosteno.NewLogger("dropsonde/logs"))) + go runtime_stats.NewRuntimeStats(autowiredEmitter, runtimeStatsInterval).Run(nil) + http.DefaultTransport = InstrumentedRoundTripper(http.DefaultTransport) +} + +func createDefaultEmitter(origin, destination string) (emitter.EventEmitter, error) { + if len(origin) == 0 { + return nil, errors.New("Failed to initialize dropsonde: origin variable not set") + } + + if len(destination) == 0 { + return nil, errors.New("Failed to initialize dropsonde: destination variable not set") + } + + udpEmitter, err := emitter.NewUdpEmitter(destination) + if err != nil { + return nil, errors.New(fmt.Sprintf("Failed to initialize dropsonde: %v", err.Error())) + } + + heartbeatResponder, err := emitter.NewHeartbeatResponder(udpEmitter, origin) + if err != nil { + return nil, errors.New(fmt.Sprintf("Failed to initialize dropsonde: %v", err.Error())) + } + + go udpEmitter.ListenForHeartbeatRequest(heartbeatResponder.Respond) + + return emitter.NewEventEmitter(heartbeatResponder, origin), nil +} + +type NullEventEmitter struct{} + +func (*NullEventEmitter) Emit(events.Event) error { + return nil +} + +func (*NullEventEmitter) Close() {} diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/dropsonde_test.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/dropsonde_test.go new file mode 100644 index 000000000..f775aa422 --- /dev/null +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/dropsonde_test.go @@ -0,0 +1,109 @@ +package dropsonde_test + +import ( + "fmt" + "net" + "net/http" + "reflect" + "time" + + "code.google.com/p/gogoprotobuf/proto" + "github.com/cloudfoundry/dropsonde" + "github.com/cloudfoundry/dropsonde/control" + "github.com/cloudfoundry/dropsonde/events" + "github.com/cloudfoundry/dropsonde/factories" + uuid "github.com/nu7hatch/gouuid" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Autowire", func() { + + Describe("Initialize", func() { + It("resets the HTTP default transport to be instrumented", func() { + dropsonde.InitializeWithEmitter(&dropsonde.NullEventEmitter{}) + Expect(reflect.TypeOf(http.DefaultTransport).Elem().Name()).To(Equal("instrumentedRoundTripper")) + }) + }) + + Describe("CreateDefaultEmitter", func() { + Context("with origin set", func() { + It("responds to heartbeat requests with heartbeats", func() { + err := dropsonde.Initialize("localhost:1235", "cf", "metron") + Expect(err).ToNot(HaveOccurred()) + + messages := make(chan []byte, 100) + readyChan := make(chan struct{}) + + go respondWithHeartbeatRequest(1235, messages, readyChan) + <-readyChan + + emitter := dropsonde.AutowiredEmitter() + + err = emitter.Emit(&events.CounterEvent{Name: proto.String("name"), Delta: proto.Uint64(1)}) + Expect(err).NotTo(HaveOccurred()) + + bytes := make([]byte, 65000) + Eventually(messages, 5).Should(Receive(&bytes)) + message := &events.Envelope{} + proto.Unmarshal(bytes, message) + Expect(message.GetOrigin()).To(Equal("cf/metron")) + }) + }) + + Context("with origin missing", func() { + It("returns a nil-emitter", func() { + err := dropsonde.Initialize("localhost:2343", "") + Expect(err).To(HaveOccurred()) + + emitter := dropsonde.AutowiredEmitter() + Expect(emitter).To(BeNil()) + }) + }) + }) +}) + +type FakeHandler struct{} + +func (fh FakeHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {} + +type FakeRoundTripper struct{} + +func (frt FakeRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + return nil, nil +} + +func respondWithHeartbeatRequest(port int, messages chan []byte, readyChan chan struct{}) { + conn, err := net.ListenPacket("udp4", fmt.Sprintf(":%d", port)) + if err != nil { + panic(err) + } + + buf := make([]byte, 1024) + close(readyChan) + n, addr, _ := conn.ReadFrom(buf) + + conn.WriteTo(newMarshalledHeartbeatRequest(), addr) + n, addr, _ = conn.ReadFrom(buf) + + messages <- buf[:n] + conn.Close() +} + +func newMarshalledHeartbeatRequest() []byte { + id, _ := uuid.NewV4() + + heartbeatRequest := &control.ControlMessage{ + Origin: proto.String("test"), + Identifier: factories.NewControlUUID(id), + Timestamp: proto.Int64(time.Now().UnixNano()), + ControlType: control.ControlMessage_HeartbeatRequest.Enum(), + } + + bytes, err := proto.Marshal(heartbeatRequest) + if err != nil { + panic(err.Error()) + } + return bytes +} diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/dropsonde_unmarshaller/dropsonde_unmarshaller.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/dropsonde_unmarshaller/dropsonde_unmarshaller.go index d011b3db6..cc8dc0b93 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/dropsonde_unmarshaller/dropsonde_unmarshaller.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/dropsonde_unmarshaller/dropsonde_unmarshaller.go @@ -21,6 +21,7 @@ import ( "github.com/cloudfoundry/gosteno" "github.com/cloudfoundry/loggregatorlib/cfcomponent/instrumentation" "github.com/davecgh/go-spew/spew" + "sync" "sync/atomic" "unicode" ) @@ -43,15 +44,18 @@ func NewDropsondeUnmarshaller(logger *gosteno.Logger) DropsondeUnmarshaller { } return &dropsondeUnmarshaller{ - logger: logger, - receiveCounts: receiveCounts, + logger: logger, + receiveCounts: receiveCounts, + logMessageReceiveCounts: make(map[string]*uint64), } } type dropsondeUnmarshaller struct { - logger *gosteno.Logger - receiveCounts map[events.Envelope_EventType]*uint64 - unmarshalErrorCount uint64 + logger *gosteno.Logger + receiveCounts map[events.Envelope_EventType]*uint64 + logMessageReceiveCounts map[string]*uint64 + unmarshalErrorCount uint64 + sync.RWMutex } // Run reads byte slices from inputChan, unmarshalls them to Envelopes, and @@ -77,11 +81,27 @@ func (u *dropsondeUnmarshaller) UnmarshallMessage(message []byte) (*events.Envel } u.logger.Debugf("dropsondeUnmarshaller: received message %v", spew.Sprintf("%v", envelope)) - u.incrementReceiveCount(envelope.GetEventType()) + + if envelope.GetEventType() == events.Envelope_LogMessage { + u.incrementLogMessageReceiveCount(envelope.GetLogMessage().GetAppId()) + } else { + u.incrementReceiveCount(envelope.GetEventType()) + } return envelope, nil } +func (u *dropsondeUnmarshaller) incrementLogMessageReceiveCount(appId string) { + _, ok := u.logMessageReceiveCounts[appId] + if ok == false { + var count uint64 + u.Lock() + u.logMessageReceiveCounts[appId] = &count + u.Unlock() + } + incrementCount(u.logMessageReceiveCounts[appId]) +} + func (u *dropsondeUnmarshaller) incrementReceiveCount(eventType events.Envelope_EventType) { incrementCount(u.receiveCounts[eventType]) } @@ -98,8 +118,19 @@ func (m *dropsondeUnmarshaller) metrics() []instrumentation.Metric { modifiedEventName[0] = unicode.ToLower(modifiedEventName[0]) metricName := string(modifiedEventName) + "Received" - metricValue := atomic.LoadUint64(m.receiveCounts[events.Envelope_EventType(eventType)]) - metrics = append(metrics, instrumentation.Metric{Name: metricName, Value: metricValue}) + if eventName == "LogMessage" { + m.RLock() + for appId, count := range m.logMessageReceiveCounts { + metricValue := atomic.LoadUint64(count) + tags := make(map[string]interface{}) + tags["appId"] = appId + metrics = append(metrics, instrumentation.Metric{Name: metricName, Value: metricValue, Tags: tags}) + } + m.RUnlock() + } else { + metricValue := atomic.LoadUint64(m.receiveCounts[events.Envelope_EventType(eventType)]) + metrics = append(metrics, instrumentation.Metric{Name: metricName, Value: metricValue}) + } } metrics = append(metrics, instrumentation.Metric{ diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/dropsonde_unmarshaller/dropsonde_unmarshaller_test.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/dropsonde_unmarshaller/dropsonde_unmarshaller_test.go index 9e52e10a1..d66d4ae98 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/dropsonde_unmarshaller/dropsonde_unmarshaller_test.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/dropsonde_unmarshaller/dropsonde_unmarshaller_test.go @@ -5,8 +5,10 @@ import ( "github.com/cloudfoundry/dropsonde/dropsonde_unmarshaller" "github.com/cloudfoundry/dropsonde/events" "github.com/cloudfoundry/dropsonde/factories" + "github.com/cloudfoundry/loggregatorlib/cfcomponent/instrumentation" "github.com/cloudfoundry/loggregatorlib/cfcomponent/instrumentation/testhelpers" "github.com/cloudfoundry/loggregatorlib/loggertesthelper" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) @@ -108,9 +110,49 @@ var _ = Describe("DropsondeUnmarshaller", func() { testhelpers.EventuallyExpectMetric(unmarshaller, "heartbeatReceived", 1) }) + It("emits a log message counter tagged with app id", func() { + envelope1 := &events.Envelope{ + Origin: proto.String("fake-origin-3"), + EventType: events.Envelope_LogMessage.Enum(), + LogMessage: factories.NewLogMessage(events.LogMessage_OUT, "test log message 1", "fake-app-id-1", "DEA"), + } + + envelope2 := &events.Envelope{ + Origin: proto.String("fake-origin-3"), + EventType: events.Envelope_LogMessage.Enum(), + LogMessage: factories.NewLogMessage(events.LogMessage_OUT, "test log message 2", "fake-app-id-2", "DEA"), + } + + message1, _ := proto.Marshal(envelope1) + message2, _ := proto.Marshal(envelope2) + + inputChan <- message1 + inputChan <- message1 + inputChan <- message2 + + Eventually(func() uint64 { + return getLogMessageCountByAppId(unmarshaller, "fake-app-id-1") + }).Should(BeNumerically("==", 2)) + + Eventually(func() uint64 { + return getLogMessageCountByAppId(unmarshaller, "fake-app-id-2") + }).Should(BeNumerically("==", 1)) + }) + It("emits an unmarshal error counter", func() { inputChan <- []byte{1, 2, 3} testhelpers.EventuallyExpectMetric(unmarshaller, "unmarshalErrors", 1) }) }) }) + +func getLogMessageCountByAppId(instrumentable instrumentation.Instrumentable, appId string) uint64 { + for _, metric := range instrumentable.Emit().Metrics { + if metric.Name == "logMessageReceived" { + if metric.Tags["appId"] == appId { + return metric.Value.(uint64) + } + } + } + return uint64(0) +} diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/event_emitter_test.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/event_emitter_test.go index 62a838419..7ecec13aa 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/event_emitter_test.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/event_emitter_test.go @@ -5,8 +5,8 @@ import ( "github.com/cloudfoundry/dropsonde/emitter" "github.com/cloudfoundry/dropsonde/emitter/fake" "github.com/cloudfoundry/dropsonde/events" - "github.com/cloudfoundry/dropsonde/factories" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/event_formatter.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/event_formatter.go index 13c6c2b00..9c8a6e906 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/event_formatter.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/event_formatter.go @@ -4,6 +4,7 @@ import ( "code.google.com/p/gogoprotobuf/proto" "errors" "github.com/cloudfoundry/dropsonde/events" + "time" ) var ErrorMissingOrigin = errors.New("Event not emitted due to missing origin information") @@ -14,7 +15,7 @@ func Wrap(e events.Event, origin string) (*events.Envelope, error) { return nil, ErrorMissingOrigin } - envelope := &events.Envelope{Origin: proto.String(origin)} + envelope := &events.Envelope{Origin: proto.String(origin), Timestamp: proto.Int64(time.Now().UnixNano())} switch e := e.(type) { case *events.Heartbeat: diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/event_formatter_test.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/event_formatter_test.go index d4aad6f52..8262df3e1 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/event_formatter_test.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/event_formatter_test.go @@ -1,11 +1,15 @@ package emitter_test import ( - "code.google.com/p/gogoprotobuf/proto" "github.com/cloudfoundry/dropsonde/emitter" + + "time" + + "code.google.com/p/gogoprotobuf/proto" "github.com/cloudfoundry/dropsonde/events" "github.com/cloudfoundry/dropsonde/factories" uuid "github.com/nu7hatch/gouuid" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) @@ -100,6 +104,11 @@ var _ = Describe("EventFormatter", func() { Expect(err.Error()).To(Equal("Event not emitted due to missing origin information")) }) }) + + It("sets the timestamp to now", func() { + envelope, _ := emitter.Wrap(testEvent, origin) + Expect(time.Unix(0, envelope.GetTimestamp())).To(BeTemporally("~", time.Now(), 100*time.Millisecond)) + }) }) }) }) diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/heartbeat_emitter.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/heartbeat_emitter.go deleted file mode 100644 index 4bf008865..000000000 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/heartbeat_emitter.go +++ /dev/null @@ -1,95 +0,0 @@ -package emitter - -import ( - "code.google.com/p/gogoprotobuf/proto" - "log" - "os" - "runtime" - "strconv" - "sync" - "time" -) - -var HeartbeatInterval = 1 * time.Second - -func init() { - intervalOverride, err := strconv.ParseFloat(os.Getenv("DROPSONDE_HEARTBEAT_INTERVAL_SECS"), 64) - if err == nil { - HeartbeatInterval = time.Duration(intervalOverride*1000) * time.Millisecond - } -} - -type heartbeatEmitter struct { - instrumentedEmitter InstrumentedEmitter - innerHbEmitter ByteEmitter - stopChan chan struct{} - origin string - sync.Mutex - closed bool -} - -func NewHeartbeatEmitter(emitter ByteEmitter, origin string) (ByteEmitter, error) { - instrumentedEmitter, err := NewInstrumentedEmitter(emitter) - if err != nil { - return nil, err - } - - hbEmitter := &heartbeatEmitter{ - instrumentedEmitter: instrumentedEmitter, - innerHbEmitter: emitter, - origin: origin, - stopChan: make(chan struct{}), - } - - go hbEmitter.generateHeartbeats(HeartbeatInterval) - runtime.SetFinalizer(hbEmitter, (*heartbeatEmitter).Close) - - return hbEmitter, nil -} - -func (e *heartbeatEmitter) Emit(data []byte) error { - return e.instrumentedEmitter.Emit(data) -} - -func (e *heartbeatEmitter) Close() { - e.Lock() - defer e.Unlock() - - if e.closed { - return - } - - e.closed = true - close(e.stopChan) -} - -func (e *heartbeatEmitter) generateHeartbeats(heartbeatInterval time.Duration) { - defer e.instrumentedEmitter.Close() - - ticker := time.NewTicker(heartbeatInterval) - defer ticker.Stop() - for { - select { - case <-e.stopChan: - return - case <-ticker.C: - hbEvent := e.instrumentedEmitter.GetHeartbeatEvent() - hbEnvelope, err := Wrap(hbEvent, e.origin) - if err != nil { - log.Printf("Failed to wrap heartbeat event: %v\n", err) - break - } - - hbData, err := proto.Marshal(hbEnvelope) - if err != nil { - log.Printf("Failed to marshal heartbeat event: %v\n", err) - break - } - - err = e.innerHbEmitter.Emit(hbData) - if err != nil { - log.Printf("Problem while emitting heartbeat data: %v\n", err) - } - } - } -} diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/heartbeat_emitter_test.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/heartbeat_emitter_test.go deleted file mode 100644 index 91c4ed6d2..000000000 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/heartbeat_emitter_test.go +++ /dev/null @@ -1,120 +0,0 @@ -package emitter_test - -import ( - "bytes" - "code.google.com/p/gogoprotobuf/proto" - "errors" - "github.com/cloudfoundry/dropsonde/emitter" - "github.com/cloudfoundry/dropsonde/emitter/fake" - "github.com/cloudfoundry/dropsonde/events" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - "log" - "time" -) - -var _ = Describe("HeartbeatEmitter", func() { - var ( - wrappedEmitter *fake.FakeByteEmitter - origin = "testHeartbeatEmitter/0" - ) - - BeforeEach(func() { - emitter.HeartbeatInterval = 10 * time.Millisecond - wrappedEmitter = fake.NewFakeByteEmitter() - }) - - Describe("NewHeartbeatEmitter", func() { - It("requires non-nil args", func() { - hbEmitter, err := emitter.NewHeartbeatEmitter(nil, origin) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(Equal("wrappedEmitter is nil")) - Expect(hbEmitter).To(BeNil()) - }) - - It("starts periodic heartbeat emission", func() { - hbEmitter, err := emitter.NewHeartbeatEmitter(wrappedEmitter, origin) - Expect(err).NotTo(HaveOccurred()) - Expect(hbEmitter).NotTo(BeNil()) - - Eventually(func() int { return len(wrappedEmitter.GetMessages()) }).Should(BeNumerically(">=", 2)) - }) - - It("logs an error when heartbeat emission fails", func() { - wrappedEmitter.ReturnError = errors.New("fake error") - - logWriter := new(bytes.Buffer) - log.SetOutput(logWriter) - - hbEmitter, _ := emitter.NewHeartbeatEmitter(wrappedEmitter, origin) - - Eventually(func() int { return len(wrappedEmitter.GetMessages()) }).Should(BeNumerically(">=", 2)) - - loggedText := string(logWriter.Bytes()) - expectedText := "fake error" - Expect(loggedText).To(ContainSubstring(expectedText)) - hbEmitter.Close() - }) - }) - - Describe("Emit", func() { - var ( - hbEmitter emitter.ByteEmitter - testData = []byte("hello") - ) - - BeforeEach(func() { - hbEmitter, _ = emitter.NewHeartbeatEmitter(wrappedEmitter, origin) - }) - - It("delegates to the wrapped emitter", func() { - hbEmitter.Emit(testData) - - messages := wrappedEmitter.GetMessages() - Expect(messages).To(HaveLen(1)) - Expect(messages[0]).To(Equal(testData)) - }) - - It("increments the heartbeat counter", func() { - hbEmitter.Emit(testData) - - Eventually(func() bool { - messages := wrappedEmitter.GetMessages() - - for _, message := range messages { - hbEnvelope := &events.Envelope{} - err := proto.Unmarshal(message, hbEnvelope) - if err != nil || hbEnvelope.GetEventType() != events.Envelope_Heartbeat { - continue // Not an envelope; keep looking - } - - hbEvent := hbEnvelope.GetHeartbeat() - - if hbEvent.GetReceivedCount() == 1 { - return true - } - } - - return false - }).Should(BeTrue()) - }) - }) - - Describe("Close", func() { - var hbEmitter emitter.ByteEmitter - - BeforeEach(func() { - hbEmitter, _ = emitter.NewHeartbeatEmitter(wrappedEmitter, origin) - }) - - It("eventually delegates to the inner heartbeat emitter", func() { - hbEmitter.Close() - Eventually(wrappedEmitter.IsClosed).Should(BeTrue()) - }) - - It("can be called more than once", func() { - hbEmitter.Close() - Expect(hbEmitter.Close).ToNot(Panic()) - }) - }) -}) diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/heartbeat_responder.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/heartbeat_responder.go new file mode 100644 index 000000000..3055c1b3f --- /dev/null +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/heartbeat_responder.go @@ -0,0 +1,77 @@ +package emitter + +import ( + "log" + "runtime" + "sync" + + "code.google.com/p/gogoprotobuf/proto" + "github.com/cloudfoundry/dropsonde/control" + "github.com/cloudfoundry/dropsonde/events" +) + +type heartbeatResponder struct { + instrumentedEmitter InstrumentedEmitter + innerEmitter ByteEmitter + origin string + sync.Mutex + closed bool +} + +func NewHeartbeatResponder(byteEmitter ByteEmitter, origin string) (RespondingByteEmitter, error) { + instrumentedEmitter, err := NewInstrumentedEmitter(byteEmitter) + if err != nil { + return nil, err + } + + hbEmitter := &heartbeatResponder{ + instrumentedEmitter: instrumentedEmitter, + innerEmitter: byteEmitter, + origin: origin, + } + + runtime.SetFinalizer(hbEmitter, (*heartbeatResponder).Close) + + return hbEmitter, nil +} + +func (e *heartbeatResponder) Emit(data []byte) error { + return e.instrumentedEmitter.Emit(data) +} + +func (e *heartbeatResponder) Close() { + e.Lock() + defer e.Unlock() + + if e.closed { + return + } + + e.instrumentedEmitter.Close() + e.closed = true +} + +func (e *heartbeatResponder) Respond(controlMessage *control.ControlMessage) { + hbEvent := e.instrumentedEmitter.GetHeartbeatEvent().(*events.Heartbeat) + hbEvent.ControlMessageIdentifier = convertToEventUUID(controlMessage.GetIdentifier()) + hbEnvelope, err := Wrap(hbEvent, e.origin) + if err != nil { + log.Printf("Failed to wrap heartbeat event: %v\n", err) + return + } + + hbData, err := proto.Marshal(hbEnvelope) + if err != nil { + log.Printf("Failed to marshal heartbeat event: %v\n", err) + return + } + + err = e.innerEmitter.Emit(hbData) + if err != nil { + log.Printf("Problem while emitting heartbeat data: %v\n", err) + } +} + +func convertToEventUUID(uuid *control.UUID) *events.UUID { + return &events.UUID{Low: uuid.Low, High: uuid.High} +} diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/heartbeat_responder_test.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/heartbeat_responder_test.go new file mode 100644 index 000000000..3d11eb7ab --- /dev/null +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/heartbeat_responder_test.go @@ -0,0 +1,154 @@ +package emitter_test + +import ( + "bytes" + "errors" + "log" + "time" + + uuid "github.com/nu7hatch/gouuid" + + "code.google.com/p/gogoprotobuf/proto" + "github.com/cloudfoundry/dropsonde/control" + "github.com/cloudfoundry/dropsonde/emitter" + "github.com/cloudfoundry/dropsonde/emitter/fake" + "github.com/cloudfoundry/dropsonde/events" + "github.com/cloudfoundry/dropsonde/factories" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("HeartbeatResponder", func() { + var ( + wrappedEmitter *fake.FakeByteEmitter + origin = "testHeartbeatResponder/0" + ) + + BeforeEach(func() { + wrappedEmitter = fake.NewFakeByteEmitter() + }) + + Describe("NewHeartbeatResponder", func() { + It("requires non-nil args", func() { + heartbeatResponder, err := emitter.NewHeartbeatResponder(nil, origin) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(Equal("wrappedEmitter is nil")) + Expect(heartbeatResponder).To(BeNil()) + }) + }) + + Describe("Emit", func() { + var ( + heartbeatResponder emitter.RespondingByteEmitter + testData = []byte("hello") + ) + + BeforeEach(func() { + heartbeatResponder, _ = emitter.NewHeartbeatResponder(wrappedEmitter, origin) + }) + + It("delegates to the wrapped emitter", func() { + heartbeatResponder.Emit(testData) + + messages := wrappedEmitter.GetMessages() + Expect(messages).To(HaveLen(1)) + Expect(messages[0]).To(Equal(testData)) + }) + + It("increments the heartbeat counter", func() { + id, _ := uuid.NewV4() + + heartbeatRequest := &control.ControlMessage{ + Origin: proto.String("test"), + Identifier: factories.NewControlUUID(id), + Timestamp: proto.Int64(time.Now().UnixNano()), + ControlType: control.ControlMessage_HeartbeatRequest.Enum(), + } + heartbeatResponder.Emit(testData) + heartbeatResponder.Respond(heartbeatRequest) + + Eventually(wrappedEmitter.GetMessages).Should(HaveLen(2)) + + message := wrappedEmitter.GetMessages()[1] + hbEnvelope := &events.Envelope{} + err := proto.Unmarshal(message, hbEnvelope) + Expect(err).NotTo(HaveOccurred()) + + hbEvent := hbEnvelope.GetHeartbeat() + + Expect(hbEvent.GetReceivedCount()).To(Equal(uint64(1))) + }) + }) + + Describe("Close", func() { + var heartbeatResponder emitter.ByteEmitter + + BeforeEach(func() { + heartbeatResponder, _ = emitter.NewHeartbeatResponder(wrappedEmitter, origin) + }) + + It("eventually delegates to the inner heartbeat emitter", func() { + heartbeatResponder.Close() + Eventually(wrappedEmitter.IsClosed).Should(BeTrue()) + }) + + It("can be called more than once", func() { + heartbeatResponder.Close() + Expect(heartbeatResponder.Close).ToNot(Panic()) + }) + }) + + Describe("RespondToHeartbeat", func() { + var heartbeatResponder emitter.RespondingByteEmitter + + BeforeEach(func() { + heartbeatResponder, _ = emitter.NewHeartbeatResponder(wrappedEmitter, origin) + }) + + It("creates a Heartbeat message", func() { + id, _ := uuid.NewV4() + + heartbeatRequest := &control.ControlMessage{ + Origin: proto.String("tst"), + Identifier: factories.NewControlUUID(id), + Timestamp: proto.Int64(time.Now().UnixNano()), + ControlType: control.ControlMessage_HeartbeatRequest.Enum(), + } + + heartbeatResponder.Respond(heartbeatRequest) + Expect(wrappedEmitter.GetMessages()).To(HaveLen(1)) + hbBytes := wrappedEmitter.GetMessages()[0] + + var heartbeat events.Envelope + err := proto.Unmarshal(hbBytes, &heartbeat) + Expect(err).NotTo(HaveOccurred()) + + heartbeatUuid := heartbeatRequest.GetIdentifier().String() + Expect(heartbeat.GetHeartbeat().ControlMessageIdentifier.String()).To(Equal(heartbeatUuid)) + }) + + It("logs an error when heartbeat emission fails", func() { + wrappedEmitter.ReturnError = errors.New("fake error") + + logWriter := new(bytes.Buffer) + log.SetOutput(logWriter) + + id, _ := uuid.NewV4() + + heartbeatRequest := &control.ControlMessage{ + Origin: proto.String("tst"), + Identifier: factories.NewControlUUID(id), + Timestamp: proto.Int64(time.Now().UnixNano()), + ControlType: control.ControlMessage_HeartbeatRequest.Enum(), + } + + heartbeatResponder.Respond(heartbeatRequest) + + loggedText := string(logWriter.Bytes()) + expectedText := "Problem while emitting heartbeat data: fake error" + Expect(loggedText).To(ContainSubstring(expectedText)) + heartbeatResponder.Close() + }) + }) +}) diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/instrumented_emitter.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/instrumented_emitter.go index 6bace6ce8..2b7a9bc5c 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/instrumented_emitter.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/instrumented_emitter.go @@ -2,9 +2,10 @@ package emitter import ( "errors" + "sync" + "github.com/cloudfoundry/dropsonde/events" "github.com/cloudfoundry/dropsonde/factories" - "sync" ) type InstrumentedEmitter interface { diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/instrumented_emitter_test.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/instrumented_emitter_test.go index 20f7a358e..7ac8d91ce 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/instrumented_emitter_test.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/instrumented_emitter_test.go @@ -2,9 +2,11 @@ package emitter_test import ( "errors" + "github.com/cloudfoundry/dropsonde/emitter" "github.com/cloudfoundry/dropsonde/emitter/fake" "github.com/cloudfoundry/dropsonde/events" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/responding_byte_emitter.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/responding_byte_emitter.go new file mode 100644 index 000000000..e0d240e38 --- /dev/null +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/responding_byte_emitter.go @@ -0,0 +1,8 @@ +package emitter + +import "github.com/cloudfoundry/dropsonde/control" + +type RespondingByteEmitter interface { + ByteEmitter + Respond(*control.ControlMessage) +} diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/udp_emitter.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/udp_emitter.go index a72f15906..522575b1f 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/udp_emitter.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/udp_emitter.go @@ -1,6 +1,8 @@ package emitter import ( + "code.google.com/p/gogoprotobuf/proto" + "github.com/cloudfoundry/dropsonde/control" "net" ) @@ -32,3 +34,25 @@ func (e *udpEmitter) Emit(data []byte) error { func (e *udpEmitter) Close() { e.udpConn.Close() } + +func (e *udpEmitter) Address() net.Addr { + return e.udpConn.LocalAddr() +} + +func (e *udpEmitter) ListenForHeartbeatRequest(responder func(*control.ControlMessage)) error { + buf := make([]byte, 1024) + for { + n, _, err := e.udpConn.ReadFrom(buf) + if err != nil { + return err + } + + controlMessage := &control.ControlMessage{} + err = proto.Unmarshal(buf[:n], controlMessage) + if err != nil { + return err + } + + responder(controlMessage) + } +} diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/udp_emitter_test.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/udp_emitter_test.go index a19be61b1..1704cbdfe 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/udp_emitter_test.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/emitter/udp_emitter_test.go @@ -1,10 +1,17 @@ package emitter_test import ( + "net" + "sync" + + "code.google.com/p/gogoprotobuf/proto" + "github.com/cloudfoundry/dropsonde/control" "github.com/cloudfoundry/dropsonde/emitter" + "github.com/cloudfoundry/dropsonde/factories" + uuid "github.com/nu7hatch/gouuid" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "net" ) var _ = Describe("UdpEmitter", func() { @@ -105,4 +112,82 @@ var _ = Describe("UdpEmitter", func() { }) }) }) + + Describe("ListenForHeartbeatRequest", func() { + var timesCalled int + var lastControlMessage *control.ControlMessage + var lock sync.Mutex + + var fakeResponder = func(controlMessage *control.ControlMessage) { + lock.Lock() + defer lock.Unlock() + timesCalled++ + lastControlMessage = controlMessage + } + + var getTimesCalled = func() int { + lock.Lock() + defer lock.Unlock() + return timesCalled + } + + var getReceivedHeartbeatRequest = func() *control.ControlMessage { + lock.Lock() + defer lock.Unlock() + + return lastControlMessage + } + + BeforeEach(func() { + lock = sync.Mutex{} + timesCalled = 0 + }) + + It("calls responder with the correct heartbeat request when when heartbeat is requested", func() { + emitter, _ := emitter.NewUdpEmitter("localhost:123") + go emitter.ListenForHeartbeatRequest(fakeResponder) + + Expect(timesCalled).To(BeZero()) + + heartbeatRequest := newHeartbeatRequest() + + sendHeartbeatRequest(emitter.Address(), heartbeatRequest) + Eventually(getTimesCalled).Should(Equal(1)) + Expect(getReceivedHeartbeatRequest()).To(Equal(heartbeatRequest)) + }) + + It("responds to multiple heartbeat requests", func() { + emitter, _ := emitter.NewUdpEmitter("localhost:123") + go emitter.ListenForHeartbeatRequest(fakeResponder) + sendHeartbeatRequest(emitter.Address(), newHeartbeatRequest()) + sendHeartbeatRequest(emitter.Address(), newHeartbeatRequest()) + + Eventually(getTimesCalled).Should(Equal(2)) + }) + + It("returns an error if listening on the UDP port fails", func() { + emitter, _ := emitter.NewUdpEmitter("localhost:123") + emitter.Close() + err := emitter.ListenForHeartbeatRequest(fakeResponder) + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError("use of closed network connection")) + }) + }) }) + +func sendHeartbeatRequest(addr net.Addr, message *control.ControlMessage) { + encodedMessage, _ := proto.Marshal(message) + conn, _ := net.ListenPacket("udp4", "") + conn.WriteTo(encodedMessage, addr) +} + +func newHeartbeatRequest() *control.ControlMessage { + id, _ := uuid.NewV4() + + return &control.ControlMessage{ + Origin: proto.String("test"), + Identifier: factories.NewControlUUID(id), + Timestamp: proto.Int64(0), + ControlType: control.ControlMessage_HeartbeatRequest.Enum(), + } +} diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/envelope_extensions.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/envelope_extensions/envelope_extensions.go similarity index 62% rename from Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/envelope_extensions.go rename to Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/envelope_extensions/envelope_extensions.go index 44cd5d7eb..900e44579 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/envelope_extensions.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/envelope_extensions/envelope_extensions.go @@ -1,29 +1,30 @@ -package events +package envelope_extensions import ( "encoding/binary" "fmt" + "github.com/cloudfoundry/dropsonde/events" ) const SystemAppId = "system" type hasAppId interface { - GetApplicationId() *UUID + GetApplicationId() *events.UUID } -func (m *Envelope) GetAppId() string { - if m.GetEventType() == Envelope_LogMessage { +func GetAppId(m *events.Envelope) string { + if m.GetEventType() == events.Envelope_LogMessage { logMessage := m.GetLogMessage() return logMessage.GetAppId() } var event hasAppId switch m.GetEventType() { - case Envelope_HttpStart: + case events.Envelope_HttpStart: event = m.GetHttpStart() - case Envelope_HttpStop: + case events.Envelope_HttpStop: event = m.GetHttpStop() - case Envelope_HttpStartStop: + case events.Envelope_HttpStartStop: event = m.GetHttpStartStop() default: return SystemAppId @@ -31,12 +32,12 @@ func (m *Envelope) GetAppId() string { uuid := event.GetApplicationId() if uuid != nil { - return uuid.FormattedString() + return formatUUID(uuid) } return SystemAppId } -func (id *UUID) FormattedString() string { +func formatUUID(id *events.UUID) string { var u [16]byte binary.LittleEndian.PutUint64(u[:8], id.GetLow()) binary.LittleEndian.PutUint64(u[8:], id.GetHigh()) diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/envelope_extensions/envelope_extensions_suite_test.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/envelope_extensions/envelope_extensions_suite_test.go new file mode 100644 index 000000000..ec869a809 --- /dev/null +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/envelope_extensions/envelope_extensions_suite_test.go @@ -0,0 +1,13 @@ +package envelope_extensions_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "testing" +) + +func TestEnvelopeExtensions(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "EnvelopeExtensions Suite") +} diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/envelope_extensions_test.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/envelope_extensions/envelope_extensions_test.go similarity index 78% rename from Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/envelope_extensions_test.go rename to Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/envelope_extensions/envelope_extensions_test.go index 86d4ccd7b..25bd38dae 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/envelope_extensions_test.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/envelope_extensions/envelope_extensions_test.go @@ -1,9 +1,10 @@ -package events_test +package envelope_extensions_test import ( + "code.google.com/p/gogoprotobuf/proto" + "github.com/cloudfoundry/dropsonde/envelope_extensions" "github.com/cloudfoundry/dropsonde/events" - "code.google.com/p/gogoprotobuf/proto" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) @@ -21,7 +22,7 @@ var _ = Describe("EnvelopeExtensions", func() { EventType: events.Envelope_HttpStart.Enum(), HttpStart: &events.HttpStart{ApplicationId: testAppUuid}, } - appId := envelope.GetAppId() + appId := envelope_extensions.GetAppId(envelope) Expect(appId).To(Equal("01000000-0000-0000-0200-000000000000")) }) @@ -30,8 +31,8 @@ var _ = Describe("EnvelopeExtensions", func() { EventType: events.Envelope_HttpStart.Enum(), HttpStart: &events.HttpStart{}, } - appId := envelope.GetAppId() - Expect(appId).To(Equal(events.SystemAppId)) + appId := envelope_extensions.GetAppId(envelope) + Expect(appId).To(Equal(envelope_extensions.SystemAppId)) }) }) @@ -41,7 +42,7 @@ var _ = Describe("EnvelopeExtensions", func() { EventType: events.Envelope_HttpStop.Enum(), HttpStop: &events.HttpStop{ApplicationId: testAppUuid}, } - appId := envelope.GetAppId() + appId := envelope_extensions.GetAppId(envelope) Expect(appId).To(Equal("01000000-0000-0000-0200-000000000000")) }) }) @@ -52,7 +53,7 @@ var _ = Describe("EnvelopeExtensions", func() { EventType: events.Envelope_HttpStartStop.Enum(), HttpStartStop: &events.HttpStartStop{ApplicationId: testAppUuid}, } - appId := envelope.GetAppId() + appId := envelope_extensions.GetAppId(envelope) Expect(appId).To(Equal("01000000-0000-0000-0200-000000000000")) }) }) @@ -63,7 +64,7 @@ var _ = Describe("EnvelopeExtensions", func() { EventType: events.Envelope_LogMessage.Enum(), LogMessage: &events.LogMessage{AppId: proto.String("test-app-id")}, } - appId := envelope.GetAppId() + appId := envelope_extensions.GetAppId(envelope) Expect(appId).To(Equal("test-app-id")) }) }) @@ -73,8 +74,8 @@ var _ = Describe("EnvelopeExtensions", func() { envelope := &events.Envelope{ EventType: events.Envelope_Heartbeat.Enum(), } - appId := envelope.GetAppId() - Expect(appId).To(Equal(events.SystemAppId)) + appId := envelope_extensions.GetAppId(envelope) + Expect(appId).To(Equal(envelope_extensions.SystemAppId)) }) }) }) diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/envelope.pb.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/envelope.pb.go index 953f3db33..e45987917 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/envelope.pb.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/envelope.pb.go @@ -7,10 +7,12 @@ Package events is a generated protocol buffer package. It is generated from these files: envelope.proto + error.proto heartbeat.proto http.proto log.proto metric.proto + uuid.proto It has these top-level messages: Envelope @@ -24,6 +26,7 @@ import math "math" var _ = proto.Marshal var _ = math.Inf +// / Type of the wrapped event. type Envelope_EventType int32 const ( @@ -34,6 +37,7 @@ const ( Envelope_LogMessage Envelope_EventType = 5 Envelope_ValueMetric Envelope_EventType = 6 Envelope_CounterEvent Envelope_EventType = 7 + Envelope_Error Envelope_EventType = 8 ) var Envelope_EventType_name = map[int32]string{ @@ -44,6 +48,7 @@ var Envelope_EventType_name = map[int32]string{ 5: "LogMessage", 6: "ValueMetric", 7: "CounterEvent", + 8: "Error", } var Envelope_EventType_value = map[string]int32{ "Heartbeat": 1, @@ -53,6 +58,7 @@ var Envelope_EventType_value = map[string]int32{ "LogMessage": 5, "ValueMetric": 6, "CounterEvent": 7, + "Error": 8, } func (x Envelope_EventType) Enum() *Envelope_EventType { @@ -72,6 +78,7 @@ func (x *Envelope_EventType) UnmarshalJSON(data []byte) error { return nil } +// / Envelope wraps an Event and adds metadata. type Envelope struct { Origin *string `protobuf:"bytes,1,req,name=origin" json:"origin,omitempty"` EventType *Envelope_EventType `protobuf:"varint,2,req,name=eventType,enum=events.Envelope_EventType" json:"eventType,omitempty"` @@ -83,6 +90,7 @@ type Envelope struct { LogMessage *LogMessage `protobuf:"bytes,8,opt,name=logMessage" json:"logMessage,omitempty"` ValueMetric *ValueMetric `protobuf:"bytes,9,opt,name=valueMetric" json:"valueMetric,omitempty"` CounterEvent *CounterEvent `protobuf:"bytes,10,opt,name=counterEvent" json:"counterEvent,omitempty"` + Error *Error `protobuf:"bytes,11,opt,name=error" json:"error,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -160,6 +168,13 @@ func (m *Envelope) GetCounterEvent() *CounterEvent { return nil } +func (m *Envelope) GetError() *Error { + if m != nil { + return m.Error + } + return nil +} + func init() { proto.RegisterEnum("events.Envelope_EventType", Envelope_EventType_name, Envelope_EventType_value) } diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/error.pb.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/error.pb.go new file mode 100644 index 000000000..7bf0d0ed5 --- /dev/null +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/error.pb.go @@ -0,0 +1,48 @@ +// Code generated by protoc-gen-gogo. +// source: error.proto +// DO NOT EDIT! + +package events + +import proto "code.google.com/p/gogoprotobuf/proto" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = math.Inf + +// / An Error event represents an error in the originating process. +type Error struct { + Source *string `protobuf:"bytes,1,req,name=source" json:"source,omitempty"` + Code *int32 `protobuf:"varint,2,req,name=code" json:"code,omitempty"` + Message *string `protobuf:"bytes,3,req,name=message" json:"message,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Error) Reset() { *m = Error{} } +func (m *Error) String() string { return proto.CompactTextString(m) } +func (*Error) ProtoMessage() {} + +func (m *Error) GetSource() string { + if m != nil && m.Source != nil { + return *m.Source + } + return "" +} + +func (m *Error) GetCode() int32 { + if m != nil && m.Code != nil { + return *m.Code + } + return 0 +} + +func (m *Error) GetMessage() string { + if m != nil && m.Message != nil { + return *m.Message + } + return "" +} + +func init() { +} diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/events_suite_test.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/events_suite_test.go deleted file mode 100644 index 79b1bbf7c..000000000 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/events_suite_test.go +++ /dev/null @@ -1,13 +0,0 @@ -package events_test - -import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "testing" -) - -func TestEvents(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Events Suite") -} diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/heartbeat.pb.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/heartbeat.pb.go index 979e8f1eb..27cbe292e 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/heartbeat.pb.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/heartbeat.pb.go @@ -11,11 +11,13 @@ import math "math" var _ = proto.Marshal var _ = math.Inf +// / A Heartbeat event both indicates liveness of the emitter, and communicates counts of events processed. type Heartbeat struct { - SentCount *uint64 `protobuf:"varint,1,req,name=sentCount" json:"sentCount,omitempty"` - ReceivedCount *uint64 `protobuf:"varint,2,req,name=receivedCount" json:"receivedCount,omitempty"` - ErrorCount *uint64 `protobuf:"varint,3,req,name=errorCount" json:"errorCount,omitempty"` - XXX_unrecognized []byte `json:"-"` + SentCount *uint64 `protobuf:"varint,1,req,name=sentCount" json:"sentCount,omitempty"` + ReceivedCount *uint64 `protobuf:"varint,2,req,name=receivedCount" json:"receivedCount,omitempty"` + ErrorCount *uint64 `protobuf:"varint,3,req,name=errorCount" json:"errorCount,omitempty"` + ControlMessageIdentifier *UUID `protobuf:"bytes,4,opt,name=controlMessageIdentifier" json:"controlMessageIdentifier,omitempty"` + XXX_unrecognized []byte `json:"-"` } func (m *Heartbeat) Reset() { *m = Heartbeat{} } @@ -43,5 +45,12 @@ func (m *Heartbeat) GetErrorCount() uint64 { return 0 } +func (m *Heartbeat) GetControlMessageIdentifier() *UUID { + if m != nil { + return m.ControlMessageIdentifier + } + return nil +} + func init() { } diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/http.pb.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/http.pb.go index d6849f41e..14db3957a 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/http.pb.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/http.pb.go @@ -11,6 +11,7 @@ import math "math" var _ = proto.Marshal var _ = math.Inf +// / Type of peer handling request. type PeerType int32 const ( @@ -44,6 +45,7 @@ func (x *PeerType) UnmarshalJSON(data []byte) error { return nil } +// / HTTP method. type Method int32 const ( @@ -86,30 +88,7 @@ func (x *Method) UnmarshalJSON(data []byte) error { return nil } -type UUID struct { - Low *uint64 `protobuf:"varint,1,req,name=low" json:"low,omitempty"` - High *uint64 `protobuf:"varint,2,req,name=high" json:"high,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *UUID) Reset() { *m = UUID{} } -func (m *UUID) String() string { return proto.CompactTextString(m) } -func (*UUID) ProtoMessage() {} - -func (m *UUID) GetLow() uint64 { - if m != nil && m.Low != nil { - return *m.Low - } - return 0 -} - -func (m *UUID) GetHigh() uint64 { - if m != nil && m.High != nil { - return *m.High - } - return 0 -} - +// / An HttpStart event is emitted when a client sends a request (or immediately when a server receives the request). type HttpStart struct { Timestamp *int64 `protobuf:"varint,1,req,name=timestamp" json:"timestamp,omitempty"` RequestId *UUID `protobuf:"bytes,2,req,name=requestId" json:"requestId,omitempty"` @@ -206,6 +185,7 @@ func (m *HttpStart) GetInstanceId() string { return "" } +// / An HttpStop event is emitted when a client receives a response to its request (or when a server completes its handling and returns a response). type HttpStop struct { Timestamp *int64 `protobuf:"varint,1,req,name=timestamp" json:"timestamp,omitempty"` Uri *string `protobuf:"bytes,2,req,name=uri" json:"uri,omitempty"` @@ -270,6 +250,7 @@ func (m *HttpStop) GetApplicationId() *UUID { return nil } +// / An HttpStartStop event represents the whole lifecycle of an HTTP request. type HttpStartStop struct { StartTimestamp *int64 `protobuf:"varint,1,req,name=startTimestamp" json:"startTimestamp,omitempty"` StopTimestamp *int64 `protobuf:"varint,2,req,name=stopTimestamp" json:"stopTimestamp,omitempty"` diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/log.pb.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/log.pb.go index 5c438e9cf..6e8fb297e 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/log.pb.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/log.pb.go @@ -11,6 +11,7 @@ import math "math" var _ = proto.Marshal var _ = math.Inf +// / MessageType stores the destination of the message (corresponding to STDOUT or STDERR). type LogMessage_MessageType int32 const ( @@ -44,6 +45,7 @@ func (x *LogMessage_MessageType) UnmarshalJSON(data []byte) error { return nil } +// / A LogMessage contains a "log line" and associated metadata. type LogMessage struct { Message []byte `protobuf:"bytes,1,req,name=message" json:"message,omitempty"` MessageType *LogMessage_MessageType `protobuf:"varint,2,req,name=message_type,enum=events.LogMessage_MessageType" json:"message_type,omitempty"` diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/metric.pb.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/metric.pb.go index 31b966ab6..adc617fcc 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/metric.pb.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/metric.pb.go @@ -11,6 +11,7 @@ import math "math" var _ = proto.Marshal var _ = math.Inf +// / A ValueMetric indicates the value of a metric at an instant in time. type ValueMetric struct { Name *string `protobuf:"bytes,1,req,name=name" json:"name,omitempty"` Value *float64 `protobuf:"fixed64,2,req,name=value" json:"value,omitempty"` @@ -43,6 +44,7 @@ func (m *ValueMetric) GetUnit() string { return "" } +// / A CounterEvent represents the increment of a counter. It contains only the change in the value; it is the responsibility of downstream consumers to maintain the value of the counter. type CounterEvent struct { Name *string `protobuf:"bytes,1,req,name=name" json:"name,omitempty"` Delta *uint64 `protobuf:"varint,2,req,name=delta" json:"delta,omitempty"` diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/uuid.pb.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/uuid.pb.go new file mode 100644 index 000000000..91d758c62 --- /dev/null +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/events/uuid.pb.go @@ -0,0 +1,40 @@ +// Code generated by protoc-gen-gogo. +// source: uuid.proto +// DO NOT EDIT! + +package events + +import proto "code.google.com/p/gogoprotobuf/proto" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = math.Inf + +// / Type representing a 128-bit UUID. +type UUID struct { + Low *uint64 `protobuf:"varint,1,req,name=low" json:"low,omitempty"` + High *uint64 `protobuf:"varint,2,req,name=high" json:"high,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *UUID) Reset() { *m = UUID{} } +func (m *UUID) String() string { return proto.CompactTextString(m) } +func (*UUID) ProtoMessage() {} + +func (m *UUID) GetLow() uint64 { + if m != nil && m.Low != nil { + return *m.Low + } + return 0 +} + +func (m *UUID) GetHigh() uint64 { + if m != nil && m.High != nil { + return *m.High + } + return 0 +} + +func init() { +} diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/factories/factories.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/factories/factories.go index 13a9d8fac..ea591af27 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/factories/factories.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/factories/factories.go @@ -4,6 +4,7 @@ import ( "code.google.com/p/gogoprotobuf/proto" "encoding/binary" "fmt" + "github.com/cloudfoundry/dropsonde/control" "github.com/cloudfoundry/dropsonde/events" uuid "github.com/nu7hatch/gouuid" "net/http" @@ -15,6 +16,10 @@ func NewUUID(id *uuid.UUID) *events.UUID { return &events.UUID{Low: proto.Uint64(binary.LittleEndian.Uint64(id[:8])), High: proto.Uint64(binary.LittleEndian.Uint64(id[8:]))} } +func NewControlUUID(id *uuid.UUID) *control.UUID { + return &control.UUID{Low: proto.Uint64(binary.LittleEndian.Uint64(id[:8])), High: proto.Uint64(binary.LittleEndian.Uint64(id[8:]))} +} + func NewHttpStart(req *http.Request, peerType events.PeerType, requestId *uuid.UUID) *events.HttpStart { httpStart := &events.HttpStart{ Timestamp: proto.Int64(time.Now().UnixNano()), diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_handler.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_handler/instrumented_handler.go similarity index 98% rename from Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_handler.go rename to Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_handler/instrumented_handler.go index 8276ee744..e4e977ccb 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_handler.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_handler/instrumented_handler.go @@ -1,4 +1,4 @@ -package dropsonde +package instrumented_handler import ( "bufio" diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_handler/instrumented_handler_suite_test.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_handler/instrumented_handler_suite_test.go new file mode 100644 index 000000000..a2c92453c --- /dev/null +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_handler/instrumented_handler_suite_test.go @@ -0,0 +1,13 @@ +package instrumented_handler_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "testing" +) + +func TestInstrumentedHandler(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "InstrumentedHandler Suite") +} diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_handler_test.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_handler/instrumented_handler_test.go similarity index 91% rename from Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_handler_test.go rename to Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_handler/instrumented_handler_test.go index 59255edcf..4b3d30cf8 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_handler_test.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_handler/instrumented_handler_test.go @@ -1,10 +1,10 @@ -package dropsonde_test +package instrumented_handler_test import ( "errors" - "github.com/cloudfoundry/dropsonde" "github.com/cloudfoundry/dropsonde/emitter/fake" "github.com/cloudfoundry/dropsonde/events" + "github.com/cloudfoundry/dropsonde/instrumented_handler" uuid "github.com/nu7hatch/gouuid" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -31,7 +31,7 @@ var _ = Describe("InstrumentedHandler", func() { var err error fh := FakeHandler{} - h = dropsonde.InstrumentedHandler(fh, fakeEmitter) + h = instrumented_handler.InstrumentedHandler(fh, fakeEmitter) req, err = http.NewRequest("GET", "http://foo.example.com/", nil) Expect(err).ToNot(HaveOccurred()) req.RemoteAddr = "127.0.0.1" @@ -39,7 +39,7 @@ var _ = Describe("InstrumentedHandler", func() { }) AfterEach(func() { - dropsonde.GenerateUuid = uuid.NewV4 + instrumented_handler.GenerateUuid = uuid.NewV4 }) Describe("request ID", func() { @@ -71,7 +71,7 @@ var _ = Describe("InstrumentedHandler", func() { }) It("should use an empty request ID if generating a new one fails", func() { - dropsonde.GenerateUuid = func() (u *uuid.UUID, err error) { + instrumented_handler.GenerateUuid = func() (u *uuid.UUID, err error) { return nil, errors.New("test error") } h.ServeHTTP(httptest.NewRecorder(), req) diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_round_tripper.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_round_tripper/instrumented_round_tripper.go similarity index 96% rename from Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_round_tripper.go rename to Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_round_tripper/instrumented_round_tripper.go index 441a6b8b9..13db00bc2 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_round_tripper.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_round_tripper/instrumented_round_tripper.go @@ -1,4 +1,4 @@ -package dropsonde +package instrumented_round_tripper import ( "github.com/cloudfoundry/dropsonde/emitter" @@ -64,3 +64,5 @@ func (irt *instrumentedRoundTripper) RoundTrip(req *http.Request) (*http.Respons return resp, roundTripErr } + +var GenerateUuid = uuid.NewV4 diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_round_tripper/instrumented_round_tripper_suite_test.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_round_tripper/instrumented_round_tripper_suite_test.go new file mode 100644 index 000000000..79bb4f898 --- /dev/null +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_round_tripper/instrumented_round_tripper_suite_test.go @@ -0,0 +1,13 @@ +package instrumented_round_tripper_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "testing" +) + +func TestInstrumentedRoundTripper(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "InstrumentedRoundTripper Suite") +} diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_round_tripper_test.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_round_tripper/instrumented_round_tripper_test.go similarity index 90% rename from Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_round_tripper_test.go rename to Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_round_tripper/instrumented_round_tripper_test.go index ce38f77ab..a2d3a84c6 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_round_tripper_test.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/instrumented_round_tripper/instrumented_round_tripper_test.go @@ -1,11 +1,11 @@ -package dropsonde_test +package instrumented_round_tripper_test import ( "errors" - "github.com/cloudfoundry/dropsonde" "github.com/cloudfoundry/dropsonde/emitter/fake" "github.com/cloudfoundry/dropsonde/events" "github.com/cloudfoundry/dropsonde/factories" + "github.com/cloudfoundry/dropsonde/instrumented_round_tripper" uuid "github.com/nu7hatch/gouuid" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -33,7 +33,7 @@ var _ = Describe("InstrumentedRoundTripper", func() { fakeEmitter = fake.NewFakeEventEmitter(origin) fakeRoundTripper = new(FakeRoundTripper) - rt = dropsonde.InstrumentedRoundTripper(fakeRoundTripper, fakeEmitter) + rt = instrumented_round_tripper.InstrumentedRoundTripper(fakeRoundTripper, fakeEmitter) req, err = http.NewRequest("GET", "http://foo.example.com/", nil) Expect(err).ToNot(HaveOccurred()) @@ -50,12 +50,12 @@ var _ = Describe("InstrumentedRoundTripper", func() { Context("if request ID can't be generated", func() { BeforeEach(func() { - dropsonde.GenerateUuid = func() (u *uuid.UUID, err error) { + instrumented_round_tripper.GenerateUuid = func() (u *uuid.UUID, err error) { return nil, errors.New("test error") } }) AfterEach(func() { - dropsonde.GenerateUuid = uuid.NewV4 + instrumented_round_tripper.GenerateUuid = uuid.NewV4 }) It("defaults to an empty request ID", func() { diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/integration_test/autowire_end_to_end_test.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/integration_test/dropsonde_end_to_end_test.go similarity index 66% rename from Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/integration_test/autowire_end_to_end_test.go rename to Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/integration_test/dropsonde_end_to_end_test.go index 77478a888..7b0fa5b6c 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/integration_test/autowire_end_to_end_test.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/integration_test/dropsonde_end_to_end_test.go @@ -3,35 +3,32 @@ package integration_test import ( "code.google.com/p/gogoprotobuf/proto" "fmt" - "github.com/cloudfoundry/dropsonde/autowire" - "github.com/cloudfoundry/dropsonde/autowire/metrics" + "github.com/cloudfoundry/dropsonde" + "github.com/cloudfoundry/dropsonde/control" "github.com/cloudfoundry/dropsonde/events" + "github.com/cloudfoundry/dropsonde/factories" "github.com/cloudfoundry/dropsonde/metric_sender" + "github.com/cloudfoundry/dropsonde/metrics" + uuid "github.com/nu7hatch/gouuid" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "net" "net/http" - "os" + "strings" "sync" + "time" ) // these tests need to be invoked individually from an external script, // since environment variables need to be set/unset before starting the tests var _ = Describe("Autowire End-to-End", func() { - Context("with DROPSONDE_ORIGIN set", func() { - var oldEnv string + Context("with standard initialization", func() { + origin := []string{"test-origin"} BeforeEach(func() { - oldEnv = os.Getenv("DROPSONDE_ORIGIN") - os.Setenv("DROPSONDE_ORIGIN", "test-origin") - emitter, _ := autowire.CreateDefaultEmitter() - autowire.Initialize(emitter) - metrics.Initialize(metric_sender.NewMetricSender(autowire.AutowiredEmitter())) - }) - - AfterEach(func() { - os.Setenv("DROPSONDE_ORIGIN", oldEnv) + dropsonde.Initialize("localhost:3457", origin...) + metrics.Initialize(metric_sender.NewMetricSender(dropsonde.AutowiredEmitter())) }) It("emits HTTP client/server events and heartbeats", func() { @@ -41,18 +38,28 @@ var _ = Describe("Autowire End-to-End", func() { udpDataChan := make(chan []byte, 16) receivedEvents := make(map[string]bool) + heartbeatUuidsChan := make(chan string, 1000) + lock := sync.RWMutex{} - origin := os.Getenv("DROPSONDE_ORIGIN") + heartbeatRequest := newHeartbeatRequest() + marshalledHeartbeatRequest, _ := proto.Marshal(heartbeatRequest) + requestedHeartbeat := false go func() { defer close(udpDataChan) for { buffer := make([]byte, 1024) - n, _, err := udpListener.ReadFrom(buffer) + n, addr, err := udpListener.ReadFrom(buffer) if err != nil { return } + if !requestedHeartbeat { + + udpListener.WriteTo(marshalledHeartbeatRequest, addr) + requestedHeartbeat = true + } + if n == 0 { panic("Received empty packet") } @@ -70,6 +77,7 @@ var _ = Describe("Autowire End-to-End", func() { case events.Envelope_HttpStop: eventId += envelope.GetHttpStop().GetPeerType().String() case events.Envelope_Heartbeat: + heartbeatUuidsChan <- envelope.GetHeartbeat().GetControlMessageIdentifier().String() case events.Envelope_ValueMetric: eventId += envelope.GetValueMetric().GetName() case events.Envelope_CounterEvent: @@ -79,7 +87,7 @@ var _ = Describe("Autowire End-to-End", func() { } - if envelope.GetOrigin() != origin { + if envelope.GetOrigin() != strings.Join(origin, "/") { panic("origin not as expected") } @@ -94,7 +102,7 @@ var _ = Describe("Autowire End-to-End", func() { httpListener, err := net.Listen("tcp", "localhost:0") Expect(err).ToNot(HaveOccurred()) defer httpListener.Close() - httpHandler := autowire.InstrumentedHandler(FakeHandler{}) + httpHandler := dropsonde.InstrumentedHandler(FakeHandler{}) go http.Serve(httpListener, httpHandler) _, err = http.Get("http://" + httpListener.Addr().String()) @@ -114,12 +122,9 @@ var _ = Describe("Autowire End-to-End", func() { }).Should(BeTrue(), fmt.Sprintf("missing %s", eventType)) } - Eventually(func() bool { - lock.RLock() - defer lock.RUnlock() - _, ok := receivedEvents["Heartbeat"] - return ok - }).Should(BeTrue()) + heartbeatUuid := heartbeatRequest.GetIdentifier().String() + Eventually(heartbeatUuidsChan).Should(Receive(Equal(heartbeatUuid))) + }) }) }) @@ -135,3 +140,14 @@ type FakeRoundTripper struct{} func (frt FakeRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { return nil, nil } + +func newHeartbeatRequest() *control.ControlMessage { + id, _ := uuid.NewV4() + + return &control.ControlMessage{ + Origin: proto.String("MET"), + Identifier: factories.NewControlUUID(id), + Timestamp: proto.Int64(time.Now().UnixNano()), + ControlType: control.ControlMessage_HeartbeatRequest.Enum(), + } +} diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/logs/logs.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/logs/logs.go similarity index 76% rename from Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/logs/logs.go rename to Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/logs/logs.go index f1f691f6e..54ea41acd 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/logs/logs.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/logs/logs.go @@ -3,11 +3,10 @@ // // Use // -// See the documentation for package autowire for details on configuring through -// environment variables. +// See the documentation for package dropsonde for configuration details. // -// Import the package (note that you do not need to additionally import -// autowire). The package self-initializes; to send logs use +// Importing package dropsonde and initializing will initial this package. +// To send logs use // // logs.SendAppLog(appId, message, sourceType, sourceInstance) // @@ -17,22 +16,14 @@ package logs import ( - "github.com/cloudfoundry/dropsonde/autowire" "github.com/cloudfoundry/dropsonde/log_sender" - "github.com/cloudfoundry/gosteno" "io" ) var logSender log_sender.LogSender -func init() { - Initialize(log_sender.NewLogSender(autowire.AutowiredEmitter(), gosteno.NewLogger("autowire/logs"))) -} - // Initialize prepares the logs package for use with the automatic Emitter -// from dropsonde/autowire. This function is called by the package's init -// method, so should only be explicitly called to reset the default -// LogSender, e.g. in tests. +// from dropsonde. func Initialize(ls log_sender.LogSender) { logSender = ls } @@ -41,6 +32,9 @@ func Initialize(ls log_sender.LogSender) { // and source instance, with a message type of std out. // Returns an error if one occurs while sending the event. func SendAppLog(appId, message, sourceType, sourceInstance string) error { + if logSender == nil { + return nil + } return logSender.SendAppLog(appId, message, sourceType, sourceInstance) } @@ -48,17 +42,26 @@ func SendAppLog(appId, message, sourceType, sourceInstance string) error { // and source instance, with a message type of std err. // Returns an error if one occurs while sending the event. func SendAppErrorLog(appId, message, sourceType, sourceInstance string) error { + if logSender == nil { + return nil + } return logSender.SendAppErrorLog(appId, message, sourceType, sourceInstance) } // ScanLogStream sends a log message with the given meta-data for each line from reader. // Restarts on read errors and continues until EOF (or stopChan is closed). func ScanLogStream(appId, sourceType, sourceInstance string, reader io.Reader, stopChan chan struct{}) { + if logSender == nil { + return + } logSender.ScanLogStream(appId, sourceType, sourceInstance, reader, stopChan) } // ScanErrorLogStream sends a log error message with the given meta-data for each line from reader. // Restarts on read errors and continues until EOF (or stopChan is closed). func ScanErrorLogStream(appId, sourceType, sourceInstance string, reader io.Reader, stopChan chan struct{}) { + if logSender == nil { + return + } logSender.ScanErrorLogStream(appId, sourceType, sourceInstance, reader, stopChan) } diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/logs/logs_suite_test.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/logs/logs_suite_test.go similarity index 100% rename from Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/logs/logs_suite_test.go rename to Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/logs/logs_suite_test.go diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/logs/logs_test.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/logs/logs_test.go similarity index 64% rename from Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/logs/logs_test.go rename to Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/logs/logs_test.go index 266bc2580..cba94f416 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/logs/logs_test.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/logs/logs_test.go @@ -1,8 +1,8 @@ package logs_test import ( - "github.com/cloudfoundry/dropsonde/autowire/logs" "github.com/cloudfoundry/dropsonde/log_sender/fake" + "github.com/cloudfoundry/dropsonde/logs" "errors" . "github.com/onsi/ginkgo" @@ -46,4 +46,29 @@ var _ = Describe("Logs", func() { Expect(err).To(HaveOccurred()) }) }) + + Context("when Metric Sender is not initialized", func() { + BeforeEach(func() { + logs.Initialize(nil) + }) + + It("SendAppLog is a no-op", func() { + err := logs.SendAppLog("app-id", "custom-log-message", "App", "0") + Expect(err).ToNot(HaveOccurred()) + }) + + It("SendAppErrorLog is a no-op", func() { + err := logs.SendAppErrorLog("app-id", "custom-log-error-message", "App", "0") + Expect(err).ToNot(HaveOccurred()) + }) + + It("ScanLogStream is a no-op", func() { + Expect(func() { logs.ScanLogStream("app-id", "src-type", "src-instance", nil, nil) }).ShouldNot(Panic()) + }) + + It("ScanErrorLogStream is a no-op", func() { + Expect(func() { logs.ScanErrorLogStream("app-id", "src-type", "src-instance", nil, nil) }).ShouldNot(Panic()) + }) + + }) }) diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/metrics/metrics.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/metrics/metrics.go similarity index 73% rename from Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/metrics/metrics.go rename to Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/metrics/metrics.go index 7772c50f6..1a0e2adc7 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/metrics/metrics.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/metrics/metrics.go @@ -3,11 +3,10 @@ // // Use // -// See the documentation for package autowire for details on configuring through -// environment variables. +// See the documentation for package dropsonde for configuration details. // -// Import the package (note that you do not need to additionally import -// autowire). The package self-initializes; to send metrics use +// Importing package dropsonde and initializing will initial this package. +// To send metrics use // // metrics.SendValue(name, value, unit) // @@ -21,20 +20,12 @@ package metrics import ( - "github.com/cloudfoundry/dropsonde/autowire" "github.com/cloudfoundry/dropsonde/metric_sender" ) var metricSender metric_sender.MetricSender -func init() { - Initialize(metric_sender.NewMetricSender(autowire.AutowiredEmitter())) -} - -// Initialize prepares the metrics package for use with the automatic Emitter -// from dropsonde/autowire. This function is called by the package's init -// method, so should only be explicitly called to reset the default -// MetricSender, e.g. in tests. +// Initialize prepares the metrics package for use with the automatic Emitter. func Initialize(ms metric_sender.MetricSender) { metricSender = ms } @@ -42,6 +33,9 @@ func Initialize(ms metric_sender.MetricSender) { // SendValue sends a value event for the named metric. See // http://metrics20.org/spec/#units for the specifications on allowed units. func SendValue(name string, value float64, unit string) error { + if metricSender == nil { + return nil + } return metricSender.SendValue(name, value, unit) } @@ -49,6 +43,9 @@ func SendValue(name string, value float64, unit string) error { // Maintaining the value of the counter is the responsibility of the receiver of // the event, not the process that includes this package. func IncrementCounter(name string) error { + if metricSender == nil { + return nil + } return metricSender.IncrementCounter(name) } @@ -56,5 +53,8 @@ func IncrementCounter(name string) error { // (positive) delta. Maintaining the value of the counter is the responsibility // of the receiver, as with IncrementCounter. func AddToCounter(name string, delta uint64) error { + if metricSender == nil { + return nil + } return metricSender.AddToCounter(name, delta) } diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/metrics/metrics_suite_test.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/metrics/metrics_suite_test.go similarity index 100% rename from Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/metrics/metrics_suite_test.go rename to Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/metrics/metrics_suite_test.go diff --git a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/metrics/metrics_test.go b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/metrics/metrics_test.go similarity index 65% rename from Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/metrics/metrics_test.go rename to Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/metrics/metrics_test.go index d2a6ffb8e..27afb5a27 100644 --- a/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/autowire/metrics/metrics_test.go +++ b/Godeps/_workspace/src/github.com/cloudfoundry/dropsonde/metrics/metrics_test.go @@ -1,8 +1,8 @@ package metrics_test import ( - "github.com/cloudfoundry/dropsonde/autowire/metrics" "github.com/cloudfoundry/dropsonde/metric_sender/fake" + "github.com/cloudfoundry/dropsonde/metrics" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -42,4 +42,30 @@ var _ = Describe("Metrics", func() { Expect(fakeMetricSender.GetCounter("count")).To(BeEquivalentTo(15)) }) + + Context("when Metric Sender is not initialized", func() { + + BeforeEach(func() { + metrics.Initialize(nil) + }) + + It("SendValue is a no-op", func() { + err := metrics.SendValue("metric", 42.42, "answers") + + Expect(err).ToNot(HaveOccurred()) + }) + + It("IncrementCounter is a no-op", func() { + err := metrics.IncrementCounter("count") + + Expect(err).ToNot(HaveOccurred()) + }) + + It("AddToCounter is a no-op", func() { + err := metrics.AddToCounter("count", 10) + + Expect(err).ToNot(HaveOccurred()) + }) + + }) }) diff --git a/access_log/file_and_loggregator_access_logger.go b/access_log/file_and_loggregator_access_logger.go index 18827ed84..61ffd485b 100644 --- a/access_log/file_and_loggregator_access_logger.go +++ b/access_log/file_and_loggregator_access_logger.go @@ -4,7 +4,7 @@ import ( "io" "regexp" - "github.com/cloudfoundry/dropsonde/autowire/logs" + "github.com/cloudfoundry/dropsonde/logs" ) type FileAndLoggregatorAccessLogger struct { diff --git a/access_log/file_and_loggregator_access_logger_test.go b/access_log/file_and_loggregator_access_logger_test.go index b6513e153..144710a9c 100644 --- a/access_log/file_and_loggregator_access_logger_test.go +++ b/access_log/file_and_loggregator_access_logger_test.go @@ -1,8 +1,8 @@ package access_log_test import ( - "github.com/cloudfoundry/dropsonde/autowire/logs" "github.com/cloudfoundry/dropsonde/log_sender/fake" + "github.com/cloudfoundry/dropsonde/logs" . "github.com/cloudfoundry/gorouter/access_log" "github.com/cloudfoundry/gorouter/route" "github.com/cloudfoundry/gorouter/test_util" @@ -18,7 +18,7 @@ import ( var _ = Describe("AccessLog", func() { Context("with a dropsonde source instance", func() { - It("logs to dropsonde autowire", func() { + It("logs to dropsonde", func() { fakeLogSender := fake.NewFakeLogSender() logs.Initialize(fakeLogSender) diff --git a/config/config.go b/config/config.go index 1059d2625..c6e436d6f 100644 --- a/config/config.go +++ b/config/config.go @@ -6,6 +6,7 @@ import ( steno "github.com/cloudfoundry/gosteno" "io/ioutil" + "strconv" "time" ) @@ -40,10 +41,15 @@ type LoggingConfig struct { Syslog string `yaml:"syslog"` Level string `yaml:"level"` LoggregatorEnabled bool `yaml:"loggregator_enabled"` + MetronAddress string `yaml:"metron_address"` + + // This field is populated by the `Process` function. + JobName string `yaml:"-"` } var defaultLoggingConfig = LoggingConfig{ - Level: "debug", + Level: "debug", + MetronAddress: "localhost:3457", } type Config struct { @@ -53,6 +59,7 @@ type Config struct { Port uint16 `yaml:"port"` Index uint `yaml:"index"` + Zone string `yaml:"zone"` GoMaxProcs int `yaml:"go_max_procs,omitempty"` TraceKey string `yaml:"trace_key"` AccessLog string `yaml:"access_log"` @@ -110,6 +117,7 @@ func (c *Config) Process() { c.PublishActiveAppsInterval = time.Duration(c.PublishActiveAppsIntervalInSeconds) * time.Second c.StartResponseDelayInterval = time.Duration(c.StartResponseDelayIntervalInSeconds) * time.Second c.EndpointTimeout = time.Duration(c.EndpointTimeoutInSeconds) * time.Second + c.Logging.JobName = "router_" + c.Zone + "_" + strconv.Itoa(int(c.Index)) if c.StartResponseDelayInterval > c.DropletStaleThreshold { c.DropletStaleThreshold = c.StartResponseDelayInterval diff --git a/main.go b/main.go index b42afe1de..6f93132e1 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "net/url" + "github.com/cloudfoundry/dropsonde" "github.com/cloudfoundry/gorouter/access_log" vcap "github.com/cloudfoundry/gorouter/common" "github.com/cloudfoundry/gorouter/config" @@ -39,6 +40,8 @@ func main() { c = config.InitConfigFromFile(configFile) } + dropsonde.Initialize(c.Logging.MetronAddress, c.Logging.JobName) + // setup number of procs if c.GoMaxProcs != 0 { runtime.GOMAXPROCS(c.GoMaxProcs) diff --git a/proxy/proxy.go b/proxy/proxy.go index 4ecd595e1..897b57742 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -9,7 +9,7 @@ import ( "sync" "time" - "github.com/cloudfoundry/dropsonde/autowire" + "github.com/cloudfoundry/dropsonde" "github.com/cloudfoundry/gorouter/access_log" router_http "github.com/cloudfoundry/gorouter/common/http" "github.com/cloudfoundry/gorouter/route" @@ -180,7 +180,7 @@ func (p *proxy) ServeHTTP(responseWriter http.ResponseWriter, request *http.Requ proxyWriter := newProxyResponseWriter(responseWriter) roundTripper := &proxyRoundTripper{ - transport: autowire.InstrumentedRoundTripper(p.transport), + transport: dropsonde.InstrumentedRoundTripper(p.transport), iter: iter, handler: &handler, diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go index 6dce69362..1cfab1041 100644 --- a/proxy/proxy_test.go +++ b/proxy/proxy_test.go @@ -11,7 +11,7 @@ import ( "strings" "time" - "github.com/cloudfoundry/dropsonde/autowire" + "github.com/cloudfoundry/dropsonde" "github.com/cloudfoundry/dropsonde/emitter/fake" "github.com/cloudfoundry/dropsonde/events" "github.com/cloudfoundry/gorouter/access_log" @@ -60,6 +60,9 @@ var _ = Describe("Proxy", func() { r = registry.NewRouteRegistry(conf, mbus) + fakeEmitter := fake.NewFakeEventEmitter("fake") + dropsonde.InitializeWithEmitter(fakeEmitter) + accessLogFile = new(test_util.FakeFile) accessLog = access_log.NewFileAndLoggregatorAccessLogger(accessLogFile, "") go accessLog.Run() @@ -510,7 +513,7 @@ var _ = Describe("Proxy", func() { x := dialProxy(proxyServer) fakeEmitter := fake.NewFakeEventEmitter("fake") - autowire.Initialize(fakeEmitter) + dropsonde.InitializeWithEmitter(fakeEmitter) req := x.NewRequest("GET", "/", nil) req.Host = "app" diff --git a/router/router.go b/router/router.go index e87dfcdde..346fa5687 100644 --- a/router/router.go +++ b/router/router.go @@ -2,7 +2,7 @@ package router import ( "github.com/apcera/nats" - "github.com/cloudfoundry/dropsonde/autowire" + "github.com/cloudfoundry/dropsonde" vcap "github.com/cloudfoundry/gorouter/common" "github.com/cloudfoundry/gorouter/config" "github.com/cloudfoundry/gorouter/proxy" @@ -112,7 +112,7 @@ func (r *Router) Run() <-chan error { } server := http.Server{ - Handler: autowire.InstrumentedHandler(r.proxy), + Handler: dropsonde.InstrumentedHandler(r.proxy), } errChan := make(chan error, 1) diff --git a/router/router_test.go b/router/router_test.go index fa58ee4de..c530febf9 100644 --- a/router/router_test.go +++ b/router/router_test.go @@ -2,6 +2,8 @@ package router_test import ( "github.com/apcera/nats" + "github.com/cloudfoundry/dropsonde" + "github.com/cloudfoundry/dropsonde/emitter/fake" "github.com/cloudfoundry/gorouter/access_log" vcap "github.com/cloudfoundry/gorouter/common" cfg "github.com/cloudfoundry/gorouter/config" @@ -43,6 +45,9 @@ var _ = Describe("Router", func() { natsRunner = natsrunner.NewNATSRunner(int(natsPort)) natsRunner.Start() + fakeEmitter := fake.NewFakeEventEmitter("fake") + dropsonde.InitializeWithEmitter(fakeEmitter) + proxyPort := test_util.NextAvailPort() statusPort := test_util.NextAvailPort() diff --git a/scripts/test b/scripts/test index c3bc981f1..4f9ab6a37 100755 --- a/scripts/test +++ b/scripts/test @@ -2,9 +2,6 @@ set -e -x -u - -export DROPSONDE_ORIGIN="gorouter_test/0" - function printStatus { if [ $? -eq 0 ]; then echo -e "\nSWEET SUITE SUCCESS" diff --git a/test_util/helpers.go b/test_util/helpers.go index 4af0936d0..ff4565039 100644 --- a/test_util/helpers.go +++ b/test_util/helpers.go @@ -21,6 +21,7 @@ func SpecConfig(natsPort, statusPort, proxyPort uint16) *config.Config { c.PruneStaleDropletsInterval = 0 c.DropletStaleThreshold = 0 c.PublishActiveAppsInterval = 0 + c.Zone = "z1" c.EndpointTimeout = 500 * time.Millisecond @@ -40,8 +41,10 @@ func SpecConfig(natsPort, statusPort, proxyPort uint16) *config.Config { } c.Logging = config.LoggingConfig{ - File: "/dev/stdout", - Level: "info", + File: "/dev/stdout", + Level: "info", + MetronAddress: "localhost:3457", + JobName: "router_test_z1_0", } return c