Refactor to make all algorithms use the same formats, simplify parallelism, direct pre-conditioning and produce uniformly blurred images (#111)

* add option to visualise init graphs

* name jones ones

* clone t mapping in stokes2vis

* print visualised graph locations to log

* add jsk wavelets and initial approach to distribute spotless

* growing bytearray problem

* eliminate wavelets and cf.futures for testing

* band actor chaos

* images rst

* heading at the top

* unify gain-table and gain-term params in init

* simplified band actors approach

* residuals decreasing

* Add imit worker that combines init and grid steps (for use with distributed minors)

* cleaner distributed workflow

* add l2 reweighting to dist_spotless

* tweak when L2 reweighting happens

* add imit cab

* add missing xds_from_zarr import to imit

* allow diagonal jones in imit worker

* add unique keys for imit jobs, print scheduler address

* add key for reading in worker_client in stokes2im

* test to_zarr with no dask

* clone subds in imit

* direct to workers in imit client

* clone inside image_data_products

* set OMP_NUM_THREADS so wgridder can parallelise

* print update and ratio timings

* test communication times with direct_to_workers

* cache of imaging data products. allow worker compression

* numba ratio computation

* allow overwriting existing model

* allow fits output format for fastim worker

* add DATE-OBS to fits header

* add BDA support

* make sure ddsstore is initialised when overwriting

* norm wsum when writing fits out of fastim

* depend on uninneddeps qcal branch

* typo

* depend on codex-africanus >= 0.3.7

* prepend protocol to ds_list

* l2reweight-from option

* remove print debug statements

* try except around imaging call

* direct to workers to false

* print cache name that we are attempting to write to

* print cname

* don't use flag_row in bda

* test broadcast_to trick for psf_vis

* print shape after bda

* print shapes outside of submit call

* check for infs or nans

* check for null subbands

* allow empty string for frange. Write residual at iter0

* grab uvw from res after averaging

* change imit output name to xds

* re-enable bda

* update chan_width after simple average

* debug call to stokes vis func outside of client

* debug call to stokes vis func outside of client, ipdb

* typo

* remove debug call

* drop flagged rows in intermediaries

* add wgt in compute_counts

* make grid app compatible with init output

* sum counts before filtering extremely low values

* test negate w

* also negate w in degrid

* undo w negation

* yet another fastim test

* overwrite restore products by default. Scale by pixel area. raise error if fastim future errs

* add or-model-with-mask option to fluxmop

* keep track of initial model and residual after fluxmop in case of failure

* typo

* don't zero model outside mask by default

* make model2comps write output fits cube

* output model cube in model2comps

* use model-out to set fits output path if set

* residname to fluxmop in case of recovery

* linear extrapolation in model2comps

* pin numpy <= 2.0.0

* resize ducc threadpool

* separate distributed spotless algorithm from memory conservative sara deconv worker

* fastim -> hci

* report number of basis functions used for fit in model2comps

* report interpolation error in model2comps

* fastim -> hci in main

* slice out time 0 for reporting fractional interp error

* try except in sum_overlap

* rudimentary support for MFS weighting (only on second run of grid app)

* allow null input model in fluxmop

* add memory greedy option to fluxmop

* use central frequency as ref_freq

* set memory limit. Move imit functionality to init and remove duplicate. remove dead pcg code

* remove imit import from main

* start simplifying band_worker

* set crpix3 for MFS fits files

* allow running fluxmop without psf in dds

* pin numba, don't ignore deprecation warnings

* unpin numba and prefer_literal in overload

* remove imit cab

* add noise map option to grid worker

* start incorporating new preconditioner

* use model and resid name in restore worker

* add separate fits output folder

* test precond

* report compute time after grid and sara

* rewrite band_actor with precond and full hess

* tweak spotless

* fix missing xds_from_zarr import

* compare full Hessian in forward and backward steps

* don't import vis2dirty and dirty2vis from experimental module

* add minimal hessian vs psf convolve test

* add .vscode to gitignore

* test .rst docs

* naming conventions

* naming conventions

* fix testing suite after changing naming conventions

* fix failing tests

* add missing imports and create naming.py for consistent naming conventions

* apply min-val cut after extrapolating in model2comps

* fix typo in fits output name

* fix another typo in fits output name

* add fits-output-folder for restore and model2comps

* remove nthreads-dask from hci and smoovie inputs

* fake client for testing

* move grid worker to client interface. Introduce fake_client

* fix beam test

* drop all collections in sara and klean

* get fluxmop working with new data structures

* make fluxmop reversible again

* start towards uniformly blurred image

* client interface in restore worker

* don't over-subscribe threads in compute_counts

* make BDA max_fov an input

* set_client -> get_client in fluxmop

* compare parallel to serial read zarr from list of datasets

* do not use RGI unnecessarily

* remove serial read comparison

* fix slow im weight computation

* don't pass evaluated dds when writing fits

* typo max-field-of0view -> max-field-of-view

* remove tbid from compute_counts inputs

* return sum over grid axis in _compute_counts

* don't log in submit calls

* write fits in parallel when using client in fluxmop

* add missing wait import

* taper preconditioner for CG

* test image restoration tweak for more homogeneous resolution

* typo

* write fits to correct location in restore

* fix direct inverse bug

* direct to workers by default in spotless

* original psi

* optimise wavelets and direct hess-approx

* typo

* test retrieving result with as_completed

* crude actor test to find where the time is spent

* checkpoint

* test numexpr and add missing imports

* sara with new direct updates

* time pd units

* control number of vertical numba threads with context manager

* try process pool for wavelets

* truly vertical psi

* test if copyto is more consistent

* fix tests

* restore __version__

* fix noise realisation

* add option to compute intrinsic model resolution by deconvolving psf. Change psi settings for better performance

* report timings in primal dual

* fix typo

* fix stupid uint8 slicing bug

* psfhat -> abspsf for precond

* fix failing sara test

* fix failing clean test, raise error if PSF is too small

* fix restore mess

* iterable pd-tol

* fix failing convolve2gaussres test

* fix sara test

* add ability to combine models in model2comps

* wait for futures when using restore worker

* add option to drop nearly null bands

* freq_factor when thresholding

* add init-factor, rms-outside-model to sara and allow empty list for drop-bands in restore

* try inverted freq_factor

* try low order poly fit in major cycle

* check for missing/inconsistent number of antennas

* raise error if future does not complete correctly in init

* check if future result in BaseError

* report antenna table size vs max ants inferred from ant1 and ant2 in stokes2vis

* report msid when things go wrong in stokes2vis

* fix init bug when using multiple measurement sets without gain solutions. add concat-time option to hci worker

* fix fits reference freq

* add timing summary to primal dual

* eps per scan and fix indexing when no-concat in dds2fits

* make freq ordering optional in dds_from_list

* propagate change to dds2fits

* use per basis rms_comps, add epsfactor

* typo in model2comps

* remove repeated tol in stokes2im

* fix imports in stokes2im

* add import in hci

* cellx -> cell_rad in stokes2im

* robustness -> opts.robustness in stokes2im

* add natural grad to hci

* remove concat_time in hci for now

* tweak freq mapping in hci

* allow freq selection in degrid

* add model-column as input to hci

* model-column also in hci parset

* typo

* remove fmap in hci

* scatter mds to workers in degrid

* fix bug when transferring model from comps

* cast model to double precision in stokes2im

* flip_v everywhere and correct fits crpix2 convention

* fix and improve sara test

* fix weighting and polproducts test

* fix hanging behaviour when ntasks < nworkers in init and hci

* report worker memory in init

* attempt to clear memory with client.run

* clean up custom as_completed logic

* make memory reporting optional

* attempt to get rid of cluster cleanup errors

* remove debugging print statements

* consistent use of _ in l1 and l2 reweight params

* add hci cab definition

* add pass_missing_as_none policies to all workers

* remove stray quit()

* remove defaults dict from worker definitions

---------

Co-authored-by: landmanbester <[email protected]>
landmanbester and landmanbester authored Aug 30, 2024
1 parent 7e3077c commit d38dff9
Showing 74 changed files with 7,912 additions and 6,290 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -100,3 +100,6 @@ ENV/

# mypy
.mypy_cache/

# vscode
.vscode/
64 changes: 63 additions & 1 deletion README.rst
@@ -1,8 +1,15 @@
===========
pfb-imaging
===========

.. .. image:: /logo/Gemini_Generated_Image_m19n6gm19n6gm19n.jpg
.. :align: center
Radio interferometric imaging suite based on the pre-conditioned forward-backward algorithm.

Installation
~~~~~~~~~~~~

Install the package by cloning and running

:code:`$ pip install -e pfb-imaging/`
@@ -20,6 +27,61 @@ no binary mode eg

:code:`$ pip install -e ducc`

Default naming conventions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The default outputs are named using a combination of the following parameters

* `--output-filename`
* `--product`
* `--suffix`

The output dataset of the `init` application (viz. the `xds`) will be called

:code:`f"{output_filename}_{product}.xds"`

with standard Python string substitutions. The grid worker creates another dataset (viz. the `dds`) called

:code:`f"{output_filename}_{product}_{suffix}.dds"`

i.e. with the suffix appended (the default suffix being `main`), which contains imaging data products such as the dirty and PSF images.
This allows imaging multiple fields from a single set of (possibly averaged) corrected Stokes visibilities produced by the `init` application.
For example, the sun can be imaged from an `xds` prepared for the main field by setting `--target sun` and, for example, `--suffix sun`.
The `--target` parameter can be any object recognised by `astropy` and can also be specified using the `HH:MM:SS,DD:MM:SS` format.
All deconvolution algorithms interact with the `dds` produced by the `grid` application and will produce a component model called

:code:`f"{output_filename}_{product}_{suffix}_model.mds"`

as well as standard pixelated images which are stored in the `dds`.
They also produce fits images, but these are mainly for inspection with fits viewers.
By default, logs and fits files are stored in the directory specified by `--output-filename`, but these locations can be overridden with the

* `--fits-output-folder` and
* `--log-directory`

parameters. Fits files are stored with the same naming convention but with the base output directory set to `--fits-output-folder`.
Logs are stored by application name with a time stamp to prevent inadvertently overwriting them.
The output paths for all files are reported in the log.
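
As a concrete illustration (the values below are hypothetical), the naming scheme expands as follows::

    # hypothetical parameter values
    output_filename = "/data/obs1"
    product = "I"
    suffix = "main"

    f"{output_filename}_{product}.xds"                   # /data/obs1_I.xds (init)
    f"{output_filename}_{product}_{suffix}.dds"          # /data/obs1_I_main.dds (grid)
    f"{output_filename}_{product}_{suffix}_model.mds"    # /data/obs1_I_main_model.mds (deconvolution)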

Parallelism settings
~~~~~~~~~~~~~~~~~~~~

There are two settings controlling parallelism, viz.

* `--nworkers`, which controls how many chunks (usually corresponding to imaging bands) will be processed in parallel, and
* `--nthreads-per-worker`, which specifies the number of threads available to each worker (e.g. for gridding, FFTs and wavelet transforms).

By default only a single worker is used so that datasets will be processed one at a time.
This gives the smallest memory footprint but does not always make the best use of available resources.
It also allows for easy debugging as there is no Dask cluster involved in this case.
However, for better resource utilisation, or to distribute computations over a cluster, you may wish to set `--nworkers` larger than one.
This uses multiple Dask workers (processes) to process chunks in parallel and is especially useful for the `init`, `grid` and `fluxmop` applications.
It is usually advisable to set `--nworkers` to the number of desired imaging bands, which is determined by the `--channels-per-image` parameter when initialising corrected Stokes visibilities with the `init` application.
The product of `--nworkers` and `--nthreads-per-worker` should not exceed the number of available cores.
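
As a rough sketch of how these settings combine (assuming the console script is called `pfb`; the channel count and all values below are placeholders, not a prescription)::

    # 1024 channels with --channels-per-image 128 => 8 imaging bands, so 8 workers;
    # 8 workers x 4 threads each = 32 threads, which should fit within the available cores
    $ pfb init --output-filename /data/obs1 --product I --channels-per-image 128 \
          --nworkers 8 --nthreads-per-worker 4
    $ pfb grid --output-filename /data/obs1 --product I --suffix main \
          --nworkers 8 --nthreads-per-worker 4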

Acknowledgement
~~~~~~~~~~~~~~~

If you find any of this useful please cite (for now)

https://arxiv.org/abs/2101.08072
Binary file added logo/Gemini_Generated_Image_m19n6gm19n6gm19n.jpg
154 changes: 49 additions & 105 deletions pfb/__init__.py
@@ -1,61 +1,20 @@
"""
pfb-imaging
author - Landman Bester
email - [email protected]
date - 31/03/2020
MIT License
Copyright (c) 2020 Rhodes University Centre for Radio Astronomy Techniques & Technologies (RATT)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
__version__ = '0.0.4'

import os
import sys
# import psutil
# import resource

def set_client(opts, stack, log, scheduler='distributed'):
# mem_total = psutil.virtual_memory().total
# _, hardlim = resource.getrlimit(resource.RLIMIT_AS)
# resource.setrlimit(resource.RLIMIT_AS, (mem_total, hardlim))

from omegaconf import open_dict
# attempt somewhat intelligent default setup
import multiprocessing
nthreads_max = max(multiprocessing.cpu_count(), 1)
if opts.nvthreads is None:
# we allocate half by default
nthreads_tot = max(nthreads_max//2, 1)
if opts.scheduler in ['single-threaded', 'sync']:
with open_dict(opts):
opts.nvthreads = nthreads_tot
else:
ndask_chunks = opts.nthreads_dask*opts.nworkers
nvthreads = max(nthreads_tot//ndask_chunks, 1)
with open_dict(opts):
opts.nvthreads = nvthreads
__version__ = '0.0.4'

# os.environ["OMP_NUM_THREADS"] = str(opts.nvthreads)
os.environ["OPENBLAS_NUM_THREADS"] = str(opts.nvthreads)
os.environ["MKL_NUM_THREADS"] = str(opts.nvthreads)
os.environ["VECLIB_MAXIMUM_THREADS"] = str(opts.nvthreads)
os.environ["NUMBA_NUM_THREADS"] = str(opts.nvthreads)
def set_envs(nthreads, ncpu):
os.environ["OMP_NUM_THREADS"] = str(nthreads)
os.environ["OPENBLAS_NUM_THREADS"] = str(nthreads)
os.environ["MKL_NUM_THREADS"] = str(nthreads)
os.environ["VECLIB_MAXIMUM_THREADS"] = str(nthreads)
# os.environ["NUMBA_NUM_THREADS"] = str(nthreads)
os.environ["JAX_ENABLE_X64"] = 'True'
# this may be required for numba parallelism
# find python and set LD_LIBRARY_PATH
@@ -67,61 +26,46 @@ def set_client(opts, stack, log, scheduler='distributed'):
os.environ["LD_LIBRARY_PATH"] = f'{ldpath}:{ldcurrent}'
# TODO - should we fall over in else?

import numexpr as ne
max_cores = ne.detect_number_of_cores()
# ne_threads = min(max_cores, opts.nvthreads)
os.environ["NUMEXPR_NUM_THREADS"] = str(max_cores)
ne_threads = min(ncpu, nthreads)
os.environ["NUMEXPR_NUM_THREADS"] = str(ne_threads)

import dask
if scheduler=='distributed':
# TODO - investigate what difference this makes
# with dask.config.set({"distributed.scheduler.worker-saturation": 1.1}):
# client = distributed.Client()
# set up client
host_address = opts.host_address or os.environ.get("DASK_SCHEDULER_ADDRESS")
if host_address is not None:
from distributed import Client
print("Initialising distributed client.", file=log)
client = stack.enter_context(Client(host_address))
else:
if opts.nthreads_dask * opts.nvthreads > nthreads_max:
print("Warning - you are attempting to use more threads than "
"available. This may lead to suboptimal performance.",
file=log)
from dask.distributed import Client, LocalCluster
print("Initialising client with LocalCluster.", file=log)
with dask.config.set({"distributed.scheduler.worker-saturation": 1.1}):
cluster = LocalCluster(processes=opts.nworkers > 1,
n_workers=opts.nworkers,
threads_per_worker=opts.nthreads_dask,
memory_limit=0, # str(mem_limit/nworkers)+'GB'
asynchronous=False)
cluster = stack.enter_context(cluster)
client = stack.enter_context(Client(cluster))

from quartical.scheduling import install_plugin
client.run_on_scheduler(install_plugin)
client.wait_for_workers(opts.nworkers)
elif scheduler in ['sync', 'single-threaded']:
dask.config.set(scheduler=scheduler)
print(f"Initialising with synchronous scheduler",
file=log)
elif scheduler=='threads':
from multiprocessing.pool import ThreadPool
dask.config.set(pool=ThreadPool(opts.nthreads_dask))
print(f"Initialising ThreadPool with {opts.nthreads_dask} threads",
file=log)
elif scheduler=='processes':
# TODO - why is the performance so terrible in this case?
from multiprocessing.pool import Pool
dask.config.set(pool=Pool(opts.nthreads_dask))
print(f"Initialising Pool with {opts.nthreads_dask} processes",
file=log)
def set_client(nworkers, log, stack=None,
host_address=None, direct_to_workers=False):
import dask
dask.config.set({'distributed.comm.compression': 'lz4'})
# set up client
host_address = host_address or os.environ.get("DASK_SCHEDULER_ADDRESS")
if host_address is not None:
from distributed import Client
print("Initialising distributed client.", file=log)
client = stack.enter_context(Client(host_address))
else:
raise ValueError(f"Unknown scheduler option {opts.scheduler}")

# return updated opts
return opts
from dask.distributed import Client, LocalCluster
print("Initialising client with LocalCluster.", file=log)
dask.config.set({
'distributed.comm.compression': {
'on': True,
'type': 'blosc'
}
})
cluster = LocalCluster(processes=True,
n_workers=nworkers,
threads_per_worker=1,
memory_limit=0, # str(mem_limit/nworkers)+'GB'
asynchronous=False)
if stack is not None:
cluster = stack.enter_context(cluster)
client = Client(cluster,
direct_to_workers=direct_to_workers)
if stack is not None:
client = stack.enter_context(client)

client.wait_for_workers(nworkers)
dashboard_url = client.dashboard_link
print(f"Dask Dashboard URL at {dashboard_url}", file=log)

return client


def logo():
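
A minimal usage sketch of the refactored helpers above (assuming `set_envs` and `set_client` are importable from the package root, i.e. `pfb/__init__.py`; the worker count and the payload are placeholders):

# minimal sketch, not part of the diff above
import sys
from multiprocessing import cpu_count
from pfb import set_envs, set_client

ncpu = cpu_count()
set_envs(nthreads=4, ncpu=ncpu)      # pin BLAS/numexpr thread pools before heavy imports

client = set_client(nworkers=2, log=sys.stderr)  # LocalCluster with one thread per worker

def square(x):
    # placeholder payload submitted to the cluster
    return x * x

futures = client.map(square, range(8))
print(client.gather(futures))
client.close()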
24 changes: 15 additions & 9 deletions pfb/deconv/clark.py
@@ -4,7 +4,7 @@
from numba import njit
import dask.array as da
from pfb.operators.psf import psf_convolve_cube
from ducc0.misc import make_noncritical
from ducc0.misc import empty_noncritical
import pyscilog
log = pyscilog.get_logger('CLARK')

@@ -45,10 +45,13 @@ def subminor(A, psf, Ip, Iq, model, wsums, gamma=0.05, th=0.0, maxit=10000):
q = Iq[pq]
where pq is the location of the maximum in A.
# TODO - this does not work unless the PSF is twice the size of the image
"""
nband, nx_psf, ny_psf = psf.shape
nxo2 = nx_psf//2
nyo2 = ny_psf//2
# _, nx, ny = model.shape
Asearch = np.sum(A, axis=0)**2
pq = Asearch.argmax()
p = Ip[pq]
@@ -63,7 +66,13 @@ def subminor(A, psf, Ip, Iq, model, wsums, gamma=0.05, th=0.0, maxit=10000):
model[fsel, p, q] += gamma * xhat[fsel]/wsums[fsel]
Idelp = p - Ip
Idelq = q - Iq
# find where PSF overlaps with image
# # find where PSF overlaps with image
# left_x = p - Ip >= 0
# right_x = p + Ip < nx
# left_y = q - Iq >= 0
# right_y = q + Iq < ny
# mask = (left_x & right_x) & (left_y & right_y)

mask = (np.abs(Idelp) <= nxo2) & (np.abs(Idelq) <= nyo2)
# Ipp, Iqq = psf[:, nxo2 - Ip[mask], nyo2 - Iq[mask]]
A = subtract(A[:, mask], psf,
@@ -105,15 +114,11 @@ def clark(ID,
# PSFHAT /= wsum
# wsums = wsums/wsum
model = np.zeros((nband, nx, ny), dtype=ID.dtype)
model = make_noncritical(model)
IR = ID.copy()
# pre-allocate arrays for doing FFT's
xout = np.empty(ID.shape, dtype=ID.dtype, order='C')
xout = make_noncritical(xout)
xpad = np.empty(PSF.shape, dtype=ID.dtype, order='C')
xpad = make_noncritical(xpad)
xhat = np.empty(PSFHAT.shape, dtype=PSFHAT.dtype)
xhat = make_noncritical(xhat)
xout = empty_noncritical(ID.shape, dtype=ID.dtype)
xpad = empty_noncritical(PSF.shape, dtype=ID.dtype)
xhat = empty_noncritical(PSFHAT.shape, dtype=PSFHAT.dtype)
# square avoids abs of full array
IRsearch = np.sum(IR, axis=0)**2
pq = IRsearch.argmax()
@@ -132,6 +137,7 @@
gamma=gamma,
th=subth,
maxit=submaxit)

# subtract from full image (as in major cycle)
psf_convolve_cube(
xpad,
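
A minimal sketch of the allocation change above (assuming `ducc0.misc.empty_noncritical` is available in the installed ducc0 version, as the new import implies; the shape is a placeholder):

import numpy as np
from ducc0.misc import empty_noncritical

# previously: arr = np.empty(shape, dtype); arr = make_noncritical(arr)
# empty_noncritical allocates in one call with strides chosen to avoid cache-critical layouts
shape = (4, 1024, 1024)   # placeholder (nband, nx, ny)
xout = empty_noncritical(shape, dtype=np.float64)
xout[...] = 0.0           # behaves like any other numpy array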