Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get core Dask functionality from cfdm #836

Merged
merged 31 commits into from
Jan 15, 2025
Merged
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Next commit
test_Data passes
  • Loading branch information
davidhassell committed Oct 30, 2024
commit 9f2c748b0bcf1a28e41acd0e5486794295924370
89 changes: 46 additions & 43 deletions cf/data/array/h5netcdfarray.py
Original file line number Diff line number Diff line change
@@ -2,13 +2,18 @@
import cfdm

from ...mixin_container import Container
from .locks import netcdf_lock
from .mixin import ActiveStorageMixin, ArrayMixin, FileArrayMixin, IndexMixin

# from .locks import netcdf_lock
from .mixin import ( # , IndexMixin
ActiveStorageMixin,
ArrayMixin,
FileArrayMixin,
)


class H5netcdfArray(
ActiveStorageMixin,
IndexMixin,
# IndexMixin,
FileArrayMixin,
ArrayMixin,
Container,
@@ -25,60 +30,58 @@ class H5netcdfArray(

"""

def __dask_tokenize__(self):
"""Return a value fully representative of the object.
# def __dask_tokenize__(self):
# """Return a value fully representative of the object.

.. versionadded:: NEXTVERSION
# .. versionadded:: NEXTVERSION

"""
return super().__dask_tokenize__() + (self.get_mask(),)
# """
# return super().__dask_tokenize__() + (self.get_mask(),)

@property
def _lock(self):
"""Set the lock for use in `dask.array.from_array`.
# @property
# def _lock(self):
# """Set the lock for use in `dask.array.from_array`.

Returns a lock object because concurrent reads are not
currently supported by the HDF5 library. The lock object will
be the same for all `NetCDF4Array` and `H5netcdfArray`
instances, regardless of the dataset they access, which means
that access to all netCDF and HDF files coordinates around the
same lock.
# Returns a lock object because concurrent reads are not
# currently supported by the HDF5 library. The lock object will
# be the same for all `NetCDF4Array` and `H5netcdfArray`
# instances, regardless of the dataset they access, which means
# that access to all netCDF and HDF files coordinates around the
# same lock.

.. versionadded:: NEXTVERSION
# .. versionadded:: NEXTVERSION

"""
return netcdf_lock
# """
# return netcdf_lock

# REVIEW: h5: `_get_array`: Ignore this for h5 review
# REVIEW: getitem: `_get_array`: new method to convert subspace to numpy array.
def _get_array(self, index=None):
"""Returns a subspace of the dataset variable.
# def _get_array(self, index=None):
# """Returns a subspace of the dataset variable.

.. versionadded:: NEXTVERSION
# .. versionadded:: NEXTVERSION

.. seealso:: `__array__`, `index`
# .. seealso:: `__array__`, `index`

:Parameters:
# :Parameters:

{{index: `tuple` or `None`, optional}}
# {{index: `tuple` or `None`, optional}}

:Returns:
# :Returns:

`numpy.ndarray`
The subspace.
# `numpy.ndarray`
# The subspace.

"""
if index is None:
index = self.index()
# """
# if index is None:
# index = self.index()

# We need to lock because the netCDF file is about to be accessed.
self._lock.acquire()
# # We need to lock because the netCDF file is about to be accessed.
# self._lock.acquire()

# It's cfdm.H5netcdfArray.__getitem__ that we want to
# call here, but we use 'Container' in super because
# that comes immediately before cfdm.H5netcdfArray in
# the method resolution order.
array = super(Container, self).__getitem__(index)
# # It's cfdm.H5netcdfArray.__getitem__ that we want to
# # call here, but we use 'Container' in super because
# # that comes immediately before cfdm.H5netcdfArray in
# # the method resolution order.
# array = super(Container, self).__getitem__(index)

self._lock.release()
return array
# self._lock.release()
# return array
3 changes: 1 addition & 2 deletions cf/data/array/mixin/cfamixin.py
Original file line number Diff line number Diff line change
@@ -3,8 +3,7 @@
from itertools import accumulate, product

import numpy as np

from ...utils import chunk_locations, chunk_positions
from cfdm.data.utils import chunk_locations, chunk_positions


class CFAMixin:
3 changes: 1 addition & 2 deletions cf/data/array/mixin/compressedarraymixin.py
Original file line number Diff line number Diff line change
@@ -76,12 +76,11 @@ def to_dask_array(self, chunks="auto"):
from functools import partial

import dask.array as da
from cfdm.data.utils import normalize_chunks
from dask import config
from dask.array.core import getter
from dask.base import tokenize

from ...utils import normalize_chunks

name = (f"{self.__class__.__name__}-{tokenize(self)}",)

dtype = self.dtype
121 changes: 61 additions & 60 deletions cf/data/array/netcdf4array.py
Original file line number Diff line number Diff line change
@@ -2,13 +2,14 @@
import cfdm

from ...mixin_container import Container
from .locks import netcdf_lock

# from .locks import netcdf_lock
from .mixin import ActiveStorageMixin, ArrayMixin, FileArrayMixin, IndexMixin


class NetCDF4Array(
ActiveStorageMixin,
IndexMixin,
# IndexMixin,
FileArrayMixin,
ArrayMixin,
Container,
@@ -23,61 +24,61 @@ class NetCDF4Array(

"""

def __dask_tokenize__(self):
"""Return a value fully representative of the object.

.. versionadded:: 3.15.0

"""
return super().__dask_tokenize__() + (self.get_mask(),)

@property
def _lock(self):
"""Set the lock for use in `dask.array.from_array`.

Returns a lock object because concurrent reads are not
currently supported by the netCDF and HDF libraries. The lock
object will be the same for all `NetCDF4Array` and
`H5netcdfArray` instances, regardless of the dataset they
access, which means that access to all netCDF and HDF files
coordinates around the same lock.

.. versionadded:: 3.14.0

"""
return netcdf_lock

# REVIEW: getitem: `_get_array`: Ignore this for h5 review
# REVIEW: getitem: `_get_array`: new method to convert subspace to numpy array
def _get_array(self, index=None):
"""Returns a subspace of the dataset variable.

.. versionadded:: NEXTVERSION

.. seealso:: `__array__`, `index`

:Parameters:

{{index: `tuple` or `None`, optional}}

:Returns:

`numpy.ndarray`
The subspace.

"""
if index is None:
index = self.index()

# Note: We need to lock because the netCDF file is about to be
# accessed.
self._lock.acquire()

# Note: It's cfdm.NetCDFArray.__getitem__ that we want to call
# here, but we use 'Container' in super because that
# comes immediately before cfdm.NetCDFArray in the
# method resolution order.
array = super(Container, self).__getitem__(index)

self._lock.release()
return array
# def __dask_tokenize__(self):
# """Return a value fully representative of the object.
#
# .. versionadded:: 3.15.0
#
# """
# return super().__dask_tokenize__() + (self.get_mask(),)


#
# @property
# def _lock(self):
# """Set the lock for use in `dask.array.from_array`.
#
# Returns a lock object because concurrent reads are not
# currently supported by the netCDF and HDF libraries. The lock
# object will be the same for all `NetCDF4Array` and
# `H5netcdfArray` instances, regardless of the dataset they
# access, which means that access to all netCDF and HDF files
# coordinates around the same lock.
#
# .. versionadded:: 3.14.0
#
# """
# return netcdf_lock
#
# def _get_array(self, index=None):
# """Returns a subspace of the dataset variable.
#
# .. versionadded:: NEXTVERSION
#
# .. seealso:: `__array__`, `index`
#
# :Parameters:
#
# {{index: `tuple` or `None`, optional}}
#
# :Returns:
#
# `numpy.ndarray`
# The subspace.
#
# """
# if index is None:
# index = self.index()
#
# # Note: We need to lock because the netCDF file is about to be
# # accessed.
# self._lock.acquire()
#
# # Note: It's cfdm.NetCDFArray.__getitem__ that we want to call
# # here, but we use 'Container' in super because that
# # comes immediately before cfdm.NetCDFArray in the
# # method resolution order.
# array = super(Container, self).__getitem__(index)
#
# self._lock.release()
# return array
5 changes: 0 additions & 5 deletions cf/data/array/umarray.py
Original file line number Diff line number Diff line change
@@ -12,7 +12,6 @@ class UMArray(
):
"""A sub-array stored in a PP or UM fields file."""

# REVIEW: h5: `__init__`: replace units/calendar API with attributes
def __init__(
self,
filename=None,
@@ -171,7 +170,6 @@ def __init__(
# By default, close the UM file after data array access
self._set_component("close", True, copy=False)

# REVIEW: getitem: `_get_array`: new method to convert subspace to numpy array
def _get_array(self, index=None):
"""Returns a subspace of the dataset variable.

@@ -272,7 +270,6 @@ def _get_rec(self, f, header_offset):
# if r.hdr_offset == header_offset:
# return r

# REVIEW: getitem: `_set_FillValue`: record _FillValue in attributes
def _set_FillValue(self, int_hdr, real_hdr, attributes):
"""Set the ``_FillValue`` attribute.

@@ -367,10 +364,8 @@ def _set_units(self, int_hdr, attributes):
units = units0
break

# REVIEW: getitem: `_set_units`: record units in attributes
attributes["units"] = units

# REVIEW: getitem: `_set_unpack`: record unpack in attributes
def _set_unpack(self, int_hdr, real_hdr, attributes):
"""Set the ``add_offset`` and ``scale_factor`` attributes.

1 change: 0 additions & 1 deletion cf/data/collapse/collapse_active.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# REVIEW: active: `collapse_active.py`: new module for active storage functionality
import datetime
import logging
import time
Loading