Merge pull request #7 from lealre/feat/updates
Feat/updates
Showing 15 changed files with 439 additions and 248 deletions.
@@ -0,0 +1,5 @@
POSTGRES_USER =
POSTGRES_PASSWORD =
POSTGRES_HOST =
POSTGRES_PORT =
POSTGRES_DB =
@@ -10,6 +10,17 @@ This project also has a CI for every Pull Request made using GitHub Actions, whe
![](pics/etl-diagram.png)

## Table of Contents
- [Context](#context)
- [Contract Schema](#contract-schema)
- [How it works](#how-it-works)
- [Project Folder Structure](#project-folder-structure)
- [How to run this project](#how-to-run-this-project)
- [Run without Airflow](#run-without-airflow)
- [Further Tasks](#further-tasks)

## Context
This project was built in a context where a folder inside Google Drive regularly receives CSV files containing the operational revenue of various companies in three currencies: USD, EUR, and YEN. This data must be stored in a PostgreSQL database with the operational revenue converted to USD, together with the conversion rate used, taken from the last quotation of the month of the date given in the CSV.
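As a rough illustration of the conversion described above (not the repository's actual code; the helper and the example rate are assumptions, while the column names follow the contract schema shown below), the transformation boils down to something like:

```python
# Hypothetical sketch of the conversion step; `rate` would come from a
# currency quotation for the last day of the month given in the CSV.
import pandas as pd


def convert_to_usd(df: pd.DataFrame, rate: float) -> pd.DataFrame:
    """Add the conversion rate and the operational revenue converted to USD."""
    out = df.copy()
    out["convertion_rate"] = rate
    out["usd_converted"] = out["operational_revenue"] * rate
    return out


# Example: a file reported in EUR with a hypothetical 1.08 USD/EUR rate
df = pd.DataFrame(
    {"company": ["acme"], "currency": ["EUR"], "operational_revenue": [100.0]}
)
print(convert_to_usd(df, rate=1.08))
```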
@@ -25,7 +36,7 @@ The project uses the following contract schema to validate the data:
| company | Series[str] | |
| currency | Series[str] | in ['EUR', 'USD', 'YEN'], all values equal |
| operational_revenue | Series[float] | greater than or equal to 0 |
| date | Series[pa.DateTime] | all values equal |
| date | Series[DateTime] | all values equal |
| file_id | Optional[str] | |

* Schema-out: Used when transforming data in Task 03
@@ -35,7 +46,7 @@ The project uses the following contract schema to validate the data:
| company | Series[str] | |
| currency | Series[str] | in ['EUR', 'USD', 'YEN'], all values equal |
| operational_revenue | Series[float] | greater than or equal to 0 |
| date | Series[pa.DateTime] | all values equal |
| date | Series[DateTime] | all values equal |
| file_id | Series[str] | |
| convertion_rate | Series[float] | greater than or equal to 0 |
| usd_converted | Series[float] | greater than or equal to 0 |
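The contract above appears to be a pandera schema (note the `Series[pa.DateTime]` type). Purely as an illustration, the schema-out contract could be written roughly as follows; the repository's actual model may differ, and the "all values equal" checks are omitted here:

```python
# Rough sketch of the schema-out contract with pandera; names follow the
# table above, but this is not the repository's exact code.
import pandera as pa
from pandera.typing import DateTime, Series


class SchemaOut(pa.DataFrameModel):
    company: Series[str]
    currency: Series[str] = pa.Field(isin=["EUR", "USD", "YEN"])
    operational_revenue: Series[float] = pa.Field(ge=0)
    date: Series[DateTime]
    file_id: Series[str]
    convertion_rate: Series[float] = pa.Field(ge=0)
    usd_converted: Series[float] = pa.Field(ge=0)
```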
@@ -76,17 +87,23 @@ This final task loads the data into a PostgreSQL database.

### Project Folder Structure
```
.
├── Dockerfile
├── README.md
├── airflow_settings.yaml
├── dags
│   └── dag_etl.py
├── data
│   ├── eur_revenue.csv
│   ├── usd_revenue.csv
│   ├── wrong_data.csv
│   └── yen_revenue.csv
├── dev-requirements.txt
├── docker-compose.yml
├── packages.txt
├── pics
│   └── etl-diagram.png
├── pyproject.toml
├── pytest.ini
├── requirements.txt
├── src
│   ├── __init__.py
@@ -98,7 +115,6 @@ This final task loads the data into a PostgreSQL database.
│   └── transform_utils.py
└── tests
    ├── dags
    │   └── test_dag_example.py
    ├── test_schema_in.py
    └── test_schema_out.py
```
@@ -108,9 +124,10 @@ This final task loads the data into a PostgreSQL database.

All the steps here are intended for a `bash` terminal.
Google Drive API requires a JSON file to authenticate the connection, and although it's not correct, in this project it was uploaded in the root directory, where the file `dags/dag_etl.py` will search for the file. So, to run as it is, you need to upload the JSON file in the root directory with the name `service_account.json`. Make sure that this file is included in `.gitignore`.
Google Drive API requires a JSON file to authenticate the connection, and although it's not correct, in this project it was uploaded in the root directory, where the file `dags/dag_etl.py` will search for the file. So, to run as it is, you need to upload the JSON file in the root directory with the name `service_account.json`. Make sure that this file is included in `.gitignore`. Here are some resources on how to do it: [How to Upload Files Using the Google Drive API in Python](https://ragug.medium.com/how-to-upload-files-using-the-google-drive-api-in-python-ebefdfd63eab), [Using Google Drive API with Python and a Service Account](https://medium.com/@matheodaly/using-google-drive-api-with-python-and-a-service-account-d6ae1f6456c2), and a [video tutorial](https://www.youtube.com/watch?v=tamT_iGoZDQ).
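For reference, a minimal sketch of authenticating with a service-account JSON using google-auth and google-api-python-client is shown below; the exact code in `dags/dag_etl.py` may differ:

```python
# Minimal sketch of service-account authentication with the Drive API;
# assumes the google-auth and google-api-python-client packages are installed.
from google.oauth2 import service_account
from googleapiclient.discovery import build

SCOPES = ["https://www.googleapis.com/auth/drive.readonly"]

creds = service_account.Credentials.from_service_account_file(
    "service_account.json", scopes=SCOPES
)
drive = build("drive", "v3", credentials=creds)

# List a few files visible to the service account as a quick sanity check.
results = drive.files().list(pageSize=10, fields="files(id, name)").execute()
for f in results.get("files", []):
    print(f["id"], f["name"])
```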
The name of the parent folder in Google Drive is set to be `python_to_drive` and the folder from where it will extract the CSV files must be called `Operational Revenue`. You can change the names by just changing the variables `parent_folder_name` and `folder_to_extract_from` in `dags/dag_etl.py`.
The name of the parent folder in Google Drive is set to be `python_to_drive` and the folder from where it will extract the CSV files must be called `Operational Revenue`. You can change the names by just changing the variables `parent_folder_name` and `folder_to_extract_from` in [`dags/dag_etl.py`](dags/dag_etl.py).
You also need to connect the app to your own PostgreSQL database. You can do this by following the steps below:
@@ -124,7 +141,37 @@ git clone https://github.com/lealre/etl-airflow.git
cd etl-airflow
```
1.3 - Create the `.env` file in the root folder, passing the respective keys from your own PostgreSQL database:
1.3 - Here we have two options to connect to PostgreSQL: locally with Docker (1.3.1) or connecting to an existing database using credentials passed in the `.env` file (1.3.2). By default, if no credentials are provided, the program will attempt to connect to the local Docker database automatically, using the credentials specified in [docker-compose.yml](docker-compose.yml).
1.3.1 - Locally using Docker
Create the database and PGAdmin containers with Docker:
```bash
docker compose up -d
```

To access the database:
I. Go to the localhost link in your browser: [http://localhost:8888/](http://localhost:8888/)
II. Access PGAdmin with the credentials specified in the `docker-compose.yml` file:
- Username: `[email protected]`
- Password: `pgadmin`
III. Set the master password (when accessing for the first time).
IV. Right-click on the server to connect PGAdmin to the database.
V. Connect to the database using the credentials defined in the `docker-compose.yml` file:
- Host name: `db`
- Username: `user-name`
- Password: `postgres`
- Database: `db`
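If you prefer the terminal over PGAdmin, a quick sanity check against the running container could look like this (using the container name, user, and database from `docker-compose.yml`):

```bash
# List databases from inside the "db" container; the postgres image ships psql
docker exec -it db psql -U user-name -d db -c "\l"
```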

**OR**
1.3.2 - Connecting to an existing database
Create the `.env` file in the root folder, passing the respective keys from your own PostgreSQL database:
```bash
echo "POSTGRES_USER=<your-database-keys>" >> .env
echo "POSTGRES_PASSWORD=<your-database-keys>" >> .env
@@ -133,16 +180,16 @@ echo "POSTGRES_PORT=<your-database-keys>" >> .env
echo "POSTGRES_DB=<your-database-keys>" >> .env
```

1.4 - Make sure the `.env` file is included in `.gitignore`.
Make sure the `.env` file is included in `.gitignore`.
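If in doubt, a one-liner such as the following appends `.env` to `.gitignore` only when it is not already listed:

```bash
# Add .env to .gitignore unless an exact matching line already exists
grep -qxF '.env' .gitignore || echo '.env' >> .gitignore
```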

From here, we use the Astro CLI to run Airflow with Docker.

1.5 - Install Astro CLI:
1.4 - Install Astro CLI:
```bash
curl -sSL install.astronomer.io | sudo bash -s
```

1.6 - Run Airflow with Docker
1.5 - Run Airflow with Docker
```bash
astro dev start
```
@@ -155,7 +202,7 @@ After these steps, it will automatically open the localhost link with the Airflo

### Run without Airflow
You can also run this project without Airflow. To do it, you should follow all the steps until 1.4, and then do the following:
You can also run this project without Airflow. To do it, you should follow all the steps until 1.3, and then do the following:
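In essence, a manual run (after completing the setup steps below) reuses the same functions the DAG wires together. A rough sketch, with function names and arguments taken from the DAG defaults (the exact signatures may differ):

```python
# Hypothetical manual run of the pipeline, mirroring the DAG's task order.
from src.etl import (
    connect_drive_and_extract_files,
    load_files,
    transform_data,
    validate_data,
)

dfs = connect_drive_and_extract_files(
    "service_account.json", "python_to_drive", "Operational Revenue"
)
dfs = validate_data(dfs)      # check the schema-in contract
dfs = transform_data(dfs)     # convert revenue to USD, check schema-out
load_files(dfs)               # write to PostgreSQL
```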
2.1 - Install Python version 3.11.5:
```bash
@@ -1,46 +1,62 @@
from src.etl import connect_drive_and_extract_files, validate_data, transform_data, load_files
from airflow.decorators import dag, task
from datetime import datetime

parent_folder_name = 'python_to_drive'
folder_to_extract_from = 'Operational Revenue'
from airflow.decorators import dag, task

from src.etl import (
    connect_drive_and_extract_files,
    load_files,
    transform_data,
    validate_data,
)

parent_folder_name = "python_to_drive"
folder_to_extract_from = "Operational Revenue"


@dag(
    dag_id = 'ETL-GDrive-to-PostgreSQL',
    description='Dag to extract data from Google Drive, transform and store in a POstgreSQL',
    schedule= '*/2 * * * *',
    start_date= datetime(2024,4,7),
    catchup= False,
    params= {
        'service_account_path' : 'service_account.json',
        'parent_folder_name' : parent_folder_name,
        'folder_to_extract_from' : folder_to_extract_from
    dag_id="ETL-GDrive-to-PostgreSQL",
    description=(
        "Dag to extract data from Google Drive, "
        "transform and store in a POstgreSQL"
    ),
    schedule="*/2 * * * *",
    start_date=datetime(2024, 4, 7),
    catchup=False,
    params={
        "service_account_path": "service_account.json",
        "parent_folder_name": parent_folder_name,
        "folder_to_extract_from": folder_to_extract_from
    }
)
def pipeline():

    @task(task_id = 'connect-with-drive-and-read-files-info')
    @task(task_id="connect-with-drive-and-read-files-info")
    def task_connect_drive_and_extract_files(**context):
        return connect_drive_and_extract_files(context['params']['service_account_path'], context['params']['parent_folder_name'], context['params']['folder_to_extract_from'])

    @task(task_id = 'validate-data')
        return connect_drive_and_extract_files(
            context["params"]["service_account_path"],
            context["params"]["parent_folder_name"],
            context["params"]["folder_to_extract_from"]
        )

    @task(task_id="validate-data")
    def task_validate_data(list_df):
        return validate_data(list_df)
    @task(task_id = 'transform-data')

    @task(task_id="transform-data")
    def task_transform_data(list_df):
        return transform_data(list_df)

    @task(task_id = 'load-data')
    @task(task_id="load-data")
    def task_load_files(list_df):
        return load_files(list_df)

    task_connect_and_extract = task_connect_drive_and_extract_files()
    task_validate = task_validate_data(task_connect_and_extract)
    task_transform = task_transform_data(task_validate)
    task_load = task_load_files(task_transform)
    task_load = task_load_files(task_transform)

    task_connect_and_extract >> task_validate >> task_transform >> task_load

pipeline()

pipeline()
@@ -0,0 +1,31 @@
version: "3.8"

services:
  db:
    image: postgres
    container_name: db
    restart: always
    ports:
      - "5432:5432"
    environment:
      POSTGRES_DB: db
      POSTGRES_USER: user-name
      POSTGRES_PASSWORD: postgres
    volumes:
      - local_pgdata:/var/lib/postgresql/data

  pgadmin:
    image: dpage/pgadmin4
    container_name: pgadmin4_container
    restart: always
    ports:
      - "8888:80"
    environment:
      PGADMIN_DEFAULT_EMAIL: [email protected]
      PGADMIN_DEFAULT_PASSWORD: pgadmin
    volumes:
      - pgadmin-data:/var/lib/pgadmin

volumes:
  local_pgdata:
  pgadmin-data:
@@ -1,32 +1,34 @@
from dotenv import load_dotenv
from sqlalchemy import create_engine
import pandas as pd
from sqlalchemy import MetaData
import os
from typing import Union

import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import MetaData, create_engine

load_dotenv(".env")

POSTGRES_USER = os.getenv('POSTGRES_USER')
POSTGRES_PASSWORD = os.getenv('POSTGRES_PASSWORD')
POSTGRES_HOST = os.getenv('POSTGRES_HOST')
POSTGRES_PORT = os.getenv('POSTGRES_PORT')
POSTGRES_DB = os.getenv('POSTGRES_DB')
POSTGRES_USER = os.getenv('POSTGRES_USER', 'user-name')
POSTGRES_PASSWORD = os.getenv('POSTGRES_PASSWORD', 'postgres')
POSTGRES_HOST = os.getenv('POSTGRES_HOST', 'localhost')
POSTGRES_PORT = int(os.getenv('POSTGRES_PORT', "5432"))
POSTGRES_DB = os.getenv('POSTGRES_DB', 'mydb')

POSTGRES_DATABASE_URL = f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"
POSTGRES_DATABASE_URL = (
    f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}"
    f"@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"
)

engine = create_engine(POSTGRES_DATABASE_URL)


def get_files_ids_from_db() -> Union[list[str], list[None]]:

    table_name = 'revenues'
    table_name = 'revenues'
    metadata = MetaData(bind=engine)
    metadata.reflect()

    if table_name in metadata.tables:
        query = f"SELECT DISTINCT file_id FROM {table_name}"
        database_gd_ids = pd.read_sql_query(query, engine)
        return list(database_gd_ids['file_id'])
    if table_name in metadata.tables:
        query = f"SELECT DISTINCT file_id FROM {table_name}"
        database_gd_ids = pd.read_sql_query(query, engine)
        return list(database_gd_ids['file_id'])

    return []
    return []
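Purely as an illustration of how this helper might be used upstream (this is not code from the repository), the returned IDs can serve to skip Drive files that were already loaded into the `revenues` table:

```python
# Hypothetical usage: `drive_files` maps Google Drive file IDs to dataframes.
import pandas as pd

drive_files = {"drive-id-1": pd.DataFrame(), "drive-id-2": pd.DataFrame()}

already_loaded = set(get_files_ids_from_db())  # helper defined in the module above
new_files = {fid: df for fid, df in drive_files.items() if fid not in already_loaded}
```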