From f5ddb2742611ed70b63fc63d1d2b0780e318e1e2 Mon Sep 17 00:00:00 2001 From: "sola.akanmu" Date: Fri, 29 Mar 2024 14:35:06 +0100 Subject: [PATCH 01/11] data migration scripts --- python/etlv2/.gitignore | 125 ++++++++++ python/etlv2/README.md | 53 +++++ python/etlv2/config.properties | 11 + python/etlv2/migration_job.py | 338 +++++++++++++++++++++++++++ python/etlv2/myenv/bin/Activate.ps1 | 247 ++++++++++++++++++++ python/etlv2/myenv/bin/activate | 69 ++++++ python/etlv2/myenv/bin/activate.csh | 26 +++ python/etlv2/myenv/bin/activate.fish | 69 ++++++ python/etlv2/myenv/bin/dotenv | 8 + python/etlv2/myenv/bin/f2py | 8 + python/etlv2/myenv/bin/pip | 8 + python/etlv2/myenv/bin/pip3 | 8 + python/etlv2/myenv/bin/pip3.11 | 8 + python/etlv2/myenv/bin/python | 1 + python/etlv2/myenv/bin/python3 | 1 + python/etlv2/myenv/bin/python3.11 | 1 + python/etlv2/myenv/pyvenv.cfg | 5 + python/etlv2/requirements.txt | 10 + 18 files changed, 996 insertions(+) create mode 100644 python/etlv2/.gitignore create mode 100644 python/etlv2/README.md create mode 100644 python/etlv2/config.properties create mode 100644 python/etlv2/migration_job.py create mode 100644 python/etlv2/myenv/bin/Activate.ps1 create mode 100644 python/etlv2/myenv/bin/activate create mode 100644 python/etlv2/myenv/bin/activate.csh create mode 100644 python/etlv2/myenv/bin/activate.fish create mode 100755 python/etlv2/myenv/bin/dotenv create mode 100755 python/etlv2/myenv/bin/f2py create mode 100755 python/etlv2/myenv/bin/pip create mode 100755 python/etlv2/myenv/bin/pip3 create mode 100755 python/etlv2/myenv/bin/pip3.11 create mode 120000 python/etlv2/myenv/bin/python create mode 120000 python/etlv2/myenv/bin/python3 create mode 120000 python/etlv2/myenv/bin/python3.11 create mode 100644 python/etlv2/myenv/pyvenv.cfg create mode 100644 python/etlv2/requirements.txt diff --git a/python/etlv2/.gitignore b/python/etlv2/.gitignore new file mode 100644 index 0000000..25be5ab --- /dev/null +++ b/python/etlv2/.gitignore @@ -0,0 +1,125 @@ +# Editors +.vscode/ +.idea/ + +# Vagrant +.vagrant/ + +# Mac/OSX +.DS_Store + +# Windows +Thumbs.db + +# Source for the following rules: https://raw.githubusercontent.com/github/gitignore/master/Python.gitignore +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. 
+*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json diff --git a/python/etlv2/README.md b/python/etlv2/README.md new file mode 100644 index 0000000..8f3004e --- /dev/null +++ b/python/etlv2/README.md @@ -0,0 +1,53 @@ +Run the following command to create a virtual environment (replace myenv with your preferred name): +`python3 -m venv myenv` + +## Activate the virtual environment: + +#### On Windows: `myenv\Scripts\activate` + +#### On macOS/Linux: `source myenv/bin/activate` + +## Install requirements + +`pip install -r requirements.txt` + +## Create an Environment Variable File + +Create a .env file in the project directory to store environment variables. +Open the .env file and add the following key-value configs, update the values. + +``` +SRC_DB_HOST=localhost +SRC_DB_PORT=5432 +SRC_DB_NAME=mydatabase +SRC_DB_USER=myuser +SRC_DB_PASSWORD=mypassword + +DEST_DB_HOST=localhost +DEST_DB_PORT=5432 +DEST_DB_NAME=mydatabase +DEST_DB_USER=myuser +DEST_DB_PASSWORD=mypassword +``` + +## Update config.properties file + +``` +# here job is scheduled to run every 24 hours, set value to preferred frequency +job_interval_seconds=86400 +# set batch size for each run, 1000 here means a limit of 1000 records per batch of query +query_batch_size=1000 +# list of organization_ids separated by commas +organization_ids=15,1,13,14,2,3,4,5,6,30,31,32,11 + +#queries ... 
+list of queries being run for data migration +``` + +## Activate Environment Variables: + +Activate the environment variables by sourcing the .env file in your terminal: `source .env` + +## Run Application: + +Run the application using: `python migration_job.py` diff --git a/python/etlv2/config.properties b/python/etlv2/config.properties new file mode 100644 index 0000000..4f0e4fa --- /dev/null +++ b/python/etlv2/config.properties @@ -0,0 +1,11 @@ +# here job is scheduled to run every 24 hours, set value to preferred frequency +job_interval_seconds=86400 +# set batch size for each run, 1000 here means a limit of 1000 records per batch of query +query_batch_size=1000 +# list of organization_ids separated by commas +organization_ids=15,1,13,14,2,3,4,5,6,30,31,32,11 + +#queries +organization_query=SELECT * FROM entity WHERE id > %s and id in (%s) ORDER BY id LIMIT %s; +planter_query=SELECT * FROM planter WHERE id > %s and organization_id in (%s) ORDER BY id LIMIT %s; +tree_query=SELECT trees.* FROM trees, planter, entity WHERE trees.id > %s and planter_id = planter.id and entity.id = planter.organization_id and entity.id in (%s) ORDER BY trees.id LIMIT %s; \ No newline at end of file diff --git a/python/etlv2/migration_job.py b/python/etlv2/migration_job.py new file mode 100644 index 0000000..8528884 --- /dev/null +++ b/python/etlv2/migration_job.py @@ -0,0 +1,338 @@ +from dotenv import load_dotenv +from apscheduler.schedulers.blocking import BlockingScheduler +import os +import psycopg2 +import pandas as pd +import re +from decimal import Decimal + +load_dotenv(override=True) + +src_db_host = os.environ.get("SRC_DB_HOST") +src_db_port = os.environ.get("SRC_DB_PORT") +src_db_name = os.environ.get("SRC_DB_NAME") +src_db_user = os.environ.get("SRC_DB_USER") +src_db_password = os.environ.get("SRC_DB_PASSWORD") + +dest_db_host = os.environ.get("DEST_DB_HOST") +dest_db_port = os.environ.get("DEST_DB_PORT") +dest_db_name = os.environ.get("DEST_DB_NAME") +dest_db_user = os.environ.get("DEST_DB_USER") +dest_db_password = os.environ.get("DEST_DB_PASSWORD") + +# Database connection details for dbone +dbone_config = { + "host": src_db_host, + "port": src_db_port, + "dbname": src_db_name, + "user": src_db_user, + "password": src_db_password +} + +# Database connection details for dbtwo +dbtwo_config = { + "host": dest_db_host, + "port": dest_db_port, + "dbname": dest_db_name, + "user": dest_db_user, + "password": dest_db_password +} + +#function to read properties file as a json object +def read_properties_file(file_path): + properties = {} + try: + with open(file_path, 'r') as f: + for line in f: + line = line.strip() + if line and not line.startswith('#'): + key, value = line.split('=', 1) + properties[key.strip()] = value.strip() + except FileNotFoundError: + print(f"File {file_path} not found.") + return properties + +print("Loading properties...") +# load properties +loaded_properties = read_properties_file("config.properties") + +def escape_str(value): + return value.replace("'", "''") + +def generate_insert_queries(df, table_name): + try: + # Initialize the query + query = f"INSERT INTO {table_name} (" + + # Get the column names + columns = df.columns.tolist() + + # Add column names to the query + query += ", ".join(columns) + ") VALUES (" + + # Iterate over each row in the dataframe + for index, row in df.iterrows(): + values = [] + for col in columns: + value = row[col] + if isinstance(value, (int, float, Decimal)): + # If value is an integer or float, add it without quotes + 
values.append(f"NULL" if pd.isnull(value) else str(value)) + else: + # Otherwise, add it with quotes + values.append(f"NULL" if pd.isnull(value) else (f"'{value}'" if isinstance(value, pd.Timestamp) else f"'{escape_str(value)}'")) + + query += ", ".join(values) + "), (" + + # Remove the trailing ", (" and add a semicolon at the end + query = query[:-3] + ";" + + except (Exception) as error: + print(f"Error generating insert query: {error}") + + return query + +def generate_update_queries(df, table_name, identifier_column="id"): + try: + query_prefix = f"UPDATE {table_name} SET " + + # Get the column names + columns = df.columns.tolist() + update_queries = [] + + # Iterate over each row in the dataframe + for index, row in df.iterrows(): + identifier_value = row[identifier_column] + updates = [] + for col in columns: + if (col == identifier_column): + continue + value = row[col] + if isinstance(value, (int, float, Decimal)): + # If value is an integer or float, add it without quotes + value = f"NULL" if pd.isnull(value) else str(value) + updates.append(f"{col} = {value}") + else: + # Otherwise, add it with quotes + value = f"NULL" if pd.isnull(value) else (f"'{value}'" if isinstance(value, pd.Timestamp) else f"'{escape_str(value)}'") + updates.append(f"{col} = {value}") + + query = query_prefix + ", ".join(updates) + " WHERE " + f"{identifier_column} = {identifier_value};" + update_queries.append(query) + + except (Exception) as error: + print(f"Error generating update query: {error}") + + return update_queries + +def extract_table_name(query): + # Regular expression pattern to match the table name + pattern = r"from\s+(\w+)(?:\s*,|\s+where|$)" + + # Search for the pattern in the query + match = re.search(pattern, query, re.IGNORECASE) + if match: + return match.group(1) + else: + return None + +def process_new_records(db_config, df_dbone, df_dbtwo, table_name, columns): + # Compare the DataFrames to find new records. 
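For reference, the table-name regex defined just above can be exercised against one of the configured queries. A minimal check (the query text mirrors config.properties; everything else is illustrative):

```python
import re

# First table after FROM, terminated by a comma, WHERE, or end of string,
# matching the pattern used by extract_table_name above.
pattern = r"from\s+(\w+)(?:\s*,|\s+where|$)"
query = "SELECT trees.* FROM trees, planter, entity WHERE trees.id > 0"
print(re.search(pattern, query, re.IGNORECASE).group(1))  # -> trees
```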
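The comparison that follows hinges on pandas' merge indicator: rows present only in the source frame are classified as new records. A self-contained toy example with made-up frames (not the real schema):

```python
import pandas as pd

df_src = pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b", "c"]})
df_dst = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})

# indicator=True adds a "_merge" column; "left_only" marks rows that are
# missing from the destination frame.
merged = df_src.merge(df_dst, on="id", suffixes=("_df1", "_df2"),
                      how="left", indicator=True)
new_rows = merged[merged["_merge"] == "left_only"]
print(new_rows[["id", "name_df1"]])  # only the row with id 3
```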
+ # Merge the two DataFrames with an indicator flag + merged_df = df_dbone.merge(df_dbtwo, on='id', suffixes=("_df1", "_df2"), how='left', indicator=True) + + # Filter out the rows that are only in dataframe1 + new_records = merged_df[merged_df['_merge'] == 'left_only'] + + if not new_records.empty: + new_records.drop(columns=['_merge'], inplace=True) + + # Keep only the columns from dataframe1 + new_records = new_records[[col + '_df1' if col != 'id' else col for col in columns]] + # Rename the columns to remove the suffix + new_records.columns = columns + + # generate insert queries + insert_queries = generate_insert_queries(new_records, table_name) + + # Insert new records + try: + with psycopg2.connect(**db_config) as conn: + with conn.cursor() as cur: + # Execute each insert query + # for query in insert_queries: + cur.execute(insert_queries) + conn.commit() # Commit the transaction + + except (Exception, psycopg2.DatabaseError) as error: + print(f"Error inserting new records: {error}") + +def process_updated_records(db_config, df_dbone, df_dbtwo, table_name, columns): + # Merge both DataFrames on 'id' + merged_df = df_dbone.merge(df_dbtwo, on="id", suffixes=("_df1", "_df2"), indicator=True) + + # Initialize the mask to False for all rows + mask = pd.Series([False] * len(merged_df)) + + # Update the mask for each column (excluding 'id') + for col in columns: + if col != 'id': + # Use the pandas isnull() function to handle None/NaN comparisons + mask |= ((merged_df[f'{col}_df1'] != merged_df[f'{col}_df2']) & + ~(pd.isnull(merged_df[f'{col}_df1']) & pd.isnull(merged_df[f'{col}_df2']))) + + # Apply the mask to filter the merged DataFrame + updated_records = merged_df[mask] + if not updated_records.empty: + updated_records.drop(columns=['_merge'], inplace=True) + + # Keep only the columns from dataframe1 + updated_records = updated_records[[col + '_df1' if col != 'id' else col for col in columns]] + # Rename the columns to remove the suffix + updated_records.columns = columns + + # generate update queries + update_queries = generate_update_queries(updated_records, table_name) + + # Update modified records + try: + with psycopg2.connect(**db_config) as conn: + with conn.cursor() as cur: + # Execute each insert query + for query in update_queries: + cur.execute(query) + conn.commit() # Commit the transaction + + except (Exception, psycopg2.DatabaseError) as error: + print(f"Error updating existing records: {error}") + +def process_data(parsed_query, columns): + try: + # Fetch a batch of records from dbone + with psycopg2.connect(**dbone_config) as conn1: + with conn1.cursor() as cursor1: + cursor1.execute(parsed_query) + records_dbone = cursor1.fetchall() + + with psycopg2.connect(**dbtwo_config) as conn2: + with conn2.cursor() as cursor2: + cursor2.execute(parsed_query) + records_dbtwo = cursor2.fetchall() + + if not records_dbone: + return -1 # No more records to process + + # Load the records into a pandas dataframe + df_dbone = pd.DataFrame(records_dbone, columns=columns) + df_dbtwo = pd.DataFrame(records_dbtwo, columns=columns) + + table_name = extract_table_name(parsed_query) + process_new_records(dbtwo_config, df_dbone, df_dbtwo, table_name, columns) + process_updated_records(dbtwo_config, df_dbone, df_dbtwo, table_name, columns) + + return records_dbone[-1][0] + + except (Exception, psycopg2.DatabaseError) as error: + print(f"Error: {error}") + +def process_organizations(loaded_properties, batch_size): + print("Processing organizations") + # comma-separated organization IDs + 
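The per-table processing functions below fill the configured query templates with the last processed id, the organization id list, and the batch size using old-style %s formatting. A rough sketch with made-up values (the template is copied from config.properties):

```python
template = ("SELECT * FROM planter WHERE id > %s and organization_id in (%s) "
            "ORDER BY id LIMIT %s;")
last_processed_id = 0               # keyset-style cursor: highest id already copied
organization_ids_list = "15,1,13"   # illustrative ids only
batch_size = 1000

parsed_query = template % (last_processed_id, organization_ids_list, batch_size)
print(parsed_query)
# SELECT * FROM planter WHERE id > 0 and organization_id in (15,1,13) ORDER BY id LIMIT 1000;
```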
organization_ids = loaded_properties.get("organization_ids") + # Split the comma-separated IDs into a list + organization_ids_list = ",".join(organization_ids.split(",")) + + # Organization List SQL query + organization_query = loaded_properties.get("organization_query") + + columns = ["id", "type", "name", "first_name", "last_name", "email", "phone", "pwd_reset_required", "website", + "wallet", "password", "salt", "active_contract_id", "offering_pay_to_plant", "tree_validation_contract_id", + "logo_url", "map_name", "stakeholder_uuid"] + + last_processed_id = 0 + while last_processed_id > -1: + parsed_query = organization_query % (last_processed_id, organization_ids_list, batch_size) + # Process data & update the last processed ID + last_processed_id = process_data(parsed_query, columns) + if last_processed_id is None: + print("entity: something went wrong so I could not get last processed record") + break + print(f"process organization iteration ended with last processed id: {last_processed_id}") + + print(f"process organization iterations done") + + +def process_planters(loaded_properties, batch_size): + print("Processing planters") + # comma-separated organization IDs + organization_ids = loaded_properties.get("organization_ids") + # Split the comma-separated IDs into a list + organization_ids_list = ",".join(organization_ids.split(",")) + + # Planter List SQL query + planter_query = loaded_properties.get("planter_query") + + columns = ["id", "first_name", "last_name", "email", "organization", "phone", "pwd_reset_required", "image_url", + "person_id", "organization_id", "image_rotation", "gender", "grower_account_uuid"] + + last_processed_id = 0 + while last_processed_id > -1: + parsed_query = planter_query % (last_processed_id, organization_ids_list, batch_size) + # Process data & update the last processed ID + last_processed_id = process_data(parsed_query, columns) + if last_processed_id is None: + print("planter: something went wrong so I could not get last processed record") + break + print(f"process planter iteration ended with last processed id: {last_processed_id}") + + print(f"process planter iterations done") + +def process_trees(loaded_properties, batch_size): + print("Processing trees") + # comma-separated organization IDs + organization_ids = loaded_properties.get("organization_ids") + # Split the comma-separated IDs into a list + organization_ids_list = ",".join(organization_ids.split(",")) + + # Planter List SQL query + tree_query = loaded_properties.get("tree_query") + + columns = ["id", "time_created", "time_updated", "missing", "priority", "cause_of_death_id", "planter_id", + "primary_location_id", "settings_id", "override_settings_id", "dead", "photo_id", "image_url", + "certificate_id", "estimated_geometric_location", "lat", "lon", "gps_accuracy", "active", + "planter_photo_url", "planter_identifier", "device_id", "sequence", "note", "verified", "uuid", "approved", + "status", "cluster_regions_assigned", "species_id", "planting_organization_id", "payment_id", + "contract_id", "token_issued", "morphology", "age", "species", "capture_approval_tag", "rejection_reason", + "matching_hash", "device_identifier", "images", "domain_specific_data", "token_id", "name", "earnings_id", + "session_id"] + + last_processed_id = 0 + while last_processed_id > -1: + parsed_query = tree_query % (last_processed_id, organization_ids_list, batch_size) + + # Process data & update the last processed ID + last_processed_id = process_data(parsed_query, columns) + if last_processed_id is None: 
+ print("tree: something went wrong so I could not get last processed record") + break + + print(f"process tree iteration ended with last processed id: {last_processed_id}") + + print(f"process tree iterations done") + +def scheduled_job(): + print("Migration job running") + + batch_size = loaded_properties.get("query_batch_size") + process_organizations(loaded_properties, batch_size) + process_planters(loaded_properties, batch_size) + process_trees(loaded_properties, batch_size) + + +if __name__ == "__main__": + print("Starting scheduler...") + + sched = BlockingScheduler() + sched.add_job(scheduled_job, "interval", seconds=int(loaded_properties.get("job_interval_seconds"))) + sched.start() + diff --git a/python/etlv2/myenv/bin/Activate.ps1 b/python/etlv2/myenv/bin/Activate.ps1 new file mode 100644 index 0000000..b49d77b --- /dev/null +++ b/python/etlv2/myenv/bin/Activate.ps1 @@ -0,0 +1,247 @@ +<# +.Synopsis +Activate a Python virtual environment for the current PowerShell session. + +.Description +Pushes the python executable for a virtual environment to the front of the +$Env:PATH environment variable and sets the prompt to signify that you are +in a Python virtual environment. Makes use of the command line switches as +well as the `pyvenv.cfg` file values present in the virtual environment. + +.Parameter VenvDir +Path to the directory that contains the virtual environment to activate. The +default value for this is the parent of the directory that the Activate.ps1 +script is located within. + +.Parameter Prompt +The prompt prefix to display when this virtual environment is activated. By +default, this prompt is the name of the virtual environment folder (VenvDir) +surrounded by parentheses and followed by a single space (ie. '(.venv) '). + +.Example +Activate.ps1 +Activates the Python virtual environment that contains the Activate.ps1 script. + +.Example +Activate.ps1 -Verbose +Activates the Python virtual environment that contains the Activate.ps1 script, +and shows extra information about the activation as it executes. + +.Example +Activate.ps1 -VenvDir C:\Users\MyUser\Common\.venv +Activates the Python virtual environment located in the specified location. + +.Example +Activate.ps1 -Prompt "MyPython" +Activates the Python virtual environment that contains the Activate.ps1 script, +and prefixes the current prompt with the specified string (surrounded in +parentheses) while the virtual environment is active. + +.Notes +On Windows, it may be required to enable this Activate.ps1 script by setting the +execution policy for the user. You can do this by issuing the following PowerShell +command: + +PS C:\> Set-ExecutionPolicy -ExecutionPolicy RemoteSigned -Scope CurrentUser + +For more information on Execution Policies: +https://go.microsoft.com/fwlink/?LinkID=135170 + +#> +Param( + [Parameter(Mandatory = $false)] + [String] + $VenvDir, + [Parameter(Mandatory = $false)] + [String] + $Prompt +) + +<# Function declarations --------------------------------------------------- #> + +<# +.Synopsis +Remove all shell session elements added by the Activate script, including the +addition of the virtual environment's Python executable from the beginning of +the PATH variable. + +.Parameter NonDestructive +If present, do not remove this function from the global namespace for the +session. 
+ +#> +function global:deactivate ([switch]$NonDestructive) { + # Revert to original values + + # The prior prompt: + if (Test-Path -Path Function:_OLD_VIRTUAL_PROMPT) { + Copy-Item -Path Function:_OLD_VIRTUAL_PROMPT -Destination Function:prompt + Remove-Item -Path Function:_OLD_VIRTUAL_PROMPT + } + + # The prior PYTHONHOME: + if (Test-Path -Path Env:_OLD_VIRTUAL_PYTHONHOME) { + Copy-Item -Path Env:_OLD_VIRTUAL_PYTHONHOME -Destination Env:PYTHONHOME + Remove-Item -Path Env:_OLD_VIRTUAL_PYTHONHOME + } + + # The prior PATH: + if (Test-Path -Path Env:_OLD_VIRTUAL_PATH) { + Copy-Item -Path Env:_OLD_VIRTUAL_PATH -Destination Env:PATH + Remove-Item -Path Env:_OLD_VIRTUAL_PATH + } + + # Just remove the VIRTUAL_ENV altogether: + if (Test-Path -Path Env:VIRTUAL_ENV) { + Remove-Item -Path env:VIRTUAL_ENV + } + + # Just remove VIRTUAL_ENV_PROMPT altogether. + if (Test-Path -Path Env:VIRTUAL_ENV_PROMPT) { + Remove-Item -Path env:VIRTUAL_ENV_PROMPT + } + + # Just remove the _PYTHON_VENV_PROMPT_PREFIX altogether: + if (Get-Variable -Name "_PYTHON_VENV_PROMPT_PREFIX" -ErrorAction SilentlyContinue) { + Remove-Variable -Name _PYTHON_VENV_PROMPT_PREFIX -Scope Global -Force + } + + # Leave deactivate function in the global namespace if requested: + if (-not $NonDestructive) { + Remove-Item -Path function:deactivate + } +} + +<# +.Description +Get-PyVenvConfig parses the values from the pyvenv.cfg file located in the +given folder, and returns them in a map. + +For each line in the pyvenv.cfg file, if that line can be parsed into exactly +two strings separated by `=` (with any amount of whitespace surrounding the =) +then it is considered a `key = value` line. The left hand string is the key, +the right hand is the value. + +If the value starts with a `'` or a `"` then the first and last character is +stripped from the value before being captured. + +.Parameter ConfigDir +Path to the directory that contains the `pyvenv.cfg` file. +#> +function Get-PyVenvConfig( + [String] + $ConfigDir +) { + Write-Verbose "Given ConfigDir=$ConfigDir, obtain values in pyvenv.cfg" + + # Ensure the file exists, and issue a warning if it doesn't (but still allow the function to continue). + $pyvenvConfigPath = Join-Path -Resolve -Path $ConfigDir -ChildPath 'pyvenv.cfg' -ErrorAction Continue + + # An empty map will be returned if no config file is found. + $pyvenvConfig = @{ } + + if ($pyvenvConfigPath) { + + Write-Verbose "File exists, parse `key = value` lines" + $pyvenvConfigContent = Get-Content -Path $pyvenvConfigPath + + $pyvenvConfigContent | ForEach-Object { + $keyval = $PSItem -split "\s*=\s*", 2 + if ($keyval[0] -and $keyval[1]) { + $val = $keyval[1] + + # Remove extraneous quotations around a string value. 
+ if ("'""".Contains($val.Substring(0, 1))) { + $val = $val.Substring(1, $val.Length - 2) + } + + $pyvenvConfig[$keyval[0]] = $val + Write-Verbose "Adding Key: '$($keyval[0])'='$val'" + } + } + } + return $pyvenvConfig +} + + +<# Begin Activate script --------------------------------------------------- #> + +# Determine the containing directory of this script +$VenvExecPath = Split-Path -Parent $MyInvocation.MyCommand.Definition +$VenvExecDir = Get-Item -Path $VenvExecPath + +Write-Verbose "Activation script is located in path: '$VenvExecPath'" +Write-Verbose "VenvExecDir Fullname: '$($VenvExecDir.FullName)" +Write-Verbose "VenvExecDir Name: '$($VenvExecDir.Name)" + +# Set values required in priority: CmdLine, ConfigFile, Default +# First, get the location of the virtual environment, it might not be +# VenvExecDir if specified on the command line. +if ($VenvDir) { + Write-Verbose "VenvDir given as parameter, using '$VenvDir' to determine values" +} +else { + Write-Verbose "VenvDir not given as a parameter, using parent directory name as VenvDir." + $VenvDir = $VenvExecDir.Parent.FullName.TrimEnd("\\/") + Write-Verbose "VenvDir=$VenvDir" +} + +# Next, read the `pyvenv.cfg` file to determine any required value such +# as `prompt`. +$pyvenvCfg = Get-PyVenvConfig -ConfigDir $VenvDir + +# Next, set the prompt from the command line, or the config file, or +# just use the name of the virtual environment folder. +if ($Prompt) { + Write-Verbose "Prompt specified as argument, using '$Prompt'" +} +else { + Write-Verbose "Prompt not specified as argument to script, checking pyvenv.cfg value" + if ($pyvenvCfg -and $pyvenvCfg['prompt']) { + Write-Verbose " Setting based on value in pyvenv.cfg='$($pyvenvCfg['prompt'])'" + $Prompt = $pyvenvCfg['prompt']; + } + else { + Write-Verbose " Setting prompt based on parent's directory's name. (Is the directory name passed to venv module when creating the virtual environment)" + Write-Verbose " Got leaf-name of $VenvDir='$(Split-Path -Path $venvDir -Leaf)'" + $Prompt = Split-Path -Path $venvDir -Leaf + } +} + +Write-Verbose "Prompt = '$Prompt'" +Write-Verbose "VenvDir='$VenvDir'" + +# Deactivate any currently active virtual environment, but leave the +# deactivate function in place. +deactivate -nondestructive + +# Now set the environment variable VIRTUAL_ENV, used by many tools to determine +# that there is an activated venv. 
+$env:VIRTUAL_ENV = $VenvDir + +if (-not $Env:VIRTUAL_ENV_DISABLE_PROMPT) { + + Write-Verbose "Setting prompt to '$Prompt'" + + # Set the prompt to include the env name + # Make sure _OLD_VIRTUAL_PROMPT is global + function global:_OLD_VIRTUAL_PROMPT { "" } + Copy-Item -Path function:prompt -Destination function:_OLD_VIRTUAL_PROMPT + New-Variable -Name _PYTHON_VENV_PROMPT_PREFIX -Description "Python virtual environment prompt prefix" -Scope Global -Option ReadOnly -Visibility Public -Value $Prompt + + function global:prompt { + Write-Host -NoNewline -ForegroundColor Green "($_PYTHON_VENV_PROMPT_PREFIX) " + _OLD_VIRTUAL_PROMPT + } + $env:VIRTUAL_ENV_PROMPT = $Prompt +} + +# Clear PYTHONHOME +if (Test-Path -Path Env:PYTHONHOME) { + Copy-Item -Path Env:PYTHONHOME -Destination Env:_OLD_VIRTUAL_PYTHONHOME + Remove-Item -Path Env:PYTHONHOME +} + +# Add the venv to the PATH +Copy-Item -Path Env:PATH -Destination Env:_OLD_VIRTUAL_PATH +$Env:PATH = "$VenvExecDir$([System.IO.Path]::PathSeparator)$Env:PATH" diff --git a/python/etlv2/myenv/bin/activate b/python/etlv2/myenv/bin/activate new file mode 100644 index 0000000..8e45217 --- /dev/null +++ b/python/etlv2/myenv/bin/activate @@ -0,0 +1,69 @@ +# This file must be used with "source bin/activate" *from bash* +# you cannot run it directly + +deactivate () { + # reset old environment variables + if [ -n "${_OLD_VIRTUAL_PATH:-}" ] ; then + PATH="${_OLD_VIRTUAL_PATH:-}" + export PATH + unset _OLD_VIRTUAL_PATH + fi + if [ -n "${_OLD_VIRTUAL_PYTHONHOME:-}" ] ; then + PYTHONHOME="${_OLD_VIRTUAL_PYTHONHOME:-}" + export PYTHONHOME + unset _OLD_VIRTUAL_PYTHONHOME + fi + + # This should detect bash and zsh, which have a hash command that must + # be called to get it to forget past commands. Without forgetting + # past commands the $PATH changes we made may not be respected + if [ -n "${BASH:-}" -o -n "${ZSH_VERSION:-}" ] ; then + hash -r 2> /dev/null + fi + + if [ -n "${_OLD_VIRTUAL_PS1:-}" ] ; then + PS1="${_OLD_VIRTUAL_PS1:-}" + export PS1 + unset _OLD_VIRTUAL_PS1 + fi + + unset VIRTUAL_ENV + unset VIRTUAL_ENV_PROMPT + if [ ! "${1:-}" = "nondestructive" ] ; then + # Self destruct! + unset -f deactivate + fi +} + +# unset irrelevant variables +deactivate nondestructive + +VIRTUAL_ENV="/Users/sola.akanmu/greenstand/treetracker-functions/python/etlv2/myenv" +export VIRTUAL_ENV + +_OLD_VIRTUAL_PATH="$PATH" +PATH="$VIRTUAL_ENV/bin:$PATH" +export PATH + +# unset PYTHONHOME if set +# this will fail if PYTHONHOME is set to the empty string (which is bad anyway) +# could use `if (set -u; : $PYTHONHOME) ;` in bash +if [ -n "${PYTHONHOME:-}" ] ; then + _OLD_VIRTUAL_PYTHONHOME="${PYTHONHOME:-}" + unset PYTHONHOME +fi + +if [ -z "${VIRTUAL_ENV_DISABLE_PROMPT:-}" ] ; then + _OLD_VIRTUAL_PS1="${PS1:-}" + PS1="(myenv) ${PS1:-}" + export PS1 + VIRTUAL_ENV_PROMPT="(myenv) " + export VIRTUAL_ENV_PROMPT +fi + +# This should detect bash and zsh, which have a hash command that must +# be called to get it to forget past commands. Without forgetting +# past commands the $PATH changes we made may not be respected +if [ -n "${BASH:-}" -o -n "${ZSH_VERSION:-}" ] ; then + hash -r 2> /dev/null +fi diff --git a/python/etlv2/myenv/bin/activate.csh b/python/etlv2/myenv/bin/activate.csh new file mode 100644 index 0000000..a92802e --- /dev/null +++ b/python/etlv2/myenv/bin/activate.csh @@ -0,0 +1,26 @@ +# This file must be used with "source bin/activate.csh" *from csh*. +# You cannot run it directly. +# Created by Davide Di Blasi . 
+# Ported to Python 3.3 venv by Andrew Svetlov + +alias deactivate 'test $?_OLD_VIRTUAL_PATH != 0 && setenv PATH "$_OLD_VIRTUAL_PATH" && unset _OLD_VIRTUAL_PATH; rehash; test $?_OLD_VIRTUAL_PROMPT != 0 && set prompt="$_OLD_VIRTUAL_PROMPT" && unset _OLD_VIRTUAL_PROMPT; unsetenv VIRTUAL_ENV; unsetenv VIRTUAL_ENV_PROMPT; test "\!:*" != "nondestructive" && unalias deactivate' + +# Unset irrelevant variables. +deactivate nondestructive + +setenv VIRTUAL_ENV "/Users/sola.akanmu/greenstand/treetracker-functions/python/etlv2/myenv" + +set _OLD_VIRTUAL_PATH="$PATH" +setenv PATH "$VIRTUAL_ENV/bin:$PATH" + + +set _OLD_VIRTUAL_PROMPT="$prompt" + +if (! "$?VIRTUAL_ENV_DISABLE_PROMPT") then + set prompt = "(myenv) $prompt" + setenv VIRTUAL_ENV_PROMPT "(myenv) " +endif + +alias pydoc python -m pydoc + +rehash diff --git a/python/etlv2/myenv/bin/activate.fish b/python/etlv2/myenv/bin/activate.fish new file mode 100644 index 0000000..0dd3170 --- /dev/null +++ b/python/etlv2/myenv/bin/activate.fish @@ -0,0 +1,69 @@ +# This file must be used with "source /bin/activate.fish" *from fish* +# (https://fishshell.com/); you cannot run it directly. + +function deactivate -d "Exit virtual environment and return to normal shell environment" + # reset old environment variables + if test -n "$_OLD_VIRTUAL_PATH" + set -gx PATH $_OLD_VIRTUAL_PATH + set -e _OLD_VIRTUAL_PATH + end + if test -n "$_OLD_VIRTUAL_PYTHONHOME" + set -gx PYTHONHOME $_OLD_VIRTUAL_PYTHONHOME + set -e _OLD_VIRTUAL_PYTHONHOME + end + + if test -n "$_OLD_FISH_PROMPT_OVERRIDE" + set -e _OLD_FISH_PROMPT_OVERRIDE + # prevents error when using nested fish instances (Issue #93858) + if functions -q _old_fish_prompt + functions -e fish_prompt + functions -c _old_fish_prompt fish_prompt + functions -e _old_fish_prompt + end + end + + set -e VIRTUAL_ENV + set -e VIRTUAL_ENV_PROMPT + if test "$argv[1]" != "nondestructive" + # Self-destruct! + functions -e deactivate + end +end + +# Unset irrelevant variables. +deactivate nondestructive + +set -gx VIRTUAL_ENV "/Users/sola.akanmu/greenstand/treetracker-functions/python/etlv2/myenv" + +set -gx _OLD_VIRTUAL_PATH $PATH +set -gx PATH "$VIRTUAL_ENV/bin" $PATH + +# Unset PYTHONHOME if set. +if set -q PYTHONHOME + set -gx _OLD_VIRTUAL_PYTHONHOME $PYTHONHOME + set -e PYTHONHOME +end + +if test -z "$VIRTUAL_ENV_DISABLE_PROMPT" + # fish uses a function instead of an env var to generate the prompt. + + # Save the current fish_prompt function as the function _old_fish_prompt. + functions -c fish_prompt _old_fish_prompt + + # With the original prompt function renamed, we can override with our own. + function fish_prompt + # Save the return status of the last command. + set -l old_status $status + + # Output the venv prompt; color taken from the blue of the Python logo. + printf "%s%s%s" (set_color 4B8BBE) "(myenv) " (set_color normal) + + # Restore the return status of the previous command. + echo "exit $old_status" | . + # Output the original/"old" prompt. 
+ _old_fish_prompt + end + + set -gx _OLD_FISH_PROMPT_OVERRIDE "$VIRTUAL_ENV" + set -gx VIRTUAL_ENV_PROMPT "(myenv) " +end diff --git a/python/etlv2/myenv/bin/dotenv b/python/etlv2/myenv/bin/dotenv new file mode 100755 index 0000000..8679cf5 --- /dev/null +++ b/python/etlv2/myenv/bin/dotenv @@ -0,0 +1,8 @@ +#!/Users/sola.akanmu/greenstand/treetracker-functions/python/etlv2/myenv/bin/python3.11 +# -*- coding: utf-8 -*- +import re +import sys +from dotenv.__main__ import cli +if __name__ == '__main__': + sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0]) + sys.exit(cli()) diff --git a/python/etlv2/myenv/bin/f2py b/python/etlv2/myenv/bin/f2py new file mode 100755 index 0000000..ddf280e --- /dev/null +++ b/python/etlv2/myenv/bin/f2py @@ -0,0 +1,8 @@ +#!/Users/sola.akanmu/greenstand/treetracker-functions/python/etlv2/myenv/bin/python3.11 +# -*- coding: utf-8 -*- +import re +import sys +from numpy.f2py.f2py2e import main +if __name__ == '__main__': + sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0]) + sys.exit(main()) diff --git a/python/etlv2/myenv/bin/pip b/python/etlv2/myenv/bin/pip new file mode 100755 index 0000000..36d0e9a --- /dev/null +++ b/python/etlv2/myenv/bin/pip @@ -0,0 +1,8 @@ +#!/Users/sola.akanmu/greenstand/treetracker-functions/python/etlv2/myenv/bin/python3.11 +# -*- coding: utf-8 -*- +import re +import sys +from pip._internal.cli.main import main +if __name__ == '__main__': + sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0]) + sys.exit(main()) diff --git a/python/etlv2/myenv/bin/pip3 b/python/etlv2/myenv/bin/pip3 new file mode 100755 index 0000000..36d0e9a --- /dev/null +++ b/python/etlv2/myenv/bin/pip3 @@ -0,0 +1,8 @@ +#!/Users/sola.akanmu/greenstand/treetracker-functions/python/etlv2/myenv/bin/python3.11 +# -*- coding: utf-8 -*- +import re +import sys +from pip._internal.cli.main import main +if __name__ == '__main__': + sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0]) + sys.exit(main()) diff --git a/python/etlv2/myenv/bin/pip3.11 b/python/etlv2/myenv/bin/pip3.11 new file mode 100755 index 0000000..36d0e9a --- /dev/null +++ b/python/etlv2/myenv/bin/pip3.11 @@ -0,0 +1,8 @@ +#!/Users/sola.akanmu/greenstand/treetracker-functions/python/etlv2/myenv/bin/python3.11 +# -*- coding: utf-8 -*- +import re +import sys +from pip._internal.cli.main import main +if __name__ == '__main__': + sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0]) + sys.exit(main()) diff --git a/python/etlv2/myenv/bin/python b/python/etlv2/myenv/bin/python new file mode 120000 index 0000000..6e7f3c7 --- /dev/null +++ b/python/etlv2/myenv/bin/python @@ -0,0 +1 @@ +python3.11 \ No newline at end of file diff --git a/python/etlv2/myenv/bin/python3 b/python/etlv2/myenv/bin/python3 new file mode 120000 index 0000000..6e7f3c7 --- /dev/null +++ b/python/etlv2/myenv/bin/python3 @@ -0,0 +1 @@ +python3.11 \ No newline at end of file diff --git a/python/etlv2/myenv/bin/python3.11 b/python/etlv2/myenv/bin/python3.11 new file mode 120000 index 0000000..34556ec --- /dev/null +++ b/python/etlv2/myenv/bin/python3.11 @@ -0,0 +1 @@ +/usr/local/opt/python@3.11/bin/python3.11 \ No newline at end of file diff --git a/python/etlv2/myenv/pyvenv.cfg b/python/etlv2/myenv/pyvenv.cfg new file mode 100644 index 0000000..91e8f84 --- /dev/null +++ b/python/etlv2/myenv/pyvenv.cfg @@ -0,0 +1,5 @@ +home = /usr/local/opt/python@3.11/bin +include-system-site-packages = false +version = 3.11.4 +executable = 
/usr/local/Cellar/python@3.11/3.11.4_1/Frameworks/Python.framework/Versions/3.11/bin/python3.11 +command = /usr/local/opt/python@3.11/bin/python3.11 -m venv /Users/sola.akanmu/greenstand/treetracker-functions/python/etlv2/myenv diff --git a/python/etlv2/requirements.txt b/python/etlv2/requirements.txt new file mode 100644 index 0000000..02f98a6 --- /dev/null +++ b/python/etlv2/requirements.txt @@ -0,0 +1,10 @@ +APScheduler==3.10.4 +numpy==1.26.4 +pandas==2.2.1 +psycopg2==2.9.9 +python-dateutil==2.8.2 +python-dotenv==1.0.1 +pytz==2024.1 +six==1.16.0 +tzdata==2024.1 +tzlocal==5.2 From 18ed179493abe329e93a85468ea90d6b87bc9fe4 Mon Sep 17 00:00:00 2001 From: "sola.akanmu" Date: Sat, 6 Apr 2024 12:15:16 +0200 Subject: [PATCH 02/11] updated queries and added max iterations config --- python/etlv2/README.md | 2 ++ python/etlv2/config.properties | 13 ++++++++----- python/etlv2/migration_job.py | 29 +++++++++++++++++++++++++---- 3 files changed, 35 insertions(+), 9 deletions(-) diff --git a/python/etlv2/README.md b/python/etlv2/README.md index 8f3004e..338cd68 100644 --- a/python/etlv2/README.md +++ b/python/etlv2/README.md @@ -35,6 +35,8 @@ DEST_DB_PASSWORD=mypassword ``` # here job is scheduled to run every 24 hours, set value to preferred frequency job_interval_seconds=86400 +# number of iterations to run (0) means to run continuously, unless manually stopped +max_iterations=0 # set batch size for each run, 1000 here means a limit of 1000 records per batch of query query_batch_size=1000 # list of organization_ids separated by commas diff --git a/python/etlv2/config.properties b/python/etlv2/config.properties index 4f0e4fa..d7e5fce 100644 --- a/python/etlv2/config.properties +++ b/python/etlv2/config.properties @@ -1,11 +1,14 @@ # here job is scheduled to run every 24 hours, set value to preferred frequency -job_interval_seconds=86400 +job_interval_seconds=10 +# number of iterations to run (0) means to run continuously, unless manually stopped +max_iterations=1 # set batch size for each run, 1000 here means a limit of 1000 records per batch of query query_batch_size=1000 # list of organization_ids separated by commas -organization_ids=15,1,13,14,2,3,4,5,6,30,31,32,11 +# organization_ids=15,1,13,14,2,3,4,5,6,30,31,32,11 +organization_ids=3312 #queries -organization_query=SELECT * FROM entity WHERE id > %s and id in (%s) ORDER BY id LIMIT %s; -planter_query=SELECT * FROM planter WHERE id > %s and organization_id in (%s) ORDER BY id LIMIT %s; -tree_query=SELECT trees.* FROM trees, planter, entity WHERE trees.id > %s and planter_id = planter.id and entity.id = planter.organization_id and entity.id in (%s) ORDER BY trees.id LIMIT %s; \ No newline at end of file +organization_query=SELECT * FROM entity WHERE id > %s and (type = 'o' or type = 'O') and id in (%s) ORDER BY id LIMIT %s; +planter_query=SELECT planter.* FROM planter, entity WHERE planter.id > %s and entity.id = planter.organization_id and (entity.type = 'o' or entity.type = 'O') and planter.organization_id in (%s) ORDER BY id LIMIT %s; +tree_query=SELECT trees.* FROM trees, planter, entity WHERE trees.id > %s and planter_id = planter.id and entity.id = planter.organization_id and (entity.type = 'o' or entity.type = 'O') and entity.id in (%s) ORDER BY trees.id LIMIT %s; \ No newline at end of file diff --git a/python/etlv2/migration_job.py b/python/etlv2/migration_job.py index 8528884..c9bfc7c 100644 --- a/python/etlv2/migration_job.py +++ b/python/etlv2/migration_job.py @@ -5,6 +5,7 @@ import pandas as pd import re from decimal import Decimal 
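The max_iterations setting introduced in this commit is enforced further down the diff by shutting the scheduler down once the job has run the configured number of times. A standalone sketch of that shutdown-after-N-runs pattern (names and values here are illustrative, not the patch's own code):

```python
from apscheduler.schedulers.blocking import BlockingScheduler

MAX_ITERATIONS = 3
iterations = 0
sched = BlockingScheduler()

def job():
    global iterations
    iterations += 1
    print(f"run {iterations}")
    if iterations >= MAX_ITERATIONS:
        # shutdown(wait=False) stops scheduling without waiting for running
        # jobs, which lets sched.start() return in the main thread.
        sched.shutdown(wait=False)

sched.add_job(job, "interval", seconds=5)
sched.start()
```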
+import sys load_dotenv(override=True) @@ -38,6 +39,9 @@ "password": dest_db_password } +sched = BlockingScheduler() +iterations = 0 + #function to read properties file as a json object def read_properties_file(file_path): properties = {} @@ -239,6 +243,7 @@ def process_organizations(loaded_properties, batch_size): print("Processing organizations") # comma-separated organization IDs organization_ids = loaded_properties.get("organization_ids") + # Split the comma-separated IDs into a list organization_ids_list = ",".join(organization_ids.split(",")) @@ -320,19 +325,35 @@ def process_trees(loaded_properties, batch_size): print(f"process tree iterations done") +def track_iterations(max_iterations): + global iterations + + iterations += 1 + if iterations >= max_iterations: + print(f"job has now run {iterations} times and is shutting down") + sched.shutdown(wait=False) + raise sys.exit() + + def scheduled_job(): print("Migration job running") batch_size = loaded_properties.get("query_batch_size") + max_iterations = int(loaded_properties.get("max_iterations")) + process_organizations(loaded_properties, batch_size) process_planters(loaded_properties, batch_size) process_trees(loaded_properties, batch_size) + if max_iterations > 0: + track_iterations(max_iterations) if __name__ == "__main__": print("Starting scheduler...") - sched = BlockingScheduler() - sched.add_job(scheduled_job, "interval", seconds=int(loaded_properties.get("job_interval_seconds"))) - sched.start() - + try: + sched.add_job(scheduled_job, "interval", max_instances=1, seconds=int(loaded_properties.get("job_interval_seconds"))) + sched.start() + except Exception as e: + print(f"Error: {e}") + raise SystemExit("Exiting due to the error above") From 7d2623ecca588545d198547ee72c73f1e633ce74 Mon Sep 17 00:00:00 2001 From: "sola.akanmu" Date: Sun, 14 Apr 2024 11:51:07 +0200 Subject: [PATCH 03/11] allow selection of columns to migrate --- python/etlv2/README.md | 13 +++++- python/etlv2/config.properties | 35 ++++++++++++---- python/etlv2/migration_job.py | 76 ++++++++++++++++------------------ 3 files changed, 76 insertions(+), 48 deletions(-) diff --git a/python/etlv2/README.md b/python/etlv2/README.md index 338cd68..d3a1fcc 100644 --- a/python/etlv2/README.md +++ b/python/etlv2/README.md @@ -32,7 +32,11 @@ DEST_DB_PASSWORD=mypassword ## Update config.properties file +### Parsing the config file makes use of configparser, so the square brackets such as [job] represents sections and should not be deleted. In case you are commenting a part of a key/value pair, makes ure to leave a space before. e,g: key=1,23 #456 + ``` +#section name job - job configs +[job] # here job is scheduled to run every 24 hours, set value to preferred frequency job_interval_seconds=86400 # number of iterations to run (0) means to run continuously, unless manually stopped @@ -42,8 +46,15 @@ query_batch_size=1000 # list of organization_ids separated by commas organization_ids=15,1,13,14,2,3,4,5,6,30,31,32,11 -#queries ... 
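As the README text added above notes, this commit switches the config file to configparser sections, with interpolation disabled so the %s placeholders in the queries survive parsing and with inline comments allowed after a space. A small hedged sketch of that parsing behaviour, using an abridged version of the config (not the job's actual code):

```python
import configparser

config = configparser.ConfigParser(inline_comment_prefixes=("#",), interpolation=None)
config.read_string("""
[job]
query_batch_size=1000  # inline comments need a space before the hash
[queries]
organization_query=SELECT [columns] FROM entity WHERE entity.id > %s
[columns]
organization_columns=entity.id, entity.type
""")

print(config["job"]["query_batch_size"])          # 1000 (comment stripped)
print(config["columns"]["organization_columns"])  # entity.id, entity.type
```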
+#section name queries +[queries] list of queries being run for data migration +#[columns] will be replaced by list of columns specified in columns section below +organization_query = select [columns] from table + +#section name columns: columns to migrate +[columns] +list of columns to migrate for each query ``` ## Activate Environment Variables: diff --git a/python/etlv2/config.properties b/python/etlv2/config.properties index d7e5fce..9df7ce0 100644 --- a/python/etlv2/config.properties +++ b/python/etlv2/config.properties @@ -1,14 +1,35 @@ +#section name job - job configs +[job] # here job is scheduled to run every 24 hours, set value to preferred frequency -job_interval_seconds=10 +job_interval_seconds=5 # number of iterations to run (0) means to run continuously, unless manually stopped max_iterations=1 # set batch size for each run, 1000 here means a limit of 1000 records per batch of query query_batch_size=1000 # list of organization_ids separated by commas -# organization_ids=15,1,13,14,2,3,4,5,6,30,31,32,11 -organization_ids=3312 +organization_ids=3312 #15,1,13,14,2,3,4,5,6,30,31,32,11 -#queries -organization_query=SELECT * FROM entity WHERE id > %s and (type = 'o' or type = 'O') and id in (%s) ORDER BY id LIMIT %s; -planter_query=SELECT planter.* FROM planter, entity WHERE planter.id > %s and entity.id = planter.organization_id and (entity.type = 'o' or entity.type = 'O') and planter.organization_id in (%s) ORDER BY id LIMIT %s; -tree_query=SELECT trees.* FROM trees, planter, entity WHERE trees.id > %s and planter_id = planter.id and entity.id = planter.organization_id and (entity.type = 'o' or entity.type = 'O') and entity.id in (%s) ORDER BY trees.id LIMIT %s; \ No newline at end of file +#section name queries +[queries] +#[columns] will be replaced by list of columns specified in columns section below +organization_query=SELECT [columns] FROM entity WHERE entity.id > %s and (type = 'o' or type = 'O') and entity.id in (%s) ORDER BY entity.id LIMIT %s; +planter_query=SELECT [columns] FROM planter, entity WHERE planter.id > %s and entity.id = planter.organization_id and (entity.type = 'o' or entity.type = 'O') and planter.organization_id in (%s) ORDER BY planter.id LIMIT %s; +tree_query=SELECT [columns] FROM trees, planter, entity WHERE trees.id > %s and planter_id = planter.id and entity.id = planter.organization_id and (entity.type = 'o' or entity.type = 'O') and entity.id in (%s) ORDER BY trees.id LIMIT %s; + +#section name columns: columns to migrate +[columns] +organization_columns=entity.id, entity.type, entity.name, entity.first_name, entity.last_name, entity.email, entity.phone, + entity.pwd_reset_required, entity.website, entity.wallet, entity.password, entity.salt, entity.active_contract_id, + entity.offering_pay_to_plant, entity.tree_validation_contract_id, entity.logo_url, entity.map_name, + entity.stakeholder_uuid +planter_columns=planter.id, planter.first_name, planter.last_name, planter.email, planter.organization, planter.phone, + planter.pwd_reset_required, planter.image_url, planter.person_id, planter.organization_id, planter.image_rotation, + planter.gender, planter.grower_account_uuid +tree_columns=trees.id, trees.time_created, trees.time_updated, trees.missing, trees.priority, trees.cause_of_death_id, + trees.planter_id, trees.primary_location_id, trees.settings_id, trees.override_settings_id, trees.dead, trees.photo_id, + trees.image_url, trees.certificate_id, trees.estimated_geometric_location, trees.lat, trees.lon, trees.gps_accuracy, + trees.active, 
trees.planter_photo_url, trees.planter_identifier, trees.device_id, trees.note, trees.verified, + trees.uuid, trees.approved, trees.status, trees.cluster_regions_assigned, trees.species_id, + trees.planting_organization_id, trees.payment_id, trees.contract_id, trees.token_issued, trees.morphology, trees.age, + trees.species, trees.capture_approval_tag, trees.rejection_reason, trees.matching_hash, trees.device_identifier, + trees.images, trees.domain_specific_data, trees.token_id, trees.name, trees.earnings_id, trees.session_id \ No newline at end of file diff --git a/python/etlv2/migration_job.py b/python/etlv2/migration_job.py index c9bfc7c..3a170fd 100644 --- a/python/etlv2/migration_job.py +++ b/python/etlv2/migration_job.py @@ -6,6 +6,7 @@ import re from decimal import Decimal import sys +import configparser load_dotenv(override=True) @@ -44,17 +45,10 @@ #function to read properties file as a json object def read_properties_file(file_path): - properties = {} - try: - with open(file_path, 'r') as f: - for line in f: - line = line.strip() - if line and not line.startswith('#'): - key, value = line.split('=', 1) - properties[key.strip()] = value.strip() - except FileNotFoundError: - print(f"File {file_path} not found.") - return properties + config = configparser.ConfigParser(inline_comment_prefixes=('#'), interpolation=None) + config.read(file_path) + + return config print("Loading properties...") # load properties @@ -242,23 +236,25 @@ def process_data(parsed_query, columns): def process_organizations(loaded_properties, batch_size): print("Processing organizations") # comma-separated organization IDs - organization_ids = loaded_properties.get("organization_ids") + organization_ids = loaded_properties["job"]["organization_ids"] # Split the comma-separated IDs into a list organization_ids_list = ",".join(organization_ids.split(",")) # Organization List SQL query - organization_query = loaded_properties.get("organization_query") - - columns = ["id", "type", "name", "first_name", "last_name", "email", "phone", "pwd_reset_required", "website", - "wallet", "password", "salt", "active_contract_id", "offering_pay_to_plant", "tree_validation_contract_id", - "logo_url", "map_name", "stakeholder_uuid"] - + organization_query = loaded_properties["queries"]["organization_query"] + columns_str = re.sub(r'\s+', '', loaded_properties["columns"]["organization_columns"]) + # Remove the table name and keep only the column names + columns_arr = [col.split(".")[1] for col in columns_str.split(",")] + last_processed_id = 0 while last_processed_id > -1: parsed_query = organization_query % (last_processed_id, organization_ids_list, batch_size) + # Replace the placeholder with the new string + parsed_query = parsed_query.replace("[columns]", columns_str) + # Process data & update the last processed ID - last_processed_id = process_data(parsed_query, columns) + last_processed_id = process_data(parsed_query, columns_arr) if last_processed_id is None: print("entity: something went wrong so I could not get last processed record") break @@ -270,21 +266,24 @@ def process_organizations(loaded_properties, batch_size): def process_planters(loaded_properties, batch_size): print("Processing planters") # comma-separated organization IDs - organization_ids = loaded_properties.get("organization_ids") + organization_ids = loaded_properties["job"]["organization_ids"] # Split the comma-separated IDs into a list organization_ids_list = ",".join(organization_ids.split(",")) # Planter List SQL query - planter_query = 
loaded_properties.get("planter_query") - - columns = ["id", "first_name", "last_name", "email", "organization", "phone", "pwd_reset_required", "image_url", - "person_id", "organization_id", "image_rotation", "gender", "grower_account_uuid"] + planter_query = loaded_properties["queries"]["planter_query"] + columns_str = re.sub(r'\s+', '', loaded_properties["columns"]["planter_columns"]) + # Remove the table name and keep only the column names + columns_arr = [col.split(".")[1] for col in columns_str.split(",")] last_processed_id = 0 while last_processed_id > -1: parsed_query = planter_query % (last_processed_id, organization_ids_list, batch_size) + # Replace the placeholder with the new string + parsed_query = parsed_query.replace("[columns]", columns_str) + # Process data & update the last processed ID - last_processed_id = process_data(parsed_query, columns) + last_processed_id = process_data(parsed_query, columns_arr) if last_processed_id is None: print("planter: something went wrong so I could not get last processed record") break @@ -295,28 +294,25 @@ def process_planters(loaded_properties, batch_size): def process_trees(loaded_properties, batch_size): print("Processing trees") # comma-separated organization IDs - organization_ids = loaded_properties.get("organization_ids") + organization_ids = loaded_properties["job"]["organization_ids"] # Split the comma-separated IDs into a list organization_ids_list = ",".join(organization_ids.split(",")) # Planter List SQL query - tree_query = loaded_properties.get("tree_query") + tree_query = loaded_properties["queries"]["tree_query"] + columns_str = re.sub(r'\s+', '', loaded_properties["columns"]["tree_columns"]) + # Remove the table name and keep only the column names + columns_arr = [col.split(".")[1] for col in columns_str.split(",")] - columns = ["id", "time_created", "time_updated", "missing", "priority", "cause_of_death_id", "planter_id", - "primary_location_id", "settings_id", "override_settings_id", "dead", "photo_id", "image_url", - "certificate_id", "estimated_geometric_location", "lat", "lon", "gps_accuracy", "active", - "planter_photo_url", "planter_identifier", "device_id", "sequence", "note", "verified", "uuid", "approved", - "status", "cluster_regions_assigned", "species_id", "planting_organization_id", "payment_id", - "contract_id", "token_issued", "morphology", "age", "species", "capture_approval_tag", "rejection_reason", - "matching_hash", "device_identifier", "images", "domain_specific_data", "token_id", "name", "earnings_id", - "session_id"] last_processed_id = 0 while last_processed_id > -1: parsed_query = tree_query % (last_processed_id, organization_ids_list, batch_size) - + # Replace the placeholder with the new string + parsed_query = parsed_query.replace("[columns]", columns_str) + # Process data & update the last processed ID - last_processed_id = process_data(parsed_query, columns) + last_processed_id = process_data(parsed_query, columns_arr) if last_processed_id is None: print("tree: something went wrong so I could not get last processed record") break @@ -338,8 +334,8 @@ def track_iterations(max_iterations): def scheduled_job(): print("Migration job running") - batch_size = loaded_properties.get("query_batch_size") - max_iterations = int(loaded_properties.get("max_iterations")) + batch_size = loaded_properties["job"]["query_batch_size"] + max_iterations = int(loaded_properties["job"]["max_iterations"]) process_organizations(loaded_properties, batch_size) process_planters(loaded_properties, batch_size) @@ -352,7 
+348,7 @@ def scheduled_job(): print("Starting scheduler...") try: - sched.add_job(scheduled_job, "interval", max_instances=1, seconds=int(loaded_properties.get("job_interval_seconds"))) + sched.add_job(scheduled_job, "interval", max_instances=1, seconds=int(loaded_properties["job"]["job_interval_seconds"])) sched.start() except Exception as e: print(f"Error: {e}") From bfb6c4cfd4f2a859ab6849304dab658c43770fba Mon Sep 17 00:00:00 2001 From: Zexi Yin Date: Thu, 4 Jul 2024 16:12:07 -0700 Subject: [PATCH 04/11] variable name changes and typo fixed, makes the exit control flow nicer --- python/etlv2/README.md | 2 +- python/etlv2/migration_job.py | 38 +++++++++++++++++------------------ 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/python/etlv2/README.md b/python/etlv2/README.md index d3a1fcc..34d3ce1 100644 --- a/python/etlv2/README.md +++ b/python/etlv2/README.md @@ -32,7 +32,7 @@ DEST_DB_PASSWORD=mypassword ## Update config.properties file -### Parsing the config file makes use of configparser, so the square brackets such as [job] represents sections and should not be deleted. In case you are commenting a part of a key/value pair, makes ure to leave a space before. e,g: key=1,23 #456 +### Parsing the config file makes use of configparser, so the square brackets such as [job] represents sections and should not be deleted. In case you are commenting a part of a key/value pair, makes sure to leave a space before. e,g: key=1,23 #456 ``` #section name job - job configs diff --git a/python/etlv2/migration_job.py b/python/etlv2/migration_job.py index 3a170fd..6eb8b49 100644 --- a/python/etlv2/migration_job.py +++ b/python/etlv2/migration_job.py @@ -23,7 +23,7 @@ dest_db_password = os.environ.get("DEST_DB_PASSWORD") # Database connection details for dbone -dbone_config = { +src_db_config = { "host": src_db_host, "port": src_db_port, "dbname": src_db_name, @@ -32,7 +32,7 @@ } # Database connection details for dbtwo -dbtwo_config = { +dst_db_config = { "host": dest_db_host, "port": dest_db_port, "dbname": dest_db_name, @@ -95,7 +95,7 @@ def generate_update_queries(df, table_name, identifier_column="id"): query_prefix = f"UPDATE {table_name} SET " # Get the column names - columns = df.columns.tolist() + columns = df.columns.tolistupdate_queries() update_queries = [] # Iterate over each row in the dataframe @@ -207,28 +207,28 @@ def process_updated_records(db_config, df_dbone, df_dbtwo, table_name, columns): def process_data(parsed_query, columns): try: # Fetch a batch of records from dbone - with psycopg2.connect(**dbone_config) as conn1: - with conn1.cursor() as cursor1: - cursor1.execute(parsed_query) - records_dbone = cursor1.fetchall() + with psycopg2.connect(**src_db_config) as src_conn: + with src_conn.cursor() as src_cursor: + src_cursor.execute(parsed_query) + records_src_db = src_cursor.fetchall() - with psycopg2.connect(**dbtwo_config) as conn2: - with conn2.cursor() as cursor2: - cursor2.execute(parsed_query) - records_dbtwo = cursor2.fetchall() + with psycopg2.connect(**dst_db_config) as dest_conn: + with dest_conn.cursor() as dest_cursor: + dest_cursor.execute(parsed_query) + records_dst_db = dest_cursor.fetchall() - if not records_dbone: + if not records_src_db: return -1 # No more records to process # Load the records into a pandas dataframe - df_dbone = pd.DataFrame(records_dbone, columns=columns) - df_dbtwo = pd.DataFrame(records_dbtwo, columns=columns) + df_src_db = pd.DataFrame(records_src_db, columns=columns) + df_dst_db = pd.DataFrame(records_dst_db, 
columns=columns) table_name = extract_table_name(parsed_query) - process_new_records(dbtwo_config, df_dbone, df_dbtwo, table_name, columns) - process_updated_records(dbtwo_config, df_dbone, df_dbtwo, table_name, columns) + process_new_records(dst_db_config, df_src_db, df_dst_db, table_name, columns) + process_updated_records(dst_db_config, df_src_db, df_dst_db, table_name, columns) - return records_dbone[-1][0] + return records_src_db[-1][0] except (Exception, psycopg2.DatabaseError) as error: print(f"Error: {error}") @@ -326,10 +326,8 @@ def track_iterations(max_iterations): iterations += 1 if iterations >= max_iterations: - print(f"job has now run {iterations} times and is shutting down") + print(f"job has now run {iterations} times, max_iteration is {max_iterations} and is shutting down") sched.shutdown(wait=False) - raise sys.exit() - def scheduled_job(): print("Migration job running") From ec27c15895ead1801a1b33407ee908323afda022 Mon Sep 17 00:00:00 2001 From: Zexi Yin Date: Sat, 6 Jul 2024 09:56:27 -0700 Subject: [PATCH 05/11] WIP: trying to make the logic nicer --- python/etlv2/migration_job.py | 20 ++++++++++++-------- python/etlv2/myenv/pyvenv.cfg | 6 ++---- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/python/etlv2/migration_job.py b/python/etlv2/migration_job.py index 6eb8b49..70b1ae8 100644 --- a/python/etlv2/migration_job.py +++ b/python/etlv2/migration_job.py @@ -134,10 +134,10 @@ def extract_table_name(query): else: return None -def process_new_records(db_config, df_dbone, df_dbtwo, table_name, columns): +def process_new_records(db_config, df_src_db, df_dst_db, table_name, columns): # Compare the DataFrames to find new records. # Merge the two DataFrames with an indicator flag - merged_df = df_dbone.merge(df_dbtwo, on='id', suffixes=("_df1", "_df2"), how='left', indicator=True) + merged_df = df_src_db.merge(df_dst_db, on='id', suffixes=("_df1", "_df2"), how='left', indicator=True) # Filter out the rows that are only in dataframe1 new_records = merged_df[merged_df['_merge'] == 'left_only'] @@ -159,6 +159,7 @@ def process_new_records(db_config, df_dbone, df_dbtwo, table_name, columns): with conn.cursor() as cur: # Execute each insert query # for query in insert_queries: + print("DEBUG INSERT QUERY:", insert_queries) cur.execute(insert_queries) conn.commit() # Commit the transaction @@ -198,6 +199,7 @@ def process_updated_records(db_config, df_dbone, df_dbtwo, table_name, columns): with conn.cursor() as cur: # Execute each insert query for query in update_queries: + print("DEBUG UPDATE QUERY:", query) cur.execute(query) conn.commit() # Commit the transaction @@ -217,6 +219,8 @@ def process_data(parsed_query, columns): dest_cursor.execute(parsed_query) records_dst_db = dest_cursor.fetchall() + # print("DEBUG, source_record:", records_src_db) + # print("DEBUG, dest_record:", records_dst_db) if not records_src_db: return -1 # No more records to process @@ -252,13 +256,13 @@ def process_organizations(loaded_properties, batch_size): parsed_query = organization_query % (last_processed_id, organization_ids_list, batch_size) # Replace the placeholder with the new string parsed_query = parsed_query.replace("[columns]", columns_str) - + print("DEBUG organization query:", parsed_query, "LAST PROCESSED ID: ", last_processed_id) # Process data & update the last processed ID last_processed_id = process_data(parsed_query, columns_arr) if last_processed_id is None: print("entity: something went wrong so I could not get last processed record") break - print(f"process 
organization iteration ended with last processed id: {last_processed_id}") + # print(f"process organization iteration ended with last processed id: {last_processed_id}") print(f"process organization iterations done") @@ -281,7 +285,7 @@ def process_planters(loaded_properties, batch_size): parsed_query = planter_query % (last_processed_id, organization_ids_list, batch_size) # Replace the placeholder with the new string parsed_query = parsed_query.replace("[columns]", columns_str) - + print("DEBUG planter query:", parsed_query, "LAST PROCESSED ID: ", last_processed_id) # Process data & update the last processed ID last_processed_id = process_data(parsed_query, columns_arr) if last_processed_id is None: @@ -310,7 +314,7 @@ def process_trees(loaded_properties, batch_size): parsed_query = tree_query % (last_processed_id, organization_ids_list, batch_size) # Replace the placeholder with the new string parsed_query = parsed_query.replace("[columns]", columns_str) - + print("DEBUG tree query:", parsed_query, "LAST PROCESSED ID: ", last_processed_id) # Process data & update the last processed ID last_processed_id = process_data(parsed_query, columns_arr) if last_processed_id is None: @@ -344,10 +348,10 @@ def scheduled_job(): if __name__ == "__main__": print("Starting scheduler...") - try: - sched.add_job(scheduled_job, "interval", max_instances=1, seconds=int(loaded_properties["job"]["job_interval_seconds"])) + sched.add_job(scheduled_job, "interval", max_instances=10, seconds=int(loaded_properties["job"]["job_interval_seconds"])) sched.start() + # scheduled_job() except Exception as e: print(f"Error: {e}") raise SystemExit("Exiting due to the error above") diff --git a/python/etlv2/myenv/pyvenv.cfg b/python/etlv2/myenv/pyvenv.cfg index 91e8f84..0537ffc 100644 --- a/python/etlv2/myenv/pyvenv.cfg +++ b/python/etlv2/myenv/pyvenv.cfg @@ -1,5 +1,3 @@ -home = /usr/local/opt/python@3.11/bin +home = /usr/bin include-system-site-packages = false -version = 3.11.4 -executable = /usr/local/Cellar/python@3.11/3.11.4_1/Frameworks/Python.framework/Versions/3.11/bin/python3.11 -command = /usr/local/opt/python@3.11/bin/python3.11 -m venv /Users/sola.akanmu/greenstand/treetracker-functions/python/etlv2/myenv +version = 3.10.12 From 25c75477b6f2e13e7a7fd16f988f0d79b00693c9 Mon Sep 17 00:00:00 2001 From: Zexi Yin Date: Sat, 6 Jul 2024 09:57:45 -0700 Subject: [PATCH 06/11] trying to do a complete rewrite --- python/etlv2/migration_job.py | 445 ++++++++++------------------------ 1 file changed, 129 insertions(+), 316 deletions(-) diff --git a/python/etlv2/migration_job.py b/python/etlv2/migration_job.py index 70b1ae8..3470af4 100644 --- a/python/etlv2/migration_job.py +++ b/python/etlv2/migration_job.py @@ -1,10 +1,14 @@ from dotenv import load_dotenv -from apscheduler.schedulers.blocking import BlockingScheduler +from typing import List, Tuple, Any +# from apscheduler.schedulers.blocking import BlockingScheduler import os import psycopg2 -import pandas as pd -import re -from decimal import Decimal +from psycopg2 import sql +import logging +# import pandas as pd +# import re +# from decimal import Decimal +import collections import sys import configparser @@ -23,7 +27,11 @@ dest_db_password = os.environ.get("DEST_DB_PASSWORD") # Database connection details for dbone +<<<<<<< Updated upstream src_db_config = { +======= +src_db_info = { +>>>>>>> Stashed changes "host": src_db_host, "port": src_db_port, "dbname": src_db_name, @@ -32,7 +40,11 @@ } # Database connection details for dbtwo +<<<<<<< Updated upstream 
dst_db_config = { +======= +dst_db_info = { +>>>>>>> Stashed changes "host": dest_db_host, "port": dest_db_port, "dbname": dest_db_name, @@ -40,318 +52,119 @@ "password": dest_db_password } -sched = BlockingScheduler() -iterations = 0 - -#function to read properties file as a json object -def read_properties_file(file_path): - config = configparser.ConfigParser(inline_comment_prefixes=('#'), interpolation=None) - config.read(file_path) - - return config - -print("Loading properties...") -# load properties -loaded_properties = read_properties_file("config.properties") - -def escape_str(value): - return value.replace("'", "''") - -def generate_insert_queries(df, table_name): - try: - # Initialize the query - query = f"INSERT INTO {table_name} (" - - # Get the column names - columns = df.columns.tolist() - - # Add column names to the query - query += ", ".join(columns) + ") VALUES (" - - # Iterate over each row in the dataframe - for index, row in df.iterrows(): - values = [] - for col in columns: - value = row[col] - if isinstance(value, (int, float, Decimal)): - # If value is an integer or float, add it without quotes - values.append(f"NULL" if pd.isnull(value) else str(value)) - else: - # Otherwise, add it with quotes - values.append(f"NULL" if pd.isnull(value) else (f"'{value}'" if isinstance(value, pd.Timestamp) else f"'{escape_str(value)}'")) - - query += ", ".join(values) + "), (" - - # Remove the trailing ", (" and add a semicolon at the end - query = query[:-3] + ";" - - except (Exception) as error: - print(f"Error generating insert query: {error}") - - return query - -def generate_update_queries(df, table_name, identifier_column="id"): - try: - query_prefix = f"UPDATE {table_name} SET " - - # Get the column names - columns = df.columns.tolistupdate_queries() - update_queries = [] - - # Iterate over each row in the dataframe - for index, row in df.iterrows(): - identifier_value = row[identifier_column] - updates = [] - for col in columns: - if (col == identifier_column): - continue - value = row[col] - if isinstance(value, (int, float, Decimal)): - # If value is an integer or float, add it without quotes - value = f"NULL" if pd.isnull(value) else str(value) - updates.append(f"{col} = {value}") - else: - # Otherwise, add it with quotes - value = f"NULL" if pd.isnull(value) else (f"'{value}'" if isinstance(value, pd.Timestamp) else f"'{escape_str(value)}'") - updates.append(f"{col} = {value}") - - query = query_prefix + ", ".join(updates) + " WHERE " + f"{identifier_column} = {identifier_value};" - update_queries.append(query) - - except (Exception) as error: - print(f"Error generating update query: {error}") - - return update_queries - -def extract_table_name(query): - # Regular expression pattern to match the table name - pattern = r"from\s+(\w+)(?:\s*,|\s+where|$)" - - # Search for the pattern in the query - match = re.search(pattern, query, re.IGNORECASE) - if match: - return match.group(1) - else: - return None - -def process_new_records(db_config, df_src_db, df_dst_db, table_name, columns): - # Compare the DataFrames to find new records. 
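# Aside: a standalone illustration of the merge-indicator pattern the lines
# below rely on; the two frames here are made-up sample data, not taken from
# the migration job itself.
import pandas as pd

src = pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b", "c"]})
dst = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})

# indicator=True adds a "_merge" column valued "left_only", "right_only" or
# "both"; rows tagged "left_only" exist in the source but not the destination,
# i.e. they are the new records to insert.
merged = src.merge(dst, on="id", how="left", suffixes=("_src", "_dst"), indicator=True)
new_rows = merged[merged["_merge"] == "left_only"]
print(new_rows[["id", "name_src"]])  # -> only the record with id 3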
- # Merge the two DataFrames with an indicator flag - merged_df = df_src_db.merge(df_dst_db, on='id', suffixes=("_df1", "_df2"), how='left', indicator=True) - - # Filter out the rows that are only in dataframe1 - new_records = merged_df[merged_df['_merge'] == 'left_only'] - - if not new_records.empty: - new_records.drop(columns=['_merge'], inplace=True) - - # Keep only the columns from dataframe1 - new_records = new_records[[col + '_df1' if col != 'id' else col for col in columns]] - # Rename the columns to remove the suffix - new_records.columns = columns - - # generate insert queries - insert_queries = generate_insert_queries(new_records, table_name) - - # Insert new records - try: - with psycopg2.connect(**db_config) as conn: - with conn.cursor() as cur: - # Execute each insert query - # for query in insert_queries: - print("DEBUG INSERT QUERY:", insert_queries) - cur.execute(insert_queries) - conn.commit() # Commit the transaction - - except (Exception, psycopg2.DatabaseError) as error: - print(f"Error inserting new records: {error}") - -def process_updated_records(db_config, df_dbone, df_dbtwo, table_name, columns): - # Merge both DataFrames on 'id' - merged_df = df_dbone.merge(df_dbtwo, on="id", suffixes=("_df1", "_df2"), indicator=True) - - # Initialize the mask to False for all rows - mask = pd.Series([False] * len(merged_df)) - - # Update the mask for each column (excluding 'id') - for col in columns: - if col != 'id': - # Use the pandas isnull() function to handle None/NaN comparisons - mask |= ((merged_df[f'{col}_df1'] != merged_df[f'{col}_df2']) & - ~(pd.isnull(merged_df[f'{col}_df1']) & pd.isnull(merged_df[f'{col}_df2']))) - - # Apply the mask to filter the merged DataFrame - updated_records = merged_df[mask] - if not updated_records.empty: - updated_records.drop(columns=['_merge'], inplace=True) - - # Keep only the columns from dataframe1 - updated_records = updated_records[[col + '_df1' if col != 'id' else col for col in columns]] - # Rename the columns to remove the suffix - updated_records.columns = columns - - # generate update queries - update_queries = generate_update_queries(updated_records, table_name) - - # Update modified records - try: - with psycopg2.connect(**db_config) as conn: - with conn.cursor() as cur: - # Execute each insert query - for query in update_queries: - print("DEBUG UPDATE QUERY:", query) - cur.execute(query) - conn.commit() # Commit the transaction - - except (Exception, psycopg2.DatabaseError) as error: - print(f"Error updating existing records: {error}") - -def process_data(parsed_query, columns): - try: - # Fetch a batch of records from dbone - with psycopg2.connect(**src_db_config) as src_conn: - with src_conn.cursor() as src_cursor: - src_cursor.execute(parsed_query) - records_src_db = src_cursor.fetchall() - - with psycopg2.connect(**dst_db_config) as dest_conn: - with dest_conn.cursor() as dest_cursor: - dest_cursor.execute(parsed_query) - records_dst_db = dest_cursor.fetchall() - - # print("DEBUG, source_record:", records_src_db) - # print("DEBUG, dest_record:", records_dst_db) - if not records_src_db: - return -1 # No more records to process - - # Load the records into a pandas dataframe - df_src_db = pd.DataFrame(records_src_db, columns=columns) - df_dst_db = pd.DataFrame(records_dst_db, columns=columns) - - table_name = extract_table_name(parsed_query) - process_new_records(dst_db_config, df_src_db, df_dst_db, table_name, columns) - process_updated_records(dst_db_config, df_src_db, df_dst_db, table_name, columns) - - return 
records_src_db[-1][0] - - except (Exception, psycopg2.DatabaseError) as error: - print(f"Error: {error}") - -def process_organizations(loaded_properties, batch_size): - print("Processing organizations") - # comma-separated organization IDs - organization_ids = loaded_properties["job"]["organization_ids"] - - # Split the comma-separated IDs into a list - organization_ids_list = ",".join(organization_ids.split(",")) - - # Organization List SQL query - organization_query = loaded_properties["queries"]["organization_query"] - columns_str = re.sub(r'\s+', '', loaded_properties["columns"]["organization_columns"]) - # Remove the table name and keep only the column names - columns_arr = [col.split(".")[1] for col in columns_str.split(",")] - - last_processed_id = 0 - while last_processed_id > -1: - parsed_query = organization_query % (last_processed_id, organization_ids_list, batch_size) - # Replace the placeholder with the new string - parsed_query = parsed_query.replace("[columns]", columns_str) - print("DEBUG organization query:", parsed_query, "LAST PROCESSED ID: ", last_processed_id) - # Process data & update the last processed ID - last_processed_id = process_data(parsed_query, columns_arr) - if last_processed_id is None: - print("entity: something went wrong so I could not get last processed record") - break - # print(f"process organization iteration ended with last processed id: {last_processed_id}") - - print(f"process organization iterations done") - - -def process_planters(loaded_properties, batch_size): - print("Processing planters") - # comma-separated organization IDs - organization_ids = loaded_properties["job"]["organization_ids"] - # Split the comma-separated IDs into a list - organization_ids_list = ",".join(organization_ids.split(",")) - - # Planter List SQL query - planter_query = loaded_properties["queries"]["planter_query"] - columns_str = re.sub(r'\s+', '', loaded_properties["columns"]["planter_columns"]) - # Remove the table name and keep only the column names - columns_arr = [col.split(".")[1] for col in columns_str.split(",")] - - last_processed_id = 0 - while last_processed_id > -1: - parsed_query = planter_query % (last_processed_id, organization_ids_list, batch_size) - # Replace the placeholder with the new string - parsed_query = parsed_query.replace("[columns]", columns_str) - print("DEBUG planter query:", parsed_query, "LAST PROCESSED ID: ", last_processed_id) - # Process data & update the last processed ID - last_processed_id = process_data(parsed_query, columns_arr) - if last_processed_id is None: - print("planter: something went wrong so I could not get last processed record") - break - print(f"process planter iteration ended with last processed id: {last_processed_id}") - - print(f"process planter iterations done") - -def process_trees(loaded_properties, batch_size): - print("Processing trees") - # comma-separated organization IDs - organization_ids = loaded_properties["job"]["organization_ids"] - # Split the comma-separated IDs into a list - organization_ids_list = ",".join(organization_ids.split(",")) - - # Planter List SQL query - tree_query = loaded_properties["queries"]["tree_query"] - columns_str = re.sub(r'\s+', '', loaded_properties["columns"]["tree_columns"]) - # Remove the table name and keep only the column names - columns_arr = [col.split(".")[1] for col in columns_str.split(",")] - - - last_processed_id = 0 - while last_processed_id > -1: - parsed_query = tree_query % (last_processed_id, organization_ids_list, batch_size) - # Replace the placeholder 
with the new string - parsed_query = parsed_query.replace("[columns]", columns_str) - print("DEBUG tree query:", parsed_query, "LAST PROCESSED ID: ", last_processed_id) - # Process data & update the last processed ID - last_processed_id = process_data(parsed_query, columns_arr) - if last_processed_id is None: - print("tree: something went wrong so I could not get last processed record") - break - - print(f"process tree iteration ended with last processed id: {last_processed_id}") - - print(f"process tree iterations done") - -def track_iterations(max_iterations): - global iterations - - iterations += 1 - if iterations >= max_iterations: - print(f"job has now run {iterations} times, max_iteration is {max_iterations} and is shutting down") - sched.shutdown(wait=False) - -def scheduled_job(): - print("Migration job running") - - batch_size = loaded_properties["job"]["query_batch_size"] - max_iterations = int(loaded_properties["job"]["max_iterations"]) - - process_organizations(loaded_properties, batch_size) - process_planters(loaded_properties, batch_size) - process_trees(loaded_properties, batch_size) - if max_iterations > 0: - track_iterations(max_iterations) - +def fetch_table_schema(src_db_conn, table_name) -> List[Tuple[str, str]]: # returns a list of (column_name, type) tuples + with src_db_conn.cursor() as cursor: + cursor.execute(sql.SQL(""" + SELECT + c.column_name, + c.data_type, + CASE + WHEN c.data_type = 'USER-DEFINED' THEN + (SELECT t.typname FROM pg_catalog.pg_type t WHERE t.oid = a.atttypid) + ELSE c.data_type + END as actual_data_type + FROM + information_schema.columns c + JOIN pg_catalog.pg_class r ON r.relname = c.table_name + JOIN pg_catalog.pg_attribute a ON a.attrelid = r.oid AND a.attname = c.column_name + WHERE + c.table_name = %s + ORDER BY c.ordinal_position; + """), [table_name]) + # cursor.execute(sql.SQL("SELECT column_name, data_type FROM information_schema.columns WHERE table_name = %s"), [table_name]) + table_schema = [(row[0], row[1]) for row in cursor.fetchall()] + print(table_schema) + return table_schema + +def fetch_user_defined_types(src_db_conn): + with src_db_conn.cursor() as cursor: + cursor.execute(""" + SELECT pg_type.typname, pg_enum.enumlabel FROM pg_type JOIN pg_enum ON pg_enum.enumtypid = pg_type.oid; + """) + types = cursor.fetchall() + user_defined_types = collections.defaultdict(list) + for type_name, enum_label in types: + user_defined_types[type_name].append(enum_label) + + return user_defined_types + +def create_user_defined_types(dest_db_conn, user_defined_types): + with dest_db_conn.cursor() as cursor: + for type_name, enum_labels in user_defined_types.items(): + enum_labels = list(set(enum_labels)) + # print(type_name, enum_labels) + check_type_query = sql.SQL("SELECT 1 FROM pg_type WHERE typname = %s;") + cursor.execute(check_type_query, (type_name,)) + if not cursor.fetchone(): + create_type_query = sql.SQL(""" + CREATE TYPE {} AS ENUM ({}); + """).format( + sql.Identifier(type_name), + sql.SQL(', ').join(sql.Literal(label) for label in enum_labels) + ) + cursor.execute(create_type_query) + + dest_db_conn.commit() + +def fetch_table_data(conn, table_name: str, batch_size = 10 ** 6) -> Tuple[List[tuple[Any, ...]], List[str]]: + logger.info(f"Reading from the {table_name} table from the source db") + data = [] + with conn.cursor() as cursor: + cursor.execute(sql.SQL("SELECT * FROM {} LIMIT 2").format(sql.Identifier(table_name))) + col_names = [desc[0] for desc in cursor.description] + while True: + fetched_data = 
cursor.fetchmany(batch_size) + if not fetched_data: + break + else: + data.append(fetched_data) + return data, col_names + +def create_table(dst_db_conn, table_name, table_schema) -> None: + with dst_db_conn.cursor() as cursor: + create_query = f"CREATE TABLE IF NOT EXISTS {table_name} (" + n = len(table_schema) + for i in range(n): + col_name, t = table_schema[i] + if i != n - 1: + create_query += f"{col_name} {t}, " + else: + create_query += f"{col_name} {t}" + create_query += ");" + cursor.execute(create_query) + dst_db_conn.commit() + +def insert_table_data(conn, table_name, col_names, data) -> None: + with conn.cursor() as cur: + insert_query = sql.SQL("INSERT INTO {} VALUES ({})").format( + sql.SQL(table_name), + sql.SQL(', ').join(sql.Placeholder() for _ in col_names) + ) + print(table_name) + print(col_names) + cur.executemany(insert_query, data[0]) + conn.commit() + +# def migrate_table(source_conn, dest_conn, table_name) -> None: +# data, colnames = fetch_table_data(source_conn, table_name) +# create_table(dest_conn, table_name, colnames) +# insert_table_data(dest_conn, table_name, colnames, data) if __name__ == "__main__": - print("Starting scheduler...") + logger = logging.getLogger(__name__) + logging.basicConfig(level=logging.INFO, stream=sys.stdout) + logger.info("Starting the data migration job") + src_conn = psycopg2.connect(**src_db_info) + dest_conn = psycopg2.connect(**dst_db_info) try: - sched.add_job(scheduled_job, "interval", max_instances=10, seconds=int(loaded_properties["job"]["job_interval_seconds"])) - sched.start() - # scheduled_job() - except Exception as e: - print(f"Error: {e}") - raise SystemExit("Exiting due to the error above") + # user_defined_types = fetch_user_defined_types(src_conn) + # create_user_defined_types(dest_conn, user_defined_types) + for table_name in ['entity', 'planter', 'trees']: + data_tuples, col_names = fetch_table_data(src_conn, table_name) + table_schema = fetch_table_schema(src_conn, table_name) + # create_table(dest_conn, table_name, table_schema) # create table in the dest db + col_names = [elem[0] for elem in table_schema] + insert_table_data(dest_conn, table_name, col_names, data_tuples) + finally: + src_conn.close() + dest_conn.close() From 89d583b0c310f0fc7223e84a9a1fc4a3fc48f2c2 Mon Sep 17 00:00:00 2001 From: Zexi Yin Date: Mon, 8 Jul 2024 22:22:20 -0700 Subject: [PATCH 07/11] rewrite the migration process and reconciled table differences in columns --- python/etlv2/.gitignore | 1 + python/etlv2/migration_job.py | 192 +++++++++++++++++++++++----------- 2 files changed, 133 insertions(+), 60 deletions(-) diff --git a/python/etlv2/.gitignore b/python/etlv2/.gitignore index 25be5ab..1277034 100644 --- a/python/etlv2/.gitignore +++ b/python/etlv2/.gitignore @@ -105,6 +105,7 @@ celerybeat-schedule .venv env/ venv/ +myenv/ ENV/ env.bak/ venv.bak/ diff --git a/python/etlv2/migration_job.py b/python/etlv2/migration_job.py index 3470af4..ee31c20 100644 --- a/python/etlv2/migration_job.py +++ b/python/etlv2/migration_job.py @@ -1,16 +1,15 @@ +import collections from dotenv import load_dotenv -from typing import List, Tuple, Any -# from apscheduler.schedulers.blocking import BlockingScheduler +import logging import os import psycopg2 from psycopg2 import sql -import logging +import sys +from typing import List, Tuple, Any # import pandas as pd # import re # from decimal import Decimal -import collections -import sys -import configparser +# from apscheduler.schedulers.blocking import BlockingScheduler load_dotenv(override=True) @@ 
-27,11 +26,8 @@ dest_db_password = os.environ.get("DEST_DB_PASSWORD") # Database connection details for dbone -<<<<<<< Updated upstream -src_db_config = { -======= + src_db_info = { ->>>>>>> Stashed changes "host": src_db_host, "port": src_db_port, "dbname": src_db_name, @@ -40,11 +36,8 @@ } # Database connection details for dbtwo -<<<<<<< Updated upstream -dst_db_config = { -======= + dst_db_info = { ->>>>>>> Stashed changes "host": dest_db_host, "port": dest_db_port, "dbname": dest_db_name, @@ -72,8 +65,7 @@ def fetch_table_schema(src_db_conn, table_name) -> List[Tuple[str, str]]: # retu ORDER BY c.ordinal_position; """), [table_name]) # cursor.execute(sql.SQL("SELECT column_name, data_type FROM information_schema.columns WHERE table_name = %s"), [table_name]) - table_schema = [(row[0], row[1]) for row in cursor.fetchall()] - print(table_schema) + table_schema = [(row[0], row[2]) for row in cursor.fetchall()] return table_schema def fetch_user_defined_types(src_db_conn): @@ -88,37 +80,117 @@ def fetch_user_defined_types(src_db_conn): return user_defined_types -def create_user_defined_types(dest_db_conn, user_defined_types): - with dest_db_conn.cursor() as cursor: - for type_name, enum_labels in user_defined_types.items(): - enum_labels = list(set(enum_labels)) - # print(type_name, enum_labels) - check_type_query = sql.SQL("SELECT 1 FROM pg_type WHERE typname = %s;") - cursor.execute(check_type_query, (type_name,)) - if not cursor.fetchone(): - create_type_query = sql.SQL(""" - CREATE TYPE {} AS ENUM ({}); - """).format( - sql.Identifier(type_name), - sql.SQL(', ').join(sql.Literal(label) for label in enum_labels) - ) - cursor.execute(create_type_query) - - dest_db_conn.commit() - -def fetch_table_data(conn, table_name: str, batch_size = 10 ** 6) -> Tuple[List[tuple[Any, ...]], List[str]]: - logger.info(f"Reading from the {table_name} table from the source db") - data = [] + +def register_all_udts(source_conn, dest_conn): + query = """ + SELECT t.typname AS type_name, + n.nspname AS schema_name, + array_agg(e.enumlabel ORDER BY e.enumsortorder) AS enum_labels + FROM pg_type t + JOIN pg_namespace n ON n.oid = t.typnamespace + JOIN pg_enum e ON t.oid = e.enumtypid + GROUP BY t.typname, n.nspname; + """ + + cursor = source_conn.cursor() + cursor.execute(query) + udt_info = cursor.fetchall() + + for type_name, schema_name, enum_labels in udt_info: + full_type_name = f"{schema_name}.{type_name}" + + psycopg2.extensions.register_type( + psycopg2.extensions.new_type( + (source_conn.get_dsn_parameters()['dbname'] + '.' 
+ full_type_name,), + full_type_name, + lambda value, cursor: value + ) + ) + + cursor.close() + + # Register the UDTs in the destination database + for type_name, schema_name, enum_labels in udt_info: + # full_type_name = f"{schema_name}.{type_name}" + full_type_name = f"{type_name}" + enum_lst = ', '.join([f"'{label}'" for label in enum_labels]) + + create_enum_type_query = f"CREATE TYPE {full_type_name} AS ENUM ({enum_lst});" + udt_query = f""" + DO $$ + BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = '{type_name}') THEN + {create_enum_type_query} + END IF; + END$$; + """ + dest_cursor = dest_conn.cursor() + dest_cursor.execute(udt_query) + dest_cursor.close() + +# only fetch columns that are present in the source table +# transform the data fetched +def fetch_and_transform(conn, table_name: str, src_table_columns: List[str], dest_table_columns: List[str], batch_size = 10 ** 6) -> List[tuple[Any, ...]]: + """ + Only fetch columns that are present in BOTH the source table and the dest table + For columns that only exists in the dest table but not in the source table, + we transform the data fetched from the source table to fit the dest table's schema by appending NULL values. + """ + res_data = [] + request_columns = [col for col in dest_table_columns if col in src_table_columns] # all columns that exist both in the src table and dest table with conn.cursor() as cursor: - cursor.execute(sql.SQL("SELECT * FROM {} LIMIT 2").format(sql.Identifier(table_name))) - col_names = [desc[0] for desc in cursor.description] + request_columns_str = ", ".join(request_columns) if request_columns else "*" + query = f"SELECT {request_columns_str} FROM {table_name};" + cursor.execute(query) while True: fetched_data = cursor.fetchmany(batch_size) if not fetched_data: break else: - data.append(fetched_data) - return data, col_names + for data in fetched_data: # iterate over every row + transformed_data = [] + i = j = 0 + m, n = len(request_columns), len(dest_table_columns) + while i < m and j < n: # for every column of that row + if request_columns[i] == dest_table_columns[j]: + transformed_data.append(data[i]) + i += 1 + j += 1 + else: + transformed_data.append(None) + j += 1 + res_data.append(tuple(transformed_data)) + # for col_val, col_name in zip(data, request_columns): # for every column of that row + # if col_name == "sequence": + # print("reached") + # if col_name not in dest_table_columns: + # transformed_data.append("NULL") + # else: + # transformed_data.append(col_val) + # res_data.append(tuple(transformed_data)) + return res_data + +def fetch_table_columns(conn, table_name: str) -> List[str]: + with conn.cursor() as cursor: + cursor.execute(sql.SQL(""" + SELECT + c.column_name, + c.data_type, + CASE + WHEN c.data_type = 'USER-DEFINED' THEN + (SELECT t.typname FROM pg_catalog.pg_type t WHERE t.oid = a.atttypid) + ELSE c.data_type + END as actual_data_type + FROM + information_schema.columns c + JOIN pg_catalog.pg_class r ON r.relname = c.table_name + JOIN pg_catalog.pg_attribute a ON a.attrelid = r.oid AND a.attname = c.column_name + WHERE + c.table_name = %s + ORDER BY c.ordinal_position; + """), [table_name]) + table_cols = [row[0] for row in cursor.fetchall()] + return table_cols def create_table(dst_db_conn, table_name, table_schema) -> None: with dst_db_conn.cursor() as cursor: @@ -136,35 +208,35 @@ def create_table(dst_db_conn, table_name, table_schema) -> None: def insert_table_data(conn, table_name, col_names, data) -> None: with conn.cursor() as cur: - insert_query = 
sql.SQL("INSERT INTO {} VALUES ({})").format( - sql.SQL(table_name), - sql.SQL(', ').join(sql.Placeholder() for _ in col_names) - ) - print(table_name) - print(col_names) - cur.executemany(insert_query, data[0]) - conn.commit() - -# def migrate_table(source_conn, dest_conn, table_name) -> None: -# data, colnames = fetch_table_data(source_conn, table_name) -# create_table(dest_conn, table_name, colnames) -# insert_table_data(dest_conn, table_name, colnames, data) - + cols = ", ".join(col_names) + placeholders = ", ".join(["%s"] * len(col_names)) + for d in data: + insert_query = f"INSERT INTO {table_name} ({cols}) VALUES ({placeholders}) ON CONFLICT DO NOTHING;" + cur.execute(insert_query, d) + conn.commit() + if __name__ == "__main__": logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO, stream=sys.stdout) logger.info("Starting the data migration job") src_conn = psycopg2.connect(**src_db_info) dest_conn = psycopg2.connect(**dst_db_info) + register_all_udts(src_conn, dest_conn) + + entity_ids = [] try: # user_defined_types = fetch_user_defined_types(src_conn) # create_user_defined_types(dest_conn, user_defined_types) for table_name in ['entity', 'planter', 'trees']: - data_tuples, col_names = fetch_table_data(src_conn, table_name) - table_schema = fetch_table_schema(src_conn, table_name) - # create_table(dest_conn, table_name, table_schema) # create table in the dest db - col_names = [elem[0] for elem in table_schema] - insert_table_data(dest_conn, table_name, col_names, data_tuples) + src_table_columns = fetch_table_columns(src_conn, table_name) + dest_table_columns = fetch_table_columns(dest_conn, table_name) + transformed_tuples = fetch_and_transform(src_conn, table_name, src_table_columns, dest_table_columns) + # print(table_name, len(dest_table_columns), len(transformed_tuples[0])) + insert_table_data(dest_conn, table_name, dest_table_columns, transformed_tuples) + # table_schema = fetch_table_schema(dest_conn, table_name) + # new_table_name = table_name + "_new" + # create_table(dest_conn, new_table_name, table_schema) # create table in the dest db + # col_names = [elem[0] for elem in table_schema] finally: src_conn.close() dest_conn.close() From 21bbc8859c6b3a79f86a685ccc226e2bf1422cb7 Mon Sep 17 00:00:00 2001 From: Zexi Yin Date: Mon, 8 Jul 2024 22:24:58 -0700 Subject: [PATCH 08/11] removed comments --- python/etlv2/migration_job.py | 21 ++------------------- 1 file changed, 2 insertions(+), 19 deletions(-) diff --git a/python/etlv2/migration_job.py b/python/etlv2/migration_job.py index ee31c20..5d0b73d 100644 --- a/python/etlv2/migration_job.py +++ b/python/etlv2/migration_job.py @@ -6,10 +6,6 @@ from psycopg2 import sql import sys from typing import List, Tuple, Any -# import pandas as pd -# import re -# from decimal import Decimal -# from apscheduler.schedulers.blocking import BlockingScheduler load_dotenv(override=True) @@ -111,7 +107,6 @@ def register_all_udts(source_conn, dest_conn): # Register the UDTs in the destination database for type_name, schema_name, enum_labels in udt_info: - # full_type_name = f"{schema_name}.{type_name}" full_type_name = f"{type_name}" enum_lst = ', '.join([f"'{label}'" for label in enum_labels]) @@ -160,14 +155,6 @@ def fetch_and_transform(conn, table_name: str, src_table_columns: List[str], des transformed_data.append(None) j += 1 res_data.append(tuple(transformed_data)) - # for col_val, col_name in zip(data, request_columns): # for every column of that row - # if col_name == "sequence": - # print("reached") - # if 
col_name not in dest_table_columns: - # transformed_data.append("NULL") - # else: - # transformed_data.append(col_val) - # res_data.append(tuple(transformed_data)) return res_data def fetch_table_columns(conn, table_name: str) -> List[str]: @@ -225,18 +212,14 @@ def insert_table_data(conn, table_name, col_names, data) -> None: entity_ids = [] try: - # user_defined_types = fetch_user_defined_types(src_conn) + # the following is needed if we need to create user defined types that isn't present the the dest database + # user_defined_types = fetch_user_defined_types(src_conn) # create_user_defined_types(dest_conn, user_defined_types) for table_name in ['entity', 'planter', 'trees']: src_table_columns = fetch_table_columns(src_conn, table_name) dest_table_columns = fetch_table_columns(dest_conn, table_name) transformed_tuples = fetch_and_transform(src_conn, table_name, src_table_columns, dest_table_columns) - # print(table_name, len(dest_table_columns), len(transformed_tuples[0])) insert_table_data(dest_conn, table_name, dest_table_columns, transformed_tuples) - # table_schema = fetch_table_schema(dest_conn, table_name) - # new_table_name = table_name + "_new" - # create_table(dest_conn, new_table_name, table_schema) # create table in the dest db - # col_names = [elem[0] for elem in table_schema] finally: src_conn.close() dest_conn.close() From 652c11b285b23f5f1902e162ecaf26c65517fe96 Mon Sep 17 00:00:00 2001 From: Zexi Yin Date: Sat, 20 Jul 2024 15:26:32 -0700 Subject: [PATCH 09/11] filter data by entity_id --- python/etlv2/.gitignore | 2 ++ python/etlv2/README.md | 33 +++------------------------------ python/etlv2/migration_job.py | 12 ++++++++++-- python/etlv2/requirements.txt | 10 +--------- 4 files changed, 16 insertions(+), 41 deletions(-) diff --git a/python/etlv2/.gitignore b/python/etlv2/.gitignore index 1277034..a60b1a8 100644 --- a/python/etlv2/.gitignore +++ b/python/etlv2/.gitignore @@ -124,3 +124,5 @@ venv.bak/ .mypy_cache/ .dmypy.json dmypy.json + +*.txt diff --git a/python/etlv2/README.md b/python/etlv2/README.md index 34d3ce1..e630105 100644 --- a/python/etlv2/README.md +++ b/python/etlv2/README.md @@ -11,10 +11,10 @@ Run the following command to create a virtual environment (replace myenv with yo `pip install -r requirements.txt` -## Create an Environment Variable File +## Set up Database Access Credential in .env -Create a .env file in the project directory to store environment variables. -Open the .env file and add the following key-value configs, update the values. +Create an .env file in the project directory to configure database login credentials. +Open the .env file and copy the following key-value configs, update the values accordingly. ``` SRC_DB_HOST=localhost @@ -30,33 +30,6 @@ DEST_DB_USER=myuser DEST_DB_PASSWORD=mypassword ``` -## Update config.properties file - -### Parsing the config file makes use of configparser, so the square brackets such as [job] represents sections and should not be deleted. In case you are commenting a part of a key/value pair, makes sure to leave a space before. 
e,g: key=1,23 #456 - -``` -#section name job - job configs -[job] -# here job is scheduled to run every 24 hours, set value to preferred frequency -job_interval_seconds=86400 -# number of iterations to run (0) means to run continuously, unless manually stopped -max_iterations=0 -# set batch size for each run, 1000 here means a limit of 1000 records per batch of query -query_batch_size=1000 -# list of organization_ids separated by commas -organization_ids=15,1,13,14,2,3,4,5,6,30,31,32,11 - -#section name queries -[queries] -list of queries being run for data migration -#[columns] will be replaced by list of columns specified in columns section below -organization_query = select [columns] from table - -#section name columns: columns to migrate -[columns] -list of columns to migrate for each query -``` - ## Activate Environment Variables: Activate the environment variables by sourcing the .env file in your terminal: `source .env` diff --git a/python/etlv2/migration_job.py b/python/etlv2/migration_job.py index 5d0b73d..01341f8 100644 --- a/python/etlv2/migration_job.py +++ b/python/etlv2/migration_job.py @@ -131,11 +131,18 @@ def fetch_and_transform(conn, table_name: str, src_table_columns: List[str], des For columns that only exists in the dest table but not in the source table, we transform the data fetched from the source table to fit the dest table's schema by appending NULL values. """ + assert table_name in TABLES, "you must specify the tables you want to fetch in TABLES" res_data = [] request_columns = [col for col in dest_table_columns if col in src_table_columns] # all columns that exist both in the src table and dest table with conn.cursor() as cursor: request_columns_str = ", ".join(request_columns) if request_columns else "*" query = f"SELECT {request_columns_str} FROM {table_name};" + if entity_ids: # use the specified entity id as filter condition + query = query[:-1] + " WHERE " + filter_column = "id" if table_name == "entity" else "organization_id" if table_name == "planter" else "planter_id" + entity_ids_str = ", ".join(map(str, entity_ids)) + query += f"{filter_column} IN ({entity_ids_str})" + print(query) cursor.execute(query) while True: fetched_data = cursor.fetchmany(batch_size) @@ -210,12 +217,13 @@ def insert_table_data(conn, table_name, col_names, data) -> None: dest_conn = psycopg2.connect(**dst_db_info) register_all_udts(src_conn, dest_conn) - entity_ids = [] + entity_ids = [1] + TABLES = ['entity', 'planter', 'trees'] try: # the following is needed if we need to create user defined types that isn't present the the dest database # user_defined_types = fetch_user_defined_types(src_conn) # create_user_defined_types(dest_conn, user_defined_types) - for table_name in ['entity', 'planter', 'trees']: + for table_name in TABLES: src_table_columns = fetch_table_columns(src_conn, table_name) dest_table_columns = fetch_table_columns(dest_conn, table_name) transformed_tuples = fetch_and_transform(src_conn, table_name, src_table_columns, dest_table_columns) diff --git a/python/etlv2/requirements.txt b/python/etlv2/requirements.txt index 02f98a6..3299fc7 100644 --- a/python/etlv2/requirements.txt +++ b/python/etlv2/requirements.txt @@ -1,10 +1,2 @@ -APScheduler==3.10.4 -numpy==1.26.4 -pandas==2.2.1 psycopg2==2.9.9 -python-dateutil==2.8.2 -python-dotenv==1.0.1 -pytz==2024.1 -six==1.16.0 -tzdata==2024.1 -tzlocal==5.2 +python-dotenv==1.0.1 \ No newline at end of file From f5e39cb5b21b9228762303a54a23b0aa49d4ed44 Mon Sep 17 00:00:00 2001 From: Zexi Yin Date: Sat, 20 Jul 2024 15:36:31 
-0700 Subject: [PATCH 10/11] removed myenv/ --- python/etlv2/.gitignore | 1 - python/etlv2/README.md | 7 +- python/etlv2/myenv/bin/Activate.ps1 | 247 --------------------------- python/etlv2/myenv/bin/activate | 69 -------- python/etlv2/myenv/bin/activate.csh | 26 --- python/etlv2/myenv/bin/activate.fish | 69 -------- python/etlv2/myenv/bin/dotenv | 8 - python/etlv2/myenv/bin/f2py | 8 - python/etlv2/myenv/bin/pip | 8 - python/etlv2/myenv/bin/pip3 | 8 - python/etlv2/myenv/bin/pip3.11 | 8 - python/etlv2/myenv/bin/python | 1 - python/etlv2/myenv/bin/python3 | 1 - python/etlv2/myenv/bin/python3.11 | 1 - python/etlv2/myenv/pyvenv.cfg | 3 - 15 files changed, 5 insertions(+), 460 deletions(-) delete mode 100644 python/etlv2/myenv/bin/Activate.ps1 delete mode 100644 python/etlv2/myenv/bin/activate delete mode 100644 python/etlv2/myenv/bin/activate.csh delete mode 100644 python/etlv2/myenv/bin/activate.fish delete mode 100755 python/etlv2/myenv/bin/dotenv delete mode 100755 python/etlv2/myenv/bin/f2py delete mode 100755 python/etlv2/myenv/bin/pip delete mode 100755 python/etlv2/myenv/bin/pip3 delete mode 100755 python/etlv2/myenv/bin/pip3.11 delete mode 120000 python/etlv2/myenv/bin/python delete mode 120000 python/etlv2/myenv/bin/python3 delete mode 120000 python/etlv2/myenv/bin/python3.11 delete mode 100644 python/etlv2/myenv/pyvenv.cfg diff --git a/python/etlv2/.gitignore b/python/etlv2/.gitignore index a60b1a8..75ca440 100644 --- a/python/etlv2/.gitignore +++ b/python/etlv2/.gitignore @@ -105,7 +105,6 @@ celerybeat-schedule .venv env/ venv/ -myenv/ ENV/ env.bak/ venv.bak/ diff --git a/python/etlv2/README.md b/python/etlv2/README.md index e630105..24a8917 100644 --- a/python/etlv2/README.md +++ b/python/etlv2/README.md @@ -13,8 +13,11 @@ Run the following command to create a virtual environment (replace myenv with yo ## Set up Database Access Credential in .env -Create an .env file in the project directory to configure database login credentials. -Open the .env file and copy the following key-value configs, update the values accordingly. +Create an .env file in the project directory to configure your database login credentials. + +`touch .env` + +Edit the .env file and copy the following key-value configs, update the values according to your actual database access credential. ``` SRC_DB_HOST=localhost diff --git a/python/etlv2/myenv/bin/Activate.ps1 b/python/etlv2/myenv/bin/Activate.ps1 deleted file mode 100644 index b49d77b..0000000 --- a/python/etlv2/myenv/bin/Activate.ps1 +++ /dev/null @@ -1,247 +0,0 @@ -<# -.Synopsis -Activate a Python virtual environment for the current PowerShell session. - -.Description -Pushes the python executable for a virtual environment to the front of the -$Env:PATH environment variable and sets the prompt to signify that you are -in a Python virtual environment. Makes use of the command line switches as -well as the `pyvenv.cfg` file values present in the virtual environment. - -.Parameter VenvDir -Path to the directory that contains the virtual environment to activate. The -default value for this is the parent of the directory that the Activate.ps1 -script is located within. - -.Parameter Prompt -The prompt prefix to display when this virtual environment is activated. By -default, this prompt is the name of the virtual environment folder (VenvDir) -surrounded by parentheses and followed by a single space (ie. '(.venv) '). - -.Example -Activate.ps1 -Activates the Python virtual environment that contains the Activate.ps1 script. 
- -.Example -Activate.ps1 -Verbose -Activates the Python virtual environment that contains the Activate.ps1 script, -and shows extra information about the activation as it executes. - -.Example -Activate.ps1 -VenvDir C:\Users\MyUser\Common\.venv -Activates the Python virtual environment located in the specified location. - -.Example -Activate.ps1 -Prompt "MyPython" -Activates the Python virtual environment that contains the Activate.ps1 script, -and prefixes the current prompt with the specified string (surrounded in -parentheses) while the virtual environment is active. - -.Notes -On Windows, it may be required to enable this Activate.ps1 script by setting the -execution policy for the user. You can do this by issuing the following PowerShell -command: - -PS C:\> Set-ExecutionPolicy -ExecutionPolicy RemoteSigned -Scope CurrentUser - -For more information on Execution Policies: -https://go.microsoft.com/fwlink/?LinkID=135170 - -#> -Param( - [Parameter(Mandatory = $false)] - [String] - $VenvDir, - [Parameter(Mandatory = $false)] - [String] - $Prompt -) - -<# Function declarations --------------------------------------------------- #> - -<# -.Synopsis -Remove all shell session elements added by the Activate script, including the -addition of the virtual environment's Python executable from the beginning of -the PATH variable. - -.Parameter NonDestructive -If present, do not remove this function from the global namespace for the -session. - -#> -function global:deactivate ([switch]$NonDestructive) { - # Revert to original values - - # The prior prompt: - if (Test-Path -Path Function:_OLD_VIRTUAL_PROMPT) { - Copy-Item -Path Function:_OLD_VIRTUAL_PROMPT -Destination Function:prompt - Remove-Item -Path Function:_OLD_VIRTUAL_PROMPT - } - - # The prior PYTHONHOME: - if (Test-Path -Path Env:_OLD_VIRTUAL_PYTHONHOME) { - Copy-Item -Path Env:_OLD_VIRTUAL_PYTHONHOME -Destination Env:PYTHONHOME - Remove-Item -Path Env:_OLD_VIRTUAL_PYTHONHOME - } - - # The prior PATH: - if (Test-Path -Path Env:_OLD_VIRTUAL_PATH) { - Copy-Item -Path Env:_OLD_VIRTUAL_PATH -Destination Env:PATH - Remove-Item -Path Env:_OLD_VIRTUAL_PATH - } - - # Just remove the VIRTUAL_ENV altogether: - if (Test-Path -Path Env:VIRTUAL_ENV) { - Remove-Item -Path env:VIRTUAL_ENV - } - - # Just remove VIRTUAL_ENV_PROMPT altogether. - if (Test-Path -Path Env:VIRTUAL_ENV_PROMPT) { - Remove-Item -Path env:VIRTUAL_ENV_PROMPT - } - - # Just remove the _PYTHON_VENV_PROMPT_PREFIX altogether: - if (Get-Variable -Name "_PYTHON_VENV_PROMPT_PREFIX" -ErrorAction SilentlyContinue) { - Remove-Variable -Name _PYTHON_VENV_PROMPT_PREFIX -Scope Global -Force - } - - # Leave deactivate function in the global namespace if requested: - if (-not $NonDestructive) { - Remove-Item -Path function:deactivate - } -} - -<# -.Description -Get-PyVenvConfig parses the values from the pyvenv.cfg file located in the -given folder, and returns them in a map. - -For each line in the pyvenv.cfg file, if that line can be parsed into exactly -two strings separated by `=` (with any amount of whitespace surrounding the =) -then it is considered a `key = value` line. The left hand string is the key, -the right hand is the value. - -If the value starts with a `'` or a `"` then the first and last character is -stripped from the value before being captured. - -.Parameter ConfigDir -Path to the directory that contains the `pyvenv.cfg` file. 
-#> -function Get-PyVenvConfig( - [String] - $ConfigDir -) { - Write-Verbose "Given ConfigDir=$ConfigDir, obtain values in pyvenv.cfg" - - # Ensure the file exists, and issue a warning if it doesn't (but still allow the function to continue). - $pyvenvConfigPath = Join-Path -Resolve -Path $ConfigDir -ChildPath 'pyvenv.cfg' -ErrorAction Continue - - # An empty map will be returned if no config file is found. - $pyvenvConfig = @{ } - - if ($pyvenvConfigPath) { - - Write-Verbose "File exists, parse `key = value` lines" - $pyvenvConfigContent = Get-Content -Path $pyvenvConfigPath - - $pyvenvConfigContent | ForEach-Object { - $keyval = $PSItem -split "\s*=\s*", 2 - if ($keyval[0] -and $keyval[1]) { - $val = $keyval[1] - - # Remove extraneous quotations around a string value. - if ("'""".Contains($val.Substring(0, 1))) { - $val = $val.Substring(1, $val.Length - 2) - } - - $pyvenvConfig[$keyval[0]] = $val - Write-Verbose "Adding Key: '$($keyval[0])'='$val'" - } - } - } - return $pyvenvConfig -} - - -<# Begin Activate script --------------------------------------------------- #> - -# Determine the containing directory of this script -$VenvExecPath = Split-Path -Parent $MyInvocation.MyCommand.Definition -$VenvExecDir = Get-Item -Path $VenvExecPath - -Write-Verbose "Activation script is located in path: '$VenvExecPath'" -Write-Verbose "VenvExecDir Fullname: '$($VenvExecDir.FullName)" -Write-Verbose "VenvExecDir Name: '$($VenvExecDir.Name)" - -# Set values required in priority: CmdLine, ConfigFile, Default -# First, get the location of the virtual environment, it might not be -# VenvExecDir if specified on the command line. -if ($VenvDir) { - Write-Verbose "VenvDir given as parameter, using '$VenvDir' to determine values" -} -else { - Write-Verbose "VenvDir not given as a parameter, using parent directory name as VenvDir." - $VenvDir = $VenvExecDir.Parent.FullName.TrimEnd("\\/") - Write-Verbose "VenvDir=$VenvDir" -} - -# Next, read the `pyvenv.cfg` file to determine any required value such -# as `prompt`. -$pyvenvCfg = Get-PyVenvConfig -ConfigDir $VenvDir - -# Next, set the prompt from the command line, or the config file, or -# just use the name of the virtual environment folder. -if ($Prompt) { - Write-Verbose "Prompt specified as argument, using '$Prompt'" -} -else { - Write-Verbose "Prompt not specified as argument to script, checking pyvenv.cfg value" - if ($pyvenvCfg -and $pyvenvCfg['prompt']) { - Write-Verbose " Setting based on value in pyvenv.cfg='$($pyvenvCfg['prompt'])'" - $Prompt = $pyvenvCfg['prompt']; - } - else { - Write-Verbose " Setting prompt based on parent's directory's name. (Is the directory name passed to venv module when creating the virtual environment)" - Write-Verbose " Got leaf-name of $VenvDir='$(Split-Path -Path $venvDir -Leaf)'" - $Prompt = Split-Path -Path $venvDir -Leaf - } -} - -Write-Verbose "Prompt = '$Prompt'" -Write-Verbose "VenvDir='$VenvDir'" - -# Deactivate any currently active virtual environment, but leave the -# deactivate function in place. -deactivate -nondestructive - -# Now set the environment variable VIRTUAL_ENV, used by many tools to determine -# that there is an activated venv. 
-$env:VIRTUAL_ENV = $VenvDir - -if (-not $Env:VIRTUAL_ENV_DISABLE_PROMPT) { - - Write-Verbose "Setting prompt to '$Prompt'" - - # Set the prompt to include the env name - # Make sure _OLD_VIRTUAL_PROMPT is global - function global:_OLD_VIRTUAL_PROMPT { "" } - Copy-Item -Path function:prompt -Destination function:_OLD_VIRTUAL_PROMPT - New-Variable -Name _PYTHON_VENV_PROMPT_PREFIX -Description "Python virtual environment prompt prefix" -Scope Global -Option ReadOnly -Visibility Public -Value $Prompt - - function global:prompt { - Write-Host -NoNewline -ForegroundColor Green "($_PYTHON_VENV_PROMPT_PREFIX) " - _OLD_VIRTUAL_PROMPT - } - $env:VIRTUAL_ENV_PROMPT = $Prompt -} - -# Clear PYTHONHOME -if (Test-Path -Path Env:PYTHONHOME) { - Copy-Item -Path Env:PYTHONHOME -Destination Env:_OLD_VIRTUAL_PYTHONHOME - Remove-Item -Path Env:PYTHONHOME -} - -# Add the venv to the PATH -Copy-Item -Path Env:PATH -Destination Env:_OLD_VIRTUAL_PATH -$Env:PATH = "$VenvExecDir$([System.IO.Path]::PathSeparator)$Env:PATH" diff --git a/python/etlv2/myenv/bin/activate b/python/etlv2/myenv/bin/activate deleted file mode 100644 index 8e45217..0000000 --- a/python/etlv2/myenv/bin/activate +++ /dev/null @@ -1,69 +0,0 @@ -# This file must be used with "source bin/activate" *from bash* -# you cannot run it directly - -deactivate () { - # reset old environment variables - if [ -n "${_OLD_VIRTUAL_PATH:-}" ] ; then - PATH="${_OLD_VIRTUAL_PATH:-}" - export PATH - unset _OLD_VIRTUAL_PATH - fi - if [ -n "${_OLD_VIRTUAL_PYTHONHOME:-}" ] ; then - PYTHONHOME="${_OLD_VIRTUAL_PYTHONHOME:-}" - export PYTHONHOME - unset _OLD_VIRTUAL_PYTHONHOME - fi - - # This should detect bash and zsh, which have a hash command that must - # be called to get it to forget past commands. Without forgetting - # past commands the $PATH changes we made may not be respected - if [ -n "${BASH:-}" -o -n "${ZSH_VERSION:-}" ] ; then - hash -r 2> /dev/null - fi - - if [ -n "${_OLD_VIRTUAL_PS1:-}" ] ; then - PS1="${_OLD_VIRTUAL_PS1:-}" - export PS1 - unset _OLD_VIRTUAL_PS1 - fi - - unset VIRTUAL_ENV - unset VIRTUAL_ENV_PROMPT - if [ ! "${1:-}" = "nondestructive" ] ; then - # Self destruct! - unset -f deactivate - fi -} - -# unset irrelevant variables -deactivate nondestructive - -VIRTUAL_ENV="/Users/sola.akanmu/greenstand/treetracker-functions/python/etlv2/myenv" -export VIRTUAL_ENV - -_OLD_VIRTUAL_PATH="$PATH" -PATH="$VIRTUAL_ENV/bin:$PATH" -export PATH - -# unset PYTHONHOME if set -# this will fail if PYTHONHOME is set to the empty string (which is bad anyway) -# could use `if (set -u; : $PYTHONHOME) ;` in bash -if [ -n "${PYTHONHOME:-}" ] ; then - _OLD_VIRTUAL_PYTHONHOME="${PYTHONHOME:-}" - unset PYTHONHOME -fi - -if [ -z "${VIRTUAL_ENV_DISABLE_PROMPT:-}" ] ; then - _OLD_VIRTUAL_PS1="${PS1:-}" - PS1="(myenv) ${PS1:-}" - export PS1 - VIRTUAL_ENV_PROMPT="(myenv) " - export VIRTUAL_ENV_PROMPT -fi - -# This should detect bash and zsh, which have a hash command that must -# be called to get it to forget past commands. Without forgetting -# past commands the $PATH changes we made may not be respected -if [ -n "${BASH:-}" -o -n "${ZSH_VERSION:-}" ] ; then - hash -r 2> /dev/null -fi diff --git a/python/etlv2/myenv/bin/activate.csh b/python/etlv2/myenv/bin/activate.csh deleted file mode 100644 index a92802e..0000000 --- a/python/etlv2/myenv/bin/activate.csh +++ /dev/null @@ -1,26 +0,0 @@ -# This file must be used with "source bin/activate.csh" *from csh*. -# You cannot run it directly. -# Created by Davide Di Blasi . 
-# Ported to Python 3.3 venv by Andrew Svetlov - -alias deactivate 'test $?_OLD_VIRTUAL_PATH != 0 && setenv PATH "$_OLD_VIRTUAL_PATH" && unset _OLD_VIRTUAL_PATH; rehash; test $?_OLD_VIRTUAL_PROMPT != 0 && set prompt="$_OLD_VIRTUAL_PROMPT" && unset _OLD_VIRTUAL_PROMPT; unsetenv VIRTUAL_ENV; unsetenv VIRTUAL_ENV_PROMPT; test "\!:*" != "nondestructive" && unalias deactivate' - -# Unset irrelevant variables. -deactivate nondestructive - -setenv VIRTUAL_ENV "/Users/sola.akanmu/greenstand/treetracker-functions/python/etlv2/myenv" - -set _OLD_VIRTUAL_PATH="$PATH" -setenv PATH "$VIRTUAL_ENV/bin:$PATH" - - -set _OLD_VIRTUAL_PROMPT="$prompt" - -if (! "$?VIRTUAL_ENV_DISABLE_PROMPT") then - set prompt = "(myenv) $prompt" - setenv VIRTUAL_ENV_PROMPT "(myenv) " -endif - -alias pydoc python -m pydoc - -rehash diff --git a/python/etlv2/myenv/bin/activate.fish b/python/etlv2/myenv/bin/activate.fish deleted file mode 100644 index 0dd3170..0000000 --- a/python/etlv2/myenv/bin/activate.fish +++ /dev/null @@ -1,69 +0,0 @@ -# This file must be used with "source /bin/activate.fish" *from fish* -# (https://fishshell.com/); you cannot run it directly. - -function deactivate -d "Exit virtual environment and return to normal shell environment" - # reset old environment variables - if test -n "$_OLD_VIRTUAL_PATH" - set -gx PATH $_OLD_VIRTUAL_PATH - set -e _OLD_VIRTUAL_PATH - end - if test -n "$_OLD_VIRTUAL_PYTHONHOME" - set -gx PYTHONHOME $_OLD_VIRTUAL_PYTHONHOME - set -e _OLD_VIRTUAL_PYTHONHOME - end - - if test -n "$_OLD_FISH_PROMPT_OVERRIDE" - set -e _OLD_FISH_PROMPT_OVERRIDE - # prevents error when using nested fish instances (Issue #93858) - if functions -q _old_fish_prompt - functions -e fish_prompt - functions -c _old_fish_prompt fish_prompt - functions -e _old_fish_prompt - end - end - - set -e VIRTUAL_ENV - set -e VIRTUAL_ENV_PROMPT - if test "$argv[1]" != "nondestructive" - # Self-destruct! - functions -e deactivate - end -end - -# Unset irrelevant variables. -deactivate nondestructive - -set -gx VIRTUAL_ENV "/Users/sola.akanmu/greenstand/treetracker-functions/python/etlv2/myenv" - -set -gx _OLD_VIRTUAL_PATH $PATH -set -gx PATH "$VIRTUAL_ENV/bin" $PATH - -# Unset PYTHONHOME if set. -if set -q PYTHONHOME - set -gx _OLD_VIRTUAL_PYTHONHOME $PYTHONHOME - set -e PYTHONHOME -end - -if test -z "$VIRTUAL_ENV_DISABLE_PROMPT" - # fish uses a function instead of an env var to generate the prompt. - - # Save the current fish_prompt function as the function _old_fish_prompt. - functions -c fish_prompt _old_fish_prompt - - # With the original prompt function renamed, we can override with our own. - function fish_prompt - # Save the return status of the last command. - set -l old_status $status - - # Output the venv prompt; color taken from the blue of the Python logo. - printf "%s%s%s" (set_color 4B8BBE) "(myenv) " (set_color normal) - - # Restore the return status of the previous command. - echo "exit $old_status" | . - # Output the original/"old" prompt. 
- _old_fish_prompt - end - - set -gx _OLD_FISH_PROMPT_OVERRIDE "$VIRTUAL_ENV" - set -gx VIRTUAL_ENV_PROMPT "(myenv) " -end diff --git a/python/etlv2/myenv/bin/dotenv b/python/etlv2/myenv/bin/dotenv deleted file mode 100755 index 8679cf5..0000000 --- a/python/etlv2/myenv/bin/dotenv +++ /dev/null @@ -1,8 +0,0 @@ -#!/Users/sola.akanmu/greenstand/treetracker-functions/python/etlv2/myenv/bin/python3.11 -# -*- coding: utf-8 -*- -import re -import sys -from dotenv.__main__ import cli -if __name__ == '__main__': - sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0]) - sys.exit(cli()) diff --git a/python/etlv2/myenv/bin/f2py b/python/etlv2/myenv/bin/f2py deleted file mode 100755 index ddf280e..0000000 --- a/python/etlv2/myenv/bin/f2py +++ /dev/null @@ -1,8 +0,0 @@ -#!/Users/sola.akanmu/greenstand/treetracker-functions/python/etlv2/myenv/bin/python3.11 -# -*- coding: utf-8 -*- -import re -import sys -from numpy.f2py.f2py2e import main -if __name__ == '__main__': - sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0]) - sys.exit(main()) diff --git a/python/etlv2/myenv/bin/pip b/python/etlv2/myenv/bin/pip deleted file mode 100755 index 36d0e9a..0000000 --- a/python/etlv2/myenv/bin/pip +++ /dev/null @@ -1,8 +0,0 @@ -#!/Users/sola.akanmu/greenstand/treetracker-functions/python/etlv2/myenv/bin/python3.11 -# -*- coding: utf-8 -*- -import re -import sys -from pip._internal.cli.main import main -if __name__ == '__main__': - sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0]) - sys.exit(main()) diff --git a/python/etlv2/myenv/bin/pip3 b/python/etlv2/myenv/bin/pip3 deleted file mode 100755 index 36d0e9a..0000000 --- a/python/etlv2/myenv/bin/pip3 +++ /dev/null @@ -1,8 +0,0 @@ -#!/Users/sola.akanmu/greenstand/treetracker-functions/python/etlv2/myenv/bin/python3.11 -# -*- coding: utf-8 -*- -import re -import sys -from pip._internal.cli.main import main -if __name__ == '__main__': - sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0]) - sys.exit(main()) diff --git a/python/etlv2/myenv/bin/pip3.11 b/python/etlv2/myenv/bin/pip3.11 deleted file mode 100755 index 36d0e9a..0000000 --- a/python/etlv2/myenv/bin/pip3.11 +++ /dev/null @@ -1,8 +0,0 @@ -#!/Users/sola.akanmu/greenstand/treetracker-functions/python/etlv2/myenv/bin/python3.11 -# -*- coding: utf-8 -*- -import re -import sys -from pip._internal.cli.main import main -if __name__ == '__main__': - sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0]) - sys.exit(main()) diff --git a/python/etlv2/myenv/bin/python b/python/etlv2/myenv/bin/python deleted file mode 120000 index 6e7f3c7..0000000 --- a/python/etlv2/myenv/bin/python +++ /dev/null @@ -1 +0,0 @@ -python3.11 \ No newline at end of file diff --git a/python/etlv2/myenv/bin/python3 b/python/etlv2/myenv/bin/python3 deleted file mode 120000 index 6e7f3c7..0000000 --- a/python/etlv2/myenv/bin/python3 +++ /dev/null @@ -1 +0,0 @@ -python3.11 \ No newline at end of file diff --git a/python/etlv2/myenv/bin/python3.11 b/python/etlv2/myenv/bin/python3.11 deleted file mode 120000 index 34556ec..0000000 --- a/python/etlv2/myenv/bin/python3.11 +++ /dev/null @@ -1 +0,0 @@ -/usr/local/opt/python@3.11/bin/python3.11 \ No newline at end of file diff --git a/python/etlv2/myenv/pyvenv.cfg b/python/etlv2/myenv/pyvenv.cfg deleted file mode 100644 index 0537ffc..0000000 --- a/python/etlv2/myenv/pyvenv.cfg +++ /dev/null @@ -1,3 +0,0 @@ -home = /usr/bin -include-system-site-packages = false -version = 3.10.12 From 4c37ea715285f77d10af65b32191077f062aab30 Mon Sep 17 00:00:00 
2001 From: Zexi Yin Date: Sat, 20 Jul 2024 15:51:47 -0700 Subject: [PATCH 11/11] clean ups --- python/etlv2/migration_job.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/etlv2/migration_job.py b/python/etlv2/migration_job.py index 01341f8..d84182b 100644 --- a/python/etlv2/migration_job.py +++ b/python/etlv2/migration_job.py @@ -142,7 +142,8 @@ def fetch_and_transform(conn, table_name: str, src_table_columns: List[str], des filter_column = "id" if table_name == "entity" else "organization_id" if table_name == "planter" else "planter_id" entity_ids_str = ", ".join(map(str, entity_ids)) query += f"{filter_column} IN ({entity_ids_str})" - print(query) + else: + logger.warn("User did not specify any entity ids, fetching all columns...") cursor.execute(query) while True: fetched_data = cursor.fetchmany(batch_size) @@ -217,7 +218,7 @@ def insert_table_data(conn, table_name, col_names, data) -> None: dest_conn = psycopg2.connect(**dst_db_info) register_all_udts(src_conn, dest_conn) - entity_ids = [1] + entity_ids = [] TABLES = ['entity', 'planter', 'trees'] try: # the following is needed if we need to create user defined types that isn't present the the dest database
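# Aside: insert_table_data in the final version issues one INSERT per row with
# ON CONFLICT DO NOTHING. For large batches, psycopg2.extras.execute_values can
# send many rows per statement with the same semantics. This is a minimal
# sketch, not part of the patch series; the table and column names passed in
# the example call are illustrative assumptions.
from psycopg2.extras import execute_values

def bulk_insert(conn, table_name, col_names, rows, page_size=1000):
    cols = ", ".join(col_names)
    # execute_values expands the single %s into a multi-row VALUES list and
    # pages the rows in chunks of page_size.
    query = f"INSERT INTO {table_name} ({cols}) VALUES %s ON CONFLICT DO NOTHING"
    with conn.cursor() as cur:
        execute_values(cur, query, rows, page_size=page_size)
    conn.commit()

# Hypothetical usage:
# bulk_insert(dest_conn, "planter", ["id", "first_name"], transformed_tuples)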