diff --git a/week_1/project/week_1.py b/week_1/project/week_1.py
index ccfe963b..770dfaf2 100644
--- a/week_1/project/week_1.py
+++ b/week_1/project/week_1.py
@@ -1,6 +1,7 @@
 import csv
 from datetime import datetime
 from typing import List
+from operator import attrgetter

 from dagster import In, Nothing, Out, job, op, usable_as_dagster_type
 from pydantic import BaseModel
@@ -51,15 +52,16 @@ def get_s3_data(context):


 @op
-def process_data():
-    pass
+def process_data(stock_list: List[Stock]):
+    max_high = max(stock_list, key=attrgetter("high"))
+    return Aggregation(date=max_high.date, high=max_high.high)


 @op
-def put_redis_data():
+def put_redis_data(aggregation: Aggregation):
     pass


 @job
 def week_1_pipeline():
-    pass
+    put_redis_data(process_data(get_s3_data()))
diff --git a/week_1/project/week_1_challenge.py b/week_1/project/week_1_challenge.py
index 7d7abe1f..5865abaa 100644
--- a/week_1/project/week_1_challenge.py
+++ b/week_1/project/week_1_challenge.py
@@ -1,7 +1,9 @@
 import csv
 from datetime import datetime
+import heapq
 from heapq import nlargest
 from typing import List

 from dagster import (
     DynamicOut,
@@ -60,16 +62,44 @@ def get_s3_data(context):
     return output


-@op
-def process_data():
-    pass
+@op(
+    config_schema={"nlargest": int},
+    ins={"stock_list": In(dagster_type=List[Stock])},
+    out=DynamicOut(),
+    tags={"kind": "Aggregation"},
+    description="Yield the nlargest stocks (per the op config) as Aggregation dynamic outputs",
+)
+def process_data(context, stock_list):
+    num_items = context.op_config["nlargest"]
+    # One more way to find the nlargest items in a list:
+    # largest_stocks = heapq.nlargest(num_items, stock_list, key=lambda stock: stock.high)
+    largest_stocks = heapq.nlargest(num_items, stock_list, key=sortkey)
+    aggr_list = [Aggregation(date=stock.date, high=stock.high)
+                 for stock in largest_stocks]
+    context.log.info(f"Top {num_items} aggregations: {aggr_list}")
+    for idx, aggregation in enumerate(aggr_list):
+        yield DynamicOutput(aggregation, mapping_key=str(idx))


-@op
-def put_redis_data():
+@op(description="Upload an Aggregation to Redis",
+    tags={"kind": "redis"})
+def put_redis_data(context, aggregation: Aggregation) -> None:
+    context.log.info(f"Aggregation received: {aggregation}")
     pass


+# Define a comparison key for Stock objects: sort by the "high" field
+def sortkey(stock):
+    return stock.high
+
+
 @job
 def week_1_pipeline():
-    pass
+    aggregations = process_data(get_s3_data())
+    aggr_data = aggregations.map(put_redis_data)
+    # Why do we need this step?
+    aggr_data.collect()
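The question in the week_1_challenge comment above ("Why do we need this step?") comes down to fan-in: `.collect()` only pays off when a downstream op consumes the gathered results of the mapped dynamic outputs; called on its own it has no visible effect. A minimal sketch of such a fan-in consumer, assuming `put_redis_data` were changed to return the Aggregation it receives (it currently returns None), could look like the following; `report_aggregations` and `week_1_pipeline_with_fan_in` are hypothetical names, not part of the project template:

@op(
    ins={"aggregations": In(dagster_type=List[Aggregation])},
    description="Hypothetical fan-in op that logs how many aggregations were handled",
)
def report_aggregations(context, aggregations) -> None:
    context.log.info(f"Handled {len(aggregations)} aggregations")


@job
def week_1_pipeline_with_fan_in():
    aggregations = process_data(get_s3_data())
    report_aggregations(aggregations.map(put_redis_data).collect())

Without such a consumer, dropping the `.collect()` call (or the `aggr_data` variable entirely) leaves the job's behaviour unchanged.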
diff --git a/week_2/dagster_ucr/project/week_2.py b/week_2/dagster_ucr/project/week_2.py
index bebdba3c..a0f0e375 100644
--- a/week_2/dagster_ucr/project/week_2.py
+++ b/week_2/dagster_ucr/project/week_2.py
@@ -1,3 +1,7 @@
+from operator import attrgetter
 from typing import List

 from dagster import In, Nothing, Out, ResourceDefinition, graph, op
@@ -5,26 +9,44 @@
 from dagster_ucr.resources import mock_s3_resource, redis_resource, s3_resource


-@op
-def get_s3_data():
-    pass
+@op(
+    config_schema={"s3_key": str},
+    required_resource_keys={"s3"},
+    out={"stocks": Out(dagster_type=List[Stock],
+                       description="List of Stocks")},
+)
+def get_s3_data(context):
+    output = []
+    for csv_row in context.resources.s3.get_data(context.op_config["s3_key"]):
+        stock = Stock.from_list(csv_row)
+        output.append(stock)
+    return output


-@op
-def process_data():
-    # Use your op from week 1
-    pass
+@op(
+    ins={"stocks": In(dagster_type=List[Stock])},
+    out={"highest_stock": Out(dagster_type=Aggregation)},
+    description="Given a list of stocks, return an Aggregation for the highest value",
+)
+def process_data(stocks):
+    highest_stock = max(stocks, key=attrgetter("high"))
+    aggregation = Aggregation(date=highest_stock.date, high=highest_stock.high)
+    return aggregation


-@op
-def put_redis_data():
-    pass
+@op(
+    ins={"aggregation": In(dagster_type=Aggregation)},
+    required_resource_keys={"redis"},
+    description="Given an Aggregation, upload it to Redis",
+)
+def put_redis_data(context, aggregation):
+    context.resources.redis.put_data(str(aggregation.date), str(aggregation.high))


 @graph
 def week_2_pipeline():
     # Use your graph from week 1
-    pass
+    put_redis_data(process_data(get_s3_data()))


 local = {
@@ -54,7 +76,8 @@ def week_2_pipeline():
 local_week_2_pipeline = week_2_pipeline.to_job(
     name="local_week_2_pipeline",
     config=local,
-    resource_defs={"s3": mock_s3_resource, "redis": ResourceDefinition.mock_resource()},
+    resource_defs={"s3": mock_s3_resource,
+                   "redis": ResourceDefinition.mock_resource()},
 )

 docker_week_2_pipeline = week_2_pipeline.to_job(
diff --git a/week_2/dagster_ucr/resources.py b/week_2/dagster_ucr/resources.py
index adeb742f..a5ab466a 100644
--- a/week_2/dagster_ucr/resources.py
+++ b/week_2/dagster_ucr/resources.py
@@ -91,13 +91,30 @@ def mock_s3_resource():
     return s3_mock


-@resource
-def s3_resource():
-    """This resource defines a S3 client"""
-    pass
+@resource(
+    config_schema={
+        "bucket": Field(String),
+        "access_key": Field(String),
+        "secret_key": Field(String),
+        "endpoint_url": Field(String),
+    },
+    description="A resource that creates an S3 client",
+)
+def s3_resource(context) -> S3:
+    """This resource defines an S3 client"""
+    return S3(
+        bucket=context.resource_config["bucket"],
+        access_key=context.resource_config["access_key"],
+        secret_key=context.resource_config["secret_key"],
+        endpoint_url=context.resource_config["endpoint_url"],
+    )


-@resource
-def redis_resource():
+@resource(
+    config_schema={
+        "host": Field(String),
+        "port": Field(Int),
+    },
+    description="A resource that creates a Redis client",
+)
+def redis_resource(context) -> Redis:
     """This resource defines a Redis client"""
-    pass
+    return Redis(host=context.resource_config["host"],
+                 port=context.resource_config["port"])
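For reference, the s3_resource and redis_resource defined above read everything from run config, so the week 2 Docker job needs a config block shaped roughly like the sketch below. The values mirror the ones that appear later in the week 3 sensor and the week 4 assets (LocalStack plus the course's Redis container); the `docker_run_config` name and the sample s3_key are illustrative, not part of the repository:

docker_run_config = {
    "resources": {
        "s3": {
            "config": {
                "bucket": "dagster",
                "access_key": "test",
                "secret_key": "test",
                "endpoint_url": "http://localstack:4566",
            }
        },
        "redis": {"config": {"host": "redis", "port": 6379}},
    },
    # get_s3_data declares config_schema={"s3_key": str}, so the ops section must supply it
    "ops": {"get_s3_data": {"config": {"s3_key": "prefix/stock_1.csv"}}},
}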
diff --git a/week_3/project/week_3.py b/week_3/project/week_3.py
index 49ef55ea..f26480b5 100644
--- a/week_3/project/week_3.py
+++ b/week_3/project/week_3.py
@@ -1,3 +1,4 @@
+from copy import deepcopy
+from operator import attrgetter
 from typing import List

 from dagster import (
@@ -8,6 +9,9 @@
     RetryPolicy,
     RunRequest,
     ScheduleDefinition,
+    DefaultScheduleStatus,
+    static_partitioned_config,
     SkipReason,
     graph,
     op,
@@ -19,28 +23,47 @@
 from project.types import Aggregation, Stock


-@op
-def get_s3_data():
+@op(
+    config_schema={"s3_key": str},
+    required_resource_keys={"s3"},
+    out={"stocks": Out(dagster_type=List[Stock],
+                       description="List of Stocks")},
+)
+def get_s3_data(context):
     # Use your ops from week 2
-    pass
+    output = []
+    for csv_row in context.resources.s3.get_data(context.op_config["s3_key"]):
+        stock = Stock.from_list(csv_row)
+        output.append(stock)
+    return output


-@op
-def process_data():
+@op(
+    ins={"stocks": In(dagster_type=List[Stock])},
+    out={"highest_stock": Out(dagster_type=Aggregation)},
+    description="Given a list of stocks, return an Aggregation for the highest value",
+)
+def process_data(stocks):
     # Use your ops from week 2
-    pass
+    highest_stock = max(stocks, key=attrgetter("high"))
+    aggregation = Aggregation(date=highest_stock.date, high=highest_stock.high)
+    return aggregation


-@op
-def put_redis_data():
+@op(
+    ins={"aggregation": In(dagster_type=Aggregation)},
+    required_resource_keys={"redis"},
+    description="Given an Aggregation, upload it to Redis",
+)
+def put_redis_data(context, aggregation):
     # Use your ops from week 2
-    pass
+    context.resources.redis.put_data(str(aggregation.date), str(aggregation.high))


 @graph
 def week_3_pipeline():
     # Use your graph from week 2
-    pass
+    put_redis_data(process_data(get_s3_data()))


 local = {
@@ -69,8 +92,12 @@ def week_3_pipeline():
 }


-def docker_config():
-    pass
+@static_partitioned_config(partition_keys=["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"])
+def docker_config(partition: str):
+    # Copy the shared docker dict so each partition gets its own config
+    update_docker_ops = deepcopy(docker)
+    update_docker_ops["ops"]["get_s3_data"]["config"]["s3_key"] = f"prefix/stock_{partition}.csv"
+    return update_docker_ops


 local_week_3_pipeline = week_3_pipeline.to_job(
@@ -89,14 +116,57 @@ def docker_config():
         "s3": s3_resource,
         "redis": redis_resource,
     },
-)
+    op_retry_policy=RetryPolicy(
+        max_retries=10,
+        delay=1,  # delay between retries, in seconds
+    ),
+)


-local_week_3_schedule = None  # Add your schedule
+local_week_3_schedule = ScheduleDefinition(
+    job=local_week_3_pipeline,
+    cron_schedule="*/15 * * * *",
+    default_status=DefaultScheduleStatus.RUNNING,
+)

-docker_week_3_schedule = None  # Add your schedule
+docker_week_3_schedule = ScheduleDefinition(
+    job=docker_week_3_pipeline,
+    cron_schedule="0 * * * *",
+    default_status=DefaultScheduleStatus.RUNNING,
+)


-@sensor
-def docker_week_3_sensor():
-    pass
+# Struggled to figure out what this job is
+@sensor(job=docker_week_3_pipeline, minimum_interval_seconds=30)
+def docker_week_3_sensor(context):
+    new_files = get_s3_keys(
+        bucket="dagster",
+        prefix="prefix",
+        endpoint_url="http://localstack:4566",
+        since_key=".",
+        max_keys=1000,
+    )
+    updated_config = docker_config
+    if not new_files:
+        yield SkipReason("No new s3 files found in bucket.")
+        return
+    for new_file in new_files:
+        yield RunRequest(
+            run_key=new_file,
+            # updated_config["ops"]["get_s3_data"]["config"]["s3_key"] = new_file
+            # run_config=updated_config
+            # -- the two lines above fail with a "ParsedConfig not subscriptable" error;
+            #    need to figure out how to update the partitioned config per run
+            run_config={
+                "resources": {
+                    "s3": {
+                        "config": {
+                            "bucket": "dagster",
+                            "access_key": "test",
+                            "secret_key": "test",
+                            "endpoint_url": "http://localstack:4566",
+                        }
+                    },
+                    "redis": {
+                        "config": {
+                            "host": "redis",
+                            "port": 6379,
+                        }
+                    },
+                },
+                "ops": {"get_s3_data": {"config": {"s3_key": new_file}}},
+            },
+        )
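On the "ParsedConfig not subscriptable" notes inside docker_week_3_sensor: `docker_config` is the decorated partitioned-config object, not a plain dict, so it cannot be indexed. One way around it is to deep-copy the plain `docker` run-config dict defined earlier in week_3.py and override the key per run; this is a sketch, and `build_run_config` is a hypothetical helper rather than part of the project template:

from copy import deepcopy


def build_run_config(s3_key: str) -> dict:
    # Start from the plain `docker` dict rather than the decorated docker_config object
    run_config = deepcopy(docker)
    run_config["ops"]["get_s3_data"]["config"]["s3_key"] = s3_key
    return run_config

Inside the sensor, the RunRequest could then shrink to:

    yield RunRequest(run_key=new_file, run_config=build_run_config(new_file))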
diff --git a/week_4/project/repo.py b/week_4/project/repo.py
index b8668c5c..7cd09b71 100644
--- a/week_4/project/repo.py
+++ b/week_4/project/repo.py
@@ -15,6 +15,6 @@ def repo():
     return [get_s3_data_docker, process_data_docker, put_redis_data_docker]


-@repository
-def assets_dbt():
-    pass
+# @repository
+# def assets_dbt():
+#     pass
diff --git a/week_4/project/week_4.py b/week_4/project/week_4.py
index 5c7def08..7aa5e74c 100644
--- a/week_4/project/week_4.py
+++ b/week_4/project/week_4.py
@@ -1,26 +1,93 @@
-from typing import List
-
-from dagster import Nothing, asset, with_resources
+from typing import List
+from operator import attrgetter
+from dagster import Nothing, AssetIn, AssetOut, asset, resource, with_resources

 from project.resources import redis_resource, s3_resource
 from project.types import Aggregation, Stock


-@asset
-def get_s3_data():
+@asset(
+    config_schema={"s3_key": str},
+    required_resource_keys={"s3"},
+    op_tags={"kind": "s3"},
+    group_name="corise",
+    compute_kind="s3",
+    # out={"stocks": AssetOut(dagster_type=List[Stock],
+    #                         description="List of Stocks")},
+    # Can we use AssetOut here?
+)
+def get_s3_data(context):
     # Use your op logic from week 3
-    pass
+    output = []
+    for csv_row in context.resources.s3.get_data(context.op_config["s3_key"]):
+        stock = Stock.from_list(csv_row)
+        output.append(stock)
+    return output


-@asset
-def process_data():
+@asset(
+    # AssetIn seems to be missing dagster_type in 0.15.0; somebody merged a PR for it 23 days ago
+    # ins={"stocks": AssetIn(dagster_type=List[Stock])},
+    # out={"highest_stock": AssetOut(dagster_type=Aggregation)},
+    # description="Given a list of stocks, return an Aggregation for the highest value",
+    group_name="corise",
+    compute_kind="python",
+)
+def process_data(get_s3_data):
     # Use your op logic from week 3 (you will need to make a slight change)
-    pass
+    highest_stock = max(get_s3_data, key=attrgetter("high"))
+    aggregation = Aggregation(date=highest_stock.date, high=highest_stock.high)
+    return aggregation


-@asset
-def put_redis_data():
+@asset(
+    # ins={"aggregation": AssetIn(dagster_type=Aggregation)},
+    required_resource_keys={"redis"},
+    description="Given an Aggregation, upload it to Redis",
+    group_name="corise",
+    compute_kind="redis",
+)
+def put_redis_data(context, process_data):
     # Use your op logic from week 3 (you will need to make a slight change)
-    pass
+    context.resources.redis.put_data(
+        str(process_data.date), str(process_data.high))


-get_s3_data_docker, process_data_docker, put_redis_data_docker = with_resources()
+get_s3_data_docker, process_data_docker, put_redis_data_docker = with_resources(
+    definitions=[get_s3_data, process_data, put_redis_data],
+    resource_defs={"s3": s3_resource, "redis": redis_resource},
+    resource_config_by_key={
+        "s3": {
+            "config": {
+                "bucket": "dagster",
+                "access_key": "test",
+                "secret_key": "test",
+                "endpoint_url": "http://localstack:4566",
+            }
+        },
+        "redis": {
+            "config": {
+                "host": "redis",
+                "port": 6379,
+            }
+        },
+    },
+)
+
+
+# I feel we could declare the resource this way as well
+@resource(config_schema={"s3_key": str})
+def s3_resource():
+    s3_res = {
+        "s3": {
+            "config": {
+                "bucket": "dagster",
+                "access_key": "test",
+                "secret_key": "test",
+                "endpoint_url": "http://localstack:4566",
+            }
+        }
+    }
+    return s3_res
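On the closing comment in week_4.py about declaring the resource "this way": a `@resource` function normally receives an init context and reads its values from `resource_config`, rather than returning a hard-coded nested dict, and reusing the name s3_resource shadows the resource imported from project.resources. A sketch of that pattern follows; the `inline_s3_settings` name and its schema are assumptions, and the keys simply mirror the S3 config used above:

from dagster import Field, String, resource


@resource(
    config_schema={
        "bucket": Field(String),
        "access_key": Field(String),
        "secret_key": Field(String),
        "endpoint_url": Field(String),
    },
    description="Returns the configured S3 connection settings as a plain dict",
)
def inline_s3_settings(init_context) -> dict:
    # Hand back the configured values directly; a real client wrapper could be built here instead
    return dict(init_context.resource_config)

Defined this way, the settings come from run config like the other resources, and nothing in the module gets shadowed.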