Fix/stripe/delete metric source (#509)
* remove metrics resource

* remove metrics resource

* Update sources/stripe_analytics/README.md

* Update sources/stripe_analytics/README.md

---------

Co-authored-by: Anton Burnashev <[email protected]>
AstrakhantsevaAA and burnash authored Jun 26, 2024
1 parent fcc6821 commit 614dc7b
Showing 5 changed files with 13 additions and 152 deletions.
22 changes: 11 additions & 11 deletions sources/stripe_analytics/README.md
@@ -1,7 +1,8 @@
# Stripe
Stripe is an online payment company that provides a platform for businesses to process payments from customers over the internet. This verified source uses Stripe's API and `dlt` to extract key data such as customer information, subscription details, event records, etc. and then load it into a database. In addition, the pipeline example shows how to calculate some key metrics such as MRR (Monthly Recurring Revenue) and churn rate.
Stripe is an online payment company that provides a platform for businesses to process payments from customers over the internet. This verified source uses Stripe's API and `dlt` to extract key data such as customer information, subscription details, event records, etc. and then load it into a database.

This verified source loads data from the following default endpoints:

| Endpoint | Description |
| --- | --- |
| Subscription | recurring payment model offered by the Stripe payment platform. |
@@ -14,11 +15,11 @@ This verified source loads data from the following default endpoints:
| Invoice | a document that represents a request for payment from a customer. |
| BalanceTransaction | represents a record of funds movement within a Stripe account. |

> Please note that the endpoints within the verified source can be tailored to meet your specific requirements, as outlined in the Stripe API reference documentation Detailed instructions on customizing these endpoints can be found in the customization section here.
> Please note that the endpoints within the verified source can be tailored to meet your specific requirements, as outlined in the Stripe API reference documentation. Detailed instructions on customizing these endpoints can be found in the customization section [here](https://dlthub.com/docs/dlt-ecosystem/verified-sources/stripe#customization).
## Initialize the pipeline
```bash
dlt init stripe bigquery
dlt init stripe duckdb
```

Here, we chose BigQuery as the destination. Alternatively, you can also choose redshift, duckdb, or any of the other [destinations.](https://dlthub.com/docs/dlt-ecosystem/destinations/)
@@ -34,27 +35,26 @@ To get the full list of supported endpoints, grab API credentials and initialise
[sources.stripe_analytics]
stripe_secret_key = "stripe_secret_key" # Please set me up!
```

3. Enter credentials for your chosen destination as per the [docs.](https://dlthub.com/docs/dlt-ecosystem/destinations/)

## Running the pipeline example
1. Install the required dependencies by running the following command:
```bash
pip install -r requirements.txt
```

2. Now you can build the verified source by using the command:
```bash
python3 stripe_analytics_pipeline.py
python stripe_analytics_pipeline.py
```

3. To ensure that everything loads as expected, use the command:
```bash
dlt pipeline <pipeline_name> show
```
For example, the pipeline_name for the above pipeline example is `stripe_analytics`; you can use any custom name instead.


💡 To explore additional customizations for this pipeline, we recommend referring to the official dlt Stripe verified documentation. It provides comprehensive information and guidance on how to further customize and tailor the pipeline to suit your specific needs. You can find the dlt Stripe documentation in [Setup Guide: Stripe](https://dlthub.com/docs/dlt-ecosystem/verified-sources/stripe) .

💡 To explore additional customizations for this pipeline, we recommend referring to the official dlt Stripe verified documentation. It provides comprehensive information and guidance on how to further customize and tailor the pipeline to suit your specific needs. You can find the dlt Stripe documentation in [Setup Guide: Stripe](https://dlthub.com/docs/dlt-ecosystem/verified-sources/stripe).
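The source pulls each endpoint through Stripe's cursor-based list API (the `pagination` helper imported in `__init__.py` below). As a rough sketch of how such `starting_after` cursor pagination works — `paginate` and the fake endpoint here are hypothetical stand-ins, not the source's actual helper:

```python
from typing import Any, Callable, Dict, Iterator, List, Optional

def paginate(
    fetch_page: Callable[..., Dict[str, Any]], limit: int = 100
) -> Iterator[Dict[str, Any]]:
    """Yield items from a Stripe-style list API that pages with `starting_after`."""
    starting_after: Optional[str] = None
    while True:
        page = fetch_page(limit=limit, starting_after=starting_after)
        data: List[Dict[str, Any]] = page["data"]
        yield from data
        if not page.get("has_more") or not data:
            break
        # Stripe's cursor is the id of the last item on the current page
        starting_after = data[-1]["id"]

# Stubbed endpoint standing in for e.g. stripe.Customer.list
_FAKE = [{"id": f"cus_{i}"} for i in range(5)]

def fake_list(limit: int = 2, starting_after: Optional[str] = None) -> Dict[str, Any]:
    start = 0
    if starting_after is not None:
        start = next(i for i, r in enumerate(_FAKE) if r["id"] == starting_after) + 1
    chunk = _FAKE[start : start + limit]
    return {"data": chunk, "has_more": start + limit < len(_FAKE)}

print([r["id"] for r in paginate(fake_list, limit=2)])  # cus_0 .. cus_4
```

The real helper additionally threads the secret key and date filters into each request; see the source's `helpers.py` for the actual implementation.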

37 changes: 0 additions & 37 deletions sources/stripe_analytics/__init__.py
@@ -4,13 +4,10 @@

import dlt
import stripe
from dlt.common import pendulum
from dlt.common.typing import TDataItem
from dlt.sources import DltResource
from pendulum import DateTime

from .helpers import pagination, transform_date
from .metrics import calculate_mrr, churn_rate
from .settings import ENDPOINTS, INCREMENTAL_ENDPOINTS


@@ -100,37 +97,3 @@ def incremental_resource(
write_disposition="append",
primary_key="id",
)(endpoint)


@dlt.resource(name="Metrics", write_disposition="append", primary_key="created")
def metrics_resource() -> Iterable[TDataItem]:
"""
Uses a SQL client to query the subscription and event data,
calculate the metrics, and store the results in the SQL table.
The function returns a generator that yields a dictionary containing
the calculated metrics data, including MRR (Monthly Recurring Revenue)
and Churn rate, as well as the current timestamp.
Returns:
Iterable[TDataItem]: A generator that yields a dictionary containing the calculated metrics data.
"""
pipeline = dlt.current.pipeline() # type: ignore
with pipeline.sql_client() as client:
with client.execute_query("SELECT * FROM subscription") as table:
sub_info = table.df()

# Access to events through the Retrieve Event API is guaranteed only for 30 days.
# But we probably have old data in the database.
with pipeline.sql_client() as client:
with client.execute_query(
"SELECT * FROM event WHERE created > %s", pendulum.now().subtract(days=30)
) as table:
event_info = table.df()

mrr = calculate_mrr(sub_info)
print(f"MRR: {mrr}")

churn = churn_rate(event_info, sub_info)
print(f"Churn rate: {round(churn * 100, 1)}%")

yield {"MRR": mrr, "Churn rate": churn, "created": pendulum.now()}
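The deleted `metrics_resource` leaned on `calculate_mrr` and `churn_rate` helpers (removed with `metrics.py` in this commit) that operated on DataFrames pulled via the SQL client. As a rough sketch of what such metrics involve, here is a hypothetical, simplified pure-Python version — the field names (`plan_amount`, `plan_interval`, `quantity`) and the churn definition are assumptions for illustration, not the deleted implementation:

```python
from typing import Dict, List

def calculate_mrr(subscriptions: List[Dict]) -> float:
    """Sum the normalized monthly value of active subscriptions.
    Stripe stores amounts in the smallest currency unit (e.g. cents)."""
    mrr_cents = 0.0
    for sub in subscriptions:
        if sub["status"] != "active":
            continue
        amount = sub["plan_amount"] * sub["quantity"]
        # normalize yearly plans to a monthly value
        mrr_cents += amount / 12 if sub["plan_interval"] == "year" else amount
    return mrr_cents / 100

def churn_rate(canceled: int, active: int) -> float:
    """Fraction of subscribers lost over the observation window."""
    total = active + canceled
    return canceled / total if total else 0.0

subs = [
    {"status": "active", "plan_amount": 1000, "quantity": 2, "plan_interval": "month"},
    {"status": "active", "plan_amount": 24000, "quantity": 1, "plan_interval": "year"},
    {"status": "canceled", "plan_amount": 500, "quantity": 1, "plan_interval": "month"},
]
print(f"MRR: {calculate_mrr(subs)}")  # MRR: 40.0
print(f"Churn rate: {round(churn_rate(canceled=1, active=2) * 100, 1)}%")
```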
50 changes: 0 additions & 50 deletions sources/stripe_analytics_pipeline.py
@@ -6,7 +6,6 @@
ENDPOINTS,
INCREMENTAL_ENDPOINTS,
incremental_stripe_source,
metrics_resource,
stripe_source,
)

@@ -91,53 +90,6 @@ def load_incremental_endpoints(
# print(load_info)


def load_data_and_get_metrics() -> None:
"""
With the pipeline, you can calculate the most important metrics
and store them in a database as a resource.
Store metrics, get calculated metrics from the database, build dashboards.
Supported metrics:
Monthly Recurring Revenue (MRR),
Subscription churn rate.
Pipeline returns both metrics.
Use Subscription and Event endpoints to calculate the metrics.
"""

pipeline = dlt.pipeline(
pipeline_name="stripe_analytics",
destination="duckdb",
dataset_name="stripe_metrics",
)

# Event is an endpoint with uneditable data, so we can use 'incremental_stripe_source'.
source_event = incremental_stripe_source(endpoints=("Event",))
# Subscription is an endpoint with editable data, use stripe_source.
source_subs = stripe_source(endpoints=("Subscription",))

# convert dates to the timestamp format
source_event.resources["Event"].apply_hints(
columns={
"created": {"data_type": "timestamp"},
}
)

source_subs.resources["Subscription"].apply_hints(
columns={
"created": {"data_type": "timestamp"},
}
)

load_info = pipeline.run(data=[source_subs, source_event])
print(load_info)

resource = metrics_resource()
load_info = pipeline.run(resource)
print(load_info)


if __name__ == "__main__":
load_data()
# # load only data that was created between Jan 1, 2024 (inclusive) and Feb 1, 2024 (exclusive).
@@ -149,5 +101,3 @@ def load_data_and_get_metrics() -> None:
# initial_start_date=datetime(2023, 5, 3),
# end_date=datetime(2024, 3, 1),
# )
# # load Subscription and Event data, calculate metrics, store them in a database
# load_data_and_get_metrics()
11 changes: 0 additions & 11 deletions tests/stripe_analytics/test_metrics.py

This file was deleted.

45 changes: 2 additions & 43 deletions tests/stripe_analytics/test_stripe_source.py
@@ -3,11 +3,9 @@
from pendulum import datetime

from sources.stripe_analytics import (
metrics_resource,
incremental_stripe_source,
stripe_source,
)

from tests.utils import ALL_DESTINATIONS, assert_load_info


@@ -66,10 +64,10 @@ def test_load_subscription(destination_name: str) -> None:
# make sure all jobs were loaded
assert_load_info(info)
# now let's inspect the generated schema.
# it should contain just two tables: subscription and subscription__items__data
# it should contain just three tables: subscription, subscription__items__data and subscription__discounts
schema = pipeline.default_schema
user_tables = schema.data_tables()
assert len(user_tables) == 2
assert len(user_tables) == 3
# tables are typed dicts
subscription_table = user_tables[0]
assert subscription_table["name"] == "subscription"
@@ -131,42 +129,3 @@ def get_active_subs() -> int:
# we have new subscriptions in the next day!
assert_load_info(info)
assert get_active_subs() > active_subs


@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
def test_metrics(destination_name: str) -> None:
# mind the full_refresh flag - it makes sure that data is loaded to unique dataset.
# this allows you to run the tests on the same database in parallel
pipeline = dlt.pipeline(
pipeline_name="stripe_analytics_metric_test",
destination=destination_name,
dataset_name="stripe_metric_test",
full_refresh=True,
)
# Event has only uneditable data, so we should use 'incremental_stripe_source'.
source = incremental_stripe_source(endpoints=("Event",))
source.resources["Event"].apply_hints(
columns={
"created": {"data_type": "timestamp"},
}
)
load_info = pipeline.run(source)
print(load_info)

# Subscription has editable data, use stripe_source.
source = stripe_source(endpoints=("Subscription",))
source.resources["Subscription"].apply_hints(
columns={
"created": {"data_type": "timestamp"},
}
)
load_info = pipeline.run(source)
print(load_info)

resource = metrics_resource()

mrr = list(resource)[0]["MRR"]
assert mrr > 0

load_info = pipeline.run(resource)
assert_load_info(load_info)
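The updated test above expects three tables because dlt unpacks nested lists in a record into child tables, joining the path segments with a double underscore (hence `subscription__items__data` and `subscription__discounts`). A simplified illustration of that naming rule — a sketch only; dlt's real normalizer also generates row keys, parent references, and type hints:

```python
from typing import Any, Dict, List

def child_tables(root: str, record: Dict[str, Any]) -> List[str]:
    """Return the child-table names dlt-style normalization would create
    for nested lists, joining path segments with '__'."""
    names: List[str] = []

    def walk(prefix: str, value: Any) -> None:
        if isinstance(value, dict):
            for key, val in value.items():
                walk(f"{prefix}__{key}", val)
        elif isinstance(value, list):
            names.append(prefix)  # each nested list becomes a child table
            for item in value:
                if isinstance(item, dict):
                    walk(prefix, item)

    walk(root, record)
    return names

sub = {
    "id": "sub_1",
    "items": {"data": [{"id": "si_1"}]},
    "discounts": [{"id": "di_1"}],
}
print(child_tables("subscription", sub))
# ['subscription__items__data', 'subscription__discounts']
```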
