Skip to content

Commit

Permalink
Data update: Add merging of WRDS and AlphaVantage monthly returns
Browse files Browse the repository at this point in the history
- Update AlphaVantage monthly returns now done together with daily AlphaVantage price update statemachine
- Merge between Alpha and WRDS monthly returns is included at the end of the same statemachine as a query (for speed)
  • Loading branch information
nico-corthorn committed Oct 6, 2024
1 parent 8eb110d commit 929c11b
Show file tree
Hide file tree
Showing 12 changed files with 5,413 additions and 76 deletions.
7 changes: 4 additions & 3 deletions db/merge/prices_alpha_monthly.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@

--DROP TABLE prices_alpha_monthly;
CREATE TABLE prices_alpha_monthly
(
symbol varchar(20) NOT NULL,
Expand All @@ -11,10 +11,11 @@ CREATE TABLE prices_alpha_monthly
monthly_volume numeric(14,0),
monthly_return_std float,
monthly_return float,
source_lud timestamp NOT NULL,
day_count int,
source_lud timestamp,
lud timestamp NOT NULL,
PRIMARY KEY (symbol, date)
)
);

CREATE INDEX prices_alpha_monthly_symbol_idx ON prices_alpha_monthly (symbol);
CREATE INDEX prices_alpha_monthly_date_idx ON prices_alpha_monthly (date);
11 changes: 6 additions & 5 deletions esgtools/alpha/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,9 @@ def get_api_data(self, symbol, size='full'):
Parameters
----------
date_input : datetime
Date at which the delisting snapshot is taken
symbol : str
size: str
compact or full
Returns
-------
Expand Down Expand Up @@ -564,11 +565,11 @@ def update_list(self, symbols:list, parallel:bool=False):
print(f"Updating prices for {symbols}")
if parallel:
args = symbols
fun = lambda p: self.update(*p)
fun = lambda p: self.update(p)
utils.compute(args, fun, max_workers=self.max_workers)
else:
for symbol in symbols:
self.update(symbol, size)
self.update(symbol)


def update(self, symbol: str):
Expand Down Expand Up @@ -759,7 +760,7 @@ def update(self, symbol):

else:
print(f"api_balance was empty for {symbol}")


def get_api_data(self, symbol: str):
"""Hit AlphaVantage API to get balance sheet
Expand Down
116 changes: 60 additions & 56 deletions esgtools/consolidation/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,63 +62,67 @@ def merge_alpha_and_wrds_assets(sql_params=None):
sql.query(query)


def update_prices_alpha_monthly(sql_params=None):
sql = sql_manager.ManagerSQL(sql_params)
alpha_prices = sql.select_query("select * from prices_alpha;")
def merge_alpha_and_wrds_returns(sql_params=None):

# Compute monthly
alpha_prices["date"] = pd.to_datetime(alpha_prices.date)
alpha_prices = alpha_prices.sort_values(by=["symbol", "date"])
alpha_prices["previous_adjusted_close"] = alpha_prices.groupby("symbol")["adjusted_close"].shift(1)
alpha_prices["daily_return"] = alpha_prices.adjusted_close / alpha_prices.previous_adjusted_close - 1
alpha_prices['daily_cont_return'] = np.log(1 + alpha_prices.daily_return)
agg_map = {
"volume": np.sum,
"daily_cont_return": np.sum,
"daily_return": np.std,
"symbol": len
}
agg_rename = {
"volume": "monthly_volume",
"daily_cont_return": "monthly_cont_return",
"daily_return": "monthly_return_std",
"symbol": "day_count"
}

# Last monthly values
alpha_prices_month_last = alpha_prices.set_index('date').groupby('symbol').resample('BM').last()

# Aggregate monthly values
alpha_prices_month_agg = (
alpha_prices
.set_index("date")
.groupby("symbol")
.resample("BM")
.agg(agg_map)
.rename(columns=agg_rename)
)
alpha_prices_month_agg['monthly_return'] = np.exp(alpha_prices_month_agg['monthly_cont_return']) - 1

# Join monthly values
alpha_prices_month = (
alpha_prices_month_last
.join(alpha_prices_month_agg)
)

# Format and filter columns
alpha_prices_month.rename(columns={"lud": "source_lud"}, inplace=True)
alpha_prices_month["lud"] = datetime.now()
alpha_prices_month = alpha_prices_month.drop(columns="symbol").reset_index()
cols = ["symbol", "date", "open", "high", "low", "close", "adjusted_close", "monthly_volume", "monthly_return_std", "monthly_return", "day_count", "source_lud", "lud"]
missing_data_cond = alpha_prices_month.adjusted_close.isnull()
one_record_cond = (alpha_prices_month.day_count == 1) & (alpha_prices_month.monthly_return == 0)
alpha_prices_month = alpha_prices_month.loc[~missing_data_cond & ~one_record_cond, cols].copy()
sql = sql_manager.ManagerSQL(sql_params)

# Clean table
table_name = "prices_alpha_monthly"
query = f"delete from {table_name}"
sql.query(query)
# Delete table
sql.query("drop table if exists returns_monthly")

# Upload to database
sql.upload_df_chunks(table_name, alpha_prices_month)
# Read query for creating prices_monthly table
query = """
create table returns_monthly as
select
id, symbol, period, date, source
, close, adjusted_close, shares_outstanding
, monthly_volume, monthly_return_std, monthly_return
, row_number() over(partition by id, period order by priority) rnk
from (
select a.id, a.symbol, period, date, source, priority
, close, adjusted_close, shares_outstanding
, monthly_volume, monthly_return_std, monthly_return
from assets a
left join (
select
permno
, to_char(date, 'YYYY-MM') period
, cast(date as date) as date
, 'WRDS' as source
, 0 as priority
, price as close
, NULL::numeric adjusted_close
, shrout shares_outstanding
, volume_month as monthly_volume
, std_month as monthly_return_std
, ret_month as monthly_return
from returns_wrds
) w
on w.permno = a.permno
union
select a.id, a.symbol, period, date, source, priority
, close, adjusted_close, shares_outstanding
, monthly_volume, monthly_return_std, monthly_return
from assets a
left join (
select
symbol
, to_char(date, 'YYYY-MM') period
, date
, 'ALPHA' as source
, 1 as priority
, close
, adjusted_close
, NULL::numeric shares_outstanding
, monthly_volume
, monthly_return_std
, monthly_return
, source_lud
from prices_alpha_monthly
) v
on v.symbol = a.symbol and a.in_alpha = 1
) a
where date is not NULL
"""

# Create prices_montly table
sql.query(query)
Loading

0 comments on commit 929c11b

Please sign in to comment.