Skip to content

Commit

Permalink
add post processing to keep last resource per year
Browse files Browse the repository at this point in the history
  • Loading branch information
OriHoch committed Aug 8, 2024
1 parent 8410998 commit 4d0002f
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 7 deletions.
80 changes: 74 additions & 6 deletions datacity_ckan_dgp/generic_fetchers/ckan_dataset_fetcher.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import json
import shutil
import datetime

import requests
import dataflows as DF
Expand Down Expand Up @@ -50,6 +51,56 @@ def check_row_kv(row, key, val):
return rowval == val


def get_resource_last_modified(resource):
    """Parse the ``last_modified`` value of a resource dict into a datetime.

    Accepts ISO-8601 style timestamps both with and without fractional
    seconds, because ``datetime.isoformat()`` omits the fraction when the
    microseconds are zero.

    :param resource: mapping that may contain a ``last_modified`` string
    :return: a naive ``datetime.datetime``, or ``None`` when the value is
        missing, empty, not a string, or in an unrecognised format
    """
    last_modified = resource.get('last_modified')
    if not last_modified:
        return None
    for timestamp_format in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
        try:
            return datetime.datetime.strptime(last_modified, timestamp_format)
        except (TypeError, ValueError):
            # TypeError: value is not a string; ValueError: format mismatch.
            # Parsing is best-effort, so fall through to the next format.
            continue
    return None


def find_resource_by_name_and_format(resources, name, format_):
    """Return the first resource whose name and format match, ignoring case.

    A missing or falsy ``name`` / ``format`` on a resource is treated as an
    empty string for the comparison.

    :param resources: iterable of resource dicts
    :param name: resource name to look for
    :param format_: resource format to look for
    :return: the first matching resource dict, or ``None`` if none matches
    """
    def _matches(candidate):
        candidate_name = (candidate.get('name') or '').lower()
        if candidate_name != name.lower():
            return False
        candidate_format = (candidate.get('format') or '').lower()
        return candidate_format == format_.lower()

    return next((candidate for candidate in resources if _matches(candidate)), None)


def post_processing_resource(
    source_resources, existing_target_resources, post_processing, target_package_id,
    format_, name, hash_, description, upload_filename, target_instance_name
):
    """Apply the configured post-processing steps to a resource that was just synced.

    Only the ``keep-last-updated-per-year`` step type is handled; entries of
    any other type are ignored. For a matching step, an extra resource named
    ``"<name> (<current year>)"`` is created in the target package when all
    of the following hold:

    * ``name`` / ``format_`` equal the step's ``resource_name`` / ``format``
      (case-insensitive),
    * the matching source resource has a parseable ``last_modified`` in the
      current year, in the step's ``month`` or the month right after it,
    * ``existing_target_resources`` has no ``"<name> (<year>).<format>"`` key.

    :param source_resources: mapping of source resource id to resource dict
    :param existing_target_resources: container of lower-cased
        ``"<name>.<format>"`` keys already present in the target package
    :param post_processing: list of step dicts, or a falsy value for none
    :param target_package_id: package id passed to ``ckan.resource_create``
    :param format_: format of the resource being processed
    :param name: name of the resource being processed
    :param hash_: hash stored on the created yearly resource
    :param description: description stored on the created yearly resource
    :param upload_filename: path of the local file uploaded as the resource data
    :param target_instance_name: instance name passed to ``ckan.resource_create``
    :raises KeyError: if a matching step lacks ``month``, ``format`` or
        ``resource_name``
    :raises AssertionError: if the create call does not report success
    """
    for post_process in post_processing or []:
        if post_process.get('type') != 'keep-last-updated-per-year':
            continue
        post_process_month = post_process['month']
        post_process_format = post_process['format']
        post_process_resource_name = post_process['resource_name']
        # Cheapest check first: this step only concerns one specific resource.
        if (
            name.lower() != post_process_resource_name.lower()
            or format_.lower() != post_process_format.lower()
        ):
            continue
        source_resource = find_resource_by_name_and_format(
            source_resources.values(), post_process_resource_name, post_process_format
        )
        source_last_modified = get_resource_last_modified(source_resource) if source_resource else None
        if not source_last_modified:
            # No source resource or no parseable date: nothing to snapshot.
            # (Reading .month here unconditionally used to raise AttributeError.)
            continue
        year = datetime.datetime.now().year
        if source_last_modified.year != year:
            continue
        source_month = source_last_modified.month
        # NOTE(review): month + 1 does not wrap from December to January;
        # confirm whether that is intended.
        if not (post_process_month == source_month or post_process_month + 1 == source_month):
            continue
        if f'{name} ({year}).{format_}'.lower() in existing_target_resources:
            # The yearly resource already exists, keep it untouched.
            continue
        print(f'post process keep-last-updated-per-year: creating yearly resource ({name}.{format_})')
        data = {
            'package_id': target_package_id,
            'format': format_,
            'name': f'{name} ({year})',
            'hash': hash_,
            'description': description
        }
        # Context manager so the upload handle is closed even if the call raises.
        with open(upload_filename, 'rb') as upload_file:
            res = ckan.resource_create(target_instance_name, data, files=[('upload', upload_file)])
        assert res['success'], str(res)


def filter_rows(source_filter):

def filter_row(row):
Expand Down Expand Up @@ -152,7 +203,7 @@ def get_resources_to_update(resources, tmpdir, headers, existing_target_resource
return resources_to_update


def fetch(source_url, target_instance_name, target_package_id, target_organization_id, tmpdir, source_filter):
def fetch(source_url, target_instance_name, target_package_id, target_organization_id, tmpdir, source_filter, post_processing):
res = ckan.package_show(target_instance_name, target_package_id)
target_package_exists = False
existing_target_resources = {}
Expand Down Expand Up @@ -193,7 +244,12 @@ def fetch(source_url, target_instance_name, target_package_id, target_organizati
notes = res['result'].get('notes') or ''
if notes:
description = f'{notes}\n\n{description}'
resources_to_update = get_resources_to_update(res['result']['resources'], tmpdir, headers, existing_target_resources, source_filter)
source_resources = {
resource['id']: resource for resource in res['result']['resources']
}
resources_to_update = get_resources_to_update(
res['result']['resources'], tmpdir, headers, existing_target_resources, source_filter
)
if resources_to_update:
with instance_package_lock(target_instance_name, target_package_id):
print(f'updating {len(resources_to_update)} resources')
Expand All @@ -216,22 +272,34 @@ def fetch(source_url, target_instance_name, target_package_id, target_organizati
print('existing resource found and hash is the same, skipping resource data update')
else:
print('existing resource found, but hash is different, updating resource data')
res = ckan.resource_update(target_instance_name, {
data = {
'id': existing_target_resources[f'{name}.{format_}'.lower()]['id'],
'hash': hash_,
'description': description
}, files=[('upload', open(f'{tmpdir}/{filename}', 'rb'))])
}
upload_filename = f'{tmpdir}/{filename}'
res = ckan.resource_update(target_instance_name, data, files=[('upload', open(upload_filename, 'rb'))])
assert res['success'], str(res)
post_processing_resource(
source_resources, existing_target_resources, post_processing, target_package_id,
format_, name, hash_, description, upload_filename, target_instance_name
)
else:
print('no existing resource found, creating new resource')
res = ckan.resource_create(target_instance_name, {
data = {
'package_id': target_package_id,
'format': format_,
'name': name,
'hash': hash_,
'description': description
}, files=[('upload', open(f'{tmpdir}/{filename}', 'rb'))])
}
upload_filename = f'{tmpdir}/{filename}'
res = ckan.resource_create(target_instance_name, data, files=[('upload', open(upload_filename, 'rb'))])
assert res['success'], str(res)
post_processing_resource(
source_resources, existing_target_resources, post_processing, target_package_id,
format_, name, hash_, description, upload_filename, target_instance_name
)
run_packages_processing(target_instance_name, target_package_id)
print('done, all resources created/updated')
else:
Expand Down
5 changes: 4 additions & 1 deletion datacity_ckan_dgp/operators/generic_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,16 @@ def operator(name, params):
target_package_id = params['target_package_id']
target_organization_id = params['target_organization_id']
tmpdir = params.get('tmpdir')
post_processing = params.get('post_processing')
with tempdir(tmpdir) as tmpdir:
print('starting generic_fetcher operator')
print(json.dumps(params, ensure_ascii=False))
for fetcher in FETCHERS:
assert fetcher['match'].keys() == {'url_contains'}, 'only url_contains match is supported at the moment'
if fetcher['match']['url_contains'] in source_url:
import_module(f'datacity_ckan_dgp.generic_fetchers.{fetcher["fetcher"]}_fetcher').fetch(source_url, target_instance_name, target_package_id, target_organization_id, tmpdir, source_filter)
import_module(f'datacity_ckan_dgp.generic_fetchers.{fetcher["fetcher"]}_fetcher').fetch(
source_url, target_instance_name, target_package_id, target_organization_id, tmpdir, source_filter, post_processing
)
break


Expand Down

0 comments on commit 4d0002f

Please sign in to comment.