Refactor to make all algorithms use the same formats, simplify parallelism, direct pre-conditioning and produce uniformly blurred images #111

Merged
merged 235 commits into from
Aug 30, 2024

Commits
f2bea94
add option to visualise init graphs
May 3, 2024
3f70801
name jones ones
May 3, 2024
25fe8bd
clone t mapping in stokes2vis
May 3, 2024
8989337
print visualised graph locations to log
May 3, 2024
c05f6dd
add jsk wavelets and initial approach to distribute spotless
May 8, 2024
904e91e
growing bytearray problem
May 10, 2024
795b4e7
eliminate wavelets and cf.futures for testing
May 10, 2024
a8a5249
band actor chaos
May 10, 2024
1d7e6e7
images rst
May 10, 2024
14110ee
heading at the top
May 10, 2024
ed75c31
unify gain-table and gain-term params in init
May 13, 2024
9537694
simplified band actors approach
May 14, 2024
885f5dd
residuals decreasing
May 14, 2024
0e9dafa
Add imit worker that combines init and grid steps (for use with distr…
May 16, 2024
df203b7
cleaner distributed workflow
May 17, 2024
d72767c
add l2 reweighting to dist_spotless
May 20, 2024
e76710c
tweak when L2 reweighting happens
May 20, 2024
d7bcbfb
add imit cab
May 20, 2024
6467604
add missing xds_from_zarr import to imit
May 20, 2024
393b12e
allow diagonal jones in imit worker
May 20, 2024
4d28514
add unique keys for imit jobs, print scheduler address
May 20, 2024
b6f6408
add key for reading in worker_client in stokes2im
May 20, 2024
5a5d52d
test to_zarr with no dask
May 20, 2024
25043b8
clone subds in imit
May 20, 2024
0122060
direct to workers in imit client
May 20, 2024
4df377c
clone inside image_data_products
May 21, 2024
65ef95c
set OMP_NUM_THREADS so wgridder can parallelise
May 21, 2024
34dae04
print update and ratio timings
May 21, 2024
6b3ed1a
test communication times with direct_to_workers
May 21, 2024
225038d
cache of imaging data products. allow worker compression
May 22, 2024
5b9617f
numba ratio computation
May 22, 2024
2e76026
allow overwriting existing model
May 22, 2024
0ed0ea3
allow fits output format for fastim worker
May 23, 2024
0701541
add DATE-OBS to fits header
May 23, 2024
0026eae
add BDA support
May 23, 2024
42e77e4
make sure ddsstore is initialised when overwriting
May 23, 2024
250f912
norm wsum when writing fits out of fastim
May 23, 2024
7f752da
depend on uninneddeps qcal branch
May 23, 2024
535dc30
typo
May 23, 2024
17688ef
depend on codex-africanus >= 0.3.7
May 23, 2024
15f43bf
prepend protocol to ds_list)
landmanbester May 24, 2024
e012d06
l2reweight-from option
landmanbester May 24, 2024
927a354
remove print debug statements
landmanbester May 24, 2024
cd81cc9
try except around imaging call
landmanbester May 24, 2024
5bb27bc
direct to workers to false
landmanbester May 24, 2024
dc4d3b4
print cache name that we are attempting to write to
landmanbester May 24, 2024
cbdba80
print cname
landmanbester May 24, 2024
52e8847
don't use flag_row in bda
landmanbester May 25, 2024
83c74ce
test broadcast_to trick for psf_vis
landmanbester May 25, 2024
37c4581
print shape after bda
landmanbester May 25, 2024
b95056d
print shapes outside of sunbmit call
landmanbester May 25, 2024
dacc313
check for infs or nans
landmanbester May 26, 2024
953579c
check for null subbands
landmanbester May 26, 2024
2acb375
allow empty string for frange. Write residual at iter0
landmanbester May 27, 2024
4304a12
grab uvw from res after averaging
landmanbester May 27, 2024
eccc0eb
change imit output name to xds
landmanbester May 27, 2024
4788bc9
re-enable bda
landmanbester May 27, 2024
6acc942
update chan_width after simple average
landmanbester May 27, 2024
c1febc6
debug call to stokes vis func outside of client
landmanbester May 27, 2024
0066439
debug call to stokes vis func outside of client, ipdb
landmanbester May 27, 2024
fedf0f5
typo
landmanbester May 27, 2024
095f34d
remove debug call
landmanbester May 27, 2024
b20dd07
drop flagged rows in intermediaries
landmanbester May 27, 2024
d4f2e20
add wgt in compute_counts
landmanbester May 27, 2024
d33f8d7
make grid app compatible with init output
landmanbester May 30, 2024
70e405e
sum counts before filtering extremely low values
landmanbester Jun 5, 2024
028a262
test negate w
landmanbester Jun 5, 2024
2c65a76
also negate w in degrid
landmanbester Jun 6, 2024
dc5aff2
undo w negation
landmanbester Jun 9, 2024
5e5217f
yet another fastim test
landmanbester Jun 9, 2024
1b0c46f
overwrite restore products by default. Scale by pixel area. raise err…
landmanbester Jun 10, 2024
3df7d5f
add or-model-with-mask option to fluxmop
landmanbester Jun 12, 2024
5fbcb90
keep track of initial model and residual after fluxmop in case of fai…
landmanbester Jun 12, 2024
2cc81b5
typo
landmanbester Jun 12, 2024
d47bfb9
don't zero model outside mask by default
landmanbester Jun 12, 2024
3bf3a91
make model2comps write output fits cube
landmanbester Jun 12, 2024
8d02ed2
output model cube in model2comps
landmanbester Jun 13, 2024
7ac08bd
use model-out to set fits output path if set
landmanbester Jun 13, 2024
dd8a267
residname to fluxmop in case of recovery
landmanbester Jun 13, 2024
f0ab12f
linear extrapolation in model2comps
landmanbester Jun 14, 2024
8a8de54
pin numpy <= 2.0.0
landmanbester Jun 19, 2024
a5f6fbd
resize ducc threadpool
landmanbester Jun 19, 2024
e6104a8
separate distributed spotless algorithm from memory conservative sara…
landmanbester Jun 19, 2024
025bd30
fastim -> hci
landmanbester Jun 19, 2024
2dc3cde
report number of basis functions used for fit in model2comps
landmanbester Jun 20, 2024
19530db
report interpolation error in model2comps
landmanbester Jun 20, 2024
5c7bcfe
fastim -> hci in main
landmanbester Jun 20, 2024
f3ae838
slice out time 0 for reporting fractional interp error
landmanbester Jun 20, 2024
62d1fd9
try except in sum_overlap
landmanbester Jun 20, 2024
11804cf
rudimentary support for MFS weighting (only on second run of grid app)
landmanbester Jun 20, 2024
121a151
allow null input model in fluxmop
landmanbester Jun 21, 2024
7a21c9a
add memory greedy option to fluxmop
landmanbester Jun 21, 2024
bb32c07
use cetral frequency as ref_freq
landmanbester Jun 21, 2024
52bc7d5
set memory limit. Move imit functionality to init and remove duplicat…
landmanbester Jun 22, 2024
64388c6
remove imit import from main
landmanbester Jun 22, 2024
0e32fd2
start simplifying band_worker
landmanbester Jun 22, 2024
bbd34eb
set crpix3 for MFS fits files
landmanbester Jun 23, 2024
6425fc1
allow running fluxmop without psf in dds
landmanbester Jun 24, 2024
088c4b5
pin numba, don't ignore deprecatin warnings
landmanbester Jun 24, 2024
e69e561
unpin numba and prefer_literal in overload
landmanbester Jun 25, 2024
d771024
remove imit cab
landmanbester Jun 26, 2024
58866ba
add noise map option to grid worker
landmanbester Jun 26, 2024
277d26b
Merge branch 'band_actors' into pcg_spotless
landmanbester Jun 26, 2024
ecb0ea2
start incorporating new preconditioner
landmanbester Jun 27, 2024
b2a17e8
use model and resid name in restore worker
landmanbester Jun 27, 2024
ac4b9f3
add separate fits output folder
landmanbester Jun 27, 2024
af69c73
Merge branch 'band_actors' into pcg_spotless
landmanbester Jun 27, 2024
63714d5
test precond
landmanbester Jun 27, 2024
edae7b6
report compute time after grid and sara
landmanbester Jun 27, 2024
bad0da5
Merge branch 'band_actors' into pcg_spotless
landmanbester Jun 27, 2024
119d640
rewrite band_actor with precond and full hess
landmanbester Jun 28, 2024
16ffe2d
tweak spotless
landmanbester Jun 29, 2024
ec82e34
fix missing xds_from_zarr import
landmanbester Jun 29, 2024
05a27e8
Merge branch 'band_actors' into pcg_spotless
landmanbester Jun 30, 2024
e92f304
compare full Hessian in forward and backward steps
landmanbester Jul 1, 2024
d88e54b
don't import vis2dirty and dirty2vis from experimental module
landmanbester Jul 1, 2024
5701b7b
add minimal hessian vs psf convolve test
landmanbester Jul 1, 2024
da19dbe
add .vscode to gitignore
landmanbester Jul 1, 2024
50eb6ed
test .rst docs
landmanbester Jul 1, 2024
94a4684
naming conventions
landmanbester Jul 1, 2024
71c3db0
naming conventions
landmanbester Jul 1, 2024
083dbe8
fix testing suite after changing naming conventions
landmanbester Jul 1, 2024
aac331b
fix failing tests
landmanbester Jul 1, 2024
c503a20
add missing imports and create naming.py for consistent naming conven…
landmanbester Jul 1, 2024
294efa8
apply min-val cut after extrapolating in model2comps
landmanbester Jul 2, 2024
75ec456
fix typo in fits output name
landmanbester Jul 2, 2024
0e0f685
fix another typo in fits output name
landmanbester Jul 2, 2024
d137c19
add fits-output-folder for restore and model2comps
landmanbester Jul 2, 2024
400ca2e
remove nthreads-dask from hci and smoovie inputs
landmanbester Jul 3, 2024
249ed72
fake client for testing
landmanbester Jul 6, 2024
1716785
move grid worker to client interface. Introduce fake_client
landmanbester Jul 8, 2024
9259b5c
fix beam test
landmanbester Jul 8, 2024
65f464b
drop all collections in sara and klean
landmanbester Jul 8, 2024
08e6e38
get fluxmop working with new data structures
landmanbester Jul 9, 2024
8ec451a
make fluxmop reversible again
landmanbester Jul 10, 2024
6ecef4f
start towards uniformly blurred image
landmanbester Jul 11, 2024
1c51285
client interface in restore worker
landmanbester Jul 12, 2024
d52a36f
don't over-subscribe threads in compute_counts
landmanbester Jul 14, 2024
471f3ad
make BDA max_fov an input
landmanbester Jul 14, 2024
f02a238
set_client -> get_client in fluxmop
landmanbester Jul 15, 2024
de8ead0
compare parallel to serial read zarr from list of datasets
landmanbester Jul 15, 2024
eded7b1
do not use RGI unnecessarily
landmanbester Jul 15, 2024
3e793f3
remove serial read comparison
landmanbester Jul 15, 2024
965981b
fix slow im weight computation
landmanbester Jul 15, 2024
71484f7
don't pass evaluated dds when writing fits
landmanbester Jul 15, 2024
ebc2563
typo max-field-of0view -> max-field-of-view
landmanbester Jul 15, 2024
dae6886
remove tbid from compute_counts inputs
landmanbester Jul 15, 2024
cc12cf1
return sum over grid axis in _compute_counts
landmanbester Jul 15, 2024
c2c1f70
don't log in submit calls
landmanbester Jul 15, 2024
a9deb12
write fits in parallel when using client in fluxmop
landmanbester Jul 15, 2024
ff3489f
add missing wait import
landmanbester Jul 15, 2024
b1c03a9
taper preconditioner for CG
landmanbester Jul 16, 2024
d6c4c4d
test image restoration tweak for more homogenous resolution
landmanbester Jul 16, 2024
6611e83
typo
landmanbester Jul 16, 2024
a70c19b
write fits to correct location in restore
landmanbester Jul 17, 2024
08baace
fix direct inverse bug
landmanbester Jul 18, 2024
ec03605
direct to workers by default in spotless
landmanbester Jul 18, 2024
8789dbe
original psi
landmanbester Jul 19, 2024
1162d05
optimise wavelets and direct hess-approx
landmanbester Jul 19, 2024
33a8b83
typo
landmanbester Jul 19, 2024
a966e2d
test retrieving result with as_completed
landmanbester Jul 19, 2024
9a5f682
crude actor test to find where the time is spent
landmanbester Jul 19, 2024
35c7eff
checkpoint
landmanbester Jul 19, 2024
6dcc0c6
test numexpr and add missing imports
landmanbester Jul 19, 2024
5813716
sara with new direct updates
landmanbester Jul 20, 2024
a495391
time pd units
landmanbester Jul 20, 2024
40ca852
control number of vertical numba threads with context manager
landmanbester Jul 20, 2024
318269d
try process pool for wavelets
landmanbester Jul 20, 2024
5931fe2
truly vertical psi
landmanbester Jul 20, 2024
4712ae3
test if copyto is more consistent
landmanbester Jul 20, 2024
fe2d036
fix tests
landmanbester Jul 21, 2024
fe0a686
restore __version__
landmanbester Jul 21, 2024
566350e
fix noise realisation
landmanbester Jul 21, 2024
d2d2653
add option to compute intrinsic model resolution by deconvolving psf.…
landmanbester Jul 22, 2024
b452323
report timings in primal dual
landmanbester Jul 22, 2024
2f7f0d8
fix typo
landmanbester Jul 22, 2024
84eabb6
fix stupid uint8 slicing bug
landmanbester Jul 22, 2024
91accb7
psfhat -> abspsf for precond
landmanbester Jul 22, 2024
66a017c
fix failing sara test
landmanbester Jul 23, 2024
d5a6c38
fix failing clean test, raise error if PSF is too small
landmanbester Jul 23, 2024
96348a8
fix restore mess
landmanbester Jul 25, 2024
180eb2a
iterable pd-tol
landmanbester Jul 26, 2024
0e6d1b3
fix failing convolve2gaussres test
landmanbester Jul 27, 2024
a5529a1
fix sara test
landmanbester Jul 27, 2024
1b23368
add ability to combine models in model2comps
landmanbester Jul 31, 2024
523fdc4
wait for futures when using restore worker
landmanbester Aug 1, 2024
2de08b1
add option to drop nearly null bands
landmanbester Aug 2, 2024
1f94586
freq_factor when thresholding
landmanbester Aug 10, 2024
0d05943
add init-factor, rms-outside-model to sara and allow empty list for d…
landmanbester Aug 11, 2024
d62f01c
try inverted freq_factor
landmanbester Aug 11, 2024
b3770cc
try low order poly fit in major cycle
landmanbester Aug 11, 2024
a8c7e09
check for missing/inconsistent number of antennas
landmanbester Aug 13, 2024
21f4783
raise error if future does not complete correctly in init
landmanbester Aug 13, 2024
768462d
check if future result in BaseError
landmanbester Aug 13, 2024
a8e96fd
report antenna table size vs max ants inferred from ant1 and ant2 in …
landmanbester Aug 13, 2024
18ab7dd
report msid when things go wrong in stokes2vis
landmanbester Aug 13, 2024
10ca039
fix init bug when using multiple measurement sets without gain soluti…
landmanbester Aug 14, 2024
d8ad890
fix fits reference freq
landmanbester Aug 15, 2024
071976d
add timing summary to primal dual
landmanbester Aug 15, 2024
d711704
eps per scan and fix indexing when no-concat in dds2fits
landmanbester Aug 16, 2024
6a56167
make freq ordering optional in dds_from_list
landmanbester Aug 16, 2024
6513e3a
propagate change to dds2fits
landmanbester Aug 16, 2024
cb60599
use per basis rms_comps, add epsfactor
landmanbester Aug 19, 2024
29d5233
typo in model2comps
landmanbester Aug 19, 2024
4dfc142
remove repeated tol in stokes2im
landmanbester Aug 19, 2024
b55af97
fix imports in stokes2im
landmanbester Aug 19, 2024
fa6f92e
add import in hci
landmanbester Aug 19, 2024
1995e45
cellx -> cell_rad in stokes2im
landmanbester Aug 19, 2024
d1885c3
robustness -> opts.robustness in stokes2im
landmanbester Aug 19, 2024
eb15628
add natural grad to hci
landmanbester Aug 20, 2024
5272dfa
remove concat_time in hci for now
landmanbester Aug 21, 2024
fa83bb4
tweak freq mapping in hci
landmanbester Aug 21, 2024
120ba22
allow freq selection in degrid
landmanbester Aug 22, 2024
ea4395f
add model-column as input to hci
landmanbester Aug 22, 2024
428dd7f
model-column also in hci parset
landmanbester Aug 22, 2024
083027e
typo
landmanbester Aug 22, 2024
c4169cb
remove fmap in hci
landmanbester Aug 22, 2024
6ff91c0
scatter mds to workers in degrid
landmanbester Aug 22, 2024
5497df9
fix bug when transferring model from comps
landmanbester Aug 23, 2024
2108b7a
cast model to double precision in stokes2im
landmanbester Aug 24, 2024
415cdb7
flip_v everywhere and correct fits crpix2 convention
landmanbester Aug 26, 2024
15021cd
fix and improve sara test
landmanbester Aug 26, 2024
5eff059
fix weighting and polproducts test
landmanbester Aug 27, 2024
9e911d0
fix hanging behaviour when ntasks < nworkers in init and hci
landmanbester Aug 27, 2024
e82df89
report worker memory in init
landmanbester Aug 27, 2024
6ccef0b
attempt to clear memory with client.run
landmanbester Aug 27, 2024
a3dd719
clean up custom as_completed logic
landmanbester Aug 27, 2024
91709c4
make memory reporting optional
landmanbester Aug 28, 2024
72ff08d
attempt to get rid of cluster cleanup errors
landmanbester Aug 28, 2024
fac50dc
remove debugging print statements
landmanbester Aug 28, 2024
4c06b78
consistent use of _ in l1 and l2 reweight params
landmanbester Aug 29, 2024
c28f6d0
add hci can definition
landmanbester Aug 29, 2024
0f087a2
add pass_missing_as_none policies to all workers
landmanbester Aug 30, 2024
ce9890e
remove stray quit()
landmanbester Aug 30, 2024
f882eb0
remove defaults dict from worker definitions
landmanbester Aug 30, 2024
3 changes: 3 additions & 0 deletions .gitignore
@@ -100,3 +100,6 @@ ENV/

# mypy
.mypy_cache/

# vscode
.vscode/
64 changes: 63 additions & 1 deletion README.rst
@@ -1,8 +1,15 @@
===========
pfb-imaging
=========
===========

.. .. image:: /logo/Gemini_Generated_Image_m19n6gm19n6gm19n.jpg
.. :align: center

Radio interferometric imaging suite based on the pre-conditioned forward-backward algorithm.

Installation
~~~~~~~~~~~~

Install the package by cloning and running

:code:`$ pip install -e pfb-imaging/`
@@ -20,6 +27,61 @@ no binary mode eg

:code:`$ pip install -e ducc`

Default naming conventions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The default outputs are named using a combination of the following parameters:

* `--output-filename`
* `--product`
* `--suffix`

The output dataset of the `init` application (viz. the `xds`) will be called

:code:`f"{output_filename}_{product}.xds"`

with standard Python string substitutions. The grid worker creates another dataset (viz. the `dds`) called

:code:`f"{output_filename}_{product}_{suffix}.dds"`

i.e. with the suffix appended (the default suffix being `main`), which contains imaging data products such as the dirty and PSF images.
This allows imaging multiple fields from a single set of (possibly averaged) corrected Stokes visibilities produced by the `init` application.
For example, the sun can be imaged from an `xds` prepared for the main field by setting `--target sun` and, say, `--suffix sun`.
The `--target` parameter can be any object recognised by `astropy` and can also be specified using the `HH:MM:SS,DD:MM:SS` format.
All deconvolution algorithms interact with the `dds` produced by the `grid` application and will produce a component model called

:code:`f"{output_filename}_{product}_{suffix}_model.mds"`

as well as standard pixelated images which are stored in the `dds`.
They also produce fits images, but these are mainly for inspection with fits viewers.
By default, logs and fits files are stored in the directory specified by `--output-filename`, but these locations can be overridden with the

* `--fits-output-folder` and
* `--log-directory`

parameters. Fits files are stored with the same naming convention but with the base output directory set to `--fits-output-folder`.
Logs are stored by application name with a time stamp to prevent inadvertently overwriting them.
The output paths for all files are reported in the log.
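The naming rules above amount to simple string composition. A minimal sketch for illustration (the helper name is hypothetical; pfb-imaging builds these paths internally):

```python
def default_names(output_filename, product, suffix="main"):
    """Compose the default dataset names described above."""
    xds = f"{output_filename}_{product}.xds"
    dds = f"{output_filename}_{product}_{suffix}.dds"
    mds = f"{output_filename}_{product}_{suffix}_model.mds"
    return xds, dds, mds

# e.g. imaging the sun from an xds prepared for the main field
xds, dds, mds = default_names("out/obs1", "I", suffix="sun")
```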

Parallelism settings
~~~~~~~~~~~~~~~~~~~~

There are two settings controlling parallelism, viz.

* `--nworkers`, which controls how many chunks (usually corresponding to imaging bands) are processed in parallel, and
* `--nthreads-per-worker`, which specifies the number of threads available to each worker (e.g. for gridding, FFTs and wavelet transforms).

By default only a single worker is used so that datasets will be processed one at a time.
This results in the smallest memory footprint but won't always result in optimal resource utilisation.
It also allows for easy debugging as there is no Dask cluster involved in this case.
However, for better resource utilisation and/or to distribute computations over a cluster, you may wish to set `--nworkers` larger than one.
This uses multiple Dask workers (processes) to process chunks in parallel and is especially useful for the `init`, `grid` and `fluxmop` applications.
It is usually advisable to set `--nworkers` to the number of desired imaging bands which is set by the `--channels-per-image` parameter when initialising corrected Stokes visibilities with the `init` application.
The product of `--nworkers` and `--nthreads-per-worker` should not exceed the available resources.
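The budgeting rule in the last sentence can be sketched as a quick sanity check (the helper name is an assumption for illustration, not part of pfb-imaging):

```python
import os

def thread_budget_ok(nworkers, nthreads_per_worker, ncpu=None):
    # The product of --nworkers and --nthreads-per-worker should not
    # exceed the CPUs available on the node.
    if ncpu is None:
        ncpu = os.cpu_count() or 1
    return nworkers * nthreads_per_worker <= ncpu
```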

Acknowledgement
~~~~~~~~~~~~~~~

If you find any of this useful please cite (for now)

https://arxiv.org/abs/2101.08072
Binary file added logo/Gemini_Generated_Image_m19n6gm19n6gm19n.jpg
154 changes: 49 additions & 105 deletions pfb/__init__.py
@@ -1,61 +1,20 @@
"""
pfb-imaging

author - Landman Bester
email - [email protected]
date - 31/03/2020

MIT License

Copyright (c) 2020 Rhodes University Centre for Radio Astronomy Techniques & Technologies (RATT)

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

"""
__version__ = '0.0.4'

import os
import sys
# import psutil
# import resource

def set_client(opts, stack, log, scheduler='distributed'):
# mem_total = psutil.virtual_memory().total
# _, hardlim = resource.getrlimit(resource.RLIMIT_AS)
# resource.setrlimit(resource.RLIMIT_AS, (mem_total, hardlim))

from omegaconf import open_dict
# attempt somewhat intelligent default setup
import multiprocessing
nthreads_max = max(multiprocessing.cpu_count(), 1)
if opts.nvthreads is None:
# we allocate half by default
nthreads_tot = max(nthreads_max//2, 1)
if opts.scheduler in ['single-threaded', 'sync']:
with open_dict(opts):
opts.nvthreads = nthreads_tot
else:
ndask_chunks = opts.nthreads_dask*opts.nworkers
nvthreads = max(nthreads_tot//ndask_chunks, 1)
with open_dict(opts):
opts.nvthreads = nvthreads
__version__ = '0.0.4'

# os.environ["OMP_NUM_THREADS"] = str(opts.nvthreads)
os.environ["OPENBLAS_NUM_THREADS"] = str(opts.nvthreads)
os.environ["MKL_NUM_THREADS"] = str(opts.nvthreads)
os.environ["VECLIB_MAXIMUM_THREADS"] = str(opts.nvthreads)
os.environ["NUMBA_NUM_THREADS"] = str(opts.nvthreads)
def set_envs(nthreads, ncpu):
os.environ["OMP_NUM_THREADS"] = str(nthreads)
os.environ["OPENBLAS_NUM_THREADS"] = str(nthreads)
os.environ["MKL_NUM_THREADS"] = str(nthreads)
os.environ["VECLIB_MAXIMUM_THREADS"] = str(nthreads)
# os.environ["NUMBA_NUM_THREADS"] = str(nthreads)
os.environ["JAX_ENABLE_X64"] = 'True'
# this may be required for numba parallelism
# find python and set LD_LIBRARY_PATH
@@ -67,61 +26,46 @@ def set_client(opts, stack, log, scheduler='distributed'):
os.environ["LD_LIBRARY_PATH"] = f'{ldpath}:{ldcurrent}'
# TODO - should we fall over in else?

import numexpr as ne
max_cores = ne.detect_number_of_cores()
# ne_threads = min(max_cores, opts.nvthreads)
os.environ["NUMEXPR_NUM_THREADS"] = str(max_cores)
ne_threads = min(ncpu, nthreads)
os.environ["NUMEXPR_NUM_THREADS"] = str(ne_threads)

import dask
if scheduler=='distributed':
# TODO - investigate what difference this makes
# with dask.config.set({"distributed.scheduler.worker-saturation": 1.1}):
# client = distributed.Client()
# set up client
host_address = opts.host_address or os.environ.get("DASK_SCHEDULER_ADDRESS")
if host_address is not None:
from distributed import Client
print("Initialising distributed client.", file=log)
client = stack.enter_context(Client(host_address))
else:
if opts.nthreads_dask * opts.nvthreads > nthreads_max:
print("Warning - you are attempting to use more threads than "
"available. This may lead to suboptimal performance.",
file=log)
from dask.distributed import Client, LocalCluster
print("Initialising client with LocalCluster.", file=log)
with dask.config.set({"distributed.scheduler.worker-saturation": 1.1}):
cluster = LocalCluster(processes=opts.nworkers > 1,
n_workers=opts.nworkers,
threads_per_worker=opts.nthreads_dask,
memory_limit=0, # str(mem_limit/nworkers)+'GB'
asynchronous=False)
cluster = stack.enter_context(cluster)
client = stack.enter_context(Client(cluster))

from quartical.scheduling import install_plugin
client.run_on_scheduler(install_plugin)
client.wait_for_workers(opts.nworkers)
elif scheduler in ['sync', 'single-threaded']:
dask.config.set(scheduler=scheduler)
print(f"Initialising with synchronous scheduler",
file=log)
elif scheduler=='threads':
from multiprocessing.pool import ThreadPool
dask.config.set(pool=ThreadPool(opts.nthreads_dask))
print(f"Initialising ThreadPool with {opts.nthreads_dask} threads",
file=log)
elif scheduler=='processes':
# TODO - why is the performance so terrible in this case?
from multiprocessing.pool import Pool
dask.config.set(pool=Pool(opts.nthreads_dask))
print(f"Initialising Pool with {opts.nthreads_dask} processes",
file=log)
def set_client(nworkers, log, stack=None,
host_address=None, direct_to_workers=False):
import dask
dask.config.set({'distributed.comm.compression': 'lz4'})
# set up client
host_address = host_address or os.environ.get("DASK_SCHEDULER_ADDRESS")
if host_address is not None:
from distributed import Client
print("Initialising distributed client.", file=log)
client = stack.enter_context(Client(host_address))
else:
raise ValueError(f"Unknown scheduler option {opts.scheduler}")

# return updated opts
return opts
from dask.distributed import Client, LocalCluster
print("Initialising client with LocalCluster.", file=log)
dask.config.set({
'distributed.comm.compression': {
'on': True,
'type': 'blosc'
}
})
cluster = LocalCluster(processes=True,
n_workers=nworkers,
threads_per_worker=1,
memory_limit=0, # str(mem_limit/nworkers)+'GB'
asynchronous=False)
if stack is not None:
cluster = stack.enter_context(cluster)
client = Client(cluster,
direct_to_workers=direct_to_workers)
if stack is not None:
client = stack.enter_context(client)

client.wait_for_workers(nworkers)
dashboard_url = client.dashboard_link
print(f"Dask Dashboard URL at {dashboard_url}", file=log)

return client


def logo():
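As a standalone illustration of the thread-capping behaviour in `set_envs` above (the helper name here is hypothetical, and the `LD_LIBRARY_PATH`/JAX handling is omitted): the BLAS-style pools are pinned to `nthreads`, while numexpr is capped at `min(ncpu, nthreads)`.

```python
import os

def set_thread_envs(nthreads, ncpu):
    # Pin BLAS-style thread pools to nthreads.
    for var in ("OMP_NUM_THREADS", "OPENBLAS_NUM_THREADS",
                "MKL_NUM_THREADS", "VECLIB_MAXIMUM_THREADS"):
        os.environ[var] = str(nthreads)
    # numexpr is additionally capped by the detected core count.
    os.environ["NUMEXPR_NUM_THREADS"] = str(min(ncpu, nthreads))

set_thread_envs(4, 2)
```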
24 changes: 15 additions & 9 deletions pfb/deconv/clark.py
Expand Up @@ -4,7 +4,7 @@
from numba import njit
import dask.array as da
from pfb.operators.psf import psf_convolve_cube
from ducc0.misc import make_noncritical
from ducc0.misc import empty_noncritical
import pyscilog
log = pyscilog.get_logger('CLARK')

@@ -45,10 +45,13 @@ def subminor(A, psf, Ip, Iq, model, wsums, gamma=0.05, th=0.0, maxit=10000):
q = Iq[pq]

where pq is the location of the maximum in A.

# TODO - this does not work unless the PSF is twice the size of the image
"""
nband, nx_psf, ny_psf = psf.shape
nxo2 = nx_psf//2
nyo2 = ny_psf//2
# _, nx, ny = model.shape
Asearch = np.sum(A, axis=0)**2
pq = Asearch.argmax()
p = Ip[pq]
@@ -63,7 +66,13 @@ model[fsel, p, q] += gamma * xhat[fsel]/wsums[fsel]
model[fsel, p, q] += gamma * xhat[fsel]/wsums[fsel]
Idelp = p - Ip
Idelq = q - Iq
# find where PSF overlaps with image
# # find where PSF overlaps with image
# left_x = p - Ip >= 0
# right_x = p + Ip < nx
# left_y = q - Iq >= 0
# right_y = q + Iq < ny
# mask = (left_x & right_x) & (left_y & right_y)

mask = (np.abs(Idelp) <= nxo2) & (np.abs(Idelq) <= nyo2)
# Ipp, Iqq = psf[:, nxo2 - Ip[mask], nyo2 - Iq[mask]]
A = subtract(A[:, mask], psf,
@@ -105,15 +114,11 @@ def clark(ID,
# PSFHAT /= wsum
# wsums = wsums/wsum
model = np.zeros((nband, nx, ny), dtype=ID.dtype)
model = make_noncritical(model)
IR = ID.copy()
# pre-allocate arrays for doing FFT's
xout = np.empty(ID.shape, dtype=ID.dtype, order='C')
xout = make_noncritical(xout)
xpad = np.empty(PSF.shape, dtype=ID.dtype, order='C')
xpad = make_noncritical(xpad)
xhat = np.empty(PSFHAT.shape, dtype=PSFHAT.dtype)
xhat = make_noncritical(xhat)
xout = empty_noncritical(ID.shape, dtype=ID.dtype)
xpad = empty_noncritical(PSF.shape, dtype=ID.dtype)
xhat = empty_noncritical(PSFHAT.shape, dtype=PSFHAT.dtype)
# square avoids abs of full array
IRsearch = np.sum(IR, axis=0)**2
pq = IRsearch.argmax()
@@ -132,6 +137,7 @@
gamma=gamma,
th=subth,
maxit=submaxit)

# subtract from full image (as in major cycle)
psf_convolve_cube(
xpad,
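The mask in `subminor` keeps only the active pixels whose offset from the current peak fits inside half the PSF extent in each axis. A minimal numpy sketch of that selection (the function name is illustrative, not from the codebase):

```python
import numpy as np

def overlap_mask(p, q, Ip, Iq, nx_psf, ny_psf):
    # Offsets of the active pixels (Ip, Iq) from the peak (p, q);
    # keep those that fall inside the half-width of the PSF.
    Idelp = p - Ip
    Idelq = q - Iq
    return (np.abs(Idelp) <= nx_psf // 2) & (np.abs(Idelq) <= ny_psf // 2)

mask = overlap_mask(5, 5, np.array([0, 5, 9]), np.array([0, 5, 9]), 8, 8)
```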