From fd7c3ab3392029da8be339323d3343e3cb868201 Mon Sep 17 00:00:00 2001
From: ori
Date: Mon, 15 Jul 2024 10:41:37 +0300
Subject: [PATCH] add filtering options to ckan dataset fetcher

---
 configuration.template.json                  |   9 +-
 .../generic_fetchers/ckan_dataset_fetcher.py | 144 +++++++++++++++---
 .../operators/generic_fetcher.py             |   7 +-
 3 files changed, 129 insertions(+), 31 deletions(-)

diff --git a/configuration.template.json b/configuration.template.json
index 9e23497..3eb0047 100644
--- a/configuration.template.json
+++ b/configuration.template.json
@@ -133,11 +133,16 @@
         },
         {
             "name": "generic_fetcher",
-            "display": "Fetch/Update data from different source types into a CKAN Package",
+            "display": "Fetch/Update data from different source types into a CKAN Package. See details about each fetcher in its DESCRIPTION at https://github.com/hasadna/datacity-ckan-dgp/blob/main/datacity_ckan_dgp/generic_fetchers/",
             "fields": [
                 {
                     "name": "source_url",
-                    "display": "Source URL (source type will be inferred from the URL, see https://github.com/hasadna/datacity-ckan-dgp/blob/main/datacity_ckan_dgp/operators/generic_fetcher.py for details)",
+                    "display": "Source URL (source type will be inferred from the URL)",
+                    "type": "text"
+                },
+                {
+                    "name": "source_filter",
+                    "display": "Source Filter (optional, value depends on the source type)",
                     "type": "text"
                 },
                 {
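Note (illustrative, not part of the patch): with the new field, the JSON payload that the generic_fetcher operator receives gains an optional source_filter key. A rough sketch of the full payload, using the same values as the local development example in the fetcher's DESCRIPTION below:

    {
        "source_url": "https://data.gov.il/dataset/automated-devices",
        "source_filter": {"City": "חיפה"},
        "target_instance_name": "LOCAL_DEVELOPMENT",
        "target_package_id": "automated-devices",
        "target_organization_id": "israel-gov",
        "tmpdir": ".data/ckan_fetcher_tmpdir"
    }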
diff --git a/datacity_ckan_dgp/generic_fetchers/ckan_dataset_fetcher.py b/datacity_ckan_dgp/generic_fetchers/ckan_dataset_fetcher.py
index 3148aad..103c5ea 100644
--- a/datacity_ckan_dgp/generic_fetchers/ckan_dataset_fetcher.py
+++ b/datacity_ckan_dgp/generic_fetchers/ckan_dataset_fetcher.py
@@ -1,12 +1,104 @@
 import os
+import json
+import uuid
+import shutil
 import requests
+import dataflows as DF
 
 from .. import ckan
 from ..utils import http_stream_download
 
 
-def fetch(source_url, target_instance_name, target_package_id, target_organization_id, tmpdir):
+DESCRIPTION = """
+Fetch from a CKAN dataset.
+
+URL example: https://data.gov.il/dataset/automated-devices
+
+If a source filter is not provided, all resources are copied as-is.
+
+If a source filter is provided, the fetcher does the following:
+    1. Finds the source resource for tabular data - either CSV or XLSX
+    2. Creates the filtered tabular data as a CSV (an XLSX version will be created by our other automation)
+    3. If a GEOJSON resource is available, copies and filters it separately
+    4. Ignores all other resources
+
+Local development examples:
+    without source filter:
+        python3 -m datacity_ckan_dgp.operators.generic_fetcher '{"source_url": "https://data.gov.il/dataset/automated-devices", "target_instance_name": "LOCAL_DEVELOPMENT", "target_package_id": "automated-devices", "target_organization_id": "israel-gov", "tmpdir": ".data/ckan_fetcher_tmpdir"}'
+    with source filter:
+        python3 -m datacity_ckan_dgp.operators.generic_fetcher '{"source_url": "https://data.gov.il/dataset/automated-devices", "target_instance_name": "LOCAL_DEVELOPMENT", "target_package_id": "automated-devices", "target_organization_id": "israel-gov", "tmpdir": ".data/ckan_fetcher_tmpdir", "source_filter": {"City": "חיפה"}}'
+"""
+
+DEVEL_SKIP_DOWNLOAD = os.getenv('DEVEL_SKIP_DOWNLOAD', 'false').lower() == 'true'
+
+
+def get_filtered_tabular_resources_to_update(tmpdir, source_filter, id_, name, format_, hash_, description, filename):
+    print(f'filtering tabular data from {filename} with format {format_}...')
+    resources_to_update = []
+    DF.Flow(
+        DF.load(f'{tmpdir}/{id_}', name='filtered', format=format_.lower()),
+        DF.filter_rows(lambda row: all(row.get(k) == v for k, v in source_filter.items())),
+        DF.printer(),
+        DF.dump_to_path(f'{tmpdir}/{id_}-filtered')
+    ).process()
+    with open(f'{tmpdir}/{id_}-filtered/datapackage.json', 'r') as f:
+        hash_ = json.load(f)['hash']
+    shutil.copyfile(f'{tmpdir}/{id_}-filtered/filtered.csv', f'{tmpdir}/{id_}')
+    resources_to_update.append((id_, name, 'CSV', hash_, description, filename))
+    return resources_to_update
+
+
+def get_filtered_geojson_resources_to_update(tmpdir, source_filter, id_, name, format_, hash_, description, filename):
+    print(f'filtering geojson data from {filename} with format {format_}...')
+    resources_to_update = []
+    with open(f'{tmpdir}/{id_}', 'r') as f:
+        data = json.load(f)
+    features = data.get('features') or []
+    features = [feature for feature in features if all(feature['properties'].get(k) == v for k, v in source_filter.items())]
+    data['features'] = features
+    with open(f'{tmpdir}/{id_}', 'w') as f:
+        json.dump(data, f)
+    resources_to_update.append((id_, name, 'GEOJSON', hash_, description, filename))
+    return resources_to_update
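Note (illustrative, not part of the patch): both filter helpers apply the same match rule, a record passes only when every key in source_filter compares equal to the record's value for that key. A minimal standalone sketch of the predicate, with invented sample rows:

    source_filter = {'City': 'חיפה'}

    def matches(record):
        # every filter key must be present and equal; a missing key fails the match
        return all(record.get(k) == v for k, v in source_filter.items())

    rows = [
        {'City': 'חיפה', 'DeviceId': 1},
        {'City': 'תל אביב', 'DeviceId': 2},
    ]
    print([row for row in rows if matches(row)])  # only DeviceId 1 survives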
+
+
+def get_resources_to_update(resources, tmpdir, headers, existing_target_resources, source_filter):
+    resources_to_update = []
+    for resource in resources:
+        id_ = resource.get('id') or ''
+        url = resource.get('url') or ''
+        if url and id_:
+            if 'e.data.gov.il' in url:
+                url = url.replace('e.data.gov.il', 'data.gov.il')
+            filename = url.split('/')[-1]
+            if DEVEL_SKIP_DOWNLOAD:
+                print(f'skipping download of {filename} from {url}')
+                source_hash = ''
+            else:
+                source_hash = http_stream_download(f'{tmpdir}/{id_}', {'url': url, 'headers': headers})
+            source_format = resource.get('format') or ''
+            source_name = resource.get('name') or ''
+            description = resource.get('description') or ''
+            if source_filter or existing_target_resources.get(f'{source_name}.{source_format}', {}).get('hash') != source_hash:
+                resources_to_update.append((id_, source_name, source_format, source_hash, description, filename))
+    if source_filter:
+        prefiltered_resources = resources_to_update
+        resources_to_update = []
+        names = set(args[1].lower() for args in prefiltered_resources)
+        for name in names:
+            print(f'filtering resources for {name}')
+            source_resources_by_format = {args[2].lower(): args for args in prefiltered_resources if args[1].lower() == name}
+            if 'csv' in source_resources_by_format:
+                resources_to_update.extend(get_filtered_tabular_resources_to_update(tmpdir, source_filter, *source_resources_by_format['csv']))
+            elif 'xlsx' in source_resources_by_format:
+                resources_to_update.extend(get_filtered_tabular_resources_to_update(tmpdir, source_filter, *source_resources_by_format['xlsx']))
+            if 'geojson' in source_resources_by_format:
+                resources_to_update.extend(get_filtered_geojson_resources_to_update(tmpdir, source_filter, *source_resources_by_format['geojson']))
+    return resources_to_update
+
+
+def fetch(source_url, target_instance_name, target_package_id, target_organization_id, tmpdir, source_filter):
     res = ckan.package_show(target_instance_name, target_package_id)
     target_package_exists = False
     existing_target_resources = {}
@@ -17,7 +109,7 @@ def fetch(source_url, target_instance_name, target_package_id, target_organizati
             name = resource.get('name') or ''
             hash_ = resource.get('hash') or ''
             id_ = resource.get('id') or ''
-            if format_ and name and hash_ and id_:
+            if format_ and name and id_:
                 existing_target_resources[f'{name}.{format_}'] = {'hash': hash_, 'id': id_}
     source_package_id = source_url.split('/dataset/')[1].split('/')[0]
     source_instance_baseurl = source_url.split('/dataset/')[0]
@@ -25,23 +117,22 @@ def fetch(source_url, target_instance_name, target_package_id, target_organizati
         headers = {'user-agent': 'datagov-external-client'}
     else:
         headers = None
-    res = requests.get(f'{source_instance_baseurl}/api/3/action/package_show?id={source_package_id}', headers=headers).json()
-    assert res['success']
+    if DEVEL_SKIP_DOWNLOAD:
+        print('skipping download of package metadata')
+        with open(f'{tmpdir}/package.json', 'r') as f:
+            res = json.load(f)
+    else:
+        try:
+            res = requests.get(f'{source_instance_baseurl}/api/3/action/package_show?id={source_package_id}', headers=headers)
+            res_json = res.json()
+            assert res_json['success']
+        except Exception as e:
+            raise Exception(f'Failed to fetch source package\n{getattr(res, "text", "")}') from e
+        res = res_json
+        with open(f'{tmpdir}/package.json', 'w') as f:
+            json.dump(res, f)
     package_title = res['result']['title']
-    resources_to_update = []
-    for resource in res['result']['resources']:
-        id_ = resource.get('id') or ''
-        url = resource.get('url') or ''
-        if url and id_:
-            if 'e.data.gov.il' in url:
-                url = url.replace('e.data.gov.il', 'data.gov.il')
-            filename = url.split('/')[-1]
-            source_hash = http_stream_download(f'{tmpdir}/{id_}', {'url': url, 'headers': headers})
-            source_format = resource.get('format') or ''
-            source_name = resource.get('name') or ''
-            description = resource.get('description') or ''
-            if existing_target_resources.get(f'{source_name}.{source_format}', {}).get('hash') != source_hash:
-                resources_to_update.append((id_, source_name, source_format, source_hash, description, filename))
+    resources_to_update = get_resources_to_update(res['result']['resources'], tmpdir, headers, existing_target_resources, source_filter)
     if resources_to_update:
         print(f'updating {len(resources_to_update)} resources')
         if not target_package_exists:
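Note (illustrative, not part of the patch): the metadata round-trip above in isolation, fetch package_show once, then persist it so DEVEL_SKIP_DOWNLOAD runs can replay it offline. A rough sketch; the endpoint format and the datagov user-agent are taken from the code above, the dataset id is just the DESCRIPTION example:

    import json
    import requests

    base_url = 'https://data.gov.il'
    # data.gov.il rejects default user agents, hence the custom header
    headers = {'user-agent': 'datagov-external-client'}
    res = requests.get(f'{base_url}/api/3/action/package_show?id=automated-devices', headers=headers).json()
    assert res['success']
    # cached copy, read back when DEVEL_SKIP_DOWNLOAD=true
    with open('package.json', 'w') as f:
        json.dump(res, f)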
@@ -58,13 +149,16 @@ def fetch(source_url, target_instance_name, target_package_id, target_organizati
                 os.unlink(f'{tmpdir}/{filename}')
             os.rename(f'{tmpdir}/{id_}', f'{tmpdir}/{filename}')
             if f'{name}.{format_}' in existing_target_resources:
-                print('existing resource found, but hash is different, updating resource data')
-                res = ckan.resource_update(target_instance_name, {
-                    'id': existing_target_resources[f'{name}.{format_}']['id'],
-                    'hash': hash_,
-                    'description': description
-                }, files=[('upload', open(f'{tmpdir}/{filename}', 'rb'))])
-                assert res['success'], str(res)
+                if existing_target_resources[f'{name}.{format_}'].get('hash') and existing_target_resources[f'{name}.{format_}']['hash'] == hash_:
+                    print('existing resource found and hash is the same, skipping resource data update')
+                else:
+                    print('existing resource found, but hash is different, updating resource data')
+                    res = ckan.resource_update(target_instance_name, {
+                        'id': existing_target_resources[f'{name}.{format_}']['id'],
+                        'hash': hash_,
+                        'description': description
+                    }, files=[('upload', open(f'{tmpdir}/{filename}', 'rb'))])
+                    assert res['success'], str(res)
             else:
                 print('no existing resource found, creating new resource')
                 res = ckan.resource_create(target_instance_name, {

diff --git a/datacity_ckan_dgp/operators/generic_fetcher.py b/datacity_ckan_dgp/operators/generic_fetcher.py
index 13eaef0..85c7993 100644
--- a/datacity_ckan_dgp/operators/generic_fetcher.py
+++ b/datacity_ckan_dgp/operators/generic_fetcher.py
@@ -6,10 +6,8 @@
 from importlib import import_module
 
 
-# the source url will be checked against the following types in order to determine which type of source it is
 FETCHERS = [
     {
-        # python3 -m datacity_ckan_dgp.operators.generic_fetcher '{"source_url": "https://data.gov.il/dataset/automated-devices", "target_instance_name": "LOCAL_DEVELOPMENT", "target_package_id": "automated-devices", "target_organization_id": "israel-gov", "tmpdir": ".data/ckan_fetcher_tmpdir"}'
         'fetcher': 'ckan_dataset',
         'match': {
             'url_contains': '/dataset/'
@@ -30,6 +28,7 @@ def tempdir(tmpdir):
 
 def operator(name, params):
     source_url = params['source_url']
+    source_filter = params.get('source_filter')
     target_instance_name = params['target_instance_name']
     target_package_id = params['target_package_id']
     target_organization_id = params['target_organization_id']
@@ -37,14 +36,14 @@ def operator(name, params):
     with tempdir(tmpdir) as tmpdir:
         print('starting generic_fetcher operator')
         print(f'source_url={source_url} target_instance_name={target_instance_name} target_package_id={target_package_id} target_organization_id={target_organization_id}')
+        print(f'source_filter={source_filter}')
         print(f'tmpdir={tmpdir}')
         for fetcher in FETCHERS:
             assert fetcher['match'].keys() == {'url_contains'}, 'only url_contains match is supported at the moment'
             if fetcher['match']['url_contains'] in source_url:
-                import_module(f'datacity_ckan_dgp.generic_fetchers.{fetcher["fetcher"]}_fetcher').fetch(source_url, target_instance_name, target_package_id, target_organization_id, tmpdir)
+                import_module(f'datacity_ckan_dgp.generic_fetchers.{fetcher["fetcher"]}_fetcher').fetch(source_url, target_instance_name, target_package_id, target_organization_id, tmpdir, source_filter)
                 break
 
 
-# python3 -m datacity_ckan_dgp.operators.generic_fetcher '{"source_url": "https://data.gov.il/dataset/automated-devices", "target_instance_name": "LOCAL_DEVELOPMENT", "target_package_id": "automated-devices", "target_organization_id": "israel-gov", "tmpdir": ".data/ckan_fetcher_tmpdir"}'
 if __name__ == '__main__':
     operator('_', json.loads(sys.argv[1]))
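Note (illustrative, not part of the patch): dispatch in generic_fetcher.py is plain substring matching over FETCHERS, first match wins. A self-contained sketch of the mechanism; the second, commented-out entry is hypothetical and only shows where future fetchers would slot in:

    FETCHERS = [
        {'fetcher': 'ckan_dataset', 'match': {'url_contains': '/dataset/'}},
        # hypothetical future entry, tried only if the one above does not match:
        # {'fetcher': 'arcgis', 'match': {'url_contains': '/arcgis/rest/'}},
    ]

    def find_fetcher(source_url):
        # mirrors the for/break loop in operator(): first matching entry wins
        for fetcher in FETCHERS:
            if fetcher['match']['url_contains'] in source_url:
                return fetcher['fetcher']
        return None

    print(find_fetcher('https://data.gov.il/dataset/automated-devices'))  # ckan_dataset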