Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement on-the-fly descriptor calculation #630

Merged
Merged
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
e154d2a
On-the-fly training works for the RAM case
RandomDefaultUser Jan 6, 2025
88a5edb
Lazy Loading training works now
RandomDefaultUser Jan 6, 2025
0e4ddfe
Checkpointing works now as well
RandomDefaultUser Jan 7, 2025
c43fefc
Made method private
RandomDefaultUser Jan 7, 2025
7a90ed5
Tester class now also works with on-the-fly calculations
RandomDefaultUser Jan 7, 2025
39a40c3
Prefetching works
RandomDefaultUser Jan 7, 2025
2201e34
Is this already enought to get DDP working?
RandomDefaultUser Jan 7, 2025
bc79637
Merge branch 'refs/heads/develop_lenz' into descriptors_on_the_fly
RandomDefaultUser Jan 7, 2025
c1737c5
Renamed "additional info", since it will be used more regularly with …
RandomDefaultUser Jan 7, 2025
4001d0a
Fixing a parallel writing bug
RandomDefaultUser Jan 8, 2025
e6a0723
Can I use DDP and MPI at the same time?
RandomDefaultUser Jan 8, 2025
5162404
It does not help
RandomDefaultUser Jan 8, 2025
3997229
Getting rid of the parallel modification for now
RandomDefaultUser Jan 8, 2025
15ff5dc
Shuffling from atomic positions works now
RandomDefaultUser Jan 9, 2025
8e8cb3f
Shuffling now works as part of the temporary pipeline
RandomDefaultUser Jan 9, 2025
615792b
Fixed docstrings
RandomDefaultUser Jan 9, 2025
d0e8de6
Added automatic snapshot type detection
RandomDefaultUser Jan 9, 2025
07126f1
Added new temporary framework to examples
RandomDefaultUser Jan 14, 2025
a7d7bd3
Fixed examples
RandomDefaultUser Jan 14, 2025
f0738ce
Added documentation
RandomDefaultUser Jan 14, 2025
d359057
Small adjustment in example
RandomDefaultUser Jan 14, 2025
9d87a2f
Implemented rudimentary json+openpmd
RandomDefaultUser Jan 14, 2025
d1960d4
Added tests for on-the-fly calculations
RandomDefaultUser Jan 14, 2025
ac255b2
Small adjustment in test
RandomDefaultUser Jan 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Shuffling now works as part of the temporary pipeline
RandomDefaultUser committed Jan 9, 2025
commit 8e8cb3fc792978b0c22a0ed453bd5de0b6e367f6
153 changes: 129 additions & 24 deletions mala/datahandling/data_shuffler.py
Original file line number Diff line number Diff line change
@@ -3,12 +3,14 @@
import os

import numpy as np
import tempfile

import mala
from mala.common.parameters import (
Parameters,
DEFAULT_NP_DATA_DTYPE,
)
from mala.common.parallelizer import printout
from mala.common.parallelizer import printout, parallel_warn
from mala.common.physical_data import PhysicalData
from mala.datahandling.data_handler_base import DataHandlerBase
from mala.common.parallelizer import get_comm
@@ -32,6 +34,17 @@ class DataShuffler(DataHandlerBase):
target_calculator : mala.targets.target.Target
Used to do unit conversion on output data. If None, then one will
be created by this class.

Attributes
----------
temporary_shuffled_snapshots : list
A list containing snapshot objects of temporary, snapshot-like
shuffled data files. By default, this list is empty. If the
function "shuffle_snapshots_temporary" is used, it will be populated
with temporary files saved to hard drive, which can be deleted
after model training. Please note that the "snapshot_function",
"input_units", "output_units" and "calculation_output" fields of the
snapshots within this list are filled with placeholder values and
carry no meaning.
"""

def __init__(
@@ -46,6 +59,7 @@ def __init__(
descriptor_calculator=descriptor_calculator,
)
self._data_points_to_remove = None
self.temporary_shuffled_snapshots = []

def add_snapshot(
self,
@@ -97,6 +111,7 @@ def __shuffle_numpy(
target_save_path,
permutations,
file_ending,
temporary,
):
# Load the data (via memmap).
descriptor_data = []
@@ -168,12 +183,6 @@ def __shuffle_numpy(
target_data[idx] = current_target[indices]

# Do the actual shuffling.
target_name_openpmd = os.path.join(
target_save_path, save_name.replace("*", "%T")
)
descriptor_name_openpmd = os.path.join(
descriptor_save_path, save_name.replace("*", "%T")
)
for i in range(0, number_of_new_snapshots):
new_descriptors = np.zeros(
(int(np.prod(shuffle_dimensions)), self.input_dimension),
@@ -184,12 +193,73 @@ def __shuffle_numpy(
dtype=DEFAULT_NP_DATA_DTYPE,
)
last_start = 0
descriptor_name = os.path.join(
descriptor_save_path, save_name.replace("*", str(i))
)
target_name = os.path.join(
target_save_path, save_name.replace("*", str(i))
)

# Figure out where to save / how to name things.
# TODO: This could probably be shortened.
if temporary:

# Adding "snapshot numbers" here is technically not necessary
# I think, but it also doesn't hurt.
if file_ending == "npy":
descriptor_name = tempfile.NamedTemporaryFile(
delete=False,
prefix=save_name.replace("*", str(i)),
suffix=".in.npy",
dir=descriptor_save_path,
).name
target_name = tempfile.NamedTemporaryFile(
delete=False,
prefix=save_name.replace("*", str(i)),
suffix=".out.npy",
dir=target_save_path,
).name
snapshot_type = "numpy"
else:
descriptor_name = tempfile.NamedTemporaryFile(
delete=False,
prefix=save_name.replace("*", "%T"),
suffix=".in." + file_ending,
dir=descriptor_save_path,
).name
target_name = tempfile.NamedTemporaryFile(
delete=False,
prefix=save_name.replace("*", "%T"),
suffix=".out." + file_ending,
dir=target_save_path,
).name
snapshot_type = "openpmd"
self.temporary_shuffled_snapshots.append(
mala.Snapshot(
os.path.basename(descriptor_name),
os.path.dirname(descriptor_name),
os.path.basename(target_name),
os.path.dirname(target_name),
snapshot_function="te",
output_units="None",
input_units="None",
calculation_output="",
snapshot_type=snapshot_type,
)
)
else:
if file_ending == "npy":
descriptor_name = os.path.join(
descriptor_save_path,
save_name.replace("*", str(i)) + ".in.npy",
)
target_name = os.path.join(
target_save_path,
save_name.replace("*", str(i)) + ".out.npy",
)
else:
descriptor_name = os.path.join(
descriptor_save_path,
save_name.replace("*", "%T") + ".in." + file_ending,
)
target_name = os.path.join(
target_save_path,
save_name.replace("*", "%T") + ".out." + file_ending,
)

# Each new snapshot gets an number_of_new_snapshots-th from each
# snapshot.
@@ -234,10 +304,10 @@ def __shuffle_numpy(
)
if file_ending == "npy":
self.descriptor_calculator.write_to_numpy_file(
descriptor_name + ".in.npy", new_descriptors
descriptor_name, new_descriptors
)
self.target_calculator.write_to_numpy_file(
target_name + ".out.npy", new_targets
target_name, new_targets
)
else:
# We check above that in the non-numpy case, OpenPMD will work.
@@ -248,7 +318,7 @@ def __shuffle_numpy(
shuffle_dimensions
)
self.descriptor_calculator.write_to_openpmd_file(
descriptor_name_openpmd + ".in." + file_ending,
descriptor_name,
new_descriptors,
additional_attributes={
"global_shuffling_seed": self.parameters.shuffling_seed,
@@ -258,7 +328,7 @@ def __shuffle_numpy(
internal_iteration_number=i,
)
self.target_calculator.write_to_openpmd_file(
target_name_openpmd + ".out." + file_ending,
target_name,
array=new_targets,
additional_attributes={
"global_shuffling_seed": self.parameters.shuffling_seed,
@@ -268,8 +338,6 @@ def __shuffle_numpy(
internal_iteration_number=i,
)

self.delete_temporary_inputs()

# The function __shuffle_openpmd can be used to shuffle descriptor data and
# target data.
# It will be executed one after another for both of them.
@@ -652,6 +720,7 @@ def shuffle_snapshots(
target_save_path=None,
save_name="mala_shuffled_snapshot*",
number_of_shuffled_snapshots=None,
shuffle_to_temporary=False,
):
"""
Shuffle the snapshots into new snapshots.
@@ -677,6 +746,12 @@ def shuffle_snapshots(
If not None, this class will attempt to redistribute the data
to this amount of snapshots. If None, then the same number of
snapshots provided will be used.

shuffle_to_temporary : bool
If True, shuffled files will be written to temporary data files.
Which paths are used is consistent with non-temporary usage of this
class. The path and names of these temporary files can then be
found in the class attribute temporary_shuffled_snapshots.
"""
# Check the paths.
if complete_save_path is not None:
@@ -691,12 +766,23 @@ def shuffle_snapshots(
file_ending = save_name.split(".")[-1]
save_name = save_name.split(".")[0]
if file_ending != "npy":
import openpmd_api as io

if file_ending not in io.file_extensions:
raise Exception(
"Invalid file ending selected: " + file_ending
if shuffle_to_temporary:
parallel_warn(
"Shuffling to temporary files currently"
" only works with numpy as an enginge for "
"intermediate files. You have selected both"
" openpmd and the temporary file option. "
"Will proceed with numpy instead of "
"openpmd."
)
file_ending = "npy"
else:
import openpmd_api as io

if file_ending not in io.file_extensions:
raise Exception(
"Invalid file ending selected: " + file_ending
)
else:
file_ending = "npy"

@@ -794,6 +880,7 @@ def shuffle_snapshots(
target_save_path,
permutations,
file_ending,
shuffle_to_temporary,
)
elif snapshot_type == "json+numpy":
for snapshot in self.parameters.snapshot_directories_list:
@@ -806,6 +893,7 @@ def shuffle_snapshots(
target_save_path,
permutations,
file_ending,
shuffle_to_temporary,
)
elif snapshot_type == "openpmd":
descriptor = self.__DescriptorOrTarget(
@@ -843,6 +931,23 @@ def shuffle_snapshots(
else:
raise Exception("Unknown snapshot type: {}".format(snapshot_type))

# Deleting temporary files that may have been created.
self.delete_temporary_inputs()

# Since no training will be done with this class, we should always
# clear the data at the end.
self.clear_data()

def delete_temporary_shuffled_snapshots(self):
    """
    Delete the temporary files created while shuffling.

    Every snapshot stored in ``temporary_shuffled_snapshots`` points to
    one input file and one output file on disk. Both are removed if they
    exist as regular files; afterwards the list is reset to an empty
    list. Paths that do not exist (anymore) or that are not regular
    files are silently skipped, so calling this function repeatedly is
    harmless.
    """
    for snapshot in self.temporary_shuffled_snapshots:
        # Input and output file are treated identically, so both
        # (directory, file name) pairs go through the same code path.
        locations = (
            (snapshot.input_npy_directory, snapshot.input_npy_file),
            (snapshot.output_npy_directory, snapshot.output_npy_file),
        )
        for directory, file_name in locations:
            file_path = os.path.join(directory, file_name)
            if os.path.isfile(file_path):
                try:
                    os.remove(file_path)
                except FileNotFoundError:
                    # The file vanished between the check and the
                    # removal (e.g. another process cleaned it up
                    # first); there is nothing left to delete.
                    pass
    self.temporary_shuffled_snapshots = []