
Feat/updates #7

Merged: 14 commits, Jun 30, 2024
5 changes: 2 additions & 3 deletions .astro/test_dag_integrity_default.py
@@ -1,13 +1,12 @@
"""Test the validity of all DAGs. **USED BY DEV PARSE COMMAND DO NOT EDIT**"""

from contextlib import contextmanager
import logging
import os
from contextlib import contextmanager

import pytest

from airflow.models import DagBag, Variable, Connection
from airflow.hooks.base import BaseHook
from airflow.models import Connection, DagBag, Variable
from airflow.utils.db import initdb

# init airflow database
5 changes: 5 additions & 0 deletions .env-example
@@ -0,0 +1,5 @@
POSTGRES_USER =
POSTGRES_PASSWORD =
POSTGRES_HOST =
POSTGRES_PORT =
POSTGRES_DB =
69 changes: 58 additions & 11 deletions README.md
@@ -10,6 +10,17 @@ This project also has a CI for every Pull Request made using GitHub Actions, whe

![](pics/etl-diagram.png)

## Table of Contents

- [Context](#context)
- [Contract Schema](#contract-schema)
- [How it works](#how-it-works)
- [Project Folder Structure](#project-folder-structure)
- [How to run this project](#how-to-run-this-project)
- [Run without Airflow](#run-without-airflow)
- [Further Tasks](#further-tasks)


## Context

This project was built in a context where a folder inside Google Drive regularly receives different CSV files containing the operational revenue from various companies in three types of currencies: USD, EUR, and YEN. This data must be stored in a PostgreSQL database with the operational revenue converted to USD, together with the currency conversion rate, using the last quotation of the month for the date given in the CSV.
@@ -25,7 +36,7 @@ The project uses the following contract schema to validate the data:
| company | Series[str] | |
| currency | Series[str] | in ['EUR', 'USD', 'YEN'], all values equal |
| operational_revenue | Series[float] | greater than or equal to 0 |
| date | Series[pa.DateTime] | all values equal |
| date | Series[DateTime] | all values equal |
| file_id | Optional[str] | |

* Schema-out: Used when transforming data in Task 03
@@ -35,7 +46,7 @@ The project uses the following contract schema to validate the data:
| company | Series[str] | |
| currency | Series[str] | in ['EUR', 'USD', 'YEN'], all values equal |
| operational_revenue | Series[float] | greater than or equal to 0 |
| date | Series[pa.DateTime] | all values equal |
| date | Series[DateTime] | all values equal |
| file_id | Series[str] | |
| convertion_rate | Series[float] | greater than or equal to 0 |
| usd_converted | Series[float] | greater than or equal to 0 |
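
For reference, a minimal sketch of what the schema-out contract could look like with pandera's `DataFrameModel` API. The class name and the dataframe-level "all values equal" check below are illustrative assumptions; the actual definitions live in the project's `src` code:

```python
# Illustrative sketch of the schema-out contract; not the project's actual code.
import pandera as pa
from pandera.typing import Series


class SchemaOut(pa.DataFrameModel):
    company: Series[str]
    currency: Series[str] = pa.Field(isin=["EUR", "USD", "YEN"])
    operational_revenue: Series[float] = pa.Field(ge=0)
    date: Series[pa.DateTime]
    file_id: Series[str]
    convertion_rate: Series[float] = pa.Field(ge=0)
    usd_converted: Series[float] = pa.Field(ge=0)

    @pa.dataframe_check
    def all_values_equal(cls, df) -> bool:
        # "all values equal" for currency and date within a single file
        return df["currency"].nunique() == 1 and df["date"].nunique() == 1


# Usage: SchemaOut.validate(df) raises a SchemaError if the contract is violated.
```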
@@ -76,17 +87,23 @@ This final task loads the data into a PostgreSQL database.
### Project Folder Structure

```
.
├── Dockerfile
├── README.md
├── airflow_settings.yaml
├── dags
│   └── dag_etl.py
├── data
│   ├── eur_revenue.csv
│   ├── usd_revenue.csv
│   ├── wrong_data.csv
│   └── yen_revenue.csv
├── dev-requirements.txt
├── docker-compose.yml
├── packages.txt
├── pics
│   └── etl-diagram.png
├── pyproject.toml
├── pytest.ini
├── requirements.txt
├── src
│   ├── __init__.py
@@ -98,7 +115,6 @@ This final task loads the data into a PostgreSQL database.
│   └── transform_utils.py
└── tests
    ├── dags
    │   └── test_dag_example.py
    ├── test_schema_in.py
    └── test_schema_out.py
```
@@ -108,9 +124,10 @@ This final task loads the data into a PostgreSQL database.

All the steps here are intended for a `bash` terminal.

Google Drive API requires a JSON file to authenticate the connection, and although it's not correct, in this project it was uploaded in the root directory, where the file `dags/dag_etl.py` will search for the file. So, to run as it is, you need to upload the JSON file in the root directory with the name `service_account.json`. Make sure that this file is included in `.gitignore`.
The Google Drive API requires a service account JSON file to authenticate the connection. Although it is not best practice, in this project the file is kept in the root directory, where `dags/dag_etl.py` will look for it. So, to run the project as is, you need to place the JSON file in the root directory with the name `service_account.json`. Make sure this file is included in `.gitignore`. Here are some resources on how to set it up: [How to Upload Files Using the Google Drive API in Python](https://ragug.medium.com/how-to-upload-files-using-the-google-drive-api-in-python-ebefdfd63eab), [Using Google Drive API with Python and a Service Account](https://medium.com/@matheodaly/using-google-drive-api-with-python-and-a-service-account-d6ae1f6456c2), and a [video tutorial](https://www.youtube.com/watch?v=tamT_iGoZDQ).


The name of the parent folder in Google Drive is set to be `python_to_drive` and the folder from where it will extract the CSV files must be called `Operational Revenue`. You can change the names by just changing the variables `parent_folder_name` and `folder_to_extract_from` in `dags/dag_etl.py`.
The parent folder in Google Drive is expected to be named `python_to_drive`, and the folder from which the CSV files are extracted must be called `Operational Revenue`. You can change these names by editing the variables `parent_folder_name` and `folder_to_extract_from` in [`dags/dag_etl.py`](dags/dag_etl.py).
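
For context, a rough sketch of how a service account file is typically used to authenticate and list files with the Drive API (this is not the project's actual code; the scope, query, and `<folder_id>` placeholder are illustrative):

```python
# Hedged sketch of the general pattern, not the implementation in src/etl.py.
from google.oauth2 import service_account
from googleapiclient.discovery import build

SCOPES = ["https://www.googleapis.com/auth/drive.readonly"]  # read-only access is enough for extraction

creds = service_account.Credentials.from_service_account_file(
    "service_account.json", scopes=SCOPES
)
drive = build("drive", "v3", credentials=creds)

# List CSV files inside a folder; <folder_id> would be resolved from the folder names above.
response = (
    drive.files()
    .list(q="'<folder_id>' in parents and mimeType='text/csv'", fields="files(id, name)")
    .execute()
)
print(response.get("files", []))
```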

You also need to connect the app to your own PostgreSQL database. You can do this by following the steps below:

@@ -124,7 +141,37 @@ git clone https://github.com/lealre/etl-airflow.git
cd etl-airflow
```

1.3 - Create the `.env` file in the root folder, passing the respective keys from your own PostgresSQL Database:
1.3 - There are two options to connect to PostgreSQL: locally with Docker (1.3.1) or with an existing database using credentials passed in the `.env` file (1.3.2). By default, if no credentials are provided, the program attempts to connect to the local Docker database, using the credentials specified in [docker-compose.yml](docker-compose.yml).

1.3.1 - Locally using Docker

Create the database and PGAdmin container with Docker:
```bash
docker compose up -d
```

To access the database:

I. Open [http://localhost:8888/](http://localhost:8888/) in your browser.

II. Access PGAdmin with the credentials specified in the `docker-compose.yml` file:
- Username: `[email protected]`
- Password: `pgadmin`

III. Set the master password (when accessing for the first time).

IV. Right-click on the server to connect PGAdmin to the database.

V. Connect to the database using the credentials defined in the `docker-compose.yml` file:
- Host name: `db`
- Password: `postgres`
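
If you prefer the command line over PGAdmin, a quick sanity check against the containerized database (using the same credentials from [docker-compose.yml](docker-compose.yml)) could look like this:

```bash
# Open a psql session inside the running db container and list the tables
docker exec -it db psql -U user-name -d db -c '\dt'
```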


**OR**

1.3.2 - Connecting with an existing database

Create the `.env` file in the root folder with the respective keys for your own PostgreSQL database:
```bash
echo "POSTGRES_USER=<your-database-keys>" >> .env
echo "POSTGRES_PASSWORD=<your-database-keys>" >> .env
@@ -133,16 +180,16 @@ echo "POSTGRES_PORT=<your-database-keys>" >> .env
echo "POSTGRES_DB=<your-database-keys>" >> .env
```
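
To sanity-check these credentials before running the pipeline, a short connection test could look like the sketch below (illustrative only, assuming `psycopg2-binary` and `python-dotenv` from the project requirements are installed):

```python
# Illustrative connection test for the credentials defined in .env.
import os

import psycopg2
from dotenv import load_dotenv

load_dotenv(".env")

conn = psycopg2.connect(
    user=os.getenv("POSTGRES_USER"),
    password=os.getenv("POSTGRES_PASSWORD"),
    host=os.getenv("POSTGRES_HOST"),
    port=os.getenv("POSTGRES_PORT"),
    dbname=os.getenv("POSTGRES_DB"),
)
print("Connected:", conn.dsn)
conn.close()
```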

1.4 - Make sure `.env` file is included in `.gitignore`.
Make sure the `.env` file is included in `.gitignore`.

From here, we use the Astro CLI to run Airflow with Docker.

1.5 - Install Astro CLI:
1.4 - Install Astro CLI:
```bash
curl -sSL install.astronomer.io | sudo bash -s
```

1.6 - Run Airflow with Docker
1.5 - Run Airflow with Docker
```bash
astro dev start
```
@@ -155,7 +202,7 @@ After these steps, it will automatically open the localhost link with the Airflo

### Run without Airflow

You also can run this project without Airflow. To do it, you should follow all the steps until 1.4, and then do the following:
You can also run this project without Airflow. To do so, follow all the steps up to 1.3, and then do the following:

2.1 - Install Python version 3.11.5:
```bash
62 changes: 39 additions & 23 deletions dags/dag_etl.py
@@ -1,46 +1,62 @@
from src.etl import connect_drive_and_extract_files, validate_data, transform_data, load_files
from airflow.decorators import dag, task
from datetime import datetime

parent_folder_name = 'python_to_drive'
folder_to_extract_from = 'Operational Revenue'
from airflow.decorators import dag, task

from src.etl import (
connect_drive_and_extract_files,
load_files,
transform_data,
validate_data,
)

parent_folder_name = "python_to_drive"
folder_to_extract_from = "Operational Revenue"


@dag(
dag_id = 'ETL-GDrive-to-PostgreSQL',
description='Dag to extract data from Google Drive, transform and store in a POstgreSQL',
schedule= '*/2 * * * *',
start_date= datetime(2024,4,7),
catchup= False,
params= {
'service_account_path' : 'service_account.json',
'parent_folder_name' : parent_folder_name,
'folder_to_extract_from' : folder_to_extract_from
dag_id="ETL-GDrive-to-PostgreSQL",
description=(
"Dag to extract data from Google Drive, "
"transform and store in a POstgreSQL"
),
schedule="*/2 * * * *",
start_date=datetime(2024, 4, 7),
catchup=False,
params={
"service_account_path": "service_account.json",
"parent_folder_name": parent_folder_name,
"folder_to_extract_from": folder_to_extract_from

}
)
def pipeline():

@task(task_id = 'connect-with-drive-and-read-files-info')
@task(task_id="connect-with-drive-and-read-files-info")
def task_connect_drive_and_extract_files(**context):
return connect_drive_and_extract_files(context['params']['service_account_path'], context['params']['parent_folder_name'], context['params']['folder_to_extract_from'])

@task(task_id = 'validate-data')
return connect_drive_and_extract_files(
context["params"]["service_account_path"],
context["params"]["parent_folder_name"],
context["params"]["folder_to_extract_from"]
)

@task(task_id="validate-data")
def task_validate_data(list_df):
return validate_data(list_df)
@task(task_id = 'transform-data')

@task(task_id="transform-data")
def task_transform_data(list_df):
return transform_data(list_df)

@task(task_id = 'load-data')
@task(task_id="load-data")
def task_load_files(list_df):
return load_files(list_df)

task_connect_and_extract = task_connect_drive_and_extract_files()
task_validate = task_validate_data(task_connect_and_extract)
task_transform = task_transform_data(task_validate)
task_load = task_load_files(task_transform)
task_load = task_load_files(task_transform)

task_connect_and_extract >> task_validate >> task_transform >> task_load

pipeline()

pipeline()
4 changes: 3 additions & 1 deletion dev-requirements.txt
@@ -8,4 +8,6 @@ psycopg2-binary==2.9.9
pandera==0.18.3
pytest==8.1.1
pandas-datareader==0.10.0
taskipy==1.12.2
taskipy==1.12.2
ruff==0.5.0
numpy==1.26.4
31 changes: 31 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,31 @@
version: "3.8"

services:
db:
image: postgres
container_name: db
restart: always
ports:
- "5432:5432"
environment:
POSTGRES_DB: db
POSTGRES_USER: user-name
POSTGRES_PASSWORD: postgres
volumes:
- local_pgdata:/var/lib/postgresql/data

pgadmin:
image: dpage/pgadmin4
container_name: pgadmin4_container
restart: always
ports:
- "8888:80"
environment:
PGADMIN_DEFAULT_EMAIL: [email protected]
PGADMIN_DEFAULT_PASSWORD: pgadmin
volumes:
- pgadmin-data:/var/lib/pgadmin

volumes:
local_pgdata:
pgadmin-data:
21 changes: 20 additions & 1 deletion pyproject.toml
@@ -6,4 +6,23 @@ addopts = '-p no:warnings'
test_schema_in = 'pytest tests/test_schema_in.py -v'
test_schema_out = 'pytest tests/test_schema_out.py -v'
test = "pytest tests -v"
main = 'python src/main.py'
main = 'python src/main.py'
lint = 'ruff check . && ruff check . --diff'
format = 'ruff check . --fix && ruff format .'

[tool.ruff]
line-length = 79
extend-exclude = [
'tests/dags',
'.astro'
]
unsafe-fixes = true


[tool.ruff.lint]
preview = true
select = ['I', 'F', 'E', 'W', 'PL', 'PT']

[tool.ruff.format]
preview = true
quote-style = "double"
36 changes: 19 additions & 17 deletions src/database.py
@@ -1,32 +1,34 @@
from dotenv import load_dotenv
from sqlalchemy import create_engine
import pandas as pd
from sqlalchemy import MetaData
import os
from typing import Union

import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import MetaData, create_engine

load_dotenv(".env")

POSTGRES_USER = os.getenv('POSTGRES_USER')
POSTGRES_PASSWORD = os.getenv('POSTGRES_PASSWORD')
POSTGRES_HOST = os.getenv('POSTGRES_HOST')
POSTGRES_PORT = os.getenv('POSTGRES_PORT')
POSTGRES_DB = os.getenv('POSTGRES_DB')
POSTGRES_USER = os.getenv('POSTGRES_USER', 'user-name')
POSTGRES_PASSWORD = os.getenv('POSTGRES_PASSWORD', 'postgres')
POSTGRES_HOST = os.getenv('POSTGRES_HOST', 'localhost')
POSTGRES_PORT = int(os.getenv('POSTGRES_PORT', "5432"))
POSTGRES_DB = os.getenv('POSTGRES_DB', 'mydb')

POSTGRES_DATABASE_URL = f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"
POSTGRES_DATABASE_URL = (
f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}"
f"@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"
)

engine = create_engine(POSTGRES_DATABASE_URL)


def get_files_ids_from_db() -> Union[list[str], list[None]]:

table_name = 'revenues'
table_name = 'revenues'
metadata = MetaData(bind=engine)
metadata.reflect()

if table_name in metadata.tables:
query = f"SELECT DISTINCT file_id FROM {table_name}"
database_gd_ids = pd.read_sql_query(query, engine)
return list(database_gd_ids['file_id'])
if table_name in metadata.tables:
query = f"SELECT DISTINCT file_id FROM {table_name}"
database_gd_ids = pd.read_sql_query(query, engine)
return list(database_gd_ids['file_id'])

return []
return []
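
As a brief, illustrative note on how this helper fits the pipeline (the actual call site is in the project's `src` code): the distinct `file_id` values already stored in the `revenues` table can be compared against the file IDs coming from Drive, so files that were already loaded are skipped.

```python
# Illustrative usage only; drive_files is a hypothetical list of Drive API file dicts.
from src.database import get_files_ids_from_db

drive_files = [{"id": "abc123", "name": "usd_revenue.csv"}]  # example data

already_loaded = set(get_files_ids_from_db())
new_files = [f for f in drive_files if f["id"] not in already_loaded]
```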