Skip to content

Commit

Permalink
Merge pull request #610 from RandomDefaultUser/openpmd_shuffling_and_…
Browse files Browse the repository at this point in the history
…ldos

Quickfixing OpenPMD Shuffling
  • Loading branch information
RandomDefaultUser authored Nov 21, 2024
2 parents 01320fa + 3d4ee9e commit a402f79
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 123 deletions.
111 changes: 52 additions & 59 deletions mala/datahandling/data_shuffler.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,10 @@ def __shuffle_numpy(
# if the number of new snapshots is not a divisor of the grid size
# then we have to trim the original snapshots to size
# the indices to be removed are selected at random
if self.data_points_to_remove is not None:
if (
self.data_points_to_remove is not None
and np.sum(self.data_points_to_remove) > 0
):
if self.parameters.shuffling_seed is not None:
np.random.seed(idx * self.parameters.shuffling_seed)
ngrid = (
Expand Down Expand Up @@ -548,82 +551,72 @@ def shuffle_snapshots(
self.data_points_to_remove = None
if number_of_shuffled_snapshots is None:
number_of_shuffled_snapshots = self.nr_snapshots
number_of_new_snapshots = number_of_shuffled_snapshots

if snapshot_type == "openpmd":
import math
import functools

specified_number_of_new_snapshots = number_of_new_snapshots
number_of_new_snapshots = functools.reduce(
math.gcd,
# Currently, the openPMD interface is not feature-complete.
if snapshot_type == "openpmd" and np.any(
np.array(
[
snapshot.grid_dimension[0]
snapshot.grid_dimension[0] % number_of_shuffled_snapshots
for snapshot in self.parameters.snapshot_directories_list
],
number_of_new_snapshots,
]
)
!= 0
):
raise ValueError(
"Shuffling from OpenPMD files currently only "
"supported if first dimension of all snapshots "
"can evenly be divided by number of snapshots. "
"Please select a different number of shuffled "
"snapshots or use the numpy interface. "
)
if number_of_new_snapshots != specified_number_of_new_snapshots:
print(
f"[openPMD shuffling] Reduced the number of output snapshots to "
f"{number_of_new_snapshots} because of the dataset dimensions."
)
del specified_number_of_new_snapshots
elif snapshot_type == "numpy":
# Implement all of the below for OpenPMD later.
# We need to check if we need to reduce the overall grid size
# because the individual snapshots may not contain enough data
# points
shuffled_gridsizes = snapshot_size_list // number_of_new_snapshots

if np.any(
np.array(snapshot_size_list)
- (
(np.array(snapshot_size_list) // number_of_new_snapshots)
* number_of_new_snapshots
)
> 0
):
number_of_data_points = int(
np.sum(shuffled_gridsizes) * number_of_new_snapshots
)

self.data_points_to_remove = []
for i in range(0, self.nr_snapshots):
self.data_points_to_remove.append(
snapshot_size_list[i]
- shuffled_gridsizes[i] * number_of_new_snapshots
)
tot_points_missing = sum(self.data_points_to_remove)

if tot_points_missing > 0:
printout(
"Warning: number of requested snapshots is not a divisor of",
"the original grid sizes.\n",
f"{tot_points_missing} / {number_of_data_points} data points",
"will be left out of the shuffled snapshots.",
)
shuffled_gridsizes = snapshot_size_list // number_of_shuffled_snapshots

else:
raise Exception("Invalid snapshot type.")
if np.any(
np.array(snapshot_size_list)
- (
(np.array(snapshot_size_list) // number_of_shuffled_snapshots)
* number_of_shuffled_snapshots
)
> 0
):
number_of_data_points = int(
np.sum(shuffled_gridsizes) * number_of_shuffled_snapshots
)

self.data_points_to_remove = []
for i in range(0, self.nr_snapshots):
self.data_points_to_remove.append(
snapshot_size_list[i]
- shuffled_gridsizes[i] * number_of_shuffled_snapshots
)
tot_points_missing = sum(self.data_points_to_remove)

if tot_points_missing > 0:
printout(
"Warning: number of requested snapshots is not a divisor of",
"the original grid sizes.\n",
f"{tot_points_missing} / {number_of_data_points} data points",
"will be left out of the shuffled snapshots.",
)

shuffle_dimensions = [
int(number_of_data_points / number_of_new_snapshots),
int(number_of_data_points / number_of_shuffled_snapshots),
1,
1,
]

printout(
"Data shuffler will generate",
number_of_new_snapshots,
number_of_shuffled_snapshots,
"new snapshots.",
)
printout("Shuffled snapshot dimension will be ", shuffle_dimensions)

# Prepare permutations.
permutations = []
seeds = []
for i in range(0, number_of_new_snapshots):
for i in range(0, number_of_shuffled_snapshots):
# This makes the shuffling deterministic, if specified by the user.
if self.parameters.shuffling_seed is not None:
np.random.seed(i * self.parameters.shuffling_seed)
Expand All @@ -633,7 +626,7 @@ def shuffle_snapshots(

if snapshot_type == "numpy":
self.__shuffle_numpy(
number_of_new_snapshots,
number_of_shuffled_snapshots,
shuffle_dimensions,
descriptor_save_path,
save_name,
Expand All @@ -652,7 +645,7 @@ def shuffle_snapshots(
)
self.__shuffle_openpmd(
descriptor,
number_of_new_snapshots,
number_of_shuffled_snapshots,
shuffle_dimensions,
save_name,
permutations,
Expand All @@ -668,7 +661,7 @@ def shuffle_snapshots(
)
self.__shuffle_openpmd(
target,
number_of_new_snapshots,
number_of_shuffled_snapshots,
shuffle_dimensions,
save_name,
permutations,
Expand Down
128 changes: 64 additions & 64 deletions test/shuffling_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,70 +50,70 @@ def test_seed(self):
new = np.load("Be_REshuffled1.out.npy")
assert np.isclose(np.sum(np.abs(old - new)), 0.0, atol=accuracy)

# def test_seed_openpmd(self):
# """
# Test that the shuffling is handled correctly internally.
#
# This function tests the shuffling for OpenPMD and confirms that
# shuffling both from numpy and openpmd into openpmd always gives the
# same results. The first shuffling shuffles from openpmd to openpmd
# format, the second from numpy to openpmd.
# """
# test_parameters = mala.Parameters()
# test_parameters.data.shuffling_seed = 1234
# data_shuffler = mala.DataShuffler(test_parameters)
#
# # Add a snapshot we want to use in to the list.
# data_shuffler.add_snapshot(
# "Be_snapshot0.in.h5",
# data_path,
# "Be_snapshot0.out.h5",
# data_path,
# snapshot_type="openpmd",
# )
# data_shuffler.add_snapshot(
# "Be_snapshot1.in.h5",
# data_path,
# "Be_snapshot1.out.h5",
# data_path,
# snapshot_type="openpmd",
# )
#
# # After shuffling, these snapshots can be loaded as regular snapshots
# # for lazily loaded training-
# data_shuffler.shuffle_snapshots("./", save_name="Be_shuffled*.h5")
#
# test_parameters = mala.Parameters()
# test_parameters.data.shuffling_seed = 1234
# data_shuffler = mala.DataShuffler(test_parameters)
#
# # Add a snapshot we want to use in to the list.
# data_shuffler.add_snapshot(
# "Be_snapshot0.in.npy",
# data_path,
# "Be_snapshot0.out.npy",
# data_path,
# snapshot_type="numpy",
# )
# data_shuffler.add_snapshot(
# "Be_snapshot1.in.npy",
# data_path,
# "Be_snapshot1.out.npy",
# data_path,
# snapshot_type="numpy",
# )
#
# # After shuffling, these snapshots can be loaded as regular snapshots
# # for lazily loaded training-
# data_shuffler.shuffle_snapshots("./", save_name="Be_REshuffled*.h5")
#
# old = data_shuffler.target_calculator.read_from_openpmd_file(
# "Be_shuffled1.out.h5"
# )
# new = data_shuffler.target_calculator.read_from_openpmd_file(
# "Be_REshuffled1.out.h5"
# )
# assert np.isclose(np.sum(np.abs(old - new)), 0.0, atol=accuracy)
def test_seed_openpmd(self):
    """
    Check that seeded shuffling into OpenPMD gives reproducible results.

    The same two Be snapshots are shuffled twice with an identical
    shuffling seed: the first run reads them as OpenPMD files, the second
    run reads them as numpy files. Both runs are saved under ``*.h5``
    names. The files "Be_shuffled1.out.h5" and "Be_REshuffled1.out.h5" are
    then read back and must agree within ``accuracy``.
    """
    # One entry per shuffling run:
    # (snapshot type, save name pattern, (input file, output file) pairs).
    shuffling_runs = (
        (
            "openpmd",
            "Be_shuffled*.h5",
            (
                ("Be_snapshot0.in.h5", "Be_snapshot0.out.h5"),
                ("Be_snapshot1.in.h5", "Be_snapshot1.out.h5"),
            ),
        ),
        (
            "numpy",
            "Be_REshuffled*.h5",
            (
                ("Be_snapshot0.in.npy", "Be_snapshot0.out.npy"),
                ("Be_snapshot1.in.npy", "Be_snapshot1.out.npy"),
            ),
        ),
    )

    for source_type, output_pattern, file_pairs in shuffling_runs:
        # Each run gets its own parameters and shuffler, but the same
        # seed, so that the two runs are comparable.
        test_parameters = mala.Parameters()
        test_parameters.data.shuffling_seed = 1234
        data_shuffler = mala.DataShuffler(test_parameters)

        # Register the snapshots that are to be shuffled.
        for input_file, output_file in file_pairs:
            data_shuffler.add_snapshot(
                input_file,
                data_path,
                output_file,
                data_path,
                snapshot_type=source_type,
            )

        # After shuffling, these snapshots can be loaded as regular
        # snapshots for lazily loaded training.
        data_shuffler.shuffle_snapshots("./", save_name=output_pattern)

    # The shuffler of the last (numpy) run is used to read both results.
    reader = data_shuffler.target_calculator
    from_openpmd = reader.read_from_openpmd_file("Be_shuffled1.out.h5")
    from_numpy = reader.read_from_openpmd_file("Be_REshuffled1.out.h5")
    total_deviation = np.sum(np.abs(from_openpmd - from_numpy))
    assert np.isclose(total_deviation, 0.0, atol=accuracy)

def test_training(self):
test_parameters = mala.Parameters()
Expand Down

0 comments on commit a402f79

Please sign in to comment.