Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add verification tests #428

Merged
merged 16 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 45 additions & 37 deletions src/hats_import/verification/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,51 @@
from __future__ import annotations

from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional

from hats import read_hats
from hats.catalog import Catalog
from hats.io.validation import is_valid_catalog
import hats.io.paths
from hats.io import file_io
from upath import UPath

from hats_import.runtime_arguments import RuntimeArguments


@dataclass
class VerificationArguments(RuntimeArguments):
    """Arguments that configure a catalog verification run."""

    ## Input
    input_catalog_path: str | Path | UPath | None = None
    """Location of the existing catalog that is to be inspected."""
    input_catalog: Optional[Catalog] = None
    """Already-loaded catalog object. When it is not supplied, the catalog
    is read from ``input_catalog_path``."""

    ## Verification options
    field_distribution_cols: List[str] = field(default_factory=list)
    """Names of the fields to compute the overall distribution for,
    e.g. ["ra", "dec"]. Should be valid columns in the parquet files."""

    def __post_init__(self):
        """Validate the arguments as soon as the dataclass is constructed."""
        self._check_arguments()

    def _check_arguments(self):
        """Run the parent checks, then reconcile the catalog and its path.

        At least one of ``input_catalog_path`` / ``input_catalog`` must be
        given; whichever one is missing is filled in from the other.

        Raises:
            ValueError: if neither input is provided, or if the catalog has
                to be loaded and ``input_catalog_path`` is not a valid catalog.
        """
        super()._check_arguments()

        if not (self.input_catalog_path or self.input_catalog):
            raise ValueError("input catalog is required (either input_catalog_path or input_catalog)")

        if not self.input_catalog:
            # No in-memory catalog: validate the path and load from it.
            catalog_path = self.input_catalog_path
            if not is_valid_catalog(catalog_path):
                raise ValueError("input_catalog_path not a valid catalog")
            self.input_catalog = read_hats(catalog_path=catalog_path)

        if not self.input_catalog_path:
            # Only the catalog object was given: take the path from it.
            self.input_catalog_path = self.input_catalog.catalog_path

@dataclass(kw_only=True)
class VerificationArguments:
"""Container for verification arguments."""

input_catalog_path: UPath = field()
"""Path to an existing catalog that will be inspected. This must be a directory
containing (at least) the hats ancillary files and a 'dataset/' directory
containing the parquet dataset. Can be supplied as a string or path object."""
output_path: UPath = field()
"""Directory where the verification report should be written.
Can be supplied as a string or path object."""
output_filename: str = field(default="verifier_results.csv")
"""Filename for the verification report."""
truth_total_rows: int | None = field(default=None)
"""Total number of rows expected in this catalog."""
truth_schema: UPath | None = field(default=None)
"""Path to a parquet file or dataset containing the expected schema. If None (default),
the catalog's _common_metadata file will be used. This schema will be used to verify
all non-hats columns and (optionally) the file-level metadata. Can be supplied as a
string or path object."""

@property
def input_dataset_path(self) -> UPath:
"""Path to the directory under `input_catalog_path` that contains the parquet dataset."""
return file_io.append_paths_to_pointer(self.input_catalog_path, hats.io.paths.DATASET_DIR)

@property
def output_file_path(self) -> UPath:
"""Path to the output file (`output_path` / `output_filename`)."""
return file_io.append_paths_to_pointer(self.output_path, self.output_filename)

def __post_init__(self) -> None:
self.input_catalog_path = file_io.get_upath(self.input_catalog_path)
if not self.input_catalog_path.is_dir():
raise ValueError("input_catalog_path must be an existing directory")

self.output_path = file_io.get_upath(self.output_path)

if self.truth_schema is not None:
self.truth_schema = file_io.append_paths_to_pointer(self.truth_schema)
if not self.truth_schema.exists():
raise ValueError("truth_schema must be an existing file or directory")
Loading
Loading