Sort _metadata by sky-ordering #168

Merged 7 commits on Nov 21, 2023

2 changes: 1 addition & 1 deletion .github/workflows/testing-and-coverage.yml
@@ -15,7 +15,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
-        python-version: ['3.8', '3.9', '3.10']
+        python-version: ['3.9', '3.10']

steps:
- uses: actions/checkout@v3
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -15,7 +15,7 @@ classifiers = [
]
dynamic = ["version"]

requires-python = ">=3.8"
requires-python = ">=3.9"
dependencies = [
"astropy",
"fsspec<=2023.9.2", # Used for abstract filesystems
18 changes: 10 additions & 8 deletions src/hipscat/catalog/partition_info.py
@@ -60,14 +60,16 @@ def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: di
storage_options (dict): dictionary that contains abstract filesystem credentials
"""
batches = [
-            pa.RecordBatch.from_arrays(
-                [[pixel.order], [pixel.dir], [pixel.pixel]],
-                names=[
-                    self.METADATA_ORDER_COLUMN_NAME,
-                    self.METADATA_DIR_COLUMN_NAME,
-                    self.METADATA_PIXEL_COLUMN_NAME,
-                ],
-            )
+            [
+                pa.RecordBatch.from_arrays(
+                    [[pixel.order], [pixel.dir], [pixel.pixel]],
+                    names=[
+                        self.METADATA_ORDER_COLUMN_NAME,
+                        self.METADATA_DIR_COLUMN_NAME,
+                        self.METADATA_PIXEL_COLUMN_NAME,
+                    ],
+                )
+            ]
for pixel in self.get_healpix_pixels()
]

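Each pixel's `RecordBatch` is now wrapped in its own single-element list, so `write_parquet_metadata_for_batches` writes one table (and therefore one row group) per pixel. A minimal sketch of the shape this comprehension produces, with made-up pixels and assuming the metadata column names resolve to `Norder`/`Dir`/`Npix`:

```python
import pyarrow as pa

# Hypothetical (order, pixel) pairs standing in for get_healpix_pixels().
pixels = [(0, 11), (1, 33)]

batches = [
    [
        pa.RecordBatch.from_arrays(
            [[order], [int(pixel / 10_000) * 10_000], [pixel]],
            names=["Norder", "Dir", "Npix"],
        )
    ]
    for order, pixel in pixels
]

# One inner list per pixel; each becomes its own table (and row group).
assert len(batches) == 2 and all(len(group) == 1 for group in batches)
```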
66 changes: 60 additions & 6 deletions src/hipscat/io/parquet_metadata.py
@@ -2,12 +2,15 @@
import tempfile
from typing import List

+import numpy as np
import pyarrow as pa
import pyarrow.dataset as pds
import pyarrow.parquet as pq

from hipscat.io import file_io, paths
from hipscat.io.file_io.file_pointer import get_fs, strip_leading_slash_for_pyarrow
+from hipscat.pixel_math.healpix_pixel import INVALID_PIXEL, HealpixPixel
+from hipscat.pixel_math.healpix_pixel_function import get_pixel_argsort


def row_group_stat_single_value(row_group, stat_key: str):
@@ -30,7 +33,43 @@ def row_group_stat_single_value(row_group, stat_key: str):
return min_val


-def write_parquet_metadata(catalog_path: str, storage_options: dict = None, output_path: str = None):
+def get_healpix_pixel_from_metadata(metadata: pq.FileMetaData) -> HealpixPixel:
+    """Get the healpix pixel according to a parquet file's metadata.
+
+    This is determined by the value of Norder and Npix in the table's data.
+
+    Args:
+        metadata (pyarrow.parquet.FileMetaData): full metadata for a single file.
+
+    Returns:
+        Healpix pixel representing the Norder and Npix from the first row group.
+    """
+    if metadata.num_row_groups <= 0 or metadata.num_columns <= 0:
+        raise ValueError("metadata is for empty table")
+    order = -1
+    pixel = -1
+    first_row_group = metadata.row_group(0)
+    for i in range(0, first_row_group.num_columns):
+        column = first_row_group.column(i)
+        if column.path_in_schema == "Norder":
+            if column.statistics.min != column.statistics.max:
+                raise ValueError(
+                    f"Norder stat min != max ({column.statistics.min} != {column.statistics.max})"
+                )
+            order = column.statistics.min
+        elif column.path_in_schema == "Npix":
+            if column.statistics.min != column.statistics.max:
+                raise ValueError(f"Npix stat min != max ({column.statistics.min} != {column.statistics.max})")
+            pixel = column.statistics.min
+
+    if order == -1 or pixel == -1:
+        raise ValueError("Metadata missing Norder or Npix column")
+    return HealpixPixel(order, pixel)
+
+
+def write_parquet_metadata(
+    catalog_path: str, order_by_healpix=True, storage_options: dict = None, output_path: str = None
+):
"""Generate parquet metadata, using the already-partitioned parquet files
for this catalog.

@@ -39,6 +78,8 @@ def write_parquet_metadata(catalog_path: str, storage_options: dict = None, outp

Args:
catalog_path (str): base path for the catalog
+        order_by_healpix (bool): use False if the dataset is not to be reordered by
+            breadth-first healpix pixel (e.g. secondary indexes)
storage_options: dictionary that contains abstract filesystem credentials
output_path (str): base path for writing out metadata files
defaults to `catalog_path` if unspecified
@@ -56,6 +97,8 @@ def write_parquet_metadata(catalog_path: str, storage_options: dict = None, outp
exclude_invalid_files=True,
)
metadata_collector = []
+    # Collect the healpix pixels so we can sort before writing.
+    healpix_pixels = []

for hips_file in dataset.files:
hips_file_pointer = file_io.get_file_pointer_from_path(hips_file, include_protocol=catalog_path)
@@ -64,11 +107,21 @@ def write_parquet_metadata(catalog_path: str, storage_options: dict = None, outp
# Users must set the file path of each chunk before combining the metadata.
relative_path = hips_file[len(catalog_path) :]
single_metadata.set_file_path(relative_path)

+        if order_by_healpix:
+            healpix_pixel = paths.get_healpix_from_path(relative_path)
+            if healpix_pixel == INVALID_PIXEL:
+                healpix_pixel = get_healpix_pixel_from_metadata(single_metadata)
+
+            healpix_pixels.append(healpix_pixel)
metadata_collector.append(single_metadata)

## Write out the two metadata files
if output_path is None:
output_path = catalog_path
+    if order_by_healpix:
+        argsort = get_pixel_argsort(healpix_pixels)
+        metadata_collector = np.array(metadata_collector)[argsort]
catalog_base_dir = file_io.get_file_pointer_from_path(output_path)
metadata_file_pointer = paths.get_parquet_metadata_pointer(catalog_base_dir)
common_metadata_file_pointer = paths.get_common_metadata_pointer(catalog_base_dir)
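Each leaf file is now resolved to a `HealpixPixel`, first by parsing its path and, failing that, from its `Norder`/`Npix` column statistics; the collected metadata is then argsorted before the combined files are written. A hedged sketch of the resulting call pattern (the catalog paths here are hypothetical):

```python
import pyarrow.parquet as pq

from hipscat.io.parquet_metadata import (
    get_healpix_pixel_from_metadata,
    write_parquet_metadata,
)

# Fallback path: recover the pixel from a leaf file's Norder/Npix statistics.
leaf_metadata = pq.read_metadata("/data/my_catalog/Norder=1/Dir=0/Npix=33.parquet")
pixel = get_healpix_pixel_from_metadata(leaf_metadata)  # order 1, pixel 33

# Sky-ordered catalog: leaf metadata is re-sorted breadth-first before
# _metadata and _common_metadata are written.
write_parquet_metadata("/data/my_catalog")

# Secondary-index dataset (no healpix structure): keep discovery order.
write_parquet_metadata("/data/my_index", order_by_healpix=False)
```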
@@ -86,23 +139,24 @@


def write_parquet_metadata_for_batches(
-    batches: List[pa.RecordBatch], output_path: str = None, storage_options: dict = None
+    batches: List[List[pa.RecordBatch]], output_path: str = None, storage_options: dict = None
):
"""Write parquet metadata files for some pyarrow table batches.
This writes the batches to a temporary parquet dataset using local storage, and
generates the metadata for the partitioned catalog parquet files.

Args:
-        batches (List[pa.RecordBatch]): create one batch per group of data (partition or row group)
+        batches (List[List[pa.RecordBatch]]): create one row group per RecordBatch, grouped
+            into tables by the inner list.
output_path (str): base path for writing out metadata files
defaults to `catalog_path` if unspecified
storage_options: dictionary that contains abstract filesystem credentials
"""

-    temp_info_table = pa.Table.from_batches(batches)
-
    with tempfile.TemporaryDirectory() as temp_pq_file:
-        pq.write_to_dataset(temp_info_table, temp_pq_file)
+        for batch_list in batches:
+            temp_info_table = pa.Table.from_batches(batch_list)
+            pq.write_to_dataset(temp_info_table, temp_pq_file)
write_parquet_metadata(temp_pq_file, storage_options=storage_options, output_path=output_path)


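Under the new nested-list contract, each inner list becomes its own table in the temporary dataset, so every group lands in a separate row group of the combined `_metadata`. A short usage sketch with made-up values and a hypothetical output path:

```python
import pyarrow as pa

from hipscat.io.parquet_metadata import write_parquet_metadata_for_batches

batches = [
    [pa.RecordBatch.from_arrays([[0], [0], [11]], names=["Norder", "Dir", "Npix"])],
    [pa.RecordBatch.from_arrays([[1], [0], [33]], names=["Norder", "Dir", "Npix"])],
]
write_parquet_metadata_for_batches(batches, output_path="/tmp/index_metadata")
```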
24 changes: 24 additions & 0 deletions src/hipscat/io/paths.py
@@ -1,7 +1,10 @@
"""Methods for creating partitioned data paths"""
from __future__ import annotations

+import re

from hipscat.io.file_io.file_pointer import FilePointer, append_paths_to_pointer
+from hipscat.pixel_math.healpix_pixel import INVALID_PIXEL, HealpixPixel

ORDER_DIRECTORY_PREFIX = "Norder"
DIR_DIRECTORY_PREFIX = "Dir"
@@ -59,6 +62,27 @@ def pixel_directory(
)


def get_healpix_from_path(path: str) -> HealpixPixel:
"""Find the `pixel_order` and `pixel_number` from a string like the following::

Norder=<pixel_order>/Dir=<directory number>/Npix=<pixel_number>.parquet

NB: This expects the format generated by the `pixel_catalog_file` method

Args:
path (str): path to parse

Returns:
Constructed HealpixPixel object representing the pixel in the path.
`INVALID_PIXEL` if the path doesn't match the expected pattern for any reason.
"""
match = re.match(r".*Norder=(\d*).*Npix=(\d*).*", path)
if not match:
return INVALID_PIXEL
order, pixel = match.groups()
return HealpixPixel(int(order), int(pixel))


def pixel_catalog_file(catalog_base_dir: FilePointer, pixel_order: int, pixel_number: int) -> FilePointer:
"""Create path *pointer* for a pixel catalog file. This will not create the directory or file.

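A usage sketch for the new parser; the example paths are hypothetical but follow the `pixel_catalog_file` layout:

```python
from hipscat.io.paths import get_healpix_from_path
from hipscat.pixel_math.healpix_pixel import INVALID_PIXEL

pixel = get_healpix_from_path("Norder=1/Dir=0/Npix=33.parquet")
assert (pixel.order, pixel.pixel) == (1, 33)

# Anything without the Norder=.../Npix=... pattern maps to INVALID_PIXEL.
assert get_healpix_from_path("point_id=2000.parquet") == INVALID_PIXEL
```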
3 changes: 3 additions & 0 deletions src/hipscat/pixel_math/healpix_pixel.py
@@ -95,3 +95,6 @@ def dir(self) -> int:
(pixel_number/10000)*10000
"""
return int(self.pixel / 10_000) * 10_000


INVALID_PIXEL = HealpixPixel(-1, -1)
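A quick sketch of the directory-grouping arithmetic behind `dir` and the new sentinel, assuming the constructor accepts these example values:

```python
from hipscat.pixel_math.healpix_pixel import INVALID_PIXEL, HealpixPixel

assert HealpixPixel(6, 9_999).dir == 0        # int(9_999 / 10_000) * 10_000 = 0
assert HealpixPixel(6, 12_345).dir == 10_000  # int(12_345 / 10_000) * 10_000
assert INVALID_PIXEL == HealpixPixel(-1, -1)
```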
27 changes: 27 additions & 0 deletions src/hipscat/pixel_math/healpix_pixel_function.py
@@ -0,0 +1,27 @@
from typing import List

import numpy as np

from hipscat.pixel_math.healpix_pixel import HealpixPixel


def get_pixel_argsort(pixels: List[HealpixPixel]):
"""Perform an indirect sort of a list of HealpixPixels.

This will order the pixels according to a breadth-first traversal of the healpix
pixel hierarchy, not merely by increasing order by Norder/Npix (depth-first ordering).
This is similar to ordering fully by _hipscat_index.

Args:
pixels (List[HealpixPixel]): array to sort

Returns:
array of indices that sort the pixels in breadth-first order.
"""
# Construct a parallel list of exploded, high order pixels.
highest_order = np.max(pixels).order

constant_breadth_pixel = [pixel.pixel * (4 ** (highest_order - pixel.order)) for pixel in pixels]

# Get the argsort of the higher order array.
return np.argsort(constant_breadth_pixel, kind="stable")
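For intuition, a small worked sketch: each pixel is promoted to the highest order present by multiplying by 4 to the power of the order difference, and a stable argsort of the exploded values yields the breadth-first order:

```python
from hipscat.pixel_math.healpix_pixel import HealpixPixel
from hipscat.pixel_math.healpix_pixel_function import get_pixel_argsort

pixels = [HealpixPixel(0, 10), HealpixPixel(1, 33), HealpixPixel(2, 128)]
# Exploded to order 2: 10 * 4**2 = 160, 33 * 4**1 = 132, 128 * 4**0 = 128.
argsort = get_pixel_argsort(pixels)
sky_ordered = [pixels[i] for i in argsort]
# Breadth-first result: (2, 128), (1, 33), (0, 10).
```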
15 changes: 15 additions & 0 deletions tests/hipscat/catalog/test_partition_info.py
@@ -1,6 +1,7 @@
"""Tests of partition info functionality"""
import os

+import numpy.testing as npt
import pytest

from hipscat.catalog import PartitionInfo
@@ -78,4 +79,18 @@ def test_write_to_file(tmp_path, small_sky_pixels):

new_partition_info = PartitionInfo.read_from_csv(partition_info_pointer)

+    # We're not using parquet metadata, so we don't force a re-sorting.
assert partition_info.get_healpix_pixels() == new_partition_info.get_healpix_pixels()


def test_write_to_file_sorted(tmp_path, pixel_list_depth_first, pixel_list_breadth_first):
"""Write out the partition info to file and make sure that it's sorted by breadth-first healpix,
even though the original pixel list is in Norder-major sorting (depth-first)."""
partition_info = PartitionInfo.from_healpix(pixel_list_depth_first)
npt.assert_array_equal(pixel_list_depth_first, partition_info.get_healpix_pixels())
partition_info.write_to_metadata_files(tmp_path)

partition_info_pointer = paths.get_parquet_metadata_pointer(tmp_path)
new_partition_info = PartitionInfo.read_from_file(partition_info_pointer)

npt.assert_array_equal(pixel_list_breadth_first, new_partition_info.get_healpix_pixels())
51 changes: 51 additions & 0 deletions tests/hipscat/conftest.py
@@ -75,3 +75,54 @@ def small_sky_order1_pixels():
HealpixPixel(1, 46),
HealpixPixel(1, 47),
]


@pytest.fixture
def pixel_list_depth_first():
"""A list of pixels that are sorted by Norder (depth-first)"""
# pylint: disable=duplicate-code
return [
HealpixPixel(0, 10),
HealpixPixel(1, 33),
HealpixPixel(1, 35),
HealpixPixel(1, 44),
HealpixPixel(1, 45),
HealpixPixel(1, 46),
HealpixPixel(2, 128),
HealpixPixel(2, 130),
HealpixPixel(2, 131),
]


@pytest.fixture
def pixel_list_breadth_first():
"""The same pixels in the above `pixel_list_depth_first` list, but
in breadth-first traversal order by the healpix pixel hierarchy.

See tree for illustration (brackets indicate inner node)::

.
├── <0,8>
│ ├── <1,32>
│ │ ├── 2,128
│ │ ├── 2,130
│ │ └── 2,131
│ ├── 1,33
│ └── 1,35
├── 0,10
└── <0,11>
├── 1,44
├── 1,45
└── 1,46
"""
return [
HealpixPixel(2, 128),
HealpixPixel(2, 130),
HealpixPixel(2, 131),
HealpixPixel(1, 33),
HealpixPixel(1, 35),
HealpixPixel(0, 10),
HealpixPixel(1, 44),
HealpixPixel(1, 45),
HealpixPixel(1, 46),
]
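As a sanity check (a sketch, not one of the PR's tests), argsorting the depth-first fixture should reproduce the breadth-first fixture exactly:

```python
import numpy as np

from hipscat.pixel_math.healpix_pixel_function import get_pixel_argsort


def assert_sky_ordering(depth_first, breadth_first):
    """Reordering the depth-first list breadth-first must match the fixture."""
    argsort = get_pixel_argsort(depth_first)
    assert list(np.array(depth_first)[argsort]) == breadth_first
```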