New memory profiling tools, tests, and some improvements
akoumjian committed Dec 23, 2023
1 parent 6aa8758 commit c9f97f3
Showing 40 changed files with 1,077 additions and 342 deletions.
1 change: 1 addition & 0 deletions .dockerignore
@@ -1 +1,2 @@
.github
.volumes
3 changes: 3 additions & 0 deletions .gitignore
@@ -164,3 +164,6 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

.volumes/*
.docker_bash_history.txt
51 changes: 35 additions & 16 deletions Dockerfile
@@ -5,33 +5,52 @@ SHELL ["/bin/bash", "-c"]

# Update system dependencies
RUN apt-get update \
&& apt-get upgrade -y \
&& apt-get install -y curl gfortran git liblapack-dev make pip python3.11 python-is-python3 unzip
&& apt-get upgrade -y \
&& apt-get install -y curl gfortran git liblapack-dev make pip python3.11 python-is-python3 unzip wget

# Upgrade pip to the latest version and install pre-commit
RUN pip install --upgrade pip pre-commit
RUN pip install --upgrade cython==0.29.36 setuptools setuptools_scm hatch
RUN chmod 777 /opt

# Download openorb data files and set the environment variable
RUN curl -fL -o /tmp/oorb_data.zip \
"https://github.com/B612-Asteroid-Institute/oorb/releases/download/v1.2.1a1.dev2/oorb_data.zip"
RUN unzip -d /opt/oorb_data /tmp/oorb_data.zip
ENV OORB_DATA=/opt/oorb_data
# Install numpy
RUN git clone https://github.com/numpy/numpy.git /opt/numpy
RUN cd /opt/numpy && git checkout v1.24.4 && git submodule update --init
RUN cd /opt/numpy && python3 setup.py build --cpu-baseline=native install

# Update OBSCODE.dat
RUN cd $OORB_DATA \
&& curl https://www.minorplanetcenter.net/iau/lists/ObsCodes.html -o ObsCodes.html \
&& sed -e '2d' ObsCodes.html | grep -v "<" > OBSCODE.dat \
&& rm -f ObsCodes.html

# Install openorb
# TODO: We need a more robust way to get an appropriately natively compiled pyoorb installed
# including data file generation
RUN git clone https://github.com/B612-Asteroid-Institute/oorb.git /opt/oorb
RUN cd /opt/oorb && git checkout fork
RUN cd /opt/oorb && ./configure gfortran opt --with-pyoorb --with-f2py=/usr/local/bin/f2py --with-python=python3
# Add '-march=native' to compiler options by running a sed
# script directly on the Makefile.include file. This is a
# hack to get around the fact that the configure script
# doesn't support this option.
RUN sed -i 's/FCOPTIONS = .*/FCOPTIONS = $(FCOPTIONS_OPT_GFORTRAN) -march=native/g' /opt/oorb/Makefile.include
# --no-build-isolation is needed because we need to ensure we use
# the same version of numpy as the one we compiled previously so
# that it matches the version of f2py we passed in to ./configure.
RUN pip install --no-build-isolation -v /opt/oorb

# Generate the data files
RUN cd /opt/oorb && make ephem
RUN cd /opt/oorb/data && ./getBC430
RUN cd /opt/oorb/data && ./updateOBSCODE
ENV OORBROOT=/opt/oorb
ENV OORB_DATA=/opt/oorb/data

# Install pre-commit hooks (before THOR is installed to cache this step)
RUN mkdir /code/
COPY .pre-commit-config.yaml /code/
WORKDIR /code/
RUN git init . \
&& git add .pre-commit-config.yaml \
&& pre-commit install-hooks \
&& rm -rf .git
&& git add .pre-commit-config.yaml \
&& pre-commit install-hooks \
&& rm -rf .git

# Install THOR
ADD . /code/
RUN SETUPTOOLS_SCM_PRETEND_VERSION=1 pip install -e .[tests]
RUN SETUPTOOLS_SCM_PRETEND_VERSION=1 pip install -e .[tests,dev]
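
Since numpy and pyoorb are now compiled with --cpu-baseline=native and -march=native, a quick way to check which numpy build ended up in the image is sketched below; this is illustrative, for use inside the running container, and not part of the commit.

import numpy

# Expect 1.24.4, i.e. the source checkout built above rather than a binary wheel.
print(numpy.__version__)

# Dumps the build configuration (BLAS/LAPACK details and, on recent numpy
# releases, the SIMD baseline selected at compile time).
numpy.show_config()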
7 changes: 6 additions & 1 deletion docker-compose.yml
@@ -7,4 +7,9 @@ services:
context: .
dockerfile: Dockerfile
volumes:
- ".:/code"
- ".ipython:/root/.ipython/"
- '.:/code'
- ".docker_bash_history.txt:/root/.bash_history"
- ".volumes:/opt/volumes/"
tmpfs:
- /dev/shm:size=8g
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,5 +1,5 @@
[build-system]
requires = ["setuptools>=45", "wheel", "setuptools_scm[toml]>=6.0"]
requires = ["setuptools>=66", "wheel", "setuptools_scm[toml]>=6.0"]
build-backend = "setuptools.build_meta"

[tool.setuptools_scm]
13 changes: 10 additions & 3 deletions setup.cfg
@@ -27,11 +27,11 @@ packages =
thor
include_package_data = True
setup_requires =
setuptools >= 45
setuptools >= 66
wheel
setuptools_scm >= 6.0
install_requires =
adam-core @ git+https://github.com/B612-Asteroid-Institute/adam_core@main#egg=adam_core
adam-core @ git+https://github.com/B612-Asteroid-Institute/adam_core@e6173ed402fd2293c682d773f9d17644a7baefcb#egg=adam_core
astropy >= 5.3.1
astroquery
difi
@@ -54,9 +54,15 @@ install_requires =
tests =
pytest
pytest-benchmark
pytest-memray
pytest-cov
pre-commit

dev =
ipython
memray


[options.package_data]
thor =
data/*.yaml
@@ -67,9 +73,10 @@ test=pytest

[tool:pytest]
python_functions = test_*
addopts = -m "not integration"
addopts = -m "not (integration or memory)"
markers =
integration: Mark a test as an integration test.
memory: Mark a test as a memory test.

[isort]
profile = black
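Note that the default addopts above now exclude both integration and memory tests. A minimal sketch of running only the new memory tests under pytest-memray (assuming the plugin's standard --memray option; illustrative, not part of this commit):

import pytest

# A command-line "-m memory" overrides the "-m" expression injected via addopts,
# so only tests decorated with @pytest.mark.memory are collected; "--memray"
# (added by the pytest-memray plugin) tracks allocations for each selected test.
pytest.main(["-m", "memory", "--memray"])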
160 changes: 84 additions & 76 deletions thor/clusters.py
@@ -36,73 +36,74 @@
logger = logging.getLogger(__name__)


def drop_duplicate_clusters(
clusters: "Clusters",
cluster_members: "ClusterMembers",
) -> Tuple["Clusters", "ClusterMembers"]:
"""
Drop clusters that have identical sets of observation IDs.
Parameters
----------
cluster_members: `~thor.clusters.ClusterMembers`
A table of cluster members.
Returns
-------
`~thor.clusters.Clusters`
A table of clusters with duplicate clusters removed.
"""
# Sort by cluster_id and obs_id
clusters = clusters.sort_by([("cluster_id", "ascending")])
cluster_members = cluster_members.sort_by(
[("cluster_id", "ascending"), ("obs_id", "ascending")]
)

# Group by cluster_id and aggregate a list of distinct obs_ids
grouped_by_cluster_id = cluster_members.table.group_by(
["cluster_id"], use_threads=False
).aggregate([("obs_id", "distinct")])
obs_ids_per_cluster = grouped_by_cluster_id["obs_id_distinct"].to_pylist()

# Group by with a distinct aggregation is not guaranteed to preserve the order of the elements within each list
# but does preserve the order of the lists themselves. So we sort each list of obs_ids and while we are
# sorting we also convert the lists to a single string on which we can group later.
# Pyarrow currently does not support groupby on lists of strings; this is likely a missing feature.
# As an example, the following code doesn't work:
# grouped_by_obs_lists = grouped_by_cluster_id.group_by(
# ["obs_id_distinct"],
# use_threads=False
# ).aggregate([("index", "first")])
for i, obs_ids_i in enumerate(obs_ids_per_cluster):
obs_ids_i.sort()
obs_ids_per_cluster[i] = "".join(obs_ids_i)

squashed_obs_ids = pa.table(
{
"index": pa.array(np.arange(0, len(obs_ids_per_cluster))),
"obs_ids": obs_ids_per_cluster,
}
)
indices = (
squashed_obs_ids.group_by(["obs_ids"], use_threads=False)
.aggregate([("index", "first")])["index_first"]
.combine_chunks()
)

clusters = clusters.take(indices)
cluster_members = cluster_members.apply_mask(
pc.is_in(cluster_members.cluster_id, clusters.cluster_id)
)
return clusters, cluster_members


class Clusters(qv.Table):
cluster_id = qv.StringColumn(default=lambda: uuid.uuid4().hex)
vtheta_x = qv.Float64Column()
vtheta_y = qv.Float64Column()
arc_length = qv.Float64Column()
num_obs = qv.Int64Column()

def drop_duplicates(
self,
cluster_members: "ClusterMembers",
) -> Tuple["Clusters", "ClusterMembers"]:
"""
Drop clusters that have identical sets of observation IDs.
Parameters
----------
cluster_members: `~thor.clusters.ClusterMembers`
A table of cluster members.
Returns
-------
`~thor.clusters.Clusters`
A table of clusters with duplicate clusters removed.
"""
# Sort by cluster_id and obs_id
clusters_sorted = self.sort_by([("cluster_id", "ascending")])
cluster_members_sorted = cluster_members.sort_by(
[("cluster_id", "ascending"), ("obs_id", "ascending")]
)

# Group by cluster_id and aggregate a list of distinct obs_ids
grouped_by_cluster_id = cluster_members_sorted.table.group_by(
["cluster_id"], use_threads=False
).aggregate([("obs_id", "distinct")])
obs_ids_per_cluster = grouped_by_cluster_id["obs_id_distinct"].to_pylist()

# Group by with a distinct aggregation is not guaranteed to preserve the order of the elements within each list
# but does preserve the order of the lists themselves. So we sort each list of obs_ids and while we are
# sorting we also convert the lists to a single string on which we can group later.
# Pyarrow currently does not support groupby on lists of strings; this is likely a missing feature.
# As an example, the following code doesn't work:
# grouped_by_obs_lists = grouped_by_cluster_id.group_by(
# ["obs_id_distinct"],
# use_threads=False
# ).aggregate([("index", "first")])
for i, obs_ids_i in enumerate(obs_ids_per_cluster):
obs_ids_i.sort()
obs_ids_per_cluster[i] = "".join(obs_ids_i)

squashed_obs_ids = pa.table(
{
"index": pa.array(np.arange(0, len(obs_ids_per_cluster))),
"obs_ids": obs_ids_per_cluster,
}
)
indices = (
squashed_obs_ids.group_by(["obs_ids"], use_threads=False)
.aggregate([("index", "first")])["index_first"]
.combine_chunks()
)

filtered = clusters_sorted.take(indices)
filtered_cluster_members = cluster_members_sorted.apply_mask(
pc.is_in(cluster_members_sorted.cluster_id, filtered.cluster_id)
)
return filtered, filtered_cluster_members


class ClusterMembers(qv.Table):
cluster_id = qv.StringColumn()
@@ -571,8 +572,8 @@ def cluster_velocity_worker(
for that batch.
"""
clusters_list = []
cluster_members_list = []
clusters = Clusters.empty()
cluster_members = ClusterMembers.empty()
for vx_i, vy_i in zip(vx, vy):
clusters_i, cluster_members_i = cluster_velocity(
obs_ids,
@@ -586,10 +587,13 @@
min_arc_length=min_arc_length,
alg=alg,
)
clusters_list.append(clusters_i)
cluster_members_list.append(cluster_members_i)
clusters = qv.concatenate([clusters, clusters_i])
clusters = qv.defragment(clusters)

cluster_members = qv.concatenate([cluster_members, cluster_members_i])
cluster_members = qv.defragment(cluster_members)

return qv.concatenate(clusters_list), qv.concatenate(cluster_members_list)
return clusters, cluster_members


cluster_velocity_remote = ray.remote(cluster_velocity_worker)
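
The worker above now folds each per-velocity result into a running table with qv.concatenate followed by qv.defragment, instead of holding every chunk in a Python list and concatenating once at the end, which appears aimed at keeping peak memory bounded. A minimal standalone sketch of that pattern (the helper and chunk source here are hypothetical):

import quivr as qv

from thor.clusters import Clusters

def accumulate_clusters(chunks):
    # Fold chunks into a single table as they arrive rather than keeping them
    # all alive in a list until a final concatenate.
    clusters = Clusters.empty()
    for clusters_i in chunks:
        clusters = qv.concatenate([clusters, clusters_i])
        # defragment rewrites the concatenated table onto contiguous buffers so
        # the chunk count does not grow with each iteration.
        clusters = qv.defragment(clusters)
    return clusters

Whether the repeated copies from defragmenting cost more than they save depends on chunk sizes; the trade here favors predictable memory use over avoided copies.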
@@ -726,8 +730,9 @@ def cluster_and_link(
)
return Clusters.empty(), ClusterMembers.empty()

clusters_list = []
cluster_members_list = []
clusters = Clusters.empty()
cluster_members = ClusterMembers.empty()

# Extract useful quantities
obs_ids = observations.id.to_numpy(zero_copy_only=False)
theta_x = observations.coordinates.theta_x.to_numpy(zero_copy_only=False)
@@ -752,7 +757,6 @@
# TODO: transformed detections are already in the object store so we might
# want to instead pass references to those rather than extract arrays
# from them and put them in the object store again.

futures = []
for vxi_chunk, vyi_chunk in zip(
_iterate_chunks(vxx, chunk_size), _iterate_chunks(vyy, chunk_size)
@@ -774,9 +778,12 @@

while futures:
finished, futures = ray.wait(futures, num_returns=1)
result = ray.get(finished[0])
clusters_list.append(result[0])
cluster_members_list.append(result[1])
clusters_chunk, cluster_members_chunk = ray.get(finished[0])
clusters = qv.concatenate([clusters, clusters_chunk])
clusters = qv.defragment(clusters)

cluster_members = qv.concatenate([cluster_members, cluster_members_chunk])
cluster_members = qv.defragment(cluster_members)

ray.internal.free(refs_to_free)
logger.info(f"Removed {len(refs_to_free)} references from the object store.")
@@ -797,17 +804,18 @@
min_arc_length=min_arc_length,
alg=alg,
)
clusters_list.append(clusters_i)
cluster_members_list.append(cluster_members_i)

clusters = qv.concatenate(clusters_list)
cluster_members = qv.concatenate(cluster_members_list)
clusters = qv.concatenate([clusters, clusters_i])
clusters = qv.defragment(clusters)

cluster_members = qv.concatenate([cluster_members, cluster_members_i])
cluster_members = qv.defragment(cluster_members)

# Drop duplicate clusters
time_start_drop = time.perf_counter()
logger.info("Removing duplicate clusters...")
num_clusters = len(clusters)
clusters, cluster_members = clusters.drop_duplicates(cluster_members)
clusters, cluster_members = drop_duplicate_clusters(clusters, cluster_members)
logger.info(f"Removed {num_clusters - len(clusters)} duplicate clusters.")
time_end_drop = time.perf_counter()
logger.info(
2 changes: 1 addition & 1 deletion thor/main.py
@@ -120,7 +120,7 @@ def link_test_orbit(
raise ValueError(f"Unknown propagator: {config.propagator}")

use_ray = initialize_use_ray(
num_cpus=config.max_processes, object_store_bytes=config.ray_memory_bytes
num_cpus=config.max_processes, object_store_bytes=config.ray_memory_bytes or None
)

refs_to_free = []