Skip to content

Commit

Permalink
Add order imbalance stock messages
Browse files Browse the repository at this point in the history
Imbalance messages are sent during Limit-up Limit-down trading halts on the
stock market. The client automatically subscribes to imbalances with trading
statuses.
  • Loading branch information
leki75 committed Dec 14, 2024
1 parent 4ca59cf commit 6cb478c
Show file tree
Hide file tree
Showing 11 changed files with 288 additions and 40 deletions.
1 change: 1 addition & 0 deletions marketdata/stream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func (sc *StocksClient) configure(o stockOptions) {
sc.handler.updatedBarHandler = o.updatedBarHandler
sc.handler.dailyBarHandler = o.dailyBarHandler
sc.handler.tradingStatusHandler = o.tradingStatusHandler
sc.handler.imbalanceHandler = o.imbalanceHandler
sc.handler.luldHandler = o.luldHandler
sc.handler.cancelErrorHandler = o.cancelErrorHandler
sc.handler.correctionHandler = o.correctionHandler
Expand Down
20 changes: 20 additions & 0 deletions marketdata/stream/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,6 +994,7 @@ func TestCoreFunctionalityStocks(t *testing.T) {
updatedBars := make(chan Bar, 10)
dailyBars := make(chan Bar, 10)
tradingStatuses := make(chan TradingStatus, 10)
imbalances := make(chan Imbalance, 10)
lulds := make(chan LULD, 10)
cancelErrors := make(chan TradeCancelError, 10)
corrections := make(chan TradeCorrection, 10)
Expand All @@ -1004,6 +1005,7 @@ func TestCoreFunctionalityStocks(t *testing.T) {
WithUpdatedBars(func(b Bar) { updatedBars <- b }, "ALPACA"),
WithDailyBars(func(b Bar) { dailyBars <- b }, "LPACA"),
WithStatuses(func(ts TradingStatus) { tradingStatuses <- ts }, "ALPACA"),
WithImbalances(func(i Imbalance) { imbalances <- i }, "ALPACA"),
WithLULDs(func(l LULD) { lulds <- l }, "ALPACA"),
WithCancelErrors(func(tce TradeCancelError) { cancelErrors <- tce }),
WithCorrections(func(tc TradeCorrection) { corrections <- tc }),
Expand Down Expand Up @@ -1066,6 +1068,15 @@ func TestCoreFunctionalityStocks(t *testing.T) {
Tape: "C",
},
})
// sending an order imbalance
connection.readCh <- serializeToMsgpack(t, []interface{}{
imbalanceWithT{
Type: "i",
Symbol: "ALPACA",
Price: 123.456,
Tape: "C",
},
})
// sending a LULD
connection.readCh <- serializeToMsgpack(t, []interface{}{
luldWithT{
Expand Down Expand Up @@ -1152,6 +1163,13 @@ func TestCoreFunctionalityStocks(t *testing.T) {
require.Fail(t, "no trading status received in time")
}

select {
case oi := <-imbalances:
assert.EqualValues(t, 123.456, oi.Price)
case <-time.After(time.Second):
require.Fail(t, "no imbalance received in time")
}

select {
case l := <-lulds:
assert.EqualValues(t, 42.1789, l.LimitUpPrice)
Expand Down Expand Up @@ -1502,6 +1520,7 @@ func writeInitialFlowMessagesToConn(
UpdatedBars: sub.updatedBars,
DailyBars: sub.dailyBars,
Statuses: sub.statuses,
Imbalances: sub.imbalances,
LULDs: sub.lulds,
CancelErrors: sub.trades, // Subscribe automatically.
Corrections: sub.trades, // Subscribe automatically.
Expand Down Expand Up @@ -1533,6 +1552,7 @@ func checkInitialMessagesSentByClient(
require.ElementsMatch(t, sub.updatedBars, s["updatedBars"])
require.ElementsMatch(t, sub.dailyBars, s["dailyBars"])
require.ElementsMatch(t, sub.statuses, s["statuses"])
require.ElementsMatch(t, sub.imbalances, s["statuses"])
require.ElementsMatch(t, sub.lulds, s["lulds"])
require.NotContains(t, s, "cancelErrors")
require.NotContains(t, s, "corrections")
Expand Down
8 changes: 8 additions & 0 deletions marketdata/stream/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ type TradingStatus struct {
Tape string
}

// Imbalance is an order imbalance message during LULD halts for a security
type Imbalance struct {
Symbol string
Price float64
Timestamp time.Time
Tape string
}

// LULD is a Limit Up Limit Down message
type LULD struct {
Symbol string
Expand Down
Loading

0 comments on commit 6cb478c

Please sign in to comment.