From 6cb478ce00988acc61a3eb4b2aca4351e1382d25 Mon Sep 17 00:00:00 2001 From: Gabor Lekeny Date: Fri, 13 Dec 2024 12:49:31 +0100 Subject: [PATCH] Add order imbalance stock messages Imbalance messages are sent during Limit-up Limit-down trading halts on the stock market. The client automatically subscribes to imbalances with trading statuses. --- marketdata/stream/client.go | 1 + marketdata/stream/client_test.go | 20 +++ marketdata/stream/entities.go | 8 ++ marketdata/stream/entities_easyjson.go | 161 +++++++++++++++++++------ marketdata/stream/flow.go | 2 + marketdata/stream/flow_test.go | 10 +- marketdata/stream/message.go | 52 ++++++++ marketdata/stream/message_test.go | 37 ++++++ marketdata/stream/options.go | 11 ++ marketdata/stream/options_test.go | 3 + marketdata/stream/subscription.go | 23 +++- 11 files changed, 288 insertions(+), 40 deletions(-) diff --git a/marketdata/stream/client.go b/marketdata/stream/client.go index 8172f29..f3ea477 100644 --- a/marketdata/stream/client.go +++ b/marketdata/stream/client.go @@ -115,6 +115,7 @@ func (sc *StocksClient) configure(o stockOptions) { sc.handler.updatedBarHandler = o.updatedBarHandler sc.handler.dailyBarHandler = o.dailyBarHandler sc.handler.tradingStatusHandler = o.tradingStatusHandler + sc.handler.imbalanceHandler = o.imbalanceHandler sc.handler.luldHandler = o.luldHandler sc.handler.cancelErrorHandler = o.cancelErrorHandler sc.handler.correctionHandler = o.correctionHandler diff --git a/marketdata/stream/client_test.go b/marketdata/stream/client_test.go index dca4332..d371252 100644 --- a/marketdata/stream/client_test.go +++ b/marketdata/stream/client_test.go @@ -994,6 +994,7 @@ func TestCoreFunctionalityStocks(t *testing.T) { updatedBars := make(chan Bar, 10) dailyBars := make(chan Bar, 10) tradingStatuses := make(chan TradingStatus, 10) + imbalances := make(chan Imbalance, 10) lulds := make(chan LULD, 10) cancelErrors := make(chan TradeCancelError, 10) corrections := make(chan TradeCorrection, 10) @@ -1004,6 +1005,7 @@ func TestCoreFunctionalityStocks(t *testing.T) { WithUpdatedBars(func(b Bar) { updatedBars <- b }, "ALPACA"), WithDailyBars(func(b Bar) { dailyBars <- b }, "LPACA"), WithStatuses(func(ts TradingStatus) { tradingStatuses <- ts }, "ALPACA"), + WithImbalances(func(i Imbalance) { imbalances <- i }, "ALPACA"), WithLULDs(func(l LULD) { lulds <- l }, "ALPACA"), WithCancelErrors(func(tce TradeCancelError) { cancelErrors <- tce }), WithCorrections(func(tc TradeCorrection) { corrections <- tc }), @@ -1066,6 +1068,15 @@ func TestCoreFunctionalityStocks(t *testing.T) { Tape: "C", }, }) + // sending an order imbalance + connection.readCh <- serializeToMsgpack(t, []interface{}{ + imbalanceWithT{ + Type: "i", + Symbol: "ALPACA", + Price: 123.456, + Tape: "C", + }, + }) // sending a LULD connection.readCh <- serializeToMsgpack(t, []interface{}{ luldWithT{ @@ -1152,6 +1163,13 @@ func TestCoreFunctionalityStocks(t *testing.T) { require.Fail(t, "no trading status received in time") } + select { + case oi := <-imbalances: + assert.EqualValues(t, 123.456, oi.Price) + case <-time.After(time.Second): + require.Fail(t, "no imbalance received in time") + } + select { case l := <-lulds: assert.EqualValues(t, 42.1789, l.LimitUpPrice) @@ -1502,6 +1520,7 @@ func writeInitialFlowMessagesToConn( UpdatedBars: sub.updatedBars, DailyBars: sub.dailyBars, Statuses: sub.statuses, + Imbalances: sub.imbalances, LULDs: sub.lulds, CancelErrors: sub.trades, // Subscribe automatically. Corrections: sub.trades, // Subscribe automatically. @@ -1533,6 +1552,7 @@ func checkInitialMessagesSentByClient( require.ElementsMatch(t, sub.updatedBars, s["updatedBars"]) require.ElementsMatch(t, sub.dailyBars, s["dailyBars"]) require.ElementsMatch(t, sub.statuses, s["statuses"]) + require.ElementsMatch(t, sub.imbalances, s["statuses"]) require.ElementsMatch(t, sub.lulds, s["lulds"]) require.NotContains(t, s, "cancelErrors") require.NotContains(t, s, "corrections") diff --git a/marketdata/stream/entities.go b/marketdata/stream/entities.go index 6e6d556..85852f5 100644 --- a/marketdata/stream/entities.go +++ b/marketdata/stream/entities.go @@ -59,6 +59,14 @@ type TradingStatus struct { Tape string } +// Imbalance is an order imbalance message during LULD halts for a security +type Imbalance struct { + Symbol string + Price float64 + Timestamp time.Time + Tape string +} + // LULD is a Limit Up Limit Down message type LULD struct { Symbol string diff --git a/marketdata/stream/entities_easyjson.go b/marketdata/stream/entities_easyjson.go index f261dde..2b067f2 100644 --- a/marketdata/stream/entities_easyjson.go +++ b/marketdata/stream/entities_easyjson.go @@ -1312,7 +1312,96 @@ func (v *LULD) UnmarshalJSON(data []byte) error { func (v *LULD) UnmarshalEasyJSON(l *jlexer.Lexer) { easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream9(l, v) } -func easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream10(in *jlexer.Lexer, out *CryptoTrade) { +func easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream10(in *jlexer.Lexer, out *Imbalance) { + isTopLevel := in.IsStart() + if in.IsNull() { + if isTopLevel { + in.Consumed() + } + in.Skip() + return + } + in.Delim('{') + for !in.IsDelim('}') { + key := in.UnsafeFieldName(false) + in.WantColon() + if in.IsNull() { + in.Skip() + in.WantComma() + continue + } + switch key { + case "symbol": + out.Symbol = string(in.String()) + case "price": + out.Price = float64(in.Float64()) + case "timestamp": + if data := in.Raw(); in.Ok() { + in.AddError((out.Timestamp).UnmarshalJSON(data)) + } + case "tape": + out.Tape = string(in.String()) + default: + in.SkipRecursive() + } + in.WantComma() + } + in.Delim('}') + if isTopLevel { + in.Consumed() + } +} +func easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream10(out *jwriter.Writer, in Imbalance) { + out.RawByte('{') + first := true + _ = first + { + const prefix string = ",\"symbol\":" + out.RawString(prefix[1:]) + out.String(string(in.Symbol)) + } + { + const prefix string = ",\"price\":" + out.RawString(prefix) + out.Float64(float64(in.Price)) + } + { + const prefix string = ",\"timestamp\":" + out.RawString(prefix) + out.Raw((in.Timestamp).MarshalJSON()) + } + { + const prefix string = ",\"tape\":" + out.RawString(prefix) + out.String(string(in.Tape)) + } + out.RawByte('}') +} + +// MarshalJSON supports json.Marshaler interface +func (v Imbalance) MarshalJSON() ([]byte, error) { + w := jwriter.Writer{} + easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream10(&w, v) + return w.Buffer.BuildBytes(), w.Error +} + +// MarshalEasyJSON supports easyjson.Marshaler interface +func (v Imbalance) MarshalEasyJSON(w *jwriter.Writer) { + easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream10(w, v) +} + +// UnmarshalJSON supports json.Unmarshaler interface +func (v *Imbalance) UnmarshalJSON(data []byte) error { + r := jlexer.Lexer{Data: data} + easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream10(&r, v) + return r.Error() +} + +// UnmarshalEasyJSON supports easyjson.Unmarshaler interface +func (v *Imbalance) UnmarshalEasyJSON(l *jlexer.Lexer) { + easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream10(l, v) +} +func easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream11(in *jlexer.Lexer, out *CryptoTrade) { isTopLevel := in.IsStart() if in.IsNull() { if isTopLevel { @@ -1357,7 +1446,7 @@ func easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream10 in.Consumed() } } -func easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream10(out *jwriter.Writer, in CryptoTrade) { +func easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream11(out *jwriter.Writer, in CryptoTrade) { out.RawByte('{') first := true _ = first @@ -1402,27 +1491,27 @@ func easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream10 // MarshalJSON supports json.Marshaler interface func (v CryptoTrade) MarshalJSON() ([]byte, error) { w := jwriter.Writer{} - easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream10(&w, v) + easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream11(&w, v) return w.Buffer.BuildBytes(), w.Error } // MarshalEasyJSON supports easyjson.Marshaler interface func (v CryptoTrade) MarshalEasyJSON(w *jwriter.Writer) { - easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream10(w, v) + easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream11(w, v) } // UnmarshalJSON supports json.Unmarshaler interface func (v *CryptoTrade) UnmarshalJSON(data []byte) error { r := jlexer.Lexer{Data: data} - easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream10(&r, v) + easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream11(&r, v) return r.Error() } // UnmarshalEasyJSON supports easyjson.Unmarshaler interface func (v *CryptoTrade) UnmarshalEasyJSON(l *jlexer.Lexer) { - easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream10(l, v) + easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream11(l, v) } -func easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream11(in *jlexer.Lexer, out *CryptoQuote) { +func easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream12(in *jlexer.Lexer, out *CryptoQuote) { isTopLevel := in.IsStart() if in.IsNull() { if isTopLevel { @@ -1467,7 +1556,7 @@ func easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream11 in.Consumed() } } -func easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream11(out *jwriter.Writer, in CryptoQuote) { +func easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream12(out *jwriter.Writer, in CryptoQuote) { out.RawByte('{') first := true _ = first @@ -1512,27 +1601,27 @@ func easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream11 // MarshalJSON supports json.Marshaler interface func (v CryptoQuote) MarshalJSON() ([]byte, error) { w := jwriter.Writer{} - easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream11(&w, v) + easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream12(&w, v) return w.Buffer.BuildBytes(), w.Error } // MarshalEasyJSON supports easyjson.Marshaler interface func (v CryptoQuote) MarshalEasyJSON(w *jwriter.Writer) { - easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream11(w, v) + easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream12(w, v) } // UnmarshalJSON supports json.Unmarshaler interface func (v *CryptoQuote) UnmarshalJSON(data []byte) error { r := jlexer.Lexer{Data: data} - easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream11(&r, v) + easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream12(&r, v) return r.Error() } // UnmarshalEasyJSON supports easyjson.Unmarshaler interface func (v *CryptoQuote) UnmarshalEasyJSON(l *jlexer.Lexer) { - easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream11(l, v) + easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream12(l, v) } -func easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream12(in *jlexer.Lexer, out *CryptoOrderbookEntry) { +func easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream13(in *jlexer.Lexer, out *CryptoOrderbookEntry) { isTopLevel := in.IsStart() if in.IsNull() { if isTopLevel { @@ -1565,7 +1654,7 @@ func easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream12 in.Consumed() } } -func easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream12(out *jwriter.Writer, in CryptoOrderbookEntry) { +func easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream13(out *jwriter.Writer, in CryptoOrderbookEntry) { out.RawByte('{') first := true _ = first @@ -1585,27 +1674,27 @@ func easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream12 // MarshalJSON supports json.Marshaler interface func (v CryptoOrderbookEntry) MarshalJSON() ([]byte, error) { w := jwriter.Writer{} - easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream12(&w, v) + easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream13(&w, v) return w.Buffer.BuildBytes(), w.Error } // MarshalEasyJSON supports easyjson.Marshaler interface func (v CryptoOrderbookEntry) MarshalEasyJSON(w *jwriter.Writer) { - easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream12(w, v) + easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream13(w, v) } // UnmarshalJSON supports json.Unmarshaler interface func (v *CryptoOrderbookEntry) UnmarshalJSON(data []byte) error { r := jlexer.Lexer{Data: data} - easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream12(&r, v) + easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream13(&r, v) return r.Error() } // UnmarshalEasyJSON supports easyjson.Unmarshaler interface func (v *CryptoOrderbookEntry) UnmarshalEasyJSON(l *jlexer.Lexer) { - easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream12(l, v) + easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream13(l, v) } -func easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream13(in *jlexer.Lexer, out *CryptoOrderbook) { +func easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream14(in *jlexer.Lexer, out *CryptoOrderbook) { isTopLevel := in.IsStart() if in.IsNull() { if isTopLevel { @@ -1690,7 +1779,7 @@ func easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream13 in.Consumed() } } -func easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream13(out *jwriter.Writer, in CryptoOrderbook) { +func easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream14(out *jwriter.Writer, in CryptoOrderbook) { out.RawByte('{') first := true _ = first @@ -1752,27 +1841,27 @@ func easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream13 // MarshalJSON supports json.Marshaler interface func (v CryptoOrderbook) MarshalJSON() ([]byte, error) { w := jwriter.Writer{} - easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream13(&w, v) + easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream14(&w, v) return w.Buffer.BuildBytes(), w.Error } // MarshalEasyJSON supports easyjson.Marshaler interface func (v CryptoOrderbook) MarshalEasyJSON(w *jwriter.Writer) { - easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream13(w, v) + easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream14(w, v) } // UnmarshalJSON supports json.Unmarshaler interface func (v *CryptoOrderbook) UnmarshalJSON(data []byte) error { r := jlexer.Lexer{Data: data} - easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream13(&r, v) + easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream14(&r, v) return r.Error() } // UnmarshalEasyJSON supports easyjson.Unmarshaler interface func (v *CryptoOrderbook) UnmarshalEasyJSON(l *jlexer.Lexer) { - easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream13(l, v) + easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream14(l, v) } -func easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream14(in *jlexer.Lexer, out *CryptoBar) { +func easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream15(in *jlexer.Lexer, out *CryptoBar) { isTopLevel := in.IsStart() if in.IsNull() { if isTopLevel { @@ -1823,7 +1912,7 @@ func easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream14 in.Consumed() } } -func easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream14(out *jwriter.Writer, in CryptoBar) { +func easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream15(out *jwriter.Writer, in CryptoBar) { out.RawByte('{') first := true _ = first @@ -1883,27 +1972,27 @@ func easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream14 // MarshalJSON supports json.Marshaler interface func (v CryptoBar) MarshalJSON() ([]byte, error) { w := jwriter.Writer{} - easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream14(&w, v) + easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream15(&w, v) return w.Buffer.BuildBytes(), w.Error } // MarshalEasyJSON supports easyjson.Marshaler interface func (v CryptoBar) MarshalEasyJSON(w *jwriter.Writer) { - easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream14(w, v) + easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream15(w, v) } // UnmarshalJSON supports json.Unmarshaler interface func (v *CryptoBar) UnmarshalJSON(data []byte) error { r := jlexer.Lexer{Data: data} - easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream14(&r, v) + easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream15(&r, v) return r.Error() } // UnmarshalEasyJSON supports easyjson.Unmarshaler interface func (v *CryptoBar) UnmarshalEasyJSON(l *jlexer.Lexer) { - easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream14(l, v) + easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream15(l, v) } -func easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream15(in *jlexer.Lexer, out *Bar) { +func easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream16(in *jlexer.Lexer, out *Bar) { isTopLevel := in.IsStart() if in.IsNull() { if isTopLevel { @@ -1952,7 +2041,7 @@ func easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream15 in.Consumed() } } -func easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream15(out *jwriter.Writer, in Bar) { +func easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream16(out *jwriter.Writer, in Bar) { out.RawByte('{') first := true _ = first @@ -2007,23 +2096,23 @@ func easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream15 // MarshalJSON supports json.Marshaler interface func (v Bar) MarshalJSON() ([]byte, error) { w := jwriter.Writer{} - easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream15(&w, v) + easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream16(&w, v) return w.Buffer.BuildBytes(), w.Error } // MarshalEasyJSON supports easyjson.Marshaler interface func (v Bar) MarshalEasyJSON(w *jwriter.Writer) { - easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream15(w, v) + easyjson3e8ab7adEncodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream16(w, v) } // UnmarshalJSON supports json.Unmarshaler interface func (v *Bar) UnmarshalJSON(data []byte) error { r := jlexer.Lexer{Data: data} - easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream15(&r, v) + easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream16(&r, v) return r.Error() } // UnmarshalEasyJSON supports easyjson.Unmarshaler interface func (v *Bar) UnmarshalEasyJSON(l *jlexer.Lexer) { - easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream15(l, v) + easyjson3e8ab7adDecodeGithubComAlpacahqAlpacaTradeApiGoV3MarketdataStream16(l, v) } diff --git a/marketdata/stream/flow.go b/marketdata/stream/flow.go index a9eeff1..97347d8 100644 --- a/marketdata/stream/flow.go +++ b/marketdata/stream/flow.go @@ -178,6 +178,7 @@ func (c *client) readSubResponse(ctx context.Context) error { UpdatedBars []string `msgpack:"updatedBars"` DailyBars []string `msgpack:"dailyBars"` Statuses []string `msgpack:"statuses"` + Imbalances []string `msgpack:"imbalances"` LULDs []string `msgpack:"lulds"` CancelErrors []string `msgpack:"cancelErrors"` Corrections []string `msgpack:"corrections"` @@ -207,6 +208,7 @@ func (c *client) readSubResponse(ctx context.Context) error { c.sub.updatedBars = resp.UpdatedBars c.sub.dailyBars = resp.DailyBars c.sub.statuses = resp.Statuses + c.sub.imbalances = resp.Imbalances c.sub.lulds = resp.LULDs c.sub.cancelErrors = resp.CancelErrors c.sub.corrections = resp.Corrections diff --git a/marketdata/stream/flow_test.go b/marketdata/stream/flow_test.go index ed3dc59..ac7a3a8 100644 --- a/marketdata/stream/flow_test.go +++ b/marketdata/stream/flow_test.go @@ -124,6 +124,7 @@ func TestInitializeAuthRetrySucceeds(t *testing.T) { updatedBars := []string{"AAPL"} dailyBars := []string{"CLDR"} statuses := []string{"*"} + imbalances := []string{"PACA"} lulds := []string{"AL", "PACA", "ALP"} c := NewStocksClient( marketdata.SIP, @@ -134,6 +135,7 @@ func TestInitializeAuthRetrySucceeds(t *testing.T) { WithUpdatedBars(func(_ Bar) {}, updatedBars...), WithDailyBars(func(_ Bar) {}, dailyBars...), WithStatuses(func(_ TradingStatus) {}, statuses...), + WithImbalances(func(_ Imbalance) {}, imbalances...), WithLULDs(func(_ LULD) {}, lulds...), ) c.conn = conn @@ -191,9 +193,10 @@ func TestInitializeAuthRetrySucceeds(t *testing.T) { "updatedBars": updatedBars, "dailyBars": dailyBars, "statuses": statuses, + "imbalances": imbalances, "lulds": lulds, - "cancelErrors": trades, // Subscribed automatically. - "corrections": trades, // Subscribed automatically. + "cancelErrors": trades, // Subscribed automatically with trades. + "corrections": trades, // Subscribed automatically with trades. }, }) @@ -204,6 +207,7 @@ func TestInitializeAuthRetrySucceeds(t *testing.T) { assert.ElementsMatch(t, updatedBars, c.sub.updatedBars) assert.ElementsMatch(t, dailyBars, c.sub.dailyBars) assert.ElementsMatch(t, statuses, c.sub.statuses) + assert.ElementsMatch(t, imbalances, c.sub.imbalances) assert.ElementsMatch(t, lulds, c.sub.lulds) assert.ElementsMatch(t, trades, c.sub.cancelErrors) assert.ElementsMatch(t, trades, c.sub.corrections) @@ -229,6 +233,7 @@ func TestInitializeAuthRetrySucceeds(t *testing.T) { assert.ElementsMatch(t, updatedBars, sub["updatedBars"]) assert.ElementsMatch(t, dailyBars, sub["dailyBars"]) assert.ElementsMatch(t, statuses, sub["statuses"]) + assert.ElementsMatch(t, imbalances, sub["imbalances"]) assert.ElementsMatch(t, lulds, sub["lulds"]) assert.NotContains(t, sub, "cancelErrors") assert.NotContains(t, sub, "corrections") @@ -626,6 +631,7 @@ func TestWriteSubContents(t *testing.T) { UpdatedBars []string `msgpack:"updatedBars"` DailyBars []string `msgpack:"dailyBars"` Statuses []string `msgpack:"statuses"` + Imbalances []string `msgpack:"imbalances"` LULDs []string `msgpack:"lulds"` } err = msgpack.Unmarshal(msg, &got) diff --git a/marketdata/stream/message.go b/marketdata/stream/message.go index a0ab5c7..5d814af 100644 --- a/marketdata/stream/message.go +++ b/marketdata/stream/message.go @@ -15,6 +15,7 @@ type msgHandler interface { handleUpdatedBar(d *msgpack.Decoder, n int) error handleDailyBar(d *msgpack.Decoder, n int) error handleTradingStatus(d *msgpack.Decoder, n int) error + handleImbalance(d *msgpack.Decoder, n int) error handleLULD(d *msgpack.Decoder, n int) error handleCancelError(d *msgpack.Decoder, n int) error handleCorrection(d *msgpack.Decoder, n int) error @@ -81,6 +82,8 @@ func (c *client) handleMessageType(msgType string, d *msgpack.Decoder, n int) er return c.handler.handleDailyBar(d, n) case "s": return c.handler.handleTradingStatus(d, n) + case "i": + return c.handler.handleImbalance(d, n) case "l": return c.handler.handleLULD(d, n) case "x": @@ -108,6 +111,7 @@ type stocksMsgHandler struct { updatedBarHandler func(bar Bar) dailyBarHandler func(bar Bar) tradingStatusHandler func(ts TradingStatus) + imbalanceHandler func(ts Imbalance) luldHandler func(luld LULD) cancelErrorHandler func(tce TradeCancelError) correctionHandler func(tc TradeCorrection) @@ -303,6 +307,36 @@ func (h *stocksMsgHandler) handleTradingStatus(d *msgpack.Decoder, n int) error return nil } +func (h *stocksMsgHandler) handleImbalance(d *msgpack.Decoder, n int) error { + oi := Imbalance{} + for i := 0; i < n; i++ { + key, err := d.DecodeString() + if err != nil { + return err + } + switch key { + case "S": + oi.Symbol, err = d.DecodeString() + case "p": + oi.Price, err = d.DecodeFloat64() + case "t": + oi.Timestamp, err = d.DecodeTime() + case "z": + oi.Tape, err = d.DecodeString() + default: + err = d.Skip() + } + if err != nil { + return err + } + } + h.mu.RLock() + handler := h.imbalanceHandler + h.mu.RUnlock() + handler(oi) + return nil +} + func (h *stocksMsgHandler) handleLULD(d *msgpack.Decoder, n int) error { luld := LULD{} for i := 0; i < n; i++ { @@ -628,6 +662,11 @@ func (h *cryptoMsgHandler) handleTradingStatus(d *msgpack.Decoder, n int) error return discardMapContents(d, n) } +func (h *cryptoMsgHandler) handleImbalance(d *msgpack.Decoder, n int) error { + // should not happen! + return discardMapContents(d, n) +} + func (h *cryptoMsgHandler) handleLULD(d *msgpack.Decoder, n int) error { // should not happen! return discardMapContents(d, n) @@ -750,6 +789,11 @@ func (h *optionsMsgHandler) handleTradingStatus(d *msgpack.Decoder, n int) error return discardMapContents(d, n) } +func (h *optionsMsgHandler) handleImbalance(d *msgpack.Decoder, n int) error { + // should not happen! + return discardMapContents(d, n) +} + func (h *optionsMsgHandler) handleLULD(d *msgpack.Decoder, n int) error { // should not happen! return discardMapContents(d, n) @@ -812,6 +856,11 @@ func (h *newsMsgHandler) handleTradingStatus(d *msgpack.Decoder, n int) error { return discardMapContents(d, n) } +func (h *newsMsgHandler) handleImbalance(d *msgpack.Decoder, n int) error { + // should not happen! + return discardMapContents(d, n) +} + func (h *newsMsgHandler) handleLULD(d *msgpack.Decoder, n int) error { // should not happen! return discardMapContents(d, n) @@ -934,6 +983,7 @@ var subMessageHandler = func(c *client, s subscriptions) error { c.sub.updatedBars = s.updatedBars c.sub.dailyBars = s.dailyBars c.sub.statuses = s.statuses + c.sub.imbalances = s.imbalances c.sub.lulds = s.lulds c.sub.cancelErrors = s.cancelErrors c.sub.corrections = s.corrections @@ -968,6 +1018,8 @@ func (c *client) handleSubscriptionMessage(d *msgpack.Decoder, n int) error { s.dailyBars, err = decodeStringSlice(d) case "statuses": s.statuses, err = decodeStringSlice(d) + case "imbalances": + s.imbalances, err = decodeStringSlice(d) case "lulds": s.lulds, err = decodeStringSlice(d) case "cancelErrors": diff --git a/marketdata/stream/message_test.go b/marketdata/stream/message_test.go index 5a85110..93988f8 100644 --- a/marketdata/stream/message_test.go +++ b/marketdata/stream/message_test.go @@ -73,6 +73,17 @@ type tradingStatusWithT struct { NewField uint64 `msgpack:"n"` } +// tradingStatusWithT is the incoming trading status message that also contains the T type key +type imbalanceWithT struct { + Type string `msgpack:"T"` + Symbol string `msgpack:"S"` + Price float64 `msgpack:"p"` + Timestamp time.Time `msgpack:"t"` + Tape string `msgpack:"z"` + // NewField is for testing correct handling of added fields in the future + NewField uint64 `msgpack:"n"` +} + // luldWithT is the incoming LULD message that also contains the T type key type luldWithT struct { Type string `json:"T" msgpack:"T"` @@ -253,6 +264,7 @@ type subWithT struct { UpdatedBars []string `msgpack:"updatedBars"` DailyBars []string `msgpack:"dailyBars"` Statuses []string `msgpack:"statuses"` + Imbalances []string `msgpack:"imbalances"` LULDs []string `msgpack:"lulds"` Corrections []string `msgpack:"corrections"` CancelErrors []string `msgpack:"cancelErrors"` @@ -332,6 +344,14 @@ var testTradingStatus = tradingStatusWithT{ Tape: "C", } +var testImbalance = imbalanceWithT{ + Type: "i", + Symbol: "BIIB", + Price: 100.2, + Timestamp: time.Date(2021, 3, 5, 16, 0, 0, 0, time.UTC), + Tape: "C", +} + var testLULD = luldWithT{ Type: "l", Symbol: "TEST", @@ -451,6 +471,8 @@ var testSubMessage1 = subWithT{ Trades: []string{"ALPACA"}, Quotes: []string{}, Bars: []string{}, + Statuses: []string{"ALPACA"}, + Imbalances: []string{"ALPACA"}, CancelErrors: []string{"ALPACA"}, Corrections: []string{"ALPACA"}, } @@ -461,6 +483,8 @@ var testSubMessage2 = subWithT{ Quotes: []string{"AL", "PACA"}, Bars: []string{"ALP", "ACA"}, DailyBars: []string{"LPACA"}, + Statuses: []string{"AL", "PACA"}, + Imbalances: []string{"AL", "PACA"}, CancelErrors: []string{"ALPACA"}, Corrections: []string{"ALPACA"}, } @@ -470,6 +494,7 @@ func TestHandleMessagesStocks(t *testing.T) { testOther, testTrade, testTradingStatus, + testImbalance, testQuote, testBar, testUpdatedBar, @@ -525,6 +550,10 @@ func TestHandleMessagesStocks(t *testing.T) { h.tradingStatusHandler = func(ts TradingStatus) { tradingStatus = ts } + var imbalance Imbalance + h.imbalanceHandler = func(oi Imbalance) { + imbalance = oi + } var luld LULD h.luldHandler = func(l LULD) { luld = l @@ -560,6 +589,12 @@ func TestHandleMessagesStocks(t *testing.T) { assert.True(t, testTradingStatus.Timestamp.Equal(tradingStatus.Timestamp)) assert.Equal(t, testTradingStatus.Tape, tradingStatus.Tape) + // Verify stock imbalance. + assert.Equal(t, testImbalance.Symbol, imbalance.Symbol) + assert.Equal(t, testImbalance.Price, imbalance.Price) + assert.True(t, testImbalance.Timestamp.Equal(imbalance.Timestamp)) + assert.Equal(t, testImbalance.Tape, imbalance.Tape) + // Verify stock luld. assert.Equal(t, testLULD.Symbol, luld.Symbol) assert.EqualValues(t, testLULD.LimitUpPrice, luld.LimitUpPrice) @@ -635,12 +670,14 @@ func TestHandleMessagesStocks(t *testing.T) { assert.EqualValues(t, testSubMessage1.Bars, subscriptionMessages[0].bars) assert.EqualValues(t, testSubMessage1.CancelErrors, subscriptionMessages[0].cancelErrors) assert.EqualValues(t, testSubMessage1.Corrections, subscriptionMessages[0].corrections) + assert.EqualValues(t, testSubMessage1.Imbalances, subscriptionMessages[0].imbalances) assert.EqualValues(t, testSubMessage2.Trades, subscriptionMessages[1].trades) assert.EqualValues(t, testSubMessage2.Quotes, subscriptionMessages[1].quotes) assert.EqualValues(t, testSubMessage2.Bars, subscriptionMessages[1].bars) assert.EqualValues(t, testSubMessage2.DailyBars, subscriptionMessages[1].dailyBars) assert.EqualValues(t, testSubMessage2.CancelErrors, subscriptionMessages[1].cancelErrors) assert.EqualValues(t, testSubMessage2.Corrections, subscriptionMessages[1].corrections) + assert.EqualValues(t, testSubMessage2.Imbalances, subscriptionMessages[1].imbalances) } func TestHandleMessagesCrypto(t *testing.T) { diff --git a/marketdata/stream/options.go b/marketdata/stream/options.go index 94a96cf..b21c583 100644 --- a/marketdata/stream/options.go +++ b/marketdata/stream/options.go @@ -176,6 +176,7 @@ type stockOptions struct { updatedBarHandler func(Bar) dailyBarHandler func(Bar) tradingStatusHandler func(TradingStatus) + imbalanceHandler func(Imbalance) luldHandler func(LULD) cancelErrorHandler func(TradeCancelError) correctionHandler func(TradeCorrection) @@ -206,6 +207,7 @@ func defaultStockOptions() *stockOptions { updatedBars: []string{}, dailyBars: []string{}, statuses: []string{}, + imbalances: []string{}, lulds: []string{}, cancelErrors: []string{}, corrections: []string{}, @@ -218,6 +220,7 @@ func defaultStockOptions() *stockOptions { updatedBarHandler: func(_ Bar) {}, dailyBarHandler: func(_ Bar) {}, tradingStatusHandler: func(_ TradingStatus) {}, + imbalanceHandler: func(_ Imbalance) {}, luldHandler: func(_ LULD) {}, cancelErrorHandler: func(_ TradeCancelError) {}, correctionHandler: func(_ TradeCorrection) {}, @@ -292,6 +295,14 @@ func WithStatuses(handler func(TradingStatus), symbols ...string) StockOption { }) } +// WithImbalances configures initial imbalance handler. +func WithImbalances(handler func(Imbalance), symbols ...string) StockOption { + return newFuncStockOption(func(o *stockOptions) { + o.sub.imbalances = symbols + o.imbalanceHandler = handler + }) +} + // WithLULDs configures initial LULD symbols to subscribe to and the handler func WithLULDs(handler func(LULD), symbols ...string) StockOption { return newFuncStockOption(func(o *stockOptions) { diff --git a/marketdata/stream/options_test.go b/marketdata/stream/options_test.go index 2612f5f..3082a33 100644 --- a/marketdata/stream/options_test.go +++ b/marketdata/stream/options_test.go @@ -44,6 +44,7 @@ func TestDefaultOptions(t *testing.T) { assert.EqualValues(t, []string{}, o.sub.updatedBars) assert.EqualValues(t, []string{}, o.sub.dailyBars) assert.EqualValues(t, []string{}, o.sub.statuses) + assert.EqualValues(t, []string{}, o.sub.imbalances) assert.EqualValues(t, []string{}, o.sub.lulds) assert.EqualValues(t, []string{}, o.sub.cancelErrors) assert.EqualValues(t, []string{}, o.sub.corrections) @@ -71,6 +72,7 @@ func TestConfigureStocks(t *testing.T) { WithDailyBars(func(_ Bar) {}, "LPACA"), WithStatuses(func(_ TradingStatus) {}, "ALPACA"), WithLULDs(func(_ LULD) {}, "ALPA", "CA"), + WithImbalances(func(_ Imbalance) {}, "ALPACA"), WithCancelErrors(func(_ TradeCancelError) {}), WithCorrections(func(_ TradeCorrection) {}), ) @@ -90,6 +92,7 @@ func TestConfigureStocks(t *testing.T) { assert.EqualValues(t, []string{"LPACA"}, c.sub.dailyBars) assert.EqualValues(t, []string{"ALPACA"}, c.sub.statuses) assert.EqualValues(t, []string{"ALPA", "CA"}, c.sub.lulds) + assert.EqualValues(t, []string{"ALPACA"}, c.sub.imbalances) assert.EqualValues(t, []string{}, c.sub.cancelErrors) assert.EqualValues(t, []string{}, c.sub.corrections) // NOTE: function equality can not be tested well diff --git a/marketdata/stream/subscription.go b/marketdata/stream/subscription.go index 57b019a..8d5a297 100644 --- a/marketdata/stream/subscription.go +++ b/marketdata/stream/subscription.go @@ -53,6 +53,13 @@ func (sc *StocksClient) SubscribeToStatuses(handler func(TradingStatus), symbols return sc.client.handleSubChange(true, subscriptions{statuses: symbols}) } +func (sc *StocksClient) SubscribeToImbalances(handler func(Imbalance), symbols ...string) error { + sc.handler.mu.Lock() + sc.handler.imbalanceHandler = handler + sc.handler.mu.Unlock() + return sc.client.handleSubChange(true, subscriptions{imbalances: symbols}) +} + func (sc *StocksClient) SubscribeToLULDs(handler func(LULD), symbols ...string) error { sc.handler.mu.Lock() sc.handler.luldHandler = handler @@ -96,10 +103,20 @@ func (sc *StocksClient) UnsubscribeFromStatuses(symbols ...string) error { return sc.handleSubChange(false, subscriptions{statuses: symbols}) } +func (sc *StocksClient) UnsubscribeFromImbalances(symbols ...string) error { + return sc.handleSubChange(false, subscriptions{imbalances: symbols}) +} + func (sc *StocksClient) UnsubscribeFromLULDs(symbols ...string) error { return sc.handleSubChange(false, subscriptions{lulds: symbols}) } +func (sc *StocksClient) UnregisterImbalances() { + sc.handler.mu.Lock() + sc.handler.imbalanceHandler = func(Imbalance) {} + sc.handler.mu.Unlock() +} + func (sc *StocksClient) UnregisterCancelErrors() { sc.handler.mu.Lock() sc.handler.cancelErrorHandler = func(TradeCancelError) {} @@ -218,9 +235,10 @@ type subscriptions struct { updatedBars []string dailyBars []string statuses []string + imbalances []string lulds []string - cancelErrors []string // Subscribed automatically. - corrections []string // Subscribed automatically. + cancelErrors []string // Subscribed automatically with trades. + corrections []string // Subscribed automatically with trades. orderbooks []string news []string } @@ -301,6 +319,7 @@ func getSubChangeMessage(subscribe bool, changes subscriptions) ([]byte, error) "updatedBars": changes.updatedBars, "dailyBars": changes.dailyBars, "statuses": changes.statuses, + "imbalances": changes.imbalances, "lulds": changes.lulds, "orderbooks": changes.orderbooks, "news": changes.news,