Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add task to run pipeline #373

Merged
merged 16 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 54 additions & 4 deletions django_project/dcas/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from import_export.admin import ExportMixin
from import_export_celery.admin_actions import create_export_job_action
from django.contrib import admin
from django.contrib import admin, messages

from dcas.models import (
DCASConfig,
Expand All @@ -20,6 +20,8 @@
GDDMatrix
)
from dcas.resources import DCASErrorLogResource
from core.utils.file import format_size
from dcas.tasks import run_dcas, export_dcas_minio, export_dcas_sftp


class ConfigByCountryInline(admin.TabularInline):
Expand Down Expand Up @@ -51,20 +53,68 @@
)


@admin.action(description='Trigger DCAS processing')
def trigger_dcas_processing(modeladmin, request, queryset):
    """Start DCAS processing for the first selected request.

    Admin action. Only the first object of ``queryset`` is used: its id is
    handed to ``run_dcas.delay`` and a success message is then shown to
    the admin user.
    """
    selected_request = queryset.first()
    run_dcas.delay(selected_request.id)

    notice = 'Process will be started in background!'
    modeladmin.message_user(request, notice, messages.SUCCESS)


@admin.action(description='Send DCAS output to minio')
def trigger_dcas_output_to_minio(modeladmin, request, queryset):
    """Start the minio export for the first selected request.

    Admin action. Only the first object of ``queryset`` is used: its id is
    handed to ``export_dcas_minio.delay`` and a success message is then
    shown to the admin user.
    """
    selected_request = queryset.first()
    export_dcas_minio.delay(selected_request.id)

    notice = 'Process will be started in background!'
    modeladmin.message_user(request, notice, messages.SUCCESS)


@admin.action(description='Send DCAS output to sftp')
def trigger_dcas_output_to_sftp(modeladmin, request, queryset):
    """Start the sftp export for the first selected request.

    Admin action. Only the first object of ``queryset`` is used: its id is
    handed to ``export_dcas_sftp.delay`` and a success message is then
    shown to the admin user.
    """
    selected_request = queryset.first()
    export_dcas_sftp.delay(selected_request.id)

    notice = 'Process will be started in background!'
    modeladmin.message_user(request, notice, messages.SUCCESS)


@admin.register(DCASRequest)
class DCASRequestAdmin(admin.ModelAdmin):
"""Admin page for DCASRequest."""

list_display = ('requested_at', 'country', 'start_time', 'end_time')
list_display = ('requested_at', 'start_time', 'end_time', 'status')
list_filter = ('country',)
actions = (
trigger_dcas_processing,
trigger_dcas_output_to_minio,
trigger_dcas_output_to_sftp
)


@admin.register(DCASOutput)
class DCASOutputAdmin(admin.ModelAdmin):
"""Admin page for DCASOutput."""

list_display = ('delivered_at', 'request', 'file_name', 'status')
list_filter = ('request', 'status')
list_display = (
'delivered_at', 'request',
'file_name', 'status',
'get_size', 'delivery_by')
list_filter = ('request', 'status', 'delivery_by')

def get_size(self, obj: DCASOutput):
    """Return ``obj.size`` passed through ``format_size``."""
    raw_size = obj.size
    return format_size(raw_size)

Check warning on line 114 in django_project/dcas/admin.py

View check run for this annotation

Codecov / codecov/patch

django_project/dcas/admin.py#L114

Added line #L114 was not covered by tests

get_size.short_description = 'Size'
get_size.admin_order_field = 'size'


@admin.register(DCASErrorLog)
Expand Down
4 changes: 4 additions & 0 deletions django_project/dcas/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,7 @@ class DcasConfig(AppConfig):

default_auto_field = 'django.db.models.BigAutoField'
name = 'dcas'

def ready(self):
    """App ready handler.

    Imports the DCAS task functions from ``dcas.tasks``.
    """
    # The imported names are not used here (hence ``noqa``); presumably the
    # import exists so the tasks are registered at startup -- TODO confirm.
    from dcas.tasks import run_dcas, export_dcas_sftp, export_dcas_minio  # noqa
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ def handle(self, *args, **options):
dt = datetime.date(2024, 12, 1)
farm_registry_group = FarmRegistryGroup.objects.get(id=1)

pipeline = DCASDataPipeline(farm_registry_group, dt)
pipeline = DCASDataPipeline([farm_registry_group.id], dt)

pipeline.run()
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Generated by Django 4.2.7 on 2025-01-29 05:47

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):
    """Extend DCASOutput and DCASRequest for pipeline runs.

    Adds ``delivery_by``, ``path`` and ``size`` to ``dcasoutput``; adds
    ``config``, ``progress_text`` and ``status`` to ``dcasrequest``; and
    alters ``dcasrequest.country`` so it may be null/blank.
    """

    dependencies = [
        ('gap', '0045_preferences_dcas_config_and_more'),
        ('dcas', '0006_dcaserrorlog_error_type'),
    ]

    operations = [
        migrations.AddField(
            model_name='dcasoutput',
            name='delivery_by',
            field=models.CharField(blank=True, choices=[('SFTP', 'SFTP'), ('OBJECT_STORAGE', 'OBJECT_STORAGE')], default='SFTP', help_text='The type of delivery.', max_length=255, null=True),
        ),
        migrations.AddField(
            model_name='dcasoutput',
            name='path',
            field=models.TextField(blank=True, help_text='Full path to the uploaded file.', null=True),
        ),
        migrations.AddField(
            model_name='dcasoutput',
            name='size',
            field=models.PositiveBigIntegerField(default=0),
        ),
        migrations.AddField(
            model_name='dcasrequest',
            name='config',
            field=models.JSONField(blank=True, default=dict, null=True),
        ),
        migrations.AddField(
            model_name='dcasrequest',
            name='progress_text',
            field=models.TextField(blank=True, help_text='Progress or note from the pipeline.', null=True),
        ),
        migrations.AddField(
            model_name='dcasrequest',
            name='status',
            field=models.CharField(choices=[('Pending', 'Pending'), ('Queued', 'Queued'), ('Running', 'Running'), ('Stopped', 'Stopped with error'), ('Completed', 'Completed'), ('Cancelled', 'Cancelled'), ('Invalidated', 'Invalidated')], default='Pending', help_text='The status of the process.', max_length=255),
        ),
        # Existing column: country becomes optional (null/blank allowed).
        migrations.AlterField(
            model_name='dcasrequest',
            name='country',
            field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='gap.country'),
        ),
    ]
23 changes: 22 additions & 1 deletion django_project/dcas/models/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@
from dcas.models.request import DCASRequest


class DCASDeliveryMethod(models.TextChoices):
    """Delivery method choices.

    Supplies the ``choices`` for ``DCASOutput.delivery_by``; each member's
    stored value and label use the same text.
    """

    SFTP = 'SFTP', _('SFTP')
    OBJECT_STORAGE = 'OBJECT_STORAGE', _('OBJECT_STORAGE')


class DCASOutput(models.Model):
"""Model to track the delivery of file output to SFTP."""

Expand All @@ -34,12 +41,26 @@ class DCASOutput(models.Model):
)
status = models.CharField(
max_length=255,
null=True,
blank=True,
choices=TaskStatus.choices,
default=TaskStatus.PENDING,
help_text="The delivery status of the file."
)
path = models.TextField(
null=True,
blank=True,
help_text="The delivery status of the file."
help_text="Full path to the uploaded file."
)
delivery_by = models.CharField(
null=True,
blank=True,
max_length=255,
choices=DCASDeliveryMethod.choices,
default=DCASDeliveryMethod.SFTP,
help_text="The type of delivery."
)
size = models.PositiveBigIntegerField(default=0)

class Meta:
"""Meta class for DCASOutput."""
Expand Down
18 changes: 17 additions & 1 deletion django_project/dcas/models/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from django.utils import timezone
from django.utils.translation import gettext_lazy as _

from core.models.background_task import TaskStatus
from gap.models.common import Country


Expand All @@ -20,7 +21,10 @@ class DCASRequest(models.Model):
help_text="The time when the request was created."
)
country = models.ForeignKey(
Country, on_delete=models.CASCADE
Country,
on_delete=models.CASCADE,
null=True,
blank=True
)
start_time = models.DateTimeField(
null=True,
Expand All @@ -30,6 +34,18 @@ class DCASRequest(models.Model):
null=True,
blank=True
)
status = models.CharField(
max_length=255,
choices=TaskStatus.choices,
default=TaskStatus.PENDING,
help_text="The status of the process."
)
progress_text = models.TextField(
null=True,
blank=True,
help_text="Progress or note from the pipeline."
)
config = models.JSONField(blank=True, default=dict, null=True)

class Meta: # noqa
"""Meta class for DCASRequest."""
Expand Down
21 changes: 15 additions & 6 deletions django_project/dcas/outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ def grid_crop_data_path(self):
"""Return full path to grid with crop data."""
return self.grid_crop_data_dir_path + '/*.parquet'

@property
def output_csv_file_path(self):
    """Return full path to output csv file.

    The file is named ``output_<YYYYMMDD>.csv`` after ``request_date``
    and is placed directly under ``TMP_BASE_DIR``.
    """
    date_stamp = self.request_date.strftime('%Y%m%d')
    csv_name = f'output_{date_stamp}.csv'
    return os.path.join(self.TMP_BASE_DIR, csv_name)

def _setup_s3fs(self):
"""Initialize s3fs."""
self.s3 = self._get_s3_variables()
Expand Down Expand Up @@ -196,8 +205,9 @@ def _save_grid_data(self, df: pd.DataFrame):
print(f'writing dataframe to {file_path}')
df.to_parquet(file_path)

def _upload_to_sftp(self, local_file):
def upload_to_sftp(self, local_file):
"""Upload CSV file to Docker SFTP."""
is_success = False
try:
print(f'Connecting to SFTP server at '
f'{settings.SFTP_HOST}:{settings.SFTP_PORT}...')
Expand Down Expand Up @@ -225,9 +235,12 @@ def _upload_to_sftp(self, local_file):
sftp.close()
transport.close()

is_success = True
except Exception as e:
print(f"Failed to upload to SFTP: {e}")

return is_success

def _get_connection(self, s3):
endpoint = s3['AWS_ENDPOINT_URL']
if settings.DEBUG:
Expand Down Expand Up @@ -257,11 +270,7 @@ def _get_connection(self, s3):

def convert_to_csv(self):
"""Convert output to csv file."""
dt = self.request_date.strftime('%Y%m%d')
file_path = os.path.join(
self.TMP_BASE_DIR,
f'output_{dt}.csv'
)
file_path = self.output_csv_file_path
column_list = [
'farm_unique_id as farmer_id', 'crop',
'planting_date as plantingDate', 'growth_stage as growthStage',
Expand Down
Loading
Loading