diff --git a/.flake8 b/.flake8 index a4eea9e3..32068ca7 100644 --- a/.flake8 +++ b/.flake8 @@ -17,8 +17,4 @@ max-line-length=127 # List ignore rules one per line. ignore = - E501 - C901 W503 - F401 - F403 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 8a601204..a2ebb89f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -10,22 +10,20 @@ jobs: lint: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - - uses: actions/setup-python@v2 + - uses: actions/checkout@v3 + - uses: actions/setup-python@v4 with: - python-version: '3.x' + python-version: '3.10' - name: Install requirements run: pip install flake8 pycodestyle - name: Check syntax run: flake8 . --count --select=E901,E999,F821,F822,F823 --show-source --statistics --extend-exclude ckan - #- name: Run flake8 - # run: flake8 . --count --max-line-length=127 --statistics --exclude ckan test: needs: lint strategy: matrix: - ckan-version: ["2.10", 2.9, 2.9-py2, 2.8, 2.7] + ckan-version: ["2.10", 2.9] fail-fast: false name: CKAN ${{ matrix.ckan-version }} @@ -54,7 +52,7 @@ jobs: CKAN_REDIS_URL: redis://redis:6379/1 steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install requirements run: | pip install -r requirements.txt @@ -64,17 +62,7 @@ jobs: # Replace default path to CKAN core config file with the one on the container sed -i -e 's/use = config:.*/use = config:\/srv\/app\/src\/ckan\/test-core.ini/' test.ini - name: Setup extension (CKAN >= 2.9) - if: ${{ matrix.ckan-version != '2.7' && matrix.ckan-version != '2.8' }} run: | ckan -c test.ini db init - - name: Setup extension (CKAN 2.8) - if: ${{ matrix.ckan-version == '2.8' }} - run: | - paster --plugin=ckan db init -c test.ini - - name: Setup extension (CKAN 2.7) - if: ${{ matrix.ckan-version == '2.7' }} - run: | - psql -d "postgresql://datastore_write:pass@postgres/datastore_test" -f full_text_function.sql - paster --plugin=ckan db init -c test.ini - name: Run tests run: pytest --ckan-ini=test.ini --cov=ckanext.xloader --disable-warnings ckanext/xloader/tests diff --git a/MANIFEST.in b/MANIFEST.in index 820ba404..4aeb22fa 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,7 +1,6 @@ -include full_text_function.sql include *requirements*.txt include CHANGELOG include LICENSE include README.rst +include ckanext/xloader/config_declaration.yaml recursive-include ckanext/xloader/templates *.html -recursive-include ckanext/xloader/templates-bs2 *.html diff --git a/README.rst b/README.rst index a024df09..a17ac872 100644 --- a/README.rst +++ b/README.rst @@ -69,8 +69,8 @@ DataPusher - job queue is done by ckan-service-provider which is bespoke, complicated and stores jobs in its own database (sqlite by default). XLoader - job queue is done by RQ, which is simpler, is backed by Redis, allows -access to the CKAN model and is CKAN's default queue technology (since CKAN -2.7). You can also debug jobs easily using pdb. Job results are stored in +access to the CKAN model and is CKAN's default queue technology. +You can also debug jobs easily using pdb. Job results are stored in Sqlite by default, and for production simply specify CKAN's database in the config and it's held there - easy. @@ -98,7 +98,7 @@ Caveat - column types Note: With XLoader, all columns are stored in DataStore's database as 'text' type (whereas DataPusher did some rudimentary type guessing - see 'Robustness' above). 
However once a resource is xloaded, an admin can use the resource's
-Data Dictionary tab (CKAN 2.7 onwards) to change these types to numeric or
+Data Dictionary tab to change these types to numeric or
datestamp and re-load the file. When migrating from DataPusher to XLoader
you can preserve the types of existing resources by using the ``migrate_types``
command.
@@ -116,13 +116,10 @@ Compatibility with core CKAN versions:
=============== =============
CKAN version    Compatibility
=============== =============
-2.3             no longer tested and you must install ckanext-rq
-2.4             no longer tested and you must install ckanext-rq
-2.5             no longer tested and you must install ckanext-rq
-2.6             no longer tested and you must install ckanext-rq
-2.7             yes
-2.8             yes
-2.9             yes (both Python2 and Python3)
+2.7             no longer supported (last supported version: 0.12.2)
+2.8             no longer supported (last supported version: 0.12.2)
+2.9             yes (Python3; last supported version for Python 2.7: 0.12.2)
+2.10            yes
=============== =============

------------

To install XLoader:

       pip install -r https://raw.githubusercontent.com/ckan/ckanext-xloader/master/requirements.txt
       pip install -U requests[security]

-4. If you are using CKAN version before 2.8.x you need to define the
-   ``populate_full_text_trigger`` in your database
-   ::
-
-       sudo -u postgres psql datastore_default -f full_text_function.sql
-
-   If successful it will print
-   ::
-
-       CREATE FUNCTION
-       ALTER FUNCTION
-
-   NB this assumes you used the defaults for the database name and username.
-   If in doubt, check your config's ``ckan.datastore.write_url``. If you don't have
-   database name ``datastore_default`` and username ``ckan_default`` then adjust
-   the psql option and ``full_text_function.sql`` before running this.
-
-5. Add ``xloader`` to the ``ckan.plugins`` setting in your CKAN
+4. Add ``xloader`` to the ``ckan.plugins`` setting in your CKAN
   config file (by default the config file is located at
   ``/etc/ckan/default/production.ini``).

   Ensure ``datastore`` is also listed, to enable CKAN DataStore.

-6. Starting CKAN 2.10 you will need to set an API Token to be able to
+5. Starting with CKAN 2.10, you will need to set an API Token to be able to
   execute jobs against the server::

       ckanext.xloader.api_token = <api-token>

-7. If it is a production server, you'll want to store jobs info in a more
+6. If it is a production server, you'll want to store jobs info in a more
   robust database than the default sqlite file. It can happily use the main
   CKAN postgres db by adding this line to the config, but with the same value
   as you have for ``sqlalchemy.url``::

   (This step can be skipped when just developing or testing.)

-8. Restart CKAN. For example if you've deployed CKAN with Apache on Ubuntu::
+7. Restart CKAN. For example if you've deployed CKAN with Apache on Ubuntu::

       sudo service apache2 reload

-9. Run the worker. First test it on the command-line::
-
-      paster --plugin=ckan jobs -c /etc/ckan/default/ckan.ini worker
-
-   or if you have CKAN version 2.6.x or less (and are therefore using ckanext-rq)::
-
-      paster --plugin=ckanext-rq jobs -c /etc/ckan/default/ckan.ini worker
-
-   Test it will load a CSV ok by submitting a `CSV in the web interface `_
-   or in another shell::
+8.
Run the worker:: - paster --plugin=ckanext-xloader xloader submit -c /etc/ckan/default/ckan.ini - - Clearly, running the worker on the command-line is only for testing - for - production services see: - - http://docs.ckan.org/en/ckan-2.7.0/maintaining/background-tasks.html#using-supervisor - - If you have CKAN version 2.6.x or less then you'll need to download - `supervisor-ckan-worker.conf `_ and adjust the ``command`` to reference - ckanext-rq. + ckan -c /etc/ckan/default/ckan.ini jobs worker --------------- @@ -217,58 +179,7 @@ Config settings Configuration: -:: - - # The connection string for the jobs database used by XLoader. The - # default of an sqlite file is fine for development. For production use a - # Postgresql database. - ckanext.xloader.jobs_db.uri = sqlite:////tmp/xloader_jobs.db - - # The formats that are accepted. If the value of the resource.format is - # anything else then it won't be 'xloadered' to DataStore (and will therefore - # only be available to users in the form of the original download/link). - # Case insensitive. - # (optional, defaults are listed in plugin.py - DEFAULT_FORMATS). - ckanext.xloader.formats = csv application/csv xls application/vnd.ms-excel - - # The maximum size of files to load into DataStore. In bytes. Default is 1 GB. - ckanext.xloader.max_content_length = 1000000000 - - # To always use messytables to load data, instead of attempting a direct - # PostgreSQL COPY, set this to True. This more closely matches the - # DataPusher's behavior. It has the advantage that the column types - # are guessed. However it is more error prone, far slower and you can't run - # the CPU-intensive queue on a separate machine. - ckanext.xloader.just_load_with_messytables = False - - # The maximum time for the loading of a resource before it is aborted. - # Give an amount in seconds. Default is 60 minutes - ckanext.xloader.job_timeout = 3600 - - # Ignore the file hash when submitting to the DataStore, if set to True - # resources are always submitted (if their format matches), if set to - # False (default), resources are only submitted if their hash has changed. - ckanext.xloader.ignore_hash = False - - # When loading a file that is bigger than `max_content_length`, xloader can - # still try and load some of the file, which is useful to display a - # preview. Set this option to the desired number of lines/rows that it - # loads in this case. - # If the file-type is supported (CSV, TSV) an excerpt with the number of - # `max_excerpt_lines` lines will be submitted while the `max_content_length` - # is not exceeded. - # If set to 0 (default) files that exceed the `max_content_length` will - # not be loaded into the datastore. - ckanext.xloader.max_excerpt_lines = 100 - - # Requests verifies SSL certificates for HTTPS requests. Setting verify to - # False should only be enabled during local development or testing. Default - # to True. - ckanext.xloader.ssl_verify = True - - # Uses a specific API token for the xloader_submit action instead of the - # apikey of the site_user - ckanext.xloader.api_token = ckan-provided-api-token +See the extension's `config_declaration.yaml `_ file. ------------------------ @@ -280,7 +191,7 @@ in the directory up from your local ckan repo:: git clone https://github.com/ckan/ckanext-xloader.git cd ckanext-xloader - python setup.py develop + pip install -e . 
pip install -r requirements.txt pip install -r dev-requirements.txt @@ -301,8 +212,8 @@ To upgrade from DataPusher to XLoader: ``ckan.plugins`` line replace ``datapusher`` with ``xloader``. 4. (Optional) If you wish, you can disable the direct loading and continue to - just use messytables - for more about this see the docs on config option: - ``ckanext.xloader.just_load_with_messytables`` + just use tabulator - for more about this see the docs on config option: + ``ckanext.xloader.use_type_guessing`` 5. Stop the datapusher worker:: @@ -322,35 +233,31 @@ command-line interface. e.g. :: - [2.9] ckan -c /etc/ckan/default/ckan.ini xloader submit - [pre-2.9] paster --plugin=ckanext-xloader xloader submit -c /etc/ckan/default/ckan.ini + ckan -c /etc/ckan/default/ckan.ini xloader submit For debugging you can try xloading it synchronously (which does the load directly, rather than asking the worker to do it) with the ``-s`` option:: - [2.9] ckan -c /etc/ckan/default/ckan.ini xloader submit -s - [pre-2.9] paster --plugin=ckanext-xloader xloader submit -s -c /etc/ckan/default/ckan.ini + ckan -c /etc/ckan/default/ckan.ini xloader submit -s See the status of jobs:: - [2.9] ckan -c /etc/ckan/default/ckan.ini xloader status - [pre-2.9] paster --plugin=ckanext-xloader xloader status -c /etc/ckan/default/development.ini + ckan -c /etc/ckan/default/ckan.ini xloader status Submit all datasets' resources to the DataStore:: - [2.9] ckan -c /etc/ckan/default/ckan.ini xloader submit all - [pre-2.9] paster --plugin=ckanext-xloader xloader submit all -c /etc/ckan/default/ckan.ini + ckan -c /etc/ckan/default/ckan.ini xloader submit all Re-submit all the resources already in the DataStore (Ignores any resources that have not been stored in DataStore e.g. because they are not tabular):: - [2.9] ckan -c /etc/ckan/default/ckan.ini xloader submit all-existing - [pre-2.9] paster --plugin=ckanext-xloader xloader submit all-existing -c /etc/ckan/default/ckan.ini + ckan -c /etc/ckan/default/ckan.ini xloader submit all-existing + **Full list of XLoader CLI commands**:: - [2.9] ckan -c /etc/ckan/default/ckan.ini xloader --help - [pre-2.9] paster --plugin=ckanext-xloader xloader --help + ckan -c /etc/ckan/default/ckan.ini xloader --help + Jobs and workers ---------------- @@ -363,8 +270,7 @@ Useful commands: Clear (delete) all outstanding jobs:: - CKAN 2.9, Python 3 ckan -c /etc/ckan/default/ckan.ini jobs clear [QUEUES] - CKAN <2.9, Python 2 paster --plugin=ckanext-xloader xloader jobs clear [QUEUES] -c /etc/ckan/default/development.ini + ckan -c /etc/ckan/default/ckan.ini jobs clear [QUEUES] If having trouble with the worker process, restarting it can help:: @@ -385,13 +291,6 @@ exist** Your DataStore permissions have not been set-up - see: -**When editing a package, all its existing resources get re-loaded by xloader** - -This behavior was documented in -`Issue 75 `_ and is related -to a bug in CKAN that is fixed in versions 2.6.9, 2.7.7, 2.8.4 -and 2.9.0+. 
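
Resources can also be submitted programmatically through the action API.
A minimal sketch (illustrative only; it assumes a valid resource id and a
context that is authorised to call ``xloader_submit``)::

    from ckan.plugins import toolkit

    def submit_resource_to_xloader(resource_id):
        # Queues a background job that loads the resource's file into
        # the DataStore, just like the CLI 'xloader submit' does.
        return toolkit.get_action('xloader_submit')(
            {'ignore_auth': True},
            {'resource_id': resource_id},
        )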
-
-----------------
Running the Tests
-----------------

@@ -402,12 +301,8 @@ The first time, your test datastore database needs the trigger applied::

 To run the tests, do::

-    nosetests --nologcapture --with-pylons=test.ini
-
-To run the tests and produce a coverage report, first make sure you have
-coverage installed in your virtualenv (``pip install coverage``) then run::
+    pytest --ckan-ini=test.ini ckanext/xloader/tests

-    nosetests --nologcapture --with-pylons=test.ini --with-coverage --cover-package=ckanext.xloader --cover-inclusive --cover-erase --cover-tests

----------------------------------
Releasing a New Version of XLoader
diff --git a/ckanext/xloader/action.py b/ckanext/xloader/action.py
index 6185b2be..3fa26803 100644
--- a/ckanext/xloader/action.py
+++ b/ckanext/xloader/action.py
@@ -10,7 +10,7 @@
 from ckan.logic import side_effect_free
 import ckan.plugins as p
 from dateutil.parser import parse as parse_date
-from six import text_type as str
+from dateutil.parser import isoparse as parse_iso_date

 import ckanext.xloader.schema

@@ -99,8 +99,7 @@ def xloader_submit(context, data_dict):
         for job in get_queue().get_jobs()
         if 'xloader_to_datastore' in str(job)  # filter out test_job etc
     ]
-    updated = datetime.datetime.strptime(
-        existing_task['last_updated'], '%Y-%m-%dT%H:%M:%S.%f')
+    updated = parse_iso_date(existing_task['last_updated'])
     time_since_last_updated = datetime.datetime.utcnow() - updated
     if (res_id not in queued_res_ids
             and time_since_last_updated > assume_task_stillborn_after):
@@ -158,11 +157,6 @@ def xloader_submit(context, data_dict):
         job = enqueue_job(
             jobs.xloader_data_into_datastore, [data], rq_kwargs=dict(timeout=timeout)
         )
-    except TypeError:
-        # This except provides support for 2.7.
-        job = _enqueue(
-            jobs.xloader_data_into_datastore, [data], timeout=timeout
-        )
     except Exception:
         log.exception('Unable to enqueued xloader res_id=%s', res_id)
         return False
diff --git a/ckanext/xloader/command.py b/ckanext/xloader/command.py
index ffde1c68..7f2c000a 100644
--- a/ckanext/xloader/command.py
+++ b/ckanext/xloader/command.py
@@ -119,10 +119,7 @@ def _submit_resource(self, resource, user, indent=0):
             self.error_occured = True

     def print_status(self):
-        try:
-            import ckan.lib.jobs as rq_jobs
-        except ImportError:
-            import ckanext.rq.jobs as rq_jobs
+        import ckan.lib.jobs as rq_jobs
         jobs = rq_jobs.get_queue().jobs
         if not jobs:
             print('No jobs currently queued')
diff --git a/ckanext/xloader/config_declaration.yaml b/ckanext/xloader/config_declaration.yaml
new file mode 100644
index 00000000..b31f12e2
--- /dev/null
+++ b/ckanext/xloader/config_declaration.yaml
@@ -0,0 +1,116 @@
+version: 1
+groups:
+  - annotation: ckanext-xloader settings
+    options:
+      - key: ckanext.xloader.jobs_db.uri
+        default: sqlite:////tmp/xloader_jobs.db
+        description: |
+          The connection string for the jobs database used by XLoader. The
+          default of an sqlite file is fine for development. For production use a
+          PostgreSQL database.
+        validators: not_missing
+        required: true
+      - key: ckanext.xloader.api_token
+        example: eyJ0eXAiOiJKV1QiLCJh.eyJqdGkiOiJ0M2VNUFlQWFg0VU.8QgV8em4RA
+        description: |
+          Uses a specific API token for the xloader_submit action instead of the
+          apikey of the site_user. Will be mandatory when dropping support for
+          CKAN 2.9.
+        required: false
+      - key: ckanext.xloader.formats
+        example: csv application/csv xls application/vnd.ms-excel
+        description: |
+          The formats that are accepted. If the value of the resource.format is
+          anything else then it won't be 'xloadered' to DataStore (and will therefore
+          only be available to users in the form of the original download/link).
+          Case insensitive. Defaults are listed in plugin.py.
+        required: false
+      - key: ckanext.xloader.max_content_length
+        default: 1_000_000_000
+        example: 100000
+        description: |
+          The maximum size of files to load into DataStore. In bytes.
+          Default is 1 GB.
+        type: int
+        required: false
+      - key: ckanext.xloader.use_type_guessing
+        default: False
+        example: False
+        description: |
+          By default, xloader will first try to add tabular data to the DataStore
+          with a direct PostgreSQL COPY. This is relatively fast, but does not
+          guess column types. If this fails, xloader falls back to a method more
+          like DataPusher's behaviour. This has the advantage that the column types
+          are guessed. However it is more error prone and far slower.
+          To always skip the direct PostgreSQL COPY and use type guessing, set
+          this option to True.
+        type: bool
+        required: false
+        legacy_key: ckanext.xloader.just_load_with_messytables
+      - key: ckanext.xloader.parse_dates_dayfirst
+        default: False
+        example: False
+        description: |
+          Whether ambiguous dates should be parsed day first. Defaults to False.
+          If set to True, dates like '01.02.2022' will be parsed as day = 01,
+          month = 02.
+          NB: isoformat dates like '2022-01-02' will be parsed as YYYY-MM-DD, and
+          this option will not override that.
+          See https://dateutil.readthedocs.io/en/stable/parser.html#dateutil.parser.parse
+          for more details.
+        type: bool
+        required: false
+      - key: ckanext.xloader.parse_dates_yearfirst
+        default: False
+        example: False
+        description: |
+          Whether ambiguous dates should be parsed year first. Defaults to False.
+          If set to True, dates like '01.02.03' will be parsed as year = 2001,
+          month = 02, day = 03. See https://dateutil.readthedocs.io/en/stable/parser.html#dateutil.parser.parse
+          for more details.
+        type: bool
+        required: false
+      - key: ckanext.xloader.job_timeout
+        default: 3600
+        example: 3600
+        description: |
+          The maximum time for the loading of a resource before it is aborted.
+          Give an amount in seconds. Default is 60 minutes.
+        type: int
+        required: false
+      - key: ckanext.xloader.ignore_hash
+        default: False
+        example: False
+        description: |
+          Ignore the file hash when submitting to the DataStore. If set to True,
+          resources are always submitted (if their format matches); if set to
+          False (default), resources are only submitted if their hash has changed.
+        type: bool
+        required: false
+      - key: ckanext.xloader.max_excerpt_lines
+        default: 0
+        example: 100
+        description: |
+          When loading a file that is bigger than `max_content_length`, xloader can
+          still try and load some of the file, which is useful to display a
+          preview. Set this option to the desired number of lines/rows that it
+          loads in this case.
+          If the file-type is supported (CSV, TSV) an excerpt with the number of
+          `max_excerpt_lines` lines will be submitted while the `max_content_length`
+          is not exceeded.
+          If set to 0 (default) files that exceed the `max_content_length` will
+          not be loaded into the datastore.
+        type: int
+        required: false
+      - key: ckanext.xloader.ssl_verify
+        default: True
+        example: True
+        description: |
+          Requests verifies SSL certificates for HTTPS requests. Setting verify to
+          False should only be enabled during local development or testing. Defaults
+          to True.
+ type: bool + required: false + + diff --git a/ckanext/xloader/controllers.py b/ckanext/xloader/controllers.py deleted file mode 100644 index a1ab3c3f..00000000 --- a/ckanext/xloader/controllers.py +++ /dev/null @@ -1,7 +0,0 @@ -import ckan.plugins as p -import ckanext.xloader.utils as utils - - -class ResourceDataController(p.toolkit.BaseController): - def resource_data(self, id, resource_id): - return utils.resource_data(id, resource_id) diff --git a/ckanext/xloader/jobs.py b/ckanext/xloader/jobs.py index 0a6d2b20..0d242db1 100644 --- a/ckanext/xloader/jobs.py +++ b/ckanext/xloader/jobs.py @@ -9,7 +9,6 @@ import datetime import traceback import sys -from six import text_type as str from six.moves.urllib.parse import urlsplit import requests @@ -17,7 +16,7 @@ import sqlalchemy as sa from ckan import model -from ckan.plugins.toolkit import get_action, asbool, ObjectNotFound, config, check_ckan_version +from ckan.plugins.toolkit import get_action, asbool, ObjectNotFound, config from . import loader from . import db @@ -186,39 +185,42 @@ def direct_load(): patch_only=True) logger.info('File Hash updated for resource: %s', resource['hash']) - def messytables_load(): + def tabulator_load(): try: loader.load_table(tmp_file.name, resource_id=resource['id'], mimetype=resource.get('format'), logger=logger) except JobError as e: - logger.error('Error during messytables load: %s', e) + logger.error('Error during tabulator load: %s', e) raise loader.calculate_record_count( resource_id=resource['id'], logger=logger) set_datastore_active(data, resource, logger) - logger.info('Finished loading with messytables') + logger.info('Finished loading with tabulator') update_resource(resource={'id': resource['id'], 'hash': resource['hash']}, patch_only=True) logger.info('File Hash updated for resource: %s', resource['hash']) # Load it logger.info('Loading CSV') - just_load_with_messytables = asbool(config.get( - 'ckanext.xloader.just_load_with_messytables', config.get('ckanext.xloader.compatibility_mode', False))) - logger.info("'Just load with messytables' mode is: %s", - just_load_with_messytables) + # If ckanext.xloader.use_type_guessing is not configured, fall back to + # deprecated ckanext.xloader.just_load_with_messytables + use_type_guessing = asbool(config.get( + 'ckanext.xloader.use_type_guessing', config.get( + 'ckanext.xloader.just_load_with_messytables', False))) + logger.info("'use_type_guessing' mode is: %s", + use_type_guessing) try: - if just_load_with_messytables: - messytables_load() + if use_type_guessing: + tabulator_load() else: try: direct_load() except JobError as e: logger.warning('Load using COPY failed: %s', e) - logger.info('Trying again with messytables') - messytables_load() + logger.info('Trying again with tabulator') + tabulator_load() except FileCouldNotBeLoadedError as e: logger.warning('Loading excerpt for this format not supported.') logger.error('Loading file raised an error: %s', e) diff --git a/ckanext/xloader/loader.py b/ckanext/xloader/loader.py index b1b95fbe..55c9cab5 100644 --- a/ckanext/xloader/loader.py +++ b/ckanext/xloader/loader.py @@ -1,40 +1,29 @@ 'Load a CSV into postgres' from __future__ import absolute_import -from six import text_type as str + +import datetime +import itertools import os import os.path import tempfile -import itertools -import csv +from decimal import Decimal -import six -from six.moves import zip import psycopg2 -import messytables +from six.moves import zip +from tabulator import Stream, TabulatorException from unidecode import unidecode 
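# An illustrative aside (not part of the module): the tabulator Stream
# used throughout this file exposes roughly this API, assuming a local
# CSV file:
#
#     with Stream('data.csv', format='csv') as stream:
#         stream.sample          # first rows, used for header/type guessing
#         for row in stream:     # then iterate over the rows
#             print(row)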
import ckan.plugins as p -from .job_exceptions import LoaderError, FileCouldNotBeLoadedError -import ckan.plugins.toolkit as tk -try: - from ckan.plugins.toolkit import config -except ImportError: - # older versions of ckan - from pylons import config - -if tk.check_ckan_version(min_version='2.7'): - import ckanext.datastore.backend.postgres as datastore_db - get_write_engine = datastore_db.get_write_engine -else: - # older versions of ckan - def get_write_engine(): - from ckanext.datastore.db import _get_engine - from pylons import config - data_dict = {'connection_url': config['ckan.datastore.write_url']} - return _get_engine(data_dict) - import ckanext.datastore.db as datastore_db +from .job_exceptions import FileCouldNotBeLoadedError, LoaderError +from .parser import XloaderCSVParser +from .utils import headers_guess, type_guess + +from ckan.plugins.toolkit import config +import ckanext.datastore.backend.postgres as datastore_db + +get_write_engine = datastore_db.get_write_engine create_indexes = datastore_db.create_indexes _drop_indexes = datastore_db._drop_indexes @@ -44,45 +33,33 @@ def get_write_engine(): def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None): '''Loads a CSV into DataStore. Does not create the indexes.''' - # use messytables to determine the header row - extension = os.path.splitext(csv_filepath)[1] - with open(csv_filepath, 'rb') as f: + # Determine the header row + try: + file_format = os.path.splitext(csv_filepath)[1].strip('.') + with Stream(csv_filepath, format=file_format) as stream: + header_offset, headers = headers_guess(stream.sample) + except TabulatorException: try: - table_set = messytables.any_tableset(f, mimetype=mimetype, - extension=extension) - except messytables.ReadError as e: - raise LoaderError('Messytables error: {}'.format(e)) - except Exception as e: - raise FileCouldNotBeLoadedError(e) + file_format = mimetype.lower().split('/')[-1] + with Stream(csv_filepath, format=file_format) as stream: + header_offset, headers = headers_guess(stream.sample) + except TabulatorException as e: + raise LoaderError('Tabulator error: {}'.format(e)) + except Exception as e: + raise FileCouldNotBeLoadedError(e) - if not table_set.tables: - raise LoaderError('Could not detect tabular data in this file') - row_set = table_set.tables.pop() - try: - header_offset, headers = messytables.headers_guess(row_set.sample) - except messytables.ReadError as e: - raise LoaderError('Messytables error: {}'.format(e)) # Some headers might have been converted from strings to floats and such. headers = encode_headers(headers) - # Guess the delimiter used in the file - with open(csv_filepath, 'rb') as f: - header_line = f.readline() - try: - sniffer = csv.Sniffer() - delimiter = sniffer.sniff(six.ensure_text(header_line)).delimiter - except csv.Error: + # Get the list of rows to skip. The rows in the tabulator stream are + # numbered starting with 1. + skip_rows = list(range(1, header_offset + 1)) + + # Get the delimiter used in the file + delimiter = stream.dialect.get('delimiter') + if delimiter is None: logger.warning('Could not determine delimiter from file, use default ","') delimiter = ',' - except UnicodeDecodeError: - raise LoaderError('Could not detect delimiter in this file') - - # Setup the converters that run when you iterate over the row_set. - # With pgloader only the headers will be iterated over. 
- row_set.register_processor(messytables.headers_processor(headers)) - row_set.register_processor( - messytables.offset_processor(header_offset + 1)) - # types = messytables.type_guess(row_set.sample, types=TYPES, strict=True) headers = [header.strip()[:MAX_COLUMN_LENGTH] for header in headers if header.strip()] @@ -93,13 +70,11 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None): # It is easier to reencode it as UTF8 than convert the name of the encoding # to one that pgloader will understand. logger.info('Ensuring character coding is UTF8') - f_write = tempfile.NamedTemporaryFile(suffix=extension, delete=False) + f_write = tempfile.NamedTemporaryFile(suffix=file_format, delete=False) try: - with open(csv_filepath, 'rb') as f_read: - csv_decoder = messytables.commas.UTF8Recoder(f_read, encoding=None) - for line in csv_decoder: - f_write.write(line) - f_write.close() # ensures the last line is written + with Stream(csv_filepath, format=file_format, skip_rows=skip_rows) as stream: + stream.save(target=f_write.name, format='csv', encoding='utf-8', + delimiter=delimiter) csv_filepath = f_write.name # datastore db connection @@ -127,7 +102,7 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None): fields = [ {'id': header_name, 'type': existing_info.get(header_name, {}) - .get('type_override') or 'text', + .get('type_override') or 'text', } for header_name in headers] @@ -165,9 +140,6 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None): raise LoaderError('Could not create the database table: {}' .format(e)) connection = context['connection'] = engine.connect() - if not fulltext_trigger_exists(connection, resource_id): - logger.info('Trigger created') - _create_fulltext_trigger(connection, resource_id) # datstore_active is switched on by datastore_create - TODO temporarily # disable it until the load is complete @@ -254,75 +226,66 @@ def create_column_indexes(fields, resource_id, logger): def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None): - '''Loads an Excel file (or other tabular data recognized by messytables) + '''Loads an Excel file (or other tabular data recognized by tabulator) into Datastore and creates indexes. Largely copied from datapusher - see below. Is slower than load_csv. 
''' - # use messytables to determine the header row + # Determine the header row logger.info('Determining column names and types') - ct = mimetype - format = os.path.splitext(table_filepath)[1] # filename extension - with open(table_filepath, 'rb') as tmp: - - # - # Copied from datapusher/jobs.py:push_to_datastore - # - + try: + file_format = os.path.splitext(table_filepath)[1].strip('.') + with Stream(table_filepath, format=file_format, + custom_parsers={'csv': XloaderCSVParser}) as stream: + header_offset, headers = headers_guess(stream.sample) + except TabulatorException: try: - table_set = messytables.any_tableset(tmp, mimetype=ct, extension=ct) - except messytables.ReadError: - # try again with format - tmp.seek(0) - try: - table_set = messytables.any_tableset(tmp, mimetype=format, extension=format) - except Exception as e: - raise LoaderError(e) + file_format = mimetype.lower().split('/')[-1] + with Stream(table_filepath, format=file_format, + custom_parsers={'csv': XloaderCSVParser}) as stream: + header_offset, headers = headers_guess(stream.sample) + except TabulatorException as e: + raise LoaderError('Tabulator error: {}'.format(e)) + except Exception as e: + raise FileCouldNotBeLoadedError(e) + + existing = datastore_resource_exists(resource_id) + existing_info = None + if existing: + existing_info = dict( + (f['id'], f['info']) + for f in existing.get('fields', []) if 'info' in f) - if not table_set.tables: - raise LoaderError('Could not parse file as tabular data') - row_set = table_set.tables.pop() - offset, headers = messytables.headers_guess(row_set.sample) - - existing = datastore_resource_exists(resource_id) - existing_info = None - if existing: - existing_info = dict( - (f['id'], f['info']) - for f in existing.get('fields', []) if 'info' in f) - - # Some headers might have been converted from strings to floats and such. - headers = encode_headers(headers) + # Some headers might have been converted from strings to floats and such. + headers = encode_headers(headers) - row_set.register_processor(messytables.headers_processor(headers)) - row_set.register_processor(messytables.offset_processor(offset + 1)) - TYPES, TYPE_MAPPING = get_types() - types = messytables.type_guess(row_set.sample, types=TYPES, strict=True) + # Get the list of rows to skip. The rows in the tabulator stream are + # numbered starting with 1. We also want to skip the header row. 
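+    # (e.g. a header found on the first row gives header_offset == 0, so
+    # skip_rows == [1] and data iteration starts at stream row 2)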
+    skip_rows = list(range(1, header_offset + 2))

-        # override with types user requested
-        if existing_info:
-            types = [
-                {
-                    'text': messytables.StringType(),
-                    'numeric': messytables.DecimalType(),
-                    'timestamp': messytables.DateUtilType(),
-                }.get(existing_info.get(h, {}).get('type_override'), t)
-                for t, h in zip(types, headers)]
+    TYPES, TYPE_MAPPING = get_types()
+    types = type_guess(stream.sample[1:], types=TYPES, strict=True)

-        row_set.register_processor(messytables.types_processor(types))
+    # override with types user requested
+    if existing_info:
+        types = [
+            {
+                'text': str,
+                'numeric': Decimal,
+                'timestamp': datetime.datetime,
+            }.get(existing_info.get(h, {}).get('type_override'), t)
+            for t, h in zip(types, headers)]

-        headers = [header.strip()[:MAX_COLUMN_LENGTH] for header in headers if header.strip()]
-        headers_set = set(headers)
+    headers = [header.strip()[:MAX_COLUMN_LENGTH] for header in headers if header.strip()]

+    with Stream(table_filepath, format=file_format, skip_rows=skip_rows,
+                custom_parsers={'csv': XloaderCSVParser}) as stream:
         def row_iterator():
-            for row in row_set:
+            for row in stream:
                 data_row = {}
                 for index, cell in enumerate(row):
-                    column_name = cell.column.strip()
-                    if column_name not in headers_set:
-                        continue
-                    data_row[column_name] = cell.value
+                    data_row[headers[index]] = cell
                 yield data_row

         result = row_iterator()
@@ -354,33 +317,43 @@ def row_iterator():
     logger.info('Copying to database...')
     count = 0
+    # Some types cannot be stored as empty strings and must be converted to None,
+    # https://github.com/ckan/ckanext-xloader/issues/182
+    non_empty_types = ['timestamp', 'numeric']
     for i, records in enumerate(chunky(result, 250)):
         count += len(records)
         logger.info('Saving chunk {number}'.format(number=i))
+        for row in records:
+            for column_index, column_name in enumerate(row):
+                if headers_dicts[column_index]['type'] in non_empty_types and row[column_name] == '':
+                    row[column_name] = None
         send_resource_to_datastore(resource_id, headers_dicts, records)
     logger.info('...copying done')

-        if count:
-            logger.info('Successfully pushed {n} entries to "{res_id}".'.format(
-                n=count, res_id=resource_id))
-        else:
-            # no datastore table is created
-            raise LoaderError('No entries found - nothing to load')
+    if count:
+        logger.info('Successfully pushed {n} entries to "{res_id}".'.format(
+            n=count, res_id=resource_id))
+    else:
+        # no datastore table is created
+        raise LoaderError('No entries found - nothing to load')


 _TYPE_MAPPING = {
-    'String': 'text',
-    # 'int' may not be big enough,
-    # and type detection may not realize it needs to be big
-    'Integer': 'numeric',
-    'Decimal': 'numeric',
-    'DateUtil': 'timestamp'
+    "<type 'str'>": 'text',
+    "<type 'unicode'>": 'text',
+    "<type 'int'>": 'numeric',
+    "<type 'float'>": 'numeric',
+    "<class 'decimal.Decimal'>": 'numeric',
+    "<class 'str'>": 'text',
+    "<class 'bytes'>": 'text',
+    "<class 'int'>": 'numeric',
+    "<class 'float'>": 'numeric',
+    "<class 'datetime.datetime'>": 'timestamp',
 }


 def get_types():
-    _TYPES = [messytables.StringType, messytables.DecimalType,
-              messytables.IntegerType, messytables.DateUtilType]
+    _TYPES = [int, bool, str, datetime.datetime, float, Decimal]
     TYPE_MAPPING = config.get('TYPE_MAPPING', _TYPE_MAPPING)
     return _TYPES, TYPE_MAPPING
@@ -530,18 +503,6 @@ def calculate_record_count(resource_id, logger):
                 .format(resource_id=resource_id))


-################################
-# datastore copied code        #
-# (for use with older ckans that lack this)
-
-def _create_fulltext_trigger(connection, resource_id):
-    connection.execute(
-        u'''CREATE TRIGGER zfulltext
-        BEFORE INSERT OR UPDATE ON {table}
-        FOR EACH ROW EXECUTE PROCEDURE
populate_full_text_trigger()'''.format( - table=identifier(resource_id))) - - def identifier(s): # "%" needs to be escaped, otherwise connection.execute thinks it is for # substituting a bind parameter @@ -551,6 +512,3 @@ def identifier(s): def literal_string(s): return u"'" + s.replace(u"'", u"''").replace(u'\0', '') + u"'" - -# end of datastore copied code # -################################ diff --git a/ckanext/xloader/parser.py b/ckanext/xloader/parser.py new file mode 100644 index 00000000..b52c59a3 --- /dev/null +++ b/ckanext/xloader/parser.py @@ -0,0 +1,161 @@ +# -*- coding: utf-8 -*- +import csv +from decimal import Decimal, InvalidOperation +from itertools import chain + +from ckan.plugins.toolkit import asbool +from dateutil.parser import isoparser, parser +from dateutil.parser import ParserError + +from tabulator import helpers +from tabulator.parser import Parser + +from ckan.plugins.toolkit import config + +CSV_SAMPLE_LINES = 100 + + +class XloaderCSVParser(Parser): + """Extends tabulator CSVParser to detect datetime and numeric values. + """ + + # Public + + options = [ + 'delimiter', + 'doublequote', + 'escapechar', + 'quotechar', + 'quoting', + 'skipinitialspace', + 'lineterminator' + ] + + def __init__(self, loader, force_parse=False, **options): + super(XloaderCSVParser, self).__init__(loader, force_parse, **options) + # Set attributes + self.__loader = loader + self.__options = options + self.__force_parse = force_parse + self.__extended_rows = None + self.__encoding = None + self.__dialect = None + self.__chars = None + + @property + def closed(self): + return self.__chars is None or self.__chars.closed + + def open(self, source, encoding=None): + # Close the character stream, if necessary, before reloading it. + self.close() + self.__chars = self.__loader.load(source, encoding=encoding) + self.__encoding = getattr(self.__chars, 'encoding', encoding) + if self.__encoding: + self.__encoding.lower() + self.reset() + + def close(self): + if not self.closed: + self.__chars.close() + + def reset(self): + helpers.reset_stream(self.__chars) + self.__extended_rows = self.__iter_extended_rows() + + @property + def encoding(self): + return self.__encoding + + @property + def dialect(self): + if self.__dialect: + dialect = { + 'delimiter': self.__dialect.delimiter, + 'doubleQuote': self.__dialect.doublequote, + 'lineTerminator': self.__dialect.lineterminator, + 'quoteChar': self.__dialect.quotechar, + 'skipInitialSpace': self.__dialect.skipinitialspace, + } + if self.__dialect.escapechar is not None: + dialect['escapeChar'] = self.__dialect.escapechar + return dialect + + @property + def extended_rows(self): + return self.__extended_rows + + # Private + + def __iter_extended_rows(self): + + def type_value(value): + """Returns numeric values as Decimal(). Uses dateutil to parse + date values. Otherwise, returns values as it receives them + (strings). 
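+            e.g. '42' becomes Decimal('42'), '2011-01-02' becomes
+            datetime(2011, 1, 2, 0, 0), and 'abc' is returned unchanged.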
+ """ + if value in ('', None): + return '' + + try: + return Decimal(value) + except InvalidOperation: + pass + + try: + i = isoparser() + return i.isoparse(value) + except ValueError: + pass + + try: + p = parser() + yearfirst = asbool(config.get( + 'ckanext.xloader.parse_dates_yearfirst', False)) + dayfirst = asbool(config.get( + 'ckanext.xloader.parse_dates_dayfirst', False)) + return p.parse(value, yearfirst=yearfirst, dayfirst=dayfirst) + except ParserError: + pass + + return value + + sample, dialect = self.__prepare_dialect(self.__chars) + items = csv.reader(chain(sample, self.__chars), dialect=dialect) + for row_number, item in enumerate(items, start=1): + values = [] + for value in item: + value = type_value(value) + values.append(value) + yield row_number, None, list(values) + + def __prepare_dialect(self, stream): + + # Get sample + sample = [] + while True: + try: + sample.append(next(stream)) + except StopIteration: + break + if len(sample) >= CSV_SAMPLE_LINES: + break + + # Get dialect + try: + separator = '' + delimiter = self.__options.get('delimiter', ',\t;|') + dialect = csv.Sniffer().sniff(separator.join(sample), delimiter) + if not dialect.escapechar: + dialect.doublequote = True + except csv.Error: + class dialect(csv.excel): + pass + for key, value in self.__options.items(): + setattr(dialect, key, value) + # https://github.com/frictionlessdata/FrictionlessDarwinCore/issues/1 + if getattr(dialect, 'quotechar', None) == '': + setattr(dialect, 'quoting', csv.QUOTE_NONE) + + self.__dialect = dialect + return sample, dialect diff --git a/ckanext/xloader/paster.py b/ckanext/xloader/paster.py deleted file mode 100644 index 7e58a119..00000000 --- a/ckanext/xloader/paster.py +++ /dev/null @@ -1,219 +0,0 @@ -from __future__ import print_function -import sys - -import ckan.lib.cli as cli -import ckan.plugins as p -import ckan.model as model - -import ckanext.datastore.helpers as h -from ckanext.xloader.command import XloaderCmd - -# Paster command for CKAN 2.8 and below - - -class xloaderCommand(cli.CkanCommand): - '''xloader commands - - Usage: - - xloader submit [options] - Submit the given datasets' resources to be xloaded into the - DataStore. (They are added to the queue for CKAN's background task - worker.) - - where is one of: - - - Submit a particular dataset's resources - - - Submit a particular dataset's resources - - all - Submit all datasets' resources to the DataStore - - all-existing - Re-submits all the resources already in the - DataStore. (Ignores any resources that have not been stored - in DataStore, e.g. 
because they are not tabular) - - options: - - --dry-run - doesn't actually submit any resources - - --ignore-format - submit resources even if they have a format - not in the configured ckanext.xloader.formats - - xloader status - Shows status of jobs - ''' - - summary = __doc__.split('\n')[0] - usage = __doc__ - min_args = 1 - - def __init__(self, name): - super(xloaderCommand, self).__init__(name) - self.error_occured = False - - self.parser.add_option('-y', dest='yes', - action='store_true', default=False, - help='Always answer yes to questions') - self.parser.add_option('--ignore-format', - action='store_true', default=False, - help='Submit even if the resource.format is not' - ' in ckanext.xloader.formats') - self.parser.add_option('--dry-run', - action='store_true', default=False, - help='Don\'t actually submit anything') - - def command(self): - cmd = XloaderCmd(self.options.dry_run) - if not self.args: - print(self.usage) - sys.exit(1) - if self.args[0] == 'submit': - if len(self.args) < 2: - self.parser.error('This command requires an argument') - if self.args[1] == 'all': - self._load_config() - cmd._setup_xloader_logger() - cmd._submit_all() - elif self.args[1] == 'all-existing': - self._confirm_or_abort() - self._load_config() - cmd._setup_xloader_logger() - cmd._submit_all_existing() - else: - pkg_name_or_id = self.args[1] - self._load_config() - cmd._setup_xloader_logger() - cmd._submit_package(pkg_name_or_id) - self._handle_command_status(cmd.error_occured) - elif self.args[0] == 'status': - self._load_config() - cmd.print_status() - else: - self.parser.error('Unrecognized command') - - def _handle_command_status(self, error_occured): - if error_occured: - print('Finished but saw errors - see above for details') - sys.exit(1) - - def _confirm_or_abort(self): - if self.options.yes or self.options.dry_run: - return - question = ( - "Data in any datastore resource that isn't in their source files " - "(e.g. data added using the datastore API) will be permanently " - "lost. Are you sure you want to proceed?" - ) - answer = cli.query_yes_no(question, default=None) - if not answer == 'yes': - print("Aborting...") - sys.exit(0) - - -class MigrateTypesCommand(cli.CkanCommand): - '''Migrate command - - Turn existing resource field types into Data Dictionary overrides. - This is intended to simplify migration from DataPusher to XLoader, - by allowing you to reuse the types that DataPusher has guessed. - - Usage: - - migrate_types [options] [resource-spec] - Add the given resources' field types to the Data Dictionary. 
- - where resource-spec is one of: - - - Migrate a particular resource - - all - Migrate all resources (this is the default) - - ''' - summary = __doc__.split('\n')[0] - usage = __doc__ - min_args = 0 - - def __init__(self, name): - super(MigrateTypesCommand, self).__init__(name) - self.error_occured = False - - self.parser.add_option('-t', '--include-text', - action='store_true', default=False, - help='Add Data Dictionary overrides even for text fields') - - self.parser.add_option('--force', - action='store_true', default=False, - help='Overwrite existing data dictionary if it exists') - - def command(self): - self._load_config() - if not self.args or len(self.args) == 0 or self.args[0] == 'all': - self._migrate_all() - else: - self._migrate_resource(self.args[0]) - self._handle_command_status() - - def _migrate_all(self): - session = model.Session - resource_count = session.query(model.Resource).filter_by(state='active').count() - print("Updating {} resource(s)".format(resource_count)) - resources_done = 0 - for resource in session.query(model.Resource).filter_by(state='active'): - resources_done += 1 - self._migrate_resource(resource.id, - prefix='[{}/{}]: '.format(resources_done, - resource_count)) - if resources_done % 100 == 0: - print("[{}/{}] done".format(resources_done, resource_count)) - print("[{}/{}] done".format(resources_done, resource_count)) - - def _migrate_resource(self, resource_id, prefix=''): - data_dict = h.datastore_dictionary(resource_id) - - def print_status(status): - if self.options.verbose: - print("{}{}: {}".format(prefix, resource_id, status)) - - if not data_dict: - print_status("not found") - return - - fields = [] - for field in data_dict: - if field['type'] == 'text' and not self.options.include_text: - type_override = '' - else: - type_override = field['type'] - - if 'info' not in field: - field.update({'info': {'notes': '', - 'type_override': type_override, - 'label': ''}}) - elif self.options.force: - field['info'].update({'type_override': type_override}) - else: - print_status("skipped") - return - - fields.append({ - 'id': field['id'], - 'type': field['type'], - 'info': field['info'] - }) - - try: - p.toolkit.get_action('datastore_create')(None, { - 'resource_id': resource_id, - 'force': True, - 'fields': fields - }) - print_status("updated") - except Exception as e: - self.error_occured = True - print("{}: failed, {}".format(resource_id, e)) - - def _handle_command_status(self): - if self.error_occured: - print('Finished but saw errors - see above for details') - sys.exit(1) diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index 083d5f60..159b99de 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -6,7 +6,14 @@ from ckan.plugins import toolkit from . import action, auth, helpers as xloader_helpers, utils -from .loader import fulltext_function_exists, get_write_engine + +try: + config_declarations = toolkit.blanket.config_declarations +except AttributeError: + # CKAN 2.9 does not have config_declarations. + # Remove when dropping support. 
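+    # The identity fallback keeps the decorated plugin class below valid
+    # on CKAN 2.9; the options are simply not declared there.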
+ def config_declarations(cls): + return cls log = logging.getLogger(__name__) @@ -41,6 +48,7 @@ def is_it_an_xloader_format(cls, format_): return format_.lower() in cls._formats +@config_declarations class xloaderPlugin(plugins.SingletonPlugin): plugins.implements(plugins.IConfigurer) plugins.implements(plugins.IConfigurable) @@ -49,44 +57,25 @@ class xloaderPlugin(plugins.SingletonPlugin): plugins.implements(plugins.IAuthFunctions) plugins.implements(plugins.ITemplateHelpers) plugins.implements(plugins.IResourceController, inherit=True) + plugins.implements(plugins.IClick) + plugins.implements(plugins.IBlueprint) - if toolkit.check_ckan_version("2.9"): - plugins.implements(plugins.IClick) - plugins.implements(plugins.IBlueprint) - - # IClick - def get_commands(self): - from ckanext.xloader.cli import get_commands - - return get_commands() - - # IBlueprint - def get_blueprint(self): - from ckanext.xloader.views import get_blueprints + # IClick + def get_commands(self): + from ckanext.xloader.cli import get_commands - return get_blueprints() + return get_commands() - else: - plugins.implements(plugins.IRoutes, inherit=True) + # IBlueprint + def get_blueprint(self): + from ckanext.xloader.views import get_blueprints - # IRoutes - def before_map(self, m): - m.connect( - "xloader.resource_data", - "/dataset/{id}/resource_data/{resource_id}", - controller="ckanext.xloader.controllers:ResourceDataController", - action="resource_data", - ckan_icon="cloud-upload", - ) - return m + return get_blueprints() # IConfigurer def update_config(self, config): - templates_base = config.get( - "ckan.base_templates_folder", "templates-bs2" - ) # for ckan < 2.8 - toolkit.add_template_directory(config, templates_base) + toolkit.add_template_directory(config, 'templates') # IConfigurable @@ -104,20 +93,6 @@ def configure(self, config_): ) ) - if toolkit.check_ckan_version(max_version="2.7.99"): - # populate_full_text_trigger() needs to be defined, and this was - # introduced in CKAN 2.8 when you installed datastore e.g.: - # paster datastore set-permissions - # However before CKAN 2.8 we need to check the user has defined - # this function manually. - connection = get_write_engine().connect() - if not fulltext_function_exists(connection): - raise Exception( - "populate_full_text_trigger is not defined. " - "See ckanext-xloader's README.rst for more " - "details." 
-            )
-
     # IResourceUrlChange

     def notify(self, resource):
@@ -161,7 +136,7 @@ def after_resource_update(self, context, resource_dict):
             datastore_exists = False

         if datastore_active != datastore_exists:
-            # datastore does exist; update flag
+            # flag is out of sync with datastore; update it
             utils.set_resource_metadata(
                 {'resource_id': resource_dict['id'],
                  'datastore_active': datastore_exists})
diff --git a/ckanext/xloader/templates-bs2/package/resource_edit_base.html b/ckanext/xloader/templates-bs2/package/resource_edit_base.html
deleted file mode 100644
index 34403521..00000000
--- a/ckanext/xloader/templates-bs2/package/resource_edit_base.html
+++ /dev/null
@@ -1,6 +0,0 @@
-{% ckan_extends %}
-
-{% block inner_primary_nav %}
-  {{ super() }}
-  {{ h.build_nav_icon('xloader.resource_data', _('DataStore'), id=pkg.name, resource_id=res.id) }}
-{% endblock %}
diff --git a/ckanext/xloader/templates-bs2/xloader/resource_data.html b/ckanext/xloader/templates-bs2/xloader/resource_data.html
deleted file mode 100644
index ace37859..00000000
--- a/ckanext/xloader/templates-bs2/xloader/resource_data.html
+++ /dev/null
@@ -1,88 +0,0 @@
-{% extends "package/resource_edit_base.html" %}
-
-{% block subtitle %}{{ h.dataset_display_name(pkg) }} - {{ h.resource_display_name(res) }}{% endblock %}
-
-{% block primary_content_inner %}
-
-  {% set action = h.url_for(controller='ckanext.xloader.controllers:ResourceDataController', action='resource_data', id=pkg.name, resource_id=res.id) %}
-  {% set show_table = true %}
-
-  <form method="post" action="{{ action }}">
-    <input name="save" type="submit" class="btn btn-primary" value="{{ _('Upload to DataStore') }}"/>
-  </form>
-
-  {% if status.error and status.error.message %}
-    {% set show_table = false %}
-    <div class="alert alert-error">
-      <strong>{{ _('Upload error:') }}</strong> {{ status.error.message }}
-    </div>
-  {% elif status.task_info and status.task_info.error %}
-    <div class="alert alert-error">
-      {% if status.task_info.error is string %}
-        {# DataPusher < 0.0.3 #}
-        <strong>{{ _('Error:') }}</strong> {{ status.task_info.error }}
-      {% elif status.task_info.error is mapping %}
-        <strong>{{ _('Error:') }}</strong> {{ status.task_info.error.message }}
-        {% for error_key, error_value in status.task_info.error.items() %}
-          {% if error_key != "message" and error_value %}
-            <br>
-            <strong>{{ error_key }}:</strong>
-            {{ error_value }}
-          {% endif %}
-        {% endfor %}
-      {% elif status.task_info.error is iterable %}
-        <strong>{{ _('Error traceback:') }}</strong>
-        <pre>{{ ''.join(status.task_info.error) }}</pre>
-      {% endif %}
-    </div>
-  {% endif %}
-
-  <table class="table table-bordered">
-    <colgroup>
-      <col width="150">
-      <col>
-    </colgroup>
-    <tr>
-      <th>{{ _('Status') }}</th>
-      <td>{{ h.xloader_status_description(status) }}</td>
-    </tr>
-    <tr>
-      <th>{{ _('Last updated') }}</th>
-      {% if status.status %}
-        <td>{{ h.time_ago_from_timestamp(status.last_updated) }}</td>
-      {% else %}
-        <td>{{ _('Never') }}</td>
-      {% endif %}
-    </tr>
-  </table>
-
-  {% if status.status and status.task_info and show_table %}
-    <h3>{{ _('Upload Log') }}</h3>
-    <ul class="activity">
-      {% for item in status.task_info.logs %}
-        {% set icon = 'ok' if item.level == 'INFO' else 'exclamation' %}
-        {% set class = ' failure' if icon == 'exclamation' else ' success' %}
-        {% set popover_content = 'test' %}
-        <li class="item no-avatar{{ class }}">
-          <i class="icon icon-{{ icon }}"></i>
-          <p>
-            {% for line in item.message.strip().split('\n') %}
-              {{ line | urlize }}<br>
-            {% endfor %}
-            <span class="date" title="{{ item.timestamp }}">
-              {{ h.time_ago_from_timestamp(item.timestamp) }}
-              <a href="#" data-content="{{ popover_content }}">{{ _('Details') }}</a>
-            </span>
-          </p>
-        </li>
-      {% endfor %}
-      <li class="item no-avatar">
-        <i class="icon icon-flag"></i>
-        <p>
-          {{ _('End of log') }}
-        </p>
-      </li>
-    </ul>
-  {% endif %}
-
-{% endblock %}
diff --git a/ckanext/xloader/templates/xloader/resource_data.html b/ckanext/xloader/templates/xloader/resource_data.html
index 2634b081..a94ad631 100644
--- a/ckanext/xloader/templates/xloader/resource_data.html
+++ b/ckanext/xloader/templates/xloader/resource_data.html
@@ -7,11 +7,14 @@
   {% set action = h.url_for('xloader.resource_data', id=pkg.name, resource_id=res.id) %}
   {% set show_table = true %}

-  <form method="post" action="{{ action }}">
-    <input name="save" type="submit" class="btn btn-primary" value="{{ _('Upload to DataStore') }}"/>
-  </form>
+  {% block upload_ds_button %}
+  <form method="post" action="{{ action }}">
+    {{ h.csrf_input() if 'csrf_input' in h }}
+    <input name="save" type="submit" class="btn btn-primary" value="{{ _('Upload to DataStore') }}"/>
+  </form>
+  {% endblock %}

   {% if status.error and status.error.message %}
     {% set show_table = false %}
@@ -20,10 +23,7 @@
   {% elif status.task_info and status.task_info.error %}
- {% if status.task_info.error is string %} - {# DataPusher < 0.0.3 #} - {{ _('Error:') }} {{ status.task_info.error }} - {% elif status.task_info.error is mapping %} + {% if status.task_info.error is mapping %} {{ _('Error:') }} {{ status.task_info.error.message }} {% for error_key, error_value in status.task_info.error.items() %} {% if error_key != "message" and error_value %} diff --git a/ckanext/xloader/tests/ckan_setup.py b/ckanext/xloader/tests/ckan_setup.py index ae8bfb3e..ff43d74c 100644 --- a/ckanext/xloader/tests/ckan_setup.py +++ b/ckanext/xloader/tests/ckan_setup.py @@ -1,5 +1,5 @@ try: - from ckan.tests.pytest_ckan.ckan_setup import * + from ckan.tests.pytest_ckan.ckan_setup import * # noqa except ImportError: import pkg_resources from paste.deploy import loadapp diff --git a/ckanext/xloader/tests/fixtures.py b/ckanext/xloader/tests/fixtures.py index f43916ab..9a7ad37f 100644 --- a/ckanext/xloader/tests/fixtures.py +++ b/ckanext/xloader/tests/fixtures.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- -import sqlalchemy -import sqlalchemy.orm as orm +from sqlalchemy import orm import os from ckanext.datastore.tests import helpers as datastore_helpers @@ -11,7 +10,7 @@ ) try: - from ckan.tests.pytest_ckan.fixtures import * + from ckan.tests.pytest_ckan.fixtures import * # noqa except ImportError: import pytest diff --git a/ckanext/xloader/tests/samples/date_formats.csv b/ckanext/xloader/tests/samples/date_formats.csv new file mode 100644 index 00000000..a3ab1a2a --- /dev/null +++ b/ckanext/xloader/tests/samples/date_formats.csv @@ -0,0 +1,5 @@ +date,temperature,place +2011-01-02,-1,Galway +01-03-2011,0.5,Galway +2011.01.02,5,Berkeley +11-01-03,6,Berkeley diff --git a/ckanext/xloader/tests/samples/mixed_numeric_string_sample.csv b/ckanext/xloader/tests/samples/mixed_numeric_string_sample.csv new file mode 100644 index 00000000..9d076602 --- /dev/null +++ b/ckanext/xloader/tests/samples/mixed_numeric_string_sample.csv @@ -0,0 +1,3 @@ +Funding agency,Program title,Maximum (indicative) grant amount +DTIS,Accessible Tourism Infrastructure Grants,Five hundred thousand dollars +DTIS,Boosting Accessible Tourism Experiences Grants,5000 diff --git a/ckanext/xloader/tests/samples/sample_with_blanks.csv b/ckanext/xloader/tests/samples/sample_with_blanks.csv new file mode 100644 index 00000000..b53b25db --- /dev/null +++ b/ckanext/xloader/tests/samples/sample_with_blanks.csv @@ -0,0 +1,4 @@ +Funding agency,Program title,Opening date,Service ID +DTIS,Visitor First Experiences Fund,23/03/2023,63039 +DTIS,First Nations Sport and Recreation Program Round 2,22/03/2023,63040 +,,,63041 diff --git a/ckanext/xloader/tests/test_action.py b/ckanext/xloader/tests/test_action.py index 3e551276..71f4ad01 100644 --- a/ckanext/xloader/tests/test_action.py +++ b/ckanext/xloader/tests/test_action.py @@ -1,5 +1,8 @@ import pytest -import mock +try: + from unittest import mock +except ImportError: + import mock from ckan.tests import helpers, factories diff --git a/ckanext/xloader/tests/test_jobs.py b/ckanext/xloader/tests/test_jobs.py index 9d811ab8..e819dad9 100644 --- a/ckanext/xloader/tests/test_jobs.py +++ b/ckanext/xloader/tests/test_jobs.py @@ -1,737 +1,131 @@ -from __future__ import absolute_import -import os -import json -import random -import datetime -import time -import six - -try: - from collections import OrderedDict # from python 2.7 -except ImportError: - from sqlalchemy.util import OrderedDict import pytest +import io -from nose.tools import make_decorator -import mock -import responses -from 
sqlalchemy import MetaData, Table -from sqlalchemy.sql import select - -import ckan.plugins as p +from datetime import datetime -try: - config = p.toolkit.config -except AttributeError: - from pylons import config - -from ckanext.xloader import jobs -from ckanext.xloader import db as jobs_db -from ckanext.xloader.loader import get_write_engine +from requests import Response +from ckan.cli.cli import ckan +from ckan.plugins import toolkit from ckan.tests import helpers, factories -SOURCE_URL = "http://www.example.com/static/file" - - -def mock_actions(func): - """ - Decorator that mocks actions used by these tests - Based on ckan.test.helpers.mock_action - """ - - def wrapper(*args, **kwargs): - # Mock CKAN's resource_show API - from ckan.logic import get_action as original_get_action - - def side_effect(called_action_name): - if called_action_name == "resource_show": - - def mock_resource_show(context, data_dict): - return { - "id": data_dict["id"], - "name": "short name", - "url": SOURCE_URL, - "format": "", - "package_id": "test-pkg", - } - - return mock_resource_show - elif called_action_name == "package_show": - - def mock_package_show(context, data_dict): - return { - "id": data_dict["id"], - "name": "pkg-name", - } - - return mock_package_show - else: - return original_get_action(called_action_name) - - try: - with mock.patch( - "ckanext.xloader.jobs.get_action" - ) as mock_get_action: - mock_get_action.side_effect = side_effect - - return_value = func(*args, **kwargs) - finally: - pass - # Make sure to stop the mock, even with an exception - # mock_action.stop() - return return_value - - return make_decorator(func)(wrapper) - - -@pytest.mark.skip -@pytest.mark.usefixtures("with_plugins") -@pytest.mark.ckan_config("ckan.plugins", "datastore xloader") -class TestxloaderDataIntoDatastore(object): - - @pytest.fixture(autouse=True) - def setup_class(self): - self.host = "www.ckan.org" - self.api_key = "my-fake-key" - self.resource_id = "foo-bar-42" - factories.Resource(id=self.resource_id) - jobs_db.init(config, echo=False) - # drop test table - engine, conn = self.get_datastore_engine_and_connection() - conn.execute('DROP TABLE IF EXISTS "{}"'.format(self.resource_id)) - yield - if "_datastore" in dir(self): - connection = self._datastore[1] - connection.close() - - def register_urls( - self, filename="simple.csv", content_type="application/csv" - ): - """Mock some test URLs with responses. - - Mocks some URLs related to a data file and a CKAN resource that - contains the data file, including the URL of the data file itself and - the resource_show, resource_update and datastore_delete URLs. - - :returns: a 2-tuple containing the URL of the data file itself and the - resource_show URL for the resource that contains the data file - - """ - responses.add_passthru(config["solr_url"]) - - # A URL that just returns a static file - responses.add( - responses.GET, - SOURCE_URL, - body=get_sample_file(filename), - content_type=content_type, - ) - - # A URL that mocks the response that CKAN's resource_update API would - # give after successfully updating a resource. - resource_update_url = ( - "http://www.ckan.org/api/3/action/resource_update" - ) - responses.add( - responses.POST, - resource_update_url, - body=json.dumps({"success": True}), - content_type="application/json", - ) - - # A URL that mock's the response that CKAN's datastore plugin's - # datastore_delete API would give after successfully deleting a - # resource from the datastore. 
- datastore_del_url = "http://www.ckan.org/api/3/action/datastore_delete" - responses.add( - responses.POST, - datastore_del_url, - body=json.dumps({"success": True}), - content_type="application/json", - ) - - self.callback_url = "http://www.ckan.org/api/3/action/xloader_hook" - responses.add( - responses.POST, - self.callback_url, - body=json.dumps({"success": True}), - content_type="application/json", - ) - - @classmethod - def get_datastore_engine_and_connection(cls): - if "_datastore" not in dir(cls): - engine = get_write_engine() - conn = engine.connect() - cls._datastore = (engine, conn) - return cls._datastore - - def get_datastore_table(self): - engine, conn = self.get_datastore_engine_and_connection() - meta = MetaData(bind=engine) - table = Table( - self.resource_id, meta, autoload=True, autoload_with=engine - ) - s = select([table]) - with conn.begin(): - result = conn.execute(s) - return dict( - num_rows=result.rowcount, - headers=list(result.keys()), - header_dict=OrderedDict( - [(c.key, six.text_type(c.type)) for c in table.columns] - ), - rows=result.fetchall(), - ) - - def get_load_logs(self, task_id): - conn = jobs_db.ENGINE.connect() - logs = jobs_db.LOGS_TABLE - result = conn.execute( - select([logs.c.level, logs.c.message]).where( - logs.c.job_id == task_id - ) - ) - return Logs(result.fetchall()) - - def get_time_of_last_analyze(self): - # When ANALYZE runs it appears to take a moment for the - # pg_stat_user_tables to update, which we use to check analyze runs, - # so sadly we need a sleep :( - # DR: 0.25 is pretty reliable on my machine, but give a wide margin - time.sleep(1) - engine, conn = self.get_datastore_engine_and_connection() - result = conn.execute( - """ - SELECT last_analyze, last_autoanalyze - FROM pg_stat_user_tables - WHERE relname='{}'; - """.format( - self.resource_id - ) - ) - last_analyze_datetimes = result.fetchall()[0] - return max([x for x in last_analyze_datetimes if x] or [None]) - - @mock_actions - @responses.activate - def test_simple_csv(self): - # Test not only the load and xloader_hook is called at the end - self.register_urls(filename="simple.csv") - data = { - "api_key": self.api_key, - "job_type": "xloader_to_datastore", - "result_url": self.callback_url, - "metadata": { - "ckan_url": "http://%s/" % self.host, - "resource_id": self.resource_id, - }, - } - job_id = "test{}".format(random.randint(0, 1e5)) - - with mock.patch( - "ckanext.xloader.jobs.set_resource_metadata" - ) as mocked_set_resource_metadata: - # in tests we call jobs directly, rather than use rq, so mock - # get_current_job() - with mock.patch( - "ckanext.xloader.jobs.get_current_job", - return_value=mock.Mock(id=job_id), - ): - result = jobs.xloader_data_into_datastore(data) - assert result is None, jobs_db.get_job(job_id)["error"]["message"] - - # Check it said it was successful - assert ( - responses.calls[-1].request.url - == "http://www.ckan.org/api/3/action/xloader_hook" - ) - job_dict = json.loads(responses.calls[-1].request.body) - assert job_dict["status"] == u"complete", job_dict - assert job_dict == { - u"metadata": { - u"datastore_contains_all_records_of_source_file": True, - u"datastore_active": True, - u"ckan_url": u"http://www.ckan.org/", - u"resource_id": u"foo-bar-42", - }, - u"status": u"complete", - } - - # Check the load - data = self.get_datastore_table() - assert data["headers"] == [ - "_id", - "_full_text", - "date", - "temperature", - "place", - ] - assert data["header_dict"]["date"] == "TEXT" - # 'TIMESTAMP WITHOUT TIME ZONE') - assert 
data["header_dict"]["temperature"] == "TEXT" # 'NUMERIC') - assert data["header_dict"]["place"] == "TEXT" # 'TEXT') - assert data["num_rows"] == 6 - assert data["rows"][0][2:] == (u"2011-01-01", u"1", u"Galway") - # (datetime.datetime(2011, 1, 1), 1, 'Galway')) - - # Check it wanted to set the datastore_active=True - mocked_set_resource_metadata.assert_called_once() - assert mocked_set_resource_metadata.call_args[1]["update_dict"] == { - "datastore_contains_all_records_of_source_file": True, - "datastore_active": True, - "ckan_url": "http://www.ckan.org/", - "resource_id": "foo-bar-42", - } - - logs = self.get_load_logs(job_id) - logs.assert_no_errors() - - job = jobs_db.get_job(job_id) - assert job["status"] == u"complete" - assert job["error"] is None - - # Check ANALYZE was run - last_analyze = self.get_time_of_last_analyze() - assert last_analyze - - @mock_actions - @responses.activate - @mock.patch("ckanext.xloader.jobs.MAX_CONTENT_LENGTH", 10000) - @mock.patch("ckanext.xloader.jobs.MAX_EXCERPT_LINES", 100) - def test_too_large_csv(self): - - # Test not only the load and xloader_hook is called at the end - self.register_urls(filename="simple-large.csv") - data = { - "api_key": self.api_key, - "job_type": "xloader_to_datastore", - "result_url": self.callback_url, - "metadata": { - "ckan_url": "http://%s/" % self.host, - "resource_id": self.resource_id, - }, - } - job_id = "test{}".format(random.randint(0, 1e5)) - - with mock.patch( - "ckanext.xloader.jobs.set_resource_metadata" - ) as mocked_set_resource_metadata: - # in tests we call jobs directly, rather than use rq, so mock - # get_current_job() - with mock.patch( - "ckanext.xloader.jobs.get_current_job", - return_value=mock.Mock(id=job_id), - ): - result = jobs.xloader_data_into_datastore(data) - assert result is None, jobs_db.get_job(job_id)["error"]["message"] - - # Check it said it was successful - assert ( - responses.calls[-1].request.url - == "http://www.ckan.org/api/3/action/xloader_hook" - ) - job_dict = json.loads(responses.calls[-1].request.body) - assert job_dict["status"] == u"complete", job_dict - assert job_dict == { - u"metadata": { - u"datastore_contains_all_records_of_source_file": False, - u"datastore_active": True, - u"ckan_url": u"http://www.ckan.org/", - u"resource_id": u"foo-bar-42", - }, - u"status": u"complete", - } - - # Check the load - data = self.get_datastore_table() - assert data["headers"] == ["_id", "_full_text", "id", "text"] - assert data["header_dict"]["id"] == "TEXT" - # 'TIMESTAMP WITHOUT TIME ZONE') - assert data["header_dict"]["text"] == "TEXT" - assert data["num_rows"] <= 100 - assert data["num_rows"] > 0 - assert data["rows"][0][2:] == (u"1", u"a") - - # Check it wanted to set the datastore_active=True - mocked_set_resource_metadata.assert_called_once() - assert mocked_set_resource_metadata.call_args[1]["update_dict"] == { - "datastore_contains_all_records_of_source_file": False, - "datastore_active": True, - "ckan_url": "http://www.ckan.org/", - "resource_id": "foo-bar-42", - } - - logs = self.get_load_logs(job_id) - logs.assert_no_errors() - - job = jobs_db.get_job(job_id) - assert job["status"] == u"complete" - assert job["error"] is None - - # Check ANALYZE was run - last_analyze = self.get_time_of_last_analyze() - assert last_analyze - - @mock_actions - @responses.activate - @mock.patch("ckanext.xloader.jobs.MAX_CONTENT_LENGTH", 10000) - @mock.patch("ckanext.xloader.jobs.MAX_EXCERPT_LINES", 100) - def test_too_large_xls(self): - - # Test not only the load and xloader_hook is called at 
the end - self.register_urls(filename="simple-large.xls") - data = { - "api_key": self.api_key, - "job_type": "xloader_to_datastore", - "result_url": self.callback_url, - "metadata": { - "ckan_url": "http://%s/" % self.host, - "resource_id": self.resource_id, - }, - } - job_id = "test{}".format(random.randint(0, 1e5)) - - with mock.patch("ckanext.xloader.jobs.set_resource_metadata"): - # in tests we call jobs directly, rather than use rq, so mock - # get_current_job() - with mock.patch( - "ckanext.xloader.jobs.get_current_job", - return_value=mock.Mock(id=job_id), - ): - result = jobs.xloader_data_into_datastore(data) - assert result is not None, jobs_db.get_job(job_id)["error"]["message"] - - # Check it said it was successful - assert ( - responses.calls[-1].request.url - == "http://www.ckan.org/api/3/action/xloader_hook" - ) - job_dict = json.loads(responses.calls[-1].request.body) - assert job_dict["status"] == u"error", job_dict - assert job_dict == { - u"status": u"error", - u"metadata": { - u"ckan_url": u"http://www.ckan.org/", - u"datastore_contains_all_records_of_source_file": False, - u"resource_id": u"foo-bar-42", - }, - u"error": u"Loading file raised an error: array index out of range", - } - - job = jobs_db.get_job(job_id) - assert job["status"] == u"error" - assert job["error"] == { - u"message": u"Loading file raised an error: array index out of range" - } - - @mock_actions - @responses.activate - def test_messytables(self): - # xloader's COPY can't handle xls, so it will be dealt with by - # messytables - self.register_urls( - filename="simple.xls", content_type="application/vnd.ms-excel" - ) - data = { - "api_key": self.api_key, - "job_type": "xloader_to_datastore", - "result_url": self.callback_url, - "metadata": { - "ckan_url": "http://%s/" % self.host, - "resource_id": self.resource_id, - }, - } - job_id = "test{}".format(random.randint(0, 1e5)) - - with mock.patch( - "ckanext.xloader.jobs.set_resource_metadata" - ) as mocked_set_resource_metadata: - # in tests we call jobs directly, rather than use rq, so mock - # get_current_job() - with mock.patch( - "ckanext.xloader.jobs.get_current_job", - return_value=mock.Mock(id=job_id), - ): - result = jobs.xloader_data_into_datastore(data) - assert result is None - - # Check it said it was successful - assert ( - responses.calls[-1].request.url - == "http://www.ckan.org/api/3/action/xloader_hook" - ) - job_dict = json.loads(responses.calls[-1].request.body) - assert job_dict["status"] == u"complete", job_dict - assert job_dict == { - u"metadata": { - u"datastore_contains_all_records_of_source_file": True, - u"datastore_active": True, - u"ckan_url": u"http://www.ckan.org/", - u"resource_id": u"foo-bar-42", - }, - u"status": u"complete", - } - - # Check the load - data = self.get_datastore_table() - assert data["headers"] == [ - "_id", - "_full_text", - "date", - "temperature", - "place", - ] - assert data["header_dict"]["date"] == "TIMESTAMP WITHOUT TIME ZONE" - assert data["header_dict"]["temperature"] == "NUMERIC" - assert data["header_dict"]["place"] == "TEXT" - assert data["num_rows"] == 6 - assert data["rows"][0][2:] == ( - datetime.datetime(2011, 1, 1), - 1, - u"Galway", - ) - - # Check it wanted to set the datastore_active=True - mocked_set_resource_metadata.assert_called_once() - assert mocked_set_resource_metadata.call_args[1]["update_dict"] == { - "ckan_url": "http://www.ckan.org/", - "datastore_contains_all_records_of_source_file": True, - "datastore_active": True, - "resource_id": "foo-bar-42", - } - - # check logs 
have the error doing the COPY - logs = self.get_load_logs(job_id) - copy_error_index = None - for i, log in enumerate(logs): - if log[0] == "WARNING" and log[1].startswith( - "Load using COPY failed: Error during the load into PostgreSQL" - ): - copy_error_index = i - break - assert copy_error_index, "Missing COPY error" - - # check messytable portion of the logs - logs = Logs(logs[copy_error_index + 1:]) - assert logs[0] == (u"INFO", u"Trying again with messytables") - logs.assert_no_errors() - - # Check ANALYZE was run - last_analyze = self.get_time_of_last_analyze() - assert last_analyze - - @mock_actions - @responses.activate - def test_umlaut_and_extra_comma(self): - self.register_urls(filename="umlaut_and_extra_comma.csv") - # This csv has an extra comma which causes the COPY to throw a - # psycopg2.DataError and the umlaut can cause problems for logging the - # error. We need to check that it correctly reverts to using - # messytables to load it - data = { - "api_key": self.api_key, - "job_type": "xloader_to_datastore", - "result_url": self.callback_url, - "metadata": { - "ckan_url": "http://%s/" % self.host, - "resource_id": self.resource_id, - }, - } - job_id = "test{}".format(random.randint(0, 1e5)) - - with mock.patch("ckanext.xloader.jobs.set_resource_metadata"): - # in tests we call jobs directly, rather than use rq, so mock - # get_current_job() - with mock.patch( - "ckanext.xloader.jobs.get_current_job", - return_value=mock.Mock(id=job_id), - ): - result = jobs.xloader_data_into_datastore(data) - assert result is None, jobs_db.get_job(job_id)["error"]["message"] - - # Check it said it was successful - assert ( - responses.calls[-1].request.url - == "http://www.ckan.org/api/3/action/xloader_hook" - ) - job_dict = json.loads(responses.calls[-1].request.body) - assert job_dict["status"] == u"complete", job_dict - assert job_dict == { - u"metadata": { - u"datastore_contains_all_records_of_source_file": True, - u"datastore_active": True, - u"ckan_url": u"http://www.ckan.org/", - u"resource_id": u"foo-bar-42", - }, - u"status": u"complete", - } - - logs = self.get_load_logs(job_id) - logs.assert_no_errors() - - job = jobs_db.get_job(job_id) - assert job["status"] == u"complete" - assert job["error"] is None - - @mock_actions - @responses.activate - def test_invalid_byte_sequence(self): - self.register_urls(filename='go-realtime.xlsx') - # This xlsx throws an Postgres error on INSERT because of - # 'invalid byte sequence for encoding "UTF8": 0x00' which causes - # the COPY to throw a psycopg2.DataError and umlauts in the file can - # cause problems for logging the error. 
We need to check that - # it correctly reverts to using messytables to load it - data = { - 'api_key': self.api_key, - 'job_type': 'xloader_to_datastore', - 'result_url': self.callback_url, - 'metadata': { - 'ckan_url': 'http://%s/' % self.host, - 'resource_id': self.resource_id - } - } - job_id = "test{}".format(random.randint(0, 1e5)) - - with mock.patch('ckanext.xloader.jobs.set_datastore_active_flag'): - # in tests we call jobs directly, rather than use rq, so mock - # get_current_job() - with mock.patch( - "ckanext.xloader.jobs.get_current_job", - return_value=mock.Mock(id=job_id), - ): - result = jobs.xloader_data_into_datastore(data) - assert result is None, jobs_db.get_job(job_id)["error"]["message"] - - # Check it said it was successful - assert responses.calls[-1].request.url == \ - 'http://www.ckan.org/api/3/action/xloader_hook' - job_dict = json.loads(responses.calls[-1].request.body) - assert job_dict['status'] == u'complete', job_dict - assert job_dict == \ - {u'metadata': {u'ckan_url': u'http://www.ckan.org/', - u'resource_id': u'foo-bar-42'}, - u'status': u'complete'} - - logs = self.get_load_logs(job_id) - logs.assert_no_errors() - - job = jobs_db.get_job(job_id) - assert job['status'] == u'complete' - assert job['error'] is None - - @mock_actions - @responses.activate - def test_first_request_is_202_pending_response(self): - # when you first get the CSV it returns this 202 response, which is - # what this server does: https://data-cdfw.opendata.arcgis.com/datasets - responses.add( - responses.GET, - SOURCE_URL, - status=202, - body='{"processingTime":"8.716 seconds","status":"Processing","generating":{}}', - content_type="application/json", - ) - # subsequent GETs of the CSV work fine - self.register_urls() - data = { - "api_key": self.api_key, - "job_type": "xloader_to_datastore", - "result_url": self.callback_url, - "metadata": { - "ckan_url": "http://%s/" % self.host, - "resource_id": self.resource_id, - }, - } - job_id = "test{}".format(random.randint(0, 1e5)) - - with mock.patch( - "ckanext.xloader.jobs.set_resource_metadata" - ) as mocked_set_resource_metadata: - # in tests we call jobs directly, rather than use rq, so mock - # get_current_job() - with mock.patch( - "ckanext.xloader.jobs.get_current_job", - return_value=mock.Mock(id=job_id), - ): - result = jobs.xloader_data_into_datastore(data) - assert result is None, jobs_db.get_job(job_id)["error"]["message"] - - # Check it said it was successful - assert ( - responses.calls[-1].request.url - == "http://www.ckan.org/api/3/action/xloader_hook" - ) - job_dict = json.loads(responses.calls[-1].request.body) - assert job_dict["status"] == u"complete", job_dict - assert job_dict == { - u"metadata": { - u"ckan_url": u"http://www.ckan.org/", - u"datastore_contains_all_records_of_source_file": True, - u"datastore_active": True, - u"resource_id": u"foo-bar-42", - }, - u"status": u"complete", - } - - # Check the load - data = self.get_datastore_table() - assert data["headers"] == [ - "_id", - "_full_text", - "date", - "temperature", - "place", - ] - assert data["header_dict"]["date"] == "TEXT" - # 'TIMESTAMP WITHOUT TIME ZONE') - assert data["header_dict"]["temperature"] == "TEXT" # 'NUMERIC') - assert data["header_dict"]["place"] == "TEXT" # 'TEXT') - assert data["num_rows"] == 6 - assert data["rows"][0][2:] == (u"2011-01-01", u"1", u"Galway") - # (datetime.datetime(2011, 1, 1), 1, 'Galway')) - - # Check it wanted to set the datastore_active=True - mocked_set_resource_metadata.assert_called_once() - assert 
mocked_set_resource_metadata.call_args[1]["update_dict"] == { - "datastore_contains_all_records_of_source_file": True, - "datastore_active": True, - "ckan_url": "http://www.ckan.org/", - "resource_id": "foo-bar-42", - } - - logs = self.get_load_logs(job_id) - logs.assert_no_errors() - - job = jobs_db.get_job(job_id) - assert job["status"] == u"complete" - assert job["error"] is None - - -class Logs(list): - def get_errors(self): - return [message for level, message in self if level == "ERROR"] - - def grep(self, text): - return [message for level, message in self if text in message] - - def assert_no_errors(self): - errors = self.get_errors() - assert not errors, errors - - -def get_sample_file(filename): - filepath = os.path.join(os.path.dirname(__file__), "samples", filename) - return open(filepath).read() - +from unittest import mock +from ckanext.xloader import jobs +from ckanext.xloader.utils import get_xloader_user_apitoken + + +_TEST_FILE_CONTENT = "x, y\n1,2\n2,4\n3,6\n4,8\n5,10" + + +def get_response(download_url, headers): + """Mock jobs.get_response() method.""" + resp = Response() + resp.raw = io.BytesIO(_TEST_FILE_CONTENT.encode()) + resp.headers = headers + return resp + + +def get_large_response(download_url, headers): + """Mock jobs.get_response() method to fake a large file.""" + resp = Response() + resp.raw = io.BytesIO(_TEST_FILE_CONTENT.encode()) + resp.headers = {'content-length': 2000000000} + return resp + + +@pytest.fixture +def apikey(): + if toolkit.check_ckan_version(min_version="2.10"): + sysadmin = factories.SysadminWithToken() + else: + # To provide support with CKAN 2.9 + sysadmin = factories.Sysadmin() + sysadmin["token"] = get_xloader_user_apitoken() + + return sysadmin["token"] + + +@pytest.fixture +def data(create_with_upload, apikey): + dataset = factories.Dataset() + resource = create_with_upload( + _TEST_FILE_CONTENT, + "multiplication_2.csv", + url="http://data", + package_id=dataset["id"] + ) + callback_url = toolkit.url_for( + "api.action", ver=3, logic_function="xloader_hook", qualified=True + ) + return { + 'api_key': apikey, + 'job_type': 'xloader_to_datastore', + 'result_url': callback_url, + 'metadata': { + 'ignore_hash': True, + 'ckan_url': toolkit.config.get('ckan.site_url'), + 'resource_id': resource["id"], + 'set_url_type': False, + 'task_created': datetime.utcnow().isoformat(), + 'original_url': resource["url"], + } + } + + +@pytest.mark.usefixtures("clean_db", "with_plugins") +class TestXLoaderJobs(helpers.FunctionalRQTestBase): + + def test_xloader_data_into_datastore(self, cli, data): + self.enqueue(jobs.xloader_data_into_datastore, [data]) + with mock.patch("ckanext.xloader.jobs.get_response", get_response): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "File hash: d44fa65eda3675e11710682fdb5f1648" in stdout + assert "Fields: [{'id': 'x', 'type': 'text'}, {'id': 'y', 'type': 'text'}]" in stdout + assert "Copying to database..." in stdout + assert "Creating search index..." 
in stdout + assert "Express Load completed" in stdout + + resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"]) + assert resource["datastore_contains_all_records_of_source_file"] + + def test_xloader_ignore_hash(self, cli, data): + self.enqueue(jobs.xloader_data_into_datastore, [data]) + with mock.patch("ckanext.xloader.jobs.get_response", get_response): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "Express Load completed" in stdout + + self.enqueue(jobs.xloader_data_into_datastore, [data]) + with mock.patch("ckanext.xloader.jobs.get_response", get_response): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "Copying to database..." in stdout + assert "Express Load completed" in stdout + + data["metadata"]["ignore_hash"] = False + self.enqueue(jobs.xloader_data_into_datastore, [data]) + with mock.patch("ckanext.xloader.jobs.get_response", get_response): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "Ignoring resource - the file hash hasn't changed" in stdout + + def test_data_too_big_error_if_content_length_bigger_than_config(self, cli, data): + self.enqueue(jobs.xloader_data_into_datastore, [data]) + with mock.patch("ckanext.xloader.jobs.get_response", get_large_response): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "Data too large to load into Datastore:" in stdout + + def test_data_max_excerpt_lines_config(self, cli, data): + self.enqueue(jobs.xloader_data_into_datastore, [data]) + with mock.patch("ckanext.xloader.jobs.get_response", get_large_response): + with mock.patch("ckanext.xloader.jobs.MAX_EXCERPT_LINES", 1): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "Loading excerpt of ~1 lines to DataStore." 
in stdout + + resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"]) + assert resource["datastore_contains_all_records_of_source_file"] is False + + +@pytest.mark.usefixtures("clean_db") class TestSetResourceMetadata(object): - @classmethod - def setup_class(cls): - helpers.reset_db() - def test_simple(self): resource = factories.Resource() @@ -745,15 +139,6 @@ def test_simple(self): ) resource = helpers.call_action("resource_show", id=resource["id"]) - from pprint import pprint - - pprint(resource) - assert resource["datastore_contains_all_records_of_source_file"] in ( - True, - u"True", - ) - # I'm not quite sure why this is a string on travis - I get the bool - # locally - + assert resource["datastore_contains_all_records_of_source_file"] assert resource["datastore_active"] assert resource["ckan_url"] == "http://www.ckan.org/" diff --git a/ckanext/xloader/tests/test_loader.py b/ckanext/xloader/tests/test_loader.py index 776b1fb6..68452d11 100644 --- a/ckanext/xloader/tests/test_loader.py +++ b/ckanext/xloader/tests/test_loader.py @@ -3,6 +3,7 @@ from __future__ import absolute_import import os import pytest +import six import sqlalchemy.orm as orm import datetime import logging @@ -611,22 +612,29 @@ def test_german(self, Session): u"tsvector", ] + [u"text"] * (len(records[0]) - 1) - def test_integer_header_xlsx(self): - # this xlsx file's header is detected by messytables.headers_guess as - # integers and we should cope with that - csv_filepath = get_sample_filepath("go-realtime.xlsx") - resource_id = factories.Resource()["id"] - try: - loader.load_csv( - csv_filepath, - resource_id=resource_id, - mimetype="CSV", - logger=logger, - ) - except (LoaderError, UnicodeDecodeError): - pass - else: - assert 0, "There should have been an exception" + def test_with_blanks(self, Session): + csv_filepath = get_sample_filepath("sample_with_blanks.csv") + resource_id = "test1" + factories.Resource(id=resource_id) + loader.load_csv( + csv_filepath, + resource_id=resource_id, + mimetype="text/csv", + logger=logger, + ) + assert len(self._get_records(Session, "test1")) == 3 + + def test_with_mixed_types(self, Session): + csv_filepath = get_sample_filepath("mixed_numeric_string_sample.csv") + resource_id = "test1" + factories.Resource(id=resource_id) + loader.load_csv( + csv_filepath, + resource_id=resource_id, + mimetype="text/csv", + logger=logger, + ) + assert len(self._get_records(Session, "test1")) == 2 def test_reload(self, Session): csv_filepath = get_sample_filepath("simple.csv") @@ -800,7 +808,12 @@ def test_geojson(self): in str(exception.value) ) - def test_shapefile_zip(self): + @pytest.mark.skipif( + six.PY3, + reason="In Python 3, tabulator will unzip archives and load the first " + "file found (if possible)." + ) + def test_shapefile_zip_python2(self): filepath = get_sample_filepath("polling_locations.shapefile.zip") resource_id = "test1" factories.Resource(id=resource_id) @@ -812,8 +825,34 @@ def test_shapefile_zip(self): logger=logger, ) + @pytest.mark.skipif( + six.PY2, + reason="In Python 2, tabulator will not load a zipped archive, so the " + "loader will raise a LoaderError." + ) + def test_shapefile_zip_python3(self, Session): + # tabulator unzips the archive and tries to load the first file it + # finds, 'Polling_Locations.cpg'. This file only contains the + # following data: `UTF-8`. 
+ filepath = get_sample_filepath("polling_locations.shapefile.zip") + resource_id = "test1" + factories.Resource(id=resource_id) + loader.load_csv( + filepath, + resource_id=resource_id, + mimetype="text/csv", + logger=logger, + ) + + assert self._get_records(Session, "test1") == [] + assert self._get_column_names(Session, "test1") == [ + '_id', + '_full_text', + 'UTF-8' + ] + -class TestLoadMessytables(TestLoadBase): +class TestLoadTabulator(TestLoadBase): def test_simple(self, Session): csv_filepath = get_sample_filepath("simple.xls") resource_id = "test1" diff --git a/ckanext/xloader/tests/test_parser.py b/ckanext/xloader/tests/test_parser.py new file mode 100644 index 00000000..67929d9f --- /dev/null +++ b/ckanext/xloader/tests/test_parser.py @@ -0,0 +1,145 @@ +# -*- coding: utf-8 -*- +import os +import pytest + +from decimal import Decimal +from datetime import datetime + +from tabulator import Stream +from ckanext.xloader.parser import XloaderCSVParser + +csv_filepath = os.path.abspath( + os.path.join(os.path.dirname(__file__), "samples", "date_formats.csv") +) + + +class TestParser(object): + def test_simple(self): + with Stream(csv_filepath, format='csv', + custom_parsers={'csv': XloaderCSVParser}) as stream: + assert stream.sample == [ + [ + 'date', + 'temperature', + 'place' + ], + [ + datetime(2011, 1, 2, 0, 0), + Decimal('-1'), + 'Galway' + ], + [ + datetime(2011, 1, 3, 0, 0), + Decimal('0.5'), + 'Galway' + ], + [ + datetime(2011, 1, 2, 0, 0), + Decimal('5'), + 'Berkeley' + ], + [ + datetime(2003, 11, 1, 0, 0), + Decimal('6'), + 'Berkeley' + ], + ] + + @pytest.mark.ckan_config("ckanext.xloader.parse_dates_dayfirst", True) + def test_dayfirst(self): + print('test_dayfirst') + with Stream(csv_filepath, format='csv', + custom_parsers={'csv': XloaderCSVParser}) as stream: + assert stream.sample == [ + [ + 'date', + 'temperature', + 'place' + ], + [ + datetime(2011, 1, 2, 0, 0), + Decimal('-1'), + 'Galway' + ], + [ + datetime(2011, 3, 1, 0, 0), + Decimal('0.5'), + 'Galway' + ], + [ + datetime(2011, 2, 1, 0, 0), + Decimal('5'), + 'Berkeley' + ], + [ + datetime(2003, 1, 11, 0, 0), + Decimal('6'), + 'Berkeley' + ], + ] + + @pytest.mark.ckan_config("ckanext.xloader.parse_dates_yearfirst", True) + def test_yearfirst(self): + print('test_yearfirst') + with Stream(csv_filepath, format='csv', + custom_parsers={'csv': XloaderCSVParser}) as stream: + assert stream.sample == [ + [ + 'date', + 'temperature', + 'place' + ], + [ + datetime(2011, 1, 2, 0, 0), + Decimal('-1'), + 'Galway' + ], + [ + datetime(2011, 1, 3, 0, 0), + Decimal('0.5'), + 'Galway' + ], + [ + datetime(2011, 1, 2, 0, 0), + Decimal('5'), + 'Berkeley' + ], + [ + datetime(2011, 1, 3, 0, 0), + Decimal('6'), + 'Berkeley' + ], + ] + + @pytest.mark.ckan_config("ckanext.xloader.parse_dates_dayfirst", True) + @pytest.mark.ckan_config("ckanext.xloader.parse_dates_yearfirst", True) + def test_yearfirst_dayfirst(self): + with Stream(csv_filepath, format='csv', + custom_parsers={'csv': XloaderCSVParser}) as stream: + assert stream.sample == [ + [ + 'date', + 'temperature', + 'place' + ], + [ + datetime(2011, 1, 2, 0, 0), + Decimal('-1'), + 'Galway' + ], + [ + datetime(2011, 3, 1, 0, 0), + Decimal('0.5'), + 'Galway' + ], + [ + datetime(2011, 2, 1, 0, 0), + Decimal('5'), + 'Berkeley' + ], + [ + datetime(2011, 3, 1, 0, 0), + Decimal('6'), + 'Berkeley' + ], + ] diff --git a/ckanext/xloader/tests/test_plugin.py b/ckanext/xloader/tests/test_plugin.py index db470e77..05b83b5b 100644 --- a/ckanext/xloader/tests/test_plugin.py +++ 
b/ckanext/xloader/tests/test_plugin.py @@ -2,7 +2,10 @@ import datetime import pytest -import mock +try: + from unittest import mock +except ImportError: + import mock from six import text_type as str from ckan.tests import helpers, factories from ckan.logic import _actions diff --git a/ckanext/xloader/utils.py b/ckanext/xloader/utils.py index db17a6ab..79facbea 100644 --- a/ckanext/xloader/utils.py +++ b/ckanext/xloader/utils.py @@ -1,9 +1,13 @@ # encoding: utf-8 import json +import datetime from ckan import model from ckan.lib import search +from collections import defaultdict +from decimal import Decimal + import ckan.plugins as p @@ -84,12 +88,6 @@ def set_resource_metadata(update_dict): extras.update(update_dict) q.update({'extras': extras}, synchronize_session=False) - # TODO: Remove resource_revision_table when dropping support for 2.8 - if hasattr(model, 'resource_revision_table'): - model.Session.query(model.resource_revision_table).filter( - model.ResourceRevision.id == update_dict['resource_id'], - model.ResourceRevision.current is True - ).update({'extras': extras}, synchronize_session=False) model.Session.commit() # get package with updated resource from solr @@ -110,3 +108,107 @@ def set_resource_metadata(update_dict): resource.update(update_dict) psi.index_package(solr_data_dict) break + + +def column_count_modal(rows): + """ Return the modal value of columns in the row_set's + sample. This can be assumed to be the number of columns + of the table. + + Copied from messytables. + """ + counts = defaultdict(int) + for row in rows: + length = len([c for c in row if c != '']) + if length > 1: + counts[length] += 1 + if not len(counts): + return 0 + return max(list(counts.items()), key=lambda k_v: k_v[1])[0] + + +def headers_guess(rows, tolerance=1): + """ Guess the offset and names of the headers of the row set. + This will attempt to locate the first row within ``tolerance`` + of the mode of the number of rows in the row set sample. + + The return value is a tuple of the offset of the header row + and the names of the columns. + + Copied from messytables. + """ + rows = list(rows) + modal = column_count_modal(rows) + for i, row in enumerate(rows): + length = len([c for c in row if c != '']) + if length >= modal - tolerance: + # TODO: use type guessing to check that this row has + # strings and does not conform to the type schema of + # the table. + return i, row + return 0, [] + + +TYPES = [int, bool, str, datetime.datetime, float, Decimal] + + +def type_guess(rows, types=TYPES, strict=False): + """ The type guesser aggregates the number of successful + conversions of each column to each type, weights them by a + fixed type priority and select the most probable type for + each column based on that figure. It returns a list of + ``CellType``. Empty cells are ignored. 
+ + Strict means that a type will not be guessed + if parsing fails for a single cell in the column.""" + guesses = [] + if strict: + at_least_one_value = [] + for ri, row in enumerate(rows): + diff = len(row) - len(guesses) + for _ in range(diff): + typesdict = {} + for type in types: + typesdict[type] = 0 + guesses.append(typesdict) + at_least_one_value.append(False) + for ci, cell in enumerate(row): + if not cell: + continue + for type in list(guesses[ci].keys()): + if not isinstance(cell, type): + guesses[ci].pop(type) + at_least_one_value[ci] = True if guesses[ci] else False + # no need to set guessing weights before this + # because we only accept a type if it never fails + for i, guess in enumerate(guesses): + for type in guess: + guesses[i][type] = 1 + # in case there were no values at all in the column, + # we just set the guessed type to string + for i, v in enumerate(at_least_one_value): + if not v: + guesses[i] = {str: 1} + else: + for i, row in enumerate(rows): + diff = len(row) - len(guesses) + for _ in range(diff): + guesses.append(defaultdict(int)) + for i, cell in enumerate(row): + # add string guess so that we have at least one guess + guesses[i][str] = guesses[i].get(str, 1) + if not cell: + continue + for type in types: + if isinstance(cell, type): + guesses[i][type] += 1 + _columns = [] + for guess in guesses: + # this first creates an array of tuples because we want the types to be + # sorted. Even though it is not specified, python chooses the first + # element in case of a tie + # See: http://stackoverflow.com/a/6783101/214950 + guesses_tuples = [(t, guess[t]) for t in types if t in guess] + _columns.append(max(guesses_tuples, key=lambda t_n: t_n[1])[0]) + return _columns diff --git a/full_text_function.sql b/full_text_function.sql deleted file mode 100644 index 8a604258..00000000 --- a/full_text_function.sql +++ /dev/null @@ -1,16 +0,0 @@ --- _full_text fields are now updated by a trigger when set to NULL --- copied from https://github.com/ckan/ckan/pull/3786/files#diff-33d20faeb53559a9b8940bcb418cb5b4R75 -CREATE OR REPLACE FUNCTION populate_full_text_trigger() RETURNS trigger -AS $body$ - BEGIN - IF NEW._full_text IS NOT NULL THEN - RETURN NEW; - END IF; - NEW._full_text := ( - SELECT to_tsvector(string_agg(value, ' ')) - FROM json_each_text(row_to_json(NEW.*)) - WHERE key NOT LIKE '\_%'); - RETURN NEW; - END; -$body$ LANGUAGE plpgsql; -ALTER FUNCTION populate_full_text_trigger() OWNER TO ckan_default; diff --git a/requirements.txt b/requirements.txt index 6488e848..58540beb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ ckantoolkit -messytables==0.15.2 requests[security]>=2.11.1 six>=1.12.0 +tabulator==1.53.5 Unidecode==1.0.22 +python-dateutil>=2.8.2 diff --git a/setup.py b/setup.py index f5acb6e9..6bdefb0b 100644 --- a/setup.py +++ b/setup.py @@ -15,7 +15,7 @@ # Versions should comply with PEP440. For a discussion on single-sourcing # the version across setup.py and the project code, see # http://packaging.python.org/en/latest/tutorial.html#version - version='0.11.0', + version='1.0.1', description='Express Loader - quickly load data into CKAN DataStore''', long_description=long_description, @@ -38,12 +38,11 @@ # Pick your license as you wish (should match "license" above) 'License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)', - # Specify the Python versions you support here. In particular, ensure - # that you indicate whether you support Python 2, Python 3 or both. 
- 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3.6', + # Specify the Python versions you support here. 'Programming Language :: Python :: 3.7', 'Programming Language :: Python :: 3.8', + 'Programming Language :: Python :: 3.9', + 'Programming Language :: Python :: 3.10', ], @@ -85,10 +84,6 @@ [babel.extractors] ckan = ckan.lib.extract:extract_ckan - [paste.paster_command] - xloader = ckanext.xloader.paster:xloaderCommand - migrate_types = ckanext.xloader.paster:MigrateTypesCommand - ''', # If you are changing from the default layout of your extension, you may diff --git a/test.ini b/test.ini index 1415d37f..7bfab684 100644 --- a/test.ini +++ b/test.ini @@ -15,7 +15,7 @@ use = config:../ckan/test-core.ini # Insert any custom config settings to be used when running your extension's # tests here. -ckan.plugins = xloader +ckan.plugins = xloader datastore ckanext.xloader.jobs_db.uri = sqlite:////tmp/jobs.db # Logging configuration
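
The ``headers_guess`` and ``type_guess`` helpers added to ``ckanext/xloader/utils.py`` above stand in for their messytables namesakes now that the messytables dependency is dropped. A minimal sketch of how the two fit together (illustration only, not part of the patch; the sample rows are invented and assumed to be already parsed into Python values, e.g. by ``XloaderCSVParser``)::

    from datetime import datetime
    from ckanext.xloader.utils import headers_guess, type_guess

    rows = [
        ["date", "temperature", "place"],
        [datetime(2011, 1, 2), -1, "Galway"],
        [datetime(2011, 1, 3), 0.5, "Galway"],
    ]

    # headers_guess() picks the first row whose non-empty cell count is
    # within `tolerance` of the modal column count and treats it as the header.
    offset, headers = headers_guess(rows)
    assert (offset, headers) == (0, ["date", "temperature", "place"])

    # type_guess() counts successful isinstance() checks per column; ties are
    # broken by the order of TYPES (int, bool, str, datetime, float, Decimal),
    # so the mixed -1/0.5 column resolves to int here.
    assert type_guess(rows[offset + 1:]) == [datetime, int, str]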