Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add verification tests #379

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 43 additions & 42 deletions src/hipscat_import/verification/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,51 @@

from __future__ import annotations

from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional

from hipscat.catalog import Catalog
from hipscat.io.validation import is_valid_catalog
import attrs
from upath import UPath

from hipscat_import.runtime_arguments import RuntimeArguments


@dataclass
class VerificationArguments(RuntimeArguments):
    """Data class for holding verification arguments.

    At least one of ``input_catalog_path`` or ``input_catalog`` must be
    supplied; whichever is missing is filled in from the other during
    ``_check_arguments``.
    """

    ## Input
    input_catalog_path: str | Path | UPath | None = None
    """Path to an existing catalog that will be inspected."""
    input_catalog: Catalog | None = None
    """In-memory representation of a catalog. If not provided, it will be loaded
    from the input_catalog_path."""

    ## Verification options
    field_distribution_cols: list[str] = field(default_factory=list)
    """List of fields to get the overall distribution for. e.g. ["ra", "dec"].
    Should be valid columns in the parquet files."""

    def __post_init__(self):
        """Validate the arguments as soon as the dataclass is constructed."""
        self._check_arguments()

    def _check_arguments(self):
        """Run the parent checks, then resolve the catalog path/object pair.

        Raises:
            ValueError: if neither ``input_catalog_path`` nor ``input_catalog``
                is provided, or if ``input_catalog_path`` fails
                ``is_valid_catalog`` when the catalog has to be loaded from it.
        """
        super()._check_arguments()
        if not self.input_catalog_path and not self.input_catalog:
            raise ValueError("input catalog is required (either input_catalog_path or input_catalog)")
        if not self.input_catalog:
            # Only a path was given: confirm it is a catalog before loading it.
            if not is_valid_catalog(self.input_catalog_path):
                raise ValueError("input_catalog_path not a valid catalog")
            self.input_catalog = Catalog.read_from_hipscat(catalog_path=self.input_catalog_path)
        if not self.input_catalog_path:
            # Only an in-memory catalog was given: take the path from it.
            self.input_catalog_path = self.input_catalog.catalog_path

# from hipscat_import.runtime_arguments import RuntimeArguments


def _dir_exists(instance: VerificationArguments, attribute: attrs.Attribute, value: UPath):
    """Validate that ``value`` is an existing directory.

    This function will be used as a validator for attributes of
    VerificationArguments.

    Args:
        instance: the object whose attribute is being validated (unused).
        attribute: the attribute being validated; its ``name`` appears in
            the error message.
        value: the path to check.

    Raises:
        ValueError: if ``value.is_dir()`` is false.
    """
    if not value.is_dir():
        raise ValueError(f"{attribute.name} must be an existing directory")


def _path_exists(instance: VerificationArguments, attribute: attrs.Attribute, value: UPath):
    """Validate that ``value`` is an existing file or directory.

    This function will be used as a validator for attributes of
    VerificationArguments.

    Args:
        instance: the object whose attribute is being validated (unused).
        attribute: the attribute being validated; its ``name`` appears in
            the error message.
        value: the path to check.

    Raises:
        ValueError: if ``value.exists()`` is false.
    """
    if not value.exists():
        raise ValueError(f"{attribute.name} must be an existing file or directory")


@attrs.define(kw_only=True)
class VerificationArguments:
    """Container for verification arguments.

    All fields are keyword-only. Path-like fields are converted to ``UPath``
    on construction, and ``input_catalog_path`` / ``truth_schema`` are
    validated for existence.
    """

    input_catalog_path: str | Path | UPath = attrs.field(converter=UPath, validator=_dir_exists)
    """Path to an existing catalog that will be inspected. This must be a directory
    containing the Parquet dataset and metadata sidecars."""
    output_path: str | Path | UPath = attrs.field(converter=UPath)
    """Base path where output files should be written."""
    output_report_filename: str = attrs.field(default="verifier_results.csv")
    """Filename for the verification report that will be generated."""
    output_distributions_filename: str = attrs.field(default="field_distributions.csv")
    """Filename for the field distributions that will be calculated."""
    truth_total_rows: int | None = attrs.field(default=None)
    """Total number of rows expected in this catalog."""
    truth_schema: str | Path | UPath | None = attrs.field(
        default=None,
        converter=attrs.converters.optional(UPath),
        validator=attrs.validators.optional(_path_exists),
    )
    """Path to a Parquet file or dataset containing the expected schema.
    If you provided the 'use_schema_file' argument when importing the catalog, use the same value here.
    If not provided, the catalog's _common_metadata file will be used as the source of truth.
    """

    # [FIXME] Connect this with RuntimeArguments.provenance_info. Even then, does this ever get written to file?
    def additional_runtime_provenance_info(self) -> dict:
        """Return the pipeline name plus every argument rendered as a string.

        Returns:
            A dict with a ``"pipeline"`` entry and one entry per field of this
            class, each value converted with ``str``.
        """
        # attrs.define builds slotted classes by default, so instances have no
        # __dict__ and vars(self) would raise TypeError; read the declared
        # fields through attrs instead. recurse=False keeps values as-is
        # before they are stringified.
        arguments = attrs.asdict(self, recurse=False)
        return {"pipeline": "verification pipeline", **{k: str(v) for k, v in arguments.items()}}
Loading