Skip to content

Commit

Permalink
Add support for streaming realtime market data
Browse files Browse the repository at this point in the history
This change adds support for streaming realtime market data to the
program. The functionality resides under the newly introduced updates
data subcommand. This subcommand currently accepts a list of symbols as
positional arguments. The source of the data can be controlled via an
optional flag.
  • Loading branch information
d-e-s-o committed Jan 31, 2022
1 parent 820ae09 commit 3c8c82c
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ Unreleased
- Removed account update streaming support via `events account`
- Removed `--json` argument from `events` subcommand
- Renamed `events` subcommand to `updates`
- Added support for streaming realtime market data via `updates data`
- Formatted code base using `rustfmt` and checked in configuration
- Added enforcement of code formatting style checks in CI
- Bumped minimum supported Rust version to `1.56`
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ path = "utils/shell-complete.rs"
anyhow = {version = "1.0", default-features = false, features = ["std"]}

[dependencies]
apca = "0.22.0"
apca = "0.22.1"
anyhow = {version = "1.0", default-features = false, features = ["std"]}
chrono = {version = "0.4", default-features = false}
futures = {version = "0.3", default-features = false, features = ["async-await", "std"]}
Expand Down
36 changes: 36 additions & 0 deletions src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,47 @@ pub enum Asset {
}


/// A enumeration of all supported realtime market data sources.
#[derive(Copy, Clone, Debug, StructOpt)]
pub enum DataSource {
/// Use the Investors Exchange (IEX) as the data source.
Iex,
/// Use CTA (administered by NYSE) and UTP (administered by Nasdaq)
/// SIPs as the data source.
///
/// This source is only usable with the unlimited market data plan.
Sip,
}

impl FromStr for DataSource {
type Err = String;

fn from_str(side: &str) -> Result<Self, Self::Err> {
match side {
"iex" => Ok(DataSource::Iex),
"sip" => Ok(DataSource::Sip),
s => Err(format!(
"{} is not a valid data source (use 'iex' or 'sip')",
s
)),
}
}
}


/// A struct representing the `updates` command.
#[derive(Debug, StructOpt)]
pub enum Updates {
/// Subscribe to trade events.
Trades,
/// Subscribe to realtime market data aggregates.
Data {
/// The symbols for which to receive aggregate data.
symbols: Vec<String>,
/// The data source to use.
#[structopt(long, default_value = "iex")]
source: DataSource,
},
}


Expand Down
71 changes: 71 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use apca::api::v2::position;
use apca::api::v2::positions;
use apca::api::v2::updates;
use apca::data::v2::last_quote;
use apca::data::v2::stream;
use apca::ApiInfo;
use apca::Client;

Expand All @@ -41,6 +42,7 @@ use chrono::DateTime;

use futures::future::join;
use futures::future::ready;
use futures::future::FutureExt as _;
use futures::future::TryFutureExt;
use futures::join;
use futures::stream::FuturesOrdered;
Expand Down Expand Up @@ -70,6 +72,7 @@ use crate::args::ChangeOrder;
use crate::args::Command;
use crate::args::Config;
use crate::args::ConfigSet;
use crate::args::DataSource;
use crate::args::Order;
use crate::args::OrderId;
use crate::args::Position;
Expand Down Expand Up @@ -579,9 +582,77 @@ async fn stream_trade_updates(client: Client) -> Result<()> {
Ok(())
}


/// Subscribe to and stream realtime market data updates.
async fn stream_realtime_data(
client: Client,
source: DataSource,
symbols: Vec<String>,
) -> Result<()> {
let result = match source {
DataSource::Iex => {
client
.subscribe::<stream::RealtimeData<stream::IEX>>()
.await
},
DataSource::Sip => {
client
.subscribe::<stream::RealtimeData<stream::SIP>>()
.await
},
};

let (mut stream, mut subscription) =
result.with_context(|| "failed to subscribe to realtime market data updates")?;

let mut data = stream::MarketData::default();
data.set_bars(symbols);

let subscribe = subscription.subscribe(&data).boxed_local().fuse();
let () = stream::drive(subscribe, &mut stream)
.await
.map_err(|result| {
result
.map(|result| apca::Error::Json(result.unwrap_err()))
.map_err(apca::Error::WebSocket)
.unwrap_or_else(|err| err)
})
.context("failed to subscribe to market data")???;

stream
.try_for_each(|result| async {
let data = result.unwrap();
match data {
stream::Data::Bar(bar) => {
println!(
r#"{symbol}:
time stamp: {timestamp}
open price: {open_price}
close price: {close_price}
high price: {high_price}
low price: {low_price}
volume: {volume}"#,
symbol = bar.symbol,
timestamp = format_local_time_short(bar.timestamp),
open_price = bar.open_price,
close_price = bar.close_price,
high_price = bar.high_price,
low_price = bar.low_price,
volume = bar.volume,
);
},
}
Ok(())
})
.await?;

Ok(())
}

async fn updates(client: Client, updates: Updates) -> Result<()> {
match updates {
Updates::Trades => stream_trade_updates(client).await,
Updates::Data { source, symbols } => stream_realtime_data(client, source, symbols).await,
}
}

Expand Down

0 comments on commit 3c8c82c

Please sign in to comment.