From 3ca4facc04fda8ed7e1302978aa770bf5f22b009 Mon Sep 17 00:00:00 2001 From: landmanbester Date: Fri, 1 Nov 2024 08:19:04 +0200 Subject: [PATCH 01/18] depend on codex-africanus@relax-dependencies --- setup.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 01bce549..73b3c0ce 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ long_description = fh.read() requirements = [ - 'numpy <= 2.0.0', + 'numpy', 'scikit-image', 'PyWavelets', 'katbeam', @@ -20,7 +20,10 @@ "@git+https://github.com/caracal-pipeline/stimela.git" "@clickify_missing_as_none", "streamjoy >= 0.0.8", - "codex-africanus[complete] >= 0.3.7", + # "codex-africanus[complete] >= 0.3.7", + "codex-africanus" + "@git+https://github.com/ratt-ru/codex-africanus.git" + "@relax-dependencies", "dask-ms[xarray, zarr, s3]", "tbb", "jax[cpu]", From 9ed0335e7948eb0d5991c7b33a2ef8fdf7ddf17b Mon Sep 17 00:00:00 2001 From: landmanbester Date: Fri, 1 Nov 2024 08:35:48 +0200 Subject: [PATCH 02/18] drop python3.9 --- .github/workflows/ci.yml | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8bac7b7b..c216be31 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -12,7 +12,7 @@ jobs: if: "!contains(github.event.head_commit.message, '[skip ci]')" strategy: matrix: - python-version: ["3.9", "3.10", "3.11"] + python-version: ["3.10", "3.11"] steps: - name: Set up Python ${{ matrix.python-version }} diff --git a/setup.py b/setup.py index 73b3c0ce..cba79f09 100644 --- a/setup.py +++ b/setup.py @@ -47,7 +47,7 @@ packages=find_packages(), include_package_data=True, zip_safe=False, - python_requires='>=3.9', + python_requires='>=3.10', install_requires=requirements, classifiers=[ "Programming Language :: Python :: 3", From 70a77831bf8367e37eb3a657e1ec5edf13a98a6b Mon Sep 17 00:00:00 2001 From: landmanbester Date: Fri, 1 Nov 2024 08:45:26 +0200 Subject: [PATCH 03/18] py.test -> pytest --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c216be31..2fe0f60b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,7 +35,7 @@ jobs: run: python -m pip install .[testing] - name: Run tests - run: py.test -s -vvv tests/ + run: pytest -s -vvv tests/ deploy: needs: [test] From ec7dbbe4e514d27c7f6002934402d4c6b1490404 Mon Sep 17 00:00:00 2001 From: landmanbester Date: Fri, 1 Nov 2024 09:02:09 +0200 Subject: [PATCH 04/18] add pytest dependency --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index cba79f09..41a75024 100644 --- a/setup.py +++ b/setup.py @@ -6,6 +6,7 @@ requirements = [ 'numpy', + 'pytest >= 8.0.0' 'scikit-image', 'PyWavelets', 'katbeam', From 4f757c2f6fa8dbed5e19bf57a78df08f25241c33 Mon Sep 17 00:00:00 2001 From: landmanbester Date: Fri, 1 Nov 2024 09:03:09 +0200 Subject: [PATCH 05/18] add missing , --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 41a75024..8887d847 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ requirements = [ 'numpy', - 'pytest >= 8.0.0' + 'pytest >= 8.0.0', 'scikit-image', 'PyWavelets', 'katbeam', From f9d6dc0a81ea91c37b179a03a5068d37dbeeec46 Mon Sep 17 00:00:00 2001 From: landmanbester Date: Fri, 1 Nov 2024 09:26:20 +0200 Subject: [PATCH 06/18] depend on bokeh >= 3.1.0 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py 
b/setup.py index 8887d847..cf3abf94 100644 --- a/setup.py +++ b/setup.py @@ -31,7 +31,7 @@ "lz4", "ipdb", "psutil", - "bokeh < 3.0.0", + "bokeh >= 3.1.0", "regions" ] From 13ec4d39ed2336ad492126d5b973ccbc2d380b12 Mon Sep 17 00:00:00 2001 From: landmanbester Date: Mon, 11 Nov 2024 10:08:29 +0200 Subject: [PATCH 07/18] update CI to use poetry --- .github/workflows/ci.yml | 83 +++++++++++++++++++++++++++++++++------- pyproject.toml | 49 ++++++++++++++++++++++++ setup.py | 66 -------------------------------- 3 files changed, 119 insertions(+), 79 deletions(-) create mode 100644 pyproject.toml delete mode 100644 setup.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2fe0f60b..bae0362e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,4 +1,4 @@ -name: pfb-imaging Workflow +name: pfb-imaging CI Workflow on: push: @@ -6,13 +6,22 @@ on: - 'v*' pull_request: +env: + POETRY_VERSION: 1.5 + jobs: test: - runs-on: ubuntu-latest + runs-on: ${{ matrix.os }} + continue-on-error: true if: "!contains(github.event.head_commit.message, '[skip ci]')" + + env: + NUMBA_CACHE_DIR: /tmp/numba-cache + strategy: matrix: - python-version: ["3.10", "3.11"] + os: [ubuntu-20.04, ubuntu-22.04] + python-version: ["3.9", "3.10", "3.11"] steps: - name: Set up Python ${{ matrix.python-version }} @@ -20,22 +29,65 @@ jobs: with: python-version: ${{ matrix.python-version}} + - name: Install poetry + uses: abatilo/actions-poetry@v2 + with: + poetry-version: ${{ env.POETRY_VERSION }} + + - name: Check poetry install + run: poetry --version + - name: Checkout source uses: actions/checkout@v3 with: - fetch-depth: 1 + fetch-depth: 0 + + - name: Restore repo times + uses: chetan/git-restore-mtime-action@v2 - name: Upgrade pip and setuptools run: python -m pip install -U pip setuptools - # - name: Pin setuptools - # run: python -m pip install setuptools==65.5 + - name: Create Key and Numba Cache Directory + id: numba-key + run: | + mkdir -p ${{ env.NUMBA_CACHE_DIR }} + echo "timestamp=$(/bin/date -u '+%Y%m%d%H%M%S')" >> $GITHUB_OUTPUT + + - name: Cache Numba Kernels + uses: actions/cache@v3 + with: + key: numba-cache-${{ matrix.python-version }}-${{ steps.numba-key.outputs.timestamp }} + restore-keys: numba-cache-${{ matrix.python-version }}- + path: ${{ env.NUMBA_CACHE_DIR }} + + - name: List the measures directory + run: curl ftp://ftp.astron.nl/outgoing/Measures/ > measures_dir.txt + + - name: Load cached CASA Measures Data + id: load-cached-casa-measures + uses: actions/cache@v3 + with: + key: casa-measures-${{ hashFiles('measures_dir.txt')}} + path: | + ~/measures + ~/.casarc + + - name: Download and install CASA Measures Data + if: steps.load-cached-casa-measures.outputs.cache-hit != 'true' + run: | + mkdir -p ~/measures + curl ftp://ftp.astron.nl/outgoing/Measures/WSRT_Measures.ztar | tar xvzf - -C ~/measures + echo "measures.directory: ~/measures" > ~/.casarc - name: Install pfb-imaging - run: python -m pip install .[testing] + run: poetry install + + - name: Run pfb-imaging + run: poetry run pfb --help - name: Run tests - run: pytest -s -vvv tests/ + run: poetry rin pytest -v tests/ deploy: needs: [test] @@ -48,8 +100,13 @@ jobs: with: python-version: "3.10" - - name: Install latest setuptools, wheel, pip - run: python3 -m pip install -U pip setuptools wheel + - name: Install poetry + uses: abatilo/actions-poetry@v2 + with: + poetry-version: ${{ env.POETRY_VERSION }} + + - name: Check poetry install + run: poetry --version - name: Checkout source uses: actions/checkout@v4 @@ 
-57,10 +114,10 @@ jobs: fetch-depth: 1 - name: Build distributions - run: python setup.py sdist bdist_wheel + run: poetry build - name: Publish distribution to Test PyPI - uses: pypa/gh-action-pypi-publish@master + uses: pypa/gh-action-pypi-publish@1.8.6 with: user: __token__ password: ${{ secrets.PYPI_TEST_API_TOKEN }} @@ -68,7 +125,7 @@ jobs: continue-on-error: false - name: Publish distribution 📦 to PyPI - uses: pypa/gh-action-pypi-publish@master + uses: pypa/gh-action-pypi-publish@1.8.6 with: user: __token__ password: ${{ secrets.PYPI_API_TOKEN }} diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 00000000..7c23ad4c --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,49 @@ +[tool.poetry] +name = "pfb-imaging" +version = "0.0.4" +description = "Radio interferometric imaging suite based on a preconditioned forward-backward approach" +authors = ["Landman Bester "] +license = "MIT" +readme = "README.rst" +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Science/Research", + "License :: OSI Approved :: MIT License", + "Operating System :: POSIX :: Linux", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Topic :: Scientific/Engineering :: Astronomy" +] +packages = [{include = "pfb"}] + +[tool.poetry.dependencies] +python = ">=3.10, <3.12" +pytest = ">=8.0.0" +scikit-image = ">=0.24.0" +PyWavelets = ">=1.7.0" +katbeam = ">=0.1" +numexpr = ">=2.10.1" +pyscilog = ">=0.1.2" +Click = ">=8.1" +ducc0 = ">=0.34.0" +sympy = ">=1.13" +stimela = { git = "https://github.com/caracal-pipeline/stimela.git", branch = "clickify_missing_as_none" } +streamjoy = ">=0.0.8" +codex-africanus = {extras = ["dask", "scipy", "astropy", "python-casacore"], version = ">=0.3.7, <=0.3.7"} +dask-ms = {extras = ["s3", "xarray", "zarr"], version = ">=0.2.20, <=0.2.20"} +tbb = ">=2021.13.1" +jax = {extras = ["cpu"], version = ">=0.4.31"} +lz4 = ">=4.3.3" +bokeh = ">=3.1.0" +regions = ">=0.9" +psutil = ">=5.9.8" + +[tool.poetry.scripts] +pfb = "pfb.workers.main:cli" + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" + +## add section for dev component which installs builder and Dockerfiles diff --git a/setup.py b/setup.py deleted file mode 100644 index cf3abf94..00000000 --- a/setup.py +++ /dev/null @@ -1,66 +0,0 @@ -from setuptools import setup, find_packages -import pfb - -with open("README.rst", "r") as fh: - long_description = fh.read() - -requirements = [ - 'numpy', - 'pytest >= 8.0.0', - 'scikit-image', - 'PyWavelets', - 'katbeam', - 'numexpr', - 'pyscilog >= 0.1.2', - 'Click', - "ducc0" - "@git+https://github.com/mreineck/ducc.git" - "@ducc0", - "sympy", - "stimela" - "@git+https://github.com/caracal-pipeline/stimela.git" - "@clickify_missing_as_none", - "streamjoy >= 0.0.8", - # "codex-africanus[complete] >= 0.3.7", - "codex-africanus" - "@git+https://github.com/ratt-ru/codex-africanus.git" - "@relax-dependencies", - "dask-ms[xarray, zarr, s3]", - "tbb", - "jax[cpu]", - "lz4", - "ipdb", - "psutil", - "bokeh >= 3.1.0", - "regions" - ] - - -setup( - name='pfb-imaging', - version=pfb.__version__, - author="Landman Bester", - author_email="lbester@sarao.ac.za", - description="Radio interferometric imaging suite base on the pre-conditioned forward-backward algorithm", - long_description=long_description, - long_description_content_type="text/markdown", - url="https://github.com/ratt-ru/pfb-imaging", - packages=find_packages(), - include_package_data=True, - 
zip_safe=False, - python_requires='>=3.10', - install_requires=requirements, - classifiers=[ - "Programming Language :: Python :: 3", - "License :: OSI Approved :: MIT License", - "Operating System :: POSIX :: Linux", - "Topic :: Scientific/Engineering :: Astronomy", - ], - entry_points={'console_scripts':[ - 'pfb = pfb.workers.main:cli' - ] - } - - - , - ) From 9faa8a68e889d6332304e2a27e5f831bd58cf2c1 Mon Sep 17 00:00:00 2001 From: landmanbester Date: Mon, 11 Nov 2024 10:10:40 +0200 Subject: [PATCH 08/18] remove 3.9 from test matrix --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bae0362e..138f2ac7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,7 +21,7 @@ jobs: strategy: matrix: os: [ubuntu-20.04, ubuntu-22.04] - python-version: ["3.9", "3.10", "3.11"] + python-version: ["3.10", "3.11"] steps: - name: Set up Python ${{ matrix.python-version }} @@ -87,7 +87,7 @@ jobs: run: poetry run pfb --help - name: Run tests - run: poetry rin pytest -v tests/ + run: poetry run pytest -v tests/ deploy: needs: [test] From 315ff3985907c1e8f03fefd57f6d237c47adf708 Mon Sep 17 00:00:00 2001 From: landmanbester Date: Mon, 11 Nov 2024 10:28:30 +0200 Subject: [PATCH 09/18] update dask-ms and codex dependency versions --- pfb/workers/sara.py | 2 +- pyproject.toml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pfb/workers/sara.py b/pfb/workers/sara.py index 96dce2bf..b2232f6b 100644 --- a/pfb/workers/sara.py +++ b/pfb/workers/sara.py @@ -214,7 +214,7 @@ def _sara(**kw): ntol = 1 pd_tol = [opts.pd_tol] niters = opts.niter - if ntol < niters: + if ntol <= niters: pd_tolf = pd_tol[-1] pd_tol += [pd_tolf]*niters # no harm in too many diff --git a/pyproject.toml b/pyproject.toml index 7c23ad4c..9cdc003e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,8 +30,8 @@ ducc0 = ">=0.34.0" sympy = ">=1.13" stimela = { git = "https://github.com/caracal-pipeline/stimela.git", branch = "clickify_missing_as_none" } streamjoy = ">=0.0.8" -codex-africanus = {extras = ["dask", "scipy", "astropy", "python-casacore"], version = ">=0.3.7, <=0.3.7"} -dask-ms = {extras = ["s3", "xarray", "zarr"], version = ">=0.2.20, <=0.2.20"} +codex-africanus = {extras = ["dask", "scipy", "astropy", "python-casacore"], version = ">=0.4.1"} +dask-ms = {extras = ["s3", "xarray", "zarr"], version = ">=0.2.22"} tbb = ">=2021.13.1" jax = {extras = ["cpu"], version = ">=0.4.31"} lz4 = ">=4.3.3" From be93da87a6014618758ae5b970ad841c2334f98c Mon Sep 17 00:00:00 2001 From: landmanbester Date: Mon, 11 Nov 2024 12:32:18 +0200 Subject: [PATCH 10/18] pin dask <2024.11.0 --- pfb/parser/init.yaml | 7 ++++++- pfb/utils/stokes2vis.py | 17 +++++++++-------- pyproject.toml | 1 + 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/pfb/parser/init.yaml b/pfb/parser/init.yaml index 9acaa602..a8f8982e 100644 --- a/pfb/parser/init.yaml +++ b/pfb/parser/init.yaml @@ -121,11 +121,16 @@ inputs: info: Display progress. Use --no-progressbar to deactivate. - memory_reporting: + memory-reporting: dtype: bool default: false info: Report worker memory as tasks complete + check_ants: + dtype: bool + default: false + info: + Check that ANTENNA1 and ANTENNA2 tables are consistent with the ANTENNA table. 
_include: - (.)out.yml diff --git a/pfb/utils/stokes2vis.py b/pfb/utils/stokes2vis.py index 63f8f054..f2941fb9 100644 --- a/pfb/utils/stokes2vis.py +++ b/pfb/utils/stokes2vis.py @@ -164,14 +164,15 @@ def single_stokes( # check that antpos gives the correct size table antmax = allants.size - try: - assert antmax == nant - except Exception as e: - raise ValueError('Inconsistent ANTENNA table. ' - 'Shape does not match max number of antennas ' - 'as inferred from ant1 and ant2. ' - f'Table size is {antpos.shape} but got {antmax}. ' - f'{oname}') + if opts.check_ants: + try: + assert antmax == nant + except Exception as e: + raise ValueError('Inconsistent ANTENNA table. ' + 'Shape does not match max number of antennas ' + 'as inferred from ant1 and ant2. ' + f'Table size is {antpos.shape} but got {antmax}. ' + f'{oname}') # relabel antennas by index # this only works because allants is sorted in ascending order diff --git a/pyproject.toml b/pyproject.toml index 9cdc003e..8ed8dbc3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,7 @@ ducc0 = ">=0.34.0" sympy = ">=1.13" stimela = { git = "https://github.com/caracal-pipeline/stimela.git", branch = "clickify_missing_as_none" } streamjoy = ">=0.0.8" +dask = ">=2023.1.1, <2024.11.0" codex-africanus = {extras = ["dask", "scipy", "astropy", "python-casacore"], version = ">=0.4.1"} dask-ms = {extras = ["s3", "xarray", "zarr"], version = ">=0.2.22"} tbb = ">=2021.13.1" From 9a09feb331b3f3100e6feb5d12d14457f3f337c9 Mon Sep 17 00:00:00 2001 From: landmanbester Date: Mon, 11 Nov 2024 13:10:39 +0200 Subject: [PATCH 11/18] temp solution for issue-120 --- pfb/workers/init.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pfb/workers/init.py b/pfb/workers/init.py index 07dfee75..8edc0e4e 100644 --- a/pfb/workers/init.py +++ b/pfb/workers/init.py @@ -254,7 +254,7 @@ def _init(**kw): Irow = slice(ridx[0], ridx[-1] + rcnts[-1]) fitr = enumerate(zip(freq_mapping[ms][idt]['start_indices'], - freq_mapping[ms][idt]['counts'])) + freq_mapping[ms][idt]['counts'])) for fi, (flow, fcounts) in fitr: Inu = slice(flow, flow + fcounts) @@ -276,7 +276,7 @@ def _init(**kw): utimes[ms][idt][It], ridx, rcnts, radecs[ms][idt], - fi, ti, ims, ms, flow, flow+fcounts]) + ddid + fi, ti, ims, ms, flow, flow+fcounts]) futures = [] associated_workers = {} From 1ab26767d72716220c6109ab7dc5dc13eae70457 Mon Sep 17 00:00:00 2001 From: landmanbester Date: Mon, 11 Nov 2024 14:53:19 +0200 Subject: [PATCH 12/18] slightly more general frequency<->band mapping --- pfb/workers/init.py | 35 ++++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/pfb/workers/init.py b/pfb/workers/init.py index 8edc0e4e..312a037f 100644 --- a/pfb/workers/init.py +++ b/pfb/workers/init.py @@ -208,6 +208,20 @@ def _init(**kw): print(f"No weights provided, using unity weights", file=log) + # band mapping + ddid2bid = {} + b0 = 0 + for ms in opts.ms: + for idt, fmap in freq_mapping[ms].items(): + ilo = idt.find('DDID') + 4 + ihi = idt.rfind('_') + ddid = int(idt[ilo:ihi]) + if (opts.ddids is not None) and (ddid not in opts.ddids): + continue + elif ddid not in ddid2bid.keys(): + ddid2bid[ddid] = b0 + b0 += fmap['counts'].size + # a flat list to use with as_completed datasets = [] @@ -223,17 +237,12 @@ def _init(**kw): fid = ds.FIELD_ID ddid = ds.DATA_DESC_ID scanid = ds.SCAN_NUMBER - # TODO - cleaner syntax - if opts.fields is not None: - if fid not in opts.fields: - continue - if opts.ddids is not None: - if ddid not in opts.ddids: - continue 
- if opts.scans is not None: - if scanid not in opts.scans: - continue - + if (opts.fields is not None) and (fid not in opts.fields): + continue + if (opts.ddids is not None) and (ddid not in opts.ddids): + continue + if (opts.scans is not None) and (scanid not in opts.scans): + continue idt = f"FIELD{fid}_DDID{ddid}_SCAN{scanid}" nrow = ds.sizes['row'] @@ -255,7 +264,7 @@ def _init(**kw): fitr = enumerate(zip(freq_mapping[ms][idt]['start_indices'], freq_mapping[ms][idt]['counts'])) - + b0 = ddid2bid[ddid] for fi, (flow, fcounts) in fitr: Inu = slice(flow, flow + fcounts) @@ -276,7 +285,7 @@ def _init(**kw): utimes[ms][idt][It], ridx, rcnts, radecs[ms][idt], - ddid + fi, ti, ims, ms, flow, flow+fcounts]) + b0 + fi, ti, ims, ms, flow, flow+fcounts]) futures = [] associated_workers = {} From 363d2875dc4b4b3f987ac733e75f85d4bb21d1e3 Mon Sep 17 00:00:00 2001 From: landmanbester Date: Mon, 11 Nov 2024 17:16:52 +0200 Subject: [PATCH 13/18] generalise band mapping for multi-ms and multi-spw data --- pfb/utils/misc.py | 6 ------ pfb/workers/init.py | 39 ++++++++++++++++++++++++++++++--------- 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/pfb/utils/misc.py b/pfb/utils/misc.py index eef8bf80..b500eb95 100644 --- a/pfb/utils/misc.py +++ b/pfb/utils/misc.py @@ -461,12 +461,6 @@ def construct_mappings(ms_name, raise RuntimeError("Something went wrong constructing the " "frequency mapping. sum(fchunks != nchan)") - # nfreq_chunks = nchan_in // cpit - # freq_chunks = (cpit,)*nfreq_chunks - # rem = nchan_in - nfreq_chunks * cpit - # if rem: - # freq_chunks += (rem,) - ms_chunks[ms].append({'row': row_chunks, 'chan': freq_chunks}) diff --git a/pfb/workers/init.py b/pfb/workers/init.py index 312a037f..7381b516 100644 --- a/pfb/workers/init.py +++ b/pfb/workers/init.py @@ -207,20 +207,41 @@ def _init(**kw): else: print(f"No weights provided, using unity weights", file=log) - - # band mapping - ddid2bid = {} - b0 = 0 + # distinct freq groups + igroup = 0 + sgroup = 0 + freq_groups = {} for ms in opts.ms: - for idt, fmap in freq_mapping[ms].items(): + for idt, freq in freqs[ms].items(): ilo = idt.find('DDID') + 4 ihi = idt.rfind('_') ddid = int(idt[ilo:ihi]) if (opts.ddids is not None) and (ddid not in opts.ddids): continue - elif ddid not in ddid2bid.keys(): - ddid2bid[ddid] = b0 - b0 += fmap['counts'].size + if not len(freq_groups.keys()): + freq_groups[igroup] = {} + freq_groups[igroup]['freq'] = freq + freq_groups[igroup]['sgroup'] = sgroup + igroup += 1 + sgroup += freq_mapping[ms][idt]['counts'].size + else: + for i, fs in freq_groups.items(): + if not np.all(freq == fs['freq']): + freq_groups[igroup] = {} + freq_groups[igroup]['freq'] = freq + freq_groups[igroup]['sgroup'] = sgroup + igroup += 1 + sgroup += freq_mapping[ms][idt]['counts'].size + + # band mapping + msddid2bid = {} + for ms in opts.ms: + msddid2bid[ms] = {} + for idt, freq in freqs[ms].items(): + # find group where it matches + for igroup, fs in freq_groups.items(): + if np.all(freq == fs['freq']): + msddid2bid[ms][idt] = fs['sgroup'] # a flat list to use with as_completed datasets = [] @@ -264,7 +285,7 @@ def _init(**kw): fitr = enumerate(zip(freq_mapping[ms][idt]['start_indices'], freq_mapping[ms][idt]['counts'])) - b0 = ddid2bid[ddid] + b0 = msddid2bid[ms][idt] for fi, (flow, fcounts) in fitr: Inu = slice(flow, flow + fcounts) From 29eddee224b824baf8607dc6ec58042f3d38607f Mon Sep 17 00:00:00 2001 From: landmanbester Date: Mon, 11 Nov 2024 19:29:00 +0200 Subject: [PATCH 14/18] simplify logic in freq<->band mapping --- 
pfb/workers/init.py | 35 +++++++++++++++++------------------ 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/pfb/workers/init.py b/pfb/workers/init.py index 7381b516..ba9cabe2 100644 --- a/pfb/workers/init.py +++ b/pfb/workers/init.py @@ -208,9 +208,9 @@ def _init(**kw): print(f"No weights provided, using unity weights", file=log) # distinct freq groups - igroup = 0 sgroup = 0 - freq_groups = {} + freq_groups = [] + freq_sgroups = [] for ms in opts.ms: for idt, freq in freqs[ms].items(): ilo = idt.find('DDID') + 4 @@ -218,20 +218,20 @@ def _init(**kw): ddid = int(idt[ilo:ihi]) if (opts.ddids is not None) and (ddid not in opts.ddids): continue - if not len(freq_groups.keys()): - freq_groups[igroup] = {} - freq_groups[igroup]['freq'] = freq - freq_groups[igroup]['sgroup'] = sgroup - igroup += 1 + if not len(freq_groups): + freq_groups.append(freq) + freq_sgroups.append(sgroup) sgroup += freq_mapping[ms][idt]['counts'].size else: - for i, fs in freq_groups.items(): - if not np.all(freq == fs['freq']): - freq_groups[igroup] = {} - freq_groups[igroup]['freq'] = freq - freq_groups[igroup]['sgroup'] = sgroup - igroup += 1 - sgroup += freq_mapping[ms][idt]['counts'].size + in_group = False + for fs in freq_groups: + if freq.size == fs.size and np.all(freq == fs): + in_group = True + break + if not in_group: + freq_groups.append(freq) + freq_sgroups.append(sgroup) + sgroup += freq_mapping[ms][idt]['counts'].size # band mapping msddid2bid = {} @@ -239,13 +239,12 @@ def _init(**kw): msddid2bid[ms] = {} for idt, freq in freqs[ms].items(): # find group where it matches - for igroup, fs in freq_groups.items(): - if np.all(freq == fs['freq']): - msddid2bid[ms][idt] = fs['sgroup'] + for sgroup, fs in zip(freq_sgroups, freq_groups): + if freq.size == fs.size and np.all(freq == fs): + msddid2bid[ms][idt] = sgroup # a flat list to use with as_completed datasets = [] - for ims, ms in enumerate(opts.ms): xds = xds_from_ms(ms, chunks=ms_chunks[ms], columns=columns, table_schema=schema, group_cols=group_by) From ca12cb4acbbde6adee61f26ced4d28790ea4076e Mon Sep 17 00:00:00 2001 From: landmanbester Date: Tue, 12 Nov 2024 10:16:54 +0200 Subject: [PATCH 15/18] tweak fluxtractor --- pfb/opt/pcg.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pfb/opt/pcg.py b/pfb/opt/pcg.py index 416151a6..6afee248 100644 --- a/pfb/opt/pcg.py +++ b/pfb/opt/pcg.py @@ -426,6 +426,7 @@ def pcg_dds(ds_name, else: j = ds.DIRTY.values * mask * ds.BEAM.values + beam = mask*ds.BEAM.values psf = ds.PSF.values nx_psf, py_psf = psf.shape nx, ny = j.shape @@ -497,10 +498,13 @@ def pcg_dds(ds_name, weight=ds.WEIGHT.values, vis_mask=ds.MASK.values, freq=ds.FREQ.values, - beam=ds.BEAM.values, + beam=beam, cell=ds.cell_rad, x0=ds.x0, y0=ds.y0, + flip_u=ds.flip_u, + flip_v=ds.flip_v, + flip_w=ds.flip_w, do_wgridding=do_wgridding, epsilon=epsilon, double_accum=double_accum, From a0b8086a2268f79b401ecb893b8d4474b385cb72 Mon Sep 17 00:00:00 2001 From: landmanbester Date: Tue, 12 Nov 2024 12:50:48 +0200 Subject: [PATCH 16/18] fix multiple logo printing, rename klean -> kclean --- pfb/parser/{klean.yaml => kclean.yaml} | 0 pfb/parser/uncabbedcabs.yml | 6 ++-- pfb/workers/{klean.py => kclean.py} | 14 ++++----- pfb/workers/main.py | 4 +-- tests/{test_klean.py => test_kclean.py} | 40 ++++++++++++------------- 5 files changed, 32 insertions(+), 32 deletions(-) rename pfb/parser/{klean.yaml => kclean.yaml} (100%) rename pfb/workers/{klean.py => kclean.py} (98%) rename tests/{test_klean.py => test_kclean.py} (92%) diff 
--git a/pfb/parser/klean.yaml b/pfb/parser/kclean.yaml similarity index 100% rename from pfb/parser/klean.yaml rename to pfb/parser/kclean.yaml diff --git a/pfb/parser/uncabbedcabs.yml b/pfb/parser/uncabbedcabs.yml index db4e92b5..ba7e7797 100644 --- a/pfb/parser/uncabbedcabs.yml +++ b/pfb/parser/uncabbedcabs.yml @@ -25,14 +25,14 @@ pfb.degrid: _include: - (.)degrid.yaml -pfb.klean: - command: pfb.workers.klean.klean +pfb.kclean: + command: pfb.workers.kclean.kclean flavour: python policies: pass_missing_as_none: true _include: - - (.)klean.yaml + - (.)kclean.yaml pfb.restore: command: pfb.workers.restore.restore diff --git a/pfb/workers/klean.py b/pfb/workers/kclean.py similarity index 98% rename from pfb/workers/klean.py rename to pfb/workers/kclean.py index 506630f5..2ed65896 100644 --- a/pfb/workers/klean.py +++ b/pfb/workers/kclean.py @@ -7,15 +7,15 @@ from omegaconf import OmegaConf import pyscilog pyscilog.init('pfb') -log = pyscilog.get_logger('KLEAN') +log = pyscilog.get_logger('kclean') from scabha.schema_utils import clickify_parameters from pfb.parser.schemas import schema @cli.command(context_settings={'show_default': True}) -@clickify_parameters(schema.klean) -def klean(**kw): +@clickify_parameters(schema.kclean) +def kclean(**kw): ''' Modified single-scale clean. ''' @@ -40,7 +40,7 @@ def klean(**kw): import time timestamp = time.strftime("%Y%m%d-%H%M%S") - logname = f'{str(opts.log_directory)}/klean_{timestamp}.log' + logname = f'{str(opts.log_directory)}/kclean_{timestamp}.log' pyscilog.log_to_file(logname) print(f'Logs will be written to {logname}', file=log) @@ -59,7 +59,7 @@ def klean(**kw): with ExitStack() as stack: ti = time.time() - _klean(**opts) + _kclean(**opts) dds, dds_list = xds_from_url(dds_name) @@ -84,7 +84,7 @@ def klean(**kw): print(f"All done after {time.time() - ti}s", file=log) -def _klean(**kw): +def _kclean(**kw): opts = OmegaConf.create(kw) OmegaConf.set_struct(opts, True) @@ -113,7 +113,7 @@ def _klean(**kw): nx, ny = dds[0].x.size, dds[0].y.size nx_psf, ny_psf = dds[0].x_psf.size, dds[0].y_psf.size if nx_psf//2 < nx or ny_psf//2 < ny: - raise ValueError("klean currently assumes a double sized PSF") + raise ValueError("kclean currently assumes a double sized PSF") lastsize = ny_psf freq_out = [] time_out = [] diff --git a/pfb/workers/main.py b/pfb/workers/main.py index 730f4009..7d2682ce 100644 --- a/pfb/workers/main.py +++ b/pfb/workers/main.py @@ -1,14 +1,14 @@ # flake8: noqa import click from pfb import logo -logo() @click.group() def cli(): + logo() pass -from pfb.workers import (init, grid, degrid, klean, +from pfb.workers import (init, grid, degrid, kclean, restore, model2comps, fluxtractor, hci, smoovie, sara) diff --git a/tests/test_klean.py b/tests/test_kclean.py similarity index 92% rename from tests/test_klean.py rename to tests/test_kclean.py index 01db80cb..238dd60c 100644 --- a/tests/test_klean.py +++ b/tests/test_kclean.py @@ -8,7 +8,7 @@ pmp = pytest.mark.parametrize @pmp('do_gains', (True, False)) -def test_klean(do_gains, ms_name): +def test_kclean(do_gains, ms_name): ''' Here we test that clean correctly infers the fluxes of point sources placed at the centers of pixels in the presence of the wterm and DI gain @@ -227,26 +227,26 @@ def test_klean(do_gains, ms_name): from pfb.workers.grid import _grid _grid(**grid_args) - # run klean - klean_args = {} - for key in schema.klean["inputs"].keys(): - klean_args[key.replace("-", "_")] = schema.klean["inputs"][key]["default"] - klean_args["output_filename"] = outname - 
klean_args["dirosion"] = 0 - klean_args["do_residual"] = False - klean_args["niter"] = 100 + # run kclean + kclean_args = {} + for key in schema.kclean["inputs"].keys(): + kclean_args[key.replace("-", "_")] = schema.kclean["inputs"][key]["default"] + kclean_args["output_filename"] = outname + kclean_args["dirosion"] = 0 + kclean_args["do_residual"] = False + kclean_args["niter"] = 100 threshold = 1e-1 - klean_args["threshold"] = threshold - klean_args["gamma"] = 0.1 - klean_args["peak_factor"] = 0.75 - klean_args["sub_peak_factor"] = 0.75 - klean_args["nthreads"] = 1 - klean_args["do_wgridding"] = True - klean_args["epsilon"] = epsilon - klean_args["mop_flux"] = True - klean_args["fits_mfs"] = False - from pfb.workers.klean import _klean - _klean(**klean_args) + kclean_args["threshold"] = threshold + kclean_args["gamma"] = 0.1 + kclean_args["peak_factor"] = 0.75 + kclean_args["sub_peak_factor"] = 0.75 + kclean_args["nthreads"] = 1 + kclean_args["do_wgridding"] = True + kclean_args["epsilon"] = epsilon + kclean_args["mop_flux"] = True + kclean_args["fits_mfs"] = False + from pfb.workers.kclean import _kclean + _kclean(**kclean_args) # get inferred model dds, _ = xds_from_url(dds_name) From 7c71b2485f99e358ae4bb9e93fa02f2fd6152f40 Mon Sep 17 00:00:00 2001 From: landmanbester Date: Tue, 12 Nov 2024 13:41:43 +0200 Subject: [PATCH 17/18] fix zero-model-outside-mask option --- pfb/opt/pcg.py | 115 ++++++++++++------------------------ pfb/parser/fluxtractor.yaml | 7 +-- pfb/workers/fluxtractor.py | 37 ++---------- 3 files changed, 43 insertions(+), 116 deletions(-) diff --git a/pfb/opt/pcg.py b/pfb/opt/pcg.py index 6afee248..8fa0f4b2 100644 --- a/pfb/opt/pcg.py +++ b/pfb/opt/pcg.py @@ -390,7 +390,6 @@ def pcg_psf(psfhat, def pcg_dds(ds_name, eta, # regularisation for Hessian approximation - sigma, # regularisation for preconditioner mask=1.0, use_psf=True, residual_name='RESIDUAL', @@ -399,6 +398,7 @@ def pcg_dds(ds_name, epsilon=5e-4, double_accum=True, nthreads=1, + zero_model_outside_mask=False, tol=1e-5, maxit=500, verbosity=1, @@ -413,84 +413,49 @@ def pcg_dds(ds_name, if not isinstance(ds_name, list): ds_name = [ds_name] - # drop_vars = ['PSF'] - # if not use_psf: - # drop_vars.append('PSFHAT') - drop_vars = None + drop_vars = ['PSF', 'PSFHAT'] ds = xds_from_list(ds_name, nthreads=nthreads, drop_vars=drop_vars)[0] - - if residual_name in ds: - j = getattr(ds, residual_name).values * mask * ds.BEAM.values - ds = ds.drop_vars(residual_name) + beam = mask * ds.BEAM.values + if zero_model_outside_mask: + if model_name not in ds: + raise RuntimeError(f"Asked to zero model outside mask but {model_name} not in dds") + model = getattr(ds, model_name).values + model = np.where(mask > 0, model, 0.0) + resid = ds.DIRTY.values - _hessian_slice( + model, + uvw=ds.UVW.values, + weight=ds.WEIGHT.values, + vis_mask=ds.MASK.values, + freq=ds.FREQ.values, + beam=ds.BEAM.values, + cell=ds.cell_rad, + x0=ds.x0, + y0=ds.y0, + do_wgridding=do_wgridding, + epsilon=epsilon, + double_accum=double_accum, + nthreads=nthreads) + j = resid * beam else: - j = ds.DIRTY.values * mask * ds.BEAM.values + if model_name in ds: + model = getattr(ds, model_name).values + else: + model = np.zeros(mask.shape, dtype=float) + + if residual_name in ds: + j = getattr(ds, residual_name).values * beam + ds = ds.drop_vars(residual_name) + else: + j = ds.DIRTY.values * beam - beam = mask*ds.BEAM.values - psf = ds.PSF.values - nx_psf, py_psf = psf.shape nx, ny = j.shape - wsum = np.sum(ds.WEIGHT.values * ds.MASK.values) - psf /= wsum + 
wsum = ds.wsum j /= wsum - - # downweight edges of field compared to center - # this allows the PCG to downweight the fit to the edges - # which may be contaminated by edge effects and also - # stabalises the preconditioner - width = np.minimum(int(0.1*nx), 32) - taperxy = taperf((nx, ny), width) - # eta /= taperxy - - # set precond if PSF is present - if 'PSFHAT' in ds and use_psf: - psfhat = np.abs(ds.PSFHAT.values)/wsum - ds.drop_vars(('PSFHAT')) - nx_psf, nyo2 = psfhat.shape - ny_psf = 2*(nyo2-1) # is this always the case? - nxpadl = (nx_psf - nx)//2 - nxpadr = nx_psf - nx - nxpadl - nypadl = (ny_psf - ny)//2 - nypadr = ny_psf - ny - nypadl - if nx_psf != nx: - unpad_x = slice(nxpadl, -nxpadr) - else: - unpad_x = slice(None) - if ny_psf != ny: - unpad_y = slice(nypadl, -nypadr) - else: - unpad_y = slice(None) - xpad = empty_noncritical((nx_psf, ny_psf), - dtype=j.dtype) - xhat = empty_noncritical((nx_psf, nyo2), - dtype='c16') - xout = empty_noncritical((nx, ny), - dtype=j.dtype) - precond = partial( - hess_direct_slice, - xpad=xpad, - xhat=xhat, - xout=xout, - abspsf=psfhat, - taperxy=taperxy, - lastsize=ny_psf, - nthreads=nthreads, - eta=sigma, - mode='backward') - - x0 = precond(j) - - # get intrinsic resolution by deconvolving psf - upsf = precond(psf[unpad_x, unpad_y]) - upsf /= upsf.max() - gaussparu = fitcleanbeam(upsf[None], level=0.25, pixsize=1.0)[0] - ds = ds.assign(**{ - 'UPSF': (('x', 'y'), upsf) - }) - ds = ds.assign_attrs(gaussparu=gaussparu) + precond = None + if 'UPDATE' in ds: + x0 = ds.UPDATE.values * mask else: - # print('Not using preconditioning') - precond = None x0 = np.zeros_like(j) hess = partial(_hessian_slice, @@ -524,11 +489,7 @@ def pcg_dds(ds_name, backtrack=False, return_resid=False) - if model_name in ds: - model = getattr(ds, model_name).values + x - else: - model = x - + model += x resid = ds.DIRTY.values - _hessian_slice( model, diff --git a/pfb/parser/fluxtractor.yaml b/pfb/parser/fluxtractor.yaml index 965998bf..6aa40fbd 100644 --- a/pfb/parser/fluxtractor.yaml +++ b/pfb/parser/fluxtractor.yaml @@ -30,14 +30,9 @@ inputs: eta: dtype: float default: 1e-5 - abbreviation: sinv + abbreviation: eta info: Standard deviation of assumed GRF prior - sigma: - dtype: float - default: 1 - info: - The value that is added to |psfhat| for preconditioning. 
model-name: dtype: str default: MODEL diff --git a/pfb/workers/fluxtractor.py b/pfb/workers/fluxtractor.py index 996f9d00..7a1eec42 100644 --- a/pfb/workers/fluxtractor.py +++ b/pfb/workers/fluxtractor.py @@ -124,19 +124,6 @@ def fluxtractor(**kw): do_mfs=opts.fits_mfs, do_cube=opts.fits_cubes) futures.append(fut) - try: - fut = client.submit( - dds2fits, - dds_list, - 'UPSF', - f'{fits_oname}_{opts.suffix}', - norm_wsum=False, - nthreads=opts.nthreads, - do_mfs=opts.fits_mfs, - do_cube=opts.fits_cubes) - futures.append(fut) - except Exception as e: - print(e) for fut in as_completed(futures): column = fut.result() @@ -228,22 +215,6 @@ def _fluxtractor(**kw): mask = mask.astype(residual.dtype) print('Using provided fits mask', file=log) - if opts.zero_model_outside_mask and not opts.or_mask_with_model: - model[:, mask<1] = 0 - print("Recomputing residual since asked to zero model", file=log) - convimage = hess(model) - ne.evaluate('dirty - convimage', out=residual, - casting='same_kind') - ne.evaluate('sum(residual, axis=0)', out=residual_mfs, - casting='same_kind') - save_fits(np.mean(model[fsel], axis=0), - basename + f'_{opts.suffix}_model_mfs_zeroed.fits', - hdr_mfs) - save_fits(residual_mfs, - basename + f'_{opts.suffix}_residual_mfs_zeroed.fits', - hdr_mfs) - - else: mask = np.ones((nx, ny), dtype=residual.dtype) print('Caution - No mask is being applied', file=log) @@ -255,22 +226,22 @@ def _fluxtractor(**kw): print("Solving for update", file=log) try: - from distributed import get_client + from distributed import get_client, wait, as_completed client = get_client() names = list(client.scheduler_info()['workers'].keys()) - from distributed import as_completed + foo = client.scatter(mask, broadcast=True) + wait(foo) except: from pfb.utils.dist import fake_client client = fake_client() names = [0] as_completed = lambda x: x futures = [] - for wname, ds, ds_name in zip(cycle(names), dds, dds_list): + for wname, ds_name in zip(cycle(names), dds_list): fut = client.submit( pcg_dds, ds_name, opts.eta, - opts.sigma, use_psf=opts.use_psf, residual_name=opts.residual_name, model_name=opts.model_name, From c327df1ee942c62d4e8bb84daecd41cfc39fc661 Mon Sep 17 00:00:00 2001 From: landmanbester Date: Tue, 12 Nov 2024 13:51:03 +0200 Subject: [PATCH 18/18] pass zmom option to pcg_dds --- pfb/opt/pcg.py | 1 + pfb/workers/fluxtractor.py | 1 + 2 files changed, 2 insertions(+) diff --git a/pfb/opt/pcg.py b/pfb/opt/pcg.py index 8fa0f4b2..4e63c896 100644 --- a/pfb/opt/pcg.py +++ b/pfb/opt/pcg.py @@ -422,6 +422,7 @@ def pcg_dds(ds_name, raise RuntimeError(f"Asked to zero model outside mask but {model_name} not in dds") model = getattr(ds, model_name).values model = np.where(mask > 0, model, 0.0) + print("Zeroing model outside mask") resid = ds.DIRTY.values - _hessian_slice( model, uvw=ds.UVW.values, diff --git a/pfb/workers/fluxtractor.py b/pfb/workers/fluxtractor.py index 7a1eec42..a9f136e5 100644 --- a/pfb/workers/fluxtractor.py +++ b/pfb/workers/fluxtractor.py @@ -250,6 +250,7 @@ def _fluxtractor(**kw): epsilon=opts.epsilon, double_accum=opts.double_accum, nthreads=opts.nthreads, + zero_model_outside_mask=opts.zero_model_outside_mask, tol=opts.cg_tol, maxit=opts.cg_maxit, verbosity=opts.cg_verbose,
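
Note on patch 09: besides bumping the dask-ms and codex-africanus versions, it relaxes the tolerance-padding check in pfb/workers/sara.py from '<' to '<=', so the primal-dual tolerance list is extended with its last value even when it already holds exactly one entry per requested iteration. A minimal standalone distillation of that padding behaviour (pad_tols and the example values are illustrative, not part of the pfb-imaging API):

    # Illustrative only: mirrors the pd_tol padding in _sara, where a single
    # tolerance is repeated so every iteration has a value to look up.
    def pad_tols(pd_tol, niter):
        tols = list(pd_tol)
        if len(tols) <= niter:           # patch 09 changes '<' to '<='
            tols += [tols[-1]] * niter   # "no harm in too many"
        return tols

    print(pad_tols([1e-3], 3))  # [0.001, 0.001, 0.001, 0.001]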
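
Note on patches 12-14: they iterate towards a frequency<->band mapping that works for multi-ms and multi-spw inputs by grouping datasets whose frequency axes are identical and giving each distinct group its own starting band index. A minimal sketch of that grouping idea, assuming plain numpy arrays; band_offsets, freq_axes and counts_per_axis are illustrative names, not the pfb-imaging variables:

    import numpy as np

    def band_offsets(freq_axes, counts_per_axis):
        # freq_axes: one 1-D frequency array per (ms, ddid) selection;
        # counts_per_axis: how many imaging bands each axis is split into.
        groups, offsets, out = [], [], []
        next_band = 0
        for freq, nband in zip(freq_axes, counts_per_axis):
            for fs, off in zip(groups, offsets):
                if freq.size == fs.size and np.all(freq == fs):
                    out.append(off)       # identical axis -> reuse its bands
                    break
            else:
                groups.append(freq)       # new distinct axis -> new band range
                offsets.append(next_band)
                out.append(next_band)
                next_band += nband
        return out

    # Example: two spectral windows, the second repeated in another dataset.
    spw0 = np.linspace(1.0e9, 1.1e9, 4)
    spw1 = np.linspace(1.1e9, 1.2e9, 4)
    print(band_offsets([spw0, spw1, spw1], [2, 2, 2]))  # [0, 2, 2]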
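
Note on patch 16: the commit message says it fixes multiple logo printing; a plausible reading is that a module-level logo() call runs on every import of pfb.workers.main (for example in worker subprocesses), whereas moving it into the click group body makes it run once per invocation of the pfb entry point. An illustrative reduction of the pattern, assuming click is installed; logo() here is a stand-in for pfb.logo:

    import click

    def logo():
        print("pfb-imaging")

    @click.group()
    def cli():
        logo()  # group callback: runs once per command-line invocation

    @cli.command()
    def kclean():
        """Modified single-scale clean."""

    if __name__ == "__main__":
        cli()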
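
Note on patches 17-18: pcg_dds gains a zero_model_outside_mask flag (plumbed through from fluxtractor in patch 18) that zeroes the current model outside the mask and recomputes the residual against the dirty image before the PCG solve. A hedged sketch of just that step, with hess standing in for the _hessian_slice operator and all array names illustrative rather than the actual dds variables:

    import numpy as np

    def masked_residual(dirty, model, mask, hess):
        # Keep model flux only where the mask is set, then redo the residual
        # so the update is solved against data consistent with that model.
        model = np.where(mask > 0, model, 0.0)
        return model, dirty - hess(model)

    # Toy usage with an identity operator just to show the shapes involved.
    nx = ny = 8
    dirty = np.random.randn(nx, ny)
    model = np.random.randn(nx, ny)
    mask = np.zeros((nx, ny))
    mask[2:6, 2:6] = 1.0
    model, resid = masked_residual(dirty, model, mask, hess=lambda x: x)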