
Clean up schema and dependencies #114

Merged Oct 31, 2024 (52 commits)

Commits
83684ea
set dask logging level when setting up client
landmanbester Sep 4, 2024
300b275
remove exitstack from grid worker
landmanbester Sep 4, 2024
bd68b5c
use built in numba.set_num_threads to set the number of threads to us…
landmanbester Sep 4, 2024
1ff7673
remove QuartiCal dependency
landmanbester Sep 4, 2024
446df17
set nthreads=1 in test suite
landmanbester Sep 4, 2024
4cc7069
add bokeh dependency and don't lose protocol in output-filename
landmanbester Sep 4, 2024
3ab346d
catch and raise errors returned by futures
landmanbester Sep 5, 2024
d0a6a99
report numba cache dir for validation
landmanbester Sep 5, 2024
d2fe71a
more general hess_psf operator
landmanbester Sep 9, 2024
0973d5c
fix failing tests. Report timings in pcg. Plot residual histograms af…
landmanbester Sep 9, 2024
fd195a1
optimise simple task for large arrays
landmanbester Sep 10, 2024
8d0d3e8
compare cube to per slice hess
landmanbester Sep 10, 2024
74e8e71
explicitly check if ac_iter is empty and break out of loop if it is
landmanbester Sep 11, 2024
06144d6
add direction dependent predict
landmanbester Sep 13, 2024
cd46ac4
fix typo in degrid
landmanbester Sep 13, 2024
94b36ec
drop last two axes of wcs before rendering region to mask
landmanbester Sep 13, 2024
0d82069
transpose region mask in degridder
landmanbester Sep 13, 2024
9947a5a
stokes2vis return None when all data are flagged in a chunk
landmanbester Sep 13, 2024
e09fa01
Merge branch 'ddpredict' into outputs
landmanbester Sep 16, 2024
5f0539d
fix typo in pcg for fluxmop
landmanbester Sep 16, 2024
4af9f14
don't drop DIRTY in pcg for fluxmop
landmanbester Sep 17, 2024
0524470
Merge branch 'ddpredict' into outputs
landmanbester Sep 17, 2024
956e803
don't strip protocol in basename
landmanbester Sep 17, 2024
fa5bf79
don't strip protocol in basename also in fluxmop
landmanbester Sep 17, 2024
234b6c9
optionally use stack.enter_context
landmanbester Sep 17, 2024
663db16
add regions dependency
landmanbester Sep 17, 2024
581f664
report load location in xds_from_url call
landmanbester Sep 17, 2024
bf9395c
keep protocol intact when globbing from object stores
landmanbester Sep 17, 2024
4c3d939
use s3fs directly with pickle instead of boto3
landmanbester Sep 17, 2024
87aca03
explicitly resize_thread_pool in gridding workers
landmanbester Sep 17, 2024
f4b3b70
more consistent use of xds_from_url
landmanbester Sep 18, 2024
358ff07
make wgridder calls more verbose
landmanbester Sep 19, 2024
366f44b
parallel norm_diff and print sara timings in fractions
landmanbester Sep 19, 2024
c540401
time residual steps
landmanbester Sep 19, 2024
6a84862
attempt to write to zarr in separate thread
landmanbester Sep 20, 2024
a514a43
don't write unnecessary data vars during major cycle
landmanbester Sep 20, 2024
215a9d4
better use of numba in pcg
landmanbester Sep 21, 2024
c3f0ee9
use --model-name and --residual-name parameters in restore worker
landmanbester Sep 23, 2024
90b85db
Merge branch 'ddpredict' into outputs
landmanbester Sep 23, 2024
55882b6
revert numba updates in pcg
landmanbester Sep 23, 2024
f69f699
don't use exitstack in degrid
landmanbester Sep 25, 2024
90eefda
Merge branch 'ddpredict' into outputs
landmanbester Sep 25, 2024
1a61ca9
pass jit options correctly to overload
landmanbester Sep 26, 2024
df9c779
resize ducc thread pool in comps2vis
landmanbester Sep 26, 2024
cee6165
report mean and var ressq before and after reweighting
landmanbester Oct 10, 2024
98da2e1
make beam computation optional in grid worker
landmanbester Oct 11, 2024
3acfded
allow missing antennas by relabelling ant1 and ant2 by index
landmanbester Oct 14, 2024
6590170
downweight extrapolated edges in model2comps
landmanbester Oct 16, 2024
699d5a5
don't select out frow before antenna sanity check in stokes2vis
landmanbester Oct 16, 2024
064cad6
add outputs and read degrid chan mapping from dds by default
landmanbester Oct 29, 2024
c974a0a
rename fluxmop -> fluxtractor
landmanbester Oct 30, 2024
ff2831c
only print timings if verbosity larger than 1
landmanbester Oct 30, 2024
README.rst (2 changes: 1 addition & 1 deletion)
@@ -75,7 +75,7 @@ By default only a single worker is used so that datasets will be processed one a
 This results in the smallest memory footprint but won't always result in optimal resource utilisation.
 It also allows for easy debugging as there is no Dask cluster involved in this case.
 However, for better resource utilisation and or distributing computations over a cluster, you may wish to set `--nworkers` larger than one.
-This uses multiple Dask workers (processes) to process chunks in parallel and is especially useful for the `init`, `grid` and `fluxmop` applications.
+This uses multiple Dask workers (processes) to process chunks in parallel and is especially useful for the `init`, `grid` and `fluxtractor` applications.
 It is usually advisable to set `--nworkers` to the number of desired imaging bands which is set by the `--channels-per-image` parameter when initialising corrected Stokes visibilities with the `init` application.
 The product of `--nworkers` and `--nthreads-per-worker` should not exceed the available resources.

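To make the README's sizing rule concrete, here is a small illustrative Python sketch. The helper name `suggest_dask_layout` is hypothetical (not part of pfb); it only demonstrates splitting a core budget so that the product of `--nworkers` and `--nthreads-per-worker` never exceeds the available cores:

```python
import multiprocessing

def suggest_dask_layout(nbands, total_cores=None):
    """Hypothetical helper: pick --nworkers and --nthreads-per-worker
    so that nworkers * nthreads_per_worker <= total_cores."""
    total_cores = total_cores or multiprocessing.cpu_count()
    # Ideally one worker per imaging band, capped by the core budget.
    nworkers = min(nbands, total_cores)
    nthreads_per_worker = max(1, total_cores // nworkers)
    return nworkers, nthreads_per_worker

# e.g. 8 imaging bands on a 16-core node -> (8, 2)
print(suggest_dask_layout(nbands=8, total_cores=16))
```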
pfb/__init__.py (37 changes: 31 additions & 6 deletions)
@@ -1,5 +1,6 @@
 import os
 import sys
+import logging
 # import psutil
 # import resource

@@ -14,7 +15,9 @@ def set_envs(nthreads, ncpu):
     os.environ["OPENBLAS_NUM_THREADS"] = str(nthreads)
     os.environ["MKL_NUM_THREADS"] = str(nthreads)
     os.environ["VECLIB_MAXIMUM_THREADS"] = str(nthreads)
-    # os.environ["NUMBA_NUM_THREADS"] = str(nthreads)
     os.environ["NPY_NUM_THREADS"] = str(nthreads)
+    os.environ["NUMBA_NUM_THREADS"] = str(nthreads)
+    os.environ["JAX_PLATFORMS"] = 'cpu'
+    os.environ["JAX_ENABLE_X64"] = 'True'
     # this may be required for numba parallelism
     # find python and set LD_LIBRARY_PATH
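An aside on the `set_envs` hunk above: these variables only take effect if they are exported before numpy and numba are first imported, since both size their thread pools at import time. A minimal sketch of that ordering, assuming nothing beyond numpy, numba and the documented `numba.set_num_threads` API:

```python
import os

# Thread-pool env vars must be set *before* the first numpy/numba import;
# both libraries read them once, at import time.
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["NUMBA_NUM_THREADS"] = "8"   # upper bound for numba's pool

import numpy as np   # BLAS pool sized from OPENBLAS_NUM_THREADS now
import numba         # pool capacity fixed to NUMBA_NUM_THREADS now

# At runtime the numba pool can still be shrunk, but never grown past the cap:
numba.set_num_threads(4)
```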
@@ -30,23 +33,45 @@ def set_envs(nthreads, ncpu):
     os.environ["NUMEXPR_NUM_THREADS"] = str(ne_threads)


-def set_client(nworkers, log, stack=None,
-               host_address=None, direct_to_workers=False):
+def set_client(nworkers, log, stack=None, host_address=None,
+               direct_to_workers=False, client_log_level=None):

+    import warnings
+    warnings.filterwarnings("ignore", message="Port 8787 is already in use")
+    if client_log_level == 'error':
+        logging.getLogger("distributed").setLevel(logging.ERROR)
+        logging.getLogger("bokeh").setLevel(logging.ERROR)
+        logging.getLogger("tornado").setLevel(logging.CRITICAL)
+    elif client_log_level == 'warning':
+        logging.getLogger("distributed").setLevel(logging.WARNING)
+        logging.getLogger("bokeh").setLevel(logging.WARNING)
+        logging.getLogger("tornado").setLevel(logging.WARNING)
+    elif client_log_level == 'info':
+        logging.getLogger("distributed").setLevel(logging.INFO)
+        logging.getLogger("bokeh").setLevel(logging.INFO)
+        logging.getLogger("tornado").setLevel(logging.INFO)
+    elif client_log_level == 'debug':
+        logging.getLogger("distributed").setLevel(logging.DEBUG)
+        logging.getLogger("bokeh").setLevel(logging.DEBUG)
+        logging.getLogger("tornado").setLevel(logging.DEBUG)

+    import dask
+    dask.config.set({'distributed.comm.compression': 'lz4'})
     # set up client
     host_address = host_address or os.environ.get("DASK_SCHEDULER_ADDRESS")
     if host_address is not None:
         from distributed import Client
         print("Initialising distributed client.", file=log)
-        client = stack.enter_context(Client(host_address))
+        if stack is not None:
+            client = stack.enter_context(Client(host_address))
+        else:
+            client = Client(host_address)
     else:
         from dask.distributed import Client, LocalCluster
         print("Initialising client with LocalCluster.", file=log)
         dask.config.set({
             'distributed.comm.compression': {
                 'on': True,
-                'type': 'blosc'
+                'type': 'lz4'
             }
         })
         cluster = LocalCluster(processes=True,
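The `client_log_level` ladder added above repeats the same three `setLevel` calls per branch. A behaviour-preserving, table-driven sketch of the same logic, using only the logger names visible in the diff:

```python
import logging

# Map option values to (distributed, bokeh, tornado) levels; 'error'
# intentionally pins tornado to CRITICAL, matching the diff's special case.
_LEVELS = {
    'error': (logging.ERROR, logging.ERROR, logging.CRITICAL),
    'warning': (logging.WARNING,) * 3,
    'info': (logging.INFO,) * 3,
    'debug': (logging.DEBUG,) * 3,
}

def set_client_log_levels(client_log_level):
    if client_log_level in _LEVELS:
        dist, bokeh, tornado = _LEVELS[client_log_level]
        logging.getLogger("distributed").setLevel(dist)
        logging.getLogger("bokeh").setLevel(bokeh)
        logging.getLogger("tornado").setLevel(tornado)
```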
pfb/deconv/clark.py (6 changes: 3 additions & 3 deletions)
@@ -116,9 +116,9 @@ def clark(ID,
     model = np.zeros((nband, nx, ny), dtype=ID.dtype)
     IR = ID.copy()
     # pre-allocate arrays for doing FFT's
-    xout = empty_noncritical(ID.shape, dtype=ID.dtype)
-    xpad = empty_noncritical(PSF.shape, dtype=ID.dtype)
-    xhat = empty_noncritical(PSFHAT.shape, dtype=PSFHAT.dtype)
+    xout = empty_noncritical(ID.shape, dtype='f8')
+    xpad = empty_noncritical(PSF.shape, dtype='f8')
+    xhat = empty_noncritical(PSFHAT.shape, dtype='c16')
     # square avoids abs of full array
     IRsearch = np.sum(IR, axis=0)**2
     pq = IRsearch.argmax()
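On the clark.py change: pinning the scratch buffers to 'f8' (float64) and 'c16' (complex128) decouples the FFT workspace from the precision of the incoming dirty image. A minimal numpy sketch of the preallocation pattern, assuming `empty_noncritical` is functionally equivalent to `np.empty` here and that PSFHAT follows the usual real-to-complex FFT shape convention:

```python
import numpy as np

nband, nx, ny = 4, 512, 512
ID = np.random.rand(nband, nx, ny).astype('f4')   # e.g. a float32 dirty image
PSF_shape = (nband, 2 * nx, 2 * ny)               # padded PSF cube

# Fixed-dtype scratch buffers, allocated once and reused every minor cycle.
xout = np.empty(ID.shape, dtype='f8')             # float64 workspace
xpad = np.empty(PSF_shape, dtype='f8')            # zero-padded FFT input
xhat = np.empty(PSF_shape[:-1] + (PSF_shape[-1] // 2 + 1,), dtype='c16')

xpad[...] = 0.0
xpad[:, :nx, :ny] = ID                            # embed image in padded buffer
xhat[...] = np.fft.rfftn(xpad, axes=(1, 2))       # forward real-to-complex FFT
```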