diff --git a/deployment/.template.env b/deployment/.template.env
index 57a0dfe4..f74e4c2a 100644
--- a/deployment/.template.env
+++ b/deployment/.template.env
@@ -90,3 +90,10 @@ TAHMO_API_PASSWORD=
 # https://windbornesystems.com/ API Credential
 WIND_BORNE_SYSTEMS_USERNAME=
 WIND_BORNE_SYSTEMS_PASSWORD=
+
+# SFTP server config
+SFTP_HOST=127.0.0.1
+SFTP_PORT=2222
+SFTP_USERNAME=user
+SFTP_PASSWORD=password
+SFTP_REMOTE_PATH=upload
diff --git a/deployment/docker/requirements.txt b/deployment/docker/requirements.txt
index 77597398..9acf221b 100644
--- a/deployment/docker/requirements.txt
+++ b/deployment/docker/requirements.txt
@@ -100,6 +100,9 @@ dask-geopandas==0.4.2
 duckdb==1.1.3
 pyarrow==18.1.0
 
+# paramiko (ssh)
+paramiko==3.5.0
+
 # djngo-import-export
 django-import-export==4.3.4
 django-import-export-celery==1.7.1
diff --git a/django_project/core/settings/project.py b/django_project/core/settings/project.py
index a76f4807..56521c02 100644
--- a/django_project/core/settings/project.py
+++ b/django_project/core/settings/project.py
@@ -104,3 +104,12 @@
 )
 
 MIDDLEWARE = MIDDLEWARE + MIDDLEWARE_CELERY
+
+# SFTP settings
+SFTP_HOST = os.getenv("SFTP_HOST", "127.0.0.1")  # Default: localhost
+SFTP_PORT = int(os.getenv("SFTP_PORT", "2222"))  # Default: 2222
+SFTP_USERNAME = os.getenv("SFTP_USERNAME", "user")
+SFTP_PASSWORD = os.getenv("SFTP_PASSWORD", "password")
+SFTP_REMOTE_PATH = os.getenv(
+    "SFTP_REMOTE_PATH", "upload"
+)
diff --git a/django_project/dcas/outputs.py b/django_project/dcas/outputs.py
index 71ac5bf4..872c4b0d 100644
--- a/django_project/dcas/outputs.py
+++ b/django_project/dcas/outputs.py
@@ -5,6 +5,7 @@
 
 .. note:: DCAS Outputs
 """
+import paramiko
 import os
 import shutil
 import fsspec
@@ -26,6 +27,7 @@ class OutputType:
     GRID_DATA = 1
     GRID_CROP_DATA = 2
     FARM_CROP_DATA = 3
+    MESSAGE_DATA = 4
 
 
 class DCASPipelineOutput:
@@ -190,6 +192,47 @@ def _save_grid_data(self, df: pd.DataFrame):
         print(f'writing dataframe to {file_path}')
         df.to_parquet(file_path)
 
+    def _upload_to_sftp(self, local_file):
+        """Upload a CSV file to the configured SFTP server.
+
+        Connection details come from the Django ``SFTP_*`` settings.
+        The upload is best-effort: failures are reported but not
+        re-raised, and the connection is always released.
+        """
+        transport = None
+        sftp = None
+        try:
+            print(f'Connecting to SFTP server at '
+                  f'{settings.SFTP_HOST}:{settings.SFTP_PORT}...')
+            transport = paramiko.Transport(
+                (settings.SFTP_HOST, settings.SFTP_PORT)
+            )
+            transport.connect(
+                username=settings.SFTP_USERNAME,
+                password=settings.SFTP_PASSWORD
+            )
+
+            sftp = paramiko.SFTPClient.from_transport(transport)
+
+            # Ensure correct remote path
+            remote_file_path = (
+                f"{settings.SFTP_REMOTE_PATH}/{os.path.basename(local_file)}"
+            )
+            print(f"Uploading {local_file} to {remote_file_path}...")
+
+            sftp.put(local_file, remote_file_path)  # Upload file
+
+            print("Upload to Docker SFTP successful!")
+        except Exception as e:
+            # Best-effort: report the failure, do not abort the pipeline.
+            print(f"Failed to upload to SFTP: {e}")
+        finally:
+            # Always close, even when connect()/put() raised; otherwise
+            # the paramiko Transport (thread + socket) leaks.
+            if sftp is not None:
+                sftp.close()
+            if transport is not None:
+                transport.close()
+
     def _get_connection(self, s3):
         endpoint = s3['AWS_ENDPOINT_URL']
         if settings.DEBUG:
diff --git a/django_project/dcas/pipeline.py b/django_project/dcas/pipeline.py
index 76d2edf3..cb657e00 100644
--- a/django_project/dcas/pipeline.py
+++ b/django_project/dcas/pipeline.py
@@ -445,6 +445,10 @@ def extract_csv_output(self):
 
         return file_path
 
+    def send_csv_to_sftp(self, file_path):
+        """Upload the given CSV file to SFTP."""
+        self.data_output._upload_to_sftp(file_path)
+
     def run(self):
         """Run data pipeline."""
         self.setup()
@@ -454,7 +458,9 @@ def run(self):
         self.process_grid_crop_data()
         self.process_farm_registry_data()
 
-        self.extract_csv_output()
+        csv_file = self.extract_csv_output()
+
+        self.send_csv_to_sftp(csv_file)
 
         self.cleanup_gdd_matrix()
 
diff --git a/django_project/dcas/tests/test_pipeline_outputs.py b/django_project/dcas/tests/test_pipeline_outputs.py
index d8214af0..af3a74f5 100644
--- a/django_project/dcas/tests/test_pipeline_outputs.py
+++ b/django_project/dcas/tests/test_pipeline_outputs.py
@@ -45,3 +45,62 @@ def test_convert_to_csv(self, mock_duckdb_connect):
         mock_conn.sql.assert_called_once()
         mock_conn.close.assert_called_once()
         self.assertIn('output_20250101.csv', csv_file)
+
+    @patch("paramiko.SFTPClient.from_transport")
+    @patch("paramiko.Transport", autospec=True)
+    @patch("dcas.outputs.settings")
+    def test_upload_to_sftp(
+        self,
+        mock_settings,
+        mock_transport,
+        mock_sftp_from_transport
+    ):
+        """Test that SFTP upload is triggered correctly."""
+        # Set mock environment variables in `settings`
+        mock_settings.SFTP_HOST = "127.0.0.1"
+        mock_settings.SFTP_PORT = 2222
+        mock_settings.SFTP_USERNAME = "user"
+        mock_settings.SFTP_PASSWORD = "password"
+        mock_settings.SFTP_REMOTE_PATH = "upload"
+
+        # Mock the transport instance
+        mock_transport_instance = mock_transport.return_value
+        mock_transport_instance.connect.return_value = None
+
+        # Mock the SFTP client and `put()` method
+        mock_sftp_instance = MagicMock()
+        mock_sftp_from_transport.return_value = mock_sftp_instance
+        mock_sftp_instance.put.return_value = None
+
+        # Initialize the pipeline output
+        pipeline_output = DCASPipelineOutput(request_date="2025-01-15")
+
+        # Create a temporary file for testing
+        test_file = "/tmp/test_message_data.csv"
+        with open(test_file, "w") as f:
+            f.write("test message")
+
+        # Call the method under test
+        pipeline_output._upload_to_sftp(test_file)
+
+        # Assertions
+        # Verify `connect()` was called once
+        mock_transport_instance.connect.assert_called_once_with(
+            username=mock_settings.SFTP_USERNAME,
+            password=mock_settings.SFTP_PASSWORD
+        )
+
+        # Verify `SFTPClient.from_transport()` was called once
+        mock_sftp_from_transport.assert_called_once_with(
+            mock_transport_instance
+        )
+
+        # Verify file was uploaded via `put()`
+        mock_sftp_instance.put.assert_called_once_with(
+            test_file,
+            f'{mock_settings.SFTP_REMOTE_PATH}/test_message_data.csv'
+        )
+
+        # Verify SFTP client & transport were properly closed
+        mock_sftp_instance.close.assert_called_once()
+        mock_transport_instance.close.assert_called_once()