From bf2c3640660f123c6d21c4fbf4a015769f8fb900 Mon Sep 17 00:00:00 2001 From: Max Abitbol Date: Tue, 22 Nov 2022 14:57:58 +0000 Subject: [PATCH 1/5] Started work --- week_1/project/week_1.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/week_1/project/week_1.py b/week_1/project/week_1.py index 095b93af..6036ebc6 100644 --- a/week_1/project/week_1.py +++ b/week_1/project/week_1.py @@ -41,14 +41,20 @@ def csv_helper(file_name: str) -> Iterator[Stock]: yield Stock.from_list(row) -@op -def get_s3_data(): - pass +@op(config_schema={"s3_key": String}) +def get_s3_data(context) -> List[Stock]: + csv = csv_helper(context.op_config["s3_key"]) + return [stock for stock in csv] @op -def process_data(): - pass +def process_data(data: List[Stock]) -> Aggregation: + agg_stock = Aggregation(date=datetime.today(), high=-1) + for stock in data: + next_agg = Aggregation(date=stock.date, high=stock.high) + if next_agg.high > agg_stock.high: + agg_stock = next_agg + return agg_stock @op @@ -58,4 +64,6 @@ def put_redis_data(): @job def week_1_pipeline(): + data = get_s3_data() + agg_data = process_data(data) pass From 29f8b03929c01459e0c01f928ad10ed9ee8f81e2 Mon Sep 17 00:00:00 2001 From: Max Abitbol Date: Mon, 28 Nov 2022 15:27:19 +0000 Subject: [PATCH 2/5] test --- .gitignore | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index f425d9f5..45685919 100644 --- a/.gitignore +++ b/.gitignore @@ -156,4 +156,15 @@ cython_debug/ #.idea/ # Mac OS -.DS_Store \ No newline at end of file +.DS_Store +.idea/.gitignore +.idea/corise-dagster.iml +.gitignore +.gitignore +.gitignore +.gitignore +.idea/modules.xml +.gitignore +.idea/inspectionProfiles/profiles_settings.xml +.idea/vcs.xml +dagster From d467c460b726a578a44dc1cdce88bcaea605b155 Mon Sep 17 00:00:00 2001 From: Max Abitbol Date: Mon, 28 Nov 2022 15:38:05 +0000 Subject: [PATCH 3/5] project 1 implementation --- week_1/project/week_1.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/week_1/project/week_1.py b/week_1/project/week_1.py index 6036ebc6..9ec5ee69 100644 --- a/week_1/project/week_1.py +++ b/week_1/project/week_1.py @@ -48,7 +48,7 @@ def get_s3_data(context) -> List[Stock]: @op -def process_data(data: List[Stock]) -> Aggregation: +def process_data(context, data: List[Stock]) -> Aggregation: agg_stock = Aggregation(date=datetime.today(), high=-1) for stock in data: next_agg = Aggregation(date=stock.date, high=stock.high) @@ -58,7 +58,7 @@ def process_data(data: List[Stock]) -> Aggregation: @op -def put_redis_data(): +def put_redis_data(context, agg_data: Aggregation): pass @@ -66,4 +66,5 @@ def put_redis_data(): def week_1_pipeline(): data = get_s3_data() agg_data = process_data(data) + put_redis_data(agg_data) pass From 00d2e36491f1434e1bc596a4421d2761696f5d0d Mon Sep 17 00:00:00 2001 From: Max Abitbol Date: Mon, 28 Nov 2022 15:39:59 +0000 Subject: [PATCH 4/5] gitignore update --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 45685919..ce68e102 100644 --- a/.gitignore +++ b/.gitignore @@ -168,3 +168,4 @@ cython_debug/ .idea/inspectionProfiles/profiles_settings.xml .idea/vcs.xml dagster +*.db From f3600524989950c11692c7b04fb3d744acf1a4c4 Mon Sep 17 00:00:00 2001 From: Max Abitbol Date: Mon, 28 Nov 2022 15:42:31 +0000 Subject: [PATCH 5/5] removed pass in job --- week_1/project/week_1.py | 1 - 1 file changed, 1 deletion(-) diff --git a/week_1/project/week_1.py b/week_1/project/week_1.py index 9ec5ee69..516d19c7 100644 --- a/week_1/project/week_1.py +++ b/week_1/project/week_1.py @@ -67,4 +67,3 @@ def week_1_pipeline(): data = get_s3_data() agg_data = process_data(data) put_redis_data(agg_data) - pass