diff --git a/db/nyt/create_table_nyt_archive.sql b/db/nyt/nyt_archive.sql
similarity index 100%
rename from db/nyt/create_table_nyt_archive.sql
rename to db/nyt/nyt_archive.sql
diff --git a/db/nyt/nyt_news_to_asset.sql b/db/nyt/nyt_news_to_asset.sql
new file mode 100644
index 0000000..bdf4d3e
--- /dev/null
+++ b/db/nyt/nyt_news_to_asset.sql
@@ -0,0 +1,10 @@
+
+-- DROP TABLE nyt_news_to_asset
+
+CREATE TABLE nyt_news_to_asset
+(
+    web_url text NOT NULL,
+    organization text NOT NULL,
+    asset_ids text NOT NULL,
+    PRIMARY KEY (web_url, organization)
+)
\ No newline at end of file
diff --git a/db/nyt/nyt_org_to_asset.sql b/db/nyt/nyt_org_to_asset.sql
new file mode 100644
index 0000000..53d21d7
--- /dev/null
+++ b/db/nyt/nyt_org_to_asset.sql
@@ -0,0 +1,10 @@
+
+-- DROP TABLE nyt_org_to_asset
+
+CREATE TABLE nyt_org_to_asset
+(
+    organization text NOT NULL,
+    count int NOT NULL,
+    asset_ids text NOT NULL,
+    PRIMARY KEY (organization)
+)
\ No newline at end of file
diff --git a/db/nyt/nyt_org_to_asset_view.sql b/db/nyt/nyt_org_to_asset_view.sql
new file mode 100644
index 0000000..fc50b2a
--- /dev/null
+++ b/db/nyt/nyt_org_to_asset_view.sql
@@ -0,0 +1,9 @@
+
+-- drop view nyt_org_asset_view
+
+create view nyt_org_asset_view as
+select n.*, a.name, a.exchange, a.ipo_date, a.delisting_date
+from nyt_org_to_asset n
+left join assets a
+on n.asset_ids LIKE CONCAT('%', a.id, '%')
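+-- Caveat: asset_ids is a comma-separated list, so this join relies on
+-- substring matching; a short id could in principle match inside a longer
+-- one, and exact matching (e.g. unnesting with string_to_array) would be
+-- a safer, if more verbose, alternative.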
+--order by n.count desc
\ No newline at end of file
diff --git a/esgtools/nyt/nyt.py b/esgtools/nyt/nyt.py
deleted file mode 100644
index e5640bc..0000000
--- a/esgtools/nyt/nyt.py
+++ /dev/null
@@ -1,130 +0,0 @@
-
-# External
-import os
-import sys
-import requests
-import json
-import pandas as pd
-from datetime import datetime
-import traceback
-
-# Internal
-from utils import sql_manager
-
-
-class ManagerNYTdata:
-
-    def __init__(self, clean_tables=False, api_key=None):
-        # Initial parameters
-        self.sql_manager = sql_manager.ManagerSQL()
-        if api_key is None:
-            self.api_key = self.get_api_key()
-
-        # Clean
-        if clean_tables:
-            self.sql_manager.clean_table('nyt_archive')
-
-    def get_api_key(self):
-        with open('etl/keys.json', ) as f:
-            keys = json.load(f)
-            api_key = keys["nyt"]["Key"]
-        return api_key
-
-    def nyt_upload_all_articles(self, year_start=2001, verbose=0):
-        now = datetime.now()
-        for year in range(year_start, now.year + 1):
-            month_end = 12
-            if year == now.year:
-                month_end = now.month - 1
-            for month in range(1, month_end + 1):
-                self.nyt_upload_one_month_articles(year, month, verbose)
-
-
-    def nyt_upload_one_month_articles(self, year, month, verbose=0):
-        try:
-            bool_verbose = verbose > 0
-
-            # Verify data has not been downloaded
-            year_month = f"{year}{month:02}"
-            db_year_month_list = self.sql_manager.select_distinct_column_list('year_month', 'nyt_archive')
-            if year_month in db_year_month_list:
-                if bool_verbose:
-                    print(f"Database already has NYT articles for {year}-{month}")
-                return
-
-            # Download
-            if bool_verbose:
-                print(f"Downloading NYT {year_month}")
-            t0 = datetime.now()
-            df = self.nyt_api_download_one_month_articles(year, month)
-            t1 = datetime.now()
-            if bool_verbose:
-                if df is None:
-                    print(f"No data returned for {year_month} ({(t1 - t0).total_seconds():.2f} sec)")
-                    return
-                else:
-                    print(f"Downloaded NYT {year_month} ({(t1 - t0).total_seconds():.2f} sec)")
-
-            # Upload
-            if bool_verbose:
-                print(f"Uploading NYT {year_month}")
-            t0 = datetime.now()
-            self.sql_manager.upload_df_chunks('nyt_archive', df)
-            t1 = datetime.now()
-            if verbose > 0:
-                print(f"Uploaded NYT {year_month} ({(t1 - t0).total_seconds():.2f} sec)")
-
-        except Exception as e:
-            t1 = datetime.now()
-            print(f"ERROR: Failed to upload NYT {year_month} ({(t1 - t0).total_seconds():.2f} sec)")
-            exc_type, exc_value, exc_tb = sys.exc_info()
-            traceback.print_exception(exc_type, exc_value, exc_tb)
-
-    def nyt_api_download_one_month_articles(self, year, month, lud=None):
-
-        # Scrape nyt api
-        url = f"https://api.nytimes.com/svc/archive/v1/{year}/{month}.json?api-key={self.api_key}"
-        if lud is None:
-            lud = datetime.now()
-        json_data = requests.get(url)
-        data = json.loads(json_data.text)
-        biz_articles = []
-        for i, article in enumerate(data['response']['docs']):
-            news_desk = article['news_desk']
-            if news_desk == 'Business':
-                org = [keyword['value'] for keyword in article['keywords'] if keyword['name'] == 'organizations']
-                if org:
-                    sub = [keyword['value'] for keyword in article['keywords'] if keyword['name'] == 'subject']
-                    org_str = ' | '.join(org)
-                    sub_str = ' | '.join(sub)
-                    web_url = article['web_url']
-                    pub_date = article['pub_date']
-                    headline = article['headline']['main']
-                    snippet = article['snippet']
-                    biz_article = {
-                        'web_url': web_url,
-                        'pub_date': pub_date,
-                        'organizations': org_str,
-                        'subjects': sub_str,
-                        'headline': headline,
-                        'snippet': snippet,
-                    }
-                    biz_articles.append(biz_article)
-
-        # Convert to DataFrame and minimal cleaning
-        cols = ['web_url', 'pub_date', 'organizations', 'subjects', 'headline', 'snippet']
-        df = pd.DataFrame(biz_articles)
-        if df.empty:
-            return None
-        df = df[cols]
-        df['pub_date'] = pd.to_datetime(df.pub_date)
-        text_insert_cols = ['organizations', 'subjects', 'headline', 'snippet']
-        df[text_insert_cols] = df[text_insert_cols].mask(df[text_insert_cols].isnull(), '')
-        df['year_month'] = f"{year}{month:02}"
-        df['lud'] = lud
-        final_cols = ['web_url', 'year_month', 'pub_date', 'organizations', 'subjects', 'headline', 'snippet', 'lud']
-        df = df[final_cols]
-
-        return df
-
diff --git a/esgtools/nyt/nyt_link.py b/esgtools/nyt/nyt_link.py
new file mode 100644
index 0000000..06c728e
--- /dev/null
+++ b/esgtools/nyt/nyt_link.py
@@ -0,0 +1,209 @@
+
+# External
+import pandas as pd
+
+# Internal
+from utils import sql_manager
+
+
+# Manual mapping of NYT orgs to asset ids,
+# needed for stocks that are not easily linked by name
+# or where the name is not available
+MANUAL_ORG_TO_STOCK_MAP = [
+    ["Google Inc", ["GOOG-14542", "GOOGL-90319"]],
+    ["YouTube.com", ["GOOG-14542", "GOOGL-90319"]],
+    ["Facebook Inc", ["META-13407"]],
+    ["Facebook.com", ["META-13407"]],
+    ["Lehman Brothers Holdings Inc", ["LEH-80599"]],
+    ["Wal-Mart Stores Inc", ["WMT-55976"]],
+    ["Tesla Motors Inc", ["TSLA-93436"]],
+    ["Tesla Motors", ["TSLA-93436"]],
+    ["Bear Stearns Cos", ["BSC-68304"]],
+    ["Bear Stearns Companies Incorporated", ["BSC-68304"]],
+]
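+# The ids appear to follow a "SYMBOL-<internal asset id>" convention; a single
+# organization can map to several share classes (e.g. GOOG and GOOGL above).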
from nyt_archive") + + # Get list of unique NYT organizations with their frequency on news articles + org_counts = self.get_org_counts(nyt_news) + + # Base link between orgs and stocks based on name + org_merged_base = self.get_org_to_asset_link_based_on_name(org_counts, stocks) + + # Link based on manual map + org_merged_add = self.get_org_to_asset_link_based_on_manual_map(org_counts, stocks) + + # Final link between orgs and stocks + org_merged = self.consolidate_org_to_asset_links(org_merged_base, org_merged_add) + + # Upload org to stock map + self.upload_org_to_asset(org_merged) + + # Link news to stocks + news_to_asset = self.get_news_to_asset_link(nyt_news, org_merged) + + # Upload news to stock map + self.upload_news_to_asset(news_to_asset) + + @staticmethod + def get_org_counts(nyt_news): + exploded_orgs = nyt_news.organizations.str.split(' \| ').explode() + org_counts = ( + exploded_orgs + .value_counts() + .reset_index() + .rename(columns={"index": "organization", "organizations": "count"}) + ) + return org_counts + + def get_org_to_asset_link_based_on_name(self, org_counts: pd.DataFrame, stocks: pd.DataFrame): + """ Systematic attempt at linking orgs with stocks based on a minimal version of the name. """ + col_clean = "name_clean" + self.add_name_clean(org_counts, "organization", col_clean) + self.add_name_clean(stocks, "name", col_clean) + org_merged_base = org_counts.merge( + stocks, + how="left", + left_on=[col_clean], + right_on=[col_clean] + ) + return org_merged_base + + @staticmethod + def add_name_clean(df: pd.DataFrame, col: str, col_clean: str): + df[col_clean] = ( + df[col] + .str.lower() + .str.replace(r"[.,&+()!`']", "", regex=True) + .str.replace(r"\b(corporation|company|incorporated|group|stores|enterprise|enterprises|holdings|solutions|technology|technologies|pictures|television|entertainment|network|limited|corp|co|inc|ltd|llc|plc|ag|sa|spa|nv|the)\b", "", regex=True) + .str.replace(r"[/]", "", regex=True) + .str.replace(r"class\s+[a-z]$", "", regex=True) + .str.replace(r"cls\s+[a-z]$", "", regex=True) + .str.replace(r"\-", " ", regex=True) + .str.strip() + ) + + @staticmethod + def get_org_to_asset_link_based_on_manual_map(org_counts: pd.DataFrame, stocks: pd.DataFrame): + + # Prepare map + df_manual_org_to_stock_map = ( + pd.DataFrame( + MANUAL_ORG_TO_STOCK_MAP, + columns=["organization", "asset_id"] + ) + ) + df_manual_org_to_stock_map = ( + df_manual_org_to_stock_map + .explode("asset_id") + ) + + # Merge with manual map + org_merged_add = ( + org_counts.merge( + df_manual_org_to_stock_map, + how="left", + left_on=["organization"], + right_on=["organization"] + ) + ) + org_merged_add = ( + org_merged_add + .loc[~org_merged_add.asset_id.isnull()] + ) + org_merged_add = ( + org_merged_add + .merge( + stocks.rename(columns={"name_clean": "name_clean_manual"}), + how="left", + left_on=["asset_id"], + right_on=["asset_id"] + ) + ) + + return org_merged_add + + @staticmethod + def consolidate_org_to_asset_links(org_merged_base: pd.DataFrame, org_merged_add: pd.DataFrame): + + # Merge between both strategies + org_merged_raw = ( + pd.concat((org_merged_base, + org_merged_add[org_merged_base.columns]), + axis=0) + ) + + # Separate those with missing link + org_merged = ( + org_merged_raw + .drop_duplicates(["organization", "asset_id"]) + .groupby('organization').agg({ + 'asset_id': lambda x: ','.join(x.dropna()), + 'count': 'first' + }) + .rename(columns={"asset_id": "asset_ids"}) + .reset_index() + .sort_values(["count"], ascending=False) + ) + + return 
+        df[col_clean] = (
+            df[col]
+            .str.lower()
+            .str.replace(r"[.,&+()!`']", "", regex=True)
+            .str.replace(r"\b(corporation|company|incorporated|group|stores|enterprise|enterprises|holdings|solutions|technology|technologies|pictures|television|entertainment|network|limited|corp|co|inc|ltd|llc|plc|ag|sa|spa|nv|the)\b", "", regex=True)
+            .str.replace(r"[/]", "", regex=True)
+            .str.replace(r"class\s+[a-z]$", "", regex=True)
+            .str.replace(r"cls\s+[a-z]$", "", regex=True)
+            .str.replace(r"\-", " ", regex=True)
+            .str.strip()
+        )
+
+    @staticmethod
+    def get_org_to_asset_link_based_on_manual_map(org_counts: pd.DataFrame, stocks: pd.DataFrame):
+
+        # Prepare map: one row per (organization, asset_id) pair
+        df_manual_org_to_stock_map = (
+            pd.DataFrame(
+                MANUAL_ORG_TO_STOCK_MAP,
+                columns=["organization", "asset_id"]
+            )
+            .explode("asset_id")
+        )
+
+        # Merge with manual map
+        org_merged_add = (
+            org_counts.merge(
+                df_manual_org_to_stock_map,
+                how="left",
+                on=["organization"]
+            )
+        )
+        org_merged_add = (
+            org_merged_add
+            .loc[~org_merged_add.asset_id.isnull()]
+        )
+        org_merged_add = (
+            org_merged_add
+            .merge(
+                stocks.rename(columns={"name_clean": "name_clean_manual"}),
+                how="left",
+                on=["asset_id"]
+            )
+        )
+
+        return org_merged_add
+
+    @staticmethod
+    def consolidate_org_to_asset_links(org_merged_base: pd.DataFrame, org_merged_add: pd.DataFrame):
+
+        # Concatenate the name-based and manual-map links
+        org_merged_raw = (
+            pd.concat((org_merged_base,
+                       org_merged_add[org_merged_base.columns]),
+                      axis=0)
+        )
+
+        # Aggregate to one row per organization, joining all linked asset ids
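+        # For example, an organization matched to both Alphabet share classes
+        # would end up as a single row with asset_ids "GOOG-14542,GOOGL-90319"
+        # (ids taken from the manual map above).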
+        org_merged = (
+            org_merged_raw
+            .drop_duplicates(["organization", "asset_id"])
+            .groupby('organization').agg({
+                'asset_id': lambda x: ','.join(x.dropna()),
+                'count': 'first'
+            })
+            .rename(columns={"asset_id": "asset_ids"})
+            .reset_index()
+            .sort_values(["count"], ascending=False)
+        )
+
+        return org_merged
+
+    @staticmethod
+    def get_news_to_asset_link(nyt_news, org_merged) -> pd.DataFrame:
+
+        # Explode orgs column in news: one row per (article, organization)
+        nyt_news_expanded = (
+            nyt_news
+            .assign(organizations=nyt_news['organizations'].str.split(r" \| "))
+            .explode('organizations')
+            .reset_index(drop=True)
+            .rename(columns={"organizations": "organization"})
+        )
+
+        # Note: drop_duplicates must be applied without inplace=True here;
+        # inplace=True inside a method chain would return None
+        news_to_asset = (
+            nyt_news_expanded[["web_url", "organization"]]
+            .merge(
+                org_merged[["organization", "asset_ids"]],
+                how="left",
+                on=["organization"]
+            )
+            .drop_duplicates(["web_url", "organization"], keep="first")
+        )
+
+        return news_to_asset
+
+    def upload_org_to_asset(self, org_merged: pd.DataFrame):
+        org_to_asset_cols = ["organization", "count", "asset_ids"]
+        self.sql_manager.query(f"delete from {self.org_to_asset_table}")
+        self.sql_manager.upload_df_chunks(self.org_to_asset_table, org_merged[org_to_asset_cols])
+
+    def upload_news_to_asset(self, news_to_asset: pd.DataFrame):
+        news_to_asset_cols = ["web_url", "organization", "asset_ids"]
+        self.sql_manager.query(f"delete from {self.news_to_asset_table}")
+        self.sql_manager.upload_df_chunks(self.news_to_asset_table, news_to_asset[news_to_asset_cols])
diff --git a/esgtools/nyt/nyt_scrape.py b/esgtools/nyt/nyt_scrape.py
new file mode 100644
index 0000000..c42b8ef
--- /dev/null
+++ b/esgtools/nyt/nyt_scrape.py
@@ -0,0 +1,142 @@
+
+# External
+import sys
+import requests
+import json
+import time
+import pandas as pd
+from datetime import datetime
+import traceback
+
+# Internal
+from utils import sql_manager
+
+
+class NytNewsScraper:
+    """Downloads NYT news articles."""
+
+    def __init__(self, table_name: str, api_key: str, db_credentials):
+        # Initial parameters
+        self.table_name = table_name
+        self.api_key = api_key
+        self.sql_manager = sql_manager.ManagerSQL(db_credentials)
+
+    def nyt_upload_all_articles(self, year_start=2001, clean_tables=False, verbose=False):
+        if clean_tables:
+            self.sql_manager.clean_table(self.table_name)
+        now = datetime.now()
+        for year in range(year_start, now.year + 1):
+            month_end = 12
+            if year == now.year:
+                month_end = now.month - 1
+            for month in range(1, month_end + 1):
+                self.nyt_upload_one_month_articles(year, month, verbose)
+
+    def nyt_upload_one_month_articles(self, year, month, verbose=False):
+        # Assigned before the try block so the except clause can reference them
+        year_month = f"{year}{month:02}"
+        t0 = datetime.now()
+        try:
+
+            # Verify data has not been downloaded
+            db_year_month_list = self.sql_manager.select_distinct_column_list("year_month", self.table_name)
+            if year_month in db_year_month_list:
+                if verbose:
+                    print(f"Database already has NYT articles for {year}-{month}")
+                return
+
+            # Download
+            if verbose:
+                print(f"Downloading NYT {year_month}")
+            t0 = datetime.now()
+            df = self.nyt_api_download_one_month_articles(year, month)
+            t1 = datetime.now()
+            if df is None:
+                if verbose:
+                    print(f"No data returned for {year_month} ({(t1 - t0).total_seconds():.2f} sec)")
+                return
+            if verbose:
+                print(f"Downloaded NYT {year_month} ({(t1 - t0).total_seconds():.2f} sec)")
+
+            # Upload
+            if verbose:
+                print(f"Uploading NYT {year_month}")
+            t0 = datetime.now()
+            self.sql_manager.upload_df_chunks(self.table_name, df)
+            t1 = datetime.now()
+            if verbose:
+                print(f"Uploaded NYT {year_month} ({(t1 - t0).total_seconds():.2f} sec)")
+
+        except Exception:
+            t1 = datetime.now()
+            print(f"ERROR: Failed to upload NYT {year_month} ({(t1 - t0).total_seconds():.2f} sec)")
+            exc_type, exc_value, exc_tb = sys.exc_info()
+            traceback.print_exception(exc_type, exc_value, exc_tb)
+
+    def nyt_api_download_one_month_articles(self, year, month, attempts=3, lud=None):
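+        """Download one month of articles from the NYT Archive API, keeping
+        only Business-desk articles that carry organization keywords.
+
+        lud (presumably "last update date") defaults to now and is stored with
+        each uploaded row; on rate-limit errors the request is retried up to
+        `attempts` times.
+        """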
+
+        data = None
+        try:
+            # Scrape the NYT Archive API, retrying on rate-limit errors;
+            # the request is issued inside the loop so each attempt re-fetches
+            url = f"https://api.nytimes.com/svc/archive/v1/{year}/{month}.json?api-key={self.api_key}"
+            if lud is None:
+                lud = datetime.now()
+            for attempt in range(attempts):
+                response = requests.get(url)
+                data = json.loads(response.text)
+                if "response" in data:
+                    break
+                if "fault" in data and data["fault"].get("faultstring", "").startswith("Rate limit quota violation"):
+                    print("Quota violation, sleeping for 60 seconds")
+                    time.sleep(60)
+            if "response" not in data:
+                print(data)
+                raise ValueError("data does not have the expected keys")
+
+            biz_articles = []
+            for article in data["response"]["docs"]:
+                news_desk = article["news_desk"]
+                if news_desk == "Business":
+                    org = [keyword["value"] for keyword in article["keywords"] if keyword["name"] == "organizations"]
+                    if org:
+                        sub = [keyword["value"] for keyword in article["keywords"] if keyword["name"] == "subject"]
+                        org_str = " | ".join(org)
+                        sub_str = " | ".join(sub)
+                        web_url = article["web_url"]
+                        pub_date = article["pub_date"]
+                        headline = article["headline"]["main"]
+                        snippet = article["snippet"]
+                        biz_article = {
+                            "web_url": web_url,
+                            "pub_date": pub_date,
+                            "organizations": org_str,
+                            "subjects": sub_str,
+                            "headline": headline,
+                            "snippet": snippet,
+                        }
+                        biz_articles.append(biz_article)
+
+            # Convert to DataFrame and minimal cleaning
+            cols = ["web_url", "pub_date", "organizations", "subjects", "headline", "snippet"]
+            df = pd.DataFrame(biz_articles)
+            if df.empty:
+                return None
+            df = df[cols]
+            df["pub_date"] = pd.to_datetime(df.pub_date)
+            text_insert_cols = ["organizations", "subjects", "headline", "snippet"]
+            df[text_insert_cols] = df[text_insert_cols].mask(df[text_insert_cols].isnull(), "")
+            df["year_month"] = f"{year}{month:02}"
+            df["lud"] = lud
+            final_cols = ["web_url", "year_month", "pub_date", "organizations", "subjects", "headline", "snippet", "lud"]
+            df = df[final_cols]
+
+            return df
+
+        except Exception:
+            exc_type, exc_value, exc_tb = sys.exc_info()
+            traceback.print_exception(exc_type, exc_value, exc_tb)
+            if data:
+                print("data:", data)
diff --git a/esgtools/update_nyt_articles.py b/esgtools/update_nyt_articles.py
new file mode 100644
index 0000000..936197c
--- /dev/null
+++ b/esgtools/update_nyt_articles.py
@@ -0,0 +1,48 @@
+import json
+from ast import literal_eval
+
+from nyt.nyt_scrape import NytNewsScraper
+from utils import aws, utils
+
+
+def lambda_handler(event, context):
+    """Update NYT articles."""
+    print("event", event)
+
+    # Example
+    # {'queryStringParameters': {'table_name': 'nyt_archive', 'year_start': 2001, 'clean_table': 'False'}}
+
+    # Inputs: API Gateway wraps parameters in queryStringParameters;
+    # direct invocations pass them at the top level
+    if 'queryStringParameters' in event:
+        inputs = event["queryStringParameters"]
+    else:
+        inputs = event
+
+    # Gather parameters
+    table_name = inputs.get("table_name", "nyt_archive")
+    year_start = int(inputs.get("year_start", 2001))
+    clean_table = utils.str2bool(inputs.get("clean_table", "False"))
+    verbose = utils.str2bool(inputs.get("verbose", "False"))
+    print(f"table_name = {table_name}")
+    print(f"year_start = {year_start}")
+    print(f"clean_table = {clean_table}")
+    print(f"verbose = {verbose}")
+
+    # Decrypt secrets using the associated KMS key
+    db_credentials = literal_eval(aws.get_secret("prod/awsportfolio/key"))
+    api_key = literal_eval(aws.get_secret("prod/NYTApi/key"))["NYT_API_KEY"]
+
+    # Update NYT articles
+    nyt_scraper = NytNewsScraper(table_name, api_key, db_credentials)
+    nyt_scraper.nyt_upload_all_articles(year_start, clean_table, verbose)
+
+    return {
+        "statusCode": 200,
+        "body": json.dumps({
+            "message": "New York Times articles updated.",
+        }),
+    }
diff --git a/esgtools/update_nyt_links.py b/esgtools/update_nyt_links.py
new file mode 100644
index 0000000..2d9b8e3
--- /dev/null
+++ b/esgtools/update_nyt_links.py
@@ -0,0 +1,43 @@
+import json
+from ast import literal_eval
+
+from nyt.nyt_link import NytNewsLinker
+from utils import aws
+
+
+def lambda_handler(event, context):
+    """Update NYT article to asset links."""
+    print("event", event)
+
+    # Example
+    # {'queryStringParameters': {'org_to_asset_table': 'nyt_org_to_asset', 'news_to_asset_table': 'nyt_news_to_asset'}}
+
+    # Inputs: API Gateway wraps parameters in queryStringParameters;
+    # direct invocations pass them at the top level
+    if 'queryStringParameters' in event:
+        inputs = event["queryStringParameters"]
+    else:
+        inputs = event
+
+    # Gather parameters
+    org_to_asset_table = inputs.get("org_to_asset_table", "nyt_org_to_asset")
+    news_to_asset_table = inputs.get("news_to_asset_table", "nyt_news_to_asset")
+    print(f"org_to_asset_table = {org_to_asset_table}")
+    print(f"news_to_asset_table = {news_to_asset_table}")
+
+    # Decrypt secret using the associated KMS key
+    db_credentials = literal_eval(aws.get_secret("prod/awsportfolio/key"))
+
+    # Update NYT article links
+    nyt_linker = NytNewsLinker(org_to_asset_table, news_to_asset_table, db_credentials)
+    nyt_linker.update_news_links()
+
+    return {
+        "statusCode": 200,
+        "body": json.dumps({
+            "message": "New York Times article to asset links updated.",
+        }),
+    }
diff --git a/template.yaml b/template.yaml
index f2f2b28..41a46bc 100644
--- a/template.yaml
+++ b/template.yaml
@@ -120,5 +120,29 @@ Resources:
       Role: arn:aws:iam::654580413909:role/LambdaSecretsManagerReadAccess
       Timeout: 900
       MemorySize: 1000
+      Architectures:
+        - x86_64
+
+  UpdateNytArticlesFunction:
+    Type: AWS::Serverless::Function
+    Properties:
+      CodeUri: esgtools/
+      Handler: update_nyt_articles.lambda_handler
+      Runtime: python3.9
+      Role: arn:aws:iam::654580413909:role/LambdaSecretsManagerReadAccess
+      Timeout: 600
+      MemorySize: 500
+      Architectures:
+        - x86_64
+
+  UpdateNytLinksFunction:
+    Type: AWS::Serverless::Function
+    Properties:
+      CodeUri: esgtools/
+      Handler: update_nyt_links.lambda_handler
+      Runtime: python3.9
+      Role: arn:aws:iam::654580413909:role/LambdaSecretsManagerReadAccess
+      Timeout: 600
+      MemorySize: 500
       Architectures:
         - x86_64
\ No newline at end of file