Skip to content

Commit

Permalink
attempt to write to zarr in separate thread
Browse files Browse the repository at this point in the history
  • Loading branch information
landmanbester committed Sep 20, 2024
1 parent c540401 commit 6a84862
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 9 deletions.
17 changes: 14 additions & 3 deletions pfb/operators/gridder.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
because we are not guaranteed that each imaging band has the same
number of rows after BDA.
"""
import concurrent.futures as cf
from time import time
import numpy as np
import numba
Expand Down Expand Up @@ -757,17 +758,27 @@ def compute_residual(dsl,
ti = time()
ds['MODEL'] = (('x','y'), model)
ds['RESIDUAL'] = (('x','y'), residual)
tassign = time() - ti

# save
ds.to_zarr(output_name, mode='a')
ti = time()
with cf.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(dataset_to_zarr, ds, output_name)
# future =None
# ti = time()
# ds.to_zarr(output_name, mode='a')
twrite = time() - ti

ttot = time() - tii
ttally = tread + tdegrid + tgrid + tdiff + twrite
ttally = tread + tdegrid + tgrid + tdiff + tassign + twrite
print(f'tread = {tread/ttot}')
print(f'tdegrid = {tdegrid/ttot}')
print(f'tgrid = {tgrid/ttot}')
print(f'tdiff = {tdiff/ttot}')
print(f'tassign = {tassign/ttot}')
print(f'twrite = {twrite/ttot}')
print(f'ttally = {ttally/ttot}')
return residual
return residual, future

def dataset_to_zarr(ds, output_name):
    """Write ``ds`` to the zarr store at ``output_name`` in append mode.

    Thin wrapper around ``ds.to_zarr`` so the write can be handed to an
    executor as a plain callable.

    Parameters
    ----------
    ds
        Object exposing a ``to_zarr(store, mode=...)`` method
        (presumably an xarray Dataset -- confirm against callers).
    output_name
        Target store, forwarded unchanged as the first positional
        argument of ``ds.to_zarr``.

    Returns
    -------
    None
        The return value of ``ds.to_zarr`` is discarded.
    """
    # mode='a' is passed through as-is; see the to_zarr documentation for
    # its exact create/overwrite semantics.
    write_kwargs = {'mode': 'a'}
    ds.to_zarr(output_name, **write_kwargs)
2 changes: 1 addition & 1 deletion pfb/utils/dist.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ def set_residual(self, k, x=None):
return np.zeros_like(x)

# we don't need counts here because we use the weights in the dds
residual = compute_residual(
residual, _ = compute_residual(
self.cache_path,
self.nx, self.ny,
self.cell_rad, self.cell_rad,
Expand Down
4 changes: 2 additions & 2 deletions pfb/workers/klean.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ def _klean(**kw):
print(f'Computing residual', file=log)
for ds_name, ds in zip(dds_list, dds):
b = int(ds.bandid)
resid = compute_residual(ds_name,
resid, _ = compute_residual(ds_name,
nx, ny,
cell_rad, cell_rad,
ds_name,
Expand Down Expand Up @@ -349,7 +349,7 @@ def _klean(**kw):
print(f'Computing residual', file=log)
for ds_name, ds in zip(dds_list, dds):
b = int(ds.bandid)
resid = compute_residual(ds_name,
resid, _ = compute_residual(ds_name,
nx, ny,
cell_rad, cell_rad,
ds_name,
Expand Down
12 changes: 9 additions & 3 deletions pfb/workers/sara.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import sys
from contextlib import ExitStack
import concurrent.futures as cf
from pfb.workers.main import cli
import click
from omegaconf import OmegaConf
Expand Down Expand Up @@ -328,8 +329,7 @@ def _sara(**kw):
residual *= beam # avoid copy
update = precond.idot(residual,
mode=opts.hess_approx,
# only warm start close to convergence
x0=update if eps < 0.1 else None)
x0=update if update.any() else None)
update_mfs = np.mean(update, axis=0)
save_fits(update_mfs,
fits_oname + f'_{opts.suffix}_update_{k+1}.fits',
Expand Down Expand Up @@ -441,9 +441,10 @@ def _sara(**kw):
hdr_mfs)

print(f'Computing residual', file=log)
write_futures = []
for ds_name, ds in zip(dds_list, dds):
b = int(ds.bandid)
residual[b] = compute_residual(
residual[b], fut = compute_residual(
ds_name,
nx, ny,
cell_rad, cell_rad,
Expand All @@ -453,6 +454,8 @@ def _sara(**kw):
epsilon=opts.epsilon,
do_wgridding=opts.do_wgridding,
double_accum=opts.double_accum)
write_futures.append(fut)

residual /= wsum
residual_mfs = np.sum(residual, axis=0)
save_fits(residual_mfs,
Expand Down Expand Up @@ -532,4 +535,7 @@ def _sara(**kw):
print("Algorithm is diverging. Terminating.", file=log)
break

# make sure write futures have finished
cf.wait(write_futures)

return

0 comments on commit 6a84862

Please sign in to comment.