Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add message to sftp #359

Merged
merged 7 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions deployment/.template.env
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,10 @@ TAHMO_API_PASSWORD=
# https://windbornesystems.com/ API Credential
WIND_BORNE_SYSTEMS_USERNAME=
WIND_BORNE_SYSTEMS_PASSWORD=

# SFTP server config
SFTP_HOST=127.0.0.1
SFTP_PORT=2222
SFTP_USERNAME=user
SFTP_PASSWORD=password
SFTP_REMOTE_PATH=upload
3 changes: 3 additions & 0 deletions deployment/docker/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ dask-geopandas==0.4.2
duckdb==1.1.3
pyarrow==18.1.0

# paramiko (ssh)
paramiko==3.5.0

# djngo-import-export
django-import-export==4.3.4
django-import-export-celery==1.7.1
9 changes: 9 additions & 0 deletions django_project/core/settings/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,12 @@
)

MIDDLEWARE = MIDDLEWARE + MIDDLEWARE_CELERY

# SFTP settings
SFTP_HOST = os.getenv("SFTP_HOST", "127.0.0.1") # Default: localhost
SFTP_PORT = int(os.getenv("SFTP_PORT", "2222")) # Default: 2222
SFTP_USERNAME = os.getenv("SFTP_USERNAME", "user")
SFTP_PASSWORD = os.getenv("SFTP_PASSWORD", "password")
SFTP_REMOTE_PATH = os.getenv(
"SFTP_REMOTE_PATH", "upload"
)
34 changes: 34 additions & 0 deletions django_project/dcas/outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
.. note:: DCAS Outputs
"""

import paramiko
import os
import shutil
import fsspec
Expand All @@ -26,6 +27,7 @@
GRID_DATA = 1
GRID_CROP_DATA = 2
FARM_CROP_DATA = 3
MESSAGE_DATA = 4


class DCASPipelineOutput:
Expand Down Expand Up @@ -190,6 +192,38 @@
print(f'writing dataframe to {file_path}')
df.to_parquet(file_path)

def _upload_to_sftp(self, local_file):
"""Upload CSV file to Docker SFTP."""
try:
print(f'Connecting to SFTP server at '
f'{settings.SFTP_HOST}:{settings.SFTP_PORT}...')
transport = paramiko.Transport(
(settings.SFTP_HOST, settings.SFTP_PORT)
)
transport.connect(
username=settings.SFTP_USERNAME,
password=settings.SFTP_PASSWORD
)

sftp = paramiko.SFTPClient.from_transport(transport)

# Ensure correct remote path
remote_file_path = (
f"{settings.SFTP_REMOTE_PATH}/{os.path.basename(local_file)}"
)
print(f"Uploading {local_file} to {remote_file_path}...")

sftp.put(local_file, remote_file_path) # Upload file

print("Upload to Docker SFTP successful!")

# Close connection
sftp.close()
transport.close()

except Exception as e:
print(f"Failed to upload to SFTP: {e}")

Check warning on line 225 in django_project/dcas/outputs.py

View check run for this annotation

Codecov / codecov/patch

django_project/dcas/outputs.py#L224-L225

Added lines #L224 - L225 were not covered by tests

def _get_connection(self, s3):
endpoint = s3['AWS_ENDPOINT_URL']
if settings.DEBUG:
Expand Down
8 changes: 7 additions & 1 deletion django_project/dcas/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,10 @@

return file_path

def send_csv_to_sftp(self, file_path):
"""Upload the given CSV file to SFTP."""
self.data_output._upload_to_sftp(file_path)

Check warning on line 450 in django_project/dcas/pipeline.py

View check run for this annotation

Codecov / codecov/patch

django_project/dcas/pipeline.py#L450

Added line #L450 was not covered by tests

def run(self):
"""Run data pipeline."""
self.setup()
Expand All @@ -454,7 +458,9 @@
self.process_grid_crop_data()

self.process_farm_registry_data()
self.extract_csv_output()
csv_file = self.extract_csv_output()

Check warning on line 461 in django_project/dcas/pipeline.py

View check run for this annotation

Codecov / codecov/patch

django_project/dcas/pipeline.py#L461

Added line #L461 was not covered by tests

self.send_csv_to_sftp(csv_file)

Check warning on line 463 in django_project/dcas/pipeline.py

View check run for this annotation

Codecov / codecov/patch

django_project/dcas/pipeline.py#L463

Added line #L463 was not covered by tests

self.cleanup_gdd_matrix()

Expand Down
59 changes: 59 additions & 0 deletions django_project/dcas/tests/test_pipeline_outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,62 @@ def test_convert_to_csv(self, mock_duckdb_connect):
mock_conn.sql.assert_called_once()
mock_conn.close.assert_called_once()
self.assertIn('output_20250101.csv', csv_file)

@patch("paramiko.SFTPClient.from_transport")
@patch("paramiko.Transport", autospec=True)
@patch("dcas.outputs.settings")
def test_upload_to_sftp(
self,
mock_settings,
mock_transport,
mock_sftp_from_transport
):
"""Test that SFTP upload is triggered correctly."""
# Set mock environment variables in `settings`
mock_settings.SFTP_HOST = "127.0.0.1"
mock_settings.SFTP_PORT = 2222
mock_settings.SFTP_USERNAME = "user"
mock_settings.SFTP_PASSWORD = "password"
mock_settings.SFTP_REMOTE_PATH = "upload"

# Mock the transport instance
mock_transport_instance = mock_transport.return_value
mock_transport_instance.connect.return_value = None

# Mock the SFTP client and `put()` method
mock_sftp_instance = MagicMock()
mock_sftp_from_transport.return_value = mock_sftp_instance
mock_sftp_instance.put.return_value = None

# Initialize the pipeline output
pipeline_output = DCASPipelineOutput(request_date="2025-01-15")

# Create a temporary file for testing
test_file = "/tmp/test_message_data.csv"
with open(test_file, "w") as f:
f.write("test message")

# Call the method under test
pipeline_output._upload_to_sftp(test_file)

# Assertions
# Verify `connect()` was called once
mock_transport_instance.connect.assert_called_once_with(
username=mock_settings.SFTP_USERNAME,
password=mock_settings.SFTP_PASSWORD
)

# Verify `SFTPClient.from_transport()` was called once
mock_sftp_from_transport.assert_called_once_with(
mock_transport_instance
)

# Verify file was uploaded via `put()`
mock_sftp_instance.put.assert_called_once_with(
test_file,
f'{mock_settings.SFTP_REMOTE_PATH}/test_message_data.csv'
)

# Verify SFTP client & transport were properly closed
mock_sftp_instance.close.assert_called_once()
mock_transport_instance.close.assert_called_once()
Loading