Replies: 8 comments 6 replies
-
One thing that can improve things considerably is to chunk zarr files differently. Facing a version of this issue on a ZFS file system while running with MPI, I gained significantly by chunking the observations. By doing something like
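A sketch of the kind of call I mean, assuming the `chunks=(trajectories, observations)` keyword of `ParticleFile` (check the signature of your Parcels version):

```python
# A sketch, not my exact script: request a large chunk in the trajectory
# dimension and a modest chunk in the observation dimension.
from datetime import timedelta

# pset is an existing parcels.ParticleSet
output_file = pset.ParticleFile(
    name="output.zarr",
    outputdt=timedelta(hours=1),   # illustrative output interval
    chunks=(int(1e6), 10),         # (trajectory, observation) chunk sizes
)
```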
I dramatically reduced the number of files and improved the efficiency of reading, with no real impact on run time. Note that the first chunks dimension, 1e6, will in practice be reduced to the number of particles launched at the initial time of the Parcels run.
I post-process my output files and choose a different, even more efficient chunking scheme, as described in the parcels tutorial on large MPI runs here. Also, once you post-process the output, you can store it in a zip store (or zip the existing directory) to make it a single file; a sketch of that conversion follows below. I have not played with this much, but it is possible. Note that I don't think (?) you can write to a zip store more than once.
Finally, note that the most recent version of Zarr includes support for sharding, in which multiple chunks can be stored in a single file. I have not played with this, and in particular do not understand its impact on run speeds, but it is designed very specifically to deal with this kind of issue. See https://zarr.readthedocs.io/en/stable/release.html
As a final note, if you have not noticed the tutorial on dealing with large output, I strongly recommend it (I am the author). In particular, pay attention to some of the comments on decoding time, especially if you are not using the latest xarray. I run cases on 32 cores and billions of particles, and feel this pain. I will experiment with sharding at some point.
Jamie
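For the zip-store conversion, a minimal sketch using the zarr-python 2.x API (the paths are assumptions; as noted above, a zip store is effectively write-once):

```python
# A hedged sketch: copy an existing Parcels zarr directory store into a
# single-file ZipStore after the run has finished. Paths are illustrative.
import zarr

src = zarr.DirectoryStore("output.zarr")
dst = zarr.ZipStore("output.zarr.zip", mode="w")
zarr.copy_store(src, dst)
dst.close()

# To read it back later, pass a read-only ZipStore to e.g. xarray:
# ds = xarray.open_zarr(zarr.ZipStore("output.zarr.zip", mode="r"))
```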
-
Thanks for starting this discussion, @tiagoams, and for weighing in with your comments to optimise the chunking, @JamiePringle. More generally, I wonder what an alternative output format for parallel file systems could be? The ParticleFile class is now more modular, so it should not be too much work to (also) implement other output formats if they work better for some cases. Do you have any thoughts on what those file formats could be, @tiagoams?
-
Two obvious (as in I have not thought long enough to find their pitfalls) choices for very parallel runs with distributed filesystems are 1) writing (time, trajectory, lat, lon, z) tuples to a database and 2) appending the tuples to a per-process or per-node flat file and then merging them all at the end; a sketch of option 2) follows below. 1) and 2) could be combined by having local databases and then merging them. All of this is trivial conceptually and will be a nasty mess of little details in practice. In particular, not all HPC systems have high-performance databases set up. Could you write each of the proc*.zarr stores to a local per-node file system, and then merge them later to avoid the slow big filesystem?
Jamie
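A minimal sketch of option 2), assuming mpi4py and a flat float64 record layout (the file naming and layout are illustrative, not anything Parcels does today):

```python
# Each MPI process appends (time, trajectory, lat, lon, z) rows to its own
# flat binary file; a separate step merges the files after the run.
from mpi4py import MPI
import numpy as np

rank = MPI.COMM_WORLD.Get_rank()

def append_records(rows):
    """Append an (N, 5) array of (time, trajectory, lat, lon, z) rows."""
    with open(f"particles_rank{rank:03d}.bin", "ab") as f:
        np.asarray(rows, dtype=np.float64).tofile(f)

# Example: one observation written by this rank.
append_records([[0.0, 42.0, 54.3, -1.2, 5.0]])

# After the run, a single process (or a separate script) reads every
# particles_rank*.bin, reshapes to (-1, 5), and concatenates them.
```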
-
@JamiePringle many thanks for the suggestion to chunk the output in order to reduce the number of files in the zarr storage. I have run small tests without MPI (to avoid having to merge the zarrs from each process), which result in 400 trajectories by 1450 observations. The number of files is reduced 10-fold in this example, which makes it manageable to move zarrs around. I have also tried chunking with 10 x 1e6, but this becomes very slow and is still running as I write this. I don't understand why; I thought this would suit the post-processing better, where trajectories would be read whole. Below are some numbers for runtime and file count with 1e6 x 10 and default chunking.
Runtime with and without chunking
test_chunking_1e6_10.zarr
WARNING: Chunk size for trajectory (1000000) is larger than length of initial set to write. Reducing ParticleFile chunks to (400, 10)
execute time 88 s mem used: 586 MB
default:
test_chunking_default.zarr
execute time 184 s mem used: 602 MB
Number of files with and without chunking
$ find $scratch/test_chunking_1e6_10.temp.zarr -type f | wc -l
888
$ find $scratch/test_chunking_default.temp.zarr -type f | wc -l
8664
Our HPC support have also been looking into this, and although it seems that not much can be done regarding the way gpfs is being served, they have found that writing a tar, and even untarring it, is relatively fast and can be sped up by parallelising over subdirectories with xargs, e.g.
$ cd my.zarr
$ time find . -mindepth 1 -maxdepth 1 -type d | xargs -n1 -P12 -I{} tar -cf ../my.zarr-tar/{}.tar {}
real 0m15.670s
user 0m0.478s
sys 0m3.942s
@erikvansebille thanks for discussing this. I am not aware of another format that would solve these problems, apart from keeping netcdf, which is less space efficient. For hydrodynamic model output, netcdf3 often provides better runtime performance than HDF5-based netcdf4, but neither can deal smartly with the sparse arrays that are common in particle tracking. Scale factors and offsets are often used in netcdf4 model output to save space and speed up IO.
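As an aside, a hedged illustration of the scale_factor / add_offset packing mentioned above, via xarray's netCDF encoding (variable names, shapes and packing parameters are illustrative assumptions, not Parcels output):

```python
import numpy as np
import xarray as xr

lat = xr.DataArray(
    np.random.uniform(40.0, 60.0, size=(100, 50)),
    dims=("trajectory", "obs"),
    name="lat",
)
lat.to_dataset().to_netcdf(
    "packed.nc",
    encoding={"lat": {
        "dtype": "int16",        # store as 2-byte integers
        "scale_factor": 0.001,   # unpacked = packed * scale_factor + add_offset
        "add_offset": 50.0,
        "_FillValue": -32768,
    }},
)
```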
-
I am out now and will explain more in a bit. But quickly: my choice of 10 x 1e6 was stupid. Something closer to 10 by several thousand to 10,000 would be better. For reasons I'll explain later this is irrelevant with MPI, but if you run without MPI, having a chunk length of 1 million is not right.
-
The good thing about chunking is that you can optimize performance for your use case. The bad thing is that you have to think about it, and you can do the opposite of optimizing if you don't do it correctly. The optimal choice depends on the nature of your reading/writing patterns and on the underlying file system.
The key thing to remember about each chunk is that for any read operation the whole chunk must be read, and for any write to an existing chunk the whole chunk must be read and then written. It might seem, then, that the optimal choice would be a very small chunk, (1, 1). But this is very wrong, for two reasons: 1) zarr and python have per-chunk overhead, which will dominate performance and slow things down if there are too many chunks; 2) filesystems, disks and SSDs all have a minimum size of data that can be read optimally, and usually the filesystem chunk size is the one you need to worry about. If your zarr chunk size is smaller than the filesystem chunk size, you can get very poor performance and waste a lot of space.
Parcels can write to multiple observation indices in a single output step if particles start at different times. The maximum trajectory chunk size seems to be the number of particles written the first time particle locations are written out (note that if you run with MPI, the run is split up into a number of processes, so each process will have a smaller number of particles to process and thus a smaller maximum trajectory chunk size). I find that on my 32- or 64-process MPI runs the trajectory chunk size is limited to, and thus set by, the initial number of particles written out at the initial writing time for each MPI process. However, this is not as bad as it seems, because if you make the trajectory chunk too large, you end up with lots of wasteful reading and writing.
I have found through experimentation with my case that I get better performance (and it is easier to delete/move/etc. the zarr) if I set the trajectory chunk to the maximum number of particles written out initially per process (1000s to 10000s) and the observation chunk to 10. It is well worth experimenting! The experiments should use the same MPI scheme as the real run.
You mention that for large runs it can be painful to process or load the many zarr files from each MPI run. In this tutorial I show how you can reprocess the output into a single zarr file with an optimal structure for reading and a potentially greatly reduced size. You could also modify it to write to a zip store, making it a single file (I think -- I have not done so :-). I don't know whether it is faster to make it as a zip file directly (with the zarr zip store) or to zip the zarr directory after writing; this might well depend on your file system. I find it much easier to convert to a single zarr file than to always have to concatenate many proc*.zarr files.
Cheers,
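For reference, a hedged sketch of that kind of reprocessing (not the exact tutorial code; the paths, the dimension names "trajectory"/"obs", and the chunk sizes are assumptions):

```python
# Concatenate the per-process proc*.zarr stores from an MPI run into a
# single store with read-friendly chunks.
from glob import glob
import xarray as xr

stores = sorted(glob("output.zarr/proc*.zarr"))
ds = xr.concat([xr.open_zarr(s) for s in stores], dim="trajectory")

# Drop the per-store chunk encodings so xarray does not try to reuse them,
# then rechunk to something sensible for whole-trajectory reads.
for name in ds.variables:
    ds[name].encoding.pop("chunks", None)
ds = ds.chunk({"trajectory": 10000, "obs": 10})

ds.to_zarr("output_combined.zarr", mode="w")
```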
-
@tiagoams -- I wonder if you have run into an interesting edge case that @erikvansebille did not think about. The chunk size you specify for the trajectory dimension WILL get overridden if the initial particle set is too small, as I read the code. I would examine the chunk size in one of the ProcN.zarr files, and see if the trajectory chunk size is much smaller than you would expect from what you specify. To do so, go to one of the directories with the output of an MPI run and do something like
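A hedged sketch of such a check (the store path and the variable name "lon" are assumptions; any output variable will do):

```python
# Open one per-process store read-only and inspect one variable's chunking.
import zarr

zgroup = zarr.open_group("proc01.zarr", mode="r")
print(zgroup["lon"].chunks)   # (trajectory_chunk, observation_chunk)
# zgroup["lon"].info gives a fuller summary (shape, chunks, compressor, ...)
```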
It will print something like
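For example, with the chunk sizes quoted in the next sentence (your numbers will differ):

```
(26200, 10)
```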
From which you can see the chunk size (here, (26200, 10)). If this is something ridiculously small, you might want to 1) alter the code to change this behavior (of course, @erikvansebille may have had a good reason to do this...), or 2) start a larger set of particles initially. I realize this can be a pain. It is important to realize that each procN.zarr file can have its own chunk size.
As for concatenating the procN.zarr files, I do that in a separate processing step after my MPI run has finished; I like to keep my MPI script as simple as possible. After the particle tracking run finishes, I have a script that 1) does a basic sanity check that it finished, 2) converts the zarr files, subsamples and changes particle type, and adds the grid cells the particles are in, and 3) copies from the temporary running directory to the final data set. However, if you want to do it in the same Python code that you run with mpirun, you can check which rank each process is and only run the post-processing in one of them. To do this in rank 0, you need to do something like
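A hedged sketch of that rank-0 gating with mpi4py (which is what Parcels itself uses for MPI runs):

```python
from mpi4py import MPI

rank = MPI.COMM_WORLD.Get_rank()
if rank == 0:
    # Only one process does the post-processing, e.g. the proc*.zarr
    # concatenation sketched earlier in this thread.
    print("rank 0: combining proc*.zarr output here")
```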
-
Closing, as a solution was found. This is also being discussed in #1340.
-
Hi,
I am having performance problems managing zarr files on an HPC cluster with a gpfs filesystem. I am posting it here so that i) others using gpfs may be aware of it, ii) possible solutions may be shared, and iii) the developers are informed, since native zarr output has been selected as the solution to "Memory limitation in netcdf export step" #1091 as well as "Zarr output saves disk space and increases data access speed" #1164.
It seems that gpfs is inherently poor at handling zarr files, in particular for recursive operations such as cp, mv, ls, and du. I am seeing a 100x slowdown when moving files on gpfs compared with the local file system of the node. I haven't benchmarked the effect on Parcels runs, but the cost of managing zarr outputs seems prohibitive, for instance the required step of merging per-process zarr files for MPI runs. For the time being I will revert to Parcels 2.3.1 so that I can avoid this problem.
This paper offers an explanation:
Predicting and Comparing the Performance of Array Management Libraries (lbl.gov)
“Zarr stores a chunk, a fixed-size partition of an array, in a separate file, such that storing a large array requires many files which is challenging for parallel file systems. HDF5 and Zarr represent the two extremes of array storage mechanisms”
“A. Parallel file systems Parallel file systems, such as Lustre and GPFS are widely used on HPC clusters. These file systems have architectural similarities in terms of having separate metadata and data servers and transferring data to clients through the network. A file is partitioned into stripes and each stripe is stored in a single data server. The metadata server stores the location of each stripe for a file. We refer to the mapping between stripes and data servers as the file layout. There are two parameters that control the file layout: (1) the stripe size, or the size of a stripe in bytes, and (2) the server count, or the number of data servers that will be used. In GPFS, all files in the same file system use the same stripe size and server count, whereas in Lustre two files in the same file system can have different stripe sizes and server counts. Accessing many files in a parallel file system is inefficient due to the frequent communication with metadata servers and the lack of prefetching in data servers.”