From 614dc7bc611f207283bab82210364bfa74f5a62e Mon Sep 17 00:00:00 2001
From: Alena Astrakhantseva
Date: Wed, 26 Jun 2024 16:09:53 +0200
Subject: [PATCH] Fix/stripe/delete metric source (#509)

* remove metrics resource

* remove metrics resource

* Update sources/stripe_analytics/README.md

* Update sources/stripe_analytics/README.md

---------

Co-authored-by: Anton Burnashev
---
 sources/stripe_analytics/README.md           | 22 ++++-----
 sources/stripe_analytics/__init__.py         | 37 ---------------
 sources/stripe_analytics_pipeline.py         | 50 --------------------
 tests/stripe_analytics/test_metrics.py       | 11 -----
 tests/stripe_analytics/test_stripe_source.py | 45 +-----------------
 5 files changed, 13 insertions(+), 152 deletions(-)
 delete mode 100644 tests/stripe_analytics/test_metrics.py

diff --git a/sources/stripe_analytics/README.md b/sources/stripe_analytics/README.md
index 7a9607626..6b03e5642 100644
--- a/sources/stripe_analytics/README.md
+++ b/sources/stripe_analytics/README.md
@@ -1,7 +1,8 @@
# Stripe
-Stripe is an online payment company that provides a platform for businesses to process payments from customers over the internet. This verified source uses Stripe's API and `dlt` to extract key data such as customer information, subscription details, event records, etc. and then load it into a database. In addition, the pipeline example shows how to calculate some key metrics such as MRR (Monthly Recurring Revenue) and churn rate.
+Stripe is an online payment company that provides a platform for businesses to process payments from customers over the internet. This verified source uses Stripe's API and `dlt` to extract key data such as customer information, subscription details, event records, etc., and then load it into a database.

This verified source loads data from the following default endpoints:
+
| Endpoint | Description |
| --- | --- |
| Subscription | recurring payment model offered by the Stripe payment platform. |
@@ -14,11 +15,11 @@ This verified source loads data from the following default endpoints:
| Invoice | a document that represents a request for payment from a customer. |
| BalanceTransaction | represents a record of funds movement within a Stripe account. |

-> Please note that the endpoints within the verified source can be tailored to meet your specific requirements, as outlined in the Stripe API reference documentation Detailed instructions on customizing these endpoints can be found in the customization section here.
+> Please note that the endpoints within the verified source can be tailored to meet your specific requirements, as outlined in the Stripe API reference documentation. Detailed instructions on customizing these endpoints can be found in the customization section [here](https://dlthub.com/docs/dlt-ecosystem/verified-sources/stripe#customization).

## Initialize the pipeline
```bash
-dlt init stripe bigquery
+dlt init stripe duckdb
```
-Here, we chose BigQuery as the destination. Alternatively, you can also choose redshift, duckdb, or any of the other [destinations.](https://dlthub.com/docs/dlt-ecosystem/destinations/)
+Here, we chose duckdb as the destination. Alternatively, you can also choose bigquery, redshift, or any of the other [destinations.](https://dlthub.com/docs/dlt-ecosystem/destinations/)
@@ -34,7 +35,7 @@ To get the full list of supported endpoints, grab API credentials and initialise
   [sources.stripe_analytics]
   stripe_secret_key = "stripe_secret_key"# Please set me up!
   ```
- 
+
3. Enter credentials for your chosen destination as per the [docs.](https://dlthub.com/docs/dlt-ecosystem/destinations/)

## Running the pipeline example
@@ -42,19 +43,18 @@ To get the full list of supported endpoints, grab API credentials and initialise
   ```bash
   pip install -r requirements.txt
   ```
- 
+
2. Now you can build the verified source by using the command:
   ```bash
-   python3 stripe_analytics_pipeline.py
+   python stripe_analytics_pipeline.py
   ```
- 
+
3. To ensure that everything loads as expected, use the command:
   ```bash
   dlt pipeline <pipeline_name> show
   ```
-For example, the pipeline_name for the above pipeline example is `stripe_analytics`, you can use any custom name instead.
-
-💡 To explore additional customizations for this pipeline, we recommend referring to the official dlt Stripe verified documentation. It provides comprehensive information and guidance on how to further customize and tailor the pipeline to suit your specific needs. You can find the dlt Stripe documentation in [Setup Guide: Stripe](https://dlthub.com/docs/dlt-ecosystem/verified-sources/stripe) .
-
-
+For example, the pipeline_name for the above pipeline example is `stripe_analytics`; you can use any custom name instead.
+
+💡 To explore additional customizations for this pipeline, we recommend referring to the official dlt Stripe verified documentation. It provides comprehensive information and guidance on how to further customize and tailor the pipeline to suit your specific needs. You can find the dlt Stripe documentation in [Setup Guide: Stripe](https://dlthub.com/docs/dlt-ecosystem/verified-sources/stripe).
+
diff --git a/sources/stripe_analytics/__init__.py b/sources/stripe_analytics/__init__.py
index a0de8209c..853e58efd 100644
--- a/sources/stripe_analytics/__init__.py
+++ b/sources/stripe_analytics/__init__.py
@@ -4,13 +4,10 @@
import dlt
import stripe
-from dlt.common import pendulum
-from dlt.common.typing import TDataItem
from dlt.sources import DltResource
from pendulum import DateTime

from .helpers import pagination, transform_date
-from .metrics import calculate_mrr, churn_rate
from .settings import ENDPOINTS, INCREMENTAL_ENDPOINTS


@@ -100,37 +97,3 @@ def incremental_resource(
        write_disposition="append",
        primary_key="id",
    )(endpoint)
-
-
-@dlt.resource(name="Metrics", write_disposition="append", primary_key="created")
-def metrics_resource() -> Iterable[TDataItem]:
-    """
-    Uses a SQL client to query the subscription and event data,
-    calculate the metrics, and store the results in the SQL table.
-    The function returns a generator that yields a dictionary containing
-    the calculated metrics data, including MRR (Monthly Recurring Revenue)
-    and Churn rate, as well as the current timestamp.
-
-    Returns:
-        Iterable[TDataItem]: A generator that yields a dictionary containing the calculated metrics data.
-    """
-    pipeline = dlt.current.pipeline()  # type: ignore
-    with pipeline.sql_client() as client:
-        with client.execute_query("SELECT * FROM subscription") as table:
-            sub_info = table.df()
-
-    # Access to events through the Retrieve Event API is guaranteed only for 30 days.
-    # But we probably have old data in the database.
-    with pipeline.sql_client() as client:
-        with client.execute_query(
-            "SELECT * FROM event WHERE created > %s", pendulum.now().subtract(days=30)
-        ) as table:
-            event_info = table.df()
-
-    mrr = calculate_mrr(sub_info)
-    print(f"MRR: {mrr}")
-
-    churn = churn_rate(event_info, sub_info)
-    print(f"Churn rate: {round(churn * 100, 1)}%")
-
-    yield {"MRR": mrr, "Churn rate": churn, "created": pendulum.now()}
diff --git a/sources/stripe_analytics_pipeline.py b/sources/stripe_analytics_pipeline.py
index 19b45c1db..e790266c1 100644
--- a/sources/stripe_analytics_pipeline.py
+++ b/sources/stripe_analytics_pipeline.py
@@ -6,7 +6,6 @@
    ENDPOINTS,
    INCREMENTAL_ENDPOINTS,
    incremental_stripe_source,
-    metrics_resource,
    stripe_source,
)


@@ -91,53 +90,6 @@ def load_incremental_endpoints(
    # print(load_info)


-def load_data_and_get_metrics() -> None:
-    """
-    With the pipeline, you can calculate the most important metrics
-    and store them in a database as a resource.
-    Store metrics, get calculated metrics from the database, build dashboards.
-
-    Supported metrics:
-    Monthly Recurring Revenue (MRR),
-    Subscription churn rate.
-
-    Pipeline returns both metrics.
-
-    Use Subscription and Event endpoints to calculate the metrics.
-    """
-
-    pipeline = dlt.pipeline(
-        pipeline_name="stripe_analytics",
-        destination="duckdb",
-        dataset_name="stripe_metrics",
-    )
-
-    # Event is an endpoint with uneditable data, so we can use 'incremental_stripe_source'.
-    source_event = incremental_stripe_source(endpoints=("Event",))
-    # Subscription is an endpoint with editable data, use stripe_source.
-    source_subs = stripe_source(endpoints=("Subscription",))
-
-    # convert dates to the timestamp format
-    source_event.resources["Event"].apply_hints(
-        columns={
-            "created": {"data_type": "timestamp"},
-        }
-    )
-
-    source_subs.resources["Subscription"].apply_hints(
-        columns={
-            "created": {"data_type": "timestamp"},
-        }
-    )
-
-    load_info = pipeline.run(data=[source_subs, source_event])
-    print(load_info)
-
-    resource = metrics_resource()
-    load_info = pipeline.run(resource)
-    print(load_info)
-
-
if __name__ == "__main__":
    load_data()
-    # # load only data that was created during the period between the Jan 1, 2024 (incl.), and the Feb 1, 2024 (not incl.).
+    # # load only data created between Jan 1, 2024 (inclusive) and Feb 1, 2024 (exclusive).
@@ -149,5 +101,3 @@ def load_data_and_get_metrics() -> None:
    #     initial_start_date=datetime(2023, 5, 3),
    #     end_date=datetime(2024, 3, 1),
    # )
-    # # load Subscription and Event data, calculate metrics, store them in a database
-    # load_data_and_get_metrics()
diff --git a/tests/stripe_analytics/test_metrics.py b/tests/stripe_analytics/test_metrics.py
deleted file mode 100644
index 69228b122..000000000
--- a/tests/stripe_analytics/test_metrics.py
+++ /dev/null
@@ -1,11 +0,0 @@
-from sources.stripe_analytics.metrics import calculate_mrr, churn_rate
-
-
-class TestMetrics:
-    def test_mrr(self, subscription_dataset):
-        mrr = calculate_mrr(subscription_dataset)
-        assert mrr == 140, mrr
-
-    def test_churn(self, subscription_dataset, event_dataset):
-        churn = churn_rate(event_dataset, subscription_dataset)
-        assert churn == 0.5, churn
diff --git a/tests/stripe_analytics/test_stripe_source.py b/tests/stripe_analytics/test_stripe_source.py
index 08762c517..601005199 100644
--- a/tests/stripe_analytics/test_stripe_source.py
+++ b/tests/stripe_analytics/test_stripe_source.py
@@ -3,11 +3,9 @@
from pendulum import datetime

from sources.stripe_analytics import (
-    metrics_resource,
    incremental_stripe_source,
    stripe_source,
)
-
from tests.utils import ALL_DESTINATIONS, assert_load_info
@@ -66,10 +64,10 @@ def test_load_subscription(destination_name: str) -> None:
    # make sure all jobs were loaded
    assert_load_info(info)
    # now let's inspect the generated schema.
-    # it should contain just two tables: subscription and subscription__items__data
+    # it should contain just three tables: subscription, subscription__items__data and subscription__discounts
    schema = pipeline.default_schema
    user_tables = schema.data_tables()
-    assert len(user_tables) == 2
+    assert len(user_tables) == 3
    # tables are typed dicts
    subscription_table = user_tables[0]
    assert subscription_table["name"] == "subscription"
@@ -131,42 +129,3 @@ def get_active_subs() -> int:
    # we have new subscriptions in the next day!
    assert_load_info(info)
    assert get_active_subs() > active_subs
-
-
-@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
-def test_metrics(destination_name: str) -> None:
-    # mind the full_refresh flag - it makes sure that data is loaded to unique dataset.
-    # this allows you to run the tests on the same database in parallel
-    pipeline = dlt.pipeline(
-        pipeline_name="stripe_analytics_metric_test",
-        destination=destination_name,
-        dataset_name="stripe_metric_test",
-        full_refresh=True,
-    )
-    # Event has only uneditable data, so we should use 'incremental_stripe_source'.
-    source = incremental_stripe_source(endpoints=("Event",))
-    source.resources["Event"].apply_hints(
-        columns={
-            "created": {"data_type": "timestamp"},
-        }
-    )
-    load_info = pipeline.run(source)
-    print(load_info)
-
-    # Subscription has editable data, use stripe_source.
-    source = stripe_source(endpoints=("Subscription",))
-    source.resources["Subscription"].apply_hints(
-        columns={
-            "created": {"data_type": "timestamp"},
-        }
-    )
-    load_info = pipeline.run(source)
-    print(load_info)
-
-    resource = metrics_resource()
-
-    mrr = list(resource)[0]["MRR"]
-    assert mrr > 0
-
-    load_info = pipeline.run(resource)
-    assert_load_info(load_info)
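With `metrics_resource` gone, MRR and churn are no longer computed inside the source, but similar figures can still be derived from the loaded tables after a run. Below is a minimal sketch of that approach, not part of the patch itself: it reuses the `pipeline.sql_client()` / `execute_query(...).df()` pattern from the removed resource. The dataset name and the flattened column names (`plan__amount`, `quantity`, `status`) are assumptions about how dlt unpacks Stripe subscription objects and may differ in your schema.

```python
import dlt

# Attach to a pipeline that has already loaded the "Subscription" endpoint.
# pipeline_name and dataset_name must match the ones used for loading
# (both are assumptions here).
pipeline = dlt.pipeline(
    pipeline_name="stripe_analytics",
    destination="duckdb",
    dataset_name="stripe_dataset",
)

with pipeline.sql_client() as client:
    # Same query pattern the removed metrics_resource used; the column names
    # (plan__amount, quantity, status) are assumptions and may differ.
    with client.execute_query(
        "SELECT plan__amount, quantity FROM subscription WHERE status = 'active'"
    ) as cursor:
        subs = cursor.df()

# Stripe reports amounts in the smallest currency unit (e.g. cents), hence / 100.
# This naive sum treats every active subscription as monthly; plans billed on
# other intervals would need to be normalized by their billing interval.
mrr = (subs["plan__amount"] * subs["quantity"]).sum() / 100
print(f"Approximate MRR: {mrr:.2f}")
```

A churn estimate can be rebuilt the same way from the `event` table; note that the removed resource filtered events to the last 30 days, since Stripe only guarantees access to events through the Retrieve Event API for that window.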