diff --git a/build.gradle b/build.gradle
index 2613551..4fec85e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -37,8 +37,6 @@ configure(allprojects) {
exclude group: "org.apache.commons", name: "commons-lang"
exclude group: "com.epam.deltix", name: "containers"
exclude group: "com.epam.deltix", name: "hd-date-time"
- exclude group: "com.epam.deltix", name: "gflog-api"
- exclude group: "com.epam.deltix", name: "gflog-core"
exclude group: "com.epam.deltix", name: "dfp"
}
}
diff --git a/gradle.properties b/gradle.properties
index b554a3f..cc3f9e6 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,4 +1,4 @@
-version=1.0.17-SNAPSHOT
+version=1.0.18-SNAPSHOT
group=com.epam.deltix
org.gradle.jvmargs=-Xmx2500m -XX:MaxMetaspaceSize=512m -XX:+HeapDumpOnOutOfMemoryError -Dfile.encoding=UTF-8
diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/EntryValidationCode.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/EntryValidationCode.java
new file mode 100644
index 0000000..fcfdc59
--- /dev/null
+++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/EntryValidationCode.java
@@ -0,0 +1,70 @@
+package com.epam.deltix.orderbook.core.api;
+
+/**
+ * Represents validation error codes that can be encountered during order book updates.
+ * Each constant corresponds to a specific scenario where an update to the order book fails
+ * validation checks.
+ */
+public enum EntryValidationCode {
+ /**
+ * The quote ID, which uniquely identifies an order, is missing.
+ */
+ MISSING_QUOTE_ID,
+
+ /**
+ * The quote ID submitted already exists in the order book, indicating a duplicated order.
+ */
+ DUPLICATE_QUOTE_ID,
+
+ /**
+ * The price field of the order is missing, which is required for order placement.
+ */
+ MISSING_PRICE,
+
+ /**
+ * The submitted order size is either not well-formed or outside the allowed range.
+ */
+ BAD_SIZE,
+
+ /**
+ * The quote ID referenced does not match any existing order in the order book.
+ */
+ UNKNOWN_QUOTE_ID,
+
+ /**
+ * An attempt to modify an existing order with a new price was made,
+ * which may not be supported in certain order book implementations.
+ */
+ MODIFY_CHANGE_PRICE,
+
+ /**
+ * An attempt to increase the size of an existing order through modification was made.
+ * This may be an invalid operation depending on exchange rules.
+ */
+ MODIFY_INCREASE_SIZE,
+
+ /**
+ * The exchange ID, used to identify the marketplace where the order should be placed, is missing.
+ */
+ MISSING_EXCHANGE_ID,
+
+ /**
+ * There's a discrepancy between the exchange ID stated and the one expected by the order book.
+ */
+ EXCHANGE_ID_MISMATCH,
+
+ /**
+ * The action specified for updating the order book is not recognized or allowed.
+ */
+ UNSUPPORTED_UPDATE_ACTION,
+
+ /**
+ * The order does not specify whether it is a buy or sell order.
+ */
+ UNSPECIFIED_SIDE,
+
+ /**
+ * The type of order insert operation specified is not supported by the order book.
+ */
+ UNSUPPORTED_INSERT_TYPE
+}
diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/ErrorListener.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/ErrorListener.java
new file mode 100644
index 0000000..924848a
--- /dev/null
+++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/ErrorListener.java
@@ -0,0 +1,17 @@
+package com.epam.deltix.orderbook.core.api;
+
+
+import com.epam.deltix.timebase.messages.MessageInfo;
+
+/**
+ * User-defined error handler
+ */
+public interface ErrorListener {
+ /**
+ * Called when input market message contains something invalid.
+ *
+ * @param message message containing invalid market-related messages
+ * @param errorCode error code that describes what is wrong
+ */
+ void onError(MessageInfo message, EntryValidationCode errorCode);
+}
diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/Exchange.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/Exchange.java
index 5f5ef3c..6cfa438 100644
--- a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/Exchange.java
+++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/Exchange.java
@@ -16,8 +16,8 @@
*/
package com.epam.deltix.orderbook.core.api;
-
import com.epam.deltix.timebase.messages.universal.QuoteSide;
+import com.epam.deltix.util.annotations.Alphanumeric;
/**
* Represents the order book entries for a specific stock exchange.
@@ -32,6 +32,7 @@ public interface Exchange {
*
* @return exchangeId for this exchange.
*/
+ @Alphanumeric
long getExchangeId();
/**
diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/ExchangeList.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/ExchangeList.java
index 3ed894a..028a68e 100644
--- a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/ExchangeList.java
+++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/ExchangeList.java
@@ -17,6 +17,7 @@
package com.epam.deltix.orderbook.core.api;
import com.epam.deltix.orderbook.core.options.Option;
+import com.epam.deltix.util.annotations.Alphanumeric;
import java.util.function.BiPredicate;
import java.util.function.Predicate;
@@ -30,7 +31,7 @@
*
* @author Andrii_Ostapenko1
*/
-public interface ExchangeList
extends Iterable
{
*
* @return an iterator over the elements in this market side in proper sequence
*/
+ @Override
Iterator
iterator();
/**
@@ -45,7 +46,7 @@ public interface IterableMarketSide
extends Iterable
{
* @param fromLevel - Starting price level index to use
* @return an iterator over the elements in this order book in proper sequence
*/
- Iterator
iterator(short fromLevel);
+ Iterator
iterator(int fromLevel);
/**
* Returns an iterator over the elements in this market side in proper sequence.
@@ -54,7 +55,7 @@ public interface IterableMarketSide
extends Iterable
{
* @param toLevel - End price level index to use
* @return an iterator over the elements in this order book in proper sequence
*/
- Iterator
iterator(short fromLevel, short toLevel);
+ Iterator
iterator(int fromLevel, int toLevel);
/**
* Creates a new sequential or parallel {@code Stream} from a
@@ -89,13 +90,13 @@ default Stream
stream() {
void forEach(Predicate
action);
- void forEach(short fromLevel, Predicate
action);
+ void forEach(int fromLevel, Predicate
action);
- void forEach(short fromLevel, short toLevel, Predicate
action);
+ void forEach(int fromLevel, int toLevel, Predicate
action);
action, Cookie cookie);
-
action, Cookie cookie);
+
action, Cookie cookie);
-
action, Cookie cookie);
+
action, Cookie cookie);
}
diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/MarketSide.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/MarketSide.java
index 605cca0..d23c8af 100644
--- a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/MarketSide.java
+++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/MarketSide.java
@@ -49,7 +49,14 @@ public interface MarketSide
extends IterableMarketSide
{
Quote getBestQuote();
/**
- * Get quote by level.
+ * Get worst quote.
+ *
+ * @return Worst quote from side or null if quote not found
+ */
+ Quote getWorstQuote();
+
+ /**
+ * Get quote by level. WARNING: this method can be slow for some implementations (for example, in L3 order book). Use iterator instead.
*
* @param level - level to use
* @return quote or null if quote not found
@@ -89,30 +96,47 @@ public interface MarketSide
extends IterableMarketSide
{
* @param level - quote level to use.
* @return true if this market side contains given quote level
*/
- boolean hasLevel(short level);
+ boolean hasLevel(int level);
+
+ /**
+ * Get quote by quoteId.
+ * Unsupported operation for L2, always returns null.
+ *
+ * @param quoteId - Quote Id
+ * @return quote or null if quote not found
+ */
+ Quote getQuote(CharSequence quoteId);
+
+ /**
+ * Unsupported operation for L2, always returns false.
+ *
+ * @param quoteId - Quote Id
+ * @return true if this market side contains given quote ID
+ */
+ boolean hasQuote(CharSequence quoteId);
@Override
default Iterator
iterator() {
- return iterator((short) 0);
+ return iterator(0);
}
@Override
- default Iterator
iterator(final short fromLevel) {
- return iterator(fromLevel, (short) depth());
+ default Iterator
iterator(final int fromLevel) {
+ return iterator(fromLevel, depth());
}
@Override
default void forEach(final Predicate
action) {
- forEach((short) 0, (short) depth(), action);
+ forEach(0, depth(), action);
}
@Override
- default void forEach(final short level, final Predicate
action) {
- forEach((short) 0, level, action);
+ default void forEach(final int level, final Predicate
action) {
+ forEach(0, level, action);
}
@Override
- default void forEach(final short fromLevel, final short toLevel, final Predicate
action) {
+ default void forEach(final int fromLevel, final int toLevel, final Predicate
action) {
Objects.requireNonNull(action);
for (int i = fromLevel; i < toLevel; i++) {
if (!action.test(getQuote(i))) {
@@ -123,17 +147,17 @@ default void forEach(final short fromLevel, final short toLevel, final Predicate
@Override
default
action, final Cookie cookie) {
- forEach((short) 0, (short) depth(), action, cookie);
+ forEach(0, depth(), action, cookie);
}
@Override
- default
action, final Cookie cookie) {
- forEach(fromLevel, (short) depth(), action, cookie);
+ default
action, final Cookie cookie) {
+ forEach(fromLevel, depth(), action, cookie);
}
@Override
- default
action,
final Cookie cookie) {
Objects.requireNonNull(action);
diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/OrderBook.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/OrderBook.java
index 1baca11..a8f121a 100644
--- a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/OrderBook.java
+++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/OrderBook.java
@@ -16,8 +16,9 @@
*/
package com.epam.deltix.orderbook.core.api;
+
import com.epam.deltix.orderbook.core.options.Option;
-import com.epam.deltix.timebase.messages.MarketMessageInfo;
+import com.epam.deltix.timebase.messages.MessageInfo;
import com.epam.deltix.timebase.messages.universal.DataModelType;
import com.epam.deltix.timebase.messages.universal.QuoteSide;
@@ -28,10 +29,10 @@
* of various trading systems that we have previously worked with. It is designed to provide high-level normalized
* format suitable to capture data from majority of very different trading venues.
* Package Type is enumeration of:
- * •VENDOR_SNAPSHOT – snapshot that came directly from data vendor (e.g. exchange).
- * •PERIODICAL_SNAPSHOT Synthetic snapshot generated by EPAM Market
+ * VENDOR_SNAPSHOT - snapshot that came directly from data vendor (e.g. exchange).
+ * PERIODICAL_SNAPSHOT Synthetic snapshot generated by EPAM Market
* Data Aggregation software to simplify data validation and allow late joiners (consumers that appear in the middle of trading session).
- * •INCREMENTAL_UPDATE – incremental update, a list of insert/update/delete data entries.
+ * INCREMENTAL_UPDATE - incremental update, a list of insert/update/delete data entries.
* This Format does not support snapshot and increment messages mixed in one package.
* Trade entries can be easily combined with Increments.
* It is important to differentiate one type of snapshot from another.
@@ -52,7 +53,7 @@ public interface OrderBook
{
* Note: FlyWeight pattern in use. We don't keep any references on your classes (message) after method returns execution.
*
- * Supported market data messages: L1/L2/ResetEntry
+ * Supported market data messages: L1/L2/L3/ResetEntry,
*
* @param message Most financial market-related messages to use.
* @return {@code true} if all entries of the package is process otherwise {@code false}
@@ -63,7 +64,7 @@ public interface OrderBook {
* @see com.epam.deltix.timebase.messages.universal.BookResetEntryInterface
*
{ */ ExchangeList extends Exchange> getExchanges(); + /** + * @return true if order book from this exchange is waiting for snapshot to recover. + * In this state order book appears empty, but corresponding exchange is likely not empty. + * Order book may be in this state initially, or after we market data disconnect, as well as after internal error. + */ + boolean isWaitingForSnapshot(); + } diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/OrderBookFactory.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/OrderBookFactory.java index c9433bd..06c04ff 100644 --- a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/OrderBookFactory.java +++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/OrderBookFactory.java @@ -16,9 +16,14 @@ */ package com.epam.deltix.orderbook.core.api; + import com.epam.deltix.orderbook.core.impl.L1OrderBookFactory; import com.epam.deltix.orderbook.core.impl.L2OrderBookFactory; -import com.epam.deltix.orderbook.core.options.*; +import com.epam.deltix.orderbook.core.impl.L3OrderBookFactory; +import com.epam.deltix.orderbook.core.options.Defaults; +import com.epam.deltix.orderbook.core.options.OrderBookOptions; +import com.epam.deltix.orderbook.core.options.OrderBookOptionsBuilder; +import com.epam.deltix.orderbook.core.options.OrderBookType; import com.epam.deltix.timebase.messages.universal.DataModelType; import java.util.Objects; @@ -42,52 +47,59 @@ protected OrderBookFactory() { /** * Factory method for create order book with the given options. * - *- * Note: FlyWeight pattern in use. We don't keep any references on your classes (opt) after method returns execution. - * * @param
type of quote - * @param opt to use. + * @param options to use. * @return a new OrderBook instance of given type. - * @throws NullPointerException - if opt is null. - * @throws UnsupportedOperationException - if some options does not supported. + * @throws IllegalArgumentException - if some options does not supported. * @see OrderBook * @see OrderBookQuote */ - public staticOrderBookcreate(final OrderBookOptions opt) { - Objects.requireNonNull(opt); - final Optionsymbol = opt.getSymbol(); - final DataModelType quoteLevels = opt.getQuoteLevels().orElse(Defaults.QUOTE_LEVELS); - final OrderBookType orderBookType = opt.getBookType().orElse(Defaults.ORDER_BOOK_TYPE); - OrderBook book = null; - final UpdateMode updateMode = opt.getUpdateMode().orElse(Defaults.UPDATE_MODE); + public staticOrderBookcreate(final OrderBookOptions options) { + if (Objects.isNull(options)) { + throw new IllegalArgumentException("Options not allowed to be null."); + } + + final DataModelType quoteLevels = options.getQuoteLevels().orElse(Defaults.QUOTE_LEVELS); + final OrderBookType orderBookType = options.getBookType().orElse(Defaults.ORDER_BOOK_TYPE); + final OrderBookbook; switch (quoteLevels) { case LEVEL_ONE: if (orderBookType == OrderBookType.SINGLE_EXCHANGE) { - book = L1OrderBookFactory.newSingleExchangeBook(symbol, updateMode); + book = L1OrderBookFactory.newSingleExchangeBook(options); } else { - throw new UnsupportedOperationException("Unsupported book mode: " + orderBookType + " for quote levels: " + quoteLevels); + throw new IllegalArgumentException("Unsupported book type: " + orderBookType + " for quote levels: " + quoteLevels); } break; case LEVEL_TWO: - final GapMode gapMode = opt.getGapMode().orElse(Defaults.GAP_MODE); - final UnreachableDepthMode unreachableDepthMode = opt.getUnreachableDepthMode().orElse(Defaults.UNREACHABLE_DEPTH_MODE); - final int initialDepth = opt.getInitialDepth().orElse(Defaults.INITIAL_DEPTH); - final int maxDepth = opt.getMaxDepth().orElse(Defaults.MAX_DEPTH); - final Integer exchangePoolSize = opt.getInitialExchangesPoolSize().orElse(Defaults.INITIAL_EXCHANGES_POOL_SIZE); switch (orderBookType) { case SINGLE_EXCHANGE: - book = L2OrderBookFactory.newSingleExchangeBook(symbol, initialDepth, maxDepth, gapMode, updateMode, unreachableDepthMode); + book = L2OrderBookFactory.newSingleExchangeBook(options); break; case AGGREGATED: - book = L2OrderBookFactory.newAggregatedBook(symbol, exchangePoolSize, initialDepth, maxDepth, gapMode, updateMode, unreachableDepthMode); + book = L2OrderBookFactory.newAggregatedBook(options); + break; + case CONSOLIDATED: + book = L2OrderBookFactory.newConsolidatedBook(options); + break; + default: + throw new IllegalArgumentException("Unsupported book type: " + orderBookType + " for quote levels: " + quoteLevels); + } + break; + case LEVEL_THREE: + switch (orderBookType) { + case SINGLE_EXCHANGE: + book = L3OrderBookFactory.newSingleExchangeBook(options); break; case CONSOLIDATED: - book = L2OrderBookFactory.newConsolidatedBook(symbol, exchangePoolSize, initialDepth, maxDepth, gapMode, updateMode, unreachableDepthMode); + book = L3OrderBookFactory.newConsolidatedBook(options); break; + case AGGREGATED: + default: + throw new IllegalArgumentException("Unsupported book type: " + orderBookType + " for quote levels: " + quoteLevels); } break; default: - throw new UnsupportedOperationException("Unsupported quote levels: " + quoteLevels); + throw new IllegalArgumentException("Unsupported quote levels: " + quoteLevels); } return book; } diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/OrderBookQuote.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/OrderBookQuote.java index c1068ca..dd23622 100644 --- a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/OrderBookQuote.java +++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/OrderBookQuote.java @@ -23,7 +23,7 @@ * * @author Andrii_Ostapenko1 */ -public interface OrderBookQuote { +public interface OrderBookQuote extends OrderBookQuoteTimestamp { /** * Ask, Bid or Trade price. @@ -83,4 +83,44 @@ public interface OrderBookQuote { */ boolean hasSize(); + /** + * Quote ID. In Forex market, for example, quote ID can be referenced in + * TradeOrders (to identify market maker's quote/rate we want to deal with). + * Each market maker usually keeps this ID unique per session per day. This + * is a alpha-numeric text field that can reach 64 characters or more, + *+ * Supported for L3 quote level + * + * @return Quote ID or null if not found + */ + CharSequence getQuoteId(); + + /** + * Quote ID. In Forex market, for example, quote ID can be referenced in + * TradeOrders (to identify market maker's quote/rate we want to deal with). + * Each market maker usually keeps this ID unique per session per day. This + * is a alpha-numeric text field that can reach 64 characters or more, + *
+ * Supported for L3 quote level + * + * @return true if Quote ID is not null + */ + boolean hasQuoteId(); + + /** + * Id of participant (or broker ID). + * Supported for L3 quote level + * + * @return Participant or null if not found + */ + CharSequence getParticipantId(); + + /** + * Id of participant (or broker ID). + * Supported for L3 quote level + * + * @return true if Participant is not null + */ + boolean hasParticipantId(); + } diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/OrderBookQuoteTimestamp.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/OrderBookQuoteTimestamp.java new file mode 100644 index 0000000..65cdb6e --- /dev/null +++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/api/OrderBookQuoteTimestamp.java @@ -0,0 +1,73 @@ +package com.epam.deltix.orderbook.core.api; + +/** + * Quote timestamp interface. + *
+ * This interface is used for accessing quote timestamp information. + * This functionality is optional and can be enabled by setting OrderBookOptionsBuilder.shouldStoreQuoteTimestamps(true) + *
+ * This function requires additional memory allocation for each quote. + * + * @author Andrii_Ostapenko1 + * @see com.epam.deltix.orderbook.core.options.Defaults#SHOULD_STORE_QUOTE_TIMESTAMPS + */ +public interface OrderBookQuoteTimestamp { + + /** + * Special constant that marks 'unknown' timestamp. + */ + long TIMESTAMP_UNKNOWN = Long.MIN_VALUE; + + /** + * Exchange Time is measured in milliseconds that passed since January 1, 1970 UTC + * For inbound messages special constant {link TIMESTAMP_UNKNOWN} marks 'unknown' timestamp in which case OrderBook + * stores message using current server time. + *
+ * By default, Original Timestamp is not supported. + * For enabling Original Timestamp support you need to set OrderBookOptionsBuilder.shouldStoreQuoteTimestamps(true) + * + * @return timestamp + */ + default long getOriginalTimestamp() { + return TIMESTAMP_UNKNOWN; + } + + /** + * Exchange Time is measured in milliseconds that passed since January 1, 1970 UTC + * By default Original Timestamp is not supported. + * For enabling Original Timestamp support you need to set OrderBookOptionsBuilder.shouldStoreQuoteTimestamps(true) + * + * @return true if Original Timestamp is not null + */ + default boolean hasOriginalTimestamp() { + return false; + } + + /** + * Time in this field is measured in milliseconds that passed since January 1, 1970 UTC. + * For inbound messages, special constant {link TIMESTAMP_UNKNOWN} marks 'unknown' timestamp + * in which case OrderBook stores message using current server time. + *
+ * By default, TimeStamp is not supported. + * For enabling Time support you need to set OrderBookOptionsBuilder.shouldStoreQuoteTimestamps(true) + * + * @return timestamp + */ + default long getTimestamp() { + return TIMESTAMP_UNKNOWN; + } + + /** + * Time in this field is measured in milliseconds that passed since January 1, 1970 UTC. + * For inbound messages, special constant {link TIMESTAMP_UNKNOWN} marks 'unknown' timestamp + * in which case OrderBook stores message using current server time. + *
+ * By default, TimeStamp is not supported. + * For enabling Time support, you need to set OrderBookOptionsBuilder.shouldStoreQuoteTimestamps(true) + * + * @return true if time not null + */ + default boolean hasTimestamp() { + return false; + } +} diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/AbstractL1MarketSide.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/AbstractL1MarketSide.java index 2150290..71ce2a8 100644 --- a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/AbstractL1MarketSide.java +++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/AbstractL1MarketSide.java @@ -69,7 +69,7 @@ public boolean isEmpty() { } @Override - public boolean hasLevel(final short level) { + public boolean hasLevel(final int level) { return !isEmpty() && level == 0; } @@ -78,6 +78,11 @@ public Quote getBestQuote() { return quote; } + @Override + public Quote getWorstQuote() { + return quote; + } + @Override public String toString() { final StringBuilder builder = new StringBuilder(); @@ -88,7 +93,7 @@ public String toString() { } @Override - public Iterator
iterator(final short fromLevel, final short toLevel) { + public Iteratoriterator(final int fromLevel, final int toLevel) { itr.iterateBy(quote); return itr; } diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/AbstractL2MarketSide.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/AbstractL2MarketSide.java index 218cf44..38e1834 100644 --- a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/AbstractL2MarketSide.java +++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/AbstractL2MarketSide.java @@ -19,13 +19,18 @@ import com.epam.deltix.dfp.Decimal; import com.epam.deltix.dfp.Decimal64Utils; import com.epam.deltix.orderbook.core.api.MarketSide; +import com.epam.deltix.timebase.messages.universal.BookUpdateAction; import com.epam.deltix.timebase.messages.universal.QuoteSide; +import com.epam.deltix.util.annotations.Alphanumeric; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Objects; +import static com.epam.deltix.dfp.Decimal64Utils.*; +import static com.epam.deltix.timebase.messages.TypeConstants.EXCHANGE_NULL; + /** * @author Andrii_Ostapenko1 */ @@ -33,21 +38,17 @@ abstract class AbstractL2MarketSideimpleme protected final Listdata; private final ReusableIteratoritr; - // This parameter is used to limit maximum elements. - private final short maxDepth; - // This parameter is used to understand whether the side is full or not. - private short depthLimit; + // This parameter is used to limit maximum elements and to understand whether the side is full or not. + private final int maxDepth; - AbstractL2MarketSide(final int initialCapacity, - final short maxDepth) { + AbstractL2MarketSide(final int initialCapacity, final int maxDepth) { this.maxDepth = maxDepth; - this.depthLimit = maxDepth; this.data = new ArrayList<>(initialCapacity); this.itr = new ReusableIterator<>(); } @Override - public short getMaxDepth() { + public int getMaxDepth() { return maxDepth; } @@ -59,8 +60,7 @@ public int depth() { //TODO Add configuration parameter for type of calculating total quantity @Override public long getTotalQuantity() { - @Decimal - long result = Decimal64Utils.ZERO; + @Decimal long result = ZERO; for (int i = 0; i < data.size(); i++) { result = Decimal64Utils.add(result, data.get(i).getSize()); } @@ -79,37 +79,32 @@ public boolean isEmpty() { @Override public Quote getQuote(final int level) { - if (!hasLevel((short) level)) { + if (!hasLevel(level)) { return null; } return data.get(level); } @Override - public void add(final short level, final Quote insert) { + public void add(final int level, final Quote insert) { data.add(level, insert); } @Override - public void addLast(final Quote insert) { - data.add(insert); - } - - @Override - public void add(final Quote insert) { + public void addWorstQuote(final Quote insert) { data.add(insert); } @Override public Quote remove(final int level) { - if (!hasLevel((short) level)) { + if (!hasLevel(level)) { return null; } return data.remove(level); } @Override - public short binarySearchLevelByPrice(final Quote find) { + public int binarySearch(final Quote find) { int low = 0; int high = data.size() - 1; @@ -149,14 +144,14 @@ public short binarySearchLevelByPrice(final Quote find) { high = mid - 1; } } else { - return (short) mid; + return mid; } } return NOT_FOUND; } @Override - public short binarySearchNextLevelByPrice(final Quote find) { + public int binarySearchNextLevelByPrice(final Quote find) { int low = 0; int high = data.size() - 1; @@ -196,30 +191,20 @@ public short binarySearchNextLevelByPrice(final Quote find) { high = mid - 1; } } else { - return (short) mid; + return mid; } } - return (short) low; + return low; } @Override - public boolean hasLevel(final short level) { - if (data.size() > level) { + public boolean hasLevel(final int level) { + if (level >= 0 && data.size() > level) { return Objects.nonNull(data.get(level)); } return false; } - @Override - public void trim() { - this.depthLimit = (short) data.size(); - } - - @Override - public Quote getWorstQuote() { - return data.get(data.size() - 1); - } - @Override public Quote removeWorstQuote() { return data.remove(data.size() - 1); @@ -227,15 +212,20 @@ public Quote removeWorstQuote() { @Override public boolean isFull() { - return depth() >= depthLimit; + return depth() >= maxDepth; } //TODO add doc !! @Override - public boolean isGap(final short level) { + public boolean isGap(final int level) { return !hasLevel(level) && level > depth(); } + @Override + public boolean isUnreachableLeve(int level) { + return level < 0 || level >= getMaxDepth(); + } + @Override public Quote getBestQuote() { if (isEmpty()) { @@ -244,6 +234,90 @@ public Quote getBestQuote() { return data.get(0); } + @Override + public Quote getWorstQuote() { + if (!isEmpty()) { + return data.get(data.size() - 1); + } + return null; + } + + @Override + public boolean isInvalidInsert(final int level, final @Decimal long price, final @Decimal long size, final @Alphanumeric long exchangeId) { + //TODO need to defined default type for internal decimal + if (level < 0 || isEqual(price, NULL) || isLessOrEqual(size, ZERO) || exchangeId == EXCHANGE_NULL) { + return true; + } + if (isUnreachableLeve(level)) { + return true; + } + if (isGap(level)) { + return true; + } + return !checkOrderPrice(level, price); + } + + @Override + public boolean isInvalidUpdate(final BookUpdateAction action, + final int level, + final @Decimal long price, + final @Decimal long size, + final @Alphanumeric long exchangeId) { + if (!hasLevel(level)) { + return true; + } + if (action != BookUpdateAction.DELETE) { + return isNotEqual(getQuote(level).getPrice(), price) || isLess(size, ZERO); + } + return false; + } + + /** + * Checking the insertion of the quotation price. + * @param level - quote level to use + * @param price - price to be checked + * @return true if this price is sorted. + */ + @Override + public boolean checkOrderPrice(final int level, final @Decimal long price) { + + @Decimal final long previousPrice = hasLevel(level - 1) ? getQuote(level - 1).getPrice() : NULL; + @Decimal final long nextPrice = hasLevel(level) ? getQuote(level).getPrice() : NULL; + + boolean badState = false; + if (getSide() == QuoteSide.ASK) { + if (isNotEqual(previousPrice, NULL) && isGreater(previousPrice, price)) { + badState = true; + } + if (isNotEqual(nextPrice, NULL) && isLess(nextPrice, price)) { + badState = true; + } + } else { + if (isNotEqual(previousPrice, NULL) && isLess(previousPrice, price)) { + badState = true; + } + if (isNotEqual(nextPrice, NULL) && isGreater(nextPrice, price)) { + badState = true; + } + } + return !badState; + } + + @Override + public boolean validateState() { + if (isEmpty()) { + return true; + } + for (int i = 0; i < depth(); i++) { + final Quote quote = getQuote(i); + if (isInvalidInsert(i, quote.getPrice(), quote.getSize(), quote.getExchangeId())) { + //TODO add log + return false; + } + } + return true; + } + @Override public String toString() { final StringBuilder builder = new StringBuilder(); @@ -254,7 +328,7 @@ public String toString() { } @Override - public Iteratoriterator(final short fromLevel, final short toLevel) { + public Iteratoriterator(final int fromLevel, final int toLevel) { itr.iterateBy(this, fromLevel, toLevel); return itr; } @@ -267,18 +341,18 @@ static final class ReusableIteratorimplements Iterator{ /** * Index of element to be returned by subsequent call to next. */ - private short cursor; + private int cursor; - private short size; + private int size; private MarketSidemarketSide; - private void iterateBy(final MarketSidemarketSide, final short cursor, final short size) { + private void iterateBy(final MarketSidemarketSide, final int cursor, final int size) { Objects.requireNonNull(marketSide); this.marketSide = marketSide; this.cursor = cursor; if (size > marketSide.depth() || size < 0) { - this.size = (short) marketSide.depth(); + this.size = marketSide.depth(); } else { this.size = size; } @@ -305,7 +379,7 @@ public void remove() { static class ASKextends AbstractL2MarketSide{ - ASK(final int initialCapacity, final short maxDepth) { + ASK(final int initialCapacity, final int maxDepth) { super(initialCapacity, maxDepth); } @@ -318,7 +392,7 @@ public QuoteSide getSide() { static class BIDextends AbstractL2MarketSide{ - BID(final int initialDepth, final short maxDepth) { + BID(final int initialDepth, final int maxDepth) { super(initialDepth, maxDepth); } diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/AbstractL2MultiExchangeProcessor.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/AbstractL2MultiExchangeProcessor.java index 31d2e6e..0db2d3b 100644 --- a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/AbstractL2MultiExchangeProcessor.java +++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/AbstractL2MultiExchangeProcessor.java @@ -16,12 +16,17 @@ */ package com.epam.deltix.orderbook.core.impl; +import com.epam.deltix.containers.AlphanumericUtils; import com.epam.deltix.orderbook.core.api.MarketSide; import com.epam.deltix.orderbook.core.options.*; +import com.epam.deltix.timebase.messages.TypeConstants; +import com.epam.deltix.timebase.messages.service.FeedStatus; +import com.epam.deltix.timebase.messages.service.SecurityFeedStatusMessage; import com.epam.deltix.timebase.messages.universal.*; import com.epam.deltix.util.annotations.Alphanumeric; import com.epam.deltix.util.collections.generated.ObjectList; + /** * Main class for L2 quote level order book. * @@ -32,38 +37,46 @@ abstract class AbstractL2MultiExchangeProcessorbids; protected final L2MarketSideasks; + protected final ObjectPoolpool; + protected final MutableExchangeList>> exchanges; //Parameters - protected final GapMode gapMode; - protected final UpdateMode updateMode; - protected final UnreachableDepthMode unreachableDepthMode; - protected final ObjectPool pool; - protected final short initialDepth; - protected final int maxDepth; - /** - * This parameter using for handle book reset entry. - * - * @see QuoteProcessor#isWaitingForSnapshot() - */ - private boolean isWaitingForSnapshot = false; - - AbstractL2MultiExchangeProcessor(final int initialExchangeCount, - final int initialDepth, - final int maxDepth, - final ObjectPoolpool, - final GapMode gapMode, - final UpdateMode updateMode, - final UnreachableDepthMode unreachableDepthMode) { - this.initialDepth = (short) initialDepth; - this.maxDepth = maxDepth; + protected final DisconnectMode disconnectMode; + protected final ValidationOptions validationOptions; + private final OrderBookOptions options; + + AbstractL2MultiExchangeProcessor(final OrderBookOptions options, final ObjectPoolpool) { + this.options = options; + this.validationOptions = options.getInvalidQuoteMode().orElse(Defaults.VALIDATION_OPTIONS); + this.disconnectMode = options.getDisconnectMode().orElse(Defaults.DISCONNECT_MODE); + + final int maxDepth = options.getMaxDepth().orElse(Defaults.MAX_DEPTH); + final int depth = options.getInitialDepth().orElse(Math.min(Defaults.INITIAL_DEPTH, maxDepth)); + final int exchanges = options.getInitialExchangesPoolSize().orElse(Defaults.INITIAL_EXCHANGES_POOL_SIZE); this.pool = pool; - this.gapMode = gapMode; - this.updateMode = updateMode; - this.unreachableDepthMode = unreachableDepthMode; - this.exchanges = new MutableExchangeListImpl<>(initialExchangeCount); - this.asks = L2MarketSide.factory(initialExchangeCount * initialDepth, Defaults.MAX_DEPTH, QuoteSide.ASK); - this.bids = L2MarketSide.factory(initialExchangeCount * initialDepth, Defaults.MAX_DEPTH, QuoteSide.BID); + this.exchanges = new MutableExchangeListImpl<>(exchanges); + this.asks = L2MarketSide.factory(exchanges * depth, Defaults.MAX_DEPTH, QuoteSide.ASK); + this.bids = L2MarketSide.factory(exchanges * depth, Defaults.MAX_DEPTH, QuoteSide.BID); + } + + @Override + public boolean isWaitingForSnapshot() { + if (exchanges.isEmpty()) { + return true; // No data from exchanges, so we are in "waiting" state + } + + for (final MutableExchange exchange : exchanges) { + if (exchange.isWaitingForSnapshot()) { + return true; // At least one of source exchanges awaits snapshot + } + } + return false; + } + + @Override + public boolean isSnapshotAllowed(final PackageHeaderInfo msg) { + throw new UnsupportedOperationException("Unsupported for multi exchange processor!"); } @Override @@ -82,154 +95,144 @@ public boolean isEmpty() { } @Override - public void processBookResetEntry(final BookResetEntryInfo bookResetEntryInfo) { - clearExchange(bookResetEntryInfo.getExchangeId()); - waitingForSnapshot(); + public boolean processSecurityFeedStatus(final SecurityFeedStatusMessage msg) { + if (msg.getStatus() == FeedStatus.NOT_AVAILABLE) { + if (disconnectMode == DisconnectMode.CLEAR_EXCHANGE) { + @Alphanumeric final long exchangeId = msg.getExchangeId(); + final Option>> holder = getOrCreateExchange(exchangeId); + + if (!holder.hasValue()) { + return false; + } + final L2Processor exchange = holder.get().getProcessor(); + + unmapQuote(exchange); + return exchange.processSecurityFeedStatus(msg); + } + } + return false; } @Override - public void processL2VendorSnapshot(final PackageHeaderInfo marketMessageInfo) { - final ObjectListentries = marketMessageInfo.getEntries(); - @Alphanumeric final long exchangeId = entries.get(0).getExchangeId(); + public boolean processBookResetEntry(final PackageHeaderInfo pck, final BookResetEntryInfo msg) { + @Alphanumeric final long exchangeId = msg.getExchangeId(); + final Option >> holder = getOrCreateExchange(exchangeId); - final L2Processor exchange = clearExchange(exchangeId); - - exchange.processL2VendorSnapshot(marketMessageInfo); + if (!holder.hasValue()) { + return false; + } + final L2Processorexchange = holder.get().getProcessor(); - insertAll(exchange, QuoteSide.BID); - insertAll(exchange, QuoteSide.ASK); - notWaitingForSnapshot(); + unmapQuote(exchange); + return exchange.processBookResetEntry(pck, msg); } @Override - public Quote processL2EntryNewInfo(final L2EntryNewInfo l2EntryNewInfo) { - final QuoteSide side = l2EntryNewInfo.getSide(); - final long exchangeId = l2EntryNewInfo.getExchangeId(); - final short level = l2EntryNewInfo.getLevel(); - final L2Processorexchange = getOrCreateExchange(exchangeId); + public boolean processL2Snapshot(final PackageHeaderInfo msg) { + final ObjectListentries = msg.getEntries(); - // Duplicate - if (exchange.isEmpty()) { - switch (updateMode) { - case WAITING_FOR_SNAPSHOT: - return null; // Todo ADD null check!! - case NON_WAITING_FOR_SNAPSHOT: - break; - default: - throw new UnsupportedOperationException("Unsupported mode: " + updateMode); + // we assume that all entries in the message are from the same exchange + @Alphanumeric final long exchangeId = entries.get(0).getExchangeId(); + + final Option >> holder = getOrCreateExchange(exchangeId); + + if (!holder.hasValue()) { + return false; + } + + final L2Processor exchange = holder.get().getProcessor(); + if (exchange.isSnapshotAllowed(msg)) { + unmapQuote(exchange); + if (exchange.processL2Snapshot(msg)) { + mapQuote(exchange, QuoteSide.BID); + mapQuote(exchange, QuoteSide.ASK); + return true; } } + return false; + } - final L2MarketSidemarketSide = exchange.getMarketSide(side); + @Override + public Quote processL2EntryNew(final PackageHeaderInfo pck, final L2EntryNewInfo msg) { + assert pck.getPackageType() == PackageType.INCREMENTAL_UPDATE; + final QuoteSide side = msg.getSide(); + final int level = msg.getLevel(); + @Alphanumeric final long exchangeId = msg.getExchangeId(); - // TODO: 6/30/2022 need to refactor return value + final Option>> holder = getOrCreateExchange(exchangeId); // Duplicate - if (level >= marketSide.getMaxDepth()) { - switch (unreachableDepthMode) { - case SKIP_AND_DROP: - clear(); - return null; - case SKIP: - default: - return null; - } - // Unreachable quote level + if (!holder.hasValue() || holder.get().getProcessor().isWaitingForSnapshot()) { + return null; } - // TODO: 6/30/2022 need to refactor return value - // Duplicate - if (marketSide.isGap(level)) { - switch (gapMode) { - case FILL_GAP:// We fill gaps at the exchange level. - break; - case SKIP_AND_DROP: - clearExchange(exchange); - return null; - case SKIP: - return null; - default: - throw new UnsupportedOperationException("Unsupported mode: " + gapMode); + final L2Processor exchange = holder.get().getProcessor(); + + final L2MarketSidemarketSide = exchange.getMarketSide(side); + if (marketSide.isInvalidInsert(level, msg.getPrice(), msg.getSize(), exchangeId)) { + if (validationOptions.isQuoteInsert()) { + unmapQuote(exchange); + exchange.processL2EntryNew(pck, msg); } + return null; } - if (marketSide.hasLevel(level)) { //Remove worst quote + //Remove worst quote + //...maybe we should remove + if (marketSide.isFull()) { removeQuote(marketSide.getWorstQuote(), side); } - final Quote quote = exchange.processL2EntryNewInfo(l2EntryNewInfo); + // We process quote as new by single exchange and then insert it to the aggregated book + final Quote quote = exchange.processL2EntryNew(pck, msg); + if (quote == null) { + return null; + } final Quote insertQuote = insertQuote(quote, side); - return insertQuote; } @Override - public void processL2EntryUpdateInfo(final L2EntryUpdateInfo l2EntryUpdateInfo) { - final long exchangeId = l2EntryUpdateInfo.getExchangeId(); - final L2Processorexchange = getOrCreateExchange(exchangeId); - - if (exchange.isEmpty()) { - return; + public boolean processL2EntryUpdate(final PackageHeaderInfo pck, final L2EntryUpdateInfo msg) { + assert pck.getPackageType() == PackageType.INCREMENTAL_UPDATE; + final int level = msg.getLevel(); + final QuoteSide side = msg.getSide(); + @Alphanumeric final long exchangeId = msg.getExchangeId(); + final BookUpdateAction action = msg.getAction(); + + final Option>> exchange = getExchanges().getById(exchangeId); + + if (!exchange.hasValue() || exchange.get().getProcessor().isEmpty() || + exchange.get().getProcessor().isWaitingForSnapshot()) { + return false; } - final BookUpdateAction bookUpdateAction = l2EntryUpdateInfo.getAction(); - - final short level = l2EntryUpdateInfo.getLevel(); - final QuoteSide side = l2EntryUpdateInfo.getSide(); - - final L2MarketSide marketSide = exchange.getMarketSide(side); + final L2MarketSidemarketSide = exchange.get().getProcessor().getMarketSide(side); - // TODO check if overlay WHY -// if (depth < currentSize) { -// final T item = items[depth]; -// -// if ((item != null) && (item.getPrice() != event.getPrice())) { // check if overlay -// delete(depth); -// insert(depth, event); -// -// break; -// } else { -// update(depth, event); -// } -// } else { -// insert(depth, event); -// } - if (!marketSide.hasLevel(level)) { - return; // Stop processing if exchange don't know about quote level + if (marketSide.isInvalidUpdate(action, level, msg.getPrice(), msg.getSize(), exchangeId)) { + if (validationOptions.isQuoteUpdate()) { + unmapQuote(exchangeId); + exchange.get().getProcessor().processL2EntryUpdate(pck, msg); + } + return false; } + final BookUpdateAction bookUpdateAction = msg.getAction(); + if (bookUpdateAction == BookUpdateAction.DELETE) { - final Quote remove = marketSide.getQuote(level); - removeQuote(remove, side); + final Quote quote = marketSide.getQuote(level); + removeQuote(quote, side); } else if (bookUpdateAction == BookUpdateAction.UPDATE) { final Quote quote = marketSide.getQuote(level); - updateQuote(quote, side, l2EntryUpdateInfo); + updateQuote(quote, side, msg); } - exchange.processL2EntryUpdateInfo(l2EntryUpdateInfo); + return exchange.get().getProcessor().processL2EntryUpdate(pck, msg); } + protected abstract void updateQuote(final Quote previous, + final QuoteSide side, + final L2EntryUpdateInfo update); - @Override - public boolean isWaitingForSnapshot() { - return isWaitingForSnapshot; - } - - private void waitingForSnapshot() { - if (!isWaitingForSnapshot()) { - isWaitingForSnapshot = true; - } - } - - private void notWaitingForSnapshot() { - if (isWaitingForSnapshot()) { - isWaitingForSnapshot = false; - } - } - - abstract void updateQuote(final Quote previous, - final QuoteSide side, - final L2EntryUpdateInfo update); - - public void insertAll(final L2Processorexchange, final QuoteSide side) { + private void mapQuote(final L2Processorexchange, final QuoteSide side) { final L2MarketSidemarketSide = exchange.getMarketSide(side); for (int i = 0; i < marketSide.depth(); i++) { final Quote insert = marketSide.getQuote(i); @@ -237,13 +240,13 @@ public void insertAll(final L2Processorexchange, final QuoteSide side) { } } - public Quote insertQuote(final Quote insert, final QuoteSide side) { + private Quote insertQuote(final Quote insert, final QuoteSide side) { return insertQuote(insert, getMarketSide(side)); } - abstract Quote insertQuote(final Quote insert, final L2MarketSidemarketSide); + protected abstract Quote insertQuote(final Quote insert, final L2MarketSidemarketSide); - public void removeAll(final L2Processorexchange, final QuoteSide side) { + protected void removeAll(final L2Processorexchange, final QuoteSide side) { final MarketSidemarketSide = exchange.getMarketSide(side); for (int i = 0; i < marketSide.depth(); i++) { final Quote remove = marketSide.getQuote(i); @@ -251,35 +254,38 @@ public void removeAll(final L2Processorexchange, final QuoteSide side) { } } - public void removeQuote(final Quote remove, final QuoteSide side) { + private void removeQuote(final Quote remove, final QuoteSide side) { final L2MarketSidemarketSide = getMarketSide(side); removeQuote(remove, marketSide); } - abstract boolean removeQuote(final Quote remove, final L2MarketSidemarketSide); + protected abstract boolean removeQuote(Quote remove, L2MarketSidemarketSide); - abstract L2ProcessorclearExchange(final L2Processorexchange); - - public L2ProcessorclearExchange(final long exchangeId) { - final L2Processorexchange = getOrCreateExchange(exchangeId); - return clearExchange(exchange); + protected L2ProcessorunmapQuote(final long exchangeId) { + final L2Processorexchange = getOrCreateExchange(exchangeId).get().getProcessor(); + return unmapQuote(exchange); } + protected abstract L2ProcessorunmapQuote(L2Processorexchange); + /** * Get stock exchange holder by id(create new if it does not exist). * * @param exchangeId - id of exchange. * @return exchange book by id. */ - public L2ProcessorgetOrCreateExchange(final long exchangeId) { + private Option>> getOrCreateExchange(@Alphanumeric final long exchangeId) { + if (!AlphanumericUtils.isValidAlphanumeric(exchangeId) || TypeConstants.EXCHANGE_NULL == exchangeId) { + //TODO LOG warning + return Option.empty(); + } final MutableExchangeList >> exchanges = this.getExchanges(); - Option >> exchangeHolder = exchanges.getById(exchangeId); - if (!exchangeHolder.hasValue()) { - final L2SingleExchangeQuoteProcessor processor = - new L2SingleExchangeQuoteProcessor<>(exchangeId, initialDepth, maxDepth, pool, gapMode, updateMode, unreachableDepthMode); + Option>> holder = exchanges.getById(exchangeId); + if (!holder.hasValue()) { + final L2Processor processor = new L2SingleExchangeQuoteProcessor<>(options, pool, exchangeId); exchanges.add(new MutableExchangeImpl<>(exchangeId, processor)); - exchangeHolder = exchanges.getById(exchangeId); + holder = exchanges.getById(exchangeId); } - return exchangeHolder.get().getProcessor(); + return holder; } } diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/AbstractL3MarketSide.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/AbstractL3MarketSide.java new file mode 100644 index 0000000..c1ba577 --- /dev/null +++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/AbstractL3MarketSide.java @@ -0,0 +1,317 @@ +/* + * Copyright 2021 EPAM Systems, Inc + * + * See the NOTICE file distributed with this work for additional information + * regarding copyright ownership. Licensed under the Apache License, + * Version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.epam.deltix.orderbook.core.impl; + +import com.epam.deltix.dfp.Decimal; +import com.epam.deltix.dfp.Decimal64Utils; +import com.epam.deltix.orderbook.core.api.EntryValidationCode; +import com.epam.deltix.orderbook.core.impl.collections.rbt.RBTree; +import com.epam.deltix.timebase.messages.universal.InsertType; +import com.epam.deltix.timebase.messages.universal.QuoteSide; +import com.epam.deltix.util.collections.CharSeqToObjMap; + +import java.util.*; + +import static com.epam.deltix.dfp.Decimal64Utils.*; +import static com.epam.deltix.orderbook.core.api.EntryValidationCode.*; + +/** + * @author Andrii_Ostapenko1 + */ +abstract class AbstractL3MarketSideimplements L3MarketSide{ + + protected final RBTreedata; + private final CharSeqToObjMapquoteHashMap; + private final ReusableIterator itr; + // This parameter is used to limit maximum elements and to understand whether the side is full or not. + private final int maxDepth; + private long virtualClock; + + AbstractL3MarketSide(final int initialCapacity, final int maxDepth) { + this.maxDepth = maxDepth; + this.data = new RBTree<>(initialCapacity, new QuoteComparator()); + this.itr = new ReusableIterator<>(); + this.quoteHashMap = new CharSeqToObjMap<>(); + virtualClock = 0; + } + + @Override + public int getMaxDepth() { + return maxDepth; + } + + @Override + public int depth() { + return data.size(); + } + + @Override + public long getTotalQuantity() { + @Decimal long result = ZERO; + for (final Quote quote : this) { + result = Decimal64Utils.add(result, quote.getSize()); + } + return result; + } + + /** + * Clears the market side in linear time + */ + @Override + public void clear() { + data.clear(); + quoteHashMap.clear(); + } + + @Override + public boolean isEmpty() { + return data.isEmpty(); + } + + @Override + public Quote getQuote(final int level) { + throw new UnsupportedOperationException(); + } + + @Override + public Quote getQuote(final CharSequence quoteId) { + return quoteHashMap.get(quoteId, null); + } + + @Override + public boolean add(final Quote insert) { + if (quoteHashMap.put(insert.getQuoteId(), insert)) { + insert.setSequenceNumber(virtualClock++); + data.put(insert, insert); + return true; + } + return false; + } + + @Override + public Quote remove(final CharSequence quoteId) { + final Quote result = quoteHashMap.remove(quoteId, null); + if (result != null) { + data.remove(result); + } + return result; + } + + @Override + public Quote remove(final Quote quote) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isFull() { + return depth() == maxDepth; + } + + @Override + public Quote getBestQuote() { + if (isEmpty()) { + return null; + } + return data.firstKey(); + } + + @Override + public boolean hasLevel(final int level) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasQuote(final CharSequence quoteId) { + return quoteHashMap.containsKey(quoteId); + } + + @Override + public Quote getWorstQuote() { + if (isEmpty()) { + return null; + } + return data.lastKey(); + } + + /** + * @return error code, or null if everything is valid + */ + @Override + public EntryValidationCode isInvalidInsert(final InsertType type, + final CharSequence quoteId, + final @Decimal long price, + final @Decimal long size, + final QuoteSide side) { + if (type != InsertType.ADD_BACK) { + return UNSUPPORTED_INSERT_TYPE; + } + + if (side == null) { + return UNSPECIFIED_SIDE; + } + + if (quoteId == null || quoteId.length() == 0) { + return MISSING_QUOTE_ID; + } + + if (isNaN(price)) { + return MISSING_PRICE; + } + + if (isLessOrEqual(size, ZERO)) { + return BAD_SIZE; + } + + return null; // all good + } + + /** + * @return error code, or null if everything is valid + */ + @Override + public EntryValidationCode isInvalidUpdate(final Quote quote, + final CharSequence quoteId, + final @Decimal long price, + final @Decimal long size, + final QuoteSide side) { + if (side == null) { + return UNSPECIFIED_SIDE; + } + + if (quoteId == null || quoteId.length() == 0) { + return MISSING_QUOTE_ID; + } + + if (quote == null) { + return UNKNOWN_QUOTE_ID; + } + + if (isNotEqual(quote.getPrice(), price)) { + return MODIFY_CHANGE_PRICE; + } + + if (isLessOrEqual(size, ZERO)) { + return BAD_SIZE; + } + + if (Decimal64Utils.isLess(quote.getSize(), size)) { + return MODIFY_INCREASE_SIZE; + } + + return null; // all good + } + + @Override + public void buildFromSorted(final ArrayListquotes) { + data.buildFromSorted(quotes); + final int len = quotes.size(); + for (int i = 0; i < len; i++) { + final Quote quote = quotes.get(i); + quoteHashMap.put(quote.getQuoteId(), quote); + } + virtualClock = data.size(); + } + + @Override + public String toString() { + final StringBuilder builder = new StringBuilder(); + for (final Quote quote : this) { + builder.append(quote).append("\n"); + } + return builder.toString(); + } + + @Override + public Iteratoriterator(final int fromLevel, final int toLevel) { + if (fromLevel != 0) { + throw new UnsupportedOperationException(); + } + itr.iterateBy(data); + return itr; + } + + /** + * An adapter to safely externalize the value iterator. + */ + static final class ReusableIteratorimplements Iterator{ + + private Iterator> iterator; + + private void iterateBy(final RBTree tm) { + Objects.requireNonNull(tm); + iterator = tm.iterator(); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public Quote next() { + return iterator.next().getValue(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Read only iterator"); + } + } + + static class ASKSextends AbstractL3MarketSide{ + + ASKS(final int initialDepth, final int maxDepth) { + super(initialDepth, maxDepth); + } + + @Override + public QuoteSide getSide() { + return QuoteSide.ASK; + } + + } + + static class BIDSextends AbstractL3MarketSide{ + + BIDS(final int initialDepth, final int maxDepth) { + super(initialDepth, maxDepth); + } + + @Override + public QuoteSide getSide() { + return QuoteSide.BID; + } + + } + + class QuoteComparator implements Comparator{ + + @Override + public int compare(final Quote o1, final Quote o2) { + final int priceComp = Decimal64Utils.compareTo(o1.getPrice(), o2.getPrice()); + if (priceComp == 0) { + return Long.compare(o1.getSequenceNumber(), o2.getSequenceNumber()); + } + if (getSide() == QuoteSide.ASK) { + return priceComp; + } else { + return -priceComp; + } + } + } + +} diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/CompactAbstractL2MarketSide.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/CompactAbstractL2MarketSide.java new file mode 100644 index 0000000..1d77f51 --- /dev/null +++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/CompactAbstractL2MarketSide.java @@ -0,0 +1,313 @@ +/* + * Copyright 2021 EPAM Systems, Inc + * + * See the NOTICE file distributed with this work for additional information + * regarding copyright ownership. Licensed under the Apache License, + * Version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.epam.deltix.orderbook.core.impl; + +import com.epam.deltix.dfp.Decimal; +import com.epam.deltix.dfp.Decimal64Utils; +import com.epam.deltix.orderbook.core.api.MarketSide; +import com.epam.deltix.timebase.messages.universal.BookUpdateAction; +import com.epam.deltix.timebase.messages.universal.QuoteSide; +import com.epam.deltix.util.annotations.Alphanumeric; + +import java.util.Iterator; +import java.util.Objects; + +import static com.epam.deltix.dfp.Decimal64Utils.*; +import static com.epam.deltix.timebase.messages.TypeConstants.EXCHANGE_NULL; + +/** + * @author Andrii_Ostapenko1 + */ +abstract class CompactAbstractL2MarketSideimplements CompactL2MarketSide{ + + protected final long[] data; + private final Quote holder = (Quote) new MutableOrderBookQuoteImpl(); + private final ReusableIteratoritr; + // This parameter is used to limit maximum elements and to understand whether the side is full or not. + private final int maxDepth; + private int depth; + + CompactAbstractL2MarketSide(final int maxDepth) { + this.maxDepth = maxDepth; + this.data = new long[maxDepth << 1]; + depth = 0; + this.itr = new ReusableIterator<>(); + } + + @Override + public int getMaxDepth() { + return maxDepth; + } + + @Override + public int depth() { + return depth; + } + + @Override + public long getTotalQuantity() { + @Decimal long result = ZERO; + final int len = depth << 1; + for (int i = 1; i < len; i += 2) { + result = Decimal64Utils.add(result, data[i]); + } + return result; + } + + @Override + public void clear() { + depth = 0; + } + + @Override + public boolean isEmpty() { + return depth == 0; + } + + @Override + public Quote getQuote(final int level) { + if (!hasLevel(level)) { + return null; + } + final int idx = level << 1; + holder.setPrice(data[idx]); + holder.setSize(data[idx + 1]); + return holder; + } + + @Override + public void add(final int level, final long price, final long size) { + final int idx = level << 1; + if (level < depth) { + System.arraycopy(data, idx, data, idx + 2, (depth << 1) - idx); + } + data[idx] = price; + data[idx + 1] = size; + ++depth; + } + + public void set(final int level, final long price, final long size) { + if (level == depth) { + ++depth; + } + final int idx = level << 1; + data[idx] = price; + data[idx + 1] = size; + } + + @Override + public void remove(final int level) { + if (hasLevel(level)) { + final int idx = level << 1; + if (level < depth) { + System.arraycopy(data, idx + 2, data, idx, (depth << 1) - idx - 2); + } + --depth; + } + } + + @Override + public boolean hasLevel(final int level) { + return depth > level && level >= 0; + } + + @Override + public void removeWorstQuote() { + --depth; + } + + @Override + public boolean isFull() { + return depth == maxDepth; + } + + @Override + public boolean isGap(final int level) { + return !hasLevel(level) && level > depth; + } + + @Override + public boolean isUnreachableLeve(int level) { + return level < 0 || level >= maxDepth; + } + + @Override + public Quote getBestQuote() { + return getQuote(0); + } + + @Override + public Quote getWorstQuote() { + return getQuote(depth - 1); + } + + @Override + public boolean isInvalidInsert(final int level, + final @Decimal long price, + final @Decimal long size, + final @Alphanumeric long exchangeId) { + if (level < 0 || isEqual(price, NULL) || isLessOrEqual(size, ZERO) || exchangeId == EXCHANGE_NULL) { + return true; + } + if (isUnreachableLeve(level)) { + return true; + } + if (isGap(level)) { + return true; + } + return !checkOrderPrice(level, price); + } + + @Override + public boolean isInvalidUpdate(final BookUpdateAction action, + final int level, + final @Decimal long price, + final @Decimal long size, + final @Alphanumeric long exchangeId) { + if (!hasLevel(level)) { + return true; + } + if (action != BookUpdateAction.DELETE) { + return isNotEqual(data[level << 1], price) || isLess(size, ZERO); + } + return false; + } + + @Override + public boolean checkOrderPrice(final int level, final @Decimal long price) { + + @Decimal final long previousPrice = hasLevel(level - 1) ? data[(level - 1) << 1] : NULL; + @Decimal final long nextPrice = hasLevel(level) ? data[level << 1] : NULL; + + boolean badState = false; + if (getSide() == QuoteSide.ASK) { + if (isNotEqual(previousPrice, NULL) && isGreater(previousPrice, price)) { + badState = true; + } + if (isNotEqual(nextPrice, NULL) && isLess(nextPrice, price)) { + badState = true; + } + } else { + if (isNotEqual(previousPrice, NULL) && isLess(previousPrice, price)) { + badState = true; + } + if (isNotEqual(nextPrice, NULL) && isGreater(nextPrice, price)) { + badState = true; + } + } + return !badState; + } + + @Override + public boolean validateState() { + if (isEmpty()) { + return true; + } + final int len = depth << 1; + for (int i = 0; i < len; i += 2) { + if (isInvalidInsert(i >> 1, data[i], data[i + 1], EXCHANGE_NULL ^ 1)) { + return false; + } + } + return true; + } + + @Override + public String toString() { + final StringBuilder builder = new StringBuilder(); + for (final Quote quote : this) { + builder.append(quote).append(" "); + } + return builder.toString(); + } + + @Override + public Iteratoriterator(final int fromLevel, final int toLevel) { + itr.iterateBy(this, fromLevel, toLevel); + return itr; + } + + /** + * An adapter to safely externalize the value iterator. + */ + static final class ReusableIteratorimplements Iterator{ + + /** + * Index of element to be returned by subsequent call to next. + */ + private int cursor; + + private int end; + + private MarketSidemarketSide; + + private void iterateBy(final MarketSidemarketSide, final int cursor, final int end) { + Objects.requireNonNull(marketSide); + this.marketSide = marketSide; + this.cursor = cursor; + if (end > marketSide.depth() || end < 0) { + this.end = marketSide.depth(); + } else { + this.end = end; + } + if (cursor > end || cursor < 0) { + this.cursor = end; + } + } + + @Override + public boolean hasNext() { + return cursor != end; + } + + @Override + public Quote next() { + return marketSide.getQuote(cursor++); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Read only iterator"); + } + } + + static class ASKextends CompactAbstractL2MarketSide{ + + ASK(final int maxDepth) { + super(maxDepth); + } + + @Override + public QuoteSide getSide() { + return QuoteSide.ASK; + } + + } + + static class BIDextends CompactAbstractL2MarketSide{ + + BID(final int maxDepth) { + super(maxDepth); + } + + @Override + public QuoteSide getSide() { + return QuoteSide.BID; + } + + } +} diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/CompactL2MarketSide.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/CompactL2MarketSide.java new file mode 100644 index 0000000..0d42de8 --- /dev/null +++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/CompactL2MarketSide.java @@ -0,0 +1,160 @@ +/* + * Copyright 2021 EPAM Systems, Inc + * + * See the NOTICE file distributed with this work for additional information + * regarding copyright ownership. Licensed under the Apache License, + * Version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.epam.deltix.orderbook.core.impl; + +import com.epam.deltix.dfp.Decimal; +import com.epam.deltix.orderbook.core.api.MarketSide; +import com.epam.deltix.timebase.messages.universal.BookUpdateAction; +import com.epam.deltix.timebase.messages.universal.QuoteSide; +import com.epam.deltix.util.annotations.Alphanumeric; + +import java.util.Objects; + +/** + * @author Andrii_Ostapenko1 + */ +interface CompactL2MarketSideextends MarketSide{ + + staticCompactL2MarketSidefactory(final int maxDepth, + final QuoteSide side) { + Objects.requireNonNull(side); + switch (side) { + case BID: + return new CompactAbstractL2MarketSide.BID<>(maxDepth); + case ASK: + return new CompactAbstractL2MarketSide.ASK<>(maxDepth); + default: + throw new IllegalStateException("Unexpected value: " + side); + } + } + + @Override + default Quote getQuote(final CharSequence quoteId) { + // Not supported for L2 + return null; + } + + @Override + default boolean hasQuote(final CharSequence quoteId) { + // Not supported for L2 + return false; + } + + /** + * Inserts the specified quote at the specified level and shifts the quotes right. + * + * @param level - the level at which quote needs to be inserted. + * @param price - the price of the quote. + * @param size - the size of the quote. + */ + void add(int level, long price, long size); + + /** + * Returns the maximum depth of this market side. + * + * @return the maximum depth. + */ + int getMaxDepth(); + + /** + * Removes the worst quote from this market side. + */ + void removeWorstQuote(); + + /** + * Remove quote by level. + * Shifts any subsequent elements to the left. + * + * @param level - the level of the quote to be removed + */ + void remove(int level); + + /** + * Returns true if this market side if full. + * + * @return true if this market side if full. + */ + boolean isFull(); + + /** + * Verifies ability insert or update quote by market side. + * + * @param level - quote level to use + * @return true if this quote level is unexpected + */ + boolean isGap(int level); + + + boolean isUnreachableLeve(int level); + + /** + * Checks if the specified price is sorted. + * + * @param level - quote level to use + * @param price - price to be checked + * @return true if this price is sorted. + */ + boolean checkOrderPrice(int level, @Decimal long price); + + void clear(); + + /** + * Validates the state of this market side. + * + * @return true if the market side state is valid, false otherwise. + */ + boolean validateState(); + + /** + * Sets a quote at given level with provided price and size. + * + * @param level - level at which the quote should be set. + * @param price - price of the quote. + * @param size - size of the quote. + */ + void set(int level, long price, long size); + + /** + * Checks if inserting a quote at given level with provided price, size and exchangeId would be invalid. + * + * @param level - level to check + * @param price - price to check + * @param size - size to check + * @param exchangeId - exchangeId to check + * @return true if the insert operation would be invalid, false otherwise. + */ + boolean isInvalidInsert(int level, + @Decimal long price, + @Decimal long size, + @Alphanumeric long exchangeId); + + /** + * Checks if updating a quote with given action, level, price, size and exchangeId would be invalid. + * + * @param action - action to check + * @param level - level to check + * @param price - price to check + * @param size - size to check + * @param exchangeId - exchangeId to check + * @return true if the update operation would be invalid, false otherwise. + */ + boolean isInvalidUpdate(BookUpdateAction action, + int level, + @Decimal long price, + @Decimal long size, + @Alphanumeric long exchangeId); +} diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/CompactL2Processor.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/CompactL2Processor.java new file mode 100644 index 0000000..34ed9b2 --- /dev/null +++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/CompactL2Processor.java @@ -0,0 +1,96 @@ +/* + * Copyright 2021 EPAM Systems, Inc + * + * See the NOTICE file distributed with this work for additional information + * regarding copyright ownership. Licensed under the Apache License, + * Version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.epam.deltix.orderbook.core.impl; + + +import com.epam.deltix.timebase.messages.universal.*; +import com.epam.deltix.util.collections.generated.ObjectList; + +/** + * Processor for L2 universal market data format that included in package (Package Header). + * + * @author Andrii_Ostapenko + */ +interface CompactL2Processorextends QuoteProcessor{ + + @Override + default DataModelType getQuoteLevels() { + return DataModelType.LEVEL_TWO; + } + + @Override + CompactL2MarketSidegetMarketSide(QuoteSide side); + + @Override + MutableExchangeList>> getExchanges(); + + /** + * This type reports incremental Level2-new: insert one line in Order Book either on ask or bid side. + * + * @param pck + * @param msg - Level2-new entry + * @return insert quote + */ + Quote processL2EntryNew(PackageHeaderInfo pck, L2EntryNewInfo msg); + + /** + * This type reports incremental Level2-update: update or delete one line in Order Book either on ask or bid side. + * + * @param pck + * @param msg - Level2-update + * @return true if quote was updated, false if quote was not found + */ + boolean processL2EntryUpdate(PackageHeaderInfo pck, L2EntryUpdateInfo msg); + + boolean processL2Snapshot(PackageHeaderInfo msg); + + boolean isWaitingForSnapshot(); + + boolean isSnapshotAllowed(PackageHeaderInfo msg); + + default boolean processIncrementalUpdate(PackageHeaderInfo pck, final BaseEntryInfo entryInfo) { + if (entryInfo instanceof L2EntryNew) { + final L2EntryNew entry = (L2EntryNew) entryInfo; + return processL2EntryNew(pck, entry) != null; + } else if (entryInfo instanceof L2EntryUpdate) { + final L2EntryUpdate entry = (L2EntryUpdate) entryInfo; + return processL2EntryUpdate(pck, entry); + } else if (entryInfo instanceof StatisticsEntry) { + return true; + } + return false; + } + + default boolean processSnapshot(final PackageHeaderInfo msg) { + final ObjectList entries = msg.getEntries(); + final int n = entries.size(); + // skip statistic entries try to establish if we are dealing with order book reset or normal snapshot + for (int i = 0; i < n; i++) { + final BaseEntryInfo entry = entries.get(i); + if (entry instanceof L2EntryNewInfo) { + return processL2Snapshot(msg); + } else if (entry instanceof BookResetEntryInfo) { + final BookResetEntryInfo resetEntry = (BookResetEntryInfo) entry; + if (resetEntry.getModelType() == getQuoteLevels()) { + return processBookResetEntry(msg, resetEntry); + } + } + } + return false; + } + +} diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/CompactL2SingleExchangeQuoteProcessor.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/CompactL2SingleExchangeQuoteProcessor.java new file mode 100644 index 0000000..ea11044 --- /dev/null +++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/CompactL2SingleExchangeQuoteProcessor.java @@ -0,0 +1,283 @@ +/* + * Copyright 2021 EPAM Systems, Inc + * + * See the NOTICE file distributed with this work for additional information + * regarding copyright ownership. Licensed under the Apache License, + * Version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.epam.deltix.orderbook.core.impl; + +import com.epam.deltix.containers.AlphanumericUtils; +import com.epam.deltix.orderbook.core.options.*; +import com.epam.deltix.timebase.messages.TypeConstants; +import com.epam.deltix.timebase.messages.service.FeedStatus; +import com.epam.deltix.timebase.messages.service.SecurityFeedStatusMessage; +import com.epam.deltix.timebase.messages.universal.*; +import com.epam.deltix.util.annotations.Alphanumeric; +import com.epam.deltix.util.collections.generated.ObjectList; + +import static com.epam.deltix.timebase.messages.universal.QuoteSide.ASK; +import static com.epam.deltix.timebase.messages.universal.QuoteSide.BID; + + +/** + * @author Andrii_Ostapenko1 + */ +public class CompactL2SingleExchangeQuoteProcessor implements CompactL2Processor{ + + protected final CompactL2MarketSidebids; + protected final CompactL2MarketSideasks; + private final MutableExchangeList>> exchanges; + + private final EventHandler eventHandler; + + //Parameters + private final ValidationOptions validationOptions; + private final DisconnectMode disconnectMode; + + public CompactL2SingleExchangeQuoteProcessor(final OrderBookOptions options) { + this.disconnectMode = options.getDisconnectMode().orElse(Defaults.DISCONNECT_MODE); + this.validationOptions = options.getInvalidQuoteMode().orElse(Defaults.VALIDATION_OPTIONS); + this.eventHandler = new EventHandlerImpl(options); + + this.exchanges = new MutableExchangeListImpl<>(); + + final int maxDepth = options.getMaxDepth().orElse(Defaults.MAX_DEPTH); + this.asks = CompactL2MarketSide.factory(maxDepth, ASK); + this.bids = CompactL2MarketSide.factory(maxDepth, BID); + } + + @Override + public String getDescription() { + return "Compact L2/Single exchange"; + } + + @Override + public Quote processL2EntryNew(final PackageHeaderInfo pck, final L2EntryNewInfo msg) { + final long exchangeId = msg.getExchangeId(); + final Option >> exchange = getOrCreateExchange(exchangeId); + if (!exchange.hasValue()) { + return null; + } + + if (exchange.get().getProcessor().isWaitingForSnapshot()) { + return null; + } + + final QuoteSide side = msg.getSide(); + final CompactL2MarketSide marketSide = exchange.get().getProcessor().getMarketSide(side); + final int level = msg.getLevel(); + + if (marketSide.isInvalidInsert(level, msg.getPrice(), msg.getSize(), exchangeId)) { + if (validationOptions.isQuoteInsert()) { + clear(); + eventHandler.onBroken(); + } + return null; + } + + if (marketSide.isFull()) { + marketSide.removeWorstQuote(); + } + marketSide.add(level, msg.getPrice(), msg.getSize()); + return marketSide.getQuote(level); + } + + @Override + public boolean processL2EntryUpdate(final PackageHeaderInfo pck, final L2EntryUpdateInfo msg) { + final QuoteSide side = msg.getSide(); + final int level = msg.getLevel(); + @Alphanumeric final long exchangeId = msg.getExchangeId(); + final BookUpdateAction action = msg.getAction(); + + final Option>> exchange = getExchanges().getById(exchangeId); + if (!exchange.hasValue()) { + return false; + } + + if (exchange.get().getProcessor().isWaitingForSnapshot()) { + return false; + } + + final CompactL2MarketSide marketSide = exchange.get().getProcessor().getMarketSide(side); + if (marketSide.isInvalidUpdate(action, level, msg.getPrice(), msg.getSize(), exchangeId)) { + if (validationOptions.isQuoteUpdate()) { + clear(); + eventHandler.onBroken(); + return false; + } + return true; // skip invalid update + } + + if (action == BookUpdateAction.DELETE) { + marketSide.remove(level); + } else if (action == BookUpdateAction.UPDATE) { + marketSide.set(level, msg.getPrice(), msg.getSize()); + } + return true; + } + + @Override + public boolean processL2Snapshot(final PackageHeaderInfo pck) { + if (!isSnapshotAllowed(pck)) { + return false; + } + + final ObjectListentries = pck.getEntries(); + + clear(); + int askCnt = 0; + int bidCnt = 0; + for (int i = 0; i < entries.size(); i++) { + final BaseEntryInfo e = entries.get(i); + if (e instanceof L2EntryNewInterface) { + final L2EntryNewInterface entry = (L2EntryNewInterface) e; + + final int level = entry.getLevel(); + final QuoteSide side = entry.getSide(); + @Alphanumeric final long exchangeId = entry.getExchangeId(); + + // We expect that exchangeId is valid and all entries have the same exchangeId + final Option >> exchange = getOrCreateExchange(exchangeId); + if (!exchange.hasValue()) { + clear(); + eventHandler.onBroken(); + return false; + } + + final CompactL2MarketSide marketSide = exchange.get().getProcessor().getMarketSide(side); + + // Both side have the same max depth + final int maxDepth = marketSide.getMaxDepth(); + if ((side == ASK && askCnt == maxDepth) || (side == BID && bidCnt == maxDepth)) { + continue; + } + marketSide.add(level, entry.getPrice(), entry.getSize()); + + if (side == ASK) { + askCnt++; + } else { + bidCnt++; + } + + if (askCnt == maxDepth && bidCnt == maxDepth) { + break; + } + } + } + + //Validate state after snapshot + //We believe that snapshot is valid, but... + if (!asks.validateState() || !bids.validateState()) { + clear(); + eventHandler.onBroken(); + return false; + } + + eventHandler.onSnapshot(); + return true; + } + + @Override + public boolean processBookResetEntry(final PackageHeaderInfo pck, final BookResetEntryInfo msg) { + @Alphanumeric final long exchangeId = msg.getExchangeId(); + final Option>> exchange = getOrCreateExchange(exchangeId); + + if (exchange.hasValue()) { + clear(); + eventHandler.onReset(); + return true; + } else { + return false; + } + } + + @Override + public boolean processSecurityFeedStatus(final SecurityFeedStatusMessage msg) { + if (msg.getStatus() == FeedStatus.NOT_AVAILABLE) { + if (disconnectMode == DisconnectMode.CLEAR_EXCHANGE) { + @Alphanumeric final long exchangeId = msg.getExchangeId(); + final Option >> exchange = getOrCreateExchange(exchangeId); + if (exchange.hasValue()) { + clear(); + eventHandler.onDisconnect(); + return true; + } + } + } + return false; + } + + @Override + public MutableExchangeList >> getExchanges() { + return exchanges; + } + + @Override + public CompactL2MarketSide getMarketSide(final QuoteSide side) { + return side == BID ? bids : asks; + } + + @Override + public void clear() { + releaseAndClean(asks); + releaseAndClean(bids); + } + + @Override + public boolean isEmpty() { + return asks.isEmpty() && bids.isEmpty(); + } + + /** + * Check if snapshot is available for processing. + * + * @param msg - snapshot message + * @return true if snapshot is available for processing + */ + @Override + public boolean isSnapshotAllowed(final PackageHeaderInfo msg) { + final PackageType type = msg.getPackageType(); + return eventHandler.isSnapshotAllowed(type); + } + + @Override + public boolean isWaitingForSnapshot() { + return eventHandler.isWaitingForSnapshot(); + } + + private void releaseAndClean(final CompactL2MarketSideside) { + side.clear(); + } + + /** + * Get stock exchange holder by id(create new if it does not exist). + * You can create only one exchange. + * + * @param exchangeId - id of exchange. + * @return exchange book by id. + */ + private Option>> getOrCreateExchange(@Alphanumeric final long exchangeId) { + if (!AlphanumericUtils.isValidAlphanumeric(exchangeId) || TypeConstants.EXCHANGE_NULL == exchangeId) { + return Option.empty(); + } + + if (!exchanges.isEmpty()) { + return exchanges.getById(exchangeId); + } + final MutableExchange > exchange = new MutableExchangeImpl<>(exchangeId, this); + exchanges.add(exchange); + return exchanges.getById(exchangeId); + } + +} + diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/EventHandler.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/EventHandler.java new file mode 100644 index 0000000..b0efb60 --- /dev/null +++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/EventHandler.java @@ -0,0 +1,43 @@ +package com.epam.deltix.orderbook.core.impl; + + +import com.epam.deltix.timebase.messages.universal.PackageType; + +/** + * Callback interface to be implemented for processing events as they become available in the {@link deltix.orderbook.core.api.OrderBook} + */ +interface EventHandler { + /** + * Check if snapshot type is available for processing. + * + * @param type snapshot type + * @return true if snapshot is available for processing + */ + boolean isSnapshotAllowed(PackageType type); + + /** + * @return true if we can't process incremental update messages but waiting for next snapshot message + */ + boolean isWaitingForSnapshot(); + + /** + * Call this method when book lost connection with feed. + */ + void onDisconnect(); + + /** + * Call this method when book received reset entry. + */ + void onReset(); + + /** + * Call this method when book received snapshot. + */ + void onSnapshot(); + + + /** + * Call this method when book received invalid date. + */ + void onBroken(); +} diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/EventHandlerImpl.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/EventHandlerImpl.java new file mode 100644 index 0000000..f17fdd2 --- /dev/null +++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/EventHandlerImpl.java @@ -0,0 +1,79 @@ +package com.epam.deltix.orderbook.core.impl; + + +import com.epam.deltix.orderbook.core.options.*; +import com.epam.deltix.timebase.messages.universal.PackageType; + +import static com.epam.deltix.timebase.messages.universal.PackageType.PERIODICAL_SNAPSHOT; +import static com.epam.deltix.timebase.messages.universal.PackageType.VENDOR_SNAPSHOT; + +class EventHandlerImpl implements EventHandler { + + /** + * This parameter using for handle periodical snapshot entry and depends on {@link PeriodicalSnapshotMode}. + */ + private boolean isPeriodicalSnapshotAllowed; + + /** + * This parameter using for handle incremental update entry and depends on {@link UpdateMode} and {@link ResetMode}. + */ + private boolean isWaitingForSnapshot; + + //Parameters + private final UpdateMode updateMode; + private final ResetMode resetMode; + private final PeriodicalSnapshotMode periodicalSnapshotMode; + private final DisconnectMode disconnectMode; + + EventHandlerImpl(final OrderBookOptions options) { + this.disconnectMode = options.getDisconnectMode().orElse(Defaults.DISCONNECT_MODE); + this.updateMode = options.getUpdateMode().orElse(Defaults.UPDATE_MODE); + this.resetMode = options.getResetMode().orElse(Defaults.RESET_MODE); + this.periodicalSnapshotMode = options.getPeriodicalSnapshotMode().orElse(Defaults.PERIODICAL_SNAPSHOT_MODE); + + this.isPeriodicalSnapshotAllowed = periodicalSnapshotMode != PeriodicalSnapshotMode.SKIP_ALL; + this.isWaitingForSnapshot = (updateMode == UpdateMode.WAITING_FOR_SNAPSHOT); + } + + @Override + public boolean isSnapshotAllowed(final PackageType type) { + if (type == null) { + return false; + } + + if (type == VENDOR_SNAPSHOT) { + return true; + } + + return isPeriodicalSnapshotAllowed && type == PERIODICAL_SNAPSHOT; + } + + @Override + public boolean isWaitingForSnapshot() { + return isWaitingForSnapshot; + } + + @Override + public void onDisconnect() { + isWaitingForSnapshot = (updateMode == UpdateMode.WAITING_FOR_SNAPSHOT); //TODO: review: switch to (disconnectMode == CLEAR_EXCHANGE) ? + isPeriodicalSnapshotAllowed = periodicalSnapshotMode != PeriodicalSnapshotMode.SKIP_ALL; + } + + @Override + public void onReset() { + isWaitingForSnapshot = (resetMode == ResetMode.WAITING_FOR_SNAPSHOT); + isPeriodicalSnapshotAllowed = periodicalSnapshotMode != PeriodicalSnapshotMode.SKIP_ALL; + } + + @Override + public void onSnapshot() { + isWaitingForSnapshot = false; + isPeriodicalSnapshotAllowed = periodicalSnapshotMode == PeriodicalSnapshotMode.PROCESS_ALL; + } + + @Override + public void onBroken() { + isWaitingForSnapshot = true; + isPeriodicalSnapshotAllowed = periodicalSnapshotMode != PeriodicalSnapshotMode.SKIP_ALL; + } +} diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L1MarketSide.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L1MarketSide.java index 7ad3a2a..61e2a07 100644 --- a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L1MarketSide.java +++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L1MarketSide.java @@ -16,6 +16,7 @@ */ package com.epam.deltix.orderbook.core.impl; + import com.epam.deltix.orderbook.core.api.MarketSide; import com.epam.deltix.timebase.messages.universal.QuoteSide; @@ -38,6 +39,18 @@ staticL1MarketSidefactory(final Q } } + @Override + default Quote getQuote(final CharSequence quoteId) { + // Not supported for L1 + return null; + } + + @Override + default boolean hasQuote(final CharSequence quoteId) { + // Not supported for L1 + return false; + } + void insert(Quote insert); void clear(); diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L1OrderBookFactory.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L1OrderBookFactory.java index c32bd44..822db86 100644 --- a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L1OrderBookFactory.java +++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L1OrderBookFactory.java @@ -16,10 +16,10 @@ */ package com.epam.deltix.orderbook.core.impl; + import com.epam.deltix.orderbook.core.api.OrderBook; import com.epam.deltix.orderbook.core.api.OrderBookQuote; -import com.epam.deltix.orderbook.core.options.Option; -import com.epam.deltix.orderbook.core.options.UpdateMode; +import com.epam.deltix.orderbook.core.options.OrderBookOptions; /** * A factory that implements order book for Level1. @@ -32,19 +32,27 @@ @SuppressWarnings("unchecked") public class L1OrderBookFactory { + /** + * Prevents instantiation + */ + protected L1OrderBookFactory() { + } + /** * Creates OrderBook for single exchange market feed. * - * @param- type of quote - * @param symbol - type of symbol - * @param updateMode - modes of order book work. Waiting first snapshot don't apply incremental updates before it or no. + * @param- type of quote. + * @param options - to use. * @return order book */ - public staticOrderBooknewSingleExchangeBook(final Optionsymbol, final UpdateMode updateMode) { + public static OrderBooknewSingleExchangeBook(final OrderBookOptions options) { final int initialSize = 2; - final ObjectPoolpool = new ObjectPool<>(initialSize, MutableOrderBookQuoteImpl::new); - final QuoteProcessor processor = new L1SingleExchangeQuoteProcessor<>(pool, updateMode); - return (OrderBook ) new OrderBookDecorator<>(symbol, processor); + + final ObjectPool extends MutableOrderBookQuote> pool = (ObjectPool extends MutableOrderBookQuote>) + options.getSharedObjectPool().orElse(QuotePoolFactory.create(options, initialSize)); + + final QuoteProcessor extends MutableOrderBookQuote> processor = new L1SingleExchangeQuoteProcessor<>(options, pool); + return (OrderBook) new OrderBookDecorator<>(options.getSymbol(), processor); } } diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L1Processor.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L1Processor.java index 1f1c2e1..4659030 100644 --- a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L1Processor.java +++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L1Processor.java @@ -16,6 +16,7 @@ */ package com.epam.deltix.orderbook.core.impl; + import com.epam.deltix.timebase.messages.universal.*; import com.epam.deltix.util.collections.generated.ObjectList; @@ -24,7 +25,7 @@ * * @author Andrii_Ostapenko */ -interface L1Processorextends QuoteProcessor, ResetEntryProcessor { +interface L1Processorextends QuoteProcessor{ @Override default DataModelType getQuoteLevels() { @@ -32,22 +33,26 @@ default DataModelType getQuoteLevels() { } @Override - L1MarketSidegetMarketSide(final QuoteSide side); + L1MarketSidegetMarketSide(QuoteSide side); /** * This type reports Level1-new: insert or update one line in Order Book either on ask or bid side. * + * @param pck - package header container * @param l1EntryNewInfo - Level1-new entry to use * @return insert quote */ - Quote processL1EntryNewInfo(final L1EntryInfo l1EntryNewInfo); + Quote processL1EntryNew(PackageHeaderInfo pck, L1EntryInfo l1EntryNewInfo); - void processL1VendorSnapshot(final PackageHeaderInfo marketMessageInfo); + boolean processL1Snapshot(PackageHeaderInfo marketMessageInfo); - default boolean process(final BaseEntryInfo pck) { - if (pck instanceof L1EntryInfo) { - final L1EntryInfo l1EntryNewInfo = (L1EntryInfo) pck; - processL1EntryNewInfo(l1EntryNewInfo); + default boolean processIncrementalUpdate(final PackageHeaderInfo pck, final BaseEntryInfo entryInfo) { + if (entryInfo instanceof L1EntryInfo) { + final L1EntryInfo l1EntryNewInfo = (L1EntryInfo) entryInfo; + //TODO need processing return value + processL1EntryNew(pck, l1EntryNewInfo); + return true; + } else if (entryInfo instanceof StatisticsEntry) { return true; } return false; @@ -55,15 +60,18 @@ default boolean process(final BaseEntryInfo pck) { default boolean processSnapshot(final PackageHeaderInfo marketMessageInfo) { final ObjectListentries = marketMessageInfo.getEntries(); - final BaseEntryInfo baseEntryInfo = entries.get(0); - if (baseEntryInfo instanceof L1EntryInfo) { - processL1VendorSnapshot(marketMessageInfo); - return true; - } else if (baseEntryInfo instanceof BookResetEntryInfo) { - final BookResetEntryInfo resetEntryInfo = (BookResetEntryInfo) baseEntryInfo; - if (resetEntryInfo.getModelType() == getQuoteLevels()) { - processBookResetEntry(resetEntryInfo); - return true; + final int n = entries.size(); + // skip statistic entries try to establish if we are dealing with order book reset or normal snapshot + for (int i = 0; i < n; i++) { + final BaseEntryInfo baseEntryInfo = entries.get(i); + if (baseEntryInfo instanceof L1EntryInfo) { + return processL1Snapshot(marketMessageInfo); + } else if (baseEntryInfo instanceof BookResetEntryInfo) { + final BookResetEntryInfo resetEntryInfo = (BookResetEntryInfo) baseEntryInfo; + if (resetEntryInfo.getModelType() == getQuoteLevels()) { + processBookResetEntry(marketMessageInfo, resetEntryInfo); + return true; + } } } return false; diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L1SingleExchangeQuoteProcessor.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L1SingleExchangeQuoteProcessor.java index 7a10f58..994c513 100644 --- a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L1SingleExchangeQuoteProcessor.java +++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L1SingleExchangeQuoteProcessor.java @@ -16,10 +16,16 @@ */ package com.epam.deltix.orderbook.core.impl; + import com.epam.deltix.orderbook.core.api.ExchangeList; +import com.epam.deltix.orderbook.core.options.Defaults; +import com.epam.deltix.orderbook.core.options.DisconnectMode; import com.epam.deltix.orderbook.core.options.Option; -import com.epam.deltix.orderbook.core.options.UpdateMode; +import com.epam.deltix.orderbook.core.options.OrderBookOptions; +import com.epam.deltix.timebase.messages.service.FeedStatus; +import com.epam.deltix.timebase.messages.service.SecurityFeedStatusMessage; import com.epam.deltix.timebase.messages.universal.*; +import com.epam.deltix.util.annotations.Alphanumeric; import com.epam.deltix.util.collections.generated.ObjectList; /** @@ -27,26 +33,23 @@ */ class L1SingleExchangeQuoteProcessor implements L1Processor{ + private final ObjectPoolpool; + protected final L1MarketSidebids; protected final L1MarketSideasks; - private final MutableExchangeList>> exchanges; - // Parameters - private final UpdateMode updateMode; - private final ObjectPool pool; + private final EventHandler eventHandler; - /** - * This parameter using for handle book reset entry. - * - * @see QuoteProcessor#isWaitingForSnapshot() - */ - private boolean isWaitingForSnapshot = false; + // Parameters + private final DisconnectMode disconnectMode; - L1SingleExchangeQuoteProcessor(final ObjectPoolpool, - final UpdateMode updateMode) { + L1SingleExchangeQuoteProcessor(final OrderBookOptions options, + final ObjectPoolpool) { this.pool = pool; - this.updateMode = updateMode; + this.disconnectMode = options.getDisconnectMode().orElse(Defaults.DISCONNECT_MODE); + this.eventHandler = new EventHandlerImpl(options); + this.asks = L1MarketSide.factory(QuoteSide.ASK); this.bids = L1MarketSide.factory(QuoteSide.BID); this.exchanges = new MutableExchangeListImpl<>(); @@ -74,8 +77,8 @@ public boolean isEmpty() { } @Override - public Quote processL1EntryNewInfo(final L1EntryInfo l1EntryNewInfo) { - final long exchangeId = l1EntryNewInfo.getExchangeId(); + public Quote processL1EntryNew(final PackageHeaderInfo pck, final L1EntryInfo msg) { + @Alphanumeric final long exchangeId = msg.getExchangeId(); final Option>> exchange = getOrCreateExchange(exchangeId); if (!exchange.hasValue()) { // TODO add null check @@ -86,16 +89,7 @@ public Quote processL1EntryNewInfo(final L1EntryInfo l1EntryNewInfo) { return null; } - if (exchange.get().getProcessor().isEmpty()) { - switch (updateMode) { - case WAITING_FOR_SNAPSHOT: - return null; // Todo ADD null check!! - case NON_WAITING_FOR_SNAPSHOT: - break; - } - } - - final QuoteSide side = l1EntryNewInfo.getSide(); + final QuoteSide side = msg.getSide(); final L1MarketSide marketSide = exchange.get().getProcessor().getMarketSide(side); final Quote quote; @@ -105,24 +99,29 @@ public Quote processL1EntryNewInfo(final L1EntryInfo l1EntryNewInfo) { } else { quote = marketSide.getBestQuote(); } - updateByL1EntryNew(quote, l1EntryNewInfo); + quote.copyFrom(pck, msg); return quote; } @Override // TODO add validation for exchange id - public void processL1VendorSnapshot(final PackageHeaderInfo marketMessageInfo) { - final ObjectListentries = marketMessageInfo.getEntries(); + public boolean processL1Snapshot(final PackageHeaderInfo pck) { + if (!eventHandler.isSnapshotAllowed(pck.getPackageType())) { + return false; + } + + // We expect that all entries are sorted by exchange id + final ObjectList entries = pck.getEntries(); for (int i = 0; i < entries.size(); i++) { - final BaseEntryInfo pck = entries.get(i); - final L1EntryInfo l1EntryInfo = (L1EntryInfo) pck; - final QuoteSide side = l1EntryInfo.getSide(); - final long exchangeId = l1EntryInfo.getExchangeId(); + final BaseEntryInfo entryInfo = entries.get(i); + final L1EntryInfo entry = (L1EntryInfo) entryInfo; + final QuoteSide side = entry.getSide(); + @Alphanumeric final long exchangeId = entry.getExchangeId(); final Option >> exchange = getOrCreateExchange(exchangeId); if (!exchange.hasValue()) { // TODO Log error and throw exception or add package validation - continue; + return false; } final L1MarketSide marketSide = exchange.get().getProcessor().getMarketSide(side); @@ -134,33 +133,46 @@ public void processL1VendorSnapshot(final PackageHeaderInfo marketMessageInfo) { } else { quote = marketSide.getBestQuote(); } - updateByL1EntryNew(quote, l1EntryInfo); + quote.copyFrom(pck, entry); } - notWaitingForSnapshot(); + eventHandler.onSnapshot(); + return true; } @Override public boolean isWaitingForSnapshot() { - return isWaitingForSnapshot; + return eventHandler.isWaitingForSnapshot(); } - private void waitingForSnapshot() { - if (!isWaitingForSnapshot()) { - isWaitingForSnapshot = true; - } - } + @Override + public boolean processBookResetEntry(final PackageHeaderInfo pck, final BookResetEntryInfo msg) { + @Alphanumeric final long exchangeId = msg.getExchangeId(); + final Option>> exchange = getOrCreateExchange(exchangeId); - private void notWaitingForSnapshot() { - if (isWaitingForSnapshot()) { - isWaitingForSnapshot = false; + if (exchange.hasValue()) { + clear(); + eventHandler.onReset(); + return true; + } else { + return false; } } @Override - public void processBookResetEntry(final BookResetEntryInfo bookResetEntryInfo) { - clear(); - waitingForSnapshot(); + public boolean processSecurityFeedStatus(final SecurityFeedStatusMessage msg) { + if (msg.getStatus() == FeedStatus.NOT_AVAILABLE) { + if (disconnectMode == DisconnectMode.CLEAR_EXCHANGE) { + @Alphanumeric final long exchangeId = msg.getExchangeId(); + final Option >> exchange = getOrCreateExchange(exchangeId); + if (exchange.hasValue()) { + clear(); + eventHandler.onDisconnect(); + return true; + } + } + } + return false; } @Override @@ -168,12 +180,12 @@ public ExchangeList >> getExchanges() { return exchanges; } - private void releaseAndClean(final L1MarketSide marketSide) { - for (int i = 0; i < marketSide.depth(); i++) { - final Quote quote = marketSide.getQuote(i); + private void releaseAndClean(final L1MarketSideside) { + for (int i = 0; i < side.depth(); i++) { + final Quote quote = side.getQuote(i); pool.release(quote); } - marketSide.clear(); + side.clear(); } /** @@ -183,33 +195,13 @@ private void releaseAndClean(final L1MarketSidemarketSide) { * @param exchangeId - id of exchange. * @return exchange book by id. */ - private Option>> getOrCreateExchange(final long exchangeId) { + private Option >> getOrCreateExchange(@Alphanumeric final long exchangeId) { if (!exchanges.isEmpty()) { return exchanges.getById(exchangeId); } - final MutableExchangeImpl > exchange = new MutableExchangeImpl<>(exchangeId, this); + final MutableExchange> exchange = new MutableExchangeImpl<>(exchangeId, this); exchanges.add(exchange); return exchanges.getById(exchangeId); } - /** - * Update quote with L1EntryNew. - * - * @param l1EntryInfo - L1EntryNew - * @param quote - type of quote - */ - protected void updateByL1EntryNew(final Quote quote, final L1EntryInfo l1EntryInfo) { - if (quote.getSize() != l1EntryInfo.getSize()) { - quote.setSize(l1EntryInfo.getSize()); - } - if (quote.getPrice() != l1EntryInfo.getPrice()) { - quote.setPrice(l1EntryInfo.getPrice()); - } - if (quote.getExchangeId() != l1EntryInfo.getExchangeId()) { - quote.setExchangeId(l1EntryInfo.getExchangeId()); - } - if (quote.getNumberOfOrders() != l1EntryInfo.getNumberOfOrders()) { - quote.setNumberOfOrders(l1EntryInfo.getNumberOfOrders()); - } - } } diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L2AggregatedQuoteProcessor.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L2AggregatedQuoteProcessor.java index 983f983..bf76e9d 100644 --- a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L2AggregatedQuoteProcessor.java +++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L2AggregatedQuoteProcessor.java @@ -16,17 +16,15 @@ */ package com.epam.deltix.orderbook.core.impl; + import com.epam.deltix.dfp.Decimal; -import com.epam.deltix.orderbook.core.options.GapMode; -import com.epam.deltix.orderbook.core.options.UnreachableDepthMode; -import com.epam.deltix.orderbook.core.options.UpdateMode; +import com.epam.deltix.orderbook.core.options.OrderBookOptions; import com.epam.deltix.timebase.messages.TypeConstants; import com.epam.deltix.timebase.messages.universal.L2EntryUpdateInfo; import com.epam.deltix.timebase.messages.universal.QuoteSide; import static com.epam.deltix.dfp.Decimal64Utils.*; - /** * Implementation aggregated order book for L2 quote level. * @@ -34,14 +32,8 @@ */ class L2AggregatedQuoteProcessorextends AbstractL2MultiExchangeProcessor{ - L2AggregatedQuoteProcessor(final int initialExchangeCount, - final int initialDepth, - final int maxDepth, - final ObjectPoolpool, - final GapMode gapMode, - final UpdateMode updateMode, - final UnreachableDepthMode unreachableDepthMode) { - super(initialExchangeCount, initialDepth, maxDepth, pool, gapMode, updateMode, unreachableDepthMode); + L2AggregatedQuoteProcessor(final OrderBookOptions options, final ObjectPoolpool) { + super(options, pool); } @Override @@ -50,11 +42,9 @@ public String getDescription() { } @Override - public void updateQuote(final Quote previous, - final QuoteSide side, - final L2EntryUpdateInfo update) { + public void updateQuote(final Quote previous, final QuoteSide side, final L2EntryUpdateInfo update) { final L2MarketSidemarketSide = getMarketSide(side); - final short level = marketSide.binarySearchLevelByPrice(previous); + final int level = marketSide.binarySearch(previous); if (level != L2MarketSide.NOT_FOUND) { final Quote quote = marketSide.getQuote(level); @Decimal final long size = add(subtract(quote.getSize(), previous.getSize()), update.getSize()); @@ -66,9 +56,8 @@ public void updateQuote(final Quote previous, } @Override - public boolean removeQuote(final Quote remove, - final L2MarketSidemarketSide) { - final short level = marketSide.binarySearchLevelByPrice(remove); + public boolean removeQuote(final Quote remove, final L2MarketSidemarketSide) { + final int level = marketSide.binarySearch(remove); if (level != L2MarketSide.NOT_FOUND) { final Quote quote = marketSide.getQuote(level); @@ -89,7 +78,7 @@ public boolean removeQuote(final Quote remove, @Override public Quote insertQuote(final Quote insert, final L2MarketSidemarketSide) { - final short level = marketSide.binarySearchNextLevelByPrice(insert); + final int level = marketSide.binarySearchNextLevelByPrice(insert); Quote quote; if (level != marketSide.depth()) { quote = marketSide.getQuote(level); @@ -128,7 +117,7 @@ public void clear() { } @Override - public L2ProcessorclearExchange(final L2Processorexchange) { + public L2ProcessorunmapQuote(final L2Processorexchange) { if (exchange.isEmpty()) { return exchange; } diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L2ConsolidatedQuoteProcessor.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L2ConsolidatedQuoteProcessor.java index b3d179d..e1b868c 100644 --- a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L2ConsolidatedQuoteProcessor.java +++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L2ConsolidatedQuoteProcessor.java @@ -16,10 +16,9 @@ */ package com.epam.deltix.orderbook.core.impl; + import com.epam.deltix.dfp.Decimal64Utils; -import com.epam.deltix.orderbook.core.options.GapMode; -import com.epam.deltix.orderbook.core.options.UnreachableDepthMode; -import com.epam.deltix.orderbook.core.options.UpdateMode; +import com.epam.deltix.orderbook.core.options.OrderBookOptions; import com.epam.deltix.timebase.messages.universal.L2EntryUpdateInfo; import com.epam.deltix.timebase.messages.universal.QuoteSide; @@ -30,14 +29,8 @@ */ class L2ConsolidatedQuoteProcessorextends AbstractL2MultiExchangeProcessor{ - L2ConsolidatedQuoteProcessor(final int initialExchangeCount, - final int initialDepth, - final int maxDepth, - final ObjectPoolpool, - final GapMode gapMode, - final UpdateMode updateMode, - final UnreachableDepthMode unreachableDepthMode) { - super(initialExchangeCount, initialDepth, maxDepth, pool, gapMode, updateMode,unreachableDepthMode); + L2ConsolidatedQuoteProcessor(final OrderBookOptions options, final ObjectPoolpool) { + super(options, pool); } @Override @@ -61,28 +54,33 @@ public void clear() { @Override public boolean removeQuote(final Quote remove, final L2MarketSidemarketSide) { - final short level = marketSide.binarySearchLevelByPrice(remove); + final int level = marketSide.binarySearch(remove); if (level != L2MarketSide.NOT_FOUND) { - if (remove.getExchangeId() == marketSide.getQuote(level).getExchangeId() && - Decimal64Utils.equals(remove.getPrice(), marketSide.getQuote(level).getPrice())) { + if (remove.equals(marketSide.getQuote(level))) { marketSide.remove(level); return true; } else { - final int size = exchanges.size(); - for (int i = 0, k = level + i; i < size; i++, k = level + i) { - if (marketSide.hasLevel((short) (k))) { - if (remove.getExchangeId() == marketSide.getQuote(k).getExchangeId() && - Decimal64Utils.equals(remove.getPrice(), marketSide.getQuote(k).getPrice())) { + final int depth = marketSide.depth(); + for (int i = 0, k = level + i; i < depth; i++, k = level + i) { + if (marketSide.hasLevel(k)) { + final Quote quote = marketSide.getQuote(k); + if (Decimal64Utils.isNotEqual(remove.getPrice(), quote.getPrice())) { + break; + } + if (remove.equals(quote)) { marketSide.remove(k); return true; } } } - for (int i = 0, k = level - i; i < size; i++, k = level - i) { - if (marketSide.hasLevel((short) (k))) { - if (remove.getExchangeId() == marketSide.getQuote(k).getExchangeId() && - Decimal64Utils.equals(remove.getPrice(), marketSide.getQuote(k).getPrice())) { + for (int i = 0, k = level - i; i < depth; i++, k = level - i) { + if (marketSide.hasLevel(k)) { + final Quote quote = marketSide.getQuote(k); + if (Decimal64Utils.isNotEqual(remove.getPrice(), quote.getPrice())) { + break; + } + if (remove.equals(quote)) { marketSide.remove(k); return true; } @@ -95,13 +93,13 @@ public boolean removeQuote(final Quote remove, final L2MarketSidemarketS @Override public Quote insertQuote(final Quote insert, final L2MarketSidemarketSide) { - final short level = marketSide.binarySearchNextLevelByPrice(insert); + final int level = marketSide.binarySearchNextLevelByPrice(insert); marketSide.add(level, insert); return insert; } @Override - public L2ProcessorclearExchange(final L2Processorexchange) { + public L2ProcessorunmapQuote(final L2Processorexchange) { removeAll(exchange, QuoteSide.ASK); removeAll(exchange, QuoteSide.BID); exchange.clear(); diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L2MarketSide.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L2MarketSide.java index 1b0f807..83cb52d 100644 --- a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L2MarketSide.java +++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L2MarketSide.java @@ -16,8 +16,11 @@ */ package com.epam.deltix.orderbook.core.impl; +import com.epam.deltix.dfp.Decimal; import com.epam.deltix.orderbook.core.api.MarketSide; +import com.epam.deltix.timebase.messages.universal.BookUpdateAction; import com.epam.deltix.timebase.messages.universal.QuoteSide; +import com.epam.deltix.util.annotations.Alphanumeric; import java.util.Objects; @@ -34,55 +37,46 @@ staticL2MarketSidefactory(final i Objects.requireNonNull(side); switch (side) { case BID: - return new AbstractL2MarketSide.BID<>(initialDepth, (short) maxDepth); + return new AbstractL2MarketSide.BID<>(initialDepth, maxDepth); case ASK: - return new AbstractL2MarketSide.ASK<>(initialDepth, (short) maxDepth); + return new AbstractL2MarketSide.ASK<>(initialDepth, maxDepth); default: throw new IllegalStateException("Unexpected value: " + side); } } + @Override + default Quote getQuote(final CharSequence quoteId) { + // Not supported for L2 + return null; + } + + @Override + default boolean hasQuote(final CharSequence quoteId) { + // Not supported for L2 + return false; + } + /** * Inserts the specified quote at the specified level. Shifts the quotes right (adds one to their indices). * * @param level level to be inserted * @param insert quote to be inserted */ - void add(short level, Quote insert); + void add(int level, Quote insert); /** * Inserts the specified quote at the end. Shifts the quotes right (adds one to their indices). * * @param insert quote to be inserted */ - void addLast(Quote insert); + void addWorstQuote(Quote insert); - /** - * Inserts the specified quote by price. Shifts the quotes right (adds one to their indices). - * - * @param insert quote to be inserted - */ - void add(Quote insert); - - short getMaxDepth(); - - short binarySearchLevelByPrice(Quote find); + int getMaxDepth(); - short binarySearchNextLevelByPrice(Quote find); - - /** - * Trims the limit depth of market. - * An application can use this operation to minimize the storage of stock quotes. - * After trim, we can't add stock quote with level more than limit. - */ - void trim(); + int binarySearch(Quote find); - /** - * Return worst quote. - * - * @return last quote - */ - Quote getWorstQuote(); + int binarySearchNextLevelByPrice(Quote find); /** * Remove worst quote. @@ -93,6 +87,7 @@ staticL2MarketSidefactory(final i /** * Remove quote by level. + * Shifts any subsequent elements to the left. * * @param level - the level of the quote to be removed * @return removed quote @@ -112,8 +107,37 @@ staticL2MarketSidefactory(final i * @param level - quote level to use * @return true if this quote level is unexpected */ - boolean isGap(final short level); + boolean isGap(int level); + + + boolean isUnreachableLeve(int level); + + /** + * Checks if the specified price is sorted. + * + * @param level - quote level to use + * @param price - price to be checked + * @return true if this price is sorted. + */ + boolean checkOrderPrice(int level, @Decimal long price); void clear(); + /** + * Validates state of market side. + * + * @return - true if state is valid + */ + boolean validateState(); + + boolean isInvalidInsert(int level, + @Decimal long price, + @Decimal long size, + @Alphanumeric long exchangeId); + + boolean isInvalidUpdate(BookUpdateAction action, + int level, + @Decimal long price, + @Decimal long size, + @Alphanumeric long exchangeId); } diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L2OrderBookFactory.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L2OrderBookFactory.java index 623a958..70efb69 100644 --- a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L2OrderBookFactory.java +++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L2OrderBookFactory.java @@ -16,12 +16,11 @@ */ package com.epam.deltix.orderbook.core.impl; + import com.epam.deltix.orderbook.core.api.OrderBook; import com.epam.deltix.orderbook.core.api.OrderBookQuote; -import com.epam.deltix.orderbook.core.options.GapMode; -import com.epam.deltix.orderbook.core.options.Option; -import com.epam.deltix.orderbook.core.options.UnreachableDepthMode; -import com.epam.deltix.orderbook.core.options.UpdateMode; +import com.epam.deltix.orderbook.core.options.Defaults; +import com.epam.deltix.orderbook.core.options.OrderBookOptions; /** * A factory that implements order book for Level2. @@ -34,79 +33,73 @@ @SuppressWarnings("unchecked") public class L2OrderBookFactory { + /** + * Prevents instantiation + */ + protected L2OrderBookFactory() { + } + /** * Creates OrderBook for single exchange market feed of given initial depth * - * @param symbol - type of symbol - * @param initialDepth - initial book depth - * @param maxDepth - max order book depth - * @param gapMode - skipped levels mode - * @param updateMode - modes of order book update. - * @param- type of quote + * @param options - options to use + * @param- type of quote * @return instance of OrderBook for single exchange */ - public staticOrderBooknewSingleExchangeBook(final Optionsymbol, - final int initialDepth, - final int maxDepth, - final GapMode gapMode, - final UpdateMode updateMode, - final UnreachableDepthMode unreachableDepthMode) { - final ObjectPool pool = new ObjectPool<>(initialDepth, MutableOrderBookQuoteImpl::new); - final QuoteProcessor processor = - new L2SingleExchangeQuoteProcessor<>(initialDepth, maxDepth, pool, gapMode, updateMode, unreachableDepthMode); - return (OrderBook ) new OrderBookDecorator<>(symbol, processor); + public staticOrderBooknewSingleExchangeBook(final OrderBookOptions options) { + final int maxDepth = options.getMaxDepth().orElse(Defaults.MAX_DEPTH); + final int depth = options.getInitialDepth().orElse(Math.min(Defaults.INITIAL_DEPTH, maxDepth)); + final boolean isCompact = options.isCompactVersion().orElse(false); + + final QuoteProcessor extends MutableOrderBookQuote> processor; + if (isCompact) { + processor = new CompactL2SingleExchangeQuoteProcessor<>(options); + } else { + final ObjectPool extends MutableOrderBookQuote> pool = + (ObjectPool extends MutableOrderBookQuote>) options.getSharedObjectPool().orElse(QuotePoolFactory.create(options, depth)); + processor = new L2SingleExchangeQuoteProcessor<>(options, pool); + } + + return (OrderBook) new OrderBookDecorator<>(options.getSymbol(), processor); } /** * Creates OrderBook for market feed from multiple exchanges of given maximum depth. * Consolidated book preserve information about quote's exchange. * - * @param symbol - type of symbol\ - * @param initialExchangeCount - initial pool size for stock exchanges - * @param initialDepth - initial book depth - * @param maxDepth - max order book depth - * @param gapMode - skipped levels mode - * @param updateMode - modes of order book update. - * @param- type of quote + * @param options - options to use + * @param- type of quote * @return instance of Order Book with multiple exchanges */ - public staticOrderBooknewConsolidatedBook(final Optionsymbol, - final int initialExchangeCount, - final int initialDepth, - final int maxDepth, - final GapMode gapMode, - final UpdateMode updateMode, - final UnreachableDepthMode unreachableDepthMode) { - final ObjectPool pool = new ObjectPool<>(initialExchangeCount * initialDepth, MutableOrderBookQuoteImpl::new); - final QuoteProcessor processor = - new L2ConsolidatedQuoteProcessor<>(initialExchangeCount, initialDepth, maxDepth, pool, gapMode, updateMode, unreachableDepthMode); - return (OrderBook ) new OrderBookDecorator<>(symbol, processor); + public staticOrderBooknewConsolidatedBook(final OrderBookOptions options) { + final int maxDepth = options.getMaxDepth().orElse(Defaults.MAX_DEPTH); + final int depth = options.getInitialDepth().orElse(Math.min(Defaults.INITIAL_DEPTH, maxDepth)); + final int exchanges = options.getInitialExchangesPoolSize().orElse(Defaults.INITIAL_EXCHANGES_POOL_SIZE); + + final ObjectPool extends MutableOrderBookQuote> pool = (ObjectPool extends MutableOrderBookQuote>) options.getSharedObjectPool() + .orElse(QuotePoolFactory.create(options, exchanges * depth)); + + final QuoteProcessor extends MutableOrderBookQuote> processor = new L2ConsolidatedQuoteProcessor<>(options, pool); + return (OrderBook) new OrderBookDecorator<>(options.getSymbol(), processor); } /** * Creates OrderBook for market feed from multiple exchanges of given maximum depth. * Aggregated order book groups quotes from multiple exchanges by price. * - * @param symbol - type of symbol\ - * @param initialExchangeCount - initial pool size for stock exchanges - * @param initialDepth - initial book depth - * @param maxDepth - max order book depth - * @param gapMode - skipped levels mode - * @param updateMode - modes of order book update. - * @param- type of quote + * @param options - options to use + * @param- type of quote * @return instance of Order Book with multiple exchanges */ - public staticOrderBooknewAggregatedBook(final Optionsymbol, - final int initialExchangeCount, - final int initialDepth, - final int maxDepth, - final GapMode gapMode, - final UpdateMode updateMode, - final UnreachableDepthMode unreachableDepthMode) { - final ObjectPool pool = new ObjectPool<>(initialExchangeCount * initialDepth * 4, MutableOrderBookQuoteImpl::new); - final QuoteProcessor processor = - new L2AggregatedQuoteProcessor<>(initialExchangeCount, initialDepth, maxDepth, pool, gapMode, updateMode, unreachableDepthMode); - return (OrderBook ) new OrderBookDecorator<>(symbol, processor); - } + public staticOrderBooknewAggregatedBook(final OrderBookOptions options) { + final int maxDepth = options.getMaxDepth().orElse(Defaults.MAX_DEPTH); + final int depth = options.getInitialDepth().orElse(Math.min(Defaults.INITIAL_DEPTH, maxDepth)); + final int exchanges = options.getInitialExchangesPoolSize().orElse(Defaults.INITIAL_EXCHANGES_POOL_SIZE); + + final ObjectPool extends MutableOrderBookQuote> pool = (ObjectPool extends MutableOrderBookQuote>) options.getSharedObjectPool() + .orElse(QuotePoolFactory.create(options, exchanges * depth * 4)); + final QuoteProcessor extends MutableOrderBookQuote> processor = new L2AggregatedQuoteProcessor<>(options, pool); + return (OrderBook) new OrderBookDecorator<>(options.getSymbol(), processor); + } } diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L2Processor.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L2Processor.java index c364646..0487c05 100644 --- a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L2Processor.java +++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L2Processor.java @@ -16,6 +16,7 @@ */ package com.epam.deltix.orderbook.core.impl; + import com.epam.deltix.timebase.messages.universal.*; import com.epam.deltix.util.collections.generated.ObjectList; @@ -24,7 +25,7 @@ * * @author Andrii_Ostapenko */ -interface L2Processorextends QuoteProcessor, ResetEntryProcessor { +interface L2Processorextends QuoteProcessor{ @Override default DataModelType getQuoteLevels() { @@ -32,54 +33,64 @@ default DataModelType getQuoteLevels() { } @Override - L2MarketSidegetMarketSide(final QuoteSide side); + L2MarketSidegetMarketSide(QuoteSide side); @Override MutableExchangeList>> getExchanges(); /** - * This type reports incremental Level2-new: insert one line in Order Book either on ask or bid side. + * This type reports incremental Level2-new: insert one line in Order Book either on ask or bid side. * - * @param l2EntryNewInfo - Level2-new entry + * @param pck + * @param msg - Level2-new entry * @return insert quote */ - Quote processL2EntryNewInfo(final L2EntryNewInfo l2EntryNewInfo); + Quote processL2EntryNew(PackageHeaderInfo pck, L2EntryNewInfo msg); /** * This type reports incremental Level2-update: update or delete one line in Order Book either on ask or bid side. * - * @param l2EntryUpdateInfo - Level2-update + * @param pck + * @param msg - Level2-update + * @return true if quote was updated, false if quote was not found */ - void processL2EntryUpdateInfo(final L2EntryUpdateInfo l2EntryUpdateInfo); + boolean processL2EntryUpdate(PackageHeaderInfo pck, L2EntryUpdateInfo msg); - void processL2VendorSnapshot(final PackageHeaderInfo marketMessageInfo); + boolean processL2Snapshot(PackageHeaderInfo msg); - default boolean process(final BaseEntryInfo pck) { - if (pck instanceof L2EntryNew) { - final L2EntryNew l2EntryNewInfo = (L2EntryNew) pck; - processL2EntryNewInfo(l2EntryNewInfo); - return true; - } else if (pck instanceof L2EntryUpdate) { - final L2EntryUpdate l2EntryUpdateInfo = (L2EntryUpdate) pck; - processL2EntryUpdateInfo(l2EntryUpdateInfo); + boolean isWaitingForSnapshot(); + + boolean isSnapshotAllowed(PackageHeaderInfo msg); + + default boolean processIncrementalUpdate(PackageHeaderInfo pck, final BaseEntryInfo entryInfo) { + if (entryInfo instanceof L2EntryNew) { + final L2EntryNew entry = (L2EntryNew) entryInfo; + return processL2EntryNew(pck, entry) != null; + } else if (entryInfo instanceof L2EntryUpdate) { + final L2EntryUpdate entry = (L2EntryUpdate) entryInfo; + return processL2EntryUpdate(pck, entry); + } else if (entryInfo instanceof StatisticsEntry) { return true; } return false; } - default boolean processSnapshot(final PackageHeaderInfo marketMessageInfo) { - final ObjectList entries = marketMessageInfo.getEntries(); - final BaseEntryInfo baseEntryInfo = entries.get(0); - if (baseEntryInfo instanceof L2EntryNewInfo) { - processL2VendorSnapshot(marketMessageInfo); - return true; - } else if (baseEntryInfo instanceof BookResetEntryInfo) { - final BookResetEntryInfo resetEntryInfo = (BookResetEntryInfo) baseEntryInfo; - if (resetEntryInfo.getModelType() == getQuoteLevels()) { - processBookResetEntry(resetEntryInfo); - return true; + default boolean processSnapshot(final PackageHeaderInfo msg) { + final ObjectList entries = msg.getEntries(); + final int n = entries.size(); + // skip statistic entries try to establish if we are dealing with order book reset or normal snapshot + for (int i = 0; i < n; i++) { + final BaseEntryInfo entry = entries.get(i); + if (entry instanceof L2EntryNewInfo) { + return processL2Snapshot(msg); + } else if (entry instanceof BookResetEntryInfo) { + final BookResetEntryInfo resetEntry = (BookResetEntryInfo) entry; + if (resetEntry.getModelType() == getQuoteLevels()) { + return processBookResetEntry(msg, resetEntry); + } } } return false; } + } diff --git a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L2SingleExchangeQuoteProcessor.java b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L2SingleExchangeQuoteProcessor.java index df18ec3..9d9fc3b 100644 --- a/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L2SingleExchangeQuoteProcessor.java +++ b/orderbook-core/src/main/java/com/epam/deltix/orderbook/core/impl/L2SingleExchangeQuoteProcessor.java @@ -16,16 +16,19 @@ */ package com.epam.deltix.orderbook.core.impl; -import com.epam.deltix.orderbook.core.options.GapMode; -import com.epam.deltix.orderbook.core.options.Option; -import com.epam.deltix.orderbook.core.options.UnreachableDepthMode; -import com.epam.deltix.orderbook.core.options.UpdateMode; +import com.epam.deltix.containers.AlphanumericUtils; +import com.epam.deltix.orderbook.core.options.*; +import com.epam.deltix.timebase.messages.TypeConstants; +import com.epam.deltix.timebase.messages.service.FeedStatus; +import com.epam.deltix.timebase.messages.service.SecurityFeedStatusMessage; import com.epam.deltix.timebase.messages.universal.*; +import com.epam.deltix.util.annotations.Alphanumeric; import com.epam.deltix.util.collections.generated.ObjectList; import static com.epam.deltix.timebase.messages.universal.QuoteSide.ASK; import static com.epam.deltix.timebase.messages.universal.QuoteSide.BID; + /** * @author Andrii_Ostapenko1 */ @@ -35,45 +38,32 @@ public class L2SingleExchangeQuoteProcessor protected final L2MarketSidebids; protected final L2MarketSide