Data update: Aggregate AlphaVantage prices to monthly level
nico-corthorn committed Sep 30, 2024
1 parent fc152e1 commit 8eb110d
Showing 7 changed files with 289 additions and 76 deletions.
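The core of the monthly aggregation in this commit is compounding daily returns through log (continuously compounded) returns: summing the daily log returns within a month and exponentiating gives the same result as multiplying the simple daily growth factors. A minimal, self-contained sketch of that identity, using made-up prices rather than the repository's tables:

import numpy as np
import pandas as pd

# Hypothetical adjusted closes for one symbol over a few trading days.
adjusted_close = pd.Series([100.0, 101.0, 99.5, 102.0])
daily_return = adjusted_close / adjusted_close.shift(1) - 1   # simple daily returns
daily_cont_return = np.log(1 + daily_return)                  # continuously compounded

# Summing log returns and exponentiating compounds the simple returns,
# which is what the monthly_return column stores.
monthly_return = np.exp(daily_cont_return.sum()) - 1
assert np.isclose(monthly_return, (1 + daily_return).prod() - 1)
print(round(monthly_return, 4))  # 0.02, i.e. 102 / 100 - 1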
20 changes: 20 additions & 0 deletions db/merge/prices_alpha_monthly.sql
@@ -0,0 +1,20 @@

CREATE TABLE prices_alpha_monthly
(
symbol varchar(20) NOT NULL,
date date NOT NULL,
open numeric(14,2),
high numeric(14,2),
low numeric(14,2),
close numeric(14,2),
adjusted_close numeric(14,2),
monthly_volume numeric(14,0),
monthly_return_std float,
monthly_return float,
source_lud timestamp NOT NULL,
lud timestamp NOT NULL,
PRIMARY KEY (symbol, date)
);

CREATE INDEX prices_alpha_monthly_symbol_idx ON prices_alpha_monthly (symbol);
CREATE INDEX prices_alpha_monthly_date_idx ON prices_alpha_monthly (date);
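Once populated, the new table can be read back with the same ManagerSQL helper used elsewhere in this diff; a small sketch, assuming default connection parameters and using 'AAPL' purely as a placeholder symbol:

from utils import sql_manager  # same import path as esgtools/consolidation/merge.py below

sql = sql_manager.ManagerSQL(None)  # None falls back to default sql_params, as in the new code
monthly = sql.select_query(
    "select date, adjusted_close, monthly_return, monthly_return_std "
    "from prices_alpha_monthly where symbol = 'AAPL' order by date"
)
print(monthly.tail())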
70 changes: 0 additions & 70 deletions esgtools/alpha/merge.py

This file was deleted.

124 changes: 123 additions & 1 deletion esgtools/alpha/table.py
@@ -1,6 +1,7 @@

import os
import csv
import numpy as np
import pandas as pd
from datetime import datetime
from abc import ABC , abstractmethod
@@ -347,7 +348,6 @@ def update(self, symbol: str, size: str):

print(f'Database already up to date for {symbol}')


def get_api_data(self, symbol, size='full'):
"""Hit AlphaVantage API to get prices of symbol
@@ -531,6 +531,128 @@ def _get_api_prices_to_upload(
return should_upload, clean_db_table, api_prices



class AlphaTablePricesMonthly(ABC):

def __init__(self,
table_name: str,
sql_params=None,
max_workers=os.cpu_count()):
self.table_name = table_name
self.sql = sql_manager.ManagerSQL(sql_params)
self.max_workers = max_workers
print(f"Using {self.max_workers} workers")


def update_all(self, asset_types:list = ['Stock']):

# Get available assets from db, and potentially apply filter
assets = self.get_assets(asset_types)

# Update db prices in parallel
self.update_list(list(assets.symbol))


def get_assets(self, asset_types:list):
assets = self.get_assets_to_refresh(asset_types)
assets = assets.reset_index(drop=True)
return assets


def update_list(self, symbols:list, parallel:bool=False):

print(f"Updating prices for {symbols}")
if parallel:
args = symbols
fun = lambda p: self.update(p)
utils.compute(args, fun, max_workers=self.max_workers)
else:
for symbol in symbols:
self.update(symbol)


def update(self, symbol: str):

print(f'Updating monthly prices for {symbol}')

# Get daily prices
prices_daily = self.sql.select_query(f"select * from prices_alpha where symbol = '{symbol}'")

if prices_daily.shape[0] > 0:

prices_monthly = self._get_prices_monthly(prices_daily)

if prices_monthly.shape[0] > 0:

# Clean symbol in monthly table
delete_query = f"delete from {self.table_name} where symbol = '{symbol}'"
self.sql.query(delete_query)

# Upload to database
print(f'Uploading {prices_monthly.shape[0]} months for {symbol}')
self.sql.upload_df_chunks(self.table_name, prices_monthly)

else:
print(f'No valid monthly prices can be computed for {symbol}')

else:
print(f'No daily prices found for {symbol}')


def _get_prices_monthly(self, prices_daily):

# Compute monthly
prices_daily["date"] = pd.to_datetime(prices_daily.date)
prices_daily = prices_daily.sort_values(by=["symbol", "date"])
prices_daily["previous_adjusted_close"] = prices_daily.groupby("symbol")["adjusted_close"].shift(1)
prices_daily["daily_return"] = prices_daily.adjusted_close / prices_daily.previous_adjusted_close - 1
prices_daily['daily_cont_return'] = np.log(1 + prices_daily.daily_return)
agg_map = {
"volume": np.sum,
"daily_cont_return": np.sum,
"daily_return": np.std,
"symbol": len
}
agg_rename = {
"volume": "monthly_volume",
"daily_cont_return": "monthly_cont_return",
"daily_return": "monthly_return_std",
"symbol": "day_count"
}

# Last monthly values
prices_monthly_last = prices_daily.set_index('date').groupby('symbol').resample('BM').last()

# Aggregate monthly values
prices_monthly_agg = (
prices_daily
.set_index("date")
.groupby("symbol")
.resample("BM")
.agg(agg_map)
.rename(columns=agg_rename)
)
prices_monthly_agg['monthly_return'] = np.exp(prices_monthly_agg['monthly_cont_return']) - 1

# Join monthly values
prices_monthly = (
prices_monthly_last
.join(prices_monthly_agg)
)

# Format and filter columns
prices_monthly.rename(columns={"lud": "source_lud"}, inplace=True)
prices_monthly["lud"] = datetime.now()
prices_monthly = prices_monthly.drop(columns="symbol").reset_index()
cols = ["symbol", "date", "open", "high", "low", "close", "adjusted_close", "monthly_volume", "monthly_return_std", "monthly_return", "source_lud", "lud"]
missing_data_cond = prices_monthly.adjusted_close.isnull()
one_record_cond = (prices_monthly.day_count == 1) & (prices_monthly.monthly_return == 0)
prices_monthly = prices_monthly.loc[~missing_data_cond & ~one_record_cond, cols].copy()

return prices_monthly



class AlphaTableAccounting(AlphaTable):
"""
Class to update balance_alpha and income_alpha tables
Empty file.
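The AlphaTablePricesMonthly class added to esgtools/alpha/table.py above can be driven directly once daily prices exist in prices_alpha. A hedged usage sketch — the import path and the symbols are assumptions, not part of this commit:

from esgtools.alpha.table import AlphaTablePricesMonthly  # assumed import path

# Recompute monthly aggregates for a couple of symbols, one at a time.
monthly_table = AlphaTablePricesMonthly("prices_alpha_monthly")
monthly_table.update_list(["AAPL", "MSFT"], parallel=False)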
124 changes: 124 additions & 0 deletions esgtools/consolidation/merge.py
@@ -0,0 +1,124 @@

import numpy as np
import pandas as pd
from datetime import datetime
from utils import sql_manager, utils, date_utils

def merge_alpha_and_wrds_assets(sql_params=None):

sql = sql_manager.ManagerSQL(sql_params)

# Delete table
sql.query("drop table if exists assets")

# Read query for creating assets table
query = """
create table assets as
select
concat(case when w.ticker is null then coalesce(a.symbol, '0') else w.ticker end
, '-', coalesce(cast(permno as TEXT), '0')) as id
, coalesce(w.ticker, a.symbol) symbol
, w.permno
, a.name
, coalesce(a.exchange, w.primary_exch) exchange
, coalesce(a.asset_type, 'Stock') asset_type
, case
when coalesce(a.asset_type, 'Stock') = 'Stock' then coalesce(w.share_class, 'A')
else w.share_class
end share_class
, case
when ipo_date_proxy is not null and ipo_date is not null then
case when ipo_date_proxy < ipo_date then cast(ipo_date_proxy as date)
else ipo_date end
else cast(coalesce(ipo_date, ipo_date_proxy) as date)
end ipo_date
, cast(coalesce(delisting_date, delisting_date_proxy) as date) delisting_date
, case when a.symbol is not null then 1 else 0 end in_alpha
, case
when a.status is null and delisting_date_proxy is not null then 'Delisted'
else a.status
end status
, lud alpha_lud
, Now() lud
from assets_wrds w
full outer join (
select *
from (
select *
, row_number() over(partition by symbol order by coalesce(delisting_date, CURRENT_DATE) desc) rnk
from assets_alpha
where symbol not like '%-%'
order by symbol, delisting_date
) a
where rnk = 1
) a
on
w.delisting_date_proxy is NULL
and w.rnk=1
and a.symbol = w.ticker
"""

# Create assets table
sql.query(query)
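# Illustration only (not part of this file): the id built by the query above
# falls back to '0' when either source is missing. A hypothetical Python mirror
# of that expression, with made-up ticker/permno values:
def _asset_id_example(ticker, symbol, permno):
    base = ticker if ticker is not None else (symbol if symbol is not None else "0")
    return f"{base}-{permno if permno is not None else '0'}"

# _asset_id_example("IBM", None, 12490)  -> "IBM-12490"  (matched in both sources)
# _asset_id_example(None, "ABCD", None)  -> "ABCD-0"     (Alpha-only asset, no permno)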


def update_prices_alpha_monthly(sql_params=None):
sql = sql_manager.ManagerSQL(sql_params)
alpha_prices = sql.select_query("select * from prices_alpha;")

# Compute monthly
alpha_prices["date"] = pd.to_datetime(alpha_prices.date)
alpha_prices = alpha_prices.sort_values(by=["symbol", "date"])
alpha_prices["previous_adjusted_close"] = alpha_prices.groupby("symbol")["adjusted_close"].shift(1)
alpha_prices["daily_return"] = alpha_prices.adjusted_close / alpha_prices.previous_adjusted_close - 1
alpha_prices['daily_cont_return'] = np.log(1 + alpha_prices.daily_return)
agg_map = {
"volume": np.sum,
"daily_cont_return": np.sum,
"daily_return": np.std,
"symbol": len
}
agg_rename = {
"volume": "monthly_volume",
"daily_cont_return": "monthly_cont_return",
"daily_return": "monthly_return_std",
"symbol": "day_count"
}

# Last monthly values
alpha_prices_month_last = alpha_prices.set_index('date').groupby('symbol').resample('BM').last()

# Aggregate monthly values
alpha_prices_month_agg = (
alpha_prices
.set_index("date")
.groupby("symbol")
.resample("BM")
.agg(agg_map)
.rename(columns=agg_rename)
)
alpha_prices_month_agg['monthly_return'] = np.exp(alpha_prices_month_agg['monthly_cont_return']) - 1

# Join monthly values
alpha_prices_month = (
alpha_prices_month_last
.join(alpha_prices_month_agg)
)

# Format and filter columns
alpha_prices_month.rename(columns={"lud": "source_lud"}, inplace=True)
alpha_prices_month["lud"] = datetime.now()
alpha_prices_month = alpha_prices_month.drop(columns="symbol").reset_index()
cols = ["symbol", "date", "open", "high", "low", "close", "adjusted_close", "monthly_volume", "monthly_return_std", "monthly_return", "source_lud", "lud"]
missing_data_cond = alpha_prices_month.adjusted_close.isnull()
one_record_cond = (alpha_prices_month.day_count == 1) & (alpha_prices_month.monthly_return == 0)
alpha_prices_month = alpha_prices_month.loc[~missing_data_cond & ~one_record_cond, cols].copy()

# Clean table
table_name = "prices_alpha_monthly"
query = f"delete from {table_name}"
sql.query(query)

# Upload to database
sql.upload_df_chunks(table_name, alpha_prices_month)
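
Taken together, the consolidation step can be run as a small driver. A sketch, assuming the package root is on the path so the module resolves as consolidation.merge (the exact invocation is not shown in this commit):

from consolidation.merge import merge_alpha_and_wrds_assets, update_prices_alpha_monthly

merge_alpha_and_wrds_assets()       # rebuild the combined assets table from assets_wrds + assets_alpha
update_prices_alpha_monthly()       # full refresh of prices_alpha_monthly from prices_alpha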
